Source code for pySPACE.missions.nodes.classification.svm_variants.brmm

""" Relative Margin Machines (original and some variants) """

# the output is a prediction vector
import timeit

from pySPACE.missions.nodes.decorators import NoOptimizationParameter, BooleanParameter, QNormalParameter, \
    ChoiceParameter, QLogUniformParameter, LogUniformParameter
from pySPACE.resources.data_types.prediction_vector import PredictionVector
from pySPACE.missions.nodes.classification.base import RegularizedClassifierBase
# classification vector may be saved as a feature vector
from pySPACE.resources.data_types.feature_vector import FeatureVector
from pySPACE.resources.data_types.time_series import TimeSeries
# timeout package
import signal

import numpy
from numpy import dot

from pySPACE.missions.nodes.classification.base import TimeoutException

import warnings
import logging

import random
import copy

from pySPACE.resources.dataset_defs.metric import BinaryClassificationDataset

from pySPACE.missions.nodes.classification.svm_variants.external import LibSVMClassifierNode

from pySPACE.missions.nodes.base_node import BaseNode


@NoOptimizationParameter("use_list")
@NoOptimizationParameter("omega")
@QLogUniformParameter("max_iterations", min_value=1, max_value=1000, q=1)
@LogUniformParameter("range_", min_value=1.01, max_value=1000)
@ChoiceParameter("offset_factor", choices=[0, 1, 10, 100])
@BooleanParameter("squared_loss")
@NoOptimizationParameter("linear_weighting")
class RMM2Node(RegularizedClassifierBase):
    """ Classify with 2-norm SVM relaxation (b in target function) for BRMM

    The balanced relative margin machine (BRMM) is a modification of the
    original relative margin machine (RMM).
    The details to this algorithm can be found in the given reference.

    This node extends a successive over relaxation algorithm
    for adaption on new data with some variants.

    For further details, have a look at the given reference,
    the *reduced_descent* method, which is an elemental processing step,
    and the *_inc_train* method, which uses the status of the algorithm
    as a warm start.

    **References**

        ========= ==================================================
        main
        ========= ==================================================
        author    Krell, M. M. and Feess, D. and Straube, S.
        title     `Balanced Relative Margin Machine - The Missing Piece Between FDA and SVM Classification <http://dx.doi.org/10.1016/j.patrec.2013.09.018>`_
        journal   Pattern Recognition Letters
        publisher Elsevier
        doi       10.1016/j.patrec.2013.09.018
        year      2014
        ========= ==================================================

    **Parameters**

        Most parameters are already included into the
        :class:`RegularizedClassifierBase <pySPACE.missions.nodes.classification.base.RegularizedClassifierBase>`.

        :random:
            *Numerical recipes* suggests to randomize the order of alpha.
            *M&M* suggest to sort the alpha by their magnitude.

            (*optional, default: False*)

        :omega:
            Descent factor of optimization algorithm.
            Should be between 0 and 2!
            *Numerical recipes* uses 1.3 and *M&M* choose 1.0.

            (*optional, default: 1.0*)

        :max_iterations:
            Factor for the iteration limit. The limit used by the solver
            is this value multiplied by the number of training samples.

            (*optional, default: numpy.inf*)

        :version:
            Using the *matrix* with the scalar products or using only the
            *samples* and track changes in w and b for fast calculations.
            Both versions give totally the same result
            but they are available for comparison.
            Samples is mostly a bit faster.
            For kernel usage only *matrix* is possible.

            (*optional, default: "samples"*)

        :reduce_non_zeros:
            In the inner loops, indices are rejected
            if they lose their support.

            (*optional, default: True*)

        :linear_weighting:
            Add a linear weighting in the loss term.
            We use the index as weight and normalize it
            to have a total sum of 1.

            (*optional, default: False*)

        :calc_looCV:
            Calculate the leave-one-out metrics on the training data

            (*optional, default: False*)

        :range\\_:
            Upper bound for the prediction value before its 'outer loss'
            is punished with `outer_complexity`.

            Using this parameter (with value >1) activates the RMM.

            (*optional, default: numpy.inf*)

        :outer_complexity:
            Cost factor for too high values in classification.
            (see `range\\_`)

            (*optional, default: `complexity`*)

        :offset_factor:
            Reciprocal weight, for offset treatment in the model

                :0: Use no offset
                :1: Normal affine approach from augmented feature vectors
                :high: Only small punishment of offset, enabling larger
                       offsets (*danger of numerical instability*)

            If 0 is used, the offset b is set to zero, otherwise it is used
            via augmented feature vectors with different augmentation
            factors.
            The augmentation value corresponds to 1/*offset_factor*,
            where 1/0 corresponds to infinity.

            (*optional, default: 1*)

        :squared_loss:
            Use L2 loss (optional) instead of L1 loss (default).

            (*optional, default: False*)

        :co_adaptive:
            If True, the history entry of every appended sample is stored
            (see *append_sample*) and the samples are recalculated from it
            (see *update_data*) before a retraining in *_inc_train*.

            (*optional, default: False*)

        :co_adaptive_index:
            Value passed as *number* to *get_previous_execute*
            in *update_data*.

            (*optional, default: numpy.inf*)

        :history_index:
            One-based index of the sample history entry stored for the
            co-adaptive mode.

            (*optional, default: 1*)

    In the implementation we do not use alpha but dual_solution for the
    variables of the dual optimization problem,
    which is optimized with this algorithm.
    In the RMM case dual solution weights are concatenated.

    The RMM algorithm was constructed in the same way as in the mentioned
    references.

    As a stopping criterion we use the maximum change to be less than some
    tolerance.

    **Exemplary Call**

    .. code-block:: yaml

        -
            node : 2RMM
            parameters :
                complexity : 1.0
                weight : [1,3]
                debug : True
                store : True
                class_labels : ['Standard', 'Target']

    :input:    FeatureVector
    :output:   PredictionVector
    :Author:   Mario Michael Krell (mario.krell@dfki.de)
    :Created:  2012/06/27
    """
def __init__(self, random=False, omega=1.0,
             max_iterations=numpy.inf,
             version="samples", reduce_non_zeros=True,
             linear_weighting=False,
             calc_looCV=False, range_=numpy.inf, outer_complexity=None,
             offset_factor=1, squared_loss=False,
             co_adaptive=False, co_adaptive_index=numpy.inf,
             history_index=1, **kwargs):
    """ Validate the solver options and store them as permanent attributes

    Unknown *version* values fall back to "samples"; a non-linear kernel
    forces the "matrix" version. The tolerance of the base class is capped
    at 0.1 times the smaller of the two complexities.
    """
    self.old_difference = numpy.inf
    # instead of lists, arrays are concatenated in training
    super(RMM2Node, self).__init__(use_list=False, **kwargs)

    # --- sanity checks on the requested algorithm version ---
    if version not in ("samples", "matrix"):
        self._log("Version %s is not available. Default to 'samples'!"
                  % version, level=logging.WARNING)
        version = "samples"
    if self.kernel_type != 'LINEAR' and version != "matrix":
        self._log(
            "Version %s is not available for nonlinear kernel. " % version
            + "Default to 'matrix'!", level=logging.WARNING)
        version = "matrix"

    range_ = numpy.float64(range_)
    if outer_complexity is None:
        outer_complexity = self.complexity
    # only the relation between the two complexities is needed later on,
    # not the outer complexity itself
    correction = 1.0 * outer_complexity / self.complexity

    tolerance_cap = 0.1 * min(self.complexity, outer_complexity)
    if self.tolerance > tolerance_cap:
        self.set_permanent_attributes(tolerance=tolerance_cap)
        warnings.warn("Using to high tolerance. Reduced to 0.1 times " +
                      "complexity (tolerance=%f)." % self.tolerance)

    # mapping of the boolean loss switch to {0, 1}
    squ_factor = 1.0 if squared_loss else 0.0
    # the samples version keeps a list of reciprocal diagonal entries,
    # the matrix version builds the complete matrix during training
    initial_m = None if version == "matrix" else []

    self.set_permanent_attributes(
        # user settings
        random=random,
        omega=omega,
        max_iterations_factor=max_iterations,
        version=version,
        reduce_non_zeros=reduce_non_zeros,
        calc_looCV=calc_looCV,
        offset_factor=offset_factor,
        squ_factor=squ_factor,
        outer_complexity=outer_complexity,
        complexity_correction=correction,
        linear_weighting=linear_weighting,
        range=range_,
        co_adaptive=co_adaptive,
        co_adaptive_index=co_adaptive_index,
        history_index=history_index,
        # solver state
        max_sub_iterations=numpy.inf,
        iterations=0,
        sub_iterations=0,
        M=initial_m,
        num_samples=0,
        # RBF for zero training
        b=0,
        w=None,
        bi=[],
        ci=[],
        dual_solution=None,
        one_class=False,
    )
