Source code for pySPACE.missions.nodes.visualization.eeg_visualization

""" Visualization of :mod:`time series <pySPACE.resources.data_types.time_series>`  based on EEG signals to combine it with mapping to real sensor positions """
import logging
import os, pylab, numpy, warnings

from pySPACE.missions.nodes.visualization.base import VisualizationBase
from pySPACE.resources.dataset_defs.stream import StreamDataset
from matplotlib.mlab import griddata

[docs]class ElectrodeCoordinationPlotNode(VisualizationBase): """ Node for plotting EEG time series as topographies. This node uses time series data and plots snapshots of the activity in the brain, i.e. in the electrode configuration space. The node inherits the functionality of the VisualisationBase. Therefore, see documentation of :mod:`VisualisationBase <pySPACE.missions.nodes.visualization.base>` to view basic functionality. **Parameters** :Layout Options: :contourlines: If set to True, contour lines are added to the plot. (*optional, default: False*) :nose_ears: Mark nose as triangle and ears as bars (common for EEG plots) (*optional, default: False*) :smooth_corners: If true, the generated graphics will be rectangular, i.e., the corners around the round head shape are filled. This is achieved by adding sham electrodes to the corners, who's signals are calculated as the means of neighboring electrodes. .. note:: This is only for visualization purposes and does not reflect the true data! Be careful with use in scientific publications! (*optional, default: False*) :add_info: If set to True, additional information (e.g. number of trial) is displayed. (*optional, default: False*) :figlabels: If this option is True, channel names and channel coordinates. (*optional, default: False*) :figtitle: The title that is displayed. If None, the class name is used. (*optional, default: None*) :single_plot: All results per window are plotted into one figure with columns being classes and rows being time points. (*optional, default: False*) :Influence the way how the data is plotted: :clip: If set to True, the values are clipped to maximum and minimum defined by parameter limits. This is only working if limits are defined. (*optional, default: False*) :limits: Here, the user can set the limits for the color in the contour plot (e.g. [-1.0,1.0]). If this option is not set, the colorbar is normalized according to the data. (*optional, default: False*) **Exemplary Call** .. code-block:: yaml - node : Time_Series_Source - node : Electrode_Coordination_Plot parameters : figlabels : True create_movie : True time_stamps : [200, 400] timeshift : -200 smooth_corners : False - node: Nil_Sink Here is an alternative call: .. code-block:: yaml - node : Electrode_Coordination_Plot parameters : single_trial : True accum_avg : True separate_training_and_test : True add_info : True time_stamps : [400] limit2class : Target :Author: Sirko Straube (sirko.straube@dfki.de) :Date of Last Revision: 2013/01/01 .. todo:: Depending on which plot backend is used, the resizing of the figure does not work well. Currently this is not well supported by matplotlib. Change when support is improved. The key command is fig.set_size_inches([a,b]) and the dpi property. """ input_types = ["TimeSeries"]
[docs] def __init__(self, clip=False, contourlines=False, limits=False, nose_ears=False, smooth_corners=False, add_info=False, single_plot=False, figlabels=False, figtitle=None, **kwargs): super(ElectrodeCoordinationPlotNode, self).__init__(**kwargs) if limits: limits = [float(i) for i in limits] #make sure limits consists of floats # define electrode grid xi = numpy.linspace(-125, 125, 200) yi = numpy.linspace(-100, 100, 200) self.set_permanent_attributes(xi=xi, yi=yi, clip=clip, contourlines=contourlines, limits=limits, nose_ears=nose_ears, smooth_corners=smooth_corners, add_info=add_info, single_plot=single_plot, figlabels=figlabels, figtitle=figtitle, time_checked=False)
[docs] def _plotValues(self, values, #dict TimeSeries values plot_label, #str Plot-Label fig_num, #int Figure-number for classify plots # 1: average, # 2: single trial, # 3: average accumulating store_dir = None, #str Directory to store the plots counter=0): #int Plotcounter for all trials #compute sampling_frequency and classes to plot sampling_frequency = values[values.keys()[0]].sampling_frequency list_of_classes = values.keys() num_of_classes = len(list_of_classes) #autoscale color bar or use user scaling if self.limits: levels = self._compute_levels() else: levels = None #compute maximum and minimum for colorbar scaling if not existing vmax = float(max(numpy.array(v).max() for v in values.itervalues())) vmin = float(min(numpy.array(v).min() for v in values.itervalues())) levels = self._compute_levels(limits=[vmin, vmax]) #normalizer=matplotlib.colors.Normalize(vmin=vmin, vmax=vmax) #computing time points to show num_tpoints = values.values()[0].shape[0] all_tpoints = numpy.arange(0, num_tpoints * (1000/sampling_frequency), 1000 / sampling_frequency ) + self.timeshift if self.time_stamps == [-1]: tpoints = all_tpoints else: #check if desired time points are existing and confirm if not self.time_checked: for t in self.time_stamps: if not t in all_tpoints: warnings.warn("Electrode_Coordination_Plot:: At least" \ " one desired time stamp not available!" \ " Legal time stamps are " \ + str(all_tpoints) + ". Switching to " \ "next legal time point. Please check " \ "for consistency!") if t < 0: new_t = self.timeshift else: new_t = range(0, t+1, int(1000/sampling_frequency))[-1] #if we obtain an empty list reset to timeshift if new_t == []: new_t = self.timeshift else: new_t = new_t+self.timeshift #finally check for too high or low values if new_t < self.timeshift: new_t = self.timeshift elif new_t > all_tpoints[-1]: new_t = all_tpoints[-1] self.time_stamps[self.time_stamps.index(t)] = new_t self.time_checked = True #has to be performed only once tpoints = numpy.array(self.time_stamps) num_of_tpoints = len(tpoints) # selecting formatting and clearing figure default_size = [8., 6.] if self.single_plot: num_of_rows = num_of_tpoints if num_of_rows > 4: default_size[0] = default_size[0]/(int((num_of_rows+3)/4)) default_size[1] = default_size[1]/(int((num_of_rows+3)/4)) else: num_of_rows = 1 f=pylab.figure(fig_num, figsize=[default_size[0]*num_of_classes, default_size[1]*num_of_rows]) if pylab.get_backend() in pylab.matplotlib.backends.interactive_bk: f.show() if counter%20 == 19: #clear every 20th trial pylab.figure(fig_num).clear() # Iterate over the time window for time_index in range(num_of_tpoints): pylab.subplots_adjust(left=0.025, right=0.8) #shift a bit to the left if self.single_plot: pl_offset=time_index else: pl_offset=0 for index, class_label in enumerate(list_of_classes): current_plot_num=(num_of_classes*pl_offset)+index+1 pylab.subplot(num_of_rows, num_of_classes, current_plot_num) pylab.gca().clear() # Get the values for the respective class data = values[class_label].view(numpy.ndarray) ec = self.get_metadata("electrode_coordinates") if ec is None: ec = StreamDataset.ec # observe channels channel_names = [channel for channel in values[class_label].channel_names if channel in ec.keys()] fcn = [channel for channel in values[class_label].channel_names if not channel in ec.keys()] if not fcn == []: self._log("Unsupported channels ignored:%s ."%str(fcn), level=logging.CRITICAL) if channel_names == []: self._log("No channel for plotting left.", level=logging.CRITICAL) return ec_2d = StreamDataset.project2d(ec) # Define x and y coordinates of electrodes in the order of # the channels of the data x = numpy.array([ec_2d[key][0] for key in channel_names]) y = numpy.array([ec_2d[key][1] for key in channel_names]) # The values of the electrodes at this point of time pos=list(all_tpoints).index(tpoints[time_index]) z = data[pos, :] if self.smooth_corners: x,y,z = self._smooth_corners(x,y,z, data, channel_names, pos) # griddata returns a masked array # you can get the data via zi[~zi.mask] try: zi = griddata(x, y, z, self.xi, self.yi) except RuntimeError: warnings.warn( "Natbib packackage is not available for interpolating a" " grid. Using linear interpolation instead.") zi = griddata(x, y, z, self.xi, self.yi, interpl='linear') # clip values if self.clip and self.limits: # minimum and maximum zi = numpy.clip(zi, self.limits[0], self.limits[1]) # contour the gridded data, # plotting dots at the nonuniform data points. cs=pylab.contourf(self.xi, self.yi, zi, 15, cmap=pylab.cm.jet, levels=levels) if self.contourlines: pylab.contour(self.xi, self.yi, zi, 15, linewidths=0.5, colors='k', levels=levels) if self.figlabels: # plot data points. if not self.smooth_corners: pylab.scatter(x, y, c='b', s=5, marker='o') else: # dont plot invented electrode positions pylab.scatter(x[:-4], y[:-4], c='b', s=5, marker='o') # Add channel labels for label, position in ec_2d.iteritems(): if label in channel_names: pylab.text(position[0], position[1], label) if self.add_info: if counter: if len(list_of_classes) > 1: if index == 0: pylab.text(-120, -98, 'Trial No. ' + str(counter), fontsize=12) else: pylab.text(-120, -98, 'Trial No. ' + str(counter), fontsize=12) if self.nose_ears: #nose ytips=[87.00,87.00, 97] xtips=[-10.00,10.00, 0] pylab.fill(xtips,ytips, facecolor='k', edgecolor='none') #left xtips=[-108.0,-113.0,-113.0,-108.0] ytips=[-10.0,-10.0,10.0,10.0] pylab.fill(xtips,ytips, facecolor='k', edgecolor='none') #right xtips=[108.0,114.0,113.0,108.0] ytips=[-10.0,-10.0,10.0,10.0] pylab.fill(xtips,ytips, facecolor='k', edgecolor='none') pylab.xlim(-125, 125) pylab.ylim(-100, 100) if not self.single_plot or time_index==0: #if single_plot=True do only for the first row if self.figtitle: pylab.title(self.figtitle, fontsize=20) else: pylab.title(class_label, fontsize=20) pylab.setp(pylab.gca(), xticks=[], yticks=[]) pylab.draw() caxis = pylab.axes([.85, .1, .04, .75]) cb = pylab.colorbar(mappable=cs, cax=caxis) # TODO: The label read 'Amplitude ($\mu$V)' # Removed the unit. Or can we really still assume # a (correct) \muV scale after all preprocessing? cb.ax.set_ylabel(r'Amplitude', fontsize=16) # Position of the time axes ax = pylab.axes([.79, .94, .18, .04]) pylab.gca().clear() pylab.bar(tpoints[time_index], 1.0, width=1000.0/sampling_frequency) pylab.xlim(tpoints[0], tpoints[-1]) pylab.xlabel("time (ms)", fontsize=12) pylab.setp(ax, yticks=[],xticks=[all_tpoints[0], tpoints[time_index], all_tpoints[-1]]) # Draw or store the figure if store_dir is None: pylab.draw() #pylab.show() elif self.single_plot and not current_plot_num==(num_of_rows*num_of_classes): #save only if everything is plotted pylab.draw() #pylab.show() else: current_split=self.current_split if current_split != 0 and not\ plot_label.endswith('_split_' + str(current_split)): #more than one split and first call plot_label = plot_label + '_split_' + str(current_split) f_name=str(store_dir) + str(os.sep) + str(plot_label) + "_" + str(int(tpoints[time_index])) pylab.savefig(f_name + ".png") if self.store_data: import pickle f_name=str(store_dir) + str(os.sep) + str(plot_label) pickle.dump(values, open(f_name + ".pickle",'w'))
[docs] def _smooth_corners(self, x, y, z, data, channel_names, time_index): """ Add sham electrodes to the corners of the coordinate system """ # invent new corner electrodes using x and y positions of the margin # electrodes of the 64 electrode cap. Data is mean of neighbouring # electrodes based, again, on the 64 electrode cap. # # frontleft FL positioned at [x(FT9), y(Fp1)] x=numpy.append(x,x[numpy.where(numpy.array(channel_names)=='FT9')]) y=numpy.append(y,y[numpy.where(numpy.array(channel_names)=='Fp1')]) nz = data[time_index, numpy.where(numpy.array(channel_names)=='Fp1')] +\ data[time_index, numpy.where(numpy.array(channel_names)=='AF7')] +\ data[time_index, numpy.where(numpy.array(channel_names)=='F7')] +\ data[time_index, numpy.where(numpy.array(channel_names)=='FT9')] z=numpy.append(z, 0.1 * nz / 4.0) # frontright FR positioned at [x(FT10), y(Fp1)] x=numpy.append(x,x[numpy.where(numpy.array(channel_names)=='FT10')]) y=numpy.append(y,y[numpy.where(numpy.array(channel_names)=='Fp1')]) nz = data[time_index, numpy.where(numpy.array(channel_names)=='Fp2')] +\ data[time_index, numpy.where(numpy.array(channel_names)=='AF8')] +\ data[time_index, numpy.where(numpy.array(channel_names)=='F8')] +\ data[time_index, numpy.where(numpy.array(channel_names)=='FT10')] z=numpy.append(z, 0.1 * nz / 4.0) # backleft BL positioned at [x(FT9), y(Oz)] x=numpy.append(x,x[numpy.where(numpy.array(channel_names)=='FT9')]) y=numpy.append(y,y[numpy.where(numpy.array(channel_names)=='Oz')]) nz = data[time_index, numpy.where(numpy.array(channel_names)=='TP9')] +\ data[time_index, numpy.where(numpy.array(channel_names)=='P7')] +\ data[time_index, numpy.where(numpy.array(channel_names)=='PO9')] z=numpy.append(z, 0.1 * nz / 4.0) # backright BR positioned at [x(FT10), y(Oz)] x=numpy.append(x,x[numpy.where(numpy.array(channel_names)=='FT10')]) y=numpy.append(y,y[numpy.where(numpy.array(channel_names)=='Oz')]) nz = data[time_index, numpy.where(numpy.array(channel_names)=='TP10')] +\ data[time_index, numpy.where(numpy.array(channel_names)=='P8')] +\ data[time_index, numpy.where(numpy.array(channel_names)=='PO10')] z=numpy.append(z, 0.1 * nz / 4.0) return x,y,z
[docs] def _compute_levels(self, limits=None): if not limits: if self.limits: limits=self.limits else: #should never be reached return None rel_precision = int(('%1.e'%limits[0])[-3:]) #determine the precision of the values epsilon=pow(10, rel_precision-5) #add a small amount to make sure that clipping works step = (limits[1]-limits[0])/100 #split into 100 steps levels=numpy.arange(limits[0],limits[1]+epsilon,step) levels=numpy.round(levels, decimals=abs(rel_precision)+2) levels[0]=levels[0]-epsilon levels[-1]=levels[-1]+epsilon return levels
_NODE_MAPPING = {"Electrode_Coordination_Plot": ElectrodeCoordinationPlotNode}