From: Bill Erickson Date: Tue, 11 Mar 2014 21:25:19 +0000 (-0400) Subject: LP#1268619: websockets: gateway code repairs X-Git-Url: https://old-git.evergreen-ils.org/?a=commitdiff_plain;h=f478e6d4f06df6da312d6776ee1d6a30d9a157a8;p=working%2FOpenSRF.git LP#1268619: websockets: gateway code repairs * avoid unneccessary and wrong incantation of apr_thread_exit. The two sub-threads now both live for the duration of the process. * to be safe, create thread mutex before threads Signed-off-by: Bill Erickson --- diff --git a/src/gateway/osrf_websocket_translator.c b/src/gateway/osrf_websocket_translator.c index 5b9d607..be643e5 100644 --- a/src/gateway/osrf_websocket_translator.c +++ b/src/gateway/osrf_websocket_translator.c @@ -181,6 +181,7 @@ static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer static void clear_cached_recipient(const char* thread) { apr_pool_t *pool = NULL; + request_rec *r = trans->server->request(trans->server); if (apr_hash_get(trans->session_cache, thread, APR_HASH_KEY_STRING)) { @@ -199,7 +200,7 @@ static void clear_cached_recipient(const char* thread) { // re-create it for future caching. apr_pool_destroy(trans->session_pool); - if (apr_pool_create(&pool, NULL) != APR_SUCCESS) { + if (apr_pool_create(&pool, r->pool) != APR_SUCCESS) { osrfLogError(OSRF_LOG_MARK, "WS Unable to create session_pool"); trans->session_pool = NULL; return; @@ -303,6 +304,45 @@ void* osrf_responder_thread_main_body(transport_message *tmsg) { } /** + * Responder thread main body. + * Collects responses from the opensrf network and relays them to the + * websocket caller. + */ +void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) { + + transport_message *tmsg; + while (1) { + + if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) { + osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex"); + return NULL; + } + + // wait for a response + tmsg = client_recv(osrf_handle, -1); + + if (!tmsg) continue; // interrupt + + if (trans->client_connected) { + + if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) { + osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex"); + return NULL; + } + + osrfLogForceXid(tmsg->osrf_xid); + osrf_responder_thread_main_body(tmsg); + last_activity_time = time(NULL); + } + + message_free(tmsg); + } + + return NULL; +} + + +/** * Sleep and regularly wake to see if the process has been idle for too * long. If so, send a disconnect to the client. * @@ -327,7 +367,7 @@ void* APR_THREAD_FUNC osrf_idle_timeout_thread_main( // During graceful shtudown, we may wait up to // idle_check_interval seconds before initiating shutdown. sleep(sleep_time); - + if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) { osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex"); return NULL; @@ -412,41 +452,6 @@ void* APR_THREAD_FUNC osrf_idle_timeout_thread_main( return NULL; } -/** - * Responder thread main body. - * Collects responses from the opensrf network and relays them to the - * websocket caller. - */ -void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) { - - transport_message *tmsg; - while (1) { - - if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) { - osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex"); - return NULL; - } - - // wait for a response - tmsg = client_recv(osrf_handle, -1); - - if (!tmsg) continue; // early exit on interrupt - - if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) { - osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex"); - return NULL; - } - - osrfLogForceXid(tmsg->osrf_xid); - osrf_responder_thread_main_body(tmsg); - message_free(tmsg); - last_activity_time = time(NULL); - } - - return NULL; -} - - /** * Connect to OpenSRF, create the main pool, responder thread @@ -460,7 +465,6 @@ int child_init(const WebSocketServer *server) { apr_thread_mutex_t *mutex = NULL; request_rec *r = server->request(server); - // osrf_handle will already be connected if this is not the first request // served by this process. if ( !(osrf_handle = osrfSystemGetTransportClient()) ) { @@ -524,7 +528,9 @@ int child_init(const WebSocketServer *server) { osrf_handle = osrfSystemGetTransportClient(); } - // create a standalone pool for our translator data + // create a pool for our translator data + // Do not use r->pool as the parent, since r->pool will be freed + // when this client disconnects. if (apr_pool_create(&pool, NULL) != APR_SUCCESS) { osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool"); return 1; @@ -545,12 +551,19 @@ int child_init(const WebSocketServer *server) { trans->osrf_domain = osrfConfigGetValue(NULL, "/domain"); trans->session_cache = apr_hash_make(pool); - if (trans->session_cache == NULL) { osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache"); return 1; } + if (apr_thread_mutex_create( + &mutex, APR_THREAD_MUTEX_UNNESTED, + trans->main_pool) != APR_SUCCESS) { + osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex"); + return 1; + } + trans->mutex = mutex; + // Create the responder thread. Once created, // it runs for the lifetime of this process. if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) && @@ -582,18 +595,7 @@ int child_init(const WebSocketServer *server) { return 1; } - - if (apr_thread_mutex_create( - &mutex, APR_THREAD_MUTEX_UNNESTED, - trans->main_pool) != APR_SUCCESS) { - osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex"); - return 1; - } - - trans->mutex = mutex; - signal(SIGUSR1, sigusr1_handler); - return APR_SUCCESS; } @@ -616,9 +618,11 @@ void* CALLBACK on_connect_handler(const WebSocketServer *server) { } } - // create a standalone pool for the session cache values + // create a pool for the session cache values // this pool will be destroyed and re-created regularly // to clear session memory + // Use r->pool as the parent since this pool only lives to serve + // a single connection. if (apr_pool_create(&pool, r->pool) != APR_SUCCESS) { osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool"); return NULL; @@ -867,13 +871,11 @@ static size_t CALLBACK on_message_handler(void *data, void CALLBACK on_disconnect_handler( void *data, const WebSocketServer *server) { - osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data; + request_rec *r = server->request(server); + osrfLogInfo(OSRF_LOG_MARK, "WS disconnect from %s", get_client_ip(r)); + trans->client_connected = 0; - // timeout thread is recreated w/ each new connection - apr_thread_exit(trans->idle_timeout_thread, APR_SUCCESS); - trans->idle_timeout_thread = NULL; - // ensure no errant session data is sticking around apr_hash_clear(trans->session_cache); @@ -882,31 +884,12 @@ void CALLBACK on_disconnect_handler( // destroy it ourselves. apr_pool_destroy(trans->session_pool); trans->session_pool = NULL; - - request_rec *r = server->request(server); - - osrfLogInfo(OSRF_LOG_MARK, "WS disconnect from %s", get_client_ip(r)); -} - -/** - * Be nice and clean up our mess - */ -void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) { - if (trans) { - apr_thread_exit(trans->responder_thread, APR_SUCCESS); - apr_thread_mutex_destroy(trans->mutex); - if (trans->session_pool) - apr_pool_destroy(trans->session_pool); - apr_pool_destroy(trans->main_pool); - } - - trans = NULL; } static WebSocketPlugin osrf_websocket_plugin = { sizeof(WebSocketPlugin), WEBSOCKET_PLUGIN_VERSION_0, - on_destroy_handler, + NULL, // on_destroy_handler on_connect_handler, on_message_handler, on_disconnect_handler