""" Define train and test data for One versus Rest or Rest versus One in cross validation fashion
For every dataset in the *input_path*, the result summary of this operation
contains one dataset that uses the data of that dataset as test data
and the data of all other datasets as training data. For instance, if
the input consists of the three datasets "A", "B", "C", the result summary
will contain the 3 datasets "Rest_vs_A", "Rest_vs_B", and "Rest_vs_C".
The result dataset "Rest_vs_A" uses the data from dataset "A" as test data
and the data from all other datasets as train data. If *reverse* is True,
the result summary instead contains the 3 datasets "A_vs_Rest", "B_vs_Rest",
and "C_vs_Rest".
Specification file Parameters
+++++++++++++++++++++++++++++
type
----
Has to be set to *merge* to use this operation!
(*obligatory, merge*)
input_path
----------
The input path of this operation has to contain several datasets of one of
the types :mod:`~pySPACE.resources.dataset_defs.time_series` or
:mod:`~pySPACE.resources.dataset_defs.feature_vector`.
The input datasets must not contain split data.
(*obligatory*)
name_pattern
------------
String to customize 'Rest' in the name of the result dataset.
(*optional, default: 'Rest'*)
reverse
-------
Switch to use the *One_vs_Rest* scheme instead of the *Rest_vs_One* scheme
(*optional, default: False*)
set_flag
--------
If *set_flag* is True, the first time series object merged from a different
dataset gets a flag. This is only done if the data is stored in pickle
format.
(*optional, default: True*)
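
The flag is stored under the key ``new_set`` in the ``specs`` dictionary of
each :class:`~pySPACE.resources.data_types.time_series.TimeSeries` object.
A minimal sketch of how it could be read back from a merged dataset
(the path below is purely hypothetical):

.. code-block:: python

    from pySPACE.resources.dataset_defs.base import BaseDataset

    merged = BaseDataset.load("/path/to/merge_result/Rest_vs_A")  # hypothetical path
    for time_series, label in merged.get_data(0, 0, "train"):
        if (time_series.specs or {}).get("new_set", False):
            print("first window of a newly appended dataset")
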
collection_constraints
----------------------
Optionally, constraints can be passed to the operation that specify which
datasets are used as training data for which test data. For instance, the
constraint '"%(source_train_collection_name)s".strip("}{").split("}{")[1:] ==
"%(source_test_collection_name)s".strip("}{").split("}{")[1:]' would cause
that only datasets are combined that were created by the same processing
with the same parametrization.
(*optional, default: []*)
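
Internally, each constraint string is filled with the two collection names
via ``%``-substitution and evaluated with ``eval``; a pair of datasets is
only combined if every constraint evaluates to True. A minimal sketch of
this check (the collection names are made up for illustration):

.. code-block:: python

    constraint = ('"%(source_train_collection_name)s".strip("}{").split("}{")[1:] == '
                  '"%(source_test_collection_name)s".strip("}{").split("}{")[1:]')
    names = {"source_train_collection_name": "{B}{__Lowpass__}",
             "source_test_collection_name": "{A}{__Lowpass__}"}
    combine = eval(constraint % names)  # True: same processing parameters
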
Exemplary Call
++++++++++++++
.. code-block:: yaml
type: merge
input_path: "operation_results/2009_8_13_15_8_57"
reverse: False
collection_constraints:
# Combine only collections that have been created using the same parameterization
- '"%(source_train_collection_name)s".strip("}{").split("}{")[1:] == "%(source_test_collection_name)s".strip("}{").split("}{")[1:]'
.. todo:: When applying a rewindowing on merged time series data with the
:class:`~pySPACE.missions.nodes.source.time_series_source.TimeSeries2TimeSeriesSourceNode`
          and specifying an end marker in the windower spec file, only data from
the first set might be used. Here, an additional marker handling
          could be implemented, e.g., deleting the intermediate start and end
          markers ('S 8', 'S 9') to obtain one merged dataset with a defined
          start and end point, or adding synthetic ones.
Alternatively the :mod:`~pySPACE.missions.support.windower` could
be modified, e.g., to handle multiple start and end markers.
The marker information is stored in the `marker_name` variable
of the :class:`~pySPACE.resources.data_types.time_series.TimeSeries`
objects.
"""
import os
import sys
import glob
import shutil
import time
if sys.version_info[0] == 2 and sys.version_info[1] < 6:
import processing
else:
import multiprocessing as processing
import logging
import pySPACE
from pySPACE.missions.operations.base import Operation, Process
from pySPACE.tools.filesystem import create_directory
from pySPACE.tools.filesystem import get_author
from pySPACE.resources.dataset_defs.base import BaseDataset
class MergeOperation(Operation):
""" Operation to create 'All_vs_One' datasets """
    def __init__(self, processes, operation_spec, result_directory,
number_processes, create_process=None):
super(MergeOperation, self).__init__(processes, operation_spec,
result_directory)
self.number_processes = number_processes
self.create_process = create_process
@classmethod
    def create(cls, operation_spec, result_directory, debug=False, input_paths=[]):
""" [factory method] Create a MergeOperation object.
A factory method that creates a MergeOperation based on the
information given in the operation specification operation_spec
"""
assert(operation_spec["type"] == "merge")
# Determine constraints for collections that are combined
collection_constraints = []
if "collection_constraints" in operation_spec:
collection_constraints.extend(operation_spec["collection_constraints"])
reverse = operation_spec.get("reverse", False)
set_flag = operation_spec.get("set_flag", True)
name_pattern = operation_spec.get("name_pattern", "Rest")
# merging is not distributed over different processes
number_processes = 1
processes = processing.Queue()
# Create the Merge Process
cls._createProcesses(processes, input_paths, result_directory,
collection_constraints, reverse, set_flag, name_pattern)
# create and return the merge operation object
return cls(processes, operation_spec, result_directory, number_processes)
@classmethod
    def _createProcesses(cls, processes, input_collections, result_directory,
collection_constraints, reverse, set_flag, name_pattern):
"""[factory method] Create the MergeProcess object."""
# Create the merge process and put it in the execution queue
processes.put(MergeProcess(input_collections, result_directory,
collection_constraints, reverse, set_flag, name_pattern))
        # signal the executing process that creation is finished
processes.put(False)
    def consolidate(self):
""" Consolidation of the operation's results """
# Just do nothing
pass
class MergeProcess(Process):
""" Create 'All_vs_One' collections where 'All' are all collections that fulfill the *collection_constraints* and are different from the "One" collection
Restricted to pickle and arff files!
.. todo:: Merging of csv-files?
"""
    def __init__(self, input_collection, result_directory,
collection_constraints, reverse, set_flag, name_pattern):
super(MergeProcess, self).__init__()
self.input_collections = input_collection
self.result_directory = result_directory
self.collection_constraints = collection_constraints
self.reverse = reverse
self.set_flag = set_flag
self.name_pattern = name_pattern
    def __call__(self):
""" Executes this process on the respective modality """
############## Prepare benchmarking ##############
super(MergeProcess, self).pre_benchmarking()
# For all input collections
for source_test_collection_path in self.input_collections:
            # Check that the input data is not split,
            # i.e. there is only a single test file in the source directory
source_files = glob.glob(os.sep.join([source_test_collection_path,
"data_run0", "*test*"]))
splitted = len(source_files) > 1
assert(not splitted)
source_file_name = str(source_files[-1])
# check if train sets are also present
train_data_present = len(glob.glob(os.sep.join(
[source_test_collection_path,"data_run0",\
"*train*"]))) > 0
# if training data is present -> use train and test sets separately
if train_data_present:
train_set_name_suffix = "train"
else:
train_set_name_suffix = "test"
# We create the collection Rest_vs_Collection
source_test_collection_name = \
source_test_collection_path.split(os.sep)[-2]
test_base_collection_name = \
source_test_collection_name.strip("}{").split("}{")[0]
if self.reverse:
target_collection_name = source_test_collection_name.replace(
test_base_collection_name,
test_base_collection_name + "_vs_" + self.name_pattern)
key = "train"
else:
target_collection_name = source_test_collection_name.replace(
test_base_collection_name,
self.name_pattern + "_vs_" + test_base_collection_name)
key = "test"
target_collection_path = os.sep.join([self.result_directory,
target_collection_name])
# determine the parameter_settings of the test collection
test_collection = BaseDataset.load(source_test_collection_path)
target_collection_params = \
test_collection.meta_data["parameter_setting"]
target_collection_params["__INPUT_DATASET__"] = \
{key: source_test_collection_name}
if source_file_name.endswith("arff"):
file_ending = "arff"
# Copy arff file from input collection to target collection
source_test_file_path = os.sep.join([source_test_collection_path,
"data_run0","features_sp0" +
train_set_name_suffix + ".arff"])
target_test_file_path = os.sep.join([target_collection_path,
"data_run0","features_sp0_"+key+".arff"])
else:
file_ending = source_file_name.split(".")[-1]
source_test_file_path = source_test_collection_path
target_test_file_path = target_collection_path
source_train_pathes = []
for source_train_collection_path in self.input_collections:
source_train_collection_name = \
source_train_collection_path.split(os.sep)[-2]
# We must not use data originating from the same input
# collection both in train and test files
if source_test_collection_name == source_train_collection_name:
continue
# Check that all constraints are fulfilled for this pair of
# input collections
if not all(eval(constraint_template % \
{'source_train_collection_name': source_train_collection_name,
'source_test_collection_name': source_test_collection_name})
for constraint_template in self.collection_constraints):
continue
# check if all parameters are stored in the target path
source_collection = \
BaseDataset.load(source_train_collection_path)
source_collection_params = \
source_collection.meta_data["parameter_setting"]
remaining_params = \
[param for param in source_collection_params.items() \
if param not in target_collection_params.items() and \
param[0] not in ["__INPUT_DATASET__",
"__RESULT_DIRECTORY__", "__OUTPUT_BUNDLE__",
"__INPUT_COLLECTION__" ]] # for old data
if remaining_params != []:
for k,v in remaining_params:
target_collection_path += "{%s#%s}" % (k,str(v))
target_collection_params[k]=v
if "arff" == file_ending:
source_train_file_path = \
os.sep.join([source_train_collection_path,
"data_run0", "features_sp0_" + \
train_set_name_suffix + ".arff"])
else:
source_train_file_path = source_train_collection_path
source_train_pathes.append(source_train_file_path)
if "arff" == file_ending:
target_train_file_path = os.sep.join([target_collection_path,
"data_run0","features_sp0_"+key+".arff"])
else:
target_train_file_path = target_collection_path
if len(source_train_pathes) == 0:
continue
create_directory(os.sep.join([target_collection_path,
"data_run0"]))
if "arff" == file_ending:
self._copy_arff_file(source_test_file_path,
target_test_file_path,
source_test_collection_name,
target_collection_name)
self._merge_arff_files(target_train_file_path,
source_train_pathes,
target_collection_name)
# Copy metadata.yaml
# TODO: Adapt to new collection
input_meta = BaseDataset.load_meta_data(source_test_collection_path)
BaseDataset.store_meta_data(target_collection_path,input_meta)
else:
self._copy_file(source_test_collection_path,
target_collection_path,
train_set_name_suffix)
self._merge_files(target_train_file_path,
source_train_pathes,
train_set_name_suffix,
target_collection_params)
############## Clean up after benchmarking ##############
super(MergeProcess, self).post_benchmarking()
    def _merge_arff_files(self, target_arff_file_path, merge_arff_file_pathes,
target_collection_name):
""" Copy the instances from the merge arff files to the target arff file"""
source_train_collection_name = merge_arff_file_pathes[0].split(os.sep)[-4]
self._copy_arff_file(merge_arff_file_pathes[0], target_arff_file_path,
source_train_collection_name, target_collection_name)
# Open target file for appending new instances
target_file = open(target_arff_file_path, 'a')
# Append all instances contained in the extension arff files to the
# target arff file
for merge_arff_file_path in merge_arff_file_pathes[1:]:
merge_arff_file = open(merge_arff_file_path, 'r')
target_file.writelines(line for line in merge_arff_file.readlines()
if not line.startswith('@'))
merge_arff_file.close()
target_file.close()
    def _copy_arff_file(self, input_arff_file_path, target_arff_file_path,
input_collection_name, target_collection_name):
""" Copy the arff files and adjust the relation name in the arff file"""
        # avoid shadowing the built-in ``file`` name
        arff_file = open(input_arff_file_path, 'r')
        content = arff_file.readlines()
        arff_file.close()
        content[0] = content[0].replace(input_collection_name,
                                        target_collection_name)
        arff_file = open(target_arff_file_path, 'w')
        arff_file.writelines(content)
        arff_file.close()
    def _merge_files(self, target_collection_path, source_collection_pathes,
train_set_name_suffix, target_collection_params):
""" Merge all collections in source_collection_pathes and store them \
in the target collection
**Parameters**
:target_collection_path:
Path of the dataset, in which the data of all other datasets
is assembled.
:source_collection_pathes:
Paths of the datasets to be merged.
:train_set_name_suffix:
Either 'train' or 'test'. Specifies if datasets are merged for
training or testing.
:target_collection_params:
Dictionary with all the parameters of the target dataset.
"""
# load a first collection, in which the data of all other collections
# is assembled
target_collection = BaseDataset.load(source_collection_pathes[0])
author = get_author()
date = time.strftime("%Y%m%d_%H_%M_%S")
# Delete node_chain file name
try:
target_collection.meta_data.pop("node_chain_file_name")
        except KeyError:
pass
# Update meta data and store it
k = "test" if self.reverse else "train"
target_collection_params["__INPUT_DATASET__"][k] = \
[s_c_p.split(os.sep)[-2] for s_c_p in source_collection_pathes]
target_collection_params["__RESULT_DIRECTORY__"] = self.result_directory
target_collection.meta_data.update({
"author" : author,
"date" : date,
"dataset_directory" : target_collection_path,
"train_test" : True,
"parameter_setting" : target_collection_params,
"input_collection_name" : source_collection_pathes[0][len(
pySPACE.configuration.storage):]
})
# merge data of all other collections to target collection
for source_collection_path in source_collection_pathes[1:]:
source_collection = BaseDataset.load(source_collection_path)
for run in source_collection.get_run_numbers():
for split in source_collection.get_split_numbers():
target_data = target_collection.get_data(run, split,
train_set_name_suffix)
if self.set_flag:
for ts, l in target_data:
                            if ts.specs is None:
                                ts.specs = {"new_set": False}
                            elif "new_set" in ts.specs:
                                break
                            else:
                                ts.specs["new_set"] = False
data = source_collection.get_data(run, split,
train_set_name_suffix)
if self.set_flag:
for i, (ts, l) in enumerate(data):
# flag first element of the concatenated data list
                            if ts.specs is None:
                                ts.specs = {"new_set": i == 0}
                            else:
                                ts.specs["new_set"] = (i == 0)
# actual data is stored in a list that has to be extended
target_data.extend(data)
# if only test data was given, the "Rest_vs" collection is stored as
# training data
if not self.reverse and "test" == train_set_name_suffix:
# exchange the "test" in key tuple to "train" before storing
            for key in list(target_collection.data.keys()):
assert("test" == key[2])
value = target_collection.data.pop(key)
key = (key[0],key[1],"train")
target_collection.data[key] = value
# we store the data in the same format as before
target_collection.store(target_collection_path,
target_collection.meta_data["storage_format"])
    def _copy_file(self, source_collection_path, target_collection_path,
train_set_name_suffix):
""" Copy a dataset to a new destination
**Parameters**
:source_collection_path:
The path to the dataset that has to be copied.
:target_collection_path:
The path to where the dataset should be copied.
:train_set_name_suffix:
Either 'train' or 'test'. Specifies if the target dataset is
                handled as training or testing data.
"""
source_collection = BaseDataset.load(source_collection_path)
# if only test data was given, the "Rest_vs" collection is stored as
# training data
if self.reverse and "test" == train_set_name_suffix:
# exchange the "test" in key tuple to "train" before storing
            for key in list(source_collection.data.keys()):
assert("test" == key[2])
value = source_collection.data.pop(key)
key = (key[0],key[1],"train")
source_collection.data[key] = value
# we store the data in the same format as before
source_collection.store(target_collection_path,
source_collection.meta_data["storage_format"])