# Source code for pySPACE.missions.operations.mmlf

""" Execute MMLF experiments

This module contains an operation and a process for MMLF. One MMLF
process consists of executing one run of a predefined number of
episodes of an MMLF world. An MMLF operation consists of one or several runs of
one or several MMLF worlds.

.. note:: The results of the runs are not stored in the standard operation
    result directories but in the MMLF ReadWrite Area. Only the outputs of the
    MMLF to stdout are stored in log files stored in the operation result
    directories.

.. todo:: documentation link to MMLF-framework

:Author: Jan-Hendrik Metzen

"""

import sys
import os
import time
import csv
import xml.etree.ElementTree as ElementTree
if sys.version_info[0] == 2 and sys.version_info[1] < 6:
    import processing
else:
    import multiprocessing as processing

import pySPACE
from pySPACE.missions.operations.base import Operation, Process


class MmlfOperation(Operation):
    """ Operation class for executing MMLF experiments.

    An MMLF operation consists of one or several runs of one or several
    MMLF worlds.
    """

    def __init__(self, processes, operation_spec, result_directory,
                 world_name, number_processes, create_process=None):
        super(MmlfOperation, self).__init__(processes, operation_spec,
                                            result_directory)

        # name of the MMLF world this operation executes
        self.world_name = world_name
        # total number of MMLFProcess objects this operation will run
        self.number_processes = number_processes
        # background process that fills the process queue (None in debug mode)
        self.create_process = create_process

    @classmethod
    def create(cls, operation_spec, result_directory, debug=False,
               input_paths=None):
        """ A factory method that creates an MMLF operation based on the
        information given in the operation specification *operation_spec*.

        :param operation_spec: dict describing the operation; must have
            ``type == "mmlf"`` and provide ``world_name``,
            ``environment_template``, ``agent_template``, ``runs``,
            ``mmlf_path``, ``learning_episodes``, ``test_episodes`` and
            optionally ``monitor_conf`` and ``generalized_domain``
        :param result_directory: directory where the generated world
            configuration files (and later the process logs) are stored
        :param debug: if True, all processes are created eagerly in this
            thread and the queue is unbounded, which eases debugging
        :param input_paths: unused, accepted for interface compatibility
            with the other operation factories (default changed from the
            mutable ``[]`` anti-pattern to ``None`` -- safe since the
            value is never read)
        """
        assert(operation_spec["type"] == "mmlf")

        # The generic world configuration YAML file. Placeholders: world
        # package name, environment config, agent config, monitor config.
        # NOTE(review): layout reconstructed from the fact that the
        # environment/agent/monitor snippets are indented by four blanks
        # below -- confirm against a generated world file.
        world_conf = """
worldPackage : %s
environment:
%s
agent:
%s
monitor:
    policyLogFrequency : 100000
%s
"""
        # Create directory for the experiment
        world_name = operation_spec['world_name']
        world_path = "%s/config/%s" % (result_directory, world_name)
        if not os.path.exists(world_path):
            os.makedirs(world_path)

        # Determine all parameter combinations that should be tested
        parameter_settings = cls._get_parameter_space(operation_spec)

        # If the operation spec defines parameters for a generalized domain:
        if "generalized_domain" in operation_spec:
            # We have to test each parameter setting in each instantiation of
            # the generalized domain. This can be achieved by computing the
            # crossproduct of parameter settings and domain settings.
            augmented_parameter_settings = []
            for parameter_setting in parameter_settings:
                for domain_setting in operation_spec["generalized_domain"]:
                    instantiation = dict(parameter_setting)
                    instantiation.update(domain_setting)
                    augmented_parameter_settings.append(instantiation)
            parameter_settings = augmented_parameter_settings

        # Create and remember all worlds for the given parameter_settings
        world_pathes = []
        for parameter_setting in parameter_settings:
            # Add 4 blanks to all lines in the templates so they nest
            # correctly below their YAML keys in world_conf.
            environment_template = operation_spec['environment_template']
            environment_template = "\n".join(
                "    " + line for line in environment_template.split("\n"))
            agent_template = operation_spec['agent_template']
            agent_template = "\n".join(
                "    " + line for line in agent_template.split("\n"))
            monitor_conf = (operation_spec['monitor_conf']
                            if 'monitor_conf' in operation_spec else "")
            monitor_conf = "\n".join(
                "    " + line for line in monitor_conf.split("\n"))

            # Instantiate the templates: every literal occurrence of a
            # parameter name is replaced by its value in this setting.
            # .items() instead of py2-only .iteritems() (works on 2 and 3).
            environment_conf = environment_template
            agent_conf = agent_template
            for parameter, value in parameter_setting.items():
                environment_conf = environment_conf.replace(parameter,
                                                            str(value))
                agent_conf = agent_conf.replace(parameter, str(value))

            def get_parameter_str(parameter_name):
                # Abbreviate a parameter name by keeping the first four
                # characters of each underscore-separated part.
                return "".join(subpart[:4]
                               for subpart in parameter_name.split("_"))

            # Encode the parameter setting into the file name so every
            # setting gets a unique world configuration file.
            configuration_str = "{" + "}{".join(
                "%s:%s" % (get_parameter_str(parameter), str(value)[:6])
                for parameter, value in parameter_setting.items()) + "}"
            configuration_str = configuration_str.replace('_', '')

            world_file_name = "world_%s.yaml" % configuration_str
            # "with" guarantees the handle is closed (the original
            # open(...).write(...) leaked it).
            with open(os.path.join(world_path, world_file_name), 'w') as f:
                f.write(world_conf % (world_name, environment_conf,
                                      agent_conf, monitor_conf))
            world_pathes.append(os.path.join(world_path, world_file_name))

        number_processes = len(world_pathes) * int(operation_spec["runs"])

        if debug is True:
            # To better debug creation of processes we don't limit the queue
            # and create all processes before executing them
            processes = processing.Queue()
            cls._createProcesses(processes, world_pathes, operation_spec,
                                 result_directory)
            return cls(processes, operation_spec, result_directory,
                       world_name, number_processes)
        else:
            # Create all processes by calling a helper method in another
            # process so that already created processes can be executed
            # although creation of processes is not finished yet. Therefore
            # a queue is used whose size is limited to guarantee that not
            # too many objects are created (since this costs memory).
            # However, the actual number of 100 is arbitrary and might be
            # changed according to the system at hand.
            processes = processing.Queue(100)
            create_process = processing.Process(
                target=cls._createProcesses,
                args=(processes, world_pathes, operation_spec,
                      result_directory))
            create_process.start()
            # create and return the operation object
            return cls(processes, operation_spec, result_directory,
                       world_name, number_processes, create_process)

    @classmethod
    def _createProcesses(cls, processes, world_pathes, operation_spec,
                         result_directory):
        """ Create one MMLFProcess per world and run, put each on *processes*.

        A trailing ``False`` is put on the queue to signal the consumer
        that creation is finished.
        """
        # For each created world, perform operation_spec["runs"] runs
        for world_conf_path in world_pathes:
            # int() for robustness: "runs" may arrive as a string from the
            # YAML spec (create() above converts it the same way).
            for run_number in range(int(operation_spec["runs"])):
                process = MMLFProcess(
                    mmlf_path=operation_spec["mmlf_path"],
                    world_conf_path=world_conf_path,
                    learning_episodes=operation_spec["learning_episodes"],
                    test_episodes=operation_spec["test_episodes"],
                    result_directory=result_directory)
                processes.put(process)
        # give executing process the sign that creation is now finished
        processes.put(False)

    def consolidate(self):
        """ Consolidates the results of the MMLF operation. """
        # Consolidation into a csv-file is currently not supported; the
        # code below is deliberately unreachable and kept for reference.
        return

        # Consolidate the results in a csv table
        output_file = open(os.path.join(self.result_directory,
                                        'results.csv'), 'w')
        results_writer = csv.writer(output_file)
        # Determine directory in which the operation's results are stored
        log_dir = os.path.join(self.result_directory, 'logs',
                               self.world_name)
        # For all configurations tested
        keys = None
        for conf_dir in [os.path.join(log_dir, name)
                         for name in os.listdir(log_dir)
                         if os.path.isdir(os.path.join(log_dir, name))]:
            # Read agent configuration dict
            agent_conf = ElementTree.parse(os.path.join(conf_dir,
                                                        'agent.xml'))
            conf_dict = dict(agent_conf.getroot().find('configDict').items())
            # Read environment configuration dict
            env_conf = ElementTree.parse(os.path.join(conf_dir, 'env.xml'))
            conf_dict.update(
                dict(env_conf.getroot().find('configDict').items()))
            # Evaluate the values of all dict entries.
            # SECURITY NOTE: eval() of file contents -- acceptable only
            # because these files are written by this framework itself.
            for key, value in conf_dict.items():
                conf_dict[key] = eval(value)
            # Flatten (potentially) nested dict
            expanded_conf_dict = MmlfOperation._flatten_dict(conf_dict)
            if keys is None:
                # list(): dict.keys() is a non-appendable view on Python 3
                keys = list(expanded_conf_dict.keys())
                # TODO: Currently hard coded
                keys.append('reward')
                keys.append('accumulated_reward')
                keys.append('offline_reward')
                keys.append('offline_accumulated_reward')
                results_writer.writerow(keys)
            # For all runs conducted with this setup
            for run_dir in [os.path.join(conf_dir, name)
                            for name in os.listdir(conf_dir)
                            if os.path.isdir(os.path.join(conf_dir, name))]:
                # Read in the accumulated reward per episode file
                with open(os.path.join(run_dir, 'environment_logs',
                                       'reward'), 'r') as reward_file:
                    rewards = [eval(row) for row in reward_file.readlines()]
                # Chop off overlong sequences
                rewards = rewards[:self.operation_spec["learning_episodes"]]
                expanded_conf_dict['reward'] = rewards
                expanded_conf_dict['accumulated_reward'] = sum(rewards)
                # Read in the accumulated reward per episode file of the
                # offline test
                with open(os.path.join(run_dir, 'environment_logs',
                                       'offlineReward'), 'r') as off_file:
                    offline_rewards = [eval(row)
                                       for row in off_file.readlines()]
                # Chop off overlong sequences
                offline_rewards = \
                    offline_rewards[:self.operation_spec["test_episodes"]]
                expanded_conf_dict['offline_reward'] = offline_rewards
                expanded_conf_dict['offline_accumulated_reward'] = \
                    sum(offline_rewards)
                results_writer.writerow([expanded_conf_dict[key]
                                         for key in keys])
        output_file.close()

    @classmethod
    def _flatten_dict(cls, input_dict):
        """ Flatten the (potentially) nested dict *input_dict*.

        Nested keys are joined with ``'_'``, e.g. ``{'a': {'b': 1}}``
        becomes ``{'a_b': 1}``.
        """
        flattened_dict = dict()
        for key, value in input_dict.items():
            # isinstance instead of type(...) == dict (also accepts
            # dict subclasses)
            if isinstance(value, dict):
                # Recurse, then prefix the sub-keys with the current key
                for subkey, subvalue in cls._flatten_dict(value).items():
                    flattened_dict[key + '_' + subkey] = subvalue
            else:
                flattened_dict[key] = value
        return flattened_dict
