""" Feature selection based on the RELIEF algorithm """
import os
import random
import warnings
import cPickle
from collections import defaultdict
import numpy
import heapq
from pySPACE.missions.nodes.base_node import BaseNode
from pySPACE.resources.data_types.feature_vector import FeatureVector
from pySPACE.tools.filesystem import create_directory
[docs]class ReliefFeatureSelectionNode(BaseNode):
""" Feature selection based on the RELIEF algorithm
Feature selection based on the RELIEF algorithm. A feature is preferred
if instances of the same class (hits) are comparatively close to each other
compared to instances of the other class (misses) in the feature dimension.
Please refer to "Estimating Attributes: Analysis and Extensions of RELIEF"
by Kononenko for more information.
**Parameters**
:num_retained_features:
The number of features that should be retained by the node. This
information must be specified if selected_features_path is not
specified.
(*optional, default: None*)
:selected_features_path:
An absolute path from which the selected features are loaded. If
not specified, these features are learned from the training data.
In this case, num_retained_features must be specified.
(*optional, default: None*)
:k:
The number of nearest neighbors that are considered when computing
the closest hits and misses. Defaults to 1.
(*optional, default: 1*)
**Exemplary Call**
.. code-block:: yaml
-
node : ReliefFeatureSelection
parameters :
num_retained_features : 100
k : 10
:Author: Jan Hendrik Metzen (jhm@informatik.uni-bremen.de)
:Created: 2010/07/12
"""
[docs] def __init__(self, num_retained_features=None, selected_features_path=None,
k=1, **kwargs):
# Must be set before constructor of superclass is set
self.trainable = (selected_features_path == None)
super(ReliefFeatureSelectionNode, self).__init__(**kwargs)
retained_feature_indices = None
# Load patterns from file if requested
if selected_features_path != None:
features_file = open(selected_features_path, 'r')
retained_feature_indices = cPickle.load(features_file)
if num_retained_features is not None:
if len(retained_feature_indices) > num_retained_features:
retained_feature_indices = retained_feature_indices[0:num_retained_features]
elif len(retained_feature_indices) < num_retained_features:
warnings.warn("Only %s features available, cannot retain "
"%s features!" % (len(retained_feature_indices),
num_retained_features))
features_file.close()
self.set_permanent_attributes(
retained_feature_indices = retained_feature_indices,
num_retained_features = num_retained_features,
k = k,
class_data = defaultdict(list),
feature_names = None)
[docs] def is_trainable(self):
""" Returns whether this node is trainable """
return self.trainable
[docs] def is_supervised(self):
""" Returns whether this node requires supervised training """
return True
[docs] def _train(self, data, label):
""" Add given data point along with its label to the training set. """
assert isinstance(data, FeatureVector), "Relief feature selection " \
"requires that its input node outputs feature vectors!"
# Gather feature vectors and labels in two lists
self.class_data[label].append(data[0])
if self.feature_names == None:
self.feature_names = data.feature_names
[docs] def _stop_training(self, debug=False):
""" Called automatically at the end of training
Computes a ranking of features and stores
a list of the indices of those feature that should be retained
"""
assert (len(self.class_data.keys()) == 2),\
"Relief supports only 2-class problems!"
# Generate data structures for searching class conditioned approximate
# nearest neighbors
for class_label, instances in self.class_data.iteritems():
# K needs to be decreased if not enough data is available
self.k = min(self.k, len(instances)-1)
# Mapping from class label to the other class' label
other_class = {self.class_data.keys()[0]: self.class_data.keys()[1],
self.class_data.keys()[1]: self.class_data.keys()[0]}
# Compute the features' weights (its quality).
# The quality of a feature is the better the smaller the
# instance's distance in this feature dimension is to the
# closest hit and the larger the distance to the closest miss
weights = numpy.zeros(self.class_data.values()[0][0].shape)
instance_counter = 0
for class_label, instances in self.class_data.iteritems():
instance_counter += len(instances)
# For all instances of the given class
for instance in instances:
other_class_label = other_class[class_label]
# Compute k - nearest neighbors within the same class
for hit in self._search_k_nearest_neighbors(instance, class_label,
self.k + 1)[1:]:
# Subtract distance from instance to hit from the weights
# (normalized by the number of neighbors)
weights -= numpy.absolute(instance - hit)/self.k
# Compute k - nearest neighbors in the other class
for miss in self._search_k_nearest_neighbors(instance,
other_class_label,
self.k):
# Add distance from instance to miss to the weights
# (normalized by the number of neighbors)
weights += numpy.absolute(instance - miss)/self.k
weights = weights / instance_counter
self.retained_feature_indices = \
numpy.argsort(-weights)[:self.num_retained_features]
self.class_data = defaultdict(list) # Get rid of stored training data
[docs] def _execute(self, feature_vector):
""" Projects the feature vector onto the retained features """
# Project the features onto the selected subspace
proj_features = feature_vector[:,self.retained_feature_indices]
# Update the feature_names list
feature_names = [feature_vector.feature_names[index]
for index in self.retained_feature_indices]
# Create feature vector instance
projected_feature_vector = FeatureVector(proj_features,
feature_names)
return projected_feature_vector
[docs] def _search_k_nearest_neighbors(self, instance, class_label, k):
# Search the k training examples of class class_label that are most
# similar to instance
heap = []
for example in self.class_data[class_label]:
# Sort based on distance of example to instance (1-norm)
distance = numpy.linalg.norm(instance - example, ord=2)
heapq.heappush(heap, (distance, random.random(), example)) # break ties randomly
assert len(heap) >= k
return [heapq.heappop(heap)[2] for i in range(k)]
[docs] def store_state(self, result_dir, index=None):
""" Stores this node in the given directory *result_dir* """
if self.store:
node_dir = os.path.join(result_dir, self.__class__.__name__)
create_directory(node_dir)
# This node only stores the order of the selected features' indices
name = "%s_sp%s.pickle" % ("selected_features", self.current_split)
result_file = open(os.path.join(node_dir, name), "wb")
result_file.write(cPickle.dumps(self.retained_feature_indices,
protocol=2))
result_file.close()
# Store feature names
name = "feature_names_sp%s.txt" % self.current_split
result_file = open(os.path.join(node_dir, name), "w")
result_file.write("%s" % self.feature_names)
result_file.close()