""" Concatenate datasets of :mod:`time series data<pySPACE.resources.dataset_defs.time_series>`

This operation requires test data with no splits.

This operation concatenates the datasets of its input. For instance,
if the input consists of the three datasets "A", "B", and "C",
the result will contain only one dataset, "All".

.. note:: Each dataset can only be used once for concatenation!

Specification file Parameters
++++++++++++++++++++++++++++++

type
----

This parameter has to be set to **concatenate**.

(*obligatory, concatenate*)


name_pattern
------------

The name of the result dataset can be specified via *name_pattern*.
The first time series object of every concatenated set will contain a 'new_set'
flag in the specs to allow later reconstruction of the individual sets.

(*optional, default:'"%(dataset_name)s"[:-1]+"_All"'*)
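
The pattern is a Python string expression: ``%(dataset_name)s`` is replaced
by the base name of the input dataset and the result is evaluated with
``eval`` (see ``ConcatenateProcess.__call__`` below). A minimal sketch of
this substitution, using a hypothetical dataset name:

.. code-block:: python

    # Sketch of how a name_pattern is resolved (illustration only;
    # the dataset name "Set1" is made up).
    name_pattern = '"%(dataset_name)s"[:-1] + "_All"'
    base_dataset_name = "Set1"
    result_name = eval(name_pattern % {"dataset_name": base_dataset_name})
    # result_name == "Set_All"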


dataset_constraints
----------------------

Optionally, constraints can be passed to the operation that specify which
datasets are concatenated. For instance, the constraint
'"%(dataset_name1)s".strip("}{").split("}{")[1:] ==
"%(dataset_name2)s".strip("}{").split("}{")[1:]' ensures
that only those datasets are combined that were created by the same
processing chain with the same parameterization.

.. todo:: Document the definition of *dataset_name1* and *dataset_name2*!
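
Each constraint is a Python expression in which ``%(dataset_name1)s`` and
``%(dataset_name2)s`` are substituted by the names of a candidate pair of
datasets; only pairs for which all constraints evaluate to ``True`` are
concatenated. A minimal sketch of this check, with hypothetical dataset names:

.. code-block:: python

    # Sketch of how a single constraint is evaluated for a pair of
    # datasets (illustration only; the names are made up).
    constraint = ('"%(dataset_name1)s".strip("}{").split("}{")[1:] == '
                  '"%(dataset_name2)s".strip("}{").split("}{")[1:]')
    name1 = "{Set1}{param#1}"
    name2 = "{Set2}{param#1}"
    combinable = eval(constraint % {"dataset_name1": name1,
                                    "dataset_name2": name2})
    # combinable == True, since both names share the parameter part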

change_time
-----------

If *change_time* is True, the appended time series objects get new, artificial
start and end times, so that the time stamps remain unique for further investigations.

(*optional, default: False*)
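
Internally (cf. ``ConcatenateProcess._merge_pickle_files``), the start and end
times of every appended object are shifted by the last end time of the data
already collected in the target set. A minimal sketch with made-up numbers:

.. code-block:: python

    # Sketch of the time shift applied when change_time is True
    # (illustration only; the numbers are made up).
    last_end_time = 5000.0                         # last end time in the target set
    appended = [(0.0, 1000.0), (1000.0, 2000.0)]   # (start, end) of new objects
    shifted = [(last_end_time + s, last_end_time + e) for s, e in appended]
    # shifted == [(5000.0, 6000.0), (6000.0, 7000.0)]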

Exemplary Call
++++++++++++++

A typical operation specification file might look like this:

.. code-block:: yaml

    type: concatenate
    name_pattern: '"%(dataset_name)s"[:-1]'
    change_time: False
    input_path: "operation_results/2009_8_13_15_8_57"
    dataset_constraints:
      # Combine only datasets that have been created using the same parameterization
      - '"%(dataset_name1)s".strip("}{").split("}{")[1:] == "%(dataset_name2)s".strip("}{").split("}{")[1:]'

Example dataset_constraints
++++++++++++++++++++++++++++++

    :Combine only datasets that have been created using the same parameterization:
        ``- '"%(dataset_name1)s".strip("}{").split("}{")[1:] == "%(dataset_name2)s".strip("}{").split("}{")[1:]'``

Application Examples
++++++++++++++++++++

Run123 versus Run45
-------------------

The following example concatenates runs 1, 2, and 3 from within the same session
of the same subject into a joint dataset "Run123". The same is done for runs 4
and 5, resulting in "Run45".

.. code-block:: yaml

    type: concatenate
    
    input_path: "prewindowed/BRIO_Oddball_5subjects_0_1000ms_Preprocessed"
    change_time: False
    name_pattern: '"%(dataset_name)s"[:-1] + ("123" if "%(dataset_name)s"[-1:] in ["1","2","3"] else "45")'
    dataset_constraints:
    - '"%(dataset_name1)s".strip("}{").split("_")[0] == "%(dataset_name2)s".strip("}{").split("_")[0]'
    - '"%(dataset_name1)s".strip("}{").split("_")[1] == "%(dataset_name2)s".strip("}{").split("_")[1]'
    - '(("%(dataset_name1)s".strip("}{").split("_")[2] == "Run1") and ("%(dataset_name2)s".strip("}{").split("_")[2] == "Run2" or "%(dataset_name2)s".strip("}{").split("_")[2] == "Run3")) or ("%(dataset_name1)s".strip("}{").split("_")[2] == "Run4" and "%(dataset_name2)s".strip("}{").split("_")[2] == "Run5")'

In the following shuffle example, the runs called "Run123"
will be used for training, and the runs called "Run45" from the same subject
and session will be used for testing:

.. code-block:: yaml

    type: shuffle
    
    input_path: "prewindowed/BRIO_Oddball_5subjects_0_1000ms_Preprocessed_Run123_Run45"
    change_time: False
    dataset_constraints:
    - '"%(dataset_name1)s".strip("}{").split("_")[0] == "%(dataset_name2)s".strip("}{").split("_")[0]'
    - '"%(dataset_name1)s".strip("}{").split("_")[1] == "%(dataset_name2)s".strip("}{").split("_")[1]'
    - '"%(dataset_name1)s".strip("}{").split("_")[2] == "Run123"'
    - '"%(dataset_name2)s".strip("}{").split("_")[2] == "Run45"'

For the usage of the shuffle operation, refer to :mod:`pySPACE.missions.operations.shuffle`.

.. note::
    Problems in connection with the
    :class:`~pySPACE.missions.nodes.source.time_series_source.TimeSeries2TimeSeriesSourceNode`
    can also occur as described in the
    :mod:`~pySPACE.missions.operations.merge` module.

:Author: Anett Seeland (anett.seeland@dfki.de)
:Input: :mod:`pySPACE.resources.dataset_defs.time_series`
"""

import os
import sys
import getpass
import glob
import time
if sys.version_info[0] == 2 and sys.version_info[1] < 6:
    import processing
else:
    import multiprocessing as processing

import pySPACE
from pySPACE.missions.operations.base import Operation, Process
from pySPACE.tools.filesystem import create_directory

from pySPACE.resources.dataset_defs.base import BaseDataset
    
class ConcatenateOperation(Operation):
    """ Concatenate operation for creating 'All' datasets """
    def __init__(self, processes, operation_spec, result_directory,
                 number_processes, create_process=None):
        super(ConcatenateOperation, self).__init__(processes, operation_spec,
                                                   result_directory)
        self.number_processes = number_processes
        self.create_process = create_process

    @classmethod
    def create(cls, operation_spec, result_directory, debug=False,
               input_paths=[]):
        """ [Factory method] Create a ConcatenateOperation

        A factory method that creates a ConcatenateOperation based on the
        information given in the operation specification operation_spec.
        """
        assert(operation_spec["type"] == "concatenate")

        # Determine constraints for datasets that are combined and other
        # parameters
        dataset_constraints = []
        if "dataset_constraints" in operation_spec:
            dataset_constraints.extend(operation_spec["dataset_constraints"])
        if "name_pattern" in operation_spec:
            name_pattern = operation_spec["name_pattern"]
        else:
            name_pattern = None
        if "change_time" in operation_spec:
            change_time = operation_spec["change_time"]
        else:
            change_time = False

        # merging is not distributed over different processes
        number_processes = 1
        processes = processing.Queue()

        # Create the Concatenate Process
        cls._createProcesses(processes, input_paths, result_directory,
                             dataset_constraints, name_pattern, change_time)

        # create and return the Concatenate operation object
        return cls(processes, operation_spec, result_directory,
                   number_processes)

    @classmethod
    def _createProcesses(cls, processes, input_datasets, result_directory,
                         dataset_constraints, name_pattern, change_time):
        """ [Factory method] Create the Concatenate process. """
        # Create the Concatenate process and put it in the execution queue
        processes.put(ConcatenateProcess(input_datasets, result_directory,
                                         dataset_constraints, name_pattern,
                                         change_time))
        # give executing process the sign that creation is now finished
        processes.put(False)

    def consolidate(self):
        """ Consolidation of the operation's results """
        # Just do nothing
        pass

class ConcatenateProcess(Process):
    """ Create 'All' datasets where 'All' are all datasets that fulfill
    the *dataset_constraints*
    """
    def __init__(self, input_dataset, result_directory, dataset_constraints,
                 name_pattern, change_time):
        super(ConcatenateProcess, self).__init__()
        self.input_datasets = input_dataset
        self.result_directory = result_directory
        self.dataset_constraints = dataset_constraints
        self.name_pattern = name_pattern
        self.change_time = change_time
    def __call__(self):
        """ Executes this process on the respective modality """
        ############## Prepare benchmarking ##############
        super(ConcatenateProcess, self).pre_benchmarking()

        # remember what has already been merged
        merged_dataset_pathes = []

        # For all input datasets
        for source_dataset_path1 in self.input_datasets:
            if source_dataset_path1 in merged_dataset_pathes:
                continue
            # At the moment split data is not supported, so there should be
            # only a single test file in the source directory
            source_files = glob.glob(os.sep.join([source_dataset_path1,
                                                  "data_run0", "*test*"]))
            source_pathes = []

            is_split = len(source_files) > 1
            assert(not is_split), "Multiple test splits as in %s " \
                                  "are not yet supported." % str(source_files)

            # At the moment train data is not supported, so check if train
            # sets are also present
            train_data_present = len(glob.glob(os.sep.join(
                [source_dataset_path1, "data_run0", "*train*"]))) > 0
            assert(not train_data_present), \
                "Using training data is not yet implemented."

            # We create the "All" dataset
            source_dataset_name1 = source_dataset_path1.split(os.sep)[-2]
            base_dataset_name = \
                source_dataset_name1.strip("}{").split("}{")[0]
            if self.name_pattern != None:
                target_dataset_name = source_dataset_name1.replace(
                    base_dataset_name,
                    eval(self.name_pattern % {"dataset_name":
                                              base_dataset_name}))
            else:
                target_dataset_name = source_dataset_name1.replace(
                    base_dataset_name, base_dataset_name[:-1] + "_all")

            source_pathes.append(source_dataset_path1)
            target_dataset_path = os.sep.join([self.result_directory,
                                               target_dataset_name])

            for source_dataset_path2 in self.input_datasets:
                source_dataset_name2 = source_dataset_path2.split(os.sep)[-2]
                # Do not use data we have already in the source_path list
                if (source_dataset_path2 == source_dataset_path1) or \
                        (source_dataset_path2 in merged_dataset_pathes):
                    continue
                # Check that all constraints are fulfilled for this pair of
                # input datasets
                if not all(eval(constraint_template %
                                {'dataset_name1': source_dataset_name1,
                                 'dataset_name2': source_dataset_name2})
                           for constraint_template
                           in self.dataset_constraints):
                    continue
                source_pathes.append(source_dataset_path2)
                merged_dataset_pathes.append(source_dataset_path1)
                merged_dataset_pathes.append(source_dataset_path2)

            create_directory(os.sep.join([target_dataset_path, "data_run0"]))

            self._merge_pickle_files(target_dataset_path, source_pathes)

        ############## Clean up after benchmarking ##############
        super(ConcatenateProcess, self).post_benchmarking()
    def _merge_pickle_files(self, target_dataset_path, source_dataset_pathes):
        """ Concatenate all datasets in source_dataset_pathes and store them
        in the target dataset
        """
        # sort the dataset
        source_dataset_pathes.sort()
        # load a first dataset, in which the data of all other datasets
        # is assembled
        target_dataset = BaseDataset.load(source_dataset_pathes[0])

        # Determine author and date
        try:
            author = getpass.getuser()
        except:
            author = "Unknown"
        date = time.strftime("%Y%m%d_%H_%M_%S")

        # Delete node_chain file name
        try:
            target_dataset.meta_data.pop("node_chain_file_name")
        except:
            pass

        # Update meta data and store it
        params = target_dataset.meta_data.pop("parameter_setting")
        params["__INPUT_DATASET__"] = \
            [s_c_p.split(os.sep)[-2] for s_c_p in source_dataset_pathes]
        params["__RESULT_DIRECTORY__"] = self.result_directory
        target_dataset.meta_data.update({
            "author": author,
            "date": date,
            "dataset_directory": target_dataset_path,
            "train_test": False,
            "parameter_setting": params,
            "changed_time": self.change_time,
            "input_dataset_name": source_dataset_pathes[0][
                len(pySPACE.configuration.storage):]})

        # Concatenate data of all other datasets to target dataset
        for source_dataset_path in source_dataset_pathes[1:]:
            source_dataset = BaseDataset.load(source_dataset_path)
            for run in source_dataset.get_run_numbers():
                for split in source_dataset.get_split_numbers():
                    target_data = target_dataset.get_data(run, split, "test")

                    if self.change_time:
                        # ensure sorted target_data
                        # TODO: encode this in meta data?
                        target_data.sort(key=lambda t: t[0].end_time)
                        last_end_time = target_data[-1][0].end_time

                    for ts, l in target_data:
                        if ts.specs == None:
                            ts.specs = {"new_set": False}
                        elif ts.specs.has_key("new_set"):
                            break
                        else:
                            ts.specs["new_set"] = False

                    data = source_dataset.get_data(run, split, "test")

                    if self.change_time:
                        # ensure sorted data
                        # TODO: encode this in meta data?
                        data.sort(key=lambda t: t[0].end_time)

                    # flag the first element of the concatenated data list
                    for i, (ts, l) in enumerate(data):
                        if ts.specs == None:
                            ts.specs = {"new_set": i == 0}
                        else:
                            ts.specs["new_set"] = (i == 0)
                        if self.change_time:
                            ts.start_time = last_end_time + ts.start_time
                            ts.end_time = last_end_time + ts.end_time

                    # actual data is stored in a list that has to be extended
                    target_data.extend(data)

        target_dataset.store(target_dataset_path)