LP#1268619: websockets: gateway code repairs
authorBill Erickson <berick@esilibrary.com>
Tue, 11 Mar 2014 21:25:19 +0000 (17:25 -0400)
committerBill Erickson <berick@esilibrary.com>
Sun, 4 May 2014 20:10:36 +0000 (16:10 -0400)
* avoid unneccessary and wrong incantation of apr_thread_exit.  The two
  sub-threads now both live for the duration of the process.

* to be safe, create thread mutex before threads

Signed-off-by: Bill Erickson <berick@esilibrary.com>
src/gateway/osrf_websocket_translator.c

index 5b9d607..be643e5 100644 (file)
@@ -181,6 +181,7 @@ static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
 
 static void clear_cached_recipient(const char* thread) {
     apr_pool_t *pool = NULL;                                                
+    request_rec *r = trans->server->request(trans->server);
 
     if (apr_hash_get(trans->session_cache, thread, APR_HASH_KEY_STRING)) {
 
@@ -199,7 +200,7 @@ static void clear_cached_recipient(const char* thread) {
             // re-create it for future caching.
             apr_pool_destroy(trans->session_pool);
     
-            if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
+            if (apr_pool_create(&pool, r->pool) != APR_SUCCESS) {
                 osrfLogError(OSRF_LOG_MARK, "WS Unable to create session_pool");
                 trans->session_pool = NULL;
                 return;
@@ -303,6 +304,45 @@ void* osrf_responder_thread_main_body(transport_message *tmsg) {
 }
 
 /**
+ * Responder thread main body.
+ * Collects responses from the opensrf network and relays them to the 
+ * websocket caller.
+ */
+void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
+
+    transport_message *tmsg;
+    while (1) {
+
+        if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
+            osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
+            return NULL;
+        }
+
+        // wait for a response
+        tmsg = client_recv(osrf_handle, -1);
+
+        if (!tmsg) continue; // interrupt
+
+        if (trans->client_connected) {
+
+            if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
+                osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
+                return NULL;
+            }
+
+            osrfLogForceXid(tmsg->osrf_xid);
+            osrf_responder_thread_main_body(tmsg);
+            last_activity_time = time(NULL);
+        }
+
+        message_free(tmsg);                                                         
+    }
+
+    return NULL;
+}
+
+
+/**
  * Sleep and regularly wake to see if the process has been idle for too
  * long.  If so, send a disconnect to the client.
  *
@@ -327,7 +367,7 @@ void* APR_THREAD_FUNC osrf_idle_timeout_thread_main(
         // During graceful shtudown, we may wait up to 
         // idle_check_interval seconds before initiating shutdown.
         sleep(sleep_time);
-        
+
         if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
             osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
             return NULL;
@@ -412,41 +452,6 @@ void* APR_THREAD_FUNC osrf_idle_timeout_thread_main(
     return NULL;
 }
 
-/**
- * Responder thread main body.
- * Collects responses from the opensrf network and relays them to the 
- * websocket caller.
- */
-void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
-
-    transport_message *tmsg;
-    while (1) {
-
-        if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
-            osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
-            return NULL;
-        }
-
-        // wait for a response
-        tmsg = client_recv(osrf_handle, -1);
-
-        if (!tmsg) continue; // early exit on interrupt
-
-        if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
-            osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
-            return NULL;
-        }
-
-        osrfLogForceXid(tmsg->osrf_xid);
-        osrf_responder_thread_main_body(tmsg);
-        message_free(tmsg);                                                         
-        last_activity_time = time(NULL);
-    }
-
-    return NULL;
-}
-
-
 
 /**
  * Connect to OpenSRF, create the main pool, responder thread
@@ -460,7 +465,6 @@ int child_init(const WebSocketServer *server) {
     apr_thread_mutex_t *mutex = NULL;
     request_rec *r = server->request(server);
 
-
     // osrf_handle will already be connected if this is not the first request
     // served by this process.
     if ( !(osrf_handle = osrfSystemGetTransportClient()) ) {
@@ -524,7 +528,9 @@ int child_init(const WebSocketServer *server) {
         osrf_handle = osrfSystemGetTransportClient();
     }
 
-    // create a standalone pool for our translator data
+    // create a pool for our translator data
+    // Do not use r->pool as the parent, since r->pool will be freed
+    // when this client disconnects.
     if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
         osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
         return 1;
@@ -545,12 +551,19 @@ int child_init(const WebSocketServer *server) {
     trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
 
     trans->session_cache = apr_hash_make(pool);
-
     if (trans->session_cache == NULL) {
         osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache");
         return 1;
     }
 
+    if (apr_thread_mutex_create(
+            &mutex, APR_THREAD_MUTEX_UNNESTED, 
+            trans->main_pool) != APR_SUCCESS) {
+        osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex");
+        return 1;
+    }
+    trans->mutex = mutex;
+
     // Create the responder thread.  Once created, 
     // it runs for the lifetime of this process.
     if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
@@ -582,18 +595,7 @@ int child_init(const WebSocketServer *server) {
         return 1;
     }
 
-
-    if (apr_thread_mutex_create(
-            &mutex, APR_THREAD_MUTEX_UNNESTED, 
-            trans->main_pool) != APR_SUCCESS) {
-        osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex");
-        return 1;
-    }
-
-    trans->mutex = mutex;
-
     signal(SIGUSR1, sigusr1_handler);
-
     return APR_SUCCESS;
 }
 
@@ -616,9 +618,11 @@ void* CALLBACK on_connect_handler(const WebSocketServer *server) {
         }
     }
 
-    // create a standalone pool for the session cache values
+    // create a pool for the session cache values
     // this pool will be destroyed and re-created regularly
     // to clear session memory
+    // Use r->pool as the parent since this pool only lives to serve
+    // a single connection.
     if (apr_pool_create(&pool, r->pool) != APR_SUCCESS) {
         osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
         return NULL;
@@ -867,13 +871,11 @@ static size_t CALLBACK on_message_handler(void *data,
 void CALLBACK on_disconnect_handler(
     void *data, const WebSocketServer *server) {
 
-    osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data;
+    request_rec *r = server->request(server);
+    osrfLogInfo(OSRF_LOG_MARK, "WS disconnect from %s", get_client_ip(r)); 
+
     trans->client_connected = 0;
 
-    // timeout thread is recreated w/ each new connection
-    apr_thread_exit(trans->idle_timeout_thread, APR_SUCCESS);
-    trans->idle_timeout_thread = NULL;
-    
     // ensure no errant session data is sticking around
     apr_hash_clear(trans->session_cache);
 
@@ -882,31 +884,12 @@ void CALLBACK on_disconnect_handler(
     // destroy it ourselves.
     apr_pool_destroy(trans->session_pool);
     trans->session_pool = NULL;
-
-    request_rec *r = server->request(server);
-
-    osrfLogInfo(OSRF_LOG_MARK, "WS disconnect from %s", get_client_ip(r)); 
-}
-
-/**
- * Be nice and clean up our mess
- */
-void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) {
-    if (trans) {
-        apr_thread_exit(trans->responder_thread, APR_SUCCESS);
-        apr_thread_mutex_destroy(trans->mutex);
-        if (trans->session_pool)
-            apr_pool_destroy(trans->session_pool);
-        apr_pool_destroy(trans->main_pool);
-    }
-
-    trans = NULL;
 }
 
 static WebSocketPlugin osrf_websocket_plugin = {
     sizeof(WebSocketPlugin),
     WEBSOCKET_PLUGIN_VERSION_0,
-    on_destroy_handler,
+    NULL, // on_destroy_handler
     on_connect_handler,
     on_message_handler,
     on_disconnect_handler