// practice, this number will be very small, rarely reaching double
// digits. This is just a security back-stop. A client trying to open
// this many connections is almost certainly attempting to DOS the
-// gateway / server. We may want to lower this further.
+// gateway / server.
#define MAX_ACTIVE_STATEFUL_SESSIONS 64
+// Message exceeding this size are discarded.
+// This value must be greater than RESET_MESSAGE_SIZE (below)
+// ~10M
+#define MAX_MESSAGE_SIZE 10485760
+
+// After processing any message this size or larger, free and
+// recreate the stdin buffer to release the memory.
+// ~100k
+#define RESET_MESSAGE_SIZE 102400
+
// default values, replaced during setup (below) as needed.
static char* config_file = "/openils/conf/opensrf_core.xml"; // TODO
static char* config_ctxt = "gateway";
static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
static char* client_ip = NULL;
+static void rebuild_stdin_buffer();
static void child_init();
static void read_from_stdin();
static void relay_stdin_message(const char*);
// Handle shutdown signal
signal(SIGINT, sigint_handler);
- // stdin buffer is cleared with each read and reused.
- stdin_buf = buffer_init(512);
+ rebuild_stdin_buffer();
// Connect to OpenSRF
// exits on error.
return shut_it_down(0);
}
+static void rebuild_stdin_buffer() {
+
+ if (stdin_buf != NULL) {
+ buffer_free(stdin_buf);
+ }
+
+ stdin_buf = buffer_init(1024);
+}
+
static int shut_it_down(int stat) {
osrfHashFree(stateful_session_cache);
buffer_free(stdin_buf);
if (c == '\n') { // end of current message
+ if (stdin_buf->n_used >= MAX_MESSAGE_SIZE) {
+ osrfLogError(OSRF_LOG_MARK,
+ "WS message exceeded MAX_MESSAGE_SIZE, discarding");
+ rebuild_stdin_buffer();
+ return;
+ }
+
if (stdin_buf->n_used > 0) {
relay_stdin_message(stdin_buf->buf);
- buffer_reset(stdin_buf);
+
+ if (stdin_buf->n_used >= RESET_MESSAGE_SIZE) {
+ // Current message is large. Rebuild the buffer
+ // to free the excess memory.
+ rebuild_stdin_buffer();
+
+ } else {
+
+ // Reset the buffer and carry on.
+ buffer_reset(stdin_buf);
+ }
}
return;
} else {
+
+ if (stdin_buf->n_used >= MAX_MESSAGE_SIZE) {
+ // Message exceeds max message size.
+ // Continue reading and discarding data.
+ continue;
+ }
+
// Add the char to our current message buffer
buffer_add_char(stdin_buf, c);
}
const char *service = NULL;
const char *thread = NULL;
const char *log_xid = NULL;
- char *tmp_recip = NULL;
char *msg_body = NULL;
char *recipient = NULL;
// 'recipient' will be freed in extract_inbound_messages
// during a DISCONNECT call. Retain a local copy.
- tmp_recip = strdup(recipient);
+ recipient = strdup(recipient);
- msg_body = extract_inbound_messages(service, thread, recipient, osrf_msg);
+ msg_body = extract_inbound_messages(service, thread, osrf_msg);
osrfLogInternal(OSRF_LOG_MARK,
"WS relaying inbound message: %s", msg_body);
transport_message *tmsg = message_init(
- msg_body, NULL, thread, tmp_recip, NULL);
+ msg_body, NULL, thread, recipient, NULL);
- free(tmp_recip);
+ free(recipient);
message_set_osrf_xid(tmsg, osrfLogGetXid());
}
static char* extract_inbound_messages(
- const char* service,
- const char* thread,
- const char* recipient,
- const jsonObject *osrf_msg) {
+ const char* service, const char* thread, const jsonObject *osrf_msg) {
int i;
int num_msgs = osrf_msg->size;
switch (msg->m_type) {
+ case CONNECT:
+ break;
+
case REQUEST:
log_request(service, msg);
break;
break;
default:
- osrfLogError(OSRF_LOG_MARK,
- "WS received unexpected message type");
+ osrfLogError(OSRF_LOG_MARK, "WS received unexpected message "
+ "type from WebSocket client: %d", msg->m_type);
break;
}
}