From 46faabe51f946f5ac208826df6a9685ede15e3a8 Mon Sep 17 00:00:00 2001 From: erickson Date: Sat, 29 Mar 2008 23:55:34 +0000 Subject: [PATCH] implemented the majority of server-side python. still need to add settings server parsing and other stuff i have probably overlooked git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@1292 9efc2488-bf62-4759-914b-345cdb29e865 --- src/python/opensrf.py | 100 ++++++++++++++++++++++ src/python/osrf/app.py | 173 +++++++++++++++++++++++++++++++++++++++ src/python/osrf/apps/__init__.py | 0 src/python/osrf/apps/example.py | 54 ++++++++++++ src/python/osrf/conf.py | 6 +- src/python/osrf/json.py | 35 ++++---- src/python/osrf/net.py | 6 +- src/python/osrf/server.py | 69 +++++++++++++++- src/python/osrf/ses.py | 156 ++++++++++++++++++++++++++++------- src/python/osrf/stack.py | 117 +++++++++++++++----------- 10 files changed, 613 insertions(+), 103 deletions(-) create mode 100755 src/python/opensrf.py create mode 100644 src/python/osrf/app.py create mode 100644 src/python/osrf/apps/__init__.py create mode 100644 src/python/osrf/apps/example.py diff --git a/src/python/opensrf.py b/src/python/opensrf.py new file mode 100755 index 0000000..cd77bf3 --- /dev/null +++ b/src/python/opensrf.py @@ -0,0 +1,100 @@ +#!/usr/bin/python +# ----------------------------------------------------------------------- +# Copyright (C) 2008 Equinox Software, Inc. +# Bill Erickson +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA +# ----------------------------------------------------------------------- + +import sys, getopt, os, signal +import osrf.system, osrf.server, osrf.app + +def help(): + print ''' + Manage one or more an OpenSRF applications + + Options: + -a + start -- Start a service + stop -- stop a service + restart -- restart a service + + -s + The service name + + -f + The OpenSRF config file + + -c + The OpenSRF config file context + + -p + The location of application PID files. Default is /tmp + + -d + If set, run in daemon (background) mode + ''' + sys.exit(0) + + +# Parse the command line options +ops, args = getopt.getopt(sys.argv[1:], 'a:s:f:c:p:d') +options = dict(ops) + +if '-a' not in options or '-s' not in options or '-f' not in options: + help() + +action = options['-a'] +service = options['-s'] +config_file = options['-f'] +config_ctx = options.get('-c', 'opensrf') +pid_dir = options.get('-p', '/tmp') +as_daemon = '-d' in options +pidfile = "%s/osrf_py_%s.pid" % (pid_dir, service) + + +if action == 'start': + + # connect to the OpenSRF network + osrf.system.System.net_connect( + config_file = config_file, config_context = config_ctx) + + # XXX load the settings configs... + osrf.app.Application.load(service, 'osrf.apps.example') # XXX example only for now + osrf.app.Application.register_sysmethods() + osrf.app.Application.application.global_init() + + controller = osrf.server.Controller(service) + controller.max_requests = 10 + controller.max_children = 6 + controller.min_children = 3 + + if as_daemon: + osrf.system.System.daemonize() + file = open(pidfile, 'w') + file.write(str(os.getpid())) + file.close() + + controller.run() + +elif action == 'stop': + file = open(pidfile) + pid = file.read() + file.close() + os.kill(int(pid), signal.SIGTERM) + os.remove(pidfile) + + diff --git a/src/python/osrf/app.py b/src/python/osrf/app.py new file mode 100644 index 0000000..721535f --- /dev/null +++ b/src/python/osrf/app.py @@ -0,0 +1,173 @@ +# ----------------------------------------------------------------------- +# Copyright (C) 2008 Equinox Software, Inc. +# Bill Erickson +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA +# ----------------------------------------------------------------------- + +import time +import osrf.log, osrf.ses, osrf.json + + +class Method(object): + def __init__(self, **kwargs): + self.name = kwargs['api_name'] + self.handler = kwargs['method'] + self.stream = kwargs.get('stream', False) + self.argc = kwargs.get('argc', 0) + self.atomic = kwargs.get('atomic', False) + + def get_func(self): + ''' Returns the function handler reference ''' + return getattr(Application.application, self.handler) + + def get_doc(self): + ''' Returns the function documentation ''' + return self.get_func().func_doc + + + +class Application(object): + ''' Base class for OpenSRF applications. Provides static methods + for loading and registering applications as well as common + applicatoin methods. ''' + + # global application handle + application = None + name = None + methods = {} + + def __init__(self): + ''' Sets the application name and loads the application methods ''' + self.name = None + + def global_init(self): + ''' Override this method to run code at application startup ''' + pass + + def child_init(self): + ''' Override this method to run code at child startup. + This is useful for initializing database connections or + initializing other persistent resources ''' + pass + + def child_exit(self): + ''' Override this method to run code at process exit time. + This is useful for cleaning up resources like databaes + handles, etc. ''' + pass + + + @staticmethod + def load(name, module_name): + ''' Loads the provided application module ''' + Application.name = name + try: + osrf.log.log_info("Loading application module %s" % module_name) + exec('import %s' % module_name) + except Exception, e: + osrf.log.log_error("Error importing application module %s:%s" % ( + module_name, unicode(e))) + + @staticmethod + def register_app(app): + ''' Registers an application for use ''' + app.name = Application.name + Application.application = app + + @staticmethod + def register_method(**kwargs): + Application.methods[kwargs['api_name']] = Method(**kwargs) + if kwargs.get('stream'): + kwargs['atomic'] = 1 + kwargs['api_name'] += '.atomic' + Application.methods[kwargs['api_name']] = Method(**kwargs) + + + @staticmethod + def handle_request(session, osrf_msg): + ''' Find the handler, construct the server request, then run the method ''' + + req_method = osrf_msg.payload() + params = req_method.params() + method = Application.methods[req_method.method()] + handler = method.get_func() + + param_json = osrf.json.to_json(params) + param_json = param_json[1:len(param_json)-1] + + osrf.log.log_info("CALL: %s %s %s" % (session.service, method.name, param_json)) + server_req = osrf.ses.ServerRequest(session, osrf_msg.threadTrace(), method, params) + + result = None + try: + result = handler(server_req, *params) + except Exception, e: + osrf.log.log_error("Error running method %s %s %s" % (method.name, param_json, unicode(e))) + session.send_status( + osrf_msg.threadTrace(), + osrf.net_obj.NetworkObject.osrfMethodException({ + 'status' : unicode(e), + 'statusCode': osrf.const.OSRF_STATUS_INTERNALSERVERERROR + }) + ) + return + + server_req.respond_complete(result) + + @staticmethod + def register_sysmethods(): + ''' Registers the global system methods ''' + + Application.register_method( + api_name = 'opensrf.system.time', + method = 'sysmethod_time', + argc = 0, + ) + + Application.register_method( + api_name = 'opensrf.system.introspect', + method = 'sysmethod_introspect', + argc = 0, + stream = True + ) + + + def sysmethod_time(self, request): + '''@return type:number The current epoch time ''' + return time.time() + + def sysmethod_introspect(self, request, prefix=None): + ''' Generates a list of methods with method metadata + @param type:string The limiting method name prefix. If defined, + only methods matching the given prefix will be returned. + @return type:array List of method information ''' + + for name, method in self.methods.iteritems(): + if prefix is not None and prefix != name[:len(prefix)]: + continue + + request.respond({ + 'api_name' : name, + 'method' : method.handler, + 'service' : self.name, + 'argc' : method.argc, + 'params' : [], # XXX parse me + 'desc' : method.get_doc() # XXX parse me + + }) + + diff --git a/src/python/osrf/apps/__init__.py b/src/python/osrf/apps/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/src/python/osrf/apps/example.py b/src/python/osrf/apps/example.py new file mode 100644 index 0000000..337b5ec --- /dev/null +++ b/src/python/osrf/apps/example.py @@ -0,0 +1,54 @@ +# ----------------------------------------------------------------------- +# Copyright (C) 2008 Equinox Software, Inc. +# Bill Erickson +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# +# You should have received a copy of the GNU General Public License +# along with this program; if not, write to the Free Software +# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA +# 02110-1301, USA +# ----------------------------------------------------------------------- +import os +import osrf.log +from osrf.app import Application + +class Example(Application): + ''' Example OpenSRF application. ''' + + Application.register_method( + api_name = 'opensrf.py-example.reverse', + method = 'reverse', + argc = 1, + stream = True + ) + + def reverse(self, request, message=''): + ''' Returns the given string in reverse order one character at a time + @param string Message to reverse + @return string The reversed message, one character at a time. ''' + idx = len(message) - 1 + while idx >= 0: + request.respond(message[idx]) + idx -= 1 + + def global_init(self): + osrf.log.log_debug("Running global init handler for %s" % __name__) + + def child_init(self): + osrf.log.log_debug("Running child init handler for process %d" % os.getpid()) + + def child_exit(self): + osrf.log.log_debug("Running child exit handler for process %d" % os.getpid()) + +Application.register_app(Example()) + + diff --git a/src/python/osrf/conf.py b/src/python/osrf/conf.py index 8bb00fa..6caa293 100644 --- a/src/python/osrf/conf.py +++ b/src/python/osrf/conf.py @@ -34,7 +34,7 @@ class Config(object): self.data = osrf.xml_obj.xml_file_to_object(self.file) Config.config = self - def getValue(self, key, idx=None): + def get_value(self, key, idx=None): if self.context: if re.search('/', key): key = "%s/%s" % (self.context, key) @@ -54,7 +54,7 @@ def get(key, idx=None): e.g. "domains.domain", "username" idx -- Optional array index if the searched value is an array member """ - return Config.config.getValue(key, idx) + return Config.config.get_value(key, idx) def get_no_ex(key, idx=None): @@ -66,7 +66,7 @@ def get_no_ex(key, idx=None): idx -- Optional array index if the searched value is an array member """ try: - return Config.config.getValue(key, idx) + return Config.config.get_value(key, idx) except: return None diff --git a/src/python/osrf/json.py b/src/python/osrf/json.py index 1508126..95babf6 100644 --- a/src/python/osrf/json.py +++ b/src/python/osrf/json.py @@ -41,24 +41,23 @@ def encode_object(obj): newobj[k] = encode_object(v) return newobj - else: - if isinstance(obj, list): - return [encode_object(v) for v in obj] - - else: - if isinstance(obj, NetworkObject): - reg = obj.get_registry() - data = obj.get_data() - if reg.protocol == 'array': - objarray = [] - for key in reg.keys: - objarray.append(data.get(key)) - data = objarray - - return { - OSRF_JSON_CLASS_KEY: reg.hint, - OSRF_JSON_PAYLOAD_KEY: encode_object(data) - } + elif isinstance(obj, list): + return [encode_object(v) for v in obj] + + elif isinstance(obj, NetworkObject): + reg = obj.get_registry() + data = obj.get_data() + + if reg.protocol == 'array': + objarray = [] + for key in reg.keys: + objarray.append(data.get(key)) + data = objarray + + return { + OSRF_JSON_CLASS_KEY: reg.hint, + OSRF_JSON_PAYLOAD_KEY: encode_object(data) + } return obj diff --git a/src/python/osrf/net.py b/src/python/osrf/net.py index 0fd1f95..a48bdaa 100644 --- a/src/python/osrf/net.py +++ b/src/python/osrf/net.py @@ -64,6 +64,7 @@ class NetworkMessage(object): self.thread = message.get_thread() self.recipient = message.get_to() self.router_command = None + self.router_class = None if message.xmlnode.hasProp('router_from') and \ message.xmlnode.prop('router_from') != '': self.sender = message.xmlnode.prop('router_from') @@ -75,10 +76,11 @@ class NetworkMessage(object): self.body = args.get('body') self.thread = args.get('thread') self.router_command = args.get('router_command') + self.router_class = args.get('router_class') @staticmethod def from_xml(xml): - doc=libxml2.parseDoc(xml) + doc = libxml2.parseDoc(xml) msg = Message(doc.getRootElement()) return NetworkMessage(msg) @@ -90,6 +92,8 @@ class NetworkMessage(object): self.body, self.thread) if self.router_command: msg.xmlnode.newProp('router_command', self.router_command) + if self.router_class: + msg.xmlnode.newProp('router_class', self.router_class) return msg def to_xml(self): diff --git a/src/python/osrf/server.py b/src/python/osrf/server.py index 630b7dd..f94cff5 100644 --- a/src/python/osrf/server.py +++ b/src/python/osrf/server.py @@ -19,7 +19,7 @@ # ----------------------------------------------------------------------- import os, sys, threading, logging, fcntl, socket, errno, signal, time -import osrf.log, osrf.net, osrf.system, osrf.stack +import osrf.log, osrf.conf, osrf.net, osrf.system, osrf.stack, osrf.app # used to define the size of the PID/size leader in @@ -32,7 +32,8 @@ class Controller(object): ''' def __init__(self, service): - self.service = service + self.service = service # service name + self.application = None self.max_requests = 0 # max child requests self.max_children = 0 # max num of child processes self.min_childen = 0 # min num of child processes @@ -40,15 +41,22 @@ class Controller(object): self.child_idx = 0 # current index into the children array self.children = [] # list of children self.osrf_handle = None # xmpp handle + self.routers = [] # list of registered routers # Global status socketpair. All children relay their # availability info to the parent through this socketpair. self.read_status, self.write_status = socket.socketpair() + def load_app(self): + settings = osrf.set.get('activeapps.%s' % self.service) + def cleanup(self): ''' Closes management sockets, kills children, reaps children, exits ''' + osrf.log.log_info("Shutting down...") + self.cleanup_routers() + self.read_status.shutdown(socket.SHUT_RDWR) self.write_status.shutdown(socket.SHUT_RDWR) self.read_status.close() @@ -85,7 +93,11 @@ class Controller(object): # clear the recv callback so inbound messages do not filter through the opensrf stack self.osrf_handle.receive_callback = None + # connect to our listening routers + self.register_routers() + try: + osrf.log.log_debug("entering main server loop...") while True: # main server loop self.reap_children() @@ -216,6 +228,52 @@ class Controller(object): child.run() os._exit(0) + def register_routers(self): + ''' Registers this application instance with all configured routers ''' + routers = osrf.conf.get('routers.router') + + if not isinstance(routers, list): + routers = [routers] + + for router in routers: + if isinstance(router, dict): + if not 'services' in router or \ + self.service in router['services']['service']: + target = "%s@%s/router" % (router['name'], router['domain']) + self.register_router(target) + else: + router_name = osrf.conf.get('router_name') + target = "%s@%s/router" % (router_name, router) + self.register_router(target) + + + def register_router(self, target): + ''' Registers with a single router ''' + osrf.log.log_info("registering with router %s" % target) + self.routers.append(target) + + reg_msg = osrf.net.NetworkMessage( + recipient = target, + body = 'registering...', + router_command = 'register', + router_class = self.service + ) + + self.osrf_handle.send(reg_msg) + + def cleanup_routers(self): + ''' Un-registers with all connected routers ''' + for target in self.routers: + osrf.log.log_info("un-registering with router %s" % target) + unreg_msg = osrf.net.NetworkMessage( + recipient = target, + body = 'un-registering...', + router_command = 'unregister', + router_class = self.service + ) + self.osrf_handle.send(unreg_msg) + + class Child(object): ''' Models a single child process ''' @@ -232,7 +290,7 @@ class Child(object): ''' Loops, processing data, until max_requests is reached ''' while True: try: - size = int(self.read_data.recv(SIZE_PAD)) + size = int(self.read_data.recv(SIZE_PAD) or 0) data = self.read_data.recv(size) osrf.log.log_internal("recv'd data " + data) osrf.stack.push(osrf.net.NetworkMessage.from_xml(data)) @@ -242,6 +300,8 @@ class Child(object): self.send_status() except KeyboardInterrupt: pass + # run the exit handler + osrf.app.Application.application.child_exit() def send_status(self): ''' Informs the controller that we are done processing this request ''' @@ -255,4 +315,5 @@ class Child(object): ''' Connects the opensrf xmpp handle ''' osrf.net.clear_network_handle() osrf.system.System.net_connect(resource = '%s_drone' % self.controller.service) - + osrf.app.Application.application.child_init() + diff --git a/src/python/osrf/ses.py b/src/python/osrf/ses.py index 011b143..60c8b27 100644 --- a/src/python/osrf/ses.py +++ b/src/python/osrf/ses.py @@ -13,15 +13,11 @@ # GNU General Public License for more details. # ----------------------------------------------------------------------- -import osrf.json -import osrf.conf -import osrf.log -import osrf.net -import osrf.net_obj +import osrf.json, osrf.conf, osrf.log, osrf.net, osrf.net_obj, osrf.const from osrf.const import OSRF_APP_SESSION_CONNECTED, \ OSRF_APP_SESSION_CONNECTING, OSRF_APP_SESSION_DISCONNECTED, \ OSRF_MESSAGE_TYPE_CONNECT, OSRF_MESSAGE_TYPE_DISCONNECT, \ - OSRF_MESSAGE_TYPE_REQUEST + OSRF_MESSAGE_TYPE_REQUEST, OSRF_MESSAGE_TYPE_RESULT, OSRF_MESSAGE_TYPE_STATUS import osrf.ex import random, os, time, threading @@ -47,10 +43,18 @@ class Session(object): self.state = OSRF_APP_SESSION_DISCONNECTED self.remote_id = None self.locale = None + self.thread = None + self.service = None - def find_session(threadTrace): - return Session.session_cache.get(threadTrace) - find_session = staticmethod(find_session) + @staticmethod + def find_or_create(thread): + if thread in Session.session_cache: + return Session.session_cache[thread] + return ServerSession(thread) + + def set_remote_id(self, remoteid): + self.remote_id = remoteid + osrf.log.log_internal("Setting request remote ID to %s" % self.remote_id) def wait(self, timeout=120): """Wait up to seconds for data to arrive on the network""" @@ -58,11 +62,14 @@ class Session(object): handle = osrf.net.get_network_handle() handle.recv(timeout) - def send(self, omessage): + def send(self, omessages): """Sends an OpenSRF message""" + if not isinstance(omessages, list): + omessages = [omessages] + net_msg = osrf.net.NetworkMessage( recipient = self.remote_id, - body = osrf.json.to_json([omessage]), + body = osrf.json.to_json(omessages), thread = self.thread, locale = self.locale, ) @@ -82,7 +89,7 @@ class ClientSession(Session): # call superclass constructor Session.__init__(self) - # the remote service we want to make requests of + # the service we are sending requests to self.service = service # the locale we want requests to be returned in @@ -128,7 +135,7 @@ class ClientSession(Session): self.reset_remote_id() osrf.log.log_debug("Sending request %s -> %s " % (self.service, method)) - req = Request(self, self.next_id, method, arr, self.locale) + req = ClientRequest(self, self.next_id, method, arr, self.locale) self.requests[str(self.next_id)] = req self.next_id += 1 req.send() @@ -178,9 +185,6 @@ class ClientSession(Session): self.state = OSRF_APP_SESSION_DISCONNECTED - def set_remote_id(self, remoteid): - self.remote_id = remoteid - osrf.log.log_internal("Setting request remote ID to %s" % self.remote_id) def reset_remote_id(self): """Recovers the original remote id""" @@ -220,23 +224,27 @@ class ClientSession(Session): class Request(object): - """Represents a single OpenSRF request. - A request is made and any resulting respones are - collected for the client.""" - def __init__(self, session, rid, method=None, params=[], locale='en-US'): - self.session = session # my session handle self.rid = rid # my unique request ID self.method = method # method name self.params = params # my method params + self.locale = locale + self.complete = False # is this request done? + self.complete_time = 0 # time at which the request was completed + + +class ClientRequest(Request): + """Represents a single OpenSRF request. + A request is made and any resulting respones are + collected for the client.""" + + def __init__(self, session, rid, method=None, params=[], locale='en-US'): + Request.__init__(self, session, rid, method, params, locale) self.queue = [] # response queue self.reset_timeout = False # resets the recv timeout? - self.complete = False # has the server told us this request is done? self.send_time = 0 # local time the request was put on the wire - self.complete_time = 0 # time the server told us the request was completed self.first_response_time = 0 # time it took for our first reponse to be received - self.locale = locale def send(self): """Sends a request message""" @@ -268,14 +276,17 @@ class Request(object): orig_timeout = timeout while not self.complete and (timeout >= 0 or orig_timeout < 0) and len(self.queue) == 0: + s = time.time() self.session.wait(timeout) - if orig_timeout >= 0: - timeout -= time.time() - s + if self.reset_timeout: self.reset_timeout = False timeout = orig_timeout + elif orig_timeout >= 0: + timeout -= time.time() - s + now = time.time() # ----------------------------------------------------------------- @@ -317,11 +328,100 @@ class Request(object): class ServerSession(Session): """Implements a server-side session""" - pass + def __init__(self, thread): + Session.__init__(self) + self.thread = thread + + def send_status(self, thread_trace, payload): + self.send( + osrf.net_obj.NetworkObject.osrfMessage( + { 'threadTrace' : thread_trace, + 'type' : osrf.const.OSRF_MESSAGE_TYPE_STATUS, + 'payload' : payload, + 'locale' : self.locale + } + ) + ) + + def send_connect_ok(self, thread_trace): + status_msg = osrf.net_obj.NetworkObject.osrfConnectStatus({ + 'status' : 'Connection Successful', + 'statusCode': osrf.const.OSRF_STATUS_OK + }) + self.send_status(thread_trace, status_msg) +class ServerRequest(Request): + def __init__(self, session, rid, method, params=[]): + Request.__init__(self, session, rid, method, params, session.locale) + self.response_list = [] + + def _build_response_msg(self, data): + result = osrf.net_obj.NetworkObject.osrfResult({ + 'content' : data, + 'statusCode' : osrf.const.OSRF_STATUS_OK, + 'status' : 'OK' + }) + + return osrf.net_obj.NetworkObject.osrfMessage({ + 'threadTrace' : self.rid, + 'type' : OSRF_MESSAGE_TYPE_RESULT, + 'payload' : result, + 'locale' : self.locale + }) + + def _build_complete_msg(self): + + status = osrf.net_obj.NetworkObject.osrfConnectStatus({ + 'threadTrace' : self.rid, + 'status' : 'Request Complete', + 'statusCode': osrf.const.OSRF_STATUS_COMPLETE + }) + + return osrf.net_obj.NetworkObject.osrfMessage({ + 'threadTrace' : self.rid, + 'type' : OSRF_MESSAGE_TYPE_STATUS, + 'payload' : status, + 'locale' : self.locale + }) + + def respond(self, data): + ''' For non-atomic calls, this sends a response directly back + to the client. For atomic calls, this pushes the response + onto the response list ''' + osrf.log.log_internal("responding with %s" % str(data)) + if self.method.atomic: + self.response_list.append(data) + else: + self.session.send(self._build_response_msg(data)) + + def respond_complete(self, data): + ''' Sends a complete message accompanied by the final result if applicable ''' + + if self.complete: + return + self.copmlete = True + self.complete_time = time.time() + + if self.method.atomic: + if data is not None: + self.response_list.append(data) + self.session.send([ + self._build_response_msg(self.response_list), + self._build_complete_msg(), + ]) + + elif data is not None: + self.session.send([ + self._build_response_msg(data), + self._build_complete_msg(), + ]) + + else: + self.session.send(self._build_complete_msg()) + class MultiSession(object): ''' Manages multiple requests. With the current implementation, a 1 second @@ -347,7 +447,7 @@ class MultiSession(object): cont.id = len(self.reqs) self.reqs.append(cont) - def recv(self, timeout=10): + def recv(self, timeout=120): ''' Returns a tuple of req_id, response ''' duration = 0 block_time = 1 diff --git a/src/python/osrf/stack.py b/src/python/osrf/stack.py index 6bba0bb..3426ada 100644 --- a/src/python/osrf/stack.py +++ b/src/python/osrf/stack.py @@ -13,31 +13,19 @@ # GNU General Public License for more details. # ----------------------------------------------------------------------- -import osrf.json -import osrf.log -import osrf.ex -import osrf.ses -from osrf.const import OSRF_APP_SESSION_CONNECTED, \ - OSRF_APP_SESSION_DISCONNECTED, OSRF_MESSAGE_TYPE_RESULT, \ - OSRF_MESSAGE_TYPE_STATUS, OSRF_STATUS_COMPLETE, OSRF_STATUS_CONTINUE, \ - OSRF_STATUS_NOTFOUND, OSRF_STATUS_OK, OSRF_STATUS_TIMEOUT import time - +import osrf.json, osrf.log, osrf.ex, osrf.ses, osrf.const, osrf.app def push(net_msg): - ses = osrf.ses.Session.find_session(net_msg.thread) - - if not ses: - # This is an incoming request from a client, create a new server session - osrf.log.log_error("server-side sessions don't exist yet") - return + ses = osrf.ses.Session.find_or_create(net_msg.thread) ses.set_remote_id(net_msg.sender) + if not ses.service: + ses.service = osrf.app.Application.name omessages = osrf.json.to_object(net_msg.body) - osrf.log.log_internal("push(): received %d messages" \ - % len(omessages)) + osrf.log.log_internal("stack.push(): received %d messages" % len(omessages)) # Pass each bundled opensrf message to the message handler start = time.time() @@ -54,49 +42,80 @@ def handle_message(session, message): "type %s" % message.type()) if isinstance(session, osrf.ses.ClientSession): + handle_client(session, message) + else: + handle_server(session, message) + + +def handle_client(session, message): + + if message.type() == osrf.const.OSRF_MESSAGE_TYPE_RESULT: + session.push_response_queue(message) + return + + if message.type() == osrf.const.OSRF_MESSAGE_TYPE_STATUS: + + status_code = int(message.payload().statusCode()) + status_text = message.payload().status() + osrf.log.log_internal("handle_message(): processing STATUS, " + "status_code = %d" % status_code) - if message.type() == OSRF_MESSAGE_TYPE_RESULT: - session.push_response_queue(message) + if status_code == osrf.const.OSRF_STATUS_COMPLETE: + # The server has informed us that this request is complete + req = session.find_request(message.threadTrace()) + if req: + osrf.log.log_internal("marking request as complete: %d" % req.rid) + req.set_complete() return - if message.type() == OSRF_MESSAGE_TYPE_STATUS: + if status_code == osrf.const.OSRF_STATUS_OK: + # We have connected successfully + osrf.log.log_debug("Successfully connected to " + session.service) + session.state = OSRF_APP_SESSION_CONNECTED + return - status_code = int(message.payload().statusCode()) - status_text = message.payload().status() - osrf.log.log_internal("handle_message(): processing STATUS, " - "status_code = %d" % status_code) + if status_code == osrf.const.OSRF_STATUS_CONTINUE: + # server is telling us to reset our wait timeout and keep waiting for a response + session.reset_request_timeout(message.threadTrace()) + return - if status_code == OSRF_STATUS_COMPLETE: - # The server has informed us that this request is complete - req = session.find_request(message.threadTrace()) - if req: - osrf.log.log_internal("marking request as complete: %d" % req.rid) - req.set_complete() - return + if status_code == osrf.const.OSRF_STATUS_TIMEOUT: + osrf.log.log_debug("The server did not receive a request from us in time...") + session.state = OSRF_APP_SESSION_DISCONNECTED + return - if status_code == OSRF_STATUS_OK: - # We have connected successfully - osrf.log.log_debug("Successfully connected to " + session.service) - session.state = OSRF_APP_SESSION_CONNECTED - return + if status_code == osrf.const.OSRF_STATUS_NOTFOUND: + osrf.log.log_error("Requested method was not found on the server: %s" % status_text) + session.state = OSRF_APP_SESSION_DISCONNECTED + raise osrf.ex.OSRFServiceException(status_text) - if status_code == OSRF_STATUS_CONTINUE: - # server is telling us to reset our wait timeout and keep waiting for a response - session.reset_request_timeout(message.threadTrace()) - return + if status_code == osrf.const.OSRF_STATUS_INTERNALSERVERERROR: + raise osrf.ex.OSRFServiceException("Server error %d : %s" % (status_code, status_text)) - if status_code == OSRF_STATUS_TIMEOUT: - osrf.log.log_debug("The server did not receive a request from us in time...") - session.state = OSRF_APP_SESSION_DISCONNECTED - return + raise osrf.ex.OSRFProtocolException("Unknown message status: %d" % status_code) - if status_code == OSRF_STATUS_NOTFOUND: - osrf.log.log_error("Requested method was not found on the server: %s" % status_text) - session.state = OSRF_APP_SESSION_DISCONNECTED - raise osrf.ex.OSRFServiceException(status_text) - raise osrf.ex.OSRFProtocolException("Unknown message status: %d" % status_code) +def handle_server(session, message): + if message.type() == osrf.const.OSRF_MESSAGE_TYPE_REQUEST: + osrf.log.log_debug("server received REQUEST from %s" % session.remote_id) + osrf.app.Application.handle_request(session, message) + return + if message.type() == osrf.const.OSRF_MESSAGE_TYPE_CONNECT: + osrf.log.log_debug("server received CONNECT from %s" % session.remote_id) + session.state == osrf.const.OSRF_APP_SESSION_CONNECTED + session.send_connect_ok(message.threadTrace()) + return + + if message.type() == osrf.const.OSRF_MESSAGE_TYPE_DISCONNECT: + osrf.log.log_debug("server received DISCONNECT from %s" % session.remote_id) + session.state = osrf.const.OSRF_APP_SESSION_DISCONNECTED + return + + if message.type() == osrf.const.OSRF_MESSAGE_TYPE_STATUS: + # Should never get here + osrf.log.log_warn("server received STATUS from %s" % session.remote_id) + return -- 2.11.0