""" Script for running pyspace live controlling
.. image:: ../../graphics/launch_live.png
:width: 500
A script for running pyspace live. The script contains
a class to control the other related classes needed in the online mode,
and several methods that are used for the general startup of the suite.
"""
import sys
import os
import time
import traceback
import logging
import yaml
import datetime
import optparse
import multiprocessing
from collections import defaultdict
from select import select
# Directory that contains this script (symlinks resolved).
file_path = os.path.dirname(os.path.realpath(__file__))
# The directory to put on sys.path is everything in front of the last
# 'pySPACE' path component (minus the separator preceding it).
_pyspace_index = file_path.rfind('pySPACE')
if _pyspace_index > 0:
    pyspace_path = file_path[:_pyspace_index - 1]
else:
    # 'pySPACE' does not occur in the path: str.rfind returned -1 and the
    # slice would silently chop two characters off the script directory.
    # Fall back to the script directory itself instead of a bogus path.
    pyspace_path = file_path
if pyspace_path not in sys.path:
    sys.path.append(pyspace_path)
import pySPACE
# create logger with handlers
from pySPACE.environments.live import online_utilities
# Module-wide logger named "OnlineLogger"; used by LiveController and by the
# start-up helper functions in this script.
online_logger = logging.getLogger("OnlineLogger")
class LiveController(object):
    """ Controlling suite.

    This class provides a clean interface to the live environment.
    It contains objects of the classes that are used
    for the online mode and configures them as needed.

    The controller uses the config-files for user related configuration,
    and additional parameter files for scenario/task specific parameterization.

    NOTE(review): the methods refer to the module-level names ``trainer``,
    ``eeg_stream_manager``, ``prediction`` and ``adaptation``; these have to
    be imported by the launching script before the methods are called.
    """

    def __init__(self, parameters_, live_processing=None):
        """ Read the parameter dictionary and set up the result messenger

        **Parameters**

            :parameters_:
                Dictionary with the entries "data_files", "data_source",
                "potentials", "flow_persistency_directory",
                "prewindowed_data_directory" and "record".
                Missing entries are read as None.

            :live_processing:
                Optional messenger object. Despite its name it is stored as
                ``self.messenger``. If None, a SocketMessenger is created
                when at least one potential sets 'online_result_messenger',
                otherwise a LogMessenger.

                (*optional, default: None*)
        """
        # create two level defaultdict for parameters!
        parameters = defaultdict(lambda: defaultdict(lambda: None))
        for key, value in parameters_.items():
            # if the dict value is also a dict, convert this also to a defaultdict
            if type(value) is dict:
                value = self.convert_dict_to_defaultdict(value)
            parameters[key] = value

        # fetch parameters that would be nice to have
        datafile_info = parameters["data_files"]
        data_source = parameters["data_source"]
        potentials = parameters["potentials"]
        self.flow_persistency_directory = parameters["flow_persistency_directory"]
        self.prewindowed_data_directory = parameters["prewindowed_data_directory"]
        self.datafile_train = datafile_info["eeg_data_file_train"]
        self.datafile_test = datafile_info["eeg_data_file_test"]
        self.eeg_server_nullmarker_stride_ms = data_source["nullmarker_stride_ms"]
        self.eeg_server_eeg_port = data_source["default_port"]
        self.eeg_server_offline_predict_ip = data_source["predict_offline"]["ip"]
        self.eeg_server_predict_ip = data_source["predict"]["ip"]
        self.eeg_server_train_ip = data_source["train"]["ip"]
        self.eeg_server_prewindow_ip = data_source["prewindow"]["ip"]
        self.eeg_server_record = data_source["record"]

        # settings for recording
        self.subject = parameters["record"]["subject"]
        self.experiment = parameters["record"]["experiment"]

        self.configuration = pySPACE.configuration

        self.label = None
        self.prediction_process = None
        self.live_processing = None
        self.live_prewindower = None

        # figure out all potentials and store all relevant information;
        # potentials may be given as a list or as a mapping whose values
        # are the potential descriptions
        if isinstance(potentials, list):
            potential_specs = potentials
        else:
            potential_specs = list(potentials.values())
        self.erps = dict()
        for potential in potential_specs:
            if type(potential) is dict:
                potential = self.convert_dict_to_defaultdict(potential)
            potential["configuration"] = self.configuration
            self.erps[potential["flow_id"]] = potential

        # flows whose results shall be sent to a remote receiver;
        # .get() is used so that no key is inserted into the defaultdicts
        results_to_send = [key for key in self.erps
                           if self.erps[key].get('online_result_messenger')]

        if live_processing is None:
            if len(results_to_send) > 0:
                self.messenger = pySPACE.environments.live.communication.socket_messenger.SocketMessenger(key=results_to_send)
                online_logger.info('Prediction results from %s will be send via socket.', ' and '.join(results_to_send))
            else:
                self.messenger = pySPACE.environments.live.communication.log_messenger.LogMessenger()
        else:
            self.messenger = live_processing

    def convert_dict_to_defaultdict(self, dict_to_convert):
        """ Return a defaultdict copy of the given dict, missing keys map to None

        Anything that is not exactly of type dict is returned unchanged
        after logging a warning.
        """
        if type(dict_to_convert) is not dict:
            online_logger.warning(str('CARE: trying to convert a %s to defaultdict') % type(dict_to_convert))
            return dict_to_convert
        mydefaultdict = defaultdict(lambda: None)
        mydefaultdict.update(dict_to_convert)
        return mydefaultdict

    def _resolve_storage_paths(self, datafiles):
        """ Return the file names as list, relative ones joined onto the storage dir """
        return [datafile if os.path.isabs(datafile)
                else os.path.join(self.configuration.storage, datafile)
                for datafile in datafiles]

    def prewindowing(self, online=True):
        """ Prewindows the pyspace flows on the data streamed from
        an external EEG-Server

        With *online* being False the configured training file(s) are
        used instead of a live stream.
        """
        online_logger.info("starting prewindowing")
        # Create prewindower
        self.live_prewindower = trainer.LiveTrainer()

        prewindowing_files = []
        if online:
            # create the stream manager
            stream_manager = eeg_stream_manager.LiveEegStreamManager(online_logger)
            stream_manager.initialize_eeg_server(ip=self.eeg_server_prewindow_ip,
                                                 port=self.eeg_server_eeg_port)
            # setup recording if option is set
            if self.subject is not None and self.experiment is not None:
                stream_manager.record_with_options(self.subject, self.experiment)
            else:
                online_logger.error("RAW DATA IS NOT RECORDED!")
            # set the stream manager into the trainer object
            self.live_prewindower.set_eeg_stream_manager(stream_manager)
            # in online case just connect to the streaming server
            self.live_prewindower.prepare_training(prewindowing_files,
                                                   self.erps,
                                                   "prewindowing",
                                                   nullmarker_stride_ms=self.eeg_server_nullmarker_stride_ms)
        else:
            # when running offline prepare local streaming
            if isinstance(self.datafile_train, str):
                # a single file name is handed on as string, not as list
                prewindowing_files = \
                    os.path.join(self.configuration.storage, self.datafile_train)
            else:
                prewindowing_files = self._resolve_storage_paths(self.datafile_train)
            online_logger.info("prewindowing files:")
            online_logger.info(prewindowing_files)
            self.live_prewindower.prepare_training(prewindowing_files,
                                                   self.erps,
                                                   "prewindowing_offline",
                                                   nullmarker_stride_ms=self.eeg_server_nullmarker_stride_ms)
        self.start_prewindowing(online)

    def start_prewindowing(self, online=True):
        """ Start the prewindowing process

        The *online* parameter is accepted but not used.
        """
        online_logger.info("Start prewindowing")
        self.live_prewindower.start_training("prewindowing")  # pass an additional True for profiling

    def stop_prewindowing(self):
        """ Stop a running prewindowing by sending "STOP" to the prewindower """
        self.live_prewindower.process_external_command("STOP")

    def prewindowed_train(self):
        """ Trains the pyspace flows which have been prewindowed using the prewindower"""
        # Create trainer; no data files are handed over in this mode
        pw_trainer = trainer.LiveTrainer()
        postprocessing_files = []
        pw_trainer.prepare_training(postprocessing_files,
                                    self.erps,
                                    "prewindowed_train",
                                    nullmarker_stride_ms=self.eeg_server_nullmarker_stride_ms)
        # Let pyspace live train on this data
        online_logger.info("Start pyspace live training")
        pw_trainer.start_training("prewindowed_train")  # pass an additional True for profiling

    def train(self):
        """ Trains the pyspace flows on the data streamed from
        an external EEG-Server
        """
        # Create trainer and initialize the eeg data stream
        online_trainer = trainer.LiveTrainer()
        stream_manager = \
            eeg_stream_manager.LiveEegStreamManager(online_logger)
        stream_manager.initialize_eeg_server(ip=self.eeg_server_train_ip,
                                             port=self.eeg_server_eeg_port)
        # Prepare trainer for training
        online_trainer.set_eeg_stream_manager(stream_manager)

        if isinstance(self.datafile_train, str):
            # NOTE(review): unlike in prewindowing(), a single file name is
            # not joined with the storage directory here - confirm intent
            training_files = self.datafile_train
        else:
            training_files = self._resolve_storage_paths(self.datafile_train)
        online_logger.info(training_files)
        online_logger.info("#"*30)

        online_trainer.prepare_training(training_files,
                                        self.erps,
                                        "train",
                                        nullmarker_stride_ms=self.eeg_server_nullmarker_stride_ms)
        # Let pyspace live train on this data
        online_logger.info("Start pyspace live training")
        online_trainer.start_training("train")  # pass an additional True for profiling

    def adapt_classification_threshold(self, load_model=True):
        """ Adapts classification threshold on a special function

        With *load_model* being True the stored models are reloaded from
        the flow persistency directory first. The method blocks until the
        adaptor reports that the adaptation is no longer active.
        """
        # Create pyspace live processing server
        live_adaptor = adaptation.LiveAdaptor()
        # Reloading stored models
        if load_model:
            online_logger.info("Reloading Models")
            live_adaptor.load_model(self.flow_persistency_directory, self.erps)

        online_logger.info("Creating eeg stream")
        # Start EEG server that streams data for testing
        stream_manager = \
            eeg_stream_manager.LiveEegStreamManager(online_logger)
        stream_manager.initialize_eeg_server(ip=self.eeg_server_train_ip,
                                             port=self.eeg_server_eeg_port)
        # Prepare live_adaptor for adaptation
        live_adaptor.set_eeg_stream_manager(stream_manager)

        if isinstance(self.datafile_train, str):
            adaptation_files = self.datafile_train
        else:
            adaptation_files = self._resolve_storage_paths(self.datafile_train)
        online_logger.info(adaptation_files)
        online_logger.info("#"*30)

        live_adaptor.prepare_adaptation(adaptation_files,
                                        self.erps)

        online_logger.info("Start pyspace live adaptation")
        live_adaptor.start_adaptation()

        # Wait a moment, then poll once per second until the adaptation
        # is reported as finished
        time.sleep(5)
        try:
            while live_adaptor.is_adaptation_active():
                time.sleep(1)
        except Exception as exc:
            online_logger.log(logging.ERROR, "Training interrupted")
            exc_type, exc_value, exc_traceback = sys.exc_info()
            online_logger.log(logging.ERROR, repr(
                traceback.format_exception(
                    exc_type, exc_value, exc_traceback)))
            online_logger.log(logging.ERROR, str(exc))

        live_adaptor.stop_adaptation()
        online_logger.info("Adaptation Finished")

    def predict(self, load_model=True, online=True, remote=False):
        """ Classifies new instances based on the trained pyspace flows

        **Parameters**

            :load_model:
                Reload the stored models from the flow persistency
                directory before predicting.

            :online:
                If True, connect to the prediction EEG server; otherwise
                the configured test file (the first one, if a list is
                configured) is used.

            :remote:
                If False and *online* is False, wait for the user to press
                Enter before the prediction is started.

        Raises an Exception if the test data is neither given as string
        nor as list in offline mode.
        """
        # do all preparations only if there is no prepared prediction process
        if self.live_processing is None:
            # create pyspace live processing server
            self.live_processing = prediction.Predictor(self.messenger)
            self.live_processing.set_controller(self)
            self.prediction_process = self.live_processing

            # reloading stored models
            if load_model:
                online_logger.info("Reloading Models")
                self.live_processing.load_model(self.flow_persistency_directory, self.erps)

            # connect to the server
            if online:
                # init eeg streaming and recording
                stream_manager = eeg_stream_manager.LiveEegStreamManager(online_logger)
                stream_manager.initialize_eeg_server(ip=self.eeg_server_predict_ip,
                                                     port=self.eeg_server_eeg_port)
                # set the stream manager into the predictor object
                self.live_processing.set_eeg_stream_manager(stream_manager)
                # prepare the prediction
                self.live_processing.prepare_predicting(self.erps, nullmarker_stride_ms=self.eeg_server_nullmarker_stride_ms)
                # setup recording if option is set
                if self.subject is not None and self.experiment is not None:
                    stream_manager.record_with_options(self.subject, self.experiment, online=True)
                else:
                    online_logger.warning("RAW DATA IS NOT RECORDED!")
            else:
                # when running offline prepare local streaming
                if isinstance(self.datafile_test, str):
                    testing_file = \
                        os.path.join(self.configuration.storage, self.datafile_test)
                elif isinstance(self.datafile_test, list):
                    # the attribute is named datafile_test; the former
                    # 'self.data_test' does not exist on this class
                    testing_file = \
                        os.path.join(self.configuration.storage, self.datafile_test[0])
                else:
                    raise Exception("could not determine testing data!")
                online_logger.info(str("testing file: %s " % testing_file))
                self.live_processing.prepare_predicting(self.erps, testing_file, nullmarker_stride_ms=self.eeg_server_nullmarker_stride_ms)
            online_logger.info("Finished")

        if not remote and not online:
            raw_input("\nPress Enter to start predicting ")

        # Let pyspace live classify the test data
        online_logger.info("Start pyspace live classification")
        # pass an additional True for profiling
        self.live_processing.start_predicting()

    def stop_prediction(self):
        """ Stop a running prediction by sending "STOP" to the predictor """
        self.live_processing.process_external_command("STOP")

    def record(self):
        """ Just record incoming data - press enter to stop

        The stream is consumed window by window; after each received
        window stdin is checked (without blocking) and the loop ends as
        soon as input is available.
        """
        online_logger.info("recording .. (press enter to stop)")
        data_stream = eeg_stream_manager.LiveEegStreamManager(online_logger)
        data_stream.initialize_eeg_server(**self.eeg_server_record)
        window_stream = data_stream.request_window_stream(window_spec=None, no_overlap=True)
        data_stream.record_with_options(subject=self.subject,
                                        experiment=self.experiment,
                                        online=False)
        for window, label in window_stream:
            # zero timeout: only poll stdin, never wait for it
            if select([sys.stdin], [], [], 0) == ([sys.stdin], [], []):
                break
            online_logger.info("Received window with label %s" % label)
        data_stream.stop()
        online_logger.info("stopped recording!")
