implemented the majority of server-side python. still need to add settings server...
authorerickson <erickson@9efc2488-bf62-4759-914b-345cdb29e865>
Sat, 29 Mar 2008 23:55:34 +0000 (23:55 +0000)
committererickson <erickson@9efc2488-bf62-4759-914b-345cdb29e865>
Sat, 29 Mar 2008 23:55:34 +0000 (23:55 +0000)
git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@1292 9efc2488-bf62-4759-914b-345cdb29e865

src/python/opensrf.py [new file with mode: 0755]
src/python/osrf/app.py [new file with mode: 0644]
src/python/osrf/apps/__init__.py [new file with mode: 0644]
src/python/osrf/apps/example.py [new file with mode: 0644]
src/python/osrf/conf.py
src/python/osrf/json.py
src/python/osrf/net.py
src/python/osrf/server.py
src/python/osrf/ses.py
src/python/osrf/stack.py

diff --git a/src/python/opensrf.py b/src/python/opensrf.py
new file mode 100755 (executable)
index 0000000..cd77bf3
--- /dev/null
@@ -0,0 +1,100 @@
+#!/usr/bin/python
+# -----------------------------------------------------------------------
+# Copyright (C) 2008  Equinox Software, Inc.
+# Bill Erickson <erickson@esilibrary.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA
+# -----------------------------------------------------------------------
+
+import sys, getopt, os, signal
+import osrf.system, osrf.server, osrf.app
+
+def help():
+    print '''
+    Manage one or more an OpenSRF applications
+
+    Options:
+        -a <action>
+            start   -- Start a service
+            stop    -- stop a service
+            restart -- restart a service
+
+        -s <service>
+            The service name
+
+        -f <config file>
+            The OpenSRF config file
+
+        -c <config context>
+            The OpenSRF config file context
+
+        -p <PID dir>
+            The location of application PID files.  Default is /tmp
+
+        -d 
+            If set, run in daemon (background) mode
+    '''
+    sys.exit(0)
+
+
+# Parse the command line options
+ops, args = getopt.getopt(sys.argv[1:], 'a:s:f:c:p:d')
+options = dict(ops)
+
+if '-a' not in options or '-s' not in options or '-f' not in options:
+    help()
+
+action = options['-a']
+service = options['-s']
+config_file = options['-f']
+config_ctx = options.get('-c', 'opensrf')
+pid_dir = options.get('-p', '/tmp')
+as_daemon = '-d' in options
+pidfile = "%s/osrf_py_%s.pid" % (pid_dir, service)
+
+
+if action == 'start':
+
+    # connect to the OpenSRF network
+    osrf.system.System.net_connect(
+        config_file = config_file, config_context = config_ctx)
+
+    # XXX load the settings configs...
+    osrf.app.Application.load(service, 'osrf.apps.example') # XXX example only for now
+    osrf.app.Application.register_sysmethods()
+    osrf.app.Application.application.global_init()
+
+    controller = osrf.server.Controller(service)
+    controller.max_requests = 10
+    controller.max_children = 6
+    controller.min_children = 3
+
+    if as_daemon:
+        osrf.system.System.daemonize()
+        file = open(pidfile, 'w')
+        file.write(str(os.getpid()))
+        file.close()
+
+    controller.run()
+
+elif action == 'stop':
+    file = open(pidfile)
+    pid = file.read()
+    file.close()
+    os.kill(int(pid), signal.SIGTERM)
+    os.remove(pidfile)
+
+
diff --git a/src/python/osrf/app.py b/src/python/osrf/app.py
new file mode 100644 (file)
index 0000000..721535f
--- /dev/null
@@ -0,0 +1,173 @@
+# -----------------------------------------------------------------------
+# Copyright (C) 2008  Equinox Software, Inc.
+# Bill Erickson <erickson@esilibrary.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  
+# 02110-1301, USA
+# -----------------------------------------------------------------------
+
+import time
+import osrf.log, osrf.ses, osrf.json
+
+
+class Method(object):
+    def __init__(self, **kwargs):
+        self.name = kwargs['api_name']
+        self.handler = kwargs['method']
+        self.stream = kwargs.get('stream', False)
+        self.argc = kwargs.get('argc', 0)
+        self.atomic = kwargs.get('atomic', False)
+
+    def get_func(self):
+        ''' Returns the function handler reference '''
+        return getattr(Application.application, self.handler)
+
+    def get_doc(self):
+        ''' Returns the function documentation '''
+        return self.get_func().func_doc
+        
+
+
+class Application(object):
+    ''' Base class for OpenSRF applications.  Provides static methods
+        for loading and registering applications as well as common 
+        applicatoin methods. '''
+
+    # global application handle
+    application = None
+    name = None
+    methods = {}
+
+    def __init__(self):
+        ''' Sets the application name and loads the application methods '''
+        self.name = None
+
+    def global_init(self):
+        ''' Override this method to run code at application startup '''
+        pass
+
+    def child_init(self):
+        ''' Override this method to run code at child startup.
+            This is useful for initializing database connections or
+            initializing other persistent resources '''
+        pass
+
+    def child_exit(self):
+        ''' Override this method to run code at process exit time.
+            This is useful for cleaning up resources like databaes 
+            handles, etc. '''
+        pass
+
+
+    @staticmethod
+    def load(name, module_name):
+        ''' Loads the provided application module '''
+        Application.name = name
+        try:
+            osrf.log.log_info("Loading application module %s" % module_name)
+            exec('import %s' % module_name)
+        except Exception, e:
+            osrf.log.log_error("Error importing application module %s:%s" % (
+                module_name, unicode(e)))
+
+    @staticmethod
+    def register_app(app):
+        ''' Registers an application for use '''
+        app.name = Application.name
+        Application.application = app
+
+    @staticmethod
+    def register_method(**kwargs):
+        Application.methods[kwargs['api_name']] = Method(**kwargs)
+        if kwargs.get('stream'):
+            kwargs['atomic'] = 1 
+            kwargs['api_name'] +=  '.atomic'
+            Application.methods[kwargs['api_name']] = Method(**kwargs)
+
+
+    @staticmethod
+    def handle_request(session, osrf_msg):
+        ''' Find the handler, construct the server request, then run the method '''
+
+        req_method = osrf_msg.payload()
+        params = req_method.params()
+        method = Application.methods[req_method.method()]
+        handler = method.get_func()
+
+        param_json = osrf.json.to_json(params)
+        param_json = param_json[1:len(param_json)-1]
+
+        osrf.log.log_info("CALL: %s %s %s" % (session.service, method.name, param_json))
+        server_req = osrf.ses.ServerRequest(session, osrf_msg.threadTrace(), method, params)
+
+        result = None
+        try:
+            result = handler(server_req, *params)
+        except Exception, e:
+            osrf.log.log_error("Error running method %s %s %s" % (method.name, param_json, unicode(e)))
+            session.send_status(
+                osrf_msg.threadTrace(),
+                osrf.net_obj.NetworkObject.osrfMethodException({   
+                    'status' : unicode(e),
+                    'statusCode': osrf.const.OSRF_STATUS_INTERNALSERVERERROR
+                })
+            )
+            return
+
+        server_req.respond_complete(result)
+
+    @staticmethod
+    def register_sysmethods():
+        ''' Registers the global system methods '''
+
+        Application.register_method(
+            api_name = 'opensrf.system.time',
+            method = 'sysmethod_time',
+            argc = 0,
+        )
+        
+        Application.register_method(
+            api_name = 'opensrf.system.introspect',
+            method = 'sysmethod_introspect',
+            argc = 0,
+            stream = True
+        )
+
+
+    def sysmethod_time(self, request):
+        '''@return type:number The current epoch time '''
+        return time.time()
+
+    def sysmethod_introspect(self, request, prefix=None):
+        ''' Generates a list of methods with method metadata 
+            @param type:string The limiting method name prefix.  If defined,
+            only methods matching the given prefix will be returned.
+            @return type:array List of method information '''
+
+        for name, method in self.methods.iteritems():
+            if prefix is not None and prefix != name[:len(prefix)]:
+                continue
+
+            request.respond({
+                'api_name' : name,
+                'method' : method.handler,
+                'service' : self.name,
+                'argc' : method.argc,
+                'params' : [], # XXX parse me
+                'desc' : method.get_doc() # XXX parse me
+
+            })
+
+
diff --git a/src/python/osrf/apps/__init__.py b/src/python/osrf/apps/__init__.py
new file mode 100644 (file)
index 0000000..e69de29
diff --git a/src/python/osrf/apps/example.py b/src/python/osrf/apps/example.py
new file mode 100644 (file)
index 0000000..337b5ec
--- /dev/null
@@ -0,0 +1,54 @@
+# -----------------------------------------------------------------------
+# Copyright (C) 2008  Equinox Software, Inc.
+# Bill Erickson <erickson@esilibrary.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA  
+# 02110-1301, USA
+# -----------------------------------------------------------------------
+import os
+import osrf.log
+from osrf.app import Application
+
+class Example(Application):
+    ''' Example OpenSRF application. '''
+
+    Application.register_method(
+        api_name = 'opensrf.py-example.reverse',
+        method = 'reverse',
+        argc = 1,
+        stream = True
+    )
+
+    def reverse(self, request, message=''):
+        ''' Returns the given string in reverse order one character at a time
+            @param string Message to reverse 
+            @return string The reversed message, one character at a time.  '''
+        idx = len(message) - 1
+        while idx >= 0:
+            request.respond(message[idx])
+            idx -= 1
+
+    def global_init(self):
+        osrf.log.log_debug("Running global init handler for %s" % __name__)
+
+    def child_init(self):
+        osrf.log.log_debug("Running child init handler for process %d" % os.getpid())
+
+    def child_exit(self):
+        osrf.log.log_debug("Running child exit handler for process %d" % os.getpid())
+
+Application.register_app(Example())
+
+
index 8bb00fa..6caa293 100644 (file)
@@ -34,7 +34,7 @@ class Config(object):
         self.data = osrf.xml_obj.xml_file_to_object(self.file)
         Config.config = self
     