def _execute(self, x):
    """ Executes the classifier on the given data vector

    In the linear case the prediction value is <w,data>+b and the
    computation is delegated to the base class. For other kernels the
    prediction is the offset plus the kernel expansion over all training
    samples with a non-zero dual weight.

    :param x: sample to classify; it is viewed as a 2d array and its first
              row is used in the kernel case
    :returns: PredictionVector holding label and prediction value
    """
    if self.kernel_type == 'LINEAR':
        if self.w is None:
            # ``numpy.float`` was only an alias of the builtin ``float``
            # and has been removed from recent NumPy releases
            self.w = numpy.zeros(x.shape[1], dtype=float)
        return super(RMM2Node, self)._execute(x)

    data = x.view(numpy.ndarray)
    data = data[0, :]
    prediction = self.b
    for i in range(self.num_samples):
        # effective weight: inner (alpha) minus outer (beta) dual variable
        dual = self.dual_solution[i][0] - self.dual_solution[i][1]
        if not dual == 0:
            prediction += \
                dual * self.kernel_func(data, self.samples[i]) * self.bi[i]

    # one-class multinomial handling of REST class
    if "REST" in self.classes and self.multinomial:
        if "REST" == self.classes[0]:
            label = self.classes[1]
        elif "REST" == self.classes[1]:
            label = self.classes[0]
            prediction *= -1
    # Look up class label
    # prediction --> {-1,1} --> {0,1} --> Labels
    elif prediction > 0:
        label = self.classes[1]
    else:
        label = self.classes[0]
    return PredictionVector(label=label, prediction=prediction,
                            predictor=self)
[docs] def _stop_training(self, debug=False): """ Train the SVM with the SOR algorithm on the collected training data """ self._log("Preprocessing of SOR SVM") self._log("Instances of Class %s: %s, %s: %s" % (self.classes[0], self.labels.count(self.classes.index(self.classes[0])), self.classes[1], self.labels.count(self.classes.index(self.classes[1])))) ## initializations of relevant values and objects ## self.calculate_weigts_and_class_factors() self.num_samples = len(self.samples) self.max_iterations = self.max_iterations_factor*self.num_samples # RMM variable self.dual_solution = numpy.zeros((self.num_samples, 2)) if self.version == "matrix" and self.kernel_type == "LINEAR": self.A = numpy.array(self.samples) self.D = numpy.diag(self.bi) self.M = dot(self.D, dot(dot(self.A, self.A.T) + self.offset_factor * numpy.ones((self.num_samples, self.num_samples)), self.D)) elif self.version == "samples" and self.kernel_type == "LINEAR": if not self.squ_factor: self.M = \ [1.0/(numpy.linalg.norm(sample)**2.0 + self.offset_factor) for sample in self.samples] else: self.M = [(1 / (numpy.linalg.norm(self.samples[i])**2.0 + self.offset_factor + 1 / (2 * self.ci[i])), 1 / (numpy.linalg.norm(self.samples[i])**2.0 + self.offset_factor + 1 / (2 * self.ci[i] * self.complexity_correction))) for i in range(self.num_samples)] # changes of w and b are tracked in the samples version self.w = numpy.zeros(self.dim, dtype=numpy.float) else: ## iterative calculation of M self.M = numpy.zeros((self.num_samples, self.num_samples)) for i in range(self.num_samples): bi = self.bi[i] si = self.samples[i] for j in range(self.num_samples): if i > j: self.M[i][j] = self.M[j][i] else: self.M[i][j] = bi * self.bi[j] * ( self.kernel_func(si, self.samples[j]) + self.offset_factor) ## SOR Algorithm ## self.iteration_loop(self.M) self.classifier_information["~~Solver_Iterations~~"] = self.iterations try: self.classifier_information["~~offset~~"] = self.b self.classifier_information["~~w0~~"] = self.w[0] 
self.classifier_information["~~w1~~"] = self.w[1] except: pass ## calculate leave one out metrics ## if self.calc_looCV: self.looCV()
def looCV(self):
    """ Calculate leave one out metrics

    Every training sample is removed in turn by setting its dual weight
    to zero, the solver is restarted from the current state while
    skipping that sample, and the sample is classified with the result.
    The metrics are stored in *loo_metrics*; w, b and the dual solution
    are restored afterwards.
    """
    # remember original solution
    optimal_w = copy.deepcopy(self.w)
    optimal_b = copy.deepcopy(self.b)
    optimal_dual_solution = copy.deepcopy(self.dual_solution)
    # preparation of sorting
    sort_dual = self.dual_solution[:, 0] + self.dual_solution[:, 1]
    # sort indices --> zero weights do not need any adaption and
    # low weights are less relevant for changes.
    # ``tolist`` replaces the former ``map(list, [...])[0]``, which only
    # worked with the list-returning ``map`` of Python 2.
    sorted_indices = numpy.argsort(sort_dual).tolist()
    sorted_indices.reverse()

    prediction_vectors = []
    using_initial_solution = True
    for index in sorted_indices:
        d_i = self.dual_solution[index, 0] - self.dual_solution[index, 1]
        # delete each index from the current observation
        if d_i == 0 and using_initial_solution:
            # no change in classifier necessary
            pass
        else:
            # set weight to zero and track the corresponding changes
            self.reduce_dual_weight(index)
            # reiterate till convergence but skip current index;
            # iteration_loop resets the counter, so accumulate it here
            temp_iter = self.iterations
            self.iteration_loop(self.M, reduced_indices=[index])
            self.iterations += temp_iter
            using_initial_solution = False
        prediction_vectors.append(
            (self._execute(numpy.atleast_2d(self.samples[index])),
             self.classes[self.labels[index]]))
    self.loo_metrics = BinaryClassificationDataset.calculate_metrics(
        prediction_vectors,
        ir_class=self.classes[1],
        sec_class=self.classes[0])
    # undo changes
    self.b = optimal_b
    self.w = optimal_w
    self.dual_solution = optimal_dual_solution
def reduce_dual_weight(self, index):
    """ Change weight at index to zero

    In the "samples" version w and b are tracked incrementally, so the
    contribution of the removed weight has to be subtracted from them.
    The "matrix" version recomputes w and b from the dual solution and
    needs no such correction.

    :param index: position of the training sample in the dual solution
    """
    # The version is called "samples" everywhere else; the former
    # comparison with "sample" never matched, so w and b were left stale.
    if self.version == "samples":
        old_weight = self.dual_solution[index]
        old_weight = old_weight[0] - old_weight[1]
        self.update_classification_function(delta=-old_weight, index=index)
    else:
        # the matrix algorithm doesn't care for the old weights
        pass
    self.dual_solution[index] = [0, 0]
def calculate_weigts_and_class_factors(self):
    """ Calculate weights in the loss term and map label to -1 and 1

    Rebuilds *ci* (loss weights) and *bi* (class factors) from all stored
    labels and sets *num_samples* to the number of stored samples.
    With *linear_weighting* the sample at one-based position i gets the
    weight i, normalized such that all index weights sum up to one.
    """
    # Weights for soft margin (dependent on class or time)
    self.ci = []
    # Mapping from class to value of classifier (-1,1)
    self.bi = []
    total = len(self.samples)
    self.num_samples = total
    for position, label in enumerate(self.labels):
        if self.linear_weighting:
            # append_weights_and_class_factors uses num_samples as the
            # linear weight, so it has to be the one-based sample index.
            # Leaving it at the total count gave every sample the same
            # weight and broke the normalization below.
            self.num_samples = position + 1
        self.append_weights_and_class_factors(label)
    self.num_samples = total
    # normalize by the sum of all indices: 1 + 2 + ... + n = n(n+1)/2
    # (nothing to normalize, and no division by zero, without samples)
    if self.linear_weighting and total:
        c = 1/(total*(total+1)*0.5)
        self.ci = [c*value for value in self.ci]
def append_weights_and_class_factors(self, label):
    """ Append the class factor and the loss weight of one mapped label

    The class factor (label 0 --> -1, label 1 --> 1) is appended to *bi*
    and the loss weight (complexity times class weight, additionally
    times *num_samples* for linear weighting) is appended to *ci*.

    This is done in a separate function,
    since it is also needed for adaption.
    """
    self.bi.append(label*2-1)
    # everything but label 0 is treated as the second class
    class_weight = self.weight[0] if label == 0 else self.weight[1]
    cost = self.complexity*class_weight
    if self.linear_weighting:
        cost = cost*self.num_samples
    self.ci.append(cost)
