Source code for pySPACE.missions.nodes.meta.flow_node

""" Encapsulate complete :mod:`~pySPACE.environments.chains.node_chain` into a single node """

import operator
import cPickle
import copy
import logging
import warnings
import itertools
import numpy

import pySPACE.missions.nodes
from pySPACE.missions.nodes.base_node import BaseNode
from pySPACE.missions.nodes.source.external_generator_source\
import ExternalGeneratorSourceNode
from pySPACE.missions.nodes.splitter.all_train_splitter import AllTrainSplitterNode
from pySPACE.environments.chains.node_chain import NodeChain
from pySPACE.tools.memoize_generator import MemoizeGenerator

# BacktransformationNode imports
from pySPACE.resources.data_types.feature_vector import FeatureVector
from pySPACE.resources.data_types.time_series import TimeSeries
from pySPACE.missions.nodes.feature_generation.time_domain_features import TimeDomainFeaturesNode


class FlowNode(BaseNode):
    """ Encapsulate a whole node chain from YAML specification or path into a single node

    The FlowNode encapsulates a whole node chain so that it can be used like
    a node. The encapsulated chain can either be passed directly via the
    *nodes* parameter. Alternatively, the path to a pickled node chain can be
    passed via *load_path*. In the second case, the object is loaded lazily
    (i.e., only when required). This is important in situations where the
    FlowNode is pickled again (for instance when using
    :class:`~pySPACE.environments.backends.multicore.MulticoreBackend`).

    .. note::
        When defining this node in YAML syntax, one can pass a "nodes"
        parameter instead of the "subflow" parameter (see exemplary call
        below). The value of this parameter must be a NodeChain definition in
        YAML syntax (properly indented). This NodeChain definition is
        converted into the actual "subflow" parameter passed to the
        constructor in the class' static method "node_from_yaml" (overwriting
        the default implementation of BaseNode). Furthermore, it is
        determined whether trainable and supervised must be True. Thus, these
        parameters need not be specified explicitly.

    **Parameters**

        :subflow:
            The NodeChain object that is encapsulated in this node. Must be
            provided when no load_path is given.

            (*semi-optional, default: None*)

        :load_path:
            The path to the pickled NodeChain object that is loaded and
            encapsulated in this flow node. Must be given when no subflow is
            provided. The path string can contain phrases like __SPLIT__ -
            they are replaced in the super node.

            (*semi-optional, default: None*)

        :trainable:
            If True, the nodes of the NodeChain require training, thus this
            node itself must be trainable. When reading the specification, it
            is tested whether the subnodes need training.

            (*optional, default: False*)

        :supervised:
            If True, the nodes require supervised training, thus this node
            itself must be supervised.

            (*optional, default: False*)

        :input_dim:
            In contrast to the other nodes, this node may require that the
            dimensionality of the input data is explicitly set. This is the
            case when the input dimensionality cannot be inferred from the
            passed subflow parameter.

            (*optional, default: None*)

        :output_dim:
            In contrast to the other nodes, this node may require that the
            dimensionality of the output data is explicitly set. This is the
            case when the output dimensionality cannot be inferred from the
            passed subflow parameter.

            (*optional, default: None*)

        :change_parameters:
            List of dictionaries specifying which parameters to change in the
            internal nodes. Each dictionary has the keys:

                :node: Name of the node,
                :parameters: dictionary of new parameters,
                :number: optional number of occurrence of the node
                         (default: 1).

            By default we assume that node parameters and program variables
            are identical. This default is implemented in the BaseNode and
            can be overwritten by the relevant node with the function
            *_change_parameters*.

            (*optional, default: []*)

    **Exemplary Call**

    .. code-block:: yaml

        - node : FlowNode
          parameters :
            input_dim : 64
            output_dim : 1612
            nodes :
              - node : ChannelNameSelector
                parameters :
                  inverse : True
                  selected_channels: ["EMG1","EMG2","TP7","TP8"]
              - node : Decimation
                parameters :
                  target_frequency : 25.0
              - node : FFT_Band_Pass_Filter
                parameters :
                  pass_band : [0.0, 4.0]
              - node : Time_Domain_Features
                parameters :
                  moving_window_length : 1
            change_parameters :
              - node : ChannelNameSelector
                parameters :
                  inverse : False
                  selected_channels: ["EMG1","EMG2"]

    :Author: Jan Hendrik Metzen (jhm@informatik.uni-bremen.de)
    :Created: 2010/07/28
    """
    def __init__(self, nodes=None, load_path=None, trainable=False,
                 supervised=False, input_dim=None, output_dim=None,
                 dtype=None, change_parameters=[], **kwargs):
        # We need either a node chain object or the path to a pickled one
        assert (nodes or load_path)

        # set trainable permanently
        self.trainable = trainable

        if load_path:
            # assume that all splits have the same relevant parameters
            check_nodes_path = load_path.replace("__SPLIT__", "0")
            # We load the flow only temporarily, since we might need to
            # pickle this object (depending on the backend), and storing the
            # flow object too early causes overhead
            flow = cPickle.load(open(check_nodes_path, 'rb'))
            # Determine some properties of the flow (input_dim, output_dim)
            if input_dim is None:
                if flow[0].is_source_node():
                    input_dim = flow[1].input_dim
                else:
                    input_dim = flow[0].input_dim
            if output_dim is None:
                if flow[-1].is_sink_node():
                    output_dim = flow[-2].output_dim
                else:
                    output_dim = flow[-1].output_dim
        elif input_dim is None:
            input_dim = nodes[0].input_dim
            # assert input_dim is not None, \
            #     "You must specify the input dim of node %s explicitly!" \
            #     % self.__class__.__name__
            output_dim = nodes[-1].output_dim

        if load_path:
            trainable = reduce(operator.or_,
                               [node.is_retrainable() for node in flow])
            supervised = trainable
            # flow is not made permanent but loaded later on
            try:
                del flow
            except:
                pass

        # Now we can call the superclass constructor
        super(FlowNode, self).__init__(input_dim=input_dim,
                                       output_dim=output_dim,
                                       dtype=dtype, **kwargs)

        self.set_permanent_attributes(trainable=trainable,
                                      supervised=supervised,
                                      train_instances=[],
                                      change_parameters=change_parameters,
                                      changed=False)

        if nodes:  # if we got a flow object
            # Remove dtype of the nodes
            for node in nodes:
                node._dtype = None
            self.set_permanent_attributes(flow=nodes)
        else:
            # Do not load now, but only store the path to the pickled object
            self.set_permanent_attributes(load_path=load_path,
                                          flow=None)  # load the nodes lazily
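    # A minimal usage sketch (not part of the original module; the path is
    # hypothetical): with *load_path*, nothing is unpickled at construction
    # time, so this node stays cheap to pickle again, e.g., for the
    # multicore backend. The first call that needs the chain triggers
    # _get_flow().
    #
    #     node = FlowNode(load_path="/tmp/trained_chain.pickle")
    #     # prediction = node.execute(datapoint)  # loads the chain lazily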
    @staticmethod
    def node_from_yaml(nodes_spec):
        """ Create the FlowNode and the contained chain based on the node_spec """
        node_obj = FlowNode(**FlowNode._prepare_node_chain(nodes_spec))
        return node_obj
    @staticmethod
    def _prepare_node_chain(nodes_spec):
        """ Create the member dict for the FlowNode and the contained chain based on the node_spec """
        assert "parameters" in nodes_spec
        if "load_path" in nodes_spec["parameters"]:
            # Let the node load the pickled object
            return nodes_spec["parameters"]
        else:
            # The node chain has to be specified in YAML syntax
            assert "nodes" in nodes_spec["parameters"], \
                "FlowNode requires specification of a list of nodes " \
                "or of a load_path to a pickled node chain."

            node_sequence = [ExternalGeneratorSourceNode(),
                             AllTrainSplitterNode()]
            # For all nodes in the specs
            for node_spec in nodes_spec["parameters"]["nodes"]:
                # Use the factory method to create the node
                node_obj = BaseNode.node_from_yaml(node_spec)
                # Append this node to the sequence of nodes
                node_sequence.append(node_obj)

            # Check if the nodes have to cache their outputs
            for index, node in enumerate(node_sequence):
                # If a node is trainable, it uses the outputs of its input
                # node at least twice, so we have to cache.
                if node.is_trainable():
                    node_sequence[index - 1].set_permanent_attributes(
                        caching=True)
                # Split nodes might also request the data from their input
                # nodes (once for each split), depending on their
                # implementation. We assume the worst case and activate
                # caching.
                if node.is_split_node():
                    node_sequence[index - 1].set_permanent_attributes(
                        caching=True)

            # Determine whether any of the nodes is trainable
            trainable = reduce(operator.or_,
                               [node.is_trainable()
                                for node in node_sequence])

            # Determine whether any of the nodes requires supervised training
            supervised = reduce(operator.or_,
                                [node.is_supervised()
                                 for node in node_sequence])

            # Create the node chain
            flow = NodeChain(node_sequence)
            nodes_spec["parameters"].pop("nodes")

            # Evaluate all remaining parameters if they are eval statements
            for key, value in nodes_spec["parameters"].iteritems():
                if isinstance(value, basestring) and value.startswith("eval("):
                    nodes_spec["parameters"][key] = eval(value[5:-1])

            # Create the node object
            member_dict = copy.deepcopy(nodes_spec["parameters"])
            member_dict["nodes"] = flow
            member_dict["trainable"] = trainable
            member_dict["supervised"] = supervised

            return member_dict
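    # Sketch of the ``nodes_spec`` structure this method consumes (the inner
    # node name is illustrative; any registered node works). The returned
    # member dict carries the assembled NodeChain plus the inferred
    # *trainable* and *supervised* flags.
    #
    #     nodes_spec = {"node": "FlowNode",
    #                   "parameters": {
    #                       "nodes": [{"node": "TimeDomainFeatures",
    #                                  "parameters": {}}]}}
    #     member_dict = FlowNode._prepare_node_chain(nodes_spec)
    #     # member_dict["nodes"] is a NodeChain starting with
    #     # ExternalGeneratorSourceNode and AllTrainSplitterNode.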
    def is_trainable(self):
        """ Returns whether this node is trainable """
        return self.trainable
    def is_supervised(self):
        """ Returns whether this node requires supervised training """
        return self.supervised
    def set_run_number(self, run_number):
        """ Forward run number to flow """
        if self.load_path is None:
            self._get_flow()[-1].set_run_number(run_number)
        super(FlowNode, self).set_run_number(run_number)
    def set_temp_dir(self, temp_dir):
        """ Forward temp_dir to flow """
        self._get_flow()[-1].set_temp_dir(temp_dir)
        super(FlowNode, self).set_temp_dir(temp_dir)
    def _get_flow(self):
        """ Return flow (load flow lazily if not yet loaded)

        .. todo:: Check if first node is source node and if yes remove
        .. todo:: Add ExternalGeneratorSourceNode if self.trainable
        .. todo:: Check if last node is sink node and remove
        """
        if not self.flow:  # load the nodes lazily
            self.replace_keywords_in_load_path()
            nodes = cPickle.load(open(self.load_path, 'rb'))
            for node in nodes:
                node._dtype = None
            self.flow = nodes
        if not self.changed:
            self.change_flow()
            self.changed = True
        return self.flow
    def change_flow(self):
        for changeset in self.change_parameters:
            number = changeset.get("number", 1)
            if "node" not in changeset or "parameters" not in changeset:
                warnings.warn("Could not apply change set: %s!"
                              % str(changeset))
                continue
            i = 1
            for node in self.flow:
                if pySPACE.missions.nodes.NODE_MAPPING[changeset["node"]] \
                        == type(node):
                    if i == number:
                        node._change_parameters(changeset["parameters"])
                        break
                    else:
                        i += 1
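    # Illustrative ``change_parameters`` value matching the loop above (node
    # name and parameters are only examples): the second occurrence of the
    # given node type in the flow gets its parameters updated via
    # _change_parameters.
    #
    #     change_parameters = [{"node": "ChannelNameSelector",
    #                           "parameters": {"inverse": False},
    #                           "number": 2}]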
    def _execute(self, data):
        """ Execute the flow on the given data vector *data* """
        # Delegate to the internal flow object
        return self._get_flow().execute(data)
    def _train(self, data, label):
        """ Collect the training data; the flow is trained in `_stop_training` """
        self.train_instances.append((data, label))
    def _stop_training(self):
        self._get_flow()[0].set_generator(self.train_instances)
        self._get_flow().train()
        self.train_instances = []  # we no longer need the training data
    def _inc_train(self, data, class_label=None):
        """ Iterate through the nodes to train them """
        self._get_flow()._inc_train(data, class_label)
    def _batch_retrain(self, data_list, label_list):
        """ Batch retraining for node chains

        The input data is taken to change the first retrainable node.
        After the change, the data is processed and given to the next node,
        which is trained with the data coming from the retrained algorithm.
        """
        for node in self._get_flow():
            for i in range(len(label_list)):
                if node.is_retrainable() and not node.buffering and \
                        hasattr(node, "_inc_train"):
                    if not node.retraining_phase:
                        node.retraining_phase = True
                        node.start_retraining()
                    node._inc_train(data_list[i], label_list[i])
            data_list = [node._execute(data) for data in data_list]
        data_list = None
        label_list = None
    def is_retrainable(self):
        """ Retraining is needed if at least one node is retrainable """
        if self.retrainable:
            return True
        else:
            for node in self._get_flow():
                if node.is_retrainable():
                    return True
        return False
    def present_label(self, label):
        """ Forward the label to the nodes

        *buffering* must be set to *True* only for the main node when using
        incremental learning in the application (live environment). The
        inner nodes must not have this parameter set.

        .. todo:: Implement a check that the inner nodes do not buffer.
        """
        super(FlowNode, self).present_label(label)
    def reset(self):
        """ Reset the state to the clean state it had after its initialization """
        # Reset not only this node but also all nodes of the encapsulated
        # node chain? Irrelevant, since the node chain is made permanent or
        # loaded later on.
        # if self._get_flow():
        #     for node in self._get_flow():
        #         node.reset()
        super(FlowNode, self).reset()
    def store_state(self, result_dir, index=None):
        """ Store this node in the given directory *result_dir* """
        if self._get_flow():
            for node in self._get_flow():
                node.store_state(result_dir, index)
    def get_output_type(self, input_type, as_string=True):
        """ Get the output type of the flow

        The method calls the method with the same name from the NodeChain
        module, where the output type of an entire flow is determined.
        """
        flow = self._get_flow()
        return flow.get_output_type(input_type, as_string)

class UnsupervisedRetrainingFlowNode(FlowNode):
    """ Use the classified label for retraining

    All the other functionality is as described in :class:`FlowNode`.

    **Parameters**

        :confidence_boundary:
            Minimum distance to the decision boundary which is required for
            retraining. By default every result is used. For regression
            algorithms, this option cannot be used.

            (*optional, default: 0*)

        :decision_boundary:
            Threshold of the decision used for calculating the classifier
            confidence.

            (*optional, default: 0*)

    .. seealso:: :class:`FlowNode`

    **Exemplary Call**

    .. code-block:: yaml

        - node : UnsupervisedRetrainingFlow
          parameters :
            retrain : True
            nodes :
              - node : 2SVM
                parameters :
                  retrain : True

    :Author: Mario Michael Krell (mario.krell@dfki.de)
    :Created: 2015/02/07
    """
    def __init__(self, decision_boundary=0, confidence_boundary=0, **kwargs):
        super(UnsupervisedRetrainingFlowNode, self).__init__(**kwargs)
        self.set_permanent_attributes(decision_boundary=decision_boundary,
                                      confidence_boundary=confidence_boundary)
    @staticmethod
    def node_from_yaml(nodes_spec):
        """ Create the FlowNode node and the contained chain """
        node_obj = UnsupervisedRetrainingFlowNode(
            **FlowNode._prepare_node_chain(nodes_spec))
        return node_obj
    def _inc_train(self, data, class_label=None):
        """ Execute to guess the label and retrain if the guess is confident """
        result = self._execute(data)
        if not isinstance(result.prediction, list) and \
                abs(result.prediction - self.decision_boundary) >= \
                self.confidence_boundary:
            super(UnsupervisedRetrainingFlowNode, self)._inc_train(
                data, result.label)
        else:
            # no adaptation because the prediction is not confident enough
            pass

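    # A minimal sketch of the gating rule above, assuming a scalar
    # prediction score (names and numbers are illustrative): the guessed
    # label is only used for retraining if the score is at least
    # ``confidence_boundary`` away from ``decision_boundary``.
    #
    #     def confident(prediction, decision_boundary=0.0,
    #                   confidence_boundary=0.5):
    #         return abs(prediction - decision_boundary) >= confidence_boundary
    #
    #     confident(0.9)  # True  -> retrain with the guessed label
    #     confident(0.2)  # False -> skip the sample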
class BatchAdaptSubflowNode(FlowNode):
    """ Load and retrain a pre-trained NodeChain for recalibration

    This node encapsulates a whole NodeChain so that it can be used like a
    node. The path to a pickled NodeChain object has to be passed via
    *load_path*. The NodeChain object is loaded lazily (i.e., only when
    required). This is important in situations where this node is pickled as
    part of a NodeChain again (for instance when using
    :class:`~pySPACE.environments.backends.multicore`).

    In contrast to the FlowNode, this node also allows to retrain the loaded
    NodeChain on novel training data. All nodes of the loaded NodeChain for
    which *retrain* is set to ``True`` are provided with the training data.
    Before this, the method "start_retraining" is called on this node. The
    training data is then provided to the "_inc_train" method.

    **Parameters**

        :load_path:
            The path to the pickled NodeChain object that is loaded and
            encapsulated in this node. This parameter is not optional!

    **Exemplary Call**

    .. code-block:: yaml

        - node : BatchAdaptSubflow
          parameters :
            load_path : "some_path"

    :Author: Mario Krell (mario.krell@dfki.de)
    :Created: 2012/06/20
    """
    def __init__(self, load_path, **kwargs):
        super(BatchAdaptSubflowNode, self).__init__(load_path=load_path,
                                                    **kwargs)
        self.set_permanent_attributes(batch_labels=None)
    @staticmethod
    def node_from_yaml(nodes_spec):
        """ Create the FlowNode node and the contained chain """
        node_obj = BatchAdaptSubflowNode(
            **FlowNode._prepare_node_chain(nodes_spec))
        return node_obj
    def _train(self, data, label):
        """ Expect the nodes to buffer the training samples when executed on the data """
        if self.batch_labels is None:
            self.batch_labels = []
        # save the labels
        self.batch_labels.append(label)
        # Save the examples via the buffering parameter in the node.
        # Only the relevant nodes will save their data.
        self._get_flow().execute(data)
    def _stop_training(self):
        # present_label is a BaseNode method that recursively goes through
        # all nodes. There, _batch_retrain is called with our label list and
        # the nodes are retrained with these labels on the previously
        # buffered samples.
        self._get_flow()[-1].present_label(self.batch_labels)
        self.batch_labels = None

class BacktransformationNode(FlowNode):
    """ Determine the underlying linear transformation of a classifier or regression algorithm

    The resulting linear transformation can be accessed with the method
    *get_previous_transformations* of the following node, e.g., for
    visualization and sensor ranking. It is stored in the same format as the
    input data.

    .. warning::
        This node makes sense if and only if the underlying transformations
        are linear. For nonlinear transformations a more generic approach
        needs to be implemented. This implementation does not use direct
        access to the internal algorithms but determines the transformation
        by testing a large number of samples, which is not efficient but
        most generic.

    .. warning::
        Currently this node requires stationary processing and does not
        catch the changing transformation from incremental learning.

    **References**

        ========= ================================================================
        main      source: Backtransformation
        ========= ================================================================
        author    Krell, M. M. and Straube, S.
        journal   Advances in Data Analysis and Classification
        title     `Backtransformation: a new representation of data processing
                  chains with a scalar decision function
                  <http://dx.doi.org/10.1007/s11634-015-0229-3>`_
        year      2015
        doi       10.1007/s11634-015-0229-3
        pages     1-25
        ========= ================================================================

    **Parameters**

    .. seealso:: :class:`FlowNode`

        :eps:
            The step of the difference method :math:`\\varepsilon`. Should be
            set manually for each differentiation.

            (*default: 2.2e-16*)

        :method:
            The method that should be used for the derivation. The available
            methods are encoded as strings:

            * Forward difference method -> ``method="forward_difference"``
            * Central difference method -> ``method="central_difference"``
            * Central difference method using a half step ->
              ``method="central_difference_with_halfstep"``

            (*optional, default: "central_difference"*)

        :mode:
            The method used to obtain the backtransformation. The choice
            depends on the current dataset:

            * Linear, affine datasets -> ``mode="linear"``
            * Non-linear datasets -> ``mode="nonlinear"``

            (*optional, default: "linear"*)

        :store_format:
            Specify the format in which the data is to be stored. The
            options here are:

            * `txt` file - generated automatically using `numpy.savetxt`
            * `pickle` file - generated using `pickle.dump`
            * `mat` file - saved using the scipy matlab interface

            If no format is specified, no file will be stored.

            (*optional, default: None*)

    **Exemplary Call**

    .. code-block:: yaml

        - node : Backtransformation
          parameters :
            nodes :
              - node : FFTBandPassFilter
                parameters :
                  pass_band : [0.0, 4.0]
              - node : TimeDomainFeatures
              - node : LinearDiscriminantAnalysisClassifier

    :Author: Mario Michael Krell
    :Created: 2013/12/24
    """
    input_types = ["TimeSeries", "FeatureVector"]
    def __init__(self, mode="linear", method="central_difference",
                 eps=2.2 * 1e-16, store_format=None, **kwargs):
        super(BacktransformationNode, self).__init__(**kwargs)
        self.set_permanent_attributes(trafo=None, offset=0.0, example=None,
                                      mode=mode, method=method, eps=eps,
                                      num_samples=0, covariance=None,
                                      store_format=store_format)
    def _execute(self, data):
        """ Determine the example at the first call, forward normal processing """
        if self.example is None:  # generate example at the first call
            self.example = copy.deepcopy(data)
        return super(BacktransformationNode, self)._execute(data)
    def _train(self, data, label):
        """ Update the covariance matrix and forward the training """
        super(BacktransformationNode, self)._train(data, label)
        flattened_data = numpy.atleast_2d(data.get_data().flatten())
        # (1, n) * (n, 1) broadcasts to the (n, n) outer product
        if self.covariance is None:
            self.covariance = flattened_data * flattened_data.T
        else:
            self.covariance += flattened_data * flattened_data.T
        self.num_samples += 1

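    # Self-contained sketch of the accumulation above: broadcasting the
    # (1, n) row against its (n, 1) transpose yields the (n, n) outer
    # product, so the sum over samples is the (unnormalized) second-moment
    # matrix.
    #
    #     x = numpy.atleast_2d([1.0, 2.0])
    #     assert numpy.allclose(x * x.T, numpy.outer(x, x))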
    def _stop_training(self):
        """ Normalize the covariance matrix and forward the training """
        super(BacktransformationNode, self)._stop_training()
        self.covariance = 1.0 * self.covariance / self.num_samples
    def get_own_transformation(self, sample=None):
        """ Return the transformation parameters """
        if sample is None:
            sample = self.example
        if self.example is None:
            self._log("No transformation generated!", level=logging.ERROR)
            return None
        elif self.trafo is None and self.mode == "linear":
            self.generate_affine_backtransformation()
        elif self.mode == "nonlinear":
            self.get_derivative(sample=sample)
        if type(self.example) == TimeSeries:
            return (self.trafo, (self.offset, self.covariance),
                    self.example.channel_names,
                    "generic_backtransformation")
        elif type(self.example) == FeatureVector:
            return (self.trafo, (self.offset, self.covariance),
                    self.example.feature_names,
                    "generic_backtransformation")
    def generate_affine_backtransformation(self):
        """ Generate synthetic examples and test them to determine the transformation

        This is the key method!
        """
        if type(self.example) == FeatureVector:
            testsample = FeatureVector.replace_data(
                self.example, numpy.zeros(self.example.shape))
            self.offset = numpy.longdouble(self._execute(testsample))
            self.trafo = FeatureVector.replace_data(
                self.example, numpy.zeros(self.example.shape))
            for j in range(len(self.example.feature_names)):
                testsample = FeatureVector.replace_data(
                    self.example, numpy.zeros(self.example.shape))
                testsample[0][j] = 1.0
                self.trafo[0][j] = \
                    numpy.longdouble(self._execute(testsample) - self.offset)
        elif type(self.example) == TimeSeries:
            testsample = TimeSeries.replace_data(
                self.example, numpy.zeros(self.example.shape))
            self.offset = numpy.longdouble(numpy.squeeze(
                self._execute(testsample)))
            self.trafo = TimeSeries.replace_data(
                self.example, numpy.zeros(self.example.shape))
            for i in range(self.example.shape[0]):
                for j in range(self.example.shape[1]):
                    testsample = TimeSeries.replace_data(
                        self.example, numpy.zeros_like(self.example))
                    testsample[i][j] = 1.0
                    self.trafo[i][j] = \
                        numpy.longdouble(
                            numpy.squeeze(self._execute(testsample)) -
                            self.offset)

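    # A standalone sketch of the probing idea, assuming a plain affine map
    # f(x) = <w, x> + b (w and b are made up for the demo): the offset is
    # f(0), and executing on each unit vector recovers one weight.
    #
    #     w, b = numpy.array([1.0, -2.0, 0.5]), 3.0
    #     f = lambda x: numpy.dot(w, x) + b
    #     offset = f(numpy.zeros(3))  # recovers b
    #     trafo = numpy.array([f(numpy.eye(3)[j]) - offset for j in range(3)])
    #     assert numpy.allclose(trafo, w) and offset == b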
    def normalization(self, sample):
        """ Normalize the result of the transformation to the same norm as the input

        **Principle**

        The function first computes the norm of the input and then applies
        the same norm to the ``self.trafo`` variable such that the results
        will be on the same scale.

        .. note::
            If either the input or the derivative has not been computed
            already, the node will raise an IOError.
        """
        if self.trafo is None:
            raise IOError("The derivative has not been computed. "
                          "Cannot perform normalization.")
        if sample is None:
            raise IOError("The initial sample has not been given. "
                          "Cannot perform normalization.")
        initial = sample.view(numpy.ndarray)
        a = initial[0, :]
        norm_a = numpy.linalg.norm(a)
        if norm_a == 0:
            norm_a = 1
        initial = self.trafo.view(numpy.ndarray)
        b = initial[0, :]
        norm_b = numpy.linalg.norm(b)
        if norm_b == 0:
            norm_b = 1
        self.trafo = FeatureVector.replace_data(self.trafo,
                                                b * norm_a / norm_b)
    def get_derivative(self, sample=None):
        """ Obtain the derivative of the entire transformation

        The method is just a wrapper that dispatches to the different
        methods of derivation. The first order derivative is saved to the
        variable ``self.trafo`` and can be visualized using specific
        methods.

        The methods used in the following pieces of code are described in
        `Numerical Methods in Engineering with Python
        <http://books.google.de/books?id=WiDie-hev1kC>`_ by Jaan Kiusalaas.
        Namely, the three methods implemented here are:

        * Forward difference method
        * Central difference method
        * Central difference method using a half step

        More details about the implementations can be found in the
        descriptions of the functions.

        **Parameters**

            :sample:
                The initial values on which the derivative is to be
                computed. If no sample is provided, the default
                ``self.example`` variable is used.

                (*optional, default: None*)
        """
        if sample is None:
            warnings.warn("No new sample was given. "
                          "Using the default example.")
            sample = self.example
        if self.method == "forward_difference":
            self.forward_difference_method(sample=sample)
        elif self.method == "central_difference":
            self.central_difference_method(sample=sample)
        elif self.method == "central_difference_with_halfstep":
            self.central_difference_with_halfstep_method(sample=sample)
        else:
            warnings.warn("Method " + self.method + " is not known. "
                          "Using the forward difference approach.")
            self.forward_difference_method(sample=sample)
        # self.normalization(sample)
    def forward_difference_method(self, sample):
        """ Implementation of the forward difference method

        **Principle**

        The principle applied by this method of numerical differentiation is

        .. math:: f'(x)=\\frac{f(x+h)-f(x)}{h}

        where :math:`h` is the step of the differentiation that is computed
        as :math:`h(x)=\\sqrt{\\varepsilon} \\cdot x` for :math:`x \\neq 0`
        and :math:`h(0)=\\sqrt{\\varepsilon}` for :math:`x=0`.

        The differentiation method distinguishes between ``FeatureVector``
        and ``TimeSeries`` inputs and applies the derivative according to
        the input type.

        **Parameters**

            :sample: the initial value used for the derivation

        .. note::
            Of the three numerical differentiation methods, this one has the
            least overhead. Nonetheless, it is less accurate than the half
            step method.
        """
        initial_value = self._execute(sample)
        if type(sample) == FeatureVector:
            self.trafo = FeatureVector.replace_data(
                self.example, numpy.zeros(self.example.shape))
            for j in range(len(sample.feature_names)):
                data_with_offset = copy.deepcopy(sample)
                if data_with_offset[0][j] == 0.:
                    diff = numpy.sqrt(self.eps)
                else:
                    diff = numpy.sqrt(self.eps) * data_with_offset[0][j]
                orig = data_with_offset[0][j]
                data_with_offset[0][j] += diff
                # use the exactly representable step
                diff = data_with_offset[0][j] - orig
                new_feature_vector = FeatureVector.replace_data(
                    sample, data_with_offset)
                self.trafo[0][j] = \
                    numpy.longdouble((self._execute(new_feature_vector) -
                                      initial_value) / diff)
        elif type(sample) == TimeSeries:
            self.trafo = TimeSeries.replace_data(
                self.example, numpy.zeros(self.example.shape))
            for i in range(sample.shape[0]):
                for j in range(sample.shape[1]):
                    data_with_offset = copy.deepcopy(sample)
                    if data_with_offset[i][j] == 0.:
                        diff = numpy.sqrt(self.eps)
                    else:
                        diff = numpy.sqrt(self.eps) * data_with_offset[i][j]
                    data_with_offset[i][j] += diff
                    new_time_series = TimeSeries.replace_data(
                        sample, data_with_offset)
                    self.trafo[i][j] = \
                        numpy.longdouble(
                            (numpy.squeeze(self._execute(new_time_series)) -
                             numpy.squeeze(initial_value)) / diff)
    def central_difference_method(self, sample):
        """ Implementation of the central difference method

        **Principle**

        The principle applied by the central difference method is

        .. math:: f'(x)=\\frac{f(x+h)-f(x-h)}{2h}

        where :math:`h` is the step of the differentiation that is computed
        as :math:`h(x)=\\sqrt{\\varepsilon} \\cdot x` for :math:`x \\neq 0`
        and :math:`h(0)=\\sqrt{\\varepsilon}` for :math:`x=0`.

        **Parameters**

            :sample: the initial value used for the derivation
        """
        if type(sample) == FeatureVector:
            self.trafo = FeatureVector.replace_data(
                sample, numpy.zeros(sample.shape))
            for j in range(len(sample.feature_names)):
                positive_offset = copy.deepcopy(sample)
                negative_offset = copy.deepcopy(sample)
                if positive_offset[0][j] == 0.:
                    diff = numpy.sqrt(self.eps)
                else:
                    diff = numpy.sqrt(self.eps) * positive_offset[0][j]
                positive_offset[0][j] += diff
                negative_offset[0][j] -= diff
                diff = (positive_offset[0][j] - negative_offset[0][j]) / 2.
                positive_vector = FeatureVector.replace_data(
                    sample, positive_offset)
                negative_vector = FeatureVector.replace_data(
                    sample, negative_offset)
                self.trafo[0][j] = \
                    numpy.longdouble((self._execute(positive_vector) -
                                      self._execute(negative_vector)) /
                                     (2. * diff))
        elif type(sample) == TimeSeries:
            self.trafo = TimeSeries.replace_data(
                self.example, numpy.zeros(self.example.shape))
            for i in range(sample.shape[0]):
                for j in range(sample.shape[1]):
                    positive_offset = copy.deepcopy(sample)
                    negative_offset = copy.deepcopy(sample)
                    if positive_offset[i][j] == 0.:
                        diff = numpy.sqrt(self.eps)
                    else:
                        diff = numpy.sqrt(self.eps) * positive_offset[i][j]
                    positive_offset[i][j] += diff
                    negative_offset[i][j] -= diff
                    diff = (positive_offset[i][j] -
                            negative_offset[i][j]) / 2.
                    positive_series = TimeSeries.replace_data(
                        sample, positive_offset)
                    negative_series = TimeSeries.replace_data(
                        sample, negative_offset)
                    self.trafo[i][j] = \
                        numpy.longdouble((self._execute(positive_series) -
                                          self._execute(negative_series)) /
                                         (2. * diff))
    def central_difference_with_halfstep_method(self, sample):
        """ Implementation of the central difference method with a half step

        **Principle**

        The principle applied by the central difference method with a half
        step is

        .. math:: f'(x)=\\frac{f(x-h)-8f(x-\\frac{h}{2})+8f(x+\\frac{h}{2})-f(x+h)}{6h}

        where :math:`h` is the step of the differentiation that is computed
        as :math:`h(x)=\\sqrt{\\varepsilon} \\cdot x` for :math:`x \\neq 0`
        and :math:`h(0)=\\sqrt{\\varepsilon}` for :math:`x=0`.

        **Parameters**

            :sample: the initial value used for the derivation

        .. note::
            This method is the most accurate differentiation method but
            also has the greatest overhead.
        """
        if type(sample) == FeatureVector:
            self.trafo = FeatureVector.replace_data(
                self.example, numpy.zeros(self.example.shape))
            for j in range(len(sample.feature_names)):
                positive_offset = copy.deepcopy(sample)
                negative_offset = copy.deepcopy(sample)
                half_positive_offset = copy.deepcopy(sample)
                half_negative_offset = copy.deepcopy(sample)
                if positive_offset[0][j] == 0.:
                    diff = numpy.sqrt(self.eps)
                else:
                    diff = numpy.sqrt(self.eps) * positive_offset[0][j]
                positive_offset[0][j] += diff
                negative_offset[0][j] -= diff
                half_positive_offset[0][j] += diff / 2.
                half_negative_offset[0][j] -= diff / 2.
                diff = (positive_offset[0][j] - negative_offset[0][j]) / 2.
                positive_vector = FeatureVector.replace_data(
                    sample, positive_offset)
                negative_vector = FeatureVector.replace_data(
                    sample, negative_offset)
                half_positive_vector = FeatureVector.replace_data(
                    sample, half_positive_offset)
                half_negative_vector = FeatureVector.replace_data(
                    sample, half_negative_offset)
                self.trafo[0][j] = \
                    numpy.longdouble(
                        (self._execute(negative_vector) -
                         8 * self._execute(half_negative_vector) +
                         8 * self._execute(half_positive_vector) -
                         self._execute(positive_vector)) / (6. * diff))
        elif type(sample) == TimeSeries:
            self.trafo = TimeSeries.replace_data(
                self.example, numpy.zeros(self.example.shape))
            for i in range(sample.shape[0]):
                for j in range(sample.shape[1]):
                    positive_offset = copy.deepcopy(sample)
                    negative_offset = copy.deepcopy(sample)
                    half_positive_offset = copy.deepcopy(sample)
                    half_negative_offset = copy.deepcopy(sample)
                    if positive_offset[i][j] == 0.:
                        diff = numpy.sqrt(self.eps)
                    else:
                        diff = numpy.sqrt(self.eps) * positive_offset[i][j]
                    positive_offset[i][j] += diff
                    negative_offset[i][j] -= diff
                    half_positive_offset[i][j] += diff / 2.
                    half_negative_offset[i][j] -= diff / 2.
                    diff = (positive_offset[i][j] -
                            negative_offset[i][j]) / 2.
                    positive_series = TimeSeries.replace_data(
                        sample, positive_offset)
                    negative_series = TimeSeries.replace_data(
                        sample, negative_offset)
                    half_positive_series = TimeSeries.replace_data(
                        sample, half_positive_offset)
                    half_negative_series = TimeSeries.replace_data(
                        sample, half_negative_offset)
                    self.trafo[i][j] = \
                        numpy.longdouble(
                            (self._execute(negative_series) -
                             8 * self._execute(half_negative_series) +
                             8 * self._execute(half_positive_series) -
                             self._execute(positive_series)) / (6. * diff))

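    # Standalone sketch of the three schemes on f(x) = x**3 at x = 1
    # (exact derivative: 3), with h chosen as in the methods above. The
    # forward estimate carries an O(h) truncation error, while the central
    # and half-step estimates are of higher order.
    #
    #     f = lambda x: x ** 3
    #     x = 1.0
    #     h = numpy.sqrt(2.2e-16) * x
    #     fwd = (f(x + h) - f(x)) / h
    #     ctr = (f(x + h) - f(x - h)) / (2. * h)
    #     hlf = (f(x - h) - 8 * f(x - h / 2.)
    #            + 8 * f(x + h / 2.) - f(x + h)) / (6. * h)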
    def get_sensor_ranking(self):
        """ Transform the transformation into a sensor ranking by adding the respective absolute values

        This method follows the principles implemented in
        :class:`~pySPACE.missions.nodes.classification.base.RegularizedClassifierBase`.
        There might be some similarities in the code.
        """
        # make sure the transformation (self.trafo) has been generated
        self.get_own_transformation()
        # interfacing to code from RegularizedClassifierBase
        if type(self.trafo) == FeatureVector:
            trafo = self.trafo
        elif type(self.trafo) == TimeSeries:
            # canonical mapping of time series to feature vector for
            # simplicity
            node = TimeDomainFeaturesNode()
            trafo = node._execute(self.trafo)
        # code from RegularizedClassifierBase with ``trafo`` instead of
        # ``self.features``
        # the channel name is what comes after the first underscore
        feat_channel_names = [chnames.split('_')[1]
                              for chnames in trafo.feature_names]
        from collections import defaultdict
        ranking_dict = defaultdict(float)
        for i in range(len(trafo[0])):
            ranking_dict[feat_channel_names[i]] += abs(trafo[0][i])
        ranking = sorted(ranking_dict.items(), key=lambda t: t[1])
        return ranking

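    # A minimal sketch of the ranking step above, assuming feature names of
    # the form "<feature>_<channel>_..." (the names and weights are made
    # up): absolute weights are summed per channel and sorted ascending.
    #
    #     from collections import defaultdict
    #     names = ["TD_C3_0.1sec", "TD_C4_0.1sec", "TD_C3_0.2sec"]
    #     weights = [0.5, -1.0, 0.25]
    #     ranking_dict = defaultdict(float)
    #     for name, w in zip(names, weights):
    #         ranking_dict[name.split('_')[1]] += abs(w)
    #     sorted(ranking_dict.items(), key=lambda t: t[1])
    #     # -> [('C3', 0.75), ('C4', 1.0)]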
    def _inc_train(self, data, class_label=None):
        """ This method is not yet implemented """
        self._log("Incremental backtransformation is not yet available!",
                  level=logging.ERROR)
        super(BacktransformationNode, self)._inc_train(data, class_label)
    @staticmethod
    def node_from_yaml(nodes_spec):
        """ Create the FlowNode and the contained chain based on the node_spec """
        node_obj = BacktransformationNode(
            **FlowNode._prepare_node_chain(nodes_spec))
        return node_obj
    def store_state(self, result_dir, index=None):
        """ Store the results

        This method stores the transformation matrix, the offset, the
        covariance matrix, and the channel names. The `store_format`
        variable must be set to one of the three corresponding formats:
        `txt`, `pickle` or `mat`. If the `store_format` variable is `None`,
        the output will not be stored.
        """
        import os
        if self.store_format == "txt":
            numpy.set_printoptions(threshold=numpy.nan)
            file_name = os.path.join(result_dir, "backtransformation.txt")
            numpy.savetxt(file_name, self.get_own_transformation(),
                          delimiter=" ", fmt="%s")
        elif self.store_format == "pickle":
            import pickle
            file_name = os.path.join(result_dir, "backtransformation.pickle")
            pickle.dump(self.get_own_transformation(), open(file_name, "wb"))
        elif self.store_format == "mat":
            import scipy.io
            file_name = os.path.join(result_dir, "backtransformation.mat")
            result = self.get_own_transformation()
            result_dict = {
                "Transformation matrix": result[0],
                "Offset": result[1][0],
                "Covariance matrix": result[1][1],
                "Feature/Channel names": result[2],
                "Transformation name": result[3]
            }
            scipy.io.savemat(file_name, result_dict)
        elif self.store_format is not None:
            message = ("Storage format \"%s\" unrecognized. "
                       "Please choose between \"mat\", \"txt\" "
                       "and \"pickle\".") % self.store_format
            warnings.warn(message)

_NODE_MAPPING = {"Flow_Node": FlowNode,
                 "Batch_Adapt_Subflow": BatchAdaptSubflowNode}