Source code for pySPACE.missions.nodes.debug.exchange_data
"""Exchange the data against some self created data
"""
from pySPACE.missions.nodes.base_node import BaseNode
from pySPACE.resources.data_types.time_series import TimeSeries
from pySPACE.tests.utils.data.test_data_generation import *
[docs]class ExchangeDataNode(BaseNode):
"""Exchange the data against some self created data
This can be used for testing/debugging purposes,
if the markers etc should be retained,
but the data should be replaced by data with known properties.
**Parameters**
:generator_expression:
Specify generator expression. Uses the data generators in :mod:`~pySPACE.tests.utils.data.test_data_generation`.
(*optional, default: "Adder([One(),Multiplier([Constant(200),Channel(data.shape[1],data.shape[0])]),TimePoint(data.shape[1],data.shape[0])])"*)
**Exemplary Call**
.. code-block:: yaml
-
node : Exchange_Data
parameters :
generator_expression : "One()"
:Authors: Hendrik Woehrle (hendrik.woehrle@dfki.de)
:Created: 2012/04/20
"""
input_types=["TimeSeries"]
[docs] def __init__(self,
generator_expression = "Adder([One(),Multiplier([Constant(200),Channel(data.shape[1],data.shape[0])]),TimePoint(data.shape[1],data.shape[0])])",
**kwargs):
super(ExchangeDataNode, self).__init__(*kwargs)
self.set_permanent_attributes(ts_generator = TestTimeSeriesGenerator(),
generator = None,
generator_expression = generator_expression)
[docs] def _execute(self, data):
"""
Exchanges the data with some manually generated data.
"""
if self.generator is None:
self.generator = eval(self.generator_expression)
self.data_item = \
self.ts_generator.generate_test_data(
channels=data.shape[1],
time_points=data.shape[0],
function=self.generator,
sampling_frequency=data.sampling_frequency,
channel_order=True,
channel_names=data.channel_names,
dtype=numpy.float)
result_time_series = TimeSeries.replace_data(data, self.data_item)
return result_time_series
_NODE_MAPPING = {"Exchange_Data": ExchangeDataNode}