From 00cfe2cd95fd590a7398d173dc4094ca7da91999 Mon Sep 17 00:00:00 2001 From: Bill Erickson Date: Tue, 22 Nov 2022 15:52:03 -0500 Subject: [PATCH] Revert "LP#1729610: extend backlog queue to C apps" This reverts commit d7e9df6838f1c9a72db3fd41556d178cfe7f6700. --- examples/opensrf.xml.example | 19 ----- src/c-apps/Makefile.am | 6 +- src/c-apps/osrf_cslow.c | 58 --------------- src/libopensrf/osrf_prefork.c | 160 +++++++++++------------------------------- 4 files changed, 41 insertions(+), 202 deletions(-) delete mode 100644 src/c-apps/osrf_cslow.c diff --git a/examples/opensrf.xml.example b/examples/opensrf.xml.example index 1a35816..2e52c7a 100644 --- a/examples/opensrf.xml.example +++ b/examples/opensrf.xml.example @@ -165,24 +165,6 @@ vim:et:ts=2:sw=2: - - 3 - 1 - c - libosrf_cslow.so - - 1000 - opensrf.cslow_unix.log - opensrf.cslow_unix.sock - opensrf.cslow_unix.pid - 5 - 15 - 2 - 5 - 10 - - - 1 1 @@ -279,7 +261,6 @@ vim:et:ts=2:sw=2: opensrf.dbmath opensrf.validator opensrf.slooooooow - opensrf.cslow diff --git a/src/c-apps/Makefile.am b/src/c-apps/Makefile.am index 9138ff6..54c3cac 100644 --- a/src/c-apps/Makefile.am +++ b/src/c-apps/Makefile.am @@ -18,15 +18,11 @@ AM_LDFLAGS = $(DEF_LDFLAGS) -L@top_builddir@/src/libopensrf DISTCLEANFILES = Makefile.in Makefile noinst_PROGRAMS = timejson -lib_LTLIBRARIES = libosrf_cslow.la libosrf_dbmath.la libosrf_math.la libosrf_version.la +lib_LTLIBRARIES = libosrf_dbmath.la libosrf_math.la libosrf_version.la timejson_SOURCES = timejson.c timejson_LDADD = @top_builddir@/src/libopensrf/libopensrf.la -libosrf_cslow_la_SOURCES = osrf_cslow.c -libosrf_cslow_la_LDFLAGS = $(AM_LDFLAGS) -module -version-info 2:0:2 -libosrf_cslow_la_LIBADD = @top_builddir@/src/libopensrf/libopensrf.la - libosrf_dbmath_la_SOURCES = osrf_dbmath.c libosrf_dbmath_la_LDFLAGS = $(AM_LDFLAGS) -module -version-info 2:0:2 libosrf_dbmath_la_LIBADD = @top_builddir@/src/libopensrf/libopensrf.la diff --git a/src/c-apps/osrf_cslow.c b/src/c-apps/osrf_cslow.c deleted file mode 100644 index 108977a..0000000 --- a/src/c-apps/osrf_cslow.c +++ /dev/null @@ -1,58 +0,0 @@ -#include -#include -#include -#include - -#define MODULENAME "opensrf.cslow" - -int osrfAppInitialize(); -int osrfAppChildInit(); -int osrfCSlowWait( osrfMethodContext* ); - - -int osrfAppInitialize() { - - osrfAppRegisterMethod( - MODULENAME, - "opensrf.cslow.wait", - "osrfCSlowWait", - "Wait specified number of seconds, then return that number", 1, 0 ); - - return 0; -} - -int osrfAppChildInit() { - return 0; -} - -int osrfCSlowWait( osrfMethodContext* ctx ) { - if( osrfMethodVerifyContext( ctx ) ) { - osrfLogError( OSRF_LOG_MARK, "Invalid method context" ); - return -1; - } - - const jsonObject* x = jsonObjectGetIndex(ctx->params, 0); - - if( x ) { - - char* a = jsonObjectToSimpleString(x); - - if( a ) { - - unsigned int pause = atoi(a); - sleep(pause); - - jsonObject* resp = jsonNewNumberObject(pause); - osrfAppRespondComplete( ctx, resp ); - jsonObjectFree(resp); - - free(a); - return 0; - } - } - - return -1; -} - - - diff --git a/src/libopensrf/osrf_prefork.c b/src/libopensrf/osrf_prefork.c index 845a64e..f02f635 100644 --- a/src/libopensrf/osrf_prefork.c +++ b/src/libopensrf/osrf_prefork.c @@ -48,7 +48,6 @@ typedef struct { int max_requests; /**< How many requests a child processes before terminating. */ int min_children; /**< Minimum number of children to maintain. */ int max_children; /**< Maximum number of children to maintain. */ - int max_backlog_queue; /**< Maximum size of backlog queue. */ int fd; /**< Unused. */ int data_to_child; /**< Unused. */ int data_to_parent; /**< Unused. */ @@ -87,7 +86,7 @@ typedef struct prefork_child_struct prefork_child; static volatile sig_atomic_t child_dead; static int prefork_simple_init( prefork_simple* prefork, transport_client* client, - int max_requests, int min_children, int max_children, int max_backlog_queue ); + int max_requests, int min_children, int max_children ); static prefork_child* launch_child( prefork_simple* forker ); static void prefork_launch_children( prefork_simple* forker ); static void prefork_run( prefork_simple* forker ); @@ -138,7 +137,6 @@ int osrf_prefork_run( const char* appname ) { int maxr = 1000; int maxc = 10; - int maxbq = 1000; int minc = 3; int kalive = 5; @@ -148,7 +146,6 @@ int osrf_prefork_run( const char* appname ) { char* max_req = osrf_settings_host_value( "/apps/%s/unix_config/max_requests", appname ); char* min_children = osrf_settings_host_value( "/apps/%s/unix_config/min_children", appname ); char* max_children = osrf_settings_host_value( "/apps/%s/unix_config/max_children", appname ); - char* max_backlog_queue = osrf_settings_host_value( "/apps/%s/unix_config/max_backlog_queue", appname ); char* keepalive = osrf_settings_host_value( "/apps/%s/keepalive", appname ); if( !keepalive ) @@ -171,16 +168,10 @@ int osrf_prefork_run( const char* appname ) { else maxc = atoi( max_children ); - if( !max_backlog_queue ) - osrfLogWarning( OSRF_LOG_MARK, "Max backlog queue size not defined, assuming %d", maxbq ); - else - maxbq = atoi( max_backlog_queue ); - free( keepalive ); free( max_req ); free( min_children ); free( max_children ); - free( max_backlog_queue ); /* --------------------------------------------------- */ char* resc = va_list_to_string( "%s_listener", appname ); @@ -196,7 +187,7 @@ int osrf_prefork_run( const char* appname ) { prefork_simple forker; - if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc, maxbq )) { + if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc )) { osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run() failed to create prefork_simple object" ); return -1; @@ -531,11 +522,10 @@ static int prefork_child_process_request( prefork_child* child, char* data ) { before terminating. @param min_children Minimum number of child processes to maintain. @param max_children Maximum number of child processes to maintain. - @param max_backlog_queue Maximum size of backlog queue. @return 0 if successful, or 1 if not (due to invalid parameters). */ static int prefork_simple_init( prefork_simple* prefork, transport_client* client, - int max_requests, int min_children, int max_children, int max_backlog_queue ) { + int max_requests, int min_children, int max_children ) { if( min_children > max_children ) { osrfLogError( OSRF_LOG_MARK, "min_children (%d) is greater " @@ -556,7 +546,6 @@ static int prefork_simple_init( prefork_simple* prefork, transport_client* clien prefork->max_requests = max_requests; prefork->min_children = min_children; prefork->max_children = max_children; - prefork->max_backlog_queue = max_backlog_queue; prefork->fd = 0; prefork->data_to_child = 0; prefork->data_to_parent = 0; @@ -861,16 +850,6 @@ static void prefork_run( prefork_simple* forker ) { transport_message* cur_msg = NULL; - // The backlog queue accumulates messages received while there - // are not yet children available to process them. While the - // transport client maintains its own queue of messages, sweeping - // the transport client's queue in the backlog queue gives us the - // ability to set a limit on the size of the backlog queue (and - // then to drop messages once the backlog queue has filled up) - transport_message* backlog_queue_head = NULL; - transport_message* backlog_queue_tail = NULL; - int backlog_queue_size = 0; - while( 1 ) { if( forker->first_child == NULL && forker->idle_list == NULL ) {/* no more children */ @@ -878,101 +857,45 @@ static void prefork_run( prefork_simple* forker ) { return; } - int received_from_network = 0; - if ( backlog_queue_size == 0 ) { - // Wait indefinitely for an input message - osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data..." ); - cur_msg = client_recv( forker->connection, -1 ); - received_from_network = 1; - } else { - // We have queued messages, which means all of our drones - // are occupied. See if any new messages are available on the - // network while waiting up to 1 second to allow time for a drone - // to become available to handle the next request in the queue. - cur_msg = client_recv( forker->connection, 1 ); - if ( cur_msg != NULL ) - received_from_network = 1; - } - - if (received_from_network) { - if( cur_msg == NULL ) { - // most likely a signal was received. clean up any recently - // deceased children and try again. - if(child_dead) - reap_children(forker); - continue; - } - - if (cur_msg->error_type) { - osrfLogInfo(OSRF_LOG_MARK, - "Listener received an XMPP error message. " - "Likely a bounced message. sender=%s", cur_msg->sender); - if(child_dead) - reap_children(forker); - continue; - } + // Wait indefinitely for an input message + osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data..." ); + cur_msg = client_recv( forker->connection, -1 ); - message_prepare_xml( cur_msg ); - const char* msg_data = cur_msg->msg_xml; - if( ! msg_data || ! *msg_data ) { - osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %", - (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread ); - message_free( cur_msg ); - continue; // Message not usable; go on to the next one. - } + if( cur_msg == NULL ) { + // most likely a signal was received. clean up any recently + // deceased children and try again. + if(child_dead) + reap_children(forker); + continue; + } - // stick message onto queue - cur_msg->next = NULL; - if (backlog_queue_size == 0) { - backlog_queue_head = cur_msg; - backlog_queue_tail = cur_msg; - } else { - if (backlog_queue_size >= forker->max_backlog_queue) { - osrfLogWarning ( OSRF_LOG_MARK, "Reached backlog queue limit of %d; dropping " - "latest message", - forker->max_backlog_queue ); - osrfMessage* err = osrf_message_init( STATUS, 1, 1 ); - osrf_message_set_status_info( err, "osrfMethodException", - "Service unavailable: no available children and backlog queue at limit", - OSRF_STATUS_SERVICEUNAVAILABLE ); - char *data = osrf_message_serialize( err ); - osrfMessageFree( err ); - transport_message* tresponse = message_init( data, "", cur_msg->thread, cur_msg->router_from, cur_msg->recipient ); - message_set_osrf_xid(tresponse, cur_msg->osrf_xid); - free( data ); - transport_client* client = osrfSystemGetTransportClient(); - client_send_message( client, tresponse ); - message_free( tresponse ); - message_free(cur_msg); - continue; - } - backlog_queue_tail->next = cur_msg; - backlog_queue_tail = cur_msg; - osrfLogWarning( OSRF_LOG_MARK, "Adding message to non-empty backlog queue." ); - } - backlog_queue_size++; - } + if (cur_msg->error_type) { + osrfLogInfo(OSRF_LOG_MARK, + "Listener received an XMPP error message. " + "Likely a bounced message. sender=%s", cur_msg->sender); + if(child_dead) + reap_children(forker); + continue; + } - if (backlog_queue_size == 0) { - // strictly speaking, this check may be redundant, but - // from this point forward we can be sure that the - // backlog queue has at least one message in it and - // that if we can find a child to process it, we want to - // process the head of that queue. - continue; + message_prepare_xml( cur_msg ); + const char* msg_data = cur_msg->msg_xml; + if( ! msg_data || ! *msg_data ) { + osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %", + (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread ); + message_free( cur_msg ); + continue; // Message not usable; go on to the next one. } - cur_msg = backlog_queue_head; - int honored = 0; /* will be set to true when we service the request */ int no_recheck = 0; while( ! honored ) { - if( !no_recheck ) { - if(check_children( forker, 0 ) < 0) { + if( !no_recheck ) { + if(check_children( forker, 0 ) < 0) { continue; // check failed, try again - } + } } no_recheck = 0; @@ -1001,7 +924,6 @@ static void prefork_run( prefork_simple* forker ) { osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d", cur_child->write_data_fd ); - const char* msg_data = cur_msg->msg_xml; int written = write( cur_child->write_data_fd, msg_data, strlen( msg_data ) + 1 ); if( written < 0 ) { // This child appears to be dead or unusable. Discard it. @@ -1036,7 +958,6 @@ static void prefork_run( prefork_simple* forker ) { osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d", new_child->write_data_fd, new_child->pid ); - const char* msg_data = cur_msg->msg_xml; int written = write( new_child->write_data_fd, msg_data, strlen( msg_data ) + 1 ); if( written < 0 ) { @@ -1059,21 +980,20 @@ static void prefork_run( prefork_simple* forker ) { } } - if( child_dead ) - reap_children( forker ); - if( !honored ) { - break; + osrfLogWarning( OSRF_LOG_MARK, "No children available, waiting..." ); + if( check_children( forker, 1 ) >= 0 ) { + // Tell the loop not to call check_children again, since we just successfully called it + no_recheck = 1; + } } + if( child_dead ) + reap_children( forker ); + } // end while( ! honored ) - if ( honored ) { - backlog_queue_head = cur_msg->next; - backlog_queue_size--; - cur_msg->next = NULL; - message_free( cur_msg ); - } + message_free( cur_msg ); } /* end top level listen loop */ } -- 2.11.0