# Source code for pySPACE.missions.nodes.sink.time_series_sink
""" Gather all time series objects that are passed through
:Author: Jan Hendrik Metzen (jhm@informatik.uni-bremen.de)
:Created: 2008/11/28
"""
import itertools
import copy
import numpy
from pySPACE.missions.nodes.base_node import BaseNode
from pySPACE.resources.dataset_defs.time_series import TimeSeriesDataset
from pySPACE.resources.data_types.time_series import TimeSeries
class TimeSeriesSinkNode(BaseNode):
    """ Collect all :mod:`time series objects <pySPACE.resources.data_types.time_series>` in a :mod:`collection <pySPACE.resources.dataset_defs.time_series>`

    **Parameters**

        :sort_string:
            A lambda function string that is passed to the TimeSeriesDataset and
            evaluated before the data is stored.

            (*optional, default: None*)

        :max_num_stored_objects:
            Number of maximal stored time series objects. Can be used if only a part
            of a dataset should be exported, e.g. for size purposes in debugging.
            Applies to train and test set separately.

            (*optional, default: numpy.inf*)

        :merge:
            Can be set to True if the user wants to get one time series containing
            the entire input data.

            (*optional, default: False*)

    **Exemplary Call**

    .. code-block:: yaml

        -
            node: Time_Series_Sink

    :Author: Jan Hendrik Metzen (jhm@informatik.uni-bremen.de)
    :Created: 2008/11/28
    :LastChange: 2011/04/13 Anett Seeland (anett.seeland@dfki.de)
    """
    input_types = ["TimeSeries"]

    def __init__(self, sort_string=None, merge=False,
                 max_num_stored_objects=numpy.inf, **kwargs):
        super(TimeSeriesSinkNode, self).__init__(**kwargs)
        # max_num_stored_objects is documented as a parameter of this node;
        # expose it instead of hard-coding numpy.inf (default unchanged).
        self.set_permanent_attributes(
            sort_string=sort_string,
            merge=merge,
            # The result dataset is created lazily on first sample
            time_series_collection=None,
            max_num_stored_objects=max_num_stored_objects)

    def reset(self):
        """
        Reset the state of the object to the clean state it had after its
        initialization
        """
        # We have to create a temporary reference since we remove
        # the self.permanent_state reference in the next step by overwriting
        # self.__dict__
        tmp = self.permanent_state
        # TODO: just a hack to get it working quickly...
        # The collected dataset is deliberately carried over so samples of
        # several splits/runs accumulate in the same collection.
        tmp["time_series_collection"] = self.time_series_collection
        self.__dict__ = copy.copy(tmp)
        self.permanent_state = tmp

    def is_trainable(self):
        """ Returns whether this node is trainable. """
        # Though this node is not really trainable, it returns true in order
        # to get trained. The reason is that during this training phase,
        # it stores all time windows along with their class label
        return True

    def _get_train_set(self, use_test_data):
        """ Returns the data that can be used for training """
        # We take data that is provided by the input node for training
        # NOTE: This might involve training of the preceding nodes
        train_set = self.input_node.request_data_for_training(use_test_data)
        # Add the data provided by the input node for testing to the
        # training set
        # NOTE: This node is not really learning but creating a labeled set
        #       of time windows. Because of that it must take all
        #       data for training (even when use_test_data is False)
        train_set = itertools.chain(train_set,
                                    self.input_node.request_data_for_testing())
        return train_set

    def is_supervised(self):
        """ Returns whether this node requires supervised training """
        return True

    def _train(self, data, label):
        """ Collecting happens in :func:`process_current_split`; nothing to do. """
        pass

    def process_current_split(self):
        """
        Compute the results of this sink node for the current split of the data
        into train and test data
        """
        # Store the time series provided for training ...
        self._store_data(self.input_node.request_data_for_training(False),
                         train=True)
        # ... and the time series provided for testing
        self._store_data(self.input_node.request_data_for_testing(),
                         train=False)

    def _store_data(self, data_generator, train):
        """ Add up to max_num_stored_objects samples to the result collection.

        *data_generator* yields (time_series, label) pairs; *train* marks
        whether they belong to the train or the test part of the dataset.
        """
        index = 0
        for time_series, label in data_generator:
            # Do lazy initialization of the collection
            # (maybe there were no samples before this call)
            if self.time_series_collection is None:
                self.time_series_collection = \
                    TimeSeriesDataset(sort_string=self.sort_string)
            if index < self.max_num_stored_objects:
                # Add sample
                self.time_series_collection.add_sample(
                    time_series,
                    label=label,
                    train=train,
                    split=self.current_split,
                    run=self.run_number)
            index += 1

    def merge_time_series(self, input_collection):
        """ Merges all time series of the input_collection to one big time series

        The data of all windows (run 0, split 0, test part) is concatenated
        along the time axis; markers of later windows are shifted by each
        window's start time so they stay aligned with the merged data. Meta
        information is taken from the first window.
        """
        # Retrieve the time series from the input_collection
        input_timeseries = input_collection.get_data(0, 0, 'test')
        # Get the data from the first time series
        output_data = input_timeseries[0][0]
        # Time that passed before the first window started; marker times of
        # later windows are corrected by this offset
        skipped_range = output_data.start_time
        # Change the end time of the first time series to the one of the last
        # time series inside the input_collection
        input_timeseries[0][0].end_time = input_timeseries[-1][0].end_time
        # For all the remaining time series
        for ts in input_timeseries[1:]:
            # Concatenate the data...
            output_data = numpy.vstack((output_data, ts[0]))
            # ... and add the markers to the first time series
            if len(ts[0].marker_name) > 0:
                for marker in ts[0].marker_name:
                    # dict.has_key is Python 2 only; use the `in` operator
                    if marker not in input_timeseries[0][0].marker_name:
                        input_timeseries[0][0].marker_name[marker] = []
                    for time in ts[0].marker_name[marker]:
                        input_timeseries[0][0].marker_name[marker].append(
                            time + ts[0].start_time - skipped_range)
        # Use the meta information from the first time series e.g. marker,
        # start/end_time and create a new time series with the concatenated data
        merged_time_series = TimeSeries.replace_data(input_timeseries[0][0],
                                                     output_data)
        # Change the name of the merged_time_series
        merged_time_series.name = "%s, length %d ms, %s" % (merged_time_series.name.split(',')[0], \
            (len(merged_time_series)*1000.0)/merged_time_series.sampling_frequency,\
            merged_time_series.name.split(',')[-1])
        return merged_time_series

    def get_result_dataset(self):
        """ Return the result """
        # Merge all time series inside the collection if merge flag is set
        if self.merge:
            merged_time_series = \
                self.merge_time_series(self.time_series_collection)
            # Replace the collection by a fresh one holding only the merge
            self.time_series_collection = \
                TimeSeriesDataset(sort_string=self.sort_string)
            self.time_series_collection.add_sample(merged_time_series,
                                                   label='Window',
                                                   train=False)
        return self.time_series_collection
# Maps the YAML node name used in operation specification files to the class.
_NODE_MAPPING = {"Time_Series_Sink": TimeSeriesSinkNode}