Websocket stdio / websocketd experiment
authorBill Erickson <berickxx@gmail.com>
Sun, 10 Jun 2018 19:43:12 +0000 (15:43 -0400)
committerBill Erickson <berickxx@gmail.com>
Sun, 10 Jun 2018 19:43:12 +0000 (15:43 -0400)
Signed-off-by: Bill Erickson <berickxx@gmail.com>
src/websocket-stdio/osrf-websocket-stdio.c

index f4879bd..c02e6ff 100644 (file)
@@ -18,6 +18,7 @@ GNU General Public License for more details.
 #include <stdio.h>
 #include <unistd.h>
 #include <string.h>
+#include <signal.h>
 #include <opensrf/utils.h>
 #include <opensrf/osrf_hash.h>
 #include <opensrf/socket_bundle.h>
@@ -78,9 +79,19 @@ static void read_from_stdin();
 static void relay_stdin_message(const char*);
 static char* extract_inbound_messages();
 static void read_from_osrf();
+static void read_one_osrf_message(transport_message*);
+static int shut_it_down(int);
+
+static void sigint_handler(int sig) {                                       
+    osrfLogInfo(OSRF_LOG_MARK, "WS received SIGINT - graceful shutdown");
+    shut_it_down(0);
+}
 
 int main() {
 
+    // Handle shutdown signal
+    signal(SIGINT, sigint_handler);
+
     // stdin buffer is cleared with each read and reused.
     stdin_buf = buffer_init(512);
 
@@ -122,26 +133,33 @@ int main() {
 
             } else {
                 osrfLogError(OSRF_LOG_MARK, 
-                    "WS exiting on irrecoverable select() error");
+                    "WS select() failed with [%s]. Exiting", strerror(errno));
             }
 
-            // No way to fix this one, exit program.
-            exit(1);
+            // No way to fix this one, get outta here.
+            shut_it_down(1);
         }
 
         if (FD_ISSET(stdin_no, &fds)) {
+            osrfLogDebug(OSRF_LOG_MARK, "STDIN active");
             read_from_stdin();
         }
 
         if (FD_ISSET(osrf_no, &fds)) {
+            osrfLogDebug(OSRF_LOG_MARK, "XMPP active");
             read_from_osrf();
         }
     }
 
+    return shut_it_down(0);
+}
+
+static int shut_it_down(int stat) {
     osrfHashFree(stateful_session_cache);
     buffer_free(stdin_buf);
     osrf_system_shutdown();
-    return 0;
+    exit(stat);
+    return stat;
 }
 
 
@@ -150,7 +168,7 @@ static void child_init() {
 
     if (!osrf_system_bootstrap_client(config_file, config_ctxt) ) {
         fprintf(stderr, "Cannot boostrap OSRF\n");
-        exit(1);
+        shut_it_down(1);
     }
 
        osrf_handle = osrfSystemGetTransportClient();
@@ -168,17 +186,36 @@ static void child_init() {
 
 
 // Relay messages from STDIN to OpenSRF
-// Reads one message then returns
+// Reads one message then returns to allow for response to intermingle
+// with a long series of requests.
 static void read_from_stdin() {
+    char char_buf[1];
     char c;
 
-    while ( (c = getchar()) ) {
+    while (1) {
+        int stat = read(fileno(stdin), char_buf, 1);
+
+        if (stat < 0) {
+
+            if (errno == EAGAIN) {
+                // read interrupted.  Return to main loop to resume.
+                return;
+            }
+
+            osrfLogError(OSRF_LOG_MARK, 
+                "WS STDIN read failed with [%s]. Exiting", strerror(errno));
+            shut_it_down(1);
+            return;
+        }
 
-        if (c == EOF) {
+        if (stat == 0) { // EOF
             osrfLogInfo(OSRF_LOG_MARK, "WS exiting on disconnect");
-            exit(0);
+            shut_it_down(0);
+            return;
         }
 
+        c = char_buf[0];
+
         if (c == '\n') { // end of current message
 
             if (stdin_buf->n_used > 0) {
@@ -288,7 +325,7 @@ static void relay_stdin_message(const char* msg_string) {
 
     if (client_send_message(osrf_handle, tmsg) != 0) {
         osrfLogError(OSRF_LOG_MARK, "WS failed sending data to OpenSRF, exiting");
-        exit(1);
+        shut_it_down(1);
     }
 
     osrfLogClearXid();
@@ -394,32 +431,31 @@ static char* extract_inbound_messages(
 // Relay messages from OpenSRF to STDIN
 // Relays all available messages
 static void read_from_osrf() {
+    transport_message* tmsg = NULL;
 
-    osrfList *msg_list = NULL;
-    osrfMessage *one_msg = NULL;
-    char* recipToFree = NULL;
-    int i;
-
-    // At this point we know the osrf socket has data, read
-    // up to one message with no waiting.
-    transport_message* tmsg = client_recv(osrf_handle, 0);
-
-    if (!tmsg) {
-        // tmsg can only be NULL if the underlying select() call is
-        // interrupted or the jabber socket connection was severed.
-
-        if (client_connected(osrf_handle) &&
-            socket_connected(osrf_handle->session->sock_id)) {
-            return; // back to main select loop
-        }
-
-        // Socket connection was broken.  Send disconnect to client,
-        // causing on_disconnect_handler to run and cleanup.
+    // Double check the socket connection before continuing.
+    if (!client_connected(osrf_handle) ||
+        !socket_connected(osrf_handle->session->sock_id)) {
         osrfLogWarning(OSRF_LOG_MARK, 
             "WS: Jabber socket disconnected, exiting");
+        shut_it_down(1);
+    }
 
-        exit(1);
+    // Once client_recv is called all data waiting on the socket is read.
+    // This means we can't return to the main select() loop after each message,
+    // because any subsequent messages will get stuck in the opensrf queue.
+    // Instead, process all available messages.
+    while ( (tmsg = client_recv(osrf_handle, 0)) ) {
+        read_one_osrf_message(tmsg);
+        message_free(tmsg);
     }
+}
+
+static void read_one_osrf_message(transport_message* tmsg) {
+    osrfList *msg_list = NULL;
+    osrfMessage *one_msg = NULL;
+    char* recipToFree = NULL;
+    int i;
 
     osrfLogDebug(OSRF_LOG_MARK,
         "WS received opensrf response for thread=%s", tmsg->thread);
@@ -510,6 +546,7 @@ static void read_from_osrf() {
 
     free(msg_string);
     jsonObjectFree(msg_wrapper);
+
 }