Source code for pySPACE.environments.backends.multicore

""" Execute as many processes in parallel as there are (logical) CPUs on the local machine

This backend is based on the multiprocessing package and should work on every
multicore system without additional settings even on virtual machines.
"""

import os
import time
import multiprocessing
import logging
import logging.handlers
import threading
import socket
import select
import cPickle
import warnings
from functools import partial

import pySPACE
from pySPACE.environments.backends.base import Backend
from pySPACE.tools.progressbar import ProgressBar, Percentage, ETA, Bar


class MulticoreBackend(Backend):
    """ Execute as many processes in parallel as there are (logical) CPUs on the local machine

    This backend is based on the multiprocessing package and should work on
    every multicore system without additional settings, even on virtual
    machines. Each process corresponds to one combination of input data set
    and parameter choice.

    :Author: Anett Seeland (anett.seeland@dfki.de)
    :LastChange: 2012/09/24
    """

    def __init__(self, pool_size=None):
        super(MulticoreBackend, self).__init__()

        # Set the number of processes in the pool
        # per default to the number of CPUs
        if pool_size is None:
            pool_size = MulticoreBackend.detect_CPUs()
        self.pool_size = pool_size

        self.state = "idling"
        # queue for execution
        self.result_handlers = []
        self.pool = None
        self.current_process = 0

        self._log("Created MulticoreBackend with pool size %s" % pool_size)

    def reset_queue(self):
        """ Resets the execution queue """
        self.result_handlers = []

    def stage_in(self, operation):
        """ Stage the current operation """
        super(MulticoreBackend, self).stage_in(operation)
        self.pool = multiprocessing.Pool(processes=self.pool_size)

        # Set up progress bar
        widgets = ['Operation progress: ', Percentage(), ' ', Bar(), ' ', ETA()]
        self.progress_bar = ProgressBar(
            widgets=widgets, maxval=self.current_operation.number_processes)
        self.progress_bar.start()

        self._log("Operation - staged")
        self.state = "staged"

    def execute(self, timeout=1e6):
        """ Execute all processes specified in the currently staged operation """
        assert(self.state == "staged")
        self._log("Operation - executing")
        self.state = "executing"

        # The handler that is used remotely for logging
        handler_class = logging.handlers.SocketHandler
        handler_args = {"host": self.host, "port": self.port}
        backend_com = (self.SERVER_IP, self.SERVER_PORT)

        # A socket communication thread to handle e.g. subflows
        self.listener = LocalComHandler(self.sock)
        self.listener.start()

        # Prepare every process from the queue for remote execution and
        # submit it to the pool, until the queue yields the sentinel value
        # False, i.e. all processes have been created
        get_process = partial(self.current_operation.processes.get,
                              timeout=timeout)
        for process in iter(get_process, False):
            process.prepare(pySPACE.configuration, handler_class, handler_args,
                            backend_com)
            # Execute all functions in the process pool but return immediately
            self.result_handlers.append(
                self.pool.apply_async(process, callback=self.dequeue_process))

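    # Illustrative sketch, not part of the original module: execute() drains
    # the process queue with the iter(callable, sentinel) idiom and submits
    # every item asynchronously. Assuming a queue terminated by the sentinel
    # False, a pool, a handle list and a callback `on_done`, the core pattern
    # is:
    #
    #     get_item = partial(queue.get, timeout=timeout)
    #     for item in iter(get_item, False):          # stops at the sentinel
    #         handles.append(pool.apply_async(item, callback=on_done))
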
    def dequeue_process(self, result):
        """ Callback function for finished processes """
        self.current_process += 1
        self.progress_bar.update(self.current_process)

    def check_status(self):
        """ Return a description of the current state of the operation's execution

        .. todo:: do we really need this method???
        """
        # Return the fraction of the current operation's processes
        # that have already finished
        return float(self.current_process) / self.current_operation.number_processes

    def retrieve(self, timeout=1e6):
        """ Wait for all results of the operation

        This call blocks until all processes are finished.
        """
        assert(self.state == "executing")

        # Prevent any other processes from being submitted to the pool
        # (necessary for join)
        self.pool.close()
        self._log("Closing pool", level=logging.DEBUG)
        self._log("Operation - retrieved")
        self.current_operation.processes.close()
        # if process creation runs in a separate thread, wait for it to finish
        if hasattr(self.current_operation, "create_process") \
                and self.current_operation.create_process is not None:
            self.current_operation.create_process.join(timeout=timeout)
        # Close the result handlers and wait for every process
        # to terminate
        try:
            for result in self.result_handlers:
                result.wait(timeout=timeout)
        except multiprocessing.TimeoutError:
            # A timeout occurred, terminate the pool
            self._log("Timeout occurred, terminating worker processes")
            self.pool.terminate()
            return False
        finally:
            self.pool.join()  # Wait for worker processes to exit
            # inform the listener that it's time to die
            self.listener.operation_finished = True
            time.sleep(1)
            self.listener.join(timeout=timeout)
        # Change the state to finished
        self.state = "retrieved"
        self._log("Worker processes have exited gracefully")
        return True

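    # Illustrative sketch, not part of the original module: the shutdown order
    # used above follows the usual multiprocessing recipe. Note that
    # AsyncResult.wait() only blocks; it is AsyncResult.get() that raises
    # multiprocessing.TimeoutError once the timeout expires. Assuming a pool
    # and the AsyncResult handles collected in execute():
    #
    #     pool.close()                          # no further submissions
    #     try:
    #         for handle in handles:
    #             handle.get(timeout=timeout)   # also re-raises worker exceptions
    #     except multiprocessing.TimeoutError:
    #         pool.terminate()                  # kill remaining workers
    #     finally:
    #         pool.join()                       # wait for the workers to exit
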
    def consolidate(self):
        """ Consolidate the single processes' results into a consistent result of the whole operation """
        assert(self.state == "retrieved")

        try:
            self.current_operation.consolidate()
        except Exception:
            import traceback
            self._log(traceback.format_exc(), level=logging.ERROR)

        self._log("Operation - consolidated")
        self.state = "consolidated"

    def cleanup(self):
        """ Remove the current operation and all potential results that have been stored in this object """
        self.state = "idling"
        self._log("Operation - cleaned up")
        self._log("Idling...")

        # Remove the file logger for this operation
        logging.getLogger('').removeHandler(self.file_handler)
        # close listener socket
        self.sock.close()

        self.current_operation = None
        self.current_process = 0

    @classmethod
    def detect_CPUs(cls):
        """ Detects the number of CPUs on a system. Cribbed from pp.

        :from: http://codeliberates.blogspot.com/2008/05/detecting-cpuscores-in-python.html
        """
        ncpus = None
        # Linux, Unix and MacOS:
        if hasattr(os, "sysconf"):
            if os.sysconf_names.has_key("SC_NPROCESSORS_ONLN"):
                # Linux & Unix:
                ncpus = os.sysconf("SC_NPROCESSORS_ONLN")
                if isinstance(ncpus, int) and ncpus > 0:
                    return ncpus
            else:
                # OSX:
                return int(os.popen2("sysctl -n hw.ncpu")[1].read())
        # Windows:
        if os.environ.has_key("NUMBER_OF_PROCESSORS"):
            ncpus = int(os.environ["NUMBER_OF_PROCESSORS"])
            if ncpus > 0:
                return ncpus
        return 1  # Default

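# Illustrative note, not part of the original module: on Python 2.6+ the
# portable way to obtain the same number is multiprocessing.cpu_count(),
# which detect_CPUs() above reimplements for older interpreters:
#
#     try:
#         ncpus = multiprocessing.cpu_count()
#     except NotImplementedError:
#         ncpus = 1
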
class LocalComHandler(threading.Thread):
    """ Server socket thread that accepts connections and reacts to incoming messages

    A helper class for
    :class:`~pySPACE.environments.backends.multicore.MulticoreBackend`, which
    handles incoming connections (e.g. from nodes that want to compute
    subflows).

    **Parameters**

        :sock: The socket object to which messages are sent.
    """

    def __init__(self, sock):
        threading.Thread.__init__(self)
        self.sock = sock
        self.subflow_pool = None
        self.results = {}
        # variables for monitoring
        self.subflow_ids_running = set()
        self.subflow_ids_finished = set()
        # flag from the backend to stop the run-method
        self.operation_finished = False
        # initialize select concept (multiplexing of socket connections)
        self.sock.listen(socket.SOMAXCONN)
        # define potential readers and writers
        self.readers = [self.sock]
        self.writers = []
        # data structure to store all established connections and messages for
        # reading and writing: data[connection] = [message_read, message_to_write]
        self.data = {}
        # end token of messages
        self.end_token = "!END!"

    def run(self):
        """ Accept, read and write on connections until the operation is finished """
        while not self.operation_finished:
            # multiplex over the pending requests (in self.readers/self.writers)
            readable, writable, others = select.select(self.readers,
                                                       self.writers, [], 1.0)
            if self.sock in readable:
                conn, _ = self.sock.accept()
                self.readers.append(conn)
                self.data[conn] = ["", ""]
                readable.remove(self.sock)
            for reader in readable:
                try:
                    tmp = reader.recv(4096)
                except socket.error, e:
                    warnings.warn('recv ' + str(e))
                    self.close_sock(reader)
                else:
                    if tmp:
                        self.data[reader][0] += tmp
                        # Complete messages are processed
                        if self.end_token in self.data[reader][0]:
                            self.parse_message(reader)
                        # New data to send. Make sure the client is in the
                        # server's writer queue.
                        if self.data[reader][1] != "" and \
                                reader not in self.writers:
                            self.writers.append(reader)
                    else:
                        self.close_sock(reader)
            for writer in writable:
                try:
                    # send data; tmp is the number of characters actually sent
                    # (which may be less than the whole write buffer)
                    tmp = writer.send(self.data[writer][1])
                except socket.error, e:
                    warnings.warn('send: ' + str(e))
                    self.close_sock(writer)
                else:
                    # Remove the sent characters from the write buffer
                    self.data[writer][1] = self.data[writer][1][tmp:]
                    # If the write buffer is empty, remove the socket from the
                    # potential writers
                    if not self.data[writer][1]:
                        self.writers.remove(writer)
        if self.subflow_pool is not None:
            self.subflow_pool.close()
            # Pool.join() takes no timeout argument
            self.subflow_pool.join()

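    # Illustrative sketch, not part of the original module: the loop above is a
    # standard select()-based multiplexer. The 1.0-second timeout lets the
    # thread re-check self.operation_finished regularly instead of blocking
    # forever. Skeleton of the idiom, assuming a listening socket `srv`:
    #
    #     while not done:
    #         readable, writable, _ = select.select(readers, writers, [], 1.0)
    #         if srv in readable:             # a client wants to connect
    #             conn, _ = srv.accept()
    #             readers.append(conn)        # watch it for incoming data
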
    def close_sock(self, conn):
        """ Close connection and remove it from the lists of potential readers/writers """
        conn.close()
        if conn in self.readers:
            self.readers.remove(conn)
        if conn in self.writers:
            self.writers.remove(conn)
        del self.data[conn]

    def parse_message(self, conn):
        """ Parse incoming message and react

        The following string messages can be sent:

            :name: Sends back the name of the backend, i.e. 'mcore'.

            :subflow_poolsize;*poolsize*: Create a multiprocessing Pool object
                with *poolsize* worker processes for executing subflows.

            :is_ready;*nr_subflows*;*subflow_ids*: Asks the listener which of
                the *nr_subflows* subflows (identified by their subflow_id)
                have already finished executing. *subflow_ids* must be a string
                representation of a set. The listener sends the set of finished
                ids back.

            :execute_subflows;*path*;*nr_subflows*;*subflow_obj*;*runs*: Asks
                the listener to execute *nr_subflows* subflows via a
                multiprocessing Pool. *path* is the absolute path where the
                training data is stored, e.g. the *temp_dir* of a node.
                *subflow_obj* are pickled strings of the subflows. *runs* is a
                list containing the run numbers the flow should be executed
                with: the *run_number* determines the random seed, e.g., for a
                splitter node.

            :send_results;*subflow_ids*: Sends back a list of results
                (PerformanceResultSummary) of *subflow_ids*.
        """
        end_ind = self.data[conn][0].find(self.end_token)
        message = self.data[conn][0][:end_ind]
        if message == 'name':
            self.data[conn][1] = 'mcore' + self.end_token
        elif message.startswith('subflow_poolsize'):
            if self.subflow_pool is None:
                text = message.split(';')
                self.subflow_pool = multiprocessing.Pool(processes=int(text[1]))
        elif message.startswith('execute_subflows'):
            text = message.split(';')
            if len(text) > 5:
                # message was split within the pickled object :-(
                subflow_str = eval(";".join(text[3:-1]))
            else:
                subflow_str = eval(text[3])
            path, runs, nr_subflows = text[1], eval(text[-1]), eval(text[2])
            subflows = [cPickle.loads(s) for s in subflow_str]
            subflow_ids = [s.id for s in subflows]
            assert(nr_subflows == len(subflows)), "incorrect number of subflows"
            # load training data and submit calculation to the pool
            training_data_path = os.path.join(path, 'subflow_data.pickle')
            train_instances = cPickle.load(open(training_data_path, 'rb'))
            for subflow in subflows:
                self.subflow_pool.apply_async(
                    func=subflow,
                    kwds={"train_instances": train_instances, "runs": runs},
                    callback=self.subflow_finished)
            # monitor running jobs
            self.subflow_ids_running.update(subflow_ids)
        elif message.startswith('is_ready'):
            text = message.split(';')
            nr_requested, requested_subflows = [eval(s) for s in text[1:]]
            assert(nr_requested == len(requested_subflows)), \
                "incorrect number of subflows"
            # check which subflows have already finished and tell the client
            finished = requested_subflows & self.subflow_ids_finished
            # .. todo:: maybe reduce self.subflow_ids_finished since the ids
            #           are unique and will never be requested again
            self.data[conn][1] = str(finished) + self.end_token
        elif message.startswith('send_results'):
            text = message.split(';')
            subflow_ids = eval(text[1])
            requested_results = [cPickle.dumps(self.results[i],
                                               cPickle.HIGHEST_PROTOCOL)
                                 for i in subflow_ids]
            # delete requested results to free memory
            for key in subflow_ids:
                del self.results[key]
            self.data[conn][1] = str(requested_results) + self.end_token
        else:
            warnings.warn("Got unknown message: %s" % message)
        self.data[conn][0] = self.data[conn][0][end_ind + len(self.end_token):]

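    # Illustrative sketch, not part of the original module: the client side of
    # the protocol documented above. Every message is terminated by the
    # "!END!" token; the host and port used here are assumptions made for the
    # example only:
    #
    #     client = socket.create_connection(("localhost", 50000))
    #     client.sendall("name" + "!END!")
    #     buf = ""
    #     while "!END!" not in buf:
    #         buf += client.recv(4096)
    #     backend_name = buf.split("!END!")[0]    # -> 'mcore'
    #     client.close()
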
    def subflow_finished(self, result):
        """ Callback method for pool execution of subflows """
        # result is a tuple of flow_id and PerformanceResultSummary
        flow_id, result_collection = result
        self.results[flow_id] = result_collection
        self.subflow_ids_running.remove(flow_id)
        self.subflow_ids_finished.add(flow_id)
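
# Illustrative sketch, not part of the original module: the backend states
# move idling -> staged -> executing -> retrieved -> consolidated -> idling.
# A driver would use the class roughly as follows, where `operation` stands
# for a hypothetical pySPACE operation object:
#
#     backend = MulticoreBackend()        # pool size defaults to detect_CPUs()
#     backend.stage_in(operation)
#     backend.execute()
#     if backend.retrieve():              # blocks until all processes finish
#         backend.consolidate()
#     backend.cleanup()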