export PREFIX = @prefix@
export TMP = @TMP@
export LIBXML2_HEADERS = @LIBXML2_HEADERS@
+export HIREDIS_HEADERS = @HIREDIS_HEADERS@
export APR_HEADERS = @APR_HEADERS@
export ETCDIR = @sysconfdir@
export APXS2 = @APXS2@
export APACHE2_HEADERS = @APACHE2_HEADERS@
-export DEF_CFLAGS = -D_LARGEFILE64_SOURCE $(MAYBE_DEBUG) -pipe -g -Wall -O2 -fPIC -I@abs_top_srcdir@/include/ -I$(LIBXML2_HEADERS) -I$(APACHE2_HEADERS) -I$(APR_HEADERS) @AM_CPPFLAGS@
+export DEF_CFLAGS = -D_LARGEFILE64_SOURCE $(MAYBE_DEBUG) -pipe -g -Wall -O2 -fPIC -I@abs_top_srcdir@/include/ -I$(LIBXML2_HEADERS) -I$(APACHE2_HEADERS) -I$(APR_HEADERS) -I$(HIREDIS_HEADERS) @AM_CPPFLAGS@
export DEF_LDLIBS = -lopensrf
export VAR = @localstatedir@
export PID = @localstatedir@/run/opensrf
$(OSRFINC)/transport_client.h \
$(OSRFINC)/transport_message.h \
$(OSRFINC)/transport_session.h \
+ $(OSRFINC)/transport_connection.h \
$(OSRFINC)/utils.h \
$(OSRFINC)/xml_utils.h \
src/gateway/apachetools.h
use OpenSRF::Transport::Listener;
use OpenSRF::Utils;
use OpenSRF::Utils::Config;
+use Redis;
my $opt_service = undef;
my $opt_config = "@CONF_DIR@/opensrf_core.xml";
my $opt_quiet = 0;
my $opt_diagnostic = 0;
my $opt_ignore_orphans = 0;
+my $opt_reset_message_bus = 0;
my $sclient;
my @perl_services;
my @nonperl_services;
'reload' => \$opt_reload,
'reload-all' => \$opt_reload_all,
'diagnostic' => \$opt_diagnostic,
+ 'reset-message-bus' => \$opt_reset_message_bus,
'ignore-orphans' => \$opt_ignore_orphans
-);
+) || warn "\n$!\n" && exit(1);
+
if ($opt_localhost) {
$hostname = 'localhost';
sub do_start_router {
+ return;
my $pidfile = get_pid_file('router');
`opensrf_router $opt_config routers $pidfile`;
my $apps = $sclient->config_value("activeapps", "appname");
# disconnect the top-level network handle
- OpenSRF::Transport::PeerHandle->retrieve->disconnect;
+ OpenSRF::Transport::PeerHandle->retrieve->reset;
if($apps) {
$apps = [$apps] unless ref $apps;
sub do_start_all {
msg("starting router and services for $hostname");
- do_start('router');
+ #do_start('router');
return do_start_services();
}
# graceful shutdown requires the presence of the router, so stop the
# router last. See if it's running first to avoid unnecessary warnings.
- do_stop('router', $signals[0]) if get_service_pids_from_file('router');
+ #do_stop('router', $signals[0]) if get_service_pids_from_file('router');
return 1;
}
$parser->get_server_config($conf->env->hostname);
}
+# Clear and reset permissions on the bus'es linked to our domains.
+sub do_reset_message_bus {
+
+ OpenSRF::Utils::Config->load(config_file => $opt_config);
+ my $conf = OpenSRF::Utils::Config->current;
+
+ my $routers = $conf->bootstrap->routers;
+
+ # TODO pull logins for all clients in the conf, including
+ # gateway and router.
+ for my $router (@{$conf->bootstrap->routers}) {
+
+ my $domain = ref $router ? $router->{domain} : $router;
+ my $port = $conf->bootstrap->port;
+
+ # This redis connection uses the "default" account, which has
+ # access to all actions and keys so it can act as the admin.
+ my @connect_args = (server => "$domain:$port");
+
+ my $redis = Redis->new(@connect_args) or
+ die "Cannot connect to Redis instance at @connect_args\n";
+
+ # Clear all the data
+ msg("Clearing all data from message bus: @connect_args");
+ $redis->flushall;
+
+ my $username = $conf->bootstrap->username;
+ my $password = $conf->bootstrap->passwd;
+
+ msg("Applying bus access for $username");
+
+ $redis->acl('SETUSER', $username, 'reset');
+ $redis->acl('SETUSER', $username, 'on', ">$password");
+
+ my @perms = qw/
+ -@all
+ +xgroup
+ +xadd
+ +xreadgroup
+ +xtrim
+ +del
+ ~opensrf:router:*
+ ~opensrf:service:*
+ ~opensrf:client:*
+ /;
+
+ $redis->acl('SETUSER', $username, @perms);
+
+ $redis->quit;
+ }
+}
+
sub msg {
my $m = shift;
print "* $m\n" unless $opt_quiet;
and gracefully re-launch drone processes. The -all variant sends
the signal to all services. The non-(-all) variant requires a
--service.
+
+ --reset-message-bus
+ Clear ALL data from the message bus, create opensrf accounts w/
+ permission to access message queues.
HELP
exit;
}
+do_reset_message_bus() if $opt_reset_message_bus;
+
# we do not verify services for stop/signal actions, since those may
# legitimately be used against services not (or no longer) configured
# to run on the selected host.
# show help if no action was requested
do_help() if $opt_help or not (
+ $opt_reset_message_bus or
$opt_start or
$opt_start_all or
$opt_start_services or
[LIBXML2_HEADERS=/usr/include/libxml2/])
AC_SUBST([LIBXML2_HEADERS])
+AC_ARG_WITH([hiredis],
+[ --with-hiredis=path location of the hiredis headers (default is /usr/include/hiredis/))],
+[HIREDIS_HEADERS=${withval}],
+[HIREDIS_HEADERS=/usr/include/hiredis/])
+AC_SUBST([HIREDIS_HEADERS])
+
+AC_ARG_WITH([fyaml],
+[ --with-fyaml=path location of the fyaml headers (default is /usr/include/))],
+[FYAML_HEADERS=${withval}],
+[FYAML_HEADERS=/usr/include/])
+AC_SUBST([FYAML_HEADERS])
+
AC_ARG_WITH([includes],
[ --with-includes=DIRECTORIES a colon-separated list of directories that will be added to the list the compiler searches for header files (Example: --with-includes=/path/headers:/anotherpath/moreheaders)],
[EXTRA_USER_INCLUDES=${withval}])
AC_CHECK_LIB([ncurses], [initscr], [], AC_MSG_ERROR(***OpenSRF requires ncurses development headers))
AC_CHECK_LIB([readline], [readline], [], AC_MSG_ERROR(***OpenSRF requires readline development headers))
AC_CHECK_LIB([xml2], [xmlAddID], [], AC_MSG_ERROR(***OpenSRF requires xml2 development headers))
+ AC_CHECK_LIB([hiredis], [redisConnect], [], AC_MSG_ERROR(***OpenSRF requires libhiredis))
+ AC_CHECK_LIB([fyaml], [fy_document_build_from_file], [], AC_MSG_ERROR(***OpenSRF requires libfyaml))
# Check for libmemcached and set flags accordingly
PKG_CHECK_MODULES(memcached, libmemcached >= 0.8.0)
AC_SUBST(memcached_CFLAGS)
src/libopensrf/Makefile
src/perl/Makefile
src/ports/strn_compat/Makefile
- src/router/Makefile
src/srfsh/Makefile
src/websocket-stdio/Makefile
tests/Makefile
AC_MSG_RESULT(APR headers location: ${APR_HEADERS})
AC_MSG_RESULT(Apache version: ${APACHE_READABLE_VERSION})
AC_MSG_RESULT(libxml2 headers location: ${LIBXML2_HEADERS})
+ AC_MSG_RESULT(libhiredis headers location: ${HIREDIS_HEADERS})
+ AC_MSG_RESULT(libfyaml headers location: ${FYAML_HEADERS})
AC_MSG_RESULT([----------------------------------------------------------------------])
#define OSRF_STATUS_INTERNALSERVERERROR 500
#define OSRF_STATUS_NOTIMPLEMENTED 501
+#define OSRF_STATUS_SERVICEUNAVAILABLE 503
#define OSRF_STATUS_VERSIONNOTSUPPORTED 505
void osrfSystemSetPidFile( const char* name );
-int osrf_system_bootstrap_client( char* config_file, char* contextnode );
+int osrf_system_bootstrap_common(const char* config_file,
+ const char* contextnode, const char* appname, int is_service);
-int osrfSystemBootstrapClientResc( const char* config_file,
- const char* contextnode, const char* resource );
+int osrf_system_bootstrap_client(const char* config_file, const char* contextnode);
+
+int osrf_system_bootstrap_service(
+ const char* config_file, const char* contextnode, const char* appname);
+
+int osrfSystemBootstrapClientResc(const char* config_file,
+ const char* contextnode, const char* appname);
int osrfSystemBootstrap( const char* hostname, const char* configfile,
const char* contextNode );
#include <time.h>
#include <opensrf/transport_session.h>
+#include <opensrf/transport_connection.h>
#include <opensrf/utils.h>
#include <opensrf/log.h>
a Jabber ID for outgoing messages.
*/
struct transport_client_struct {
- transport_message* msg_q_head; /**< Head of message queue */
- transport_message* msg_q_tail; /**< Tail of message queue */
- transport_session* session; /**< Manages lower-level message processing */
+ char* primary_domain;
+ char* service; // NULL if this is a standalone client.
+ char* service_address; // NULL if this is a standalone client.
+ osrfHash* connections;
+
+ int port;
+ char* username;
+ char* password;
+ transport_con* primary_connection;
+
int error; /**< Boolean: true if an error has occurred */
- char* host; /**< Domain name or IP address of the Jabber server */
- char* xmpp_id; /**< Jabber ID used for outgoing messages */
};
typedef struct transport_client_struct transport_client;
-transport_client* client_init( const char* server, int port, const char* unix_path, int component );
+transport_client* client_init(const char* server,
+ int port, const char* username, const char* password);
-int client_connect( transport_client* client,
- const char* username, const char* password, const char* resource,
- int connect_timeout, enum TRANSPORT_AUTH_TYPE auth_type );
+int client_connect_as_service(transport_client* client, const char* service);
+int client_connect(transport_client* client);
int client_disconnect( transport_client* client );
int client_discard( transport_client* client );
int client_send_message( transport_client* client, transport_message* msg );
+int client_send_message_to(
+ transport_client* client, transport_message* msg, const char* recipient);
int client_connected( const transport_client* client );
-transport_message* client_recv( transport_client* client, int timeout );
+transport_message* client_recv_stream(transport_client* client, int timeout, const char* stream);
+transport_message* client_recv(transport_client* client, int timeout);
+transport_message* client_recv_for_service(transport_client* client, int timeout);
int client_sock_fd( transport_client* client );
--- /dev/null
+#ifndef TRANSPORT_CONNECTION_H
+#define TRANSPORT_CONNECTION_H
+
+/**
+ @file transport_client.h
+ @brief Header for implementation of transport_client.
+
+ The transport_client routines provide an interface for sending and receiving Jabber
+ messages, one at a time.
+*/
+
+#include <hiredis.h>
+#include <opensrf/utils.h>
+#include <opensrf/log.h>
+
+#ifdef __cplusplus
+extern "C" {
+#endif
+
+struct transport_con_struct {
+ char* address;
+ char* domain;
+ int max_queue;
+ redisContext* bus;
+};
+typedef struct transport_con_struct transport_con;
+
+struct transport_con_msg_struct {
+ char* msg_id;
+ char* msg_json;
+};
+typedef struct transport_con_msg_struct transport_con_msg;
+
+transport_con* transport_con_new(const char* domain);
+
+void transport_con_free(transport_con* con);
+void transport_con_msg_free(transport_con_msg* msg);
+
+int transport_con_connected(transport_con* con);
+
+void transport_con_set_address(transport_con* con, const char* service);
+
+int transport_con_connect(transport_con* con,
+ int port, const char* username, const char* password);
+
+int transport_con_disconnect(transport_con* con);
+
+int transport_con_send(transport_con* con, const char* msg_json, const char* stream);
+
+transport_con_msg* transport_con_recv_once(transport_con* con, int timeout, const char* stream);
+
+transport_con_msg* transport_con_recv(transport_con* con, int timeout, const char* stream);
+
+void transport_con_flush_socket(transport_con* con);
+
+int handle_redis_error(redisReply *reply, const char* command, ...);
+
+int transport_con_make_stream(transport_con* con, const char* stream, int exists_ok);
+
+
+#ifdef __cplusplus
+}
+#endif
+
+#endif
int error_code; /**< Value of the "code" attribute of <error>. */
int broadcast; /**< Value of the "broadcast" attribute in the message element. */
char* msg_xml; /**< The entire message as XML, complete with entity encoding. */
+ char* msg_json; /**< The entire message as JSON*/
struct transport_message_struct* next;
};
typedef struct transport_message_struct transport_message;
const char* thread, const char* recipient, const char* sender );
transport_message* new_message_from_xml( const char* msg_xml );
+transport_message* new_message_from_json( const char* msg_json );
void message_set_router_info( transport_message* msg, const char* router_from,
const char* router_to, const char* router_class, const char* router_command,
void message_set_osrf_xid( transport_message* msg, const char* osrf_xid );
int message_prepare_xml( transport_message* msg );
+int message_prepare_json( transport_message* msg );
int message_free( transport_message* msg );
endif
if BUILDCORE
-MAYBE_CORE = libopensrf c-apps router srfsh gateway perl websocket-stdio
+MAYBE_CORE = libopensrf c-apps srfsh gateway perl websocket-stdio
dist_bin_SCRIPTS = @top_srcdir@/bin/opensrf-perl.pl
bin_SCRIPTS = @top_srcdir@/bin/osrf_config
dist_sysconf_DATA = @top_srcdir@/examples/opensrf.xml.example @top_srcdir@/examples/opensrf_core.xml.example @top_srcdir@/examples/srfsh.xml.example
EXTRA_DIST = @srcdir@/apachetools.c @srcdir@/apachetools.h \
@srcdir@/osrf_json_gateway.c @srcdir@/osrf_http_translator.c
-AM_CFLAGS = -D_LARGEFILE64_SOURCE $(HAVE_APACHE_MIN_24) -Wall -I@abs_top_srcdir@/include/ -I$(LIBXML2_HEADERS) -I$(APACHE2_HEADERS) -I$(APR_HEADERS)
+AM_CFLAGS = -D_LARGEFILE64_SOURCE $(HAVE_APACHE_MIN_24) -Wall -I@abs_top_srcdir@/include/ -I$(HIREDIS_HEADERS) -I$(LIBXML2_HEADERS) -I$(APACHE2_HEADERS) -I$(APR_HEADERS)
AM_LDFLAGS = -L$(LIBDIR) -L@top_builddir@/src/libopensrf
AP_LIBEXECDIR = `$(APXS2) -q LIBEXECDIR`
AM_CFLAGS = $(DEF_CFLAGS) -DASSUME_STATELESS -DOSRF_STRICT_PARAMS -rdynamic -fno-strict-aliasing -DOSRF_JSON_ENABLE_XML_UTILS
AM_LDFLAGS = $(DEF_LDFLAGS) -R $(libdir)
-LDADD = -lopensrf
+LDADD = -lopensrf -lfyaml
DISTCLEANFILES = Makefile.in Makefile
osrfConfig.c \
osrf_application.c \
osrf_cache.c \
- osrf_transgroup.c \
osrf_list.c \
osrf_hash.c \
osrf_utf8.c \
transport_message.c\
transport_session.c\
transport_client.c\
+ transport_connection.c\
md5.c\
log.c\
utils.c\
TARGS_HEADS = $(OSRF_INC)/transport_message.h \
$(OSRF_INC)/transport_session.h \
$(OSRF_INC)/transport_client.h \
+ $(OSRF_INC)/transport_connection.h \
$(OSRF_INC)/osrf_message.h \
$(OSRF_INC)/osrf_app_session.h \
$(OSRF_INC)/osrf_stack.h \
return NULL;
}
- // Get a list of domain names from the config settings;
- // ignore all but the first one in the list.
- osrfStringArray* arr = osrfNewStringArray(8);
- osrfConfigGetValueList(NULL, arr, "/domain");
- const char* domain = osrfStringArrayGetString(arr, 0);
- if (!domain) {
- osrfLogWarning( OSRF_LOG_MARK, "No domains specified in the OpenSRF config file");
- free( session );
- osrfStringArrayFree(arr);
- return NULL;
- }
-
- // Get a router name from the config settings.
- char* router_name = osrfConfigGetValue(NULL, "/router_name");
- if (!router_name) {
- osrfLogWarning( OSRF_LOG_MARK, "No router name specified in the OpenSRF config file");
- free( session );
- osrfStringArrayFree(arr);
- return NULL;
- }
-
- char target_buf[512];
- target_buf[ 0 ] = '\0';
+ growing_buffer *buf = buffer_init(32);
+ buffer_add(buf, "opensrf:service:");
+ buffer_add(buf, remote_service);
- // Using the router name, domain, and service name,
- // build a Jabber ID for addressing the service.
- int len = snprintf( target_buf, sizeof(target_buf), "%s@%s/%s",
- router_name ? router_name : "(null)",
- domain ? domain : "(null)",
- remote_service ? remote_service : "(null)" );
- osrfStringArrayFree(arr);
- free(router_name);
-
- if( len >= sizeof( target_buf ) ) {
- osrfLogWarning( OSRF_LOG_MARK, "Buffer overflow for remote_id");
- free( session );
- return NULL;
- }
-
- session->remote_id = strdup(target_buf);
+ session->remote_id = buffer_release(buf);
session->orig_remote_id = strdup(session->remote_id);
session->remote_service = strdup(remote_service);
session->session_locale = NULL;
about it.
*/
int osrfSendTransportPayload( osrfAppSession* session, const char* payload ) {
+
transport_message* t_msg = message_init(
payload, "", session->session_id, session->remote_id, NULL );
message_set_osrf_xid( t_msg, osrfLogGetXid() );
- int retval = client_send_message( session->transport_handle, t_msg );
+ // When sending a message to a top-level service address, retain the
+ // message recipient, but deliver the message to the router instead.
+ char buf[1024 + 1];
+ char* recipient = session->remote_id;
+ if (strstr(recipient, "opensrf:service:")) {
+ snprintf(buf, 1024, "opensrf:router:%s", session->transport_handle->primary_domain);
+ recipient = buf;
+ }
+
+ int retval = client_send_message_to(session->transport_handle, t_msg, recipient);
if( retval ) {
osrfLogError( OSRF_LOG_MARK, "client_send_message failed, exit()ing immediately" );
exit(99);
free( max_children );
/* --------------------------------------------------- */
- char* resc = va_list_to_string( "%s_listener", appname );
-
// Make sure that we haven't already booted
- if( !osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
+ if (!osrf_system_bootstrap_common(NULL, "service", appname, 1)) {
osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()" );
- free( resc );
return -1;
}
- free( resc );
-
prefork_simple forker;
if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc )) {
transport_client* client = osrfSystemGetTransportClient();
// Construct the Jabber address of the router
- char* jid = va_list_to_string( "%s@%s/router", routerName, routerDomain );
+ char* jid = va_list_to_string( "opensrf:router:%s", routerDomain );
// Create the registration message, and send it
transport_message* msg;
if (unregister) {
osrfLogInfo( OSRF_LOG_MARK, "%s un-registering with router %s", appname, jid );
- msg = message_init( "unregistering", NULL, NULL, jid, NULL );
+ msg = message_init( "\"unregistering\"", NULL, NULL, jid, NULL );
message_set_router_info( msg, NULL, NULL, appname, "unregister", 0 );
} else {
osrfLogInfo( OSRF_LOG_MARK, "%s registering with router %s", appname, jid );
- msg = message_init( "registering", NULL, NULL, jid, NULL );
+ msg = message_init( "\"registering\"", NULL, NULL, jid, NULL );
message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
}
// Connect to cache server(s).
osrfSystemInitCache();
- char* resc = va_list_to_string( "%s_drone", child->appname );
// If we're a source-client, tell the logger now that we're a new process.
char* isclient = osrfConfigGetValue( NULL, "/client" );
// TODO: not necessary if parent disconnects first
osrfSystemIgnoreTransportClient();
- // Connect to Jabber
- if( !osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
+ // Connect to the message bus
+ if (!osrf_system_bootstrap_common(NULL, "service", child->appname, 0)) {
osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()" );
- free( resc );
return -1;
}
- free( resc );
-
// Dynamically call the application-specific initialization function
// from a previously loaded shared library.
if( ! osrfAppRunChildInit( child->appname )) {
transport_client* client = osrfSystemGetTransportClient();
+ osrfLogInfo(OSRF_LOG_MARK, "we have a client = %s", client);
+
// Make sure that we're still connected to Jabber; reconnect if necessary.
if( !client_connected( client )) {
osrfSystemIgnoreTransportClient();
osrfLogWarning( OSRF_LOG_MARK, "Reconnecting child to opensrf after disconnect..." );
- if( !osrf_system_bootstrap_client( NULL, NULL )) {
+ if (!osrf_system_bootstrap_common(NULL, "service", child->appname, 0)) {
osrfLogError( OSRF_LOG_MARK,
"Unable to bootstrap client in prefork_child_process_request()" );
sleep( 1 );
}
}
- // Construct the message from the xml.
- transport_message* msg = new_message_from_xml( data );
+ // Construct the message from the json
+ transport_message* msg = new_message_from_json( data );
// Respond to the transport message. This is where method calls are buried.
osrfAppSession* session = osrf_stack_transport_handler( msg, child->appname );
// Wait indefinitely for an input message
osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data..." );
- cur_msg = client_recv( forker->connection, -1 );
+ cur_msg = client_recv_for_service( forker->connection, -1 );
if( cur_msg == NULL ) {
// most likely a signal was received. clean up any recently
continue;
}
- message_prepare_xml( cur_msg );
- const char* msg_data = cur_msg->msg_xml;
+ message_prepare_json( cur_msg );
+ const char* msg_data = cur_msg->msg_json;
if( ! msg_data || ! *msg_data ) {
osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %",
(msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread );
A thin wrapper for osrfSystemBootstrapClientResc, passing it NULL for a resource.
*/
-int osrf_system_bootstrap_client( char* config_file, char* contextnode ) {
- return osrfSystemBootstrapClientResc(config_file, contextnode, NULL);
+
+int osrf_system_bootstrap_client(const char* config_file, const char* contextnode) {
+ return osrf_system_bootstrap_common(config_file, contextnode, "client", 0);
}
/**
const char* action, const char* service) {
// Load the conguration, open the log, open a connection to Jabber
- if (!osrfSystemBootstrapClientResc(config, context, "c_launcher")) {
+ if (!osrf_system_bootstrap_common(config, context, "client", 0)) {
osrfLogError(OSRF_LOG_MARK,
"Unable to bootstrap for host %s from configuration file %s",
hostname, config);
- Open the log.
- Open a connection to Jabber.
*/
-int osrfSystemBootstrapClientResc( const char* config_file,
- const char* contextnode, const char* resource ) {
+int osrfSystemBootstrapClientResc(const char* config_file,
+ const char* contextnode, const char* appname) {
+ return osrf_system_bootstrap_common(config_file, contextnode, appname, 0);
+}
+
+int osrf_system_bootstrap_common(const char* config_file,
+ const char* contextnode, const char* appname, int is_service) {
+
+ if (contextnode == NULL) {
+ osrfLogError(OSRF_LOG_MARK,
+ "osrf_system_bootstrap_common() requires a connection type");
+ return -1;
+ }
int failure = 0;
}
if( config_file ) {
- osrfConfig* cfg = osrfConfigInit( config_file, contextnode );
+ osrfConfig* cfg = osrfConfigInit(config_file, contextnode);
if(cfg)
osrfConfigSetDefaultConfig(cfg);
else
// fetch list of configured log redaction marker strings
log_protect_arr = osrfNewStringArray(8);
- osrfConfig* cfg_shared = osrfConfigInit(config_file, "shared");
- osrfConfigGetValueList( cfg_shared, log_protect_arr, "/log_protect/match_string" );
+ osrfConfig* cfg_shared = osrfConfigInit(config_file, "shared");
+ osrfConfigGetValueList( cfg_shared, log_protect_arr, "/log_protect/match_string" );
}
- char* log_file = osrfConfigGetValue( NULL, "/logfile");
- if(!log_file) {
+
+ char* log_file = osrfConfigGetValue( NULL, "/logfile");
+ if (!log_file) {
fprintf(stderr, "No log file specified in configuration file %s\n",
- config_file);
+ config_file);
return -1;
}
char* username = osrfConfigGetValue( NULL, "/username" );
char* password = osrfConfigGetValue( NULL, "/passwd" );
char* port = osrfConfigGetValue( NULL, "/port" );
- char* unixpath = osrfConfigGetValue( NULL, "/unixpath" );
char* facility = osrfConfigGetValue( NULL, "/syslog" );
char* actlog = osrfConfigGetValue( NULL, "/actlog" );
char* logtag = osrfConfigGetValue( NULL, "/logtag" );
/* if we're a source-client, tell the logger */
- char* isclient = osrfConfigGetValue(NULL, "/client");
+ char* isclient = osrfConfigGetValue(NULL, "/client");
if( isclient && !strcasecmp(isclient,"true") )
osrfLogSetIsClient(1);
free(isclient);
int llevel = 0;
int iport = 0;
- if(port) iport = atoi(port);
- if(log_level) llevel = atoi(log_level);
+ if (port) iport = atoi(port);
+ if (log_level) llevel = atoi(log_level);
if(!strcmp(log_file, "syslog")) {
if(logtag) osrfLogSetLogTag(logtag);
- osrfLogInit( OSRF_LOG_TYPE_SYSLOG, contextnode, llevel );
+ osrfLogInit( OSRF_LOG_TYPE_SYSLOG, appname, llevel );
osrfLogSetSyslogFacility(osrfLogFacilityToInt(facility));
if(actlog) osrfLogSetSyslogActFacility(osrfLogFacilityToInt(actlog));
} else {
- osrfLogInit( OSRF_LOG_TYPE_FILE, contextnode, llevel );
+ osrfLogInit( OSRF_LOG_TYPE_FILE, appname, llevel );
osrfLogSetFile( log_file );
}
-
/* Get a domain, if one is specified */
const char* domain = osrfStringArrayGetString( arr, 0 ); /* just the first for now */
- if(!domain) {
+ if (!domain) {
fprintf(stderr, "No domain specified in configuration file %s\n", config_file);
- osrfLogError( OSRF_LOG_MARK, "No domain specified in configuration file %s\n",
- config_file );
+ osrfLogError(OSRF_LOG_MARK,
+ "No domain specified in configuration file %s\n", config_file );
failure = 1;
}
failure = 1;
}
- if((iport <= 0) && !unixpath) {
- fprintf(stderr, "No unixpath or valid port in configuration file %s\n", config_file);
- osrfLogError( OSRF_LOG_MARK, "No unixpath or valid port in configuration file %s\n",
- config_file);
- failure = 1;
- }
-
if (failure) {
- osrfStringArrayFree(arr);
free(log_file);
free(log_level);
free(username);
free(password);
free(port);
- free(unixpath);
free(facility);
free(actlog);
free(logtag);
return 0;
}
- osrfLogInfo( OSRF_LOG_MARK, "Bootstrapping system with domain %s, port %d, and unixpath %s",
- domain, iport, unixpath ? unixpath : "(none)" );
- transport_client* client = client_init( domain, iport, unixpath, 0 );
-
- char host[HOST_NAME_MAX + 1] = "";
- gethostname(host, sizeof(host) );
- host[HOST_NAME_MAX] = '\0';
+ osrfLogInfo(OSRF_LOG_MARK,
+ "Bootstrapping system with domain %s, port %d", domain, iport);
- char tbuf[32];
- tbuf[0] = '\0';
- snprintf(tbuf, 32, "%f", get_timestamp_millis());
+ transport_client* client = client_init(domain, iport, username, password);
- if(!resource) resource = "";
-
- int len = strlen(resource) + 256;
- char buf[len];
- buf[0] = '\0';
- snprintf(buf, len - 1, "%s_%s_%s_%ld", resource, host, tbuf, (long) getpid() );
-
- if(client_connect( client, username, password, buf, 10, AUTH_DIGEST )) {
- osrfGlobalTransportClient = client;
- }
+ if (is_service) {
+ if (client_connect_as_service(client, appname)) {
+ osrfGlobalTransportClient = client;
+ }
+ } else {
+ if (client_connect(client)) {
+ osrfGlobalTransportClient = client;
+ }
+ }
- osrfStringArrayFree(arr);
free(actlog);
free(facility);
free(log_level);
free(username);
free(password);
free(port);
- free(unixpath);
if(osrfGlobalTransportClient)
return 1;
#include <opensrf/transport_client.h>
-/**
- @file transport_client.c
- @brief Collection of routines for sending and receiving single messages over Jabber.
+transport_client* client_init(const char* domain,
+ int port, const char* username, const char* password) {
- These functions form an API built on top of the transport_session API. They serve
- two main purposes:
- - They remember a Jabber ID to use when sending messages.
- - They maintain a queue of input messages that the calling code can get one at a time.
-*/
+ osrfLogInfo(OSRF_LOG_MARK,
+ "TCLIENT client_init domain=%s port=%d username=%s", domain, port, username);
+
+ transport_client* client = safe_malloc(sizeof(transport_client));
+ client->primary_domain = strdup(domain);
+ client->connections = osrfNewHash();
+
+ // These 2 only get values if this client works for a service.
+ client->service = NULL;
+ client->service_address = NULL;
-static void client_message_handler( void* client, transport_message* msg );
+ client->username = username ? strdup(username) : NULL;
+ client->password = password ? strdup(password) : NULL;
-//int main( int argc, char** argv );
+ client->port = port;
+ client->primary_connection = NULL;
-/*
-int main( int argc, char** argv ) {
+ client->error = 0;
- transport_message* recv;
- transport_message* send;
+ return client;
+}
- transport_client* client = client_init( "spacely.georgialibraries.org", 5222 );
+static transport_con* client_connect_common(
+ transport_client* client, const char* domain) {
- // try to connect, allow 15 second connect timeout
- if( client_connect( client, "admin", "asdfjkjk", "system", 15 ) ) {
- printf("Connected...\n");
- } else {
- printf( "NOT Connected...\n" ); exit(99);
- }
+ osrfLogInfo(OSRF_LOG_MARK, "TCLIENT Connecting to domain: %s", domain);
- while( (recv = client_recv( client, -1 )) ) {
+ transport_con* con = transport_con_new(domain);
- if( recv->body ) {
- int len = strlen(recv->body);
- char buf[len + 20];
- osrf_clearbuf( buf, 0, sizeof(buf));
- sprintf( buf, "Echoing...%s", recv->body );
- send = message_init( buf, "Echoing Stuff", "12345", recv->sender, "" );
- } else {
- send = message_init( " * ECHOING * ", "Echoing Stuff", "12345", recv->sender, "" );
- }
+ osrfHashSet(client->connections, (void*) con, (char*) domain);
- if( send == NULL ) { printf("something's wrong"); }
- client_send_message( client, send );
+ return con;
+}
- message_free( send );
- message_free( recv );
- }
- printf( "ended recv loop\n" );
+static transport_con* get_transport_con(transport_client* client, const char* domain) {
+ osrfLogInternal(OSRF_LOG_MARK, "TCLIENT get_transport_con() domain=%s", domain);
- return 0;
+ transport_con* con = (transport_con*) osrfHashGet(client->connections, (char*) domain);
+ if (con != NULL) { return con; }
+
+ // If we don't have the a connection for the requested domain,
+ // it means we're setting up a connection to a remote domain.
+
+ con = client_connect_common(client, domain);
+
+ transport_con_set_address(con, NULL);
+
+ // Connections to remote domains assume the same connection
+ // attributes apply.
+ transport_con_connect(con, client->port, client->username, client->password);
+
+ return con;
}
-*/
+int client_connect_as_service(transport_client* client, const char* service) {
+ osrfLogInternal(OSRF_LOG_MARK,
+ "TCLIENT client_connect_as_service() service=%s", service);
-/**
- @brief Allocate and initialize a transport_client.
- @param server Domain name where the Jabber server resides.
- @param port Port used for connecting to Jabber (0 if using UNIX domain socket).
- @param unix_path Name of Jabber's socket in file system (if using UNIX domain socket).
- @param component Boolean; true if we're a Jabber component.
- @return A pointer to a newly created transport_client.
-
- Create a transport_client with a transport_session and an empty message queue (but don't
- open a connection yet). Install a callback function in the transport_session to enqueue
- incoming messages.
-
- The calling code is responsible for freeing the transport_client by calling client_free().
-*/
-transport_client* client_init( const char* server, int port, const char* unix_path, int component ) {
+ growing_buffer* buf = buffer_init(32);
- if(server == NULL) return NULL;
+ buffer_fadd(buf, "opensrf:service:%s", service);
- /* build and clear the client object */
- transport_client* client = safe_malloc( sizeof( transport_client) );
+ client->service_address = buffer_release(buf);
+ client->service = strdup(service);
- /* start with an empty message queue */
- client->msg_q_head = NULL;
- client->msg_q_tail = NULL;
+ transport_con* con = client_connect_common(client, client->primary_domain);
- /* build the session */
- client->session = init_transport( server, port, unix_path, client, component );
+ transport_con_set_address(con, service);
- client->session->message_callback = client_message_handler;
- client->error = 0;
- client->host = strdup(server);
- client->xmpp_id = NULL;
+ client->primary_connection = con;
- return client;
+ transport_con_connect(
+ con, client->port, client->username, client->password);
+
+ // Make a stream for the service address
+ return transport_con_make_stream(con, client->service_address, 1);
}
+int client_connect(transport_client* client) {
+ osrfLogInternal(OSRF_LOG_MARK, "TCLIENT client_connect()");
-/**
- @brief Open a Jabber session for a transport_client.
- @param client Pointer to the transport_client.
- @param username Jabber user name.
- @param password Password for the Jabber logon.
- @param resource Resource name for the Jabber logon.
- @param connect_timeout How many seconds to wait for the connection to open.
- @param auth_type An enum: either AUTH_PLAIN or AUTH_DIGEST (see notes).
- @return 1 if successful, or 0 upon error.
-
- Besides opening the Jabber session, create a Jabber ID for future use.
-
- If @a connect_timeout is -1, wait indefinitely for the Jabber server to respond. If
- @a connect_timeout is zero, don't wait at all. If @a timeout is positive, wait that
- number of seconds before timing out. If @a connect_timeout has a negative value other
- than -1, the results are not well defined.
-
- The value of @a connect_timeout applies to each of two stages in the logon procedure.
- Hence the logon may take up to twice the amount of time indicated.
-
- If we connect as a Jabber component, we send the password as an SHA1 hash. Otherwise
- we look at the @a auth_type. If it's AUTH_PLAIN, we send the password as plaintext; if
- it's AUTH_DIGEST, we send it as a hash.
- */
-int client_connect( transport_client* client,
- const char* username, const char* password, const char* resource,
- int connect_timeout, enum TRANSPORT_AUTH_TYPE auth_type ) {
- if( client == NULL )
- return 0;
-
- // Create and store a Jabber ID
- if( client->xmpp_id )
- free( client->xmpp_id );
- client->xmpp_id = va_list_to_string( "%s@%s/%s", username, client->host, resource );
-
- // Open a transport_session
- return session_connect( client->session, username,
- password, resource, connect_timeout, auth_type );
+ transport_con* con = client_connect_common(client, client->primary_domain);
+
+ transport_con_set_address(con, NULL);
+
+ client->primary_connection = con;
+
+ return transport_con_connect(
+ con, client->port, client->username, client->password);
}
-/**
- @brief Disconnect from the Jabber session.
- @param client Pointer to the transport_client.
- @return 0 in all cases.
+// Disconnect all connections and remove them from the connections hash.
+int client_disconnect(transport_client* client) {
- If there are any messages still in the queue, they stay there; i.e. we don't free them here.
-*/
-int client_disconnect( transport_client* client ) {
- if( client == NULL ) { return 0; }
- return session_disconnect( client->session );
+ osrfLogDebug(OSRF_LOG_MARK, "TCLIENT Disconnecting all transport connections");
+
+ osrfHashIterator* iter = osrfNewHashIterator(client->connections);
+
+ transport_con* con;
+
+ while( (con = (transport_con*) osrfHashIteratorNext(iter)) ) {
+ osrfLogInternal(OSRF_LOG_MARK, "TCLIENT Disconnecting from domain: %s", con->domain);
+ transport_con_disconnect(con);
+ transport_con_free(con);
+ }
+
+ osrfHashIteratorFree(iter);
+ osrfHashFree(client->connections);
+
+ client->connections = osrfNewHash();
+
+ return 1;
}
-/**
- @brief Report whether a transport_client is connected.
- @param client Pointer to the transport_client.
- @return Boolean: 1 if connected, or 0 if not.
-*/
int client_connected( const transport_client* client ) {
- if(client == NULL) return 0;
- return session_connected( client->session );
+ return (client != NULL && client->primary_connection != NULL);
}
-/**
- @brief Send a transport message to the current destination.
- @param client Pointer to a transport_client.
- @param msg Pointer to the transport_message to be sent.
- @return 0 if successful, or -1 if not.
+static char* get_domain_from_address(const char* address) {
+ osrfLogInternal(OSRF_LOG_MARK,
+ "TCLIENT get_domain_from_address() address=%s", address);
- Translate the transport_message into XML and send it to Jabber, using the previously
- stored Jabber ID for the sender.
-*/
-int client_send_message( transport_client* client, transport_message* msg ) {
- if( client == NULL || client->error )
- return -1;
- if( msg->sender )
- free( msg->sender );
- msg->sender = strdup(client->xmpp_id);
- return session_send_msg( client->session, msg );
+ char* addr_copy = strdup(address);
+ strtok(addr_copy, ":"); // "opensrf:"
+ strtok(NULL, ":"); // "client:"
+ char* domain = strtok(NULL, ":");
+
+ if (domain) {
+ // About to free addr_copy...
+ domain = strdup(domain);
+ } else {
+ osrfLogError(OSRF_LOG_MARK, "No domain parsed from address: %s", address);
+ }
+
+ free(addr_copy);
+
+ return domain;
}
-/**
- @brief Fetch an input message, if one is available.
- @param client Pointer to a transport_client.
- @param timeout How long to wait for a message to arrive, in seconds (see remarks).
- @return A pointer to a transport_message if successful, or NULL if not.
+int client_send_message(transport_client* client, transport_message* msg) {
+ return client_send_message_to(client, msg, msg->recipient) ;
+}
- If there is a message already in the queue, return it immediately. Otherwise read any
- available messages from the transport_session (subject to a timeout), and return the
- first one.
+int client_send_message_to(transport_client* client, transport_message* msg, const char* recipient) {
+ osrfLogInternal(OSRF_LOG_MARK, "TCLIENT client_send_message()");
- If the value of @a timeout is -1, then there is no time limit -- wait indefinitely until a
- message arrives (or we error out for other reasons). If the value of @a timeout is zero,
- don't wait at all.
+ if (client == NULL || client->error) { return -1; }
- The calling code is responsible for freeing the transport_message by calling message_free().
-*/
-transport_message* client_recv( transport_client* client, int timeout ) {
- if( client == NULL ) { return NULL; }
+ transport_con* con;
- int error = 0; /* boolean */
+ if (strstr(recipient, "opensrf:client")) {
+ // We may be talking to a worker that runs on a remote domain.
+ // Find or create a connection to the domain.
- if( NULL == client->msg_q_head ) {
+ char* domain = get_domain_from_address(recipient);
- // No message available on the queue? Try to get a fresh one.
+ if (!domain) { return -1; }
- // When we call session_wait(), it reads a socket for new messages. When it finds
- // one, it enqueues it by calling the callback function client_message_handler(),
- // which we installed in the transport_session when we created the transport_client.
+ con = get_transport_con(client, domain);
- // Since a single call to session_wait() may not result in the receipt of a complete
- // message. we call it repeatedly until we get either a message or an error.
+ if (!con) {
+ osrfLogError(
+ OSRF_LOG_MARK, "Error creating connection for domain: %s", domain);
- // Alternatively, a single call to session_wait() may result in the receipt of
- // multiple messages. That's why we have to enqueue them.
+ return -1;
+ }
- // The timeout applies to the receipt of a complete message. For a sufficiently
- // short timeout, a sufficiently long message, and a sufficiently slow connection,
- // we could timeout on the first message even though we're still receiving data.
-
- // Likewise we could time out while still receiving the second or subsequent message,
- // return the first message, and resume receiving messages later.
+ } else {
+ con = client->primary_connection;
+ }
+
+ if (msg->sender) { free(msg->sender); }
+ msg->sender = strdup(con->address);
- if( timeout == -1 ) { /* wait potentially forever for data to arrive */
+ message_prepare_json(msg);
- int x;
- do {
- if( (x = session_wait( client->session, -1 )) ) {
- osrfLogDebug(OSRF_LOG_MARK, "session_wait returned failure code %d\n", x);
- error = 1;
- break;
- }
- } while( client->msg_q_head == NULL );
+ osrfLogInternal(OSRF_LOG_MARK,
+ "client_send_message() to=%s %s", recipient, msg->msg_json);
+
+ return transport_con_send(con, msg->msg_json, recipient);
+
+ osrfLogInternal(OSRF_LOG_MARK, "client_send_message() send completed");
+
+ return 0;
+}
- } else { /* loop up to 'timeout' seconds waiting for data to arrive */
+transport_message* client_recv_stream(transport_client* client, int timeout, const char* stream) {
- /* This loop assumes that a time_t is denominated in seconds -- not */
- /* guaranteed by Standard C, but a fair bet for Linux or UNIX */
+ osrfLogInternal(OSRF_LOG_MARK,
+ "TCLIENT client_recv_stream() timeout=%d stream=%s", timeout, stream);
- time_t start = time(NULL);
- time_t remaining = (time_t) timeout;
+ transport_con_msg* con_msg =
+ transport_con_recv(client->primary_connection, timeout, stream);
- int wait_ret;
- do {
- if( (wait_ret = session_wait( client->session, (int) remaining)) ) {
- error = 1;
- osrfLogDebug(OSRF_LOG_MARK,
- "session_wait returned failure code %d: setting error=1\n", wait_ret);
- break;
- }
+ if (con_msg == NULL) { return NULL; } // Receive timed out.
- remaining -= time(NULL) - start;
- } while( NULL == client->msg_q_head && remaining > 0 );
- }
- }
+ transport_message* msg = new_message_from_json(con_msg->msg_json);
- transport_message* msg = NULL;
+ transport_con_msg_free(con_msg);
- if( !error && client->msg_q_head != NULL ) {
- /* got message(s); dequeue the oldest one */
- msg = client->msg_q_head;
- client->msg_q_head = msg->next;
- msg->next = NULL; /* shouldn't be necessary; nullify for good hygiene */
- if( NULL == client->msg_q_head )
- client->msg_q_tail = NULL;
- }
+ osrfLogInternal(OSRF_LOG_MARK,
+ "client_recv() read response for thread %s", msg->thread);
return msg;
}
-/**
- @brief Enqueue a newly received transport_message.
- @param client A pointer to a transport_client, cast to a void pointer.
- @param msg A new transport message.
-
- Add a newly arrived input message to the tail of the queue.
+transport_message* client_recv(transport_client* client, int timeout) {
- This is a callback function. The transport_session parses the XML coming in through a
- socket, accumulating various bits and pieces. When it sees the end of a message stanza,
- it packages the bits and pieces into a transport_message that it passes to this function,
- which enqueues the message for processing.
-*/
-static void client_message_handler( void* client, transport_message* msg ){
+ return client_recv_stream(client, timeout, client->primary_connection->address);
+}
- if(client == NULL) return;
- if(msg == NULL) return;
+transport_message* client_recv_for_service(transport_client* client, int timeout) {
- transport_client* cli = (transport_client*) client;
+ osrfLogInternal(OSRF_LOG_MARK, "TCLIENT Receiving for service %s", client->service);
- /* add the new message to the tail of the queue */
- if( NULL == cli->msg_q_head )
- cli->msg_q_tail = cli->msg_q_head = msg;
- else {
- cli->msg_q_tail->next = msg;
- cli->msg_q_tail = msg;
- }
- msg->next = NULL;
+ return client_recv_stream(client, timeout, client->service_address);
}
-
/**
@brief Free a transport_client, along with all resources it owns.
@param client Pointer to the transport_client to be freed.
@return 1 if successful, or 0 if not. The only error condition is if @a client is NULL.
*/
int client_free( transport_client* client ) {
- if(client == NULL)
- return 0;
- session_free( client->session );
- client->session = NULL;
+ osrfLogInternal(OSRF_LOG_MARK, "TCLIENT client_free()");
+ if (client == NULL) { return 0; }
return client_discard( client );
}
disconnect the parent as well.
*/
int client_discard( transport_client* client ) {
- if(client == NULL)
- return 0;
-
- transport_message* current = client->msg_q_head;
- transport_message* next;
-
- /* deallocate the list of messages */
- while( current != NULL ) {
- next = current->next;
- message_free( current );
- current = next;
- }
-
- free(client->host);
- free(client->xmpp_id);
- free( client );
+ osrfLogInternal(OSRF_LOG_MARK, "TCLIENT client_discard()");
+
+ if (client == NULL) { return 0; }
+
+ osrfLogInternal(OSRF_LOG_MARK,
+ "Discarding client on domain %s", client->primary_domain);
+
+ if (client->primary_domain) { free(client->primary_domain); }
+ if (client->service) { free(client->service); }
+ if (client->service_address) { free(client->service_address); }
+ if (client->username) { free(client->username); }
+ if (client->password) { free(client->password); }
+
+ // Clean up our connections.
+ // We do not disconnect here since they caller may or may
+ // not want the socket closed.
+ // If disconnect() was just called, the connections hash
+ // will be empty.
+ osrfHashIterator* iter = osrfNewHashIterator(client->connections);
+
+ transport_con* con;
+
+ while( (con = (transport_con*) osrfHashIteratorNext(iter)) ) {
+ osrfLogInternal(OSRF_LOG_MARK,
+ "client_discard() freeing connection for %s", con->domain);
+ transport_con_free(con);
+ }
+
+ osrfHashIteratorFree(iter);
+ osrfHashFree(client->connections);
+
+ free(client);
+
return 1;
}
-int client_sock_fd( transport_client* client )
-{
- if( !client )
- return 0;
- else
- return client->session->sock_id;
-}
--- /dev/null
+#include <opensrf/transport_connection.h>
+
+transport_con* transport_con_new(const char* domain) {
+
+ osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_new() domain=%s", domain);
+
+ transport_con* con = safe_malloc(sizeof(transport_con));
+
+ con->bus = NULL;
+ con->address = NULL;
+ con->domain = strdup(domain);
+ con->max_queue = 1000; // TODO pull from config
+
+ osrfLogInternal(OSRF_LOG_MARK,
+ "TCON created transport connection with domain: %s", con->domain);
+
+ return con;
+}
+
+void transport_con_msg_free(transport_con_msg* msg) {
+ osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_msg_free()");
+
+ if (msg == NULL) { return; }
+
+ if (msg->msg_id) { free(msg->msg_id); }
+ if (msg->msg_json) { free(msg->msg_json); }
+
+ free(msg);
+}
+
+void transport_con_free(transport_con* con) {
+ osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_free()");
+
+ osrfLogInternal(
+ OSRF_LOG_MARK, "Freeing transport connection for %s", con->domain);
+
+ if (con->bus) { free(con->bus); }
+ if (con->address) { free(con->address); }
+ if (con->domain) { free(con->domain); }
+
+ free(con);
+}
+
+int transport_con_connected(transport_con* con) {
+ return con->bus != NULL;
+}
+
+void transport_con_set_address(transport_con* con, const char* service) {
+ osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_set_address()");
+
+ char hostname[1024];
+ hostname[1023] = '\0';
+ gethostname(hostname, 1023);
+
+ growing_buffer *buf = buffer_init(64);
+ buffer_fadd(buf, "opensrf:client:%s:%s:", con->domain, hostname);
+
+ if (service != NULL) {
+ buffer_fadd(buf, "%s:", service);
+ }
+
+ buffer_fadd(buf, "%ld", (long) getpid());
+
+ char junk[256];
+ snprintf(junk, sizeof(junk),
+ "%f%d", get_timestamp_millis(), (int) time(NULL));
+
+ char* md5 = md5sum(junk);
+
+ buffer_add(buf, ":");
+ buffer_add_n(buf, md5, 8);
+
+ con->address = buffer_release(buf);
+
+ osrfLogDebug(OSRF_LOG_MARK, "Connection set address to %s", con->address);
+}
+
+int transport_con_connect(
+ transport_con* con, int port, const char* username, const char* password) {
+ osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_connect()");
+
+ osrfLogDebug(OSRF_LOG_MARK, "Transport con connecting with bus "
+ "domain=%s; address=%s; port=%d; username=%s",
+ con->domain,
+ con->address,
+ port,
+ username
+ );
+
+ con->bus = redisConnect(con->domain, port);
+
+ if (con->bus == NULL) {
+ osrfLogError(OSRF_LOG_MARK, "Could not connect to Redis instance");
+ return 0;
+ }
+
+ osrfLogDebug(OSRF_LOG_MARK, "Connected to Redis instance OK");
+
+ redisReply *reply =
+ redisCommand(con->bus, "AUTH %s %s", username, password);
+
+ if (handle_redis_error(reply, "AUTH %s %s", username, password)) {
+ return 0;
+ }
+
+ freeReplyObject(reply);
+
+ return transport_con_make_stream(con, con->address, 0);
+}
+
+int transport_con_make_stream(transport_con* con, const char* stream, int exists_ok) {
+ osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_make_stream() stream=%s", stream);
+
+ redisReply *reply = redisCommand(
+ con->bus,
+ "XGROUP CREATE %s %s $ mkstream",
+ stream,
+ stream,
+ "$",
+ "mkstream"
+ );
+
+ // Produces an error when a group/stream already exists, but that's
+ // acceptible when creating a group/stream for a stop-level service
+ // address, since multiple Listeners are allowed.
+ if (handle_redis_error(reply,
+ "XGROUP CREATE %s %s $ mkstream",
+ stream,
+ stream,
+ "$",
+ "mkstream"
+ )) { return exists_ok; }
+
+ freeReplyObject(reply);
+
+ return 1;
+}
+
+int transport_con_disconnect(transport_con* con) {
+ osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_disconnect()");
+
+ if (con == NULL || con->bus == NULL) { return -1; }
+
+ redisReply *reply = redisCommand(con->bus, "DEL %s", con->address);
+
+ if (!handle_redis_error(reply, "DEL %s", con->address)) {
+ freeReplyObject(reply);
+ }
+
+ redisFree(con->bus);
+ con->bus = NULL;
+
+ return 0;
+}
+
+int transport_con_send(transport_con* con, const char* msg_json, const char* stream) {
+
+ osrfLogInternal(OSRF_LOG_MARK, "Sending to stream=%s: %s", stream, msg_json);
+
+ redisReply *reply = redisCommand(con->bus,
+ "XADD %s NOMKSTREAM MAXLEN ~ %d * message %s",
+ stream,
+ con->max_queue,
+ msg_json
+ );
+
+ if (handle_redis_error(reply,
+ "XADD %s NOMKSTREAM MAXLEN ~ %d * message %s",
+ stream, con->max_queue, msg_json)) {
+
+ return -1;
+ }
+
+ freeReplyObject(reply);
+
+ return 0;
+}
+
+transport_con_msg* transport_con_recv_once(transport_con* con, int timeout, const char* stream) {
+ osrfLogInternal(OSRF_LOG_MARK,
+ "TCON transport_con_recv_once() timeout=%d stream=%s", timeout, stream);
+
+ if (stream == NULL) { stream = con->address; }
+
+ redisReply *reply, *tmp;
+ char *msg_id = NULL, *json = NULL;
+
+ if (timeout == 0) {
+
+ reply = redisCommand(con->bus,
+ "XREADGROUP GROUP %s %s COUNT 1 STREAMS %s >",
+ stream, con->address, stream
+ );
+
+ } else {
+
+ if (timeout == -1) {
+ // Redis timeout 0 means block indefinitely
+ timeout = 0;
+ } else {
+ // Milliseconds
+ timeout *= 1000;
+ }
+
+ reply = redisCommand(con->bus,
+ "XREADGROUP GROUP %s %s BLOCK %d COUNT 1 STREAMS %s >",
+ stream, con->address, timeout, stream
+ );
+ }
+
+ // Timeout or error
+ if (handle_redis_error(
+ reply,
+ "XREADGROUP GROUP %s %s %s COUNT 1 NOACK STREAMS %s >",
+ stream, con->address, "BLOCK X", stream
+ )) { return NULL; }
+
+ // Unpack the XREADGROUP response, which is a nest of arrays.
+ // These arrays are mostly 1 and 2-element lists, since we are
+ // only reading one item on a single stream.
+ if (reply->type == REDIS_REPLY_ARRAY && reply->elements > 0) {
+ tmp = reply->element[0];
+
+ if (tmp->type == REDIS_REPLY_ARRAY && tmp->elements > 1) {
+ tmp = tmp->element[1];
+
+ if (tmp->type == REDIS_REPLY_ARRAY && tmp->elements > 0) {
+ tmp = tmp->element[0];
+
+ if (tmp->type == REDIS_REPLY_ARRAY && tmp->elements > 1) {
+ redisReply *r1 = tmp->element[0];
+ redisReply *r2 = tmp->element[1];
+
+ if (r1->type == REDIS_REPLY_STRING) {
+ msg_id = strdup(r1->str);
+ }
+
+ if (r2->type == REDIS_REPLY_ARRAY && r2->elements > 1) {
+ // r2->element[0] is the message name, which we
+ // currently don't use for anything.
+
+ r2 = r2->element[1];
+
+ if (r2->type == REDIS_REPLY_STRING) {
+ json = strdup(r2->str);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ freeReplyObject(reply); // XREADGROUP
+
+ if (msg_id == NULL) {
+ // Read timed out. 'json' will also be NULL.
+ return NULL;
+ }
+
+ transport_con_msg* tcon_msg = safe_malloc(sizeof(transport_con_msg));
+ tcon_msg->msg_id = msg_id;
+ tcon_msg->msg_json = json;
+
+ osrfLogInternal(OSRF_LOG_MARK, "recv_one_chunk() read json: %s", json);
+
+ return tcon_msg;
+}
+
+
+transport_con_msg* transport_con_recv(transport_con* con, int timeout, const char* stream) {
+ osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_recv() stream=%s", stream);
+
+ if (timeout == 0) {
+ return transport_con_recv_once(con, 0, stream);
+
+ } else if (timeout < 0) {
+ // Keep trying until we have a result.
+
+ while (1) {
+ transport_con_msg* msg = transport_con_recv_once(con, -1, stream);
+ if (msg != NULL) { return msg; }
+ }
+ }
+
+ time_t seconds = (time_t) timeout;
+
+ while (seconds > 0) {
+ // Keep trying until we get a response or our timeout is exhausted.
+
+ time_t now = time(NULL);
+ transport_con_msg* msg = transport_con_recv_once(con, timeout, stream);
+
+ if (msg == NULL) {
+ seconds -= now;
+ } else {
+ return msg;
+ }
+ }
+
+ return NULL;
+}
+
+void transport_con_flush_socket(transport_con* con) {
+}
+
+// Returns false/0 on success, true/1 on failure.
+// On error, the reply is freed.
+int handle_redis_error(redisReply *reply, const char* command, ...) {
+ VA_LIST_TO_STRING(command);
+
+ if (reply != NULL && reply->type != REDIS_REPLY_ERROR) {
+ osrfLogInternal(OSRF_LOG_MARK, "Redis Command: %s", VA_BUF);
+ return 0;
+ }
+
+ char* err = reply == NULL ? "" : reply->str;
+ osrfLogError(OSRF_LOG_MARK, "REDIS Error [%s] %s", err, VA_BUF);
+ freeReplyObject(reply);
+
+ return 1;
+}
#include <opensrf/transport_message.h>
+#include <opensrf/osrf_json.h>
/**
@file transport_message.c
msg->error_code = 0;
msg->broadcast = 0;
msg->msg_xml = NULL;
+ msg->msg_json = NULL;
msg->next = NULL;
return msg;
}
+transport_message* new_message_from_json(const char* msg_json) {
+
+ if (msg_json == NULL || *msg_json == '\0') { return NULL; }
+
+ transport_message* new_msg = safe_malloc(sizeof(transport_message));
+
+ new_msg->body = NULL;
+ new_msg->subject = NULL;
+ new_msg->thread = NULL;
+ new_msg->recipient = NULL;
+ new_msg->sender = NULL;
+ new_msg->router_from = NULL;
+ new_msg->router_to = NULL;
+ new_msg->router_class = NULL;
+ new_msg->router_command = NULL;
+ new_msg->osrf_xid = NULL;
+ new_msg->is_error = 0;
+ new_msg->error_type = NULL;
+ new_msg->error_code = 0;
+ new_msg->broadcast = 0;
+ new_msg->msg_xml = NULL;
+ new_msg->next = NULL;
+
+ jsonObject* json_hash = jsonParse(msg_json);
+
+ if (json_hash == NULL || json_hash->type != JSON_HASH) {
+ osrfLogError(OSRF_LOG_MARK, "new_message_from_json() received bad JSON");
+ jsonObjectFree(json_hash);
+ message_free(new_msg);
+ return NULL;
+ }
+
+ const char* sender = jsonObjectGetString(jsonObjectGetKeyConst(json_hash, "from"));
+ if (sender) { new_msg->sender = strdup((const char*) sender); }
+
+ const char* recipient = jsonObjectGetString(jsonObjectGetKeyConst(json_hash, "to"));
+ if (recipient) { new_msg->recipient = strdup((const char*) recipient); }
+
+ const char* thread = jsonObjectGetString(jsonObjectGetKeyConst(json_hash, "thread"));
+ if (thread == NULL) { thread = ""; }
+ new_msg->thread = strdup((const char*) thread);
+
+ const char* osrf_xid = jsonObjectGetString(jsonObjectGetKeyConst(json_hash, "osrf_xid"));
+ if (osrf_xid) { message_set_osrf_xid(new_msg, (char*) osrf_xid); }
+
+ // TODO
+ // Internally the mesage body is stored as a JSON string
+ // On the wire, it's just part of the message. We could get
+ // rid if this extra json encode/decode step if we treated
+ // the body as a JSON object internally.
+ const char* body = jsonObjectToJSON(jsonObjectGetKeyConst(json_hash, "body"));
+ if (body == NULL) { body = ""; }
+ new_msg->body = strdup((const char*) body);
+
+ jsonObjectFree(json_hash);
+
+ return new_msg;
+}
+
+
/**
@brief Translate an XML string into a transport_message.
free(msg->osrf_xid);
if( msg->error_type != NULL ) free(msg->error_type);
if( msg->msg_xml != NULL ) free(msg->msg_xml);
+ if( msg->msg_json != NULL ) free(msg->msg_json);
free(msg);
return 1;
}
+int message_prepare_json(transport_message* msg) {
+
+ if (!msg) { return 0; }
+ if (msg->msg_json) { return 1; } /* already done */
+
+ jsonObject* json_hash = jsonNewObject(NULL);
+ jsonObjectSetKey(json_hash, "to", jsonNewObject(msg->recipient));
+ jsonObjectSetKey(json_hash, "from", jsonNewObject(msg->sender));
+ jsonObjectSetKey(json_hash, "thread", jsonNewObject(msg->thread));
+ jsonObjectSetKey(json_hash, "osrf_xid", jsonNewObject(msg->osrf_xid));
+
+ if (msg->router_from) {
+ jsonObjectSetKey(json_hash, "router_from", jsonNewObject(msg->router_from));
+ }
+ if (msg->router_to) {
+ jsonObjectSetKey(json_hash, "router_to", jsonNewObject(msg->router_to));
+ }
+ if (msg->router_class) {
+ jsonObjectSetKey(json_hash, "router_class", jsonNewObject(msg->router_class));
+ }
+ if (msg->router_command) {
+ jsonObjectSetKey(json_hash, "router_command", jsonNewObject(msg->router_command));
+ }
+
+ // TODO the various layers expect the message body to be a separate
+ // JSON string, but on the bus, the body is just another key
+ // in the JSON object.
+ jsonObjectSetKey(json_hash, "body", jsonParse(msg->body));
+
+ msg->msg_json = jsonObjectToJSON(json_hash);
+
+ jsonObjectFree(json_hash);
+
+ return 1;
+}
+
+
/**
@brief Build a <message> element and store it as a string in the msg_xml member.
@param msg Pointer to a transport_message.
sub get_app_targets {
my $app = shift;
-
- my $conf = OpenSRF::Utils::Config->current;
- my $router_name = $conf->bootstrap->router_name || 'router';
- my $domain = $conf->bootstrap->domain;
- $logger->error("use of <domains/> is deprecated") if $conf->bootstrap->domains;
-
- unless($router_name and $domain) {
- throw OpenSRF::EX::Config
- ("Missing router config information 'router_name' and 'domain'");
- }
-
- return ("$router_name\@$domain/$app");
+ return ("opensrf:service:$app");
}
sub stateless {
}
my $json = OpenSRF::Utils::JSON->perl2JSON(\@doc);
- $logger->internal("AppSession sending doc: $json");
- $self->{peer_handle}->send(
- to => $self->remote_id,
- thread => $self->session_id,
- body => $json );
+ my $recipient = $self->remote_id;
+
+ if ($self->endpoint == CLIENT and $self->state != CONNECTED) {
+ # Send new requests to our router
+ my $conf = OpenSRF::Utils::Config->current;
+ my $domain = $conf->bootstrap->domain;
+ $recipient = "opensrf:router:$domain";
+ }
+
+ $logger->internal("AppSession sending doc to=$recipient: $json");
+
+ $self->{peer_handle}->send_to(
+ $recipient,
+ to => $self->remote_id,
+ thread => $self->session_id,
+ body => $json
+ );
if( $disconnect) {
$self->state( DISCONNECTED );
use OpenSRF::Transport::PeerHandle;
use OpenSRF::Utils::SettingsClient;
use OpenSRF::Utils::Logger qw($logger);
-use OpenSRF::Transport::SlimJabber::Client;
+use OpenSRF::Transport::Redis::Client;
use Encode;
use POSIX qw/:sys_wait_h :errno_h/;
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
}
# ----------------------------------------------------------------
-# Waits on the jabber socket for inbound data from the router.
+# Waits on the redis socket for inbound data from the router.
# Each new message is passed off to a child process for handling.
# At regular intervals, wake up for min/max spare child maintenance
# ----------------------------------------------------------------
}
# ----------------------------------------------------------------
-# Jabber connection inbound message arrive on.
+# Redis connection inbound message arrive on.
# ----------------------------------------------------------------
sub build_osrf_handle {
my $self = shift;
-
- my $conf = OpenSRF::Utils::Config->current;
- my $username = $conf->bootstrap->username;
- my $password = $conf->bootstrap->passwd;
- my $domain = $conf->bootstrap->domain;
- my $port = $conf->bootstrap->port;
- my $resource = $self->{service} . '_listener_' . $conf->env->hostname;
-
- $logger->debug("server: inbound connecting as $username\@$domain/$resource on port $port");
-
$self->{osrf_handle} =
- OpenSRF::Transport::SlimJabber::Client->new(
- username => $username,
- resource => $resource,
- password => $password,
- host => $domain,
- port => $port,
- );
-
- $self->{osrf_handle}->initialize;
+ OpenSRF::Transport::Redis::Client->new($self->{service});
}
# ----------------------------------------------------------------
sub write_child {
my($self, $child, $msg) = @_;
- my $xml = encode_utf8(decode_utf8($msg->to_xml));
+ my $json = $msg->to_json;
# tell the child how much data to expect, minus the header
my $write_size;
- {use bytes; $write_size = length($xml)}
+ {use bytes; $write_size = length($json)}
$write_size = sprintf("%*s", WRITE_PIPE_DATA_SIZE, $write_size);
for (0..2) {
# so the lack of a pid means the child is dead.
if (!$child->{pid}) {
$logger->error("server: child is dead in write_child(). ".
- "unable to send message: $xml");
+ "unable to send message: $json");
return; # avoid syswrite crash
}
# send message to child data pipe
- syswrite($child->{pipe_to_child}, $write_size . $xml);
+ syswrite($child->{pipe_to_child}, $write_size . $json);
last unless $self->{sig_pipe};
$logger->error("server: got SIGPIPE writing to $child, retrying...");
my $name = $router->{name};
my $domain = $router->{domain};
- push(@targets, "$name\@$domain/router");
+ push(@targets, "opensrf:router:$domain");
}
} else {
- push(@targets, "$router_name\@$router/router");
+ push(@targets, "opensrf:router:$router");
}
}
$logger->info("server: registering with router $_");
$self->{osrf_handle}->send(
to => $_,
- body => 'registering',
+ body => '"registering"',
router_command => 'register',
router_class => $self->{service}
);
$logger->info("server: disconnecting from router $router");
$self->{osrf_handle}->send(
to => $router,
- body => "unregistering",
+ body => '"unregistering"',
router_command => "unregister",
router_class => $self->{service}
);
use OpenSRF::Transport;
use OpenSRF::Application;
use OpenSRF::Transport::PeerHandle;
-use OpenSRF::Transport::SlimJabber::XMPPMessage;
+use OpenSRF::Transport::Redis::Message;
use OpenSRF::Utils::Logger qw($logger);
use OpenSRF::DomainObject::oilsResponse qw/:status/;
use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
}
# ----------------------------------------------------------------
-# Connects to Jabber and runs the application child_init
+# Connects to the bus and runs the application child_init
# ----------------------------------------------------------------
sub init {
my $self = shift;
my $session = OpenSRF::Transport->handler(
$self->{parent}->{service},
- OpenSRF::Transport::SlimJabber::XMPPMessage->new(xml => $data)
+ OpenSRF::Transport::Redis::Message->new(json => $data)
);
my $recycle = $self->keepalive_loop($session);
OpenSRF::Utils::Config->load(config_file => $bootstrap_config_file);
OpenSRF::Utils::JSON->register_class_hint(name => "OpenSRF::Application", hint => "method", type => "hash", strip => ['session']);
- OpenSRF::Transport->message_envelope("OpenSRF::Transport::SlimJabber::MessageWrapper");
- OpenSRF::Transport::PeerHandle->set_peer_client("OpenSRF::Transport::SlimJabber::PeerConnection");
+ OpenSRF::Transport::PeerHandle->set_peer_client("OpenSRF::Transport::Redis::PeerConnection");
OpenSRF::Application->server_class('client');
# Read in a shared portion of the config file
# for later use in log parameter redaction
$bootstrap_config_file =
$params{config_file} || $bootstrap_config_file;
- my $app = $params{client_name} || "client";
-
load_bootstrap_config();
OpenSRF::Utils::Logger::set_config();
- OpenSRF::Transport::PeerHandle->construct($app);
+ OpenSRF::Transport::PeerHandle->construct;
}
sub connected {
use OpenSRF::Utils::Logger qw(:level);
use OpenSRF::DomainObject::oilsResponse qw/:status/;
use OpenSRF::EX qw/:try/;
-use OpenSRF::Transport::SlimJabber::MessageWrapper;
#------------------
# --- These must be implemented by all Transport subclasses
our $message_envelope;
my $logger = "OpenSRF::Utils::Logger";
-
-
-=head2 message_envelope( [$envelope] );
-
-Sets the message envelope class that will allow us to extract
-information from the messages we receive from the low
-level transport
-
-=cut
-
-sub message_envelope {
- my( $class, $envelope ) = @_;
- if( $envelope ) {
- $message_envelope = $envelope;
- $envelope->use;
- if( $@ ) {
- $logger->error(
- "Error loading message_envelope: $envelope -> $@", ERROR);
- }
- }
- return $message_envelope;
-}
-
=head2 handler( $data )
-Creates a new MessageWrapper, extracts the remote_id, session_id, and message body
+Creates a new Message, extracts the remote_id, session_id, and message body
from the message. Then, creates or retrieves the AppSession object with the session_id and remote_id.
Finally, creates the message document from the body of the message and calls
the handler method on the message document.
sub handler {
my $start_time = time();
- my( $class, $service, $data ) = @_;
-
- $logger->transport( "Transport handler() received $data", INTERNAL );
+ my ($class, $service, $msg) = @_;
- my $remote_id = $data->from;
- my $sess_id = $data->thread;
- my $body = $data->body;
- my $type = $data->type;
+ my $remote_id = $msg->from;
+ my $sess_id = $msg->thread;
+ my $body = $msg->body;
+ my $type = $msg->type;
- $logger->set_osrf_xid($data->osrf_xid);
+ $logger->internal("Transport:handler() received message with thread: $sess_id");
+ $logger->set_osrf_xid($msg->osrf_xid);
if (defined($type) and $type eq 'error') {
throw OpenSRF::EX::Session ("$remote_id IS NOT CONNECTED TO THE NETWORK!!!");
}
}
+ $logger->internal(
+ "Building app session with ses=$sess_id remote=$remote_id service=$service");
+
# Retrieve or build the app_session as appropriate (server_build decides which to do)
- $logger->transport( "AppSession is valid or does not exist yet", INTERNAL );
$app_session = OpenSRF::AppSession->server_build( $sess_id, $remote_id, $service );
if( ! $app_session ) {
--- /dev/null
+package OpenSRF::Transport::Redis::BusConnection;
+use strict;
+use warnings;
+use Redis;
+use Net::Domain qw/hostfqdn/;
+use OpenSRF::Utils::Logger qw/$logger/;
+
+# domain doubles as the host of the Redis instance.
+sub new {
+ my ($class, $domain, $port, $username, $password, $max_queue) = @_;
+
+ $logger->debug("Creating new bus connection $domain:$port user=$username");
+
+ my $self = {
+ domain => $domain || 'localhost',
+ port => $port || 6379,
+ username => $username,
+ password => $password,
+ max_queue => $max_queue
+ };
+
+ return bless($self, $class);
+}
+
+sub redis {
+ my $self = shift;
+ return $self->{redis};
+}
+
+sub connected {
+ my $self = shift;
+ return $self->redis ? 1 : 0;
+}
+
+sub domain {
+ my $self = shift;
+ return $self->{domain};
+}
+
+sub set_address {
+ my ($self) = @_;
+
+ my $address = sprintf(
+ "opensrf:client:%s:%s:$$:%s", $self->{domain}, hostfqdn(), int(rand(10_000)));
+
+ $self->{address} = $address;
+}
+
+sub address {
+ my $self = shift;
+ return $self->{address};
+}
+
+sub connect {
+ my $self = shift;
+
+ return 1 if $self->redis;
+
+ my $domain = $self->{domain};
+ my $port = $self->{port};
+ my $username = $self->{username};
+ my $password = $self->{password};
+ my $address = $self->{address};
+
+ $logger->debug("Redis client connecting: ".
+ "domain=$domain port=$port username=$username address=$address");
+
+ # On disconnect, try to reconnect every 100ms up to 60 seconds.
+ my @connect_args = (
+ server => "$domain:$port",
+ reconnect => 60,
+ every => 100_000
+ );
+
+ $logger->debug("Connecting to bus: @connect_args");
+
+ unless ($self->{redis} = Redis->new(@connect_args)) {
+ die "Could not connect to Redis bus with @connect_args\n";
+ }
+
+ unless ($self->redis->auth($username, $password) eq 'OK') {
+ die "Cannot authenticate with Redis instance user=$username\n";
+ }
+
+ $logger->debug("Auth'ed with Redis as $username OK : address=$address");
+
+ # Each bus connection has its own stream / group for receiving
+ # direct messages. These streams/groups should not pre-exist.
+
+ $self->redis->xgroup(
+ 'create',
+ $address,
+ $address,
+ '$', # only receive new messages
+ 'mkstream' # create this stream if it's not there.
+ );
+
+ return $self;
+}
+
+# Set del_stream to remove the stream and any attached consumer groups.
+sub disconnect {
+ my ($self, $del_stream) = @_;
+
+ return unless $self->redis;
+
+ $self->redis->del($self->address) if $del_stream;
+
+ $self->redis->quit;
+
+ delete $self->{redis};
+}
+
+sub send {
+ my ($self, $dest_stream, $msg_json) = @_;
+
+ $logger->internal("send(): to=$dest_stream : $msg_json");
+
+ my @params = (
+ $dest_stream,
+ 'NOMKSTREAM',
+ 'MAXLEN',
+ '~', # maxlen-ish
+ $self->{max_queue},
+ '*', # let Redis generate the ID
+ 'message', # gotta call it something
+ $msg_json
+ );
+
+ eval { $self->redis->xadd(@params) };
+
+ if ($@) {
+ $logger->error("XADD error: $@ : @params");
+ }
+}
+
+# $timeout=0 means check for data without blocking
+# $timeout=-1 means block indefinitely.
+#
+# $dest_stream defaults to our bus address. Otherwise, it would be
+# the service-level address.
+sub recv {
+ my ($self, $timeout, $dest_stream) = @_;
+ $dest_stream ||= $self->address;
+
+ $logger->debug("Waiting for content at: $dest_stream");
+
+ my @block;
+ if ($timeout) {
+ # 0 means block indefinitely in Redis
+ $timeout = 0 if $timeout == -1;
+ $timeout *= 1000; # milliseconds
+ @block = (BLOCK => $timeout);
+ }
+
+ my @params = (
+ GROUP => $dest_stream,
+ $self->address,
+ COUNT => 1,
+ @block,
+ 'NOACK',
+ STREAMS => $dest_stream,
+ '>' # new messages only
+ );
+
+ my $packet;
+ eval {$packet = $self->redis->xreadgroup(@params) };
+
+ if ($@) {
+ $logger->error("Redis XREADGROUP error: $@ : @params");
+ return undef;
+ }
+
+ # Timed out waiting for data.
+ return undef unless defined $packet;
+
+ # TODO make this more self-documenting. also, too brittle?
+ # Also note at some point we may need to return info about the
+ # recipient stream to the caller in case we are listening
+ # on multiple streams.
+ my $container = $packet->[0]->[1]->[0];
+ my $msg_id = $container->[0];
+ my $json = $container->[1]->[1];
+
+ $logger->internal("recv() $json");
+
+ return {
+ msg_json => $json,
+ msg_id => $msg_id
+ };
+}
+
+sub flush_socket {
+ my $self = shift;
+ # Remove all messages from my address
+ $self->redis->xtrim($self->address, 'MAXLEN', 0);
+ return 1;
+}
+
+1;
+
+
--- /dev/null
+package OpenSRF::Transport::Redis::Client;
+use strict;
+use warnings;
+use Redis;
+use Time::HiRes q/time/;
+use OpenSRF::Utils::JSON;
+use OpenSRF::Utils::Logger qw/$logger/;
+use OpenSRF::Transport;
+use OpenSRF::Transport::Redis::Message;
+use OpenSRF::Transport::Redis::BusConnection;
+
+# Map of bus domain names to bus connections.
+my %connections;
+
+# There will only be one Client per process, but each client may
+# have multiple connections.
+my $_singleton;
+sub retrieve { return $_singleton; }
+
+sub new {
+ my ($class, $service, $no_cache) = @_;
+
+ return $_singleton if $_singleton && !$no_cache;
+
+ my $self = {service => $service};
+
+ bless($self, $class);
+
+ my $conf = OpenSRF::Utils::Config->current;
+ my $domain = $conf->bootstrap->domain;
+
+ # Create a connection for our primary node.
+ $self->add_connection($domain);
+ $self->{primary_domain} = $domain;
+
+ if ($service) {
+ # If we're a service, this is where we listen for service-level requests.
+ $self->{service_address} = "opensrf:service:$service";
+ $self->create_service_stream;
+ }
+
+ $_singleton = $self unless $no_cache;
+
+ return $self;
+}
+
+sub reset {
+ return unless $_singleton;
+ $logger->debug("Redis client disconnecting on reset()");
+ $_singleton->disconnect;
+ $_singleton = undef;
+}
+
+sub connection_type {
+ my $self = shift;
+ return $self->{connection_type};
+}
+
+sub add_connection {
+ my ($self, $domain) = @_;
+
+ my $conf = OpenSRF::Utils::Config->current;
+
+ my $username = $conf->bootstrap->username;
+ my $password = $conf->bootstrap->passwd;
+ my $port = $conf->bootstrap->port;
+ my $max_queue = 1024; # TODO
+
+ # Assumes other connection parameters are the same across
+ # Redis instances, apart from the hostname.
+ my $connection = OpenSRF::Transport::Redis::BusConnection->new(
+ $domain, $port, $username, $password, $max_queue
+ );
+
+ $connection->set_address();
+ $connections{$domain} = $connection;
+
+ $connection->connect;
+
+ return $connection;
+}
+
+sub get_connection {
+ my ($self, $domain) = @_;
+
+ my $con = $connections{$domain};
+
+ return $con if $con;
+
+ eval { $con = $self->add_connection($domain) };
+
+ if ($@) {
+ $logger->error("Could not connect to bus on node: $domain : $@");
+ return undef;
+ }
+
+ return $con;
+}
+
+# Contains a value if this is a service client.
+# Undef for standalone clients.
+sub service {
+ my $self = shift;
+ return $self->{service};
+}
+
+# Contains a value if this is a service client.
+# Undef for standalone clients.
+sub service_address {
+ my $self = shift;
+ return $self->{service_address};
+}
+
+sub primary_domain {
+ my $self = shift;
+ return $self->{primary_domain};
+}
+
+sub primary_connection {
+ my $self = shift;
+ return $connections{$self->primary_domain};
+}
+
+sub disconnect {
+ my ($self, $domain) = @_;
+
+ for my $domain (keys %connections) {
+ my $con = $connections{$domain};
+ $con->disconnect($self->primary_domain eq $domain);
+ delete $connections{$domain};
+ }
+
+ $_singleton = undef;
+}
+
+sub connected {
+ my $self = shift;
+ return $self->primary_connection && $self->primary_connection->connected;
+}
+
+sub tcp_connected {
+ my $self = shift;
+ return $self->connected;
+}
+
+sub create_service_stream {
+ my $self = shift;
+
+ eval {
+ # This gets mad when a stream / group already exists,
+ # but it's conceivable that it's already been created.
+
+ $self->primary_connection->redis->xgroup(
+ 'create',
+ $self->service_address, # stream name
+ $self->service_address, # group name
+ '$', # only receive new messages
+ 'mkstream' # create this stream if it's not there.
+ );
+ };
+
+ if ($@) {
+ $logger->debug("BUSYGROUP is OK => : $@");
+ }
+}
+
+# Send a message to $recipient regardless of what's in the 'to'
+# field of the message.
+sub send_to {
+ my ($self, $recipient, @msg_parts) = @_;
+
+ my $msg = OpenSRF::Transport::Redis::Message->new(@msg_parts);
+
+ $msg->body(OpenSRF::Utils::JSON->JSON2perl($msg->body));
+
+ $msg->osrf_xid($logger->get_osrf_xid);
+ $msg->from($self->primary_connection->address);
+
+ my $msg_json = $msg->to_json;
+ my $con = $self->primary_connection;
+
+ if ($recipient =~ /^opensrf:client/o) {
+ # Clients may be lurking on remote nodes.
+ # Make sure we have a connection to said node.
+
+ # opensrf:client:domain:...
+ my (undef, undef, $domain) = split(/:/, $recipient);
+
+ my $con = $self->get_connection($domain);
+ if (!$con) {
+ $logger->error("Cannot send message to node $domain: $msg_json");
+ return;
+ }
+ }
+
+ $logger->internal("send(): recipient=$recipient : $msg_json");
+
+ $con->send($recipient, $msg_json);
+}
+
+sub send {
+ my $self = shift;
+ my %msg_parts = @_;
+ return $self->send_to($msg_parts{to}, %msg_parts);
+}
+
+sub process {
+ my ($self, $timeout, $for_service) = @_;
+
+ $timeout ||= 0;
+
+ # Redis does not support fractional timeouts.
+ $timeout = 1 if ($timeout > 0 && $timeout < 1);
+
+ $timeout = int($timeout);
+
+ if (!$self->connected) {
+ # We can't do anything without a primary bus connection.
+ # Sleep a short time to avoid die/fork storms, then
+ # get outta here.
+ $logger->error("We have no primary bus connection");
+ sleep 5;
+ die "Exiting on lack of primary bus connection";
+ }
+
+ return $self->recv($timeout, $for_service);
+}
+
+# $timeout=0 means check for data without blocking
+# $timeout=-1 means block indefinitely.
+sub recv {
+ my ($self, $timeout, $for_service) = @_;
+
+ my $dest_stream = $for_service ? $self->{service_address} : undef;
+
+ my $resp = $self->primary_connection->recv($timeout, $dest_stream);
+
+ return undef unless $resp;
+
+ my $msg = OpenSRF::Transport::Redis::Message->new(json => $resp->{msg_json});
+
+ return undef unless $msg;
+
+ $msg->msg_id($resp->{msg_id});
+
+ $logger->internal("recv()'ed thread=" . $msg->thread);
+
+ # The message body is doubly encoded as JSON.
+ $msg->body(OpenSRF::Utils::JSON->perl2JSON($msg->body));
+
+ return $msg;
+}
+
+
+sub flush_socket {
+ my $self = shift;
+ # Remove all messages from our personal stream
+ if (my $con = $self->primary_connection) {
+ $con->redis->xtrim($con->address, 'MAXLEN', 0);
+ }
+ return 1;
+}
+
+1;
+
+
--- /dev/null
+package OpenSRF::Transport::Redis::Message;
+use strict; use warnings;
+use OpenSRF::Utils::Logger qw/$logger/;
+use OpenSRF::Utils::JSON;
+use OpenSRF::EX qw/:try/;
+use strict; use warnings;
+
+sub new {
+ my ($class, %args) = @_;
+ my $self = bless({}, $class);
+
+ if ($args{json}) {
+ $self->from_json($args{json});
+
+ } else {
+ $self->{to} = $args{to} || '';
+ $self->{from} = $args{from} || '';
+ $self->{thread} = $args{thread} || '';
+ $self->{body} = $args{body} || '';
+ $self->{osrf_xid} = $args{osrf_xid} || '';
+ $self->{msg_id} = $args{msg_id} || '';
+ $self->{router_command} = $args{router_command} || '';
+ $self->{router_class} = $args{router_class} || '';
+ $self->{router_reply} = $args{router_reply} || '';
+ }
+
+ return $self;
+}
+
+sub to {
+ my($self, $to) = @_;
+ $self->{to} = $to if defined $to;
+ return $self->{to};
+}
+sub from {
+ my($self, $from) = @_;
+ $self->{from} = $from if defined $from;
+ return $self->{from};
+}
+sub thread {
+ my($self, $thread) = @_;
+ $self->{thread} = $thread if defined $thread;
+ return $self->{thread};
+}
+sub body {
+ my($self, $body) = @_;
+ $self->{body} = $body if defined $body;
+ return $self->{body};
+}
+
+sub status {
+ my($self, $status) = @_;
+ $self->{status} = $status if defined $status;
+ return $self->{status};
+}
+sub type {
+ my($self, $type) = @_;
+ $self->{type} = $type if defined $type;
+ return $self->{type};
+}
+
+sub err_type {}
+sub err_code {}
+
+sub osrf_xid {
+ my($self, $osrf_xid) = @_;
+ $self->{osrf_xid} = $osrf_xid if defined $osrf_xid;
+ return $self->{osrf_xid};
+}
+
+sub msg_id {
+ my($self, $msg_id) = @_;
+ $self->{msg_id} = $msg_id if defined $msg_id;
+ return $self->{msg_id};
+}
+
+sub router_command {
+ my($self, $router_command) = @_;
+ $self->{router_command} = $router_command if defined $router_command;
+ return $self->{router_command};
+}
+
+sub router_class {
+ my($self, $router_class) = @_;
+ $self->{router_class} = $router_class if defined $router_class;
+ return $self->{router_class};
+}
+
+sub router_reply {
+ my($self, $router_reply) = @_;
+ $self->{router_reply} = $router_reply if defined $router_reply;
+ return $self->{router_reply};
+}
+
+sub to_json {
+ my $self = shift;
+
+ my $hash = {
+ to => $self->{to},
+ from => $self->{from},
+ thread => $self->{thread},
+ body => $self->{body}
+ };
+
+ # Some values are optional.
+ # Avoid cluttering the JSON with undef values.
+ for my $key (qw/osrf_xid router_command router_class router_reply/) {
+ $hash->{$key} = $self->{$key} if defined $self->{$key} && $self->{$key} ne '';
+ }
+
+ return OpenSRF::Utils::JSON->perl2JSON($hash);
+}
+
+sub from_json {
+ my $self = shift;
+ my $json = shift;
+ my $hash;
+
+ eval { $hash = OpenSRF::Utils::JSON->JSON2perl($json); };
+
+ if ($@) {
+ $logger->error("Redis::Message received invalid JSON: $@ : $json");
+ return undef;
+ }
+
+ $self->{$_} = $hash->{$_} for keys %$hash;
+}
+
+1;
--- /dev/null
+package OpenSRF::Transport::Redis::PeerConnection;
+use strict;
+use warnings;
+use OpenSRF::Transport::Redis::Client;
+
+use base qw/OpenSRF::Transport::Redis::Client/;
+
+sub construct {
+ my ($class, $service, $no_cache) = @_;
+ return __PACKAGE__->SUPER::new($service, $no_cache);
+}
+
+sub process {
+ my $self = shift;
+ my $msg = $self->SUPER::process(@_);
+ return 0 unless $msg;
+ return OpenSRF::Transport->handler($self->service, $msg);
+}
+
+
+1;
sub get_peer_client { return "OpenSRF::Transport::SlimJabber::PeerConnection"; }
-sub get_msg_envelope { return "OpenSRF::Transport::SlimJabber::MessageWrapper"; }
-
1;
#include <string.h>
#include <signal.h>
#include <opensrf/utils.h>
+#include <opensrf/osrfConfig.h>
#include <opensrf/osrf_hash.h>
#include <opensrf/transport_client.h>
#include <opensrf/osrf_message.h>
#include <opensrf/osrf_app_session.h>
#include <opensrf/log.h>
+#include <opensrf/string_array.h>
#define MAX_THREAD_SIZE 64
#define RECIP_BUF_SIZE 256
// opportunity, at which point force-close the connection.
#define SHUTDOWN_MAX_GRACEFUL_SECONDS 120
-// Incremented with every REQUEST, decremented with every COMPLETE.
-static int requests_in_flight = 0;
-
// default values, replaced during setup (below) as needed.
static char* config_file = "/openils/conf/opensrf_core.xml";
static char* config_ctxt = "gateway";
-static char* osrf_router = NULL;
static char* osrf_domain = NULL;
// Cache of opensrf thread strings and back-end receipients.
static char recipient_buf[RECIP_BUF_SIZE];
// Websocket client IP address (for logging)
static char* client_ip = NULL;
+// Tracks threads that have active requests in flight.
+// This covers all request types regardless of connected-ness.
+static osrfStringArray* active_threads = NULL;
static void rebuild_stdin_buffer();
static void child_init(int argc, char* argv[]);
// (replies returning to the websocket client).
fd_set fds;
int stdin_no = fileno(stdin);
- int osrf_no = osrf_handle->session->sock_id;
- int maxfd = osrf_no > stdin_no ? osrf_no : stdin_no;
+ //int osrf_no = osrf_handle->session->sock_id;
+ //int maxfd = osrf_no > stdin_no ? osrf_no : stdin_no;
+ int maxfd = stdin_no;
int sel_resp;
int shutdown_stat;
+ struct timeval tv;
while (1) {
FD_ZERO(&fds);
- FD_SET(osrf_no, &fds);
+ //FD_SET(osrf_no, &fds);
FD_SET(stdin_no, &fds);
if (shutdown_requested) {
} else {
- // Wait indefinitely for activity to process.
- // This will be interrupted during a shutdown request signal.
- sel_resp = select(maxfd + 1, &fds, NULL, NULL, NULL);
+ if (active_threads->size > 0) {
+ tv.tv_usec = 0;
+ tv.tv_sec = 0;
+
+ // Do a non-blocking check for inbound requests while
+ // we wait for more osrf data to be returned.
+ sel_resp = select(maxfd + 1, &fds, NULL, NULL, &tv);
+
+ } else {
+
+ // No osrf responses pending. Wait indefinitely.
+ // This will be interrupted during a shutdown request signal.
+ sel_resp = select(maxfd + 1, &fds, NULL, NULL, NULL);
+ }
}
if (sel_resp < 0) { // error
"WS select() failed with [%s]. Exiting", strerror(errno));
shut_it_down(1);
- }
- if (sel_resp > 0) {
+ } else if (sel_resp > 0) {
if (FD_ISSET(stdin_no, &fds)) {
read_from_stdin();
- }
-
- if (FD_ISSET(osrf_no, &fds)) {
read_from_osrf();
}
+
+ } else if (active_threads->size > 0) {
+ // Nothing pulled from the websocket, but we still have
+ // active osrf request. See if any new responses have arrived.
+ read_from_osrf();
}
if (shutdown_requested) {
return -1;
}
- unsigned long active_sessions = osrfHashGetCount(stateful_session_cache);
- if (active_sessions == 0 && requests_in_flight == 0) {
+ if (active_threads->size == 0) {
osrfLogInfo(OSRF_LOG_MARK, "Graceful shutdown cycle complete");
return 1;
}
osrfLogInfo(OSRF_LOG_MARK, "Graceful shutdown cycle continuing with "
- "sessions=%d requests=%d", active_sessions, requests_in_flight);
+ "active threeds=%d", active_threads);
return 0;
}
}
static int shut_it_down(int stat) {
+ osrfStringArrayFree(active_threads);
osrfHashFree(stateful_session_cache);
buffer_free(stdin_buf);
osrf_system_shutdown(); // clean XMPP disconnect
shut_it_down(1);
}
- osrf_handle = osrfSystemGetTransportClient();
- osrfAppSessionSetIngress(WEBSOCKET_INGRESS);
+ osrf_handle = osrfSystemGetTransportClient();
+ osrfAppSessionSetIngress(WEBSOCKET_INGRESS);
- osrf_router = osrfConfigGetValue(NULL, "/router_name");
osrf_domain = osrfConfigGetValue(NULL, "/domain");
stateful_session_cache = osrfNewHash();
osrfHashSetCallback(stateful_session_cache, release_hash_string);
+ active_threads = osrfNewStringArray(16);
+
client_ip = getenv("REMOTE_ADDR");
osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip);
}
if (!recipient) {
if (service) {
- int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1,
- "%s@%s/%s", osrf_router, osrf_domain, service);
+ int size = snprintf(recipient_buf,
+ RECIP_BUF_SIZE - 1, "opensrf:router:%s", osrf_domain);
recipient_buf[size] = '\0';
recipient = recipient_buf;
switch (msg->m_type) {
case CONNECT:
+ if (!osrfStringArrayContains(active_threads, thread)) {
+ osrfStringArrayAdd(active_threads, thread);
+ }
break;
case REQUEST:
log_request(service, msg);
- requests_in_flight++;
+ if (!osrfStringArrayContains(active_threads, thread)) {
+ osrfStringArrayAdd(active_threads, thread);
+ }
break;
case DISCONNECT:
transport_message* tmsg = NULL;
// Double check the socket connection before continuing.
- if (!client_connected(osrf_handle) ||
- !socket_connected(osrf_handle->session->sock_id)) {
+ if (!client_connected(osrf_handle)) {
osrfLogWarning(OSRF_LOG_MARK,
"WS: Jabber socket disconnected, exiting");
shut_it_down(1);
// read. This means we can't return to the main select() loop after
// each message, because any subsequent messages will get stuck in
// the opensrf receive queue. Process all available messages.
- while ( (tmsg = client_recv(osrf_handle, 0)) ) {
+
+ // As long as any active requests are in flight, wait up to one
+ // second to receive a response. Then return to inspect stdin
+ // to see if there are any requests waiting we can push through.
+ // Then come back here.
+ while (1) {
+ int timeout = active_threads->size > 0 ? 1 : 0;
+
+ tmsg = client_recv(osrf_handle, timeout);
+
+ if (!tmsg) { break; }
+
read_one_osrf_message(tmsg);
- message_free(tmsg);
+
+ osrfLogDebug(OSRF_LOG_MARK,
+ "WS relaying message to STDOUT thread=%s, recipient=%s",
+ tmsg->thread, tmsg->recipient);
}
}
} else {
- // connection timed out; clear the cached recipient
- if (one_msg->status_code == OSRF_STATUS_TIMEOUT) {
+ // Any error condition ends the conversation.
+ if (one_msg->status_code >= OSRF_STATUS_BADREQUEST) {
osrfHashRemove(stateful_session_cache, tmsg->thread);
+ osrfStringArrayRemove(active_threads, tmsg->thread);
} else {
if (one_msg->status_code == OSRF_STATUS_COMPLETE) {
- requests_in_flight--;
+ osrfLogInternal(OSRF_LOG_MARK,
+ "WS Marking request complete for thread %s", tmsg->thread);
+ osrfStringArrayRemove(active_threads, tmsg->thread);
}
}
}