From 355966695789989c0bc3fdade4d5a8c16280e4c8 Mon Sep 17 00:00:00 2001 From: Bill Erickson Date: Mon, 26 Dec 2022 16:52:48 -0500 Subject: [PATCH] Websocket avoid waiting for response/end after connect OK Signed-off-by: Bill Erickson --- src/websocket-stdio/osrf-websocket-stdio.c | 45 ++++++++++++++++++++++-------- 1 file changed, 34 insertions(+), 11 deletions(-) diff --git a/src/websocket-stdio/osrf-websocket-stdio.c b/src/websocket-stdio/osrf-websocket-stdio.c index a3e18f8..d612a96 100644 --- a/src/websocket-stdio/osrf-websocket-stdio.c +++ b/src/websocket-stdio/osrf-websocket-stdio.c @@ -101,8 +101,8 @@ static void read_from_stdin(); static void relay_stdin_message(const char*); static char* extract_inbound_messages(); static void log_request(const char*, osrfMessage*); -static void read_from_osrf(); -static void read_one_osrf_message(transport_message*); +static int read_from_osrf(); +static int read_one_osrf_message(transport_message*); static int shut_it_down(int); static void release_hash_string(char*, void*); static int can_shutdown_gracefully(); @@ -142,6 +142,13 @@ int main(int argc, char* argv[]) { int shutdown_stat; struct timeval tv; + // When we have active threads, we check, but avoid waiting on + // the websocket file descriptor, since we have an active OSRF + // conversation. However, in some scenarios (e.g. directly after a + // CONNECT), we know osrf will not be replying with any more data + // until another requests comes in from the WS client. + int break_osrf_listen_loop = 0; + while (1) { FD_ZERO(&fds); @@ -159,7 +166,7 @@ int main(int argc, char* argv[]) { } else { - if (active_threads->size > 0) { + if (active_threads->size > 0 && !break_osrf_listen_loop) { tv.tv_usec = 0; tv.tv_sec = 0; @@ -192,13 +199,13 @@ int main(int argc, char* argv[]) { if (FD_ISSET(stdin_no, &fds)) { read_from_stdin(); - read_from_osrf(); + break_osrf_listen_loop = read_from_osrf(); } } else if (active_threads->size > 0) { // Nothing pulled from the websocket, but we still have // active osrf request. See if any new responses have arrived. - read_from_osrf(); + break_osrf_listen_loop = read_from_osrf(); } if (shutdown_requested) { @@ -598,7 +605,7 @@ static void log_request(const char* service, osrfMessage* msg) { // Relay response messages from OpenSRF to STDIN // Relays all available messages -static void read_from_osrf() { +static int read_from_osrf() { transport_message* tmsg = NULL; // Double check the socket connection before continuing. @@ -617,28 +624,40 @@ static void read_from_osrf() { // second to receive a response. Then return to inspect stdin // to see if there are any requests waiting we can push through. // Then come back here. + int break_osrf_read_loop = 0; + while (1) { - int timeout = active_threads->size > 0 ? 1 : 0; + int timeout = (active_threads->size > 0 && !break_osrf_read_loop) ? 1 : 0; tmsg = client_recv(osrf_handle, timeout); - if (!tmsg) { break; } - - read_one_osrf_message(tmsg); + // Let the caller know if the last response we processed was a + // CONNECT-success and therefore don't expect any more data + // from OSRF until another API call comes. + if (!tmsg) { return break_osrf_read_loop; } osrfLogDebug(OSRF_LOG_MARK, "WS relaying message to STDOUT thread=%s, recipient=%s", tmsg->thread, tmsg->recipient); + + // read_one_osrf_message returns true if we should avoid waiting + // for another response or a complete message. Typically, + // this happends directly after a successful CONNECT, where + // follow-up responses are not delivered until an API call + // is sent. + break_osrf_read_loop = read_one_osrf_message(tmsg); } } // Process a single OpenSRF response message and print the reponse // to STDOUT for delivery to the websocket client. -static void read_one_osrf_message(transport_message* tmsg) { +static int read_one_osrf_message(transport_message* tmsg) { osrfList *msg_list = NULL; osrfMessage *one_msg = NULL; int i; + int break_listen_loop = 0; + osrfLogDebug(OSRF_LOG_MARK, "WS received opensrf response for thread=%s", tmsg->thread); @@ -658,6 +677,8 @@ static void read_one_osrf_message(transport_message* tmsg) { if (one_msg->status_code == OSRF_STATUS_OK) { + break_listen_loop = 1; + if (!osrfHashGet(stateful_session_cache, tmsg->thread)) { unsigned long ses_size = @@ -732,6 +753,8 @@ static void read_one_osrf_message(transport_message* tmsg) { free(msg_string); jsonObjectFree(msg_wrapper); + + return break_listen_loop; } -- 2.11.0