Source code for pySPACE.missions.nodes.preprocessing.differentiation

""" Differentiate :class:`~pySPACE.resources.data_types.time_series.TimeSeries` channel wise

:Author: Marc Tabie (mtabie@informatik.uni-bremen.de)
:Created: 2010/01/20
"""
import numpy
from pySPACE.missions.nodes.base_node import BaseNode
from pySPACE.resources.data_types.time_series import TimeSeries

class Simple2DifferentiationNode(BaseNode):
    """ Calculate difference to previous time point to generate new :class:`~pySPACE.resources.data_types.time_series.TimeSeries`

    Every output sample is the forward difference ``x[t+1] - x[t]`` of the
    input, computed separately for each channel. The channel names of the
    result are the original names with a trailing ``'``.

    **Parameters**
        :datapoints:
            Number of leading time points of the input that are processed.
            If None, all time points of the current input are used.

            (*optional, default: None*)

        :moving_window_length:
            Stored as a permanent attribute; it is not read by the
            computation in this class.

            (*optional, default: 1*)

        :keep_number_of_samples:
            If True, one row of zeros is appended to the result so that
            the sample that is lost by differencing is replaced.
            If False, `end_time` of the result is reduced by 1 instead.

            (*optional, default: False*)

    **Exemplary Call**

    .. code-block:: yaml

        -
            node : diff2
    """
    # Simple copy of the TimeDifferenceFeatureNode with slight change at the end.
    # More complex would be a short time regression between more than two points.
    # Yet the possibility to make it a "real" differentiation is in comment.

    def __init__(self, datapoints=None, moving_window_length=1,
                 keep_number_of_samples=False, **kwargs):
        super(Simple2DifferentiationNode, self).__init__(**kwargs)
        self.set_permanent_attributes(
            datapoints=datapoints,
            moving_window_length=moving_window_length,
            keep_number_of_samples=keep_number_of_samples)

    def _execute(self, x):
        """ f' = (f(x+h)-f(x))

        Returns a new time series built with `TimeSeries.replace_data`
        from `x` and the channel-wise forward differences.
        """
        # Resolve the number of processed points per call. Writing the
        # length back to self.datapoints would freeze the length of the
        # first sample and apply it to every later one.
        n_points = len(x) if self.datapoints is None else self.datapoints
        n_channels = len(x.channel_names)

        # Create new channel names
        new_names = ["%s'" % name for name in x.channel_names]

        # Derive f' from data x. The division by h (multiplication with the
        # sampling frequency) is omitted, it is the same scalar everywhere.
        differences = [
            [x[t + 1][channel] - x[t][channel]
             for channel in range(n_channels)]
            for t in range(n_points - 1)
        ]

        # Padding with zeros if the original length of the time series
        # has to remain equal.
        if self.keep_number_of_samples:
            differences.append([0] * n_channels)

        # Create a new time series with the new data and channel names
        result_time_series = TimeSeries.replace_data(
            x, numpy.array(differences))
        result_time_series.channel_names = new_names

        # If necessary adjust the length of the time series
        # NOTE(review): a fixed 1 is subtracted regardless of the sampling
        # frequency -- confirm the unit of end_time.
        if not self.keep_number_of_samples:
            result_time_series.end_time -= 1

        return result_time_series
class Simple5DifferentiationNode(BaseNode):
    """ Calculate smoothed derivative using 5 time points

    Method taken from
    http://www.holoborodko.com/pavel/numerical-methods/numerical-derivative/smooth-low-noise-differentiators/

    .. math::

        f'(x)=\\frac{2(f(x+h)-f(x-h))+(f(x+2h)-f(x-2h))}{8h}

    Further smoothing functions are available, but seemingly not necessary,
    because we have already a smoothing of the signal when doing the
    subsampling.

    Dividing by 8h is omitted, because it is just multiplication with the
    same scalar each time.

    **Parameters**
        :datapoints:
            Number of leading time points of the input that are processed.
            If None, all time points of the current input are used.

            (*optional, default: None*)

        :moving_window_length:
            Stored as a permanent attribute; it is not read by the
            computation in this class.

            (*optional, default: 1*)

        :keep_number_of_samples:
            If True, four rows of zeros are appended to the result so that
            the samples that are lost by the 5-point stencil are replaced.
            If False, `end_time` of the result is reduced by 4 instead.

            (*optional, default: False*)

    **Exemplary Call**

    .. code-block:: yaml

        -
            node : diff5
    """
    # Simple copy of the TimeDifferenceFeatureNode with slight change at the end.
    # More complex would be a short time regression between more than two points.
    # Yet the possibility to make it a "real" differentiation is in comment.

    def __init__(self, datapoints=None, moving_window_length=1,
                 keep_number_of_samples=False, **kwargs):
        super(Simple5DifferentiationNode, self).__init__(**kwargs)
        self.set_permanent_attributes(
            datapoints=datapoints,
            moving_window_length=moving_window_length,
            keep_number_of_samples=keep_number_of_samples)

    def _execute(self, x):
        """ Calculate derivative

        .. math::

            f'(x)=\\frac{2(f(x+h)-f(x-h))+(f(x+2h)-f(x-2h))}{8h}

        Returns a new time series built with `TimeSeries.replace_data`
        from `x` and the channel-wise smoothed differences.
        """
        # Resolve the number of processed points per call. Writing the
        # length back to self.datapoints would freeze the length of the
        # first sample and apply it to every later one.
        n_points = len(x) if self.datapoints is None else self.datapoints
        n_channels = len(x.channel_names)

        # Create new channel names
        new_names = ["%s'" % name for name in x.channel_names]

        # Derive f' from data x. Row t holds the stencil centred on t+2:
        #   2*(f(+h) - f(-h)) + (f(+2h) - f(-2h))
        # Both bracketed terms enter with a plus sign; with a minus sign the
        # expression vanishes for every linear signal and is no derivative.
        # The factor 1/(8h) is omitted (constant scalar).
        derivative = [
            [2 * (x[t + 3][channel] - x[t + 1][channel])
             + (x[t + 4][channel] - x[t][channel])
             for channel in range(n_channels)]
            for t in range(n_points - 4)
        ]

        # Padding with zeros if the original length of the time series
        # has to remain equal.
        if self.keep_number_of_samples:
            derivative.extend([0] * n_channels for _ in range(4))

        # Create a new time series with the new data and channel names
        result_time_series = TimeSeries.replace_data(
            x, numpy.array(derivative))
        result_time_series.channel_names = new_names

        # If necessary adjust the length of the time series
        # NOTE(review): a fixed 4 is subtracted regardless of the sampling
        # frequency -- confirm the unit of end_time.
        if not self.keep_number_of_samples:
            result_time_series.end_time -= 4

        return result_time_series
# Short node names (as used in YAML node chain specifications, see the
# "Exemplary Call" sections above) mapped to the implementing classes.
_NODE_MAPPING = {
    "diff2": Simple2DifferentiationNode,
    "diff5": Simple5DifferentiationNode,
}