""" Use Weka's Filter that transform one arff file into another.

A WEKA filter process consists of applying a filter to all arff-files contained 
in the input path. Filters may be using (un-)supervised training
on the train datasets. For instance, feature selector filter is trained  on
a train set so that a subset of the features are selected. The results of the
process consists of projecting both the train and the respective test set on the
selected features. The results of all these processes are stored in a temporary
directory and after the completion of all processes of the operation,
the consolidate method of the *WEKAFilterOperation* is executed and
the results are merged into a consistent representation of the operations result
collection.

http://www.cs.waikato.ac.nz/ml/weka/
"""

import sys
import os
import glob
import yaml
import time
import logging
if sys.version_info[0] == 2 and sys.version_info[1] < 6:
    import processing
else:
    import multiprocessing as processing

import pySPACE
from pySPACE.missions.operations.base import Operation, Process
from pySPACE.resources.dataset_defs.base import BaseDataset
from pySPACE.tools.filesystem import create_directory
from pySPACE.tools.filesystem import get_author


class WekaFilterOperation(Operation):
    """ Operation for feature selection using Weka

    A WEKA filter operation consists of a set of WEKA filter processes. Each
    of these processes stores its results in a temporary directory. The
    operation collects the results of these processes using the consolidate
    method that produces a consistent representation of the result
    collections.
    """
    def __init__(self, processes, operation_spec, result_directory,
                 number_processes, create_process=None):
        super(WekaFilterOperation, self).__init__(processes, operation_spec,
                                                  result_directory)
        self.number_processes = number_processes
        self.create_process = create_process
    @classmethod
    def create(cls, operation_spec, result_directory, debug=False,
               input_paths=[]):
        """ A factory method that creates a WEKA operation based on the
        information given in the operation specification *operation_spec*
        """
        assert(operation_spec["type"] == "weka_filter")

        # Determine all parameter combinations that should be tested
        parameter_settings = cls._get_parameter_space(operation_spec)

        if "hide_parameters" in operation_spec:
            hide_parameters = operation_spec["hide_parameters"]
        else:
            hide_parameters = []

        # Read the command template from a file
        template_file = open(os.path.join(pySPACE.configuration.spec_dir,
                                          "operation", "weka_templates",
                                          operation_spec["template"]), 'r')
        command_template = template_file.read()
        template_file.close()

        # Determine the number of processes
        if "runs" in operation_spec:
            number_processes = len(input_paths) * len(parameter_settings) * \
                               operation_spec["runs"]
        else:
            # Approximate the number of processes from the maximal number
            # of runs stored in the input collections' meta data
            runs = []
            for dataset_dir in input_paths:
                collection = BaseDataset.load(dataset_dir)
                runs.append(collection.meta_data["runs"])
            runs = max(runs)
            number_processes = len(input_paths) * len(parameter_settings) * \
                               runs

        if debug:
            # To better debug the creation of processes we don't limit the
            # queue and create all processes before executing them
            processes = processing.Queue()
            cls._createProcesses(processes, result_directory, operation_spec,
                                 parameter_settings, input_paths,
                                 command_template, hide_parameters)
            # Create and return the weka operation object
            operation = cls(processes, operation_spec, result_directory,
                            number_processes)
        else:
            # Create all processes by calling a helper method in a separate
            # process so that already created processes can be executed in
            # parallel. Therefore a queue is used whose size is limited to
            # guarantee that not too many objects are created at once
            # (because this costs memory). However, the actual limit of 4 is
            # arbitrary and might be reviewed.
            processes = processing.Queue(4)
            create_process = processing.Process(target=cls._createProcesses,
                                                args=(processes,
                                                      result_directory,
                                                      operation_spec,
                                                      parameter_settings,
                                                      input_paths,
                                                      command_template,
                                                      hide_parameters))
            create_process.start()
            # Create and return the weka operation object
            operation = cls(processes, operation_spec, result_directory,
                            number_processes, create_process)

        # We remember the command_template and how many parameters are passed
        # for consolidation
        operation.num_parameters = len(operation_spec["parameter_ranges"]) \
                                   - len(hide_parameters)
        operation.command_template = command_template
        return operation
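    # Worked example for the process count above (illustrative numbers):
    # with 2 input collections, 3 parameter settings, and "runs: 5" in the
    # operation spec, create() announces 2 * 3 * 5 = 30 processes. Note that
    # the splits per collection additionally multiply the number of processes
    # actually put on the queue by _createProcesses below.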
    @classmethod
    def _createProcesses(cls, processes, result_directory, operation_spec,
                         parameter_settings, input_collections,
                         command_template, hide_parameters):
        # For each combination of filter, input collection, and run number,
        # create one WEKA process
        for dataset_dir in input_collections:
            collection = BaseDataset.load(dataset_dir)
            # Determine the number of iterations and splits to be used
            iterations = collection.meta_data["runs"]
            splits = collection.meta_data["splits"]
            if "runs" in operation_spec:
                assert(iterations in [1, operation_spec["runs"]])
                iterations = operation_spec["runs"]
            for parametrization in parameter_settings:
                for run_number in range(iterations):
                    for split_number in range(splits):
                        process = WEKAFilterProcess(dataset_dir,
                                                    command_template,
                                                    parametrization,
                                                    run_number,
                                                    split_number,
                                                    result_directory,
                                                    hide_parameters=hide_parameters)
                        processes.put(process)
        # Signal to the executing process that creation is finished
        processes.put(False)
    def consolidate(self):
        """ Consolidates the results obtained by the single WEKA filter
        processes into a consistent summary of datasets that is stored on
        the file system.

        .. todo:: Some of the contents of this method should go into the
                  :class:`~pySPACE.resources.dataset_defs.feature_vector.FeatureVectorDataset`
        """
        # Iterate over all collections and store the collection meta data etc.
        for entries in os.listdir(self.result_directory):
            fullpath = os.path.join(self.result_directory, entries)
            # For each collection
            if os.path.isdir(fullpath):
                if entries.startswith("{"):
                    # Extract the parameters from the collection name in
                    # order to adjust the relation name
                    if self.num_parameters > 0:
                        parameter_strings = entries.strip("}{").split("}{")[-self.num_parameters:]
                        parameter_postfix = "{" + "}{".join(parameter_strings) + "}"
                    else:
                        parameter_strings = ""
                        parameter_postfix = ""
                    # Postprocessing of the arff files of this collection
                    for train_arff_file in glob.glob(fullpath + os.sep
                                                     + "data_run*" + os.sep
                                                     + "*train.arff"):
                        # Adjust the relation name of the train file
                        content = open(train_arff_file, 'r').readlines()
                        # We strip everything after the last "}"
                        endindex = content[0].rfind("}")
                        content[0] = content[0][:endindex+1]
                        content[0] += parameter_postfix + "'"
                        open(train_arff_file, 'w').writelines(content)
                        # Use relation name of train data for test data
                        test_arff_file = train_arff_file.replace("train.arff",
                                                                 "test.arff")
                        test_content = open(test_arff_file, 'r').readlines()
                        test_content[0] = content[0] + "\n"
                        open(test_arff_file, 'w').writelines(test_content)
                        # Check which features are contained in the arff file
                        feature_names = []
                        for line in content:
                            if line.startswith("@attribute"):
                                attribute = line.split()[1]
                                if attribute != "class":
                                    feature_names.append(attribute)
                    # Store the collection meta data etc.
                    if self.num_parameters > 0:
                        input_collection_name = \
                            "{" + "}{".join(entries.strip("}{").split("}{")[:-self.num_parameters]) + "}"
                    else:
                        input_collection_name = entries
                    input_collection_path = os.path.join(self.operation_spec["input_path"],
                                                         input_collection_name)
                    input_collection_meta = BaseDataset.load_meta_data(
                        pySPACE.configuration.storage + os.sep
                        + input_collection_path)
                    # Store the input collection
                    BaseDataset.store_meta_data(fullpath,
                                                input_collection_meta,
                                                file_name="input_metadata.yaml")
                    # Adjust collection metadata for the new collection
                    input_collection_meta["feature_names"] = feature_names
                    input_collection_meta["num_features"] = len(feature_names)
                    input_collection_meta["author"] = get_author()
                    input_collection_meta["date"] = time.strftime("%Y%m%d")
                    input_collection_meta["input_collection_name"] = input_collection_name
                    # Write the collection meta information into the folder
                    BaseDataset.store_meta_data(fullpath, input_collection_meta)
                    # Store the command_template
                    command_template_file = open(os.path.join(fullpath,
                                                              "command_template"), 'w')
                    command_template_file.write(self.command_template)
                    command_template_file.close()
                else:
                    # Training and test arff need the same relation name,
                    # otherwise Weka can't relate them to each other; the
                    # collection name and the parameters in {}{}-notation
                    # must be the relation name for further processing
                    self._log("WARNING: Collection name doesn't begin with "
                              "'{'. Further processing may fail!",
                              level=logging.WARNING)
        # Write the specification of this operation to the result directory
        # in order to make later analysis of results easier
        source_operation_file = open(os.path.join(self.result_directory,
                                                  "source_operation.yaml"),
                                     'w')
        yaml.dump(self.operation_spec, source_operation_file)
        source_operation_file.close()
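    # Illustration of the relation name adjustment in consolidate(), with
    # assumed, purely hypothetical file contents: for a result collection
    # directory named "{input}{__P__:1}" and num_parameters == 1, a train
    # arff whose header line is
    #
    #     @relation 'input{__A__:x}-weka.filters...'
    #
    # is truncated after the last "}" and extended to
    #
    #     @relation 'input{__A__:x}{__P__:1}'
    #
    # The test arff then receives the identical relation line, so that WEKA
    # treats both files as belonging to the same relation.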
class WEKAFilterProcess(Process):
    """ Process for using Weka's filters

    A WEKA filter process consists of applying a filter to all arff files
    contained in the *input_path*. Filters may use (un-)supervised training
    on the train datasets. For instance, the feature selection filter is
    trained on a train set so that a subset of the features is selected.
    The process then projects both the train set and the respective test
    set onto the selected features. The results of all these processes are
    stored in a temporary directory.
    """
    def __init__(self, dataset_dir, command_template, parametrization,
                 run_number, split_number, operation_result_dir,
                 hide_parameters=[]):
        super(WEKAFilterProcess, self).__init__()

        # Determine the directory in which the process' results are stored
        result_collection_name = dataset_dir.split(os.sep)[-2]
        for parameter_name, parameter_value in parametrization.iteritems():
            # If this is a parameter that should not be hidden, then we have
            # to encode it in the result collection name
            if not parameter_name in hide_parameters:
                result_collection_name += "{__%s__:%s}" \
                    % (parameter_name.upper(), parameter_value)
        self.result_directory = os.path.join(operation_result_dir,
                                             result_collection_name)

        # Create directory for intermediate results if it does not exist yet
        create_directory(self.result_directory + os.sep
                         + "data_run%s" % run_number)

        # Create collection
        collection = BaseDataset.load(dataset_dir)

        # The parametrization that is independent of the collection type
        # and the specific weka command template that is executed
        self.params = {"dataset_name": dataset_dir.replace('/', '_'),
                       "dataset_dir": dataset_dir,
                       "run_number": run_number,
                       "split_number": split_number,
                       "weka_class_path": pySPACE.configuration.weka_class_path,
                       "temp_results": self.result_directory}

        # Load the abbreviations
        abbreviations_file = open(os.path.join(pySPACE.configuration.spec_dir,
                                               'operations/weka_templates',
                                               'abbreviations.yaml'), 'r')
        self.abbreviations = yaml.load(abbreviations_file)
        # Add custom parameters for the weka command template
        for parameter_name, parameter_value in parametrization.iteritems():
            # Auto-expand abbreviations
            if parameter_value in self.abbreviations:
                parameter_value = self.abbreviations[parameter_value]
            self.params[parameter_name] = parameter_value

        # Build the WEKA command by repeatedly replacing all placeholders in
        # the template
        while True:
            instantiated_template = command_template % self.params
            if instantiated_template == command_template:
                # All placeholders replaced
                self.weka_command = instantiated_template
                break
            else:
                # We have to continue since we have not yet converged
                command_template = instantiated_template

        self.handler_class = None
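    # Illustration of the fixed-point substitution in __init__, with a
    # hypothetical template and parameter: given
    #     params = {"weka_class_path": "/opt/weka",
    #               "filter": "-cp %(weka_class_path)s ..."}
    # a template containing "%(filter)s" needs two passes: the first pass
    # replaces %(filter)s but thereby introduces a new %(weka_class_path)s
    # placeholder, which only the second pass resolves; the loop stops once
    # a pass changes nothing.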
    def __call__(self):
        """ Executes this process on the respective modality """
        # Restore configuration
        pySPACE.configuration = self.configuration

        ############## Prepare benchmarking ##############
        super(WEKAFilterProcess, self).pre_benchmarking()

        # Execute the java command in this OS process
        os.system(self.weka_command)

        ############## Clean up after benchmarking ##############
        super(WEKAFilterProcess, self).post_benchmarking()
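# A minimal usage sketch (assumptions: pySPACE.configuration has been loaded,
# and the spec dictionary, result directory, and input path below are purely
# illustrative; operations are normally launched via pySPACE's launch
# scripts rather than instantiated directly):
#
#     from pySPACE.missions.operations.weka_filter import WekaFilterOperation
#
#     operation = WekaFilterOperation.create(
#         operation_spec,                          # dict from a YAML spec
#         result_directory="/tmp/weka_results",    # hypothetical path
#         input_paths=["storage/my_collection/"])  # hypothetical collection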