def parse_arguments(argv=None):
    """ Parses the command line arguments to create options object

    **Parameters**

        :argv:
            List of argument strings to parse. If None, the arguments
            of the running process (``sys.argv[1:]``) are used.

            (*optional, default: None*)

    Returns the tuple ``(options, args)`` as delivered by
    ``optparse.OptionParser.parse_args``.
    """
    usage = "Usage: %prog [--config <configuration.yaml>] "\
            "[--params <params.yaml>] "
    parser = optparse.OptionParser(usage=usage)

    # options that take a value
    parser.add_option("-c", "--configuration",
                      help="Choose the configuration file",
                      action="store")
    parser.add_option("-p", "--params",
                      help="Specify parameter file that contains information about the data and environment",
                      action="store")

    # boolean mode switches: (option strings, destination, help text);
    # all of them default to False
    mode_flags = [
        (("-t", "--train"), "train",
         "Train a flow according to parameters in parameter file"),
        (("--prewindowing",), "prewindowing",
         "Prewindow a flow according to parameters in parameter file"),
        (("--prewindowing_offline",), "prewindowing_offline",
         "Prewindow an offline flow for test purpose"),
        (("--prewindowed_train",), "prewindowed_train",
         "Train a prewindowed flow according to parameters in parameter file"),
        (("-a", "--adapt"), "adapt",
         "Adapt the threshold of the flow according to parameters in parameter file"),
        (("--predict",), "predict",
         "Predict with trained flow"),
        (("--predict_offline",), "predict_offline",
         "Prediction using an offline flow for testing purposes"),
        (("--all",), "all",
         "First train a flow according to parameters in parameter file and then do prediction using the trained flow"),
        (("--remote",), "remote",
         "Start remote control"),
        (("--record",), "record",
         "Just record data into the specified storage dir"),
    ]
    for option_strings, destination, help_text in mode_flags:
        parser.add_option(*option_strings,
                          help=help_text,
                          action="store_true",
                          dest=destination,
                          default=False)

    (parse_options, parse_args) = parser.parse_args(argv)
    return (parse_options, parse_args)
