int initialize();
int childInit();
-int osrfMathRun( osrfMethodDispatcher* );
+int osrfMathRun( osrfMethodContext* );
int initialize() {
return 0;
}
-int osrfMathRun( osrfMethodDispatcher* d ) {
+int osrfMathRun( osrfMethodContext* d ) {
- OSRF_METHOD_VERIFY_DISPATCHER(d);
+ OSRF_METHOD_VERIFY_CONTEXT(d);
jsonObject* x = jsonObjectGetIndex(params, 0);
jsonObject* y = jsonObjectGetIndex(params, 1);
if(!strcmp(method->name, "div")) r = i / j;
jsonObject* resp = jsonNewNumberObject(r);
- osrfAppRequestRespond( session, request, resp );
+ osrfAppRequestRespondComplete( session, request, resp );
jsonObjectFree(resp);
free(a); free(b);
int initialize();
int childInit();
-int osrfMathRun( osrfMethodDispatcher* );
+int osrfMathRun( osrfMethodContext* );
int initialize() {
return 0;
}
-int osrfMathRun( osrfMethodDispatcher* d ) {
+int osrfMathRun( osrfMethodContext* c ) {
- OSRF_METHOD_VERIFY_DISPATCHER(d); /* see osrf_application.h */
+ OSRF_METHOD_VERIFY_CONTEXT(c); /* see osrf_application.h */
/* collect the request params */
jsonObject* x = jsonObjectGetIndex(params, 0);
if(omsg) {
/* return dbmath's response to the user */
- osrfAppRequestRespond( session, request, osrfMessageGetResult(omsg) );
+ osrfAppRequestRespondComplete( session, request, osrfMessageGetResult(omsg) );
osrfMessageFree(omsg);
return 0;
}
#include "osrf_system.h"
+#include "opensrf/utils.h"
int main( int argc, char* argv[] ) {
fprintf(stderr, "Loading OpenSRF host %s with bootstrap config %s "
"and config context %s\n", argv[1], argv[2], argv[3] );
- osrfSystemBootstrap( argv[1], argv[2], argv[3] );
+ char* host = strdup( argv[1] );
+ char* config = strdup( argv[2] );
+ char* context = strdup( argv[3] );
+
+ init_proc_title( argc, argv );
+ set_proc_title( "opensrf system" );
+
+ osrfSystemBootstrap( host, config, context );
+
+ free(host);
+ free(config);
+ free(context);
return 0;
}
return _osrf_app_request_resend( req );
}
-/** Send the given message */
-int _osrf_app_session_send( osrf_app_session* session, osrf_message* msg ){
- if(session == NULL) return 0;
- int ret_val= 0;
- osrf_app_session_queue_wait( session, 0 );
- debug_handler( "AppSession sending type %d, and thread_trace %d",
- msg->m_type, msg->thread_trace );
+int osrfAppSessionSendBatch( osrfAppSession* session, osrf_message* msgs[], int size ) {
- if(session->stateless) {
- osrf_app_session_reset_remote(session);
+ if( !(session && msgs && size > 0) ) return 0;
+ int retval = 0;
- } else {
- if( (msg->m_type != CONNECT) && (msg->m_type != DISCONNECT) &&
+ osrfMessage* msg = msgs[0];
+
+ if(msg) {
+
+ osrf_app_session_queue_wait( session, 0 );
+
+ /* if we're not stateless and the first message is not a connect
+ message, then we do the connect first */
+ if(session->stateless) {
+ osrf_app_session_reset_remote(session);
+
+ } else {
+
+ if( (msg->m_type != CONNECT) && (msg->m_type != DISCONNECT) &&
(session->state != OSRF_SESSION_CONNECTED) ) {
- if(!osrf_app_session_connect( session ))
- return 0;
+ if(!osrf_app_session_connect( session ))
+ return 0;
+ }
}
}
- //char* xml = osrf_message_to_xml(msg);
- char* string = osrf_message_serialize(msg);
+ char* string = osrfMessageSerializeBatch(msgs, size);
- debug_handler("[%s] [%s] Remote Id: %s",
- session->remote_service, session->session_id, session->remote_id );
+ if( string ) {
- transport_message* t_msg = message_init(
- string, "", session->session_id, session->remote_id, NULL );
+ transport_message* t_msg = message_init(
+ string, "", session->session_id, session->remote_id, NULL );
+
+ debug_handler("Session [%s] [%s] sending to %s \nXML:\n%s",
+ session->remote_service, session->session_id, t_msg->recipient, string );
+
+ retval = client_send_message( session->transport_handle, t_msg );
+
+ free(string);
+ message_free( t_msg );
+ }
+
+ return retval;
- debug_handler("Session [%s] [%s] sending to %s \nXML:\n%s",
- session->remote_service, session->session_id, t_msg->recipient, string );
- ret_val = client_send_message( session->transport_handle, t_msg );
- free(string);
- message_free( t_msg );
- return ret_val;
}
+
+
+int _osrf_app_session_send( osrf_app_session* session, osrf_message* msg ){
+ if( !(session && msg) ) return 0;
+ osrfMessage* a[1];
+ a[0] = msg;
+ return osrfAppSessionSendBatch( session, a, 1 );
+}
+
+
+
+
/** Waits up to 'timeout' seconds for some data to arrive.
* Any data that arrives will be processed according to its
* payload and message type. This method will return after
}
+int osrfAppRequestRespondComplete(
+ osrfAppSession* ses, int requestId, jsonObject* data ) {
+
+ osrf_message* payload = osrf_message_init( RESULT, requestId, 1 );
+ osrf_message_set_status_info( payload, NULL, "OK", OSRF_STATUS_OK );
+
+ char* json = jsonObjectToJSON( data );
+ osrf_message_set_result_content( payload, json );
+ free(json);
+
+ osrf_message* status = osrf_message_init( STATUS, requestId, 1);
+ osrf_message_set_status_info( status, "osrfConnectStatus", "Request Complete", OSRF_STATUS_COMPLETE );
+
+ osrfMessage* ms[2];
+ ms[0] = payload;
+ ms[1] = status;
+
+ osrfAppSessionSendBatch( ses, ms, 2 );
+
+ osrf_message_free( payload );
+ osrf_message_free( status );
+
+ /* join and free */
+
+ return 0;
+}
+
+
int osrfAppSessionStatus( osrfAppSession* ses, int type, int reqId, char* message ) {
/** Send the given message */
int _osrf_app_session_send( osrf_app_session*, osrf_message* msg );
+int osrfAppSessionSendBatch( osrf_app_session*, osrf_message* msgs[], int size );
+
int osrfAppRequestRespond( osrfAppSession* ses, int requestId, jsonObject* data );
+int osrfAppRequestRespondComplete( osrfAppSession* ses, int requestId, jsonObject* data );
int osrfAppSessionStatus( osrfAppSession* ses, int type, int reqId, char* message );
int osrfAppRunMethod( char* appName, char* methodName, osrfAppSession* ses, int reqId, jsonObject* params ) {
- if(!appName || ! methodName || ! ses) return -1;
+ if( !(appName && methodName && ses) ) return -1;
+
char* error;
+ osrfApplication* app;
+ osrfMethod* method;
+ osrfMethodContext context;
+
+ /* this is the method we're gonna run */
+ int (*meth) (osrfMethodContext*);
info_handler("Running method [%s] for app [%s] with request id %d and "
"thread trace %s", methodName, appName, reqId, ses->session_id );
- osrfApplication* app = _osrfAppFindApplication(appName);
- if(!app) return warning_handler( "Application not found: %s", appName );
-
- osrfMethod* method = __osrfAppFindMethod( app, methodName );
- if(!method) return warning_handler( "NOT FOUND: app %s / method %s", appName, methodName );
+ if( !(app = _osrfAppFindApplication(appName)) )
+ return warning_handler( "Application not found: %s", appName );
- /* this is the method we're gonna run */
- int (*meth) (osrfMethodDispatcher*);
+ if( !(method = __osrfAppFindMethod( app, methodName )) )
+ return warning_handler( "NOT FOUND: app %s / method %s", appName, methodName );
/* open the method */
*(void **) (&meth) = dlsym(app->handle, method->symbol);
"for method %s and app %s", method->symbol, method->name, app->name );
}
- osrfMethodDispatcher d;
- d.session = ses;
- d.method = method;
- d.params = params;
- d.request = reqId;
+ context.session = ses;
+ context.method = method;
+ context.params = params;
+ context.request = reqId;
/* run the method */
- int ret = (*meth) (&d);
+ int ret = (*meth) (&context);
debug_handler("method returned %d", ret );
#include "objson/object.h"
#include "osrf_app_session.h"
-#define OSRF_METHOD_CHECK_PARAMS(x,y) \
- if( ! x || ! y ) return -1; \
- if(y->type != JSON_ARRAY ) return -1; \
- char* __j = jsonObjectToJSON(y);\
- if(__j) { \
- debug_handler("Service: %s | Params: %s", x->remote_service, __j);free(__j);}
-
-
/**
This macro verifies methods receive the correct parameters
It also creates local variables "session", "method",
"params", and "request"
*/
-#define OSRF_METHOD_VERIFY_DISPATCHER(__d) \
+#define OSRF_METHOD_VERIFY_CONTEXT(__d) \
if(!__d) return -1; \
\
osrfAppSession* session = __d->session; \
};
typedef struct _osrfMethodStruct osrfMethod;
-struct _osrfMethodDispatcherStruct {
+struct _osrfMethodContextStruct {
osrfAppSession* session;
osrfMethod* method;
jsonObject* params;
int request;
};
-typedef struct _osrfMethodDispatcherStruct osrfMethodDispatcher;
+typedef struct _osrfMethodContextStruct osrfMethodContext;
/**
free(msg);
}
+
+char* osrfMessageSerializeBatch( osrfMessage* msgs [], int count ) {
+ if( !msgs ) return NULL;
+
+ char* j;
+ int i = 0;
+ osrfMessage* msg = NULL;
+ jsonObject* wrapper = jsonNewObject(NULL);
+
+ while( ((msg = msgs[i]) && (i++ < count)) )
+ jsonObjectPush(wrapper, osrfMessageToJSON( msg ));
+
+ j = jsonObjectToJSON(wrapper);
+ jsonObjectFree(wrapper);
+
+ return j;
+}
+
+
char* osrf_message_serialize(osrf_message* msg) {
+
if( msg == NULL ) return NULL;
+ char* j = NULL;
+
+ jsonObject* json = osrfMessageToJSON( msg );
+
+ if(json) {
+ jsonObject* wrapper = jsonNewObject(NULL);
+ jsonObjectPush(wrapper, json);
+ j = jsonObjectToJSON(wrapper);
+ jsonObjectFree(wrapper);
+ }
+
+ return j;
+}
+
+
+jsonObject* osrfMessageToJSON( osrfMessage* msg ) {
+
jsonObject* json = jsonNewObject(NULL);
jsonObjectSetClass(json, "osrfMessage");
jsonObject* payload;
jsonObjectSetKey(json, "payload", payload);
break;
}
-
- jsonObject* wrapper = jsonNewObject(NULL);
- jsonObjectPush(wrapper, json);
- char* j = jsonObjectToJSON(wrapper);
- jsonObjectFree(wrapper);
- return j;
+
+ return json;
}
jsonObject* message = jsonObjectGetIndex(json, x);
- char* j = jsonObjectToJSON(message);
- debug_handler("deserialize parsed message \n%s\n", j );
- free(j);
-
-
if(message && message->type != JSON_NULL &&
message->classname && !strcmp(message->classname, "osrfMessage")) {
tmp = jsonObjectGetKey(message, "threadTrace");
if(tmp) {
+ char* tt = jsonObjectToSimpleString(tmp);
+ if(tt) {
+ new_msg->thread_trace = atoi(tt);
+ free(tt);
+ }
+ /*
if(tmp->type == JSON_NUMBER)
new_msg->thread_trace = (int) jsonObjectGetNumber(tmp);
if(tmp->type == JSON_STRING)
new_msg->thread_trace = atoi(jsonObjectGetString(tmp));
+ */
}
tmp = jsonObjectGetKey(message, "protocol");
if(tmp) {
+ char* proto = jsonObjectToSimpleString(tmp);
+ if(proto) {
+ new_msg->protocol = atoi(proto);
+ free(proto);
+ }
+
+ /*
if(tmp->type == JSON_NUMBER)
new_msg->protocol = (int) jsonObjectGetNumber(tmp);
if(tmp->type == JSON_STRING)
new_msg->protocol = atoi(jsonObjectGetString(tmp));
+ */
}
tmp = jsonObjectGetKey(message, "payload");
}
msgs[numparsed++] = new_msg;
- debug_handler("deserialize has parsed %d messages", numparsed);
}
}
int osrf_message_deserialize(char* json, osrf_message* msgs[], int count);
+
/** Pushes any message retreived from the xml into the 'msgs' array.
* it is assumed that 'msgs' has beenn pre-allocated.
* Returns the number of message that are in the buffer.
jsonObject* osrfMessageGetResult( osrfMessage* msg );
+/**
+ Returns the message as a jsonObject
+ @return The jsonObject which must be freed by the caller.
+ */
+jsonObject* osrfMessageToJSON( osrfMessage* msg );
+
+char* osrfMessageSerializeBatch( osrfMessage* msgs [], int count );
+
#endif
#include "osrf_prefork.h"
#include <signal.h>
+#include "osrf_app_session.h"
/* true if we just deleted a child. This will allow us to make sure we're
not trying to use freed memory */
if(!appname) fatal_handler("osrf_prefork_run requires an appname to run!");
+ set_proc_title( "opensrf listener [%s]", appname );
+
int maxr = 1000;
int maxc = 10;
int minc = 3;
if(!osrf_system_bootstrap_client_resc( NULL, NULL, resc))
fatal_handler("Unable to bootstrap client for osrf_prefork_run()");
free(resc);
+
+ set_proc_title( "opensrf drone [%s]", child->appname );
}
void prefork_child_process_request(prefork_child* child, char* data) {
- if(!child && child->connection) return;
+ if( !child ) return;
/* construct the message from the xml */
- debug_handler("Child %d received data from parent:\n%s\n", child->pid, data);
transport_message* msg = new_message_from_xml( data );
- osrf_stack_transport_handler(msg, child->appname);
+ osrfAppSession* session = osrf_stack_transport_handler(msg, child->appname);
+
+ if( ! session->stateless ) { /* keepalive loop for stateful sessions */
+
+ debug_handler("Entering keepalive loop for session %s", session->session_id );
+ }
}
int max_requests;
char* appname;
struct prefork_child_struct* next;
- transport_client* connection;
};
typedef struct prefork_child_struct prefork_child;
int (*osrf_stack_entry_point) (transport_client*, int) = &osrf_stack_process;
int osrf_stack_process( transport_client* client, int timeout ) {
- transport_message* msg = client_recv( client, timeout );
- if(msg == NULL) return 0;
- debug_handler( "Received message from transport code from %s", msg->sender );
- int status = osrf_stack_transport_handler( msg, NULL );
-
- while(1) {
- transport_message* m = client_recv( client, 0 );
- if(m) {
- debug_handler( "Received additional message from transport code");
- status = osrf_stack_transport_handler( m, NULL );
- } else {
- debug_handler( "osrf_stack_process returning with only 1 received message" );
- break;
- }
+ if( !client ) return -1;
+ transport_message* msg = NULL;
+
+ while( (msg = client_recv( client, timeout )) ) {
+ debug_handler( "Received message from transport code from %s", msg->sender );
+ osrf_stack_transport_handler( msg, NULL );
+ timeout = 0;
}
- return status;
+ return 0;
}
// -----------------------------------------------------------------------------
// Entry point into the stack
// -----------------------------------------------------------------------------
-int osrf_stack_transport_handler( transport_message* msg, char* my_service ) {
+osrfAppSession* osrf_stack_transport_handler( transport_message* msg, char* my_service ) {
- if(!msg) return -1;
+ if(!msg) return NULL;
debug_handler( "Transport handler received new message \nfrom %s "
"to %s with body \n\n%s\n", msg->sender, msg->recipient, msg->body );
osrf_app_session* session = osrf_app_session_find_session( msg->thread );
- if( session == NULL )
+ if( !session )
session = osrf_app_server_session_init( msg->thread, my_service, msg->sender);
+ if( !session ) return NULL;
+
if(!msg->is_error)
debug_handler("Session [%s] found or built", session->session_id );
debug_handler( "We received %d messages from %s", num_msgs, msg->sender );
- /* XXX ERROR CHECKING, BAD JSON, ETC... */
int i;
for( i = 0; i != num_msgs; i++ ) {
message_free( msg );
debug_handler("after msg delete");
- return 1;
+ return session;
}
int osrf_stack_message_handler( osrf_app_session* session, osrf_message* msg ) {
return NULL;
case CONNECT:
- /* handle connect message */
+ osrfAppSessionStatus( session,
+ OSRF_STATUS_OK, msg->thread_trace, "Connection Successful" );
return NULL;
case REQUEST:
// -----------------------------------------------------------------------------
int osrf_stack_process( transport_client* client, int timeout );
-int osrf_stack_transport_handler( transport_message* msg, char* my_service );
+osrfAppSession* osrf_stack_transport_handler( transport_message* msg, char* my_service );
int osrf_stack_message_handler( osrf_app_session* session, osrf_message* msg );
int osrf_stack_application_handler( osrf_app_session* session, osrf_message* msg );
}
+char** __global_argv = NULL;
+int __global_argv_size = 0;
+
+int init_proc_title( int argc, char* argv[] ) {
+
+ __global_argv = argv;
+
+ int i = 0;
+ while( i < argc ) {
+ int len = strlen( __global_argv[i]);
+ bzero( __global_argv[i++], len );
+ __global_argv_size += len;
+ }
+
+ __global_argv_size -= 2;
+ return 0;
+}
+
+int set_proc_title( char* format, ... ) {
+ VA_LIST_TO_STRING(format);
+ bzero( *(__global_argv), __global_argv_size );
+ return snprintf( *(__global_argv), __global_argv_size, VA_BUF );
+}
+
/* utility method for profiling */
double get_timestamp_millis() {
- //struct timeb t;
struct timeval tv;
gettimeofday(&tv, NULL);
- //ftime(&t);
-// double time = (int)t.time + ( ((double)t.millitm) / 1000 ) + ( ((double)tv.tv_usec / 1000000) );
double time = (int)tv.tv_sec + ( ((double)tv.tv_usec / 1000000) );
return time;
}
#define BUFFER_MAX_SIZE 10485760
+/* these are evil and should be condemned */
+int init_proc_title( int argc, char* argv[] );
+int set_proc_title( char* format, ... );
+
+
int daemonize();
void* safe_malloc(int size);