*/
/**
- * Dumb websocket <-> opensrf gateway. Wrapped opensrf messages are extracted
+ * websocket <-> opensrf gateway. Wrapped opensrf messages are extracted
* and relayed to the opensrf network. Responses are pulled from the opensrf
- * network and passed back to the client. No attempt is made to understand
- * the contents of the messages.
+ * network and passed back to the client. Messages are analyzed to determine
+ * when a connect/disconnect occurs, so that the cache of recipients can be
+ * properly managed. We also activity-log REQUEST messages.
*
* Messages to/from the websocket client take the following form:
* {
- * "service" : "opensrf.foo", // required for new sessions (inbound only)
+ * "service" : "opensrf.foo", // required
* "thread" : "123454321", // AKA thread. required for follow-up requests; max 64 chars.
* "log_xid" : "123..32", // optional log trace ID, max 64 chars;
* "osrf_msg" : {<osrf_msg>} // required
* network. The second thread collects responses from the opensrf network and
* relays them back to the websocket client.
*
- * The main thread reads from socket A (apache) and writes to socket B
- * (openesrf), while the responder thread reads from B and writes to A. The
- * apr data structures used are threadsafe. For now, no thread mutex's are
- * used.
+ * After the initial setup, all thread actions occur within a thread mutex.
+ * The desired affect is a non-threaded application that uses threads for
+ * the sole purpose of having one thread listening for incoming data, while
+ * a second thread listens for responses. When either thread awakens, it's
+ * the only thread in town until it goes back to sleep (i.e. listening on
+ * its socket for data).
*
* Note that with a "thread", which allows us to identify the opensrf session,
* the caller does not need to provide a recipient address. The "service" is
* only required to start a new opensrf session. After the sesession is
- * started, all future communication is based solely on the thread.
- *
- * We use jsonParseRaw and jsonObjectToJSONRaw since this service does not care
- * about the contents of the messages.
+ * started, all future communication is based solely on the thread. However,
+ * the "service" should be passed by the caller for all requests to ensure it
+ * is properly logged in the activity log.
*/
/**
#define WEBSOCKET_TRANSLATOR_INGRESS "ws-translator-v1"
typedef struct _osrfWebsocketTranslator {
+
+ /** Our handle for communicating with the caller */
const WebSocketServer *server;
- apr_pool_t *main_pool; // standalone per-process pool
- apr_pool_t *session_pool; // child of r->pool; per-session
+
+ /**
+ * Standalone, per-process APR pool. Primarily
+ * there for managing thread data, which lasts
+ * the duration of the process.
+ */
+ apr_pool_t *main_pool;
+
+ /**
+ * Map of thread => drone-xmpp-address. Maintaining this
+ * map internally means the caller never need know about
+ * internal XMPP addresses and the server doesn't have to
+ * verify caller-specified recipient addresses. It's
+ * all managed internally.
+ */
apr_hash_t *session_cache;
+
+ /**
+ * session_pool contains the key/value pairs stored in
+ * the session_cache. The pool is regularly destroyed
+ * and re-created to avoid long-term memory consumption
+ */
+ apr_pool_t *session_pool;
+
+ /**
+ * Thread responsible for collecting responses on the opensrf
+ * network and relaying them back to the caller
+ */
apr_thread_t *responder_thread;
+
+ /**
+ * All message handling code is wrapped in a thread mutex such
+ * that all actions (after the initial setup) are serialized
+ * to minimize the possibility of multi-threading snafus.
+ */
apr_thread_mutex_t *mutex;
+
+ /**
+ * True if a websocket client is currently connected
+ */
int client_connected;
+
+ /** OpenSRF jouter name */
char* osrf_router;
+
+ /** OpenSRF domain */
char* osrf_domain;
+
} osrfWebsocketTranslator;
static osrfWebsocketTranslator *trans = NULL;
static transport_client *osrf_handle = NULL;
static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
-
static void clear_cached_recipient(const char* thread) {
apr_pool_t *pool = NULL;
// memory accumulates in the session_pool as sessions are cached then
// un-cached. Un-caching removes strings from the hash, but not the
- // pool itself. That only happens when the pool is destroyed. destroy
- // the session pool to clear any lingering memory
+ // pool itself. That only happens when the pool is destroyed. Here
+ // we destroy the session pool to clear any lingering memory, then
+ // re-create it for future caching.
apr_pool_destroy(trans->session_pool);
- // create a standalone pool for our translator data
if (apr_pool_create(&pool, NULL) != APR_SUCCESS) {
osrfLogError(OSRF_LOG_MARK, "WS Unable to create session_pool");
trans->session_pool = NULL;
}
-
void* osrf_responder_thread_main_body(transport_message *tmsg) {
osrfList *msg_list = NULL;
osrfListFree(msg_list);
if (!trans->client_connected) {
- // responses received after client disconnect are discarded
osrfLogDebug(OSRF_LOG_MARK,
"WS discarding response for thread=%s, xid=%s",
}
- // client is still connected; relay the messages to the client
+ // client is still connected.
+ // relay the response messages to the client
jsonObject *msg_wrapper = NULL;
char *msg_string = NULL;
jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
if (tmsg->is_error) {
- fprintf(stderr,
+ osrfLogError(OSRF_LOG_MARK,
"WS received jabber error message in response to thread=%s and xid=%s",
tmsg->thread, tmsg->osrf_xid);
- fflush(stderr);
jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
}
msg_string = jsonObjectToJSONRaw(msg_wrapper);
- // deliver the wrapped message json to the websocket client
+ // drop the JSON on the outbound wire
trans->server->send(trans->server, MESSAGE_TYPE_TEXT,
(unsigned char*) msg_string, strlen(msg_string));
/**
- * Allocate the session cache and create the responder thread
+ * Connect to OpenSRF, create the main pool, responder thread
+ * session cache and session pool.
*/
int child_init(const WebSocketServer *server) {
//"WS connect from %s", r->connection->client_ip); // apache 2.4
if (!trans) {
+ // first connection
if (child_init(server) != APR_SUCCESS) {
return NULL;
}
}
// create a standalone pool for the session cache values
- // this pool will be destroyed and re-created regularly to
- // clear session memory
+ // this pool will be destroyed and re-created regularly
+ // to clear session memory
if (apr_pool_create(&pool, r->pool) != APR_SUCCESS) {
osrfLogError(OSRF_LOG_MARK, "WS Unable to create apr_pool");
return NULL;
/**
- * Release all memory allocated from the translator pool and kill the pool.
+ * Clear the session cache, release the session pool
*/
void CALLBACK on_disconnect_handler(
void *data, const WebSocketServer *server) {
osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data;
trans->client_connected = 0;
+
+ // ensure no errant session data is sticking around
+ apr_hash_clear(trans->session_cache);
- /*
- It's not necessary to destroy our session_pool, since
- it's a child of the apache request_rec pool, which is
- destroyed after client disconnect.
+ // strictly speaking, this pool will get destroyed when
+ // r->pool is destroyed, but it doesn't hurt to explicitly
+ // destroy it ourselves.
apr_pool_destroy(trans->session_pool);
- */
-
trans->session_pool = NULL;
request_rec *r = server->request(server);
+
osrfLogDebug(OSRF_LOG_MARK,
"WS disconnect from %s", r->connection->remote_ip);
//"WS disconnect from %s", r->connection->client_ip); // apache 2.4
if (trans) {
apr_thread_exit(trans->responder_thread, APR_SUCCESS);
apr_thread_mutex_destroy(trans->mutex);
+ apr_pool_destroy(trans->session_pool);
apr_pool_destroy(trans->main_pool);
}