Source code for pySPACE.tools.live.eeg_stream

# -*- coding: UTF-8 -*

"""eeg_stream.py
EEG client module.

Created by Timo Duchrow on 2008-08-26.
"""

import sys
import socket
import struct
import numpy
import scipy
import Queue
import signal
import subprocess
import os
import time
import xmlrpclib
import random
import glob
import warnings

file_path = os.path.dirname(os.path.abspath(__file__))
pyspace_path = file_path[:file_path.rfind('pySPACE')-1]
if not pyspace_path in sys.path:
    sys.path.append(pyspace_path)

import pySPACE
from ipmarkers import MarkerServer

usb_warning = True
try:
    if hasattr(pySPACE.configuration, "eeg_modules_dir"):
        sys.path.append(pySPACE.configuration.eeg_modules_dir)
        from eeg_acquisition.pybrainamp import BUASubprocess
        usb_warning = False
except:
    pass

from pySPACE.missions.support.WindowerInterface import AbstractStreamReader

verbose = False

READYMSG = '1'

fmt__GUID = 'IHH8s'
fmt_RDA_MessageHeader = 'II'

n__GUID = struct.calcsize(fmt__GUID)
n_RDA_MessageHeader = struct.calcsize(fmt_RDA_MessageHeader)

T_RDA_MessageStart = 1
T_RDA_MessageData = 2
T_RDA_MessageStop = 3
T_RDA_MessageMuxData = 4
T_RDA_MessageData32 = 9
T_ReadyMessage = 10


