static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
+static void clear_cached_recipient(const char* thread) {
+ apr_pool_t *pool = NULL;
+
+ if (apr_hash_get(trans->session_cache, thread, APR_HASH_KEY_STRING)) {
+
+ osrfLogDebug(OSRF_LOG_MARK, "WS removing cached recipient on disconnect");
+
+ // remove it from the hash
+ apr_hash_set(trans->session_cache, thread, APR_HASH_KEY_STRING, NULL);
+
+ if (apr_hash_count(trans->session_cache) == 0) {
+ osrfLogDebug(OSRF_LOG_MARK, "WS re-setting session_pool");
+
+ // memory accumulates in the session_pool as sessions are cached then
+ // un-cached. Un-caching removes strings from the hash, but not the
+ // pool itself. That only happens when the pool is destroyed. destroy
+ // the session pool to clear any lingering memory
+ apr_pool_destroy(trans->session_pool);
+
+ // create a standalone pool for our translator data
+ if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
+ osrfLogError(OSRF_LOG_MARK, "WS Unable to create session_pool");
+ trans->session_pool = NULL;
+ return;
+ }
+
+ trans->session_pool = pool;
+ }
+ }
+}
+
+
+
void* osrf_responder_thread_main_body(transport_message *tmsg) {
- jsonObject *msg_wrapper;
- osrfList *msg_list;
- osrfMessage *one_msg;
- char *msg_string;
+ osrfList *msg_list = NULL;
+ osrfMessage *one_msg = NULL;
int i;
- // discard responses received after client disconnect
+ osrfLogDebug(OSRF_LOG_MARK,
+ "WS received opensrf response for thread=%s, xid=%s",
+ tmsg->thread, tmsg->osrf_xid);
+
+ // first we need to perform some maintenance
+ msg_list = osrfMessageDeserialize(tmsg->body, NULL);
+
+ for (i = 0; i < msg_list->size; i++) {
+ one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
+
+ osrfLogDebug(OSRF_LOG_MARK,
+ "WS returned response of type %d", one_msg->m_type);
+
+ /* if our client just successfully connected to an opensrf service,
+ cache the sender so that future calls on this thread will use
+ the correct recipient. */
+ if (one_msg && one_msg->m_type == STATUS) {
+
+
+ // only cache recipients if the client is still connected
+ if (trans->client_connected &&
+ one_msg->status_code == OSRF_STATUS_OK) {
+
+ if (!apr_hash_get(trans->session_cache,
+ tmsg->thread, APR_HASH_KEY_STRING)) {
+
+ osrfLogDebug(OSRF_LOG_MARK,
+ "WS caching sender thread=%s, sender=%s",
+ tmsg->thread, tmsg->sender);
+
+ apr_hash_set(trans->session_cache,
+ apr_pstrdup(trans->session_pool, tmsg->thread),
+ APR_HASH_KEY_STRING,
+ apr_pstrdup(trans->session_pool, tmsg->sender));
+ }
+
+ } else {
+
+ // connection timed out; clear the cached recipient
+ // regardless of whether the client is still connected
+ if (one_msg->status_code == OSRF_STATUS_TIMEOUT)
+ clear_cached_recipient(tmsg->thread);
+ }
+ }
+ }
+
+ // maintenance is done
+ osrfListFree(msg_list);
+
if (!trans->client_connected) {
+ // responses received after client disconnect are discarded
+
osrfLogDebug(OSRF_LOG_MARK,
"WS discarding response for thread=%s, xid=%s",
tmsg->thread, tmsg->osrf_xid);
- return;
+
+ return;
}
- osrfLogDebug(OSRF_LOG_MARK,
- "WS received opensrf response for thread=%s, xid=%s",
- tmsg->thread, tmsg->osrf_xid);
+
+ // client is still connected; relay the messages to the client
+ jsonObject *msg_wrapper = NULL;
+ char *msg_string = NULL;
// build the wrapper object
msg_wrapper = jsonNewObject(NULL);
trans->server->send(trans->server, MESSAGE_TYPE_TEXT,
(unsigned char*) msg_string, strlen(msg_string));
- msg_list = osrfMessageDeserialize(tmsg->body, NULL);
- for (i = 0; i < msg_list->size; i++) {
- one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
-
- /* if our client just successfully connected to an opensrf service,
- cache the sender so that future calls on this thread will use
- the correct recipient.
- TODO: this cache will grow to add one entry per connected client
- session. Even when entries are removed, they are not cleaned up
- until the session_pool is destroyed. We need to ensure that client
- sessions don't last too long and/or create a last-touched timeout
- mechanism to periodically remove old entries. */
- if (one_msg && one_msg->m_type == STATUS &&
- one_msg->status_code == OSRF_STATUS_OK) {
-
- if (!apr_hash_get(trans->session_cache,
- tmsg->thread, APR_HASH_KEY_STRING)) {
-
- osrfLogDebug(OSRF_LOG_MARK,
- "WS caching sender thread=%s, sender=%s",
- tmsg->thread, tmsg->sender);
-
- apr_hash_set(trans->session_cache,
- apr_pstrdup(trans->session_pool, tmsg->thread),
- APR_HASH_KEY_STRING,
- apr_pstrdup(trans->session_pool, tmsg->sender));
- }
- }
- }
-
free(msg_string);
- osrfListFree(msg_list);
jsonObjectFree(msg_wrapper);
+
}
/**
trans->osrf_router = osrfConfigGetValue(NULL, "/router_name");
trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
+ trans->session_cache = apr_hash_make(pool);
+
+ if (trans->session_cache == NULL) {
+ osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache");
+ return 1;
+ }
+
// Create the responder thread. Once created,
// it runs for the lifetime of this process.
if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
}
}
- // create a standalone pool for the session cache values, which will be
- // destroyed on client disconnect.
- //if (apr_pool_create(&pool, trans->main_pool) != APR_SUCCESS) {
+ // create a standalone pool for the session cache values
+ // this pool will be destroyed and re-created regularly to
+ // clear session memory
if (apr_pool_create(&pool, r->pool) != APR_SUCCESS) {
osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
return NULL;
}
trans->session_pool = pool;
- trans->session_cache = apr_hash_make(trans->session_pool);
-
- if (trans->session_cache == NULL) {
- osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache");
- return NULL;
- }
-
trans->client_connected = 1;
return trans;
}
}
case DISCONNECT:
-
- if (apr_hash_get(trans->session_cache, thread, APR_HASH_KEY_STRING)) {
-
- // this clears the hash value, but the key/value memory
- // will not be cleared until their pool is cleared.
- apr_hash_set(trans->session_cache,
- apr_pstrdup(trans->session_pool, thread),
- APR_HASH_KEY_STRING,
- apr_pstrdup(trans->session_pool, recipient));
- }
-
+ clear_cached_recipient(thread);
+ break;
}
}
osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data;
trans->client_connected = 0;
- apr_hash_clear(trans->session_cache);
-
/*
It's not necessary to destroy our session_pool, since
it's a child of the apache request_rec pool, which is
*/
trans->session_pool = NULL;
- trans->session_cache = NULL;
request_rec *r = server->request(server);
osrfLogDebug(OSRF_LOG_MARK,