# Source code for pySPACE.missions.nodes.meta.stream_windowing
""" Perform windowing on a stream of windows

.. todo:: documentation: Delete upper summaries!
    Check if *meta* is the correct module.
    If yes, the documentation should make clear why.
    The documentation should show in detail the use case of this node.

.. todo:: this module should work without EEG acquisition
"""
import itertools
from pySPACE.missions.nodes.base_node import BaseNode
from pySPACE.resources.dataset_defs.time_series import TimeSeriesClient
from pySPACE.tools.memoize_generator import MemoizeGenerator
from pySPACE.missions.support.windower import Windower, MarkerWindower
class StreamWindowingNode(BaseNode):
    """Get a stream of time series objects and window them inside a flow.

    Node that interprets a stream of incoming time series objects as
    a raw data stream.
    The markers stored in marker_name attribute are used as the markers
    for a :class:`~pySPACE.missions.support.windower.MarkerWindower`.
    This should be done *before* any splitter, since all incoming windows
    are regarded as parts of a consecutive data stream.

    **Parameters**

        :windower_spec_file:
            The window specification file for the
            :class:`~pySPACE.missions.support.windower.MarkerWindower`.
            Used for testing and training, if windower_spec_file_train
            is not specified.

        :windower_spec_file_train:
            A separate window file for training only.
            If not specified, windower_spec_file is used for training
            and testing.

            (*optional, default: None*)

        :local_window_conf:
            Forwarded unchanged to ``Windower._load_window_spec`` together
            with the selected window specification file.
            NOTE(review): presumably controls where the spec file is looked
            up -- confirm against ``Windower._load_window_spec``.

            (*optional, default: False*)

        :nullmarker_stride_ms:
            Forwarded unchanged to the
            :class:`~pySPACE.missions.support.windower.MarkerWindower`.
            NOTE(review): presumably the stride (in ms) used for windows
            cut without a marker -- confirm against ``MarkerWindower``.

            (*optional, default: 1000*)

    **Exemplary Call**

    .. code-block:: yaml

        -
            node : Stream_Windowing
            parameters :
                windower_spec_file : "example_lrp_window_spec.yaml"

    :Authors: Hendrik Woehrle (hendrik.woehrle@dfki.de)
    :Created: 2012/07/09
    """

    def __init__(self,
                 windower_spec_file,
                 windower_spec_file_train=None,
                 local_window_conf=False,
                 nullmarker_stride_ms=1000,
                 *args,
                 **kwargs):
        super(StreamWindowingNode, self).__init__(*args, **kwargs)
        # Without a dedicated training spec, training uses the common one.
        if windower_spec_file_train is None:
            windower_spec_file_train = windower_spec_file
        # client, marker_windower and window_definition are created lazily
        # by the request_data_* methods and by process().
        self.set_permanent_attributes(
            client=None,
            marker_windower=None,
            window_definition=None,
            local_window_conf=local_window_conf,
            windower_spec_file=windower_spec_file,
            windower_spec_file_train=windower_spec_file_train,
            nullmarker_stride_ms=nullmarker_stride_ms)

    def _load_window_definition(self, spec_file):
        """ Load and return the window definition stored in *spec_file*

        Thin wrapper around ``Windower._load_window_spec`` that always passes
        this node's ``local_window_conf`` setting.
        """
        return Windower._load_window_spec(spec_file, self.local_window_conf)

    def _window_generator(self):
        """ Return a generator over the (sample, label) pairs of the current
        ``marker_windower`` (set up beforehand by :meth:`window_stream` or
        :meth:`process`).
        """
        return ((sample, label) for (sample, label) in self.marker_windower)

    def request_data_for_training(self, use_test_data):
        """ Returns the data that can be used for training of subsequent nodes

        On the first call, the complete training data of the input node is
        collected, treated as one consecutive stream and cut into windows
        according to *windower_spec_file_train*. The windows are wrapped in a
        caching MemoizeGenerator that is reused on later calls.

        :param use_test_data: If True, there is no data dedicated to training
            and the windowed test data (see :meth:`request_data_for_testing`)
            is returned instead.
        :returns: generator of (sample, label) pairs
        """
        # set window definition for train phase windower file
        self.window_definition = \
            self._load_window_definition(self.windower_spec_file_train)
        self._log("Requesting train data...")
        if self.data_for_training is None:
            if use_test_data:
                # Return the test data as there is no additional data that
                # was dedicated for training. The plain generator returned by
                # request_data_for_testing() must NOT be stored in
                # data_for_training: later calls expect a MemoizeGenerator and
                # call .fresh() on it.
                return self.request_data_for_testing()
            # Get training data (with labels) as one complete stream
            train_data = list(self.input_node.request_data_for_training(
                use_test_data=use_test_data))
            if not train_data:
                # No training data available: cache an empty generator
                self.data_for_training = MemoizeGenerator(iter([]),
                                                          caching=True)
            else:
                # create stream of windows from the collected samples
                self.window_stream(train_data)
                self.data_for_training = MemoizeGenerator(
                    self._window_generator(), caching=True)
        # Return a fresh copy of the generator
        return self.data_for_training.fresh()

    def request_data_for_testing(self):
        """ Returns the data for testing of subsequent nodes

        On the first call, the complete test data of the input node is
        collected, treated as one consecutive stream and cut into windows
        according to *windower_spec_file*. The windows are wrapped in a
        MemoizeGenerator that is reused on later calls.

        :returns: generator of (sample, label) pairs
        """
        if self.data_for_testing is None:
            # set window definition for test phase windower file
            self.window_definition = \
                self._load_window_definition(self.windower_spec_file)
            test_data = list(self.input_node.request_data_for_testing())
            # create stream of windows
            self.window_stream(test_data)
            self.data_for_testing = MemoizeGenerator(self._window_generator())
        # Return a fresh copy of the generator
        return self.data_for_testing.fresh()

    def process(self):
        """ Processes all data that is provided by the input node

        Returns a generator that yields the data after being processed by this
        node: every sample of the input node is passed through
        :meth:`execute`, and the resulting stream is cut into windows by a
        MarkerWindower.

        If no window definition is loaded yet, *windower_spec_file_train* is
        used while the node is in training mode, otherwise
        *windower_spec_file*.
        """
        assert self.input_node is not None, "No input node specified!"
        # Assert that this node has already been trained
        assert (not self.is_trainable() or
                self.get_remaining_train_phase() == 0), "Node not trained!"
        if self.window_definition is None:
            if (self.is_training()
                    and self.windower_spec_file_train is not None):
                spec_file = self.windower_spec_file_train
            else:
                spec_file = self.windower_spec_file
            self.window_definition = self._load_window_definition(spec_file)
        # Lazily apply this node's execute() to every incoming sample
        # (generator expression: valid in both Python 2 and 3, unlike a
        # lambda with tuple-parameter unpacking).
        data_generator = ((self.execute(data), label)
                          for (data, label) in self.input_node.process())
        self.client = TimeSeriesClient(ts_stream=data_generator)
        # NOTE: here window definitions are set *before* connecting, whereas
        # window_stream() connects first; the original order is preserved.
        self.client.set_window_defs(self.window_definition)
        self.client.connect()
        self.marker_windower = MarkerWindower(
            data_client=self.client,
            windowdefs=self.window_definition,
            nullmarker_stride_ms=self.nullmarker_stride_ms)
        # Return a fresh generator over the windows. It is deliberately kept
        # local: storing it in data_for_testing would clobber the cached test
        # data served by request_data_for_testing().
        return MemoizeGenerator(self._window_generator()).fresh()

    def window_stream(self, data):
        """ Create a marker windower for the given data

        Wraps *data* in a :class:`TimeSeriesClient` and creates a
        :class:`MarkerWindower`. The windower splits the data into windows
        based on the current ``window_definition`` and assigns labels to
        these windows.

        The client is stored in ``self.client`` and the windower in
        ``self.marker_windower``.

        :param data: iterable of samples as delivered by the input node
            (presumably (time series, label) pairs -- confirm against
            ``TimeSeriesClient``)
        """
        self.client = TimeSeriesClient(ts_stream=iter(data))
        self.client.connect()
        self.client.set_window_defs(self.window_definition)
        self.marker_windower = MarkerWindower(
            data_client=self.client,
            windowdefs=self.window_definition,
            nullmarker_stride_ms=self.nullmarker_stride_ms)

    def __getstate__(self):
        """ Return a pickable state for this object

        The window definition is dropped before delegating to the base class.
        This clears the attribute on the live object too; every method that
        builds a new windower reloads it first when needed.
        """
        self.window_definition = None
        return super(StreamWindowingNode, self).__getstate__()

    def get_output_type(self, input_type, as_string=True):
        """ Return the output type of this node, independent of *input_type*

        :returns: the string ``"TimeSeries"`` if *as_string* is True,
            otherwise the :class:`TimeSeries` class itself
        """
        from pySPACE.resources.data_types.time_series import TimeSeries
        if as_string:
            return "TimeSeries"
        else:
            return TimeSeries
# Maps the node name used in YAML flow specifications
# (e.g. ``node : Stream_Windowing``) to the implementing class.
_NODE_MAPPING = {
    "Stream_Windowing": StreamWindowingNode,
}