int max_requests; /**< How many requests a child processes before terminating. */
int min_children; /**< Minimum number of children to maintain. */
int max_children; /**< Maximum number of children to maintain. */
- int max_backlog_queue; /**< Maximum size of backlog queue. */
int fd; /**< Unused. */
int data_to_child; /**< Unused. */
int data_to_parent; /**< Unused. */
static volatile sig_atomic_t child_dead;
static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
- int max_requests, int min_children, int max_children, int max_backlog_queue );
+ int max_requests, int min_children, int max_children );
static prefork_child* launch_child( prefork_simple* forker );
static void prefork_launch_children( prefork_simple* forker );
static void prefork_run( prefork_simple* forker );
int maxr = 1000;
int maxc = 10;
- int maxbq = 1000;
int minc = 3;
int kalive = 5;
char* max_req = osrf_settings_host_value( "/apps/%s/unix_config/max_requests", appname );
char* min_children = osrf_settings_host_value( "/apps/%s/unix_config/min_children", appname );
char* max_children = osrf_settings_host_value( "/apps/%s/unix_config/max_children", appname );
- char* max_backlog_queue = osrf_settings_host_value( "/apps/%s/unix_config/max_backlog_queue", appname );
char* keepalive = osrf_settings_host_value( "/apps/%s/keepalive", appname );
if( !keepalive )
else
maxc = atoi( max_children );
- if( !max_backlog_queue )
- osrfLogWarning( OSRF_LOG_MARK, "Max backlog queue size not defined, assuming %d", maxbq );
- else
- maxbq = atoi( max_backlog_queue );
-
free( keepalive );
free( max_req );
free( min_children );
free( max_children );
- free( max_backlog_queue );
/* --------------------------------------------------- */
char* resc = va_list_to_string( "%s_listener", appname );
prefork_simple forker;
- if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc, maxbq )) {
+ if( prefork_simple_init( &forker, osrfSystemGetTransportClient(), maxr, minc, maxc )) {
osrfLogError( OSRF_LOG_MARK,
"osrf_prefork_run() failed to create prefork_simple object" );
return -1;
before terminating.
@param min_children Minimum number of child processes to maintain.
@param max_children Maximum number of child processes to maintain.
- @param max_backlog_queue Maximum size of backlog queue.
@return 0 if successful, or 1 if not (due to invalid parameters).
*/
static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
- int max_requests, int min_children, int max_children, int max_backlog_queue ) {
+ int max_requests, int min_children, int max_children ) {
if( min_children > max_children ) {
osrfLogError( OSRF_LOG_MARK, "min_children (%d) is greater "
prefork->max_requests = max_requests;
prefork->min_children = min_children;
prefork->max_children = max_children;
- prefork->max_backlog_queue = max_backlog_queue;
prefork->fd = 0;
prefork->data_to_child = 0;
prefork->data_to_parent = 0;
transport_message* cur_msg = NULL;
- // The backlog queue accumulates messages received while there
- // are not yet children available to process them. While the
- // transport client maintains its own queue of messages, sweeping
- // the transport client's queue in the backlog queue gives us the
- // ability to set a limit on the size of the backlog queue (and
- // then to drop messages once the backlog queue has filled up)
- transport_message* backlog_queue_head = NULL;
- transport_message* backlog_queue_tail = NULL;
- int backlog_queue_size = 0;
-
while( 1 ) {
if( forker->first_child == NULL && forker->idle_list == NULL ) {/* no more children */
return;
}
- int received_from_network = 0;
- if ( backlog_queue_size == 0 ) {
- // Wait indefinitely for an input message
- osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data..." );
- cur_msg = client_recv( forker->connection, -1 );
- received_from_network = 1;
- } else {
- // We have queued messages, which means all of our drones
- // are occupied. See if any new messages are available on the
- // network while waiting up to 1 second to allow time for a drone
- // to become available to handle the next request in the queue.
- cur_msg = client_recv( forker->connection, 1 );
- if ( cur_msg != NULL )
- received_from_network = 1;
- }
-
- if (received_from_network) {
- if( cur_msg == NULL ) {
- // most likely a signal was received. clean up any recently
- // deceased children and try again.
- if(child_dead)
- reap_children(forker);
- continue;
- }
-
- if (cur_msg->error_type) {
- osrfLogInfo(OSRF_LOG_MARK,
- "Listener received an XMPP error message. "
- "Likely a bounced message. sender=%s", cur_msg->sender);
- if(child_dead)
- reap_children(forker);
- continue;
- }
+ // Wait indefinitely for an input message
+ osrfLogDebug( OSRF_LOG_MARK, "Forker going into wait for data..." );
+ cur_msg = client_recv( forker->connection, -1 );
- message_prepare_xml( cur_msg );
- const char* msg_data = cur_msg->msg_xml;
- if( ! msg_data || ! *msg_data ) {
- osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %",
- (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread );
- message_free( cur_msg );
- continue; // Message not usable; go on to the next one.
- }
+ if( cur_msg == NULL ) {
+ // most likely a signal was received. clean up any recently
+ // deceased children and try again.
+ if(child_dead)
+ reap_children(forker);
+ continue;
+ }
- // stick message onto queue
- cur_msg->next = NULL;
- if (backlog_queue_size == 0) {
- backlog_queue_head = cur_msg;
- backlog_queue_tail = cur_msg;
- } else {
- if (backlog_queue_size >= forker->max_backlog_queue) {
- osrfLogWarning ( OSRF_LOG_MARK, "Reached backlog queue limit of %d; dropping "
- "latest message",
- forker->max_backlog_queue );
- osrfMessage* err = osrf_message_init( STATUS, 1, 1 );
- osrf_message_set_status_info( err, "osrfMethodException",
- "Service unavailable: no available children and backlog queue at limit",
- OSRF_STATUS_SERVICEUNAVAILABLE );
- char *data = osrf_message_serialize( err );
- osrfMessageFree( err );
- transport_message* tresponse = message_init( data, "", cur_msg->thread, cur_msg->router_from, cur_msg->recipient );
- message_set_osrf_xid(tresponse, cur_msg->osrf_xid);
- free( data );
- transport_client* client = osrfSystemGetTransportClient();
- client_send_message( client, tresponse );
- message_free( tresponse );
- message_free(cur_msg);
- continue;
- }
- backlog_queue_tail->next = cur_msg;
- backlog_queue_tail = cur_msg;
- osrfLogWarning( OSRF_LOG_MARK, "Adding message to non-empty backlog queue." );
- }
- backlog_queue_size++;
- }
+ if (cur_msg->error_type) {
+ osrfLogInfo(OSRF_LOG_MARK,
+ "Listener received an XMPP error message. "
+ "Likely a bounced message. sender=%s", cur_msg->sender);
+ if(child_dead)
+ reap_children(forker);
+ continue;
+ }
- if (backlog_queue_size == 0) {
- // strictly speaking, this check may be redundant, but
- // from this point forward we can be sure that the
- // backlog queue has at least one message in it and
- // that if we can find a child to process it, we want to
- // process the head of that queue.
- continue;
+ message_prepare_xml( cur_msg );
+ const char* msg_data = cur_msg->msg_xml;
+ if( ! msg_data || ! *msg_data ) {
+ osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %",
+ (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread );
+ message_free( cur_msg );
+ continue; // Message not usable; go on to the next one.
}
- cur_msg = backlog_queue_head;
-
int honored = 0; /* will be set to true when we service the request */
int no_recheck = 0;
while( ! honored ) {
- if( !no_recheck ) {
- if(check_children( forker, 0 ) < 0) {
+ if( !no_recheck ) {
+ if(check_children( forker, 0 ) < 0) {
continue; // check failed, try again
- }
+ }
}
no_recheck = 0;
osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d",
cur_child->write_data_fd );
- const char* msg_data = cur_msg->msg_xml;
int written = write( cur_child->write_data_fd, msg_data, strlen( msg_data ) + 1 );
if( written < 0 ) {
// This child appears to be dead or unusable. Discard it.
osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d",
new_child->write_data_fd, new_child->pid );
- const char* msg_data = cur_msg->msg_xml;
int written = write(
new_child->write_data_fd, msg_data, strlen( msg_data ) + 1 );
if( written < 0 ) {
}
}
- if( child_dead )
- reap_children( forker );
-
if( !honored ) {
- break;
+ osrfLogWarning( OSRF_LOG_MARK, "No children available, waiting..." );
+ if( check_children( forker, 1 ) >= 0 ) {
+ // Tell the loop not to call check_children again, since we just successfully called it
+ no_recheck = 1;
+ }
}
+ if( child_dead )
+ reap_children( forker );
+
} // end while( ! honored )
- if ( honored ) {
- backlog_queue_head = cur_msg->next;
- backlog_queue_size--;
- cur_msg->next = NULL;
- message_free( cur_msg );
- }
+ message_free( cur_msg );
} /* end top level listen loop */
}