Source code for pySPACE.tests.unittests.nodes.type_manipulation.test_type_conversion

#!/usr/bin/python

"""
This module contains unittests which test FeatureVector2TimeSeriesNode.

:Author: Titiruck Nuntapramote (titiruck.nuntapramote@dfki.de)
:Created: 2011/10/18
"""

import numpy as np
import unittest
import random

if __name__ == '__main__':
    import sys
    import os
    # The root of the code
    file_path = os.path.dirname(os.path.abspath(__file__))
    sys.path.append(file_path[:file_path.rfind('pySPACE') - 1])

from pySPACE.resources.data_types.time_series import TimeSeries
from pySPACE.missions.nodes.feature_generation.time_domain_features import TimeDomainFeaturesNode
from pySPACE.missions.nodes.type_manipulation.type_conversion import *
from pySPACE.tests.utils.data.test_data_generation import *
from pySPACE.resources.data_types.prediction_vector import PredictionVector


[docs]class FeatureVector2TimeSeriesTestCase(unittest.TestCase): """ Test whether 2 TimeSeries are equal after converting using FeatureVector2TimeSeriesNode. :Author: Titiruck Nuntapramote (titiruck.nuntapramote@dfki.de) :Created: 2011/11/04 """
[docs] def setUp(self): ch_names = [("ch%s" % i) for i in range(10)] self.ts1 = TestTimeSeriesGenerator().generate_test_data( channels=10, channel_names=ch_names, time_points=1000, function=Sine(), sampling_frequency=50.0) fv1 = TimeDomainFeaturesNode()._execute(self.ts1) self.ts2 = FeatureVector2TimeSeriesNode()._execute(fv1) ch_names2 = [("ch%s" % i) for i in range(15)] self.ts3 = TestTimeSeriesGenerator().generate_test_data( channels=15, channel_names=ch_names2, time_points=500, function=GaussianNoise(), sampling_frequency=20.0) fv2 = TimeDomainFeaturesNode()._execute(self.ts3) self.ts4 = FeatureVector2TimeSeriesNode()._execute(fv2)
[docs] def test_sampling_frequency(self): self.assertEqual( (self.ts1).sampling_frequency, (self.ts2).sampling_frequency) self.assertEqual( (self.ts3).sampling_frequency, (self.ts4).sampling_frequency)
[docs] def test_channel_names(self): self.assertEqual( set(self.ts1.channel_names), set(self.ts2.channel_names)) self.assertEqual( set(self.ts3.channel_names), set(self.ts4.channel_names))
# Compare value to each list in TimeSeries.
[docs] def cmp_value(self, l1, l2): diff = np.array(l1, dtype='f4') - np.array(l2, dtype='f4') if not(diff.any()): return True return False
# Compare value in the whole TimeSeries
[docs] def cmp_TimeSeries(self, t1, t2): t1 = t1.view(numpy.ndarray) t2 = t2.view(numpy.ndarray) for i in range(len(t1)): val = self.cmp_value(t1[i][0], t2[i][0]) if not(val): return False return True
[docs] def test_TimeSeries(self): self.assertTrue(self.cmp_TimeSeries(self.ts1, self.ts2)) self.assertTrue(self.cmp_TimeSeries(self.ts3, self.ts4)) self.assertFalse(self.cmp_TimeSeries(self.ts1, self.ts4)) self.assertFalse(self.cmp_TimeSeries(self.ts3, self.ts1))
[docs]class Prediction2FeaturesTestCase(unittest.TestCase): """ Test Prediction2FeaturesNode :Author: Titiruck Nuntapramote (titiruck.nuntapramote@dfki.de) :Created: 2011/11/16 """
[docs] def setUp(self): ran1 = random.randint(2, 100) ran2 = random.randint(2, 100) list1 = range(ran1) list2 = range(ran2) self.pv1 = PredictionVector(prediction=1) self.pv2 = PredictionVector(prediction=2) self.pv3 = PredictionVector(prediction=list1) self.pv4 = PredictionVector(prediction=list2) self.fv1 = Prediction2FeaturesNode()._execute(self.pv1) self.fv2 = Prediction2FeaturesNode(name='test')._execute(self.pv2) self.fv3 = Prediction2FeaturesNode()._execute(self.pv3) self.fv4 = Prediction2FeaturesNode(name='test')._execute(self.pv4)
[docs] def test_array(self): self.assertEqual( self.fv1.view(numpy.ndarray), self.pv1.view(numpy.ndarray), msg="fv1 is incorrect") self.assertEqual( self.fv2.view(numpy.ndarray), self.pv2.view(numpy.ndarray), msg="fv2 is incorrect") diff = np.array(self.fv3) - np.array(self.pv3) self.assertTrue(not(diff.all()), msg="fv3 is incorrect") diff2 = np.array(self.fv4) - np.array(self.pv4) self.assertTrue(not(diff2.all()), msg="fv4 is incorrect")
[docs] def test_name(self): self.assertEqual( self.fv1.feature_names, ["prediction"], msg="fv1 is incorrect") self.assertEqual( self.fv2.feature_names, ["testprediction"], msg="fv1 is incorrect") # compare size self.assertEqual( self.fv3.shape[1], len(self.fv3.feature_names), msg="len(fv3.feature_names) is incorrect") self.assertEqual( self.fv4.shape[1], len(self.fv4.feature_names), msg="len(fv4.feature_names) is incorrect") msg = '' equal = True for i in range(len(self.fv3.feature_names)): if self.fv3.feature_names[i] != "prediction_" + str(i): equal = False msg = str(i) break self.assertTrue(equal, "fv3.feature_names is incorrect at" + msg) equal2 = True for i in range(len(self.fv4.feature_names)): if self.fv4.feature_names[i] != "testprediction_" + str(i): equal = False msg = str(i) break self.assertTrue(equal2, "fv4.feature_names is incorrect at" + msg)
[docs]class Features2PredictionTestCase(unittest.TestCase): """ Test Features2PredictionNode :Author: Titiruck Nuntapramote (titiruck.nuntapramote@dfki.de) :Created: 2011/11/24 """
[docs] def setUp(self): ran1 = random.randint(2, 100) ran2 = random.randint(2, 100) list1 = range(-10, ran1) list2 = range(-5, ran2) self.fv1 = FeatureVector(np.array([list1])) self.fv2 = FeatureVector(np.array([list2])) self.class_labels = ['standard', 'target'] self.pv1 = Features2PredictionNode( self.class_labels)._execute( self.fv1) self.pv2 = Features2PredictionNode( self.class_labels)._execute( self.fv2) self.classification = lambda x: self.class_labels[0] if x <= 0 \ else self.class_labels[1] self.pv1_label = map( self.classification, self.pv1.view(numpy.ndarray)[0, :]) self.pv2_label = map( self.classification, self.pv2.view(numpy.ndarray)[0, :])
[docs] def test_array(self): diff = np.array(self.fv1) - np.array(self.pv1) self.assertTrue(not(diff.all()), "pv1 is incorrect") diff2 = np.array(self.fv2) - np.array(self.pv2) self.assertTrue(not(diff2.all()), "pv2 is incorrect")
[docs] def test_label(self): self.assertEqual( self.pv1_label, self.pv1.label, msg="pv1 label is incorrect") self.assertEqual( self.pv2_label, self.pv2.label, msg="pv1 label is incorrect")
[docs]class Feature2MonoTimeSeriesTestCase(unittest.TestCase): """ Some simple tests, if conversion works as expected """
[docs] def setUp(self): """ Define basic needed FeatureVector instances """ self.x = FeatureVector([[0, 1, 2, 3, 4, 5]], ["a", "b", "ab", "cb", "c4", "abc"]) self.y = FeatureVector([[0, 1, 2, 3, 4, 5]], ["a_7ms", "b_7ms", "ab_7ms", "cb_7ms", "c4_7ms", "abc_7ms"]) self.tx = TimeSeries([[0, 1, 2, 3, 4, 5]], channel_names=["a", "b", "ab", "cb", "c4", "abc"], sampling_frequency=1) self.ty = TimeSeries([[0, 1, 2, 3, 4, 5]], channel_names=["a_7ms", "b_7ms", "ab_7ms", "cb_7ms", "c4_7ms", "abc_7ms"], sampling_frequency=1)
[docs] def test_conversion(self): """ Two simple conversion tests """ assert( Feature2MonoTimeSeriesNode()._execute(self.x) == self.tx), \ "Transformation to TimeSeries failed" assert( Feature2MonoTimeSeriesNode()._execute(self.y) == self.ty), \ "Transformation to TimeSeries failed!"
[docs]class MonoTimeSeries2FeatureTestCase(Feature2MonoTimeSeriesTestCase):
[docs] def test_conversion(self): """ Two simple conversion tests""" assert( MonoTimeSeries2FeatureNode()._execute( self.tx) == self.x), "Transformation to FeatureVector failed" assert( MonoTimeSeries2FeatureNode()._execute( self.ty) == self.y), "Transformation to FeatureVector failed!"
if __name__ == '__main__': suite = unittest.TestLoader().loadTestsFromName('test_type_conversion') unittest.TextTestRunner(verbosity=2).run(suite)