From 5c0e27e3812b0248bd1aa62977f09132ff4c921b Mon Sep 17 00:00:00 2001 From: Bill Erickson Date: Sun, 10 Jun 2018 12:16:43 -0400 Subject: [PATCH] Websocket stdio / websocketd experiment Signed-off-by: Bill Erickson --- .gitignore | 4 + src/websocket-stdio/osrf-websocket-stdio.c | 445 ++++++++++++++++++++++++++++- 2 files changed, 437 insertions(+), 12 deletions(-) diff --git a/.gitignore b/.gitignore index 0170ebe..ac2a33e 100644 --- a/.gitignore +++ b/.gitignore @@ -96,3 +96,7 @@ tests/check_transport_client tests/check_transport_message tests/Makefile tests/Makefile.in +src/websocket-stdio/.deps/ +src/websocket-stdio/.libs/ +src/websocket-stdio/osrf-websocket-stdio +src/websocket-stdio/*.o diff --git a/src/websocket-stdio/osrf-websocket-stdio.c b/src/websocket-stdio/osrf-websocket-stdio.c index a237c06..f4879bd 100644 --- a/src/websocket-stdio/osrf-websocket-stdio.c +++ b/src/websocket-stdio/osrf-websocket-stdio.c @@ -1,7 +1,25 @@ +/* +Copyright (C) 2018 King County Library Service +Bill Erickson + +Code borrows heavily from osrf_websocket_translator.c + +This program is free software; you can redistribute it and/or +modify it under the terms of the GNU General Public License +as published by the Free Software Foundation; either version 2 +of the License, or (at your option) any later version. + +This program is distributed in the hope that it will be useful, +but WITHOUT ANY WARRANTY; without even the implied warranty of +MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +GNU General Public License for more details. +*/ + #include #include #include #include +#include #include #include #include @@ -9,34 +27,107 @@ #include #include +#define MAX_THREAD_SIZE 64 +#define RECIP_BUF_SIZE 256 +#define WEBSOCKET_INGRESS "ws-translator-v2" + +// maximun number of active, CONNECTed opensrf sessions allowed. in +// practice, this number will be very small, rarely reaching double +// digits. This is just a security back-stop. A client trying to open +// this many connections is almost certainly attempting to DOS the +// gateway / server. We may want to lower this further. +#define MAX_ACTIVE_STATEFUL_SESSIONS 128 + +// default values, replaced during setup (below) as needed. +static char* config_file = "/openils/conf/opensrf_core.xml"; // TODO +static char* config_ctxt = "gateway"; +static char* osrf_router = NULL; +static char* osrf_domain = NULL; +static osrfHash* stateful_session_cache = NULL; +static char* client_ip; + +static time_t idle_timeout_interval = 120; +static time_t idle_check_interval = 5; +static time_t last_activity_time = 0; + +// Generally, we do not disconnect the client (as idle) if there is a +// request in flight. However, we need to have an upper bound on the +// amount of time we will wait for in-flight requests to complete to +// avoid leaving an effectively idle connection open after a request +// died on the backend and no response was received. +// Note that if other activity occurs while a long-running request +// is active, the wait time will get reset with each new activity. +// This is OK, though, because the goal of max_request_wait_time +// is not to chop requests off at the knees, it's to allow the client +// to timeout as idle when only a single long-running request is active +// and preventing timeout. +static time_t max_request_wait_time = 600; + +// Incremented with every REQUEST, decremented with every COMPLETE. +// Gives us a rough picture of the number of reqests we've sent to +// the server vs. the number for which a completed response has been +// received. +static int requests_in_flight = 0; + +static growing_buffer* stdin_buf = NULL; +static transport_client* osrf_handle = NULL; +static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer + static void child_init(); static void read_from_stdin(); +static void relay_stdin_message(const char*); +static char* extract_inbound_messages(); static void read_from_osrf(); -static growing_buffer* stdin_buf = NULL; -static transport_client* osrf_handle = NULL; -char* osrf_config = "/openils/conf/opensrf_core.xml"; // TODO int main() { - // Reusable stdin reading buffer. + + // stdin buffer is cleared with each read and reused. stdin_buf = buffer_init(512); - child_init(); // exits on error + // Connect to OpenSRF + // exits on error. + child_init(); // Disable output buffering. setbuf(stdout, NULL); + // The main loop waits for data to be available on both STDIN + // (inbound websocket data) and the OpenSRF XMPP socket (outbound + // replies). + fd_set fds; int stdin_no = fileno(stdin); int osrf_no = osrf_handle->session->sock_id; int maxfd = osrf_no > stdin_no ? osrf_no : stdin_no; + int sel_resp; while (1) { FD_ZERO(&fds); - FD_SET(osrf_no, &fds); - FD_SET(stdin_no, &fds); + FD_SET(osrf_no, &fds); + FD_SET(stdin_no, &fds); + + sel_resp = select(maxfd + 1, &fds, NULL, NULL, NULL); - select(maxfd + 1, &fds, NULL, NULL, NULL); + if (sel_resp < 0) { + + if (errno == EINTR) { + // Interrupted by a signal. Start the loop over. + continue; + } + + if (errno == EBADF) { + osrfLogError(OSRF_LOG_MARK, + "WS exiting on bad file descriptor"); + + } else { + osrfLogError(OSRF_LOG_MARK, + "WS exiting on irrecoverable select() error"); + } + + // No way to fix this one, exit program. + exit(1); + } if (FD_ISSET(stdin_no, &fds)) { read_from_stdin(); @@ -47,7 +138,9 @@ int main() { } } + osrfHashFree(stateful_session_cache); buffer_free(stdin_buf); + osrf_system_shutdown(); return 0; } @@ -55,26 +148,41 @@ int main() { // Connect to opensrf static void child_init() { - if (!osrf_system_bootstrap_client(osrf_config, "gateway") ) { + if (!osrf_system_bootstrap_client(config_file, config_ctxt) ) { fprintf(stderr, "Cannot boostrap OSRF\n"); exit(1); } osrf_handle = osrfSystemGetTransportClient(); - osrfAppSessionSetIngress("osrf-websocket-stdio"); + osrfAppSessionSetIngress(WEBSOCKET_INGRESS); + + osrf_router = osrfConfigGetValue(NULL, "/router_name"); + osrf_domain = osrfConfigGetValue(NULL, "/domain"); + stateful_session_cache = osrfNewHash(); + + client_ip = getenv("REMOTE_ADDR"); + osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip); + + // TODO read timeout ENV vars } + // Relay messages from STDIN to OpenSRF -// Reads one message then exits +// Reads one message then returns static void read_from_stdin() { char c; while ( (c = getchar()) ) { + if (c == EOF) { + osrfLogInfo(OSRF_LOG_MARK, "WS exiting on disconnect"); + exit(0); + } + if (c == '\n') { // end of current message if (stdin_buf->n_used > 0) { - printf("Echoing: %s\n", stdin_buf->buf); + relay_stdin_message(stdin_buf->buf); buffer_reset(stdin_buf); } @@ -87,12 +195,325 @@ static void read_from_stdin() { } } +static void relay_stdin_message(const char* msg_string) { + + jsonObject *msg_wrapper = NULL; // free me + const jsonObject *tmp_obj = NULL; + const jsonObject *osrf_msg = NULL; + const char *service = NULL; + const char *thread = NULL; + const char *log_xid = NULL; + char *msg_body = NULL; + char *recipient = NULL; + + // generate a new log trace for this request. it + // may be replaced by a client-provided trace below. + osrfLogMkXid(); + + osrfLogInternal(OSRF_LOG_MARK, "WS received inbound message: %s", msg_string); + + msg_wrapper = jsonParse(msg_string); + + if (msg_wrapper == NULL) { + osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", msg_string); + return; + } + + osrf_msg = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg"); + + if ( (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "service")) ) + service = jsonObjectGetString(tmp_obj); + + if ( (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "thread")) ) + thread = jsonObjectGetString(tmp_obj); + + if ( (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "log_xid")) ) + log_xid = jsonObjectGetString(tmp_obj); + + if (log_xid) { + + // use the caller-provide log trace id + if (strlen(log_xid) > MAX_THREAD_SIZE) { + osrfLogWarning(OSRF_LOG_MARK, "WS log_xid exceeds max length"); + return; + } + + osrfLogForceXid(log_xid); + } + + if (thread) { + + if (strlen(thread) > MAX_THREAD_SIZE) { + osrfLogWarning(OSRF_LOG_MARK, "WS thread exceeds max length"); + return; + } + + // since clients can provide their own threads at session start time, + // the presence of a thread does not guarantee a cached recipient + recipient = (char*) osrfHashGet(stateful_session_cache, thread); + + if (recipient) { + osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient); + } + } + + if (!recipient) { + + if (service) { + int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1, + "%s@%s/%s", osrf_router, osrf_domain, service); + recipient_buf[size] = '\0'; + recipient = recipient_buf; + + } else { + osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient"); + return; + } + } + + osrfLogDebug(OSRF_LOG_MARK, + "WS relaying message to opensrf thread=%s, recipient=%s", + thread, recipient); + + msg_body = extract_inbound_messages( + service, thread, recipient, osrf_msg); + + osrfLogInternal(OSRF_LOG_MARK, + "WS relaying inbound message: %s", msg_body); + + transport_message *tmsg = message_init( + msg_body, NULL, thread, recipient, NULL); + + message_set_osrf_xid(tmsg, osrfLogGetXid()); + + if (client_send_message(osrf_handle, tmsg) != 0) { + osrfLogError(OSRF_LOG_MARK, "WS failed sending data to OpenSRF, exiting"); + exit(1); + } + + osrfLogClearXid(); + message_free(tmsg); + jsonObjectFree(msg_wrapper); + free(msg_body); + + last_activity_time = time(NULL); +} + +static char* extract_inbound_messages( + const char* service, + const char* thread, + const char* recipient, + const jsonObject *osrf_msg) { + + int i; + int num_msgs = osrf_msg->size; + osrfMessage* msg; + osrfMessage* msg_list[num_msgs]; + char* recipToFree = NULL; + + // here we do an extra json round-trip to get the data + // in a form osrf_message_deserialize can understand + // TODO: consider a version of osrf_message_init which can + // accept a jsonObject* instead of a JSON string. + char *osrf_msg_json = jsonObjectToJSON(osrf_msg); + osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs); + free(osrf_msg_json); + + // should we require the caller to always pass the service? + if (service == NULL) service = ""; + + for(i = 0; i < num_msgs; i++) { + msg = msg_list[i]; + osrfMessageSetIngress(msg, WEBSOCKET_INGRESS); + + switch (msg->m_type) { + + case REQUEST: { + const jsonObject* params = msg->_params; + growing_buffer* act = buffer_init(128); + char* method = msg->method_name; + buffer_fadd(act, "[%s] [%s] %s %s", + client_ip, "", service, method); + + const jsonObject* obj = NULL; + int i = 0; + const char* str; + int redactParams = 0; + while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) { + if(!strncmp(method, str, strlen(str))) { + redactParams = 1; + break; + } + } + if(redactParams) { + OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**"); + } else { + i = 0; + while((obj = jsonObjectGetIndex(params, i++))) { + char* str = jsonObjectToJSON(obj); + if( i == 1 ) + OSRF_BUFFER_ADD(act, " "); + else + OSRF_BUFFER_ADD(act, ", "); + OSRF_BUFFER_ADD(act, str); + free(str); + } + } + osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf); + buffer_free(act); + requests_in_flight++; + break; + } + + case DISCONNECT: + recipToFree = (char*) osrfHashRemove(stateful_session_cache, thread); + if (recipToFree != NULL) { + free(recipToFree); + } + + break; + + default: + osrfLogError(OSRF_LOG_MARK, + "WS received unexpected message type"); + break; + } + } + + char* finalMsg = osrfMessageSerializeBatch(msg_list, num_msgs); + + // clean up our messages + for(i = 0; i < num_msgs; i++) + osrfMessageFree(msg_list[i]); + + return finalMsg; +} + + // Relay messages from OpenSRF to STDIN // Relays all available messages static void read_from_osrf() { + osrfList *msg_list = NULL; + osrfMessage *one_msg = NULL; + char* recipToFree = NULL; + int i; + + // At this point we know the osrf socket has data, read + // up to one message with no waiting. + transport_message* tmsg = client_recv(osrf_handle, 0); + + if (!tmsg) { + // tmsg can only be NULL if the underlying select() call is + // interrupted or the jabber socket connection was severed. + + if (client_connected(osrf_handle) && + socket_connected(osrf_handle->session->sock_id)) { + return; // back to main select loop + } + + // Socket connection was broken. Send disconnect to client, + // causing on_disconnect_handler to run and cleanup. + osrfLogWarning(OSRF_LOG_MARK, + "WS: Jabber socket disconnected, exiting"); + + exit(1); + } + + osrfLogDebug(OSRF_LOG_MARK, + "WS received opensrf response for thread=%s", tmsg->thread); + + // first we need to perform some maintenance + msg_list = osrfMessageDeserialize(tmsg->body, NULL); + + for (i = 0; i < msg_list->size; i++) { + one_msg = OSRF_LIST_GET_INDEX(msg_list, i); + + osrfLogDebug(OSRF_LOG_MARK, + "WS returned response of type %d", one_msg->m_type); + + /* if our client just successfully connected to an opensrf service, + cache the sender so that future calls on this thread will use + the correct recipient. */ + if (one_msg && one_msg->m_type == STATUS) { + + if (one_msg->status_code == OSRF_STATUS_OK) { + + if (!osrfHashGet(stateful_session_cache, tmsg->thread)) { + + unsigned long ses_size = + osrfHashGetCount(stateful_session_cache); + + if (ses_size < MAX_ACTIVE_STATEFUL_SESSIONS) { + + osrfLogDebug(OSRF_LOG_MARK, "WS caching sender " + "thread=%s, sender=%s; concurrent=%d", + tmsg->thread, tmsg->sender, ses_size); + + char* sender = strdup(tmsg->sender); // free in *Remove + osrfHashSet(stateful_session_cache, sender, tmsg->thread); + + } else { + osrfLogWarning(OSRF_LOG_MARK, + "WS max concurrent sessions (%d) reached. " + "Current session will not be tracked", + MAX_ACTIVE_STATEFUL_SESSIONS + ); + } + } + + } else { + + // connection timed out; clear the cached recipient + if (one_msg->status_code == OSRF_STATUS_TIMEOUT) { + recipToFree = + (char*) osrfHashRemove(stateful_session_cache, tmsg->thread); + if (recipToFree != NULL) { + free(recipToFree); + } + + } else { + if (one_msg->status_code == OSRF_STATUS_COMPLETE) + requests_in_flight--; + } + } + } + } + + // osrfMessageDeserialize applies the freeItem handler to the + // newly created osrfList. We only need to free the list and + // the individual osrfMessage's will be freed along with it + osrfListFree(msg_list); + + // relay the response messages to the client + jsonObject *msg_wrapper = NULL; + char *msg_string = NULL; + + // build the wrapper object + msg_wrapper = jsonNewObject(NULL); + jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread)); + jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid)); + jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body)); + + if (tmsg->is_error) { + osrfLogError(OSRF_LOG_MARK, + "WS received jabber error message in response to thread=%s", + tmsg->thread); + jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1)); + } + + msg_string = jsonObjectToJSONRaw(msg_wrapper); + + // Send the JSON to STDOUT + printf("%s\n", msg_string); + + free(msg_string); + jsonObjectFree(msg_wrapper); } + + + -- 2.11.0