Source code for pySPACE.missions.nodes.source.time_series_source

""" Sources for windowed time series, e.g. from streaming data

.. seealso::

    - :class:`~pySPACE.resources.data_types.time_series.TimeSeries`
    - :class:`~pySPACE.resources.dataset_defs.time_series.TimeSeriesDataset`
    - :class:`~pySPACE.resources.dataset_defs.stream.StreamDataset`

"""
import logging
import os

from pySPACE.missions.nodes.base_node import BaseNode
from pySPACE.missions.nodes.decorators import NoOptimizationParameter, NormalParameter
from pySPACE.missions.support.windower import Windower, WindowFactory
from pySPACE.tools.memoize_generator import MemoizeGenerator


class TimeSeriesSourceNode(BaseNode):
    """ Source for windowed :class:`~pySPACE.resources.data_types.time_series.TimeSeries`

    Reads time series windows that were saved in pickle format via
    :class:`~pySPACE.missions.nodes.sink.time_series_sink.TimeSeriesSinkNode`.

    **Parameters**

        This node has no parameters of its own.

    **Exemplary Call**

    .. code-block:: yaml

        -
            node : TimeSeriesSource

    :Author: Jan Hendrik Metzen (jhm@informatik.uni-bremen.de)
    :Created: 2008/11/25
    """
    input_types = ["TimeSeries"]

    def __init__(self, **kwargs):
        super(TimeSeriesSourceNode, self).__init__(**kwargs)
        self.set_permanent_attributes(dataset=None)

    def set_input_dataset(self, dataset):
        """ Sets the dataset from which this node reads the data """
        self.set_permanent_attributes(dataset=dataset)

    def register_input_node(self, node):
        """ Register the given node as input

        Source nodes cannot have input nodes, so this always raises.
        """
        raise Exception("No nodes can be registered as inputs for source nodes")

    def use_next_split(self):
        """ Use the next split of the data into training and test data.

        Returns True if more splits are available, otherwise False.

        This method is useful for benchmarking
        """
        # if the input dataset has more than one split/run we will compute
        # the splits in parallel, i.e. we don't return any further splits
        return False

    def train_sweep(self, use_test_data):
        """ Performs the actual training of the node.

        .. note:: Source nodes cannot be trained
        """
        raise Exception("Source nodes cannot be trained")

    def _get_data_key(self, phase):
        """ Return the ``(run, split, phase)`` key for ``dataset.get_data``

        If the input dataset consists only of one single run, run 0 is used
        as input for all runs to be conducted (i.e. we rely on later
        randomization of the order). Otherwise the data of the current run
        number is used.

        :param phase: either ``"train"`` or ``"test"``
        """
        if self.dataset.meta_data["runs"] > 1:
            return (self.run_number, self.current_split, phase)
        return (0, self.current_split, phase)

    def request_data_for_training(self, use_test_data):
        """ Returns the time windows that can be used for training of subsequent nodes

        If *use_test_data* is True, the test data is returned (see
        :func:`request_data_for_testing`), since no additional data is
        dedicated to training. Otherwise a fresh copy of a generator over
        the dataset's training windows for the current run and split is
        returned; if the dataset has no training data for that key, the
        generator is empty.
        """
        if use_test_data:
            # Return the test data as there is no additional data that
            # was dedicated for training
            return self.request_data_for_testing()

        if self.dataset.meta_data["runs"] > 1:
            self._log("Run %s." % self.run_number)
        else:
            self._log("Run %s. Using input data of run 0." % self.run_number)
        key = self._get_data_key("train")

        # Check if there is training data for the current split and run
        if key in self.dataset.data:
            self._log("Accessing input dataset's training time series windows.")
            source_iterator = iter(self.dataset.get_data(*key))
        else:
            # An immediately exhausted iterator, since this node does not
            # provide any data that is explicitly dedicated for training
            self._log("No training data available.")
            source_iterator = iter([])
        self.data_for_training = MemoizeGenerator(source_iterator,
                                                  caching=self.caching)

        # Return a fresh copy of the generator
        return self.data_for_training.fresh()

    def request_data_for_testing(self):
        """ Returns the data that can be used for testing of subsequent nodes

        The generator over the dataset's test windows for the current run and
        split is created on the first call only; every call returns a fresh
        copy of it.
        """
        # If we haven't read the data for testing yet
        if self.data_for_testing is None:
            self._log("Accessing input dataset's test time series windows.")
            key = self._get_data_key("test")
            test_data_generator = iter(self.dataset.get_data(*key))
            self.data_for_testing = MemoizeGenerator(test_data_generator,
                                                     caching=self.caching)

        # Return a fresh copy of the generator
        return self.data_for_testing.fresh()

    def get_metadata(self, key):
        """ Return the value corresponding to the given key from the dataset
        meta data of this source node (None if the key is missing). """
        return self.dataset.meta_data.get(key)

    def __del__(self):
        """ Drop the reference to the input dataset """
        # Rebinding instead of ``del`` cannot raise AttributeError when
        # ``dataset`` was never set (e.g. ``__init__`` failed early);
        # exceptions raised in ``__del__`` are only printed, not propagated.
        self.dataset = None
