Source code for pySPACE.missions.nodes.preprocessing.window_func
""" Multiply a signal with a window """
import warnings
import numpy
# These imports are all for loading the functions.yaml file for the abbreviations of functions for benchmarking.
# Kept here local for debugging and because the file is just used in this node.
# Maybe this will change.
try:
import os
import yaml
import pySPACE
import math
except:
pass
from pySPACE.missions.nodes.base_node import BaseNode
from pySPACE.resources.data_types.time_series import TimeSeries
[docs]class InvalidWindowException(Exception):
pass
[docs]class WindowFuncNode(BaseNode):
""" Multiply the :class:`~pySPACE.resources.data_types.time_series.TimeSeries` with a window
If the window has trailing zeros, the time series is chopped.
**Parameters**
:window_function_str:
This string has to be either the name of a function specified in
functions.yaml or a lambda expression that evaluates to a valid
window function. Such a window function has to be of the form
lambda n: lambda x: something
where n is the number of samples (the length of the window
function) and x is the respective value.
:reduce_window:
If True, zeros at the beginning or ending are chopped.
(*optional, default: False*)
**Exemplary call**
.. code-block:: yaml
-
node : Windowing
parameters :
window_function_str : "hanning" # loaded from functions.yaml
:Author: Jan Hendrik Metzen (jhm@informatik.uni-bremen.de)
:Created: 2008/09/01
:Revised: 2009/09/15 (Mario Krell)
"""
[docs] def __init__(self, window_function_str, reduce_window = False, **kwargs):
super(WindowFuncNode, self).__init__(**kwargs)
# Do the import of the abbreviations of the functions.
if not window_function_str.startswith("lambda"):
try:
functions_file = open(os.path.join(pySPACE.configuration.spec_dir,
'functions.yaml'), 'r')
functions = yaml.load(functions_file)
functions_file.close()
except AttributeError:
# Running outside of pySPACE, not able to load functions from YAML file
# TODO: Fix that
functions = {}
warnings.warn("Function in spec folder could not be loaded. Please fix that!")
try:
window_function_str = functions[window_function_str]
except KeyError:
# window_function_str is not the key for a function in functions.yaml,
# we assume that it is the window function itself
pass
self.set_permanent_attributes(window_function_str = window_function_str,
reduce_window = reduce_window,
num_of_samples = None,
window_array = None)
[docs] def create_window_array(self):
""" Create a permanent array for the windowing of the data"""
# the window is given as a lambda expression where the first variable
# is the length of the window (num of samples of the data) and the
# second one is for the time axis
# resolve first variable
window_function = eval(self.window_function_str)(self.num_of_samples)
# resolve second variable for final window creation
self.window_array = numpy.array([window_function(i) for i in \
range(self.num_of_samples)])
# Check if there are zeros at the beginning or end of window
# If yes, skip these ranges (i.e. shorten the time series window)
self.window_not_equal_zero = numpy.where(self.window_array != 0)[0]
self.window_has_zeros = (len(self.window_not_equal_zero) != \
self.num_of_samples)
# A window with only zeros does not make sense
if len(self.window_not_equal_zero) == 0:
raise InvalidWindowException("The window does contain only zeros!\n"+
"Function_str: %s\nn_samples: %d" % (self.window_function_str,
self.num_of_samples))
[docs] def _execute(self, data):
""" Apply the windowing to the given data and return the result """
#Create a window of the correct length for the given data
if self.num_of_samples is None:
self.num_of_samples = data.shape[0]
self.create_window_array()
data_array=data.view(numpy.ndarray)
#Do the actual windowing
# TODO: check if windowed_data = (self.window_array.T * data) works also???
windowed_data = (self.window_array * data_array.T).T
# Skip trailing zeros
if self.window_has_zeros and self.reduce_window:
windowed_data = windowed_data[
range(self.window_not_equal_zero[0],
self.window_not_equal_zero[-1] + 1), :]
result_time_series = TimeSeries.replace_data(data, windowed_data)
# Adjust start and end time when chopping was done
if data.start_time != None:
result_time_series.start_time = data.start_time + \
self.window_not_equal_zero[0] * 1000.0 / \
data.sampling_frequency
result_time_series.end_time = data.end_time - \
(data.shape[0] - self.window_not_equal_zero[-1] - 1) * \
1000.0 / data.sampling_frequency
else:
result_time_series = TimeSeries.replace_data(data, windowed_data)
return result_time_series
[docs]class ScaleNode(BaseNode):
""" Scale all value by a constant factor
Scales (i.e. multiplies) all values with a given factor.
**Parameters**
:factor:
The factor
**Exemplary Call**
.. code-block:: yaml
-
node : Scale
parameters :
factor : 2
:Authors: Hendrik Woehrle (hendrik.woehrle@dfki.de)
:Created: 2013/03/08
"""
input_types=["TimeSeries"]
[docs] def __init__(self,
factor=1,
**kwargs):
super(ScaleNode, self).__init__(**kwargs)
if type(factor) == str:
factor = eval(factor)
self.set_permanent_attributes(factor=factor)
[docs] def _execute(self, data):
"""
Apply the scaling to the given data x
and return a new time series.
"""
x = data.view(numpy.ndarray).astype(numpy.double)
x = x * self.factor
result_time_series = TimeSeries.replace_data(data, x)
return result_time_series
_NODE_MAPPING = {"Windowing": WindowFuncNode,
}