[HIREDIS_HEADERS=/usr/include/hiredis/])
AC_SUBST([HIREDIS_HEADERS])
-AC_ARG_WITH([fyaml],
-[ --with-fyaml=path location of the fyaml headers (default is /usr/include/))],
-[FYAML_HEADERS=${withval}],
-[FYAML_HEADERS=/usr/include/])
-AC_SUBST([FYAML_HEADERS])
-
AC_ARG_WITH([includes],
[ --with-includes=DIRECTORIES a colon-separated list of directories that will be added to the list the compiler searches for header files (Example: --with-includes=/path/headers:/anotherpath/moreheaders)],
[EXTRA_USER_INCLUDES=${withval}])
AC_CHECK_LIB([readline], [readline], [], AC_MSG_ERROR(***OpenSRF requires readline development headers))
AC_CHECK_LIB([xml2], [xmlAddID], [], AC_MSG_ERROR(***OpenSRF requires xml2 development headers))
AC_CHECK_LIB([hiredis], [redisConnect], [], AC_MSG_ERROR(***OpenSRF requires libhiredis))
- AC_CHECK_LIB([fyaml], [fy_document_build_from_file], [], AC_MSG_ERROR(***OpenSRF requires libfyaml))
# Check for libmemcached and set flags accordingly
PKG_CHECK_MODULES(memcached, libmemcached >= 0.8.0)
AC_SUBST(memcached_CFLAGS)
AC_MSG_RESULT(Apache version: ${APACHE_READABLE_VERSION})
AC_MSG_RESULT(libxml2 headers location: ${LIBXML2_HEADERS})
AC_MSG_RESULT(libhiredis headers location: ${HIREDIS_HEADERS})
- AC_MSG_RESULT(libfyaml headers location: ${FYAML_HEADERS})
AC_MSG_RESULT([----------------------------------------------------------------------])
AM_CFLAGS = $(DEF_CFLAGS) -DASSUME_STATELESS -DOSRF_STRICT_PARAMS -rdynamic -fno-strict-aliasing -DOSRF_JSON_ENABLE_XML_UTILS
AM_LDFLAGS = $(DEF_LDFLAGS) -R $(libdir)
-LDADD = -lopensrf -lfyaml
+LDADD = -lopensrf
DISTCLEANFILES = Makefile.in Makefile
if (unregister) {
osrfLogInfo( OSRF_LOG_MARK, "%s un-registering with router %s", appname, jid );
- msg = message_init( "\"unregistering\"", NULL, NULL, jid, NULL );
+ msg = message_init( "\"[]\"", NULL, NULL, jid, NULL );
message_set_router_info( msg, NULL, NULL, appname, "unregister", 0 );
} else {
osrfLogInfo( OSRF_LOG_MARK, "%s registering with router %s", appname, jid );
- msg = message_init( "\"registering\"", NULL, NULL, jid, NULL );
+ msg = message_init( "\"[]\"", NULL, NULL, jid, NULL );
message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
}
transport_client* client = osrfSystemGetTransportClient();
- osrfLogInfo(OSRF_LOG_MARK, "we have a client = %s", client);
-
// Make sure that we're still connected to Jabber; reconnect if necessary.
if( !client_connected( client )) {
osrfSystemIgnoreTransportClient();
return;
}
- // Wait indefinitely for an input message
+ // NOTE: avoid indefinite waiting in our recv calls.
+ // See Perl bits for more info.
osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data..." );
- cur_msg = client_recv_for_service( forker->connection, -1 );
+ cur_msg = client_recv_for_service( forker->connection, 5 );
if( cur_msg == NULL ) {
// most likely a signal was received. clean up any recently
if (client == NULL || client->error) { return -1; }
+ const char* receiver = recipient == NULL ? msg->recipient : recipient;
+
transport_con* con;
- if (strstr(recipient, "opensrf:client")) {
+ if (strstr(receiver, "opensrf:client")) {
// We may be talking to a worker that runs on a remote domain.
// Find or create a connection to the domain.
- char* domain = get_domain_from_address(recipient);
+ char* domain = get_domain_from_address(receiver);
if (!domain) { return -1; }
con = client->primary_connection;
}
+ // The message sender is always our primary connection address,
+ // since that's the only address we listen for inbound data on.
if (msg->sender) { free(msg->sender); }
- msg->sender = strdup(con->address);
+ msg->sender = strdup(client->primary_connection->address);
message_prepare_json(msg);
osrfLogInternal(OSRF_LOG_MARK,
- "client_send_message() to=%s %s", recipient, msg->msg_json);
+ "client_send_message() to=%s %s", receiver, msg->msg_json);
- return transport_con_send(con, msg->msg_json, recipient);
+ return transport_con_send(con, msg->msg_json, receiver);
}
transport_message* client_recv_stream(transport_client* client, int timeout, const char* stream) {
$self->check_status;
$self->{child_died} = 0;
- my $msg = $self->{osrf_handle}->process($wait_time);
+ my $msg = $self->{osrf_handle}->process($wait_time, $self->{service});
# we woke up for any reason, reset the wait time to allow
- # for idle maintenance as necessary
+ # for more frequent idle maintenance checks.
$wait_time = 1;
if($msg) {
# for a new request. In future, we could replace
# signals with messages sent directly to listeners
# telling them to shutdown.
- $wait_time = 3 if
+ $wait_time = 5 if
!$self->perform_idle_maintenance and # no maintenance performed this time
@{$self->{active_list}} == 0; # no active children
}
$logger->info("server: registering with router $_");
$self->{osrf_handle}->send(
to => $_,
- body => '"registering"',
+ body => '"[]"',
router_command => 'register',
router_class => $self->{service}
);
$logger->info("server: disconnecting from router $router");
$self->{osrf_handle}->send(
to => $router,
- body => '"unregistering"',
+ body => '"[]"',
router_command => "unregister",
router_class => $self->{service}
);
my ($self, $timeout, $dest_stream) = @_;
$dest_stream ||= $self->address;
- $logger->debug("Waiting for content at: $dest_stream");
+ $logger->internal("Waiting for content at: $dest_stream");
my $packet;
sub construct {
my ($class, $service, $force) = @_;
- return __PACKAGE__->SUPER::new($service, $force);
+ return __PACKAGE__->SUPER::new($service || "client", $force);
}
sub process {
static transport_client* osrf_handle = NULL;
// Reusable string buf for recipient addresses
static char recipient_buf[RECIP_BUF_SIZE];
+static char deliver_to_buf[RECIP_BUF_SIZE];
// Websocket client IP address (for logging)
static char* client_ip = NULL;
// Tracks threads that have active requests in flight.
}
}
+ char* deliver_to = NULL;
+
if (!recipient) {
if (service) {
+ // Top level API calls are addressed to the service in question,
+ // but they are sent to the router for processing.
+
int size = snprintf(recipient_buf,
- RECIP_BUF_SIZE - 1, "opensrf:router:%s", osrf_domain);
+ RECIP_BUF_SIZE - 1, "opensrf:service:%s", service);
+
recipient_buf[size] = '\0';
recipient = recipient_buf;
+ size = snprintf(deliver_to_buf,
+ RECIP_BUF_SIZE - 1, "opensrf:router:%s", osrf_domain);
+
+ deliver_to_buf[size] = '\0';
+ deliver_to = deliver_to_buf;
+
} else {
osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient");
return;
message_set_osrf_xid(tmsg, osrfLogGetXid());
- if (client_send_message(osrf_handle, tmsg) != 0) {
+ if (client_send_message_to(osrf_handle, tmsg, deliver_to) != 0) {
osrfLogError(OSRF_LOG_MARK, "WS failed sending data to OpenSRF, exiting");
shut_it_down(1);
}