""" Extract statistical properties like moments or correlation coefficients
**Known issues**
No unit tests!
"""
import numpy
import scipy.stats
import copy
from matplotlib import mlab
import warnings
from pySPACE.missions.nodes.base_node import BaseNode
from pySPACE.resources.data_types.feature_vector import FeatureVector
class StatisticalFeatureNode(BaseNode):
""" Compute statistical features, e.g. central/raw moments, median, min etc.
This node computes statistical features like raw and central moments
of k-th order:

    1. raw_moment_of_k_th_order

       .. math::

           \\frac{1}{n} \\cdot \\sum_{i=1}^n x_i^k

       - k=1: mean;
       - k=2: the square root of this value is the quadratic mean

    2. central_moment_of_k_th_order

       .. math::

           \\frac{1}{n} \\cdot \\sum_{i=1}^n (x_i - \\bar{x})^k

       - k=1: 0;
       - k=2: empirical variance;
       - k=3: third central moment (related to the skewness);
       - k=4: fourth central moment (related to the kurtosis)

    In addition, the median, quadratic mean, standard deviation,
    minimum/maximum amplitude and the number of amplitudes above a given
    threshold can be computed.

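    A minimal sketch (with hypothetical data) of how these quantities relate
    to common numpy/scipy calls:

    .. code-block:: python

        import numpy, scipy.stats

        x = numpy.array([1.0, 2.0, 4.0, 7.0])
        raw_moment_2 = numpy.mean(x ** 2)            # (1/n) * sum(x_i^2)
        quadratic_mean = raw_moment_2 ** 0.5         # sqrt of the 2nd raw moment
        central_moment_2 = scipy.stats.moment(x, 2)  # empirical variance
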
**Parameters**
:raw_moment_order:
The highest order (as an integer number) of raw moments that will
be computed. For example:
:0: - no raw moment computation,
:1: - mean
:2: - mean *and* raw moment 2nd order
:3: - etc.
.. note::
                Values greater than 2 might take a long time to compute and
                are not expected to carry any useful information.
(*optional, default: 1*)
:central_moment_order:
The highest order (as an integer number) of central moments that
will be computed. For example:
:1: - no central moment computation
:2: - empirical variance
:3: - empirical variance *and* empirical skewness
:etc.:
(*optional, default: 1*)
:std:
True or False. If True the standard deviation of each channel is
computed and retained as feature.
(*optional, default: False*)
:std_function:
            A string of a Python *eval*-able function, e.g. ``numpy.std``.
            Specifies the function used to compute the standard deviation
            if the default should not be used. This parameter only takes
            effect if *std* is True.
(*optional, default: None*)
:quadratic_mean:
True or False. If True the square root of the second raw moment
(quadratic mean) is computed and retained as feature.
(*optional, default: False*)
:median:
True or False. If True the median of each channel is computed
and retained as feature.
(*optional, default: False*)
:minimum:
True or False. If True the minimum value of each channel is stored
as a feature.
(*optional, default: False*)
:maximum:
True or False. If True the maximum value of each channel is stored
as a feature.
(*optional, default: False*)
:artifact_threshold:
            A float value. The number of samples within the window whose
            absolute value exceeds *artifact_threshold* is counted and used
            as a feature.
.. note::
The threshold depends on the actual resolution of the values.
Check header file (for EEG data).
.. todo:: Integrate resolution in TimeSeries data type and then scale.
(The input here should be in micro-volt.)
(*optional, default: None*)
**Known issues**
.. todo:: Try to use the methods on the whole data array
**Exemplary Call**
.. code-block:: yaml
-
node : Statistical_Features
parameters :
central_moment_order : 0
std : True
median : True
artifact_threshold : 500.0
:Author: Anett Seeland (anett.seeland@dfki.de)
:Created: 2009/10/06
:Refactoring: 2014/09/11 by Mario Michael Krell
"""
    def __init__(self, raw_moment_order=1, central_moment_order=1,
std=False, std_function=None, quadratic_mean=False,
median=False, minimum=False, maximum=False,
artifact_threshold=None, **kwargs):
super(StatisticalFeatureNode, self).__init__(**kwargs)
self.set_permanent_attributes(raw_moment_order=raw_moment_order,
central_moment_order=central_moment_order,
std=std, std_function=std_function,
quadratic_mean=quadratic_mean,
median=median, minimum=minimum,
maximum=maximum,
artifact_threshold=artifact_threshold,
feature_names=None)
    def raw_moment(self, one_channel_time_series, k):
"""Compute the k'th-order raw moment of a given channel.
**Arguments**
:one_channel_time_series:
a one dimensional time series
:k:
order of raw moment
"""
        # Raising integer arrays to higher powers can overflow, so the power
        # is computed element-wise on float values instead of via "array ** k".
        moment_sum = 0.0
        for value in one_channel_time_series:
            moment_sum += float(value) ** k
raw_moment = moment_sum / len(one_channel_time_series)
return raw_moment
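        # For reference, a vectorized sketch that is equivalent whenever
        # casting to float is acceptable for the data at hand:
        #
        #     x = numpy.asarray(one_channel_time_series, dtype=numpy.float64)
        #     raw_moment = numpy.mean(x ** k)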
    def amplitudes_above(self, one_channel_time_series, threshold):
""" Return the number of values about the given threshold in the given time series.
**Arguments**
:window:
a tow dimensional time series
:threshold:
a number above the values are count.
"""
count = 0
for value in one_channel_time_series:
if value > threshold or value < -threshold:
count += 1
return count
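        # Equivalent numpy one-liner (a sketch):
        #     int(numpy.sum(numpy.abs(one_channel_time_series) > threshold))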
    def _execute(self, x):
""" Calculates statistical features """
# determine what has to be computed and initialize data structures
data = x.get_data()
if self.central_moment_order == 0:
self.central_moment_order = 1 # only for feature_size computation
feature_size = self.raw_moment_order * len(x.channel_names) + \
(self.central_moment_order-1) * len(x.channel_names) + \
self.std * len(x.channel_names) + \
self.quadratic_mean * len(x.channel_names) + \
self.median * len(x.channel_names) + \
self.minimum * len(x.channel_names) + \
self.maximum * len(x.channel_names)
        if self.artifact_threshold is not None:
feature_size += 1
statistical_features = numpy.zeros((feature_size, ), numpy.float64)
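        # Example (a sketch): for two channels with raw_moment_order=1,
        # std=True, median=True, artifact_threshold=500.0 and everything else
        # disabled, feature_size == 1*2 + 0*2 + 2 + 0 + 2 + 0 + 0 + 1 == 7.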
# initialize the actual feature_index
feature_index = 0
# for every channel...
# TODO: Try to use the methods on the whole data array
for index in range(len(x.channel_names)):
current_channel = data[:, index]
            # in these cases it pays off to compute and store intermediate
            # results because we will need them again
if self.raw_moment_order > 0 or self.central_moment_order > 1 or \
self.std or self.quadratic_mean:
average = numpy.mean(current_channel)
            if self.raw_moment_order > 1 or self.quadratic_mean:
                # second raw moment: the mean of the squared values
                # (i.e., the sum of squares divided by the number of samples)
                second_raw_moment = numpy.mean(
                    numpy.asarray(current_channel, numpy.float64) ** 2)
if self.raw_moment_order > 0: # raw_moment_of_1_th_order needed?
# it's the mean, so don't compute it again
statistical_features[feature_index] = average
feature_index += 1 # update feature_index
if self.raw_moment_order > 1: # raw_moment_2nd_order needed?
# we have already computed it
statistical_features[feature_index] = second_raw_moment
feature_index += 1
# for the other orders of raw_moments
for order in range(3, self.raw_moment_order+1):
statistical_features[feature_index] = \
self.raw_moment(current_channel, order)
feature_index += 1
# central_moment
for order in range(2, self.central_moment_order+1):
statistical_features[feature_index] = \
scipy.stats.moment(current_channel, order)
feature_index += 1
# standard_deviation
            if self.std:
                # honor a user-supplied function string, e.g. "numpy.std"
                std_func = numpy.std if self.std_function is None \
                    else eval(self.std_function)
                statistical_features[feature_index] = std_func(current_channel)
                feature_index += 1
# quadratic_mean
if self.quadratic_mean:
# we stored relevant results before
statistical_features[feature_index] = second_raw_moment ** 0.5
feature_index += 1
# median
if self.median:
statistical_features[feature_index] = \
numpy.median(current_channel)
feature_index += 1
# minimum
if self.minimum:
statistical_features[feature_index] = \
numpy.amin(current_channel)
feature_index += 1
# maximum
if self.maximum:
statistical_features[feature_index] = \
numpy.amax(current_channel)
feature_index += 1
            if self.artifact_threshold is not None:
                statistical_features[-1] += self.amplitudes_above(
                    current_channel, self.artifact_threshold)
# if feature_names have to be determined
if self.feature_names is None:
# initialize data structure
self.feature_names = []
for name in x.channel_names:
# raw_moment
for order in range(1, self.raw_moment_order+1):
self.feature_names.append(
"RAW_MOMENT_%d_%s" % (order, name))
# central_moment
for order in range(2, self.central_moment_order+1):
self.feature_names.append(
"CENTRAL_MOMENT_%d_%s" % (order, name))
# standard_deviation
if self.std:
self.feature_names.append("STD_%s" % (name))
# quadratic_mean
if self.quadratic_mean:
                    self.feature_names.append("QUAD_MEAN_%s" % (name))
# median
if self.median:
self.feature_names.append("MEDIAN_%s" % (name))
# minimum
if self.minimum:
self.feature_names.append("MIN_%s" % (name))
# maximum
if self.maximum:
self.feature_names.append("MAX_%s" % (name))
            if self.artifact_threshold is not None:
self.feature_names.append(
"AMP_ABOVE_%.2f" % (self.artifact_threshold))
feature_vector = FeatureVector(numpy.atleast_2d(statistical_features),
self.feature_names)
return feature_vector
class PearsonCorrelationFeatureNode(BaseNode):
""" Compute pearson correlation of all pairs of channels
This node computes for all pairs of channels the Pearson product-moment
correlation coefficient of certain time segments and
returns each of correlation coefficient as feature.
    .. todo:: Calculate the maximum number of segments and catch wrong usage.
**Parameters**
:segments:
            The number of segments into which the time series window is split.
(*optional, default: 1*)
:max_segment_shift:
            If 0, only corresponding segments of the two channels are
            compared. For a value of n, each segment is additionally compared
            with the n, n-1, ..., 1 preceding and following segments of the
            other channel.
(*optional, default: 0*)
**Exemplary Call**
.. code-block:: yaml
-
node : Pearson_Correlation_Features
parameters :
segments : 3
max_segment_shift : 2
:Author: Jan Hendrik Metzen (jhm@informatik.uni-bremen.de)
:Created: 2009/03/18
"""
    def __init__(self, segments=1, max_segment_shift=0, *args, **kwargs):
super(PearsonCorrelationFeatureNode, self).__init__(*args, **kwargs)
assert(max_segment_shift < segments)
self.set_permanent_attributes(segments=segments,
segment_border_indices=None,
max_segment_shift=max_segment_shift)
    def _execute(self, x):
# Compute the indices of the segment borders lazily when the data is
# known
        if self.segment_border_indices is None:
            datapoints = x.shape[0]
            # integer division keeps the borders usable as slice indices
            borders = [k * datapoints // (self.segments + 1)
                       for k in range(0, self.segments + 2)]
self.segment_border_indices = [(borders[i], borders[i + 2])
for i in range(self.segments)]
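            # For example (a sketch): with datapoints == 120 and segments == 3,
            # borders == [0, 30, 60, 90, 120] and segment_border_indices ==
            # [(0, 60), (30, 90), (60, 120)], i.e. overlapping segments of
            # twice the base length.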
data = x.view(numpy.ndarray)
features = []
feature_names = []
# Iterate over all segment combinations:
for segment_index_channel1 in range(self.segments):
segment_borders1 = \
self.segment_border_indices[segment_index_channel1]
            for segment_index_channel2 in range(
                    max(0, segment_index_channel1 - self.max_segment_shift),
                    min(self.segments,
                        segment_index_channel1 + self.max_segment_shift + 1)):
segment_borders2 = \
self.segment_border_indices[segment_index_channel2]
# Iterate over all channel pairs
for i, channel1_name in enumerate(x.channel_names):
channel1_index = x.channel_names.index(channel1_name)
for channel2_name in x.channel_names[i+1:]:
channel2_index = x.channel_names.index(channel2_name)
# Get segments whose correlation should be computed
segment1 = data[segment_borders1[0]:segment_borders1[1],
channel1_index]
segment2 = data[segment_borders2[0]:segment_borders2[1],
channel2_index]
# Bring segments to the same shape
if segment1.shape[0] != segment2.shape[0]:
min_shape = min(segment1.shape[0],
segment2.shape[0])
segment1 = segment1[0:min_shape]
segment2 = segment2[0:min_shape]
# Compute the pearson correlation of the two segments
                        correlation = numpy.corrcoef(segment1, segment2)[0, 1]
features.append(correlation)
feature_names.append("Correlation_%s_%s_%ssec_%ssec_%s"
% (channel1_name, channel2_name,
segment_borders1[0] / x.sampling_frequency,
segment_borders1[1] / x.sampling_frequency,
segment_index_channel2 ))
feature_vector = \
FeatureVector(numpy.atleast_2d(features).astype(numpy.float64),
feature_names)
return feature_vector
class ClassAverageCorrelationFeatureNode(BaseNode):
""" Compute pearson correlation between channel segments and class averages
This node computes for all channels the Pearson product-moment
correlation coefficient between certain time segments and the class
averages. Then each of the correlation coefficients is returned as
feature.
**Parameters**
:segments:
            The number of segments into which the time series window is split.
(*optional, default: 1*)
**Exemplary Call**
.. code-block:: yaml
-
node : Class_Average_Correlation_Features
parameters :
segments : 3
:Author: Jan Hendrik Metzen (jhm@informatik.uni-bremen.de)
:Created: 2009/03/18
"""
    def __init__(self, segments=1, *args, **kwargs):
super(ClassAverageCorrelationFeatureNode, self).__init__(*args,
**kwargs)
self.set_permanent_attributes(segments=segments,
segment_border_indices=None,
class_averages=dict(),
class_examples=dict())
    def is_trainable(self):
""" Returns whether this node is trainable """
return True
    def is_supervised(self):
        """ Returns whether this node requires supervised training """
        return True
    def _execute(self, x):
# Compute the indices of the segment borders lazily when the data is
# known
        if self.segment_border_indices is None:
            datapoints = x.shape[0]
            # integer division keeps the borders usable as slice indices
            borders = [k * datapoints // (self.segments + 1)
                       for k in range(0, self.segments + 2)]
self.segment_border_indices = [(borders[i], borders[i + 2])
for i in range(self.segments)]
features = []
feature_names = []
# Iterate over all segments:
for segment_borders in self.segment_border_indices:
# Iterate over all channels
for channel_name in x.channel_names:
channel_index = x.channel_names.index(channel_name)
# Correlation of the channel to the class average
for label in self.class_averages.keys():
channel_seg_avg = self.class_averages[label] \
[segment_borders[0]:segment_borders[1], channel_index]
sample_seq = \
x[segment_borders[0]:segment_borders[1], channel_index]
                    # corrcoef is symmetric, so [0, 1] equals [1, 0]
                    correlation = numpy.corrcoef(channel_seg_avg,
                                                 sample_seq)[0, 1]
                    features.append(correlation)
feature_names.append("Pearson_%s_Class%s_%ssec_%ssec"
% (channel_name,
label,
segment_borders[0] / x.sampling_frequency,
segment_borders[1] / x.sampling_frequency))
feature_vector = \
FeatureVector(numpy.atleast_2d(features).astype(numpy.float64),
feature_names)
return feature_vector
    def _train(self, x, label):
# Accumulate the examples for each class
if label not in self.class_averages.keys():
self.class_averages[label] = copy.deepcopy(x)
self.class_examples[label] = 1
else:
self.class_averages[label] += x
self.class_examples[label] += 1
    def _stop_training(self):
for label in self.class_averages.keys():
# print "Examples for class %s: %s"
# % (label, self.class_examples[label])
self.class_averages[label] /= self.class_examples[label]
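            # Sketch of the scheme: after _train has accumulated the class
            # examples x_1, ..., x_n, class_averages[label] holds their sum
            # and class_examples[label] == n, so the division above turns the
            # sum into the class mean that _execute correlates against.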
class CoherenceFeatureNode(BaseNode):
""" Compute pairwise coherence of two channels with *matplotlib.mlab.cohere*
**Parameters**
        :frequency_band:
            The frequency band which is used to extract features from the
            coherence estimate. If None, the whole frequency band is used.

            (*optional, default: None*)

        :frequency_resolution:
            The desired frequency resolution in Hz; it determines the NFFT
            parameter passed to *matplotlib.mlab.cohere*. If not given, a
            default of 1 is used and a warning is issued.

            (*optional, default: None*)
**Exemplary Call**
.. code-block:: yaml
-
node : Coherence_Features
parameters :
frequency_band : [0.4,3.5]
:Author: Jan Hendrik Metzen (jhm@informatik.uni-bremen.de)
:Created: 2009/03/18
"""
    def __init__(self, frequency_band=None, frequency_resolution=None,
*args, **kwargs):
super(CoherenceFeatureNode, self).__init__(*args, **kwargs)
        if frequency_band is not None:
min_frequency, max_frequency = frequency_band
else:
min_frequency, max_frequency = (-numpy.inf, numpy.inf)
if frequency_resolution is None:
frequency_resolution = 1
            warnings.warn("No initial frequency resolution set. Default "
                          "value of 1 used!")
self.set_permanent_attributes(min_frequency = min_frequency,
max_frequency = max_frequency,
frequency_resolution = frequency_resolution)
    def _execute(self, x):
# Lazy computation of NFFT and noverlap
if not hasattr(self, "NFFT"):
# Compute NFFT to obtain the desired frequency resolution
# (if possible)
# self.NFFT has to be even
self.NFFT = int(round(0.5 * x.sampling_frequency / \
self.frequency_resolution) * 2)
self.noverlap = 0
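            # Example (a sketch): with x.sampling_frequency == 128.0 and
            # frequency_resolution == 2.0, NFFT == int(round(0.5 * 128 / 2) * 2)
            # == 64, and mlab.cohere then yields a frequency spacing of
            # Fs / NFFT == 2 Hz.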
        # For each pair of channels, we compute the coherence
features = []
feature_names = []
for i, channel_name1 in enumerate(x.channel_names):
for j, channel_name2 in enumerate(x.channel_names[i + 1:]):
(Cxy, freqs) = mlab.cohere(x[:, i], x[:, i + 1 + j],
Fs = x.sampling_frequency,
NFFT = self.NFFT,
noverlap = self.noverlap)
# TODO: This would be more efficient without the explicit loop
for index1, freq in enumerate(freqs):
if not (self.min_frequency <= freq <= self.max_frequency):
continue
# Append as feature
features.append(Cxy[index1])
feature_names.append("Coherence_%s_%s_%.2fHz" %
(channel_name1, channel_name2,
freq))
feature_vector = \
FeatureVector(numpy.atleast_2d(features).astype(numpy.float64),
feature_names)
return feature_vector
    # UNCOMMENT FOR GRAPHICAL ANALYSIS
# def _train(self, x, label):
# #if label != "Target": return
# print label
#
# channel_name1 = 'CP2'
# channel_index1 = x.channel_names.index(channel_name1)
# channel_name2 = 'Pz'
# channel_index2 = x.channel_names.index(channel_name2)
#
# import pylab
# Cxy, Phase, freqs = pylab.mlab.cohere_pairs(x, [(channel_index1, channel_index2)],
# Fs = x.sampling_frequency,
# NFFT = self.NFFT,
# noverlap = self.noverlap)
#
# pylab.subplot(411)
# pylab.plot(freqs, Cxy[(channel_index1, channel_index2)])
# pylab.subplot(412)
# pylab.plot(freqs, Phase[(channel_index1, channel_index2)])
#
#
# pylab.subplot(421)
# pylab.plot(x[:, channel_index1])
#
# pylab.subplot(422)
# pylab.plot(x[:, channel_index2])
#
# pylab.show()
# raw_input()
# pylab.gca().clear()
_NODE_MAPPING = {"Pearson_Correlation_Features": PearsonCorrelationFeatureNode,
"Class_Average_Correlation_Features": ClassAverageCorrelationFeatureNode,
"Coherence_Features": CoherenceFeatureNode,
"Statistical_Features" : StatisticalFeatureNode}