def read_parameter_file(parameter_file_name):
    """ Reads and interprets the given parameter file

    The file name is resolved relative to the "live_settings" folder of
    ``pySPACE.configuration.spec_dir``, so the pySPACE configuration has
    to be loaded before this function is called.

    Returns the parsed YAML content.
    """
    # interpret parameter file
    online_logger.info(parameter_file_name)
    param_path = os.path.join(pySPACE.configuration.spec_dir, "live_settings", parameter_file_name)
    # the context manager closes the file also if parsing fails
    with open(param_path, 'r') as stream:
        online_logger.info("Loading parameter file..")
        # NOTE(review): yaml.load may construct arbitrary Python objects;
        # switch to yaml.safe_load if parameter files never use
        # Python-specific tags
        parameters = yaml.load(stream)
    online_logger.info("Done.")
    online_logger.debug(yaml.dump(parameters))
    return parameters
def create_and_start_rpc_server(controller_instance, rpc_port=16254):
    """ Creates and starts the server for the remote procedure calls

    An XML-RPC server bound to localhost and the given port is created,
    the controller instance is registered with it and the server loop is
    run in a separate process.

    Returns the started ``multiprocessing.Process``.
    """
    host = "localhost"
    port = rpc_port
    online_logger.info(str("Starting RPC server on port %d .." % port))

    from SimpleXMLRPCServer import SimpleXMLRPCServer
    address = (host, port)
    rpc_server = SimpleXMLRPCServer(address, logRequests=False)
    online_logger.info("RPCServer listens on "+str(host)+":"+str(port))

    # expose the controller and serve requests in a process of its own
    rpc_server.register_instance(controller_instance)
    serving_process = multiprocessing.Process(target=rpc_server.serve_forever)
    serving_process.start()
    return serving_process
