--- /dev/null
+#!/usr/bin/python
+# -----------------------------------------------------------------------
+# Copyright (C) 2008 Equinox Software, Inc.
+# Bill Erickson <erickson@esilibrary.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA
+# -----------------------------------------------------------------------
+
+import sys, getopt, os, signal
+import osrf.system, osrf.server, osrf.app
+
+def help():
+ print '''
+ Manage one or more an OpenSRF applications
+
+ Options:
+ -a <action>
+ start -- Start a service
+ stop -- stop a service
+ restart -- restart a service
+
+ -s <service>
+ The service name
+
+ -f <config file>
+ The OpenSRF config file
+
+ -c <config context>
+ The OpenSRF config file context
+
+ -p <PID dir>
+ The location of application PID files. Default is /tmp
+
+ -d
+ If set, run in daemon (background) mode
+ '''
+ sys.exit(0)
+
+
+# Parse the command line options
+ops, args = getopt.getopt(sys.argv[1:], 'a:s:f:c:p:d')
+options = dict(ops)
+
+if '-a' not in options or '-s' not in options or '-f' not in options:
+ help()
+
+action = options['-a']
+service = options['-s']
+config_file = options['-f']
+config_ctx = options.get('-c', 'opensrf')
+pid_dir = options.get('-p', '/tmp')
+as_daemon = '-d' in options
+pidfile = "%s/osrf_py_%s.pid" % (pid_dir, service)
+
+
+if action == 'start':
+
+ # connect to the OpenSRF network
+ osrf.system.System.net_connect(
+ config_file = config_file, config_context = config_ctx)
+
+ # XXX load the settings configs...
+ osrf.app.Application.load(service, 'osrf.apps.example') # XXX example only for now
+ osrf.app.Application.register_sysmethods()
+ osrf.app.Application.application.global_init()
+
+ controller = osrf.server.Controller(service)
+ controller.max_requests = 10
+ controller.max_children = 6
+ controller.min_children = 3
+
+ if as_daemon:
+ osrf.system.System.daemonize()
+ file = open(pidfile, 'w')
+ file.write(str(os.getpid()))
+ file.close()
+
+ controller.run()
+
+elif action == 'stop':
+ file = open(pidfile)
+ pid = file.read()
+ file.close()
+ os.kill(int(pid), signal.SIGTERM)
+ os.remove(pidfile)
+
+
--- /dev/null
+# -----------------------------------------------------------------------
+# Copyright (C) 2008 Equinox Software, Inc.
+# Bill Erickson <erickson@esilibrary.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA
+# -----------------------------------------------------------------------
+
+import time
+import osrf.log, osrf.ses, osrf.json
+
+
+class Method(object):
+ def __init__(self, **kwargs):
+ self.name = kwargs['api_name']
+ self.handler = kwargs['method']
+ self.stream = kwargs.get('stream', False)
+ self.argc = kwargs.get('argc', 0)
+ self.atomic = kwargs.get('atomic', False)
+
+ def get_func(self):
+ ''' Returns the function handler reference '''
+ return getattr(Application.application, self.handler)
+
+ def get_doc(self):
+ ''' Returns the function documentation '''
+ return self.get_func().func_doc
+
+
+
+class Application(object):
+ ''' Base class for OpenSRF applications. Provides static methods
+ for loading and registering applications as well as common
+ applicatoin methods. '''
+
+ # global application handle
+ application = None
+ name = None
+ methods = {}
+
+ def __init__(self):
+ ''' Sets the application name and loads the application methods '''
+ self.name = None
+
+ def global_init(self):
+ ''' Override this method to run code at application startup '''
+ pass
+
+ def child_init(self):
+ ''' Override this method to run code at child startup.
+ This is useful for initializing database connections or
+ initializing other persistent resources '''
+ pass
+
+ def child_exit(self):
+ ''' Override this method to run code at process exit time.
+ This is useful for cleaning up resources like databaes
+ handles, etc. '''
+ pass
+
+
+ @staticmethod
+ def load(name, module_name):
+ ''' Loads the provided application module '''
+ Application.name = name
+ try:
+ osrf.log.log_info("Loading application module %s" % module_name)
+ exec('import %s' % module_name)
+ except Exception, e:
+ osrf.log.log_error("Error importing application module %s:%s" % (
+ module_name, unicode(e)))
+
+ @staticmethod
+ def register_app(app):
+ ''' Registers an application for use '''
+ app.name = Application.name
+ Application.application = app
+
+ @staticmethod
+ def register_method(**kwargs):
+ Application.methods[kwargs['api_name']] = Method(**kwargs)
+ if kwargs.get('stream'):
+ kwargs['atomic'] = 1
+ kwargs['api_name'] += '.atomic'
+ Application.methods[kwargs['api_name']] = Method(**kwargs)
+
+
+ @staticmethod
+ def handle_request(session, osrf_msg):
+ ''' Find the handler, construct the server request, then run the method '''
+
+ req_method = osrf_msg.payload()
+ params = req_method.params()
+ method = Application.methods[req_method.method()]
+ handler = method.get_func()
+
+ param_json = osrf.json.to_json(params)
+ param_json = param_json[1:len(param_json)-1]
+
+ osrf.log.log_info("CALL: %s %s %s" % (session.service, method.name, param_json))
+ server_req = osrf.ses.ServerRequest(session, osrf_msg.threadTrace(), method, params)
+
+ result = None
+ try:
+ result = handler(server_req, *params)
+ except Exception, e:
+ osrf.log.log_error("Error running method %s %s %s" % (method.name, param_json, unicode(e)))
+ session.send_status(
+ osrf_msg.threadTrace(),
+ osrf.net_obj.NetworkObject.osrfMethodException({
+ 'status' : unicode(e),
+ 'statusCode': osrf.const.OSRF_STATUS_INTERNALSERVERERROR
+ })
+ )
+ return
+
+ server_req.respond_complete(result)
+
+ @staticmethod
+ def register_sysmethods():
+ ''' Registers the global system methods '''
+
+ Application.register_method(
+ api_name = 'opensrf.system.time',
+ method = 'sysmethod_time',
+ argc = 0,
+ )
+
+ Application.register_method(
+ api_name = 'opensrf.system.introspect',
+ method = 'sysmethod_introspect',
+ argc = 0,
+ stream = True
+ )
+
+
+ def sysmethod_time(self, request):
+ '''@return type:number The current epoch time '''
+ return time.time()
+
+ def sysmethod_introspect(self, request, prefix=None):
+ ''' Generates a list of methods with method metadata
+ @param type:string The limiting method name prefix. If defined,
+ only methods matching the given prefix will be returned.
+ @return type:array List of method information '''
+
+ for name, method in self.methods.iteritems():
+ if prefix is not None and prefix != name[:len(prefix)]:
+ continue
+
+ request.respond({
+ 'api_name' : name,
+ 'method' : method.handler,
+ 'service' : self.name,
+ 'argc' : method.argc,
+ 'params' : [], # XXX parse me
+ 'desc' : method.get_doc() # XXX parse me
+
+ })
+
+
--- /dev/null
+# -----------------------------------------------------------------------
+# Copyright (C) 2008 Equinox Software, Inc.
+# Bill Erickson <erickson@esilibrary.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+#
+# You should have received a copy of the GNU General Public License
+# along with this program; if not, write to the Free Software
+# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
+# 02110-1301, USA
+# -----------------------------------------------------------------------
+import os
+import osrf.log
+from osrf.app import Application
+
+class Example(Application):
+ ''' Example OpenSRF application. '''
+
+ Application.register_method(
+ api_name = 'opensrf.py-example.reverse',
+ method = 'reverse',
+ argc = 1,
+ stream = True
+ )
+
+ def reverse(self, request, message=''):
+ ''' Returns the given string in reverse order one character at a time
+ @param string Message to reverse
+ @return string The reversed message, one character at a time. '''
+ idx = len(message) - 1
+ while idx >= 0:
+ request.respond(message[idx])
+ idx -= 1
+
+ def global_init(self):
+ osrf.log.log_debug("Running global init handler for %s" % __name__)
+
+ def child_init(self):
+ osrf.log.log_debug("Running child init handler for process %d" % os.getpid())
+
+ def child_exit(self):
+ osrf.log.log_debug("Running child exit handler for process %d" % os.getpid())
+
+Application.register_app(Example())
+
+
self.data = osrf.xml_obj.xml_file_to_object(self.file)
Config.config = self
- def getValue(self, key, idx=None):
+ def get_value(self, key, idx=None):
if self.context:
if re.search('/', key):
key = "%s/%s" % (self.context, key)
e.g. "domains.domain", "username"
idx -- Optional array index if the searched value is an array member
"""
- return Config.config.getValue(key, idx)
+ return Config.config.get_value(key, idx)
def get_no_ex(key, idx=None):
idx -- Optional array index if the searched value is an array member
"""
try:
- return Config.config.getValue(key, idx)
+ return Config.config.get_value(key, idx)
except:
return None
newobj[k] = encode_object(v)
return newobj
- else:
- if isinstance(obj, list):
- return [encode_object(v) for v in obj]
-
- else:
- if isinstance(obj, NetworkObject):
- reg = obj.get_registry()
- data = obj.get_data()
- if reg.protocol == 'array':
- objarray = []
- for key in reg.keys:
- objarray.append(data.get(key))
- data = objarray
-
- return {
- OSRF_JSON_CLASS_KEY: reg.hint,
- OSRF_JSON_PAYLOAD_KEY: encode_object(data)
- }
+ elif isinstance(obj, list):
+ return [encode_object(v) for v in obj]
+
+ elif isinstance(obj, NetworkObject):
+ reg = obj.get_registry()
+ data = obj.get_data()
+
+ if reg.protocol == 'array':
+ objarray = []
+ for key in reg.keys:
+ objarray.append(data.get(key))
+ data = objarray
+
+ return {
+ OSRF_JSON_CLASS_KEY: reg.hint,
+ OSRF_JSON_PAYLOAD_KEY: encode_object(data)
+ }
return obj
self.thread = message.get_thread()
self.recipient = message.get_to()
self.router_command = None
+ self.router_class = None
if message.xmlnode.hasProp('router_from') and \
message.xmlnode.prop('router_from') != '':
self.sender = message.xmlnode.prop('router_from')
self.body = args.get('body')
self.thread = args.get('thread')
self.router_command = args.get('router_command')
+ self.router_class = args.get('router_class')
@staticmethod
def from_xml(xml):
- doc=libxml2.parseDoc(xml)
+ doc = libxml2.parseDoc(xml)
msg = Message(doc.getRootElement())
return NetworkMessage(msg)
self.body, self.thread)
if self.router_command:
msg.xmlnode.newProp('router_command', self.router_command)
+ if self.router_class:
+ msg.xmlnode.newProp('router_class', self.router_class)
return msg
def to_xml(self):
# -----------------------------------------------------------------------
import os, sys, threading, logging, fcntl, socket, errno, signal, time
-import osrf.log, osrf.net, osrf.system, osrf.stack
+import osrf.log, osrf.conf, osrf.net, osrf.system, osrf.stack, osrf.app
# used to define the size of the PID/size leader in
'''
def __init__(self, service):
- self.service = service
+ self.service = service # service name
+ self.application = None
self.max_requests = 0 # max child requests
self.max_children = 0 # max num of child processes
self.min_childen = 0 # min num of child processes
self.child_idx = 0 # current index into the children array
self.children = [] # list of children
self.osrf_handle = None # xmpp handle
+ self.routers = [] # list of registered routers
# Global status socketpair. All children relay their
# availability info to the parent through this socketpair.
self.read_status, self.write_status = socket.socketpair()
+ def load_app(self):
+ settings = osrf.set.get('activeapps.%s' % self.service)
+
def cleanup(self):
''' Closes management sockets, kills children, reaps children, exits '''
+ osrf.log.log_info("Shutting down...")
+ self.cleanup_routers()
+
self.read_status.shutdown(socket.SHUT_RDWR)
self.write_status.shutdown(socket.SHUT_RDWR)
self.read_status.close()
# clear the recv callback so inbound messages do not filter through the opensrf stack
self.osrf_handle.receive_callback = None
+ # connect to our listening routers
+ self.register_routers()
+
try:
+ osrf.log.log_debug("entering main server loop...")
while True: # main server loop
self.reap_children()
child.run()
os._exit(0)
+ def register_routers(self):
+ ''' Registers this application instance with all configured routers '''
+ routers = osrf.conf.get('routers.router')
+
+ if not isinstance(routers, list):
+ routers = [routers]
+
+ for router in routers:
+ if isinstance(router, dict):
+ if not 'services' in router or \
+ self.service in router['services']['service']:
+ target = "%s@%s/router" % (router['name'], router['domain'])
+ self.register_router(target)
+ else:
+ router_name = osrf.conf.get('router_name')
+ target = "%s@%s/router" % (router_name, router)
+ self.register_router(target)
+
+
+ def register_router(self, target):
+ ''' Registers with a single router '''
+ osrf.log.log_info("registering with router %s" % target)
+ self.routers.append(target)
+
+ reg_msg = osrf.net.NetworkMessage(
+ recipient = target,
+ body = 'registering...',
+ router_command = 'register',
+ router_class = self.service
+ )
+
+ self.osrf_handle.send(reg_msg)
+
+ def cleanup_routers(self):
+ ''' Un-registers with all connected routers '''
+ for target in self.routers:
+ osrf.log.log_info("un-registering with router %s" % target)
+ unreg_msg = osrf.net.NetworkMessage(
+ recipient = target,
+ body = 'un-registering...',
+ router_command = 'unregister',
+ router_class = self.service
+ )
+ self.osrf_handle.send(unreg_msg)
+
+
class Child(object):
''' Models a single child process '''
''' Loops, processing data, until max_requests is reached '''
while True:
try:
- size = int(self.read_data.recv(SIZE_PAD))
+ size = int(self.read_data.recv(SIZE_PAD) or 0)
data = self.read_data.recv(size)
osrf.log.log_internal("recv'd data " + data)
osrf.stack.push(osrf.net.NetworkMessage.from_xml(data))
self.send_status()
except KeyboardInterrupt:
pass
+ # run the exit handler
+ osrf.app.Application.application.child_exit()
def send_status(self):
''' Informs the controller that we are done processing this request '''
''' Connects the opensrf xmpp handle '''
osrf.net.clear_network_handle()
osrf.system.System.net_connect(resource = '%s_drone' % self.controller.service)
-
+ osrf.app.Application.application.child_init()
+
# GNU General Public License for more details.
# -----------------------------------------------------------------------
-import osrf.json
-import osrf.conf
-import osrf.log
-import osrf.net
-import osrf.net_obj
+import osrf.json, osrf.conf, osrf.log, osrf.net, osrf.net_obj, osrf.const
from osrf.const import OSRF_APP_SESSION_CONNECTED, \
OSRF_APP_SESSION_CONNECTING, OSRF_APP_SESSION_DISCONNECTED, \
OSRF_MESSAGE_TYPE_CONNECT, OSRF_MESSAGE_TYPE_DISCONNECT, \
- OSRF_MESSAGE_TYPE_REQUEST
+ OSRF_MESSAGE_TYPE_REQUEST, OSRF_MESSAGE_TYPE_RESULT, OSRF_MESSAGE_TYPE_STATUS
import osrf.ex
import random, os, time, threading
self.state = OSRF_APP_SESSION_DISCONNECTED
self.remote_id = None
self.locale = None
+ self.thread = None
+ self.service = None
- def find_session(threadTrace):
- return Session.session_cache.get(threadTrace)
- find_session = staticmethod(find_session)
+ @staticmethod
+ def find_or_create(thread):
+ if thread in Session.session_cache:
+ return Session.session_cache[thread]
+ return ServerSession(thread)
+
+ def set_remote_id(self, remoteid):
+ self.remote_id = remoteid
+ osrf.log.log_internal("Setting request remote ID to %s" % self.remote_id)
def wait(self, timeout=120):
"""Wait up to <timeout> seconds for data to arrive on the network"""
handle = osrf.net.get_network_handle()
handle.recv(timeout)
- def send(self, omessage):
+ def send(self, omessages):
"""Sends an OpenSRF message"""
+ if not isinstance(omessages, list):
+ omessages = [omessages]
+
net_msg = osrf.net.NetworkMessage(
recipient = self.remote_id,
- body = osrf.json.to_json([omessage]),
+ body = osrf.json.to_json(omessages),
thread = self.thread,
locale = self.locale,
)
# call superclass constructor
Session.__init__(self)
- # the remote service we want to make requests of
+ # the service we are sending requests to
self.service = service
# the locale we want requests to be returned in
self.reset_remote_id()
osrf.log.log_debug("Sending request %s -> %s " % (self.service, method))
- req = Request(self, self.next_id, method, arr, self.locale)
+ req = ClientRequest(self, self.next_id, method, arr, self.locale)
self.requests[str(self.next_id)] = req
self.next_id += 1
req.send()
self.state = OSRF_APP_SESSION_DISCONNECTED
- def set_remote_id(self, remoteid):
- self.remote_id = remoteid
- osrf.log.log_internal("Setting request remote ID to %s" % self.remote_id)
def reset_remote_id(self):
"""Recovers the original remote id"""
class Request(object):
- """Represents a single OpenSRF request.
- A request is made and any resulting respones are
- collected for the client."""
-
def __init__(self, session, rid, method=None, params=[], locale='en-US'):
-
self.session = session # my session handle
self.rid = rid # my unique request ID
self.method = method # method name
self.params = params # my method params
+ self.locale = locale
+ self.complete = False # is this request done?
+ self.complete_time = 0 # time at which the request was completed
+
+
+class ClientRequest(Request):
+ """Represents a single OpenSRF request.
+ A request is made and any resulting respones are
+ collected for the client."""
+
+ def __init__(self, session, rid, method=None, params=[], locale='en-US'):
+ Request.__init__(self, session, rid, method, params, locale)
self.queue = [] # response queue
self.reset_timeout = False # resets the recv timeout?
- self.complete = False # has the server told us this request is done?
self.send_time = 0 # local time the request was put on the wire
- self.complete_time = 0 # time the server told us the request was completed
self.first_response_time = 0 # time it took for our first reponse to be received
- self.locale = locale
def send(self):
"""Sends a request message"""
orig_timeout = timeout
while not self.complete and (timeout >= 0 or orig_timeout < 0) and len(self.queue) == 0:
+
s = time.time()
self.session.wait(timeout)
- if orig_timeout >= 0:
- timeout -= time.time() - s
+
if self.reset_timeout:
self.reset_timeout = False
timeout = orig_timeout
+ elif orig_timeout >= 0:
+ timeout -= time.time() - s
+
now = time.time()
# -----------------------------------------------------------------
class ServerSession(Session):
"""Implements a server-side session"""
- pass
+ def __init__(self, thread):
+ Session.__init__(self)
+ self.thread = thread
+
+ def send_status(self, thread_trace, payload):
+ self.send(
+ osrf.net_obj.NetworkObject.osrfMessage(
+ { 'threadTrace' : thread_trace,
+ 'type' : osrf.const.OSRF_MESSAGE_TYPE_STATUS,
+ 'payload' : payload,
+ 'locale' : self.locale
+ }
+ )
+ )
+
+ def send_connect_ok(self, thread_trace):
+ status_msg = osrf.net_obj.NetworkObject.osrfConnectStatus({
+ 'status' : 'Connection Successful',
+ 'statusCode': osrf.const.OSRF_STATUS_OK
+ })
+ self.send_status(thread_trace, status_msg)
+class ServerRequest(Request):
+ def __init__(self, session, rid, method, params=[]):
+ Request.__init__(self, session, rid, method, params, session.locale)
+ self.response_list = []
+
+ def _build_response_msg(self, data):
+ result = osrf.net_obj.NetworkObject.osrfResult({
+ 'content' : data,
+ 'statusCode' : osrf.const.OSRF_STATUS_OK,
+ 'status' : 'OK'
+ })
+
+ return osrf.net_obj.NetworkObject.osrfMessage({
+ 'threadTrace' : self.rid,
+ 'type' : OSRF_MESSAGE_TYPE_RESULT,
+ 'payload' : result,
+ 'locale' : self.locale
+ })
+
+ def _build_complete_msg(self):
+
+ status = osrf.net_obj.NetworkObject.osrfConnectStatus({
+ 'threadTrace' : self.rid,
+ 'status' : 'Request Complete',
+ 'statusCode': osrf.const.OSRF_STATUS_COMPLETE
+ })
+
+ return osrf.net_obj.NetworkObject.osrfMessage({
+ 'threadTrace' : self.rid,
+ 'type' : OSRF_MESSAGE_TYPE_STATUS,
+ 'payload' : status,
+ 'locale' : self.locale
+ })
+
+ def respond(self, data):
+ ''' For non-atomic calls, this sends a response directly back
+ to the client. For atomic calls, this pushes the response
+ onto the response list '''
+ osrf.log.log_internal("responding with %s" % str(data))
+ if self.method.atomic:
+ self.response_list.append(data)
+ else:
+ self.session.send(self._build_response_msg(data))
+
+ def respond_complete(self, data):
+ ''' Sends a complete message accompanied by the final result if applicable '''
+
+ if self.complete:
+ return
+ self.copmlete = True
+ self.complete_time = time.time()
+
+ if self.method.atomic:
+ if data is not None:
+ self.response_list.append(data)
+ self.session.send([
+ self._build_response_msg(self.response_list),
+ self._build_complete_msg(),
+ ])
+
+ elif data is not None:
+ self.session.send([
+ self._build_response_msg(data),
+ self._build_complete_msg(),
+ ])
+
+ else:
+ self.session.send(self._build_complete_msg())
+
class MultiSession(object):
''' Manages multiple requests. With the current implementation, a 1 second
cont.id = len(self.reqs)
self.reqs.append(cont)
- def recv(self, timeout=10):
+ def recv(self, timeout=120):
''' Returns a tuple of req_id, response '''
duration = 0
block_time = 1
# GNU General Public License for more details.
# -----------------------------------------------------------------------
-import osrf.json
-import osrf.log
-import osrf.ex
-import osrf.ses
-from osrf.const import OSRF_APP_SESSION_CONNECTED, \
- OSRF_APP_SESSION_DISCONNECTED, OSRF_MESSAGE_TYPE_RESULT, \
- OSRF_MESSAGE_TYPE_STATUS, OSRF_STATUS_COMPLETE, OSRF_STATUS_CONTINUE, \
- OSRF_STATUS_NOTFOUND, OSRF_STATUS_OK, OSRF_STATUS_TIMEOUT
import time
-
+import osrf.json, osrf.log, osrf.ex, osrf.ses, osrf.const, osrf.app
def push(net_msg):
- ses = osrf.ses.Session.find_session(net_msg.thread)
-
- if not ses:
- # This is an incoming request from a client, create a new server session
- osrf.log.log_error("server-side sessions don't exist yet")
- return
+ ses = osrf.ses.Session.find_or_create(net_msg.thread)
ses.set_remote_id(net_msg.sender)
+ if not ses.service:
+ ses.service = osrf.app.Application.name
omessages = osrf.json.to_object(net_msg.body)
- osrf.log.log_internal("push(): received %d messages" \
- % len(omessages))
+ osrf.log.log_internal("stack.push(): received %d messages" % len(omessages))
# Pass each bundled opensrf message to the message handler
start = time.time()
"type %s" % message.type())
if isinstance(session, osrf.ses.ClientSession):
+ handle_client(session, message)
+ else:
+ handle_server(session, message)
+
+
+def handle_client(session, message):
+
+ if message.type() == osrf.const.OSRF_MESSAGE_TYPE_RESULT:
+ session.push_response_queue(message)
+ return
+
+ if message.type() == osrf.const.OSRF_MESSAGE_TYPE_STATUS:
+
+ status_code = int(message.payload().statusCode())
+ status_text = message.payload().status()
+ osrf.log.log_internal("handle_message(): processing STATUS, "
+ "status_code = %d" % status_code)
- if message.type() == OSRF_MESSAGE_TYPE_RESULT:
- session.push_response_queue(message)
+ if status_code == osrf.const.OSRF_STATUS_COMPLETE:
+ # The server has informed us that this request is complete
+ req = session.find_request(message.threadTrace())
+ if req:
+ osrf.log.log_internal("marking request as complete: %d" % req.rid)
+ req.set_complete()
return
- if message.type() == OSRF_MESSAGE_TYPE_STATUS:
+ if status_code == osrf.const.OSRF_STATUS_OK:
+ # We have connected successfully
+ osrf.log.log_debug("Successfully connected to " + session.service)
+ session.state = OSRF_APP_SESSION_CONNECTED
+ return
- status_code = int(message.payload().statusCode())
- status_text = message.payload().status()
- osrf.log.log_internal("handle_message(): processing STATUS, "
- "status_code = %d" % status_code)
+ if status_code == osrf.const.OSRF_STATUS_CONTINUE:
+ # server is telling us to reset our wait timeout and keep waiting for a response
+ session.reset_request_timeout(message.threadTrace())
+ return
- if status_code == OSRF_STATUS_COMPLETE:
- # The server has informed us that this request is complete
- req = session.find_request(message.threadTrace())
- if req:
- osrf.log.log_internal("marking request as complete: %d" % req.rid)
- req.set_complete()
- return
+ if status_code == osrf.const.OSRF_STATUS_TIMEOUT:
+ osrf.log.log_debug("The server did not receive a request from us in time...")
+ session.state = OSRF_APP_SESSION_DISCONNECTED
+ return
- if status_code == OSRF_STATUS_OK:
- # We have connected successfully
- osrf.log.log_debug("Successfully connected to " + session.service)
- session.state = OSRF_APP_SESSION_CONNECTED
- return
+ if status_code == osrf.const.OSRF_STATUS_NOTFOUND:
+ osrf.log.log_error("Requested method was not found on the server: %s" % status_text)
+ session.state = OSRF_APP_SESSION_DISCONNECTED
+ raise osrf.ex.OSRFServiceException(status_text)
- if status_code == OSRF_STATUS_CONTINUE:
- # server is telling us to reset our wait timeout and keep waiting for a response
- session.reset_request_timeout(message.threadTrace())
- return
+ if status_code == osrf.const.OSRF_STATUS_INTERNALSERVERERROR:
+ raise osrf.ex.OSRFServiceException("Server error %d : %s" % (status_code, status_text))
- if status_code == OSRF_STATUS_TIMEOUT:
- osrf.log.log_debug("The server did not receive a request from us in time...")
- session.state = OSRF_APP_SESSION_DISCONNECTED
- return
+ raise osrf.ex.OSRFProtocolException("Unknown message status: %d" % status_code)
- if status_code == OSRF_STATUS_NOTFOUND:
- osrf.log.log_error("Requested method was not found on the server: %s" % status_text)
- session.state = OSRF_APP_SESSION_DISCONNECTED
- raise osrf.ex.OSRFServiceException(status_text)
- raise osrf.ex.OSRFProtocolException("Unknown message status: %d" % status_code)
+def handle_server(session, message):
+ if message.type() == osrf.const.OSRF_MESSAGE_TYPE_REQUEST:
+ osrf.log.log_debug("server received REQUEST from %s" % session.remote_id)
+ osrf.app.Application.handle_request(session, message)
+ return
+ if message.type() == osrf.const.OSRF_MESSAGE_TYPE_CONNECT:
+ osrf.log.log_debug("server received CONNECT from %s" % session.remote_id)
+ session.state == osrf.const.OSRF_APP_SESSION_CONNECTED
+ session.send_connect_ok(message.threadTrace())
+ return
+
+ if message.type() == osrf.const.OSRF_MESSAGE_TYPE_DISCONNECT:
+ osrf.log.log_debug("server received DISCONNECT from %s" % session.remote_id)
+ session.state = osrf.const.OSRF_APP_SESSION_DISCONNECTED
+ return
+
+ if message.type() == osrf.const.OSRF_MESSAGE_TYPE_STATUS:
+ # Should never get here
+ osrf.log.log_warn("server received STATUS from %s" % session.remote_id)
+ return