From c9ea2c6d2dd721c6df3383c3aa58d7fc66ddb8b6 Mon Sep 17 00:00:00 2001 From: Bill Erickson Date: Sun, 10 Jun 2018 19:09:53 -0400 Subject: [PATCH] Websocket stdio / websocketd experiment Signed-off-by: Bill Erickson --- src/websocket-stdio/osrf-websocket-stdio.c | 162 ++++++++++++++++------------- 1 file changed, 92 insertions(+), 70 deletions(-) diff --git a/src/websocket-stdio/osrf-websocket-stdio.c b/src/websocket-stdio/osrf-websocket-stdio.c index 6233e1b..6427ef6 100644 --- a/src/websocket-stdio/osrf-websocket-stdio.c +++ b/src/websocket-stdio/osrf-websocket-stdio.c @@ -1,19 +1,29 @@ -/* -Copyright (C) 2018 King County Library Service -Bill Erickson - -Code borrows heavily from osrf_websocket_translator.c - -This program is free software; you can redistribute it and/or -modify it under the terms of the GNU General Public License -as published by the Free Software Foundation; either version 2 -of the License, or (at your option) any later version. - -This program is distributed in the hope that it will be useful, -but WITHOUT ANY WARRANTY; without even the implied warranty of -MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -GNU General Public License for more details. -*/ +/* -------------------------------------------------------------------- + * Copyright (C) 2018 King County Library Service + * Bill Erickson + * + * Code borrows heavily from osrf_websocket_translator.c + * + * This program is free software; you can redistribute it and/or + * modify it under the terms of the GNU General Public License + * as published by the Free Software Foundation; either version 2 + * of the License, or (at your option) any later version. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU General Public License for more details. +--------------------------------------------------------------------- */ + +/** + * OpenSRF Websockets Relay + * + * Reads Websockets requests on STDIN + * Sends replies to requests on STDOUT + * + * Built to function with websocketd: + * https://github.com/joewalnes/websocketd + */ #include #include @@ -21,11 +31,9 @@ GNU General Public License for more details. #include #include #include -#include #include #include #include -#include #include #define MAX_THREAD_SIZE 64 @@ -37,26 +45,31 @@ GNU General Public License for more details. // digits. This is just a security back-stop. A client trying to open // this many connections is almost certainly attempting to DOS the // gateway / server. We may want to lower this further. -#define MAX_ACTIVE_STATEFUL_SESSIONS 128 +#define MAX_ACTIVE_STATEFUL_SESSIONS 64 // default values, replaced during setup (below) as needed. static char* config_file = "/openils/conf/opensrf_core.xml"; // TODO static char* config_ctxt = "gateway"; static char* osrf_router = NULL; static char* osrf_domain = NULL; + +// Cache of opensrf thread strings and back-end receipients. +// Tracking this here means the caller only needs to track/know the thread. static osrfHash* stateful_session_cache = NULL; -static char* client_ip = NULL; static growing_buffer* stdin_buf = NULL; static transport_client* osrf_handle = NULL; static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer +static char* client_ip = NULL; static void child_init(); static void read_from_stdin(); static void relay_stdin_message(const char*); static char* extract_inbound_messages(); +static void log_request(const char*, osrfMessage*); static void read_from_osrf(); static void read_one_osrf_message(transport_message*); static int shut_it_down(int); +static void release_hash_string(char*, void*); static void sigint_handler(int sig) { osrfLogInfo(OSRF_LOG_MARK, "WS received SIGINT - graceful shutdown"); @@ -150,20 +163,33 @@ static void child_init() { osrf_router = osrfConfigGetValue(NULL, "/router_name"); osrf_domain = osrfConfigGetValue(NULL, "/domain"); + stateful_session_cache = osrfNewHash(); + osrfHashSetCallback(stateful_session_cache, release_hash_string); client_ip = getenv("REMOTE_ADDR"); osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip); } +// Called by osrfHash when a string is removed. We strdup each +// string before it goes into the hash. +static void release_hash_string(char* key, void* str) { + if (str == NULL) return; + free((char*) str); +} + // Relay messages from STDIN to OpenSRF -// Reads one message then returns to allow for response to intermingle +// Reads one message then returns to allow for responses to intermingle // with a long series of requests. static void read_from_stdin() { char char_buf[1]; char c; + // Read one char at a time so we can stop at the first newline + // and leave any other data on the wire until read_from_stdin() + // is called again. + while (1) { int stat = read(fileno(stdin), char_buf, 1); @@ -198,7 +224,7 @@ static void read_from_stdin() { return; } else { - + // Add the char to our current message buffer buffer_add_char(stdin_buf, c); } } @@ -316,7 +342,6 @@ static char* extract_inbound_messages( int num_msgs = osrf_msg->size; osrfMessage* msg; osrfMessage* msg_list[num_msgs]; - char* recipToFree = NULL; // here we do an extra json round-trip to get the data // in a form osrf_message_deserialize can understand @@ -329,54 +354,18 @@ static char* extract_inbound_messages( // should we require the caller to always pass the service? if (service == NULL) service = ""; - for(i = 0; i < num_msgs; i++) { + for (i = 0; i < num_msgs; i++) { msg = msg_list[i]; osrfMessageSetIngress(msg, WEBSOCKET_INGRESS); switch (msg->m_type) { - case REQUEST: { - const jsonObject* params = msg->_params; - growing_buffer* act = buffer_init(128); - char* method = msg->method_name; - buffer_fadd(act, "[%s] [%s] %s %s", - client_ip, "", service, method); - - const jsonObject* obj = NULL; - int i = 0; - const char* str; - int redactParams = 0; - while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) { - if(!strncmp(method, str, strlen(str))) { - redactParams = 1; - break; - } - } - if(redactParams) { - OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**"); - } else { - i = 0; - while((obj = jsonObjectGetIndex(params, i++))) { - char* str = jsonObjectToJSON(obj); - if( i == 1 ) - OSRF_BUFFER_ADD(act, " "); - else - OSRF_BUFFER_ADD(act, ", "); - OSRF_BUFFER_ADD(act, str); - free(str); - } - } - osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf); - buffer_free(act); + case REQUEST: + log_request(service, msg); break; - } case DISCONNECT: - recipToFree = (char*) osrfHashRemove(stateful_session_cache, thread); - if (recipToFree != NULL) { - free(recipToFree); - } - + osrfHashRemove(stateful_session_cache, thread); break; default: @@ -389,12 +378,50 @@ static char* extract_inbound_messages( char* finalMsg = osrfMessageSerializeBatch(msg_list, num_msgs); // clean up our messages - for(i = 0; i < num_msgs; i++) + for (i = 0; i < num_msgs; i++) osrfMessageFree(msg_list[i]); return finalMsg; } +static void log_request(const char* service, osrfMessage* msg) { + + const jsonObject* params = msg->_params; + growing_buffer* act = buffer_init(128); + char* method = msg->method_name; + const jsonObject* obj = NULL; + int i = 0; + const char* str; + int redactParams = 0; + + buffer_fadd(act, "[%s] [%s] %s %s", client_ip, "", service, method); + + while ( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) { + if (!strncmp(method, str, strlen(str))) { + redactParams = 1; + break; + } + } + + if (redactParams) { + OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**"); + } else { + i = 0; + while ((obj = jsonObjectGetIndex(params, i++))) { + char* str = jsonObjectToJSON(obj); + if (i == 1) + OSRF_BUFFER_ADD(act, " "); + else + OSRF_BUFFER_ADD(act, ", "); + OSRF_BUFFER_ADD(act, str); + free(str); + } + } + + osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf); + buffer_free(act); +} + // Relay messages from OpenSRF to STDIN @@ -423,7 +450,6 @@ static void read_from_osrf() { static void read_one_osrf_message(transport_message* tmsg) { osrfList *msg_list = NULL; osrfMessage *one_msg = NULL; - char* recipToFree = NULL; int i; osrfLogDebug(OSRF_LOG_MARK, @@ -473,11 +499,7 @@ static void read_one_osrf_message(transport_message* tmsg) { // connection timed out; clear the cached recipient if (one_msg->status_code == OSRF_STATUS_TIMEOUT) { - recipToFree = - (char*) osrfHashRemove(stateful_session_cache, tmsg->thread); - if (recipToFree != NULL) { - free(recipToFree); - } + osrfHashRemove(stateful_session_cache, tmsg->thread); } } } -- 2.11.0