def iteration_loop(self, M, reduced_indices=None):
    """ The algorithm is calling the :func:`reduced_descent<pySPACE.missions.nodes.classifiers.ada_SVM.SORSVMNode.reduced_descent>` method in loops over alpha

    In the first step it uses a complete loop over all components of
    alpha, and in the second inner loop only the non zero alpha, which are
    not at their maximum, are observed till some convergence criterion is
    reached.

    :param M: matrix of scalar products ("matrix" version) or list of
              reciprocal diagonal entries ("samples" version)
    :param reduced_indices: indices which will be skipped in observation
    """
    if reduced_indices is None:
        reduced_indices = []
    ## Definition of tracking variables ##
    self.max_iterations = self.max_iterations_factor*self.num_samples
    self.iterations = 0
    # A previous run that stopped at the iteration limit leaves a finite
    # sub iteration limit and a used-up counter behind. Without this reset
    # the initial sweep below could abort at its first index, report a
    # zero difference and end the optimization before it started.
    self.sub_iterations = 0
    self.max_sub_iterations = numpy.inf
    self.difference = numpy.inf
    # recalculate factor in case of changing complexities
    self.complexity_correction = \
        1.0 * self.outer_complexity / self.complexity
    # initial call on all samples
    self.total_descent(self.dual_solution, M, reduced_indices)
    ## outer iteration loop ##
    while (self.difference > self.tolerance and
            self.iterations < self.max_iterations):
        # inner iteration loop only on active vectors/alpha (non zero) ##
        self.sub_iterations = 0
        # sorting or randomizing non zero indices
        sort_dual = self.dual_solution[:, 0] + self.dual_solution[:, 1]
        # plain indexing/``tolist`` instead of ``map(list, ...)[0]``,
        # which relies on the list-returning ``map`` of Python 2
        num_non_zeros = len(sort_dual.nonzero()[0])
        max_values = len(numpy.where(sort_dual == sort_dual.max())[0])
        # sort the entries of the current dual
        # and get the corresponding indices
        sorted_indices = numpy.argsort(sort_dual).tolist()
        if num_non_zeros == 0 or num_non_zeros == max_values:
            # skip sub iteration if everything is zero or maximal
            active_indices = []
        else:
            # ascending order: zeros come first, maximal entries last
            active_indices = sorted_indices[-num_non_zeros:-max_values]
        for index in reduced_indices:
            try:
                active_indices.remove(index)
            except ValueError:
                pass
        if self.random:
            random.shuffle(active_indices)
        self.max_sub_iterations = self.max_iterations_factor * \
            len(active_indices) * 0.5
        while (self.difference > self.tolerance and
                self.sub_iterations < self.max_sub_iterations
                and self.iterations < self.max_iterations):
            ## iteration step ##
            self.reduced_descent(self.dual_solution, M, active_indices)
        ## outer loop ##
        if not (self.iterations < self.max_iterations):
            break
        # For the first run, the previous reduced descent is skipped
        # but for adaptivity and looCV it is important
        # to have first the small loop, since normally, this is sufficient.
        # Furthermore having it at the end simplifies the stop criterion
        self.max_sub_iterations = numpy.inf
        self.total_descent(self.dual_solution, M, reduced_indices)
    ## Final solution ##
    # in the case without kernels, we have to calculate the result
    # by hand new for each incoming sample
    if self.version == "matrix":
        dual_diff = self.dual_solution[:, 0] - self.dual_solution[:, 1]
        if self.offset_factor:
            # else: keep b fixed and do NOT change
            self.b = self.offset_factor * dot(dual_diff, self.bi)
        if self.kernel_type == "LINEAR":
            self.w = numpy.array([dot(dot(self.A.T, self.D),
                                      dual_diff)]).T
    elif self.version == "samples" and self.kernel_type == "LINEAR":
        # w and b are pre-computed in the loop
        pass
def reduced_descent(self,current_dual, M, relevant_indices):
    """ Basic iteration step over a set of indices, possibly subset of all

    The main principle is to make a descent step with just one index,
    while fixing the other dual_solutions.

    The main formula comes from *M&M*:

    .. math::

        d = \\alpha_i - \\frac{\\omega}{M[i][i]}(M[i]\\alpha-1)

        \\text{with } M[i][j] = y_i y_j(<x_i,x_j>+1)

        \\text{and final projection: }\\alpha_i = \\max(0,\\min(d,c_i)).

    Here we use c for the weights for each sample in the loss term,
    which is normally complexity times corresponding class weight.
    y is used for the labels, which have to be 1 or -1.

    In the *sample* version only the diagonal of M is used.
    The sum with the alpha is tracked by using the classification vector w
    and the offset b.

    .. math::

        o = \\alpha_i

        d = \\alpha_i - \\frac{\\omega}{M[i][i]}(y_i(<w,x_i>+b)-1)

        \\text{with projection: }\\alpha_i = \\max(0,\\min(d,c_i)),

        b=b+(\\alpha_i-o)y_i

        w=w+(\\alpha_i-o)y_i x_i

    Each row of *current_dual* holds two values: column 0 is updated
    against the lower bound 1 ("alpha"), column 1 against the upper bound
    *range* ("beta"). After every update at most one of them is non-zero.

    The method modifies *current_dual* in place, accumulates the absolute
    changes in *difference* and increases *iterations* and
    *sub_iterations* by one per processed index. *relevant_indices* is
    changed in place, too: indices whose update was not positive are
    removed if *reduce_non_zeros* is set, and the list is shuffled if
    *random* is set.
    """
    self.irrelevant_indices = []
    self.difference = 0
    if self.version == "matrix":
        # effective weights, kept in sync with current_dual inside the loop
        dual_diff = current_dual[:, 0] - current_dual[:, 1]
    for i in relevant_indices:
        # stop as soon as one of the iteration limits is reached
        if not (self.sub_iterations < self.max_sub_iterations
                and self.iterations < self.max_iterations):
            break
        beta = False
        old_dual = current_dual[i]
        ### Main Function ###
        ### elemental update step of SOR algorithm ###
        dual_1 = current_dual[i,0]
        dual_2 = current_dual[i,1]
        # fi: prediction value of sample i multiplied with its class factor
        if self.version == "samples":
            xi = self.samples[i]
            bi = self.bi[i]
            fi = bi * (dot(xi.T, self.w) + self.b)
        elif self.version == "matrix":
            fi = dot(M[i], dual_diff)
            if self.one_class:
                # correction if offset should not be changed
                fi += self.bi[i] * self.b
        # additional terms of the squared loss (zero if squ_factor is 0)
        s1 = self.squ_factor / (2 * self.ci[i]) * dual_1
        s2 = self.squ_factor / (2 * self.ci[i] * self.complexity_correction) * dual_2
        if dual_1 > 0:
            # alpha update
            old_dual = dual_1
            if self.version == "matrix":
                x = old_dual - \
                    self.omega / (M[i][i] + self.squ_factor / (2 * self.ci[i])) \
                    * (fi - 1 + s1)
            elif self.version == "samples" and self.squ_factor:
                # M[i] is a pair of reciprocal diagonals: (alpha, beta)
                x = old_dual - self.omega * M[i][0] * (fi - 1 + s1)
            elif self.version == "samples" and not self.squ_factor:
                x = old_dual - self.omega * M[i] * (fi - 1)
        elif dual_2 > 0:
            # beta update
            old_dual = dual_2
            if self.version == "matrix":
                x = old_dual - \
                    self.omega / (M[i][i] + self.squ_factor / (2 * self.ci[i] * self.complexity_correction)) \
                    * (self.range - fi + s2)
            elif self.version == "samples" and self.squ_factor:
                x = old_dual - self.omega * M[i][1] \
                    * (self.range - fi + s2)
            elif self.version == "samples" and not self.squ_factor:
                x = old_dual - self.omega * M[i] \
                    * (self.range - fi)
            beta = True
        else:
            # both values are zero and we need to check,
            # if one gets active alpha or beta dual coefficient
            old_dual = 0
            # just used to have the same formulas as above
            if self.version == "matrix":
                x = old_dual - \
                    self.omega/(M[i][i] + self.squ_factor / (2 * self.ci[i])) \
                    * (fi - 1 + s1)
            elif self.version == "samples" and self.squ_factor:
                x = old_dual - self.omega * M[i][0] * (fi - 1 + s1)
            elif self.version == "samples" and not self.squ_factor:
                x = old_dual - self.omega * M[i] * (fi - 1)
            # no update of alpha but update of beta (eventually zero update)
            if not x > 0:
                beta = True
                if self.version == "matrix":
                    x = old_dual - \
                        self.omega / (M[i][i] + self.squ_factor / (2 * self.ci[i] * self.complexity_correction)) \
                        * (self.range - fi + s2)
                elif self.version == "samples" and self.squ_factor:
                    x = old_dual - self.omega * M[i][1] * (
                        self.range - fi + s2)
                elif self.version == "samples" and not self.squ_factor:
                    # s2 is zero here, because squ_factor is zero
                    x = old_dual - self.omega * M[i] * (
                        self.range - fi + s2)
        # map dual solution to the interval [0,C] in L1 case or
        # just make it positive in the L2 case
        # current_dual[i]=self.project(x,index=i)
        if x <= 0:
            # sample lost its support; remember it for the clean-up below
            self.irrelevant_indices.append(i)
            current_dual[i] = [0, 0]
        elif not beta and not self.squ_factor:
            current_dual[i, 0] = min(x, self.ci[i])
        elif beta and not self.squ_factor:
            current_dual[i, 1] = min(x, self.ci[i] * self.complexity_correction)
        elif not beta and self.squ_factor:
            current_dual[i, 0] = x
        elif beta and self.squ_factor:
            current_dual[i, 1] = x
        if self.version == "matrix":
            old_diff = dual_diff[i]
            dual_diff[i] = current_dual[i, 0] - current_dual[i, 1]
            delta = dual_diff[i] - old_diff
        # update w and b in samples case
        if self.version == "samples":
            delta = (current_dual[i, 0] + current_dual[i, 1] - old_dual) * bi
            # for beta: difference needed
            if beta:
                delta = -delta
            # update classification function parameter w and b
            # self.update_classification_function(delta=delta, index=i)
            self.b += self.offset_factor * delta
            self.w += delta * xi
        current_difference = numpy.abs(delta)
        # if current_difference > self.difference:
        #     self.difference = current_difference
        self.difference += current_difference
        self.sub_iterations += 1
        self.iterations += 1
    if self.reduce_non_zeros:
        # the caller's list is modified on purpose, so that the following
        # calls with the same list skip these indices
        for index in self.irrelevant_indices:
            relevant_indices.remove(index)
    if self.random:
        random.shuffle(relevant_indices)
