+test-driver
aclocal.m4
autom4te.cache/
bin/opensrf-perl.pl
export PREFIX = @prefix@
export TMP = @TMP@
export LIBXML2_HEADERS = @LIBXML2_HEADERS@
+export HIREDIS_HEADERS = @HIREDIS_HEADERS@
export APR_HEADERS = @APR_HEADERS@
export ETCDIR = @sysconfdir@
export APXS2 = @APXS2@
export APACHE2_HEADERS = @APACHE2_HEADERS@
-export DEF_CFLAGS = -D_LARGEFILE64_SOURCE $(MAYBE_DEBUG) -pipe -g -Wall -O2 -fPIC -I@abs_top_srcdir@/include/ -I$(LIBXML2_HEADERS) -I$(APACHE2_HEADERS) -I$(APR_HEADERS) @AM_CPPFLAGS@
+export DEF_CFLAGS = -D_LARGEFILE64_SOURCE $(MAYBE_DEBUG) -pipe -g -Wall -O2 -fPIC -I@abs_top_srcdir@/include/ -I$(LIBXML2_HEADERS) -I$(APACHE2_HEADERS) -I$(APR_HEADERS) -I$(HIREDIS_HEADERS) @AM_CPPFLAGS@
export DEF_LDLIBS = -lopensrf
export VAR = @localstatedir@
export PID = @localstatedir@/run/opensrf
--- /dev/null
+# OpenSRF-Over-Redis
+
+Proof of concept project to replace XMPP / Ejabberd with Redis as the
+OpenSRF message transport layer.
+
+## Install
+
+### Install Redis version 6.x for ACL support.
+
+NOTE: Redis v6 is the default version in Ubuntu 22.04
+
+```sh
+
+curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg
+
+echo "deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb $(lsb_release -cs) main" \
+ | sudo tee /etc/apt/sources.list.d/redis.list
+
+sudo apt update
+sudo apt install redis-server libredis-perl libhiredis-dev
+
+```
+
+### Disable Redis Snapshot Disk Persistence
+
+Optional but recommended since disk persistence adds unnecessary overhead.
+
+#### Edit /etc/redis/redis.conf and un-comment the <code># save ""</code> line:
+
+```conf
+# Snapshotting can be completely disabled with a single empty string argument
+# as in following example:
+#
+save ""
+#
+```
+
+#### Restart Redis
+
+```sh
+sudo systemctl restart redis
+```
+
+### Install OpenSRF Config and Initialize/Reset Message Bus
+
+```sh
+# mileage may vary on these commands
+
+sudo su opensrf
+
+# Backup the original config
+mv /openils/conf/opensrf_core.xml /openils/conf/opensrf_core.xml.orig
+
+# From the OpenSRF repository root directory.
+# Copy and modify the new config as needed.
+# TODO move this example config into Evergreen since it references EG services.
+cp examples/opensrf_core.xml.example /openils/conf/opensrf_core.xml
+
+# reset/init message bus
+osrf_control -l --reset-message-bus
+
+```
use OpenSRF::Transport::Listener;
use OpenSRF::Utils;
use OpenSRF::Utils::Config;
+use Redis;
my $opt_service = undef;
my $opt_config = "@CONF_DIR@/opensrf_core.xml";
my $opt_quiet = 0;
my $opt_diagnostic = 0;
my $opt_ignore_orphans = 0;
+my $opt_reset_message_bus = 0;
my $sclient;
my @perl_services;
my @nonperl_services;
'reload' => \$opt_reload,
'reload-all' => \$opt_reload_all,
'diagnostic' => \$opt_diagnostic,
+ 'reset-message-bus' => \$opt_reset_message_bus,
'ignore-orphans' => \$opt_ignore_orphans
);
$ENV{OSRF_HOSTNAME} = $hostname;
}
-my $C_COMMAND = "opensrf-c -c $opt_config -x opensrf -p $opt_pid_dir -h $hostname";
+my $C_COMMAND = "opensrf-c -c $opt_config -x service -p $opt_pid_dir -h $hostname";
sub verify_services {
my $service = shift;
sub do_start_router {
+ return;
my $pidfile = get_pid_file('router');
`opensrf_router $opt_config routers $pidfile`;
sub do_start_all {
msg("starting router and services for $hostname");
- do_start('router');
+ #do_start('router');
return do_start_services();
}
# graceful shutdown requires the presence of the router, so stop the
# router last. See if it's running first to avoid unnecessary warnings.
- do_stop('router', $signals[0]) if get_service_pids_from_file('router');
+ #do_stop('router', $signals[0]) if get_service_pids_from_file('router');
return 1;
}
# parses the local settings file
sub load_settings {
my $conf = OpenSRF::Utils::Config->current;
- my $cfile = $conf->bootstrap->settings_config;
+ my $cfile = $conf->as_hash->{settings_config};
return unless $cfile;
my $parser = OpenSRF::Utils::SettingsParser->new();
$parser->initialize( $cfile );
$parser->get_server_config($conf->env->hostname);
}
+# Clear the service: and client: queues of lingering messages. Apply
+# bus access and permissions for accounts defined in the <connections/>
+# block. This will generally be called before we connect to the system,
+# since the system may not yet have access to connect to the bus.
+sub do_reset_message_bus {
+
+ my $base_conf =
+ OpenSRF::Utils::Config->load(config_file => $opt_config, nocache => 1)->as_hash
+ or die "No suitable configuration found at $opt_config\n";
+
+ my $connections = $base_conf->{connections} or
+ die "No connection configuration found in $opt_config\n";
+
+ my $redis;
+
+ for my $type (keys %$connections) {
+ my $conf = $connections->{$type};
+ my $bus_conf = $conf->{message_bus};
+
+ if (!$redis) {
+ # Though connection details may vary by connection, we
+ # assume all connections in a single bootstratp config
+ # ultimately point to the same Redis instance.
+ my $host = $bus_conf->{host};
+ my $port = $bus_conf->{port};
+ my $sock = $bus_conf->{sock};
+
+ # This redis connection uses the "default" account, which has
+ # access to all actions and keys so it can manage access for
+ # various opensrf account types.
+ my @connect_args = $sock ? (sock => $sock) : (server => "$host:$port");
+
+ $redis = Redis->new(@connect_args) or
+ die "Cannot connect to Redis instance at @connect_args\n";
+
+ # Clear all the data
+ msg("Clearing all data from message bus: @connect_args");
+ $redis->flushall;
+ }
+
+ msg("Applying bus access for connection type: $type");
+
+ my $username = $bus_conf->{username};
+ my $password = $bus_conf->{password};
+
+ $redis->acl('SETUSER', $username, 'reset');
+ $redis->acl('SETUSER', $username, 'on', ">$password");
+
+ # Privileged clients can post message to all services and perform
+ # delete operations to clear their own client: enpoints from
+ # (i.e. flush socket).
+ my $permissions = '-@all +xgroup +xadd +xreadgroup +xack +xtrim +del ~client:* ~service:*';
+
+ if (!$conf->{privileged}) {
+ msg("Limiting bus access for non-privileged connection type=$type");
+
+ # Unprivileged connections have access toa subset of the
+ # available (i.e. public) services.
+ $permissions = '-@all +xgroup +xadd +xreadgroup +xack +xtrim +del ~client:*';
+
+ # Non-privileged connections can only interact with public
+ # service bus endpoints.
+ for my $service (@{$base_conf->{public_services}->{service}}) {
+ $redis->acl('SETUSER', $username, "~service:$service");
+ }
+ }
+
+ $redis->acl('SETUSER', $username, split(/ /, $permissions));
+
+ }
+
+ $redis->quit;
+}
+
sub msg {
my $m = shift;
print "* $m\n" unless $opt_quiet;
and gracefully re-launch drone processes. The -all variant sends
the signal to all services. The non-(-all) variant requires a
--service.
+
+ --reset-message-bus
+ Clear ALL data from the message bus, create opensrf accounts w/
+ permission to access message queues.
HELP
exit;
}
+do_reset_message_bus() if $opt_reset_message_bus;
+
# we do not verify services for stop/signal actions, since those may
# legitimately be used against services not (or no longer) configured
# to run on the selected host.
# show help if no action was requested
do_help() if $opt_help or not (
+ $opt_reset_message_bus or
$opt_start or
$opt_start_all or
$opt_start_services or
[LIBXML2_HEADERS=/usr/include/libxml2/])
AC_SUBST([LIBXML2_HEADERS])
+AC_ARG_WITH([hiredis],
+[ --with-hiredis=path location of the hiredis headers (default is /usr/include/hiredis/))],
+[HIREDIS_HEADERS=${withval}],
+[HIREDIS_HEADERS=/usr/include/hiredis/])
+AC_SUBST([HIREDIS_HEADERS])
+
AC_ARG_WITH([includes],
[ --with-includes=DIRECTORIES a colon-separated list of directories that will be added to the list the compiler searches for header files (Example: --with-includes=/path/headers:/anotherpath/moreheaders)],
[EXTRA_USER_INCLUDES=${withval}])
AC_CHECK_LIB([ncurses], [initscr], [], AC_MSG_ERROR(***OpenSRF requires ncurses development headers))
AC_CHECK_LIB([readline], [readline], [], AC_MSG_ERROR(***OpenSRF requires readline development headers))
AC_CHECK_LIB([xml2], [xmlAddID], [], AC_MSG_ERROR(***OpenSRF requires xml2 development headers))
+ AC_CHECK_LIB([hiredis], [redisConnect], [], AC_MSG_ERROR(***OpenSRF requires libhiredis))
# Check for libmemcached and set flags accordingly
PKG_CHECK_MODULES(memcached, libmemcached >= 0.8.0)
AC_SUBST(memcached_CFLAGS)
src/libopensrf/Makefile
src/perl/Makefile
src/ports/strn_compat/Makefile
- src/router/Makefile
src/srfsh/Makefile
src/websocket-stdio/Makefile
tests/Makefile
AC_MSG_RESULT(APR headers location: ${APR_HEADERS})
AC_MSG_RESULT(Apache version: ${APACHE_READABLE_VERSION})
AC_MSG_RESULT(libxml2 headers location: ${LIBXML2_HEADERS})
+ AC_MSG_RESULT(libhiredis headers location: ${HIREDIS_HEADERS})
AC_MSG_RESULT([----------------------------------------------------------------------])
<?xml version="1.0"?>
-<!--
-vim:et:ts=2:sw=2:
--->
<config>
- <!-- bootstrap config for OpenSRF apps -->
- <opensrf>
-
- <routers>
-
- <!-- define the list of routers our services will register with -->
-
- <router>
-
- <!-- This is the public router. On this router, we only register applications
- which should be accessible to everyone on the opensrf network -->
- <name>router</name>
- <domain>public.localhost</domain>
- <services>
- <service>opensrf.math</service>
- </services>
- </router>
-
- <router>
- <!-- This is the private router. All applications must register with
- this router, so no explicit <services> section is required -->
- <name>router</name>
- <domain>private.localhost</domain>
- </router>
- </routers>
-
-
- <!-- Jabber login settings
- Our domain should match that of the private router -->
- <domain>private.localhost</domain>
- <username>opensrf</username>
- <passwd>password</passwd>
- <port>5222</port>
- <!-- name of the router used on our private domain.
- this should match one of the <name> of the private router above -->
- <router_name>router</router_name>
-
- <!-- Log a warning when an outbound message reaches this size in bytes -->
- <msg_size_warn>1800000</msg_size_warn>
-
- <!-- log file settings ====================================== -->
- <!-- log to a local file -->
- <logfile>LOCALSTATEDIR/log/osrfsys.log</logfile>
-
- <!-- Log to syslog. You can use this same layout for
- defining the logging of all services in this file -->
- <!--
- <logfile>syslog</logfile>
- <syslog>local2</syslog>
- <actlog>local1</actlog>
- -->
- <!-- Optional log tag. You can use this to help distinguish
- syslog entries when running multiple OpenSRF stacks on the
- same server. -->
- <!--
- <logtag>instance1</logtag>
- -->
-
- <!-- 0 None, 1 Error, 2 Warning, 3 Info, 4 debug, 5 Internal (Nasty) -->
- <loglevel>3</loglevel>
-
- <!-- Maximum log message length; if using syslog, you might need to adjust
- your syslog service's configuration to match longer message lengths -->
- <loglength>1536</loglength>
-
- <!-- config file for the services -->
- <settings_config>SYSCONFDIR/opensrf.xml</settings_config>
-
- </opensrf>
-
- <!-- The section between <gateway>...</gateway> is a standard OpenSRF C stack config file -->
- <gateway>
-
- <!--
- we consider ourselves to be the "originating" client for requests,
- which means we define the log XID string for log traces
- -->
- <client>true</client>
-
- <!-- the routers's name on the network -->
- <router_name>router</router_name>
-
- <!-- jabber login info -->
- <!-- The gateway connects to the public domain -->
- <domain>public.localhost</domain>
- <username>opensrf</username>
- <passwd>password</passwd>
- <port>5222</port>
- <logfile>LOCALSTATEDIR/log/gateway.log</logfile>
- <loglevel>3</loglevel>
-
- <!-- cross origin HTTP settings http://en.wikipedia.org/wiki/Cross-origin_resource_sharing -->
- <cross_origin>
- <!-- specify individual hosts -->
- <!-- <origin>example.com</origin> -->
- <!-- ...or use the * wildcard to match all -->
- <!-- <origin>*</origin> -->
- </cross_origin>
-
- </gateway>
-
- <!-- ======================================================================================== -->
-
- <routers>
- <router> <!-- public router -->
- <trusted_domains>
- <!-- allow private services to register with this router
- and public clients to send requests to this router. -->
- <server>private.localhost</server>
- <!-- also allow private clients to send to the router so it can receive error messages -->
- <client>private.localhost</client>
- <client>public.localhost</client>
- </trusted_domains>
- <transport>
- <server>public.localhost</server>
- <port>5222</port>
- <unixpath>LOCALSTATEDIR/sock/unix_sock</unixpath>
- <username>router</username>
- <password>password</password>
- <resource>router</resource>
- <connect_timeout>10</connect_timeout>
- <max_reconnect_attempts>5</max_reconnect_attempts>
- </transport>
- <logfile>LOCALSTATEDIR/log/router.log</logfile>
- <!--
- <logfile>syslog</logfile>
- <syslog>local2</syslog>
- -->
- <!--
- <logtag>instance1</logtag>
- -->
- <loglevel>2</loglevel>
- </router>
- <router> <!-- private router -->
- <trusted_domains>
- <server>private.localhost</server>
- <!-- only clients on the private domain can send requests to this router -->
- <client>private.localhost</client>
- </trusted_domains>
- <transport>
- <server>private.localhost</server>
- <port>5222</port>
- <username>router</username>
- <password>password</password>
- <resource>router</resource>
- <connect_timeout>10</connect_timeout>
- <max_reconnect_attempts>5</max_reconnect_attempts>
- </transport>
- <logfile>LOCALSTATEDIR/log/router.log</logfile>
- <!--
- <logfile>syslog</logfile>
- <syslog>local2</syslog>
- -->
- <!--
- <logtag>instance1</logtag>
- -->
- <loglevel>4</loglevel>
- </router>
- </routers>
-
- <!-- ======================================================================================== -->
-
- <shared>
- <!-- Any methods which match any of these match_string node values will
- have their params redacted from lower-level input logging.
- Adjust these examples as needed. -->
- <log_protect>
- <!--
- <match_string>open-ils.auth</match_string>
- <match_string>open-ils.some_service.some_method</match_string>
- -->
- </log_protect>
- </shared>
+ <connections>
+ <service>
+
+ <!--
+ Can this connection talk to private services?
+ -->
+ <privileged>true</privileged>
+
+ <message_bus>
+ <host>127.0.0.1</host>
+ <port>6379</port>
+ <username>opensrf@private</username>
+ <password>password</password>
+
+ <!--
+ Max number of un-acknowledged messages Redis will retain in the
+ message stream before discarding the oldest. This value needs
+ to be *something* or queues will grow unbounded, service-level
+ request queues especially.
+ -->
+ <max_queue_size>1000</max_queue_size>
+
+ </message_bus>
+
+ <logfile>syslog</logfile>
+ <syslog>local0</syslog>
+ <actlog>local1</actlog>
+ <loglevel>3</loglevel>
+ </service>
+
+ <gateway>
+ <message_bus>
+ <host>127.0.0.1</host>
+ <port>6379</port>
+ <username>opensrf@public</username>
+ <password>password</password>
+ </message_bus>
+ <client>true</client>
+ <logfile>syslog</logfile>
+ <syslog>local6</syslog>
+ <actlog>local1</actlog>
+ <loglevel>3</loglevel>
+ </gateway>
+ </connections>
+
+ <settings_config>/openils/conf/opensrf.xml</settings_config>
+
+ <public_services>
+ <service>opensrf.math</service>
+ <service>open-ils.actor</service>
+ <service>open-ils.acq</service>
+ <service>open-ils.auth</service>
+ <service>open-ils.auth_proxy</service>
+ <service>open-ils.booking</service>
+ <service>open-ils.cat</service>
+ <service>open-ils.circ</service>
+ <service>open-ils.collections</service>
+ <service>open-ils.courses</service>
+ <service>open-ils.curbside</service>
+ <service>open-ils.fielder</service>
+ <service>open-ils.pcrud</service>
+ <service>open-ils.permacrud</service>
+ <service>open-ils.reporter</service>
+ <service>open-ils.resolver</service>
+ <service>open-ils.search</service>
+ <service>open-ils.supercat</service>
+ <service>open-ils.url_verify</service>
+ <service>open-ils.vandelay</service>
+ <service>open-ils.serial</service>
+ <service>open-ils.ebook_api</service>
+ </public_services>
+
+ <log_protect>
+ <match_string>open-ils.auth.authenticate.verify</match_string>
+ <match_string>open-ils.auth.authenticate.complete</match_string>
+ <match_string>open-ils.auth.login</match_string>
+ <match_string>open-ils.auth_proxy.login</match_string>
+ <match_string>open-ils.actor.patron.password_reset.commit</match_string>
+ <match_string>open-ils.actor.user.password</match_string>
+ <match_string>open-ils.actor.user.username</match_string>
+ <match_string>open-ils.actor.user.email</match_string>
+ <match_string>open-ils.actor.patron.update</match_string>
+ <match_string>open-ils.cstore.direct.actor.user.create</match_string>
+ <match_string>open-ils.cstore.direct.actor.user.update</match_string>
+ <match_string>open-ils.cstore.direct.actor.user.delete</match_string>
+ <match_string>open-ils.search.z3950.apply_credentials</match_string>
+ <match_string>open-ils.geo</match_string>
+ <match_string>open-ils.actor.geo</match_string>
+ </log_protect>
</config>
void osrfSystemSetPidFile( const char* name );
-int osrf_system_bootstrap_client( char* config_file, char* contextnode );
+int osrf_system_bootstrap_common(const char* config_file,
+ const char* contextnode, const char* appname, int is_service);
-int osrfSystemBootstrapClientResc( const char* config_file,
- const char* contextnode, const char* resource );
+int osrf_system_bootstrap_client(const char* config_file, const char* contextnode);
+
+int osrf_system_bootstrap_service(
+ const char* config_file, const char* contextnode, const char* appname);
+
+int osrfSystemBootstrapClientResc(const char* config_file,
+ const char* contextnode, const char* appname);
int osrfSystemBootstrap( const char* hostname, const char* configfile,
const char* contextNode );
*/
#include <time.h>
+#include <hiredis.h>
#include <opensrf/transport_session.h>
#include <opensrf/utils.h>
#include <opensrf/log.h>
struct transport_client_struct {
transport_message* msg_q_head; /**< Head of message queue */
transport_message* msg_q_tail; /**< Tail of message queue */
- transport_session* session; /**< Manages lower-level message processing */
+ redisContext* bus;
+
+ // Our communication stream.
+ // This will be unique for all connections except service-level
+ // (Listener) connections.
+ char* stream_name;
+
+ // Our unique name.
+ // Will match the unique stream_name for non-service-level connections.
+ char* consumer_name;
+
+ int max_queue_size;
+
+ int port;
+ char* unix_path;
int error; /**< Boolean: true if an error has occurred */
char* host; /**< Domain name or IP address of the Jabber server */
char* xmpp_id; /**< Jabber ID used for outgoing messages */
};
typedef struct transport_client_struct transport_client;
-transport_client* client_init( const char* server, int port, const char* unix_path, int component );
+transport_client* client_init( const char* server, int port, const char* unix_path );
-int client_connect( transport_client* client,
- const char* username, const char* password, const char* resource,
- int connect_timeout, enum TRANSPORT_AUTH_TYPE auth_type );
+int client_connect_with_stream_name(transport_client* client, const char* username, const char* password);
+int client_connect_as_service(transport_client* client,
+ const char* appname, const char* username, const char* password);
+int client_connect(transport_client* client,
+ const char* appname, const char* username, const char* password);
int client_disconnect( transport_client* client );
int error_code; /**< Value of the "code" attribute of <error>. */
int broadcast; /**< Value of the "broadcast" attribute in the message element. */
char* msg_xml; /**< The entire message as XML, complete with entity encoding. */
+ char* msg_json; /**< The entire message as JSON*/
struct transport_message_struct* next;
};
typedef struct transport_message_struct transport_message;
const char* thread, const char* recipient, const char* sender );
transport_message* new_message_from_xml( const char* msg_xml );
+transport_message* new_message_from_json( const char* msg_json );
void message_set_router_info( transport_message* msg, const char* router_from,
const char* router_to, const char* router_class, const char* router_command,
void message_set_osrf_xid( transport_message* msg, const char* osrf_xid );
int message_prepare_xml( transport_message* msg );
+int message_prepare_json( transport_message* msg );
int message_free( transport_message* msg );
endif
if BUILDCORE
-MAYBE_CORE = libopensrf c-apps router srfsh gateway perl websocket-stdio
+MAYBE_CORE = libopensrf c-apps srfsh gateway perl websocket-stdio
dist_bin_SCRIPTS = @top_srcdir@/bin/opensrf-perl.pl
bin_SCRIPTS = @top_srcdir@/bin/osrf_config
dist_sysconf_DATA = @top_srcdir@/examples/opensrf.xml.example @top_srcdir@/examples/opensrf_core.xml.example @top_srcdir@/examples/srfsh.xml.example
--- /dev/null
+#!/usr/bin/perl
+use strict;
+use warnings;
+use Getopt::Long;
+use OpenSRF::Utils::Logger q/$logger/;
+use OpenSRF::AppSession;
+use OpenSRF::Application;
+use Time::HiRes qw/time/;
+
+# Testing with Evergreen storage service by default.
+# I tried using opensrf.settings but for reasons I didn't investigate
+# hitting opensrf.settings with lots of requests lead to failures.
+#my $test_service = "open-ils.storage";
+#my $test_service = "opensrf.settings";
+my $test_service = "open-ils.cstore";
+
+my $iterations = 50;
+my $parallel = 1;
+
+my $small_echo_data = <<TEXT;
+ 1237012938471029348170197908709870987098709870987098709809870987098709870
+ 1237012938471029348170197908709870987098709870987098709809870987098709870
+TEXT
+
+my $large_echo_data = join('', <DATA>);
+my $med_echo_data = substr($large_echo_data, length($large_echo_data) / 2);
+
+my $osrf_config = '/openils/conf/opensrf_core.xml';
+my $ops = GetOptions('osrf-config=s' => \$osrf_config);
+
+sub echoloop {
+ my $data = shift;
+ my $ses = shift || OpenSRF::AppSession->create($test_service);
+ my $connected = shift || 0;
+
+ my $start = time;
+ for (0..$iterations) {
+ my $resp = $ses->request('opensrf.system.echo', "$data$_")->gather(1);
+
+ if ($resp eq "$data$_") {
+ print "+";
+ } else {
+ warn "Got bad data: $resp\n";
+ }
+ }
+
+ my $dur = time - $start;
+ my $avg = $dur / $iterations;
+
+ if ($parallel == 1) {
+ print sprintf(
+ " Connected=$connected Size=%d\tTotal Duration: %0.3f\tAvg Duration: %0.4f\n",
+ length($data), $dur, $avg
+ );
+ }
+}
+
+sub do_the_stuff {
+
+ echoloop($small_echo_data);
+ echoloop($med_echo_data);
+ echoloop($large_echo_data);
+
+ # Connected sessions
+
+ my $ses = OpenSRF::AppSession->create($test_service);
+
+ $ses->connect;
+ echoloop($small_echo_data, $ses, 1);
+ $ses->disconnect;
+
+ $ses->connect;
+ echoloop($med_echo_data, $ses, 1);
+ $ses->disconnect;
+
+ $ses->connect;
+ echoloop($large_echo_data, $ses, 1);
+ $ses->disconnect;
+}
+
+for (1..$parallel) {
+
+ if (fork() == 0) {
+ OpenSRF::System->bootstrap_client(config_file => $osrf_config);
+ do_the_stuff();
+ exit;
+ }
+}
+
+while (wait > 0) {}
+print "\n";
+
+__DATA__
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
EXTRA_DIST = @srcdir@/apachetools.c @srcdir@/apachetools.h \
@srcdir@/osrf_json_gateway.c @srcdir@/osrf_http_translator.c
-AM_CFLAGS = -D_LARGEFILE64_SOURCE $(HAVE_APACHE_MIN_24) -Wall -I@abs_top_srcdir@/include/ -I$(LIBXML2_HEADERS) -I$(APACHE2_HEADERS) -I$(APR_HEADERS)
+AM_CFLAGS = -D_LARGEFILE64_SOURCE $(HAVE_APACHE_MIN_24) -Wall -I@abs_top_srcdir@/include/ -I$(HIREDIS_HEADERS) -I$(LIBXML2_HEADERS) -I$(APACHE2_HEADERS) -I$(APR_HEADERS)
AM_LDFLAGS = -L$(LIBDIR) -L@top_builddir@/src/libopensrf
AP_LIBEXECDIR = `$(APXS2) -q LIBEXECDIR`
#endif
static void childInit(apr_pool_t *p, server_rec *s) {
- if(!osrfSystemBootstrapClientResc(configFile, configCtx, "translator")) {
+ if(!osrf_system_bootstrap_common(configFile, configCtx, "translator", 0)) {
ap_log_error( APLOG_MARK, APLOG_ERR, 0, s,
"Unable to Bootstrap OpenSRF Client with config %s..", configFile);
return;
int bootstrapped = 0;
int numserved = 0;
osrfStringArray* allowedOrigins = NULL;
+static osrfStringArray* public_services = NULL;
static const char* osrf_json_gateway_set_default_locale(cmd_parms *parms,
void *config, const char *arg) {
int t = time(NULL);
snprintf(buf, sizeof(buf), "%d", t);
- if( ! osrfSystemBootstrapClientResc( cfg, CONFIG_CONTEXT, buf ) ) {
+ if( ! osrf_system_bootstrap_common( cfg, CONFIG_CONTEXT, buf, 0 ) ) {
ap_log_error( APLOG_MARK, APLOG_ERR, 0, s,
"Unable to Bootstrap OpenSRF Client with config %s..", cfg);
return;
}
+ public_services = osrfNewStringArray(16);
+ osrfConfigGetValueList(NULL, public_services, "/config/public_services/service");
+
allowedOrigins = osrfNewStringArray(4);
osrfConfigGetValueList(NULL, allowedOrigins, "/cross_origin/origin");
/* ----------------------------------------------------------------- */
- if(!(service && method)) {
+ if(!(service && method) ||
+ (service && !osrfStringArrayContains(public_services, service))) {
osrfLogError(OSRF_LOG_MARK,
"Service [%s] not found or not allowed", service);
osrfConfig.c \
osrf_application.c \
osrf_cache.c \
- osrf_transgroup.c \
osrf_list.c \
osrf_hash.c \
osrf_utf8.c \
return NULL;
}
+ /*
+
// Get a list of domain names from the config settings;
// ignore all but the first one in the list.
osrfStringArray* arr = osrfNewStringArray(8);
free( session );
return NULL;
}
+ */
+
+ growing_buffer *buf = buffer_init(32);
+ buffer_add(buf, "service:");
+ buffer_add(buf, remote_service);
- session->remote_id = strdup(target_buf);
+ session->remote_id = buffer_release(buf);
+ //session->remote_id = strdup(remote_service);
session->orig_remote_id = strdup(session->remote_id);
session->remote_service = strdup(remote_service);
session->session_locale = NULL;
return -1;
}
- osrfLogDebug( OSRF_LOG_MARK, "Registering method %s for app %s", methodName, appName );
+ osrfLogInternal( OSRF_LOG_MARK, "Registering method %s for app %s", methodName, appName );
// Extract the only valid option bits, and ignore the rest.
int opts = options & ( OSRF_METHOD_STREAMING | OSRF_METHOD_CACHABLE );
free( max_backlog_queue );
/* --------------------------------------------------- */
- char* resc = va_list_to_string( "%s_listener", appname );
+ //char* resc = va_list_to_string( "%s_listener", appname );
// Make sure that we haven't already booted
- if( !osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
+ if( !osrf_system_bootstrap_common(NULL, "service", appname, 1)) {
osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()" );
- free( resc );
+ //free( resc );
return -1;
}
- free( resc );
+ //free( resc );
prefork_simple forker;
prefork_launch_children( &forker );
// Tell the router that you're open for business.
- osrf_prefork_register_routers( appname, false );
+ //osrf_prefork_register_routers( appname, false );
signal( SIGUSR1, sigusr1_handler);
signal( SIGUSR2, sigusr2_handler);
// Connect to cache server(s).
osrfSystemInitCache();
- char* resc = va_list_to_string( "%s_drone", child->appname );
+ //char* resc = va_list_to_string( "%s_drone", child->appname );
// If we're a source-client, tell the logger now that we're a new process.
char* isclient = osrfConfigGetValue( NULL, "/client" );
osrfSystemIgnoreTransportClient();
// Connect to Jabber
- if( !osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
+ if( !osrf_system_bootstrap_common(NULL, "service", child->appname, 0)) {
osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()" );
- free( resc );
+ //free( resc );
return -1;
}
- free( resc );
+ //free( resc );
// Dynamically call the application-specific initialization function
// from a previously loaded shared library.
if( !client_connected( client )) {
osrfSystemIgnoreTransportClient();
osrfLogWarning( OSRF_LOG_MARK, "Reconnecting child to opensrf after disconnect..." );
- if( !osrf_system_bootstrap_client( NULL, NULL )) {
+ if( !osrf_system_bootstrap_common(NULL, "service", child->appname, 0)) {
osrfLogError( OSRF_LOG_MARK,
"Unable to bootstrap client in prefork_child_process_request()" );
sleep( 1 );
}
// Construct the message from the xml.
- transport_message* msg = new_message_from_xml( data );
+ //transport_message* msg = new_message_from_xml( data );
+ transport_message* msg = new_message_from_json( data );
// Respond to the transport message. This is where method calls are buried.
osrfAppSession* session = osrf_stack_transport_handler( msg, child->appname );
*/
static void sigusr1_handler( int sig ) {
if (!global_forker) return;
- osrf_prefork_register_routers(global_forker->appname, true);
+ //osrf_prefork_register_routers(global_forker->appname, true);
signal( SIGUSR1, sigusr1_handler );
}
*/
static void sigusr2_handler( int sig ) {
if (!global_forker) return;
- osrf_prefork_register_routers(global_forker->appname, false);
+ //osrf_prefork_register_routers(global_forker->appname, false);
signal( SIGUSR2, sigusr2_handler );
}
continue;
}
- message_prepare_xml( cur_msg );
- const char* msg_data = cur_msg->msg_xml;
+ //message_prepare_xml( cur_msg );
+ message_prepare_json( cur_msg );
+ //const char* msg_data = cur_msg->msg_xml;
+ const char* msg_data = cur_msg->msg_json;
if( ! msg_data || ! *msg_data ) {
osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %",
(msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread );
osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d",
cur_child->write_data_fd );
- const char* msg_data = cur_msg->msg_xml;
+ const char* msg_data = cur_msg->msg_json;
int written = write( cur_child->write_data_fd, msg_data, strlen( msg_data ) + 1 );
if( written < 0 ) {
// This child appears to be dead or unusable. Discard it.
osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d",
new_child->write_data_fd, new_child->pid );
- const char* msg_data = cur_msg->msg_xml;
+ const char* msg_data = cur_msg->msg_json;
int written = write(
new_child->write_data_fd, msg_data, strlen( msg_data ) + 1 );
if( written < 0 ) {
// always de-register routers before killing child processes (or waiting
// for them to complete) so that new requests are directed elsewhere.
- osrf_prefork_register_routers(global_forker->appname, true);
+ //osrf_prefork_register_routers(global_forker->appname, true);
while( prefork->first_child ) {
/**
@brief Bootstrap a generic application from info in the configuration file.
@param config_file Name of the configuration file.
- @param contextnode Name of an aggregate within the configuration file, containing the
+ @param connection_type Name of an aggregate within the configuration file, containing the
relevant subset of configuration stuff.
@return 1 if successful; zero or -1 if error.
A thin wrapper for osrfSystemBootstrapClientResc, passing it NULL for a resource.
*/
-int osrf_system_bootstrap_client( char* config_file, char* contextnode ) {
- return osrfSystemBootstrapClientResc(config_file, contextnode, NULL);
+
+int osrf_system_bootstrap_client(const char* config_file, const char* connection_type) {
+ return osrf_system_bootstrap_common(config_file, connection_type, "client", 0);
}
/**
const char* action, const char* service) {
// Load the conguration, open the log, open a connection to Jabber
- if (!osrfSystemBootstrapClientResc(config, context, "c_launcher")) {
+ if (!osrf_system_bootstrap_common(config, context, "client", 0)) {
osrfLogError(OSRF_LOG_MARK,
"Unable to bootstrap for host %s from configuration file %s",
hostname, config);
/**
@brief Bootstrap a generic application from info in the configuration file.
@param config_file Name of the configuration file.
- @param contextnode Name of an aggregate within the configuration file, containing the
+ @param connection_type Name of an aggregate within the configuration file, containing the
relevant subset of configuration stuff.
@param resource Used to construct a Jabber resource name; may be NULL.
@return 1 if successful; zero or -1 if error.
- Open the log.
- Open a connection to Jabber.
*/
-int osrfSystemBootstrapClientResc( const char* config_file,
- const char* contextnode, const char* resource ) {
+int osrfSystemBootstrapClientResc(const char* config_file,
+ const char* connection_type, const char* appname) {
+ return osrf_system_bootstrap_common(config_file, connection_type, appname, 0);
+}
+
+int osrf_system_bootstrap_common(const char* config_file,
+ const char* connection_type, const char* appname, int is_service) {
+
+ if (connection_type == NULL) {
+ osrfLogError(OSRF_LOG_MARK,
+ "osrf_system_bootstrap_common() requires a connection type");
+ return -1;
+ }
int failure = 0;
return 1; /* we already have a client connection */
}
- if( !( config_file && contextnode ) && ! osrfConfigHasDefaultConfig() ) {
+ if( !( config_file && connection_type ) && ! osrfConfigHasDefaultConfig() ) {
osrfLogError( OSRF_LOG_MARK, "No Config File Specified\n" );
return -1;
}
if( config_file ) {
- osrfConfig* cfg = osrfConfigInit( config_file, contextnode );
+ osrfConfig* cfg = osrfConfigInit(config_file, NULL);
if(cfg)
osrfConfigSetDefaultConfig(cfg);
else
// fetch list of configured log redaction marker strings
log_protect_arr = osrfNewStringArray(8);
- osrfConfig* cfg_shared = osrfConfigInit(config_file, "shared");
- osrfConfigGetValueList( cfg_shared, log_protect_arr, "/log_protect/match_string" );
+ osrfConfigGetValueList(cfg, log_protect_arr, "/config/log_protect/match_string" );
}
- char* log_file = osrfConfigGetValue( NULL, "/logfile");
- if(!log_file) {
+ char* log_file = osrfConfigGetValue(NULL, "/config/connections/%s/logfile", connection_type);
+ if (!log_file) {
fprintf(stderr, "No log file specified in configuration file %s\n",
config_file);
return -1;
}
- char* log_level = osrfConfigGetValue( NULL, "/loglevel" );
- osrfStringArray* arr = osrfNewStringArray(8);
- osrfConfigGetValueList(NULL, arr, "/domain");
-
- char* username = osrfConfigGetValue( NULL, "/username" );
- char* password = osrfConfigGetValue( NULL, "/passwd" );
- char* port = osrfConfigGetValue( NULL, "/port" );
- char* unixpath = osrfConfigGetValue( NULL, "/unixpath" );
- char* facility = osrfConfigGetValue( NULL, "/syslog" );
- char* actlog = osrfConfigGetValue( NULL, "/actlog" );
- char* logtag = osrfConfigGetValue( NULL, "/logtag" );
+ char* log_level = osrfConfigGetValue(NULL, "/config/connections/%s/loglevel", connection_type);
+ char* username = osrfConfigGetValue(NULL, "/config/connections/%s/message_bus/username", connection_type);
+ char* password = osrfConfigGetValue(NULL, "/config/connections/%s/message_bus/password", connection_type);
+ char* host = osrfConfigGetValue(NULL, "/config/connections/%s/message_bus/host", connection_type);
+ char* port = osrfConfigGetValue(NULL, "/config/connections/%s/message_bus/port", connection_type);
+ char* unixpath = osrfConfigGetValue(NULL, "/config/connections/%s/message_bus/sock", connection_type);
+ char* facility = osrfConfigGetValue(NULL, "/config/connections/%s/syslog", connection_type);
+ char* actlog = osrfConfigGetValue(NULL, "/config/connections/%s/actlog", connection_type);
+ char* logtag = osrfConfigGetValue(NULL, "/config/connections/%s/logtag", connection_type);
/* if we're a source-client, tell the logger */
- char* isclient = osrfConfigGetValue(NULL, "/client");
+ char* isclient = osrfConfigGetValue(NULL, "/config/connections/%s/client", connection_type);
if( isclient && !strcasecmp(isclient,"true") )
osrfLogSetIsClient(1);
free(isclient);
int llevel = 0;
int iport = 0;
- if(port) iport = atoi(port);
- if(log_level) llevel = atoi(log_level);
+ if (port) iport = atoi(port);
+ if (log_level) llevel = atoi(log_level);
if(!strcmp(log_file, "syslog")) {
if(logtag) osrfLogSetLogTag(logtag);
- osrfLogInit( OSRF_LOG_TYPE_SYSLOG, contextnode, llevel );
+ osrfLogInit( OSRF_LOG_TYPE_SYSLOG, appname, llevel );
osrfLogSetSyslogFacility(osrfLogFacilityToInt(facility));
if(actlog) osrfLogSetSyslogActFacility(osrfLogFacilityToInt(actlog));
} else {
- osrfLogInit( OSRF_LOG_TYPE_FILE, contextnode, llevel );
+ osrfLogInit( OSRF_LOG_TYPE_FILE, appname, llevel );
osrfLogSetFile( log_file );
}
-
- /* Get a domain, if one is specified */
- const char* domain = osrfStringArrayGetString( arr, 0 ); /* just the first for now */
- if(!domain) {
- fprintf(stderr, "No domain specified in configuration file %s\n", config_file);
- osrfLogError( OSRF_LOG_MARK, "No domain specified in configuration file %s\n",
- config_file );
- failure = 1;
- }
-
if(!username) {
fprintf(stderr, "No username specified in configuration file %s\n", config_file);
osrfLogError( OSRF_LOG_MARK, "No username specified in configuration file %s\n",
}
if (failure) {
- osrfStringArrayFree(arr);
free(log_file);
free(log_level);
free(username);
return 0;
}
- osrfLogInfo( OSRF_LOG_MARK, "Bootstrapping system with domain %s, port %d, and unixpath %s",
- domain, iport, unixpath ? unixpath : "(none)" );
- transport_client* client = client_init( domain, iport, unixpath, 0 );
-
- char host[HOST_NAME_MAX + 1] = "";
- gethostname(host, sizeof(host) );
- host[HOST_NAME_MAX] = '\0';
+ osrfLogInfo( OSRF_LOG_MARK, "Bootstrapping system with host %s, port %d, and unixpath %s",
+ host, iport, unixpath ? unixpath : "(none)" );
- char tbuf[32];
- tbuf[0] = '\0';
- snprintf(tbuf, 32, "%f", get_timestamp_millis());
+ transport_client* client = client_init(host, iport, unixpath);
- if(!resource) resource = "";
+ if (appname == NULL) { appname = "client"; }
- int len = strlen(resource) + 256;
- char buf[len];
- buf[0] = '\0';
- snprintf(buf, len - 1, "%s_%s_%s_%ld", resource, host, tbuf, (long) getpid() );
-
- if(client_connect( client, username, password, buf, 10, AUTH_DIGEST )) {
- osrfGlobalTransportClient = client;
- }
+ if (is_service) {
+ if (client_connect_as_service(client, appname, username, password)) {
+ osrfGlobalTransportClient = client;
+ }
+ } else {
+ if (client_connect(client, appname, username, password)) {
+ osrfGlobalTransportClient = client;
+ }
+ }
- osrfStringArrayFree(arr);
free(actlog);
free(facility);
free(log_level);
#include <opensrf/transport_client.h>
-/**
- @file transport_client.c
- @brief Collection of routines for sending and receiving single messages over Jabber.
+static int handle_redis_error(redisReply* reply, char* command, ...);
- These functions form an API built on top of the transport_session API. They serve
- two main purposes:
- - They remember a Jabber ID to use when sending messages.
- - They maintain a queue of input messages that the calling code can get one at a time.
-*/
+transport_client* client_init(const char* server, int port, const char* unix_path) {
-static void client_message_handler( void* client, transport_message* msg );
+ if(server == NULL) return NULL;
-//int main( int argc, char** argv );
+ /* build and clear the client object */
+ transport_client* client = safe_malloc( sizeof( transport_client) );
-/*
-int main( int argc, char** argv ) {
+ /* start with an empty message queue */
+ client->bus = NULL;
+ client->stream_name = NULL;
+ client->consumer_name = NULL;
+
+ client->max_queue_size = 1000; // TODO pull from config
+ client->port = port;
+ client->host = server ? strdup(server) : NULL;
+ client->unix_path = unix_path ? strdup(unix_path) : NULL;
+ client->error = 0;
- transport_message* recv;
- transport_message* send;
+ return client;
+}
- transport_client* client = client_init( "spacely.georgialibraries.org", 5222 );
+int client_connect_with_stream_name(transport_client* client,
+ const char* username, const char* password) {
- // try to connect, allow 15 second connect timeout
- if( client_connect( client, "admin", "asdfjkjk", "system", 15 ) ) {
- printf("Connected...\n");
- } else {
- printf( "NOT Connected...\n" ); exit(99);
- }
+ osrfLogDebug(OSRF_LOG_MARK, "Transport client connecting with bus "
+ "stream=%s; consumer=%s; host=%s; port=%d; unix_path=%s",
+ client->stream_name,
+ client->consumer_name,
+ client->host,
+ client->port,
+ client->unix_path
+ );
- while( (recv = client_recv( client, -1 )) ) {
+ // TODO use redisConnectWithTimeout so we can verify connection.
+ if (client->host && client->port) {
+ client->bus = redisConnect(client->host, client->port);
+ } else if (client->unix_path) {
+ client->bus = redisConnectUnix(client->unix_path);
+ }
- if( recv->body ) {
- int len = strlen(recv->body);
- char buf[len + 20];
- osrf_clearbuf( buf, 0, sizeof(buf));
- sprintf( buf, "Echoing...%s", recv->body );
- send = message_init( buf, "Echoing Stuff", "12345", recv->sender, "" );
- } else {
- send = message_init( " * ECHOING * ", "Echoing Stuff", "12345", recv->sender, "" );
- }
+ if (client->bus == NULL) {
+ osrfLogError(OSRF_LOG_MARK, "Could not connect to Redis instance");
+ return 0;
+ }
- if( send == NULL ) { printf("something's wrong"); }
- client_send_message( client, send );
+ osrfLogDebug(OSRF_LOG_MARK, "Connected to Redis instance OK");
- message_free( send );
- message_free( recv );
- }
+ osrfLogDebug(OSRF_LOG_MARK, "Sending AUTH with username=%s", username);
- printf( "ended recv loop\n" );
+ redisReply *reply = redisCommand(client->bus, "AUTH %s %s", username, password);
- return 0;
+ if (handle_redis_error(reply, "AUTH %s %s", username, password)) { return 0; }
-}
-*/
+ osrfLogDebug(OSRF_LOG_MARK, "Redis AUTH succeeded");
+ freeReplyObject(reply);
-/**
- @brief Allocate and initialize a transport_client.
- @param server Domain name where the Jabber server resides.
- @param port Port used for connecting to Jabber (0 if using UNIX domain socket).
- @param unix_path Name of Jabber's socket in file system (if using UNIX domain socket).
- @param component Boolean; true if we're a Jabber component.
- @return A pointer to a newly created transport_client.
-
- Create a transport_client with a transport_session and an empty message queue (but don't
- open a connection yet). Install a callback function in the transport_session to enqueue
- incoming messages.
-
- The calling code is responsible for freeing the transport_client by calling client_free().
-*/
-transport_client* client_init( const char* server, int port, const char* unix_path, int component ) {
+ // Create our stream + consumer group.
+ // This will produce an error when the group already exists, which
+ // will happen with service-level groups. Skip error checking.
+ reply = redisCommand(
+ client->bus,
+ "XGROUP CREATE %s %s $ mkstream",
+ client->stream_name,
+ client->stream_name
+ );
- if(server == NULL) return NULL;
+ freeReplyObject(reply);
- /* build and clear the client object */
- transport_client* client = safe_malloc( sizeof( transport_client) );
+ return 1;
+}
- /* start with an empty message queue */
- client->msg_q_head = NULL;
- client->msg_q_tail = NULL;
+int client_connect_as_service(transport_client* client,
+ const char* appname, const char* username, const char* password) {
+ if (client == NULL || appname == NULL) { return 0; }
+ growing_buffer *buf = buffer_init(32);
+ buffer_fadd(buf, "service:%s", appname);
- /* build the session */
- client->session = init_transport( server, port, unix_path, client, component );
+ // strdup the content, leave the buf alive.
+ client->stream_name = buffer_data(buf);
- client->session->message_callback = client_message_handler;
- client->error = 0;
- client->host = strdup(server);
- client->xmpp_id = NULL;
+ // Add some random stuff to the end of the consumer name, which
+ // has to be unuique per client.
+ char junk[256];
+ snprintf(junk, sizeof(junk),
+ "%f.%d%ld", get_timestamp_millis(), (int) time(NULL), (long) getpid());
- return client;
+ char* md5 = md5sum(junk);
+
+ buffer_add(buf, ":");
+ buffer_add_n(buf, md5, 12);
+
+ client->consumer_name = buffer_release(buf);
+
+ return client_connect_with_stream_name(client, username, password);
}
+int client_connect(transport_client* client,
+ const char* appname, const char* username, const char* password) {
+ if (client == NULL || appname == NULL) { return 0; }
-/**
- @brief Open a Jabber session for a transport_client.
- @param client Pointer to the transport_client.
- @param username Jabber user name.
- @param password Password for the Jabber logon.
- @param resource Resource name for the Jabber logon.
- @param connect_timeout How many seconds to wait for the connection to open.
- @param auth_type An enum: either AUTH_PLAIN or AUTH_DIGEST (see notes).
- @return 1 if successful, or 0 upon error.
-
- Besides opening the Jabber session, create a Jabber ID for future use.
-
- If @a connect_timeout is -1, wait indefinitely for the Jabber server to respond. If
- @a connect_timeout is zero, don't wait at all. If @a timeout is positive, wait that
- number of seconds before timing out. If @a connect_timeout has a negative value other
- than -1, the results are not well defined.
-
- The value of @a connect_timeout applies to each of two stages in the logon procedure.
- Hence the logon may take up to twice the amount of time indicated.
-
- If we connect as a Jabber component, we send the password as an SHA1 hash. Otherwise
- we look at the @a auth_type. If it's AUTH_PLAIN, we send the password as plaintext; if
- it's AUTH_DIGEST, we send it as a hash.
- */
-int client_connect( transport_client* client,
- const char* username, const char* password, const char* resource,
- int connect_timeout, enum TRANSPORT_AUTH_TYPE auth_type ) {
- if( client == NULL )
- return 0;
-
- // Create and store a Jabber ID
- if( client->xmpp_id )
- free( client->xmpp_id );
- client->xmpp_id = va_list_to_string( "%s@%s/%s", username, client->host, resource );
-
- // Open a transport_session
- return session_connect( client->session, username,
- password, resource, connect_timeout, auth_type );
+ char junk[256];
+ snprintf(junk, sizeof(junk),
+ "%f.%d%ld", get_timestamp_millis(), (int) time(NULL), (long) getpid());
+
+ char* md5 = md5sum(junk);
+
+ growing_buffer *buf = buffer_init(32);
+ buffer_add(buf, "client:");
+
+ if (strcmp("client", appname) == 0) {
+ // Standalone client
+ buffer_add_n(buf, md5, 12);
+ } else {
+ // Service client client:servicename:junk
+ buffer_fadd(buf, "%s:", appname);
+ buffer_add_n(buf, md5, 12);
+ }
+
+ client->stream_name = buffer_release(buf);
+ client->consumer_name = strdup(client->stream_name);
+
+ free(md5);
+
+ return client_connect_with_stream_name(client, username, password);
}
-/**
- @brief Disconnect from the Jabber session.
- @param client Pointer to the transport_client.
- @return 0 in all cases.
+int client_disconnect(transport_client* client) {
+ if (client == NULL || client->bus == NULL) { return 0; }
- If there are any messages still in the queue, they stay there; i.e. we don't free them here.
-*/
-int client_disconnect( transport_client* client ) {
- if( client == NULL ) { return 0; }
- return session_disconnect( client->session );
+ if (strncmp(client->stream_name, "client:", 7) == 0) {
+ // Delete our stream on disconnect if we are a client.
+ // No point in letting it linger.
+ redisReply *reply =
+ redisCommand(client->bus, "DEL %s", client->stream_name);
+
+ freeReplyObject(reply);
+ }
+
+ redisFree(client->bus);
+ client->bus = NULL;
+ return 1;
}
-/**
- @brief Report whether a transport_client is connected.
- @param client Pointer to the transport_client.
- @return Boolean: 1 if connected, or 0 if not.
-*/
int client_connected( const transport_client* client ) {
- if(client == NULL) return 0;
- return session_connected( client->session );
+ return (client != NULL && client->bus != NULL);
}
-/**
- @brief Send a transport message to the current destination.
- @param client Pointer to a transport_client.
- @param msg Pointer to the transport_message to be sent.
- @return 0 if successful, or -1 if not.
+int client_send_message(transport_client* client, transport_message* msg) {
+ if (client == NULL || client->error) { return -1; }
- Translate the transport_message into XML and send it to Jabber, using the previously
- stored Jabber ID for the sender.
-*/
-int client_send_message( transport_client* client, transport_message* msg ) {
- if( client == NULL || client->error )
- return -1;
- if( msg->sender )
- free( msg->sender );
- msg->sender = strdup(client->xmpp_id);
- return session_send_msg( client->session, msg );
+ if (msg->sender) { free(msg->sender); }
+ msg->sender = strdup(client->stream_name);
+
+ message_prepare_json(msg);
+
+ osrfLogInternal(OSRF_LOG_MARK,
+ "client_send_message() to=%s %s", msg->recipient, msg->msg_json);
+
+ redisReply *reply = redisCommand(client->bus,
+ "XADD %s NOMKSTREAM MAXLEN ~ %d * message %s",
+ msg->recipient,
+ client->max_queue_size,
+ msg->msg_json
+ );
+
+ if (handle_redis_error(reply,
+ "XADD %s NOMKSTREAM MAXLEN ~ %d * message %s",
+ msg->recipient,
+ client->max_queue_size,
+ msg->msg_json)
+ ) { return -1; }
+
+ osrfLogInternal(OSRF_LOG_MARK, "client_send_message() send completed");
+
+ freeReplyObject(reply);
+
+ return 0;
}
-/**
- @brief Fetch an input message, if one is available.
- @param client Pointer to a transport_client.
- @param timeout How long to wait for a message to arrive, in seconds (see remarks).
- @return A pointer to a transport_message if successful, or NULL if not.
+// Returns the reply on success, NULL on error
+// On error, the reply is freed.
+static int handle_redis_error(redisReply *reply, char* command, ...) {
- If there is a message already in the queue, return it immediately. Otherwise read any
- available messages from the transport_session (subject to a timeout), and return the
- first one.
+ if (reply != NULL && reply->type != REDIS_REPLY_ERROR) {
+ return 0;
+ }
- If the value of @a timeout is -1, then there is no time limit -- wait indefinitely until a
- message arrives (or we error out for other reasons). If the value of @a timeout is zero,
- don't wait at all.
+ VA_LIST_TO_STRING(command);
+ char* err = reply == NULL ? "" : reply->str;
+ osrfLogError(OSRF_LOG_MARK, "REDIS Error [%s] %s", err, VA_BUF);
+ freeReplyObject(reply);
- The calling code is responsible for freeing the transport_message by calling message_free().
-*/
-transport_message* client_recv( transport_client* client, int timeout ) {
- if( client == NULL ) { return NULL; }
+ return 1;
+}
- int error = 0; /* boolean */
+/*
+ * Returns at most one allocated char* pulled from the bus or NULL
+ * if the pop times out or is interrupted.
+ *
+ * The string will be valid JSON string, a partial JSON string, or
+ * a message terminator chararcter.
+ */
+char* recv_one_chunk(transport_client* client, int timeout) {
+ if (client == NULL || client->bus == NULL) { return NULL; }
+
+ redisReply *reply, *tmp;
+ char* json = NULL;
+ char* msg_id = NULL;
+
+ if (timeout != 0) {
+
+ if (timeout == -1) {
+ // Redis timeout 0 means block indefinitely
+ timeout = 0;
+ } else {
+ // Milliseconds
+ timeout *= 1000;
+ }
+
+ reply = redisCommand(client->bus,
+ "XREADGROUP GROUP %s %s BLOCK %d COUNT 1 STREAMS %s >",
+ client->stream_name,
+ client->consumer_name,
+ timeout,
+ client->stream_name
+ );
+
+ } else {
+
+ reply = redisCommand(client->bus,
+ "XREADGROUP GROUP %s %s COUNT 1 STREAMS %s >",
+ client->stream_name,
+ client->consumer_name,
+ client->stream_name
+ );
+ }
+
+ // Timeout or error
+ if (handle_redis_error(
+ reply,
+ "XREADGROUP GROUP %s %s %s COUNT 1 STREAMS %s >",
+ client->stream_name,
+ client->consumer_name,
+ "BLOCK X",
+ client->stream_name
+ )) { return NULL; }
+
+ // Unpack the XREADGROUP response, which is a nest of arrays.
+ // These arrays are mostly 1 and 2-element lists, since we are
+ // only reading one item on a single stream.
+ if (reply->type == REDIS_REPLY_ARRAY && reply->elements > 0) {
+ tmp = reply->element[0];
+
+ if (tmp->type == REDIS_REPLY_ARRAY && tmp->elements > 1) {
+ tmp = tmp->element[1];
+
+ if (tmp->type == REDIS_REPLY_ARRAY && tmp->elements > 0) {
+ tmp = tmp->element[0];
+
+ if (tmp->type == REDIS_REPLY_ARRAY && tmp->elements > 1) {
+ redisReply *r1 = tmp->element[0];
+ redisReply *r2 = tmp->element[1];
+
+ if (r1->type == REDIS_REPLY_STRING) {
+ msg_id = strdup(r1->str);
+ }
+
+ if (r2->type == REDIS_REPLY_ARRAY && r2->elements > 1) {
+ // r2->element[0] is the message name, which we
+ // currently don't use for anything.
+
+ r2 = r2->element[1];
+
+ if (r2->type == REDIS_REPLY_STRING) {
+ json = strdup(r2->str);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ freeReplyObject(reply); // XREADGROUP
+
+ if (msg_id == NULL) {
+ // Read timed out. 'json' will also be NULL.
+ return NULL;
+ }
+
+ reply = redisCommand(client->bus, "XACK %s %s %s",
+ client->stream_name, client->stream_name, msg_id);
+
+ if (handle_redis_error(
+ reply,
+ "XACK %s %s %s",
+ client->stream_name,
+ client->stream_name, msg_id
+ )) { return NULL; }
+
+ freeReplyObject(reply); // XACK
+
+ free(msg_id);
+
+ osrfLogInternal(OSRF_LOG_MARK, "recv_one_chunk() read json: %s", json);
+
+ return json;
+}
- if( NULL == client->msg_q_head ) {
+/// Returns at most one JSON value pulled from the bus or NULL if
+/// the list pop times out or the pop is interrupted by a signal.
+jsonObject* recv_one_value(transport_client* client, int timeout) {
- // No message available on the queue? Try to get a fresh one.
+ char* json = recv_one_chunk(client, timeout);
- // When we call session_wait(), it reads a socket for new messages. When it finds
- // one, it enqueues it by calling the callback function client_message_handler(),
- // which we installed in the transport_session when we created the transport_client.
+ if (json == NULL) {
+ // recv() timed out.
+ return NULL;
+ }
- // Since a single call to session_wait() may not result in the receipt of a complete
- // message. we call it repeatedly until we get either a message or an error.
+ jsonObject* obj = jsonParse(json);
- // Alternatively, a single call to session_wait() may result in the receipt of
- // multiple messages. That's why we have to enqueue them.
+ if (obj == NULL) {
+ osrfLogWarning(OSRF_LOG_MARK, "Error parsing JSON: %s", json);
+ }
- // The timeout applies to the receipt of a complete message. For a sufficiently
- // short timeout, a sufficiently long message, and a sufficiently slow connection,
- // we could timeout on the first message even though we're still receiving data.
-
- // Likewise we could time out while still receiving the second or subsequent message,
- // return the first message, and resume receiving messages later.
+ free(json);
- if( timeout == -1 ) { /* wait potentially forever for data to arrive */
+ return obj;
+}
- int x;
- do {
- if( (x = session_wait( client->session, -1 )) ) {
- osrfLogDebug(OSRF_LOG_MARK, "session_wait returned failure code %d\n", x);
- error = 1;
- break;
- }
- } while( client->msg_q_head == NULL );
+/**
+ * Returns at most one jsonObject returned from the data bus.
+ *
+ * Keeps trying until a value is returned or the timeout is exceeded.
+ */
+jsonObject* recv_json_value(transport_client* client, int timeout) {
- } else { /* loop up to 'timeout' seconds waiting for data to arrive */
+ if (timeout == 0) {
+ return recv_one_value(client, 0);
- /* This loop assumes that a time_t is denominated in seconds -- not */
- /* guaranteed by Standard C, but a fair bet for Linux or UNIX */
+ } else if (timeout < 0) {
+ // Keep trying until we have a result.
- time_t start = time(NULL);
- time_t remaining = (time_t) timeout;
+ while (1) {
+ jsonObject* obj = recv_one_value(client, timeout);
+ if (obj != NULL) { return obj; }
+ }
+ }
- int wait_ret;
- do {
- if( (wait_ret = session_wait( client->session, (int) remaining)) ) {
- error = 1;
- osrfLogDebug(OSRF_LOG_MARK,
- "session_wait returned failure code %d: setting error=1\n", wait_ret);
- break;
- }
+ time_t seconds = (time_t) timeout;
- remaining -= time(NULL) - start;
- } while( NULL == client->msg_q_head && remaining > 0 );
- }
- }
+ while (seconds > 0) {
- transport_message* msg = NULL;
+ time_t now = time(NULL);
+ jsonObject* obj = recv_one_value(client, timeout);
- if( !error && client->msg_q_head != NULL ) {
- /* got message(s); dequeue the oldest one */
- msg = client->msg_q_head;
- client->msg_q_head = msg->next;
- msg->next = NULL; /* shouldn't be necessary; nullify for good hygiene */
- if( NULL == client->msg_q_head )
- client->msg_q_tail = NULL;
- }
+ if (obj == NULL) {
+ seconds -= now;
+ } else {
+ return obj;
+ }
+ }
- return msg;
+ return NULL;
}
-/**
- @brief Enqueue a newly received transport_message.
- @param client A pointer to a transport_client, cast to a void pointer.
- @param msg A new transport message.
+transport_message* client_recv(transport_client* client, int timeout) {
+ if (client == NULL || client->bus == NULL) { return NULL; }
- Add a newly arrived input message to the tail of the queue.
+ // TODO no need for intermediate to/from JSON. Create transport
+ // message directly from received JSON object.
+ jsonObject* obj = recv_json_value(client, timeout);
- This is a callback function. The transport_session parses the XML coming in through a
- socket, accumulating various bits and pieces. When it sees the end of a message stanza,
- it packages the bits and pieces into a transport_message that it passes to this function,
- which enqueues the message for processing.
-*/
-static void client_message_handler( void* client, transport_message* msg ){
+ if (obj == NULL) { return NULL; } // Receive timed out.
- if(client == NULL) return;
- if(msg == NULL) return;
+ char* json = jsonObjectToJSON(obj);
- transport_client* cli = (transport_client*) client;
+ transport_message* msg = new_message_from_json(json);
- /* add the new message to the tail of the queue */
- if( NULL == cli->msg_q_head )
- cli->msg_q_tail = cli->msg_q_head = msg;
- else {
- cli->msg_q_tail->next = msg;
- cli->msg_q_tail = msg;
- }
- msg->next = NULL;
-}
+ free(json);
+ osrfLogInternal(OSRF_LOG_MARK,
+ "client_recv() read response for thread %s", msg->thread);
+
+ return msg;
+}
/**
@brief Free a transport_client, along with all resources it owns.
@return 1 if successful, or 0 if not. The only error condition is if @a client is NULL.
*/
int client_free( transport_client* client ) {
- if(client == NULL)
- return 0;
- session_free( client->session );
- client->session = NULL;
+ if (client == NULL) { return 0; }
return client_discard( client );
}
disconnect the parent as well.
*/
int client_discard( transport_client* client ) {
- if(client == NULL)
- return 0;
-
- transport_message* current = client->msg_q_head;
- transport_message* next;
-
- /* deallocate the list of messages */
- while( current != NULL ) {
- next = current->next;
- message_free( current );
- current = next;
- }
-
- free(client->host);
- free(client->xmpp_id);
- free( client );
+
+ if (client == NULL) { return 0; }
+
+ if (client->host != NULL) { free(client->host); }
+ if (client->unix_path != NULL) { free(client->unix_path); }
+ if (client->stream_name != NULL) { free(client->stream_name); }
+ if (client->consumer_name != NULL) { free(client->consumer_name); }
+
+ free(client);
+
return 1;
}
-int client_sock_fd( transport_client* client )
-{
- if( !client )
- return 0;
- else
- return client->session->sock_id;
-}
#include <opensrf/transport_message.h>
+#include <opensrf/osrf_json.h>
/**
@file transport_message.c
msg->error_code = 0;
msg->broadcast = 0;
msg->msg_xml = NULL;
+ msg->msg_json = NULL;
msg->next = NULL;
return msg;
}
+transport_message* new_message_from_json(const char* msg_json) {
+
+ if (msg_json == NULL || *msg_json == '\0') { return NULL; }
+
+ transport_message* new_msg = safe_malloc(sizeof(transport_message));
+
+ new_msg->body = NULL;
+ new_msg->subject = NULL;
+ new_msg->thread = NULL;
+ new_msg->recipient = NULL;
+ new_msg->sender = NULL;
+ new_msg->router_from = NULL;
+ new_msg->router_to = NULL;
+ new_msg->router_class = NULL;
+ new_msg->router_command = NULL;
+ new_msg->osrf_xid = NULL;
+ new_msg->is_error = 0;
+ new_msg->error_type = NULL;
+ new_msg->error_code = 0;
+ new_msg->broadcast = 0;
+ new_msg->msg_xml = NULL;
+ new_msg->next = NULL;
+
+ jsonObject* json_hash = jsonParse(msg_json);
+
+ if (json_hash == NULL || json_hash->type != JSON_HASH) {
+ osrfLogError(OSRF_LOG_MARK, "new_message_from_json() received bad JSON");
+ jsonObjectFree(json_hash);
+ message_free(new_msg);
+ return NULL;
+ }
+
+ const char* sender = jsonObjectGetString(jsonObjectGetKeyConst(json_hash, "from"));
+ if (sender) { new_msg->sender = strdup((const char*) sender); }
+
+ const char* recipient = jsonObjectGetString(jsonObjectGetKeyConst(json_hash, "to"));
+ if (recipient) { new_msg->recipient = strdup((const char*) recipient); }
+
+ const char* thread = jsonObjectGetString(jsonObjectGetKeyConst(json_hash, "thread"));
+ if (thread == NULL) { thread = ""; }
+ new_msg->thread = strdup((const char*) thread);
+
+ const char* osrf_xid = jsonObjectGetString(jsonObjectGetKeyConst(json_hash, "osrf_xid"));
+ if (osrf_xid) { message_set_osrf_xid(new_msg, (char*) osrf_xid); }
+
+ // TODO
+ // Internally the mesage body is stored as a JSON string
+ // On the wire, it's just part of the message. We could get
+ // rid if this extra json encode/decode step if we treated
+ // the body as a JSON object internally.
+ const char* body = jsonObjectToJSON(jsonObjectGetKeyConst(json_hash, "body"));
+ if (body == NULL) { body = ""; }
+ new_msg->body = strdup((const char*) body);
+
+ jsonObjectFree(json_hash);
+
+ return new_msg;
+}
+
+
/**
@brief Translate an XML string into a transport_message.
free(msg->osrf_xid);
if( msg->error_type != NULL ) free(msg->error_type);
if( msg->msg_xml != NULL ) free(msg->msg_xml);
+ if( msg->msg_json != NULL ) free(msg->msg_json);
free(msg);
return 1;
}
+int message_prepare_json(transport_message* msg) {
+
+ if (!msg) { return 0; }
+ if (msg->msg_json) { return 1; } /* already done */
+
+ jsonObject* json_hash = jsonNewObject(NULL);
+ jsonObjectSetKey(json_hash, "to", jsonNewObject(msg->recipient));
+ jsonObjectSetKey(json_hash, "from", jsonNewObject(msg->sender));
+ jsonObjectSetKey(json_hash, "thread", jsonNewObject(msg->thread));
+ jsonObjectSetKey(json_hash, "osrf_xid", jsonNewObject(msg->osrf_xid));
+
+ // TODO the various layers expect the message body to be a separate
+ // JSON string, but on the bus, the body is just another key
+ // in the JSON object.
+ jsonObjectSetKey(json_hash, "body", jsonParse(msg->body));
+
+ msg->msg_json = jsonObjectToJSON(json_hash);
+
+ jsonObjectFree(json_hash);
+
+ return 1;
+}
+
+
/**
@brief Build a <message> element and store it as a string in the msg_xml member.
@param msg Pointer to a transport_message.
lib/OpenSRF/Transport/PeerHandle.pm
lib/OpenSRF/Transport/SlimJabber.pm
lib/OpenSRF/Transport/SlimJabber/Client.pm
-lib/OpenSRF/Transport/SlimJabber/MessageWrapper.pm
lib/OpenSRF/Transport/SlimJabber/PeerConnection.pm
lib/OpenSRF/Transport/SlimJabber/XMPPMessage.pm
lib/OpenSRF/Transport/SlimJabber/XMPPReader.pm
+lib/OpenSRF/Transport/Redis/Client.pm
+lib/OpenSRF/Transport/Redis/PeerConnection.pm
+lib/OpenSRF/Transport/Redis/Message.pm
lib/OpenSRF/Utils.pm
lib/OpenSRF/Utils/Cache.pm
lib/OpenSRF/Utils/Config.pm
use OpenSRF::Utils::JSON;
use OpenSRF::Utils::Logger qw(:level);
use OpenSRF::Utils::SettingsClient;
-use OpenSRF::Utils::Config;
use OpenSRF::EX;
use OpenSRF;
use Exporter;
sub get_app_targets {
my $app = shift;
-
- my $conf = OpenSRF::Utils::Config->current;
- my $router_name = $conf->bootstrap->router_name || 'router';
- my $domain = $conf->bootstrap->domain;
- $logger->error("use of <domains/> is deprecated") if $conf->bootstrap->domains;
-
- unless($router_name and $domain) {
- throw OpenSRF::EX::Config
- ("Missing router config information 'router_name' and 'domain'");
- }
-
- return ("$router_name\@$domain/$app");
+ return ("service:$app");
}
sub stateless {
}
}
+
+ # TODO Redis
+ # We can remove this extra layer of JSON round-tripping on the body.
my $json = OpenSRF::Utils::JSON->perl2JSON(\@doc);
$logger->internal("AppSession sending doc: $json");
use OpenSRF::EX qw/:try/;
use Carp;
use OpenSRF::Utils::JSON;
+use OpenSRF::Utils::Config;
sub DESTROY{};
our $in_request = 0;
our @pending_requests;
-our $shared_conf;
sub package {
my $self = shift;
my $logdata = "CALL: ".$session->service." $method_name ";
my $redact_params = 0;
if (@p) {
- if (ref($shared_conf->shared->log_protect) eq 'ARRAY') {
- foreach my $match_string (@{$shared_conf->shared->log_protect}) {
+
+ my $conf = OpenSRF::Utils::Config->current->as_hash;
+ my $protect = $conf->{shared}->{log_protect};
+
+ if (ref($protect) eq 'ARRAY') {
+ foreach my $match_string (@$protect) {
if ($method_name =~ /^$match_string/) {
$redact_params = 1;
last;
use OpenSRF::Utils::Logger qw($logger);
use OpenSRF::DomainObject::oilsResponse qw/:status/;
use OpenSRF::Transport::SlimJabber::Client;
+use Digest::MD5 qw(md5_hex);
use Encode;
use POSIX qw/:sys_wait_h :errno_h/;
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
sub build_osrf_handle {
my $self = shift;
- my $conf = OpenSRF::Utils::Config->current;
- my $username = $conf->bootstrap->username;
- my $password = $conf->bootstrap->passwd;
- my $domain = $conf->bootstrap->domain;
- my $port = $conf->bootstrap->port;
- my $resource = $self->{service} . '_listener_' . $conf->env->hostname;
+ my $conf = OpenSRF::Utils::Config->current
+ ->as_hash->{connections}->{service}->{message_bus};
+
+ my $port = $conf->{port} || 6379;
+ my $host = $conf->{host} || '127.0.0.1';
+ my $sock = $conf->{sock};
+ my $username = $conf->{username};
+ my $password = $conf->{password};
- $logger->debug("server: inbound connecting as $username\@$domain/$resource on port $port");
+ # Every listener needs a unique consumer name.
+ my $consumer_name = 'service:' .
+ $self->{service} . ':' . substr(md5_hex($$ . time . rand($$)), 0, 12);
$self->{osrf_handle} =
- OpenSRF::Transport::SlimJabber::Client->new(
- username => $username,
- resource => $resource,
- password => $password,
- host => $domain,
+ OpenSRF::Transport::Redis::Client->new(
+ stream_name => "service:" . $self->{service},
+ consumer_name => $consumer_name,
+ host => $host,
port => $port,
+ sock => $sock,
+ username => $username,
+ password => $password
);
$self->{osrf_handle}->initialize;
# ----------------------------------------------------------------
sub write_child {
my($self, $child, $msg) = @_;
- my $xml = encode_utf8(decode_utf8($msg->to_xml));
+ #my $xml = encode_utf8(decode_utf8($msg->to_xml));
+ my $json = $msg->to_json;
# tell the child how much data to expect, minus the header
my $write_size;
- {use bytes; $write_size = length($xml)}
+ {use bytes; $write_size = length($json)}
$write_size = sprintf("%*s", WRITE_PIPE_DATA_SIZE, $write_size);
for (0..2) {
# so the lack of a pid means the child is dead.
if (!$child->{pid}) {
$logger->error("server: child is dead in write_child(). ".
- "unable to send message: $xml");
+ "unable to send message: $json");
return; # avoid syswrite crash
}
# send message to child data pipe
- syswrite($child->{pipe_to_child}, $write_size . $xml);
+ syswrite($child->{pipe_to_child}, $write_size . $json);
last unless $self->{sig_pipe};
$logger->error("server: got SIGPIPE writing to $child, retrying...");
# Sends the register command to the configured routers
# ----------------------------------------------------------------
sub register_routers {
+ return; # TODO Redis
+
my $self = shift;
my $conf = OpenSRF::Utils::Config->current;
# with.
# ----------------------------------------------------------------
sub unregister_routers {
+ return; # TODO Redis
+
my $self = shift;
return unless $self->{osrf_handle}->tcp_connected;
my $self = shift;
my $service = $self->{parent}->{service};
$0 = "OpenSRF Drone [$service]";
- OpenSRF::Transport::PeerHandle->construct($service);
+ OpenSRF::Transport::PeerHandle->construct($service, 'service');
OpenSRF::Application->application_implementation->child_init
if (OpenSRF::Application->application_implementation->can('child_init'));
}
my $orig_name = $0;
$0 = "$0*";
- # Discard extraneous data from the jabber socket
- if(!$network->flush_socket()) {
+ # Discard extraneous data from our direct message bus ID.
+ if (!$network->flush_socket()) {
$logger->error("server: network disconnected! child dropping request and exiting: $data");
exit;
}
my $session = OpenSRF::Transport->handler(
$self->{parent}->{service},
- OpenSRF::Transport::SlimJabber::XMPPMessage->new(xml => $data)
+ OpenSRF::Transport::Redis::Message->new(json => $data)
);
my $recycle = $self->keepalive_loop($session);
sub DESTROY {}
sub load_bootstrap_config {
+
return if OpenSRF::Utils::Config->current;
die "Please provide a bootstrap config file to OpenSRF::System\n"
unless $bootstrap_config_file;
OpenSRF::Utils::Config->load(config_file => $bootstrap_config_file);
- OpenSRF::Utils::JSON->register_class_hint(name => "OpenSRF::Application", hint => "method", type => "hash", strip => ['session']);
- OpenSRF::Transport->message_envelope("OpenSRF::Transport::SlimJabber::MessageWrapper");
- OpenSRF::Transport::PeerHandle->set_peer_client("OpenSRF::Transport::SlimJabber::PeerConnection");
+ OpenSRF::Utils::JSON->register_class_hint(
+ name => 'OpenSRF::Application', hint => 'method', type => 'hash', strip => ['session']);
+ OpenSRF::Transport::PeerHandle->set_peer_client('OpenSRF::Transport::Redis::PeerConnection');
OpenSRF::Application->server_class('client');
+
+ return; # XXX
+
# Read in a shared portion of the config file
# for later use in log parameter redaction
$OpenSRF::Application::shared_conf = OpenSRF::Utils::Config->load(
my $app = $params{client_name} || "client";
+ my $connection_type = $params{connection_type} || 'service';
+
load_bootstrap_config();
- OpenSRF::Utils::Logger::set_config();
- OpenSRF::Transport::PeerHandle->construct($app);
+ OpenSRF::Utils::Logger::set_config(undef, $connection_type);
+ OpenSRF::Transport::PeerHandle->construct($app, $connection_type);
}
sub connected {
$0 = "OpenSRF Listener [$service]";
# temp connection to use for application initialization
- OpenSRF::System->bootstrap_client(client_name => "system_client");
+ OpenSRF::System->bootstrap_client(client_name => $service);
my $sclient = OpenSRF::Utils::SettingsClient->new;
my $getval = sub { $sclient->config_value(apps => $service => @_); };
use OpenSRF::Utils::Logger qw(:level);
use OpenSRF::DomainObject::oilsResponse qw/:status/;
use OpenSRF::EX qw/:try/;
-use OpenSRF::Transport::SlimJabber::MessageWrapper;
#------------------
# --- These must be implemented by all Transport subclasses
our $message_envelope;
my $logger = "OpenSRF::Utils::Logger";
-
-
-=head2 message_envelope( [$envelope] );
-
-Sets the message envelope class that will allow us to extract
-information from the messages we receive from the low
-level transport
-
-=cut
-
-sub message_envelope {
- my( $class, $envelope ) = @_;
- if( $envelope ) {
- $message_envelope = $envelope;
- $envelope->use;
- if( $@ ) {
- $logger->error(
- "Error loading message_envelope: $envelope -> $@", ERROR);
- }
- }
- return $message_envelope;
-}
-
=head2 handler( $data )
-Creates a new MessageWrapper, extracts the remote_id, session_id, and message body
+Creates a new Message, extracts the remote_id, session_id, and message body
from the message. Then, creates or retrieves the AppSession object with the session_id and remote_id.
Finally, creates the message document from the body of the message and calls
the handler method on the message document.
--- /dev/null
+package OpenSRF::Transport::Redis::Client;
+use strict;
+use warnings;
+use Redis;
+use Time::HiRes q/time/;
+use OpenSRF::Utils::JSON;
+use OpenSRF::Utils::Logger qw/$logger/;
+use OpenSRF::Transport::Redis::Message;
+
+sub new {
+ my ($class, %params) = @_;
+ my $self = bless({}, ref($class) || $class);
+ $self->params(\%params);
+ return $self;
+}
+
+sub redis {
+ my ($self, $redis) = @_;
+ $self->{redis} = $redis if $redis;
+ return $self->{redis};
+}
+
+sub params {
+ my ($self, $params) = @_;
+ $self->{params} = $params if $params;
+ return $self->{params};
+}
+
+sub disconnect {
+ my $self = shift;
+ return unless $self->redis;
+
+ if ($self->stream_name =~ /^client:/) {
+ # Delete our stream since we're the only one using it. Deleting
+ # the stream also deletes our consumer group.
+ $self->redis->del($self->stream_name);
+ }
+
+ $self->redis->quit;
+ delete $self->{redis};
+}
+
+sub gather {
+ my $self = shift;
+ $self->process(0);
+}
+
+# -------------------------------------------------
+
+sub tcp_connected {
+ my $self = shift;
+ return $self->redis ? 1 : 0;
+}
+
+sub connected {
+ my $self = shift;
+ return $self->tcp_connected;
+}
+
+sub initialize {
+ my $self = shift;
+
+ my $host = $self->params->{host} || '';
+ my $port = $self->params->{port} || 0;
+ my $sock = $self->params->{sock} || '';
+ my $username = $self->params->{username};
+ my $password = $self->params->{password};
+ my $stream_name = $self->params->{stream_name};
+ my $consumer_name = $self->params->{consumer_name};
+ my $max_queue_size = $self->params->{max_queue_size};
+
+ $logger->debug("Redis client connecting: ".
+ "host=$host port=$port sock=$sock username=$username stream_name=$stream_name");
+
+ return 1 if $self->redis; # already connected
+
+ # UNIX socket file takes precedence over host:port.
+ my @connect_args = $sock ? (sock => $sock) : (server => "$host:$port");
+
+ # On disconnect, try to reconnect every 100ms up to 60 seconds.
+ push(@connect_args, (reconnect => 60, every => 100_000));
+
+ $logger->debug("Connecting to bus: @connect_args");
+
+ unless ($self->redis(Redis->new(@connect_args))) {
+ throw OpenSRF::EX::Jabber("Could not connect to Redis bus with @connect_args");
+ return 0;
+ }
+
+ unless ($self->redis->auth($username, $password) eq 'OK') {
+ throw OpenSRF::EX::Jabber("Cannot authenticate with Redis instance user=$username");
+ return 0;
+ }
+
+ $logger->debug("Auth'ed with Redis as $username OK : stream_name=$stream_name");
+
+ eval {
+ # This gets mad when a stream / group already exists, but
+ # Listeners share a stream/group name so dupes are possible.
+
+ $self->redis->xgroup(
+ 'create',
+ $stream_name, # stream name
+ $stream_name, # group name
+ '$', # only receive new messages
+ 'mkstream' # create this stream if it's not there.
+ );
+ };
+
+ if ($@) {
+ $logger->info("XGROUP CREATE returned : $@");
+ }
+
+ $self->stream_name($stream_name);
+ $self->consumer_name($consumer_name);
+ $self->max_queue_size($max_queue_size);
+
+ return $self;
+}
+
+sub max_queue_size {
+ my ($self, $max_queue_size) = @_;
+ $self->{max_queue_size} = $max_queue_size if $max_queue_size;
+ return $self->{max_queue_size};
+}
+
+sub stream_name {
+ my ($self, $stream_name) = @_;
+ $self->{stream_name} = $stream_name if $stream_name;
+ return $self->{stream_name};
+}
+
+sub consumer_name {
+ my ($self, $consumer_name) = @_;
+ $self->{consumer_name} = $consumer_name if $consumer_name;
+ return $self->{consumer_name};
+}
+
+
+sub construct {
+ my ($class, $app, $context) = @_;
+ $class->peer_handle($class->new($app, $context)->initialize);
+}
+
+sub send {
+ my $self = shift;
+ my $msg = OpenSRF::Transport::Redis::Message->new(@_);
+
+ $msg->body(OpenSRF::Utils::JSON->JSON2perl($msg->body));
+
+ $msg->osrf_xid($logger->get_osrf_xid);
+ $msg->from($self->stream_name);
+
+ my $msg_json = $msg->to_json;
+
+ $logger->internal("send(): to=" . $msg->to . " : $msg_json");
+
+ $self->redis->xadd(
+ $msg->to, # recipient == stream name
+ 'NOMKSTREAM',
+ 'MAXLEN',
+ '~', # maxlen-ish
+ $self->max_queue_size,
+ '*', # let Redis generate the ID
+ 'message', # gotta call it something
+ $msg_json
+ );
+}
+
+
+
+
+sub process {
+ my ($self, $timeout) = @_;
+
+ $timeout ||= 0;
+
+ # Redis does not support fractional timeouts.
+ $timeout = 1 if ($timeout > 0 && $timeout < 1);
+
+ $timeout = int($timeout);
+
+ unless ($self->redis) {
+ throw OpenSRF::EX::JabberDisconnected
+ ("This Redis instance is no longer connected to the server ");
+ }
+
+ return $self->recv($timeout);
+}
+
+# $timeout=0 means check for data without blocking
+# $timeout=-1 means block indefinitely.
+sub recv {
+ my ($self, $timeout) = @_;
+
+ $logger->debug("server: watching for content at " . $self->stream_name);
+
+ my @block;
+ if ($timeout) {
+ # 0 means block indefinitely in Redis
+ $timeout = 0 if $timeout == -1;
+ $timeout *= 1000; # milliseconds
+ @block = (BLOCK => $timeout);
+ }
+
+ my $packet = $self->redis->xreadgroup(
+ GROUP => $self->stream_name,
+ $self->consumer_name,
+ @block,
+ COUNT => 1,
+ STREAMS => $self->stream_name,
+ '>' # new messages only
+ );
+
+ # Timed out waiting for data.
+ return undef unless defined $packet;
+
+ # TODO make this more self-documenting. also, too brittle?
+ my $container = $packet->[0]->[1]->[0];
+ my $bus_id = $container->[0];
+ my $json = $container->[1]->[1];
+
+ $logger->internal("recv() $json");
+
+ # TODO putting this here for now -- it may live somewhere else.
+ # Ideally this could happen out of band.
+ # Note if we don't ACK utnil after successfully processing each
+ # message, a malformed message will stay in the pending list.
+ # Consider options.
+ $self->redis->xack($self->stream_name, $self->stream_name, $bus_id);
+
+ my $msg = OpenSRF::Transport::Redis::Message->new(json => $json);
+ $msg->bus_id($bus_id);
+
+ return undef unless $msg;
+
+ $logger->internal("recv() thread=" . $msg->thread);
+
+ # The message body is doubly encoded as JSON to, among other things,
+ # support message chunking.
+ $msg->body(OpenSRF::Utils::JSON->perl2JSON($msg->body));
+
+ return $msg;
+}
+
+
+sub flush_socket {
+ my $self = shift;
+ # Remove all messages from the stream
+ $self->redis->xtrim($self->stream_name, 'MAXLEN', 0);
+ return 1;
+}
+
+1;
+
+
--- /dev/null
+package OpenSRF::Transport::Redis::Message;
+use strict; use warnings;
+use OpenSRF::Utils::Logger qw/$logger/;
+use OpenSRF::Utils::JSON;
+use OpenSRF::EX qw/:try/;
+use strict; use warnings;
+
+sub new {
+ my ($class, %args) = @_;
+ my $self = bless({}, $class);
+
+ if ($args{json}) {
+ $self->from_json($args{json});
+
+ } else {
+ $self->{to} = $args{to} || '';
+ $self->{from} = $args{from} || '';
+ $self->{thread} = $args{thread} || '';
+ $self->{body} = $args{body} || '';
+ $self->{osrf_xid} = $args{osrf_xid} || '';
+ $self->{bus_id} = $args{bus_id} || '';
+ }
+
+ return $self;
+}
+
+sub to {
+ my($self, $to) = @_;
+ $self->{to} = $to if defined $to;
+ return $self->{to};
+}
+sub from {
+ my($self, $from) = @_;
+ $self->{from} = $from if defined $from;
+ return $self->{from};
+}
+sub thread {
+ my($self, $thread) = @_;
+ $self->{thread} = $thread if defined $thread;
+ return $self->{thread};
+}
+sub body {
+ my($self, $body) = @_;
+ $self->{body} = $body if defined $body;
+ return $self->{body};
+}
+
+sub status {
+ my($self, $status) = @_;
+ $self->{status} = $status if defined $status;
+ return $self->{status};
+}
+sub type {
+ my($self, $type) = @_;
+ $self->{type} = $type if defined $type;
+ return $self->{type};
+}
+
+sub err_type {}
+sub err_code {}
+
+sub osrf_xid {
+ my($self, $osrf_xid) = @_;
+ $self->{osrf_xid} = $osrf_xid if defined $osrf_xid;
+ return $self->{osrf_xid};
+}
+
+sub bus_id {
+ my($self, $bus_id) = @_;
+ $self->{bus_id} = $bus_id if defined $bus_id;
+ return $self->{bus_id};
+}
+
+sub to_json {
+ my $self = shift;
+
+ # No nead to encode the bus_id in outbound messages since the ID
+ # won't exist yet.
+ return OpenSRF::Utils::JSON->perl2JSON({
+ to => $self->{to},
+ from => $self->{from},
+ osrf_xid => $self->{osrf_xid},
+ thread => $self->{thread},
+ body => $self->{body}
+ });
+}
+
+sub from_json {
+ my $self = shift;
+ my $json = shift;
+ my $hash;
+
+ eval { $hash = OpenSRF::Utils::JSON->JSON2perl($json); };
+
+ if ($@) {
+ $logger->error("Redis::Message received invalid JSON: $@ : $json");
+ return undef;
+ }
+
+ $self->{$_} = $hash->{$_} for keys %$hash;
+}
+
+1;
--- /dev/null
+package OpenSRF::Transport::Redis::PeerConnection;
+use strict;
+use base qw/OpenSRF::Transport::Redis::Client/;
+use Digest::MD5 qw(md5_hex);
+use OpenSRF::Utils::Logger qw/$logger/;
+
+our $_singleton_connection;
+
+sub retrieve {
+ my ($class, $app) = @_;
+ return $_singleton_connection;
+}
+
+sub reset {
+ return unless $_singleton_connection;
+ $_singleton_connection->disconnect;
+ $_singleton_connection = undef;
+}
+
+
+sub new {
+ my ($class, $app, $connection_type) = @_;
+
+ my $peer_con = $class->retrieve;
+ return $peer_con if ($peer_con and $peer_con->tcp_connected);
+
+ my $conf = OpenSRF::Utils::Config->current->as_hash;
+
+ $conf = $conf->{connections} or
+ die "No 'connections' block in bootstrap configuration\n";
+
+ $conf = $conf->{$connection_type} or
+ die "No '$connection_type' connection in bootstrap configuration\n";
+
+ $conf = $conf->{message_bus};
+
+ my $port = $conf->{port} || 6379;
+ my $host = $conf->{host} || '127.0.0.1';
+ my $sock = $conf->{sock};
+ my $username = $conf->{username};
+ my $password = $conf->{password};
+ my $maxlen = $conf->{max_queue_size} || 1000;
+
+ my $stream_name = $app eq 'client' ? 'client:' : "client:$app:";
+ $stream_name .= substr(md5_hex($$ . time . rand($$)), 0, 12);
+
+ $logger->debug("PeerConnection::new() ".
+ "using app=$app username=$username stream_name=$stream_name");
+
+ my $self = $class->SUPER::new(
+ host => $host,
+ port => $port,
+ sock => $sock,
+ username => $username,
+ password => $password,
+ stream_name => $stream_name,
+ consumer_name => $stream_name,
+ max_queue_size => $maxlen
+ );
+
+ bless($self, $class);
+
+ $self->app($app);
+
+ return $_singleton_connection = $self;
+}
+
+sub process {
+ my $self = shift;
+ my $val = $self->SUPER::process(@_);
+ return 0 unless $val;
+ return OpenSRF::Transport->handler($self->app, $val);
+}
+
+sub app {
+ my $self = shift;
+ my $app = shift;
+ $self->{app} = $app if $app;
+ return $self->{app};
+}
+
+1;
+
sub get_peer_client { return "OpenSRF::Transport::SlimJabber::PeerConnection"; }
-sub get_msg_envelope { return "OpenSRF::Transport::SlimJabber::MessageWrapper"; }
-
1;
+++ /dev/null
-package OpenSRF::Transport::SlimJabber::MessageWrapper;
-use strict; use warnings;
-use OpenSRF::Transport::SlimJabber::XMPPMessage;
-
-# ----------------------------------------------------------
-# Legacy wrapper for XMPPMessage
-# ----------------------------------------------------------
-
-sub new {
- my $class = shift;
- my $msg = shift;
- return bless({msg => $msg}, ref($class) || $class);
-}
-
-sub msg {
- my($self, $msg) = @_;
- $self->{msg} = $msg if $msg;
- return $self->{msg};
-}
-
-sub toString {
- return $_[0]->msg->to_xml;
-}
-
-sub get_body {
- return $_[0]->msg->body;
-}
-
-sub get_sess_id {
- return $_[0]->msg->thread;
-}
-
-sub get_msg_type {
- return $_[0]->msg->type;
-}
-
-sub get_remote_id {
- return $_[0]->msg->from;
-}
-
-sub setType {
- $_[0]->msg->type(shift());
-}
-
-sub setTo {
- $_[0]->msg->to(shift());
-}
-
-sub setThread {
- $_[0]->msg->thread(shift());
-}
-
-sub setBody {
- $_[0]->msg->body(shift());
-}
-
-sub set_router_command {
- $_[0]->msg->router_command(shift());
-}
-sub set_router_class {
- $_[0]->msg->router_class(shift());
-}
-
-sub set_osrf_xid {
- $_[0]->msg->osrf_xid(shift());
-}
-
-sub get_osrf_xid {
- return $_[0]->msg->osrf_xid;
-}
-
-1;
use OpenSRF::Utils (':common');
use OpenSRF::Utils::Logger;
use Net::Domain qw/hostfqdn/;
+use XML::Simple;
#use overload '""' => \&OpenSRF::Utils::Config::dump_ini;
$self->mangle_dirs();
$self->mangle_logs();
+ $self->as_hash(XML::Simple->new->XMLin($self->FILE));
+
$OpenSRF::Utils::ConfigCache = $self unless $self->nocache;
delete $$self{nocache};
delete $$self{force};
return $self;
}
+sub as_hash {
+ my ($self, $hash) = @_;
+ $self->{as_hash} = $hash if $hash;
+ return $self->{as_hash};
+}
+
sub sections {
my $self = shift;
my %filters = @_;
sub ALL { return 100; }
my $isclient; # true if we control the osrf_xid
+my $connection_type;
# load up our config options
sub set_config {
my $force = shift;
+ my $con_type = shift;
+ $connection_type = $con_type if $con_type;
return if defined $config and !$force;
+ die "Logger connection type needed\n" unless $connection_type;
+
$config = OpenSRF::Utils::Config->current;
if( !defined($config) ) {
$loglevel = INFO();
return;
}
- $loglevel = $config->bootstrap->loglevel;
+ $config = $config->as_hash->{connections}->{$connection_type};
+
+ $loglevel = $config->{loglevel};
- if ($config->bootstrap->loglength) {
+ if ($config->{loglength}) {
$max_log_msg_len = $config->bootstrap->loglength;
}
- $service_tag = $config->bootstrap->logtag;
+ $service_tag = $config->{logtag};
- $logfile = $config->bootstrap->logfile;
+ $logfile = $config->{logfile};
if($logfile =~ /^syslog/) {
$syslog_enabled = 1;
$logfile_enabled = 0;
- $logfile = $config->bootstrap->syslog;
+ $logfile = $config->{syslog};
$facility = $logfile;
$logfile = undef;
$facility = _fac_to_const($facility);
# --------------------------------------------------------------
$act_syslog_enabled = 1;
$act_logfile_enabled = 0;
- $actfac = $config->bootstrap->actlog || $config->bootstrap->syslog;
+ $actfac = $config->{actlog} || $config->{syslog};
$actfac = _fac_to_const($actfac);
$actfile = undef;
} else {
# --------------------------------------------------------------
$act_syslog_enabled = 0;
$act_logfile_enabled = 1;
- $actfile = $config->bootstrap->actlog || $config->bootstrap->logfile;
+ $actfile = $config->{actlog} || $config->{logfile};
}
- my $client = OpenSRF::Utils::Config->current->bootstrap->client();
+ my $client = $config->{client} || '';
if ($ENV{OSRF_LOG_CLIENT} or $ENV{MOD_PERL}) {
$isclient = 1;
$isclient = 0;
return;
}
+
$isclient = ($client =~ /^true$/iog) ? 1 : 0;
}
snprintf(fbuf, sizeof(fbuf), "%s/.srfsh.xml", home);
if(!access(fbuf, R_OK)) {
- if( ! osrf_system_bootstrap_client(fbuf, "srfsh") ) {
+ if( ! osrf_system_bootstrap_common(fbuf, "srfsh", "srfsh", 0) ) {
fprintf(stderr,"Unable to bootstrap client for requests\n");
osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for requests");
return -1;
#include <string.h>
#include <signal.h>
#include <opensrf/utils.h>
+#include <opensrf/osrfConfig.h>
#include <opensrf/osrf_hash.h>
#include <opensrf/transport_client.h>
#include <opensrf/osrf_message.h>
#include <opensrf/osrf_app_session.h>
#include <opensrf/log.h>
+#include <opensrf/string_array.h>
#define MAX_THREAD_SIZE 64
#define RECIP_BUF_SIZE 256
// opportunity, at which point force-close the connection.
#define SHUTDOWN_MAX_GRACEFUL_SECONDS 120
-// Incremented with every REQUEST, decremented with every COMPLETE.
-static int requests_in_flight = 0;
-
// default values, replaced during setup (below) as needed.
static char* config_file = "/openils/conf/opensrf_core.xml";
static char* config_ctxt = "gateway";
-static char* osrf_router = NULL;
-static char* osrf_domain = NULL;
// Cache of opensrf thread strings and back-end receipients.
// Tracking this here means the caller only needs to track the thread.
// It also means we don't have to expose internal XMPP IDs
static osrfHash* stateful_session_cache = NULL;
+
+// Tracks threads that have active requests in flight.
+// This covers all request types regardless of connected-ness.
+static osrfStringArray* active_threads = NULL;
+static osrfStringArray* public_services = NULL;
+
// Message on STDIN go into our reusable buffer
static growing_buffer* stdin_buf = NULL;
// OpenSRF XMPP connection handle
static char* extract_inbound_messages();
static void log_request(const char*, osrfMessage*);
static void read_from_osrf();
-static void read_one_osrf_message(transport_message*);
+static int read_one_osrf_message(transport_message*);
static int shut_it_down(int);
static void release_hash_string(char*, void*);
static int can_shutdown_gracefully();
// (replies returning to the websocket client).
fd_set fds;
int stdin_no = fileno(stdin);
- int osrf_no = osrf_handle->session->sock_id;
- int maxfd = osrf_no > stdin_no ? osrf_no : stdin_no;
+ //int maxfd = osrf_no > stdin_no ? osrf_no : stdin_no;
+ int maxfd = stdin_no;
int sel_resp;
int shutdown_stat;
+ struct timeval tv;
while (1) {
FD_ZERO(&fds);
- FD_SET(osrf_no, &fds);
+ // FD_SET(osrf_no, &fds);
FD_SET(stdin_no, &fds);
if (shutdown_requested) {
- struct timeval tv;
tv.tv_usec = 0;
tv.tv_sec = SHUTDOWN_POLL_INTERVAL_SECONDS;
} else {
- // Wait indefinitely for activity to process.
- // This will be interrupted during a shutdown request signal.
- sel_resp = select(maxfd + 1, &fds, NULL, NULL, NULL);
+ if (active_threads->size > 0) {
+ tv.tv_usec = 0;
+ tv.tv_sec = 0;
+
+ // Do a non-blocking check for inbound requests while
+ // we wait for more osrf data to be returned.
+ sel_resp = select(maxfd + 1, &fds, NULL, NULL, &tv);
+
+ } else {
+
+ // No osrf responses pending. Wait indefinitely.
+ // This will be interrupted during a shutdown request signal.
+ sel_resp = select(maxfd + 1, &fds, NULL, NULL, NULL);
+ }
}
if (sel_resp < 0) { // error
"WS select() failed with [%s]. Exiting", strerror(errno));
shut_it_down(1);
- }
- if (sel_resp > 0) {
+ } else if (sel_resp > 0) {
if (FD_ISSET(stdin_no, &fds)) {
read_from_stdin();
- }
-
- if (FD_ISSET(osrf_no, &fds)) {
read_from_osrf();
}
+
+ } else if (active_threads->size > 0) {
+ // Nothing pulled from the websocket, but we still have
+ // active osrf request. See if any new responses have arrived.
+ read_from_osrf();
}
if (shutdown_requested) {
return -1;
}
- unsigned long active_sessions = osrfHashGetCount(stateful_session_cache);
- if (active_sessions == 0 && requests_in_flight == 0) {
+ if (active_threads->size == 0) {
osrfLogInfo(OSRF_LOG_MARK, "Graceful shutdown cycle complete");
return 1;
}
osrfLogInfo(OSRF_LOG_MARK, "Graceful shutdown cycle continuing with "
- "sessions=%d requests=%d", active_sessions, requests_in_flight);
+ "active threeds=%d", active_threads);
return 0;
}
static int shut_it_down(int stat) {
osrfHashFree(stateful_session_cache);
+ osrfStringArrayFree(active_threads);
+ osrfStringArrayFree(public_services);
buffer_free(stdin_buf);
osrf_system_shutdown(); // clean XMPP disconnect
exit(stat);
config_file = argv[1];
}
- if (!osrf_system_bootstrap_client(config_file, config_ctxt) ) {
+ if (!osrf_system_bootstrap_common(config_file, config_ctxt, "websocket", 0) ) {
fprintf(stderr, "Cannot boostrap OSRF\n");
shut_it_down(1);
}
- osrf_handle = osrfSystemGetTransportClient();
- osrfAppSessionSetIngress(WEBSOCKET_INGRESS);
-
- osrf_router = osrfConfigGetValue(NULL, "/router_name");
- osrf_domain = osrfConfigGetValue(NULL, "/domain");
+ osrf_handle = osrfSystemGetTransportClient();
+ osrfAppSessionSetIngress(WEBSOCKET_INGRESS);
stateful_session_cache = osrfNewHash();
osrfHashSetCallback(stateful_session_cache, release_hash_string);
+ active_threads = osrfNewStringArray(16);
+ public_services = osrfNewStringArray(16);
+
+ osrfConfigGetValueList(NULL, public_services, "/config/public_services/service");
+
client_ip = getenv("REMOTE_ADDR");
osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip);
}
if (!recipient) {
if (service) {
- int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1,
- "%s@%s/%s", osrf_router, osrf_domain, service);
- recipient_buf[size] = '\0';
+ size_t len = 9 + strlen(service); // service:$name
+ snprintf(recipient_buf, len, "service:%s", service);
recipient = recipient_buf;
+ if (!osrfStringArrayContains(public_services, service)) {
+ osrfLogWarning(OSRF_LOG_MARK,
+ "Request for private or unknown service '%s' forbidden", service);
+ return;
+ }
+
} else {
osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient");
return;
switch (msg->m_type) {
case CONNECT:
+ if (!osrfStringArrayContains(active_threads, thread)) {
+ osrfStringArrayAdd(active_threads, thread);
+ }
break;
case REQUEST:
log_request(service, msg);
- requests_in_flight++;
+ if (!osrfStringArrayContains(active_threads, thread)) {
+ osrfStringArrayAdd(active_threads, thread);
+ }
break;
case DISCONNECT:
transport_message* tmsg = NULL;
// Double check the socket connection before continuing.
- if (!client_connected(osrf_handle) ||
- !socket_connected(osrf_handle->session->sock_id)) {
+ if (!client_connected(osrf_handle)) {
osrfLogWarning(OSRF_LOG_MARK,
"WS: Jabber socket disconnected, exiting");
shut_it_down(1);
}
+
// Once client_recv is called all data waiting on the socket is
// read. This means we can't return to the main select() loop after
// each message, because any subsequent messages will get stuck in
// the opensrf receive queue. Process all available messages.
- while ( (tmsg = client_recv(osrf_handle, 0)) ) {
+
+
+ // As long as any active requests are in flight, wait up to one
+ // second to receive a response. Then return to inspect stdin
+ // to see if there are any requests waiting we can push through.
+ // Then come back here.
+ while (1) {
+ int timeout = active_threads->size > 0 ? 1 : 0;
+
+ tmsg = client_recv(osrf_handle, timeout);
+
+ if (!tmsg) { break; }
+
read_one_osrf_message(tmsg);
+
+ osrfLogDebug(OSRF_LOG_MARK,
+ "WS relaying message to STDOUT thread=%s, recipient=%s",
+ tmsg->thread, tmsg->recipient);
+
message_free(tmsg);
}
}
// Process a single OpenSRF response message and print the reponse
// to STDOUT for delivery to the websocket client.
-static void read_one_osrf_message(transport_message* tmsg) {
+static int read_one_osrf_message(transport_message* tmsg) {
osrfList *msg_list = NULL;
osrfMessage *one_msg = NULL;
int i;
+ int complete = 0;
osrfLogDebug(OSRF_LOG_MARK,
"WS received opensrf response for thread=%s", tmsg->thread);
} else {
- // connection timed out; clear the cached recipient
- if (one_msg->status_code == OSRF_STATUS_TIMEOUT) {
+ // Any error conditions ends the conversation
+ if (one_msg->status_code >= OSRF_STATUS_BADREQUEST) {
osrfHashRemove(stateful_session_cache, tmsg->thread);
+ osrfStringArrayRemove(active_threads, tmsg->thread);
} else {
if (one_msg->status_code == OSRF_STATUS_COMPLETE) {
- requests_in_flight--;
+ osrfLogInternal(OSRF_LOG_MARK,
+ "WS Marking request complete for thread %s", tmsg->thread);
+ osrfStringArrayRemove(active_threads, tmsg->thread);
}
}
}
free(msg_string);
jsonObjectFree(msg_wrapper);
+
+ return complete;
}