-/*
-Copyright (C) 2018 King County Library Service
-Bill Erickson <berickxx@gmail.com>
-
-Code borrows heavily from osrf_websocket_translator.c
-
-This program is free software; you can redistribute it and/or
-modify it under the terms of the GNU General Public License
-as published by the Free Software Foundation; either version 2
-of the License, or (at your option) any later version.
-
-This program is distributed in the hope that it will be useful,
-but WITHOUT ANY WARRANTY; without even the implied warranty of
-MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-GNU General Public License for more details.
-*/
+/* --------------------------------------------------------------------
+ * Copyright (C) 2018 King County Library Service
+ * Bill Erickson <berickxx@gmail.com>
+ *
+ * Code borrows heavily from osrf_websocket_translator.c
+ *
+ * This program is free software; you can redistribute it and/or
+ * modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation; either version 2
+ * of the License, or (at your option) any later version.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+--------------------------------------------------------------------- */
+
+/**
+ * OpenSRF Websockets Relay
+ *
+ * Reads Websockets requests on STDIN
+ * Sends replies to requests on STDOUT
+ *
+ * Built to function with websocketd:
+ * https://github.com/joewalnes/websocketd
+ */
#include <stdio.h>
#include <unistd.h>
#include <signal.h>
#include <opensrf/utils.h>
#include <opensrf/osrf_hash.h>
-#include <opensrf/socket_bundle.h>
#include <opensrf/transport_client.h>
#include <opensrf/osrf_message.h>
#include <opensrf/osrf_app_session.h>
-#include <opensrf/utils.h>
#include <opensrf/log.h>
#define MAX_THREAD_SIZE 64
// digits. This is just a security back-stop. A client trying to open
// this many connections is almost certainly attempting to DOS the
// gateway / server. We may want to lower this further.
-#define MAX_ACTIVE_STATEFUL_SESSIONS 128
+#define MAX_ACTIVE_STATEFUL_SESSIONS 64
// default values, replaced during setup (below) as needed.
static char* config_file = "/openils/conf/opensrf_core.xml"; // TODO
static char* config_ctxt = "gateway";
static char* osrf_router = NULL;
static char* osrf_domain = NULL;
+
+// Cache of opensrf thread strings and back-end receipients.
+// Tracking this here means the caller only needs to track/know the thread.
static osrfHash* stateful_session_cache = NULL;
-static char* client_ip = NULL;
static growing_buffer* stdin_buf = NULL;
static transport_client* osrf_handle = NULL;
static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
+static char* client_ip = NULL;
static void child_init();
static void read_from_stdin();
static void relay_stdin_message(const char*);
static char* extract_inbound_messages();
+static void log_request(const char*, osrfMessage*);
static void read_from_osrf();
static void read_one_osrf_message(transport_message*);
static int shut_it_down(int);
+static void release_hash_string(char*, void*);
static void sigint_handler(int sig) {
osrfLogInfo(OSRF_LOG_MARK, "WS received SIGINT - graceful shutdown");
osrf_router = osrfConfigGetValue(NULL, "/router_name");
osrf_domain = osrfConfigGetValue(NULL, "/domain");
+
stateful_session_cache = osrfNewHash();
+ osrfHashSetCallback(stateful_session_cache, release_hash_string);
client_ip = getenv("REMOTE_ADDR");
osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip);
}
+// Called by osrfHash when a string is removed. We strdup each
+// string before it goes into the hash.
+static void release_hash_string(char* key, void* str) {
+ if (str == NULL) return;
+ free((char*) str);
+}
+
// Relay messages from STDIN to OpenSRF
-// Reads one message then returns to allow for response to intermingle
+// Reads one message then returns to allow for responses to intermingle
// with a long series of requests.
static void read_from_stdin() {
char char_buf[1];
char c;
+ // Read one char at a time so we can stop at the first newline
+ // and leave any other data on the wire until read_from_stdin()
+ // is called again.
+
while (1) {
int stat = read(fileno(stdin), char_buf, 1);
return;
} else {
-
+ // Add the char to our current message buffer
buffer_add_char(stdin_buf, c);
}
}
int num_msgs = osrf_msg->size;
osrfMessage* msg;
osrfMessage* msg_list[num_msgs];
- char* recipToFree = NULL;
// here we do an extra json round-trip to get the data
// in a form osrf_message_deserialize can understand
// should we require the caller to always pass the service?
if (service == NULL) service = "";
- for(i = 0; i < num_msgs; i++) {
+ for (i = 0; i < num_msgs; i++) {
msg = msg_list[i];
osrfMessageSetIngress(msg, WEBSOCKET_INGRESS);
switch (msg->m_type) {
- case REQUEST: {
- const jsonObject* params = msg->_params;
- growing_buffer* act = buffer_init(128);
- char* method = msg->method_name;
- buffer_fadd(act, "[%s] [%s] %s %s",
- client_ip, "", service, method);
-
- const jsonObject* obj = NULL;
- int i = 0;
- const char* str;
- int redactParams = 0;
- while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
- if(!strncmp(method, str, strlen(str))) {
- redactParams = 1;
- break;
- }
- }
- if(redactParams) {
- OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
- } else {
- i = 0;
- while((obj = jsonObjectGetIndex(params, i++))) {
- char* str = jsonObjectToJSON(obj);
- if( i == 1 )
- OSRF_BUFFER_ADD(act, " ");
- else
- OSRF_BUFFER_ADD(act, ", ");
- OSRF_BUFFER_ADD(act, str);
- free(str);
- }
- }
- osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
- buffer_free(act);
+ case REQUEST:
+ log_request(service, msg);
break;
- }
case DISCONNECT:
- recipToFree = (char*) osrfHashRemove(stateful_session_cache, thread);
- if (recipToFree != NULL) {
- free(recipToFree);
- }
-
+ osrfHashRemove(stateful_session_cache, thread);
break;
default:
char* finalMsg = osrfMessageSerializeBatch(msg_list, num_msgs);
// clean up our messages
- for(i = 0; i < num_msgs; i++)
+ for (i = 0; i < num_msgs; i++)
osrfMessageFree(msg_list[i]);
return finalMsg;
}
+static void log_request(const char* service, osrfMessage* msg) {
+
+ const jsonObject* params = msg->_params;
+ growing_buffer* act = buffer_init(128);
+ char* method = msg->method_name;
+ const jsonObject* obj = NULL;
+ int i = 0;
+ const char* str;
+ int redactParams = 0;
+
+ buffer_fadd(act, "[%s] [%s] %s %s", client_ip, "", service, method);
+
+ while ( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
+ if (!strncmp(method, str, strlen(str))) {
+ redactParams = 1;
+ break;
+ }
+ }
+
+ if (redactParams) {
+ OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
+ } else {
+ i = 0;
+ while ((obj = jsonObjectGetIndex(params, i++))) {
+ char* str = jsonObjectToJSON(obj);
+ if (i == 1)
+ OSRF_BUFFER_ADD(act, " ");
+ else
+ OSRF_BUFFER_ADD(act, ", ");
+ OSRF_BUFFER_ADD(act, str);
+ free(str);
+ }
+ }
+
+ osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
+ buffer_free(act);
+}
+
// Relay messages from OpenSRF to STDIN
static void read_one_osrf_message(transport_message* tmsg) {
osrfList *msg_list = NULL;
osrfMessage *one_msg = NULL;
- char* recipToFree = NULL;
int i;
osrfLogDebug(OSRF_LOG_MARK,
// connection timed out; clear the cached recipient
if (one_msg->status_code == OSRF_STATUS_TIMEOUT) {
- recipToFree =
- (char*) osrfHashRemove(stateful_session_cache, tmsg->thread);
- if (recipToFree != NULL) {
- free(recipToFree);
- }
+ osrfHashRemove(stateful_session_cache, tmsg->thread);
}
}
}