Websocket stdio / websocketd experiment
authorBill Erickson <berickxx@gmail.com>
Sun, 10 Jun 2018 16:16:43 +0000 (12:16 -0400)
committerBill Erickson <berickxx@gmail.com>
Sun, 10 Jun 2018 16:16:43 +0000 (12:16 -0400)
Signed-off-by: Bill Erickson <berickxx@gmail.com>
.gitignore
src/websocket-stdio/osrf-websocket-stdio.c

index 0170ebe..ac2a33e 100644 (file)
@@ -96,3 +96,7 @@ tests/check_transport_client
 tests/check_transport_message
 tests/Makefile
 tests/Makefile.in
+src/websocket-stdio/.deps/
+src/websocket-stdio/.libs/
+src/websocket-stdio/osrf-websocket-stdio
+src/websocket-stdio/*.o
index a237c06..f4879bd 100644 (file)
@@ -1,7 +1,25 @@
+/*
+Copyright (C) 2018 King County Library Service
+Bill Erickson <berickxx@gmail.com>
+
+Code borrows heavily from osrf_websocket_translator.c
+
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License
+as published by the Free Software Foundation; either version 2
+of the License, or (at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+GNU General Public License for more details.
+*/
+
 #include <stdio.h>
 #include <unistd.h>
 #include <string.h>
 #include <opensrf/utils.h>
+#include <opensrf/osrf_hash.h>
 #include <opensrf/socket_bundle.h>
 #include <opensrf/transport_client.h>
 #include <opensrf/osrf_message.h>
 #include <opensrf/utils.h>
 #include <opensrf/log.h>
 
+#define MAX_THREAD_SIZE 64
+#define RECIP_BUF_SIZE 256
+#define WEBSOCKET_INGRESS "ws-translator-v2"
+
+// maximun number of active, CONNECTed opensrf sessions allowed. in
+// practice, this number will be very small, rarely reaching double
+// digits.  This is just a security back-stop.  A client trying to open
+// this many connections is almost certainly attempting to DOS the
+// gateway / server.  We may want to lower this further.
+#define MAX_ACTIVE_STATEFUL_SESSIONS 128
+
+// default values, replaced during setup (below) as needed.
+static char* config_file = "/openils/conf/opensrf_core.xml"; // TODO
+static char* config_ctxt = "gateway";
+static char* osrf_router = NULL;
+static char* osrf_domain = NULL;
+static osrfHash* stateful_session_cache = NULL;
+static char* client_ip;
+
+static time_t idle_timeout_interval = 120;
+static time_t idle_check_interval = 5;
+static time_t last_activity_time = 0;
+
+// Generally, we do not disconnect the client (as idle) if there is a
+// request in flight.  However, we need to have an upper bound on the
+// amount of time we will wait for in-flight requests to complete to
+// avoid leaving an effectively idle connection open after a request
+// died on the backend and no response was received.
+// Note that if other activity occurs while a long-running request
+// is active, the wait time will get reset with each new activity.
+// This is OK, though, because the goal of max_request_wait_time
+// is not to chop requests off at the knees, it's to allow the client
+// to timeout as idle when only a single long-running request is active
+// and preventing timeout.
+static time_t max_request_wait_time = 600;
+
+// Incremented with every REQUEST, decremented with every COMPLETE.
+// Gives us a rough picture of the number of reqests we've sent to
+// the server vs. the number for which a completed response has been
+// received.
+static int requests_in_flight = 0;
+
+static growing_buffer* stdin_buf = NULL;
+static transport_client* osrf_handle = NULL;
+static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
+
 static void child_init();
 static void read_from_stdin();
+static void relay_stdin_message(const char*);
+static char* extract_inbound_messages();
 static void read_from_osrf();
