""" Create splits of the data into train and test data used for cross-validation """
import logging
import random

from pySPACE.missions.nodes.base_node import BaseNode
from pySPACE.missions.nodes.decorators import NoOptimizationParameter
from pySPACE.tools.memoize_generator import MemoizeGenerator


@NoOptimizationParameter("stratified")
@NoOptimizationParameter("random")
@NoOptimizationParameter("time_dependent")
class CrossValidationSplitterNode(BaseNode):
""" Perform (stratified) cross-validation
During benchmarking, n pairs of training and test data are generated, where
n is configurable via the parameter splits. The n test datasets are pairwise
disjunct. Internally, the available data is partitioned into n pairwise
disjunct sets s_1, ..., s_n of equal size (the "splits"). The i-th pair of
training and test data is generated by using s_i as test data and the
union of the remaining datasets as training data.
The partitioning is stratified per default, i.e. the splits have the same
class ratio as the overall dataset. Per default, the partitioning is based
on shuffling the data randomly. In this case, the partitioning of the data
into s_1, ..., s_n is determined solely based on the run number (used as
random seed), yielding the same split for the same run_number and different
ones for two different run_numbers.
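    For example, with 50 data points and ``splits`` set to 5, each s_i
    contains 10 data points; in the i-th iteration these 10 points form the
    test data and the remaining 40 points the training data.
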
    **Parameters**

        :splits:
            The number of splits created internally. If n data points exist
            and m splits are created, each of these splits consists of
            approx. n/m data points.

            (*optional, default: 10*)

        :stratified:
            If True, the cross-validation is stratified, i.e. the overall
            class ratio is retained in each split (as far as possible).

            (*optional, default: True*)

        :random:
            If True, the order of the data is randomly shuffled.

            (*optional, default: True*)

        :time_dependent:
            If True, splitting is done separately for different (i.e. not
            overlapping) time windows to ensure that instances corresponding
            to the same marker end up in the same split.

            .. note:: Stratification is only allowed here if there is only
                      one class label per marker.

            (*optional, default: False*)

        :stratified_class:
            If *time_dependent* is True and *stratified_class* is specified,
            stratification is only done for the specified class label
            (string). The other class fills up the splits while preserving
            the time order of the data. This also means that *random* has
            no effect here.

            (*optional, default: None*)

    **Exemplary Call**

    .. code-block:: yaml

        -
            node : CV_Splitter
            parameters :
                splits : 10
                stratified : True

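    Internally, the boundaries of the i-th test set are computed as a
    half-open index interval over the (possibly shuffled) data. A rough
    sketch of this computation, where ``data_size`` denotes the number of
    data points and ``indices`` their (possibly shuffled) order:

    .. code-block:: python

        split_start = int(round(float(i) * data_size / splits))
        split_end = int(round(float(i + 1) * data_size / splits))
        test_indices = indices[split_start:split_end]
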
:Author: Jan Hendrik Metzen (jhm@informatik.uni-bremen.de)
:Created: 2008/12/16
"""

    def __init__(self, splits=10, stratified=True, random=True,
                 time_dependent=False, stratified_class=None, *args, **kwargs):
super(CrossValidationSplitterNode, self).__init__(*args, **kwargs)
        self.set_permanent_attributes(splits=int(splits),  # how many splits
                                      current_split=0,  # current split for testing
                                      split_indices=None,
                                      run_number=-1,
                                      random=random,
                                      stratified=stratified,
                                      stratified_class=stratified_class,
                                      time_dependent=time_dependent)

    def is_split_node(self):
""" Return whether this is a split node """
return True

    def use_next_split(self):
""" Use the next split of the data into training and test data.
Returns True if more splits are available, otherwise False.
This method is useful for benchmarking
"""
if self.current_split + 1 < self.splits:
self.current_split = self.current_split + 1
self._log("Benchmarking with split %s/%s" % (self.current_split + 1,
self.splits))
return True
else:
return False
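
    # A rough sketch of how a driver might use this splitter (hypothetical
    # usage; the actual benchmarking loop lives elsewhere in pySPACE):
    #
    #     while True:
    #         train = splitter.request_data_for_training(use_test_data=False)
    #         test = splitter.request_data_for_testing()
    #         ...  # process train/test with the subsequent nodes
    #         if not splitter.use_next_split():
    #             break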

    def train_sweep(self, use_test_data):
""" Performs the actual training of the node.
.. note:: Split nodes cannot be trained
"""
raise Exception("Split nodes cannot be trained")

    def request_data_for_training(self, use_test_data):
""" Returns the data for training of subsequent nodes
.. todo:: to document
"""
# Create cv-splits lazily when required
        if self.split_indices is None:
self._create_splits()
# All data can be used for training which is not explicitly
# specified for testing by the current cv-split
        self.data_for_training = MemoizeGenerator(
            self.data[i] for i in range(len(self.data))
            if i not in self.split_indices[self.current_split])
return self.data_for_training.fresh()

    def request_data_for_testing(self):
""" Returns the data for testing of subsequent nodes
.. todo:: to document
"""
# Create cv-splits lazily when required
        if self.split_indices is None:
self._create_splits()
# Only that data can be used for testing which is explicitly
# specified for this purpose by the current cv-split
self.data_for_testing = MemoizeGenerator(
self.data[i] for i in self.split_indices[self.current_split])
return self.data_for_testing.fresh()

    def _create_splits(self):
""" Create the split of the data for n-fold cross-validation """
self._log("Creating %s splits for cross validation" % self.splits)
# Get training and test data (with labels)
train_data = \
list(self.input_node.request_data_for_training(use_test_data=False))
test_data = list(self.input_node.request_data_for_testing())
# If there is already a non-empty training set,
# it means that we are not the first split node in the node chain
if len(train_data) > 0:
raise Exception("No iterated splitting of data sets allowed\n "
"(Calling a splitter on a data set that is "
"already split)")
# Remember all the data and store it in memory
        # TODO: This might cause problems for large datasets
self.data = train_data + test_data
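        # self.data is a list of (sample, label) tuples; the cv splits below
        # are stored as lists of indices into this list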
# initialize result structure: Determine which data points are
# reserved for testing in which cross validation run
split_indices = []
if self.time_dependent:
# sort the data according to start_time
self.data.sort(key=lambda swindow: swindow[0].start_time)
# divide the data with respect to the time_point
data_time = dict()
last_window_end_time = 0.0
marker = -1
label_marker = dict()
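            # data_time maps a marker counter to the indices of all windows
            # that overlap in time (i.e. belong to the same marker), while
            # label_marker maps each class label to the markers of that label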
for (index, (window, label)) in enumerate(self.data):
if window.start_time > last_window_end_time:
marker += 1
data_time[marker] = [index]
if self.stratified or self.stratified_class:
if label not in label_marker:
label_marker[label] = [marker]
else:
label_marker[label].append(marker)
else:
data_time[marker].append(index)
# check label consistency for later stratification
if (self.stratified or self.stratified_class) and \
self.data[data_time[marker][0]][1] != label:
import warnings
                        warnings.warn(
                            "Since there are several class labels for one "
                            "marker, stratification is disabled.",
                            UserWarning)
self.stratified = False
self.stratified_class = None
last_window_end_time = window.end_time
#print "data_time: \n", data_time
            if self.stratified:  # each marker has only one label
                # there must not be more splits than markers of each class
                assert min([len(markers) for markers in
                            label_marker.values()]) >= self.splits
# extend result structure since we need it in the next block
split_indices = [[] for i in range(self.splits)]
# determine the splits of the data
for label, markers in label_marker.iteritems():
data_size = len(markers)
# Set random seed and randomize the order of the data
if self.random:
r = random.Random(self.run_number)
r.shuffle(markers)
for j in range(self.splits):
split_start = int(round(float(j) * data_size/self.splits))
split_end = int(round(float(j+1) * data_size/self.splits))
# means half-open interval [split_start, split_end)
for i in range(split_start, split_end):
split_indices[j].extend(data_time[markers[i]])
                # sort the indices within each split to restore the time order
                # (otherwise instances would be grouped by label)
                split_indices = [sorted(split_list)
                                 for split_list in split_indices]
#print "run_number:", self.run_number
#print "time_dependent && stratified:\n", split_indices
elif self.stratified_class:
# extend result structure since we need it in the next block
split_indices = [[] for i in range(self.splits)]
# determine the splits of the data
data_size = len(label_marker[self.stratified_class])
for j in range(self.splits):
split_start = int(round(float(j) * data_size/self.splits))
split_end = int(round(float(j+1) * data_size/self.splits))
# means half-open interval [split_start, split_end)
for i in range(split_start, split_end):
split_indices[j].extend(data_time[label_marker[self.stratified_class][i]])
#print "time_dependent && stratified_class:\n before filling up\n", split_indices
# fill up with other classes
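                # instances of the other class(es) are appended to the first
                # split whose last stratified_class instance comes after them
                # in the time-sorted order; the remaining tail is added to
                # the last split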
last_max_index = 0
for split_list in split_indices:
max_index = max(split_list)
for i in range(last_max_index, max_index):
if self.data[i][1] != self.stratified_class:
split_list.append(i)
last_max_index = max_index+1
for i in range(last_max_index, len(self.data)):
if self.data[i][1] != self.stratified_class:
split_indices[-1].append(i)
                # sort the indices within each split to restore the time order
                # (otherwise instances would be grouped by label)
                split_indices = [sorted(split_list)
                                 for split_list in split_indices]
                # print "time_dependent && stratified_class:\n", split_indices
            else:
                # there must not be more splits than marker time points
                data_size = len(data_time)
                assert data_size >= self.splits
# Set random seed and randomize the order of the data
indices = data_time.keys()
if self.random:
r = random.Random(self.run_number)
r.shuffle(indices)
# determine the splits of the data
for i in range(self.splits):
split_indices.append([])
split_start = int(round(float(i) * data_size / self.splits))
split_end = int(round(float(i + 1) * data_size / self.splits))
# means half-open interval [split_start, split_end)
for j in range(split_start,split_end):
split_indices[i].extend(data_time[indices[j]])
                # sort the indices within each split to restore the time order
                # (otherwise instances would be grouped by label)
                split_indices = [sorted(split_list)
                                 for split_list in split_indices]
#for index, splitlist in enumerate(split_indices):
# print index, "first: ", self.data[splitlist[0]][0].start_time, ", last: ", self.data[splitlist[-1]][0].start_time, ", Laenge: ", len(data_time.keys())
#print "time_dependent:\n", split_indices
elif self.stratified: # Stratified cross-validation
# divide the data with respect to the class_label
data_labeled = dict()
for (index, (window, label)) in enumerate(self.data):
                if label not in data_labeled:
                    data_labeled[label] = [index]
                else:
                    data_labeled[label].append(index)
            # there must not be more splits than instances of each class!
min_nr_per_class = min([len(data) for data in data_labeled.values()])
if self.splits > min_nr_per_class:
self.splits = min_nr_per_class
self._log("Reducing number of splits to %s since no more "
"instances of one of the classes are available."
% self.splits, level=logging.CRITICAL)
# extend result structure since we need it in the next block
split_indices = [[] for i in range(self.splits)]
# determine the splits of the data
for label, indices in data_labeled.iteritems():
data_size = len(indices)
# Set random seed and randomize the order of the data
if self.random:
r = random.Random(self.run_number)
r.shuffle(indices)
for j in range(self.splits):
split_start = int(round(float(j) * data_size/self.splits))
split_end = int(round(float(j+1) * data_size/self.splits))
# means half-open interval [split_start, split_end)
split_indices[j].extend(indices[split_start: split_end])
            # shuffle the indices within each split so that instances are not
            # ordered by label
            for j in range(self.splits):
                r = random.Random(self.run_number)
                r.shuffle(split_indices[j])
# print "stratified:\n", split_indices
# old trunk version
# =================
# data_size = len(self.data)
# # Determine ratio of class1
# instance_labels = map(lambda x: x[1], self.data)
# classes = list(set(instance_labels))
# assert (len(classes) == 2),\
# "Stratified cross-validation works currently only for "\
# "binary classification tasks."
# class1_instances = instance_labels.count(classes[0])
# class2_instances = instance_labels.count(classes[1])
# if self.splits > min(class1_instances, class2_instances):
# self.set_permanent_attributes(splits = min(class1_instances,
# class2_instances))
# self._log("Reducing number of splits to %s since no more " \
# "instances of one of the classes are available."
# % self.splits)
# class1_ratio = float(class1_instances) / data_size
# # Determine which instances belong to which class
# class1_indices = []
# class2_indices = []
# for index, instance_label in enumerate(instance_labels):
# if instance_label == classes[0]:
# class1_indices.append(index)
# else:
# class2_indices.append(index)
#
# # Randomize order
# if self.random:
# r = random.Random(self.run_number)
# r.shuffle(class1_indices)
# r.shuffle(class2_indices)
#
# # Merge the two classes (such that they alternate in the appropriate
# # frequency)
# indices = []
# n = 0 # class1 counter
# for i in range(data_size):
# if i == round((n + 0.5) / class1_ratio):
# indices.append(class1_indices.pop())
# n += 1
# else:
# indices.append(class2_indices.pop())
else: # Non-stratified cross-validation
data_size = len(self.data)
# We cannot have more splits than data points
            assert data_size >= self.splits
# Set random seed and randomize the order of the data
indices = range(data_size)
if self.random:
r = random.Random(self.run_number)
r.shuffle(indices)
# Determine the splits of the data
for i in range(self.splits):
split_start = int(round(float(i) * data_size / self.splits))
split_end = int(round(float(i + 1) * data_size / self.splits))
# means half-open interval [split_start, split_end)
split_indices.append(indices[split_start: split_end])
self.split_indices = split_indices
self._log("Benchmarking with split %s/%s" % (self.current_split + 1,
self.splits))
_NODE_MAPPING = {"CV_Splitter": CrossValidationSplitterNode}