From 2505765bbcf80cb35c553c976f99db2e15de9a26 Mon Sep 17 00:00:00 2001 From: erickson Date: Sun, 19 Aug 2007 01:16:02 +0000 Subject: [PATCH] added support for multi-threaded client interactions. much like the java lib, each thread is allowed 1 jabber connection, as opposed to 1 process-wide jabber connection. also did some re-tabbing to force 4-space tabs (not raw tabs) - more of those to come git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@1081 9efc2488-bf62-4759-914b-345cdb29e865 --- src/python/osrf/json.py | 2 +- src/python/osrf/log.py | 3 + src/python/osrf/net.py | 211 ++++++++++++++-------------- src/python/osrf/ses.py | 22 +-- src/python/osrf/stack.py | 4 +- src/python/osrf/system.py | 2 + src/python/srfsh.py | 342 +++++++++++++++++++++++----------------------- 7 files changed, 295 insertions(+), 291 deletions(-) diff --git a/src/python/osrf/json.py b/src/python/osrf/json.py index a972b09..98746e6 100644 --- a/src/python/osrf/json.py +++ b/src/python/osrf/json.py @@ -92,7 +92,7 @@ def osrfFormatJSON(json): for c in json: - if eatws: # simpljson adds a pesky after array and object items + if eatws: # simpljson adds a pesky space after array and object items if c == ' ': continue diff --git a/src/python/osrf/log.py b/src/python/osrf/log.py index d87dd5a..9ba9a12 100644 --- a/src/python/osrf/log.py +++ b/src/python/osrf/log.py @@ -68,6 +68,9 @@ def __osrfLog(level, msg): if level == OSRF_LOG_WARN: lvl = 'WARN'; slvl=syslog.LOG_WARNING if level == OSRF_LOG_ERR: lvl = 'ERR '; slvl=syslog.LOG_ERR + + # XXX when file logging is implemented, wrap io in a semaphore for thread safety + file = frgx.sub('',tb[0]) msg = '[%s:%d:%s:%s] %s' % (lvl, os.getpid(), file, tb[1], msg) syslog.syslog(slvl, msg) diff --git a/src/python/osrf/net.py b/src/python/osrf/net.py index db0b131..7db4fe7 100644 --- a/src/python/osrf/net.py +++ b/src/python/osrf/net.py @@ -19,129 +19,128 @@ from pyxmpp.message import Message from pyxmpp.jid import JID from socket import gethostname from osrf.log import * -import os, time +import os, time, threading import logging +threadSessions = {} + # - log jabber activity (for future reference) #logger=logging.getLogger() #logger.addHandler(logging.StreamHandler()) #logger.addHandler(logging.FileHandler('j.log')) #logger.setLevel(logging.DEBUG) -__network = None def osrfSetNetworkHandle(handle): - """Sets the global network connection handle.""" - global __network - __network = handle + """ Sets the thread-specific network handle""" + threadSessions[threading.currentThread().getName()] = handle def osrfGetNetworkHandle(): - """Returns the global network connection handle.""" - global __network - return __network + """ Returns the thread-specific network connection handle.""" + return threadSessions.get(threading.currentThread().getName()) class osrfNetworkMessage(object): - """Network message - - attributes: - - sender - message sender - to - message recipient - body - the body of the message - thread - the message thread - """ - - def __init__(self, message=None, **args): - if message: - self.body = message.get_body() - self.thread = message.get_thread() - self.to = message.get_to() - if message.xmlnode.hasProp('router_from') and message.xmlnode.prop('router_from') != '': - self.sender = message.xmlnode.prop('router_from') - else: self.sender = message.get_from().as_utf8() - else: - if args.has_key('sender'): self.sender = args['sender'] - if args.has_key('to'): self.to = args['to'] - if args.has_key('body'): self.body = args['body'] - if args.has_key('thread'): self.thread = args['thread'] + """Network message + + attributes: + + sender - message sender + to - message recipient + body - the body of the message + thread - the message thread + """ + + def __init__(self, message=None, **args): + if message: + self.body = message.get_body() + self.thread = message.get_thread() + self.to = message.get_to() + if message.xmlnode.hasProp('router_from') and message.xmlnode.prop('router_from') != '': + self.sender = message.xmlnode.prop('router_from') + else: self.sender = message.get_from().as_utf8() + else: + if args.has_key('sender'): self.sender = args['sender'] + if args.has_key('to'): self.to = args['to'] + if args.has_key('body'): self.body = args['body'] + if args.has_key('thread'): self.thread = args['thread'] class osrfNetwork(JabberClient): - def __init__(self, **args): - self.isconnected = False - - # Create a unique jabber resource - resource = 'osrf_client' - if args.has_key('resource'): - resource = args['resource'] - resource += '_' + gethostname()+':'+ str(os.getpid()) - self.jid = JID(args['username'], args['host'], resource) - - osrfLogDebug("initializing network with JID %s and host=%s, port=%s, username=%s" % - (self.jid.as_utf8(), args['host'], args['port'], args['username'])) - - #initialize the superclass - JabberClient.__init__(self, self.jid, args['password'], args['host']) - self.queue = [] - - def connect(self): - JabberClient.connect(self) - while not self.isconnected: - stream = self.get_stream() - act = stream.loop_iter(10) - if not act: self.idle() - - def setRecvCallback(self, func): - """The callback provided is called when a message is received. - - The only argument to the function is the received message. """ - self.recvCallback = func - - def session_started(self): - osrfLogInfo("Successfully connected to the opensrf network") - self.authenticated() - self.stream.set_message_handler("normal",self.message_received) - self.isconnected = True - - def send(self, message): - """Sends the provided network message.""" - osrfLogInternal("jabber sending to %s: %s" % (message.to, message.body)) - msg = Message(None, None, message.to, None, None, None, message.body, message.thread) - self.stream.send(msg) - - def message_received(self, stanza): - """Handler for received messages.""" - osrfLogInternal("jabber received a message of type %s" % stanza.get_type()) - if stanza.get_type()=="headline": - return True - # check for errors - osrfLogInternal("jabber received message from %s : %s" - % (stanza.get_from().as_utf8(), stanza.get_body())) - self.queue.append(osrfNetworkMessage(stanza)) - return True - - def recv(self, timeout=120): - """Attempts to receive a message from the network. - - timeout - max number of seconds to wait for a message. - If no message is received in 'timeout' seconds, None is returned. """ - - msg = None - if len(self.queue) == 0: - while timeout >= 0 and len(self.queue) == 0: - starttime = time.time() - osrfLogInternal("going into stream loop at " + str(starttime)) - act = self.get_stream().loop_iter(timeout) - endtime = time.time() - starttime - timeout -= endtime - osrfLogInternal("exiting stream loop after %s seconds" % str(endtime)) - osrfLogInternal("act = %s : queue length = %d" % (act, len(self.queue)) ) - if not act: self.idle() - - # if we've acquired a message, handle it - if len(self.queue) > 0: - self.recvCallback(self.queue.pop(0)) - return None + def __init__(self, **args): + self.isconnected = False + + # Create a unique jabber resource + resource = 'python_' + if args.has_key('resource'): + resource = args['resource'] + resource += '_' + gethostname()+':'+ str(os.getpid()) + '_'+ threading.currentThread().getName().lower() + self.jid = JID(args['username'], args['host'], resource) + + osrfLogDebug("initializing network with JID %s and host=%s, port=%s, username=%s" % + (self.jid.as_utf8(), args['host'], args['port'], args['username'])) + + #initialize the superclass + JabberClient.__init__(self, self.jid, args['password'], args['host']) + self.queue = [] + + def connect(self): + JabberClient.connect(self) + while not self.isconnected: + stream = self.get_stream() + act = stream.loop_iter(10) + if not act: self.idle() + + def setRecvCallback(self, func): + """The callback provided is called when a message is received. + + The only argument to the function is the received message. """ + self.recvCallback = func + + def session_started(self): + osrfLogInfo("Successfully connected to the opensrf network") + self.authenticated() + self.stream.set_message_handler("normal",self.message_received) + self.isconnected = True + + def send(self, message): + """Sends the provided network message.""" + osrfLogInternal("jabber sending to %s: %s" % (message.to, message.body)) + msg = Message(None, None, message.to, None, None, None, message.body, message.thread) + self.stream.send(msg) + + def message_received(self, stanza): + """Handler for received messages.""" + osrfLogInternal("jabber received a message of type %s" % stanza.get_type()) + if stanza.get_type()=="headline": + return True + # check for errors + osrfLogInternal("jabber received message from %s : %s" + % (stanza.get_from().as_utf8(), stanza.get_body())) + self.queue.append(osrfNetworkMessage(stanza)) + return True + + def recv(self, timeout=120): + """Attempts to receive a message from the network. + + timeout - max number of seconds to wait for a message. + If no message is received in 'timeout' seconds, None is returned. """ + + msg = None + if len(self.queue) == 0: + while timeout >= 0 and len(self.queue) == 0: + starttime = time.time() + osrfLogInternal("going into stream loop at " + str(starttime)) + act = self.get_stream().loop_iter(timeout) + endtime = time.time() - starttime + timeout -= endtime + osrfLogInternal("exiting stream loop after %s seconds" % str(endtime)) + osrfLogInternal("act = %s : queue length = %d" % (act, len(self.queue)) ) + if not act: self.idle() + + # if we've acquired a message, handle it + if len(self.queue) > 0: + self.recvCallback(self.queue.pop(0)) + return None diff --git a/src/python/osrf/ses.py b/src/python/osrf/ses.py index 462ae08..5b9c422 100644 --- a/src/python/osrf/ses.py +++ b/src/python/osrf/ses.py @@ -19,7 +19,7 @@ from osrf.conf import osrfConfigValue from osrf.net import osrfNetworkMessage, osrfGetNetworkHandle from osrf.log import * from osrf.const import * -import random, sys, os, time +import random, sys, os, time, threading # ----------------------------------------------------------------------- @@ -35,10 +35,16 @@ osrfNetworkRegisterHint('osrfMethodException', ['status', 'statusCode'], 'hash') class osrfSession(object): """Abstract session superclass.""" + ''' Global cache of in-service sessions ''' + sessionCache = {} + def __init__(self): # by default, we're connected to no one self.state = OSRF_APP_SESSION_DISCONNECTED + def findSession(threadTrace): + return osrfSession.sessionCache.get(threadTrace) + findSession = staticmethod(findSession) def wait(self, timeout=120): """Wait up to seconds for data to arrive on the network""" @@ -58,7 +64,7 @@ class osrfSession(object): def cleanup(self): """Removes the session from the global session cache.""" - del osrfClientSession.sessionCache[self.thread] + del osrfSession.sessionCache[self.thread] class osrfClientSession(osrfSession): """Client session object. Use this to make server requests.""" @@ -78,7 +84,8 @@ class osrfClientSession(osrfSession): self.origRemoteId = self.remoteId # generate a random message thread - self.thread = "%s%s%s" % (os.getpid(), str(random.randint(100,100000)), str(time.time())) + self.thread = "%s%s%s%s" % (os.getpid(), + str(random.randint(100,100000)), str(time.time()),threading.currentThread().getName().lower()) # how many requests this session has taken part in self.nextId = 0 @@ -87,7 +94,7 @@ class osrfClientSession(osrfSession): self.requests = {} # cache this session in the global session cache - osrfClientSession.sessionCache[self.thread] = self + osrfSession.sessionCache[self.thread] = self def resetRequestTimeout(self, rid): req = self.findRequest(rid) @@ -190,13 +197,6 @@ class osrfClientSession(osrfSession): -osrfSession.sessionCache = {} -def osrfFindSession(thread): - """Finds a session in the global cache.""" - try: - return osrfClientSession.sessionCache[thread] - except: return None - class osrfRequest(object): """Represents a single OpenSRF request. A request is made and any resulting respones are diff --git a/src/python/osrf/stack.py b/src/python/osrf/stack.py index 0bf4892..4002812 100644 --- a/src/python/osrf/stack.py +++ b/src/python/osrf/stack.py @@ -16,13 +16,13 @@ from osrf.json import * from osrf.log import * from osrf.ex import * -from osrf.ses import osrfFindSession, osrfClientSession, osrfServerSession +from osrf.ses import osrfSession, osrfClientSession, osrfServerSession from osrf.const import * from time import time def osrfPushStack(netMessage): - ses = osrfFindSession(netMessage.thread) + ses = osrfSession.findSession(netMessage.thread) if not ses: # This is an incoming request from a client, create a new server session diff --git a/src/python/osrf/system.py b/src/python/osrf/system.py index bfc5463..94ad907 100644 --- a/src/python/osrf/system.py +++ b/src/python/osrf/system.py @@ -49,3 +49,5 @@ def osrfConnect(configFile, configContext): + + diff --git a/src/python/srfsh.py b/src/python/srfsh.py index bb058b1..3c9cff9 100755 --- a/src/python/srfsh.py +++ b/src/python/srfsh.py @@ -11,73 +11,73 @@ from osrf.conf import osrfConfigValue # main listen loop # ------------------------------------------------------------------- def do_loop(): - while True: + while True: - try: - #line = raw_input("srfsh% ") - line = raw_input("\033[01;32msrfsh\033[01;34m% \033[00m") - if not len(line): - continue - if lower(line) == 'exit' or lower(line) == 'quit': - break - parts = split(line) + try: + #line = raw_input("srfsh% ") + line = raw_input("\033[01;32msrfsh\033[01;34m% \033[00m") + if not len(line): + continue + if lower(line) == 'exit' or lower(line) == 'quit': + break + parts = split(line) - command = parts[0] - - if command == 'request': - parts.pop(0) - handle_request(parts) - continue + command = parts[0] + + if command == 'request': + parts.pop(0) + handle_request(parts) + continue - if command == 'math_bench': - parts.pop(0) - handle_math_bench(parts) - continue + if command == 'math_bench': + parts.pop(0) + handle_math_bench(parts) + continue - if command == 'help': - handle_help() - continue + if command == 'help': + handle_help() + continue - if command == 'set': - parts.pop(0) - handle_set(parts) + if command == 'set': + parts.pop(0) + handle_set(parts) - if command == 'get': - parts.pop(0) - handle_get(parts) + if command == 'get': + parts.pop(0) + handle_get(parts) - except KeyboardInterrupt: - print "" + except KeyboardInterrupt: + print "" - except EOFError: - print "exiting..." - sys.exit(0) + except EOFError: + print "exiting..." + sys.exit(0) # ------------------------------------------------------------------- # Set env variables to control behavior # ------------------------------------------------------------------- def handle_set(parts): - m = re.compile('(.*)=(.*)').match(parts[0]) - key = m.group(1) - val = m.group(2) - set_var(key, val) - print "%s = %s" % (key, val) + m = re.compile('(.*)=(.*)').match(parts[0]) + key = m.group(1) + val = m.group(2) + set_var(key, val) + print "%s = %s" % (key, val) def handle_get(parts): - try: - print get_var(parts[0]) - except: - print "" + try: + print get_var(parts[0]) + except: + print "" # ------------------------------------------------------------------- # Prints help info # ------------------------------------------------------------------- def handle_help(): - print """ + print """ help - show this menu @@ -93,88 +93,88 @@ def handle_help(): Environment variables: SRFSH_OUTPUT = pretty - print pretty JSON and key/value pairs for network objects = raw - print formatted JSON - """ + """ - + # ------------------------------------------------------------------- # performs an opesnrf request # ------------------------------------------------------------------- def handle_request(parts): - service = parts.pop(0) - method = parts.pop(0) - jstr = '[%s]' % join(parts) - params = None + service = parts.pop(0) + method = parts.pop(0) + jstr = '[%s]' % join(parts) + params = None - try: - params = osrfJSONToObject(jstr) - except: - print "Error parsing JSON: %s" % jstr - return + try: + params = osrfJSONToObject(jstr) + except: + print "Error parsing JSON: %s" % jstr + return - ses = osrfClientSession(service) + ses = osrfClientSession(service) - end = None - start = time.time() + end = None + start = time.time() - req = ses.request2(method, tuple(params)) + req = ses.request2(method, tuple(params)) - while True: - resp = req.recv(timeout=120) - if not end: - total = time.time() - start - if not resp: break + while True: + resp = req.recv(timeout=120) + if not end: + total = time.time() - start + if not resp: break - otp = get_var('SRFSH_OUTPUT') - if otp == 'pretty': - print "\n" + osrfDebugNetworkObject(resp.content()) - else: - print osrfFormatJSON(osrfObjectToJSON(resp.content())) + otp = get_var('SRFSH_OUTPUT') + if otp == 'pretty': + print "\n" + osrfDebugNetworkObject(resp.content()) + else: + print osrfFormatJSON(osrfObjectToJSON(resp.content())) - req.cleanup() - ses.cleanup() + req.cleanup() + ses.cleanup() - print '-'*60 - print "Total request time: %f" % total - print '-'*60 + print '-'*60 + print "Total request time: %f" % total + print '-'*60 def handle_math_bench(parts): - count = int(parts.pop(0)) - ses = osrfClientSession('opensrf.math') - times = [] - - for i in range(100): - if i % 10: sys.stdout.write('.') - else: sys.stdout.write( str( i / 10 ) ) - print ""; - - - for i in range(count): - - starttime = time.time() - req = ses.request('add', 1, 2) - resp = req.recv(timeout=2) - endtime = time.time() - - if resp.content() == 3: - sys.stdout.write("+") - sys.stdout.flush() - times.append( endtime - starttime ) - else: - print "What happened? %s" % str(resp.content()) - - req.cleanup() - if not ( (i+1) % 100): - print ' [%d]' % (i+1) - - ses.cleanup() - total = 0 - for i in times: total += i - print "\naverage time %f" % (total / len(times)) + count = int(parts.pop(0)) + ses = osrfClientSession('opensrf.math') + times = [] + + for i in range(100): + if i % 10: sys.stdout.write('.') + else: sys.stdout.write( str( i / 10 ) ) + print ""; + + + for i in range(count): + + starttime = time.time() + req = ses.request('add', 1, 2) + resp = req.recv(timeout=2) + endtime = time.time() + + if resp.content() == 3: + sys.stdout.write("+") + sys.stdout.flush() + times.append( endtime - starttime ) + else: + print "What happened? %s" % str(resp.content()) + + req.cleanup() + if not ( (i+1) % 100): + print ' [%d]' % (i+1) + + ses.cleanup() + total = 0 + for i in times: total += i + print "\naverage time %f" % (total / len(times)) @@ -183,87 +183,87 @@ def handle_math_bench(parts): # Defines the tab-completion handling and sets up the readline history # ------------------------------------------------------------------- def setup_readline(): - class SrfshCompleter(object): - def __init__(self, words): - self.words = words - self.prefix = None - - def complete(self, prefix, index): - if prefix != self.prefix: - # find all words that start with this prefix - self.matching_words = [ - w for w in self.words if w.startswith(prefix) - ] - self.prefix = prefix - try: - return self.matching_words[index] - except IndexError: - return None - - words = 'request', 'help', 'exit', 'quit', 'opensrf.settings', 'opensrf.math', 'set' - completer = SrfshCompleter(words) - readline.parse_and_bind("tab: complete") - readline.set_completer(completer.complete) - - histfile = os.path.join(get_var('HOME'), ".srfsh_history") - try: - readline.read_history_file(histfile) - except IOError: - pass - atexit.register(readline.write_history_file, histfile) + class SrfshCompleter(object): + def __init__(self, words): + self.words = words + self.prefix = None + + def complete(self, prefix, index): + if prefix != self.prefix: + # find all words that start with this prefix + self.matching_words = [ + w for w in self.words if w.startswith(prefix) + ] + self.prefix = prefix + try: + return self.matching_words[index] + except IndexError: + return None + + words = 'request', 'help', 'exit', 'quit', 'opensrf.settings', 'opensrf.math', 'set' + completer = SrfshCompleter(words) + readline.parse_and_bind("tab: complete") + readline.set_completer(completer.complete) + + histfile = os.path.join(get_var('HOME'), ".srfsh_history") + try: + readline.read_history_file(histfile) + except IOError: + pass + atexit.register(readline.write_history_file, histfile) def do_connect(): - file = os.path.join(get_var('HOME'), ".srfsh.xml") - print_green("Connecting to opensrf...") - osrfConnect(file, 'srfsh') - print_red('OK\n') + file = os.path.join(get_var('HOME'), ".srfsh.xml") + print_green("Connecting to opensrf...") + osrfConnect(file, 'srfsh') + print_red('OK\n') def load_plugins(): - # Load the user defined external plugins - # XXX Make this a real module interface, with tab-complete words, commands, etc. - plugins = osrfConfigValue('plugins') - plugins = osrfConfigValue('plugins.plugin') - if not isinstance(plugins, list): - plugins = [plugins] - - for module in plugins: - name = module['module'] - init = module['init'] - print_green("Loading module %s..." % name) - - try: - str = 'from %s import %s\n%s()' % (name, init, init) - exec(str) - print_red('OK\n') - - except Exception, e: - sys.stderr.write("\nError importing plugin %s, with init symbol %s: \n%s\n" % (name, init, e)) + # Load the user defined external plugins + # XXX Make this a real module interface, with tab-complete words, commands, etc. + plugins = osrfConfigValue('plugins') + plugins = osrfConfigValue('plugins.plugin') + if not isinstance(plugins, list): + plugins = [plugins] + + for module in plugins: + name = module['module'] + init = module['init'] + print_green("Loading module %s..." % name) + + try: + str = 'from %s import %s\n%s()' % (name, init, init) + exec(str) + print_red('OK\n') + + except Exception, e: + sys.stderr.write("\nError importing plugin %s, with init symbol %s: \n%s\n" % (name, init, e)) def set_vars(): - if not get_var('SRFSH_OUTPUT'): - set_var('SRFSH_OUTPUT', 'pretty') + if not get_var('SRFSH_OUTPUT'): + set_var('SRFSH_OUTPUT', 'pretty') def set_var(key, val): - os.environ[key] = val + os.environ[key] = val def get_var(key): - try: return os.environ[key] - except: return '' - - + try: return os.environ[key] + except: return '' + + def print_green(str): - sys.stdout.write("\033[01;32m") - sys.stdout.write(str) - sys.stdout.write("\033[00m") - sys.stdout.flush() + sys.stdout.write("\033[01;32m") + sys.stdout.write(str) + sys.stdout.write("\033[00m") + sys.stdout.flush() def print_red(str): - sys.stdout.write("\033[01;31m") - sys.stdout.write(str) - sys.stdout.write("\033[00m") - sys.stdout.flush() + sys.stdout.write("\033[01;31m") + sys.stdout.write(str) + sys.stdout.write("\033[00m") + sys.stdout.flush() -- 2.11.0