Source code for pySPACE.missions.nodes.source.external_generator_source
""" Use external data as input """
from pySPACE.missions.nodes.base_node import BaseNode
class ExternalGeneratorSourceNode(BaseNode):
    """ Yield the data provided by an external generator

    This node enables to easily input data from an external
    data generator within an internal data node chain.
    The easiest usage is together with meta nodes, where
    the data is given explicitly to the node with the `set_generator` function.

    This node only uses one data generator which is taken as testing data.
    For training usage an additional splitter is needed as usual.

    .. note:: Below you can see a more complex example on how to make use of this node.

    .. code-block:: python

        # Training is done in separate threads, we send the time series
        # windows to these threads via two queues
        self.queueS2 = Queue.Queue()
        self.queueS3 = Queue.Queue()

        # The two classification threads access the two queues via two
        # generators
        def s2_generator():
            # Yield all windows until a None item is found in the queue
            while True:
                window = self.queueS2.get(block=True, timeout=None)
                if window is None:
                    break
                yield window

        def s3_generator():
            # Yield all windows until a None item is found in the queue
            while True:
                window = self.queueS3.get(block=True, timeout=None)
                if window is None:
                    break
                yield window

        # Create the actual data chains for S1 vs S2 discrimination
        # and S1 vs S3 discrimination
        self.S1S2 = NodeChainFactory.flow_from_yaml(Flow_Class=NodeChain,
                                                    flow_spec_file=dataflow_spec_S2)
        self.S1S2[0].set_generator(s2_generator())
        self.S1S3 = NodeChainFactory.flow_from_yaml(Flow_Class=NodeChain,
                                                    flow_spec_file=dataflow_spec_S3)
        self.S1S3[0].set_generator(s3_generator())

    **Parameters**

    **Exemplary Call**

    .. code-block:: yaml

        -
            node: External_Generator_Source_Node

    :Author: Jan Hendrik Metzen (jhm@informatik.uni-bremen.de)
    :Created: 2009/09/17
    """

    input_types = ["TimeSeries", "FeatureVector", "PredictionVector"]

    def __init__(self, **kwargs):
        super(ExternalGeneratorSourceNode, self).__init__(**kwargs)
        # current_split is kept permanent so it survives pickling/reloading;
        # this node only ever provides a single split.
        self.set_permanent_attributes(current_split=0)

    def set_generator(self, generator):
        """ Set the generator from which this node reads the data

        The generator must yield ``(data, label)`` pairs. It is stored as a
        permanent attribute, but note that it is dropped on pickling
        (see :meth:`__getstate__`), since generators cannot be pickled.
        """
        self.set_permanent_attributes(generator=generator)

    def use_next_split(self):
        """ Use the next split (if any)

        This source node provides only a single data generator, so there is
        never a further split to switch to; reaching this call is a usage
        error in the surrounding node chain.
        """
        # Keep AssertionError (as in the original) so existing callers that
        # expect it still work, but give a diagnostic message.
        assert False, "ExternalGeneratorSourceNode provides no further splits"

    def train_sweep(self, use_test_data):
        """ Performs the actual training of the node.

        .. note:: Source nodes cannot be trained
        """
        raise Exception("Source nodes cannot be trained")

    def request_data_for_training(self, use_test_data):
        """ Return data usable for training of subsequent nodes

        :param use_test_data: If True, the test data is returned for
            training as well, since this node has no dedicated training data.
        :returns: An iterator over ``(data, label)`` pairs (empty when
            ``use_test_data`` is False).
        """
        if not use_test_data:
            # This node does not provide any data that is explicitly
            # dedicated for training, so return an empty iterator.
            return iter([])
        else:
            # Return the test data as there is no additional data that
            # was dedicated for training
            return self.request_data_for_testing()

    def request_data_for_testing(self):
        """ Return data usable for testing of subsequent nodes

        :returns: A generator yielding ``(data, label)`` pairs taken from
            the external generator set via :meth:`set_generator`.
        """
        def logging_generator():
            for data, label in self.generator:
                yield (data, label)
        return logging_generator()

    def process(self):
        """ Return a generator that yields all data contained in the generator

        Delegates to :meth:`request_data_for_testing`, which had an
        identical implementation.
        """
        return self.request_data_for_testing()

    def __getstate__(self):
        """ Return a pickable state for this object

        The external generator cannot be pickled, so it is replaced by
        ``None``; it must be set again via :meth:`set_generator` after
        unpickling.
        """
        odict = super(ExternalGeneratorSourceNode, self).__getstate__()
        odict['generator'] = None
        return odict
_NODE_MAPPING = {"External_Generator_Source_Node": ExternalGeneratorSourceNode}