-    def getValue(self, key, idx=None):
+    def get_value(self, key, idx=None):
         if self.context:
             if re.search('/', key):
                 key = "%s/%s" % (self.context, key)
@@ -54,7 +54,7 @@ def get(key, idx=None):
         e.g.  "domains.domain", "username"
     idx -- Optional array index if the searched value is an array member
     """
-    return Config.config.getValue(key, idx)
+    return Config.config.get_value(key, idx)
                 
 
 def get_no_ex(key, idx=None):
@@ -66,7 +66,7 @@ def get_no_ex(key, idx=None):
     idx -- Optional array index if the searched value is an array member
     """
     try:
-        return Config.config.getValue(key, idx)
+        return Config.config.get_value(key, idx)
     except:
         return None
 
index 1508126..95babf6 100644 (file)
@@ -41,24 +41,23 @@ def encode_object(obj):
             newobj[k] = encode_object(v)
         return newobj
 
-    else:
-        if isinstance(obj, list):
-            return [encode_object(v) for v in obj]
-
-        else:
-            if isinstance(obj, NetworkObject):
-                reg = obj.get_registry()
-                data = obj.get_data()
-                if reg.protocol == 'array':
-                    objarray = []
-                    for key in reg.keys:
-                        objarray.append(data.get(key)) 
-                    data = objarray
-
-                return {
-                    OSRF_JSON_CLASS_KEY: reg.hint,
-                    OSRF_JSON_PAYLOAD_KEY: encode_object(data)
-                }
+    elif isinstance(obj, list):
+        return [encode_object(v) for v in obj]
+
+    elif isinstance(obj, NetworkObject):
+        reg = obj.get_registry()
+        data = obj.get_data()
+
+        if reg.protocol == 'array':
+            objarray = []
+            for key in reg.keys:
+                objarray.append(data.get(key)) 
+            data = objarray
+
+        return {
+            OSRF_JSON_CLASS_KEY: reg.hint,
+            OSRF_JSON_PAYLOAD_KEY: encode_object(data)
+        }
 
     return obj
         
