python bottle server sent events - one way to handle client disconnects
2015-08-24
A lot of people, including me, wonder how to handle client disconnections when using Server-Sent Events (SSE). This example shows one way of doing it in Bottle.
There is a Stack Overflow question I asked about this here.
Note that for my purposes, I don't care if events are 'lost' when there is no client present, but you may care. If so, take advantage of the SSE id: field, as discussed in Simon Baynes's blog post.
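As a rough illustration of the id: field, here is a minimal sketch of a message formatter. The name format_sse is hypothetical and not part of the code in this post. When a message carries an id, the browser sends the last one it saw back in the Last-Event-ID request header on reconnect, which is what lets a server replay missed events.

```python
def format_sse(data, event_id=None):
    """Format one Server-Sent Events message as text (hypothetical helper)."""
    lines = []
    if event_id is not None:
        # The client remembers this value and reports it on reconnect.
        lines.append("id: %s" % event_id)
    # A multi-line payload needs one "data:" line per line of text.
    for line in str(data).splitlines() or [""]:
        lines.append("data: %s" % line)
    # A blank line terminates the message.
    return "\n".join(lines) + "\n\n"
```

Replaying events would also need the server to keep a history keyed by id, which the approach in this post deliberately does not do.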
How it works:
- There is a main event queue, event_queue.
- A new queue is created each time an SSE request comes in.
- The SSE request handler (def event_stream()) registers its new queue with the EventDispatcher.
- The event dispatcher copies items from the main event_queue to each client's individual queue.
- The individual event queues are destroyed when the WSGI server fails to write to the client and hence relinquishes its hold on the local event queue. The EventDispatcher only has a weak reference to each local event queue, so the queue is automatically removed from the client list.
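The weak-reference step in the list above can be seen in isolation with a small sketch. The function handle_request is a hypothetical stand-in for a request handler, and the explicit gc.collect() is only there so the result does not depend on CPython's reference counting.

```python
import gc
import weakref

try:
    import queue           # Python 3
except ImportError:
    import Queue as queue  # Python 2

clients = weakref.WeakSet()


def handle_request():
    """Stand-in for a request handler that owns its own queue."""
    local_queue = queue.Queue()
    clients.add(local_queue)   # the set holds only a weak reference
    return len(clients)        # the queue is still alive here


count_during = handle_request()
# The only strong reference lived inside handle_request() and is gone now.
gc.collect()
count_after = len(clients)
```

While the handler is running the set has one member; once the handler's reference is gone the set is empty again, with no explicit unregister call.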
This seems like a merry dance, and it is. But I have not found a way as yet to be notified when a client has 'gone away' in bottle. I suspect it is possible by attaching to the internal bottle event handler in some way. However, for my particular case the code below works well. With a little tidy up and testing it can be put into proper service.
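import logging
import threading
import weakref

try:
    import Queue  # Python 2
except ImportError:
    import queue as Queue  # Python 3


class EventDispatcher:
    def __init__(self, main_queue):
        self.main_queue = main_queue
        # Weak references only: a client queue disappears from this set
        # once its request handler drops the last strong reference to it.
        self.clients = weakref.WeakSet()

    def register_client(self, queue):
        logging.debug("Registering client queue %s", queue)
        self.clients.add(queue)

    def start(self):
        class EnqueueThread:
            def __init__(self, main_queue, clients):
                self.main_queue = main_queue
                self.clients = clients

            def __call__(self):
                while True:
                    # Block until the next event arrives on the main queue.
                    msg = self.main_queue.get()
                    logging.debug("Got message. There are %d clients", len(self.clients))
                    for x in self.clients:
                        logging.debug("Delivering message to client %s", x)
                        try:
                            # Never block: a full client queue loses the event
                            # instead of stalling delivery to everyone else.
                            x.put_nowait(msg)
                        except Queue.Full:
                            logging.error("Client %s queue is full, message dropped", x)

        self.enqueue_thread = EnqueueThread(self.main_queue, self.clients)
        threading.Thread(None, self.enqueue_thread).start()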
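The request handler side is described above but not shown, so here is a minimal sketch of what it might look like. The names sse_generator, maxsize and keepalive are assumptions for illustration, and register_client stands for a callable such as EventDispatcher.register_client. The keepalive comment line is an addition, not part of the original design: since the server only notices a disconnect when a write fails, writing something periodically bounds how long a dead client's queue can linger.

```python
try:
    import queue           # Python 3
except ImportError:
    import Queue as queue  # Python 2


def sse_generator(register_client, maxsize=100, keepalive=15.0):
    """Yield SSE-formatted text for one client (hypothetical sketch)."""
    # This local variable is the only strong reference to the queue.
    # When the server discards the generator, the queue can be collected.
    client_queue = queue.Queue(maxsize)
    register_client(client_queue)
    while True:
        try:
            msg = client_queue.get(timeout=keepalive)
        except queue.Empty:
            # Lines starting with ":" are SSE comments; clients ignore them.
            yield ": keepalive\n\n"
            continue
        yield "data: %s\n\n" % msg
```

In Bottle, a route would typically set response.content_type = 'text/event-stream' and return this generator, so that each yielded string is written to the client as it is produced.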