/* --------------------------------------------------------------------
* Copyright (C) 2018 King County Library Service
* Bill Erickson <berickxx@gmail.com>
- *
+ *
* Code borrows heavily from osrf_websocket_translator.c
- *
+ *
* This program is free software; you can redistribute it and/or
* modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation; either version 2
* of the License, or (at your option) any later version.
- *
+ *
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// practice, this number will be very small, rarely reaching double
// digits. This is just a security back-stop. A client trying to open
// this many connections is almost certainly attempting to DOS the
-// gateway / server.
+// gateway / server.
#define MAX_ACTIVE_STATEFUL_SESSIONS 64
// Message exceeding this size are discarded.
// ~10M
#define MAX_MESSAGE_SIZE 10485760
-// After processing any message this size or larger, free and
+// After processing any message this size or larger, free and
// recreate the stdin buffer to release the memory.
// ~100k
#define RESET_MESSAGE_SIZE 102400
// default values, replaced during setup (below) as needed.
-static char* config_file = "/openils/conf/opensrf_core.xml";
+static char* config_file = "/openils/conf/opensrf_core.xml";
static char* config_ctxt = "gateway";
static char* osrf_router = NULL;
static char* osrf_domain = NULL;
// Cache of opensrf thread strings and back-end receipients.
-// Tracking this here means the caller only needs to track/know the thread.
+// Tracking this here means the caller only needs to track the thread.
+// It also means we don't have to expose internal XMPP IDs
static osrfHash* stateful_session_cache = NULL;
+// Message on STDIN go into our reusable buffer
static growing_buffer* stdin_buf = NULL;
+// OpenSRF XMPP connection handle
static transport_client* osrf_handle = NULL;
-static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
+// Reusable string buf for recipient addresses
+static char recipient_buf[RECIP_BUF_SIZE];
+// Websocket client IP address (for logging)
static char* client_ip = NULL;
static void rebuild_stdin_buffer();
static int shut_it_down(int);
static void release_hash_string(char*, void*);
-static void sigint_handler(int sig) {
+// Websocketd sends SIGINT for shutdown, followed by SIGTERM
+// if SIGINT takes too long.
+static void sigint_handler(int sig) {
osrfLogInfo(OSRF_LOG_MARK, "WS received SIGINT - graceful shutdown");
shut_it_down(0);
}
int main(int argc, char* argv[]) {
- // Handle shutdown signal
+ // Handle shutdown signal -- only needed once.
signal(SIGINT, sigint_handler);
- rebuild_stdin_buffer();
-
- // Connect to OpenSRF
- // exits on error.
+ // Connect to OpenSR -- exits on error
child_init(argc, argv);
// Disable output buffering.
setbuf(stdout, NULL);
+ rebuild_stdin_buffer();
// The main loop waits for data to be available on both STDIN
- // (inbound websocket data) and the OpenSRF XMPP socket (outbound
- // replies).
-
+ // (websocket client request) and the OpenSRF XMPP socket
+ // (replies returning to the websocket client).
fd_set fds;
int stdin_no = fileno(stdin);
int osrf_no = osrf_handle->session->sock_id;
FD_SET(osrf_no, &fds);
FD_SET(stdin_no, &fds);
+ // Wait indefinitely for activity to process
sel_resp = select(maxfd + 1, &fds, NULL, NULL, NULL);
- if (sel_resp < 0) {
+ if (sel_resp < 0) { // error
if (errno == EINTR) {
// Interrupted by a signal. Start the loop over.
continue;
}
- if (errno == EBADF) {
- osrfLogError(OSRF_LOG_MARK,
- "WS exiting on bad file descriptor");
-
- } else {
- osrfLogError(OSRF_LOG_MARK,
- "WS select() failed with [%s]. Exiting", strerror(errno));
- }
+ osrfLogError(OSRF_LOG_MARK,
+ "WS select() failed with [%s]. Exiting", strerror(errno));
- // No way to fix this one, get outta here.
shut_it_down(1);
}
static int shut_it_down(int stat) {
osrfHashFree(stateful_session_cache);
buffer_free(stdin_buf);
- osrf_system_shutdown();
+ osrf_system_shutdown(); // clean XMPP disconnect
exit(stat);
return stat;
}
-// Connect to opensrf
+// Connect to OpenSRF/XMPP
+// Apply settings and command line args.
static void child_init(int argc, char* argv[]) {
if (argc > 1) {
}
-// Relay messages from STDIN to OpenSRF
-// Reads one message then returns to allow for responses to intermingle
-// with a long series of requests.
+// Relay websocket client messages from STDIN to OpenSRF. Reads one
+// message then returns, allowing responses to intermingle with long
+// series of requests.
static void read_from_stdin() {
char char_buf[1];
char c;
if (errno == EAGAIN) {
// read interrupted. Return to main loop to resume.
+ // Returning here will leave any in-progress message in
+ // the stdin_buf. We return to the main select loop
+ // to confirm we really have more data to read and to
+ // perform additional error checking on the stream.
return;
}
- osrfLogError(OSRF_LOG_MARK,
+ // All other errors reading STDIN are considered fatal.
+ osrfLogError(OSRF_LOG_MARK,
"WS STDIN read failed with [%s]. Exiting", strerror(errno));
shut_it_down(1);
return;
if (c == '\n') { // end of current message
if (stdin_buf->n_used >= MAX_MESSAGE_SIZE) {
- osrfLogError(OSRF_LOG_MARK,
+ osrfLogError(OSRF_LOG_MARK,
"WS message exceeded MAX_MESSAGE_SIZE, discarding");
rebuild_stdin_buffer();
return;
} else {
if (stdin_buf->n_used >= MAX_MESSAGE_SIZE) {
- // Message exceeds max message size.
- // Continue reading and discarding data.
+ // Message exceeds max message size. Continue reading
+ // and discarding data. NOTE: don't reset stdin_buf
+ // here becase we check n_used again once reading is done.
continue;
}
}
}
+// Relays a single websocket request to the OpenSRF/XMPP network.
static void relay_stdin_message(const char* msg_string) {
jsonObject *msg_wrapper = NULL; // free me
free(msg_body);
}
+// Turn the OpenSRF message JSON into a set of osrfMessage's for
+// analysis, ingress application, and logging.
static char* extract_inbound_messages(
const char* service, const char* thread, const jsonObject *osrf_msg) {
// here we do an extra json round-trip to get the data
// in a form osrf_message_deserialize can understand
- // TODO: consider a version of osrf_message_init which can
+ // TODO: consider a version of osrf_message_init which can
// accept a jsonObject* instead of a JSON string.
char *osrf_msg_json = jsonObjectToJSON(osrf_msg);
osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs);
char* finalMsg = osrfMessageSerializeBatch(msg_list, num_msgs);
// clean up our messages
- for (i = 0; i < num_msgs; i++)
+ for (i = 0; i < num_msgs; i++)
osrfMessageFree(msg_list[i]);
return finalMsg;
}
+// All REQUESTs are logged as activity.
static void log_request(const char* service, osrfMessage* msg) {
const jsonObject* params = msg->_params;
-// Relay messages from OpenSRF to STDIN
+// Relay response messages from OpenSRF to STDIN
// Relays all available messages
static void read_from_osrf() {
transport_message* tmsg = NULL;
// Double check the socket connection before continuing.
if (!client_connected(osrf_handle) ||
!socket_connected(osrf_handle->session->sock_id)) {
- osrfLogWarning(OSRF_LOG_MARK,
+ osrfLogWarning(OSRF_LOG_MARK,
"WS: Jabber socket disconnected, exiting");
shut_it_down(1);
}
- // Once client_recv is called all data waiting on the socket is read.
- // This means we can't return to the main select() loop after each message,
- // because any subsequent messages will get stuck in the opensrf receive
- // queue. Process all available messages.
+ // Once client_recv is called all data waiting on the socket is
+ // read. This means we can't return to the main select() loop after
+ // each message, because any subsequent messages will get stuck in
+ // the opensrf receive queue. Process all available messages.
while ( (tmsg = client_recv(osrf_handle, 0)) ) {
if (tmsg->is_error) {
- // tmsg here is bounced message, likely the result of a XMPP
- // cancel code 503 (forbidden) from an attempt to send a
- // message to a service that's not available on the public
- // XMPP domain. Recovery not possible.
- osrfLogError(OSRF_LOG_MARK,
+ // tmsg here is likely a bounced message, possibly the result
+ // of a XMPP cancel code 503 (forbidden) from an attempt to
+ // send a message to a service that's not available on the
+ // public XMPP domain. Treat as irrecoverable.
+ osrfLogError(OSRF_LOG_MARK,
"WS XMPP error [%d] occured, exiting", tmsg->error_code);
shut_it_down(1);
return;
}
}
+// Process a single OpenSRF response message and print the reponse
+// to STDOUT for delivery to the websocket client.
static void read_one_osrf_message(transport_message* tmsg) {
osrfList *msg_list = NULL;
osrfMessage *one_msg = NULL;
if (!osrfHashGet(stateful_session_cache, tmsg->thread)) {
- unsigned long ses_size =
+ unsigned long ses_size =
osrfHashGetCount(stateful_session_cache);
if (ses_size < MAX_ACTIVE_STATEFUL_SESSIONS) {
// the individual osrfMessage's will be freed along with it
osrfListFree(msg_list);
- // relay the response messages to the client
+ // Pack the response into a websocket wrapper message.
jsonObject *msg_wrapper = NULL;
char *msg_string = NULL;
-
- // build the wrapper object
msg_wrapper = jsonNewObject(NULL);
+
jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
- if (tmsg->is_error) {
- osrfLogError(OSRF_LOG_MARK,
- "WS received jabber error message in response to thread=%s",
- tmsg->thread);
- jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
- }
-
msg_string = jsonObjectToJSONRaw(msg_wrapper);
// Send the JSON to STDOUT
free(msg_string);
jsonObjectFree(msg_wrapper);
-
}
-
-
-
-