index 0fd1f95..a48bdaa 100644 (file)
@@ -64,6 +64,7 @@ class NetworkMessage(object):
             self.thread = message.get_thread()
             self.recipient = message.get_to()
             self.router_command = None
+            self.router_class = None
             if message.xmlnode.hasProp('router_from') and \
                 message.xmlnode.prop('router_from') != '':
                 self.sender = message.xmlnode.prop('router_from')
@@ -75,10 +76,11 @@ class NetworkMessage(object):
             self.body = args.get('body')
             self.thread = args.get('thread')
             self.router_command = args.get('router_command')
+            self.router_class = args.get('router_class')
 
     @staticmethod
     def from_xml(xml):
-        doc=libxml2.parseDoc(xml)
+        doc = libxml2.parseDoc(xml)
         msg = Message(doc.getRootElement())
         return NetworkMessage(msg)
         
@@ -90,6 +92,8 @@ class NetworkMessage(object):
             self.body, self.thread)
         if self.router_command:
             msg.xmlnode.newProp('router_command', self.router_command)
+        if self.router_class:
+            msg.xmlnode.newProp('router_class', self.router_class)
         return msg
 
     def to_xml(self):
index 630b7dd..f94cff5 100644 (file)
@@ -19,7 +19,7 @@
 # -----------------------------------------------------------------------
 
 import os, sys, threading, logging, fcntl, socket, errno, signal, time