def update_classification_function(self, delta, index):
    """ Shift w and b by a change *delta* of the dual weight at *index*

    The change is multiplied with the class factor of the sample; the
    offset is additionally scaled with *offset_factor* and the
    classification vector with the sample itself.
    """
    class_factor = self.bi[index]
    sample = self.samples[index]
    offset_step = self.offset_factor * delta * class_factor
    vector_step = delta * class_factor * sample
    self.b = self.b + offset_step
    self.w = self.w + vector_step
def project(self, value, index):
    """ Projection method of *soft_relax*

    Values which are not positive are mapped to zero and their index is
    remembered in *irrelevant_indices*. Positive values are limited by
    the loss weight of the sample.
    """
    if value > 0:
        return min(value, self.ci[index])
    self.irrelevant_indices.append(index)
    return 0
def total_descent(self, current_dual, M, reduced_indices=None):
    """ Iteration over all indices, except the reduced ones

    The indices are processed in their natural order or, if *random* is
    set, in shuffled order.

    :param current_dual: dual solution, handed over to *reduced_descent*
    :param M: matrix or list of reciprocal diagonal entries, handed over
              to *reduced_descent*
    :param reduced_indices: indices to be skipped; every index has to be
                            a valid sample index

    .. todo:: check, which parameters are necessary
    """
    # A real list is needed: it is shuffled and reduced in place, here and
    # in reduced_descent. A bare ``range`` is only a list in Python 2.
    indices = list(range(self.num_samples))
    if self.random:
        random.shuffle(indices)
    for index in reduced_indices or ():
        indices.remove(index)
    self.reduced_descent(current_dual, M, indices)
[docs] def _inc_train(self,data,label): """ Warm Start Implementation by Mario Michael Krell The saved status of the algorithm, including the Matrix M, is used as a starting point for the iteration. Only the problem has to be lifted up one dimension. """ #one vs. REST case if "REST" in self.classes and not label in self.classes: label = "REST" # one vs. one case if not self.multinomial and len(self.classes) == 2 and \ not label in self.classes: return self._train(data, label) # here it is important to use the mapped label self.append_weights_and_class_factors(self.labels[-1]) if self.linear_weighting: self.ci[-1] = self.ci[-1] * 2 / ( self.num_samples * (self.num_samples + 1)) scaling = self.num_samples/(self.num_samples+2) self.ci = [value * scaling for value in self.ci] self.b *= scaling self.w *= scaling self.num_samples += 1 # The new example is at first assumed to be irrelevant (zero weight). if self.dual_solution is None: self.dual_solution = numpy.zeros((1,2)) else: self.dual_solution = numpy.append(self.dual_solution, [[0.0, 0.0]], axis=0) # update of the relevant matrix if self.version == "matrix": # very inefficient!!! 
M = self.M self.M = numpy.zeros((self.num_samples,self.num_samples)) if not M is None: self.M[:-1, :-1] = M del M bj = self.bi[-1] d = self.samples[-1] # calculation of missing entries of matrix M by hand for i in range(self.num_samples): if self.kernel_type == "LINEAR": # y_i*y_j*(<x_i,x_j>+1) self.M[-1, i] = bj*self.bi[i]*( self.kernel_func(d, self.samples[i])+self.offset_factor) self.M[i, -1] = self.M[-1, i] else: raise NotImplementedError elif self.version == "samples": # very efficient :) if not self.squ_factor: self.M.append(1.0/(numpy.linalg.norm(self.samples[-1])**2.0 + self.offset_factor)) else: self.M.append( (1 / (numpy.linalg.norm(self.samples[-1])**2.0 + self.offset_factor + 1 / (2 * self.ci[-1])), 1 / (numpy.linalg.norm(self.samples[-1])**2.0 + self.offset_factor + 1 / (2 * self.ci[-1] * self.complexity_correction)) )) prediction = self._execute(data) if (not prediction.label == label or abs(prediction.prediction) < 1 or (self.range and abs(prediction.prediction) > self.range)): if self.co_adaptive: self.update_data() if self.version == "matrix": # relevant parameters for getting w and b # updates should be done using old variables self.A = numpy.array(self.samples) self.D = numpy.diag(self.bi) temp_iter = self.iterations self.iteration_loop(self.M) self.iterations += temp_iter
def reiterate(self, parameters):
    """ Change the parameters in the model and reiterate

    :param parameters:
        Dictionary mapping parameter names to their new values.
        Surrounding "~" characters are ignored. For names starting with
        "log", ten to the power of the value is used and the first four
        characters of the name are dropped.
        Supported are the range, the complexity and the two class
        weights; other names are ignored.
    """
    changed_c = False
    for parameter in parameters:
        p = parameter.strip("~")
        if p.startswith("log"):
            value = 10 ** parameters[parameter]
            p = p[4:]
        else:
            value = parameters[parameter]
        if p in ["range", "R", "Range", "range_"]:
            self.range = value
        elif p in ["complexity", "C", "Complexity"] \
                and not value == self.complexity:
            changed_c = True
            difference = abs(self.complexity-value)
            self.complexity = value
            # small parameter changes require a finer tolerance
            if self.tolerance > 0.1 * difference:
                self.set_permanent_attributes(tolerance=0.1*difference)
        elif p in ["w1", "W1", "weight1", "Weight1"] \
                and not value == self.weight[0]:
            changed_c = True
            difference = abs(self.weight[0]-value)
            if self.tolerance > 0.1 * difference:
                self.set_permanent_attributes(tolerance=0.1*difference)
            self.weight[0] = value
        elif p in ["w2", "W2", "weight2", "Weight2"] \
                and not value == self.weight[1]:
            changed_c = True
            difference = abs(self.weight[1]-value)
            if self.tolerance > 0.1 * difference:
                self.set_permanent_attributes(tolerance=0.1*difference)
            self.weight[1] = value
    if changed_c:
        # recalculate ci
        self.calculate_weigts_and_class_factors()
        # The ratio of the complexities is part of M in the squared loss
        # case. It has to be refreshed before M is rebuilt; otherwise M
        # is computed with the ratio of the old complexity.
        self.complexity_correction = \
            1.0 * self.outer_complexity / self.complexity
    # fix M
    if self.version == "samples" and self.kernel_type == "LINEAR" \
            and self.squ_factor and changed_c:
        self.M = [(1 / (numpy.linalg.norm(self.samples[i])**2.0
                        + self.offset_factor
                        + 1 / (2 * self.ci[i])),
                   1 / (numpy.linalg.norm(self.samples[i])**2.0
                        + self.offset_factor
                        + 1 / (2 * self.ci[i] *
                               self.complexity_correction)))
                  for i in range(self.num_samples)]
    self.iteration_loop(self.M)
