""" Classification using the WEKA experimenter
A WEKA classification process consists of executing a certain
WEKA experiment. The results of all these processes are stored in a temporary
directory and after the completion of all processes of the operation,
the consolidate method of the *WekaClassificationOperation* is executed and the
results are merged into a consistent representation of the operation's result
collection.
http://www.cs.waikato.ac.nz/ml/weka/
"""
import sys
import os
import glob
import yaml
if sys.version_info[0] == 2 and sys.version_info[1] < 6:
import processing
else:
import multiprocessing as processing
import pySPACE
from pySPACE.missions.operations.base import Operation, Process
from pySPACE.resources.dataset_defs.base import BaseDataset
from pySPACE.resources.dataset_defs.performance_result import PerformanceResultSummary
from pySPACE.tools.filesystem import create_directory
class WekaClassificationOperation(Operation):
    """ Operation for classification using Weka experimenter

    A Weka classification operation consists of a set of WEKA processes. Each
    of these processes consists of executing a certain WEKA experiment.

    The results of this operation are collected using the
    consolidate method that produces a consistent representation
    of the result collections.
    """

    def __init__(self, processes, operation_spec, result_directory,
                 number_processes, create_process=None):
        """ Store the process queue and the bookkeeping of the operation

        **Parameters**

            :processes: queue that holds (or will be filled with) the
                        processes of this operation
            :operation_spec: specification dictionary of the operation
            :result_directory: directory in which the results are stored
            :number_processes: (estimated) number of processes
            :create_process: the separate process that fills *processes*,
                             or None if the queue has been filled already
        """
        super(WekaClassificationOperation, self).__init__(processes,
                                                          operation_spec,
                                                          result_directory)
        self.create_process = create_process
        self.number_processes = number_processes

    @classmethod
    def create(cls, operation_spec, result_directory, debug=False,
               input_paths=None):
        """ Factory method that creates a WEKA operation

        The operation is built from the information given in the
        operation specification *operation_spec*.

        **Parameters**

            :operation_spec: specification dictionary; its "type" has to be
                             "weka_classification" and its "template" names
                             the command template file. An optional "runs"
                             entry fixes the number of runs.
            :result_directory: directory in which the results are stored
            :debug: if true, all processes are created in the calling
                    process and put into an unlimited queue; otherwise they
                    are created by a separate process that feeds a queue
                    limited to 100 entries
            :input_paths: list of dataset directories (default: no inputs)

        Returns the created operation object.
        """
        assert operation_spec["type"] == "weka_classification", \
            "Operation type has to be 'weka_classification'"
        # None instead of a shared mutable list as default argument
        if input_paths is None:
            input_paths = []

        # Determine all parameter combinations that should be tested
        parameter_settings = cls._get_parameter_space(operation_spec)

        # Read the command template from a file.
        # try/finally instead of "with", since this module still supports
        # Python < 2.6 (see the import of "processing" at module level).
        template_file = open(os.path.join(pySPACE.configuration.spec_dir,
                                          "operations",
                                          "weka_templates",
                                          operation_spec["template"]),
                             'r')
        try:
            command_template = template_file.read()
        finally:
            template_file.close()

        # number of processes
        if "runs" in operation_spec:
            runs = operation_spec["runs"]
        else:  # approximate the number of processes
            dataset_runs = [BaseDataset.load(dataset_dir).meta_data["runs"]
                            for dataset_dir in input_paths]
            # max() of an empty list raises a ValueError, so operations
            # without input are handled explicitly
            runs = max(dataset_runs) if dataset_runs else 0
        number_processes = len(input_paths) * len(parameter_settings) * runs

        if debug:
            # To better debug creation of processes we don't limit the queue
            # and create all processes before executing them
            processes = processing.Queue()
            cls._createProcesses(processes, result_directory, operation_spec,
                                 parameter_settings, input_paths,
                                 command_template)
            # create and return the weka operation object
            return cls(processes, operation_spec, result_directory,
                       number_processes)

        # Create all processes by calling a helper method in another
        # process so that already created processes can be executed in
        # parallel. Therefore a queue is used whose size is limited to
        # guarantee that not too many objects are created (because this
        # costs memory). However, the actual number of 100 is arbitrary
        # and might be reviewed.
        processes = processing.Queue(100)
        create_process = processing.Process(
            target=cls._createProcesses,
            args=(processes, result_directory, operation_spec,
                  parameter_settings, input_paths, command_template))
        create_process.start()
        # create and return the weka operation object
        return cls(processes, operation_spec, result_directory,
                   number_processes, create_process)

    @classmethod
    def _createProcesses(cls, processes, result_directory, operation_spec,
                         parameter_settings, input_collections,
                         command_template):
        """ Fill the queue *processes* with all WEKA processes

        For each combination of input collection, parametrization and
        run number one *WEKAClassificationProcess* is put into the queue.
        Afterwards, False is put into the queue as end marker.
        """
        try:
            for dataset_dir in input_collections:
                collection = BaseDataset.load(dataset_dir)
                # Determine the number of iterations and splits to be used
                iterations = collection.meta_data["runs"]
                splits = collection.meta_data["splits"]
                if "runs" in operation_spec:
                    assert iterations in [1, operation_spec["runs"]], \
                        "Runs of %s do not fit to the specified runs" \
                        % dataset_dir
                    iterations = operation_spec["runs"]
                if "cv_folds" in operation_spec:
                    assert splits in [1, operation_spec["cv_folds"]], \
                        "Splits of %s do not fit to the specified cv_folds" \
                        % dataset_dir
                    splits = operation_spec["cv_folds"]

                for parametrization in parameter_settings:
                    for run_number in range(iterations):
                        process = WEKAClassificationProcess(dataset_dir,
                                                            command_template,
                                                            parametrization,
                                                            splits,
                                                            run_number,
                                                            result_directory)
                        processes.put(process)
        finally:
            # Give the executing process the sign that creation is now
            # finished. This is done even if the creation failed, so that
            # the end marker is never missing from the queue; the error
            # itself is still raised.
            processes.put(False)

    def consolidate(self):
        """ Consolidate the results of the single WEKA processes

        The results are merged into a consistent structure of collections
        that is stored in the result directory on the file system.
        Additionally, the specification of this operation is written to
        the file *source_operation.yaml* in the same directory.
        """
        self._log("Consolidating results ...")
        # We load and store the results once into a PerformanceResultSummary.
        # From_multiple csv does the necessary consolidation
        # and mixes and parses the table.
        self._log("Reading intermediate results...")
        result_collection = PerformanceResultSummary(
            dataset_dir=self.result_directory)
        self._log("done")
        self._log("Storing result collection")
        result_collection.store(self.result_directory)
        self._log("done")

        # Write the specification of this operation
        # to the result directory in order to make later
        # analysis of results more easy
        source_operation_file = open(os.path.join(self.result_directory,
                                                  "source_operation.yaml"),
                                     'w')
        try:
            yaml.dump(self.operation_spec, source_operation_file)
        finally:
            source_operation_file.close()
