+/*
+Copyright (C) 2018 King County Library Service
+Bill Erickson <berickxx@gmail.com>
+
+Code borrows heavily from osrf_websocket_translator.c
+
+This program is free software; you can redistribute it and/or
+modify it under the terms of the GNU General Public License
+as published by the Free Software Foundation; either version 2
+of the License, or (at your option) any later version.
+
+This program is distributed in the hope that it will be useful,
+but WITHOUT ANY WARRANTY; without even the implied warranty of
+MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+GNU General Public License for more details.
+*/
+
#include <stdio.h>
#include <unistd.h>
#include <string.h>
#include <opensrf/utils.h>
+#include <opensrf/osrf_hash.h>
#include <opensrf/socket_bundle.h>
#include <opensrf/transport_client.h>
#include <opensrf/osrf_message.h>
#include <opensrf/utils.h>
#include <opensrf/log.h>
+#define MAX_THREAD_SIZE 64
+#define RECIP_BUF_SIZE 256
+#define WEBSOCKET_INGRESS "ws-translator-v2"
+
+// maximun number of active, CONNECTed opensrf sessions allowed. in
+// practice, this number will be very small, rarely reaching double
+// digits. This is just a security back-stop. A client trying to open
+// this many connections is almost certainly attempting to DOS the
+// gateway / server. We may want to lower this further.
+#define MAX_ACTIVE_STATEFUL_SESSIONS 128
+
+// default values, replaced during setup (below) as needed.
+static char* config_file = "/openils/conf/opensrf_core.xml"; // TODO
+static char* config_ctxt = "gateway";
+static char* osrf_router = NULL;
+static char* osrf_domain = NULL;
+static osrfHash* stateful_session_cache = NULL;
+static char* client_ip;
+
+static time_t idle_timeout_interval = 120;
+static time_t idle_check_interval = 5;
+static time_t last_activity_time = 0;
+
+// Generally, we do not disconnect the client (as idle) if there is a
+// request in flight. However, we need to have an upper bound on the
+// amount of time we will wait for in-flight requests to complete to
+// avoid leaving an effectively idle connection open after a request
+// died on the backend and no response was received.
+// Note that if other activity occurs while a long-running request
+// is active, the wait time will get reset with each new activity.
+// This is OK, though, because the goal of max_request_wait_time
+// is not to chop requests off at the knees, it's to allow the client
+// to timeout as idle when only a single long-running request is active
+// and preventing timeout.
+static time_t max_request_wait_time = 600;
+
+// Incremented with every REQUEST, decremented with every COMPLETE.
+// Gives us a rough picture of the number of reqests we've sent to
+// the server vs. the number for which a completed response has been
+// received.
+static int requests_in_flight = 0;
+
+static growing_buffer* stdin_buf = NULL;
+static transport_client* osrf_handle = NULL;
+static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
+
static void child_init();
static void read_from_stdin();
+static void relay_stdin_message(const char*);
+static char* extract_inbound_messages();
static void read_from_osrf();
-static growing_buffer* stdin_buf = NULL;
-static transport_client* osrf_handle = NULL;
-char* osrf_config = "/openils/conf/opensrf_core.xml"; // TODO
int main() {
- // Reusable stdin reading buffer.
+
+ // stdin buffer is cleared with each read and reused.
stdin_buf = buffer_init(512);
- child_init(); // exits on error
+ // Connect to OpenSRF
+ // exits on error.
+ child_init();
// Disable output buffering.
setbuf(stdout, NULL);
+ // The main loop waits for data to be available on both STDIN
+ // (inbound websocket data) and the OpenSRF XMPP socket (outbound
+ // replies).
+
fd_set fds;
int stdin_no = fileno(stdin);
int osrf_no = osrf_handle->session->sock_id;
int maxfd = osrf_no > stdin_no ? osrf_no : stdin_no;
+ int sel_resp;
while (1) {
FD_ZERO(&fds);
- FD_SET(osrf_no, &fds);
- FD_SET(stdin_no, &fds);
+ FD_SET(osrf_no, &fds);
+ FD_SET(stdin_no, &fds);
+
+ sel_resp = select(maxfd + 1, &fds, NULL, NULL, NULL);
- select(maxfd + 1, &fds, NULL, NULL, NULL);
+ if (sel_resp < 0) {
+
+ if (errno == EINTR) {
+ // Interrupted by a signal. Start the loop over.
+ continue;
+ }
+
+ if (errno == EBADF) {
+ osrfLogError(OSRF_LOG_MARK,
+ "WS exiting on bad file descriptor");
+
+ } else {
+ osrfLogError(OSRF_LOG_MARK,
+ "WS exiting on irrecoverable select() error");
+ }
+
+ // No way to fix this one, exit program.
+ exit(1);
+ }
if (FD_ISSET(stdin_no, &fds)) {
read_from_stdin();
}
}
+ osrfHashFree(stateful_session_cache);
buffer_free(stdin_buf);
+ osrf_system_shutdown();
return 0;
}
// Connect to opensrf
static void child_init() {
- if (!osrf_system_bootstrap_client(osrf_config, "gateway") ) {
+ if (!osrf_system_bootstrap_client(config_file, config_ctxt) ) {
fprintf(stderr, "Cannot boostrap OSRF\n");
exit(1);
}
osrf_handle = osrfSystemGetTransportClient();
- osrfAppSessionSetIngress("osrf-websocket-stdio");
+ osrfAppSessionSetIngress(WEBSOCKET_INGRESS);
+
+ osrf_router = osrfConfigGetValue(NULL, "/router_name");
+ osrf_domain = osrfConfigGetValue(NULL, "/domain");
+ stateful_session_cache = osrfNewHash();
+
+ client_ip = getenv("REMOTE_ADDR");
+ osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip);
+
+ // TODO read timeout ENV vars
}
+
// Relay messages from STDIN to OpenSRF
-// Reads one message then exits
+// Reads one message then returns
static void read_from_stdin() {
char c;
while ( (c = getchar()) ) {
+ if (c == EOF) {
+ osrfLogInfo(OSRF_LOG_MARK, "WS exiting on disconnect");
+ exit(0);
+ }
+
if (c == '\n') { // end of current message
if (stdin_buf->n_used > 0) {
- printf("Echoing: %s\n", stdin_buf->buf);
+ relay_stdin_message(stdin_buf->buf);
buffer_reset(stdin_buf);
}
}
}
+static void relay_stdin_message(const char* msg_string) {
+
+ jsonObject *msg_wrapper = NULL; // free me
+ const jsonObject *tmp_obj = NULL;
+ const jsonObject *osrf_msg = NULL;
+ const char *service = NULL;
+ const char *thread = NULL;
+ const char *log_xid = NULL;
+ char *msg_body = NULL;
+ char *recipient = NULL;
+
+ // generate a new log trace for this request. it
+ // may be replaced by a client-provided trace below.
+ osrfLogMkXid();
+
+ osrfLogInternal(OSRF_LOG_MARK, "WS received inbound message: %s", msg_string);
+
+ msg_wrapper = jsonParse(msg_string);
+
+ if (msg_wrapper == NULL) {
+ osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", msg_string);
+ return;
+ }
+
+ osrf_msg = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg");
+
+ if ( (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "service")) )
+ service = jsonObjectGetString(tmp_obj);
+
+ if ( (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "thread")) )
+ thread = jsonObjectGetString(tmp_obj);
+
+ if ( (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "log_xid")) )
+ log_xid = jsonObjectGetString(tmp_obj);
+
+ if (log_xid) {
+
+ // use the caller-provide log trace id
+ if (strlen(log_xid) > MAX_THREAD_SIZE) {
+ osrfLogWarning(OSRF_LOG_MARK, "WS log_xid exceeds max length");
+ return;
+ }
+
+ osrfLogForceXid(log_xid);
+ }
+
+ if (thread) {
+
+ if (strlen(thread) > MAX_THREAD_SIZE) {
+ osrfLogWarning(OSRF_LOG_MARK, "WS thread exceeds max length");
+ return;
+ }
+
+ // since clients can provide their own threads at session start time,
+ // the presence of a thread does not guarantee a cached recipient
+ recipient = (char*) osrfHashGet(stateful_session_cache, thread);
+
+ if (recipient) {
+ osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient);
+ }
+ }
+
+ if (!recipient) {
+
+ if (service) {
+ int size = snprintf(recipient_buf, RECIP_BUF_SIZE - 1,
+ "%s@%s/%s", osrf_router, osrf_domain, service);
+ recipient_buf[size] = '\0';
+ recipient = recipient_buf;
+
+ } else {
+ osrfLogWarning(OSRF_LOG_MARK, "WS Unable to determine recipient");
+ return;
+ }
+ }
+
+ osrfLogDebug(OSRF_LOG_MARK,
+ "WS relaying message to opensrf thread=%s, recipient=%s",
+ thread, recipient);
+
+ msg_body = extract_inbound_messages(
+ service, thread, recipient, osrf_msg);
+
+ osrfLogInternal(OSRF_LOG_MARK,
+ "WS relaying inbound message: %s", msg_body);
+
+ transport_message *tmsg = message_init(
+ msg_body, NULL, thread, recipient, NULL);
+
+ message_set_osrf_xid(tmsg, osrfLogGetXid());
+
+ if (client_send_message(osrf_handle, tmsg) != 0) {
+ osrfLogError(OSRF_LOG_MARK, "WS failed sending data to OpenSRF, exiting");
+ exit(1);
+ }
+
+ osrfLogClearXid();
+ message_free(tmsg);
+ jsonObjectFree(msg_wrapper);
+ free(msg_body);
+
+ last_activity_time = time(NULL);
+}
+
+static char* extract_inbound_messages(
+ const char* service,
+ const char* thread,
+ const char* recipient,
+ const jsonObject *osrf_msg) {
+
+ int i;
+ int num_msgs = osrf_msg->size;
+ osrfMessage* msg;
+ osrfMessage* msg_list[num_msgs];
+ char* recipToFree = NULL;
+
+ // here we do an extra json round-trip to get the data
+ // in a form osrf_message_deserialize can understand
+ // TODO: consider a version of osrf_message_init which can
+ // accept a jsonObject* instead of a JSON string.
+ char *osrf_msg_json = jsonObjectToJSON(osrf_msg);
+ osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs);
+ free(osrf_msg_json);
+
+ // should we require the caller to always pass the service?
+ if (service == NULL) service = "";
+
+ for(i = 0; i < num_msgs; i++) {
+ msg = msg_list[i];
+ osrfMessageSetIngress(msg, WEBSOCKET_INGRESS);
+
+ switch (msg->m_type) {
+
+ case REQUEST: {
+ const jsonObject* params = msg->_params;
+ growing_buffer* act = buffer_init(128);
+ char* method = msg->method_name;
+ buffer_fadd(act, "[%s] [%s] %s %s",
+ client_ip, "", service, method);
+
+ const jsonObject* obj = NULL;
+ int i = 0;
+ const char* str;
+ int redactParams = 0;
+ while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
+ if(!strncmp(method, str, strlen(str))) {
+ redactParams = 1;
+ break;
+ }
+ }
+ if(redactParams) {
+ OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
+ } else {
+ i = 0;
+ while((obj = jsonObjectGetIndex(params, i++))) {
+ char* str = jsonObjectToJSON(obj);
+ if( i == 1 )
+ OSRF_BUFFER_ADD(act, " ");
+ else
+ OSRF_BUFFER_ADD(act, ", ");
+ OSRF_BUFFER_ADD(act, str);
+ free(str);
+ }
+ }
+ osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
+ buffer_free(act);
+ requests_in_flight++;
+ break;
+ }
+
+ case DISCONNECT:
+ recipToFree = (char*) osrfHashRemove(stateful_session_cache, thread);
+ if (recipToFree != NULL) {
+ free(recipToFree);
+ }
+
+ break;
+
+ default:
+ osrfLogError(OSRF_LOG_MARK,
+ "WS received unexpected message type");
+ break;
+ }
+ }
+
+ char* finalMsg = osrfMessageSerializeBatch(msg_list, num_msgs);
+
+ // clean up our messages
+ for(i = 0; i < num_msgs; i++)
+ osrfMessageFree(msg_list[i]);
+
+ return finalMsg;
+}
+
+
// Relay messages from OpenSRF to STDIN
// Relays all available messages
static void read_from_osrf() {
+ osrfList *msg_list = NULL;
+ osrfMessage *one_msg = NULL;
+ char* recipToFree = NULL;
+ int i;
+
+ // At this point we know the osrf socket has data, read
+ // up to one message with no waiting.
+ transport_message* tmsg = client_recv(osrf_handle, 0);
+
+ if (!tmsg) {
+ // tmsg can only be NULL if the underlying select() call is
+ // interrupted or the jabber socket connection was severed.
+
+ if (client_connected(osrf_handle) &&
+ socket_connected(osrf_handle->session->sock_id)) {
+ return; // back to main select loop
+ }
+
+ // Socket connection was broken. Send disconnect to client,
+ // causing on_disconnect_handler to run and cleanup.
+ osrfLogWarning(OSRF_LOG_MARK,
+ "WS: Jabber socket disconnected, exiting");
+
+ exit(1);
+ }
+
+ osrfLogDebug(OSRF_LOG_MARK,
+ "WS received opensrf response for thread=%s", tmsg->thread);
+
+ // first we need to perform some maintenance
+ msg_list = osrfMessageDeserialize(tmsg->body, NULL);
+
+ for (i = 0; i < msg_list->size; i++) {
+ one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
+
+ osrfLogDebug(OSRF_LOG_MARK,
+ "WS returned response of type %d", one_msg->m_type);
+
+ /* if our client just successfully connected to an opensrf service,
+ cache the sender so that future calls on this thread will use
+ the correct recipient. */
+ if (one_msg && one_msg->m_type == STATUS) {
+
+ if (one_msg->status_code == OSRF_STATUS_OK) {
+
+ if (!osrfHashGet(stateful_session_cache, tmsg->thread)) {
+
+ unsigned long ses_size =
+ osrfHashGetCount(stateful_session_cache);
+
+ if (ses_size < MAX_ACTIVE_STATEFUL_SESSIONS) {
+
+ osrfLogDebug(OSRF_LOG_MARK, "WS caching sender "
+ "thread=%s, sender=%s; concurrent=%d",
+ tmsg->thread, tmsg->sender, ses_size);
+
+ char* sender = strdup(tmsg->sender); // free in *Remove
+ osrfHashSet(stateful_session_cache, sender, tmsg->thread);
+
+ } else {
+ osrfLogWarning(OSRF_LOG_MARK,
+ "WS max concurrent sessions (%d) reached. "
+ "Current session will not be tracked",
+ MAX_ACTIVE_STATEFUL_SESSIONS
+ );
+ }
+ }
+
+ } else {
+
+ // connection timed out; clear the cached recipient
+ if (one_msg->status_code == OSRF_STATUS_TIMEOUT) {
+ recipToFree =
+ (char*) osrfHashRemove(stateful_session_cache, tmsg->thread);
+ if (recipToFree != NULL) {
+ free(recipToFree);
+ }
+
+ } else {
+ if (one_msg->status_code == OSRF_STATUS_COMPLETE)
+ requests_in_flight--;
+ }
+ }
+ }
+ }
+
+ // osrfMessageDeserialize applies the freeItem handler to the
+ // newly created osrfList. We only need to free the list and
+ // the individual osrfMessage's will be freed along with it
+ osrfListFree(msg_list);
+
+ // relay the response messages to the client
+ jsonObject *msg_wrapper = NULL;
+ char *msg_string = NULL;
+
+ // build the wrapper object
+ msg_wrapper = jsonNewObject(NULL);
+ jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
+ jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
+ jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
+
+ if (tmsg->is_error) {
+ osrfLogError(OSRF_LOG_MARK,
+ "WS received jabber error message in response to thread=%s",
+ tmsg->thread);
+ jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
+ }
+
+ msg_string = jsonObjectToJSONRaw(msg_wrapper);
+
+ // Send the JSON to STDOUT
+ printf("%s\n", msg_string);
+
+ free(msg_string);
+ jsonObjectFree(msg_wrapper);
}
+
+
+