def create_backup(liveControl, options):
    """ Create backup files

    A folder ``backup/<YYYYmmdd_HHMMSS>`` is created next to this script
    with the sub folders "flow_storage", "node_chains", "windower" and
    "live_settings". The flow persistency directory of the controller is
    copied into "flow_storage"; the parameter file named by
    ``options.params`` (if given) is copied into "live_settings".

    **Parameters**

        :liveControl:
            Controller whose ``flow_persistency_directory`` is backed up.

        :options:
            Parsed command line options; only ``params`` is read.
    """
    # distutils.file_util is imported explicitly; before it was only
    # reachable as a side effect of importing distutils.dir_util
    import distutils.dir_util
    import distutils.file_util

    online_logger.info("Creating backup...")

    # backup/<timestamp> next to this script
    dir_path = os.path.dirname(os.path.realpath(__file__))
    newdir = os.path.join(dir_path, "backup")
    if not os.path.exists(newdir):
        os.makedirs(newdir)
    date_time = datetime.datetime.now()
    path_datetime = os.path.join(newdir, date_time.strftime("%Y%m%d_%H%M%S"))
    os.mkdir(path_datetime)

    path_flow = os.path.join(path_datetime, "flow_storage")
    path_node_chain = os.path.join(path_datetime, "node_chains")
    path_windower = os.path.join(path_datetime, "windower")
    path_param = os.path.join(path_datetime, "live_settings")
    for sub_folder in (path_flow, path_node_chain, path_windower, path_param):
        os.mkdir(sub_folder)

    distutils.dir_util.copy_tree(
        liveControl.flow_persistency_directory, path_flow)
    if os.path.isdir(path_flow):
        online_logger.info("flow storage backup successful!")

    # Without a parameter file (e.g. in remote mode) there is nothing
    # more to copy. The check has to happen before os.path.join, which
    # cannot handle None.
    if options.params is None:
        return
    param_path = os.path.join(pySPACE.configuration.spec_dir, "live_settings", options.params)
    copied_file, _ = distutils.file_util.copy_file(param_path, path_param)
    # check the copied file itself; the target folder always exists here
    if os.path.isfile(copied_file):
        online_logger.info("parameters file backup successful!")

    online_logger.info("Creating backup finished!")
