LP#1774703 Limit scope of thread locks user/berick/lp1774703-websocket-thread-lock
authorBill Erickson <berickxx@gmail.com>
Fri, 1 Jun 2018 19:44:34 +0000 (15:44 -0400)
committerBill Erickson <berickxx@gmail.com>
Fri, 1 Jun 2018 19:44:36 +0000 (15:44 -0400)
Avoid thread contention issues when running this websockets code:

https://github.com/jchampio/apache-websocket

In general, the scope of the thread locking has been dimished, focused
more closely on keeping blocks of reads/writes to the shared recipient
cache running in a single thread at a time.

Thread locks have also been fully removed from the idle timeout thread.
By the time it modifies any shared data, the process is already shutting
down.

Added a few thread safety checks to avoid (the already unlikely and
maybe not even a problem) multiple translator->close() calls just to be
extra safe.

Signed-off-by: Bill Erickson <berickxx@gmail.com>
src/gateway/osrf_websocket_translator.c

index 75d6876..cbe1104 100644 (file)
@@ -208,7 +208,6 @@ static transport_client *osrf_handle = NULL;
 static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
 
 static void clear_cached_recipient(const char* thread) {
-    apr_pool_t *pool = NULL;                                                
     request_rec *r = trans->server->request(trans->server);
 
     if (apr_hash_get(trans->stateful_session_cache, thread, APR_HASH_KEY_STRING)) {
@@ -257,6 +256,12 @@ void* osrf_responder_thread_main_body(transport_message *tmsg) {
 
             if (one_msg->status_code == OSRF_STATUS_OK) {
 
+                // Keep shared hash modifications threadsafe
+                if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
+                    osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
+                    return NULL;
+                }
+
                 if (!apr_hash_get(trans->stateful_session_cache, 
                         tmsg->thread, APR_HASH_KEY_STRING)) {
 
@@ -283,12 +288,29 @@ void* osrf_responder_thread_main_body(transport_message *tmsg) {
                     }
                 }
 
+                // Done modifying the shared hash
+                if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
+                    osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
+                    return NULL;
+                }
+
             } else {
 
                 // connection timed out; clear the cached recipient
                 if (one_msg->status_code == OSRF_STATUS_TIMEOUT) {
+
+                    if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
+                        osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
+                        return NULL;
+                    }
+
                     clear_cached_recipient(tmsg->thread);
 
+                    if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
+                        osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
+                        return NULL;
+                    }
+
                 } else {
                     if (one_msg->status_code == OSRF_STATUS_COMPLETE)
                         requests_in_flight--;
@@ -339,11 +361,6 @@ void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *dat
     transport_message *tmsg;
     while (1) {
 
-        if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
-            osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
-            return NULL;
-        }
-
         // wait indefinitely for a response
         tmsg = client_recv(osrf_handle, -1);
 
@@ -361,17 +378,15 @@ void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *dat
             osrfLogWarning(OSRF_LOG_MARK, 
                 "WS: Jabber socket disconnected. Sending close() to client");
 
-            trans->server->close(trans->server);
+            if (trans->client_connected) {
+                trans->client_connected = 0;
+                trans->server->close(trans->server);
+            }
+
             return NULL; // exit thread
         }
 
         if (trans->client_connected) {
-
-            if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
-                osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
-                return NULL;
-            }
-
             osrfLogForceXid(tmsg->osrf_xid);
             osrf_responder_thread_main_body(tmsg);
             last_activity_time = time(NULL);
@@ -418,22 +433,12 @@ void* APR_THREAD_FUNC osrf_idle_timeout_thread_main(
 
     while (1) {
 
-        if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
-            osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
-            return NULL;
-        }
-
         // note: receiving a signal (e.g. SIGUSR1) will not interrupt
         // this sleep(), since it's running within its own thread.
         // During graceful shtudown, we may wait up to 
         // idle_check_interval seconds before initiating shutdown.
         sleep(sleep_time);
 
-        if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
-            osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
-            return NULL;
-        }
-
         // no client is connected.  reset sleep time go back to sleep.
         if (!trans->client_connected) {
             sleep_time = idle_check_interval;
@@ -503,7 +508,11 @@ void* APR_THREAD_FUNC osrf_idle_timeout_thread_main(
         // send a disconnect to the client, which will come back around
         // to cause our on_disconnect_handler to run.
         osrfLogDebug(OSRF_LOG_MARK, "WS: sending close() to client");
-        trans->server->close(trans->server);
+
+        if (trans->client_connected) {
+            trans->client_connected = 0;
+            trans->server->close(trans->server);
+        }
 
         // client will be going away, reset sleep time
         sleep_time = idle_check_interval;
@@ -949,7 +958,11 @@ static size_t CALLBACK on_message_handler(void *data,
         // will run, clean it all up, and kill the process.
         osrfLogError(OSRF_LOG_MARK,
             "Error relaying message, forcing client disconnect");
-        trans->server->close(trans->server);
+
+        if (trans->client_connected) {
+            trans->client_connected = 0;
+            trans->server->close(trans->server);
+        }
     }
 
     return stat;