Source code for pySPACE.environments.backends.ll_backend

""" Execute operations on a cluster with the LoadLeveler scheduler """

import glob
import logging
import logging.handlers
import multiprocessing
import os
import select
import socket
import subprocess as sub
import sys
import threading
import time
from collections import defaultdict
from functools import partial

from enum import Enum

import pySPACE
from pySPACE.environments.backends.base import Backend
from pySPACE.missions.operations.base import Operation
from pySPACE.tools.progressbar import ProgressBar, Percentage, ETA, Bar
from pySPACE.tools.socket_utils import inform

try:
    # noinspection PyPep8Naming
    import cPickle as pickle
except ImportError:
    import pickle


class LoadLevelerBackend(Backend):
    """ Commits every process to the LoadLeveler cluster, which resumes parallel execution

    Each process corresponds to one combination of input data set and
    parameter choice. The process objects are first pickled. The path to the
    pickled object together with a helper script is then submitted to
    LoadLeveler. There the object is unpickled, called and the backend is
    informed when the results are stored.

    Communication between the independent processes and the backend is done
    via a TCP socket connection (see
    :class:`~pySPACE.environments.backends.ll_backend.LoadLevelerComHandler`
    for detailed information).

    :Author: Anett Seeland (anett.seeland@dfki.de)
    :Created: 2011/06/08
    :LastChange: 2012/09/06 Add communication to SubflowHandler
    """
    LL_COMMAND_FILE_TEMPLATE = """
# @ job_type = serial
# @ notification = never
# @ class = {job_class}
# @ resources = ConsumableMemory({memory}) ConsumableCPUs({CPUs})
# @ requirements = {requirements}
# @ executable = {executable}
# @ arguments = {arguments}
# @ output = %(op_result_dir)s/log/pySPACE_$(jobid).out
# @ error = %(op_result_dir)s/log/pySPACE_$(jobid).err
# @ queue"""
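    # Illustrative sketch of a fully rendered command file (all values are assumed,
    # not taken from a real run): the static fields are filled via str.format() in
    # __init__, the remaining %(...)s placeholders per process in execute().
    #
    #     # @ job_type = serial
    #     # @ notification = never
    #     # @ class = general
    #     # @ resources = ConsumableMemory(3250mb) ConsumableCPUs(1)
    #     # @ requirements =
    #     # @ executable = /usr/bin/python
    #     # @ arguments = .../environments/backends/ll_runner.py .../process_0.pickle 127.0.0.1 58642
    #     # @ output = .../operation_result_dir/log/pySPACE_$(jobid).out
    #     # @ error = .../operation_result_dir/log/pySPACE_$(jobid).err
    #     # @ queue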
    def __init__(self):
        super(LoadLevelerBackend, self).__init__()
        self.state = "idling"
        # create command file template for LoadLeveler
        if "job_class" not in pySPACE.configuration or \
                not pySPACE.configuration["job_class"]:
            pySPACE.configuration["job_class"] = "general"
        if "consumable_memory" not in pySPACE.configuration or \
                not pySPACE.configuration["consumable_memory"]:
            pySPACE.configuration["consumable_memory"] = "3250mb"
        if "consumable_cpus" not in pySPACE.configuration or \
                not pySPACE.configuration["consumable_cpus"]:
            pySPACE.configuration["consumable_cpus"] = 1
        if "anode" not in pySPACE.configuration:
            pySPACE.configuration["anode"] = ""
        assert (pySPACE.configuration["job_class"] in
                ['critical', 'critical_forking', 'general', 'general_forking',
                 'longterm', 'longterm_forking', 'test']), \
            "LL_Backend:: Job class does not exist! Check your pySPACE config file!"
        self.template_file = LoadLevelerBackend.LL_COMMAND_FILE_TEMPLATE.format(
            executable=sys.executable,
            arguments=" ".join([os.path.join(pySPACE.configuration.root_dir,
                                             "environments", "backends",
                                             "ll_runner.py"),
                                "%(process_file_path)s", self.SERVER_IP,
                                "%(server_port)d"]),
            job_class=pySPACE.configuration["job_class"],
            memory=pySPACE.configuration["consumable_memory"],
            CPUs=pySPACE.configuration["consumable_cpus"],
            requirements=pySPACE.configuration["anode"])
        self._log("Using '%s' as template" % self.template_file, logging.DEBUG)
        # queue for execution
        self.result_handlers = None
        self.progress_bar = None
        self.process_dir = ""
        self._log("Created LoadLeveler Backend.")
    def stage_in(self, operation):
        """ Stage the given operation.

        :param operation: The operation to stage.
        :type operation: Operation
        """
        super(LoadLevelerBackend, self).stage_in(operation)
        # set up queue
        self.result_handlers = multiprocessing.Queue(200)
        # Set up progress bar
        widgets = ['Operation progress: ', Percentage(), ' ', Bar(), ' ', ETA()]
        self.progress_bar = ProgressBar(
            widgets=widgets, maxval=self.current_operation.number_processes)
        self.progress_bar.start()
        self._log("Operation - staged")
        self.state = "staged"
    def execute(self, timeout=1e6):
        """ Execute all processes specified in the currently staged operation """
        assert (self.state == "staged")
        self._log("Operation - executing")
        self.state = "executing"

        # The handler that is used remotely for logging
        handler_class = logging.handlers.SocketHandler
        handler_args = {"host": self.host, "port": self.port}
        # the communication properties to talk to LoadLevelerComHandler
        backend_com = (self.SERVER_IP, self.SERVER_PORT)
        self._log('--> LoadLeveler communication: \n\t\t host:%s, port:%s'
                  % (self.SERVER_IP, self.SERVER_PORT))

        # Prepare the directory where processes are stored before being
        # submitted to LoadLeveler
        self.process_dir = os.sep.join([self.current_operation.result_directory,
                                        ".processes"])
        if not os.path.exists(self.process_dir):
            os.mkdir(self.process_dir)

        process_counter = 0
        # create and start server socket thread
        self.listener = LoadLevelerComHandler(
            self.sock, self.result_handlers, self.progress_bar,
            self.template_file, log_func=self._log,
            operation_dir=self.current_operation.result_directory)
        self.listener.start()
        # create a client socket to talk to the server socket thread
        send_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        send_socket.connect((self.SERVER_IP, self.SERVER_PORT))
        try:
            # Until all processes have been created, prepare each process from
            # the queue for remote execution and execute it
            get_process = partial(self.current_operation.processes.get,
                                  timeout=timeout)
            for process in iter(get_process, False):
                process.prepare(pySPACE.configuration, handler_class,
                                handler_args, backend_com)
                # since preparing the process might be much faster than
                # executing it, we need another queue from which processes are
                # removed when they have finished execution
                self.result_handlers.put(process)
                # pickle the process object
                proc_file_name = os.sep.join(
                    [self.process_dir, "process_%d.pickle" % process_counter])
                with open(proc_file_name, "wb") as proc_file:
                    pickle.dump(process, proc_file, pickle.HIGHEST_PROTOCOL)
                # fill out the LoadLeveler template
                llfile = self.template_file % {
                    "process_file_path": proc_file_name,
                    "server_port": self.SERVER_PORT,
                    "op_result_dir": self.current_operation.result_directory}
                llfilepath = os.path.join(
                    self.current_operation.result_directory, "ll_call.cmd")
                with open(llfilepath, 'w') as f:
                    f.write(llfile)
                # submit to LoadLeveler
                error_counter = 0
                while True:
                    outlog, errlog = sub.Popen(["llsubmit", llfilepath],
                                               stdout=sub.PIPE,
                                               stderr=sub.PIPE).communicate()
                    if errlog == "":
                        break
                    elif error_counter < 100:
                        self._log("Warning: Job submission to LoadLeveler "
                                  "failed with %s. Job will be resubmitted."
                                  % errlog, logging.WARNING)
                        time.sleep(1)
                        error_counter += 1
                    else:
                        self._log("Warning: Job submission to LoadLeveler "
                                  "failed %d times with %s. Skipping job."
                                  % (error_counter, errlog), logging.WARNING)
                        break
                # parse the job id for monitoring
                loadl_id = outlog.split("\"")[1].split(".")[-1]
                # inform the listener that we successfully submitted the job
                # noinspection PyTypeChecker
                send_socket = LoadLevelerComHandler.send_message(
                    send_socket, self.SERVER_IP, self.SERVER_PORT,
                    LoadLevelerComHandler.MESSAGES.SUBMITTED,
                    process_counter, loadl_id)
                # update process_counter
                process_counter += 1
            # send the message 'creation finished' to the listener
            # noinspection PyTypeChecker
            send_socket = LoadLevelerComHandler.send_message(
                send_socket, self.SERVER_IP, self.SERVER_PORT,
                LoadLevelerComHandler.MESSAGES.CREATION_FINISHED)
        finally:
            self.listener.creation_finished = True
            send_socket.shutdown(socket.SHUT_RDWR)
            send_socket.close()
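    # Note on the job-id parsing above: llsubmit typically acknowledges a
    # submission with a line such as (example output, not captured from a
    # real cluster)
    #
    #     llsubmit: The job "somehost.somedomain.12345" has been submitted.
    #
    # so outlog.split('"')[1] yields 'somehost.somedomain.12345' and the
    # trailing split(".")[-1] keeps only the numeric id '12345', which is then
    # reported to the LoadLevelerComHandler for monitoring.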
    def check_status(self):
        """ Return a description of the current state of the operation's execution

        .. todo:: do we really need this method?
        """
        # Returns the current state of the operation
        return self.state
    def retrieve(self, timeout=1e6):
        """ Wait for all results of the operation

        This call blocks until all processes are finished or the given
        timeout is reached. If the timeout is zero, the timeout is disabled.

        :param timeout: The time to wait until a job is considered "finished"
                        and will be stopped.
        :type timeout: int
        """
        assert (self.state == "executing")
        self._log("All processes submitted. Waiting for them to finish.")
        # since self.current_operation.number_processes is not reliable (it
        # may be too high) we wait until the listener thread has terminated
        self.listener.finished.wait(timeout=timeout)
        self._log("Worker processes have exited gracefully")
        self.current_operation.processes.close()
        # if process creation ran in another thread, join it
        if self.current_operation.create_process is not None:
            self.current_operation.create_process.join(timeout=timeout)
        self.result_handlers.close()
        # also join the listener thread
        self.listener.join(timeout=timeout)
        # Change the state to finished
        self._log("Operation - retrieved")
        self.state = "retrieved"
        return True
    def consolidate(self):
        """ Consolidate the single processes' results into a consistent result of the whole operation """
        assert (self.state == "retrieved")
        self.current_operation.consolidate()
        self._log("Operation - consolidated")

        # collect all log files
        def _merge_files(file_list, delete=True):
            result_str = ""
            for filename in file_list:
                tmp_str = ""
                try:
                    if os.path.getsize(filename) != 0:
                        tmp_str += filename.split(os.sep)[-1] + "\n" + \
                            len(filename.split(os.sep)[-1]) * "-" + "\n"
                        with open(filename, 'r') as f:
                            tmp_str += f.read()
                        tmp_str += 80 * "-" + "\n"
                    if delete:
                        os.remove(filename)
                except (IOError, OSError), e:
                    self._log("Problems with file %s: %s." % (filename, e),
                              logging.WARNING)
                result_str += tmp_str
            return result_str

        outlist = glob.glob(self.current_operation.result_directory +
                            "/log/pySPACE*.out")
        out = _merge_files(outlist)
        errlist = glob.glob(self.current_operation.result_directory +
                            "/log/pySPACE*.err")
        err = _merge_files(errlist)
        with open(self.current_operation.result_directory + "/pySPACE.out",
                  'w') as merged_out:
            merged_out.write(out)
        with open(self.current_operation.result_directory + "/pySPACE.err",
                  'w') as merged_err:
            merged_err.write(err)
        try:
            outlist = glob.glob(self.current_operation.result_directory +
                                "/sub_log/pySPACE*.out")
            out = _merge_files(outlist)
            errlist = glob.glob(self.current_operation.result_directory +
                                "/sub_log/pySPACE*.err")
            err = _merge_files(errlist)
            with open(self.current_operation.result_directory +
                      "/pySPACE_sub.out", 'w') as merged_out:
                merged_out.write(out)
            with open(self.current_operation.result_directory +
                      "/pySPACE_sub.err", 'w') as merged_err:
                merged_err.write(err)
        except IOError:
            pass
        self._log("Process Logging - consolidated")
        self.state = "consolidated"
    def cleanup(self):
        """ Remove the current operation and all potential results that have been stored in this object """
        self.state = "idling"
        try:
            # Remove the .processes dir
            try:
                if os.path.isdir(self.process_dir):
                    os.rmdir(self.process_dir)
            except OSError, e:
                self._log("Deleting process folder failed with error: %s" % e,
                          level=logging.CRITICAL)
            try:
                os.rmdir(self.current_operation.result_directory + "/log")
                self._log("Operation - logging folder cleaned up")
            except OSError, e:
                self._log("Deleting log folder failed with error %s. "
                          "Maybe no processes started. "
                          "Please check your operation file constraints." % e,
                          level=logging.CRITICAL)
            try:
                os.rmdir(self.current_operation.result_directory + "/sub_log")
                self._log("Operation - subflow log folder cleaned up")
            except OSError:
                pass
        finally:
            # Remove the file logger for this operation
            logging.getLogger('').removeHandler(self.file_handler)
            # close the listener socket
            self.sock.shutdown(socket.SHUT_RDWR)
            self.sock.close()
            self._log("Operation - logging closed")
            self.current_operation = None
        self._log("Operation - cleaned up")
class LoadLevelerComHandler(threading.Thread):
    """ Server socket thread for accepting connections and reacting to them

    A helper class for
    :class:`LoadLevelerBackend<pySPACE.environments.backends.ll_backend.LoadLevelerBackend>`,
    which releases finished processes from the execution queue, making new
    submits to LoadLeveler possible. In addition, the backend's progress bar
    is updated.

    It is also possible to communicate with this thread via the
    :class:`~pySPACE.environments.chains.node_chain.SubflowHandler` or via a
    terminal (port and IP can be checked in 'll_call.cmd'). The benefit of
    this communication is to find out which processes are running and to
    inspect the details of the current processing state.

    **Parameters**

        :sock: The server socket which listens for messages.

        :executing_queue: A queue from which one element is removed whenever
            'finished' is sent.

        :progress_bar: A progress bar object which is updated whenever
            'finished' is sent.

        :loadl_temp: A template for a cmd-file that is used to submit
            subflow jobs.

    :Author: Anett Seeland (anett.seeland@dfki.de)
    :Created: 2011/06/08
    :LastChange: 2012/09/06 added communication with SubflowHandler
    """
    # to mark the message end when communicating via a socket connection
    JOB_END_TOKEN = "!END!"
    class MESSAGES(Enum):
        NAME = 0
        SUBFLOW_BATCHSIZE = 1
        SUBMITTED = 2
        FINISHED = 3
        CREATION_FINISHED = 4
        EXECUTE_SUBFLOWS = 5
        IS_READY = 6
        GET = 7
        SET = 8
    @classmethod
    def send_message(cls, connection, host, port, message_type, *args):
        """ Send a message with the given type and optional args via the given connection

        If the connection is established, it is used. Otherwise a new
        connection is created and returned.

        :param connection: The connection to use for sending.
        :type connection: socket.socket
        :param host: The host to send the message to if the connection is not
            established.
        :type host: basestring
        :param port: The port to use for establishing a new connection.
        :type port: int
        :param message_type: The type of the message.
        :param args: Optional arguments.
        :return: The connection that has been used to send the message.
        :rtype: socket.socket
        """
        message = str(message_type.value)
        for arg in args:
            message += ";%s" % arg
        message += cls.JOB_END_TOKEN
        return inform(message, conn=connection, ip_port=(host, port))
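    # Illustrative sketch (host and port are assumed values, not part of the
    # original module): how the protocol can be exercised by hand, e.g. from a
    # Python shell, to query the name of the running backend. The actual
    # ip/port pair can be read from the generated 'll_call.cmd'.
    #
    #     import socket
    #     from pySPACE.environments.backends.ll_backend import LoadLevelerComHandler
    #     conn = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
    #     conn.connect(("127.0.0.1", 58642))   # assumed values from ll_call.cmd
    #     conn = LoadLevelerComHandler.send_message(
    #         conn, "127.0.0.1", 58642, LoadLevelerComHandler.MESSAGES.NAME)
    #     print conn.recv(4096)                # expected reply: 'loadl!END!'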
    def __init__(self, sock, executing_queue, progress_bar, loadl_temp,
                 log_func, operation_dir=None):
        threading.Thread.__init__(self)
        self.sock = sock
        self.executing_queue = executing_queue
        self.progress_bar = progress_bar
        self.loadl_temp = loadl_temp
        # variables for monitoring
        self.process_loadl_mapping = defaultdict(list)
        self.num_running_processes = 0
        self.subflow_ids_running = set()
        self.subflows_waiting = []
        self.MAX_RUNNING = 400
        self.num_finished_processes = 0
        self.subflow_ids_finished = set()
        self.creation_finished = False
        self.finished = threading.Event()
        # initialize select concept (multiplexing of socket connections)
        self.sock.listen(socket.SOMAXCONN)
        # define potential readers and writers
        self.readers = [self.sock]
        self.writers = []
        # data structure to store all established connections and messages for
        # reading and writing: data[connection] = [message_read, message_to_write]
        self.data = {}
        # a task queue for subflows to be started; starting is swapped out to
        # another thread to be faster in handling all incoming requests
        self.subflow_msg = multiprocessing.Queue()
        self.batch_size = 1
        self.operation_dir = operation_dir
        self._log = log_func
    def run(self):
        """ Accept, read and write on connections until all processes are finished """
        # start a thread for handling long messages, i.e. executing subflows
        subflow_starter = SubflowStarter(self.subflow_msg, self.sock,
                                         self.loadl_temp, log_func=self._log,
                                         operation_dir=self.operation_dir)
        subflow_starter.start()
        while not (self.creation_finished and self.num_running_processes == 0):
            # multiplexing on pending requests (in self.readers/writers)
            readable, writable, others = select.select(self.readers,
                                                       self.writers, [], 1.0)
            if self.sock in readable:
                conn, _ = self.sock.accept()
                self.readers.append(conn)
                self.data[conn] = ["", ""]
                readable.remove(self.sock)
            for reader in readable:
                try:
                    tmp = reader.recv(4096)
                except socket.error, e:
                    self._log('recv %s' % e, logging.WARNING)
                    self.close_sock(reader)
                else:
                    if tmp:
                        self.data[reader][0] += tmp
                        # Complete messages are processed
                        if self.JOB_END_TOKEN in self.data[reader][0]:
                            self.parse_message(reader, subflow_starter)
                            # New data to send. Make sure the client is in the
                            # server's writer queue.
                            if self.data[reader][1] != "" and \
                                    reader not in self.writers:
                                self.writers.append(reader)
                    else:
                        self.close_sock(reader)
            for writer in writable:
                try:
                    # send data; tmp is the number of chars sent (may be less
                    # than the content of the write buffer)
                    tmp = writer.send(self.data[writer][1])
                except socket.error, e:
                    self._log('send: %s' % e, logging.WARNING)
                    self.close_sock(writer)
                else:
                    # Remove sent characters from the write buffer
                    self.data[writer][1] = self.data[writer][1][tmp:]
                    # If the write buffer is empty, remove the socket from the
                    # potential writers
                    if not self.data[writer][1]:
                        self.writers.remove(writer)
        # send the all_tasks_finished signal to the subflow starter thread
        self.subflow_msg.put((False, False, False))
        # give the thread some time to realize the end of tasks
        time.sleep(1)
        self.subflow_msg.close()
        subflow_starter.join(timeout=1e6)
        # raise the event so that the backend knows we have finished
        self.finished.set()
    def close_sock(self, conn):
        """ Close the given connection and remove it from the lists of potential readers/writers

        :param conn: The connection to close.
        :type conn: socket.socket
        """
        try:
            conn.shutdown(socket.SHUT_RDWR)
        except socket.error:
            pass
        conn.close()
        if conn in self.readers:
            self.readers.remove(conn)
        if conn in self.writers:
            self.writers.remove(conn)
        del self.data[conn]
    def parse_message(self, conn, subflow_starter):
        """ Parse an incoming message and react to it

        :param conn: The socket to parse the message from.
        :type conn: socket.socket
        :param subflow_starter: A callable to start possible subflows with.
        :type subflow_starter: SubflowStarter

        The following string messages can be sent:

            :name: Send the name of the Backend back, e.g. 'loadl' or 'local'.

            :subflow_batchsize;*batchsize*: *batch_size* determines how many
                subflows are executed in one serial LoadLeveler job.

            :submitted;*process_nr*;*loadl_id*: Informs the listener that the
                pickled process `process_*process_nr*.pickle` has been
                submitted to LoadLeveler. In addition, the LoadLeveler job id
                (without step id) is stored to be available for debugging
                purposes.

            :creation_finished: Informs the listener that no further process
                will be created. Based on that, the listener only has to
                process further messages until the number of running
                processes equals zero.

            :finished: Informs the listener that a process has been
                successfully executed. Hence, the number of running processes
                is decremented (the Backend is able to submit a new job) and
                the progress bar is updated.

            :finished;*flow_id*: Informs the listener that a subflow with the
                unique identifier *flow_id* has stored its results.

            :is_ready;*nr_subflows*;*subflow_ids*: Asks the listener which of
                the *nr_subflows* subflows (identified by their subflow_id)
                have already finished executing. *subflow_ids* must be a
                string representation of a set. The listener sends the set of
                finished ids back.

            :execute_subflows;*path*;*nr_subflows*;*subflow_ids*;*runs*: Asks
                the listener to execute *nr_subflows* subflows. *path* is the
                absolute path where the subflows (named by their unique
                subflow id) are stored on disk, e.g. the *temp_dir* of a node.
                Hence, *subflow_ids* is a list of the filenames in *path*
                (without file extension) that have been stored before in
                pickle format. *runs* is a list containing the run numbers the
                flow should be executed with: the *run_number* determines the
                random seed, e.g., for a splitter node. In addition it is
                assumed that the data the subflows need for execution is also
                present in *path* (as 'subflow_data.pickle'). Since the
                reaction to this request may take quite some time, it is
                swapped out to another
                :class:`Thread<pySPACE.environments.backends.ll_backend.SubflowStarter>`.

            :get;*attribute_name*: Getter method for debugging purposes. Sends
                the value of *attribute_name* back.

            :set;*attribute_name*;*value*: Setter method for debugging
                purposes. Equivalent to 'self.attribute_name = value'. Note
                that *value* must be evaluable.

        .. note:: It is important that you do not forget the end token at the
                  end of each message!
        """
        end_ind = self.data[conn][0].find(self.JOB_END_TOKEN)
        message = self.data[conn][0][:end_ind]
        message_type = message.split(";")[0]
        message_args = message.split(";")[1:]
        # noinspection PyBroadException
        try:
            message_type = self.MESSAGES(int(message_type))
            if message_type == self.MESSAGES.NAME:
                self.data[conn][1] = 'loadl%s' % self.JOB_END_TOKEN
            elif message_type == self.MESSAGES.SUBFLOW_BATCHSIZE:
                batch_size = eval(message_args[-1])
                subflow_starter.batch_size = batch_size
                self.batch_size = batch_size
            elif message_type == self.MESSAGES.SUBMITTED:
                key = eval(message_args[0])
                self.process_loadl_mapping[key].append(message_args[1])
                self.num_running_processes += 1
            elif message_type == self.MESSAGES.FINISHED:
                # it is a Process that has finished
                if not message_args:
                    self.num_finished_processes += 1
                    self.num_running_processes -= 1
                    self.executing_queue.get(timeout=1e100)
                    self.progress_bar.update(self.num_finished_processes)
                # it is a subflow that has finished
                else:
                    self.subflow_ids_running.remove(message_args[0])
                    self.subflow_ids_finished.add(message_args[0])
                self.data[conn][1] = "OK%s" % self.JOB_END_TOKEN
            elif message_type == self.MESSAGES.CREATION_FINISHED:
                self.creation_finished = True
            elif message_type == self.MESSAGES.EXECUTE_SUBFLOWS:
                path, runs = message_args[0], message_args[-1]
                nr_subflows, subflow_ids = [eval(s) for s in message_args[1:-1]]
                assert (nr_subflows == len(subflow_ids)), \
                    "incorrect number of subflows"
                # check if we can submit new jobs
                self.control_subflow_submission(path, runs, subflow_ids)
                self.data[conn][1] = "OK%s" % self.JOB_END_TOKEN
            elif message_type == self.MESSAGES.IS_READY:
                nr_requested, requested_subflows = \
                    [eval(s) for s in message_args]
                assert (nr_requested == len(requested_subflows)), \
                    "incorrect number of subflows"
                # check which subflows have already finished and tell the client
                finished = requested_subflows & self.subflow_ids_finished
                # .. todo:: maybe reduce self.subflow_ids_finished since they
                #           are unique and will never be requested again
                self.data[conn][1] = str(finished) + self.JOB_END_TOKEN
            # to be able to communicate with the backend in case something
            # went wrong, give the user the possibility to get and set attributes
            elif message_type == self.MESSAGES.GET:
                self._log("variable wanted", logging.INFO)
                name = message_args[0]
                self.data[conn][1] = str(getattr(self, name, None))
            elif message_type == self.MESSAGES.SET:
                name, value = message_args[0], eval(message_args[1])
                setattr(self, name, value)
            else:
                self._log("Got unknown message: %s" % message, logging.WARNING)
            self.data[conn][0] = \
                self.data[conn][0][end_ind + len(self.JOB_END_TOKEN):]
            # try to submit waiting jobs
            if self.subflows_waiting != [] and (
                    self.MAX_RUNNING -
                    (len(self.subflow_ids_running) / self.batch_size) > 0):
                path, runs, subflow_ids = self.subflows_waiting.pop()
                self.control_subflow_submission(path, runs, subflow_ids)
        except Exception, e:
            self._log("Exception while parsing message '%s': %s"
                      % (message, e), logging.ERROR)
    def control_subflow_submission(self, path, runs, subflow_ids):
        """ Give the SubflowStarter only a limited amount of tasks

        The MAX_RUNNING variable specifies how many LoadLeveler subflow jobs
        should run in parallel at most. With respect to this number the
        SubflowStarter only gets tasks if there are free slots.
        """
        free_slots = self.MAX_RUNNING - (len(self.subflow_ids_running) /
                                         self.batch_size)
        if free_slots < (len(subflow_ids) / self.batch_size):
            # cannot submit all jobs at once
            end_ind = free_slots * self.batch_size
            self.subflow_msg.put((path, runs, subflow_ids[:end_ind]))
            self.subflows_waiting.append((path, runs, subflow_ids[end_ind:]))
            self.subflow_ids_running.update(subflow_ids[:end_ind])
        else:
            self.subflow_msg.put((path, runs, subflow_ids))
            self.subflow_ids_running.update(subflow_ids)
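    # Worked example (assumed numbers): with MAX_RUNNING = 400, batch_size = 4
    # and 1000 subflows currently running, free_slots = 400 - 1000/4 = 150
    # jobs. A request for 800 subflows would need 200 jobs, so only the first
    # 150*4 = 600 subflows are handed to the SubflowStarter and the remaining
    # 200 are appended to self.subflows_waiting until running subflows report
    # back as finished.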
class SubflowStarter(threading.Thread):
    """ Submit subflows to LoadLeveler

    A helper class for
    :class:`~pySPACE.environments.backends.ll_backend.LoadLevelerComHandler`,
    which submits subflows as independent jobs to LoadLeveler. The reason
    behind this is a better (more fair) scheduling of jobs when several users
    run the software on a cluster, due to the fact that each single job has a
    short computation time. Since LoadLeveler does not know that processes
    have nothing to do but wait while their subflows are executed, another
    job class is used for the subflows that preempts the processes. The name
    of this class ends with '_child'.

    **Parameters**

        :task_queue: This queue is filled with jobs from incoming requests by
            the
            :class:`LoadLevelerComHandler<pySPACE.environments.backends.ll_backend.LoadLevelerComHandler>`.
            Each job is specified by a tuple (*path*, *runs*, *subflow_ids*).

        :sock: The listener socket, needed to be able to get information
            about it when filling out the LoadLeveler command file.

        :loadl_temp: LoadLeveler command file template that has been partly
            filled out by
            :class:`LoadLevelerBackend<pySPACE.environments.backends.ll_backend.LoadLevelerBackend>`
            but needs to be adjusted further.

    .. todo:: create parallel LoadLeveler jobs to reduce requests from
              processes; it is better to execute all subflows of one process
              first than one subflow of every process

    .. todo:: actually the server socket is not needed here, only the server
              port
    """
    def __init__(self, task_queue, sock, loadl_temp, log_func,
                 operation_dir=None):
        threading.Thread.__init__(self)
        self.task_queue = task_queue
        self.sock = sock
        self.loadl_temp = loadl_temp
        self.first_subflow = True
        self.batch_size = 1
        self.operation_dir = operation_dir
        self._log = log_func
    def submit(self, path, runs, subflow_ids):
        """ Submit one subflow job to LoadLeveler

        :param path: Absolute path where the pickled subflows and their data
            are stored.
        :param runs: List of run numbers the subflows should be executed with.
        :param subflow_ids: The subflow identifiers (file names in *path*
            without extension) that make up this job.
        """
        if self.operation_dir is None:
            operation_dir = path
        else:
            operation_dir = self.operation_dir
        llfile = self.loadl_temp % {
            "process_file_path": path,
            "server_port": self.sock.getsockname()[1],
            "op_result_dir": operation_dir,
            "runs": '"' + str(runs) + '"',
            "subflow_ids": str(subflow_ids)}
        llfilepath = os.path.join(path, "ll_call.cmd")
        with open(llfilepath, 'w') as f:
            f.write(llfile)
        # submit to LoadLeveler
        error_counter = 0
        while True:
            outlog, errlog = sub.Popen(["llsubmit", llfilepath],
                                       stdout=sub.PIPE,
                                       stderr=sub.PIPE).communicate()
            if errlog == "":
                break
            elif error_counter < 100:
                self._log("Warning: Job submission to LoadLeveler failed"
                          " with %s. Job will be resubmitted." % errlog,
                          logging.WARNING)
                time.sleep(1)
                error_counter += 1
            else:
                self._log("Warning: Job submission to LoadLeveler failed"
                          " %d times with %s. Skipping job."
                          % (error_counter, errlog), logging.WARNING)
                break
    def run(self):
        """ Portion and execute subflow tasks until *subflow_ids* equals 'False' """
        path, runs, subflow_ids = self.task_queue.get(timeout=1e10)
        while subflow_ids:
            # reset the remainder from the previous task
            rest_ids = []
            if self.first_subflow:
                # need this modification only once
                # modify the template for subflows: class, ll_runner, params
                template = self.loadl_temp.split("\n")
                self.loadl_temp = ""
                for line in template:
                    if line.startswith('# @ class'):
                        assert ("forking" in line), \
                            "When using subflow parallelization a job class " \
                            "with forking functionality has to be used!"
                        line = line.replace("forking", "child")
                    elif line.startswith('# @ arguments'):
                        line = line.replace('ll_runner.py',
                                            'll_subflow_runner.py ')
                        line += ' %(runs)s "%(subflow_ids)s"'
                    elif line.startswith('# @ output') or \
                            line.startswith('# @ error'):
                        line = line.replace("/log/pySPACE_",
                                            "/sub_log/pySPACE_")
                    self.loadl_temp += line + " \n"
                self.first_subflow = False
            # check whether the number of subflows is divisible by batch_size
            if len(subflow_ids) % self.batch_size:
                rest = len(subflow_ids) % self.batch_size
                rest_ids = subflow_ids[-1 * rest:]
                subflow_ids = subflow_ids[:len(subflow_ids) - rest]
            # iterate over LoadLeveler jobs
            for ind in range(0, len(subflow_ids), self.batch_size):
                self.submit(path, runs, subflow_ids[ind:ind + self.batch_size])
            if rest_ids:
                # need to submit one more job
                self.submit(path, runs, rest_ids)
            path, runs, subflow_ids = self.task_queue.get(timeout=1e10)
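    # Sketch of the template rewrite above (paths abbreviated, SERVER_IP
    # assumed to be 127.0.0.1): an arguments line such as
    #
    #     # @ arguments = .../backends/ll_runner.py %(process_file_path)s 127.0.0.1 %(server_port)d
    #
    # becomes
    #
    #     # @ arguments = .../backends/ll_subflow_runner.py  %(process_file_path)s 127.0.0.1 %(server_port)d %(runs)s "%(subflow_ids)s"
    #
    # and the job class loses its '_forking' suffix in favour of '_child',
    # e.g. 'general_forking' -> 'general_child', so that subflow jobs can
    # preempt their waiting parent processes.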