Redis Perl / C Continued
authorBill Erickson <berickxx@gmail.com>
Sat, 19 Nov 2022 21:44:08 +0000 (16:44 -0500)
committerBill Erickson <berickxx@gmail.com>
Thu, 20 Apr 2023 14:18:05 +0000 (10:18 -0400)
Signed-off-by: Bill Erickson <berickxx@gmail.com>
configure.ac
src/libopensrf/Makefile.am
src/libopensrf/osrf_prefork.c
src/libopensrf/transport_client.c
src/perl/lib/OpenSRF/Server.pm
src/perl/lib/OpenSRF/Transport/Redis/BusConnection.pm
src/perl/lib/OpenSRF/Transport/Redis/PeerConnection.pm
src/websocket-stdio/osrf-websocket-stdio.c

index 4a9a1fe..9bd31b4 100644 (file)
@@ -214,12 +214,6 @@ AC_ARG_WITH([hiredis],
 [HIREDIS_HEADERS=/usr/include/hiredis/])
 AC_SUBST([HIREDIS_HEADERS])
 
-AC_ARG_WITH([fyaml],
-[  --with-fyaml=path               location of the fyaml headers (default is /usr/include/))],
-[FYAML_HEADERS=${withval}],
-[FYAML_HEADERS=/usr/include/])
-AC_SUBST([FYAML_HEADERS])
-
 AC_ARG_WITH([includes],
 [  --with-includes=DIRECTORIES      a colon-separated list of directories that will be added to the list the compiler searches for header files (Example: --with-includes=/path/headers:/anotherpath/moreheaders)],
 [EXTRA_USER_INCLUDES=${withval}])
@@ -287,7 +281,6 @@ if test "x$OSRF_INSTALL_CORE" = "xtrue"; then
        AC_CHECK_LIB([readline], [readline], [], AC_MSG_ERROR(***OpenSRF requires readline development headers))
        AC_CHECK_LIB([xml2], [xmlAddID], [], AC_MSG_ERROR(***OpenSRF requires xml2 development headers))
     AC_CHECK_LIB([hiredis], [redisConnect], [], AC_MSG_ERROR(***OpenSRF requires libhiredis))
-    AC_CHECK_LIB([fyaml], [fy_document_build_from_file], [], AC_MSG_ERROR(***OpenSRF requires libfyaml))
        # Check for libmemcached and set flags accordingly
        PKG_CHECK_MODULES(memcached, libmemcached >= 0.8.0)
        AC_SUBST(memcached_CFLAGS)
