Revert "LP#1729610: extend backlog queue to C apps"
authorBill Erickson <berickxx@gmail.com>
Tue, 22 Nov 2022 20:52:03 +0000 (15:52 -0500)
committerBill Erickson <berickxx@gmail.com>
Thu, 20 Apr 2023 14:18:05 +0000 (10:18 -0400)
This reverts commit d7e9df6838f1c9a72db3fd41556d178cfe7f6700.

examples/opensrf.xml.example
src/c-apps/Makefile.am
src/c-apps/osrf_cslow.c [deleted file]
src/libopensrf/osrf_prefork.c

index 1a35816..2e52c7a 100644 (file)
@@ -165,24 +165,6 @@ vim:et:ts=2:sw=2:
         </unix_config>
       </opensrf.dbmath>
 
-      <opensrf.cslow>
-        <keepalive>3</keepalive>
-        <stateless>1</stateless>
-        <language>c</language>
-        <implementation>libosrf_cslow.so</implementation>
-        <unix_config>
-          <max_requests>1000</max_requests>
-          <unix_log>opensrf.cslow_unix.log</unix_log>
-          <unix_sock>opensrf.cslow_unix.sock</unix_sock>
-          <unix_pid>opensrf.cslow_unix.pid</unix_pid>
-          <min_children>5</min_children>
-          <max_children>15</max_children>
-          <min_spare_children>2</min_spare_children>
-          <max_spare_children>5</max_spare_children>
-          <max_backlog_queue>10</max_backlog_queue>
-        </unix_config>
-      </opensrf.cslow>
-
       <opensrf.settings>
         <keepalive>1</keepalive>
         <stateless>1</stateless>
@@ -279,7 +261,6 @@ vim:et:ts=2:sw=2:
         <appname>opensrf.dbmath</appname>
         <appname>opensrf.validator</appname>
         <appname>opensrf.slooooooow</appname>
-        <appname>opensrf.cslow</appname>
       </activeapps>
 
       <apps>
index 9138ff6..54c3cac 100644 (file)
@@ -18,15 +18,11 @@ AM_LDFLAGS = $(DEF_LDFLAGS) -L@top_builddir@/src/libopensrf
 DISTCLEANFILES = Makefile.in Makefile
 
 noinst_PROGRAMS = timejson
-lib_LTLIBRARIES = libosrf_cslow.la libosrf_dbmath.la libosrf_math.la libosrf_version.la
+lib_LTLIBRARIES = libosrf_dbmath.la libosrf_math.la libosrf_version.la
 
 timejson_SOURCES = timejson.c
 timejson_LDADD = @top_builddir@/src/libopensrf/libopensrf.la
 
-libosrf_cslow_la_SOURCES = osrf_cslow.c
-libosrf_cslow_la_LDFLAGS = $(AM_LDFLAGS) -module -version-info 2:0:2
-libosrf_cslow_la_LIBADD = @top_builddir@/src/libopensrf/libopensrf.la
-
 libosrf_dbmath_la_SOURCES = osrf_dbmath.c 
 libosrf_dbmath_la_LDFLAGS = $(AM_LDFLAGS) -module -version-info 2:0:2
 libosrf_dbmath_la_LIBADD = @top_builddir@/src/libopensrf/libopensrf.la