class MMLFProcess(Process):
    """ Process for executing an MMLF run

    One MMLF process consists of executing one run of a predefined number
    of episodes of an MMLF world.

    The following **parameters** are mandatory:

    :mmlf_path: The path to the MMLF code
    :world_conf_path: The relative path of the world configuration file
        within the specified MMLF rw area
    :learning_episodes: The number of episodes the MMLF agent is allowed
        to train (i.e. optimize its policy)
    :test_episodes: The number of episodes the learned policy is tested.
        This can be set to 1 for deterministic environments.
    :result_directory: The directory where the stdout of the MMLF run is
        stored
    """

    def __init__(self, mmlf_path, world_conf_path, learning_episodes,
                 test_episodes, result_directory):
        super(MMLFProcess, self).__init__()

        self.result_directory = result_directory
        self.mmlf_path = mmlf_path
        self.world_conf_path = world_conf_path
        self.learning_episodes = learning_episodes
        self.test_episodes = test_episodes
        self.handler_class = None

    def __call__(self):
        """ Executes this process on the respective modality """
        # Restore configuration (self.configuration is presumably set by
        # the infrastructure before this process reaches the worker --
        # not visible in this file)
        pySPACE.configuration = self.configuration

        ############## Prepare benchmarking ##############
        super(MMLFProcess, self).pre_benchmarking()

        # Redirect stdout/stderr to a log file named after timestamp + pid
        process_log_file_name = \
            "%s/%s_%s" % (self.result_directory,
                          time.strftime("%Y%m%d_%H_%M_%S"),
                          os.getpid())
        log_file = open(process_log_file_name, 'w')
        sys.stdout = log_file
        sys.stderr = log_file

        # Insert path to MMLF to python search path and import it
        sys.path.insert(0, self.mmlf_path)
        mmlf = __import__("mmlf")

        # On distributed file systems, it might happen that our
        # configuration file is not yet available on the machine where
        # this process is executed.
        # NOTE(review): this polls forever if the file never appears --
        # consider adding a timeout.
        while not os.path.exists(self.world_conf_path):
            mmlf.log.warn("Config file %s not yet available on local machine"
                          % self.world_conf_path)
            time.sleep(1)

        mmlf.initializeRWArea(rwPath=self.result_directory)

        # Create and start world
        world = mmlf.loadWorldFromConfigFile(self.world_conf_path,
                                             useGUI=False)
        try:
            world.run(self.learning_episodes)
        except Exception as e:  # "as" syntax: valid on Python 2.6+ and 3
            mmlf.log.error("An exception of type %s occurred: %s"
                           % (e.__class__.__name__, e))
            # Fixed: the original printed the *return value* (None) of
            # print_exc(); print_exc() already writes the traceback
            # itself (to the redirected stderr).
            import traceback
            traceback.print_exc()

        # Stop the world
        world.stop()

        # Restore stdout/stderr and close the log file (the original
        # leaked the handle and left stderr redirected)
        sys.stdout.flush()
        sys.stdout = sys.__stdout__
        sys.stderr = sys.__stderr__
        log_file.close()

        ############## Clean up after benchmarking ##############
        super(MMLFProcess, self).post_benchmarking()