#define MAX_THREAD_SIZE 64
#define RECIP_BUF_SIZE 128
-static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
-static transport_client *osrf_handle = NULL;
typedef struct _osrfWebsocketTranslator {
const WebSocketServer *server;
- apr_pool_t *main_pool; // standline per-process pool
+ apr_pool_t *main_pool; // standalone per-process pool
apr_pool_t *session_pool; // child of trans->main_pool; per-session
apr_hash_t *session_cache;
apr_thread_t *responder_thread;
} osrfWebsocketTranslator;
static osrfWebsocketTranslator *trans = NULL;
+static transport_client *osrf_handle = NULL;
+static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
/**
void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
request_rec *r = trans->server->request(trans->server);
+ transport_message *tmsg;
jsonObject *msg_wrapper;
char *msg_string;
while (1) {
- transport_message *msg = client_recv(osrf_handle, -1);
- if (!msg) continue; // early exit on interrupt
+ tmsg = client_recv(osrf_handle, -1);
+ if (!tmsg) continue; // early exit on interrupt
// discard responses received after client disconnect
if (!trans->client_connected) {
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
"WS discarding response for thread=%s, xid=%s",
- msg->thread, msg->osrf_xid);
- message_free(msg);
+ tmsg->thread, tmsg->osrf_xid);
+ message_free(tmsg);
continue;
}
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
"WS received opensrf response for thread=%s, xid=%s",
- msg->thread, msg->osrf_xid);
+ tmsg->thread, tmsg->osrf_xid);
// build the wrapper object
msg_wrapper = jsonNewObject(NULL);
- jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(msg->thread));
- jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(msg->osrf_xid));
- jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(msg->body));
+ jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
+ jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
+ jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
- if (msg->is_error) {
+ if (tmsg->is_error) {
ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
"WS received jabber error message in response to thread=%s and xid=%s",
- msg->thread, msg->osrf_xid);
+ tmsg->thread, tmsg->osrf_xid);
jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
}
// capture the true message sender
// TODO: this will grow to add one entry per client session.
- // need a last-touched timeout mechanism to periodically remove old entries
- if (!apr_hash_get(trans->session_cache, msg->thread, APR_HASH_KEY_STRING)) {
+ // need to ensure that connected-sessions don't last /too/ long or create
+ // a last-touched timeout mechanism to periodically remove old entries
+ if (!apr_hash_get(trans->session_cache, tmsg->thread, APR_HASH_KEY_STRING)) {
ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
- "WS caching sender thread=%s, sender=%s", msg->thread, msg->sender);
+ "WS caching sender thread=%s, sender=%s", tmsg->thread, tmsg->sender);
apr_hash_set(trans->session_cache,
- apr_pstrdup(trans->session_pool, msg->thread),
+ apr_pstrdup(trans->session_pool, tmsg->thread),
APR_HASH_KEY_STRING,
- apr_pstrdup(trans->session_pool, msg->sender));
+ apr_pstrdup(trans->session_pool, tmsg->sender));
}
free(msg_string);
jsonObjectFree(msg_wrapper);
- message_free(msg);
+ message_free(tmsg);
}
return NULL;
trans->osrf_router = osrfConfigGetValue(NULL, "/router_name");
trans->osrf_domain = osrfConfigGetValue(NULL, "/domain");
- // Create the responder thread. Once created, it runs for the lifetime
- // of this process.
+ // Create the responder thread. Once created,
+ // it runs for the lifetime of this process.
if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
(apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
(apr_thread_create(&thread, thread_attr,
trans->session_pool = pool;
trans->session_cache = apr_hash_make(trans->session_pool);
- ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
- "WS created new pool %x", trans->session_pool);
-
if (trans->session_cache == NULL) {
ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
"WS unable to create session cache");
return NULL;
}
- ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r,
- "WS created new hash %x", trans->session_cache);
-
trans->client_connected = 1;
return trans;
}
log_xid = jsonObjectGetString(tmp_obj);
if (log_xid) {
+
// use the caller-provide log trace id
if (strlen(log_xid) > MAX_THREAD_SIZE) {
ap_log_rerror(APLOG_MARK, APLOG_NOTICE,
0, r, "WS log_xid exceeds max length");
return HTTP_BAD_REQUEST;
}
- osrfLogSetXid(log_xid); // TODO: make with with non-client
+
+ // TODO: make this work with non-client and make this call accept
+ // const char*'s. casting to (char*) for now to silence warnings.
+ osrfLogSetXid((char*) log_xid);
+
} else {
// generate a new log trace id for this relay
osrfLogMkXid();
osrfLogClearXid();
message_free(tmsg);
- free(msg_wrapper);
+ jsonObjectFree(msg_wrapper);
free(msg_body);
return OK;
"WS disconnect from %s", r->connection->remote_ip);
}
+/**
+ * Be nice and clean up our mess
+ */
void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) {
- fprintf(stderr, "WS on_destroy_handler()\n");
- fflush(stderr);
-
if (trans) {
apr_thread_exit(trans->responder_thread, APR_SUCCESS);
apr_pool_destroy(trans->main_pool);