becomes available before the end of the timeout; otherwise NULL;
If there is already a message available in the input queue for this request, dequeue and
- return it immediately. Otherwise wait up to timeout seconds until you either get an
+ return it immediately. Otherwise wait up to timeout seconds until you either get an
input message for the specified request, run out of time, or encounter an error.
+ If the only message we receive for this request is a STATUS message with a status code
+ OSRF_STATUS_COMPLETE, then return NULL. That means that the server has nothing further
+ to send in response to this request.
+
You may also receive other messages for other requests, and other sessions. These other
messages will be wholly or partially processed behind the scenes while you wait for the
one you want.
osrfLogDebug( OSRF_LOG_MARK, "In app_request receive with remaining time [%d]",
(int) remaining );
-
+
osrf_app_session_queue_wait( req->session, 0, NULL );
if(req->session->transport_error) {
osrfLogError(OSRF_LOG_MARK, "Transport error in recv()");
session->remote_service = strdup(remote_service);
session->session_locale = NULL;
session->transport_error = 0;
+ session->panic = 0;
#ifdef ASSUME_STATELESS
session->stateless = 1;
/* defaulting to protocol 1 for now */
osrfMessage* con_msg = osrf_message_init( CONNECT, session->thread_trace, 1 );
+
+ // Address this message to the router
osrf_app_session_reset_remote( session );
session->state = OSRF_SESSION_CONNECTING;
int ret = _osrf_app_session_send( session, con_msg );
message; process subsequent messages if they are available, but don't wait for them.
The first parameter identifies an osrfApp session, but all we really use it for is to
- get a pointer to the transport_session. Typically, if not always, a given process
- opens only a single transport_session (to talk to the Jabber server), and all app
- sessions in that process use the same transport_session.
+ get a pointer to the transport_session. Typically, a given process opens only a single
+ transport_session (to talk to the Jabber server), and all app sessions in that process
+ use the same transport_session.
- Hence this function indiscriminately waits for input messages for all osrfAppSessions,
- not just the one specified.
+ Hence this function indiscriminately waits for input messages for all osrfAppSessions
+ tied to the same Jabber session, not just the one specified.
Dispatch each message to the appropriate processing routine, depending on its type
and contents, and on whether we're acting as a client or as a server for that message.
free(session->orig_remote_id);
free(session->session_id);
free(session->remote_service);
-
+
// Free the request hash
int i;
for( i = 0; i < OSRF_REQUEST_HASH_SIZE; ++i ) {
/**
@brief Free the global session cache.
-
+
Note that the osrfHash that implements the global session cache does @em not have a
callback function installed for freeing its cargo. As a result, any remaining
osrfAppSessions are leaked, along with all the osrfAppRequests and osrfMessages they
osrfAppSessionCache = NULL;
}
+/**
+ @brief Arrange for immediate termination of the process.
+ @param ses Pointer to the current osrfAppSession.
-
-
-
+ Typical use case: a server drone loses its database connection, thereby becoming useless.
+ It terminates so that it will not receive further requests, being unable to service them.
+*/
+void osrfAppSessionPanic( osrfAppSession* ses ) {
+ if( ses )
+ ses->panic = 1;
+}
static void del_prefork_child( prefork_simple* forker, pid_t pid );
static void check_children( prefork_simple* forker, int forever );
-static void prefork_child_process_request(prefork_child*, char* data);
+static int prefork_child_process_request(prefork_child*, char* data);
static int prefork_child_init_hook(prefork_child*);
static prefork_child* prefork_child_init( prefork_simple* forker,
int read_data_fd, int write_data_fd,
}
// Called only by a child process
-static void prefork_child_process_request(prefork_child* child, char* data) {
- if( !child ) return;
+// Non-zero return code means that the child process has decided to terminate immediately,
+// without waiting for a DISCONNECT or max_requests.
+static int prefork_child_process_request(prefork_child* child, char* data) {
+ if( !child ) return 0;
transport_client* client = osrfSystemGetTransportClient();
transport_message* msg = new_message_from_xml( data );
osrfAppSession* session = osrf_stack_transport_handler(msg, child->appname);
- if(!session) return;
+ if(!session) return 0;
+
+ int rc = session->panic;
+
+ if( rc ) {
+ osrfLogWarning( OSRF_LOG_MARK,
+ "Drone for session %s terminating immediately", session->session_id );
+ osrfAppSessionFree( session );
+ return rc;
+ }
if( session->stateless && session->state != OSRF_SESSION_CONNECTED ) {
osrfAppSessionFree( session );
- return;
+ return rc;
}
osrfLogDebug( OSRF_LOG_MARK, "Entering keepalive loop for session %s", session->session_id );
osrfLogDebug(OSRF_LOG_MARK, "Data received == %d", recvd);
+ if( session->panic )
+ rc = 1;
+
if(retval) {
osrfLogError(OSRF_LOG_MARK, "queue-wait returned non-success %d", retval);
break;
break;
}
+
+ // If the child process has decided to terminate immediately
+ if( rc )
+ break;
}
osrfLogDebug( OSRF_LOG_MARK, "Exiting keepalive loop for session %s", session->session_id );
osrfAppSessionFree( session );
- return;
+ return rc;
}
/**
break;
}
+ int terminate_now = 0; // Boolean
+
if( n < 0 ) {
osrfLogWarning( OSRF_LOG_MARK,
"Prefork child read returned error with errno %d", errno );
break;
} else if( gotdata ) {
- osrfLogDebug(OSRF_LOG_MARK, "Prefork child got a request.. processing..");
- prefork_child_process_request(child, gbuf->buf);
+ osrfLogDebug( OSRF_LOG_MARK, "Prefork child got a request.. processing.." );
+ terminate_now = prefork_child_process_request( child, gbuf->buf );
buffer_reset( gbuf );
}
+ if( terminate_now ) {
+ osrfLogWarning( OSRF_LOG_MARK, "Prefork child terminating abruptly" );
+ break;
+ }
+
if( i < child->max_requests - 1 ) {
size_t msg_len = 9;
ssize_t len = write(