Websocket stdio / websocketd experiment
authorBill Erickson <berickxx@gmail.com>
Mon, 11 Jun 2018 14:51:53 +0000 (10:51 -0400)
committerBill Erickson <berickxx@gmail.com>
Mon, 11 Jun 2018 14:51:53 +0000 (10:51 -0400)
Signed-off-by: Bill Erickson <berickxx@gmail.com>
src/websocket-stdio/osrf-websocket-stdio.c

index f32e28c..68cde32 100644 (file)
 // practice, this number will be very small, rarely reaching double
 // digits.  This is just a security back-stop.  A client trying to open
 // this many connections is almost certainly attempting to DOS the
-// gateway / server.  We may want to lower this further.
+// gateway / server.  
 #define MAX_ACTIVE_STATEFUL_SESSIONS 64
 
+// Message exceeding this size are discarded.
+// This value must be greater than RESET_MESSAGE_SIZE (below)
+// ~10M
+#define MAX_MESSAGE_SIZE 10485760
+
+// After processing any message this size or larger, free and 
+// recreate the stdin buffer to release the memory.
+// ~100k
+#define RESET_MESSAGE_SIZE 102400
+
 // default values, replaced during setup (below) as needed.
 static char* config_file = "/openils/conf/opensrf_core.xml"; // TODO
 static char* config_ctxt = "gateway";
@@ -61,6 +71,7 @@ static transport_client* osrf_handle = NULL;
 static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
 static char* client_ip = NULL;
 
+static void rebuild_stdin_buffer();
 static void child_init();
 static void read_from_stdin();
 static void relay_stdin_message(const char*);
@@ -81,8 +92,7 @@ int main() {
     // Handle shutdown signal
     signal(SIGINT, sigint_handler);
 
-    // stdin buffer is cleared with each read and reused.
-    stdin_buf = buffer_init(512);
+    rebuild_stdin_buffer();
 
     // Connect to OpenSRF
     // exits on error.
@@ -141,6 +151,15 @@ int main() {
     return shut_it_down(0);
 }
 
+static void rebuild_stdin_buffer() {
+
+    if (stdin_buf != NULL) {
+        buffer_free(stdin_buf);
+    }
+
+    stdin_buf = buffer_init(1024);
+}
+
 static int shut_it_down(int stat) {
     osrfHashFree(stateful_session_cache);
     buffer_free(stdin_buf);
@@ -216,14 +235,38 @@ static void read_from_stdin() {
 
         if (c == '\n') { // end of current message
 
+            if (stdin_buf->n_used >= MAX_MESSAGE_SIZE) {
+                osrfLogError(OSRF_LOG_MARK, 
+                    "WS message exceeded MAX_MESSAGE_SIZE, discarding");
+                rebuild_stdin_buffer();
+                return;
+            }
+
             if (stdin_buf->n_used > 0) {
                 relay_stdin_message(stdin_buf->buf);
-                buffer_reset(stdin_buf);
+
+                if (stdin_buf->n_used >= RESET_MESSAGE_SIZE) {
+                    // Current message is large.  Rebuild the buffer
+                    // to free the excess memory.
+                    rebuild_stdin_buffer();
+
+                } else {
+
+                    // Reset the buffer and carry on.
+                    buffer_reset(stdin_buf);
+                }
             }
 
             return;
 
         } else {
+
+            if (stdin_buf->n_used >= MAX_MESSAGE_SIZE) {
+                // Message exceeds max message size.
+                // Continue reading and discarding data.
+                continue;
+            }
+
             // Add the char to our current message buffer
             buffer_add_char(stdin_buf, c);
         }
@@ -238,7 +281,6 @@ static void relay_stdin_message(const char* msg_string) {
     const char *service = NULL;
     const char *thread = NULL;
     const char *log_xid = NULL;
-    char *tmp_recip = NULL;
     char *msg_body = NULL;
     char *recipient = NULL;
 
@@ -313,17 +355,17 @@ static void relay_stdin_message(const char* msg_string) {
 
     // 'recipient' will be freed in extract_inbound_messages
     // during a DISCONNECT call.  Retain a local copy.
-    tmp_recip = strdup(recipient);
+    recipient = strdup(recipient);
 
-    msg_body = extract_inbound_messages(service, thread, recipient, osrf_msg);
+    msg_body = extract_inbound_messages(service, thread, osrf_msg);
 
     osrfLogInternal(OSRF_LOG_MARK,
         "WS relaying inbound message: %s", msg_body);
 
     transport_message *tmsg = message_init(
-        msg_body, NULL, thread, tmp_recip, NULL);
+        msg_body, NULL, thread, recipient, NULL);
 
-    free(tmp_recip);
+    free(recipient);
 
     message_set_osrf_xid(tmsg, osrfLogGetXid());
 
@@ -339,10 +381,7 @@ static void relay_stdin_message(const char* msg_string) {
 }
 
 static char* extract_inbound_messages(
-        const char* service, 
-        const char* thread, 
-        const char* recipient, 
-        const jsonObject *osrf_msg) {
+        const char* service, const char* thread, const jsonObject *osrf_msg) {
 
     int i;
     int num_msgs = osrf_msg->size;
@@ -366,6 +405,9 @@ static char* extract_inbound_messages(
 
         switch (msg->m_type) {
 
+            case CONNECT:
+                break;
+
             case REQUEST:
                 log_request(service, msg);
                 break;
@@ -375,8 +417,8 @@ static char* extract_inbound_messages(
                 break;
 
             default:
-                osrfLogError(OSRF_LOG_MARK, 
-                    "WS received unexpected message type");
+                osrfLogError(OSRF_LOG_MARK, "WS received unexpected message "
+                    "type from WebSocket client: %d", msg->m_type);
                 break;
         }
     }