import asyncio
import json
import logging
import sockjs.tornado
from biothings.utils.hub_db import ChangeListener
[docs]
class WebSocketConnection(sockjs.tornado.SockJSConnection):
    """
    Listen to Hub DB through a listener object, and publish
    events to any client connected
    """
    clients = set()
    def __init__(self, session, listeners):
        """
        SockJSConnection.__init__() takes only a session as argument, and there's
        no way to pass custom settings. In order to use that class, we need to use partial
        to partially init the instance with 'listeners' and let the rest use the 'session'
        parameter:
          pconn = partial(WebSocketConnection,listeners=listeners)
          ws_router = sockjs.tornado.SockJSRouter(pconn,"/path")
        """
        if type(listeners) != list:
            listeners = [listeners]
        self.listeners = listeners
        # propagate connection so listeners can access it and trigger message sending
        for listener in self.listeners:
            listener.socket = self
        super(WebSocketConnection, self).__init__(session)
[docs]
    def publish(self, message):
        self.broadcast(self.__class__.clients, message) 
[docs]
    def on_open(self, info):
        # Send that someone joined
        self.broadcast(self.__class__.clients, "Someone joined. %s" % info)
        # Add client to the clients list
        self.__class__.clients.add(self) 
[docs]
    def on_message(self, message):
        err = None
        strerr = None
        try:
            message = json.loads(message)
            if message["op"] == "ping":
                self.send({"op": "pong"})
        except json.JSONDecodeError:
            strerr = "malformed json message: %s" % message
            err = json.JSONDecodeError(strerr)
        except KeyError:
            strerr = "malformed socket message: %s" % message
            err = KeyError(strerr)
        except Exception as e:
            strerr = "Unable to process message '%s': %s" % (message, e)
            err = Exception(strerr)
        if err:
            self.send({"error": strerr})
            raise err 
[docs]
    def on_close(self):
        # Remove client from the clients list and broadcast leave message
        self.__class__.clients.remove(self)
        self.broadcast(self.__class__.clients, "Someone left.") 
 
[docs]
class HubDBListener(ChangeListener):
    """
    Get events from Hub DB and propagate them through the
    websocket instance
    """
[docs]
    def read(self, event):
        # self.socket is set while initalizing the websocket connection
        self.socket.publish(event) 
 
[docs]
class LogListener(ChangeListener):
    # IMPORTANT: no logging calls here, or infinite loop
    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.socket = None
[docs]
    def read(self, event):
        if self.socket:
            # make sure there's a loop in current thread
            try:
                logging.disable(logging.CRITICAL)
                loop = asyncio.get_event_loop()
            except RuntimeError:
                loop = asyncio.new_event_loop()
                asyncio.set_event_loop(loop)
            finally:
                logging.disable(logging.NOTSET)
            try:
                self.socket.publish(event)
            except Exception as e:
                # can't log anything there, but we don't want a problem with
                # issuing the log statements through the websocket to cause
                # any error in the caller
                print(e)
                pass 
 
[docs]
class ShellListener(LogListener):
    pass