From a0aeb95580454aeec05c0961d617ee23bb9a76dd Mon Sep 17 00:00:00 2001 From: Bill Erickson Date: Mon, 27 Dec 2021 15:58:13 -0500 Subject: [PATCH] LPXXX Replace XMPP with Redis (C & Perl) * Use Redis streams / consumer groups * Includes modified opensrf_core.xml example file * adds message bus management to opensrf-perl.pl * Adds public services filter to gateway / websocket translator * Readme for install TODO * Modify install makefiles * Modify base INSTALL docs * Trim opensrf_core.xml.example to avoid refs to Evergreen services and move the current one to Evergreen. * Consider if/how/when we want to use redis NOACK / XACK. * add logfile support back to opensrf_core.xml.exmple * more, i'm sure Signed-off-by: Bill Erickson --- .gitignore | 1 + Makefile.am | 3 +- README_REDIS.md | 62 +++ bin/opensrf-perl.pl.in | 93 +++- configure.ac | 9 +- examples/opensrf_core.xml.example | 265 ++++----- include/opensrf/osrf_system.h | 12 +- include/opensrf/transport_client.h | 27 +- include/opensrf/transport_message.h | 3 + src/Makefile.am | 2 +- src/extras/timer.pl | 273 ++++++++++ src/gateway/Makefile.am | 2 +- src/gateway/osrf_http_translator.c | 2 +- src/gateway/osrf_json_gateway.c | 9 +- src/libopensrf/Makefile.am | 1 - src/libopensrf/osrf_app_session.c | 10 +- src/libopensrf/osrf_application.c | 2 +- src/libopensrf/osrf_prefork.c | 39 +- src/libopensrf/osrf_system.c | 111 ++-- src/libopensrf/transport_client.c | 590 ++++++++++++--------- src/libopensrf/transport_message.c | 87 +++ src/perl/MANIFEST | 4 +- src/perl/lib/OpenSRF/AppSession.pm | 17 +- src/perl/lib/OpenSRF/Application.pm | 10 +- src/perl/lib/OpenSRF/Server.pm | 52 +- src/perl/lib/OpenSRF/System.pm | 18 +- src/perl/lib/OpenSRF/Transport.pm | 26 +- src/perl/lib/OpenSRF/Transport/Redis/Client.pm | 256 +++++++++ src/perl/lib/OpenSRF/Transport/Redis/Message.pm | 103 ++++ .../lib/OpenSRF/Transport/Redis/PeerConnection.pm | 83 +++ src/perl/lib/OpenSRF/Transport/SlimJabber.pm | 2 - .../OpenSRF/Transport/SlimJabber/MessageWrapper.pm | 72 --- src/perl/lib/OpenSRF/Utils/Config.pm | 9 + src/perl/lib/OpenSRF/Utils/Logger.pm | 24 +- src/srfsh/srfsh.c | 2 +- src/websocket-stdio/osrf-websocket-stdio.c | 127 +++-- 36 files changed, 1689 insertions(+), 719 deletions(-) create mode 100644 README_REDIS.md create mode 100755 src/extras/timer.pl create mode 100644 src/perl/lib/OpenSRF/Transport/Redis/Client.pm create mode 100644 src/perl/lib/OpenSRF/Transport/Redis/Message.pm create mode 100644 src/perl/lib/OpenSRF/Transport/Redis/PeerConnection.pm delete mode 100644 src/perl/lib/OpenSRF/Transport/SlimJabber/MessageWrapper.pm diff --git a/.gitignore b/.gitignore index 181b1d1..b8915ef 100644 --- a/.gitignore +++ b/.gitignore @@ -1,3 +1,4 @@ +test-driver aclocal.m4 autom4te.cache/ bin/opensrf-perl.pl diff --git a/Makefile.am b/Makefile.am index bf1effe..4b09ef6 100644 --- a/Makefile.am +++ b/Makefile.am @@ -19,11 +19,12 @@ endif export PREFIX = @prefix@ export TMP = @TMP@ export LIBXML2_HEADERS = @LIBXML2_HEADERS@ +export HIREDIS_HEADERS = @HIREDIS_HEADERS@ export APR_HEADERS = @APR_HEADERS@ export ETCDIR = @sysconfdir@ export APXS2 = @APXS2@ export APACHE2_HEADERS = @APACHE2_HEADERS@ -export DEF_CFLAGS = -D_LARGEFILE64_SOURCE $(MAYBE_DEBUG) -pipe -g -Wall -O2 -fPIC -I@abs_top_srcdir@/include/ -I$(LIBXML2_HEADERS) -I$(APACHE2_HEADERS) -I$(APR_HEADERS) @AM_CPPFLAGS@ +export DEF_CFLAGS = -D_LARGEFILE64_SOURCE $(MAYBE_DEBUG) -pipe -g -Wall -O2 -fPIC -I@abs_top_srcdir@/include/ -I$(LIBXML2_HEADERS) -I$(APACHE2_HEADERS) -I$(APR_HEADERS) -I$(HIREDIS_HEADERS) @AM_CPPFLAGS@ export DEF_LDLIBS = -lopensrf export VAR = @localstatedir@ export PID = @localstatedir@/run/opensrf diff --git a/README_REDIS.md b/README_REDIS.md new file mode 100644 index 0000000..e59c0f5 --- /dev/null +++ b/README_REDIS.md @@ -0,0 +1,62 @@ +# OpenSRF-Over-Redis + +Proof of concept project to replace XMPP / Ejabberd with Redis as the +OpenSRF message transport layer. + +## Install + +### Install Redis version 6.x for ACL support. + +NOTE: Redis v6 is the default version in Ubuntu 22.04 + +```sh + +curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg + +echo "deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb $(lsb_release -cs) main" \ + | sudo tee /etc/apt/sources.list.d/redis.list + +sudo apt update +sudo apt install redis-server libredis-perl libhiredis-dev + +``` + +### Disable Redis Snapshot Disk Persistence + +Optional but recommended since disk persistence adds unnecessary overhead. + +#### Edit /etc/redis/redis.conf and un-comment the # save "" line: + +```conf +# Snapshotting can be completely disabled with a single empty string argument +# as in following example: +# +save "" +# +``` + +#### Restart Redis + +```sh +sudo systemctl restart redis +``` + +### Install OpenSRF Config and Initialize/Reset Message Bus + +```sh +# mileage may vary on these commands + +sudo su opensrf + +# Backup the original config +mv /openils/conf/opensrf_core.xml /openils/conf/opensrf_core.xml.orig + +# From the OpenSRF repository root directory. +# Copy and modify the new config as needed. +# TODO move this example config into Evergreen since it references EG services. +cp examples/opensrf_core.xml.example /openils/conf/opensrf_core.xml + +# reset/init message bus +osrf_control -l --reset-message-bus + +``` diff --git a/bin/opensrf-perl.pl.in b/bin/opensrf-perl.pl.in index b2bced6..afb7aa8 100755 --- a/bin/opensrf-perl.pl.in +++ b/bin/opensrf-perl.pl.in @@ -25,6 +25,7 @@ use OpenSRF::Utils::SettingsClient; use OpenSRF::Transport::Listener; use OpenSRF::Utils; use OpenSRF::Utils::Config; +use Redis; my $opt_service = undef; my $opt_config = "@CONF_DIR@/opensrf_core.xml"; @@ -62,6 +63,7 @@ my $opt_reload_all = 0; my $opt_quiet = 0; my $opt_diagnostic = 0; my $opt_ignore_orphans = 0; +my $opt_reset_message_bus = 0; my $sclient; my @perl_services; my @nonperl_services; @@ -104,6 +106,7 @@ GetOptions( 'reload' => \$opt_reload, 'reload-all' => \$opt_reload_all, 'diagnostic' => \$opt_diagnostic, + 'reset-message-bus' => \$opt_reset_message_bus, 'ignore-orphans' => \$opt_ignore_orphans ); @@ -112,7 +115,7 @@ if ($opt_localhost) { $ENV{OSRF_HOSTNAME} = $hostname; } -my $C_COMMAND = "opensrf-c -c $opt_config -x opensrf -p $opt_pid_dir -h $hostname"; +my $C_COMMAND = "opensrf-c -c $opt_config -x service -p $opt_pid_dir -h $hostname"; sub verify_services { my $service = shift; @@ -295,6 +298,7 @@ sub do_diagnostic { sub do_start_router { + return; my $pidfile = get_pid_file('router'); `opensrf_router $opt_config routers $pidfile`; @@ -412,7 +416,7 @@ sub do_start { sub do_start_all { msg("starting router and services for $hostname"); - do_start('router'); + #do_start('router'); return do_start_services(); } @@ -569,7 +573,7 @@ sub do_stop_all { # graceful shutdown requires the presence of the router, so stop the # router last. See if it's running first to avoid unnecessary warnings. - do_stop('router', $signals[0]) if get_service_pids_from_file('router'); + #do_stop('router', $signals[0]) if get_service_pids_from_file('router'); return 1; } @@ -599,7 +603,7 @@ sub do_daemon { # parses the local settings file sub load_settings { my $conf = OpenSRF::Utils::Config->current; - my $cfile = $conf->bootstrap->settings_config; + my $cfile = $conf->as_hash->{settings_config}; return unless $cfile; my $parser = OpenSRF::Utils::SettingsParser->new(); $parser->initialize( $cfile ); @@ -607,6 +611,80 @@ sub load_settings { $parser->get_server_config($conf->env->hostname); } +# Clear the service: and client: queues of lingering messages. Apply +# bus access and permissions for accounts defined in the +# block. This will generally be called before we connect to the system, +# since the system may not yet have access to connect to the bus. +sub do_reset_message_bus { + + my $base_conf = + OpenSRF::Utils::Config->load(config_file => $opt_config, nocache => 1)->as_hash + or die "No suitable configuration found at $opt_config\n"; + + my $connections = $base_conf->{connections} or + die "No connection configuration found in $opt_config\n"; + + my $redis; + + for my $type (keys %$connections) { + my $conf = $connections->{$type}; + my $bus_conf = $conf->{message_bus}; + + if (!$redis) { + # Though connection details may vary by connection, we + # assume all connections in a single bootstratp config + # ultimately point to the same Redis instance. + my $host = $bus_conf->{host}; + my $port = $bus_conf->{port}; + my $sock = $bus_conf->{sock}; + + # This redis connection uses the "default" account, which has + # access to all actions and keys so it can manage access for + # various opensrf account types. + my @connect_args = $sock ? (sock => $sock) : (server => "$host:$port"); + + $redis = Redis->new(@connect_args) or + die "Cannot connect to Redis instance at @connect_args\n"; + + # Clear all the data + msg("Clearing all data from message bus: @connect_args"); + $redis->flushall; + } + + msg("Applying bus access for connection type: $type"); + + my $username = $bus_conf->{username}; + my $password = $bus_conf->{password}; + + $redis->acl('SETUSER', $username, 'reset'); + $redis->acl('SETUSER', $username, 'on', ">$password"); + + # Privileged clients can post message to all services and perform + # delete operations to clear their own client: enpoints from + # (i.e. flush socket). + my $permissions = '-@all +xgroup +xadd +xreadgroup +xack +xtrim +del ~client:* ~service:*'; + + if (!$conf->{privileged}) { + msg("Limiting bus access for non-privileged connection type=$type"); + + # Unprivileged connections have access toa subset of the + # available (i.e. public) services. + $permissions = '-@all +xgroup +xadd +xreadgroup +xack +xtrim +del ~client:*'; + + # Non-privileged connections can only interact with public + # service bus endpoints. + for my $service (@{$base_conf->{public_services}->{service}}) { + $redis->acl('SETUSER', $username, "~service:$service"); + } + } + + $redis->acl('SETUSER', $username, split(/ /, $permissions)); + + } + + $redis->quit; +} + sub msg { my $m = shift; print "* $m\n" unless $opt_quiet; @@ -753,10 +831,16 @@ sub do_help { and gracefully re-launch drone processes. The -all variant sends the signal to all services. The non-(-all) variant requires a --service. + + --reset-message-bus + Clear ALL data from the message bus, create opensrf accounts w/ + permission to access message queues. HELP exit; } +do_reset_message_bus() if $opt_reset_message_bus; + # we do not verify services for stop/signal actions, since those may # legitimately be used against services not (or no longer) configured # to run on the selected host. @@ -808,6 +892,7 @@ do_diagnostic() if $opt_diagnostic; # show help if no action was requested do_help() if $opt_help or not ( + $opt_reset_message_bus or $opt_start or $opt_start_all or $opt_start_services or diff --git a/configure.ac b/configure.ac index 21963b0..590cb99 100644 --- a/configure.ac +++ b/configure.ac @@ -208,6 +208,12 @@ AC_ARG_WITH([libxml], [LIBXML2_HEADERS=/usr/include/libxml2/]) AC_SUBST([LIBXML2_HEADERS]) +AC_ARG_WITH([hiredis], +[ --with-hiredis=path location of the hiredis headers (default is /usr/include/hiredis/))], +[HIREDIS_HEADERS=${withval}], +[HIREDIS_HEADERS=/usr/include/hiredis/]) +AC_SUBST([HIREDIS_HEADERS]) + AC_ARG_WITH([includes], [ --with-includes=DIRECTORIES a colon-separated list of directories that will be added to the list the compiler searches for header files (Example: --with-includes=/path/headers:/anotherpath/moreheaders)], [EXTRA_USER_INCLUDES=${withval}]) @@ -274,6 +280,7 @@ if test "x$OSRF_INSTALL_CORE" = "xtrue"; then AC_CHECK_LIB([ncurses], [initscr], [], AC_MSG_ERROR(***OpenSRF requires ncurses development headers)) AC_CHECK_LIB([readline], [readline], [], AC_MSG_ERROR(***OpenSRF requires readline development headers)) AC_CHECK_LIB([xml2], [xmlAddID], [], AC_MSG_ERROR(***OpenSRF requires xml2 development headers)) + AC_CHECK_LIB([hiredis], [redisConnect], [], AC_MSG_ERROR(***OpenSRF requires libhiredis)) # Check for libmemcached and set flags accordingly PKG_CHECK_MODULES(memcached, libmemcached >= 0.8.0) AC_SUBST(memcached_CFLAGS) @@ -326,7 +333,6 @@ if test "x$OSRF_INSTALL_CORE" = "xtrue"; then src/libopensrf/Makefile src/perl/Makefile src/ports/strn_compat/Makefile - src/router/Makefile src/srfsh/Makefile src/websocket-stdio/Makefile tests/Makefile @@ -346,5 +352,6 @@ AC_MSG_RESULT([--------------------- Configuration options: ------------------- AC_MSG_RESULT(APR headers location: ${APR_HEADERS}) AC_MSG_RESULT(Apache version: ${APACHE_READABLE_VERSION}) AC_MSG_RESULT(libxml2 headers location: ${LIBXML2_HEADERS}) + AC_MSG_RESULT(libhiredis headers location: ${HIREDIS_HEADERS}) AC_MSG_RESULT([----------------------------------------------------------------------]) diff --git a/examples/opensrf_core.xml.example b/examples/opensrf_core.xml.example index 8c99cf8..c6db0c5 100644 --- a/examples/opensrf_core.xml.example +++ b/examples/opensrf_core.xml.example @@ -1,181 +1,94 @@ - - - - - - - - - - - - router - public.localhost - - opensrf.math - - - - - - router - private.localhost - - - - - - private.localhost - opensrf - password - 5222 - - router - - - 1800000 - - - - LOCALSTATEDIR/log/osrfsys.log - - - - - - - - 3 - - - 1536 - - - SYSCONFDIR/opensrf.xml - - - - - - - - true - - - router - - - - public.localhost - opensrf - password - 5222 - LOCALSTATEDIR/log/gateway.log - 3 - - - - - - - - - - - - - - - - - - private.localhost - - private.localhost - public.localhost - - - public.localhost - 5222 - LOCALSTATEDIR/sock/unix_sock - router - password - router - 10 - 5 - - LOCALSTATEDIR/log/router.log - - - 2 - - - - private.localhost - - private.localhost - - - private.localhost - 5222 - router - password - router - 10 - 5 - - LOCALSTATEDIR/log/router.log - - - 4 - - - - - - - - - - - + + + + + true + + + 127.0.0.1 + 6379 + opensrf@private + password + + + 1000 + + + + syslog + local0 + local1 + 3 + + + + + 127.0.0.1 + 6379 + opensrf@public + password + + true + syslog + local6 + local1 + 3 + + + + /openils/conf/opensrf.xml + + + opensrf.math + open-ils.actor + open-ils.acq + open-ils.auth + open-ils.auth_proxy + open-ils.booking + open-ils.cat + open-ils.circ + open-ils.collections + open-ils.courses + open-ils.curbside + open-ils.fielder + open-ils.pcrud + open-ils.permacrud + open-ils.reporter + open-ils.resolver + open-ils.search + open-ils.supercat + open-ils.url_verify + open-ils.vandelay + open-ils.serial + open-ils.ebook_api + + + + open-ils.auth.authenticate.verify + open-ils.auth.authenticate.complete + open-ils.auth.login + open-ils.auth_proxy.login + open-ils.actor.patron.password_reset.commit + open-ils.actor.user.password + open-ils.actor.user.username + open-ils.actor.user.email + open-ils.actor.patron.update + open-ils.cstore.direct.actor.user.create + open-ils.cstore.direct.actor.user.update + open-ils.cstore.direct.actor.user.delete + open-ils.search.z3950.apply_credentials + open-ils.geo + open-ils.actor.geo + diff --git a/include/opensrf/osrf_system.h b/include/opensrf/osrf_system.h index 18d8c85..cab0f8c 100644 --- a/include/opensrf/osrf_system.h +++ b/include/opensrf/osrf_system.h @@ -19,10 +19,16 @@ extern "C" { void osrfSystemSetPidFile( const char* name ); -int osrf_system_bootstrap_client( char* config_file, char* contextnode ); +int osrf_system_bootstrap_common(const char* config_file, + const char* contextnode, const char* appname, int is_service); -int osrfSystemBootstrapClientResc( const char* config_file, - const char* contextnode, const char* resource ); +int osrf_system_bootstrap_client(const char* config_file, const char* contextnode); + +int osrf_system_bootstrap_service( + const char* config_file, const char* contextnode, const char* appname); + +int osrfSystemBootstrapClientResc(const char* config_file, + const char* contextnode, const char* appname); int osrfSystemBootstrap( const char* hostname, const char* configfile, const char* contextNode ); diff --git a/include/opensrf/transport_client.h b/include/opensrf/transport_client.h index 4307af9..2c7a8f8 100644 --- a/include/opensrf/transport_client.h +++ b/include/opensrf/transport_client.h @@ -10,6 +10,7 @@ */ #include +#include #include #include #include @@ -29,18 +30,34 @@ struct message_list_struct; struct transport_client_struct { transport_message* msg_q_head; /**< Head of message queue */ transport_message* msg_q_tail; /**< Tail of message queue */ - transport_session* session; /**< Manages lower-level message processing */ + redisContext* bus; + + // Our communication stream. + // This will be unique for all connections except service-level + // (Listener) connections. + char* stream_name; + + // Our unique name. + // Will match the unique stream_name for non-service-level connections. + char* consumer_name; + + int max_queue_size; + + int port; + char* unix_path; int error; /**< Boolean: true if an error has occurred */ char* host; /**< Domain name or IP address of the Jabber server */ char* xmpp_id; /**< Jabber ID used for outgoing messages */ }; typedef struct transport_client_struct transport_client; -transport_client* client_init( const char* server, int port, const char* unix_path, int component ); +transport_client* client_init( const char* server, int port, const char* unix_path ); -int client_connect( transport_client* client, - const char* username, const char* password, const char* resource, - int connect_timeout, enum TRANSPORT_AUTH_TYPE auth_type ); +int client_connect_with_stream_name(transport_client* client, const char* username, const char* password); +int client_connect_as_service(transport_client* client, + const char* appname, const char* username, const char* password); +int client_connect(transport_client* client, + const char* appname, const char* username, const char* password); int client_disconnect( transport_client* client ); diff --git a/include/opensrf/transport_message.h b/include/opensrf/transport_message.h index fb43f92..65a8447 100644 --- a/include/opensrf/transport_message.h +++ b/include/opensrf/transport_message.h @@ -54,6 +54,7 @@ struct transport_message_struct { int error_code; /**< Value of the "code" attribute of <error>. */ int broadcast; /**< Value of the "broadcast" attribute in the message element. */ char* msg_xml; /**< The entire message as XML, complete with entity encoding. */ + char* msg_json; /**< The entire message as JSON*/ struct transport_message_struct* next; }; typedef struct transport_message_struct transport_message; @@ -62,6 +63,7 @@ transport_message* message_init( const char* body, const char* subject, const char* thread, const char* recipient, const char* sender ); transport_message* new_message_from_xml( const char* msg_xml ); +transport_message* new_message_from_json( const char* msg_json ); void message_set_router_info( transport_message* msg, const char* router_from, const char* router_to, const char* router_class, const char* router_command, @@ -70,6 +72,7 @@ void message_set_router_info( transport_message* msg, const char* router_from, void message_set_osrf_xid( transport_message* msg, const char* osrf_xid ); int message_prepare_xml( transport_message* msg ); +int message_prepare_json( transport_message* msg ); int message_free( transport_message* msg ); diff --git a/src/Makefile.am b/src/Makefile.am index 0f90f2c..370b6b2 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -31,7 +31,7 @@ js_SCRIPTS = javascript/DojoSRF.js javascript/JSON_v1.js javascript/md5.js javas endif if BUILDCORE -MAYBE_CORE = libopensrf c-apps router srfsh gateway perl websocket-stdio +MAYBE_CORE = libopensrf c-apps srfsh gateway perl websocket-stdio dist_bin_SCRIPTS = @top_srcdir@/bin/opensrf-perl.pl bin_SCRIPTS = @top_srcdir@/bin/osrf_config dist_sysconf_DATA = @top_srcdir@/examples/opensrf.xml.example @top_srcdir@/examples/opensrf_core.xml.example @top_srcdir@/examples/srfsh.xml.example diff --git a/src/extras/timer.pl b/src/extras/timer.pl new file mode 100755 index 0000000..6dedff4 --- /dev/null +++ b/src/extras/timer.pl @@ -0,0 +1,273 @@ +#!/usr/bin/perl +use strict; +use warnings; +use Getopt::Long; +use OpenSRF::Utils::Logger q/$logger/; +use OpenSRF::AppSession; +use OpenSRF::Application; +use Time::HiRes qw/time/; + +# Testing with Evergreen storage service by default. +# I tried using opensrf.settings but for reasons I didn't investigate +# hitting opensrf.settings with lots of requests lead to failures. +#my $test_service = "open-ils.storage"; +#my $test_service = "opensrf.settings"; +my $test_service = "open-ils.cstore"; + +my $iterations = 50; +my $parallel = 1; + +my $small_echo_data = <); +my $med_echo_data = substr($large_echo_data, length($large_echo_data) / 2); + +my $osrf_config = '/openils/conf/opensrf_core.xml'; +my $ops = GetOptions('osrf-config=s' => \$osrf_config); + +sub echoloop { + my $data = shift; + my $ses = shift || OpenSRF::AppSession->create($test_service); + my $connected = shift || 0; + + my $start = time; + for (0..$iterations) { + my $resp = $ses->request('opensrf.system.echo', "$data$_")->gather(1); + + if ($resp eq "$data$_") { + print "+"; + } else { + warn "Got bad data: $resp\n"; + } + } + + my $dur = time - $start; + my $avg = $dur / $iterations; + + if ($parallel == 1) { + print sprintf( + " Connected=$connected Size=%d\tTotal Duration: %0.3f\tAvg Duration: %0.4f\n", + length($data), $dur, $avg + ); + } +} + +sub do_the_stuff { + + echoloop($small_echo_data); + echoloop($med_echo_data); + echoloop($large_echo_data); + + # Connected sessions + + my $ses = OpenSRF::AppSession->create($test_service); + + $ses->connect; + echoloop($small_echo_data, $ses, 1); + $ses->disconnect; + + $ses->connect; + echoloop($med_echo_data, $ses, 1); + $ses->disconnect; + + $ses->connect; + echoloop($large_echo_data, $ses, 1); + $ses->disconnect; +} + +for (1..$parallel) { + + if (fork() == 0) { + OpenSRF::System->bootstrap_client(config_file => $osrf_config); + do_the_stuff(); + exit; + } +} + +while (wait > 0) {} +print "\n"; + +__DATA__ +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi +asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi diff --git a/src/gateway/Makefile.am b/src/gateway/Makefile.am index 666fb06..eeffc75 100644 --- a/src/gateway/Makefile.am +++ b/src/gateway/Makefile.am @@ -18,7 +18,7 @@ endif EXTRA_DIST = @srcdir@/apachetools.c @srcdir@/apachetools.h \ @srcdir@/osrf_json_gateway.c @srcdir@/osrf_http_translator.c -AM_CFLAGS = -D_LARGEFILE64_SOURCE $(HAVE_APACHE_MIN_24) -Wall -I@abs_top_srcdir@/include/ -I$(LIBXML2_HEADERS) -I$(APACHE2_HEADERS) -I$(APR_HEADERS) +AM_CFLAGS = -D_LARGEFILE64_SOURCE $(HAVE_APACHE_MIN_24) -Wall -I@abs_top_srcdir@/include/ -I$(HIREDIS_HEADERS) -I$(LIBXML2_HEADERS) -I$(APACHE2_HEADERS) -I$(APR_HEADERS) AM_LDFLAGS = -L$(LIBDIR) -L@top_builddir@/src/libopensrf AP_LIBEXECDIR = `$(APXS2) -q LIBEXECDIR` diff --git a/src/gateway/osrf_http_translator.c b/src/gateway/osrf_http_translator.c index ec8a685..6b59307 100644 --- a/src/gateway/osrf_http_translator.c +++ b/src/gateway/osrf_http_translator.c @@ -517,7 +517,7 @@ static apr_status_t childExit(void* data) { #endif static void childInit(apr_pool_t *p, server_rec *s) { - if(!osrfSystemBootstrapClientResc(configFile, configCtx, "translator")) { + if(!osrf_system_bootstrap_common(configFile, configCtx, "translator", 0)) { ap_log_error( APLOG_MARK, APLOG_ERR, 0, s, "Unable to Bootstrap OpenSRF Client with config %s..", configFile); return; diff --git a/src/gateway/osrf_json_gateway.c b/src/gateway/osrf_json_gateway.c index 783ebc9..a65a3bd 100644 --- a/src/gateway/osrf_json_gateway.c +++ b/src/gateway/osrf_json_gateway.c @@ -31,6 +31,7 @@ char* osrf_json_gateway_config_file = NULL; int bootstrapped = 0; int numserved = 0; osrfStringArray* allowedOrigins = NULL; +static osrfStringArray* public_services = NULL; static const char* osrf_json_gateway_set_default_locale(cmd_parms *parms, void *config, const char *arg) { @@ -82,12 +83,15 @@ static void osrf_json_gateway_child_init(apr_pool_t *p, server_rec *s) { int t = time(NULL); snprintf(buf, sizeof(buf), "%d", t); - if( ! osrfSystemBootstrapClientResc( cfg, CONFIG_CONTEXT, buf ) ) { + if( ! osrf_system_bootstrap_common( cfg, CONFIG_CONTEXT, buf, 0 ) ) { ap_log_error( APLOG_MARK, APLOG_ERR, 0, s, "Unable to Bootstrap OpenSRF Client with config %s..", cfg); return; } + public_services = osrfNewStringArray(16); + osrfConfigGetValueList(NULL, public_services, "/config/public_services/service"); + allowedOrigins = osrfNewStringArray(4); osrfConfigGetValueList(NULL, allowedOrigins, "/cross_origin/origin"); @@ -222,7 +226,8 @@ static int osrf_json_gateway_method_handler (request_rec *r) { /* ----------------------------------------------------------------- */ - if(!(service && method)) { + if(!(service && method) || + (service && !osrfStringArrayContains(public_services, service))) { osrfLogError(OSRF_LOG_MARK, "Service [%s] not found or not allowed", service); diff --git a/src/libopensrf/Makefile.am b/src/libopensrf/Makefile.am index fd3729b..49e2ecd 100644 --- a/src/libopensrf/Makefile.am +++ b/src/libopensrf/Makefile.am @@ -29,7 +29,6 @@ TARGS = osrf_message.c \ osrfConfig.c \ osrf_application.c \ osrf_cache.c \ - osrf_transgroup.c \ osrf_list.c \ osrf_hash.c \ osrf_utf8.c \ diff --git a/src/libopensrf/osrf_app_session.c b/src/libopensrf/osrf_app_session.c index df83e3e..9befeaf 100644 --- a/src/libopensrf/osrf_app_session.c +++ b/src/libopensrf/osrf_app_session.c @@ -541,6 +541,8 @@ osrfAppSession* osrfAppSessionClientInit( const char* remote_service ) { return NULL; } + /* + // Get a list of domain names from the config settings; // ignore all but the first one in the list. osrfStringArray* arr = osrfNewStringArray(8); @@ -579,8 +581,14 @@ osrfAppSession* osrfAppSessionClientInit( const char* remote_service ) { free( session ); return NULL; } + */ + + growing_buffer *buf = buffer_init(32); + buffer_add(buf, "service:"); + buffer_add(buf, remote_service); - session->remote_id = strdup(target_buf); + session->remote_id = buffer_release(buf); + //session->remote_id = strdup(remote_service); session->orig_remote_id = strdup(session->remote_id); session->remote_service = strdup(remote_service); session->session_locale = NULL; diff --git a/src/libopensrf/osrf_application.c b/src/libopensrf/osrf_application.c index b8cb7c1..47dc5d4 100644 --- a/src/libopensrf/osrf_application.c +++ b/src/libopensrf/osrf_application.c @@ -331,7 +331,7 @@ int osrfAppRegisterExtendedMethod( const char* appName, const char* methodName, return -1; } - osrfLogDebug( OSRF_LOG_MARK, "Registering method %s for app %s", methodName, appName ); + osrfLogInternal( OSRF_LOG_MARK, "Registering method %s for app %s", methodName, appName ); // Extract the only valid option bits, and ignore the rest. int opts = options & ( OSRF_METHOD_STREAMING | OSRF_METHOD_CACHABLE ); diff --git a/src/libopensrf/osrf_prefork.c b/src/libopensrf/osrf_prefork.c index 845a64e..6d6b5ca 100644 --- a/src/libopensrf/osrf_prefork.c +++ b/src/libopensrf/osrf_prefork.c @@ -183,16 +183,16 @@ int osrf_prefork_run( const char* appname ) { free( max_backlog_queue ); /* --------------------------------------------------- */ - char* resc = va_list_to_string( "%s_listener", appname ); + //char* resc = va_list_to_string( "%s_listener", appname ); // Make sure that we haven't already booted - if( !osrfSystemBootstrapClientResc( NULL, NULL, resc )) { + if( !osrf_system_bootstrap_common(NULL, "service", appname, 1)) { osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()" ); - free( resc ); + //free( resc ); return -1; } - free( resc ); + //free( resc ); prefork_simple forker; @@ -211,7 +211,7 @@ int osrf_prefork_run( const char* appname ) { prefork_launch_children( &forker ); // Tell the router that you're open for business. - osrf_prefork_register_routers( appname, false ); + //osrf_prefork_register_routers( appname, false ); signal( SIGUSR1, sigusr1_handler); signal( SIGUSR2, sigusr2_handler); @@ -372,7 +372,7 @@ static int prefork_child_init_hook( prefork_child* child ) { // Connect to cache server(s). osrfSystemInitCache(); - char* resc = va_list_to_string( "%s_drone", child->appname ); + //char* resc = va_list_to_string( "%s_drone", child->appname ); // If we're a source-client, tell the logger now that we're a new process. char* isclient = osrfConfigGetValue( NULL, "/client" ); @@ -385,13 +385,13 @@ static int prefork_child_init_hook( prefork_child* child ) { osrfSystemIgnoreTransportClient(); // Connect to Jabber - if( !osrfSystemBootstrapClientResc( NULL, NULL, resc )) { + if( !osrf_system_bootstrap_common(NULL, "service", child->appname, 0)) { osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()" ); - free( resc ); + //free( resc ); return -1; } - free( resc ); + //free( resc ); // Dynamically call the application-specific initialization function // from a previously loaded shared library. @@ -425,7 +425,7 @@ static int prefork_child_process_request( prefork_child* child, char* data ) { if( !client_connected( client )) { osrfSystemIgnoreTransportClient(); osrfLogWarning( OSRF_LOG_MARK, "Reconnecting child to opensrf after disconnect..." ); - if( !osrf_system_bootstrap_client( NULL, NULL )) { + if( !osrf_system_bootstrap_common(NULL, "service", child->appname, 0)) { osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client in prefork_child_process_request()" ); sleep( 1 ); @@ -434,7 +434,8 @@ static int prefork_child_process_request( prefork_child* child, char* data ) { } // Construct the message from the xml. - transport_message* msg = new_message_from_xml( data ); + //transport_message* msg = new_message_from_xml( data ); + transport_message* msg = new_message_from_json( data ); // Respond to the transport message. This is where method calls are buried. osrfAppSession* session = osrf_stack_transport_handler( msg, child->appname ); @@ -706,7 +707,7 @@ static void sigchld_handler( int sig ) { */ static void sigusr1_handler( int sig ) { if (!global_forker) return; - osrf_prefork_register_routers(global_forker->appname, true); + //osrf_prefork_register_routers(global_forker->appname, true); signal( SIGUSR1, sigusr1_handler ); } @@ -718,7 +719,7 @@ static void sigusr1_handler( int sig ) { */ static void sigusr2_handler( int sig ) { if (!global_forker) return; - osrf_prefork_register_routers(global_forker->appname, false); + //osrf_prefork_register_routers(global_forker->appname, false); signal( SIGUSR2, sigusr2_handler ); } @@ -912,8 +913,10 @@ static void prefork_run( prefork_simple* forker ) { continue; } - message_prepare_xml( cur_msg ); - const char* msg_data = cur_msg->msg_xml; + //message_prepare_xml( cur_msg ); + message_prepare_json( cur_msg ); + //const char* msg_data = cur_msg->msg_xml; + const char* msg_data = cur_msg->msg_json; if( ! msg_data || ! *msg_data ) { osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %", (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread ); @@ -1001,7 +1004,7 @@ static void prefork_run( prefork_simple* forker ) { osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d", cur_child->write_data_fd ); - const char* msg_data = cur_msg->msg_xml; + const char* msg_data = cur_msg->msg_json; int written = write( cur_child->write_data_fd, msg_data, strlen( msg_data ) + 1 ); if( written < 0 ) { // This child appears to be dead or unusable. Discard it. @@ -1036,7 +1039,7 @@ static void prefork_run( prefork_simple* forker ) { osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d", new_child->write_data_fd, new_child->pid ); - const char* msg_data = cur_msg->msg_xml; + const char* msg_data = cur_msg->msg_json; int written = write( new_child->write_data_fd, msg_data, strlen( msg_data ) + 1 ); if( written < 0 ) { @@ -1457,7 +1460,7 @@ static void prefork_clear( prefork_simple* prefork, bool graceful ) { // always de-register routers before killing child processes (or waiting // for them to complete) so that new requests are directed elsewhere. - osrf_prefork_register_routers(global_forker->appname, true); + //osrf_prefork_register_routers(global_forker->appname, true); while( prefork->first_child ) { diff --git a/src/libopensrf/osrf_system.c b/src/libopensrf/osrf_system.c index 684dd0d..3c04061 100644 --- a/src/libopensrf/osrf_system.c +++ b/src/libopensrf/osrf_system.c @@ -64,7 +64,7 @@ void osrfSystemIgnoreTransportClient() { /** @brief Bootstrap a generic application from info in the configuration file. @param config_file Name of the configuration file. - @param contextnode Name of an aggregate within the configuration file, containing the + @param connection_type Name of an aggregate within the configuration file, containing the relevant subset of configuration stuff. @return 1 if successful; zero or -1 if error. @@ -74,8 +74,9 @@ void osrfSystemIgnoreTransportClient() { A thin wrapper for osrfSystemBootstrapClientResc, passing it NULL for a resource. */ -int osrf_system_bootstrap_client( char* config_file, char* contextnode ) { - return osrfSystemBootstrapClientResc(config_file, contextnode, NULL); + +int osrf_system_bootstrap_client(const char* config_file, const char* connection_type) { + return osrf_system_bootstrap_common(config_file, connection_type, "client", 0); } /** @@ -185,7 +186,7 @@ int osrf_system_service_ctrl( const char* action, const char* service) { // Load the conguration, open the log, open a connection to Jabber - if (!osrfSystemBootstrapClientResc(config, context, "c_launcher")) { + if (!osrf_system_bootstrap_common(config, context, "client", 0)) { osrfLogError(OSRF_LOG_MARK, "Unable to bootstrap for host %s from configuration file %s", hostname, config); @@ -327,7 +328,7 @@ int osrf_system_service_ctrl( /** @brief Bootstrap a generic application from info in the configuration file. @param config_file Name of the configuration file. - @param contextnode Name of an aggregate within the configuration file, containing the + @param connection_type Name of an aggregate within the configuration file, containing the relevant subset of configuration stuff. @param resource Used to construct a Jabber resource name; may be NULL. @return 1 if successful; zero or -1 if error. @@ -336,8 +337,19 @@ int osrf_system_service_ctrl( - Open the log. - Open a connection to Jabber. */ -int osrfSystemBootstrapClientResc( const char* config_file, - const char* contextnode, const char* resource ) { +int osrfSystemBootstrapClientResc(const char* config_file, + const char* connection_type, const char* appname) { + return osrf_system_bootstrap_common(config_file, connection_type, appname, 0); +} + +int osrf_system_bootstrap_common(const char* config_file, + const char* connection_type, const char* appname, int is_service) { + + if (connection_type == NULL) { + osrfLogError(OSRF_LOG_MARK, + "osrf_system_bootstrap_common() requires a connection type"); + return -1; + } int failure = 0; @@ -346,13 +358,13 @@ int osrfSystemBootstrapClientResc( const char* config_file, return 1; /* we already have a client connection */ } - if( !( config_file && contextnode ) && ! osrfConfigHasDefaultConfig() ) { + if( !( config_file && connection_type ) && ! osrfConfigHasDefaultConfig() ) { osrfLogError( OSRF_LOG_MARK, "No Config File Specified\n" ); return -1; } if( config_file ) { - osrfConfig* cfg = osrfConfigInit( config_file, contextnode ); + osrfConfig* cfg = osrfConfigInit(config_file, NULL); if(cfg) osrfConfigSetDefaultConfig(cfg); else @@ -360,61 +372,48 @@ int osrfSystemBootstrapClientResc( const char* config_file, // fetch list of configured log redaction marker strings log_protect_arr = osrfNewStringArray(8); - osrfConfig* cfg_shared = osrfConfigInit(config_file, "shared"); - osrfConfigGetValueList( cfg_shared, log_protect_arr, "/log_protect/match_string" ); + osrfConfigGetValueList(cfg, log_protect_arr, "/config/log_protect/match_string" ); } - char* log_file = osrfConfigGetValue( NULL, "/logfile"); - if(!log_file) { + char* log_file = osrfConfigGetValue(NULL, "/config/connections/%s/logfile", connection_type); + if (!log_file) { fprintf(stderr, "No log file specified in configuration file %s\n", config_file); return -1; } - char* log_level = osrfConfigGetValue( NULL, "/loglevel" ); - osrfStringArray* arr = osrfNewStringArray(8); - osrfConfigGetValueList(NULL, arr, "/domain"); - - char* username = osrfConfigGetValue( NULL, "/username" ); - char* password = osrfConfigGetValue( NULL, "/passwd" ); - char* port = osrfConfigGetValue( NULL, "/port" ); - char* unixpath = osrfConfigGetValue( NULL, "/unixpath" ); - char* facility = osrfConfigGetValue( NULL, "/syslog" ); - char* actlog = osrfConfigGetValue( NULL, "/actlog" ); - char* logtag = osrfConfigGetValue( NULL, "/logtag" ); + char* log_level = osrfConfigGetValue(NULL, "/config/connections/%s/loglevel", connection_type); + char* username = osrfConfigGetValue(NULL, "/config/connections/%s/message_bus/username", connection_type); + char* password = osrfConfigGetValue(NULL, "/config/connections/%s/message_bus/password", connection_type); + char* host = osrfConfigGetValue(NULL, "/config/connections/%s/message_bus/host", connection_type); + char* port = osrfConfigGetValue(NULL, "/config/connections/%s/message_bus/port", connection_type); + char* unixpath = osrfConfigGetValue(NULL, "/config/connections/%s/message_bus/sock", connection_type); + char* facility = osrfConfigGetValue(NULL, "/config/connections/%s/syslog", connection_type); + char* actlog = osrfConfigGetValue(NULL, "/config/connections/%s/actlog", connection_type); + char* logtag = osrfConfigGetValue(NULL, "/config/connections/%s/logtag", connection_type); /* if we're a source-client, tell the logger */ - char* isclient = osrfConfigGetValue(NULL, "/client"); + char* isclient = osrfConfigGetValue(NULL, "/config/connections/%s/client", connection_type); if( isclient && !strcasecmp(isclient,"true") ) osrfLogSetIsClient(1); free(isclient); int llevel = 0; int iport = 0; - if(port) iport = atoi(port); - if(log_level) llevel = atoi(log_level); + if (port) iport = atoi(port); + if (log_level) llevel = atoi(log_level); if(!strcmp(log_file, "syslog")) { if(logtag) osrfLogSetLogTag(logtag); - osrfLogInit( OSRF_LOG_TYPE_SYSLOG, contextnode, llevel ); + osrfLogInit( OSRF_LOG_TYPE_SYSLOG, appname, llevel ); osrfLogSetSyslogFacility(osrfLogFacilityToInt(facility)); if(actlog) osrfLogSetSyslogActFacility(osrfLogFacilityToInt(actlog)); } else { - osrfLogInit( OSRF_LOG_TYPE_FILE, contextnode, llevel ); + osrfLogInit( OSRF_LOG_TYPE_FILE, appname, llevel ); osrfLogSetFile( log_file ); } - - /* Get a domain, if one is specified */ - const char* domain = osrfStringArrayGetString( arr, 0 ); /* just the first for now */ - if(!domain) { - fprintf(stderr, "No domain specified in configuration file %s\n", config_file); - osrfLogError( OSRF_LOG_MARK, "No domain specified in configuration file %s\n", - config_file ); - failure = 1; - } - if(!username) { fprintf(stderr, "No username specified in configuration file %s\n", config_file); osrfLogError( OSRF_LOG_MARK, "No username specified in configuration file %s\n", @@ -437,7 +436,6 @@ int osrfSystemBootstrapClientResc( const char* config_file, } if (failure) { - osrfStringArrayFree(arr); free(log_file); free(log_level); free(username); @@ -450,30 +448,23 @@ int osrfSystemBootstrapClientResc( const char* config_file, return 0; } - osrfLogInfo( OSRF_LOG_MARK, "Bootstrapping system with domain %s, port %d, and unixpath %s", - domain, iport, unixpath ? unixpath : "(none)" ); - transport_client* client = client_init( domain, iport, unixpath, 0 ); - - char host[HOST_NAME_MAX + 1] = ""; - gethostname(host, sizeof(host) ); - host[HOST_NAME_MAX] = '\0'; + osrfLogInfo( OSRF_LOG_MARK, "Bootstrapping system with host %s, port %d, and unixpath %s", + host, iport, unixpath ? unixpath : "(none)" ); - char tbuf[32]; - tbuf[0] = '\0'; - snprintf(tbuf, 32, "%f", get_timestamp_millis()); + transport_client* client = client_init(host, iport, unixpath); - if(!resource) resource = ""; + if (appname == NULL) { appname = "client"; } - int len = strlen(resource) + 256; - char buf[len]; - buf[0] = '\0'; - snprintf(buf, len - 1, "%s_%s_%s_%ld", resource, host, tbuf, (long) getpid() ); - - if(client_connect( client, username, password, buf, 10, AUTH_DIGEST )) { - osrfGlobalTransportClient = client; - } + if (is_service) { + if (client_connect_as_service(client, appname, username, password)) { + osrfGlobalTransportClient = client; + } + } else { + if (client_connect(client, appname, username, password)) { + osrfGlobalTransportClient = client; + } + } - osrfStringArrayFree(arr); free(actlog); free(facility); free(log_level); diff --git a/src/libopensrf/transport_client.c b/src/libopensrf/transport_client.c index 2a86d0c..5f717ac 100644 --- a/src/libopensrf/transport_client.c +++ b/src/libopensrf/transport_client.c @@ -1,296 +1,394 @@ #include -/** - @file transport_client.c - @brief Collection of routines for sending and receiving single messages over Jabber. +static int handle_redis_error(redisReply* reply, char* command, ...); - These functions form an API built on top of the transport_session API. They serve - two main purposes: - - They remember a Jabber ID to use when sending messages. - - They maintain a queue of input messages that the calling code can get one at a time. -*/ +transport_client* client_init(const char* server, int port, const char* unix_path) { -static void client_message_handler( void* client, transport_message* msg ); + if(server == NULL) return NULL; -//int main( int argc, char** argv ); + /* build and clear the client object */ + transport_client* client = safe_malloc( sizeof( transport_client) ); -/* -int main( int argc, char** argv ) { + /* start with an empty message queue */ + client->bus = NULL; + client->stream_name = NULL; + client->consumer_name = NULL; + + client->max_queue_size = 1000; // TODO pull from config + client->port = port; + client->host = server ? strdup(server) : NULL; + client->unix_path = unix_path ? strdup(unix_path) : NULL; + client->error = 0; - transport_message* recv; - transport_message* send; + return client; +} - transport_client* client = client_init( "spacely.georgialibraries.org", 5222 ); +int client_connect_with_stream_name(transport_client* client, + const char* username, const char* password) { - // try to connect, allow 15 second connect timeout - if( client_connect( client, "admin", "asdfjkjk", "system", 15 ) ) { - printf("Connected...\n"); - } else { - printf( "NOT Connected...\n" ); exit(99); - } + osrfLogDebug(OSRF_LOG_MARK, "Transport client connecting with bus " + "stream=%s; consumer=%s; host=%s; port=%d; unix_path=%s", + client->stream_name, + client->consumer_name, + client->host, + client->port, + client->unix_path + ); - while( (recv = client_recv( client, -1 )) ) { + // TODO use redisConnectWithTimeout so we can verify connection. + if (client->host && client->port) { + client->bus = redisConnect(client->host, client->port); + } else if (client->unix_path) { + client->bus = redisConnectUnix(client->unix_path); + } - if( recv->body ) { - int len = strlen(recv->body); - char buf[len + 20]; - osrf_clearbuf( buf, 0, sizeof(buf)); - sprintf( buf, "Echoing...%s", recv->body ); - send = message_init( buf, "Echoing Stuff", "12345", recv->sender, "" ); - } else { - send = message_init( " * ECHOING * ", "Echoing Stuff", "12345", recv->sender, "" ); - } + if (client->bus == NULL) { + osrfLogError(OSRF_LOG_MARK, "Could not connect to Redis instance"); + return 0; + } - if( send == NULL ) { printf("something's wrong"); } - client_send_message( client, send ); + osrfLogDebug(OSRF_LOG_MARK, "Connected to Redis instance OK"); - message_free( send ); - message_free( recv ); - } + osrfLogDebug(OSRF_LOG_MARK, "Sending AUTH with username=%s", username); - printf( "ended recv loop\n" ); + redisReply *reply = redisCommand(client->bus, "AUTH %s %s", username, password); - return 0; + if (handle_redis_error(reply, "AUTH %s %s", username, password)) { return 0; } -} -*/ + osrfLogDebug(OSRF_LOG_MARK, "Redis AUTH succeeded"); + freeReplyObject(reply); -/** - @brief Allocate and initialize a transport_client. - @param server Domain name where the Jabber server resides. - @param port Port used for connecting to Jabber (0 if using UNIX domain socket). - @param unix_path Name of Jabber's socket in file system (if using UNIX domain socket). - @param component Boolean; true if we're a Jabber component. - @return A pointer to a newly created transport_client. - - Create a transport_client with a transport_session and an empty message queue (but don't - open a connection yet). Install a callback function in the transport_session to enqueue - incoming messages. - - The calling code is responsible for freeing the transport_client by calling client_free(). -*/ -transport_client* client_init( const char* server, int port, const char* unix_path, int component ) { + // Create our stream + consumer group. + // This will produce an error when the group already exists, which + // will happen with service-level groups. Skip error checking. + reply = redisCommand( + client->bus, + "XGROUP CREATE %s %s $ mkstream", + client->stream_name, + client->stream_name + ); - if(server == NULL) return NULL; + freeReplyObject(reply); - /* build and clear the client object */ - transport_client* client = safe_malloc( sizeof( transport_client) ); + return 1; +} - /* start with an empty message queue */ - client->msg_q_head = NULL; - client->msg_q_tail = NULL; +int client_connect_as_service(transport_client* client, + const char* appname, const char* username, const char* password) { + if (client == NULL || appname == NULL) { return 0; } + growing_buffer *buf = buffer_init(32); + buffer_fadd(buf, "service:%s", appname); - /* build the session */ - client->session = init_transport( server, port, unix_path, client, component ); + // strdup the content, leave the buf alive. + client->stream_name = buffer_data(buf); - client->session->message_callback = client_message_handler; - client->error = 0; - client->host = strdup(server); - client->xmpp_id = NULL; + // Add some random stuff to the end of the consumer name, which + // has to be unuique per client. + char junk[256]; + snprintf(junk, sizeof(junk), + "%f.%d%ld", get_timestamp_millis(), (int) time(NULL), (long) getpid()); - return client; + char* md5 = md5sum(junk); + + buffer_add(buf, ":"); + buffer_add_n(buf, md5, 12); + + client->consumer_name = buffer_release(buf); + + return client_connect_with_stream_name(client, username, password); } +int client_connect(transport_client* client, + const char* appname, const char* username, const char* password) { + if (client == NULL || appname == NULL) { return 0; } -/** - @brief Open a Jabber session for a transport_client. - @param client Pointer to the transport_client. - @param username Jabber user name. - @param password Password for the Jabber logon. - @param resource Resource name for the Jabber logon. - @param connect_timeout How many seconds to wait for the connection to open. - @param auth_type An enum: either AUTH_PLAIN or AUTH_DIGEST (see notes). - @return 1 if successful, or 0 upon error. - - Besides opening the Jabber session, create a Jabber ID for future use. - - If @a connect_timeout is -1, wait indefinitely for the Jabber server to respond. If - @a connect_timeout is zero, don't wait at all. If @a timeout is positive, wait that - number of seconds before timing out. If @a connect_timeout has a negative value other - than -1, the results are not well defined. - - The value of @a connect_timeout applies to each of two stages in the logon procedure. - Hence the logon may take up to twice the amount of time indicated. - - If we connect as a Jabber component, we send the password as an SHA1 hash. Otherwise - we look at the @a auth_type. If it's AUTH_PLAIN, we send the password as plaintext; if - it's AUTH_DIGEST, we send it as a hash. - */ -int client_connect( transport_client* client, - const char* username, const char* password, const char* resource, - int connect_timeout, enum TRANSPORT_AUTH_TYPE auth_type ) { - if( client == NULL ) - return 0; - - // Create and store a Jabber ID - if( client->xmpp_id ) - free( client->xmpp_id ); - client->xmpp_id = va_list_to_string( "%s@%s/%s", username, client->host, resource ); - - // Open a transport_session - return session_connect( client->session, username, - password, resource, connect_timeout, auth_type ); + char junk[256]; + snprintf(junk, sizeof(junk), + "%f.%d%ld", get_timestamp_millis(), (int) time(NULL), (long) getpid()); + + char* md5 = md5sum(junk); + + growing_buffer *buf = buffer_init(32); + buffer_add(buf, "client:"); + + if (strcmp("client", appname) == 0) { + // Standalone client + buffer_add_n(buf, md5, 12); + } else { + // Service client client:servicename:junk + buffer_fadd(buf, "%s:", appname); + buffer_add_n(buf, md5, 12); + } + + client->stream_name = buffer_release(buf); + client->consumer_name = strdup(client->stream_name); + + free(md5); + + return client_connect_with_stream_name(client, username, password); } -/** - @brief Disconnect from the Jabber session. - @param client Pointer to the transport_client. - @return 0 in all cases. +int client_disconnect(transport_client* client) { + if (client == NULL || client->bus == NULL) { return 0; } - If there are any messages still in the queue, they stay there; i.e. we don't free them here. -*/ -int client_disconnect( transport_client* client ) { - if( client == NULL ) { return 0; } - return session_disconnect( client->session ); + if (strncmp(client->stream_name, "client:", 7) == 0) { + // Delete our stream on disconnect if we are a client. + // No point in letting it linger. + redisReply *reply = + redisCommand(client->bus, "DEL %s", client->stream_name); + + freeReplyObject(reply); + } + + redisFree(client->bus); + client->bus = NULL; + return 1; } -/** - @brief Report whether a transport_client is connected. - @param client Pointer to the transport_client. - @return Boolean: 1 if connected, or 0 if not. -*/ int client_connected( const transport_client* client ) { - if(client == NULL) return 0; - return session_connected( client->session ); + return (client != NULL && client->bus != NULL); } -/** - @brief Send a transport message to the current destination. - @param client Pointer to a transport_client. - @param msg Pointer to the transport_message to be sent. - @return 0 if successful, or -1 if not. +int client_send_message(transport_client* client, transport_message* msg) { + if (client == NULL || client->error) { return -1; } - Translate the transport_message into XML and send it to Jabber, using the previously - stored Jabber ID for the sender. -*/ -int client_send_message( transport_client* client, transport_message* msg ) { - if( client == NULL || client->error ) - return -1; - if( msg->sender ) - free( msg->sender ); - msg->sender = strdup(client->xmpp_id); - return session_send_msg( client->session, msg ); + if (msg->sender) { free(msg->sender); } + msg->sender = strdup(client->stream_name); + + message_prepare_json(msg); + + osrfLogInternal(OSRF_LOG_MARK, + "client_send_message() to=%s %s", msg->recipient, msg->msg_json); + + redisReply *reply = redisCommand(client->bus, + "XADD %s NOMKSTREAM MAXLEN ~ %d * message %s", + msg->recipient, + client->max_queue_size, + msg->msg_json + ); + + if (handle_redis_error(reply, + "XADD %s NOMKSTREAM MAXLEN ~ %d * message %s", + msg->recipient, + client->max_queue_size, + msg->msg_json) + ) { return -1; } + + osrfLogInternal(OSRF_LOG_MARK, "client_send_message() send completed"); + + freeReplyObject(reply); + + return 0; } -/** - @brief Fetch an input message, if one is available. - @param client Pointer to a transport_client. - @param timeout How long to wait for a message to arrive, in seconds (see remarks). - @return A pointer to a transport_message if successful, or NULL if not. +// Returns the reply on success, NULL on error +// On error, the reply is freed. +static int handle_redis_error(redisReply *reply, char* command, ...) { - If there is a message already in the queue, return it immediately. Otherwise read any - available messages from the transport_session (subject to a timeout), and return the - first one. + if (reply != NULL && reply->type != REDIS_REPLY_ERROR) { + return 0; + } - If the value of @a timeout is -1, then there is no time limit -- wait indefinitely until a - message arrives (or we error out for other reasons). If the value of @a timeout is zero, - don't wait at all. + VA_LIST_TO_STRING(command); + char* err = reply == NULL ? "" : reply->str; + osrfLogError(OSRF_LOG_MARK, "REDIS Error [%s] %s", err, VA_BUF); + freeReplyObject(reply); - The calling code is responsible for freeing the transport_message by calling message_free(). -*/ -transport_message* client_recv( transport_client* client, int timeout ) { - if( client == NULL ) { return NULL; } + return 1; +} - int error = 0; /* boolean */ +/* + * Returns at most one allocated char* pulled from the bus or NULL + * if the pop times out or is interrupted. + * + * The string will be valid JSON string, a partial JSON string, or + * a message terminator chararcter. + */ +char* recv_one_chunk(transport_client* client, int timeout) { + if (client == NULL || client->bus == NULL) { return NULL; } + + redisReply *reply, *tmp; + char* json = NULL; + char* msg_id = NULL; + + if (timeout != 0) { + + if (timeout == -1) { + // Redis timeout 0 means block indefinitely + timeout = 0; + } else { + // Milliseconds + timeout *= 1000; + } + + reply = redisCommand(client->bus, + "XREADGROUP GROUP %s %s BLOCK %d COUNT 1 STREAMS %s >", + client->stream_name, + client->consumer_name, + timeout, + client->stream_name + ); + + } else { + + reply = redisCommand(client->bus, + "XREADGROUP GROUP %s %s COUNT 1 STREAMS %s >", + client->stream_name, + client->consumer_name, + client->stream_name + ); + } + + // Timeout or error + if (handle_redis_error( + reply, + "XREADGROUP GROUP %s %s %s COUNT 1 STREAMS %s >", + client->stream_name, + client->consumer_name, + "BLOCK X", + client->stream_name + )) { return NULL; } + + // Unpack the XREADGROUP response, which is a nest of arrays. + // These arrays are mostly 1 and 2-element lists, since we are + // only reading one item on a single stream. + if (reply->type == REDIS_REPLY_ARRAY && reply->elements > 0) { + tmp = reply->element[0]; + + if (tmp->type == REDIS_REPLY_ARRAY && tmp->elements > 1) { + tmp = tmp->element[1]; + + if (tmp->type == REDIS_REPLY_ARRAY && tmp->elements > 0) { + tmp = tmp->element[0]; + + if (tmp->type == REDIS_REPLY_ARRAY && tmp->elements > 1) { + redisReply *r1 = tmp->element[0]; + redisReply *r2 = tmp->element[1]; + + if (r1->type == REDIS_REPLY_STRING) { + msg_id = strdup(r1->str); + } + + if (r2->type == REDIS_REPLY_ARRAY && r2->elements > 1) { + // r2->element[0] is the message name, which we + // currently don't use for anything. + + r2 = r2->element[1]; + + if (r2->type == REDIS_REPLY_STRING) { + json = strdup(r2->str); + } + } + } + } + } + } + + freeReplyObject(reply); // XREADGROUP + + if (msg_id == NULL) { + // Read timed out. 'json' will also be NULL. + return NULL; + } + + reply = redisCommand(client->bus, "XACK %s %s %s", + client->stream_name, client->stream_name, msg_id); + + if (handle_redis_error( + reply, + "XACK %s %s %s", + client->stream_name, + client->stream_name, msg_id + )) { return NULL; } + + freeReplyObject(reply); // XACK + + free(msg_id); + + osrfLogInternal(OSRF_LOG_MARK, "recv_one_chunk() read json: %s", json); + + return json; +} - if( NULL == client->msg_q_head ) { +/// Returns at most one JSON value pulled from the bus or NULL if +/// the list pop times out or the pop is interrupted by a signal. +jsonObject* recv_one_value(transport_client* client, int timeout) { - // No message available on the queue? Try to get a fresh one. + char* json = recv_one_chunk(client, timeout); - // When we call session_wait(), it reads a socket for new messages. When it finds - // one, it enqueues it by calling the callback function client_message_handler(), - // which we installed in the transport_session when we created the transport_client. + if (json == NULL) { + // recv() timed out. + return NULL; + } - // Since a single call to session_wait() may not result in the receipt of a complete - // message. we call it repeatedly until we get either a message or an error. + jsonObject* obj = jsonParse(json); - // Alternatively, a single call to session_wait() may result in the receipt of - // multiple messages. That's why we have to enqueue them. + if (obj == NULL) { + osrfLogWarning(OSRF_LOG_MARK, "Error parsing JSON: %s", json); + } - // The timeout applies to the receipt of a complete message. For a sufficiently - // short timeout, a sufficiently long message, and a sufficiently slow connection, - // we could timeout on the first message even though we're still receiving data. - - // Likewise we could time out while still receiving the second or subsequent message, - // return the first message, and resume receiving messages later. + free(json); - if( timeout == -1 ) { /* wait potentially forever for data to arrive */ + return obj; +} - int x; - do { - if( (x = session_wait( client->session, -1 )) ) { - osrfLogDebug(OSRF_LOG_MARK, "session_wait returned failure code %d\n", x); - error = 1; - break; - } - } while( client->msg_q_head == NULL ); +/** + * Returns at most one jsonObject returned from the data bus. + * + * Keeps trying until a value is returned or the timeout is exceeded. + */ +jsonObject* recv_json_value(transport_client* client, int timeout) { - } else { /* loop up to 'timeout' seconds waiting for data to arrive */ + if (timeout == 0) { + return recv_one_value(client, 0); - /* This loop assumes that a time_t is denominated in seconds -- not */ - /* guaranteed by Standard C, but a fair bet for Linux or UNIX */ + } else if (timeout < 0) { + // Keep trying until we have a result. - time_t start = time(NULL); - time_t remaining = (time_t) timeout; + while (1) { + jsonObject* obj = recv_one_value(client, timeout); + if (obj != NULL) { return obj; } + } + } - int wait_ret; - do { - if( (wait_ret = session_wait( client->session, (int) remaining)) ) { - error = 1; - osrfLogDebug(OSRF_LOG_MARK, - "session_wait returned failure code %d: setting error=1\n", wait_ret); - break; - } + time_t seconds = (time_t) timeout; - remaining -= time(NULL) - start; - } while( NULL == client->msg_q_head && remaining > 0 ); - } - } + while (seconds > 0) { - transport_message* msg = NULL; + time_t now = time(NULL); + jsonObject* obj = recv_one_value(client, timeout); - if( !error && client->msg_q_head != NULL ) { - /* got message(s); dequeue the oldest one */ - msg = client->msg_q_head; - client->msg_q_head = msg->next; - msg->next = NULL; /* shouldn't be necessary; nullify for good hygiene */ - if( NULL == client->msg_q_head ) - client->msg_q_tail = NULL; - } + if (obj == NULL) { + seconds -= now; + } else { + return obj; + } + } - return msg; + return NULL; } -/** - @brief Enqueue a newly received transport_message. - @param client A pointer to a transport_client, cast to a void pointer. - @param msg A new transport message. +transport_message* client_recv(transport_client* client, int timeout) { + if (client == NULL || client->bus == NULL) { return NULL; } - Add a newly arrived input message to the tail of the queue. + // TODO no need for intermediate to/from JSON. Create transport + // message directly from received JSON object. + jsonObject* obj = recv_json_value(client, timeout); - This is a callback function. The transport_session parses the XML coming in through a - socket, accumulating various bits and pieces. When it sees the end of a message stanza, - it packages the bits and pieces into a transport_message that it passes to this function, - which enqueues the message for processing. -*/ -static void client_message_handler( void* client, transport_message* msg ){ + if (obj == NULL) { return NULL; } // Receive timed out. - if(client == NULL) return; - if(msg == NULL) return; + char* json = jsonObjectToJSON(obj); - transport_client* cli = (transport_client*) client; + transport_message* msg = new_message_from_json(json); - /* add the new message to the tail of the queue */ - if( NULL == cli->msg_q_head ) - cli->msg_q_tail = cli->msg_q_head = msg; - else { - cli->msg_q_tail->next = msg; - cli->msg_q_tail = msg; - } - msg->next = NULL; -} + free(json); + osrfLogInternal(OSRF_LOG_MARK, + "client_recv() read response for thread %s", msg->thread); + + return msg; +} /** @brief Free a transport_client, along with all resources it owns. @@ -298,10 +396,7 @@ static void client_message_handler( void* client, transport_message* msg ){ @return 1 if successful, or 0 if not. The only error condition is if @a client is NULL. */ int client_free( transport_client* client ) { - if(client == NULL) - return 0; - session_free( client->session ); - client->session = NULL; + if (client == NULL) { return 0; } return client_discard( client ); } @@ -315,29 +410,16 @@ int client_free( transport_client* client ) { disconnect the parent as well. */ int client_discard( transport_client* client ) { - if(client == NULL) - return 0; - - transport_message* current = client->msg_q_head; - transport_message* next; - - /* deallocate the list of messages */ - while( current != NULL ) { - next = current->next; - message_free( current ); - current = next; - } - - free(client->host); - free(client->xmpp_id); - free( client ); + + if (client == NULL) { return 0; } + + if (client->host != NULL) { free(client->host); } + if (client->unix_path != NULL) { free(client->unix_path); } + if (client->stream_name != NULL) { free(client->stream_name); } + if (client->consumer_name != NULL) { free(client->consumer_name); } + + free(client); + return 1; } -int client_sock_fd( transport_client* client ) -{ - if( !client ) - return 0; - else - return client->session->sock_id; -} diff --git a/src/libopensrf/transport_message.c b/src/libopensrf/transport_message.c index 332b622..b5d6841 100644 --- a/src/libopensrf/transport_message.c +++ b/src/libopensrf/transport_message.c @@ -1,4 +1,5 @@ #include +#include /** @file transport_message.c @@ -66,11 +67,72 @@ transport_message* message_init( const char* body, const char* subject, msg->error_code = 0; msg->broadcast = 0; msg->msg_xml = NULL; + msg->msg_json = NULL; msg->next = NULL; return msg; } +transport_message* new_message_from_json(const char* msg_json) { + + if (msg_json == NULL || *msg_json == '\0') { return NULL; } + + transport_message* new_msg = safe_malloc(sizeof(transport_message)); + + new_msg->body = NULL; + new_msg->subject = NULL; + new_msg->thread = NULL; + new_msg->recipient = NULL; + new_msg->sender = NULL; + new_msg->router_from = NULL; + new_msg->router_to = NULL; + new_msg->router_class = NULL; + new_msg->router_command = NULL; + new_msg->osrf_xid = NULL; + new_msg->is_error = 0; + new_msg->error_type = NULL; + new_msg->error_code = 0; + new_msg->broadcast = 0; + new_msg->msg_xml = NULL; + new_msg->next = NULL; + + jsonObject* json_hash = jsonParse(msg_json); + + if (json_hash == NULL || json_hash->type != JSON_HASH) { + osrfLogError(OSRF_LOG_MARK, "new_message_from_json() received bad JSON"); + jsonObjectFree(json_hash); + message_free(new_msg); + return NULL; + } + + const char* sender = jsonObjectGetString(jsonObjectGetKeyConst(json_hash, "from")); + if (sender) { new_msg->sender = strdup((const char*) sender); } + + const char* recipient = jsonObjectGetString(jsonObjectGetKeyConst(json_hash, "to")); + if (recipient) { new_msg->recipient = strdup((const char*) recipient); } + + const char* thread = jsonObjectGetString(jsonObjectGetKeyConst(json_hash, "thread")); + if (thread == NULL) { thread = ""; } + new_msg->thread = strdup((const char*) thread); + + const char* osrf_xid = jsonObjectGetString(jsonObjectGetKeyConst(json_hash, "osrf_xid")); + if (osrf_xid) { message_set_osrf_xid(new_msg, (char*) osrf_xid); } + + // TODO + // Internally the mesage body is stored as a JSON string + // On the wire, it's just part of the message. We could get + // rid if this extra json encode/decode step if we treated + // the body as a JSON object internally. + const char* body = jsonObjectToJSON(jsonObjectGetKeyConst(json_hash, "body")); + if (body == NULL) { body = ""; } + new_msg->body = strdup((const char*) body); + + jsonObjectFree(json_hash); + + return new_msg; +} + + /** @brief Translate an XML string into a transport_message. @@ -308,11 +370,36 @@ int message_free( transport_message* msg ){ free(msg->osrf_xid); if( msg->error_type != NULL ) free(msg->error_type); if( msg->msg_xml != NULL ) free(msg->msg_xml); + if( msg->msg_json != NULL ) free(msg->msg_json); free(msg); return 1; } +int message_prepare_json(transport_message* msg) { + + if (!msg) { return 0; } + if (msg->msg_json) { return 1; } /* already done */ + + jsonObject* json_hash = jsonNewObject(NULL); + jsonObjectSetKey(json_hash, "to", jsonNewObject(msg->recipient)); + jsonObjectSetKey(json_hash, "from", jsonNewObject(msg->sender)); + jsonObjectSetKey(json_hash, "thread", jsonNewObject(msg->thread)); + jsonObjectSetKey(json_hash, "osrf_xid", jsonNewObject(msg->osrf_xid)); + + // TODO the various layers expect the message body to be a separate + // JSON string, but on the bus, the body is just another key + // in the JSON object. + jsonObjectSetKey(json_hash, "body", jsonParse(msg->body)); + + msg->msg_json = jsonObjectToJSON(json_hash); + + jsonObjectFree(json_hash); + + return 1; +} + + /** @brief Build a <message> element and store it as a string in the msg_xml member. @param msg Pointer to a transport_message. diff --git a/src/perl/MANIFEST b/src/perl/MANIFEST index 2f8a129..a166518 100644 --- a/src/perl/MANIFEST +++ b/src/perl/MANIFEST @@ -28,10 +28,12 @@ lib/OpenSRF/Transport/Listener.pm lib/OpenSRF/Transport/PeerHandle.pm lib/OpenSRF/Transport/SlimJabber.pm lib/OpenSRF/Transport/SlimJabber/Client.pm -lib/OpenSRF/Transport/SlimJabber/MessageWrapper.pm lib/OpenSRF/Transport/SlimJabber/PeerConnection.pm lib/OpenSRF/Transport/SlimJabber/XMPPMessage.pm lib/OpenSRF/Transport/SlimJabber/XMPPReader.pm +lib/OpenSRF/Transport/Redis/Client.pm +lib/OpenSRF/Transport/Redis/PeerConnection.pm +lib/OpenSRF/Transport/Redis/Message.pm lib/OpenSRF/Utils.pm lib/OpenSRF/Utils/Cache.pm lib/OpenSRF/Utils/Config.pm diff --git a/src/perl/lib/OpenSRF/AppSession.pm b/src/perl/lib/OpenSRF/AppSession.pm index 603ba3c..0af61f9 100644 --- a/src/perl/lib/OpenSRF/AppSession.pm +++ b/src/perl/lib/OpenSRF/AppSession.pm @@ -6,7 +6,6 @@ use OpenSRF::Transport::PeerHandle; use OpenSRF::Utils::JSON; use OpenSRF::Utils::Logger qw(:level); use OpenSRF::Utils::SettingsClient; -use OpenSRF::Utils::Config; use OpenSRF::EX; use OpenSRF; use Exporter; @@ -208,18 +207,7 @@ sub last_sent_type { sub get_app_targets { my $app = shift; - - my $conf = OpenSRF::Utils::Config->current; - my $router_name = $conf->bootstrap->router_name || 'router'; - my $domain = $conf->bootstrap->domain; - $logger->error("use of is deprecated") if $conf->bootstrap->domains; - - unless($router_name and $domain) { - throw OpenSRF::EX::Config - ("Missing router config information 'router_name' and 'domain'"); - } - - return ("$router_name\@$domain/$app"); + return ("service:$app"); } sub stateless { @@ -578,6 +566,9 @@ sub send { } } + + # TODO Redis + # We can remove this extra layer of JSON round-tripping on the body. my $json = OpenSRF::Utils::JSON->perl2JSON(\@doc); $logger->internal("AppSession sending doc: $json"); diff --git a/src/perl/lib/OpenSRF/Application.pm b/src/perl/lib/OpenSRF/Application.pm index 2ec87ce..8682629 100644 --- a/src/perl/lib/OpenSRF/Application.pm +++ b/src/perl/lib/OpenSRF/Application.pm @@ -12,6 +12,7 @@ use Time::HiRes qw/time/; use OpenSRF::EX qw/:try/; use Carp; use OpenSRF::Utils::JSON; +use OpenSRF::Utils::Config; sub DESTROY{}; @@ -22,7 +23,6 @@ $log = 'OpenSRF::Utils::Logger'; our $in_request = 0; our @pending_requests; -our $shared_conf; sub package { my $self = shift; @@ -142,8 +142,12 @@ sub handler { my $logdata = "CALL: ".$session->service." $method_name "; my $redact_params = 0; if (@p) { - if (ref($shared_conf->shared->log_protect) eq 'ARRAY') { - foreach my $match_string (@{$shared_conf->shared->log_protect}) { + + my $conf = OpenSRF::Utils::Config->current->as_hash; + my $protect = $conf->{shared}->{log_protect}; + + if (ref($protect) eq 'ARRAY') { + foreach my $match_string (@$protect) { if ($method_name =~ /^$match_string/) { $redact_params = 1; last; diff --git a/src/perl/lib/OpenSRF/Server.pm b/src/perl/lib/OpenSRF/Server.pm index 52c53d2..f6a36a4 100644 --- a/src/perl/lib/OpenSRF/Server.pm +++ b/src/perl/lib/OpenSRF/Server.pm @@ -23,6 +23,7 @@ use OpenSRF::Utils::SettingsClient; use OpenSRF::Utils::Logger qw($logger); use OpenSRF::DomainObject::oilsResponse qw/:status/; use OpenSRF::Transport::SlimJabber::Client; +use Digest::MD5 qw(md5_hex); use Encode; use POSIX qw/:sys_wait_h :errno_h/; use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK); @@ -341,22 +342,28 @@ sub kill_child { sub build_osrf_handle { my $self = shift; - my $conf = OpenSRF::Utils::Config->current; - my $username = $conf->bootstrap->username; - my $password = $conf->bootstrap->passwd; - my $domain = $conf->bootstrap->domain; - my $port = $conf->bootstrap->port; - my $resource = $self->{service} . '_listener_' . $conf->env->hostname; + my $conf = OpenSRF::Utils::Config->current + ->as_hash->{connections}->{service}->{message_bus}; + + my $port = $conf->{port} || 6379; + my $host = $conf->{host} || '127.0.0.1'; + my $sock = $conf->{sock}; + my $username = $conf->{username}; + my $password = $conf->{password}; - $logger->debug("server: inbound connecting as $username\@$domain/$resource on port $port"); + # Every listener needs a unique consumer name. + my $consumer_name = 'service:' . + $self->{service} . ':' . substr(md5_hex($$ . time . rand($$)), 0, 12); $self->{osrf_handle} = - OpenSRF::Transport::SlimJabber::Client->new( - username => $username, - resource => $resource, - password => $password, - host => $domain, + OpenSRF::Transport::Redis::Client->new( + stream_name => "service:" . $self->{service}, + consumer_name => $consumer_name, + host => $host, port => $port, + sock => $sock, + username => $username, + password => $password ); $self->{osrf_handle}->initialize; @@ -368,11 +375,12 @@ sub build_osrf_handle { # ---------------------------------------------------------------- sub write_child { my($self, $child, $msg) = @_; - my $xml = encode_utf8(decode_utf8($msg->to_xml)); + #my $xml = encode_utf8(decode_utf8($msg->to_xml)); + my $json = $msg->to_json; # tell the child how much data to expect, minus the header my $write_size; - {use bytes; $write_size = length($xml)} + {use bytes; $write_size = length($json)} $write_size = sprintf("%*s", WRITE_PIPE_DATA_SIZE, $write_size); for (0..2) { @@ -389,12 +397,12 @@ sub write_child { # so the lack of a pid means the child is dead. if (!$child->{pid}) { $logger->error("server: child is dead in write_child(). ". - "unable to send message: $xml"); + "unable to send message: $json"); return; # avoid syswrite crash } # send message to child data pipe - syswrite($child->{pipe_to_child}, $write_size . $xml); + syswrite($child->{pipe_to_child}, $write_size . $json); last unless $self->{sig_pipe}; $logger->error("server: got SIGPIPE writing to $child, retrying..."); @@ -591,6 +599,8 @@ sub spawn_child { # Sends the register command to the configured routers # ---------------------------------------------------------------- sub register_routers { + return; # TODO Redis + my $self = shift; my $conf = OpenSRF::Utils::Config->current; @@ -636,6 +646,8 @@ sub register_routers { # with. # ---------------------------------------------------------------- sub unregister_routers { + return; # TODO Redis + my $self = shift; return unless $self->{osrf_handle}->tcp_connected; @@ -696,7 +708,7 @@ sub init { my $self = shift; my $service = $self->{parent}->{service}; $0 = "OpenSRF Drone [$service]"; - OpenSRF::Transport::PeerHandle->construct($service); + OpenSRF::Transport::PeerHandle->construct($service, 'service'); OpenSRF::Application->application_implementation->child_init if (OpenSRF::Application->application_implementation->can('child_init')); } @@ -719,15 +731,15 @@ sub run { my $orig_name = $0; $0 = "$0*"; - # Discard extraneous data from the jabber socket - if(!$network->flush_socket()) { + # Discard extraneous data from our direct message bus ID. + if (!$network->flush_socket()) { $logger->error("server: network disconnected! child dropping request and exiting: $data"); exit; } my $session = OpenSRF::Transport->handler( $self->{parent}->{service}, - OpenSRF::Transport::SlimJabber::XMPPMessage->new(xml => $data) + OpenSRF::Transport::Redis::Message->new(json => $data) ); my $recycle = $self->keepalive_loop($session); diff --git a/src/perl/lib/OpenSRF/System.pm b/src/perl/lib/OpenSRF/System.pm index ece232f..895f930 100644 --- a/src/perl/lib/OpenSRF/System.pm +++ b/src/perl/lib/OpenSRF/System.pm @@ -25,16 +25,20 @@ $| = 1; sub DESTROY {} sub load_bootstrap_config { + return if OpenSRF::Utils::Config->current; die "Please provide a bootstrap config file to OpenSRF::System\n" unless $bootstrap_config_file; OpenSRF::Utils::Config->load(config_file => $bootstrap_config_file); - OpenSRF::Utils::JSON->register_class_hint(name => "OpenSRF::Application", hint => "method", type => "hash", strip => ['session']); - OpenSRF::Transport->message_envelope("OpenSRF::Transport::SlimJabber::MessageWrapper"); - OpenSRF::Transport::PeerHandle->set_peer_client("OpenSRF::Transport::SlimJabber::PeerConnection"); + OpenSRF::Utils::JSON->register_class_hint( + name => 'OpenSRF::Application', hint => 'method', type => 'hash', strip => ['session']); + OpenSRF::Transport::PeerHandle->set_peer_client('OpenSRF::Transport::Redis::PeerConnection'); OpenSRF::Application->server_class('client'); + + return; # XXX + # Read in a shared portion of the config file # for later use in log parameter redaction $OpenSRF::Application::shared_conf = OpenSRF::Utils::Config->load( @@ -67,9 +71,11 @@ sub bootstrap_client { my $app = $params{client_name} || "client"; + my $connection_type = $params{connection_type} || 'service'; + load_bootstrap_config(); - OpenSRF::Utils::Logger::set_config(); - OpenSRF::Transport::PeerHandle->construct($app); + OpenSRF::Utils::Logger::set_config(undef, $connection_type); + OpenSRF::Transport::PeerHandle->construct($app, $connection_type); } sub connected { @@ -85,7 +91,7 @@ sub run_service { $0 = "OpenSRF Listener [$service]"; # temp connection to use for application initialization - OpenSRF::System->bootstrap_client(client_name => "system_client"); + OpenSRF::System->bootstrap_client(client_name => $service); my $sclient = OpenSRF::Utils::SettingsClient->new; my $getval = sub { $sclient->config_value(apps => $service => @_); }; diff --git a/src/perl/lib/OpenSRF/Transport.pm b/src/perl/lib/OpenSRF/Transport.pm index 5aeff4d..2bc8849 100644 --- a/src/perl/lib/OpenSRF/Transport.pm +++ b/src/perl/lib/OpenSRF/Transport.pm @@ -7,7 +7,6 @@ use OpenSRF::Utils::JSON; use OpenSRF::Utils::Logger qw(:level); use OpenSRF::DomainObject::oilsResponse qw/:status/; use OpenSRF::EX qw/:try/; -use OpenSRF::Transport::SlimJabber::MessageWrapper; #------------------ # --- These must be implemented by all Transport subclasses @@ -34,32 +33,9 @@ sub get_msg_envelope { shift()->alert_abstract(); } our $message_envelope; my $logger = "OpenSRF::Utils::Logger"; - - -=head2 message_envelope( [$envelope] ); - -Sets the message envelope class that will allow us to extract -information from the messages we receive from the low -level transport - -=cut - -sub message_envelope { - my( $class, $envelope ) = @_; - if( $envelope ) { - $message_envelope = $envelope; - $envelope->use; - if( $@ ) { - $logger->error( - "Error loading message_envelope: $envelope -> $@", ERROR); - } - } - return $message_envelope; -} - =head2 handler( $data ) -Creates a new MessageWrapper, extracts the remote_id, session_id, and message body +Creates a new Message, extracts the remote_id, session_id, and message body from the message. Then, creates or retrieves the AppSession object with the session_id and remote_id. Finally, creates the message document from the body of the message and calls the handler method on the message document. diff --git a/src/perl/lib/OpenSRF/Transport/Redis/Client.pm b/src/perl/lib/OpenSRF/Transport/Redis/Client.pm new file mode 100644 index 0000000..446fd5a --- /dev/null +++ b/src/perl/lib/OpenSRF/Transport/Redis/Client.pm @@ -0,0 +1,256 @@ +package OpenSRF::Transport::Redis::Client; +use strict; +use warnings; +use Redis; +use Time::HiRes q/time/; +use OpenSRF::Utils::JSON; +use OpenSRF::Utils::Logger qw/$logger/; +use OpenSRF::Transport::Redis::Message; + +sub new { + my ($class, %params) = @_; + my $self = bless({}, ref($class) || $class); + $self->params(\%params); + return $self; +} + +sub redis { + my ($self, $redis) = @_; + $self->{redis} = $redis if $redis; + return $self->{redis}; +} + +sub params { + my ($self, $params) = @_; + $self->{params} = $params if $params; + return $self->{params}; +} + +sub disconnect { + my $self = shift; + return unless $self->redis; + + if ($self->stream_name =~ /^client:/) { + # Delete our stream since we're the only one using it. Deleting + # the stream also deletes our consumer group. + $self->redis->del($self->stream_name); + } + + $self->redis->quit; + delete $self->{redis}; +} + +sub gather { + my $self = shift; + $self->process(0); +} + +# ------------------------------------------------- + +sub tcp_connected { + my $self = shift; + return $self->redis ? 1 : 0; +} + +sub connected { + my $self = shift; + return $self->tcp_connected; +} + +sub initialize { + my $self = shift; + + my $host = $self->params->{host} || ''; + my $port = $self->params->{port} || 0; + my $sock = $self->params->{sock} || ''; + my $username = $self->params->{username}; + my $password = $self->params->{password}; + my $stream_name = $self->params->{stream_name}; + my $consumer_name = $self->params->{consumer_name}; + my $max_queue_size = $self->params->{max_queue_size}; + + $logger->debug("Redis client connecting: ". + "host=$host port=$port sock=$sock username=$username stream_name=$stream_name"); + + return 1 if $self->redis; # already connected + + # UNIX socket file takes precedence over host:port. + my @connect_args = $sock ? (sock => $sock) : (server => "$host:$port"); + + # On disconnect, try to reconnect every 100ms up to 60 seconds. + push(@connect_args, (reconnect => 60, every => 100_000)); + + $logger->debug("Connecting to bus: @connect_args"); + + unless ($self->redis(Redis->new(@connect_args))) { + throw OpenSRF::EX::Jabber("Could not connect to Redis bus with @connect_args"); + return 0; + } + + unless ($self->redis->auth($username, $password) eq 'OK') { + throw OpenSRF::EX::Jabber("Cannot authenticate with Redis instance user=$username"); + return 0; + } + + $logger->debug("Auth'ed with Redis as $username OK : stream_name=$stream_name"); + + eval { + # This gets mad when a stream / group already exists, but + # Listeners share a stream/group name so dupes are possible. + + $self->redis->xgroup( + 'create', + $stream_name, # stream name + $stream_name, # group name + '$', # only receive new messages + 'mkstream' # create this stream if it's not there. + ); + }; + + if ($@) { + $logger->info("XGROUP CREATE returned : $@"); + } + + $self->stream_name($stream_name); + $self->consumer_name($consumer_name); + $self->max_queue_size($max_queue_size); + + return $self; +} + +sub max_queue_size { + my ($self, $max_queue_size) = @_; + $self->{max_queue_size} = $max_queue_size if $max_queue_size; + return $self->{max_queue_size}; +} + +sub stream_name { + my ($self, $stream_name) = @_; + $self->{stream_name} = $stream_name if $stream_name; + return $self->{stream_name}; +} + +sub consumer_name { + my ($self, $consumer_name) = @_; + $self->{consumer_name} = $consumer_name if $consumer_name; + return $self->{consumer_name}; +} + + +sub construct { + my ($class, $app, $context) = @_; + $class->peer_handle($class->new($app, $context)->initialize); +} + +sub send { + my $self = shift; + my $msg = OpenSRF::Transport::Redis::Message->new(@_); + + $msg->body(OpenSRF::Utils::JSON->JSON2perl($msg->body)); + + $msg->osrf_xid($logger->get_osrf_xid); + $msg->from($self->stream_name); + + my $msg_json = $msg->to_json; + + $logger->internal("send(): to=" . $msg->to . " : $msg_json"); + + $self->redis->xadd( + $msg->to, # recipient == stream name + 'NOMKSTREAM', + 'MAXLEN', + '~', # maxlen-ish + $self->max_queue_size, + '*', # let Redis generate the ID + 'message', # gotta call it something + $msg_json + ); +} + + + + +sub process { + my ($self, $timeout) = @_; + + $timeout ||= 0; + + # Redis does not support fractional timeouts. + $timeout = 1 if ($timeout > 0 && $timeout < 1); + + $timeout = int($timeout); + + unless ($self->redis) { + throw OpenSRF::EX::JabberDisconnected + ("This Redis instance is no longer connected to the server "); + } + + return $self->recv($timeout); +} + +# $timeout=0 means check for data without blocking +# $timeout=-1 means block indefinitely. +sub recv { + my ($self, $timeout) = @_; + + $logger->debug("server: watching for content at " . $self->stream_name); + + my @block; + if ($timeout) { + # 0 means block indefinitely in Redis + $timeout = 0 if $timeout == -1; + $timeout *= 1000; # milliseconds + @block = (BLOCK => $timeout); + } + + my $packet = $self->redis->xreadgroup( + GROUP => $self->stream_name, + $self->consumer_name, + @block, + COUNT => 1, + STREAMS => $self->stream_name, + '>' # new messages only + ); + + # Timed out waiting for data. + return undef unless defined $packet; + + # TODO make this more self-documenting. also, too brittle? + my $container = $packet->[0]->[1]->[0]; + my $bus_id = $container->[0]; + my $json = $container->[1]->[1]; + + $logger->internal("recv() $json"); + + # TODO putting this here for now -- it may live somewhere else. + # Ideally this could happen out of band. + # Note if we don't ACK utnil after successfully processing each + # message, a malformed message will stay in the pending list. + # Consider options. + $self->redis->xack($self->stream_name, $self->stream_name, $bus_id); + + my $msg = OpenSRF::Transport::Redis::Message->new(json => $json); + $msg->bus_id($bus_id); + + return undef unless $msg; + + $logger->internal("recv() thread=" . $msg->thread); + + # The message body is doubly encoded as JSON to, among other things, + # support message chunking. + $msg->body(OpenSRF::Utils::JSON->perl2JSON($msg->body)); + + return $msg; +} + + +sub flush_socket { + my $self = shift; + # Remove all messages from the stream + $self->redis->xtrim($self->stream_name, 'MAXLEN', 0); + return 1; +} + +1; + + diff --git a/src/perl/lib/OpenSRF/Transport/Redis/Message.pm b/src/perl/lib/OpenSRF/Transport/Redis/Message.pm new file mode 100644 index 0000000..642880f --- /dev/null +++ b/src/perl/lib/OpenSRF/Transport/Redis/Message.pm @@ -0,0 +1,103 @@ +package OpenSRF::Transport::Redis::Message; +use strict; use warnings; +use OpenSRF::Utils::Logger qw/$logger/; +use OpenSRF::Utils::JSON; +use OpenSRF::EX qw/:try/; +use strict; use warnings; + +sub new { + my ($class, %args) = @_; + my $self = bless({}, $class); + + if ($args{json}) { + $self->from_json($args{json}); + + } else { + $self->{to} = $args{to} || ''; + $self->{from} = $args{from} || ''; + $self->{thread} = $args{thread} || ''; + $self->{body} = $args{body} || ''; + $self->{osrf_xid} = $args{osrf_xid} || ''; + $self->{bus_id} = $args{bus_id} || ''; + } + + return $self; +} + +sub to { + my($self, $to) = @_; + $self->{to} = $to if defined $to; + return $self->{to}; +} +sub from { + my($self, $from) = @_; + $self->{from} = $from if defined $from; + return $self->{from}; +} +sub thread { + my($self, $thread) = @_; + $self->{thread} = $thread if defined $thread; + return $self->{thread}; +} +sub body { + my($self, $body) = @_; + $self->{body} = $body if defined $body; + return $self->{body}; +} + +sub status { + my($self, $status) = @_; + $self->{status} = $status if defined $status; + return $self->{status}; +} +sub type { + my($self, $type) = @_; + $self->{type} = $type if defined $type; + return $self->{type}; +} + +sub err_type {} +sub err_code {} + +sub osrf_xid { + my($self, $osrf_xid) = @_; + $self->{osrf_xid} = $osrf_xid if defined $osrf_xid; + return $self->{osrf_xid}; +} + +sub bus_id { + my($self, $bus_id) = @_; + $self->{bus_id} = $bus_id if defined $bus_id; + return $self->{bus_id}; +} + +sub to_json { + my $self = shift; + + # No nead to encode the bus_id in outbound messages since the ID + # won't exist yet. + return OpenSRF::Utils::JSON->perl2JSON({ + to => $self->{to}, + from => $self->{from}, + osrf_xid => $self->{osrf_xid}, + thread => $self->{thread}, + body => $self->{body} + }); +} + +sub from_json { + my $self = shift; + my $json = shift; + my $hash; + + eval { $hash = OpenSRF::Utils::JSON->JSON2perl($json); }; + + if ($@) { + $logger->error("Redis::Message received invalid JSON: $@ : $json"); + return undef; + } + + $self->{$_} = $hash->{$_} for keys %$hash; +} + +1; diff --git a/src/perl/lib/OpenSRF/Transport/Redis/PeerConnection.pm b/src/perl/lib/OpenSRF/Transport/Redis/PeerConnection.pm new file mode 100644 index 0000000..05e8aec --- /dev/null +++ b/src/perl/lib/OpenSRF/Transport/Redis/PeerConnection.pm @@ -0,0 +1,83 @@ +package OpenSRF::Transport::Redis::PeerConnection; +use strict; +use base qw/OpenSRF::Transport::Redis::Client/; +use Digest::MD5 qw(md5_hex); +use OpenSRF::Utils::Logger qw/$logger/; + +our $_singleton_connection; + +sub retrieve { + my ($class, $app) = @_; + return $_singleton_connection; +} + +sub reset { + return unless $_singleton_connection; + $_singleton_connection->disconnect; + $_singleton_connection = undef; +} + + +sub new { + my ($class, $app, $connection_type) = @_; + + my $peer_con = $class->retrieve; + return $peer_con if ($peer_con and $peer_con->tcp_connected); + + my $conf = OpenSRF::Utils::Config->current->as_hash; + + $conf = $conf->{connections} or + die "No 'connections' block in bootstrap configuration\n"; + + $conf = $conf->{$connection_type} or + die "No '$connection_type' connection in bootstrap configuration\n"; + + $conf = $conf->{message_bus}; + + my $port = $conf->{port} || 6379; + my $host = $conf->{host} || '127.0.0.1'; + my $sock = $conf->{sock}; + my $username = $conf->{username}; + my $password = $conf->{password}; + my $maxlen = $conf->{max_queue_size} || 1000; + + my $stream_name = $app eq 'client' ? 'client:' : "client:$app:"; + $stream_name .= substr(md5_hex($$ . time . rand($$)), 0, 12); + + $logger->debug("PeerConnection::new() ". + "using app=$app username=$username stream_name=$stream_name"); + + my $self = $class->SUPER::new( + host => $host, + port => $port, + sock => $sock, + username => $username, + password => $password, + stream_name => $stream_name, + consumer_name => $stream_name, + max_queue_size => $maxlen + ); + + bless($self, $class); + + $self->app($app); + + return $_singleton_connection = $self; +} + +sub process { + my $self = shift; + my $val = $self->SUPER::process(@_); + return 0 unless $val; + return OpenSRF::Transport->handler($self->app, $val); +} + +sub app { + my $self = shift; + my $app = shift; + $self->{app} = $app if $app; + return $self->{app}; +} + +1; + diff --git a/src/perl/lib/OpenSRF/Transport/SlimJabber.pm b/src/perl/lib/OpenSRF/Transport/SlimJabber.pm index a1742e8..aa460b5 100644 --- a/src/perl/lib/OpenSRF/Transport/SlimJabber.pm +++ b/src/perl/lib/OpenSRF/Transport/SlimJabber.pm @@ -11,6 +11,4 @@ classes for handling transport layer messaging sub get_peer_client { return "OpenSRF::Transport::SlimJabber::PeerConnection"; } -sub get_msg_envelope { return "OpenSRF::Transport::SlimJabber::MessageWrapper"; } - 1; diff --git a/src/perl/lib/OpenSRF/Transport/SlimJabber/MessageWrapper.pm b/src/perl/lib/OpenSRF/Transport/SlimJabber/MessageWrapper.pm deleted file mode 100644 index 0fa95c5..0000000 --- a/src/perl/lib/OpenSRF/Transport/SlimJabber/MessageWrapper.pm +++ /dev/null @@ -1,72 +0,0 @@ -package OpenSRF::Transport::SlimJabber::MessageWrapper; -use strict; use warnings; -use OpenSRF::Transport::SlimJabber::XMPPMessage; - -# ---------------------------------------------------------- -# Legacy wrapper for XMPPMessage -# ---------------------------------------------------------- - -sub new { - my $class = shift; - my $msg = shift; - return bless({msg => $msg}, ref($class) || $class); -} - -sub msg { - my($self, $msg) = @_; - $self->{msg} = $msg if $msg; - return $self->{msg}; -} - -sub toString { - return $_[0]->msg->to_xml; -} - -sub get_body { - return $_[0]->msg->body; -} - -sub get_sess_id { - return $_[0]->msg->thread; -} - -sub get_msg_type { - return $_[0]->msg->type; -} - -sub get_remote_id { - return $_[0]->msg->from; -} - -sub setType { - $_[0]->msg->type(shift()); -} - -sub setTo { - $_[0]->msg->to(shift()); -} - -sub setThread { - $_[0]->msg->thread(shift()); -} - -sub setBody { - $_[0]->msg->body(shift()); -} - -sub set_router_command { - $_[0]->msg->router_command(shift()); -} -sub set_router_class { - $_[0]->msg->router_class(shift()); -} - -sub set_osrf_xid { - $_[0]->msg->osrf_xid(shift()); -} - -sub get_osrf_xid { - return $_[0]->msg->osrf_xid; -} - -1; diff --git a/src/perl/lib/OpenSRF/Utils/Config.pm b/src/perl/lib/OpenSRF/Utils/Config.pm index 5553dfb..6f0c3a5 100644 --- a/src/perl/lib/OpenSRF/Utils/Config.pm +++ b/src/perl/lib/OpenSRF/Utils/Config.pm @@ -52,6 +52,7 @@ use XML::LibXML; use OpenSRF::Utils (':common'); use OpenSRF::Utils::Logger; use Net::Domain qw/hostfqdn/; +use XML::Simple; #use overload '""' => \&OpenSRF::Utils::Config::dump_ini; @@ -267,6 +268,8 @@ sub _load { $self->mangle_dirs(); $self->mangle_logs(); + $self->as_hash(XML::Simple->new->XMLin($self->FILE)); + $OpenSRF::Utils::ConfigCache = $self unless $self->nocache; delete $$self{nocache}; delete $$self{force}; @@ -274,6 +277,12 @@ sub _load { return $self; } +sub as_hash { + my ($self, $hash) = @_; + $self->{as_hash} = $hash if $hash; + return $self->{as_hash}; +} + sub sections { my $self = shift; my %filters = @_; diff --git a/src/perl/lib/OpenSRF/Utils/Logger.pm b/src/perl/lib/OpenSRF/Utils/Logger.pm index 6a662ac..91e0f0e 100644 --- a/src/perl/lib/OpenSRF/Utils/Logger.pm +++ b/src/perl/lib/OpenSRF/Utils/Logger.pm @@ -55,13 +55,18 @@ sub INTERNAL { return 5; } sub ALL { return 100; } my $isclient; # true if we control the osrf_xid +my $connection_type; # load up our config options sub set_config { my $force = shift; + my $con_type = shift; + $connection_type = $con_type if $con_type; return if defined $config and !$force; + die "Logger connection type needed\n" unless $connection_type; + $config = OpenSRF::Utils::Config->current; if( !defined($config) ) { $loglevel = INFO(); @@ -69,19 +74,21 @@ sub set_config { return; } - $loglevel = $config->bootstrap->loglevel; + $config = $config->as_hash->{connections}->{$connection_type}; + + $loglevel = $config->{loglevel}; - if ($config->bootstrap->loglength) { + if ($config->{loglength}) { $max_log_msg_len = $config->bootstrap->loglength; } - $service_tag = $config->bootstrap->logtag; + $service_tag = $config->{logtag}; - $logfile = $config->bootstrap->logfile; + $logfile = $config->{logfile}; if($logfile =~ /^syslog/) { $syslog_enabled = 1; $logfile_enabled = 0; - $logfile = $config->bootstrap->syslog; + $logfile = $config->{syslog}; $facility = $logfile; $logfile = undef; $facility = _fac_to_const($facility); @@ -100,7 +107,7 @@ sub set_config { # -------------------------------------------------------------- $act_syslog_enabled = 1; $act_logfile_enabled = 0; - $actfac = $config->bootstrap->actlog || $config->bootstrap->syslog; + $actfac = $config->{actlog} || $config->{syslog}; $actfac = _fac_to_const($actfac); $actfile = undef; } else { @@ -110,10 +117,10 @@ sub set_config { # -------------------------------------------------------------- $act_syslog_enabled = 0; $act_logfile_enabled = 1; - $actfile = $config->bootstrap->actlog || $config->bootstrap->logfile; + $actfile = $config->{actlog} || $config->{logfile}; } - my $client = OpenSRF::Utils::Config->current->bootstrap->client(); + my $client = $config->{client} || ''; if ($ENV{OSRF_LOG_CLIENT} or $ENV{MOD_PERL}) { $isclient = 1; @@ -124,6 +131,7 @@ sub set_config { $isclient = 0; return; } + $isclient = ($client =~ /^true$/iog) ? 1 : 0; } diff --git a/src/srfsh/srfsh.c b/src/srfsh/srfsh.c index 1887bac..02fbe88 100644 --- a/src/srfsh/srfsh.c +++ b/src/srfsh/srfsh.c @@ -111,7 +111,7 @@ int main( int argc, char* argv[] ) { snprintf(fbuf, sizeof(fbuf), "%s/.srfsh.xml", home); if(!access(fbuf, R_OK)) { - if( ! osrf_system_bootstrap_client(fbuf, "srfsh") ) { + if( ! osrf_system_bootstrap_common(fbuf, "srfsh", "srfsh", 0) ) { fprintf(stderr,"Unable to bootstrap client for requests\n"); osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for requests"); return -1; diff --git a/src/websocket-stdio/osrf-websocket-stdio.c b/src/websocket-stdio/osrf-websocket-stdio.c index 0b83782..cc4f48e 100644 --- a/src/websocket-stdio/osrf-websocket-stdio.c +++ b/src/websocket-stdio/osrf-websocket-stdio.c @@ -35,11 +35,13 @@ #include #include #include +#include #include #include #include #include #include +#include #define MAX_THREAD_SIZE 64 #define RECIP_BUF_SIZE 256 @@ -71,19 +73,20 @@ // opportunity, at which point force-close the connection. #define SHUTDOWN_MAX_GRACEFUL_SECONDS 120 -// Incremented with every REQUEST, decremented with every COMPLETE. -static int requests_in_flight = 0; - // default values, replaced during setup (below) as needed. static char* config_file = "/openils/conf/opensrf_core.xml"; static char* config_ctxt = "gateway"; -static char* osrf_router = NULL; -static char* osrf_domain = NULL; // Cache of opensrf thread strings and back-end receipients. // Tracking this here means the caller only needs to track the thread. // It also means we don't have to expose internal XMPP IDs static osrfHash* stateful_session_cache = NULL; + +// Tracks threads that have active requests in flight. +// This covers all request types regardless of connected-ness. +static osrfStringArray* active_threads = NULL; +static osrfStringArray* public_services = NULL; + // Message on STDIN go into our reusable buffer static growing_buffer* stdin_buf = NULL; // OpenSRF XMPP connection handle @@ -100,7 +103,7 @@ static void relay_stdin_message(const char*); static char* extract_inbound_messages(); static void log_request(const char*, osrfMessage*); static void read_from_osrf(); -static void read_one_osrf_message(transport_message*); +static int read_one_osrf_message(transport_message*); static int shut_it_down(int); static void release_hash_string(char*, void*); static int can_shutdown_gracefully(); @@ -133,20 +136,20 @@ int main(int argc, char* argv[]) { // (replies returning to the websocket client). fd_set fds; int stdin_no = fileno(stdin); - int osrf_no = osrf_handle->session->sock_id; - int maxfd = osrf_no > stdin_no ? osrf_no : stdin_no; + //int maxfd = osrf_no > stdin_no ? osrf_no : stdin_no; + int maxfd = stdin_no; int sel_resp; int shutdown_stat; + struct timeval tv; while (1) { FD_ZERO(&fds); - FD_SET(osrf_no, &fds); + // FD_SET(osrf_no, &fds); FD_SET(stdin_no, &fds); if (shutdown_requested) { - struct timeval tv; tv.tv_usec = 0; tv.tv_sec = SHUTDOWN_POLL_INTERVAL_SECONDS; @@ -155,9 +158,20 @@ int main(int argc, char* argv[]) { } else { - // Wait indefinitely for activity to process. - // This will be interrupted during a shutdown request signal. - sel_resp = select(maxfd + 1, &fds, NULL, NULL, NULL); + if (active_threads->size > 0) { + tv.tv_usec = 0; + tv.tv_sec = 0; + + // Do a non-blocking check for inbound requests while + // we wait for more osrf data to be returned. + sel_resp = select(maxfd + 1, &fds, NULL, NULL, &tv); + + } else { + + // No osrf responses pending. Wait indefinitely. + // This will be interrupted during a shutdown request signal. + sel_resp = select(maxfd + 1, &fds, NULL, NULL, NULL); + } } if (sel_resp < 0) { // error @@ -172,17 +186,18 @@ int main(int argc, char* argv[]) { "WS select() failed with [%s]. Exiting", strerror(errno)); shut_it_down(1); - } - if (sel_resp > 0) { + } else if (sel_resp > 0) { if (FD_ISSET(stdin_no, &fds)) { read_from_stdin(); - } - - if (FD_ISSET(osrf_no, &fds)) { read_from_osrf(); } + + } else if (active_threads->size > 0) { + // Nothing pulled from the websocket, but we still have + // active osrf request. See if any new responses have arrived. + read_from_osrf(); } if (shutdown_requested) { @@ -213,14 +228,13 @@ static int can_shutdown_gracefully() { return -1; } - unsigned long active_sessions = osrfHashGetCount(stateful_session_cache); - if (active_sessions == 0 && requests_in_flight == 0) { + if (active_threads->size == 0) { osrfLogInfo(OSRF_LOG_MARK, "Graceful shutdown cycle complete"); return 1; } osrfLogInfo(OSRF_LOG_MARK, "Graceful shutdown cycle continuing with " - "sessions=%d requests=%d", active_sessions, requests_in_flight); + "active threeds=%d", active_threads); return 0; } @@ -236,6 +250,8 @@ static void rebuild_stdin_buffer() { static int shut_it_down(int stat) { osrfHashFree(stateful_session_cache); + osrfStringArrayFree(active_threads); + osrfStringArrayFree(public_services); buffer_free(stdin_buf); osrf_system_shutdown(); // clean XMPP disconnect exit(stat); @@ -251,20 +267,22 @@ static void child_init(int argc, char* argv[]) { config_file = argv[1]; } - if (!osrf_system_bootstrap_client(config_file, config_ctxt) ) { + if (!osrf_system_bootstrap_common(config_file, config_ctxt, "websocket", 0) ) { fprintf(stderr, "Cannot boostrap OSRF\n"); shut_it_down(1); } - osrf_handle = osrfSystemGetTransportClient(); - osrfAppSessionSetIngress(WEBSOCKET_INGRESS); - - osrf_router = osrfConfigGetValue(NULL, "/router_name"); - osrf_domain = osrfConfigGetValue(NULL, "/domain"); + osrf_handle = osrfSystemGetTransportClient(); + osrfAppSessionSetIngress(WEBSOCKET_INGRESS); stateful_session_cache = osrfNewHash(); osrfHashSetCallback(stateful_session_cache, release_hash_string); + active_threads = osrfNewStringArray(16); + public_services = osrfNewStringArray(16); + + osrfConfigGetValueList(NULL, public_services, "/config/public_services/service"); + client_ip = getenv("REMOTE_ADDR"); osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip); } @@ -424,11 +442,16 @@ static void relay_stdin_message(const char* msg_string) { if (!recipient) { if (service) { - int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1, - "%s@%s/%s", osrf_router, osrf_domain, service); - recipient_buf[size] = '\0'; + size_t len = 9 + strlen(service); // service:$name + snprintf(recipient_buf, len, "service:%s", service); recipient = recipient_buf; + if (!osrfStringArrayContains(public_services, service)) { + osrfLogWarning(OSRF_LOG_MARK, + "Request for private or unknown service '%s' forbidden", service); + return; + } + } else { osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient"); return; @@ -494,11 +517,16 @@ static char* extract_inbound_messages( switch (msg->m_type) { case CONNECT: + if (!osrfStringArrayContains(active_threads, thread)) { + osrfStringArrayAdd(active_threads, thread); + } break; case REQUEST: log_request(service, msg); - requests_in_flight++; + if (!osrfStringArrayContains(active_threads, thread)) { + osrfStringArrayAdd(active_threads, thread); + } break; case DISCONNECT: @@ -568,29 +596,47 @@ static void read_from_osrf() { transport_message* tmsg = NULL; // Double check the socket connection before continuing. - if (!client_connected(osrf_handle) || - !socket_connected(osrf_handle->session->sock_id)) { + if (!client_connected(osrf_handle)) { osrfLogWarning(OSRF_LOG_MARK, "WS: Jabber socket disconnected, exiting"); shut_it_down(1); } + // Once client_recv is called all data waiting on the socket is // read. This means we can't return to the main select() loop after // each message, because any subsequent messages will get stuck in // the opensrf receive queue. Process all available messages. - while ( (tmsg = client_recv(osrf_handle, 0)) ) { + + + // As long as any active requests are in flight, wait up to one + // second to receive a response. Then return to inspect stdin + // to see if there are any requests waiting we can push through. + // Then come back here. + while (1) { + int timeout = active_threads->size > 0 ? 1 : 0; + + tmsg = client_recv(osrf_handle, timeout); + + if (!tmsg) { break; } + read_one_osrf_message(tmsg); + + osrfLogDebug(OSRF_LOG_MARK, + "WS relaying message to STDOUT thread=%s, recipient=%s", + tmsg->thread, tmsg->recipient); + message_free(tmsg); } } // Process a single OpenSRF response message and print the reponse // to STDOUT for delivery to the websocket client. -static void read_one_osrf_message(transport_message* tmsg) { +static int read_one_osrf_message(transport_message* tmsg) { osrfList *msg_list = NULL; osrfMessage *one_msg = NULL; int i; + int complete = 0; osrfLogDebug(OSRF_LOG_MARK, "WS received opensrf response for thread=%s", tmsg->thread); @@ -637,14 +683,17 @@ static void read_one_osrf_message(transport_message* tmsg) { } else { - // connection timed out; clear the cached recipient - if (one_msg->status_code == OSRF_STATUS_TIMEOUT) { + // Any error conditions ends the conversation + if (one_msg->status_code >= OSRF_STATUS_BADREQUEST) { osrfHashRemove(stateful_session_cache, tmsg->thread); + osrfStringArrayRemove(active_threads, tmsg->thread); } else { if (one_msg->status_code == OSRF_STATUS_COMPLETE) { - requests_in_flight--; + osrfLogInternal(OSRF_LOG_MARK, + "WS Marking request complete for thread %s", tmsg->thread); + osrfStringArrayRemove(active_threads, tmsg->thread); } } } @@ -682,6 +731,8 @@ static void read_one_osrf_message(transport_message* tmsg) { free(msg_string); jsonObjectFree(msg_wrapper); + + return complete; } -- 2.11.0