LP#1268619: websocket : wrap all thread work in mutex
authorBill Erickson <berick@esilibrary.com>
Tue, 4 Dec 2012 22:34:41 +0000 (17:34 -0500)
committerBill Erickson <berick@esilibrary.com>
Sun, 4 May 2014 20:10:34 +0000 (16:10 -0400)
Signed-off-by: Bill Erickson <berick@esilibrary.com>
src/gateway/osrf_websocket_translator.c

index 2cb66c6..4ce4db2 100644 (file)
@@ -75,6 +75,7 @@ typedef struct _osrfWebsocketTranslator {
     apr_pool_t *session_pool; // child of trans->main_pool; per-session
     apr_hash_t *session_cache; 
     apr_thread_t *responder_thread;
+    apr_thread_mutex_t *mutex;
     int client_connected;
     char* osrf_router;
     char* osrf_domain;
@@ -85,101 +86,115 @@ static transport_client *osrf_handle = NULL;
 static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
 
 
-/**
- * Responder thread main body.
- * Collects responses from the opensrf network and relays them to the 
- * websocket caller.
- */
-void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
+void* osrf_responder_thread_main_body(transport_message *tmsg) {
 
-    transport_message *tmsg;
     jsonObject *msg_wrapper;
     osrfList *msg_list;
     osrfMessage *one_msg;
     char *msg_string;
     int i;
 
-    while (1) {
-        // TODO: RELEASE THREAD MUTEX
-
-        tmsg = client_recv(osrf_handle, -1);
-
-        if (!tmsg) continue; // early exit on interrupt
+    // discard responses received after client disconnect
+    if (!trans->client_connected) {
+        osrfLogDebug(OSRF_LOG_MARK, 
+            "WS discarding response for thread=%s, xid=%s", 
+            tmsg->thread, tmsg->osrf_xid);
+        return; 
+    }
 
-        // TODO: THREAD MUTEX LOCK STARTS
-        
-        // discard responses received after client disconnect
-        if (!trans->client_connected) {
-            osrfLogDebug(OSRF_LOG_MARK, 
-                "WS discarding response for thread=%s, xid=%s", 
-                tmsg->thread, tmsg->osrf_xid);
-            message_free(tmsg);                                                         
-            continue; 
-        }
+    osrfLogDebug(OSRF_LOG_MARK, 
+        "WS received opensrf response for thread=%s, xid=%s", 
+            tmsg->thread, tmsg->osrf_xid);
+
+    // build the wrapper object
+    msg_wrapper = jsonNewObject(NULL);
+    jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
+    jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
+    jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
+
+    if (tmsg->is_error) {
+        fprintf(stderr,  
+            "WS received jabber error message in response to thread=%s and xid=%s", 
+            tmsg->thread, tmsg->osrf_xid);
+        fflush(stderr);
+        jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
+    }
 
-        osrfLogDebug(OSRF_LOG_MARK, 
-            "WS received opensrf response for thread=%s, xid=%s", 
-                tmsg->thread, tmsg->osrf_xid);
-
-        // build the wrapper object
-        msg_wrapper = jsonNewObject(NULL);
-        jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
-        jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
-        jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
-
-        if (tmsg->is_error) {
-            fprintf(stderr,  
-                "WS received jabber error message in response to thread=%s and xid=%s", 
-                tmsg->thread, tmsg->osrf_xid);
-            fflush(stderr);
-            jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
+    msg_string = jsonObjectToJSONRaw(msg_wrapper);
+
+    // deliver the wrapped message json to the websocket client
+    trans->server->send(trans->server, MESSAGE_TYPE_TEXT, 
+        (unsigned char*) msg_string, strlen(msg_string));
+
+    msg_list = osrfMessageDeserialize(tmsg->body, NULL);
+    for (i = 0; i < msg_list->size; i++) {
+        one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
+
+        /*  if our client just successfully connected to an opensrf service,
+            cache the sender so that future calls on this thread will use
+            the correct recipient.  
+            TODO: this cache will grow to add one entry per connected client 
+            session.  Even when entries are removed, they are not cleaned up 
+            until the session_pool is destroyed.  We need to ensure that client 
+            sessions don't last too long and/or create a last-touched timeout 
+            mechanism to periodically remove old entries. */
+        if (one_msg && one_msg->m_type == STATUS && 
+                one_msg->status_code == OSRF_STATUS_OK) {
+
+            if (!apr_hash_get(trans->session_cache, 
+                    tmsg->thread, APR_HASH_KEY_STRING)) {
+
+                osrfLogDebug(OSRF_LOG_MARK, 
+                    "WS caching sender thread=%s, sender=%s", 
+                    tmsg->thread, tmsg->sender);
+
+                apr_hash_set(trans->session_cache, 
+                    apr_pstrdup(trans->session_pool, tmsg->thread),
+                    APR_HASH_KEY_STRING, 
+                    apr_pstrdup(trans->session_pool, tmsg->sender));
+            }
         }
+    }
 
-        msg_string = jsonObjectToJSONRaw(msg_wrapper);
+    free(msg_string);
+    osrfListFree(msg_list);
+    jsonObjectFree(msg_wrapper);
+}
 
-        // deliver the wrapped message json to the websocket client
-        trans->server->send(trans->server, MESSAGE_TYPE_TEXT, 
-            (unsigned char*) msg_string, strlen(msg_string));
+/**
+ * Responder thread main body.
+ * Collects responses from the opensrf network and relays them to the 
+ * websocket caller.
+ */
+void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
 
-        msg_list = osrfMessageDeserialize(tmsg->body, NULL);
-        for (i = 0; i < msg_list->size; i++) {
-            one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
+    transport_message *tmsg;
+    while (1) {
 
-            /*  if our client just successfully connected to an opensrf service,
-                cache the sender so that future calls on this thread will use
-                the correct recipient.  
-                TODO: this cache will grow to add one entry per client session.  
-                Even when entries are removed, they are not cleaned up until the
-                session_pool is destroyed.  We need to ensure that client sessions
-                don't last too long and/or create a last-touched timeout mechanism 
-                to periodically remove old entries. */
-            if (one_msg && one_msg->m_type == STATUS && 
-                    one_msg->status_code == OSRF_STATUS_OK) {
+        if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
+            osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
+            return NULL;
+        }
 
-                if (!apr_hash_get(trans->session_cache, 
-                        tmsg->thread, APR_HASH_KEY_STRING)) {
+        // wait for a response
+        tmsg = client_recv(osrf_handle, -1);
 
-                    osrfLogDebug(OSRF_LOG_MARK, 
-                        "WS caching sender thread=%s, sender=%s", 
-                        tmsg->thread, tmsg->sender);
+        if (!tmsg) continue; // early exit on interrupt
 
-                    apr_hash_set(trans->session_cache, 
-                        apr_pstrdup(trans->session_pool, tmsg->thread),
-                        APR_HASH_KEY_STRING, 
-                        apr_pstrdup(trans->session_pool, tmsg->sender));
-                }
-            }
+        if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
+            osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
+            return NULL;
         }
 
-        free(msg_string);
-        osrfListFree(msg_list);
-        jsonObjectFree(msg_wrapper);
+        osrf_responder_thread_main_body(tmsg);
         message_free(tmsg);                                                         
     }
 
     return NULL;
 }
 
+
+
 /**
  * Allocate the session cache and create the responder thread
  */
@@ -188,6 +203,7 @@ int child_init(const WebSocketServer *server) {
     apr_pool_t *pool = NULL;                                                
     apr_thread_t *thread = NULL;
     apr_threadattr_t *thread_attr = NULL;
+    apr_thread_mutex_t *mutex = NULL;
     request_rec *r = server->request(server);
         
     osrfLogDebug(OSRF_LOG_MARK, "WS child_init");
@@ -241,6 +257,15 @@ int child_init(const WebSocketServer *server) {
         return 1;
     }
 
+    if (apr_thread_mutex_create(
+            &mutex, APR_THREAD_MUTEX_UNNESTED, 
+            trans->main_pool) != APR_SUCCESS) {
+        osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex");
+        return 1;
+    }
+
+    trans->mutex = mutex;
+
     return APR_SUCCESS;
 }
 
@@ -371,14 +396,10 @@ static char* extract_inbound_messages(
     return osrfMessageSerializeBatch(msg_list, num_msgs);
 }
 
-
-
-
-
 /**
  * Parse opensrf request and relay the request to the opensrf network.
  */
-static size_t CALLBACK on_message_handler(void *data,
+static size_t on_message_handler_body(void *data,
                 const WebSocketServer *server, const int type, 
                 unsigned char *buffer, const size_t buffer_size) {
 
@@ -491,6 +512,25 @@ static size_t CALLBACK on_message_handler(void *data,
     return OK;
 }
 
+static size_t CALLBACK on_message_handler(void *data,
+                const WebSocketServer *server, const int type, 
+                unsigned char *buffer, const size_t buffer_size) {
+
+    if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
+        osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
+        return 1; // TODO: map to apr_status_t value?
+    }
+
+    apr_status_t stat = on_message_handler_body(data, server, type, buffer, buffer_size);
+
+    if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
+        osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
+        return 1;
+    }
+
+    return stat;
+}
+
 
 /**
  * Release all memory allocated from the translator pool and kill the pool.