import memcache
-from osrf.json import osrfObjectToJSON, osrfJSONToObject
-from osrf.log import *
+from osrf.json import to_json, to_object
+import osrf.log
'''
Abstracted OpenSRF caching interface.
self.client = memcache.Client(server, debug=1)
else:
if not _client:
- raise CacheException("not connected to any memcache servers. try CacheClient.connect(servers)")
+ raise CacheException(
+ "not connected to any memcache servers."
+ "try CacheClient.connect(servers)"
+ )
self.client = _client
def put(self, key, val, timeout=None):
global defaultTimeout
if timeout is None:
timeout = defaultTimeout
- s = osrfObjectToJSON(val)
- osrfLogInternal("cache: %s => %s" % (str(key), s))
- return self.client.set(str(key), s, timeout)
+ json = to_json(val)
+ osrf.log.osrfLogInternal("cache: %s => %s" % (str(key), json))
+ return self.client.set(str(key), json, timeout)
def get(self, key):
- o = self.client.get(str(key))
- osrfLogInternal("cache: fetching %s => %s" % (str(key), o))
- return osrfJSONToObject(o or "null")
+ obj = self.client.get(str(key))
+ osrf.log.osrfLogInternal("cache: fetching %s => %s" % (str(key), obj))
+ return to_object(obj or "null")
def delete(self, key):
- osrfLogInternal("cache: deleting %s" % str(key))
+ osrf.log.osrfLogInternal("cache: deleting %s" % str(key))
self.client.delete(str(key))
@staticmethod
def connect(svrs):
global _client
- osrfLogDebug("cache: connecting to servers %s" % str(svrs))
+ osrf.log.logDebug("cache: connecting to servers %s" % str(svrs))
_client = memcache.Client(svrs, debug=1)
# -----------------------------------------------------------------------
-from osrf.utils import *
-from osrf.ex import *
+import osrf.net_obj
+import osrf.ex
+import osrf.xml_obj
import re
-class osrfConfig(object):
+class Config(object):
"""Loads and parses the bootstrap config file"""
config = None
#def parseConfig(self,file=None):
def parseConfig(self):
- self.data = osrfXMLFileToObject(self.file)
- osrfConfig.config = self
+ self.data = osrf.xml_obj.xml_file_to_object(self.file)
+ Config.config = self
def getValue(self, key, idx=None):
if self.context:
else:
key = "%s.%s" % (self.context, key)
- val = osrfObjectFindPath(self.data, key, idx)
+ val = osrf.net_obj.find_object_path(self.data, key, idx)
if not val:
- raise osrfConfigException("Config value not found: " + key)
+ raise osrf.ex.OSRFConfigException("Config value not found: " + key)
return val
-def osrfConfigValue(key, idx=None):
+def get(key, idx=None):
"""Returns a bootstrap config value.
key -- A string representing the path to the value in the config object
e.g. "domains.domain", "username"
idx -- Optional array index if the searched value is an array member
"""
- return osrfConfig.config.getValue(key, idx)
+ return Config.config.getValue(key, idx)
-def osrfConfigValueNoEx(key, idx=None):
+def get_no_ex(key, idx=None):
""" Returns a bootstrap config value without throwing an exception
if the item is not found.
idx -- Optional array index if the searched value is an array member
"""
try:
- return osrfConfig.config.getValue(key, idx)
+ return Config.config.getValue(key, idx)
except:
return None
# exception is little more than a name.
# -----------------------------------------------------------------------
-class osrfException(Exception):
- """Root class for exceptions."""
- def __init__(self, info=None):
- self.info = info;
- def __str__(self):
- return self.info
-
-
-class osrfNetworkException(osrfException):
- def __str__(self):
- str = "\nUnable to communicate with the OpenSRF network"
- if self.info:
- str = str + '\n' + repr(self.info)
- return str
-
-class osrfProtocolException(osrfException):
- """Raised when something happens during opensrf network stack processing."""
- pass
-
-class osrfServiceException(osrfException):
- """Raised when there was an error communicating with a remote service."""
- pass
-
-class osrfConfigException(osrfException):
- """Invalid config option requested."""
- pass
-
-class osrfNetworkObjectException(osrfException):
- pass
-
-class osrfJSONParseException(osrfException):
- """Raised when a JSON parsing error occurs."""
- pass
+class OSRFException(Exception):
+ """Root class for exceptions."""
+ def __init__(self, info=None):
+ self.info = info;
+ def __str__(self):
+ return self.info
+
+
+class NetworkException(OSRFException):
+ def __str__(self):
+ str = "\nUnable to communicate with the OpenSRF network"
+ if self.info:
+ str = str + '\n' + repr(self.info)
+ return str
+
+class OSRFProtocolException(OSRFException):
+ """Raised when something happens during opensrf network stack processing."""
+ pass
+
+class OSRFServiceException(OSRFException):
+ """Raised when there was an error communicating with a remote service."""
+ pass
+
+class OSRFConfigException(OSRFException):
+ """Invalid config option requested."""
+ pass
+
+class OSRFNetworkObjectException(OSRFException):
+ pass
+
+class OSRFJSONParseException(OSRFException):
+ """Raised when a JSON parsing error occurs."""
+ pass
from xml.dom import minidom
from xml.sax import handler, make_parser, saxutils
-from osrf.json import *
-from osrf.net_obj import *
-from osrf.log import *
+from osrf.json import to_object
+from osrf.net_obj import NetworkObject, new_object_from_hint
+from osrf.log import logError
import urllib, urllib2, sys, re
defaultHost = None
def handleResponse(self, response):
s = response.read()
- obj = osrfJSONToObject(s)
+ obj = to_object(s)
if obj['status'] != 200:
sys.stderr.write('JSON gateway returned status %d:\n%s\n' % (obj['status'], s))
return None
return p[0]
def encodeParam(self, param):
- return osrfObjectToJSON(param)
+ return osrf.json.to_json(param)
class XMLGatewayRequest(GatewayRequest):
try:
parser.parse(response)
except Exception, e:
- osrfLogErr('Error parsing gateway XML: %s' % unicode(e))
+ logError('Error parsing gateway XML: %s' % unicode(e))
return None
return handler.getResult()
def encodeParam(self, param):
- return osrfObjectToXML(param);
+ return osrf.net_obj.to_xml(param);
class XMLGatewayParser(handler.ContentHandler):
hint = self.__getAttr(attrs, 'class_hint')
if hint:
- obj = osrfNewObjectFromHint(hint)
+ obj = new_object_from_hint(hint)
self.appendChild(obj)
self.objStack.append(obj)
if name == 'array':
if isinstance(parent, dict):
parent[self.keyStack.pop()] = child
else:
- if isinstance(parent, osrfNetworkObject):
+ if isinstance(parent, NetworkObject):
key = None
- if parent.getRegistry().wireProtocol == 'array':
- keys = parent.getRegistry().keys
+ if parent.get_registry().protocol == 'array':
+ keys = parent.get_registry().keys
i = self.posStack.pop()
key = keys[i]
if i+1 < len(keys):
else:
key = self.keyStack.pop()
- parent.setField(key, child)
+ parent.set_field(key, child)
def endElement(self, name):
if name == 'array' or name == 'object':
from mod_python import apache, util
import osrf.cache
-from osrf.system import osrfConnect
-from osrf.json import osrfJSONToObject
-from osrf.conf import osrfConfigValue
-from osrf.set import osrfSettingsValue
+import osrf.system
+import osrf.json
+import osrf.conf
+import osrf.set
+import sys
from osrf.const import *
-from osrf.net import *
-from osrf.log import *
+from osrf.net import get_network_handle
+import osrf.log
'''
OSRF_HTTP_HEADER_MULTIPART = 'X-OpenSRF-multipart'
MULTIPART_CONTENT_TYPE = 'multipart/x-mixed-replace;boundary="%s"'
-JSON_CONTENT_TYPE = 'text/plain';
+JSON_CONTENT_TYPE = 'text/plain'
CACHE_TIME = 300
ROUTER_NAME = None
OSRF_DOMAIN = None
-# If true, all data sent to the client is also written to stderr (apache error log)
+# If DEBUG_WRITE = True, all data sent to the client is also written
+# to stderr (apache error log)
DEBUG_WRITE = False
-def _dbg(s):
+def _dbg(msg):
''' testing only '''
- sys.stderr.write("%s\n\n" % str(s))
+ sys.stderr.write("%s\n\n" % str(msg))
sys.stderr.flush()
-initComplete = False
-def childInit(req):
- ''' At time of writing, mod_python doesn't support a childInit handler,
+INIT_COMPLETE = False
+def child_init(req):
+ ''' At time of writing, mod_python doesn't support a child_init handler,
so this function is called once per process to initialize
the opensrf connection '''
- global initComplete, ROUTER_NAME, OSRF_DOMAIN
- if initComplete:
+ global INIT_COMPLETE, ROUTER_NAME, OSRF_DOMAIN
+ if INIT_COMPLETE:
return
ops = req.get_options()
conf = ops['OSRF_CONFIG']
ctxt = ops.get('OSRF_CONFIG_CONTEXT') or 'opensrf'
- osrfConnect(conf, ctxt)
+ osrf.system.connect(conf, ctxt)
- ROUTER_NAME = osrfConfigValue('router_name')
- OSRF_DOMAIN = osrfConfigValue('domains.domain')
- initComplete = True
+ ROUTER_NAME = osrf.conf.get('router_name')
+ OSRF_DOMAIN = osrf.conf.get('domains.domain')
+ INIT_COMPLETE = True
- servers = osrfSettingsValue('cache.global.servers.server')
+ servers = osrf.set.get('cache.global.servers.server')
if not isinstance(servers, list):
servers = [servers]
osrf.cache.CacheClient.connect(servers)
def handler(req):
''' Create the translator and tell it to process the request. '''
- childInit(req)
+ child_init(req)
return HTTPTranslator(req).process()
class HTTPTranslator(object):
self.messages = []
self.complete = False
- self.handle = osrfGetNetworkHandle()
- self.handle.setRecvCallback(None)
+ self.handle = osrf.net.get_network_handle()
+ self.handle.set_receive_callback(None)
- self.to = apreq.headers_in.get(OSRF_HTTP_HEADER_TO)
+ self.recipient = apreq.headers_in.get(OSRF_HTTP_HEADER_TO)
self.service = apreq.headers_in.get(OSRF_HTTP_HEADER_SERVICE)
- self.thread = apreq.headers_in.get(OSRF_HTTP_HEADER_THREAD) or "%s%s" % (os.getpid(), time.time())
+ self.thread = apreq.headers_in.get(OSRF_HTTP_HEADER_THREAD) or \
+ "%s%s" % (os.getpid(), time.time())
self.timeout = apreq.headers_in.get(OSRF_HTTP_HEADER_TIMEOUT) or 1200
- self.multipart = str(apreq.headers_in.get(OSRF_HTTP_HEADER_MULTIPART)).lower() == 'true'
- self.disconnectOnly = False
+ self.multipart = str( \
+ apreq.headers_in.get(OSRF_HTTP_HEADER_MULTIPART)).lower() == 'true'
+ self.disconnect_only = False
# generate a random multipart delimiter
- m = md5.new()
- m.update("%f%d%d" % (time.time(), os.getpid(), random.randint(100,10000000)))
- self.delim = m.hexdigest()
- self.remoteHost = self.apreq.get_remote_host(apache.REMOTE_NOLOOKUP)
+ mpart = md5.new()
+ mpart.update("%f%d%d" % (time.time(), os.getpid(), \
+ random.randint(100, 10000000)))
+ self.delim = mpart.hexdigest()
+ self.remote_host = self.apreq.get_remote_host(apache.REMOTE_NOLOOKUP)
self.cache = osrf.cache.CacheClient()
return apache.OK
if not self.body:
return apache.HTTP_BAD_REQUEST
- if not self.setToAddr():
+ if not self.set_to_addr():
return apache.HTTP_BAD_REQUEST
- if not self.parseRequest():
+ if not self.parse_request():
return apache.HTTP_BAD_REQUEST
while self.handle.recv(0):
pass # drop stale messages
- netMsg = osrfNetworkMessage(to=self.to, thread=self.thread, body=self.body)
- self.handle.send(netMsg)
+ net_msg = NetworkMessage(recipient=self.recipient, thread=self.thread, \
+ body=self.body)
+ self.handle.send(net_msg)
- if self.disconnectOnly:
- osrfLogDebug("exiting early on DISCONNECT")
+ if self.disconnect_only:
+ osrf.log.logDebug("exiting early on DISCONNECT")
return apache.OK
- firstWrite = True
+ first_write = True
while not self.complete:
- netMsg = self.handle.recv(self.timeout)
- if not netMsg:
+ net_msg = self.handle.recv(self.timeout)
+ if not net_msg:
return apache.GATEWAY_TIME_OUT
- if not self.checkStatus(netMsg):
+ if not self.check_status(net_msg):
continue
- if firstWrite:
- self.initHeaders(netMsg)
- firstWrite = False
+ if first_write:
+ self.init_headers(net_msg)
+ first_write = False
if self.multipart:
- self.respondChunk(netMsg)
+ self.respond_chunk(net_msg)
else:
- self.messages.append(netMsg.body)
+ self.messages.append(net_msg.body)
+ # condense the sets of arrays into a single array of messages
if self.complete:
-
- # condense the sets of arrays into a single array of messages
json = self.messages.pop(0)
while len(self.messages) > 0:
- m = self.messages.pop(0)
- json = "%s,%s" % (json[0:len(json)-1], m[1:])
+ msg = self.messages.pop(0)
+ json = "%s,%s" % (json[0:len(json)-1], msg[1:])
self.write("%s" % json)
return apache.OK
- def parseRequest(self):
- ''' If this is solely a DISCONNECT message, we set self.disconnectOnly to true
- @return True if the body parses correctly, False otherwise
+ def parse_request(self):
+ '''
+ If this is solely a DISCONNECT message, we set self.disconnect_only
+ to true
+ @return True if the body parses correctly, False otherwise
'''
- osrfMsgs = osrfJSONToObject(self.body)
- if not osrfMsgs:
+ osrf_msgs = osrf.json.to_object(self.body)
+ if not osrf_msgs:
return False
- if len(osrfMsgs) == 1 and osrfMsgs[0].type() == OSRF_MESSAGE_TYPE_DISCONNECT:
- self.disconnectOnly = True
+ if len(osrf_msgs) == 1 and \
+ osrf_msgs[0].type() == OSRF_MESSAGE_TYPE_DISCONNECT:
+ self.disconnect_only = True
return True
- def setToAddr(self):
+ def set_to_addr(self):
''' Determines the TO address. Returns false if
the address is missing or ambiguous.
Also returns false if an explicit TO is specified and the
thread/IP/TO combination is not found in the session cache
'''
if self.service:
- if self.to:
- osrfLogWarn("specifying both SERVICE and TO is not allowed")
+ if self.recipient:
+ osrf.log.osrfLogWarn("specifying both SERVICE and TO is not allowed")
return False
- self.to = "%s@%s/%s" % (ROUTER_NAME, OSRF_DOMAIN, self.service)
+ self.recipient = "%s@%s/%s" % \
+ (ROUTER_NAME, OSRF_DOMAIN, self.service)
return True
else:
- if self.to:
- # If the client specifies a specific TO address, verify it's the same
- # address that was cached with the previous request.
+ if self.recipient:
+ # If the client specifies a specific TO address, verify it's
+ # the same address that was cached with the previous request.
obj = self.cache.get(self.thread)
- if obj and obj['ip'] == self.remoteHost and obj['jid'] == self.to:
+ if obj and obj['ip'] == self.remote_host and \
+ obj['jid'] == self.recipient:
return True
- osrfLogWarn("client [%s] attempted to send directly [%s] without a session" % (self.remoteHost, self.to))
+ osrf.log.osrfLogWarn("client [%s] attempted to send directly "
+ "[%s] without a session" % (self.remote_host, self.recipient))
return False
- def initHeaders(self, netMsg):
- self.apreq.headers_out[OSRF_HTTP_HEADER_FROM] = netMsg.sender
+ def init_headers(self, net_msg):
+ self.apreq.headers_out[OSRF_HTTP_HEADER_FROM] = net_msg.sender
if self.multipart:
self.apreq.content_type = MULTIPART_CONTENT_TYPE % self.delim
self.write("--%s\n" % self.delim)
else:
self.apreq.content_type = JSON_CONTENT_TYPE
- self.cache.put(self.thread, {'ip':self.remoteHost, 'jid': netMsg.sender}, CACHE_TIME)
+ self.cache.put(self.thread, \
+ {'ip':self.remote_host, 'jid': net_msg.sender}, CACHE_TIME)
- osrfLogDebug("caching session [%s] for host [%s] and server drone [%s]" % (
- self.thread, self.remoteHost, netMsg.sender))
+ osrf.log.logDebug("caching session [%s] for host [%s] and server "
+ " drone [%s]" % (self.thread, self.remote_host, net_msg.sender))
- def checkStatus(self, netMsg):
+ def check_status(self, net_msg):
''' Checks the status of the server response.
If we received a timeout message, we drop it.
if it's any other non-continue status, we mark this session as
@return False if there is no data to return to the caller
(dropped message, eg. timeout), True otherwise '''
- osrfMsgs = osrfJSONToObject(netMsg.body)
- lastMsg = osrfMsgs.pop()
+ osrf_msgs = osrf.json.to_object(net_msg.body)
+ last_msg = osrf_msgs.pop()
- if lastMsg.type() == OSRF_MESSAGE_TYPE_STATUS:
- code = int(lastMsg.payload().statusCode())
+ if last_msg.type() == OSRF_MESSAGE_TYPE_STATUS:
+ code = int(last_msg.payload().statusCode())
if code == OSRF_STATUS_TIMEOUT:
- osrfLogDebug("removing cached session [%s] and dropping TIMEOUT message" % netMsg.thread)
- self.cache.delete(netMsg.thread)
+ osrf.log.logDebug("removing cached session [%s] and "
+ "dropping TIMEOUT message" % net_msg.thread)
+ self.cache.delete(net_msg.thread)
return False
if code != OSRF_STATUS_CONTINUE:
return True
- def respondChunk(self, resp):
+ def respond_chunk(self, resp):
''' Writes a single multipart-delimited chunk of data '''
self.write("Content-type: %s\n\n" % JSON_CONTENT_TYPE)
import simplejson, types
-from osrf.net_obj import *
+from osrf.net_obj import NetworkObject, parse_net_object
from osrf.const import OSRF_JSON_PAYLOAD_KEY, OSRF_JSON_CLASS_KEY
-class osrfJSONNetworkEncoder(simplejson.JSONEncoder):
+class NetworkEncoder(simplejson.JSONEncoder):
def default(self, obj):
- if isinstance(obj, osrfNetworkObject):
- reg = obj.getRegistry()
- data = obj.getData()
+ if isinstance(obj, NetworkObject):
+ reg = obj.get_registry()
+ data = obj.get_data()
# re-encode the object as an array if necessary
- if reg.wireProtocol == 'array':
- d = []
- for k in reg.keys:
- d.append(data.get(k))
- data = d
+ if reg.protocol == 'array':
+ objarray = []
+ for key in reg.keys:
+ objarray.append(data.get(key))
+ data = objarray
return {
OSRF_JSON_CLASS_KEY: reg.hint,
return obj
-def osrfObjectToJSON(obj):
+def to_json(obj):
"""Turns a python object into a wrapped JSON object"""
- return simplejson.dumps(obj, cls=osrfJSONNetworkEncoder)
+ return simplejson.dumps(obj, cls=NetworkEncoder)
-def osrfJSONToObject(json):
+def to_object(json):
"""Turns a JSON string into python objects"""
obj = simplejson.loads(json)
- return parseNetObject(obj)
+ return parse_net_object(obj)
-def osrfParseJSONRaw(json):
+def parse_json_raw(json):
"""Parses JSON the old fashioned way."""
return simplejson.loads(json)
-def osrfToJSONRaw(obj):
+def to_json_raw(obj):
"""Stringifies an object as JSON with no additional logic."""
return simplejson.dumps(obj)
-def __tabs(t):
- r=''
- for i in range(t): r += ' '
- return r
+def __tabs(depth):
+ space = ''
+ while range(depth):
+ space += ' '
+ return space
-def osrfDebugNetworkObject(obj, t=1):
+def debug_net_object(obj, depth=1):
"""Returns a debug string for a given object.
- If it's an osrfNetworkObject and has registered keys, key/value p
- pairs are returned. Otherwise formatted JSON is returned"""
+ If it's an NetworkObject and has registered keys, key/value pairs
+ are returned. Otherwise formatted JSON is returned"""
- s = ''
- if isinstance(obj, osrfNetworkObject):
- reg = obj.getRegistry()
+ debug_str = ''
+ if isinstance(obj, NetworkObject):
+ reg = obj.get_registry()
keys = list(reg.keys) # clone it, so sorting won't break the original
keys.sort()
while len(key) < 24: key += '.' # pad the names to make the values line up somewhat
val = getattr(obj, k)()
- subobj = val and not (isinstance(val,unicode) or \
+ subobj = val and not (isinstance(val, unicode) or \
isinstance(val, int) or isinstance(val, float) or isinstance(val, long))
- s += __tabs(t) + key + ' = '
+ debug_str += __tabs(depth) + key + ' = '
if subobj:
- s += '\n'
- val = osrfDebugNetworkObject(val, t+1)
+ debug_str += '\n'
+ val = debug_net_object(val, depth+1)
- s += str(val)
+ debug_str += str(val)
- if not subobj: s += '\n'
+ if not subobj: debug_str += '\n'
else:
- s = osrfFormatJSON(osrfObjectToJSON(obj))
- return s
+ debug_str = pprint(to_json(obj))
+ return debug_str
-def osrfFormatJSON(json):
+def pprint(json):
"""JSON pretty-printer"""
r = ''
t = 0
r += c
return r
-
-
-
-
-
-
import traceback, sys, os, re, threading
from osrf.const import *
-logSema = threading.BoundedSemaphore(value=1)
+LOG_SEMAPHORE = threading.BoundedSemaphore(value=1)
-loglevel = OSRF_LOG_DEBUG
-logtype = OSRF_LOG_TYPE_STDERR
-logfile = None
+LOG_LEVEL = OSRF_LOG_DEBUG
+LOG_TYPE = OSRF_LOG_TYPE_STDERR
+LOG_FILE = None
+FRGX = re.compile('/.*/')
-def osrfInitLog(level, facility=None, file=None):
+
+def initialize(level, facility=None, logfile=None):
"""Initialize the logging subsystem."""
- global loglevel, logtype, logfile
+ global LOG_LEVEL, LOG_TYPE, LOG_FILE
- loglevel = level
+ LOG_LEVEL = level
if facility:
try:
sys.stderr.write("syslog not found, logging to stderr\n")
return
- logtype = OSRF_LOG_TYPE_SYSLOG
- osrfInitSyslog(facility, level)
+ LOG_TYPE = OSRF_LOG_TYPE_SYSLOG
+ initialize_syslog(facility, level)
return
- if file:
- logtype = OSRF_LOG_TYPE_FILE
- logfile = file
+ if logfile:
+ LOG_TYPE = OSRF_LOG_TYPE_FILE
+ LOG_FILE = logfile
# -----------------------------------------------------------------------
# Define wrapper functions for the log levels
# -----------------------------------------------------------------------
-def osrfLogInternal(s): __osrfLog(OSRF_LOG_INTERNAL,s)
-def osrfLogDebug(s): __osrfLog(OSRF_LOG_DEBUG,s)
-def osrfLogInfo(s): __osrfLog(OSRF_LOG_INFO,s)
-def osrfLogWarn(s): __osrfLog(OSRF_LOG_WARN,s)
-def osrfLogErr(s): __osrfLog(OSRF_LOG_ERR,s)
-
-
-frgx = re.compile('/.*/')
+def osrfLogInternal(s):
+ __osrfLog(OSRF_LOG_INTERNAL, s)
+def logDebug(s):
+ __osrfLog(OSRF_LOG_DEBUG, s)
+def osrfLogInfo(s):
+ __osrfLog(OSRF_LOG_INFO, s)
+def osrfLogWarn(s):
+ __osrfLog(OSRF_LOG_WARN, s)
+def logError(s):
+ __osrfLog(OSRF_LOG_ERR, s)
def __osrfLog(level, msg):
"""Builds the log message and passes the message off to the logger."""
- global loglevel, logtype
+ global LOG_LEVEL, LOG_TYPE
try:
import syslog
sys.stderr.write('ERR ' + msg)
return
- if int(level) > int(loglevel): return
+ if int(level) > int(LOG_LEVEL): return
# find the caller info for logging the file and line number
tb = traceback.extract_stack(limit=3)
tb = tb[0]
lvl = 'DEBG'
- if level == OSRF_LOG_INTERNAL: lvl = 'INT '
- if level == OSRF_LOG_INFO: lvl = 'INFO'
- if level == OSRF_LOG_WARN: lvl = 'WARN'
- if level == OSRF_LOG_ERR: lvl = 'ERR '
+ if level == OSRF_LOG_INTERNAL:
+ lvl = 'INT '
+ if level == OSRF_LOG_INFO:
+ lvl = 'INFO'
+ if level == OSRF_LOG_WARN:
+ lvl = 'WARN'
+ if level == OSRF_LOG_ERR:
+ lvl = 'ERR '
- file = frgx.sub('',tb[0])
- msg = '[%s:%d:%s:%s:%s] %s' % (lvl, os.getpid(), file, tb[1], threading.currentThread().getName(), msg)
+ filename = FRGX.sub('', tb[0])
+ msg = '[%s:%d:%s:%s:%s] %s' % (lvl, os.getpid(), filename, tb[1], threading.currentThread().getName(), msg)
- if logtype == OSRF_LOG_TYPE_SYSLOG:
- __logSyslog(level, msg)
+ if LOG_TYPE == OSRF_LOG_TYPE_SYSLOG:
+ __log_syslog(level, msg)
else:
- if logtype == OSRF_LOG_TYPE_FILE:
- __logFile(msg)
+ if LOG_TYPE == OSRF_LOG_TYPE_FILE:
+ __log_file(msg)
else:
sys.stderr.write("%s\n" % msg)
- if level == OSRF_LOG_ERR and logtype != OSRF_LOG_TYPE_STDERR:
+ if level == OSRF_LOG_ERR and LOG_TYPE != OSRF_LOG_TYPE_STDERR:
sys.stderr.write(msg + '\n')
-def __logSyslog(level, msg):
+def __log_syslog(level, msg):
''' Logs the message to syslog '''
import syslog
slvl = syslog.LOG_DEBUG
- if level == OSRF_LOG_INTERNAL: slvl=syslog.LOG_DEBUG
- if level == OSRF_LOG_INFO: slvl = syslog.LOG_INFO
- if level == OSRF_LOG_WARN: slvl = syslog.LOG_WARNING
- if level == OSRF_LOG_ERR: slvl = syslog.LOG_ERR
+ if level == OSRF_LOG_INTERNAL:
+ slvl = syslog.LOG_DEBUG
+ if level == OSRF_LOG_INFO:
+ slvl = syslog.LOG_INFO
+ if level == OSRF_LOG_WARN:
+ slvl = syslog.LOG_WARNING
+ if level == OSRF_LOG_ERR:
+ slvl = syslog.LOG_ERR
syslog.syslog(slvl, msg)
-def __logFile(msg):
+def __log_file(msg):
''' Logs the message to a file. '''
- global logfile, logtype
+ global LOG_FILE, LOG_TYPE
- f = None
+ logfile = None
try:
- f = open(logfile, 'a')
+ logfile = open(LOG_FILE, 'a')
except:
- sys.stderr.write("cannot open log file for writing: %s\n", logfile)
- logtype = OSRF_LOG_TYPE_STDERR
+ sys.stderr.write("cannot open log file for writing: %s\n", LOG_FILE)
+ LOG_TYPE = OSRF_LOG_TYPE_STDERR
return
try:
- logSema.acquire()
- f.write("%s\n" % msg)
+ LOG_SEMAPHORE.acquire()
+ logfile.write("%s\n" % msg)
finally:
- logSema.release()
+ LOG_SEMAPHORE.release()
- f.close()
-
-
+ logfile.close()
-def osrfInitSyslog(facility, level):
+def initialize_syslog(facility, level):
"""Connect to syslog and set the logmask based on the level provided."""
import syslog
level = int(level)
- if facility == 'local0': facility = syslog.LOG_LOCAL0
- if facility == 'local1': facility = syslog.LOG_LOCAL1
- if facility == 'local2': facility = syslog.LOG_LOCAL2
- if facility == 'local3': facility = syslog.LOG_LOCAL3
- if facility == 'local4': facility = syslog.LOG_LOCAL4
- if facility == 'local5': facility = syslog.LOG_LOCAL5
- if facility == 'local6': facility = syslog.LOG_LOCAL6
+ if facility == 'local0':
+ facility = syslog.LOG_LOCAL0
+ if facility == 'local1':
+ facility = syslog.LOG_LOCAL1
+ if facility == 'local2':
+ facility = syslog.LOG_LOCAL2
+ if facility == 'local3':
+ facility = syslog.LOG_LOCAL3
+ if facility == 'local4':
+ facility = syslog.LOG_LOCAL4
+ if facility == 'local5':
+ facility = syslog.LOG_LOCAL5
+ if facility == 'local6':
+ facility = syslog.LOG_LOCAL6
# add other facility maps if necessary...
syslog.openlog(sys.argv[0], 0, facility)
from pyxmpp.message import Message
from pyxmpp.jid import JID
from socket import gethostname
-from osrf.log import *
+import osrf.log
import os, time, threading
-import logging
-threadSessions = {}
+THREAD_SESSIONS = {}
# - log jabber activity (for future reference)
+#import logging
#logger=logging.getLogger()
#logger.addHandler(logging.StreamHandler())
#logger.addHandler(logging.FileHandler('j.log'))
#logger.setLevel(logging.DEBUG)
-def osrfSetNetworkHandle(handle):
+def set_network_handle(handle):
""" Sets the thread-specific network handle"""
- threadSessions[threading.currentThread().getName()] = handle
+ THREAD_SESSIONS[threading.currentThread().getName()] = handle
-def osrfGetNetworkHandle():
+def get_network_handle():
""" Returns the thread-specific network connection handle."""
- return threadSessions.get(threading.currentThread().getName())
+ return THREAD_SESSIONS.get(threading.currentThread().getName())
-class osrfNetworkMessage(object):
+class NetworkMessage(object):
"""Network message
attributes:
sender - message sender
- to - message recipient
+ recipient - message recipient
body - the body of the message
thread - the message thread
+ locale - locale of the message
"""
def __init__(self, message=None, **args):
if message:
self.body = message.get_body()
self.thread = message.get_thread()
- self.to = message.get_to()
- if message.xmlnode.hasProp('router_from') and message.xmlnode.prop('router_from') != '':
+ self.recipient = message.get_to()
+ if message.xmlnode.hasProp('router_from') and \
+ message.xmlnode.prop('router_from') != '':
self.sender = message.xmlnode.prop('router_from')
- else: self.sender = message.get_from().as_utf8()
+ else:
+ self.sender = message.get_from().as_utf8()
+ self.locale = None # XXX fix me good
else:
- if args.has_key('sender'): self.sender = args['sender']
- if args.has_key('to'): self.to = args['to']
- if args.has_key('body'): self.body = args['body']
- if args.has_key('thread'): self.thread = args['thread']
-
-
-class osrfNetwork(JabberClient):
+ if args.has_key('sender'):
+ self.sender = args['sender']
+ if args.has_key('recipient'):
+ self.recipient = args['recipient']
+ if args.has_key('body'):
+ self.body = args['body']
+ if args.has_key('thread'):
+ self.thread = args['thread']
+ if args.has_key('locale'):
+ self.thread = args['locale']
+
+class Network(JabberClient):
def __init__(self, **args):
self.isconnected = False
resource = 'python'
if args.has_key('resource'):
resource = args['resource']
- resource += '_' + gethostname()+':'+ str(os.getpid()) + '_'+ threading.currentThread().getName().lower()
+ resource += '_' + gethostname() + ':' + str(os.getpid()) + '_' + \
+ threading.currentThread().getName().lower()
self.jid = JID(args['username'], args['host'], resource)
- osrfLogDebug("initializing network with JID %s and host=%s, port=%s, username=%s" %
- (self.jid.as_utf8(), args['host'], args['port'], args['username']))
+ osrf.log.logDebug("initializing network with JID %s and host=%s, "
+ "port=%s, username=%s" % (self.jid.as_utf8(), args['host'], \
+ args['port'], args['username']))
#initialize the superclass
JabberClient.__init__(self, self.jid, args['password'], args['host'])
self.queue = []
- self.recvCallback = None
+ self.receive_callback = None
def connect(self):
JabberClient.connect(self)
while not self.isconnected:
stream = self.get_stream()
act = stream.loop_iter(10)
- if not act: self.idle()
+ if not act:
+ self.idle()
- def setRecvCallback(self, func):
+ def set_receive_callback(self, func):
"""The callback provided is called when a message is received.
The only argument to the function is the received message. """
- self.recvCallback = func
+ self.receive_callback = func
def session_started(self):
- osrfLogInfo("Successfully connected to the opensrf network")
+ osrf.log.osrfLogInfo("Successfully connected to the opensrf network")
self.authenticated()
- self.stream.set_message_handler("normal",self.message_received)
+ self.stream.set_message_handler("normal", self.message_received)
self.isconnected = True
def send(self, message):
"""Sends the provided network message."""
- osrfLogInternal("jabber sending to %s: %s" % (message.to, message.body))
- msg = Message(None, None, message.to, None, None, None, message.body, message.thread)
+ osrf.log.osrfLogInternal("jabber sending to %s: %s" % \
+ (message.recipient, message.body))
+ msg = Message(None, None, message.recipient, None, None, None, \
+ message.body, message.thread)
self.stream.send(msg)
def message_received(self, stanza):
if stanza.get_type()=="headline":
return True
# check for errors
- osrfLogInternal("jabber received message from %s : %s"
+ osrf.log.osrfLogInternal("jabber received message from %s : %s"
% (stanza.get_from().as_utf8(), stanza.get_body()))
- self.queue.append(osrfNetworkMessage(stanza))
+ self.queue.append(NetworkMessage(stanza))
return True
def recv(self, timeout=120):
timeout - max number of seconds to wait for a message.
If a message is received in 'timeout' seconds, the message is passed to
- the recvCallback is called and True is returned. Otherwise, false is returned."""
+ the receive_callback is called and True is returned. Otherwise, false is
+ returned.
+ """
if len(self.queue) == 0:
while timeout >= 0 and len(self.queue) == 0:
act = self.get_stream().loop_iter(timeout)
endtime = time.time() - starttime
timeout -= endtime
- osrfLogInternal("exiting stream loop after %s seconds. act=%s, queue size=%d" % (str(endtime),act, len(self.queue)))
- if not act: self.idle()
+ osrf.log.osrfLogInternal("exiting stream loop after %s seconds. "
+ "act=%s, queue size=%d" % (str(endtime), act, len(self.queue)))
+ if not act:
+ self.idle()
# if we've acquired a message, handle it
msg = None
if len(self.queue) > 0:
msg = self.queue.pop(0)
- if self.recvCallback:
- self.recvCallback(msg)
+ if self.receive_callback:
+ self.receive_callback(msg)
return msg
from osrf.const import OSRF_JSON_PAYLOAD_KEY, OSRF_JSON_CLASS_KEY
+import re
from xml.sax import saxutils
# -----------------------------------------------------------
# Global object registry
-objectRegistry = {}
+OBJECT_REGISTRY = {}
-class osrfNetworkRegistry(object):
+class NetworkRegistry(object):
''' Network-serializable objects must be registered. The class
hint maps to a set (ordered in the case of array-base objects)
of field names (keys).
'''
- def __init__(self, hint, keys, wireProtocol):
- global objectRegistry
+ def __init__(self, hint, keys, protocol):
+ global OBJECT_REGISTRY
self.hint = hint
self.keys = keys
- self.wireProtocol = wireProtocol
- objectRegistry[hint] = self
+ self.protocol = protocol
+ OBJECT_REGISTRY[hint] = self
- def getRegistry(hint):
- global objectRegistry
- return objectRegistry.get(hint)
- getRegistry = staticmethod(getRegistry)
+ def get_registry(hint):
+ global OBJECT_REGISTRY
+ return OBJECT_REGISTRY.get(hint)
+
+ get_registry = staticmethod(get_registry)
# -----------------------------------------------------------
# Define the base class for all network-serializable objects
# -----------------------------------------------------------
-class osrfNetworkObject(object):
+class NetworkObject(object):
''' Base class for all network serializable objects '''
# link to our registry object for this registered class
self._data = data
if not data: self._data = {}
if isinstance(data, list):
- self.importArrayData(list)
+ self.import_array_data(list)
- def importArrayData(self, data):
+ def import_array_data(self, data):
''' If an array-based object is created with an array
of data, cycle through and load the data '''
self._data = {}
if len(data) == 0: return
- reg = self.getRegistry()
- if reg.wireProtocol == 'array':
- for i in range(len(reg.keys)):
- if len(data) > i: break
- self.setField(reg.keys[i], data[i])
+ reg = self.get_registry()
+ if reg.protocol == 'array':
+ for entry in range(len(reg.keys)):
+ if len(data) > entry: break
+ self.set_field(reg.keys[entry], data[entry])
- def getData(self):
+ def get_data(self):
''' Returns the full dataset for this object as a dict '''
return self._data
- def setField(self, field, value):
+ def set_field(self, field, value):
self._data[field] = value
- def getField(self, field):
+ def get_field(self, field):
return self._data.get(field)
- def getRegistry(cls):
+ def get_registry(cls):
''' Returns the registry object for this registered class '''
return cls.registry
- getRegistry = classmethod(getRegistry)
+ get_registry = classmethod(get_registry)
-def osrfNewObjectFromHint(hint):
+def new_object_from_hint(hint):
''' Given a hint, this will create a new object of that
type and return it. If this hint is not registered,
- an object of type osrfNetworkObject.__unknown is returned'''
+ an object of type NetworkObject.__unknown is returned'''
try:
obj = None
- exec('obj = osrfNetworkObject.%s()' % hint)
+ exec('obj = NetworkObject.%s()' % hint)
return obj
except AttributeError:
- return osrfNetworkObject.__unknown()
-
-
-
+ return NetworkObject.__unknown()
def __makeNetworkAccessor(cls, key):
''' Creates and accessor/mutator method for the given class.
the field on the object whose data we are accessing '''
def accessor(self, *args):
if len(args) != 0:
- self.setField(key, args[0])
- return self.getField(key)
+ self.set_field(key, args[0])
+ return self.get_field(key)
setattr(cls, key, accessor)
-def osrfNetworkRegisterHint(hint, keys, type='hash'):
+def NetworkRegisterHint(hint, keys, type='hash'):
''' Registers a new network-serializable object class.
'hint' is the class hint
'''
# register the class with the global registry
- registry = osrfNetworkRegistry(hint, keys, type)
+ registry = NetworkRegistry(hint, keys, type)
# create the new class locally with the given hint name
- exec('class %s(osrfNetworkObject):\n\tpass' % hint)
+ exec('class %s(NetworkObject):\n\tpass' % hint)
# give the new registered class a local handle
cls = None
for k in keys:
__makeNetworkAccessor(cls, k)
- # attach our new class to the osrfNetworkObject
+ # attach our new class to the NetworkObject
# class so others can access it
- setattr(osrfNetworkObject, hint , cls)
+ setattr(NetworkObject, hint , cls)
cls.registry = registry
# create a unknown object to handle unregistred types
-osrfNetworkRegisterHint('__unknown', [], 'hash')
+NetworkRegisterHint('__unknown', [], 'hash')
# -------------------------------------------------------------------
# Define the custom object parsing behavior
# -------------------------------------------------------------------
-def parseNetObject(obj):
+def parse_net_object(obj):
try:
-
hint = obj[OSRF_JSON_CLASS_KEY]
- subObj = obj[OSRF_JSON_PAYLOAD_KEY]
- reg = osrfNetworkRegistry.getRegistry(hint)
+ sub_object = obj[OSRF_JSON_PAYLOAD_KEY]
+ reg = NetworkRegistry.get_registry(hint)
obj = {}
- if reg.wireProtocol == 'array':
- for i in range(len(reg.keys)):
- if len(subObj) > i:
- obj[reg.keys[i]] = parseNetObject(subObj[i])
+ if reg.protocol == 'array':
+ for entry in range(len(reg.keys)):
+ if len(sub_object) > entry:
+ obj[reg.keys[entry]] = parse_net_object(sub_object[entry])
else:
- obj[reg.keys[i]] = None
+ obj[reg.keys[entry]] = None
else:
- for k in reg.keys:
- obj[k] = parseNetObject(subObj.get(k))
+ for key in reg.keys:
+ obj[key] = parse_net_object(sub_object.get(key))
- estr = 'obj = osrfNetworkObject.%s(obj)' % hint
+ estr = 'obj = NetworkObject.%s(obj)' % hint
try:
exec(estr)
- except e:
+ except:
# this object has not been registered, shove it into the default container
- obj = osrfNetworkObject.__unknown(obj)
+ obj = NetworkObject.__unknown(obj)
return obj
- except: pass
+ except:
+ pass
# the current object does not have a class hint
if isinstance(obj, list):
- for i in range(len(obj)):
- obj[i] = parseNetObject(obj[i])
+ for entry in range(len(obj)):
+ obj[entry] = parse_net_object(obj[entry])
else:
if isinstance(obj, dict):
- for k,v in obj.iteritems():
- obj[k] = parseNetObject(v)
+ for key, value in obj.iteritems():
+ obj[key] = parse_net_object(value)
- return obj;
+ return obj
-def osrfObjectToXML(obj):
+def to_xml(obj):
""" Returns the XML representation of an internal object."""
chars = []
- __osrfObjectToXML(obj, chars)
+ __to_xml(obj, chars)
return ''.join(chars)
-def __osrfObjectToXML(obj, chars):
+def __to_xml(obj, chars):
""" Turns an internal object into OpenSRF XML """
if obj is None:
chars.append('<number>%f</number>' % obj)
return
- classHint = None
-
- if isinstance(obj, osrfNetworkObject):
+ if isinstance(obj, NetworkObject):
- registry = obj.getRegistry()
- data = obj.getData()
+ registry = obj.get_registry()
+ data = obj.get_data()
hint = saxutils.escape(registry.hint)
- if registry.wireProtocol == 'array':
+ if registry.protocol == 'array':
chars.append("<array class_hint='%s'>" % hint)
- for k in registry.keys:
- __osrfObjectToXML(data.get(k), chars)
+ for key in registry.keys:
+ __to_xml(data.get(key), chars)
chars.append('</array>')
else:
- if registry.wireProtocol == 'hash':
+ if registry.protocol == 'hash':
chars.append("<object class_hint='%s'>" % hint)
- for k,v in data.items():
- chars.append("<element key='%s'>" % saxutils.escape(k))
- __osrfObjectToXML(v, chars)
+ for key, value in data.items():
+ chars.append("<element key='%s'>" % saxutils.escape(key))
+ __to_xml(value, chars)
chars.append('</element>')
chars.append('</object>')
if isinstance(obj, list):
chars.append('<array>')
- for i in obj:
- __osrfObjectToXML(i, chars)
+ for entry in obj:
+ __to_xml(entry, chars)
chars.append('</array>')
return
if isinstance(obj, dict):
chars.append('<object>')
- for k,v in obj.items():
- chars.append("<element key='%s'>" % saxutils.escape(k))
- __osrfObjectToXML(v, chars)
+ for key, value in obj.items():
+ chars.append("<element key='%s'>" % saxutils.escape(key))
+ __to_xml(value, chars)
chars.append('</element>')
chars.append('</object>')
return
if isinstance(obj, bool):
val = 'false'
- if obj: val = 'true'
+ if obj:
+ val = 'true'
chars.append("<boolean value='%s'/>" % val)
return
+def find_object_path(obj, path, idx=None):
+ """Searches an object along the given path for a value to return.
+
+ Path separators can be '/' or '.', '/' is tried first."""
+
+ parts = []
+
+ if re.search('/', path):
+ parts = path.split('/')
+ else:
+ parts = path.split('.')
+
+ for part in parts:
+ try:
+ val = obj[part]
+ except:
+ return None
+ if isinstance(val, str):
+ return val
+ if isinstance(val, list):
+ if idx != None:
+ return val[idx]
+ return val
+ if isinstance(val, dict):
+ obj = val
+ else:
+ return val
+
+ return obj
# GNU General Public License for more details.
# -----------------------------------------------------------------------
-from osrf.json import *
-from osrf.net_obj import *
-from osrf.conf import osrfConfigValue
-from osrf.net import osrfNetworkMessage, osrfGetNetworkHandle
-from osrf.log import *
-from osrf.const import *
-import random, sys, os, time, threading
+import osrf.json
+import osrf.conf
+import osrf.log
+import osrf.net
+import osrf.net_obj
+from osrf.const import OSRF_APP_SESSION_CONNECTED, \
+ OSRF_APP_SESSION_CONNECTING, OSRF_APP_SESSION_DISCONNECTED, \
+ OSRF_MESSAGE_TYPE_CONNECT, OSRF_MESSAGE_TYPE_DISCONNECT, \
+ OSRF_MESSAGE_TYPE_REQUEST
+import osrf.ex
+import random, os, time, threading
# -----------------------------------------------------------------------
# Go ahead and register the common network objects
# -----------------------------------------------------------------------
-osrfNetworkRegisterHint('osrfMessage', ['threadTrace', 'type', 'payload'], 'hash')
-osrfNetworkRegisterHint('osrfMethod', ['method', 'params'], 'hash')
-osrfNetworkRegisterHint('osrfResult', ['status', 'statusCode', 'content'], 'hash')
-osrfNetworkRegisterHint('osrfConnectStatus', ['status','statusCode'], 'hash')
-osrfNetworkRegisterHint('osrfMethodException', ['status', 'statusCode'], 'hash')
+osrf.net_obj.NetworkRegisterHint('osrfMessage', ['threadTrace', 'type', 'payload'], 'hash')
+osrf.net_obj.NetworkRegisterHint('osrfMethod', ['method', 'params'], 'hash')
+osrf.net_obj.NetworkRegisterHint('osrfResult', ['status', 'statusCode', 'content'], 'hash')
+osrf.net_obj.NetworkRegisterHint('osrfConnectStatus', ['status', 'statusCode'], 'hash')
+osrf.net_obj.NetworkRegisterHint('osrfMethodException', ['status', 'statusCode'], 'hash')
-class osrfSession(object):
+class Session(object):
"""Abstract session superclass."""
''' Global cache of in-service sessions '''
- sessionCache = {}
+ session_cache = {}
def __init__(self):
# by default, we're connected to no one
self.state = OSRF_APP_SESSION_DISCONNECTED
+ self.remote_id = None
- def findSession(threadTrace):
- return osrfSession.sessionCache.get(threadTrace)
- findSession = staticmethod(findSession)
+ def find_session(threadTrace):
+ return Session.session_cache.get(threadTrace)
+ find_session = staticmethod(find_session)
def wait(self, timeout=120):
"""Wait up to <timeout> seconds for data to arrive on the network"""
- osrfLogInternal("osrfSession.wait(%d)" % timeout)
- handle = osrfGetNetworkHandle()
+ osrf.log.osrfLogInternal("Session.wait(%d)" % timeout)
+ handle = osrf.net.get_network_handle()
handle.recv(timeout)
def send(self, omessage):
"""Sends an OpenSRF message"""
- netMessage = osrfNetworkMessage(
- to = self.remoteId,
- body = osrfObjectToJSON([omessage]),
+ net_msg = osrf.net.NetworkMessage(
+ recipient = self.remote_id,
+ body = osrf.json.to_json([omessage]),
thread = self.thread )
- handle = osrfGetNetworkHandle()
- handle.send(netMessage)
+ handle = osrf.net.get_network_handle()
+ handle.send(net_msg)
def cleanup(self):
"""Removes the session from the global session cache."""
- del osrfSession.sessionCache[self.thread]
+ del Session.session_cache[self.thread]
-class osrfClientSession(osrfSession):
+class ClientSession(Session):
"""Client session object. Use this to make server requests."""
def __init__(self, service):
# call superclass constructor
- osrfSession.__init__(self)
+ Session.__init__(self)
# the remote service we want to make requests of
self.service = service
# find the remote service handle <router>@<domain>/<service>
- domain = osrfConfigValue('domains.domain', 0)
- router = osrfConfigValue('router_name')
- self.remoteId = "%s@%s/%s" % (router, domain, service)
- self.origRemoteId = self.remoteId
+ domain = osrf.conf.get('domains.domain', 0)
+ router = osrf.conf.get('router_name')
+ self.remote_id = "%s@%s/%s" % (router, domain, service)
+ self.orig_remote_id = self.remote_id
# generate a random message thread
self.thread = "%s%s%s%s" % (os.getpid(),
str(random.randint(100,100000)), str(time.time()),threading.currentThread().getName().lower())
# how many requests this session has taken part in
- self.nextId = 0
+ self.next_id = 0
# cache of request objects
self.requests = {}
# cache this session in the global session cache
- osrfSession.sessionCache[self.thread] = self
+ Session.session_cache[self.thread] = self
- def resetRequestTimeout(self, rid):
- req = self.findRequest(rid)
+ def reset_request_timeout(self, rid):
+ req = self.find_request(rid)
if req:
- req.resetTimeout = True
+ req.reset_timeout = True
def request2(self, method, arr):
def __request(self, method, arr):
"""Builds the request object and sends it."""
if self.state != OSRF_APP_SESSION_CONNECTED:
- self.resetRemoteId()
+ self.reset_remote_id()
- osrfLogDebug("Sending request %s -> %s " % (self.service, method))
- req = osrfRequest(self, self.nextId, method, arr)
- self.requests[str(self.nextId)] = req
- self.nextId += 1
+ osrf.log.logDebug("Sending request %s -> %s " % (self.service, method))
+ req = Request(self, self.next_id, method, arr)
+ self.requests[str(self.next_id)] = req
+ self.next_id += 1
req.send()
return req
# construct and send a CONNECT message
self.send(
- osrfNetworkObject.osrfMessage(
+ osrf.net_obj.NetworkObject.osrfMessage(
{ 'threadTrace' : 0,
'type' : OSRF_MESSAGE_TYPE_CONNECT
}
timeout -= time.time() - start
if self.state != OSRF_APP_SESSION_CONNECTED:
- raise osrfServiceException("Unable to connect to " + self.service)
+ raise osrf.ex.OSRFServiceException("Unable to connect to " + self.service)
return True
return True
self.send(
- osrfNetworkObject.osrfMessage(
+ osrf.net_obj.NetworkObject.osrfMessage(
{ 'threadTrace' : 0,
'type' : OSRF_MESSAGE_TYPE_DISCONNECT
}
self.state = OSRF_APP_SESSION_DISCONNECTED
-
-
- def setRemoteId(self, remoteid):
- self.remoteId = remoteid
- osrfLogInternal("Setting request remote ID to %s" % self.remoteId)
+ def set_remote_id(self, remoteid):
+ self.remote_id = remoteid
+ osrf.log.osrfLogInternal("Setting request remote ID to %s" % self.remote_id)
- def resetRemoteId(self):
+ def reset_remote_id(self):
"""Recovers the original remote id"""
- self.remoteId = self.origRemoteId
- osrfLogInternal("Resetting remote ID to %s" % self.remoteId)
+ self.remote_id = self.orig_remote_id
+ osrf.log.osrfLogInternal("Resetting remote ID to %s" % self.remote_id)
- def pushResponseQueue(self, message):
+ def push_response_queue(self, message):
"""Pushes the message payload onto the response queue
for the request associated with the message's ID."""
- osrfLogDebug("pushing %s" % message.payload())
+ osrf.log.logDebug("pushing %s" % message.payload())
try:
- self.findRequest(message.threadTrace()).pushResponse(message.payload())
+ self.find_request(message.threadTrace()).pushResponse(message.payload())
except Exception, e:
- osrfLogWarn("pushing respond to non-existent request %s : %s" % (message.threadTrace(), e))
+ osrf.log.osrfLogWarn("pushing respond to non-existent request %s : %s" % (message.threadTrace(), e))
- def findRequest(self, rid):
+ def find_request(self, rid):
"""Returns the original request matching this message's threadTrace."""
try:
return self.requests[str(rid)]
except KeyError:
- osrfLogDebug('findRequest(): non-existent request %s' % str(rid))
+ osrf.log.logDebug('find_request(): non-existent request %s' % str(rid))
return None
-class osrfRequest(object):
+class Request(object):
"""Represents a single OpenSRF request.
A request is made and any resulting respones are
collected for the client."""
- def __init__(self, session, id, method=None, params=[]):
+ def __init__(self, session, rid, method=None, params=[]):
self.session = session # my session handle
- self.id = id # my unique request ID
+ self.rid = rid # my unique request ID
self.method = method # method name
self.params = params # my method params
self.queue = [] # response queue
- self.resetTimeout = False # resets the recv timeout?
+ self.reset_timeout = False # resets the recv timeout?
self.complete = False # has the server told us this request is done?
- self.sendTime = 0 # local time the request was put on the wire
- self.completeTime = 0 # time the server told us the request was completed
- self.firstResponseTime = 0 # time it took for our first reponse to be received
+ self.send_time = 0 # local time the request was put on the wire
+ self.complete_time = 0 # time the server told us the request was completed
+ self.first_response_time = 0 # time it took for our first reponse to be received
def send(self):
"""Sends a request message"""
# construct the method object message with params and method name
- method = osrfNetworkObject.osrfMethod( {
+ method = osrf.net_obj.NetworkObject.osrfMethod( {
'method' : self.method,
'params' : self.params
} )
# construct the osrf message with our method message embedded
- message = osrfNetworkObject.osrfMessage( {
- 'threadTrace' : self.id,
+ message = osrf.net_obj.NetworkObject.osrfMessage( {
+ 'threadTrace' : self.rid,
'type' : OSRF_MESSAGE_TYPE_REQUEST,
'payload' : method
} )
- self.sendTime = time.time()
+ self.send_time = time.time()
self.session.send(message)
def recv(self, timeout=120):
self.session.wait(0)
- origTimeout = timeout
+ orig_timeout = timeout
while not self.complete and timeout >= 0 and len(self.queue) == 0:
s = time.time()
self.session.wait(timeout)
timeout -= time.time() - s
- if self.resetTimeout:
- self.resetTimeout = False
- timeout = origTimeout
+ if self.reset_timeout:
+ self.reset_timeout = False
+ timeout = orig_timeout
now = time.time()
# -----------------------------------------------------------------
# log some statistics
if len(self.queue) > 0:
- if not self.firstResponseTime:
- self.firstResponseTime = now
- osrfLogDebug("time elapsed before first response: %f" \
- % (self.firstResponseTime - self.sendTime))
+ if not self.first_response_time:
+ self.first_response_time = now
+ osrf.log.logDebug("time elapsed before first response: %f" \
+ % (self.first_response_time - self.send_time))
if self.complete:
- if not self.completeTime:
- self.completeTime = now
- osrfLogDebug("time elapsed before complete: %f" \
- % (self.completeTime - self.sendTime))
+ if not self.complete_time:
+ self.complete_time = now
+ osrf.log.logDebug("time elapsed before complete: %f" \
+ % (self.complete_time - self.send_time))
# -----------------------------------------------------------------
"""Cleans up request data from the cache.
Do this when you are done with a request to prevent "leaked" cache memory."""
- del self.session.requests[str(self.id)]
+ del self.session.requests[str(self.rid)]
- def setComplete(self):
+ def set_complete(self):
"""Sets me as complete. This means the server has sent a 'request complete' message"""
self.complete = True
-class osrfServerSession(osrfSession):
+class ServerSession(Session):
"""Implements a server-side session"""
pass
-def osrfAtomicRequest(service, method, *args):
- ses = osrfClientSession(service)
+def AtomicRequest(service, method, *args):
+ ses = ClientSession(service)
req = ses.request2(method, list(args))
resp = req.recv()
data = resp.content()
# GNU General Public License for more details.
# -----------------------------------------------------------------------
-from osrf.utils import *
-from osrf.const import *
-from osrf.ex import *
+from osrf.const import OSRF_APP_SETTINGS, OSRF_METHOD_GET_HOST_CONFIG
+import osrf.ex
+import osrf.net_obj
# global settings config object
__config = None
-def osrfSettingsValue(path, idx=0):
- global __config
- val = osrfObjectFindPath(__config, path, idx)
- if not val:
- raise osrfConfigException("Config value not found: " + path)
- return val
+def get(path, idx=0):
+ global __config
+ val = osrf.net_obj.find_object_path(__config, path, idx)
+ if not val:
+ raise osrf.ex.OSRFConfigException("Config value not found: " + path)
+ return val
-def osrfLoadSettings(hostname):
- global __config
+def load(hostname):
+ global __config
- from osrf.system import osrfConnect
- from osrf.ses import osrfClientSession
+ from osrf.system import connect
+ from osrf.ses import ClientSession
- ses = osrfClientSession(OSRF_APP_SETTINGS)
- req = ses.request(OSRF_METHOD_GET_HOST_CONFIG, hostname)
- resp = req.recv(timeout=30)
- __config = resp.content()
- req.cleanup()
- ses.cleanup()
+ ses = ClientSession(OSRF_APP_SETTINGS)
+ req = ses.request(OSRF_METHOD_GET_HOST_CONFIG, hostname)
+ resp = req.recv(timeout=30)
+ __config = resp.content()
+ req.cleanup()
+ ses.cleanup()
# GNU General Public License for more details.
# -----------------------------------------------------------------------
-from osrf.json import *
-from osrf.log import *
-from osrf.ex import *
-from osrf.ses import osrfSession, osrfClientSession, osrfServerSession
-from osrf.const import *
-from time import time
+import osrf.json
+import osrf.log
+import osrf.ex
+import osrf.ses
+from osrf.const import OSRF_APP_SESSION_CONNECTED, \
+ OSRF_APP_SESSION_DISCONNECTED, OSRF_MESSAGE_TYPE_RESULT, \
+ OSRF_MESSAGE_TYPE_STATUS, OSRF_STATUS_COMPLETE, OSRF_STATUS_CONTINUE, \
+ OSRF_STATUS_NOTFOUND, OSRF_STATUS_OK, OSRF_STATUS_TIMEOUT
+import time
-def osrfPushStack(netMessage):
- ses = osrfSession.findSession(netMessage.thread)
+def push(net_msg):
+ ses = osrf.ses.Session.find_session(net_msg.thread)
if not ses:
# This is an incoming request from a client, create a new server session
- osrfLogErr("server-side sessions don't exist yet")
- pass
+ osrf.log.logError("server-side sessions don't exist yet")
- ses.setRemoteId(netMessage.sender)
+ ses.set_remote_id(net_msg.sender)
- oMessages = osrfJSONToObject(netMessage.body)
+ omessages = osrf.json.to_object(net_msg.body)
- osrfLogInternal("osrfPushStack(): received %d messages" % len(oMessages))
+ osrf.log.osrfLogInternal("push(): received %d messages" \
+ % len(omessages))
# Pass each bundled opensrf message to the message handler
- t = time()
- for m in oMessages:
- osrfHandleMessage(ses, m)
- t = time() - t
+ start = time.time()
+ for msg in omessages:
+ handle_message(ses, msg)
+ duration = time.time() - start
- if isinstance(ses, osrfServerSession):
- osrfLogInfo("Message processing duration %f" % t)
+ if isinstance(ses, osrf.ses.ServerSession):
+ osrf.log.osrfLogInfo("Message processing duration %f" % duration)
-def osrfHandleMessage(session, message):
+def handle_message(session, message):
- osrfLogInternal("osrfHandleMessage(): processing message of type %s" % message.type())
+ osrf.log.osrfLogInternal("handle_message(): processing message of "
+ "type %s" % message.type())
- if isinstance(session, osrfClientSession):
+ if isinstance(session, osrf.ses.ClientSession):
if message.type() == OSRF_MESSAGE_TYPE_RESULT:
- session.pushResponseQueue(message)
+ session.push_response_queue(message)
return
if message.type() == OSRF_MESSAGE_TYPE_STATUS:
- statusCode = int(message.payload().statusCode())
- statusText = message.payload().status()
- osrfLogInternal("osrfHandleMessage(): processing STATUS, statusCode = %d" % statusCode)
+ status_code = int(message.payload().statusCode())
+ status_text = message.payload().status()
+ osrf.log.osrfLogInternal("handle_message(): processing STATUS, "
+ "status_code = %d" % status_code)
- if statusCode == OSRF_STATUS_COMPLETE:
+ if status_code == OSRF_STATUS_COMPLETE:
# The server has informed us that this request is complete
- req = session.findRequest(message.threadTrace())
+ req = session.find_request(message.threadTrace())
if req:
- osrfLogInternal("marking request as complete: %d" % req.id)
- req.setComplete()
+ osrf.log.osrfLogInternal("marking request as complete: %d" % req.rid)
+ req.set_complete()
return
- if statusCode == OSRF_STATUS_OK:
+ if status_code == OSRF_STATUS_OK:
# We have connected successfully
- osrfLogDebug("Successfully connected to " + session.service)
+ osrf.log.logDebug("Successfully connected to " + session.service)
session.state = OSRF_APP_SESSION_CONNECTED
return
- if statusCode == OSRF_STATUS_CONTINUE:
+ if status_code == OSRF_STATUS_CONTINUE:
# server is telling us to reset our wait timeout and keep waiting for a response
- session.resetRequestTimeout(message.threadTrace())
- return;
+ session.reset_request_timeout(message.threadTrace())
+ return
- if statusCode == OSRF_STATUS_TIMEOUT:
- osrfLogDebug("The server did not receive a request from us in time...")
+ if status_code == OSRF_STATUS_TIMEOUT:
+ osrf.log.logDebug("The server did not receive a request from us in time...")
session.state = OSRF_APP_SESSION_DISCONNECTED
return
- if statusCode == OSRF_STATUS_NOTFOUND:
- osrfLogErr("Requested method was not found on the server: %s" % statusText)
+ if status_code == OSRF_STATUS_NOTFOUND:
+ osrf.log.logError("Requested method was not found on the server: %s" % status_text)
session.state = OSRF_APP_SESSION_DISCONNECTED
- raise osrfServiceException(statusText)
+ raise osrf.ex.OSRFServiceException(status_text)
- raise osrfProtocolException("Unknown message status: %d" % statusCode)
+ raise osrf.ex.OSRFProtocolException("Unknown message status: %d" % status_code)
# GNU General Public License for more details.
# -----------------------------------------------------------------------
-from osrf.conf import osrfConfig, osrfConfigValue, osrfConfigValueNoEx
-from osrf.net import osrfNetwork, osrfSetNetworkHandle, osrfGetNetworkHandle
-from osrf.stack import osrfPushStack
-from osrf.log import *
-from osrf.set import osrfLoadSettings
+from osrf.conf import Config, get, get_no_ex
+from osrf.net import Network, set_network_handle, get_network_handle
+import osrf.stack
+import osrf.log
+import osrf.set
import sys
-def osrfConnect(configFile, configContext):
+def connect(configFile, configContext):
""" Connects to the opensrf network """
- if osrfGetNetworkHandle():
+ if get_network_handle():
''' This thread already has a handle '''
return
# parse the config file
- configParser = osrfConfig(configFile, configContext)
+ configParser = Config(configFile, configContext)
configParser.parseConfig()
# set up logging
- osrfInitLog(
- osrfConfigValue('loglevel'),
- osrfConfigValueNoEx('syslog'),
- osrfConfigValueNoEx('logfile'))
+ osrf.log.initialize(
+ osrf.conf.get('loglevel'),
+ osrf.conf.get_no_ex('syslog'),
+ osrf.conf.get_no_ex('logfile'))
# connect to the opensrf network
- network = osrfNetwork(
- host=osrfConfigValue('domains.domain'),
- port=osrfConfigValue('port'),
- username=osrfConfigValue('username'),
- password=osrfConfigValue('passwd'))
- network.setRecvCallback(osrfPushStack)
- osrfSetNetworkHandle(network)
+ network = Network(
+ host = osrf.conf.get('domains.domain'),
+ port = osrf.conf.get('port'),
+ username = osrf.conf.get('username'),
+ password = osrf.conf.get('passwd'))
+ network.set_receive_callback(osrf.stack.push)
+ osrf.net.set_network_handle(network)
network.connect()
# load the domain-wide settings file
- osrfLoadSettings(osrfConfigValue('domains.domain'))
-
-
-
-
+ osrf.set.load(osrf.conf.get('domains.domain'))
-import xml.dom.minidom, re
+import xml.dom.minidom
-def osrfXMLFileToObject(filename):
+def xml_file_to_object(filename):
"""Turns the contents of an XML file into a Python object"""
doc = xml.dom.minidom.parse(filename)
- obj = osrfXMLNodeToObject(doc.documentElement)
+ obj = xml_node_to_object(doc.documentElement)
doc.unlink()
return obj
-def osrfXMLStringToObject(string):
+def xml_string_to_object(string):
"""Turns an XML string into a Python object"""
doc = xml.dom.minidom.parseString(string)
- obj = osrfXMLNodeToObject(doc.documentElement)
+ obj = xml_node_to_object(doc.documentElement)
doc.unlink()
return obj
-def osrfXMLNodeToObject(xmlNode):
+def xml_node_to_object(xml_node):
"""Turns an XML node into a Python object"""
obj = {}
- if xmlNode.nodeType != xmlNode.ELEMENT_NODE:
+ if xml_node.nodeType != xml_node.ELEMENT_NODE:
return obj
done = False
- nodeName = xmlNode.nodeName
+ node_name = xml_node.nodeName
- for nodeChild in xmlNode.childNodes:
- if nodeChild.nodeType == xmlNode.ELEMENT_NODE:
- subObj = osrfXMLNodeToObject(nodeChild);
- __appendChildNode(obj, nodeName, nodeChild.nodeName, subObj)
+ for node_child in xml_node.childNodes:
+ if node_child.nodeType == xml_node.ELEMENT_NODE:
+ sub_obj = xml_node_to_object(node_child)
+ __append_child_node(obj, node_name, node_child.nodeName, sub_obj)
done = True
- for attr in xmlNode.attributes.values():
- __appendChildNode(obj, nodeName, attr.name, dict([(attr.name, attr.value)]))
+ for attr in xml_node.attributes.values():
+ __append_child_node(obj, node_name, attr.name,
+ dict([(attr.name, attr.value)]))
- if not done and len(xmlNode.childNodes) > 0:
+ if not done and len(xml_node.childNodes) > 0:
# If the node has no element children, clean up the text
# content and use that as the data
- textNode = xmlNode.childNodes[0] # extract the text node
- data = unicode(textNode.nodeValue).replace('^\s*','')
+ text_node = xml_node.childNodes[0] # extract the text node
+ data = unicode(text_node.nodeValue).replace('^\s*','')
data = data.replace('\s*$','')
- if nodeName in obj:
+ if node_name in obj:
# the current element contains attributes and text
- obj[nodeName]['#text'] = data
+ obj[node_name]['#text'] = data
else:
# the current element contains text only
- obj[nodeName] = data
+ obj[node_name] = data
return obj
-def __appendChildNode(obj, nodeName, childName, subObj):
+def __append_child_node(obj, node_name, child_name, sub_obj):
""" If a node has element children, create a new sub-object
for this node, attach an array for each type of child
and recursively collect the children data into the array(s) """
- if not obj.has_key(nodeName):
- obj[nodeName] = {}
+ if not obj.has_key(node_name):
+ obj[node_name] = {}
- if not obj[nodeName].has_key(childName):
- # we've encountered 1 sub-node with nodeChild's name
- if childName in subObj:
- obj[nodeName][childName] = subObj[childName]
+ if not obj[node_name].has_key(child_name):
+ # we've encountered 1 sub-node with node_child's name
+ if child_name in sub_obj:
+ obj[node_name][child_name] = sub_obj[child_name]
else:
- obj[nodeName][childName] = None
+ obj[node_name][child_name] = None
else:
- if isinstance(obj[nodeName][childName], list):
- # we already have multiple sub-nodes with nodeChild's name
- obj[nodeName][childName].append(subObj[childName])
+ if isinstance(obj[node_name][child_name], list):
+ # we already have multiple sub-nodes with node_child's name
+ obj[node_name][child_name].append(sub_obj[child_name])
else:
- # we already have 1 sub-node with nodeChild's name, make
+ # we already have 1 sub-node with node_child's name, make
# it a list and append the current node
- val = obj[nodeName][childName]
- obj[nodeName][childName] = [ val, subObj[childName] ]
+ val = obj[node_name][child_name]
+ obj[node_name][child_name] = [ val, sub_obj[child_name] ]
-def osrfObjectFindPath(obj, path, idx=None):
- """Searches an object along the given path for a value to return.
-
- Path separaters can be '/' or '.', '/' is tried first."""
-
- parts = []
-
- if re.search('/', path):
- parts = path.split('/')
- else:
- parts = path.split('.')
-
- for part in parts:
- try:
- o = obj[part]
- except Exception:
- return None
- if isinstance(o,str):
- return o
- if isinstance(o,list):
- if( idx != None ):
- return o[idx]
- return o
- if isinstance(o,dict):
- obj = o
- else:
- return o
-
- return obj
-
-
-
-
# vim:et:ts=4
import os, sys, time, readline, atexit, re
import osrf.json
-from osrf.system import osrfConnect
-from osrf.ses import osrfClientSession
-from osrf.conf import osrfConfigValue
+import osrf.system
+import osrf.ses
+import osrf.conf
# -------------------------------------------------------------------
# Set env variables to control behavior
# -------------------------------------------------------------------
def handle_set(parts):
- m = re.compile('(.*)=(.*)').match(parts[0])
- key = m.group(1)
- val = m.group(2)
+ pattern = re.compile('(.*)=(.*)').match(parts[0])
+ key = pattern.group(1)
+ val = pattern.group(2)
set_var(key, val)
print "%s = %s" % (key, val)
def handle_request(parts):
service = parts.pop(0)
method = parts.pop(0)
- jstr = '[%s]' % join(parts)
+ jstr = '[%s]' % "".join(parts)
params = None
try:
- params = osrf.json.osrfJSONToObject(jstr)
+ params = osrf.json.to_object(jstr)
except:
print "Error parsing JSON: %s" % jstr
return
- ses = osrfClientSession(service)
+ ses = osrf.ses.ClientSession(service)
end = None
start = time.time()
resp = req.recv(timeout=120)
if not end:
total = time.time() - start
- if not resp: break
+ if not resp:
+ break
otp = get_var('SRFSH_OUTPUT')
if otp == 'pretty':
- print "\n" + osrf.json.osrfDebugNetworkObject(resp.content())
+ print "\n" + osrf.json.debug_net_object(resp.content())
else:
- print osrf.json.osrfFormatJSON(osrfObjectToJSON(resp.content()))
+ print osrf.json.pprint(osrf.json.to_json(resp.content()))
req.cleanup()
ses.cleanup()
def handle_math_bench(parts):
count = int(parts.pop(0))
- ses = osrfClientSession('opensrf.math')
+ ses = osrf.ses.ClientSession('opensrf.math')
times = []
- for i in range(100):
- if i % 10: sys.stdout.write('.')
- else: sys.stdout.write( str( i / 10 ) )
- print "";
+ for cnt in range(100):
+ if cnt % 10:
+ sys.stdout.write('.')
+ else:
+ sys.stdout.write( str( cnt / 10 ) )
+ print ""
- for i in range(count):
+ for cnt in range(count):
starttime = time.time()
req = ses.request('add', 1, 2)
print "What happened? %s" % str(resp.content())
req.cleanup()
- if not ( (i+1) % 100):
- print ' [%d]' % (i+1)
+ if not ( (cnt + 1) % 100):
+ print ' [%d]' % (cnt + 1)
ses.cleanup()
total = 0
- for i in times: total += i
+ for cnt in times:
+ total += cnt
print "\naverage time %f" % (total / len(times))
def do_connect():
file = os.path.join(get_var('HOME'), ".srfsh.xml")
print_green("Connecting to opensrf...")
- osrfConnect(file, 'srfsh')
+ osrf.system.connect(file, 'srfsh')
print_red('OK\n')
def load_plugins():
# Load the user defined external plugins
# XXX Make this a real module interface, with tab-complete words, commands, etc.
try:
- plugins = osrfConfigValue('plugins')
+ plugins = osrf.conf.get('plugins')
except:
# XXX standard srfsh.xml does not yet define <plugins> element
print_red("No plugins defined in /srfsh/plugins/plugin\n")
return
- plugins = osrfConfigValue('plugins.plugin')
+ plugins = osrf.conf.get('plugins.plugin')
if not isinstance(plugins, list):
plugins = [plugins]
def get_var(key):
- try: return os.environ[key]
- except: return ''
+ try:
+ return os.environ[key]
+ except:
+ return ''
def print_green(string):