-import osrf.log, osrf.net, osrf.system, osrf.stack
+import osrf.log, osrf.conf, osrf.net, osrf.system, osrf.stack, osrf.app
 
 
 # used to define the size of the PID/size leader in 
@@ -32,7 +32,8 @@ class Controller(object):
     '''
 
     def __init__(self, service):
-        self.service = service
+        self.service = service # service name
+        self.application = None
         self.max_requests = 0 # max child requests
         self.max_children = 0 # max num of child processes
         self.min_childen = 0 # min num of child processes
@@ -40,15 +41,22 @@ class Controller(object):
         self.child_idx = 0 # current index into the children array
         self.children = [] # list of children
         self.osrf_handle = None # xmpp handle
+        self.routers = [] # list of registered routers
 
         # Global status socketpair.  All children relay their 
         # availability info to the parent through this socketpair. 
         self.read_status, self.write_status = socket.socketpair()
 
+    def load_app(self):
+        settings = osrf.set.get('activeapps.%s' % self.service)
+        
 
     def cleanup(self):
         ''' Closes management sockets, kills children, reaps children, exits '''
 
+        osrf.log.log_info("Shutting down...")
+        self.cleanup_routers()
+
         self.read_status.shutdown(socket.SHUT_RDWR)
         self.write_status.shutdown(socket.SHUT_RDWR)
         self.read_status.close()
@@ -85,7 +93,11 @@ class Controller(object):
         # clear the recv callback so inbound messages do not filter through the opensrf stack
         self.osrf_handle.receive_callback = None
 
+        # connect to our listening routers
+        self.register_routers()
+
         try:
+            osrf.log.log_debug("entering main server loop...")
             while True: # main server loop
 
                 self.reap_children()
@@ -216,6 +228,52 @@ class Controller(object):
             child.run()
             os._exit(0)
 
+    def register_routers(self):
+        ''' Registers this application instance with all configured routers '''
+        routers = osrf.conf.get('routers.router')
+
+        if not isinstance(routers, list):
+            routers = [routers]
+
+        for router in routers:
+            if isinstance(router, dict):
+                if not 'services' in router or \
+                        self.service in router['services']['service']:
+                    target = "%s@%s/router" % (router['name'], router['domain'])
+                    self.register_router(target)
+            else:
+                router_name = osrf.conf.get('router_name')
+                target = "%s@%s/router" % (router_name, router)
+                self.register_router(target)
+
+
+    def register_router(self, target):
+        ''' Registers with a single router '''
+        osrf.log.log_info("registering with router %s" % target)
+        self.routers.append(target)
+
+        reg_msg = osrf.net.NetworkMessage(
+            recipient = target,
+            body = 'registering...',
+            router_command = 'register',
+            router_class = self.service
+        )
+
+        self.osrf_handle.send(reg_msg)
+
+    def cleanup_routers(self):
+        ''' Un-registers with all connected routers '''
+        for target in self.routers:
+            osrf.log.log_info("un-registering with router %s" % target)
+            unreg_msg = osrf.net.NetworkMessage(
+                recipient = target,
+                body = 'un-registering...',
+                router_command = 'unregister',
+                router_class = self.service
+            )
+            self.osrf_handle.send(unreg_msg)
+        
+
 class Child(object):
     ''' Models a single child process '''
 
@@ -232,7 +290,7 @@ class Child(object):
         ''' Loops, processing data, until max_requests is reached '''
         while True:
             try:
-                size = int(self.read_data.recv(SIZE_PAD))
+                size = int(self.read_data.recv(SIZE_PAD) or 0)
                 data = self.read_data.recv(size)
                 osrf.log.log_internal("recv'd data " + data)
                 osrf.stack.push(osrf.net.NetworkMessage.from_xml(data))
@@ -242,6 +300,8 @@ class Child(object):
                 self.send_status()
             except KeyboardInterrupt:
                 pass
+        # run the exit handler
+        osrf.app.Application.application.child_exit()
 
     def send_status(self):
         ''' Informs the controller that we are done processing this request '''
@@ -255,4 +315,5 @@ class Child(object):
         ''' Connects the opensrf xmpp handle '''
         osrf.net.clear_network_handle()
         osrf.system.System.net_connect(resource = '%s_drone' % self.controller.service)
-            
+        osrf.app.Application.application.child_init()
+
index 011b143..60c8b27 100644 (file)
 # GNU General Public License for more details.
 # -----------------------------------------------------------------------
 
-import osrf.json
-import osrf.conf
-import osrf.log
-import osrf.net
-import osrf.net_obj
+import osrf.json, osrf.conf, osrf.log, osrf.net, osrf.net_obj, osrf.const
 from osrf.const import OSRF_APP_SESSION_CONNECTED, \
     OSRF_APP_SESSION_CONNECTING, OSRF_APP_SESSION_DISCONNECTED, \
     OSRF_MESSAGE_TYPE_CONNECT, OSRF_MESSAGE_TYPE_DISCONNECT, \
-    OSRF_MESSAGE_TYPE_REQUEST
+    OSRF_MESSAGE_TYPE_REQUEST, OSRF_MESSAGE_TYPE_RESULT, OSRF_MESSAGE_TYPE_STATUS
 import osrf.ex
 import random, os, time, threading
 
@@ -47,10 +43,18 @@ class Session(object):
         self.state = OSRF_APP_SESSION_DISCONNECTED
         self.remote_id = None
         self.locale = None
+        self.thread = None
+        self.service = None
 
-    def find_session(threadTrace):
-        return Session.session_cache.get(threadTrace)
-    find_session = staticmethod(find_session)
+    @staticmethod
+    def find_or_create(thread):
+        if thread in Session.session_cache:
+            return Session.session_cache[thread]
+        return ServerSession(thread)
+
+    def set_remote_id(self, remoteid):
+        self.remote_id = remoteid
+        osrf.log.log_internal("Setting request remote ID to %s" % self.remote_id)
 
     def wait(self, timeout=120):
         """Wait up to <timeout> seconds for data to arrive on the network"""
@@ -58,11 +62,14 @@ class Session(object):
         handle = osrf.net.get_network_handle()
         handle.recv(timeout)
 
-    def send(self, omessage):
+    def send(self, omessages):
         """Sends an OpenSRF message"""
+        if not isinstance(omessages, list):
+            omessages = [omessages]
+            
         net_msg = osrf.net.NetworkMessage(
             recipient      = self.remote_id,
-            body    = osrf.json.to_json([omessage]),
+            body    = osrf.json.to_json(omessages),
             thread = self.thread,
             locale = self.locale,
         )
@@ -82,7 +89,7 @@ class ClientSession(Session):
         # call superclass constructor
         Session.__init__(self)
 
-        # the remote service we want to make requests of
+        # the service we are sending requests to
         self.service = service
 
         # the locale we want requests to be returned in
@@ -128,7 +135,7 @@ class ClientSession(Session):
             self.reset_remote_id()
 
         osrf.log.log_debug("Sending request %s -> %s " % (self.service, method))
-        req = Request(self, self.next_id, method, arr, self.locale)
+        req = ClientRequest(self, self.next_id, method, arr, self.locale)
         self.requests[str(self.next_id)] = req
         self.next_id += 1
         req.send()
@@ -178,9 +185,6 @@ class ClientSession(Session):
         self.state = OSRF_APP_SESSION_DISCONNECTED
 
     
-    def set_remote_id(self, remoteid):
-        self.remote_id = remoteid
-        osrf.log.log_internal("Setting request remote ID to %s" % self.remote_id)
 
     def reset_remote_id(self):
         """Recovers the original remote id"""
@@ -220,23 +224,27 @@ class ClientSession(Session):
 
 
 class Request(object):
-    """Represents a single OpenSRF request.
-        A request is made and any resulting respones are 
-        collected for the client."""
-
     def __init__(self, session, rid, method=None, params=[], locale='en-US'):
-
         self.session = session # my session handle
         self.rid     = rid # my unique request ID
         self.method = method # method name
         self.params = params # my method params
+        self.locale = locale
+        self.complete = False # is this request done?
+        self.complete_time =  0 # time at which the request was completed
+
+
+class ClientRequest(Request):
+    """Represents a single OpenSRF request.
+        A request is made and any resulting respones are 
+        collected for the client."""
+
+    def __init__(self, session, rid, method=None, params=[], locale='en-US'):
+        Request.__init__(self, session, rid, method, params, locale)
         self.queue  = [] # response queue
         self.reset_timeout = False # resets the recv timeout?
-        self.complete = False # has the server told us this request is done?
         self.send_time = 0 # local time the request was put on the wire
-        self.complete_time =  0 # time the server told us the request was completed
         self.first_response_time = 0 # time it took for our first reponse to be received
-        self.locale = locale
 
     def send(self):
         """Sends a request message"""
@@ -268,14 +276,17 @@ class Request(object):
 
         orig_timeout = timeout
         while not self.complete and (timeout >= 0 or orig_timeout < 0) and len(self.queue) == 0:
+
             s = time.time()
             self.session.wait(timeout)
-            if orig_timeout >= 0:
-                timeout -= time.time() - s
+
             if self.reset_timeout:
                 self.reset_timeout = False
                 timeout = orig_timeout
 
+            elif orig_timeout >= 0:
+                timeout -= time.time() - s
+
         now = time.time()
 
         # -----------------------------------------------------------------
@@ -317,11 +328,100 @@ class Request(object):
 
 class ServerSession(Session):
     """Implements a server-side session"""
-    pass
 
+    def __init__(self, thread):
+        Session.__init__(self)
+        self.thread = thread
+
+    def send_status(self, thread_trace, payload):
+        self.send(
+            osrf.net_obj.NetworkObject.osrfMessage( 
+                {   'threadTrace' : thread_trace,
+                    'type' : osrf.const.OSRF_MESSAGE_TYPE_STATUS,
+                    'payload' : payload,
+                    'locale' : self.locale
+                } 
+            )
+        )
+
+    def send_connect_ok(self, thread_trace):
+        status_msg = osrf.net_obj.NetworkObject.osrfConnectStatus({   
+            'status' : 'Connection Successful',
+            'statusCode': osrf.const.OSRF_STATUS_OK
+        })
+        self.send_status(thread_trace, status_msg)
 
 
+class ServerRequest(Request):
 
+    def __init__(self, session, rid, method, params=[]):
+        Request.__init__(self, session, rid, method, params, session.locale)
+        self.response_list = []
+
+    def _build_response_msg(self, data):
+        result = osrf.net_obj.NetworkObject.osrfResult({
+            'content' :  data,
+            'statusCode' : osrf.const.OSRF_STATUS_OK,
+            'status' : 'OK'
+        })
+
+        return osrf.net_obj.NetworkObject.osrfMessage({
+            'threadTrace' : self.rid,
+            'type' : OSRF_MESSAGE_TYPE_RESULT,
+            'payload' : result,
+            'locale' : self.locale
+        })
+
+    def _build_complete_msg(self):
+
+        status = osrf.net_obj.NetworkObject.osrfConnectStatus({   
+            'threadTrace' : self.rid,
+            'status' : 'Request Complete',
+            'statusCode': osrf.const.OSRF_STATUS_COMPLETE
+        })
+
+        return osrf.net_obj.NetworkObject.osrfMessage({
+            'threadTrace' : self.rid,
+            'type' : OSRF_MESSAGE_TYPE_STATUS,
+            'payload' : status,
+            'locale' : self.locale
+        })
+
+    def respond(self, data):
+        ''' For non-atomic calls, this sends a response directly back
+            to the client.  For atomic calls, this pushes the response
+            onto the response list '''
+        osrf.log.log_internal("responding with %s" % str(data))
+        if self.method.atomic:
+            self.response_list.append(data)
+        else:
+            self.session.send(self._build_response_msg(data))
+
+    def respond_complete(self, data):
+        ''' Sends a complete message accompanied by the final result if applicable '''
+
+        if self.complete: 
+            return
+        self.copmlete = True
+        self.complete_time = time.time()
+
+        if self.method.atomic:
+            if data is not None:
+                self.response_list.append(data) 
+            self.session.send([
+                self._build_response_msg(self.response_list),
+                self._build_complete_msg(),
+            ])
+
+        elif data is not None:
+            self.session.send([
+                self._build_response_msg(data),
+                self._build_complete_msg(),
+            ])
+
+        else:
+            self.session.send(self._build_complete_msg())
+            
 
 class MultiSession(object):
     ''' Manages multiple requests.  With the current implementation, a 1 second 
