# This Python file uses the following encoding: utf-8
# The upper line is needed for one comment in this module.
""" Calculate performance measures from classification results and store them
All performance sink nodes interface to the
:mod:`~pySPACE.resources.dataset_defs.metric` datasets, where the final metric values
are calculated.
These results can be put together using the
:class:`~pySPACE.resources.dataset_defs.performance_result.PerformanceResultSummary`.
"""
import os
import copy
import warnings
import cPickle
import numpy
import timeit
from pySPACE.missions.nodes.base_node import BaseNode
from pySPACE.tools.filesystem import create_directory
from pySPACE.resources.dataset_defs.metric import metricdict, \
BinaryClassificationDataset, \
MultinomialClassificationDataset,\
RegressionDataset
import logging
class PerformanceSinkNode(BaseNode):
""" Calculate performance measures from standard prediction vectors and store them
It takes all classification vectors that are passed on to it
from a continuous classifier, calculates the performance measures and
stores them. The results can be later on collected and merged
into one tabular with the
:class:`~pySPACE.missions.operations.node_chain.NodeChainOperation`.
This one can be read manually or it can be
visualized with a gui.
.. note:: FeatureVectorSinkNode was the initial model of this node.
**Parameters**
:evaluation_type:
Define type of incoming results to be processed.
Currently ``binary``
(:class:`~pySPACE.resources.dataset_defs.metric.BinaryClassificationDataset`)
and ``multinomial``
(:class:`~pySPACE.resources.dataset_defs.metric.MultinomialClassificationDataset`)
classification (also denoted as ``multiclass`` classification) and ``regression`` (even for n-dimensional output)
(:class:`~pySPACE.resources.dataset_defs.metric.RegressionDataset`)
metrics can be calculated.
For the multinomial and regression case,
several parameters are not yet relevant.
These are:
* ir_class
* save_roc_points
* calc_AUC
* calc_soft_metrics
* calc_loss
* sum_up_splits
.. warning:: Multinomial classification and regression have not yet
been used often enough with pySPACE and require
additional testing.
(*optional, default: "binary"*)
:ir_class:
The class name (as string) for which IR statistics are to be output.
(*recommended, default: 'Target'*)
:sec_class:
For binary classification the second class (not the *ir_class*)
can be specified. Normally it is detected by default and not
required, except for one_vs_REST scenarios,
where it cannot be determined.
(*optional, default: None*)
:save_individual_classifications:
If True, for every processed split a pickle file will be generated
that contains the numerical classification result (cresult) for every
individual window along with the estimated class label (c_est), the
true class label (c_true) and the number of features used (nr_feat).
The result is a list whose elements correspond to a single window and
have the following shape:
::
[ [c_est, cresult, nr_feat], c_true ]
(*optional, default: False*)
:save_roc_points:
If True, for every processed split a pickle file will be generated
that contains a list of tuples (=points) increasing by FP rate, that
can be used to plot a Receiver Operator Curve (ROC) and a list, that
contains the actually used point in the ROC space together with (0|0)
and (1|1). The result has the following shape:
::
( [(fp_rate_1,tp_rate_1), ... ,(fp_rate_n,tp_rate_n)],
[(0.0,0.0), (fp_rate, tp_rate), (1.0,1.0)])
For comparing ROC curves, you can use the analysis GUI
(*performance_results_analysis.py*).
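A minimal sketch for loading and plotting such a pickle file directly
(for illustration only; it relies on the stored structure described
above and the file name used in :func:`store_state`)::

    import cPickle
    import matplotlib.pyplot as plt
    with open("roc_points_sp0.pickle", "rb") as roc_file:
        curves = cPickle.load(roc_file)
    # curves contains the list of all ROC points and the actually
    # used operating point framed by (0,0) and (1,1)
    for curve in curves:
        plt.plot([fp for fp, tp in curve], [tp for fp, tp in curve])
    plt.show()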
(*optional, default: False*)
:weight:
The weight for the weighted accuracy. For many scenarios
a relevant performance measure is a combination of
True-Positive-Rate (TPR) and True-Negative-Rate (TNR), where one of the
two might be of higher importance than the other, and thus gets a
higher weight. Essentially, the weighted accuracy is
calculated by
.. math:: \\text{Weighted\_accuracy} = weight \\cdot TPR + (1 - weight) \\cdot TNR
If this parameter is not set, the value equals the balanced accuracy.
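For illustration: with ``weight = 0.75``, a TPR of 0.9 and a TNR of
0.6, the weighted accuracy is 0.75 * 0.9 + 0.25 * 0.6 = 0.825.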
In the case of `multinomial` classification, this parameter
has to be a dictionary.
(*optional, default: 0.5*)
:measure_times:
measure the average and maximum time that is needed for the processing
of the data between the last sink node in the node chain and this node.
(*optional, default: True*)
:calc_soft_metrics:
Integrate the uncertainty of the classifier into the metrics:
the prediction value is projected to the interval [-1, 1].
(*optional, default: False*)
:calc_train:
Switch for calculating metrics on the training data
(*optional, default: True*)
:calc_AUC:
Calculate the AUC metric
(*optional, default: True*)
:calc_loss:
Integrates the calculated losses into the final csv-file.
(L1, L2), (LDA, SVM, RMM), (restricted, unrestricted)
and (equal weighted, *balanced*) losses are
calculated in all combinations, resulting in 24 entries.
(*optional, default: True*)
:loss_restriction:
Maximum value of the single loss values.
Everything above is reduced to the maximum.
(*optional, default: 2*)
:sum_up_splits:
If you use a CV-Splitter in your node chain, the performance sink adds up
the basic metrics and calculates confusion matrix metrics with these
values. The other metrics are averaged.
This way, many more test examples contribute to the calculation.
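For illustration: summing the basic confusion matrix entries
(TP, FP, FN, TN) = (8, 2, 1, 9) and (6, 4, 3, 7) of two splits gives
(14, 6, 4, 16), from which, e.g., the accuracy is recomputed as
(14 + 16) / 40 = 0.75.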
(*optional, default: False*)
:dataset_pattern:
If the __Dataset__ is of the form "X_Y_Z", then this pattern can be
specified with this parameter. The different values X, Y, Z will then
appear in corresponding columns in the results.csv. Example: If the
datasets are of the form "NJ89_20111128_3", and one passes the
dataset_pattern "subject_date_setNr", then the results.csv will have
the columns __Subject__, __Date__ and __SetNr__ with the corresponding
values parsed (note the added underscores and capitalized first letter).
(*optional, default: None*)
:decision_boundary:
If your decision boundary is not at zero you should specify this for
the calculation of metrics depending on the prediction values.
Probabilistic classifiers often have a boundary at 0.5.
(*optional, default: 0.0*)
:save_trace:
Generates a table which contains a confusion matrix over time/samples.
There are two types of traces: short traces and long traces.
The short traces contain only the information whether a classification
was a TP, FN, FP or TN. The long traces additionally contain
loss values and are saved as a dictionary.
To save only short traces (e.g., for performance reasons),
set save_trace to ``short``.
To save long and short traces, set save_trace to True.
The encoding in trace is:
:TP: 0
:FN: 1
:FP: 2
:TN: 3
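For example, a stored trace ``[0, 0, 3, 2]`` corresponds to the
sequence TP, TP, TN, FP.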
(*optional, default: False*)
**Exemplary Call**
.. code-block:: yaml
-
node : Classification_Performance_Sink
parameters :
ir_class : "Target"
weight : 0.5
:input: PredictionVector
:output: ClassificationDataset
:Author: Mario Krell (mario.krell@dfki.de)
:Created: 2012/08/02
"""
input_types = ["PredictionVector"]
def __init__(self, classes_names=[], ir_class="Target", sec_class=None,
save_individual_classifications=False, save_roc_points=False,
weight=0.5, measure_times=True, calc_soft_metrics=False,
sum_up_splits=False, dataset_pattern=None, calc_AUC=True,
calc_loss=True, calc_train=True, save_trace=False,
decision_boundary=None, loss_restriction=2,
evaluation_type="binary",
**kwargs):
super(PerformanceSinkNode, self).__init__(**kwargs)
if save_roc_points:
calc_AUC = True
if evaluation_type in ["multinomial", "multiclass"]:
evaluation_type = "multinomial"
save_trace = False
save_roc_points = False
calc_AUC = False
calc_loss = False
calc_soft_metrics = False
sum_up_splits = False
cc = MultinomialClassificationDataset(dataset_pattern=
dataset_pattern)
elif evaluation_type == "binary":
cc = BinaryClassificationDataset(dataset_pattern=dataset_pattern)
elif evaluation_type == "regression":
save_trace = False
save_roc_points = False
calc_AUC = False
calc_loss = False
calc_soft_metrics = False
sum_up_splits = False
cc = RegressionDataset(dataset_pattern=dataset_pattern)
store = \
save_individual_classifications or \
save_roc_points or \
self.store or \
save_trace
self.set_permanent_attributes(
ir_class=ir_class.strip(),
classification_dataset=cc,
classes_names=classes_names,
# determined later on for checks in binary classification
sec_class=sec_class,
weight=weight,
save_individual_classifications=save_individual_classifications,
save_roc_points=save_roc_points,
measure_times=measure_times,
calc_soft_metrics=calc_soft_metrics,
example=None,
sum_up_splits=sum_up_splits,
calc_AUC=calc_AUC,
calc_loss=calc_loss,
decision_boundary=decision_boundary,
loss_restriction=loss_restriction,
calc_train=calc_train,
save_trace=save_trace,
store=store,
evaluation_type=evaluation_type,
invert_classification=False)
def reset(self):
""" classification_dataset has to be kept over all splits """
# We have to create a temporary reference since we remove
# the self.permanent_state reference in the next step by overwriting
# self.__dict__
tmp = self.permanent_state
# reset should not delete classification dataset
# if you want to delete the dataset just do it explicitly.
tmp["classification_dataset"] = self.classification_dataset
self.__dict__ = copy.copy(tmp)
self.permanent_state = tmp
def is_trainable(self):
""" Return whether this node is trainable.
.. todo:: Check if return should be False and edit documentation
"""
# Though this node is not really trainable, it returns True in order
# to request the training data from previous nodes.
return True
def is_supervised(self):
""" Return whether this node requires supervised training. """
return True
def process_current_split(self):
""" Main processing part on test and training data of current split
Performance metrics are calculated for training and test data separately.
Metrics on training data help to detect errors in classifier construction
and to compare to what extent it behaves the same way as on testing data.
The function only collects the data, measures execution times
and calls functions to update confusion matrices.
"""
################
### TRAINING ###
################
self._log("Processing training data",level=logging.INFO)
self.train_classification_outcome = []
self.training_time = 0
if self.measure_times:
start_time_stamp = timeit.default_timer()
for classification_vector, label in self.input_node.request_data_for_training(False):
if self.calc_train:
self.set_helper_parameters(classification_vector,label)
self.train_classification_outcome.append((classification_vector, label))
if self.measure_times:
stop_time_stamp = timeit.default_timer()
self.training_time = stop_time_stamp - start_time_stamp
if self.calc_train and self.evaluation_type == "binary" and not self.train_classification_outcome==[]:
if self.decision_boundary is None and self.train_classification_outcome[0][0].predictor.node_name in \
["PlattsSigmoidFitNode",
"LinearFitNode",
"SigmoidTransformationNode"]:
self.decision_boundary=0.5
elif self.decision_boundary is None:
self.decision_boundary=0
train_result = BinaryClassificationDataset.calculate_metrics(
classification_results=self.train_classification_outcome,
calc_soft_metrics=self.calc_soft_metrics,
invert_classification=self.invert_classification,
ir_class=self.ir_class, sec_class=self.sec_class,
loss_restriction=self.loss_restriction,
time_periods=[],
calc_AUC=self.calc_AUC,calc_loss=self.calc_loss,
weight=self.weight,save_roc_points=self.save_roc_points,
decision_boundary=self.decision_boundary)
try:
train_metrics, self.train_R = train_result
except:
train_metrics = train_result
elif self.calc_train and self.evaluation_type == "multinomial":
train_metrics = MultinomialClassificationDataset.calculate_metrics(
classification_results=self.train_classification_outcome,
weight=self.weight, classes=self.classes_names)
elif self.calc_train and self.evaluation_type == "regression":
train_metrics = RegressionDataset.calculate_metrics(
regression_results=self.train_classification_outcome,
weight=self.weight)
elif not self.train_classification_outcome:
train_metrics = metricdict()
###############
### TESTING ###
###############
self._log("Processing testing data",level=logging.INFO)
# for saving the actual numerical classification results
self.classification_outcome = []
# class\guess ir sec
# ir_class: TP FN
# sec_class: FP TN
# initialization to measure execution speed
self.time_periods = []
if self.measure_times:
self.time_periods = []
start_time_stamp = timeit.default_timer()
self.example = None
for classification_vector, label in \
self.input_node.request_data_for_testing():
if self.measure_times:
stop_time_stamp = timeit.default_timer()
self.time_periods.append(stop_time_stamp - start_time_stamp)
self.set_helper_parameters(classification_vector,label)
self.classification_outcome.append((classification_vector, label))
# re-initialization of time before next item is requested
if self.measure_times:
start_time_stamp = timeit.default_timer()
if self.decision_boundary is None and \
len(self.classification_outcome) > 0 and \
self.classification_outcome[0][0].predictor.node_name in \
["PlattsSigmoidFitNode",
"LinearFitNode",
"SigmoidTransformationNode"]:
self.decision_boundary=0.5
elif self.decision_boundary is None:
self.decision_boundary=0
if self.evaluation_type == "binary":
result = BinaryClassificationDataset.calculate_metrics(
classification_results=self.classification_outcome,
calc_soft_metrics=self.calc_soft_metrics,
invert_classification=self.invert_classification,
ir_class=self.ir_class, sec_class=self.sec_class,
loss_restriction=self.loss_restriction,
time_periods=self.time_periods,
calc_AUC=self.calc_AUC,calc_loss=self.calc_loss,
weight=self.weight,save_roc_points=self.save_roc_points,
decision_boundary=self.decision_boundary)
try:
metrics, self.R = result
except:
metrics = result
elif self.evaluation_type=="multinomial":
metrics = MultinomialClassificationDataset.calculate_metrics(
classification_results=self.classification_outcome,
weight=self.weight)
elif self.evaluation_type=="regression":
metrics = RegressionDataset.calculate_metrics(
regression_results=self.classification_outcome,
weight=self.weight)
# add the training time if training was done
if self.measure_times:
metrics["Training_time"] = self.training_time
try:
classifier_information = self.classification_outcome[0][0].\
predictor.classifier_information
except:
classifier_information=dict()
# add the training metrics
if self.calc_train:
skip_keys = classifier_information.keys()
for key,value in train_metrics.items():
if not key in skip_keys:
metrics["train_"+key] = value
self.classification_dataset.add_split(metrics,
train=False,
split=self.current_split,
run=self.run_number)
if self.save_trace:
self.trace, self.long_trace=self.calculate_classification_trace(
classification_results=self.classification_outcome,
calc_soft_metrics=self.calc_soft_metrics,
ir_class=self.ir_class,
sec_class=self.sec_class,
loss_restriction=self.loss_restriction,
calc_loss=self.calc_loss,
decision_boundary=self.decision_boundary,
save_trace=self.save_trace)
self._log("Metrics added to dataset",level=logging.INFO)
def set_helper_parameters(self, classification_vector, label):
""" Fetch some node parameters from the classification vector """
# get an example for a classification vector for further analysis
if self.example is None:
self.example = classification_vector
if not self.evaluation_type == "binary":
return
try:
self.decision_boundary = \
classification_vector.predictor.classifier_information["decision_boundary"]
except:
pass
if self.decision_boundary is None and classification_vector.predictor.node_name in \
["PlattsSigmoidFitNode",
"LinearFitNode",
"SigmoidTransformationNode"]:
self.decision_boundary = 0.5
elif self.decision_boundary is None:
self.decision_boundary = 0
if (self.example.prediction > self.decision_boundary and
self.example.label == self.ir_class) or \
(self.example.prediction <= self.decision_boundary and not
self.example.label == self.ir_class):
self.invert_classification = False
elif self.evaluation_type == "binary":
self.invert_classification = True
warnings.warn(
"Your ir_class did not get the higher value " +
"from the classifier.\n " +
"Label %s, got value %f.\n" % (self.example.label,
self.example.prediction) +
"You should adjust that and " +
"maybe switch the given class_labels or add " +
"preserve_score: False to the " +
"Threshold_Optimization node! " +
"Furthermore you should check the parameter" +
": decision_boundary!")
if self.evaluation_type == "binary":
if self.sec_class is None:
p_label = classification_vector.label.strip()
if self.sec_class is None and not (p_label == self.ir_class):
self.sec_class = p_label
@classmethod
def calculate_classification_trace(cls,classification_results,
calc_soft_metrics=False,
ir_class="Target", sec_class=None,
loss_restriction=2.0,
calc_loss=False,
decision_boundary=0.0,
save_trace=True):
""" Calculate the classification trace, i.e. TN,TP,FN,FP for every sample
For size reasons, the trace entries are encoded as short integer codes
(trace) or, in a comprehensive version, as dicts (long_trace).
The encoding in trace is:
:TP: 0
:FN: 1
:FP: 2
:TN: 3
:Author: Hendrik Woehrle (hendrik.woehrle@dfki.de), Mario Krell (mario.krell@dfki.de)
:Returns: trace, long_trace
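A minimal usage sketch (``outcome`` is assumed to be a list of
``(prediction_vector, label)`` tuples as collected in
:func:`process_current_split`)::

    trace, long_trace = PerformanceSinkNode.calculate_classification_trace(
        classification_results=outcome, ir_class="Target",
        calc_loss=False, decision_boundary=0.0)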
"""
trace = []
long_trace = []
for prediction_vector,label in classification_results:
if sec_class is None and not label == ir_class:
sec_class = label
confusion_matrix = metricdict(float)
BinaryClassificationDataset.update_confusion_matrix(prediction_vector,
label,calc_soft_metrics=calc_soft_metrics,
ir_class=ir_class, sec_class=sec_class,
confusion_matrix=confusion_matrix,
decision_boundary=decision_boundary)
#
if calc_loss and not save_trace == "short":
BinaryClassificationDataset.update_loss_values(classification_vector=prediction_vector,
label=label,
ir_class=ir_class, sec_class=sec_class,
loss_dict=confusion_matrix,
loss_restriction=loss_restriction)
if not save_trace == "short":
long_trace.append(confusion_matrix)
if confusion_matrix["True_positives"] == 1:
trace.append(0)
elif confusion_matrix["False_negatives"] == 1:
trace.append(1)
elif confusion_matrix["False_positives"] == 1:
trace.append(2)
elif confusion_matrix["True_negatives"] == 1:
trace.append(3)
else:
raise ValueError("At least one element in the confusion matrix should be 1")
return trace, long_trace
def get_result_dataset(self):
""" Return the result dataset """
if not self.sum_up_splits:
return self.classification_dataset
else:
self.classification_dataset.merge_splits()
return self.classification_dataset
def store_state(self, result_dir, index=None):
""" Stores additional information (classification_outcome, roc_points) in the *result_dir* """
if self.store:
node_dir = os.path.join(result_dir, self.__class__.__name__)
create_directory(node_dir)
if self.save_individual_classifications:
name = 'classification_outcome_sp%s.pickle' % self.current_split
result_file = open(os.path.join(node_dir, name), "wb")
# predictor is a reference to the actual classification node
# object. This can not be pickled! Therefore, replace the
# predictor attribute by the classification node's node_specs
for single_classification in self.classification_outcome:
single_classification[0].predictor = \
single_classification[0].predictor.node_specs
result_file.write(cPickle.dumps(self.classification_outcome, protocol=2))
result_file.close()
if self.save_roc_points:
name = 'roc_points_sp%s.pickle' % self.current_split
result_file = open(os.path.join(node_dir, name), "wb")
result_file.write(cPickle.dumps(self.R, protocol=2))
result_file.close()
if self.save_trace:
name = 'trace_sp%s.pickle' % self.current_split
result_file = open(os.path.join(node_dir, name), "wb")
result_file.write(cPickle.dumps(self.trace, protocol=2))
result_file.close()
if len(self.long_trace) > 0:
name = 'long_trace_sp%s.pickle' % self.current_split
result_file = open(os.path.join(node_dir, name), "wb")
result_file.write(cPickle.dumps(self.long_trace, protocol=2))
result_file.close()
class LeaveOneOutSinkNode(PerformanceSinkNode):
""" Request the leave one out metrics from the input node
**Parameters**
see: :class:`PerformanceSinkNode`
**Exemplary Call**
.. code-block:: yaml
-
node : LOO_Sink
parameters :
ir_class : "Target"
"""
def process_current_split(self):
""" Get training results and input node metrics """
### TRAINING ### # code copy till main part
self.train_classification_outcome = []
if self.measure_times:
start_time_stamp = timeit.default_timer()
for classification_vector, label in \
self.input_node.request_data_for_training(False):
if self.calc_train:
self.set_helper_parameters(classification_vector,label)
self.train_classification_outcome.append((classification_vector, label))
if self.measure_times:
stop_time_stamp = timeit.default_timer()
self.training_time = stop_time_stamp - start_time_stamp
if self.calc_train:
train_result = BinaryClassificationDataset.calculate_metrics(
classification_results=self.train_classification_outcome,
calc_soft_metrics=self.calc_soft_metrics,
invert_classification=self.invert_classification,
ir_class=self.ir_class, sec_class=self.sec_class,
loss_restriction=self.loss_restriction,
time_periods=[],
calc_AUC=self.calc_AUC, calc_loss=self.calc_loss,
weight=self.weight, save_roc_points=self.save_roc_points,
decision_boundary=self.decision_boundary)
try:
train_metrics,self.train_R = train_result
except:
train_metrics = train_result
######################### Main Part #########################
try:
metrics = copy.deepcopy(self.train_classification_outcome[0][0].predictor.loo_metrics)
except AttributeError:
warnings.warn("Input node does not provide LOO metrics.")
metrics = metricdict(float)
#############################################################
# add the training time #Code copy from here
if self.measure_times:
metrics["Training_time"] = self.training_time
# add the training metrics
if self.calc_train:
try:
classifier_information = \
self.train_classification_outcome[0][0].predictor.\
classifier_information
except:
classifier_information=dict()
skip_keys=classifier_information.keys()
for key,value in train_metrics.items():
if not key in skip_keys:
metrics["train_"+key] = value
self.classification_dataset.add_split(metrics,
train=False,
split=self.current_split,
run=self.run_number)
class SlidingWindowSinkNode(PerformanceSinkNode):
""" Calculate and store performance measures from classifications of sliding windows
This node inherits most of its functionality from *PerformanceSinkNode*.
Thus, for parameter description of super class parameters see documentation
of *PerformanceSinkNode*.
Additionally the following functionality is provided:
1) Uncertain areas, which are excluded from the metric calculation, can
be defined. This is useful for sliding window classification, i.e., if
the true label is not known at every sliding step.
2) It is possible to label the test data only at this stage. For that, an
epoch signal (e.g. a movement marker window) must be specified.
3) Instead of excluding sliding windows from classifier evaluation, the
'true' label function shape (a step function, which is zero for the
negative class and one for the positive class) can be fitted within the
uncertain range. At the moment there is only one way of doing this:
* from_right_count_negatives: Find the point where prediction of the
negative class starts by searching backwards
in time. There can be specified how many
'outliers' are ignored, i.e. how stable the
prediction has to be.
**Parameters**
:uncertain_area:
A list of tuples of the lower and the upper time value (in ms) for which
no metrics calculation is done. The values should be given with respect
to the last window of an epoch, i.e. sliding window series (which has
time value zero).
If additionally *determine_labels* is specified then the first tuple of
*uncertain_area* describes the bounds in which the label-change-point is
determined. The lower bound should be the earliest time point when the
detection makes sense; the upper bound should be the earliest time point
when there MUST BE a member of the positive class.
(*optional, default: None*)
:sliding_step:
The time (in ms) between two consecutive windows.
(*optional, default: 50*)
:determine_labels:
If specified the label-change-point (index where the class label changes
from negative to positive class) is determined for every epoch. This is
done by counting the occurrences of negatively classified sliding windows
from the index point where the positive class is certain
(uncertain_area[1]) to the index point where the negative class is certain
(uncertain_area[0]). If *determine_labels* instances are found in
consecutive windows, the label-change-point has been reached.
If *determine_labels* > 1, the method accounts for outliers.
.. note:: Using this option makes it hard to figure out to which true
class errors pertain (since it is somewhat arbitrary). You
should be careful which metric you analyze for performance
evaluation (different class instance costs can't be modeled).
:epoch_signal:
The class name (label) of the event that marks the end of an epoch,
e.g. the movement. This can be used when null_marker windows (of an
unknown class) and a signal window which marks the event were cut out.
With respect to this event the former windows will be relabeled
according to *classes_names*.
(*optional, default: None*)
:epoch_eval:
If True, evaluation is done per epoch, i.e. per movement. Performance
metrics are averaged across epochs for every split. This option might
be necessary if the epochs have variable length, i.e., the class
distribution differs between epochs.
(*optional, default: False*)
:save_score_plot:
If True a plot is stored which shows the average prediction value
against the time point of classification.
(*optional, default: False*)
:save_trial_plot:
If True a plot is stored which shows the development of the prediction
scores for each single trial.
(*optional, default: False*)
:save_time_plot:
If True a plot is stored which shows the predicted labels for all
trials across time.
(*optional, default: False*)
:sort:
If True, the data is sorted according to time (encoded in the tag
attribute). Be aware that this only makes sense for data sets with
unique time tags.
(*optional, default: False*)
:unused_win_defs:
List of window definition names which shall not be used for evaluation.
(*optional, default: []*)
**Exemplary Call**
.. code-block:: yaml
-
node : Sliding_Window_Performance_Sink
parameters :
ir_class : "LRP"
classes_names : ['NoLRP','LRP']
uncertain_area : ['(-600,-350)']
calc_soft_metrics : True
save_score_plot : True
:input: PredictionVector
:output: ClassificationCollection
:Author: Anett Seeland (anett.seeland@dfki.de)
:Created: 2011/01/23
"""
def __init__(self,
uncertain_area=None, sliding_step=50, save_score_plot=False,
save_trial_plot=False, save_time_plot=False,
determine_labels=None, epoch_eval=False, epoch_signal=None,
sort=False, unused_win_defs=[], **kwargs):
if epoch_eval:
kwargs["save_roc_points"] = False
kwargs["calc_AUC"] = False
super(SlidingWindowSinkNode,self).__init__(**kwargs)
self.set_permanent_attributes(uncertain_area=uncertain_area,
sliding_step=sliding_step,
determine_labels=determine_labels,
epoch_signal=epoch_signal,
epoch_eval=epoch_eval,
save_score_plot=save_score_plot,
save_trial_plot=save_trial_plot,
save_time_plot=save_time_plot,
sort=sort,
unused_win_defs=unused_win_defs)
if self.store == False:
self.store = save_score_plot or save_trial_plot or save_time_plot
def process_current_split(self):
""" Compute for the current split of training and test data performance
one sliding windows.
"""
### TRAINING ###
# Code from classificationSinkNode #
self._log("Processing training data", level=logging.INFO)
self.train_classification_outcome = []
self.training_time = 0
if self.measure_times:
start_time_stamp = timeit.default_timer()
for classification_vector, label in self.input_node.request_data_for_training(False):
if classification_vector.specs["wdef_name"] in self.unused_win_defs:
continue
if self.calc_train:
self.set_helper_parameters(classification_vector,label)
self.train_classification_outcome.append((classification_vector,
label))
if self.measure_times:
stop_time_stamp = timeit.default_timer()
self.training_time = stop_time_stamp - start_time_stamp
# we assume that in the training case no sliding windows are used, i.e.,
# the windows have a known true label
if self.calc_train and self.evaluation_type == "binary" and not self.train_classification_outcome==[]:
if self.decision_boundary is None and \
self.train_classification_outcome[0][0].predictor.node_name in [
"PlattsSigmoidFitNode", "LinearFitNode", "SigmoidTransformationNode"]:
self.decision_boundary = 0.5
elif self.decision_boundary is None:
self.decision_boundary = 0
train_result = BinaryClassificationDataset.calculate_metrics(
classification_results=self.train_classification_outcome,
calc_soft_metrics=self.calc_soft_metrics,
invert_classification=self.invert_classification,
ir_class=self.ir_class, sec_class=self.sec_class,
loss_restriction=self.loss_restriction,
time_periods=[],
calc_AUC=self.calc_AUC,calc_loss=self.calc_loss,
weight=self.weight,save_roc_points=self.save_roc_points,
decision_boundary=self.decision_boundary)
try:
train_metrics,self.train_R = train_result
except:
train_metrics = train_result
elif self.calc_train and self.evaluation_type == "multinomial":
train_metrics = MultinomialClassificationDataset.calculate_metrics(
classification_results=self.train_classification_outcome,
weight=self.weight)
elif self.calc_train and self.evaluation_type == "regression":
train_metrics = RegressionDataset.calculate_metrics(
regression_results=self.train_classification_outcome,
weight=self.weight)
elif not self.train_classification_outcome:
train_metrics = metricdict()
# TESTING
self._log("Processing testing data",level=logging.INFO)
# for saving the actual numerical classification results
self.classification_outcome = []
# class\guess ir sec
# ir_class: TP FN
# sec_class: FP TN
# initialization to measure execution speed
self.time_periods = []
if self.measure_times:
start_time_stamp = timeit.default_timer()
for classification_vector, label in \
self.input_node.request_data_for_testing():
if self.measure_times:
stop_time_stamp = timeit.default_timer()
self.time_periods.append(stop_time_stamp - start_time_stamp)
# parse 'tag': 'Epoch Start: 395772ms; End: 396772ms; Class: Target'
classification_vector.specs['start_time']= \
float(classification_vector.tag.split(';')[0].split(':')[1].strip('ms'))
classification_vector.specs['end_time']= \
float(classification_vector.tag.split(';')[1].split(':')[1].strip('ms'))
self.set_helper_parameters(classification_vector,label)
self.classification_outcome.append((classification_vector,label))
if self.measure_times:
start_time_stamp = timeit.default_timer()
if self.sort:
# sort classification vectors in time
self.classification_outcome.sort(key=lambda tupel:tupel[0].specs['start_time'])
if self.decision_boundary is None and len(self.classification_outcome) \
> 0 and self.classification_outcome[0][0].predictor.node_name \
in ["PlattsSigmoidFitNode", "LinearFitNode",
"SigmoidTransformationNode"]:
self.decision_boundary = 0.5
elif self.decision_boundary is None:
self.decision_boundary = 0
self.data_time = dict()
if self.epoch_signal is not None:
marker = 0
self.data_time[marker] = []
# split according to signal
for classification_vector, label in self.classification_outcome:
if label == self.epoch_signal:
marker += 1
self.data_time[marker] = []
else:
self.data_time[marker].append((classification_vector, label))
del self.data_time[marker]
else:
# split windows according to the time
last_window_end_time = 0.0
marker = -1
for classification_vector, label in self.classification_outcome:
if classification_vector.specs['start_time'] > \
last_window_end_time or \
classification_vector.specs['end_time'] < \
last_window_end_time:
marker += 1
self.data_time[marker] = [(classification_vector, label)]
elif classification_vector.specs['end_time'] == \
last_window_end_time + self.sliding_step:
self.data_time[marker].append((classification_vector,
label))
elif "bis-2000" in classification_vector.specs['wdef_name']:
marker += 1
self.data_time[marker] = [(classification_vector, label)]
else:
# TODO: overlapping epochs - what shall we do???
# may be store it with marker = -1 and handle it afterwards
self._log("Error: Overlapping epochs in Sink detected!",
level=logging.ERROR)
#raise Exception("Overlapping epochs in Sink detected!")
last_window_end_time = classification_vector.specs['end_time']
# delete uncertain classification outcomes or relabel data in
# self.classification_outcome and calculate the confusion matrix
self.classification_outcome = []
self.label_change_points = []
performance = None
for k in self.data_time.keys():
if self.determine_labels:
# calculate uncertain indices
nr_sliding_windows = len(self.data_time[k])
if self.uncertain_area!=None:
bound_indices = range(nr_sliding_windows - \
abs(self.uncertain_area[0][0])/self.sliding_step - 1,
nr_sliding_windows-abs(self.uncertain_area[0][1])/ \
self.sliding_step)
if len(self.uncertain_area)>1:
uncertain_indices = []
for t in self.uncertain_area[1:]:
uncertain_indices.extend(range(nr_sliding_windows - \
abs(t[0])/self.sliding_step - 1,
nr_sliding_windows-abs(t[1])/ \
self.sliding_step))
else:
uncertain_indices = []
else: # if not specified, assume unbound
bound_indices = range(nr_sliding_windows)
uncertain_indices = []
label_change_point = self.from_right_count_negatives(
self.data_time[k], self.determine_labels, bound_indices)
self.label_change_points.append(label_change_point)
for index, (classification_vector, label) \
in enumerate(self.data_time[k]):
if index not in uncertain_indices:
if index < label_change_point: # assume neg class
self.classification_outcome.append(
(classification_vector, self.classes_names[0]))
else: # assume that last elem in trial has correct label
self.classification_outcome.append(
(classification_vector,
self.data_time[k][-1][1]))
else:
# calculate uncertain indices
if self.uncertain_area!=None:
nr_sliding_windows = len(self.data_time[0])
uncertain_indices = []
for t in self.uncertain_area:
uncertain_indices.extend(range(nr_sliding_windows - \
abs(t[0])/self.sliding_step - 1,
nr_sliding_windows-abs(t[1]) / self.sliding_step))
else:
uncertain_indices = []
for index, (classification_vector, label) \
in enumerate(self.data_time[k]):
if index not in uncertain_indices:
if self.epoch_signal:
if index < uncertain_indices[0]: # negative class
new_label = self.classes_names[0]
else: # assume last elem in trial has correct label
new_label = self.data_time[k][-1][1]
self.classification_outcome.append(
(classification_vector, new_label))
if self.epoch_eval:
result = self.get_result_metrics()
if performance == None:
performance = result
else: # combine with old performance
new_performance = result
performance = self.combine_perf_dict(performance,
new_performance, k+1)
self.classification_outcome = []
if not self.epoch_eval:
result = self.get_result_metrics()
try:
performance, self.R = result
except:
performance = result
# add the training time
if self.measure_times:
performance["Training_time"] = self.training_time
try:
classifier_information = self.classification_outcome[0][0].\
predictor.classifier_information
except:
classifier_information = dict()
# add the training metrics
if self.calc_train:
skip_keys = classifier_information.keys()
for key, value in train_metrics.items():
if not key in skip_keys:
performance["train_"+key] = value
if self.determine_labels:
performance["~~Avg_Label_Change_Index~~"] = \
numpy.mean(self.label_change_points)
self.classification_dataset.add_split(performance, train=False,
split=self.current_split,
run=self.run_number)
if self.save_trace:
self.trace, self.long_trace=self.calculate_classification_trace(
classification_results=self.classification_outcome,
calc_soft_metrics=self.calc_soft_metrics,
ir_class=self.ir_class, sec_class=self.sec_class,
loss_restriction=self.loss_restriction,
calc_loss=self.calc_loss,
decision_boundary=self.decision_boundary,
save_trace=self.save_trace)
self._log("Metrics added to dataset",level=logging.INFO)
def get_result_metrics(self):
""" Calculate metrics based on evaluation type """
if self.evaluation_type == 'binary':
result = BinaryClassificationDataset.calculate_metrics(
classification_results=self.classification_outcome,
calc_soft_metrics=self.calc_soft_metrics,
invert_classification=self.invert_classification,
ir_class=self.ir_class, sec_class=self.sec_class,
loss_restriction=self.loss_restriction,
time_periods=self.time_periods, weight=self.weight,
calc_AUC=self.calc_AUC, calc_loss=self.calc_loss,
save_roc_points=self.save_roc_points,
decision_boundary=self.decision_boundary)
elif self.evaluation_type == 'multinomial':
result = MultinomialClassificationDataset.calculate_metrics(
classification_results=self.classification_outcome,
weight=self.weight)
elif self.evaluation_type=="regression":
result = RegressionDataset.calculate_metrics(
regression_results=self.classification_outcome,
weight=self.weight)
return result
def from_right_count_negatives(self, y, target_number, bounds):
"""Go through the bounded y (reverse) and find the index i, where
target_number values have been consecutively the negative class.
Return i+target_number as critical index point (labels change)"""
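# Illustrative example with hypothetical data: for predicted labels
# [neg, neg, neg, pos, neg, pos, pos] at indices 0..6, target_number=2
# and bounds=range(7), the backward scan meets two consecutive negative
# predictions at indices 2 and 1 and returns 1 + 2 = 3 as the
# label-change-point.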
countNegatives = 0
countTotal = 0
for index in range(len(y)-1,-1,-1):
if index not in bounds:
continue
countTotal+=1
if y[index][0].label==self.classes_names[0]:
countNegatives+=1
if countNegatives==target_number:
if countTotal==target_number:
return bounds[-1]
else:
return index+target_number
else:
countNegatives=0
return bounds[0]+countNegatives
def combine_perf_dict(self, old_dict, new_dict, weight):
""" Combine the values of the dicts by a weighting (iterative) average
.. math:: \\frac{weight-1}{weight} \\cdot \\text{old\_dict} + \\frac{1}{weight} \\cdot \\text{new\_dict}
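For illustration: combining ``old_dict = {'BA': 0.8}`` with
``new_dict = {'BA': 0.6}`` at ``weight = 2`` yields
0.5 * 0.8 + 0.5 * 0.6 = 0.7.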
"""
return_dict = dict()
for key in old_dict.keys():
try:
return_dict[key] = (weight-1.0)/weight * old_dict[key] + \
1.0/weight * new_dict[key]
except TypeError: # for Strings (like parameters)
# they should not be different among epochs
if old_dict[key] == new_dict[key]:
return_dict[key] = old_dict[key]
return return_dict
def store_state(self, result_dir, index=None):
""" Stores additional information in the given directory *result_dir* """
if self.store:
node_dir = os.path.join(result_dir, self.__class__.__name__)
create_directory(node_dir)
super(SlidingWindowSinkNode,self).store_state(result_dir)
if self.save_score_plot or self.save_trial_plot or self.save_time_plot:
import matplotlib.pyplot as plt
data = [[pv.prediction for pv, _ in self.data_time[k]] for \
k in self.data_time.keys()]
if self.save_time_plot:
label_data = [numpy.array([(pv.label,
float(pv.tag.split(';')[1].split(':')[1][:-2])) \
for pv, _ in self.data_time[k]]) \
for k in self.data_time.keys()]
fig = plt.figure()
ax1 = plt.subplot(111)
ax1.yaxis.grid(True, linestyle='-', which='major', color='grey',
alpha=0.5)
ax1.xaxis.grid(True, linestyle='-', which='major', color='grey',
alpha=0.5)
for trial in label_data:
ind = numpy.arange(-1.0*(len(trial)-1)*self.sliding_step,
50, 50)
x = [ind[i] for i, (label, start_time) in enumerate(trial) \
if label == self.ir_class]
y = [start_time] * len(x)
if x == []:
plt.plot(ind[0],start_time,'ro')
else:
plt.plot(x, y,'bo')
plt.xlabel("Time (ms)")
plt.ylabel("Start time of trial (s)")
name = 'trial-time_plot_sp%s.pdf' % self.current_split
plt.savefig(os.path.join(node_dir,name),dpi=None,facecolor='w',
edgecolor='w',orientation='portrait',papertype=None,
format=None,transparent=False)
plt.close(fig)
if self.save_score_plot:
max_time = max([len(trial) for trial in data])
data_across_time = [[] for time_step in range(max_time)]
for trial in data:
for i, elem in enumerate(trial[::-1]):
data_across_time[i].append(elem)
means = [numpy.mean(time_step) for time_step in data_across_time]
stds = [numpy.std(time_step) for time_step in data_across_time]
ind = numpy.arange(-1.0*(max_time-1)*self.sliding_step, 50, 50)
fig = plt.figure()
ax1 = plt.subplot(111)
ax1.yaxis.grid(True, linestyle='-', which='major', color='grey',
alpha=0.5)
ax1.xaxis.grid(True, linestyle='-', which='major', color='grey',
alpha=0.5)
width = 30
plt.bar(ind, means[::-1], width, color='r', yerr=stds[::-1],
ecolor='black')
ax1.set_xlim(ind[0]-width,ind[-1]+width)
ax1.set_ylim(min(means)-max(stds),max(means)+max(stds))
t = result_dir.split('/')[-2].strip('{}').split("}{")
title = "SVM score average: "+str(t[0])+" "+str(t[1:])
plt.title(title, size=10)
plt.ylabel('Average SVM prediction value')
plt.xlabel('Time point of classification [ms]')
params = str(t[0])
for i in range(2,len(t)):
params += str(t[i])
name = 'average_score_plot_sp%s_%s.pdf' % (self.current_split,
params)
plt.savefig(os.path.join(node_dir, name), dpi=None, format=None,
facecolor='w', edgecolor='w', papertype=None,
orientation='portrait', transparent=False)
plt.close(fig)
if self.save_trial_plot:
fig = plt.figure()
num_trials = len(self.data_time.keys())
num_rows = num_cols = int(round(numpy.sqrt(num_trials)+0.5))
fig.set_size_inches((10*num_cols, 6*num_rows))
ymin = numpy.inf
ymax = -1.0*numpy.inf
for trial in data:
ymin = min([ymin,min(trial)])
ymax = max([ymax,max(trial)])
for i in range(num_trials):
ind = numpy.arange(-1.0*(len(data[i])-1)*self.sliding_step,
50, 50)
ax = plt.subplot(num_rows, num_cols, i+1)
ax.yaxis.grid(True, linestyle='-', which='major',
color='grey', alpha=0.5)
ax.xaxis.grid(True, linestyle='-', which='major',
color='grey', alpha=0.5)
plt.plot(ind, data[i], 'o')
ax.set_xlim(ind[0],ind[-1])
ax.set_ylim(ymin,ymax)
start = self.data_time[i][0][0].tag.split(';')[1].split(':')[1]
end = self.data_time[i][-1][0].tag.split(';')[1].split(':')[1]
title = "Classification of "+self.data_time[i][-1][0].specs['wdef_name']+" from "+start+" to "+end
plt.title(title, size=10)
plt.ylabel('SVM prediction value')
plt.xlabel('Time point of classification [ms]')
if self.label_change_points != []:
x = (self.label_change_points[i] - len(data[i]) + 1) \
* self.sliding_step
# plot a line when positive class starts
plt.plot([x,x],[ymin,ymax])
x_search_low = self.uncertain_area[0][0]
x_search_up = self.uncertain_area[0][1]
# plot dashed lines to indicate bounds (where the label
# change point could be)
plt.plot([x_search_low,x_search_low],[ymin,ymax],'r--')
plt.plot([x_search_up, x_search_up], [ymin,ymax],'r--')
#fig.text(0.4, 0.98, "Sliding window classifications; BA: " + \
# str(self.performance["Balanced_accuracy"]), size=14)
name = 'single-trials_score_plot_sp%s.pdf' % self.current_split
plt.savefig(os.path.join(node_dir,name),dpi=None,facecolor='w',
edgecolor='w',orientation='portrait',papertype=None,
format=None,transparent=False)
plt.close(fig)
# Specify special node names
_NODE_MAPPING = {"Classification_Performance_Sink": PerformanceSinkNode,
"ClassificationSinkNode": PerformanceSinkNode,
"Sliding_Window_Performance_Sink": SlidingWindowSinkNode,
"LOO_Sink":LeaveOneOutSinkNode,
"LOOSink":LeaveOneOutSinkNode,
}