Source code for pySPACE.missions.nodes.preprocessing.filtering

""" Digital filtering of :class:`~pySPACE.resources.data_types.time_series.TimeSeries`"""

import numpy
import math
import scipy.signal
import scipy.fftpack
import warnings
import logging

from pySPACE.missions.nodes.base_node import BaseNode
from pySPACE.resources.data_types.time_series import TimeSeries

try:
    from pySPACE.missions.support.CPP.variance_tools import variance_tools as vt
except:
    pass


[docs]class SimpleLowPassFilterNode(BaseNode): """ Low-pass filtering with the given cutoff frequency using SciPy This node performs low pass filtering with the given *cutoff_frequency*. It uses a FIR filter whose *taps*, *width*, and *window* can be specified. .. note:: Deprecated, because functionality is contained in the other nodes, with much more important features. Use the FIRFilterNode or IIRFilterNode. **Exemplary Call** .. code-block:: yaml - node : SimpleLowPassFilterNode parameters : cutoff_frequency : 0.25 :Author: Jan Hendrik Metzen (jhm@informatik.uni-bremen.de) :Revisited: Hendrik Woehrle (hendrik.woehrle@dfki.de) :Created: 2008/08/18 """
[docs] def __init__(self, cutoff_frequency, taps = None, width = None, window = 'hamming', selected_channels= None, **kwargs): assert(cutoff_frequency > 0.0) super(SimpleLowPassFilterNode, self).__init__(**kwargs) self.set_permanent_attributes(cutoff_frequency = cutoff_frequency, taps = taps, width = width, window = window, selected_channels = selected_channels, b = None) warnings.warn("Use LowPassFilterNode",DeprecationWarning)
[docs] def _execute(self, x): """ Apply low pass filter to data x and return the result """ #Determine the indices of the channels which will be filtered selected_channel_names = self.selected_channels \ if self.selected_channels != None else x.channel_names selected_channel_indices = [x.channel_names.index(channel_name) \ for channel_name in selected_channel_names] # Compute the FIR window which is required for the low pass filter # This is quite slow! # filter_order = 2 * x.sampling_frequency / self.cutoff_frequency filter_order = 31 if self.b is None: try: b = \ scipy.signal.firwin(numtaps = filter_order, cutoff = self.cutoff_frequency * 2.0 / x.sampling_frequency, width = self.width, window = self.window) except TypeError: b = \ scipy.signal.firwin(N = filter_order-1, cutoff = self.cutoff_frequency * 2.0 / x.sampling_frequency, width = self.width, window = self.window) self.set_permanent_attributes(b=b) #Do the actual filtering filtered_data = numpy.zeros(x.shape) y=x.view(type=numpy.ndarray) for channel_index in selected_channel_indices: filtered_data[:,channel_index] = scipy.signal.convolve(self.b, \ y[:,channel_index])[len(self.b)/2:-len(self.b)/2+1] result_time_series = TimeSeries.replace_data(x, filtered_data) return result_time_series
[docs]class HighPassFilterNode(BaseNode): """ High-pass filtering with a FIR filter .. todo:: This nodes needs revision concerning computation time and correctness. .. todo:: Check if cutoff frequency is higher than sampling frequency. **Parameters** :cutoff_frequency: A frequency in Hz. Frequencies above the cutoff frequency can pass, but below are reduced (attenuated). Recommended cutoff_frequency for EMG preprocessing: 40 Hz :taps: Number of taps of the filter kernel. Also called filter order. For EMG preprocessing the recommended filter order is 150. :width: Approximate width of transition region (normalized so that 1 corresponds to pi) for use in kaiser FIR filter design. (*optional, default: None*) :window: Window function to use. See Scipy documentation http://docs.scipy.org/ doc/scipy/reference/generated/scipy.signal.firwin.html#scipy.signal.firwin for possible windows. (*optional, default: ('kaiser', 0.5)*) :selected_channels: A list of channel names for which the filter should be applied. E.g. the names of the EMG channels. (*optional, default: None*) **Exemplary Call** .. code-block:: yaml - node : High_Pass_Filter parameters : cutoff_frequency : 0.1 taps : 150 selected_channels : ['C3','C4'] :Author: Judith Suttrup :Created: 2010/02/10 """
[docs] def __init__(self, cutoff_frequency, taps, width = None, window = ("kaiser", 0.5), selected_channels= None, **kwargs): assert(cutoff_frequency > 0.0) super(HighPassFilterNode, self).__init__(**kwargs) self.set_permanent_attributes(cutoff_frequency = cutoff_frequency, taps = taps, width = width, window = window, selected_channels = selected_channels, b=None)
[docs] def _execute(self, x): """ Apply high pass filter to data x and return the result """ #Determine the indices of the channels which will be filtered selected_channel_names = self.selected_channels \ if self.selected_channels != None else x.channel_names selected_channel_indices = [x.channel_names.index(channel_name) \ for channel_name in selected_channel_names] if self.b is None: #Compute the FIR window which is required for the high pass filter try: b = scipy.signal.firwin(numtaps = self.taps, cutoff = self.cutoff_frequency * 2.0 / x.sampling_frequency, width = self.width, window = self.window) except TypeError: b = scipy.signal.firwin(N = self.taps-1, cutoff = self.cutoff_frequency * 2.0 / x.sampling_frequency, width = self.width, window = self.window) b = -b b[self.taps/2] = b[self.taps/2]+1 self.set_permanent_attributes(b=b) #Do the actual filtering y=x.view(numpy.ndarray) filtered_data = numpy.zeros(x.shape) for channel_index in selected_channel_indices: filtered_data[:,channel_index] = \ scipy.signal.lfilter(self.b, [1], y[:,channel_index]) result_time_series = TimeSeries.replace_data(x, filtered_data) return result_time_series
[docs]class FFTBandPassFilterNode(BaseNode): """ Band-pass filtering using a Fourier transform This node performs a band-pass filtering for a given *pass_band* by converting the signal into the frequency domain using an FFT, setting all bands outside the pass band to zero, and going back to the time domain using an IFFT. .. note:: Deprecated. Use the FIRFilterNode or IIRFilterNode. **Exemplary Call** .. code-block:: yaml - node : FFTBandPassFilter parameters : pass_band : [0.1, 1.0] :Author: Jan Hendrik Metzen (jhm@informatik.uni-bremen.de) :Revisited: Hendrik Woehrle (hendrik.woehrle@dfki.de) :Created: 2008/08/18 """
[docs] def __init__(self, pass_band, selected_channels= None, **kwargs): super(FFTBandPassFilterNode, self).__init__(**kwargs) self.set_permanent_attributes(pass_band = pass_band, selected_channels = selected_channels) warnings.warn("Use BandPassFilterNode",DeprecationWarning)
[docs] def _execute(self, x): """ Apply band pass filter to data x and return the result """ #Determine the indices of the channels which will be filtered selected_channel_names = self.selected_channels \ if self.selected_channels != None else x.channel_names selected_channel_indices = [x.channel_names.index(channel_name) \ for channel_name in selected_channel_names] #Do the actual filtering x_data=x.view(numpy.ndarray) # More efficient slicing, without memory copy... filtered_data = numpy.zeros(x.shape) for channel_index in selected_channel_indices: #Fourier transform fourier_transformed = scipy.fftpack.fft(x_data[:, channel_index]) #Compute the pass band indices lower_bound = int(round(float(self.pass_band[0]) / \ x.sampling_frequency * len(fourier_transformed))) upper_bound = int(round(float(self.pass_band[1]) / \ x.sampling_frequency * len(fourier_transformed))) #Setting frequencies outside the pass band to 0 for i in range(0, lower_bound): fourier_transformed[i] = 0 fourier_transformed[-i-1] = 0 for i in range(upper_bound,len(fourier_transformed)/2): fourier_transformed[i] = 0 fourier_transformed[-i-1] = 0 #Inverse Fourier transform and project to real component filtered_data[:, channel_index] = \ scipy.fftpack.ifft(fourier_transformed).real result_time_series = TimeSeries.replace_data(x, filtered_data) return result_time_series
[docs]class FIRFilterNode(BaseNode): """ Band-pass or low-pass filtering with a time domain convolution based on a FIR filter kernel This node performs a finite impulse response filtering for a given *pass_band* by applying a time domain convolution with a FIR filter kernel. .. todo:: Check if ``pass_band`` frequencies are higher than sampling frequency. .. todo:: Check why generic unittest does not work with a ``pass_band`` interval of [0, 0.4] **Parameters** :pass_band: The pass band. Tuple for band pass, single value for low pass filtering. :taps: Number of taps of the filter kernel (i.e. order-1) (*optional, default: 33*) :width: Approximate width of transition region (normalized so that 1 corresponds to pi) for use in kaiser FIR filter design. (*optional, default: None*) :window: Window function to use. See scipy doc for possible windows. (*optional, default: 'hamming'*) :comp_type: Type of computation, e.g. 'normal': the computations are performed using scipy 'parallel': the computations are performed using the adappt module (*optional, default: 'normal'*) :skip: Number of values that are skipped during convolution, for e.g. in decimation. Needs the comp_type set to 'parallel'. (*optional, default: 0*) :time_shift: Normally, the convolution is performed as follows: If a signal of length N is convolved with a signal of length M, the result has the length N+M-1. Scipy picks the first N values to assure, that the resulting signal is of valid length. If time_shift is set to 'normal, the filter behaves as stated above. If time_shift is set to 'middle', the values of the interval [M/2,N+M/2-1] are picked. If time_shift is set to 'end', the values of the interval [(N+M-1)/2,N+M-1] are picked. If time_shift is set to 'stream', a block-wise computation is performed, i.e. all incoming time series objects are assumed to be adjacent sub-blocks of a larger data stream. Therefore, the internal filter state is preserved between different executions of the filter. (*optional, default: False*) **Exemplary Call** .. code-block:: yaml - node : FIRBandPassFilter parameters : pass_band : [0.4] comp_type : "normal" window : "hamming" taps : 3 skip : 0 :Author: Hendrik Woehrle (hendrik.woehrle@dfki.de) """
[docs] def __init__(self, pass_band, taps = 33, width = None, window = 'hamming', skip = 0, comp_type = 'normal', time_shift = "middle", selected_channels= None, **kwargs): super(FIRFilterNode, self).__init__(**kwargs) self.set_permanent_attributes(pass_band = pass_band, taps = taps, width = width, window = window, selected_channels = selected_channels, comp_type = comp_type, skip = skip, time_shift = time_shift) self.comp_hw = kwargs.pop('comp_hw', None) self.filter_kernel = None self.filtered_data = None self.shrinked_data = None self.data_buffer = None # the internal filter state self.internal_state = None
[docs] def initialize_data_dependencies(self, data): """ Initialize several data dependent buffer variables and data items """ # compute the filter kernel self.calc_filter_kernel(data) #Determine the indices of the channels which will be filtered self.selected_channel_names = self.selected_channels \ if self.selected_channels != None else data.channel_names self.selected_channel_indices = [data.channel_names.index(channel_name) \ for channel_name in self.selected_channel_names] # buffer for all filter results if self.time_shift == "middle": self.filtered_data = numpy.zeros((data.shape[0]+ \ (len(self.filter_kernel)-1)/2,data.shape[1]), dtype=numpy.float) self.shrinked_data = numpy.zeros(data.shape, dtype=numpy.float) self.data_buffer = numpy.zeros((data.shape[0]+ \ (len(self.filter_kernel)-1)/2,data.shape[1]), dtype=numpy.float) elif self.time_shift == "end": self.filtered_data = numpy.zeros((data.shape[0]+ \ (len(self.filter_kernel)-1),data.shape[1]), dtype=numpy.float) self.shrinked_data = numpy.zeros(data.shape, dtype=numpy.float) self.data_buffer = numpy.zeros((data.shape[0]+ \ (len(self.filter_kernel)-1),data.shape[1]), dtype=numpy.float) else: self.filtered_data = numpy.zeros(data.shape, dtype=numpy.float) self.data_buffer = numpy.zeros(data.shape, dtype=numpy.float)
[docs] def calc_filter_kernel(self, data): """ Calculate filter kernel """ self.sampling_frequency = data.sampling_frequency if len(self.pass_band) == 2: # highpass with spectral inversion try: highpass_kernel = - scipy.signal.firwin(numtaps = self.taps, cutoff = float(self.pass_band[0])*2/self.sampling_frequency, width = self.width, window = self.window) # lowpass lowpass_kernel = scipy.signal.firwin(numtaps = self.taps, cutoff = float(self.pass_band[1])*2/self.sampling_frequency, width = self.width, window = self.window) except TypeError: highpass_kernel = - scipy.signal.firwin(N = self.taps-1, cutoff = float(self.pass_band[0])*2/self.sampling_frequency, width = self.width, window = self.window) # lowpass lowpass_kernel = scipy.signal.firwin(N = self.taps-1, cutoff = float(self.pass_band[1])*2/self.sampling_frequency, width = self.width, window = self.window) lowpass_kernel[self.taps/2] = lowpass_kernel[self.taps/2] + 1 bandpass_kernel = - (highpass_kernel+lowpass_kernel) bandpass_kernel[self.taps/2] = bandpass_kernel[self.taps/2] + 1 self.filter_kernel = bandpass_kernel elif len(self.pass_band) == 1: try: lowpass_kernel = \ scipy.signal.firwin(numtaps = self.taps, cutoff = float(self.pass_band[0])*2/self.sampling_frequency, width = self.width, window = self.window) except TypeError: lowpass_kernel = \ scipy.signal.firwin(N = self.taps-1, cutoff = float(self.pass_band[0])*2/self.sampling_frequency, width = self.width, window = self.window) self.filter_kernel = lowpass_kernel else: raise ValueError("No valid number of pass band arguments: " + \ "pass_band must be a tuple (band pass) or single value (low pass)") # the optional blockwise filtering if self.time_shift == "stream": self.internal_state = dict() for channel_index in xrange(data.shape[1]): self.internal_state[channel_index] = numpy.zeros(len(self.filter_kernel)-1)
[docs] def _execute(self, data): """ Apply filter to data and return the result """ # compute the FIR window which is required for the low pass filter, if # it not exists if self.filter_kernel is None: self.initialize_data_dependencies(data) self.time_offset = 0 assert(len(self.filter_kernel)>0), "Filter construction failed." if numpy.dtype('float64') != data.dtype: data = data.astype(numpy.float) data_array = data.view(numpy.ndarray) if self.time_shift == "middle": # append zeros to the selected channels and copy them to the data # buffer (e.g. just copy the relevant data to data_buffer that # contains zeros) self.time_offset = (len(self.filter_kernel)-1)/2 for channel_index in self.selected_channel_indices: self.data_buffer[0:data.shape[0],channel_index] = \ data_array[:,channel_index] elif self.time_shift == "end": # append zeros to the selected channels and copy them to the data # buffer (e.g. just copy the relevant data to data_buffer that # contains zeros) self.time_offset = (len(self.filter_kernel)-1) for channel_index in self.selected_channel_indices: self.data_buffer[0:data.shape[0],channel_index] = \ data_array[:,channel_index] else: self.data_buffer = data_array #Do the actual filtering if self.comp_type == 'normal': # do sequential filtering if self.time_shift == "stream": for channel_index in self.selected_channel_indices: (self.filtered_data[:,channel_index], self.internal_state[channel_index]) = \ scipy.signal.lfilter( self.filter_kernel, 1, x = self.data_buffer[:,channel_index], zi = self.internal_state[channel_index]) else: for channel_index in self.selected_channel_indices: self.filtered_data[:,channel_index] = \ scipy.signal.lfilter( self.filter_kernel, 1, x = self.data_buffer[:,channel_index]) else: raise ValueError("Computation type %s unknown" % self.comp_type) if self.time_shift == "middle" \ or self.time_shift=="end": # cut away the irrelevant data # (e.g. just copy relevant data back) for channel_index in self.selected_channel_indices: self.shrinked_data[0:data.shape[0],channel_index] = \ self.filtered_data[self.time_offset: self.filtered_data.shape[0]+self.time_offset, channel_index] result_time_series = TimeSeries.replace_data(data,self.shrinked_data) else: result_time_series = TimeSeries.replace_data(data,self.filtered_data) return result_time_series
[docs] def __setstate__(self, sdict): """ Restore object from its pickled state""" super(FIRFilterNode, self).__setstate__(sdict) self.filter_kernel = None self.filtered_data = None self.shrinked_data = None self.data_buffer = None self.internal_state = None
[docs]class IIRFilterNode(BaseNode): """ Band-pass or low-pass filtering with a direct form IIR filter .. todo:: Check if ``pass_band`` frequencies are higher than sampling frequency. .. todo:: Check why generic unittest does not work with a ``pass_band`` interval of [0, 0.4] **Parameters** :pass_band: The pass band. Tuple for band pass, single value for low pass filtering. :pass_band_loss: Allowed pass band loss in dB. (*optional, default: 0.5*) :stop_band_rifle: Allowed remaining stop band rifle in dB. (*optional, default: 60*) :ftype: Type of used filter, e.g. elliptic or butterworth. See scipy.signal.filter_design.iirdesign for further information. :selected_channels: selected_channels :comp_type: Type of computation, e.g. 'normal' (*optional, default: 'normal'*) **Exemplary Call** .. code-block:: yaml - node : IIRBandPassFilter parameters : pass_band : [0.4] comp_type : "normal" window : "hamming" taps : 3 skip : 0 :Author: Hendrik Woehrle (hendrik.woehrle@dfki.de) """
[docs] def __init__(self, pass_band, pass_band_loss = 0.5, stop_band_rifle = 60, ftype = 'ellip', selected_channels = None, comp_type = 'normal', **kwargs): super(IIRFilterNode, self).__init__(**kwargs) self.set_permanent_attributes(pass_band = pass_band, pass_band_loss = pass_band_loss, stop_band_rifle = stop_band_rifle, ftype = ftype, selected_channels = selected_channels, comp_type = comp_type, filter_kernel = None) self.comp_hw = kwargs.pop('comp_hw', None)
[docs] def calc_filter_kernel(self, data): """ Calculate filter kernel """ self.sampling_frequency = data.sampling_frequency if len(self.pass_band) == 1: wp = self.pass_band[0] * 2. * 0.9 / self.sampling_frequency ws = self.pass_band[0] * 2. * 1.0 / self.sampling_frequency elif len(self.pass_band) == 2: wp = [self.pass_band[0] * 2. * 1.0 / self.sampling_frequency, self.pass_band[1] * 2. * 0.9 / self.sampling_frequency] ws = [self.pass_band[0] * 2. * 0.9 / self.sampling_frequency, self.pass_band[1] * 2. * 1.0 / self.sampling_frequency] else: raise ValueError("No valid number of pass band arguments: pass_band"\ + " must be a tuple (band pass) or single value (low pass)") b,a = scipy.signal.iirdesign(wp, ws, self.pass_band_loss, self.stop_band_rifle, ftype=self.ftype) self.filter_kernel=[b,a]
[docs] def _execute(self, data): """ Apply filter to data and return the result .. todo:: check if other view is needed here please """ #Compute the FIR window which is required for the low pass filter, if it #not exists if self.filter_kernel is None: self.calc_filter_kernel(data) #Determine the indices of the channels which will be filtered self.selected_channel_names = self.selected_channels \ if self.selected_channels != None else data.channel_names self.selected_channel_indices = \ [data.channel_names.index(channel_name) for channel_name in \ self.selected_channel_names] if self.comp_type == 'normal': #normal filtering with scipy filtered_data = numpy.zeros(data.shape) for channel_index in self.selected_channel_indices: filtered_data[:,channel_index] = \ scipy.signal.lfilter(self.filter_kernel[0], self.filter_kernel[1], data[:,channel_index]) result_time_series = TimeSeries.replace_data(data, filtered_data) elif self.comp_type == 'mirror': #filtering with scipy, mirror the data beforehand on the right border data_mirrored = numpy.vstack((data,numpy.flipud(data))) pre_filtered_data = numpy.zeros(data_mirrored.shape) for channel_index in self.selected_channel_indices: pre_filtered_data[:,channel_index] = \ scipy.signal.lfilter(self.filter_kernel[0], self.filter_kernel[1], data_mirrored[:,channel_index]) pre_filtered_data[:,channel_index] = \ scipy.signal.lfilter(self.filter_kernel[0], self.filter_kernel[1], numpy.flipud(pre_filtered_data[:,channel_index])) result_time_series = \ TimeSeries.replace_data(data, pre_filtered_data[:len(data)]) else: raise ValueError("Computation type unknown") return result_time_series
[docs]class VarianceFilterNode(BaseNode): """ Take the variance as filtered data or standardize with moving variance and mean This node can perform a low-pass filtering using the variance, for example used to enhance the SNR of raw EMG Signals, or calculates a standardization with the variance. Filter: The variance is calculated for each sample at time point t using the following formula: .. math:: n*var(t) = n*var(t-1) + (x(t)-x(t-n)) * ((n-1)*x(t) + (n+1)*x(t-n) - 2*n*m(t-1)), where: - n is the width of the "filter", number of samples used for calculating the variance - Var(t) is the variance as time point t - x(t) is the sample at time point t - m(t) is the mean at time point t Standardization: The standardization is calculated for each sample at time point t using the following formula: .. math:: S(t) = \\frac{x(t)-m(t)}{Std(t)}, where: - S(t) = is the standardization for the sample at time point t - x(t) is the sample at time point t - m(t) is the mean at time point t - Std(t) is the standard deviation at time point t The standard deviation is calculated using the formula for the variance explained above, followed by a applying the normal square root **Parameters** :width: Size of the window used to calculate the variance, the higher the value the smoother is the resulting signal. The value is given in ms. .. todo:: Check if given value is too small. (*optional, default: 50*) :standardization: Flag which indicates if the filter should simply calculate the variance for a given size (False), or if a standardization should be calculated, meaning the mean for the given size is subtracted from the sample followed by a division by the variance. (*optional, default: False*) **Exemplary Call** .. code-block:: yaml - node : VarianceFilter parameters : width : 2000 standardization : False :Author: Marc Tabie (mtabie@informatik.uni-bremen.de) :Created: 2012/05/02 """
[docs] def __init__(self, width = 50, standardization = False, normalize = False, lenNormalize = 1, **kwargs): super(VarianceFilterNode, self).__init__(**kwargs) #Try to import the c-implementation of the variance and the standardization try: from pySPACE.missions.support.CPP.variance_tools import variance_tools as vt var_tools = True except: warnings.warn("The variance_tools module is not compiled\nIt is located in missions/support/CPP/variance_tools\nPlease compile it using python \"setup.py build_ext --inplace\"") var_tools = False self.set_permanent_attributes(ringbuffer = None, # List with ringbuffers for the last n samples used for calculating the variance variables = None, # List with the variables needed to calculate (0=variance, 1=mean) index = None, # List with the current indices for the ringbuffers width = width, # Window size of the variance standardization = standardization, # Use standardization? nChannels = None, # Number of channels var_tools = var_tools,# C-implementation of var/std normalize = normalize, lenNormalize = lenNormalize, ringbufferNormalize = None)
[docs] def _execute(self, data): # Initialize the ringbuffers and variables one for each channel if(self.ringbuffer == None): self.width /= 1000.0 self.width = int(self.width * data.sampling_frequency) self.nChannels = len(data.channel_names) self.ringbuffer = numpy.zeros((self.width,self.nChannels),dtype=numpy.double) self.variables = numpy.zeros((2,self.nChannels),dtype=numpy.double) self.index = numpy.zeros(self.nChannels,'i') if(self.width <= 1): warnings.warn("Width have to be greater than 1!\nThe data won't be changed!!!") return data if(self.normalize and self.ringbufferNormalize == None): self.ringbufferNormalize = numpy.zeros((self.lenNormalize,self.nChannels),dtype=numpy.double) # Convert the input data to double x = data.view(numpy.ndarray).astype(numpy.double) # Initialize the result data array filtered_data = numpy.zeros(x.shape) # Lists which are passed to the standadization # TODO: make self processing_filtered_data = None processing_ringbuffer = None processing_variables = None processing_index = None if(self.standardization): for channel_index in range(self.nChannels): # Copy the different data to the processing listst processing_filtered_data = numpy.array(filtered_data[:,channel_index],'d') processing_ringbuffer = numpy.array(self.ringbuffer[:,channel_index],'d') processing_variables = numpy.array(self.variables[:,channel_index],'d') processing_index = int(self.index[channel_index]) if self.var_tools: # Perform the standardization # The module vt (variance_tools) is implemented in c using # boost to wrap the code in Python # The module is located in trunk/library/variance_tools # and have to be compiled self.index[channel_index] = vt.standardization( processing_filtered_data, numpy.array( x[:,channel_index],'d'), processing_ringbuffer, processing_variables, self.width, processing_index) else: self.index[channel_index] = self.standardisation( processing_filtered_data, numpy.array( x[:,channel_index],'d'), processing_ringbuffer, processing_variables, self.width, processing_index) # Copy the processing lists back to the local variables filtered_data[:,channel_index] = processing_filtered_data self.ringbuffer[:,channel_index] = processing_ringbuffer self.variables[:,channel_index] = processing_variables else: for channel_index in range(self.nChannels): # Copy the different data to the processing listst processing_filtered_data = numpy.array( filtered_data[:,channel_index],'d') processing_ringbuffer = numpy.array( self.ringbuffer[:,channel_index],'d') processing_variables = numpy.array( self.variables[:,channel_index],'d') processing_index = int(self.index[channel_index]) if self.var_tools: # Perform the filtering with the variance # The module vt (variance_tools) is implemented in c using # boost to wrap the code in Python # The module is located in trunk/library/variance_tools # and have to be compiled self.index[channel_index] = vt.filter( processing_filtered_data, numpy.array( x[:,channel_index],'d'), processing_ringbuffer, processing_variables, self.width, processing_index) if self.normalize: lenData = len(processing_filtered_data) self.ringbufferNormalize[0:-lenData,channel_index] = \ self.ringbufferNormalize[lenData:,channel_index] self.ringbufferNormalize[-lenData:,channel_index] = \ processing_filtered_data[:] processing_filtered_data /= numpy.max((50, numpy.max( self.ringbufferNormalize[:,channel_index]))) else: self.index[channel_index] = self.variance( processing_filtered_data, numpy.array( x[:,channel_index],'d'), processing_ringbuffer, processing_variables, self.width, processing_index) # Copy the processing lists back to the local variables filtered_data[:,channel_index] = processing_filtered_data self.ringbuffer[:,channel_index] = processing_ringbuffer self.variables[:,channel_index] = processing_variables # Return the result result_time_series = TimeSeries.replace_data(data, filtered_data) return result_time_series
# Fallback functions if the c implementation of the variance filter and the # standardisation could not be loaded
[docs] def variance(self, outData, inData, ringbuffer, variables, width, index): #Some local variables for speed up ww = width * width wm1 = width - 1.0 wp1 = width + 1.0 ringbufferValue = 0.0 variable1 = 0.0 for i in range(len(inData)): # Speedup for array entries which are needed several times ringbufferValue = ringbuffer[index] inDataValue = inData[i] variable1 = variables[1] # Calculating the new variance variables[0] += (inDataValue - ringbufferValue) * ( (wm1 * inDataValue) + (wp1 * ringbufferValue) - (2.0 * variable1)) # Calculating the new mean value variables[1] = variable1 + (inDataValue-ringbufferValue) # Store the actual sample in the ringbuffer ringbuffer[index] = inDataValue; # Increment the ringbuffer index index = index + 1 if (index < wm1) else 0 # Calculate the standardization outData[i] = variables[0] / ww return index
[docs] def standardisation(self, outData, inData, ringbuffer, variables, width, index): # Some local variables for speed up ww = width * width wm1 = width - 1.0 wp1 = width + 1.0 ringbufferValue = 0.0 variable1 = 0.0 invalidValue = False invalidValueOccured = False for i in range(len(inData)): # Speedup for array entries which are needed several times ringbufferValue = ringbuffer[index]; inDataValue = inData[i] variable1 = variables[1] # Calculating the new variance variables[0] += (inDataValue - ringbufferValue) * ( (wm1 * inDataValue) + (wp1 * ringbufferValue) - (2.0 * variable1)) if variables[0] <= 0.0: variablesInvalidValueBuffer = variables[0] variables[0] = 1.0 invalidValue = True invalidValueOccured = True # Calculating the new mean value variables[1] = variable1 + (inDataValue-ringbufferValue); # Store the actual sample in the ringbuffer ringbuffer[index] = inDataValue; # Increment the ringbuffer index index = index + 1 if (index < wm1) else 0 # Calculate the standardization outData[i] = (inDataValue - (variables[1] / width)) / \ math.sqrt(variables[0] / ww) \ if (math.sqrt(variables[0] / ww) != 0.0) else 0.0 if invalidValue == True: variables[0] = variablesInvalidValueBuffer invalidValue = False if invalidValueOccured: self._log( "OnlineStandardization:: Warning: Prevented division" + " by zero during standardization!", level=logging.WARNING) return index
[docs]class TkeoNode(BaseNode): """ Calculate the energy of a signal with the Teager Kaiser Energy Operator (TKEO) as new signal This is a quadratic filter with the formula: .. math:: x_{i-1}^2 - x_{i-2} \\cdot x_i The formula is taken from the following publication:: Kaiser J. F. (1990) On a simple algorithm to calculate 'energy' of a signal. In Proceedings: International Conference on Acoustics, Speech, and Signal Processing (ICASSP-90) Pages 381-384 (http://ieeexplore.ieee.org/stamp/stamp.jsp?tp=&arnumber=115702) For processing EMG-Data a high pass filter at 20 Hz before the node and a low pass filter at 50 Hz after the node is recommended. **Parameters** :selected_channels: A list of channel names the algorithm should work with. I.e. the EMG channel names. If this parameter is not specified, all channels are used. (*optional, default: None*) **Exemplary Call** .. code-block:: yaml - node : TKEO parameters : selected_channels : ["C3", "C4"] :Author: Marc Tabie (mtabie@informatik.uni-bremen.de) :Created: 2012/05/02 """
[docs] def __init__(self, selected_channels= None, **kwargs): super(TkeoNode, self).__init__(**kwargs) self.set_permanent_attributes(selected_channels = selected_channels, old_data = None, selected_channel_indices = None)
[docs] def _execute(self, x): """ Compute the energy of the given signal x using the TKEO """ # Determine the indices of the channels which will be filtered # Done only once... if self.selected_channel_indices is None: self.selected_channels = self.selected_channels \ if self.selected_channels != None else x.channel_names self.selected_channel_indices = [x.channel_names.index(channel_name) \ for channel_name in self.selected_channels] self.old_data = numpy.zeros((2,len(self.selected_channel_indices))) filtered_data = numpy.zeros(x.shape) channel_counter = -1 for channel_index in self.selected_channel_indices: channel_counter += 1 for i in range(len(x)): if i == 0: filtered_data[i][channel_index] = \ math.pow(self.old_data[1][channel_counter],2) - ( self.old_data[0][channel_counter] * x[0][channel_index]) elif i == 1: filtered_data[i][channel_index] = \ math.pow(x[0][channel_index],2) - ( self.old_data[1][channel_counter] * x[1][channel_index]) else: filtered_data[i][channel_index] = \ math.pow(x[i-1][channel_index],2) - ( x[i-2][channel_index] * x[i][channel_index]) self.old_data[0][channel_counter] = x[-2][channel_index] self.old_data[1][channel_counter] = x[-1][channel_index] result_time_series = TimeSeries.replace_data(x, filtered_data) return result_time_series
_NODE_MAPPING = {"Simple_Low_Pass_Filter": SimpleLowPassFilterNode, "High_Pass_Filter": HighPassFilterNode, "FFT_Band_Pass_Filter": FFTBandPassFilterNode, "BandPassFilter": FIRFilterNode, "LowPassFilter": FIRFilterNode, "FIRBandPassFilter": FIRFilterNode, "FIRLowPassFilter": FIRFilterNode, "IIRBandPassFilter": IIRFilterNode, "IIRLowPassFilter": IIRFilterNode, "TKEO": TkeoNode, }