def append_sample(self, sample):
    """ Store the sample and, in co-adaptive mode, its history entry

    The history entry at position *history_index* (one-based) is copied
    to *hist_samples*. If the sample has no such entry, co-adaptivity
    is turned off.
    """
    super(RMM2Node, self).append_sample(sample)
    if not self.co_adaptive:
        return
    try:
        hist_data = copy.deepcopy(sample.history[self.history_index-1])
    except IndexError:
        self._log("No history data available for classifier! " +
                  "Co-adaptivity is turned off.", level=logging.CRITICAL)
        self.co_adaptive = False
        # nothing to store: continuing would use the unbound hist_data
        return
    if not "hist_samples" in self.__dict__:
        self.hist_samples = []
    self.hist_samples.append(hist_data)
def update_data(self):
    """ Recalculate w, samples and M

    Every stored history sample is processed again with
    *get_previous_execute*, the first row of the result replaces the
    stored training sample, and w is rebuilt from the dual solution.
    Finally the reciprocal diagonal entries in M are recalculated
    in the form used by the "samples" version.
    """
    # initialize w
    # (``numpy.float`` was an alias of ``float``, removed in NumPy 1.24)
    self.w = numpy.zeros(self.dim, dtype=float)
    for i in range(self.num_samples):
        # recalculate sample
        new_sample = self.get_previous_execute(
            data=self.hist_samples[i],
            number=self.co_adaptive_index)
        data_array = new_sample.view(numpy.ndarray)
        self.samples[i] = data_array[0, :]
        # recalculate w
        self.w += data_array[0, :] * self.bi[i] * \
            (self.dual_solution[i][0] - self.dual_solution[i][1])

    if not self.squ_factor:
        self.M = \
            [1.0/(numpy.linalg.norm(sample)**2.0 + self.offset_factor)
             for sample in self.samples]
    else:
        self.M = [(1 / (numpy.linalg.norm(self.samples[i])**2.0
                        + self.offset_factor
                        + 1 / (2 * self.ci[i])),
                   1 / (numpy.linalg.norm(self.samples[i])**2.0
                        + self.offset_factor
                        + 1 / (2 * self.ci[i] *
                               self.complexity_correction)))
                  for i in range(self.num_samples)]
# ===========================================================================


@BooleanParameter("normalize_C")
@QNormalParameter("range", mu=2, sigma=1, q=1)
class RMM1ClassifierNode(RegularizedClassifierBase):
    """ Classify with 1-Norm SVM and relative margin

    Implementation via Simplex Algorithms for exact solutions.
    It is important that the data is reduced and has not more than 2000
    features.

    This algorithm is an extension of the original RMM
    as outlined in the reference.

    **References**

        ========= ==================================================
        main
        ========= ==================================================
        author    Krell, M. M. and Feess, D. and Straube, S.
        title     `Balanced Relative Margin Machine - The Missing Piece Between FDA and SVM Classification <http://dx.doi.org/10.1016/j.patrec.2013.09.018>`_
        journal   Pattern Recognition Letters
        publisher Elsevier
        doi       10.1016/j.patrec.2013.09.018
        year      2013
        ========= ==================================================

    **Parameters**

        :complexity:
            Complexity sets the weighting of punishment for
            misclassification in comparison to generalizing classification
            from the data.
            Value in the range from 0 to infinity.

            (*optional, default: 1*)

        :outer_complexity:
            Outer complexity sets the weighting of punishment being outside
            the *range* in comparison to generalizing classification from
            the data and the misclassification above.
            Value in the range from 0 to infinity.
            By default it uses the value of *complexity*.
            For using infinity, use numpy.inf, a string containing *inf*
            or a negative value.

            (*recommended, default: complexity_value*)

        :weight:
            Defines parameter for class weights.
            It is an array with two entries.
            Set the parameter C of class i to weight*C.

            (*optional, default: [1,1]*)

        :range:
            Defines constraint radius for the outer border of the
            classification.
            Going to infinity, this classifier will be identical to the
            1 Norm SVM.
            This parameter should be always greater than one.
            Going to one, the classifier will be a variant of the
            Regularized Linear Discriminant analysis.

            (*optional, default: 2*)

        :normalize_C:
            If True, the complexities are divided by the number of
            training samples.

            (*optional, default: False*)

        :class_labels:
            Sets the labels of the classes.
            This can be done automatically, but setting it will be better,
            if you want to have similar prediction values for classifiers
            trained on different sets.
            Otherwise this variable is built up by occurrence of labels.
            Furthermore the important class (ir_class) should get the
            second position in the list, such that it gets higher
            prediction values by the classifier.

            (*recommended, default: []*)

        :debug:
            If *debug* is True one gets additional output
            concerning the classification.

            (*optional, default: False*)

        :store:
            Parameter of super-class. If *store* is True,
            the classification vector is stored as a feature vector.

            (*optional, default: False*)

    **Exemplary Call**

    .. code-block:: yaml

        -
            node : 1RMM
            parameters :
                complexity : 1.0
                weight : [1,2.5]
                debug : False
                store : True
                range : 2

    :input:    FeatureVector
    :output:   PredictionVector
    :Author:   Mario Krell (Mario.krell@dfki.de)
    :Revised:  2010/04/09
    """
def __init__(self, normalize_C=False, outer_complexity=None,
             range=2, **kwargs):
    """ Store the RMM specific parameters as permanent attributes

    :param normalize_C: see class documentation
    :param outer_complexity:
        None is replaced by the complexity. A negative value, infinity or
        a string containing "inf" is stored as None, which is interpreted
        as a hard outer margin during training.
    :param range: see class documentation; the old keyword *sensitivity*
                  is still accepted and takes precedence
    """
    super(RMM1ClassifierNode, self).__init__(**kwargs)
    # Check if old parameter name 'sensitivity' was used instead of range
    if 'sensitivity' in kwargs:
        range = kwargs['sensitivity']
        self._log("Please use parameter 'range' instead of 'sensitivity'" +
                  " in 1RMM Node!",
                  level=logging.ERROR)
    if outer_complexity is None:
        self._log("No outer complexity given, set to complexity parameter!",
                  level=logging.WARNING)
        outer_complexity = self.complexity
    elif isinstance(outer_complexity, str):
        # Strings are checked first: comparing a string with a number
        # raises a TypeError in Python 3.
        if "inf" in outer_complexity:
            outer_complexity = None
    elif outer_complexity < 0 or outer_complexity == numpy.inf:
        outer_complexity = None
    self.set_permanent_attributes(outer_complexity=outer_complexity,
                                  normalize_C=normalize_C,
                                  range=range)
# Here we define a function that defines the matrix, so that we can
# insert it as an argument for the solver and optimize it
def create_problem_matrix(self, n, e):
    """ Build the inequality matrix G of the linear program G*x <= h

    The variable vector is x = (w+, w-, b, t) with *dim* entries for the
    positive part of w, *dim* entries for the negative part, one offset
    and *n* slack variables.

    :param n: number of training samples
    :param e: list with *dim* ones
    :returns: array with 2*dim+3*n rows and 2*dim+n+1 columns

    Layout (D holds the samples multiplied with their class factor,
    B the class factors, Ci/Co the reciprocal inner/outer complexities
    on a diagonal, I the identity)::

                    dim    dim    1     n
        n rows      -D      D    -B    -Ci    inner margin  (<= -1)
        n rows       D     -D     B    -Co    outer margin  (<= range)
        2*dim rows  -I (2*dim columns)   0    w+, w- >= 0
        n rows       0      0     0    -I     t >= 0
    """
    # reciprocal weights of the slack variables in the inequalities
    ci = []
    co = []
    bi = numpy.array(self.bi)
    for label in self.labels:
        class_weight = self.weight[0] if label == 0 else self.weight[1]
        inner = 1.0*self.complexity*class_weight
        outer = 1.0*self.outer_complexity*class_weight
        if self.normalize_C:
            inner = inner/n
            outer = outer/n
        ci.append(1.0/inner)
        co.append(1.0/outer)
    if not self.outer_soft_margin:
        # hard outer margin: the slack gives no relief
        co = n*[0]

    dim = self.dim
    offset_column = 2*dim
    output_matrix = numpy.zeros((2*dim+3*n, 2*dim+n+1), 'd')
    # data part of the inner (rows :n) and outer (rows n:2n) restrictions
    output_matrix[:n, :dim] = \
        - numpy.multiply(numpy.array(self.samples).T, bi).T
    output_matrix[n:2*n, :dim] = - output_matrix[:n, :dim]
    output_matrix[:n, dim:2*dim] = output_matrix[n:2*n, :dim]
    output_matrix[n:2*n, dim:2*dim] = output_matrix[:n, :dim]
    # offset part
    output_matrix[:n, offset_column] = - bi
    # The outer restriction is y_i(<w,x_i>+b) - t_i <= range, so the
    # offset needs the same sign as the data part of these rows, i.e. the
    # opposite sign of the inner rows. Copying the inner column would
    # constrain y_i(<w,x_i>-b) instead.
    output_matrix[n:2*n, offset_column] = - output_matrix[:n, offset_column]
    # slack part
    output_matrix[:n, offset_column+1:offset_column+n+1] = - numpy.diag(ci)
    output_matrix[n:2*n, offset_column+1:offset_column+n+1] = \
        - numpy.diag(co)
    # positivity of w+, w- and t (the offset column is skipped)
    output_matrix[2*n:2*dim+2*n, :2*dim] = - numpy.diag(e+e)
    output_matrix[2*dim+2*n:, offset_column+1:] = - numpy.diag(n*[1.0])
    return output_matrix
