my @perms = qw/
-@all
- +xgroup
- +xadd
- +xreadgroup
- +xtrim
+ +lpop
+ +blpop
+ +rpush
+del
~opensrf:router:*
~opensrf:service:*
typedef struct transport_con_struct transport_con;
struct transport_con_msg_struct {
- char* msg_id;
char* msg_json;
};
typedef struct transport_con_msg_struct transport_con_msg;
int transport_con_disconnect(transport_con* con);
-int transport_con_send(transport_con* con, const char* msg_json, const char* stream);
+int transport_con_send(transport_con* con, const char* msg_json, const char* recipient);
-transport_con_msg* transport_con_recv_once(transport_con* con, int timeout, const char* stream);
+transport_con_msg* transport_con_recv_once(transport_con* con, int timeout, const char* recipient);
-transport_con_msg* transport_con_recv(transport_con* con, int timeout, const char* stream);
+transport_con_msg* transport_con_recv(transport_con* con, int timeout, const char* recipient);
void transport_con_flush_socket(transport_con* con);
int handle_redis_error(redisReply *reply, const char* command, ...);
-int transport_con_make_stream(transport_con* con, const char* stream, int exists_ok);
-
-
#ifdef __cplusplus
}
#endif
client->primary_connection = con;
- transport_con_connect(
+ return transport_con_connect(
con, client->port, client->username, client->password);
-
- // Make a stream for the service address
- return transport_con_make_stream(con, client->service_address, 1);
}
int client_connect(transport_client* client) {
"client_send_message() to=%s %s", recipient, msg->msg_json);
return transport_con_send(con, msg->msg_json, recipient);
-
- osrfLogInternal(OSRF_LOG_MARK, "client_send_message() send completed");
-
- return 0;
}
transport_message* client_recv_stream(transport_client* client, int timeout, const char* stream) {
osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_msg_free()");
if (msg == NULL) { return; }
-
- if (msg->msg_id) { free(msg->msg_id); }
if (msg->msg_json) { free(msg->msg_json); }
free(msg);
freeReplyObject(reply);
- return transport_con_make_stream(con, con->address, 0);
-}
-
-int transport_con_make_stream(transport_con* con, const char* stream, int exists_ok) {
- osrfLogInternal(OSRF_LOG_MARK, "TCON transport_con_make_stream() stream=%s", stream);
-
- redisReply *reply = redisCommand(
- con->bus,
- "XGROUP CREATE %s %s $ mkstream",
- stream,
- stream,
- "$",
- "mkstream"
- );
-
- // Produces an error when a group/stream already exists, but that's
- // acceptible when creating a group/stream for a stop-level service
- // address, since multiple Listeners are allowed.
- if (handle_redis_error(reply,
- "XGROUP CREATE %s %s $ mkstream",
- stream,
- stream,
- "$",
- "mkstream"
- )) { return exists_ok; }
-
- freeReplyObject(reply);
-
return 1;
}
return 0;
}
-int transport_con_send(transport_con* con, const char* msg_json, const char* stream) {
+// Returns 0 on success.
+int transport_con_send(transport_con* con, const char* msg_json, const char* recipient) {
- osrfLogInternal(OSRF_LOG_MARK, "Sending to stream=%s: %s", stream, msg_json);
+ osrfLogInternal(OSRF_LOG_MARK, "Sending to recipient=%s: %s", recipient, msg_json);
- redisReply *reply = redisCommand(con->bus,
- "XADD %s NOMKSTREAM MAXLEN ~ %d * message %s",
- stream,
- con->max_queue,
- msg_json
- );
+ redisReply *reply = redisCommand(con->bus, "RPUSH %s %s", recipient, msg_json);
- if (handle_redis_error(reply,
- "XADD %s NOMKSTREAM MAXLEN ~ %d * message %s",
- stream, con->max_queue, msg_json)) {
+ int stat = handle_redis_error(reply, "RPUSH %s %s", recipient, msg_json);
- return -1;
+ if (!stat) {
+ freeReplyObject(reply);
}
- freeReplyObject(reply);
-
- return 0;
+ return stat;
}
-transport_con_msg* transport_con_recv_once(transport_con* con, int timeout, const char* stream) {
+transport_con_msg* transport_con_recv_once(transport_con* con, int timeout, const char* recipient) {
+
osrfLogInternal(OSRF_LOG_MARK,
- "TCON transport_con_recv_once() timeout=%d stream=%s", timeout, stream);
+ "TCON transport_con_recv_once() timeout=%d recipient=%s", timeout, recipient);
- if (stream == NULL) { stream = con->address; }
+ if (recipient == NULL) { recipient = con->address; }
- redisReply *reply, *tmp;
- char *msg_id = NULL, *json = NULL;
+ size_t len = 0;
+ char command_buf[256];
- if (timeout == 0) {
+ if (timeout == 0) { // Non-blocking list pop
- reply = redisCommand(con->bus,
- "XREADGROUP GROUP %s %s COUNT 1 STREAMS %s >",
- stream, con->address, stream
- );
+ len = snprintf(command_buf, 256, "LPOP %s", recipient);
} else {
+
+ if (timeout < 0) { // Block indefinitely
- if (timeout == -1) {
- // Redis timeout 0 means block indefinitely
- timeout = 0;
- } else {
- // Milliseconds
- timeout *= 1000;
- }
+ len = snprintf(command_buf, 256, "BLPOP %s 0", recipient);
- reply = redisCommand(con->bus,
- "XREADGROUP GROUP %s %s BLOCK %d COUNT 1 STREAMS %s >",
- stream, con->address, timeout, stream
- );
+ } else { // Block up to timeout seconds
+
+ len = snprintf(command_buf, 256, "BLPOP %s %d", recipient, timeout);
+ }
}
- // Timeout or error
- if (handle_redis_error(
- reply,
- "XREADGROUP GROUP %s %s %s COUNT 1 NOACK STREAMS %s >",
- stream, con->address, "BLOCK X", stream
- )) { return NULL; }
-
- // Unpack the XREADGROUP response, which is a nest of arrays.
- // These arrays are mostly 1 and 2-element lists, since we are
- // only reading one item on a single stream.
- if (reply->type == REDIS_REPLY_ARRAY && reply->elements > 0) {
- tmp = reply->element[0];
-
- if (tmp->type == REDIS_REPLY_ARRAY && tmp->elements > 1) {
- tmp = tmp->element[1];
-
- if (tmp->type == REDIS_REPLY_ARRAY && tmp->elements > 0) {
- tmp = tmp->element[0];
-
- if (tmp->type == REDIS_REPLY_ARRAY && tmp->elements > 1) {
- redisReply *r1 = tmp->element[0];
- redisReply *r2 = tmp->element[1];
-
- if (r1->type == REDIS_REPLY_STRING) {
- msg_id = strdup(r1->str);
- }
-
- if (r2->type == REDIS_REPLY_ARRAY && r2->elements > 1) {
- // r2->element[0] is the message name, which we
- // currently don't use for anything.
-
- r2 = r2->element[1];
-
- if (r2->type == REDIS_REPLY_STRING) {
- json = strdup(r2->str);
- }
- }
- }
- }
+ command_buf[len] = '\0';
+
+ osrfLogInternal(OSRF_LOG_MARK,
+ "recv_one_chunk() sending command: %s", command_buf);
+
+ redisReply* reply = redisCommand(con->bus, command_buf);
+ if (handle_redis_error(reply, command_buf)) { return NULL; }
+
+ char* json = NULL;
+ if (reply->type == REDIS_REPLY_STRING) { // LPOP
+ json = strdup(reply->str);
+
+ } else if (reply->type == REDIS_REPLY_ARRAY) { // BLPOP
+
+ // BLPOP returns [list_name, popped_value]
+ if (reply->elements == 2 && reply->element[1]->str != NULL) {
+ json = strdup(reply->element[1]->str);
+ } else {
+ osrfLogInternal(OSRF_LOG_MARK,
+ "No response returned within timeout: %d", timeout);
}
}
- freeReplyObject(reply); // XREADGROUP
+ freeReplyObject(reply);
+
+ osrfLogInternal(OSRF_LOG_MARK, "recv_one_chunk() read json: %s", json);
- if (msg_id == NULL) {
- // Read timed out. 'json' will also be NULL.
+ if (json == NULL) {
return NULL;
}
transport_con_msg* tcon_msg = safe_malloc(sizeof(transport_con_msg));
- tcon_msg->msg_id = msg_id;
tcon_msg->msg_json = json;
- osrfLogInternal(OSRF_LOG_MARK, "recv_one_chunk() read json: %s", json);
-
return tcon_msg;
}
@{$self->{active_list}}." active children...");
# block until a child is becomes available
+ $logger->info("waiting for child procs to clear in graceful shutdown");
$self->check_status(1);
}
$logger->info("server: all clear for graceful shutdown");
my $wait_time = 1;
# main server loop
- while(1) {
+ while (1) {
$self->check_status;
$self->{child_died} = 0;
# when we hit equilibrium, there's no need for regular
# maintenance, so set wait_time to 'forever'
- $wait_time = -1 if
+ #
+ # Avoid indefinite waiting here -- Redis client
+ # gracefully handles interrupts and immediately goes
+ # back to listening after the signal handler is
+ # complete. In our case, the signal handler may include
+ # un-registering with routers, which requires the Redis
+ # client to wait for an ACK from the Redis server.
+ # However, it will never receive the ack because our
+ # client is already blocking on an BLPOP call wiating
+ # for a new request. In future, we could replace
+ # signals with messages sent directly to listeners
+ # telling them to shutdown.
+ $wait_time = 3 if
!$self->perform_idle_maintenance and # no maintenance performed this time
@{$self->{active_list}} == 0; # no active children
}
router_command => "unregister",
router_class => $self->{service}
);
+ $logger->info("Disconnect sent to $router");
}
}
my $self = shift;
my $service = $self->{parent}->{service};
$0 = "OpenSRF Drone [$service]";
- OpenSRF::Transport::PeerHandle->construct($service);
+ OpenSRF::Transport::PeerHandle->construct($service, 1);
OpenSRF::Application->application_implementation->child_init
if (OpenSRF::Application->application_implementation->can('child_init'));
}
$logger->debug("Redis client connecting: ".
"domain=$domain port=$port username=$username address=$address");
- # On disconnect, try to reconnect every 100ms up to 60 seconds.
+ # On disconnect, try to reconnect every second up to 60 seconds.
my @connect_args = (
server => "$domain:$port",
reconnect => 60,
- every => 100_000
+ every => 1_000_000
);
$logger->debug("Connecting to bus: @connect_args");
$logger->debug("Auth'ed with Redis as $username OK : address=$address");
- # Each bus connection has its own stream / group for receiving
- # direct messages. These streams/groups should not pre-exist.
-
- $self->redis->xgroup(
- 'create',
- $address,
- $address,
- '$', # only receive new messages
- 'mkstream' # create this stream if it's not there.
- );
-
return $self;
}
$logger->internal("send(): to=$dest_stream : $msg_json");
- my @params = (
- $dest_stream,
- 'NOMKSTREAM',
- 'MAXLEN',
- '~', # maxlen-ish
- $self->{max_queue},
- '*', # let Redis generate the ID
- 'message', # gotta call it something
- $msg_json
- );
-
- eval { $self->redis->xadd(@params) };
+ eval { $self->redis->rpush($dest_stream, $msg_json) };
- if ($@) {
- $logger->error("XADD error: $@ : @params");
- }
+ if ($@) { $logger->error("RPUSH error: $@"); }
}
# $timeout=0 means check for data without blocking
$logger->debug("Waiting for content at: $dest_stream");
- my @block;
- if ($timeout) {
- # 0 means block indefinitely in Redis
- $timeout = 0 if $timeout == -1;
- $timeout *= 1000; # milliseconds
- @block = (BLOCK => $timeout);
- }
+ my $packet;
- my @params = (
- GROUP => $dest_stream,
- $self->address,
- COUNT => 1,
- @block,
- 'NOACK',
- STREAMS => $dest_stream,
- '>' # new messages only
- );
+ if ($timeout == 0) {
+ # Non-blocking list pop
+ eval { $packet = $self->redis->lpop($dest_stream) };
- my $packet;
- eval {$packet = $self->redis->xreadgroup(@params) };
+ } else {
+ # In Redis, timeout 0 means wait indefinitely
+ eval { $packet =
+ $self->redis->blpop($dest_stream, $timeout == -1 ? 0 : $timeout) };
+ }
if ($@) {
- $logger->error("Redis XREADGROUP error: $@ : @params");
+ $logger->error("Redis list pop error: $@");
return undef;
}
# Timed out waiting for data.
return undef unless defined $packet;
- # TODO make this more self-documenting. also, too brittle?
- # Also note at some point we may need to return info about the
- # recipient stream to the caller in case we are listening
- # on multiple streams.
- my $container = $packet->[0]->[1]->[0];
- my $msg_id = $container->[0];
- my $json = $container->[1]->[1];
+ my $json = ref $packet eq 'ARRAY' ? $packet->[1] : $packet;
$logger->internal("recv() $json");
- return {
- msg_json => $json,
- msg_id => $msg_id
- };
+ return $json;
}
sub flush_socket {
my $self = shift;
# Remove all messages from my address
- $self->redis->xtrim($self->address, 'MAXLEN', 0);
+ $self->redis->del($self->address);
return 1;
}
sub retrieve { return $_singleton; }
sub new {
- my ($class, $service, $no_cache) = @_;
+ my ($class, $service, $force) = @_;
- return $_singleton if $_singleton && !$no_cache;
+ return $_singleton if $_singleton && !$force;
my $self = {service => $service};
if ($service) {
# If we're a service, this is where we listen for service-level requests.
$self->{service_address} = "opensrf:service:$service";
- $self->create_service_stream;
}
- $_singleton = $self unless $no_cache;
-
- return $self;
+ return $_singleton = $self;
}
sub reset {
return $self->connected;
}
-sub create_service_stream {
- my $self = shift;
-
- eval {
- # This gets mad when a stream / group already exists,
- # but it's conceivable that it's already been created.
-
- $self->primary_connection->redis->xgroup(
- 'create',
- $self->service_address, # stream name
- $self->service_address, # group name
- '$', # only receive new messages
- 'mkstream' # create this stream if it's not there.
- );
- };
-
- if ($@) {
- $logger->debug("BUSYGROUP is OK => : $@");
- }
-}
# Send a message to $recipient regardless of what's in the 'to'
# field of the message.
}
}
- $logger->internal("send(): recipient=$recipient : $msg_json");
-
$con->send($recipient, $msg_json);
}
return undef unless $resp;
- my $msg = OpenSRF::Transport::Redis::Message->new(json => $resp->{msg_json});
+ my $msg = OpenSRF::Transport::Redis::Message->new(json => $resp);
return undef unless $msg;
- $msg->msg_id($resp->{msg_id});
-
$logger->internal("recv()'ed thread=" . $msg->thread);
# The message body is doubly encoded as JSON.
my $self = shift;
# Remove all messages from our personal stream
if (my $con = $self->primary_connection) {
- $con->redis->xtrim($con->address, 'MAXLEN', 0);
+ $con->redis->del($con->address);
}
return 1;
}
$self->{thread} = $args{thread} || '';
$self->{body} = $args{body} || '';
$self->{osrf_xid} = $args{osrf_xid} || '';
- $self->{msg_id} = $args{msg_id} || '';
$self->{router_command} = $args{router_command} || '';
$self->{router_class} = $args{router_class} || '';
$self->{router_reply} = $args{router_reply} || '';
return $self->{osrf_xid};
}
-sub msg_id {
- my($self, $msg_id) = @_;
- $self->{msg_id} = $msg_id if defined $msg_id;
- return $self->{msg_id};
-}
-
sub router_command {
my($self, $router_command) = @_;
$self->{router_command} = $router_command if defined $router_command;
use base qw/OpenSRF::Transport::Redis::Client/;
sub construct {
- my ($class, $service, $no_cache) = @_;
- return __PACKAGE__->SUPER::new($service, $no_cache);
+ my ($class, $service, $force) = @_;
+ return __PACKAGE__->SUPER::new($service, $force);
}
sub process {