@@ -347,7 +447,7 @@ class MultiSession(object):
         cont.id = len(self.reqs)
         self.reqs.append(cont)
 
-    def recv(self, timeout=10):
+    def recv(self, timeout=120):
         ''' Returns a tuple of req_id, response '''
         duration = 0
         block_time = 1
index 6bba0bb..3426ada 100644 (file)
 # GNU General Public License for more details.
 # -----------------------------------------------------------------------
 
-import osrf.json
-import osrf.log
-import osrf.ex
-import osrf.ses
-from osrf.const import OSRF_APP_SESSION_CONNECTED, \
-    OSRF_APP_SESSION_DISCONNECTED, OSRF_MESSAGE_TYPE_RESULT, \
-    OSRF_MESSAGE_TYPE_STATUS, OSRF_STATUS_COMPLETE, OSRF_STATUS_CONTINUE, \
-    OSRF_STATUS_NOTFOUND, OSRF_STATUS_OK, OSRF_STATUS_TIMEOUT
 import time
-
+import osrf.json, osrf.log, osrf.ex, osrf.ses, osrf.const, osrf.app
 
 def push(net_msg):
-    ses = osrf.ses.Session.find_session(net_msg.thread)
-
-    if not ses:
-        # This is an incoming request from a client, create a new server session
-        osrf.log.log_error("server-side sessions don't exist yet")
-        return
 
