""" Ensemble classifiers
http://en.wikipedia.org/wiki/Ensemble_learning
The current implementations are gating functions that combine the
predictions of an ensemble of classifiers.

Each gating function expects as input a special kind of
:class:`~pySPACE.resources.data_types.prediction_vector.PredictionVector`:
each component in the vector should correspond to the classification of one
node chain of the ensemble (i.e., the dimensionality should equal the
cardinality of the ensemble, each value should be the prediction score of
the respective node chain, and the label attribute should hold the
corresponding list of predicted labels).
This can be created using the
:class:`~pySPACE.missions.nodes.meta.same_input_layer.ClassificationFlowsLoader`
or the
:class:`~pySPACE.missions.nodes.meta.same_input_layer.SameInputLayerNode`.
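
For illustration, a minimal sketch (toy values assumed for an ensemble of
three classifiers, not actual pySPACE output) of the kind of input the
gating nodes expect:

.. code-block:: python

    from pySPACE.resources.data_types.prediction_vector import \
        PredictionVector

    # one prediction score and one predicted label per ensemble member
    ensemble_output = PredictionVector(
        prediction=[0.7, -0.4, 1.2],
        label=["Target", "Standard", "Target"])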
"""
import heapq
import logging
from collections import defaultdict

import numpy

from pySPACE.missions.nodes.base_node import BaseNode
# the output of gating functions is a PredictionVector
from pySPACE.resources.data_types.prediction_vector import PredictionVector


class ProbVotingGatingNode(BaseNode):
""" Add up prediction values for labels to find out most probable label
**Parameters**
    :enforce_absolute_values:
        Switch to map the prediction values to their absolute values.

        (*optional, default: False*)

**Exemplary Call**

    .. code-block:: yaml

        -
            node : ProbVotingGating

:Author: Mario M. Krell (mario.krell@dfki.de)
:Created: 2012/10/01
"""
    def __init__(self, enforce_absolute_values=False, **kwargs):
        super(ProbVotingGatingNode, self).__init__(**kwargs)
        self.set_permanent_attributes(
            enforce_absolute_values=enforce_absolute_values)

    def _execute(self, data):
""" Label with highest sum of prediction values wins """
pred = defaultdict(float)
        for i, label in enumerate(data.label):
if self.enforce_absolute_values:
pred[label] += abs(data.prediction[i])
else:
pred[label] += data.prediction[i]
res = sorted(pred.items(), key=lambda t: t[1])
best = res[-1]
return PredictionVector(prediction=best[1],
label=best[0], predictor=self)


class LabelVotingGatingNode(ProbVotingGatingNode):
""" Gating function to classify based on the majority vote
This gating function counts how often each class occurs in the feature
vectors. It assigns the instance to the class that got the most votes.
It does not require training.
If there is no clear vote, the base class is used.
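
    For example, labels ``["a", "b", "a"]`` yield the majority vote ``a``,
    whereas for ``["a", "b"]`` there is no unique majority and the label
    with the larger summed prediction value wins.
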
**Parameters**
see: base node documentation
**Exemplary Call**

    .. code-block:: yaml

        -
            node : LabelVotingGating

:Author: Mario M. Krell (mario.krell@dfki.de)
:Created: 2012/10/01
"""

    def _execute(self, data):
""" Executes the classifier on the given data vector *data* """
        prediction_value = numpy.mean(data.prediction)
votes_counter = defaultdict(int)
for label in data.label:
votes_counter[label] += 1
        voting = sorted((votes, label)
                        for label, votes in votes_counter.items())
        max_label = [label for votes, label in voting
                     if votes == voting[-1][0]]
if len(max_label) == 1:
majority_vote = voting[-1][1]
return PredictionVector(prediction=prediction_value,
label=majority_vote, predictor=self)
else:
            relevant_indices = [index for index, label in enumerate(data.label)
                                if label in max_label]
            new_data = PredictionVector(
                prediction=[data.prediction[i] for i in relevant_indices],
                label=[data.label[i] for i in relevant_indices],
                predictor=[data.predictor[i] for i in relevant_indices])
return super(LabelVotingGatingNode, self)._execute(new_data)


class PrecisionWeightedGatingNode(BaseNode):
""" Gating function to classify based on weighted majority vote
This gating function computes weights for the ensemble's classification results based on
training data. These weights are set based on the relative
precision (compared to the other classification results) on the predicted class.
If more than *required_vote_ratio* of the sum of weighted votes are for class
1, than this node classifies as class 1 from *class_labels*, else as
class 2 from *class_labels*.
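
    Concretely, as implemented in ``_stop_training``, the weight of
    classifier :math:`i` for class :math:`c` is its precision on that
    class, normalized by the summed precision of all ensemble members:

    .. math:: w_i(c) = \frac{p_i(c)}{\sum_j p_j(c)}
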
**Parameters**
    :class_labels:
        Determines the order of the two classes.
        This is important if you want the prediction value to be
        negative for the first class and positive for the second one.
        Here it is used to define the relevant class for the voting.

    :required_vote_ratio:
        Threshold that the ratio of weighted votes for the first class
        has to reach for the node to classify as the first class.
        The acceptable range is from zero to one: zero means the node
        always classifies as class one, while one means it classifies as
        class one only if all weighted votes are for class one.

        (*optional, default: 0.5*)

**Exemplary Call**

    .. code-block:: yaml

        -
            node : Precision_Weighted_Gating_Function
            parameters :
                class_labels : ["Target","Standard"]
                required_vote_ratio : 0.25

:Author: Jan Hendrik Metzen (jhm@informatik.uni-bremen.de)
:Created: 2010/05/21
"""
    def __init__(self, class_labels, required_vote_ratio=0.5, **kwargs):
        super(PrecisionWeightedGatingNode, self).__init__(**kwargs)
        assert len(class_labels) == 2, \
            "%s can only be used for binary classification tasks!" \
            % self.__class__.__name__
        self.set_permanent_attributes(
            class_labels=class_labels,
            required_vote_ratio=required_vote_ratio,
            classification_counter=None,
            correct_classification_counter=None,
            weights=None)
    def is_trainable(self):
        """ Returns whether this node is trainable. """
        return True

    def is_supervised(self):
        """ Returns whether this node requires supervised training """
        return True
    def _train(self, data, class_label):
        if self.classification_counter is None:
            self.classification_counter = \
                [defaultdict(int) for i in range(len(data.label))]
            self.correct_classification_counter = \
                [defaultdict(int) for i in range(len(data.label))]
        for index1 in range(len(data.label)):
            # count how often each classifier (index1) predicts
            # each of the two classes
            self.classification_counter[index1][data.label[index1].strip()] += 1
            if data.label[index1].strip() == class_label:
                # count how often each classifier (index1) predicts
                # each of the two classes correctly
                self.correct_classification_counter[index1][
                    data.label[index1].strip()] += 1
    def _stop_training(self, debug=False):
        # Initialization
        precisions = [dict() for i in range(len(self.correct_classification_counter))]
        acc_precision = defaultdict(float)
        self.weights = [dict() for i in range(len(self.correct_classification_counter))]
        for class_label in self.class_labels:
            for i in range(len(self.correct_classification_counter)):
                # compute the precision of classifier "i" on class "class_label"
                if self.classification_counter[i][class_label] > 0:
                    precision = \
                        float(self.correct_classification_counter[i][class_label]) \
                        / self.classification_counter[i][class_label]
                else:
                    precision = 0
                precisions[i][class_label] = precision
                # compute the accumulated precision
                acc_precision[class_label] += precision
            # Set the weights to their relative contribution to the
            # accumulated precision.
            # Note: This is not a very well-founded way of computing weights.
            for i in range(len(self.correct_classification_counter)):
                if acc_precision[class_label] != 0:
                    self.weights[i][class_label] = \
                        precisions[i][class_label] / acc_precision[class_label]
                else:
                    self._log("ZeroDivision problem occurred. Check the "
                              "classifiers for class %s." % class_label,
                              level=logging.CRITICAL)
                    self.weights[i][class_label] = 1
        super(PrecisionWeightedGatingNode, self)._stop_training()
    def _execute(self, data):
        """ Executes the classifier on the given data vector *data* """
        # count the weighted votes for the two classes
        votes_counter = defaultdict(float)
        for index, label in enumerate(data.label):
            votes_counter[label.strip()] += self.weights[index][label.strip()]
        # compute the ratio of weighted votes for the first class
        vote_ratio = \
            float(votes_counter[self.class_labels[0]]) / sum(votes_counter.values())
        # if this ratio reaches the threshold "self.required_vote_ratio",
        # classify the instance as class 1, else as class 2
        vote = self.class_labels[0] if vote_ratio >= self.required_vote_ratio \
            else self.class_labels[1]
        return PredictionVector(prediction=vote_ratio, label=vote,
                                predictor=self)


class ChampionGatingNode(BaseNode):
""" Gating function to classify with the classifier that performs best on training data
This gating function evaluates the ensemble classifiers on the training data.
It picks the classifier that maximizes the F-Measure on the *relevant_class*
and uses this one to classify instances from the test data.
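
    The F-measure used here is the standard harmonic mean of precision
    and recall on the relevant class:

    .. math::

        F = \frac{2 \cdot \mathrm{precision} \cdot \mathrm{recall}}{\mathrm{precision} + \mathrm{recall}}
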
**Parameters**
:relevant_class:
Determines the class being relevant for the F-measure calculation.
(*optional, default: first occurring class in training phase*)
**Exemplary Call**

    .. code-block:: yaml

        -
            node : Champion_Gating_Function
            parameters :
                relevant_class : "Target"

:Author: Jan Hendrik Metzen (jhm@informatik.uni-bremen.de)
:Created: 2010/05/21
"""
    def __init__(self, relevant_class=None, **kwargs):
        super(ChampionGatingNode, self).__init__(**kwargs)
        self.set_permanent_attributes(relevant_class=relevant_class,
                                      confusion_matrix=None,
                                      chosen_index=None)

    def is_trainable(self):
        """ Returns whether this node is trainable. """
        return True

    def is_supervised(self):
        """ Returns whether this node requires supervised training """
        return True
    def _train(self, data, label):
        if self.relevant_class is None:
            self.relevant_class = label
        if self.confusion_matrix is None:
            self.confusion_matrix = \
                [defaultdict(float) for i in range(len(data.label))]
        for index in range(len(data.label)):
            if data.label[index].strip() == self.relevant_class:
                if label == self.relevant_class:
                    self.confusion_matrix[index]["tp"] += 1
                else:
                    self.confusion_matrix[index]["fp"] += 1
            else:
                if label == data.label[index].strip():
                    self.confusion_matrix[index]["tn"] += 1
                else:
                    self.confusion_matrix[index]["fn"] += 1
    def _stop_training(self):
        # compute the F-measures on the chosen class
        f_measures = []
        for cm in self.confusion_matrix:
            if cm["tp"] + cm["fp"] > 0:
                precision = cm["tp"] / (cm["tp"] + cm["fp"])
            else:
                precision = 0.0
            if cm["tp"] + cm["fn"] > 0:
                recall = cm["tp"] / (cm["tp"] + cm["fn"])
            else:
                recall = 0.0
            if precision + recall > 0:
                f_measures.append(2 * precision * recall / (precision + recall))
            else:
                f_measures.append(0.0)
        # choose the classifier that maximizes the F-measure
        self.chosen_index = f_measures.index(max(f_measures))
    def _execute(self, data):
        """ Executes the classifier on the given data vector *data* """
        return PredictionVector(prediction=data.prediction[self.chosen_index],
                                label=data.label[self.chosen_index],
                                predictor=data.predictor[self.chosen_index])


class RidgeRegressionGatingNode(BaseNode):
""" Gating function using ridge regression to learn weighting
    This method performs ridge regression, solving the linear least
    squares problem with Tikhonov regularization:

    .. math:: w = (A^T A + \tau^T \tau)^{-1} A^T b

    where :math:`A` is the feature matrix, :math:`b` is the class vector,
    and :math:`\tau` is the Tikhonov regularization matrix.
    It classifies as the second class from *class_labels* if the dot
    product of the weights and the data is larger than the
    *classification_threshold*, else as the first class.
    The regularization matrix is ``diag(regularization_coefficient**0.5)``.
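
    Internally, the ensemble labels are mapped to -1 (first class from
    *class_labels*) and +1 (second class) before the regression is solved,
    and the same mapping is applied at execution time.
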
.. todo:: Implement the usage of prediction values
**Parameters**
    :class_labels:
        Determines the order of the two classes.
        This is important if you want the prediction value to be
        negative for the first class and positive for the second one.
        Here it defines the class chosen when the regression value
        exceeds the threshold.

        (*optional, default: ["Standard", "Target"]*)

    :use_labels:
        Determines whether the labels are mapped to -1 and 1
        or whether the prediction values are used.
        The latter is NOT yet implemented!

        (*optional, default: True*)

    :regularization_coefficient:
        Parameter of the Tikhonov regularization.
        By default, regularization is not active.

        (*optional, default: 0.0*)

    :classification_threshold:
        Threshold that the regression value has to exceed for the sample
        to be classified as the second class.

        (*optional, default: 0.0*)
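
    A minimal numpy sketch of the regularized solve used in
    ``_stop_training`` (toy values, not part of the pySPACE API):

    .. code-block:: python

        import numpy

        A = numpy.array([[-1, -1], [1, 1], [-1, 1]])  # mapped ensemble labels
        b = numpy.array([-1, 1, -1])                  # mapped true labels
        tau = numpy.diag([0.1, 0.1])                  # regularization matrix
        weights = numpy.dot(numpy.linalg.inv(numpy.dot(A.T, A) + tau),
                            numpy.dot(A.T, b))
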
**Exemplary Call**

    .. code-block:: yaml

        -
            node : Ridge_Regression_Gating_Function
            parameters :
                class_labels : ["Target","Standard"]
                regularization_coefficient : 0.0
                classification_threshold : 0.2

:Author: Jan Hendrik Metzen (jhm@informatik.uni-bremen.de)
:Created: 2010/05/21
"""
    def __init__(self, class_labels=["Standard", "Target"], use_labels=True,
                 regularization_coefficient=0.0,
                 classification_threshold=0.0, **kwargs):
        super(RidgeRegressionGatingNode, self).__init__(**kwargs)
        self.set_permanent_attributes(
            class_labels=class_labels,
            use_labels=use_labels,
            regularization_coefficient=regularization_coefficient,
            classification_threshold=classification_threshold,
            A=None,
            b=None,
            weights=None)

    def is_trainable(self):
        """ Returns whether this node is trainable. """
        return True

    def is_supervised(self):
        """ Returns whether this node requires supervised training """
        return True
    def _train(self, data, class_label):
        # collect the data and the corresponding class labels in two lists
        if self.A is None:
            self.A = []
            self.b = []
        if self.use_labels:
            assert hasattr(data.label, "__len__")
            # TODO: Check mapping
            # map the first class label to -1 and the second one to +1
            self.A.append([self.class_labels.index(label.strip()) * 2 - 1
                           for label in data.label])
        else:
            self.A.append(data.prediction)
        self.b.append(self.class_labels.index(class_label) * 2 - 1)
    def _stop_training(self, debug=False):
        # This method performs ridge regression, solving the linear least
        # squares problem with Tikhonov regularization:
        #     w = (A^T A + tau^T tau)^{-1} A^T b
        # where tau is the Tikhonov regularization matrix
        assert len(self.class_labels) == 2, \
            "%s can only be used for binary classification tasks!" \
            % self.__class__.__name__
        A = numpy.array(self.A)
        b = numpy.array(self.b)
        tau = numpy.diag([self.regularization_coefficient
                          for i in range(A.shape[1])])
        try:
            self.weights = numpy.dot(numpy.linalg.inv(numpy.dot(A.T, A) + tau),
                                     numpy.dot(A.T, b))
        except numpy.linalg.LinAlgError:
            raise numpy.linalg.LinAlgError("Singular matrix. Choose a larger "
                                           "regularization coefficient!")
        super(RidgeRegressionGatingNode, self)._stop_training()
    def _execute(self, data):
        """ Executes the classifier on the given data vector *data*

        Classifies as the second class if the dot product of the weights
        and the data is larger than the classification threshold, else as
        the first class.

        .. todo:: Check mapping
        """
        mapped_labels = [self.class_labels.index(label.strip()) * 2 - 1
                         for label in data.label]
        value = numpy.dot(self.weights, mapped_labels)
        vote = self.class_labels[1] if value > self.classification_threshold \
            else self.class_labels[0]
        return PredictionVector(prediction=value, label=vote, predictor=self)


class KNNGatingNode(BaseNode):
""" Gating function based on k-Nearest-Neighbors
**Parameters**
:n: Number of considered neighbors
(*optional, default: 1*)
**Exemplary Call**

    .. code-block:: yaml

        -
            node : KNN_Gating_Function
            parameters :
                n : 1

:Author: Jan Hendrik Metzen (jhm@informatik.uni-bremen.de)
:Created: 2010/05/21
"""
    def __init__(self, n=1, **kwargs):
        super(KNNGatingNode, self).__init__(**kwargs)
        self.set_permanent_attributes(n=n,
                                      training_examples=[])

    def is_trainable(self):
        """ Returns whether this node is trainable. """
        return True

    def is_supervised(self):
        """ Returns whether this node requires supervised training """
        return True

    def _train(self, data, label):
        self.training_examples.append((data, label))
    def _execute(self, data):
        """ Executes the classifier on the given data vector *data* """
        # Hamming distance between two label vectors
        distance_fct = lambda x, y: sum(numpy.array(x) != numpy.array(y))
        label_distance = ((label, distance_fct(training_data.label, data.label))
                          for training_data, label in self.training_examples)
        n_smallest_labels = [entry[0] for entry in
                             heapq.nsmallest(self.n, label_distance,
                                             key=lambda x: x[1])]
        votes_counter = defaultdict(int)
        for label in n_smallest_labels:
            votes_counter[label] += 1
        voting = sorted((votes, label)
                        for label, votes in votes_counter.items())
        majority_vote = voting[-1][1]
        return PredictionVector(label=majority_vote, predictor=self)


_NODE_MAPPING = {"Voting_Gating_Function": LabelVotingGatingNode,
                 "Precision_Weighted_Gating_Function": PrecisionWeightedGatingNode,
                 "Champion_Gating_Function": ChampionGatingNode,
                 "Ridge_Regression_Gating_Function": RidgeRegressionGatingNode,
                 "KNN_Gating_Function": KNNGatingNode}