diff --git a/src/c-apps/osrf_cslow.c b/src/c-apps/osrf_cslow.c
deleted file mode 100644 (file)
index 108977a..0000000
+++ /dev/null
@@ -1,58 +0,0 @@
-#include <opensrf/osrf_app_session.h>
-#include <opensrf/osrf_application.h>
-#include <opensrf/osrf_json.h>
-#include <opensrf/log.h>
-
-#define MODULENAME "opensrf.cslow"
-
-int osrfAppInitialize();
-int osrfAppChildInit();
-int osrfCSlowWait( osrfMethodContext* );
-
-
-int osrfAppInitialize() {
-
-       osrfAppRegisterMethod( 
-                       MODULENAME, 
-                       "opensrf.cslow.wait", 
-                       "osrfCSlowWait", 
-                       "Wait specified number of seconds, then return that number", 1, 0 );
-
-       return 0;
-}
-
-int osrfAppChildInit() {
-       return 0;
-}
-
-int osrfCSlowWait( osrfMethodContext* ctx ) {
-       if( osrfMethodVerifyContext( ctx ) ) {
-               osrfLogError( OSRF_LOG_MARK,  "Invalid method context" );
-               return -1;
-       }
-
-       const jsonObject* x = jsonObjectGetIndex(ctx->params, 0);
-
-       if( x ) {
-
-               char* a = jsonObjectToSimpleString(x);
-
-               if( a ) {
-
-                       unsigned int pause = atoi(a);
-                       sleep(pause);
-
-                       jsonObject* resp = jsonNewNumberObject(pause);
-                       osrfAppRespondComplete( ctx, resp );
-                       jsonObjectFree(resp);
-
-                       free(a);
-                       return 0;
-               }
-       }
-
-       return -1;
-}
-
-
-
index 845a64e..f02f635 100644 (file)
@@ -48,7 +48,6 @@ typedef struct {
        int max_requests;     /**< How many requests a child processes before terminating. */
        int min_children;     /**< Minimum number of children to maintain. */
        int max_children;     /**< Maximum number of children to maintain. */
-       int max_backlog_queue; /**< Maximum size of backlog queue. */
        int fd;               /**< Unused. */
        int data_to_child;    /**< Unused. */
        int data_to_parent;   /**< Unused. */
@@ -87,7 +86,7 @@ typedef struct prefork_child_struct prefork_child;
 static volatile sig_atomic_t child_dead;
 
 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
-       int max_requests, int min_children, int max_children, int max_backlog_queue );
+       int max_requests, int min_children, int max_children );
 static prefork_child* launch_child( prefork_simple* forker );
 static void prefork_launch_children( prefork_simple* forker );
 static void prefork_run( prefork_simple* forker );