def _stop_training(self, debug=False):
    """ Finish the training by solving the linear program of the 1-norm RMM

    In addition to the inner margin constraints there are restrictions
    pushing the classification into two closed intervals instead of two
    open ones. At both ends there is the same kind of soft margin.

    The method

    - maps the labels to ``-1`` (label 0) and ``+1`` (other) in ``self.bi``,
    - sets up the target function and the right hand side of the
      constraints ``G * x <= h``,
    - solves the problem with the GLPK solver of CVXOPT
      (result in ``self.sol``),
    - extracts the classification vector and the maximal dual weights per
      class (``self.max_inner_weight``, ``self.max_outer_weight``,
      ``self.max_weight``) and
    - finally deletes the training data.

    If ``self.outer_complexity`` is *None*, the outer margin is a hard
    margin. The *debug* argument is not evaluated,
    ``self.debug`` is used instead.

    :raises Exception: if the CVXOPT module cannot be imported
    """
    self._log("Preprocessing of 1-Norm RMM")
    self._log("Instances of Class %s: %s, %s: %s" % (
        self.classes[0],
        self.labels.count(self.classes.index(self.classes[0])),
        self.classes[1],
        self.labels.count(self.classes.index(self.classes[1]))))
    # Dimension of the data
    self.num_samples = len(self.samples)
    n = self.num_samples
    e = self.dim*[1.0]
    if self.outer_complexity is None:
        self.outer_soft_margin = False
        # arbitrary value, it is replaced by zeros in the problem matrix
        self.outer_complexity = self.complexity
    else:
        self.outer_soft_margin = True
    # Mapping from class to value of classifier
    self.bi = [-1 if label == 0 else 1 for label in self.labels]
    # Import the necessary optimization module.
    try:
        import cvxopt
        import cvxopt.solvers
    except ImportError:
        raise Exception(
            "Using the 1-Norm-RMM requires the Python CVXOPT module.")
    self._log("optimization preparation")
    # Target function (w+ + w- + \sum t_i)
    # Weighting is done in the inequalities
    c = cvxopt.matrix(e + e + [0] + n*[1.0])
    # First the n classification restrictions
    # then the n restrictions for the outer margin
    # and finally the restrictions for positive variables
    h = numpy.hstack((-numpy.ones(n, 'd'),
                      self.range*numpy.ones(n, 'd'),
                      numpy.zeros(2*self.dim + n)))
    h = cvxopt.matrix(h)
    # Suppress printing of GLPK and cvxopt
    if not self.debug:
        cvxopt.solvers.options['LPX_K_MSGLEV'] = 0
    cvxopt.solvers.options['show_progress'] = self.debug
    # Do the optimization
    # (-c_i x_i,c_i x_i, -c_i,-e_i) * (w+,w-,b,t) \leq -1
    # (c_i x_i,-c_i x_i, c_i,-e_i) * (w+,w-,b,t) \leq R
    # (-w+,-w-,-t) \leq 0
    self._log("Classifier under construction")
    if self.max_time is not None:
        cvxopt.solvers.options['LPX_K_TMLIM'] = int(self.max_time)
    self.sol = cvxopt.solvers.lp(
        c, cvxopt.matrix(self.create_problem_matrix(n, e)), h,
        solver='glpk')
    self._log("Construction complete")
    model = []
    self.calculate_classification_vector(model)
    if self.debug:
        self.calculate_slack_variables(model)

    def class_extremes(dual_solution):
        # Sign the dual variables with the class value, such that the
        # minimum belongs to the first and the maximum to the second class.
        signed = numpy.multiply(numpy.array(dual_solution).T,
                                numpy.array(self.bi))[0]
        return [-signed.min(), signed.max()]

    self.max_inner_weight = class_extremes(self.sol['z'][:n])
    if self.debug:
        print("Maximal used inner weights, depending on class:")
        print(self.max_inner_weight)
    self.max_outer_weight = class_extremes(self.sol['z'][n:2*n])
    if self.debug:
        print("Maximal used outer weights, depending on class:")
        print(self.max_outer_weight)
    self.max_weight = [
        max(self.max_outer_weight[0], self.max_inner_weight[0]),
        max(self.max_outer_weight[1], self.max_inner_weight[1])]
    if self.debug:
        print("Maximal used weights, depending on class:")
        print(self.max_weight)
        print("RMM 1 parameter results:")
        self.print_variables()
        print("%s  vectors of  %s  have been used for the outer margin and "
              % (str(self.outer_margin), str(self.num_samples)))
        numpy.set_printoptions(edgeitems=100, linewidth=75, precision=2,
                               suppress=True, threshold=1000)
        print("%s  are the outer Slack variables." % (self.to,))
        numpy.set_printoptions(edgeitems=3, infstr='Inf', linewidth=75,
                               nanstr='NaN', precision=8, suppress=False,
                               threshold=1000)
        print("The number of support vectors( %s ) can be split into %s"
              "  and  %s for the outer and inner margin support vectors."
              % (self.num_sv, self.num_osv, self.num_isv))
    self.delete_training_data()
# Algorithm to stop the process after a certain time, taken from
# http://pguides.net/python-tutorial/python-timeout-a-function/
# The code is mainly copied from the 1SVM.
def get_solution_with_timeout(self, c, n, e, h):
    """ Solve the linear program but give up after ``self.max_time`` seconds

    A ``SIGALRM`` handler raising a :class:`TimeoutException` is installed
    for the time of the optimization. The solution of the GLPK solver is
    stored in ``self.sol``. If the alarm is triggered before the solver
    returns, ``self.sol`` is set to *None*.

    **Parameters**

        :c: Target function vector of the linear program
        :n: Number of training samples
        :e: Weighting list, forwarded to *create_problem_matrix*
        :h: Right hand side of the inequality constraints

    :raises Exception: if the CVXOPT module cannot be imported
    """
    def timeout_handler(signum, frame):
        raise TimeoutException()

    # Import the necessary optimization module.
    try:
        import cvxopt
        import cvxopt.solvers
    except ImportError:
        raise Exception(
            "Using the 1-Norm-RMM requires the Python CVXOPT module.")
    old_handler = signal.signal(signal.SIGALRM, timeout_handler)
    # signal.alarm only accepts integers
    signal.alarm(int(self.max_time))
    try:
        self.sol = cvxopt.solvers.lp(
            c, cvxopt.matrix(self.create_problem_matrix(n, e)), h,
            solver='glpk')
    except TimeoutException:
        self._log("Classifier construction interrupted!",
                  level=logging.CRITICAL)
        self.sol = None
        return
    finally:
        # Cancel the pending alarm *before* the old handler is restored.
        # Otherwise an alarm firing in between would be delivered to the
        # previous handler.
        signal.alarm(0)
        signal.signal(signal.SIGALRM, old_handler)
    self._log("Finished classifier construction successfully.")
    return
