ll_backend

Module: environments.backends.ll_backend

Execute operations on a cluster with the LoadLeveler scheduler

[Figure: inheritance diagram for pySPACE.environments.backends.ll_backend]

Class Summary

LoadLevelerBackend() Submits every process to the LoadLeveler cluster, which takes over the parallel execution
LoadLevelerComHandler(sock, executing_queue, ...) Server socket thread for accepting connections and reacting to them
SubflowStarter(task_queue, sock, loadl_temp, ...) Submit subflows to LoadLeveler

Classes

LoadLevelerBackend

class pySPACE.environments.backends.ll_backend.LoadLevelerBackend[source]

Bases: pySPACE.environments.backends.base.Backend

Submits every process to the LoadLeveler cluster, which takes over the parallel execution

Each process corresponds to one combination of input data set and parameter choice. The process objects are first pickled. The path to the pickled object, together with a helper script, is then submitted to LoadLeveler. There the object is unpickled and called, and the backend is informed once the results are stored.

Communication between the independent processes and the backend is done via a TCP socket connection (see LoadLevelerComHandler for detailed information).
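
The pickle round trip described above can be sketched as follows. This is only an illustration, not the backend's actual code: the functions `dump_process` and `load_and_call` are hypothetical, and a `functools.partial` stands in for a real process object. Only the file name pattern `process_<nr>.pickle` is taken from this page.

```python
import functools
import os
import pickle
import tempfile


def dump_process(process, directory, process_nr):
    """Pickle a callable; the returned path is what a job would be given."""
    path = os.path.join(directory, "process_%d.pickle" % process_nr)
    with open(path, "wb") as handle:
        pickle.dump(process, handle, protocol=pickle.HIGHEST_PROTOCOL)
    return path


def load_and_call(path):
    """What a helper script on a cluster node might do: unpickle, then call."""
    with open(path, "rb") as handle:
        process = pickle.load(handle)
    return process()


# Stand-in for a process object (hypothetical): any picklable callable works.
demo_process = functools.partial(pow, 2, 10)

temp_dir = tempfile.mkdtemp()
pickle_path = dump_process(demo_process, temp_dir, 0)
result = load_and_call(pickle_path)
```

In the real backend the unpickling happens in a separate job on a cluster node, so the pickled object must only reference code that is importable there.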

Author: Anett Seeland (anett.seeland@dfki.de)
Created: 2011/06/08
LastChange: 2012/09/06 Add communication to SubflowHandler

Class Components Summary

LL_COMMAND_FILE_TEMPLATE
__abstractmethods__
_abc_cache
_abc_negative_cache
_abc_negative_cache_version
_abc_registry
check_status() Return a description of the current state of the operation’s execution
cleanup() Remove the current operation and all potential results that have been stored in this object
consolidate() Consolidate the single processes’ results into a consistent result of the whole operation
execute([timeout]) Execute all processes specified in the currently staged operation
retrieve([timeout]) Wait for all results of the operation
stage_in(operation) Stage the given operation.
LL_COMMAND_FILE_TEMPLATE = '\n# @ job_type = serial\n# @ notification = never\n# @ class = {job_class}\n# @ resources = ConsumableMemory({memory}) ConsumableCPUs({CPUs})\n# @ requirements = {requirements}\n# @ executable = {executable}\n# @ arguments = {arguments}\n# @ output = %(op_result_dir)s/log/pySPACE_$(jobid).out\n# @ error = %(op_result_dir)s/log/pySPACE_$(jobid).err\n# @ queue'
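
The template mixes two placeholder styles: `%(op_result_dir)s` and `{job_class}`-style fields. One plausible reading, given that SubflowStarter receives a template "partly filled out" by the backend, is that it is filled in two stages. The sketch below assumes exactly that; the function `fill_template` and all values passed to it are hypothetical.

```python
# Template copied from the attribute documented above.
LL_COMMAND_FILE_TEMPLATE = (
    "\n# @ job_type = serial"
    "\n# @ notification = never"
    "\n# @ class = {job_class}"
    "\n# @ resources = ConsumableMemory({memory}) ConsumableCPUs({CPUs})"
    "\n# @ requirements = {requirements}"
    "\n# @ executable = {executable}"
    "\n# @ arguments = {arguments}"
    "\n# @ output = %(op_result_dir)s/log/pySPACE_$(jobid).out"
    "\n# @ error = %(op_result_dir)s/log/pySPACE_$(jobid).err"
    "\n# @ queue"
)


def fill_template(template, op_result_dir, **job_fields):
    """Fill the template in two stages (assumed order, see lead-in)."""
    # Stage 1: percent formatting only touches the %(op_result_dir)s fields;
    # braces and $(jobid) pass through unchanged.
    partly_filled = template % {"op_result_dir": op_result_dir}
    # Stage 2: str.format fills the remaining {name} fields.
    return partly_filled.format(**job_fields)


# All values below are made up for illustration.
command_file = fill_template(
    LL_COMMAND_FILE_TEMPLATE,
    op_result_dir="/tmp/operation_results",
    job_class="general",
    memory="1gb",
    CPUs=1,
    requirements="placeholder_requirement",
    executable="/usr/bin/python",
    arguments="helper_script.py /tmp/operation_results/process_0.pickle",
)
```

Note that stage 2 would fail if the result directory itself contained curly braces, which is one reason to treat this as a sketch only.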
__init__()[source]
stage_in(operation)[source]

Stage the given operation.

Parameters: operation (Operation) – The operation to stage.
execute(timeout=1000000.0)[source]

Execute all processes specified in the currently staged operation

check_status()[source]

Return a description of the current state of the operation’s execution

retrieve(timeout=1000000.0)[source]

Wait for all results of the operation

This call blocks until all processes are finished or the given timeout is reached. If the timeout is zero, the timeout is disabled.

Parameters: timeout (int) – The time to wait before a job is considered “finished” and is stopped.
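
The blocking behaviour described above (wait until everything is finished or the timeout expires, with zero disabling the timeout) can be sketched as a simple polling loop. The function `wait_until_finished`, its polling interval, and the assumption that the timeout is given in seconds are all hypothetical; the page does not say how the backend actually waits.

```python
import time


def wait_until_finished(all_finished, timeout=1e6, poll_interval=0.01):
    """Block until all_finished() returns True or the timeout expires.

    A timeout of zero disables the timeout. Returns True if the work
    finished and False if the timeout was reached first.
    """
    deadline = None if timeout == 0 else time.monotonic() + timeout
    while not all_finished():
        if deadline is not None and time.monotonic() >= deadline:
            return False
        time.sleep(poll_interval)
    return True


# Demo: a condition that becomes true on the third poll.
polls = {"count": 0}


def finished_after_three_polls():
    polls["count"] += 1
    return polls["count"] >= 3


finished_in_time = wait_until_finished(finished_after_three_polls, timeout=0)
timed_out = not wait_until_finished(lambda: False, timeout=0.05)
```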
consolidate()[source]

Consolidate the single processes’ results into a consistent result of the whole operation

cleanup()[source]

Remove the current operation and all potential results that have been stored in this object

__abstractmethods__ = frozenset([])
_abc_cache = <_weakrefset.WeakSet object>
_abc_negative_cache = <_weakrefset.WeakSet object>
_abc_negative_cache_version = 33
_abc_registry = <_weakrefset.WeakSet object>
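
The methods above suggest a fixed life cycle for one operation. The sketch below assumes the call order stage_in, execute, retrieve, consolidate, cleanup; this order is inferred from the method descriptions and is not stated on this page. `RecordingBackend` and `run_operation` are hypothetical stand-ins, so the example runs without a cluster.

```python
class RecordingBackend(object):
    """Hypothetical stand-in that only records which method was called."""

    def __init__(self):
        self.calls = []

    def stage_in(self, operation):
        self.calls.append("stage_in")

    def execute(self, timeout=1e6):
        self.calls.append("execute")

    def retrieve(self, timeout=1e6):
        self.calls.append("retrieve")

    def consolidate(self):
        self.calls.append("consolidate")

    def cleanup(self):
        self.calls.append("cleanup")


def run_operation(backend, operation):
    """Drive one operation through the backend (assumed call order)."""
    backend.stage_in(operation)   # register the operation
    backend.execute()             # submit all of its processes
    backend.retrieve()            # block until the results are available
    backend.consolidate()         # merge the per-process results
    backend.cleanup()             # drop the operation from the backend


backend = RecordingBackend()
run_operation(backend, operation="demo operation")
```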

LoadLevelerComHandler

class pySPACE.environments.backends.ll_backend.LoadLevelerComHandler(sock, executing_queue, progress_bar, loadl_temp, log_func, operation_dir=None)[source]

Bases: threading.Thread

Server socket thread for accepting connections and reacting to them

A helper class for LoadLevelerBackend, which releases finished processes from the execution queue, making new submissions to LoadLeveler possible. In addition, the backend’s progress bar is updated.

It is also possible to communicate with this thread via the SubflowHandler or via a terminal (port and IP address can be looked up in ‘ll_call.cmd’). This communication makes it possible to find out which processes are running and to query details on the current state of the processing.
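
The role of the execution queue can be illustrated with a bounded queue: the submitting side blocks while the queue is full, and every ‘finished’ message frees one slot. This is a minimal sketch under that assumption; `MAX_PARALLEL`, `com_handler`, and the sleep that simulates job runtime are hypothetical.

```python
import queue
import threading
import time

MAX_PARALLEL = 2  # hypothetical limit on simultaneously running jobs
N_JOBS = 5

executing_queue = queue.Queue(maxsize=MAX_PARALLEL)
released = []
peak_running = 0


def com_handler(n_jobs):
    """Stand-in for the handler thread: one 'finished' frees one slot."""
    for _ in range(n_jobs):
        time.sleep(0.01)          # pretend a job is running
        executing_queue.get()     # remove one element from the queue
        released.append("finished")


handler = threading.Thread(target=com_handler, args=(N_JOBS,))
handler.start()

for process_nr in range(N_JOBS):
    # Blocks while MAX_PARALLEL jobs are already in the queue.
    executing_queue.put(process_nr)
    peak_running = max(peak_running, executing_queue.qsize())

handler.join()
```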

Parameters

sock: The server socket which listens for messages.
executing_queue: A queue from which one element has to be removed if ‘finished’ is sent.
progress_bar: A progress bar object which is updated if ‘finished’ is sent.
loadl_temp: A template for a cmd-file that is used to submit subflow jobs.
Author: Anett Seeland (anett.seeland@dfki.de)
Created: 2011/06/08
LastChange: 2012/09/06 added communication with SubflowHandler

Class Components Summary

JOB_END_TOKEN
MESSAGES(*keys, **kwargs)
close_sock(conn) Close the given connection and remove it from the lists of potential readers/writers
control_subflow_submission(path, runs, ...) Give SubflowStarter only a limited amount of tasks
parse_message(conn, subflow_starter) Parse incoming message and react
run() Accept, read and write on connections until all processes are finished
send_message(connection, host, port, ...) Sends a message with the given type and optional args via the given connection.
JOB_END_TOKEN = '!END!'
class MESSAGES(*keys, **kwargs)[source]

Bases: enum.Enum

NAME = 0
SUBFLOW_BATCHSIZE = 1
SUBMITTED = 2
FINISHED = 3
CREATION_FINISHED = 4
EXECUTE_SUBFLOWS = 5
IS_READY = 6
GET = 7
SET = 8
classmethod LoadLevelerComHandler.send_message(connection, host, port, message_type, *args)[source]

Sends a message with the given type and optional args via the given connection. If the connection is established, it is used. Otherwise a new connection is created and will be returned.

Parameters:
  • connection (socket.socket) – The connection to use for sending.
  • host (basestring) – The host to send the message to if the connection is not established
  • port (int) – The port to use for establishing a new connection
  • message_type – The type of the message
  • args – Optional arguments
Returns:

The connection that has been used to send the message

Return type:

socket.socket
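
The reuse-or-create behaviour of send_message can be sketched as below. This is not the class method itself: the test for an established connection (`connection is None`), the ASCII encoding, and the wire format (fields joined by `;` and terminated by `!END!`) are assumptions made for the example, which talks to a throwaway server on the loopback interface.

```python
import socket

END_TOKEN = "!END!"  # value of JOB_END_TOKEN as documented on this page


def send_message(connection, host, port, message_type, *args):
    """Send via the given connection, or open a new one if there is none."""
    if connection is None:
        connection = socket.create_connection((host, port))
    fields = [message_type] + [str(arg) for arg in args]
    payload = ";".join(fields) + END_TOKEN
    connection.sendall(payload.encode("ascii"))
    return connection


# Throwaway listener on a free local port, for demonstration only.
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(("127.0.0.1", 0))
server.listen(1)
host, port = server.getsockname()

first = send_message(None, host, port, "name")           # opens a connection
peer, _ = server.accept()
second = send_message(first, host, port, "finished", "flow_1")  # reuses it

# Read until both terminated messages have arrived.
received = b""
while received.count(END_TOKEN.encode("ascii")) < 2:
    chunk = peer.recv(1024)
    if not chunk:
        break
    received += chunk

for sock in (first, peer, server):
    sock.close()
```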

LoadLevelerComHandler.__init__(sock, executing_queue, progress_bar, loadl_temp, log_func, operation_dir=None)[source]
LoadLevelerComHandler.run()[source]

Accept, read and write on connections until all processes are finished

LoadLevelerComHandler.close_sock(conn)[source]

Close the given connection and remove it from the lists of potential readers/writers

Parameters:conn (socket.socket) – The connection to close
LoadLevelerComHandler.parse_message(conn, subflow_starter)[source]

Parse incoming message and react

Parameters:
  • conn (socket.socket) – The socket to parse the message from
  • subflow_starter (SubflowStarter) – A callable to start possible subflows with

The following string messages can be sent:

name: Sends the name of the backend back, e.g. ‘loadl’ or ‘local’.
subflow_batchsize;*batchsize*:
 batchsize determines how many subflows are executed in one serial LoadLeveler job.
submitted;*process_nr*;*loadl_id*:
 Informs the listener that the pickled process process_*process_nr*.pickle has been submitted to LoadLeveler. In addition, the LoadLeveler job id (without step id) is stored so that it is available for debugging purposes.
creation_finished:
 Informs the listener that no further processes will be created. From then on, the listener only has to process further messages until the number of running processes equals zero.
finished: Informs the listener that a process has been executed successfully. Hence, the number of running processes is decremented (the backend is able to submit a new job) and the progress bar is updated.
finished;*flow_id*:
 Informs the listener that the subflow with the unique identifier flow_id has stored its results.
is_ready;*nr_subflows*;*subflow_ids*:
 Asks the listener which of the nr_subflows subflows (identified by their subflow_id) have already finished executing. subflow_ids must be a string representation of a set. The listener sends the set of finished ids back.
execute_subflows;*path*;*nr_subflows*;*subflow_ids*;*runs*:
 Asks the listener to execute nr_subflows subflows. path is the absolute path where the subflows (named by their unique subflow id) are stored on disk, e.g. the temp_dir of a node. Hence, subflow_ids is a list of the filenames in path (without file extension) that have previously been stored in pickle format. runs is a list containing the run numbers the flow should be executed with; the run number determines the random seed, e.g. for a splitter node. In addition, it is assumed that the data the subflows need for execution is also present in path (as ‘subflow_data.pickle’). Since handling this request may take quite some time, it is delegated to another thread.
get;*attribute_name*:
 Getter method for debugging purposes. Sends the value of attribute_name back.
set;*attribute_name*;*value*:
 Setter method for debugging purposes. Equivalent to ‘self.attribute_name = value’. Note that value must be evaluable.

Note

It is important that you don’t forget the end flag (cf. JOB_END_TOKEN) at the end of each message!
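
The message layout listed above (fields separated by `;`, each message closed by an end flag) can be sketched with two small helpers. `encode_message` and `split_messages` are hypothetical names, and it is an assumption that the end flag is the documented `JOB_END_TOKEN` value appended directly after the last field.

```python
JOB_END_TOKEN = "!END!"  # value documented for LoadLevelerComHandler


def encode_message(message_type, *args):
    """Join type and arguments with ';' and append the end token."""
    fields = [message_type] + [str(arg) for arg in args]
    return ";".join(fields) + JOB_END_TOKEN


def split_messages(buffer):
    """Split a receive buffer into complete messages and a remainder.

    Returns (messages, remainder); each message is a list of fields and
    the remainder is the not yet terminated tail of the buffer.
    """
    *complete, remainder = buffer.split(JOB_END_TOKEN)
    return [message.split(";") for message in complete], remainder


submitted = encode_message("submitted", 3, 4711)
messages, rest = split_messages("name!END!finished;flow_1!END!sub")
```

Keeping the unterminated remainder matters because a TCP stream may deliver a message in several pieces; without the end flag the receiver cannot tell where a message stops.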

LoadLevelerComHandler.control_subflow_submission(path, runs, subflow_ids)[source]

Give SubflowStarter only a limited amount of tasks

The MAX_RUNNING variable specifies the maximum number of LoadLeveler subflow jobs that should run in parallel. With respect to this number, the SubflowStarter only gets tasks if there are free slots.
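
The slot-based throttling described above can be sketched as a pure function. The value of `MAX_RUNNING` and the helper `release_tasks` are hypothetical; the real method works on the handler's internal state rather than on plain lists.

```python
MAX_RUNNING = 3  # hypothetical value, the real limit is not given here


def release_tasks(pending_ids, nr_running, max_running=MAX_RUNNING):
    """Hand out only as many pending subflows as there are free slots.

    Returns (released, still_pending).
    """
    free_slots = max(0, max_running - nr_running)
    return pending_ids[:free_slots], pending_ids[free_slots:]


released, still_pending = release_tasks(["a", "b", "c", "d"], nr_running=1)
none_released, all_pending = release_tasks(["a"], nr_running=5)
```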

SubflowStarter

class pySPACE.environments.backends.ll_backend.SubflowStarter(task_queue, sock, loadl_temp, log_func, operation_dir=None)[source]

Bases: threading.Thread

Submit subflows to LoadLeveler

A helper class for LoadLevelerComHandler, which submits subflows as independent jobs to LoadLeveler. The motivation is a fairer scheduling of jobs when several users run the software on a cluster, since each single job has a short computation time.

Since LoadLeveler does not know that processes have nothing to do but wait while their subflows are executed, another job class, which preempts the processes, is used for the subflows. The name of this class ends with ‘_child’.

Parameters

task_queue: This queue is filled with jobs from incoming requests by the LoadLevelerComHandler. Each job is specified by a tuple: (path, runs, subflow_ids).
sock: The listener socket, passed in so that information about it can be retrieved. Needed to fill out the LoadLeveler command file.
loadl_temp: LoadLeveler command file template that has been partly filled out by LoadLevelerBackend but needs to be further adjusted.

Class Components Summary

run() Portion and execute subflow tasks until subflow_ids equals ‘False’
submit(path, runs, subflow_ids) Submit one subflow job to LoadLeveler
__init__(task_queue, sock, loadl_temp, log_func, operation_dir=None)[source]
submit(path, runs, subflow_ids)[source]

Submit one subflow job to LoadLeveler

run()[source]

Portion and execute subflow tasks until subflow_ids equals ‘False’
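
The run loop of such a starter thread can be sketched as a queue consumer with a stop sentinel. Several points are assumptions: that ‘False’ means the boolean `False`, that "portion" means splitting subflow_ids into batches of the size set via subflow_batchsize, and that submit only needs to record its arguments here. `DemoStarter` is a hypothetical stand-in and submits nothing to LoadLeveler.

```python
import queue
import threading


class DemoStarter(threading.Thread):
    """Hypothetical stand-in for SubflowStarter (no cluster access)."""

    def __init__(self, task_queue, batch_size=2):
        super().__init__()
        self.task_queue = task_queue
        self.batch_size = batch_size
        self.submitted = []

    def submit(self, path, runs, subflow_ids):
        # The real method would write a command file and submit a job.
        self.submitted.append((path, tuple(runs), tuple(subflow_ids)))

    def run(self):
        while True:
            path, runs, subflow_ids = self.task_queue.get()
            if subflow_ids is False:  # stop sentinel (assumed boolean)
                break
            # Portion the ids into batches, one job per batch.
            for start in range(0, len(subflow_ids), self.batch_size):
                batch = subflow_ids[start:start + self.batch_size]
                self.submit(path, runs, batch)


tasks = queue.Queue()
tasks.put(("/tmp/node_temp", [0], ["flow_1", "flow_2", "flow_3"]))
tasks.put((None, None, False))

starter = DemoStarter(tasks)
starter.start()
starter.join()
```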