LPXXX Replace XMPP with Redis (C & Perl)
authorBill Erickson <berickxx@gmail.com>
Mon, 27 Dec 2021 20:58:13 +0000 (15:58 -0500)
committerBill Erickson <berickxx@gmail.com>
Tue, 21 Jun 2022 15:27:42 +0000 (11:27 -0400)
* Use Redis streams / consumer groups
* Includes modified opensrf_core.xml example file
* adds message bus management to opensrf-perl.pl
* Adds public services filter to gateway / websocket translator
* Readme for install

TODO

* Modify install makefiles
* Modify base INSTALL docs
* Trim opensrf_core.xml.example to avoid refs to Evergreen services
  and move the current one to Evergreen.
* Consider if/how/when we want to use redis NOACK / XACK.
* add logfile support back to opensrf_core.xml.exmple
* more, i'm sure

Signed-off-by: Bill Erickson <berickxx@gmail.com>
36 files changed:
.gitignore
Makefile.am
README_REDIS.md [new file with mode: 0644]
bin/opensrf-perl.pl.in
configure.ac
examples/opensrf_core.xml.example
include/opensrf/osrf_system.h
include/opensrf/transport_client.h
include/opensrf/transport_message.h
src/Makefile.am
src/extras/timer.pl [new file with mode: 0755]
src/gateway/Makefile.am
src/gateway/osrf_http_translator.c
src/gateway/osrf_json_gateway.c
src/libopensrf/Makefile.am
src/libopensrf/osrf_app_session.c
src/libopensrf/osrf_application.c
src/libopensrf/osrf_prefork.c
src/libopensrf/osrf_system.c
src/libopensrf/transport_client.c
src/libopensrf/transport_message.c
src/perl/MANIFEST
src/perl/lib/OpenSRF/AppSession.pm
src/perl/lib/OpenSRF/Application.pm
src/perl/lib/OpenSRF/Server.pm
src/perl/lib/OpenSRF/System.pm
src/perl/lib/OpenSRF/Transport.pm
src/perl/lib/OpenSRF/Transport/Redis/Client.pm [new file with mode: 0644]
src/perl/lib/OpenSRF/Transport/Redis/Message.pm [new file with mode: 0644]
src/perl/lib/OpenSRF/Transport/Redis/PeerConnection.pm [new file with mode: 0644]
src/perl/lib/OpenSRF/Transport/SlimJabber.pm
src/perl/lib/OpenSRF/Transport/SlimJabber/MessageWrapper.pm [deleted file]
src/perl/lib/OpenSRF/Utils/Config.pm
src/perl/lib/OpenSRF/Utils/Logger.pm
src/srfsh/srfsh.c
src/websocket-stdio/osrf-websocket-stdio.c

index 181b1d1..b8915ef 100644 (file)
@@ -1,3 +1,4 @@
+test-driver
 aclocal.m4
 autom4te.cache/
 bin/opensrf-perl.pl
index bf1effe..4b09ef6 100644 (file)
@@ -19,11 +19,12 @@ endif
 export PREFIX                   = @prefix@
 export TMP                      = @TMP@
 export LIBXML2_HEADERS          = @LIBXML2_HEADERS@
+export HIREDIS_HEADERS          = @HIREDIS_HEADERS@
 export APR_HEADERS              = @APR_HEADERS@
 export ETCDIR                   = @sysconfdir@
 export APXS2                    = @APXS2@
 export APACHE2_HEADERS          = @APACHE2_HEADERS@
-export DEF_CFLAGS               = -D_LARGEFILE64_SOURCE $(MAYBE_DEBUG) -pipe -g -Wall -O2 -fPIC -I@abs_top_srcdir@/include/ -I$(LIBXML2_HEADERS) -I$(APACHE2_HEADERS) -I$(APR_HEADERS) @AM_CPPFLAGS@
+export DEF_CFLAGS               = -D_LARGEFILE64_SOURCE $(MAYBE_DEBUG) -pipe -g -Wall -O2 -fPIC -I@abs_top_srcdir@/include/ -I$(LIBXML2_HEADERS) -I$(APACHE2_HEADERS) -I$(APR_HEADERS) -I$(HIREDIS_HEADERS) @AM_CPPFLAGS@
 export DEF_LDLIBS               = -lopensrf
 export VAR                      = @localstatedir@
 export PID                      = @localstatedir@/run/opensrf
diff --git a/README_REDIS.md b/README_REDIS.md
new file mode 100644 (file)
index 0000000..e59c0f5
--- /dev/null
@@ -0,0 +1,62 @@
+# OpenSRF-Over-Redis
+
+Proof of concept project to replace XMPP / Ejabberd with Redis as the
+OpenSRF message transport layer.
+
+## Install
+
+### Install Redis version 6.x for ACL support.
+
+NOTE: Redis v6 is the default version in Ubuntu 22.04
+
+```sh
+
+curl -fsSL https://packages.redis.io/gpg | sudo gpg --dearmor -o /usr/share/keyrings/redis-archive-keyring.gpg
+
+echo "deb [signed-by=/usr/share/keyrings/redis-archive-keyring.gpg] https://packages.redis.io/deb $(lsb_release -cs) main" \
+    | sudo tee /etc/apt/sources.list.d/redis.list
+
+sudo apt update
+sudo apt install redis-server libredis-perl libhiredis-dev 
+
+```
+
+### Disable Redis Snapshot Disk Persistence
+
+Optional but recommended since disk persistence adds unnecessary overhead.
+
+#### Edit /etc/redis/redis.conf and un-comment the <code># save ""</code> line:
+
+```conf
+# Snapshotting can be completely disabled with a single empty string argument
+# as in following example:
+#
+save ""
+#
+```
+
+#### Restart Redis
+
+```sh
+sudo systemctl restart redis
+```
+
+### Install OpenSRF Config and Initialize/Reset Message Bus
+
+```sh
+# mileage may vary on these commands
+
+sudo su opensrf 
+
+# Backup the original config
+mv /openils/conf/opensrf_core.xml /openils/conf/opensrf_core.xml.orig    
+
+# From the OpenSRF repository root directory.
+# Copy and modify the new config as needed.
+# TODO move this example config into Evergreen since it references EG services.
+cp examples/opensrf_core.xml.example /openils/conf/opensrf_core.xml
+
+# reset/init message bus
+osrf_control -l --reset-message-bus     
+
+```
index b2bced6..afb7aa8 100755 (executable)
@@ -25,6 +25,7 @@ use OpenSRF::Utils::SettingsClient;
 use OpenSRF::Transport::Listener;
 use OpenSRF::Utils;
 use OpenSRF::Utils::Config;
+use Redis;
 
 my $opt_service = undef;
 my $opt_config = "@CONF_DIR@/opensrf_core.xml";
@@ -62,6 +63,7 @@ my $opt_reload_all = 0;
 my $opt_quiet = 0;
 my $opt_diagnostic = 0;
 my $opt_ignore_orphans = 0;
+my $opt_reset_message_bus = 0;
 my $sclient;
 my @perl_services;
 my @nonperl_services;
@@ -104,6 +106,7 @@ GetOptions(
     'reload' => \$opt_reload,
     'reload-all' => \$opt_reload_all,
     'diagnostic' => \$opt_diagnostic,
+    'reset-message-bus' => \$opt_reset_message_bus,
     'ignore-orphans' => \$opt_ignore_orphans
 );
 
@@ -112,7 +115,7 @@ if ($opt_localhost) {
     $ENV{OSRF_HOSTNAME} = $hostname;
 }
 
