From: Bill Erickson Date: Tue, 4 Dec 2012 22:34:41 +0000 (-0500) Subject: LP#1268619: websocket : wrap all thread work in mutex X-Git-Url: https://old-git.evergreen-ils.org/?a=commitdiff_plain;h=a1603e88b812f2c7c5e559816250833a61132e0c;p=working%2FOpenSRF.git LP#1268619: websocket : wrap all thread work in mutex Signed-off-by: Bill Erickson --- diff --git a/src/gateway/osrf_websocket_translator.c b/src/gateway/osrf_websocket_translator.c index 2cb66c6..4ce4db2 100644 --- a/src/gateway/osrf_websocket_translator.c +++ b/src/gateway/osrf_websocket_translator.c @@ -75,6 +75,7 @@ typedef struct _osrfWebsocketTranslator { apr_pool_t *session_pool; // child of trans->main_pool; per-session apr_hash_t *session_cache; apr_thread_t *responder_thread; + apr_thread_mutex_t *mutex; int client_connected; char* osrf_router; char* osrf_domain; @@ -85,101 +86,115 @@ static transport_client *osrf_handle = NULL; static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer -/** - * Responder thread main body. - * Collects responses from the opensrf network and relays them to the - * websocket caller. - */ -void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) { +void* osrf_responder_thread_main_body(transport_message *tmsg) { - transport_message *tmsg; jsonObject *msg_wrapper; osrfList *msg_list; osrfMessage *one_msg; char *msg_string; int i; - while (1) { - // TODO: RELEASE THREAD MUTEX - - tmsg = client_recv(osrf_handle, -1); - - if (!tmsg) continue; // early exit on interrupt + // discard responses received after client disconnect + if (!trans->client_connected) { + osrfLogDebug(OSRF_LOG_MARK, + "WS discarding response for thread=%s, xid=%s", + tmsg->thread, tmsg->osrf_xid); + return; + } - // TODO: THREAD MUTEX LOCK STARTS - - // discard responses received after client disconnect - if (!trans->client_connected) { - osrfLogDebug(OSRF_LOG_MARK, - "WS discarding response for thread=%s, xid=%s", - tmsg->thread, tmsg->osrf_xid); - message_free(tmsg); - continue; - } + osrfLogDebug(OSRF_LOG_MARK, + "WS received opensrf response for thread=%s, xid=%s", + tmsg->thread, tmsg->osrf_xid); + + // build the wrapper object + msg_wrapper = jsonNewObject(NULL); + jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread)); + jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid)); + jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body)); + + if (tmsg->is_error) { + fprintf(stderr, + "WS received jabber error message in response to thread=%s and xid=%s", + tmsg->thread, tmsg->osrf_xid); + fflush(stderr); + jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1)); + } - osrfLogDebug(OSRF_LOG_MARK, - "WS received opensrf response for thread=%s, xid=%s", - tmsg->thread, tmsg->osrf_xid); - - // build the wrapper object - msg_wrapper = jsonNewObject(NULL); - jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread)); - jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid)); - jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body)); - - if (tmsg->is_error) { - fprintf(stderr, - "WS received jabber error message in response to thread=%s and xid=%s", - tmsg->thread, tmsg->osrf_xid); - fflush(stderr); - jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1)); + msg_string = jsonObjectToJSONRaw(msg_wrapper); + + // deliver the wrapped message json to the websocket client + trans->server->send(trans->server, MESSAGE_TYPE_TEXT, + (unsigned char*) msg_string, strlen(msg_string)); + + msg_list = osrfMessageDeserialize(tmsg->body, NULL); + for (i = 0; i < msg_list->size; i++) { + one_msg = OSRF_LIST_GET_INDEX(msg_list, i); + + /* if our client just successfully connected to an opensrf service, + cache the sender so that future calls on this thread will use + the correct recipient. + TODO: this cache will grow to add one entry per connected client + session. Even when entries are removed, they are not cleaned up + until the session_pool is destroyed. We need to ensure that client + sessions don't last too long and/or create a last-touched timeout + mechanism to periodically remove old entries. */ + if (one_msg && one_msg->m_type == STATUS && + one_msg->status_code == OSRF_STATUS_OK) { + + if (!apr_hash_get(trans->session_cache, + tmsg->thread, APR_HASH_KEY_STRING)) { + + osrfLogDebug(OSRF_LOG_MARK, + "WS caching sender thread=%s, sender=%s", + tmsg->thread, tmsg->sender); + + apr_hash_set(trans->session_cache, + apr_pstrdup(trans->session_pool, tmsg->thread), + APR_HASH_KEY_STRING, + apr_pstrdup(trans->session_pool, tmsg->sender)); + } } + } - msg_string = jsonObjectToJSONRaw(msg_wrapper); + free(msg_string); + osrfListFree(msg_list); + jsonObjectFree(msg_wrapper); +} - // deliver the wrapped message json to the websocket client - trans->server->send(trans->server, MESSAGE_TYPE_TEXT, - (unsigned char*) msg_string, strlen(msg_string)); +/** + * Responder thread main body. + * Collects responses from the opensrf network and relays them to the + * websocket caller. + */ +void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) { - msg_list = osrfMessageDeserialize(tmsg->body, NULL); - for (i = 0; i < msg_list->size; i++) { - one_msg = OSRF_LIST_GET_INDEX(msg_list, i); + transport_message *tmsg; + while (1) { - /* if our client just successfully connected to an opensrf service, - cache the sender so that future calls on this thread will use - the correct recipient. - TODO: this cache will grow to add one entry per client session. - Even when entries are removed, they are not cleaned up until the - session_pool is destroyed. We need to ensure that client sessions - don't last too long and/or create a last-touched timeout mechanism - to periodically remove old entries. */ - if (one_msg && one_msg->m_type == STATUS && - one_msg->status_code == OSRF_STATUS_OK) { + if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) { + osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex"); + return NULL; + } - if (!apr_hash_get(trans->session_cache, - tmsg->thread, APR_HASH_KEY_STRING)) { + // wait for a response + tmsg = client_recv(osrf_handle, -1); - osrfLogDebug(OSRF_LOG_MARK, - "WS caching sender thread=%s, sender=%s", - tmsg->thread, tmsg->sender); + if (!tmsg) continue; // early exit on interrupt - apr_hash_set(trans->session_cache, - apr_pstrdup(trans->session_pool, tmsg->thread), - APR_HASH_KEY_STRING, - apr_pstrdup(trans->session_pool, tmsg->sender)); - } - } + if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) { + osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex"); + return NULL; } - free(msg_string); - osrfListFree(msg_list); - jsonObjectFree(msg_wrapper); + osrf_responder_thread_main_body(tmsg); message_free(tmsg); } return NULL; } + + /** * Allocate the session cache and create the responder thread */ @@ -188,6 +203,7 @@ int child_init(const WebSocketServer *server) { apr_pool_t *pool = NULL; apr_thread_t *thread = NULL; apr_threadattr_t *thread_attr = NULL; + apr_thread_mutex_t *mutex = NULL; request_rec *r = server->request(server); osrfLogDebug(OSRF_LOG_MARK, "WS child_init"); @@ -241,6 +257,15 @@ int child_init(const WebSocketServer *server) { return 1; } + if (apr_thread_mutex_create( + &mutex, APR_THREAD_MUTEX_UNNESTED, + trans->main_pool) != APR_SUCCESS) { + osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex"); + return 1; + } + + trans->mutex = mutex; + return APR_SUCCESS; } @@ -371,14 +396,10 @@ static char* extract_inbound_messages( return osrfMessageSerializeBatch(msg_list, num_msgs); } - - - - /** * Parse opensrf request and relay the request to the opensrf network. */ -static size_t CALLBACK on_message_handler(void *data, +static size_t on_message_handler_body(void *data, const WebSocketServer *server, const int type, unsigned char *buffer, const size_t buffer_size) { @@ -491,6 +512,25 @@ static size_t CALLBACK on_message_handler(void *data, return OK; } +static size_t CALLBACK on_message_handler(void *data, + const WebSocketServer *server, const int type, + unsigned char *buffer, const size_t buffer_size) { + + if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) { + osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex"); + return 1; // TODO: map to apr_status_t value? + } + + apr_status_t stat = on_message_handler_body(data, server, type, buffer, buffer_size); + + if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) { + osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex"); + return 1; + } + + return stat; +} + /** * Release all memory allocated from the translator pool and kill the pool.