This update restructures the mechanism for queueing incoming transport
authorscottmk <scottmk@9efc2488-bf62-4759-914b-345cdb29e865>
Mon, 5 Jan 2009 17:36:42 +0000 (17:36 +0000)
committerscottmk <scottmk@9efc2488-bf62-4759-914b-345cdb29e865>
Mon, 5 Jan 2009 17:36:42 +0000 (17:36 +0000)
messages.  In addition, the update to transport_client.c rearranges the
logic a bit in client_recv().

1. A transport_message now carries a pointer to be used in a linked list.
It is initialized to NULL when the message is created.  We no longer use
a separately allocated list node to carry the message.

2. The queue of transport_messages no longer starts with a dummy node.

3. Instead of finding the tail of the queue by traversing the list from
the head, we maintain a separate pointer to the tail node.  Thus the
enqueuing operation occurs in constant time instead of linear time.

4. In client_recv: we now have the dequeueing code in a single place,
instead of duplicating it.

5. In client_recv: I eliminated some conditional compilation that made
no real difference, since both branches of the #ifdef were effectively
identical.

6. In client_recv: changed both loops from while loops to do-while
loops, since in each case we want to perform at least one iteration.

git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@1570 9efc2488-bf62-4759-914b-345cdb29e865

include/opensrf/transport_client.h
include/opensrf/transport_message.h
src/libopensrf/transport_client.c
src/libopensrf/transport_message.c

index 9b375d6..924605f 100644 (file)
@@ -12,7 +12,8 @@ struct message_list_struct;
 // Our client struct.  We manage a list of messages and a controlling session
 // ---------------------------------------------------------------------------
 struct transport_client_struct {
-       struct message_list_struct* m_list;
+       transport_message* msg_q_head;
+       transport_message* msg_q_tail;
        transport_session* session;
        int error;
 };
index 6cf3a49..8508326 100644 (file)
@@ -34,6 +34,7 @@ struct transport_message_struct {
        int error_code;
        int broadcast;
        char* msg_xml; /* the entire message as XML complete with entity encoding */
+       struct transport_message_struct* next;
 };
 typedef struct transport_message_struct transport_message;
 