@NoOptimizationParameter("local_window_conf")
@NoOptimizationParameter("data_consistency_check")
class Stream2TimeSeriesSourceNode(TimeSeriesSourceNode):
    """ Transformation of streaming data to windowed time series

    This node interfaces the streaming dataset: it provides the dataset with
    a windowing specification and then gets a data generator from it. This
    is the main difference to other source nodes, which access the real
    data and create the generator object themselves.

    For the segmentation of the data, the
    :class:`~pySPACE.resources.dataset_defs.stream.StreamDataset` uses the
    :class:`~pySPACE.missions.support.windower.MarkerWindower`.

    **Parameters**

        :windower_spec_file:
            The specification file for the Windower containing information
            which data should be windowed and which data can be discarded.
            For a detailed description look at the module description.

            (*recommended, default: windower.WindowFactory.default_windower_spec*)

        :local_window_conf:
            Can be set to True if the user wants to specify the location of
            the windower spec file manually. In the default situation, the
            spec file is looked up according to the location of the spec
            files. When set to True, the windower spec file can be specified
            with path (e.g. '/home/myuser/myspecs/mywindow.yaml') or without
            path, which indicates that the window specs file is located in
            the current local folder or the specification file folder of the
            node chain. For the parameterization of the windower
            configuration file, have a look at the documentation of the
            :class:`~pySPACE.missions.support.windower.MarkerWindower`.

            (*optional, default: False*)

        :nullmarker_stride_ms:
            An integer to specify the interval of the null marker.
            The null marker is then inserted into the data stream every
            *nullmarker_stride_ms* ms. This marker can be used to cut out
            sliding windows at a constant rate.
            Either *nullmarker_stride_ms* or *windower_spec_file* should be
            specified!

            (*recommended, default: 1000*)

        :no_overlap:
            When having streamed windows, the last data point of the
            previous window might be the same as the one of the current
            window, since when using a fixed window size, first and last
            point of the window are normally used. This effect can be turned
            off with this parameter. When no window spec file is given, this
            parameter is ignored and always set to *True*.

            (*recommended, default: False*)

        :continuous:
            Stored as node attribute *continuous*. When no window spec file
            is given, this parameter is ignored and always set to *True*.

            (*optional, default: False*)

        :data_consistency_check:
            When True it will be checked if cut windows contain channels
            with zero standard deviation and the user will be informed.

            (*optional, default: False*)

    **Exemplary Call**

    .. code-block:: yaml

        -
            node : Stream2TimeSeriesSourceNode
            parameters :
                windower_spec_file : "example_lrp_window_spec.yaml"

    :Author: Johannes Teiwes (johannes.teiwes@dfki.de)
    :Created: 2010/10/12
    :LastChanges: Mario Michael Krell
    """
    input_types = ["TimeSeries"]

    def __init__(self, windower_spec_file=None, local_window_conf=False,
                 nullmarker_stride_ms=1000, no_overlap=False, continuous=False,
                 data_consistency_check=False, **kwargs):
        super(Stream2TimeSeriesSourceNode, self).__init__(**kwargs)
        assert not (nullmarker_stride_ms is None and windower_spec_file is None), \
            "No segmentation parameters specified!"
        if windower_spec_file is None:
            # Default windowing derived from the null marker stride; overlap
            # removal and continuous mode are forced in this case.
            no_overlap = True
            continuous = True
            wdefs = WindowFactory.default_windower_spec(
                endoffsetms=nullmarker_stride_ms)
        else:
            wdefs = Windower._load_window_spec(windower_spec_file,
                                               local_window_conf)
        self.set_permanent_attributes(
            window_definition=wdefs,
            nullmarker_stride_ms=nullmarker_stride_ms,
            no_overlap=no_overlap,
            data_consistency_check=data_consistency_check,
            dataset=None,
            continuous=continuous)

    def get_source_file_name(self):
        """ Returns the file name (without directory) of the source file """
        # os.path.basename equals split('/')[-1] on POSIX and additionally
        # handles the native separator on Windows.
        return os.path.basename(self.dataset.data_file)

    def process(self):
        """ Returns a generator that yields all data received by the client

        This is helpful when using this source node in an online application,
        since for most other source nodes :func:`request_data_for_testing` is
        used instead. Here it simply delegates to
        :func:`request_data_for_testing`.

        .. todo:: check code
        """
        return self.request_data_for_testing()

    def _create_windowed_generator(self, phase):
        """ Configure the dataset's windowing and wrap its data for *phase*

        Passes this node's window definitions and windowing options to the
        dataset, then returns a :class:`MemoizeGenerator` over the
        ``(sample, label)`` pairs of the current run and split.

        :param phase: either ``"train"`` or ``"test"``
        """
        self._log("Start streaming.")
        self.dataset.set_window_defs(
            window_definition=self.window_definition,
            nullmarker_stride_ms=self.nullmarker_stride_ms,
            no_overlap=self.no_overlap,
            data_consistency_check=self.data_consistency_check)
        # With a single run, run 0 serves as input for every run
        if self.dataset.meta_data["runs"] > 1:
            key = (self.run_number, self.current_split, phase)
        else:
            key = (0, self.current_split, phase)
        # Create a generator that emits the windows as (sample, label) pairs
        data_generator = ((sample, label)
                          for (sample, label) in self.dataset.get_data(*key))
        return MemoizeGenerator(data_generator, caching=self.caching)

    def request_data_for_training(self, use_test_data):
        """ Returns the data that can be used for training of subsequent nodes

        If *use_test_data* is True, the test data is returned, since no
        additional data is dedicated to training. Otherwise the streamed
        training windows are set up on the first call, and every call returns
        a fresh copy of the generator.
        """
        self._log("Requesting train data...")
        if use_test_data:
            # Return the test data as there is no additional data that
            # was dedicated for training
            return self.request_data_for_testing()
        # If we haven't read the data for training yet
        if self.data_for_training is None:
            self.data_for_training = self._create_windowed_generator("train")
        # Return a fresh copy of the generator
        return self.data_for_training.fresh()

    def request_data_for_testing(self):
        """ Returns the data that can be used for testing of subsequent nodes

        The streamed test windows are set up on the first call; every call
        returns a fresh copy of the generator.
        """
        self._log("Requesting test data...")
        # If we haven't read the data for testing yet
        if self.data_for_testing is None:
            self.data_for_testing = self._create_windowed_generator("test")
        # Return a fresh copy of the generator
        return self.data_for_testing.fresh()

    def store_state(self, result_dir, index=None):
        """ Stores the window definitions of this node in *result_dir*

        The string representations of all window definitions are written to
        ``<result_dir>/<class name>/window_definitions.txt``. *index* is not
        used by this implementation.
        """
        from pySPACE.tools.filesystem import create_directory
        node_dir = os.path.join(result_dir, self.__class__.__name__)
        create_directory(node_dir)
        # ``with`` guarantees the file is closed even if a write fails
        with open(os.path.join(node_dir, "window_definitions.txt"),
                  "w") as result_file:
            for window_def in self.window_definition:
                result_file.write(str(window_def))
