From 78ad637486ec0992f083dafc4efcc4e79e5e6ceb Mon Sep 17 00:00:00 2001 From: Bill Erickson Date: Mon, 10 Dec 2012 11:47:43 -0500 Subject: [PATCH] LP#1268619: websocket translator : session memory goodness Signed-off-by: Bill Erickson --- src/gateway/osrf_websocket_translator.c | 169 ++++++++++++++++++++------------ 1 file changed, 105 insertions(+), 64 deletions(-) diff --git a/src/gateway/osrf_websocket_translator.c b/src/gateway/osrf_websocket_translator.c index fa54821..35f986d 100644 --- a/src/gateway/osrf_websocket_translator.c +++ b/src/gateway/osrf_websocket_translator.c @@ -86,25 +86,108 @@ static transport_client *osrf_handle = NULL; static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer +static void clear_cached_recipient(const char* thread) { + apr_pool_t *pool = NULL; + + if (apr_hash_get(trans->session_cache, thread, APR_HASH_KEY_STRING)) { + + osrfLogDebug(OSRF_LOG_MARK, "WS removing cached recipient on disconnect"); + + // remove it from the hash + apr_hash_set(trans->session_cache, thread, APR_HASH_KEY_STRING, NULL); + + if (apr_hash_count(trans->session_cache) == 0) { + osrfLogDebug(OSRF_LOG_MARK, "WS re-setting session_pool"); + + // memory accumulates in the session_pool as sessions are cached then + // un-cached. Un-caching removes strings from the hash, but not the + // pool itself. That only happens when the pool is destroyed. destroy + // the session pool to clear any lingering memory + apr_pool_destroy(trans->session_pool); + + // create a standalone pool for our translator data + if (apr_pool_create(&pool, NULL) != APR_SUCCESS) { + osrfLogError(OSRF_LOG_MARK, "WS Unable to create session_pool"); + trans->session_pool = NULL; + return; + } + + trans->session_pool = pool; + } + } +} + + + void* osrf_responder_thread_main_body(transport_message *tmsg) { - jsonObject *msg_wrapper; - osrfList *msg_list; - osrfMessage *one_msg; - char *msg_string; + osrfList *msg_list = NULL; + osrfMessage *one_msg = NULL; int i; - // discard responses received after client disconnect + osrfLogDebug(OSRF_LOG_MARK, + "WS received opensrf response for thread=%s, xid=%s", + tmsg->thread, tmsg->osrf_xid); + + // first we need to perform some maintenance + msg_list = osrfMessageDeserialize(tmsg->body, NULL); + + for (i = 0; i < msg_list->size; i++) { + one_msg = OSRF_LIST_GET_INDEX(msg_list, i); + + osrfLogDebug(OSRF_LOG_MARK, + "WS returned response of type %d", one_msg->m_type); + + /* if our client just successfully connected to an opensrf service, + cache the sender so that future calls on this thread will use + the correct recipient. */ + if (one_msg && one_msg->m_type == STATUS) { + + + // only cache recipients if the client is still connected + if (trans->client_connected && + one_msg->status_code == OSRF_STATUS_OK) { + + if (!apr_hash_get(trans->session_cache, + tmsg->thread, APR_HASH_KEY_STRING)) { + + osrfLogDebug(OSRF_LOG_MARK, + "WS caching sender thread=%s, sender=%s", + tmsg->thread, tmsg->sender); + + apr_hash_set(trans->session_cache, + apr_pstrdup(trans->session_pool, tmsg->thread), + APR_HASH_KEY_STRING, + apr_pstrdup(trans->session_pool, tmsg->sender)); + } + + } else { + + // connection timed out; clear the cached recipient + // regardless of whether the client is still connected + if (one_msg->status_code == OSRF_STATUS_TIMEOUT) + clear_cached_recipient(tmsg->thread); + } + } + } + + // maintenance is done + osrfListFree(msg_list); + if (!trans->client_connected) { + // responses received after client disconnect are discarded + osrfLogDebug(OSRF_LOG_MARK, "WS discarding response for thread=%s, xid=%s", tmsg->thread, tmsg->osrf_xid); - return; + + return; } - osrfLogDebug(OSRF_LOG_MARK, - "WS received opensrf response for thread=%s, xid=%s", - tmsg->thread, tmsg->osrf_xid); + + // client is still connected; relay the messages to the client + jsonObject *msg_wrapper = NULL; + char *msg_string = NULL; // build the wrapper object msg_wrapper = jsonNewObject(NULL); @@ -126,39 +209,9 @@ void* osrf_responder_thread_main_body(transport_message *tmsg) { trans->server->send(trans->server, MESSAGE_TYPE_TEXT, (unsigned char*) msg_string, strlen(msg_string)); - msg_list = osrfMessageDeserialize(tmsg->body, NULL); - for (i = 0; i < msg_list->size; i++) { - one_msg = OSRF_LIST_GET_INDEX(msg_list, i); - - /* if our client just successfully connected to an opensrf service, - cache the sender so that future calls on this thread will use - the correct recipient. - TODO: this cache will grow to add one entry per connected client - session. Even when entries are removed, they are not cleaned up - until the session_pool is destroyed. We need to ensure that client - sessions don't last too long and/or create a last-touched timeout - mechanism to periodically remove old entries. */ - if (one_msg && one_msg->m_type == STATUS && - one_msg->status_code == OSRF_STATUS_OK) { - - if (!apr_hash_get(trans->session_cache, - tmsg->thread, APR_HASH_KEY_STRING)) { - - osrfLogDebug(OSRF_LOG_MARK, - "WS caching sender thread=%s, sender=%s", - tmsg->thread, tmsg->sender); - - apr_hash_set(trans->session_cache, - apr_pstrdup(trans->session_pool, tmsg->thread), - APR_HASH_KEY_STRING, - apr_pstrdup(trans->session_pool, tmsg->sender)); - } - } - } - free(msg_string); - osrfListFree(msg_list); jsonObjectFree(msg_wrapper); + } /** @@ -243,6 +296,13 @@ int child_init(const WebSocketServer *server) { trans->osrf_router = osrfConfigGetValue(NULL, "/router_name"); trans->osrf_domain = osrfConfigGetValue(NULL, "/domain"); + trans->session_cache = apr_hash_make(pool); + + if (trans->session_cache == NULL) { + osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache"); + return 1; + } + // Create the responder thread. Once created, // it runs for the lifetime of this process. if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) && @@ -286,22 +346,15 @@ void* CALLBACK on_connect_handler(const WebSocketServer *server) { } } - // create a standalone pool for the session cache values, which will be - // destroyed on client disconnect. - //if (apr_pool_create(&pool, trans->main_pool) != APR_SUCCESS) { + // create a standalone pool for the session cache values + // this pool will be destroyed and re-created regularly to + // clear session memory if (apr_pool_create(&pool, r->pool) != APR_SUCCESS) { osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool"); return NULL; } trans->session_pool = pool; - trans->session_cache = apr_hash_make(trans->session_pool); - - if (trans->session_cache == NULL) { - osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache"); - return NULL; - } - trans->client_connected = 1; return trans; } @@ -379,17 +432,8 @@ static char* extract_inbound_messages( } case DISCONNECT: - - if (apr_hash_get(trans->session_cache, thread, APR_HASH_KEY_STRING)) { - - // this clears the hash value, but the key/value memory - // will not be cleared until their pool is cleared. - apr_hash_set(trans->session_cache, - apr_pstrdup(trans->session_pool, thread), - APR_HASH_KEY_STRING, - apr_pstrdup(trans->session_pool, recipient)); - } - + clear_cached_recipient(thread); + break; } } @@ -541,8 +585,6 @@ void CALLBACK on_disconnect_handler( osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data; trans->client_connected = 0; - apr_hash_clear(trans->session_cache); - /* It's not necessary to destroy our session_pool, since it's a child of the apache request_rec pool, which is @@ -551,7 +593,6 @@ void CALLBACK on_disconnect_handler( */ trans->session_pool = NULL; - trans->session_cache = NULL; request_rec *r = server->request(server); osrfLogDebug(OSRF_LOG_MARK, -- 2.11.0