-#include <opensrf/osrf_prefork.h>
-#include <opensrf/osrf_app_session.h>
-#include <opensrf/osrf_application.h>
#include <signal.h>
+#include "opensrf/osrf_prefork.h"
+#include "opensrf/osrf_app_session.h"
+#include "opensrf/osrf_application.h"
-//#define READ_BUFSIZE 4096
#define READ_BUFSIZE 1024
-//#define MAX_BUFSIZE 10485760 /* 10M enough? ;) */
#define ABS_MAX_CHILDREN 256
-struct prefork_simple_struct {
+typedef struct {
int max_requests;
int min_children;
int max_children;
- int fd;
- int data_to_child;
- int data_to_parent;
+ int fd; /**< Unused. */
+ int data_to_child; /**< Unused. */
+ int data_to_parent; /**< Unused. */
int current_num_children;
- int keepalive; /* keepalive time for stateful sessions */
+ int keepalive; /**< keepalive time for stateful sessions. */
char* appname;
struct prefork_child_struct* first_child;
transport_client* connection;
-};
-typedef struct prefork_simple_struct prefork_simple;
+} prefork_simple;
struct prefork_child_struct {
pid_t pid;
set_proc_title( "OpenSRF Listener [%s]", appname );
- int maxr = 1000;
+ int maxr = 1000;
int maxc = 10;
int minc = 3;
- int kalive = 5;
+ int kalive = 5;
osrfLogInfo( OSRF_LOG_MARK, "Loading config in osrf_forker for app %s", appname);
- char* max_req = osrf_settings_host_value("/apps/%s/unix_config/max_requests", appname);
+ char* max_req = osrf_settings_host_value("/apps/%s/unix_config/max_requests", appname);
char* min_children = osrf_settings_host_value("/apps/%s/unix_config/min_children", appname);
char* max_children = osrf_settings_host_value("/apps/%s/unix_config/max_children", appname);
- char* keepalive = osrf_settings_host_value("/apps/%s/keepalive", appname);
+ char* keepalive = osrf_settings_host_value("/apps/%s/keepalive", appname);
if(!keepalive) osrfLogWarning( OSRF_LOG_MARK, "Keepalive is not defined, assuming %d", kalive);
else kalive = atoi(keepalive);
if(!max_req) osrfLogWarning( OSRF_LOG_MARK, "Max requests not defined, assuming %d", maxr);
else maxr = atoi(max_req);
- if(!min_children) osrfLogWarning( OSRF_LOG_MARK, "Min children not defined, assuming %d", minc);
+ if(!min_children) osrfLogWarning( OSRF_LOG_MARK,
+ "Min children not defined, assuming %d", minc);
else minc = atoi(min_children);
- if(!max_children) osrfLogWarning( OSRF_LOG_MARK, "Max children not defined, assuming %d", maxc);
+ if(!max_children) osrfLogWarning( OSRF_LOG_MARK,
+ "Max children not defined, assuming %d", maxc);
else maxc = atoi(max_children);
- free(keepalive);
+ free(keepalive);
free(max_req);
free(min_children);
free(max_children);
return -1;
}
- forker->appname = strdup(appname);
- forker->keepalive = kalive;
+ forker->appname = strdup(appname);
+ forker->keepalive = kalive;
prefork_launch_children(forker);
osrf_prefork_register_routers(appname);
-
+
osrfLogInfo( OSRF_LOG_MARK, "Launching osrf_forker for app %s", appname);
prefork_run(forker);
-
+
osrfLogWarning( OSRF_LOG_MARK, "prefork_run() retuned - how??");
prefork_free(forker);
return 0;
/* sends the "register" packet to the specified router */
static void osrf_prefork_send_router_registration(const char* appname, const char* routerName, const char* routerDomain) {
transport_client* client = osrfSystemGetTransportClient();
- char* jid = va_list_to_string( "%s@%s/router", routerName, routerDomain );
- osrfLogInfo( OSRF_LOG_MARK, "%s registering with router %s", appname, jid );
- transport_message* msg = message_init("registering", NULL, NULL, jid, NULL );
- message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
- client_send_message( client, msg );
- message_free( msg );
- free(jid);
+ char* jid = va_list_to_string( "%s@%s/router", routerName, routerDomain );
+ osrfLogInfo( OSRF_LOG_MARK, "%s registering with router %s", appname, jid );
+ transport_message* msg = message_init("registering", NULL, NULL, jid, NULL );
+ message_set_router_info( msg, NULL, NULL, appname, "register", 0 );
+ client_send_message( client, msg );
+ message_free( msg );
+ free(jid);
}
/** parses a single "complex" router configuration chunk */
const char* routerName = jsonObjectGetString(jsonObjectGetKeyConst(routerChunk, "name"));
const char* domain = jsonObjectGetString(jsonObjectGetKeyConst(routerChunk, "domain"));
const jsonObject* services = jsonObjectGetKeyConst(routerChunk, "services");
- osrfLogDebug(OSRF_LOG_MARK, "found router config with domain %s and name %s", routerName, domain);
+ osrfLogDebug(OSRF_LOG_MARK, "found router config with domain %s and name %s",
+ routerName, domain);
if( services && services->type == JSON_HASH ) {
osrfLogDebug(OSRF_LOG_MARK, "investigating router information...");
static void osrf_prefork_register_routers( const char* appname ) {
- jsonObject* routerInfo = osrfConfigGetValueObject(NULL, "/routers/router");
-
- int i;
- for(i = 0; i < routerInfo->size; i++) {
- jsonObject* routerChunk = jsonObjectGetIndex(routerInfo, i);
+ jsonObject* routerInfo = osrfConfigGetValueObject(NULL, "/routers/router");
- if(routerChunk->type == JSON_STRING) {
- /* this accomodates simple router configs */
- char* routerName = osrfConfigGetValue( NULL, "/router_name" );
- char* domain = osrfConfigGetValue(NULL, "/routers/router");
- osrfLogDebug(OSRF_LOG_MARK, "found simple router settings with router name %s", routerName);
- osrf_prefork_send_router_registration(appname, routerName, domain);
-
- } else {
- osrf_prefork_parse_router_chunk(appname, routerChunk);
- }
- }
+ int i;
+ for(i = 0; i < routerInfo->size; i++) {
+ jsonObject* routerChunk = jsonObjectGetIndex(routerInfo, i);
+
+ if(routerChunk->type == JSON_STRING) {
+ /* this accomodates simple router configs */
+ char* routerName = osrfConfigGetValue( NULL, "/router_name" );
+ char* domain = osrfConfigGetValue(NULL, "/routers/router");
+ osrfLogDebug(OSRF_LOG_MARK, "found simple router settings with router name %s",
+ routerName);
+ osrf_prefork_send_router_registration(appname, routerName, domain);
+
+ } else {
+ osrf_prefork_parse_router_chunk(appname, routerChunk);
+ }
+ }
}
static int prefork_child_init_hook(prefork_child* child) {
if(!child) return -1;
osrfLogDebug( OSRF_LOG_MARK, "Child init hook for child %d", child->pid);
- osrfSystemInitCache();
+ osrfSystemInitCache();
char* resc = va_list_to_string("%s_drone",child->appname);
- /* if we're a source-client, tell the logger now that we're a new process*/
- char* isclient = osrfConfigGetValue(NULL, "/client");
- if( isclient && !strcasecmp(isclient,"true") )
- osrfLogSetIsClient(1);
- free(isclient);
-
+ /* if we're a source-client, tell the logger now that we're a new process*/
+ char* isclient = osrfConfigGetValue(NULL, "/client");
+ if( isclient && !strcasecmp(isclient,"true") )
+ osrfLogSetIsClient(1);
+ free(isclient);
- /* we want to remove traces of our parents socket connection
- * so we can have our own */
+ /* we want to remove traces of our parents socket connection
+ * so we can have our own */
osrfSystemIgnoreTransportClient();
if(!osrfSystemBootstrapClientResc( NULL, NULL, resc)) {
osrfSystemIgnoreTransportClient();
osrfLogWarning(OSRF_LOG_MARK, "Reconnecting child to opensrf after disconnect...");
if(!osrf_system_bootstrap_client(NULL, NULL)) {
- osrfLogError( OSRF_LOG_MARK,
+ osrfLogError( OSRF_LOG_MARK,
"Unable to bootstrap client in prefork_child_process_request()");
sleep(1);
- osrf_prefork_child_exit(child);
+ osrf_prefork_child_exit(child);
}
}
while(1) {
- osrfLogDebug(OSRF_LOG_MARK,
+ osrfLogDebug(OSRF_LOG_MARK,
"osrf_prefork calling queue_wait [%d] in keepalive loop", keepalive);
- start = time(NULL);
- retval = osrf_app_session_queue_wait(session, keepalive, &recvd);
- end = time(NULL);
+ start = time(NULL);
+ retval = osrf_app_session_queue_wait(session, keepalive, &recvd);
+ end = time(NULL);
osrfLogDebug(OSRF_LOG_MARK, "Data received == %d", recvd);
/* if no data was reveived within the timeout interval */
if( !recvd && (end - start) >= keepalive ) {
- osrfLogInfo(OSRF_LOG_MARK, "No request was received in %d seconds, exiting stateful session", keepalive);
- osrfAppSessionStatus(
- session,
- OSRF_STATUS_TIMEOUT,
- "osrfConnectStatus",
+ osrfLogInfo(OSRF_LOG_MARK,
+ "No request was received in %d seconds, exiting stateful session", keepalive);
+ osrfAppSessionStatus(
+ session,
+ OSRF_STATUS_TIMEOUT,
+ "osrfConnectStatus",
0, "Disconnected on timeout" );
break;
}
-static prefork_simple* prefork_simple_init( transport_client* client,
+static prefork_simple* prefork_simple_init( transport_client* client,
int max_requests, int min_children, int max_children ) {
if( min_children > max_children ) {
"min_children=%d, max_children=%d", max_requests, min_children, max_children );
/* flesh out the struct */
- prefork_simple* prefork = (prefork_simple*) safe_malloc(sizeof(prefork_simple));
+ prefork_simple* prefork = safe_malloc(sizeof(prefork_simple));
prefork->max_requests = max_requests;
prefork->min_children = min_children;
prefork->max_children = max_children;
prefork->current_num_children = 0;
prefork->keepalive = 0;
prefork->appname = NULL;
- prefork->first_child = NULL;
- prefork->connection = client;
+ prefork->first_child = NULL;
+ prefork->connection = client;
return prefork;
}
return NULL;
}
- osrfLogInternal( OSRF_LOG_MARK, "Pipes: %d %d %d %d", data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
- prefork_child* child = prefork_child_init( forker->max_requests, data_fd[0],
+ osrfLogInternal( OSRF_LOG_MARK, "Pipes: %d %d %d %d",
+ data_fd[0], data_fd[1], status_fd[0], status_fd[1] );
+ prefork_child* child = prefork_child_init( forker->max_requests, data_fd[0],
data_fd[1], status_fd[0], status_fd[1] );
child->appname = strdup(forker->appname);
else { /* child */
- osrfLogInternal( OSRF_LOG_MARK, "I am new child with read_data_fd = %d and write_status_fd = %d",
- child->read_data_fd, child->write_status_fd );
+ osrfLogInternal( OSRF_LOG_MARK,
+ "I am new child with read_data_fd = %d and write_status_fd = %d",
+ child->read_data_fd, child->write_status_fd );
child->pid = getpid();
close( child->write_data_fd );
/* do the initing */
if( prefork_child_init_hook(child) == -1 ) {
- osrfLogError(OSRF_LOG_MARK,
+ osrfLogError(OSRF_LOG_MARK,
"Forker child going away because we could not connect to OpenSRF...");
- osrf_prefork_child_exit(child);
+ osrf_prefork_child_exit(child);
}
prefork_child_wait( child );
- osrf_prefork_child_exit(child); /* just to be sure */
- }
+ osrf_prefork_child_exit(child); /* just to be sure */
+ }
return NULL;
}
static void osrf_prefork_child_exit(prefork_child* child) {
- osrfAppRunExitCode();
- exit(0);
+ osrfAppRunExitCode();
+ exit(0);
}
static void prefork_launch_children( prefork_simple* forker ) {
pid_t child_pid;
int status;
- while( (child_pid=waitpid(-1,&status,WNOHANG)) > 0)
- del_prefork_child( forker, child_pid );
+ while( (child_pid=waitpid(-1,&status,WNOHANG)) > 0)
+ del_prefork_child( forker, child_pid );
/* replenish */
- while( forker->current_num_children < forker->min_children )
+ while( forker->current_num_children < forker->min_children )
launch_child( forker );
child_dead = 0;
if( cur_msg == NULL ) continue;
- int honored = 0; /* true if we've serviced the request */
+ int honored = 0; /* true if we've serviced the request */
int no_recheck = 0;
while( ! honored ) {
- if(!no_recheck) check_children( forker, 0 );
+ if(!no_recheck) check_children( forker, 0 );
no_recheck = 0;
osrfLogDebug( OSRF_LOG_MARK, "Server received inbound data" );
/* Look for an available child */
for( k = 0; k < forker->current_num_children; k++ ) {
- osrfLogInternal( OSRF_LOG_MARK, "Searching for available child. cur_child->pid = %d", cur_child->pid );
- osrfLogInternal( OSRF_LOG_MARK, "Current num children %d and loop %d", forker->current_num_children, k);
-
+ osrfLogInternal( OSRF_LOG_MARK,
+ "Searching for available child. cur_child->pid = %d", cur_child->pid );
+ osrfLogInternal( OSRF_LOG_MARK, "Current num children %d and loop %d",
+ forker->current_num_children, k);
+
if( cur_child->available ) {
osrfLogDebug( OSRF_LOG_MARK, "forker sending data to %d", cur_child->pid );
if( ! data || strlen(data) < 1 ) break;
cur_child->available = 0;
- osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d", cur_child->write_data_fd );
+ osrfLogInternal( OSRF_LOG_MARK, "Writing to child fd %d",
+ cur_child->write_data_fd );
int written = 0;
- //fprintf(stderr, "Writing Data %f\n", get_timestamp_millis() );
if( (written = write( cur_child->write_data_fd, data, strlen(data) + 1 )) < 0 ) {
osrfLogWarning( OSRF_LOG_MARK, "Write returned error %d", errno);
cur_child = cur_child->next;
continue;
}
- //fprintf(stderr, "Wrote %d bytes to child\n", written);
-
forker->first_child = cur_child->next;
honored = 1;
break;
- } else
+ } else
cur_child = cur_child->next;
- }
+ }
/* if none available, add a new child if we can */
if( ! honored ) {
forker->current_num_children );
prefork_child* new_child = launch_child( forker );
- if( new_child ) {
-
- message_prepare_xml( cur_msg );
- char* data = cur_msg->msg_xml;
-
- if( data ) {
- int len = strlen(data);
-
- if( len > 0 ) {
- new_child->available = 0;
- osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d",
- new_child->write_data_fd, new_child->pid );
-
- if( write( new_child->write_data_fd, data, len + 1 ) >= 0 ) {
- forker->first_child = new_child->next;
- honored = 1;
- }
- }
- }
- }
+ if( new_child ) {
+
+ message_prepare_xml( cur_msg );
+ char* data = cur_msg->msg_xml;
+
+ if( data ) {
+ int len = strlen(data);
+
+ if( len > 0 ) {
+ new_child->available = 0;
+ osrfLogDebug( OSRF_LOG_MARK, "Writing to new child fd %d : pid %d",
+ new_child->write_data_fd, new_child->pid );
+
+ if( write( new_child->write_data_fd, data, len + 1 ) >= 0 ) {
+ forker->first_child = new_child->next;
+ honored = 1;
+ }
+ }
+ }
+ }
}
}
/** XXX Add a flag which tells select() to wait forever on children
- * in the best case, this will be faster than calling usleep(x), and
- * in the worst case it won't be slower and will do less logging...
- */
-
+ in the best case, this will be faster than calling usleep(x), and
+ in the worst case it won't be slower and will do less logging...
+*/
static void check_children( prefork_simple* forker, int forever ) {
//check_begin:
cur_child = cur_child->next;
}
- FD_CLR(0,&read_set);/* just to be sure */
+ FD_CLR(0,&read_set); /* just to be sure */
if( forever ) {
- osrfLogWarning(OSRF_LOG_MARK, "We have no children available - waiting for one to show up...");
+ osrfLogWarning(OSRF_LOG_MARK,
+ "We have no children available - waiting for one to show up...");
if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, NULL)) == -1 ) {
osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children", errno );
}
- osrfLogInfo(OSRF_LOG_MARK, "select() completed after waiting on children to become available");
+ osrfLogInfo(OSRF_LOG_MARK,
+ "select() completed after waiting on children to become available");
} else {
struct timeval tv;
- tv.tv_sec = 0;
- tv.tv_usec = 0;
-
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+
if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, &tv)) == -1 ) {
osrfLogWarning( OSRF_LOG_MARK, "Select returned error %d on check_children", errno );
}
if( FD_ISSET( cur_child->read_status_fd, &read_set ) ) {
//printf( "Server received status from a child %d\n", cur_child->pid );
- osrfLogDebug( OSRF_LOG_MARK, "Server received status from a child %d", cur_child->pid );
+ osrfLogDebug( OSRF_LOG_MARK,
+ "Server received status from a child %d", cur_child->pid );
num_handled++;
char buf[64];
osrf_clearbuf( buf, sizeof(buf) );
if( (n=read(cur_child->read_status_fd, buf, sizeof(buf) - 1)) < 0 ) {
- osrfLogWarning( OSRF_LOG_MARK, "Read error after select in child status read with errno %d", errno);
+ osrfLogWarning( OSRF_LOG_MARK,
+ "Read error after select in child status read with errno %d", errno);
}
else {
buf[n] = '\0';
cur_child->available = 1;
}
cur_child = cur_child->next;
- }
+ }
}
if( errno == EAGAIN ) n = 0;
- if( errno == EPIPE ) {
- osrfLogDebug(OSRF_LOG_MARK, "C child attempted read on broken pipe, exiting...");
- break;
- }
+ if( errno == EPIPE ) {
+ osrfLogDebug(OSRF_LOG_MARK, "C child attempted read on broken pipe, exiting...");
+ break;
+ }
if( n < 0 ) {
- osrfLogWarning( OSRF_LOG_MARK, "Prefork child read returned error with errno %d", errno );
+ osrfLogWarning( OSRF_LOG_MARK,
+ "Prefork child read returned error with errno %d", errno );
break;
} else if( gotdata ) {
buffer_reset( gbuf );
}
- if( i < child->max_requests - 1 )
+ if( i < child->max_requests - 1 )
write( child->write_status_fd, "available" /*less than 64 bytes*/, 9 );
}
buffer_free(gbuf);
- osrfLogDebug( OSRF_LOG_MARK, "Child with max-requests=%d, num-served=%d exiting...[%ld]",
+ osrfLogDebug( OSRF_LOG_MARK, "Child with max-requests=%d, num-served=%d exiting...[%ld]",
child->max_requests, i, (long) getpid() );
- osrf_prefork_child_exit(child); /* just to be sure */
+ osrf_prefork_child_exit(child); /* just to be sure */
}
static void add_prefork_child( prefork_simple* forker, prefork_child* child ) {
-
+
if( forker->first_child == NULL ) {
forker->first_child = child;
child->next = child;
return;
}
- /* we put the child in as the last because, regardless,
+ /* we put the child in as the last because, regardless,
we have to do the DLL splice dance, and this is the
- simplest way */
+ simplest way */
prefork_child* start_child = forker->first_child;
while(1) {
- if( forker->first_child->next == start_child )
+ if( forker->first_child->next == start_child )
break;
forker->first_child = forker->first_child->next;
}
- /* here we know that forker->first_child is the last element
+ /* here we know that forker->first_child is the last element
in the list and start_child is the first. Insert the
new child between them*/
//static prefork_child* find_prefork_child( prefork_simple* forker, pid_t pid ) {
//
-// if( forker->first_child == NULL ) { return NULL; }
-// prefork_child* start_child = forker->first_child;
-// do {
-// if( forker->first_child->pid == pid )
-// return forker->first_child;
-// } while( (forker->first_child = forker->first_child->next) != start_child );
+// if( forker->first_child == NULL ) { return NULL; }
+// prefork_child* start_child = forker->first_child;
+// do {
+// if( forker->first_child->pid == pid )
+// return forker->first_child;
+// } while( (forker->first_child = forker->first_child->next) != start_child );
//
-// return NULL;
+// return NULL;
//}
-static void del_prefork_child( prefork_simple* forker, pid_t pid ) {
+static void del_prefork_child( prefork_simple* forker, pid_t pid ) {
if( forker->first_child == NULL ) { return; }
osrfLogDebug( OSRF_LOG_MARK, "Deleting Child: %d", pid );
prefork_child* start_child = forker->first_child; /* starting point */
- prefork_child* cur_child = start_child; /* current pointer */
- prefork_child* prev_child = start_child; /* the trailing pointer */
+ prefork_child* cur_child = start_child; /* current pointer */
+ prefork_child* prev_child = start_child; /* the trailing pointer */
/* special case where there is only one in the list */
if( start_child == start_child->next ) {
/* special case where the first item in the list needs to be removed */
- if( start_child->pid == pid ) {
+ if( start_child->pid == pid ) {
/* find the last one so we can remove the start_child */
- do {
+ do {
prev_child = cur_child;
cur_child = cur_child->next;
- }while( cur_child != start_child );
+ } while( cur_child != start_child );
/* now cur_child == start_child */
prev_child->next = cur_child->next;
prefork_child_free( cur_child );
return;
- }
+ }
do {
prev_child = cur_child;
-static prefork_child* prefork_child_init(
- int max_requests, int read_data_fd, int write_data_fd,
+static prefork_child* prefork_child_init(
+ int max_requests, int read_data_fd, int write_data_fd,
int read_status_fd, int write_status_fd ) {
prefork_child* child = (prefork_child*) safe_malloc(sizeof(prefork_child));
child->pid = 0;
- child->max_requests = max_requests;
- child->read_data_fd = read_data_fd;
- child->write_data_fd = write_data_fd;
- child->read_status_fd = read_status_fd;
- child->write_status_fd = write_status_fd;
+ child->max_requests = max_requests;
+ child->read_data_fd = read_data_fd;
+ child->write_data_fd = write_data_fd;
+ child->read_status_fd = read_status_fd;
+ child->write_status_fd = write_status_fd;
child->min_children = 0;
- child->available = 1;
+ child->available = 1;
child->appname = NULL;
child->keepalive = 0;
child->next = NULL;
static int prefork_free( prefork_simple* prefork ) {
-
+
while( prefork->first_child != NULL ) {
- osrfLogInfo( OSRF_LOG_MARK, "Killing children and sleeping 1 to reap..." );
- kill( 0, SIGKILL );
+ osrfLogInfo( OSRF_LOG_MARK, "Killing children and sleeping 1 to reap..." );
+ kill( 0, SIGKILL );
sleep(1);
}
return 1;
}
-static int prefork_child_free( prefork_child* child ) {
+static int prefork_child_free( prefork_child* child ) {
free(child->appname);
close(child->read_data_fd);
close(child->write_status_fd);
- free( child );
+ free( child );
return 1;
}
-