if(!nodes || !router || count < 1) return NULL;
osrfTransportGroup* grp = safe_malloc(sizeof(osrfTransportGroup));
- grp->currentNode = 0;
- grp->router = strdup(router);
- grp->list = osrfNewList(1);
+ grp->nodes = osrfNewHash();
+ grp->itr = osrfNewHashIterator(grp->nodes);
int i;
- for( i = 0; i != count; i++ ) osrfListPush( grp->list, nodes[i] );
+ for( i = 0; i != count; i++ ) {
+ if(!(nodes[i] && nodes[i]->domain) ) return NULL;
+ osrfHashSet( grp->nodes, nodes[i], nodes[i]->domain );
+ }
+
return grp;
}
+/* connect all of the nodes to their servers */
int osrfTransportGroupConnect( osrfTransportGroup* grp ) {
- if(!grp) return 0;
- int i;
+ if(!grp) return -1;
int active = 0;
- for( i = 0; i != grp->list->size; i++ ) {
- osrfTransportGroupNode* node = osrfListGetIndex( grp->list, i );
+
+ osrfTransportGroupNode* node;
+ osrfHashIteratorReset(grp->itr);
+
+ while( (node = osrfHashIteratorNext(grp->itr)) ) {
if(client_connect( node->connection, node->username,
node->password, node->resource, 10, AUTH_DIGEST )) {
node->active = 1;
- node->lastsent = time(NULL);
active++;
}
}
+
+ osrfHashIteratorReset(grp->itr);
return active;
}
-/*
-osrfTransportGroup* osrfNewTransportGroup( char* resource ) {
-
- grp->username = osrfConfigGetValue( NULL, "/username" );
- grp->password = osrfConfigGetValue( NULL, "/passwd" );
- char* port = osrfConfigGetValue( NULL, "/port" );
- if(port) grp->port = atoi(port);
- grp->currentNode = 0;
+int osrfTransportGroupSendMatch( osrfTransportGroup* grp, transport_message* msg ) {
+ if(!(grp && msg)) return -1;
- if(!resource) resource = "client";
- char* host = getenv("HOSTNAME");
- if(!host) host = "localhost";
- char* res = va_list_to_string( "osrf_%s_%s_%d", resource, host, getpid() );
+ char domain[256];
+ bzero(domain, 256);
+ jid_get_domain( msg->recipient, domain, 255 );
- int i;
- osrfStringArray* arr = osrfNewStringArray(8);
- osrfConfigGetValueList(NULL, arr, "/domains/domain");
-
- for( i = 0; i != arr->size; i++ ) {
- char* domain = osrfStringArrayGetString( arr, i );
- if(domain) {
- node->domain = strdup(domain);
- node->connection = client_init( domain, grp->port, NULL, 0 );
- if(client_connect( node->connection, grp->username, grp->password, res, 10, AUTH_DIGEST )) {
- node->active = 1;
- node->lastsent = time(NULL);
- }
- osrfListPush( grp->list, node );
- }
+ osrfTransportGroupNode* node = osrfHashGet(grp->nodes, domain);
+ if(node) {
+ if( (client_send_message( node->connection, msg )) == 0 )
+ return 0;
}
- free(res);
- osrfStringArrayFree(arr);
- return grp;
+ return warning_handler("Error sending message to domain %s", domain );
}
-*/
+int osrfTransportGroupSend( osrfTransportGroup* grp, transport_message* msg ) {
-int osrfTransportGroupSend( osrfTransportGroup* grp, transport_message* msg, char* newdomain ) {
if(!(grp && msg)) return -1;
+ int bufsize = 256;
- char domain[256];
- bzero(domain, 256);
- jid_get_domain( msg->recipient, domain );
+ char domain[bufsize];
+ bzero(domain, bufsize);
+ jid_get_domain( msg->recipient, domain, bufsize - 1 );
- char msgrecip[254];
- bzero(msgrecip, 254);
- jid_get_username(msg->recipient, msgrecip);
+ char msgrecip[bufsize];
+ bzero(msgrecip, bufsize);
+ jid_get_username(msg->recipient, msgrecip, bufsize - 1);
+ char msgres[bufsize];
+ bzero(msgres, bufsize);
+ jid_get_resource(msg->recipient, msgres, bufsize - 1);
- osrfTransportGroupNode* node = __osrfTransportGroupFindNode( grp, domain );
+ char* firstdomain = NULL;
+ char newrcp[1024];
- if( strcmp( msgrecip, grp->router ) ) { /* not a top level router message */
- if(node) {
- if( (client_send_message( node->connection, msg )) == 0 )
- return 0;
- else
- return warning_handler("Error sending message to domain %s", domain );
- }
- return warning_handler("Transport group has no node for domain %s", domain );
- }
+ int updateRecip = 1;
+ /* if we don't host this domain, don't update the recipient but send it as is */
+ if(osrfHashGet(grp->nodes, domain)) updateRecip = 0;
+ osrfTransportGroupNode* node;
- /*
- if( type == OSRF_SERVER_NODE )
- return _osrfTGServerSend( grp, msgdom, msg );
- if( type == OSRF_CLIENT_NODE )
- return _osrfTGClientSend( grp, msgdom, msg );
- */
+ do {
- return -1;
-}
-
-int _osrfTGServerSend( osrfTransportGroup* grp, char* domain, transport_message* msg ) {
+ node = osrfHashIteratorNext(grp->itr);
+ if(!node) osrfHashIteratorReset(grp->itr);
- debug_handler("Transport group sending server message to domain %s", domain );
+ node = osrfHashIteratorNext(grp->itr);
+ if(!node) return -1;
- osrfTransportGroupNode* node = __osrfTransportGroupFindNode( grp, domain );
- if(node) {
- if( (client_send_message( node->connection, msg )) == 0 )
- return 0;
- else
- return warning_handler("Error sending server response to domain %s", domain );
- }
- return warning_handler("Transport group has no node for domain %s for server response", domain );
-}
+ if(firstdomain == NULL) {
+ firstdomain = node->domain;
+ } else {
+ if(!strcmp(firstdomain, node->domain)) { /* we've made a full loop */
+ return warning_handler("We've tried to send to all domains.. giving up");
+ }
+ }
-int _osrfTGClientSend( osrfTransportGroup* grp, char* domain, transport_message* msg ) {
+ /* update the recipient domain if necessary */
+ bzero(newrcp, 1024);
+ if(updateRecip)
+ sprintf(newrcp, "%s@%s/%s", msgrecip, node->domain, msgres);
+ else
+ sprintf(newrcp, msg->recipient);
- debug_handler("Transport group sending client message to domain %s", domain );
+ free(msg->recipient);
+ msg->recipient = strdup(newrcp);
- /* first see if we have a node for the requested domain */
- osrfTransportGroupNode* node = __osrfTransportGroupFindNode( grp, domain );
- if(node && node->active) {
- if( (client_send_message( node->connection, msg )) == 0 )
+ if( (client_send_message( node->connection, msg )) == 0 )
return 0;
- else
- node->active = 0;
- }
- /* if not (or it fails), try sending to the current domain */
- node = osrfListGetIndex(grp->list, grp->currentNode);
- if(node && node->active) {
- if( (client_send_message( node->connection, msg )) == 0 )
- return 0;
- }
+ } while(1);
- /* start at the beginning and try them all ... */
- grp->currentNode = 0;
- while( grp->currentNode < grp->list->size ) {
- if( (node = osrfListGetIndex(grp->list, grp->currentNode++)) && node->active ) {
- if( (client_send_message( node->connection, msg )) == 0 )
- return 1;
- else node->active = 0;
- }
- }
return -1;
}
transport_message* osrfTransportGroupRecvAll( osrfTransportGroup* grp, int timeout ) {
- if(!(grp && grp->list)) return NULL;
+ if(!grp) return NULL;
- int i;
int maxfd = 0;
- osrfTransportGroupNode* node = NULL;
fd_set fdset;
FD_ZERO( &fdset );
- for( i = 0; i != grp->list->size; i++ ) {
- if( (node = osrfListGetIndex(grp->list, grp->currentNode++)) && node->active ) {
+ osrfTransportGroupNode* node;
+ osrfHashIterator* itr = osrfNewHashIterator(grp->nodes);
+
+ while( (node = osrfHashIteratorNext(itr)) ) {
+ if(node->active) {
int fd = node->connection->session->sock_id;
if( fd < maxfd ) maxfd = fd;
FD_SET( fd, &fdset );
}
}
+ osrfHashIteratorReset(itr);
if( __osrfTGWait( &fdset, maxfd, timeout ) ) {
- for( i = 0; i != grp->list->size; i++ ) {
- if( (node = osrfListGetIndex(grp->list, grp->currentNode++)) && node->active ) {
+ while( (node = osrfHashIteratorNext(itr)) ) {
+ if(node->active) {
int fd = node->connection->session->sock_id;
if( FD_ISSET( fd, &fdset ) ) {
return client_recv( node->connection, 0 );
}
}
+ osrfHashIteratorFree(itr);
return NULL;
}
transport_message* osrfTransportGroupRecv( osrfTransportGroup* grp, char* domain, int timeout ) {
if(!(grp && domain)) return NULL;
- osrfTransportGroupNode* node = __osrfTransportGroupFindNode( grp, domain );
+ osrfTransportGroupNode* node = osrfHashGet(grp->nodes, domain);
if(!node && node->connection && node->connection->session) return NULL;
int fd = node->connection->session->sock_id;
void osrfTransportGroupSetInactive( osrfTransportGroup* grp, char* domain ) {
if(!(grp && domain)) return;
- osrfTransportGroupNode* node = __osrfTransportGroupFindNode( grp, domain );
+ osrfTransportGroupNode* node = osrfHashGet(grp->nodes, domain );
if(node) node->active = 0;
}
+/*
osrfTransportGroupNode* __osrfTransportGroupFindNode( osrfTransportGroup* grp, char* domain ) {
if(!(grp && grp->list && domain)) return NULL;
int i = 0;
if(!strcmp(node->domain, domain)) return node;
return NULL;
}
+*/
#include "opensrf/transport_client.h"
#include "opensrf/transport_message.h"
#include "osrf_list.h"
+#include "osrf_hash.h"
#include "osrfConfig.h"
#include "opensrf/utils.h"
#include <time.h>
/**
- Maintains a set of transport clients for redundancy
+ Maintains a set of transport clients
*/
-//enum osrfTGType { OSRF_SERVER_NODE, OSRF_CLIENT_NODE };
-
struct __osrfTransportGroupStruct {
- osrfList* list; /* our lisit of nodes */
- char* router; /* the login username of the router on this network */
- int currentNode; /* which node are we currently on. Used for client failover and
- only gets updated on client messages where a server failed
- and we need to move to the next server in the list */
+ osrfHash* nodes; /* our hash of nodes keyed by domain */
+ osrfHashIterator* itr; /* points to the next node in the list */
};
typedef struct __osrfTransportGroupStruct osrfTransportGroup;
/**
- Sends a transport message
- If the message is destined for a domain that this group does not have a connection
- for, then the message is sent out through the currently selected domain.
+ Sends a transport message by going to the next domain in the set.
+ if we have a connection for the recipient domain, then we consider it to be
+ a 'local' message. Local messages have their recipient domains re-written to
+ match the domain of the next server in the set and they are sent directly to
+ that server. If we do not have a connection for the recipient domain, it is
+ considered a 'remote' message and the message is sent directly (unchanged)
+ to the next connection in the set.
+
@param grp The transport group
- @param type Whether this is a client request or a server response
@param msg The message to send
- @param newdomain A pre-allocated buffer in which to write the name of the
- new domain if a the expected domain could not be sent to.
- @return 0 on normal successful send. Returns 1 if the message was sent
- to a new domain (note: this can only happen when type == OSRF_CLIENT_NODE)
+ @return 0 on normal successful send.
Returns -1 if the message cannot be sent.
*/
-int osrfTransportGroupSend( osrfTransportGroup* grp, transport_message* msg, char* newdomain );
+int osrfTransportGroupSend( osrfTransportGroup* grp, transport_message* msg );
+
+/**
+ Sends the message to the exact recipient. No failover is attempted.
+ @return 0 on success, -1 on error.
+ */
+int osrfTransportGroupSendMatch( osrfTransportGroup* grp, transport_message* msg );
+
int _osrfTGServerSend( osrfTransportGroup* grp, char* domain, transport_message* msg );
int _osrfTGClientSend( osrfTransportGroup* grp, char* domain, transport_message* msg );
transport_message* osrfTransportGroupRecv( osrfTransportGroup* grp, char* domain, int timeout );
/**
- Tells the group that the connect to the last message sent to the provided
+ Tells the group that a message to the given domain failed
domain did not make it through;
@param grp The transport group
@param comain The failed domain
xmlAddChild( message_node, body_node );
}
-
- /*
- xmlBufferPtr buf = xmlBufferCreate();
- int status = xmlNodeDump( buf, doc, xmlDocGetRootElement(doc) , 1, 0 );
- */
-
- //xmlDocDumpFormatMemory( doc, &xmlbuf, &bufsize, 0 );
xmlDocDumpMemoryEnc( doc, &xmlbuf, &bufsize, "UTF-8" );
encoded_body = strdup( (char*) xmlbuf );
-void jid_get_username( const char* jid, char buf[] ) {
+void jid_get_username( const char* jid, char buf[], int size ) {
if( jid == NULL ) { return; }
int i;
for( i = 0; i != len; i++ ) {
if( jid[i] == 64 ) { /*ascii @*/
+ if(i > size) i = size;
strncpy( buf, jid, i );
return;
}
}
-void jid_get_resource( const char* jid, char buf[]) {
+void jid_get_resource( const char* jid, char buf[], int size) {
if( jid == NULL ) { return; }
int len = strlen( jid );
int i;
for( i = 0; i!= len; i++ ) {
if( jid[i] == 47 ) { /* ascii / */
- strncpy( buf, jid + i + 1, len - (i+1) );
+ const char* start = jid + i + 1; /* right after the '/' */
+ int rlen = len - (i+1);
+ if(rlen > size) rlen = size;
+ strncpy( buf, start, rlen );
}
}
}
-void jid_get_domain( const char* jid, char buf[] ) {
+void jid_get_domain( const char* jid, char buf[], int size ) {
if(jid == NULL) return;
else if(jid[i] == 47 && index1 != 0) /* ascii / */
index2 = i;
}
- if( index1 > 0 && index2 > 0 && index2 > index1 )
- memcpy( buf, jid + index1, index2 - index1 );
+
+ if( index1 > 0 && index2 > 0 && index2 > index1 ) {
+ int dlen = index2 - index1;
+ if(dlen > size) dlen = size;
+ memcpy( buf, jid + index1, dlen );
+ }
}
void set_msg_error( transport_message* msg, char* type, int err_code ) {