Perl / C Redis Return to list push/pop
authorBill Erickson <berickxx@gmail.com>
Fri, 18 Nov 2022 22:36:22 +0000 (17:36 -0500)
committerBill Erickson <berickxx@gmail.com>
Thu, 20 Apr 2023 14:18:05 +0000 (10:18 -0400)
Signed-off-by: Bill Erickson <berickxx@gmail.com>
bin/opensrf-perl.pl.in
include/opensrf/transport_connection.h
src/libopensrf/transport_client.c
src/libopensrf/transport_connection.c
src/perl/lib/OpenSRF/Server.pm
src/perl/lib/OpenSRF/Transport/Redis/BusConnection.pm
src/perl/lib/OpenSRF/Transport/Redis/Client.pm
src/perl/lib/OpenSRF/Transport/Redis/Message.pm
src/perl/lib/OpenSRF/Transport/Redis/PeerConnection.pm

index 1c34ad8..89cf565 100755 (executable)
@@ -648,10 +648,9 @@ sub do_reset_message_bus {
 
         my @perms = qw/
             -@all 
-            +xgroup 
-            +xadd 
-            +xreadgroup 
-            +xtrim 
+            +lpop
+            +blpop
+            +rpush
             +del 
             ~opensrf:router:*
             ~opensrf:service:*
index 25eef27..a28951e 100644 (file)
@@ -26,7 +26,6 @@ struct transport_con_struct {
 typedef struct transport_con_struct transport_con;
 
 struct transport_con_msg_struct {
-    char* msg_id;
     char* msg_json;
 };
 typedef struct transport_con_msg_struct transport_con_msg;
@@ -45,19 +44,16 @@ int transport_con_connect(transport_con* con,
 
 int transport_con_disconnect(transport_con* con);
 
-int transport_con_send(transport_con* con, const char* msg_json, const char* stream);
+int transport_con_send(transport_con* con, const char* msg_json, const char* recipient);
 
-transport_con_msg* transport_con_recv_once(transport_con* con, int timeout, const char* stream);
+transport_con_msg* transport_con_recv_once(transport_con* con, int timeout, const char* recipient);
 
-transport_con_msg* transport_con_recv(transport_con* con, int timeout, const char* stream);
+transport_con_msg* transport_con_recv(transport_con* con, int timeout, const char* recipient);
 
 void transport_con_flush_socket(transport_con* con);
 
 int handle_redis_error(redisReply *reply, const char* command, ...);
 
-int transport_con_make_stream(transport_con* con, const char* stream, int exists_ok);
-
-
 #ifdef __cplusplus
 }
 #endif
index f020646..945b153 100644 (file)
@@ -76,11 +76,8 @@ int client_connect_as_service(transport_client* client, const char* service) {
 
     client->primary_connection = con;
 
-    transport_con_connect(
+    return transport_con_connect(
         con, client->port, client->username, client->password);
-
-    // Make a stream for the service address
-    return transport_con_make_stream(con, client->service_address, 1);
 }
 
 int client_connect(transport_client* client) {
@@ -185,10 +182,6 @@ int client_send_message_to(transport_client* client, transport_message* msg, con
         "client_send_message() to=%s %s", recipient, msg->msg_json);
 
     return transport_con_send(con, msg->msg_json, recipient);
-
-    osrfLogInternal(OSRF_LOG_MARK, "client_send_message() send completed");
-    
-    return 0;
 }
 
 transport_message* client_recv_stream(transport_client* client, int timeout, const char* stream) {
index c4544db..d3f12cb 100644 (file)
@@ -21,8 +21,6 @@ void transport_con_msg_free(transport_con_msg* msg) {
     osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_msg_free()");
 
     if (msg == NULL) { return; } 
-
-    if (msg->msg_id) { free(msg->msg_id); }
     if (msg->msg_json) { free(msg->msg_json); }
 
     free(msg);
@@ -105,34 +103,6 @@ int transport_con_connect(
 
     freeReplyObject(reply);
 
-    return transport_con_make_stream(con, con->address, 0);
-}
-
-int transport_con_make_stream(transport_con* con, const char* stream, int exists_ok) {
-    osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_make_stream() stream=%s", stream);
-
-    redisReply *reply = redisCommand(
-        con->bus, 
-        "XGROUP CREATE %s %s $ mkstream", 
-        stream,
-        stream,
-        "$",
-        "mkstream"
-    );
-
-    // Produces an error when a group/stream already exists, but that's
-    // acceptible when creating a group/stream for a stop-level service 
-    // address, since multiple Listeners are allowed.
-    if (handle_redis_error(reply, 
-        "XGROUP CREATE %s %s $ mkstream", 
-        stream,
-        stream,
-        "$",
-        "mkstream"
-    )) { return exists_ok; }
-
-    freeReplyObject(reply);
-
     return 1;
 }
 
@@ -153,116 +123,82 @@ int transport_con_disconnect(transport_con* con) {
     return 0;
 }
 
-int transport_con_send(transport_con* con, const char* msg_json, const char* stream) {
+// Returns 0 on success.
+int transport_con_send(transport_con* con, const char* msg_json, const char* recipient) {
 
-    osrfLogInternal(OSRF_LOG_MARK, "Sending to stream=%s: %s", stream, msg_json);
+    osrfLogInternal(OSRF_LOG_MARK, "Sending to recipient=%s: %s", recipient, msg_json);
 
-    redisReply *reply = redisCommand(con->bus,
-        "XADD %s NOMKSTREAM MAXLEN ~ %d * message %s",
-        stream,
-        con->max_queue,
-        msg_json
-    );
+    redisReply *reply = redisCommand(con->bus, "RPUSH %s %s", recipient, msg_json);
 
-    if (handle_redis_error(reply, 
-        "XADD %s NOMKSTREAM MAXLEN ~ %d * message %s",
-        stream, con->max_queue, msg_json)) {
+    int stat = handle_redis_error(reply, "RPUSH %s %s", recipient, msg_json);
 
-        return -1;
+    if (!stat) {
+        freeReplyObject(reply);
     }
 
-    freeReplyObject(reply);
-
-    return 0;
+    return stat;
 }
 
-transport_con_msg* transport_con_recv_once(transport_con* con, int timeout, const char* stream) {
+transport_con_msg* transport_con_recv_once(transport_con* con, int timeout, const char* recipient) {
+
     osrfLogInternal(OSRF_LOG_MARK, 
-        "TCON transport_con_recv_once() timeout=%d stream=%s", timeout, stream);
+        "TCON transport_con_recv_once() timeout=%d recipient=%s", timeout, recipient);
 
-    if (stream == NULL) { stream = con->address; }
+    if (recipient == NULL) { recipient = con->address; }
 
-    redisReply *reply, *tmp;
-    char *msg_id = NULL, *json = NULL;
+    size_t len = 0;
+    char command_buf[256];
 
-    if (timeout == 0) {
+    if (timeout == 0) { // Non-blocking list pop
 
-        reply = redisCommand(con->bus, 
-            "XREADGROUP GROUP %s %s COUNT 1 STREAMS %s >",
-            stream, con->address, stream
-        );
+        len = snprintf(command_buf, 256, "LPOP %s", recipient);
 
     } else {
+        
+        if (timeout < 0) { // Block indefinitely
 
-        if (timeout == -1) {
-            // Redis timeout 0 means block indefinitely
-            timeout = 0;
-        } else {
-            // Milliseconds
-            timeout *= 1000;
-        }
+            len = snprintf(command_buf, 256, "BLPOP %s 0", recipient);
 
-        reply = redisCommand(con->bus, 
-            "XREADGROUP GROUP %s %s BLOCK %d COUNT 1 STREAMS %s >",
-            stream, con->address, timeout, stream
-        );
+        } else { // Block up to timeout seconds
+
+            len = snprintf(command_buf, 256, "BLPOP %s %d", recipient, timeout);
+        }
     }
 
-    // Timeout or error
-    if (handle_redis_error(
-        reply,
-        "XREADGROUP GROUP %s %s %s COUNT 1 NOACK STREAMS %s >",
-        stream, con->address, "BLOCK X", stream
-    )) { return NULL; }
-
-    // Unpack the XREADGROUP response, which is a nest of arrays.
-    // These arrays are mostly 1 and 2-element lists, since we are 
-    // only reading one item on a single stream.
-    if (reply->type == REDIS_REPLY_ARRAY && reply->elements > 0) {
-        tmp = reply->element[0];
-
-        if (tmp->type == REDIS_REPLY_ARRAY && tmp->elements > 1) {
-            tmp = tmp->element[1];
-
-            if (tmp->type == REDIS_REPLY_ARRAY && tmp->elements > 0) {
-                tmp = tmp->element[0];
-
-                if (tmp->type == REDIS_REPLY_ARRAY && tmp->elements > 1) {
-                    redisReply *r1 = tmp->element[0];
-                    redisReply *r2 = tmp->element[1];
-
-                    if (r1->type == REDIS_REPLY_STRING) {
-                        msg_id = strdup(r1->str);
-                    }
-
-                    if (r2->type == REDIS_REPLY_ARRAY && r2->elements > 1) {
-                        // r2->element[0] is the message name, which we
-                        // currently don't use for anything.
-
-                        r2 = r2->element[1];
-
-                        if (r2->type == REDIS_REPLY_STRING) {
-                            json = strdup(r2->str);
-                        }
-                    }
-                }
-            }
+    command_buf[len] = '\0';
+
+    osrfLogInternal(OSRF_LOG_MARK, 
+        "recv_one_chunk() sending command: %s", command_buf);
+
+    redisReply* reply = redisCommand(con->bus, command_buf);
+    if (handle_redis_error(reply, command_buf)) { return NULL; }
+
+    char* json = NULL;
+    if (reply->type == REDIS_REPLY_STRING) { // LPOP
+        json = strdup(reply->str);
+
+    } else if (reply->type == REDIS_REPLY_ARRAY) { // BLPOP
+
+        // BLPOP returns [list_name, popped_value]
+        if (reply->elements == 2 && reply->element[1]->str != NULL) {
+            json = strdup(reply->element[1]->str); 
+        } else {
+            osrfLogInternal(OSRF_LOG_MARK, 
+                "No response returned within timeout: %d", timeout);
         }
     }
 
-    freeReplyObject(reply); // XREADGROUP
+    freeReplyObject(reply);
+
+    osrfLogInternal(OSRF_LOG_MARK, "recv_one_chunk() read json: %s", json);
 
-    if (msg_id == NULL) {
-        // Read timed out. 'json' will also be NULL.
+    if (json == NULL) {
         return NULL;
     }
 
     transport_con_msg* tcon_msg = safe_malloc(sizeof(transport_con_msg));
-    tcon_msg->msg_id = msg_id;
     tcon_msg->msg_json = json;
 
-    osrfLogInternal(OSRF_LOG_MARK, "recv_one_chunk() read json: %s", json);
-
     return tcon_msg;
 }
 
index 269ee1d..f34e633 100644 (file)
@@ -81,6 +81,7 @@ sub cleanup {
                 @{$self->{active_list}}." active children...");
 
             # block until a child is becomes available
+            $logger->info("waiting for child procs to clear in graceful shutdown");
             $self->check_status(1);
         }
         $logger->info("server: all clear for graceful shutdown");
@@ -154,7 +155,7 @@ sub run {
     my $wait_time = 1;
 
     # main server loop
-    while(1) {
+    while (1) {
 
         $self->check_status;
         $self->{child_died} = 0;
@@ -202,7 +203,19 @@ sub run {
 
                 # when we hit equilibrium, there's no need for regular
                 # maintenance, so set wait_time to 'forever'
-                $wait_time = -1 if 
+                #
+                # Avoid indefinite waiting here -- Redis client
+                # gracefully handles interrupts and immediately goes
+                # back to listening after the signal handler is
+                # complete.  In our case, the signal handler may include
+                # un-registering with routers, which requires the Redis
+                # client to wait for an ACK from the Redis server.
+                # However, it will never receive the ack because our
+                # client is already blocking on an BLPOP call wiating
+                # for a new request.  In future, we could replace
+                # signals with messages sent directly to listeners
+                # telling them to shutdown.
+                $wait_time = 3 if 
                     !$self->perform_idle_maintenance and # no maintenance performed this time
                     @{$self->{active_list}} == 0; # no active children 
             }
@@ -552,6 +565,7 @@ sub unregister_routers {
             router_command => "unregister",
             router_class => $self->{service}
         );
+        $logger->info("Disconnect sent to $router");
     }
 }
 
@@ -601,7 +615,7 @@ sub init {
     my $self = shift;
     my $service = $self->{parent}->{service};
     $0 = "OpenSRF Drone [$service]";
-    OpenSRF::Transport::PeerHandle->construct($service);
+    OpenSRF::Transport::PeerHandle->construct($service, 1);
     OpenSRF::Application->application_implementation->child_init
         if (OpenSRF::Application->application_implementation->can('child_init'));
 }
index f74cad8..bede432 100644 (file)
@@ -65,11 +65,11 @@ sub connect {
     $logger->debug("Redis client connecting: ".
         "domain=$domain port=$port username=$username address=$address");
 
-    # On disconnect, try to reconnect every 100ms up to 60 seconds.
+    # On disconnect, try to reconnect every second up to 60 seconds.
     my @connect_args = (
         server => "$domain:$port",
         reconnect => 60, 
-        every => 100_000
+        every => 1_000_000
     );
 
     $logger->debug("Connecting to bus: @connect_args");
@@ -84,17 +84,6 @@ sub connect {
 
     $logger->debug("Auth'ed with Redis as $username OK : address=$address");
 
-    # Each bus connection has its own stream / group for receiving
-    # direct messages.  These streams/groups should not pre-exist.
-
-    $self->redis->xgroup(   
-        'create',
-        $address,
-        $address,
-        '$',            # only receive new messages
-        'mkstream'      # create this stream if it's not there.
-    );
-
     return $self;
 }
 
@@ -116,22 +105,9 @@ sub send {
     
     $logger->internal("send(): to=$dest_stream : $msg_json");
 
-    my @params = (
-        $dest_stream,
-        'NOMKSTREAM',
-        'MAXLEN', 
-        '~',                        # maxlen-ish
-        $self->{max_queue},
-        '*',                        # let Redis generate the ID
-        'message',                  # gotta call it something 
-        $msg_json
-    );
-
-    eval { $self->redis->xadd(@params) };
+    eval { $self->redis->rpush($dest_stream, $msg_json) };
 
-    if ($@) {
-        $logger->error("XADD error: $@ : @params");
-    }
+    if ($@) { $logger->error("RPUSH error: $@"); }
 }
 
 # $timeout=0 means check for data without blocking
@@ -145,55 +121,37 @@ sub recv {
 
     $logger->debug("Waiting for content at: $dest_stream");
 
-    my @block;
-    if ($timeout) {
-        # 0 means block indefinitely in Redis
-        $timeout = 0 if $timeout == -1;
-        $timeout *= 1000; # milliseconds
-        @block = (BLOCK => $timeout);
-    }
+    my $packet;
 
-    my @params = (
-        GROUP => $dest_stream,
-        $self->address,
-        COUNT => 1,
-        @block,
-        'NOACK',
-        STREAMS => $dest_stream,
-        '>' # new messages only
-    );      
+    if ($timeout == 0) {
+        # Non-blocking list pop
+        eval { $packet = $self->redis->lpop($dest_stream) };
 
-    my $packet;
-    eval {$packet = $self->redis->xreadgroup(@params) };
+    } else {
+        # In Redis, timeout 0 means wait indefinitely
+        eval { $packet = 
+            $self->redis->blpop($dest_stream, $timeout == -1 ? 0 : $timeout) };
+    }
 
     if ($@) {
-        $logger->error("Redis XREADGROUP error: $@ : @params");
+        $logger->error("Redis list pop error: $@");
         return undef;
     }
 
     # Timed out waiting for data.
     return undef unless defined $packet;
 
-    # TODO make this more self-documenting.  also, too brittle?
-    # Also note at some point we may need to return info about the
-    # recipient stream to the caller in case we are listening
-    # on multiple streams.
-    my $container = $packet->[0]->[1]->[0];
-    my $msg_id = $container->[0];
-    my $json = $container->[1]->[1];
+    my $json = ref $packet eq 'ARRAY' ? $packet->[1] : $packet;
 
     $logger->internal("recv() $json");
 
-    return {
-        msg_json => $json,
-        msg_id => $msg_id
-    };
+    return $json;
 }
 
 sub flush_socket {
     my $self = shift;
     # Remove all messages from my address
-    $self->redis->xtrim($self->address, 'MAXLEN', 0);
+    $self->redis->del($self->address);
     return 1;
 }
 
index c7c2835..eff90a3 100644 (file)
@@ -18,9 +18,9 @@ my $_singleton;
 sub retrieve { return $_singleton; }
 
 sub new {
-    my ($class, $service, $no_cache) = @_;
+    my ($class, $service, $force) = @_;
 
-    return $_singleton if $_singleton && !$no_cache;
+    return $_singleton if $_singleton && !$force;
 
     my $self = {service => $service};
 
@@ -36,12 +36,9 @@ sub new {
     if ($service) {
         # If we're a service, this is where we listen for service-level requests.
         $self->{service_address} = "opensrf:service:$service";
-        $self->create_service_stream;
     }
 
-    $_singleton = $self unless $no_cache;
-
-    return $self;
+    return $_singleton = $self;
 }
 
 sub reset {                                                                    
@@ -143,26 +140,6 @@ sub tcp_connected {
     return $self->connected;
 }
 
-sub create_service_stream {
-    my $self = shift;
-
-    eval { 
-        # This gets mad when a stream / group already exists, 
-        # but it's conceivable that it's already been created.
-
-        $self->primary_connection->redis->xgroup(   
-            'create',
-            $self->service_address, # stream name
-            $self->service_address, # group name
-            '$',            # only receive new messages
-            'mkstream'      # create this stream if it's not there.
-        );
-    };
-
-    if ($@) {
-        $logger->debug("BUSYGROUP is OK => : $@");
-    }
-}
 
 # Send a message to $recipient regardless of what's in the 'to'
 # field of the message.
@@ -193,8 +170,6 @@ sub send_to {
         }
     }
 
-    $logger->internal("send(): recipient=$recipient : $msg_json");
-
     $con->send($recipient, $msg_json);
 }
 
@@ -237,12 +212,10 @@ sub recv {
 
     return undef unless $resp;
 
-    my $msg = OpenSRF::Transport::Redis::Message->new(json => $resp->{msg_json});
+    my $msg = OpenSRF::Transport::Redis::Message->new(json => $resp);
 
     return undef unless $msg;
 
-    $msg->msg_id($resp->{msg_id});
-
     $logger->internal("recv()'ed thread=" . $msg->thread);
 
     # The message body is doubly encoded as JSON.
@@ -256,7 +229,7 @@ sub flush_socket {
     my $self = shift;
     # Remove all messages from our personal stream
     if (my $con = $self->primary_connection) {
-        $con->redis->xtrim($con->address, 'MAXLEN', 0);
+        $con->redis->del($con->address);
     }
     return 1;
 }
index a017126..a31329a 100644 (file)
@@ -18,7 +18,6 @@ sub new {
         $self->{thread} = $args{thread} || '';
         $self->{body} = $args{body} || '';
         $self->{osrf_xid} = $args{osrf_xid} || '';
-        $self->{msg_id} = $args{msg_id} || '';
         $self->{router_command} = $args{router_command} || '';
         $self->{router_class} = $args{router_class} || '';
         $self->{router_reply} = $args{router_reply} || '';
@@ -68,12 +67,6 @@ sub osrf_xid {
     return $self->{osrf_xid};
 }
 
-sub msg_id {
-    my($self, $msg_id) = @_;
-    $self->{msg_id} = $msg_id if defined $msg_id;
-    return $self->{msg_id};
-}
-
 sub router_command {
     my($self, $router_command) = @_;
     $self->{router_command} = $router_command if defined $router_command;
index 8aac820..742ad8c 100644 (file)
@@ -6,8 +6,8 @@ use OpenSRF::Transport::Redis::Client;
 use base qw/OpenSRF::Transport::Redis::Client/;
 
 sub construct {
-    my ($class, $service, $no_cache) = @_;
-    return __PACKAGE__->SUPER::new($service, $no_cache);
+    my ($class, $service, $force) = @_;
+    return __PACKAGE__->SUPER::new($service, $force);
 }
 
 sub process {