@@ -138,7 +137,6 @@ int osrf_prefork_run( const char* appname ) {
 
        int maxr = 1000;
        int maxc = 10;
-       int maxbq = 1000;
        int minc = 3;
        int kalive = 5;
 
@@ -148,7 +146,6 @@ int osrf_prefork_run( const char* appname ) {
        char* max_req      = osrf_settings_host_value( "/apps/%s/unix_config/max_requests", appname );
        char* min_children = osrf_settings_host_value( "/apps/%s/unix_config/min_children", appname );
        char* max_children = osrf_settings_host_value( "/apps/%s/unix_config/max_children", appname );
-       char* max_backlog_queue = osrf_settings_host_value( "/apps/%s/unix_config/max_backlog_queue", appname );
        char* keepalive    = osrf_settings_host_value( "/apps/%s/keepalive", appname );
 
        if( !keepalive )
@@ -171,16 +168,10 @@ int osrf_prefork_run( const char* appname ) {
        else
                maxc = atoi( max_children );
 
-       if( !max_backlog_queue )
-               osrfLogWarning( OSRF_LOG_MARK, "Max backlog queue size not defined, assuming %d", maxbq );
-       else
-               maxbq = atoi( max_backlog_queue );
-
        free( keepalive );
        free( max_req );
        free( min_children );
        free( max_children );
-       free( max_backlog_queue );
        /* --------------------------------------------------- */
 
        char* resc = va_list_to_string( "%s_listener", appname );
@@ -196,7 +187,7 @@ int osrf_prefork_run( const char* appname ) {
 
        prefork_simple forker;
 
-       if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc, maxbq )) {
+       if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc )) {
                osrfLogError( OSRF_LOG_MARK,
                        "osrf_prefork_run() failed to create prefork_simple object" );
                return -1;
@@ -531,11 +522,10 @@ static int prefork_child_process_request( prefork_child* child, char* data ) {
                        before terminating.
        @param min_children Minimum number of child processes to maintain.
        @param max_children Maximum number of child processes to maintain.
-       @param max_backlog_queue Maximum size of backlog queue.
        @return 0 if successful, or 1 if not (due to invalid parameters).
 */
 static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
-               int max_requests, int min_children, int max_children, int max_backlog_queue ) {
+               int max_requests, int min_children, int max_children ) {
 
        if( min_children > max_children ) {
                osrfLogError( OSRF_LOG_MARK,  "min_children (%d) is greater "
@@ -556,7 +546,6 @@ static int prefork_simple_init( prefork_simple* prefork, transport_client* clien
        prefork->max_requests = max_requests;
        prefork->min_children = min_children;
        prefork->max_children = max_children;
-       prefork->max_backlog_queue = max_backlog_queue;
        prefork->fd           = 0;
        prefork->data_to_child = 0;
        prefork->data_to_parent = 0;
@@ -861,16 +850,6 @@ static void prefork_run( prefork_simple* forker ) {
 
        transport_message* cur_msg = NULL;
 
-       // The backlog queue accumulates messages received while there
-       // are not yet children available to process them. While the
-       // transport client maintains its own queue of messages, sweeping
-       // the transport client's queue in the backlog queue gives us the
-       // ability to set a limit on the size of the backlog queue (and
-       // then to drop messages once the backlog queue has filled up)
-       transport_message* backlog_queue_head = NULL;
-       transport_message* backlog_queue_tail = NULL;
-       int backlog_queue_size = 0;
-
        while( 1 ) {
 
                if( forker->first_child == NULL && forker->idle_list == NULL ) {/* no more children */
@@ -878,101 +857,45 @@ static void prefork_run( prefork_simple* forker ) {
                        return;
                }
 
-               int received_from_network = 0;
-               if ( backlog_queue_size == 0 ) {
-                       // Wait indefinitely for an input message
-                       osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data..." );
-                       cur_msg = client_recv( forker->connection, -1 );
-                       received_from_network = 1;
-               } else {
-                       // We have queued messages, which means all of our drones
-                       // are occupied.  See if any new messages are available on the
-                       // network while waiting up to 1 second to allow time for a drone
-                       // to become available to handle the next request in the queue.
-                       cur_msg = client_recv( forker->connection, 1 );
-                       if ( cur_msg != NULL )
-                               received_from_network = 1;
-               }
-
-               if (received_from_network) {
-                       if( cur_msg == NULL ) {
-                               // most likely a signal was received.  clean up any recently
-                               // deceased children and try again.
-                               if(child_dead)
-                                       reap_children(forker);
-                               continue;
-                       }
-
-                       if (cur_msg->error_type) {
-                               osrfLogInfo(OSRF_LOG_MARK,
-                                       "Listener received an XMPP error message.  "
-                                       "Likely a bounced message. sender=%s", cur_msg->sender);
-                               if(child_dead)
-                                       reap_children(forker);
-                               continue;
-                       }
+               // Wait indefinitely for an input message
+               osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data..." );
+               cur_msg = client_recv( forker->connection, -1 );
 
-                       message_prepare_xml( cur_msg );
-                       const char* msg_data = cur_msg->msg_xml;
-                       if( ! msg_data || ! *msg_data ) {
-                               osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %",
-                                       (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread );
-                               message_free( cur_msg );
-                               continue;       // Message not usable; go on to the next one.
-                       }
+               if( cur_msg == NULL ) {
+                       // most likely a signal was received.  clean up any recently
+                       // deceased children and try again.
+                       if(child_dead)
+                               reap_children(forker);
+                       continue;
+        }
 
-                       // stick message onto queue
-                       cur_msg->next = NULL;
-                       if (backlog_queue_size == 0) {
-                               backlog_queue_head = cur_msg;
-                               backlog_queue_tail = cur_msg;
-                       } else {
-                               if (backlog_queue_size >= forker->max_backlog_queue) {
-                                       osrfLogWarning ( OSRF_LOG_MARK, "Reached backlog queue limit of %d; dropping "
-                                               "latest message",
-                                               forker->max_backlog_queue );
-                                       osrfMessage* err = osrf_message_init( STATUS, 1, 1 );
-                                       osrf_message_set_status_info( err, "osrfMethodException",
-                                               "Service unavailable: no available children and backlog queue at limit",
-                                               OSRF_STATUS_SERVICEUNAVAILABLE );
-                                       char *data = osrf_message_serialize( err );
-                                       osrfMessageFree( err );
-                                       transport_message* tresponse = message_init( data, "", cur_msg->thread, cur_msg->router_from, cur_msg->recipient );
-                                       message_set_osrf_xid(tresponse, cur_msg->osrf_xid);
-                                       free( data );
-                                       transport_client* client = osrfSystemGetTransportClient();
-                                       client_send_message( client, tresponse );
-                                       message_free( tresponse );
-                                       message_free(cur_msg);
-                                       continue;
-                               }
-                               backlog_queue_tail->next = cur_msg;
-                               backlog_queue_tail = cur_msg;
-                               osrfLogWarning( OSRF_LOG_MARK, "Adding message to non-empty backlog queue." );
-                       }
-                       backlog_queue_size++;
-               }
+        if (cur_msg->error_type) {
+            osrfLogInfo(OSRF_LOG_MARK, 
+                "Listener received an XMPP error message.  "
+                "Likely a bounced message. sender=%s", cur_msg->sender);
+            if(child_dead)
+                reap_children(forker);
+            continue;
+        }
 
-               if (backlog_queue_size == 0) {
-                       // strictly speaking, this check may be redundant, but
-                       // from this point forward we can be sure that the
-                       // backlog queue has at least one message in it and
-                       // that if we can find a child to process it, we want to
-                       // process the head of that queue.
-                       continue;
+               message_prepare_xml( cur_msg );
+               const char* msg_data = cur_msg->msg_xml;
+               if( ! msg_data || ! *msg_data ) {
+                       osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %",
+                               (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread );
+                       message_free( cur_msg );
+                       continue;       // Message not usable; go on to the next one.
                }
 
-               cur_msg = backlog_queue_head;
-
                int honored = 0;     /* will be set to true when we service the request */
                int no_recheck = 0;
 
                while( ! honored ) {
 
-                       if( !no_recheck ) {
-                               if(check_children( forker, 0 ) < 0) {
+            if( !no_recheck ) {
+                if(check_children( forker, 0 ) < 0) {
                     continue; // check failed, try again
-                               }
+                }
             }
             no_recheck = 0;
 
@@ -1001,7 +924,6 @@ static void prefork_run( prefork_simple* forker ) {
                                osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d",
                                        cur_child->write_data_fd );
 
-                               const char* msg_data = cur_msg->msg_xml;
                                int written = write( cur_child->write_data_fd, msg_data, strlen( msg_data ) + 1 );
                                if( written < 0 ) {
                                        // This child appears to be dead or unusable.  Discard it.
@@ -1036,7 +958,6 @@ static void prefork_run( prefork_simple* forker ) {
                                                osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d",
                                                        new_child->write_data_fd, new_child->pid );
 
-                                               const char* msg_data = cur_msg->msg_xml;
                                                int written = write(
                                                        new_child->write_data_fd, msg_data, strlen( msg_data ) + 1 );
                                                if( written < 0 ) {
@@ -1059,21 +980,20 @@ static void prefork_run( prefork_simple* forker ) {
                 }
             }
 
-                       if( child_dead )
-                               reap_children( forker );
-
                        if( !honored ) {
-                               break;
+                               osrfLogWarning( OSRF_LOG_MARK, "No children available, waiting..." );
+                               if( check_children( forker, 1 ) >= 0 ) {
+                                   // Tell the loop not to call check_children again, since we just successfully called it
+                                   no_recheck = 1;
+                }
                        }
 
+                       if( child_dead )
+                               reap_children( forker );
+
                } // end while( ! honored )
 
-               if ( honored ) {
-                       backlog_queue_head = cur_msg->next;
-                       backlog_queue_size--;
-                       cur_msg->next = NULL;
-                       message_free( cur_msg );
-               }
+               message_free( cur_msg );
 
        } /* end top level listen loop */
 }