LP#1268619: python websockets server proof of concept
authorBill Erickson <berick@esilibrary.com>
Thu, 20 Sep 2012 19:54:39 +0000 (15:54 -0400)
committerBill Erickson <berick@esilibrary.com>
Sun, 4 May 2014 20:10:33 +0000 (16:10 -0400)
Basically a drop-in replacement for http translator

Signed-off-by: Bill Erickson <berick@esilibrary.com>
src/python/osrf-websocket_wsh.py [new file with mode: 0644]

diff --git a/src/python/osrf-websocket_wsh.py b/src/python/osrf-websocket_wsh.py
new file mode 100644 (file)
index 0000000..1e49130
--- /dev/null
@@ -0,0 +1,135 @@
+import os, sys, time, re, traceback, random
+from mod_pywebsocket import msgutil
+from mod_python import util
+from datetime import datetime
+import osrf.json, osrf.system, osrf.ses, osrf.conf, osrf.log, osrf.net
+
+'''
+Apache:
+
+# set at the top level, not inside a virtualhost!
+
+PythonOption mod_pywebsocket.handler_root /path/to/osrf-websocket_wsh.py_directory/
+PythonOption mod_pywebsocket.allow_handlers_outside_root_dir On
+<Location /osrf-websocket>
+    PythonHeaderParserHandler mod_pywebsocket.headerparserhandler
+</Location>
+
+'''
+
+# TODO config settings
+conf = '/openils/conf/opensrf_core.xml'
+ctxt = 'config.gateway'
+
+# TODO: child init
+osrf.system.System.net_connect(config_file=conf, config_context=ctxt)
+handle = osrf.net.get_network_handle()
+handle.set_receive_callback(None)
+router_name = osrf.conf.get('router_name')
+domain = osrf.conf.get('domain')
+
+# TODO: use apache logging
+def _dbg(msg):
+    sys.stderr.write("%s\n\n" % str(msg))
+    sys.stderr.flush()
+
+def web_socket_do_extra_handshake(request):
+    _dbg('opening new connection')
+    pass
+
+def web_socket_transfer_data(request):
+    params = util.parse_qsl(request.args)
+
+    # parse_qsl generates a list of key/value tuples
+    service = [p for p in params if p[0] == 'service'][0][1]
+    thread  = "%s%s%s" % (
+        os.getpid(), 
+        str(random.randint(100,100000)), 
+        str(time.time())
+    )
+
+    recipient = "%s@%s/%s" % (router_name, domain, service)
+    stateful = False
+
+    while True:
+
+        # client has opened a channel, let's see what they have to say
+        try:
+            _dbg('waiting for message...')
+            # TODO: timeout
+            req_body = msgutil.receive_message(request)
+        except Exception, e:
+            _dbg('exception on msgutil.receive_message()')
+            raise e
+
+        _dbg('read message: ' + req_body)
+
+        try:
+            osrf_msgs = osrf.json.to_object(req_body)
+        except Exception, e:
+            # invalid JSON; /kick
+            # TODO: log
+            # TODO: if this is a stateful connect, 
+            # send a disconnect on behalf of the caller
+            return
+
+        while handle.recv(0):
+            # discard stale statuses, etc.
+            pass
+
+        for osrf_msg in osrf_msgs:
+            msg_type = osrf_msg.type()
+
+            xmpp_msg = osrf.net.NetworkMessage(
+                recipient = recipient, 
+                thread = thread, 
+                body = req_body
+            )
+
+            handle.send(xmpp_msg)
+
+            if msg_type == osrf.const.OSRF_MESSAGE_TYPE_CONNECT:
+                stateful = True
+
+            if msg_type == osrf.const.OSRF_MESSAGE_TYPE_DISCONNECT:
+                _dbg('Exiting keepalive on disconnect')
+                return # caller is done
+
+            while True:
+                _dbg('calling recv...')
+
+                xmpp_msg = handle.recv(60) # TODO timeout config
+                if xmpp_msg is None: break
+
+                _dbg('responding with ' + xmpp_msg.body)
+
+                # send the entire collection back (i.e. leverage osrf chunking)
+                # and let the caller break the responses up into messages.
+                msgutil.send_message(request, xmpp_msg.body)
+
+                # responses come from workers. ensure all future 
+                # communication goes directly to the worker process
+                recipient = xmpp_msg.sender
+
+                # see if the server has sent all responses..
+                resp_list = osrf.json.to_object(xmpp_msg.body)
+                last_msg = resp_list[-1:][0]
+
+                if last_msg.type() == osrf.const.OSRF_MESSAGE_TYPE_STATUS:
+                    stat_code = int(last_msg.payload().statusCode())
+                    _dbg('status msg with code %s' % stat_code)
+
+                    if stat_code in [
+                        osrf.const.OSRF_STATUS_OK, osrf.const.OSRF_STATUS_COMPLETE]:
+                        break
+
+                    if stat_code == osrf.const.OSRF_STATUS_TIMEOUT:
+                        # drone timed out waiting on us
+                        _dbg('Exiting keepalive on stateful timeout')
+                        return
+
+        if not stateful: 
+            _dbg("Exiting non-stateful session")
+            return
+
+