
""" Interface for the numerous operations and processes

An **operation** takes one summary of data as input and produces a
second one as output with a set of *processes*.

Each **process** can be considered as
an atomic task that can be executed independently from all other processes
of the operation.

Standard Specification File Parameters
++++++++++++++++++++++++++++++++++++++

type
----

Specifies which operation is used.
The type and the corresponding module name are identical and written in
lower case with underscores.
The respective operation class name is written in camel case and ends
with ``Operation``.

(*obligatory*)
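
For example, the type ``node_chain`` is imported from the module
``pySPACE.missions.operations.node_chain``, and the class name is derived
as in this sketch (mirroring the lookup in ``Operation.create`` below)::

    op_type = "node_chain"
    op_class_name = ''.join(x.title() for x in op_type.split('_')) + "Operation"
    # -> "NodeChainOperation"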

input_path
----------
Path name relative to the *storage* in your configuration file.

The input has to fulfill certain rules,
specified in the fitting :mod:`pySPACE.resources.dataset_defs` subpackage.

Depending on the operation, the input datasets are combined, or each parameter
combination is applied separately to the data.

(*mostly obligatory*)
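
The datasets below this path are located as in the following sketch
(mirroring ``Operation.create`` below; *storage* comes from the pySPACE
configuration)::

    input_path_pattern = os.sep.join([storage, input_path, "*", ""])
    input_paths = glob.glob(input_path_pattern)  # one folder per dataset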

storage_format
--------------

Some datasets offer the opportunity to choose between different
formats for storing the result. The choice can be specified here.
For details have a look at the :mod:`~pySPACE.resources.dataset_defs`
documentation. If the format is not specified, the default of the
dataset is used.

(*optional, default: default of dataset*)

hide_parameters
---------------

Normally each parameter is appended in the format
*{__PARAMETER__: value}* to the *__RESULT_DIRECTORY__*.
Every parameter specified in the list *hide_parameters* is not
added to the folder name. Be careful not to produce
the same folder name for different results.

(*optional, default: []*)
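
For example (with a hypothetical parameter name), a parameter setting
*{__prewindowing__: True}* normally becomes part of the result folder name;
hiding the parameter keeps the folder name short::

    # sketch of the relevant entry in the parsed specification dictionary
    operation_spec["hide_parameters"] = ["__prewindowing__"]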

runs
----

Number of repetitions for each process of the operation.
The run number is passed to the process so that it can be used for
*choosing* random components, especially when using cross-validation.

(*optional, default: 1*)

backend
-------

Overwrites the backend defined on the command line
or, if none is specified there, the default (serial) backend.

(*optional, default: command line or default backend*)

others
------

For the operation-specific parameters, have a look at
the documentation of the respective operation.
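
example
-------

A minimal, hypothetical specification is shown below as the dictionary
that results from parsing the YAML file (all names are made up; the valid
parameters depend on the chosen operation type)::

    operation_spec = {
        "type": "node_chain",              # -> NodeChainOperation
        "input_path": "my_input_summary",  # relative to the *storage*
        "runs": 2,
        "parameter_ranges": {"__LOWER_CUTOFF__": [0.1, 1.0]},
    }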
"""
import glob

import os
import time
import logging
import socket
import sys
import warnings
import yaml
import copy

file_path = os.path.dirname(os.path.abspath(__file__))
pyspace_path = file_path[:file_path.rfind('pySPACE')-1]
if pyspace_path not in sys.path:
    sys.path.append(pyspace_path)

import pySPACE
from pySPACE.tools.filesystem import create_directory


class Operation(object):
    """ Takes one summary of data as input and produces a second one as
    output with a set of *processes* that can be executed on an arbitrary
    backend modality
    """

    def __init__(self, processes, operation_spec, result_directory):
        self.processes = processes
        self.operation_spec = operation_spec
        self.result_directory = result_directory

        # Check if the required directories exist
        # and create them if necessary
        create_directory(self.result_directory)

        # Store the specification of this operation in the directory,
        # without the base_file entry
        base_file = self.operation_spec.pop("base_file", None)
        source_operation_file = open(os.sep.join([self.result_directory,
                                                  "source_operation.yaml"]),
                                     'w')
        yaml.dump(self.operation_spec, source_operation_file)
        source_operation_file.close()
        if base_file is not None:
            self.operation_spec["base_file"] = base_file

    def consolidate(self, results):
        """ Consolidates the results obtained by the single processes of
        this operation into a consistent structure of collections. """
        raise NotImplementedError(
            "Method consolidate has not been implemented in subclass %s"
            % self.__class__.__name__)

    def _log(self, message, level=logging.INFO):
        """ Logs the given message with the given logging level """
        logging.getLogger("%s" % self).log(level, message)

    @classmethod
    def create(cls, operation_spec, base_result_dir=None):
        """ A factory method that calls the responsible method for creating
        an operation of the type specified in the operation specification
        dictionary (*operation_spec*).
        """
        # Determine result directory
        result_directory = cls.get_unique_result_dir(base_result_dir)
        print("--> Results will be stored at: \n\t\t %s"
              % str(result_directory))
        # Check if the required directories exist
        # and create them if necessary
        create_directory(result_directory)

        # Determine all input datasets (note: they can be specified by
        # extended syntax for the glob package)
        storage = pySPACE.configuration.storage
        if not operation_spec.has_key("input_path"):
            warnings.warn("No input path found in operation specification.")
        input_path_pattern = os.sep.join([storage,
                                          operation_spec.get("input_path", ""),
                                          "*", ""])
        input_paths = glob.glob(input_path_pattern)
        # Sort out folders that are no pySPACE datasets
        obsolete_paths = []
        for path in input_paths:
            if os.path.isfile(os.sep.join([path, "metadata.yaml"])):
                continue
            elif os.path.isfile(os.sep.join([path, "collection.yaml"])):
                continue  # warning comes when the data is loaded
            else:
                obsolete_paths.append(path)
                warnings.warn('Folder ' + str(path) + ' seems not to be a '
                              'pySPACE dataset (no "metadata.yaml" found)! '
                              'Skipping this folder in operation...')
        for path in obsolete_paths:
            input_paths.remove(path)

        # Strip the deprecated "_operation" suffix from the type
        op_type = operation_spec["type"]
        if op_type.endswith("_operation"):
            op_type = op_type[:-len("_operation")]
            operation_spec["type"] = op_type
            warnings.warn("'%s_operation' has the wrong ending. Using '%s' "
                          "instead." % (op_type, op_type), DeprecationWarning)
        op_class_name = ''.join([x.title() for x in op_type.split('_')])
        op_class_name += "Operation"
        # Dynamic class import: from data_mod_name import col_class_name
        try:
            op_module = __import__('pySPACE.missions.operations.%s' % op_type,
                                   fromlist=[op_class_name])
        except ImportError:
            warnings.warn("Operation module %s is unknown. "
                          "Trying to use node_chain." % op_type)
            from pySPACE.missions.operations.node_chain import NodeChainOperation
            op_class = NodeChainOperation
        else:
            op_class = getattr(op_module, op_class_name)
        return op_class.create(operation_spec, result_directory,
                               input_paths=input_paths)

    @classmethod
    def get_unique_result_dir(cls, base_result_dir):
        """ Creates a new result directory with current time stamp """
        # Determine the directory in which the operation's results are stored
        time_stamp = time.strftime("%Y%m%d_%H_%M_%S")
        result_directory = os.path.join(base_result_dir, time_stamp)
        return result_directory
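
    # Example (hypothetical clock time): a *base_result_dir* of
    # "operation_results" and a start time of Jan 1st 2014, 12:30:05
    # yield "operation_results/20140101_12_30_05".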

    @classmethod
    def _get_parameter_space(cls, operation_spec):
        """ Determines all parameter combinations that should be tested """
        # Recursive cross product over a list of value lists
        crossproduct = lambda ss, row=[], level=0: \
            len(ss) > 1 \
            and reduce(lambda x, y: x + y,
                       [crossproduct(ss[1:], row + [i], level + 1)
                        for i in ss[0]]) \
            or [row + [i] for i in ss[0]]
        # Compute all possible parameter combinations
        # for the node chain template instantiation
        if "parameter_ranges" in operation_spec:
            parametrization_def = operation_spec["parameter_ranges"]
            parameter_ranges = \
                [eval(range_expression[5:-1])
                 if isinstance(range_expression, basestring)
                 and range_expression.startswith("eval(")
                 else range_expression
                 for range_expression in parametrization_def.values()]
            parameter_settings = \
                map(lambda x: dict(zip(parametrization_def.keys(), x)),
                    crossproduct(parameter_ranges))
        elif "parameter_settings" in operation_spec:
            # Just use the specified parameters
            parameter_settings = operation_spec["parameter_settings"]
        else:
            parameter_settings = [dict()]

        # Filter the parameter combinations if constraints have been set
        def check_constraint(constraint, parameters):
            for key, value in parameters.iteritems():
                # TODO Fix string mapping for floats
                constraint = constraint.replace(key, str(value))
            return eval(constraint)

        if "constraints" in operation_spec:
            for constraint_def in operation_spec["constraints"]:
                # For every element *x* in the list *parameter_settings*
                # call the lambda function; if it returns True, the element
                # stays in the list, otherwise it is filtered out
                parameter_settings = filter(
                    lambda x: check_constraint(constraint_def, x),
                    parameter_settings)

        return parameter_settings
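
    # Example (hypothetical parameter names): a specification containing
    #
    #     "parameter_ranges": {"__A__": [1, 2], "__B__": ["x", "y"]}
    #
    # results in the four parameter settings
    #
    #     [{"__A__": 1, "__B__": "x"}, {"__A__": 1, "__B__": "y"},
    #      {"__A__": 2, "__B__": "x"}, {"__A__": 2, "__B__": "y"}]
    #
    # (up to dictionary ordering).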

    def get_output_directory(self):
        """ Returns the directory where the output is stored """
        return self.result_directory

    def __repr__(self):
        """ Return a representation of this class """
        return self.__class__.__name__


class Process(object):
    """ A process is an atomic task that can be executed on an arbitrary
    backend modality by executing the __call__ method.
    """

    def __init__(self):
        pass

    def prepare(self, configuration, handler_class=None, handler_args=None,
                backend_com=None):
        """ This method is called by the respective backend in order to
        prepare the process for remote execution.
        """
        self.configuration = copy.deepcopy(configuration)
        self.handler_class = handler_class
        self.handler_args = handler_args
        self.handler = None
        self.backend_com = backend_com
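
    # Example (sketch; which handler class and arguments are used is
    # backend-dependent and only assumed here):
    #
    #     process.prepare(configuration,
    #                     handler_class=logging.handlers.SocketHandler,
    #                     handler_args={"host": "localhost", "port": 9020})
    #
    # pre_benchmarking() would then attach this socket-based log handler.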

    def __repr__(self):
        """ Return a representation of this class """
        return self.__class__.__name__

    def pre_benchmarking(self):
        """ Execute some code which is not specific for the respective
        operation but needs to be executed before benchmarking remotely
        (e.g. setting up logging, setting environment variables etc.)
        """
        # Prepare remote logging
        root_logger = logging.getLogger("%s-%s" % (socket.gethostname(),
                                                   os.getpid()))
        root_logger.setLevel(logging.DEBUG)
        root_logger.propagate = False

        if self.handler_class is not None:
            self.handler = self.handler_class(**self.handler_args)
            root_logger.addHandler(self.handler)

        # Put the configured python path first and append all entries of
        # sys.path that are not already contained in it (the set.add call
        # inside the condition marks each kept entry as seen)
        new_python_path = self.configuration.python_path
        python_path_set = set(new_python_path)
        unique_sys_path = [i for i in sys.path if i not in python_path_set
                           and not python_path_set.add(i)]
        new_python_path.extend(unique_sys_path)
        sys.path = new_python_path

    def post_benchmarking(self):
        """ Execute some code which is not specific for the respective
        operation but needs to be executed after benchmarking remotely
        to clean up
        """
        # Remove potential logging handlers
        if self.handler is not None:
            root_logger = logging.getLogger("%s-%s" % (socket.gethostname(),
                                                       os.getpid()))
            self.handler.close()
            root_logger.removeHandler(self.handler)

    def _log(self, message, level=logging.INFO):
        """ Log the given message into the logger of this class """
        logging.getLogger("%s-%s.%s" % (socket.gethostname(), os.getpid(),
                                        self)).log(level, message)


def create_operation_from_file(operation_filename, base_result_dir=None):
    """ Creates the operation for the file *operation_filename*

    If *operation_filename* is an absolute path, it is expected to be the
    path of the YAML specification file of an operation. Otherwise, the name
    is used to look up such a file in the operations spec directory.
    If *base_result_dir* is not None, the results of the operation are
    written into the specified directory.
    """
    # Load the operation from the specs directory if the given name
    # is not an absolute path
    if not os.path.isabs(operation_filename):
        spec_file_name = os.path.join(pySPACE.configuration.spec_dir,
                                      "operations", operation_filename)
    else:
        spec_file_name = operation_filename
    print("--> Loading operation: \n\t\t %s." % spec_file_name)
    try:
        with open(spec_file_name, "r") as op_file:
            operation_spec = yaml.load(op_file)
        # Keep the raw file content for later reference
        with open(spec_file_name, "r") as op_file:
            operation_spec["base_file"] = op_file.read()
    except IOError, e:
        if not spec_file_name.endswith(".yaml"):
            warnings.warn(
                "Operation specification not found. Trying with yaml ending.")
            spec_file_name += ".yaml"
            with open(spec_file_name, "r") as op_file:
                operation_spec = yaml.load(op_file)
            with open(spec_file_name, "r") as op_file:
                operation_spec["base_file"] = op_file.read()
        else:
            raise e
    storage = pySPACE.configuration.storage
    if operation_spec.has_key("input_path"):
        input_path = os.sep.join([storage, operation_spec["input_path"], ""])
        print("--> Input data is taken from: \n\t\t %s" % input_path)
    return create_operation(operation_spec, base_result_dir)


def create_operation(operation_spec, base_result_dir=None):
    """ Creates the operation for the given operation spec

    Simple wrapper to the *create* function of the operation, getting the
    operation result directory from the configuration file if none is given.
    """
    if base_result_dir is None:
        base_result_dir = os.path.join(pySPACE.configuration.storage,
                                       "operation_results")
    # Use the operation factory method to create the operation
    operation = Operation.create(operation_spec,
                                 base_result_dir=base_result_dir)
    return operation
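
# Typical usage (hypothetical file name; assumes that the pySPACE
# configuration has already been loaded, e.g. by the launch script):
#
#     operation = create_operation_from_file("my_operation.yaml")
#     # ... the operation and its operation.processes are then handed
#     # over to a backend for execution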