Source code for pySPACE.tests.unittests.nodes.preprocessing.test_filtering

#!/usr/bin/python

""" Unittests which test filtering nodes

.. todo:: Implement tests for VarianceFilterNode

:Author: Jan Hendrik Metzen (jhm@informatik.uni-bremen.de)
:Created: 2008/08/22


"""

if __name__ == '__main__':
    import sys
    import os
    # The root of the code
    file_path = os.path.dirname(os.path.abspath(__file__))
    sys.path.append(file_path[:file_path.rfind('pySPACE') - 1])

import unittest

import numpy
import scipy
import time
import pylab

import logging
import pySPACE.missions.nodes.preprocessing.filtering as filtering
import pySPACE.tests.utils.data.test_data_generation as test_data_generation
import pySPACE.tests.generic_unittest as gen_test

test_ts_generator = test_data_generation.TestTimeSeriesGenerator()
noise_generator = test_data_generation.GaussianNoise()

def _fourierSpectrum(time_series, channel):
    fourier_spectrum = numpy.absolute(numpy.fft.fft(time_series[25:-25, channel])) # The first and the last data points are not trustworthy for filtering
    return fourier_spectrum[:len(fourier_spectrum)/2]

[docs]class SimpleLowPassFilterTestCase(unittest.TestCase): """ Test for SimpleLowPassFilterNode """
[docs] def setUp(self): self.sampling_frequency = 100 self.time_series \ = test_ts_generator.generate_test_data(channels=8, time_points=8096, sampling_frequency=self.sampling_frequency, function \ = test_data_generation.ChannelDependentSine(sampling_frequency=self.sampling_frequency))
[docs] def test_specgram_low_pass(self): time_points = 10000 sampling_frequency = 100.0 data = numpy.zeros((time_points,1)) for time_index in range(time_points): data[time_index,0] = numpy.sin(2.0 * numpy.pi * 5.0 * (time_index / sampling_frequency)) data[time_index,0] += numpy.sin(2.0 * numpy.pi * 15.0 * (time_index / sampling_frequency)) data[time_index,0] += numpy.sin(2.0 * numpy.pi * 25.0 * (time_index / sampling_frequency)) data[time_index,0] += numpy.sin(2.0 * numpy.pi * 35.0 * (time_index / sampling_frequency)) data[time_index,0] += numpy.sin(2.0 * numpy.pi * 45.0 * (time_index / sampling_frequency)) #Generate a time series build out of the data from pySPACE.resources.data_types.time_series import TimeSeries test_data = TimeSeries(input_array = data, channel_names = ["test_channel_1"], sampling_frequency = sampling_frequency, start_time = 0, end_time = float(time_points) / sampling_frequency) lpf_node = filtering.SimpleLowPassFilterNode(cutoff_frequency = 20.0) filtered_time_series = lpf_node.execute(test_data) lpf_node_fir = filtering.FIRFilterNode([20.0]) filtered_time_series_fir = lpf_node_fir.execute(test_data)
# import pylab # pylab.figure(0) # pylab.specgram(test_data[:, 0], Fs = sampling_frequency) # pylab.colorbar() # pylab.figure(1) # pylab.specgram(filtered_time_series[:, 0], Fs = sampling_frequency) # pylab.colorbar() # pylab.figure(2) # pylab.specgram(filtered_time_series_fir[:, 0], Fs = sampling_frequency) # pylab.colorbar() # pylab.show()
[docs] def test_specgram_band_pass(self): time_points = 10000 sampling_frequency = 100.0 data = numpy.zeros((time_points,1)) for time_index in range(time_points): data[time_index,0] = numpy.sin(2.0 * numpy.pi * 5.0 * (time_index / sampling_frequency)) data[time_index,0] += numpy.sin(2.0 * numpy.pi * 15.0 * (time_index / sampling_frequency)) data[time_index,0] += numpy.sin(2.0 * numpy.pi * 25.0 * (time_index / sampling_frequency)) data[time_index,0] += numpy.sin(2.0 * numpy.pi * 35.0 * (time_index / sampling_frequency)) data[time_index,0] += numpy.sin(2.0 * numpy.pi * 45.0 * (time_index / sampling_frequency)) pass_band=(20.,30.) #Generate a time series build out of the data from pySPACE.resources.data_types.time_series import TimeSeries test_data = TimeSeries(input_array = data, channel_names = ["test_channel_1"], sampling_frequency = sampling_frequency, start_time = 0, end_time = float(time_points) / sampling_frequency) lpf_node = filtering.FFTBandPassFilterNode(pass_band=pass_band) filtered_time_series = lpf_node.execute(test_data) lpf_node_fir = filtering.FIRFilterNode(pass_band=pass_band) filtered_time_series_fir = lpf_node_fir.execute(test_data) lpf_node_fir2 = filtering.FIRFilterNode(pass_band=pass_band,window='hann') filtered_time_series_fir2 = lpf_node_fir2.execute(test_data) lpf_node_iir = filtering.IIRFilterNode(pass_band=pass_band,stop_band_rifle=90) filtered_time_series_iir = lpf_node_iir.execute(test_data)
[docs] def test_low_pass_filtering(self): lpf_node = filtering.SimpleLowPassFilterNode(cutoff_frequency = 4.0) filtered_time_series = lpf_node.execute(self.time_series) self.assert_(id(self.time_series) != id(filtered_time_series)) # The object should be different! #Channels with frequencies significantly below the cutoff frequencies should remain nearly unchanged self.assertAlmostEqual(max(_fourierSpectrum(filtered_time_series, 0)) / max(_fourierSpectrum(self.time_series, 0)), 1.0, places = 1) pylab.plot(_fourierSpectrum(filtered_time_series, 0)) #Channels with frequencies significantly above the cutoff frequencies should be nearly completely removed self.assertAlmostEqual(max(_fourierSpectrum(filtered_time_series, 7)) /max(_fourierSpectrum(self.time_series, 7)), 0.0, places = 1)
[docs]class FFTBandPassFilterTestCase(unittest.TestCase): """ Test for FFTBandPassFilterNode """
[docs] def setUp(self): self.sampling_frequency = 100 self.time_series \ = test_ts_generator.generate_test_data(channels=8, time_points=8096, sampling_frequency=self.sampling_frequency, function \ = test_data_generation.ChannelDependentSine(sampling_frequency=self.sampling_frequency))
[docs] def test_fft_band__pass_filtering(self): bpf_node = filtering.FFTBandPassFilterNode(pass_band = (3.0,5.0)) filtered_time_series = bpf_node.execute(self.time_series) self.assert_(id(self.time_series) != id(filtered_time_series)) # The object should be different! #Channels with frequencies within the pass band should remain nearly unchanged self.assertAlmostEqual(max(_fourierSpectrum(filtered_time_series, 3)) / max(_fourierSpectrum(self.time_series, 3)), 1.0, places = 1) #Channels with frequencies significantly outside the pass band should be nearly completely removed self.assertAlmostEqual(max(_fourierSpectrum(filtered_time_series, 6)) /max(_fourierSpectrum(self.time_series, 6)), 0.0, places = 1)
[docs]class FIRFilterTestCase(unittest.TestCase): """ Test for the band pass filter node """
[docs] def setUp(self): self.logger = logging.getLogger("TestLogger") self.pass_band = [10,20] self.filter_node = filtering.FIRFilterNode(self.pass_band) time_points = 1000 channels = 10 self.data = test_ts_generator.generate_test_data(channels,time_points,function=test_data_generation.Delta())
[docs] def tearDown(self): self.filter_node = None
[docs] def testTimeShifting(self): time_points = 10 channels = 1 import pySPACE.tests.utils.data.test_data_generation as tdg counter = tdg.Counter() test_data = test_ts_generator.generate_test_data(channels=channels, time_points=time_points, function=counter) filter_node = filtering.FIRFilterNode([10,20],time_shift="middle",taps=3) filter_node.initialize_data_dependencies(test_data) filter_node.filter_kernel = numpy.array([1,1,1])/3. filtered_data = filter_node.execute(test_data) # the result should be #[[ 0.33333333] # [ 1. ] # [ 2. ] # [ 3. ] # [ 4. ] # [ 5. ] # [ 6. ] # [ 8. ] # [ 7. ] # [ 5.66666667]] desired_result=numpy.array([1.0/3,1,2,3,4,5,6,7,8,17./3]) desired_result.shape=(10,1) #print filtered_data #print desired_result self.assertTrue(numpy.allclose(filtered_data,desired_result))
# Functions for plotting # Plot frequency and phase response
[docs]def mfreqz(b,a=1): pylab.subplot(211) plot_freqz_response(b,a) pylab.subplot(212) plot_phase_response(b,a)
[docs]def plot_freqz_response(b,a=1): w,h = scipy.signal.freqz(b,a) h_dB = 20 * pylab.log10 (abs(h)) pylab.plot(w/max(w),h_dB) pylab.ylim(-150, 5) pylab.ylabel('Magnitude (db)') pylab.xlabel(r'Normalized Frequency (x$\pi$rad/sample)') pylab.title(r'Frequency response')
[docs]def plot_freqz_response_lin(b,a=1): w,h = scipy.signal.freqz(b,a) pylab.plot(w/max(w),abs(h)) pylab.ylim(0, 1.1) pylab.ylabel('Magnitude') pylab.xlabel(r'Normalized Frequency (x$\pi$rad/sample)') pylab.title(r'Frequency response')
[docs]def plot_phase_response(b,a=1): w,h = scipy.signal.freqz(b,a) h_Phase = pylab.unwrap(numpy.arctan2(numpy.imag(h),numpy.real(h))) pylab.plot(w/max(w),h_Phase) pylab.ylabel('Phase (radians)') pylab.xlabel(r'Normalized Frequency (x$\pi$rad/sample)') pylab.title(r'Phase response') pylab.subplots_adjust(hspace=0.5)
[docs]def plot_kernel(b,a=1): pylab.plot(range(0,len(b)),b) pylab.ylabel('Coefficient value') pylab.xlabel(r'tap') pylab.title(r'Kernel structure')
[docs]def plot_time_series_response(x,b,a=1): import scipy.signal y = scipy.signal.lfilter(b,a,x) x_zero_tapped = numpy.append(numpy.zeros(len(y)-len(x)),x) pylab.subplot(211) pylab.plot(range(0,len(x_zero_tapped)),x_zero_tapped) pylab.subplot(212) pylab.plot(range(0,len(y)),y) return y
# Plot step and impulse response
[docs]def impz(b,a=1): l = len(b) impulse = numpy.repeat(0.,l); impulse[0] =1. x = numpy.arange(0,l) response = scipy.signal.lfilter(b,a,impulse) pylab.subplot(211) pylab.stem(x, response) pylab.ylabel('Amplitude') pylab.xlabel(r'n (samples)') pylab.title(r'Impulse response') pylab.subplot(212) step = numpy.cumsum(response) pylab.stem(x, step) pylab.ylabel('Amplitude') pylab.xlabel(r'n (samples)') pylab.title(r'Step response') pylab.subplots_adjust(hspace=0.5)
[docs]def plot_fir_filter(b,a=1): pylab.subplot(211) plot_kernel(b,a) pylab.subplot(212) plot_freqz_response(b,a)
#pylab.subplot(313) #plot_phase_response(b) #class FilterPlayground(unittest.TestCase): # # def testFilterPlayground(self): # # create a signal # # sampled at 5000Hz # sampling_rate = 4096 # # for 2s # end_time = 1. # # time points # x = numpy.arange(0,end_time,1.0/sampling_rate,dtype=numpy.float64) # theta = 2 * numpy.pi * x # # # the actual signal # y = numpy.sin(theta*10) + numpy.sin(theta*20) + numpy.sin(theta*300) # # e = 2 * scipy.randn(sampling_rate) # # print x.shape # print y.shape # print e.shape # # y = y+e # # pylab.plot(x,abs(y)) # pylab.savefig("signal") # pylab.figure() # # # perform 1024 tap fft of the signal # fft_order = 4096 # Y = scipy.fft(y,fft_order) # # and calculate at which points the fft is sampled # F = pylab.fftfreq(fft_order, d=1.0) # # pylab.rc('text', usetex=True) # pylab.xlabel(r'Normalized Frequency (x$\pi$rad/sample)') # pylab.ylabel(r'$|X(j\omega)|/N$') # Y_normalized = Y* 1./fft_order # pylab.plot(F,abs(Y_normalized)) # pylab.savefig("fft_1") # # downsampling_factor = 8 # y_down = numpy.arange(0,end_time/downsampling_factor,end_time/sampling_rate,dtype=numpy.float64) # # for i in numpy.arange(0,end_time*sampling_rate/downsampling_factor): # y_down[i]=y[i*downsampling_factor] # # print y_down.shape # # perform 1024 tap fft of the signal # fft_order_down = fft_order/downsampling_factor # Y_down = scipy.fft(y_down,fft_order_down) # # and calculate at which points the fft is sampled # F_down = pylab.fftfreq(fft_order_down, d=1.0) # # pylab.figure() # pylab.rc('text', usetex=True) # pylab.xlabel(r'Normalized Frequency (x$\pi$rad/sample)') # pylab.ylabel(r'$|X(j\omega)|/N$') # Y_down_normalized = Y_down * 1./fft_order_down # pylab.plot(F_down,abs(Y_down_normalized)) # pylab.savefig("fft_2") # filtering out frequencies above 15 Hz #fc = 256. #wc = fc * 2. / sampling_rate #taps = 256 #filter_coeffs = scipy.signal.firwin(taps,wc) # plot filter response #pylab.figure(figsize=(8,8)) #plot_fir_filter(filter_coeffs) #pylab.savefig("filter") #pylab.show() #filter the signal #y01 = scipy.signal.lfilter(filter_coeffs,1,y) #Y01=fft(y01,fft_order) #pylab.plot(F,Y01) if __name__ == '__main__': suite = unittest.TestLoader().loadTestsFromName('test_filtering') # Test the generic initialization of the class methods suite.addTest(gen_test.ParametrizedTestCase.parametrize( current_testcase=gen_test.GenericTestCase, node=filtering.SimpleLowPassFilterNode)) suite.addTest(gen_test.ParametrizedTestCase.parametrize( current_testcase=gen_test.GenericTestCase, node=filtering.HighPassFilterNode)) suite.addTest(gen_test.ParametrizedTestCase.parametrize( current_testcase=gen_test.GenericTestCase, node=filtering.FIRFilterNode)) suite.addTest(gen_test.ParametrizedTestCase.parametrize( current_testcase=gen_test.GenericTestCase, node=filtering.IIRFilterNode)) suite.addTest(gen_test.ParametrizedTestCase.parametrize( current_testcase=gen_test.GenericTestCase, node=filtering.TkeoNode)) unittest.TextTestRunner(verbosity=2).run(suite)