+    ses = osrf.ses.Session.find_or_create(net_msg.thread)
     ses.set_remote_id(net_msg.sender)
+    if not ses.service:
+        ses.service = osrf.app.Application.name
 
     omessages = osrf.json.to_object(net_msg.body)
 
-    osrf.log.log_internal("push(): received %d messages" \
-        % len(omessages))
+    osrf.log.log_internal("stack.push(): received %d messages" % len(omessages))
 
     # Pass each bundled opensrf message to the message handler
     start = time.time()
@@ -54,49 +42,80 @@ def handle_message(session, message):
         "type %s" % message.type())
 
     if isinstance(session, osrf.ses.ClientSession):
+        handle_client(session, message)
+    else:
+        handle_server(session, message)
+
+
+def handle_client(session, message):
+
+    if message.type() == osrf.const.OSRF_MESSAGE_TYPE_RESULT:
+        session.push_response_queue(message)
+        return
+
+    if message.type() == osrf.const.OSRF_MESSAGE_TYPE_STATUS:
+
+        status_code = int(message.payload().statusCode())
+        status_text = message.payload().status()
+        osrf.log.log_internal("handle_message(): processing STATUS, "
+            "status_code =  %d" % status_code)
 
-        if message.type() == OSRF_MESSAGE_TYPE_RESULT:
-            session.push_response_queue(message)
+        if status_code == osrf.const.OSRF_STATUS_COMPLETE:
+            # The server has informed us that this request is complete
+            req = session.find_request(message.threadTrace())
+            if req: 
+                osrf.log.log_internal("marking request as complete: %d" % req.rid)
+                req.set_complete()
             return
 
