LP#1268619: websocket translator ; activity log; recipient removal
authorBill Erickson <berick@esilibrary.com>
Thu, 29 Nov 2012 21:41:33 +0000 (16:41 -0500)
committerBill Erickson <berick@esilibrary.com>
Sun, 4 May 2014 20:10:34 +0000 (16:10 -0400)
Signed-off-by: Bill Erickson <berick@esilibrary.com>
src/gateway/osrf_websocket_translator.c

index 49cff49..898837e 100644 (file)
 
 #define MAX_THREAD_SIZE 64
 #define RECIP_BUF_SIZE 128
+#define WEBSOCKET_TRANSLATOR_INGRESS "ws-translator-v1"
 
 typedef struct _osrfWebsocketTranslator {
     const WebSocketServer *server;
     apr_pool_t *main_pool; // standalone per-process pool
-    apr_pool_t *session_pool; // child of trans->main_pool; per-session
+    apr_pool_t *session_pool; // child of trans->main_pool; per-session TODO: use apache connection pool instead
     apr_hash_t *session_cache; 
     apr_thread_t *responder_thread;
     int client_connected;
@@ -259,6 +260,98 @@ void* CALLBACK on_connect_handler(const WebSocketServer *server) {
 }
 
 
+/** 
+ * for each inbound opensrf message:
+ * 1. Stamp the ingress
+ * 2. REQUEST: log it as activity
+ * 3. DISCONNECT: remove the cached recipient
+ * then re-string-ify for xmpp delivery
+ */
+
+static char* extract_inbound_messages(
+        const request_rec *r, 
+        const char* service, 
+        const char* thread, 
+        const char* recipient, 
+        const jsonObject *osrf_msg) {
+
+    int i;
+    int num_msgs = osrf_msg->size;
+    osrfMessage* msg;
+    osrfMessage* msg_list[num_msgs];
+
+    // here we do an extra json round-trip to get the data
+    // in a form osrf_message_deserialize can understand
+    char *osrf_msg_json = jsonObjectToJSON(osrf_msg);
+    osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs);
+    free(osrf_msg_json);
+
+    // should we require the caller to always pass the service?
+    if (service == NULL) service = "";
+
+    for(i = 0; i < num_msgs; i++) {
+        msg = msg_list[i];
+        osrfMessageSetIngress(msg, WEBSOCKET_TRANSLATOR_INGRESS);
+
+        switch(msg->m_type) {
+
+            case REQUEST: {
+                const jsonObject* params = msg->_params;
+                growing_buffer* act = buffer_init(128);
+                char* method = msg->method_name;
+                buffer_fadd(act, "[%s] [%s] %s %s", 
+                    r->connection->remote_ip, "", service, method);
+
+                const jsonObject* obj = NULL;
+                int i = 0;
+                const char* str;
+                int redactParams = 0;
+                while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
+                    if(!strncmp(method, str, strlen(str))) {
+                        redactParams = 1;
+                        break;
+                    }
+                }
+                if(redactParams) {
+                    OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
+                } else {
+                    i = 0;
+                    while((obj = jsonObjectGetIndex(params, i++))) {
+                        char* str = jsonObjectToJSON(obj);
+                        if( i == 1 )
+                            OSRF_BUFFER_ADD(act, " ");
+                        else
+                            OSRF_BUFFER_ADD(act, ", ");
+                        OSRF_BUFFER_ADD(act, str);
+                        free(str);
+                    }
+                }
+                osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
+                buffer_free(act);
+                break;
+            }
+
+            case DISCONNECT:
+
+                if (apr_hash_get(trans->session_cache, thread, APR_HASH_KEY_STRING)) {
+
+                    // this clears the hash value, but the key/value memory 
+                    // will not be cleared until their pool is cleared.
+                    apr_hash_set(trans->session_cache, 
+                        apr_pstrdup(trans->session_pool, thread),
+                        APR_HASH_KEY_STRING, 
+                        apr_pstrdup(trans->session_pool, recipient));
+                }
+
+        }
+    }
+
+    return osrfMessageSerializeBatch(msg_list, num_msgs);
+}
+
+
+
+
 
 /**
  * Parse opensrf request and relay the request to the opensrf network.
@@ -271,7 +364,6 @@ static size_t CALLBACK on_message_handler(void *data,
 
     jsonObject *msg_wrapper = NULL; // free me
     const jsonObject *tmp_obj = NULL;
-    const jsonObject *osrf_msg_list = NULL;
     const jsonObject *osrf_msg = NULL;
     const char *service = NULL;
     const char *thread = NULL;
@@ -296,7 +388,7 @@ static size_t CALLBACK on_message_handler(void *data,
         return HTTP_BAD_REQUEST;
     }
 
-    osrf_msg_list = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg");
+    osrf_msg = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg");
 
     if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "service")) 
         service = jsonObjectGetString(tmp_obj);
@@ -359,29 +451,14 @@ static size_t CALLBACK on_message_handler(void *data,
         "WS relaying message thread=%s, xid=%s, recipient=%s", 
             thread, osrfLogGetXid(), recipient);
 
-    // for each included message:
-    // 1. Stamp the ingress
-    // 2. REQUEST: log it as activity
-    // 3. DISCONNECT: remove the cached recipient
-
-    osrfMessage* msg;
-    osrfMessage* msg_ist[MAX_MSGS_PER_PACKET];
-    int num_msgs = osrfMessageDeserialize(osrf_msg_list, msg_list, MAX_MSGS_PER_PACKET);
-
-    msg_body = jsonObjectToJSON(osrf_msg_list);
+    msg_body = extract_inbound_messages(
+        r, service, thread, recipient, osrf_msg);
 
     transport_message *tmsg = message_init(
         msg_body, NULL, thread, recipient, NULL);
 
-    message_set_osrf_xid(tmsg, osrfLogGetXid());                                
-    client_send_message(osrf_handle, tmsg);                                   
-
-    
-    for (i = 0; i < osrf_msg_list->size; i++) {
-        osrf_msg = jsonObjectGetIndex(osrf_msg_list, i);
-        if (osrf_msg) {
-        }
-    }
+    message_set_osrf_xid(tmsg, osrfLogGetXid());
+    client_send_message(osrf_handle, tmsg);
 
 
     osrfLogClearXid();