@srcdir@/src/ports/strn_compat/strnlen.c \
@srcdir@/src/ports/strn_compat/strnlen.h
-python_FILES = @srcdir@/src/python/opensrf.py \
- @srcdir@/src/python/setup.py \
- @srcdir@/src/python/srfsh.py \
- @srcdir@/src/python/osrf
-
java_FILES = @srcdir@/src/java/deps.inc \
@srcdir@/src/java/deps.sh \
@srcdir@/src/java/org
@srcdir@/src/libopensrf/osrfConfig.c
-EXTRA_DIST = $(DOC_FILES) $(EXAMPLES_FILES) $(libosrf_FILES) $(strn_compat_FILES) $(python_FILES) $(java_FILES) @srcdir@/autogen.sh @srcdir@/src/extras @srcdir@/COPYING @srcdir@/DCO-1.1.txt @srcdir@/LICENSE.txt @srcdir@/src/perl @srcdir@/src/javascript
+EXTRA_DIST = $(DOC_FILES) $(EXAMPLES_FILES) $(libosrf_FILES) $(strn_compat_FILES) $(java_FILES) @srcdir@/autogen.sh @srcdir@/src/extras @srcdir@/COPYING @srcdir@/DCO-1.1.txt @srcdir@/LICENSE.txt @srcdir@/src/perl @srcdir@/src/javascript
OSRFINC=@srcdir@/include/opensrf
---------------------------------------------------------------------------
By default, OpenSRF includes C, Perl, and JavaScript support.
-You can add the `--enable-python` option to the configure command
-to build Python support and `--enable-java` for Java support.
+You can add the `--enable-java` for Java support.
If you are planning on proxying WebSockets traffic (see below), you
can add `--with-websockets-port=443` to specify that WebSockets traffic
/etc/init.d/haproxy start
---------------------------------------------------------------------------
-Troubleshooting note for Python users
--------------------------------------
-
-If you are running a Python client and trying to connect to OpenSRF running on
-localhost rather than a hostname that can be resolved via DNS, you will
-probably receive exceptions about `dns.resolver.NXDOMAIN`. If this happens,
-you need to install the `dnsmasq` package, configure it to serve up a DNS
-entry for localhost, and point your local DNS resolver to `dnsmasq`. For example,
-on Ubuntu you can issue the following commands as the *root* Linux account:
-
-.Installing and starting `dnsmasq`
-[source, bash]
----------------------------------------------------------------------------
-aptitude install dnsmasq
-/etc/init.d/dnsmasq restart
----------------------------------------------------------------------------
-
-Then edit `/etc/resolv.conf` and ensure that `nameserver 127.0.0.1` is the
-first entry in the file.
-
Getting help
------------
if ($svc->{lang} =~ /c/i) {
system("$C_COMMAND -a start -s $service");
return;
- } elsif ($svc->{lang} =~ /python/i) {
- system("$PY_COMMAND -a start -s $service");
- return;
}
}
}
showInstalled() {
JAVA=@OSRF_INSTALL_JAVA@
- PYTHON=@OSRF_INSTALL_PYTHON@
if test "$JAVA" = "true"; then
echo "OSRF_JAVA"
fi
- if test "$PYTHON" = "true"; then
- echo "OSRF_PYTHON"
- fi
}
showAll() {
prefix=$ac_default_prefix
fi
-# Perl and Python scripts don't want ${prefix} if no value was specified
+# Perl scripts don't want ${prefix} if no value was specified
eval "eval CONF_DIR=$sysconfdir"
eval "eval PID_DIR=$localstatedir"
AC_SUBST([CONF_DIR])
AC_SUBST(prefix)
AC_SUBST(bindir)
-
-AC_DEFUN([AC_PYTHON_MOD],[
- if test -z $PYTHON;
- then
- PYTHON="python"
- fi
- AC_MSG_CHECKING($PYTHON_NAME module: $1)
- $PYTHON -c "import $1" 2>/dev/null
- if test $? -eq 0;
- then
- AC_MSG_RESULT(yes)
- eval AS_TR_CPP(HAVE_PYMOD_$1)=yes
- else
- AC_MSG_ERROR(failed to find required module $1)
- exit 1
- fi
-])
-
#-------------------------------
# Installation options
#-------------------------------
AM_CONDITIONAL([BUILDCORE], [test x$OSRF_INSTALL_CORE = xtrue])
AC_SUBST([OSRF_INSTALL_CORE])
-
-# build and install the python modules
-AC_ARG_ENABLE([python],
-[ --enable-python enable building and installing python modules],
-[case "${enableval}" in
- yes) OSRF_INSTALL_PYTHON=true ;;
- no) OSRF_INSTALL_PYTHON=false ;;
- *) AC_MSG_ERROR([please choose another value for --enable-python (supported values are yes or no)]) ;;
-esac],
-[OSRF_INSTALL_PYTHON=false])
-
-AM_CONDITIONAL([BUILDPYTHON], [test x$OSRF_INSTALL_PYTHON = xtrue])
-AC_SUBST([OSRF_INSTALL_PYTHON])
-
# enable debug?
AC_ARG_ENABLE(debug,
AC_CONFIG_FILES([Makefile
src/Makefile])
-#PYTHON TESTS
-if test x$OSRF_INSTALL_PYTHON = xtrue; then
- AC_CHECK_PROG([HAVE_PYTHON],python,yes,no)
- if test $HAVE_PYTHON = "no"; then
- AC_MSG_ERROR([*** python not found, aborting])
- fi
- AC_PYTHON_MOD([setuptools])
- AC_CONFIG_FILES([
- examples/math_client.py
- examples/simple_text.py
- src/python/Makefile
- ])
-fi
# Check Unit test framework
PKG_CHECK_MODULES([CHECK], [check >= 0.9.0], [enable_tests=yes],
[enable_tests=no])
src/libopensrf/Makefile
src/perl/Makefile
src/ports/strn_compat/Makefile
- src/python/opensrf.py
src/router/Makefile
src/srfsh/Makefile
src/websocket-stdio/Makefile
AC_MSG_RESULT([OSRF install Java support? no])
fi
-if test "$OSRF_INSTALL_PYTHON" = "true" ; then
- AC_MSG_RESULT([OSRF install Python support? yes])
-else
- AC_MSG_RESULT([OSRF install Python support? no])
-fi
AC_MSG_RESULT(Installation directory prefix: ${prefix})
AC_MSG_RESULT(Temporary directory: ${TMP})
+++ /dev/null
-#!/usr/bin/env python
-import osrf.system
-import osrf.ses
-
-# XXX: Replace with command line arguments
-file = '@CONF_DIR@/opensrf_core.xml'
-operator = 'add'
-operand1 = 5
-operand2 = 7
-
-# Pull connection settings from <config><opensrf> section of opensrf_core.xml
-osrf.system.System.connect(config_file=file, config_context='config.opensrf')
-
-# Set up a connection to the opensrf.math service
-session = osrf.ses.ClientSession('opensrf.math')
-
-# Call one of the methods defined by the opensrf.math service
-request = session.request(operator, operand1, operand2)
-
-# Retrieve the response from the method
-response = request.recv(timeout=2)
-
-print(response.content())
-
-# Cleanup request and connection resources
-request.cleanup()
-session.cleanup()
+++ /dev/null
-#!/usr/bin/env python
-import osrf.system
-import osrf.ses
-
-# XXX: Replace with command line arguments
-file = '@CONF_DIR@/conf/opensrf_core.xml'
-method = 'opensrf.simple-text.reverse'
-text = 'raboof'
-
-# Pull connection settings from <config><opensrf> section of opensrf_core.xml
-osrf.system.System.connect(config_file=file, config_context='config.opensrf')
-
-# Set up a connection to the opensrf.math service
-session = osrf.ses.ClientSession('opensrf.simple-text')
-
-# Call one of the methods defined by the opensrf.math service
-request = session.request(method, text)
-
-# Retrieve the response from the method
-response = request.recv(timeout=2)
-
-print(response.content())
-
-# Cleanup request and connection resources
-request.cleanup()
-session.cleanup()
AM_LDFLAGS = $(DEF_LDFLAGS)
AM_CFLAGS = $(DEF_CFLAGS)
-if BUILDPYTHON
-MAYBE_PY = python
-endif
-
if BUILDJAVA
MAYBE_JA = java
endif
if BUILDCORE
MAYBE_CORE = libopensrf c-apps router srfsh gateway perl websocket-stdio
-if BUILDPYTHON
-dist_bin_SCRIPTS = @top_srcdir@/bin/opensrf-perl.pl @top_srcdir@/src/python/opensrf.py @top_srcdir@/src/python/srfsh.py
-else
dist_bin_SCRIPTS = @top_srcdir@/bin/opensrf-perl.pl
-endif
bin_SCRIPTS = @top_srcdir@/bin/osrf_config
dist_sysconf_DATA = @top_srcdir@/examples/opensrf.xml.example @top_srcdir@/examples/opensrf_core.xml.example @top_srcdir@/examples/srfsh.xml.example
endif
libxslt1-dev\
memcached\
pkg-config\
- python-coverage\
psmisc\
- python-dev\
- python-libxml2\
- python-memcache\
- python-nose\
- python-pyxmpp\
- python-setuptools\
- python-simplejson\
tar\
unzip\
zip\
libxml-libxml-perl \
libxml-libxslt-perl \
libxml2-devel \
- libxml2-python \
libxslt-devel \
make \
memcached \
perl-XML-LibXSLT \
perl-XML-Simple \
psmisc \
- python-devel \
- python-dns \
- python-memcached \
- python-setuptools \
- python-simplejson \
readline-devel \
tar
+++ /dev/null
-# makefile for OpenSRF Python modules and scripts
-
-DISTCLEANFILES = Makefile.in Makefile
-
-check:
- nosetests --with-coverage --cover-package=osrf
-
-all-local:
- @echo $@
- python @srcdir@/setup.py build
-
-# ------------------------------------------------------------------------------
-# INSTALL
-# ------------------------------------------------------------------------------
-install-data-local:
- @echo $@
- python @srcdir@/setup.py install --root $(DESTDIR)///
-
-distclean-local:
- rm -f @builddir@/OpenSRF.egg-info/PKG-INFO
- rm -f @builddir@/OpenSRF.egg-info/requires.txt
- rm -f @builddir@/OpenSRF.egg-info/top_level.txt
- rm -f @builddir@/OpenSRF.egg-info/SOURCES.txt
- rm -f @builddir@/OpenSRF.egg-info/dependency_links.txt
- rm -f @builddir@/build/scripts-2.5/srfsh.py
- rm -f @builddir@/dist/OpenSRF-1.0.0-py2.5.egg
-
+++ /dev/null
-#!/usr/bin/python
-# -----------------------------------------------------------------------
-# Copyright (C) 2008 Equinox Software, Inc.
-# Bill Erickson <erickson@esilibrary.com>
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
-# 02110-1301, USA
-# -----------------------------------------------------------------------
-
-'''
-Provides an environment for managing OpenSRF services written in Python
-'''
-
-import sys, getopt, os, signal
-import osrf.system, osrf.server, osrf.app, osrf.set, osrf.json
-import dns.resolver
-
-def do_help():
- '''
- Print help for the OpenSRF Python application process manager
- '''
-
- print '''
- Manage OpenSRF application processes
-
- Options:
- -a <action>
- list_all -- List all services
- start -- Start a service
- stop -- stop a service
- restart -- restart a service
- start_all -- Start all services
- stop_all -- Stop all services
- restart_all -- Restart all services
-
- -s <service>
- The service name
-
- -f <config file>
- The OpenSRF config file
-
- -c <config context>
- The OpenSRF config file context
-
- -p <PID dir>
- The location of application PID files. Default is @PID_DIR@/run/opensrf
-
- -d
- If set, run in daemon (background) mode. This creates a PID
- file for managing the process.
-
- -l
- If set, run in 'localhost' mode
-
- -h
- Prints help message
- '''
- sys.exit(0)
-
-def get_pid_file(service):
- '''
- Return the PID file for the named service
- '''
-
- return "%s/%s.pid" % (pid_dir, service)
-
-def do_init():
- '''
- Initialize the Python service environment
- '''
-
- global domain
- global settings
-
- try:
- # connect to the OpenSRF network
- osrf.system.System.net_connect(
- config_file = config_file, config_context = config_ctx)
- except dns.resolver.NXDOMAIN:
- dnsfail = """
-ERROR: Could not initialize the OpenSRF Python environment. A DNS query
-failed to resolve the network address of the XMPP server.
-"""
- sys.exit(dnsfail)
-
- if as_localhost:
- domain = 'localhost'
- else:
- domain = osrf.conf.get('domain')
-
- try:
- osrf.set.load(domain)
- except osrf.net.XMPPNoRecipient:
- print "* Unable to communicate with opensrf.settings. Giving up..."
- return
-
- settings = osrf.set.get('apps')
- activeapps = osrf.set.get('activeapps')
-
- for key in (set(settings.keys()) & set(activeapps['appname'])):
- svc = settings[key]
- if isinstance(svc, dict) and 'language' in svc and svc['language'] == 'python':
- services[key] = svc
-
-def do_start(service):
- '''
- Start the named Python service
- '''
-
- pidfile = get_pid_file(service)
-
- if service not in services:
- print "* service %s is not a 'python' application" % service
- return
-
- if os.path.exists(pidfile):
- try:
- pid_fd = open(pidfile, 'r')
- alive = os.getsid(int(pid_fd.read()))
- print "* service %s already running" % service
- return
- except OSError:
- os.remove(pidfile)
-
- print "* starting %s" % service
-
- if as_daemon:
-
- if osrf.system.System.daemonize(False):
- return # parent process returns
-
- # write PID file
- pid_fd = open(pidfile, 'w')
- pid_fd.write(str(os.getpid()))
- pid_fd.close()
-
- svc_settings = services[service]
-
- osrf.app.Application.load(service, svc_settings['implementation'])
- osrf.app.Application.register_sysmethods()
- osrf.app.Application.application.global_init()
-
- controller = osrf.server.Controller(service)
- controller.max_requests = svc_settings['unix_config']['max_requests']
- controller.max_children = svc_settings['unix_config']['max_children']
- controller.min_children = svc_settings['unix_config']['min_children']
- controller.keepalive = svc_settings['keepalive']
-
- controller.run()
- os._exit(0)
-
-def do_list_all():
- '''
- List all Python services listed in the OpenSRF configuration file
- '''
- for service in services.keys():
- print service
-
-def do_start_all():
- '''
- Start all Python services listed in the OpenSRF configuration file
- '''
-
- # You can't start more than one service without daemonizing
- global as_daemon
- as_daemon = True
-
- print "* starting all services for %s " % domain
- for service in services.keys():
- do_start(service)
-
-def do_stop_all():
- '''
- Stop all Python services listed in the OpenSRF configuration file
- '''
-
- print "* stopping all services for %s " % domain
- for service in services.keys():
- do_stop(service)
-
-def do_stop(service):
- '''
- Stop the named Python service
- '''
-
- pidfile = get_pid_file(service)
-
- if not os.path.exists(pidfile):
- print "* %s is not running" % service
- return
-
- print "* stopping %s" % service
-
- pid_fd = open(pidfile)
- pid = pid_fd.read()
- pid_fd.close()
- try:
- os.kill(int(pid), signal.SIGTERM)
- except:
- pass
- os.remove(pidfile)
-
-# -----------------------------------------------------
-
-# Parse the command line options
-ops, args = None, None
-try:
- ops, args = getopt.getopt(sys.argv[1:], 'a:s:f:c:p:dhl')
-except getopt.GetoptError, e:
- print '* %s' % str(e)
- do_help()
-
-options = dict(ops)
-
-if '-a' not in options:
- do_help()
-
-action = options['-a']
-
-config_file = options.get('-f', '@CONF_DIR@/opensrf_core.xml')
-pid_dir = options.get('-p', '@PID_DIR@/run/opensrf')
-
-service_name = options.get('-s')
-config_ctx = options.get('-c', 'config.opensrf')
-as_localhost = '-l' in options
-as_daemon = '-d' in options
-
-domain = None
-settings = None
-services = {}
-
-do_init()
-
-if action == 'start':
- do_start(service_name)
-
-elif action == 'stop':
- do_stop(service_name)
-
-elif action == 'restart':
- do_stop(service_name)
- do_start(service_name)
-
-elif action == 'list_all':
- do_list_all()
-
-elif action == 'start_all':
- do_start_all()
-
-elif action == 'stop_all':
- do_stop_all()
-
-elif action == 'restart_all':
- do_stop_all()
- do_start_all()
-
-elif action == 'help':
- do_help()
+++ /dev/null
-# -----------------------------------------------------------------------
-# Copyright (C) 2008 Equinox Software, Inc.
-# Bill Erickson <erickson@esilibrary.com>
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
-# 02110-1301, USA
-# -----------------------------------------------------------------------
-
-import time
-import osrf.log, osrf.ses, osrf.json
-
-
-class Method(object):
- def __init__(self, **kwargs):
- self.name = kwargs['api_name']
- self.handler = kwargs['method']
- self.stream = kwargs.get('stream', False)
- self.argc = kwargs.get('argc', 0)
- self.atomic = kwargs.get('atomic', False)
-
- def get_func(self):
- ''' Returns the function handler reference '''
- return getattr(Application.application, self.handler)
-
- def get_doc(self):
- ''' Returns the function documentation '''
- return self.get_func().func_doc
-
-
-
-class Application(object):
- ''' Base class for OpenSRF applications. Provides static methods
- for loading and registering applications as well as common
- application methods. '''
-
- # global application handle
- application = None
- name = None
- methods = {}
-
- def __init__(self):
- ''' Sets the application name and loads the application methods '''
- self.name = None
-
- def global_init(self):
- ''' Override this method to run code at application startup '''
- pass
-
- def child_init(self):
- ''' Override this method to run code at child startup.
- This is useful for initializing database connections or
- initializing other persistent resources '''
- pass
-
- def child_exit(self):
- ''' Override this method to run code at process exit time.
- This is useful for cleaning up resources like databaes
- handles, etc. '''
- pass
-
-
- @staticmethod
- def load(name, module_name):
- ''' Loads the provided application module '''
- Application.name = name
- try:
- osrf.log.log_info("Loading application module %s" % module_name)
- exec('import %s' % module_name)
- except Exception, e:
- osrf.log.log_error("Error importing application module %s:%s" % (
- module_name, unicode(e)))
-
- @staticmethod
- def register_app(app):
- ''' Registers an application for use '''
- app.name = Application.name
- Application.application = app
-
- @staticmethod
- def register_method(**kwargs):
- Application.methods[kwargs['api_name']] = Method(**kwargs)
- if kwargs.get('stream'):
- kwargs['atomic'] = 1
- kwargs['api_name'] += '.atomic'
- Application.methods[kwargs['api_name']] = Method(**kwargs)
-
-
- @staticmethod
- def handle_request(session, osrf_msg):
- ''' Find the handler, construct the server request, then run the method '''
-
- req_method = osrf_msg.payload()
- params = req_method.params() or []
- method_name = req_method.method()
- method = Application.methods.get(method_name)
-
- if method is None:
- session.send_method_not_found(osrf_msg.threadTrace(), method_name)
- return
-
- handler = method.get_func()
-
- param_json = osrf.json.to_json(params)
- param_json = param_json[1:len(param_json)-1]
-
- osrf.log.log_info("CALL: %s %s %s" % (session.service, method.name, param_json))
- server_req = osrf.ses.ServerRequest(session, osrf_msg.threadTrace(), method, params)
-
- result = None
- try:
- result = handler(server_req, *params)
- except Exception, e:
- osrf.log.log_error("Error running method %s %s %s" % (method.name, param_json, unicode(e)))
- session.send_status(
- osrf_msg.threadTrace(),
- osrf.net_obj.NetworkObject.osrfMethodException({
- 'status' : unicode(e),
- 'statusCode': osrf.const.OSRF_STATUS_INTERNALSERVERERROR
- })
- )
- return
-
- server_req.respond_complete(result)
-
- @staticmethod
- def register_sysmethods():
- ''' Registers the global system methods '''
-
- Application.register_method(
- api_name = 'opensrf.system.time',
- method = 'sysmethod_time',
- argc = 0,
- )
-
- Application.register_method(
- api_name = 'opensrf.system.method',
- method = 'sysmethod_introspect',
- argc = 0,
- stream = True
- )
-
- Application.register_method(
- api_name = 'opensrf.system.method.all',
- method = 'sysmethod_introspect',
- argc = 0,
- stream = True
- )
-
- Application.register_method(
- api_name = 'opensrf.system.echo',
- method = 'sysmethod_echo',
- argc = 1,
- stream = True
- )
-
- def sysmethod_time(self, request):
- '''@return type:number The current epoch time '''
- return time.time()
-
- def sysmethod_echo(self, request, *args):
- '''@return type:string The current epoch time '''
- for a in args:
- request.respond(a)
-
- def sysmethod_introspect(self, request, prefix=None):
- ''' Generates a list of methods with method metadata
- @param type:string The limiting method name prefix. If defined,
- only methods matching the given prefix will be returned.
- @return type:array List of method information '''
-
- for name, method in self.methods.iteritems():
- if prefix is not None and prefix != name[:len(prefix)]:
- continue
-
- request.respond({
- 'api_name' : name,
- 'method' : method.handler,
- 'service' : self.name,
- 'argc' : method.argc,
- 'params' : [], # XXX parse me
- 'desc' : method.get_doc() # XXX parse me
- })
-
-
+++ /dev/null
-# -----------------------------------------------------------------------
-# Copyright (C) 2008 Equinox Software, Inc.
-# Bill Erickson <erickson@esilibrary.com>
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
-# 02110-1301, USA
-# -----------------------------------------------------------------------
-import os
-import osrf.log
-from osrf.app import Application
-
-class Example(Application):
- ''' Example OpenSRF application. '''
-
- # ---------------------------------------------------------
- # Register a new method for this application
- # ---------------------------------------------------------
- Application.register_method(
- api_name = 'opensrf.py-example.reverse', # published API name for the method
- method = 'reverse', # name of def that implements this method
- argc = 1, # expects a single argument
- stream = True # returns a stream of results. can be called atomic-ly
- )
-
- # ---------------------------------------------------------
- # This method implements the API call registered above
- # ---------------------------------------------------------
- def reverse(self, request, message=''):
- ''' Returns the given string in reverse order one character at a time
- @param type:string Message to reverse
- @return type:string The reversed message, one character at a time. '''
- idx = len(message) - 1
- while idx >= 0:
- request.respond(message[idx])
- idx -= 1
-
- # ---------------------------------------------------------
- # Session data test
- # ---------------------------------------------------------
-
- Application.register_method(
- api_name = 'opensrf.stateful_session_test',
- method = 'session_test',
- argc = 0
- )
-
- def session_test(self, request):
- c = request.session.session_data.get('counter', 0) + 1
- request.session.session_data['counter'] = c
- return c
-
- # ---------------------------------------------------------
- # Session callbacks test
- # ---------------------------------------------------------
- Application.register_method(
- api_name = 'opensrf.session_callback_test',
- method = 'callback_test',
- argc = 0
- )
-
- def callback_test(self, request):
-
- def pre_req_cb(ses):
- osrf.log.log_info("running pre_request callback")
-
- def post_req_cb(ses):
- osrf.log.log_info("running post_request callback")
-
- def disconnect_cb(ses):
- osrf.log.log_info("running disconnect callback")
-
- def death_cb(ses):
- osrf.log.log_info("running death callback")
-
- ses = request.session
-
- ses.register_callback('pre_request', pre_req_cb)
- ses.register_callback('post_request', post_req_cb)
- ses.register_callback('disconnect', disconnect_cb)
- ses.register_callback('death', death_cb)
-
- c = ses.session_data.get('counter', 0) + 1
- ses.session_data['counter'] = c
- return c
-
-
- # ---------------------------------------------------------
- # These example methods override methods from
- # osrf.app.Application. They are not required.
- # ---------------------------------------------------------
- def global_init(self):
- osrf.log.log_debug("Running global init handler for %s" % __name__)
-
- def child_init(self):
- osrf.log.log_debug("Running child init handler for process %d" % os.getpid())
-
- def child_exit(self):
- osrf.log.log_debug("Running child exit handler for process %d" % os.getpid())
-
-
-# ---------------------------------------------------------
-# Now register an instance of this class as an application
-# ---------------------------------------------------------
-Application.register_app(Example())
-
-
+++ /dev/null
-import memcache
-from osrf.json import to_json, to_object
-import osrf.log
-
-'''
-Abstracted OpenSRF caching interface.
-Requires memcache: ftp://ftp.tummy.com/pub/python-memcached/
-'''
-
-_client = None
-defaultTimeout = 0
-
-class CacheException(Exception):
- def __init__(self, info):
- self.info = info
- def __str__(self):
- return "%s: %s" % (self.__class__.__name__, self.info)
-
-class CacheClient(object):
- def __init__(self, servers=None):
- ''' If no servers are provided, this instance will use
- the global memcache connection.
- servers takes the form ['server:port', 'server2:port2', ...]
- '''
- global _client
- if servers:
- self.client = memcache.Client(servers, debug=1)
- else:
- if not _client:
- raise CacheException(
- "not connected to any memcache servers."
- "try CacheClient.connect(servers)"
- )
- self.client = _client
-
- def put(self, key, val, timeout=None):
- global defaultTimeout
- if timeout is None:
- timeout = defaultTimeout
- timeout = int(timeout)
- json = to_json(val)
- osrf.log.log_internal("cache: %s => %s" % (str(key), json))
- return self.client.set(str(key), json, timeout)
-
- def get(self, key):
- obj = self.client.get(str(key))
- osrf.log.log_internal("cache: fetching %s => %s" % (str(key), obj))
- return to_object(obj or "null")
-
- def delete(self, key):
- osrf.log.log_internal("cache: deleting %s" % str(key))
- self.client.delete(str(key))
-
- @staticmethod
- def connect(svrs):
- global _client
- osrf.log.log_debug("cache: connecting to servers %s" % str(svrs))
- _client = memcache.Client(svrs, debug=1)
-
- @staticmethod
- def get_client():
- global _client
- return _client
-
-
+++ /dev/null
-# -----------------------------------------------------------------------
-# Copyright (C) 2007 Georgia Public Library Service
-# Bill Erickson <billserickson@gmail.com>
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-# -----------------------------------------------------------------------
-
-
-import osrf.net_obj
-import osrf.ex
-import osrf.xml_obj
-import re
-
-class Config(object):
- """Loads and parses the bootstrap config file"""
-
- config = None
-
- def __init__(self, file, context=None):
- self.file = file
- self.context = context
- self.data = {}
-
- #def parseConfig(self,file=None):
- def parse_config(self):
- self.data = osrf.xml_obj.xml_file_to_object(self.file)
- Config.config = self
-
- def get_value(self, key, idx=None):
- if self.context:
- if re.search('/', key):
- key = "%s/%s" % (self.context, key)
- else:
- key = "%s.%s" % (self.context, key)
-
- val = osrf.net_obj.find_object_path(self.data, key, idx)
- if not val:
- raise osrf.ex.OSRFConfigException("Config value not found: " + key)
- return val
-
-
-def get(key, idx=None):
- """Returns a bootstrap config value.
-
- key -- A string representing the path to the value in the config object
- e.g. "domains.domain", "username"
- idx -- Optional array index if the searched value is an array member
- """
- return Config.config.get_value(key, idx)
-
-
-def get_no_ex(key, idx=None):
- """ Returns a bootstrap config value without throwing an exception
- if the item is not found.
-
- key -- A string representing the path to the value in the config object
- e.g. "domains.domain", "username"
- idx -- Optional array index if the searched value is an array member
- """
- try:
- return Config.config.get_value(key, idx)
- except:
- return None
-
+++ /dev/null
-# -----------------------------------------------------------------------
-# Copyright (C) 2007 Georgia Public Library Service
-# Bill Erickson <billserickson@gmail.com>
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-#
-# Collection of global constants
-# -----------------------------------------------------------------------
-
-# -----------------------------------------------------------------------
-# log levels
-# -----------------------------------------------------------------------
-OSRF_LOG_ACT = -1
-OSRF_LOG_NONE = 0
-OSRF_LOG_ERR = 1
-OSRF_LOG_WARN = 2
-OSRF_LOG_INFO = 3
-OSRF_LOG_DEBUG = 4
-OSRF_LOG_INTERNAL = 5
-OSRF_LOG_TYPE_FILE = 1
-OSRF_LOG_TYPE_SYSLOG = 2
-OSRF_LOG_TYPE_STDERR = 3
-
-# -----------------------------------------------------------------------
-# Session states
-# -----------------------------------------------------------------------
-OSRF_APP_SESSION_CONNECTED = 0
-OSRF_APP_SESSION_CONNECTING = 1
-OSRF_APP_SESSION_DISCONNECTED = 2
-
-# -----------------------------------------------------------------------
-# OpenSRF message types
-# -----------------------------------------------------------------------
-OSRF_MESSAGE_TYPE_REQUEST = 'REQUEST'
-OSRF_MESSAGE_TYPE_STATUS = 'STATUS'
-OSRF_MESSAGE_TYPE_RESULT = 'RESULT'
-OSRF_MESSAGE_TYPE_CONNECT = 'CONNECT'
-OSRF_MESSAGE_TYPE_DISCONNECT = 'DISCONNECT'
-
-# -----------------------------------------------------------------------
-# OpenSRF message statuses
-# -----------------------------------------------------------------------
-OSRF_STATUS_CONTINUE = 100
-OSRF_STATUS_OK = 200
-OSRF_STATUS_ACCEPTED = 202
-OSRF_STATUS_COMPLETE = 205
-OSRF_STATUS_REDIRECTED = 307
-OSRF_STATUS_BADREQUEST = 400
-OSRF_STATUS_UNAUTHORIZED = 401
-OSRF_STATUS_FORBIDDEN = 403
-OSRF_STATUS_NOTFOUND = 404
-OSRF_STATUS_NOTALLOWED = 405
-OSRF_STATUS_TIMEOUT = 408
-OSRF_STATUS_EXPFAILED = 417
-OSRF_STATUS_INTERNALSERVERERROR = 500
-OSRF_STATUS_NOTIMPLEMENTED = 501
-OSRF_STATUS_SERVICEUNAVAILABLE = 503
-OSRF_STATUS_VERSIONNOTSUPPORTED = 505
-
-
-# -----------------------------------------------------------------------
-# Some well-known services
-# -----------------------------------------------------------------------
-OSRF_APP_SETTINGS = 'opensrf.settings'
-OSRF_APP_MATH = 'opensrf.math'
-
-
-# where do we find the settings config
-OSRF_METHOD_GET_HOST_CONFIG = 'opensrf.settings.host_config.get'
-
-OSRF_JSON_PAYLOAD_KEY = '__p'
-OSRF_JSON_CLASS_KEY = '__c'
-
-
+++ /dev/null
-# -----------------------------------------------------------------------
-# Copyright (C) 2007 Georgia Public Library Service
-# Bill Erickson <billserickson@gmail.com>
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-#
-# This modules define the exception classes. In general, an
-# exception is little more than a name.
-# -----------------------------------------------------------------------
-
-class OSRFException(Exception):
- """Root class for exceptions."""
- def __init__(self, info=''):
- self.msg = '%s: %s' % (self.__class__.__name__, info)
- def __str__(self):
- return self.msg
-
-
-class NetworkException(OSRFException):
- def __init__(self):
- OSRFException.__init__('Error communicating with the OpenSRF network')
-
-class OSRFProtocolException(OSRFException):
- """Raised when something happens during opensrf network stack processing."""
- pass
-
-class OSRFServiceException(OSRFException):
- """Raised when there was an error communicating with a remote service."""
- pass
-
-class OSRFConfigException(OSRFException):
- """Invalid config option requested."""
- pass
-
-class OSRFNetworkObjectException(OSRFException):
- pass
-
-class OSRFJSONParseException(OSRFException):
- """Raised when a JSON parsing error occurs."""
- pass
-
+++ /dev/null
-from xml.dom import minidom
-from xml.sax import handler, make_parser, saxutils
-from osrf.json import to_object
-from osrf.net_obj import NetworkObject, new_object_from_hint
-import osrf.log
-import urllib, urllib2, sys, re
-
-defaultHost = None
-
-class GatewayRequest:
- def __init__(self, service, method, params=[]):
- self.service = service
- self.method = method
- self.params = params
- self.path = 'gateway'
- self.bytes_read = 0 # for now this, this is really characters read
-
- def setPath(self, path):
- self.path = path
-
- def send(self):
- params = self.buildPOSTParams()
- request = urllib2.Request(self.buildURL(), data=params)
- response = None
- try:
- response =urllib2.urlopen(request)
- except urllib2.HTTPError, e:
- # log this?
- sys.stderr.write('%s => %s?%s\n' % (unicode(e), self.buildURL(), params))
- raise e
-
- return self.handleResponse(response)
-
- def buildPOSTParams(self):
-
- params = urllib.urlencode({
- 'service': self.service,
- 'method': self.method,
- 'format': self.getFormat(),
- 'input_format': self.getInputFormat()
- })
-
- for p in self.params:
- params += '¶m=%s' % urllib.quote(self.encodeParam(p), "'/")
- return params
-
- def setDefaultHost(host):
- global defaultHost
- defaultHost = host
- setDefaultHost = staticmethod(setDefaultHost)
-
- def buildURL(self):
- """
- Builds the URL for the OpenSRF gateway based on the host and path
-
- Previous versions of the code assumed that the host would be a bare
- hostname or IP address, and prepended the http:// protocol. However,
- to enable more secure communications, now we check for the existence
- of the HTTP or HTTPS prefix and use that if it has been supplied.
- """
-
- if defaultHost.lower().startswith(('http://', 'https://')):
- return '%s/%s' % (defaultHost, self.path)
-
- return 'http://%s/%s' % (defaultHost, self.path)
-
-class JSONGatewayRequest(GatewayRequest):
- def __init__(self, service, method, *params):
- GatewayRequest.__init__(self, service, method, list(params))
-
- def getFormat(self):
- return 'json'
-
- def getInputFormat(self):
- return self.getFormat()
-
- def handleResponse(self, response):
-
- data = response.read()
- self.bytes_read = len(str(response.headers)) + len(data)
- obj = to_object(data)
-
- if obj['status'] != 200:
- sys.stderr.write('JSON gateway returned status %d:\n' % (obj['status']))
- return None
-
- # the gateway wraps responses in an array to handle streaming data
- # if there is only one item in the array, it (probably) wasn't a streaming request
- p = obj['payload']
- if len(p) > 1: return p
- if len(p): return p[0]
- return None
-
- def encodeParam(self, param):
- return osrf.json.to_json(param)
-
-class XMLGatewayRequest(GatewayRequest):
-
- def __init__(self, service, method, *params):
- GatewayRequest.__init__(self, service, method, list(params))
-
- def getFormat(self):
- return 'xml'
-
- def getInputFormat(self):
- return self.getFormat()
-
- def handleResponse(self, response):
- handler = XMLGatewayParser()
- parser = make_parser()
- parser.setContentHandler(handler)
- try:
- parser.parse(response)
- except Exception, e:
- osrf.log.log_error('Error parsing gateway XML: %s' % unicode(e))
- return None
-
- return handler.getResult()
-
- def encodeParam(self, param):
- return osrf.net_obj.to_xml(param);
-
-class XMLGatewayParser(handler.ContentHandler):
-
- def __init__(self):
- self.result = None
- self.objStack = []
- self.keyStack = []
- self.posStack = [] # for tracking array-based hinted object indices
-
- # true if we are parsing an element that may have character data
- self.charsPending = 0
-
- def getResult(self):
- return self.result
-
- def __getAttr(self, attrs, name):
- for (k, v) in attrs.items():
- if k == name:
- return v
- return None
-
- def startElement(self, name, attrs):
-
- if self.charsPending:
- # we just read a 'string' or 'number' element that resulted
- # in no text data. Appaned a None object
- self.appendChild(None)
-
- if name == 'null':
- self.appendChild(None)
- return
-
- if name == 'string' or name == 'number':
- self.charsPending = True
- return
-
- if name == 'element': # this is an object item wrapper
- self.keyStack.append(self.__getAttr(attrs, 'key'))
- return
-
- hint = self.__getAttr(attrs, 'class_hint')
- if hint:
- obj = new_object_from_hint(hint)
- self.appendChild(obj)
- self.objStack.append(obj)
- if name == 'array':
- self.posStack.append(0)
- return
-
- if name == 'array':
- obj = []
- self.appendChild(obj)
- self.objStack.append(obj)
- return
-
- if name == 'object':
- obj = {}
- self.appendChild(obj)
- self.objStack.append(obj)
- return
-
- if name == 'boolean':
- self.appendChild((self.__getAttr(attrs, 'value') == 'true'))
- return
-
-
- def appendChild(self, child):
-
- if self.result == None:
- self.result = child
-
- if not self.objStack: return;
-
- parent = self.objStack[len(self.objStack)-1]
-
- if isinstance(parent, list):
- parent.append(child)
- else:
- if isinstance(parent, dict):
- parent[self.keyStack.pop()] = child
- else:
- if isinstance(parent, NetworkObject):
- key = None
- if parent.get_registry().protocol == 'array':
- keys = parent.get_registry().keys
- i = self.posStack.pop()
- key = keys[i]
- if i+1 < len(keys):
- self.posStack.append(i+1)
- else:
- key = self.keyStack.pop()
-
- parent.set_field(key, child)
-
- def endElement(self, name):
- if name == 'array' or name == 'object':
- self.objStack.pop()
-
- def characters(self, chars):
- self.charsPending = False
- self.appendChild(urllib.unquote_plus(chars))
-
-
-
-
-
+++ /dev/null
-import sys, os, time, md5, random
-from mod_python import apache, util
-import osrf.system, osrf.cache, osrf.json, osrf.conf, osrf.net, osrf.log
-from osrf.const import OSRF_MESSAGE_TYPE_DISCONNECT, OSRF_MESSAGE_TYPE_CONNECT, \
- OSRF_STATUS_CONTINUE, OSRF_STATUS_TIMEOUT, OSRF_MESSAGE_TYPE_STATUS, OSRF_MESSAGE_TYPE_REQUEST
-
-
-'''
-Proof of concept OpenSRF-HTTP multipart streaming gateway.
-
-Example Apache mod_python config:
-
-<Location /osrf-http-translator>
- SetHandler mod_python
- PythonPath "['/path/to/osrf-python'] + sys.path"
- PythonHandler osrf.http_translator
- PythonOption OSRF_CONFIG /path/to/opensrf_core.xml
- PythonOption OSRF_CONFIG_CONTEXT config.gateway
- PythonOption OSRF_CACHE_SERVERS 127.0.0.1:11211
- # testing only
- PythonAutoReload On
-</Location>
-'''
-
-OSRF_HTTP_HEADER_TO = 'X-OpenSRF-to'
-OSRF_HTTP_HEADER_XID = 'X-OpenSRF-xid'
-OSRF_HTTP_HEADER_FROM = 'X-OpenSRF-from'
-OSRF_HTTP_HEADER_THREAD = 'X-OpenSRF-thread'
-OSRF_HTTP_HEADER_TIMEOUT = 'X-OpenSRF-timeout'
-OSRF_HTTP_HEADER_SERVICE = 'X-OpenSRF-service'
-OSRF_HTTP_HEADER_MULTIPART = 'X-OpenSRF-multipart'
-MULTIPART_CONTENT_TYPE = 'multipart/x-mixed-replace;boundary="%s"'
-JSON_CONTENT_TYPE = 'text/plain'
-CACHE_TIME = 300
-
-ROUTER_NAME = None
-OSRF_DOMAIN = None
-
-# If DEBUG_WRITE = True, all data sent to the client is also written
-# to stderr (apache error log)
-DEBUG_WRITE = False
-
-def _dbg(msg):
- ''' testing only '''
- sys.stderr.write("%s\n\n" % str(msg))
- sys.stderr.flush()
-
-INIT_COMPLETE = False
-def child_init(req):
- ''' At time of writing, mod_python doesn't support a child_init handler,
- so this function is called once per process to initialize
- the opensrf connection '''
-
- global INIT_COMPLETE, ROUTER_NAME, OSRF_DOMAIN
- if INIT_COMPLETE:
- return
-
- # Apache complains with: UnboundLocalError: local variable 'osrf' referenced before assignment
- # if the following import line is removed, even though its also at the top of the file...
- import osrf.system
-
- ops = req.get_options()
- conf = ops['OSRF_CONFIG']
- ctxt = ops.get('OSRF_CONFIG_CONTEXT') or 'opensrf'
- osrf.system.System.net_connect(config_file=conf, config_context=ctxt)
-
- ROUTER_NAME = osrf.conf.get('router_name')
- OSRF_DOMAIN = osrf.conf.get('domain')
- INIT_COMPLETE = True
-
- servers = ops.get('OSRF_CACHE_SERVERS')
- if servers:
- servers = servers.split(',')
- else:
- # no cache servers configured, see if we can talk to the settings server
- import osrf.set
- servers = osrf.set.get('cache.global.servers.server')
- if not isinstance(servers, list):
- servers = [servers]
-
- osrf.cache.CacheClient.connect(servers)
-
-
-def handler(req):
- ''' Create the translator and tell it to process the request. '''
- child_init(req)
-
- # capture the callback handle, clear it, then reset
- # it after we've handled the request
- handle = osrf.net.get_network_handle()
- callback = handle.receive_callback
- handle.set_receive_callback(None)
-
- try:
- translator = HTTPTranslator(req)
- status = translator.process()
- osrf.log.log_debug("translator call resulted in status %d" % int(status))
- if translator.local_xid:
- osrf.log.clear_xid()
- finally:
- handle.receive_callback = callback
-
- return status
-
-class HTTPTranslator(object):
- def __init__(self, apreq):
-
- self.apreq = apreq
-
- if OSRF_HTTP_HEADER_XID in apreq.headers_in:
- osrf.log.log_debug('read XID from client %s' % apreq.headers_in.get(OSRF_HTTP_HEADER_XID))
- osrf.log.set_xid(apreq.headers_in.get(OSRF_HTTP_HEADER_XID))
- self.local_xid = False
- else:
- osrf.log.make_xid()
- osrf.log.log_debug('created new XID %s' % osrf.log.get_xid())
- self.local_xid = True
-
- if apreq.header_only:
- return
-
- for k,v in apreq.headers_in.iteritems():
- osrf.log.log_internal('HEADER: %s = %s' % (k, v))
-
- try:
- #post = util.parse_qsl(apreq.read(int(apreq.headers_in['Content-length'])))
- post = util.parse_qsl(apreq.read())
- osrf.log.log_debug('post = ' + str(post))
- self.body = [d for d in post if d[0] == 'osrf-msg'][0][1]
- osrf.log.log_debug(self.body)
- except Exception, e:
- osrf.log.log_warn("error parsing osrf message: %s" % unicode(e))
- self.body = None
- return
-
- self.messages = []
- self.complete = False
- self.handle = osrf.net.get_network_handle()
-
- self.recipient = apreq.headers_in.get(OSRF_HTTP_HEADER_TO)
- self.service = apreq.headers_in.get(OSRF_HTTP_HEADER_SERVICE)
- self.thread = apreq.headers_in.get(OSRF_HTTP_HEADER_THREAD) or \
- "%s%s" % (os.getpid(), time.time())
- self.timeout = apreq.headers_in.get(OSRF_HTTP_HEADER_TIMEOUT) or 1200
- self.multipart = str(
- apreq.headers_in.get(OSRF_HTTP_HEADER_MULTIPART)).lower() == 'true'
- self.connect_only = False
- self.disconnect_only = False
-
- # generate a random multipart delimiter
- mpart = md5.new()
- mpart.update("%f%d%d" % (time.time(), os.getpid(), \
- random.randint(100, 10000000)))
- self.delim = mpart.hexdigest()
- self.remote_host = self.apreq.get_remote_host(apache.REMOTE_NOLOOKUP)
- self.cache = osrf.cache.CacheClient()
-
-
-
- def process(self):
-
- if self.apreq.header_only:
- return apache.OK
- if not self.body:
- return apache.HTTP_BAD_REQUEST
- if not self.set_to_addr():
- return apache.HTTP_BAD_REQUEST
- if not self.parse_request():
- return apache.HTTP_BAD_REQUEST
-
- while self.handle.recv(0):
- pass # drop stale messages
-
-
- net_msg = osrf.net.NetworkMessage(
- recipient=self.recipient, thread=self.thread, body=self.body)
- self.handle.send(net_msg)
-
- if self.disconnect_only:
- osrf.log.log_debug("exiting early on DISCONNECT")
- return apache.OK
-
- first_write = True
- while not self.complete:
-
- net_msg = None
- try:
- net_msg = self.handle.recv(self.timeout)
- except osrf.net.XMPPNoRecipient:
- return apache.HTTP_NOT_FOUND
-
- if not net_msg:
- return apache.GATEWAY_TIME_OUT
-
- if not self.check_status(net_msg):
- continue
-
- if first_write:
- self.init_headers(net_msg)
- first_write = False
-
- if self.multipart:
- self.respond_chunk(net_msg)
- if self.connect_only:
- break
- else:
- self.messages.append(net_msg.body)
-
- # condense the sets of arrays into a single array of messages
- if self.complete or self.connect_only:
- json = self.messages.pop(0)
- while len(self.messages) > 0:
- msg = self.messages.pop(0)
- json = "%s,%s" % (json[0:len(json)-1], msg[1:])
-
- self.write("%s" % json)
-
-
- return apache.OK
-
- def parse_request(self):
- '''
- If this is solely a DISCONNECT message, we set self.disconnect_only
- to true
- @return True if the body parses correctly, False otherwise
- '''
- osrf_msgs = osrf.json.to_object(self.body)
- if not osrf_msgs:
- return False
-
- if len(osrf_msgs) == 1:
- if osrf_msgs[0].type() == OSRF_MESSAGE_TYPE_CONNECT:
- self.connect_only = True
- elif osrf_msgs[0].type() == OSRF_MESSAGE_TYPE_DISCONNECT:
- self.disconnect_only = True
-
- for msg in osrf_msgs:
- if msg.type() == OSRF_MESSAGE_TYPE_REQUEST:
- method = msg.payload()
- params = osrf.json.to_json(method.params())
- if len(params) == 2:
- params = ''
- else:
- params = params[1:len(params)-1]
-
- osrf.log.log_activity("[%s] [%s] %s %s %s" % (
- self.remote_host,
- '', # XXX auth token?
- self.service,
- method.method(),
- params
- ))
-
-
- return True
-
-
- def set_to_addr(self):
- ''' Determines the TO address. Returns false if
- the address is missing or ambiguous.
- Also returns false if an explicit TO is specified and the
- thread/IP/TO combination is not found in the session cache
- '''
- if self.service:
- if self.recipient:
- osrf.log.log_warn("specifying both SERVICE and TO is not allowed")
- return False
- self.recipient = "%s@%s/%s" % \
- (ROUTER_NAME, OSRF_DOMAIN, self.service)
- osrf.log.log_debug("set service to %s" % self.recipient)
- return True
- else:
- if self.recipient:
- # If the client specifies a specific TO address, verify it's
- # the same address that was cached with the previous request.
- obj = self.cache.get(self.thread)
- if obj and obj['ip'] == self.remote_host and \
- obj['jid'] == self.recipient:
- self.service = obj['service']
- return True
- osrf.log.log_warn("client [%s] attempted to send directly "
- "[%s] without a session" % (self.remote_host, self.recipient))
- return False
-
-
- def init_headers(self, net_msg):
- osrf.log.log_debug("initializing request headers")
- self.apreq.headers_out[OSRF_HTTP_HEADER_FROM] = net_msg.sender
- self.apreq.headers_out[OSRF_HTTP_HEADER_THREAD] = self.thread
- if self.multipart:
- self.apreq.content_type = MULTIPART_CONTENT_TYPE % self.delim
- self.write("--%s\n" % self.delim)
- else:
- self.apreq.content_type = JSON_CONTENT_TYPE
- self.cache.put(self.thread, \
- {'ip':self.remote_host, 'jid': net_msg.sender, 'service':self.service}, CACHE_TIME)
-
- osrf.log.log_debug("caching session [%s] for host [%s] and server "
- " drone [%s]" % (self.thread, self.remote_host, net_msg.sender))
-
-
-
- def check_status(self, net_msg):
- ''' Checks the status of the server response.
- If we received a timeout message, we drop it.
- if it's any other non-continue status, we mark this session as
- complete and carry on.
- @return False if there is no data to return to the caller
- (dropped message, eg. timeout), True otherwise '''
-
- osrf_msgs = osrf.json.to_object(net_msg.body)
- last_msg = osrf_msgs.pop()
-
- if last_msg.type() == OSRF_MESSAGE_TYPE_STATUS:
- code = int(last_msg.payload().statusCode())
- osrf.log.log_debug("got a status message with code %d" % code)
-
- if code == OSRF_STATUS_TIMEOUT:
- osrf.log.log_debug("removing cached session [%s] and "
- "dropping TIMEOUT message" % net_msg.thread)
- self.cache.delete(net_msg.thread)
- return False
-
- if code != OSRF_STATUS_CONTINUE:
- self.complete = True
-
- return True
-
-
- def respond_chunk(self, resp):
- ''' Writes a single multipart-delimited chunk of data '''
-
- self.write("Content-type: %s\n\n" % JSON_CONTENT_TYPE)
- self.write("%s\n\n" % resp.body)
- if self.complete:
- self.write("--%s--\n" % self.delim)
- else:
- self.write("--%s\n" % self.delim)
- self.apreq.flush()
-
- def write(self, msg):
- ''' Writes data to the client stream. '''
-
- if DEBUG_WRITE:
- sys.stderr.write(msg)
- sys.stderr.flush()
- self.apreq.write(msg)
-
-
+++ /dev/null
-import simplejson
-from osrf.net_obj import NetworkObject, parse_net_object
-from osrf.const import OSRF_JSON_PAYLOAD_KEY, OSRF_JSON_CLASS_KEY
-import osrf.log
-
-try:
- # if available, use the faster cjson module for encoding/decoding JSON
- import cjson
- _use_cjson = True
-except ImportError:
- _use_cjson = False
-
-_use_cjson = False
-
-class NetworkEncoder(simplejson.JSONEncoder):
- ''' Encoder used by simplejson '''
-
- def default(self, obj):
- '''
- Extend the default method offered by simplejson.JSONEncoder
-
- Wraps the Python object into with OpenSRF class / payload keys
- '''
-
- if isinstance(obj, NetworkObject):
- reg = obj.get_registry()
- data = obj.get_data()
-
- # re-encode the object as an array if necessary
- if reg.protocol == 'array':
- objarray = []
- for key in reg.keys:
- objarray.append(data.get(key))
- data = objarray
-
- return {
- OSRF_JSON_CLASS_KEY: reg.hint,
- OSRF_JSON_PAYLOAD_KEY: self.default(data)
- }
- return obj
-
-
-def encode_object(obj):
- ''' Generic opensrf object encoder, used by cjson '''
-
- if isinstance(obj, dict):
- newobj = {}
- for key, val in obj.iteritems():
- newobj[key] = encode_object(val)
- return newobj
-
- elif isinstance(obj, list):
- return [encode_object(v) for v in obj]
-
- elif isinstance(obj, NetworkObject):
- reg = obj.get_registry()
- data = obj.get_data()
-
- if reg.protocol == 'array':
- objarray = []
- for key in reg.keys:
- objarray.append(data.get(key))
- data = objarray
-
- return {
- OSRF_JSON_CLASS_KEY: reg.hint,
- OSRF_JSON_PAYLOAD_KEY: encode_object(data)
- }
-
- return obj
-
-
-
-def to_json(obj):
- """Turns a python object into a wrapped JSON object"""
- if _use_cjson:
- return cjson.encode(encode_object(obj))
- return simplejson.dumps(obj, cls=NetworkEncoder)
-
-
-def to_object(json):
- """Turns a JSON string into python objects"""
- if _use_cjson:
- return parse_net_object(cjson.decode(json))
- return parse_net_object(simplejson.loads(json))
-
-def parse_json_raw(json):
- """Parses JSON the old fashioned way."""
- if _use_cjson:
- return cjson.decode(json)
- return simplejson.loads(json)
-
-def to_json_raw(obj):
- """Stringifies an object as JSON with no additional logic."""
- if _use_cjson:
- return cjson.encode(obj)
- return simplejson.dumps(obj)
-
-def __tabs(depth):
- '''
- Returns a string of spaces-not-tabs for the desired indentation level
-
- >>> print '"' + __tabs(0) + '"'
- ""
- >>> print '"' + __tabs(4) + '"'
- " "
- '''
- space = ' ' * depth
- return space
-
-def debug_net_object(obj, depth=1):
- """Returns a debug string for a given object.
-
- If it's an NetworkObject and has registered keys, key/value pairs
- are returned. Otherwise formatted JSON is returned"""
-
- debug_str = ''
- if isinstance(obj, NetworkObject):
- reg = obj.get_registry()
- keys = list(reg.keys) # clone it, so sorting won't break the original
- keys.sort()
-
- for k in keys:
-
- key = str(k)
- while len(key) < 24:
- key += '.' # pad the names to make the values line up somewhat
- val = getattr(obj, k)()
-
- subobj = val and not (isinstance(val, unicode) or isinstance(val, str) or \
- isinstance(val, int) or isinstance(val, float) or isinstance(val, long))
-
- debug_str += __tabs(depth) + key + ' = '
-
- if subobj:
- debug_str += '\n'
- val = debug_net_object(val, depth+1)
-
- debug_str += str(val)
-
- if not subobj:
- debug_str += '\n'
-
- else:
- osrf.log.log_internal("Pretty-printing NetworkObject")
- debug_str = pprint(to_json(obj))
- return debug_str
-
-def pprint(json):
- """JSON pretty-printer"""
- result = ''
- tab = 0
- instring = False
- inescape = False
- done = False
- eatws = False
-
- for char in json:
-
- if eatws and not _use_cjson: # simplejson adds a pesky space after array and object items
- if char == ' ':
- continue
-
- eatws = False
- done = False
- if (char == '{' or char == '[') and not instring:
- tab += 1
- result += char + '\n' + __tabs(tab)
- done = True
-
- if (char == '}' or char == ']') and not instring:
- tab -= 1
- result += '\n' + __tabs(tab) + char
- done = True
-
- if char == ',' and not instring:
- result += char + '\n' + __tabs(tab)
- done = True
- eatws = True
-
- if char == ':' and not instring:
- eatws = True
-
- if char == '"' and not inescape:
- instring = not instring
-
- if inescape:
- inescape = False
-
- if char == '\\':
- inescape = True
-
- if not done:
- result += char
-
- return result
-
-if __name__ == "__main__":
- import doctest
- doctest.testmod()
+++ /dev/null
-# -----------------------------------------------------------------------
-# Copyright (C) 2007 Georgia Public Library Service
-# Bill Erickson <erickson@esilibrary.com>
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-# -----------------------------------------------------------------------
-
-import traceback, sys, os, re, threading, time
-from osrf.const import OSRF_LOG_DEBUG, OSRF_LOG_ERR, OSRF_LOG_INFO, \
- OSRF_LOG_INTERNAL, OSRF_LOG_TYPE_FILE, OSRF_LOG_TYPE_STDERR, \
- OSRF_LOG_TYPE_SYSLOG, OSRF_LOG_WARN, OSRF_LOG_ACT
-LOG_SEMAPHORE = threading.BoundedSemaphore(value=1)
-
-
-LOG_LEVEL = OSRF_LOG_DEBUG
-LOG_TYPE = OSRF_LOG_TYPE_STDERR
-LOG_FILE = None
-FRGX = re.compile('/.*/')
-
-_xid = '' # the current XID
-_xid_pfx = '' # our XID prefix
-_xid_ctr = 0
-_xid_is_client = False # true if we are the request origin
-
-
-def initialize(level, facility=None, logfile=None, is_client=False, syslog_ident=None):
- """Initialize the logging subsystem."""
- global LOG_LEVEL, LOG_TYPE, LOG_FILE, _xid_is_client
-
- _xid_is_client = is_client
- LOG_LEVEL = level
-
- if facility:
- try:
- import syslog
- except ImportError:
- sys.stderr.write("syslog not found, logging to stderr\n")
- return
-
- LOG_TYPE = OSRF_LOG_TYPE_SYSLOG
- initialize_syslog(facility, syslog_ident)
- return
-
- if logfile:
- LOG_TYPE = OSRF_LOG_TYPE_FILE
- LOG_FILE = logfile
-
-def make_xid():
- global _xid, _xid_pfx, _xid_is_client, _xid_ctr
- if _xid_is_client:
- if not _xid_pfx:
- _xid_pfx = "%s%s" % (time.time(), os.getpid())
- _xid = "%s%d" % (_xid_pfx, _xid_ctr)
- _xid_ctr += 1
-
-def clear_xid():
- global _xid
- _xid = ''
-
-def set_xid(xid):
- global _xid
- _xid = xid
-
-def get_xid():
- return _xid
-
-# -----------------------------------------------------------------------
-# Define wrapper functions for the log levels
-# -----------------------------------------------------------------------
-def log_internal(debug_str):
- __log(OSRF_LOG_INTERNAL, debug_str)
-def log_debug(debug_str):
- __log(OSRF_LOG_DEBUG, debug_str)
-def log_info(debug_str):
- __log(OSRF_LOG_INFO, debug_str)
-def log_warn(debug_str):
- __log(OSRF_LOG_WARN, debug_str)
-def log_error(debug_str):
- __log(OSRF_LOG_ERR, debug_str)
-def log_activity(debug_str):
- __log(OSRF_LOG_ACT, debug_str)
-
-def __log(level, msg):
- """Builds the log message and passes the message off to the logger."""
- global LOG_LEVEL, LOG_TYPE
-
- try:
- import syslog
- except:
- if level == OSRF_LOG_ERR:
- sys.stderr.write('ERR ' + msg)
- return
-
- if int(level) > int(LOG_LEVEL): return
-
- # find the caller info for logging the file and line number
- tb = traceback.extract_stack(limit=3)
- tb = tb[0]
- lvl = 'DEBG'
-
- if level == OSRF_LOG_INTERNAL:
- lvl = 'INT '
- if level == OSRF_LOG_INFO:
- lvl = 'INFO'
- if level == OSRF_LOG_WARN:
- lvl = 'WARN'
- if level == OSRF_LOG_ERR:
- lvl = 'ERR '
- if level == OSRF_LOG_ACT:
- lvl = 'ACT '
-
- filename = FRGX.sub('', tb[0])
- msg = '[%s:%d:%s:%s:%s:%s] %s' % (lvl, os.getpid(), filename, tb[1], threading.currentThread().getName(), _xid, msg)
-
- if LOG_TYPE == OSRF_LOG_TYPE_SYSLOG:
- __log_syslog(level, msg)
- else:
- if LOG_TYPE == OSRF_LOG_TYPE_FILE:
- __log_file(msg)
- else:
- sys.stderr.write("%s\n" % msg)
-
- if level == OSRF_LOG_ERR and LOG_TYPE != OSRF_LOG_TYPE_STDERR:
- sys.stderr.write(msg + '\n')
-
-def __log_syslog(level, msg):
- ''' Logs the message to syslog '''
- import syslog
-
- slvl = syslog.LOG_DEBUG
- if level == OSRF_LOG_INTERNAL:
- slvl = syslog.LOG_DEBUG
- if level == OSRF_LOG_INFO or level == OSRF_LOG_ACT:
- slvl = syslog.LOG_INFO
- if level == OSRF_LOG_WARN:
- slvl = syslog.LOG_WARNING
- if level == OSRF_LOG_ERR:
- slvl = syslog.LOG_ERR
-
- syslog.syslog(slvl, msg)
-
-def __log_file(msg):
- ''' Logs the message to a file. '''
-
- global LOG_TYPE
-
- epoch = time.time()
- timestamp = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(epoch))
- timestamp += '.%s' % str('%0.3f' % epoch)[-3:] # grab the millis
-
- logfile = None
- try:
- logfile = open(LOG_FILE, 'a')
- except:
- sys.stderr.write("cannot open log file for writing: %s\n" % LOG_FILE)
- LOG_TYPE = OSRF_LOG_TYPE_STDERR
- return
- try:
- LOG_SEMAPHORE.acquire()
- logfile.write("%s %s\n" % (timestamp, msg))
- finally:
- LOG_SEMAPHORE.release()
-
- logfile.close()
-
-def initialize_syslog(facility, ident=None):
- """Connect to syslog and set the logmask based on the level provided."""
-
- import syslog
- if not ident:
- ident = sys.argv[0]
-
- if facility == 'local0':
- facility = syslog.LOG_LOCAL0
- if facility == 'local1':
- facility = syslog.LOG_LOCAL1
- if facility == 'local2':
- facility = syslog.LOG_LOCAL2
- if facility == 'local3':
- facility = syslog.LOG_LOCAL3
- if facility == 'local4':
- facility = syslog.LOG_LOCAL4
- if facility == 'local5':
- facility = syslog.LOG_LOCAL5
- if facility == 'local6':
- facility = syslog.LOG_LOCAL6
- if facility == 'local7':
- facility = syslog.LOG_LOCAL7
-
- syslog.openlog(str(ident), 0, facility)
-
+++ /dev/null
-# -----------------------------------------------------------------------
-# Copyright (C) 2007 Georgia Public Library Service
-# Bill Erickson <billserickson@gmail.com>
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-# -----------------------------------------------------------------------
-
-
-import os, time, threading
-from pyxmpp.jabber.client import JabberClient
-from pyxmpp.message import Message
-from pyxmpp.jid import JID
-from socket import gethostname
-import libxml2
-import osrf.log, osrf.ex
-
-THREAD_SESSIONS = {}
-
-# - log jabber activity (for future reference)
-#import logging
-#logger=logging.getLogger()
-#logger.addHandler(logging.StreamHandler())
-#logger.addHandler(logging.FileHandler('j.log'))
-#logger.setLevel(logging.DEBUG)
-
-
-
-
-class XMPPNoRecipient(osrf.ex.OSRFException):
- ''' Raised when a message was sent to a non-existent recipient
- The recipient is stored in the 'recipient' field on this object
- '''
- def __init__(self, recipient):
- osrf.ex.OSRFException.__init__(self, 'Error communicating with %s' % recipient)
- self.recipient = recipient
-
-class XMPPNoConnection(osrf.ex.OSRFException):
- pass
-
-def set_network_handle(handle):
- """ Sets the thread-specific network handle"""
- THREAD_SESSIONS[threading.currentThread().getName()] = handle
-
-def get_network_handle():
- """ Returns the thread-specific network connection handle."""
- return THREAD_SESSIONS.get(threading.currentThread().getName())
-
-def clear_network_handle():
- ''' Disconnects the thread-specific handle and discards it '''
- handle = THREAD_SESSIONS.get(threading.currentThread().getName())
- if handle:
- osrf.log.log_internal("clearing network handle %s" % handle.jid.as_utf8())
- del THREAD_SESSIONS[threading.currentThread().getName()]
- return handle
-
-class NetworkMessage(object):
- """Network message
-
- attributes:
-
- sender - message sender
- recipient - message recipient
- body - the body of the message
- thread - the message thread
- locale - locale of the message
- osrf_xid - The logging transaction ID
- """
-
- def __init__(self, message=None, **args):
- if message:
- self.body = message.get_body()
- self.thread = message.get_thread()
- self.recipient = message.get_to()
- self.router_command = None
- self.router_class = None
- if message.xmlnode.hasProp('router_from') and \
- message.xmlnode.prop('router_from') != '':
- self.sender = message.xmlnode.prop('router_from')
- else:
- self.sender = message.get_from().as_utf8()
- if message.xmlnode.hasProp('osrf_xid'):
- self.xid = message.xmlnode.prop('osrf_xid')
- else:
- self.xid = ''
- else:
- self.sender = args.get('sender')
- self.recipient = args.get('recipient')
- self.body = args.get('body')
- self.thread = args.get('thread')
- self.router_command = args.get('router_command')
- self.router_class = args.get('router_class')
- self.xid = osrf.log.get_xid()
-
- @staticmethod
- def from_xml(xml):
- doc = libxml2.parseDoc(xml)
- msg = Message(doc.getRootElement())
- return NetworkMessage(msg)
-
-
- def make_xmpp_msg(self):
- ''' Creates a pyxmpp.message.Message and adds custom attributes '''
-
- msg = Message(None, self.sender, self.recipient, None, None, None, \
- self.body, self.thread)
- if self.router_command:
- msg.xmlnode.newProp('router_command', self.router_command)
- if self.router_class:
- msg.xmlnode.newProp('router_class', self.router_class)
- if self.xid:
- msg.xmlnode.newProp('osrf_xid', self.xid)
- return msg
-
- def to_xml(self):
- ''' Turns this message into XML '''
- return self.make_xmpp_msg().serialize()
-
-
-class Network(JabberClient):
- def __init__(self, **args):
- self.isconnected = False
-
- # Create a unique jabber resource
- resource = args.get('resource') or 'python_client'
- resource += '_' + gethostname() + ':' + str(os.getpid()) + '_' + \
- threading.currentThread().getName().lower()
- self.jid = JID(args['username'], args['host'], resource)
-
- osrf.log.log_debug("initializing network with JID %s and host=%s, "
- "port=%s, username=%s" % (self.jid.as_utf8(), args['host'], \
- args['port'], args['username']))
-
- #initialize the superclass
- JabberClient.__init__(self, self.jid, args['password'], args['host'])
- self.queue = []
-
- self.receive_callback = None
- self.transport_error_msg = None
-
- def connect(self):
- JabberClient.connect(self)
- while not self.isconnected:
- stream = self.get_stream()
- act = stream.loop_iter(10)
- if not act:
- self.idle()
-
- def set_receive_callback(self, func):
- """The callback provided is called when a message is received.
-
- The only argument to the function is the received message. """
- self.receive_callback = func
-
- def session_started(self):
- osrf.log.log_info("Successfully connected to the opensrf network")
- self.authenticated()
- self.stream.set_message_handler("normal", self.message_received)
- self.stream.set_message_handler("error", self.error_received)
- self.isconnected = True
-
- def send(self, message):
- """Sends the provided network message."""
- osrf.log.log_internal("jabber sending to %s: %s" % (message.recipient, message.body))
- message.sender = self.jid.as_utf8()
- msg = message.make_xmpp_msg()
- self.stream.send(msg)
-
- def error_received(self, stanza):
- self.transport_error_msg = NetworkMessage(stanza)
- osrf.log.log_error("XMPP error message received from %s" % self.transport_error_msg.sender)
-
- def message_received(self, stanza):
- """Handler for received messages."""
- if stanza.get_type()=="headline":
- return True
- # check for errors
- osrf.log.log_internal("jabber received message from %s : %s"
- % (stanza.get_from().as_utf8(), stanza.get_body()))
- self.queue.append(NetworkMessage(stanza))
- return True
-
- def stream_closed(self, stream):
- osrf.log.log_debug("XMPP Stream closing...")
-
- def stream_error(self, err):
- osrf.log.log_error("XMPP Stream error: condition: %s %r"
- % (err.get_condition().name,err.serialize()))
-
- def disconnected(self):
- osrf.log.log_internal('XMPP Disconnected')
-
- def recv(self, timeout=120):
- """Attempts to receive a message from the network.
-
- timeout - max number of seconds to wait for a message.
- If a message is received in 'timeout' seconds, the message is passed to
- the receive_callback is called and True is returned. Otherwise, false is
- returned.
- """
-
- forever = False
- if timeout < 0:
- forever = True
- timeout = None
-
- if len(self.queue) == 0:
- while (forever or timeout >= 0) and len(self.queue) == 0:
- starttime = time.time()
-
- stream = self.get_stream()
- if not stream:
- raise XMPPNoConnection('We lost our server connection...')
-
- act = stream.loop_iter(timeout)
- endtime = time.time() - starttime
-
- if not forever:
- timeout -= endtime
-
- osrf.log.log_internal("exiting stream loop after %s seconds. "
- "act=%s, queue size=%d" % (str(endtime), act, len(self.queue)))
-
- if self.transport_error_msg:
- msg = self.transport_error_msg
- self.transport_error_msg = None
- raise XMPPNoRecipient(msg.sender)
-
- if not act:
- self.idle()
-
- # if we've acquired a message, handle it
- msg = None
- if len(self.queue) > 0:
- msg = self.queue.pop(0)
- if self.receive_callback:
- self.receive_callback(msg)
-
- return msg
-
-
- def flush_inbound_data(self):
- ''' Read all pending inbound messages from the socket and discard them '''
- cb = self.receive_callback
- self.receive_callback = None
- while self.recv(0): pass
- self.receive_callback = cb
-
-
-
-
+++ /dev/null
-from osrf.const import OSRF_JSON_PAYLOAD_KEY, OSRF_JSON_CLASS_KEY
-import re
-from xml.sax import saxutils
-
-
-# -----------------------------------------------------------
-# Define the global network-class registry
-# -----------------------------------------------------------
-
-
-class NetworkRegistry(object):
- ''' Network-serializable objects must be registered. The class
- hint maps to a set (ordered in the case of array-base objects)
- of field names (keys).
- '''
-
- # Global object registry
- registry = {}
-
- def __init__(self, hint, keys, protocol):
- self.hint = hint
- self.keys = keys
- self.protocol = protocol
- NetworkRegistry.registry[hint] = self
-
- @staticmethod
- def get_registry(hint):
- return NetworkRegistry.registry.get(hint)
-
-# -----------------------------------------------------------
-# Define the base class for all network-serializable objects
-# -----------------------------------------------------------
-
-class NetworkObject(object):
- ''' Base class for all network serializable objects '''
-
- # link to our registry object for this registered class
- registry = None
-
- def __init__(self, data=None):
- ''' If this is an array, we pull data out of the data array
- (if there is any) and translate that into a hash internally '''
-
- self._data = data
- if not data: self._data = {}
- if isinstance(data, list):
- self.import_array_data(data)
-
- def import_array_data(self, data):
- ''' If an array-based object is created with an array
- of data, cycle through and load the data '''
-
- self._data = {}
- if len(data) == 0:
- return
-
- reg = self.get_registry()
- if reg.protocol == 'array':
- for idx, key in enumerate(reg.keys):
- self._data[key] = data[idx]
-
- def get_data(self):
- ''' Returns the full dataset for this object as a dict '''
- return self._data
-
- def set_field(self, field, value):
- self._data[field] = value
-
- def get_field(self, field):
- return self._data.get(field)
-
- def get_registry(self):
- ''' Returns the registry object for this registered class '''
- return self.__class__.registry
-
- def shallow_clone(self):
- ''' Makes a shallow copy '''
- reg = self.get_registry()
- obj = new_object_from_hint(reg.hint)
- for field in reg.keys:
- obj._data[field] = self._data[field]
- return obj
-
-
-
-def new_object_from_hint(hint):
- ''' Given a hint, this will create a new object of that
- type and return it. If this hint is not registered,
- an object of type NetworkObject.__unknown is returned'''
- try:
- obj = None
- exec('obj = NetworkObject.%s()' % hint)
- return obj
- except AttributeError:
- return NetworkObject.__unknown()
-
-def __make_network_accessor(cls, key):
- ''' Creates and accessor/mutator method for the given class.
- 'key' is the name the method will have and represents
- the field on the object whose data we are accessing '''
- def accessor(self, *args):
- if len(args) != 0:
- self._data[key] = args[0]
- return self._data[key]
- setattr(cls, key, accessor)
-
-
-def register_hint(hint, keys, type='hash'):
- ''' Registers a new network-serializable object class.
-
- 'hint' is the class hint
- 'keys' is the list of field names on the object
- If this is an array-based object, the field names
- must be sorted to reflect the encoding order of the fields
- 'type' is the wire-protocol of the object. hash or array.
- '''
-
- # register the class with the global registry
- registry = NetworkRegistry(hint, keys, type)
-
- # create the new class locally with the given hint name
- exec('class %s(NetworkObject):\n\tpass' % hint)
-
- # give the new registered class a local handle
- cls = None
- exec('cls = %s' % hint)
-
- # assign an accessor/mutator for each field on the object
- for k in keys:
- __make_network_accessor(cls, k)
-
- # attach our new class to the NetworkObject
- # class so others can access it
- setattr(NetworkObject, hint , cls)
- cls.registry = registry
-
-
-
-
-# create a unknown object to handle unregistred types
-register_hint('__unknown', [], 'hash')
-
-# -------------------------------------------------------------------
-# Define the custom object parsing behavior
-# -------------------------------------------------------------------
-def parse_net_object(obj):
-
- if isinstance(obj, dict):
- if OSRF_JSON_CLASS_KEY in obj and OSRF_JSON_PAYLOAD_KEY in obj:
-
- hint = obj[OSRF_JSON_CLASS_KEY]
- sub_object = obj[OSRF_JSON_PAYLOAD_KEY]
- reg = NetworkRegistry.get_registry(hint)
-
- if reg:
-
- obj = {}
-
- if reg.protocol == 'array':
- subobj_len = len(sub_object)
-
- for idx, key in enumerate(reg.keys):
- if idx < subobj_len:
- # don't attempt acces past the end of the data list
- obj[key] = parse_net_object(sub_object[idx])
- else:
- # make sure all keys are accounted for, even if there
- # is no data for the key in the parsed object
- obj[key] = None
- else:
- for key in reg.keys:
- obj[key] = parse_net_object(sub_object.get(key))
-
- # vivicate the network object
- estr = 'obj = NetworkObject.%s(obj)' % hint
- exec(estr)
- return obj
-
- # dict, but not a registered NetworkObject
- for key, value in obj.iteritems():
- obj[key] = parse_net_object(value)
-
- elif isinstance(obj, list):
- for idx, value in enumerate(obj):
- obj[idx] = parse_net_object(value)
-
- return obj
-
-
-def to_xml(obj):
- """ Returns the XML representation of an internal object."""
- chars = []
- __to_xml(obj, chars)
- return ''.join(chars)
-
-def __to_xml(obj, chars):
- """ Turns an internal object into OpenSRF XML """
-
- if obj is None:
- chars.append('<null/>')
- return
-
- if isinstance(obj, unicode) or isinstance(obj, str):
- chars.append('<string>%s</string>' % saxutils.escape(obj))
- return
-
- if isinstance(obj, int) or isinstance(obj, long):
- chars.append('<number>%d</number>' % obj)
- return
-
- if isinstance(obj, float):
- chars.append('<number>%f</number>' % obj)
- return
-
- if isinstance(obj, NetworkObject):
-
- registry = obj.get_registry()
- data = obj.get_data()
- hint = saxutils.escape(registry.hint)
-
- if registry.protocol == 'array':
- chars.append("<array class_hint='%s'>" % hint)
- for key in registry.keys:
- __to_xml(data.get(key), chars)
- chars.append('</array>')
-
- else:
- if registry.protocol == 'hash':
- chars.append("<object class_hint='%s'>" % hint)
- for key, value in data.items():
- chars.append("<element key='%s'>" % saxutils.escape(key))
- __to_xml(value, chars)
- chars.append('</element>')
- chars.append('</object>')
-
-
- if isinstance(obj, list):
- chars.append('<array>')
- for entry in obj:
- __to_xml(entry, chars)
- chars.append('</array>')
- return
-
- if isinstance(obj, dict):
- chars.append('<object>')
- for key, value in obj.items():
- chars.append("<element key='%s'>" % saxutils.escape(key))
- __to_xml(value, chars)
- chars.append('</element>')
- chars.append('</object>')
- return
-
- if isinstance(obj, bool):
- val = 'false'
- if obj:
- val = 'true'
- chars.append("<boolean value='%s'/>" % val)
- return
-
-def find_object_path(obj, path, idx=None):
- """Searches an object along the given path for a value to return.
-
- Path separators can be '/' or '.', '/' is tried first."""
-
- parts = []
-
- if re.search('/', path):
- parts = path.split('/')
- else:
- parts = path.split('.')
-
- for part in parts:
- try:
- val = obj[part]
- except:
- return None
- if isinstance(val, str):
- return val
- if isinstance(val, list):
- if idx != None:
- return val[idx]
- return val
- if isinstance(val, dict):
- obj = val
- else:
- return val
-
- return obj
+++ /dev/null
-"""
-Implements an OpenSRF forking request server
-"""
-# -----------------------------------------------------------------------
-# Copyright (C) 2008-2010 Equinox Software, Inc.
-# Bill Erickson <erickson@esilibrary.com>
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-#
-# You should have received a copy of the GNU General Public License
-# along with this program; if not, write to the Free Software
-# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
-# 02110-1301, USA
-# -----------------------------------------------------------------------
-
-import os, sys, fcntl, socket, errno, signal, time
-import osrf.log, osrf.conf, osrf.net, osrf.system
-import osrf.stack, osrf.app, osrf.const
-
-
-# used to define the size of the PID/size leader in
-# status and data messages passed to and from children
-SIZE_PAD = 12
-
-class Controller(object):
- '''
- OpenSRF forking request server.
- '''
-
- def __init__(self, service):
- '''Initialize the Controller object'''
- self.service = service # service name
- self.max_requests = 0 # max child requests
- self.max_children = 0 # max num of child processes
- self.min_children = 0 # min num of child processes
- self.num_children = 0 # current num children
- self.osrf_handle = None # xmpp handle
- self.routers = [] # list of registered routers
- self.keepalive = 0 # how long to wait for subsequent, stateful requests
- self.active_list = [] # list of active children
- self.idle_list = [] # list of idle children
- self.pid_map = {} # map of pid -> child object for faster access
-
- # Global status socketpair. All children relay their
- # availability info to the parent through this socketpair.
- self.read_status, self.write_status = socket.socketpair()
- self.read_status.setblocking(0)
-
- def cleanup(self):
- ''' Closes management sockets, kills children, reaps children, exits '''
-
- osrf.log.log_info("server: shutting down...")
- self.cleanup_routers()
-
- self.read_status.shutdown(socket.SHUT_RDWR)
- self.write_status.shutdown(socket.SHUT_RDWR)
- self.read_status.close()
- self.write_status.close()
-
- for child in self.idle_list + self.active_list:
- child.read_data.shutdown(socket.SHUT_RDWR)
- child.write_data.shutdown(socket.SHUT_RDWR)
- child.read_data.close()
- child.write_data.close()
- os.kill(child.pid, signal.SIGKILL)
-
- self.reap_children(True)
- os._exit(0)
-
-
- def handle_signals(self):
- ''' Installs SIGINT and SIGTERM handlers '''
-
- def handler(signum, frame):
- ''' Handler implementation '''
- self.cleanup()
-
- signal.signal(signal.SIGINT, handler)
- signal.signal(signal.SIGTERM, handler)
-
-
- def run(self):
- ''' Run the OpenSRF service, spawning and reaping children '''
-
- osrf.net.get_network_handle().disconnect()
- osrf.net.clear_network_handle()
- self.spawn_children()
- self.handle_signals()
-
- # give children a chance to connect before we start taking data
- time.sleep(.5)
- self.osrf_handle = osrf.system.System.net_connect(
- resource = '%s_listener' % self.service,
- service = self.service
- )
-
- # clear the recv callback so inbound messages do not filter
- # through the opensrf stack
- self.osrf_handle.receive_callback = None
-
- # connect to our listening routers
- self.register_routers()
-
- try:
- osrf.log.log_internal("server: entering main server loop...")
-
- while True: # main server loop
-
- self.reap_children()
- self.check_status()
- data = self.osrf_handle.recv(-1).to_xml()
- child = None
-
- if len(self.idle_list) > 0:
- child = self.idle_list.pop()
- self.active_list.append(child)
- osrf.log.log_internal(
- "server: sending data to available child %d" % child.pid
- )
-
- elif self.num_children < self.max_children:
- child = self.spawn_child(True)
- osrf.log.log_internal(
- "server: sending data to new child %d" % child.pid
- )
-
- else:
- osrf.log.log_warn("server: no children available, \
-waiting... consider increasing max_children for this application higher than \
-%d in the OpenSRF configuration if this message occurs frequently" \
- % self.max_children)
- child = self.check_status(True)
-
- self.write_child(child, data)
-
- except KeyboardInterrupt:
- osrf.log.log_info("server: exiting with keyboard interrupt")
-
- except Exception, exc:
- osrf.log.log_error(
- "server: exiting with exception: %s" % exc.message
- )
-
- finally:
- self.cleanup()
-
-
- def write_child(self, child, data):
- ''' Sends data to the child process '''
-
- try:
- child.write_data.sendall(data)
-
- except Exception, ex:
- osrf.log.log_error(
- "server: error sending data to child %d: %s"
- % (child.pid, str(ex))
- )
- self.cleanup_child(child.pid, True)
- return False
-
- return True
-
-
- def check_status(self, wait=False):
- ''' Checks to see if any children have indicated they are done with
- their current request. If wait is true, wait indefinitely
- for a child to be free. '''
-
- ret_child = None
-
- if wait:
- self.read_status.setblocking(1)
-
- while True:
- pid = None
-
- try:
- pid = self.read_status.recv(SIZE_PAD)
-
- except socket.error, exc:
- if exc.args[0] == errno.EAGAIN:
- break # no data left to read in nonblocking mode
-
- osrf.log.log_error(
- "server: child status check failed: %s" % str(exc)
- )
- if not wait or ret_child:
- break
-
- finally:
- if wait and ret_child:
- # we've received a status update from at least
- # 1 child. No need to block anymore.
- self.read_status.setblocking(0)
-
- if pid:
- child = self.pid_map[int(pid)]
- osrf.log.log_internal(
- "server: child process %d reporting for duty" % child.pid
- )
- if wait and ret_child is None:
- # caller is waiting for a free child;
- # leave it in the active list
- ret_child = child
- else:
- self.active_list.remove(child)
- self.idle_list.append(child)
-
- return ret_child
-
-
- def reap_children(self, done=False):
- '''
- Uses waitpid() to reap the children. If necessary, spawns new children.
- '''
-
- options = 0
- if not done:
- options = os.WNOHANG
-
- while True:
- try:
- (pid, status) = os.waitpid(0, options)
- if pid == 0:
- if not done:
- self.spawn_children()
- return
-
- osrf.log.log_internal("server: cleaning up child %d" % pid)
- self.num_children -= 1
- self.cleanup_child(pid)
-
- except OSError:
- return
-
- def cleanup_child(self, pid, kill=False):
- '''
- Removes the child from the active or idle list.
-
- Kills the process if requested.
- '''
-
- if kill:
- os.kill(pid, signal.SIGKILL)
-
- # locate the child in the active or idle list and remove it
- # Note: typically, a dead child will be in the active list, since
- # exiting children do not send a cleanup status to the controller
-
- try:
- self.active_list.pop(self.active_list.index(self.pid_map[pid]))
- except:
- try:
- self.idle_list.pop(self.active_list.index(self.pid_map[pid]))
- except:
- pass
-
- del self.pid_map[pid]
-
-
-
- def spawn_children(self):
- ''' Launches up to min_children child processes '''
- while self.num_children < self.min_children:
- self.spawn_child()
-
- def spawn_child(self, active=False):
- ''' Spawns a new child process '''
-
- child = Child(self)
- child.read_data, child.write_data = socket.socketpair()
- child.pid = os.fork()
- sys.stdin.close()
- sys.stdin = open(os.devnull, 'r')
- sys.stdout.close()
- sys.stdout = open(os.devnull, 'w')
- sys.stderr.close()
- sys.stderr = open(os.devnull, 'w')
-
-
- if child.pid:
- self.num_children += 1
- self.pid_map[child.pid] = child
- if active:
- self.active_list.append(child)
- else:
- self.idle_list.append(child)
- osrf.log.log_internal(
- "server: %s spawned child %d : %d total"
- % (self.service, child.pid, self.num_children)
- )
- return child
- else:
- child.pid = os.getpid()
- child.init()
- child.run()
- osrf.net.get_network_handle().disconnect()
- osrf.log.log_internal("server: child exiting...")
- os._exit(0)
-
- def register_routers(self):
- ''' Registers this application instance with all configured routers '''
- routers = osrf.conf.get('routers.router')
-
- if not isinstance(routers, list):
- routers = [routers]
-
- for router in routers:
- if isinstance(router, dict):
- if not 'services' in router or \
- self.service in router['services']['service']:
- target = "%s@%s/router" % (router['name'], router['domain'])
- self.register_router(target)
- else:
- router_name = osrf.conf.get('router_name')
- target = "%s@%s/router" % (router_name, router)
- self.register_router(target)
-
-
- def register_router(self, target):
- ''' Registers with a single router '''
-
- osrf.log.log_info("server: registering with router %s" % target)
- self.routers.append(target)
-
- reg_msg = osrf.net.NetworkMessage(
- recipient = target,
- body = 'registering...',
- router_command = 'register',
- router_class = self.service
- )
-
- self.osrf_handle.send(reg_msg)
-
- def cleanup_routers(self):
- ''' Un-registers with all connected routers '''
-
- for target in self.routers:
- osrf.log.log_info("server: un-registering with router %s" % target)
- unreg_msg = osrf.net.NetworkMessage(
- recipient = target,
- body = 'un-registering...',
- router_command = 'unregister',
- router_class = self.service
- )
- self.osrf_handle.send(unreg_msg)
-
-
-class Child(object):
- ''' Models a single child process '''
-
- def __init__(self, controller):
- ''' Initializes child process instance '''
-
- # our Controller object
- self.controller = controller
-
- # how many requests we've served so far
- self.num_requests = 0
-
- # the child reads data from the controller on this socket
- self.read_data = None
-
- # the controller sends data to the child on this socket
- self.write_data = None
-
- # my process id
- self.pid = 0
-
- def run(self):
- ''' Loops, processing data, until max_requests is reached '''
-
- while True:
-
- try:
-
- self.read_data.setblocking(1)
- data = ''
-
- while True: # read all the data from the socket
-
- buf = None
- try:
- buf = self.read_data.recv(2048)
- except socket.error, exc:
- if exc.args[0] == errno.EAGAIN:
- break
- osrf.log.log_error(
- "server: child data read failed: %s" % str(exc)
- )
- osrf.app.Application.application.child_exit()
- return
-
- if buf is None or buf == '':
- break
-
- data += buf
- self.read_data.setblocking(0)
-
- osrf.log.log_internal("server: child received message: " + data)
-
- osrf.net.get_network_handle().flush_inbound_data()
- session = osrf.stack.push(
- osrf.net.NetworkMessage.from_xml(data)
- )
- self.keepalive_loop(session)
-
- self.num_requests += 1
-
- osrf.log.log_internal("server: child done processing message")
-
- if self.num_requests == self.controller.max_requests:
- break
-
- # tell the parent we're done w/ this request session
- self.send_status()
-
- except KeyboardInterrupt:
- pass
-
- # run the exit handler
- osrf.app.Application.application.child_exit()
-
- def keepalive_loop(self, session):
- '''
- Keeps session alive while client is connected.
-
- If timeout occurs, session disconnects and gets cleaned up.
- '''
- keepalive = self.controller.keepalive
-
- while session.state == osrf.const.OSRF_APP_SESSION_CONNECTED:
-
- status = session.wait(keepalive)
-
- if session.state == osrf.const.OSRF_APP_SESSION_DISCONNECTED:
- osrf.log.log_internal(
- "server: client sent disconnect, exiting keepalive"
- )
- break
-
- # if no msg received before keepalive timeout expired
- if status is None:
-
- osrf.log.log_info(
- "server: no request was received in %d seconds from %s, "
- "exiting stateful session"
- % (session.remote_id, int(keepalive))
- )
-
- session.send_status(
- session.thread,
- osrf.net_obj.NetworkObject.osrfConnectStatus({
- 'status' : 'Disconnected on timeout',
- 'statusCode': osrf.const.OSRF_STATUS_TIMEOUT
- })
- )
-
- break
-
- session.cleanup()
- return
-
- def send_status(self):
- ''' Informs the controller that we are done processing this request '''
- fcntl.lockf(self.controller.write_status.fileno(), fcntl.LOCK_EX)
- try:
- self.controller.write_status.sendall(str(self.pid).rjust(SIZE_PAD))
- finally:
- fcntl.lockf(self.controller.write_status.fileno(), fcntl.LOCK_UN)
-
- def init(self):
- ''' Connects the opensrf xmpp handle '''
- osrf.net.clear_network_handle()
- osrf.system.System.net_connect(
- resource = '%s_drone' % self.controller.service,
- service = self.controller.service
- )
- osrf.app.Application.application.child_init()
-
+++ /dev/null
-# -----------------------------------------------------------------------
-# Copyright (C) 2007 Georgia Public Library Service
-# Bill Erickson <billserickson@gmail.com>
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-# -----------------------------------------------------------------------
-
-import osrf.json, osrf.conf, osrf.log, osrf.net, osrf.net_obj, osrf.const
-from osrf.const import OSRF_APP_SESSION_CONNECTED, \
- OSRF_APP_SESSION_CONNECTING, OSRF_APP_SESSION_DISCONNECTED, \
- OSRF_MESSAGE_TYPE_CONNECT, OSRF_MESSAGE_TYPE_DISCONNECT, \
- OSRF_MESSAGE_TYPE_REQUEST, OSRF_MESSAGE_TYPE_RESULT, OSRF_MESSAGE_TYPE_STATUS
-import osrf.ex
-import random, os, time, threading
-
-
-# -----------------------------------------------------------------------
-# Go ahead and register the common network objects
-# -----------------------------------------------------------------------
-osrf.net_obj.register_hint('osrfMessage', ['threadTrace', 'locale', 'type', 'payload', 'ingress'], 'hash')
-osrf.net_obj.register_hint('osrfMethod', ['method', 'params'], 'hash')
-osrf.net_obj.register_hint('osrfResult', ['status', 'statusCode', 'content'], 'hash')
-osrf.net_obj.register_hint('osrfConnectStatus', ['status', 'statusCode'], 'hash')
-osrf.net_obj.register_hint('osrfMethodException', ['status', 'statusCode'], 'hash')
-
-
-class Session(object):
- """Abstract session superclass."""
-
- ''' Global cache of in-service sessions '''
- session_cache = {}
- current_ingress = 'opensrf';
-
- def __init__(self):
- # by default, we're connected to no one
- self.state = OSRF_APP_SESSION_DISCONNECTED
- self.remote_id = None
- self.locale = None
- self.thread = None
- self.service = None
-
- @staticmethod
- def find_or_create(thread):
- if thread in Session.session_cache:
- return Session.session_cache[thread]
- return ServerSession(thread)
-
- @staticmethod
- def ingress(ingress):
- if ingress:
- Session.current_ingress = ingress
- return Session.current_ingress
-
- def set_remote_id(self, remoteid):
- self.remote_id = remoteid
- osrf.log.log_internal("Setting request remote ID to %s" % self.remote_id)
-
- def wait(self, timeout=120):
- """Wait up to <timeout> seconds for data to arrive on the network"""
- osrf.log.log_internal("Session.wait(%d)" % timeout)
- handle = osrf.net.get_network_handle()
- return handle.recv(timeout)
-
- def send(self, omessages):
- """Sends an OpenSRF message"""
- if not isinstance(omessages, list):
- omessages = [omessages]
-
- for msg in omessages:
- msg.ingress(Session.current_ingress);
-
- net_msg = osrf.net.NetworkMessage(
- recipient = self.remote_id,
- body = osrf.json.to_json(omessages),
- thread = self.thread,
- locale = self.locale,
- )
-
- handle = osrf.net.get_network_handle()
- handle.send(net_msg)
-
- def cleanup(self):
- """Removes the session from the global session cache."""
- del Session.session_cache[self.thread]
-
-class ClientSession(Session):
- """Client session object. Use this to make server requests."""
-
- def __init__(self, service, locale='en-US'):
-
- # call superclass constructor
- Session.__init__(self)
-
- # the service we are sending requests to
- self.service = service
-
- # the locale we want requests to be returned in
- self.locale = locale
-
- # find the remote service handle <router>@<domain>/<service>
- domain = osrf.conf.get('domain', 0)
- router = osrf.conf.get('router_name')
- self.remote_id = "%s@%s/%s" % (router, domain, service)
- self.orig_remote_id = self.remote_id
-
- # generate a random message thread
- self.thread = "%s%s%s%s" % (os.getpid(),
- str(random.randint(100,100000)), str(time.time()),threading.currentThread().getName().lower())
-
- # how many requests this session has taken part in
- self.next_id = 0
-
- # cache of request objects
- self.requests = {}
-
- # cache this session in the global session cache
- Session.session_cache[self.thread] = self
-
- def reset_request_timeout(self, rid):
- req = self.find_request(rid)
- if req:
- req.reset_timeout = True
-
-
- def request2(self, method, arr):
- """Creates a new request and sends the request to the server using a python array as the params."""
- return self.__request(method, arr)
-
- def request(self, method, *args):
- """Creates a new request and sends the request to the server using a variable argument list as params"""
- arr = list(args)
- return self.__request(method, arr)
-
- def __request(self, method, arr):
- """Builds the request object and sends it."""
- if self.state != OSRF_APP_SESSION_CONNECTED:
- self.reset_remote_id()
-
- osrf.log.make_xid()
-
- osrf.log.log_debug("Sending request %s -> %s " % (self.service, method))
- req = ClientRequest(self, self.next_id, method, arr, self.locale)
- self.requests[str(self.next_id)] = req
- self.next_id += 1
- req.send()
- return req
-
-
- def connect(self, timeout=10):
- """Connects to a remote service"""
-
- if self.state == OSRF_APP_SESSION_CONNECTED:
- return True
- self.state = OSRF_APP_SESSION_CONNECTING
-
- # construct and send a CONNECT message
- self.send(
- osrf.net_obj.NetworkObject.osrfMessage(
- { 'threadTrace' : 0,
- 'type' : OSRF_MESSAGE_TYPE_CONNECT
- }
- )
- )
-
- while timeout >= 0 and not self.state == OSRF_APP_SESSION_CONNECTED:
- start = time.time()
- self.wait(timeout)
- timeout -= time.time() - start
-
- if self.state != OSRF_APP_SESSION_CONNECTED:
- raise osrf.ex.OSRFServiceException("Unable to connect to " + self.service)
-
- return True
-
- def disconnect(self):
- """Disconnects from a remote service"""
-
- if self.state == OSRF_APP_SESSION_DISCONNECTED:
- return True
-
- self.send(
- osrf.net_obj.NetworkObject.osrfMessage(
- { 'threadTrace' : 0,
- 'type' : OSRF_MESSAGE_TYPE_DISCONNECT
- }
- )
- )
-
- self.state = OSRF_APP_SESSION_DISCONNECTED
-
-
-
- def reset_remote_id(self):
- """Recovers the original remote id"""
- self.remote_id = self.orig_remote_id
- osrf.log.log_internal("Resetting remote ID to %s" % self.remote_id)
-
- def push_response_queue(self, message):
- """Pushes the message payload onto the response queue
- for the request associated with the message's ID."""
- osrf.log.log_debug("pushing %s" % message.payload())
- try:
- self.find_request(message.threadTrace()).push_response(message.payload())
- except Exception, e:
- osrf.log.log_warn("pushing respond to non-existent request %s : %s" % (message.threadTrace(), e))
-
- def find_request(self, rid):
- """Returns the original request matching this message's threadTrace."""
- try:
- return self.requests[str(rid)]
- except KeyError:
- osrf.log.log_debug('find_request(): non-existent request %s' % str(rid))
- return None
-
- @staticmethod
- def atomic_request(service, method, *args):
- ses = ClientSession(service)
- req = ses.request2(method, list(args))
- resp = req.recv()
- data = None
- if resp:
- data = resp.content()
- req.cleanup()
- ses.cleanup()
- return data
-
-
-
-
-class Request(object):
- def __init__(self, session, rid, method=None, params=[], locale='en-US'):
- self.session = session # my session handle
- self.rid = rid # my unique request ID
- self.method = method # method name
- self.params = params # my method params
- self.locale = locale
- self.complete = False # is this request done?
- self.complete_time = 0 # time at which the request was completed
-
-
-class ClientRequest(Request):
- """Represents a single OpenSRF request.
- A request is made and any resulting respones are
- collected for the client."""
-
- def __init__(self, session, rid, method=None, params=[], locale='en-US'):
- Request.__init__(self, session, rid, method, params, locale)
- self.queue = [] # response queue
- self.reset_timeout = False # resets the recv timeout?
- self.send_time = 0 # local time the request was put on the wire
- self.first_response_time = 0 # time it took for our first reponse to be received
-
- def send(self):
- """Sends a request message"""
-
- # construct the method object message with params and method name
- method = osrf.net_obj.NetworkObject.osrfMethod( {
- 'method' : self.method,
- 'params' : self.params
- } )
-
- # construct the osrf message with our method message embedded
- message = osrf.net_obj.NetworkObject.osrfMessage( {
- 'threadTrace' : self.rid,
- 'type' : OSRF_MESSAGE_TYPE_REQUEST,
- 'payload' : method,
- 'locale' : self.locale
- } )
-
- self.send_time = time.time()
- self.session.send(message)
-
- def recv(self, timeout=120):
- """ Waits up to <timeout> seconds for a response to this request.
-
- If a message is received in time, the response message is returned.
- Returns None otherwise."""
-
- self.session.wait(0)
-
- orig_timeout = timeout
- while not self.complete and (timeout >= 0 or orig_timeout < 0) and len(self.queue) == 0:
-
- s = time.time()
- self.session.wait(timeout)
-
- if self.reset_timeout:
- self.reset_timeout = False
- timeout = orig_timeout
-
- elif orig_timeout >= 0:
- timeout -= time.time() - s
-
- now = time.time()
-
- # -----------------------------------------------------------------
- # log some statistics
- if len(self.queue) > 0:
- if not self.first_response_time:
- self.first_response_time = now
- osrf.log.log_debug("time elapsed before first response: %f" \
- % (self.first_response_time - self.send_time))
-
- if self.complete:
- if not self.complete_time:
- self.complete_time = now
- osrf.log.log_debug("time elapsed before complete: %f" \
- % (self.complete_time - self.send_time))
- # -----------------------------------------------------------------
-
-
- if len(self.queue) > 0:
- # we have a reponse, return it
- return self.queue.pop(0)
-
- return None
-
- def push_response(self, content):
- """Pushes a method response onto this requests response queue."""
- self.queue.append(content)
-
- def cleanup(self):
- """Cleans up request data from the cache.
-
- Do this when you are done with a request to prevent "leaked" cache memory."""
- del self.session.requests[str(self.rid)]
-
- def set_complete(self):
- """Sets me as complete. This means the server has sent a 'request complete' message"""
- self.complete = True
-
-
-class ServerSession(Session):
- """Implements a server-side session"""
-
- def __init__(self, thread):
- Session.__init__(self)
- self.thread = thread
- self.callbacks = {}
- self.session_data = {}
- Session.session_cache[self.thread] = self
-
- def send_status(self, thread_trace, payload):
- self.send(
- osrf.net_obj.NetworkObject.osrfMessage(
- { 'threadTrace' : thread_trace,
- 'type' : osrf.const.OSRF_MESSAGE_TYPE_STATUS,
- 'payload' : payload,
- 'locale' : self.locale
- }
- )
- )
-
- def send_connect_ok(self, thread_trace):
- status_msg = osrf.net_obj.NetworkObject.osrfConnectStatus({
- 'status' : 'Connection Successful',
- 'statusCode': osrf.const.OSRF_STATUS_OK
- })
- self.send_status(thread_trace, status_msg)
-
- def send_method_not_found(self, thread_trace, method_name):
- status_msg = osrf.net_obj.NetworkObject.osrfConnectStatus({
- 'status' : 'Method [%s] not found for %s' % (method_name, self.service),
- 'statusCode': osrf.const.OSRF_STATUS_NOTFOUND
- })
- self.send_status(thread_trace, status_msg)
-
-
- def run_callback(self, type):
- if type in self.callbacks:
- self.callbacks[type](self)
-
- def register_callback(self, type, func):
- self.callbacks[type] = func
-
- def cleanup(self):
- Session.cleanup(self)
- self.run_callback('death')
-
-
-class ServerRequest(Request):
-
- def __init__(self, session, rid, method, params=[]):
- Request.__init__(self, session, rid, method, params, session.locale)
- self.response_list = []
-
- def _build_response_msg(self, data):
- result = osrf.net_obj.NetworkObject.osrfResult({
- 'content' : data,
- 'statusCode' : osrf.const.OSRF_STATUS_OK,
- 'status' : 'OK'
- })
-
- return osrf.net_obj.NetworkObject.osrfMessage({
- 'threadTrace' : self.rid,
- 'type' : OSRF_MESSAGE_TYPE_RESULT,
- 'payload' : result,
- 'locale' : self.locale
- })
-
- def _build_complete_msg(self):
-
- status = osrf.net_obj.NetworkObject.osrfConnectStatus({
- 'threadTrace' : self.rid,
- 'status' : 'Request Complete',
- 'statusCode': osrf.const.OSRF_STATUS_COMPLETE
- })
-
- return osrf.net_obj.NetworkObject.osrfMessage({
- 'threadTrace' : self.rid,
- 'type' : OSRF_MESSAGE_TYPE_STATUS,
- 'payload' : status,
- 'locale' : self.locale
- })
-
- def respond(self, data):
- ''' For non-atomic calls, this sends a response directly back
- to the client. For atomic calls, this pushes the response
- onto the response list '''
- osrf.log.log_internal("responding with %s" % str(data))
- if self.method.atomic:
- self.response_list.append(data)
- else:
- self.session.send(self._build_response_msg(data))
-
- def respond_complete(self, data):
- ''' Sends a complete message accompanied by the final result if applicable '''
-
- if self.complete:
- return
- self.complete = True
- self.complete_time = time.time()
-
- if self.method.atomic:
- if data is not None:
- self.response_list.append(data)
- self.session.send([
- self._build_response_msg(self.response_list),
- self._build_complete_msg(),
- ])
-
- elif data is not None:
- self.session.send([
- self._build_response_msg(data),
- self._build_complete_msg(),
- ])
-
- else:
- self.session.send(self._build_complete_msg())
-
-
-class MultiSession(object):
- ''' Manages multiple requests. With the current implementation, a 1 second
- lag time before the first response is practically guaranteed. Use
- only for long running requests.
-
- Another approach would be a threaded version, but that would require
- build-up and breakdown of thread-specific xmpp connections somewhere.
- conection pooling?
- '''
- class Container(object):
- def __init__(self, req):
- self.req = req
- self.id = None
-
- def __init__(self):
- self.complete = False
- self.reqs = []
-
- def request(self, service, method, *args):
- ses = ClientSession(service)
- cont = MultiSession.Container(ses.request(method, *args))
- cont.id = len(self.reqs)
- self.reqs.append(cont)
-
- def recv(self, timeout=120):
- ''' Returns a tuple of req_id, response '''
- duration = 0
- block_time = 1
- while True:
- for i in range(0, len(self.reqs)):
- cont = self.reqs[i]
- req = cont.req
-
- res = req.recv(0)
- if i == 0 and not res:
- res = req.recv(block_time)
-
- if res: break
-
- if res: break
-
- duration += block_time
- if duration >= timeout:
- return None
-
- if req.complete:
- self.reqs.pop(self.reqs.index(cont))
-
- if len(self.reqs) == 0:
- self.complete = True
-
- return cont.id, res.content()
-
+++ /dev/null
-# -----------------------------------------------------------------------
-# Copyright (C) 2007 Georgia Public Library Service
-# Bill Erickson <billserickson@gmail.com>
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-# -----------------------------------------------------------------------
-
-from osrf.const import OSRF_APP_SETTINGS, OSRF_METHOD_GET_HOST_CONFIG
-import osrf.ex, osrf.net_obj, osrf.ses
-
-# global settings config object
-__config = None
-
-def get(path, idx=0):
- global __config
- val = osrf.net_obj.find_object_path(__config, path, idx)
- if not val:
- raise osrf.ex.OSRFConfigException("Config value not found: " + path)
- return val
-
-
-def load(hostname):
- global __config
-
- ses = osrf.ses.ClientSession(OSRF_APP_SETTINGS)
- req = ses.request(OSRF_METHOD_GET_HOST_CONFIG, hostname)
- resp = req.recv(timeout=30)
- __config = resp.content()
- req.cleanup()
- ses.cleanup()
-
+++ /dev/null
-# -----------------------------------------------------------------------
-# Copyright (C) 2007 Georgia Public Library Service
-# Bill Erickson <billserickson@gmail.com>
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-# -----------------------------------------------------------------------
-
-import time
-import osrf.json, osrf.log, osrf.ex, osrf.ses, osrf.const, osrf.app
-
-def push(net_msg):
-
- ses = osrf.ses.Session.find_or_create(net_msg.thread)
- ses.set_remote_id(net_msg.sender)
- if not ses.service:
- ses.service = osrf.app.Application.name
-
- omessages = osrf.json.to_object(net_msg.body)
-
- osrf.log.log_internal("stack.push(): received %d messages" % len(omessages))
-
- # Pass each bundled opensrf message to the message handler
- start = time.time()
- for msg in omessages:
- handle_message(ses, msg)
- duration = time.time() - start
-
- if isinstance(ses, osrf.ses.ServerSession):
- osrf.log.log_info("Message processing duration %f" % duration)
-
- return ses
-
-def handle_message(session, message):
-
- osrf.log.log_internal("handle_message(): processing message of "
- "type %s" % message.type())
-
- osrf.ses.Session.ingress(message.ingress())
-
- if isinstance(session, osrf.ses.ClientSession):
- handle_client(session, message)
- else:
- handle_server(session, message)
-
-
-def handle_client(session, message):
-
- if message.type() == osrf.const.OSRF_MESSAGE_TYPE_RESULT:
- session.push_response_queue(message)
- return
-
- if message.type() == osrf.const.OSRF_MESSAGE_TYPE_STATUS:
-
- status_code = int(message.payload().statusCode())
- status_text = message.payload().status()
- osrf.log.log_internal("handle_message(): processing STATUS, "
- "status_code = %d" % status_code)
-
- if status_code == osrf.const.OSRF_STATUS_COMPLETE:
- # The server has informed us that this request is complete
- req = session.find_request(message.threadTrace())
- if req:
- osrf.log.log_internal("marking request as complete: %d" % req.rid)
- req.set_complete()
- return
-
- if status_code == osrf.const.OSRF_STATUS_OK:
- # We have connected successfully
- osrf.log.log_debug("Successfully connected to " + session.service)
- session.state = osrf.const.OSRF_APP_SESSION_CONNECTED
- return
-
- if status_code == osrf.const.OSRF_STATUS_CONTINUE:
- # server is telling us to reset our wait timeout and keep waiting for a response
- session.reset_request_timeout(message.threadTrace())
- return
-
- if status_code == osrf.const.OSRF_STATUS_TIMEOUT:
- osrf.log.log_debug("The server did not receive a request from us in time...")
- session.state = osrf.const.OSRF_APP_SESSION_DISCONNECTED
- return
-
- if status_code == osrf.const.OSRF_STATUS_NOTFOUND:
- osrf.log.log_error("Requested method was not found on the server: %s" % status_text)
- session.state = osrf.const.OSRF_APP_SESSION_DISCONNECTED
- raise osrf.ex.OSRFServiceException(status_text)
-
- if status_code == osrf.const.OSRF_STATUS_INTERNALSERVERERROR:
- raise osrf.ex.OSRFServiceException("Server error %d : %s" % (status_code, status_text))
-
- raise osrf.ex.OSRFProtocolException("Unknown message status: %d" % status_code)
-
-
-def handle_server(session, message):
-
- if message.type() == osrf.const.OSRF_MESSAGE_TYPE_REQUEST:
- osrf.log.log_debug("server received REQUEST from %s" % session.remote_id)
- session.run_callback('pre_request')
- osrf.app.Application.handle_request(session, message)
- session.run_callback('post_request')
- return
-
- if message.type() == osrf.const.OSRF_MESSAGE_TYPE_CONNECT:
- osrf.log.log_debug("server received CONNECT from %s" % session.remote_id)
- session.state = osrf.const.OSRF_APP_SESSION_CONNECTED
- session.send_connect_ok(message.threadTrace())
- return
-
- if message.type() == osrf.const.OSRF_MESSAGE_TYPE_DISCONNECT:
- osrf.log.log_debug("server received DISCONNECT from %s" % session.remote_id)
- session.state = osrf.const.OSRF_APP_SESSION_DISCONNECTED
- session.run_callback('disconnect')
- return
-
- if message.type() == osrf.const.OSRF_MESSAGE_TYPE_STATUS:
- osrf.log.log_debug("server ignoring STATUS from %s" % session.remote_id)
- return
-
- if message.type() == osrf.const.OSRF_MESSAGE_TYPE_RESULT:
- osrf.log.log_debug("server ignoring RESULT from %s" % session.remote_id)
- return
-
-
+++ /dev/null
-# -----------------------------------------------------------------------
-# Copyright (C) 2007 Georgia Public Library Service
-# Bill Erickson <billserickson@gmail.com>
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# GNU General Public License for more details.
-# -----------------------------------------------------------------------
-
-import osrf.conf
-from osrf.net import Network, set_network_handle, get_network_handle
-import osrf.stack, osrf.log, osrf.set, osrf.cache
-import sys, os
-
-class System(object):
-
- config_file = None
- config_context = None
-
- @staticmethod
- def net_connect(**kwargs):
- if get_network_handle():
- ''' This thread already has a handle '''
- return
-
- config_file = kwargs.get('config_file') or System.config_file
- config_context = kwargs.get('config_context') or System.config_context
-
- # store the last config file info for later
- System.config_file = config_file
- System.config_context = config_context
-
- # parse the config file
- config_parser = osrf.conf.Config(config_file, config_context)
- config_parser.parse_config()
-
- # set up logging
- osrf.log.initialize(
- osrf.conf.get('loglevel'),
- osrf.conf.get_no_ex('syslog'),
- osrf.conf.get_no_ex('logfile'),
- osrf.conf.get_no_ex('client') == 'true',
- kwargs.get('service'))
-
- # connect to the opensrf network
- network = Network(
- host = osrf.conf.get('domain'),
- port = osrf.conf.get('port'),
- username = osrf.conf.get('username'),
- password = osrf.conf.get('passwd'),
- resource = kwargs.get('resource'))
-
- network.set_receive_callback(osrf.stack.push)
- osrf.net.set_network_handle(network)
- network.connect()
-
- return network
-
- @staticmethod
- def net_disconnect():
- network = osrf.net.get_network_handle()
- network.disconnect()
-
- @staticmethod
- def connect(**kwargs):
- """ Connects to the opensrf network
- Options:
- config_file
- config_context
- connect_cache
- resource
- """
-
- network = System.net_connect(**kwargs)
-
- # load the domain-wide settings file
- osrf.set.load(osrf.conf.get('domain'))
-
- if kwargs.get('connect_cache'):
- System.connect_cache()
-
- return network
-
-
- @staticmethod
- def connect_cache():
- ''' Initializes the cache connections '''
- cache_servers = osrf.set.get('cache.global.servers.server')
- if cache_servers:
- if not isinstance(cache_servers, list):
- cache_servers = [cache_servers]
- if not osrf.cache.CacheClient.get_client():
- osrf.cache.CacheClient.connect(cache_servers)
-
- '''
- @return 0 if child, pid if parent
- '''
- @staticmethod
- def daemonize(parent_exit=True):
- pid = os.fork()
- if pid == 0:
- try:
- os.chdir('/')
- os.setsid()
- sys.stdin.close()
- sys.stdin = open(os.devnull)
- sys.stdout.close()
- sys.stdout = open(os.devnull)
- sys.stderr.close()
- sys.stderr = open(os.devnull)
- except (OSError, ValueError):
- pass
- elif parent_exit:
- os._exit(0)
-
- return pid
-
-
+++ /dev/null
-import xml.dom.minidom
-import osrf.json
-from xml.sax import handler, make_parser, saxutils
-import urllib, re
-
-def xml_file_to_object(filename):
- """Turns the contents of an XML file into a Python object"""
- doc = xml.dom.minidom.parse(filename)
- obj = xml_node_to_object(doc.documentElement)
- doc.unlink()
- return obj
-
-def xml_string_to_object(string):
- """Turns an XML string into a Python object"""
- doc = xml.dom.minidom.parseString(string)
- obj = xml_node_to_object(doc.documentElement)
- doc.unlink()
- return obj
-
-def xml_node_to_object(xml_node):
- """Turns an XML node into a Python object"""
- obj = {}
-
- if xml_node.nodeType != xml_node.ELEMENT_NODE:
- return obj
-
- done = False
- node_name = xml_node.nodeName
-
- for node_child in xml_node.childNodes:
- if node_child.nodeType == xml_node.ELEMENT_NODE:
- sub_obj = xml_node_to_object(node_child)
- __append_child_node(obj, node_name, node_child.nodeName, sub_obj)
- done = True
-
- for attr in xml_node.attributes.values():
- __append_child_node(obj, node_name, attr.name,
- dict([(attr.name, attr.value)]))
-
-
- if not done and len(xml_node.childNodes) > 0:
- # If the node has no element children, clean up the text
- # content and use that as the data
- text_node = xml_node.childNodes[0] # extract the text node
- data = unicode(text_node.nodeValue).replace('^\s*','')
- data = data.replace('\s*$','')
-
- if node_name in obj:
- # the current element contains attributes and text
- obj[node_name]['#text'] = data
- else:
- # the current element contains text only
- obj[node_name] = data
-
- return obj
-
-
-def __append_child_node(obj, node_name, child_name, sub_obj):
- """ If a node has element children, create a new sub-object
- for this node, attach an array for each type of child
- and recursively collect the children data into the array(s) """
-
- if not obj.has_key(node_name):
- obj[node_name] = {}
-
- if not obj[node_name].has_key(child_name):
- # we've encountered 1 sub-node with node_child's name
- if child_name in sub_obj:
- obj[node_name][child_name] = sub_obj[child_name]
- else:
- obj[node_name][child_name] = None
-
- else:
- if isinstance(obj[node_name][child_name], list):
- # we already have multiple sub-nodes with node_child's name
- obj[node_name][child_name].append(sub_obj[child_name])
-
- else:
- # we already have 1 sub-node with node_child's name, make
- # it a list and append the current node
- val = obj[node_name][child_name]
- obj[node_name][child_name] = [ val, sub_obj[child_name] ]
-
-
-
-class XMLFlattener(handler.ContentHandler):
- ''' Turns an XML string into a flattened dictionary of properties.
-
- Example <doc><a><b>text1</b></a><c>text2</c><c>text3</c></doc> becomes
- {
- 'doc.a.b' : 'text1',
- 'doc.c' : ['text2', 'text3']
- }
- '''
-
- reg = re.compile('^\s*$')
- class Handler(handler.ContentHandler):
- def __init__(self):
- self.result = {}
- self.elements = []
- self.use_json = None
-
- def startElement(self, name, attrs):
- self.elements.append(name)
-
- def characters(self, chars):
- text = urllib.unquote_plus(chars)
- if re.match(XMLFlattener.reg, text):
- return
- key = ''
- for elm in self.elements:
- key += elm + '.'
- key = key[:-1]
-
- if key in self.result:
- data = self._decode(self.result[key])
- if isinstance(data, list):
- data.append(text)
- else:
- data = [data, text]
- self.result[key] = self._encode(data)
- else:
- self.result[key] = self._encode(text)
-
-
- def endElement(self, name):
- self.elements.pop()
-
- def _decode(self, string):
- if self.use_json:
- return osrf.json.to_object(string)
- return string
-
- def _encode(self, obj):
- if self.use_json:
- return osrf.json.to_json(obj)
- return obj
-
-
-
- def __init__(self, xml_str, encode_as_json=False):
- self.xml_str = xml_str
- self.use_json = encode_as_json
-
- def parse(self):
- ''' Parses the XML string and returns the dict of keys/values '''
- sax_handler = XMLFlattener.Handler()
- sax_handler.use_json = self.use_json
- parser = make_parser()
- parser.setContentHandler(sax_handler)
- try:
- import StringIO
- parser.parse(StringIO.StringIO(self.xml_str))
- except Exception, e:
- osrf.log.log_error('Error parsing XML: %s' % unicode(e))
- raise e
-
- return sax_handler.result
-
-
-
-
+++ /dev/null
-#!/usr/bin/env python
-
-from setuptools import setup
-
-setup(name='OpenSRF',
- version='3.0.dev0',
- install_requires=[
- 'dnspython', # required by pyxmpp
- 'python-memcached',
- 'pyxmpp>=1.0.0',
- 'simplejson>=1.7.1'
- ],
- dependency_links = [
- "http://pyxmpp.jajcus.net/downloads/",
- "ftp://ftp.tummy.com/pub/python-memcached/python-memcached-latest.tar.gz"
- ],
- description='OpenSRF Python Modules',
- author='Bill Erickson',
- author_email='erickson@esilibrary.com',
- license="GPL",
- url='http://www.open-ils.org/',
- packages=['osrf', 'osrf.apps'],
- scripts=['srfsh.py']
-)
+++ /dev/null
-#!/usr/bin/python
-# vim:et:ts=4
-"""
-srfsh.py - provides a basic shell for issuing OpenSRF requests
-
- help
- - show this menu
-
- math_bench <count>
- - runs <count> opensrf.math requests and prints the average time
-
- request <service> <method> [<param1>, <param2>, ...]
- - performs an opensrf request
- - parameters are JSON strings
-
- router <query>
- - Queries the router. Query options: services service-stats service-nodes
-
- introspect <service> [<api_name_prefix>]
- - List API calls for a service.
- - api_name_prefix is a bare string or JSON string.
-
- set VAR=<value>
- - sets an environment variable
-
- get VAR
- - Returns the value for the environment variable
-
- Environment variables:
- SRFSH_OUTPUT_NET_OBJ_KEYS = true - If a network object is array-encoded and key registry exists for the object type, annotate the object with field names
- = false - Print JSON
-
- SRFSH_OUTPUT_FORMAT_JSON = true - Use JSON pretty printer
- = false - Print raw JSON
-
- SRFSH_OUTPUT_PAGED = true - Paged output. Uses "less -EX"
- = false - Output is not paged
-
- SRFSH_LOCALE = <locale> - request responses to be returned in locale <locale> if available
-
-"""
-import os, sys, time, readline, atexit, re, pydoc, traceback
-import osrf.json, osrf.system, osrf.ses, osrf.conf, osrf.log, osrf.net
-
-class Srfsh(object):
-
-
- def __init__(self, script_file=None):
-
- # used for paging
- self.output_buffer = ''
-
- # true if invoked with a script file
- self.reading_script = False
-
- # multi-request sessions
- self.active_session = None
-
- # default opensrf request timeout
- self.timeout = 120
-
- # map of command name to handler
- self.command_map = {}
-
- if script_file:
- self.open_script(script_file)
- self.reading_script = True
-
- # map of router sub-commands to router API calls
- self.router_command_map = {
- 'services' : 'opensrf.router.info.class.list',
- 'service-stats' : 'opensrf.router.info.stats.class.node.all',
- 'service-nodes' : 'opensrf.router.info.stats.class.all'
- }
-
- # seed the tab completion word bank
- self.tab_complete_words = self.router_command_map.keys() + [
- 'exit',
- 'quit',
- 'opensrf.settings',
- 'opensrf.math',
- 'opensrf.dbmath',
- 'opensrf.py-example'
- ]
-
- # add the default commands
- for command in ['request', 'router', 'help', 'set',
- 'get', 'math_bench', 'introspect', 'connect', 'disconnect' ]:
-
- self.add_command(command = command, handler = getattr(Srfsh, 'handle_' + command))
-
- # for compat w/ srfsh.c
- self.add_command(command = 'open', handler = Srfsh.handle_connect)
- self.add_command(command = 'close', handler = Srfsh.handle_disconnect)
-
- def open_script(self, script_file):
- ''' Opens the script file and redirects the contents to STDIN for reading. '''
-
- try:
- script = open(script_file, 'r')
- os.dup2(script.fileno(), sys.stdin.fileno())
- script.close()
- except Exception, e:
- self.report_error("Error opening script file '%s': %s" % (script_file, str(e)))
- raise e
-
-
- def main_loop(self):
- ''' Main listen loop. '''
-
- self.set_vars()
- self.do_connect()
- self.load_plugins()
- self.setup_readline()
-
- while True:
-
- try:
- self.report("", True)
- line = raw_input("srfsh# ")
-
- if not len(line):
- continue
-
- if re.search('^\s*#', line): # ignore lines starting with #
- continue
-
- if str.lower(line) == 'exit' or str.lower(line) == 'quit':
- break
-
- parts = str.split(line)
- command = parts.pop(0)
-
- if command not in self.command_map:
- self.report("unknown command: '%s'\n" % command)
- continue
-
- self.command_map[command](self, parts)
-
- except EOFError: # ctrl-d
- break
-
- except KeyboardInterrupt: # ctrl-c
- self.report("\n")
-
- except Exception, e:
- self.report("%s\n" % traceback.format_exc())
-
- self.cleanup()
-
- def handle_connect(self, parts):
- ''' Opens a connected session to an opensrf service '''
-
- if len(parts) == 0:
- self.report("usage: connect <service>")
- return
-
- service = parts.pop(0)
-
- if self.active_session:
- if self.active_session['service'] == service:
- return # use the existing active session
- else:
- # currently, we only support one active session at a time
- self.handle_disconnect([self.active_session['service']])
-
- self.active_session = {
- 'ses' : osrf.ses.ClientSession(service, locale = self.__get_locale()),
- 'service' : service
- }
-
- self.active_session['ses'].connect()
-
- def handle_disconnect(self, parts):
- ''' Disconnects the currently active session. '''
-
- if len(parts) == 0:
- self.report("usage: disconnect <service>")
- return
-
- service = parts.pop(0)
-
- if self.active_session:
- if self.active_session['service'] == service:
- self.active_session['ses'].disconnect()
- self.active_session['ses'].cleanup()
- self.active_session = None
- else:
- self.report_error("There is no open connection for service '%s'" % service)
-
- def handle_introspect(self, parts):
- ''' Introspect an opensrf service. '''
-
- if len(parts) == 0:
- self.report("usage: introspect <service> [api_prefix]\n")
- return
-
- service = parts.pop(0)
- args = [service, 'opensrf.system.method']
-
- if len(parts) > 0:
- api_pfx = parts[0]
- if api_pfx[0] != '"': # json-encode if necessary
- api_pfx = '"%s"' % api_pfx
- args.append(api_pfx)
- else:
- args[1] += '.all'
-
- return self.handle_request(args)
-
-
- def handle_router(self, parts):
- ''' Send requests to the router. '''
-
- if len(parts) == 0:
- self.report("usage: router <query>\n")
- return
-
- query = parts[0]
-
- if query not in self.router_command_map:
- self.report("router query options: %s\n" % ','.join(self.router_command_map.keys()))
- return
-
- return self.handle_request(['router', self.router_command_map[query]])
-
- def handle_set(self, parts):
- ''' Set env variables to control srfsh behavior. '''
-
- cmd = "".join(parts)
- pattern = re.compile('(.*)=(.*)').match(cmd)
- key = pattern.group(1)
- val = pattern.group(2)
- self.set_var(key, val)
- self.report("%s = %s\n" % (key, val))
-
- def handle_get(self, parts):
- ''' Returns environment variable value '''
- try:
- self.report("%s=%s\n" % (parts[0], self.get_var(parts[0])))
- except:
- self.report("\n")
-
-
- def handle_help(self, foo):
- ''' Prints help info '''
- self.report(__doc__)
-
- def handle_request(self, parts):
- ''' Performs an OpenSRF request and reports the results. '''
-
- if len(parts) < 2:
- self.report("usage: request <service> <api_name> [<param1>, <param2>, ...]\n")
- return
-
- self.report("\n")
-
- service = parts.pop(0)
- method = parts.pop(0)
- locale = self.__get_locale()
- jstr = '[%s]' % "".join(parts)
- params = None
-
- try:
- params = osrf.json.to_object(jstr)
- except:
- self.report("Error parsing JSON: %s\n" % jstr)
- return
-
- using_active = False
- if self.active_session and self.active_session['service'] == service:
- # if we have an open connection to the same service, use it
- ses = self.active_session['ses']
- using_active = True
- else:
- ses = osrf.ses.ClientSession(service, locale=locale)
-
- start = time.time()
-
- req = ses.request2(method, tuple(params))
-
- last_content = None
- total = 0
- while True:
- resp = None
-
- try:
- resp = req.recv(timeout=self.timeout)
- except osrf.net.XMPPNoRecipient:
- self.report("Unable to communicate with %s\n" % service)
- break
- except osrf.ex.OSRFServiceException, e:
- self.report("Server exception occurred: %s" % e)
- break
-
- total = time.time() - start
-
- if not resp: break
-
- content = resp.content()
-
- if content is not None:
- last_content = content
- if self.get_var('SRFSH_OUTPUT_NET_OBJ_KEYS') == 'true':
- self.report("Received Data: %s\n" % osrf.json.debug_net_object(content))
- else:
- if self.get_var('SRFSH_OUTPUT_FORMAT_JSON') == 'true':
- self.report("Received Data: %s\n" % osrf.json.pprint(osrf.json.to_json(content)))
- else:
- self.report("Received Data: %s\n" % osrf.json.to_json(content))
-
- req.cleanup()
- if not using_active:
- ses.cleanup()
-
- self.report("\n" + '-'*60 + "\n")
- self.report("Total request time: %f\n" % total)
- self.report('-'*60 + "\n")
-
- return last_content
-
-
- def handle_math_bench(self, parts):
- ''' Sends a series of request to the opensrf.math service and collects timing stats. '''
-
- count = int(parts.pop(0))
- ses = osrf.ses.ClientSession('opensrf.math')
- times = []
-
- for cnt in range(100):
- if cnt % 10:
- sys.stdout.write('.')
- else:
- sys.stdout.write( str( cnt / 10 ) )
- print ""
-
- for cnt in range(count):
-
- starttime = time.time()
- req = ses.request('add', 1, 2)
- resp = req.recv(timeout=2)
- endtime = time.time()
-
- if resp.content() == 3:
- sys.stdout.write("+")
- sys.stdout.flush()
- times.append( endtime - starttime )
- else:
- print "What happened? %s" % str(resp.content())
-
- req.cleanup()
- if not ( (cnt + 1) % 100):
- print ' [%d]' % (cnt + 1)
-
- ses.cleanup()
- total = 0
- for cnt in times:
- total += cnt
- print "\naverage time %f" % (total / len(times))
-
-
-
-
- def setup_readline(self):
- ''' Initialize readline history and tab completion. '''
-
- class SrfshCompleter(object):
-
- def __init__(self, words):
- self.words = words
- self.prefix = None
-
- def complete(self, prefix, index):
-
- if prefix != self.prefix:
-
- self.prefix = prefix
-
- # find all words that start with this prefix
- self.matching_words = [
- w for w in self.words if w.startswith(prefix)
- ]
-
- if len(self.matching_words) == 0:
- return None
-
- if len(self.matching_words) == 1:
- return self.matching_words[0]
-
- # re-print the prompt w/ all of the possible word completions
- sys.stdout.write('\n%s\nsrfsh# %s' %
- (' '.join(self.matching_words), readline.get_line_buffer()))
-
- return None
-
- completer = SrfshCompleter(tuple(self.tab_complete_words))
- readline.parse_and_bind("tab: complete")
- readline.set_completer(completer.complete)
-
- histfile = os.path.join(self.get_var('HOME'), ".srfsh_history")
- try:
- readline.read_history_file(histfile)
- except IOError:
- pass
- atexit.register(readline.write_history_file, histfile)
-
- readline.set_completer_delims(readline.get_completer_delims().replace('-',''))
-
-
- def do_connect(self):
- ''' Connects this instance to the OpenSRF network. '''
-
- osrf.ses.Session.ingress('srfsh')
- file = os.path.join(self.get_var('HOME'), ".srfsh.xml")
- osrf.system.System.net_connect(config_file=file, config_context='srfsh')
-
- def add_command(self, **kwargs):
- ''' Adds a new command to the supported srfsh commands.
-
- Command is also added to the tab-completion word bank.
-
- kwargs :
- command : the command name
- handler : reference to a two-argument function.
- Arguments are Srfsh instance and command arguments.
- '''
-
- command = kwargs['command']
- self.command_map[command] = kwargs['handler']
- self.tab_complete_words.append(command)
-
-
- def load_plugins(self):
- ''' Load plugin modules from the srfsh configuration file '''
-
- try:
- plugins = osrf.conf.get('plugins.plugin')
- except:
- return
-
- if not isinstance(plugins, list):
- plugins = [plugins]
-
- for plugin in plugins:
- module = plugin['module']
- init = plugin.get('init', 'load')
- self.report("Loading module %s..." % module, True, True)
-
- try:
- mod = __import__(module, fromlist=' ')
- getattr(mod, init)(self, plugin)
- self.report("OK.\n", True, True)
-
- except Exception, e:
- self.report_error("Error importing plugin '%s' : %s\n" % (module, traceback.format_exc()))
-
- def cleanup(self):
- ''' Disconnects from opensrf. '''
- osrf.system.System.net_disconnect()
-
- def report_error(self, msg):
- ''' Log to stderr. '''
- sys.stderr.write("%s\n" % msg)
- sys.stderr.flush()
-
- def report(self, text, flush=False, no_page=False):
- ''' Logs to the pager or stdout, depending on env vars and context '''
-
- if self.reading_script or no_page or self.get_var('SRFSH_OUTPUT_PAGED') != 'true':
- sys.stdout.write(text)
- if flush:
- sys.stdout.flush()
- else:
- self.output_buffer += text
-
- if flush and self.output_buffer != '':
- pipe = os.popen('less -EX', 'w')
- pipe.write(self.output_buffer)
- pipe.close()
- self.output_buffer = ''
-
- def set_vars(self):
- ''' Set defaults for environment variables. '''
-
- if not self.get_var('SRFSH_OUTPUT_NET_OBJ_KEYS'):
- self.set_var('SRFSH_OUTPUT_NET_OBJ_KEYS', 'false')
-
- if not self.get_var('SRFSH_OUTPUT_FORMAT_JSON'):
- self.set_var('SRFSH_OUTPUT_FORMAT_JSON', 'true')
-
- if not self.get_var('SRFSH_OUTPUT_PAGED'):
- self.set_var('SRFSH_OUTPUT_PAGED', 'true')
-
- # XXX Do we need to differ between LANG and LC_MESSAGES?
- if not self.get_var('SRFSH_LOCALE'):
- self.set_var('SRFSH_LOCALE', self.get_var('LC_ALL'))
-
- def set_var(self, key, val):
- ''' Sets an environment variable's value. '''
- os.environ[key] = val
-
- def get_var(self, key):
- ''' Returns an environment variable's value. '''
- return os.environ.get(key, '')
-
- def __get_locale(self):
- """
- Return the defined locale for this srfsh session.
-
- A locale in OpenSRF is currently defined as a [a-z]{2}-[A-Z]{2} pattern.
- This function munges the LC_ALL setting to conform to that pattern; for
- example, trimming en_CA.UTF-8 to en-CA.
-
- >>> import srfsh
- >>> shell = srfsh.Srfsh()
- >>> shell.set_var('SRFSH_LOCALE', 'zz-ZZ')
- >>> print shell.__get_locale()
- zz-ZZ
- >>> shell.set_var('SRFSH_LOCALE', 'en_CA.UTF-8')
- >>> print shell.__get_locale()
- en-CA
- """
-
- env_locale = self.get_var('SRFSH_LOCALE')
- if env_locale:
- pattern = re.compile(r'^\s*([a-z]+)[^a-zA-Z]([A-Z]+)').search(env_locale)
- lang = pattern.group(1)
- region = pattern.group(2)
- locale = "%s-%s" % (lang, region)
- else:
- locale = 'en-US'
-
- return locale
-
-if __name__ == '__main__':
- script = sys.argv[1] if len(sys.argv) > 1 else None
- Srfsh(script).main_loop()
-
+++ /dev/null
-"""
-Unit tests for the osrf.json module
-"""
-
-import sys, os
-sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
-
-import osrf.json, osrf.net_obj, unittest
-
-class TestObject(object):
- """Test object with basic JSON structures"""
- def __init__(self):
- self.int = 1
- self.string = "two"
- self.array = [1,2,3,4]
- self.dict = {'foo': 'bar', 'key': 'value'}
- self.true = True
- self.false = False
- self.null = None
-
-class CheckObjectToJSON(unittest.TestCase):
- """Tests the osrf.json.to_json() method that converts Python objects into JSON"""
- def setUp(self):
- self.testo = TestObject()
-
- def test_int(self):
- test_json = osrf.json.to_json(self.testo.int)
- self.assertEqual(test_json, '1')
-
- def test_string(self):
- test_json = osrf.json.to_json(self.testo.string)
- self.assertEqual(test_json, '"two"')
-
- def test_array(self):
- test_json = osrf.json.to_json(self.testo.array)
- self.assertEqual(test_json, '[1, 2, 3, 4]')
-
- def test_dict(self):
- test_json = osrf.json.to_json(self.testo.dict)
- self.assertEqual(test_json, '{"foo": "bar", "key": "value"}')
-
- def test_true(self):
- test_json = osrf.json.to_json(self.testo.true)
- self.assertEqual(test_json, 'true')
-
- def test_false(self):
- test_json = osrf.json.to_json(self.testo.false)
- self.assertEqual(test_json, 'false')
-
- def test_null(self):
- test_json = osrf.json.to_json(self.testo.null)
- self.assertEqual(test_json, 'null')
-
-class CheckJSONToObject(unittest.TestCase):
- """Tests that the osrf.json.to_object() method converts JSON into Python objects"""
-
- def setUp(self):
- self.testo = TestObject()
-
- def test_int(self):
- test_json = osrf.json.to_object('1')
- self.assertEqual(test_json, self.testo.int)
-
- def test_string(self):
- test_json = osrf.json.to_object('"two"')
- self.assertEqual(test_json, self.testo.string)
-
- def test_array(self):
- test_json = osrf.json.to_object('[1, 2, 3, 4]')
- self.assertEqual(test_json, self.testo.array)
-
- def test_dict(self):
- test_json = osrf.json.to_object('{"foo": "bar", "key": "value"}')
- self.assertEqual(test_json, self.testo.dict)
-
- def test_true(self):
- test_json = osrf.json.to_object('true')
- self.assertEqual(test_json, self.testo.true)
-
- def test_false(self):
- test_json = osrf.json.to_object('false')
- self.assertEqual(test_json, self.testo.false)
-
- def test_null(self):
- test_json = osrf.json.to_object('null')
- self.assertEqual(test_json, self.testo.null)
-
-if __name__ == '__main__':
- unittest.main()
+++ /dev/null
-"""
-Unit tests for the osrf.net_obj module
-"""
-
-import sys, os
-sys.path.insert(0, os.path.join(os.path.dirname(__file__), '..'))
-
-import osrf.json, osrf.net_obj, unittest
-
-class TestObject(object):
- """Test object with basic JSON structures"""
- def __init__(self):
- self.int = 1
- self.string = "two"
- self.array = [1,2,3,4]
- self.dict = {'foo': 'bar', 'key': 'value'}
- self.true = True
- self.false = False
- self.null = None
-
-class CheckNetworkEncoder(unittest.TestCase):
- """Tests the NetworkEncoder JSON encoding extension"""
-
- def setUp(self):
- osrf.net_obj.register_hint('osrfMessage', ['threadTrace', 'locale', 'type', 'payload'], 'hash')
- self.testo = TestObject()
- self.ne = osrf.json.NetworkEncoder()
-
- def test_connect(self):
- test_json = self.ne.default(
- osrf.net_obj.NetworkObject.osrfMessage({
- 'threadTrace' : 0,
- 'type' : "CONNECT"
- }
- )
- )
- self.assertEqual(test_json, {'__p':
- {'threadTrace': 0, 'type': 'CONNECT'},
- '__c': 'osrfMessage'}
- )
-
- def test_connect_array(self):
- test_json = self.ne.default(
- osrf.net_obj.NetworkObject.osrfMessage({
- 'threadTrace' : 0,
- 'type' : "CONNECT",
- 'protocol' : "array"
- }
- )
- )
- self.assertEqual(test_json, {'__p':
- {'threadTrace': 0, 'protocol': 'array', 'type': 'CONNECT'},
- '__c': 'osrfMessage'}
- )
-
- def test_connect_to_xml(self):
- test_json = self.ne.default(
- osrf.net_obj.NetworkObject.osrfMessage({
- 'threadTrace' : 0,
- 'type' : "CONNECT"
- }
- )
- )
- self.assertEqual(
- osrf.net_obj.to_xml(test_json),
- "<object><element key='__p'><object><element key='threadTrace'>"
- "<number>0</number></element><element key='type'>"
- "<string>CONNECT</string></element></object></element>"
- "<element key='__c'><string>osrfMessage</string></element></object>"
- )
-
-
-if __name__ == '__main__':
- unittest.main()
-
+++ /dev/null
-"""
-Trigger nosetests to give us a complete coverage statement
-
-nosetests only reports on the modules that are imported in
-the tests it finds; this file serves as a placeholder until
-we actually provide unit test coverage of the core files.
-"""
-
-import osrf.app
-import osrf.cache
-import osrf.conf
-import osrf.const
-import osrf.ex
-import osrf.gateway
-# Triggers an exception if mod_python is not installed
-#import osrf.http_translator
-import osrf.json
-import osrf.log
-import osrf.net_obj
-import osrf.net
-import osrf.server
-import osrf.ses
-import osrf.set
-import osrf.stack
-import osrf.system
-import osrf.xml_obj
-import unittest
-
-if __name__ == '__main__':
- unittest.main()