Websocket avoid waiting for response/end after connect OK
authorBill Erickson <berickxx@gmail.com>
Mon, 26 Dec 2022 21:52:48 +0000 (16:52 -0500)
committerBill Erickson <berickxx@gmail.com>
Thu, 20 Apr 2023 14:18:05 +0000 (10:18 -0400)
Signed-off-by: Bill Erickson <berickxx@gmail.com>
src/websocket-stdio/osrf-websocket-stdio.c

index a3e18f8..d612a96 100644 (file)
@@ -101,8 +101,8 @@ static void read_from_stdin();
 static void relay_stdin_message(const char*);
 static char* extract_inbound_messages();
 static void log_request(const char*, osrfMessage*);
-static void read_from_osrf();
-static void read_one_osrf_message(transport_message*);
+static int read_from_osrf();
+static int read_one_osrf_message(transport_message*);
 static int shut_it_down(int);
 static void release_hash_string(char*, void*);
 static int can_shutdown_gracefully();
@@ -142,6 +142,13 @@ int main(int argc, char* argv[]) {
     int shutdown_stat;
     struct timeval tv;
 
+    // When we have active threads, we check, but avoid waiting on
+    // the websocket file descriptor, since we have an active OSRF
+    // conversation.  However, in some scenarios (e.g. directly after a
+    // CONNECT), we know osrf will not be replying with any more data
+    // until another requests comes in from the WS client.
+    int break_osrf_listen_loop = 0;
+
     while (1) {
 
         FD_ZERO(&fds);
@@ -159,7 +166,7 @@ int main(int argc, char* argv[]) {
 
         } else {
 
-            if (active_threads->size > 0) {
+            if (active_threads->size > 0 && !break_osrf_listen_loop) {
                 tv.tv_usec = 0;
                 tv.tv_sec = 0;
                 
@@ -192,13 +199,13 @@ int main(int argc, char* argv[]) {
 
             if (FD_ISSET(stdin_no, &fds)) {
                 read_from_stdin();
-                read_from_osrf();
+                break_osrf_listen_loop = read_from_osrf();
             }
 
         } else if (active_threads->size > 0) {
             // Nothing pulled from the websocket, but we still have
             // active osrf request.  See if any new responses have arrived.
-            read_from_osrf();
+            break_osrf_listen_loop = read_from_osrf();
         }
 
         if (shutdown_requested) {
@@ -598,7 +605,7 @@ static void log_request(const char* service, osrfMessage* msg) {
 
 // Relay response messages from OpenSRF to STDIN
 // Relays all available messages
-static void read_from_osrf() {
+static int read_from_osrf() {
     transport_message* tmsg = NULL;
 
     // Double check the socket connection before continuing.
@@ -617,28 +624,40 @@ static void read_from_osrf() {
     // second to receive a response.  Then return to inspect stdin
     // to see if there are any requests waiting we can push through.
     // Then come back here.
+    int break_osrf_read_loop = 0;
+
     while (1) {
-        int timeout = active_threads->size > 0 ? 1 : 0;
+        int timeout = (active_threads->size > 0 && !break_osrf_read_loop) ? 1 : 0;
 
         tmsg = client_recv(osrf_handle, timeout);
 
-        if (!tmsg) { break; }
-
-        read_one_osrf_message(tmsg);
+        // Let the caller know if the last response we processed was a 
+        // CONNECT-success and therefore don't expect any more data
+        // from OSRF until another API call comes.
+        if (!tmsg) { return break_osrf_read_loop; }
 
         osrfLogDebug(OSRF_LOG_MARK,
             "WS relaying message to STDOUT thread=%s, recipient=%s",
              tmsg->thread, tmsg->recipient);
+
+        // read_one_osrf_message returns true if we should avoid waiting
+        // for another response or a complete message.  Typically,
+        // this happends directly after a successful CONNECT, where
+        // follow-up responses are not delivered until an API call
+        // is sent.
+        break_osrf_read_loop = read_one_osrf_message(tmsg);
     }
 }
 
 // Process a single OpenSRF response message and print the reponse
 // to STDOUT for delivery to the websocket client.
-static void read_one_osrf_message(transport_message* tmsg) {
+static int read_one_osrf_message(transport_message* tmsg) {
     osrfList *msg_list = NULL;
     osrfMessage *one_msg = NULL;
     int i;
 
+    int break_listen_loop = 0;
+
     osrfLogDebug(OSRF_LOG_MARK,
         "WS received opensrf response for thread=%s", tmsg->thread);
 
@@ -658,6 +677,8 @@ static void read_one_osrf_message(transport_message* tmsg) {
 
             if (one_msg->status_code == OSRF_STATUS_OK) {
 
+                break_listen_loop = 1;
+
                 if (!osrfHashGet(stateful_session_cache, tmsg->thread)) {
 
                     unsigned long ses_size =
@@ -732,6 +753,8 @@ static void read_one_osrf_message(transport_message* tmsg) {
 
     free(msg_string);
     jsonObjectFree(msg_wrapper);
+
+    return break_listen_loop;
 }