int osrfAppInitialize();
int osrfAppChildInit();
+void osrfAppChildExit();
int osrfMathRun( osrfMethodContext* );
return 0;
}
+/* called when this process is just coming into existence */
int osrfAppChildInit() {
return 0;
}
+/* called when this process is about to exit */
+void osrfAppChildExit() {
+ osrfLogDebug(OSRF_LOG_MARK, "Child is exiting...");
+}
+
+
int osrfMathRun( osrfMethodContext* ctx ) {
OSRF_METHOD_VERIFY_CONTEXT(ctx); /* see osrf_application.h */
fflush(stderr);
*/
+ osrfAppSession* session = osrf_app_client_session_init(service);
+
+ double starttime = get_timestamp_millis();
+ int req_id = osrf_app_session_make_req( session, NULL, method, api_level, mparams );
+
+
+ if( req_id == -1 ) {
+ osrfLogError(OSRF_LOG_MARK, "I am unable to communcate with opensrf..going away...");
+ /* we don't want to spawn an intense re-forking storm
+ * if there is no jabber server.. so give it some time before we die */
+ usleep( 100000 ); /* 100 milliseconds */
+ exit(1);
+ }
+
/* ----------------------------------------------------------------- */
/* log all requests to the activity log */
buffer_free(act);
/* ----------------------------------------------------------------- */
- osrfAppSession* session = osrf_app_client_session_init(service);
-
- double starttime = get_timestamp_millis();
- int req_id = osrf_app_session_make_req( session, NULL, method, api_level, mparams );
-
-
- if( req_id == -1 ) {
- osrfLogError(OSRF_LOG_MARK, "I am unable to communcate with opensrf..going away...");
- /* we don't want to spawn an intense re-forking storm
- * if there is no jabber server.. so give it some time before we die */
- usleep( 100000 ); /* 100 milliseconds */
- exit(1);
- }
osrf_message* omsg = NULL;
string_array_destroy(mparams);
osrfLogDebug(OSRF_LOG_MARK, "Gateway served %d requests", ++numserved);
+ osrfLogClearXid();
return ret;
}
osrfHashFree(osrfAppSessionCache);
}
+
+
/** Frees memory used by an app_request object */
void _osrf_app_request_free( void * req ){
if( req == NULL ) return;
char* method_name, int protocol, string_array* param_strings ) {
if(session == NULL) return -1;
+ osrfLogMkXid();
+
osrf_message* req_msg = osrf_message_init( REQUEST, ++(session->thread_trace), protocol );
osrf_message_set_method(req_msg, method_name);
if(params) {
transport_message* t_msg = message_init(
string, "", session->session_id, session->remote_id, NULL );
+ message_set_osrf_xid( t_msg, osrfLogGetXid() );
retval = client_send_message( session->transport_handle, t_msg );
void osrfAppSessionCleanup();
+
#endif
osrfApplication* app = safe_malloc(sizeof(osrfApplication));
app->handle = dlopen (soFile, RTLD_NOW);
+ app->onExit = NULL;
if(!app->handle) {
osrfLogWarning( OSRF_LOG_MARK, "Failed to dlopen library file %s: %s", soFile, dlerror() );
osrfLogSetAppname(appName);
+ osrfAppSetOnExit(app, appName);
+
return 0;
}
+
+void osrfAppSetOnExit(osrfApplication* app, char* appName) {
+ if(!(app && appName)) return;
+
+ /* see if we can run the initialize method */
+ char* error;
+ void (*onExit) (void);
+ *(void **) (&onExit) = dlsym(app->handle, "osrfAppChildExit");
+
+ if( (error = dlerror()) != NULL ) {
+ osrfLogDebug(OSRF_LOG_MARK, "No exit handler defined for %s", appName);
+ return;
+ }
+
+ osrfLogInfo(OSRF_LOG_MARK, "registering exit handler for %s", appName);
+ app->onExit = (*onExit);
+ //if( (ret = (*onExit)()) ) {
+}
+
+
int osrfAppRunChildInit(char* appname) {
osrfApplication* app = _osrfAppFindApplication(appname);
if(!app) return -1;
}
+void osrfAppRunExitCode() {
+ osrfHashIterator* itr = osrfNewHashIterator(__osrfAppHash);
+ osrfApplication* app;
+ while( (app = osrfHashIteratorNext(itr)) ) {
+ if( app->onExit ) {
+ osrfLogInfo(OSRF_LOG_MARK, "Running onExit handler for app %s", itr->current);
+ app->onExit();
+ }
+ }
+}
+
+
int osrfAppRegisterMethod( char* appName, char* methodName,
char* symbolName, char* notes, int argc, int options ) {
struct _osrfApplicationStruct {
void* handle; /* the lib handle */
osrfHash* methods;
+ void (*onExit) (void);
};
typedef struct _osrfApplicationStruct osrfApplication;
/**
* Tells the backend process to run its child init function */
int osrfAppRunChildInit(char* appname);
+void osrfAppSetOnExit(osrfApplication* app, char* appName);
+void osrfAppRunExitCode();
osrfLogDebug( OSRF_LOG_MARK, "Child init hook for child %d", child->pid);
char* resc = va_list_to_string("%s_drone",child->appname);
+ /* if we're a source-client, tell the logger now that we're a new process*/
+ char* isclient = osrfConfigGetValue(NULL, "/client");
+ if( isclient && !strcasecmp(isclient,"true") )
+ osrfLogSetIsClient(1);
+ free(isclient);
+
+
/* we want to remove traces of our parents socket connection
* so we can have our own */
osrfSystemIgnoreTransportClient();
osrfLogError( OSRF_LOG_MARK,
"Unable to bootstrap client in prefork_child_process_request()");
sleep(1);
- exit(1);
+ osrf_prefork_child_exit(child);
}
}
if( prefork_child_init_hook(child) == -1 ) {
osrfLogError(OSRF_LOG_MARK,
"Forker child going away because we could not connect to OpenSRF...");
- exit(1);
+ osrf_prefork_child_exit(child);
}
prefork_child_wait( child );
- exit(0); /* just to be sure */
+ osrf_prefork_child_exit(child); /* just to be sure */
}
return NULL;
}
+void osrf_prefork_child_exit(prefork_child* child) {
+ osrfAppRunExitCode();
+ exit(0);
+}
void prefork_launch_children( prefork_simple* forker ) {
if(!forker) return;
if( errno == EAGAIN ) n = 0;
+ if( errno == EPIPE ) {
+ osrfLogWarning(OSRF_LOG_MARK, "C child attempted read on broken pipe, exiting...");
+ break;
+ }
+
if( n < 0 ) {
osrfLogWarning( OSRF_LOG_MARK, "Prefork child read returned error with errno %d", errno );
break;
osrfLogDebug( OSRF_LOG_MARK, "Child with max-requests=%d, num-served=%d exiting...[%d]",
child->max_requests, i, getpid() );
- exit(0);
+ osrf_prefork_child_exit(child); /* just to be sure */
}
void osrf_prefork_register_routers( char* appname );
-
+void osrf_prefork_child_exit( prefork_child* );
if(!msg) return NULL;
+ osrfLogSetXid(msg->osrf_xid);
+
osrfLogDebug( OSRF_LOG_MARK, "Transport handler received new message \nfrom %s "
"to %s with body \n\n%s\n", msg->sender, msg->recipient, msg->body );
char* domain = strdup(osrfStringArrayGetString( arr, 0 )); /* just the first for now */
osrfStringArrayFree(arr);
+ /* if we're a source-client, tell the logger */
+ char* isclient = osrfConfigGetValue(NULL, "/client");
+ if( isclient && !strcasecmp(isclient,"true") )
+ osrfLogSetIsClient(1);
+ free(isclient);
int llevel = 0;
int iport = 0;
xmlChar* router_to = xmlGetProp( root, BAD_CAST "router_to" );
xmlChar* router_class= xmlGetProp( root, BAD_CAST "router_class" );
xmlChar* broadcast = xmlGetProp( root, BAD_CAST "broadcast" );
+ xmlChar* osrf_xid = xmlGetProp( root, BAD_CAST "osrf_xid" );
+
+ if( osrf_xid ) {
+ message_set_osrf_xid( new_msg, (char*) osrf_xid);
+ xmlFree(osrf_xid);
+ }
if( router_from ) {
new_msg->sender = strdup((char*)router_from);
if( new_msg->body == NULL )
new_msg->body = strdup("");
- int bufsize;
- xmlChar* xmlbuf;
- char* encoded_body;
-
- xmlDocDumpFormatMemory( msg_doc, &xmlbuf, &bufsize, 0 );
- encoded_body = strdup( (char*) xmlbuf );
-
- if( encoded_body == NULL )
- osrfLogError(OSRF_LOG_MARK, "message_to_xml(): Out of Memory");
-
- xmlFree(xmlbuf);
- xmlFreeDoc(msg_doc);
- xmlCleanupParser();
-
- /*** remove the XML declaration */
- int len = strlen(encoded_body);
- char tmp[len];
- memset( tmp, 0, len );
- int i;
- int found_at = 0;
-
- /* when we reach the first >, take everything after it */
- for( i = 0; i!= len; i++ ) {
- if( encoded_body[i] == 62) { /* ascii > */
-
- /* found_at holds the starting index of the rest of the doc*/
- found_at = i + 1;
- break;
- }
- }
+ new_msg->msg_xml = xmlDocToString(msg_doc, 0);
+ xmlFreeDoc(msg_doc);
+ xmlCleanupParser();
- if( found_at ) {
- /* move the shortened doc into the tmp buffer */
- strncpy( tmp, encoded_body + found_at, len - found_at );
- /* move the tmp buffer back into the allocated space */
- memset( encoded_body, 0, len );
- strcpy( encoded_body, tmp );
- }
-
- new_msg->msg_xml = encoded_body;
return new_msg;
-
}
+void message_set_osrf_xid( transport_message* msg, char* osrf_xid ) {
+ if(!msg) return;
+ if( osrf_xid )
+ msg->osrf_xid = strdup(osrf_xid);
+ else msg->osrf_xid = strdup("");
+}
void message_set_router_info( transport_message* msg, char* router_from,
char* router_to, char* router_class, char* router_command, int broadcast_enabled ) {
free(msg->router_to);
free(msg->router_class);
free(msg->router_command);
+ free(msg->osrf_xid);
if( msg->error_type != NULL ) free(msg->error_type);
if( msg->msg_xml != NULL ) free(msg->msg_xml);
free(msg);
xmlNewProp( message_node, BAD_CAST "router_to", BAD_CAST msg->router_to );
xmlNewProp( message_node, BAD_CAST "router_class", BAD_CAST msg->router_class );
xmlNewProp( message_node, BAD_CAST "router_command", BAD_CAST msg->router_command );
+ xmlNewProp( message_node, BAD_CAST "osrf_xid", BAD_CAST msg->osrf_xid );
if( msg->broadcast )
xmlNewProp( message_node, BAD_CAST "broadcast", BAD_CAST "1" );
#include <libxml/xmlmemory.h>
#include "opensrf/utils.h"
+#include "opensrf/xml_utils.h"
#include "opensrf/log.h"
#ifndef TRANSPORT_MESSAGE_H
char* router_to;
char* router_class;
char* router_command;
+ char* osrf_xid;
int is_error;
char* error_type;
int error_code;
void message_set_router_info( transport_message* msg, char* router_from,
char* router_to, char* router_class, char* router_command, int broadcast_enabled );
+void message_set_osrf_xid( transport_message* msg, char* osrf_xid );
+
// ---------------------------------------------------------------------------------
// Formats the Jabber message as XML for encoding.
// Returns NULL on error
/* for OpenSRF extensions */
session->router_to_buffer = buffer_init( JABBER_JID_BUFSIZE );
session->router_from_buffer = buffer_init( JABBER_JID_BUFSIZE );
+ session->osrf_xid_buffer = buffer_init( JABBER_JID_BUFSIZE );
session->router_class_buffer = buffer_init( JABBER_JID_BUFSIZE );
session->router_command_buffer = buffer_init( JABBER_JID_BUFSIZE );
buffer_free(session->message_error_type);
buffer_free(session->router_to_buffer);
buffer_free(session->router_from_buffer);
+ buffer_free(session->osrf_xid_buffer);
buffer_free(session->router_class_buffer);
buffer_free(session->router_command_buffer);
buffer_free(session->session_id);
buffer_add( ses->from_buffer, get_xml_attr( atts, "from" ) );
buffer_add( ses->recipient_buffer, get_xml_attr( atts, "to" ) );
buffer_add( ses->router_from_buffer, get_xml_attr( atts, "router_from" ) );
+ buffer_add( ses->osrf_xid_buffer, get_xml_attr( atts, "osrf_xid" ) );
buffer_add( ses->router_to_buffer, get_xml_attr( atts, "router_to" ) );
buffer_add( ses->router_class_buffer, get_xml_attr( atts, "router_class" ) );
buffer_add( ses->router_command_buffer, get_xml_attr( atts, "router_command" ) );
ses->router_command_buffer->buf,
ses->router_broadcast );
+ message_set_osrf_xid( msg, ses->osrf_xid_buffer->buf );
+
if( ses->message_error_type->n_used > 0 ) {
set_msg_error( msg, ses->message_error_type->buf, ses->message_error_code );
}
buffer_reset( ses->from_buffer );
buffer_reset( ses->recipient_buffer );
buffer_reset( ses->router_from_buffer );
+ buffer_reset( ses->osrf_xid_buffer );
buffer_reset( ses->router_to_buffer );
buffer_reset( ses->router_class_buffer );
buffer_reset( ses->router_command_buffer );
growing_buffer* router_from_buffer;
growing_buffer* router_class_buffer;
growing_buffer* router_command_buffer;
+ growing_buffer* osrf_xid_buffer;
int router_broadcast;
/* this can be anything. It will show up in the
my $meth = shift;
return unless $self;
+ # tell the logger to create a new xid - the logger will decide if it's really necessary
+ $logger->mk_osrf_xid;
+
my $method;
if (!ref $meth) {
$method = new OpenSRF::DomainObject::oilsMethod ( method => $meth );
my $body = $helper->get_body();
my $type = $helper->get_msg_type();
+ $logger->set_osrf_xid($helper->get_osrf_xid);
if (defined($type) and $type eq 'error') {
throw OpenSRF::EX::Session ("$remote_id IS NOT CONNECTED TO THE NETWORK!!!");
$msg->setBody( $body );
$msg->set_router_command( $router_command );
$msg->set_router_class( $router_class );
-
+ $msg->set_osrf_xid($logger->get_osrf_xid);
$logger->transport(
"JabberClient Sending message to $to with thread $thread and body: \n$body", INTERNAL );
}
}
+sub set_osrf_xid {
+ my( $self, $xid ) = @_;
+ $self->{msg_node}->setAttribute( osrf_xid => $xid );
+}
+
+
+sub get_osrf_xid {
+ my $self = shift;
+ $self->{msg_node}->getAttribute('osrf_xid');
+}
+
+
1;
sub INTERNAL { return 5; }
sub ALL { return 100; }
+my $isclient; # true if we control the osrf_xid
+
# load up our config options
sub set_config {
} else { $actfile = "$logdir/$actfile"; }
- #warn "Level: $loglevel, Fac: $facility, Act: $actfac\n";
+ $isclient = (OpenSRF::Utils::Config->current->bootstrap->client =~ /^true$/iog) ? 1 : 0;
}
sub _fac_to_const {
}
+# ----------------------------------------------------------------------
+# creates a new xid if necessary
+# ----------------------------------------------------------------------
+my $osrf_xid = '';
+my $osrf_xid_inc = 0;
+sub mk_osrf_xid {
+ return unless $isclient;
+ $osrf_xid_inc++;
+ return $osrf_xid = "$^T${$}$osrf_xid_inc";
+}
+
+sub set_osrf_xid {
+ return if $isclient; # if we're a client, we control our xid
+ $osrf_xid = $_[1];
+}
+
+sub get_osrf_xid { return $osrf_xid; }
+# ----------------------------------------------------------------------
+
sub _log_message {
my( $msg, $level ) = @_;
return if $level > $loglevel;
elsif ($level == INTERNAL()) {$l = LOG_DEBUG; $n = "INTL"; }
elsif ($level == ACTIVITY()) {$l = LOG_INFO; $n = "ACT"; $fac = $actfac; }
- #my( $pack, $file, $line_no ) = @caller;
+ my( undef, $file, $line_no ) = caller(1);
+ $file =~ s#/.*/##og;
# help syslog with the formatting
$msg =~ s/\%/\%\%/gso if( is_act_syslog() or is_syslog() );
- $msg = "[$n:"."$$".":::] $msg";
+ $msg = "[$n:"."$$".":$file:$line_no:$osrf_xid] $msg";
$msg = substr($msg, 0, 1536);
while( (msg = client_recv( class->connection, 0 )) ) {
+ osrfLogSetXid(msg->osrf_xid);
+
if( msg->sender ) {
osrfLogDebug(OSRF_LOG_MARK,
}
}
+ osrfLogClearXid();
message_free( msg );
}
transport_message* error = message_init(
node->lastMessage->body, node->lastMessage->subject,
node->lastMessage->thread, node->lastMessage->router_from, node->lastMessage->recipient );
+ message_set_osrf_xid(error, node->lastMessage->osrf_xid);
set_msg_error( error, "cancel", 501 );
/* send the error message back to the original sender */
lastSent = message_init( node->lastMessage->body,
node->lastMessage->subject, node->lastMessage->thread, "", node->lastMessage->router_from );
message_set_router_info( lastSent, node->lastMessage->router_from, NULL, NULL, NULL, 0 );
+ message_set_osrf_xid( lastSent, node->lastMessage->osrf_xid );
}
} else {
transport_message* new_msg= message_init( msg->body,
msg->subject, msg->thread, node->remoteId, msg->sender );
message_set_router_info( new_msg, msg->sender, NULL, NULL, NULL, 0 );
+ message_set_osrf_xid( new_msg, msg->osrf_xid );
osrfLogInfo( OSRF_LOG_MARK, "Routing message:\nfrom: [%s]\nto: [%s]",
new_msg->router_from, new_msg->recipient );
while( (node = osrfHashIteratorNext(rclass->itr)) )
osrfRouterClassRemoveNode( rclass->router, classname, node->remoteId );
+ osrfHashIteratorFree(rclass->itr);
+ osrfHashFree(rclass->nodes);
+
free(rclass);
}
char* __osrfLogAppname = NULL;
int __osrfLogLevel = OSRF_LOG_INFO;
int __osrfLogActivityEnabled = 1;
+int __osrfLogIsClient = 0;
+
+
+int __osrfLogXidInc = 0; /* increments with each new xid for uniqueness */
+char* __osrfLogXid = NULL; /* current xid */
+char* __osrfLogXidPfx = NULL; /* xid prefix string */
void osrfLogCleanup() {
openlog(__osrfLogAppname, 0, __osrfLogFacility );
}
+static void __osrfLogSetXid(char* xid) {
+ if(xid) {
+ if(__osrfLogXid) free(__osrfLogXid);
+ __osrfLogXid = strdup(xid);
+ }
+}
+
+void osrfLogClearXid() { __osrfLogSetXid(""); }
+void osrfLogSetXid(char* xid) {
+ if(!__osrfLogIsClient) __osrfLogSetXid(xid);
+}
+
+void osrfLogMkXid() {
+ if(__osrfLogIsClient) {
+ char buf[32];
+ memset(buf, 0x0, 32);
+ snprintf(buf, 32, "%s%d", __osrfLogXidPfx, __osrfLogXidInc);
+ __osrfLogSetXid(buf);
+ __osrfLogXidInc++;
+ }
+}
+
+char* osrfLogGetXid() {
+ return __osrfLogXid;
+}
+
+void osrfLogSetIsClient(int is) {
+ __osrfLogIsClient = is;
+ if(!is) return;
+ /* go ahead and create the xid prefix so it will be consistent later */
+ static char buff[32];
+ memset(buff, 0x0, 32);
+ snprintf(buff, 32, "%d%d", (int)time(NULL), getpid());
+ __osrfLogXidPfx = buff;
+}
+
void osrfLogSetType( int logtype ) {
if( logtype != OSRF_LOG_TYPE_FILE &&
logtype != OSRF_LOG_TYPE_SYSLOG ) {
break;
}
+ char* xid = (__osrfLogXid) ? __osrfLogXid : "";
+
if(__osrfLogType == OSRF_LOG_TYPE_SYSLOG ) {
char buf[1536];
memset(buf, 0x0, 1536);
buf[1533] = '.';
buf[1534] = '.';
buf[1535] = '\0';
- syslog( fac | lvl, "[%s:%d:%s:%d] %s", l, getpid(), filename, line, buf );
+ syslog( fac | lvl, "[%s:%d:%s:%d:%s] %s", l, getpid(), filename, line, xid, buf );
}
else if( __osrfLogType == OSRF_LOG_TYPE_FILE )
- _osrfLogToFile("[%s:%d:%s:%d] %s", l, getpid(), filename, line, msg );
+ _osrfLogToFile("[%s:%d:%s:%d:%s] %s", l, getpid(), filename, line, xid, msg );
}
void osrfLogCleanup();
+void osrfLogClearXid();
+void osrfLogSetXid(char* xid);
+void osrfLogMkXid();
+void osrfLogSetIsClient(int is);
+char* osrfLogGetXid();
+
/* sets the activity flag */
void osrfLogSetActivityEnabled( int enabled );