* "osrf_msg" : [<osrf_msg>, <osrf_msg>, ...] // required
* }
*
- * Each translator operates with two threads. One thread receives messages
+ * Each translator operates with three threads. One thread receives messages
* from the websocket client, translates, and relays them to the opensrf
* network. The second thread collects responses from the opensrf network and
- * relays them back to the websocket client.
+ * relays them back to the websocket client. The third thread inspects
+ * the idle timeout interval t see if it's time to drop the idle client.
*
- * After the initial setup, all thread actions occur within a thread mutex.
- * The desired affect is a non-threaded application that uses threads for
- * the sole purpose of having one thread listening for incoming data, while
- * a second thread listens for responses. When either thread awakens, it's
- * the only thread in town until it goes back to sleep (i.e. listening on
+ * After the initial setup, all thread actions occur within a thread
+ * mutex. The desired affect is a non-threaded application that uses
+ * threads for the sole purpose of having one thread listening for
+ * incoming data, while a second thread listens for responses, and a
+ * third checks the idle timeout. When any thread awakens, it's the
+ * only thread in town until it goes back to sleep (i.e. listening on
* its socket for data).
*
- * Note that with a "thread", which allows us to identify the opensrf session,
- * the caller does not need to provide a recipient address. The "service" is
- * only required to start a new opensrf session. After the sesession is
- * started, all future communication is based solely on the thread. However,
- * the "service" should be passed by the caller for all requests to ensure it
- * is properly logged in the activity log.
+ * Note that with the opensrf "thread", which allows us to identify the
+ * opensrf session, the caller does not need to provide a recipient
+ * address. The "service" is only required to start a new opensrf
+ * session. After the sesession is started, all future communication is
+ * based solely on the thread. However, the "service" should be passed
+ * by the caller for all requests to ensure it is properly logged in the
+ * activity log.
+ *
+ * Every inbound and outbound message updates the last_activity_time.
+ * A separate thread wakes periodically to see if the time since the
+ * last_activity_time exceeds the configured idle_timeout_interval. If
+ * so, a disconnect is sent to the client, completing the conversation.
+ *
+ * Configuration goes directly into the Apache envvars file.
+ * (e.g. /etc/apache2-websockets/envvars). As of today, it's not
+ * possible to leverage Apache configuration directives directly,
+ * since this is not an Apache module, but a shared library loaded
+ * by an apache module. This includes SetEnv / SetEnvIf.
+ *
+ * export OSRF_WEBSOCKET_IDLE_TIMEOUT=300
+ * export OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL=5
+ * export OSRF_WEBSOCKET_CONFIG_FILE=/openils/conf/opensrf_core.xml
+ * export OSRF_WEBSOCKET_CONFIG_CTXT=gateway
*/
/**
* down for graceful disconnects.
*/
+#include <stdlib.h>
+#include <signal.h>
+#include <unistd.h>
#include "httpd.h"
+#include "http_log.h"
#include "apr_strings.h"
#include "apr_thread_proc.h"
#include "apr_hash.h"
#define RECIP_BUF_SIZE 128
#define WEBSOCKET_TRANSLATOR_INGRESS "ws-translator-v1"
+
+// default values, replaced during setup (below) as needed.
+static char* config_file = "/openils/conf/opensrf_core.xml";
+static char* config_ctxt = "gateway";
+static time_t idle_timeout_interval = 300;
+static time_t idle_check_interval = 5;
+static time_t last_activity_time = 0;
+
+// true if we've received a signal to start graceful shutdown
+static int shutdown_requested = 0;
+static void sigusr1_handler(int sig);
+static void sigusr1_handler(int sig) {
+ shutdown_requested = 1;
+ signal(SIGUSR1, sigusr1_handler);
+ osrfLogInfo(OSRF_LOG_MARK, "WS received SIGUSR1 - Graceful Shutdown");
+}
+
typedef struct _osrfWebsocketTranslator {
/** Our handle for communicating with the caller */
apr_thread_t *responder_thread;
/**
+ * Thread responsible for checking inactivity timeout.
+ * If no activitity occurs within the configured interval,
+ * a disconnect is sent to the client and the connection
+ * is terminated.
+ */
+ apr_thread_t *idle_timeout_thread;
+
+ /**
* All message handling code is wrapped in a thread mutex such
* that all actions (after the initial setup) are serialized
* to minimize the possibility of multi-threading snafus.
}
}
-
void* osrf_responder_thread_main_body(transport_message *tmsg) {
osrfList *msg_list = NULL;
free(msg_string);
jsonObjectFree(msg_wrapper);
+}
+
+/**
+ * Sleep and regularly wake to see if the process has been idle for too
+ * long. If so, send a disconnect to the client.
+ *
+ */
+void* APR_THREAD_FUNC osrf_idle_timeout_thread_main(
+ apr_thread_t *thread, void *data) {
+
+ // sleep time defaults to the check interval, but may
+ // be shortened during shutdown.
+ int sleep_time = idle_check_interval;
+ int shutdown_loops = 0;
+
+ while (1) {
+
+ if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
+ osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
+ return NULL;
+ }
+
+ // note: receiving a signal (e.g. SIGUSR1) will not interrupt
+ // this sleep(), since it's running within its own thread.
+ // During graceful shtudown, we may wait up to
+ // idle_check_interval seconds before initiating shutdown.
+ sleep(sleep_time);
+
+ if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
+ osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
+ return NULL;
+ }
+
+ // no client is connected. reset sleep time go back to sleep.
+ if (!trans->client_connected) {
+ sleep_time = idle_check_interval;
+ continue;
+ }
+
+ // do we have any active conversations with the connected client?
+ int active_count = apr_hash_count(trans->session_cache);
+
+ if (active_count) {
+
+ if (shutdown_requested) {
+ // active conversations means we can't shut down.
+ // shorten the check interval to re-check more often.
+ shutdown_loops++;
+ osrfLogDebug(OSRF_LOG_MARK,
+ "WS: %d active conversation(s) found in shutdown after "
+ "%d attempts. Sleeping...", shutdown_loops, active_count
+ );
+
+ if (shutdown_loops > 30) {
+ // this is clearly a long-running conversation, let's
+ // check less frequently to avoid excessive logging.
+ sleep_time = 3;
+ } else {
+ sleep_time = 1;
+ }
+ }
+
+ // active conversations means keep going. There's no point in
+ // checking the idle time (below) if we're mid-conversation
+ continue;
+ }
+
+ // no active conversations
+
+ if (shutdown_requested) {
+ // there's no need to reset the shutdown vars (loops/requested)
+ // SIGUSR1 is Apaches reload signal, which means this process
+ // will be going away as soon as the client is disconnected.
+
+ osrfLogInfo(OSRF_LOG_MARK,
+ "WS: no active conversations remain in shutdown; "
+ "closing client connection");
+
+ } else {
+ // see how long we've been idle. If too long, kick the client
+
+ time_t now = time(NULL);
+ time_t difference = now - last_activity_time;
+
+ osrfLogDebug(OSRF_LOG_MARK,
+ "WS has been idle for %d seconds", difference);
+
+ if (difference < idle_timeout_interval) {
+ // Last activity occurred within the idle timeout interval.
+ continue;
+ }
+
+ // idle timeout exceeded
+ osrfLogDebug(OSRF_LOG_MARK,
+ "WS: idle timeout exceeded. now=%d / last=%d; "
+ "closing client connection", now, last_activity_time);
+ }
+
+
+ // send a disconnect to the client, which will come back around
+ // to cause our on_disconnect_handler to run.
+ osrfLogDebug(OSRF_LOG_MARK, "WS: sending close() to client");
+ trans->server->close(trans->server);
+
+ // client will be going away, reset sleep time
+ sleep_time = idle_check_interval;
+ }
+ // should never get here
+ return NULL;
}
/**
osrfLogForceXid(tmsg->osrf_xid);
osrf_responder_thread_main_body(tmsg);
message_free(tmsg);
+ last_activity_time = time(NULL);
}
return NULL;
apr_threadattr_t *thread_attr = NULL;
apr_thread_mutex_t *mutex = NULL;
request_rec *r = server->request(server);
-
- osrfLogDebug(OSRF_LOG_MARK, "WS child_init");
+
// osrf_handle will already be connected if this is not the first request
// served by this process.
if ( !(osrf_handle = osrfSystemGetTransportClient()) ) {
- char* config_file = "/openils/conf/opensrf_core.xml";
- char* config_ctx = "gateway"; //TODO config
- if (!osrfSystemBootstrapClientResc(config_file, config_ctx, "websocket")) {
+
+ // load config values from the env
+ char* timeout = getenv("OSRF_WEBSOCKET_IDLE_TIMEOUT");
+ if (timeout) {
+ if (!atoi(timeout)) {
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
+ "WS: invalid OSRF_WEBSOCKET_IDLE_TIMEOUT: %s", timeout);
+ } else {
+ idle_timeout_interval = (time_t) atoi(timeout);
+ }
+ }
+
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
+ "WS: timeout set to %d", idle_timeout_interval);
+
+ char* interval = getenv("OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL");
+ if (interval) {
+ if (!atoi(interval)) {
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
+ "WS: invalid OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL: %s",
+ interval
+ );
+ } else {
+ idle_check_interval = (time_t) atoi(interval);
+ }
+ }
+
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
+ "WS: idle check interval set to %d", idle_check_interval);
+
+
+ char* cfile = getenv("OSRF_WEBSOCKET_CONFIG_FILE");
+ if (cfile) {
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
+ "WS: config file set to %s", cfile);
+ config_file = cfile;
+ }
+
+ char* ctxt = getenv("OSRF_WEBSOCKET_CONFIG_CTXT");
+ if (ctxt) {
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
+ "WS: config context set to %s", ctxt);
+ config_ctxt = ctxt;
+ }
+
+ // connect to opensrf
+ if (!osrfSystemBootstrapClientResc(
+ config_file, config_ctxt, "websocket")) {
+
osrfLogError(OSRF_LOG_MARK,
- "WS unable to bootstrap OpenSRF client with config %s", config_file);
+ "WS unable to bootstrap OpenSRF client with config %s "
+ "and context %s", config_file, config_ctxt
+ );
return 1;
}
return 1;
}
-
// allocate our static translator instance
trans = (osrfWebsocketTranslator*)
apr_palloc(pool, sizeof(osrfWebsocketTranslator));
return 1;
}
+ // Create the idle timeout thread, which lives for the lifetime
+ // of the process.
+ thread = NULL; // reset
+ thread_attr = NULL; // reset
+ if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
+ (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
+ (apr_thread_create(&thread, thread_attr,
+ osrf_idle_timeout_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
+
+ osrfLogDebug(OSRF_LOG_MARK, "WS created idle timeout thread");
+ trans->idle_timeout_thread = thread;
+
+ } else {
+ osrfLogError(OSRF_LOG_MARK, "WS unable to create idle timeout thread");
+ return 1;
+ }
+
+
if (apr_thread_mutex_create(
&mutex, APR_THREAD_MUTEX_UNNESTED,
trans->main_pool) != APR_SUCCESS) {
trans->mutex = mutex;
+ signal(SIGUSR1, sigusr1_handler);
+
return APR_SUCCESS;
}
void* CALLBACK on_connect_handler(const WebSocketServer *server) {
request_rec *r = server->request(server);
apr_pool_t *pool;
+ apr_thread_t *thread = NULL;
+ apr_threadattr_t *thread_attr = NULL;
- osrfLogInfo(OSRF_LOG_MARK,
- "WS connect from %s", r->connection->remote_ip);
- //"WS connect from %s", r->connection->client_ip); // apache 2.4
+#ifdef APACHE_MIN_24
+ char* client_ip = r->connection->client_ip;
+#else
+ char* client_ip = r->connection->remote_ip;
+#endif
+
+ osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip);
if (!trans) {
// first connection
trans->session_pool = pool;
trans->client_connected = 1;
+ last_activity_time = time(NULL);
+
return trans;
}
jsonObjectFree(msg_wrapper);
free(msg_body);
+ last_activity_time = time(NULL);
+
return OK;
}
osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data;
trans->client_connected = 0;
+
+ // timeout thread is recreated w/ each new connection
+ apr_thread_exit(trans->idle_timeout_thread, APR_SUCCESS);
+ trans->idle_timeout_thread = NULL;
// ensure no errant session data is sticking around
apr_hash_clear(trans->session_cache);
if (trans) {
apr_thread_exit(trans->responder_thread, APR_SUCCESS);
apr_thread_mutex_destroy(trans->mutex);
- apr_pool_destroy(trans->session_pool);
+ if (trans->session_pool)
+ apr_pool_destroy(trans->session_pool);
apr_pool_destroy(trans->main_pool);
}