#include <opensrf/osrf_stack.h>
#include <opensrf/osrf_application.h>
+/**
+ @file osrf_stack.c
+ @brief Routines to receive and process input osrfMessages.
+*/
+
/* the max number of oilsMessage blobs present in any one root packet */
#define OSRF_MAX_MSGS_PER_PACKET 256
// -----------------------------------------------------------------------------
-static void osrf_stack_application_handler( osrfAppSession* session, osrfMessage* msg );
static void _do_client( osrfAppSession*, osrfMessage* );
static void _do_server( osrfAppSession*, osrfMessage* );
@param client Pointer to the transport_client whose socket is to be read.
@param timeout How many seconds to wait for the first message.
@param msg_received A pointer through which to report whether a message was received.
- @return 0 upon success (even if a timeout occurred) or -1 upon failure.
+ @return 0 upon success (even if a timeout occurs), or -1 upon failure.
Read and process all available transport_messages from the socket of the specified
transport_client. Pass each one through osrf_stack_transport().
// Entry point into the stack
// -----------------------------------------------------------------------------
/**
- @brief Unpack a transport into one or more osrfMessages, and process each one.
- @param msg The transport message to be unpacked and processed.
+ @brief Unpack a transport_message into one or more osrfMessages, and process each one.
+ @param msg Pointer to the transport_message to be unpacked and processed.
@param my_service Application name (optional).
@return Pointer to an osrfAppSession -- either a pre-existing one or a new one.
/* Convert the message body into one or more osrfMessages */
int num_msgs = osrf_message_deserialize(msg->body, arr, OSRF_MAX_MSGS_PER_PACKET);
- osrfLogDebug( OSRF_LOG_MARK, "We received %d messages from %s", num_msgs, msg->sender );
+ osrfLogDebug( OSRF_LOG_MARK, "We received %d messages from %s", num_msgs, msg->sender );
double starttime = get_timestamp_millis();
}
/**
- If we return a message, that message should be passed up the stack,
- if we return NULL, we're finished for now...
+ @brief Acting as a client, process an incoming osrfMessage.
+ @param session Pointer to the osrfAppSession to which the message pertains.
+ @param msg Pointer to the osrfMessage.
+
+ What we do with the message depends on the combination of message type and status code:
+ - If it's a RESULT message, add it to the message queue of the appropriate app session,
+ to be handled later.
+ - If it's a STATUS message, handle it according to its status code and return NULL --
+ unless it has an unexpected status code, in which case add it to the message queue of
+ the appropriate app session, to be handled later.
*/
static void _do_client( osrfAppSession* session, osrfMessage* msg ) {
if(session == NULL || msg == NULL)
return;
- osrfMessage* further_msg = NULL;
-
if( msg->m_type == STATUS ) {
switch( msg->status_code ) {
case OSRF_STATUS_OK:
+ // This combination of message type and status code comes
+ // only from the router, in response to a CONNECT message.
osrfLogDebug( OSRF_LOG_MARK, "We connected successfully");
session->state = OSRF_SESSION_CONNECTED;
osrfLogDebug( OSRF_LOG_MARK, "State: %x => %s => %d", session,
session->session_id, session->state );
+ osrfMessageFree(msg);
break;
case OSRF_STATUS_COMPLETE:
osrf_app_session_set_complete( session, msg->thread_trace );
+ osrfMessageFree(msg);
break;
case OSRF_STATUS_CONTINUE:
osrf_app_session_request_reset_timeout( session, msg->thread_trace );
+ osrfMessageFree(msg);
break;
case OSRF_STATUS_REDIRECTED:
osrf_app_session_reset_remote( session );
session->state = OSRF_SESSION_DISCONNECTED;
osrf_app_session_request_resend( session, msg->thread_trace );
+ osrfMessageFree(msg);
break;
case OSRF_STATUS_EXPFAILED:
osrf_app_session_reset_remote( session );
session->state = OSRF_SESSION_DISCONNECTED;
+ osrfMessageFree(msg);
break;
case OSRF_STATUS_TIMEOUT:
osrf_app_session_reset_remote( session );
session->state = OSRF_SESSION_DISCONNECTED;
osrf_app_session_request_resend( session, msg->thread_trace );
+ osrfMessageFree(msg);
break;
default:
+ {
/* Replace the old message with a new one */
- further_msg = osrf_message_init( RESULT, msg->thread_trace, msg->protocol );
- osrf_message_set_status_info( further_msg,
+ osrfMessage* new_msg = osrf_message_init(
+ RESULT, msg->thread_trace, msg->protocol );
+ osrf_message_set_status_info( new_msg,
msg->status_name, msg->status_text, msg->status_code );
osrfLogWarning( OSRF_LOG_MARK, "The stack doesn't know what to do with "
"the provided message code: %d, name %s. Passing UP.",
msg->status_code, msg->status_name );
- further_msg->is_exception = 1;
+ new_msg->is_exception = 1;
osrf_app_session_set_complete( session, msg->thread_trace );
+ osrfLogDebug( OSRF_LOG_MARK,
+ "passing client message %d / session %s to app handler",
+ msg->thread_trace, session->session_id );
+ osrfMessageFree(msg);
+ // Enqueue the new message to be processed later
+ osrf_app_session_push_queue( session, new_msg );
break;
- }
+ } // end default
+ } // end switch
} else if( msg->m_type == RESULT ) {
- further_msg = msg;
- }
-
- if(further_msg) {
osrfLogDebug( OSRF_LOG_MARK, "passing client message %d / session %s to app handler",
msg->thread_trace, session->session_id );
- osrf_stack_application_handler( session, further_msg );
- }
+ // Enqueue the RESULT message to be processed later
+ osrf_app_session_push_queue( session, msg );
- if(msg != further_msg)
- osrfMessageFree(msg);
+ }
return;
}
-
/**
- If we return a message, that message should be passed up the stack,
- if we return NULL, we're finished for now...
+ @brief Acting as a server, process an incoming osrfMessage.
+ @param session Pointer to the osrfAppSession to which the message pertains.
+ @param msg Pointer to the osrfMessage.
+
+ Branch on the message type. In particular, if it's a REQUEST, call the requested method.
*/
static void _do_server( osrfAppSession* session, osrfMessage* msg ) {
osrfLogDebug( OSRF_LOG_MARK, "Server received message of type %d", msg->m_type );
- osrfMessage* further_msg = NULL;
-
switch( msg->m_type ) {
case STATUS:
break;
case REQUEST:
-
osrfLogDebug( OSRF_LOG_MARK, "server passing message %d to application handler "
"for session %s", msg->thread_trace, session->session_id );
- further_msg = msg;
+
+ osrfAppRunMethod( session->remote_service, msg->method_name,
+ session, msg->thread_trace, msg->_params );
+
break;
default:
break;
}
- if(further_msg) {
- osrfLogDebug( OSRF_LOG_MARK, "passing server message %d / session %s to app handler",
- msg->thread_trace, session->session_id );
- osrf_stack_application_handler( session, further_msg );
- }
-
- if(msg != further_msg)
- osrfMessageFree(msg);
-
+ osrfMessageFree(msg);
return;
}
-
-
-
-static void osrf_stack_application_handler( osrfAppSession* session, osrfMessage* msg ) {
- if(session == NULL || msg == NULL) return;
-
- if(msg->m_type == RESULT && session->type == OSRF_SESSION_CLIENT) {
- /* Enqueue the RESULT message to be processed later */
- osrf_app_session_push_queue( session, msg );
- }
- else if(msg->m_type == REQUEST) {
- char* method = msg->method_name;
- char* app = session->remote_service;
- jsonObject* params = msg->_params;
-
- osrfAppRunMethod( app, method, session, msg->thread_trace, params );
- osrfMessageFree(msg);
- }
-
- return;
-}
-
-