From: Bill Erickson Date: Fri, 18 Nov 2022 22:36:22 +0000 (-0500) Subject: Perl / C Redis Return to list push/pop X-Git-Url: https://old-git.evergreen-ils.org/?a=commitdiff_plain;h=def7018b08c41e3b03e41e145deb638929981548;p=working%2FOpenSRF.git Perl / C Redis Return to list push/pop Signed-off-by: Bill Erickson --- diff --git a/bin/opensrf-perl.pl.in b/bin/opensrf-perl.pl.in index 1c34ad8..89cf565 100755 --- a/bin/opensrf-perl.pl.in +++ b/bin/opensrf-perl.pl.in @@ -648,10 +648,9 @@ sub do_reset_message_bus { my @perms = qw/ -@all - +xgroup - +xadd - +xreadgroup - +xtrim + +lpop + +blpop + +rpush +del ~opensrf:router:* ~opensrf:service:* diff --git a/include/opensrf/transport_connection.h b/include/opensrf/transport_connection.h index 25eef27..a28951e 100644 --- a/include/opensrf/transport_connection.h +++ b/include/opensrf/transport_connection.h @@ -26,7 +26,6 @@ struct transport_con_struct { typedef struct transport_con_struct transport_con; struct transport_con_msg_struct { - char* msg_id; char* msg_json; }; typedef struct transport_con_msg_struct transport_con_msg; @@ -45,19 +44,16 @@ int transport_con_connect(transport_con* con, int transport_con_disconnect(transport_con* con); -int transport_con_send(transport_con* con, const char* msg_json, const char* stream); +int transport_con_send(transport_con* con, const char* msg_json, const char* recipient); -transport_con_msg* transport_con_recv_once(transport_con* con, int timeout, const char* stream); +transport_con_msg* transport_con_recv_once(transport_con* con, int timeout, const char* recipient); -transport_con_msg* transport_con_recv(transport_con* con, int timeout, const char* stream); +transport_con_msg* transport_con_recv(transport_con* con, int timeout, const char* recipient); void transport_con_flush_socket(transport_con* con); int handle_redis_error(redisReply *reply, const char* command, ...); -int transport_con_make_stream(transport_con* con, const char* stream, int exists_ok); - - #ifdef __cplusplus } #endif diff --git a/src/libopensrf/transport_client.c b/src/libopensrf/transport_client.c index f020646..945b153 100644 --- a/src/libopensrf/transport_client.c +++ b/src/libopensrf/transport_client.c @@ -76,11 +76,8 @@ int client_connect_as_service(transport_client* client, const char* service) { client->primary_connection = con; - transport_con_connect( + return transport_con_connect( con, client->port, client->username, client->password); - - // Make a stream for the service address - return transport_con_make_stream(con, client->service_address, 1); } int client_connect(transport_client* client) { @@ -185,10 +182,6 @@ int client_send_message_to(transport_client* client, transport_message* msg, con "client_send_message() to=%s %s", recipient, msg->msg_json); return transport_con_send(con, msg->msg_json, recipient); - - osrfLogInternal(OSRF_LOG_MARK, "client_send_message() send completed"); - - return 0; } transport_message* client_recv_stream(transport_client* client, int timeout, const char* stream) { diff --git a/src/libopensrf/transport_connection.c b/src/libopensrf/transport_connection.c index c4544db..d3f12cb 100644 --- a/src/libopensrf/transport_connection.c +++ b/src/libopensrf/transport_connection.c @@ -21,8 +21,6 @@ void transport_con_msg_free(transport_con_msg* msg) { osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_msg_free()"); if (msg == NULL) { return; } - - if (msg->msg_id) { free(msg->msg_id); } if (msg->msg_json) { free(msg->msg_json); } free(msg); @@ -105,34 +103,6 @@ int transport_con_connect( freeReplyObject(reply); - return transport_con_make_stream(con, con->address, 0); -} - -int transport_con_make_stream(transport_con* con, const char* stream, int exists_ok) { - osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_make_stream() stream=%s", stream); - - redisReply *reply = redisCommand( - con->bus, - "XGROUP CREATE %s %s $ mkstream", - stream, - stream, - "$", - "mkstream" - ); - - // Produces an error when a group/stream already exists, but that's - // acceptible when creating a group/stream for a stop-level service - // address, since multiple Listeners are allowed. - if (handle_redis_error(reply, - "XGROUP CREATE %s %s $ mkstream", - stream, - stream, - "$", - "mkstream" - )) { return exists_ok; } - - freeReplyObject(reply); - return 1; } @@ -153,116 +123,82 @@ int transport_con_disconnect(transport_con* con) { return 0; } -int transport_con_send(transport_con* con, const char* msg_json, const char* stream) { +// Returns 0 on success. +int transport_con_send(transport_con* con, const char* msg_json, const char* recipient) { - osrfLogInternal(OSRF_LOG_MARK, "Sending to stream=%s: %s", stream, msg_json); + osrfLogInternal(OSRF_LOG_MARK, "Sending to recipient=%s: %s", recipient, msg_json); - redisReply *reply = redisCommand(con->bus, - "XADD %s NOMKSTREAM MAXLEN ~ %d * message %s", - stream, - con->max_queue, - msg_json - ); + redisReply *reply = redisCommand(con->bus, "RPUSH %s %s", recipient, msg_json); - if (handle_redis_error(reply, - "XADD %s NOMKSTREAM MAXLEN ~ %d * message %s", - stream, con->max_queue, msg_json)) { + int stat = handle_redis_error(reply, "RPUSH %s %s", recipient, msg_json); - return -1; + if (!stat) { + freeReplyObject(reply); } - freeReplyObject(reply); - - return 0; + return stat; } -transport_con_msg* transport_con_recv_once(transport_con* con, int timeout, const char* stream) { +transport_con_msg* transport_con_recv_once(transport_con* con, int timeout, const char* recipient) { + osrfLogInternal(OSRF_LOG_MARK, - "TCON transport_con_recv_once() timeout=%d stream=%s", timeout, stream); + "TCON transport_con_recv_once() timeout=%d recipient=%s", timeout, recipient); - if (stream == NULL) { stream = con->address; } + if (recipient == NULL) { recipient = con->address; } - redisReply *reply, *tmp; - char *msg_id = NULL, *json = NULL; + size_t len = 0; + char command_buf[256]; - if (timeout == 0) { + if (timeout == 0) { // Non-blocking list pop - reply = redisCommand(con->bus, - "XREADGROUP GROUP %s %s COUNT 1 STREAMS %s >", - stream, con->address, stream - ); + len = snprintf(command_buf, 256, "LPOP %s", recipient); } else { + + if (timeout < 0) { // Block indefinitely - if (timeout == -1) { - // Redis timeout 0 means block indefinitely - timeout = 0; - } else { - // Milliseconds - timeout *= 1000; - } + len = snprintf(command_buf, 256, "BLPOP %s 0", recipient); - reply = redisCommand(con->bus, - "XREADGROUP GROUP %s %s BLOCK %d COUNT 1 STREAMS %s >", - stream, con->address, timeout, stream - ); + } else { // Block up to timeout seconds + + len = snprintf(command_buf, 256, "BLPOP %s %d", recipient, timeout); + } } - // Timeout or error - if (handle_redis_error( - reply, - "XREADGROUP GROUP %s %s %s COUNT 1 NOACK STREAMS %s >", - stream, con->address, "BLOCK X", stream - )) { return NULL; } - - // Unpack the XREADGROUP response, which is a nest of arrays. - // These arrays are mostly 1 and 2-element lists, since we are - // only reading one item on a single stream. - if (reply->type == REDIS_REPLY_ARRAY && reply->elements > 0) { - tmp = reply->element[0]; - - if (tmp->type == REDIS_REPLY_ARRAY && tmp->elements > 1) { - tmp = tmp->element[1]; - - if (tmp->type == REDIS_REPLY_ARRAY && tmp->elements > 0) { - tmp = tmp->element[0]; - - if (tmp->type == REDIS_REPLY_ARRAY && tmp->elements > 1) { - redisReply *r1 = tmp->element[0]; - redisReply *r2 = tmp->element[1]; - - if (r1->type == REDIS_REPLY_STRING) { - msg_id = strdup(r1->str); - } - - if (r2->type == REDIS_REPLY_ARRAY && r2->elements > 1) { - // r2->element[0] is the message name, which we - // currently don't use for anything. - - r2 = r2->element[1]; - - if (r2->type == REDIS_REPLY_STRING) { - json = strdup(r2->str); - } - } - } - } + command_buf[len] = '\0'; + + osrfLogInternal(OSRF_LOG_MARK, + "recv_one_chunk() sending command: %s", command_buf); + + redisReply* reply = redisCommand(con->bus, command_buf); + if (handle_redis_error(reply, command_buf)) { return NULL; } + + char* json = NULL; + if (reply->type == REDIS_REPLY_STRING) { // LPOP + json = strdup(reply->str); + + } else if (reply->type == REDIS_REPLY_ARRAY) { // BLPOP + + // BLPOP returns [list_name, popped_value] + if (reply->elements == 2 && reply->element[1]->str != NULL) { + json = strdup(reply->element[1]->str); + } else { + osrfLogInternal(OSRF_LOG_MARK, + "No response returned within timeout: %d", timeout); } } - freeReplyObject(reply); // XREADGROUP + freeReplyObject(reply); + + osrfLogInternal(OSRF_LOG_MARK, "recv_one_chunk() read json: %s", json); - if (msg_id == NULL) { - // Read timed out. 'json' will also be NULL. + if (json == NULL) { return NULL; } transport_con_msg* tcon_msg = safe_malloc(sizeof(transport_con_msg)); - tcon_msg->msg_id = msg_id; tcon_msg->msg_json = json; - osrfLogInternal(OSRF_LOG_MARK, "recv_one_chunk() read json: %s", json); - return tcon_msg; } diff --git a/src/perl/lib/OpenSRF/Server.pm b/src/perl/lib/OpenSRF/Server.pm index 269ee1d..f34e633 100644 --- a/src/perl/lib/OpenSRF/Server.pm +++ b/src/perl/lib/OpenSRF/Server.pm @@ -81,6 +81,7 @@ sub cleanup { @{$self->{active_list}}." active children..."); # block until a child is becomes available + $logger->info("waiting for child procs to clear in graceful shutdown"); $self->check_status(1); } $logger->info("server: all clear for graceful shutdown"); @@ -154,7 +155,7 @@ sub run { my $wait_time = 1; # main server loop - while(1) { + while (1) { $self->check_status; $self->{child_died} = 0; @@ -202,7 +203,19 @@ sub run { # when we hit equilibrium, there's no need for regular # maintenance, so set wait_time to 'forever' - $wait_time = -1 if + # + # Avoid indefinite waiting here -- Redis client + # gracefully handles interrupts and immediately goes + # back to listening after the signal handler is + # complete. In our case, the signal handler may include + # un-registering with routers, which requires the Redis + # client to wait for an ACK from the Redis server. + # However, it will never receive the ack because our + # client is already blocking on an BLPOP call wiating + # for a new request. In future, we could replace + # signals with messages sent directly to listeners + # telling them to shutdown. + $wait_time = 3 if !$self->perform_idle_maintenance and # no maintenance performed this time @{$self->{active_list}} == 0; # no active children } @@ -552,6 +565,7 @@ sub unregister_routers { router_command => "unregister", router_class => $self->{service} ); + $logger->info("Disconnect sent to $router"); } } @@ -601,7 +615,7 @@ sub init { my $self = shift; my $service = $self->{parent}->{service}; $0 = "OpenSRF Drone [$service]"; - OpenSRF::Transport::PeerHandle->construct($service); + OpenSRF::Transport::PeerHandle->construct($service, 1); OpenSRF::Application->application_implementation->child_init if (OpenSRF::Application->application_implementation->can('child_init')); } diff --git a/src/perl/lib/OpenSRF/Transport/Redis/BusConnection.pm b/src/perl/lib/OpenSRF/Transport/Redis/BusConnection.pm index f74cad8..bede432 100644 --- a/src/perl/lib/OpenSRF/Transport/Redis/BusConnection.pm +++ b/src/perl/lib/OpenSRF/Transport/Redis/BusConnection.pm @@ -65,11 +65,11 @@ sub connect { $logger->debug("Redis client connecting: ". "domain=$domain port=$port username=$username address=$address"); - # On disconnect, try to reconnect every 100ms up to 60 seconds. + # On disconnect, try to reconnect every second up to 60 seconds. my @connect_args = ( server => "$domain:$port", reconnect => 60, - every => 100_000 + every => 1_000_000 ); $logger->debug("Connecting to bus: @connect_args"); @@ -84,17 +84,6 @@ sub connect { $logger->debug("Auth'ed with Redis as $username OK : address=$address"); - # Each bus connection has its own stream / group for receiving - # direct messages. These streams/groups should not pre-exist. - - $self->redis->xgroup( - 'create', - $address, - $address, - '$', # only receive new messages - 'mkstream' # create this stream if it's not there. - ); - return $self; } @@ -116,22 +105,9 @@ sub send { $logger->internal("send(): to=$dest_stream : $msg_json"); - my @params = ( - $dest_stream, - 'NOMKSTREAM', - 'MAXLEN', - '~', # maxlen-ish - $self->{max_queue}, - '*', # let Redis generate the ID - 'message', # gotta call it something - $msg_json - ); - - eval { $self->redis->xadd(@params) }; + eval { $self->redis->rpush($dest_stream, $msg_json) }; - if ($@) { - $logger->error("XADD error: $@ : @params"); - } + if ($@) { $logger->error("RPUSH error: $@"); } } # $timeout=0 means check for data without blocking @@ -145,55 +121,37 @@ sub recv { $logger->debug("Waiting for content at: $dest_stream"); - my @block; - if ($timeout) { - # 0 means block indefinitely in Redis - $timeout = 0 if $timeout == -1; - $timeout *= 1000; # milliseconds - @block = (BLOCK => $timeout); - } + my $packet; - my @params = ( - GROUP => $dest_stream, - $self->address, - COUNT => 1, - @block, - 'NOACK', - STREAMS => $dest_stream, - '>' # new messages only - ); + if ($timeout == 0) { + # Non-blocking list pop + eval { $packet = $self->redis->lpop($dest_stream) }; - my $packet; - eval {$packet = $self->redis->xreadgroup(@params) }; + } else { + # In Redis, timeout 0 means wait indefinitely + eval { $packet = + $self->redis->blpop($dest_stream, $timeout == -1 ? 0 : $timeout) }; + } if ($@) { - $logger->error("Redis XREADGROUP error: $@ : @params"); + $logger->error("Redis list pop error: $@"); return undef; } # Timed out waiting for data. return undef unless defined $packet; - # TODO make this more self-documenting. also, too brittle? - # Also note at some point we may need to return info about the - # recipient stream to the caller in case we are listening - # on multiple streams. - my $container = $packet->[0]->[1]->[0]; - my $msg_id = $container->[0]; - my $json = $container->[1]->[1]; + my $json = ref $packet eq 'ARRAY' ? $packet->[1] : $packet; $logger->internal("recv() $json"); - return { - msg_json => $json, - msg_id => $msg_id - }; + return $json; } sub flush_socket { my $self = shift; # Remove all messages from my address - $self->redis->xtrim($self->address, 'MAXLEN', 0); + $self->redis->del($self->address); return 1; } diff --git a/src/perl/lib/OpenSRF/Transport/Redis/Client.pm b/src/perl/lib/OpenSRF/Transport/Redis/Client.pm index c7c2835..eff90a3 100644 --- a/src/perl/lib/OpenSRF/Transport/Redis/Client.pm +++ b/src/perl/lib/OpenSRF/Transport/Redis/Client.pm @@ -18,9 +18,9 @@ my $_singleton; sub retrieve { return $_singleton; } sub new { - my ($class, $service, $no_cache) = @_; + my ($class, $service, $force) = @_; - return $_singleton if $_singleton && !$no_cache; + return $_singleton if $_singleton && !$force; my $self = {service => $service}; @@ -36,12 +36,9 @@ sub new { if ($service) { # If we're a service, this is where we listen for service-level requests. $self->{service_address} = "opensrf:service:$service"; - $self->create_service_stream; } - $_singleton = $self unless $no_cache; - - return $self; + return $_singleton = $self; } sub reset { @@ -143,26 +140,6 @@ sub tcp_connected { return $self->connected; } -sub create_service_stream { - my $self = shift; - - eval { - # This gets mad when a stream / group already exists, - # but it's conceivable that it's already been created. - - $self->primary_connection->redis->xgroup( - 'create', - $self->service_address, # stream name - $self->service_address, # group name - '$', # only receive new messages - 'mkstream' # create this stream if it's not there. - ); - }; - - if ($@) { - $logger->debug("BUSYGROUP is OK => : $@"); - } -} # Send a message to $recipient regardless of what's in the 'to' # field of the message. @@ -193,8 +170,6 @@ sub send_to { } } - $logger->internal("send(): recipient=$recipient : $msg_json"); - $con->send($recipient, $msg_json); } @@ -237,12 +212,10 @@ sub recv { return undef unless $resp; - my $msg = OpenSRF::Transport::Redis::Message->new(json => $resp->{msg_json}); + my $msg = OpenSRF::Transport::Redis::Message->new(json => $resp); return undef unless $msg; - $msg->msg_id($resp->{msg_id}); - $logger->internal("recv()'ed thread=" . $msg->thread); # The message body is doubly encoded as JSON. @@ -256,7 +229,7 @@ sub flush_socket { my $self = shift; # Remove all messages from our personal stream if (my $con = $self->primary_connection) { - $con->redis->xtrim($con->address, 'MAXLEN', 0); + $con->redis->del($con->address); } return 1; } diff --git a/src/perl/lib/OpenSRF/Transport/Redis/Message.pm b/src/perl/lib/OpenSRF/Transport/Redis/Message.pm index a017126..a31329a 100644 --- a/src/perl/lib/OpenSRF/Transport/Redis/Message.pm +++ b/src/perl/lib/OpenSRF/Transport/Redis/Message.pm @@ -18,7 +18,6 @@ sub new { $self->{thread} = $args{thread} || ''; $self->{body} = $args{body} || ''; $self->{osrf_xid} = $args{osrf_xid} || ''; - $self->{msg_id} = $args{msg_id} || ''; $self->{router_command} = $args{router_command} || ''; $self->{router_class} = $args{router_class} || ''; $self->{router_reply} = $args{router_reply} || ''; @@ -68,12 +67,6 @@ sub osrf_xid { return $self->{osrf_xid}; } -sub msg_id { - my($self, $msg_id) = @_; - $self->{msg_id} = $msg_id if defined $msg_id; - return $self->{msg_id}; -} - sub router_command { my($self, $router_command) = @_; $self->{router_command} = $router_command if defined $router_command; diff --git a/src/perl/lib/OpenSRF/Transport/Redis/PeerConnection.pm b/src/perl/lib/OpenSRF/Transport/Redis/PeerConnection.pm index 8aac820..742ad8c 100644 --- a/src/perl/lib/OpenSRF/Transport/Redis/PeerConnection.pm +++ b/src/perl/lib/OpenSRF/Transport/Redis/PeerConnection.pm @@ -6,8 +6,8 @@ use OpenSRF::Transport::Redis::Client; use base qw/OpenSRF::Transport::Redis::Client/; sub construct { - my ($class, $service, $no_cache) = @_; - return __PACKAGE__->SUPER::new($service, $no_cache); + my ($class, $service, $force) = @_; + return __PACKAGE__->SUPER::new($service, $force); } sub process {