Source code for pySPACE.missions.nodes.preprocessing.subsampling

# This Python file uses the following encoding: utf-8
# The upper line is needed for the copyright symbol in this module.
""" Reduce sampling rate of the :class:`~pySPACE.resources.data_types.time_series.TimeSeries` by a specified fraction

Here different combinations with filters are possible.

.. todo:: Move gcd function to better place.
"""

from scipy.interpolate import interp1d
from pySPACE.missions.nodes.base_node import BaseNode
from pySPACE.missions.nodes.decorators import ChoiceParameter, BooleanParameter
from pySPACE.missions.nodes.preprocessing import filtering
from pySPACE.resources.data_types.time_series import TimeSeries
from pySPACE.tools import prime_factors

from numpy import linspace

import warnings

import logging
import numpy
import scipy
import scipy.signal


[docs]def gcd(a, b): """ Return the greatest common divisor of a and b. :Input: * a -- an integer * b -- an integer :Output: * an integer -- the gcd of a and b Examples: >>> gcd(97,100) 1 >>> gcd(97 * 10**15, 19**20 * 97**2) 97L © William Stein, 2004 http://modular.math.washington.edu/ent/ent_py """ if a < 0: a = -a if b < 0: b = -b if a == 0: return b if b == 0: return a while b != 0: (a, b) = (b, a % b) return a
[docs]class SubsamplingNode(BaseNode): """ Downsampling with a simple low pass filter This is done by upsampling with a linear interpolation and downsampling with a corresponding low pass filter. **Exemplary Call** .. code-block:: yaml - node : Subsampling parameters : target_frequency : 1.0 :Author: Jan Hendrik Metzen (jhm@informatik.uni-bremen.de) :Created: 2008/08/25 :Revised: 2010/02/25 by Mario Krell """
[docs] def __init__(self, target_frequency, **kwargs): super(SubsamplingNode, self).__init__(**kwargs) self.set_permanent_attributes(target_frequency = target_frequency) warnings.warn("Use Decimation to reduce the sampling rate",DeprecationWarning)
[docs] def _execute(self, x): """ Subsample the given data x and return a new time series """ # Compute the upsampling and downsampling factor source_frequency = int(round(x.sampling_frequency * 100)) target_frequency = int(round(self.target_frequency * 100)) reduce_fraction_factor = gcd(source_frequency, target_frequency) upsampling_factor = target_frequency / reduce_fraction_factor downsampling_factor = source_frequency /reduce_fraction_factor # Upsampling if upsampling_factor == 1: upsampled_time_series = x else: # We make a linear interpolation for upsampling as an easy version. # This should be enough because of the following Lowpassfilter. # It is maybe a better version to construction a C^1 spline, # because it prevents artifacts in frequency, but this would # decrease processing speed significantly. # The new Data Array for the upsampled data # The last timepoint does not bring additional points. # This results in the -1+1 calculation. self._log("Using upsampling.", level=logging.WARNING) interpolated_time_series = numpy.zeros((upsampling_factor*(len(x)-1)+1, len(x.channel_names))) # Array that corresponds to the x value of the fictive function # (original data) time = linspace(0, len(x)-1, len(x)) # Array that corresponds to the x value of the fictive function # (upsampled data) newTime = linspace(0, len(x)-1, len(interpolated_time_series)) # Linear interpolation of each channel for channel_name in x.channel_names: channel_index = x.channel_names.index(channel_name) f = interp1d(time, x[:,channel_index]) interpolated_time_series[:,channel_index] = f(newTime) upsampled_time_series = TimeSeries.replace_data(x, interpolated_time_series) # Low Pass filtering: According to Shannon-Nyquist's sampling theorem, # we should only retain frequency components below 1/2*target_frequency # We multiply with 0.45 instead of 0.5 because of the finite window # length lpf_node = filtering.SimpleLowPassFilterNode(cutoff_frequency = self.target_frequency * 0.45) filtered_time_series = lpf_node.execute(upsampled_time_series) # Downsampling (can be achieved by a simple re-striding) # Note: We have to create a new array since otherwise the off-strided # data remains in memory downsampled_data = numpy.array(filtered_time_series[::downsampling_factor, :]) downsampled_time_series = TimeSeries.replace_data(x, downsampled_data) downsampled_time_series.sampling_frequency = self.target_frequency # # Uncomment for graphical analyzation # import pylab # pylab.figure(1) # pylab.plot(pylab.linspace(0.0, 1.0, x.shape[0]), # x[:,x.channel_names.index('Pz')], # label = 'Pz' + "_subsampled") # pylab.figure(2) # pylab.subplot(312) # pylab.specgram(x[:,x.channel_names.index('Pz')], Fs = x.sampling_frequency, # NFFT = 64, noverlap = 63) # pylab.colorbar() # pylab.show() return downsampled_time_series
[docs]class FFTResamplingNode(BaseNode): """ Downsampling with a FFT filter **Exemplary Call** .. code-block:: yaml - node : Resampling parameters : target_frequency : 25.0 window : "nut" # optional, window to apply before downsampling mirror : True # optional, to make the data periodic before applying the FFT possible windows are: 'flattop' - 'flat', 'flt' 'boxcar' - 'ones', 'box' 'triang' - 'traing', 'tri' 'parzen' - 'parz', 'par' 'bohman' - 'bman', 'bmn' 'blackmanharris' - 'blackharr', 'bkh', 'nuttall' - 'nutl', 'nut' 'barthann' - 'brthan', 'bth' 'blackman' - 'black', 'blk' 'hamming' - 'hamm', 'ham' 'bartlett' - 'bart', 'brt' 'hanning' - 'hann', 'han' ('kaiser', beta) - 'ksr' ('gaussian', std) - 'gauss', 'gss' ('general gauss', power, width) - 'general', 'ggs' ('slepian', width) - 'slep', 'optimal', 'dss' :Author: Mario Krell (Mario.Krell@dfki.de) :Created: 2010/02/25 """
[docs] def __init__(self, target_frequency, window=None, mirror =False, **kwargs): super(FFTResamplingNode, self).__init__(**kwargs) self.set_permanent_attributes(target_frequency = target_frequency, window = window, new_len = 0, mirror=mirror)
[docs] def _execute(self, data): """ Subsample the given data and return a new time series """ if self.new_len == 0 : self.new_len = int(round(self.target_frequency*len(data)/(1.0*data.sampling_frequency))) if not self.mirror: downsampled_time_series = \ TimeSeries.replace_data(data, scipy.signal.resample(data, self.new_len, t=None, axis=0, window=self.window)) else: downsampled_time_series = \ TimeSeries.replace_data(data, scipy.signal.resample(numpy.vstack((data,numpy.flipud(data))), self.new_len*2, t=None, axis=0, window=self.window)[:self.new_len]) downsampled_time_series.sampling_frequency = self.target_frequency return downsampled_time_series
[docs]class DownsamplingNode(BaseNode): """ Pure downsampling without filter Reduce sampling rate by picking the values according to the downsampling factor. No low pass filter is used as needed for a proper decimation. **Exemplary Call** .. code-block:: yaml - node : Downsampling parameters : target_frequency : 2.5 phase_shift : 1 .. todo:: Perhaps it is better to not rely on frequencies, but on factors? :Author: Hendrik Woehrle (Hendrik.Woehrle@dfki.de) """
[docs] def __init__(self, target_frequency, **kwargs): super(DownsamplingNode, self).__init__(**kwargs) self.set_permanent_attributes(target_frequency = target_frequency, downsampling_factor = None)
[docs] def _execute(self, data): """ Subsample the given data data and return a new time series """ # Compute the downsampling factor source_frequency = int(round(data.sampling_frequency * 100)) target_frequency = int(round(self.target_frequency * 100)) reduce_fraction_factor = gcd(source_frequency, target_frequency) new_downsampling_factor = source_frequency /reduce_fraction_factor upsampling_factor = target_frequency / reduce_fraction_factor if not(upsampling_factor == 1): self._log("Upsampling used but not implemented!" +" Check target frequency and real frequency!", level=logging.WARNING) # reset filter if downsampling factor has changed if new_downsampling_factor != self.downsampling_factor: self.downsampling_factor = new_downsampling_factor if self.downsampling_factor <= 1: self._log("Inapplicable downsampling factor: " + str(self.downsampling_factor) +" Downsampling will be skipped", level=logging.WARNING) # omit downsampling, if inapplicable if self.downsampling_factor <= 1: return data # Downsampling (can be achieved by a simple re-striding) # Note: We have to create a new array since otherwise the off-strided # data remains in memory downsampled_data = numpy.array(data[::self.downsampling_factor, :]) downsampled_time_series = TimeSeries.replace_data(data, downsampled_data) downsampled_time_series.sampling_frequency = self.target_frequency return downsampled_time_series
@BooleanParameter("Multi_step") @ChoiceParameter("comp_type", ["normal", "parallel", "gpu"])
[docs]class DecimationBase(BaseNode): """ Decimate the signal to a given sampling frequency This class is the base class for the other decimation nodes and can not be used directly. The decimation is performed by doing a downsampling with a preceding filtering by a corresponding low pass filter beforehand. According to Shannon-Nyquist's sampling theorem, only frequencies below 1/2*target_frequency should be retained. For the decimation, one should use the FIR decimation (preferred) or IIR decimation, see below. **Parameters** :target_frequency: The frequency after the decimation. (*required*) :multi_step: The decimation is done in multiple sets, if the downsampling factor is high. The steps are chosen by the node. (*optional, default: True*) :comp_type: Type of computation for the filtering. One of the following Strings: 'normal', 'parallel', 'gpu'.... (*optional, default: 'normal'*) :norm_cutoff_freq: Cutoff frequency for the low pass filter. Normalized to the target frequency. (*optional, default: 'norm_cutoff_freq'*) :norm_trans_region_width: Width of transition region for the low pass filter. Normalized to the target frequency. (*optional, default: 'norm_trans_region_width'*) :filter_frequency: Optional frequency of the filter, if the value should not be chosen automatically. (*optional, default: None*) :Author: Hendrik Woehrle (Hendrik.Woehrle@dfki.de) """
[docs] def __init__(self, target_frequency, multi_step = True, comp_type = 'normal', norm_cutoff_freq = 1./3., norm_trans_region_width = 1./6., filter_frequency = None, **kwargs): super(DecimationBase, self).__init__(**kwargs) self.set_permanent_attributes(target_frequency = target_frequency, filter_frequency = filter_frequency, low_pass_nodes = None, downsampling_nodes = None, downsampling_factor = 0, comp_type = comp_type, multi_step = multi_step, multi_step_factors = None, trans_regions = None, norm_trans_region_width = norm_trans_region_width, norm_cutoff_freq = norm_cutoff_freq)
[docs] def compute_filter_factors(self): # compute the optimal downsamplinf factor according to Crochiere and Rabiner, # IEEE Trans. on Acoust. Speech and Signal proc, Vol. 23, N0.5, 1975 # "Optimum FIR digital filter implementations for decimation, # interpolation, and narrow-band filtering" # it is asummed that two steps are used # get filter transition region width f = self.norm_cutoff_freq - self.norm_trans_region_width # compute theoretically optimal filter d = self.downsampling_factor d_opt_num = 1. - numpy.sqrt(d*f/(2.-f)) d_opt_denom = 2. - f * (d + 1.) d_opt = 2 * self.downsampling_factor * d_opt_num / d_opt_denom # compute final (rounded) downsampling factors d_small = prime_factors.next_least_nice_integer_divisor(d, d_opt) # store multi step factors self.multi_step_factors.append(d_small) self.multi_step_factors.append(d/d_small) # use biggest factor first to save computation time self.multi_step_factors.sort() self.multi_step_factors.reverse() #compute the individual transition regions norm_pass_band = 1 - self.norm_cutoff_freq - self.norm_trans_region_width # first (wide) transition region # full width between pass band in final filter response to # stopband of intermediate filter self.trans_regions.append(self.norm_cutoff_freq - norm_pass_band / self.multi_step_factors[0]) # second, more steep transition region self.trans_regions.append(self.norm_trans_region_width)
[docs] def initialize_filters(self, data): # if the decimation factor is to big, the decimation is performed in several steps self.low_pass_nodes = [] self.downsampling_nodes = [] self.trans_regions = [] self.multi_step_factors = [] if self.downsampling_factor > 10: self.compute_filter_factors() else: self.multi_step_factors.append(self.downsampling_factor) self.trans_regions.append(self.norm_trans_region_width) target_frequency = data.sampling_frequency # create all filters i = 0 for dec_factor, trans_reg_width in zip(self.multi_step_factors,self.trans_regions): # compute general target frequencies target_frequency = target_frequency / dec_factor filter_frequency = target_frequency * self.norm_cutoff_freq # in the last step, use (eventually) the manually specified frequency if (i+1) == len(self.multi_step_factors) and self.filter_frequency is not None: filter_frequency = self.filter_frequency i = i + 1 self.low_pass_nodes.append(self.create_filter( target_frequency=filter_frequency, downsampling_factor=dec_factor, transition_region_width = trans_reg_width)) self.downsampling_nodes.append(DownsamplingNode(target_frequency = target_frequency))
[docs] def create_filter(self, target_frequency, downsampling_factor, transition_region_width): return None
[docs] def _execute(self, data): """ Subsample the given data data and return a new time series """ # compute the downsampling factor, if needed (first one or changed) if data.sampling_frequency != self.downsampling_factor * self.target_frequency: self.downsampling_factor = data.sampling_frequency / self.target_frequency if int(self.downsampling_factor) != self.downsampling_factor: self.downsampling_factor = int(self.downsampling_factor) self._log("Upsampling not implemented."+ " Downsampling factor needs to be an integer.", level=logging.WARNING) if self.downsampling_factor <= 1: self._log("Inapplicable downsampling factor: " + str(self.downsampling_factor) +" Downsampling will be skipped", level=logging.WARNING) else: self.initialize_filters(data) # omit downsampling, if inapplicable if self.downsampling_factor <= 1: return data for filter, downsampler in zip(self.low_pass_nodes,self.downsampling_nodes): data = filter.execute(data) data = downsampler.execute(data) return data
[docs] def store_state(self, result_dir, index=None): """ Stores this node in the given directory *result_dir* """ if self.store: for index, node in enumerate(self.low_pass_nodes): node.store_state(result_dir, index) super(DecimationBase, self).store_state(result_dir, index)
[docs]class DecimationIIRNode(DecimationBase): """ Downsampling with a preceding IIR filter The decimation is performed by doing a downsampling with a preceding filtering by a corresponding low pass filter beforehand. According to Shannon-Nyquist's sampling theorem, only frequencies below 1/2*target_frequency should be retained. The decimation is applied in multiple steps, if the sampling factor is too big. The Filering is done using a IIR filter. **Exemplary Call** .. code-block:: yaml - node : DecimationIIR parameters : target_frequency : 25 :Author: Hendrik Woehrle (Hendrik.Woehrle@dfki.de) """
[docs] def __init__(self, **kwargs): super(DecimationIIRNode, self).__init__(**kwargs)
[docs] def create_filter(self,target_frequency,downsampling_factor,transition_region_width): return filtering.IIRFilterNode([target_frequency], comp_type = self.comp_type)
@ChoiceParameter("comp_type", ["normal", "parallel"]) @ChoiceParameter("time_shift", ["normal", "middle", "end", "stream"]) @BooleanParameter("skipping")
[docs]class DecimationFIRNode(DecimationBase): """ Downsampling with a preceding FIR filter The decimation is performed by doing a downsampling with a preceding filtering by a corresponding low pass filter beforehand. According to Shannon-Nyquist's sampling theorem, only frequencies below 1/2*target_frequency should be retained. The decimation is applied in multiple steps, if the sampling factor is too big. The filtering procedure is applied by an FIR filter, and only for values that are significant due to the downsampling procedure. **Parameters** :comp_type: Computation type of the filter, see FIRFilterNode for further information. (*optional, default: 'normal'*) :skipping: If output samples should be skipped in the filtering process, because they are discarded in the downsampling process. (*optional, default: True*) :time_shift: Parameter time_shift of the filter, see FIRFilterNode for further information. (*optional, default: 'middle'*) **Exemplary Call** .. code-block:: yaml - node : DecimationFIR parameters : target_frequency : 25 :Author: Hendrik Woehrle (Hendrik.Woehrle@dfki.de) """ input_types=["TimeSeries"]
[docs] def __init__(self, comp_type = 'normal', skipping = False, time_shift = "middle", **kwargs): super(DecimationFIRNode, self).__init__(comp_type = comp_type, **kwargs) self.set_permanent_attributes(skipping=skipping, time_shift = time_shift)
[docs] def create_filter(self,target_frequency,downsampling_factor,transition_region_width): skip = 0 if self.skipping: skip = int(round(downsampling_factor-1)) return filtering.FIRFilterNode([target_frequency], comp_type=self.comp_type, width=transition_region_width, skip=skip, time_shift = self.time_shift, store = self.store)
_NODE_MAPPING = {"Resampling": FFTResamplingNode, "Decimation": DecimationFIRNode, "DecimationIIR": DecimationIIRNode, "Subsampling":SubsamplingNode, "Downsampling": DownsamplingNode}