From dbef1f350feda4c6dadaf1c8a6ac3b823daa8e07 Mon Sep 17 00:00:00 2001 From: Bill Erickson Date: Thu, 29 Nov 2012 16:41:33 -0500 Subject: [PATCH] LP#1268619: websocket translator ; activity log; recipient removal Signed-off-by: Bill Erickson --- src/gateway/osrf_websocket_translator.c | 121 ++++++++++++++++++++++++++------ 1 file changed, 99 insertions(+), 22 deletions(-) diff --git a/src/gateway/osrf_websocket_translator.c b/src/gateway/osrf_websocket_translator.c index 49cff49..898837e 100644 --- a/src/gateway/osrf_websocket_translator.c +++ b/src/gateway/osrf_websocket_translator.c @@ -67,11 +67,12 @@ #define MAX_THREAD_SIZE 64 #define RECIP_BUF_SIZE 128 +#define WEBSOCKET_TRANSLATOR_INGRESS "ws-translator-v1" typedef struct _osrfWebsocketTranslator { const WebSocketServer *server; apr_pool_t *main_pool; // standalone per-process pool - apr_pool_t *session_pool; // child of trans->main_pool; per-session + apr_pool_t *session_pool; // child of trans->main_pool; per-session TODO: use apache connection pool instead apr_hash_t *session_cache; apr_thread_t *responder_thread; int client_connected; @@ -259,6 +260,98 @@ void* CALLBACK on_connect_handler(const WebSocketServer *server) { } +/** + * for each inbound opensrf message: + * 1. Stamp the ingress + * 2. REQUEST: log it as activity + * 3. DISCONNECT: remove the cached recipient + * then re-string-ify for xmpp delivery + */ + +static char* extract_inbound_messages( + const request_rec *r, + const char* service, + const char* thread, + const char* recipient, + const jsonObject *osrf_msg) { + + int i; + int num_msgs = osrf_msg->size; + osrfMessage* msg; + osrfMessage* msg_list[num_msgs]; + + // here we do an extra json round-trip to get the data + // in a form osrf_message_deserialize can understand + char *osrf_msg_json = jsonObjectToJSON(osrf_msg); + osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs); + free(osrf_msg_json); + + // should we require the caller to always pass the service? + if (service == NULL) service = ""; + + for(i = 0; i < num_msgs; i++) { + msg = msg_list[i]; + osrfMessageSetIngress(msg, WEBSOCKET_TRANSLATOR_INGRESS); + + switch(msg->m_type) { + + case REQUEST: { + const jsonObject* params = msg->_params; + growing_buffer* act = buffer_init(128); + char* method = msg->method_name; + buffer_fadd(act, "[%s] [%s] %s %s", + r->connection->remote_ip, "", service, method); + + const jsonObject* obj = NULL; + int i = 0; + const char* str; + int redactParams = 0; + while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) { + if(!strncmp(method, str, strlen(str))) { + redactParams = 1; + break; + } + } + if(redactParams) { + OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**"); + } else { + i = 0; + while((obj = jsonObjectGetIndex(params, i++))) { + char* str = jsonObjectToJSON(obj); + if( i == 1 ) + OSRF_BUFFER_ADD(act, " "); + else + OSRF_BUFFER_ADD(act, ", "); + OSRF_BUFFER_ADD(act, str); + free(str); + } + } + osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf); + buffer_free(act); + break; + } + + case DISCONNECT: + + if (apr_hash_get(trans->session_cache, thread, APR_HASH_KEY_STRING)) { + + // this clears the hash value, but the key/value memory + // will not be cleared until their pool is cleared. + apr_hash_set(trans->session_cache, + apr_pstrdup(trans->session_pool, thread), + APR_HASH_KEY_STRING, + apr_pstrdup(trans->session_pool, recipient)); + } + + } + } + + return osrfMessageSerializeBatch(msg_list, num_msgs); +} + + + + /** * Parse opensrf request and relay the request to the opensrf network. @@ -271,7 +364,6 @@ static size_t CALLBACK on_message_handler(void *data, jsonObject *msg_wrapper = NULL; // free me const jsonObject *tmp_obj = NULL; - const jsonObject *osrf_msg_list = NULL; const jsonObject *osrf_msg = NULL; const char *service = NULL; const char *thread = NULL; @@ -296,7 +388,7 @@ static size_t CALLBACK on_message_handler(void *data, return HTTP_BAD_REQUEST; } - osrf_msg_list = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg"); + osrf_msg = jsonObjectGetKeyConst(msg_wrapper, "osrf_msg"); if (tmp_obj = jsonObjectGetKeyConst(msg_wrapper, "service")) service = jsonObjectGetString(tmp_obj); @@ -359,29 +451,14 @@ static size_t CALLBACK on_message_handler(void *data, "WS relaying message thread=%s, xid=%s, recipient=%s", thread, osrfLogGetXid(), recipient); - // for each included message: - // 1. Stamp the ingress - // 2. REQUEST: log it as activity - // 3. DISCONNECT: remove the cached recipient - - osrfMessage* msg; - osrfMessage* msg_ist[MAX_MSGS_PER_PACKET]; - int num_msgs = osrfMessageDeserialize(osrf_msg_list, msg_list, MAX_MSGS_PER_PACKET); - - msg_body = jsonObjectToJSON(osrf_msg_list); + msg_body = extract_inbound_messages( + r, service, thread, recipient, osrf_msg); transport_message *tmsg = message_init( msg_body, NULL, thread, recipient, NULL); - message_set_osrf_xid(tmsg, osrfLogGetXid()); - client_send_message(osrf_handle, tmsg); - - - for (i = 0; i < osrf_msg_list->size; i++) { - osrf_msg = jsonObjectGetIndex(osrf_msg_list, i); - if (osrf_msg) { - } - } + message_set_osrf_xid(tmsg, osrfLogGetXid()); + client_send_message(osrf_handle, tmsg); osrfLogClearXid(); -- 2.11.0