-        if message.type() == OSRF_MESSAGE_TYPE_STATUS:
+        if status_code == osrf.const.OSRF_STATUS_OK:
+            # We have connected successfully
+            osrf.log.log_debug("Successfully connected to " + session.service)
+            session.state = OSRF_APP_SESSION_CONNECTED
+            return
 
-            status_code = int(message.payload().statusCode())
-            status_text = message.payload().status()
-            osrf.log.log_internal("handle_message(): processing STATUS, "
-                "status_code =  %d" % status_code)
+        if status_code == osrf.const.OSRF_STATUS_CONTINUE:
+            # server is telling us to reset our wait timeout and keep waiting for a response
+            session.reset_request_timeout(message.threadTrace())
+            return
 
-            if status_code == OSRF_STATUS_COMPLETE:
-                # The server has informed us that this request is complete
-                req = session.find_request(message.threadTrace())
-                if req: 
-                    osrf.log.log_internal("marking request as complete: %d" % req.rid)
-                    req.set_complete()
-                return
+        if status_code == osrf.const.OSRF_STATUS_TIMEOUT:
+            osrf.log.log_debug("The server did not receive a request from us in time...")
+            session.state = OSRF_APP_SESSION_DISCONNECTED
+            return
 
-            if status_code == OSRF_STATUS_OK:
-                # We have connected successfully
-                osrf.log.log_debug("Successfully connected to " + session.service)
-                session.state = OSRF_APP_SESSION_CONNECTED
-                return
+        if status_code == osrf.const.OSRF_STATUS_NOTFOUND:
+            osrf.log.log_error("Requested method was not found on the server: %s" % status_text)
+            session.state = OSRF_APP_SESSION_DISCONNECTED
+            raise osrf.ex.OSRFServiceException(status_text)
 
