From b0f28941a39f1bf509dc7a8abda6d47b3c631774 Mon Sep 17 00:00:00 2001 From: Bill Erickson Date: Wed, 12 Mar 2014 11:39:22 -0400 Subject: [PATCH] LP#1268619: websockets : more gateway cleanup and config options Signed-off-by: Bill Erickson --- README.websockets | 9 +- src/gateway/osrf_websocket_translator.c | 381 +++++++++++++++++++------------- 2 files changed, 230 insertions(+), 160 deletions(-) diff --git a/README.websockets b/README.websockets index 69a56c0..15f38b1 100644 --- a/README.websockets +++ b/README.websockets @@ -23,10 +23,11 @@ Websockets installation instructions for Debian # OPTIONAL: add these configuration variables to # /etc/apache2-websockets/envvars and adjust as needed. -# export OSRF_WEBSOCKET_IDLE_TIMEOUT=60 +# export OSRF_WEBSOCKET_IDLE_TIMEOUT=120 # export OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL=5 # export OSRF_WEBSOCKET_CONFIG_FILE=/openils/conf/opensrf_core.xml # export OSRF_WEBSOCKET_CONFIG_CTXT=gateway +# export OSRF_WEBSOCKET_MAX_REQUEST_WAIT_TIME=600 # # IDLE_TIMEOUT specifies how long we will allow a client to stay connected # while idle. A longer timeout means less network traffic (from fewer @@ -36,6 +37,12 @@ Websockets installation instructions for Debian # IDLE_CHECK_INTERVAL specifies how often we wake to check the idle status # of the connected client. # +# MAX_REQUEST_WAIT_TIME is the maximum amount of time the gateway will +# wait before declaring a client as idle when there is a long-running +# outstanding request, yet no other activity is occurring. This is +# primarily a fail-safe to allow idle timeouts when one or more requests +# died on the server, and thus no response was ever delivered to the gateway. +# # Both specified in seconds # # CONFIG_FILE / CTXT are the standard opensrf core config options. diff --git a/src/gateway/osrf_websocket_translator.c b/src/gateway/osrf_websocket_translator.c index be643e5..ef8d4af 100644 --- a/src/gateway/osrf_websocket_translator.c +++ b/src/gateway/osrf_websocket_translator.c @@ -85,17 +85,43 @@ #include "opensrf/osrfConfig.h" #define MAX_THREAD_SIZE 64 -#define RECIP_BUF_SIZE 128 +#define RECIP_BUF_SIZE 256 #define WEBSOCKET_TRANSLATOR_INGRESS "ws-translator-v1" +// maximun number of active, CONNECTed opensrf sessions allowed. in +// practice, this number will be very small, rarely reaching double +// digits. This is just a security back-stop. A client trying to open +// this many connections is almost certainly attempting to DOS the +// gateway / server. We may want to lower this further. +#define MAX_ACTIVE_STATEFUL_SESSIONS 128 // default values, replaced during setup (below) as needed. static char* config_file = "/openils/conf/opensrf_core.xml"; static char* config_ctxt = "gateway"; -static time_t idle_timeout_interval = 60; + +static time_t idle_timeout_interval = 120; static time_t idle_check_interval = 5; static time_t last_activity_time = 0; +// Generally, we do not disconnect the client (as idle) if there is a +// request in flight. However, we need to have an upper bound on the +// amount of time we will wait for in-flight requests to complete to +// avoid leaving an effectively idle connection open after a request +// died on the backend and no response was received. +// Note that if other activity occurs while a long-running request +// is active, the wait time will get reset with each new activity. +// This is OK, though, because the goal of max_request_wait_time +// is not to chop requests off at the knees, it's to allow the client +// to timeout as idle when only a single long-running request is active +// and preventing timeout. +static time_t max_request_wait_time = 600; + +// Incremented with every REQUEST, decremented with every COMPLETE. +// Gives us a rough picture of the number of reqests we've sent to +// the server vs. the number for which a completed response has been +// received. +static int requests_in_flight = 0; + // true if we've received a signal to start graceful shutdown static int shutdown_requested = 0; static void sigusr1_handler(int sig); @@ -130,16 +156,18 @@ typedef struct _osrfWebsocketTranslator { * map internally means the caller never need know about * internal XMPP addresses and the server doesn't have to * verify caller-specified recipient addresses. It's - * all managed internally. + * all managed internally. This is only used for stateful + * (CONNECT'ed) session. Stateless sessions need not + * track the recipient, since they are one-off calls. */ - apr_hash_t *session_cache; + apr_hash_t *stateful_session_cache; /** - * session_pool contains the key/value pairs stored in - * the session_cache. The pool is regularly destroyed + * stateful_session_pool contains the key/value pairs stored in + * the stateful_session_cache. The pool is regularly destroyed * and re-created to avoid long-term memory consumption */ - apr_pool_t *session_pool; + apr_pool_t *stateful_session_pool; /** * Thread responsible for collecting responses on the opensrf @@ -183,30 +211,23 @@ static void clear_cached_recipient(const char* thread) { apr_pool_t *pool = NULL; request_rec *r = trans->server->request(trans->server); - if (apr_hash_get(trans->session_cache, thread, APR_HASH_KEY_STRING)) { + if (apr_hash_get(trans->stateful_session_cache, thread, APR_HASH_KEY_STRING)) { osrfLogDebug(OSRF_LOG_MARK, "WS removing cached recipient on disconnect"); // remove it from the hash - apr_hash_set(trans->session_cache, thread, APR_HASH_KEY_STRING, NULL); - - if (apr_hash_count(trans->session_cache) == 0) { - osrfLogDebug(OSRF_LOG_MARK, "WS re-setting session_pool"); - - // memory accumulates in the session_pool as sessions are cached then - // un-cached. Un-caching removes strings from the hash, but not the - // pool itself. That only happens when the pool is destroyed. Here - // we destroy the session pool to clear any lingering memory, then - // re-create it for future caching. - apr_pool_destroy(trans->session_pool); - - if (apr_pool_create(&pool, r->pool) != APR_SUCCESS) { - osrfLogError(OSRF_LOG_MARK, "WS Unable to create session_pool"); - trans->session_pool = NULL; - return; - } - - trans->session_pool = pool; + apr_hash_set(trans->stateful_session_cache, thread, APR_HASH_KEY_STRING, NULL); + + if (apr_hash_count(trans->stateful_session_cache) == 0) { + osrfLogDebug(OSRF_LOG_MARK, "WS re-setting stateful_session_pool"); + + // memory accumulates in the stateful_session_pool as + // sessions are cached then un-cached. Un-caching removes + // strings from the hash, but not from the pool. Clear the + // pool here. note: apr_pool_clear does not free memory, it + // reclaims it for use again within the pool. This is more + // effecient than freeing and allocating every time. + apr_pool_clear(trans->stateful_session_pool); } } } @@ -234,30 +255,44 @@ void* osrf_responder_thread_main_body(transport_message *tmsg) { the correct recipient. */ if (one_msg && one_msg->m_type == STATUS) { + if (one_msg->status_code == OSRF_STATUS_OK) { - // only cache recipients if the client is still connected - if (trans->client_connected && - one_msg->status_code == OSRF_STATUS_OK) { - - if (!apr_hash_get(trans->session_cache, + if (!apr_hash_get(trans->stateful_session_cache, tmsg->thread, APR_HASH_KEY_STRING)) { - osrfLogDebug(OSRF_LOG_MARK, - "WS caching sender thread=%s, sender=%s", - tmsg->thread, tmsg->sender); + apr_size_t ses_size = + apr_hash_count(trans->stateful_session_cache); + + if (ses_size < MAX_ACTIVE_STATEFUL_SESSIONS) { - apr_hash_set(trans->session_cache, - apr_pstrdup(trans->session_pool, tmsg->thread), - APR_HASH_KEY_STRING, - apr_pstrdup(trans->session_pool, tmsg->sender)); + osrfLogDebug(OSRF_LOG_MARK, "WS caching sender " + "thread=%s, sender=%s; concurrent=%d", + tmsg->thread, tmsg->sender, ses_size); + + apr_hash_set(trans->stateful_session_cache, + apr_pstrdup(trans->stateful_session_pool, tmsg->thread), + APR_HASH_KEY_STRING, + apr_pstrdup(trans->stateful_session_pool, tmsg->sender)); + + } else { + osrfLogWarning(OSRF_LOG_MARK, + "WS max concurrent sessions (%d) reached. " + "Current session will not be tracked", + MAX_ACTIVE_STATEFUL_SESSIONS + ); + } } } else { // connection timed out; clear the cached recipient - // regardless of whether the client is still connected - if (one_msg->status_code == OSRF_STATUS_TIMEOUT) + if (one_msg->status_code == OSRF_STATUS_TIMEOUT) { clear_cached_recipient(tmsg->thread); + + } else { + if (one_msg->status_code == OSRF_STATUS_COMPLETE) + requests_in_flight--; + } } } } @@ -266,16 +301,7 @@ void* osrf_responder_thread_main_body(transport_message *tmsg) { // newly created osrfList. We only need to free the list and // the individual osrfMessage's will be freed along with it osrfListFree(msg_list); - - if (!trans->client_connected) { - - osrfLogInfo(OSRF_LOG_MARK, - "WS discarding response for thread=%s", tmsg->thread); - - return; - } - // client is still connected. // relay the response messages to the client jsonObject *msg_wrapper = NULL; char *msg_string = NULL; @@ -341,11 +367,30 @@ void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *dat return NULL; } +static int active_connection_count() { + + if (requests_in_flight) { + + time_t now = time(NULL); + time_t difference = now - last_activity_time; + + if (difference >= max_request_wait_time) { + osrfLogWarning(OSRF_LOG_MARK, + "%d In-flight request(s) took longer than %d seconds " + "to complete. Treating request as dead and moving on.", + requests_in_flight, + max_request_wait_time + ); + requests_in_flight = 0; + } + } + + return requests_in_flight; +} /** * Sleep and regularly wake to see if the process has been idle for too * long. If so, send a disconnect to the client. - * */ void* APR_THREAD_FUNC osrf_idle_timeout_thread_main( apr_thread_t *thread, void *data) { @@ -379,8 +424,8 @@ void* APR_THREAD_FUNC osrf_idle_timeout_thread_main( continue; } - // do we have any active conversations with the connected client? - int active_count = apr_hash_count(trans->session_cache); + // do we have any active stateful conversations with the client? + int active_count = active_connection_count(); if (active_count) { @@ -425,7 +470,7 @@ void* APR_THREAD_FUNC osrf_idle_timeout_thread_main( time_t difference = now - last_activity_time; osrfLogDebug(OSRF_LOG_MARK, - "WS has been idle for %d seconds", difference); + "WS connection idle for %d seconds", difference); if (difference < idle_timeout_interval) { // Last activity occurred within the idle timeout interval. @@ -452,17 +497,97 @@ void* APR_THREAD_FUNC osrf_idle_timeout_thread_main( return NULL; } +static int build_startup_data(const WebSocketServer *server) { + + apr_pool_t *main_pool = NULL; + apr_pool_t *stateful_session_pool = NULL; + apr_thread_t *thread = NULL; + apr_threadattr_t *thread_attr = NULL; + apr_thread_mutex_t *mutex = NULL; + request_rec *r = server->request(server); + + // create a pool for our translator data + // Do not use r->pool as the parent, since r->pool will be freed + // when the current client disconnects. + if (apr_pool_create(&main_pool, NULL) != APR_SUCCESS) { + osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool"); + return 1; + } + + trans = (osrfWebsocketTranslator*) + apr_palloc(main_pool, sizeof(osrfWebsocketTranslator)); + + if (trans == NULL) { + osrfLogError(OSRF_LOG_MARK, "WS Unable to create translator"); + return 1; + } + + trans->server = server; + trans->main_pool = main_pool; + trans->osrf_router = osrfConfigGetValue(NULL, "/router_name"); + trans->osrf_domain = osrfConfigGetValue(NULL, "/domain"); + + // opensrf session / recipient cache + trans->stateful_session_cache = apr_hash_make(trans->main_pool); + if (trans->stateful_session_cache == NULL) { + osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache"); + return 1; + } + + // opensrf session / recipient string pool; cleared regularly + // the only data entering this pools are the session strings. + if (apr_pool_create(&stateful_session_pool, trans->main_pool) != APR_SUCCESS) { + osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool"); + return NULL; + } + trans->stateful_session_pool = stateful_session_pool; + + if (apr_thread_mutex_create( + &mutex, APR_THREAD_MUTEX_UNNESTED, + trans->main_pool) != APR_SUCCESS) { + osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex"); + return 1; + } + trans->mutex = mutex; + + // responder thread + if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) && + (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) && + (apr_thread_create(&thread, thread_attr, + osrf_responder_thread_main, trans, trans->main_pool) == APR_SUCCESS)) { + + trans->responder_thread = thread; + + } else { + osrfLogError(OSRF_LOG_MARK, "WS unable to create responder thread"); + return 1; + } + + // idle timeout thread + thread = NULL; // reset + thread_attr = NULL; // reset + if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) && + (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) && + (apr_thread_create(&thread, thread_attr, + osrf_idle_timeout_thread_main, trans, trans->main_pool) == APR_SUCCESS)) { + + osrfLogDebug(OSRF_LOG_MARK, "WS created idle timeout thread"); + trans->idle_timeout_thread = thread; + + } else { + osrfLogError(OSRF_LOG_MARK, "WS unable to create idle timeout thread"); + return 1; + } + + return APR_SUCCESS; +} + /** * Connect to OpenSRF, create the main pool, responder thread * session cache and session pool. */ int child_init(const WebSocketServer *server) { - - apr_pool_t *pool = NULL; - apr_thread_t *thread = NULL; - apr_threadattr_t *thread_attr = NULL; - apr_thread_mutex_t *mutex = NULL; request_rec *r = server->request(server); // osrf_handle will already be connected if this is not the first request @@ -483,6 +608,21 @@ int child_init(const WebSocketServer *server) { ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, "WS: timeout set to %d", idle_timeout_interval); + timeout = getenv("OSRF_WEBSOCKET_MAX_REQUEST_WAIT_TIME"); + if (timeout) { + if (!atoi(timeout)) { + ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, + "WS: invalid OSRF_WEBSOCKET_MAX_REQUEST_WAIT_TIME: %s", + timeout + ); + } else { + max_request_wait_time = (time_t) atoi(timeout); + } + } + + ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, + "WS: max request wait time set to %d", max_request_wait_time); + char* interval = getenv("OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL"); if (interval) { if (!atoi(interval)) { @@ -528,73 +668,6 @@ int child_init(const WebSocketServer *server) { osrf_handle = osrfSystemGetTransportClient(); } - // create a pool for our translator data - // Do not use r->pool as the parent, since r->pool will be freed - // when this client disconnects. - if (apr_pool_create(&pool, NULL) != APR_SUCCESS) { - osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool"); - return 1; - } - - // allocate our static translator instance - trans = (osrfWebsocketTranslator*) - apr_palloc(pool, sizeof(osrfWebsocketTranslator)); - - if (trans == NULL) { - osrfLogError(OSRF_LOG_MARK, "WS Unable to create translator"); - return 1; - } - - trans->main_pool = pool; - trans->server = server; - trans->osrf_router = osrfConfigGetValue(NULL, "/router_name"); - trans->osrf_domain = osrfConfigGetValue(NULL, "/domain"); - - trans->session_cache = apr_hash_make(pool); - if (trans->session_cache == NULL) { - osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache"); - return 1; - } - - if (apr_thread_mutex_create( - &mutex, APR_THREAD_MUTEX_UNNESTED, - trans->main_pool) != APR_SUCCESS) { - osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex"); - return 1; - } - trans->mutex = mutex; - - // Create the responder thread. Once created, - // it runs for the lifetime of this process. - if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) && - (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) && - (apr_thread_create(&thread, thread_attr, - osrf_responder_thread_main, trans, trans->main_pool) == APR_SUCCESS)) { - - trans->responder_thread = thread; - - } else { - osrfLogError(OSRF_LOG_MARK, "WS unable to create responder thread"); - return 1; - } - - // Create the idle timeout thread, which lives for the lifetime - // of the process. - thread = NULL; // reset - thread_attr = NULL; // reset - if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) && - (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) && - (apr_thread_create(&thread, thread_attr, - osrf_idle_timeout_thread_main, trans, trans->main_pool) == APR_SUCCESS)) { - - osrfLogDebug(OSRF_LOG_MARK, "WS created idle timeout thread"); - trans->idle_timeout_thread = thread; - - } else { - osrfLogError(OSRF_LOG_MARK, "WS unable to create idle timeout thread"); - return 1; - } - signal(SIGUSR1, sigusr1_handler); return APR_SUCCESS; } @@ -604,34 +677,23 @@ int child_init(const WebSocketServer *server) { */ void* CALLBACK on_connect_handler(const WebSocketServer *server) { request_rec *r = server->request(server); - apr_pool_t *pool; - apr_thread_t *thread = NULL; - apr_threadattr_t *thread_attr = NULL; - const char* client_ip = get_client_ip(r); - osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip); + if (!trans) { // first connection - if (!trans) { - // first connection - if (child_init(server) != APR_SUCCESS) { + // connect to opensrf + if (child_init(server) != APR_SUCCESS) return NULL; - } - } - // create a pool for the session cache values - // this pool will be destroyed and re-created regularly - // to clear session memory - // Use r->pool as the parent since this pool only lives to serve - // a single connection. - if (apr_pool_create(&pool, r->pool) != APR_SUCCESS) { - osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool"); - return NULL; + // build pools, thread data, and the translator + if (build_startup_data(server) != APR_SUCCESS) + return NULL; } - trans->session_pool = pool; - trans->client_connected = 1; - last_activity_time = time(NULL); + const char* client_ip = get_client_ip(r); + osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip); + last_activity_time = time(NULL); + trans->client_connected = 1; return trans; } @@ -706,6 +768,7 @@ static char* extract_inbound_messages( } osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf); buffer_free(act); + requests_in_flight++; break; } @@ -797,7 +860,7 @@ static size_t on_message_handler_body(void *data, // since clients can provide their own threads at session start time, // the presence of a thread does not guarantee a cached recipient recipient = (char*) apr_hash_get( - trans->session_cache, thread, APR_HASH_KEY_STRING); + trans->stateful_session_cache, thread, APR_HASH_KEY_STRING); if (recipient) { osrfLogDebug(OSRF_LOG_MARK, "WS found cached recipient %s", recipient); @@ -841,7 +904,6 @@ static size_t on_message_handler_body(void *data, free(msg_body); last_activity_time = time(NULL); - return OK; } @@ -871,19 +933,20 @@ static size_t CALLBACK on_message_handler(void *data, void CALLBACK on_disconnect_handler( void *data, const WebSocketServer *server) { - request_rec *r = server->request(server); - osrfLogInfo(OSRF_LOG_MARK, "WS disconnect from %s", get_client_ip(r)); - + // if the threads wake up during disconnect, this tells + // them to go back to sleep. trans->client_connected = 0; - // ensure no errant session data is sticking around - apr_hash_clear(trans->session_cache); + request_rec *r = server->request(server); + osrfLogInfo(OSRF_LOG_MARK, "WS disconnect from %s", get_client_ip(r)); - // strictly speaking, this pool will get destroyed when - // r->pool is destroyed, but it doesn't hurt to explicitly - // destroy it ourselves. - apr_pool_destroy(trans->session_pool); - trans->session_pool = NULL; + // Clear any lingering session data + // NOTE: we could apr_pool_destroy the stateful_session_pool to truly free + // the memory, but since there is a limit to the size of the pool + // (max_concurrent_sessions), the memory cannot grow unbounded, + // so there's no need. + apr_hash_clear(trans->stateful_session_cache); + apr_pool_clear(trans->stateful_session_pool); } static WebSocketPlugin osrf_websocket_plugin = { -- 2.11.0