#include "opensrf/osrfConfig.h"
#define MAX_THREAD_SIZE 64
-#define RECIP_BUF_SIZE 128
+#define RECIP_BUF_SIZE 256
#define WEBSOCKET_TRANSLATOR_INGRESS "ws-translator-v1"
+// Maximum number of active, CONNECTed opensrf sessions allowed. In
+// practice, this number will be very small, rarely reaching double
+// digits. This is just a security back-stop. A client trying to open
+// this many connections is almost certainly attempting to DOS the
+// gateway / server. We may want to lower this further.
+#define MAX_ACTIVE_STATEFUL_SESSIONS 128
// default values, replaced during setup (below) as needed.
static char* config_file = "/openils/conf/opensrf_core.xml";
static char* config_ctxt = "gateway";
-static time_t idle_timeout_interval = 60;
+
+static time_t idle_timeout_interval = 120;
static time_t idle_check_interval = 5;
static time_t last_activity_time = 0;
+// Generally, we do not disconnect the client (as idle) if there is a
+// request in flight. However, we need to have an upper bound on the
+// amount of time we will wait for in-flight requests to complete to
+// avoid leaving an effectively idle connection open after a request
+// died on the backend and no response was received.
+// Note that if other activity occurs while a long-running request
+// is active, the wait time will get reset with each new activity.
+// This is OK, though, because the goal of max_request_wait_time
+// is not to chop requests off at the knees, it's to allow the client
+// to timeout as idle when only a single long-running request is active
+// and preventing timeout.
+static time_t max_request_wait_time = 600;
+
+// Incremented with every REQUEST, decremented with every COMPLETE.
+// Gives us a rough picture of the number of requests we've sent to
+// the server vs. the number for which a completed response has been
+// received.
+static int requests_in_flight = 0;
+
// true if we've received a signal to start graceful shutdown
static int shutdown_requested = 0;
static void sigusr1_handler(int sig);
* map internally means the caller never need know about
* internal XMPP addresses and the server doesn't have to
* verify caller-specified recipient addresses. It's
- * all managed internally.
+ * all managed internally. This is only used for stateful
+ * (CONNECT'ed) sessions. Stateless sessions need not
+ * track the recipient, since they are one-off calls.
*/
- apr_hash_t *session_cache;
+ apr_hash_t *stateful_session_cache;
/**
- * session_pool contains the key/value pairs stored in
- * the session_cache. The pool is regularly destroyed
+ * stateful_session_pool contains the key/value pairs stored in
+ * the stateful_session_cache. The pool is regularly destroyed
* and re-created to avoid long-term memory consumption
*/
- apr_pool_t *session_pool;
+ apr_pool_t *stateful_session_pool;
/**
* Thread responsible for collecting responses on the opensrf
static void clear_cached_recipient(const char* thread) {
apr_pool_t *pool = NULL;
+ request_rec *r = trans->server->request(trans->server);
- if (apr_hash_get(trans->session_cache, thread, APR_HASH_KEY_STRING)) {
+ if (apr_hash_get(trans->stateful_session_cache, thread, APR_HASH_KEY_STRING)) {
osrfLogDebug(OSRF_LOG_MARK, "WS removing cached recipient on disconnect");
// remove it from the hash
- apr_hash_set(trans->session_cache, thread, APR_HASH_KEY_STRING, NULL);
-
- if (apr_hash_count(trans->session_cache) == 0) {
- osrfLogDebug(OSRF_LOG_MARK, "WS re-setting session_pool");
-
- // memory accumulates in the session_pool as sessions are cached then
- // un-cached. Un-caching removes strings from the hash, but not the
- // pool itself. That only happens when the pool is destroyed. Here
- // we destroy the session pool to clear any lingering memory, then
- // re-create it for future caching.
- apr_pool_destroy(trans->session_pool);
-
- if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
- osrfLogError(OSRF_LOG_MARK, "WS Unable to create session_pool");
- trans->session_pool = NULL;
- return;
- }
-
- trans->session_pool = pool;
+ apr_hash_set(trans->stateful_session_cache, thread, APR_HASH_KEY_STRING, NULL);
+
+ if (apr_hash_count(trans->stateful_session_cache) == 0) {
+ osrfLogDebug(OSRF_LOG_MARK, "WS re-setting stateful_session_pool");
+
+ // memory accumulates in the stateful_session_pool as
+ // sessions are cached then un-cached. Un-caching removes
+ // strings from the hash, but not from the pool. Clear the
+ // pool here. note: apr_pool_clear does not free memory, it
+ // reclaims it for use again within the pool. This is more
+ // efficient than freeing and allocating every time.
+ apr_pool_clear(trans->stateful_session_pool);
}
}
}
the correct recipient. */
if (one_msg && one_msg->m_type == STATUS) {
+ if (one_msg->status_code == OSRF_STATUS_OK) {
- // only cache recipients if the client is still connected
- if (trans->client_connected &&
- one_msg->status_code == OSRF_STATUS_OK) {
-
- if (!apr_hash_get(trans->session_cache,
+ if (!apr_hash_get(trans->stateful_session_cache,
tmsg->thread, APR_HASH_KEY_STRING)) {
- osrfLogDebug(OSRF_LOG_MARK,
- "WS caching sender thread=%s, sender=%s",
- tmsg->thread, tmsg->sender);
+ apr_size_t ses_size =
+ apr_hash_count(trans->stateful_session_cache);
- apr_hash_set(trans->session_cache,
- apr_pstrdup(trans->session_pool, tmsg->thread),
- APR_HASH_KEY_STRING,
- apr_pstrdup(trans->session_pool, tmsg->sender));
+ if (ses_size < MAX_ACTIVE_STATEFUL_SESSIONS) {
+
+ osrfLogDebug(OSRF_LOG_MARK, "WS caching sender "
+ "thread=%s, sender=%s; concurrent=%d",
+ tmsg->thread, tmsg->sender, ses_size);
+
+ apr_hash_set(trans->stateful_session_cache,
+ apr_pstrdup(trans->stateful_session_pool, tmsg->thread),
+ APR_HASH_KEY_STRING,
+ apr_pstrdup(trans->stateful_session_pool, tmsg->sender));
+
+ } else {
+ osrfLogWarning(OSRF_LOG_MARK,
+ "WS max concurrent sessions (%d) reached. "
+ "Current session will not be tracked",
+ MAX_ACTIVE_STATEFUL_SESSIONS
+ );
+ }
}
} else {
// connection timed out; clear the cached recipient
- // regardless of whether the client is still connected
- if (one_msg->status_code == OSRF_STATUS_TIMEOUT)
+ if (one_msg->status_code == OSRF_STATUS_TIMEOUT) {
clear_cached_recipient(tmsg->thread);
+
+ } else {
+ if (one_msg->status_code == OSRF_STATUS_COMPLETE)
+ requests_in_flight--;
+ }
}
}
}
// newly created osrfList. We only need to free the list and
// the individual osrfMessage's will be freed along with it
osrfListFree(msg_list);
-
- if (!trans->client_connected) {
-
- osrfLogInfo(OSRF_LOG_MARK,
- "WS discarding response for thread=%s", tmsg->thread);
-
- return;
- }
- // client is still connected.
// relay the response messages to the client
jsonObject *msg_wrapper = NULL;
char *msg_string = NULL;
}
/**
+ * Responder thread main body.
+ * Collects responses from the opensrf network and relays them to the
+ * websocket caller.
+ */
+void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
+
+    transport_message *tmsg;
+    while (1) {
+
+        // Wait for a response.  The mutex is deliberately NOT held
+        // while blocked here.
+        tmsg = client_recv(osrf_handle, -1);
+
+        if (!tmsg) continue; // interrupt
+
+        // Messages that arrive while no client is connected are dropped.
+        if (trans->client_connected) {
+
+            if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
+                osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
+                message_free(tmsg); // don't leak the message on the way out
+                return NULL;
+            }
+
+            osrfLogForceXid(tmsg->osrf_xid);
+            osrf_responder_thread_main_body(tmsg);
+            last_activity_time = time(NULL);
+
+            // Unlock only on the path that actually took the lock.
+            // Unlocking at the top of the loop would also run on the
+            // first pass, after an interrupt, and after a skipped
+            // message -- i.e. when this thread does not own the mutex.
+            if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
+                osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
+                message_free(tmsg);
+                return NULL;
+            }
+        }
+
+        message_free(tmsg);
+    }
+
+    return NULL;
+}
+
+/**
+ * Returns the current value of requests_in_flight.
+ *
+ * If requests are in flight but max_request_wait_time seconds or more
+ * have passed since last_activity_time, the outstanding requests are
+ * treated as dead: a warning is logged and the counter is reset to 0
+ * before it is returned.
+ */
+static int active_connection_count(void) {
+
+    if (requests_in_flight) {
+
+        time_t now = time(NULL);
+        time_t difference = now - last_activity_time;
+
+        if (difference >= max_request_wait_time) {
+            // time_t has no printf conversion of its own; cast to long
+            // so the argument type matches the %ld specifier.
+            osrfLogWarning(OSRF_LOG_MARK,
+                "%d In-flight request(s) took longer than %ld seconds "
+                "to complete. Treating request as dead and moving on.",
+                requests_in_flight,
+                (long) max_request_wait_time
+            );
+            requests_in_flight = 0;
+        }
+    }
+
+    return requests_in_flight;
+}
+
+/**
* Sleep and regularly wake to see if the process has been idle for too
* long. If so, send a disconnect to the client.
- *
*/
void* APR_THREAD_FUNC osrf_idle_timeout_thread_main(
apr_thread_t *thread, void *data) {
// During graceful shutdown, we may wait up to
// idle_check_interval seconds before initiating shutdown.
sleep(sleep_time);
-
+
if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
return NULL;
continue;
}
- // do we have any active conversations with the connected client?
- int active_count = apr_hash_count(trans->session_cache);
+ // do we have any active stateful conversations with the client?
+ int active_count = active_connection_count();
if (active_count) {
time_t difference = now - last_activity_time;
osrfLogDebug(OSRF_LOG_MARK,
- "WS has been idle for %d seconds", difference);
+ "WS connection idle for %d seconds", difference);
if (difference < idle_timeout_interval) {
// Last activity occurred within the idle timeout interval.
return NULL;
}
-/**
- * Responder thread main body.
- * Collects responses from the opensrf network and relays them to the
- * websocket caller.
- */
-void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
+/**
+ * Builds the translator instance and everything hanging off of it:
+ * the main pool, the stateful session cache and its string pool, the
+ * thread mutex, the responder thread and the idle timeout thread.
+ *
+ * Returns APR_SUCCESS on success, 1 on any failure.
+ */
+static int build_startup_data(const WebSocketServer *server) {
- transport_message *tmsg;
- while (1) {
+
+    apr_pool_t *main_pool = NULL;
+    apr_pool_t *stateful_session_pool = NULL;
+    apr_thread_t *thread = NULL;
+    apr_threadattr_t *thread_attr = NULL;
+    apr_thread_mutex_t *mutex = NULL;
+    request_rec *r = server->request(server);
- if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
- osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
- return NULL;
- }
+
+    // create a pool for our translator data
+    // Do not use r->pool as the parent, since r->pool will be freed
+    // when the current client disconnects.
+    if (apr_pool_create(&main_pool, NULL) != APR_SUCCESS) {
+        osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
+        return 1;
+    }
- // wait for a response
- tmsg = client_recv(osrf_handle, -1);
+
+    trans = (osrfWebsocketTranslator*)
+        apr_palloc(main_pool, sizeof(osrfWebsocketTranslator));
- if (!tmsg) continue; // early exit on interrupt
+
+    if (trans == NULL) {
+        osrfLogError(OSRF_LOG_MARK, "WS Unable to create translator");
+        return 1;
+    }
- if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
- osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
- return NULL;
- }
+
+    trans->server = server;
+    trans->main_pool = main_pool;
+    trans->osrf_router = osrfConfigGetValue(NULL, "/router_name");
+    trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
+
+    // apr_palloc memory is not zero-filled, and the threads created
+    // below read this flag, so give it a defined value first.
+    trans->client_connected = 0;
- osrfLogForceXid(tmsg->osrf_xid);
- osrf_responder_thread_main_body(tmsg);
- message_free(tmsg);
- last_activity_time = time(NULL);
+
+    // opensrf session / recipient cache
+    trans->stateful_session_cache = apr_hash_make(trans->main_pool);
+    if (trans->stateful_session_cache == NULL) {
+        osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache");
+        return 1;
}
- return NULL;
-}
+
+    // opensrf session / recipient string pool; cleared regularly.
+    // The only data entering this pool are the session strings.
+    if (apr_pool_create(&stateful_session_pool, trans->main_pool) != APR_SUCCESS) {
+        osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
+        // Must be non-zero: returning NULL here converts to 0, which
+        // the caller would read as APR_SUCCESS.
+        return 1;
+    }
+    trans->stateful_session_pool = stateful_session_pool;
+
+    // The mutex is created before either thread so that it exists by
+    // the time the threads first touch it.
+    if (apr_thread_mutex_create(
+            &mutex, APR_THREAD_MUTEX_UNNESTED,
+            trans->main_pool) != APR_SUCCESS) {
+        osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex");
+        return 1;
+    }
+    trans->mutex = mutex;
+
+    // responder thread
+    if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
+         (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
+         (apr_thread_create(&thread, thread_attr,
+            osrf_responder_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
+
+        trans->responder_thread = thread;
+
+    } else {
+        osrfLogError(OSRF_LOG_MARK, "WS unable to create responder thread");
+        return 1;
+    }
+
+    // idle timeout thread
+    thread = NULL; // reset
+    thread_attr = NULL; // reset
+    if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
+         (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
+         (apr_thread_create(&thread, thread_attr,
+            osrf_idle_timeout_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
+
+        osrfLogDebug(OSRF_LOG_MARK, "WS created idle timeout thread");
+        trans->idle_timeout_thread = thread;
+
+    } else {
+        osrfLogError(OSRF_LOG_MARK, "WS unable to create idle timeout thread");
+        return 1;
+    }
+
+    return APR_SUCCESS;
+}
/**
* session cache and session pool.
*/
int child_init(const WebSocketServer *server) {
-
- apr_pool_t *pool = NULL;
- apr_thread_t *thread = NULL;
- apr_threadattr_t *thread_attr = NULL;
- apr_thread_mutex_t *mutex = NULL;
request_rec *r = server->request(server);
-
// osrf_handle will already be connected if this is not the first request
// served by this process.
if ( !(osrf_handle = osrfSystemGetTransportClient()) ) {
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
"WS: timeout set to %d", idle_timeout_interval);
+ timeout = getenv("OSRF_WEBSOCKET_MAX_REQUEST_WAIT_TIME");
+ if (timeout) {
+ if (!atoi(timeout)) {
+ ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
+ "WS: invalid OSRF_WEBSOCKET_MAX_REQUEST_WAIT_TIME: %s",
+ timeout
+ );
+ } else {
+ max_request_wait_time = (time_t) atoi(timeout);
+ }
+ }
+
+ ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
+ "WS: max request wait time set to %d", max_request_wait_time);
+
char* interval = getenv("OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL");
if (interval) {
if (!atoi(interval)) {
osrf_handle = osrfSystemGetTransportClient();
}
- // create a standalone pool for our translator data
- if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
- osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
- return 1;
- }
-
- // allocate our static translator instance
- trans = (osrfWebsocketTranslator*)
- apr_palloc(pool, sizeof(osrfWebsocketTranslator));
-
- if (trans == NULL) {
- osrfLogError(OSRF_LOG_MARK, "WS Unable to create translator");
- return 1;
- }
-
- trans->main_pool = pool;
- trans->server = server;
- trans->osrf_router = osrfConfigGetValue(NULL, "/router_name");
- trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
-
- trans->session_cache = apr_hash_make(pool);
-
- if (trans->session_cache == NULL) {
- osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache");
- return 1;
- }
-
- // Create the responder thread. Once created,
- // it runs for the lifetime of this process.
- if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
- (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
- (apr_thread_create(&thread, thread_attr,
- osrf_responder_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
-
- trans->responder_thread = thread;
-
- } else {
- osrfLogError(OSRF_LOG_MARK, "WS unable to create responder thread");
- return 1;
- }
-
- // Create the idle timeout thread, which lives for the lifetime
- // of the process.
- thread = NULL; // reset
- thread_attr = NULL; // reset
- if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
- (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
- (apr_thread_create(&thread, thread_attr,
- osrf_idle_timeout_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
-
- osrfLogDebug(OSRF_LOG_MARK, "WS created idle timeout thread");
- trans->idle_timeout_thread = thread;
-
- } else {
- osrfLogError(OSRF_LOG_MARK, "WS unable to create idle timeout thread");
- return 1;
- }
-
-
- if (apr_thread_mutex_create(
- &mutex, APR_THREAD_MUTEX_UNNESTED,
- trans->main_pool) != APR_SUCCESS) {
- osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex");
- return 1;
- }
-
- trans->mutex = mutex;
-
signal(SIGUSR1, sigusr1_handler);
-
return APR_SUCCESS;
}
*/
void* CALLBACK on_connect_handler(const WebSocketServer *server) {
request_rec *r = server->request(server);
- apr_pool_t *pool;
- apr_thread_t *thread = NULL;
- apr_threadattr_t *thread_attr = NULL;
- const char* client_ip = get_client_ip(r);
- osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip);
+ if (!trans) { // first connection
- if (!trans) {
- // first connection
- if (child_init(server) != APR_SUCCESS) {
+ // connect to opensrf
+ if (child_init(server) != APR_SUCCESS)
return NULL;
- }
- }
- // create a standalone pool for the session cache values
- // this pool will be destroyed and re-created regularly
- // to clear session memory
- if (apr_pool_create(&pool, r->pool) != APR_SUCCESS) {
- osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
- return NULL;
+ // build pools, thread data, and the translator
+ if (build_startup_data(server) != APR_SUCCESS)
+ return NULL;
}
- trans->session_pool = pool;
- trans->client_connected = 1;
- last_activity_time = time(NULL);
+ const char* client_ip = get_client_ip(r);
+ osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip);
+ last_activity_time = time(NULL);
+ trans->client_connected = 1;
return trans;
}
}
osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
buffer_free(act);
+ requests_in_flight++;
break;
}
// since clients can provide their own threads at session start time,
// the presence of a thread does not guarantee a cached recipient
recipient = (char*) apr_hash_get(
- trans->session_cache, thread, APR_HASH_KEY_STRING);
+ trans->stateful_session_cache, thread, APR_HASH_KEY_STRING);
if (recipient) {
osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient);
free(msg_body);
last_activity_time = time(NULL);
-
return OK;
}
void CALLBACK on_disconnect_handler(
void *data, const WebSocketServer *server) {
- osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data;
+ // if the threads wake up during disconnect, this tells
+ // them to go back to sleep.
trans->client_connected = 0;
- // timeout thread is recreated w/ each new connection
- apr_thread_exit(trans->idle_timeout_thread, APR_SUCCESS);
- trans->idle_timeout_thread = NULL;
-
- // ensure no errant session data is sticking around
- apr_hash_clear(trans->session_cache);
-
- // strictly speaking, this pool will get destroyed when
- // r->pool is destroyed, but it doesn't hurt to explicitly
- // destroy it ourselves.
- apr_pool_destroy(trans->session_pool);
- trans->session_pool = NULL;
-
request_rec *r = server->request(server);
-
osrfLogInfo(OSRF_LOG_MARK, "WS disconnect from %s", get_client_ip(r));
-}
-
-/**
- * Be nice and clean up our mess
- */
-void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) {
- if (trans) {
- apr_thread_exit(trans->responder_thread, APR_SUCCESS);
- apr_thread_mutex_destroy(trans->mutex);
- if (trans->session_pool)
- apr_pool_destroy(trans->session_pool);
- apr_pool_destroy(trans->main_pool);
- }
- trans = NULL;
+ // Clear any lingering session data
+ // NOTE: we could apr_pool_destroy the stateful_session_pool to truly free
+ // the memory, but since there is a limit to the size of the pool
+ // (MAX_ACTIVE_STATEFUL_SESSIONS), the memory cannot grow unbounded,
+ // so there's no need.
+ apr_hash_clear(trans->stateful_session_cache);
+ apr_pool_clear(trans->stateful_session_pool);
}
static WebSocketPlugin osrf_websocket_plugin = {
sizeof(WebSocketPlugin),
WEBSOCKET_PLUGIN_VERSION_0,
- on_destroy_handler,
+ NULL, // on_destroy_handler
on_connect_handler,
on_message_handler,
on_disconnect_handler