""" Visualize data in time-amplitude or time-frequency representations
This module contains nodes that can be used to visualize
the data as time series. All these nodes use the functionality
of the VisualizationBaseNode.
import numpy
import math
import pylab
#import itertools
#from collections import defaultdict
import warnings
import os
from matplotlib import mlab
#import logging
#from pySPACE.tools.memoize_generator import MemoizeGenerator
from pySPACE.missions.nodes.visualization.base import VisualizationBase
from pySPACE.tools.filesystem import create_directory
from pySPACE.resources.data_types.time_series import TimeSeries
from pySPACE.resources.dataset_defs.stream import StreamDataset
#delete me after conversion to vis supernode
from pySPACE.missions.nodes.base_node import BaseNode
[docs]class TimeSeriesPlotNode(VisualizationBase):
"""A node that allows to monitor the processing of time series
This node plots the time series data either in one column for all channels
or for a single selected channel. The node inherits the
functionality of the VisualizationBase.
See documentation of :mod:`VisualizationBase <pySPACE.missions.nodes.visualization.base>`
to view basic functionality.
If not specified differently using the parameters below, the data is plotted
in one matrix for each class (using pylab.matshow).
If one channel_name is given, only information about
this channel is plotted. If more channels are specified,
they are plotted separately in one subplot
(forces separate_channels to True).
(*optional, default: None*)
Each channel gets a separate subplot (amplitude vs time),
either arranged in rows and columns or arranged according to
position on the head (see parameter physiological_arrangement in base class).
(*optional, default: False*)
If this is True, the created plot shows the difference between the two classes.
This option only works if 2 classes are present at the same time!
(*optional, default: False*)
.. image:: ../../graphics/time_series_plot.png
:width: 1024
**Exemplary Call**
.. code-block:: yaml
node : Time_Series_Plot
parameters :
averaging : True
online : True
separate_channels: True
:Author: Sirko Straube (sirko.straube@dfki.de)
:Date of Last Revision: 2012/12/21
input_types = ["TimeSeries"]
# Constructor of the time series plot node.
# Forwards **kwargs to the base-class constructor and switches
# separate_channels on when more than one channel name is given.
[docs] def __init__(self, channel_names=None, separate_channels = False, class_difference=False, **kwargs):
super(TimeSeriesPlotNode, self).__init__(**kwargs)
#if the user specified more than one channel
if channel_names and len(channel_names)>1:
separate_channels = True
# NOTE(review): the three lines below read as keyword arguments of a call
# whose opening line is not present in this extract. channel_names and
# class_difference are not stored anywhere in the visible lines, although
# other methods read self.channel_names and self.class_difference.
separate_channels = separate_channels,
min_value = pylab.inf,
max_value = -pylab.inf,
# Resets the node; the only visible statement delegates to the
# base-class reset().
[docs] def reset(self):
# NOTE(review): the next line is docstring text; its delimiters and the
# end of the sentence are missing from this extract.
Reset the state of the object to the clean state it had after its
super(TimeSeriesPlotNode, self).reset()
# Plots the TimeSeries objects in `values` (a dict keyed by class label)
# into figure `fig_num` and, when a store directory is given, saves the
# figure as "<store_dir>/<plot_label>.png".
#
# NOTE(review): this extract has lost its indentation and several lines
# (else branches, closing parentheses, statement bodies). The notes below
# mark the visible gaps; they do not reconstruct the missing code.
[docs] def _plotValues(self,
values, #dict TimeSeries values
plot_label, #str Plot-Label
fig_num, #int Figure-number for classify plots
# 1: average,
# 2: single trial,
# 3: average accumulating
store_dir = None, #str Directory to store the plots
counter=0): #int Plotcounter for all trials
if self.class_difference:
#we have to compute the difference
# NOTE(review): indexing the result of values.keys() below relies on
# Python 2, where keys() returns a list.
list_of_classes = values.keys()
num_of_classes = len(list_of_classes)
if num_of_classes != 2:
# NOTE(review): no early exit is visible after this warning - confirm
# against the complete source how the != 2 case is handled.
warnings.warn("TimeSeriesPlot:: Difference plots only possible for two classes Plotting ignored!")
data1 = values[list_of_classes[0]].view(numpy.ndarray)
data2 = values[list_of_classes[1]].view(numpy.ndarray)
#construct new TimeSeries with differences
# NOTE(review): the remaining arguments and the closing parenthesis of the
# TimeSeries(...) call are missing from this extract.
data = TimeSeries(data1 - data2,
#overwrite old incoming values
# NOTE(review): the statement that overwrites `values` is not visible here.
sampling_frequency = values[values.keys()[0]].sampling_frequency
list_of_classes = values.keys()
num_of_classes = len(list_of_classes)
#computing time points to show
# One time point per sample, spaced 1000/sampling_frequency apart and
# shifted by self.timeshift (presumably milliseconds - the matrix plot
# labels its x axis "time (ms)").
num_tpoints = values.values()[0].shape[0]
all_tpoints = numpy.arange(0,
num_tpoints * (1000/sampling_frequency),
1000 / sampling_frequency) + self.timeshift
# NOTE(review): the closing parenthesis of subplots_adjust(...) is missing
# from this extract.
pylab.subplots_adjust(left = 0.05, # the left side of the subplots of the figure
right = 0.95, # the right side of the subplots of the figure
bottom = 0.05, # the bottom of the subplots of the figure
top = 0.95, # the top of the subplots of the figure
wspace = 0.15, # the amount of width reserved for blank space between subplots
hspace = 0.1, # the amount of height reserved for white space between subplots
f=pylab.figure(fig_num, figsize = (18,13))
#assure that figure is displayed with an interactive backend
# NOTE(review): the body of this `if` is missing from this extract.
if pylab.get_backend() in pylab.matplotlib.backends.interactive_bk:
#plot all channels separately in one figure
if self.separate_channels:
self._plot_all_channels_separated(all_tpoints, values)
# NOTE(review): an `else:` presumably introduced the per-class loop below;
# it is not visible here.
for index, class_label in enumerate(list_of_classes):
data = values[class_label].view(numpy.ndarray)
# One subplot column per class label.
axis = pylab.subplot(1, num_of_classes, index + 1)
#plot just 1 channel
if self.channel_names != None and len(self.channel_names)==1:
assert (self.channel_names[0] in values[class_label].channel_names),\
"TimeSeriesPlot::Channel requested for plotting is not available in data"
channel_index = values[class_label].channel_names.index(self.channel_names[0])
# Update minimal and maximal value
self.min_value = min(self.min_value,
min(data[:, channel_index].flatten()))
self.max_value = max(self.max_value,
max(data[:, channel_index].flatten()))
title = pylab.title(str(self.channel_names) + ' ' + class_label)
pylab.plot(all_tpoints, data[:, channel_index])
#plot all channels as matrix
# NOTE(review): the `else:` separating the single-channel plot from the
# matrix plot is not visible here.
self._plot_all_channels(data, all_tpoints, values[class_label].channel_names)
title = pylab.title(class_label)
# Draw or store the figure
# NOTE(review): the body of `if store_dir is None:` and the following
# `else:` are missing; `current_split` is not assigned in the visible lines.
if store_dir is None:
if current_split != 0 and not\
plot_label.endswith('_split_' + str(current_split)): #more than one split and first call
plot_label = plot_label + '_split_' + str(current_split)
f_name=str(store_dir) + str(os.sep) + str(plot_label)
pylab.savefig(f_name + ".png")
# Shows `data` transposed as a color matrix via pylab.matshow, labels the
# x ticks with the formatted `tpoints` and names the x axis "time (ms)".
# `channel_names` is not used in the visible lines.
[docs] def _plot_all_channels(self, data, tpoints, channel_names):
if not self.class_difference:
# Normalize the data to be plotted so that all values are
# between 0 and 1. This is useful since the same color
# in the two plots corresponds to the same value
# NOTE(review): a constant `data` makes max_value - min_value zero in the
# division below.
min_value = min(data.ravel())
max_value = max(data.ravel())
plot_values = (data - min_value) / (max_value - min_value)
# NOTE(review): no assignment of plot_values for the class_difference case
# is visible; an `else:` branch appears to be missing from this extract.
im = pylab.matshow(plot_values.T, fignum = False, aspect = 4.0)
ax = pylab.gca()
#ax.set_xticks(pylab.linspace(0.0, plot_values.shape[0], 20))
# Tick labels are the time points formatted with two decimals.
ax.set_xticklabels(map(lambda s: "%.2f" %s,tpoints))
labels = ax.get_xticklabels()
pylab.setp(labels, rotation=-45)
pylab.xlabel("time (ms)")
# Plots each channel of every class in `values` into its own subplot of
# figure `f`, either on a grid with at most 8 columns or, when
# self.physiological_arrangement is set, at positions derived from the 2D
# projected electrode coordinates.
#
# NOTE(review): `f`, `number_of_channels`, `channel_names` and `skip_plot`
# are used below but not assigned in the visible lines; their definitions
# are missing from this extract.
[docs] def _plot_all_channels_separated(self, tpoints, values):
""" This function generates time series plot separated for each channel
#def _generate_time_series_plot(self, label, data):
# in case of [phys_arrangement], write the figure title only once
# and large in the upper left corner of the plot. this fails whenever
# there are no more channels in that area, as the plot gets cropped
# if self.physiological_arrangement:
# h, l = pylab.gca().get_legend_handles_labels()
# prop = pylab.matplotlib.font_manager.FontProperties(size='xx-large')
# f.legend(h, l, prop=prop, loc=1)
# text_x = .4
# text_y = .4
# #if self.shrink_plots: text_y = 1.2
# f.text(text_x, text_y, 'Channel-wise time series\n' +
# samples_per_condition_string,
# ha='center', color='black', size=32)
for index, class_label in enumerate(values.keys()):
data = values[class_label].view(numpy.ndarray)
# One color per class; the color string holds 7 entries, so an index of 7
# or more raises IndexError.
line_color = 'bgrcmyk'[index]
# Compute number of rows and cols for subplot-arrangement:
# use 8 as upper limit for cols and compute rows accordingly
# NOTE(review): the `else:` between the two nr_of_cols assignments is
# missing from this extract.
if number_of_channels <= 8:
nr_of_cols = number_of_channels
nr_of_cols = 8
# NOTE(review): `/` must be integer division here (Python 2 semantics) for
# nr_of_rows to be usable as a subplot count.
nr_of_rows = (number_of_channels - 1) / 8 + 1
# Set canvas size in inches. These values turned out fine, depending
# on [physiological_arrangement]
#if not self.physiological_arrangement:
# f.set_size_inches((5 * nr_of_cols, 3 * nr_of_rows))
#if not self.shrink_plots:
# figure.set_size_inches((3*11.7, 3*8.3))
# f.set_size_inches((4*11.7, 4*8.3))
# Use the electrode coordinates from the metadata, falling back to the
# defaults of StreamDataset, and project them to 2D.
ec = self.get_metadata("electrode_coordinates")
if ec is None:
ec = StreamDataset.ec
ec_2d = StreamDataset.project2d(ec)
# plot everything channel-wise
for channel_index in range(number_of_channels):
channel_name = channel_names[channel_index]
# NOTE(review): the body of the next `if` (presumably the assignment of
# skip_plot) is missing from this extract.
if self.channel_names and (channel_name not in self.channel_names):
if not skip_plot:
f.add_subplot(nr_of_rows, nr_of_cols, channel_index + 1)
# actual plotting of the data. This can always be done
# NOTE(review): the remaining arguments and the closing parenthesis of
# pylab.plot(...) are missing from this extract.
pylab.plot(tpoints, data[:, channel_index],
channel_name = channel_names[channel_index]
if self.physiological_arrangement:
x, y = ec_2d[channel_name]
w = .05
h = .045
# Map the projected coordinates into figure-relative positions; the
# (v + 110) / 220 form presumably assumes coordinates within
# [-110, 110] - TODO confirm.
pylab.gca().set_position([(x + 110) / 220, (y + 110) / 220, w, h])
[docs]class SpectrumPlotNode(VisualizationBase):
""" Construct spectrogram of the data using FFT
This node uses the data for a power-spectral density (psd) computation
plotted as time-frequency representation. The core function here is specgram
from matplotlib.mlab. The node inherits the
functionality of the VisualizationBase.
See documentation of :mod:`VisualizationBase <pySPACE.missions.nodes.visualization.base>`
to view basic functionality.
If not specified differently using the parameters below, all channels
are plotted in physiological arrangement separately for each class.
.. note::
If you average data, then the spectrogram is always computed on
the average. Currently, the averaging of the power values is not supported.
All classes are plotted directly in one figure,
if only one channel_name is specified (e.g., 'Pz').
If more channels are specified, you get a plot of all channels
for one class, i.e. for two classes two plots are returned.
The default is that all channels are displayed which the user can
explicitly specify using the keyword 'all'.
(*optional, default: None*)
Determine if the colorbar should be displayed.
Currently, the colorbar is switched off when parameter
physiological_arrangement is True.
(*optional, default: False*)
The number of data points used in each block
for the FFT. Must be even; a power of 2 is most efficient.
(*optional, default: 128*)
The number of points of overlap between blocks of the FFT.
(*optional, default: 0*)
**Exemplary Call**
.. code-block:: yaml
node : Spectrum_Plot
parameters :
averaging : True
online : True
:Author: Sirko Straube (sirko.straube@dfki.de)
:Date of Last Revision: 2013/01/18
input_types = ["TimeSeries"]
# Constructor of the spectrum plot node; forwards **kwargs to the
# base-class constructor.
#
# NOTE(review): the signature is truncated in this extract (channel_names
# and **kwargs are used below but not declared in the visible lines), both
# branches on channel_names have lost their bodies, and the opening line
# of the call that the trailing keyword arguments belong to is missing.
[docs] def __init__(self,
colorbar = False,
super(SpectrumPlotNode, self).__init__(**kwargs)
if channel_names=='all':
elif type(channel_names) == str:
min_value = pylab.inf,
max_value = -pylab.inf)
# Plots spectrograms of the TimeSeries objects in `values` (a dict keyed
# by class label). With exactly one requested channel all classes share
# one figure; otherwise one figure per class is produced with one subplot
# per channel. Figures are saved as PNG files below `store_dir`.
#
# NOTE(review): this extract has lost its indentation and several lines
# (else branches, closing parentheses, statement bodies). The notes below
# mark the visible gaps; they do not reconstruct the missing code.
[docs] def _plotValues(self,
values, #dict TimeSeries values
plot_label, #str Plot-Label
fig_num, #int Figure-number for classify plots
# 1: average,
# 2: single trial,
# 3: average accumulating
store_dir = None, #str Directory to store the plots
counter=0): #int Plotcounter for all trials
# NOTE(review): indexing values.keys() relies on Python 2, where keys()
# returns a list.
sampling_frequency = values[values.keys()[0]].sampling_frequency
list_of_classes = values.keys()
num_of_classes = len(list_of_classes)
#computing time points to show
#num_tpoints = values.values()[0].shape[0]
# NOTE(review): the closing parenthesis of subplots_adjust(...) is missing
# from this extract.
pylab.subplots_adjust(left = 0.05, # the left side of the subplots of the figure
right = 0.95, # the right side of the subplots of the figure
bottom = 0.05, # the bottom of the subplots of the figure
top = 0.95, # the top of the subplots of the figure
wspace = 0.15, # the amount of width reserved for blank space between subplots
hspace = 0.1, # the amount of height reserved for white space between subplots
#plot all channels separately in one figure
#plot just 1 channel
if self.channel_names != None and len(self.channel_names)==1:
f=pylab.figure(fig_num, figsize = (18,13))
#assure that figure is displayed with an interactive backend
# NOTE(review): the body of this `if` is missing from this extract.
if pylab.get_backend() in pylab.matplotlib.backends.interactive_bk:
for index, class_label in enumerate(list_of_classes):
assert (self.channel_names[0] in values[class_label].channel_names),\
"SpectrumPlot::Channel requested for plotting is not available in data"
channel_index = values[class_label].channel_names.index(self.channel_names[0])
#operations like splicing only on view of object
data = values[class_label].view(numpy.ndarray)
# One subplot column per class label.
pylab.subplot(1, num_of_classes, index + 1)
title = pylab.title(str(self.channel_names) + ' ' + class_label)
self._plot_spectrum(data[:,channel_index], sampling_frequency)
# Draw or store the figure
# NOTE(review): the body of `if store_dir is None:` and the following
# `else:` are missing; `current_split` is not assigned in the visible lines.
if store_dir is None:
if current_split != 0 and not\
plot_label.endswith('_split_' + str(current_split)): #more than one split and first call
plot_label = plot_label + '_split_' + str(current_split)
f_name=str(store_dir) + str(os.sep) + str(plot_label)
pylab.savefig(f_name + ".png")
#plot more than one channel in one figure for each label
# NOTE(review): the `else:` introducing the multi-channel case is not
# visible here.
for index, class_label in enumerate(list_of_classes):
title = pylab.title(class_label)
# NOTE(review): the body of the next `if` is missing; `number_of_channels`
# is used further down but not assigned in the visible lines.
if self.channel_names==None: #this means all channels are plotted
f=pylab.figure(fig_num, figsize = (18,13))
#assure that figure is displayed with an interactive backend
if pylab.get_backend() in pylab.matplotlib.backends.interactive_bk:
# Compute number of rows and cols for subplot-arrangement:
# use 8 as upper limit for cols and compute rows accordingly
# NOTE(review): the `else:` between the two nr_of_cols assignments is
# missing from this extract.
if number_of_channels <= 8:
nr_of_cols = number_of_channels
nr_of_cols = 8
# NOTE(review): `/` must be integer division here (Python 2 semantics) for
# nr_of_rows to be usable as a subplot count.
nr_of_rows = (number_of_channels - 1) / 8 + 1
data = values[class_label].view(numpy.ndarray)
# Use the electrode coordinates from the metadata, falling back to the
# defaults of StreamDataset, and project them to 2D.
ec = self.get_metadata("electrode_coordinates")
if ec is None:
ec = StreamDataset.ec
ec_2d = StreamDataset.project2d(ec)
# plot everything channel-wise
for channel_index in range(number_of_channels):
# NOTE(review): the name is taken from self.channel_names while the data
# column is selected by the same index - this is only consistent when
# self.channel_names is ordered like the data's channels; confirm.
channel_name = self.channel_names[channel_index]
f.add_subplot(nr_of_rows, nr_of_cols, channel_index + 1)
# actual plotting of the data
self._plot_spectrum(data[:,channel_index], sampling_frequency)
if self.physiological_arrangement:
x, y = ec_2d[channel_name]
w = .05
h = .045
# Map the projected coordinates into figure-relative positions; the
# (v + 110) / 220 form presumably assumes coordinates within
# [-110, 110] - TODO confirm.
pylab.gca().set_position([(x + 110) / 220, (y + 110) / 220, w, h])
# Draw or store the figure
# NOTE(review): same gap as above - the body of `if store_dir is None:`
# and the `else:` are missing.
if store_dir is None:
if current_split != 0 and not\
plot_label.endswith('_split_' + str(current_split)): #more than one split and first call
plot_label = plot_label + '_split_' + str(current_split)
# The class label is appended so that each class gets its own file.
f_name=str(store_dir) + str(os.sep) + str(plot_label) + '_' + class_label
pylab.savefig(f_name + ".png")
# title.set_fontsize(24)
# pylab.draw()
# Computes a spectrogram of `data` with mlab.specgram, updates the running
# self.min_value / self.max_value with the power range, draws the flipped
# power matrix with pylab.imshow and returns (Pxx, freqs, bins).
[docs] def _plot_spectrum(self, data, sampling_frequency):
# NOTE(review): the remaining arguments and the closing parenthesis of
# mlab.specgram(...) are missing from this extract.
(Pxx, freqs, bins) = mlab.specgram(data,
# NOTE(review): only the single element Pxx[0,0] is compared with 0 here,
# although the log message speaks of "a frequency band"; whatever follows
# the log call (e.g. an early return) is not visible.
if numpy.any(Pxx[0,0] == 0):
self._log("SpectrumPlot::Instance has power 0 in a frequency band, skipping...")
# Update minimal and maximal value
self.min_value = min(self.min_value, min(Pxx.flatten()))
self.max_value = max(self.max_value, max(Pxx.flatten()))
# Flip the rows of Pxx before drawing.
Z = numpy.flipud(Pxx)
# Axis extent: x from 0 to the largest bin value, y from the first to the
# last frequency.
extent = 0, numpy.amax(bins), freqs[0], freqs[-1]
# NOTE(review): the rest of the imshow(...) call is missing from this extract.
pylab.imshow(Z, None, extent=extent, vmin=self.min_value,
# NOTE(review): the body of this `if` is missing from this extract.
if self.colorbar and not self.physiological_arrangement:
return (Pxx, freqs, bins)
[docs]class ScatterPlotNode(BaseNode):
"""Creates a scatter plot of the given channels for the given point in time
This node creates scatter plots of the values of all vs. all specified
channels for the given point in time (plot_ms).
:plot_ms: The point of time, for which the scatter plots are drawn.
For instance, if plot_ms = 200, all the values of the
selected channels are collected that were measured
200ms after the window start and the scatter plots for
these values are drawn
:channels: If channels is not None, only scatter plots for
these specified channels are plotted. If channels is not
specified, scatter plots for the first 7 available
channels are drawn.
.. note:: The maximal number of channels has to be less than 8 since
more than a 7*7 matrix of plots is hard to get plotted
into one window.
.. image:: ../../graphics/scatter_plot.png
:width: 1024
**Exemplary Call**
.. code-block:: yaml
- node : ScatterPlot
parameters :
plot_ms : 2
figure_number = 0
input_types = ["TimeSeries"]
# Constructor of the scatter plot node: forwards **kwargs to the base
# class, takes the next free figure number from the class-level counter
# ScatterPlotNode.figure_number and creates the figure with tight margins.
[docs] def __init__(self, plot_ms, channels=None, **kwargs):
super(ScatterPlotNode, self).__init__(**kwargs)
# NOTE(review): the lines below read as keyword arguments of a call whose
# opening line, closing parenthesis and further arguments (plot_ms,
# channels, the class-color mapping) are missing from this extract.
# An attribute that stores the number of channels
number_of_channels = None, # Set lazily
# A set of colors that can be used to distinguish different classes
# Only two colors are available; _train pops one per new class label.
colors=set(["r", "b"]),
# A mapping from class label to its color in the plot
# Each instance gets its own figure number; the class counter is advanced.
self.figure_number = ScatterPlotNode.figure_number
ScatterPlotNode.figure_number += 1
figure = pylab.figure(self.figure_number,
figsize=(21, 11))
figure.subplots_adjust(left=0.01, bottom=0.01, right=0.99, top= 0.99,
wspace=0.2, hspace=0.2)
def is_trainable(self):
    """Report this node as trainable.

    The node does not learn anything. It returns ``True`` only so that it
    takes part in the training phase, during which it visualizes all
    samples that are passed to it.

    Returns:
        bool: Always ``True``.
    """
    return True
def is_supervised(self):
    """Report that this node requires supervised training.

    Returns:
        bool: Always ``True``.
    """
    return True
# Adds one labeled sample to the scatter plots: for every ordered pair of
# selected channels, the sample's values at row self.plot_index are drawn
# as one point in the color assigned to `label`.
[docs] def _train(self, data, label):
# NOTE(review): the next two lines are docstring text whose delimiters are
# missing from this extract.
This node is not really trained but uses the labeled examples to
generate a scatter plot.
#Determine the number of channels if not yet done
if self.number_of_channels == None:
# If no channels are specified -> use the first seven
# For more than 7*7 subplot, plotting is too slow
if self.channels == None:
self.channels = data.channel_names[:7]
elif len(self.channels) > 7:
self.channels = self.channels[:7]
self.number_of_channels = len(self.channels)
# Convert plot_ms into a row index: the fraction of the window duration
# scaled by the number of rows of `data`.
# NOTE(review): the result is used as an array index below, so it has to
# be an integer - depends on Python 2 division and the operand types; confirm.
self.plot_index = data.shape[0] * self.plot_ms \
/ (data.end_time - data.start_time)
# Determine color of this class if not yet done
# NOTE(review): set.pop() raises KeyError once the color set is empty,
# i.e. when more labels occur than colors are configured.
if label not in self.class_colors.keys():
self.class_colors[label] = self.colors.pop()
# NOTE(review): Python 2 print statement; looks like leftover debug output.
print self.figure_number
# For all pairs of channels
#for ch1 in range(self.number_of_channels):
# for ch2 in range(self.number_of_channels):
for index1, channel_name1 in enumerate(self.channels):
for index2, channel_name2 in enumerate(self.channels):
channel_index1 = data.channel_names.index(channel_name1)
channel_index2 = data.channel_names.index(channel_name2)
# Determine the relevant scatter plot
# NOTE(review): the opening of the call that the next line closes
# (presumably a subplot selection) is missing from this extract.
index1 * self.number_of_channels + index2 + 1)
pylab.text(0.1, 0.9,
"%s vs. %s" % (channel_name1, channel_name2),
transform = pylab.gca().transAxes)
# NOTE(review): enumerate() yields indices up to number_of_channels - 1,
# so this condition cannot become true as written; its body is missing
# from this extract.
if index1 == self.number_of_channels \
and index2 == self.number_of_channels:
# Plot the point projected on the respective subspace
pylab.plot([data[self.plot_index, channel_index1]],
[data[self.plot_index, channel_index2]],
self.class_colors[label] + "o")
# Hook called when training ends.
# NOTE(review): the body of this method is missing from this extract;
# nothing can be said about its behavior from the visible line.
[docs] def _stop_training(self, debug=False):
[docs] def _execute(self, data):
# We simply pass the given data on to the next node
return data
[docs]class HistogramPlotNode(BaseNode):
""" Creates a histogram of the given channels for the given point in time
This node creates histograms of the values of all specified channels for
the given point in time (plot_ms). The value range is restricted to the
specified value_range, values outside of this range are not plotted.
:plot_ms: The point of time, for which the histograms are drawn.
For instance, if plot_ms = 200, all the values of the
selected channels are collected that were measured
200ms after the window start and the histograms for these
values are drawn
:value_range: A pair (tuple) that specifies the range of
values that are plotted in the histogram. Values
outside this range are not drawn.
:channels: If channels is not None, only histograms for
these specified channels are plotted. If channels is not
specified, histograms for all available channels
are plotted.
.. image:: ../../graphics/histogram.png
:width: 1024
**Exemplary Call**
.. code-block:: yaml
node : HistogramPlot
parameters :
plot_ms : 2
value_range : [-10, 10]
input_types = ["TimeSeries"]
# Constructor of the histogram plot node; forwards **kwargs to the
# base-class constructor.
[docs] def __init__(self, plot_ms, value_range, channels=None, **kwargs):
super(HistogramPlotNode, self).__init__(**kwargs)
# NOTE(review): the lines below read as keyword arguments of a call whose
# opening line and further arguments (plot_ms, value_range, channels, the
# class-sample mapping) are missing from this extract.
# An attribute that stores the number of channels
number_of_channels=None, # Set lazily
# A mapping from class label to samples for this class
# A set of colors that can be used to distinguish different classes
# Only two colors are available; _stop_training pops one per class.
colors=set(["r", "b"]))
def is_trainable(self):
    """Report this node as trainable.

    The node does not learn anything. It returns ``True`` only so that it
    takes part in the training phase, during which it visualizes all
    samples that are passed to it.

    Returns:
        bool: Always ``True``.
    """
    return True
def is_supervised(self):
    """Report that this node requires supervised training.

    Returns:
        bool: Always ``True``.
    """
    return True
# Collects one labeled sample for the histograms. On the first call it
# also fixes the channel selection, computes the row index to sample and
# creates the figure with one titled subplot per channel.
[docs] def _train(self, data, label):
# NOTE(review): the next two lines are docstring text whose delimiters are
# missing from this extract.
This node is not really trained but uses the labeled examples to
generate a histogram
#Determine the number of channels if not yet done
if self.number_of_channels == None:
# If no channels are specified -> plot all
if self.channels == None:
self.channels = data.channel_names
self.number_of_channels = len(self.channels)
# Convert plot_ms into a row index: the fraction of the window duration
# scaled by the number of rows of `data`.
# NOTE(review): the result is used as an array index below, so it has to
# be an integer - depends on Python 2 division and the operand types; confirm.
self.plot_index = data.shape[0] * self.plot_ms \
/ (data.end_time - data.start_time)
# Initialize plotting
figure = pylab.figure(figsize=(21, 11))
figure.subplots_adjust(left=0.03, bottom=0.03,
right=0.97, top= 0.97,
wspace=0.2, hspace=0.2)
# Set title of subplots
for i, channel_name in enumerate(self.channels):
# NOTE(review): the opening of the call that the next line closes
# (presumably a subplot selection) is missing from this extract.
i + 1)
pylab.text(0.5, 0.5, channel_name,
transform = pylab.gca().transAxes)
# One list of collected values per channel for every class label.
if label not in self.class_samples.keys():
self.class_samples[label] = [[] for i in range(self.number_of_channels)]
plot_column = data[self.plot_index, :]
for index, channel_name in enumerate(self.channels):
channel_index = data.channel_names.index(channel_name)
# NOTE(review): the statement that stores the channel's value in
# self.class_samples is not visible in this extract.
# Draws the collected samples: for every class a color is taken from
# self.colors and one histogram per channel (25 bins, restricted to
# self.value_range) is drawn into that channel's subplot.
[docs] def _stop_training(self, debug=False):
# NOTE(review): dict.iteritems() exists only in Python 2.
for class_label, class_samples in self.class_samples.iteritems():
# NOTE(review): set.pop() raises KeyError once the color set is empty,
# i.e. when more classes occur than colors are configured.
color = self.colors.pop()
for i, channel_samples in enumerate(class_samples):
# Subplot grid with ceil(number_of_channels / 2) rows.
# NOTE(review): the remaining arguments and the closing parenthesis of
# this subplot(...) call are missing from this extract.
pylab.subplot(int(math.ceil(float(self.number_of_channels )/2)),
# NOTE(review): `normed` is no longer accepted by recent matplotlib
# releases (replaced by `density`) - confirm the targeted version.
pylab.hist(channel_samples, bins = 25, fc=color,
histtype = 'stepfilled', normed = True, alpha = 0.5,
range = self.value_range,
label = class_label)
for i in range(self.number_of_channels):
# NOTE(review): the rest of this subplot(...) call and whatever follows
# it in the loop are missing from this extract.
pylab.subplot(int(math.ceil(float(self.number_of_channels )/2)),
[docs] def _execute(self, data):
# We simply pass the given data on to the next node
return data
# Maps node names, as written in the ``node :`` entries of the exemplary
# YAML calls, to the node classes of this module.
_NODE_MAPPING = {
    "Time_Series_Plot": TimeSeriesPlotNode,
    "Spectrum_Plot": SpectrumPlotNode,
    "Scatter_Plot": ScatterPlotNode,
    "Histogram_Plot": HistogramPlotNode,
}