Source code for pySPACE.environments.live.eeg_stream_manager
""" Script for managing of eeg data streams
Here the :class:`~pySPACE.missions.support.windower.MarkerWindower` is used.
"""
import os
import sys
import time
import xmlrpclib
import subprocess
import random
import warnings
file_path = os.path.dirname(os.path.abspath(__file__))
pyspace_path = file_path[:file_path.rfind('pySPACE')-1]
if not pyspace_path in sys.path:
sys.path.append(pyspace_path)
import pySPACE
from pySPACE.missions.support.windower import MarkerWindower, WindowFactory
from pySPACE.tools.live import eeg_stream
from pySPACE.environments.live.recorder import Recorder
[docs]class LiveEegStreamManager(object):
""" This module controls the eegmanager related configuration
and provides a meaningful interface of the eegmanager for pyspace live.
"""
[docs] def __init__(self, logger):
self.logger = logger
self.configuration = pySPACE.configuration
self.eeg_client = None
# tools for eegmanager subprocess
self.server_process = None
self.remote = None
self.ip = None
self.port = None
# tools for Python based recorder
self.recorder = None
# create a new eeg server process
# it is needed either for streaming of local data
# or recording of incoming data
self.executable_path = os.path.join(pyspace_path, "pySPACE", "tools", "live", "eegmanager", "eegmanager", "eegmanager")
if not os.path.isfile(self.executable_path):
self.logger.error("cannot find eegmanager executable!")
self.logger.error("it should be in %s", self.executable_path)
exit(0)
self.logger.info(str("starting process with executable %s" % self.executable_path))
xml_range = range(16253, 17253)
random.shuffle(xml_range)
for xmlport in xml_range:
try:
self.logger.info("Creating eegserver process with server \'eegmanager %d\'" % xmlport)
self.server_process = subprocess.Popen([self.executable_path, str(xmlport)])
except OSError as err:
self.logger.error(str("launching subprocess with excutable at %s failed!" % self.executable_path))
raise err
time.sleep(.3)
self.server_process.poll()
if self.server_process.returncode == None:
# exit the loop of the process is still running..
break
self.remote = xmlrpclib.ServerProxy("http://%s:%s" % ("127.0.0.1", xmlport))
[docs] def __del__(self):
if self.remote is not None:
self.remote.shut_down()
[docs] def stream_local_file(self, filename):
# start streaming of file filename using
# the local eegmanager process
self.ip = ip = "127.0.0.1"
if self.remote.get_state()[0] is not "IDLE":
self.remote.stop()
tcp_range = range(41244, 61244)
random.shuffle(tcp_range)
for port in tcp_range:
self.remote.stop() # clear possible dangling setup
# file acquisition
ret = self.remote.add_module("FILEAcquisition", str("--blocksize 100 --filename %s" % filename))
if ret < 0:
self.remote.stop()
self.logger.error(str("failed to add fileacquisition for file %s" % filename))
self.logger.error(str(self.remote.stdout()))
self.remote.shut_down()
raise Exception, "Check your paths!"
# to the network!
ret = self.remote.add_module("NETOutput", str("--port %d --blocking" % port))
if ret < 0:
self.logger.warn("failed to add netoutput with port %d" % port)
self.logger.error(str(self.remote.stdout()))
continue
self.port = port
break
self.remote.start()
self.logger.debug(str("started local streaming of file %s" % filename))
[docs] def initialize_eeg_server(self, ip=None, port=None, usb=None):
self.ip = ip
self.port = port
self.usb = usb
[docs] def record_with_options(self, subject, experiment, online=False):
# create new Recorder instance to handle raw data storage
self.recorder = Recorder(client=self.eeg_client, folder=None,
subject=subject, task=experiment, online=online)
self.logger.info("started raw-data-recording")
[docs] def request_window_stream(self, window_spec=None, nullmarker_stride_ms=1000, no_overlap=True):
# load windower spec file
if window_spec is None:
window_definitions = WindowFactory.default_windower_spec()
no_overlap = True
self.logger.info("Using default windower spec %s" % window_definitions)
else:
windower_spec_file = open(window_spec, 'r')
window_definitions = \
WindowFactory.window_definitions_from_yaml(windower_spec_file)
windower_spec_file.close()
self.logger.info(str("Finished loading windower spec file from %s" % window_spec))
if nullmarker_stride_ms != window_definitions[0].endoffsetms:
warnings.warn("defined nullmarker stride (%d) is different from "
"endoffset (%d) in window-definitions[0]!" %
(nullmarker_stride_ms, window_definitions[0].endoffsetms))
eeg_client = self.setup_client()
# create windower
self.marker_windower = MarkerWindower(eeg_client,
window_definitions,
nullmarker_stride_ms=nullmarker_stride_ms,
no_overlap = no_overlap)
self.logger.info( "Created windower instance")
# return an iterator over the yielded windows
window_stream = ((sample, label) for (sample, label) in self.marker_windower)
self.logger.info( "Created window-stream")
return window_stream
[docs] def setup_client(self):
# connect and start client
if self.ip is not None and self.port is not None:
eeg_client = eeg_stream.EEGClient(host=self.ip, port=self.port)
elif self.usb is not None:
self.logger.info("Using USB Client")
eeg_client = eeg_stream.EEGClientUsb()
eeg_client.connect()
if self.eeg_client is None:
self.eeg_client = eeg_client
if self.recorder is not None:
if not self.recorder.has_client():
self.recorder.set_eeg_client(eeg_client)
self.logger.info("Started EEG-Client")
return eeg_client
[docs] def stop(self):
self.eeg_client.disconnect()
del self.eeg_client
if self.remote is not None:
self.remote.stop()
self.remote.shut_down()