-            if status_code == OSRF_STATUS_CONTINUE:
-                # server is telling us to reset our wait timeout and keep waiting for a response
-                session.reset_request_timeout(message.threadTrace())
-                return
+        if status_code == osrf.const.OSRF_STATUS_INTERNALSERVERERROR:
+            raise osrf.ex.OSRFServiceException("Server error %d : %s" % (status_code, status_text))
 
-            if status_code == OSRF_STATUS_TIMEOUT:
-                osrf.log.log_debug("The server did not receive a request from us in time...")
-                session.state = OSRF_APP_SESSION_DISCONNECTED
-                return
+        raise osrf.ex.OSRFProtocolException("Unknown message status: %d" % status_code)
 
-            if status_code == OSRF_STATUS_NOTFOUND:
-                osrf.log.log_error("Requested method was not found on the server: %s" % status_text)
-                session.state = OSRF_APP_SESSION_DISCONNECTED
-                raise osrf.ex.OSRFServiceException(status_text)
 
-            raise osrf.ex.OSRFProtocolException("Unknown message status: %d" % status_code)
+def handle_server(session, message):
 
+    if message.type() == osrf.const.OSRF_MESSAGE_TYPE_REQUEST:
+        osrf.log.log_debug("server received REQUEST from %s" % session.remote_id)
+        osrf.app.Application.handle_request(session, message)
+        return
 
+    if message.type() == osrf.const.OSRF_MESSAGE_TYPE_CONNECT:
+        osrf.log.log_debug("server received CONNECT from %s" % session.remote_id)
+        session.state == osrf.const.OSRF_APP_SESSION_CONNECTED 
+        session.send_connect_ok(message.threadTrace())
+        return
+
+    if message.type() == osrf.const.OSRF_MESSAGE_TYPE_DISCONNECT:
+        osrf.log.log_debug("server received DISCONNECT from %s" % session.remote_id)
+        session.state = osrf.const.OSRF_APP_SESSION_DISCONNECTED
+        return
+
+    if message.type() == osrf.const.OSRF_MESSAGE_TYPE_STATUS:
+        # Should never get here
+        osrf.log.log_warn("server received STATUS from %s" % session.remote_id)
+        return