The Bit Anvil

python bottle server sent events - one way to handle client disconnects

A Lot of people including me wonder how to handle client disconnections when using Server Sent Events (SSE). This example shows one way of doing it in bottle.

There is a stack overflow question I asked about this here

Note that for my purposes, I don't care if events are 'lost' when there is no client present, but you may care. If so, then take advantage of the SSE ID: record as discussed on Simon Baynes's blog post

How it works:

  • There is a main event queue event_queue 
  • There is a queue crated each time a SSE request comes in
  • The SSE request handler (def event_stream() ) registers it's new
    queue with the EventDispatcher.
  • The event dispatcher copies items from the main event_queue to each of the clients individual queues
  • The individual event queues are destroyed when the WSCGI server fails to write to the client, and hence reliqusihes it's hold on the local event queue. The EventDispatcher only has a week reference to the local event queues threfore it is automatically removed from the client list.

This seems like a merry dance, and it is. But I have not found a way as yet to be notified when a client has 'gone away' in bottle. I suspect it is possible by attaching to the internal bottle event handler in some way. However, for my particular case the code below works well. With a little tidy up and testing it can be put into proper service.

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
class EventDispatcher:
    def __init__(self, main_queue):
        self.main_queue = main_queue
        self.clients = weakref.WeakSet()

    def register_client(self, queue):
        logging.debug("Registering client queue %s", queue)
        self.clients.add(queue)

    def start(self):

        class EnqueueThread:
            def __init__(self, main_queue, clients):
                self.main_queue = main_queue
                self.clients = clients

            def __call__(self):
                while True:
                    msg = self.main_queue.get()
                    logging.debug("Got message. There are %d clients", len(self.clients))
                    for x in self.clients:
                        logging.debug("Delivering message to client %s", x)
                        try:
                            x.put_nowait(msg)
                        except Queue.Full:
                            logging.error("Client %s queue is full, message dropped", x)

        self.enqueue_thread = EnqueueThread(self.main_queue, self.clients)
        threading.Thread(None, self.enqueue_thread).start()

Comments