A Lot of people including me wonder how to handle client disconnections when using Server Sent Events (SSE). This example shows one way of doing it in bottle.
There is a stack overflow question I asked about this here
Note that for my purposes, I don't care if events are 'lost' when there is no client present, but you may care. If so, then take advantage of the SSE ID: record as discussed on Simon Baynes's blog post
How it works:
- There is a main event queue event_queue
- There is a queue crated each time a SSE request comes in
- The SSE request handler (def event_stream() ) registers it's new
queue with the EventDispatcher.
- The event dispatcher copies items from the main event_queue to each of the clients individual queues
- The individual event queues are destroyed when the WSCGI server fails to write to the client, and hence reliqusihes it's hold on the local event queue. The EventDispatcher only has a week reference to the local event queues threfore it is automatically removed from the client list.
This seems like a merry dance, and it is. But I have not found a way as yet to be notified when a client has 'gone away' in bottle. I suspect it is possible by attaching to the internal bottle event handler in some way. However, for my particular case the code below works well. With a little tidy up and testing it can be put into proper service.
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
class EventDispatcher: def __init__(self, main_queue): self.main_queue = main_queue self.clients = weakref.WeakSet() def register_client(self, queue): logging.debug("Registering client queue %s", queue) self.clients.add(queue) def start(self): class EnqueueThread: def __init__(self, main_queue, clients): self.main_queue = main_queue self.clients = clients def __call__(self): while True: msg = self.main_queue.get() logging.debug("Got message. There are %d clients", len(self.clients)) for x in self.clients: logging.debug("Delivering message to client %s", x) try: x.put_nowait(msg) except Queue.Full: logging.error("Client %s queue is full, message dropped", x) self.enqueue_thread = EnqueueThread(self.main_queue, self.clients) threading.Thread(None, self.enqueue_thread).start()