From: Bill Erickson Date: Mon, 11 Jun 2018 14:51:53 +0000 (-0400) Subject: Websocket stdio / websocketd experiment X-Git-Url: https://old-git.evergreen-ils.org/?a=commitdiff_plain;h=2ac62b2c5cfc5415e7614e38ad79670bdcb77497;p=working%2FOpenSRF.git Websocket stdio / websocketd experiment Signed-off-by: Bill Erickson --- diff --git a/src/websocket-stdio/osrf-websocket-stdio.c b/src/websocket-stdio/osrf-websocket-stdio.c index f32e28c..68cde32 100644 --- a/src/websocket-stdio/osrf-websocket-stdio.c +++ b/src/websocket-stdio/osrf-websocket-stdio.c @@ -44,9 +44,19 @@ // practice, this number will be very small, rarely reaching double // digits. This is just a security back-stop. A client trying to open // this many connections is almost certainly attempting to DOS the -// gateway / server. We may want to lower this further. +// gateway / server. #define MAX_ACTIVE_STATEFUL_SESSIONS 64 +// Message exceeding this size are discarded. +// This value must be greater than RESET_MESSAGE_SIZE (below) +// ~10M +#define MAX_MESSAGE_SIZE 10485760 + +// After processing any message this size or larger, free and +// recreate the stdin buffer to release the memory. +// ~100k +#define RESET_MESSAGE_SIZE 102400 + // default values, replaced during setup (below) as needed. static char* config_file = "/openils/conf/opensrf_core.xml"; // TODO static char* config_ctxt = "gateway"; @@ -61,6 +71,7 @@ static transport_client* osrf_handle = NULL; static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer static char* client_ip = NULL; +static void rebuild_stdin_buffer(); static void child_init(); static void read_from_stdin(); static void relay_stdin_message(const char*); @@ -81,8 +92,7 @@ int main() { // Handle shutdown signal signal(SIGINT, sigint_handler); - // stdin buffer is cleared with each read and reused. - stdin_buf = buffer_init(512); + rebuild_stdin_buffer(); // Connect to OpenSRF // exits on error. @@ -141,6 +151,15 @@ int main() { return shut_it_down(0); } +static void rebuild_stdin_buffer() { + + if (stdin_buf != NULL) { + buffer_free(stdin_buf); + } + + stdin_buf = buffer_init(1024); +} + static int shut_it_down(int stat) { osrfHashFree(stateful_session_cache); buffer_free(stdin_buf); @@ -216,14 +235,38 @@ static void read_from_stdin() { if (c == '\n') { // end of current message + if (stdin_buf->n_used >= MAX_MESSAGE_SIZE) { + osrfLogError(OSRF_LOG_MARK, + "WS message exceeded MAX_MESSAGE_SIZE, discarding"); + rebuild_stdin_buffer(); + return; + } + if (stdin_buf->n_used > 0) { relay_stdin_message(stdin_buf->buf); - buffer_reset(stdin_buf); + + if (stdin_buf->n_used >= RESET_MESSAGE_SIZE) { + // Current message is large. Rebuild the buffer + // to free the excess memory. + rebuild_stdin_buffer(); + + } else { + + // Reset the buffer and carry on. + buffer_reset(stdin_buf); + } } return; } else { + + if (stdin_buf->n_used >= MAX_MESSAGE_SIZE) { + // Message exceeds max message size. + // Continue reading and discarding data. + continue; + } + // Add the char to our current message buffer buffer_add_char(stdin_buf, c); } @@ -238,7 +281,6 @@ static void relay_stdin_message(const char* msg_string) { const char *service = NULL; const char *thread = NULL; const char *log_xid = NULL; - char *tmp_recip = NULL; char *msg_body = NULL; char *recipient = NULL; @@ -313,17 +355,17 @@ static void relay_stdin_message(const char* msg_string) { // 'recipient' will be freed in extract_inbound_messages // during a DISCONNECT call. Retain a local copy. - tmp_recip = strdup(recipient); + recipient = strdup(recipient); - msg_body = extract_inbound_messages(service, thread, recipient, osrf_msg); + msg_body = extract_inbound_messages(service, thread, osrf_msg); osrfLogInternal(OSRF_LOG_MARK, "WS relaying inbound message: %s", msg_body); transport_message *tmsg = message_init( - msg_body, NULL, thread, tmp_recip, NULL); + msg_body, NULL, thread, recipient, NULL); - free(tmp_recip); + free(recipient); message_set_osrf_xid(tmsg, osrfLogGetXid()); @@ -339,10 +381,7 @@ static void relay_stdin_message(const char* msg_string) { } static char* extract_inbound_messages( - const char* service, - const char* thread, - const char* recipient, - const jsonObject *osrf_msg) { + const char* service, const char* thread, const jsonObject *osrf_msg) { int i; int num_msgs = osrf_msg->size; @@ -366,6 +405,9 @@ static char* extract_inbound_messages( switch (msg->m_type) { + case CONNECT: + break; + case REQUEST: log_request(service, msg); break; @@ -375,8 +417,8 @@ static char* extract_inbound_messages( break; default: - osrfLogError(OSRF_LOG_MARK, - "WS received unexpected message type"); + osrfLogError(OSRF_LOG_MARK, "WS received unexpected message " + "type from WebSocket client: %d", msg->m_type); break; } }