+/**
+ @file osrf_prefork.c
+ @brief Spawn and manage a collection of child processes to service requests.
+
+ Spawn a collection of child processes, replacing them as needed. Forward requests to them
+ and let the children do the work.
+
+ Each child processes some maximum number of requests before it terminates itself. When a
+ child dies, either deliberately or otherwise, we can spawn another one to replace it,
+ keeping the number of children within a predefined range.
+
+ Use a singly-linked circular list to keep track of the children, forwarding requests to them
+ in an approximately round-robin fashion.
+
+ For each child, set up two pipes:
+ - One for the parent to send requests to the child.
+ - One for the child to notify the parent that it is available for another request.
+
+ The message sent to the child is an XML stanza as received from Jabber.
+
+ When the child finishes processing the request, it writes the string "available" back
+ to the parent. Then the parent knows that it can send that child another request.
+*/
+
#include <signal.h>
#include <sys/types.h>
#include <sys/time.h>
#define ABS_MAX_CHILDREN 256
typedef struct {
- int max_requests;
- int min_children;
- int max_children;
+ int max_requests; /**< How many requests a child processes before terminating. */
+ int min_children; /**< Minimum number of children to maintain. */
+ int max_children; /**< Maximum number of children to maintain. */
int fd; /**< Unused. */
int data_to_child; /**< Unused. */
int data_to_parent; /**< Unused. */
- int current_num_children;
- int keepalive; /**< keepalive time for stateful sessions. */
- char* appname;
+ int current_num_children; /**< How many children are currently on the list. */
+ int keepalive; /**< Keepalive time for stateful sessions. */
+ char* appname; /**< Name of the application. */
+ /** Points to a circular linked list of children */
struct prefork_child_struct* first_child;
- transport_client* connection;
+ transport_client* connection; /**< Connection to Jabber */
} prefork_simple;
struct prefork_child_struct {
- pid_t pid;
- int read_data_fd;
- int write_data_fd;
- int read_status_fd;
- int write_status_fd;
- int min_children;
- int available;
- int max_requests;
- char* appname;
- int keepalive;
- struct prefork_child_struct* next;
+ pid_t pid; /**< Process ID of the child */
+ int read_data_fd; /**< Child uses to read request */
+ int write_data_fd; /**< Parent uses to write request */
+ int read_status_fd; /**< Parent reads to see if child is available */
+ int write_status_fd; /**< Child uses to notify parent when it's available again */
+ int available; /**< Boolean; true when the child is between requests */
+ int max_requests; /**< How many requests a child can process before terminating */
+ char* appname; /**< Name of the application */
+ int keepalive; /**< Keepalive time for stateful sessions. */
+ struct prefork_child_struct* next; /**< Linkage pointer for linked list */
};
typedef struct prefork_child_struct prefork_child;
static void prefork_run(prefork_simple* forker);
static void add_prefork_child( prefork_simple* forker, prefork_child* child );
-//static prefork_child* find_prefork_child( prefork_simple* forker, pid_t pid );
-
static void del_prefork_child( prefork_simple* forker, pid_t pid );
static void check_children( prefork_simple* forker, int forever );
static void prefork_child_process_request(prefork_child*, char* data);
/* listens on the 'data_to_child' fd and wait for incoming data */
static void prefork_child_wait( prefork_child* child );
static void prefork_clear( prefork_simple* );
-static int prefork_child_free( prefork_child* );
+static void prefork_child_free( prefork_child* );
static void osrf_prefork_register_routers( const char* appname );
static void osrf_prefork_child_exit( prefork_child* );
-
-/* true if we just deleted a child. This will allow us to make sure we're
- not trying to use freed memory */
-static sig_atomic_t child_dead;
+/** Boolean. Set to true by a signal handler when it traps SIGCHLD. */
+static volatile sig_atomic_t child_dead;
static void sigchld_handler( int sig );
osrfLogInfo( OSRF_LOG_MARK, "Launching osrf_forker for app %s", appname);
prefork_run( &forker );
- osrfLogWarning( OSRF_LOG_MARK, "prefork_run() retuned - how??");
+ osrfLogWarning( OSRF_LOG_MARK, "prefork_run() returned - how??");
prefork_clear( &forker );
return 0;
}
osrfLogInfo( OSRF_LOG_MARK, "%s registering with router %s", appname, jid );
// Create the registration message, and send it
- transport_message* msg = message_init("registering", NULL, NULL, jid, NULL );
+ transport_message* msg = message_init( "registering", NULL, NULL, jid, NULL );
message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
client_send_message( client, msg );
osrfLogDebug( OSRF_LOG_MARK, "Child init hook for child %d", child->pid);
osrfSystemInitCache();
- char* resc = va_list_to_string("%s_drone",child->appname);
+ char* resc = va_list_to_string("%s_drone", child->appname);
/* if we're a source-client, tell the logger now that we're a new process*/
char* isclient = osrfConfigGetValue(NULL, "/client");
}
/* see if the client disconnected from us */
- if(session->state != OSRF_SESSION_CONNECTED) break;
+ if(session->state != OSRF_SESSION_CONNECTED)
+ break;
/* if no data was reveived within the timeout interval */
if( !recvd && (end - start) >= keepalive ) {
}
+/**
+ @brief Signal handler for SIGCHLD: note that a child process has terminated.
+ @param sig The value of the trapped signal; always SIGCHLD.
+
+ Re-install this handler for SIGCHLD, then set a boolean to be checked later.
+*/
static void sigchld_handler( int sig ) {
signal(SIGCHLD, sigchld_handler);
child_dead = 1;
}
+/**
+ @brief Replenish the collection of child processes, after one has terminated.
+ @param forker Pointer to the prefork_simple that manages the child processes.
+
+ This function is called when we notice (via a signal handler) that a child
+ process has died.
+
+ Reap each terminated child, then spawn new ones until at least min_children are running.
+*/
void reap_children( prefork_simple* forker ) {
pid_t child_pid;
- int status;
- while( (child_pid=waitpid(-1,&status,WNOHANG)) > 0)
+ // Reset our boolean so that we can detect any further terminations.
+ child_dead = 0;
+
+ // Bury the children so that they won't be zombies. WNOHANG means that waitpid() returns
+ // immediately if there are no waitable children, instead of waiting for more to die.
+ // Ignore the return code of the child. We don't do an autopsy.
+ while( (child_pid = waitpid(-1, NULL, WNOHANG)) > 0)
del_prefork_child( forker, child_pid );
- /* replenish */
+ /* Spawn more children as needed to maintain a minimum brood. */
while( forker->current_num_children < forker->min_children )
launch_child( forker );
-
- child_dead = 0;
}
static void prefork_run(prefork_simple* forker) {
message_prepare_xml( cur_msg );
char* data = cur_msg->msg_xml;
- if( ! data || strlen(data) < 1 ) break;
+ if( ! data || strlen(data) < 1 )
+ break;
cur_child->available = 0;
osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d",
if( child_dead )
reap_children(forker);
-
- //fprintf(stderr, "Parent done with request %f\n", get_timestamp_millis() );
-
} // honored?
message_free( cur_msg );
- } /* top level listen loop */
+ } /* end top level listen loop */
}
int max_fd = 0;
int n;
-
if( child_dead )
reap_children(forker);
/* now suck off the data */
char buf[64];
- osrf_clearbuf( buf, sizeof(buf) );
if( (n=read(cur_child->read_status_fd, buf, sizeof(buf) - 1)) < 0 ) {
osrfLogWarning( OSRF_LOG_MARK,
"Read error after select in child status read with errno %d", errno);
int i,n;
growing_buffer* gbuf = buffer_init( READ_BUFSIZE );
char buf[READ_BUFSIZE];
- osrf_clearbuf( buf, sizeof(buf) );
for( i = 0; i < child->max_requests; i++ ) {
n = -1;
- int gotdata = 0;
+ int gotdata = 0; // boolean; set to true if we get data
clr_fl(child->read_data_fd, O_NONBLOCK );
while( (n=read(child->read_data_fd, buf, READ_BUFSIZE-1)) > 0 ) {
buf[n] = '\0';
osrfLogDebug(OSRF_LOG_MARK, "Prefork child read %d bytes of data", n);
- if(!gotdata)
+ if(!gotdata) {
set_fl(child->read_data_fd, O_NONBLOCK );
+ gotdata = 1;
+ }
buffer_add( gbuf, buf );
- osrf_clearbuf( buf, sizeof(buf) );
- gotdata = 1;
}
if( errno == EAGAIN ) n = 0;
osrf_prefork_child_exit(child); /* just to be sure */
}
-
+/**
+ @brief Add a prefork_child to the list.
+ @param forker Pointer to the prefork_simple that owns the list.
+ @param child Pointer to the prefork_child to be added.
+*/
static void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
if( forker->first_child == NULL ) {
+ // Simplest case: list is initially empty.
forker->first_child = child;
child->next = child;
return;
}
- /* we put the child in as the last because, regardless,
- we have to do the DLL splice dance, and this is the
- simplest way */
-
+ // Reposition forker->first_child to the last node by walking around the circle.
prefork_child* start_child = forker->first_child;
while(1) {
if( forker->first_child->next == start_child )
forker->first_child = forker->first_child->next;
}
- /* here we know that forker->first_child is the last element
- in the list and start_child is the first. Insert the
- new child between them*/
-
+ // Insert the new child after forker->first_child.
forker->first_child->next = child;
child->next = start_child;
return;
-}
-//static prefork_child* find_prefork_child( prefork_simple* forker, pid_t pid ) {
-//
-// if( forker->first_child == NULL ) { return NULL; }
-// prefork_child* start_child = forker->first_child;
-// do {
-// if( forker->first_child->pid == pid )
-// return forker->first_child;
-// } while( (forker->first_child = forker->first_child->next) != start_child );
-//
-// return NULL;
-//}
+ // What had been the last node in the list is now the first node,
+ // and the new node is second.
+}
+/**
+ @brief Remove a prefork_child from the child list.
+ @param forker Pointer to the prefork_simple that owns the child.
+ @param pid Process ID of the child to be removed.
+ @details Besides removing the node from the list, we also close its file descriptors.
+*/
static void del_prefork_child( prefork_simple* forker, pid_t pid ) {
if( forker->first_child == NULL ) { return; }
if( start_child == start_child->next ) {
if( start_child->pid == pid ) {
forker->first_child = NULL;
-
- close( start_child->read_data_fd );
- close( start_child->write_data_fd );
- close( start_child->read_status_fd );
- close( start_child->write_status_fd );
-
prefork_child_free( start_child );
}
return;
/* now cur_child == start_child */
prev_child->next = cur_child->next;
forker->first_child = prev_child;
-
- close( cur_child->read_data_fd );
- close( cur_child->write_data_fd );
- close( cur_child->read_status_fd );
- close( cur_child->write_status_fd );
-
prefork_child_free( cur_child );
return;
}
if( cur_child->pid == pid ) {
prev_child->next = cur_child->next;
-
- close( cur_child->read_data_fd );
- close( cur_child->write_data_fd );
- close( cur_child->read_status_fd );
- close( cur_child->write_status_fd );
-
prefork_child_free( cur_child );
return;
}
} while(cur_child != start_child);
}
-
-
-
+/**
+ @brief Create and initialize a prefork_child.
+ @param max_requests How many requests to service before terminating.
+ @param read_data_fd Used by child to read request from parent.
+ @param write_data_fd Used by parent to write request to child.
+ @param read_status_fd Used by parent to read status from child.
+ @param write_status_fd Used by child to write status to parent.
+ @return Pointer to the newly created prefork_child.
+
+ The calling code is responsible for freeing the prefork_child by calling
+ prefork_child_free().
+*/
static prefork_child* prefork_child_init(
int max_requests, int read_data_fd, int write_data_fd,
int read_status_fd, int write_status_fd ) {
child->write_data_fd = write_data_fd;
child->read_status_fd = read_status_fd;
child->write_status_fd = write_status_fd;
- child->min_children = 0;
child->available = 1;
child->appname = NULL;
child->keepalive = 0;
}
+/**
+ @brief Terminate all child processes and clear out a prefork_simple.
+ @param prefork Pointer to the prefork_simple to be cleared out.
+
+ We do not deallocate the prefork_simple itself, just its contents.
+*/
static void prefork_clear( prefork_simple* prefork ) {
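+ // Kill all the child processes with a single call, not by killing each one separately.
+ // Implication: we can't have more than one prefork_simple outstanding, because destroying
+ // one would kill the children of both. NOTE(review): kill(0, ...) targets our own process group, so it presumably signals this process too -- confirm.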
while( prefork->first_child != NULL ) {
- osrfLogInfo( OSRF_LOG_MARK, "Killing children and sleeping 1 to reap..." );
+ osrfLogInfo( OSRF_LOG_MARK, "Killing child processes ..." );
kill( 0, SIGKILL );
- sleep(1);
}
- client_free(prefork->connection);
+ // Bury the children so that they won't be zombies. WNOHANG means that waitpid() returns
+ // immediately if there are no waitable children, instead of waiting for more to die.
+ // Ignore the return code of the child. We don't do an autopsy.
+ pid_t child_pid;
+ while( (child_pid = waitpid(-1, NULL, WNOHANG)) > 0)
+ del_prefork_child( prefork, child_pid );
+
+ client_free(prefork->connection); // Close the Jabber connection
free(prefork->appname);
}
-static int prefork_child_free( prefork_child* child ) {
+/**
+ @brief Destroy and deallocate a prefork_child.
+ @param child Pointer to the prefork_child to be destroyed.
+*/
+static void prefork_child_free( prefork_child* child ) {
free(child->appname);
- close(child->read_data_fd);
- close(child->write_status_fd);
+ close( child->read_data_fd );
+ close( child->write_data_fd );
+ close( child->read_status_fd );
+ close( child->write_status_fd );
free( child );
- return 1;
}