From: Bill Erickson Date: Fri, 1 Jun 2018 19:44:34 +0000 (-0400) Subject: LP#1774703 Limit scope of thread locks X-Git-Url: https://old-git.evergreen-ils.org/?a=commitdiff_plain;h=refs%2Fheads%2Fuser%2Fberick%2Flp1774703-websocket-thread-lock;p=working%2FOpenSRF.git LP#1774703 Limit scope of thread locks Avoid thread contention issues when running this websockets code: https://github.com/jchampio/apache-websocket In general, the scope of the thread locking has been dimished, focused more closely on keeping blocks of reads/writes to the shared recipient cache running in a single thread at a time. Thread locks have also been fully removed from the idle timeout thread. By the time it modifies any shared data, the process is already shutting down. Added a few thread safety checks to avoid (the already unlikely and maybe not even a problem) multiple translator->close() calls just to be extra safe. Signed-off-by: Bill Erickson --- diff --git a/src/gateway/osrf_websocket_translator.c b/src/gateway/osrf_websocket_translator.c index 75d6876..cbe1104 100644 --- a/src/gateway/osrf_websocket_translator.c +++ b/src/gateway/osrf_websocket_translator.c @@ -208,7 +208,6 @@ static transport_client *osrf_handle = NULL; static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer static void clear_cached_recipient(const char* thread) { - apr_pool_t *pool = NULL; request_rec *r = trans->server->request(trans->server); if (apr_hash_get(trans->stateful_session_cache, thread, APR_HASH_KEY_STRING)) { @@ -257,6 +256,12 @@ void* osrf_responder_thread_main_body(transport_message *tmsg) { if (one_msg->status_code == OSRF_STATUS_OK) { + // Keep shared hash modifications threadsafe + if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) { + osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex"); + return NULL; + } + if (!apr_hash_get(trans->stateful_session_cache, tmsg->thread, APR_HASH_KEY_STRING)) { @@ -283,12 +288,29 @@ void* osrf_responder_thread_main_body(transport_message *tmsg) { } } + // Done modifying the shared hash + if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) { + osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex"); + return NULL; + } + } else { // connection timed out; clear the cached recipient if (one_msg->status_code == OSRF_STATUS_TIMEOUT) { + + if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) { + osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex"); + return NULL; + } + clear_cached_recipient(tmsg->thread); + if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) { + osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex"); + return NULL; + } + } else { if (one_msg->status_code == OSRF_STATUS_COMPLETE) requests_in_flight--; @@ -339,11 +361,6 @@ void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *dat transport_message *tmsg; while (1) { - if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) { - osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex"); - return NULL; - } - // wait indefinitely for a response tmsg = client_recv(osrf_handle, -1); @@ -361,17 +378,15 @@ void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *dat osrfLogWarning(OSRF_LOG_MARK, "WS: Jabber socket disconnected. Sending close() to client"); - trans->server->close(trans->server); + if (trans->client_connected) { + trans->client_connected = 0; + trans->server->close(trans->server); + } + return NULL; // exit thread } if (trans->client_connected) { - - if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) { - osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex"); - return NULL; - } - osrfLogForceXid(tmsg->osrf_xid); osrf_responder_thread_main_body(tmsg); last_activity_time = time(NULL); @@ -418,22 +433,12 @@ void* APR_THREAD_FUNC osrf_idle_timeout_thread_main( while (1) { - if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) { - osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex"); - return NULL; - } - // note: receiving a signal (e.g. SIGUSR1) will not interrupt // this sleep(), since it's running within its own thread. // During graceful shtudown, we may wait up to // idle_check_interval seconds before initiating shutdown. sleep(sleep_time); - if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) { - osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex"); - return NULL; - } - // no client is connected. reset sleep time go back to sleep. if (!trans->client_connected) { sleep_time = idle_check_interval; @@ -503,7 +508,11 @@ void* APR_THREAD_FUNC osrf_idle_timeout_thread_main( // send a disconnect to the client, which will come back around // to cause our on_disconnect_handler to run. osrfLogDebug(OSRF_LOG_MARK, "WS: sending close() to client"); - trans->server->close(trans->server); + + if (trans->client_connected) { + trans->client_connected = 0; + trans->server->close(trans->server); + } // client will be going away, reset sleep time sleep_time = idle_check_interval; @@ -949,7 +958,11 @@ static size_t CALLBACK on_message_handler(void *data, // will run, clean it all up, and kill the process. osrfLogError(OSRF_LOG_MARK, "Error relaying message, forcing client disconnect"); - trans->server->close(trans->server); + + if (trans->client_connected) { + trans->client_connected = 0; + trans->server->close(trans->server); + } } return stat;