From 61a076e0e3d5fa25dcdabdc2411efbb619ec9a12 Mon Sep 17 00:00:00 2001 From: Bill Erickson Date: Mon, 29 Oct 2012 17:27:44 -0400 Subject: [PATCH] LP#1268619: websocket gateway repairs and cleanup * use jsonObjectFree() on jsonObjets, not free(); * removed some debugging logs Signed-off-by: Bill Erickson --- src/gateway/osrf_websocket_translator.c | 65 +++++++++++++++++---------------- 1 file changed, 33 insertions(+), 32 deletions(-) diff --git a/src/gateway/osrf_websocket_translator.c b/src/gateway/osrf_websocket_translator.c index fd19f2d..2a6db5a 100644 --- a/src/gateway/osrf_websocket_translator.c +++ b/src/gateway/osrf_websocket_translator.c @@ -68,12 +68,10 @@ #define MAX_THREAD_SIZE 64 #define RECIP_BUF_SIZE 128 -static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer -static transport_client *osrf_handle = NULL; typedef struct _osrfWebsocketTranslator { const WebSocketServer *server; - apr_pool_t *main_pool; // standline per-process pool + apr_pool_t *main_pool; // standalone per-process pool apr_pool_t *session_pool; // child of trans->main_pool; per-session apr_hash_t *session_cache; apr_thread_t *responder_thread; @@ -83,6 +81,8 @@ typedef struct _osrfWebsocketTranslator { } osrfWebsocketTranslator; static osrfWebsocketTranslator *trans = NULL; +static transport_client *osrf_handle = NULL; +static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer /** @@ -93,37 +93,38 @@ static osrfWebsocketTranslator *trans = NULL; void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) { request_rec *r = trans->server->request(trans->server); + transport_message *tmsg; jsonObject *msg_wrapper; char *msg_string; while (1) { - transport_message *msg = client_recv(osrf_handle, -1); - if (!msg) continue; // early exit on interrupt + tmsg = client_recv(osrf_handle, -1); + if (!tmsg) continue; // early exit on interrupt // discard responses received after client disconnect if (!trans->client_connected) { ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, "WS discarding response for thread=%s, xid=%s", - msg->thread, msg->osrf_xid); - message_free(msg); + tmsg->thread, tmsg->osrf_xid); + message_free(tmsg); continue; } ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, "WS received opensrf response for thread=%s, xid=%s", - msg->thread, msg->osrf_xid); + tmsg->thread, tmsg->osrf_xid); // build the wrapper object msg_wrapper = jsonNewObject(NULL); - jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(msg->thread)); - jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(msg->osrf_xid)); - jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(msg->body)); + jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread)); + jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid)); + jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body)); - if (msg->is_error) { + if (tmsg->is_error) { ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "WS received jabber error message in response to thread=%s and xid=%s", - msg->thread, msg->osrf_xid); + tmsg->thread, tmsg->osrf_xid); jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1)); } @@ -135,21 +136,22 @@ void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *dat // capture the true message sender // TODO: this will grow to add one entry per client session. - // need a last-touched timeout mechanism to periodically remove old entries - if (!apr_hash_get(trans->session_cache, msg->thread, APR_HASH_KEY_STRING)) { + // need to ensure that connected-sessions don't last /too/ long or create + // a last-touched timeout mechanism to periodically remove old entries + if (!apr_hash_get(trans->session_cache, tmsg->thread, APR_HASH_KEY_STRING)) { ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, - "WS caching sender thread=%s, sender=%s", msg->thread, msg->sender); + "WS caching sender thread=%s, sender=%s", tmsg->thread, tmsg->sender); apr_hash_set(trans->session_cache, - apr_pstrdup(trans->session_pool, msg->thread), + apr_pstrdup(trans->session_pool, tmsg->thread), APR_HASH_KEY_STRING, - apr_pstrdup(trans->session_pool, msg->sender)); + apr_pstrdup(trans->session_pool, tmsg->sender)); } free(msg_string); jsonObjectFree(msg_wrapper); - message_free(msg); + message_free(tmsg); } return NULL; @@ -202,8 +204,8 @@ int child_init(const WebSocketServer *server) { trans->osrf_router = osrfConfigGetValue(NULL, "/router_name"); trans->osrf_domain = osrfConfigGetValue(NULL, "/domain"); - // Create the responder thread. Once created, it runs for the lifetime - // of this process. + // Create the responder thread. Once created, + // it runs for the lifetime of this process. if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) && (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) && (apr_thread_create(&thread, thread_attr, @@ -247,18 +249,12 @@ void* CALLBACK on_connect_handler(const WebSocketServer *server) { trans->session_pool = pool; trans->session_cache = apr_hash_make(trans->session_pool); - ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, - "WS created new pool %x", trans->session_pool); - if (trans->session_cache == NULL) { ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, "WS unable to create session cache"); return NULL; } - ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, - "WS created new hash %x", trans->session_cache); - trans->client_connected = 1; return trans; } @@ -313,13 +309,18 @@ static size_t CALLBACK on_message_handler(void *data, log_xid = jsonObjectGetString(tmp_obj); if (log_xid) { + // use the caller-provide log trace id if (strlen(log_xid) > MAX_THREAD_SIZE) { ap_log_rerror(APLOG_MARK, APLOG_NOTICE, 0, r, "WS log_xid exceeds max length"); return HTTP_BAD_REQUEST; } - osrfLogSetXid(log_xid); // TODO: make with with non-client + + // TODO: make this work with non-client and make this call accept + // const char*'s. casting to (char*) for now to silence warnings. + osrfLogSetXid((char*) log_xid); + } else { // generate a new log trace id for this relay osrfLogMkXid(); @@ -374,7 +375,7 @@ static size_t CALLBACK on_message_handler(void *data, osrfLogClearXid(); message_free(tmsg); - free(msg_wrapper); + jsonObjectFree(msg_wrapper); free(msg_body); return OK; @@ -400,10 +401,10 @@ void CALLBACK on_disconnect_handler( "WS disconnect from %s", r->connection->remote_ip); } +/** + * Be nice and clean up our mess + */ void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) { - fprintf(stderr, "WS on_destroy_handler()\n"); - fflush(stderr); - if (trans) { apr_thread_exit(trans->responder_thread, APR_SUCCESS); apr_pool_destroy(trans->main_pool); -- 2.11.0