""" SVM variants using the SOR or dual gradient descent algorithm
All these variants have their offset in the target function.
SOR is used as an abbreviation for Successive Overrelaxation.
"""
import numpy
from numpy import dot
import matplotlib.pyplot as plt
import scipy.spatial.distance
import logging
import warnings
#import matplotlib as mpl
#mpl.rcParams['text.usetex'] = True
#mpl.rcParams['text.latex.unicode'] = True
# the output is a prediction vector
import sys
from pySPACE.missions.nodes.decorators import BooleanParameter, NoOptimizationParameter,\
ChoiceParameter, QLogUniformParameter
from pySPACE.resources.data_types.prediction_vector import PredictionVector
from pySPACE.missions.nodes.classification.base import RegularizedClassifierBase
# needed for speed up
# order of examined samples is shuffled
import random
import copy
# needed for loo-metrics
from pySPACE.resources.dataset_defs.metric import BinaryClassificationDataset
@ChoiceParameter("version", ["samples", "matrix"])
@BooleanParameter("squared_loss")
class SorSvmNode(RegularizedClassifierBase):
""" Classify with 2-norm SVM relaxation using the SOR algorithm
This node extends the algorithm with some variants.
SOR means successive overrelaxation.
The offset b becomes part of the target function, which simplifies
the optimization algorithm and allows for some dual gradient descent.
For further details, have a look at the given references
and at the *reduced_descent* method, which implements the elemental processing step.
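Roughly speaking (a sketch following *M&M* and the update formula in the
*reduced_descent* method), the algorithm solves the modified dual problem

.. math::

    \\min_{\\alpha} \\frac{1}{2}\\alpha^T M \\alpha - \\sum_i \\alpha_i
    \\quad \\text{s.t. } 0 \\le \\alpha_i \\le c_i,
    \\text{ with } M_{ij} = y_i y_j (<x_i,x_j> + 1),

where the additional +1 in the scalar products realizes the offset in the
target function (shown here for *offset_factor* = 1).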
**References**
========= ==========================================================================================
main source: M&M (matrix version)
========= ==========================================================================================
author Mangasarian, O. L. and Musicant, David R.
title Successive Overrelaxation for Support Vector Machines
journal IEEE Transactions on Neural Networks
year 1999
volume 10
pages 1032--1037
========= ==========================================================================================
========= ==========================================================================================
minor source: Numerical Recipes (randomization)
========= ==========================================================================================
author Press, William H. and Teukolsky, Saul A. and Vetterling, William T. and Flannery, Brian P.
title Numerical Recipes 3rd Edition: The Art of Scientific Computing
year 2007
isbn 0521880688, 9780521880688
edition 3
publisher Cambridge University Press
address New York, NY, USA
========= ==========================================================================================
========= ==========================================================================================
minor source: sample version
========= ==========================================================================================
author Hsieh, Cho-Jui and Chang, Kai-Wei and Lin, Chih-Jen and Keerthi, S. Sathiya and Sundararajan, S.
title `A dual coordinate descent method for large-scale linear SVM <http://doi.acm.org/10.1145/1390156.1390208>`_
booktitle Proceedings of the 25th international conference on Machine learning
series ICML '08
year 2008
isbn 978-1-60558-205-4
location Helsinki, Finland
pages 408--415
numpages 8
doi 10.1145/1390156.1390208
acmid 1390208
publisher ACM
address New York, NY, USA
========= ==========================================================================================
**Parameters**
Most parameters are already included into the
:class:`RegularizedClassifierBase <pySPACE.missions.nodes.classification.base.RegularizedClassifierBase>`.
:random:
*Numerical recipes* suggests randomizing the order of the alphas.
*M&M* suggest sorting the alphas by their magnitude.
(*optional, default: False*)
:omega:
Descent factor of optimization algorithm. Should be between 0 and 2!
*Numerical recipes* uses 1.3 and *M&M* choose 1.0.
(*optional, default: 1.0*)
:version:
Either use the *matrix* of scalar products, or use only the
*samples* and track the changes in w and b for fast calculations.
Both versions give exactly the same result, but they are kept
for comparison.
*samples* is usually a bit faster.
For kernel usage, only *matrix* is possible.
(*optional, default: "samples"*)
:reduce_non_zeros:
In the inner loops, indices are rejected if they lose their support.
(*optional, default: True*)
:calc_looCV:
Calculate the leave-one-out metrics on the training data
(*optional, default: False*)
:offset_factor:
Reciprocal weight for the offset treatment in the model.
:0: Use no offset
:1: Normal affine approach from augmented feature vectors
:high: Only small punishment of offset, enabling larger offsets
(*danger of numerical instability*)
If 0 is used, the offset b is fixed to zero; otherwise the offset is
realized via augmented feature vectors with different augmentation
factors. The augmentation value corresponds to 1/*offset_factor*,
where 1/0 corresponds to infinity.
A short sketch of how the factor enters the computation is given after
this parameter list.
(*optional, default: 1*)
:squared_loss:
Use L2 loss (optional) instead of L1 loss (default).
(*optional, default: False*)
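The following minimal sketch (illustrative only; the arrays ``x``, ``y`` and
``dual_solution`` are plain stand-ins for the samples, the class factors and
the dual variables of this node) shows how *offset_factor* enters the
*matrix* version of the computation:

.. code-block:: python

    import numpy
    x = numpy.array([[1.0, 0.0], [0.0, 2.0]])  # two toy samples
    y = numpy.array([1.0, -1.0])               # class factors (+1/-1)
    offset_factor = 1.0
    # scalar products are shifted by offset_factor (cf. _complete_training)
    M = numpy.diag(y).dot(numpy.dot(x, x.T) + offset_factor).dot(numpy.diag(y))
    # the offset b is reconstructed from the dual solution (cf. iteration_loop)
    dual_solution = numpy.array([0.5, 0.5])    # arbitrary dual values
    b = offset_factor * numpy.dot(dual_solution, y)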
In the implementation, we do not use the name alpha but *dual_solution* for
the variables of the dual optimization problem,
which is optimized with this algorithm.
As stopping criterion, we require the maximum change of the dual variables
to be less than the given tolerance.
**Exemplary Call**
.. code-block:: yaml
-
node : SOR
parameters :
complexity : 1.0
weight : [1,3]
debug : True
store : True
class_labels : ['Standard', 'Target']
:input: FeatureVector
:output: PredictionVector
:Author: Mario Michael Krell (mario.krell@dfki.de)
:Created: 2012/06/27
"""
def __init__(self, random=False, omega=1.0,
max_iterations=numpy.inf,
version="samples", reduce_non_zeros=True,
calc_looCV=False, squared_loss=False, offset_factor=1,
**kwargs):
self.old_difference=numpy.inf
# instead of lists, arrays are concatenated in training
if "use_list" in kwargs:
self._log("Got use_list argument. Overwriting with False")
kwargs["use_list"] = False
super(SorSvmNode, self).__init__(**kwargs)
if version not in ["samples", "matrix"]:
self._log("Version %s is not available. Default to 'samples'!"%version, level=logging.WARNING)
version = "samples"
if not self.kernel_type == 'LINEAR' and not version == "matrix":
self._log("Version %s is not available for nonlinear" % version +
"kernel. Default to 'matrix'!", level=logging.WARNING)
version = "matrix"
if self.tolerance > 0.1 * self.complexity:
self.set_permanent_attributes(tolerance=0.1*self.complexity)
warnings.warn("Using too high a tolerance." +
" Reduced to 0.1 times complexity (tolerance=%f)."
% self.tolerance)
if float(offset_factor) >= 0:
offset_factor = float(offset_factor)
else:
warnings.warn(
"'offset_factor' parameter must be nonnegative float. " +
"But it is '%s'. Now set to 1." % str(offset_factor))
offset_factor = 1
if not squared_loss:
squ_factor = 0.0
else:
squ_factor = 1.0
# Weights for soft margin (dependent on class or time)
ci = []
# Mapping from class to value of classifier (-1,1)
bi = []
self.set_permanent_attributes(random=random,
omega=omega,
max_iterations_factor=max_iterations,
max_sub_iterations=numpy.inf,
iterations=0,
sub_iterations=0,
version=version,
M=None,
reduce_non_zeros=reduce_non_zeros,
calc_looCV=calc_looCV,
offset_factor=offset_factor,
squ_factor=squ_factor,
ci=ci,
bi=bi,
num_samples=0,
dual_solution=None,
max_iterations=42,
b=0
)
def _execute(self, x):
""" Executes the classifier on the given data vector in the linear case
prediction value = <w,data>+b
"""
if self.zero_training and self.num_samples == 0:
self.w = numpy.zeros(x.shape[1], dtype=numpy.float)
self.b = 0.0
self.dual_solution = numpy.zeros(self.num_samples)
return PredictionVector(label=self.classes[0], prediction=0,
predictor=self)
if self.kernel_type == 'LINEAR':
return super(SorSvmNode, self)._execute(x)
# else:
data = x.view(numpy.ndarray)
data = data[0,:]
prediction = self.b
for i in range(self.num_samples):
dual = self.dual_solution[i]
if not dual == 0:
prediction += dual * self.bi[i] * \
self.kernel_func(data, self.samples[i])
# Look up class label
# prediction --> {-1,1} --> {0,1} --> Labels
if prediction >0:
label = self.classes[1]
else:
label = self.classes[0]
return PredictionVector(label=label, prediction=prediction,
predictor=self)
def _stop_training(self, debug=False):
""" Forward process to complete training cycle """
if not self.is_trained:
self._complete_training(debug)
self.relabel_training_set()
def _complete_training(self, debug=False):
""" Train the SVM with the SOR algorithm on the collected training data
"""
self._log("Preprocessing of SOR SVM")
self._log("Instances of Class %s: %s, %s: %s"
% (self.classes[0],
self.labels.count(self.classes.index(self.classes[0])),
self.classes[1],
self.labels.count(self.classes.index(self.classes[1]))))
# initializations of relevant values and objects #
self.calculate_weigts_and_class_factors()
self.num_samples = len(self.samples)
self.max_iterations = self.max_iterations_factor*self.num_samples
self.dual_solution = numpy.zeros(self.num_samples)
if self.version == "matrix" and self.kernel_type == "LINEAR":
self.A = numpy.array(self.samples)
self.D = numpy.diag(self.bi)
self.M = dot(self.D,
dot(dot(self.A, self.A.T) + self.offset_factor *
numpy.ones((self.num_samples, self.num_samples)),
self.D))
elif self.version == "samples" and self.kernel_type == "LINEAR":
self.M = [1 / (numpy.linalg.norm(self.samples[i])**2.0
+ self.offset_factor
+ self.squ_factor / (2 * self.ci[i]))
for i in range(self.num_samples)]
# changes of w and b are tracked in the samples version
self.w = numpy.zeros(self.dim, dtype=numpy.float)
self.b = 0.0
else: # kernel case
# iterative calculation of M
self.M = numpy.zeros((self.num_samples, self.num_samples))
for i in range(self.num_samples):
bi = self.bi[i]
si = self.samples[i]
for j in range(self.num_samples):
if i > j:
self.M[i][j] = self.M[j][i]
else:
self.M[i][j] = bi * self.bi[j] * (
self.kernel_func(si, self.samples[j]) +
self.offset_factor)
## SOR Algorithm ##
self.iteration_loop(self.M)
self.classifier_information["~~Solver_Iterations~~"] = self.iterations
## calculate leave one out metrics ##
if self.calc_looCV:
self.looCV()
def looCV(self):
""" Calculate leave one out metrics """
# remember original solution
optimal_w = copy.deepcopy(self.w)
optimal_b = copy.deepcopy(self.b)
optimal_dual_solution = copy.deepcopy(self.dual_solution)
# preparation of sorting
sort_dual = self.dual_solution
# sort indices --> zero weights do not need any changing and
# low weights are less relevant for changes
sorted_indices = map(list, [numpy.argsort(sort_dual)])[0]
sorted_indices.reverse()
prediction_vectors = []
using_initial_solution = True
for index in sorted_indices:
d_i = self.dual_solution[index]
# delete each index from the current observation
if d_i == 0 and using_initial_solution:
# no change in classifier necessary
pass
else:
# set weight to zero and track the corresponding changes
self.reduce_dual_weight(index)
# reiterate till convergence but skip current index
temp_iter = self.iterations
self.iteration_loop(self.M, reduced_indices=[index])
self.iterations += temp_iter
using_initial_solution = False
prediction_vectors.append((
self._execute(numpy.atleast_2d(self.samples[index])),
self.classes[self.labels[index]]))
self.loo_metrics = BinaryClassificationDataset.calculate_metrics(
prediction_vectors,
ir_class=self.classes[1],
sec_class=self.classes[0])
# undo changes
self.b = optimal_b
self.w = optimal_w
self.dual_solution = optimal_dual_solution
def reduce_dual_weight(self, index):
""" Change weight at index to zero """
if self.version == "samples":
old_weight = self.dual_solution[index]
self.update_classification_function(delta=-old_weight, index=index)
else:
# the matrix algorithm doesn't care for the old weights
pass
self.dual_solution[index] = 0
def calculate_weigts_and_class_factors(self):
""" Calculate weights in the loss term and map label to -1 and 1 """
self.num_samples=0
for label in self.labels:
self.num_samples += 1
self.append_weights_and_class_factors(label)
#care for zero sum
def append_weights_and_class_factors(self, label):
""" Mapping between labels and weights/class factors
The values are added to the corresponding list.
"""
if label == 0:
self.bi.append(-1)
self.ci.append(self.complexity*self.weight[0])
else:
self.bi.append(1)
self.ci.append(self.complexity*self.weight[1])
def iteration_loop(self, M, reduced_indices=[]):
""" The algorithm is calling the :func:`reduced_descent<pySPACE.missions.nodes.classifiers.ada_SVM.SORSVMNode.reduced_descent>` method in loops over alpha
In the first step it uses a complete loop over all components of alpha
and in the second inner loop only the non-zero alphas are observed till
some convergence criterion is reached.
Indices in *reduced_indices* are skipped in this observation.
"""
## Definition of tracking variables ##
self.iterations = 0
self.difference = numpy.inf
## outer iteration loop ##
while (self.difference > self.tolerance and
self.iterations <= self.max_iterations):
# inner iteration loop only on active vectors/alpha (non zero) ##
self.sub_iterations = 0
# sorting or randomizing non zero indices
# arrays are mapped to lists for later iteration
sort_dual = self.dual_solution
num_non_zeros = len(map(list,sort_dual.nonzero())[0])
max_values = len(map(list,
numpy.where(sort_dual == sort_dual.max()))[0])
# sort the entries of the current dual
# and get the corresponding indices
sorted_indices = map(list,[numpy.argsort(sort_dual)])[0]
if num_non_zeros == 0 or num_non_zeros==max_values:
# skip sub iteration if everything is zero or maximal
non_zero_indices = []
else:
non_zero_indices = sorted_indices[-num_non_zeros:-max_values]
for index in reduced_indices:
try:
non_zero_indices.remove(index)
except ValueError:
pass
if self.random:
random.shuffle(non_zero_indices)
self.max_sub_iterations = self.max_iterations_factor * \
len(non_zero_indices) * 0.5
while (self.difference > self.tolerance and
self.sub_iterations < self.max_sub_iterations
and self.iterations < self.max_iterations):
## iteration step ##
self.reduced_descent(self.dual_solution, M, non_zero_indices)
## outer loop ##
if not (self.iterations < self.max_iterations):
break
# For the first run, the previous reduced descent is skipped
# but for retraining it is important
# to have first the small loop, since normally, this is sufficient.
# Furthermore having it at the end simplifies the stop criterion
self.max_sub_iterations = numpy.inf
self.total_descent(self.dual_solution, M, reduced_indices)
## Final solution ##
# in the case without kernels, we have to calculate the result
# by hand new for each incoming sample
if self.version == "matrix":
self.b = self.offset_factor * dot(self.dual_solution, self.bi)
# self.w = self.samples[0]*self.dual_solution[0]*self.bi[0]
# for i in range(self.num_samples-1):
# self.w = self.w + self.bi[i+1] * self.samples[i+1] *
# self.dual_solution[i+1]
if self.kernel_type == "LINEAR":
self.w = numpy.array([dot(dot(self.A.T, self.D),
self.dual_solution)]).T
elif self.version == "samples" and self.kernel_type == "LINEAR":
# w and b are pre-computed in the loop
# transferring of 1-d array to 2d array
# self.w = numpy.array([self.w]).T
pass
def reduced_descent(self, current_dual, M, relevant_indices):
""" Basic iteration step over a set of indices, possibly subset of all
The main principle is to make a descent step with just one index,
while fixing the other dual_solutions.
The main formula comes from *M&M*:
.. math::
d = \\alpha_i - \\frac{\\omega}{M[i][i]}(M[i]\\alpha-1)
\\text{with } M[i][j] = y_i y_j(<x_i,x_j>+1)
\\text{and final projection: }\\alpha_i = \\max(0,\\min(d,c_i)).
Here we use c for the weights for each sample in the loss term,
which is normally complexity times corresponding class weight.
y is used for the labels, which have to be 1 or -1.
In the *samples* version only the diagonal of M is used.
The sum with the alpha is tracked by using the classification vector w
and the offset b.
.. math::
o = \\alpha_i
d = \\alpha_i - \\frac{\\omega}{M[i][i]}(y_i(<w,x_i>+b)-1)
\\text{with projection: }\\alpha_i = \\max(0,\\min(d,c_i)),
b=b+(\\alpha_i-o)y_i
w=w+(\\alpha_i-o)y_i x_i
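The following minimal sketch (illustrative only; ``x_i``, ``y_i``, ``c_i``,
``w``, ``b``, ``alpha_i``, ``omega`` and ``offset_factor`` are plain local
stand-ins for the corresponding attributes of this node) shows one such
update step of the *samples* version with L1 loss:

.. code-block:: python

    import numpy
    x_i, y_i, c_i = numpy.array([1.0, 2.0]), 1.0, 1.0  # toy sample
    w, b, alpha_i = numpy.zeros(2), 0.0, 0.0            # current state
    omega, offset_factor = 1.0, 1.0
    m_ii = 1.0 / (numpy.dot(x_i, x_i) + offset_factor)  # reciprocal of M[i][i]
    gradient = y_i * (numpy.dot(w, x_i) + b) - 1.0
    d = alpha_i - omega * m_ii * gradient               # unconstrained step
    new_alpha = max(0.0, min(d, c_i))                   # project onto [0, c_i]
    delta = (new_alpha - alpha_i) * y_i
    b += offset_factor * delta                          # track the offset
    w += delta * x_i                                    # track the weight vector
    alpha_i = new_alpha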
"""
self.irrelevant_indices = []
self.difference = 0
for i in relevant_indices:
old_dual = current_dual[i]
### Main Function ###
### elemental update step of SOR algorithm ###
if self.version == "matrix":
# this step is kernel independent
x = old_dual - self.omega / (
M[i][i] + self.squ_factor/(2 * self.ci[i])) * \
(dot(M[i], current_dual) - 1)
elif self.version == "samples":
xi = self.samples[i]
bi = self.bi[i]
x = old_dual - self.omega * (M[i]) * \
(bi * (dot(xi.T, self.w) + self.b) - 1 +
self.squ_factor * old_dual / (2 * self.ci[i]))
# map dual solution to the interval [0,C]
if x <= 0:
self.irrelevant_indices.append(i)
current_dual[i] = 0
elif not self.squ_factor:
current_dual[i] = min(x, self.ci[i])
else:
current_dual[i] = x
if self.version == "matrix":
delta = (current_dual[i] - old_dual)
# update w and b in samples case
if self.version == "samples":
delta = (current_dual[i] - old_dual) * bi
# update classification function parameter w and b
# self.update_classification_function(delta=delta, index=i)
self.b = self.b + self.offset_factor * delta
self.w = self.w + delta * xi
current_difference = numpy.abs(delta)
if current_difference > self.difference:
self.difference = current_difference
self.sub_iterations += 1
self.iterations += 1
if not (self.sub_iterations < self.max_sub_iterations
and self.iterations < self.max_iterations):
break
if self.reduce_non_zeros:
for index in self.irrelevant_indices:
try:
relevant_indices.remove(index)
except ValueError:
# special mapping for RMM case
if index < self.num_samples:
relevant_indices.remove(index+self.num_samples)
else:
relevant_indices.remove(index-self.num_samples)
if self.random:
random.shuffle(relevant_indices)
def update_classification_function(self, delta, index):
""" update classification function parameter w and b """
bi = self.bi[index]
self.b = self.b + self.offset_factor * delta * bi
self.w = self.w + delta * bi * self.samples[index]
def project(self, value, index):
""" Projection method of *soft_relax* """
if value <= 0:
self.irrelevant_indices.append(index)
return 0
else:
return min(value, self.ci[index])
def total_descent(self, current_dual, M, reduced_indices=[]):
""" Different sorting of indices and iteration over all indices
.. todo:: check, which parameters are necessary
"""
if not self.random:
sort_dual = current_dual
# sort the entries of the current dual
# and get the corresponding indices
sorted_indices = map(list, [numpy.argsort(sort_dual)])[0]
# highest first
sorted_indices.reverse()
else:
sorted_indices = range(self.num_samples)
random.shuffle(sorted_indices)
for index in reduced_indices:
sorted_indices.remove(index)
self.reduced_descent(current_dual, M, sorted_indices)
# Code for forgetting strategies
def remove_no_border_points(self, retraining_required):
""" Discard method to remove all samples from the training set that are
not in the border of their class.
The border is determined by a minimum distance from the center of
the class and a maximum distance.
:param retraining_required: flag if retraining is
required (the new point is a potential sv or a removed
one was a sv)
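A minimal sketch of the border criterion used for
``USE_ONLY_BORDER_POINTS`` (illustrative only; ``in_border`` is a
hypothetical helper, ``r`` is the distance of a sample to its own class
center, ``R`` the distance between the two class centers and ``ds`` the
sample's dual weight):

.. code-block:: python

    def in_border(r, R, ds, scale_factor_small, scale_factor_tall):
        # keep samples inside the ring around their class center
        # and all current support vectors (ds != 0)
        return scale_factor_small * R < r < scale_factor_tall * R or ds != 0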
"""
# get centers of each class
targetSamples = [s for (s, l) in zip(self.samples, self.labels)\
if l == 1] # self.classes.index("Target")]
standardSamples = [s for (s, l) in zip(self.samples, self.labels)\
if l == 0] # self.classes.index("Standard")]
if self.training_set_ratio == "KEEP_RATIO_AS_IT_IS":
# subtract one from the class for which a new sample was added
num_target = len(targetSamples) - (self.labels[-1] == 1)
num_standard = len(standardSamples) - (self.labels[-1] == 0)
num_target = 1.0 * num_target / (num_target + num_standard) * \
self.basket_size
num_standard = self.basket_size - num_target
# mean vector of each class (its center)
mTarget = numpy.mean(targetSamples, axis=0)
mStandard = numpy.mean(standardSamples, axis=0)
# euclidean distance between the class centers
R = scipy.spatial.distance.euclidean(mTarget, mStandard)
if self.show_plot:
dim = numpy.shape(self.samples)[1]
if dim == 2:
self.plot_class_borders(
mStandard, mTarget, R,
self.scale_factor_small, self.scale_factor_tall)
# get distance of each point to its class center
distances = []
for i, (s, l) in enumerate(zip(self.samples, self.labels)):
if i >= len(self.dual_solution):
ds = 1.0
else:
ds = self.dual_solution[i]
if l == self.classes.index("Target"):
r_1 = scipy.spatial.distance.euclidean(s, mTarget)
r_2 = scipy.spatial.distance.euclidean(s, mStandard)
distances.append([i, s, l, r_1, ds, r_2/(r_1+r_2)])
else:
r_1 = scipy.spatial.distance.euclidean(s, mStandard)
r_2 = scipy.spatial.distance.euclidean(s, mTarget)
distances.append([i, s, l, r_1, ds, r_2/(r_1+r_2)])
if self.border_handling == "USE_ONLY_BORDER_POINTS":
# remove all points that are not in the border (in a specific
# radius) around the center
# does not guarantee that demanded number of samples are
# contained in the new training set
distances = filter(lambda x: (
self.scale_factor_small*R < x[3] < self.scale_factor_tall*R) or
x[4] != 0, distances)
# sort according to weight
distances.sort(key=lambda x: x[5])
# pay attention to the basket size
distances = distances[:self.basket_size]
elif self.border_handling == "USE_DIFFERENCE":
# take the points that differ most
# first sort by distance,
# support vectors are prioritized by (x[4]==0), then sort by weight
distances.sort(key=lambda x:\
(abs(x[3] - \
((self.scale_factor_tall - \
self.scale_factor_small) / 2.0) * R)\
* (x[4] == 0\
and x[0] != len(self.samples)),\
x[5]))
else:
# use only support vectors and new data point
distances = filter(lambda x: x[4] != 0 \
or x[0] == len(self.samples), distances)
if self.border_handling == "USE_ONLY_BORDER_POINTS":
# pay attention to the basket size
distances = distances[:self.basket_size]
elif self.training_set_ratio == "KEEP_RATIO_AS_IT_IS":
distances_tmp = []
for d in distances:
if d[2] == 1 and num_target > 0:
num_target -= 1
distances_tmp.append(d)
elif d[2] == 0 and num_standard > 0:
num_standard -= 1
distances_tmp.append(d)
distances = distances_tmp
elif self.training_set_ratio == "BALANCED_RATIO":
distances_tmp = []
num_target = 0
num_standard = 0
for d in distances:
if d[2] == 1 and num_target < (self.basket_size/2):
num_target += 1
distances_tmp.append(d)
elif d[2] == 0 and num_standard < (self.basket_size/2):
num_standard += 1
distances_tmp.append(d)
distances = distances_tmp
else:
# pay attention to the basket size
distances = distances[:self.basket_size]
[idxs, _, _, _, _, _] = zip(*distances)
retraining_required = self.remove_samples(list(
set(numpy.arange(self.num_samples)) - set(idxs))) \
or retraining_required
return retraining_required
def add_new_sample(self, data, class_label=None, default=False):
""" Add a new sample to the training set.
:param data: A new sample for the training set.
:type data: list of float
:param class_label: The label of the new sample.
:type class_label: str
:param default: Specifies if the sample is added to the current
training set or to a future training set
:type default: bool
"""
# use a separate knowledge base when old samples will be totally removed
if (self.discard_type == "CDT" or self.discard_type == "INC_BATCH")\
and default is False:
self.future_samples.append(data)
self.future_labels.append(class_label)
# the sample size for the new knowledge base is limited
# to basket size, so pop oldest
while len(self.future_samples) > self.basket_size:
self.future_samples.pop(0)
self.future_labels.pop(0)
else: # (copy from *incremental_training*)
# add new data
self._train_sample(data, class_label)
# here it is important to use the mapped label
self.append_weights_and_class_factors(self.labels[-1])
self.num_samples += 1
# The new example is at first assumed to be irrelevant (zero weight)
if self.dual_solution is None:
self.dual_solution = numpy.zeros(1)
else:
self.dual_solution = numpy.append(self.dual_solution, 0.0)
# update of the relevant matrix
if self.version == "matrix":
# very inefficient!!!
M = self.M
self.M = numpy.zeros((self.num_samples, self.num_samples))
self.M[:-1, :-1] = M
del M
bj = self.bi[-1]
d = self.samples[-1]
# calculation of missing entries of matrix M by hand
for i in range(self.num_samples):
self.M[-1, i] = bj*self.bi[i]*(
self.kernel_func(d, self.samples[i]) +
self.offset_factor)
self.M[i, -1] = self.M[-1, i]
elif self.version == "samples":
# very efficient :)
if self.M is None:
self.M = []
self.M.append(1.0/(numpy.linalg.norm(self.samples[-1])**2.0 +
self.offset_factor +
self.squ_factor / (2 * self.ci[-1])))
def remove_samples(self, idxs):
""" Remove the samples at the given indices from the training set.
:param idxs: Indices of the samples to remove.
:type idxs: list of int
:returns: True if a support vector was removed.
:rtype: bool
"""
ret = False
# reverse sort of indices
# this enables removing first the higher indices such that the low
# indices are still valid and do not need to be shifted
# according to the removed index
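# e.g. for idxs = [2, 5] on a list of length 6: popping index 5 first
# keeps index 2 valid, whereas popping 2 first would shift the old
# element 5 down to position 4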
idxs.sort(reverse=True)
for idx in idxs:
# TODO: reduce efficiently the training size (tests)
if not self.dual_solution[idx] == 0:
ret = True
self.reduce_dual_weight(idx)
self.samples.pop(idx)
self.labels.pop(idx)
self.ci.pop(idx)
self.bi.pop(idx)
if self.add_type == "UNSUPERVISED_PROB":
self.decisions.pop(idx)
self.dual_solution = numpy.delete(self.dual_solution, idx)
self.num_samples -= 1
# update of the relevant matrix
if self.version == "matrix":
# very inefficient!!!
M_temp = numpy.delete(self.M, idx, axis=0)
del self.M
self.M = numpy.delete(M_temp, idx, axis=1)
elif self.version == "samples":
# very efficient :)
self.M.pop(idx)
return ret
def remove_non_support_vectors(self):
""" Remove all samples that are no support vectors.
"""
idxs = numpy.where(self.dual_solution == 0.0)
self.remove_samples(list(idxs[0]))
def incremental_training(self, data, class_label):
""" Warm Start Implementation by Mario Michael Krell
The saved status of the algorithm, including the Matrix M, is used
as a starting point for the iteration.
Only the problem has to be lifted up one dimension.
"""
self._train_sample(data, class_label)
# here it is important to use the mapped label
self.append_weights_and_class_factors(self.labels[-1])
self.num_samples += 1
# The new example is at first assumed to be irrelevant (zero weight).
if self.dual_solution is None:
self.dual_solution = numpy.zeros(1)
else:
self.dual_solution = numpy.append(self.dual_solution, 0.0)
# update of the relevant matrix
if self.version == "matrix":
# very inefficient!!!
M = self.M
self.M = numpy.zeros((self.num_samples, self.num_samples))
self.M[:-1, :-1] = M
del M
bj = self.bi[-1]
d = self.samples[-1]
# calculation of missing entries of matrix M by hand
for i in range(self.num_samples):
self.M[-1, i] = bj*self.bi[i]*(
self.kernel_func(d,self.samples[i])+self.offset_factor)
self.M[i, -1] = self.M[-1, i]
elif self.version == "samples":
# very efficient :)
if self.M is None:
self.M = []
self.M.append(1.0/(numpy.linalg.norm(self.samples[-1])**2.0 +
self.offset_factor +
self.squ_factor / (2 * self.ci[-1])))
prediction = self._execute(data)
if not prediction.label == class_label or \
abs(prediction.prediction) < 1:
if self.version == "matrix":
# relevant parameters for getting w and b
# updates should be done using old variables
self.A = numpy.array(self.samples)
self.D = numpy.diag(self.bi)
temp_iter = self.iterations
self.iteration_loop(self.M)
self.iterations += temp_iter
def retrain_SVM(self):
""" Retrain the svm with the current training set """
# reset all parameters
self.old_difference = numpy.inf
# start retraining process (copy from *incremental_training*)
if self.version == "matrix":
# relevant parameters for getting w and b
# updates should be done using old variables
self.A = numpy.array(self.samples)
self.D = numpy.diag(self.bi)
temp_iter = self.iterations
self.iteration_loop(self.M)
self.iterations += temp_iter
self.future_samples = []
self.future_labels = []
if self.discard_type == "CDT":
self.learn_CDT()
def visualize(self):
""" Show the training samples, the support vectors if possible and the
current decision function.
"""
dim = numpy.shape(self.samples)[1]
if dim == 2:
ax = plt.gca()
ax.set_xlabel(r'$x_0$')
ax.set_ylabel(r'$x_1$')
super(SorSvmNode, self).plot_samples()
super(SorSvmNode, self).plot_hyperplane()
super(SorSvmNode, self).plot_support_vectors()
elif dim == 3:
ax = plt.gca(projection='3d')
ax.set_xlabel(r'$x_0$')
ax.set_ylabel(r'$x_1$')
ax.set_zlabel(r'$x_2$')
super(SorSvmNode, self).plot_samples_3D()
super(SorSvmNode, self).plot_hyperplane_3D()
if dim == 2 or dim == 3:
plt.draw()
if self.save_plot is True:
imagename = "%s/tmp%010d.png"\
% (self.plot_storage, self.m_counter_i)
self.m_counter_i += 1
plt.savefig(imagename)
_NODE_MAPPING = {"SOR": SorSvmNode}