From 92846f005a47350cfbd67d8063edc72da3c446c8 Mon Sep 17 00:00:00 2001 From: erickson Date: Fri, 26 Aug 2005 20:03:31 +0000 Subject: [PATCH] adding utility methods adding the preforking for server apps fork code git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@510 9efc2488-bf62-4759-914b-345cdb29e865 --- src/libstack/Makefile | 12 +- src/libstack/osrf_app_session.c | 8 +- src/libstack/osrf_app_session.h | 5 +- src/libstack/osrf_prefork.c | 581 ++++++++++++++++++++++++++++++++++++++++ src/libstack/osrf_prefork.h | 89 ++++++ src/libstack/osrf_stack.c | 15 +- src/libstack/osrf_stack.h | 2 +- src/libstack/osrf_system.c | 40 ++- src/libstack/osrf_system.h | 8 +- src/utils/logging.c | 1 - src/utils/socket_bundle.c | 4 + src/utils/utils.c | 30 ++- src/utils/utils.h | 11 + 13 files changed, 767 insertions(+), 39 deletions(-) create mode 100644 src/libstack/osrf_prefork.c create mode 100644 src/libstack/osrf_prefork.h diff --git a/src/libstack/Makefile b/src/libstack/Makefile index 25b4fee..d86a348 100644 --- a/src/libstack/Makefile +++ b/src/libstack/Makefile @@ -1,11 +1,11 @@ CC_OPTS += -DASSUME_STATELESS LD_OPTS += -lxml2 -lopensrf_transport -lopensrf_stack -lobjson -lc_utils -SOURCES = osrf_message.c osrf_app_session.c osrf_stack.c osrf_system.c osrf_config.c osrf_settings.c -TARGETS = osrf_message.o osrf_app_session.o osrf_stack.o osrf_system.o osrf_config.o osrf_settings.o -HEADERS = osrf_message.h osrf_app_session.h osrf_stack.h osrf_system.h osrf_config.h osrf_settings.h +SOURCES = osrf_message.c osrf_app_session.c osrf_stack.c osrf_system.c osrf_config.c osrf_settings.c osrf_prefork.c +TARGETS = osrf_message.o osrf_app_session.o osrf_stack.o osrf_system.o osrf_config.o osrf_settings.o osrf_prefork.o +HEADERS = osrf_message.h osrf_app_session.h osrf_stack.h osrf_system.h osrf_config.h osrf_settings.h osrf_prefork.h -all: msg libopensrf_stack.so +all: msg libopensrf_stack.so test msg: echo "-> $$(pwd)" @@ -51,6 +51,10 @@ osrf_config.o: osrf_config.c osrf_config.h osrf_settings.o: osrf_settings.c osrf_settings.h echo $@; $(CC) -c $(CC_OPTS) osrf_settings.c -o $@ +osrf_prefork.o: osrf_prefork.c osrf_prefork.h + echo $@; $(CC) -c $(CC_OPTS) osrf_prefork.c -o $@ + + install: echo installing libopensrf_stack.so cp $(HEADERS) $(INCLUDEDIR)/$(OPENSRF) diff --git a/src/libstack/osrf_app_session.c b/src/libstack/osrf_app_session.c index 3648e40..0421878 100644 --- a/src/libstack/osrf_app_session.c +++ b/src/libstack/osrf_app_session.c @@ -247,8 +247,8 @@ osrf_app_session* osrf_app_client_session_init( char* remote_service ) { char target_buf[512]; memset(target_buf,0,512); - char* domain = config_value( "opensrf.bootstrap", "//%s/domains/domain1", osrf_config_context ); /* just the first for now */ - char* router_name = config_value( "opensrf.bootstrap", "//%s/router_name", osrf_config_context ); + char* domain = config_value( "opensrf.bootstrap", "//%s/domains/domain1", osrf_get_config_context() ); /* just the first for now */ + char* router_name = config_value( "opensrf.bootstrap", "//%s/router_name", osrf_get_config_context() ); sprintf( target_buf, "%s@%s/%s", router_name, domain, remote_service ); free(domain); free(router_name); @@ -284,7 +284,7 @@ osrf_app_session* osrf_app_client_session_init( char* remote_service ) { } osrf_app_session* osrf_app_server_session_init( - char* session_id, char* our_app, char* remote_service, char* remote_id ) { + char* session_id, char* our_app, char* remote_id ) { osrf_app_session* session = osrf_app_session_find_session( session_id ); if(session) @@ -302,7 +302,7 @@ osrf_app_session* osrf_app_server_session_init( session->remote_id = strdup(remote_id); session->orig_remote_id = strdup(remote_id); session->session_id = strdup(session_id); - session->remote_service = strdup(remote_service); + session->remote_service = strdup(our_app); #ifdef ASSUME_STATELESS session->stateless = 1; diff --git a/src/libstack/osrf_app_session.h b/src/libstack/osrf_app_session.h index 01b6dc5..bdb1644 100644 --- a/src/libstack/osrf_app_session.h +++ b/src/libstack/osrf_app_session.h @@ -54,7 +54,8 @@ struct osrf_app_session_struct { /** The current remote id of the remote service we're talking to */ char* remote_id; - /** Who we're talking to */ + /** Who we're talking to if we're a client. + what app we're serving if we're a server */ char* remote_service; /** The current request thread_trace */ @@ -90,7 +91,7 @@ osrf_app_session* osrf_app_client_session_init( char* remote_service ); * is checked to see if this session already exists, if so, it's returned */ osrf_app_session* osrf_app_server_session_init( - char* session_id, char* our_app, char* remote_service, char* remote_id ); + char* session_id, char* our_app, char* remote_id ); /** returns a session from the global session hash */ osrf_app_session* osrf_app_session_find_session( char* session_id ); diff --git a/src/libstack/osrf_prefork.c b/src/libstack/osrf_prefork.c new file mode 100644 index 0000000..1d829ad --- /dev/null +++ b/src/libstack/osrf_prefork.c @@ -0,0 +1,581 @@ +#include "osrf_prefork.h" +#include + +/* true if we just deleted a child. This will allow us to make sure we're + not trying to use freed memory */ +int child_dead; + +int main(); +void sigchld_handler( int sig ); + +int osrf_prefork_run(char* appname) { + + if(!appname) fatal_handler("osrf_prefork_run requires an appname to run!"); + + int maxr = 1000; + int maxc = 10; + int minc = 3; + + info_handler("Loading config in osrf_forker for app %s", appname); + + object* max_req = osrf_settings_host_value_object("/apps/%s/unix_config/max_requests", appname); + object* min_children = osrf_settings_host_value_object("/apps/%s/unix_config/min_children", appname); + object* max_children = osrf_settings_host_value_object("/apps/%s/unix_config/max_children", appname); + + if(!max_req) warning_handler("Max requests not defined, assuming 1000"); + else maxr = max_req->num_value; + + if(!min_children) warning_handler("Min children not defined, assuming 3"); + else minc = min_children->num_value; + + if(!max_children) warning_handler("Max children not defined, assuming 10"); + else maxc = max_children->num_value; + + free_object(max_req); + free_object(min_children); + free_object(max_children); + /* --------------------------------------------------- */ + + char* resc = va_list_to_string("%s_listener", appname); + + if(!osrf_system_bootstrap_client_resc( + osrf_get_bootstrap_config(), osrf_get_config_context(), resc)) + fatal_handler("Unable to bootstrap client for osrf_prefork_run()"); + free(resc); + + prefork_simple* forker = prefork_simple_init( + osrf_system_get_transport_client(), maxr, minc, maxc); + + forker->appname = strdup(appname); + + if(forker == NULL) + fatal_handler("osrf_prefork_run() failed to create prefork_simple object"); + + prefork_launch_children(forker); + + info_handler("Launching osrf_forker for app %s", appname); + prefork_run(forker); + + warning_handler("prefork_run() retuned - how??"); + prefork_free(forker); + return 0; + +} + +void osrf_prefork_register_routers() { + //char* router = osrf_config_value("//%s/ +} + +void prefork_child_init_hook(prefork_child* child) { + + if(!child) return; + info_handler("Child init hook for child %d", child->pid); + char* resc = va_list_to_string("%s_drone",child->appname); + if(!osrf_system_bootstrap_client_resc( + osrf_get_bootstrap_config(), osrf_get_config_context(), resc)) + fatal_handler("Unable to bootstrap client for osrf_prefork_run()"); + free(resc); +} + +void prefork_child_process_request(prefork_child* child, char* data) { + if(!child && child->connection) return; + + /* construct the message from the xml */ + debug_handler("Child %d received data from parent:\n%s\n", child->pid, data); + transport_message* msg = new_message_from_xml( data ); + + osrf_stack_transport_handler(msg, child->appname); + + /* + transport_message* ret_msg = message_init( + msg->body, msg->subject, msg->thread, msg->sender, NULL ); + + client_send_message(child->connection, ret_msg); + message_free( ret_msg ); + + printf("Message body size %d\n", strlen(msg->body)); + + printf( "Message Info\n" ); + printf( "%s\n", msg->sender ); + printf( "%s\n", msg->recipient ); + printf( "%s\n", msg->thread ); + printf( "%s\n", msg->body ); + printf( "%s\n", msg->subject ); + printf( "%s\n", msg->router_from ); + printf( "%d\n", msg->broadcast ); + + message_free( msg ); + */ +} + + +prefork_simple* prefork_simple_init( transport_client* client, + int max_requests, int min_children, int max_children ) { + + if( min_children > max_children ) + fatal_handler( "min_children (%d) is greater " + "than max_children (%d)", min_children, max_children ); + + if( max_children > ABS_MAX_CHILDREN ) + fatal_handler( "max_children (%d) is greater than ABS_MAX_CHILDREN (%d)", + max_children, ABS_MAX_CHILDREN ); + + /* flesh out the struct */ + prefork_simple* prefork = (prefork_simple*) safe_malloc(sizeof(prefork_simple)); + prefork->max_requests = max_requests; + prefork->min_children = min_children; + prefork->max_children = max_children; + prefork->first_child = NULL; + prefork->connection = client; + + return prefork; +} + +prefork_child* launch_child( prefork_simple* forker ) { + + pid_t pid; + int data_fd[2]; + int status_fd[2]; + + /* Set up the data pipes and add the child struct to the parent */ + if( pipe(data_fd) < 0 ) /* build the data pipe*/ + fatal_handler( "Pipe making error" ); + + if( pipe(status_fd) < 0 ) /* build the status pipe */ + fatal_handler( "Pipe making error" ); + + debug_handler( "Pipes: %d %d %d %d", data_fd[0], data_fd[1], status_fd[0], status_fd[1] ); + prefork_child* child = prefork_child_init( forker->max_requests, data_fd[0], + data_fd[1], status_fd[0], status_fd[1] ); + + child->appname = strdup(forker->appname); + + + add_prefork_child( forker, child ); + + if( (pid=fork()) < 0 ) fatal_handler( "Forking Error" ); + + if( pid > 0 ) { /* parent */ + + signal(SIGCHLD, sigchld_handler); + (forker->current_num_children)++; + child->pid = pid; + + info_handler( "Parent launched %d", pid ); + /* *no* child pipe FD's can be closed or the parent will re-use fd's that + the children are currently using */ + return child; + } + + else { /* child */ + + debug_handler("I am new child with read_data_fd = %d and write_status_fd = %d", + child->read_data_fd, child->write_status_fd ); + + child->pid = getpid(); + close( child->write_data_fd ); + close( child->read_status_fd ); + + /* do the initing */ + prefork_child_init_hook(child); + + prefork_child_wait( child ); + exit(0); /* just to be sure */ + } + return NULL; +} + + +void prefork_launch_children( prefork_simple* forker ) { + if(!forker) return; + int c = 0; + while( c++ < forker->min_children ) + launch_child( forker ); +} + + +void sigchld_handler( int sig ) { + signal(SIGCHLD, sigchld_handler); + child_dead = 1; +} + + +void reap_children( prefork_simple* forker ) { + + pid_t child_pid; + int status; + + while( (child_pid=waitpid(-1,&status,WNOHANG)) > 0) + del_prefork_child( forker, child_pid ); + + /* replenish */ + while( forker->current_num_children < forker->min_children ) + launch_child( forker ); + + child_dead = 0; +} + +void prefork_run(prefork_simple* forker) { + + if( forker->first_child == NULL ) + return; + + transport_message* cur_msg = NULL; + + while(1) { + + if( forker->first_child == NULL ) {/* no more children */ + warning_handler("No more children..." ); + return; + } + + debug_handler("Forker going into wait for data..."); + sleep(2); + cur_msg = client_recv( forker->connection, -1 ); + + fprintf(stderr, "Got Data %f\n", get_timestamp_millis() ); + + if( cur_msg == NULL ) continue; + + int honored = 0; /* true if we've serviced the request */ + + while( ! honored ) { + + check_children( forker ); + + debug_handler( "Server received inbound data" ); + int k; + prefork_child* cur_child = forker->first_child; + + /* Look for an available child */ + for( k = 0; k < forker->current_num_children; k++ ) { + + debug_handler("Searching for available child. cur_child->pid = %d", cur_child->pid ); + debug_handler("Current num children %d and loop %d", forker->current_num_children, k); + + if( cur_child->available ) { + debug_handler( "sending data to %d", cur_child->pid ); + message_prepare_xml( cur_msg ); + char* data = cur_msg->msg_xml; + if( ! data || strlen(data) < 1 ) break; + cur_child->available = 0; + debug_handler( "Writing to child fd %d", cur_child->write_data_fd ); + + int written = 0; + fprintf(stderr, "Writing Data %f\n", get_timestamp_millis() ); + if( (written = write( cur_child->write_data_fd, data, strlen(data) + 1 )) < 0 ) { + warning_handler("Write returned error %d", errno); + cur_child = cur_child->next; + continue; + } + + fprintf(stderr, "Wrote %d bytes to child\n", written); + + forker->first_child = cur_child->next; + honored = 1; + break; + } else + cur_child = cur_child->next; + } + + /* if none available, add a new child if we can */ + if( ! honored ) { + debug_handler("Not enough children, attempting to add..."); + if( forker->current_num_children < forker->max_children ) { + debug_handler( "Launching new child with current_num = %d", + forker->current_num_children ); + + prefork_child* new_child = launch_child( forker ); + message_prepare_xml( cur_msg ); + char* data = cur_msg->msg_xml; + if( ! data || strlen(data) < 1 ) break; + new_child->available = 0; + debug_handler( "sending data to %d", new_child->pid ); + debug_handler( "Writing to new child fd %d", new_child->write_data_fd ); + write( new_child->write_data_fd, data, strlen(data) + 1 ); + forker->first_child = new_child->next; + honored = 1; + } + } + + if( !honored ) { + warning_handler( "No children available, sleeping and looping..." ); + usleep( 50000 ); /* 50 milliseconds */ + } + + if( child_dead ) + reap_children(forker); + + + fprintf(stderr, "Parent done with request %f\n", get_timestamp_millis() ); + + } // honored? + + } /* top level listen loop */ + +} + + +void check_children( prefork_simple* forker ) { + + //check_begin: + + int select_ret; + fd_set read_set; + FD_ZERO(&read_set); + int max_fd = 0; + int n; + + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 0; + + if( child_dead ) + reap_children(forker); + + prefork_child* cur_child = forker->first_child; + + int i; + for( i = 0; i!= forker->current_num_children; i++ ) { + + if( cur_child->read_status_fd > max_fd ) + max_fd = cur_child->read_status_fd; + FD_SET( cur_child->read_status_fd, &read_set ); + cur_child = cur_child->next; + } + + FD_CLR(0,&read_set);/* just to be sure */ + + if( (select_ret=select( max_fd + 1 , &read_set, NULL, NULL, &tv)) == -1 ) { + warning_handler( "Select returned error %d on check_children", errno ); + } + + if( select_ret == 0 ) + return; + + /* see if one of a child has told us it's done */ + cur_child = forker->first_child; + int j; + int num_handled = 0; + for( j = 0; j!= forker->current_num_children && num_handled < select_ret ; j++ ) { + + if( FD_ISSET( cur_child->read_status_fd, &read_set ) ) { + printf( "Server received status from a child %d\n", cur_child->pid ); + debug_handler( "Server received status from a child %d", cur_child->pid ); + + num_handled++; + + /* now suck off the data */ + char buf[64]; + memset( buf, 0, 64); + if( (n=read(cur_child->read_status_fd, buf, 63)) < 0 ) { + warning_handler("Read error afer select in child status read with errno %d", errno); + } + + debug_handler( "Read %d bytes from status buffer: %s", n, buf ); + cur_child->available = 1; + } + cur_child = cur_child->next; + } + +} + + +void prefork_child_wait( prefork_child* child ) { + + int i,n; + growing_buffer* gbuf = buffer_init( READ_BUFSIZE ); + char buf[READ_BUFSIZE]; + memset( buf, 0, READ_BUFSIZE ); + + for( i = 0; i!= child->max_requests; i++ ) { + + n = -1; + clr_fl(child->read_data_fd, O_NONBLOCK ); + while( (n=read(child->read_data_fd, buf, READ_BUFSIZE-1)) > 0 ) { + buffer_add( gbuf, buf ); + memset( buf, 0, READ_BUFSIZE ); + + fprintf(stderr, "Child read %d bytes\n", n); + + if( n == READ_BUFSIZE ) { + fprintf(stderr, "We read READ_BUFSIZE data....\n"); + /* XXX */ + /* either we have exactly READ_BUFSIZE data, + or there's more waiting that we need to grab*/ + /* must set to non-block for reading more */ + } else { + fprintf(stderr, "Read Data %f\n", get_timestamp_millis() ); + prefork_child_process_request(child, gbuf->buf); + buffer_reset( gbuf ); + break; + } + } + + if( n < 0 ) { + warning_handler( "Child read returned error with errno %d", errno ); + break; + } + + if( i < child->max_requests - 1 ) + write( child->write_status_fd, "available" /*less than 64 bytes*/, 9 ); + } + + buffer_free(gbuf); + + debug_handler("Child exiting...[%d]", getpid() ); + + exit(0); +} + + +void add_prefork_child( prefork_simple* forker, prefork_child* child ) { + + if( forker->first_child == NULL ) { + forker->first_child = child; + child->next = child; + return; + } + + /* we put the child in as the last because, regardless, + we have to do the DLL splice dance, and this is the + simplest way */ + + prefork_child* start_child = forker->first_child; + while(1) { + if( forker->first_child->next == start_child ) + break; + forker->first_child = forker->first_child->next; + } + + /* here we know that forker->first_child is the last element + in the list and start_child is the first. Insert the + new child between them*/ + + forker->first_child->next = child; + child->next = start_child; + return; +} + +prefork_child* find_prefork_child( prefork_simple* forker, pid_t pid ) { + + if( forker->first_child == NULL ) { return NULL; } + prefork_child* start_child = forker->first_child; + do { + if( forker->first_child->pid == pid ) + return forker->first_child; + } while( (forker->first_child = forker->first_child->next) != start_child ); + + return NULL; +} + + +void del_prefork_child( prefork_simple* forker, pid_t pid ) { + + if( forker->first_child == NULL ) { return; } + + (forker->current_num_children)--; + debug_handler("Deleting Child: %d", pid ); + + prefork_child* start_child = forker->first_child; /* starting point */ + prefork_child* cur_child = start_child; /* current pointer */ + prefork_child* prev_child = start_child; /* the trailing pointer */ + + /* special case where there is only one in the list */ + if( start_child == start_child->next ) { + if( start_child->pid == pid ) { + forker->first_child = NULL; + + close( start_child->read_data_fd ); + close( start_child->write_data_fd ); + close( start_child->read_status_fd ); + close( start_child->write_status_fd ); + + prefork_child_free( start_child ); + } + return; + } + + + /* special case where the first item in the list needs to be removed */ + if( start_child->pid == pid ) { + + /* find the last one so we can remove the start_child */ + do { + prev_child = cur_child; + cur_child = cur_child->next; + }while( cur_child != start_child ); + + /* now cur_child == start_child */ + prev_child->next = cur_child->next; + forker->first_child = prev_child; + + close( cur_child->read_data_fd ); + close( cur_child->write_data_fd ); + close( cur_child->read_status_fd ); + close( cur_child->write_status_fd ); + + prefork_child_free( cur_child ); + return; + } + + do { + prev_child = cur_child; + cur_child = cur_child->next; + + if( cur_child->pid == pid ) { + prev_child->next = cur_child->next; + + close( cur_child->read_data_fd ); + close( cur_child->write_data_fd ); + close( cur_child->read_status_fd ); + close( cur_child->write_status_fd ); + + prefork_child_free( cur_child ); + return; + } + + } while(cur_child != start_child); +} + + + + +prefork_child* prefork_child_init( + int max_requests, int read_data_fd, int write_data_fd, + int read_status_fd, int write_status_fd ) { + + prefork_child* child = (prefork_child*) safe_malloc(sizeof(prefork_child)); + child->max_requests = max_requests; + child->read_data_fd = read_data_fd; + child->write_data_fd = write_data_fd; + child->read_status_fd = read_status_fd; + child->write_status_fd = write_status_fd; + child->available = 1; + + return child; +} + + +int prefork_free( prefork_simple* prefork ) { + + while( prefork->first_child != NULL ) { + info_handler( "Killing children and sleeping 1 to reap..." ); + kill( 0, SIGKILL ); + sleep(1); + } + + client_free(prefork->connection); + free(prefork->appname); + free( prefork ); + return 1; +} + +int prefork_child_free( prefork_child* child ) { + free(child->appname); + close(child->read_data_fd); + close(child->write_status_fd); + free( child ); + return 1; +} + diff --git a/src/libstack/osrf_prefork.h b/src/libstack/osrf_prefork.h new file mode 100644 index 0000000..8c1725f --- /dev/null +++ b/src/libstack/osrf_prefork.h @@ -0,0 +1,89 @@ +#include +#include +#include +#include +#include +#include +#include +#include + +#include "utils.h" +#include "opensrf/transport_message.h" +#include "osrf_stack.h" +#include "osrf_settings.h" + +#define READ_BUFSIZE 4096 +#define MAX_BUFSIZE 10485760 /* 10M enough? ;) */ +#define ABS_MAX_CHILDREN 256 + +/* we receive data. we find the next child in + line that is available. pass the data down that childs pipe and go + back to listening for more data. + when we receive SIGCHLD, we check for any dead children and clean up + their respective prefork_child objects, close pipes, etc. + + we build a select fd_set with all the child pipes (going to the parent) + when a child is done processing a request, it writes a small chunk of + data to the parent to alert the parent that the child is again available + */ + +struct prefork_simple_struct { + int max_requests; + int min_children; + int max_children; + int fd; + int data_to_child; + int data_to_parent; + int current_num_children; + char* appname; + struct prefork_child_struct* first_child; + transport_client* connection; +}; +typedef struct prefork_simple_struct prefork_simple; + +struct prefork_child_struct { + pid_t pid; + int read_data_fd; + int write_data_fd; + int read_status_fd; + int write_status_fd; + int min_children; + int available; + int max_requests; + char* appname; + struct prefork_child_struct* next; + transport_client* connection; +}; + +typedef struct prefork_child_struct prefork_child; + +int osrf_prefork_run(char* appname); + +prefork_simple* prefork_simple_init( transport_client* client, + int max_requests, int min_children, int max_children ); + +prefork_child* launch_child( prefork_simple* forker ); +void prefork_launch_children( prefork_simple* forker ); + +void prefork_run(prefork_simple* forker); + +void add_prefork_child( prefork_simple* forker, prefork_child* child ); +prefork_child* find_prefork_child( prefork_simple* forker, pid_t pid ); +void del_prefork_child( prefork_simple* forker, pid_t pid ); + +void check_children( prefork_simple* forker ); + +void prefork_child_process_request(prefork_child*, char* data); +void prefork_child_init_hook(prefork_child*); + +prefork_child* prefork_child_init( + int max_requests, int read_data_fd, int write_data_fd, + int read_status_fd, int write_status_fd ); + +/* listens on the 'data_to_child' fd and wait for incoming data */ +void prefork_child_wait( prefork_child* child ); + +int prefork_free( prefork_simple* ); +int prefork_child_free( prefork_child* ); + + diff --git a/src/libstack/osrf_stack.c b/src/libstack/osrf_stack.c index c11cfc6..23cbf4c 100644 --- a/src/libstack/osrf_stack.c +++ b/src/libstack/osrf_stack.c @@ -10,13 +10,13 @@ int osrf_stack_process( transport_client* client, int timeout ) { transport_message* msg = client_recv( client, timeout ); if(msg == NULL) return 0; debug_handler( "Received message from transport code from %s", msg->sender ); - int status = osrf_stack_transport_handler( msg ); + int status = osrf_stack_transport_handler( msg, NULL ); while(1) { transport_message* m = client_recv( client, 0 ); if(m) { debug_handler( "Received additional message from transport code"); - status = osrf_stack_transport_handler( m ); + status = osrf_stack_transport_handler( m, NULL ); } else { debug_handler( "osrf_stack_process returning with only 1 received message" ); break; @@ -31,7 +31,7 @@ int osrf_stack_process( transport_client* client, int timeout ) { // ----------------------------------------------------------------------------- // Entry point into the stack // ----------------------------------------------------------------------------- -int osrf_stack_transport_handler( transport_message* msg ) { +int osrf_stack_transport_handler( transport_message* msg, char* my_service ) { debug_handler( "Transport handler received new message \nfrom %s " "to %s with body \n\n%s\n", msg->sender, msg->recipient, msg->body ); @@ -40,12 +40,12 @@ int osrf_stack_transport_handler( transport_message* msg ) { if( session == NULL ) { /* we must be a server, build a new session */ info_handler( "Received message for nonexistant session. Dropping..." ); - //osrf_app_server_session_init( msg->thread, - message_free( msg ); - return 1; + osrf_app_server_session_init( msg->thread, my_service, msg->sender); + //message_free( msg ); + //return 1; } - debug_handler("Session [%s] found, building message", msg->thread ); + //debug_handler("Session [%s] found, building message", msg->thread ); osrf_app_session_set_remote( session, msg->sender ); osrf_message* arr[OSRF_MAX_MSGS_PER_PACKET]; @@ -181,7 +181,6 @@ osrf_message* _do_server( osrf_app_session* session, osrf_message* msg ) { if(session == NULL || msg == NULL) return NULL; - if( msg->m_type == STATUS ) { return NULL; } warning_handler( "We dont' do servers yet !!" ); diff --git a/src/libstack/osrf_stack.h b/src/libstack/osrf_stack.h index aaac111..fc35abb 100644 --- a/src/libstack/osrf_stack.h +++ b/src/libstack/osrf_stack.h @@ -11,7 +11,7 @@ // ----------------------------------------------------------------------------- int osrf_stack_process( transport_client* client, int timeout ); -int osrf_stack_transport_handler( transport_message* msg ); +int osrf_stack_transport_handler( transport_message* msg, char* my_service ); int osrf_stack_message_handler( osrf_app_session* session, osrf_message* msg ); int osrf_stack_application_handler( osrf_app_session* session, osrf_message* msg ); diff --git a/src/libstack/osrf_system.c b/src/libstack/osrf_system.c index 62bf673..39cf7cf 100644 --- a/src/libstack/osrf_system.c +++ b/src/libstack/osrf_system.c @@ -2,20 +2,35 @@ transport_client* global_client; char* system_config = NULL; +char* config_context = NULL; +char* bootstrap_config = NULL; transport_client* osrf_system_get_transport_client() { return global_client; } + +char* osrf_get_config_context() { + return config_context; +} + +char* osrf_get_bootstrap_config() { + return bootstrap_config; +} + int osrf_system_bootstrap_client( char* config_file, char* contextnode ) { + return osrf_system_bootstrap_client_resc(config_file, contextnode, NULL); +} + +int osrf_system_bootstrap_client_resc( char* config_file, char* contextnode, char* resource ) { if( !config_file || !contextnode ) fatal_handler("No Config File Specified\n" ); - free(system_config); - free(osrf_config_context); - system_config = strdup(config_file); - osrf_config_context = strdup(contextnode); + config_context = strdup(contextnode); + bootstrap_config = strdup(config_file); + + debug_handler("Bootstrapping client with config %s and context node %s", config_file, contextnode); config_reader_init( "opensrf.bootstrap", config_file ); @@ -37,13 +52,18 @@ int osrf_system_bootstrap_client( char* config_file, char* contextnode ) { info_handler("Bootstrapping system with domain %s, port %d, and unixpath %s", domain, iport, unixpath ); transport_client* client = client_init( domain, iport, unixpath, 0 ); - - char buf[256]; - memset(buf,0,256); + char* host = getenv("HOSTNAME"); - sprintf(buf, "client_%s_%d", host, getpid() ); + if(!resource) resource = ""; + int len = strlen(resource) + 256; + char buf[len]; + memset(buf,0,len); + snprintf(buf, len - 1, "opensrf_%s_%s_%d", resource, host, getpid() ); + if(client_connect( client, username, password, buf, 10, AUTH_DIGEST )) { + /* child nodes will leak the parents client... but we can't free + it without disconnecting the parents client :( */ global_client = client; } @@ -71,8 +91,8 @@ int osrf_system_disconnect_client() { int osrf_system_shutdown() { config_reader_free(); osrf_system_disconnect_client(); - free(system_config); - free(osrf_config_context); + //free(system_config); + //free(config_context); osrf_settings_free_host_config(NULL); log_free(); return 1; diff --git a/src/libstack/osrf_system.h b/src/libstack/osrf_system.h index cf50989..66e155d 100644 --- a/src/libstack/osrf_system.h +++ b/src/libstack/osrf_system.h @@ -11,14 +11,20 @@ contextnode is the location in the config file where we collect config info */ -char* osrf_config_context; int osrf_system_bootstrap_client( char* config_file, char* contextnode ); + +/* bootstraps a client adding the given resource string to the host/pid, etc. resource string */ +int osrf_system_bootstrap_client_resc( char* config_file, char* contextnode, char* resource ); + transport_client* osrf_system_get_transport_client(); /* disconnects and destroys the current client connection */ int osrf_system_disconnect_client(); int osrf_system_shutdown(); +char* osrf_get_config_context(); + +char* osrf_get_bootstrap_config(); #endif diff --git a/src/utils/logging.c b/src/utils/logging.c index 53105b0..6f60798 100644 --- a/src/utils/logging.c +++ b/src/utils/logging.c @@ -1,7 +1,6 @@ #include #include "logging.h" - void get_timestamp( char buf_36chars[]) { struct timeb tb; diff --git a/src/utils/socket_bundle.c b/src/utils/socket_bundle.c index 1062ace..dc4a8e2 100644 --- a/src/utils/socket_bundle.c +++ b/src/utils/socket_bundle.c @@ -361,12 +361,14 @@ int socket_wait(socket_manager* mgr, int timeout, int sock_fd) { // If timeout is -1, we block indefinitely if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, NULL)) == -1 ) { + warning_handler("Sys Error: %s", strerror(errno)); return warning_handler("Call to select interrupted"); } } else if( timeout > 0 ) { /* timeout of 0 means don't block */ if( (retval = select( sock_fd + 1, &read_set, NULL, NULL, &tv)) == -1 ) { + warning_handler("Sys Error: %s", strerror(errno)); return warning_handler( "Call to select interrupted" ); } } @@ -402,12 +404,14 @@ int socket_wait_all(socket_manager* mgr, int timeout) { // If timeout is -1, there is no timeout passed to the call to select if( (retval = select( max_fd, &read_set, NULL, NULL, NULL)) == -1 ) { + warning_handler("Sys Error: %s", strerror(errno)); return warning_handler("Call to select interrupted"); } } else if( timeout != 0 ) { /* timeout of 0 means don't block */ if( (retval = select( max_fd, &read_set, NULL, NULL, &tv)) == -1 ) { + warning_handler("Sys Error: %s", strerror(errno)); return warning_handler( "Call to select interrupted" ); } } diff --git a/src/utils/utils.c b/src/utils/utils.c index 9c0a3c2..c1019da 100644 --- a/src/utils/utils.c +++ b/src/utils/utils.c @@ -14,14 +14,6 @@ MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General Public License for more details. */ -#include - -#include -#include -#include -#include - -//#include #include "utils.h" inline void* safe_malloc( int size ) { @@ -34,6 +26,8 @@ inline void* safe_malloc( int size ) { return ptr; } + + /* utility method for profiling */ double get_timestamp_millis() { //struct timeb t; @@ -92,6 +86,26 @@ long va_list_size(const char* format, va_list args) { } +char* va_list_to_string(const char* format, ...) { + + long len = 0; + va_list args; + va_list a_copy; + + va_copy(a_copy, args); + + va_start(args, format); + len = va_list_size(format, args); + + char buf[len]; + memset(buf, 0, len); + + va_start(a_copy, format); + vsnprintf(buf, len - 1, format, a_copy); + va_end(a_copy); + return strdup(buf); +} + // --------------------------------------------------------------------------------- // Flesh out a ubiqitous growing string buffer // --------------------------------------------------------------------------------- diff --git a/src/utils/utils.h b/src/utils/utils.h index af09795..1dc004d 100644 --- a/src/utils/utils.h +++ b/src/utils/utils.h @@ -17,12 +17,18 @@ GNU General Public License for more details. #ifndef UTILS_H #define UTILS_H +#include #include #include #include #include #include #include +#include +#include +#include +//#include + #define BUFFER_MAX_SIZE 10485760 @@ -57,6 +63,10 @@ int buffer_add_char(growing_buffer* gb, char c); */ long va_list_size(const char* format, va_list); +/* turns a va list into a string, caller must free the + allocated char */ +char* va_list_to_string(const char* format, ...); + /* string escape utility method. escapes unicode embeded characters. escapes the usual \n, \t, etc. @@ -92,4 +102,5 @@ char* file_to_string(char* filename); + #endif -- 2.11.0