@@ -55,13 +56,6 @@ void message_set_router_info( transport_message* msg, const char* router_from,
 void message_set_osrf_xid( transport_message* msg, const char* osrf_xid );
 
 // ---------------------------------------------------------------------------------
-// Formats the Jabber message as XML for encoding. 
-// Returns NULL on error
-// ---------------------------------------------------------------------------------
-char* message_to_xml( const transport_message* msg );
-
-
-// ---------------------------------------------------------------------------------
 // Call this to create the encoded XML for sending on the wire.
 // This is a seperate function so that encoding will not necessarily have
 // to happen on all messages (i.e. typically only occurs outbound messages).
@@ -75,11 +69,6 @@ int message_prepare_xml( transport_message* msg );
 int message_free( transport_message* msg );
 
 // ---------------------------------------------------------------------------------
-// Prepares the shared XML document
-// ---------------------------------------------------------------------------------
-//int message_init_xml();
-
-// ---------------------------------------------------------------------------------
 // Determines the username of a Jabber ID.  This expects a pre-allocated char 
 // array for the return value.
 // ---------------------------------------------------------------------------------
index 1f82702..be83eb7 100644 (file)
@@ -1,21 +1,5 @@
 #include <opensrf/transport_client.h>
 
-#define MESSAGE_LIST_HEAD 1
-#define MESSAGE_LIST_ITEM 2
-
-// ---------------------------------------------------------------------------
-// Represents a node in a linked list.  The node holds a pointer to the next
-// node (which is null unless set), a pointer to a transport_message, and
-// and a type variable (which is not really curently necessary).
-// ---------------------------------------------------------------------------
-struct message_list_struct {
-       struct message_list_struct* next;
-       transport_message* message;
-       int type;
-};
-typedef struct message_list_struct transport_message_list;
-typedef struct message_list_struct transport_message_node;
-
 static void client_message_handler( void* client, transport_message* msg );
 
 //int main( int argc, char** argv );
@@ -69,15 +53,11 @@ transport_client* client_init( const char* server, int port, const char* unix_pa
        /* build and clear the client object */
        transport_client* client = safe_malloc( sizeof( transport_client) );
 
-       /* build and clear the message list */
-       client->m_list = safe_malloc( sizeof( transport_message_list ) );
-
-       client->m_list->next = NULL;
-       client->m_list->message = NULL;
-       client->m_list->type = MESSAGE_LIST_HEAD;
-
-       /* build the session */
+       /* start with an empty message queue */
+       client->msg_q_head = NULL;
+       client->msg_q_tail = NULL;
        
+       /* build the session */
        client->session = init_transport( server, port, unix_path, client, component );
 
        client->session->message_callback = client_message_handler;
@@ -116,85 +96,63 @@ int client_send_message( transport_client* client, transport_message* msg ) {
 transport_message* client_recv( transport_client* client, int timeout ) {
        if( client == NULL ) { return NULL; }
 
-       transport_message_node* node;
-       transport_message* msg;
+       int error = 0;  /* boolean */
 
+       if( NULL == client->msg_q_head ) {
 
-       /* see if there are any message in the messages queue */
-       if( client->m_list->next != NULL ) {
-               /* pop off the first one... */
-               node = client->m_list->next;
-               client->m_list->next = node->next;
-               msg = node->message;
-               free( node );
-               return msg;
-       }
-
-       if( timeout == -1 ) {  /* wait potentially forever for data to arrive */
+               /* no messaage available?  try to get one */
+               if( timeout == -1 ) {  /* wait potentially forever for data to arrive */
 
-               while( client->m_list->next == NULL ) {
-               //      if( ! session_wait( client->session, -1 ) ) {
                        int x;
-                       if( (x = session_wait( client->session, -1 )) ) {
-                               osrfLogDebug(OSRF_LOG_MARK, "session_wait returned failure code %d\n", x);
-                               client->error = 1;
-                               return NULL;
-                       }
+                       do {
+                               if( (x = session_wait( client->session, -1 )) ) {
+                                       osrfLogDebug(OSRF_LOG_MARK, "session_wait returned failure code %d\n", x);
+                                       error = 1;
+                                       break;
+                               }
+                       } while( client->msg_q_head == NULL );
+
+               } else {    /* loop up to 'timeout' seconds waiting for data to arrive  */
+
+                       /* This loop assumes that a time_t is denominated in seconds -- not */
+                       /* guaranteed by Standard C, but a fair bet for Linux or UNIX       */
+
+                       time_t start = time(NULL);
+                       time_t remaining = (time_t) timeout;
+
+                       int wait_ret;
+                       do {
+                               if( (wait_ret = session_wait( client->session, (int) remaining)) ) {
+                                       error = 1;
+                                       osrfLogDebug(OSRF_LOG_MARK,
+                                               "session_wait returned failure code %d: setting error=1\n", wait_ret);
+                                       break;
+                               }
+
+                               remaining -= time(NULL) - start;
+                       } while( NULL == client->msg_q_head && remaining > 0 );
                }
-
-       } else { /* wait at most timeout seconds */
-
+       }
        
-               /* if not, loop up to 'timeout' seconds waiting for data to arrive */
-               time_t start = time(NULL);      
-               time_t remaining = (time_t) timeout;
-
-               int counter = 0;
-
-               int wait_ret;
-               while( client->m_list->next == NULL && remaining >= 0 ) {
-
-                       if( (wait_ret= session_wait( client->session, remaining)) ) {
-                               client->error = 1;
-                               osrfLogDebug(OSRF_LOG_MARK, "session_wait returned failure code %d: setting error=1\n", wait_ret);
-                               return NULL;
-                       }
-
-                       ++counter;
-
-#ifdef _ROUTER
-                       // session_wait returns -1 if there is no more data and we're a router
-                       if( remaining == 0 ) { // && wait_ret == -1 ) {
-                               break;
-                       }
-#else
-                       if( remaining == 0 ) // or infinite loop
-                               break;
-#endif
-
-                       remaining -= (int) (time(NULL) - start);
-               }
-
+       transport_message* msg = NULL;
+
+       if( error )
+               client->error = 1;
+       else if( client->msg_q_head != NULL ) {
+               /* got message(s); dequeue the oldest one */
+               msg = client->msg_q_head;
+               client->msg_q_head = msg->next;
+               msg->next = NULL;  /* shouldn't be necessary; nullify for good hygiene */
+               if( NULL == client->msg_q_head )
+                       client->msg_q_tail = NULL;
        }
 
-       /* again, see if there are any messages in the message queue */
-       if( client->m_list->next != NULL ) {
-               /* pop off the first one... */
-               node = client->m_list->next;
-               client->m_list->next = node->next;
-               msg = node->message;
-               free( node );
-               return msg;
-
-       } else {
-               return NULL;
-       }
+       return msg;
 }
 
 // ---------------------------------------------------------------------------
 // This is the message handler required by transport_session.  This handler
-// takes all incoming messages and puts them into the back of a linked list
-// of messages.
+// takes an incoming message and adds it to the tail of a message queue.
 // ---------------------------------------------------------------------------
 static void client_message_handler( void* client, transport_message* msg ){
 
@@ -203,21 +161,14 @@ static void client_message_handler( void* client, transport_message* msg ){
 
        transport_client* cli = (transport_client*) client;
 
-       transport_message_node* node = safe_malloc( sizeof( transport_message_node) );
-       node->next = NULL;
-       node->type = MESSAGE_LIST_ITEM;
-       node->message = msg;
-
-
-       /* find the last node and put this onto the end */
-       transport_message_node* tail = cli->m_list;
-       transport_message_node* current = tail->next;
-
-       while( current != NULL ) {
-               tail = current;
-               current = current->next;
+       /* add the new message to the tail of the queue */
+       if( NULL == cli->msg_q_head )
+               cli->msg_q_tail = cli->msg_q_head = msg;
+       else {
+               cli->msg_q_tail->next = msg;
+               cli->msg_q_tail = msg;
        }
-       tail->next = node;
+       msg->next = NULL;
 }
 
 
@@ -225,18 +176,16 @@ int client_free( transport_client* client ){
        if(client == NULL) return 0; 
 
        session_free( client->session );
-       transport_message_node* current = client->m_list->next;
-       transport_message_node* next;
+       transport_message* current = client->msg_q_head;
+       transport_message* next;
 
        /* deallocate the list of messages */
        while( current != NULL ) {
                next = current->next;
-               message_free( current->message );
-               free(current);
+               message_free( current );
                current = next;
        }
 
-       free( client->m_list );
        free( client );
        return 1;
 }
index b23e317..9189167 100644 (file)
@@ -1,6 +1,5 @@
 #include <opensrf/transport_message.h>
 
-
 // ---------------------------------------------------------------------------------
 // Allocates and initializes a new transport_message
 // ---------------------------------------------------------------------------------
@@ -45,6 +44,7 @@ transport_message* message_init( const char* body, const char* subject,
        msg->error_code     = 0;
        msg->broadcast      = 0;
        msg->msg_xml        = NULL;
+       msg->next           = NULL;
 
        return msg;
 }
@@ -72,6 +72,7 @@ transport_message* new_message_from_xml( const char* msg_xml ) {
        new_msg->error_code     = 0;
        new_msg->broadcast      = 0;
        new_msg->msg_xml        = NULL;
+       new_msg->next           = NULL;
 
        xmlKeepBlanksDefault(0);
        xmlDocPtr msg_doc = xmlReadDoc( BAD_CAST msg_xml, NULL, NULL, 0 );
@@ -160,62 +161,42 @@ transport_message* new_message_from_xml( const char* msg_xml ) {
                new_msg->body = strdup("");
 
        new_msg->msg_xml = xmlDocToString(msg_doc, 0);
-   xmlFreeDoc(msg_doc);
-   xmlCleanupParser();
+       xmlFreeDoc(msg_doc);
+       xmlCleanupParser();
 
        return new_msg;
 }
 
 void message_set_osrf_xid( transport_message* msg, const char* osrf_xid ) {
-   if(!msg) return;
-   if( osrf_xid )
-      msg->osrf_xid = strdup(osrf_xid);
-   else msg->osrf_xid = strdup("");
+       if( msg ) {
+               if( msg->osrf_xid ) free( msg->osrf_xid );
+               msg->osrf_xid = strdup( osrf_xid ? osrf_xid : "" );
+       }
 }
 
 void message_set_router_info( transport_message* msg, const char* router_from,
                const char* router_to, const char* router_class, const char* router_command,
                int broadcast_enabled ) {
 
-       if( !msg ) return;
-       
-       if(router_from)
-               msg->router_from                = strdup(router_from);
-       else
-               msg->router_from                = strdup("");
-
-       if(router_to)
-               msg->router_to                  = strdup(router_to);
-       else
-               msg->router_to                  = strdup("");
-
-       if(router_class)
-               msg->router_class               = strdup(router_class);
-       else 
-               msg->router_class               = strdup("");
-       
-       if(router_command)
-               msg->router_command     = strdup(router_command);
-       else
-               msg->router_command     = strdup("");
-
-       msg->broadcast = broadcast_enabled;
+       if( msg ) {
 
-       if( msg->router_from == NULL || msg->router_to == NULL ||
-                       msg->router_class == NULL || msg->router_command == NULL ) 
-               osrfLogError(OSRF_LOG_MARK,  "message_set_router_info(): Out of Memory" );
-
-       return;
-}
+               /* free old values, if any */
+               if( msg->router_from    ) free( msg->router_from );
+               if( msg->router_to      ) free( msg->router_to );
+               if( msg->router_class   ) free( msg->router_class );
+               if( msg->router_command ) free( msg->router_command );
 
+               /* install new values */
+               msg->router_from     = strdup( router_from     ? router_from     : "" );
+               msg->router_to       = strdup( router_to       ? router_to       : "" );
+               msg->router_class    = strdup( router_class    ? router_class    : "" );
+               msg->router_command  = strdup( router_command  ? router_command  : "" );
+               msg->broadcast = broadcast_enabled;
 
-
-/* encodes the message for traversal */
-int message_prepare_xml( transport_message* msg ) {
-       if( !msg ) return 0;
-       if( msg->msg_xml == NULL )
-               msg->msg_xml = message_to_xml( msg );
-       return 1;
+               if( msg->router_from == NULL || msg->router_to == NULL ||
+                               msg->router_class == NULL || msg->router_command == NULL ) 
+                       osrfLogError(OSRF_LOG_MARK,  "message_set_router_info(): Out of Memory" );
+       }
 }
 
 
@@ -240,16 +221,16 @@ int message_free( transport_message* msg ){
        free(msg);
        return 1;
 }
-       
+
+
 // ---------------------------------------------------------------------------------
-// Allocates a char* holding the XML representation of this jabber message
+// Encodes the message as XML for traversal; stores in msg_xml member
 // ---------------------------------------------------------------------------------
-char* message_to_xml( const transport_message* msg ) {
-
-       //int                   bufsize;
-       //xmlChar*              xmlbuf;
-       //char*                 encoded_body;
+int message_prepare_xml( transport_message* msg ) {
 
+       if( !msg ) return 0;
+       if( msg->msg_xml ) return 1;   /* already done */
+       
        xmlNodePtr      message_node;
        xmlNodePtr      body_node;
        xmlNodePtr      thread_node;
@@ -260,11 +241,6 @@ char* message_to_xml( const transport_message* msg ) {
 
        xmlKeepBlanksDefault(0);
 
-       if( ! msg ) { 
-               osrfLogWarning(OSRF_LOG_MARK,  "Passing NULL message to message_to_xml()"); 
-               return NULL; 
-       }
-
        doc = xmlReadDoc( BAD_CAST "<message/>", NULL, NULL, XML_PARSE_NSCLEAN );
        message_node = xmlDocGetRootElement(doc);
 
@@ -318,11 +294,13 @@ char* message_to_xml( const transport_message* msg ) {
 
        xmlBufferPtr xmlbuf = xmlBufferCreate();
        xmlNodeDump( xmlbuf, doc, xmlDocGetRootElement(doc), 0, 0);
-       char* xml = strdup((const char*) (xmlBufferContent(xmlbuf)));
+       msg->msg_xml = strdup((const char*) (xmlBufferContent(xmlbuf)));
+       
        xmlBufferFree(xmlbuf);
        xmlFreeDoc( doc );               
        xmlCleanupParser();
-       return xml;
+       
+       return 1;
 }