[docs]class EEGClient(AbstractStreamReader): """EEG stream client for EEG stream protocol"""
[docs] def __init__(self, host='127.0.0.1', port=51244, **kwargs): super(EEGClient, self).__init__() # containers for abstract properties self._dSamplingInterval = None # sampling interval self._channelNames = None # list of channel names self._stdblocksize = None # standard number of paints in one data block self._markerids= dict() self._markerNames = dict() # dictionary with marker names self.host = host self.port = port self.nChannels = None # number of channels self.sample_size = None self.protocol_version = None self.resolutions = None # list of resolutions / channel self.channelids = dict() self.abs_start_time = 0 self.callbacks = list() self.meta = dict() self.ndsamples = None # last sample block read self.ndmarkers = None # last marker block read self.nmarkertypes = 0 # number of different marker types self.lostmarker = False # for two markers corresponding to one sample in last sample of a block self.lostmarkertypedesc = None self.running = True
@property def dSamplingInterval(self): return self._dSamplingInterval @property def stdblocksize(self): return self._stdblocksize @property def markerids(self): return self._markerids @property def channelNames(self): return self._channelNames @property def markerNames(self): return self._markerNames
[docs] def connect(self, verbose=False): """Connect to EEG stream server and collect metadata""" try: self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) except socket.error, (value,message): if self.socket: self.socket.close() raise IOError, "(%d): could not open socket(%d): %s" % (value, self.port, message) # Configure and connect the Socket try: self.socket.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) self.socket.connect((self.host, self.port)) except socket.error, (value,message): if self.socket: self.socket.close() raise IOError, "(%d): SO_REUSEADDR and/or connect failed! (%d): %s" % (value, self.port, message) # print "connected to %s @ %s" %(self.host, self.port) if(os.name=='posix'): # install signal handlers sigs = (signal.SIGHUP, signal.SIGINT, signal.SIGTERM, signal.SIGQUIT) for s in sigs: signal.signal(s, self._grace) # read start message ret = self._readmsg(msg_type=T_RDA_MessageStart, verbose=verbose) if ret == -1: self.socket.close() raise IOError, "could not read start message" if verbose: print "connected to server"
[docs] def regcallback(self, func): """Register callback function""" self.callbacks.append(func)
[docs] def read(self, nblocks=1, verbose=False): """Invoke registered callbacks for each incoming data block returns number of read _data_ blocks""" ret = 0 nread = 0 # return no data when not running if not self.running: return nread while ret != -1 and \ ret != T_RDA_MessageStop and \ (nblocks == -1 or nread < nblocks) and self.running: ret = self._readmsg(verbose=verbose) if ret == T_RDA_MessageData or ret == T_RDA_MessageData32 or ret == T_RDA_MessageMuxData: if verbose: print "received EEG stream data message" for f in self.callbacks: f(self.ndsamples, self.ndmarkers) nread += 1 elif ret == T_RDA_MessageStop: if verbose: print "received EEG stream stop message" elif ret == T_RDA_MessageStart: if verbose: print "Received updated EEG stream start Message" #raise IOError, "received unexpected EEG stream start message" elif ret == -1: raise IOError, "could not read EEG stream message" else: raise IOError, "unknown EEG stream message type %d" % ret return nread
[docs] def _sendmsg(self, msg_type=T_ReadyMessage, content=None): """Sends Messages to EEGServer T_ReadyMessage := tell Server to send another EEG{Data, Start, Stop}Message""" if msg_type == T_ReadyMessage: self.socket.send(READYMSG) else: raise IOError, "Type of Message to send to EEGserver not known!"
[docs] def _readmsg(self, msg_type='all', verbose=False): """Reads EEG stream message and invokes appropiate handler. Reads EEG stream header and checks optional type constraint. Returns which message type was read.""" # tell server to send a packet try: self._sendmsg() except: return T_RDA_MessageStop # read GUID #guid = self.socket.recv(n__GUID) # read header recvsize = n_RDA_MessageHeader buff = '' while len(buff)<recvsize: hdr = self.socket.recv(recvsize-len(buff)) buff += hdr hdr = buff if len(hdr) == 0: # print ">>> no header" return -1 elif len(hdr) != n_RDA_MessageHeader: raise IOError, "data stream corrupt" (nSize, nType) = struct.unpack(fmt_RDA_MessageHeader, hdr) if verbose: print "header:\n nSize: %08x\n nType: %08x" % (nSize, nType) # check optional type constraint if msg_type != 'all' and msg_type != nType: raise IOError, "RDA message of type %d expected "\ "(received type %d from server)" % (msg_type, nType) # read message from socket # The while loop is neccessary to ensure that the total amount of data is read self.recvsize = nSize - n_RDA_MessageHeader if verbose: print "receiving message type %d of size %d" % (nType, nSize) self.buff1 = '' while len(self.buff1) < self.recvsize: payload = self.socket.recv(self.recvsize-len(self.buff1)) self.buff1 += payload payload = self.buff1 if verbose: print("done! total size (except header): %d" % len(payload)) # invoke appropiate handler for decoding if nType == T_RDA_MessageStart: self._getstartmsg(payload, verbose=verbose) elif nType == T_RDA_MessageData: self._getdatamsg(payload) elif nType == T_RDA_MessageMuxData: self._getmuxdatamsg(payload, verbose=verbose) elif nType == T_RDA_MessageStop: self._getstopmsg(payload) elif nType == T_RDA_MessageData32: self._getdata32msg(payload) return nType
[docs] def _getstopmsg(self, payload, verbose=False): self.socket.close()
[docs] def _getstartmsg(self, payload, verbose=False): """Decode metadata from start type message""" offset = 0 # read number of channels and sampling interval fmt = 'IIIII' nread = struct.calcsize(fmt) (self.nChannels, self._stdblocksize, self._dSamplingInterval, self.sample_size, self.protocol_version) = \ struct.unpack_from(fmt, payload, offset) offset += nread # offset += 4 # TODO: find out why! (fix for compatability with eegmanager 18ee0306a783fe1518742b1ea13b811e837662fb) # TODO: solved? if self.protocol_version == 2: self.nChannels = self.nChannels - 1 fmt = 'II' nread = struct.calcsize(fmt) part1, part2 = struct.unpack_from(fmt, payload, offset) offset += struct.calcsize('II') self.abs_start_time = part1 self.abs_start_time = self.abs_start_time << 32 self.abs_start_time += part2 if verbose: print "abs_start_time: %d \n" % (self.abs_start_time) print nread if verbose: print "\n\n\nmessage start:\n nChannels: %d\n dSamplingInterval: %d\n Blocksize: %d\nSampleSize: %d\nProtocol-Version: %d\n" \ % (self.nChannels, self.dSamplingInterval, self.stdblocksize, self.sample_size, self.protocol_version) # read resolutions fmt = '%dB' % (256) nread = struct.calcsize(fmt) self.resolutions = struct.unpack_from(fmt, payload, offset) if verbose: print " resolutions: ", self.resolutions offset += nread fmt = 'I' nread = struct.calcsize(fmt) (self.lenChannelNames) = \ struct.unpack_from(fmt, payload, offset) if verbose: print " lenChannelNames: %d\n" \ % (self.lenChannelNames) offset += nread # read channel names fmt = '%ds' % (self.lenChannelNames) namesbuf = struct.unpack_from(fmt, payload, offset) nread = struct.calcsize(fmt) # build list of channel names self._channelNames = list() nstr = '' for c in namesbuf[0]: if c == '\x00': self.channelNames.append(nstr) nstr = '' else: nstr += c if self.protocol_version == 2: assert 'marker' == self.channelNames.pop() assert len(self.channelNames) == self.nChannels if verbose: print "removed \'marker\' pseudo channel from channel-names.\n" for (cid, name) in enumerate(self.channelNames): self.channelids[name] = cid if verbose: print " channel names: ", self.channelNames, "\n" offset += nread fmt = 'I' nread = struct.calcsize(fmt) (self.lenMarkerNames) = \ struct.unpack_from(fmt, payload, offset) if verbose: print " lenMarkerNames: %d\n" \ % (self.lenMarkerNames) offset += nread # read marker names fmt = '%ds' % (self.lenMarkerNames) namesbuf = struct.unpack_from(fmt, payload, offset) # build list of marker names self._markerNames = dict() self._markerids = dict() nstr = '' for c in namesbuf[0]: if c == '\x00': self.markerNames[len(self.markerNames)] = nstr nstr = '' else: nstr += c self.markerNames[len(self.markerNames)] = 'null' self.nmarkertypes = len(self.markerNames) for (cid, name) in self.markerNames.iteritems(): self.markerids[name] = cid if verbose: print " marker names: ", self.markerNames, "\n" print " marker ids: ", self.markerids, "\n"
[docs] def _getdata32msg(self, payload, verbose=False): """Convenience method for getting data from 32-bit type data message (float)""" # TODO test with RDA server self._getdatamsg(payload, dtype='float32', msgtype='f')
[docs] def _getdatamsg(self, payload, verbose=False, dtype='short', msgtype='h'): """Get data from 16-bit type data message (short)""" offset = 0 fmt = 'II' nread = struct.calcsize(fmt) (time_code, nMarkers) = \ struct.unpack_from(fmt, payload, offset) offset += nread self.meta['time_code'] = time_code self.meta['n_markers'] = nMarkers # read markers #self.ndmarkers = None self.ndmarkers = numpy.zeros([self.stdblocksize], int) self.ndmarkers.fill(-1) if self.lostmarker: self.lostmarker = False self.ndmarkers[0] = self.lostmarkertypedesc #print "lenmarkers: "+str(nMarkers) for i in range(nMarkers): fmt = 'II' nread = struct.calcsize(fmt) (nPosition, ntypedesc) = \ struct.unpack_from(fmt, payload, offset) offset += nread if ntypedesc == -1 or ntypedesc >= self.nmarkertypes: raise Exception, "received unexpexted marker type (%d) not " \ "defined in header max(%d)" % (ntypedesc, self.nmarkertypes) if verbose: print " relPosition: nPosition: %d\n sTypeDesc: %d\n" % (nPosition, ntypedesc) print "%s" % self.markerNames[ntypedesc] #self.ndmarkers[len(self.ndmarkers)] = ntypedesc #(ntypedesc, nPosition) if self.ndmarkers[nPosition-1] == -1: self.ndmarkers[nPosition-1] = ntypedesc elif (nPosition - 1) < (self.stdblocksize - 1): self.ndmarkers[nPosition] = ntypedesc else: self.lostmarker = True self.lostmarkertypedesc = ntypedesc # print self.ndmarkers # build dictionary of marker names on the fly # this information should really come from the header in the next # version of RDA server # ntypedesc = None # if not self.markerids.has_key(sTypeDesc): # self.nmarkertypes += 1 # self.markerids[sTypeDesc] = self.nmarkertypes # ntypedesc = self.nmarkertypes # else: # ntypedesc = self.markerids[sTypeDesc] # # self.markernames[ntypedesc] = sTypeDesc # check if this is ok #self.ndmarkers[nPosition-1] = ntypedesc # print "nPosition ", nPosition # * struct.calcsize(msgtype) self.readSize = (self.nChannels * self.stdblocksize * struct.calcsize(msgtype)) dt = numpy.dtype(numpy.int16) self.ndsamples = numpy.frombuffer(payload[offset:offset + self.readSize], dtype=dt) self.ndsamples.shape = (self.stdblocksize, self.nChannels) self.ndsamples = scipy.transpose(self.ndsamples)
[docs] def _getmuxdatamsg(self, payload, verbose=False): """Get data from 16/32-bit type data message (short)""" offset = 0 fmt = 'II' nread = struct.calcsize(fmt) time_code, sample_size = struct.unpack_from(fmt, payload, offset) offset += nread self.meta['time_code'] = time_code self.meta['sample_size'] = sample_size sample_size = int(sample_size) if sample_size == 2: dtype = numpy.int16 msgtype = 'h' elif sample_size == 4: dtype = numpy.int32 msgtype = 'i' else: print "recovered sample_size (%d) is unknown or an error!" % sample_size raise IOError if verbose: print "unpacking message (dtype:=%s, msgtype=%s)" % (dtype, msgtype) # read markers # TODO: extract markers self.readSize = ((self.nChannels+1) * self.stdblocksize * struct.calcsize(msgtype)) dt = numpy.dtype(dtype) raw_data = numpy.frombuffer(payload[offset:offset + self.readSize], dtype=dt) raw_data.shape = (self.stdblocksize, self.nChannels+1) # self.ndsamples = numpy.hsplit(raw_data, numpy.array([raw_data.shape[1]-1, raw_data.shape[1]]))[0] self.ndsamples = raw_data[:,:self.nChannels] self.ndsamples.shape = (self.stdblocksize, self.nChannels) self.ndsamples = scipy.transpose(self.ndsamples) comp_markers = list() # raw_markers = numpy.hsplit(raw_data, numpy.array([raw_data.shape[1]-1, raw_data.shape[1]], numpy.int16))[1] raw_markers = raw_data[:,self.nChannels] for m in raw_markers: if m != 0: smarker = (m & 0xff) rmarker = (m & 0xff00) >> 8 if rmarker == 0: comp_markers.append(self.markerids[str("S%3d" % smarker)]) else: comp_markers.append(self.markerids[str("R%3d" % rmarker)]) else: comp_markers.append(-1) self.ndmarkers = numpy.array(comp_markers)
# print "samples shape: ", self.ndsamples.shape # print "markers shape: ", self.ndmarkers.shape
[docs] def _grace(self, signum, stackframe): self.socket.close() sys.exit(1)
[docs]class EEGServer(object): """EEG stream server for EEG stream protocol"""
[docs] def __init__(self, absolute_data_path, block_size=4, port=51244): self.port = port self.data_path = absolute_data_path self.block_size = block_size self.server_process = None self.server_proxy = None self.executable = os.path.join("eegmanager", "release", "eegmanager")
[docs] def __del__(self): if self.server_process != None: self.server_proxy.shut_down() else: warnings.warn('EEG-Server could not be started. Did you compile it correctly?')
[docs] def start(self): """ Starts the EEG server""" if None == self.server_process: server_out_log = open("server_out_log", 'w') server_err_log = open("server_err_log", 'w') eeg_acq_root = os.sep.join(sys.modules[__name__].__file__.split(os.sep)[:-1]) # Workaround for finding Free XML-Ports freeport = False call = list() call.append(self.executable) try: files = glob.glob(eeg_acq_root + self.executable) except: raise Exception("There is no existing executable in %s. Please compile!" % (os.path.join(eeg_acq_root, self.executable))) while not freeport: # Add Port number, create test-ProxyServer, # XMLRPC-Port range: 16253..26253 xmlport = 16253 + random.randint(0,10000) call.append(str(xmlport)) test_proxy = xmlrpclib.ServerProxy("http://127.0.0.1:%s" % call[1]) try: test_proxy.system.listMethods() except socket.error, (value, message): freeport = True del test_proxy # Remove Portnumber if not freeport: call.pop() self.server_process = subprocess.Popen(call, cwd=eeg_acq_root, stdin=None, stderr=server_err_log, stdout=server_out_log) self.server_proxy = xmlrpclib.ServerProxy("http://127.0.0.1:%s" % call[1]) mysocket = None # wait for the server to startup... # TODO: check this! Time depends on number of started processes! time.sleep(3) try: #try to get a socket connection and start the server self.server_proxy.start(self.data_path, self.port, self.block_size) time.sleep(5) # check if the server process is still running? # returncode: None -> still running # Numeric -> terminated self.server_process.poll() if(self.server_process.returncode != None): ret = self.server_process.returncode self.server_process = None raise SystemError, "Server Process should run but exited with status %d" % (ret) mysocket = socket.socket(socket.AF_INET, socket.SOCK_STREAM) connected = mysocket.connect(("127.0.0.1", self.port)) except socket.error, (value,message): # failure in connection if mysocket: mysocket.close() # could not open connection -> kill server os.kill(self.server_process.pid, signal.SIGKILL) raise IOError, "error: %d could not open socket(%d): %s" % (value, self.port, message) time.sleep(1) #Create a method which handles the tear down of the server def kill(): """ Kills the running EEG server """ self.server_proxy.stop() time.sleep(.3) self.server_proxy.shut_down() #Add signal handler signals = (signal.SIGHUP, signal.SIGINT, signal.SIGTERM, signal.SIGQUIT) for sig in signals: signal.signal(sig, kill)
[docs] def reset(self): """ Reset the EEG server so that he points again to the beginning""" self.server_proxy.stop() time.sleep(1) self.server_proxy.shut_down() time.sleep(1) self.start()
[docs]class EEGClientUsb(AbstractStreamReader): """ Acquire raw streamed eeg-data from usb-acquisition submodule This implementation uses the usb1 module, available from Pip/easy_install. usb1 is used to wrap calls to the libusbx c-library. """
[docs] def __init__(self, ipmarker_server=None, **kwargs): """ currently no parameter """ if usb_warning: warnings.warn("The usb_acquisition Module could not be imported!") super(EEGClientUsb, self).__init__(**kwargs) self.all_channels = ['Fp1', 'Fp2', 'F7', 'F3', 'Fz', 'F4', 'F8', 'FC5',\ 'FC1', 'FC2', 'FC6', 'T7', 'C3', 'Cz', 'C4', 'T8',\ 'TP9', 'CP5', 'CP1', 'CP2', 'CP6', 'TP10', 'P7',\ 'P3', 'Pz', 'P4', 'P8', 'PO9', 'O1', 'Oz', 'O2',\ 'PO10', 'AF7', 'AF3', 'AF4', 'AF8', 'F5', 'F1',\ 'F2', 'F6', 'FT9', 'FT7', 'FC3', 'FC4', 'FT8',\ 'FT10', 'C5', 'C1', 'C2', 'C6', 'TP7', 'CP3',\ 'CPz', 'CP4', 'TP8', 'P5', 'P1', 'P2', 'P6',\ 'PO7', 'PO3', 'POz', 'PO4', 'PO8', 'Fpz', 'F9',\ 'AFF5h', 'AFF1h', 'AFF2h', 'AFF6h', 'F10', 'FTT9h',\ 'FTT7h', 'FCC5h', 'FCC3h', 'FCC1h', 'FCC2h',\ 'FCC4h', 'FCC6h', 'FTT8h', 'FTT10h', 'TPP9h',\ 'TPP7h', 'CPP5h', 'CPP3h', 'CPP1h', 'CPP2h',\ 'CPP4h', 'CPP6h', 'TPP8h', 'TPP10h', 'POO9h',\ 'POO1', 'POO2', 'POO10h', 'Iz', 'AFp1', 'AFp2',\ 'FFT9h', 'FFT7h', 'FFC5h', 'FFC3h', 'FFC1h',\ 'FFC2h', 'FFC4h', 'FFC6h', 'FFT8h', 'FFT10h',\ 'TTP7h', 'CCP5h', 'CCP3h', 'CCP1h', 'CCP2h',\ 'CCP4h', 'CCP6h', 'TTP8h', 'P9', 'PPO9h', 'PPO5h',\ 'PPO1h', 'PPO2h', 'PPO6h', 'PPO10h', 'P10', 'I1',\ 'OI1h', 'OI2h', 'I2'] self.ipmarker_server = ipmarker_server self.ip_lastmarker = (None, None) self._dSamplingInterval = 5000 self._channelNames = None self._markerids = dict() self._markerNames = dict() self.marker_id_counter = 0 self._stdblocksize = 100 self.fmt = None self.channelids = None self.resolutions = None self.nChannels = None self.callbacks = list() self.raw_data = list() self.timestamp = 0 self.dig_lastmarker = 0 self.acquisition = BUASubprocess() self.acquisition.start() self.connect()
@property def dSamplingInterval(self): return self._dSamplingInterval @property def stdblocksize(self): return self._stdblocksize @property def markerids(self): return self._markerids @property def channelNames(self): return self._channelNames @property def markerNames(self): return self._markerNames
[docs] def regcallback(self, func): self.callbacks.append(func)
[docs] def block_length_ms(self): return (self.stdblocksize*1000)/self.dSamplingInterval
[docs] def disconnect(self): if self.ipmarker_server is not None: self.ipmarker_server.stop() self.ipmarker_server.join() self.acquisition.stop() self.acquisition.join(timeout=5) del self.acquisition
[docs] def new_marker_id(self): self.marker_id_counter += 1 return self.marker_id_counter
[docs] def connect(self, verbose=False): while self.acquisition.nchannels.value < 0: if not self.acquisition.is_alive(): raise IOError, "Acquisition quit early!" time.sleep(.1) self.nChannels = self.acquisition.nchannels.value if self.nChannels == 0: raise IOError, "No Amplifiers found! Switch them on?" # generate all possible marker names and ids self._markerids['null'] = 0 for s in range(1,256,1): self._markerids[str('S%d' % s)] = self.new_marker_id() for r in range(1,256,1): self._markerids[str('R%d' % r)] = self.new_marker_id() # generate reverse mapping for k,v in zip(self._markerids.iterkeys(), self._markerids.itervalues()): self._markerNames[v] = k # select channelnames self._channelNames = self.all_channels[:self.nChannels] # calculate raw-data threshold while self.acquisition.nextra_channels.value < 0 \ or self.acquisition.nall_channels < 0: time.sleep(.1) self.nextra_channels = self.acquisition.nextra_channels.value self.nall_channels = self.acquisition.nall_channels.value self.min_raw_data = self.stdblocksize * self.nall_channels
[docs] def read(self, nblocks=1, verbose=False): readblocks = 0 while (readblocks < nblocks or nblocks == -1): # get enough raw data blocks self.gather_enough_data() # split data and marker in seperate arrays ndsamples, ndmarkers = self.separate() for f in self.callbacks: f(ndsamples, ndmarkers) readblocks += 1 return readblocks
[docs] def gather_enough_data(self): """ gets enough data to generate a block of size stdblocksize in channel and markers """ while len(self.raw_data) < self.min_raw_data: data, timestamp = self.acquisition.read() if self.fmt is None: self.fmt = str("%dh" % (len(data)/2)) values = struct.unpack(self.fmt, data) self.raw_data.extend(values) self.timestamp = timestamp-self.block_length_ms()
[docs] def separate(self): """ separates the raw-data into the data- and marker-channels """ packet = self.raw_data[0:self.min_raw_data] self.raw_data[0:self.min_raw_data] = [] # example block-layout for 32 channels: # [marker:1][reserved:4][nchannels data:32] :|| data = list() mark = list() for i in range(self.stdblocksize): mark.append(self.digital_marker(packet[0])) data.extend(packet[self.nextra_channels:self.nextra_channels+self.nChannels]) packet[0:self.nall_channels] = [] if self.ipmarker_server is not None: mark = self.insert_ip_markers(mark) ndata = numpy.array(data) ndata = ndata.reshape((self.stdblocksize,self.nChannels)) return ndata, mark
[docs] def insert_ip_markers(self, mark): while True: if not None in self.ip_lastmarker: m, t = self.ip_lastmarker self.ip_lastmarker = (None, None) else: m, t = self.ipmarker_server.read() if m is None or t is None: break time_index = self.time2index(t) mark_index = self.mark2index(m) if time_index > len(mark)-1: self.ip_lastmarker = (m, t) break elif time_index >= 0 and time_index < len(mark): mark[time_index] = mark_index else: warnings.warn("Index did not fit: %d (%d, %s)" % (time_index, mark_index, self.markerNames[mark_index])) return mark
[docs] def mark2index(self, m): if not self.markerids.has_key(m): new = self.new_marker_id() self.markerids[m] = new self.markerNames[new] = m # print("added new marker %s with id %d" % (m, new)) return self.markerids[m]
[docs] def time2index(self, t): index = ((t-self.timestamp)*self.stdblocksize)/self.block_length_ms() # make sure its not negative! # (a negative index means that the marker was delayed!) return max(0, index)
[docs] def digital_marker(self, raw_value): if raw_value == self.dig_lastmarker: value = -1 else: m = raw_value & (self.dig_lastmarker^0xffff) smarker = (m & 0xff) rmarker = (m & 0xff00) >> 8 if smarker != 0: value = self.markerids[str("S%d" % smarker)] elif rmarker != 0: value = self.markerids[str("R%d" % rmarker)] else: value = -1 self.dig_lastmarker = raw_value return value
[docs]def dummylisten1(samples, markers): sys.stdout.write("*") sys.stdout.flush()
[docs]def dummylisten2(samples, markers): print samples[:32]
[docs]def dummylisten3(samples, markers): # print markers pass
if __name__ == '__main__': if len(sys.argv) > 2: try: host = str(sys.argv[1]) print host if len(host.split(".")) != 4: raise Exception port = int(sys.argv[2]) print port c = EEGClient(host=host, port=port) except Exception: print "could not generate meaning from args %s" % sys.argv c = EEGClient(host='127.0.0.1', port=51244) else: # Setup for Localhost # c = EEGClient(host='127.0.0.1', port=51244) s = MarkerServer(port=55555) s.start() c = EEGClientUsb(ipmarker_server=s) def marker_listen(samples, markers): for i,m in enumerate(markers): if m != -1: print c.markerNames[m], i c.regcallback(marker_listen) print("running with %d channels" % c.nChannels) n = c.read(nblocks=1, verbose=True) start = time.time() n += c.read(nblocks=2500) stop = time.time() c.disconnect() total_b = n*c.stdblocksize*c.nChannels*c.sample_size total_kb = float(total_b)/1000.0 total_mb = float(total_kb)/1000.0 total_gb = float(total_mb)/1000.0 del c rate_mb_s = float(total_mb)/(stop-start) print "received %06f MB in %04f Seconds := %f MB/s" % (total_mb, (stop-start), rate_mb_s)