Source code for pySPACE.missions.nodes.postprocessing.threshold_optimization

""" Optimize classification thresholds """

import logging
from operator import itemgetter
from bisect import insort

import scipy
import numpy

import copy

from pySPACE.missions.nodes.base_node import BaseNode
from pySPACE.missions.nodes.decorators import BooleanParameter
from pySPACE.resources.data_types.prediction_vector import PredictionVector
from pySPACE.resources.dataset_defs.metric import BinaryClassificationDataset as BinaryClassificationDataset

@BooleanParameter("preserve_score")
@BooleanParameter("recalibrate")
class ThresholdOptimizationNode(BaseNode):
    """ Optimize the classification threshold for a specified metric

    This node changes the classification threshold (i.e. the mapping from
    real-valued classifier predictions onto class labels) by choosing a
    threshold that is optimal for a given metric on the training data.
    This may be useful when a classifier optimizes a different metric than
    the one the user is actually interested in. However, it is always
    preferable to use a classifier that optimizes for the right target
    metric, since this node can only correct the threshold, not the
    hyperplane.

    If *store* is set to True, a graphic is stored in the persistency
    directory that shows the mapping of threshold onto the chosen metric
    on training and test data.

    **Parameters**

        :metric:
            A string that determines the metric for which the threshold is
            optimized. The string must be a valid Python expression that
            evaluates to a float. Within this string, the quantities {TP}
            (true positives), {FP} (false positives), {TN} (true negatives),
            and {FN} (false negatives) can be used to compute the metric.
            For instance, the string "({TP}+{TN})/({TP}+{TN}+{FP}+{FN})"
            corresponds to the accuracy. Some standard metrics (F-Measure,
            Accuracy) are predefined, i.e. it suffices to give the name of
            the metric as parameter; the corresponding Python expression is
            determined automatically. For details and inspiration have a
            look at :ref:`metric <metrics>` in the
            :class:`~pySPACE.resources.dataset_defs.metric.BinaryClassificationDataset`.

            .. warning:: If the specified metric does not exist, the
                algorithm receives a value of zero instead and will have
                problems optimizing. This is because the default value for
                unknown metrics is zero.

            (*optional, default: "Balanced_accuracy"*)

        :class_labels:
            Determines the order of classes, i.e. the mapping of class
            labels onto integers. The first element of the list is mapped
            onto 0, the second onto 1.

            (*recommended, default: ['Standard', 'Target']*)

        :preserve_score:
            If True, only the class labels are changed according to the new
            threshold. If False, the classifier prediction score is also
            shifted by the difference between the new and the old
            threshold, i.e.

            .. math:: score_{new} = score_{old} - (threshold_{new} - threshold_{old})

            (*optional, default: False*)

        :classifier_threshold:
            Old decision threshold of the classifier. For SVMs this is
            zero; for Bayesian classifiers or after probability fits it is
            0.5.

            (*optional, default: 0.0*)

        :recalibrate:
            If the distribution during incremental learning is expected to
            differ significantly from the one of the training session, a
            new threshold is calculated using only the new examples and not
            considering the old ones. If this parameter is active,
            *retrain* is also active!

            (*optional, default: False*)

        :weight:
            Parameter for weighted metrics.
            If you want to use it, have a look at :ref:`metric <metrics>`
            and the
            :mod:`pySPACE.missions.nodes.sink.classification_performance_sink.PerformanceSinkNode`.

            (*optional, default: 0.5*)

        :inverse_metric:
            Some metrics have to be minimized instead of maximized. For a
            custom formula this can be achieved by multiplying it with -1;
            for predefined metrics that require minimization, set this
            parameter to True.

            (*optional, default: False*)

    **Exemplary Call**

    .. code-block:: yaml

        -
            node : Threshold_Optimization
            parameters :
                metric : "-{FP} - 5*{FN}"
                class_labels : ['Standard', 'Target']

    :Author: Jan Hendrik Metzen (jhm@informatik.uni-bremen.de)
    :Created: 2010/11/25
    """
    input_types = ["PredictionVector"]

    def __init__(self, metric="Balanced_accuracy", class_labels=None,
                 preserve_score=False, classifier_threshold=0.0,
                 recalibrate=False, weight=0.5, inverse_metric=False,
                 **kwargs):
        super(ThresholdOptimizationNode, self).__init__(**kwargs)
        # Soft metric variants cannot be computed from the hard confusion
        # matrix counts used here; fall back to the corresponding hard metric.
        if metric.startswith("k_"):
            metric = metric[2:]
            self._log(message="Soft metrics are not supported by this node! "
                              "Switching to hard variant.",
                      level=logging.CRITICAL)
        if metric.startswith("soft_"):
            metric = metric[5:]
            self._log(message="Soft metrics are not supported by this node! "
                              "Switching to hard variant.",
                      level=logging.CRITICAL)
        if metric.startswith("pol_"):
            metric = metric[4:]
            self._log(message="Soft metrics are not supported by this node! "
                              "Switching to hard variant.",
                      level=logging.CRITICAL)
        if metric == "AUC":
            metric = "Balanced_accuracy"
            self._log(message="AUC is no relevant metric for this node! "
                              "Balanced_accuracy taken.",
                      level=logging.CRITICAL)
        # Some hard coded standard metrics
        if metric == "F_measure":
            metric = "0 if {TP} == 0 else " \
                     "2*{TP}**2/(2*{TP}**2 + {TP}*{FP} + {TP}*{FN})"
        elif metric == "F_measure_standard":
            metric = "0 if {TN} == 0 else " \
                     "2*{TN}**2/(2*{TN}**2 + {TN}*{FN} + {TN}*{FP})"
        elif metric == "Accuracy":
            metric = "({TP}+{TN})/({TP}+{TN}+{FP}+{FN})"
        elif metric == "Balanced_accuracy":
            # handled by the dedicated balanced_accuracy method
            # metric = "(0.5*{TP}/({TP}+{FN}) + 0.5*{TN}/({TN}+{FP}))"
            pass
        if recalibrate:
            self.retrainable = True
        self.set_permanent_attributes(
            metric=metric,
            metric_fct=None,
            classes=class_labels,
            preserve_score=preserve_score,
            classifier_threshold=classifier_threshold,
            weight=weight,
            recalibrate=recalibrate,
            orientation_up=True,
            threshold=0,
            instances=[],  # list sorted by prediction score
            example=None,  # classification vector input example
            # information from the example + own classification information
            classifier_information={},
            inverse_metric=inverse_metric)

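    # Illustrative check (not part of the node, made-up counts): for TP > 0,
    # the hard-coded F_measure expression above is algebraically the usual
    # F1 score 2*TP / (2*TP + FP + FN).
    #
    #     >>> expr = ("0 if {TP} == 0 else "
    #     ...         "2*{TP}**2/(2*{TP}**2 + {TP}*{FP} + {TP}*{FN})")
    #     >>> round(eval(expr.format(TP=8., FP=2., FN=4.)), 4)  # 2*8/(16+2+4)
    #     0.7273
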
    def balanced_accuracy(self, TP, FP, TN, FN):
        """ Compute the balanced accuracy from the confusion matrix counts """
        if (TP + FN) == 0 or (TN + FP) == 0:
            return 0.5
        return 0.5 * TP / (TP + FN) + 0.5 * TN / (TN + FP)

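    # For instance (made-up counts), with TP=8, FN=2 and TN=50, FP=50 the
    # balanced accuracy is 0.5*8/10 + 0.5*50/100 = 0.65, whereas the plain
    # accuracy would only be (8+50)/110 ~ 0.53; each class contributes
    # equally, independent of the class imbalance.
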
    def is_trainable(self):
        """ Returns whether this node is trainable """
        return True

    def is_supervised(self):
        """ Returns whether this node requires supervised training """
        return True

    def _train(self, data, class_label):
        """ Collect training data and class labels """
        if self.classes is None:
            try:
                self.set_permanent_attributes(classes=data.predictor.classes)
            except:
                self.set_permanent_attributes(classes=['Standard', 'Target'])
                self._log("No class labels given. Using default: "
                          "['Standard', 'Target']. If you get errors, "
                          "this was the wrong choice.",
                          level=logging.CRITICAL)
        if type(data.label).__name__ == 'str':
            prediction_label = self.classes.index(data.label)
        elif type(data.label).__name__ == 'list' and len(data.label) == 1:
            prediction_label = self.classes.index(data.label[0])
        else:
            raise Exception("The ThresholdOptimizationNode can only handle a "
                            "string or a list with a string as its only "
                            "element as input. Got: %s with type: %s"
                            % (str(data.label), type(data.label)))
        if class_label not in self.classes and "REST" in self.classes:
            class_label = "REST"
        # Insert new (score, predicted_label, actual_label) tuple into list of
        # instances that is sorted by ascending prediction score
        insort(self.instances, (data.prediction, prediction_label,
                                self.classes.index(class_label)))
        # Copy important classifier parameters to hand them on to the sink node
        if self.example is None:
            self.example = data
            try:
                self.classifier_information = \
                    copy.deepcopy(data.predictor.classifier_information)
            except:
                pass

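    # Illustrative sketch (not part of the node, made-up scores): insort keeps
    # self.instances sorted by ascending prediction score, because tuples
    # compare on their first element first.
    #
    #     >>> instances = []
    #     >>> insort(instances, (0.7, 1, 1))
    #     >>> insort(instances, (-0.2, 0, 0))
    #     >>> insort(instances, (0.1, 1, 0))
    #     >>> [score for score, pred_label, true_label in instances]
    #     [-0.2, 0.1, 0.7]
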
    def _stop_training(self, debug=False):
        """ Call the optimization algorithm """
        self.calculate_threshold()

    def calculate_threshold(self):
        """ Optimize the threshold for the given scores, labels and metric.

        .. note:: This method requires O(n) time (n being the number of
                  training instances). There should be an asymptotically
                  more efficient implementation that is better suited for
                  fast incremental learning.
        """
        # Create metric function lazily since it cannot be pickled
        if not hasattr(self, "metric_fct") or self.metric_fct is None:
            self.metric_fct = self._get_metric_fct()
        # Split the 3-tuples in the instance list into the three components:
        # prediction scores, predicted labels, and actual labels
        predictions = list(map(itemgetter(0), self.instances))
        prediction_labels = list(map(itemgetter(1), self.instances))
        labels = list(map(itemgetter(2), self.instances))
        # Determine orientation of hyperplane: if the instance with the lowest
        # score was predicted as class 0, larger scores mean class 1
        if prediction_labels[0] == 0:
            self.orientation_up = True
        else:
            self.orientation_up = False
        if self.orientation_up:
            TP = labels.count(1)
            FP = labels.count(0)
            TN = 0
            FN = 0
        else:
            TP = 0
            FP = 0
            TN = labels.count(0)
            FN = labels.count(1)
        if self.store:
            self.predictions_train = [[], []]
        # Determine the threshold for which the given metric is maximized
        metric_values = []
        for label, prediction_value in zip(labels, predictions):
            if label == 0 and self.orientation_up:
                TN += 1
                FP -= 1
            elif label == 0 and not self.orientation_up:
                TN -= 1
                FP += 1
            elif label == 1 and self.orientation_up:
                FN += 1
                TP -= 1
            elif label == 1 and not self.orientation_up:
                FN -= 1
                TP += 1
            assert (TP >= 0 and FP >= 0 and TN >= 0 and FN >= 0), \
                "TP: %s FP: %s TN: %s FN: %s" % (TP, FP, TN, FN)
            metric_values.append(self.metric_fct(TP, FP, TN, FN))
            if self.store:
                self.predictions_train[0].append(prediction_value)
                self.predictions_train[1].append(metric_values[-1])
        # Fit a polynomial of degree 2 to the threshold that maximizes the
        # metric and its two neighbors. The peak of this parabola is then
        # used as classification threshold.
        max_index = metric_values.index(max(metric_values))
        if max_index in [0, len(metric_values) - 1]:  # pathological cases
            self.threshold = predictions[max_index]
        else:
            polycoeffs = numpy.polyfit(predictions[max_index-1:max_index+2],
                                       metric_values[max_index-1:max_index+2],
                                       2)
            self.threshold = -polycoeffs[1] / (2 * polycoeffs[0])

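    # Illustrative sketch (not part of the node, made-up values): the final
    # refinement above fits a parabola through the best threshold candidate
    # and its two neighbours and uses its vertex -b/(2a) as threshold.
    #
    #     >>> scores = [0.8, 1.0, 1.3]      # neighbouring prediction scores
    #     >>> values = [0.70, 0.82, 0.75]   # metric values at these thresholds
    #     >>> a, b, c = numpy.polyfit(scores, values, 2)
    #     >>> round(-b / (2 * a), 2)
    #     1.08
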
    def start_retraining(self):
        """ Start retraining phase of this node """
        if self.recalibrate:
            # We remove all old training data since we expect that the
            # distributions have shifted and thus the old data does not help
            # to model the new distributions
            self.set_permanent_attributes(instances=[])

    def _inc_train(self, data, class_label):
        """ Provide training data for retraining """
        result = self._train(data, class_label)
        # Recalculate threshold
        self.calculate_threshold()
        return result

    def _execute(self, data):
        """ Shift the data with the new offset """
        if self.orientation_up:
            predicted_label = self.classes[1] \
                if data.prediction > self.threshold else self.classes[0]
        else:
            predicted_label = self.classes[1] \
                if data.prediction < self.threshold else self.classes[0]
        if self.preserve_score:
            prediction_score = data.prediction
        else:
            prediction_score = data.prediction - \
                (self.threshold - self.classifier_threshold)
        return PredictionVector(label=predicted_label,
                                prediction=prediction_score,
                                predictor=self)

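    # Illustrative sketch (not part of the node, made-up values): with
    # preserve_score=False, the score is shifted so that the optimized
    # threshold is mapped back onto the original classifier threshold.
    # Assuming the default orientation (scores above the threshold mean the
    # second class), classifier_threshold=0.0 and an optimized threshold of
    # 0.3, a prediction of 0.25 is labeled as the first class (0.25 < 0.3)
    # and returned with the shifted score 0.25 - (0.3 - 0.0) = -0.05.
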
    def _get_metric_fct(self):
        """ Map the metric specification onto a function of (TP, FP, TN, FN) """
        if self.metric == 'Mutual_information':
            metric_fct = lambda TP, FP, TN, FN: \
                BinaryClassificationDataset.mutual_information(TN, FN, TP, FP)
        elif self.metric == 'Normalized_mutual_information':
            metric_fct = lambda TP, FP, TN, FN: \
                BinaryClassificationDataset.normalized_mutual_information(
                    TN, FN, TP, FP)
        elif self.metric == "Balanced_accuracy":
            metric_fct = self.balanced_accuracy
        elif '{TP}' in self.metric or '{FP}' in self.metric \
                or '{TN}' in self.metric or '{FN}' in self.metric:
            # Custom metric given as Python expression over the confusion
            # matrix counts
            metric_fct = lambda TP, FP, TN, FN: \
                eval(self.metric.format(TP=float(TP), FP=float(FP),
                                        TN=float(TN), FN=float(FN)))
        elif self.inverse_metric:
            metric_fct = lambda TP, FP, TN, FN: \
                (-1.0) * BinaryClassificationDataset.calculate_confusion_metrics(
                    {"True_negatives": TN, "True_positives": TP,
                     "False_positives": FP, "False_negatives": FN},
                    weight=self.weight)[self.metric]
        else:
            metric_fct = lambda TP, FP, TN, FN: \
                BinaryClassificationDataset.calculate_confusion_metrics(
                    {"True_negatives": TN, "True_positives": TP,
                     "False_positives": FP, "False_negatives": FN},
                    weight=self.weight)[self.metric]
        return metric_fct

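    # Illustrative sketch (not part of the node, made-up counts): a custom
    # metric string such as the "-{FP} - 5*{FN}" from the exemplary call is
    # handled by the format/eval branch above.
    #
    #     >>> metric = "-{FP} - 5*{FN}"
    #     >>> eval(metric.format(TP=12., FP=3., TN=20., FN=2.))
    #     -13.0
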
    def store_state(self, result_dir, index=None):
        """ Stores this node in the given directory *result_dir*

        If *store* is True, the chosen metric is plotted as a function of
        the threshold on training and on test data, together with the
        original and the optimized threshold, and saved as
        "threshold_metric.pdf" in a subdirectory named after this node class.
        """
        if self.store:
            try:
                # Create metric function lazily since it cannot be pickled
                metric_fct = self._get_metric_fct()
                # Determine curve on test data
                # TODO: Code duplication (mostly already in calculate_threshold)
                predictions_test = []
                labels_test = []
                for data, label in self.input_node.request_data_for_testing():
                    predictions_test.append(data.prediction)
                    labels_test.append(self.classes.index(label))
                sort_index = numpy.argsort(predictions_test)
                labels_test = numpy.array(labels_test)[sort_index]
                predictions_test = numpy.array(predictions_test)[sort_index]
                # Determine orientation of hyperplane
                if self.orientation_up:
                    TP = list(labels_test).count(1)
                    FP = list(labels_test).count(0)
                    TN = 0
                    FN = 0
                else:
                    TP = 0
                    FP = 0
                    TN = list(labels_test).count(0)
                    FN = list(labels_test).count(1)
                self.predictions_test = [[], []]
                for label, prediction_value in zip(labels_test,
                                                   predictions_test):
                    if label == 0 and self.orientation_up:
                        TN += 1
                        FP -= 1
                    elif label == 0 and not self.orientation_up:
                        TN -= 1
                        FP += 1
                    elif label == 1 and self.orientation_up:
                        FN += 1
                        TP -= 1
                    elif label == 1 and not self.orientation_up:
                        FN -= 1
                        TP += 1
                    assert (TP >= 0 and FP >= 0 and TN >= 0 and FN >= 0), \
                        "TP: %s FP: %s TN: %s FN: %s" % (TP, FP, TN, FN)
                    metric_value = metric_fct(TP, FP, TN, FN)
                    self.predictions_test[0].append(prediction_value)
                    self.predictions_test[1].append(metric_value)

                ### Plot ###
                import pylab
                pylab.close()
                fig_width_pt = 307.28987 * 2  # Get this from LaTeX using \showthe\columnwidth
                inches_per_pt = 1.0 / 72.27   # Convert pt to inches
                fig_width = fig_width_pt * inches_per_pt  # width in inches
                fig_height = fig_width * 0.5              # height in inches
                fig_size = [fig_width, fig_height]
                params = {'axes.labelsize': 10,
                          'text.fontsize': 8,
                          'legend.fontsize': 8,
                          'xtick.labelsize': 10,
                          'ytick.labelsize': 10}
                pylab.rcParams.update(params)
                fig = pylab.figure(0, dpi=400, figsize=fig_size)

                xmin = min(min(self.predictions_train[0]),
                           min(self.predictions_test[0]))
                xmax = max(max(self.predictions_train[0]),
                           max(self.predictions_test[0]))
                ymin = min(min(self.predictions_train[1]),
                           min(self.predictions_test[1]))
                ymax = max(max(self.predictions_train[1]),
                           max(self.predictions_test[1]))

                pylab.plot(self.predictions_train[0],
                           self.predictions_train[1],
                           'b', label='Training data')
                pylab.plot(self.predictions_test[0],
                           self.predictions_test[1],
                           'g', label='Unseen test data')
                pylab.plot([self.classifier_threshold, self.classifier_threshold],
                           [ymin, ymax], 'r', label='Original Threshold', lw=5)
                pylab.plot([self.threshold, self.threshold],
                           [ymin, ymax], 'c', label='Optimized Threshold', lw=5)
                pylab.legend(loc=0)
                pylab.xlim((xmin, xmax))
                pylab.ylim((ymin, ymax))
                pylab.xlabel("Threshold value")
                pylab.ylabel("Metric: %s" % self.metric)

                # Store plot
                from pySPACE.tools.filesystem import create_directory
                import os
                node_dir = os.path.join(result_dir, self.__class__.__name__)
                create_directory(node_dir)
                pylab.savefig(node_dir + os.sep + "threshold_metric.pdf")
            except:
                self._log("Plotting of the threshold/metric curve failed.",
                          level=logging.WARNING)
        super(ThresholdOptimizationNode, self).store_state(result_dir)

_NODE_MAPPING = {"Threshold_Optimization": ThresholdOptimizationNode}