#include <opensrf/transport_client.h>
-#define MESSAGE_LIST_HEAD 1
-#define MESSAGE_LIST_ITEM 2
-
-// ---------------------------------------------------------------------------
-// Represents a node in a linked list. The node holds a pointer to the next
-// node (which is null unless set), a pointer to a transport_message, and
-// and a type variable (which is not really curently necessary).
-// ---------------------------------------------------------------------------
-struct message_list_struct {
- struct message_list_struct* next;
- transport_message* message;
- int type;
-};
-typedef struct message_list_struct transport_message_list;
-typedef struct message_list_struct transport_message_node;
-
static void client_message_handler( void* client, transport_message* msg );
//int main( int argc, char** argv );
/* build and clear the client object */
transport_client* client = safe_malloc( sizeof( transport_client) );
- /* build and clear the message list */
- client->m_list = safe_malloc( sizeof( transport_message_list ) );
-
- client->m_list->next = NULL;
- client->m_list->message = NULL;
- client->m_list->type = MESSAGE_LIST_HEAD;
-
- /* build the session */
+ /* start with an empty message queue */
+ client->msg_q_head = NULL;
+ client->msg_q_tail = NULL;
+ /* build the session */
client->session = init_transport( server, port, unix_path, client, component );
client->session->message_callback = client_message_handler;
transport_message* client_recv( transport_client* client, int timeout ) {
if( client == NULL ) { return NULL; }
- transport_message_node* node;
- transport_message* msg;
+ int error = 0; /* boolean */
+ if( NULL == client->msg_q_head ) {
- /* see if there are any message in the messages queue */
- if( client->m_list->next != NULL ) {
- /* pop off the first one... */
- node = client->m_list->next;
- client->m_list->next = node->next;
- msg = node->message;
- free( node );
- return msg;
- }
-
- if( timeout == -1 ) { /* wait potentially forever for data to arrive */
+ /* no messaage available? try to get one */
+ if( timeout == -1 ) { /* wait potentially forever for data to arrive */
- while( client->m_list->next == NULL ) {
- // if( ! session_wait( client->session, -1 ) ) {
int x;
- if( (x = session_wait( client->session, -1 )) ) {
- osrfLogDebug(OSRF_LOG_MARK, "session_wait returned failure code %d\n", x);
- client->error = 1;
- return NULL;
- }
+ do {
+ if( (x = session_wait( client->session, -1 )) ) {
+ osrfLogDebug(OSRF_LOG_MARK, "session_wait returned failure code %d\n", x);
+ error = 1;
+ break;
+ }
+ } while( client->msg_q_head == NULL );
+
+ } else { /* loop up to 'timeout' seconds waiting for data to arrive */
+
+ /* This loop assumes that a time_t is denominated in seconds -- not */
+ /* guaranteed by Standard C, but a fair bet for Linux or UNIX */
+
+ time_t start = time(NULL);
+ time_t remaining = (time_t) timeout;
+
+ int wait_ret;
+ do {
+ if( (wait_ret = session_wait( client->session, (int) remaining)) ) {
+ error = 1;
+ osrfLogDebug(OSRF_LOG_MARK,
+ "session_wait returned failure code %d: setting error=1\n", wait_ret);
+ break;
+ }
+
+ remaining -= time(NULL) - start;
+ } while( NULL == client->msg_q_head && remaining > 0 );
}
-
- } else { /* wait at most timeout seconds */
-
+ }
- /* if not, loop up to 'timeout' seconds waiting for data to arrive */
- time_t start = time(NULL);
- time_t remaining = (time_t) timeout;
-
- int counter = 0;
-
- int wait_ret;
- while( client->m_list->next == NULL && remaining >= 0 ) {
-
- if( (wait_ret= session_wait( client->session, remaining)) ) {
- client->error = 1;
- osrfLogDebug(OSRF_LOG_MARK, "session_wait returned failure code %d: setting error=1\n", wait_ret);
- return NULL;
- }
-
- ++counter;
-
-#ifdef _ROUTER
- // session_wait returns -1 if there is no more data and we're a router
- if( remaining == 0 ) { // && wait_ret == -1 ) {
- break;
- }
-#else
- if( remaining == 0 ) // or infinite loop
- break;
-#endif
-
- remaining -= (int) (time(NULL) - start);
- }
-
+ transport_message* msg = NULL;
+
+ if( error )
+ client->error = 1;
+ else if( client->msg_q_head != NULL ) {
+ /* got message(s); dequeue the oldest one */
+ msg = client->msg_q_head;
+ client->msg_q_head = msg->next;
+ msg->next = NULL; /* shouldn't be necessary; nullify for good hygiene */
+ if( NULL == client->msg_q_head )
+ client->msg_q_tail = NULL;
}
- /* again, see if there are any messages in the message queue */
- if( client->m_list->next != NULL ) {
- /* pop off the first one... */
- node = client->m_list->next;
- client->m_list->next = node->next;
- msg = node->message;
- free( node );
- return msg;
-
- } else {
- return NULL;
- }
+ return msg;
}
// ---------------------------------------------------------------------------
// This is the message handler required by transport_session. This handler
-// takes all incoming messages and puts them into the back of a linked list
-// of messages.
+// takes an incoming message and adds it to the tail of a message queue.
// ---------------------------------------------------------------------------
static void client_message_handler( void* client, transport_message* msg ){
transport_client* cli = (transport_client*) client;
- transport_message_node* node = safe_malloc( sizeof( transport_message_node) );
- node->next = NULL;
- node->type = MESSAGE_LIST_ITEM;
- node->message = msg;
-
-
- /* find the last node and put this onto the end */
- transport_message_node* tail = cli->m_list;
- transport_message_node* current = tail->next;
-
- while( current != NULL ) {
- tail = current;
- current = current->next;
+ /* add the new message to the tail of the queue */
+ if( NULL == cli->msg_q_head )
+ cli->msg_q_tail = cli->msg_q_head = msg;
+ else {
+ cli->msg_q_tail->next = msg;
+ cli->msg_q_tail = msg;
}
- tail->next = node;
+ msg->next = NULL;
}
if(client == NULL) return 0;
session_free( client->session );
- transport_message_node* current = client->m_list->next;
- transport_message_node* next;
+ transport_message* current = client->msg_q_head;
+ transport_message* next;
/* deallocate the list of messages */
while( current != NULL ) {
next = current->next;
- message_free( current->message );
- free(current);
+ message_free( current );
current = next;
}
- free( client->m_list );
free( client );
return 1;
}
#include <opensrf/transport_message.h>
-
// ---------------------------------------------------------------------------------
// Allocates and initializes a new transport_message
// ---------------------------------------------------------------------------------
msg->error_code = 0;
msg->broadcast = 0;
msg->msg_xml = NULL;
+ msg->next = NULL;
return msg;
}
new_msg->error_code = 0;
new_msg->broadcast = 0;
new_msg->msg_xml = NULL;
+ new_msg->next = NULL;
xmlKeepBlanksDefault(0);
xmlDocPtr msg_doc = xmlReadDoc( BAD_CAST msg_xml, NULL, NULL, 0 );
new_msg->body = strdup("");
new_msg->msg_xml = xmlDocToString(msg_doc, 0);
- xmlFreeDoc(msg_doc);
- xmlCleanupParser();
+ xmlFreeDoc(msg_doc);
+ xmlCleanupParser();
return new_msg;
}
void message_set_osrf_xid( transport_message* msg, const char* osrf_xid ) {
- if(!msg) return;
- if( osrf_xid )
- msg->osrf_xid = strdup(osrf_xid);
- else msg->osrf_xid = strdup("");
+ if( msg ) {
+ if( msg->osrf_xid ) free( msg->osrf_xid );
+ msg->osrf_xid = strdup( osrf_xid ? osrf_xid : "" );
+ }
}
void message_set_router_info( transport_message* msg, const char* router_from,
const char* router_to, const char* router_class, const char* router_command,
int broadcast_enabled ) {
- if( !msg ) return;
-
- if(router_from)
- msg->router_from = strdup(router_from);
- else
- msg->router_from = strdup("");
-
- if(router_to)
- msg->router_to = strdup(router_to);
- else
- msg->router_to = strdup("");
-
- if(router_class)
- msg->router_class = strdup(router_class);
- else
- msg->router_class = strdup("");
-
- if(router_command)
- msg->router_command = strdup(router_command);
- else
- msg->router_command = strdup("");
-
- msg->broadcast = broadcast_enabled;
+ if( msg ) {
- if( msg->router_from == NULL || msg->router_to == NULL ||
- msg->router_class == NULL || msg->router_command == NULL )
- osrfLogError(OSRF_LOG_MARK, "message_set_router_info(): Out of Memory" );
-
- return;
-}
+ /* free old values, if any */
+ if( msg->router_from ) free( msg->router_from );
+ if( msg->router_to ) free( msg->router_to );
+ if( msg->router_class ) free( msg->router_class );
+ if( msg->router_command ) free( msg->router_command );
+ /* install new values */
+ msg->router_from = strdup( router_from ? router_from : "" );
+ msg->router_to = strdup( router_to ? router_to : "" );
+ msg->router_class = strdup( router_class ? router_class : "" );
+ msg->router_command = strdup( router_command ? router_command : "" );
+ msg->broadcast = broadcast_enabled;
-
-/* encodes the message for traversal */
-int message_prepare_xml( transport_message* msg ) {
- if( !msg ) return 0;
- if( msg->msg_xml == NULL )
- msg->msg_xml = message_to_xml( msg );
- return 1;
+ if( msg->router_from == NULL || msg->router_to == NULL ||
+ msg->router_class == NULL || msg->router_command == NULL )
+ osrfLogError(OSRF_LOG_MARK, "message_set_router_info(): Out of Memory" );
+ }
}
free(msg);
return 1;
}
-
+
+
// ---------------------------------------------------------------------------------
-// Allocates a char* holding the XML representation of this jabber message
+// Encodes the message as XML for traversal; stores in msg_xml member
// ---------------------------------------------------------------------------------
-char* message_to_xml( const transport_message* msg ) {
-
- //int bufsize;
- //xmlChar* xmlbuf;
- //char* encoded_body;
+int message_prepare_xml( transport_message* msg ) {
+ if( !msg ) return 0;
+ if( msg->msg_xml ) return 1; /* already done */
+
xmlNodePtr message_node;
xmlNodePtr body_node;
xmlNodePtr thread_node;
xmlKeepBlanksDefault(0);
- if( ! msg ) {
- osrfLogWarning(OSRF_LOG_MARK, "Passing NULL message to message_to_xml()");
- return NULL;
- }
-
doc = xmlReadDoc( BAD_CAST "<message/>", NULL, NULL, XML_PARSE_NSCLEAN );
message_node = xmlDocGetRootElement(doc);
xmlBufferPtr xmlbuf = xmlBufferCreate();
xmlNodeDump( xmlbuf, doc, xmlDocGetRootElement(doc), 0, 0);
- char* xml = strdup((const char*) (xmlBufferContent(xmlbuf)));
+ msg->msg_xml = strdup((const char*) (xmlBufferContent(xmlbuf)));
+
xmlBufferFree(xmlbuf);
xmlFreeDoc( doc );
xmlCleanupParser();
- return xml;
+
+ return 1;
}