Perl / C Redis
authorBill Erickson <berickxx@gmail.com>
Thu, 17 Nov 2022 23:09:16 +0000 (18:09 -0500)
committerBill Erickson <berickxx@gmail.com>
Thu, 20 Apr 2023 14:18:05 +0000 (10:18 -0400)
Signed-off-by: Bill Erickson <berickxx@gmail.com>
27 files changed:
Makefile.am
bin/opensrf-perl.pl.in
configure.ac
include/opensrf/osrf_message.h
include/opensrf/osrf_system.h
include/opensrf/transport_client.h
include/opensrf/transport_connection.h [new file with mode: 0644]
include/opensrf/transport_message.h
src/Makefile.am
src/gateway/Makefile.am
src/libopensrf/Makefile.am
src/libopensrf/osrf_app_session.c
src/libopensrf/osrf_prefork.c
src/libopensrf/osrf_system.c
src/libopensrf/transport_client.c
src/libopensrf/transport_connection.c [new file with mode: 0644]
src/libopensrf/transport_message.c
src/perl/lib/OpenSRF/AppSession.pm
src/perl/lib/OpenSRF/Server.pm
src/perl/lib/OpenSRF/System.pm
src/perl/lib/OpenSRF/Transport.pm
src/perl/lib/OpenSRF/Transport/Redis/BusConnection.pm [new file with mode: 0644]
src/perl/lib/OpenSRF/Transport/Redis/Client.pm [new file with mode: 0644]
src/perl/lib/OpenSRF/Transport/Redis/Message.pm [new file with mode: 0644]
src/perl/lib/OpenSRF/Transport/Redis/PeerConnection.pm [new file with mode: 0644]
src/perl/lib/OpenSRF/Transport/SlimJabber.pm
src/websocket-stdio/osrf-websocket-stdio.c

index bf1effe..b1bec86 100644 (file)
@@ -19,11 +19,12 @@ endif
 export PREFIX                   = @prefix@
 export TMP                      = @TMP@
 export LIBXML2_HEADERS          = @LIBXML2_HEADERS@
+export HIREDIS_HEADERS          = @HIREDIS_HEADERS@
 export APR_HEADERS              = @APR_HEADERS@
 export ETCDIR                   = @sysconfdir@
 export APXS2                    = @APXS2@
 export APACHE2_HEADERS          = @APACHE2_HEADERS@
-export DEF_CFLAGS               = -D_LARGEFILE64_SOURCE $(MAYBE_DEBUG) -pipe -g -Wall -O2 -fPIC -I@abs_top_srcdir@/include/ -I$(LIBXML2_HEADERS) -I$(APACHE2_HEADERS) -I$(APR_HEADERS) @AM_CPPFLAGS@
+export DEF_CFLAGS               = -D_LARGEFILE64_SOURCE $(MAYBE_DEBUG) -pipe -g -Wall -O2 -fPIC -I@abs_top_srcdir@/include/ -I$(LIBXML2_HEADERS) -I$(APACHE2_HEADERS) -I$(APR_HEADERS) -I$(HIREDIS_HEADERS) @AM_CPPFLAGS@
 export DEF_LDLIBS               = -lopensrf
 export VAR                      = @localstatedir@
 export PID                      = @localstatedir@/run/opensrf
@@ -99,6 +100,7 @@ opensrfinclude_HEADERS = $(OSRFINC)/log.h \
        $(OSRFINC)/transport_client.h \
        $(OSRFINC)/transport_message.h \
        $(OSRFINC)/transport_session.h \
+       $(OSRFINC)/transport_connection.h \
        $(OSRFINC)/utils.h \
        $(OSRFINC)/xml_utils.h \
        src/gateway/apachetools.h
index b2bced6..1c34ad8 100755 (executable)
@@ -25,6 +25,7 @@ use OpenSRF::Utils::SettingsClient;
 use OpenSRF::Transport::Listener;
 use OpenSRF::Utils;
 use OpenSRF::Utils::Config;
+use Redis;
 
 my $opt_service = undef;
 my $opt_config = "@CONF_DIR@/opensrf_core.xml";
@@ -62,6 +63,7 @@ my $opt_reload_all = 0;
 my $opt_quiet = 0;
 my $opt_diagnostic = 0;
 my $opt_ignore_orphans = 0;
+my $opt_reset_message_bus = 0;
 my $sclient;
 my @perl_services;
 my @nonperl_services;
