""" Independent Component Analysis variants """
import os
import cPickle
import numpy
try:
import mdp
from mdp.nodes import FastICANode
import_error = False
except ImportError, e:
import_error = e
from pySPACE.missions.nodes.spatial_filtering.spatial_filtering \
import SpatialFilteringNode
from pySPACE.resources.data_types.time_series import TimeSeries
from pySPACE.tools.filesystem import create_directory
import logging
try:
class FastICANodeWrapper(FastICANode):
"""The only reason for this node is to deal with the fact
that the ICANode super class does not accept the output_dim kwarg
"""
def __init__(self, trainable=True, *args, **kwargs):
if "output_dim" in kwargs:
kwargs.pop("output_dim")
if trainable is False:
self._trainable = False
super(FastICANodeWrapper, self).__init__(*args, **kwargs)
def is_training(self):
""" Mapping to *self._training variable* """
return self._training
except:
pass
[docs]class ICAWrapperNode(SpatialFilteringNode):
""" Wrapper around the Independent Component Analysis filtering of mdp
This Node implements the unsupervised independent component
analysis algorithm for spatial filtering.
**Parameters**
:retained_channels: Determines how many of the ICA pseudo channels
are retained. Default is None which means "all channels".
For ICA channels, there is no sorting,
hence data has to be reduced in the internal whitening beforehand.
(*optional, default: None*)
:load_path: An absolute path from which the ICA filter
is loaded.
If not specified, this matrix is learned from the training data.
(*optional, default: None*)
**Exemplary Call**
.. code-block:: yaml
-
node : ICA
parameters:
retained_channels : 42
"""
[docs] def __init__(self, retained_channels=None, load_path=None, **kwargs):
if import_error:
raise ImportError(import_error)
# Must be set before constructor of superclass is set
self.trainable = (load_path is None)
if "output_dim" in kwargs:
kwargs.pop("output_dim")
super(ICAWrapperNode, self).__init__(**kwargs)
# Load filters from file if requested
wrapped_node = None
if load_path is not None:
filters_file = open(load_path, 'r')
filters, white, whitened = cPickle.load(filters_file)
wrapped_node = FastICANodeWrapper(trainable=False,
white_comp=retained_channels)
wrapped_node.filters = filters
wrapped_node.white = white
wrapped_node.whitened = whitened
wrapped_node._training = False
wrapped_node._train_phase = -1
wrapped_node._train_phase_started = False
self.set_permanent_attributes(filters=filters, white=white,
whitened=whitened)
self.set_permanent_attributes(
# The number of channels that will be retained
retained_channels=retained_channels,
# Determine whether this node is trainable
trainable=(load_path is None),
output_dim=retained_channels,
new_channel_names=None,
channel_names=None,
wrapped_node=wrapped_node)
[docs] def is_trainable(self):
""" Returns whether this node is trainable. """
return self.trainable
[docs] def is_supervised(self):
""" Returns whether this node requires supervised training. """
return False
[docs] def _train(self, data, label=None):
""" Uses *data* to learn a decomposition into independent components."""
# We simply ignore the class label since we
# are doing unsupervised learning
if self.channel_names is None:
self.channel_names = data.channel_names
if self.wrapped_node is None:
self.wrapped_node = FastICANode()
data *= 1.0
self.wrapped_node.train(data)
[docs] def _execute(self, data):
""" Execute learned transformation on *data*.
Changes the base of the space in which the data is located so
that the dimensions correspond to independent components
"""
# If this is the first data sample we obtain
if self.retained_channels is None:
# Count the number of channels
self.set_permanent_attributes(retained_channels=data.shape[1])
if self.channel_names is None:
self.channel_names = data.channel_names
if len(self.channel_names) < self.retained_channels:
self.retained_channels = len(self.channel_names)
self._log("To many channels chosen for the retained channels!"
" Replaced by maximum number.", level=logging.CRITICAL)
if not(self.output_dim == self.retained_channels):
# overwrite internal output_dim variable, since it is set wrong
self._output_dim = self.retained_channels
projected_data = self.wrapped_node.execute(data.view(numpy.ndarray))
# Select the channels that should be retained
# Note: We have to create a new array since otherwise the removed
# channels remains in memory
projected_data = numpy.array(projected_data[:, :self.retained_channels])
if self.new_channel_names is None:
self.new_channel_names = ["ica%03d" % i
for i in range(projected_data.shape[1])]
return TimeSeries(projected_data, self.new_channel_names,
data.sampling_frequency, data.start_time,
data.end_time, data.name, data.marker_name)
[docs] def store_state(self, result_dir, index=None):
""" Stores this node in the given directory *result_dir*. """
if self.store:
node_dir = os.path.join(result_dir, self.__class__.__name__)
create_directory(node_dir)
# This node only stores the learned eigenvector and eigenvalues
name = "%s_sp%s.pickle" % ("filters", self.current_split)
result_file = open(os.path.join(node_dir, name), "wb")
result_file.write(cPickle.dumps((self.wrapped_node.filters,
self.wrapped_node.white,
self.wrapped_node.whitened),
protocol=2))
result_file.close()
[docs] def get_filter(self):
return self.wrapped_node.get_projmatrix()
_NODE_MAPPING = {"ICA": ICAWrapperNode}