import socket
import time
import struct
import threading
import select
import random
import Queue
import warnings
[docs]class MarkerSocket(threading.Thread):
[docs] def __init__(self, ip="10.250.3.83", port=55555, name="test", **kwargs):
super(MarkerSocket, self).__init__(**kwargs)
self.name = name
self.ip = ip
self.port = port
self.connected = False
self.running = True
[docs] def send(self, marker):
if not self.connected:
warnings.warn("%s not sent - socket is not connected!" % marker)
return
fmt = "bb6sQ"
data = struct.pack(fmt, 2, struct.calcsize(fmt), marker[:6], long(time.time()*1000))
try:
self.s.send(data)
except socket.error:
warnings.warn("%s not sent - socket error: %s" % (marker, socket.errno))
self.connected = False
[docs] def run(self):
fmt = "bb6sQQQQ"
while self.running:
while (not self.connected) and self.running:
try:
self.s = socket.socket()
self.s.connect((self.ip,self.port))
self.s.send(self.name)
except socket.error:
time.sleep(1)
continue
self.connected = True
break
while self.connected and self.running:
(r,w,e) = select.select([self.s], [], [], .01)
if self.s in r:
_t2 = long(time.time()*1000)
beacon = ""
try:
beacon = str(self.s.recv(struct.calcsize(fmt)))
except socket.error:
warnings.warn("%s: error during recv!" % self.name)
self.connected = False
if len(beacon) == 0:
warnings.warn("%s: connection closed by remote!" % self.name)
self.connected = False
continue
(typ, size, progress, t1, t2, t3, t4) = struct.unpack(fmt, beacon)
t2 = _t2
t3 = long(time.time()*1000)
progress += "*"
beacon = struct.pack(fmt, typ, size, progress, t1, t2, t3, t4)
self.s.send(beacon)
[docs] def stop(self):
self.running = False
[docs]class MarkerServer(threading.Thread):
[docs] def __init__(self, port=55555, sync_interval=10, **kwargs):
super(MarkerServer, self).__init__(**kwargs)
self.s = socket.socket()
self.s.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, True)
self.s.bind(("", port))
self.s.listen(50)
self.children = []
self.sync_interval = sync_interval
self.queue = Queue.Queue()
self.running = True
[docs] def stop(self):
self.running = False
[docs] def __repr__(self):
s = super(MarkerServer, self).__repr__()
return str("%s\n\tconnected to %d clients" % (s, len(self.children)))
[docs] def run(self):
while self.running:
(r,w,e) = select.select([self.s], [], [], .25)
if self.s in r:
(client, address) = self.s.accept()
print("connection requested %s" % (str(address)))
c = MarkerAcquisitionThread(client, address,
sync_interval=self.sync_interval,
queue=self.queue)
c.start()
self.children.append(c)
self.join_stopped_threads()
self.s.close()
for c in self.children:
if c.isAlive():
c.stop()
c.join()
self.children = []
[docs] def join_stopped_threads(self):
for c in self.children:
if not c.isAlive():
c.join()
[docs] def read(self):
if not self.queue.empty():
return self.queue.get(block=False)
return None, None
[docs]class MarkerAcquisitionThread(threading.Thread):
[docs] def __init__(self, client, address, sync_interval=10, queue=None, **kwargs):
super(MarkerAcquisitionThread, self).__init__(**kwargs)
self.client = client
self.address = address
self.sync_interval = sync_interval
self.delay_ms = 0
self.name = self.client.recv(128)
self.running = True
self.queue = queue
print("marker source %s@%s connected" % (self.name, self.address[0]))
[docs] def stop(self):
self.running = False
[docs] def __repr__(self):
s = super(MarkerAcquisitionThread, self).__repr__()
return str("%s:%s" % (s, self.address))
[docs] def run(self):
sync_fmt = "bb6sQQQQ"
mark_fmt = "bb6sQ"
last_sync = 0.0
while self.running:
(r,w,e) = select.select([self.client], [], [], .01)
if self.client in r:
try:
msg = self.client.recv(struct.calcsize(sync_fmt))
except socket.error as e:
print("client %s@%s: %s" % (self.name, self.address[0], e.strerror))
self.stop()
continue
if len(msg) == 0:
print("client %s@%s: socket closed" % (self.name, self.address[0]))
self.stop()
continue
elif len(msg) == struct.calcsize(mark_fmt):
self.show_marker(msg, mark_fmt)
elif len(msg) == struct.calcsize(sync_fmt):
self.sync_end(msg, sync_fmt)
else:
pass
if int(time.time()-last_sync) > self.sync_interval:
self.sync_start(sync_fmt)
last_sync = time.time()
self.client.close()
[docs] def sync_start(self, fmt):
t1 = long(time.time()*1000)
beacon = struct.pack(fmt, 1, struct.calcsize(fmt), "*", t1, 0, 0, 0)
self.client.send(beacon)
[docs] def sync_end(self, beacon, fmt):
(typ, size, progress, t1, t2, t3, _t4) = struct.unpack(fmt, beacon)
t4 = long(time.time()*1000)
self.delay_ms = ((int(t2)-int(t4))+(int(t3)-int(t1)))/2
# print("SYNC done! delay: %f [ms]" % self.delay_ms)
[docs] def show_marker(self, marker, fmt):
(typ, size, mark, t1) = struct.unpack(fmt, marker)
self.queue.put((str(mark).strip("\0"), int(t1-self.delay_ms)))
if __name__ == "__main__":
markerserver = MarkerServer(port=55555, sync_interval=15)
markerserver.start()
sockets = []
for i in range(25):
c = MarkerSocket(ip="127.0.0.1", port=55555, name=str("client%d" % i))
c.start()
sockets.append(c)
for s in sockets:
mark = str("S%3d" % int(random.random()*255))
print("sending marker %s with client %s" % (mark, s.name))
s.send(mark)
time.sleep(random.random()*1)
while True:
m = markerserver.read()
if None in m:
break
print m
print markerserver
for c in sockets:
c.stop()
c.join()
sockets = []
for i in range(markerserver.sync_interval*1, 0, -1):
print("waiting.. %d" % i)
time.sleep(1)
print markerserver
markerserver.stop()
markerserver.join()