From: erickson Date: Tue, 6 Sep 2005 01:04:09 +0000 (+0000) Subject: added CONNECT handling X-Git-Tag: osrf_rel_2_0_1~1351 X-Git-Url: https://old-git.evergreen-ils.org/?a=commitdiff_plain;h=3669038112c22a663a299a1849916b298fd4b390;p=OpenSRF.git added CONNECT handling added respond complete (general message batch handling) changed Dispatcher to Context cuz i thought it made more sense added gnarly function to change the process name for C progs laid groundwork for statefull keepalive loop git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@522 9efc2488-bf62-4759-914b-345cdb29e865 --- diff --git a/src/c-apps/osrf_dbmath.c b/src/c-apps/osrf_dbmath.c index 02b0ecc..a792245 100644 --- a/src/c-apps/osrf_dbmath.c +++ b/src/c-apps/osrf_dbmath.c @@ -4,7 +4,7 @@ int initialize(); int childInit(); -int osrfMathRun( osrfMethodDispatcher* ); +int osrfMathRun( osrfMethodContext* ); int initialize() { @@ -19,9 +19,9 @@ int childInit() { return 0; } -int osrfMathRun( osrfMethodDispatcher* d ) { +int osrfMathRun( osrfMethodContext* d ) { - OSRF_METHOD_VERIFY_DISPATCHER(d); + OSRF_METHOD_VERIFY_CONTEXT(d); jsonObject* x = jsonObjectGetIndex(params, 0); jsonObject* y = jsonObjectGetIndex(params, 1); @@ -43,7 +43,7 @@ int osrfMathRun( osrfMethodDispatcher* d ) { if(!strcmp(method->name, "div")) r = i / j; jsonObject* resp = jsonNewNumberObject(r); - osrfAppRequestRespond( session, request, resp ); + osrfAppRequestRespondComplete( session, request, resp ); jsonObjectFree(resp); free(a); free(b); diff --git a/src/c-apps/osrf_math.c b/src/c-apps/osrf_math.c index d695e0f..c727c25 100644 --- a/src/c-apps/osrf_math.c +++ b/src/c-apps/osrf_math.c @@ -4,7 +4,7 @@ int initialize(); int childInit(); -int osrfMathRun( osrfMethodDispatcher* ); +int osrfMathRun( osrfMethodContext* ); int initialize() { @@ -21,9 +21,9 @@ int childInit() { return 0; } -int osrfMathRun( osrfMethodDispatcher* d ) { +int osrfMathRun( osrfMethodContext* c ) { - OSRF_METHOD_VERIFY_DISPATCHER(d); /* see osrf_application.h */ + OSRF_METHOD_VERIFY_CONTEXT(c); /* see osrf_application.h */ /* collect the request params */ jsonObject* x = jsonObjectGetIndex(params, 0); @@ -52,7 +52,7 @@ int osrfMathRun( osrfMethodDispatcher* d ) { if(omsg) { /* return dbmath's response to the user */ - osrfAppRequestRespond( session, request, osrfMessageGetResult(omsg) ); + osrfAppRequestRespondComplete( session, request, osrfMessageGetResult(omsg) ); osrfMessageFree(omsg); return 0; } diff --git a/src/libstack/opensrf.c b/src/libstack/opensrf.c index 6e77ace..1d4c656 100644 --- a/src/libstack/opensrf.c +++ b/src/libstack/opensrf.c @@ -1,4 +1,5 @@ #include "osrf_system.h" +#include "opensrf/utils.h" int main( int argc, char* argv[] ) { @@ -10,7 +11,18 @@ int main( int argc, char* argv[] ) { fprintf(stderr, "Loading OpenSRF host %s with bootstrap config %s " "and config context %s\n", argv[1], argv[2], argv[3] ); - osrfSystemBootstrap( argv[1], argv[2], argv[3] ); + char* host = strdup( argv[1] ); + char* config = strdup( argv[2] ); + char* context = strdup( argv[3] ); + + init_proc_title( argc, argv ); + set_proc_title( "opensrf system" ); + + osrfSystemBootstrap( host, config, context ); + + free(host); + free(config); + free(context); return 0; } diff --git a/src/libstack/osrf_app_session.c b/src/libstack/osrf_app_session.c index 35036ce..5c0c725 100644 --- a/src/libstack/osrf_app_session.c +++ b/src/libstack/osrf_app_session.c @@ -604,45 +604,67 @@ int osrf_app_session_request_resend( osrf_app_session* session, int req_id ) { return _osrf_app_request_resend( req ); } -/** Send the given message */ -int _osrf_app_session_send( osrf_app_session* session, osrf_message* msg ){ - if(session == NULL) return 0; - int ret_val= 0; - osrf_app_session_queue_wait( session, 0 ); - debug_handler( "AppSession sending type %d, and thread_trace %d", - msg->m_type, msg->thread_trace ); +int osrfAppSessionSendBatch( osrfAppSession* session, osrf_message* msgs[], int size ) { - if(session->stateless) { - osrf_app_session_reset_remote(session); + if( !(session && msgs && size > 0) ) return 0; + int retval = 0; - } else { - if( (msg->m_type != CONNECT) && (msg->m_type != DISCONNECT) && + osrfMessage* msg = msgs[0]; + + if(msg) { + + osrf_app_session_queue_wait( session, 0 ); + + /* if we're not stateless and the first message is not a connect + message, then we do the connect first */ + if(session->stateless) { + osrf_app_session_reset_remote(session); + + } else { + + if( (msg->m_type != CONNECT) && (msg->m_type != DISCONNECT) && (session->state != OSRF_SESSION_CONNECTED) ) { - if(!osrf_app_session_connect( session )) - return 0; + if(!osrf_app_session_connect( session )) + return 0; + } } } - //char* xml = osrf_message_to_xml(msg); - char* string = osrf_message_serialize(msg); + char* string = osrfMessageSerializeBatch(msgs, size); - debug_handler("[%s] [%s] Remote Id: %s", - session->remote_service, session->session_id, session->remote_id ); + if( string ) { - transport_message* t_msg = message_init( - string, "", session->session_id, session->remote_id, NULL ); + transport_message* t_msg = message_init( + string, "", session->session_id, session->remote_id, NULL ); + + debug_handler("Session [%s] [%s] sending to %s \nXML:\n%s", + session->remote_service, session->session_id, t_msg->recipient, string ); + + retval = client_send_message( session->transport_handle, t_msg ); + + free(string); + message_free( t_msg ); + } + + return retval; - debug_handler("Session [%s] [%s] sending to %s \nXML:\n%s", - session->remote_service, session->session_id, t_msg->recipient, string ); - ret_val = client_send_message( session->transport_handle, t_msg ); - free(string); - message_free( t_msg ); - return ret_val; } + + +int _osrf_app_session_send( osrf_app_session* session, osrf_message* msg ){ + if( !(session && msg) ) return 0; + osrfMessage* a[1]; + a[0] = msg; + return osrfAppSessionSendBatch( session, a, 1 ); +} + + + + /** Waits up to 'timeout' seconds for some data to arrive. * Any data that arrives will be processed according to its * payload and message type. This method will return after @@ -715,6 +737,34 @@ int osrfAppRequestRespond( osrfAppSession* ses, int requestId, jsonObject* data } +int osrfAppRequestRespondComplete( + osrfAppSession* ses, int requestId, jsonObject* data ) { + + osrf_message* payload = osrf_message_init( RESULT, requestId, 1 ); + osrf_message_set_status_info( payload, NULL, "OK", OSRF_STATUS_OK ); + + char* json = jsonObjectToJSON( data ); + osrf_message_set_result_content( payload, json ); + free(json); + + osrf_message* status = osrf_message_init( STATUS, requestId, 1); + osrf_message_set_status_info( status, "osrfConnectStatus", "Request Complete", OSRF_STATUS_COMPLETE ); + + osrfMessage* ms[2]; + ms[0] = payload; + ms[1] = status; + + osrfAppSessionSendBatch( ses, ms, 2 ); + + osrf_message_free( payload ); + osrf_message_free( status ); + + /* join and free */ + + return 0; +} + + int osrfAppSessionStatus( osrfAppSession* ses, int type, int reqId, char* message ) { diff --git a/src/libstack/osrf_app_session.h b/src/libstack/osrf_app_session.h index 8eb4f85..df74979 100644 --- a/src/libstack/osrf_app_session.h +++ b/src/libstack/osrf_app_session.h @@ -219,7 +219,10 @@ void _osrf_app_session_remove_request( osrf_app_session*, osrf_app_request* req /** Send the given message */ int _osrf_app_session_send( osrf_app_session*, osrf_message* msg ); +int osrfAppSessionSendBatch( osrf_app_session*, osrf_message* msgs[], int size ); + int osrfAppRequestRespond( osrfAppSession* ses, int requestId, jsonObject* data ); +int osrfAppRequestRespondComplete( osrfAppSession* ses, int requestId, jsonObject* data ); int osrfAppSessionStatus( osrfAppSession* ses, int type, int reqId, char* message ); diff --git a/src/libstack/osrf_application.c b/src/libstack/osrf_application.c index 3106af5..c3d7485 100644 --- a/src/libstack/osrf_application.c +++ b/src/libstack/osrf_application.c @@ -107,20 +107,24 @@ osrfMethod* _osrfAppFindMethod( char* appName, char* methodName ) { int osrfAppRunMethod( char* appName, char* methodName, osrfAppSession* ses, int reqId, jsonObject* params ) { - if(!appName || ! methodName || ! ses) return -1; + if( !(appName && methodName && ses) ) return -1; + char* error; + osrfApplication* app; + osrfMethod* method; + osrfMethodContext context; + + /* this is the method we're gonna run */ + int (*meth) (osrfMethodContext*); info_handler("Running method [%s] for app [%s] with request id %d and " "thread trace %s", methodName, appName, reqId, ses->session_id ); - osrfApplication* app = _osrfAppFindApplication(appName); - if(!app) return warning_handler( "Application not found: %s", appName ); - - osrfMethod* method = __osrfAppFindMethod( app, methodName ); - if(!method) return warning_handler( "NOT FOUND: app %s / method %s", appName, methodName ); + if( !(app = _osrfAppFindApplication(appName)) ) + return warning_handler( "Application not found: %s", appName ); - /* this is the method we're gonna run */ - int (*meth) (osrfMethodDispatcher*); + if( !(method = __osrfAppFindMethod( app, methodName )) ) + return warning_handler( "NOT FOUND: app %s / method %s", appName, methodName ); /* open the method */ *(void **) (&meth) = dlsym(app->handle, method->symbol); @@ -130,14 +134,13 @@ int osrfAppRunMethod( char* appName, char* methodName, osrfAppSession* ses, int "for method %s and app %s", method->symbol, method->name, app->name ); } - osrfMethodDispatcher d; - d.session = ses; - d.method = method; - d.params = params; - d.request = reqId; + context.session = ses; + context.method = method; + context.params = params; + context.request = reqId; /* run the method */ - int ret = (*meth) (&d); + int ret = (*meth) (&context); debug_handler("method returned %d", ret ); diff --git a/src/libstack/osrf_application.h b/src/libstack/osrf_application.h index 1724f80..da6d2e8 100644 --- a/src/libstack/osrf_application.h +++ b/src/libstack/osrf_application.h @@ -6,21 +6,13 @@ #include "objson/object.h" #include "osrf_app_session.h" -#define OSRF_METHOD_CHECK_PARAMS(x,y) \ - if( ! x || ! y ) return -1; \ - if(y->type != JSON_ARRAY ) return -1; \ - char* __j = jsonObjectToJSON(y);\ - if(__j) { \ - debug_handler("Service: %s | Params: %s", x->remote_service, __j);free(__j);} - - /** This macro verifies methods receive the correct parameters It also creates local variables "session", "method", "params", and "request" */ -#define OSRF_METHOD_VERIFY_DISPATCHER(__d) \ +#define OSRF_METHOD_VERIFY_CONTEXT(__d) \ if(!__d) return -1; \ \ osrfAppSession* session = __d->session; \ @@ -59,13 +51,13 @@ struct _osrfMethodStruct { }; typedef struct _osrfMethodStruct osrfMethod; -struct _osrfMethodDispatcherStruct { +struct _osrfMethodContextStruct { osrfAppSession* session; osrfMethod* method; jsonObject* params; int request; }; -typedef struct _osrfMethodDispatcherStruct osrfMethodDispatcher; +typedef struct _osrfMethodContextStruct osrfMethodContext; /** diff --git a/src/libstack/osrf_message.c b/src/libstack/osrf_message.c index 4a10c81..603f5a7 100644 --- a/src/libstack/osrf_message.c +++ b/src/libstack/osrf_message.c @@ -109,8 +109,45 @@ void osrf_message_free( osrf_message* msg ) { free(msg); } + +char* osrfMessageSerializeBatch( osrfMessage* msgs [], int count ) { + if( !msgs ) return NULL; + + char* j; + int i = 0; + osrfMessage* msg = NULL; + jsonObject* wrapper = jsonNewObject(NULL); + + while( ((msg = msgs[i]) && (i++ < count)) ) + jsonObjectPush(wrapper, osrfMessageToJSON( msg )); + + j = jsonObjectToJSON(wrapper); + jsonObjectFree(wrapper); + + return j; +} + + char* osrf_message_serialize(osrf_message* msg) { + if( msg == NULL ) return NULL; + char* j = NULL; + + jsonObject* json = osrfMessageToJSON( msg ); + + if(json) { + jsonObject* wrapper = jsonNewObject(NULL); + jsonObjectPush(wrapper, json); + j = jsonObjectToJSON(wrapper); + jsonObjectFree(wrapper); + } + + return j; +} + + +jsonObject* osrfMessageToJSON( osrfMessage* msg ) { + jsonObject* json = jsonNewObject(NULL); jsonObjectSetClass(json, "osrfMessage"); jsonObject* payload; @@ -166,12 +203,8 @@ char* osrf_message_serialize(osrf_message* msg) { jsonObjectSetKey(json, "payload", payload); break; } - - jsonObject* wrapper = jsonNewObject(NULL); - jsonObjectPush(wrapper, json); - char* j = jsonObjectToJSON(wrapper); - jsonObjectFree(wrapper); - return j; + + return json; } @@ -194,11 +227,6 @@ int osrf_message_deserialize(char* string, osrf_message* msgs[], int count) { jsonObject* message = jsonObjectGetIndex(json, x); - char* j = jsonObjectToJSON(message); - debug_handler("deserialize parsed message \n%s\n", j ); - free(j); - - if(message && message->type != JSON_NULL && message->classname && !strcmp(message->classname, "osrfMessage")) { @@ -218,20 +246,35 @@ int osrf_message_deserialize(char* string, osrf_message* msgs[], int count) { tmp = jsonObjectGetKey(message, "threadTrace"); if(tmp) { + char* tt = jsonObjectToSimpleString(tmp); + if(tt) { + new_msg->thread_trace = atoi(tt); + free(tt); + } + /* if(tmp->type == JSON_NUMBER) new_msg->thread_trace = (int) jsonObjectGetNumber(tmp); if(tmp->type == JSON_STRING) new_msg->thread_trace = atoi(jsonObjectGetString(tmp)); + */ } tmp = jsonObjectGetKey(message, "protocol"); if(tmp) { + char* proto = jsonObjectToSimpleString(tmp); + if(proto) { + new_msg->protocol = atoi(proto); + free(proto); + } + + /* if(tmp->type == JSON_NUMBER) new_msg->protocol = (int) jsonObjectGetNumber(tmp); if(tmp->type == JSON_STRING) new_msg->protocol = atoi(jsonObjectGetString(tmp)); + */ } tmp = jsonObjectGetKey(message, "payload"); @@ -271,7 +314,6 @@ int osrf_message_deserialize(char* string, osrf_message* msgs[], int count) { } msgs[numparsed++] = new_msg; - debug_handler("deserialize has parsed %d messages", numparsed); } } diff --git a/src/libstack/osrf_message.h b/src/libstack/osrf_message.h index dfb8f9c..9467c6b 100644 --- a/src/libstack/osrf_message.h +++ b/src/libstack/osrf_message.h @@ -94,6 +94,7 @@ char* osrf_message_serialize(osrf_message*); int osrf_message_deserialize(char* json, osrf_message* msgs[], int count); + /** Pushes any message retreived from the xml into the 'msgs' array. * it is assumed that 'msgs' has beenn pre-allocated. * Returns the number of message that are in the buffer. @@ -108,5 +109,13 @@ void osrf_message_add_param( osrf_message*, char* param_string ); jsonObject* osrfMessageGetResult( osrfMessage* msg ); +/** + Returns the message as a jsonObject + @return The jsonObject which must be freed by the caller. + */ +jsonObject* osrfMessageToJSON( osrfMessage* msg ); + +char* osrfMessageSerializeBatch( osrfMessage* msgs [], int count ); + #endif diff --git a/src/libstack/osrf_prefork.c b/src/libstack/osrf_prefork.c index 8dfeaf9..477b0dc 100644 --- a/src/libstack/osrf_prefork.c +++ b/src/libstack/osrf_prefork.c @@ -1,5 +1,6 @@ #include "osrf_prefork.h" #include +#include "osrf_app_session.h" /* true if we just deleted a child. This will allow us to make sure we're not trying to use freed memory */ @@ -12,6 +13,8 @@ int osrf_prefork_run(char* appname) { if(!appname) fatal_handler("osrf_prefork_run requires an appname to run!"); + set_proc_title( "opensrf listener [%s]", appname ); + int maxr = 1000; int maxc = 10; int minc = 3; @@ -101,16 +104,22 @@ void prefork_child_init_hook(prefork_child* child) { if(!osrf_system_bootstrap_client_resc( NULL, NULL, resc)) fatal_handler("Unable to bootstrap client for osrf_prefork_run()"); free(resc); + + set_proc_title( "opensrf drone [%s]", child->appname ); } void prefork_child_process_request(prefork_child* child, char* data) { - if(!child && child->connection) return; + if( !child ) return; /* construct the message from the xml */ - debug_handler("Child %d received data from parent:\n%s\n", child->pid, data); transport_message* msg = new_message_from_xml( data ); - osrf_stack_transport_handler(msg, child->appname); + osrfAppSession* session = osrf_stack_transport_handler(msg, child->appname); + + if( ! session->stateless ) { /* keepalive loop for stateful sessions */ + + debug_handler("Entering keepalive loop for session %s", session->session_id ); + } } diff --git a/src/libstack/osrf_prefork.h b/src/libstack/osrf_prefork.h index 22d7742..474d2b0 100644 --- a/src/libstack/osrf_prefork.h +++ b/src/libstack/osrf_prefork.h @@ -54,7 +54,6 @@ struct prefork_child_struct { int max_requests; char* appname; struct prefork_child_struct* next; - transport_client* connection; }; typedef struct prefork_child_struct prefork_child; diff --git a/src/libstack/osrf_stack.c b/src/libstack/osrf_stack.c index e6e1cb0..a8dfef8 100644 --- a/src/libstack/osrf_stack.c +++ b/src/libstack/osrf_stack.c @@ -8,23 +8,16 @@ osrf_message* _do_server( osrf_app_session*, osrf_message* ); int (*osrf_stack_entry_point) (transport_client*, int) = &osrf_stack_process; int osrf_stack_process( transport_client* client, int timeout ) { - transport_message* msg = client_recv( client, timeout ); - if(msg == NULL) return 0; - debug_handler( "Received message from transport code from %s", msg->sender ); - int status = osrf_stack_transport_handler( msg, NULL ); - - while(1) { - transport_message* m = client_recv( client, 0 ); - if(m) { - debug_handler( "Received additional message from transport code"); - status = osrf_stack_transport_handler( m, NULL ); - } else { - debug_handler( "osrf_stack_process returning with only 1 received message" ); - break; - } + if( !client ) return -1; + transport_message* msg = NULL; + + while( (msg = client_recv( client, timeout )) ) { + debug_handler( "Received message from transport code from %s", msg->sender ); + osrf_stack_transport_handler( msg, NULL ); + timeout = 0; } - return status; + return 0; } @@ -32,9 +25,9 @@ int osrf_stack_process( transport_client* client, int timeout ) { // ----------------------------------------------------------------------------- // Entry point into the stack // ----------------------------------------------------------------------------- -int osrf_stack_transport_handler( transport_message* msg, char* my_service ) { +osrfAppSession* osrf_stack_transport_handler( transport_message* msg, char* my_service ) { - if(!msg) return -1; + if(!msg) return NULL; debug_handler( "Transport handler received new message \nfrom %s " "to %s with body \n\n%s\n", msg->sender, msg->recipient, msg->body ); @@ -46,9 +39,11 @@ int osrf_stack_transport_handler( transport_message* msg, char* my_service ) { osrf_app_session* session = osrf_app_session_find_session( msg->thread ); - if( session == NULL ) + if( !session ) session = osrf_app_server_session_init( msg->thread, my_service, msg->sender); + if( !session ) return NULL; + if(!msg->is_error) debug_handler("Session [%s] found or built", session->session_id ); @@ -59,7 +54,6 @@ int osrf_stack_transport_handler( transport_message* msg, char* my_service ) { debug_handler( "We received %d messages from %s", num_msgs, msg->sender ); - /* XXX ERROR CHECKING, BAD JSON, ETC... */ int i; for( i = 0; i != num_msgs; i++ ) { @@ -88,7 +82,7 @@ int osrf_stack_transport_handler( transport_message* msg, char* my_service ) { message_free( msg ); debug_handler("after msg delete"); - return 1; + return session; } int osrf_stack_message_handler( osrf_app_session* session, osrf_message* msg ) { @@ -202,7 +196,8 @@ osrf_message* _do_server( osrf_app_session* session, osrf_message* msg ) { return NULL; case CONNECT: - /* handle connect message */ + osrfAppSessionStatus( session, + OSRF_STATUS_OK, msg->thread_trace, "Connection Successful" ); return NULL; case REQUEST: diff --git a/src/libstack/osrf_stack.h b/src/libstack/osrf_stack.h index d3c6d5b..fbffbe8 100644 --- a/src/libstack/osrf_stack.h +++ b/src/libstack/osrf_stack.h @@ -10,7 +10,7 @@ // ----------------------------------------------------------------------------- int osrf_stack_process( transport_client* client, int timeout ); -int osrf_stack_transport_handler( transport_message* msg, char* my_service ); +osrfAppSession* osrf_stack_transport_handler( transport_message* msg, char* my_service ); int osrf_stack_message_handler( osrf_app_session* session, osrf_message* msg ); int osrf_stack_application_handler( osrf_app_session* session, osrf_message* msg ); diff --git a/src/utils/utils.c b/src/utils/utils.c index e60925d..972049c 100644 --- a/src/utils/utils.c +++ b/src/utils/utils.c @@ -27,14 +27,35 @@ inline void* safe_malloc( int size ) { } +char** __global_argv = NULL; +int __global_argv_size = 0; + +int init_proc_title( int argc, char* argv[] ) { + + __global_argv = argv; + + int i = 0; + while( i < argc ) { + int len = strlen( __global_argv[i]); + bzero( __global_argv[i++], len ); + __global_argv_size += len; + } + + __global_argv_size -= 2; + return 0; +} + +int set_proc_title( char* format, ... ) { + VA_LIST_TO_STRING(format); + bzero( *(__global_argv), __global_argv_size ); + return snprintf( *(__global_argv), __global_argv_size, VA_BUF ); +} + /* utility method for profiling */ double get_timestamp_millis() { - //struct timeb t; struct timeval tv; gettimeofday(&tv, NULL); - //ftime(&t); -// double time = (int)t.time + ( ((double)t.millitm) / 1000 ) + ( ((double)tv.tv_usec / 1000000) ); double time = (int)tv.tv_sec + ( ((double)tv.tv_usec / 1000000) ); return time; } diff --git a/src/utils/utils.h b/src/utils/utils.h index 8da7002..96295a6 100644 --- a/src/utils/utils.h +++ b/src/utils/utils.h @@ -75,6 +75,11 @@ GNU General Public License for more details. #define BUFFER_MAX_SIZE 10485760 +/* these are evil and should be condemned */ +int init_proc_title( int argc, char* argv[] ); +int set_proc_title( char* format, ... ); + + int daemonize(); void* safe_malloc(int size);