def calculate_slack_variables(self, model):
    """ Calculate slack variables from the given SVM model

    The slack part of the solution is stored in ``self.t``. Afterwards
    each training sample is sorted by its signed classification value
    ``p = y * (w x + b)`` with ``y = 2 * (label - 0.5)``:

    - ``p >= range``: outer support vector. Its outer slack
      (``self.to``) is ``p - range``, if this exceeds the tolerance.
    - ``1 < p < range``: no support vector, both slacks are zero.
    - otherwise: inner support vector. Its inner slack (``self.ti``) is
      ``1 - p``, if this exceeds the tolerance.

    The counters ``num_sv``, ``num_osv``, ``num_isv``, ``inner_margin``
    and ``outer_margin`` are set accordingly.
    The *model* argument is not used.
    """
    self.t = self.sol['x'][2*self.dim+1:]
    upper_bound = self.range
    # differentiation between inner and outer soft margin vectors
    support = 0
    outer_support = 0
    inner_support = 0
    # margin vectors lying not at the border of the margin but inside
    beyond_inner = 0
    beyond_outer = 0
    inner_slack = []
    outer_slack = []
    for index in range(self.num_samples):
        sign = 2*(self.labels[index]-0.5)
        score = (numpy.dot(self.w.T, self.samples[index]))[0]+self.b
        p = sign*score
        if p >= upper_bound:
            support += 1
            outer_support += 1
            inner_slack.append(0)
            if p < 1e-5+upper_bound:
                outer_slack.append(0)
            else:
                outer_slack.append(p-upper_bound)
                beyond_outer += 1
            continue
        # everything below the range has no outer slack
        outer_slack.append(0)
        if p > 1:
            inner_slack.append(0)
            continue
        support += 1
        inner_support += 1
        if (1-p) < 1e-9:
            inner_slack.append(0)
        else:
            inner_slack.append(1-p)
            beyond_inner += 1
    self.num_sv = support
    self.num_osv = outer_support
    self.num_isv = inner_support
    self.inner_margin = beyond_inner
    self.outer_margin = beyond_outer
    self.ti = inner_slack
    self.to = outer_slack
def calculate_classification_vector(self, model):
    """ Copy from Norm1ClassifierNode due to avoid cross importing

    The offset ``self.b`` and the classification vector ``self.w``
    (difference of the two non-negative parts ``w+ - w-``) are read from
    the solution of the solver. If the solution cannot be read, both are
    set to zero and a critical message is logged.

    Afterwards the non-zero weights are collected together with their
    feature names, sorted by absolute value, and stored in
    ``self.print_w``. Their number is stored in
    ``self.num_retained_features`` and in the classifier information.
    The *model* argument is not used.
    """
    printable_w = numpy.zeros((self.dim, 1))
    self.num_retained_features = 0
    try:
        solution = self.sol['x']
        self.b = float(solution[2*self.dim])
        self.w = numpy.array(
            solution[:self.dim] - solution[self.dim:2*self.dim])
    except TypeError:
        # the solver did not deliver a usable solution
        self.b = 0
        self.w = printable_w
        self._log('Classification failed. C=%f' % self.complexity,
                  level=logging.CRITICAL)
    self.features = FeatureVector(
        numpy.atleast_2d(self.w.T).astype(numpy.float64),
        self.feature_names)
    try:
        retained = []
        for index, name in enumerate(self.feature_names):
            if float(self.w[index]) != 0:
                retained.append((float(self.w[index]), name))
        retained.sort(key=lambda pair: abs(pair[0]))
        if len(retained) != 0:
            printable_w = numpy.array(retained, dtype='|S200')
            self.num_retained_features = len(retained)
    except ValueError:
        print('w could not be converted.')
    except IndexError:
        print('There are more feature names than features. Please check your feature generation and input data.')
        self.b = 0
        printable_w = numpy.zeros(self.dim)
        self.w = printable_w
    self.classifier_information["~~Num_Retained_Features~~"] = \
        self.num_retained_features
    self.print_w = printable_w
# ===========================================================================
class SVR2BRMMNode(LibSVMClassifierNode):
    """ Simple wrapper around epsilon-SVR with BRMM parameter mapping

    The BRMM parameters *range* (R) and *complexity* (C) are translated to
    the parameters of the epsilon-SVR::

        epsilon    = (R - 1) / (R + 1)
        complexity = 2 * C / (R + 1)

    All further keyword arguments are forwarded unchanged to the
    :class:`LibSVMClassifierNode`.
    """
    def __init__(self, range=3, complexity=1, **kwargs):
        svr_epsilon = (range-1.0)/(range+1.0)
        svr_complexity = 2.0*complexity/(range+1.0)
        super(SVR2BRMMNode, self).__init__(
            epsilon=svr_epsilon,
            complexity=svr_complexity,
            svm_type='epsilon-SVR',
            **kwargs)
@QNormalParameter("range_", mu=2, sigma=1, q=1)
class RMMClassifierMatlabNode(RegularizedClassifierBase):
    """ Classify with Relative Margin Machine using original matlab code

    This node integrates the "original" Shivaswamy RMM code. This RMM
    is implemented in Matlab and uses the mosek optimization suite.

    For this node to work, make sure that

    1. Matlab is installed.
    2. The pymatlab Python package is installed properly. pymatlab can be
       downloaded from http://pypi.python.org/pypi/pymatlab/0.1.3
       For a MacOS setup guide please see
       https://svn.hb.dfki.de/IMMI-Trac/wiki/pymatlab
    3. mosek is installed and matlab can access it. See http://mosek.com/
       People with university affiliation can request free academic
       licenses within seconds from
       http://license.mosek.com/cgi-bin/student.py

    **References**

        ========= ==================================================
        main
        ========= ==================================================
        author    Shivaswamy, P. K. and Jebara, T.
        title     `Maximum relative margin and data-dependent regularization <http://portal.acm.org/citation.cfm?id=1756031>`_
        journal   Journal of Machine Learning Research
        pages     747-788
        volume    11
        year      2010
        ========= ==================================================

    **Parameters**

        :complexity:
            Complexity sets the weighting of punishment for
            misclassification in comparison to generalizing classification
            from the data. Value in the range from 0 to infinity.

            (*optional, default: 1*)

        :range:
            Defines constraint radius for the outer border of the
            classification. Going to infinity, this classifier will be
            identical to the SVM. This parameter should be always greater
            than one. Going to one, the classifier will be a variant of the
            Regularized Linear Discriminant analysis.

            (*optional, default: 2*)

    .. note:: This classification node doesn't have nice debugging outputs
        as most errors will occur in mosek, i.e., from within the matlab
        session. In the rmm.m matlab code one might want to save the 'res'
        variable as it contains mosek error codes.

    .. note:: This implementation doesn't use class weights, i.e., w=[1,1]
        is fixed.

    **Exemplary Call**

    .. code-block:: yaml

        -
            node : RMMmatlab
            parameters :
                complexity : 1.0
                range : 2.0
                class_labels : ['Standard', 'Target']

    :input:    FeatureVector
    :output:   PredictionVector
    :Author:   David Feess (David.Feess@dfki.de)
    :Revised:  2011/03/10
    """
    def __init__(self, range=2, **kwargs):
        super(RMMClassifierMatlabNode, self).__init__(**kwargs)
        self.set_permanent_attributes(range=range)

    def _stop_training(self, debug=False):
        """ Finish the training, i.e. train the RMM.

        Essentially, the data is passed to matlab, and the classification
        vector w and the offset b are returned. The matlab session is
        closed in any case, even if the training fails.

        :raises Exception: if pymatlab cannot be imported or the matlab
            session cannot be started
        :raises NotImplementedError: if the kernel is not linear
        """
        self._log("Preprocessing of Matlab RMM")
        self._log("Instances of Class %s: %s, %s: %s" % (
            self.classes[0],
            self.labels.count(self.classes.index(self.classes[0])),
            self.classes[1],
            self.labels.count(self.classes.index(self.classes[1]))))
        # Dimension of the data
        self.num_samples = len(self.samples)
        # Mapping from class to value of classifier
        self.bi = [-1 if label == 0 else 1 for label in self.labels]
        self._log("optimization preparation")
        # Import the necessary matlab interface
        try:
            from pymatlab.matlab import MatlabSession
        except ImportError:
            raise Exception("Using the RMMClassifierMatlabNode requires the pymatlab module.")
        try:
            matlab = MatlabSession('matlab -nojvm -nodisplay')
        except OSError:
            raise Exception("pymatlab couldn't start matlab session. Is pymatlab configured properly?")
        try:
            matlab.run('clear')
            # NOTE: this path is relative to the current working directory
            matlab.run("addpath('pySPACE/missions/nodes/classification/svm_variants/')")
            labels = numpy.array(self.bi)[:, numpy.newaxis]*1.0
            if self.kernel_type == "LINEAR":
                train_data = numpy.array(self.samples).T
                matlab.putvalue('labels', labels)
                matlab.putvalue('train_data', train_data)
                # '%.17g' keeps the full precision of the parameters,
                # whereas '%f' would turn small values like 1e-8 into zero
                matlab.run(
                    "[w_rmm,b_rmm] = compute_linear_RMM(labels, train_data, %.17g, %.17g);"
                    % (self.complexity, self.range))
                self.w = matlab.getvalue('w_rmm')
                self.b = matlab.getvalue('b_rmm')
            else:
                # A kernel version would have to pass the kernel matrix
                # of the training samples to matlab.
                raise NotImplementedError(
                    "Nonlinear kernels have not been wrapped, yet. " +
                    "The required matlab implementation is already provided.")
        finally:
            # never leave an orphaned matlab process behind
            matlab.close()
        self._log("Construction complete")
        self.delete_training_data()