@@ -357,6 +350,5 @@ AC_MSG_RESULT([--------------------- Configuration options:  -------------------
         AC_MSG_RESULT(Apache version:                  ${APACHE_READABLE_VERSION})
         AC_MSG_RESULT(libxml2 headers location:        ${LIBXML2_HEADERS})
         AC_MSG_RESULT(libhiredis headers location:     ${HIREDIS_HEADERS})
-        AC_MSG_RESULT(libfyaml headers location:       ${FYAML_HEADERS})
 
 AC_MSG_RESULT([----------------------------------------------------------------------])
index 34dbc5a..159e90e 100644 (file)
@@ -14,7 +14,7 @@
 
 AM_CFLAGS = $(DEF_CFLAGS) -DASSUME_STATELESS  -DOSRF_STRICT_PARAMS -rdynamic -fno-strict-aliasing -DOSRF_JSON_ENABLE_XML_UTILS
 AM_LDFLAGS = $(DEF_LDFLAGS) -R $(libdir)
-LDADD = -lopensrf -lfyaml
+LDADD = -lopensrf
 
 DISTCLEANFILES = Makefile.in Makefile
 
index 9d764f8..449ed01 100644 (file)
@@ -240,13 +240,13 @@ static void osrf_prefork_send_router_registration(
     if (unregister) {
 
            osrfLogInfo( OSRF_LOG_MARK, "%s un-registering with router %s", appname, jid );
-           msg = message_init( "\"unregistering\"", NULL, NULL, jid, NULL );
+           msg = message_init( "\"[]\"", NULL, NULL, jid, NULL );
            message_set_router_info( msg, NULL, NULL, appname, "unregister", 0 );
 
     } else {
 
            osrfLogInfo( OSRF_LOG_MARK, "%s registering with router %s", appname, jid );
-           msg = message_init( "\"registering\"", NULL, NULL, jid, NULL );
+           msg = message_init( "\"[]\"", NULL, NULL, jid, NULL );
            message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
     }
 
@@ -403,8 +403,6 @@ static int prefork_child_process_request( prefork_child* child, char* data ) {
 
        transport_client* client = osrfSystemGetTransportClient();
 
-    osrfLogInfo(OSRF_LOG_MARK, "we have a client = %s", client);
-
        // Make sure that we're still connected to Jabber; reconnect if necessary.
        if( !client_connected( client )) {
                osrfSystemIgnoreTransportClient();
@@ -850,9 +848,10 @@ static void prefork_run( prefork_simple* forker ) {
                        return;
                }
 
-               // Wait indefinitely for an input message
+        // NOTE: avoid indefinite waiting in our recv calls.  
+        // See Perl bits for more info.
                osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data..." );
-               cur_msg = client_recv_for_service( forker->connection, -1 );
+               cur_msg = client_recv_for_service( forker->connection, 5 );
 
                if( cur_msg == NULL ) {
                        // most likely a signal was received.  clean up any recently
index 945b153..a8499ca 100644 (file)
@@ -150,13 +150,15 @@ int client_send_message_to(transport_client* client, transport_message* msg, con
 
        if (client == NULL || client->error) { return -1; }
 
+    const char* receiver = recipient == NULL ? msg->recipient : recipient;
+
     transport_con* con;
 
-    if (strstr(recipient, "opensrf:client")) {
+    if (strstr(receiver, "opensrf:client")) {
         // We may be talking to a worker that runs on a remote domain.
         // Find or create a connection to the domain.
 
-        char* domain = get_domain_from_address(recipient);
+        char* domain = get_domain_from_address(receiver);
 
         if (!domain) { return -1; }
 
@@ -173,15 +175,17 @@ int client_send_message_to(transport_client* client, transport_message* msg, con
         con = client->primary_connection;
     }
         
+    // The message sender is always our primary connection address,
+    // since that's the only address we listen for inbound data on.
        if (msg->sender) { free(msg->sender); }
-       msg->sender = strdup(con->address);
+       msg->sender = strdup(client->primary_connection->address);
 
     message_prepare_json(msg);
 
     osrfLogInternal(OSRF_LOG_MARK, 
-        "client_send_message() to=%s %s", recipient, msg->msg_json);
+        "client_send_message() to=%s %s", receiver, msg->msg_json);
 
-    return transport_con_send(con, msg->msg_json, recipient);
+    return transport_con_send(con, msg->msg_json, receiver);
 }
 
 transport_message* client_recv_stream(transport_client* client, int timeout, const char* stream) {
index f34e633..0535fae 100644 (file)
@@ -160,10 +160,10 @@ sub run {
         $self->check_status;
         $self->{child_died} = 0;
 
-        my $msg = $self->{osrf_handle}->process($wait_time);
+        my $msg = $self->{osrf_handle}->process($wait_time, $self->{service});
 
         # we woke up for any reason, reset the wait time to allow
-        # for idle maintenance as necessary
+        # for more frequent idle maintenance checks.
         $wait_time = 1;
 
         if($msg) {
@@ -215,7 +215,7 @@ sub run {
                 # for a new request.  In future, we could replace
                 # signals with messages sent directly to listeners
                 # telling them to shutdown.
-                $wait_time = 3 if 
+                $wait_time = 5 if 
                     !$self->perform_idle_maintenance and # no maintenance performed this time
                     @{$self->{active_list}} == 0; # no active children 
             }
@@ -540,7 +540,7 @@ sub register_routers {
         $logger->info("server: registering with router $_");
         $self->{osrf_handle}->send(
             to => $_,
-            body => '"registering"',
+            body => '"[]"',
             router_command => 'register',
             router_class => $self->{service}
         );
@@ -561,7 +561,7 @@ sub unregister_routers {
         $logger->info("server: disconnecting from router $router");
         $self->{osrf_handle}->send(
             to => $router,
-            body => '"unregistering"',
+            body => '"[]"',
             router_command => "unregister",
             router_class => $self->{service}
         );
index bede432..03e4aae 100644 (file)
@@ -119,7 +119,7 @@ sub recv {
     my ($self, $timeout, $dest_stream) = @_;
     $dest_stream ||= $self->address;
 
-    $logger->debug("Waiting for content at: $dest_stream");
+    $logger->internal("Waiting for content at: $dest_stream");
 
     my $packet;
 
index 742ad8c..97d73ca 100644 (file)
@@ -7,7 +7,7 @@ use base qw/OpenSRF::Transport::Redis::Client/;
 
 sub construct {
     my ($class, $service, $force) = @_;
-    return __PACKAGE__->SUPER::new($service, $force);
+    return __PACKAGE__->SUPER::new($service || "client", $force);
 }
 
 sub process {
index d5c9d77..a3e18f8 100644 (file)
@@ -88,6 +88,7 @@ static growing_buffer* stdin_buf = NULL;
 static transport_client* osrf_handle = NULL;
 // Reusable string buf for recipient addresses
 static char recipient_buf[RECIP_BUF_SIZE];
+static char deliver_to_buf[RECIP_BUF_SIZE];
 // Websocket client IP address (for logging)
 static char* client_ip = NULL;
 // Tracks threads that have active requests in flight.
@@ -437,14 +438,26 @@ static void relay_stdin_message(const char* msg_string) {
         }
     }
 
+    char* deliver_to = NULL;
+
     if (!recipient) {
 
         if (service) {
+            // Top level API calls are addressed to the service in question,
+            // but they are sent to the router for processing.
+
             int size = snprintf(recipient_buf, 
-                RECIP_BUF_SIZE - 1, "opensrf:router:%s", osrf_domain);
+                RECIP_BUF_SIZE - 1, "opensrf:service:%s", service);
+
             recipient_buf[size] = '\0';
             recipient = recipient_buf;
 
+            size = snprintf(deliver_to_buf, 
+                RECIP_BUF_SIZE - 1, "opensrf:router:%s", osrf_domain);
+
+            deliver_to_buf[size] = '\0';
+            deliver_to = deliver_to_buf;
+
         } else {
             osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient");
             return;
@@ -471,7 +484,7 @@ static void relay_stdin_message(const char* msg_string) {
 
     message_set_osrf_xid(tmsg, osrfLogGetXid());
 
-    if (client_send_message(osrf_handle, tmsg) != 0) {
+    if (client_send_message_to(osrf_handle, tmsg, deliver_to) != 0) {
         osrfLogError(OSRF_LOG_MARK, "WS failed sending data to OpenSRF, exiting");
         shut_it_down(1);
     }