Source code for pySPACE.environments.backends.serial
""" Serial process execution on the local machine for easy debugging
All processes are executed in sequence in the main
process thread.
This does not exploit multiple cores or grids but
simplifies debugging and gives a simple implementation.
"""
import logging
import logging.handlers
import traceback
from functools import partial
import pySPACE
from pySPACE.environments.backends.base import Backend
from pySPACE.tools.progressbar import ProgressBar, Percentage, ETA, Bar
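
# A minimal usage sketch of the lifecycle implemented below (for illustration
# only; ``operation`` is a placeholder for any operation object that provides
# ``processes`` and ``number_processes``, and in practice the backend is
# usually driven by pySPACE itself rather than called by hand):
#
#     backend = SerialBackend()
#     backend.stage_in(operation)   # set up the progress bar (and logging)
#     backend.execute()             # run every queued process in this thread
#     backend.retrieve()            # trivial here because execute() blocks
#     backend.consolidate()         # merge the per-process results
#     backend.cleanup()             # return to the "idling" state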

class SerialBackend(Backend):
    """ A backend that allows for easy debugging since the program flow
    is not threaded or distributed over several OS processes.
    """

    def __init__(self):
        super(SerialBackend, self).__init__()

        self.state = "idling"
        self.current_process = 0

    def stage_in(self, operation):
        """ Stage the given operation for execution """
        super(SerialBackend, self).stage_in(operation)

        # Set up progress bar
        widgets = ['Operation progress: ', Percentage(), ' ', Bar(), ' ', ETA()]
        self.progress_bar = ProgressBar(widgets=widgets,
                                        maxval=self.current_operation.number_processes)
        self.progress_bar.start()

        self._log("Operation - staged")
        self.state = "staged"

    def execute(self, timeout=1e6):
        """ Executes all processes specified in the currently staged operation """
        assert(self.state == "staged")
        self.state = "executing"
        self._log("Operation - executing")

        # The handler that is used remotely for logging
        handler_class = logging.handlers.SocketHandler
        handler_args = {"host": self.host, "port": self.port}

        get_process = partial(self.current_operation.processes.get, timeout=timeout)
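        # iter() with a sentinel keeps calling get_process() until it
        # returns False, which marks the end of the process queue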
        for process in iter(get_process, False):
            process.prepare(pySPACE.configuration, handler_class, handler_args)
            # Execute the process, update the progress bar and fetch the next
            # queue element
            try:
                process()
            # if an exception is raised somewhere in the code, we may still
            # want to try the remaining processes
            except Exception:
                self._log(traceback.format_exc(), level=logging.CRITICAL)
                process.post_benchmarking()
                process = False
            # if Ctrl+C is pressed, we want to stop everything immediately
            except KeyboardInterrupt:
                self._log(traceback.format_exc(), level=logging.CRITICAL)
                process.post_benchmarking()
                process = False
            else:
                self.current_process += 1
                self.progress_bar.update(self.current_process)

    def check_status(self):
        """ Return the progress of the current operation's execution

        .. todo:: do we really need this method???
        """
        # Return the fraction of processes of the current operation
        # that have already finished
        return float(self.current_process) / self.current_operation.number_processes

    def retrieve(self, timeout=1e6):
        """ Return the result of the operation

        This is trivial in the serial backend since execute() blocks.
        """
        assert(self.state == "executing")

        self._log("Operation - retrieved")
        self.current_operation.processes.close()

        # if process creation runs in a separate thread, wait for it to finish
        if hasattr(self.current_operation, "create_process") \
                and self.current_operation.create_process is not None:
            self.current_operation.create_process.join(timeout=timeout)

        # Change the state to retrieved
        self.state = "retrieved"

    def consolidate(self):
        """ Consolidate the results of the single processes into a
        consistent result for the whole operation
        """
        assert(self.state == "retrieved")

        try:
            self.current_operation.consolidate()
        except Exception:
            self._log(traceback.format_exc(), level=logging.CRITICAL)

        self._log("Operation - consolidated")
        self.state = "consolidated"

    def cleanup(self):
        """ Remove the current operation and all potential results
        that have been stored in this object
        """
        self.state = "idling"
        self._log("Operation - cleaned up")
        self._log("Idling...")

        # Remove the file logger for this operation
        logging.getLogger('').removeHandler(self.file_handler)
        # close listener socket
        self.sock.close()

        self.current_operation = None
        self.current_process = 0