"""Print out formatted data.
"""
import numpy
import time
import warnings
import logging
import itertools
from pySPACE.missions.nodes.base_node import BaseNode
from pySPACE.resources.data_types.feature_vector import FeatureVector
from pySPACE.resources.data_types.time_series import TimeSeries
from pySPACE.tools.memoize_generator import MemoizeGenerator
[docs]class PrintDataNode(BaseNode):
"""Print out formatted data.
This prints out the data to support debugging.
**Parameters**
:print_delimiters:
Separate prints with delimiters for readibility
(*optional, default: True*)
:print_markers:
Print the markers.
(*optional, default: True*)
:print_shape:
Print the the datas shape.
(*optional, default: False*)
:print_samples:
Print the data.
(*optional, default: True*)
:print_hex:
Print the data in flattened hex format.
(*optional, default: False*)
:print_normal:
Print the data "normally".
(*optional, default: True*)
:numpy_printoptions:
Specify numpy printoptions. Use none, if it does not apply.
(*optional, default: None*)
**Exemplary Call**
.. code-block:: yaml
-
node : PrintData
parameters :
numpy_printoptions :
precision : 12
threshold : 100
:Authors: Hendrik Woehrle (hendrik.woehrle@dfki.de)
:Created: 2012/04/20
"""
[docs] def __init__(self,
print_delimiters = True,
print_markers = True,
print_hex = False,
print_normal = True,
numpy_printoptions = None,
print_samples = True,
print_shape = False,
**kwargs):
super(PrintDataNode, self).__init__(*kwargs)
self.set_permanent_attributes(item = 0,
print_delimiters = print_delimiters,
print_markers = print_markers,
print_hex = print_hex,
print_normal = print_normal,
numpy_printoptions = numpy_printoptions,
print_samples = print_samples,
print_shape = print_shape
)
[docs] def process(self):
""" Processes all data that is provided by the input node
Returns a generator that yields the data after being processed by this
node.
"""
assert(self.input_node != None), "No input node specified!"
# Assert that this node has already been trained
assert(not self.is_trainable() or
self.get_remaining_train_phase() == 0), "Node not trained!"
self._log("Processing data.", level=logging.DEBUG)
data_generator = \
itertools.imap(lambda (data, label):
self.print_data(data, label),
self.input_node.process())
return data_generator
[docs] def request_data_for_training(self, use_test_data):
""" Returns data for training of subsequent nodes of the node chain
A call to this method might involve training of the node chain up this
node. If use_test_data is true, all available data is used for
training, otherwise only the data that is explicitly for training.
"""
assert(self.input_node != None)
self._log("Data for training is requested.", level = logging.DEBUG)
# If we haven't computed the data for training yet
if self.data_for_training == None:
self._log("Producing data for training.", level = logging.DEBUG)
# Train this node
self.train_sweep(use_test_data)
# Compute a generator the yields the train data and
# encapsulate it in an object that memoizes its outputs and
# provides a "fresh" method that returns a new generator that'll
# yield the same sequence
# This line crashes without the NodeMetaclass bug fix
train_data_generator = \
itertools.imap(lambda (data, label) :
self.print_data(data, label),
self.input_node.request_data_for_training(
use_test_data))
self.data_for_training = MemoizeGenerator(train_data_generator,
caching=self.caching)
self._log("Data for training finished", level = logging.DEBUG)
# Return a fresh copy of the generator
return self.data_for_training.fresh()
[docs] def request_data_for_testing(self):
""" Returns data for testing of subsequent nodes of the node chain
A call to this node might involve evaluating the whole node chain
up to this node.
"""
assert(self.input_node != None)
self._log("Data for testing is requested.", level = logging.DEBUG)
# If we haven't computed the data for testing yet
if self.data_for_testing == None:
# Assert that this node has already been trained
assert(not self.is_trainable() or
self.get_remaining_train_phase() == 0)
# Compute a generator the yields the test data and
# encapsulate it in an object that memoizes its outputs and
# provides a "fresh" method that returns a new generator that'll
# yield the same sequence
self._log("Producing data for testing.", level = logging.DEBUG)
test_data_generator = \
itertools.imap(lambda (data, label):
self.print_data(data, label),
self.input_node.request_data_for_testing())
self.data_for_testing = MemoizeGenerator(test_data_generator,
caching=self.caching)
self._log("Data for testing finished", level = logging.DEBUG)
# Return a fresh copy of the generator
return self.data_for_testing.fresh()
[docs] def print_data(self, data, label):
"""
Print the data according to the specified constraints.
"""
if self.print_delimiters == True:
print 50 *"*"
if hasattr(data,"marker_name") and data.marker_name != None and self.print_markers:
print "%s: markers: %s" % (str(type(data)), str(data.marker_name))
else :
print "%s" % (str(type(data)))
if issubclass(FeatureVector, type(data)):
print "%04d: %s %s" % (self.item, data.tag, label)
elif issubclass(TimeSeries, type(data)):
print "%04d: %s %s %s" % (self.item, data.name, data.marker_name, label)
# backup printoptions
if self.numpy_printoptions:
default_printoptions = numpy.get_printoptions()
numpy.set_printoptions(**self.numpy_printoptions)
if self.print_shape:
print "shape:", data.shape
if self.print_normal:
if self.print_delimiters == True:
print 25 *"-"
print data
if self.print_hex:
if self.print_delimiters == True:
print 25 *"-"
print map(hex,data.flatten())
if self.print_delimiters == True:
print 50 *"*"
#set back default printoptions
if self.numpy_printoptions:
numpy.set_printoptions(default_printoptions)
self.item += 1
return (data, label)
[docs]class EstimateBandwidthNode(BaseNode):
"""Estimates the Bandwidth of the data which is forwarded through this node
**Parameters**
:print_bw:
print the results for every data blob
(*optional, default: True*)
**Exemplary Call**
.. code-block:: yaml
-
node : EstimateBandwidth
parameters :
print_bw : False
:Authors: Johannes Teiwes (johannes.teiwes@dfki.de)
:Created: 2013/06/18
"""
[docs] def __init__(self,
print_bw = True,
**kwargs):
super(EstimateBandwidthNode, self).__init__(*kwargs)
self.set_permanent_attributes(item = 0,
print_bw = print_bw,
starttime = time.time(),
num_channels = None,
num_samples = None,
frequency = None,
data_rate = None
)
[docs] def _execute(self, data):
"""
forward data and just take the current time
"""
# ignore all non-timeseries data
if not isinstance(data, TimeSeries):
return data
# gather some relevant parameters once
if self.num_channels is None or \
self.num_samples is None or \
self.data_rate is None:
(self.num_channels, self.num_samples) = data.shape
self.data_rate = self.num_channels*self.num_samples
if self.frequency is None:
self.frequency = data.sampling_frequency
# calculate duration
rate = self.data_rate / (time.time() - self.starttime)
if rate < self.data_rate:
print("%f Samples/s are too slow for online!" % rate)
elif self.print_bw:
print "Current Bandwidth: %f Samples/second" % rate
self.starttime = time.time()
self.item += 1
# if self.item > 100:
# raise Exception
return data
# Maps the node names usable in YAML node-chain specifications to the
# node classes defined in this module.
_NODE_MAPPING = {"Print_Data": PrintDataNode, "EstimateBandwidth": EstimateBandwidthNode}