From d72a0e50c2be73f772645bd21aff2448e3c5f492 Mon Sep 17 00:00:00 2001 From: Bill Erickson Date: Thu, 20 Sep 2012 15:54:39 -0400 Subject: [PATCH] LP#1268619: python websockets server proof of concept Basically a drop-in replacement for http translator Signed-off-by: Bill Erickson --- src/python/osrf-websocket_wsh.py | 135 +++++++++++++++++++++++++++++++++++++++ 1 file changed, 135 insertions(+) create mode 100644 src/python/osrf-websocket_wsh.py diff --git a/src/python/osrf-websocket_wsh.py b/src/python/osrf-websocket_wsh.py new file mode 100644 index 0000000..1e49130 --- /dev/null +++ b/src/python/osrf-websocket_wsh.py @@ -0,0 +1,135 @@ +import os, sys, time, re, traceback, random +from mod_pywebsocket import msgutil +from mod_python import util +from datetime import datetime +import osrf.json, osrf.system, osrf.ses, osrf.conf, osrf.log, osrf.net + +''' +Apache: + +# set at the top level, not inside a virtualhost! + +PythonOption mod_pywebsocket.handler_root /path/to/osrf-websocket_wsh.py_directory/ +PythonOption mod_pywebsocket.allow_handlers_outside_root_dir On + + PythonHeaderParserHandler mod_pywebsocket.headerparserhandler + + +''' + +# TODO config settings +conf = '/openils/conf/opensrf_core.xml' +ctxt = 'config.gateway' + +# TODO: child init +osrf.system.System.net_connect(config_file=conf, config_context=ctxt) +handle = osrf.net.get_network_handle() +handle.set_receive_callback(None) +router_name = osrf.conf.get('router_name') +domain = osrf.conf.get('domain') + +# TODO: use apache logging +def _dbg(msg): + sys.stderr.write("%s\n\n" % str(msg)) + sys.stderr.flush() + +def web_socket_do_extra_handshake(request): + _dbg('opening new connection') + pass + +def web_socket_transfer_data(request): + params = util.parse_qsl(request.args) + + # parse_qsl generates a list of key/value tuples + service = [p for p in params if p[0] == 'service'][0][1] + thread = "%s%s%s" % ( + os.getpid(), + str(random.randint(100,100000)), + str(time.time()) + ) + + recipient = "%s@%s/%s" % (router_name, domain, service) + stateful = False + + while True: + + # client has opened a channel, let's see what they have to say + try: + _dbg('waiting for message...') + # TODO: timeout + req_body = msgutil.receive_message(request) + except Exception, e: + _dbg('exception on msgutil.receive_message()') + raise e + + _dbg('read message: ' + req_body) + + try: + osrf_msgs = osrf.json.to_object(req_body) + except Exception, e: + # invalid JSON; /kick + # TODO: log + # TODO: if this is a stateful connect, + # send a disconnect on behalf of the caller + return + + while handle.recv(0): + # discard stale statuses, etc. + pass + + for osrf_msg in osrf_msgs: + msg_type = osrf_msg.type() + + xmpp_msg = osrf.net.NetworkMessage( + recipient = recipient, + thread = thread, + body = req_body + ) + + handle.send(xmpp_msg) + + if msg_type == osrf.const.OSRF_MESSAGE_TYPE_CONNECT: + stateful = True + + if msg_type == osrf.const.OSRF_MESSAGE_TYPE_DISCONNECT: + _dbg('Exiting keepalive on disconnect') + return # caller is done + + while True: + _dbg('calling recv...') + + xmpp_msg = handle.recv(60) # TODO timeout config + if xmpp_msg is None: break + + _dbg('responding with ' + xmpp_msg.body) + + # send the entire collection back (i.e. leverage osrf chunking) + # and let the caller break the responses up into messages. + msgutil.send_message(request, xmpp_msg.body) + + # responses come from workers. ensure all future + # communication goes directly to the worker process + recipient = xmpp_msg.sender + + # see if the server has sent all responses.. + resp_list = osrf.json.to_object(xmpp_msg.body) + last_msg = resp_list[-1:][0] + + if last_msg.type() == osrf.const.OSRF_MESSAGE_TYPE_STATUS: + stat_code = int(last_msg.payload().statusCode()) + _dbg('status msg with code %s' % stat_code) + + if stat_code in [ + osrf.const.OSRF_STATUS_OK, osrf.const.OSRF_STATUS_COMPLETE]: + break + + if stat_code == osrf.const.OSRF_STATUS_TIMEOUT: + # drone timed out waiting on us + _dbg('Exiting keepalive on stateful timeout') + return + + if not stateful: + _dbg("Exiting non-stateful session") + return + + -- 2.11.0