# ===========================================================================

@NoOptimizationParameter("version")
@NoOptimizationParameter("kernel_type")
class RmmPerceptronNode(RMM2Node, BaseNode):
    """ Online Learning variants of the 2-norm RMM

    **Parameters**

        .. seealso:: :class:`RMM2Node`

        :co_adaptive:
            Integrate backtransformation into classifier to catch changing
            preprocessing

            (*optional, default: False*)

        :co_adaptive_index:
            Number of nodes to go back

            (*optional, default: numpy.inf*)

        :history_index:
            Number of history index of original data used for co-adaptation

            (*optional, default: 1*)

    **Exemplary Call**

    .. code-block:: yaml

        -
            node : RmmPerceptronNode
            parameters :
                range : 3
                complexity : 1.0
                weight : [1,3]
                class_labels : ['Standard', 'Target']

    :Author: Mario Michael Krell
    :Created: 2014/01/02
    """
    def __init__(self, version="samples", kernel_type='LINEAR',
                 co_adaptive=False, co_adaptive_index=numpy.inf,
                 history_index=1, **kwargs):
        RMM2Node.__init__(self, version=version, kernel_type=kernel_type,
                          **kwargs)
        self.set_permanent_attributes(
            co_adaptive=co_adaptive,
            co_adaptive_index=co_adaptive_index,
            history_index=history_index,
            hist_w=None,
            hist_b=0,
            zero_s=None)

    def _train(self, data, class_label):
        """ Main method for incremental and normal training

        Code is a shortened version from the batch algorithm.
        One update step of the dual variable of the given sample is
        performed, starting from zero. First the update of the inner margin
        variable (alpha) is tried. If this does not result in a positive
        value, the outer margin variable (beta) is updated instead.
        Finally ``w`` and ``b`` are changed accordingly, where in the
        co-adaptive case the change is tracked on the historic data.
        """
        if self.co_adaptive:
            try:
                hist_data = copy.deepcopy(
                    data.history[self.history_index-1])
            except IndexError:
                self._log("No history data available for classifier! " +
                          "Co-adaptivity is turned off.",
                          level=logging.CRITICAL)
                self.co_adaptive = False
        data_array = data.view(numpy.ndarray)
        # shortened initialization part from RegularizedClassifierBase
        if self.feature_names is None:
            try:
                self.feature_names = data.feature_names
            except AttributeError as e:
                warnings.warn("Use a feature generator node before a " +
                              "classification node.")
                raise e
        if self.dim is None:
            self.dim = data.shape[1]
        if class_label not in self.classes and "REST" not in self.classes:
            warnings.warn(
                "Please give the expected classes to the classifier! " +
                "%s unknown. " % class_label +
                "Therefore define the variable 'class_labels' in " +
                "your spec file, where you use your classifier. " +
                "For further info look at the node documentation.")
            if not(len(self.classes) == 2):
                self.classes.append(class_label)
                self.set_permanent_attributes(classes=self.classes)
        # individual initialization of classification vector
        # (the builtin float replaces the removed alias numpy.float)
        if self.w is None:
            self.w = numpy.zeros(self.dim, dtype=float)
        if self.co_adaptive and (self.hist_w is None
                                 or self.zero_s is None):
            for data_type in (FeatureVector, TimeSeries):
                if isinstance(hist_data, data_type):
                    self.hist_w = data_type.replace_data(
                        hist_data, numpy.zeros(hist_data.shape, dtype=float))
                    self.zero_s = data_type.replace_data(
                        hist_data, numpy.zeros(hist_data.shape, dtype=float))
                    break
        if self.samples is None:
            self.samples = ["empty"]  # to suppress logging warning
        # update part init for compatibility with batch mode formulas
        weight = self.weight[self.classes.index(class_label)]
        self.ci = [float(self.complexity * weight)]
        squared_norm = numpy.linalg.norm(data_array)**2.0
        if not self.squ_factor:
            alpha_step = 1.0/(squared_norm + self.offset_factor)
            beta_step = alpha_step
        else:
            alpha_step = 1 / (squared_norm + self.offset_factor
                              + 1 / (2 * self.ci[0]))
            beta_step = 1 / (squared_norm + self.offset_factor
                             + 1 / (2 * self.ci[0]
                                    * self.complexity_correction))
        bi = float(self.classes.index(class_label) * 2 - 1)
        xi = data_array[0, :]
        fi = bi * (dot(xi.T, self.w) + self.b)
        beta = False
        # just used to have the same formulas as in batch version
        old_dual = 0
        current_dual = numpy.zeros((1, 2))
        # update part
        x = old_dual - self.omega * alpha_step * (fi - 1)
        # no update of alpha but update of beta (eventually zero update)
        if not x > 0:
            beta = True
            x = old_dual - self.omega * beta_step * (self.range - fi)
        # map dual solution to the interval [0,C] in L1 case or
        # just make it positive in the L2 case
        if x <= 0:
            current_dual[0] = [0, 0]
        elif not beta and not self.squ_factor:
            current_dual[0, 0] = min(x, self.ci[0])
        elif beta and not self.squ_factor:
            current_dual[0, 1] = min(
                x, self.ci[0] * self.complexity_correction)
        elif not beta and self.squ_factor:
            current_dual[0, 0] = x
        elif beta and self.squ_factor:
            current_dual[0, 1] = x
        # update w and b
        delta = (current_dual[0, 0] + current_dual[0, 1] - old_dual) * bi
        # for beta: difference needed
        if beta:
            delta = -delta
        # update classification function parameter w and b
        self.b += self.offset_factor * delta
        if not self.co_adaptive:
            self.w += delta * xi
        else:
            self.hist_b += delta
            self.hist_w += delta * hist_data
            self.update_from_history()

    def update_from_history(self):
        """ Update w from historic data to catch changing preprocessing """
        temp_w = self.get_previous_execute(
            data=self.hist_w, number=self.co_adaptive_index)
        temp_T = self.get_previous_execute(
            data=self.zero_s, number=self.co_adaptive_index)
        temp_w += (self.hist_b-1.0)*temp_T
        self.w = temp_w.view(numpy.ndarray)[0, :]

    def train(self, data, label):
        """ Prevent RegularizedClassifierBase method from being called

        The sample is directly forwarded to the training of the
        :class:`BaseNode` and the elapsed time is accumulated in the
        classifier information.
        """
        # one vs. REST case
        if "REST" in self.classes and label not in self.classes:
            label = "REST"
        # one vs. one case
        if not self.multinomial and len(self.classes) == 2 and \
                label not in self.classes:
            return
        start_time_stamp = timeit.default_timer()
        BaseNode.train(self, data, label)
        stop_time_stamp = timeit.default_timer()
        time_key = "Training_time(classifier)"
        # 'in' works with every Python version, in contrast to has_key
        if time_key not in self.classifier_information:
            self.classifier_information[time_key] = \
                stop_time_stamp - start_time_stamp
        else:
            self.classifier_information[time_key] += \
                stop_time_stamp - start_time_stamp

    def _inc_train(self, data, label):
        """ Incremental training and normal training are the same """
        # one vs. REST case
        if "REST" in self.classes and label not in self.classes:
            label = "REST"
        # one vs. one case
        if not self.multinomial and len(self.classes) == 2 and \
                label not in self.classes:
            return
        if self.co_adaptive == "double" or not self.co_adaptive:
            self._train(data, label)
        if self.co_adaptive:
            self.update_from_history()

    def _stop_training(self, debug=False):
        """ Do nothing than suppressing mother method

        Only some information on the classifier is stored. Missing
        entries, e.g., because the classifier has not been trained or has
        only one dimension, are skipped.
        """
        self.classifier_information["~~Solver_Iterations~~"] = 0
        try:
            self.classifier_information["~~offset~~"] = self.b
            self.classifier_information["~~w0~~"] = self.w[0]
            self.classifier_information["~~w1~~"] = self.w[1]
        except (AttributeError, IndexError, TypeError):
            # best effort only, but do not hide unrelated errors
            pass
# Mapping from short node names to the corresponding classifier classes
# NOTE(review): presumably evaluated by the pySPACE node lookup to provide
# these alternative names in specification files - confirm.
_NODE_MAPPING = {"1RMM": RMM1ClassifierNode, "2RMM": RMM2Node, "RMMmatlab": RMMClassifierMatlabNode, }