From: scottmk Date: Mon, 5 Jan 2009 17:36:42 +0000 (+0000) Subject: This update restructures the mechanism for queueing incoming transport X-Git-Url: https://old-git.evergreen-ils.org/?a=commitdiff_plain;h=af61b0530efd864ec78367265a97baa1a9439752;p=opensrf%2Fbjwebb.git This update restructures the mechanism for queueing incoming transport messages. In addition, the update to transport_client.c rearranges the logic a bit in client_recv(). 1. A transport_message now carries a pointer to be used in a linked list. It is initialized to NULL when the message is created. We no longer use a separately allocated list node to carry the message. 2. The queue of transport_messages no longer starts with a dummy node. 3. Instead of finding the tail of the queue by traversing the list from the head, we maintain a separate pointer to the tail node. Thus the enqueuing operation occurs in constant time instead of linear time. 4. In client_recv: we now have the dequeueing code in a single place, instead of duplicating it. 5. In client_recv: I eliminated some conditional compilation that made no real difference, since both branches of the #ifdef were effectively identical. 6. In client_recv: changed both loops from while loops to do-while loops, since in each case we want to perform at least one iteration. git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@1570 9efc2488-bf62-4759-914b-345cdb29e865 --- diff --git a/include/opensrf/transport_client.h b/include/opensrf/transport_client.h index 9b375d6..924605f 100644 --- a/include/opensrf/transport_client.h +++ b/include/opensrf/transport_client.h @@ -12,7 +12,8 @@ struct message_list_struct; // Our client struct. We manage a list of messages and a controlling session // --------------------------------------------------------------------------- struct transport_client_struct { - struct message_list_struct* m_list; + transport_message* msg_q_head; + transport_message* msg_q_tail; transport_session* session; int error; }; diff --git a/include/opensrf/transport_message.h b/include/opensrf/transport_message.h index 6cf3a49..8508326 100644 --- a/include/opensrf/transport_message.h +++ b/include/opensrf/transport_message.h @@ -34,6 +34,7 @@ struct transport_message_struct { int error_code; int broadcast; char* msg_xml; /* the entire message as XML complete with entity encoding */ + struct transport_message_struct* next; }; typedef struct transport_message_struct transport_message; @@ -55,13 +56,6 @@ void message_set_router_info( transport_message* msg, const char* router_from, void message_set_osrf_xid( transport_message* msg, const char* osrf_xid ); // --------------------------------------------------------------------------------- -// Formats the Jabber message as XML for encoding. -// Returns NULL on error -// --------------------------------------------------------------------------------- -char* message_to_xml( const transport_message* msg ); - - -// --------------------------------------------------------------------------------- // Call this to create the encoded XML for sending on the wire. // This is a seperate function so that encoding will not necessarily have // to happen on all messages (i.e. typically only occurs outbound messages). @@ -75,11 +69,6 @@ int message_prepare_xml( transport_message* msg ); int message_free( transport_message* msg ); // --------------------------------------------------------------------------------- -// Prepares the shared XML document -// --------------------------------------------------------------------------------- -//int message_init_xml(); - -// --------------------------------------------------------------------------------- // Determines the username of a Jabber ID. This expects a pre-allocated char // array for the return value. // --------------------------------------------------------------------------------- diff --git a/src/libopensrf/transport_client.c b/src/libopensrf/transport_client.c index 1f82702..be83eb7 100644 --- a/src/libopensrf/transport_client.c +++ b/src/libopensrf/transport_client.c @@ -1,21 +1,5 @@ #include -#define MESSAGE_LIST_HEAD 1 -#define MESSAGE_LIST_ITEM 2 - -// --------------------------------------------------------------------------- -// Represents a node in a linked list. The node holds a pointer to the next -// node (which is null unless set), a pointer to a transport_message, and -// and a type variable (which is not really curently necessary). -// --------------------------------------------------------------------------- -struct message_list_struct { - struct message_list_struct* next; - transport_message* message; - int type; -}; -typedef struct message_list_struct transport_message_list; -typedef struct message_list_struct transport_message_node; - static void client_message_handler( void* client, transport_message* msg ); //int main( int argc, char** argv ); @@ -69,15 +53,11 @@ transport_client* client_init( const char* server, int port, const char* unix_pa /* build and clear the client object */ transport_client* client = safe_malloc( sizeof( transport_client) ); - /* build and clear the message list */ - client->m_list = safe_malloc( sizeof( transport_message_list ) ); - - client->m_list->next = NULL; - client->m_list->message = NULL; - client->m_list->type = MESSAGE_LIST_HEAD; - - /* build the session */ + /* start with an empty message queue */ + client->msg_q_head = NULL; + client->msg_q_tail = NULL; + /* build the session */ client->session = init_transport( server, port, unix_path, client, component ); client->session->message_callback = client_message_handler; @@ -116,85 +96,63 @@ int client_send_message( transport_client* client, transport_message* msg ) { transport_message* client_recv( transport_client* client, int timeout ) { if( client == NULL ) { return NULL; } - transport_message_node* node; - transport_message* msg; + int error = 0; /* boolean */ + if( NULL == client->msg_q_head ) { - /* see if there are any message in the messages queue */ - if( client->m_list->next != NULL ) { - /* pop off the first one... */ - node = client->m_list->next; - client->m_list->next = node->next; - msg = node->message; - free( node ); - return msg; - } - - if( timeout == -1 ) { /* wait potentially forever for data to arrive */ + /* no messaage available? try to get one */ + if( timeout == -1 ) { /* wait potentially forever for data to arrive */ - while( client->m_list->next == NULL ) { - // if( ! session_wait( client->session, -1 ) ) { int x; - if( (x = session_wait( client->session, -1 )) ) { - osrfLogDebug(OSRF_LOG_MARK, "session_wait returned failure code %d\n", x); - client->error = 1; - return NULL; - } + do { + if( (x = session_wait( client->session, -1 )) ) { + osrfLogDebug(OSRF_LOG_MARK, "session_wait returned failure code %d\n", x); + error = 1; + break; + } + } while( client->msg_q_head == NULL ); + + } else { /* loop up to 'timeout' seconds waiting for data to arrive */ + + /* This loop assumes that a time_t is denominated in seconds -- not */ + /* guaranteed by Standard C, but a fair bet for Linux or UNIX */ + + time_t start = time(NULL); + time_t remaining = (time_t) timeout; + + int wait_ret; + do { + if( (wait_ret = session_wait( client->session, (int) remaining)) ) { + error = 1; + osrfLogDebug(OSRF_LOG_MARK, + "session_wait returned failure code %d: setting error=1\n", wait_ret); + break; + } + + remaining -= time(NULL) - start; + } while( NULL == client->msg_q_head && remaining > 0 ); } - - } else { /* wait at most timeout seconds */ - + } - /* if not, loop up to 'timeout' seconds waiting for data to arrive */ - time_t start = time(NULL); - time_t remaining = (time_t) timeout; - - int counter = 0; - - int wait_ret; - while( client->m_list->next == NULL && remaining >= 0 ) { - - if( (wait_ret= session_wait( client->session, remaining)) ) { - client->error = 1; - osrfLogDebug(OSRF_LOG_MARK, "session_wait returned failure code %d: setting error=1\n", wait_ret); - return NULL; - } - - ++counter; - -#ifdef _ROUTER - // session_wait returns -1 if there is no more data and we're a router - if( remaining == 0 ) { // && wait_ret == -1 ) { - break; - } -#else - if( remaining == 0 ) // or infinite loop - break; -#endif - - remaining -= (int) (time(NULL) - start); - } - + transport_message* msg = NULL; + + if( error ) + client->error = 1; + else if( client->msg_q_head != NULL ) { + /* got message(s); dequeue the oldest one */ + msg = client->msg_q_head; + client->msg_q_head = msg->next; + msg->next = NULL; /* shouldn't be necessary; nullify for good hygiene */ + if( NULL == client->msg_q_head ) + client->msg_q_tail = NULL; } - /* again, see if there are any messages in the message queue */ - if( client->m_list->next != NULL ) { - /* pop off the first one... */ - node = client->m_list->next; - client->m_list->next = node->next; - msg = node->message; - free( node ); - return msg; - - } else { - return NULL; - } + return msg; } // --------------------------------------------------------------------------- // This is the message handler required by transport_session. This handler -// takes all incoming messages and puts them into the back of a linked list -// of messages. +// takes an incoming message and adds it to the tail of a message queue. // --------------------------------------------------------------------------- static void client_message_handler( void* client, transport_message* msg ){ @@ -203,21 +161,14 @@ static void client_message_handler( void* client, transport_message* msg ){ transport_client* cli = (transport_client*) client; - transport_message_node* node = safe_malloc( sizeof( transport_message_node) ); - node->next = NULL; - node->type = MESSAGE_LIST_ITEM; - node->message = msg; - - - /* find the last node and put this onto the end */ - transport_message_node* tail = cli->m_list; - transport_message_node* current = tail->next; - - while( current != NULL ) { - tail = current; - current = current->next; + /* add the new message to the tail of the queue */ + if( NULL == cli->msg_q_head ) + cli->msg_q_tail = cli->msg_q_head = msg; + else { + cli->msg_q_tail->next = msg; + cli->msg_q_tail = msg; } - tail->next = node; + msg->next = NULL; } @@ -225,18 +176,16 @@ int client_free( transport_client* client ){ if(client == NULL) return 0; session_free( client->session ); - transport_message_node* current = client->m_list->next; - transport_message_node* next; + transport_message* current = client->msg_q_head; + transport_message* next; /* deallocate the list of messages */ while( current != NULL ) { next = current->next; - message_free( current->message ); - free(current); + message_free( current ); current = next; } - free( client->m_list ); free( client ); return 1; } diff --git a/src/libopensrf/transport_message.c b/src/libopensrf/transport_message.c index b23e317..9189167 100644 --- a/src/libopensrf/transport_message.c +++ b/src/libopensrf/transport_message.c @@ -1,6 +1,5 @@ #include - // --------------------------------------------------------------------------------- // Allocates and initializes a new transport_message // --------------------------------------------------------------------------------- @@ -45,6 +44,7 @@ transport_message* message_init( const char* body, const char* subject, msg->error_code = 0; msg->broadcast = 0; msg->msg_xml = NULL; + msg->next = NULL; return msg; } @@ -72,6 +72,7 @@ transport_message* new_message_from_xml( const char* msg_xml ) { new_msg->error_code = 0; new_msg->broadcast = 0; new_msg->msg_xml = NULL; + new_msg->next = NULL; xmlKeepBlanksDefault(0); xmlDocPtr msg_doc = xmlReadDoc( BAD_CAST msg_xml, NULL, NULL, 0 ); @@ -160,62 +161,42 @@ transport_message* new_message_from_xml( const char* msg_xml ) { new_msg->body = strdup(""); new_msg->msg_xml = xmlDocToString(msg_doc, 0); - xmlFreeDoc(msg_doc); - xmlCleanupParser(); + xmlFreeDoc(msg_doc); + xmlCleanupParser(); return new_msg; } void message_set_osrf_xid( transport_message* msg, const char* osrf_xid ) { - if(!msg) return; - if( osrf_xid ) - msg->osrf_xid = strdup(osrf_xid); - else msg->osrf_xid = strdup(""); + if( msg ) { + if( msg->osrf_xid ) free( msg->osrf_xid ); + msg->osrf_xid = strdup( osrf_xid ? osrf_xid : "" ); + } } void message_set_router_info( transport_message* msg, const char* router_from, const char* router_to, const char* router_class, const char* router_command, int broadcast_enabled ) { - if( !msg ) return; - - if(router_from) - msg->router_from = strdup(router_from); - else - msg->router_from = strdup(""); - - if(router_to) - msg->router_to = strdup(router_to); - else - msg->router_to = strdup(""); - - if(router_class) - msg->router_class = strdup(router_class); - else - msg->router_class = strdup(""); - - if(router_command) - msg->router_command = strdup(router_command); - else - msg->router_command = strdup(""); - - msg->broadcast = broadcast_enabled; + if( msg ) { - if( msg->router_from == NULL || msg->router_to == NULL || - msg->router_class == NULL || msg->router_command == NULL ) - osrfLogError(OSRF_LOG_MARK, "message_set_router_info(): Out of Memory" ); - - return; -} + /* free old values, if any */ + if( msg->router_from ) free( msg->router_from ); + if( msg->router_to ) free( msg->router_to ); + if( msg->router_class ) free( msg->router_class ); + if( msg->router_command ) free( msg->router_command ); + /* install new values */ + msg->router_from = strdup( router_from ? router_from : "" ); + msg->router_to = strdup( router_to ? router_to : "" ); + msg->router_class = strdup( router_class ? router_class : "" ); + msg->router_command = strdup( router_command ? router_command : "" ); + msg->broadcast = broadcast_enabled; - -/* encodes the message for traversal */ -int message_prepare_xml( transport_message* msg ) { - if( !msg ) return 0; - if( msg->msg_xml == NULL ) - msg->msg_xml = message_to_xml( msg ); - return 1; + if( msg->router_from == NULL || msg->router_to == NULL || + msg->router_class == NULL || msg->router_command == NULL ) + osrfLogError(OSRF_LOG_MARK, "message_set_router_info(): Out of Memory" ); + } } @@ -240,16 +221,16 @@ int message_free( transport_message* msg ){ free(msg); return 1; } - + + // --------------------------------------------------------------------------------- -// Allocates a char* holding the XML representation of this jabber message +// Encodes the message as XML for traversal; stores in msg_xml member // --------------------------------------------------------------------------------- -char* message_to_xml( const transport_message* msg ) { - - //int bufsize; - //xmlChar* xmlbuf; - //char* encoded_body; +int message_prepare_xml( transport_message* msg ) { + if( !msg ) return 0; + if( msg->msg_xml ) return 1; /* already done */ + xmlNodePtr message_node; xmlNodePtr body_node; xmlNodePtr thread_node; @@ -260,11 +241,6 @@ char* message_to_xml( const transport_message* msg ) { xmlKeepBlanksDefault(0); - if( ! msg ) { - osrfLogWarning(OSRF_LOG_MARK, "Passing NULL message to message_to_xml()"); - return NULL; - } - doc = xmlReadDoc( BAD_CAST "", NULL, NULL, XML_PARSE_NSCLEAN ); message_node = xmlDocGetRootElement(doc); @@ -318,11 +294,13 @@ char* message_to_xml( const transport_message* msg ) { xmlBufferPtr xmlbuf = xmlBufferCreate(); xmlNodeDump( xmlbuf, doc, xmlDocGetRootElement(doc), 0, 0); - char* xml = strdup((const char*) (xmlBufferContent(xmlbuf))); + msg->msg_xml = strdup((const char*) (xmlBufferContent(xmlbuf))); + xmlBufferFree(xmlbuf); xmlFreeDoc( doc ); xmlCleanupParser(); - return xml; + + return 1; }