class TimeSeries2TimeSeriesSourceNode(Stream2TimeSeriesSourceNode):
    """ Source for streamed time series data for later windowing

    Interprets a stream of time series windows as a raw data stream.
    The markers stored in the marker_name attribute are used as markers for
    a :class:`~pySPACE.missions.support.windower.MarkerWindower`.

    The node behaves like a :class:`Stream2TimeSeriesSourceNode`, but it
    takes real time series data and treats it as a stream. The main
    functionality is implemented in
    :class:`~pySPACE.resources.dataset_defs.time_series.TimeSeriesDataset`,
    which is modeled after
    :class:`~pySPACE.resources.dataset_defs.stream.StreamDataset`.

    **Parameters**

        :windower_spec_file:
            The specification file for the windower containing information
            which data should be windowed and which data can be discarded.
            For a detailed description look at the module description.

        :...:
            For further parameters see :class:`Stream2TimeSeriesSourceNode`

    **Exemplary Call**

    .. code-block:: yaml

        -
            node: Time_Series_Stream_Source
            parameters :
                windower_spec_file : "example_lrp_window_spec.yaml"

    :Author: Hendrik Woehrle (hendrik.woehrle@dfki.de)
    :Created: 2011/08/12
    """

    def __init__(self, **kwargs):
        # every parameter is handled by Stream2TimeSeriesSourceNode
        super(TimeSeries2TimeSeriesSourceNode, self).__init__(**kwargs)

    def get_source_file_name(self):
        """ Always ``None``: the source file name is unknown or
        preprocessing specific.

        .. todo:: check whether the source file name can be accessed,
                  e.g. by using metadata.yaml
        """
        return None
# Additional names under which the node classes of this module can be
# referenced (e.g. ``node: Time_Series_Stream_Source`` in a node chain)
_NODE_MAPPING = dict(
    Time_Series_Source=TimeSeriesSourceNode,
    BCI_Competition_Source=TimeSeriesSourceNode,
    EEG_Source=Stream2TimeSeriesSourceNode,
    Offline_EEG_Source=Stream2TimeSeriesSourceNode,
    Time_Series_Stream_Source=TimeSeries2TimeSeriesSourceNode,
    TimeSeriesStreamSource=TimeSeries2TimeSeriesSourceNode,
)