Source code for pySPACE.missions.nodes.spatial_filtering.pca

""" Principal Component Analysis variants """

import os
import cPickle

try:
    from mdp.nodes import PCANode
    from mdp.utils import mult
except:
    pass
from pySPACE.missions.nodes.spatial_filtering.spatial_filtering import SpatialFilteringNode

from pySPACE.resources.data_types.time_series import TimeSeries

from pySPACE.tools.filesystem import  create_directory
import logging
import numpy

[docs]class PCAWrapperNode(SpatialFilteringNode): #, PCANode): """ Reuse the implementation of the Principal Component Analysis of mdp For a theoretical description of how PCA works, the following tutorial is extremely useful. ======= ========================================================= Title A TUTORIAL ON PRINCIPAL COMPONENT ANALYSIS Derivation, Discussion and Singular Value Decomposition Author Jon Shlens Link http://www.cs.princeton.edu/picasso/mats/PCA-Tutorial-Intuition_jp.pdf ======= ========================================================= This node implements the unsupervised principal component analysis algorithm for spatial filtering. .. note:: The original PCANode can execute the Principal Component Analysis in 2 ways. The first method(which is also the default) involves the computation of the eigenvalues of a symmetric matrix. This is obviously a rather fast approach. Nonetheless, this approach sometimes fails and negative eigenvalues are obtained from the computation. The problem can be solved by using the Singular Value Decomposition method in the PCA. This is easily done by setting ``svd=True`` when initializing the :mod:`~pySPACE.missions.nodes.spatial_filtering.pca`. The SVD approach is more robust but also less cost-effective when it comes to computation time. **Parameters** :retained_channels: Determines how many of the PCA pseudo channels are retained. Default is None which means "all channels". :load_path: An absolute path from which the PCA eigenmatrix is loaded. If not specified, this matrix is learned from the training data. (*optional, default: None*) **mdp parameters** :svd: if True use Singular Value Decomposition instead of the standard eigenvalue problem solver. Use it when PCANode complains about singular covariance matrices (*optional, default: False*) :reduce: Keep only those principal components which have a variance larger than 'var_abs' and a variance relative to the first principal component larger than 'var_rel' and a variance relative to total variance larger than 'var_part' (set var_part to None or 0 for no filtering). Note: when the 'reduce' switch is enabled, the actual number of principal components (self.output_dim) may be different from that set when creating the instance. (*optional, default: False*) **Exemplary Call** .. code-block:: yaml - node : PCA parameters: retained_channels : 42 """
[docs] def __init__(self, retained_channels=None, load_path=None, svd=False, reduce=False, **kwargs): # Must be set before constructor of superclass is set self.trainable = (load_path == None) super(PCAWrapperNode, self).__init__(**kwargs) self.output_dim = retained_channels # Load patterns from file if requested if load_path != None: filter_file = open(load_path, 'r') avg, v = cPickle.load(filter_file) self.set_permanent_attributes(avg=avg, v=v, trainable=False, filters=v) self.set_permanent_attributes( # The number of channels that will be retained retained_channels=retained_channels, output_dim=retained_channels, channel_names=None, new_channels=None, wrapped_node=None, svd=svd, # choose the method to use when computing the PCA reduce=reduce )
[docs] def is_trainable(self): """ Returns whether this node is trainable. """ return self.trainable
[docs] def is_supervised(self): """ Returns whether this node requires supervised training. """ return False
[docs] def _train(self, data, label = None): """ Updates the estimated covariance matrix based on *data*. """ # We simply ignore the class label since we # are doing unsupervised learning if self.wrapped_node is None: self.wrapped_node = PCANode(svd=self.svd, reduce=self.reduce) x = 1.0 * data.view(type=numpy.ndarray) self.wrapped_node.train(x) if self.channel_names is None: self.channel_names = data.channel_names
[docs] def _stop_training(self, debug=False): """ Stops training by forwarding to super class. """ self.wrapped_node.stop_training() #super(PCAWrapperNode, self)._stop_training(debug) self.v = self.wrapped_node.v self.avg = self.wrapped_node.avg self.filters = self.v
[docs] def _execute(self, data, n = None): """ Execute learned transformation on *data*. Projects the given data to the axis of the most significant eigenvectors and returns the data in this lower-dimensional subspace. """ # 'INITIALIZATION' if self.retained_channels==None: self.retained_channels = data.shape[1] if n is None: n = self.retained_channels if self.channel_names is None: self.channel_names = data.channel_names if len(self.channel_names)<self.retained_channels: self.retained_channels = len(self.channel_names) self._log("To many channels chosen for the retained channels! Replaced by maximum number.",level=logging.CRITICAL) if not(self.output_dim==self.retained_channels): # overwrite internal output_dim variable, since it is set wrong self._output_dim = self.retained_channels # 'Real' Processing #projected_data = super(PCANodeWrapper, self)._execute(data, n) x = data.view(numpy.ndarray) projected_data = mult(x-self.avg, self.v[:, :self.retained_channels]) if self.new_channels is None: self.new_channel_names = ["pca%03d" % i for i in range(projected_data.shape[1])] return TimeSeries(projected_data, self.new_channel_names, data.sampling_frequency, data.start_time, data.end_time, data.name, data.marker_name)
[docs] def store_state(self, result_dir, index=None): """ Stores this node in the given directory *result_dir*. """ if self.store: node_dir = os.path.join(result_dir, self.__class__.__name__) create_directory(node_dir) # This node only stores the learned eigenvector and eigenvalues name = "%s_sp%s.pickle" % ("eigenmatrix", self.current_split) result_file = open(os.path.join(node_dir, name), "wb") result_file.write(cPickle.dumps((self.avg, self.v), protocol=2)) result_file.close()
_NODE_MAPPING = {"PCA": PCAWrapperNode}