Source code for pySPACE.missions.nodes.meta.parameter_optimization

""" Determine the optimal parameterization of a subflow

.. todo:: documentation: reference to subflow handler
"""

from pySPACE.missions.nodes.base_node import BaseNode
from pySPACE.environments.chains.node_chain import NodeChain, NodeChainFactory, SubflowHandler
from pySPACE.tools.filesystem import create_directory

import copy
from numpy import ndarray, array, vstack, identity, ones, inf

import os
import cPickle
import logging


class ParameterOptimizationBase(BaseNode):
    """ Base class for parameter optimization nodes

    The overall goal is to determine the parameterization of a subflow that
    maximizes a given metric.

    Nodes derived from this class can determine the optimal parameterization
    (*variables*) of a subpart of a flow (*nodes*) fully autonomously.
    For instance, for different numbers of features retained during feature
    selection, different complexities/regularization coefficients might be
    optimal for a classifier. In order to determine which feature number is
    optimal, the classifier's complexity has to be chosen separately for each
    feature number.

    First of all, the training data is split into training and validation
    data, e.g. by cross validation (*validation_set*). Then, for a chosen set
    of parameters, the data is processed as described in the specification
    (*nodes*). The classifier is then evaluated (*evaluation*) using another
    node, some combination of *metrics*, or something else, for example a
    derivative in future implementations. So the nodes specification should
    always include a classifier which is evaluated. This procedure may be
    repeated until a good set of parameters is found.

    The algorithm that defines the way of choosing parameters determines the
    node name (e.g. PatternSearchNode), and parameters specific to the
    optimization procedure can be passed via the *optimization* spec.
    General functions, especially the parts belonging to the evaluation of a
    parameterization, are provided in this base class (e.g., a function
    mapping parameter dictionaries to keys and a function to create a grid
    from given parameters).

    When a good parameterization is found, the nodes are trained with it on
    the whole data set.

    .. note:: In future, alternatives can be added, e.g. to combine the
              different flows of the cross validation with a simple ensemble
              classifier.

    .. note:: If you want to optimize parameters for each sub-split, this
              should not be done in this node.

    **Parameters**

    It is important to mention that the definition of parameters of this node
    is structured into main parameters and sub-parameters to describe the
    different aspects of parameter optimization. So take care of indentations.

        :optimization:
            As mentioned above, this parameter dictionary is used by the
            specific subclasses, i.e., by the specific optimization
            algorithms. Hence, see the subclasses for documentation of the
            possible parameters.

        :parallelization:
            This parameter dictionary is used for parallelization of subflow
            execution. Possible parameters so far are *processing_modality*,
            *pool_size* and *batch_size*. See
            :class:`~pySPACE.environments.chains.node_chain.SubflowHandler`
            for more information.

        :validation_set:
            :splits:
                The number of splits used in an internal cross-validation
                loop. Note that more splits lead to better estimates of the
                parametrization's performance but also increase computation
                time considerably.

                (*recommended, default: 5*)

            :split_node:
                If no standard CV-splitter with *splits* splits is used, an
                alternative node can be specified here in YAML node syntax.

                (*optional, default: CV_Splitter*)

            :runs:
                Number of internal runs used for evaluation. Nodes such as
                the CV_Splitter behave differently with every run, so the
                calculation is repeated *runs* times with a different
                randomizer each time. The random seed used in each repetition
                is::

                    10 * external_run_number + internal_run_number

                The resulting performance measure is calculated from the
                performances of the different (internal) runs and splits as
                (average - w * standard deviation).

                (*optional, default: 1*)

            :randomize:
                Change the cv-splitter with every parameter evaluation step.

                .. note:: Not yet implemented

        :evaluation:
            Specification of the sink node and the corresponding evaluation
            function.

            :performance_sink_node:
                Specify a different sink node in YAML node syntax. Otherwise
                the default 'Classification_Performance_Sink' will be used
                with the following parameter *ir_class*.

                (*optional, default: Classification_Performance_Sink*)

            :ir_class:
                The class name (as string) for which IR statistics are
                computed.

                (*recommended, default: 'Target'*)

            :metric:
                This is the metric that should be maximized. Each
                :ref:`metric <metrics>` which is computed by the
                *performance_sink_node* can be used, for instance
                "Balanced_accuracy", "AUC", "F_measure" or even soft metrics
                or loss metrics.

                (*recommended, default: 'Balanced_accuracy'*)

            :inverse_metric:
                If True, metric values are multiplied by -1. In this way,
                loss metrics can be used for parameter optimization.

                (*optional, boolean, default: False*)

            :std_weight:
                Cross validation gives several values for the estimated
                performance. Therefore, the difference of the expected value
                minus std_weight times the standard deviation is used.

                (*optional, default: 0*)

        :variables:
            List of the parameters to be optimized and replaced in the
            node_spec.

        :nodes:
            The original specification of the nodes that should be optimized.
            The value of the *nodes* parameter must be a standard NodeChain
            definition in YAML syntax (properly indented).

        :nominal_ranges:
            Similar to the ranges in the grid search, a grid can be specified
            here, mainly for nominal parameters. All the other parameters are
            then optimized dependent on these parameters. Afterwards, the
            resulting performance values are compared to choose the best
            nominal parameter. When storing the results, each nominal
            parameter is stored.

            (*optional, default: None*)

        :debug:
            Switch on debug messages, where they exist.

            (*optional, default: False*)

        :validation_parameter_settings:
            Dictionary of parameter mappings to be replaced in the nodes in
            the validation phase of the parameter optimization.
            This works together with the *final_training_parameter_settings*.

            (*optional, default: dict()*)

        :final_training_parameter_settings:
            Dictionary of parameter mappings to be replaced in the nodes
            after the validation phase of the parameter optimization, i.e. in
            the final training phase.
            This works together with the *validation_parameter_settings*.
            A very important use case of these parameters is to switch off
            the *retrain* mode in the validation phase but nevertheless have
            it active in the final subflow or node.

            (*optional, default: dict()*)

    :Author: Mario Krell (mario.krell@dfki.de)
    :Created: 2011/08/03
    :LastChange: 2012/09/03 Anett Seeland - new structure for parallelization
    """

    def __init__(self, flow_template, variables=[],
                 metric='Balanced_accuracy', std_weight=0,
                 inverse_metric=False, runs=1, nominal_ranges=None,
                 debug=False, validation_parameter_settings={},
                 final_training_parameter_settings={},
                 **kwargs):
        super(ParameterOptimizationBase, self).__init__(**kwargs)
        self.set_permanent_attributes(
            flow_template=flow_template,
            variables=variables,
            metric=metric,
            w=std_weight,
            inverse_metric=-1 if inverse_metric else 1,
            runs=runs,
            nom_rng=nominal_ranges,
            debug=debug,
            flow=None,
            train_instances=None,
            performance_dict={},
            search_history=[],
            validation_parameter_settings=validation_parameter_settings,
            final_training_parameter_settings=final_training_parameter_settings,
            classifier_information=None)

    def is_trainable(self):
        """ Return whether this node is trainable """
        return True

    def is_supervised(self):
        """ Return whether this node requires supervised training """
        return True

    @staticmethod
    def check_parameters(param_spec):
        """ Check input parameters for existence and appropriateness """
        assert("nodes" in param_spec and "optimization" in param_spec), \
            "Parameter Optimization node requires specification of a " \
            "list of nodes and an optimization algorithm!"
        validation_set = param_spec.pop("validation_set", {})
        validation_set["splits"] = validation_set.get("splits", 5)
        validation_set["split_node"] = validation_set.get(
            "split_node",
            {'node': 'CV_Splitter',
             'parameters': {'splits': validation_set["splits"]}})
        evaluation = param_spec.pop("evaluation", {})
        evaluation["ir_class"] = evaluation.get("ir_class", "Target")
        evaluation["performance_sink_node"] = \
            evaluation.get("performance_sink_node",
                           {'node': 'Classification_Performance_Sink',
                            'parameters':
                                {'ir_class': evaluation["ir_class"]}})
        # build flow template
        nodes_spec = param_spec.pop("nodes")
        flow_template = [{'node': 'External_Generator_Source_Node'},
                         validation_set["split_node"]]
        flow_template.extend(nodes_spec)
        flow_template.append(evaluation["performance_sink_node"])
        # Evaluate all remaining parameters
        BaseNode.eval_dict(param_spec)
        # params with defaults in __init__ have to be added to param_spec dict
        if validation_set.has_key("runs"):
            param_spec["runs"] = validation_set["runs"]
        if evaluation.has_key("metric"):
            param_spec["metric"] = evaluation["metric"]
        if evaluation.has_key("inverse_metric"):
            param_spec["inverse_metric"] = evaluation["inverse_metric"]
        if evaluation.has_key("std_weight"):
            param_spec["std_weight"] = evaluation['std_weight']
        return param_spec, flow_template

    def _train(self, data, label):
        """ Train the flow on the given data vector *data* """
        # Remember the data; the actual training is done when all data
        # is known
        if self.train_instances is None:
            self.train_instances = []
        self.train_instances.append((data, label))

    def _stop_training(self):
        """ Do the optimization step and define the final parameter choice

        This is the main method of this node!

        .. todo:: Allow also parallelization over nominal_ranges!
        """
        self._log("Starting optimization process.")
        self.runs = [10 * self.run_number + run for run in range(self.runs)]
        original_flow_template = copy.copy(self.flow_template)
        # Fill in validation parameters in the template
        self.flow_template = NodeChainFactory.replace_parameters_in_node_chain(
            original_flow_template, self.validation_parameter_settings)
        if self.nom_rng is None:
            self.prepare_optimization()
            self.best_parametrization, self.best_performance = \
                self.get_best_parametrization()
            self.performance_dict[self.p2key(self.best_parametrization)] = \
                (self.best_performance, self.best_parametrization)
        else:
            nom_grid = self.search_grid(self.nom_rng)
            iterations = 0
            search_history = []
            # copy flow_template since we have to instantiate it for
            # every nom_par
            flow_template = copy.copy(self.flow_template)
            for nom_par in nom_grid:
                # for getting the best parameterization,
                # the class attribute flow_template must be overwritten
                self.flow_template = \
                    NodeChainFactory.replace_parameters_in_node_chain(
                        flow_template, nom_par)
                self.prepare_optimization()
                parametrization, performance = self.get_best_parametrization()
                self.performance_dict[self.p2key(nom_par)] = (performance,
                                                              parametrization)
                iterations += self.iterations
                search_history.append((nom_par, self.search_history))
                # reinitialize optimization parameters
                self.re_init()
            # reconstructing the overwritten flow for further usage
            self.flow_template = flow_template
            self.iterations = iterations
            self.search_history = sorted(
                search_history, key=lambda t: t[1][-1]["best_performance"])
        best_key = max(sorted(self.performance_dict.items()),
                       key=lambda t: t[1])[0]
        self.best_performance, self.best_parametrization = \
            self.performance_dict[best_key]
        self.best_parametrization.update(dict(best_key))
        # when the best parameter dict is calculated, this has to be logged
        # or saved and the chosen parameter is used for training on the
        # whole data set, independent of the chosen algorithm
        self._log("Using parameterization %s with optimal performance %s for "
                  "metric %s." % (self.best_parametrization,
                                  self.best_performance, self.metric))
        # Fill in the final parameters in the flow template
        self.flow_template = NodeChainFactory.replace_parameters_in_node_chain(
            original_flow_template, self.final_training_parameter_settings)
        best_flow_template = self.flow_template
        best_flow_template[1] = {'node': 'All_Train_Splitter'}
        # delete the last node (the performance sink)
        best_flow_template.pop(-1)
        self.flow = self.generate_subflow(best_flow_template,
                                          self.best_parametrization, NodeChain)
        self.flow[-1].set_run_number(self.run_number)
        self.flow[0].set_generator(self.train_instances)
        self.flow.train()
        self._log("Training of optimal flow finished")
        # delete training instances that would be stored to disk if this node
        # is saved
        del self.train_instances

    def _execute(self, data):
        """ Execute the flow on the given data vector *data*

        This method is used in offline mode and for delivering the training
        data to the next node. In the other case, *request_data_for_testing*
        is used.
        """
        if self.classifier_information is not None:
            # Delegate to internal flow object
            return self._get_flow().execute(data)
        else:
            result = self._get_flow().execute(data)
            # forward important node information via classifier_information
            try:
                self.classifier_information = \
                    result.predictor.classifier_information
            except:
                result.predictor.classifier_information = dict()
                self.classifier_information = \
                    result.predictor.classifier_information
            for key, value in self.best_parametrization.items():
                self.classifier_information[key] = value
            self.classifier_information["~~Pon_Iterations~~"] = self.iterations
            self.classifier_information["~~Pon_value~~"] = \
                self.best_performance
            return result

    def get_output_type(self, input_type, as_string=True):
        """ Return the output type of the entire flow """
        return self.flow.get_output_type(input_type, as_string)

    def _get_flow(self):
        """ Method introduced for consistency with flow_node

        This node itself is no real flow_node, since the final flow is
        unknown during initialization and only specified during the
        optimization process.
        """
        return self.flow

    def _inc_train(self, data, class_label=None):
        """ Iterate through the nodes to train them

        The optimal parameterization remains fixed and the nodes in the
        optimal flow get the incremental training.

        Here it is important to know that *first* the node is changed and
        then the changed data is forwarded to the *next* node. This is
        different from the normal offline retraining scheme.
        """
        self._get_flow()._inc_train(data, class_label)

    def is_retrainable(self):
        """ Retraining if this node or one node in the subflow is retrainable """
        # check the retrainable flag of this node via the base class
        if super(ParameterOptimizationBase, self).is_retrainable():
            return True
        else:
            for node in self._get_flow():
                if node.is_retrainable():
                    return True
        return False

    def present_label(self, label):
        """ Forward the label to the subflow

        *buffering* must be set to *True* only for the main node for
        incremental learning in application (live environment).
        The inner nodes must not have this parameter set.

        .. todo:: Implement a check on the flow that the inner nodes
                  do not buffer.
        """
        super(ParameterOptimizationBase, self).present_label(label)

    def store_state(self, result_dir, index=None):
        """ Store this node in the given directory *result_dir* """
        # .. todo:: mapping of flow_id and parameterization?!
        if self.store:
            for node in self.flow:
                node.store_state(result_dir, index)
            class_dir = os.path.join(result_dir, self.__class__.__name__)
            create_directory(class_dir)
            # Store the search history
            name = "search_history_sp%d.pickle" % self.current_split
            result_file = open(os.path.join(class_dir, name), "wb")
            result_file.write(cPickle.dumps(self.search_history,
                                            protocol=cPickle.HIGHEST_PROTOCOL))
            result_file.close()

    def get_sensor_ranking(self):
        """ Get the sensor ranking from the optimized trained flow """
        # The irrelevant performance sink node was removed from the final
        # flow, so the last node here is the relevant one.
        return self._get_flow()[-1].get_sensor_ranking()

    def get_previous_transformations(self, sample=None):
        """ Recursively construct a list of (linear) transformations

        Overwrites the BaseNode function, since this meta node needs to get
        the transformations from its subflow.
        """
        transformations = self.input_node.get_previous_transformations(sample)
        own_transformations = \
            self.flow[-1].get_previous_transformations(sample)
        transformations.extend(own_transformations)
        return transformations

    def re_init(self):
        """ Reset optimization parameters

        Subclasses can overwrite this method if necessary, e.g. in case some
        parameters have to be reinitialized when several optimizations
        are done.
        """
        # handles the nominal_ranges case
        pass

    def prepare_optimization(self):
        """ Initialize the optimization procedure

        Subclasses can overwrite this method if necessary.
        """
        pass

    def get_best_parametrization(self):
        """ Apply the optimization algorithm

        This method has to be implemented in the subclass.
        """
        raise NotImplementedError(
            "Method get_best_parametrization has not been implemented in "
            "subclass %s" % self.__class__.__name__)

    def get_best_dict_entry(self, performance_dict):
        """ Find the highest performance value in the dictionary """
        # get the best performance value
        performance = max(performance_dict.items(), key=lambda t: t[1])[1]
        # get the corresponding parameters;
        # sorted is used here to avoid randomness in the list
        best_parametrizations = [dict(par) for par, p in
                                 sorted(performance_dict.items())
                                 if p == performance]
        return best_parametrizations[0], performance

    @staticmethod
    def search_grid(parameter_ranges):
        """ Combine the parameters in *parameter_ranges* to a grid (cross product) """
        # define cross product function
        crossproduct = lambda ss, row=[], level=0: len(ss) > 1 \
            and reduce(lambda x, y: x + y,
                       [crossproduct(ss[1:], row + [i], level + 1)
                        for i in ss[0]]) \
            or [row + [i] for i in ss[0]]
        # Generate the grid of parameterizations that should be analyzed
        if isinstance(parameter_ranges, basestring):
            parameter_ranges = eval(parameter_ranges)
        for key, value in parameter_ranges.iteritems():
            if isinstance(value, basestring) and value.startswith("eval("):
                parameter_ranges[key] = eval(value[5:-1])
        grid = map(lambda x: dict(zip(parameter_ranges.keys(), x)),
                   crossproduct(parameter_ranges.values()))
        return grid

    @staticmethod
    def p2key(parameters):
        """ Map a parameter dictionary to a hashable tuple (key for a dictionary) """
        return tuple(sorted(parameters.items()))

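
# Illustrative sketch, not part of the original module: how ``search_grid``
# expands a ranges dictionary into all parameter combinations and how
# ``p2key`` turns each grid point into a hashable, order-independent key.
# The parameter names used below are hypothetical.
def _grid_and_key_example():
    """ Show the interplay of search_grid and p2key on a small ranges dict """
    ranges = {"~~C~~": [0.1, 1.0], "~~GAMMA~~": [0.01, 0.1]}
    # four combinations, e.g. {'~~C~~': 0.1, '~~GAMMA~~': 0.01}, ...
    grid = ParameterOptimizationBase.search_grid(ranges)
    # hashable keys usable in a performance dictionary,
    # e.g. (('~~C~~', 0.1), ('~~GAMMA~~', 0.01))
    keys = [ParameterOptimizationBase.p2key(grid_node) for grid_node in grid]
    return grid, keys
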

class GridSearchNode(ParameterOptimizationBase, SubflowHandler):
    """ Grid search for optimizing the parameterization of a subflow

    For each parameter a list of values is specified (*ranges*). The cross
    product of all values in *ranges* is computed and a subflow is evaluated
    for each of these parameterizations using cross-validation on the
    training data. Finally, the best point in the search grid is chosen as
    the optimal point.

    **Parameters**

    This algorithm does not need the *variables* parameter, since the
    variable names are already given by the keys of the *ranges* parameter.

        :ranges:
            A dictionary mapping parameters to the values they should be
            tested for. If more than one parameter is given, the cross
            product of all parameter values is computed (i.e. each
            combination). For each resulting parameter combination, the flow
            specified in the YAML syntax is evaluated. The parameter names
            should be used somewhere in this YAML definition and should be
            unique, since the instantiation is based on pure textual
            replacement. It is common to enforce this by starting and ending
            the parameter names with "~~". In the example below, the two
            parameters are called "~~OUTLIERS~~" and "~~COMPLEXITY~~", each
            having 3 values. This results in 9 parameter combinations to be
            tested.

        :grid:
            From the *ranges* parameter a grid is generated. As a
            replacement, the grid can be specified directly using this
            parameter. Therefore, a list of dictionaries is used.
            For example, a *ranges* parametrization like
            '{~~OUTLIERS~~ : [0, 5, 10], ~~COMPLEXITY~~ : [0.01, 0.1, 1.0]}'
            could be transformed to:

            .. code-block:: yaml

                - ~~OUTLIERS~~ : 0
                  ~~COMPLEXITY~~ : 0.01
                - ~~OUTLIERS~~ : 0
                  ~~COMPLEXITY~~ : 0.1
                - ~~OUTLIERS~~ : 0
                  ~~COMPLEXITY~~ : 1
                - ~~OUTLIERS~~ : 5
                  ~~COMPLEXITY~~ : 0.01
                - ~~OUTLIERS~~ : 5
                  ~~COMPLEXITY~~ : 0.1
                - ~~OUTLIERS~~ : 5
                  ~~COMPLEXITY~~ : 1
                - ~~OUTLIERS~~ : 10
                  ~~COMPLEXITY~~ : 0.01
                - ~~OUTLIERS~~ : 10
                  ~~COMPLEXITY~~ : 0.1
                - ~~OUTLIERS~~ : 10
                  ~~COMPLEXITY~~ : 1

    **Exemplary Call**

    .. code-block:: yaml

        -
            node : Grid_Search
            parameters :
                optimization:
                    ranges : {~~OUTLIERS~~ : [0, 5, 10],
                              ~~COMPLEXITY~~ : [0.01, 0.1, 1.0]}
                parallelization:
                    processing_modality : 'backend'
                    pool_size : 2
                validation_set :
                    split_node :
                        node : CV_Splitter
                        parameters :
                            splits : 10
                            stratified : True
                            time_dependent : True
                evaluation:
                    metric : "Balanced_accuracy"
                    std_weight: 1
                    performance_sink_node :
                        node : Sliding_Window_Performance_Sink
                        parameters :
                            ir_class : "Movement"
                            classes_names : ['NoMovement','Movement']
                            uncertain_area : 'eval([(-600,-350)])'
                            calc_soft_metrics : True
                            save_score_plot : True
                variables: [~~OUTLIERS~~, ~~COMPLEXITY~~]
                nodes :
                    -
                        node : Feature_Normalization
                        parameters :
                            outlier_percentage : ~~OUTLIERS~~
                    -
                        node: LibSVM_Classifier
                        parameters :
                            complexity : ~~COMPLEXITY~~
                            class_labels : ['NoMovement', 'Movement']
                            weight : [1.0, 2.0]
                            kernel_type : 'LINEAR'
    """

    def __init__(self, ranges=None, grid=None, *args, **kwargs):
        ParameterOptimizationBase.__init__(self, *args, **kwargs)
        # extract parallelization dict for subflow handler
        SubflowHandler.__init__(self, **kwargs.get('parallelization', {}))
        self.set_permanent_attributes(
            grid=self.search_grid(ranges) if ranges is not None else grid)

    @staticmethod
    def node_from_yaml(node_spec):
        """ Create the node based on the node_spec """
        node_spec = copy.deepcopy(node_spec)
        # call parent class method for most of the work
        node_spec["parameters"], flow_template = \
            ParameterOptimizationBase.check_parameters(node_spec["parameters"])
        # check grid search specific parameters
        optimization = node_spec["parameters"].pop("optimization")
        assert("ranges" in optimization or "grid" in optimization), \
            "Grid Search needs *ranges* or *grid* parameter"
        BaseNode.eval_dict(optimization)
        node_obj = GridSearchNode(ranges=optimization.get("ranges", None),
                                  grid=optimization.get("grid", None),
                                  flow_template=flow_template,
                                  **node_spec["parameters"])
        return node_obj

    def get_best_parametrization(self):
        """ Evaluate each flow parameterization of the grid by running a cross validation on the training data """
        performance_dict = {}
        # create subflows
        subflows = [self.generate_subflow(self.flow_template, grid_node)
                    for grid_node in self.grid]
        # execute subflows
        result_collections = self.execute_subflows(self.train_instances,
                                                   subflows, self.runs)
        for grid_node, result in zip(self.grid, result_collections):
            key = self.p2key(grid_node)
            performance = self.inverse_metric * \
                (result.get_average_performance(self.metric)
                 - self.w * result.get_performance_std(self.metric))
            performance_dict[key] = performance
        del subflows, result_collections
        # Determine the flow parameterization that performed optimally with
        # regard to the specified metric on the grid
        best_parametrization, performance = \
            self.get_best_dict_entry(performance_dict)
        self.iterations = len(self.grid)
        self.search_history = [{"best_parameter": best_parametrization,
                                "best_performance": performance,
                                "performance_dict": performance_dict,
                                "iterations": self.iterations}]
        return best_parametrization, performance

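
# Illustrative sketch, not part of the original module: the score assigned to
# a single parameterization during evaluation. The numbers below are made up
# for demonstration only.
def _score_example():
    """ Reproduce the scoring formula inverse_metric * (mean - std_weight * std) """
    inverse_metric = 1      # -1 would turn a loss metric into a score to maximize
    std_weight = 1          # the *std_weight* parameter (self.w)
    mean_performance = 0.8  # e.g. mean Balanced_accuracy over runs and splits
    std_performance = 0.05  # corresponding standard deviation
    # -> 0.75, the value stored in the performance dictionary for this point
    return inverse_metric * (mean_performance - std_weight * std_performance)
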

class PatternSearchNode(ParameterOptimizationBase, SubflowHandler):
    """ Extension of the standard pattern search algorithm

    For the main principle see: Numerical Optimization,
    Jorge Nocedal & Stephen J. Wright.

    **Special Components**

        - No double calculation of already visited points
        - Step size is increased when a better point is found,
          to speed up the search
        - Possible limit on iteration steps to be comparable to grid search
        - Cross validation cycle inside

    **Parameters**

    The following parameters have to be specified in the optimization spec.
    For the algorithm, the *variables* parameter is important, since it
    defines the order of parameters and thereby simplifies the specification
    of vectors corresponding to points, directions or bounds. These can be
    specified as dictionaries, lists or tuples; internally they are
    transformed to arrays with the method *get_vector*. The transformation
    back to keys for filling them into the node chains is done by *v2d*.

        :start:
            Starting point of the algorithm. For SVM optimization, the
            complexity has to be sufficiently small.

            (*recommended, default: ones(dimension)*)

        :directions:
            List of directions that are evaluated around the current
            best point.

            (*optional, default: unit directions*)

        :start_step_size:
            First value to scale the direction vectors.

            (*optional, default: 1.0*)

        :stop_step_size:
            Minimal value to scale the direction vectors.
            If the step size gets lower, the algorithm stops.

            (*optional, default: 1e-10*)

        :scaling_factor:
            When the evaluation does not deliver a better point, the current
            scaling of the directions is reduced by this factor. Otherwise it
            is increased by the *up_scaling_factor*.

            (*optional, default: 0.5*)

        :up_scaling_factor:
            If the evaluation gives a better point, the step size is
            increased by this factor. By default, there is no up scaling.

            (*optional, default: 1*)

        :max_iter:
            If the total number of evaluations of the directions exceeds this
            value, the algorithm also stops.

            (*optional, default: infinity*)

        :max_bound:
            Points exceeding these bounds are not evaluated.

            (*optional, default: inf*array(ones(dimension))*)

        :min_bound:
            Undershooting points are not evaluated.

            (*optional, default: -inf*array(ones(dimension))*)

    .. todo:: Evaluate if up_scaling makes sense and should be used here

    **Exemplary Call**

    .. code-block:: yaml

        -
            node : Pattern_Search
            parameters :
                parallelization :
                    processing_modality : 'local'
                    pool_size : 4
                optimization:
                    start_step_size : 0.002
                    start : [0.005,0.01]
                    directions : [[-1,-1],[1,1],[1,-1],[-1,1]]
                    stop_step_size : 0.00001
                    scaling_factor : 0.25
                    min_bound : [0,0]
                    max_bound : [10,10]
                    max_iter : 100
                validation_set :
                    split_node :
                        node : CV_Splitter
                        parameters :
                            splits : 5
                    runs : 2
                evaluation:
                    metric : "Balanced_accuracy"
                    std_weight: 1
                    ir_class : "Target"
                variables: [~~W1~~, ~~W2~~]
                nodes :
                    -
                        node: LibSVM_Classifier
                        parameters :
                            complexity : 1
                            class_labels : ['Standard', 'Target']
                            weight : [~~W1~~, ~~W2~~]
                            kernel_type : 'LINEAR'
    """

    def __init__(self, start=[], directions=[],
                 start_step_size=1.0, stop_step_size=1e-10,
                 scaling_factor=0.5, up_scaling_factor=1,
                 max_iter=inf, max_bound=[], min_bound=[], red_pars=[],
                 **kwargs):
        ParameterOptimizationBase.__init__(self, **kwargs)
        # extract parallelization dict for subflow handler
        SubflowHandler.__init__(self, **kwargs.get('parallelization', {}))
        dim = len(self.variables)
        if start != []:
            x_opt = self.get_vector(start)
        else:
            x_opt = ones(dim)
            self._log("No starting vector given. Vector of all ones taken.",
                      level=logging.CRITICAL)
        if directions == []:
            directions = list(vstack((identity(dim), -identity(dim))))
            self._log("No search directions given! Using unit directions.",
                      level=logging.WARNING)
        directions = [tuple(d) for d in directions]
        # delete duplicates
        directions = list(set(directions))
        max_bound = self.get_vector(max_bound) if max_bound != [] \
            else inf * ones(dim)
        min_bound = self.get_vector(min_bound) if min_bound != [] \
            else -inf * ones(dim)
        assert((x_opt < max_bound).all() and (x_opt > min_bound).all()), \
            "Starting point is not valid!"
        # copy all parameters which will be changed during processing
        init_params = {"x_opt": x_opt, "directions": directions,
                       "step_size": start_step_size}
        self.set_permanent_attributes(x_opt=x_opt,
                                      directions=directions,
                                      step_size=start_step_size,
                                      stop_step_size=stop_step_size,
                                      scaling_factor=scaling_factor,
                                      up_scaling_factor=up_scaling_factor,
                                      max_iter=max_iter,
                                      min_bound=min_bound,
                                      max_bound=max_bound,
                                      init_params=init_params)

    @staticmethod
    def node_from_yaml(node_spec):
        """ Create the node based on the node_spec """
        node_spec = copy.deepcopy(node_spec)
        # call parent class method for most of the work
        node_spec["parameters"], flow_template = \
            ParameterOptimizationBase.check_parameters(node_spec["parameters"])
        if node_spec["parameters"].has_key("optimization"):
            BaseNode.eval_dict(node_spec["parameters"]["optimization"])
            # since pattern search specific parameters are all optional, add
            # them to **kwargs and let __init__ do the default assignments
            for key, value in \
                    node_spec["parameters"].pop("optimization").iteritems():
                node_spec["parameters"][key] = value
        node_obj = PatternSearchNode(flow_template=flow_template,
                                     **node_spec["parameters"])
        return node_obj

    def re_init(self):
        """ Reset the search for the optimum """
        for key, value in self.init_params.iteritems():
            setattr(self, key, value)

    def prepare_optimization(self):
        """ Calculate the initial performance value """
        # we need a subflow
        subflow = self.generate_subflow(self.flow_template,
                                        self.v2d(self.x_opt))
        # parameter key for later evaluation
        x_opt_key = self.p2key(self.v2d(self.x_opt))
        # run the flow
        result_collections = self.execute_subflows(self.train_instances,
                                                   [subflow], self.runs)
        f_max = self.inverse_metric * \
            (result_collections[0].get_average_performance(self.metric)
             - self.w * result_collections[0].get_performance_std(self.metric))
        self.set_permanent_attributes(f_max=f_max,
                                      t_performance_dict={x_opt_key: f_max})

    def get_vector(self, v_spec):
        """ Transform a list, tuple or dict to an array/vector """
        if type(v_spec) == list:
            return array(v_spec)
        elif type(v_spec) == tuple:
            return array(v_spec)
        elif type(v_spec) == dict:
            assert(sorted(v_spec.keys()) == sorted(self.variables)), \
                "Dictionary %s is no vector!" % v_spec
            new_v = []
            for key in self.variables:
                new_v.append(v_spec[key])
            return array(new_v)
        elif type(v_spec) == ndarray:
            # already a vector, nothing to convert
            return v_spec
        else:
            raise Exception("Could not convert %s to array." % str(v_spec))

    def v2d(self, v):
        """ Transform a vector to a dictionary using self.variables """
        if type(v) == dict:
            import warnings
            warnings.warn("Type conversion error in v2d. "
                          "Got dictionary as input.")
            return v
        else:
            return dict([(self.variables[i], v[i])
                         for i in range(len(self.variables))])

    def get_best_parametrization(self):
        """ Perform the pattern search

        Evaluate the set of directions around the current best solution.
        If a better solution is found, start a new evaluation of directions
        there. Otherwise, reduce the length of the directions by the
        scaling factor.
        """
        self.iterations = 1
        self.search_history = [{"best_parameter": self.x_opt,
                                "best_performance": self.f_max,
                                "performance_dict": dict(),
                                "step_size": self.step_size,
                                "iterations": self.iterations}]
        while self.step_size >= self.stop_step_size and \
                (self.iterations < self.max_iter or self.max_iter == -1):
            self.ps_step()
        return self.v2d(self.x_opt), self.f_max

    def ps_step(self):
        """ Single descent step of the pattern search algorithm """
        grid = [array(array(d) * self.step_size + self.x_opt, dtype='float64')
                for d in self.directions
                if (array(array(d) * self.step_size + self.x_opt,
                          dtype='float64') < self.max_bound).all()
                and (array(array(d) * self.step_size + self.x_opt,
                           dtype='float64') > self.min_bound).all()]
        performance_dict = {}
        if self.debug:
            print "########################################################"
            print "number of total iterations:"
            print self.iterations
            print "current center:"
            print self.x_opt
            print "current function value:"
            print self.f_max
            print "current step size:"
            print self.step_size
            print "current search grid:"
            print grid
        chec_p = len(grid)
        red_grid = []
        for grid_node in grid:
            grid_key = self.p2key(self.v2d(grid_node))
            if self.t_performance_dict.has_key(grid_key):
                performance_dict[grid_key] = self.t_performance_dict[grid_key]
            else:
                red_grid.append(self.v2d(grid_node))
                # each flow has to be evaluated, so we don't need more flows
                # than possible iterations
                self.iterations += 1
                if self.iterations == self.max_iter:
                    break
        del(grid)
        # create subflows
        subflows = [self.generate_subflow(self.flow_template, grid_node)
                    for grid_node in red_grid]
        # execute subflows
        result_collections = self.execute_subflows(self.train_instances,
                                                   subflows, self.runs)
        for grid_node, result in zip(red_grid, result_collections):
            key = self.p2key(grid_node)
            performance = self.inverse_metric * \
                (result.get_average_performance(self.metric)
                 - self.w * result.get_performance_std(self.metric))
            self.t_performance_dict[key] = performance
            performance_dict[key] = performance
        del subflows, result_collections
        if self.debug:
            print "results of grid evaluation"
            print performance_dict
        if not (self.iterations == self.max_iter):
            assert(chec_p == len(performance_dict)), \
                "Entries missing in performance dict!"
        # Determine the flow parameterization that performed optimally with
        # regard to the specified metric on the grid
        if not (len(performance_dict) == 0):
            # max/sorted in get_best_dict_entry always yields the same
            # parameter in case of ties
            best_parametrization, performance = \
                self.get_best_dict_entry(performance_dict)
        else:
            performance = -inf
            best_parametrization = "None"
        # the following ordering of the comparison is necessary,
        # since a performance result may be nan
        old_step_size = self.step_size
        if performance > self.f_max:
            self.x_opt = self.get_vector(best_parametrization)
            self.f_max = performance
            # increasing step size
            self.step_size = self.step_size * self.up_scaling_factor
        elif performance == self.f_max:
            self.x_opt = self.get_vector(best_parametrization)
        else:
            # new scaling factor if we don't get an improvement
            self.step_size = self.step_size * self.scaling_factor
        # Search documentation - TODO: maybe only log relevant steps?
        self.search_history.append({"best_parameter": self.x_opt,
                                    "best_performance": self.f_max,
                                    "performance_dict": performance_dict,
                                    "step_size": old_step_size,
                                    "iterations": self.iterations})

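
# Illustrative sketch, not part of the original module: the candidate points
# a single pattern search step inspects around the current optimum. All
# values are hypothetical.
def _pattern_search_step_example():
    """ Generate the candidates around x_opt for one ps_step-like iteration """
    x_opt = array([1.0, 1.0])
    step_size = 0.5
    # default unit directions in two dimensions: +e1, +e2, -e1, -e2
    directions = list(vstack((identity(2), -identity(2))))
    # candidates: [1.5, 1.0], [1.0, 1.5], [0.5, 1.0], [1.0, 0.5];
    # ps_step additionally skips points outside min_bound/max_bound and
    # points whose performance is already cached in t_performance_dict
    return [array(d) * step_size + x_opt for d in directions]
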

# Specify special node names
_NODE_MAPPING = {"Grid_Search": GridSearchNode,
                 "Pattern_Search": PatternSearchNode}