From 3ae81cb455bdc04ed3c9813f56b77046123e207e Mon Sep 17 00:00:00 2001 From: Bill Erickson Date: Thu, 17 Nov 2022 18:09:16 -0500 Subject: [PATCH] LP2017941 Perl / C Redis Signed-off-by: Bill Erickson --- Makefile.am | 4 +- bin/opensrf-perl.pl.in | 72 +++- configure.ac | 17 +- include/opensrf/osrf_message.h | 1 + include/opensrf/osrf_system.h | 12 +- include/opensrf/transport_client.h | 30 +- include/opensrf/transport_connection.h | 65 +++ include/opensrf/transport_message.h | 3 + src/Makefile.am | 2 +- src/gateway/Makefile.am | 2 +- src/libopensrf/Makefile.am | 5 +- src/libopensrf/osrf_app_session.c | 55 +-- src/libopensrf/osrf_prefork.c | 35 +- src/libopensrf/osrf_system.c | 96 ++--- src/libopensrf/transport_client.c | 445 +++++++++------------ src/libopensrf/transport_connection.c | 321 +++++++++++++++ src/libopensrf/transport_message.c | 100 +++++ src/perl/lib/OpenSRF/AppSession.pm | 35 +- src/perl/lib/OpenSRF/Server.pm | 48 +-- src/perl/lib/OpenSRF/System.pm | 7 +- src/perl/lib/OpenSRF/Transport.pm | 45 +-- .../lib/OpenSRF/Transport/Redis/BusConnection.pm | 202 ++++++++++ src/perl/lib/OpenSRF/Transport/Redis/Client.pm | 266 ++++++++++++ src/perl/lib/OpenSRF/Transport/Redis/Message.pm | 129 ++++++ .../lib/OpenSRF/Transport/Redis/PeerConnection.pm | 21 + src/perl/lib/OpenSRF/Transport/SlimJabber.pm | 2 - src/websocket-stdio/osrf-websocket-stdio.c | 99 +++-- 27 files changed, 1608 insertions(+), 511 deletions(-) create mode 100644 include/opensrf/transport_connection.h create mode 100644 src/libopensrf/transport_connection.c create mode 100644 src/perl/lib/OpenSRF/Transport/Redis/BusConnection.pm create mode 100644 src/perl/lib/OpenSRF/Transport/Redis/Client.pm create mode 100644 src/perl/lib/OpenSRF/Transport/Redis/Message.pm create mode 100644 src/perl/lib/OpenSRF/Transport/Redis/PeerConnection.pm diff --git a/Makefile.am b/Makefile.am index bf1effe..b1bec86 100644 --- a/Makefile.am +++ b/Makefile.am @@ -19,11 +19,12 @@ endif export PREFIX = @prefix@ export TMP = @TMP@ export LIBXML2_HEADERS = @LIBXML2_HEADERS@ +export HIREDIS_HEADERS = @HIREDIS_HEADERS@ export APR_HEADERS = @APR_HEADERS@ export ETCDIR = @sysconfdir@ export APXS2 = @APXS2@ export APACHE2_HEADERS = @APACHE2_HEADERS@ -export DEF_CFLAGS = -D_LARGEFILE64_SOURCE $(MAYBE_DEBUG) -pipe -g -Wall -O2 -fPIC -I@abs_top_srcdir@/include/ -I$(LIBXML2_HEADERS) -I$(APACHE2_HEADERS) -I$(APR_HEADERS) @AM_CPPFLAGS@ +export DEF_CFLAGS = -D_LARGEFILE64_SOURCE $(MAYBE_DEBUG) -pipe -g -Wall -O2 -fPIC -I@abs_top_srcdir@/include/ -I$(LIBXML2_HEADERS) -I$(APACHE2_HEADERS) -I$(APR_HEADERS) -I$(HIREDIS_HEADERS) @AM_CPPFLAGS@ export DEF_LDLIBS = -lopensrf export VAR = @localstatedir@ export PID = @localstatedir@/run/opensrf @@ -99,6 +100,7 @@ opensrfinclude_HEADERS = $(OSRFINC)/log.h \ $(OSRFINC)/transport_client.h \ $(OSRFINC)/transport_message.h \ $(OSRFINC)/transport_session.h \ + $(OSRFINC)/transport_connection.h \ $(OSRFINC)/utils.h \ $(OSRFINC)/xml_utils.h \ src/gateway/apachetools.h diff --git a/bin/opensrf-perl.pl.in b/bin/opensrf-perl.pl.in index b2bced6..1c34ad8 100755 --- a/bin/opensrf-perl.pl.in +++ b/bin/opensrf-perl.pl.in @@ -25,6 +25,7 @@ use OpenSRF::Utils::SettingsClient; use OpenSRF::Transport::Listener; use OpenSRF::Utils; use OpenSRF::Utils::Config; +use Redis; my $opt_service = undef; my $opt_config = "@CONF_DIR@/opensrf_core.xml"; @@ -62,6 +63,7 @@ my $opt_reload_all = 0; my $opt_quiet = 0; my $opt_diagnostic = 0; my $opt_ignore_orphans = 0; +my $opt_reset_message_bus = 0; my $sclient; my @perl_services; my @nonperl_services; @@ -104,8 +106,10 @@ GetOptions( 'reload' => \$opt_reload, 'reload-all' => \$opt_reload_all, 'diagnostic' => \$opt_diagnostic, + 'reset-message-bus' => \$opt_reset_message_bus, 'ignore-orphans' => \$opt_ignore_orphans -); +) || warn "\n$!\n" && exit(1); + if ($opt_localhost) { $hostname = 'localhost'; @@ -295,6 +299,7 @@ sub do_diagnostic { sub do_start_router { + return; my $pidfile = get_pid_file('router'); `opensrf_router $opt_config routers $pidfile`; @@ -328,7 +333,7 @@ sub do_init { my $apps = $sclient->config_value("activeapps", "appname"); # disconnect the top-level network handle - OpenSRF::Transport::PeerHandle->retrieve->disconnect; + OpenSRF::Transport::PeerHandle->retrieve->reset; if($apps) { $apps = [$apps] unless ref $apps; @@ -412,7 +417,7 @@ sub do_start { sub do_start_all { msg("starting router and services for $hostname"); - do_start('router'); + #do_start('router'); return do_start_services(); } @@ -569,7 +574,7 @@ sub do_stop_all { # graceful shutdown requires the presence of the router, so stop the # router last. See if it's running first to avoid unnecessary warnings. - do_stop('router', $signals[0]) if get_service_pids_from_file('router'); + #do_stop('router', $signals[0]) if get_service_pids_from_file('router'); return 1; } @@ -607,6 +612,58 @@ sub load_settings { $parser->get_server_config($conf->env->hostname); } +# Clear and reset permissions on the bus'es linked to our domains. +sub do_reset_message_bus { + + OpenSRF::Utils::Config->load(config_file => $opt_config); + my $conf = OpenSRF::Utils::Config->current; + + my $routers = $conf->bootstrap->routers; + + # TODO pull logins for all clients in the conf, including + # gateway and router. + for my $router (@{$conf->bootstrap->routers}) { + + my $domain = ref $router ? $router->{domain} : $router; + my $port = $conf->bootstrap->port; + + # This redis connection uses the "default" account, which has + # access to all actions and keys so it can act as the admin. + my @connect_args = (server => "$domain:$port"); + + my $redis = Redis->new(@connect_args) or + die "Cannot connect to Redis instance at @connect_args\n"; + + # Clear all the data + msg("Clearing all data from message bus: @connect_args"); + $redis->flushall; + + my $username = $conf->bootstrap->username; + my $password = $conf->bootstrap->passwd; + + msg("Applying bus access for $username"); + + $redis->acl('SETUSER', $username, 'reset'); + $redis->acl('SETUSER', $username, 'on', ">$password"); + + my @perms = qw/ + -@all + +xgroup + +xadd + +xreadgroup + +xtrim + +del + ~opensrf:router:* + ~opensrf:service:* + ~opensrf:client:* + /; + + $redis->acl('SETUSER', $username, @perms); + + $redis->quit; + } +} + sub msg { my $m = shift; print "* $m\n" unless $opt_quiet; @@ -753,10 +810,16 @@ sub do_help { and gracefully re-launch drone processes. The -all variant sends the signal to all services. The non-(-all) variant requires a --service. + + --reset-message-bus + Clear ALL data from the message bus, create opensrf accounts w/ + permission to access message queues. HELP exit; } +do_reset_message_bus() if $opt_reset_message_bus; + # we do not verify services for stop/signal actions, since those may # legitimately be used against services not (or no longer) configured # to run on the selected host. @@ -808,6 +871,7 @@ do_diagnostic() if $opt_diagnostic; # show help if no action was requested do_help() if $opt_help or not ( + $opt_reset_message_bus or $opt_start or $opt_start_all or $opt_start_services or diff --git a/configure.ac b/configure.ac index c037c21..4a9a1fe 100644 --- a/configure.ac +++ b/configure.ac @@ -208,6 +208,18 @@ AC_ARG_WITH([libxml], [LIBXML2_HEADERS=/usr/include/libxml2/]) AC_SUBST([LIBXML2_HEADERS]) +AC_ARG_WITH([hiredis], +[ --with-hiredis=path location of the hiredis headers (default is /usr/include/hiredis/))], +[HIREDIS_HEADERS=${withval}], +[HIREDIS_HEADERS=/usr/include/hiredis/]) +AC_SUBST([HIREDIS_HEADERS]) + +AC_ARG_WITH([fyaml], +[ --with-fyaml=path location of the fyaml headers (default is /usr/include/))], +[FYAML_HEADERS=${withval}], +[FYAML_HEADERS=/usr/include/]) +AC_SUBST([FYAML_HEADERS]) + AC_ARG_WITH([includes], [ --with-includes=DIRECTORIES a colon-separated list of directories that will be added to the list the compiler searches for header files (Example: --with-includes=/path/headers:/anotherpath/moreheaders)], [EXTRA_USER_INCLUDES=${withval}]) @@ -274,6 +286,8 @@ if test "x$OSRF_INSTALL_CORE" = "xtrue"; then AC_CHECK_LIB([ncurses], [initscr], [], AC_MSG_ERROR(***OpenSRF requires ncurses development headers)) AC_CHECK_LIB([readline], [readline], [], AC_MSG_ERROR(***OpenSRF requires readline development headers)) AC_CHECK_LIB([xml2], [xmlAddID], [], AC_MSG_ERROR(***OpenSRF requires xml2 development headers)) + AC_CHECK_LIB([hiredis], [redisConnect], [], AC_MSG_ERROR(***OpenSRF requires libhiredis)) + AC_CHECK_LIB([fyaml], [fy_document_build_from_file], [], AC_MSG_ERROR(***OpenSRF requires libfyaml)) # Check for libmemcached and set flags accordingly PKG_CHECK_MODULES(memcached, libmemcached >= 0.8.0) AC_SUBST(memcached_CFLAGS) @@ -323,7 +337,6 @@ if test "x$OSRF_INSTALL_CORE" = "xtrue"; then src/libopensrf/Makefile src/perl/Makefile src/ports/strn_compat/Makefile - src/router/Makefile src/srfsh/Makefile src/websocket-stdio/Makefile tests/Makefile @@ -343,5 +356,7 @@ AC_MSG_RESULT([--------------------- Configuration options: ------------------- AC_MSG_RESULT(APR headers location: ${APR_HEADERS}) AC_MSG_RESULT(Apache version: ${APACHE_READABLE_VERSION}) AC_MSG_RESULT(libxml2 headers location: ${LIBXML2_HEADERS}) + AC_MSG_RESULT(libhiredis headers location: ${HIREDIS_HEADERS}) + AC_MSG_RESULT(libfyaml headers location: ${FYAML_HEADERS}) AC_MSG_RESULT([----------------------------------------------------------------------]) diff --git a/include/opensrf/osrf_message.h b/include/opensrf/osrf_message.h index 1785adf..874c310 100644 --- a/include/opensrf/osrf_message.h +++ b/include/opensrf/osrf_message.h @@ -53,6 +53,7 @@ extern "C" { #define OSRF_STATUS_INTERNALSERVERERROR 500 #define OSRF_STATUS_NOTIMPLEMENTED 501 +#define OSRF_STATUS_SERVICEUNAVAILABLE 503 #define OSRF_STATUS_VERSIONNOTSUPPORTED 505 diff --git a/include/opensrf/osrf_system.h b/include/opensrf/osrf_system.h index 18d8c85..cab0f8c 100644 --- a/include/opensrf/osrf_system.h +++ b/include/opensrf/osrf_system.h @@ -19,10 +19,16 @@ extern "C" { void osrfSystemSetPidFile( const char* name ); -int osrf_system_bootstrap_client( char* config_file, char* contextnode ); +int osrf_system_bootstrap_common(const char* config_file, + const char* contextnode, const char* appname, int is_service); -int osrfSystemBootstrapClientResc( const char* config_file, - const char* contextnode, const char* resource ); +int osrf_system_bootstrap_client(const char* config_file, const char* contextnode); + +int osrf_system_bootstrap_service( + const char* config_file, const char* contextnode, const char* appname); + +int osrfSystemBootstrapClientResc(const char* config_file, + const char* contextnode, const char* appname); int osrfSystemBootstrap( const char* hostname, const char* configfile, const char* contextNode ); diff --git a/include/opensrf/transport_client.h b/include/opensrf/transport_client.h index 4307af9..8d58e45 100644 --- a/include/opensrf/transport_client.h +++ b/include/opensrf/transport_client.h @@ -11,6 +11,7 @@ #include #include +#include #include #include @@ -27,20 +28,25 @@ struct message_list_struct; a Jabber ID for outgoing messages. */ struct transport_client_struct { - transport_message* msg_q_head; /**< Head of message queue */ - transport_message* msg_q_tail; /**< Tail of message queue */ - transport_session* session; /**< Manages lower-level message processing */ + char* primary_domain; + char* service; // NULL if this is a standalone client. + char* service_address; // NULL if this is a standalone client. + osrfHash* connections; + + int port; + char* username; + char* password; + transport_con* primary_connection; + int error; /**< Boolean: true if an error has occurred */ - char* host; /**< Domain name or IP address of the Jabber server */ - char* xmpp_id; /**< Jabber ID used for outgoing messages */ }; typedef struct transport_client_struct transport_client; -transport_client* client_init( const char* server, int port, const char* unix_path, int component ); +transport_client* client_init(const char* server, + int port, const char* username, const char* password); -int client_connect( transport_client* client, - const char* username, const char* password, const char* resource, - int connect_timeout, enum TRANSPORT_AUTH_TYPE auth_type ); +int client_connect_as_service(transport_client* client, const char* service); +int client_connect(transport_client* client); int client_disconnect( transport_client* client ); @@ -49,10 +55,14 @@ int client_free( transport_client* client ); int client_discard( transport_client* client ); int client_send_message( transport_client* client, transport_message* msg ); +int client_send_message_to( + transport_client* client, transport_message* msg, const char* recipient); int client_connected( const transport_client* client ); -transport_message* client_recv( transport_client* client, int timeout ); +transport_message* client_recv_stream(transport_client* client, int timeout, const char* stream); +transport_message* client_recv(transport_client* client, int timeout); +transport_message* client_recv_for_service(transport_client* client, int timeout); int client_sock_fd( transport_client* client ); diff --git a/include/opensrf/transport_connection.h b/include/opensrf/transport_connection.h new file mode 100644 index 0000000..25eef27 --- /dev/null +++ b/include/opensrf/transport_connection.h @@ -0,0 +1,65 @@ +#ifndef TRANSPORT_CONNECTION_H +#define TRANSPORT_CONNECTION_H + +/** + @file transport_client.h + @brief Header for implementation of transport_client. + + The transport_client routines provide an interface for sending and receiving Jabber + messages, one at a time. +*/ + +#include +#include +#include + +#ifdef __cplusplus +extern "C" { +#endif + +struct transport_con_struct { + char* address; + char* domain; + int max_queue; + redisContext* bus; +}; +typedef struct transport_con_struct transport_con; + +struct transport_con_msg_struct { + char* msg_id; + char* msg_json; +}; +typedef struct transport_con_msg_struct transport_con_msg; + +transport_con* transport_con_new(const char* domain); + +void transport_con_free(transport_con* con); +void transport_con_msg_free(transport_con_msg* msg); + +int transport_con_connected(transport_con* con); + +void transport_con_set_address(transport_con* con, const char* service); + +int transport_con_connect(transport_con* con, + int port, const char* username, const char* password); + +int transport_con_disconnect(transport_con* con); + +int transport_con_send(transport_con* con, const char* msg_json, const char* stream); + +transport_con_msg* transport_con_recv_once(transport_con* con, int timeout, const char* stream); + +transport_con_msg* transport_con_recv(transport_con* con, int timeout, const char* stream); + +void transport_con_flush_socket(transport_con* con); + +int handle_redis_error(redisReply *reply, const char* command, ...); + +int transport_con_make_stream(transport_con* con, const char* stream, int exists_ok); + + +#ifdef __cplusplus +} +#endif + +#endif diff --git a/include/opensrf/transport_message.h b/include/opensrf/transport_message.h index fb43f92..65a8447 100644 --- a/include/opensrf/transport_message.h +++ b/include/opensrf/transport_message.h @@ -54,6 +54,7 @@ struct transport_message_struct { int error_code; /**< Value of the "code" attribute of <error>. */ int broadcast; /**< Value of the "broadcast" attribute in the message element. */ char* msg_xml; /**< The entire message as XML, complete with entity encoding. */ + char* msg_json; /**< The entire message as JSON*/ struct transport_message_struct* next; }; typedef struct transport_message_struct transport_message; @@ -62,6 +63,7 @@ transport_message* message_init( const char* body, const char* subject, const char* thread, const char* recipient, const char* sender ); transport_message* new_message_from_xml( const char* msg_xml ); +transport_message* new_message_from_json( const char* msg_json ); void message_set_router_info( transport_message* msg, const char* router_from, const char* router_to, const char* router_class, const char* router_command, @@ -70,6 +72,7 @@ void message_set_router_info( transport_message* msg, const char* router_from, void message_set_osrf_xid( transport_message* msg, const char* osrf_xid ); int message_prepare_xml( transport_message* msg ); +int message_prepare_json( transport_message* msg ); int message_free( transport_message* msg ); diff --git a/src/Makefile.am b/src/Makefile.am index 0f90f2c..370b6b2 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -31,7 +31,7 @@ js_SCRIPTS = javascript/DojoSRF.js javascript/JSON_v1.js javascript/md5.js javas endif if BUILDCORE -MAYBE_CORE = libopensrf c-apps router srfsh gateway perl websocket-stdio +MAYBE_CORE = libopensrf c-apps srfsh gateway perl websocket-stdio dist_bin_SCRIPTS = @top_srcdir@/bin/opensrf-perl.pl bin_SCRIPTS = @top_srcdir@/bin/osrf_config dist_sysconf_DATA = @top_srcdir@/examples/opensrf.xml.example @top_srcdir@/examples/opensrf_core.xml.example @top_srcdir@/examples/srfsh.xml.example diff --git a/src/gateway/Makefile.am b/src/gateway/Makefile.am index 666fb06..eeffc75 100644 --- a/src/gateway/Makefile.am +++ b/src/gateway/Makefile.am @@ -18,7 +18,7 @@ endif EXTRA_DIST = @srcdir@/apachetools.c @srcdir@/apachetools.h \ @srcdir@/osrf_json_gateway.c @srcdir@/osrf_http_translator.c -AM_CFLAGS = -D_LARGEFILE64_SOURCE $(HAVE_APACHE_MIN_24) -Wall -I@abs_top_srcdir@/include/ -I$(LIBXML2_HEADERS) -I$(APACHE2_HEADERS) -I$(APR_HEADERS) +AM_CFLAGS = -D_LARGEFILE64_SOURCE $(HAVE_APACHE_MIN_24) -Wall -I@abs_top_srcdir@/include/ -I$(HIREDIS_HEADERS) -I$(LIBXML2_HEADERS) -I$(APACHE2_HEADERS) -I$(APR_HEADERS) AM_LDFLAGS = -L$(LIBDIR) -L@top_builddir@/src/libopensrf AP_LIBEXECDIR = `$(APXS2) -q LIBEXECDIR` diff --git a/src/libopensrf/Makefile.am b/src/libopensrf/Makefile.am index fd3729b..34dbc5a 100644 --- a/src/libopensrf/Makefile.am +++ b/src/libopensrf/Makefile.am @@ -14,7 +14,7 @@ AM_CFLAGS = $(DEF_CFLAGS) -DASSUME_STATELESS -DOSRF_STRICT_PARAMS -rdynamic -fno-strict-aliasing -DOSRF_JSON_ENABLE_XML_UTILS AM_LDFLAGS = $(DEF_LDFLAGS) -R $(libdir) -LDADD = -lopensrf +LDADD = -lopensrf -lfyaml DISTCLEANFILES = Makefile.in Makefile @@ -29,7 +29,6 @@ TARGS = osrf_message.c \ osrfConfig.c \ osrf_application.c \ osrf_cache.c \ - osrf_transgroup.c \ osrf_list.c \ osrf_hash.c \ osrf_utf8.c \ @@ -37,6 +36,7 @@ TARGS = osrf_message.c \ transport_message.c\ transport_session.c\ transport_client.c\ + transport_connection.c\ md5.c\ log.c\ utils.c\ @@ -47,6 +47,7 @@ TARGS = osrf_message.c \ TARGS_HEADS = $(OSRF_INC)/transport_message.h \ $(OSRF_INC)/transport_session.h \ $(OSRF_INC)/transport_client.h \ + $(OSRF_INC)/transport_connection.h \ $(OSRF_INC)/osrf_message.h \ $(OSRF_INC)/osrf_app_session.h \ $(OSRF_INC)/osrf_stack.h \ diff --git a/src/libopensrf/osrf_app_session.c b/src/libopensrf/osrf_app_session.c index df83e3e..0268c26 100644 --- a/src/libopensrf/osrf_app_session.c +++ b/src/libopensrf/osrf_app_session.c @@ -541,46 +541,11 @@ osrfAppSession* osrfAppSessionClientInit( const char* remote_service ) { return NULL; } - // Get a list of domain names from the config settings; - // ignore all but the first one in the list. - osrfStringArray* arr = osrfNewStringArray(8); - osrfConfigGetValueList(NULL, arr, "/domain"); - const char* domain = osrfStringArrayGetString(arr, 0); - if (!domain) { - osrfLogWarning( OSRF_LOG_MARK, "No domains specified in the OpenSRF config file"); - free( session ); - osrfStringArrayFree(arr); - return NULL; - } - - // Get a router name from the config settings. - char* router_name = osrfConfigGetValue(NULL, "/router_name"); - if (!router_name) { - osrfLogWarning( OSRF_LOG_MARK, "No router name specified in the OpenSRF config file"); - free( session ); - osrfStringArrayFree(arr); - return NULL; - } - - char target_buf[512]; - target_buf[ 0 ] = '\0'; + growing_buffer *buf = buffer_init(32); + buffer_add(buf, "opensrf:service:"); + buffer_add(buf, remote_service); - // Using the router name, domain, and service name, - // build a Jabber ID for addressing the service. - int len = snprintf( target_buf, sizeof(target_buf), "%s@%s/%s", - router_name ? router_name : "(null)", - domain ? domain : "(null)", - remote_service ? remote_service : "(null)" ); - osrfStringArrayFree(arr); - free(router_name); - - if( len >= sizeof( target_buf ) ) { - osrfLogWarning( OSRF_LOG_MARK, "Buffer overflow for remote_id"); - free( session ); - return NULL; - } - - session->remote_id = strdup(target_buf); + session->remote_id = buffer_release(buf); session->orig_remote_id = strdup(session->remote_id); session->remote_service = strdup(remote_service); session->session_locale = NULL; @@ -1168,11 +1133,21 @@ int osrfSendChunkedResult( about it. */ int osrfSendTransportPayload( osrfAppSession* session, const char* payload ) { + transport_message* t_msg = message_init( payload, "", session->session_id, session->remote_id, NULL ); message_set_osrf_xid( t_msg, osrfLogGetXid() ); - int retval = client_send_message( session->transport_handle, t_msg ); + // When sending a message to a top-level service address, retain the + // message recipient, but deliver the message to the router instead. + char buf[1024 + 1]; + char* recipient = session->remote_id; + if (strstr(recipient, "opensrf:service:")) { + snprintf(buf, 1024, "opensrf:router:%s", session->transport_handle->primary_domain); + recipient = buf; + } + + int retval = client_send_message_to(session->transport_handle, t_msg, recipient); if( retval ) { osrfLogError( OSRF_LOG_MARK, "client_send_message failed, exit()ing immediately" ); exit(99); diff --git a/src/libopensrf/osrf_prefork.c b/src/libopensrf/osrf_prefork.c index f02f635..9d764f8 100644 --- a/src/libopensrf/osrf_prefork.c +++ b/src/libopensrf/osrf_prefork.c @@ -174,17 +174,12 @@ int osrf_prefork_run( const char* appname ) { free( max_children ); /* --------------------------------------------------- */ - char* resc = va_list_to_string( "%s_listener", appname ); - // Make sure that we haven't already booted - if( !osrfSystemBootstrapClientResc( NULL, NULL, resc )) { + if (!osrf_system_bootstrap_common(NULL, "service", appname, 1)) { osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()" ); - free( resc ); return -1; } - free( resc ); - prefork_simple forker; if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc )) { @@ -238,20 +233,20 @@ static void osrf_prefork_send_router_registration( transport_client* client = osrfSystemGetTransportClient(); // Construct the Jabber address of the router - char* jid = va_list_to_string( "%s@%s/router", routerName, routerDomain ); + char* jid = va_list_to_string( "opensrf:router:%s", routerDomain ); // Create the registration message, and send it transport_message* msg; if (unregister) { osrfLogInfo( OSRF_LOG_MARK, "%s un-registering with router %s", appname, jid ); - msg = message_init( "unregistering", NULL, NULL, jid, NULL ); + msg = message_init( "\"unregistering\"", NULL, NULL, jid, NULL ); message_set_router_info( msg, NULL, NULL, appname, "unregister", 0 ); } else { osrfLogInfo( OSRF_LOG_MARK, "%s registering with router %s", appname, jid ); - msg = message_init( "registering", NULL, NULL, jid, NULL ); + msg = message_init( "\"registering\"", NULL, NULL, jid, NULL ); message_set_router_info( msg, NULL, NULL, appname, "register", 0 ); } @@ -363,7 +358,6 @@ static int prefork_child_init_hook( prefork_child* child ) { // Connect to cache server(s). osrfSystemInitCache(); - char* resc = va_list_to_string( "%s_drone", child->appname ); // If we're a source-client, tell the logger now that we're a new process. char* isclient = osrfConfigGetValue( NULL, "/client" ); @@ -375,15 +369,12 @@ static int prefork_child_init_hook( prefork_child* child ) { // TODO: not necessary if parent disconnects first osrfSystemIgnoreTransportClient(); - // Connect to Jabber - if( !osrfSystemBootstrapClientResc( NULL, NULL, resc )) { + // Connect to the message bus + if (!osrf_system_bootstrap_common(NULL, "service", child->appname, 0)) { osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()" ); - free( resc ); return -1; } - free( resc ); - // Dynamically call the application-specific initialization function // from a previously loaded shared library. if( ! osrfAppRunChildInit( child->appname )) { @@ -412,11 +403,13 @@ static int prefork_child_process_request( prefork_child* child, char* data ) { transport_client* client = osrfSystemGetTransportClient(); + osrfLogInfo(OSRF_LOG_MARK, "we have a client = %s", client); + // Make sure that we're still connected to Jabber; reconnect if necessary. if( !client_connected( client )) { osrfSystemIgnoreTransportClient(); osrfLogWarning( OSRF_LOG_MARK, "Reconnecting child to opensrf after disconnect..." ); - if( !osrf_system_bootstrap_client( NULL, NULL )) { + if (!osrf_system_bootstrap_common(NULL, "service", child->appname, 0)) { osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client in prefork_child_process_request()" ); sleep( 1 ); @@ -424,8 +417,8 @@ static int prefork_child_process_request( prefork_child* child, char* data ) { } } - // Construct the message from the xml. - transport_message* msg = new_message_from_xml( data ); + // Construct the message from the json + transport_message* msg = new_message_from_json( data ); // Respond to the transport message. This is where method calls are buried. osrfAppSession* session = osrf_stack_transport_handler( msg, child->appname ); @@ -859,7 +852,7 @@ static void prefork_run( prefork_simple* forker ) { // Wait indefinitely for an input message osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data..." ); - cur_msg = client_recv( forker->connection, -1 ); + cur_msg = client_recv_for_service( forker->connection, -1 ); if( cur_msg == NULL ) { // most likely a signal was received. clean up any recently @@ -878,8 +871,8 @@ static void prefork_run( prefork_simple* forker ) { continue; } - message_prepare_xml( cur_msg ); - const char* msg_data = cur_msg->msg_xml; + message_prepare_json( cur_msg ); + const char* msg_data = cur_msg->msg_json; if( ! msg_data || ! *msg_data ) { osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %", (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread ); diff --git a/src/libopensrf/osrf_system.c b/src/libopensrf/osrf_system.c index 684dd0d..3a56002 100644 --- a/src/libopensrf/osrf_system.c +++ b/src/libopensrf/osrf_system.c @@ -74,8 +74,9 @@ void osrfSystemIgnoreTransportClient() { A thin wrapper for osrfSystemBootstrapClientResc, passing it NULL for a resource. */ -int osrf_system_bootstrap_client( char* config_file, char* contextnode ) { - return osrfSystemBootstrapClientResc(config_file, contextnode, NULL); + +int osrf_system_bootstrap_client(const char* config_file, const char* contextnode) { + return osrf_system_bootstrap_common(config_file, contextnode, "client", 0); } /** @@ -185,7 +186,7 @@ int osrf_system_service_ctrl( const char* action, const char* service) { // Load the conguration, open the log, open a connection to Jabber - if (!osrfSystemBootstrapClientResc(config, context, "c_launcher")) { + if (!osrf_system_bootstrap_common(config, context, "client", 0)) { osrfLogError(OSRF_LOG_MARK, "Unable to bootstrap for host %s from configuration file %s", hostname, config); @@ -336,8 +337,19 @@ int osrf_system_service_ctrl( - Open the log. - Open a connection to Jabber. */ -int osrfSystemBootstrapClientResc( const char* config_file, - const char* contextnode, const char* resource ) { +int osrfSystemBootstrapClientResc(const char* config_file, + const char* contextnode, const char* appname) { + return osrf_system_bootstrap_common(config_file, contextnode, appname, 0); +} + +int osrf_system_bootstrap_common(const char* config_file, + const char* contextnode, const char* appname, int is_service) { + + if (contextnode == NULL) { + osrfLogError(OSRF_LOG_MARK, + "osrf_system_bootstrap_common() requires a connection type"); + return -1; + } int failure = 0; @@ -352,7 +364,7 @@ int osrfSystemBootstrapClientResc( const char* config_file, } if( config_file ) { - osrfConfig* cfg = osrfConfigInit( config_file, contextnode ); + osrfConfig* cfg = osrfConfigInit(config_file, contextnode); if(cfg) osrfConfigSetDefaultConfig(cfg); else @@ -360,14 +372,15 @@ int osrfSystemBootstrapClientResc( const char* config_file, // fetch list of configured log redaction marker strings log_protect_arr = osrfNewStringArray(8); - osrfConfig* cfg_shared = osrfConfigInit(config_file, "shared"); - osrfConfigGetValueList( cfg_shared, log_protect_arr, "/log_protect/match_string" ); + osrfConfig* cfg_shared = osrfConfigInit(config_file, "shared"); + osrfConfigGetValueList( cfg_shared, log_protect_arr, "/log_protect/match_string" ); } - char* log_file = osrfConfigGetValue( NULL, "/logfile"); - if(!log_file) { + + char* log_file = osrfConfigGetValue( NULL, "/logfile"); + if (!log_file) { fprintf(stderr, "No log file specified in configuration file %s\n", - config_file); + config_file); return -1; } @@ -378,40 +391,38 @@ int osrfSystemBootstrapClientResc( const char* config_file, char* username = osrfConfigGetValue( NULL, "/username" ); char* password = osrfConfigGetValue( NULL, "/passwd" ); char* port = osrfConfigGetValue( NULL, "/port" ); - char* unixpath = osrfConfigGetValue( NULL, "/unixpath" ); char* facility = osrfConfigGetValue( NULL, "/syslog" ); char* actlog = osrfConfigGetValue( NULL, "/actlog" ); char* logtag = osrfConfigGetValue( NULL, "/logtag" ); /* if we're a source-client, tell the logger */ - char* isclient = osrfConfigGetValue(NULL, "/client"); + char* isclient = osrfConfigGetValue(NULL, "/client"); if( isclient && !strcasecmp(isclient,"true") ) osrfLogSetIsClient(1); free(isclient); int llevel = 0; int iport = 0; - if(port) iport = atoi(port); - if(log_level) llevel = atoi(log_level); + if (port) iport = atoi(port); + if (log_level) llevel = atoi(log_level); if(!strcmp(log_file, "syslog")) { if(logtag) osrfLogSetLogTag(logtag); - osrfLogInit( OSRF_LOG_TYPE_SYSLOG, contextnode, llevel ); + osrfLogInit( OSRF_LOG_TYPE_SYSLOG, appname, llevel ); osrfLogSetSyslogFacility(osrfLogFacilityToInt(facility)); if(actlog) osrfLogSetSyslogActFacility(osrfLogFacilityToInt(actlog)); } else { - osrfLogInit( OSRF_LOG_TYPE_FILE, contextnode, llevel ); + osrfLogInit( OSRF_LOG_TYPE_FILE, appname, llevel ); osrfLogSetFile( log_file ); } - /* Get a domain, if one is specified */ const char* domain = osrfStringArrayGetString( arr, 0 ); /* just the first for now */ - if(!domain) { + if (!domain) { fprintf(stderr, "No domain specified in configuration file %s\n", config_file); - osrfLogError( OSRF_LOG_MARK, "No domain specified in configuration file %s\n", - config_file ); + osrfLogError(OSRF_LOG_MARK, + "No domain specified in configuration file %s\n", config_file ); failure = 1; } @@ -429,51 +440,33 @@ int osrfSystemBootstrapClientResc( const char* config_file, failure = 1; } - if((iport <= 0) && !unixpath) { - fprintf(stderr, "No unixpath or valid port in configuration file %s\n", config_file); - osrfLogError( OSRF_LOG_MARK, "No unixpath or valid port in configuration file %s\n", - config_file); - failure = 1; - } - if (failure) { - osrfStringArrayFree(arr); free(log_file); free(log_level); free(username); free(password); free(port); - free(unixpath); free(facility); free(actlog); free(logtag); return 0; } - osrfLogInfo( OSRF_LOG_MARK, "Bootstrapping system with domain %s, port %d, and unixpath %s", - domain, iport, unixpath ? unixpath : "(none)" ); - transport_client* client = client_init( domain, iport, unixpath, 0 ); - - char host[HOST_NAME_MAX + 1] = ""; - gethostname(host, sizeof(host) ); - host[HOST_NAME_MAX] = '\0'; + osrfLogInfo(OSRF_LOG_MARK, + "Bootstrapping system with domain %s, port %d", domain, iport); - char tbuf[32]; - tbuf[0] = '\0'; - snprintf(tbuf, 32, "%f", get_timestamp_millis()); + transport_client* client = client_init(domain, iport, username, password); - if(!resource) resource = ""; - - int len = strlen(resource) + 256; - char buf[len]; - buf[0] = '\0'; - snprintf(buf, len - 1, "%s_%s_%s_%ld", resource, host, tbuf, (long) getpid() ); - - if(client_connect( client, username, password, buf, 10, AUTH_DIGEST )) { - osrfGlobalTransportClient = client; - } + if (is_service) { + if (client_connect_as_service(client, appname)) { + osrfGlobalTransportClient = client; + } + } else { + if (client_connect(client)) { + osrfGlobalTransportClient = client; + } + } - osrfStringArrayFree(arr); free(actlog); free(facility); free(log_level); @@ -481,7 +474,6 @@ int osrfSystemBootstrapClientResc( const char* config_file, free(username); free(password); free(port); - free(unixpath); if(osrfGlobalTransportClient) return 1; diff --git a/src/libopensrf/transport_client.c b/src/libopensrf/transport_client.c index 2a86d0c..f020646 100644 --- a/src/libopensrf/transport_client.c +++ b/src/libopensrf/transport_client.c @@ -1,307 +1,236 @@ #include -/** - @file transport_client.c - @brief Collection of routines for sending and receiving single messages over Jabber. +transport_client* client_init(const char* domain, + int port, const char* username, const char* password) { - These functions form an API built on top of the transport_session API. They serve - two main purposes: - - They remember a Jabber ID to use when sending messages. - - They maintain a queue of input messages that the calling code can get one at a time. -*/ + osrfLogInfo(OSRF_LOG_MARK, + "TCLIENT client_init domain=%s port=%d username=%s", domain, port, username); + + transport_client* client = safe_malloc(sizeof(transport_client)); + client->primary_domain = strdup(domain); + client->connections = osrfNewHash(); + + // These 2 only get values if this client works for a service. + client->service = NULL; + client->service_address = NULL; -static void client_message_handler( void* client, transport_message* msg ); + client->username = username ? strdup(username) : NULL; + client->password = password ? strdup(password) : NULL; -//int main( int argc, char** argv ); + client->port = port; + client->primary_connection = NULL; -/* -int main( int argc, char** argv ) { + client->error = 0; - transport_message* recv; - transport_message* send; + return client; +} - transport_client* client = client_init( "spacely.georgialibraries.org", 5222 ); +static transport_con* client_connect_common( + transport_client* client, const char* domain) { - // try to connect, allow 15 second connect timeout - if( client_connect( client, "admin", "asdfjkjk", "system", 15 ) ) { - printf("Connected...\n"); - } else { - printf( "NOT Connected...\n" ); exit(99); - } + osrfLogInfo(OSRF_LOG_MARK, "TCLIENT Connecting to domain: %s", domain); - while( (recv = client_recv( client, -1 )) ) { + transport_con* con = transport_con_new(domain); - if( recv->body ) { - int len = strlen(recv->body); - char buf[len + 20]; - osrf_clearbuf( buf, 0, sizeof(buf)); - sprintf( buf, "Echoing...%s", recv->body ); - send = message_init( buf, "Echoing Stuff", "12345", recv->sender, "" ); - } else { - send = message_init( " * ECHOING * ", "Echoing Stuff", "12345", recv->sender, "" ); - } + osrfHashSet(client->connections, (void*) con, (char*) domain); - if( send == NULL ) { printf("something's wrong"); } - client_send_message( client, send ); + return con; +} - message_free( send ); - message_free( recv ); - } - printf( "ended recv loop\n" ); +static transport_con* get_transport_con(transport_client* client, const char* domain) { + osrfLogInternal(OSRF_LOG_MARK, "TCLIENT get_transport_con() domain=%s", domain); - return 0; + transport_con* con = (transport_con*) osrfHashGet(client->connections, (char*) domain); + if (con != NULL) { return con; } + + // If we don't have the a connection for the requested domain, + // it means we're setting up a connection to a remote domain. + + con = client_connect_common(client, domain); + + transport_con_set_address(con, NULL); + + // Connections to remote domains assume the same connection + // attributes apply. + transport_con_connect(con, client->port, client->username, client->password); + + return con; } -*/ +int client_connect_as_service(transport_client* client, const char* service) { + osrfLogInternal(OSRF_LOG_MARK, + "TCLIENT client_connect_as_service() service=%s", service); -/** - @brief Allocate and initialize a transport_client. - @param server Domain name where the Jabber server resides. - @param port Port used for connecting to Jabber (0 if using UNIX domain socket). - @param unix_path Name of Jabber's socket in file system (if using UNIX domain socket). - @param component Boolean; true if we're a Jabber component. - @return A pointer to a newly created transport_client. - - Create a transport_client with a transport_session and an empty message queue (but don't - open a connection yet). Install a callback function in the transport_session to enqueue - incoming messages. - - The calling code is responsible for freeing the transport_client by calling client_free(). -*/ -transport_client* client_init( const char* server, int port, const char* unix_path, int component ) { + growing_buffer* buf = buffer_init(32); - if(server == NULL) return NULL; + buffer_fadd(buf, "opensrf:service:%s", service); - /* build and clear the client object */ - transport_client* client = safe_malloc( sizeof( transport_client) ); + client->service_address = buffer_release(buf); + client->service = strdup(service); - /* start with an empty message queue */ - client->msg_q_head = NULL; - client->msg_q_tail = NULL; + transport_con* con = client_connect_common(client, client->primary_domain); - /* build the session */ - client->session = init_transport( server, port, unix_path, client, component ); + transport_con_set_address(con, service); - client->session->message_callback = client_message_handler; - client->error = 0; - client->host = strdup(server); - client->xmpp_id = NULL; + client->primary_connection = con; - return client; + transport_con_connect( + con, client->port, client->username, client->password); + + // Make a stream for the service address + return transport_con_make_stream(con, client->service_address, 1); } +int client_connect(transport_client* client) { + osrfLogInternal(OSRF_LOG_MARK, "TCLIENT client_connect()"); -/** - @brief Open a Jabber session for a transport_client. - @param client Pointer to the transport_client. - @param username Jabber user name. - @param password Password for the Jabber logon. - @param resource Resource name for the Jabber logon. - @param connect_timeout How many seconds to wait for the connection to open. - @param auth_type An enum: either AUTH_PLAIN or AUTH_DIGEST (see notes). - @return 1 if successful, or 0 upon error. - - Besides opening the Jabber session, create a Jabber ID for future use. - - If @a connect_timeout is -1, wait indefinitely for the Jabber server to respond. If - @a connect_timeout is zero, don't wait at all. If @a timeout is positive, wait that - number of seconds before timing out. If @a connect_timeout has a negative value other - than -1, the results are not well defined. - - The value of @a connect_timeout applies to each of two stages in the logon procedure. - Hence the logon may take up to twice the amount of time indicated. - - If we connect as a Jabber component, we send the password as an SHA1 hash. Otherwise - we look at the @a auth_type. If it's AUTH_PLAIN, we send the password as plaintext; if - it's AUTH_DIGEST, we send it as a hash. - */ -int client_connect( transport_client* client, - const char* username, const char* password, const char* resource, - int connect_timeout, enum TRANSPORT_AUTH_TYPE auth_type ) { - if( client == NULL ) - return 0; - - // Create and store a Jabber ID - if( client->xmpp_id ) - free( client->xmpp_id ); - client->xmpp_id = va_list_to_string( "%s@%s/%s", username, client->host, resource ); - - // Open a transport_session - return session_connect( client->session, username, - password, resource, connect_timeout, auth_type ); + transport_con* con = client_connect_common(client, client->primary_domain); + + transport_con_set_address(con, NULL); + + client->primary_connection = con; + + return transport_con_connect( + con, client->port, client->username, client->password); } -/** - @brief Disconnect from the Jabber session. - @param client Pointer to the transport_client. - @return 0 in all cases. +// Disconnect all connections and remove them from the connections hash. +int client_disconnect(transport_client* client) { - If there are any messages still in the queue, they stay there; i.e. we don't free them here. -*/ -int client_disconnect( transport_client* client ) { - if( client == NULL ) { return 0; } - return session_disconnect( client->session ); + osrfLogDebug(OSRF_LOG_MARK, "TCLIENT Disconnecting all transport connections"); + + osrfHashIterator* iter = osrfNewHashIterator(client->connections); + + transport_con* con; + + while( (con = (transport_con*) osrfHashIteratorNext(iter)) ) { + osrfLogInternal(OSRF_LOG_MARK, "TCLIENT Disconnecting from domain: %s", con->domain); + transport_con_disconnect(con); + transport_con_free(con); + } + + osrfHashIteratorFree(iter); + osrfHashFree(client->connections); + + client->connections = osrfNewHash(); + + return 1; } -/** - @brief Report whether a transport_client is connected. - @param client Pointer to the transport_client. - @return Boolean: 1 if connected, or 0 if not. -*/ int client_connected( const transport_client* client ) { - if(client == NULL) return 0; - return session_connected( client->session ); + return (client != NULL && client->primary_connection != NULL); } -/** - @brief Send a transport message to the current destination. - @param client Pointer to a transport_client. - @param msg Pointer to the transport_message to be sent. - @return 0 if successful, or -1 if not. +static char* get_domain_from_address(const char* address) { + osrfLogInternal(OSRF_LOG_MARK, + "TCLIENT get_domain_from_address() address=%s", address); - Translate the transport_message into XML and send it to Jabber, using the previously - stored Jabber ID for the sender. -*/ -int client_send_message( transport_client* client, transport_message* msg ) { - if( client == NULL || client->error ) - return -1; - if( msg->sender ) - free( msg->sender ); - msg->sender = strdup(client->xmpp_id); - return session_send_msg( client->session, msg ); + char* addr_copy = strdup(address); + strtok(addr_copy, ":"); // "opensrf:" + strtok(NULL, ":"); // "client:" + char* domain = strtok(NULL, ":"); + + if (domain) { + // About to free addr_copy... + domain = strdup(domain); + } else { + osrfLogError(OSRF_LOG_MARK, "No domain parsed from address: %s", address); + } + + free(addr_copy); + + return domain; } -/** - @brief Fetch an input message, if one is available. - @param client Pointer to a transport_client. - @param timeout How long to wait for a message to arrive, in seconds (see remarks). - @return A pointer to a transport_message if successful, or NULL if not. +int client_send_message(transport_client* client, transport_message* msg) { + return client_send_message_to(client, msg, msg->recipient) ; +} - If there is a message already in the queue, return it immediately. Otherwise read any - available messages from the transport_session (subject to a timeout), and return the - first one. +int client_send_message_to(transport_client* client, transport_message* msg, const char* recipient) { + osrfLogInternal(OSRF_LOG_MARK, "TCLIENT client_send_message()"); - If the value of @a timeout is -1, then there is no time limit -- wait indefinitely until a - message arrives (or we error out for other reasons). If the value of @a timeout is zero, - don't wait at all. + if (client == NULL || client->error) { return -1; } - The calling code is responsible for freeing the transport_message by calling message_free(). -*/ -transport_message* client_recv( transport_client* client, int timeout ) { - if( client == NULL ) { return NULL; } + transport_con* con; - int error = 0; /* boolean */ + if (strstr(recipient, "opensrf:client")) { + // We may be talking to a worker that runs on a remote domain. + // Find or create a connection to the domain. - if( NULL == client->msg_q_head ) { + char* domain = get_domain_from_address(recipient); - // No message available on the queue? Try to get a fresh one. + if (!domain) { return -1; } - // When we call session_wait(), it reads a socket for new messages. When it finds - // one, it enqueues it by calling the callback function client_message_handler(), - // which we installed in the transport_session when we created the transport_client. + con = get_transport_con(client, domain); - // Since a single call to session_wait() may not result in the receipt of a complete - // message. we call it repeatedly until we get either a message or an error. + if (!con) { + osrfLogError( + OSRF_LOG_MARK, "Error creating connection for domain: %s", domain); - // Alternatively, a single call to session_wait() may result in the receipt of - // multiple messages. That's why we have to enqueue them. + return -1; + } - // The timeout applies to the receipt of a complete message. For a sufficiently - // short timeout, a sufficiently long message, and a sufficiently slow connection, - // we could timeout on the first message even though we're still receiving data. - - // Likewise we could time out while still receiving the second or subsequent message, - // return the first message, and resume receiving messages later. + } else { + con = client->primary_connection; + } + + if (msg->sender) { free(msg->sender); } + msg->sender = strdup(con->address); - if( timeout == -1 ) { /* wait potentially forever for data to arrive */ + message_prepare_json(msg); - int x; - do { - if( (x = session_wait( client->session, -1 )) ) { - osrfLogDebug(OSRF_LOG_MARK, "session_wait returned failure code %d\n", x); - error = 1; - break; - } - } while( client->msg_q_head == NULL ); + osrfLogInternal(OSRF_LOG_MARK, + "client_send_message() to=%s %s", recipient, msg->msg_json); + + return transport_con_send(con, msg->msg_json, recipient); + + osrfLogInternal(OSRF_LOG_MARK, "client_send_message() send completed"); + + return 0; +} - } else { /* loop up to 'timeout' seconds waiting for data to arrive */ +transport_message* client_recv_stream(transport_client* client, int timeout, const char* stream) { - /* This loop assumes that a time_t is denominated in seconds -- not */ - /* guaranteed by Standard C, but a fair bet for Linux or UNIX */ + osrfLogInternal(OSRF_LOG_MARK, + "TCLIENT client_recv_stream() timeout=%d stream=%s", timeout, stream); - time_t start = time(NULL); - time_t remaining = (time_t) timeout; + transport_con_msg* con_msg = + transport_con_recv(client->primary_connection, timeout, stream); - int wait_ret; - do { - if( (wait_ret = session_wait( client->session, (int) remaining)) ) { - error = 1; - osrfLogDebug(OSRF_LOG_MARK, - "session_wait returned failure code %d: setting error=1\n", wait_ret); - break; - } + if (con_msg == NULL) { return NULL; } // Receive timed out. - remaining -= time(NULL) - start; - } while( NULL == client->msg_q_head && remaining > 0 ); - } - } + transport_message* msg = new_message_from_json(con_msg->msg_json); - transport_message* msg = NULL; + transport_con_msg_free(con_msg); - if( !error && client->msg_q_head != NULL ) { - /* got message(s); dequeue the oldest one */ - msg = client->msg_q_head; - client->msg_q_head = msg->next; - msg->next = NULL; /* shouldn't be necessary; nullify for good hygiene */ - if( NULL == client->msg_q_head ) - client->msg_q_tail = NULL; - } + osrfLogInternal(OSRF_LOG_MARK, + "client_recv() read response for thread %s", msg->thread); return msg; } -/** - @brief Enqueue a newly received transport_message. - @param client A pointer to a transport_client, cast to a void pointer. - @param msg A new transport message. - - Add a newly arrived input message to the tail of the queue. +transport_message* client_recv(transport_client* client, int timeout) { - This is a callback function. The transport_session parses the XML coming in through a - socket, accumulating various bits and pieces. When it sees the end of a message stanza, - it packages the bits and pieces into a transport_message that it passes to this function, - which enqueues the message for processing. -*/ -static void client_message_handler( void* client, transport_message* msg ){ + return client_recv_stream(client, timeout, client->primary_connection->address); +} - if(client == NULL) return; - if(msg == NULL) return; +transport_message* client_recv_for_service(transport_client* client, int timeout) { - transport_client* cli = (transport_client*) client; + osrfLogInternal(OSRF_LOG_MARK, "TCLIENT Receiving for service %s", client->service); - /* add the new message to the tail of the queue */ - if( NULL == cli->msg_q_head ) - cli->msg_q_tail = cli->msg_q_head = msg; - else { - cli->msg_q_tail->next = msg; - cli->msg_q_tail = msg; - } - msg->next = NULL; + return client_recv_stream(client, timeout, client->service_address); } - /** @brief Free a transport_client, along with all resources it owns. @param client Pointer to the transport_client to be freed. @return 1 if successful, or 0 if not. The only error condition is if @a client is NULL. */ int client_free( transport_client* client ) { - if(client == NULL) - return 0; - session_free( client->session ); - client->session = NULL; + osrfLogInternal(OSRF_LOG_MARK, "TCLIENT client_free()"); + if (client == NULL) { return 0; } return client_discard( client ); } @@ -315,29 +244,39 @@ int client_free( transport_client* client ) { disconnect the parent as well. */ int client_discard( transport_client* client ) { - if(client == NULL) - return 0; - - transport_message* current = client->msg_q_head; - transport_message* next; - - /* deallocate the list of messages */ - while( current != NULL ) { - next = current->next; - message_free( current ); - current = next; - } - - free(client->host); - free(client->xmpp_id); - free( client ); + osrfLogInternal(OSRF_LOG_MARK, "TCLIENT client_discard()"); + + if (client == NULL) { return 0; } + + osrfLogInternal(OSRF_LOG_MARK, + "Discarding client on domain %s", client->primary_domain); + + if (client->primary_domain) { free(client->primary_domain); } + if (client->service) { free(client->service); } + if (client->service_address) { free(client->service_address); } + if (client->username) { free(client->username); } + if (client->password) { free(client->password); } + + // Clean up our connections. + // We do not disconnect here since they caller may or may + // not want the socket closed. + // If disconnect() was just called, the connections hash + // will be empty. + osrfHashIterator* iter = osrfNewHashIterator(client->connections); + + transport_con* con; + + while( (con = (transport_con*) osrfHashIteratorNext(iter)) ) { + osrfLogInternal(OSRF_LOG_MARK, + "client_discard() freeing connection for %s", con->domain); + transport_con_free(con); + } + + osrfHashIteratorFree(iter); + osrfHashFree(client->connections); + + free(client); + return 1; } -int client_sock_fd( transport_client* client ) -{ - if( !client ) - return 0; - else - return client->session->sock_id; -} diff --git a/src/libopensrf/transport_connection.c b/src/libopensrf/transport_connection.c new file mode 100644 index 0000000..c4544db --- /dev/null +++ b/src/libopensrf/transport_connection.c @@ -0,0 +1,321 @@ +#include + +transport_con* transport_con_new(const char* domain) { + + osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_new() domain=%s", domain); + + transport_con* con = safe_malloc(sizeof(transport_con)); + + con->bus = NULL; + con->address = NULL; + con->domain = strdup(domain); + con->max_queue = 1000; // TODO pull from config + + osrfLogInternal(OSRF_LOG_MARK, + "TCON created transport connection with domain: %s", con->domain); + + return con; +} + +void transport_con_msg_free(transport_con_msg* msg) { + osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_msg_free()"); + + if (msg == NULL) { return; } + + if (msg->msg_id) { free(msg->msg_id); } + if (msg->msg_json) { free(msg->msg_json); } + + free(msg); +} + +void transport_con_free(transport_con* con) { + osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_free()"); + + osrfLogInternal( + OSRF_LOG_MARK, "Freeing transport connection for %s", con->domain); + + if (con->bus) { free(con->bus); } + if (con->address) { free(con->address); } + if (con->domain) { free(con->domain); } + + free(con); +} + +int transport_con_connected(transport_con* con) { + return con->bus != NULL; +} + +void transport_con_set_address(transport_con* con, const char* service) { + osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_set_address()"); + + char hostname[1024]; + hostname[1023] = '\0'; + gethostname(hostname, 1023); + + growing_buffer *buf = buffer_init(64); + buffer_fadd(buf, "opensrf:client:%s:%s:", con->domain, hostname); + + if (service != NULL) { + buffer_fadd(buf, "%s:", service); + } + + buffer_fadd(buf, "%ld", (long) getpid()); + + char junk[256]; + snprintf(junk, sizeof(junk), + "%f%d", get_timestamp_millis(), (int) time(NULL)); + + char* md5 = md5sum(junk); + + buffer_add(buf, ":"); + buffer_add_n(buf, md5, 8); + + con->address = buffer_release(buf); + + osrfLogDebug(OSRF_LOG_MARK, "Connection set address to %s", con->address); +} + +int transport_con_connect( + transport_con* con, int port, const char* username, const char* password) { + osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_connect()"); + + osrfLogDebug(OSRF_LOG_MARK, "Transport con connecting with bus " + "domain=%s; address=%s; port=%d; username=%s", + con->domain, + con->address, + port, + username + ); + + con->bus = redisConnect(con->domain, port); + + if (con->bus == NULL) { + osrfLogError(OSRF_LOG_MARK, "Could not connect to Redis instance"); + return 0; + } + + osrfLogDebug(OSRF_LOG_MARK, "Connected to Redis instance OK"); + + redisReply *reply = + redisCommand(con->bus, "AUTH %s %s", username, password); + + if (handle_redis_error(reply, "AUTH %s %s", username, password)) { + return 0; + } + + freeReplyObject(reply); + + return transport_con_make_stream(con, con->address, 0); +} + +int transport_con_make_stream(transport_con* con, const char* stream, int exists_ok) { + osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_make_stream() stream=%s", stream); + + redisReply *reply = redisCommand( + con->bus, + "XGROUP CREATE %s %s $ mkstream", + stream, + stream, + "$", + "mkstream" + ); + + // Produces an error when a group/stream already exists, but that's + // acceptible when creating a group/stream for a stop-level service + // address, since multiple Listeners are allowed. + if (handle_redis_error(reply, + "XGROUP CREATE %s %s $ mkstream", + stream, + stream, + "$", + "mkstream" + )) { return exists_ok; } + + freeReplyObject(reply); + + return 1; +} + +int transport_con_disconnect(transport_con* con) { + osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_disconnect()"); + + if (con == NULL || con->bus == NULL) { return -1; } + + redisReply *reply = redisCommand(con->bus, "DEL %s", con->address); + + if (!handle_redis_error(reply, "DEL %s", con->address)) { + freeReplyObject(reply); + } + + redisFree(con->bus); + con->bus = NULL; + + return 0; +} + +int transport_con_send(transport_con* con, const char* msg_json, const char* stream) { + + osrfLogInternal(OSRF_LOG_MARK, "Sending to stream=%s: %s", stream, msg_json); + + redisReply *reply = redisCommand(con->bus, + "XADD %s NOMKSTREAM MAXLEN ~ %d * message %s", + stream, + con->max_queue, + msg_json + ); + + if (handle_redis_error(reply, + "XADD %s NOMKSTREAM MAXLEN ~ %d * message %s", + stream, con->max_queue, msg_json)) { + + return -1; + } + + freeReplyObject(reply); + + return 0; +} + +transport_con_msg* transport_con_recv_once(transport_con* con, int timeout, const char* stream) { + osrfLogInternal(OSRF_LOG_MARK, + "TCON transport_con_recv_once() timeout=%d stream=%s", timeout, stream); + + if (stream == NULL) { stream = con->address; } + + redisReply *reply, *tmp; + char *msg_id = NULL, *json = NULL; + + if (timeout == 0) { + + reply = redisCommand(con->bus, + "XREADGROUP GROUP %s %s COUNT 1 STREAMS %s >", + stream, con->address, stream + ); + + } else { + + if (timeout == -1) { + // Redis timeout 0 means block indefinitely + timeout = 0; + } else { + // Milliseconds + timeout *= 1000; + } + + reply = redisCommand(con->bus, + "XREADGROUP GROUP %s %s BLOCK %d COUNT 1 STREAMS %s >", + stream, con->address, timeout, stream + ); + } + + // Timeout or error + if (handle_redis_error( + reply, + "XREADGROUP GROUP %s %s %s COUNT 1 NOACK STREAMS %s >", + stream, con->address, "BLOCK X", stream + )) { return NULL; } + + // Unpack the XREADGROUP response, which is a nest of arrays. + // These arrays are mostly 1 and 2-element lists, since we are + // only reading one item on a single stream. + if (reply->type == REDIS_REPLY_ARRAY && reply->elements > 0) { + tmp = reply->element[0]; + + if (tmp->type == REDIS_REPLY_ARRAY && tmp->elements > 1) { + tmp = tmp->element[1]; + + if (tmp->type == REDIS_REPLY_ARRAY && tmp->elements > 0) { + tmp = tmp->element[0]; + + if (tmp->type == REDIS_REPLY_ARRAY && tmp->elements > 1) { + redisReply *r1 = tmp->element[0]; + redisReply *r2 = tmp->element[1]; + + if (r1->type == REDIS_REPLY_STRING) { + msg_id = strdup(r1->str); + } + + if (r2->type == REDIS_REPLY_ARRAY && r2->elements > 1) { + // r2->element[0] is the message name, which we + // currently don't use for anything. + + r2 = r2->element[1]; + + if (r2->type == REDIS_REPLY_STRING) { + json = strdup(r2->str); + } + } + } + } + } + } + + freeReplyObject(reply); // XREADGROUP + + if (msg_id == NULL) { + // Read timed out. 'json' will also be NULL. + return NULL; + } + + transport_con_msg* tcon_msg = safe_malloc(sizeof(transport_con_msg)); + tcon_msg->msg_id = msg_id; + tcon_msg->msg_json = json; + + osrfLogInternal(OSRF_LOG_MARK, "recv_one_chunk() read json: %s", json); + + return tcon_msg; +} + + +transport_con_msg* transport_con_recv(transport_con* con, int timeout, const char* stream) { + osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_recv() stream=%s", stream); + + if (timeout == 0) { + return transport_con_recv_once(con, 0, stream); + + } else if (timeout < 0) { + // Keep trying until we have a result. + + while (1) { + transport_con_msg* msg = transport_con_recv_once(con, -1, stream); + if (msg != NULL) { return msg; } + } + } + + time_t seconds = (time_t) timeout; + + while (seconds > 0) { + // Keep trying until we get a response or our timeout is exhausted. + + time_t now = time(NULL); + transport_con_msg* msg = transport_con_recv_once(con, timeout, stream); + + if (msg == NULL) { + seconds -= now; + } else { + return msg; + } + } + + return NULL; +} + +void transport_con_flush_socket(transport_con* con) { +} + +// Returns false/0 on success, true/1 on failure. +// On error, the reply is freed. +int handle_redis_error(redisReply *reply, const char* command, ...) { + VA_LIST_TO_STRING(command); + + if (reply != NULL && reply->type != REDIS_REPLY_ERROR) { + osrfLogInternal(OSRF_LOG_MARK, "Redis Command: %s", VA_BUF); + return 0; + } + + char* err = reply == NULL ? "" : reply->str; + osrfLogError(OSRF_LOG_MARK, "REDIS Error [%s] %s", err, VA_BUF); + freeReplyObject(reply); + + return 1; +} diff --git a/src/libopensrf/transport_message.c b/src/libopensrf/transport_message.c index 332b622..59a7f1a 100644 --- a/src/libopensrf/transport_message.c +++ b/src/libopensrf/transport_message.c @@ -1,4 +1,5 @@ #include +#include /** @file transport_message.c @@ -66,11 +67,72 @@ transport_message* message_init( const char* body, const char* subject, msg->error_code = 0; msg->broadcast = 0; msg->msg_xml = NULL; + msg->msg_json = NULL; msg->next = NULL; return msg; } +transport_message* new_message_from_json(const char* msg_json) { + + if (msg_json == NULL || *msg_json == '\0') { return NULL; } + + transport_message* new_msg = safe_malloc(sizeof(transport_message)); + + new_msg->body = NULL; + new_msg->subject = NULL; + new_msg->thread = NULL; + new_msg->recipient = NULL; + new_msg->sender = NULL; + new_msg->router_from = NULL; + new_msg->router_to = NULL; + new_msg->router_class = NULL; + new_msg->router_command = NULL; + new_msg->osrf_xid = NULL; + new_msg->is_error = 0; + new_msg->error_type = NULL; + new_msg->error_code = 0; + new_msg->broadcast = 0; + new_msg->msg_xml = NULL; + new_msg->next = NULL; + + jsonObject* json_hash = jsonParse(msg_json); + + if (json_hash == NULL || json_hash->type != JSON_HASH) { + osrfLogError(OSRF_LOG_MARK, "new_message_from_json() received bad JSON"); + jsonObjectFree(json_hash); + message_free(new_msg); + return NULL; + } + + const char* sender = jsonObjectGetString(jsonObjectGetKeyConst(json_hash, "from")); + if (sender) { new_msg->sender = strdup((const char*) sender); } + + const char* recipient = jsonObjectGetString(jsonObjectGetKeyConst(json_hash, "to")); + if (recipient) { new_msg->recipient = strdup((const char*) recipient); } + + const char* thread = jsonObjectGetString(jsonObjectGetKeyConst(json_hash, "thread")); + if (thread == NULL) { thread = ""; } + new_msg->thread = strdup((const char*) thread); + + const char* osrf_xid = jsonObjectGetString(jsonObjectGetKeyConst(json_hash, "osrf_xid")); + if (osrf_xid) { message_set_osrf_xid(new_msg, (char*) osrf_xid); } + + // TODO + // Internally the mesage body is stored as a JSON string + // On the wire, it's just part of the message. We could get + // rid if this extra json encode/decode step if we treated + // the body as a JSON object internally. + const char* body = jsonObjectToJSON(jsonObjectGetKeyConst(json_hash, "body")); + if (body == NULL) { body = ""; } + new_msg->body = strdup((const char*) body); + + jsonObjectFree(json_hash); + + return new_msg; +} + + /** @brief Translate an XML string into a transport_message. @@ -308,11 +370,49 @@ int message_free( transport_message* msg ){ free(msg->osrf_xid); if( msg->error_type != NULL ) free(msg->error_type); if( msg->msg_xml != NULL ) free(msg->msg_xml); + if( msg->msg_json != NULL ) free(msg->msg_json); free(msg); return 1; } +int message_prepare_json(transport_message* msg) { + + if (!msg) { return 0; } + if (msg->msg_json) { return 1; } /* already done */ + + jsonObject* json_hash = jsonNewObject(NULL); + jsonObjectSetKey(json_hash, "to", jsonNewObject(msg->recipient)); + jsonObjectSetKey(json_hash, "from", jsonNewObject(msg->sender)); + jsonObjectSetKey(json_hash, "thread", jsonNewObject(msg->thread)); + jsonObjectSetKey(json_hash, "osrf_xid", jsonNewObject(msg->osrf_xid)); + + if (msg->router_from) { + jsonObjectSetKey(json_hash, "router_from", jsonNewObject(msg->router_from)); + } + if (msg->router_to) { + jsonObjectSetKey(json_hash, "router_to", jsonNewObject(msg->router_to)); + } + if (msg->router_class) { + jsonObjectSetKey(json_hash, "router_class", jsonNewObject(msg->router_class)); + } + if (msg->router_command) { + jsonObjectSetKey(json_hash, "router_command", jsonNewObject(msg->router_command)); + } + + // TODO the various layers expect the message body to be a separate + // JSON string, but on the bus, the body is just another key + // in the JSON object. + jsonObjectSetKey(json_hash, "body", jsonParse(msg->body)); + + msg->msg_json = jsonObjectToJSON(json_hash); + + jsonObjectFree(json_hash); + + return 1; +} + + /** @brief Build a <message> element and store it as a string in the msg_xml member. @param msg Pointer to a transport_message. diff --git a/src/perl/lib/OpenSRF/AppSession.pm b/src/perl/lib/OpenSRF/AppSession.pm index 603ba3c..25ddd1b 100644 --- a/src/perl/lib/OpenSRF/AppSession.pm +++ b/src/perl/lib/OpenSRF/AppSession.pm @@ -208,18 +208,7 @@ sub last_sent_type { sub get_app_targets { my $app = shift; - - my $conf = OpenSRF::Utils::Config->current; - my $router_name = $conf->bootstrap->router_name || 'router'; - my $domain = $conf->bootstrap->domain; - $logger->error("use of is deprecated") if $conf->bootstrap->domains; - - unless($router_name and $domain) { - throw OpenSRF::EX::Config - ("Missing router config information 'router_name' and 'domain'"); - } - - return ("$router_name\@$domain/$app"); + return ("opensrf:service:$app"); } sub stateless { @@ -579,12 +568,24 @@ sub send { } my $json = OpenSRF::Utils::JSON->perl2JSON(\@doc); - $logger->internal("AppSession sending doc: $json"); - $self->{peer_handle}->send( - to => $self->remote_id, - thread => $self->session_id, - body => $json ); + my $recipient = $self->remote_id; + + if ($self->endpoint == CLIENT and $self->state != CONNECTED) { + # Send new requests to our router + my $conf = OpenSRF::Utils::Config->current; + my $domain = $conf->bootstrap->domain; + $recipient = "opensrf:router:$domain"; + } + + $logger->internal("AppSession sending doc to=$recipient: $json"); + + $self->{peer_handle}->send_to( + $recipient, + to => $self->remote_id, + thread => $self->session_id, + body => $json + ); if( $disconnect) { $self->state( DISCONNECTED ); diff --git a/src/perl/lib/OpenSRF/Server.pm b/src/perl/lib/OpenSRF/Server.pm index 6f3981f..269ee1d 100644 --- a/src/perl/lib/OpenSRF/Server.pm +++ b/src/perl/lib/OpenSRF/Server.pm @@ -21,7 +21,7 @@ use OpenSRF::Utils::Config; use OpenSRF::Transport::PeerHandle; use OpenSRF::Utils::SettingsClient; use OpenSRF::Utils::Logger qw($logger); -use OpenSRF::Transport::SlimJabber::Client; +use OpenSRF::Transport::Redis::Client; use Encode; use POSIX qw/:sys_wait_h :errno_h/; use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK); @@ -132,7 +132,7 @@ sub handle_sighup { } # ---------------------------------------------------------------- -# Waits on the jabber socket for inbound data from the router. +# Waits on the redis socket for inbound data from the router. # Each new message is passed off to a child process for handling. # At regular intervals, wake up for min/max spare child maintenance # ---------------------------------------------------------------- @@ -259,30 +259,12 @@ sub kill_child { } # ---------------------------------------------------------------- -# Jabber connection inbound message arrive on. +# Redis connection inbound message arrive on. # ---------------------------------------------------------------- sub build_osrf_handle { my $self = shift; - - my $conf = OpenSRF::Utils::Config->current; - my $username = $conf->bootstrap->username; - my $password = $conf->bootstrap->passwd; - my $domain = $conf->bootstrap->domain; - my $port = $conf->bootstrap->port; - my $resource = $self->{service} . '_listener_' . $conf->env->hostname; - - $logger->debug("server: inbound connecting as $username\@$domain/$resource on port $port"); - $self->{osrf_handle} = - OpenSRF::Transport::SlimJabber::Client->new( - username => $username, - resource => $resource, - password => $password, - host => $domain, - port => $port, - ); - - $self->{osrf_handle}->initialize; + OpenSRF::Transport::Redis::Client->new($self->{service}); } @@ -291,11 +273,11 @@ sub build_osrf_handle { # ---------------------------------------------------------------- sub write_child { my($self, $child, $msg) = @_; - my $xml = encode_utf8(decode_utf8($msg->to_xml)); + my $json = $msg->to_json; # tell the child how much data to expect, minus the header my $write_size; - {use bytes; $write_size = length($xml)} + {use bytes; $write_size = length($json)} $write_size = sprintf("%*s", WRITE_PIPE_DATA_SIZE, $write_size); for (0..2) { @@ -312,12 +294,12 @@ sub write_child { # so the lack of a pid means the child is dead. if (!$child->{pid}) { $logger->error("server: child is dead in write_child(). ". - "unable to send message: $xml"); + "unable to send message: $json"); return; # avoid syswrite crash } # send message to child data pipe - syswrite($child->{pipe_to_child}, $write_size . $xml); + syswrite($child->{pipe_to_child}, $write_size . $json); last unless $self->{sig_pipe}; $logger->error("server: got SIGPIPE writing to $child, retrying..."); @@ -533,11 +515,11 @@ sub register_routers { my $name = $router->{name}; my $domain = $router->{domain}; - push(@targets, "$name\@$domain/router"); + push(@targets, "opensrf:router:$domain"); } } else { - push(@targets, "$router_name\@$router/router"); + push(@targets, "opensrf:router:$router"); } } @@ -545,7 +527,7 @@ sub register_routers { $logger->info("server: registering with router $_"); $self->{osrf_handle}->send( to => $_, - body => 'registering', + body => '"registering"', router_command => 'register', router_class => $self->{service} ); @@ -566,7 +548,7 @@ sub unregister_routers { $logger->info("server: disconnecting from router $router"); $self->{osrf_handle}->send( to => $router, - body => "unregistering", + body => '"unregistering"', router_command => "unregister", router_class => $self->{service} ); @@ -580,7 +562,7 @@ use warnings; use OpenSRF::Transport; use OpenSRF::Application; use OpenSRF::Transport::PeerHandle; -use OpenSRF::Transport::SlimJabber::XMPPMessage; +use OpenSRF::Transport::Redis::Message; use OpenSRF::Utils::Logger qw($logger); use OpenSRF::DomainObject::oilsResponse qw/:status/; use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK); @@ -613,7 +595,7 @@ sub set_block { } # ---------------------------------------------------------------- -# Connects to Jabber and runs the application child_init +# Connects to the bus and runs the application child_init # ---------------------------------------------------------------- sub init { my $self = shift; @@ -650,7 +632,7 @@ sub run { my $session = OpenSRF::Transport->handler( $self->{parent}->{service}, - OpenSRF::Transport::SlimJabber::XMPPMessage->new(xml => $data) + OpenSRF::Transport::Redis::Message->new(json => $data) ); my $recycle = $self->keepalive_loop($session); diff --git a/src/perl/lib/OpenSRF/System.pm b/src/perl/lib/OpenSRF/System.pm index c9534dc..88863a2 100644 --- a/src/perl/lib/OpenSRF/System.pm +++ b/src/perl/lib/OpenSRF/System.pm @@ -32,8 +32,7 @@ sub load_bootstrap_config { OpenSRF::Utils::Config->load(config_file => $bootstrap_config_file); OpenSRF::Utils::JSON->register_class_hint(name => "OpenSRF::Application", hint => "method", type => "hash", strip => ['session']); - OpenSRF::Transport->message_envelope("OpenSRF::Transport::SlimJabber::MessageWrapper"); - OpenSRF::Transport::PeerHandle->set_peer_client("OpenSRF::Transport::SlimJabber::PeerConnection"); + OpenSRF::Transport::PeerHandle->set_peer_client("OpenSRF::Transport::Redis::PeerConnection"); OpenSRF::Application->server_class('client'); # Read in a shared portion of the config file # for later use in log parameter redaction @@ -65,11 +64,9 @@ sub bootstrap_client { $bootstrap_config_file = $params{config_file} || $bootstrap_config_file; - my $app = $params{client_name} || "client"; - load_bootstrap_config(); OpenSRF::Utils::Logger::set_config(); - OpenSRF::Transport::PeerHandle->construct($app); + OpenSRF::Transport::PeerHandle->construct; } sub connected { diff --git a/src/perl/lib/OpenSRF/Transport.pm b/src/perl/lib/OpenSRF/Transport.pm index 5aeff4d..a222c04 100644 --- a/src/perl/lib/OpenSRF/Transport.pm +++ b/src/perl/lib/OpenSRF/Transport.pm @@ -7,7 +7,6 @@ use OpenSRF::Utils::JSON; use OpenSRF::Utils::Logger qw(:level); use OpenSRF::DomainObject::oilsResponse qw/:status/; use OpenSRF::EX qw/:try/; -use OpenSRF::Transport::SlimJabber::MessageWrapper; #------------------ # --- These must be implemented by all Transport subclasses @@ -34,32 +33,9 @@ sub get_msg_envelope { shift()->alert_abstract(); } our $message_envelope; my $logger = "OpenSRF::Utils::Logger"; - - -=head2 message_envelope( [$envelope] ); - -Sets the message envelope class that will allow us to extract -information from the messages we receive from the low -level transport - -=cut - -sub message_envelope { - my( $class, $envelope ) = @_; - if( $envelope ) { - $message_envelope = $envelope; - $envelope->use; - if( $@ ) { - $logger->error( - "Error loading message_envelope: $envelope -> $@", ERROR); - } - } - return $message_envelope; -} - =head2 handler( $data ) -Creates a new MessageWrapper, extracts the remote_id, session_id, and message body +Creates a new Message, extracts the remote_id, session_id, and message body from the message. Then, creates or retrieves the AppSession object with the session_id and remote_id. Finally, creates the message document from the body of the message and calls the handler method on the message document. @@ -68,17 +44,16 @@ the handler method on the message document. sub handler { my $start_time = time(); - my( $class, $service, $data ) = @_; - - $logger->transport( "Transport handler() received $data", INTERNAL ); + my ($class, $service, $msg) = @_; - my $remote_id = $data->from; - my $sess_id = $data->thread; - my $body = $data->body; - my $type = $data->type; + my $remote_id = $msg->from; + my $sess_id = $msg->thread; + my $body = $msg->body; + my $type = $msg->type; - $logger->set_osrf_xid($data->osrf_xid); + $logger->internal("Transport:handler() received message with thread: $sess_id"); + $logger->set_osrf_xid($msg->osrf_xid); if (defined($type) and $type eq 'error') { throw OpenSRF::EX::Session ("$remote_id IS NOT CONNECTED TO THE NETWORK!!!"); @@ -104,8 +79,10 @@ sub handler { } } + $logger->internal( + "Building app session with ses=$sess_id remote=$remote_id service=$service"); + # Retrieve or build the app_session as appropriate (server_build decides which to do) - $logger->transport( "AppSession is valid or does not exist yet", INTERNAL ); $app_session = OpenSRF::AppSession->server_build( $sess_id, $remote_id, $service ); if( ! $app_session ) { diff --git a/src/perl/lib/OpenSRF/Transport/Redis/BusConnection.pm b/src/perl/lib/OpenSRF/Transport/Redis/BusConnection.pm new file mode 100644 index 0000000..f74cad8 --- /dev/null +++ b/src/perl/lib/OpenSRF/Transport/Redis/BusConnection.pm @@ -0,0 +1,202 @@ +package OpenSRF::Transport::Redis::BusConnection; +use strict; +use warnings; +use Redis; +use Net::Domain qw/hostfqdn/; +use OpenSRF::Utils::Logger qw/$logger/; + +# domain doubles as the host of the Redis instance. +sub new { + my ($class, $domain, $port, $username, $password, $max_queue) = @_; + + $logger->debug("Creating new bus connection $domain:$port user=$username"); + + my $self = { + domain => $domain || 'localhost', + port => $port || 6379, + username => $username, + password => $password, + max_queue => $max_queue + }; + + return bless($self, $class); +} + +sub redis { + my $self = shift; + return $self->{redis}; +} + +sub connected { + my $self = shift; + return $self->redis ? 1 : 0; +} + +sub domain { + my $self = shift; + return $self->{domain}; +} + +sub set_address { + my ($self) = @_; + + my $address = sprintf( + "opensrf:client:%s:%s:$$:%s", $self->{domain}, hostfqdn(), int(rand(10_000))); + + $self->{address} = $address; +} + +sub address { + my $self = shift; + return $self->{address}; +} + +sub connect { + my $self = shift; + + return 1 if $self->redis; + + my $domain = $self->{domain}; + my $port = $self->{port}; + my $username = $self->{username}; + my $password = $self->{password}; + my $address = $self->{address}; + + $logger->debug("Redis client connecting: ". + "domain=$domain port=$port username=$username address=$address"); + + # On disconnect, try to reconnect every 100ms up to 60 seconds. + my @connect_args = ( + server => "$domain:$port", + reconnect => 60, + every => 100_000 + ); + + $logger->debug("Connecting to bus: @connect_args"); + + unless ($self->{redis} = Redis->new(@connect_args)) { + die "Could not connect to Redis bus with @connect_args\n"; + } + + unless ($self->redis->auth($username, $password) eq 'OK') { + die "Cannot authenticate with Redis instance user=$username\n"; + } + + $logger->debug("Auth'ed with Redis as $username OK : address=$address"); + + # Each bus connection has its own stream / group for receiving + # direct messages. These streams/groups should not pre-exist. + + $self->redis->xgroup( + 'create', + $address, + $address, + '$', # only receive new messages + 'mkstream' # create this stream if it's not there. + ); + + return $self; +} + +# Set del_stream to remove the stream and any attached consumer groups. +sub disconnect { + my ($self, $del_stream) = @_; + + return unless $self->redis; + + $self->redis->del($self->address) if $del_stream; + + $self->redis->quit; + + delete $self->{redis}; +} + +sub send { + my ($self, $dest_stream, $msg_json) = @_; + + $logger->internal("send(): to=$dest_stream : $msg_json"); + + my @params = ( + $dest_stream, + 'NOMKSTREAM', + 'MAXLEN', + '~', # maxlen-ish + $self->{max_queue}, + '*', # let Redis generate the ID + 'message', # gotta call it something + $msg_json + ); + + eval { $self->redis->xadd(@params) }; + + if ($@) { + $logger->error("XADD error: $@ : @params"); + } +} + +# $timeout=0 means check for data without blocking +# $timeout=-1 means block indefinitely. +# +# $dest_stream defaults to our bus address. Otherwise, it would be +# the service-level address. +sub recv { + my ($self, $timeout, $dest_stream) = @_; + $dest_stream ||= $self->address; + + $logger->debug("Waiting for content at: $dest_stream"); + + my @block; + if ($timeout) { + # 0 means block indefinitely in Redis + $timeout = 0 if $timeout == -1; + $timeout *= 1000; # milliseconds + @block = (BLOCK => $timeout); + } + + my @params = ( + GROUP => $dest_stream, + $self->address, + COUNT => 1, + @block, + 'NOACK', + STREAMS => $dest_stream, + '>' # new messages only + ); + + my $packet; + eval {$packet = $self->redis->xreadgroup(@params) }; + + if ($@) { + $logger->error("Redis XREADGROUP error: $@ : @params"); + return undef; + } + + # Timed out waiting for data. + return undef unless defined $packet; + + # TODO make this more self-documenting. also, too brittle? + # Also note at some point we may need to return info about the + # recipient stream to the caller in case we are listening + # on multiple streams. + my $container = $packet->[0]->[1]->[0]; + my $msg_id = $container->[0]; + my $json = $container->[1]->[1]; + + $logger->internal("recv() $json"); + + return { + msg_json => $json, + msg_id => $msg_id + }; +} + +sub flush_socket { + my $self = shift; + # Remove all messages from my address + $self->redis->xtrim($self->address, 'MAXLEN', 0); + return 1; +} + +1; + + diff --git a/src/perl/lib/OpenSRF/Transport/Redis/Client.pm b/src/perl/lib/OpenSRF/Transport/Redis/Client.pm new file mode 100644 index 0000000..c7c2835 --- /dev/null +++ b/src/perl/lib/OpenSRF/Transport/Redis/Client.pm @@ -0,0 +1,266 @@ +package OpenSRF::Transport::Redis::Client; +use strict; +use warnings; +use Redis; +use Time::HiRes q/time/; +use OpenSRF::Utils::JSON; +use OpenSRF::Utils::Logger qw/$logger/; +use OpenSRF::Transport; +use OpenSRF::Transport::Redis::Message; +use OpenSRF::Transport::Redis::BusConnection; + +# Map of bus domain names to bus connections. +my %connections; + +# There will only be one Client per process, but each client may +# have multiple connections. +my $_singleton; +sub retrieve { return $_singleton; } + +sub new { + my ($class, $service, $no_cache) = @_; + + return $_singleton if $_singleton && !$no_cache; + + my $self = {service => $service}; + + bless($self, $class); + + my $conf = OpenSRF::Utils::Config->current; + my $domain = $conf->bootstrap->domain; + + # Create a connection for our primary node. + $self->add_connection($domain); + $self->{primary_domain} = $domain; + + if ($service) { + # If we're a service, this is where we listen for service-level requests. + $self->{service_address} = "opensrf:service:$service"; + $self->create_service_stream; + } + + $_singleton = $self unless $no_cache; + + return $self; +} + +sub reset { + return unless $_singleton; + $logger->debug("Redis client disconnecting on reset()"); + $_singleton->disconnect; + $_singleton = undef; +} + +sub connection_type { + my $self = shift; + return $self->{connection_type}; +} + +sub add_connection { + my ($self, $domain) = @_; + + my $conf = OpenSRF::Utils::Config->current; + + my $username = $conf->bootstrap->username; + my $password = $conf->bootstrap->passwd; + my $port = $conf->bootstrap->port; + my $max_queue = 1024; # TODO + + # Assumes other connection parameters are the same across + # Redis instances, apart from the hostname. + my $connection = OpenSRF::Transport::Redis::BusConnection->new( + $domain, $port, $username, $password, $max_queue + ); + + $connection->set_address(); + $connections{$domain} = $connection; + + $connection->connect; + + return $connection; +} + +sub get_connection { + my ($self, $domain) = @_; + + my $con = $connections{$domain}; + + return $con if $con; + + eval { $con = $self->add_connection($domain) }; + + if ($@) { + $logger->error("Could not connect to bus on node: $domain : $@"); + return undef; + } + + return $con; +} + +# Contains a value if this is a service client. +# Undef for standalone clients. +sub service { + my $self = shift; + return $self->{service}; +} + +# Contains a value if this is a service client. +# Undef for standalone clients. +sub service_address { + my $self = shift; + return $self->{service_address}; +} + +sub primary_domain { + my $self = shift; + return $self->{primary_domain}; +} + +sub primary_connection { + my $self = shift; + return $connections{$self->primary_domain}; +} + +sub disconnect { + my ($self, $domain) = @_; + + for my $domain (keys %connections) { + my $con = $connections{$domain}; + $con->disconnect($self->primary_domain eq $domain); + delete $connections{$domain}; + } + + $_singleton = undef; +} + +sub connected { + my $self = shift; + return $self->primary_connection && $self->primary_connection->connected; +} + +sub tcp_connected { + my $self = shift; + return $self->connected; +} + +sub create_service_stream { + my $self = shift; + + eval { + # This gets mad when a stream / group already exists, + # but it's conceivable that it's already been created. + + $self->primary_connection->redis->xgroup( + 'create', + $self->service_address, # stream name + $self->service_address, # group name + '$', # only receive new messages + 'mkstream' # create this stream if it's not there. + ); + }; + + if ($@) { + $logger->debug("BUSYGROUP is OK => : $@"); + } +} + +# Send a message to $recipient regardless of what's in the 'to' +# field of the message. +sub send_to { + my ($self, $recipient, @msg_parts) = @_; + + my $msg = OpenSRF::Transport::Redis::Message->new(@msg_parts); + + $msg->body(OpenSRF::Utils::JSON->JSON2perl($msg->body)); + + $msg->osrf_xid($logger->get_osrf_xid); + $msg->from($self->primary_connection->address); + + my $msg_json = $msg->to_json; + my $con = $self->primary_connection; + + if ($recipient =~ /^opensrf:client/o) { + # Clients may be lurking on remote nodes. + # Make sure we have a connection to said node. + + # opensrf:client:domain:... + my (undef, undef, $domain) = split(/:/, $recipient); + + my $con = $self->get_connection($domain); + if (!$con) { + $logger->error("Cannot send message to node $domain: $msg_json"); + return; + } + } + + $logger->internal("send(): recipient=$recipient : $msg_json"); + + $con->send($recipient, $msg_json); +} + +sub send { + my $self = shift; + my %msg_parts = @_; + return $self->send_to($msg_parts{to}, %msg_parts); +} + +sub process { + my ($self, $timeout, $for_service) = @_; + + $timeout ||= 0; + + # Redis does not support fractional timeouts. + $timeout = 1 if ($timeout > 0 && $timeout < 1); + + $timeout = int($timeout); + + if (!$self->connected) { + # We can't do anything without a primary bus connection. + # Sleep a short time to avoid die/fork storms, then + # get outta here. + $logger->error("We have no primary bus connection"); + sleep 5; + die "Exiting on lack of primary bus connection"; + } + + return $self->recv($timeout, $for_service); +} + +# $timeout=0 means check for data without blocking +# $timeout=-1 means block indefinitely. +sub recv { + my ($self, $timeout, $for_service) = @_; + + my $dest_stream = $for_service ? $self->{service_address} : undef; + + my $resp = $self->primary_connection->recv($timeout, $dest_stream); + + return undef unless $resp; + + my $msg = OpenSRF::Transport::Redis::Message->new(json => $resp->{msg_json}); + + return undef unless $msg; + + $msg->msg_id($resp->{msg_id}); + + $logger->internal("recv()'ed thread=" . $msg->thread); + + # The message body is doubly encoded as JSON. + $msg->body(OpenSRF::Utils::JSON->perl2JSON($msg->body)); + + return $msg; +} + + +sub flush_socket { + my $self = shift; + # Remove all messages from our personal stream + if (my $con = $self->primary_connection) { + $con->redis->xtrim($con->address, 'MAXLEN', 0); + } + return 1; +} + +1; + + diff --git a/src/perl/lib/OpenSRF/Transport/Redis/Message.pm b/src/perl/lib/OpenSRF/Transport/Redis/Message.pm new file mode 100644 index 0000000..a017126 --- /dev/null +++ b/src/perl/lib/OpenSRF/Transport/Redis/Message.pm @@ -0,0 +1,129 @@ +package OpenSRF::Transport::Redis::Message; +use strict; use warnings; +use OpenSRF::Utils::Logger qw/$logger/; +use OpenSRF::Utils::JSON; +use OpenSRF::EX qw/:try/; +use strict; use warnings; + +sub new { + my ($class, %args) = @_; + my $self = bless({}, $class); + + if ($args{json}) { + $self->from_json($args{json}); + + } else { + $self->{to} = $args{to} || ''; + $self->{from} = $args{from} || ''; + $self->{thread} = $args{thread} || ''; + $self->{body} = $args{body} || ''; + $self->{osrf_xid} = $args{osrf_xid} || ''; + $self->{msg_id} = $args{msg_id} || ''; + $self->{router_command} = $args{router_command} || ''; + $self->{router_class} = $args{router_class} || ''; + $self->{router_reply} = $args{router_reply} || ''; + } + + return $self; +} + +sub to { + my($self, $to) = @_; + $self->{to} = $to if defined $to; + return $self->{to}; +} +sub from { + my($self, $from) = @_; + $self->{from} = $from if defined $from; + return $self->{from}; +} +sub thread { + my($self, $thread) = @_; + $self->{thread} = $thread if defined $thread; + return $self->{thread}; +} +sub body { + my($self, $body) = @_; + $self->{body} = $body if defined $body; + return $self->{body}; +} + +sub status { + my($self, $status) = @_; + $self->{status} = $status if defined $status; + return $self->{status}; +} +sub type { + my($self, $type) = @_; + $self->{type} = $type if defined $type; + return $self->{type}; +} + +sub err_type {} +sub err_code {} + +sub osrf_xid { + my($self, $osrf_xid) = @_; + $self->{osrf_xid} = $osrf_xid if defined $osrf_xid; + return $self->{osrf_xid}; +} + +sub msg_id { + my($self, $msg_id) = @_; + $self->{msg_id} = $msg_id if defined $msg_id; + return $self->{msg_id}; +} + +sub router_command { + my($self, $router_command) = @_; + $self->{router_command} = $router_command if defined $router_command; + return $self->{router_command}; +} + +sub router_class { + my($self, $router_class) = @_; + $self->{router_class} = $router_class if defined $router_class; + return $self->{router_class}; +} + +sub router_reply { + my($self, $router_reply) = @_; + $self->{router_reply} = $router_reply if defined $router_reply; + return $self->{router_reply}; +} + +sub to_json { + my $self = shift; + + my $hash = { + to => $self->{to}, + from => $self->{from}, + thread => $self->{thread}, + body => $self->{body} + }; + + # Some values are optional. + # Avoid cluttering the JSON with undef values. + for my $key (qw/osrf_xid router_command router_class router_reply/) { + $hash->{$key} = $self->{$key} if defined $self->{$key} && $self->{$key} ne ''; + } + + return OpenSRF::Utils::JSON->perl2JSON($hash); +} + +sub from_json { + my $self = shift; + my $json = shift; + my $hash; + + eval { $hash = OpenSRF::Utils::JSON->JSON2perl($json); }; + + if ($@) { + $logger->error("Redis::Message received invalid JSON: $@ : $json"); + return undef; + } + + $self->{$_} = $hash->{$_} for keys %$hash; +} + +1; diff --git a/src/perl/lib/OpenSRF/Transport/Redis/PeerConnection.pm b/src/perl/lib/OpenSRF/Transport/Redis/PeerConnection.pm new file mode 100644 index 0000000..8aac820 --- /dev/null +++ b/src/perl/lib/OpenSRF/Transport/Redis/PeerConnection.pm @@ -0,0 +1,21 @@ +package OpenSRF::Transport::Redis::PeerConnection; +use strict; +use warnings; +use OpenSRF::Transport::Redis::Client; + +use base qw/OpenSRF::Transport::Redis::Client/; + +sub construct { + my ($class, $service, $no_cache) = @_; + return __PACKAGE__->SUPER::new($service, $no_cache); +} + +sub process { + my $self = shift; + my $msg = $self->SUPER::process(@_); + return 0 unless $msg; + return OpenSRF::Transport->handler($self->service, $msg); +} + + +1; diff --git a/src/perl/lib/OpenSRF/Transport/SlimJabber.pm b/src/perl/lib/OpenSRF/Transport/SlimJabber.pm index a1742e8..aa460b5 100644 --- a/src/perl/lib/OpenSRF/Transport/SlimJabber.pm +++ b/src/perl/lib/OpenSRF/Transport/SlimJabber.pm @@ -11,6 +11,4 @@ classes for handling transport layer messaging sub get_peer_client { return "OpenSRF::Transport::SlimJabber::PeerConnection"; } -sub get_msg_envelope { return "OpenSRF::Transport::SlimJabber::MessageWrapper"; } - 1; diff --git a/src/websocket-stdio/osrf-websocket-stdio.c b/src/websocket-stdio/osrf-websocket-stdio.c index 0b83782..d5c9d77 100644 --- a/src/websocket-stdio/osrf-websocket-stdio.c +++ b/src/websocket-stdio/osrf-websocket-stdio.c @@ -35,11 +35,13 @@ #include #include #include +#include #include #include #include #include #include +#include #define MAX_THREAD_SIZE 64 #define RECIP_BUF_SIZE 256 @@ -71,13 +73,9 @@ // opportunity, at which point force-close the connection. #define SHUTDOWN_MAX_GRACEFUL_SECONDS 120 -// Incremented with every REQUEST, decremented with every COMPLETE. -static int requests_in_flight = 0; - // default values, replaced during setup (below) as needed. static char* config_file = "/openils/conf/opensrf_core.xml"; static char* config_ctxt = "gateway"; -static char* osrf_router = NULL; static char* osrf_domain = NULL; // Cache of opensrf thread strings and back-end receipients. @@ -92,6 +90,9 @@ static transport_client* osrf_handle = NULL; static char recipient_buf[RECIP_BUF_SIZE]; // Websocket client IP address (for logging) static char* client_ip = NULL; +// Tracks threads that have active requests in flight. +// This covers all request types regardless of connected-ness. +static osrfStringArray* active_threads = NULL; static void rebuild_stdin_buffer(); static void child_init(int argc, char* argv[]); @@ -133,15 +134,17 @@ int main(int argc, char* argv[]) { // (replies returning to the websocket client). fd_set fds; int stdin_no = fileno(stdin); - int osrf_no = osrf_handle->session->sock_id; - int maxfd = osrf_no > stdin_no ? osrf_no : stdin_no; + //int osrf_no = osrf_handle->session->sock_id; + //int maxfd = osrf_no > stdin_no ? osrf_no : stdin_no; + int maxfd = stdin_no; int sel_resp; int shutdown_stat; + struct timeval tv; while (1) { FD_ZERO(&fds); - FD_SET(osrf_no, &fds); + //FD_SET(osrf_no, &fds); FD_SET(stdin_no, &fds); if (shutdown_requested) { @@ -155,9 +158,20 @@ int main(int argc, char* argv[]) { } else { - // Wait indefinitely for activity to process. - // This will be interrupted during a shutdown request signal. - sel_resp = select(maxfd + 1, &fds, NULL, NULL, NULL); + if (active_threads->size > 0) { + tv.tv_usec = 0; + tv.tv_sec = 0; + + // Do a non-blocking check for inbound requests while + // we wait for more osrf data to be returned. + sel_resp = select(maxfd + 1, &fds, NULL, NULL, &tv); + + } else { + + // No osrf responses pending. Wait indefinitely. + // This will be interrupted during a shutdown request signal. + sel_resp = select(maxfd + 1, &fds, NULL, NULL, NULL); + } } if (sel_resp < 0) { // error @@ -172,17 +186,18 @@ int main(int argc, char* argv[]) { "WS select() failed with [%s]. Exiting", strerror(errno)); shut_it_down(1); - } - if (sel_resp > 0) { + } else if (sel_resp > 0) { if (FD_ISSET(stdin_no, &fds)) { read_from_stdin(); - } - - if (FD_ISSET(osrf_no, &fds)) { read_from_osrf(); } + + } else if (active_threads->size > 0) { + // Nothing pulled from the websocket, but we still have + // active osrf request. See if any new responses have arrived. + read_from_osrf(); } if (shutdown_requested) { @@ -213,14 +228,13 @@ static int can_shutdown_gracefully() { return -1; } - unsigned long active_sessions = osrfHashGetCount(stateful_session_cache); - if (active_sessions == 0 && requests_in_flight == 0) { + if (active_threads->size == 0) { osrfLogInfo(OSRF_LOG_MARK, "Graceful shutdown cycle complete"); return 1; } osrfLogInfo(OSRF_LOG_MARK, "Graceful shutdown cycle continuing with " - "sessions=%d requests=%d", active_sessions, requests_in_flight); + "active threeds=%d", active_threads); return 0; } @@ -235,6 +249,7 @@ static void rebuild_stdin_buffer() { } static int shut_it_down(int stat) { + osrfStringArrayFree(active_threads); osrfHashFree(stateful_session_cache); buffer_free(stdin_buf); osrf_system_shutdown(); // clean XMPP disconnect @@ -256,15 +271,16 @@ static void child_init(int argc, char* argv[]) { shut_it_down(1); } - osrf_handle = osrfSystemGetTransportClient(); - osrfAppSessionSetIngress(WEBSOCKET_INGRESS); + osrf_handle = osrfSystemGetTransportClient(); + osrfAppSessionSetIngress(WEBSOCKET_INGRESS); - osrf_router = osrfConfigGetValue(NULL, "/router_name"); osrf_domain = osrfConfigGetValue(NULL, "/domain"); stateful_session_cache = osrfNewHash(); osrfHashSetCallback(stateful_session_cache, release_hash_string); + active_threads = osrfNewStringArray(16); + client_ip = getenv("REMOTE_ADDR"); osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip); } @@ -424,8 +440,8 @@ static void relay_stdin_message(const char* msg_string) { if (!recipient) { if (service) { - int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1, - "%s@%s/%s", osrf_router, osrf_domain, service); + int size = snprintf(recipient_buf, + RECIP_BUF_SIZE - 1, "opensrf:router:%s", osrf_domain); recipient_buf[size] = '\0'; recipient = recipient_buf; @@ -494,11 +510,16 @@ static char* extract_inbound_messages( switch (msg->m_type) { case CONNECT: + if (!osrfStringArrayContains(active_threads, thread)) { + osrfStringArrayAdd(active_threads, thread); + } break; case REQUEST: log_request(service, msg); - requests_in_flight++; + if (!osrfStringArrayContains(active_threads, thread)) { + osrfStringArrayAdd(active_threads, thread); + } break; case DISCONNECT: @@ -568,8 +589,7 @@ static void read_from_osrf() { transport_message* tmsg = NULL; // Double check the socket connection before continuing. - if (!client_connected(osrf_handle) || - !socket_connected(osrf_handle->session->sock_id)) { + if (!client_connected(osrf_handle)) { osrfLogWarning(OSRF_LOG_MARK, "WS: Jabber socket disconnected, exiting"); shut_it_down(1); @@ -579,9 +599,23 @@ static void read_from_osrf() { // read. This means we can't return to the main select() loop after // each message, because any subsequent messages will get stuck in // the opensrf receive queue. Process all available messages. - while ( (tmsg = client_recv(osrf_handle, 0)) ) { + + // As long as any active requests are in flight, wait up to one + // second to receive a response. Then return to inspect stdin + // to see if there are any requests waiting we can push through. + // Then come back here. + while (1) { + int timeout = active_threads->size > 0 ? 1 : 0; + + tmsg = client_recv(osrf_handle, timeout); + + if (!tmsg) { break; } + read_one_osrf_message(tmsg); - message_free(tmsg); + + osrfLogDebug(OSRF_LOG_MARK, + "WS relaying message to STDOUT thread=%s, recipient=%s", + tmsg->thread, tmsg->recipient); } } @@ -637,14 +671,17 @@ static void read_one_osrf_message(transport_message* tmsg) { } else { - // connection timed out; clear the cached recipient - if (one_msg->status_code == OSRF_STATUS_TIMEOUT) { + // Any error condition ends the conversation. + if (one_msg->status_code >= OSRF_STATUS_BADREQUEST) { osrfHashRemove(stateful_session_cache, tmsg->thread); + osrfStringArrayRemove(active_threads, tmsg->thread); } else { if (one_msg->status_code == OSRF_STATUS_COMPLETE) { - requests_in_flight--; + osrfLogInternal(OSRF_LOG_MARK, + "WS Marking request complete for thread %s", tmsg->thread); + osrfStringArrayRemove(active_threads, tmsg->thread); } } } -- 2.11.0