From b3b6b4211472e4897581a93d9615d8544f29779f Mon Sep 17 00:00:00 2001 From: Mike Rylander Date: Sun, 23 Feb 2014 16:35:17 -0500 Subject: [PATCH] LP#1612771: C support for receiving chunked responses * client parsing * consistent w/ Perl, we now have "bundling" and "chunking" Signed-off-by: Bill Erickson --- include/opensrf/osrf_app_session.h | 1 + include/opensrf/osrf_application.h | 5 ++-- src/libopensrf/osrf_app_session.c | 56 ++++++++++++++++++++++++++++++++++++++ src/libopensrf/osrf_application.c | 17 ++++++------ 4 files changed, 69 insertions(+), 10 deletions(-) diff --git a/include/opensrf/osrf_app_session.h b/include/opensrf/osrf_app_session.h index 57d9092..9c9efad 100644 --- a/include/opensrf/osrf_app_session.h +++ b/include/opensrf/osrf_app_session.h @@ -34,6 +34,7 @@ struct osrf_app_request_struct; typedef struct osrf_app_request_struct osrfAppRequest; #define OSRF_REQUEST_HASH_SIZE 64 +#define OSRF_MSG_CHUNK_SIZE 104858 /* 0.1 MB */ /** @brief Representation of a session with another application. diff --git a/include/opensrf/osrf_application.h b/include/opensrf/osrf_application.h index e50e114..68b096c 100644 --- a/include/opensrf/osrf_application.h +++ b/include/opensrf/osrf_application.h @@ -91,7 +91,8 @@ typedef struct { //char* paramNotes; /**< Description of the params expected for this method. */ int options; /**< Bit switches setting various options for this method. */ void* userData; /**< Opaque pointer to application-specific data. */ - size_t bufsize; /**< How big a buffer to use for non-atomic methods */ + size_t max_bundle_size; /**< How big a buffer to use for non-atomic methods */ + size_t max_chunk_size; /**< Maximum content size per message; 0 means no limit */ /* int sysmethod; @@ -117,7 +118,7 @@ int osrfAppRegisterMethod( const char* appName, const char* methodName, int osrfAppRegisterExtendedMethod( const char* appName, const char* methodName, const char* symbolName, const char* notes, int argc, int options, void* ); -int osrfMethodSetBufferSize( const char* appName, const char* methodName, size_t bufsize ); +int osrfMethodSetBundleSize( const char* appName, const char* methodName, size_t max_bundle_size ); osrfMethod* _osrfAppFindMethod( const char* appName, const char* methodName ); diff --git a/src/libopensrf/osrf_app_session.c b/src/libopensrf/osrf_app_session.c index 0b5c6d6..c7b80d3 100644 --- a/src/libopensrf/osrf_app_session.c +++ b/src/libopensrf/osrf_app_session.c @@ -24,6 +24,9 @@ struct osrf_app_request_struct { /** Linked list of responses to the request. */ osrfMessage* result; + /** Buffer used to collect partial response messages */ + growing_buffer* part_response_buffer; + /** Boolean; if true, then a call that is waiting on a response will reset the timeout and set this variable back to false. */ int reset_timeout; @@ -76,6 +79,7 @@ static osrfAppRequest* _osrf_app_request_init( req->reset_timeout = 0; req->next = NULL; req->prev = NULL; + req->part_response_buffer = NULL; return req; } @@ -98,6 +102,9 @@ static void _osrf_app_request_free( osrfAppRequest * req ) { req->result = next_msg; } + if (req->part_response_buffer) + buffer_free(req->part_response_buffer); + free( req ); } } @@ -114,8 +121,57 @@ static void _osrf_app_request_push_queue( osrfAppRequest* req, osrfMessage* resu if(req == NULL || result == NULL) return; + if (result->status_code == OSRF_STATUS_PARTIAL) { + osrfLogDebug(OSRF_LOG_MARK, "received partial message response"); + + if (!req->part_response_buffer) { + // assume the max_chunk_size of the server matches ours for + // buffer initialization, since the setting will usually be + // a site-wide value. + req->part_response_buffer = buffer_init(OSRF_MSG_CHUNK_SIZE + 1); + } + + const char* partial = jsonObjectGetString(result->_result_content); + + if (partial != NULL) { + osrfLogDebug(OSRF_LOG_MARK, + "adding %d bytes to response buffer", strlen(partial)); + + // add the partial contents of the message to the buffer + buffer_add(req->part_response_buffer, partial); + } + + // all done. req and result are freed by the caller + return; + + } else if (result->status_code == OSRF_STATUS_NOCONTENT) { + if (req->part_response_buffer && req->part_response_buffer->n_used) { + + // part_response_buffer contains a stitched-together JSON string + osrfLogDebug(OSRF_LOG_MARK, + "partial response complete, parsing %d bytes", + req->part_response_buffer->n_used); + + // coerce the partial-complete response into a standard RESULT. + osrf_message_set_status_info(result, NULL, "OK", OSRF_STATUS_OK); + + // use the stitched-together JSON string as the result conten + osrf_message_set_result_content( + result, req->part_response_buffer->buf); + + // free string, keep the buffer + buffer_reset(req->part_response_buffer); + + } else { + osrfLogDebug(OSRF_LOG_MARK, + "Received OSRF_STATUS_NOCONTENT with no preceeding content"); + return; + } + } + osrfLogDebug( OSRF_LOG_MARK, "App Session pushing request [%d] onto request queue", result->thread_trace ); + if(req->result == NULL) { req->result = result; // Add the first node diff --git a/src/libopensrf/osrf_application.c b/src/libopensrf/osrf_application.c index c205214..03eb7a3 100644 --- a/src/libopensrf/osrf_application.c +++ b/src/libopensrf/osrf_application.c @@ -89,7 +89,7 @@ /** @brief Default size of output buffer. */ -#define OSRF_MSG_BUFFER_SIZE 10240 +#define OSRF_MSG_BUNDLE_SIZE 10240 /** @brief Represent an Application. @@ -421,7 +421,8 @@ static osrfMethod* build_method( const char* methodName, const char* symbolName, if(user_data) method->userData = user_data; - method->bufsize = OSRF_MSG_BUFFER_SIZE; + method->max_bundle_size = OSRF_MSG_BUNDLE_SIZE; + method->max_chunk_size = OSRF_MSG_CHUNK_SIZE; return method; } @@ -429,7 +430,7 @@ static osrfMethod* build_method( const char* methodName, const char* symbolName, @brief Set the effective output buffer size for a given method. @param appName Name of the application. @param methodName Name of the method. - @param bufsize Desired size of the output buffer, in bytes. + @param max_bundle_size Desired size of the output buffer, in bytes. @return Zero if successful, or -1 if the specified method cannot be found. A smaller buffer size may result in a lower latency for the first response, since we don't @@ -442,18 +443,18 @@ static osrfMethod* build_method( const char* methodName, const char* symbolName, This function has no effect on atomic methods, because all responses are sent in a single message anyway. Likewise it has no effect on a method that returns only a single response. */ -int osrfMethodSetBufferSize( const char* appName, const char* methodName, size_t bufsize ) { +int osrfMethodSetBundleSize( const char* appName, const char* methodName, size_t max_bundle_size ) { osrfMethod* method = _osrfAppFindMethod( appName, methodName ); if( method ) { osrfLogInfo( OSRF_LOG_MARK, "Setting outbuf buffer size to %lu for method %s of application %s", - (unsigned long) bufsize, methodName, appName ); - method->bufsize = bufsize; + (unsigned long) max_bundle_size, methodName, appName ); + method->max_bundle_size = max_bundle_size; return 0; } else { osrfLogWarning( OSRF_LOG_MARK, "Unable to set outbuf buffer size to %lu for method %s of application %s", - (unsigned long) bufsize, methodName, appName ); + (unsigned long) max_bundle_size, methodName, appName ); return -1; } } @@ -752,7 +753,7 @@ static int _osrfAppRespond( osrfMethodContext* ctx, const jsonObject* data, int // If the new message would overflow the buffer, flush the output buffer first int len_so_far = buffer_length( ctx->session->outbuf ); - if( len_so_far && (strlen( json ) + len_so_far + 3 >= ctx->method->bufsize )) { + if( len_so_far && (strlen( json ) + len_so_far + 3 >= ctx->method->max_bundle_size )) { if( flush_responses( ctx->session, ctx->session->outbuf )) return -1; } -- 2.11.0