#define MAX_THREAD_SIZE 64
#define RECIP_BUF_SIZE 128
+#define WEBSOCKET_TRANSLATOR_INGRESS "ws-translator-v1"
typedef struct _osrfWebsocketTranslator {
const WebSocketServer *server;
apr_pool_t *main_pool; // standalone per-process pool
- apr_pool_t *session_pool; // child of trans->main_pool; per-session
+ apr_pool_t *session_pool; // child of r->pool; per-session
apr_hash_t *session_cache;
apr_thread_t *responder_thread;
+ apr_thread_mutex_t *mutex;
int client_connected;
char* osrf_router;
char* osrf_domain;
static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
-/**
- * Responder thread main body.
- * Collects responses from the opensrf network and relays them to the
- * websocket caller.
- */
-void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
+static void clear_cached_recipient(const char* thread) {
+ apr_pool_t *pool = NULL;
- transport_message *tmsg;
- jsonObject *msg_wrapper;
- char *msg_string;
+ if (apr_hash_get(trans->session_cache, thread, APR_HASH_KEY_STRING)) {
- while (1) {
+ osrfLogDebug(OSRF_LOG_MARK, "WS removing cached recipient on disconnect");
- tmsg = client_recv(osrf_handle, -1);
+ // remove it from the hash
+ apr_hash_set(trans->session_cache, thread, APR_HASH_KEY_STRING, NULL);
- if (!tmsg) continue; // early exit on interrupt
-
- // discard responses received after client disconnect
- if (!trans->client_connected) {
- osrfLogDebug(OSRF_LOG_MARK,
- "WS discarding response for thread=%s, xid=%s",
- tmsg->thread, tmsg->osrf_xid);
- message_free(tmsg);
- continue;
+ if (apr_hash_count(trans->session_cache) == 0) {
+ osrfLogDebug(OSRF_LOG_MARK, "WS re-setting session_pool");
+
+ // memory accumulates in the session_pool as sessions are cached then
+ // un-cached. Un-caching removes strings from the hash, but not the
+ // pool itself. That only happens when the pool is destroyed. destroy
+ // the session pool to clear any lingering memory
+ apr_pool_destroy(trans->session_pool);
+
+ // create a standalone pool for our translator data
+ if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
+ osrfLogError(OSRF_LOG_MARK, "WS Unable to create session_pool");
+ trans->session_pool = NULL;
+ return;
+ }
+
+ trans->session_pool = pool;
}
+ }
+}
+
+
+
+void* osrf_responder_thread_main_body(transport_message *tmsg) {
+ osrfList *msg_list = NULL;
+ osrfMessage *one_msg = NULL;
+ int i;
+
+ osrfLogDebug(OSRF_LOG_MARK,
+ "WS received opensrf response for thread=%s, xid=%s",
+ tmsg->thread, tmsg->osrf_xid);
+
+ // first we need to perform some maintenance
+ msg_list = osrfMessageDeserialize(tmsg->body, NULL);
+
+ for (i = 0; i < msg_list->size; i++) {
+ one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
osrfLogDebug(OSRF_LOG_MARK,
- "WS received opensrf response for thread=%s, xid=%s",
- tmsg->thread, tmsg->osrf_xid);
-
- // build the wrapper object
- msg_wrapper = jsonNewObject(NULL);
- jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
- jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
- jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
-
- if (tmsg->is_error) {
- fprintf(stderr,
- "WS received jabber error message in response to thread=%s and xid=%s",
- tmsg->thread, tmsg->osrf_xid);
- fflush(stderr);
- jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
+ "WS returned response of type %d", one_msg->m_type);
+
+ /* if our client just successfully connected to an opensrf service,
+ cache the sender so that future calls on this thread will use
+ the correct recipient. */
+ if (one_msg && one_msg->m_type == STATUS) {
+
+
+ // only cache recipients if the client is still connected
+ if (trans->client_connected &&
+ one_msg->status_code == OSRF_STATUS_OK) {
+
+ if (!apr_hash_get(trans->session_cache,
+ tmsg->thread, APR_HASH_KEY_STRING)) {
+
+ osrfLogDebug(OSRF_LOG_MARK,
+ "WS caching sender thread=%s, sender=%s",
+ tmsg->thread, tmsg->sender);
+
+ apr_hash_set(trans->session_cache,
+ apr_pstrdup(trans->session_pool, tmsg->thread),
+ APR_HASH_KEY_STRING,
+ apr_pstrdup(trans->session_pool, tmsg->sender));
+ }
+
+ } else {
+
+ // connection timed out; clear the cached recipient
+ // regardless of whether the client is still connected
+ if (one_msg->status_code == OSRF_STATUS_TIMEOUT)
+ clear_cached_recipient(tmsg->thread);
+ }
}
+ }
+
+ // maintenance is done
+ osrfListFree(msg_list);
+
+ if (!trans->client_connected) {
+ // responses received after client disconnect are discarded
+
+ osrfLogDebug(OSRF_LOG_MARK,
+ "WS discarding response for thread=%s, xid=%s",
+ tmsg->thread, tmsg->osrf_xid);
+
+ return;
+ }
+
+
+ // client is still connected; relay the messages to the client
+ jsonObject *msg_wrapper = NULL;
+ char *msg_string = NULL;
+
+ // build the wrapper object
+ msg_wrapper = jsonNewObject(NULL);
+ jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
+ jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
+ jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
+
+ if (tmsg->is_error) {
+ fprintf(stderr,
+ "WS received jabber error message in response to thread=%s and xid=%s",
+ tmsg->thread, tmsg->osrf_xid);
+ fflush(stderr);
+ jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
+ }
+
+ msg_string = jsonObjectToJSONRaw(msg_wrapper);
+
+ // deliver the wrapped message json to the websocket client
+ trans->server->send(trans->server, MESSAGE_TYPE_TEXT,
+ (unsigned char*) msg_string, strlen(msg_string));
+
+ free(msg_string);
+ jsonObjectFree(msg_wrapper);
+
+}
+
+/**
+ * Responder thread main body.
+ * Collects responses from the opensrf network and relays them to the
+ * websocket caller.
+ */
+void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
- msg_string = jsonObjectToJSONRaw(msg_wrapper);
+ transport_message *tmsg;
+ while (1) {
- // deliver the wrapped message json to the websocket client
- trans->server->send(trans->server, MESSAGE_TYPE_TEXT,
- (unsigned char*) msg_string, strlen(msg_string));
+ if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
+ osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
+ return NULL;
+ }
- // capture the true message sender
- // TODO: this will grow to add one entry per client session.
- // need to ensure that connected-sessions don't last /too/ long or create
- // a last-touched timeout mechanism to periodically remove old entries
- if (!apr_hash_get(trans->session_cache, tmsg->thread, APR_HASH_KEY_STRING)) {
+ // wait for a response
+ tmsg = client_recv(osrf_handle, -1);
- osrfLogDebug(OSRF_LOG_MARK,
- "WS caching sender thread=%s, sender=%s", tmsg->thread, tmsg->sender);
+ if (!tmsg) continue; // early exit on interrupt
- apr_hash_set(trans->session_cache,
- apr_pstrdup(trans->session_pool, tmsg->thread),
- APR_HASH_KEY_STRING,
- apr_pstrdup(trans->session_pool, tmsg->sender));
+ if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
+ osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
+ return NULL;
}
- free(msg_string);
- jsonObjectFree(msg_wrapper);
+ osrf_responder_thread_main_body(tmsg);
message_free(tmsg);
}
return NULL;
}
+
+
/**
* Allocate the session cache and create the responder thread
*/
apr_pool_t *pool = NULL;
apr_thread_t *thread = NULL;
apr_threadattr_t *thread_attr = NULL;
+ apr_thread_mutex_t *mutex = NULL;
request_rec *r = server->request(server);
osrfLogDebug(OSRF_LOG_MARK, "WS child_init");
trans->osrf_router = osrfConfigGetValue(NULL, "/router_name");
trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
+ trans->session_cache = apr_hash_make(pool);
+
+ if (trans->session_cache == NULL) {
+ osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache");
+ return 1;
+ }
+
// Create the responder thread. Once created,
// it runs for the lifetime of this process.
if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
return 1;
}
+ if (apr_thread_mutex_create(
+ &mutex, APR_THREAD_MUTEX_UNNESTED,
+ trans->main_pool) != APR_SUCCESS) {
+ osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex");
+ return 1;
+ }
+
+ trans->mutex = mutex;
+
return APR_SUCCESS;
}
}
}
- // create a standalone pool for the session cache values, which will be
- // destroyed on client disconnect.
- if (apr_pool_create(&pool, trans->main_pool) != APR_SUCCESS) {
+ // create a standalone pool for the session cache values
+ // this pool will be destroyed and re-created regularly to
+ // clear session memory
+ if (apr_pool_create(&pool, r->pool) != APR_SUCCESS) {
osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
return NULL;
}
trans->session_pool = pool;
- trans->session_cache = apr_hash_make(trans->session_pool);
-
- if (trans->session_cache == NULL) {
- osrfLogError(OSRF_LOG_MARK, "WS unable to create session cache");
- return NULL;
- }
-
trans->client_connected = 1;
return trans;
}
+/**
+ * for each inbound opensrf message:
+ * 1. Stamp the ingress
+ * 2. REQUEST: log it as activity
+ * 3. DISCONNECT: remove the cached recipient
+ * then re-string-ify for xmpp delivery
+ */
+
+static char* extract_inbound_messages(
+ const request_rec *r,
+ const char* service,
+ const char* thread,
+ const char* recipient,
+ const jsonObject *osrf_msg) {
+
+ int i;
+ int num_msgs = osrf_msg->size;
+ osrfMessage* msg;
+ osrfMessage* msg_list[num_msgs];
+
+ // here we do an extra json round-trip to get the data
+ // in a form osrf_message_deserialize can understand
+ char *osrf_msg_json = jsonObjectToJSON(osrf_msg);
+ osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs);
+ free(osrf_msg_json);
+
+ // should we require the caller to always pass the service?
+ if (service == NULL) service = "";
+
+ for(i = 0; i < num_msgs; i++) {
+ msg = msg_list[i];
+ osrfMessageSetIngress(msg, WEBSOCKET_TRANSLATOR_INGRESS);
+
+ switch(msg->m_type) {
+
+ case REQUEST: {
+ const jsonObject* params = msg->_params;
+ growing_buffer* act = buffer_init(128);
+ char* method = msg->method_name;
+ buffer_fadd(act, "[%s] [%s] %s %s",
+ r->connection->remote_ip, "", service, method);
+
+ const jsonObject* obj = NULL;
+ int i = 0;
+ const char* str;
+ int redactParams = 0;
+ while( (str = osrfStringArrayGetString(log_protect_arr, i++)) ) {
+ if(!strncmp(method, str, strlen(str))) {
+ redactParams = 1;
+ break;
+ }
+ }
+ if(redactParams) {
+ OSRF_BUFFER_ADD(act, " **PARAMS REDACTED**");
+ } else {
+ i = 0;
+ while((obj = jsonObjectGetIndex(params, i++))) {
+ char* str = jsonObjectToJSON(obj);
+ if( i == 1 )
+ OSRF_BUFFER_ADD(act, " ");
+ else
+ OSRF_BUFFER_ADD(act, ", ");
+ OSRF_BUFFER_ADD(act, str);
+ free(str);
+ }
+ }
+ osrfLogActivity(OSRF_LOG_MARK, "%s", act->buf);
+ buffer_free(act);
+ break;
+ }
+
+ case DISCONNECT:
+ clear_cached_recipient(thread);
+ break;
+ }
+ }
+
+ return osrfMessageSerializeBatch(msg_list, num_msgs);
+}
/**
* Parse opensrf request and relay the request to the opensrf network.
*/
-static size_t CALLBACK on_message_handler(void *data,
+static size_t on_message_handler_body(void *data,
const WebSocketServer *server, const int type,
unsigned char *buffer, const size_t buffer_size) {
const char *log_xid = NULL;
char *msg_body = NULL;
char *recipient = NULL;
+ int i;
if (buffer_size <= 0) return OK;
memcpy(buf, buffer, buffer_size);
buf[buffer_size] = '\0';
- msg_wrapper = jsonParseRaw(buf);
+ msg_wrapper = jsonParse(buf);
if (msg_wrapper == NULL) {
osrfLogWarning(OSRF_LOG_MARK, "WS Invalid JSON: %s", buf);
}
}
- // TODO: activity log entry? -- requires message analysis
osrfLogDebug(OSRF_LOG_MARK,
"WS relaying message thread=%s, xid=%s, recipient=%s",
thread, osrfLogGetXid(), recipient);
- msg_body = jsonObjectToJSONRaw(osrf_msg);
+ msg_body = extract_inbound_messages(
+ r, service, thread, recipient, osrf_msg);
transport_message *tmsg = message_init(
msg_body, NULL, thread, recipient, NULL);
- message_set_osrf_xid(tmsg, osrfLogGetXid());
- client_send_message(osrf_handle, tmsg);
- osrfLogClearXid();
+ message_set_osrf_xid(tmsg, osrfLogGetXid());
+ client_send_message(osrf_handle, tmsg);
+
+ osrfLogClearXid();
message_free(tmsg);
jsonObjectFree(msg_wrapper);
free(msg_body);
return OK;
}
+static size_t CALLBACK on_message_handler(void *data,
+ const WebSocketServer *server, const int type,
+ unsigned char *buffer, const size_t buffer_size) {
+
+ if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
+ osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
+ return 1; // TODO: map to apr_status_t value?
+ }
+
+ apr_status_t stat = on_message_handler_body(data, server, type, buffer, buffer_size);
+
+ if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
+ osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
+ return 1;
+ }
+
+ return stat;
+}
+
/**
* Release all memory allocated from the translator pool and kill the pool.
osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data;
trans->client_connected = 0;
- apr_hash_clear(trans->session_cache);
+ /*
+ It's not necessary to destroy our session_pool, since
+ it's a child of the apache request_rec pool, which is
+ destroyed after client disconnect.
apr_pool_destroy(trans->session_pool);
+ */
+
trans->session_pool = NULL;
- trans->session_cache = NULL;
request_rec *r = server->request(server);
osrfLogDebug(OSRF_LOG_MARK,
void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) {
if (trans) {
apr_thread_exit(trans->responder_thread, APR_SUCCESS);
+ apr_thread_mutex_destroy(trans->mutex);
apr_pool_destroy(trans->main_pool);
}