LP#1268619: websocket gateway repairs and cleanup
authorBill Erickson <berick@esilibrary.com>
Mon, 29 Oct 2012 21:27:44 +0000 (17:27 -0400)
committerBill Erickson <berick@esilibrary.com>
Sun, 4 May 2014 20:10:33 +0000 (16:10 -0400)
* use jsonObjectFree() on jsonObjets, not free();
* removed some debugging logs

Signed-off-by: Bill Erickson <berick@esilibrary.com>
src/gateway/osrf_websocket_translator.c

index fd19f2d..2a6db5a 100644 (file)
 
 #define MAX_THREAD_SIZE 64
 #define RECIP_BUF_SIZE 128
-static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
-static transport_client *osrf_handle = NULL;
 
 typedef struct _osrfWebsocketTranslator {
     const WebSocketServer *server;
-    apr_pool_t *main_pool; // standline per-process pool
+    apr_pool_t *main_pool; // standalone per-process pool
     apr_pool_t *session_pool; // child of trans->main_pool; per-session
     apr_hash_t *session_cache; 
     apr_thread_t *responder_thread;
@@ -83,6 +81,8 @@ typedef struct _osrfWebsocketTranslator {
 } osrfWebsocketTranslator;
 
 static osrfWebsocketTranslator *trans = NULL;
+static transport_client *osrf_handle = NULL;
+static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
 
 
 /**
@@ -93,37 +93,38 @@ static osrfWebsocketTranslator *trans = NULL;
 void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
 
     request_rec *r = trans->server->request(trans->server);
+    transport_message *tmsg;
     jsonObject *msg_wrapper;
     char *msg_string;
 
     while (1) {
 
-        transport_message *msg = client_recv(osrf_handle, -1);
-        if (!msg) continue; // early exit on interrupt
+        tmsg = client_recv(osrf_handle, -1);
+        if (!tmsg) continue; // early exit on interrupt
         
         // discard responses received after client disconnect
         if (!trans->client_connected) {
             ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
                 "WS discarding response for thread=%s, xid=%s", 
-                    msg->thread, msg->osrf_xid);
-            message_free(msg);                                                         
+                    tmsg->thread, tmsg->osrf_xid);
+            message_free(tmsg);                                                         
             continue; 
         }
 
         ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
             "WS received opensrf response for thread=%s, xid=%s", 
-                msg->thread, msg->osrf_xid);
+                tmsg->thread, tmsg->osrf_xid);
 
         // build the wrapper object
         msg_wrapper = jsonNewObject(NULL);
-        jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(msg->thread));
-        jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(msg->osrf_xid));
-        jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(msg->body));
+        jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
+        jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
+        jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
 
-        if (msg->is_error) {
+        if (tmsg->is_error) {
             ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
                 "WS received jabber error message in response to thread=%s and xid=%s", 
-                    msg->thread, msg->osrf_xid);
+                    tmsg->thread, tmsg->osrf_xid);
             jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
         }
 
@@ -135,21 +136,22 @@ void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *dat
 
         // capture the true message sender
         // TODO: this will grow to add one entry per client session.  
-        // need a last-touched timeout mechanism to periodically remove old entries
-        if (!apr_hash_get(trans->session_cache, msg->thread, APR_HASH_KEY_STRING)) {
+        // need to ensure that connected-sessions don't last /too/ long or create 
+        // a last-touched timeout mechanism to periodically remove old  entries
+        if (!apr_hash_get(trans->session_cache, tmsg->thread, APR_HASH_KEY_STRING)) {
 
             ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
-                "WS caching sender thread=%s, sender=%s", msg->thread, msg->sender);
+                "WS caching sender thread=%s, sender=%s", tmsg->thread, tmsg->sender);
 
             apr_hash_set(trans->session_cache, 
-                apr_pstrdup(trans->session_pool, msg->thread),
+                apr_pstrdup(trans->session_pool, tmsg->thread),
                 APR_HASH_KEY_STRING, 
-                apr_pstrdup(trans->session_pool, msg->sender));
+                apr_pstrdup(trans->session_pool, tmsg->sender));
         }
 
         free(msg_string);
         jsonObjectFree(msg_wrapper);
-        message_free(msg);                                                         
+        message_free(tmsg);                                                         
     }
 
     return NULL;
@@ -202,8 +204,8 @@ int child_init(const WebSocketServer *server) {
     trans->osrf_router = osrfConfigGetValue(NULL, "/router_name");                      
     trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
 
-    // Create the responder thread.  Once created, it runs for the lifetime
-    // of this process.
+    // Create the responder thread.  Once created, 
+    // it runs for the lifetime of this process.
     if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
          (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
          (apr_thread_create(&thread, thread_attr, 
@@ -247,18 +249,12 @@ void* CALLBACK on_connect_handler(const WebSocketServer *server) {
     trans->session_pool = pool;
     trans->session_cache = apr_hash_make(trans->session_pool);
 
-    ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
-        "WS created new pool %x", trans->session_pool);
-
     if (trans->session_cache == NULL) {
         ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
             "WS unable to create session cache");
         return NULL;
     }
 
-    ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
-        "WS created new hash %x", trans->session_cache);
-
     trans->client_connected = 1;
     return trans;
 }
@@ -313,13 +309,18 @@ static size_t CALLBACK on_message_handler(void *data,
         log_xid = jsonObjectGetString(tmp_obj);
 
     if (log_xid) {
+
         // use the caller-provide log trace id
         if (strlen(log_xid) > MAX_THREAD_SIZE) {
             ap_log_rerror(APLOG_MARK, APLOG_NOTICE, 
                 0, r, "WS log_xid exceeds max length");
             return HTTP_BAD_REQUEST;
         }
-        osrfLogSetXid(log_xid); // TODO: make with with non-client
+
+        // TODO: make this work with non-client and make this call accept 
+        // const char*'s.  casting to (char*) for now to silence warnings.
+        osrfLogSetXid((char*) log_xid); 
+
     } else {
         // generate a new log trace id for this relay
         osrfLogMkXid();
@@ -374,7 +375,7 @@ static size_t CALLBACK on_message_handler(void *data,
     osrfLogClearXid();
 
     message_free(tmsg);                                                         
-    free(msg_wrapper);
+    jsonObjectFree(msg_wrapper);
     free(msg_body);
 
     return OK;
@@ -400,10 +401,10 @@ void CALLBACK on_disconnect_handler(
         "WS disconnect from %s", r->connection->remote_ip);
 }
 
+/**
+ * Be nice and clean up our mess
+ */
 void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) {
-    fprintf(stderr, "WS on_destroy_handler()\n");
-    fflush(stderr);
-
     if (trans) {
         apr_thread_exit(trans->responder_thread, APR_SUCCESS);
         apr_pool_destroy(trans->main_pool);