Source code for pySPACE.missions.operations.base

""" Interface for the numerous operations and processes

An **operation** takes one summary of data as input and produces a
second one as output with a set of *processes*.

Each **process** can be considered as
an atomic task that can be executed independently from all other processes
of the operation.

Standard Specification File Parameters


Specifies which operation is used.
Type and corresponding module name are the same in lower case (with underscore).
The respective operation name is written in camel case with the ending


Path name relative to the *storage* in your configuration file.

The input has to fulfill certain rules,
specified in the fitting: :mod:`pySPACE.resources.dataset_defs` subpackage.

Depending on the operation, the input datasets are combined, or each parameter
combination is applied separately to the data.

(*mostly obligatory*)


Some datasets give the opportunity to choose between different
formats for storing the result. The choice can be specified here.
For details look at the :mod:`~pySPACE.resources.dataset_defs` documentation.
If the format is not specified, the default of the
dataset is used.

(*optional, default: default of dataset*)


Normally each parameter is added in the format
*{__PARAMETER__: value}* added to the *__RESULT_DIRECTORY__*.
Every parameter specified in the list *hide_parameters* is not
added to the folder name. Be careful not to have
the same name for different results.

(*optional, default: []*)


Number of repetitions for each running process.
The run number is given to the process, to use it for *choosing*
random components, especially when using cross-validation.

(*optional, default: 1*)


Overwrite the backend defined in the command line
or, if not specified, the default (serial) backend

(*optional, default: command line or default backend*)


For the operation specific parameters have a look at operation
import glob

import os
import time
import logging
import socket
import sys
import warnings
import yaml
import copy

file_path = os.path.dirname(os.path.abspath(__file__))
pyspace_path = file_path[:file_path.rfind('pySPACE')-1]
if not pyspace_path in sys.path:

import pySPACE
from import create_directory

[docs]class Operation(object): """ Take one summary of data as input and produces a second one as output with a set of *processes*., that can be executed on an arbitrary backend modality """
[docs] def __init__(self, processes, operation_spec, result_directory): self.processes = processes self.operation_spec = operation_spec self.result_directory = result_directory # Check if the required directories exist # and create them if necessary create_directory(self.result_directory) # Store the specification of this operation in the directory # without the base_file entry base_file = self.operation_spec.pop("base_file", None) source_operation_file = open(os.sep.join([self.result_directory, "source_operation.yaml"]), 'w') yaml.dump(self.operation_spec, source_operation_file) source_operation_file.close() if not base_file is None: self.operation_spec["base_file"] = base_file
[docs] def consolidate(self, results): """ Consolidates the results obtained by the single processes of this operation into a consistent structure of collections. """ raise NotImplementedError( "Method consolidate has not been implemented in subclass %s" % self.__class__.__name__)
[docs] def _log(self, message, level=logging.INFO): """ Logs the given message with the given logging level """ logging.getLogger("%s" % self).log(level, message)
[docs] def create(cls, operation_spec, base_result_dir=None): """ A factory method that calls the responsible method for creating an operation of the type specified in the operation specification dictionary (*operation_spec*). """ # Determine result directory result_directory = cls.get_unique_result_dir(base_result_dir) print("--> Results will be stored at: \n\t\t %s"%str(result_directory)) # Check if the required directories exist # and create them if necessary create_directory(result_directory) # Determine all input datasets (note: they can be specified by # extended syntax for the glob package) storage = if not operation_spec.has_key("input_path"): warnings.warn("No input path found in operation specification.") input_path_pattern = os.sep.join([storage, operation_spec.get("input_path", ""), "*", ""]) input_paths = glob.glob(input_path_pattern) obsolete_paths=[] for path in input_paths: if os.path.isfile(os.sep.join([path,"metadata.yaml"])): continue elif os.path.isfile(os.sep.join([path,"collection.yaml"])): continue # warning comes, when data is loaded else: obsolete_paths.append(path) warnings.warn('Folder' + str(path) + ' seems not to be a pySPACE'+ ' dataset (no "metadata.yaml" found)! '+ 'Skipping this folder in operation...') for path in obsolete_paths: input_paths.remove(path) op_type = operation_spec["type"] if op_type.endswith("_operation"): l=len("_operation")*-1 op_type=op_type[:l] operation_spec["type"] = op_type warnings.warn("'%s_operation' has the wrong ending. Using '%s' instead."%(op_type,op_type),DeprecationWarning) op_class_name = ''.join([x.title() for x in op_type.split('_')]) op_class_name += "Operation" # dynamic class import: from data_mod_name import col_class_name try: op_module = __import__('pySPACE.missions.operations.%s' % op_type, fromlist=[op_class_name]) except: msg = "Operation module %s is unknown. Trying to use node_chain." % (op_type) from pySPACE.missions.operations.node_chain import NodeChainOperation op_class = NodeChainOperation else: op_class = getattr(op_module,op_class_name) return op_class.create(operation_spec, result_directory, input_paths=input_paths)
[docs] def get_unique_result_dir(cls, base_result_dir): """ Creates a new result directory with current time stamp """ # Determine the directory in which the operation's result # are stored time_stamp = time.strftime("%Y%m%d_%H_%M_%S") result_directory = os.path.join(base_result_dir, time_stamp) return result_directory
[docs] def _get_parameter_space(cls, operation_spec): """ Determines all parameter combinations that should be tested """ # Crossproduct crossproduct = lambda ss,row=[],level=0: len(ss)>1 \ and reduce(lambda x,y:x+y,[crossproduct(ss[1:],row+[i],level+1) for i in ss[0]]) \ or [row+[i] for i in ss[0]] # Compute all possible parameter combinations # for the node chain template instantiation if "parameter_ranges" in operation_spec: parametrization_def = operation_spec["parameter_ranges"] parameter_ranges = [eval(range_expression[5:-1]) if isinstance(range_expression, basestring) and range_expression.startswith("eval(") else range_expression for range_expression in parametrization_def.values()] parameter_settings = map(lambda x: dict(zip(parametrization_def.keys(), x)), crossproduct(parameter_ranges)) elif "parameter_settings" in operation_spec: # Just use specified parameters parameter_settings = operation_spec["parameter_settings"] else: parameter_settings = [dict()] # Filter the parameter combinations if constraints have been set def check_constraint(constraint, parameters): for key, value in parameters.iteritems(): # TODO Fix string mapping for floats constraint = constraint.replace(key, str(value)) return eval(constraint) if "constraints" in operation_spec: for constraint_def in operation_spec["constraints"]: # for every elem (*x*) in list *parameter_settings* call # lambda-function, if this returns true, elem will stay in the # list, otherwise it is filtered out parameter_settings = filter(lambda x: check_constraint(constraint_def, x), parameter_settings) return parameter_settings
[docs] def get_output_directory(self): """ Returns the directory where the output is stored """ return self.result_directory
[docs] def __repr__(self): """ Return a representation of this class""" return self.__class__.__name__
[docs]class Process(object): """ A process is an atomic task that can be executed on an arbitrary backend modality by executing the __call__ method. """
[docs] def __init__(self): pass
[docs] def prepare(self, configuration, handler_class=None, handler_args=None, backend_com=None): """ This method is called by the respective backend in order to prepare the process for remote execution """ self.configuration = copy.deepcopy(configuration) self.handler_class = handler_class self.handler_args = handler_args self.handler = None self.backend_com = backend_com
[docs] def __repr__(self): """ Return a representation of this class""" return self.__class__.__name__
[docs] def pre_benchmarking(self): """ Execute some code which is not specific for the respective operation but needs to be executed before benchmarking remotely (e.g. set up logging, setting environment variables etc.) """ # Prepare remote logging root_logger = logging.getLogger("%s-%s" % (socket.gethostname(), os.getpid())) root_logger.setLevel(logging.DEBUG) root_logger.propagate = False if self.handler_class != None: self.handler = self.handler_class(**self.handler_args) root_logger.addHandler(self.handler) new_python_path = self.configuration.python_path python_path_set = set(new_python_path) import sys unique_sys_path = [i for i in sys.path if i not in python_path_set and not python_path_set.add(i)] new_python_path.extend(unique_sys_path) sys.path = new_python_path
[docs] def post_benchmarking(self): """ Execute some code which is not specific for the respective operation but needs to be executed after benchmarking remotely to clean up """ # Remove potential logging handlers if self.handler is not None: root_logger = logging.getLogger("%s-%s" % (socket.gethostname(), os.getpid())) self.handler.close() root_logger.removeHandler(self.handler)
[docs] def _log(self, message, level = logging.INFO): """ Log the given message into the logger of this class """ logging.getLogger("%s-%s.%s" % (socket.gethostname(), os.getpid(), self)).log(level, message)
[docs]def create_operation_from_file(operation_filename, base_result_dir = None): """ Creates the operation for the file *operation_filename* Creates the operation for a given operation filename. If *operation_filename* is an absolute path, it is expected to be the path to the YAML specification file of an operation. Alternatively, if *operation_filename* is not an absolute path, the name is used to look up a YAML specification file for the operation in the spec dir. If *base_result_dir* is not None, the results of the operation are written into the specified directory. """ # Load operation from specs directory when not an absolute path if not os.path.isabs(operation_filename): spec_file_name = os.path.join(pySPACE.configuration.spec_dir, "operations", operation_filename) else: spec_file_name = operation_filename print("--> Loading operation: \n\t\t %s."%spec_file_name) try: with open(spec_file_name, "r") as op_file: operation_spec = yaml.load(op_file) with open(spec_file_name, "r") as op_file: operation_spec["base_file"] = except IOError,e: if not spec_file_name.endswith(".yaml"): warnings.warn( "Operation specification not found. Trying with yaml ending.") spec_file_name += ".yaml" with open(spec_file_name, "r") as op_file: operation_spec = yaml.load(op_file) with open(spec_file_name, "r") as op_file: operation_spec["base_file"] = else: raise e storage = if operation_spec.has_key("input_path"): input_path = os.sep.join([storage, operation_spec["input_path"], ""]) print("--> Input data is taken from: \n\t\t %s"%input_path) return create_operation(operation_spec, base_result_dir)
[docs]def create_operation(operation_spec, base_result_dir = None): """ Creates the operation for the given operation spec Simple wrapper to the *create* function of the operation, getting the operation result dir from the configuration file. """ if base_result_dir is None: base_result_dir = os.path.join(, "operation_results") # use the operation factory method to create operation operation = Operation.create(operation_spec, base_result_dir = base_result_dir) return operation