From 22c803057fce55970240c64e63e237bd431e2066 Mon Sep 17 00:00:00 2001 From: scottmk Date: Wed, 4 Aug 2010 03:20:33 +0000 Subject: [PATCH] Implement the chunking of OSRF messages. I.e. bundle multiple OSRF messages into an XMPP message, up to about 10k bytes, so as to reduce networking overhead. M include/opensrf/osrf_application.h M src/libopensrf/osrf_application.c git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@1988 9efc2488-bf62-4759-914b-345cdb29e865 --- include/opensrf/osrf_application.h | 136 ++++++++++++++++++++++--------------- src/libopensrf/osrf_application.c | 134 +++++++++++++++++++++++++++++------- 2 files changed, 191 insertions(+), 79 deletions(-) diff --git a/include/opensrf/osrf_application.h b/include/opensrf/osrf_application.h index 0bbb69f..433c42d 100644 --- a/include/opensrf/osrf_application.h +++ b/include/opensrf/osrf_application.h @@ -4,6 +4,31 @@ /** @file osrf_application.h @brief Routines to load and manage shared object libraries. + + Every method of a service is implemented by a C function. In a few cases those + functions are generic to all services. In other cases they are loaded and executed from + a shared object library that is specific to the application offering the service, A + registry maps method names to function names so that we can call the right function. + + Each such function has a similar signature: + + int method_name( osrfMethodContext* ctx ); + + The return value is negative in case of an error. A return code of zero implies that + the method has already sent the client a STATUS message to say that it is finished. + A return code greater than zero implies that the method has not sent such a STATUS + message, so we need to do so after the method returns. + + Any arguments passed to the method are bundled together in a jsonObject inside the + osrfMethodContext. + + An application's shared object may also implement any or all of three standard functions: + + - int osrfAppInitialize( void ) Called when an application is registered + - int osrfAppChildInit( void ) Called when a server drone is spawned + - void osrfAppChildExit( void ) Called when a server drone terminates + + osrfAppInitialize() and osrfAppChild return zero if successful, and non-zero if not. */ #include @@ -68,18 +93,65 @@ extern "C" { #define OSRF_METHOD_VERIFY_CONTEXT(d) _OSRF_METHOD_VERIFY_CONTEXT(d); #endif +/** + @name Method options + @brief Macros that get OR'd together to form method options. +*/ +/*@{*/ +/** + @brief Marks a method as a system method. + + System methods are implemented by generic functions, called via static linkage. They + are not loaded or executed from shared objects. +*/ #define OSRF_METHOD_SYSTEM 1 +/** + @brief Notes that the method may return more than one result. + + For a @em streaming method, we register both an atomic method and a non-atomic method. + See also OSRF_METHOD_ATOMIC. +*/ #define OSRF_METHOD_STREAMING 2 +/** + @brief Combines all responses into a single RESULT message. + + For a @em non-atomic method, the server returns each response to the client in a + separate RESULT message. It sends a STATUS message at the end to signify the end of the + message stream. + + For an @em atomic method, the server buffers all responses until the method returns, + and then sends them all at once in a single RESULT message (followed by a STATUS message). + Each individual response is encoded as an entry in a JSON array. This buffering is + transparent to the function that implements the method. + + Atomic methods incur less networking overhead than non-atomic methods, at the risk of + creating excessively large RESULT messages. The HTTP gateway requires the atomic versions + of streaming methods because of the stateless nature of the HTTP protocol. + + If OSRF_METHOD_STREAMING is set for a method, the application generates both an atomic + and a non-atomic method, whose names are identical except that the atomic one carries a + suffix of ".atomic". +*/ #define OSRF_METHOD_ATOMIC 4 +/** + @brief Notes that a previous result to the same call may be available in memcache. + + Before calling the registered function, a cachable method checks memcache for a previously + determined result for the same call. If no such result is available, it calls the + registered function and caches the new result before returning. + + This caching is not currently implemented for C methods. +*/ #define OSRF_METHOD_CACHABLE 8 +/*@}*/ typedef struct { - char* name; /**< the method name. */ - char* symbol; /**< the symbol name (function name). */ - char* notes; /**< public method documentation. */ - int argc; /**< how many args this method expects. */ + char* name; /**< Method name. */ + char* symbol; /**< Symbol name (function name) within the shared object. */ + char* notes; /**< Public method documentation. */ + int argc; /**< The minimum number of arguments for the method. */ //char* paramNotes; /**< Description of the params expected for this method. */ - int options; /**< bitswitches setting various options for this method. */ + int options; /**< Bit switches setting various options for this method. */ void* userData; /**< Opaque pointer to application-specific data. */ /* @@ -91,35 +163,15 @@ typedef struct { } osrfMethod; typedef struct { - osrfAppSession* session; /**< the current session. */ - osrfMethod* method; /**< the requested method. */ - jsonObject* params; /**< the params to the method. */ - int request; /**< request id. */ - jsonObject* responses; /**< array of cached responses. */ + osrfAppSession* session; /**< Pointer to the current application session. */ + osrfMethod* method; /**< Pointer to the requested method. */ + jsonObject* params; /**< Parameters to the method. */ + int request; /**< Request id. */ + jsonObject* responses; /**< Array of cached responses. */ } osrfMethodContext; -/** - Register an application - @param appName The name of the application - @param soFile The library (.so) file that implements this application - @return 0 on success, -1 on error -*/ int osrfAppRegisterApplication( const char* appName, const char* soFile ); -/** - @brief Register a method for a given application. - - @param appName Name of the application that implements the method. - @param methodName The fully qualified name of the method. - @param symbolName The symbol name (function name) that implements the method. - @param notes Public documentation for this method. - @params argc The number of arguments this method expects. - @param options Bit switches setting various options. - @return 0 on success, -1 on error - - Any method with the OSRF_METHOD_STREAMING option set will have a ".atomic" - version of the method registered automatically. -*/ int osrfAppRegisterMethod( const char* appName, const char* methodName, const char* symbolName, const char* notes, int argc, int options ); @@ -128,41 +180,19 @@ int osrfAppRegisterExtendedMethod( const char* appName, const char* methodName, osrfMethod* _osrfAppFindMethod( const char* appName, const char* methodName ); -/** - Runs the specified method for the specified application. - @param appName The name of the application who's method to run - @param methodName The name of the method to run - @param ses The app session attached to this request - @params reqId The request id for this request - @param params The method parameters -*/ int osrfAppRunMethod( const char* appName, const char* methodName, osrfAppSession* ses, int reqId, jsonObject* params ); -/** - @brief Respond to the client with a method exception. - @param ses The current session. - @param request The request id. - @param msg The debug message to send to the client. - @return 0 on successfully sending of the message, -1 otherwise. -*/ int osrfAppRequestRespondException( osrfAppSession* ses, int request, const char* msg, ... ); int osrfAppRespond( osrfMethodContext* context, const jsonObject* data ); -int osrfAppRespondComplete( osrfMethodContext* context, const jsonObject* data ); -/* OSRF_METHOD_ATOMIC and/or OSRF_METHOD_CACHABLE and/or 0 for no special options */ -//int osrfAppProcessMethodOptions( char* method ); +int osrfAppRespondComplete( osrfMethodContext* context, const jsonObject* data ); -/** Tell the backend process to run its child init function */ int osrfAppRunChildInit(const char* appname); void osrfAppRunExitCode( void ); -/** - Determine whether the context looks healthy. - Return 0 if it does, or -1 if it doesn't. -*/ int osrfMethodVerifyContext( osrfMethodContext* ctx ); #ifdef __cplusplus diff --git a/src/libopensrf/osrf_application.c b/src/libopensrf/osrf_application.c index 836f73b..141d39f 100644 --- a/src/libopensrf/osrf_application.c +++ b/src/libopensrf/osrf_application.c @@ -48,6 +48,8 @@ #define OSRF_SYSMETHOD_ECHO_ATOMIC "opensrf.system.echo.atomic" /*@}*/ +#define OSRF_MSG_BUFFER_SIZE 10240 + /** @brief Represent an Application. */ @@ -238,13 +240,12 @@ void osrfAppRunExitCode( void ) { The @a options parameter is zero or more of the following macros, OR'd together: - - OSRF_METHOD_SYSTEM - - OSRF_METHOD_STREAMING - - OSRF_METHOD_ATOMIC - - OSRF_METHOD_CACHABLE + - OSRF_METHOD_SYSTEM called by static linkage (shouldn't be used here) + - OSRF_METHOD_STREAMING method may return more than one response + - OSRF_METHOD_ATOMIC return all responses collected in a single RESULT message + - OSRF_METHOD_CACHABLE cache results in memcache - If the OSRF_METHOD_STREAMING bit is set, also register an ".atomic" version of - the method. + If the OSRF_METHOD_STREAMING bit is set, also register an ".atomic" version of the method. */ int osrfAppRegisterMethod( const char* appName, const char* methodName, const char* symbolName, const char* notes, int argc, int options ) { @@ -325,7 +326,7 @@ static osrfMethod* _osrfAppBuildMethod( const char* methodName, const char* symb methodName = ""; // should never happen if( options & OSRF_METHOD_ATOMIC ) { - // Append ".atomic" to the name, and make the method streaming + // Append ".atomic" to the name, and make the method atomic char mb[ strlen( methodName ) + 8 ]; sprintf( mb, "%s.atomic", methodName ); method->name = strdup( mb ); @@ -486,7 +487,11 @@ int osrfAppRunMethod( const char* appName, const char* methodName, return osrfAppRequestRespondException( ses, reqId, "An unknown server error occurred" ); - return _osrfAppPostProcess( &context, retcode ); + retcode = _osrfAppPostProcess( &context, retcode ); + + if( context.responses ) + jsonObjectFree( context.responses ); + return retcode; } /** @@ -526,25 +531,64 @@ int osrfAppRespondComplete( osrfMethodContext* context, const jsonObject* data ) } /** + @brief Send any response messages that have accumulated in the output buffer. + @param ses Pointer to the current application session. + @param outbuf Pointer to the output buffer. + @return Zero if successful, or -1 if not. + + Used only by servers to respond to clients. +*/ +static int flush_responses( osrfAppSession* ses, growing_buffer* outbuf ) { + + // Collect any inbound traffic on the socket(s). This doesn't accomplish anything for the + // immediate task at hand, but it may help to keep TCP from getting clogged in some cases. + osrf_app_session_queue_wait( ses, 0, NULL ); + + int rc = 0; + if( buffer_length( outbuf ) > 0 ) { // If there's anything to send... + buffer_add_char( outbuf, ']' ); // Close the JSON array + if( osrfSendTransportPayload( ses, OSRF_BUFFER_C_STR( ses->outbuf ))) { + osrfLogError( OSRF_LOG_MARK, "Unable to flush response buffer" ); + rc = -1; + } + } + buffer_reset( ses->outbuf ); + return rc; +} + +/** + @brief Add a message to an output buffer. + @param outbuf Pointer to the output buffer. + @param msg Pointer to the message to be added, in the form of a JSON string. + + Since the output buffer is in the form of a JSON array, prepend a left bracket to the + first message, and a comma to subsequent ones. + + Used only by servers to respond to clients. +*/ +static inline void append_msg( growing_buffer* outbuf, const char* msg ) { + if( outbuf && msg ) { + char prefix = buffer_length( outbuf ) > 0 ? ',' : '['; + buffer_add_char( outbuf, prefix ); + buffer_add( outbuf, msg ); + } +} + +/** @brief Either send or enqueue a response to a client, optionally with a completion notice. @param ctx Pointer to the method context. @param data Pointer to the response, in the form of a jsonObject. @param complete Boolean: if true, we will accompany the RESULT message with a STATUS message indicating that the response is complete. - @return Zero if successful, or -1 upon error. The only recognized errors are if either - the @a ctx pointer or its method pointer is NULL. + @return Zero if successful, or -1 upon error. For an atomic method, add a copy of the response data to a cache within the method context, to be sent later. In this case the @a complete parameter has no effect, because we'll send the STATUS message later when we send the cached results. - If the method is cachable but not atomic, do nothing, ignoring the results in @a data. - Apparently there are no cachable methods at this writing. If we ever invent some, we - may need to revisit this function. - - If the method is neither atomic nor cachable, then send a RESULT message to the client, - with the results in @a data. If @a complete is true, also send a STATUS message to - indicate that the response is complete. + If the method is not atomic, translate the message into JSON and append it to a buffer, + flushing the buffer as needed to avoid overflow. If @a complete is true, append + a STATUS message (as JSON) to the buffer and flush the buffer. */ static int _osrfAppRespond( osrfMethodContext* ctx, const jsonObject* data, int complete ) { if(!(ctx && ctx->method)) return -1; @@ -560,15 +604,55 @@ static int _osrfAppRespond( osrfMethodContext* ctx, const jsonObject* data, int // Add a copy of the data object to the cache. if ( data != NULL ) jsonObjectPush( ctx->responses, jsonObjectClone(data) ); - } + } else { + osrfLogDebug( OSRF_LOG_MARK, + "Adding responses to stash for method %s", ctx->method->name ); + + if( data ) { + // If you want to flush the intput buffers for every output message, + // this is the place to do it. + //osrf_app_session_queue_wait( ctx->session, 0, NULL ); + + // Create an OSRF message + osrfMessage* msg = osrf_message_init( RESULT, ctx->request, 1 ); + osrf_message_set_status_info( msg, NULL, "OK", OSRF_STATUS_OK ); + osrf_message_set_result( msg, data ); + + // Serialize the OSRF message into JSON text + char* json = jsonObjectToJSON( osrfMessageToJSON( msg )); + osrfMessageFree( msg ); + + // If the new message would overflow the buffer, flush the output buffer first + int len_so_far = buffer_length( ctx->session->outbuf ); + if( len_so_far && (strlen( json ) + len_so_far >= OSRF_MSG_BUFFER_SIZE - 3) ) { + if( flush_responses( ctx->session, ctx->session->outbuf )) + return -1; + } + + // Append the JSON text to the output buffer + append_msg( ctx->session->outbuf, json ); + free( json ); + } - if( !(ctx->method->options & OSRF_METHOD_ATOMIC ) && - !(ctx->method->options & OSRF_METHOD_CACHABLE) ) { + if(complete) { + // Create a STATUS message + osrfMessage* status_msg = osrf_message_init( STATUS, ctx->request, 1 ); + osrf_message_set_status_info( status_msg, "osrfConnectStatus", "Request Complete", + OSRF_STATUS_COMPLETE ); - if(complete) - osrfAppRequestRespondComplete( ctx->session, ctx->request, data ); - else - osrfAppRequestRespond( ctx->session, ctx->request, data ); + // Serialize the STATUS message into JSON text + char* json = jsonObjectToJSON( osrfMessageToJSON( status_msg )); + osrfMessageFree( status_msg ); + + // Add the STATUS message to the output buffer. + // It's short, so don't worry about avoiding overflow. + append_msg( ctx->session->outbuf, json ); + free( json ); + + // Flush the output buffer, sending any accumulated messages. + if( flush_responses( ctx->session, ctx->session->outbuf )) + return -1; + } } return 0; @@ -598,8 +682,6 @@ static int _osrfAppPostProcess( osrfMethodContext* ctx, int retcode ) { // any responses yet). Now send them all at once, followed by a STATUS message // to say that we're finished. osrfAppRequestRespondComplete( ctx->session, ctx->request, ctx->responses ); - jsonObjectFree(ctx->responses); - ctx->responses = NULL; } else { // We have no cached responses to return. -- 2.11.0