apr_pool_t *session_pool; // child of trans->main_pool; per-session
apr_hash_t *session_cache;
apr_thread_t *responder_thread;
+ apr_thread_mutex_t *mutex;
int client_connected;
char* osrf_router;
char* osrf_domain;
static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer
-/**
- * Responder thread main body.
- * Collects responses from the opensrf network and relays them to the
- * websocket caller.
- */
-void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
+void* osrf_responder_thread_main_body(transport_message *tmsg) {
- transport_message *tmsg;
jsonObject *msg_wrapper;
osrfList *msg_list;
osrfMessage *one_msg;
char *msg_string;
int i;
- while (1) {
- // TODO: RELEASE THREAD MUTEX
-
- tmsg = client_recv(osrf_handle, -1);
-
- if (!tmsg) continue; // early exit on interrupt
+ // discard responses received after client disconnect
+ if (!trans->client_connected) {
+ osrfLogDebug(OSRF_LOG_MARK,
+ "WS discarding response for thread=%s, xid=%s",
+ tmsg->thread, tmsg->osrf_xid);
+ return;
+ }
- // TODO: THREAD MUTEX LOCK STARTS
-
- // discard responses received after client disconnect
- if (!trans->client_connected) {
- osrfLogDebug(OSRF_LOG_MARK,
- "WS discarding response for thread=%s, xid=%s",
- tmsg->thread, tmsg->osrf_xid);
- message_free(tmsg);
- continue;
- }
+ osrfLogDebug(OSRF_LOG_MARK,
+ "WS received opensrf response for thread=%s, xid=%s",
+ tmsg->thread, tmsg->osrf_xid);
+
+ // build the wrapper object
+ msg_wrapper = jsonNewObject(NULL);
+ jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
+ jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
+ jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
+
+ if (tmsg->is_error) {
+ fprintf(stderr,
+ "WS received jabber error message in response to thread=%s and xid=%s",
+ tmsg->thread, tmsg->osrf_xid);
+ fflush(stderr);
+ jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
+ }
- osrfLogDebug(OSRF_LOG_MARK,
- "WS received opensrf response for thread=%s, xid=%s",
- tmsg->thread, tmsg->osrf_xid);
-
- // build the wrapper object
- msg_wrapper = jsonNewObject(NULL);
- jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread));
- jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid));
- jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body));
-
- if (tmsg->is_error) {
- fprintf(stderr,
- "WS received jabber error message in response to thread=%s and xid=%s",
- tmsg->thread, tmsg->osrf_xid);
- fflush(stderr);
- jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1));
+ msg_string = jsonObjectToJSONRaw(msg_wrapper);
+
+ // deliver the wrapped message json to the websocket client
+ trans->server->send(trans->server, MESSAGE_TYPE_TEXT,
+ (unsigned char*) msg_string, strlen(msg_string));
+
+ msg_list = osrfMessageDeserialize(tmsg->body, NULL);
+ for (i = 0; i < msg_list->size; i++) {
+ one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
+
+ /* if our client just successfully connected to an opensrf service,
+ cache the sender so that future calls on this thread will use
+ the correct recipient.
+ TODO: this cache will grow to add one entry per connected client
+ session. Even when entries are removed, they are not cleaned up
+ until the session_pool is destroyed. We need to ensure that client
+ sessions don't last too long and/or create a last-touched timeout
+ mechanism to periodically remove old entries. */
+ if (one_msg && one_msg->m_type == STATUS &&
+ one_msg->status_code == OSRF_STATUS_OK) {
+
+ if (!apr_hash_get(trans->session_cache,
+ tmsg->thread, APR_HASH_KEY_STRING)) {
+
+ osrfLogDebug(OSRF_LOG_MARK,
+ "WS caching sender thread=%s, sender=%s",
+ tmsg->thread, tmsg->sender);
+
+ apr_hash_set(trans->session_cache,
+ apr_pstrdup(trans->session_pool, tmsg->thread),
+ APR_HASH_KEY_STRING,
+ apr_pstrdup(trans->session_pool, tmsg->sender));
+ }
}
+ }
- msg_string = jsonObjectToJSONRaw(msg_wrapper);
+ free(msg_string);
+ osrfListFree(msg_list);
+ jsonObjectFree(msg_wrapper);
+}
- // deliver the wrapped message json to the websocket client
- trans->server->send(trans->server, MESSAGE_TYPE_TEXT,
- (unsigned char*) msg_string, strlen(msg_string));
+/**
+ * Responder thread main body.
+ * Collects responses from the opensrf network and relays them to the
+ * websocket caller.
+ */
+void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *data) {
- msg_list = osrfMessageDeserialize(tmsg->body, NULL);
- for (i = 0; i < msg_list->size; i++) {
- one_msg = OSRF_LIST_GET_INDEX(msg_list, i);
+ transport_message *tmsg;
+ while (1) {
- /* if our client just successfully connected to an opensrf service,
- cache the sender so that future calls on this thread will use
- the correct recipient.
- TODO: this cache will grow to add one entry per client session.
- Even when entries are removed, they are not cleaned up until the
- session_pool is destroyed. We need to ensure that client sessions
- don't last too long and/or create a last-touched timeout mechanism
- to periodically remove old entries. */
- if (one_msg && one_msg->m_type == STATUS &&
- one_msg->status_code == OSRF_STATUS_OK) {
+ if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
+ osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
+ return NULL;
+ }
- if (!apr_hash_get(trans->session_cache,
- tmsg->thread, APR_HASH_KEY_STRING)) {
+ // wait for a response
+ tmsg = client_recv(osrf_handle, -1);
- osrfLogDebug(OSRF_LOG_MARK,
- "WS caching sender thread=%s, sender=%s",
- tmsg->thread, tmsg->sender);
+ if (!tmsg) continue; // early exit on interrupt
- apr_hash_set(trans->session_cache,
- apr_pstrdup(trans->session_pool, tmsg->thread),
- APR_HASH_KEY_STRING,
- apr_pstrdup(trans->session_pool, tmsg->sender));
- }
- }
+ if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
+ osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
+ return NULL;
}
- free(msg_string);
- osrfListFree(msg_list);
- jsonObjectFree(msg_wrapper);
+ osrf_responder_thread_main_body(tmsg);
message_free(tmsg);
}
return NULL;
}
+
+
/**
* Allocate the session cache and create the responder thread
*/
apr_pool_t *pool = NULL;
apr_thread_t *thread = NULL;
apr_threadattr_t *thread_attr = NULL;
+ apr_thread_mutex_t *mutex = NULL;
request_rec *r = server->request(server);
osrfLogDebug(OSRF_LOG_MARK, "WS child_init");
return 1;
}
+ if (apr_thread_mutex_create(
+ &mutex, APR_THREAD_MUTEX_UNNESTED,
+ trans->main_pool) != APR_SUCCESS) {
+ osrfLogError(OSRF_LOG_MARK, "WS unable to create thread mutex");
+ return 1;
+ }
+
+ trans->mutex = mutex;
+
return APR_SUCCESS;
}
return osrfMessageSerializeBatch(msg_list, num_msgs);
}
-
-
-
-
/**
* Parse opensrf request and relay the request to the opensrf network.
*/
-static size_t CALLBACK on_message_handler(void *data,
+static size_t on_message_handler_body(void *data,
const WebSocketServer *server, const int type,
unsigned char *buffer, const size_t buffer_size) {
return OK;
}
+static size_t CALLBACK on_message_handler(void *data,
+ const WebSocketServer *server, const int type,
+ unsigned char *buffer, const size_t buffer_size) {
+
+ if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
+ osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
+ return 1; // TODO: map to apr_status_t value?
+ }
+
+ apr_status_t stat = on_message_handler_body(data, server, type, buffer, buffer_size);
+
+ if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
+ osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
+ return 1;
+ }
+
+ return stat;
+}
+
/**
* Release all memory allocated from the translator pool and kill the pool.