From 6e4f98908529f3c12cf44cd3abe61de291f38a5d Mon Sep 17 00:00:00 2001 From: Bill Erickson Date: Sat, 19 Nov 2022 16:44:08 -0500 Subject: [PATCH] Redis Perl / C Continued Signed-off-by: Bill Erickson --- configure.ac | 8 -------- src/libopensrf/Makefile.am | 2 +- src/libopensrf/osrf_prefork.c | 11 +++++------ src/libopensrf/transport_client.c | 14 +++++++++----- src/perl/lib/OpenSRF/Server.pm | 10 +++++----- src/perl/lib/OpenSRF/Transport/Redis/BusConnection.pm | 2 +- src/perl/lib/OpenSRF/Transport/Redis/PeerConnection.pm | 2 +- src/websocket-stdio/osrf-websocket-stdio.c | 17 +++++++++++++++-- 8 files changed, 37 insertions(+), 29 deletions(-) diff --git a/configure.ac b/configure.ac index 4a9a1fe..9bd31b4 100644 --- a/configure.ac +++ b/configure.ac @@ -214,12 +214,6 @@ AC_ARG_WITH([hiredis], [HIREDIS_HEADERS=/usr/include/hiredis/]) AC_SUBST([HIREDIS_HEADERS]) -AC_ARG_WITH([fyaml], -[ --with-fyaml=path location of the fyaml headers (default is /usr/include/))], -[FYAML_HEADERS=${withval}], -[FYAML_HEADERS=/usr/include/]) -AC_SUBST([FYAML_HEADERS]) - AC_ARG_WITH([includes], [ --with-includes=DIRECTORIES a colon-separated list of directories that will be added to the list the compiler searches for header files (Example: --with-includes=/path/headers:/anotherpath/moreheaders)], [EXTRA_USER_INCLUDES=${withval}]) @@ -287,7 +281,6 @@ if test "x$OSRF_INSTALL_CORE" = "xtrue"; then AC_CHECK_LIB([readline], [readline], [], AC_MSG_ERROR(***OpenSRF requires readline development headers)) AC_CHECK_LIB([xml2], [xmlAddID], [], AC_MSG_ERROR(***OpenSRF requires xml2 development headers)) AC_CHECK_LIB([hiredis], [redisConnect], [], AC_MSG_ERROR(***OpenSRF requires libhiredis)) - AC_CHECK_LIB([fyaml], [fy_document_build_from_file], [], AC_MSG_ERROR(***OpenSRF requires libfyaml)) # Check for libmemcached and set flags accordingly PKG_CHECK_MODULES(memcached, libmemcached >= 0.8.0) AC_SUBST(memcached_CFLAGS) @@ -357,6 +350,5 @@ AC_MSG_RESULT([--------------------- Configuration options: ------------------- AC_MSG_RESULT(Apache version: ${APACHE_READABLE_VERSION}) AC_MSG_RESULT(libxml2 headers location: ${LIBXML2_HEADERS}) AC_MSG_RESULT(libhiredis headers location: ${HIREDIS_HEADERS}) - AC_MSG_RESULT(libfyaml headers location: ${FYAML_HEADERS}) AC_MSG_RESULT([----------------------------------------------------------------------]) diff --git a/src/libopensrf/Makefile.am b/src/libopensrf/Makefile.am index 34dbc5a..159e90e 100644 --- a/src/libopensrf/Makefile.am +++ b/src/libopensrf/Makefile.am @@ -14,7 +14,7 @@ AM_CFLAGS = $(DEF_CFLAGS) -DASSUME_STATELESS -DOSRF_STRICT_PARAMS -rdynamic -fno-strict-aliasing -DOSRF_JSON_ENABLE_XML_UTILS AM_LDFLAGS = $(DEF_LDFLAGS) -R $(libdir) -LDADD = -lopensrf -lfyaml +LDADD = -lopensrf DISTCLEANFILES = Makefile.in Makefile diff --git a/src/libopensrf/osrf_prefork.c b/src/libopensrf/osrf_prefork.c index 9d764f8..449ed01 100644 --- a/src/libopensrf/osrf_prefork.c +++ b/src/libopensrf/osrf_prefork.c @@ -240,13 +240,13 @@ static void osrf_prefork_send_router_registration( if (unregister) { osrfLogInfo( OSRF_LOG_MARK, "%s un-registering with router %s", appname, jid ); - msg = message_init( "\"unregistering\"", NULL, NULL, jid, NULL ); + msg = message_init( "\"[]\"", NULL, NULL, jid, NULL ); message_set_router_info( msg, NULL, NULL, appname, "unregister", 0 ); } else { osrfLogInfo( OSRF_LOG_MARK, "%s registering with router %s", appname, jid ); - msg = message_init( "\"registering\"", NULL, NULL, jid, NULL ); + msg = message_init( "\"[]\"", NULL, NULL, jid, NULL ); message_set_router_info( msg, NULL, NULL, appname, "register", 0 ); } @@ -403,8 +403,6 @@ static int prefork_child_process_request( prefork_child* child, char* data ) { transport_client* client = osrfSystemGetTransportClient(); - osrfLogInfo(OSRF_LOG_MARK, "we have a client = %s", client); - // Make sure that we're still connected to Jabber; reconnect if necessary. if( !client_connected( client )) { osrfSystemIgnoreTransportClient(); @@ -850,9 +848,10 @@ static void prefork_run( prefork_simple* forker ) { return; } - // Wait indefinitely for an input message + // NOTE: avoid indefinite waiting in our recv calls. + // See Perl bits for more info. osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data..." ); - cur_msg = client_recv_for_service( forker->connection, -1 ); + cur_msg = client_recv_for_service( forker->connection, 5 ); if( cur_msg == NULL ) { // most likely a signal was received. clean up any recently diff --git a/src/libopensrf/transport_client.c b/src/libopensrf/transport_client.c index 945b153..a8499ca 100644 --- a/src/libopensrf/transport_client.c +++ b/src/libopensrf/transport_client.c @@ -150,13 +150,15 @@ int client_send_message_to(transport_client* client, transport_message* msg, con if (client == NULL || client->error) { return -1; } + const char* receiver = recipient == NULL ? msg->recipient : recipient; + transport_con* con; - if (strstr(recipient, "opensrf:client")) { + if (strstr(receiver, "opensrf:client")) { // We may be talking to a worker that runs on a remote domain. // Find or create a connection to the domain. - char* domain = get_domain_from_address(recipient); + char* domain = get_domain_from_address(receiver); if (!domain) { return -1; } @@ -173,15 +175,17 @@ int client_send_message_to(transport_client* client, transport_message* msg, con con = client->primary_connection; } + // The message sender is always our primary connection address, + // since that's the only address we listen for inbound data on. if (msg->sender) { free(msg->sender); } - msg->sender = strdup(con->address); + msg->sender = strdup(client->primary_connection->address); message_prepare_json(msg); osrfLogInternal(OSRF_LOG_MARK, - "client_send_message() to=%s %s", recipient, msg->msg_json); + "client_send_message() to=%s %s", receiver, msg->msg_json); - return transport_con_send(con, msg->msg_json, recipient); + return transport_con_send(con, msg->msg_json, receiver); } transport_message* client_recv_stream(transport_client* client, int timeout, const char* stream) { diff --git a/src/perl/lib/OpenSRF/Server.pm b/src/perl/lib/OpenSRF/Server.pm index f34e633..0535fae 100644 --- a/src/perl/lib/OpenSRF/Server.pm +++ b/src/perl/lib/OpenSRF/Server.pm @@ -160,10 +160,10 @@ sub run { $self->check_status; $self->{child_died} = 0; - my $msg = $self->{osrf_handle}->process($wait_time); + my $msg = $self->{osrf_handle}->process($wait_time, $self->{service}); # we woke up for any reason, reset the wait time to allow - # for idle maintenance as necessary + # for more frequent idle maintenance checks. $wait_time = 1; if($msg) { @@ -215,7 +215,7 @@ sub run { # for a new request. In future, we could replace # signals with messages sent directly to listeners # telling them to shutdown. - $wait_time = 3 if + $wait_time = 5 if !$self->perform_idle_maintenance and # no maintenance performed this time @{$self->{active_list}} == 0; # no active children } @@ -540,7 +540,7 @@ sub register_routers { $logger->info("server: registering with router $_"); $self->{osrf_handle}->send( to => $_, - body => '"registering"', + body => '"[]"', router_command => 'register', router_class => $self->{service} ); @@ -561,7 +561,7 @@ sub unregister_routers { $logger->info("server: disconnecting from router $router"); $self->{osrf_handle}->send( to => $router, - body => '"unregistering"', + body => '"[]"', router_command => "unregister", router_class => $self->{service} ); diff --git a/src/perl/lib/OpenSRF/Transport/Redis/BusConnection.pm b/src/perl/lib/OpenSRF/Transport/Redis/BusConnection.pm index bede432..03e4aae 100644 --- a/src/perl/lib/OpenSRF/Transport/Redis/BusConnection.pm +++ b/src/perl/lib/OpenSRF/Transport/Redis/BusConnection.pm @@ -119,7 +119,7 @@ sub recv { my ($self, $timeout, $dest_stream) = @_; $dest_stream ||= $self->address; - $logger->debug("Waiting for content at: $dest_stream"); + $logger->internal("Waiting for content at: $dest_stream"); my $packet; diff --git a/src/perl/lib/OpenSRF/Transport/Redis/PeerConnection.pm b/src/perl/lib/OpenSRF/Transport/Redis/PeerConnection.pm index 742ad8c..97d73ca 100644 --- a/src/perl/lib/OpenSRF/Transport/Redis/PeerConnection.pm +++ b/src/perl/lib/OpenSRF/Transport/Redis/PeerConnection.pm @@ -7,7 +7,7 @@ use base qw/OpenSRF::Transport::Redis::Client/; sub construct { my ($class, $service, $force) = @_; - return __PACKAGE__->SUPER::new($service, $force); + return __PACKAGE__->SUPER::new($service || "client", $force); } sub process { diff --git a/src/websocket-stdio/osrf-websocket-stdio.c b/src/websocket-stdio/osrf-websocket-stdio.c index d5c9d77..a3e18f8 100644 --- a/src/websocket-stdio/osrf-websocket-stdio.c +++ b/src/websocket-stdio/osrf-websocket-stdio.c @@ -88,6 +88,7 @@ static growing_buffer* stdin_buf = NULL; static transport_client* osrf_handle = NULL; // Reusable string buf for recipient addresses static char recipient_buf[RECIP_BUF_SIZE]; +static char deliver_to_buf[RECIP_BUF_SIZE]; // Websocket client IP address (for logging) static char* client_ip = NULL; // Tracks threads that have active requests in flight. @@ -437,14 +438,26 @@ static void relay_stdin_message(const char* msg_string) { } } + char* deliver_to = NULL; + if (!recipient) { if (service) { + // Top level API calls are addressed to the service in question, + // but they are sent to the router for processing. + int size = snprintf(recipient_buf, - RECIP_BUF_SIZE - 1, "opensrf:router:%s", osrf_domain); + RECIP_BUF_SIZE - 1, "opensrf:service:%s", service); + recipient_buf[size] = '\0'; recipient = recipient_buf; + size = snprintf(deliver_to_buf, + RECIP_BUF_SIZE - 1, "opensrf:router:%s", osrf_domain); + + deliver_to_buf[size] = '\0'; + deliver_to = deliver_to_buf; + } else { osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient"); return; @@ -471,7 +484,7 @@ static void relay_stdin_message(const char* msg_string) { message_set_osrf_xid(tmsg, osrfLogGetXid()); - if (client_send_message(osrf_handle, tmsg) != 0) { + if (client_send_message_to(osrf_handle, tmsg, deliver_to) != 0) { osrfLogError(OSRF_LOG_MARK, "WS failed sending data to OpenSRF, exiting"); shut_it_down(1); } -- 2.11.0