class WEKAClassificationProcess(Process):
    """ Process for classification using Weka

    A WEKA classification process consists of executing a certain WEKA
    experiment. This experiment is defined by a template in which certain
    aspects can be configured, for instance:

        * which classifier is used
        * which data set is processed
        * how many cross validation folds are used etc.

    The results of the WEKA experiment are written to the file system and
    later on collected and consolidated during the consolidation of
    the *WekaClassificationOperation*.
    """
    # Class-wide counter: its current value is handed to the command
    # template as "unique_id" and it is incremented for every new instance
    unique_id = 0

    def __init__(self,
                 dataset_dir,
                 command_template,
                 parametrization,
                 cv_folds,
                 run_number,
                 operation_result_dir):
        """ Build the WEKA command that is executed by this process

        **Parameters**

            :dataset_dir: directory of the dataset that is processed
            :command_template: template string of the WEKA command with
                               ``%(name)s``-style placeholders
            :parametrization: dictionary of custom parameters for the
                              template; values found in the abbreviations
                              file are expanded
            :cv_folds: number of cross validation folds
            :run_number: number of the run this process belongs to
            :operation_result_dir: directory in which the results are stored

        Raises a NotImplementedError if the dataset neither provides a
        train-test structure nor more than one split.
        """
        super(WEKAClassificationProcess, self).__init__()

        # Load the abbreviations.
        # try/finally instead of "with", since this module still supports
        # Python < 2.6 (see the import of "processing" at module level).
        abbreviations_file = open(os.path.join(pySPACE.configuration.spec_dir,
                                               'operations/weka_templates',
                                               'abbreviations.yaml'), 'r')
        try:
            # The explicit Loader keeps the behavior of the former default
            # and is mandatory for PyYAML >= 6.
            # NOTE(review): yaml.Loader may construct arbitrary objects; the
            # file comes from the local spec directory. If it only contains
            # a plain mapping, yaml.safe_load could be used instead.
            # An empty file is parsed to None and treated as "no
            # abbreviations".
            self.abbreviations = yaml.load(abbreviations_file,
                                           Loader=yaml.Loader) or {}
        finally:
            abbreviations_file.close()

        # Determine the directory in which the process' results
        # are stored
        self.result_directory = operation_result_dir

        # Create collection
        collection = BaseDataset.load(dataset_dir)

        # The parametrization that is independent of the collection type
        # and the specific weka command template that is executed
        self.params = {
            # last path component of the dataset directory
            "collection_name": dataset_dir.strip(os.sep).split(os.sep)[-1],
            "run_number": run_number,
            "cv_folds": cv_folds,
            "weka_class_path": pySPACE.configuration.weka_class_path,
            "temp_results": self.result_directory,
            "unique_id": WEKAClassificationProcess.unique_id}

        # Collection dependent parameters
        if not collection.meta_data["train_test"] \
                and collection.meta_data["splits"] == 1:
            raise NotImplementedError()

        # The pattern of the train and test files generated by
        # crossvalidation
        data_pattern = os.path.join(dataset_dir,
                                    collection.meta_data["data_pattern"])
        # One example arff file in which WEKA can look up relation name etc.
        sample_dataset = data_pattern.replace("_run", "_run0")\
                                     .replace("_sp_", "_sp0_")\
                                     .replace("_tt", "_train")
        self.params.update({"sample_dataset": sample_dataset,
                            "data_pattern": data_pattern})

        # Add custom parameters for the weka command template
        # (items() instead of iteritems() works with Python 2 and 3)
        for parameter_name, parameter_value in parametrization.items():
            # The unexpanded value stays available as "<name>_abbr"
            self.params[parameter_name + "_abbr"] = parameter_value
            # Auto-expand abbreviations
            if parameter_value in self.abbreviations:
                parameter_value = self.abbreviations[parameter_value]
            elif parameter_name == 'classifier':
                import warnings
                warnings.warn("Did not find classifier abbreviation %s. "
                              " Expecting full name." % parameter_value)
            self.params[parameter_name] = parameter_value

        # Build the WEKA command by repeatedly replacing all placeholders in
        # the template, since inserted values may contain placeholders
        # themselves
        while True:
            instantiated_template = command_template % self.params
            if instantiated_template == command_template:
                # All placeholders replaced
                break
            # We have to continue since we are not converged
            command_template = instantiated_template
        self.weka_command = instantiated_template

        self.handler_class = None

        WEKAClassificationProcess.unique_id += 1

    def __call__(self):
        """ Executes this process on the respective modality

        The WEKA command is executed via the shell between the pre- and
        post-benchmarking hooks of the base class.
        """
        # Restore configuration
        # (self.configuration is not set in this class; presumably it is
        # provided by the base class -- TODO confirm)
        pySPACE.configuration = self.configuration

        ############## Prepare benchmarking ##############
        super(WEKAClassificationProcess, self).pre_benchmarking()

        # Execute the java command in this OS process.
        # The exit status returned by os.system is not evaluated.
        os.system(self.weka_command)

        ############## Clean up after benchmarking ##############
        super(WEKAClassificationProcess, self).post_benchmarking()