Source code for pySPACE.environments.backends.base
""" Backend Base Class and Methods """
import abc
import socket
import logging
import logging.handlers
import os
import sys
import signal
from multiprocessing.pool import IMapIterator
file_path = os.path.dirname(os.path.abspath(__file__))
pyspace_path = file_path[:file_path.rfind('pySPACE')-1]
if not pyspace_path in sys.path:
sys.path.append(pyspace_path)
import pySPACE
[docs]class Backend(object):
""" Interface for backends
All other backends must implement several methods of
this interface in order to execute an operation
on a specific modality.
"""
__metaclass__ = abc.ABCMeta
STATES = {"idling", "staged", "executing", "retrieved", "consolidated"}
[docs] def __init__(self):
# Start logging
self._start_logging()
# Install signal handlers
#signal.signal(signal.SIGINT, self._grace)
#signal.signal(signal.SIGTERM, self._grace)
#signal.signal(signal.SIGQUIT, self._grace)
#signal.signal(signal.SIGPIPE, self._grace)
self.file_handler = None
self.current_operation = None
self.SERVER_IP = socket.gethostbyname(socket.gethostname())
self.SERVER_PORT = None
self.listener = None
[docs] def __del__(self):
# Stop logging
self._stop_logging()
[docs] def __str__(self):
return str(type(self).__name__)
[docs] def stage_in(self, operation):
"""
Stage the current operation
"""
self.current_operation = operation
# Add a handler that logs the operations output to a file
log_path = self.current_operation.result_directory
self.file_handler = logging.FileHandler(os.path.join(log_path,
"operation.log"))
# Determining level of the logger based on pySPACE configuration file
try:
log_level = eval(pySPACE.configuration.file_log_level) \
if hasattr(pySPACE.configuration, "file_log_level") \
else logging.INFO
if not isinstance(log_level, int):
raise NameError()
except (AttributeError, NameError):
import warnings
warnings.warn(
"%s is not a valid log level! Falling back to logging.INFO."
% pySPACE.configuration.log_level)
log_level = logging.INFO
self.file_handler.setLevel(log_level)
# set a format which is simpler for console use
formatter = logging.Formatter(
'%(asctime)s %(name)-40s %(levelname)-8s %(message)s')
# tell the handler to use the formatter
self.file_handler.setFormatter(formatter)
# add the handler to the root logger
logging.getLogger('').addHandler(self.file_handler)
# prepare socket connection and listener thread
self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) # TCP
self.sock.bind((self.SERVER_IP,0))
self.sock.setblocking(0)
self.SERVER_PORT = self.sock.getsockname()[1]
@abc.abstractmethod
[docs] def execute(self, timeout=1e6):
"""
Executes all processes specified in the currently staged
operation.
:param timeout: The timeout for this method, after which the execution is aborted
:type timeout: int
"""
raise NotImplementedError(
"Method execute has not been implemented in subclass %s"
% self.__class__.__name__)
@abc.abstractmethod
[docs] def check_status(self):
"""
Returns a description of the current state of the operations
execution.
"""
raise NotImplementedError(
"Method check_status has not been implemented in subclass %s"
% self.__class__.__name__)
@abc.abstractmethod
[docs] def retrieve(self, timeout=1e6):
"""
Fetches the results of the operation's processes.
... note:: This call might block until all processes are finished
"""
raise NotImplementedError(
"Method retrieve has not been implemented in subclass %s"
% self.__class__.__name__)
@abc.abstractmethod
[docs] def consolidate(self):
"""
Consolidates the results of the single processes into a consistent result of the whole
operation
"""
raise NotImplementedError(
"Method consolidate has not been implemented in subclass %s"
% self.__class__.__name__)
@abc.abstractmethod
[docs] def cleanup(self):
"""
Remove the current operation and all potential results that
have been stored in this object
"""
raise NotImplementedError(
"Method cleanup has not been implemented in subclass %s"
% self.__class__.__name__)
[docs] def get_result_directory(self):
""" Return the result directory of the current operation (if any) """
if self.current_operation == None:
raise Exception("No operation staged!")
return self.current_operation.result_directory
[docs] def _start_logging(self):
""" Configures and starts the logging of this operation """
# Remove the default handler
if len(logging.getLogger('').handlers) > 0:
logging.getLogger('').removeHandler(logging.getLogger('').handlers[0])
# define a handler which writes WARNING messages
# or higher to the sys.stderr
console = logging.StreamHandler()
# Determining level of the logger based on pySPACE configuration file
try:
log_level = eval(pySPACE.configuration.console_log_level) \
if hasattr(pySPACE.configuration, "console_log_level") \
else logging.WARNING
if not isinstance(log_level, int):
raise NameError()
except (AttributeError, NameError):
import warnings
warnings.warn("%s is not a valid log level! Falling back to " \
"logging.WARNING." % pySPACE.configuration.log_level)
log_level = logging.WARNING
console.setLevel(log_level)
# set a format which is simpler for console use
formatter = logging.Formatter(
'%(asctime)s %(name)-40s %(levelname)-8s %(message)s')
# tell the handler to use this format
console.setFormatter(formatter)
# add the handler to the root logger
logging.getLogger('').addHandler(console)
logging.getLogger('').setLevel(logging.DEBUG)
self._log("Logging started")
# Starting the socket server
from pySPACE.tools.socket_logger import LogRecordSocketReceiver
self._log("Starting the TCP logging server...")
# Determining host ip
host, aliaslist, lan_ip = socket.gethostbyname_ex(socket.gethostname())
self.host = lan_ip[0]
# Search for an available port
self.port = logging.handlers.DEFAULT_TCP_LOGGING_PORT
while True:
try:
self.tcpserver = LogRecordSocketReceiver(host=self.host,
port=self.port)
break
except socket.error:
self.port += 1
self.tcpserver.start()
self._log("Started the TCP logging server on port %s!" % self.port)
[docs] def _stop_logging(self):
""" Stops the logging of this operation """
self._log("Stopping the TCP logging server...")
if hasattr(self, "tcpserver"):
self.tcpserver.abort = True
self.tcpserver.join(timeout=1e6)
self.tcpserver.shutdown()
del(self.tcpserver)
# self._log("Stopping the TCP logging server... Done!")
# self._log("Logging stopped")
root_logger = logging.getLogger("%s-%s.%s" % (socket.gethostname(),
os.getpid(),
self))
for handler in root_logger.handlers:
handler.close()
root_logger.removeHandler(handler)
del(root_logger)
[docs] def _log(self, message, level = logging.INFO):
""" Logs the given message with the given logging level """
root_logger = logging.getLogger("%s-%s.%s" % (socket.gethostname(),
os.getpid(),
self))
if len(root_logger.handlers) == 0:
root_logger.addHandler(logging.handlers.SocketHandler('localhost',
logging.handlers.DEFAULT_TCP_LOGGING_PORT))
root_logger.log(level, message)
#def _grace(self, signal_number, stack_frame):
#self._log("Signal %s thrown! Gracing backend..." % signal_number, logging.WARNING)
#self.cleanup()
#self._stop_logging()
#import sys
#sys.exit(1)
[docs]def create_backend(backend_type = "serial"):
""" Creates the :mod:`backend object<pySPACE.environments.backends>` based on the given options
The following backends are available:
:``serial``: :class:`~pySPACE.environments.backends.serial.SerialBackend`
:``mcore``: :class:`~pySPACE.environments.backends.multicore.MulticoreBackend`
:``mpi``: :class:`~pySPACE.environments.backends.mpi_backend.MpiBackend`
:``loadl``: :class:`~pySPACE.environments.backends.ll_backend.LoadLevelerBackend`
"""
if backend_type == "serial":
from pySPACE.environments.backends.serial import SerialBackend
backend = SerialBackend()
elif backend_type == "mcore":
from pySPACE.environments.backends.multicore import MulticoreBackend
if hasattr(pySPACE.configuration, "pool_size"):
backend = MulticoreBackend(pool_size=pySPACE.configuration.pool_size)
else:
backend = MulticoreBackend()
elif backend_type == "mpi":
from pySPACE.environments.backends.mpi_backend import MpiBackend
if hasattr(pySPACE.configuration, "pool_size"):
backend = MpiBackend(pySPACE.configuration.pool_size)
else:
backend = MpiBackend()
elif backend_type == "loadl":
from pySPACE.environments.backends.ll_backend import LoadLevelerBackend
backend = LoadLevelerBackend()
else:
raise Exception("Invalid backend (must be either serial, mcore, or mpi). Is %s." % backend_type)
return backend