Source code for pySPACE.missions.nodes.meta.same_input_layer

""" Combine several other nodes together in parallel

This is useful to be combined with the
:class:`~pySPACE.missions.nodes.meta.flow_node.FlowNode`.
"""

import numpy
from pySPACE.environments.chains.node_chain import NodeChainFactory

from pySPACE.missions.nodes.base_node import BaseNode
from pySPACE.resources.data_types.feature_vector import FeatureVector
from pySPACE.resources.data_types.time_series import TimeSeries
from pySPACE.resources.data_types.prediction_vector import PredictionVector

# ensemble imports
import os
try:
    import portalocker
except ImportError, e:
    pass
# import fcntl
import fnmatch
import cPickle
import logging
import warnings
from collections import defaultdict

from pySPACE.missions.nodes.meta.flow_node import FlowNode
from pySPACE.tools.filesystem import locate


class SameInputLayerNode(BaseNode):
    """ Encapsulate a set of other nodes that are executed in parallel in the flow

    This node was a thin wrapper around MDP's SameInputLayer node
    but is now an independent implementation.

    **Parameters**

    :enforce_unique_names:
        When combining time series channels or feature vectors,
        the node adds the index of the current node to the channel names or
        feature names as a prefix to enforce unique names.

        (*optional, default: True*)

    **Exemplary Call**

    .. code-block:: yaml

        -
            node : Same_Input_Layer
            parameters :
                enforce_unique_names : True
                nodes :
                    -
                        node : Time_Domain_Features
                        parameters :
                            moving_window_length : 1
                    -
                        node : STFT_Features
                        parameters :
                            frequency_band : [2.0, 8.0]
                            frequency_resolution : 1.0
    """
    input_types = ["TimeSeries"]

    def __init__(self, nodes, enforce_unique_names=True,
                 store=False, **kwargs):
        self.nodes = nodes  # needed to find out dimensions and trainability,...
        super(SameInputLayerNode, self).__init__(**kwargs)
        self.permanent_state.pop("nodes")
        self.set_permanent_attributes(output_type=None,
                                      names=None,
                                      unique=enforce_unique_names)

    @staticmethod
    def node_from_yaml(layer_spec):
        """ Load the specs and initialize the layer nodes """
        # This node requires one parameter, namely a list of nodes
        assert("parameters" in layer_spec and
               "nodes" in layer_spec["parameters"]),\
            "SameInputLayerNode requires specification of a list of nodes!"
        # Create all nodes that are packed together in this layer
        layer_nodes = []
        for node_spec in layer_spec["parameters"]["nodes"]:
            node_obj = BaseNode.node_from_yaml(node_spec)
            layer_nodes.append(node_obj)
        layer_spec["parameters"].pop("nodes")
        # Create the node object
        node_obj = SameInputLayerNode(nodes=layer_nodes,
                                      **layer_spec["parameters"])
        return node_obj

    def reset(self):
        """ Also reset internal nodes """
        nodes = self.nodes
        for node in nodes:
            node.reset()
        super(SameInputLayerNode, self).reset()
        self.nodes = nodes

    def register_input_node(self, input_node):
        """ All sub-nodes have the same input node """
        super(SameInputLayerNode, self).register_input_node(input_node)
        # Register the node as the input for all internal nodes
        for node in self.nodes:
            node.register_input_node(input_node)

    def _execute(self, data):
        """ Process the data through the internal nodes """
        names = []
        result_array = None
        result_label = []
        result_predictor = []
        result_prediction = []
        # For all node-layers
        for node_index, node in enumerate(self.nodes):
            # Compute node's result
            node_result = node.execute(data)
            # Determine the output type of the node
            if self.output_type is None:
                self.output_type = type(node_result)
            else:
                assert (self.output_type == type(node_result)), \
                    "SameInputLayerNode requires that all of its layers " \
                    "return the same type. Types found: %s %s" \
                    % (self.output_type, type(node_result))
            # Merge the nodes' outputs depending on the type
            if self.output_type == FeatureVector:
                result_array = \
                    self.add_feature_vector(node_result, node_index,
                                            result_array, names)
            elif self.output_type == PredictionVector:
                if type(node_result.label) == list:
                    result_label.extend(node_result.label)
                else:
                    # a single classification is expected here
                    result_label.append(node_result.label)
                if type(node_result.prediction) == list:
                    result_prediction.extend(node_result.prediction)
                else:
                    result_prediction.append(node_result.prediction)
                if type(node_result.predictor) == list:
                    result_predictor.extend(node_result.predictor)
                else:
                    result_predictor.append(node_result.predictor)
            else:
                assert (self.output_type == TimeSeries), \
                    "SameInputLayerNode can not merge data of type %s." \
                    % self.output_type
                if self.names is None and not self.unique:
                    names.extend(node_result.channel_names)
                elif self.names is None and self.unique:
                    for name in node_result.channel_names:
                        names.append("%i_%s" % (node_index, name))
                if result_array is None:
                    result_array = node_result
                    if self.dtype is None:
                        self.dtype = node_result.dtype
                else:
                    result_array = numpy.concatenate((result_array,
                                                      node_result), axis=1)
        # Construct output with correct type and names
        if self.names is None:
            self.names = names
        if self.output_type == FeatureVector:
            return FeatureVector(result_array, self.names)
        elif self.output_type == PredictionVector:
            return PredictionVector(label=result_label,
                                    prediction=result_prediction,
                                    predictor=result_predictor)
        else:
            return TimeSeries(result_array, self.names,
                              node_result.sampling_frequency,
                              node_result.start_time, node_result.end_time,
                              node_result.name, node_result.marker_name)

    def add_feature_vector(self, data, index, result_array, names):
        """ Concatenate feature vectors, ensuring unique names """
        if self.names is None and self.unique:
            for name in data.feature_names:
                names.append("%i_%s" % (index, name))
        elif self.names is None and not self.unique:
            names.extend(data.feature_names)
        if result_array is None:
            result_array = data
        else:
            result_array = numpy.concatenate((result_array, data), axis=1)
        return result_array

    def is_trainable(self):
        """ Trainable if at least one subnode is trainable """
        for node in self.nodes:
            if node.is_trainable():
                return True
        return False

    def is_supervised(self):
        """ Supervised if at least one subnode requires supervised training """
        for node in self.nodes:
            if node.is_supervised():
                return True
        return False

#    def train_sweep(self, use_test_data):
#        """ Train all internal nodes """
#        for node in self.nodes:
#            node.train_sweep(use_test_data)

    def _train(self, x, *args, **kwargs):
        """ Perform single training step by training the internal nodes """
        for node in self.nodes:
            if node.is_training():
                node.train(x, *args, **kwargs)

    def _stop_training(self):
        """ Stop the training of all internal nodes that are still training """
        for node in self.nodes:
            if node.is_training():
                node.stop_training()

    def store_state(self, result_dir, index=None):
        """ Stores all nodes in subdirectories of *result_dir* """
        for i, node in enumerate(self.nodes):
            node_dir = os.path.join(
                result_dir,
                self.__class__.__name__ + str(index).split("None")[0] + str(i))
            node.store_state(node_dir, index=i)

    def _inc_train(self, data, label):
        """ Forward data to retrainable nodes

        This way the single nodes do not need to buffer the data and
        *present_labels* does not have to be reimplemented.
        """
        for node in self.nodes:
            if node.is_retrainable():
                node._inc_train(data, label)

    def set_run_number(self, run_number):
        """ Informs all subnodes about the number of the current run """
        for node in self.nodes:
            node.set_run_number(run_number)
        super(SameInputLayerNode, self).set_run_number(run_number)

    def get_output_type(self, input_type, as_string=True):
        """ Return the expected output type of the first node

        Additionally, the type is compared with the expected output of the
        other nodes to ensure consistency.
        """
        output = None
        for node in self.nodes:
            if output is None:
                output = node.get_output_type(input_type, as_string)
            elif output != node.get_output_type(input_type, as_string):
                warnings.warn("Node %s yields a different output type than "
                              "the rest of the nodes." % str(node))
            else:
                continue
        return output
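

# ---------------------------------------------------------------------------
# Illustration (not part of the original module): a minimal sketch of the
# unique-name prefixing performed in _execute and add_feature_vector when
# ``enforce_unique_names`` is True. Channel or feature names of the internal
# node with index ``i`` are prefixed with "i_"; the names below are
# hypothetical.
#
#     >>> node_index = 1
#     >>> feature_names = ["Mean_C3", "Mean_C4"]   # hypothetical names
#     >>> ["%i_%s" % (node_index, name) for name in feature_names]
#     ['1_Mean_C3', '1_Mean_C4']
# ---------------------------------------------------------------------------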


class EnsembleNotFoundException(Exception):
    """ Error when loading of ensembles is not possible """
    pass


class ClassificationFlowsLoaderNode(BaseNode):
    """ Combine an ensemble of pretrained node chains

    This node loads all "pickled" flows whose file names match
    *ensemble_pattern* and are contained in the directory tree rooted at
    *ensemble_base_dir*. If the *flow_select_list* is not empty, only the
    flows with indices contained in flow_select_list are used.
    The index "-1" corresponds to "all flows".

    **Parameters**

    :ensemble_base_dir:
        The root directory under which the stored flow objects that
        constitute the ensemble are stored.

    :ensemble_pattern:
        Pickled flows must match the given pattern to be included into the
        ensemble.

    :flow_select_list:
        This optional parameter allows selecting only a subset of the flows
        that are found in ensemble_base_dir. It must be a list of indices.
        Only the flows with the given indices are included into the ensemble.
        If -1 is contained in the list, all flows are automatically added to
        the ensemble.

        .. note::
              The order of the flows in the ensemble is potentially random
              or at least hard to predict. Thus, this parameter should not
              be used to select a specific flow. In contrast, this parameter
              can be used to select a certain number of flows from the
              available flows (where it doesn't matter which ones). This can
              be useful for instance in benchmarking experiments when one is
              interested in the average performance of an ensemble of a
              certain size.

        (*optional, default: [-1]*)

    :cache_dir:
        If this argument is given, all results of all ensembles are
        remembered and stored in a persistent cache file in the given
        cache_dir. These cached results can later be reused without
        actually loading and executing the ensemble.

        (*optional, default: None*)

    **Exemplary Call**

    .. code-block:: yaml

        -
            node : Ensemble_Node
            parameters :
                ensemble_base_dir : "/tmp/" # <- insert suitable directory here
                ensemble_pattern : "flow*.pickle"
                flow_select_list : "eval(range(10))"

    :Author: Jan Hendrik Metzen (jhm@informatik.uni-bremen.de)
    :Created: 2010/05/20
    """

    def __init__(self, ensemble_base_dir, ensemble_pattern,
                 flow_select_list=[-1], cache_dir=None, **kwargs):
        super(ClassificationFlowsLoaderNode, self).__init__(**kwargs)

        try:
            import portalocker
        except ImportError, e:
            warnings.warn("Before running " + self.__class__.__name__ +
                          ", please install the portalocker module, e.g.," +
                          " via pip install")
            raise(e)

        # Load all flow-pickle files that match the given ensemble_pattern
        # in the directory tree rooted in ensemble_base_dir
        flow_pathes = tuple(locate(ensemble_pattern, ensemble_base_dir))
        if -1 not in flow_select_list:
            # Select only flows for the ensemble whose index is contained in
            # flow_select_list
            flow_pathes = tuple(flow_pathes[index]
                                for index in flow_select_list)

        if len(flow_pathes) == 0:
            raise EnsembleNotFoundException(
                "No ensemble found in %s for pattern %s"
                % (ensemble_base_dir, ensemble_pattern))

        self.feature_names = \
            map(lambda s: "_".join(s.split(os.sep)[-1].split('_')[0:2]),
                flow_pathes)

        self.set_permanent_attributes(ensemble=None,
                                      flow_pathes=flow_pathes,
                                      cache_dir=cache_dir,
                                      cache=None,
                                      cache_updated=False,
                                      store=True)  # always store cache

    def _load_cache(self):
        self.cache = defaultdict(dict)
        # Check if there are cached results for this ensemble
        for flow_path in self.flow_pathes:
            file_path = self.cache_dir + os.sep + "ensemble_cache" + os.sep \
                + "cache_%s" % hash(flow_path)
            if os.path.exists(file_path):
                # Load ensemble cache
                self._log("Loading flow cache from %s" % file_path)
                lock_file = open(file_path + ".lock", 'w')
                portalocker.lock(lock_file, portalocker.LOCK_EX)
                # fcntl.flock(lock_file, fcntl.LOCK_EX)
                self._log("Got exclusive lock on %s" % (file_path + ".lock"),
                          logging.INFO)
                cache_file = open(file_path, 'r')
                self.cache[flow_path] = cPickle.load(cache_file)
                cache_file.close()
                self._log("Release exclusive lock on %s"
                          % (file_path + ".lock"), logging.INFO)
                # fcntl.flock(lock_file, fcntl.LOCK_UN)
                portalocker.unlock(lock_file)

    def _load_ensemble(self):
        self._log("Loading ensemble")
        # Create a flow node for each flow pickle
        flow_nodes = [FlowNode(load_path=flow_path)
                      for flow_path in self.flow_pathes]
        # Create a SameInputLayer node that executes all flows independently
        # with the same input
        ensemble = SameInputLayerNode(flow_nodes, enforce_unique_names=True)
        # We can now set the input dim and output dim
        self.input_dim = ensemble.input_dim
        self.output_dim = ensemble.output_dim
        self.set_permanent_attributes(ensemble=ensemble)

    def _train(self, data, label):
        """ Trains the ensemble on the given data vector *data* """
        if self.ensemble is None:
            # Load the ensemble if it has not been loaded yet
            self._load_ensemble()
        return self.ensemble.train(data, label)

    def _execute(self, data):
        # Compute data's hash
        data_hash = hash(tuple(data.flatten()))

        # Load ensemble's cache
        if self.cache is None:
            if self.cache_dir:
                self._load_cache()
            else:
                # Caching disabled
                self.cache = defaultdict(dict)

        # Try to look up the result of this ensemble for the given data
        # in the cache
        labels = []
        predictions = []
        for i, flow_path in enumerate(self.flow_pathes):
            if data_hash in self.cache[flow_path]:
                label, prediction = self.cache[flow_path][data_hash]
            else:
                self.cache_updated = True
                if self.ensemble is None:
                    # Load ensemble since data is not cached
                    self._load_ensemble()
                node_result = self.ensemble.nodes[i].execute(data)
                label = node_result.label
                prediction = node_result.prediction
                self.cache[flow_path][data_hash] = (label, prediction)
            labels.append(label)
            predictions.append(prediction)

        result = PredictionVector(label=labels,
                                  prediction=predictions,
                                  predictor=self)
        result.dim_names = self.feature_names
        return result

    def store_state(self, result_dir, index=None):
        """ Stores this node in the given directory *result_dir* """
        # Store cache if caching is enabled and cache has changed
        if self.cache_dir and self.cache_updated:
            if not os.path.exists(self.cache_dir + os.sep + "ensemble_cache"):
                os.makedirs(self.cache_dir + os.sep + "ensemble_cache")
            for flow_path in self.flow_pathes:
                file_path = self.cache_dir + os.sep + "ensemble_cache" \
                    + os.sep + "cache_%s" % hash(flow_path)
                if os.path.exists(file_path):
                    self._log("Updating flow cache %s" % file_path)
                    # Update existing cache persistency file
                    lock_file = open(file_path + ".lock", 'w')
                    portalocker.lock(lock_file, portalocker.LOCK_EX)
                    # fcntl.flock(lock_file, fcntl.LOCK_EX)
                    self._log("Got exclusive lock on %s"
                              % (file_path + ".lock"), logging.INFO)
                    cache_file = open(file_path, 'r')
                    self.cache[flow_path].update(cPickle.load(cache_file))
                    cache_file.close()
                    cache_file = open(file_path, 'w')
                    cPickle.dump(self.cache[flow_path], cache_file)
                    cache_file.close()
                    self._log("Release exclusive lock on %s"
                              % (file_path + ".lock"), logging.INFO)
                    portalocker.unlock(lock_file)
                    # fcntl.flock(lock_file, fcntl.LOCK_UN)
                else:
                    self._log("Writing flow cache %s" % file_path)
                    # Create new cache persistency file
                    lock_file = open(file_path + ".lock", 'w')
                    portalocker.lock(lock_file, portalocker.LOCK_EX)
                    # fcntl.flock(lock_file, fcntl.LOCK_EX)
                    self._log("Got exclusive lock on %s"
                              % (file_path + ".lock"), logging.INFO)
                    cache_file = open(file_path, 'w')
                    cPickle.dump(self.cache[flow_path], cache_file)
                    cache_file.close()
                    self._log("Release exclusive lock on %s"
                              % (file_path + ".lock"), logging.INFO)
                    portalocker.unlock(lock_file)
                    # fcntl.flock(lock_file, fcntl.LOCK_UN)

    def get_output_type(self, input_type, as_string=True):
        if as_string:
            return "PredictionVector"
        else:
            return PredictionVector
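

# ---------------------------------------------------------------------------
# Illustration (not part of the original module): a minimal sketch of the
# cache layout used by ClassificationFlowsLoaderNode. One pickle file per
# flow is stored below ``<cache_dir>/ensemble_cache/``, named after the hash
# of the flow's path, and individual results are keyed by the hash of the
# flattened input data. The paths below are hypothetical; the expressions
# mirror those in _load_cache and _execute above.
#
#     >>> import os, numpy
#     >>> cache_dir, flow_path = "/tmp", "/tmp/flow_0.pickle"  # hypothetical
#     >>> file_path = cache_dir + os.sep + "ensemble_cache" + os.sep \
#     ...     + "cache_%s" % hash(flow_path)
#     >>> data_hash = hash(tuple(numpy.array([[0.1, 0.2]]).flatten()))
# ---------------------------------------------------------------------------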


class MultiClassLayerNode(SameInputLayerNode):
    """ Wrap the one vs. rest or one vs. one scheme around the given node

    The given class labels are forwarded to the internal nodes.
    During training, data is relabeled.
    Everything else is the same as in the base node.

    Though this scheme is most important for classification,
    it also permits other trainable algorithms to use it.

    **Parameters**

    :class_labels:
        This is the complete list of expected class labels.
        It is needed to construct the necessary flows in the
        initialization stage.

    :node:
        Specification of the wrapped node for the used scheme.

        As class labels, for the *1vR* scheme, this node has to use
        *REST* and *LABEL*. *LABEL* is replaced with the different
        `class_labels`. The other label should be *REST*.
        For the *1v1* scheme, *LABEL1* and *LABEL2* have to be used.

    :scheme:
        One of *1v1* (One vs. One) or *1vR* (One vs. Rest)

        .. note:: The one class approach is included by simply not giving
                  the 'REST' label to the classifier, but filtering it out.

        (*optional, default: '1v1'*)

    **Exemplary Call**

    .. code-block:: yaml

        -
            node : MultiClassLayer
            parameters :
                class_labels : ["Target", "Standard", "Artifact"]
                scheme : "1vR"
                node :
                    -
                        node : 1SVM
                        parameters :
                            class_labels : ["LABEL", "REST"]
                            complexity : 1
    """
    input_types = ["FeatureVector"]

    @staticmethod
    def node_from_yaml(layer_spec):
        """ Load the specs and initialize the layer nodes """
        assert("parameters" in layer_spec and
               "class_labels" in layer_spec["parameters"] and
               "node" in layer_spec["parameters"]),\
            "Node requires specification of a node and classification labels!"
        scheme = layer_spec["parameters"].pop("scheme", "1vs1")
        # Create all nodes that are packed together in this layer
        layer_nodes = []
        node_spec = layer_spec["parameters"]["node"][0]
        classes = layer_spec["parameters"]["class_labels"]
        if scheme == '1vR':
            for label in layer_spec["parameters"]["class_labels"]:
                node_obj = BaseNode.node_from_yaml(
                    NodeChainFactory.instantiate(node_spec, {"LABEL": label}))
                layer_nodes.append(node_obj)
        else:
            n = len(classes)
            for i in range(n - 1):
                for j in range(i + 1, n):
                    replace_dict = {"LABEL1": classes[i],
                                    "LABEL2": classes[j]}
                    node_obj = BaseNode.node_from_yaml(
                        NodeChainFactory.instantiate(node_spec, replace_dict))
                    layer_nodes.append(node_obj)
        layer_spec["parameters"].pop("node")
        layer_spec["parameters"].pop("class_labels")
        # Create the node object
        node_obj = MultiClassLayerNode(nodes=layer_nodes,
                                       **layer_spec["parameters"])
        return node_obj
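

# ---------------------------------------------------------------------------
# Illustration (not part of the original module): a minimal sketch of how the
# "1v1" branch of node_from_yaml above enumerates one wrapped node per
# unordered pair of class labels (the "1vR" branch creates one node per
# label instead). The labels are taken from the docstring example.
#
#     >>> classes = ["Target", "Standard", "Artifact"]
#     >>> [(classes[i], classes[j])
#     ...  for i in range(len(classes) - 1)
#     ...  for j in range(i + 1, len(classes))]
#     [('Target', 'Standard'), ('Target', 'Artifact'), ('Standard', 'Artifact')]
# ---------------------------------------------------------------------------
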
_NODE_MAPPING = {"Ensemble_Node": ClassificationFlowsLoaderNode, "Same_Input_Layer": SameInputLayerNode, }
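

# ---------------------------------------------------------------------------
# Illustration (not part of the original module): a minimal, hypothetical
# sketch of constructing the layer programmatically with node_from_yaml,
# using a specification equivalent to the "Exemplary Call" in the
# SameInputLayerNode docstring. Whether the inner node names resolve depends
# on the installed pySPACE node mapping, so the last line is only indicative.
#
#     >>> layer_spec = {
#     ...     "node": "Same_Input_Layer",
#     ...     "parameters": {
#     ...         "enforce_unique_names": True,
#     ...         "nodes": [
#     ...             {"node": "Time_Domain_Features",
#     ...              "parameters": {"moving_window_length": 1}},
#     ...             {"node": "STFT_Features",
#     ...              "parameters": {"frequency_band": [2.0, 8.0],
#     ...                             "frequency_resolution": 1.0}},
#     ...         ]}}
#     >>> layer = SameInputLayerNode.node_from_yaml(layer_spec)  # doctest: +SKIP
# ---------------------------------------------------------------------------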