Source code for pySPACE.environments.backends.mpi_backend

""" Distribute the processes on a High Performance Cluster

A very simple implementation.

.. todo:: Documentation

:Author: Yohannes Kassahun (kassahun@informatik.uni-bremen.de)
:Created: 2011/01/11
"""

import os
import sys
import logging
import logging.handlers
import cPickle
import shutil
import time
import subprocess


import pySPACE
from pySPACE.environments.backends.base import Backend
from pySPACE.tools.progressbar import ProgressBar, Percentage, ETA, Bar


class MpiBackend(Backend):
    """ A Message Passing Interface (MPI) backend to pySPACE

    In order to use this backend, you need a working MPI distribution and
    mpi4py, which can be downloaded from http://code.google.com/p/mpi4py/.
    mpi4py is compatible with Python 2.3 to 2.7 and 3.0 to 3.1.

    This backend assumes a global file system that is visible to all nodes
    running the processes.

    **Parameters**

        :pool_size:
            The number of MPI processes to start in parallel. This should
            not exceed the number of available processors (or the number
            of MPI slots defined in the hostsfile).

            (*recommended, default: 156*)
    """
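    # A minimal usage sketch. Assumption: the standard pySPACE launch flow
    # (e.g. launch.py) drives the backend through the lifecycle below; the
    # construction of `operation` is elided and hypothetical:
    #
    #     backend = MpiBackend(pool_size=64)
    #     backend.stage_in(operation)
    #     backend.execute()
    #     backend.retrieve()
    #     backend.consolidate()
    #     backend.cleanup()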
    def __init__(self, pool_size=156):
        super(MpiBackend, self).__init__()

        #self.COMMAND_MPI = '/usr/lib64/openmpi/bin/mpirun'
        self.COMMAND_MPI = 'mpirun'
        self.COMMAND_PYTHON = sys.executable
        self.runner_script = os.sep.join([pySPACE.configuration.root_dir,
                                          "environments", "backends",
                                          "mpi_runner.py"])
        # Size of the first batch of processes as well as of every
        # subsequent batch; it should not exceed the total number of
        # available processors
        self.NumberOfProcessesToRunAtBeginning = pool_size
        self.NumberOfProcessesToRunLater = pool_size
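    # Note: 'mpirun' is resolved via the PATH of the submitting machine; to
    # pin a specific MPI installation, point COMMAND_MPI at an absolute
    # path as in the commented-out example above.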
    def __del__(self):
        pass
    def stage_in(self, operation):
        """ Stage the current operation """
        super(MpiBackend, self).stage_in(operation)
        # Initialize the process lists here, because the backend is only
        # initialized once
        self.process_args_list = []
        self.IndexCopyStart = 0
        self.ProcessingSuccessful = True
        self.TotalProcessesFinished = 0
        self.CrashedProcesses = []
        # Set up progress bar
        widgets = ['Operation progress: ', Percentage(), ' ', Bar(), ' ', ETA()]
        self.progress_bar = ProgressBar(widgets=widgets,
                                        maxval=self.current_operation.number_processes)
        self.progress_bar.start()
        # The handler that is used remotely for logging
        handler_class = logging.handlers.SocketHandler
        handler_args = {"host": self.host, "port": self.port}
        # Set up the stage-in directory
        stagein_dir = os.sep.join([self.current_operation.result_directory,
                                   ".stagein"])
        # Check that the hosts file exists in the right directory
        hostsfile_path = os.path.join(pySPACE.configuration.root_dir, 'hostsfile')
        if not os.path.isfile(hostsfile_path):
            print "***************************************************************"
            print "hostsfile not found!"
            print "Please create a hosts file named 'hostsfile' under", \
                  pySPACE.configuration.root_dir
            print "***************************************************************"
            raise UserWarning('Missing hostsfile.')
        if not os.path.exists(stagein_dir):
            os.mkdir(stagein_dir)
        process = self.current_operation.processes.get()
        print "Preparing processes. This might take a few minutes...."
        # Prepare all processes from the queue for remote execution and
        # pickle them into the stage-in directory
        i = 0
        while process != False:
            process.prepare(pySPACE.configuration, handler_class, handler_args)
            proc_file_name = os.sep.join([stagein_dir,
                                          "process_%d.pickle" % i])
            proc_file = open(proc_file_name, "wb")
            cPickle.dump(process, proc_file)
            proc_file.close()
            # Add the pickled process to the job specification
            self.process_args_list.append(proc_file_name)
            # Get the next process
            process = self.current_operation.processes.get()
            i += 1
        self._log("Operation - staged")
        self.state = "staged"
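    # Protocol between this backend and mpi_runner.py, as inferred from
    # retrieve() below: each pickled process "process_<i>.pickle" is passed
    # to the runner script as a command line argument, and the runner is
    # expected to create a marker file "<pickle>_Finished" on success or
    # "<pickle>_Crashed" on failure, which retrieve() polls once per second.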
    def execute(self, timeout=1e6):
        """ Executes all processes specified in the currently staged operation

        For this backend the actual execution is deferred to
        :func:`retrieve`, which starts the MPI processes and waits for
        them to finish.
        """
        assert(self.state == "staged")
    def check_status(self):
        """ Returns a description of the current state of the operation's execution

        .. todo:: The progress of the MPI processes is not tracked, so
                  this method always reports full completion.
        """
        return 1.0
    def not_xor(self, a, b):
        """ Returns True exactly when `a` and `b` have the same truth value (XNOR) """
        return not ((a or b) and not (a and b))
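    # Truth table for not_xor as applied in retrieve() to the marker files
    # of one process:
    #
    #     _Finished  _Crashed   not_xor   meaning
    #     False      False      True      still running -> keep waiting
    #     True       False      False     finished successfully
    #     False      True       False     crashed (also counts as done)
    #     True       True       True      both markers (not expected)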
    def retrieve(self, timeout=1e6):
        """ Starts the staged processes and returns after all have finished """
        self.state = "executing"
        self._log("Operation - executing")

        hostsfile = os.path.join(pySPACE.configuration.root_dir, 'hostsfile')
        if self.NumberOfProcessesToRunAtBeginning > len(self.process_args_list):
            # All staged processes fit into the pool: start them at once
            args = ([self.COMMAND_MPI] +
                    ['--loadbalance'] +
                    ['--nolocal'] +
                    ['--hostfile', hostsfile] +
                    ['-n', str(len(self.process_args_list))] +
                    [self.COMMAND_PYTHON] +
                    [self.runner_script] +
                    self.process_args_list)
            # Start the processes
            self._log("mpi-parameters: %s" % args, level=logging.DEBUG)
            self._log("mpi-parameters-joined: %s" % " ".join(args),
                      level=logging.DEBUG)
            p = subprocess.Popen(args)
            self.IndexCopyStart += self.NumberOfProcessesToRunAtBeginning
        else:
            # Copy the arguments of the first batch of processes to run
            sub_process_args_list = \
                self.process_args_list[self.IndexCopyStart:
                                       self.NumberOfProcessesToRunAtBeginning]
            args = ([self.COMMAND_MPI] +
                    ['--loadbalance'] +
                    ['--nolocal'] +
                    ['--hostfile', hostsfile] +
                    ['-n', str(len(sub_process_args_list))] +
                    [self.COMMAND_PYTHON] +
                    [self.runner_script] +
                    sub_process_args_list)
            # Start the processes
            p = subprocess.Popen(args)
            # TODO: call p.poll() for all started processes after they exited
            self.IndexCopyStart += self.NumberOfProcessesToRunAtBeginning

        # A list of booleans recording which processes have finished;
        # initially no process has finished
        FinishedProcesses = [False for i in range(len(self.process_args_list))]

        # Wait until all processes finish and start new processes
        # when old ones finish
        print "Waiting for the processes to finish...."

        # Counter for the processes which are finished. It will be reset
        # after 'NumberOfProcessesToRunLater' processes have finished
        CounterProcessesFinished = 0
        processes_Finished = False
        while not processes_Finished:
            try:
                processes_Finished = True
                for LoopCounter, process_args in enumerate(self.process_args_list):
                    if self.not_xor(os.path.isfile(process_args + "_Finished"),
                                    os.path.isfile(process_args + "_Crashed")):
                        # No marker file exists yet: the process is
                        # still running
                        processes_Finished = False
                    elif FinishedProcesses[LoopCounter] == False:
                        # Record that the process is finished
                        FinishedProcesses[LoopCounter] = True
                        # If the process crashed, take note of that
                        if os.path.isfile(process_args + "_Crashed"):
                            self.CrashedProcesses.append(process_args)
                        # Increment the counters of finished processes
                        CounterProcessesFinished += 1
                        self.TotalProcessesFinished += 1
                        # Update the progress bar
                        self.progress_bar.update(self.TotalProcessesFinished)
                        if CounterProcessesFinished == self.NumberOfProcessesToRunLater:
                            if self.IndexCopyStart == len(self.process_args_list):
                                break
                            # Determine the next batch of processes to run
                            end_index = min(self.IndexCopyStart +
                                            self.NumberOfProcessesToRunLater,
                                            len(self.process_args_list))
                            sub_process_args_list = \
                                self.process_args_list[self.IndexCopyStart:end_index]
                            args = ([self.COMMAND_MPI] +
                                    ['--loadbalance'] +
                                    ['--nolocal'] +
                                    ['--hostfile', hostsfile] +
                                    ['-n', str(len(sub_process_args_list))] +
                                    [self.COMMAND_PYTHON] +
                                    [self.runner_script] +
                                    sub_process_args_list)
                            # Start the next batch of processes
                            if len(sub_process_args_list) > 0:
                                p = subprocess.Popen(args)
                            # Adjust the start index
                            self.IndexCopyStart += self.NumberOfProcessesToRunLater
                            # Reset the counter of finished processes
                            CounterProcessesFinished = 0
                # Sleep for one second before polling again
                time.sleep(1)
            except (KeyboardInterrupt, SystemExit):
                # The user interrupted, or the processes hang forever
                self.ProcessingSuccessful = False
                print "*****************************************************************"
                print "pySPACE forced to stop ..."
                print "Please wait until the mpi backend has consolidated the"
                print "results generated so far and has cleaned up ..."
                print "*****************************************************************"
                from pySPACE.resources.dataset_defs.performance_result \
                    import PerformanceResultSummary
                # Merge the result files written so far
                print "Starting merging . . ."
                PerformanceResultSummary.merge_performance_results(
                    self.current_operation.result_directory)
                print "Merging complete . . ."
                break  # leave the polling loop

        self._log("Operation - processing finished")
        # Change the state to retrieved
        self.state = "retrieved"
        return None
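    # For illustration, with pool_size=4 and more than four staged
    # processes, the first mpirun invocation assembled above looks roughly
    # like this (paths abbreviated):
    #
    #     mpirun --loadbalance --nolocal --hostfile <root_dir>/hostsfile \
    #            -n 4 python mpi_runner.py process_0.pickle ... process_3.pickle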
    def consolidate(self):
        """ Consolidates the results of the single processes into a consistent result of the whole operation """
        assert(self.state == "retrieved")
        if self.ProcessingSuccessful and len(self.CrashedProcesses) == 0:
            self.current_operation.consolidate()
        if self.ProcessingSuccessful and len(self.CrashedProcesses) != 0:
            from pySPACE.resources.dataset_defs.performance_result \
                import PerformanceResultSummary
            # Some processes crashed: merge only the result files of the
            # processes that did finish
            print "Starting merging . . ."
            PerformanceResultSummary.merge_performance_results(
                self.current_operation.result_directory)
            print "Merging complete . . ."
        self._log("Operation - consolidated")
        self.state = "consolidated"
    def cleanup(self):
        """ Removes the current operation and all potential results that have been stored in this object """
        self.state = "idling"
        # Cleaning up...
        stagein_dir = os.sep.join([self.current_operation.result_directory,
                                   ".stagein"])
        if self.ProcessingSuccessful and len(self.CrashedProcesses) == 0:
            deleted = False
            while not deleted:
                try:
                    os.chdir("..")
                    shutil.rmtree(stagein_dir)
                    deleted = True
                except OSError, e:
                    if e.errno == 66:
                        # Directory not yet empty, presumably because of
                        # NFS lock files; retry after a short wait
                        self._log("Could not remove .stagein dir, "
                                  "waiting for NFS lock",
                                  level=logging.WARNING)
                        time.sleep(5)
        self._log("Operation - cleaned up")
        self._log("Idling...")
        # Remove the file logger for this operation
        logging.getLogger('').removeHandler(self.file_handler)
        # Close the listener socket
        self.sock.close()
        self.current_operation = None