if __name__ == "__main__":
(options,args) = parse_arguments()
server_process = None
if options.remote:
online_logger.info("Starting remote modus")
conf_file_name = options.configuration
conf = pySPACE.load_configuration(conf_file_name)
adrf = pySPACE.environments.live.communication.adrf_messenger.AdrfMessenger()
adrf.register()
# register the interface with ADRF
online_logger.info("Starting event loop")
while True:
online_logger.info("Check register status")
time.sleep(0.5)
while adrf.is_registered():
#online_logger.info("Get command")
command = adrf.adrf_receive_command()
if command[0] == 3: # 3 = C_CONFIGURE
online_logger.info( "received command: C_CONFIGURE")
online_logger.info( "Loading parameter file..")
online_logger.info( "Done")
adrf.set_state(5) # 5 = S_CONFIGURED
# starting controller
cfg = adrf.get_config()
online_logger.info( "Constructing Controller...")
liveControl = LiveController(cfg, adrf)
online_logger.info( "Constructing Controller finished")
if server_process == None:
online_logger.info("Starting XMLRPCServer.. ")
server_process = create_and_start_rpc_server(liveControl)
else :
online_logger.info(str("XMLRPCServer already running (%s)" % server_process))
elif command[0] == 4: # 4 = C_STARTAPP
online_logger.info( "received command: C_STARTAPP")
adrf.set_state(6) # 6 = S_RUNNING
cfg = adrf.get_config()
# mode can be defined in the configuration file, predict_offline as an example
if cfg["mode"] == 'prewindowing_offline':
liveControl.prewindowing(online=False)
create_backup(liveControl, options)
elif cfg["mode"] == 'prewindowing':
# first start eegclient
liveControl.prewindowing(online=True)
create_backup(liveControl, options)
elif cfg["mode"] == 'prewindowed_train':
liveControl.prewindowed_train()
create_backup(liveControl, options)
elif cfg["mode"] == 'train':
liveControl.train()
create_backup(liveControl, options)
elif cfg["mode"] == 'adapt':
liveControl.adapt_classification_threshold()
elif cfg["mode"] == 'predict':
liveControl.predict(online=True, remote=True)
elif cfg["mode"] == 'predict_offline':
liveControl.predict(online=False, remote=True)
elif cfg["mode"] == 'all':
liveControl.train()
create_backup(liveControl, options)
liveControl.predict(online=False, remote=True)
else :
online_logger.warn(str("mode \'%s\' was not recognized!" % cfg["mode"]))
elif command[0] == 5: # 5 = C_STOPAPP
online_logger.info( "received command: C_STOPAPP")
adrf.set_state(8) # 8 = S_STOPPED
if cfg["mode"] in ('prewindowing', 'prewindowing_offline'):
liveControl.stop_prewindowing()
elif cfg["mode"] in ('predict', 'predict_offline'):
liveControl.stop_prediction()
else:
pass
adrf.undo_registration()
elif options.all:
online_logger.info("Starting training and then predicting...")
param_file_name = options.params
parameters = read_parameter_file(param_file_name)
conf_file_name = options.configuration
if conf_file_name is not None:
conf = pySPACE.load_configuration(conf_file_name)
else:
conf = None
# starting controller
online_logger.info( "Constructing Controller...")
liveControl = LiveController(parameters)
online_logger.info( "Constructing Controller finished")
server_process = create_and_start_rpc_server(liveControl)
liveControl.prewindowing()
liveControl.prewindowed_train()
create_backup(liveControl, options)
server_process.terminate()
server_process.join()
liveControl.predict(online=False)
server_process.terminate()
server_process.join()
else:
pySPACE.load_configuration(options.configuration)
conf = pySPACE.configuration
param_file_name = options.params
parameters = read_parameter_file(param_file_name)
from pySPACE.environments.live import eeg_stream_manager, prediction, adaptation, communication, trainer
import pySPACE.environments.live.communication.log_messenger
# starting controller
online_logger.info( "Constructing Controller...")
liveControl = LiveController(parameters)
online_logger.info( "Constructing Controller finished")
server_process = create_and_start_rpc_server(liveControl)
# start main work....
if options.prewindowing:
# first start eegclient
liveControl.prewindowing(online=True)
create_backup(liveControl, options)
elif options.prewindowing_offline:
liveControl.prewindowing(online=False)
create_backup(liveControl, options)
elif options.prewindowed_train:
liveControl.prewindowed_train()
create_backup(liveControl, options)
elif options.train:
liveControl.train()
create_backup(liveControl, options)
elif options.adapt:
liveControl.adapt_classification_threshold()
create_backup(liveControl, options)
elif options.predict:
liveControl.predict(online=True)
elif options.predict_offline:
liveControl.predict(online=False)
elif options.record:
liveControl.record()
server_process.terminate()
server_process.join()