""" 2d array of channels x time series for windowed time series"""
import numpy
import warnings
from pySPACE.resources.data_types import base
[docs]class TimeSeries(base.BaseData):
""" Time Series object
Represents a finite length time series consisting (potentially) of
several channels. Objects of this type are called "windows",
"epochs", or "trials" in other contexts.
Normally one channel corresponds to one sensor.
The time series object is a
2d array of channels times time series amplitudes
(*mandatory* first argument in constructor)
with some additional properties.
The additional properties are:
* channel_names (*mandatory* second argument in constructor,
list of stings without underscores)
* sampling_frequency (*mandatory* third argument in constructor,
e.g., 5000.0 for 5kHz)
* start_time (*optional*)
* end_time (*optional*)
* marker_name (the name of the marker used to create this object,
dictionary of included marker names and time stamps,
*optional*)
* name & tag (text format of object meta info, *optional*)
Channels can also be pseudo channels after spatial filtering.
When creating a TimSeries object, first the array has to be given to
the init function and then the other parameters/properties as
keyword arguments.
The array can be specified as two dimensional numpy array or in
list notation. The channels are on the second axes.
For example using the list ``[[1,2,3],[4,5,6]]`` would result in three
channels and two time points.
For accessing the array only without the meta information,
please use the command
.. code-block:: python
x = data.view(numpy.ndarray)
which hides this information.
TimeSeries objects are normally organized/collected in a
:class:`~pySPACE.resources.dataset_defs.time_series.TimeSeriesDataset`.
This type of dataset can be also used to generate the objects,
e.g., from csv files.
For data access in a node chain, data is loaded with a node from the
:mod:`~pySPACE.missions.nodes.source.time_series_source` module
as first node
and saved with the
:class:`~pySPACE.missions.nodes.sink.time_series_sink.TimeSeriesSinkNode`
as the last node.
It is also possible to create time series data from
not segmented data streams as described in the
:class:`~pySPACE.resources.dataset_defs.stream.StreamDataset`.
:Author: Jan Hendrik Metzen (jhm@informatik.uni-bremen.de)
:Created: 2008/03/05
:Completely Refactored: 2008/08/18
:BaseData compatibility: David Feess, 2010/09/27
"""
[docs] def __new__(subtype, input_array, channel_names, sampling_frequency,
start_time=None, end_time=None, name=None,
marker_name=None, tag=None):
if type(input_array) == dict:
data = []
for channel in channel_names:
data.append(input_array[channel])
input_array = numpy.array(data)
# Input array is an already formed ndarray instance
# We first cast to be our class type
obj = base.BaseData.__new__(subtype, numpy.atleast_2d(input_array))
if obj.ndim > 2:
input_array = obj[0]
obj = base.BaseData.__new__(subtype, input_array)
warnings.warn("To many dimensions for Time Series Object!")
# add subclasses attributes to the created instance
obj.channel_names = channel_names
try:
assert(len(channel_names) == obj.shape[1]),\
"Channel names (%s) do not match array dimensions (%s)! Fix this!" \
% (str(channel_names), str(obj.shape))
except:
warnings.warn(
"Array dimensions (%s) do not match channel names (len: %i, names: %s)! Fix this!"
% (str(obj.shape), len(channel_names), str(channel_names)))
obj.sampling_frequency = float(sampling_frequency)
obj.start_time = start_time
obj.end_time = end_time
obj.name = name
obj.marker_name = marker_name
if not tag is None:
obj.tag = tag
# Finally, we must return the newly created object:
return obj
[docs] def __array_finalize__(self, obj):
super(TimeSeries, self).__array_finalize__(obj)
if not obj is None and not type(obj)==numpy.ndarray:
self.channel_names_hash = getattr(obj, 'channel_names_hash', None)
self.sampling_frequency = getattr(obj, 'sampling_frequency', None)
self.start_time = getattr(obj, 'start_time', None)
self.end_time = getattr(obj, 'end_time', None)
self.name = getattr(obj, 'name', None)
self.marker_name = getattr(obj, 'marker_name', None)
else:
# TODO: do we need this part or the other one?
self.channel_names_hash = None
self.sampling_frequency = None
self.start_time = None
self.end_time = None
self.name = None
self.marker_name = None
[docs] def __reduce__(self):
# Refer to
# http://www.mail-archive.com/numpy-discussion@scipy.org/msg02446.html
# for infos about pickling ndarray subclasses
object_state = list(super(TimeSeries, self).__reduce__())
subclass_state = (self.channel_names, self.sampling_frequency,
self.start_time, self.end_time, self.name,
self.marker_name)
object_state[2].append(subclass_state)
object_state[2] = tuple(object_state[2])
return tuple(object_state)
[docs] def __setstate__(self, state):
if len(state) == 2: # For compatibility with old TS implementation
nd_state, own_state = state
numpy.ndarray.__setstate__(self, nd_state)
else: # len == 3: new BaseData timeseries.
nd_state, base_state, own_state = state
super(TimeSeries, self).__setstate__((nd_state, base_state))
(self.channel_names, self.sampling_frequency, self.start_time,
self.end_time, self.name, self.marker_name) = own_state
@staticmethod
[docs] def _generate_tag(obj):
"""generate new tag based on time series attributes start_time,
end_time and name. The name is usually a sentence, with the last word
indicating the class. """
# if no information present: return None
if getattr(obj, 'name', None) == None and \
getattr(obj, 'start_time', None) == None and \
getattr(obj, 'end_time', None) == None:
return None
else:
# If attribute name is provided, the last word should represent class:
if getattr(obj, 'name', None) == None:
class_name = 'na'
else:
class_name = obj.name.split(' ')[-1]
if getattr(obj, 'start_time', None) == None:
start = 'na'
else:
start = str(int(obj.start_time))
if getattr(obj, 'end_time', None) == None:
end = 'na'
else:
end = str(int(obj.end_time))
return 'Epoch Start: %sms; End: %sms; Class: %s' % \
(start, end, class_name)
# In order to reduce the memory footprint, we do not store the channel
# names once per instance but only once per occurence. Instead we store a
# unique hash once per instance that allows to retrieve the channel names
channel_names_dict = {}
[docs] def get_channel_names(self):
return TimeSeries.channel_names_dict[self.channel_names_hash]
[docs] def set_channel_names(self, channel_names):
self.channel_names_hash = hash(str(channel_names))
if not TimeSeries.channel_names_dict.has_key(self.channel_names_hash):
TimeSeries.channel_names_dict[self.channel_names_hash] = channel_names
[docs] def del_channel_names(self):
pass
channel_names = property(get_channel_names, set_channel_names,
del_channel_names,
"The channel_names property.")
@staticmethod
[docs] def replace_data(old, data, **kwargs):
""" Create a new time series with the given data but the old metadata.
A factory method which creates a time series object with the given
data and the metadata from the old time_series
"""
data = TimeSeries(data,
channel_names=kwargs.get('channel_names',
old.channel_names),
sampling_frequency=kwargs.get('sampling_frequency',
old.sampling_frequency),
start_time=kwargs.get('start_time', old.start_time),
end_time=kwargs.get('end_time', old.end_time),
name=kwargs.get('name', old.name),
marker_name=kwargs.get('marker_name', old.marker_name))
data.inherit_meta_from(old)
if "tag" in kwargs.keys():
data.tag=kwargs["tag"]
return data
[docs] def get_channel(self, channel_name):
""" Return the values of the channel with name *channel_name* """
channel_index = self.channel_names.index(channel_name)
data = self.view(numpy.ndarray)
return data[:, channel_index]
[docs] def reorder(self, ordered_channel_list):
""" Reorder TimeSeries according to ordered_channel_list
This function takes the list given as argument as list of channel names,
orders the given TimeSeries object according to this list and returns a
reordered TimeSeries object.
"""
for elem in ordered_channel_list:
assert elem in self.channel_names, \
"TimeSeries:: Reordering impossible. %s is not present in original data!" % elem
current_pos=ordered_channel_list.index(elem)
#if True, the lines are swapped
if current_pos is not self.channel_names.index(elem):
old=self[:, current_pos]
self[:, current_pos]=self[:, self.channel_names.index(elem)]
self[:, self.channel_names.index(elem)]=old
self.channel_names=ordered_channel_list
[docs] def _ms_to_samples(self, ms):
return ms/1000.0*self.sampling_frequency
[docs] def _samples_to_ms(self, samples):
return samples/float(self.sampling_frequency)*1000
[docs] def __str__(self):
str_repr = "TimeSeriesObject \nChannel_names: "
str_repr+= str(self.channel_names)
str_repr+= "\n"
# str_repr+=str(self.view(numpy.ndarray))
va = self.view(numpy.ndarray)
for index, channel_name in enumerate(self.channel_names):
str_repr += "%s : %s \n" % (channel_name, va[:,index])
str_repr+="\n"
return str_repr
[docs] def __eq__(self,other):
""" Same channels (names) and values """
if not type(self) == type(other):
return False
if not set(self.channel_names) == set(other.channel_names):
return False
if not self.shape == other.shape:
return False
if self.channel_names == other.channel_names:
return numpy.allclose(self.view(numpy.ndarray), other.view(numpy.ndarray))
else:
# Comparison by hand
for channel in self.channel_names:
if not numpy.allclose((self[self.channel_names.index(channel)],
other[other.channel_names.index(channel)])):
return False
return True