/**
@file osrf_prefork.c
- @brief Implementation of
+ @brief Spawn and manage a collection of child process to service requests.
Spawn a collection of child processes, replacing them as needed. Forward requests to them
and let the children do the work.
child dies, either deliberately or otherwise, we can spawn another one to replace it,
keeping the number of children within a predefined range.
- Use a doubly-linked circular list to keep track of the children, forwarding requests to them
- in an approximately round-robin fashion.
+ Use a doubly-linked circular list to keep track of the children to whom we have forwarded
+ a request, and who are still working on them. Use a separate linear linked list to keep
+ track of children that are currently idle. Move them back and forth as needed.
For each child, set up two pipes:
- One for the parent to send requests to the child.
- One for the child to notify the parent that it is available for another request.
- The message sent to the child is an XML stanza as received from Jabber.
+ The message sent to the child represents an XML stanza as received from Jabber.
When the child finishes processing the request, it writes the string "available" back
to the parent. Then the parent knows that it can send that child another request.
#include <stdio.h>
#include <string.h>
#include <sys/select.h>
+#include <signal.h>
#include <sys/wait.h>
#include "opensrf/utils.h"
int current_num_children; /**< How many children are currently on the list. */
int keepalive; /**< Keepalive time for stateful sessions. */
char* appname; /**< Name of the application. */
- /** Points to a circular linked list of children */
+ /** Points to a circular linked list of children. */
struct prefork_child_struct* first_child;
+ /** List of of child processes that aren't doing anything at the moment and are
+ therefore available to service a new request. */
+ struct prefork_child_struct* idle_list;
/** List of allocated but unused prefork_children, available for reuse. Each one is just
raw memory, apart from the "next" pointer used to stitch them together. In particular,
there is no child process for them, and the file descriptors are not open. */
struct prefork_child_struct* free_list;
- transport_client* connection; /**< Connection to Jabber */
+ transport_client* connection; /**< Connection to Jabber. */
} prefork_simple;
struct prefork_child_struct {
- pid_t pid; /**< Process ID of the child */
- int read_data_fd; /**< Child uses to read request */
- int write_data_fd; /**< Parent uses to write request */
- int read_status_fd; /**< Parent reads to see if child is available */
- int write_status_fd; /**< Child uses to notify parent when it's available again */
- int available; /**< Boolean; true when the child is between requests */
- int max_requests; /**< How many requests a child can process before terminating */
- const char* appname; /**< Name of the application */
+ pid_t pid; /**< Process ID of the child. */
+ int read_data_fd; /**< Child uses to read request. */
+ int write_data_fd; /**< Parent uses to write request. */
+ int read_status_fd; /**< Parent reads to see if child is available. */
+ int write_status_fd; /**< Child uses to notify parent when it's available again. */
+ int max_requests; /**< How many requests a child can process before terminating. */
+ const char* appname; /**< Name of the application. */
int keepalive; /**< Keepalive time for stateful sessions. */
- struct prefork_child_struct* next; /**< Linkage pointer for linked list */
- struct prefork_child_struct* prev; /**< Linkage pointer for linked list */
+ struct prefork_child_struct* next; /**< Linkage pointer for linked list. */
+ struct prefork_child_struct* prev; /**< Linkage pointer for linked list. */
};
typedef struct prefork_child_struct prefork_child;
+/** Boolean. Set to true by a signal handler when it traps SIGCHLD. */
+static volatile sig_atomic_t child_dead;
+
static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
int max_requests, int min_children, int max_children );
static prefork_child* launch_child( prefork_simple* forker );
static void osrf_prefork_register_routers( const char* appname );
static void osrf_prefork_child_exit( prefork_child* );
-/** Boolean. Set to true by a signal handler when it traps SIGCHLD. */
-static volatile sig_atomic_t child_dead;
-
static void sigchld_handler( int sig );
-int osrf_prefork_run(const char* appname) {
+/**
+ @brief Spawn and manage a collection of drone processes for servicing requests.
+ @param appname Name of the application.
+ @return 0 if successful, or -1 if error.
+*/
+int osrf_prefork_run( const char* appname ) {
if(!appname) {
osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run requires an appname to run!");
char* resc = va_list_to_string("%s_listener", appname);
+ // Make sure that we haven't already booted
if(!osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()");
free(resc);
return -1;
}
- // Finish initializing the prefork_simple
+ // Finish initializing the prefork_simple.
forker.appname = strdup(appname);
forker.keepalive = kalive;
- // Spawn the children
+ // Spawn the children; put them in the idle list.
prefork_launch_children( &forker );
- // Tell the router that you're open for business
+ // Tell the router that you're open for business.
osrf_prefork_register_routers(appname);
// Sit back and let the requests roll in
osrfLogSetIsClient(1);
free(isclient);
- /* we want to remove traces of our parents socket connection
+ /* we want to remove traces of our parent's socket connection
* so we can have our own */
osrfSystemIgnoreTransportClient();
- if(!osrfSystemBootstrapClientResc( NULL, NULL, resc)) {
+ if(!osrfSystemBootstrapClientResc( NULL, NULL, resc )) {
osrfLogError( OSRF_LOG_MARK, "Unable to bootstrap client for osrf_prefork_run()");
free(resc);
return -1;
return;
}
-
/**
@brief Partially initialize a prefork_simple provided by the caller.
@param prefork Pointer to a a raw prefork_simple to be initialized.
@param client Pointer to a transport_client (connection to Jabber).
- @param max_requests
+ @param max_requests The maximum number of requests that a child process may service
+ before terminating.
@param min_children Minimum number of child processes to maintain.
@param max_children Maximum number of child processes to maintain.
@return 0 if successful, or 1 if not (due to invalid parameters).
*/
static int prefork_simple_init( prefork_simple* prefork, transport_client* client,
- int max_requests, int min_children, int max_children ) {
+ int max_requests, int min_children, int max_children ) {
if( min_children > max_children ) {
osrfLogError( OSRF_LOG_MARK, "min_children (%d) is greater "
"min_children=%d, max_children=%d", max_requests, min_children, max_children );
/* flesh out the struct */
- //prefork_simple* prefork = safe_malloc(sizeof(prefork_simple));
prefork->max_requests = max_requests;
prefork->min_children = min_children;
prefork->max_children = max_children;
prefork->keepalive = 0;
prefork->appname = NULL;
prefork->first_child = NULL;
+ prefork->idle_list = NULL;
prefork->free_list = NULL;
prefork->connection = client;
return 0;
}
+/**
+ @brief Spawn a new child process.
+ @param forker Pointer to the prefork_simple that will own the process.
+ @return Pointer to the new prefork_child, or not at all.
+
+ Spawn a new child process. Create a prefork_child for it and put it in the idle list.
+
+ After forking, the parent returns a pointer to the new prefork_child. The child
+ services its quota of requests and then terminates without returning.
+*/
static prefork_child* launch_child( prefork_simple* forker ) {
pid_t pid;
int data_fd[2];
int status_fd[2];
- /* Set up the data pipes and add the child struct to the parent */
+ // Set up the data and status pipes
if( pipe(data_fd) < 0 ) { /* build the data pipe*/
osrfLogError( OSRF_LOG_MARK, "Pipe making error" );
return NULL;
if( pipe(status_fd) < 0 ) {/* build the status pipe */
osrfLogError( OSRF_LOG_MARK, "Pipe making error" );
+ close( data_fd[1] );
+ close( data_fd[0] );
return NULL;
}
- osrfLogInternal( OSRF_LOG_MARK, "Pipes: %d %d %d %d",
+ osrfLogInternal( OSRF_LOG_MARK, "Pipes: %d %d %d %d",
data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
+
+ // Create and initialize a prefork_child for the new process
prefork_child* child = prefork_child_init( forker, data_fd[0],
data_fd[1], status_fd[0], status_fd[1] );
- add_prefork_child( forker, child );
-
if( (pid=fork()) < 0 ) {
- osrfLogError( OSRF_LOG_MARK, "Forking Error" );
+ osrfLogError( OSRF_LOG_MARK, "Forking Error" );
+ prefork_child_free( forker, child );
return NULL;
}
+ // Add the new child to the head of the idle list
+ child->next = forker->idle_list;
+ forker->idle_list = child;
+
if( pid > 0 ) { /* parent */
signal(SIGCHLD, sigchld_handler);
(forker->current_num_children)++;
child->pid = pid;
- osrfLogDebug( OSRF_LOG_MARK, "Parent launched %d", pid );
+ osrfLogDebug( OSRF_LOG_MARK, "Parent launched %d", pid );
/* *no* child pipe FD's can be closed or the parent will re-use fd's that
the children are currently using */
return child;
else { /* child */
osrfLogInternal( OSRF_LOG_MARK,
- "I am new child with read_data_fd = %d and write_status_fd = %d",
+ "I am new child with read_data_fd = %d and write_status_fd = %d",
child->read_data_fd, child->write_status_fd );
child->pid = getpid();
osrf_prefork_child_exit(child);
}
- prefork_child_wait( child );
- osrf_prefork_child_exit(child); /* just to be sure */
+ prefork_child_wait( child ); // Should exit without returning
+ osrf_prefork_child_exit( child ); // Just to be sure
+ return NULL; // Unreachable, but it keeps the compiler happy
}
- return NULL;
}
static void osrf_prefork_child_exit(prefork_child* child) {
launch_child( forker );
}
-static void prefork_run(prefork_simple* forker) {
+/**
+ @brief Read transport_messages and dispatch them to child processes for servicing.
+ @param forker Pointer to the prefork_simple that manages the child processes.
- if( forker->first_child == NULL )
- return;
+ This is the main loop of the parent process, and once entered, does not exit.
- transport_message* cur_msg = NULL;
+ For each usable transport_message received: look for an idle child to service it. If
+ no idle children are available, either spawn a new one or, if we've already spawned the
+ maximum number of children, wait for one to become available. Once a child is available
+ by whatever means, write an XML version of the input message, to a pipe designated for
+ use by that child.
+*/
+static void prefork_run( prefork_simple* forker ) {
+
+ if( NULL == forker->idle_list )
+ return; // No available children, and we haven't even started yet
+ transport_message* cur_msg = NULL;
while(1) {
- if( forker->first_child == NULL ) {/* no more children */
+ if( forker->first_child == NULL && forker->idle_list == NULL ) {/* no more children */
osrfLogWarning( OSRF_LOG_MARK, "No more children..." );
return;
}
cur_msg = client_recv( forker->connection, -1 );
if( cur_msg == NULL )
- continue; // Error? Interrupted by a signal?
+ continue; // Error? Interrupted by a signal? Try again...
+
+ message_prepare_xml( cur_msg );
+ const char* msg_data = cur_msg->msg_xml;
+ if( ! msg_data || ! *msg_data ) {
+ osrfLogWarning( OSRF_LOG_MARK, "Received % message from %s, thread %",
+ (msg_data ? "empty" : "NULL"), cur_msg->sender, cur_msg->thread );
+ message_free( cur_msg );
+ continue; // Message not usable; go on to the next one.
+ }
int honored = 0; /* will be set to true when we service the request */
int no_recheck = 0;
no_recheck = 0;
osrfLogDebug( OSRF_LOG_MARK, "Server received inbound data" );
- int k;
- prefork_child* cur_child = forker->first_child;
- /* Look for an available child */
- for( k = 0; k < forker->current_num_children; k++ ) {
+ prefork_child* cur_child = NULL;
+
+ // Look for an available child in the idle list. Since the idle list operates
+ // as a stack, the child we get is the one that was most recently active, or
+ // most recently spawned. That means it's the one most likely still to be in
+ // physical memory, and the one least likely to have to be swapped in.
+ while( forker->idle_list ) {
+
+ // Grab the prefork_child at the head of the idle list
+ cur_child = forker->idle_list;
+ forker->idle_list = cur_child->next;
osrfLogInternal( OSRF_LOG_MARK,
"Searching for available child. cur_child->pid = %d", cur_child->pid );
- osrfLogInternal( OSRF_LOG_MARK, "Current num children %d and loop %d",
- forker->current_num_children, k);
-
- if( cur_child->available ) {
- osrfLogDebug( OSRF_LOG_MARK, "forker sending data to %d", cur_child->pid );
-
- message_prepare_xml( cur_msg );
- char* data = cur_msg->msg_xml;
- if( ! data || strlen(data) < 1 )
- break;
-
- cur_child->available = 0;
- osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d",
- cur_child->write_data_fd );
-
- int written = 0;
- if( (written = write( cur_child->write_data_fd, data, strlen(data) + 1 )) < 0 ) {
- osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d", errno);
- cur_child = cur_child->next;
- continue;
- }
+ osrfLogInternal( OSRF_LOG_MARK, "Current num children %d",
+ forker->current_num_children );
+
+ osrfLogDebug( OSRF_LOG_MARK, "forker sending data to %d", cur_child->pid );
+ osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d",
+ cur_child->write_data_fd );
+
+ int written = write(cur_child->write_data_fd, msg_data, strlen(msg_data) + 1);
+ if( written < 0 ) {
+ // This child appears to be dead or unusable. Discard it.
+ osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d: %s",
+ errno, strerror( errno ) );
+ kill( cur_child->pid, SIGKILL );
+ del_prefork_child( forker, cur_child->pid );
+ continue;
+ }
- forker->first_child = cur_child->next;
- honored = 1;
- break;
- } else
- cur_child = cur_child->next;
+ add_prefork_child( forker, cur_child ); // Add it to active list
+ honored = 1;
+ break;
}
/* if none available, add a new child if we can */
prefork_child* new_child = launch_child( forker );
if( new_child ) {
- message_prepare_xml( cur_msg );
- char* data = cur_msg->msg_xml;
-
- if( data ) {
- int len = strlen(data);
-
- if( len > 0 ) {
- new_child->available = 0;
- osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d",
- new_child->write_data_fd, new_child->pid );
-
- if( write( new_child->write_data_fd, data, len + 1 ) >= 0 ) {
- forker->first_child = new_child->next;
- honored = 1;
- }
- }
+ osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d",
+ new_child->write_data_fd, new_child->pid );
+
+ if(write(new_child->write_data_fd, msg_data, strlen(msg_data) + 1) >= 0 ) {
+ forker->first_child = new_child->next;
+ add_prefork_child( forker, new_child );
+ honored = 1;
+ } else {
+ // This child appears to be dead or unusable. Discard it.
+ osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d: %s",
+ errno, strerror( errno ) );
+ kill( cur_child->pid, SIGKILL );
+ osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d: %s",
+ errno, strerror( errno ) );
+ del_prefork_child( forker, cur_child->pid );
}
}
}
if( !honored ) {
- osrfLogWarning( OSRF_LOG_MARK, "No children available, waiting...");
-
- check_children( forker, 1 ); /* non-poll version */
- /* tell the loop not to call check_children again, since we're calling it now */
+ osrfLogWarning( OSRF_LOG_MARK, "No children available, waiting...");
+ check_children( forker, 1 );
+ // Tell the loop not to call check_children again, since we're calling it now
no_recheck = 1;
}
if( child_dead )
reap_children(forker);
- } // honored?
+ } // end while( ! honored )
message_free( cur_msg );
}
-/** XXX Add a flag which tells select() to wait forever on children
+/* XXX Add a flag which tells select() to wait forever on children
in the best case, this will be faster than calling usleep(x), and
in the worst case it won't be slower and will do less logging...
*/
static void check_children( prefork_simple* forker, int forever ) {
- //check_begin:
+ if( child_dead )
+ reap_children(forker);
+
+ if( NULL == forker->first_child ) {
+ // If forever is true, then we're here because we've run out of idle
+ // processes, so there should be some active ones around. Otherwise
+ // the children may all be idle, and that's okay.
+ if( forever )
+ osrfLogError( OSRF_LOG_MARK, "No active child processes to check" );
+ return;
+ }
int select_ret;
fd_set read_set;
int n;
if( child_dead )
- reap_children(forker);
+ reap_children( forker );
+ // Prepare to select() on pipes from all the active children
prefork_child* cur_child = forker->first_child;
-
- int i;
- for( i = 0; i!= forker->current_num_children; i++ ) {
-
+ do {
if( cur_child->read_status_fd > max_fd )
max_fd = cur_child->read_status_fd;
FD_SET( cur_child->read_status_fd, &read_set );
cur_child = cur_child->next;
- }
+ } while( cur_child != forker->first_child );
FD_CLR(0,&read_set); /* just to be sure */
"We have no children available - waiting for one to show up...");
if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, NULL)) == -1 ) {
- osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children", errno );
+ osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children: %s",
+ errno, strerror( errno ) );
}
osrfLogInfo(OSRF_LOG_MARK,
"select() completed after waiting on children to become available");
tv.tv_usec = 0;
if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, &tv)) == -1 ) {
- osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children", errno );
+ osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children: %s",
+ errno, strerror( errno ) );
}
}
if( select_ret == 0 )
return;
- /* see if one of a child has told us it's done */
+ /* see if any children have told us they're done */
cur_child = forker->first_child;
int j;
int num_handled = 0;
for( j = 0; j!= forker->current_num_children && num_handled < select_ret ; j++ ) {
if( FD_ISSET( cur_child->read_status_fd, &read_set ) ) {
- //printf( "Server received status from a child %d\n", cur_child->pid );
osrfLogDebug( OSRF_LOG_MARK,
"Server received status from a child %d", cur_child->pid );
char buf[64];
if( (n=read(cur_child->read_status_fd, buf, sizeof(buf) - 1)) < 0 ) {
osrfLogWarning( OSRF_LOG_MARK,
- "Read error after select in child status read with errno %d", errno);
+ "Read error after select in child status read with errno %d: %s",
+ errno, strerror( errno ) );
}
else {
buf[n] = '\0';
osrfLogDebug( OSRF_LOG_MARK, "Read %d bytes from status buffer: %s", n, buf );
}
- cur_child->available = 1;
+ // Remove the child from the active list
+ if( forker->first_child == cur_child ) {
+ if( cur_child->next == cur_child )
+ forker->first_child = NULL; // only child in the active list
+ else {
+ forker->first_child = cur_child->next;
+ }
+ cur_child->next->prev = cur_child->prev;
+ cur_child->prev->next = cur_child->next;
+ }
+
+ // Add it to the idle list
+ cur_child->prev = NULL;
+ cur_child->next = forker->idle_list;
+ forker->idle_list = cur_child;
}
cur_child = cur_child->next;
}
-
}
osrfLogDebug( OSRF_LOG_MARK, "Child with max-requests=%d, num-served=%d exiting...[%ld]",
child->max_requests, i, (long) getpid() );
- osrf_prefork_child_exit(child); /* just to be sure */
+ osrf_prefork_child_exit(child);
}
/**
- @brief Add a prefork_child to the end of the list.
+ @brief Add a prefork_child to the end of the active list.
@param forker Pointer to the prefork_simple that owns the list.
@param child Pointer to the prefork_child to be added.
*/
}
/**
- @brief Remove a prefork_child from the child list.
+ @brief Remove a prefork_child, representing a terminated child, from the active list.
@param forker Pointer to the prefork_simple that owns the child.
@param pid Process ID of the child to be removed.
- Besides removing the node from the list, we also close its file descriptors.
+ Remove the node from the active list, close its file descriptors, and put it in the
+ free list for potential reuse.
*/
static void del_prefork_child( prefork_simple* forker, pid_t pid ) {
else {
if( forker->first_child == cur_child )
forker->first_child = cur_child->next; // Reseat forker->first_child
-
+
// Stitch the nodes on either side together
cur_child->prev->next = cur_child->next;
cur_child->next->prev = cur_child->prev;
}
--forker->current_num_children;
-
+
//Destroy the node
prefork_child_free( forker, cur_child );
-
- } // else we didn't find a matching node; bail out
+
+ } else {
+ // Maybe it's in the idle list. This can happen if, for example,
+ // a child is killed by a signal while it's between requests.
+
+ prefork_child* prev = NULL;
+ cur_child = forker->idle_list;
+ while( cur_child && cur_child->pid != pid ) {
+ prev = cur_child;
+ cur_child = cur_child->next;
+ }
+
+ if( cur_child ) {
+ // Detach from the list
+ if( prev )
+ prev->next = cur_child->next;
+ else
+ forker->idle_list = cur_child->next;
+
+ --forker->current_num_children;
+
+ //Destroy the node
+ prefork_child_free( forker, cur_child );
+ } // else we can't find it, so do nothing.
+ }
}
/**
child = forker->free_list;
forker->free_list = child->next;
} else
- child = (prefork_child*) safe_malloc(sizeof(prefork_child));
+ child = safe_malloc(sizeof(prefork_child));
child->pid = 0;
child->read_data_fd = read_data_fd;
child->write_data_fd = write_data_fd;
child->read_status_fd = read_status_fd;
child->write_status_fd = write_status_fd;
- child->available = 1;
child->max_requests = forker->max_requests;
child->appname = forker->appname; // We don't make a separate copy
child->keepalive = forker->keepalive;
return child;
}
-
/**
@brief Terminate all child processes and clear out a prefork_simple.
@param prefork Pointer to the prefork_simple to be cleared out.
*/
static void prefork_clear( prefork_simple* prefork ) {
- // Kill all the child processes with a single call, not by killing each one separately.
- // Implication: we can't have more than one prefork_simple outstanding, because
- // destroying one would kill the children of all.
- while( prefork->first_child != NULL ) {
- osrfLogInfo( OSRF_LOG_MARK, "Killing child processes ..." );
- kill( 0, SIGKILL );
+ // Kill all the active children, and move their prefork_child nodes to the free list.
+ while( prefork->first_child ) {
+ kill( prefork->first_child->pid, SIGKILL );
+ del_prefork_child( prefork, prefork->first_child->pid );
}
- // Bury the children so that they won't be zombies. WNOHANG means that waitpid() returns
- // immediately if there are no waitable children, instead of waiting for more to die.
- // Ignore the return code of the child. We don't do an autopsy.
- pid_t child_pid;
- while( (child_pid = waitpid(-1, NULL, WNOHANG)) > 0)
- del_prefork_child( prefork, child_pid );
+ // Kill all the idle prefork children, close their file
+ // descriptors, and move them to the free list.
+ prefork_child* child = prefork->idle_list;
+ prefork->idle_list = NULL;
+ while( child ) {
+ prefork_child* temp = child->next;
+ kill( child->pid, SIGKILL );
+ prefork_child_free( prefork, child );
+ child = temp;
+ }
+ prefork->current_num_children = 0;
- // Close the Jabber connection
- client_free(prefork->connection);
-
// Physically free the free list of prefork_children.
- prefork_child* child = prefork->first_child;
+ child = prefork->free_list;
+ prefork->free_list = NULL;
while( child ) {
prefork_child* temp = child->next;
free( child );
child = temp;
}
+ // Close the Jabber connection
+ client_free( prefork->connection );
+ prefork->connection = NULL;
+
+ // After giving the child processes a second to terminate, wait on them so that they
+ // don't become zombies. We don't wait indefinitely, so it's possible that some
+ // children will survive a bit longer.
+ sleep( 1 );
+ while( (waitpid(-1, NULL, WNOHANG)) > 0) {
+ ; // Another one died...go around again
+ }
+
free(prefork->appname);
+ prefork->appname = NULL;
}
/**