From: erickson Date: Mon, 15 May 2006 14:37:43 +0000 (+0000) Subject: added connection oriented statefull session handling to the server stack X-Git-Tag: osrf_rel_2_0_1~1158 X-Git-Url: https://old-git.evergreen-ils.org/?a=commitdiff_plain;h=32c2d30bf06927964f520a1f3b55feb558724ab5;p=OpenSRF.git added connection oriented statefull session handling to the server stack git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@715 9efc2488-bf62-4759-914b-345cdb29e865 --- diff --git a/src/c-apps/osrf_math.c b/src/c-apps/osrf_math.c index f307804..deb11f1 100644 --- a/src/c-apps/osrf_math.c +++ b/src/c-apps/osrf_math.c @@ -72,6 +72,12 @@ int osrfMathRun( osrfMethodContext* ctx ) { /* connect to db math */ osrfAppSession* ses = osrfAppSessionClientInit("opensrf.dbmath"); + /* forcing an explicit connect allows us to talk to one worker backend + * regardless of "stateful" config settings for the server + * This buys us nothing here since we're only sending one request... + * */ + /*osrfAppSessionConnect(ses);*/ + /* dbmath uses the same method names that math does */ int req_id = osrfAppSessionMakeRequest( ses, newParams, ctx->method->name, 1, NULL ); osrfMessage* omsg = osrfAppSessionRequestRecv( ses, req_id, 60 ); diff --git a/src/libstack/osrf_app_session.c b/src/libstack/osrf_app_session.c index f7ed56c..2813c43 100644 --- a/src/libstack/osrf_app_session.c +++ b/src/libstack/osrf_app_session.c @@ -401,6 +401,11 @@ int osrf_app_session_push_queue( return 0; } +int osrfAppSessionConnect( osrf_app_session* session ) { + return osrf_app_session_connect(session); +} + + /** Attempts to connect to the remote service */ int osrf_app_session_connect(osrf_app_session* session){ @@ -486,17 +491,22 @@ int osrfAppSessionSendBatch( osrfAppSession* session, osrf_message* msgs[], int osrf_app_session_queue_wait( session, 0 ); - /* if we're not stateless and not connected and the first - message is not a connect message, then we do the connect first */ - if(session->stateless) { + if(session->state != OSRF_SESSION_CONNECTED) { + + if(session->stateless) { /* stateless session always send to the root listener */ osrf_app_session_reset_remote(session); - } else { + } else { + + /* do an auto-connect if necessary */ + if( ! session->stateless && + (msg->m_type != CONNECT) && + (msg->m_type != DISCONNECT) && + (session->state != OSRF_SESSION_CONNECTED) ) { - if( (msg->m_type != CONNECT) && (msg->m_type != DISCONNECT) && - (session->state != OSRF_SESSION_CONNECTED) ) { - if(!osrf_app_session_connect( session )) - return 0; + if(!osrf_app_session_connect( session )) + return 0; + } } } } diff --git a/src/libstack/osrf_app_session.h b/src/libstack/osrf_app_session.h index 9e357b6..f503821 100644 --- a/src/libstack/osrf_app_session.h +++ b/src/libstack/osrf_app_session.h @@ -144,6 +144,7 @@ int osrf_app_session_push_queue( osrf_app_session*, osrf_message* msg ); * connection, 0 otherwise. */ int osrf_app_session_connect( osrf_app_session* ); +int osrfAppSessionConnect( osrf_app_session* ); /** Sends a disconnect message to the remote service. No response is expected */ int osrf_app_session_disconnect( osrf_app_session* ); diff --git a/src/libstack/osrf_prefork.c b/src/libstack/osrf_prefork.c index f9761d2..1246236 100644 --- a/src/libstack/osrf_prefork.c +++ b/src/libstack/osrf_prefork.c @@ -29,6 +29,18 @@ int osrf_prefork_run(char* appname) { jsonObject* min_children = osrf_settings_host_value_object("/apps/%s/unix_config/min_children", appname); jsonObject* max_children = osrf_settings_host_value_object("/apps/%s/unix_config/max_children", appname); + char* keepalive = osrf_settings_host_value("/apps/%s/keepalive", appname); + time_t kalive; + if( keepalive ) { + kalive = atoi(keepalive); + free(keepalive); + } else { + kalive = 5; /* give it a default */ + } + + osrfLogInfo(OSRF_LOG_MARK, "keepalive setting = %d seconds", kalive); + + if(!max_req) osrfLogWarning( OSRF_LOG_MARK, "Max requests not defined, assuming 1000"); else maxr = (int) jsonObjectGetNumber(max_req); @@ -58,6 +70,7 @@ int osrf_prefork_run(char* appname) { osrfSystemGetTransportClient(), maxr, minc, maxc); forker->appname = strdup(appname); + forker->keepalive = kalive; if(forker == NULL) { osrfLogError( OSRF_LOG_MARK, "osrf_prefork_run() failed to create prefork_simple object"); @@ -148,9 +161,46 @@ void prefork_child_process_request(prefork_child* child, char* data) { return; } - /* keepalive loop for stateful sessions */ + osrfLogDebug( OSRF_LOG_MARK, "Entering keepalive loop for session %s", session->session_id ); + int keepalive = child->keepalive; + int retval; + time_t start; + time_t end; + + while(1) { + + osrfLogDebug(OSRF_LOG_MARK, + "osrf_prefork calling queue_wait [%d] in keepalive loop", keepalive); + start = time(NULL); + retval = osrf_app_session_queue_wait(session, keepalive); + end = time(NULL); + + if(retval) { + osrfLogError(OSRF_LOG_MARK, "queue-wait returned non-success %d", retval); + break; + } + + /* see if the client disconnected from us */ + if(session->state != OSRF_SESSION_CONNECTED) break; + + /* see if the used up the timeout */ + if( (end - start) >= keepalive ) { + + osrfLogDebug(OSRF_LOG_MARK, "Keepalive timed out, exiting connected session"); + + osrfAppSessionStatus( + session, + OSRF_STATUS_TIMEOUT, + "osrfConnectStatus", + 0, "Disconnected on timeout" ); - osrfLogDebug( OSRF_LOG_MARK, "Entering keepalive loop for session %s", session->session_id ); + break; + } + } + + osrfLogDebug( OSRF_LOG_MARK, "Exiting keepalive loop for session %s", session->session_id ); + osrfAppSessionFree( session ); + return; } @@ -205,6 +255,7 @@ prefork_child* launch_child( prefork_simple* forker ) { data_fd[1], status_fd[0], status_fd[1] ); child->appname = strdup(forker->appname); + child->keepalive = forker->keepalive; add_prefork_child( forker, child ); diff --git a/src/libstack/osrf_prefork.h b/src/libstack/osrf_prefork.h index 474d2b0..0e14259 100644 --- a/src/libstack/osrf_prefork.h +++ b/src/libstack/osrf_prefork.h @@ -37,6 +37,7 @@ struct prefork_simple_struct { int data_to_child; int data_to_parent; int current_num_children; + int keepalive; /* keepalive time for stateful sessions */ char* appname; struct prefork_child_struct* first_child; transport_client* connection; @@ -53,6 +54,7 @@ struct prefork_child_struct { int available; int max_requests; char* appname; + int keepalive; struct prefork_child_struct* next; }; diff --git a/src/libstack/osrf_stack.c b/src/libstack/osrf_stack.c index 1c2e4c8..83de64a 100644 --- a/src/libstack/osrf_stack.c +++ b/src/libstack/osrf_stack.c @@ -200,11 +200,14 @@ osrf_message* _do_server( osrf_app_session* session, osrf_message* msg ) { case DISCONNECT: /* session will be freed by the forker */ + osrfLogDebug(OSRF_LOG_MARK, "Client sent explicit disconnect"); + session->state = OSRF_SESSION_DISCONNECTED; return NULL; case CONNECT: osrfAppSessionStatus( session, OSRF_STATUS_OK, "osrfConnectStatus", msg->thread_trace, "Connection Successful" ); + session->state = OSRF_SESSION_CONNECTED; return NULL; case REQUEST: @@ -215,6 +218,7 @@ osrf_message* _do_server( osrf_app_session* session, osrf_message* msg ) { default: osrfLogWarning( OSRF_LOG_MARK, "Server cannot handle message of type %d", msg->m_type ); + session->state = OSRF_SESSION_DISCONNECTED; return NULL; } diff --git a/src/srfsh/srfsh.c b/src/srfsh/srfsh.c index b26f9b3..52b6918 100644 --- a/src/srfsh/srfsh.c +++ b/src/srfsh/srfsh.c @@ -800,6 +800,7 @@ int handle_math( char* words[] ) { int do_math( int count, int style ) { osrf_app_session* session = osrf_app_client_session_init( "opensrf.math" ); + osrf_app_session_connect(session); jsonObject* params = json_parse_string("[]"); jsonObjectPush(params,jsonNewObject("1"));