@@ -104,8 +106,10 @@ GetOptions(
     'reload' => \$opt_reload,
     'reload-all' => \$opt_reload_all,
     'diagnostic' => \$opt_diagnostic,
+    'reset-message-bus' => \$opt_reset_message_bus,
     'ignore-orphans' => \$opt_ignore_orphans
-);
+) || warn "\n$!\n" && exit(1);
+
 
 if ($opt_localhost) {
     $hostname = 'localhost';
@@ -295,6 +299,7 @@ sub do_diagnostic {
 
 
 sub do_start_router {
+    return;
 
     my $pidfile = get_pid_file('router');
     `opensrf_router $opt_config routers $pidfile`;
@@ -328,7 +333,7 @@ sub do_init {
     my $apps = $sclient->config_value("activeapps", "appname");
 
     # disconnect the top-level network handle
-    OpenSRF::Transport::PeerHandle->retrieve->disconnect;
+    OpenSRF::Transport::PeerHandle->retrieve->reset;
 
     if($apps) {
         $apps = [$apps] unless ref $apps;
@@ -412,7 +417,7 @@ sub do_start {
 
 sub do_start_all {
     msg("starting router and services for $hostname");
-    do_start('router');
+    #do_start('router');
     return do_start_services();
 }
 
@@ -569,7 +574,7 @@ sub do_stop_all {
 
     # graceful shutdown requires the presence of the router, so stop the 
     # router last.  See if it's running first to avoid unnecessary warnings.
-    do_stop('router', $signals[0]) if get_service_pids_from_file('router'); 
+    #do_stop('router', $signals[0]) if get_service_pids_from_file('router'); 
 
     return 1;
 }
@@ -607,6 +612,58 @@ sub load_settings {
         $parser->get_server_config($conf->env->hostname);
 }
 
+# Clear and reset permissions on the bus'es linked to our domains.
+sub do_reset_message_bus {
+
+    OpenSRF::Utils::Config->load(config_file => $opt_config);
+    my $conf = OpenSRF::Utils::Config->current;
+
+    my $routers = $conf->bootstrap->routers;
+
+    # TODO pull logins for all clients in the conf, including
+    # gateway and router.
+    for my $router (@{$conf->bootstrap->routers}) {
+
+        my $domain = ref $router ? $router->{domain} : $router;
+        my $port = $conf->bootstrap->port;
+
+        # This redis connection uses the "default" account, which has
+        # access to all actions and keys so it can act as the admin.
+        my @connect_args = (server => "$domain:$port");
+
+        my $redis = Redis->new(@connect_args) or 
+            die "Cannot connect to Redis instance at @connect_args\n";
+
+        # Clear all the data
+        msg("Clearing all data from message bus: @connect_args");
+        $redis->flushall;
+
+        my $username = $conf->bootstrap->username;
+        my $password = $conf->bootstrap->passwd;
+
+        msg("Applying bus access for $username");
+
+        $redis->acl('SETUSER', $username, 'reset');
+        $redis->acl('SETUSER', $username, 'on', ">$password");
+
+        my @perms = qw/
+            -@all 
+            +xgroup 
+            +xadd 
+            +xreadgroup 
+            +xtrim 
+            +del 
+            ~opensrf:router:*
+            ~opensrf:service:*
+            ~opensrf:client:*
+        /;
+
+        $redis->acl('SETUSER', $username, @perms);
+
+        $redis->quit;
+    }
+}
+
 sub msg {
     my $m = shift;
     print "* $m\n" unless $opt_quiet;
@@ -753,10 +810,16 @@ sub do_help {
         and gracefully re-launch drone processes.  The -all variant sends
         the signal to all services.  The non-(-all) variant requires a
         --service.
+
+    --reset-message-bus
+        Clear ALL data from the message bus, create opensrf accounts w/ 
+        permission to access message queues.
 HELP
 exit;
 }
 
+do_reset_message_bus() if $opt_reset_message_bus;
+
 # we do not verify services for stop/signal actions, since those may
 # legitimately be used against services not (or no longer) configured
 # to run on the selected host.
@@ -808,6 +871,7 @@ do_diagnostic() if $opt_diagnostic;
 
 # show help if no action was requested
 do_help() if $opt_help or not (
+    $opt_reset_message_bus or
     $opt_start or 
     $opt_start_all or 
     $opt_start_services or 
index c037c21..4a9a1fe 100644 (file)
@@ -208,6 +208,18 @@ AC_ARG_WITH([libxml],
 [LIBXML2_HEADERS=/usr/include/libxml2/])
 AC_SUBST([LIBXML2_HEADERS])
 
+AC_ARG_WITH([hiredis],
+[  --with-hiredis=path               location of the hiredis headers (default is /usr/include/hiredis/))],
+[HIREDIS_HEADERS=${withval}],
+[HIREDIS_HEADERS=/usr/include/hiredis/])
+AC_SUBST([HIREDIS_HEADERS])
+
+AC_ARG_WITH([fyaml],
+[  --with-fyaml=path               location of the fyaml headers (default is /usr/include/))],
+[FYAML_HEADERS=${withval}],
+[FYAML_HEADERS=/usr/include/])
+AC_SUBST([FYAML_HEADERS])
+
 AC_ARG_WITH([includes],
 [  --with-includes=DIRECTORIES      a colon-separated list of directories that will be added to the list the compiler searches for header files (Example: --with-includes=/path/headers:/anotherpath/moreheaders)],
 [EXTRA_USER_INCLUDES=${withval}])
@@ -274,6 +286,8 @@ if test "x$OSRF_INSTALL_CORE" = "xtrue"; then
        AC_CHECK_LIB([ncurses], [initscr], [], AC_MSG_ERROR(***OpenSRF requires ncurses development headers))
        AC_CHECK_LIB([readline], [readline], [], AC_MSG_ERROR(***OpenSRF requires readline development headers))
        AC_CHECK_LIB([xml2], [xmlAddID], [], AC_MSG_ERROR(***OpenSRF requires xml2 development headers))
+    AC_CHECK_LIB([hiredis], [redisConnect], [], AC_MSG_ERROR(***OpenSRF requires libhiredis))
+    AC_CHECK_LIB([fyaml], [fy_document_build_from_file], [], AC_MSG_ERROR(***OpenSRF requires libfyaml))
        # Check for libmemcached and set flags accordingly
        PKG_CHECK_MODULES(memcached, libmemcached >= 0.8.0)
        AC_SUBST(memcached_CFLAGS)
@@ -323,7 +337,6 @@ if test "x$OSRF_INSTALL_CORE" = "xtrue"; then
                         src/libopensrf/Makefile
                         src/perl/Makefile
                         src/ports/strn_compat/Makefile
-                        src/router/Makefile
                         src/srfsh/Makefile
                         src/websocket-stdio/Makefile
                         tests/Makefile
@@ -343,5 +356,7 @@ AC_MSG_RESULT([--------------------- Configuration options:  -------------------
         AC_MSG_RESULT(APR headers location:            ${APR_HEADERS})
         AC_MSG_RESULT(Apache version:                  ${APACHE_READABLE_VERSION})
         AC_MSG_RESULT(libxml2 headers location:        ${LIBXML2_HEADERS})
+        AC_MSG_RESULT(libhiredis headers location:     ${HIREDIS_HEADERS})
+        AC_MSG_RESULT(libfyaml headers location:       ${FYAML_HEADERS})
 
 AC_MSG_RESULT([----------------------------------------------------------------------])
index 1785adf..874c310 100644 (file)
@@ -53,6 +53,7 @@ extern "C" {
 
 #define OSRF_STATUS_INTERNALSERVERERROR  500
 #define OSRF_STATUS_NOTIMPLEMENTED       501
+#define OSRF_STATUS_SERVICEUNAVAILABLE   503
 #define OSRF_STATUS_VERSIONNOTSUPPORTED  505
 
 
index 18d8c85..cab0f8c 100644 (file)
@@ -19,10 +19,16 @@ extern "C" {
 
 void osrfSystemSetPidFile( const char* name );
 
-int osrf_system_bootstrap_client( char* config_file, char* contextnode );
+int osrf_system_bootstrap_common(const char* config_file, 
+    const char* contextnode, const char* appname, int is_service);
 
-int osrfSystemBootstrapClientResc( const char* config_file,
-               const char* contextnode, const char* resource );
+int osrf_system_bootstrap_client(const char* config_file, const char* contextnode);
+
+int osrf_system_bootstrap_service(
+    const char* config_file, const char* contextnode, const char* appname);
+
+int osrfSystemBootstrapClientResc(const char* config_file, 
+    const char* contextnode, const char* appname);
 
 int osrfSystemBootstrap( const char* hostname, const char* configfile,
                const char* contextNode );
index 4307af9..8d58e45 100644 (file)
@@ -11,6 +11,7 @@
 
 #include <time.h>
 #include <opensrf/transport_session.h>
+#include <opensrf/transport_connection.h>
 #include <opensrf/utils.h>
 #include <opensrf/log.h>
 
@@ -27,20 +28,25 @@ struct message_list_struct;
        a Jabber ID for outgoing messages.
 */
 struct transport_client_struct {
-       transport_message* msg_q_head;   /**< Head of message queue */
-       transport_message* msg_q_tail;   /**< Tail of message queue */
-       transport_session* session;      /**< Manages lower-level message processing */
+    char* primary_domain;
+    char* service; // NULL if this is a standalone client.
+    char* service_address; // NULL if this is a standalone client.
+    osrfHash* connections;
+
+    int port;
+    char* username;
+    char* password;
+    transport_con* primary_connection;
+
        int error;                       /**< Boolean: true if an error has occurred */
-       char* host;                      /**< Domain name or IP address of the Jabber server */
-       char* xmpp_id;                   /**< Jabber ID used for outgoing messages */
 };
 typedef struct transport_client_struct transport_client;
 
-transport_client* client_init( const char* server, int port, const char* unix_path, int component );
+transport_client* client_init(const char* server, 
+    int port, const char* username, const char* password);
 
-int client_connect( transport_client* client, 
-               const char* username, const char* password, const char* resource,
-               int connect_timeout, enum TRANSPORT_AUTH_TYPE auth_type );
+int client_connect_as_service(transport_client* client, const char* service);
+int client_connect(transport_client* client); 
 
 int client_disconnect( transport_client* client );
 
@@ -49,10 +55,14 @@ int client_free( transport_client* client );
 int client_discard( transport_client* client );
 
 int client_send_message( transport_client* client, transport_message* msg );
+int client_send_message_to(
+    transport_client* client, transport_message* msg, const char* recipient);
 
 int client_connected( const transport_client* client );
 
-transport_message* client_recv( transport_client* client, int timeout );
+transport_message* client_recv_stream(transport_client* client, int timeout, const char* stream);
+transport_message* client_recv(transport_client* client, int timeout);
+transport_message* client_recv_for_service(transport_client* client, int timeout);
 
 int client_sock_fd( transport_client* client );
 
diff --git a/include/opensrf/transport_connection.h b/include/opensrf/transport_connection.h
new file mode 100644 (file)
index 0000000..25eef27
--- /dev/null
@@ -0,0 +1,65 @@
+#ifndef TRANSPORT_CONNECTION_H
+#define TRANSPORT_CONNECTION_H
+
+/**
+       @file transport_client.h
+       @brief Header for implementation of transport_client.
+
+       The transport_client routines provide an interface for sending and receiving Jabber
+       messages, one at a time.
+*/
+
+#include <hiredis.h>
+#include <opensrf/utils.h>
+#include <opensrf/log.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct transport_con_struct {
+    char* address;
+    char* domain;
+    int max_queue;
+    redisContext* bus;
+};
+typedef struct transport_con_struct transport_con;
+
+struct transport_con_msg_struct {
+    char* msg_id;
+    char* msg_json;
+};
+typedef struct transport_con_msg_struct transport_con_msg;
+
+transport_con* transport_con_new(const char* domain);
+
+void transport_con_free(transport_con* con);
+void transport_con_msg_free(transport_con_msg* msg);
+
+int transport_con_connected(transport_con* con);
+
+void transport_con_set_address(transport_con* con, const char* service);
+
+int transport_con_connect(transport_con* con, 
+    int port, const char* username, const char* password);
+
+int transport_con_disconnect(transport_con* con);
+
+int transport_con_send(transport_con* con, const char* msg_json, const char* stream);
+
+transport_con_msg* transport_con_recv_once(transport_con* con, int timeout, const char* stream);
+
+transport_con_msg* transport_con_recv(transport_con* con, int timeout, const char* stream);
+
+void transport_con_flush_socket(transport_con* con);
+
+int handle_redis_error(redisReply *reply, const char* command, ...);
+
+int transport_con_make_stream(transport_con* con, const char* stream, int exists_ok);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
index fb43f92..65a8447 100644 (file)
@@ -54,6 +54,7 @@ struct transport_message_struct {
        int error_code;        /**< Value of the "code" attribute of &lt;error&gt;. */
        int broadcast;         /**< Value of the "broadcast" attribute in the message element. */
        char* msg_xml;         /**< The entire message as XML, complete with entity encoding. */
+       char* msg_json;         /**< The entire message as JSON*/
        struct transport_message_struct* next;
 };
 typedef struct transport_message_struct transport_message;
@@ -62,6 +63,7 @@ transport_message* message_init( const char* body, const char* subject,
                const char* thread, const char* recipient, const char* sender );
 
 transport_message* new_message_from_xml( const char* msg_xml );
+transport_message* new_message_from_json( const char* msg_json );
 
 void message_set_router_info( transport_message* msg, const char* router_from,
                const char* router_to, const char* router_class, const char* router_command,
@@ -70,6 +72,7 @@ void message_set_router_info( transport_message* msg, const char* router_from,
 void message_set_osrf_xid( transport_message* msg, const char* osrf_xid );
 
 int message_prepare_xml( transport_message* msg );
+int message_prepare_json( transport_message* msg );
 
 int message_free( transport_message* msg );
 
index 0f90f2c..370b6b2 100644 (file)
@@ -31,7 +31,7 @@ js_SCRIPTS = javascript/DojoSRF.js javascript/JSON_v1.js javascript/md5.js javas
 endif
 
 if BUILDCORE
-MAYBE_CORE = libopensrf c-apps router srfsh gateway perl websocket-stdio
+MAYBE_CORE = libopensrf c-apps srfsh gateway perl websocket-stdio
 dist_bin_SCRIPTS = @top_srcdir@/bin/opensrf-perl.pl
 bin_SCRIPTS = @top_srcdir@/bin/osrf_config
 dist_sysconf_DATA = @top_srcdir@/examples/opensrf.xml.example @top_srcdir@/examples/opensrf_core.xml.example @top_srcdir@/examples/srfsh.xml.example 
index 666fb06..eeffc75 100644 (file)
@@ -18,7 +18,7 @@ endif
 EXTRA_DIST = @srcdir@/apachetools.c @srcdir@/apachetools.h \
        @srcdir@/osrf_json_gateway.c @srcdir@/osrf_http_translator.c
 
-AM_CFLAGS = -D_LARGEFILE64_SOURCE $(HAVE_APACHE_MIN_24) -Wall -I@abs_top_srcdir@/include/ -I$(LIBXML2_HEADERS) -I$(APACHE2_HEADERS) -I$(APR_HEADERS)
+AM_CFLAGS = -D_LARGEFILE64_SOURCE $(HAVE_APACHE_MIN_24) -Wall -I@abs_top_srcdir@/include/ -I$(HIREDIS_HEADERS) -I$(LIBXML2_HEADERS) -I$(APACHE2_HEADERS) -I$(APR_HEADERS)
 AM_LDFLAGS = -L$(LIBDIR) -L@top_builddir@/src/libopensrf
 AP_LIBEXECDIR = `$(APXS2) -q LIBEXECDIR`
 
index fd3729b..34dbc5a 100644 (file)
@@ -14,7 +14,7 @@
 
 AM_CFLAGS = $(DEF_CFLAGS) -DASSUME_STATELESS  -DOSRF_STRICT_PARAMS -rdynamic -fno-strict-aliasing -DOSRF_JSON_ENABLE_XML_UTILS
 AM_LDFLAGS = $(DEF_LDFLAGS) -R $(libdir)
-LDADD = -lopensrf
+LDADD = -lopensrf -lfyaml
 
 DISTCLEANFILES = Makefile.in Makefile
 
@@ -29,7 +29,6 @@ TARGS =               osrf_message.c \
                        osrfConfig.c \
                        osrf_application.c \
                        osrf_cache.c \
-                       osrf_transgroup.c \
                        osrf_list.c \
                        osrf_hash.c \
                        osrf_utf8.c \
@@ -37,6 +36,7 @@ TARGS =               osrf_message.c \
                        transport_message.c\
                        transport_session.c\
                        transport_client.c\
+                       transport_connection.c\
                        md5.c\
                        log.c\
                        utils.c\
@@ -47,6 +47,7 @@ TARGS =               osrf_message.c \
 TARGS_HEADS =   $(OSRF_INC)/transport_message.h \
                 $(OSRF_INC)/transport_session.h \
                 $(OSRF_INC)/transport_client.h \
+                $(OSRF_INC)/transport_connection.h \
                 $(OSRF_INC)/osrf_message.h \
                 $(OSRF_INC)/osrf_app_session.h \
                 $(OSRF_INC)/osrf_stack.h \
index df83e3e..0268c26 100644 (file)
@@ -541,46 +541,11 @@ osrfAppSession* osrfAppSessionClientInit( const char* remote_service ) {
                return NULL;
        }
 
-       // Get a list of domain names from the config settings;
-       // ignore all but the first one in the list.
-       osrfStringArray* arr = osrfNewStringArray(8);
-       osrfConfigGetValueList(NULL, arr, "/domain");
-       const char* domain = osrfStringArrayGetString(arr, 0);
-       if (!domain) {
-               osrfLogWarning( OSRF_LOG_MARK, "No domains specified in the OpenSRF config file");
-               free( session );
-               osrfStringArrayFree(arr);
-               return NULL;
-       }
-
-       // Get a router name from the config settings.
-       char* router_name = osrfConfigGetValue(NULL, "/router_name");
-       if (!router_name) {
-               osrfLogWarning( OSRF_LOG_MARK, "No router name specified in the OpenSRF config file");
-               free( session );
-               osrfStringArrayFree(arr);
-               return NULL;
-       }
-
-       char target_buf[512];
-       target_buf[ 0 ] = '\0';
+    growing_buffer *buf = buffer_init(32);
+    buffer_add(buf, "opensrf:service:");
+    buffer_add(buf, remote_service);
 
-       // Using the router name, domain, and service name,
-       // build a Jabber ID for addressing the service.
-       int len = snprintf( target_buf, sizeof(target_buf), "%s@%s/%s",
-                       router_name ? router_name : "(null)",
-                       domain ? domain : "(null)",
-                       remote_service ? remote_service : "(null)" );
-       osrfStringArrayFree(arr);
-       free(router_name);
-
-       if( len >= sizeof( target_buf ) ) {
-               osrfLogWarning( OSRF_LOG_MARK, "Buffer overflow for remote_id");
-               free( session );
-               return NULL;
-       }
-
-       session->remote_id = strdup(target_buf);
+    session->remote_id = buffer_release(buf);
        session->orig_remote_id = strdup(session->remote_id);
        session->remote_service = strdup(remote_service);
        session->session_locale = NULL;
@@ -1168,11 +1133,21 @@ int osrfSendChunkedResult(
        about it.
 */
 int osrfSendTransportPayload( osrfAppSession* session, const char* payload ) {
+
        transport_message* t_msg = message_init(
                payload, "", session->session_id, session->remote_id, NULL );
        message_set_osrf_xid( t_msg, osrfLogGetXid() );
 
-       int retval = client_send_message( session->transport_handle, t_msg );
+    // When sending a message to a top-level service address, retain the
+    // message recipient, but deliver the message to the router instead.
+    char buf[1024 + 1];
+    char* recipient = session->remote_id;
+    if (strstr(recipient, "opensrf:service:")) {
+        snprintf(buf, 1024, "opensrf:router:%s", session->transport_handle->primary_domain);
+        recipient = buf;
+    }
+
+       int retval = client_send_message_to(session->transport_handle, t_msg, recipient);
        if( retval ) {
                osrfLogError( OSRF_LOG_MARK, "client_send_message failed, exit()ing immediately" );
                exit(99);
index f02f635..9d764f8 100644 (file)
@@ -174,17 +174,12 @@ int osrf_prefork_run( const char* appname ) {
        free( max_children );
        /* --------------------------------------------------- */
 
-       char* resc = va_list_to_string( "%s_listener", appname );
-
        // Make sure that we haven't already booted
-       if( !osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
+    if (!osrf_system_bootstrap_common(NULL, "service", appname, 1)) {
                osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()" );
-               free( resc );
                return -1;
        }
 
-       free( resc );
-
        prefork_simple forker;
 
        if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc )) {
@@ -238,20 +233,20 @@ static void osrf_prefork_send_router_registration(
        transport_client* client = osrfSystemGetTransportClient();
 
        // Construct the Jabber address of the router
-       char* jid = va_list_to_string( "%s@%s/router", routerName, routerDomain );
+       char* jid = va_list_to_string( "opensrf:router:%s", routerDomain );
 
        // Create the registration message, and send it
        transport_message* msg;
     if (unregister) {
 
            osrfLogInfo( OSRF_LOG_MARK, "%s un-registering with router %s", appname, jid );
-           msg = message_init( "unregistering", NULL, NULL, jid, NULL );
+           msg = message_init( "\"unregistering\"", NULL, NULL, jid, NULL );
            message_set_router_info( msg, NULL, NULL, appname, "unregister", 0 );
 
     } else {
 
            osrfLogInfo( OSRF_LOG_MARK, "%s registering with router %s", appname, jid );
-           msg = message_init( "registering", NULL, NULL, jid, NULL );
+           msg = message_init( "\"registering\"", NULL, NULL, jid, NULL );
            message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
     }
 
@@ -363,7 +358,6 @@ static int prefork_child_init_hook( prefork_child* child ) {
 
        // Connect to cache server(s).
        osrfSystemInitCache();
-       char* resc = va_list_to_string( "%s_drone", child->appname );
 
        // If we're a source-client, tell the logger now that we're a new process.
        char* isclient = osrfConfigGetValue( NULL, "/client" );
@@ -375,15 +369,12 @@ static int prefork_child_init_hook( prefork_child* child ) {
     // TODO: not necessary if parent disconnects first
        osrfSystemIgnoreTransportClient();
 
-       // Connect to Jabber
-       if( !osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
+       // Connect to the message bus
+    if (!osrf_system_bootstrap_common(NULL, "service", child->appname, 0)) {
                osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()" );
-               free( resc );
                return -1;
        }
 
-       free( resc );
-
        // Dynamically call the application-specific initialization function
        // from a previously loaded shared library.
        if( ! osrfAppRunChildInit( child->appname )) {
@@ -412,11 +403,13 @@ static int prefork_child_process_request( prefork_child* child, char* data ) {
 
        transport_client* client = osrfSystemGetTransportClient();
 
+    osrfLogInfo(OSRF_LOG_MARK, "we have a client = %s", client);
+
        // Make sure that we're still connected to Jabber; reconnect if necessary.
        if( !client_connected( client )) {
                osrfSystemIgnoreTransportClient();
                osrfLogWarning( OSRF_LOG_MARK, "Reconnecting child to opensrf after disconnect..." );
-               if( !osrf_system_bootstrap_client( NULL, NULL )) {
+        if (!osrf_system_bootstrap_common(NULL, "service", child->appname, 0)) {
                        osrfLogError( OSRF_LOG_MARK,
                                "Unable to bootstrap client in prefork_child_process_request()" );
                        sleep( 1 );
@@ -424,8 +417,8 @@ static int prefork_child_process_request( prefork_child* child, char* data ) {
                }
        }
 
-       // Construct the message from the xml.
-       transport_message* msg = new_message_from_xml( data );
+       // Construct the message from the json
+       transport_message* msg = new_message_from_json( data );
 
        // Respond to the transport message.  This is where method calls are buried.
        osrfAppSession* session = osrf_stack_transport_handler( msg, child->appname );
@@ -859,7 +852,7 @@ static void prefork_run( prefork_simple* forker ) {
 
                // Wait indefinitely for an input message
                osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data..." );
-               cur_msg = client_recv( forker->connection, -1 );
+               cur_msg = client_recv_for_service( forker->connection, -1 );
 
                if( cur_msg == NULL ) {
                        // most likely a signal was received.  clean up any recently
@@ -878,8 +871,8 @@ static void prefork_run( prefork_simple* forker ) {
             continue;
         }
 
-               message_prepare_xml( cur_msg );
-               const char* msg_data = cur_msg->msg_xml;
+               message_prepare_json( cur_msg );
+               const char* msg_data = cur_msg->msg_json;
                if( ! msg_data || ! *msg_data ) {
                        osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %",
                                (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread );
index 684dd0d..3a56002 100644 (file)
@@ -74,8 +74,9 @@ void osrfSystemIgnoreTransportClient() {
 
        A thin wrapper for osrfSystemBootstrapClientResc, passing it NULL for a resource.
 */
-int osrf_system_bootstrap_client( char* config_file, char* contextnode ) {
-       return osrfSystemBootstrapClientResc(config_file, contextnode, NULL);
+
+int osrf_system_bootstrap_client(const char* config_file, const char* contextnode) {
+    return osrf_system_bootstrap_common(config_file, contextnode, "client", 0);
 }
 
 /**
@@ -185,7 +186,7 @@ int osrf_system_service_ctrl(
         const char* action, const char* service) {
     
     // Load the conguration, open the log, open a connection to Jabber
-    if (!osrfSystemBootstrapClientResc(config, context, "c_launcher")) {
+    if (!osrf_system_bootstrap_common(config, context, "client", 0)) {
         osrfLogError(OSRF_LOG_MARK,
             "Unable to bootstrap for host %s from configuration file %s",
             hostname, config);
@@ -336,8 +337,19 @@ int osrf_system_service_ctrl(
        - Open the log.
        - Open a connection to Jabber.
 */
-int osrfSystemBootstrapClientResc( const char* config_file,
-               const char* contextnode, const char* resource ) {
+int osrfSystemBootstrapClientResc(const char* config_file,
+    const char* contextnode, const char* appname) {
+    return osrf_system_bootstrap_common(config_file, contextnode, appname, 0);
+}
+
+int osrf_system_bootstrap_common(const char* config_file,
+               const char* contextnode, const char* appname, int is_service) {
+
+    if (contextnode == NULL) {
+        osrfLogError(OSRF_LOG_MARK,
+            "osrf_system_bootstrap_common() requires a connection type");
+        return -1;
+    }
 
        int failure = 0;
 
@@ -352,7 +364,7 @@ int osrfSystemBootstrapClientResc( const char* config_file,
        }
 
        if( config_file ) {
-               osrfConfig* cfg = osrfConfigInit( config_file, contextnode );
+               osrfConfig* cfg = osrfConfigInit(config_file, contextnode);
                if(cfg)
                        osrfConfigSetDefaultConfig(cfg);
                else
@@ -360,14 +372,15 @@ int osrfSystemBootstrapClientResc( const char* config_file,
 
                // fetch list of configured log redaction marker strings
                log_protect_arr = osrfNewStringArray(8);
-               osrfConfig* cfg_shared = osrfConfigInit(config_file, "shared");
-               osrfConfigGetValueList( cfg_shared, log_protect_arr, "/log_protect/match_string" );
+        osrfConfig* cfg_shared = osrfConfigInit(config_file, "shared");
+        osrfConfigGetValueList( cfg_shared, log_protect_arr, "/log_protect/match_string" );
        }
 
-       char* log_file      = osrfConfigGetValue( NULL, "/logfile");
-       if(!log_file) {
+
+    char* log_file = osrfConfigGetValue( NULL, "/logfile");
+       if (!log_file) {
                fprintf(stderr, "No log file specified in configuration file %s\n",
-                               config_file);
+               config_file);
                return -1;
        }
 
@@ -378,40 +391,38 @@ int osrfSystemBootstrapClientResc( const char* config_file,
        char* username       = osrfConfigGetValue( NULL, "/username" );
        char* password       = osrfConfigGetValue( NULL, "/passwd" );
        char* port           = osrfConfigGetValue( NULL, "/port" );
-       char* unixpath       = osrfConfigGetValue( NULL, "/unixpath" );
        char* facility       = osrfConfigGetValue( NULL, "/syslog" );
        char* actlog         = osrfConfigGetValue( NULL, "/actlog" );
        char* logtag         = osrfConfigGetValue( NULL, "/logtag" );
 
        /* if we're a source-client, tell the logger */
-       char* isclient = osrfConfigGetValue(NULL, "/client");
+    char* isclient = osrfConfigGetValue(NULL, "/client");
        if( isclient && !strcasecmp(isclient,"true") )
                osrfLogSetIsClient(1);
        free(isclient);
 
        int llevel = 0;
        int iport = 0;
-       if(port) iport = atoi(port);
-       if(log_level) llevel = atoi(log_level);
+       if (port) iport = atoi(port);
+       if (log_level) llevel = atoi(log_level);
 
        if(!strcmp(log_file, "syslog")) {
                if(logtag) osrfLogSetLogTag(logtag);
-               osrfLogInit( OSRF_LOG_TYPE_SYSLOG, contextnode, llevel );
+               osrfLogInit( OSRF_LOG_TYPE_SYSLOG, appname, llevel );
                osrfLogSetSyslogFacility(osrfLogFacilityToInt(facility));
                if(actlog) osrfLogSetSyslogActFacility(osrfLogFacilityToInt(actlog));
 
        } else {
-               osrfLogInit( OSRF_LOG_TYPE_FILE, contextnode, llevel );
+               osrfLogInit( OSRF_LOG_TYPE_FILE, appname, llevel );
                osrfLogSetFile( log_file );
        }
 
-
        /* Get a domain, if one is specified */
        const char* domain = osrfStringArrayGetString( arr, 0 ); /* just the first for now */
-       if(!domain) {
+       if (!domain) {
                fprintf(stderr, "No domain specified in configuration file %s\n", config_file);
-               osrfLogError( OSRF_LOG_MARK, "No domain specified in configuration file %s\n",
-                               config_file );
+               osrfLogError(OSRF_LOG_MARK, 
+                       "No domain specified in configuration file %s\n", config_file );
                failure = 1;
        }
 
@@ -429,51 +440,33 @@ int osrfSystemBootstrapClientResc( const char* config_file,
                failure = 1;
        }
 
-       if((iport <= 0) && !unixpath) {
-               fprintf(stderr, "No unixpath or valid port in configuration file %s\n", config_file);
-               osrfLogError( OSRF_LOG_MARK, "No unixpath or valid port in configuration file %s\n",
-                       config_file);
-               failure = 1;
-       }
-
        if (failure) {
-               osrfStringArrayFree(arr);
                free(log_file);
                free(log_level);
                free(username);
                free(password);
                free(port);
-               free(unixpath);
                free(facility);
                free(actlog);
                free(logtag);
                return 0;
        }
 
-       osrfLogInfo( OSRF_LOG_MARK, "Bootstrapping system with domain %s, port %d, and unixpath %s",
-               domain, iport, unixpath ? unixpath : "(none)" );
-       transport_client* client = client_init( domain, iport, unixpath, 0 );
-
-       char host[HOST_NAME_MAX + 1] = "";
-       gethostname(host, sizeof(host) );
-       host[HOST_NAME_MAX] = '\0';
+       osrfLogInfo(OSRF_LOG_MARK, 
+        "Bootstrapping system with domain %s, port %d", domain, iport);
 
-       char tbuf[32];
-       tbuf[0] = '\0';
-       snprintf(tbuf, 32, "%f", get_timestamp_millis());
+       transport_client* client = client_init(domain, iport, username, password);
 
-       if(!resource) resource = "";
-
-       int len = strlen(resource) + 256;
-       char buf[len];
-       buf[0] = '\0';
-       snprintf(buf, len - 1, "%s_%s_%s_%ld", resource, host, tbuf, (long) getpid() );
-
-       if(client_connect( client, username, password, buf, 10, AUTH_DIGEST )) {
-               osrfGlobalTransportClient = client;
-       }
+    if (is_service) {
+           if (client_connect_as_service(client, appname)) {
+                   osrfGlobalTransportClient = client;
+           }
+    } else {
+           if (client_connect(client)) {
+                   osrfGlobalTransportClient = client;
+           }
+    }
 
-       osrfStringArrayFree(arr);
        free(actlog);
        free(facility);
        free(log_level);
@@ -481,7 +474,6 @@ int osrfSystemBootstrapClientResc( const char* config_file,
        free(username);
        free(password);
        free(port);
-       free(unixpath);
 
        if(osrfGlobalTransportClient)
                return 1;
index 2a86d0c..f020646 100644 (file)
 #include <opensrf/transport_client.h>
 
-/**
-       @file transport_client.c
-       @brief Collection of routines for sending and receiving single messages over Jabber.
+transport_client* client_init(const char* domain, 
+    int port, const char* username, const char* password) {
 
-       These functions form an API built on top of the transport_session API.  They serve
-       two main purposes:
-       - They remember a Jabber ID to use when sending messages.
-       - They maintain a queue of input messages that the calling code can get one at a time.
-*/
+    osrfLogInfo(OSRF_LOG_MARK, 
+        "TCLIENT client_init domain=%s port=%d username=%s", domain, port, username);
+
+       transport_client* client = safe_malloc(sizeof(transport_client));
+    client->primary_domain = strdup(domain);
+    client->connections = osrfNewHash();
+
+    // These 2 only get values if this client works for a service.
+       client->service = NULL;
+       client->service_address = NULL;
 
-static void client_message_handler( void* client, transport_message* msg );
+    client->username = username ? strdup(username) : NULL;
+    client->password = password ? strdup(password) : NULL;
 
-//int main( int argc, char** argv );
+    client->port = port;
+    client->primary_connection = NULL;
 
-/*
-int main( int argc, char** argv ) {
+       client->error = 0;
 
-       transport_message* recv;
-       transport_message* send;
+       return client;
+}
 
-       transport_client* client = client_init( "spacely.georgialibraries.org", 5222 );
+static transport_con* client_connect_common(
+    transport_client* client, const char* domain) {
 
-       // try to connect, allow 15 second connect timeout
-       if( client_connect( client, "admin", "asdfjkjk", "system", 15 ) ) {
-               printf("Connected...\n");
-       } else {
-               printf( "NOT Connected...\n" ); exit(99);
-       }
+    osrfLogInfo(OSRF_LOG_MARK, "TCLIENT Connecting to domain: %s", domain);
 
-       while( (recv = client_recv( client, -1 )) ) {
+    transport_con* con = transport_con_new(domain);
 
-               if( recv->body ) {
-                       int len = strlen(recv->body);
-                       char buf[len + 20];
-                       osrf_clearbuf( buf, 0, sizeof(buf));
-                       sprintf( buf, "Echoing...%s", recv->body );
-                       send = message_init( buf, "Echoing Stuff", "12345", recv->sender, "" );
-               } else {
-                       send = message_init( " * ECHOING * ", "Echoing Stuff", "12345", recv->sender, "" );
-               }
+    osrfHashSet(client->connections, (void*) con, (char*) domain);
 
-               if( send == NULL ) { printf("something's wrong"); }
-               client_send_message( client, send );
+    return con;
+}
 
-               message_free( send );
-               message_free( recv );
-       }
 
-       printf( "ended recv loop\n" );
+static transport_con* get_transport_con(transport_client* client, const char* domain) {
+    osrfLogInternal(OSRF_LOG_MARK, "TCLIENT get_transport_con() domain=%s", domain);
 
-       return 0;
+    transport_con* con = (transport_con*) osrfHashGet(client->connections, (char*) domain);
 
+    if (con != NULL) { return con; }
+
+    // If we don't have the a connection for the requested domain,
+    // it means we're setting up a connection to a remote domain.
+
+    con = client_connect_common(client, domain);
+
+    transport_con_set_address(con, NULL);
+
+    // Connections to remote domains assume the same connection
+    // attributes apply.
+    transport_con_connect(con, client->port, client->username, client->password);
+
+    return con;
 }
-*/
 
+int client_connect_as_service(transport_client* client, const char* service) {
+    osrfLogInternal(OSRF_LOG_MARK, 
+        "TCLIENT client_connect_as_service() service=%s", service);
 
-/**
-       @brief Allocate and initialize a transport_client.
-       @param server Domain name where the Jabber server resides.
-       @param port Port used for connecting to Jabber (0 if using UNIX domain socket).
-       @param unix_path Name of Jabber's socket in file system (if using UNIX domain socket).
-       @param component Boolean; true if we're a Jabber component.
-       @return A pointer to a newly created transport_client.
-
-       Create a transport_client with a transport_session and an empty message queue (but don't
-       open a connection yet).  Install a callback function in the transport_session to enqueue
-       incoming messages.
-
-       The calling code is responsible for freeing the transport_client by calling client_free().
-*/
-transport_client* client_init( const char* server, int port, const char* unix_path, int component ) {
+    growing_buffer* buf = buffer_init(32);
 
-       if(server == NULL) return NULL;
+    buffer_fadd(buf, "opensrf:service:%s", service);
 
-       /* build and clear the client object */
-       transport_client* client = safe_malloc( sizeof( transport_client) );
+    client->service_address = buffer_release(buf);
+    client->service = strdup(service);
 
-       /* start with an empty message queue */
-       client->msg_q_head = NULL;
-       client->msg_q_tail = NULL;
+    transport_con* con = client_connect_common(client, client->primary_domain);
 
-       /* build the session */
-       client->session = init_transport( server, port, unix_path, client, component );
+    transport_con_set_address(con, service);
 
-       client->session->message_callback = client_message_handler;
-       client->error = 0;
-       client->host = strdup(server);
-       client->xmpp_id = NULL;
+    client->primary_connection = con;
 
-       return client;
+    transport_con_connect(
+        con, client->port, client->username, client->password);
+
+    // Make a stream for the service address
+    return transport_con_make_stream(con, client->service_address, 1);
 }
 
+int client_connect(transport_client* client) {
+    osrfLogInternal(OSRF_LOG_MARK, "TCLIENT client_connect()");
 
-/**
-       @brief Open a Jabber session for a transport_client.
-       @param client Pointer to the transport_client.
-       @param username Jabber user name.
-       @param password Password for the Jabber logon.
-       @param resource Resource name for the Jabber logon.
-       @param connect_timeout How many seconds to wait for the connection to open.
-       @param auth_type An enum: either AUTH_PLAIN or AUTH_DIGEST (see notes).
-       @return 1 if successful, or 0 upon error.
-
-       Besides opening the Jabber session, create a Jabber ID for future use.
-
-       If @a connect_timeout is -1, wait indefinitely for the Jabber server to respond.  If
-       @a connect_timeout is zero, don't wait at all.  If @a timeout is positive, wait that
-       number of seconds before timing out.  If @a connect_timeout has a negative value other
-       than -1, the results are not well defined.
-
-       The value of @a connect_timeout applies to each of two stages in the logon procedure.
-       Hence the logon may take up to twice the amount of time indicated.
-
-       If we connect as a Jabber component, we send the password as an SHA1 hash.  Otherwise
-       we look at the @a auth_type.  If it's AUTH_PLAIN, we send the password as plaintext; if
-       it's AUTH_DIGEST, we send it as a hash.
- */
-int client_connect( transport_client* client,
-               const char* username, const char* password, const char* resource,
-               int connect_timeout, enum TRANSPORT_AUTH_TYPE  auth_type ) {
-       if( client == NULL )
-               return 0;
-
-       // Create and store a Jabber ID
-       if( client->xmpp_id )
-               free( client->xmpp_id );
-       client->xmpp_id = va_list_to_string( "%s@%s/%s", username, client->host, resource );
-
-       // Open a transport_session
-       return session_connect( client->session, username,
-                       password, resource, connect_timeout, auth_type );
+    transport_con* con = client_connect_common(client, client->primary_domain);
+
+    transport_con_set_address(con, NULL);
+
+    client->primary_connection = con;
+
+    return transport_con_connect(
+        con, client->port, client->username, client->password);
 }
 
-/**
-       @brief Disconnect from the Jabber session.
-       @param client Pointer to the transport_client.
-       @return 0 in all cases.
+// Disconnect all connections and remove them from the connections hash.
+int client_disconnect(transport_client* client) {
 
-       If there are any messages still in the queue, they stay there; i.e. we don't free them here.
-*/
-int client_disconnect( transport_client* client ) {
-       if( client == NULL ) { return 0; }
-       return session_disconnect( client->session );
+    osrfLogDebug(OSRF_LOG_MARK, "TCLIENT Disconnecting all transport connections");
+
+    osrfHashIterator* iter = osrfNewHashIterator(client->connections);
+
+    transport_con* con;
+
+    while( (con = (transport_con*) osrfHashIteratorNext(iter)) ) {
+        osrfLogInternal(OSRF_LOG_MARK, "TCLIENT Disconnecting from domain: %s", con->domain);
+        transport_con_disconnect(con);
+        transport_con_free(con);
+    }
+
+    osrfHashIteratorFree(iter);
+    osrfHashFree(client->connections);
+
+    client->connections = osrfNewHash();
+
+    return 1;
 }
 
-/**
-       @brief Report whether a transport_client is connected.
-       @param client Pointer to the transport_client.
-       @return Boolean: 1 if connected, or 0 if not.
-*/
 int client_connected( const transport_client* client ) {
-       if(client == NULL) return 0;
-       return session_connected( client->session );
+       return (client != NULL && client->primary_connection != NULL);
 }
 
-/**
-       @brief Send a transport message to the current destination.
-       @param client Pointer to a transport_client.
-       @param msg Pointer to the transport_message to be sent.
-       @return 0 if successful, or -1 if not.
+static char* get_domain_from_address(const char* address) {
+    osrfLogInternal(OSRF_LOG_MARK, 
+        "TCLIENT get_domain_from_address() address=%s", address);
 
-       Translate the transport_message into XML and send it to Jabber, using the previously
-       stored Jabber ID for the sender.
-*/
-int client_send_message( transport_client* client, transport_message* msg ) {
-       if( client == NULL || client->error )
-               return -1;
-       if( msg->sender )
-               free( msg->sender );
-       msg->sender = strdup(client->xmpp_id);
-       return session_send_msg( client->session, msg );
+    char* addr_copy = strdup(address);
+    strtok(addr_copy, ":"); // "opensrf:"
+    strtok(NULL, ":"); // "client:"
+    char* domain = strtok(NULL, ":");
+
+    if (domain) {
+        // About to free addr_copy...
+        domain = strdup(domain);
+    } else {
+        osrfLogError(OSRF_LOG_MARK, "No domain parsed from address: %s", address);
+    }
+
+    free(addr_copy);
+
+    return domain;
 }
 
-/**
-       @brief Fetch an input message, if one is available.
-       @param client Pointer to a transport_client.
-       @param timeout How long to wait for a message to arrive, in seconds (see remarks).
-       @return A pointer to a transport_message if successful, or NULL if not.
+int client_send_message(transport_client* client, transport_message* msg) {
+    return client_send_message_to(client, msg, msg->recipient) ;
+}
 
-       If there is a message already in the queue, return it immediately.  Otherwise read any
-       available messages from the transport_session (subject to a timeout), and return the
-       first one.
+int client_send_message_to(transport_client* client, transport_message* msg, const char* recipient) {
+    osrfLogInternal(OSRF_LOG_MARK, "TCLIENT client_send_message()");
 
-       If the value of @a timeout is -1, then there is no time limit -- wait indefinitely until a
-       message arrives (or we error out for other reasons).  If the value of @a timeout is zero,
-       don't wait at all.
+       if (client == NULL || client->error) { return -1; }
 
-       The calling code is responsible for freeing the transport_message by calling message_free().
-*/
-transport_message* client_recv( transport_client* client, int timeout ) {
-       if( client == NULL ) { return NULL; }
+    transport_con* con;
 
-       int error = 0;  /* boolean */
+    if (strstr(recipient, "opensrf:client")) {
+        // We may be talking to a worker that runs on a remote domain.
+        // Find or create a connection to the domain.
 
-       if( NULL == client->msg_q_head ) {
+        char* domain = get_domain_from_address(recipient);
 
-               // No message available on the queue?  Try to get a fresh one.
+        if (!domain) { return -1; }
 
-               // When we call session_wait(), it reads a socket for new messages.  When it finds
-               // one, it enqueues it by calling the callback function client_message_handler(),
-               // which we installed in the transport_session when we created the transport_client.
+        con = get_transport_con(client, domain);
 
-               // Since a single call to session_wait() may not result in the receipt of a complete
-               // message. we call it repeatedly until we get either a message or an error.
+        if (!con) {
+            osrfLogError(
+                OSRF_LOG_MARK, "Error creating connection for domain: %s", domain);
 
-               // Alternatively, a single call to session_wait() may result in the receipt of
-               // multiple messages.  That's why we have to enqueue them.
+            return -1;
+        }
 
-               // The timeout applies to the receipt of a complete message.  For a sufficiently
-               // short timeout, a sufficiently long message, and a sufficiently slow connection,
-               // we could timeout on the first message even though we're still receiving data.
-               
-               // Likewise we could time out while still receiving the second or subsequent message,
-               // return the first message, and resume receiving messages later.
+    } else {
+        con = client->primary_connection;
+    }
+        
+       if (msg->sender) { free(msg->sender); }
+       msg->sender = strdup(con->address);
 
-               if( timeout == -1 ) {  /* wait potentially forever for data to arrive */
+    message_prepare_json(msg);
 
-                       int x;
-                       do {
-                               if( (x = session_wait( client->session, -1 )) ) {
-                                       osrfLogDebug(OSRF_LOG_MARK, "session_wait returned failure code %d\n", x);
-                                       error = 1;
-                                       break;
-                               }
-                       } while( client->msg_q_head == NULL );
+    osrfLogInternal(OSRF_LOG_MARK, 
+        "client_send_message() to=%s %s", recipient, msg->msg_json);
+
+    return transport_con_send(con, msg->msg_json, recipient);
+
+    osrfLogInternal(OSRF_LOG_MARK, "client_send_message() send completed");
+    
+    return 0;
+}
 
-               } else {    /* loop up to 'timeout' seconds waiting for data to arrive  */
+transport_message* client_recv_stream(transport_client* client, int timeout, const char* stream) {
 
-                       /* This loop assumes that a time_t is denominated in seconds -- not */
-                       /* guaranteed by Standard C, but a fair bet for Linux or UNIX       */
+    osrfLogInternal(OSRF_LOG_MARK, 
+        "TCLIENT client_recv_stream() timeout=%d stream=%s", timeout, stream);
 
-                       time_t start = time(NULL);
-                       time_t remaining = (time_t) timeout;
+    transport_con_msg* con_msg = 
+        transport_con_recv(client->primary_connection, timeout, stream);
 
-                       int wait_ret;
-                       do {
-                               if( (wait_ret = session_wait( client->session, (int) remaining)) ) {
-                                       error = 1;
-                                       osrfLogDebug(OSRF_LOG_MARK,
-                                               "session_wait returned failure code %d: setting error=1\n", wait_ret);
-                                       break;
-                               }
+    if (con_msg == NULL) { return NULL; } // Receive timed out.
 
-                               remaining -= time(NULL) - start;
-                       } while( NULL == client->msg_q_head && remaining > 0 );
-               }
-       }
+       transport_message* msg = new_message_from_json(con_msg->msg_json);
 
-       transport_message* msg = NULL;
+    transport_con_msg_free(con_msg);
 
-       if( !error && client->msg_q_head != NULL ) {
-               /* got message(s); dequeue the oldest one */
-               msg = client->msg_q_head;
-               client->msg_q_head = msg->next;
-               msg->next = NULL;  /* shouldn't be necessary; nullify for good hygiene */
-               if( NULL == client->msg_q_head )
-                       client->msg_q_tail = NULL;
-       }
+    osrfLogInternal(OSRF_LOG_MARK, 
+        "client_recv() read response for thread %s", msg->thread);
 
        return msg;
 }
 
-/**
-       @brief Enqueue a newly received transport_message.
-       @param client A pointer to a transport_client, cast to a void pointer.
-       @param msg A new transport message.
-
-       Add a newly arrived input message to the tail of the queue.
+transport_message* client_recv(transport_client* client, int timeout) {
 
-       This is a callback function.  The transport_session parses the XML coming in through a
-       socket, accumulating various bits and pieces.  When it sees the end of a message stanza,
-       it packages the bits and pieces into a transport_message that it passes to this function,
-       which enqueues the message for processing.
-*/
-static void client_message_handler( void* client, transport_message* msg ){
+    return client_recv_stream(client, timeout, client->primary_connection->address);
+}
 
-       if(client == NULL) return;
-       if(msg == NULL) return;
+transport_message* client_recv_for_service(transport_client* client, int timeout) {
 
-       transport_client* cli = (transport_client*) client;
+    osrfLogInternal(OSRF_LOG_MARK, "TCLIENT Receiving for service %s", client->service);
 
-       /* add the new message to the tail of the queue */
-       if( NULL == cli->msg_q_head )
-               cli->msg_q_tail = cli->msg_q_head = msg;
-       else {
-               cli->msg_q_tail->next = msg;
-               cli->msg_q_tail = msg;
-       }
-       msg->next = NULL;
+    return client_recv_stream(client, timeout, client->service_address);
 }
 
-
 /**
        @brief Free a transport_client, along with all resources it owns.
        @param client Pointer to the transport_client to be freed.
        @return 1 if successful, or 0 if not.  The only error condition is if @a client is NULL.
 */
 int client_free( transport_client* client ) {
-       if(client == NULL)
-               return 0;
-       session_free( client->session );
-       client->session = NULL;
+    osrfLogInternal(OSRF_LOG_MARK, "TCLIENT client_free()");
+       if (client == NULL) { return 0; }
        return client_discard( client );
 }
 
@@ -315,29 +244,39 @@ int client_free( transport_client* client ) {
        disconnect the parent as well.
  */
 int client_discard( transport_client* client ) {
-       if(client == NULL)
-               return 0;
-       
-       transport_message* current = client->msg_q_head;
-       transport_message* next;
-
-       /* deallocate the list of messages */
-       while( current != NULL ) {
-               next = current->next;
-               message_free( current );
-               current = next;
-       }
-
-       free(client->host);
-       free(client->xmpp_id);
-       free( client );
+    osrfLogInternal(OSRF_LOG_MARK, "TCLIENT client_discard()");
+
+       if (client == NULL) { return 0; }
+
+    osrfLogInternal(OSRF_LOG_MARK, 
+        "Discarding client on domain %s", client->primary_domain);
+
+       if (client->primary_domain) { free(client->primary_domain); }
+       if (client->service) { free(client->service); }
+       if (client->service_address) { free(client->service_address); }
+    if (client->username) { free(client->username); }
+    if (client->password) { free(client->password); }
+
+    // Clean up our connections.
+    // We do not disconnect here since they caller may or may
+    // not want the socket closed.
+    // If disconnect() was just called, the connections hash
+    // will be empty.
+    osrfHashIterator* iter = osrfNewHashIterator(client->connections);
+
+    transport_con* con;
+
+    while( (con = (transport_con*) osrfHashIteratorNext(iter)) ) {
+        osrfLogInternal(OSRF_LOG_MARK, 
+            "client_discard() freeing connection for %s", con->domain);
+        transport_con_free(con);
+    }
+
+    osrfHashIteratorFree(iter);
+    osrfHashFree(client->connections);
+
+       free(client);
+
        return 1;
 }
 
-int client_sock_fd( transport_client* client )
-{
-       if( !client )
-               return 0;
-       else
-               return client->session->sock_id;
-}
diff --git a/src/libopensrf/transport_connection.c b/src/libopensrf/transport_connection.c
new file mode 100644 (file)
index 0000000..c4544db
--- /dev/null
@@ -0,0 +1,321 @@
+#include <opensrf/transport_connection.h>
+
+transport_con* transport_con_new(const char* domain) {
+
+    osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_new() domain=%s", domain);
+
+    transport_con* con = safe_malloc(sizeof(transport_con));
+
+    con->bus = NULL;
+    con->address = NULL;
+    con->domain = strdup(domain);
+    con->max_queue = 1000; // TODO pull from config
+
+    osrfLogInternal(OSRF_LOG_MARK, 
+        "TCON created transport connection with domain: %s", con->domain);
+
+    return con;
+}
+
+void transport_con_msg_free(transport_con_msg* msg) {
+    osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_msg_free()");
+
+    if (msg == NULL) { return; } 
+
+    if (msg->msg_id) { free(msg->msg_id); }
+    if (msg->msg_json) { free(msg->msg_json); }
+
+    free(msg);
+}
+
+void transport_con_free(transport_con* con) {
+    osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_free()");
+
+    osrfLogInternal(
+        OSRF_LOG_MARK, "Freeing transport connection for %s", con->domain);
+
+    if (con->bus) { free(con->bus); }
+    if (con->address) { free(con->address); }
+    if (con->domain) { free(con->domain); }
+
+    free(con);
+}
+
+int transport_con_connected(transport_con* con) {
+    return con->bus != NULL;
+} 
+
+void transport_con_set_address(transport_con* con, const char* service) {
+    osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_set_address()");
+
+    char hostname[1024];
+    hostname[1023] = '\0';
+    gethostname(hostname, 1023);
+
+    growing_buffer *buf = buffer_init(64);
+    buffer_fadd(buf, "opensrf:client:%s:%s:", con->domain, hostname);
+
+    if (service != NULL) {
+        buffer_fadd(buf, "%s:", service);
+    }
+
+    buffer_fadd(buf, "%ld", (long) getpid());
+
+    char junk[256];
+    snprintf(junk, sizeof(junk), 
+        "%f%d", get_timestamp_millis(), (int) time(NULL));
+
+    char* md5 = md5sum(junk);
+
+    buffer_add(buf, ":");
+    buffer_add_n(buf, md5, 8);
+
+    con->address = buffer_release(buf);
+
+    osrfLogDebug(OSRF_LOG_MARK, "Connection set address to %s", con->address);
+}
+
+int transport_con_connect(
+    transport_con* con, int port, const char* username, const char* password) {
+    osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_connect()");
+
+    osrfLogDebug(OSRF_LOG_MARK, "Transport con connecting with bus "
+        "domain=%s; address=%s; port=%d; username=%s", 
+        con->domain,
+        con->address,
+        port, 
+        username
+    );
+
+    con->bus = redisConnect(con->domain, port);
+
+    if (con->bus == NULL) {
+        osrfLogError(OSRF_LOG_MARK, "Could not connect to Redis instance");
+        return 0;
+    }
+
+    osrfLogDebug(OSRF_LOG_MARK, "Connected to Redis instance OK");
+
+    redisReply *reply = 
+        redisCommand(con->bus, "AUTH %s %s", username, password);
+
+    if (handle_redis_error(reply, "AUTH %s %s", username, password)) { 
+        return 0; 
+    }
+
+    freeReplyObject(reply);
+
+    return transport_con_make_stream(con, con->address, 0);
+}
+
+int transport_con_make_stream(transport_con* con, const char* stream, int exists_ok) {
+    osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_make_stream() stream=%s", stream);
+
+    redisReply *reply = redisCommand(
+        con->bus, 
+        "XGROUP CREATE %s %s $ mkstream", 
+        stream,
+        stream,
+        "$",
+        "mkstream"
+    );
+
+    // Produces an error when a group/stream already exists, but that's
+    // acceptible when creating a group/stream for a stop-level service 
+    // address, since multiple Listeners are allowed.
+    if (handle_redis_error(reply, 
+        "XGROUP CREATE %s %s $ mkstream", 
+        stream,
+        stream,
+        "$",
+        "mkstream"
+    )) { return exists_ok; }
+
+    freeReplyObject(reply);
+
+    return 1;
+}
+
+int transport_con_disconnect(transport_con* con) {
+    osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_disconnect()");
+
+    if (con == NULL || con->bus == NULL) { return -1; }
+
+    redisReply *reply = redisCommand(con->bus, "DEL %s", con->address);
+
+    if (!handle_redis_error(reply, "DEL %s", con->address)) {
+        freeReplyObject(reply);
+    }
+
+    redisFree(con->bus);
+    con->bus = NULL;
+
+    return 0;
+}
+
+int transport_con_send(transport_con* con, const char* msg_json, const char* stream) {
+
+    osrfLogInternal(OSRF_LOG_MARK, "Sending to stream=%s: %s", stream, msg_json);
+
+    redisReply *reply = redisCommand(con->bus,
+        "XADD %s NOMKSTREAM MAXLEN ~ %d * message %s",
+        stream,
+        con->max_queue,
+        msg_json
+    );
+
+    if (handle_redis_error(reply, 
+        "XADD %s NOMKSTREAM MAXLEN ~ %d * message %s",
+        stream, con->max_queue, msg_json)) {
+
+        return -1;
+    }
+
+    freeReplyObject(reply);
+
+    return 0;
+}
+
+transport_con_msg* transport_con_recv_once(transport_con* con, int timeout, const char* stream) {
+    osrfLogInternal(OSRF_LOG_MARK, 
+        "TCON transport_con_recv_once() timeout=%d stream=%s", timeout, stream);
+
+    if (stream == NULL) { stream = con->address; }
+
+    redisReply *reply, *tmp;
+    char *msg_id = NULL, *json = NULL;
+
+    if (timeout == 0) {
+
+        reply = redisCommand(con->bus, 
+            "XREADGROUP GROUP %s %s COUNT 1 STREAMS %s >",
+            stream, con->address, stream
+        );
+
+    } else {
+
+        if (timeout == -1) {
+            // Redis timeout 0 means block indefinitely
+            timeout = 0;
+        } else {
+            // Milliseconds
+            timeout *= 1000;
+        }
+
+        reply = redisCommand(con->bus, 
+            "XREADGROUP GROUP %s %s BLOCK %d COUNT 1 STREAMS %s >",
+            stream, con->address, timeout, stream
+        );
+    }
+
+    // Timeout or error
+    if (handle_redis_error(
+        reply,
+        "XREADGROUP GROUP %s %s %s COUNT 1 NOACK STREAMS %s >",
+        stream, con->address, "BLOCK X", stream
+    )) { return NULL; }
+
+    // Unpack the XREADGROUP response, which is a nest of arrays.
+    // These arrays are mostly 1 and 2-element lists, since we are 
+    // only reading one item on a single stream.
+    if (reply->type == REDIS_REPLY_ARRAY && reply->elements > 0) {
+        tmp = reply->element[0];
+
+        if (tmp->type == REDIS_REPLY_ARRAY && tmp->elements > 1) {
+            tmp = tmp->element[1];
+
+            if (tmp->type == REDIS_REPLY_ARRAY && tmp->elements > 0) {
+                tmp = tmp->element[0];
+
+                if (tmp->type == REDIS_REPLY_ARRAY && tmp->elements > 1) {
+                    redisReply *r1 = tmp->element[0];
+                    redisReply *r2 = tmp->element[1];
+
+                    if (r1->type == REDIS_REPLY_STRING) {
+                        msg_id = strdup(r1->str);
+                    }
+
+                    if (r2->type == REDIS_REPLY_ARRAY && r2->elements > 1) {
+                        // r2->element[0] is the message name, which we
+                        // currently don't use for anything.
+
+                        r2 = r2->element[1];
+
+                        if (r2->type == REDIS_REPLY_STRING) {
+                            json = strdup(r2->str);
+                        }
+                    }
+                }
+            }
+        }
+    }
+
+    freeReplyObject(reply); // XREADGROUP
+
+    if (msg_id == NULL) {
+        // Read timed out. 'json' will also be NULL.
+        return NULL;
+    }
+
+    transport_con_msg* tcon_msg = safe_malloc(sizeof(transport_con_msg));
+    tcon_msg->msg_id = msg_id;
+    tcon_msg->msg_json = json;
+
+    osrfLogInternal(OSRF_LOG_MARK, "recv_one_chunk() read json: %s", json);
+
+    return tcon_msg;
+}
+
+
+transport_con_msg* transport_con_recv(transport_con* con, int timeout, const char* stream) {
+    osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_recv() stream=%s", stream);
+
+    if (timeout == 0) {
+        return transport_con_recv_once(con, 0, stream);
+
+    } else if (timeout < 0) {
+        // Keep trying until we have a result.
+
+        while (1) {
+            transport_con_msg* msg = transport_con_recv_once(con, -1, stream);
+            if (msg != NULL) { return msg; }
+        }
+    }
+
+    time_t seconds = (time_t) timeout;
+
+    while (seconds > 0) {
+        // Keep trying until we get a response or our timeout is exhausted.
+
+        time_t now = time(NULL);
+        transport_con_msg* msg = transport_con_recv_once(con, timeout, stream);
+
+        if (msg == NULL) {
+            seconds -= now;
+        } else {
+            return msg;
+        }
+    }
+
+    return NULL;
+}
+
+void transport_con_flush_socket(transport_con* con) {
+}
+
+// Returns false/0 on success, true/1 on failure.
+// On error, the reply is freed.
+int handle_redis_error(redisReply *reply, const char* command, ...) {
+    VA_LIST_TO_STRING(command);
+
+    if (reply != NULL && reply->type != REDIS_REPLY_ERROR) {
+        osrfLogInternal(OSRF_LOG_MARK, "Redis Command: %s", VA_BUF);
+        return 0;
+    }
+
+    char* err = reply == NULL ? "" : reply->str;
+    osrfLogError(OSRF_LOG_MARK, "REDIS Error [%s] %s", err, VA_BUF);
+    freeReplyObject(reply);
+
+    return 1;
+}
index 332b622..59a7f1a 100644 (file)
@@ -1,4 +1,5 @@
 #include <opensrf/transport_message.h>
+#include <opensrf/osrf_json.h>
 
 /**
        @file transport_message.c
@@ -66,11 +67,72 @@ transport_message* message_init( const char* body, const char* subject,
        msg->error_code     = 0;
        msg->broadcast      = 0;
        msg->msg_xml        = NULL;
+    msg->msg_json       = NULL;
        msg->next           = NULL;
 
        return msg;
 }
 
+transport_message* new_message_from_json(const char* msg_json) {
+
+    if (msg_json == NULL || *msg_json == '\0') { return NULL; }
+
+    transport_message* new_msg = safe_malloc(sizeof(transport_message));
+
+    new_msg->body           = NULL;
+    new_msg->subject        = NULL;
+    new_msg->thread         = NULL;
+    new_msg->recipient      = NULL;
+    new_msg->sender         = NULL;
+    new_msg->router_from    = NULL;
+    new_msg->router_to      = NULL;
+    new_msg->router_class   = NULL;
+    new_msg->router_command = NULL;
+    new_msg->osrf_xid       = NULL;
+    new_msg->is_error       = 0;
+    new_msg->error_type     = NULL;
+    new_msg->error_code     = 0;
+    new_msg->broadcast      = 0;
+    new_msg->msg_xml        = NULL;
+    new_msg->next           = NULL;
+
+    jsonObject* json_hash = jsonParse(msg_json);
+
+    if (json_hash == NULL || json_hash->type != JSON_HASH) {
+        osrfLogError(OSRF_LOG_MARK,  "new_message_from_json() received bad JSON");
+        jsonObjectFree(json_hash);
+        message_free(new_msg);
+        return NULL;
+    }
+
+    const char* sender = jsonObjectGetString(jsonObjectGetKeyConst(json_hash, "from"));
+    if (sender) { new_msg->sender = strdup((const char*) sender); }
+
+    const char* recipient = jsonObjectGetString(jsonObjectGetKeyConst(json_hash, "to"));
+    if (recipient) { new_msg->recipient = strdup((const char*) recipient); }
+
+    const char* thread = jsonObjectGetString(jsonObjectGetKeyConst(json_hash, "thread"));
+    if (thread == NULL) { thread = ""; }
+    new_msg->thread = strdup((const char*) thread);
+
+    const char* osrf_xid = jsonObjectGetString(jsonObjectGetKeyConst(json_hash, "osrf_xid"));
+    if (osrf_xid) { message_set_osrf_xid(new_msg, (char*) osrf_xid); }
+
+    // TODO
+    // Internally the mesage body is stored as a JSON string
+    // On the wire, it's just part of the message.  We could get
+    // rid if this extra json encode/decode step if we treated
+    // the body as a JSON object internally.
+    const char* body = jsonObjectToJSON(jsonObjectGetKeyConst(json_hash, "body"));
+    if (body == NULL) { body = ""; }
+    new_msg->body = strdup((const char*) body);
+
+    jsonObjectFree(json_hash);
+
+    return new_msg;
+}
+
+
 
 /**
        @brief Translate an XML string into a transport_message.
@@ -308,11 +370,49 @@ int message_free( transport_message* msg ){
        free(msg->osrf_xid);
        if( msg->error_type != NULL ) free(msg->error_type);
        if( msg->msg_xml != NULL ) free(msg->msg_xml);
+       if( msg->msg_json != NULL ) free(msg->msg_json);
        free(msg);
        return 1;
 }
 
 
+int message_prepare_json(transport_message* msg) {
+
+    if (!msg) { return 0; }
+    if (msg->msg_json) { return 1; }   /* already done */
+
+    jsonObject* json_hash = jsonNewObject(NULL);
+    jsonObjectSetKey(json_hash, "to", jsonNewObject(msg->recipient));
+    jsonObjectSetKey(json_hash, "from", jsonNewObject(msg->sender));
+    jsonObjectSetKey(json_hash, "thread", jsonNewObject(msg->thread));
+    jsonObjectSetKey(json_hash, "osrf_xid", jsonNewObject(msg->osrf_xid));
+
+    if (msg->router_from) {
+        jsonObjectSetKey(json_hash, "router_from", jsonNewObject(msg->router_from));
+    }
+    if (msg->router_to) {
+        jsonObjectSetKey(json_hash, "router_to", jsonNewObject(msg->router_to));
+    }
+    if (msg->router_class) {
+        jsonObjectSetKey(json_hash, "router_class", jsonNewObject(msg->router_class));
+    }
+    if (msg->router_command) {
+        jsonObjectSetKey(json_hash, "router_command", jsonNewObject(msg->router_command));
+    }
+
+    // TODO the various layers expect the message body to be a separate
+    // JSON string, but on the bus, the body is just another key 
+    // in the JSON object.
+    jsonObjectSetKey(json_hash, "body", jsonParse(msg->body));
+
+    msg->msg_json = jsonObjectToJSON(json_hash);
+
+    jsonObjectFree(json_hash);
+
+    return 1;
+}
+
+
 /**
        @brief Build a &lt;message&gt; element and store it as a string in the msg_xml member.
        @param msg Pointer to a transport_message.
index 603ba3c..25ddd1b 100644 (file)
@@ -208,18 +208,7 @@ sub last_sent_type {
 
 sub get_app_targets {
        my $app = shift;
-
-       my $conf = OpenSRF::Utils::Config->current;
-       my $router_name = $conf->bootstrap->router_name || 'router';
-       my $domain = $conf->bootstrap->domain;
-       $logger->error("use of <domains/> is deprecated") if $conf->bootstrap->domains;
-
-       unless($router_name and $domain) {
-               throw OpenSRF::EX::Config 
-                       ("Missing router config information 'router_name' and 'domain'");
-       }
-
-    return ("$router_name\@$domain/$app");
+    return ("opensrf:service:$app");
 }
 
 sub stateless {
@@ -579,12 +568,24 @@ sub send {
 
        } 
        my $json = OpenSRF::Utils::JSON->perl2JSON(\@doc);
-       $logger->internal("AppSession sending doc: $json");
 
-       $self->{peer_handle}->send( 
-                                       to     => $self->remote_id,
-                                  thread => $self->session_id,
-                                  body   => $json );
+    my $recipient = $self->remote_id;
+
+    if ($self->endpoint == CLIENT and $self->state != CONNECTED) {
+        # Send new requests to our router
+        my $conf = OpenSRF::Utils::Config->current;
+        my $domain = $conf->bootstrap->domain;
+        $recipient = "opensrf:router:$domain";
+    }
+
+    $logger->internal("AppSession sending doc to=$recipient: $json");
+
+    $self->{peer_handle}->send_to( 
+        $recipient,
+        to     => $self->remote_id,
+        thread => $self->session_id,
+        body   => $json
+    );
 
        if( $disconnect) {
                $self->state( DISCONNECTED );
index 6f3981f..269ee1d 100644 (file)
@@ -21,7 +21,7 @@ use OpenSRF::Utils::Config;
 use OpenSRF::Transport::PeerHandle;
 use OpenSRF::Utils::SettingsClient;
 use OpenSRF::Utils::Logger qw($logger);
-use OpenSRF::Transport::SlimJabber::Client;
+use OpenSRF::Transport::Redis::Client;
 use Encode;
 use POSIX qw/:sys_wait_h :errno_h/;
 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
@@ -132,7 +132,7 @@ sub handle_sighup {
 }
 
 # ----------------------------------------------------------------
-# Waits on the jabber socket for inbound data from the router.
+# Waits on the redis socket for inbound data from the router.
 # Each new message is passed off to a child process for handling.
 # At regular intervals, wake up for min/max spare child maintenance
 # ----------------------------------------------------------------
@@ -259,30 +259,12 @@ sub kill_child {
 }
 
 # ----------------------------------------------------------------
-# Jabber connection inbound message arrive on.
+# Redis connection inbound message arrive on.
 # ----------------------------------------------------------------
 sub build_osrf_handle {
     my $self = shift;
-
-    my $conf = OpenSRF::Utils::Config->current;
-    my $username = $conf->bootstrap->username;
-    my $password = $conf->bootstrap->passwd;
-    my $domain = $conf->bootstrap->domain;
-    my $port = $conf->bootstrap->port;
-    my $resource = $self->{service} . '_listener_' . $conf->env->hostname;
-
-    $logger->debug("server: inbound connecting as $username\@$domain/$resource on port $port");
-
     $self->{osrf_handle} =
-        OpenSRF::Transport::SlimJabber::Client->new(
-            username => $username,
-            resource => $resource,
-            password => $password,
-            host => $domain,
-            port => $port,
-        );
-
-    $self->{osrf_handle}->initialize;
+        OpenSRF::Transport::Redis::Client->new($self->{service});
 }
 
 
@@ -291,11 +273,11 @@ sub build_osrf_handle {
 # ----------------------------------------------------------------
 sub write_child {
     my($self, $child, $msg) = @_;
-    my $xml = encode_utf8(decode_utf8($msg->to_xml));
+    my $json = $msg->to_json;
 
     # tell the child how much data to expect, minus the header
     my $write_size;
-    {use bytes; $write_size = length($xml)}
+    {use bytes; $write_size = length($json)}
     $write_size = sprintf("%*s", WRITE_PIPE_DATA_SIZE, $write_size);
 
     for (0..2) {
@@ -312,12 +294,12 @@ sub write_child {
         # so the lack of a pid means the child is dead.
         if (!$child->{pid}) {
             $logger->error("server: child is dead in write_child(). ".
-                "unable to send message: $xml");
+                "unable to send message: $json");
             return; # avoid syswrite crash
         }
 
         # send message to child data pipe
-        syswrite($child->{pipe_to_child}, $write_size . $xml);
+        syswrite($child->{pipe_to_child}, $write_size . $json);
 
         last unless $self->{sig_pipe};
         $logger->error("server: got SIGPIPE writing to $child, retrying...");
@@ -533,11 +515,11 @@ sub register_routers {
 
                 my $name = $router->{name};
                 my $domain = $router->{domain};
-                push(@targets, "$name\@$domain/router");
+                push(@targets, "opensrf:router:$domain");
             }
 
         } else {
-            push(@targets, "$router_name\@$router/router");
+            push(@targets, "opensrf:router:$router");
         }
     }
 
@@ -545,7 +527,7 @@ sub register_routers {
         $logger->info("server: registering with router $_");
         $self->{osrf_handle}->send(
             to => $_,
-            body => 'registering',
+            body => '"registering"',
             router_command => 'register',
             router_class => $self->{service}
         );
@@ -566,7 +548,7 @@ sub unregister_routers {
         $logger->info("server: disconnecting from router $router");
         $self->{osrf_handle}->send(
             to => $router,
-            body => "unregistering",
+            body => '"unregistering"',
             router_command => "unregister",
             router_class => $self->{service}
         );
@@ -580,7 +562,7 @@ use warnings;
 use OpenSRF::Transport;
 use OpenSRF::Application;
 use OpenSRF::Transport::PeerHandle;
-use OpenSRF::Transport::SlimJabber::XMPPMessage;
+use OpenSRF::Transport::Redis::Message;
 use OpenSRF::Utils::Logger qw($logger);
 use OpenSRF::DomainObject::oilsResponse qw/:status/;
 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
@@ -613,7 +595,7 @@ sub set_block {
 }
 
 # ----------------------------------------------------------------
-# Connects to Jabber and runs the application child_init
+# Connects to the bus and runs the application child_init
 # ----------------------------------------------------------------
 sub init {
     my $self = shift;
@@ -650,7 +632,7 @@ sub run {
 
         my $session = OpenSRF::Transport->handler(
             $self->{parent}->{service},
-            OpenSRF::Transport::SlimJabber::XMPPMessage->new(xml => $data)
+            OpenSRF::Transport::Redis::Message->new(json => $data)
         );
 
         my $recycle = $self->keepalive_loop($session);
index c9534dc..88863a2 100644 (file)
@@ -32,8 +32,7 @@ sub load_bootstrap_config {
 
     OpenSRF::Utils::Config->load(config_file => $bootstrap_config_file);
     OpenSRF::Utils::JSON->register_class_hint(name => "OpenSRF::Application", hint => "method", type => "hash", strip => ['session']);
-    OpenSRF::Transport->message_envelope("OpenSRF::Transport::SlimJabber::MessageWrapper");
-    OpenSRF::Transport::PeerHandle->set_peer_client("OpenSRF::Transport::SlimJabber::PeerConnection");
+    OpenSRF::Transport::PeerHandle->set_peer_client("OpenSRF::Transport::Redis::PeerConnection");
     OpenSRF::Application->server_class('client');
     # Read in a shared portion of the config file
     # for later use in log parameter redaction
@@ -65,11 +64,9 @@ sub bootstrap_client {
     $bootstrap_config_file = 
         $params{config_file} || $bootstrap_config_file;
 
-    my $app = $params{client_name} || "client";
-
     load_bootstrap_config();
     OpenSRF::Utils::Logger::set_config();
-    OpenSRF::Transport::PeerHandle->construct($app);
+    OpenSRF::Transport::PeerHandle->construct;
 }
 
 sub connected {
index 5aeff4d..a222c04 100644 (file)
@@ -7,7 +7,6 @@ use OpenSRF::Utils::JSON;
 use OpenSRF::Utils::Logger qw(:level);
 use OpenSRF::DomainObject::oilsResponse qw/:status/;
 use OpenSRF::EX qw/:try/;
-use OpenSRF::Transport::SlimJabber::MessageWrapper;
 
 #------------------ 
 # --- These must be implemented by all Transport subclasses
@@ -34,32 +33,9 @@ sub get_msg_envelope { shift()->alert_abstract(); }
 our $message_envelope;
 my $logger = "OpenSRF::Utils::Logger"; 
 
-
-
-=head2 message_envelope( [$envelope] );
-
-Sets the message envelope class that will allow us to extract
-information from the messages we receive from the low 
-level transport
-
-=cut
-
-sub message_envelope {
-       my( $class, $envelope ) = @_;
-       if( $envelope ) {
-               $message_envelope = $envelope;
-               $envelope->use;
-               if( $@ ) {
-                       $logger->error( 
-                                       "Error loading message_envelope: $envelope -> $@", ERROR);
-               }
-       }
-       return $message_envelope;
-}
-
 =head2 handler( $data )
 
-Creates a new MessageWrapper, extracts the remote_id, session_id, and message body
+Creates a new Message, extracts the remote_id, session_id, and message body
 from the message.  Then, creates or retrieves the AppSession object with the session_id and remote_id. 
 Finally, creates the message document from the body of the message and calls
 the handler method on the message document.
@@ -68,17 +44,16 @@ the handler method on the message document.
 
 sub handler {
        my $start_time = time();
-       my( $class, $service, $data ) = @_;
-
-       $logger->transport( "Transport handler() received $data", INTERNAL );
+       my ($class, $service, $msg) = @_;
 
-       my $remote_id   = $data->from;
-       my $sess_id     = $data->thread;
-       my $body        = $data->body;
-       my $type        = $data->type;
+       my $remote_id = $msg->from;
+       my $sess_id     = $msg->thread;
+       my $body = $msg->body;
+       my $type = $msg->type;
 
-       $logger->set_osrf_xid($data->osrf_xid);
+    $logger->internal("Transport:handler() received message with thread: $sess_id");
 
+       $logger->set_osrf_xid($msg->osrf_xid);
 
        if (defined($type) and $type eq 'error') {
                throw OpenSRF::EX::Session ("$remote_id IS NOT CONNECTED TO THE NETWORK!!!");
@@ -104,8 +79,10 @@ sub handler {
         }
        } 
 
+    $logger->internal(
+        "Building app session with ses=$sess_id remote=$remote_id service=$service");
+
        # Retrieve or build the app_session as appropriate (server_build decides which to do)
-       $logger->transport( "AppSession is valid or does not exist yet", INTERNAL );
        $app_session = OpenSRF::AppSession->server_build( $sess_id, $remote_id, $service );
 
        if( ! $app_session ) {
diff --git a/src/perl/lib/OpenSRF/Transport/Redis/BusConnection.pm b/src/perl/lib/OpenSRF/Transport/Redis/BusConnection.pm
new file mode 100644 (file)
index 0000000..f74cad8
--- /dev/null
@@ -0,0 +1,202 @@
+package OpenSRF::Transport::Redis::BusConnection;
+use strict;
+use warnings;
+use Redis;
+use Net::Domain qw/hostfqdn/;
+use OpenSRF::Utils::Logger qw/$logger/;
+
+# domain doubles as the host of the Redis instance.
+sub new {
+    my ($class, $domain, $port, $username, $password, $max_queue) = @_;
+
+    $logger->debug("Creating new bus connection $domain:$port user=$username");
+
+    my $self = {
+        domain => $domain || 'localhost',
+        port => $port || 6379,
+        username => $username,
+        password => $password,
+        max_queue => $max_queue
+    };
+
+    return bless($self, $class);
+}
+
+sub redis {
+    my $self = shift;
+    return $self->{redis};
+}
+
+sub connected {
+    my $self = shift;
+    return $self->redis ? 1 : 0;
+}
+
+sub domain {
+    my $self = shift;
+    return $self->{domain};
+}
+
+sub set_address {
+    my ($self) = @_;
+
+    my $address = sprintf(
+        "opensrf:client:%s:%s:$$:%s", $self->{domain}, hostfqdn(), int(rand(10_000)));
+
+    $self->{address} = $address;
+}
+
+sub address {
+    my $self = shift;
+    return $self->{address};
+}
+
+sub connect {
+    my $self = shift;
+
+    return 1 if $self->redis;
+
+    my $domain = $self->{domain};
+    my $port = $self->{port};
+    my $username = $self->{username}; 
+    my $password = $self->{password}; 
+    my $address = $self->{address};
+
+    $logger->debug("Redis client connecting: ".
+        "domain=$domain port=$port username=$username address=$address");
+
+    # On disconnect, try to reconnect every 100ms up to 60 seconds.
+    my @connect_args = (
+        server => "$domain:$port",
+        reconnect => 60, 
+        every => 100_000
+    );
+
+    $logger->debug("Connecting to bus: @connect_args");
+
+    unless ($self->{redis} = Redis->new(@connect_args)) {
+        die "Could not connect to Redis bus with @connect_args\n";
+    }
+
+    unless ($self->redis->auth($username, $password) eq 'OK') {
+        die "Cannot authenticate with Redis instance user=$username\n";
+    }
+
+    $logger->debug("Auth'ed with Redis as $username OK : address=$address");
+
+    # Each bus connection has its own stream / group for receiving
+    # direct messages.  These streams/groups should not pre-exist.
+
+    $self->redis->xgroup(   
+        'create',
+        $address,
+        $address,
+        '$',            # only receive new messages
+        'mkstream'      # create this stream if it's not there.
+    );
+
+    return $self;
+}
+
+# Set del_stream to remove the stream and any attached consumer groups.
+sub disconnect {
+    my ($self, $del_stream) = @_;
+
+    return unless $self->redis;
+
+    $self->redis->del($self->address) if $del_stream;
+
+    $self->redis->quit;
+
+    delete $self->{redis};
+}
+
+sub send {
+    my ($self, $dest_stream, $msg_json) = @_;
+    
+    $logger->internal("send(): to=$dest_stream : $msg_json");
+
+    my @params = (
+        $dest_stream,
+        'NOMKSTREAM',
+        'MAXLEN', 
+        '~',                        # maxlen-ish
+        $self->{max_queue},
+        '*',                        # let Redis generate the ID
+        'message',                  # gotta call it something 
+        $msg_json
+    );
+
+    eval { $self->redis->xadd(@params) };
+
+    if ($@) {
+        $logger->error("XADD error: $@ : @params");
+    }
+}
+
+# $timeout=0 means check for data without blocking
+# $timeout=-1 means block indefinitely.
+#
+# $dest_stream defaults to our bus address.  Otherwise, it would be
+# the service-level address.
+sub recv {
+    my ($self, $timeout, $dest_stream) = @_;
+    $dest_stream ||= $self->address;
+
+    $logger->debug("Waiting for content at: $dest_stream");
+
+    my @block;
+    if ($timeout) {
+        # 0 means block indefinitely in Redis
+        $timeout = 0 if $timeout == -1;
+        $timeout *= 1000; # milliseconds
+        @block = (BLOCK => $timeout);
+    }
+
+    my @params = (
+        GROUP => $dest_stream,
+        $self->address,
+        COUNT => 1,
+        @block,
+        'NOACK',
+        STREAMS => $dest_stream,
+        '>' # new messages only
+    );      
+
+    my $packet;
+    eval {$packet = $self->redis->xreadgroup(@params) };
+
+    if ($@) {
+        $logger->error("Redis XREADGROUP error: $@ : @params");
+        return undef;
+    }
+
+    # Timed out waiting for data.
+    return undef unless defined $packet;
+
+    # TODO make this more self-documenting.  also, too brittle?
+    # Also note at some point we may need to return info about the
+    # recipient stream to the caller in case we are listening
+    # on multiple streams.
+    my $container = $packet->[0]->[1]->[0];
+    my $msg_id = $container->[0];
+    my $json = $container->[1]->[1];
+
+    $logger->internal("recv() $json");
+
+    return {
+        msg_json => $json,
+        msg_id => $msg_id
+    };
+}
+
+sub flush_socket {
+    my $self = shift;
+    # Remove all messages from my address
+    $self->redis->xtrim($self->address, 'MAXLEN', 0);
+    return 1;
+}
+
+1;
+
+
diff --git a/src/perl/lib/OpenSRF/Transport/Redis/Client.pm b/src/perl/lib/OpenSRF/Transport/Redis/Client.pm
new file mode 100644 (file)
index 0000000..c7c2835
--- /dev/null
@@ -0,0 +1,266 @@
+package OpenSRF::Transport::Redis::Client;
+use strict;
+use warnings;
+use Redis;
+use Time::HiRes q/time/;
+use OpenSRF::Utils::JSON;
+use OpenSRF::Utils::Logger qw/$logger/;
+use OpenSRF::Transport;
+use OpenSRF::Transport::Redis::Message;
+use OpenSRF::Transport::Redis::BusConnection;
+
+# Map of bus domain names to bus connections.
+my %connections;
+
+# There will only be one Client per process, but each client may
+# have multiple connections.
+my $_singleton;
+sub retrieve { return $_singleton; }
+
+sub new {
+    my ($class, $service, $no_cache) = @_;
+
+    return $_singleton if $_singleton && !$no_cache;
+
+    my $self = {service => $service};
+
+    bless($self, $class);
+
+    my $conf = OpenSRF::Utils::Config->current;
+    my $domain = $conf->bootstrap->domain;
+
+    # Create a connection for our primary node.
+    $self->add_connection($domain);
+    $self->{primary_domain} = $domain;
+
+    if ($service) {
+        # If we're a service, this is where we listen for service-level requests.
+        $self->{service_address} = "opensrf:service:$service";
+        $self->create_service_stream;
+    }
+
+    $_singleton = $self unless $no_cache;
+
+    return $self;
+}
+
+sub reset {                                                                    
+    return unless $_singleton;
+    $logger->debug("Redis client disconnecting on reset()");
+    $_singleton->disconnect;
+    $_singleton = undef;
+} 
+
+sub connection_type {
+    my $self = shift;
+    return $self->{connection_type};
+}
+
+sub add_connection {
+    my ($self, $domain) = @_;
+
+    my $conf = OpenSRF::Utils::Config->current;
+
+    my $username = $conf->bootstrap->username;
+    my $password = $conf->bootstrap->passwd;
+    my $port = $conf->bootstrap->port;
+    my $max_queue = 1024; # TODO
+
+    # Assumes other connection parameters are the same across
+    # Redis instances, apart from the hostname.
+    my $connection = OpenSRF::Transport::Redis::BusConnection->new(
+        $domain, $port, $username, $password, $max_queue
+    );
+
+    $connection->set_address();
+    $connections{$domain} = $connection;
+    
+    $connection->connect;
+
+    return $connection;
+}
+
+sub get_connection {
+    my ($self, $domain) = @_;
+
+    my $con = $connections{$domain};
+
+    return $con if $con;
+
+    eval { $con = $self->add_connection($domain) };
+
+    if ($@) {
+        $logger->error("Could not connect to bus on node: $domain : $@");
+        return undef;
+    }
+
+    return $con;
+}
+
+# Contains a value if this is a service client.
+# Undef for standalone clients.
+sub service {
+    my $self = shift;
+    return $self->{service};
+}
+
+# Contains a value if this is a service client.
+# Undef for standalone clients.
+sub service_address {
+    my $self = shift;
+    return $self->{service_address};
+}
+
+sub primary_domain {
+    my $self = shift;
+    return $self->{primary_domain};
+}
+
+sub primary_connection {
+    my $self = shift;
+    return $connections{$self->primary_domain};
+}
+
+sub disconnect {
+    my ($self, $domain) = @_;
+
+    for my $domain (keys %connections) {
+        my $con = $connections{$domain};
+        $con->disconnect($self->primary_domain eq $domain);
+        delete $connections{$domain};
+    }
+
+    $_singleton = undef;
+}
+
+sub connected {
+    my $self = shift;
+    return $self->primary_connection && $self->primary_connection->connected;
+}
+
+sub tcp_connected {
+    my $self = shift;
+    return $self->connected;
+}
+
+sub create_service_stream {
+    my $self = shift;
+
+    eval { 
+        # This gets mad when a stream / group already exists, 
+        # but it's conceivable that it's already been created.
+
+        $self->primary_connection->redis->xgroup(   
+            'create',
+            $self->service_address, # stream name
+            $self->service_address, # group name
+            '$',            # only receive new messages
+            'mkstream'      # create this stream if it's not there.
+        );
+    };
+
+    if ($@) {
+        $logger->debug("BUSYGROUP is OK => : $@");
+    }
+}
+
+# Send a message to $recipient regardless of what's in the 'to'
+# field of the message.
+sub send_to {
+    my ($self, $recipient, @msg_parts) = @_;
+
+    my $msg = OpenSRF::Transport::Redis::Message->new(@msg_parts);
+
+    $msg->body(OpenSRF::Utils::JSON->JSON2perl($msg->body));
+
+    $msg->osrf_xid($logger->get_osrf_xid);
+    $msg->from($self->primary_connection->address);
+
+    my $msg_json = $msg->to_json;
+    my $con = $self->primary_connection;
+
+    if ($recipient =~ /^opensrf:client/o) {
+        # Clients may be lurking on remote nodes.
+        # Make sure we have a connection to said node.
+
+        # opensrf:client:domain:...
+        my (undef, undef, $domain) = split(/:/, $recipient);
+
+        my $con = $self->get_connection($domain);
+        if (!$con) {
+            $logger->error("Cannot send message to node $domain: $msg_json");
+            return;
+        }
+    }
+
+    $logger->internal("send(): recipient=$recipient : $msg_json");
+
+    $con->send($recipient, $msg_json);
+}
+
+sub send {
+    my $self = shift;
+    my %msg_parts = @_;
+    return $self->send_to($msg_parts{to}, %msg_parts);
+}
+
+sub process {
+    my ($self, $timeout, $for_service) = @_;
+
+    $timeout ||= 0;
+
+    # Redis does not support fractional timeouts.
+    $timeout = 1 if ($timeout > 0 && $timeout < 1);
+
+    $timeout = int($timeout);
+
+    if (!$self->connected) {
+        # We can't do anything without a primary bus connection.
+        # Sleep a short time to avoid die/fork storms, then
+        # get outta here.
+        $logger->error("We have no primary bus connection");
+        sleep 5;
+        die "Exiting on lack of primary bus connection";
+    }
+
+    return $self->recv($timeout, $for_service);
+}
+
+# $timeout=0 means check for data without blocking
+# $timeout=-1 means block indefinitely.
+sub recv {
+    my ($self, $timeout, $for_service) = @_;
+
+    my $dest_stream = $for_service ? $self->{service_address} : undef;
+
+    my $resp = $self->primary_connection->recv($timeout, $dest_stream);
+
+    return undef unless $resp;
+
+    my $msg = OpenSRF::Transport::Redis::Message->new(json => $resp->{msg_json});
+
+    return undef unless $msg;
+
+    $msg->msg_id($resp->{msg_id});
+
+    $logger->internal("recv()'ed thread=" . $msg->thread);
+
+    # The message body is doubly encoded as JSON.
+    $msg->body(OpenSRF::Utils::JSON->perl2JSON($msg->body));
+
+    return $msg;
+}
+
+
+sub flush_socket {
+    my $self = shift;
+    # Remove all messages from our personal stream
+    if (my $con = $self->primary_connection) {
+        $con->redis->xtrim($con->address, 'MAXLEN', 0);
+    }
+    return 1;
+}
+
+1;
+
+
diff --git a/src/perl/lib/OpenSRF/Transport/Redis/Message.pm b/src/perl/lib/OpenSRF/Transport/Redis/Message.pm
new file mode 100644 (file)
index 0000000..a017126
--- /dev/null
@@ -0,0 +1,129 @@
+package OpenSRF::Transport::Redis::Message;
+use strict; use warnings;
+use OpenSRF::Utils::Logger qw/$logger/;
+use OpenSRF::Utils::JSON;
+use OpenSRF::EX qw/:try/;
+use strict; use warnings;
+
+sub new {
+    my ($class, %args) = @_;
+    my $self = bless({}, $class);
+
+    if ($args{json}) {
+        $self->from_json($args{json});
+
+    } else {
+        $self->{to} = $args{to} || '';
+        $self->{from} = $args{from} || '';
+        $self->{thread} = $args{thread} || '';
+        $self->{body} = $args{body} || '';
+        $self->{osrf_xid} = $args{osrf_xid} || '';
+        $self->{msg_id} = $args{msg_id} || '';
+        $self->{router_command} = $args{router_command} || '';
+        $self->{router_class} = $args{router_class} || '';
+        $self->{router_reply} = $args{router_reply} || '';
+    }
+
+    return $self;
+}
+
+sub to {
+    my($self, $to) = @_;
+    $self->{to} = $to if defined $to;
+    return $self->{to};
+}
+sub from {
+    my($self, $from) = @_;
+    $self->{from} = $from if defined $from;
+    return $self->{from};
+}
+sub thread {
+    my($self, $thread) = @_;
+    $self->{thread} = $thread if defined $thread;
+    return $self->{thread};
+}
+sub body {
+    my($self, $body) = @_;
+    $self->{body} = $body if defined $body;
+    return $self->{body};
+}
+
+sub status {
+    my($self, $status) = @_;
+    $self->{status} = $status if defined $status;
+    return $self->{status};
+}
+sub type {
+    my($self, $type) = @_;
+    $self->{type} = $type if defined $type;
+    return $self->{type};
+}
+
+sub err_type {}
+sub err_code {}
+
+sub osrf_xid {
+    my($self, $osrf_xid) = @_;
+    $self->{osrf_xid} = $osrf_xid if defined $osrf_xid;
+    return $self->{osrf_xid};
+}
+
+sub msg_id {
+    my($self, $msg_id) = @_;
+    $self->{msg_id} = $msg_id if defined $msg_id;
+    return $self->{msg_id};
+}
+
+sub router_command {
+    my($self, $router_command) = @_;
+    $self->{router_command} = $router_command if defined $router_command;
+    return $self->{router_command};
+}
+
+sub router_class {
+    my($self, $router_class) = @_;
+    $self->{router_class} = $router_class if defined $router_class;
+    return $self->{router_class};
+}
+
+sub router_reply {
+    my($self, $router_reply) = @_;
+    $self->{router_reply} = $router_reply if defined $router_reply;
+    return $self->{router_reply};
+}
+
+sub to_json {
+    my $self = shift;
+
+    my $hash = {
+        to => $self->{to},
+        from => $self->{from},
+        thread => $self->{thread},
+        body => $self->{body}
+    };
+
+    # Some values are optional.
+    # Avoid cluttering the JSON with undef values.
+    for my $key (qw/osrf_xid router_command router_class router_reply/) {
+        $hash->{$key} = $self->{$key} if defined $self->{$key} && $self->{$key} ne '';
+    }
+
+    return OpenSRF::Utils::JSON->perl2JSON($hash);
+}
+
+sub from_json {
+    my $self = shift;
+    my $json = shift;
+    my $hash;
+
+    eval { $hash = OpenSRF::Utils::JSON->JSON2perl($json); };
+
+    if ($@) {
+        $logger->error("Redis::Message received invalid JSON: $@ : $json");
+        return undef;
+    }
+
+    $self->{$_} = $hash->{$_} for keys %$hash;
+}
+
+1;
diff --git a/src/perl/lib/OpenSRF/Transport/Redis/PeerConnection.pm b/src/perl/lib/OpenSRF/Transport/Redis/PeerConnection.pm
new file mode 100644 (file)
index 0000000..8aac820
--- /dev/null
@@ -0,0 +1,21 @@
+package OpenSRF::Transport::Redis::PeerConnection;
+use strict;
+use warnings;
+use OpenSRF::Transport::Redis::Client;
+
+use base qw/OpenSRF::Transport::Redis::Client/;
+
+sub construct {
+    my ($class, $service, $no_cache) = @_;
+    return __PACKAGE__->SUPER::new($service, $no_cache);
+}
+
+sub process {
+       my $self = shift;
+       my $msg = $self->SUPER::process(@_);
+       return 0 unless $msg;
+       return OpenSRF::Transport->handler($self->service, $msg);
+}
+
+
+1;
index a1742e8..aa460b5 100644 (file)
@@ -11,6 +11,4 @@ classes for handling transport layer messaging
 
 sub get_peer_client { return "OpenSRF::Transport::SlimJabber::PeerConnection"; }
 
-sub get_msg_envelope { return "OpenSRF::Transport::SlimJabber::MessageWrapper"; }
-
 1;
index 0b83782..d5c9d77 100644 (file)
 #include <string.h>
 #include <signal.h>
 #include <opensrf/utils.h>
+#include <opensrf/osrfConfig.h>
 #include <opensrf/osrf_hash.h>
 #include <opensrf/transport_client.h>
 #include <opensrf/osrf_message.h>
 #include <opensrf/osrf_app_session.h>
 #include <opensrf/log.h>
+#include <opensrf/string_array.h>
 
 #define MAX_THREAD_SIZE 64
 #define RECIP_BUF_SIZE 256
 // opportunity, at which point force-close the connection.
 #define SHUTDOWN_MAX_GRACEFUL_SECONDS 120
 
-// Incremented with every REQUEST, decremented with every COMPLETE.
-static int requests_in_flight = 0;
-
 // default values, replaced during setup (below) as needed.
 static char* config_file = "/openils/conf/opensrf_core.xml";
 static char* config_ctxt = "gateway";
-static char* osrf_router = NULL;
 static char* osrf_domain = NULL;
 
 // Cache of opensrf thread strings and back-end receipients.
@@ -92,6 +90,9 @@ static transport_client* osrf_handle = NULL;
 static char recipient_buf[RECIP_BUF_SIZE];
 // Websocket client IP address (for logging)
 static char* client_ip = NULL;
+// Tracks threads that have active requests in flight.
+// This covers all request types regardless of connected-ness.
+static osrfStringArray* active_threads = NULL;
 
 static void rebuild_stdin_buffer();
 static void child_init(int argc, char* argv[]);
@@ -133,15 +134,17 @@ int main(int argc, char* argv[]) {
     // (replies returning to the websocket client).
     fd_set fds;
     int stdin_no = fileno(stdin);
-    int osrf_no = osrf_handle->session->sock_id;
-    int maxfd = osrf_no > stdin_no ? osrf_no : stdin_no;
+    //int osrf_no = osrf_handle->session->sock_id;
+    //int maxfd = osrf_no > stdin_no ? osrf_no : stdin_no;
+    int maxfd = stdin_no;
     int sel_resp;
     int shutdown_stat;
+    struct timeval tv;
 
     while (1) {
 
         FD_ZERO(&fds);
-        FD_SET(osrf_no, &fds);
+        //FD_SET(osrf_no, &fds);
         FD_SET(stdin_no, &fds);
 
         if (shutdown_requested) {
@@ -155,9 +158,20 @@ int main(int argc, char* argv[]) {
 
         } else {
 
-            // Wait indefinitely for activity to process.
-            // This will be interrupted during a shutdown request signal.
-            sel_resp = select(maxfd + 1, &fds, NULL, NULL, NULL);
+            if (active_threads->size > 0) {
+                tv.tv_usec = 0;
+                tv.tv_sec = 0;
+                
+                // Do a non-blocking check for inbound requests while
+                // we wait for more osrf data to be returned.
+                sel_resp = select(maxfd + 1, &fds, NULL, NULL, &tv);
+
+            } else {
+
+                // No osrf responses pending.  Wait indefinitely.
+                // This will be interrupted during a shutdown request signal.
+                sel_resp = select(maxfd + 1, &fds, NULL, NULL, NULL);
+            }
         }
 
         if (sel_resp < 0) { // error
@@ -172,17 +186,18 @@ int main(int argc, char* argv[]) {
                 "WS select() failed with [%s]. Exiting", strerror(errno));
 
             shut_it_down(1);
-        }
 
-        if (sel_resp > 0) {
+        } else if (sel_resp > 0) {
 
             if (FD_ISSET(stdin_no, &fds)) {
                 read_from_stdin();
-            }
-
-            if (FD_ISSET(osrf_no, &fds)) {
                 read_from_osrf();
             }
+
+        } else if (active_threads->size > 0) {
+            // Nothing pulled from the websocket, but we still have
+            // active osrf request.  See if any new responses have arrived.
+            read_from_osrf();
         }
 
         if (shutdown_requested) {
@@ -213,14 +228,13 @@ static int can_shutdown_gracefully() {
         return -1;
     }
 
-    unsigned long active_sessions = osrfHashGetCount(stateful_session_cache);
-    if (active_sessions == 0 && requests_in_flight == 0) {
+    if (active_threads->size == 0) {
         osrfLogInfo(OSRF_LOG_MARK, "Graceful shutdown cycle complete");
         return 1;
     }
 
     osrfLogInfo(OSRF_LOG_MARK, "Graceful shutdown cycle continuing with " 
-        "sessions=%d requests=%d", active_sessions, requests_in_flight);
+        "active threeds=%d", active_threads);
 
     return 0;
 }
@@ -235,6 +249,7 @@ static void rebuild_stdin_buffer() {
 }
 
 static int shut_it_down(int stat) {
+    osrfStringArrayFree(active_threads);
     osrfHashFree(stateful_session_cache);
     buffer_free(stdin_buf);
     osrf_system_shutdown(); // clean XMPP disconnect
@@ -256,15 +271,16 @@ static void child_init(int argc, char* argv[]) {
         shut_it_down(1);
     }
 
-       osrf_handle = osrfSystemGetTransportClient();
-       osrfAppSessionSetIngress(WEBSOCKET_INGRESS);
+    osrf_handle = osrfSystemGetTransportClient();
+    osrfAppSessionSetIngress(WEBSOCKET_INGRESS);
 
-    osrf_router = osrfConfigGetValue(NULL, "/router_name");
     osrf_domain = osrfConfigGetValue(NULL, "/domain");
 
     stateful_session_cache = osrfNewHash();
     osrfHashSetCallback(stateful_session_cache, release_hash_string);
 
+    active_threads = osrfNewStringArray(16);
+
     client_ip = getenv("REMOTE_ADDR");
     osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip);
 }
@@ -424,8 +440,8 @@ static void relay_stdin_message(const char* msg_string) {
     if (!recipient) {
 
         if (service) {
-            int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1,
-                "%s@%s/%s", osrf_router, osrf_domain, service);
+            int size = snprintf(recipient_buf, 
+                RECIP_BUF_SIZE - 1, "opensrf:router:%s", osrf_domain);
             recipient_buf[size] = '\0';
             recipient = recipient_buf;
 
@@ -494,11 +510,16 @@ static char* extract_inbound_messages(
         switch (msg->m_type) {
 
             case CONNECT:
+                if (!osrfStringArrayContains(active_threads, thread)) {
+                    osrfStringArrayAdd(active_threads, thread);
+                }
                 break;
 
             case REQUEST:
                 log_request(service, msg);
-                requests_in_flight++;
+                if (!osrfStringArrayContains(active_threads, thread)) {
+                    osrfStringArrayAdd(active_threads, thread);
+                }
                 break;
 
             case DISCONNECT:
@@ -568,8 +589,7 @@ static void read_from_osrf() {
     transport_message* tmsg = NULL;
 
     // Double check the socket connection before continuing.
-    if (!client_connected(osrf_handle) ||
-        !socket_connected(osrf_handle->session->sock_id)) {
+    if (!client_connected(osrf_handle)) {
         osrfLogWarning(OSRF_LOG_MARK,
             "WS: Jabber socket disconnected, exiting");
         shut_it_down(1);
@@ -579,9 +599,23 @@ static void read_from_osrf() {
     // read.  This means we can't return to the main select() loop after
     // each message, because any subsequent messages will get stuck in
     // the opensrf receive queue. Process all available messages.
-    while ( (tmsg = client_recv(osrf_handle, 0)) ) {
+
+    // As long as any active requests are in flight, wait up to one
+    // second to receive a response.  Then return to inspect stdin
+    // to see if there are any requests waiting we can push through.
+    // Then come back here.
+    while (1) {
+        int timeout = active_threads->size > 0 ? 1 : 0;
+
+        tmsg = client_recv(osrf_handle, timeout);
+
+        if (!tmsg) { break; }
+
         read_one_osrf_message(tmsg);
-        message_free(tmsg);
+
+        osrfLogDebug(OSRF_LOG_MARK,
+            "WS relaying message to STDOUT thread=%s, recipient=%s",
+             tmsg->thread, tmsg->recipient);
     }
 }
 
@@ -637,14 +671,17 @@ static void read_one_osrf_message(transport_message* tmsg) {
 
             } else {
 
-                // connection timed out; clear the cached recipient
-                if (one_msg->status_code == OSRF_STATUS_TIMEOUT) {
+                // Any error condition ends the conversation.
+                if (one_msg->status_code >= OSRF_STATUS_BADREQUEST) {
                     osrfHashRemove(stateful_session_cache, tmsg->thread);
+                    osrfStringArrayRemove(active_threads, tmsg->thread);
 
                 } else {
 
                     if (one_msg->status_code == OSRF_STATUS_COMPLETE) {
-                        requests_in_flight--;
+                        osrfLogInternal(OSRF_LOG_MARK, 
+                            "WS Marking request complete for thread %s", tmsg->thread);
+                        osrfStringArrayRemove(active_threads, tmsg->thread);
                     }
                 }
             }