""" Script to run the actual online classification of data
"""
import os
import logging.handlers
import multiprocessing
import time
import numpy
import cPickle
import xmlrpclib
import sys
online_logger = logging.getLogger("OnlineLogger")
file_path = os.path.dirname(os.path.realpath(__file__))
pyspace_path = file_path[:file_path.rfind('pySPACE')-1]
if not pyspace_path in sys.path:
sys.path.append(pyspace_path)
import pySPACE
from pySPACE.environments.live import eeg_stream_manager
import pySPACE.environments.live.communication.socket_messenger
from pySPACE.tools.logging_stream_colorer import ColorFormatter, COLORS
class SimpleResultCollection(object):
    """ Base Class for Result collection.

    Default behaviour is counting occurring events in
    a dictionary.

    **Parameters**

        :name: potential-name (e.g. LRP)
        :params: parameter/meta-data of the potential
        :store: write result to a pickle file
    """
    def __init__(self, name, params, store=True):
        # the result dict is created lazily on the first event notification
        self.result = None
        self.name = name
        self.params = params
        self.store = store
        online_logger.info(str(self))

    def __repr__(self):
        return str("simple result collection for %s (%s) " % (self.name, self.params))

    def event_notification(self, event_str):
        """ simple event counting """
        if self.result is None:
            self.result = dict()
        # count occurrences per event string (default 0 for unseen events)
        self.result[event_str] = self.result.get(event_str, 0) + 1

    def dump(self):
        """ simple print function """
        online_logger.info(str("result for %s: %s" % (self.name, str(self.result))))
        # write the result to the file system
        if self.store:
            # pickled data must be written in binary mode; 'with' guarantees
            # the handle is closed even if pickling raises
            with open(str("%s.result" % self.name), 'wb') as f:
                cPickle.dump(self.result, f)
class ConfusionMatrix(SimpleResultCollection):
    """ A confusion matrix.

    Stores and handles a confusion matrix.
    The confusion matrix is assumed to have the following form:

    +--------+---+-----+------+
    |        |   prediction   |
    +        +-----+------+
    |        |   |  P  |  N   |
    +========+===+=====+======+
    | actual | P | TP  |  FN  |
    +        +---+-----+------+
    |        | N | FP  |  TN  |
    +--------+---+-----+------+

    **Parameters**

        :name: potential-name (e.g. LRP)
        :params: parameter/meta-data of the potential
    """
    def __init__(self, name, params):
        super(ConfusionMatrix, self).__init__(name, params)
        # last classification result seen; a later trigger event
        # validates this prediction
        self.last_result = None

    def __repr__(self):
        return str("confusing matrix for %s (%s) " % (self.name, self.params))

    def event_notification(self, event_str):
        """ Update the confusion matrix

        It is assumed that we receive a trigger event
        *after* the classification result (either pos. or neg.-event).
        If the trigger_event occurrs it is a validation
        for a positive prediction.
        If no trigger_event but instead another
        classification result comes in it is assumed
        that no reaction on the previously presented
        target appeared.
        """
        # online_logger.info("Got event with event string:" + str(event_str))
        if self.result is None:
            # 2x2 matrix, indexed as documented in the class docstring
            self.result = numpy.zeros((2, 2))
        if "trigger_event" in self.params:
            # if current event is a trigger event we use
            # it as a validation for the last prediction
            if event_str == self.params["trigger_event"]:
                if self.last_result == self.params["negative_event"]:
                    self.result[1, 0] += 1
                elif self.last_result == self.params["positive_event"]:
                    self.result[0, 0] += 1
                elif self.last_result is None:
                    online_logger.warn("Received trigger event without positive event!")
                    online_logger.warn("Possible reasons:")
                    online_logger.warn("- Subject reacted although no target was presented")
                    online_logger.warn("- The streaming is much faster than realtime and the prediction appeared later than the response.")
                    online_logger.warn("- Marker for trigger event appeared twice.")
                self.last_result = None
            # current event is a classification result:
            # - either infer, that no reaction occurred
            # - or just store it
            if event_str == self.params["positive_event"] or \
                    event_str == self.params["negative_event"]:
                if self.last_result == self.params["positive_event"]:
                    self.result[0, 1] += 1
                elif self.last_result == self.params["negative_event"]:
                    self.result[1, 1] += 1
                elif self.last_result is not None:
                    online_logger.info(str("unknown event type: event_str=%s last_result=%s" % (event_str, self.last_result)))
                # store it
                self.last_result = event_str

    def dump(self):
        """
        prints the collected result of the confusion matrix
        """
        super(ConfusionMatrix, self).dump()
        online_logger.info(str("Confusion matrix for %s:" % self.name))
        online_logger.info(str("+--------+---+------+------+"))
        online_logger.info(str("|        |   prediction    |"))
        online_logger.info(str("+        +------+------+"))
        online_logger.info(str("|        |   |  P   |  N   |"))
        online_logger.info(str("+========+===+======+======+"))
        online_logger.info(str("| actual | P | %4d | %4d |" % (self.result[0, 0], self.result[1, 0])))
        online_logger.info(str("+        +---+------+------+"))
        online_logger.info(str("|        | N | %4d | %4d |" % (self.result[0, 1], self.result[1, 1])))
        online_logger.info(str("+--------+---+------+------+"))
