Websocket stdio / websocketd experiment
authorBill Erickson <berickxx@gmail.com>
Sun, 10 Jun 2018 23:09:53 +0000 (19:09 -0400)
committerBill Erickson <berickxx@gmail.com>
Sun, 10 Jun 2018 23:09:53 +0000 (19:09 -0400)
Signed-off-by: Bill Erickson <berickxx@gmail.com>
src/websocket-stdio/osrf-websocket-stdio.c

index 6233e1b..6427ef6 100644 (file)
@@ -1,19 +1,29 @@
-/*
-Copyright (C) 2018 King County Library Service
-Bill Erickson <berickxx@gmail.com>
-
-Code borrows heavily from osrf_websocket_translator.c
-
-This program is free software; you can redistribute it and/or
-modify it under the terms of the GNU General Public License
-as published by the Free Software Foundation; either version 2
-of the License, or (at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
-GNU General Public License for more details.
-*/
+/* --------------------------------------------------------------------
+ * Copyright (C) 2018 King County Library Service
+ * Bill Erickson <berickxx@gmail.com>
+ * 
+ * Code borrows heavily from osrf_websocket_translator.c
+ * 
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ * 
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+ * GNU General Public License for more details.
+--------------------------------------------------------------------- */
+
+/**
+ * OpenSRF Websockets Relay
+ *
+ * Reads Websockets requests on STDIN
+ * Sends replies to requests on STDOUT
+ *
+ * Built to function with websocketd:
+ * https://github.com/joewalnes/websocketd
+ */
 
 #include <stdio.h>
 #include <unistd.h>
@@ -21,11 +31,9 @@ GNU General Public License for more details.
 #include <signal.h>
 #include <opensrf/utils.h>
 #include <opensrf/osrf_hash.h>
-#include <opensrf/socket_bundle.h>
 #include <opensrf/transport_client.h>
 #include <opensrf/osrf_message.h>
 #include <opensrf/osrf_app_session.h>
-#include <opensrf/utils.h>
 #include <opensrf/log.h>
 
 #define MAX_THREAD_SIZE 64
@@ -37,26 +45,31 @@ GNU General Public License for more details.
 // digits.  This is just a security back-stop.  A client trying to open
 // this many connections is almost certainly attempting to DOS the
 // gateway / server.  We may want to lower this further.
-#define MAX_ACTIVE_STATEFUL_SESSIONS 128
+#define MAX_ACTIVE_STATEFUL_SESSIONS 64
 
 // default values, replaced during setup (below) as needed.
 static char* config_file = "/openils/conf/opensrf_core.xml"; // TODO
 static char* config_ctxt = "gateway";
 static char* osrf_router = NULL;
 static char* osrf_domain = NULL;
+
+// Cache of opensrf thread strings and back-end receipients.
+// Tracking this here means the caller only needs to track/know the thread.
 static osrfHash* stateful_session_cache = NULL;
-static char* client_ip = NULL;
 static growing_buffer* stdin_buf = NULL;
 static transport_client* osrf_handle = NULL;
 static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
+static char* client_ip = NULL;
 
 static void child_init();
 static void read_from_stdin();
 static void relay_stdin_message(const char*);
 static char* extract_inbound_messages();
+static void log_request(const char*, osrfMessage*);
 static void read_from_osrf();
 static void read_one_osrf_message(transport_message*);
 static int shut_it_down(int);
+static void release_hash_string(char*, void*);
 
 static void sigint_handler(int sig) {                                       
     osrfLogInfo(OSRF_LOG_MARK, "WS received SIGINT - graceful shutdown");
@@ -150,20 +163,33 @@ static void child_init() {
 
     osrf_router = osrfConfigGetValue(NULL, "/router_name");
     osrf_domain = osrfConfigGetValue(NULL, "/domain");
+
     stateful_session_cache = osrfNewHash();
+    osrfHashSetCallback(stateful_session_cache, release_hash_string);
 
     client_ip = getenv("REMOTE_ADDR");
     osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip);
 }
 
+// Called by osrfHash when a string is removed.  We strdup each
+// string before it goes into the hash.
+static void release_hash_string(char* key, void* str) {
+    if (str == NULL) return;
+    free((char*) str);
+}
+
 
 // Relay messages from STDIN to OpenSRF