-my $C_COMMAND = "opensrf-c -c $opt_config -x opensrf -p $opt_pid_dir -h $hostname";
+my $C_COMMAND = "opensrf-c -c $opt_config -x service -p $opt_pid_dir -h $hostname";
 
 sub verify_services {
     my $service = shift;
@@ -295,6 +298,7 @@ sub do_diagnostic {
 
 
 sub do_start_router {
+    return;
 
     my $pidfile = get_pid_file('router');
     `opensrf_router $opt_config routers $pidfile`;
@@ -412,7 +416,7 @@ sub do_start {
 
 sub do_start_all {
     msg("starting router and services for $hostname");
-    do_start('router');
+    #do_start('router');
     return do_start_services();
 }
 
@@ -569,7 +573,7 @@ sub do_stop_all {
 
     # graceful shutdown requires the presence of the router, so stop the 
     # router last.  See if it's running first to avoid unnecessary warnings.
-    do_stop('router', $signals[0]) if get_service_pids_from_file('router'); 
+    #do_stop('router', $signals[0]) if get_service_pids_from_file('router'); 
 
     return 1;
 }
@@ -599,7 +603,7 @@ sub do_daemon {
 # parses the local settings file
 sub load_settings {
     my $conf = OpenSRF::Utils::Config->current;
-    my $cfile = $conf->bootstrap->settings_config;
+    my $cfile = $conf->as_hash->{settings_config};
     return unless $cfile;
     my $parser = OpenSRF::Utils::SettingsParser->new();
     $parser->initialize( $cfile );
@@ -607,6 +611,80 @@ sub load_settings {
         $parser->get_server_config($conf->env->hostname);
 }
 
+# Clear the service: and client: queues of lingering messages.  Apply
+# bus access and permissions for accounts defined in the <connections/>
+# block.  This will generally be called before we connect to the system,
+# since the system may not yet have access to connect to the bus.
+sub do_reset_message_bus {
+
+    my $base_conf = 
+        OpenSRF::Utils::Config->load(config_file => $opt_config, nocache => 1)->as_hash
+        or die "No suitable configuration found at $opt_config\n";
+
+    my $connections = $base_conf->{connections} or
+        die "No connection configuration found in $opt_config\n";
+
+    my $redis;
+
+    for my $type (keys %$connections) {
+        my $conf = $connections->{$type};
+        my $bus_conf = $conf->{message_bus};
+
+        if (!$redis) {
+            # Though connection details may vary by connection, we
+            # assume all connections in a single bootstratp config
+            # ultimately point to the same Redis instance.
+            my $host = $bus_conf->{host};
+            my $port = $bus_conf->{port};
+            my $sock = $bus_conf->{sock};
+
+            # This redis connection uses the "default" account, which has
+            # access to all actions and keys so it can manage access for
+            # various opensrf account types.
+            my @connect_args = $sock ? (sock => $sock) : (server => "$host:$port");
+
+            $redis = Redis->new(@connect_args) or 
+                die "Cannot connect to Redis instance at @connect_args\n";
+
+            # Clear all the data
+            msg("Clearing all data from message bus: @connect_args");
+            $redis->flushall;
+        }
+
+        msg("Applying bus access for connection type: $type");
+
+        my $username = $bus_conf->{username};
+        my $password = $bus_conf->{password};
+
+        $redis->acl('SETUSER', $username, 'reset');
+        $redis->acl('SETUSER', $username, 'on', ">$password");
+
+        # Privileged clients can post message to all services and perform
+        # delete operations to clear their own client: enpoints from 
+        # (i.e. flush socket).
+        my $permissions = '-@all +xgroup +xadd +xreadgroup +xack +xtrim +del ~client:* ~service:*';
+        
+        if (!$conf->{privileged}) {
+            msg("Limiting bus access for non-privileged connection type=$type");
+
+            # Unprivileged connections have access toa subset of the 
+            # available (i.e. public) services.
+            $permissions = '-@all +xgroup +xadd +xreadgroup +xack +xtrim +del ~client:*';
+
+            # Non-privileged connections can only interact with public
+            # service bus endpoints.
+            for my $service (@{$base_conf->{public_services}->{service}}) {
+                $redis->acl('SETUSER', $username, "~service:$service");
+            }
+        }
+
+        $redis->acl('SETUSER', $username, split(/ /, $permissions));
+
+    }
+
+    $redis->quit;
+}
+
 sub msg {
     my $m = shift;
     print "* $m\n" unless $opt_quiet;
@@ -753,10 +831,16 @@ sub do_help {
         and gracefully re-launch drone processes.  The -all variant sends
         the signal to all services.  The non-(-all) variant requires a
         --service.
+
+    --reset-message-bus
+        Clear ALL data from the message bus, create opensrf accounts w/ 
+        permission to access message queues.
 HELP
 exit;
 }
 
+do_reset_message_bus() if $opt_reset_message_bus;
+
 # we do not verify services for stop/signal actions, since those may
 # legitimately be used against services not (or no longer) configured
 # to run on the selected host.
@@ -808,6 +892,7 @@ do_diagnostic() if $opt_diagnostic;
 
 # show help if no action was requested
 do_help() if $opt_help or not (
+    $opt_reset_message_bus or
     $opt_start or 
     $opt_start_all or 
     $opt_start_services or 
index 21963b0..590cb99 100644 (file)
@@ -208,6 +208,12 @@ AC_ARG_WITH([libxml],
 [LIBXML2_HEADERS=/usr/include/libxml2/])
 AC_SUBST([LIBXML2_HEADERS])
 
+AC_ARG_WITH([hiredis],
+[  --with-hiredis=path               location of the hiredis headers (default is /usr/include/hiredis/))],
+[HIREDIS_HEADERS=${withval}],
+[HIREDIS_HEADERS=/usr/include/hiredis/])
+AC_SUBST([HIREDIS_HEADERS])
+
 AC_ARG_WITH([includes],
 [  --with-includes=DIRECTORIES      a colon-separated list of directories that will be added to the list the compiler searches for header files (Example: --with-includes=/path/headers:/anotherpath/moreheaders)],
 [EXTRA_USER_INCLUDES=${withval}])
@@ -274,6 +280,7 @@ if test "x$OSRF_INSTALL_CORE" = "xtrue"; then
        AC_CHECK_LIB([ncurses], [initscr], [], AC_MSG_ERROR(***OpenSRF requires ncurses development headers))
        AC_CHECK_LIB([readline], [readline], [], AC_MSG_ERROR(***OpenSRF requires readline development headers))
        AC_CHECK_LIB([xml2], [xmlAddID], [], AC_MSG_ERROR(***OpenSRF requires xml2 development headers))
+    AC_CHECK_LIB([hiredis], [redisConnect], [], AC_MSG_ERROR(***OpenSRF requires libhiredis))
        # Check for libmemcached and set flags accordingly
        PKG_CHECK_MODULES(memcached, libmemcached >= 0.8.0)
        AC_SUBST(memcached_CFLAGS)
@@ -326,7 +333,6 @@ if test "x$OSRF_INSTALL_CORE" = "xtrue"; then
                         src/libopensrf/Makefile
                         src/perl/Makefile
                         src/ports/strn_compat/Makefile
-                        src/router/Makefile
                         src/srfsh/Makefile
                         src/websocket-stdio/Makefile
                         tests/Makefile
@@ -346,5 +352,6 @@ AC_MSG_RESULT([--------------------- Configuration options:  -------------------
         AC_MSG_RESULT(APR headers location:            ${APR_HEADERS})
         AC_MSG_RESULT(Apache version:                  ${APACHE_READABLE_VERSION})
         AC_MSG_RESULT(libxml2 headers location:        ${LIBXML2_HEADERS})
+        AC_MSG_RESULT(libhiredis headers location:     ${HIREDIS_HEADERS})
 
 AC_MSG_RESULT([----------------------------------------------------------------------])
index 8c99cf8..c6db0c5 100644 (file)
 <?xml version="1.0"?>
-<!-- 
-vim:et:ts=2:sw=2:
--->
 <config>
 
-  <!-- bootstrap config for OpenSRF apps -->
-  <opensrf>
-
-    <routers>
-
-      <!-- define the list of routers our services will register with -->
-
-      <router>
-
-        <!-- This is the public router.  On this router, we only register applications
-             which should be accessible to everyone on the opensrf network -->
-        <name>router</name>
-        <domain>public.localhost</domain>
-        <services>
-            <service>opensrf.math</service> 
-        </services>
-      </router>
-
-      <router>
-        <!-- This is the private router.  All applications must register with 
-            this router, so no explicit <services> section is required -->
-        <name>router</name>
-        <domain>private.localhost</domain>
-      </router>
-    </routers>
-
-
-    <!-- Jabber login settings
-        Our domain should match that of the private router -->
-    <domain>private.localhost</domain>
-    <username>opensrf</username>
-    <passwd>password</passwd>
-    <port>5222</port>
-    <!-- name of the router used on our private domain.  
-        this should match one of the <name> of the private router above -->
-    <router_name>router</router_name>
-
-    <!-- Log a warning when an outbound message reaches this size in bytes -->
-    <msg_size_warn>1800000</msg_size_warn>
-
-    <!-- log file settings ======================================  -->
-    <!-- log to a local file -->
-    <logfile>LOCALSTATEDIR/log/osrfsys.log</logfile>
-
-    <!-- Log to syslog. You can use this same layout for 
-        defining the logging of all services in this file -->
-    <!--
-    <logfile>syslog</logfile>
-    <syslog>local2</syslog>
-    <actlog>local1</actlog>
-    -->
-    <!-- Optional log tag.  You can use this to help distinguish
-         syslog entries when running multiple OpenSRF stacks on the
-         same server. -->
-    <!--
-    <logtag>instance1</logtag>
-    -->
-
-    <!-- 0 None, 1 Error, 2 Warning, 3 Info, 4 debug, 5 Internal (Nasty) -->
-    <loglevel>3</loglevel>
-
-    <!-- Maximum log message length; if using syslog, you might need to adjust
-         your syslog service's configuration to match longer message lengths -->
-    <loglength>1536</loglength>
-
-    <!-- config file for the services -->
-    <settings_config>SYSCONFDIR/opensrf.xml</settings_config>
-
-  </opensrf>
-
-  <!-- The section between <gateway>...</gateway> is a standard OpenSRF C stack config file -->
-  <gateway>
-
-    <!--
-    we consider ourselves to be the "originating" client for requests,
-    which means we define the log XID string for log traces
-    -->
-    <client>true</client>
-
-    <!--  the routers's name on the network -->
-    <router_name>router</router_name>
-
-    <!-- jabber login info -->
-    <!-- The gateway connects to the public domain -->
-    <domain>public.localhost</domain>
-    <username>opensrf</username>
-    <passwd>password</passwd>
-    <port>5222</port>
-    <logfile>LOCALSTATEDIR/log/gateway.log</logfile>
-    <loglevel>3</loglevel>
-
-    <!-- cross origin HTTP settings http://en.wikipedia.org/wiki/Cross-origin_resource_sharing -->
-    <cross_origin>
-        <!-- specify individual hosts -->
-        <!-- <origin>example.com</origin> -->
-        <!-- ...or use the * wildcard to match all -->
-        <!-- <origin>*</origin> -->
-    </cross_origin>
-
-  </gateway>
-
-  <!-- ======================================================================================== -->
-
-    <routers>
-        <router> <!-- public router -->
-            <trusted_domains>
-                <!-- allow private services to register with this router 
-                     and public clients to send requests to this router. -->
-                <server>private.localhost</server>
-                <!-- also allow private clients to send to the router so it can receive error messages -->
-                <client>private.localhost</client>
-                <client>public.localhost</client>
-            </trusted_domains>
-            <transport>
-                <server>public.localhost</server>
-                <port>5222</port>
-                <unixpath>LOCALSTATEDIR/sock/unix_sock</unixpath>
-                <username>router</username>
-                <password>password</password>
-                <resource>router</resource>
-                <connect_timeout>10</connect_timeout>
-                <max_reconnect_attempts>5</max_reconnect_attempts>
-            </transport>
-            <logfile>LOCALSTATEDIR/log/router.log</logfile>
-            <!--
-            <logfile>syslog</logfile>
-            <syslog>local2</syslog>
-            -->
-            <!--
-            <logtag>instance1</logtag>
-            -->
-            <loglevel>2</loglevel>
-        </router>
-        <router> <!-- private router -->
-            <trusted_domains>
-                <server>private.localhost</server>
-                <!-- only clients on the private domain can send requests to this router -->
-                <client>private.localhost</client>
-            </trusted_domains>
-            <transport>
-                <server>private.localhost</server>
-                <port>5222</port>
-                <username>router</username>
-                <password>password</password>
-                <resource>router</resource>
-                <connect_timeout>10</connect_timeout>
-                <max_reconnect_attempts>5</max_reconnect_attempts>
-            </transport>
-            <logfile>LOCALSTATEDIR/log/router.log</logfile>
-            <!--
-            <logfile>syslog</logfile>
-            <syslog>local2</syslog>
-            -->
-            <!--
-            <logtag>instance1</logtag>
-            -->
-            <loglevel>4</loglevel>
-        </router>
-    </routers>
-
-  <!-- ======================================================================================== -->
-
-  <shared>
-    <!-- Any methods which match any of these match_string node values will
-         have their params redacted from lower-level input logging.
-         Adjust these examples as needed. -->
-    <log_protect>
-    <!--
-      <match_string>open-ils.auth</match_string>
-      <match_string>open-ils.some_service.some_method</match_string>
-    -->
-    </log_protect>
-  </shared>
+  <connections>
+    <service>
+
+      <!-- 
+        Can this connection talk to private services?
+      -->
+      <privileged>true</privileged>
+
+      <message_bus>
+        <host>127.0.0.1</host>
+        <port>6379</port>
+        <username>opensrf@private</username>
+        <password>password</password>
+
+        <!-- 
+          Max number of un-acknowledged messages Redis will retain in the 
+          message stream before discarding the oldest.  This value needs
+          to be *something* or queues will grow unbounded,  service-level
+          request queues especially. 
+        -->
+        <max_queue_size>1000</max_queue_size>
+
+      </message_bus>
+
+      <logfile>syslog</logfile>
+      <syslog>local0</syslog>
+      <actlog>local1</actlog>
+      <loglevel>3</loglevel>
+    </service>
+
+    <gateway>
+      <message_bus>
+        <host>127.0.0.1</host>
+        <port>6379</port>
+        <username>opensrf@public</username>
+        <password>password</password>
+      </message_bus>
+      <client>true</client>
+      <logfile>syslog</logfile>
+      <syslog>local6</syslog>
+      <actlog>local1</actlog>
+      <loglevel>3</loglevel>
+    </gateway>
+  </connections>
+
+  <settings_config>/openils/conf/opensrf.xml</settings_config>
+
+  <public_services>
+    <service>opensrf.math</service>
+    <service>open-ils.actor</service>
+    <service>open-ils.acq</service>
+    <service>open-ils.auth</service>
+    <service>open-ils.auth_proxy</service>
+    <service>open-ils.booking</service>
+    <service>open-ils.cat</service>
+    <service>open-ils.circ</service>
+    <service>open-ils.collections</service>
+    <service>open-ils.courses</service>
+    <service>open-ils.curbside</service>
+    <service>open-ils.fielder</service>
+    <service>open-ils.pcrud</service>
+    <service>open-ils.permacrud</service>
+    <service>open-ils.reporter</service>
+    <service>open-ils.resolver</service>
+    <service>open-ils.search</service>
+    <service>open-ils.supercat</service>
+    <service>open-ils.url_verify</service>
+    <service>open-ils.vandelay</service>
+    <service>open-ils.serial</service>
+    <service>open-ils.ebook_api</service>
+  </public_services>
+
+  <log_protect>
+    <match_string>open-ils.auth.authenticate.verify</match_string>
+    <match_string>open-ils.auth.authenticate.complete</match_string>
+    <match_string>open-ils.auth.login</match_string>
+    <match_string>open-ils.auth_proxy.login</match_string>
+    <match_string>open-ils.actor.patron.password_reset.commit</match_string>
+    <match_string>open-ils.actor.user.password</match_string>
+    <match_string>open-ils.actor.user.username</match_string>
+    <match_string>open-ils.actor.user.email</match_string>
+    <match_string>open-ils.actor.patron.update</match_string>
+    <match_string>open-ils.cstore.direct.actor.user.create</match_string>
+    <match_string>open-ils.cstore.direct.actor.user.update</match_string>
+    <match_string>open-ils.cstore.direct.actor.user.delete</match_string>
+    <match_string>open-ils.search.z3950.apply_credentials</match_string>
+    <match_string>open-ils.geo</match_string>
+    <match_string>open-ils.actor.geo</match_string>
+  </log_protect>
 
 </config>
index 18d8c85..cab0f8c 100644 (file)
@@ -19,10 +19,16 @@ extern "C" {
 
 void osrfSystemSetPidFile( const char* name );
 
-int osrf_system_bootstrap_client( char* config_file, char* contextnode );
+int osrf_system_bootstrap_common(const char* config_file, 
+    const char* contextnode, const char* appname, int is_service);
 
-int osrfSystemBootstrapClientResc( const char* config_file,
-               const char* contextnode, const char* resource );
+int osrf_system_bootstrap_client(const char* config_file, const char* contextnode);
+
+int osrf_system_bootstrap_service(
+    const char* config_file, const char* contextnode, const char* appname);
+
+int osrfSystemBootstrapClientResc(const char* config_file, 
+    const char* contextnode, const char* appname);
 
 int osrfSystemBootstrap( const char* hostname, const char* configfile,
                const char* contextNode );
index 4307af9..2c7a8f8 100644 (file)
@@ -10,6 +10,7 @@
 */
 
 #include <time.h>
+#include <hiredis.h>
 #include <opensrf/transport_session.h>
 #include <opensrf/utils.h>
 #include <opensrf/log.h>
@@ -29,18 +30,34 @@ struct message_list_struct;
 struct transport_client_struct {
        transport_message* msg_q_head;   /**< Head of message queue */
        transport_message* msg_q_tail;   /**< Tail of message queue */
-       transport_session* session;      /**< Manages lower-level message processing */
+    redisContext* bus;
+
+    // Our communication stream.
+    // This will be unique for all connections except service-level
+    // (Listener) connections.
+    char* stream_name;
+
+    // Our unique name.
+    // Will match the unique stream_name for non-service-level connections.
+    char* consumer_name;
+
+    int max_queue_size;
+
+    int port;
+    char* unix_path;
        int error;                       /**< Boolean: true if an error has occurred */
        char* host;                      /**< Domain name or IP address of the Jabber server */
        char* xmpp_id;                   /**< Jabber ID used for outgoing messages */
 };
 typedef struct transport_client_struct transport_client;
 
-transport_client* client_init( const char* server, int port, const char* unix_path, int component );
+transport_client* client_init( const char* server, int port, const char* unix_path );
 
-int client_connect( transport_client* client, 
-               const char* username, const char* password, const char* resource,
-               int connect_timeout, enum TRANSPORT_AUTH_TYPE auth_type );
+int client_connect_with_stream_name(transport_client* client, const char* username, const char* password); 
+int client_connect_as_service(transport_client* client, 
+    const char* appname, const char* username, const char* password); 
+int client_connect(transport_client* client, 
+    const char* appname, const char* username, const char* password); 
 
 int client_disconnect( transport_client* client );
 
index fb43f92..65a8447 100644 (file)
@@ -54,6 +54,7 @@ struct transport_message_struct {
        int error_code;        /**< Value of the "code" attribute of &lt;error&gt;. */
        int broadcast;         /**< Value of the "broadcast" attribute in the message element. */
        char* msg_xml;         /**< The entire message as XML, complete with entity encoding. */
+       char* msg_json;         /**< The entire message as JSON*/
        struct transport_message_struct* next;
 };
 typedef struct transport_message_struct transport_message;
@@ -62,6 +63,7 @@ transport_message* message_init( const char* body, const char* subject,
                const char* thread, const char* recipient, const char* sender );
 
 transport_message* new_message_from_xml( const char* msg_xml );
+transport_message* new_message_from_json( const char* msg_json );
 
 void message_set_router_info( transport_message* msg, const char* router_from,
                const char* router_to, const char* router_class, const char* router_command,
@@ -70,6 +72,7 @@ void message_set_router_info( transport_message* msg, const char* router_from,
 void message_set_osrf_xid( transport_message* msg, const char* osrf_xid );
 
 int message_prepare_xml( transport_message* msg );
+int message_prepare_json( transport_message* msg );
 
 int message_free( transport_message* msg );
 
index 0f90f2c..370b6b2 100644 (file)
@@ -31,7 +31,7 @@ js_SCRIPTS = javascript/DojoSRF.js javascript/JSON_v1.js javascript/md5.js javas
 endif
 
 if BUILDCORE
-MAYBE_CORE = libopensrf c-apps router srfsh gateway perl websocket-stdio
+MAYBE_CORE = libopensrf c-apps srfsh gateway perl websocket-stdio
 dist_bin_SCRIPTS = @top_srcdir@/bin/opensrf-perl.pl
 bin_SCRIPTS = @top_srcdir@/bin/osrf_config
 dist_sysconf_DATA = @top_srcdir@/examples/opensrf.xml.example @top_srcdir@/examples/opensrf_core.xml.example @top_srcdir@/examples/srfsh.xml.example 
diff --git a/src/extras/timer.pl b/src/extras/timer.pl
new file mode 100755 (executable)
index 0000000..6dedff4
--- /dev/null
@@ -0,0 +1,273 @@
+#!/usr/bin/perl
+use strict;
+use warnings;
+use Getopt::Long;
+use OpenSRF::Utils::Logger q/$logger/;
+use OpenSRF::AppSession;
+use OpenSRF::Application;
+use Time::HiRes qw/time/;
+
+# Testing with Evergreen storage service by default.
+# I tried using opensrf.settings but for reasons I didn't investigate
+# hitting opensrf.settings with lots of requests lead to failures.
+#my $test_service = "open-ils.storage";
+#my $test_service = "opensrf.settings";
+my $test_service = "open-ils.cstore";
+
+my $iterations = 50;
+my $parallel = 1;
+
+my $small_echo_data = <<TEXT;
+    1237012938471029348170197908709870987098709870987098709809870987098709870
+    1237012938471029348170197908709870987098709870987098709809870987098709870
+TEXT
+
+my $large_echo_data = join('', <DATA>);
+my $med_echo_data = substr($large_echo_data, length($large_echo_data) / 2);
+
+my $osrf_config = '/openils/conf/opensrf_core.xml';
+my $ops = GetOptions('osrf-config=s' => \$osrf_config);
+
+sub echoloop {
+    my $data = shift;
+    my $ses = shift || OpenSRF::AppSession->create($test_service);
+    my $connected = shift || 0;
+
+    my $start = time;
+    for (0..$iterations) {
+        my $resp = $ses->request('opensrf.system.echo', "$data$_")->gather(1);
+
+        if ($resp eq "$data$_") {
+            print "+";
+        } else {
+            warn "Got bad data: $resp\n";
+        }
+    }
+
+    my $dur = time - $start;
+    my $avg = $dur / $iterations;
+
+    if ($parallel == 1) {
+        print sprintf(
+            " Connected=$connected Size=%d\tTotal Duration: %0.3f\tAvg Duration: %0.4f\n", 
+            length($data), $dur, $avg
+        );
+    }
+}
+
+sub do_the_stuff {
+
+    echoloop($small_echo_data);
+    echoloop($med_echo_data);
+    echoloop($large_echo_data);
+
+    # Connected sessions
+
+    my $ses = OpenSRF::AppSession->create($test_service);
+
+    $ses->connect;
+    echoloop($small_echo_data, $ses, 1);
+    $ses->disconnect;
+
+    $ses->connect;
+    echoloop($med_echo_data, $ses, 1);
+    $ses->disconnect;
+
+    $ses->connect;
+    echoloop($large_echo_data, $ses, 1);
+    $ses->disconnect;
+}
+
+for (1..$parallel) {
+    
+    if (fork() == 0) {
+        OpenSRF::System->bootstrap_client(config_file => $osrf_config);
+        do_the_stuff();
+        exit;
+    }
+}
+
+while (wait > 0) {}
+print "\n";
+
+__DATA__
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
+asodifuyasoidufyasoidfuyasodfiuyasdofiuaysdofiuaysdfoiasuyfaosidufyasdoifyasoi
index 666fb06..eeffc75 100644 (file)
@@ -18,7 +18,7 @@ endif
 EXTRA_DIST = @srcdir@/apachetools.c @srcdir@/apachetools.h \
        @srcdir@/osrf_json_gateway.c @srcdir@/osrf_http_translator.c
 
-AM_CFLAGS = -D_LARGEFILE64_SOURCE $(HAVE_APACHE_MIN_24) -Wall -I@abs_top_srcdir@/include/ -I$(LIBXML2_HEADERS) -I$(APACHE2_HEADERS) -I$(APR_HEADERS)
+AM_CFLAGS = -D_LARGEFILE64_SOURCE $(HAVE_APACHE_MIN_24) -Wall -I@abs_top_srcdir@/include/ -I$(HIREDIS_HEADERS) -I$(LIBXML2_HEADERS) -I$(APACHE2_HEADERS) -I$(APR_HEADERS)
 AM_LDFLAGS = -L$(LIBDIR) -L@top_builddir@/src/libopensrf
 AP_LIBEXECDIR = `$(APXS2) -q LIBEXECDIR`
 
index ec8a685..6b59307 100644 (file)
@@ -517,7 +517,7 @@ static apr_status_t childExit(void* data) {
 #endif
 
 static void childInit(apr_pool_t *p, server_rec *s) {
-       if(!osrfSystemBootstrapClientResc(configFile, configCtx, "translator")) {
+       if(!osrf_system_bootstrap_common(configFile, configCtx, "translator", 0)) {
                ap_log_error( APLOG_MARK, APLOG_ERR, 0, s, 
                        "Unable to Bootstrap OpenSRF Client with config %s..", configFile);
                return;
index 783ebc9..a65a3bd 100644 (file)
@@ -31,6 +31,7 @@ char* osrf_json_gateway_config_file = NULL;
 int bootstrapped = 0;
 int numserved = 0;
 osrfStringArray* allowedOrigins = NULL;
+static osrfStringArray* public_services = NULL;
 
 static const char* osrf_json_gateway_set_default_locale(cmd_parms *parms,
                void *config, const char *arg) {
@@ -82,12 +83,15 @@ static void osrf_json_gateway_child_init(apr_pool_t *p, server_rec *s) {
        int t = time(NULL);
        snprintf(buf, sizeof(buf), "%d", t);
 
-       if( ! osrfSystemBootstrapClientResc( cfg, CONFIG_CONTEXT, buf ) ) {
+       if( ! osrf_system_bootstrap_common( cfg, CONFIG_CONTEXT, buf, 0 ) ) {
                ap_log_error( APLOG_MARK, APLOG_ERR, 0, s,
                        "Unable to Bootstrap OpenSRF Client with config %s..", cfg);
                return;
        }
 
+    public_services = osrfNewStringArray(16);
+    osrfConfigGetValueList(NULL, public_services, "/config/public_services/service");
+
        allowedOrigins = osrfNewStringArray(4);
        osrfConfigGetValueList(NULL, allowedOrigins, "/cross_origin/origin");
 
@@ -222,7 +226,8 @@ static int osrf_json_gateway_method_handler (request_rec *r) {
        /* ----------------------------------------------------------------- */
 
 
-       if(!(service && method)) {
+       if(!(service && method) || 
+        (service && !osrfStringArrayContains(public_services, service))) {
 
                osrfLogError(OSRF_LOG_MARK,
                        "Service [%s] not found or not allowed", service);
index fd3729b..49e2ecd 100644 (file)
@@ -29,7 +29,6 @@ TARGS =               osrf_message.c \
                        osrfConfig.c \
                        osrf_application.c \
                        osrf_cache.c \
-                       osrf_transgroup.c \
                        osrf_list.c \
                        osrf_hash.c \
                        osrf_utf8.c \
index df83e3e..9befeaf 100644 (file)
@@ -541,6 +541,8 @@ osrfAppSession* osrfAppSessionClientInit( const char* remote_service ) {
                return NULL;
        }
 
+    /*
+
        // Get a list of domain names from the config settings;
        // ignore all but the first one in the list.
        osrfStringArray* arr = osrfNewStringArray(8);
@@ -579,8 +581,14 @@ osrfAppSession* osrfAppSessionClientInit( const char* remote_service ) {
                free( session );
                return NULL;
        }
+    */
+
+    growing_buffer *buf = buffer_init(32);
+    buffer_add(buf, "service:");
+    buffer_add(buf, remote_service);
 
-       session->remote_id = strdup(target_buf);
+       session->remote_id = buffer_release(buf);
+    //session->remote_id = strdup(remote_service);
        session->orig_remote_id = strdup(session->remote_id);
        session->remote_service = strdup(remote_service);
        session->session_locale = NULL;
index b8cb7c1..47dc5d4 100644 (file)
@@ -331,7 +331,7 @@ int osrfAppRegisterExtendedMethod( const char* appName, const char* methodName,
                return -1;
        }
 
-       osrfLogDebug( OSRF_LOG_MARK, "Registering method %s for app %s", methodName, appName );
+       osrfLogInternal( OSRF_LOG_MARK, "Registering method %s for app %s", methodName, appName );
 
        // Extract the only valid option bits, and ignore the rest.
        int opts = options & ( OSRF_METHOD_STREAMING | OSRF_METHOD_CACHABLE );
index 845a64e..6d6b5ca 100644 (file)
@@ -183,16 +183,16 @@ int osrf_prefork_run( const char* appname ) {
        free( max_backlog_queue );
        /* --------------------------------------------------- */
 
-       char* resc = va_list_to_string( "%s_listener", appname );
+       //char* resc = va_list_to_string( "%s_listener", appname );
 
        // Make sure that we haven't already booted
-       if( !osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
+       if( !osrf_system_bootstrap_common(NULL, "service", appname, 1)) {
                osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()" );
-               free( resc );
+               //free( resc );
                return -1;
        }
 
-       free( resc );
+       //free( resc );
 
        prefork_simple forker;
 
@@ -211,7 +211,7 @@ int osrf_prefork_run( const char* appname ) {
        prefork_launch_children( &forker );
 
        // Tell the router that you're open for business.
-       osrf_prefork_register_routers( appname, false );
+       //osrf_prefork_register_routers( appname, false );
 
        signal( SIGUSR1, sigusr1_handler);
        signal( SIGUSR2, sigusr2_handler);
@@ -372,7 +372,7 @@ static int prefork_child_init_hook( prefork_child* child ) {
 
        // Connect to cache server(s).
        osrfSystemInitCache();
-       char* resc = va_list_to_string( "%s_drone", child->appname );
+       //char* resc = va_list_to_string( "%s_drone", child->appname );
 
        // If we're a source-client, tell the logger now that we're a new process.
        char* isclient = osrfConfigGetValue( NULL, "/client" );
@@ -385,13 +385,13 @@ static int prefork_child_init_hook( prefork_child* child ) {
        osrfSystemIgnoreTransportClient();
 
        // Connect to Jabber
-       if( !osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
+       if( !osrf_system_bootstrap_common(NULL, "service", child->appname, 0)) {
                osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()" );
-               free( resc );
+               //free( resc );
                return -1;
        }
 
-       free( resc );
+       //free( resc );
 
        // Dynamically call the application-specific initialization function
        // from a previously loaded shared library.
@@ -425,7 +425,7 @@ static int prefork_child_process_request( prefork_child* child, char* data ) {
        if( !client_connected( client )) {
                osrfSystemIgnoreTransportClient();
                osrfLogWarning( OSRF_LOG_MARK, "Reconnecting child to opensrf after disconnect..." );
-               if( !osrf_system_bootstrap_client( NULL, NULL )) {
+               if( !osrf_system_bootstrap_common(NULL, "service", child->appname, 0)) {
                        osrfLogError( OSRF_LOG_MARK,
                                "Unable to bootstrap client in prefork_child_process_request()" );
                        sleep( 1 );
@@ -434,7 +434,8 @@ static int prefork_child_process_request( prefork_child* child, char* data ) {
        }
 
        // Construct the message from the xml.
-       transport_message* msg = new_message_from_xml( data );
+       //transport_message* msg = new_message_from_xml( data );
+       transport_message* msg = new_message_from_json( data );
 
        // Respond to the transport message.  This is where method calls are buried.
        osrfAppSession* session = osrf_stack_transport_handler( msg, child->appname );
@@ -706,7 +707,7 @@ static void sigchld_handler( int sig ) {
 */
 static void sigusr1_handler( int sig ) {
        if (!global_forker) return;
-       osrf_prefork_register_routers(global_forker->appname, true);
+       //osrf_prefork_register_routers(global_forker->appname, true);
        signal( SIGUSR1, sigusr1_handler );
 }
 
@@ -718,7 +719,7 @@ static void sigusr1_handler( int sig ) {
 */
 static void sigusr2_handler( int sig ) {
        if (!global_forker) return;
-       osrf_prefork_register_routers(global_forker->appname, false);
+       //osrf_prefork_register_routers(global_forker->appname, false);
        signal( SIGUSR2, sigusr2_handler );
 }
 
@@ -912,8 +913,10 @@ static void prefork_run( prefork_simple* forker ) {
                                continue;
                        }
 
-                       message_prepare_xml( cur_msg );
-                       const char* msg_data = cur_msg->msg_xml;
+                       //message_prepare_xml( cur_msg );
+                       message_prepare_json( cur_msg );
+                       //const char* msg_data = cur_msg->msg_xml;
+                       const char* msg_data = cur_msg->msg_json;
                        if( ! msg_data || ! *msg_data ) {
                                osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %",
                                        (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread );
@@ -1001,7 +1004,7 @@ static void prefork_run( prefork_simple* forker ) {
                                osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d",
                                        cur_child->write_data_fd );
 
-                               const char* msg_data = cur_msg->msg_xml;
+                               const char* msg_data = cur_msg->msg_json;
                                int written = write( cur_child->write_data_fd, msg_data, strlen( msg_data ) + 1 );
                                if( written < 0 ) {
                                        // This child appears to be dead or unusable.  Discard it.
@@ -1036,7 +1039,7 @@ static void prefork_run( prefork_simple* forker ) {
                                                osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d",
                                                        new_child->write_data_fd, new_child->pid );
 
-                                               const char* msg_data = cur_msg->msg_xml;
+                                               const char* msg_data = cur_msg->msg_json;
                                                int written = write(
                                                        new_child->write_data_fd, msg_data, strlen( msg_data ) + 1 );
                                                if( written < 0 ) {
@@ -1457,7 +1460,7 @@ static void prefork_clear( prefork_simple* prefork, bool graceful ) {
 
        // always de-register routers before killing child processes (or waiting
        // for them to complete) so that new requests are directed elsewhere.
-       osrf_prefork_register_routers(global_forker->appname, true);
+       //osrf_prefork_register_routers(global_forker->appname, true);
 
        while( prefork->first_child ) {
 
index 684dd0d..3c04061 100644 (file)
@@ -64,7 +64,7 @@ void osrfSystemIgnoreTransportClient() {
 /**
        @brief Bootstrap a generic application from info in the configuration file.
        @param config_file Name of the configuration file.
-       @param contextnode Name of an aggregate within the configuration file, containing the
+       @param connection_type Name of an aggregate within the configuration file, containing the
        relevant subset of configuration stuff.
        @return 1 if successful; zero or -1 if error.
 
@@ -74,8 +74,9 @@ void osrfSystemIgnoreTransportClient() {
 
        A thin wrapper for osrfSystemBootstrapClientResc, passing it NULL for a resource.
 */
-int osrf_system_bootstrap_client( char* config_file, char* contextnode ) {
-       return osrfSystemBootstrapClientResc(config_file, contextnode, NULL);
+
+int osrf_system_bootstrap_client(const char* config_file, const char* connection_type) {
+    return osrf_system_bootstrap_common(config_file, connection_type, "client", 0);
 }
 
 /**
@@ -185,7 +186,7 @@ int osrf_system_service_ctrl(
         const char* action, const char* service) {
     
     // Load the conguration, open the log, open a connection to Jabber
-    if (!osrfSystemBootstrapClientResc(config, context, "c_launcher")) {
+    if (!osrf_system_bootstrap_common(config, context, "client", 0)) {
         osrfLogError(OSRF_LOG_MARK,
             "Unable to bootstrap for host %s from configuration file %s",
             hostname, config);
@@ -327,7 +328,7 @@ int osrf_system_service_ctrl(
 /**
        @brief Bootstrap a generic application from info in the configuration file.
        @param config_file Name of the configuration file.
-       @param contextnode Name of an aggregate within the configuration file, containing the
+       @param connection_type Name of an aggregate within the configuration file, containing the
        relevant subset of configuration stuff.
        @param resource Used to construct a Jabber resource name; may be NULL.
        @return 1 if successful; zero or -1 if error.
@@ -336,8 +337,19 @@ int osrf_system_service_ctrl(
        - Open the log.
        - Open a connection to Jabber.
 */
-int osrfSystemBootstrapClientResc( const char* config_file,
-               const char* contextnode, const char* resource ) {
+int osrfSystemBootstrapClientResc(const char* config_file,
+    const char* connection_type, const char* appname) {
+    return osrf_system_bootstrap_common(config_file, connection_type, appname, 0);
+}
+
+int osrf_system_bootstrap_common(const char* config_file,
+               const char* connection_type, const char* appname, int is_service) {
+
+    if (connection_type == NULL) {
+        osrfLogError(OSRF_LOG_MARK,
+            "osrf_system_bootstrap_common() requires a connection type");
+        return -1;
+    }
 
        int failure = 0;
 
@@ -346,13 +358,13 @@ int osrfSystemBootstrapClientResc( const char* config_file,
                return 1; /* we already have a client connection */
        }
 
-       if( !( config_file && contextnode ) && ! osrfConfigHasDefaultConfig() ) {
+       if( !( config_file && connection_type ) && ! osrfConfigHasDefaultConfig() ) {
                osrfLogError( OSRF_LOG_MARK, "No Config File Specified\n" );
                return -1;
        }
 
        if( config_file ) {
-               osrfConfig* cfg = osrfConfigInit( config_file, contextnode );
+               osrfConfig* cfg = osrfConfigInit(config_file, NULL);
                if(cfg)
                        osrfConfigSetDefaultConfig(cfg);
                else
@@ -360,61 +372,48 @@ int osrfSystemBootstrapClientResc( const char* config_file,
 
                // fetch list of configured log redaction marker strings
                log_protect_arr = osrfNewStringArray(8);
-               osrfConfig* cfg_shared = osrfConfigInit(config_file, "shared");
-               osrfConfigGetValueList( cfg_shared, log_protect_arr, "/log_protect/match_string" );
+               osrfConfigGetValueList(cfg, log_protect_arr, "/config/log_protect/match_string" );
        }
 
-       char* log_file      = osrfConfigGetValue( NULL, "/logfile");
-       if(!log_file) {
+       char* log_file = osrfConfigGetValue(NULL, "/config/connections/%s/logfile", connection_type);
+       if (!log_file) {
                fprintf(stderr, "No log file specified in configuration file %s\n",
                                config_file);
                return -1;
        }
 
-       char* log_level      = osrfConfigGetValue( NULL, "/loglevel" );
-       osrfStringArray* arr = osrfNewStringArray(8);
-       osrfConfigGetValueList(NULL, arr, "/domain");
-
-       char* username       = osrfConfigGetValue( NULL, "/username" );
-       char* password       = osrfConfigGetValue( NULL, "/passwd" );
-       char* port           = osrfConfigGetValue( NULL, "/port" );
-       char* unixpath       = osrfConfigGetValue( NULL, "/unixpath" );
-       char* facility       = osrfConfigGetValue( NULL, "/syslog" );
-       char* actlog         = osrfConfigGetValue( NULL, "/actlog" );
-       char* logtag         = osrfConfigGetValue( NULL, "/logtag" );
+       char* log_level = osrfConfigGetValue(NULL, "/config/connections/%s/loglevel", connection_type);
+       char* username  = osrfConfigGetValue(NULL, "/config/connections/%s/message_bus/username", connection_type);
+       char* password  = osrfConfigGetValue(NULL, "/config/connections/%s/message_bus/password", connection_type);
+       char* host      = osrfConfigGetValue(NULL, "/config/connections/%s/message_bus/host", connection_type);
+       char* port      = osrfConfigGetValue(NULL, "/config/connections/%s/message_bus/port", connection_type);
+       char* unixpath  = osrfConfigGetValue(NULL, "/config/connections/%s/message_bus/sock", connection_type);
+       char* facility  = osrfConfigGetValue(NULL, "/config/connections/%s/syslog", connection_type);
+       char* actlog    = osrfConfigGetValue(NULL, "/config/connections/%s/actlog", connection_type);
+       char* logtag    = osrfConfigGetValue(NULL, "/config/connections/%s/logtag", connection_type);
 
        /* if we're a source-client, tell the logger */
-       char* isclient = osrfConfigGetValue(NULL, "/client");
+       char* isclient = osrfConfigGetValue(NULL, "/config/connections/%s/client", connection_type);
        if( isclient && !strcasecmp(isclient,"true") )
                osrfLogSetIsClient(1);
        free(isclient);
 
        int llevel = 0;
        int iport = 0;
-       if(port) iport = atoi(port);
-       if(log_level) llevel = atoi(log_level);
+       if (port) iport = atoi(port);
+       if (log_level) llevel = atoi(log_level);
 
        if(!strcmp(log_file, "syslog")) {
                if(logtag) osrfLogSetLogTag(logtag);
-               osrfLogInit( OSRF_LOG_TYPE_SYSLOG, contextnode, llevel );
+               osrfLogInit( OSRF_LOG_TYPE_SYSLOG, appname, llevel );
                osrfLogSetSyslogFacility(osrfLogFacilityToInt(facility));
                if(actlog) osrfLogSetSyslogActFacility(osrfLogFacilityToInt(actlog));
 
        } else {
-               osrfLogInit( OSRF_LOG_TYPE_FILE, contextnode, llevel );
+               osrfLogInit( OSRF_LOG_TYPE_FILE, appname, llevel );
                osrfLogSetFile( log_file );
        }
 
-
-       /* Get a domain, if one is specified */
-       const char* domain = osrfStringArrayGetString( arr, 0 ); /* just the first for now */
-       if(!domain) {
-               fprintf(stderr, "No domain specified in configuration file %s\n", config_file);
-               osrfLogError( OSRF_LOG_MARK, "No domain specified in configuration file %s\n",
-                               config_file );
-               failure = 1;
-       }
-
        if(!username) {
                fprintf(stderr, "No username specified in configuration file %s\n", config_file);
                osrfLogError( OSRF_LOG_MARK, "No username specified in configuration file %s\n",
@@ -437,7 +436,6 @@ int osrfSystemBootstrapClientResc( const char* config_file,
        }
 
        if (failure) {
-               osrfStringArrayFree(arr);
                free(log_file);
                free(log_level);
                free(username);
@@ -450,30 +448,23 @@ int osrfSystemBootstrapClientResc( const char* config_file,
                return 0;
        }
 
-       osrfLogInfo( OSRF_LOG_MARK, "Bootstrapping system with domain %s, port %d, and unixpath %s",
-               domain, iport, unixpath ? unixpath : "(none)" );
-       transport_client* client = client_init( domain, iport, unixpath, 0 );
-
-       char host[HOST_NAME_MAX + 1] = "";
-       gethostname(host, sizeof(host) );
-       host[HOST_NAME_MAX] = '\0';
+       osrfLogInfo( OSRF_LOG_MARK, "Bootstrapping system with host %s, port %d, and unixpath %s",
+               host, iport, unixpath ? unixpath : "(none)" );
 
-       char tbuf[32];
-       tbuf[0] = '\0';
-       snprintf(tbuf, 32, "%f", get_timestamp_millis());
+       transport_client* client = client_init(host, iport, unixpath);
 
-       if(!resource) resource = "";
+    if (appname == NULL) { appname = "client"; }
 
-       int len = strlen(resource) + 256;
-       char buf[len];
-       buf[0] = '\0';
-       snprintf(buf, len - 1, "%s_%s_%s_%ld", resource, host, tbuf, (long) getpid() );
-
-       if(client_connect( client, username, password, buf, 10, AUTH_DIGEST )) {
-               osrfGlobalTransportClient = client;
-       }
+    if (is_service) {
+           if (client_connect_as_service(client, appname, username, password)) {
+                   osrfGlobalTransportClient = client;
+           }
+    } else {
+           if (client_connect(client, appname, username, password)) {
+                   osrfGlobalTransportClient = client;
+           }
+    }
 
-       osrfStringArrayFree(arr);
        free(actlog);
        free(facility);
        free(log_level);
index 2a86d0c..5f717ac 100644 (file)
 #include <opensrf/transport_client.h>
 
-/**
-       @file transport_client.c
-       @brief Collection of routines for sending and receiving single messages over Jabber.
+static int handle_redis_error(redisReply* reply, char* command, ...);
 
-       These functions form an API built on top of the transport_session API.  They serve
-       two main purposes:
-       - They remember a Jabber ID to use when sending messages.
-       - They maintain a queue of input messages that the calling code can get one at a time.
-*/
+transport_client* client_init(const char* server, int port, const char* unix_path) {
 
-static void client_message_handler( void* client, transport_message* msg );
+       if(server == NULL) return NULL;
 
-//int main( int argc, char** argv );
+       /* build and clear the client object */
+       transport_client* client = safe_malloc( sizeof( transport_client) );
 
-/*
-int main( int argc, char** argv ) {
+       /* start with an empty message queue */
+       client->bus = NULL;
+       client->stream_name = NULL;
+       client->consumer_name = NULL;
+
+    client->max_queue_size = 1000; // TODO pull from config
+    client->port = port;
+    client->host = server ? strdup(server) : NULL;
+    client->unix_path = unix_path ? strdup(unix_path) : NULL;
+       client->error = 0;
 
-       transport_message* recv;
-       transport_message* send;
+       return client;
+}
 
-       transport_client* client = client_init( "spacely.georgialibraries.org", 5222 );
+int client_connect_with_stream_name(transport_client* client, 
+       const char* username, const char* password) {
 
-       // try to connect, allow 15 second connect timeout
-       if( client_connect( client, "admin", "asdfjkjk", "system", 15 ) ) {
-               printf("Connected...\n");
-       } else {
-               printf( "NOT Connected...\n" ); exit(99);
-       }
+    osrfLogDebug(OSRF_LOG_MARK, "Transport client connecting with bus "
+        "stream=%s; consumer=%s; host=%s; port=%d; unix_path=%s", 
+        client->stream_name, 
+        client->consumer_name,
+        client->host, 
+        client->port, 
+        client->unix_path
+    );
 
-       while( (recv = client_recv( client, -1 )) ) {
+    // TODO use redisConnectWithTimeout so we can verify connection.
+    if (client->host && client->port) {
+        client->bus = redisConnect(client->host, client->port);
+    } else if (client->unix_path) {
+        client->bus = redisConnectUnix(client->unix_path);
+    }
 
-               if( recv->body ) {
-                       int len = strlen(recv->body);
-                       char buf[len + 20];
-                       osrf_clearbuf( buf, 0, sizeof(buf));
-                       sprintf( buf, "Echoing...%s", recv->body );
-                       send = message_init( buf, "Echoing Stuff", "12345", recv->sender, "" );
-               } else {
-                       send = message_init( " * ECHOING * ", "Echoing Stuff", "12345", recv->sender, "" );
-               }
+    if (client->bus == NULL) {
+        osrfLogError(OSRF_LOG_MARK, "Could not connect to Redis instance");
+        return 0;
+    }
 
-               if( send == NULL ) { printf("something's wrong"); }
-               client_send_message( client, send );
+    osrfLogDebug(OSRF_LOG_MARK, "Connected to Redis instance OK");
 
-               message_free( send );
-               message_free( recv );
-       }
+    osrfLogDebug(OSRF_LOG_MARK, "Sending AUTH with username=%s", username);
 
-       printf( "ended recv loop\n" );
+    redisReply *reply = redisCommand(client->bus, "AUTH %s %s", username, password);
 
-       return 0;
+    if (handle_redis_error(reply, "AUTH %s %s", username, password)) { return 0; }
 
-}
-*/
+    osrfLogDebug(OSRF_LOG_MARK, "Redis AUTH succeeded");
 
+    freeReplyObject(reply);
 
-/**
-       @brief Allocate and initialize a transport_client.
-       @param server Domain name where the Jabber server resides.
-       @param port Port used for connecting to Jabber (0 if using UNIX domain socket).
-       @param unix_path Name of Jabber's socket in file system (if using UNIX domain socket).
-       @param component Boolean; true if we're a Jabber component.
-       @return A pointer to a newly created transport_client.
-
-       Create a transport_client with a transport_session and an empty message queue (but don't
-       open a connection yet).  Install a callback function in the transport_session to enqueue
-       incoming messages.
-
-       The calling code is responsible for freeing the transport_client by calling client_free().
-*/
-transport_client* client_init( const char* server, int port, const char* unix_path, int component ) {
+    // Create our stream + consumer group.
+    // This will produce an error when the group already exists, which
+    // will happen with service-level groups.  Skip error checking.
+    reply = redisCommand(
+        client->bus, 
+        "XGROUP CREATE %s %s $ mkstream", 
+        client->stream_name, 
+        client->stream_name
+    );
 
-       if(server == NULL) return NULL;
+    freeReplyObject(reply);
 
-       /* build and clear the client object */
-       transport_client* client = safe_malloc( sizeof( transport_client) );
+    return 1;
+}
 
-       /* start with an empty message queue */
-       client->msg_q_head = NULL;
-       client->msg_q_tail = NULL;
+int client_connect_as_service(transport_client* client,
+       const char* appname, const char* username, const char* password) {
+       if (client == NULL || appname == NULL) { return 0; }
+    growing_buffer *buf = buffer_init(32);
+    buffer_fadd(buf, "service:%s", appname);
 
-       /* build the session */
-       client->session = init_transport( server, port, unix_path, client, component );
+    // strdup the content, leave the buf alive.
+    client->stream_name = buffer_data(buf);
 
-       client->session->message_callback = client_message_handler;
-       client->error = 0;
-       client->host = strdup(server);
-       client->xmpp_id = NULL;
+    // Add some random stuff to the end of the consumer name, which
+    // has to be unuique per client.
+    char junk[256];
+       snprintf(junk, sizeof(junk), 
+        "%f.%d%ld", get_timestamp_millis(), (int) time(NULL), (long) getpid());
 
-       return client;
+    char* md5 = md5sum(junk);
+
+    buffer_add(buf, ":");
+    buffer_add_n(buf, md5, 12);
+
+    client->consumer_name = buffer_release(buf);
+
+    return client_connect_with_stream_name(client, username, password);
 }
 
+int client_connect(transport_client* client,
+       const char* appname, const char* username, const char* password) {
+       if (client == NULL || appname == NULL) { return 0; }
 
-/**
-       @brief Open a Jabber session for a transport_client.
-       @param client Pointer to the transport_client.
-       @param username Jabber user name.
-       @param password Password for the Jabber logon.
-       @param resource Resource name for the Jabber logon.
-       @param connect_timeout How many seconds to wait for the connection to open.
-       @param auth_type An enum: either AUTH_PLAIN or AUTH_DIGEST (see notes).
-       @return 1 if successful, or 0 upon error.
-
-       Besides opening the Jabber session, create a Jabber ID for future use.
-
-       If @a connect_timeout is -1, wait indefinitely for the Jabber server to respond.  If
-       @a connect_timeout is zero, don't wait at all.  If @a timeout is positive, wait that
-       number of seconds before timing out.  If @a connect_timeout has a negative value other
-       than -1, the results are not well defined.
-
-       The value of @a connect_timeout applies to each of two stages in the logon procedure.
-       Hence the logon may take up to twice the amount of time indicated.
-
-       If we connect as a Jabber component, we send the password as an SHA1 hash.  Otherwise
-       we look at the @a auth_type.  If it's AUTH_PLAIN, we send the password as plaintext; if
-       it's AUTH_DIGEST, we send it as a hash.
- */
-int client_connect( transport_client* client,
-               const char* username, const char* password, const char* resource,
-               int connect_timeout, enum TRANSPORT_AUTH_TYPE  auth_type ) {
-       if( client == NULL )
-               return 0;
-
-       // Create and store a Jabber ID
-       if( client->xmpp_id )
-               free( client->xmpp_id );
-       client->xmpp_id = va_list_to_string( "%s@%s/%s", username, client->host, resource );
-
-       // Open a transport_session
-       return session_connect( client->session, username,
-                       password, resource, connect_timeout, auth_type );
+    char junk[256];
+       snprintf(junk, sizeof(junk), 
+        "%f.%d%ld", get_timestamp_millis(), (int) time(NULL), (long) getpid());
+
+    char* md5 = md5sum(junk);
+
+    growing_buffer *buf = buffer_init(32);
+    buffer_add(buf, "client:");
+
+    if (strcmp("client", appname) == 0) {
+        // Standalone client
+        buffer_add_n(buf, md5, 12);
+    } else {
+        // Service client client:servicename:junk
+        buffer_fadd(buf, "%s:", appname);
+        buffer_add_n(buf, md5, 12);
+    }
+
+       client->stream_name = buffer_release(buf);
+       client->consumer_name = strdup(client->stream_name);
+
+    free(md5);
+
+    return client_connect_with_stream_name(client, username, password);
 }
 
-/**
-       @brief Disconnect from the Jabber session.
-       @param client Pointer to the transport_client.
-       @return 0 in all cases.
+int client_disconnect(transport_client* client) {
+       if (client == NULL || client->bus == NULL) { return 0; }
 
-       If there are any messages still in the queue, they stay there; i.e. we don't free them here.
-*/
-int client_disconnect( transport_client* client ) {
-       if( client == NULL ) { return 0; }
-       return session_disconnect( client->session );
+    if (strncmp(client->stream_name, "client:", 7) == 0) {
+        // Delete our stream on disconnect if we are a client.
+        // No point in letting it linger.
+        redisReply *reply = 
+            redisCommand(client->bus, "DEL %s", client->stream_name);
+
+        freeReplyObject(reply);
+    }
+
+    redisFree(client->bus);
+    client->bus = NULL;
+    return 1;
 }
 
-/**
-       @brief Report whether a transport_client is connected.
-       @param client Pointer to the transport_client.
-       @return Boolean: 1 if connected, or 0 if not.
-*/
 int client_connected( const transport_client* client ) {
-       if(client == NULL) return 0;
-       return session_connected( client->session );
+       return (client != NULL && client->bus != NULL);
 }
 
-/**
-       @brief Send a transport message to the current destination.
-       @param client Pointer to a transport_client.
-       @param msg Pointer to the transport_message to be sent.
-       @return 0 if successful, or -1 if not.
+int client_send_message(transport_client* client, transport_message* msg) {
+       if (client == NULL || client->error) { return -1; }
 
-       Translate the transport_message into XML and send it to Jabber, using the previously
-       stored Jabber ID for the sender.
-*/
-int client_send_message( transport_client* client, transport_message* msg ) {
-       if( client == NULL || client->error )
-               return -1;
-       if( msg->sender )
-               free( msg->sender );
-       msg->sender = strdup(client->xmpp_id);
-       return session_send_msg( client->session, msg );
+       if (msg->sender) { free(msg->sender); }
+       msg->sender = strdup(client->stream_name);
+
+    message_prepare_json(msg);
+
+    osrfLogInternal(OSRF_LOG_MARK, 
+        "client_send_message() to=%s %s", msg->recipient, msg->msg_json);
+
+    redisReply *reply = redisCommand(client->bus,
+        "XADD %s NOMKSTREAM MAXLEN ~ %d * message %s",
+        msg->recipient, 
+        client->max_queue_size,
+        msg->msg_json
+    );
+
+    if (handle_redis_error(reply, 
+        "XADD %s NOMKSTREAM MAXLEN ~ %d * message %s",
+        msg->recipient, 
+        client->max_queue_size,
+        msg->msg_json)
+    ) { return -1; }
+
+    osrfLogInternal(OSRF_LOG_MARK, "client_send_message() send completed");
+
+    freeReplyObject(reply);
+    
+    return 0;
 }
 
-/**
-       @brief Fetch an input message, if one is available.
-       @param client Pointer to a transport_client.
-       @param timeout How long to wait for a message to arrive, in seconds (see remarks).
-       @return A pointer to a transport_message if successful, or NULL if not.
+// Returns the reply on success, NULL on error
+// On error, the reply is freed.
+static int handle_redis_error(redisReply *reply, char* command, ...) {
 
-       If there is a message already in the queue, return it immediately.  Otherwise read any
-       available messages from the transport_session (subject to a timeout), and return the
-       first one.
+    if (reply != NULL && reply->type != REDIS_REPLY_ERROR) {
+        return 0;
+    }
 
-       If the value of @a timeout is -1, then there is no time limit -- wait indefinitely until a
-       message arrives (or we error out for other reasons).  If the value of @a timeout is zero,
-       don't wait at all.
+    VA_LIST_TO_STRING(command);
+    char* err = reply == NULL ? "" : reply->str;
+    osrfLogError(OSRF_LOG_MARK, "REDIS Error [%s] %s", err, VA_BUF);
+    freeReplyObject(reply);
 
-       The calling code is responsible for freeing the transport_message by calling message_free().
-*/
-transport_message* client_recv( transport_client* client, int timeout ) {
-       if( client == NULL ) { return NULL; }
+    return 1;
+}
 
-       int error = 0;  /* boolean */
+/*
+ * Returns at most one allocated char* pulled from the bus or NULL 
+ * if the pop times out or is interrupted.
+ *
+ * The string will be valid JSON string, a partial JSON string, or
+ * a message terminator chararcter.
+ */
+char* recv_one_chunk(transport_client* client, int timeout) {
+       if (client == NULL || client->bus == NULL) { return NULL; }
+
+    redisReply *reply, *tmp;
+    char* json = NULL;
+    char* msg_id = NULL;
+
+    if (timeout != 0) {
+
+        if (timeout == -1) {
+            // Redis timeout 0 means block indefinitely
+            timeout = 0;
+        } else {
+            // Milliseconds
+            timeout *= 1000;
+        }
+
+        reply = redisCommand(client->bus, 
+            "XREADGROUP GROUP %s %s BLOCK %d COUNT 1 STREAMS %s >",
+            client->stream_name,
+            client->consumer_name,
+            timeout,
+            client->stream_name
+        );
+
+    } else {
+
+        reply = redisCommand(client->bus, 
+            "XREADGROUP GROUP %s %s COUNT 1 STREAMS %s >",
+            client->stream_name,
+            client->consumer_name,
+            client->stream_name
+        );
+    }
+
+    // Timeout or error
+    if (handle_redis_error(
+        reply,
+        "XREADGROUP GROUP %s %s %s COUNT 1 STREAMS %s >",
+        client->stream_name,
+        client->consumer_name,
+        "BLOCK X",
+        client->stream_name
+    )) { return NULL; }
+
+    // Unpack the XREADGROUP response, which is a nest of arrays.
+    // These arrays are mostly 1 and 2-element lists, since we are 
+    // only reading one item on a single stream.
+    if (reply->type == REDIS_REPLY_ARRAY && reply->elements > 0) {
+        tmp = reply->element[0];
+
+        if (tmp->type == REDIS_REPLY_ARRAY && tmp->elements > 1) {
+            tmp = tmp->element[1];
+
+            if (tmp->type == REDIS_REPLY_ARRAY && tmp->elements > 0) {
+                tmp = tmp->element[0];
+
+                if (tmp->type == REDIS_REPLY_ARRAY && tmp->elements > 1) {
+                    redisReply *r1 = tmp->element[0];
+                    redisReply *r2 = tmp->element[1];
+
+                    if (r1->type == REDIS_REPLY_STRING) {
+                        msg_id = strdup(r1->str);
+                    }
+
+                    if (r2->type == REDIS_REPLY_ARRAY && r2->elements > 1) {
+                        // r2->element[0] is the message name, which we
+                        // currently don't use for anything.
+
+                        r2 = r2->element[1];
+
+                        if (r2->type == REDIS_REPLY_STRING) {
+                            json = strdup(r2->str);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    freeReplyObject(reply); // XREADGROUP
+
+    if (msg_id == NULL) {
+        // Read timed out. 'json' will also be NULL.
+        return NULL;
+    }
+
+    reply = redisCommand(client->bus, "XACK %s %s %s", 
+        client->stream_name, client->stream_name, msg_id);
+
+    if (handle_redis_error(
+        reply,
+        "XACK %s %s %s",
+        client->stream_name, 
+        client->stream_name, msg_id
+    )) { return NULL; }
+
+    freeReplyObject(reply); // XACK
+
+    free(msg_id);
+
+    osrfLogInternal(OSRF_LOG_MARK, "recv_one_chunk() read json: %s", json);
+
+    return json;
+}
 
-       if( NULL == client->msg_q_head ) {
+/// Returns at most one JSON value pulled from the bus or NULL if
+/// the list pop times out or the pop is interrupted by a signal.
+jsonObject* recv_one_value(transport_client* client, int timeout) {
 
-               // No message available on the queue?  Try to get a fresh one.
+    char* json = recv_one_chunk(client, timeout);
 
-               // When we call session_wait(), it reads a socket for new messages.  When it finds
-               // one, it enqueues it by calling the callback function client_message_handler(),
-               // which we installed in the transport_session when we created the transport_client.
+    if (json == NULL) {
+        // recv() timed out.
+        return NULL;
+    }
 
-               // Since a single call to session_wait() may not result in the receipt of a complete
-               // message. we call it repeatedly until we get either a message or an error.
+    jsonObject* obj = jsonParse(json);
 
-               // Alternatively, a single call to session_wait() may result in the receipt of
-               // multiple messages.  That's why we have to enqueue them.
+    if (obj == NULL) {
+        osrfLogWarning(OSRF_LOG_MARK, "Error parsing JSON: %s", json);
+    }
 
-               // The timeout applies to the receipt of a complete message.  For a sufficiently
-               // short timeout, a sufficiently long message, and a sufficiently slow connection,
-               // we could timeout on the first message even though we're still receiving data.
-               
-               // Likewise we could time out while still receiving the second or subsequent message,
-               // return the first message, and resume receiving messages later.
+    free(json);
 
-               if( timeout == -1 ) {  /* wait potentially forever for data to arrive */
+    return obj;
+}
 
-                       int x;
-                       do {
-                               if( (x = session_wait( client->session, -1 )) ) {
-                                       osrfLogDebug(OSRF_LOG_MARK, "session_wait returned failure code %d\n", x);
-                                       error = 1;
-                                       break;
-                               }
-                       } while( client->msg_q_head == NULL );
+/**
+ * Returns at most one jsonObject returned from the data bus.
+ *
+ * Keeps trying until a value is returned or the timeout is exceeded.
+ */
+jsonObject* recv_json_value(transport_client* client, int timeout) {
 
-               } else {    /* loop up to 'timeout' seconds waiting for data to arrive  */
+    if (timeout == 0) {
+        return recv_one_value(client, 0);
 
-                       /* This loop assumes that a time_t is denominated in seconds -- not */
-                       /* guaranteed by Standard C, but a fair bet for Linux or UNIX       */
+    } else if (timeout < 0) {
+        // Keep trying until we have a result.
 
-                       time_t start = time(NULL);
-                       time_t remaining = (time_t) timeout;
+        while (1) {
+            jsonObject* obj = recv_one_value(client, timeout);
+            if (obj != NULL) { return obj; }
+        }
+    }
 
-                       int wait_ret;
-                       do {
-                               if( (wait_ret = session_wait( client->session, (int) remaining)) ) {
-                                       error = 1;
-                                       osrfLogDebug(OSRF_LOG_MARK,
-                                               "session_wait returned failure code %d: setting error=1\n", wait_ret);
-                                       break;
-                               }
+    time_t seconds = (time_t) timeout;
 
-                               remaining -= time(NULL) - start;
-                       } while( NULL == client->msg_q_head && remaining > 0 );
-               }
-       }
+    while (seconds > 0) {
 
-       transport_message* msg = NULL;
+        time_t now = time(NULL);
+        jsonObject* obj = recv_one_value(client, timeout);
 
-       if( !error && client->msg_q_head != NULL ) {
-               /* got message(s); dequeue the oldest one */
-               msg = client->msg_q_head;
-               client->msg_q_head = msg->next;
-               msg->next = NULL;  /* shouldn't be necessary; nullify for good hygiene */
-               if( NULL == client->msg_q_head )
-                       client->msg_q_tail = NULL;
-       }
+        if (obj == NULL) {
+            seconds -= now;
+        } else {
+            return obj;
+        }
+    }
 
-       return msg;
+    return NULL;
 }
 
-/**
-       @brief Enqueue a newly received transport_message.
-       @param client A pointer to a transport_client, cast to a void pointer.
-       @param msg A new transport message.
+transport_message* client_recv(transport_client* client, int timeout) {
+       if (client == NULL || client->bus == NULL) { return NULL; }
 
-       Add a newly arrived input message to the tail of the queue.
+    // TODO no need for intermediate to/from JSON.  Create transport
+    // message directly from received JSON object.
+    jsonObject* obj = recv_json_value(client, timeout);
 
-       This is a callback function.  The transport_session parses the XML coming in through a
-       socket, accumulating various bits and pieces.  When it sees the end of a message stanza,
-       it packages the bits and pieces into a transport_message that it passes to this function,
-       which enqueues the message for processing.
-*/
-static void client_message_handler( void* client, transport_message* msg ){
+    if (obj == NULL) { return NULL; } // Receive timed out.
 
-       if(client == NULL) return;
-       if(msg == NULL) return;
+    char* json = jsonObjectToJSON(obj);
 
-       transport_client* cli = (transport_client*) client;
+       transport_message* msg = new_message_from_json(json);
 
-       /* add the new message to the tail of the queue */
-       if( NULL == cli->msg_q_head )
-               cli->msg_q_tail = cli->msg_q_head = msg;
-       else {
-               cli->msg_q_tail->next = msg;
-               cli->msg_q_tail = msg;
-       }
-       msg->next = NULL;
-}
+    free(json);
 
+    osrfLogInternal(OSRF_LOG_MARK, 
+        "client_recv() read response for thread %s", msg->thread);
+
+       return msg;
+}
 
 /**
        @brief Free a transport_client, along with all resources it owns.
@@ -298,10 +396,7 @@ static void client_message_handler( void* client, transport_message* msg ){
        @return 1 if successful, or 0 if not.  The only error condition is if @a client is NULL.
 */
 int client_free( transport_client* client ) {
-       if(client == NULL)
-               return 0;
-       session_free( client->session );
-       client->session = NULL;
+       if (client == NULL) { return 0; }
        return client_discard( client );
 }
 
@@ -315,29 +410,16 @@ int client_free( transport_client* client ) {
        disconnect the parent as well.
  */
 int client_discard( transport_client* client ) {
-       if(client == NULL)
-               return 0;
-       
-       transport_message* current = client->msg_q_head;
-       transport_message* next;
-
-       /* deallocate the list of messages */
-       while( current != NULL ) {
-               next = current->next;
-               message_free( current );
-               current = next;
-       }
-
-       free(client->host);
-       free(client->xmpp_id);
-       free( client );
+
+       if (client == NULL) { return 0; }
+
+       if (client->host != NULL) { free(client->host); }
+       if (client->unix_path != NULL) { free(client->unix_path); }
+       if (client->stream_name != NULL) { free(client->stream_name); }
+       if (client->consumer_name != NULL) { free(client->consumer_name); }
+
+       free(client);
+
        return 1;
 }
 
-int client_sock_fd( transport_client* client )
-{
-       if( !client )
-               return 0;
-       else
-               return client->session->sock_id;
-}
index 332b622..b5d6841 100644 (file)
@@ -1,4 +1,5 @@
 #include <opensrf/transport_message.h>
+#include <opensrf/osrf_json.h>
 
 /**
        @file transport_message.c
@@ -66,11 +67,72 @@ transport_message* message_init( const char* body, const char* subject,
        msg->error_code     = 0;
        msg->broadcast      = 0;
        msg->msg_xml        = NULL;
+    msg->msg_json       = NULL;
        msg->next           = NULL;
 
        return msg;
 }
 
+transport_message* new_message_from_json(const char* msg_json) {
+
+    if (msg_json == NULL || *msg_json == '\0') { return NULL; }
+
+    transport_message* new_msg = safe_malloc(sizeof(transport_message));
+
+    new_msg->body           = NULL;
+    new_msg->subject        = NULL;
+    new_msg->thread         = NULL;
+    new_msg->recipient      = NULL;
+    new_msg->sender         = NULL;
+    new_msg->router_from    = NULL;
+    new_msg->router_to      = NULL;
+    new_msg->router_class   = NULL;
+    new_msg->router_command = NULL;
+    new_msg->osrf_xid       = NULL;
+    new_msg->is_error       = 0;
+    new_msg->error_type     = NULL;
+    new_msg->error_code     = 0;
+    new_msg->broadcast      = 0;
+    new_msg->msg_xml        = NULL;
+    new_msg->next           = NULL;
+
+    jsonObject* json_hash = jsonParse(msg_json);
+
+    if (json_hash == NULL || json_hash->type != JSON_HASH) {
+        osrfLogError(OSRF_LOG_MARK,  "new_message_from_json() received bad JSON");
+        jsonObjectFree(json_hash);
+        message_free(new_msg);
+        return NULL;
+    }
+
+    const char* sender = jsonObjectGetString(jsonObjectGetKeyConst(json_hash, "from"));
+    if (sender) { new_msg->sender = strdup((const char*) sender); }
+
+    const char* recipient = jsonObjectGetString(jsonObjectGetKeyConst(json_hash, "to"));
+    if (recipient) { new_msg->recipient = strdup((const char*) recipient); }
+
+    const char* thread = jsonObjectGetString(jsonObjectGetKeyConst(json_hash, "thread"));
+    if (thread == NULL) { thread = ""; }
+    new_msg->thread = strdup((const char*) thread);
+
+    const char* osrf_xid = jsonObjectGetString(jsonObjectGetKeyConst(json_hash, "osrf_xid"));
+    if (osrf_xid) { message_set_osrf_xid(new_msg, (char*) osrf_xid); }
+
+    // TODO
+    // Internally the mesage body is stored as a JSON string
+    // On the wire, it's just part of the message.  We could get
+    // rid if this extra json encode/decode step if we treated
+    // the body as a JSON object internally.
+    const char* body = jsonObjectToJSON(jsonObjectGetKeyConst(json_hash, "body"));
+    if (body == NULL) { body = ""; }
+    new_msg->body = strdup((const char*) body);
+
+    jsonObjectFree(json_hash);
+
+    return new_msg;
+}
+
+
 
 /**
        @brief Translate an XML string into a transport_message.
@@ -308,11 +370,36 @@ int message_free( transport_message* msg ){
        free(msg->osrf_xid);
        if( msg->error_type != NULL ) free(msg->error_type);
        if( msg->msg_xml != NULL ) free(msg->msg_xml);
+       if( msg->msg_json != NULL ) free(msg->msg_json);
        free(msg);
        return 1;
 }
 
 
+int message_prepare_json(transport_message* msg) {
+
+    if (!msg) { return 0; }
+    if (msg->msg_json) { return 1; }   /* already done */
+
+    jsonObject* json_hash = jsonNewObject(NULL);
+    jsonObjectSetKey(json_hash, "to", jsonNewObject(msg->recipient));
+    jsonObjectSetKey(json_hash, "from", jsonNewObject(msg->sender));
+    jsonObjectSetKey(json_hash, "thread", jsonNewObject(msg->thread));
+    jsonObjectSetKey(json_hash, "osrf_xid", jsonNewObject(msg->osrf_xid));
+
+    // TODO the various layers expect the message body to be a separate
+    // JSON string, but on the bus, the body is just another key 
+    // in the JSON object.
+    jsonObjectSetKey(json_hash, "body", jsonParse(msg->body));
+
+    msg->msg_json = jsonObjectToJSON(json_hash);
+
+    jsonObjectFree(json_hash);
+
+    return 1;
+}
+
+
 /**
        @brief Build a &lt;message&gt; element and store it as a string in the msg_xml member.
        @param msg Pointer to a transport_message.
index 2f8a129..a166518 100644 (file)
@@ -28,10 +28,12 @@ lib/OpenSRF/Transport/Listener.pm
 lib/OpenSRF/Transport/PeerHandle.pm
 lib/OpenSRF/Transport/SlimJabber.pm
 lib/OpenSRF/Transport/SlimJabber/Client.pm
-lib/OpenSRF/Transport/SlimJabber/MessageWrapper.pm
 lib/OpenSRF/Transport/SlimJabber/PeerConnection.pm
 lib/OpenSRF/Transport/SlimJabber/XMPPMessage.pm
 lib/OpenSRF/Transport/SlimJabber/XMPPReader.pm
+lib/OpenSRF/Transport/Redis/Client.pm
+lib/OpenSRF/Transport/Redis/PeerConnection.pm
+lib/OpenSRF/Transport/Redis/Message.pm
 lib/OpenSRF/Utils.pm
 lib/OpenSRF/Utils/Cache.pm
 lib/OpenSRF/Utils/Config.pm
index 603ba3c..0af61f9 100644 (file)
@@ -6,7 +6,6 @@ use OpenSRF::Transport::PeerHandle;
 use OpenSRF::Utils::JSON;
 use OpenSRF::Utils::Logger qw(:level);
 use OpenSRF::Utils::SettingsClient;
-use OpenSRF::Utils::Config;
 use OpenSRF::EX;
 use OpenSRF;
 use Exporter;
@@ -208,18 +207,7 @@ sub last_sent_type {
 
 sub get_app_targets {
        my $app = shift;
-
-       my $conf = OpenSRF::Utils::Config->current;
-       my $router_name = $conf->bootstrap->router_name || 'router';
-       my $domain = $conf->bootstrap->domain;
-       $logger->error("use of <domains/> is deprecated") if $conf->bootstrap->domains;
-
-       unless($router_name and $domain) {
-               throw OpenSRF::EX::Config 
-                       ("Missing router config information 'router_name' and 'domain'");
-       }
-
-    return ("$router_name\@$domain/$app");
+    return ("service:$app");
 }
 
 sub stateless {
@@ -578,6 +566,9 @@ sub send {
                }
 
        } 
+
+    # TODO Redis
+    # We can remove this extra layer of JSON round-tripping on the body.
        my $json = OpenSRF::Utils::JSON->perl2JSON(\@doc);
        $logger->internal("AppSession sending doc: $json");
 
index 2ec87ce..8682629 100644 (file)
@@ -12,6 +12,7 @@ use Time::HiRes qw/time/;
 use OpenSRF::EX qw/:try/;
 use Carp;
 use OpenSRF::Utils::JSON;
+use OpenSRF::Utils::Config;
 
 sub DESTROY{};
 
@@ -22,7 +23,6 @@ $log = 'OpenSRF::Utils::Logger';
 
 our $in_request = 0;
 our @pending_requests;
-our $shared_conf;
 
 sub package {
        my $self = shift;
@@ -142,8 +142,12 @@ sub handler {
                my $logdata = "CALL: ".$session->service." $method_name ";
                my $redact_params = 0;
                if (@p) {
-                       if (ref($shared_conf->shared->log_protect) eq 'ARRAY') {
-                               foreach my $match_string (@{$shared_conf->shared->log_protect}) {
+
+                       my $conf = OpenSRF::Utils::Config->current->as_hash;
+                       my $protect = $conf->{shared}->{log_protect};
+
+                       if (ref($protect) eq 'ARRAY') {
+                               foreach my $match_string (@$protect) {
                                        if ($method_name =~ /^$match_string/) {
                                                $redact_params = 1;
                                                last;
index 52c53d2..f6a36a4 100644 (file)
@@ -23,6 +23,7 @@ use OpenSRF::Utils::SettingsClient;
 use OpenSRF::Utils::Logger qw($logger);
 use OpenSRF::DomainObject::oilsResponse qw/:status/;
 use OpenSRF::Transport::SlimJabber::Client;
+use Digest::MD5 qw(md5_hex);
 use Encode;
 use POSIX qw/:sys_wait_h :errno_h/;
 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
@@ -341,22 +342,28 @@ sub kill_child {
 sub build_osrf_handle {
     my $self = shift;
 
-    my $conf = OpenSRF::Utils::Config->current;
-    my $username = $conf->bootstrap->username;
-    my $password = $conf->bootstrap->passwd;
-    my $domain = $conf->bootstrap->domain;
-    my $port = $conf->bootstrap->port;
-    my $resource = $self->{service} . '_listener_' . $conf->env->hostname;
+    my $conf = OpenSRF::Utils::Config->current
+        ->as_hash->{connections}->{service}->{message_bus};
+
+    my $port = $conf->{port} || 6379;
+    my $host = $conf->{host} || '127.0.0.1';
+    my $sock = $conf->{sock};
+    my $username = $conf->{username};
+    my $password = $conf->{password};
 
-    $logger->debug("server: inbound connecting as $username\@$domain/$resource on port $port");
+    # Every listener needs a unique consumer name.
+    my $consumer_name = 'service:' . 
+        $self->{service} . ':' . substr(md5_hex($$ . time . rand($$)), 0, 12);
 
     $self->{osrf_handle} =
-        OpenSRF::Transport::SlimJabber::Client->new(
-            username => $username,
-            resource => $resource,
-            password => $password,
-            host => $domain,
+        OpenSRF::Transport::Redis::Client->new(
+            stream_name => "service:" . $self->{service},
+            consumer_name => $consumer_name,
+            host => $host,
             port => $port,
+            sock => $sock,
+            username => $username,
+            password => $password
         );
 
     $self->{osrf_handle}->initialize;
@@ -368,11 +375,12 @@ sub build_osrf_handle {
 # ----------------------------------------------------------------
 sub write_child {
     my($self, $child, $msg) = @_;
-    my $xml = encode_utf8(decode_utf8($msg->to_xml));
+    #my $xml = encode_utf8(decode_utf8($msg->to_xml));
+    my $json = $msg->to_json;
 
     # tell the child how much data to expect, minus the header
     my $write_size;
-    {use bytes; $write_size = length($xml)}
+    {use bytes; $write_size = length($json)}
     $write_size = sprintf("%*s", WRITE_PIPE_DATA_SIZE, $write_size);
 
     for (0..2) {
@@ -389,12 +397,12 @@ sub write_child {
         # so the lack of a pid means the child is dead.
         if (!$child->{pid}) {
             $logger->error("server: child is dead in write_child(). ".
-                "unable to send message: $xml");
+                "unable to send message: $json");
             return; # avoid syswrite crash
         }
 
         # send message to child data pipe
-        syswrite($child->{pipe_to_child}, $write_size . $xml);
+        syswrite($child->{pipe_to_child}, $write_size . $json);
 
         last unless $self->{sig_pipe};
         $logger->error("server: got SIGPIPE writing to $child, retrying...");
@@ -591,6 +599,8 @@ sub spawn_child {
 # Sends the register command to the configured routers
 # ----------------------------------------------------------------
 sub register_routers {
+    return; # TODO Redis
+
     my $self = shift;
 
     my $conf = OpenSRF::Utils::Config->current;
@@ -636,6 +646,8 @@ sub register_routers {
 # with.
 # ----------------------------------------------------------------
 sub unregister_routers {
+    return; # TODO Redis
+
     my $self = shift;
     return unless $self->{osrf_handle}->tcp_connected;
 
@@ -696,7 +708,7 @@ sub init {
     my $self = shift;
     my $service = $self->{parent}->{service};
     $0 = "OpenSRF Drone [$service]";
-    OpenSRF::Transport::PeerHandle->construct($service);
+    OpenSRF::Transport::PeerHandle->construct($service, 'service');
     OpenSRF::Application->application_implementation->child_init
         if (OpenSRF::Application->application_implementation->can('child_init'));
 }
@@ -719,15 +731,15 @@ sub run {
         my $orig_name = $0;
         $0 = "$0*";
 
-        # Discard extraneous data from the jabber socket
-        if(!$network->flush_socket()) {
+        # Discard extraneous data from our direct message bus ID.
+        if (!$network->flush_socket()) {
             $logger->error("server: network disconnected!  child dropping request and exiting: $data");
             exit;
         }
 
         my $session = OpenSRF::Transport->handler(
             $self->{parent}->{service},
-            OpenSRF::Transport::SlimJabber::XMPPMessage->new(xml => $data)
+            OpenSRF::Transport::Redis::Message->new(json => $data)
         );
 
         my $recycle = $self->keepalive_loop($session);
index ece232f..895f930 100644 (file)
@@ -25,16 +25,20 @@ $| = 1;
 sub DESTROY {}
 
 sub load_bootstrap_config {
+
     return if OpenSRF::Utils::Config->current;
 
     die "Please provide a bootstrap config file to OpenSRF::System\n"
         unless $bootstrap_config_file;
 
     OpenSRF::Utils::Config->load(config_file => $bootstrap_config_file);
-    OpenSRF::Utils::JSON->register_class_hint(name => "OpenSRF::Application", hint => "method", type => "hash", strip => ['session']);
-    OpenSRF::Transport->message_envelope("OpenSRF::Transport::SlimJabber::MessageWrapper");
-    OpenSRF::Transport::PeerHandle->set_peer_client("OpenSRF::Transport::SlimJabber::PeerConnection");
+    OpenSRF::Utils::JSON->register_class_hint(
+        name => 'OpenSRF::Application', hint => 'method', type => 'hash', strip => ['session']);
+    OpenSRF::Transport::PeerHandle->set_peer_client('OpenSRF::Transport::Redis::PeerConnection');
     OpenSRF::Application->server_class('client');
+
+    return; # XXX
+
     # Read in a shared portion of the config file
     # for later use in log parameter redaction
     $OpenSRF::Application::shared_conf = OpenSRF::Utils::Config->load(
@@ -67,9 +71,11 @@ sub bootstrap_client {
 
     my $app = $params{client_name} || "client";
 
+    my $connection_type = $params{connection_type} || 'service';
+
     load_bootstrap_config();
-    OpenSRF::Utils::Logger::set_config();
-    OpenSRF::Transport::PeerHandle->construct($app);
+    OpenSRF::Utils::Logger::set_config(undef, $connection_type);
+    OpenSRF::Transport::PeerHandle->construct($app, $connection_type);
 }
 
 sub connected {
@@ -85,7 +91,7 @@ sub run_service {
     $0 = "OpenSRF Listener [$service]";
 
     # temp connection to use for application initialization
-    OpenSRF::System->bootstrap_client(client_name => "system_client");
+    OpenSRF::System->bootstrap_client(client_name => $service);
 
     my $sclient = OpenSRF::Utils::SettingsClient->new;
     my $getval = sub { $sclient->config_value(apps => $service => @_); };
index 5aeff4d..2bc8849 100644 (file)
@@ -7,7 +7,6 @@ use OpenSRF::Utils::JSON;
 use OpenSRF::Utils::Logger qw(:level);
 use OpenSRF::DomainObject::oilsResponse qw/:status/;
 use OpenSRF::EX qw/:try/;
-use OpenSRF::Transport::SlimJabber::MessageWrapper;
 
 #------------------ 
 # --- These must be implemented by all Transport subclasses
@@ -34,32 +33,9 @@ sub get_msg_envelope { shift()->alert_abstract(); }
 our $message_envelope;
 my $logger = "OpenSRF::Utils::Logger"; 
 
-
-
-=head2 message_envelope( [$envelope] );
-
-Sets the message envelope class that will allow us to extract
-information from the messages we receive from the low 
-level transport
-
-=cut
-
-sub message_envelope {
-       my( $class, $envelope ) = @_;
-       if( $envelope ) {
-               $message_envelope = $envelope;
-               $envelope->use;
-               if( $@ ) {
-                       $logger->error( 
-                                       "Error loading message_envelope: $envelope -> $@", ERROR);
-               }
-       }
-       return $message_envelope;
-}
-
 =head2 handler( $data )
 
-Creates a new MessageWrapper, extracts the remote_id, session_id, and message body
+Creates a new Message, extracts the remote_id, session_id, and message body
 from the message.  Then, creates or retrieves the AppSession object with the session_id and remote_id. 
 Finally, creates the message document from the body of the message and calls
 the handler method on the message document.
diff --git a/src/perl/lib/OpenSRF/Transport/Redis/Client.pm b/src/perl/lib/OpenSRF/Transport/Redis/Client.pm
new file mode 100644 (file)
index 0000000..446fd5a
--- /dev/null
@@ -0,0 +1,256 @@
+package OpenSRF::Transport::Redis::Client;
+use strict;
+use warnings;
+use Redis;
+use Time::HiRes q/time/;
+use OpenSRF::Utils::JSON;
+use OpenSRF::Utils::Logger qw/$logger/;
+use OpenSRF::Transport::Redis::Message;
+
+sub new {
+    my ($class, %params) = @_;
+    my $self = bless({}, ref($class) || $class);
+    $self->params(\%params);
+    return $self;
+}
+
+sub redis {
+    my ($self, $redis) = @_;
+    $self->{redis} = $redis if $redis;
+    return $self->{redis};
+}
+
+sub params {
+    my ($self, $params) = @_;
+    $self->{params} = $params if $params;
+    return $self->{params};
+}
+
+sub disconnect {
+    my $self = shift;
+    return unless $self->redis;
+
+    if ($self->stream_name =~ /^client:/) {
+        # Delete our stream since we're the only one using it.  Deleting
+        # the stream also deletes our consumer group.
+        $self->redis->del($self->stream_name);
+    }
+
+    $self->redis->quit;
+    delete $self->{redis};
+}
+
+sub gather { 
+    my $self = shift; 
+    $self->process(0); 
+}
+
+# -------------------------------------------------
+
+sub tcp_connected {
+    my $self = shift;
+    return $self->redis ? 1 : 0;
+}
+
+sub connected {
+    my $self = shift;
+    return $self->tcp_connected;
+}
+
+sub initialize {
+    my $self = shift;
+
+    my $host = $self->params->{host} || ''; 
+    my $port = $self->params->{port} || 0; 
+    my $sock = $self->params->{sock} || ''; 
+    my $username = $self->params->{username}; 
+    my $password = $self->params->{password}; 
+    my $stream_name = $self->params->{stream_name};
+    my $consumer_name = $self->params->{consumer_name};
+    my $max_queue_size = $self->params->{max_queue_size};
+
+    $logger->debug("Redis client connecting: ".
+        "host=$host port=$port sock=$sock username=$username stream_name=$stream_name");
+
+    return 1 if $self->redis; # already connected
+
+    # UNIX socket file takes precedence over host:port.
+    my @connect_args = $sock ? (sock => $sock) : (server => "$host:$port");
+
+    # On disconnect, try to reconnect every 100ms up to 60 seconds.
+    push(@connect_args, (reconnect => 60, every => 100_000));
+
+    $logger->debug("Connecting to bus: @connect_args");
+
+    unless ($self->redis(Redis->new(@connect_args))) {
+        throw OpenSRF::EX::Jabber("Could not connect to Redis bus with @connect_args");
+        return 0;
+    }
+
+    unless ($self->redis->auth($username, $password) eq 'OK') {
+        throw OpenSRF::EX::Jabber("Cannot authenticate with Redis instance user=$username");
+        return 0;
+    }
+
+    $logger->debug("Auth'ed with Redis as $username OK : stream_name=$stream_name");
+
+    eval { 
+        # This gets mad when a stream / group already exists, but 
+        # Listeners share a stream/group name so dupes are possible.
+
+        $self->redis->xgroup(   
+            'create',
+            $stream_name,   # stream name
+            $stream_name,   # group name
+            '$',            # only receive new messages
+            'mkstream'      # create this stream if it's not there.
+        );
+    };
+
+    if ($@) {
+        $logger->info("XGROUP CREATE returned : $@");
+    }
+
+    $self->stream_name($stream_name);
+    $self->consumer_name($consumer_name);
+    $self->max_queue_size($max_queue_size);
+
+    return $self;
+}
+
+sub max_queue_size {
+    my ($self, $max_queue_size) = @_;
+    $self->{max_queue_size} = $max_queue_size if $max_queue_size;
+    return $self->{max_queue_size};
+}
+
+sub stream_name {
+    my ($self, $stream_name) = @_;
+    $self->{stream_name} = $stream_name if $stream_name;
+    return $self->{stream_name};
+}
+
+sub consumer_name {
+    my ($self, $consumer_name) = @_;
+    $self->{consumer_name} = $consumer_name if $consumer_name;
+    return $self->{consumer_name};
+}
+
+
+sub construct {
+    my ($class, $app, $context) = @_;
+    $class->peer_handle($class->new($app, $context)->initialize);
+}
+
+sub send {
+    my $self = shift;
+    my $msg = OpenSRF::Transport::Redis::Message->new(@_);
+    
+    $msg->body(OpenSRF::Utils::JSON->JSON2perl($msg->body));
+
+    $msg->osrf_xid($logger->get_osrf_xid);
+    $msg->from($self->stream_name);
+
+    my $msg_json = $msg->to_json;
+
+    $logger->internal("send(): to=" . $msg->to . " : $msg_json");
+
+    $self->redis->xadd(
+        $msg->to,                   # recipient == stream name
+        'NOMKSTREAM',
+        'MAXLEN', 
+        '~',                        # maxlen-ish
+        $self->max_queue_size,
+        '*',                        # let Redis generate the ID
+        'message',                  # gotta call it something 
+        $msg_json
+    );
+}
+
+
+
+
+sub process {
+    my ($self, $timeout) = @_;
+
+    $timeout ||= 0;
+
+    # Redis does not support fractional timeouts.
+    $timeout = 1 if ($timeout > 0 && $timeout < 1);
+
+    $timeout = int($timeout);
+
+    unless ($self->redis) {
+        throw OpenSRF::EX::JabberDisconnected 
+            ("This Redis instance is no longer connected to the server ");
+    }
+
+    return $self->recv($timeout);
+}
+
+# $timeout=0 means check for data without blocking
+# $timeout=-1 means block indefinitely.
+sub recv {
+    my ($self, $timeout) = @_;
+
+    $logger->debug("server: watching for content at " . $self->stream_name);
+
+    my @block;
+    if ($timeout) {
+        # 0 means block indefinitely in Redis
+        $timeout = 0 if $timeout == -1;
+        $timeout *= 1000; # milliseconds
+        @block = (BLOCK => $timeout);
+    }
+
+    my $packet = $self->redis->xreadgroup(
+        GROUP => $self->stream_name,
+        $self->consumer_name,
+        @block,
+        COUNT => 1,
+        STREAMS => $self->stream_name,
+        '>' # new messages only
+    );      
+
+    # Timed out waiting for data.
+    return undef unless defined $packet;
+
+    # TODO make this more self-documenting.  also, too brittle?
+    my $container = $packet->[0]->[1]->[0];
+    my $bus_id = $container->[0];
+    my $json = $container->[1]->[1];
+
+    $logger->internal("recv() $json");
+
+    # TODO putting this here for now -- it may live somewhere else.
+    # Ideally this could happen out of band.
+    # Note if we don't ACK utnil after successfully processing each
+    # message, a malformed message will stay in the pending list.
+    # Consider options.
+    $self->redis->xack($self->stream_name, $self->stream_name, $bus_id);
+
+    my $msg = OpenSRF::Transport::Redis::Message->new(json => $json);
+    $msg->bus_id($bus_id);
+
+    return undef unless $msg;
+
+    $logger->internal("recv() thread=" . $msg->thread);
+
+    # The message body is doubly encoded as JSON to, among other things,
+    # support message chunking.
+    $msg->body(OpenSRF::Utils::JSON->perl2JSON($msg->body));
+
+    return $msg;
+}
+
+
+sub flush_socket {
+    my $self = shift;
+    # Remove all messages from the stream
+    $self->redis->xtrim($self->stream_name, 'MAXLEN', 0);
+    return 1;
+}
+
+1;
+
+
diff --git a/src/perl/lib/OpenSRF/Transport/Redis/Message.pm b/src/perl/lib/OpenSRF/Transport/Redis/Message.pm
new file mode 100644 (file)
index 0000000..642880f
--- /dev/null
@@ -0,0 +1,103 @@
+package OpenSRF::Transport::Redis::Message;
+use strict; use warnings;
+use OpenSRF::Utils::Logger qw/$logger/;
+use OpenSRF::Utils::JSON;
+use OpenSRF::EX qw/:try/;
+use strict; use warnings;
+
+sub new {
+    my ($class, %args) = @_;
+    my $self = bless({}, $class);
+
+    if ($args{json}) {
+        $self->from_json($args{json});
+
+    } else {
+        $self->{to} = $args{to} || '';
+        $self->{from} = $args{from} || '';
+        $self->{thread} = $args{thread} || '';
+        $self->{body} = $args{body} || '';
+        $self->{osrf_xid} = $args{osrf_xid} || '';
+        $self->{bus_id} = $args{bus_id} || '';
+    }
+
+    return $self;
+}
+
+sub to {
+    my($self, $to) = @_;
+    $self->{to} = $to if defined $to;
+    return $self->{to};
+}
+sub from {
+    my($self, $from) = @_;
+    $self->{from} = $from if defined $from;
+    return $self->{from};
+}
+sub thread {
+    my($self, $thread) = @_;
+    $self->{thread} = $thread if defined $thread;
+    return $self->{thread};
+}
+sub body {
+    my($self, $body) = @_;
+    $self->{body} = $body if defined $body;
+    return $self->{body};
+}
+
+sub status {
+    my($self, $status) = @_;
+    $self->{status} = $status if defined $status;
+    return $self->{status};
+}
+sub type {
+    my($self, $type) = @_;
+    $self->{type} = $type if defined $type;
+    return $self->{type};
+}
+
+sub err_type {}
+sub err_code {}
+
+sub osrf_xid {
+    my($self, $osrf_xid) = @_;
+    $self->{osrf_xid} = $osrf_xid if defined $osrf_xid;
+    return $self->{osrf_xid};
+}
+
+sub bus_id {
+    my($self, $bus_id) = @_;
+    $self->{bus_id} = $bus_id if defined $bus_id;
+    return $self->{bus_id};
+}
+
+sub to_json {
+    my $self = shift;
+
+    # No nead to encode the bus_id in outbound messages since the ID
+    # won't exist yet.
+    return OpenSRF::Utils::JSON->perl2JSON({
+        to => $self->{to},
+        from => $self->{from},
+        osrf_xid => $self->{osrf_xid},
+        thread => $self->{thread},
+        body => $self->{body}
+    });
+}
+
+sub from_json {
+    my $self = shift;
+    my $json = shift;
+    my $hash;
+
+    eval { $hash = OpenSRF::Utils::JSON->JSON2perl($json); };
+
+    if ($@) {
+        $logger->error("Redis::Message received invalid JSON: $@ : $json");
+        return undef;
+    }
+
+    $self->{$_} = $hash->{$_} for keys %$hash;
+}
+
+1;
diff --git a/src/perl/lib/OpenSRF/Transport/Redis/PeerConnection.pm b/src/perl/lib/OpenSRF/Transport/Redis/PeerConnection.pm
new file mode 100644 (file)
index 0000000..05e8aec
--- /dev/null
@@ -0,0 +1,83 @@
+package OpenSRF::Transport::Redis::PeerConnection;
+use strict;
+use base qw/OpenSRF::Transport::Redis::Client/;
+use Digest::MD5 qw(md5_hex);
+use OpenSRF::Utils::Logger qw/$logger/;
+
+our $_singleton_connection;
+
+sub retrieve {
+    my ($class, $app) = @_;
+    return $_singleton_connection;
+}
+
+sub reset {
+    return unless $_singleton_connection;
+    $_singleton_connection->disconnect;
+    $_singleton_connection = undef;
+}
+
+
+sub new {
+    my ($class, $app, $connection_type) = @_;
+
+    my $peer_con = $class->retrieve;
+    return $peer_con if ($peer_con and $peer_con->tcp_connected);
+
+    my $conf = OpenSRF::Utils::Config->current->as_hash;
+
+    $conf = $conf->{connections} or
+        die "No 'connections' block in bootstrap configuration\n";
+
+    $conf = $conf->{$connection_type} or
+        die "No '$connection_type' connection in bootstrap configuration\n";
+
+    $conf = $conf->{message_bus};
+
+    my $port = $conf->{port} || 6379;
+    my $host = $conf->{host} || '127.0.0.1';
+    my $sock = $conf->{sock};
+    my $username = $conf->{username};
+    my $password = $conf->{password};
+    my $maxlen = $conf->{max_queue_size} || 1000;
+
+    my $stream_name = $app eq 'client' ? 'client:' : "client:$app:";
+    $stream_name .= substr(md5_hex($$ . time . rand($$)), 0, 12);
+
+    $logger->debug("PeerConnection::new() ".
+        "using app=$app username=$username stream_name=$stream_name");
+
+    my $self = $class->SUPER::new(
+        host => $host,
+        port => $port,
+        sock => $sock,
+        username => $username,
+        password => $password,
+        stream_name => $stream_name,
+        consumer_name => $stream_name,
+        max_queue_size => $maxlen
+    );
+
+    bless($self, $class);
+
+    $self->app($app);
+
+    return $_singleton_connection = $self;
+}
+
+sub process {
+    my $self = shift;
+    my $val = $self->SUPER::process(@_);
+    return 0 unless $val;
+    return OpenSRF::Transport->handler($self->app, $val);
+}
+
+sub app {
+    my $self = shift;
+    my $app = shift;
+    $self->{app} = $app if $app;
+    return $self->{app};
+}
+
+1;
+
index a1742e8..aa460b5 100644 (file)
@@ -11,6 +11,4 @@ classes for handling transport layer messaging
 
 sub get_peer_client { return "OpenSRF::Transport::SlimJabber::PeerConnection"; }
 
-sub get_msg_envelope { return "OpenSRF::Transport::SlimJabber::MessageWrapper"; }
-
 1;
diff --git a/src/perl/lib/OpenSRF/Transport/SlimJabber/MessageWrapper.pm b/src/perl/lib/OpenSRF/Transport/SlimJabber/MessageWrapper.pm
deleted file mode 100644 (file)
index 0fa95c5..0000000
+++ /dev/null
@@ -1,72 +0,0 @@
-package OpenSRF::Transport::SlimJabber::MessageWrapper;
-use strict; use warnings;
-use OpenSRF::Transport::SlimJabber::XMPPMessage;
-
-# ----------------------------------------------------------
-# Legacy wrapper for XMPPMessage
-# ----------------------------------------------------------
-
-sub new {
-       my $class = shift;
-    my $msg = shift;
-    return bless({msg => $msg}, ref($class) || $class);
-}
-
-sub msg {
-    my($self, $msg) = @_;
-    $self->{msg} = $msg if $msg;
-    return $self->{msg};
-}
-
-sub toString {
-    return $_[0]->msg->to_xml;
-}
-
-sub get_body {
-    return $_[0]->msg->body;
-}
-
-sub get_sess_id {
-    return $_[0]->msg->thread;
-}
-
-sub get_msg_type {
-    return $_[0]->msg->type;
-}
-
-sub get_remote_id {
-    return $_[0]->msg->from;
-}
-
-sub setType {
-    $_[0]->msg->type(shift());
-}
-
-sub setTo {
-    $_[0]->msg->to(shift());
-}
-
-sub setThread {
-    $_[0]->msg->thread(shift());
-}
-
-sub setBody {
-    $_[0]->msg->body(shift());
-}
-
-sub set_router_command {
-    $_[0]->msg->router_command(shift());
-}
-sub set_router_class {
-    $_[0]->msg->router_class(shift());
-}
-
-sub set_osrf_xid {
-    $_[0]->msg->osrf_xid(shift());
-}
-
-sub get_osrf_xid {
-   return $_[0]->msg->osrf_xid;
-}
-
-1;
index 5553dfb..6f0c3a5 100644 (file)
@@ -52,6 +52,7 @@ use XML::LibXML;
 use OpenSRF::Utils (':common');  
 use OpenSRF::Utils::Logger;
 use Net::Domain qw/hostfqdn/;
+use XML::Simple;
 
 #use overload '""' => \&OpenSRF::Utils::Config::dump_ini;
 
@@ -267,6 +268,8 @@ sub _load {
        $self->mangle_dirs();
        $self->mangle_logs();
 
+    $self->as_hash(XML::Simple->new->XMLin($self->FILE));
+
        $OpenSRF::Utils::ConfigCache = $self unless $self->nocache;
        delete $$self{nocache};
        delete $$self{force};
@@ -274,6 +277,12 @@ sub _load {
        return $self;
 }
 
+sub as_hash {
+    my ($self, $hash) = @_;
+    $self->{as_hash} = $hash if $hash;
+    return $self->{as_hash};
+}
+
 sub sections {
        my $self = shift;
        my %filters = @_;
index 6a662ac..91e0f0e 100644 (file)
@@ -55,13 +55,18 @@ sub INTERNAL { return 5; }
 sub ALL      { return 100; }
 
 my $isclient;  # true if we control the osrf_xid
+my $connection_type;
 
 # load up our config options
 sub set_config {
     my $force = shift;
+    my $con_type = shift;
+    $connection_type = $con_type if $con_type;
 
     return if defined $config and !$force;
 
+    die "Logger connection type needed\n" unless $connection_type;
+
     $config = OpenSRF::Utils::Config->current;
     if( !defined($config) ) {
         $loglevel = INFO();
@@ -69,19 +74,21 @@ sub set_config {
         return;
     }
 
-    $loglevel =  $config->bootstrap->loglevel; 
+    $config = $config->as_hash->{connections}->{$connection_type};
+
+    $loglevel =  $config->{loglevel};
 
-    if ($config->bootstrap->loglength) {
+    if ($config->{loglength}) {
         $max_log_msg_len = $config->bootstrap->loglength;
     }
 
-    $service_tag = $config->bootstrap->logtag;
+    $service_tag = $config->{logtag};
 
-    $logfile = $config->bootstrap->logfile;
+    $logfile = $config->{logfile};
     if($logfile =~ /^syslog/) {
         $syslog_enabled = 1;
         $logfile_enabled = 0;
-        $logfile = $config->bootstrap->syslog;
+        $logfile = $config->{syslog};
         $facility = $logfile;
         $logfile = undef;
         $facility = _fac_to_const($facility);
@@ -100,7 +107,7 @@ sub set_config {
         # --------------------------------------------------------------
         $act_syslog_enabled = 1;
         $act_logfile_enabled = 0;
-        $actfac = $config->bootstrap->actlog || $config->bootstrap->syslog;
+        $actfac = $config->{actlog} || $config->{syslog};
         $actfac = _fac_to_const($actfac);
         $actfile = undef;
     } else {
@@ -110,10 +117,10 @@ sub set_config {
         # --------------------------------------------------------------
         $act_syslog_enabled = 0;
         $act_logfile_enabled = 1;
-        $actfile = $config->bootstrap->actlog || $config->bootstrap->logfile;
+        $actfile = $config->{actlog} || $config->{logfile};
     }
 
-    my $client = OpenSRF::Utils::Config->current->bootstrap->client();
+    my $client = $config->{client} || '';
 
     if ($ENV{OSRF_LOG_CLIENT} or $ENV{MOD_PERL}) {
         $isclient = 1;
@@ -124,6 +131,7 @@ sub set_config {
         $isclient = 0;
         return;
     }
+
     $isclient = ($client =~ /^true$/iog) ?  1 : 0;
 }
 
index 1887bac..02fbe88 100644 (file)
@@ -111,7 +111,7 @@ int main( int argc, char* argv[] ) {
        snprintf(fbuf, sizeof(fbuf), "%s/.srfsh.xml", home);
        
        if(!access(fbuf, R_OK)) {
-               if( ! osrf_system_bootstrap_client(fbuf, "srfsh") ) {
+               if( ! osrf_system_bootstrap_common(fbuf, "srfsh", "srfsh", 0) ) {
                        fprintf(stderr,"Unable to bootstrap client for requests\n");
                        osrfLogError( OSRF_LOG_MARK,  "Unable to bootstrap client for requests");
                        return -1;
index 0b83782..cc4f48e 100644 (file)
 #include <string.h>
 #include <signal.h>
 #include <opensrf/utils.h>
+#include <opensrf/osrfConfig.h>
 #include <opensrf/osrf_hash.h>
 #include <opensrf/transport_client.h>
 #include <opensrf/osrf_message.h>
 #include <opensrf/osrf_app_session.h>
 #include <opensrf/log.h>
+#include <opensrf/string_array.h>
 
 #define MAX_THREAD_SIZE 64
 #define RECIP_BUF_SIZE 256
 // opportunity, at which point force-close the connection.
 #define SHUTDOWN_MAX_GRACEFUL_SECONDS 120
 
-// Incremented with every REQUEST, decremented with every COMPLETE.
-static int requests_in_flight = 0;
-
 // default values, replaced during setup (below) as needed.
 static char* config_file = "/openils/conf/opensrf_core.xml";
 static char* config_ctxt = "gateway";
-static char* osrf_router = NULL;
-static char* osrf_domain = NULL;
 
 // Cache of opensrf thread strings and back-end receipients.
 // Tracking this here means the caller only needs to track the thread.
 // It also means we don't have to expose internal XMPP IDs
 static osrfHash* stateful_session_cache = NULL;
+
+// Tracks threads that have active requests in flight.
+// This covers all request types regardless of connected-ness.
+static osrfStringArray* active_threads = NULL;
+static osrfStringArray* public_services = NULL;
+
 // Message on STDIN go into our reusable buffer
 static growing_buffer* stdin_buf = NULL;
 // OpenSRF XMPP connection handle
@@ -100,7 +103,7 @@ static void relay_stdin_message(const char*);
 static char* extract_inbound_messages();
 static void log_request(const char*, osrfMessage*);
 static void read_from_osrf();
-static void read_one_osrf_message(transport_message*);
+static int read_one_osrf_message(transport_message*);
 static int shut_it_down(int);
 static void release_hash_string(char*, void*);
 static int can_shutdown_gracefully();
@@ -133,20 +136,20 @@ int main(int argc, char* argv[]) {
     // (replies returning to the websocket client).
     fd_set fds;
     int stdin_no = fileno(stdin);
-    int osrf_no = osrf_handle->session->sock_id;
-    int maxfd = osrf_no > stdin_no ? osrf_no : stdin_no;
+    //int maxfd = osrf_no > stdin_no ? osrf_no : stdin_no;
+    int maxfd = stdin_no;
     int sel_resp;
     int shutdown_stat;
+    struct timeval tv;
 
     while (1) {
 
         FD_ZERO(&fds);
-        FD_SET(osrf_no, &fds);
+       // FD_SET(osrf_no, &fds);
         FD_SET(stdin_no, &fds);
 
         if (shutdown_requested) {
 
-            struct timeval tv;
             tv.tv_usec = 0;
             tv.tv_sec = SHUTDOWN_POLL_INTERVAL_SECONDS;
     
@@ -155,9 +158,20 @@ int main(int argc, char* argv[]) {
 
         } else {
 
-            // Wait indefinitely for activity to process.
-            // This will be interrupted during a shutdown request signal.
-            sel_resp = select(maxfd + 1, &fds, NULL, NULL, NULL);
+            if (active_threads->size > 0) {
+                tv.tv_usec = 0;
+                tv.tv_sec = 0;
+                
+                // Do a non-blocking check for inbound requests while
+                // we wait for more osrf data to be returned.
+                sel_resp = select(maxfd + 1, &fds, NULL, NULL, &tv);
+
+            } else {
+
+                // No osrf responses pending.  Wait indefinitely.
+                // This will be interrupted during a shutdown request signal.
+                sel_resp = select(maxfd + 1, &fds, NULL, NULL, NULL);
+            }
         }
 
         if (sel_resp < 0) { // error
@@ -172,17 +186,18 @@ int main(int argc, char* argv[]) {
                 "WS select() failed with [%s]. Exiting", strerror(errno));
 
             shut_it_down(1);
-        }
 
-        if (sel_resp > 0) {
+        } else if (sel_resp > 0) {
 
             if (FD_ISSET(stdin_no, &fds)) {
                 read_from_stdin();
-            }
-
-            if (FD_ISSET(osrf_no, &fds)) {
                 read_from_osrf();
             }
+
+        } else if (active_threads->size > 0) {
+            // Nothing pulled from the websocket, but we still have
+            // active osrf request.  See if any new responses have arrived.
+            read_from_osrf();
         }
 
         if (shutdown_requested) {
@@ -213,14 +228,13 @@ static int can_shutdown_gracefully() {
         return -1;
     }
 
-    unsigned long active_sessions = osrfHashGetCount(stateful_session_cache);
-    if (active_sessions == 0 && requests_in_flight == 0) {
+    if (active_threads->size == 0) {
         osrfLogInfo(OSRF_LOG_MARK, "Graceful shutdown cycle complete");
         return 1;
     }
 
     osrfLogInfo(OSRF_LOG_MARK, "Graceful shutdown cycle continuing with " 
-        "sessions=%d requests=%d", active_sessions, requests_in_flight);
+        "active threeds=%d", active_threads);
 
     return 0;
 }
@@ -236,6 +250,8 @@ static void rebuild_stdin_buffer() {
 
 static int shut_it_down(int stat) {
     osrfHashFree(stateful_session_cache);
+    osrfStringArrayFree(active_threads);
+    osrfStringArrayFree(public_services);
     buffer_free(stdin_buf);
     osrf_system_shutdown(); // clean XMPP disconnect
     exit(stat);
@@ -251,20 +267,22 @@ static void child_init(int argc, char* argv[]) {
         config_file = argv[1];
     }
 
-    if (!osrf_system_bootstrap_client(config_file, config_ctxt) ) {
+    if (!osrf_system_bootstrap_common(config_file, config_ctxt, "websocket", 0) ) {
         fprintf(stderr, "Cannot boostrap OSRF\n");
         shut_it_down(1);
     }
 
-       osrf_handle = osrfSystemGetTransportClient();
-       osrfAppSessionSetIngress(WEBSOCKET_INGRESS);
-
-    osrf_router = osrfConfigGetValue(NULL, "/router_name");
-    osrf_domain = osrfConfigGetValue(NULL, "/domain");
+    osrf_handle = osrfSystemGetTransportClient();
+    osrfAppSessionSetIngress(WEBSOCKET_INGRESS);
 
     stateful_session_cache = osrfNewHash();
     osrfHashSetCallback(stateful_session_cache, release_hash_string);
 
+    active_threads = osrfNewStringArray(16);
+    public_services = osrfNewStringArray(16);
+
+    osrfConfigGetValueList(NULL, public_services, "/config/public_services/service");
+
     client_ip = getenv("REMOTE_ADDR");
     osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip);
 }
@@ -424,11 +442,16 @@ static void relay_stdin_message(const char* msg_string) {
     if (!recipient) {
 
         if (service) {
-            int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1,
-                "%s@%s/%s", osrf_router, osrf_domain, service);
-            recipient_buf[size] = '\0';
+            size_t len = 9 + strlen(service); // service:$name
+            snprintf(recipient_buf, len, "service:%s", service);
             recipient = recipient_buf;
 
+            if (!osrfStringArrayContains(public_services, service)) {
+                osrfLogWarning(OSRF_LOG_MARK, 
+                    "Request for private or unknown service '%s' forbidden", service);
+                return;
+            }
+
         } else {
             osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient");
             return;
@@ -494,11 +517,16 @@ static char* extract_inbound_messages(
         switch (msg->m_type) {
 
             case CONNECT:
+                if (!osrfStringArrayContains(active_threads, thread)) {
+                    osrfStringArrayAdd(active_threads, thread);
+                }
                 break;
 
             case REQUEST:
                 log_request(service, msg);
-                requests_in_flight++;
+                if (!osrfStringArrayContains(active_threads, thread)) {
+                    osrfStringArrayAdd(active_threads, thread);
+                }
                 break;
 
             case DISCONNECT:
@@ -568,29 +596,47 @@ static void read_from_osrf() {
     transport_message* tmsg = NULL;
 
     // Double check the socket connection before continuing.
-    if (!client_connected(osrf_handle) ||
-        !socket_connected(osrf_handle->session->sock_id)) {
+    if (!client_connected(osrf_handle)) {
         osrfLogWarning(OSRF_LOG_MARK,
             "WS: Jabber socket disconnected, exiting");
         shut_it_down(1);
     }
 
+
     // Once client_recv is called all data waiting on the socket is
     // read.  This means we can't return to the main select() loop after
     // each message, because any subsequent messages will get stuck in
     // the opensrf receive queue. Process all available messages.
-    while ( (tmsg = client_recv(osrf_handle, 0)) ) {
+
+
+    // As long as any active requests are in flight, wait up to one
+    // second to receive a response.  Then return to inspect stdin
+    // to see if there are any requests waiting we can push through.
+    // Then come back here.
+    while (1) {
+        int timeout = active_threads->size > 0 ? 1 : 0;
+
+        tmsg = client_recv(osrf_handle, timeout);
+
+        if (!tmsg) { break; }
+
         read_one_osrf_message(tmsg);
+
+        osrfLogDebug(OSRF_LOG_MARK,
+            "WS relaying message to STDOUT thread=%s, recipient=%s",
+             tmsg->thread, tmsg->recipient);
+
         message_free(tmsg);
     }
 }
 
 // Process a single OpenSRF response message and print the reponse
 // to STDOUT for delivery to the websocket client.
-static void read_one_osrf_message(transport_message* tmsg) {
+static int read_one_osrf_message(transport_message* tmsg) {
     osrfList *msg_list = NULL;
     osrfMessage *one_msg = NULL;
     int i;
+    int complete = 0;
 
     osrfLogDebug(OSRF_LOG_MARK,
         "WS received opensrf response for thread=%s", tmsg->thread);
@@ -637,14 +683,17 @@ static void read_one_osrf_message(transport_message* tmsg) {
 
             } else {
 
-                // connection timed out; clear the cached recipient
-                if (one_msg->status_code == OSRF_STATUS_TIMEOUT) {
+                // Any error conditions ends the conversation
+                if (one_msg->status_code >= OSRF_STATUS_BADREQUEST) {
                     osrfHashRemove(stateful_session_cache, tmsg->thread);
+                    osrfStringArrayRemove(active_threads, tmsg->thread);
 
                 } else {
 
                     if (one_msg->status_code == OSRF_STATUS_COMPLETE) {
-                        requests_in_flight--;
+                        osrfLogInternal(OSRF_LOG_MARK, 
+                            "WS Marking request complete for thread %s", tmsg->thread);
+                        osrfStringArrayRemove(active_threads, tmsg->thread);
                     }
                 }
             }
@@ -682,6 +731,8 @@ static void read_one_osrf_message(transport_message* tmsg) {
 
     free(msg_string);
     jsonObjectFree(msg_wrapper);
+
+    return complete;
 }