-static growing_buffer* stdin_buf = NULL;
-static transport_client* osrf_handle = NULL; 
-char* osrf_config = "/openils/conf/opensrf_core.xml"; // TODO
 
 int main() {
-    // Reusable stdin reading buffer.
+
+    // stdin buffer is cleared with each read and reused.
     stdin_buf = buffer_init(512);
 
-    child_init(); // exits on error
+    // Connect to OpenSRF
+    // exits on error.
+    child_init();
 
     // Disable output buffering.
     setbuf(stdout, NULL);
 
+    // The main loop waits for data to be available on both STDIN
+    // (inbound websocket data) and the OpenSRF XMPP socket (outbound
+    // replies).
+
     fd_set fds;
     int stdin_no = fileno(stdin);
     int osrf_no = osrf_handle->session->sock_id;
     int maxfd = osrf_no > stdin_no ? osrf_no : stdin_no;
+    int sel_resp;
 
     while (1) {
 
         FD_ZERO(&fds);
-        FD_SET(osrf_no, &fds); 
-        FD_SET(stdin_no, &fds); 
+        FD_SET(osrf_no, &fds);
+        FD_SET(stdin_no, &fds);
+
+        sel_resp = select(maxfd + 1, &fds, NULL, NULL, NULL);
 
-        select(maxfd + 1, &fds, NULL, NULL, NULL); 
+        if (sel_resp < 0) { 
+
+            if (errno == EINTR) {
+                // Interrupted by a signal.  Start the loop over.
+                continue;
+            }
+
+            if (errno == EBADF) {
+                osrfLogError(OSRF_LOG_MARK, 
+                    "WS exiting on bad file descriptor");
+
+            } else {
+                osrfLogError(OSRF_LOG_MARK, 
+                    "WS exiting on irrecoverable select() error");
+            }
+
+            // No way to fix this one, exit program.
+            exit(1);
+        }
 
         if (FD_ISSET(stdin_no, &fds)) {
             read_from_stdin();
@@ -47,7 +138,9 @@ int main() {
         }
     }
 
+    osrfHashFree(stateful_session_cache);
     buffer_free(stdin_buf);
+    osrf_system_shutdown();
     return 0;
 }
 
@@ -55,26 +148,41 @@ int main() {
 // Connect to opensrf
 static void child_init() {
 
-    if (!osrf_system_bootstrap_client(osrf_config, "gateway") ) {
+    if (!osrf_system_bootstrap_client(config_file, config_ctxt) ) {
         fprintf(stderr, "Cannot boostrap OSRF\n");
         exit(1);
     }
 
        osrf_handle = osrfSystemGetTransportClient();
-       osrfAppSessionSetIngress("osrf-websocket-stdio");
+       osrfAppSessionSetIngress(WEBSOCKET_INGRESS);
+
+    osrf_router = osrfConfigGetValue(NULL, "/router_name");
+    osrf_domain = osrfConfigGetValue(NULL, "/domain");
+    stateful_session_cache = osrfNewHash();
+
+    client_ip = getenv("REMOTE_ADDR");
+    osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip);
+
+    // TODO read timeout ENV vars
 }
 
+
 // Relay messages from STDIN to OpenSRF
-// Reads one message then exits
+// Reads one message then returns
 static void read_from_stdin() {
     char c;
 
     while ( (c = getchar()) ) {
 
+        if (c == EOF) {
+            osrfLogInfo(OSRF_LOG_MARK, "WS exiting on disconnect");
+            exit(0);
+        }
+
         if (c == '\n') { // end of current message
 
             if (stdin_buf->n_used > 0) {
-                printf("Echoing: %s\n", stdin_buf->buf);
+                relay_stdin_message(stdin_buf->buf);
                 buffer_reset(stdin_buf);
             }
 
@@ -87,12 +195,325 @@ static void read_from_stdin() {
     }
 }
 
+static void relay_stdin_message(const char* msg_string) {
+
+    jsonObject *msg_wrapper = NULL; // free me
+    const jsonObject *tmp_obj = NULL;
+    const jsonObject *osrf_msg = NULL;
+    const char *service = NULL;
+    const char *thread = NULL;
+    const char *log_xid = NULL;
+    char *msg_body = NULL;
+    char *recipient = NULL;
+
+    // generate a new log trace for this request. it
+    // may be replaced by a client-provided trace below.
+    osrfLogMkXid();
+
+    osrfLogInternal(OSRF_LOG_MARK, "WS received inbound message: %s", msg_string);
+
+    msg_wrapper = jsonParse(msg_string);
+
+    if (msg_wrapper == NULL) {
+        osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", msg_string);
+        return;
+    }
+
+    osrf_msg = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg");
+
+    if ( (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "service")) )
+        service = jsonObjectGetString(tmp_obj);
+
+    if ( (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "thread")) )
+        thread = jsonObjectGetString(tmp_obj);
+
+    if ( (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "log_xid")) )
+        log_xid = jsonObjectGetString(tmp_obj);
+
+    if (log_xid) {
+
+        // use the caller-provide log trace id
+        if (strlen(log_xid) > MAX_THREAD_SIZE) {
+            osrfLogWarning(OSRF_LOG_MARK, "WS log_xid exceeds max length");
+            return;
+        }
+
+        osrfLogForceXid(log_xid);
+    }
+
+    if (thread) {
+
+        if (strlen(thread) > MAX_THREAD_SIZE) {
+            osrfLogWarning(OSRF_LOG_MARK, "WS thread exceeds max length");
+            return;
+        }
+
+        // since clients can provide their own threads at session start time,
+        // the presence of a thread does not guarantee a cached recipient
+        recipient = (char*) osrfHashGet(stateful_session_cache, thread);
+
+        if (recipient) {
+            osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient);
+        }
+    }
+
+    if (!recipient) {
+
+        if (service) {
+            int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1,
+                "%s@%s/%s", osrf_router, osrf_domain, service);
+            recipient_buf[size] = '\0';
+            recipient = recipient_buf;
+
+        } else {
+            osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient");
+            return;
+        }
+    }
+
+    osrfLogDebug(OSRF_LOG_MARK,
+        "WS relaying message to opensrf thread=%s, recipient=%s",
+            thread, recipient);
+
+    msg_body = extract_inbound_messages(
+        service, thread, recipient, osrf_msg);
+
+    osrfLogInternal(OSRF_LOG_MARK,
+        "WS relaying inbound message: %s", msg_body);
+
+    transport_message *tmsg = message_init(
+        msg_body, NULL, thread, recipient, NULL);
+
+    message_set_osrf_xid(tmsg, osrfLogGetXid());
+
+    if (client_send_message(osrf_handle, tmsg) != 0) {
+        osrfLogError(OSRF_LOG_MARK, "WS failed sending data to OpenSRF, exiting");
+        exit(1);
+    }
+
+    osrfLogClearXid();
+    message_free(tmsg);
+    jsonObjectFree(msg_wrapper);
+    free(msg_body);
+
+    last_activity_time = time(NULL);
+}
+
+static char* extract_inbound_messages(
+        const char* service, 
+        const char* thread, 
+        const char* recipient, 
+        const jsonObject *osrf_msg) {
+
+    int i;
+    int num_msgs = osrf_msg->size;
+    osrfMessage* msg;
+    osrfMessage* msg_list[num_msgs];
+    char* recipToFree = NULL;
+
+    // here we do an extra json round-trip to get the data
+    // in a form osrf_message_deserialize can understand
+    // TODO: consider a version of osrf_message_init which can 
+    // accept a jsonObject* instead of a JSON string.
+    char *osrf_msg_json = jsonObjectToJSON(osrf_msg);
+    osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs);
+    free(osrf_msg_json);
+
+    // should we require the caller to always pass the service?
+    if (service == NULL) service = "";
+
+    for(i = 0; i < num_msgs; i++) {
+        msg = msg_list[i];
+        osrfMessageSetIngress(msg, WEBSOCKET_INGRESS);
+
+        switch (msg->m_type) {
+
+            case REQUEST: {
+                const jsonObject* params = msg->_params;
+                growing_buffer* act = buffer_init(128);
+                char* method = msg->method_name;
+                buffer_fadd(act, "[%s] [%s] %s %s", 
+                    client_ip, "", service, method);
+
+                const jsonObject* obj = NULL;
+                int i = 0;
+                const char* str;
+                int redactParams = 0;
+                while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
+                    if(!strncmp(method, str, strlen(str))) {
+                        redactParams = 1;
+                        break;
+                    }
+                }
+                if(redactParams) {
+                    OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
+                } else {
+                    i = 0;
+                    while((obj = jsonObjectGetIndex(params, i++))) {
+                        char* str = jsonObjectToJSON(obj);
+                        if( i == 1 )
+                            OSRF_BUFFER_ADD(act, " ");
+                        else
+                            OSRF_BUFFER_ADD(act, ", ");
+                        OSRF_BUFFER_ADD(act, str);
+                        free(str);
+                    }
+                }
+                osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
+                buffer_free(act);
+                requests_in_flight++;
+                break;
+            }
+
+            case DISCONNECT:
+                recipToFree = (char*) osrfHashRemove(stateful_session_cache, thread);
+                if (recipToFree != NULL) {
+                    free(recipToFree);
+                }
+
+                break;
+
+            default:
+                osrfLogError(OSRF_LOG_MARK, 
+                    "WS received unexpected message type");
+                break;
+        }
+    }
+
+    char* finalMsg = osrfMessageSerializeBatch(msg_list, num_msgs);
+
+    // clean up our messages
+    for(i = 0; i < num_msgs; i++) 
+        osrfMessageFree(msg_list[i]);
+
+    return finalMsg;
+}
+
+
 
 // Relay messages from OpenSRF to STDIN
 // Relays all available messages
 static void read_from_osrf() {
 
+    osrfList *msg_list = NULL;
+    osrfMessage *one_msg = NULL;
+    char* recipToFree = NULL;
+    int i;
+
+    // At this point we know the osrf socket has data, read
+    // up to one message with no waiting.
+    transport_message* tmsg = client_recv(osrf_handle, 0);
+
+    if (!tmsg) {
+        // tmsg can only be NULL if the underlying select() call is
+        // interrupted or the jabber socket connection was severed.
+
+        if (client_connected(osrf_handle) &&
+            socket_connected(osrf_handle->session->sock_id)) {
+            return; // back to main select loop
+        }
+
+        // Socket connection was broken.  Send disconnect to client,
+        // causing on_disconnect_handler to run and cleanup.
+        osrfLogWarning(OSRF_LOG_MARK, 
+            "WS: Jabber socket disconnected, exiting");
+
+        exit(1);
+    }
+
+    osrfLogDebug(OSRF_LOG_MARK,
+        "WS received opensrf response for thread=%s", tmsg->thread);
+
+    // first we need to perform some maintenance
+    msg_list = osrfMessageDeserialize(tmsg->body, NULL);
+
+    for (i = 0; i < msg_list->size; i++) {
+        one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
+
+        osrfLogDebug(OSRF_LOG_MARK,
+            "WS returned response of type %d", one_msg->m_type);
+
+        /*  if our client just successfully connected to an opensrf service,
+            cache the sender so that future calls on this thread will use
+            the correct recipient. */
+        if (one_msg && one_msg->m_type == STATUS) {
+
+            if (one_msg->status_code == OSRF_STATUS_OK) {
+
+                if (!osrfHashGet(stateful_session_cache, tmsg->thread)) {
+
+                    unsigned long ses_size = 
+                        osrfHashGetCount(stateful_session_cache);
+
+                    if (ses_size < MAX_ACTIVE_STATEFUL_SESSIONS) {
+
+                        osrfLogDebug(OSRF_LOG_MARK, "WS caching sender "
+                            "thread=%s, sender=%s; concurrent=%d",
+                            tmsg->thread, tmsg->sender, ses_size);
+
+                        char* sender = strdup(tmsg->sender); // free in *Remove
+                        osrfHashSet(stateful_session_cache, sender, tmsg->thread);
+
+                    } else {
+                        osrfLogWarning(OSRF_LOG_MARK,
+                            "WS max concurrent sessions (%d) reached.  "
+                            "Current session will not be tracked",
+                            MAX_ACTIVE_STATEFUL_SESSIONS
+                        );
+                    }
+                }
+
+            } else {
+
+                // connection timed out; clear the cached recipient
+                if (one_msg->status_code == OSRF_STATUS_TIMEOUT) {
+                    recipToFree = 
+                        (char*) osrfHashRemove(stateful_session_cache, tmsg->thread);
+                    if (recipToFree != NULL) {
+                        free(recipToFree);
+                    }
+
+                } else {
+                    if (one_msg->status_code == OSRF_STATUS_COMPLETE)
+                        requests_in_flight--;
+                }
+            }
+        }
+    }
+
+    // osrfMessageDeserialize applies the freeItem handler to the
+    // newly created osrfList.  We only need to free the list and
+    // the individual osrfMessage's will be freed along with it
+    osrfListFree(msg_list);
+
+    // relay the response messages to the client
+    jsonObject *msg_wrapper = NULL;
+    char *msg_string = NULL;
+
+    // build the wrapper object
+    msg_wrapper = jsonNewObject(NULL);
+    jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
+    jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
+    jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
+
+    if (tmsg->is_error) {
+        osrfLogError(OSRF_LOG_MARK,
+            "WS received jabber error message in response to thread=%s",
+            tmsg->thread);
+        jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
+    }
+
+    msg_string = jsonObjectToJSONRaw(msg_wrapper);
+
+    // Send the JSON to STDOUT
+    printf("%s\n", msg_string);
+
+    free(msg_string);
+    jsonObjectFree(msg_wrapper);
 }
 
 
 
+
+
+