LP#1268619: websocket translator : session memory goodness
authorBill Erickson <berick@esilibrary.com>
Mon, 10 Dec 2012 16:47:43 +0000 (11:47 -0500)
committerBill Erickson <berick@esilibrary.com>
Sun, 4 May 2014 20:10:34 +0000 (16:10 -0400)
Signed-off-by: Bill Erickson <berick@esilibrary.com>
src/gateway/osrf_websocket_translator.c

index fa54821..35f986d 100644 (file)
@@ -86,25 +86,108 @@ static transport_client *osrf_handle = NULL;
 static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
 
 
+static void clear_cached_recipient(const char* thread) {
+    apr_pool_t *pool = NULL; // receives the replacement session pool, if one is created
+    // Drop the cached recipient for `thread`; does nothing when no entry is cached for it.
+    if (apr_hash_get(trans->session_cache, thread, APR_HASH_KEY_STRING)) {
+
+        osrfLogDebug(OSRF_LOG_MARK, "WS removing cached recipient on disconnect");
+        // NOTE(review): also called for OSRF_STATUS_TIMEOUT responses, not only on DISCONNECT.
+        // setting the value to NULL removes the entry from the hash
+        apr_hash_set(trans->session_cache, thread, APR_HASH_KEY_STRING, NULL);
+
+        if (apr_hash_count(trans->session_cache) == 0) {
+            osrfLogDebug(OSRF_LOG_MARK, "WS re-setting session_pool");
+
+            // Memory accumulates in the session_pool as sessions are cached and
+            // then un-cached: un-caching removes the entry from the hash, but the
+            // duplicated strings stay in the pool until the pool is destroyed.
+            // The cache is now empty, so destroy the pool to release that memory.
+            apr_pool_destroy(trans->session_pool);
+            // NOTE(review): session_pool is set to NULL elsewhere in this file; confirm it cannot be NULL here.
+            // create a standalone (parent == NULL) replacement pool for our translator data
+            if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
+                osrfLogError(OSRF_LOG_MARK, "WS Unable to create session_pool");
+                trans->session_pool = NULL;
+                return;
+            }
+
+            trans->session_pool = pool;
+        }
+    }
+}
+
+
+
 void* osrf_responder_thread_main_body(transport_message *tmsg) {
 
-    jsonObject *msg_wrapper;
-    osrfList *msg_list;
-    osrfMessage *one_msg;
-    char *msg_string;
+    osrfList *msg_list = NULL;
+    osrfMessage *one_msg = NULL;
     int i;
 
-    // discard responses received after client disconnect
+    osrfLogDebug(OSRF_LOG_MARK, 
+        "WS received opensrf response for thread=%s, xid=%s", 
+            tmsg->thread, tmsg->osrf_xid);
+
+    // first we need to perform some maintenance
+    msg_list = osrfMessageDeserialize(tmsg->body, NULL);
+
+    for (i = 0; i < msg_list->size; i++) {
+        one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
+
+        osrfLogDebug(OSRF_LOG_MARK, 
+            "WS returned response of type %d", one_msg->m_type);
+
+        /*  if our client just successfully connected to an opensrf service,
+            cache the sender so that future calls on this thread will use
+            the correct recipient. */
+        if (one_msg && one_msg->m_type == STATUS) {
+
+
+            // only cache recipients if the client is still connected
+            if (trans->client_connected && 
+                    one_msg->status_code == OSRF_STATUS_OK) {
+
+                if (!apr_hash_get(trans->session_cache, 
+                        tmsg->thread, APR_HASH_KEY_STRING)) {
+
+                    osrfLogDebug(OSRF_LOG_MARK, 
+                        "WS caching sender thread=%s, sender=%s", 
+                        tmsg->thread, tmsg->sender);
+
+                    apr_hash_set(trans->session_cache, 
+                        apr_pstrdup(trans->session_pool, tmsg->thread),
+                        APR_HASH_KEY_STRING, 
+                        apr_pstrdup(trans->session_pool, tmsg->sender));
+                }
+
+            } else {
+
+                // connection timed out; clear the cached recipient
+                // regardless of whether the client is still connected
+                if (one_msg->status_code == OSRF_STATUS_TIMEOUT)
+                    clear_cached_recipient(tmsg->thread);
+            }
+        }
+    }
+
+    // maintenance is done
+    osrfListFree(msg_list);
+
     if (!trans->client_connected) {
+        // responses received after client disconnect are discarded
+
         osrfLogDebug(OSRF_LOG_MARK, 
             "WS discarding response for thread=%s, xid=%s", 
             tmsg->thread, tmsg->osrf_xid);
-        return; 
+
+        return;
     }
 
-    osrfLogDebug(OSRF_LOG_MARK, 
-        "WS received opensrf response for thread=%s, xid=%s", 
-            tmsg->thread, tmsg->osrf_xid);
+    
+    // client is still connected; relay the messages to the client
+    jsonObject *msg_wrapper = NULL;
+    char *msg_string = NULL;
 
     // build the wrapper object
     msg_wrapper = jsonNewObject(NULL);
@@ -126,39 +209,9 @@ void* osrf_responder_thread_main_body(transport_message *tmsg) {
     trans->server->send(trans->server, MESSAGE_TYPE_TEXT, 
         (unsigned char*) msg_string, strlen(msg_string));
 
-    msg_list = osrfMessageDeserialize(tmsg->body, NULL);
-    for (i = 0; i < msg_list->size; i++) {
-        one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
-
-        /*  if our client just successfully connected to an opensrf service,
-            cache the sender so that future calls on this thread will use
-            the correct recipient.  
-            TODO: this cache will grow to add one entry per connected client 
-            session.  Even when entries are removed, they are not cleaned up 
-            until the session_pool is destroyed.  We need to ensure that client 
-            sessions don't last too long and/or create a last-touched timeout 
-            mechanism to periodically remove old entries. */
-        if (one_msg && one_msg->m_type == STATUS && 
-                one_msg->status_code == OSRF_STATUS_OK) {
-
-            if (!apr_hash_get(trans->session_cache, 
-                    tmsg->thread, APR_HASH_KEY_STRING)) {
-
-                osrfLogDebug(OSRF_LOG_MARK, 
-                    "WS caching sender thread=%s, sender=%s", 
-                    tmsg->thread, tmsg->sender);
-
-                apr_hash_set(trans->session_cache, 
-                    apr_pstrdup(trans->session_pool, tmsg->thread),
-                    APR_HASH_KEY_STRING, 
-                    apr_pstrdup(trans->session_pool, tmsg->sender));
-            }
-        }
-    }
-
     free(msg_string);
-    osrfListFree(msg_list);
     jsonObjectFree(msg_wrapper);
+
 }
 
 /**
@@ -243,6 +296,13 @@ int child_init(const WebSocketServer *server) {
     trans->osrf_router = osrfConfigGetValue(NULL, "/router_name");                      
     trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
 
+    trans->session_cache = apr_hash_make(pool);
+
+    if (trans->session_cache == NULL) {
+        osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache");
+        return 1;
+    }
+
     // Create the responder thread.  Once created, 
     // it runs for the lifetime of this process.
     if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
@@ -286,22 +346,15 @@ void* CALLBACK on_connect_handler(const WebSocketServer *server) {
         }
     }
 
-    // create a standalone pool for the session cache values, which will be
-    // destroyed on client disconnect.
-    //if (apr_pool_create(&pool, trans->main_pool) != APR_SUCCESS) {
+    // create a standalone pool for the session cache values
+    // this pool will be destroyed and re-created regularly to 
+    // clear session memory
     if (apr_pool_create(&pool, r->pool) != APR_SUCCESS) {
         osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
         return NULL;
     }
 
     trans->session_pool = pool;
-    trans->session_cache = apr_hash_make(trans->session_pool);
-
-    if (trans->session_cache == NULL) {
-        osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache");
-        return NULL;
-    }
-
     trans->client_connected = 1;
     return trans;
 }
@@ -379,17 +432,8 @@ static char* extract_inbound_messages(
             }
 
             case DISCONNECT:
-
-                if (apr_hash_get(trans->session_cache, thread, APR_HASH_KEY_STRING)) {
-
-                    // this clears the hash value, but the key/value memory 
-                    // will not be cleared until their pool is cleared.
-                    apr_hash_set(trans->session_cache, 
-                        apr_pstrdup(trans->session_pool, thread),
-                        APR_HASH_KEY_STRING, 
-                        apr_pstrdup(trans->session_pool, recipient));
-                }
-
+                clear_cached_recipient(thread);
+                break;
         }
     }
 
@@ -541,8 +585,6 @@ void CALLBACK on_disconnect_handler(
     osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data;
     trans->client_connected = 0;
 
-    apr_hash_clear(trans->session_cache);
-
     /*
     It's not necessary to destroy our session_pool, since
     it's a child of the apache request_rec pool, which is 
@@ -551,7 +593,6 @@ void CALLBACK on_disconnect_handler(
     */
     
     trans->session_pool = NULL;
-    trans->session_cache = NULL;
 
     request_rec *r = server->request(server);
     osrfLogDebug(OSRF_LOG_MARK,