class Predictor(object):
    """ Class that is responsible to perform the actual predictions.

    One prediction ("abri") flow is kept per potential key in
    ``self.datasets``.  Windowed EEG data is fed into each flow through a
    multiprocessing queue; classification results are published via the
    messenger and additionally pushed into per-key event queues, where
    they are aggregated by a ``SimpleResultCollection`` or a
    ``ConfusionMatrix`` for the final evaluation.
    """

    def __init__(self, live_processing = None):
        """ Set up queues, the messenger and the result loggers.

        :live_processing: messenger object used to publish classification
            results; when ``None`` a ``SocketMessenger`` is created.
        """
        self.configuration = pySPACE.configuration
        # per-key flags signalling that a prediction process is running
        self.predicting_active_potential = {}
        # per-key node chains used for the actual classification
        self.abri_flow = {}
        self.prewindowing_flow = {}
        self.postprocessing_flow = {}
        # init the message handling
        self.event_queue = dict() # multiprocessing.Queue()
        self.command_queue = multiprocessing.Queue()
        # initialize the live_processing
        if live_processing == None:
            self.messenger = pySPACE.environments.live.communication.socket_messenger.SocketMessenger()
        else:
            self.messenger = live_processing
        self.create_processor_logger()
        # per-key worker processes, created lazily in start_predicting()
        self.predict_process = {}
        self.predicting_fct_stream_data_process = {}
        self.event_notification_process = None
        self.controller_host = None
        self.controller_port = None
        self.mars_host = None
        self.mars_port = None
        # per-key data queues feeding time-series windows into the flows
        self.queue = {}
        self.stream_manager = None
        self.window_stream = {}
        self.nullmarker_stride_ms = None

    def __del__(self):
        # close the messenger connection when the predictor goes away
        self.messenger.end_transmission()

    def initialize_xmlrpc(self,
                          controller_host,
                          controller_port,
                          mars_host = '127.0.0.1',
                          mars_port = 8080):
        """ Setup communication to remote listeners

        This method tells ABRIProcessing which remote processes are interested
        in being informed about its classification results.

        :controller_host: host of the control center XML-RPC server
        :controller_port: port of the control center XML-RPC server
        :mars_host: host of the MARS simulation XML-RPC server
        :mars_port: port of the MARS simulation XML-RPC server
        """
        # Create Server Proxy for control center
        self.controller_host = controller_host
        self.controller_port = controller_port
        self.controller = xmlrpclib.ServerProxy('http://%s:%s' % (controller_host,
                                                                  controller_port))
        # Create Server Proxy for MARS simulation
        self.mars_host = mars_host
        self.mars_port = mars_port
        self.marsXmlRpcServer = \
            xmlrpclib.ServerProxy('http://%s:%s' % (mars_host,
                                                    mars_port))

    def set_eeg_stream_manager(self, stream_manager):
        """ Set manager class that provides the actual data for the prediction """
        self.stream_manager = stream_manager

    def load_model(self, directory, datasets):
        """ Load the previously learned models.

        NOTE(review): the original docstring said "Store the learned
        models", but this method *loads* them from *directory*.

        For every key in *datasets* the prediction flow is restored with
        the following priority:

        1. an adapted flow pickle,
        2. a trained flow pickle (optionally loading an external SVM
           model file into its last node),
        3. a composition of the prewindowing flow (last node dropped) and
           the prewindowed-train flow (first node dropped).
        """
        online_logger.info( "Reloading learned models ...")
        self.datasets = datasets
        for key in self.datasets.keys():
            adapted_flow_path = "%s/%s.pickle" % (directory , "abri_flow_adapted_"+ key)
            trained_flow_path = "%s/%s.pickle" % (directory , "train_flow_"+ key)
            trained_flow_path_svm_model = "%s/%s.model" % (directory , "train_flow_"+ key)
            prewindowing_flow_path = "%s/%s.pickle" % (directory , "prewindowing_flow_"+ key)
            prewindowing_offline_flow_path = "%s/%s.pickle" % (directory , "prewindowing_offline_flow_"+ key)
            # check if adapted flow exists, if yes, use it for prediction
            if os.path.exists(adapted_flow_path):
                flh = {}
                flh[key] = open(adapted_flow_path, 'r')
                self.abri_flow[key] = cPickle.load(flh[key])
                flh[key].close()
                online_logger.info( "Predicting using adapted " + key +" flow...")
            # check if trained flow exists, if yes, use it for prediction
            elif os.path.exists(trained_flow_path):
                flh = {}
                flh[key] = open(trained_flow_path, 'r')
                self.abri_flow[key] = cPickle.load(flh[key])
                flh[key].close()
                online_logger.info( "Predicting using trained " + key +" flow...")
                # an external SVM model file replaces the last node's model
                if os.path.exists(trained_flow_path_svm_model):
                    self.abri_flow[key][-1].load_model(trained_flow_path_svm_model)
                    online_logger.info( "Predicting using trained " + key +" flow...")
            # try to get the flow from prewindowing flow and postprocessing flow
            else:
                flh_1 = {}
                flh_2 = {}
                if os.path.exists(prewindowing_flow_path):
                    flh_1[key] = open(prewindowing_flow_path, 'r')
                elif os.path.exists(prewindowing_offline_flow_path):
                    flh_1[key] = open(prewindowing_offline_flow_path, 'r')
                flh_2[key] = open("%s/%s.pickle" % (directory , "prewindowed_train_flow_"+ key), 'r')
                self.prewindowing_flow[key] = cPickle.load(flh_1[key])
                # drop the last node of the prewindowing flow
                # (presumably its sink node — TODO confirm)
                self.prewindowing_flow[key].pop(-1)
                self.postprocessing_flow[key] = cPickle.load(flh_2[key])
                # drop the first node of the postprocessing flow
                # (presumably its source node — TODO confirm)
                self.postprocessing_flow[key].pop(0)
                self.abri_flow[key] = self.prewindowing_flow[key] + self.postprocessing_flow[key]
                flh_1[key].close()
                flh_2[key].close()
                online_logger.info( "Predicting using prewindowed trained " + key +" flow...")
                # NOTE(review): pause kept from the original code; its
                # purpose is not evident from this file
                time.sleep(5)
        online_logger.info( "Reloading learned models ... Done!")
        return 0

    def prepare_predicting(self, datasets, testing_data=None, nullmarker_stride_ms = None):
        """Prepares the trained aBRI-DP flows to classify new instances.

        :datasets: NOTE(review) — this argument is not used here; the
            keys are taken from ``self.datasets`` (set in ``load_model``).
        :testing_data: optional local file that is streamed instead of
            live data.
        :nullmarker_stride_ms: stride of the nullmarker windows in ms.
        """
        self.messenger.register()
        self.nullmarker_stride_ms = nullmarker_stride_ms
        if self.nullmarker_stride_ms == None:
            online_logger.warn( 'Nullmarker stride interval is %s. You can specify it in your parameter file.' % self.nullmarker_stride_ms)
        else:
            online_logger.info( 'Nullmarker stride interval is set to %s ms' % self.nullmarker_stride_ms)
        if testing_data is not None:
            # replace any previously set stream manager by one that
            # streams the local test file
            if self.stream_manager is not None:
                online_logger.warn("deleting stream manager %s - this should not happen" % self.stream_manager)
                self.stream_manager = None
            self.stream_manager = eeg_stream_manager.LiveEegStreamManager(online_logger)
            self.stream_manager.stream_local_file(testing_data)
        # create window streams for all potentials
        spec_base = self.configuration.spec_dir
        for key in self.datasets.keys():
            online_logger.info( "Creating " + key + " windower stream")
            window_spec = os.path.join(spec_base,"node_chains","windower", self.datasets[key]["windower_spec_path_prediction"])
            self.window_stream[key] = \
                self.stream_manager.request_window_stream(window_spec, \
                    nullmarker_stride_ms=nullmarker_stride_ms, \
                    no_overlap=True)
        # Classification is done in separate threads, we send the time series
        # windows to these threads via two queues
        for key in self.datasets.keys():
            self.queue[key] = multiprocessing.Queue()
            self.predicting_active_potential[key] = multiprocessing.Value("b",False)
        # single shared pause flag for all potentials
        self.predicting_paused_potential = multiprocessing.Value('b',False)

        # The two classification threads access the two queues via two
        # generators
        def flow_generator(key):
            """create a generator to yield all the abri flow windows"""
            # Yield all windows until a None item is found in the queue
            while True:
                window = self.queue[key].get(block = True, timeout = None)
                if window == None: break
                yield window

        # inject the generator as the data source of each flow's first node
        for key in self.datasets.keys():
            self.abri_flow[key][0].set_generator(flow_generator(key))
        return 0

    def start_predicting(self, trace = False):
        """ Classify new instances based on the learned aBRI-DP flows.

        Starts three worker processes per potential key:

        - ``predicting_fct`` executes the classification flow and
          publishes its results,
        - ``predicting_fct_stream_data`` distributes incoming windows to
          the flow's data queue and the event queue,
        - ``handle_event_notification`` aggregates events into a result
          collector and dumps it when a None sentinel arrives.

        :trace: when True, enable tracing on every node of every flow.
        """
        if trace:
            for key in self.datasets.keys():
                for node in self.abri_flow[key]:
                    node.trace = True

        def handle_event_notification(key):
            # collect classification/trigger events for *key* until the
            # None sentinel arrives, then dump the aggregated result
            online_logger.info(str("handling event notification for %s" % key))
            if self.datasets[key].has_key("trigger_event"):
                result_collector = ConfusionMatrix(name=key, params=self.datasets[key])
            else:
                result_collector = SimpleResultCollection(name=key, params=self.datasets[key])
            event = self.event_queue[key].get(block = True, timeout = None)
            while event != None:
                result_collector.event_notification(event)
                event = self.event_queue[key].get(block = True, timeout = None)
            result_collector.dump()

        def predicting_fct(key):
            """ A function that is executed in a separate thread, in which pyspace detects whether a
            target is perceived or not and put them in the event queue
            """
            self.predicting_active_potential[key].value = True
            online_logger.debug(key +" detection process started")
            for result in self.abri_flow[key].execute():
                if self.predicting_paused_potential.value:
                    continue
                # a dataset may disable messaging for this key entirely
                if not self.datasets[key].get("messenger",True):
                    continue
                if self.datasets[key].has_key("trigger_event"):
                    # publish a boolean: was this window classified positive?
                    self.messenger.send_message((key, result[0].label in self.datasets[key]["positive_event"]))
                    if str(result[0].label) in self.datasets[key]["positive_event"]:
                        self.event_queue[key].put(self.datasets[key]["positive_event"])
                    else:
                        self.event_queue[key].put(self.datasets[key]["negative_event"])
                    online_logger.info("Classified target as " + str(result[0].label) + " with score " + str(result[0].prediction))
                else:
                    # no trigger event: publish the raw prediction score
                    self.messenger.send_message((key,result[0].prediction))
                    if str(result[0].label) == self.datasets[key]["positive_event"]:
                        self.pos_logger.info("Classified window as "
                            + str(result[0].label) + " with score " + str(result[0].prediction))
                        self.event_queue[key].put(self.datasets[key]["positive_prediction"])
                    else:
                        self.neg_logger.info("Classified window as "
                            + str(result[0].label) + " with score " + str(result[0].prediction))
                        self.event_queue[key].put(self.datasets[key]["negative_prediction"])
            # when finished put a none in the event queue
            self.event_queue[key].put(None)
            self.predicting_active_potential[key].value = False
            online_logger.info(str("predicition of %s finished!" % key))

        def predicting_fct_stream_data(key):
            """ A function that decides whether the window stream in p3 is a response or a
            NoResponse or a Standard and put them in an event queue
            """
            active = True
            visualize = False
            # distribute all windows to the responsible flows
            for data, label in self.window_stream[key]:
                if self.predicting_paused_potential.value:
                    continue
                # distribution is performed according to different preconditions
                # detection is performed if there is a preceding trigger event
                if self.datasets[key].has_key("trigger_event"):
                    if label in self.datasets[key]["trigger_event"]:
                        self.event_queue[key].put(self.datasets[key]["trigger_event"])
                    else:
                        self.queue[key].put((data, label))
                # switch detection on or off depending on activation label
                elif self.datasets[key].has_key("activation_label"):
                    if label in self.datasets[key]["activation_label"]:
                        online_logger.info("Detection of " + key + "started")
                        active = True
                    elif label in self.datasets[key]["deactivation_label"]:
                        online_logger.info("Detection of " + key + "stopped")
                        active = False
                    if label in self.datasets[key]["positive_event"] and active:
                        self.event_queue[key].put(self.datasets[key]["positive_event"])
                        self.queue[key].put((data, label))
                    time.sleep(0.1)
                # just put data into the flow
                else:
                    self.queue[key].put((data, label))
            # Put a None into the data-queue to stop classification threads
            self.queue[key].put(None)
            # self.predicting_active_potential[key].value = False
            online_logger.info("Finished stream data " + key)

        online_logger.info( "Starting Evaluation")
        # Start two threads for predicting
        for key in self.datasets.keys():
            if not key in self.event_queue.keys():
                self.event_queue[key] = multiprocessing.Queue()
            if not key in self.predict_process.keys():
                self.predict_process[key] = \
                    multiprocessing.Process(target = predicting_fct, args = (key,))
                self.predict_process[key].start()
            if not key in self.predicting_fct_stream_data_process.keys():
                self.predicting_fct_stream_data_process[key] = \
                    multiprocessing.Process(target = predicting_fct_stream_data, args = (key,))
                self.predicting_fct_stream_data_process[key].start()
        self.predicting_paused_potential.value = False
        # NOTE(review): event_notification_process becomes a *dict* of
        # processes here, but stop_predicting() calls .join() on it as if
        # it were a single process — verify before relying on teardown
        if not self.event_notification_process:
            self.event_notification_process = dict()
            for key in self.datasets.keys():
                self.event_notification_process[key] = \
                    multiprocessing.Process(target = handle_event_notification, args=(key,))
                self.event_notification_process[key].start()
        return 0

    def is_predicting_active(self):
        """ Returns whether prediction phase is finished or still running

        NOTE(review): the ``return`` sits inside the loop, so only the
        first key of ``self.datasets`` is actually checked.
        """
        for key in self.datasets.keys():
            return self.predicting_active_potential[key].value == True #or self.predicting_active_LRP.value == True

    def process_external_command(self, command):
        # currently only the STOP command is understood; it pauses the
        # prediction rather than terminating the processes
        if command == "STOP":
            self.pause_prediction()

    def pause_prediction(self):
        # the worker loops poll this flag and skip windows while it is set
        self.predicting_paused_potential.value = True

    def stop_predicting(self):
        """ Force the end of the predicting """
        # We stop the aBRI-DP training by disconnecting the EEG stream from it
        def read(**kwargs):
            online_logger.info( "Canceling EEG transfer")
            return 0
        online_logger.info( "Stopping predicting ...")
        # Wait until aBRI-DP has finished predicting
        for key in self.datasets.keys():
            self.predict_process[key].join()
            self.predicting_fct_stream_data_process[key].join()
        # NOTE(review): event_notification_process is a dict (see
        # start_predicting); calling .join() on it raises AttributeError —
        # each entry should probably be joined individually
        self.event_notification_process.join()
        online_logger.info("Prediction finished")
        return 0

    def set_controller(self,controller):
        """ Set reference to the controller """
        self.controller = controller

    def create_processor_logger(self):
        """ Create specific logger for the prediction """
        # two dedicated loggers: green console output for positive
        # classifications, red for negative ones; both also write to a
        # timed rotating file in the local "log" directory
        self.pos_logger = logging.getLogger('positiveEventLogger')
        self.pos_logger.setLevel(logging.DEBUG)
        self.neg_logger = logging.getLogger('negativeEventLogger')
        self.neg_logger.setLevel(logging.DEBUG)
        formatterResultsStreamNoLrp = ColorFormatter("%(asctime)s - %(name)s: %(message)s",
                                                     color = COLORS.RED)
        formatterResultsStreamLrp = ColorFormatter("%(asctime)s - %(name)s: %(message)s",
                                                   color = COLORS.GREEN)
        formatterResultsFile = logging.Formatter("%(asctime)s - %(name)s: %(message)s")
        loggingFileHandlerResults = logging.handlers.TimedRotatingFileHandler("log"+os.path.sep+ \
            "prediction_lrp.log",backupCount=5)
        loggingStreamHandlerResultsNoLrp = logging.StreamHandler()
        loggingStreamHandlerResultsNoLrp.setFormatter(formatterResultsStreamNoLrp)
        loggingStreamHandlerResultsLrp = logging.StreamHandler()
        loggingStreamHandlerResultsLrp.setFormatter(formatterResultsStreamLrp)
        loggingFileHandlerResults.setFormatter(formatterResultsFile)
        loggingStreamHandlerResultsLrp.setLevel(logging.DEBUG)
        loggingStreamHandlerResultsNoLrp.setLevel(logging.DEBUG)
        loggingFileHandlerResults.setLevel(logging.DEBUG)
        self.pos_logger.addHandler(loggingStreamHandlerResultsLrp)
        self.neg_logger.addHandler(loggingStreamHandlerResultsNoLrp)
        self.pos_logger.addHandler(loggingFileHandlerResults)
        self.neg_logger.addHandler(loggingFileHandlerResults)