-// Reads one message then returns to allow for response to intermingle
+// Reads one message then returns to allow for responses to intermingle
 // with a long series of requests.
 static void read_from_stdin() {
     char char_buf[1];
     char c;
 
+    // Read one char at a time so we can stop at the first newline
+    // and leave any other data on the wire until read_from_stdin()
+    // is called again.
+
     while (1) {
         int stat = read(fileno(stdin), char_buf, 1);
 
@@ -198,7 +224,7 @@ static void read_from_stdin() {
             return;
 
         } else {
-
+            // Add the char to our current message buffer
             buffer_add_char(stdin_buf, c);
         }
     }
@@ -316,7 +342,6 @@ static char* extract_inbound_messages(
     int num_msgs = osrf_msg->size;
     osrfMessage* msg;
     osrfMessage* msg_list[num_msgs];
-    char* recipToFree = NULL;
 
     // here we do an extra json round-trip to get the data
     // in a form osrf_message_deserialize can understand
@@ -329,54 +354,18 @@ static char* extract_inbound_messages(
     // should we require the caller to always pass the service?
     if (service == NULL) service = "";
 
-    for(i = 0; i < num_msgs; i++) {
+    for (i = 0; i < num_msgs; i++) {
         msg = msg_list[i];
         osrfMessageSetIngress(msg, WEBSOCKET_INGRESS);
 
         switch (msg->m_type) {
 
-            case REQUEST: {
-                const jsonObject* params = msg->_params;
-                growing_buffer* act = buffer_init(128);
-                char* method = msg->method_name;
-                buffer_fadd(act, "[%s] [%s] %s %s", 
-                    client_ip, "", service, method);
-
-                const jsonObject* obj = NULL;
-                int i = 0;
-                const char* str;
-                int redactParams = 0;
-                while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
-                    if(!strncmp(method, str, strlen(str))) {
-                        redactParams = 1;
-                        break;
-                    }
-                }
-                if(redactParams) {
-                    OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
-                } else {
-                    i = 0;
-                    while((obj = jsonObjectGetIndex(params, i++))) {
-                        char* str = jsonObjectToJSON(obj);
-                        if( i == 1 )
-                            OSRF_BUFFER_ADD(act, " ");
-                        else
-                            OSRF_BUFFER_ADD(act, ", ");
-                        OSRF_BUFFER_ADD(act, str);
-                        free(str);
-                    }
-                }
-                osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
-                buffer_free(act);
+            case REQUEST:
+                log_request(service, msg);
                 break;
-            }
 
             case DISCONNECT:
-                recipToFree = (char*) osrfHashRemove(stateful_session_cache, thread);
-                if (recipToFree != NULL) {
-                    free(recipToFree);
-                }
-
+                osrfHashRemove(stateful_session_cache, thread);
                 break;
 
             default:
@@ -389,12 +378,50 @@ static char* extract_inbound_messages(
     char* finalMsg = osrfMessageSerializeBatch(msg_list, num_msgs);
 
     // clean up our messages
-    for(i = 0; i < num_msgs; i++) 
+    for (i = 0; i < num_msgs; i++) 
         osrfMessageFree(msg_list[i]);
 
     return finalMsg;
 }
 
+static void log_request(const char* service, osrfMessage* msg) {
+
+    const jsonObject* params = msg->_params;
+    growing_buffer* act = buffer_init(128);
+    char* method = msg->method_name;
+    const jsonObject* obj = NULL;
+    int i = 0;
+    const char* str;
+    int redactParams = 0;
+
+    buffer_fadd(act, "[%s] [%s] %s %s", client_ip, "", service, method);
+
+    while ( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
+        if (!strncmp(method, str, strlen(str))) {
+            redactParams = 1;
+            break;
+        }
+    }
+
+    if (redactParams) {
+        OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
+    } else {
+        i = 0;
+        while ((obj = jsonObjectGetIndex(params, i++))) {
+            char* str = jsonObjectToJSON(obj);
+            if (i == 1)
+                OSRF_BUFFER_ADD(act, " ");
+            else
+                OSRF_BUFFER_ADD(act, ", ");
+            OSRF_BUFFER_ADD(act, str);
+            free(str);
+        }
+    }
+
+    osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
+    buffer_free(act);
+}
+
 
 
 // Relay messages from OpenSRF to STDIN
@@ -423,7 +450,6 @@ static void read_from_osrf() {
 static void read_one_osrf_message(transport_message* tmsg) {
     osrfList *msg_list = NULL;
     osrfMessage *one_msg = NULL;
-    char* recipToFree = NULL;
     int i;
 
     osrfLogDebug(OSRF_LOG_MARK,
@@ -473,11 +499,7 @@ static void read_one_osrf_message(transport_message* tmsg) {
 
                 // connection timed out; clear the cached recipient
                 if (one_msg->status_code == OSRF_STATUS_TIMEOUT) {
-                    recipToFree = 
-                        (char*) osrfHashRemove(stateful_session_cache, tmsg->thread);
-                    if (recipToFree != NULL) {
-                        free(recipToFree);
-                    }
+                    osrfHashRemove(stateful_session_cache, tmsg->thread);
                 }
             }
         }