libstack/osrf_application.o \
libstack/osrf_cache.o \
libstack/xml_utils.o \
+ libstack/osrf_transgroup.o \
+ libstack/osrf_list.o \
+ libstack/osrf_hash.o \
libstack/osrf_log.o \
utils/socket_bundle.o \
utils/string_array.o \
libstack/osrf_cache.h \
libstack/xml_utils.h \
libstack/osrf_log.h \
+ libstack/osrf_transgroup.h \
+ libstack/osrf_list.h \
+ libstack/osrf_hash.h \
utils/socket_bundle.h \
utils/string_array.h \
utils/utils.h \
@echo stack
make -C libstack
@echo $@
- $(CC) -shared -W1 $(LDFLAGS) -lmemcache -lobjson $(OPENSRF_TARGETS) -o $(TMPDIR)/$(LIBOPENSRF)
+ $(CC) -shared -W1 $(LDFLAGS) -lJudy -lmemcache -lobjson $(OPENSRF_TARGETS) -o $(TMPDIR)/$(LIBOPENSRF)
@echo apps
make -C c-apps
# provided to any method is not at least as large as the 'argc' setting for the method
CFLAGS += -DASSUME_STATELESS -DOSRF_LOG_PARAMS -DOSRF_STRICT_PARAMS -rdynamic -fno-strict-aliasing
-LDLIBS += -lxml2 -lobjson -ldl -lmemcache
+LDLIBS += -lxml2 -lobjson -ldl -lmemcache -LJudy
TARGETS = osrf_message.o \
osrf_app_session.o \
osrf_application.o \
osrf_cache.o \
osrf_log.o \
+ osrf_transgroup.o \
+ osrf_list.o \
+ osrf_hash.o \
xml_utils.o
HEADERS = osrf_message.h \
osrf_application.h \
osrf_cache.h \
osrf_log.h \
+ osrf_transgroup.h \
+ osrf_list.h \
+ osrf_hash.h \
xml_utils.h
all: xml_utils.o $(TARGETS) copy
osrf_application.o: osrf_application.c osrf_application.h
osrf_cache.o: osrf_cache.c osrf_cache.h
osrf_log.o: osrf_log.c osrf_log.h
+osrf_list.o: osrf_list.c osrf_list.h
+osrf_hash.o: osrf_hash.c osrf_hash.h
+
clean:
/bin/rm -f *.o libopensrf_stack.so xml_utils.h xml_utils.c
#include "osrf_system.h"
+#include "osrf_hash.h"
+#include "osrf_list.h"
+
+//static void _free(void* i) { free(i); }
+//static void _hfree(char* c, void* i) { free(i); }
int main( int argc, char* argv[] ) {
+ /*
+ osrfHash* list = osrfNewHash();
+ list->freeItem = _hfree;
+
+ char* x = strdup("X");
+ char* y = strdup("Y");
+ char* z = strdup("Z");
+ osrfHashSet( list, x, "test1" );
+ osrfHashSet( list, y, "test2" );
+ osrfHashSet( list, z, "test3" );
+
+ char* q = (char*) osrfHashGet( list, "test1" );
+ printf( "%s\n", q );
+
+ q = (char*) osrfHashGet( list, "test2" );
+ printf( "%s\n", q );
+
+ q = (char*) osrfHashGet( list, "test3" );
+ printf( "%s\n", q );
+
+ osrfHashIterator* itr = osrfNewHashIterator(list);
+ char* val;
+
+ while( (val = osrfHashIteratorNext(itr)) )
+ printf("Iterated item: %s\n", val );
+
+ osrfHashIteratorReset(itr);
+ while( (val = osrfHashIteratorNext(itr)) )
+ printf("Iterated item: %s\n", val );
+
+ printf( "Count: %lu\n", osrfHashGetCount(list));
+
+ osrfHashIteratorFree(itr);
+
+ osrfHashFree(list);
+
+ exit(1);
+
+ osrfList* list = osrfNewList();
+ list->freeItem = _free;
+
+ char* x = strdup("X");
+ char* y = strdup("Y");
+ char* z = strdup("Z");
+ osrfListSet( list, x, 0 );
+ osrfListSet( list, y, 2 );
+ osrfListSet( list, z, 4 );
+
+ char* q = (char*) osrfListGetIndex( list, 4 );
+ printf( "%s\n", q );
+
+ osrfListIterator* itr = osrfNewListIterator( list );
+ char* val;
+
+ while( (val = osrfListIteratorNext(itr)) )
+ printf("Found val: %s\n", val );
+
+ osrfListIteratorReset(itr);
+ printf("\n");
+ while( (val = osrfListIteratorNext(itr)) )
+ printf("Found val: %s\n", val );
+
+ osrfListIteratorFree(itr);
+
+ printf( "Count: %lu\n", osrfListGetCount(list));
+
+ osrfListFree(list);
+
+ exit(1);
+ */
+
+
+
if( argc < 4 ) {
fprintf(stderr, "Usage: %s <host> <bootstrap_config> <config_context>\n", argv[0]);
return 1;
char target_buf[512];
memset(target_buf,0,512);
- //char* domain = config_value( "opensrf.bootstrap", "//%s/domains/domain1", osrf_get_config_context() ); /* just the first for now */
- //char* router_name = config_value( "opensrf.bootstrap", "//%s/router_name", osrf_get_config_context() );
osrfStringArray* arr = osrfNewStringArray(8);
osrfConfigGetValueList(NULL, arr, "/domains/domain");
char* domain = osrfStringArrayGetString(arr, 0);
char* router_name = osrfConfigGetValue(NULL, "/router_name");
- osrfStringArrayFree(arr);
sprintf( target_buf, "%s@%s/%s", router_name, domain, remote_service );
+ osrfStringArrayFree(arr);
//free(domain);
free(router_name);
}
osrf_app_request* req = _osrf_app_request_init( session, req_msg );
- if(!_osrf_app_session_send( session, req_msg ) ) {
+ if(_osrf_app_session_send( session, req_msg ) ) {
warning_handler( "Error sending request message [%d]", session->thread_trace );
return -1;
}
session->state = OSRF_SESSION_CONNECTING;
int ret = _osrf_app_session_send( session, con_msg );
osrf_message_free(con_msg);
- if(!ret) return 0;
+ if(ret) return 0;
time_t start = time(NULL);
time_t remaining = (time_t) timeout;
--- /dev/null
+#include "osrf_hash.h"
+
+osrfHash* osrfNewHash() {
+ osrfHash* hash = safe_malloc(sizeof(osrfHash));
+ hash->hash = (Pvoid_t) NULL;
+ hash->freeItem = NULL;
+ return hash;
+}
+
+void* osrfHashSet( osrfHash* hash, void* item, const char* key, ... ) {
+ if(!(hash && item && key )) return NULL;
+
+ Word_t* value;
+ VA_LIST_TO_STRING(key);
+ uint8_t idx[strlen(VA_BUF) + 1];
+ strcpy( idx, VA_BUF );
+
+ void* olditem = osrfHashRemove( hash, VA_BUF );
+
+ JSLI(value, hash->hash, idx);
+ if(value) *value = (Word_t) item;
+ return olditem;
+
+}
+
+void* osrfHashRemove( osrfHash* hash, const char* key, ... ) {
+ if(!(hash && key )) return NULL;
+
+ VA_LIST_TO_STRING(key);
+
+ Word_t* value;
+ uint8_t idx[strlen(VA_BUF) + 1];
+ strcpy( idx, VA_BUF );
+ void* item = NULL;
+ int retcode;
+
+ JSLG( value, hash->hash, idx);
+
+ if( value ) {
+ item = (void*) *value;
+ if(item) {
+ if( hash->freeItem ) {
+ hash->freeItem( (char*) idx, item );
+ item = NULL;
+ }
+ }
+ }
+
+
+ JSLD( retcode, hash->hash, idx );
+
+ return item;
+}
+
+
+void* osrfHashGet( osrfHash* hash, const char* key, ... ) {
+ if(!(hash && key )) return NULL;
+
+ VA_LIST_TO_STRING(key);
+
+ Word_t* value;
+ uint8_t idx[strlen(VA_BUF) + 1];
+ strcpy( idx, VA_BUF );
+
+ JSLG( value, hash->hash, idx );
+ if(value) return (void*) *value;
+ return NULL;
+}
+
+
+osrfStringArray* osrfHashKeys( osrfHash* hash ) {
+ if(!hash) return NULL;
+
+ Word_t* value;
+ uint8_t idx[OSRF_HASH_MAXKEY];
+ strcpy(idx, "");
+ char* key;
+ osrfStringArray* strings = osrfNewStringArray(8);
+
+ JSLF( value, hash->hash, idx );
+
+ while( value ) {
+ key = (char*) idx;
+ osrfStringArrayAdd( strings, key );
+ JSLN( value, hash->hash, idx );
+ }
+
+ return strings;
+}
+
+
+unsigned long osrfHashGetCount( osrfHash* hash ) {
+ if(!hash) return -1;
+
+ Word_t* value;
+ unsigned long count = 0;
+ uint8_t idx[OSRF_HASH_MAXKEY];
+
+ strcpy( (char*) idx, "");
+ JSLF(value, hash->hash, idx);
+
+ while(value) {
+ count++;
+ JSLN( value, hash->hash, idx );
+ }
+
+ return count;
+}
+
+void osrfHashFree( osrfHash* hash ) {
+ if(!hash) return;
+
+ int i;
+ osrfStringArray* keys = osrfHashKeys( hash );
+
+ for( i = 0; i != keys->size; i++ ) {
+ char* key = (char*) osrfStringArrayGetString( keys, i );
+ osrfHashRemove( hash, key );
+ }
+
+ osrfStringArrayFree(keys);
+ free(hash);
+}
+
+
+
+osrfHashIterator* osrfNewHashIterator( osrfHash* hash ) {
+ if(!hash) return NULL;
+ osrfHashIterator* itr = safe_malloc(sizeof(osrfHashIterator));
+ itr->hash = hash;
+ itr->current = NULL;
+ return itr;
+}
+
+void* osrfHashIteratorNext( osrfHashIterator* itr ) {
+ if(!(itr && itr->hash)) return NULL;
+
+ Word_t* value;
+ uint8_t idx[OSRF_HASH_MAXKEY];
+
+ if( itr->current == NULL ) { /* get the first item in the list */
+ strcpy(idx, "");
+ JSLF( value, itr->hash->hash, idx );
+
+ } else {
+ strcpy(idx, itr->current);
+ JSLN( value, itr->hash->hash, idx );
+ }
+
+ if(value) {
+ free(itr->current);
+ itr->current = strdup((char*) idx);
+ return (void*) *value;
+ }
+
+ return NULL;
+
+}
+
+void osrfHashIteratorFree( osrfHashIterator* itr ) {
+ if(!itr) return;
+ free(itr->current);
+ free(itr);
+}
+
+void osrfHashIteratorReset( osrfHashIterator* itr ) {
+ if(!itr) return;
+ free(itr->current);
+ itr->current = NULL;
+}
+
+
+
--- /dev/null
+#include <Judy.h>
+#include "opensrf/utils.h"
+#include "opensrf/string_array.h"
+
+#define OSRF_HASH_MAXKEY 256
+
+struct __osrfHashStruct {
+ Pvoid_t hash; /* the hash */
+ void (*freeItem) (char* key, void* item); /* callback for freeing stored items */
+};
+typedef struct __osrfHashStruct osrfHash;
+
+
+struct __osrfHashIteratorStruct {
+ char* current;
+ osrfHash* hash;
+};
+typedef struct __osrfHashIteratorStruct osrfHashIterator;
+
+/**
+ Allocates a new hash object
+ */
+osrfHash* osrfNewHash();
+
+/**
+ Sets the given key with the given item
+ if "freeItem" is defined and an item already exists at the given location,
+ then old item is freed and the new item is put into place.
+ if "freeItem" is not defined and an item already exists, the old item
+ is returned.
+ @return The old item if exists and there is no 'freeItem', returns NULL
+ otherwise
+ */
+void* osrfHashSet( osrfHash* hash, void* item, const char* key, ... );
+
+/**
+ Removes an item from the hash.
+ if 'freeItem' is defined it is used and NULL is returned,
+ else the freed item is returned
+ */
+void* osrfHashRemove( osrfHash* hash, const char* key, ... );
+
+void* osrfHashGet( osrfHash* hash, const char* key, ... );
+
+
+/**
+ @return A list of strings representing the keys of the hash.
+ caller is responsible for freeing the returned string array
+ with osrfStringArrayFree();
+ */
+osrfStringArray* osrfHashKeys( osrfHash* hash );
+
+/**
+ Frees a hash
+ */
+void osrfHashFree( osrfHash* hash );
+
+/**
+ @return The number of items in the hash
+ */
+unsigned long osrfHashGetCount( osrfHash* hash );
+
+
+
+
+/**
+ Creates a new list iterator with the given list
+ */
+osrfHashIterator* osrfNewHashIterator( osrfHash* hash );
+
+/**
+ Returns the next non-NULL item in the list, return NULL when
+ the end of the list has been reached
+ */
+void* osrfHashIteratorNext( osrfHashIterator* itr );
+
+/**
+ Deallocates the given list
+ */
+void osrfHashIteratorFree( osrfHashIterator* itr );
+
+void osrfHashIteratorReset( osrfHashIterator* itr );
+
--- /dev/null
+#include "osrf_list.h"
+
+
+osrfList* osrfNewList() {
+ osrfList* list = safe_malloc(sizeof(osrfList));
+ list->list = (Pvoid_t) NULL;
+ list->size = 0;
+ list->freeItem = NULL;
+ return list;
+}
+
+
+int osrfListPush( osrfList* list, void* item ) {
+ if(!(list && item)) return -1;
+ Word_t* value;
+ unsigned long index = -1;
+ JLL(value, list->list, index );
+ osrfListSet( list, item, index+1 );
+ return 0;
+}
+
+
+void* osrfListSet( osrfList* list, void* item, unsigned long position ) {
+ if(!list || position < 0) return NULL;
+
+ Word_t* value;
+ void* olditem = osrfListRemove( list, position );
+
+ JLI( value, list->list, position );
+ *value = (Word_t) item;
+ __osrfListSetSize( list );
+
+ return olditem;
+}
+
+
+void* osrfListGetIndex( osrfList* list, unsigned long position ) {
+ if(!list) return NULL;
+
+ Word_t* value;
+ JLG( value, list->list, position );
+ if(value) return (void*) *value;
+ return NULL;
+}
+
+void osrfListFree( osrfList* list ) {
+ if(!list) return;
+
+ Word_t* value;
+ unsigned long index = -1;
+ JLL(value, list->list, index );
+ int retcode;
+
+ while (value != NULL) {
+ JLD(retcode, list->list, index);
+
+ if(list->freeItem) {
+ list->freeItem( (void*) *value );
+ *value = (Word_t) NULL;
+ }
+
+ JLP(value, list->list, index);
+ }
+
+ free(list);
+}
+
+void* osrfListRemove( osrfList* list, int position ) {
+ if(!list) return NULL;
+
+ int retcode;
+ Word_t* value;
+ JLG( value, list->list, position );
+ void* olditem = NULL;
+
+ if( value ) {
+
+ olditem = (void*) *value;
+ if( olditem ) {
+ JLD(retcode, list->list, position );
+ if(retcode == 1) {
+ if(list->freeItem) {
+ list->freeItem( olditem );
+ olditem = NULL;
+ }
+ __osrfListSetSize( list );
+ }
+ }
+ }
+
+ return olditem;
+}
+
+
+int osrfListFind( osrfList* list, void* addr ) {
+ if(!(list && addr)) return -1;
+
+ Word_t* value;
+ unsigned long index = -1;
+ JLL(value, list->list, index );
+
+ while (value != NULL) {
+ if( (void*) *value == addr )
+ return index;
+ JLP(value, list->list, index);
+ }
+
+ return -1;
+}
+
+
+
+void __osrfListSetSize( osrfList* list ) {
+ if(!list) return;
+
+ Word_t* value;
+ unsigned long index = -1;
+ JLL(value, list->list, index );
+ list->size = index + 1;
+}
+
+
+unsigned long osrfListGetCount( osrfList* list ) {
+ if(!list) return -1;
+ unsigned long retcode = -1;
+ JLC( retcode, list->list, 0, -1 );
+ return retcode;
+}
+
+
+osrfListIterator* osrfNewListIterator( osrfList* list ) {
+ if(!list) return NULL;
+ osrfListIterator* itr = safe_malloc(sizeof(osrfListIterator));
+ itr->list = list;
+ itr->current = 0;
+ return itr;
+}
+
+void* osrfListIteratorNext( osrfListIterator* itr ) {
+ if(!(itr && itr->list)) return NULL;
+
+ Word_t* value;
+ if(itr->current >= itr->list->size) return NULL;
+ JLF( value, itr->list->list, itr->current );
+ if(value) {
+ itr->current++;
+ return (void*) *value;
+ }
+ return NULL;
+}
+
+void osrfListIteratorFree( osrfListIterator* itr ) {
+ if(!itr) return;
+ free(itr);
+}
+
+
+
+void osrfListIteratorReset( osrfListIterator* itr ) {
+ if(!itr) return;
+ itr->current = 0;
+}
+
+
--- /dev/null
+#include <stdio.h>
+#include "opensrf/utils.h"
+#include <Judy.h>
+
+/**
+ Items are stored as void*'s so it's up to the user to
+ manage the data wisely. Also, if the 'freeItem' callback is defined for the list,
+ then, it will be used on any item that needs to be freed, so don't mix data
+ types in the list if you want magic freeing */
+
+struct __osrfListStruct {
+ Pvoid_t list; /* the list */
+ int size; /* how many items in the list including NULL items between non-NULL items */
+ void (*freeItem) (void* item); /* callback for freeing stored items */
+};
+typedef struct __osrfListStruct osrfList;
+
+
+struct __osrfListIteratorStruct {
+ osrfList* list;
+ unsigned long current;
+};
+typedef struct __osrfListIteratorStruct osrfListIterator;
+
+
+/**
+ Creates a new list iterator with the given list
+ */
+osrfListIterator* osrfNewListIterator( osrfList* list );
+
+/**
+ Returns the next non-NULL item in the list, return NULL when
+ the end of the list has been reached
+ */
+void* osrfListIteratorNext( osrfListIterator* itr );
+
+/**
+ Deallocates the given list
+ */
+void osrfListIteratorFree( osrfListIterator* itr );
+
+void osrfListIteratorReset( osrfListIterator* itr );
+
+
+/**
+ Allocates a new list
+ @param compress If true, the list will compress empty slots on delete. If item positionality
+ is not important, then using this feature is reccomended to keep the list from growing indefinitely.
+ if item positionality is not important.
+ @return The allocated list
+ */
+osrfList* osrfNewList();
+
+/**
+ Pushes an item onto the end of the list. This always finds the highest index
+ in the list and pushes the new item into the list after it.
+ @param list The list
+ @param item The item to push
+ @return 0 on success, -1 on failure
+ */
+int osrfListPush( osrfList* list, void* item );
+
+/**
+ Puts the given item into the list at the specified position. If there
+ is already an item at the given position and the list has it's
+ "freeItem" function defined, then it will be used to free said item.
+ If no 'freeItem' callback is defined, then the displaced item will
+ be returned;
+ @param list The list
+ @param item The item to put into the list
+ @param position The position to place the item in
+ @return NULL in successfully inserting the new item and freeing
+ any displaced items. Returns the displaced item if no "freeItem"
+ callback is defined.
+ */
+void* osrfListSet( osrfList* list, void* item, unsigned long position );
+
+/**
+ Returns the item at the given position
+ @param list The list
+ @param postiont the position
+ */
+void* osrfListGetIndex( osrfList* list, unsigned long position );
+
+/**
+ Frees the list and all list items (if the list has a "freeItem" function defined )
+ @param list The list
+ */
+void osrfListFree( osrfList* list );
+
+/**
+ Removes the list item at the given index
+ @param list The list
+ @param position The position of the item to remove
+ @return A pointer to the item removed if "freeItem" is not defined
+ for this list, returns NULL if it is.
+ */
+void* osrfListRemove( osrfList* list, int position );
+
+/**
+ Finds the list item whose void* is the same as the one passed in
+ @param list The list
+ @param addr The pointer connected to the list item we're to find
+ @return the index of the item, or -1 if the item was not found
+ */
+int osrfListFind( osrfList* list, void* addr );
+
+
+void __osrfListSetSize( osrfList* list );
+
+/**
+ @return The number of non-null items in the list
+ */
+unsigned long osrfListGetCount( osrfList* list );
+
+
if( msg->is_error && ! msg->thread ) {
warning_handler("!! Received jabber layer error for %s ... exiting\n", msg->sender );
+ message_free( msg );
return NULL;
}
if(! msg->thread && ! msg->is_error ) {
warning_handler("Received a non-error message with no thread trace... dropping");
message_free( msg );
+ return NULL;
}
osrf_app_session* session = osrf_app_session_find_session( msg->thread );
char* port = osrfConfigGetValue( NULL, "/port" );
char* unixpath = osrfConfigGetValue( NULL, "/unixpath" );
- char* domain = osrfStringArrayGetString( arr, 0 ); /* just the first for now */
+ char* domain = strdup(osrfStringArrayGetString( arr, 0 )); /* just the first for now */
osrfStringArrayFree(arr);
--- /dev/null
+#include "osrf_transgroup.h"
+#include <sys/select.h>
+
+
+osrfTransportGroupNode* osrfNewTransportGroupNode(
+ char* domain, int port, char* username, char* password, char* resource ) {
+
+ if(!(domain && port && username && password && resource)) return NULL;
+
+ osrfTransportGroupNode* node = safe_malloc(sizeof(osrfTransportGroupNode));
+ node->domain = strdup(domain);
+ node->port = port;
+ node->username = strdup(username);
+ node->password = strdup(password);
+ node->domain = strdup(domain);
+ node->active = 0;
+ node->lastsent = 0;
+ node->connection = client_init( domain, port, NULL, 0 );
+
+ return node;
+}
+
+
+osrfTransportGroup* osrfNewTransportGroup( char* router, osrfTransportGroupNode* nodes[], int count ) {
+ if(!nodes || !router || count < 1) return NULL;
+
+ osrfTransportGroup* grp = safe_malloc(sizeof(osrfTransportGroup));
+ grp->currentNode = 0;
+ grp->router = strdup(router);
+ grp->list = osrfNewList(1);
+
+ int i;
+ for( i = 0; i != count; i++ ) osrfListPush( grp->list, nodes[i] );
+ return grp;
+}
+
+
+int osrfTransportGroupConnect( osrfTransportGroup* grp ) {
+ if(!grp) return 0;
+ int i;
+ int active = 0;
+ for( i = 0; i != grp->list->size; i++ ) {
+ osrfTransportGroupNode* node = osrfListGetIndex( grp->list, i );
+ if(client_connect( node->connection, node->username,
+ node->password, node->resource, 10, AUTH_DIGEST )) {
+ node->active = 1;
+ node->lastsent = time(NULL);
+ active++;
+ }
+ }
+ return active;
+}
+
+
+/*
+osrfTransportGroup* osrfNewTransportGroup( char* resource ) {
+
+ grp->username = osrfConfigGetValue( NULL, "/username" );
+ grp->password = osrfConfigGetValue( NULL, "/passwd" );
+ char* port = osrfConfigGetValue( NULL, "/port" );
+ if(port) grp->port = atoi(port);
+ grp->currentNode = 0;
+
+ if(!resource) resource = "client";
+ char* host = getenv("HOSTNAME");
+ if(!host) host = "localhost";
+ char* res = va_list_to_string( "osrf_%s_%s_%d", resource, host, getpid() );
+
+ int i;
+ osrfStringArray* arr = osrfNewStringArray(8);
+ osrfConfigGetValueList(NULL, arr, "/domains/domain");
+
+ for( i = 0; i != arr->size; i++ ) {
+ char* domain = osrfStringArrayGetString( arr, i );
+ if(domain) {
+ node->domain = strdup(domain);
+ node->connection = client_init( domain, grp->port, NULL, 0 );
+ if(client_connect( node->connection, grp->username, grp->password, res, 10, AUTH_DIGEST )) {
+ node->active = 1;
+ node->lastsent = time(NULL);
+ }
+ osrfListPush( grp->list, node );
+ }
+ }
+
+ free(res);
+ osrfStringArrayFree(arr);
+ return grp;
+}
+*/
+
+
+int osrfTransportGroupSend( osrfTransportGroup* grp, transport_message* msg, char* newdomain ) {
+ if(!(grp && msg)) return -1;
+
+ char domain[256];
+ bzero(domain, 256);
+ jid_get_domain( msg->recipient, domain );
+
+ char msgrecip[254];
+ bzero(msgrecip, 254);
+ jid_get_username(msg->recipient, msgrecip);
+
+
+ osrfTransportGroupNode* node = __osrfTransportGroupFindNode( grp, domain );
+
+ if( strcmp( msgrecip, grp->router ) ) { /* not a top level router message */
+
+ if(node) {
+ if( (client_send_message( node->connection, msg )) == 0 )
+ return 0;
+ else
+ return warning_handler("Error sending message to domain %s", domain );
+ }
+ return warning_handler("Transport group has no node for domain %s", domain );
+ }
+
+
+ /*
+ if( type == OSRF_SERVER_NODE )
+ return _osrfTGServerSend( grp, msgdom, msg );
+ if( type == OSRF_CLIENT_NODE )
+ return _osrfTGClientSend( grp, msgdom, msg );
+ */
+
+ return -1;
+}
+
+int _osrfTGServerSend( osrfTransportGroup* grp, char* domain, transport_message* msg ) {
+
+ debug_handler("Transport group sending server message to domain %s", domain );
+
+ osrfTransportGroupNode* node = __osrfTransportGroupFindNode( grp, domain );
+ if(node) {
+ if( (client_send_message( node->connection, msg )) == 0 )
+ return 0;
+ else
+ return warning_handler("Error sending server response to domain %s", domain );
+ }
+ return warning_handler("Transport group has no node for domain %s for server response", domain );
+}
+
+
+int _osrfTGClientSend( osrfTransportGroup* grp, char* domain, transport_message* msg ) {
+
+ debug_handler("Transport group sending client message to domain %s", domain );
+
+ /* first see if we have a node for the requested domain */
+ osrfTransportGroupNode* node = __osrfTransportGroupFindNode( grp, domain );
+ if(node && node->active) {
+ if( (client_send_message( node->connection, msg )) == 0 )
+ return 0;
+ else
+ node->active = 0;
+ }
+
+ /* if not (or it fails), try sending to the current domain */
+ node = osrfListGetIndex(grp->list, grp->currentNode);
+ if(node && node->active) {
+ if( (client_send_message( node->connection, msg )) == 0 )
+ return 0;
+ }
+
+ /* start at the beginning and try them all ... */
+ grp->currentNode = 0;
+ while( grp->currentNode < grp->list->size ) {
+ if( (node = osrfListGetIndex(grp->list, grp->currentNode++)) && node->active ) {
+ if( (client_send_message( node->connection, msg )) == 0 )
+ return 1;
+ else node->active = 0;
+ }
+ }
+ return -1;
+}
+
+static int __osrfTGWait( fd_set* fdset, int maxfd, int timeout ) {
+ if(!(fdset && maxfd)) return 0;
+
+ struct timeval tv;
+ tv.tv_sec = timeout;
+ tv.tv_usec = 0;
+ int retval = 0;
+
+ if( timeout < 0 ) {
+ if( (retval = select( maxfd + 1, fdset, NULL, NULL, NULL)) == -1 )
+ return 0;
+
+ } else {
+ if( (retval = select( maxfd + 1, fdset, NULL, NULL, &tv)) == -1 )
+ return 0;
+ }
+
+ return retval;
+}
+
+
+transport_message* osrfTransportGroupRecvAll( osrfTransportGroup* grp, int timeout ) {
+ if(!(grp && grp->list)) return NULL;
+
+ int i;
+ int maxfd = 0;
+ osrfTransportGroupNode* node = NULL;
+ fd_set fdset;
+ FD_ZERO( &fdset );
+
+ for( i = 0; i != grp->list->size; i++ ) {
+ if( (node = osrfListGetIndex(grp->list, grp->currentNode++)) && node->active ) {
+ int fd = node->connection->session->sock_id;
+ if( fd < maxfd ) maxfd = fd;
+ FD_SET( fd, &fdset );
+ }
+ }
+
+ if( __osrfTGWait( &fdset, maxfd, timeout ) ) {
+ for( i = 0; i != grp->list->size; i++ ) {
+ if( (node = osrfListGetIndex(grp->list, grp->currentNode++)) && node->active ) {
+ int fd = node->connection->session->sock_id;
+ if( FD_ISSET( fd, &fdset ) ) {
+ return client_recv( node->connection, 0 );
+ }
+ }
+ }
+ }
+
+ return NULL;
+}
+
+transport_message* osrfTransportGroupRecv( osrfTransportGroup* grp, char* domain, int timeout ) {
+ if(!(grp && domain)) return NULL;
+
+ osrfTransportGroupNode* node = __osrfTransportGroupFindNode( grp, domain );
+ if(!node && node->connection && node->connection->session) return NULL;
+ int fd = node->connection->session->sock_id;
+
+ fd_set fdset;
+ FD_ZERO( &fdset );
+ FD_SET( fd, &fdset );
+
+ int active = __osrfTGWait( &fdset, fd, timeout );
+ if(active) return client_recv( node->connection, 0 );
+
+ return NULL;
+}
+
+void osrfTransportGroupSetInactive( osrfTransportGroup* grp, char* domain ) {
+ if(!(grp && domain)) return;
+ osrfTransportGroupNode* node = __osrfTransportGroupFindNode( grp, domain );
+ if(node) node->active = 0;
+}
+
+osrfTransportGroupNode* __osrfTransportGroupFindNode( osrfTransportGroup* grp, char* domain ) {
+ if(!(grp && grp->list && domain)) return NULL;
+ int i = 0;
+ osrfTransportGroupNode* node = NULL;
+
+ while( (node = (osrfTransportGroupNode*) osrfListGetIndex( grp->list, i++ )) )
+ if(!strcmp(node->domain, domain)) return node;
+ return NULL;
+}
+
+
+
+
--- /dev/null
+#include "opensrf/transport_client.h"
+#include "opensrf/transport_message.h"
+#include "osrf_list.h"
+#include "osrfConfig.h"
+#include "opensrf/utils.h"
+#include <time.h>
+
+/**
+ Maintains a set of transport clients for redundancy
+ */
+
+//enum osrfTGType { OSRF_SERVER_NODE, OSRF_CLIENT_NODE };
+
+struct __osrfTransportGroupStruct {
+ osrfList* list; /* our lisit of nodes */
+ char* router; /* the login username of the router on this network */
+ int currentNode; /* which node are we currently on. Used for client failover and
+ only gets updated on client messages where a server failed
+ and we need to move to the next server in the list */
+};
+typedef struct __osrfTransportGroupStruct osrfTransportGroup;
+
+
+struct __osrfTransportGroupNode {
+ transport_client* connection; /* our connection to the network */
+ char* domain; /* the domain we're connected to */
+ char* username; /* username used to connect to the group of servers */
+ char* password; /* password used to connect to the group of servers */
+ char* resource; /* the login resource */
+ int port; /* port used to connect to the group of servers */
+
+ int active; /* true if we're able to send data on this connection */
+ time_t lastsent; /* the last time we sent a message */
+};
+typedef struct __osrfTransportGroupNode osrfTransportGroupNode;
+
+
+/**
+ Creates a new group node
+ @param domain The domain we're connecting to
+ @param port The port to connect on
+ @param username The login name
+ @param password The login password
+ @param resource The login resource
+ @return A new transport group node
+ */
+osrfTransportGroupNode* osrfNewTransportGroupNode(
+ char* domain, int port, char* username, char* password, char* resource );
+
+
+/**
+ Allocates and initializes a new transport group.
+ The first node in the array is the default node for client connections.
+ @param router The router name shared accross the networks
+ @param nodes The nodes in the group.
+ */
+osrfTransportGroup* osrfNewTransportGroup( char* router, osrfTransportGroupNode* nodes[], int count );
+
+/**
+ Attempts to connect all of the nodes in this group.
+ @param grp The transport group
+ @return The number of nodes successfully connected
+ */
+int osrfTransportGroupConnect( osrfTransportGroup* grp );
+
+
+/**
+ Sends a transport message
+ If the message is destined for a domain that this group does not have a connection
+ for, then the message is sent out through the currently selected domain.
+ @param grp The transport group
+ @param type Whether this is a client request or a server response
+ @param msg The message to send
+ @param newdomain A pre-allocated buffer in which to write the name of the
+ new domain if a the expected domain could not be sent to.
+ @return 0 on normal successful send. Returns 1 if the message was sent
+ to a new domain (note: this can only happen when type == OSRF_CLIENT_NODE)
+ Returns -1 if the message cannot be sent.
+ */
+int osrfTransportGroupSend( osrfTransportGroup* grp, transport_message* msg, char* newdomain );
+
+int _osrfTGServerSend( osrfTransportGroup* grp, char* domain, transport_message* msg );
+int _osrfTGClientSend( osrfTransportGroup* grp, char* domain, transport_message* msg );
+
+/**
+ Waits on all connections for inbound data.
+ @param grp The transport group
+ @param timeout How long to wait for data. 0 means check for data
+ but don't wait, a negative number means to wait indefinitely
+ @return The received message or NULL if the timeout occurred before a
+ message was received
+ */
+transport_message* osrfTransportGroupRecvAll( osrfTransportGroup* grp, int timeout );
+
+/**
+ Waits for data from a single domain
+ @param grp The transport group
+ @param domain The domain to wait for data on
+ @param timeout see osrfTransportGroupRecvAll
+ */
+transport_message* osrfTransportGroupRecv( osrfTransportGroup* grp, char* domain, int timeout );
+
+/**
+ Tells the group that the connect to the last message sent to the provided
+ domain did not make it through;
+ @param grp The transport group
+ @param comain The failed domain
+ */
+void osrfTransportGroupSetInactive( osrfTransportGroup* grp, char* domain );
+
+
+/**
+ Finds a node in our list of nodes
+ */
+osrfTransportGroupNode* __osrfTransportGroupFindNode( osrfTransportGroup* grp, char* domain );
+
+
int session_send_msg(
transport_session* session, transport_message* msg ) {
- if( ! session ) { return 0; }
+ if( ! session ) { return -1; }
if( ! session->state_machine->connected ) {
warning_handler("State machine is not connected in send_msg()");
- return 0;
+ return -1;
}
message_prepare_xml( msg );
//tcp_send( session->sock_obj, msg->msg_xml );
- socket_send( session->sock_id, msg->msg_xml );
-
- return 1;
+ return socket_send( session->sock_id, msg->msg_xml );
}
CFLAGS += -D_ROUTER
all: opensrf_router
+#osrf_router
install:
cp opensrf_router $(BINDIR)
-opensrf_router: router.o
- $(CC) $(CFLAGS) $(LDFLAGS) $(LDLIBS) router.o -o $@
-router.o: router.c router.h
+#opensrf_router: router.o
+# $(CC) $(CFLAGS) $(LDFLAGS) $(LDLIBS) router.o -o $@
+#router.o: router.c router.h
+
+opensrf_router: osrf_router.o osrf_router_main.o
+ $(CC) $(CFLAGS) $(LDFLAGS) $(LDLIBS) osrf_router.o osrf_router_main.o -o $@
+osrf_router.o: osrf_router.c osrf_router.h
+osrf_router_main.o: osrf_router_main.c
clean:
/bin/rm -f *.o opensrf_router
--- /dev/null
+#include "osrf_router.h"
+
+#define ROUTER_SOCKFD connection->session->sock_id
+#define ROUTER_REGISTER "register"
+#define ROUTER_UNREGISTER "unregister"
+
+
+#define ROUTER_REQUEST_CLASS_LIST "opensrf.router.info.class.list"
+
+osrfRouter* osrfNewRouter(
+ char* domain, char* name,
+ char* resource, char* password, int port,
+ osrfStringArray* trustedClients, osrfStringArray* trustedServers ) {
+
+ if(!( domain && name && resource && password && port )) return NULL;
+
+ osrfRouter* router = safe_malloc(sizeof(osrfRouter));
+ router->domain = strdup(domain);
+ router->name = strdup(name);
+ router->password = strdup(password);
+ router->resource = strdup(resource);
+ router->port = port;
+
+ router->trustedClients = trustedClients;
+ router->trustedServers = trustedServers;
+
+ router->classes = osrfNewHash();
+ router->classes->freeItem = &osrfRouterClassFree;
+
+ router->connection = client_init( domain, port, NULL, 0 );
+
+ return router;
+}
+
+
+
+int osrfRouterConnect( osrfRouter* router ) {
+ if(!router) return -1;
+ int ret = client_connect( router->connection, router->name,
+ router->password, router->resource, 10, AUTH_DIGEST );
+ if( ret == 0 ) return -1;
+ return 0;
+}
+
+
+void osrfRouterRun( osrfRouter* router ) {
+ if(!(router && router->classes)) return;
+
+ int routerfd = router->ROUTER_SOCKFD;
+ int selectret = 0;
+
+ while(1) {
+
+ fd_set set;
+ int maxfd = __osrfRouterFillFDSet( router, &set );
+ int numhandled = 0;
+
+ if( (selectret = select(maxfd + 1, &set, NULL, NULL, NULL)) < 0 ) {
+ warning_handler("Top level select call failed with errno %d", errno);
+ continue;
+ }
+
+ /* see if there is a top level router message */
+
+ if( FD_ISSET(routerfd, &set) ) {
+ debug_handler("Top router socket is active: %d", routerfd );
+ numhandled++;
+ osrfRouterHandleIncoming( router );
+ }
+
+
+ /* now check each of the connected classes and see if they have data to route */
+ while( numhandled < selectret ) {
+
+ osrfRouterClass* class;
+ osrfHashIterator* itr = osrfNewHashIterator(router->classes);
+
+ while( (class = osrfHashIteratorNext(itr)) ) {
+
+ char* classname = itr->current;
+
+ if( classname && (class = osrfRouterFindClass( router, classname )) ) {
+
+ debug_handler("Checking %s for activity...", classname );
+
+ int sockfd = class->ROUTER_SOCKFD;
+ if(FD_ISSET( sockfd, &set )) {
+ debug_handler("Socket is active: %d", sockfd );
+ numhandled++;
+ osrfRouterClassHandleIncoming( router, classname, class );
+ }
+ }
+ }
+
+ osrfHashIteratorFree(itr);
+ }
+ }
+}
+
+
+void osrfRouterHandleIncoming( osrfRouter* router ) {
+ if(!router) return;
+
+ transport_message* msg = NULL;
+
+ if( (msg = client_recv( router->connection, 0 )) ) {
+
+ if( msg->sender ) {
+
+ /* if the sender is not a trusted server, drop the message */
+ int len = strlen(msg->sender) + 1;
+ char domain[len];
+ bzero(domain, len);
+ jid_get_domain( msg->sender, domain );
+
+ if(osrfStringArrayContains( router->trustedServers, domain))
+ osrfRouterHandleMessage( router, msg );
+ else
+ warning_handler("Received message from un-trusted server domain %s", msg->sender);
+ }
+
+ message_free(msg);
+ }
+}
+
+int osrfRouterClassHandleIncoming( osrfRouter* router, char* classname, osrfRouterClass* class ) {
+ if(!(router && class)) return -1;
+
+ transport_message* msg;
+ debug_handler("osrfRouterClassHandleIncoming()");
+
+ if( (msg = client_recv( class->connection, 0 )) ) {
+
+ if( msg->sender ) {
+
+ /* if the client is not from a trusted domain, drop the message */
+ int len = strlen(msg->sender) + 1;
+ char domain[len];
+ bzero(domain, len);
+ jid_get_domain( msg->sender, domain );
+
+ if(osrfStringArrayContains( router->trustedClients, domain)) {
+
+ transport_message* bouncedMessage = NULL;
+ if( msg->is_error ) {
+
+ /* handle bounced message */
+ if( !(bouncedMessage = osrfRouterClassHandleBounce( router, classname, class, msg )) )
+ return -1; /* we have no one to send the requested message to */
+
+ message_free( msg );
+ msg = bouncedMessage;
+ }
+ osrfRouterClassHandleMessage( router, class, msg );
+
+ } else {
+ warning_handler("Received client message from untrusted client domain %s", domain );
+ }
+ }
+
+ message_free( msg );
+ }
+
+ return 0;
+}
+
+
+
+
+int osrfRouterHandleMessage( osrfRouter* router, transport_message* msg ) {
+ if(!(router && msg)) return -1;
+
+ if( !msg->router_command || !strcmp(msg->router_command,""))
+ return osrfRouterHandleAppRequest( router, msg ); /* assume it's an app session level request */
+
+ if(!msg->router_class) return -1;
+
+ osrfRouterClass* class = NULL;
+ if(!strcmp(msg->router_command, ROUTER_REGISTER)) {
+ class = osrfRouterFindClass( router, msg->router_class );
+
+ info_handler("Registering class %s", msg->router_class );
+
+ if(!class) class = osrfRouterAddClass( router, msg->router_class );
+
+ if(class) {
+
+ if( osrfRouterClassFindNode( class, msg->sender ) )
+ return 0;
+ else
+ osrfRouterClassAddNode( class, msg->sender );
+
+ }
+
+ } else if( !strcmp( msg->router_command, ROUTER_UNREGISTER ) ) {
+
+ if( msg->router_class && strcmp( msg->router_class, "") ) {
+ info_handler("Unregistering router class %s", msg->router_class );
+ osrfRouterClassRemoveNode( router, msg->router_class, msg->sender );
+ }
+ }
+
+ return 0;
+}
+
+
+
+osrfRouterClass* osrfRouterAddClass( osrfRouter* router, char* classname ) {
+ if(!(router && router->classes && classname)) return NULL;
+
+ osrfRouterClass* class = safe_malloc(sizeof(osrfRouterClass));
+ class->nodes = osrfNewHash();
+ class->itr = osrfNewHashIterator(class->nodes);
+ class->nodes->freeItem = &osrfRouterNodeFree;
+ class->router = router;
+
+ class->connection = client_init( router->domain, router->port, NULL, 0 );
+
+ if(!client_connect( class->connection, router->name,
+ router->password, classname, 10, AUTH_DIGEST ) ) {
+ osrfRouterClassFree( classname, class );
+ return NULL;
+ }
+
+ osrfHashSet( router->classes, class, classname );
+ return class;
+}
+
+
+int osrfRouterClassAddNode( osrfRouterClass* rclass, char* remoteId ) {
+ if(!(rclass && rclass->nodes && remoteId)) return -1;
+
+ info_handler("Adding router node for remote id %s", remoteId );
+
+ osrfRouterNode* node = safe_malloc(sizeof(osrfRouterNode));
+ node->count = 0;
+ node->lastMessage = NULL;
+ node->remoteId = strdup(remoteId);
+
+ osrfHashSet( rclass->nodes, node, remoteId );
+ return 0;
+}
+
+/* copy off the lastMessage, remove the offending node, send error if it's tht last node
+ ? return NULL if it's the last node ?
+ */
+
+transport_message* osrfRouterClassHandleBounce(
+ osrfRouter* router, char* classname, osrfRouterClass* rclass, transport_message* msg ) {
+
+ debug_handler("osrfRouterClassHandleBounce()");
+
+ warning_handler("Received network layer error message from %s", msg->sender );
+ osrfRouterNode* node = osrfRouterClassFindNode( rclass, msg->sender );
+ transport_message* lastSent = NULL;
+
+ if( node && osrfHashGetCount(rclass->nodes) == 1 ) { /* the last node is dead */
+
+ if( node->lastMessage ) {
+ warning_handler("We lost the last node in the class, responding with error and removing...");
+
+ transport_message* error = message_init(
+ node->lastMessage->body, node->lastMessage->subject,
+ node->lastMessage->thread, node->lastMessage->router_from, node->lastMessage->recipient );
+ set_msg_error( error, "cancel", 501 );
+
+ /* send the error message back to the original sender */
+ client_send_message( rclass->connection, error );
+ message_free( error );
+ }
+
+ return NULL;
+
+ } else {
+
+ if( node->lastMessage ) {
+ debug_handler("Cloning lastMessage so next node can send it");
+ lastSent = message_init( node->lastMessage->body,
+ node->lastMessage->subject, node->lastMessage->thread, "", node->lastMessage->router_from );
+ message_set_router_info( lastSent, node->lastMessage->router_from, NULL, NULL, NULL, 0 );
+ }
+ }
+
+ /* remove the dead node */
+ osrfRouterClassRemoveNode( router, classname, msg->sender);
+ return lastSent;
+}
+
+
+/**
+ If we get a regular message, we send it to the next node in the list of nodes
+ if we get an error, it's a bounce back from a previous attempt. We take the
+ body and thread from the last sent on the node that had the bounced message
+ and propogate them on to the new message being sent
+ */
+int osrfRouterClassHandleMessage(
+ osrfRouter* router, osrfRouterClass* rclass, transport_message* msg ) {
+ if(!(router && rclass && msg)) return -1;
+
+ debug_handler("osrfRouterClassHandleMessage()");
+
+ osrfRouterNode* node = osrfHashIteratorNext( rclass->itr );
+ if(!node) {
+ osrfHashIteratorReset(rclass->itr);
+ node = osrfHashIteratorNext( rclass->itr );
+ }
+
+ if(node) {
+
+ transport_message* new_msg= message_init( msg->body,
+ msg->subject, msg->thread, node->remoteId, msg->sender );
+ message_set_router_info( new_msg, msg->sender, NULL, NULL, NULL, 0 );
+
+ info_handler( "Routing message:\nfrom: [%s]\nto: [%s]",
+ new_msg->router_from, new_msg->recipient );
+
+ message_free( node->lastMessage );
+ node->lastMessage = new_msg;
+
+ if ( client_send_message( rclass->connection, new_msg ) == 0 )
+ node->count++;
+
+ else {
+ message_prepare_xml(new_msg);
+ warning_handler("Error sending message from %s to %s\n%s",
+ new_msg->sender, new_msg->recipient, new_msg->msg_xml );
+ }
+
+ }
+
+ return 0;
+}
+
+
+int osrfRouterRemoveClass( osrfRouter* router, char* classname ) {
+ if(!(router && router->classes && classname)) return -1;
+ info_handler("Removing router class %s", classname );
+ osrfHashRemove( router->classes, classname );
+ return 0;
+}
+
+
+int osrfRouterClassRemoveNode(
+ osrfRouter* router, char* classname, char* remoteId ) {
+
+ if(!(router && router->classes && classname && remoteId)) return 0;
+
+ info_handler("Removing router node %s", remoteId );
+
+ osrfRouterClass* class = osrfRouterFindClass( router, classname );
+
+ if( class ) {
+
+ osrfHashRemove( class->nodes, remoteId );
+ if( osrfHashGetCount(class->nodes) == 0 ) {
+ osrfRouterRemoveClass( router, classname );
+ return 1;
+ }
+
+ return 0;
+ }
+
+ return -1;
+}
+
+
+void osrfRouterClassFree( char* classname, void* c ) {
+ if(!(classname && c)) return;
+ osrfRouterClass* rclass = (osrfRouterClass*) c;
+ client_disconnect( rclass->connection );
+ client_free( rclass->connection );
+
+ osrfHashIteratorReset( rclass->itr );
+ osrfRouterNode* node;
+
+ while( (node = osrfHashIteratorNext(rclass->itr)) )
+ osrfRouterClassRemoveNode( rclass->router, classname, node->remoteId );
+
+ free(rclass);
+}
+
+
+void osrfRouterNodeFree( char* remoteId, void* n ) {
+ if(!n) return;
+ osrfRouterNode* node = (osrfRouterNode*) n;
+ free(node->remoteId);
+ message_free(node->lastMessage);
+ free(node);
+}
+
+
+void osrfRouterFree( osrfRouter* router ) {
+ if(!router) return;
+
+ free(router->domain);
+ free(router->name);
+ free(router->resource);
+ free(router->password);
+
+ osrfStringArrayFree( router->trustedClients );
+ osrfStringArrayFree( router->trustedServers );
+
+ client_free( router->connection );
+ free(router);
+}
+
+
+
+osrfRouterClass* osrfRouterFindClass( osrfRouter* router, char* classname ) {
+ if(!( router && router->classes && classname )) return NULL;
+ return (osrfRouterClass*) osrfHashGet( router->classes, classname );
+}
+
+
+osrfRouterNode* osrfRouterClassFindNode( osrfRouterClass* rclass, char* remoteId ) {
+ if(!(rclass && remoteId)) return NULL;
+ return (osrfRouterNode*) osrfHashGet( rclass->nodes, remoteId );
+}
+
+
+int __osrfRouterFillFDSet( osrfRouter* router, fd_set* set ) {
+ if(!(router && router->classes && set)) return -1;
+
+ FD_ZERO(set);
+ int maxfd = router->ROUTER_SOCKFD;
+ FD_SET(maxfd, set);
+
+ int sockid;
+
+ osrfRouterClass* class = NULL;
+ osrfHashIterator* itr = osrfNewHashIterator(router->classes);
+
+ while( (class = osrfHashIteratorNext(itr)) ) {
+ char* classname = itr->current;
+
+ if( classname && (class = osrfRouterFindClass( router, classname )) ) {
+ sockid = class->ROUTER_SOCKFD;
+
+ if( osrfUtilsCheckFileDescriptor( sockid ) ) {
+ osrfRouterRemoveClass( router, classname );
+
+ } else {
+ if( sockid > maxfd ) maxfd = sockid;
+ FD_SET(sockid, set);
+ }
+ }
+ }
+
+ osrfHashIteratorFree(itr);
+ return maxfd;
+}
+
+
+
+int osrfRouterHandleAppRequest( osrfRouter* router, transport_message* msg ) {
+
+ int T = 32;
+ osrfMessage* arr[T];
+ memset(arr, 0, T );
+
+ int num_msgs = osrf_message_deserialize( msg->body, arr, T );
+ osrfMessage* omsg = NULL;
+
+ int i;
+ for( i = 0; i != num_msgs; i++ ) {
+
+ if( !(omsg = arr[i]) ) continue;
+
+ switch( omsg->m_type ) {
+
+ case CONNECT:
+ osrfRouterRespondConnect( router, msg, omsg );
+ break;
+
+ case REQUEST:
+ osrfRouterProcessAppRequest( router, msg, omsg );
+ break;
+
+ default: break;
+ }
+
+ osrfMessageFree( omsg );
+ }
+
+ return 0;
+}
+
+int osrfRouterRespondConnect( osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
+ if(!(router && msg && omsg)) return -1;
+
+ osrfMessage* success = osrf_message_init( STATUS, omsg->thread_trace, omsg->protocol );
+
+ debug_handler("router recevied a CONNECT message from %s", msg->sender );
+
+ osrf_message_set_status_info(
+ success, "osrfConnectStatus", "Connection Successful", OSRF_STATUS_OK );
+
+ char* data = osrf_message_serialize(success);
+
+ transport_message* return_m = message_init(
+ data, "", msg->thread, msg->sender, "" );
+
+ client_send_message(router->connection, return_m);
+
+ free(data);
+ osrf_message_free(success);
+ message_free(return_m);
+
+ return 0;
+}
+
+
+
+int osrfRouterProcessAppRequest( osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
+
+ if(!(router && msg && omsg && omsg->method_name)) return -1;
+
+ info_handler("Router received app request: %s", omsg->method_name );
+
+ jsonObject* jresponse = NULL;
+ if(!strcmp( omsg->method_name, ROUTER_REQUEST_CLASS_LIST )) {
+
+ int i;
+ jresponse = jsonParseString("[]");
+
+ osrfStringArray* keys = osrfHashKeys( router->classes );
+ for( i = 0; i != keys->size; i++ )
+ jsonObjectPush( jresponse, jsonNewObject(osrfStringArrayGetString( keys, i )) );
+ osrfStringArrayFree(keys);
+
+
+ } else {
+
+ return osrfRouterHandleMethodNFound( router, msg, omsg );
+ }
+
+
+ osrfRouterHandleAppResponse( router, msg, omsg, jresponse );
+ jsonObjectFree(jresponse);
+
+ return 0;
+
+}
+
+
+
+int osrfRouterHandleMethodNFound(
+ osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
+
+ osrf_message* err = osrf_message_init( STATUS, omsg->thread_trace, 1);
+ osrf_message_set_status_info( err,
+ "osrfMethodException", "Router method not found", OSRF_STATUS_NOTFOUND );
+
+ char* data = osrf_message_serialize(err);
+
+ transport_message* tresponse = message_init(
+ data, "", msg->thread, msg->sender, msg->recipient );
+
+ client_send_message(router->connection, tresponse );
+
+ free(data);
+ osrf_message_free( err );
+ message_free(tresponse);
+ return 0;
+}
+
+
+
+int osrfRouterHandleAppResponse( osrfRouter* router,
+ transport_message* msg, osrfMessage* omsg, jsonObject* response ) {
+
+ if( response ) { /* send the response message */
+
+ osrfMessage* oresponse = osrf_message_init(
+ RESULT, omsg->thread_trace, omsg->protocol );
+
+ char* json = jsonObjectToJSON(response);
+ osrf_message_set_result_content( oresponse, json);
+
+ char* data = osrf_message_serialize(oresponse);
+ debug_handler( "Responding to client app request with data: \n%s\n", data );
+
+ transport_message* tresponse = message_init(
+ data, "", msg->thread, msg->sender, msg->recipient );
+
+ client_send_message(router->connection, tresponse );
+
+ osrfMessageFree(oresponse);
+ message_free(tresponse);
+ free(json);
+ free(data);
+ }
+
+
+ /* now send the 'request complete' message */
+ osrf_message* status = osrf_message_init( STATUS, omsg->thread_trace, 1);
+ osrf_message_set_status_info( status, "osrfConnectStatus", "Request Complete", OSRF_STATUS_COMPLETE );
+
+ char* statusdata = osrf_message_serialize(status);
+
+ transport_message* sresponse = message_init(
+ statusdata, "", msg->thread, msg->sender, msg->recipient );
+ client_send_message(router->connection, sresponse );
+
+
+ free(statusdata);
+ osrfMessageFree(status);
+ message_free(sresponse);
+
+ return 0;
+}
+
+
+
+
--- /dev/null
+#include <sys/select.h>
+#include <signal.h>
+#include <stdio.h>
+
+#include "opensrf/utils.h"
+#include "opensrf/osrf_list.h"
+#include "opensrf/osrf_hash.h"
+
+#include "opensrf/string_array.h"
+#include "opensrf/transport_client.h"
+#include "opensrf/transport_message.h"
+
+#include "opensrf/osrf_message.h"
+
+
+
+/* a router maintains a list of server classes */
+struct __osrfRouterStruct {
+
+ osrfHash* classes; /* our list of server classes */
+ char* domain; /* our login domain */
+ char* name;
+ char* resource;
+ char* password;
+ int port;
+
+ osrfStringArray* trustedClients;
+ osrfStringArray* trustedServers;
+
+ transport_client* connection;
+};
+
+typedef struct __osrfRouterStruct osrfRouter;
+
+
+/* a class maintains a set of server nodes */
+struct __osrfRouterClassStruct {
+ osrfRouter* router; /* our router handle */
+ osrfHashIterator* itr;
+ osrfHash* nodes;
+ transport_client* connection;
+};
+typedef struct __osrfRouterClassStruct osrfRouterClass;
+
+/* represents a link to a single server's inbound connection */
+struct __osrfRouterNodeStruct {
+ char* remoteId; /* send message to me via this login */
+ int count; /* how many message have been sent to this node */
+ transport_message* lastMessage;
+};
+typedef struct __osrfRouterNodeStruct osrfRouterNode;
+
+/**
+ Allocates a new router.
+ @param domain The jabber domain to connect to
+ @param name The login name for the router
+ @param resource The login resource for the router
+ @param password The login password for the new router
+ @param port The port to connect to the jabber server on
+ @param trustedClients The array of client domains that we allow to send requests through us
+ @param trustedServers The array of server domains that we allow to register, etc. with ust.
+ @return The allocated router or NULL on memory error
+ */
+osrfRouter* osrfNewRouter( char* domain, char* name, char* resource,
+ char* password, int port, osrfStringArray* trustedClients, osrfStringArray* trustedServers );
+
+/**
+ Connects the given router to the network
+ */
+int osrfRouterConnect( osrfRouter* router );
+
+/**
+ Waits for incoming data to route
+ If this function returns, then the router's connection to the jabber server
+ has failed.
+ */
+void osrfRouterRun( osrfRouter* router );
+
+
+/**
+ Allocates and adds a new router class handler to the router's list of handlers.
+ Also connects the class handler to the network at <routername>@domain/<classname>
+ @param router The current router instance
+ @param classname The name of the class this node handles.
+ @return 0 on success, -1 on connection error.
+ */
+osrfRouterClass* osrfRouterAddClass( osrfRouter* router, char* classname );
+
+/**
+ Adds a new server node to the given class.
+ @param rclass The Router class to add the node to
+ @param remoteId The remote login of this node
+ @return 0 on success, -1 on generic error
+ */
+int osrfRouterClassAddNode( osrfRouterClass* rclass, char* remoteId );
+
+
+/**
+ Handles top level router messages
+ @return 0 on success
+ */
+int osrfRouterHandleMessage( osrfRouter* router, transport_message* msg );
+
+
+/**
+ Handles class level requests
+ @return 0 on success
+ */
+int osrfRouterClassHandleMessage( osrfRouter* router,
+ osrfRouterClass* rclass, transport_message* msg );
+
+/**
+ Removes a given class from the router, freeing as it goes
+ */
+int osrfRouterRemoveClass( osrfRouter* router, char* classname );
+
+/**
+ Removes the given node from the class. Also, if this is that last node in the set,
+ removes the class from the router
+ @return 0 on successful removal with no class removal
+ @return 1 on successful remove with class removal
+ @return -1 error on removal
+ */
+int osrfRouterClassRemoveNode( osrfRouter* router, char* classname, char* remoteId );
+
+/**
+ Frees a router class object
+ Takes a void* since it is freed by the hash code
+ */
+void osrfRouterClassFree( char* classname, void* rclass );
+
+/**
+ Frees a router node object
+ Takes a void* since it is freed by the list code
+ */
+void osrfRouterNodeFree( char* remoteId, void* node );
+
+
+/**
+ Frees a router
+ */
+void osrfRouterFree( osrfRouter* router );
+
+/**
+ Finds the class associated with the given class name in the list of classes
+ */
+osrfRouterClass* osrfRouterFindClass( osrfRouter* router, char* classname );
+
+/**
+ Finds the router node within this class with the given remote id
+ */
+osrfRouterNode* osrfRouterClassFindNode( osrfRouterClass* rclass, char* remoteId );
+
+
+/**
+ Clears and populates the provided fd_set* with file descriptors
+ from the router's top level connection as well as each of the
+ router class connections
+ @return The largest file descriptor found in the filling process
+ */
+int __osrfRouterFillFDSet( osrfRouter* router, fd_set* set );
+
+
+
+/**
+ Utility method for handling incoming requests to the router
+ and making sure the sender is allowed.
+ */
+void osrfRouterHandleIncoming( osrfRouter* router );
+
+/**
+ Utility method for handling incoming requests to a router class,
+ makes sure sender is a trusted client
+ */
+int osrfRouterClassHandleIncoming( osrfRouter* router, char* classname, osrfRouterClass* class );
+
+/* handles case where router node is not longer reachable. copies over the
+ data from the last sent message and returns a newly crafted suitable for treating
+ as a newly inconing message. Removes the dead node and If there are no more
+ nodes to send the new message to, returns NULL.
+ */
+transport_message* osrfRouterClassHandleBounce(
+ osrfRouter* router, char* classname, osrfRouterClass* rclass, transport_message* msg );
+
+
+
+/**
+ handles messages that don't have a 'router_command' set. They are assumed to
+ be app request messages
+ */
+int osrfRouterHandleAppRequest( osrfRouter* router, transport_message* msg );
+
+
+/**
+ Handles connects, disconnects, etc.
+ */
+int osrfRouterHandeStatusMessage( osrfRouter* router, transport_message* msg );
+
+
+/**
+ Handles REQUEST messages
+ */
+int osrfRouterHandleRequestMessage( osrfRouter* router, transport_message* msg );
+
+
+
+int osrfRouterHandleAppRequest( osrfRouter* router, transport_message* msg );
+
+
+int osrfRouterRespondConnect( osrfRouter* router, transport_message* msg, osrfMessage* omsg );
+
+
+
+int osrfRouterProcessAppRequest( osrfRouter* router, transport_message* msg, osrfMessage* omsg );
+
+int osrfRouterHandleAppResponse( osrfRouter* router,
+ transport_message* msg, osrfMessage* omsg, jsonObject* response );
+
+
+int osrfRouterHandleMethodNFound( osrfRouter* router, transport_message* msg, osrfMessage* omsg );
+
--- /dev/null
+#include "osrf_router.h"
+#include "opensrf/osrfConfig.h"
+#include "opensrf/utils.h"
+#include "opensrf/logging.h"
+#include <signal.h>
+
+osrfRouter* __osrfRouter = NULL;
+
+void routerSignalHandler( int signal ) {
+ warning_handler("Received signal [%d], cleaning up...", signal );
+ osrfConfigCleanup();
+ osrfRouterFree(__osrfRouter);
+ log_free();
+}
+
+static int __setupRouter( char* config, char* context );
+
+
+int main( int argc, char* argv[] ) {
+
+ if( argc < 3 ) {
+ fatal_handler( "Usage: %s <path_to_config_file> <config_context>", argv[0] );
+ exit(0);
+ }
+
+ char* config = strdup( argv[1] );
+ char* context = strdup( argv[2] );
+ init_proc_title( argc, argv );
+ set_proc_title( "OpenSRF Router" );
+
+ return __setupRouter( config, context );
+ free(config);
+ free(context);
+
+}
+
+int __setupRouter( char* config, char* context ) {
+
+ fprintf(stderr, "Launching router with config %s and config context %s", config, context );
+ osrfConfig* cfg = osrfConfigInit( config, context );
+ osrfConfigSetDefaultConfig(cfg);
+
+
+ char* server = osrfConfigGetValue(NULL, "/transport/server");
+ char* port = osrfConfigGetValue(NULL, "/transport/port");
+ char* username = osrfConfigGetValue(NULL, "/transport/username");
+ char* password = osrfConfigGetValue(NULL, "/transport/password");
+ char* resource = osrfConfigGetValue(NULL, "/transport/resource");
+
+ /* set up the logger */
+ char* level = osrfConfigGetValue(NULL, "/loglevel");
+ char* log_file = osrfConfigGetValue(NULL, "/logfile");
+
+ int llevel = 1;
+ if(level) llevel = atoi(level);
+
+ if(!log_init( llevel, log_file ))
+ fprintf(stderr, "Unable to init logging, going to stderr...\n" );
+
+ free(level);
+ free(log_file);
+
+ info_handler( "Router connecting as: server: %s port: %s "
+ "user: %s resource: %s", server, port, username, resource );
+
+ int iport = 0;
+ if(port) iport = atoi( port );
+
+ osrfStringArray* tclients = osrfNewStringArray(4);
+ osrfStringArray* tservers = osrfNewStringArray(4);
+ osrfConfigGetValueList(NULL, tservers, "/trusted_domains/server" );
+ osrfConfigGetValueList(NULL, tclients, "/trusted_domains/client" );
+
+ int i;
+ for( i = 0; i != tservers->size; i++ )
+ info_handler( "Router adding trusted server: %s", osrfStringArrayGetString( tservers, i ) );
+
+ for( i = 0; i != tclients->size; i++ )
+ info_handler( "Router adding trusted client: %s", osrfStringArrayGetString( tclients, i ) );
+
+ osrfRouter* router = osrfNewRouter( server,
+ username, resource, password, iport, tclients, tservers );
+
+ signal(SIGHUP,routerSignalHandler);
+ signal(SIGINT,routerSignalHandler);
+ signal(SIGTERM,routerSignalHandler);
+
+ if( (osrfRouterConnect(router)) != 0 ) {
+ fprintf(stderr, "!!!! Unable to connect router to jabber server %s... exiting", server );
+ return -1;
+ }
+
+ free(server); free(port);
+ free(username); free(password);
+
+ __osrfRouter = router;
+ daemonize();
+ osrfRouterRun( router );
+
+ return -1;
+
+}
+
+
void string_array_destroy(string_array* arr) {
if(arr) {
int i = 0;
- while( i++ < arr->size ) free(arr->array[i]);
+ while( i < arr->size ) free(arr->array[i++]);
free(arr->array);
free(arr);
}
}
+
+
+int osrfStringArrayContains( osrfStringArray* arr, char* string ) {
+ if(!(arr && string)) return 0;
+
+ int i;
+ for( i = 0; i != arr->size; i++ ) {
+ char* str = osrfStringArrayGetString(arr, i);
+ if(str) {
+ if(!strcmp(str, string)) return 1;
+ }
+ }
+
+ return 0;
+}
+
char* string_array_get_string(osrfStringArray* arr, int index);
char* osrfStringArrayGetString(osrfStringArray* arr, int index);
+/* returns true if this array contains the given string */
+int osrfStringArrayContains( osrfStringArray* arr, char* string );
+
void string_array_destroy(osrfStringArray*);
void osrfStringArrayFree(osrfStringArray*);
*/
#include "utils.h"
+#include <errno.h>
inline void* safe_malloc( int size ) {
void* ptr = (void*) malloc( size );
}
+int osrfUtilsCheckFileDescriptor( int fd ) {
+
+ fd_set tmpset;
+ FD_ZERO(&tmpset);
+ FD_SET(fd, &tmpset);
+
+ struct timeval tv;
+ tv.tv_sec = 0;
+ tv.tv_usec = 0;
+
+ if( select(fd + 1, &tmpset, NULL, NULL, &tv) == -1 ) {
+ if( errno == EBADF ) return -1;
+ }
+
+ return 0;
+}
char* md5sum( char* text, ... );
+/**
+ Checks the validity of the file descriptor
+ returns -1 if the file descriptor is invalid
+ returns 0 if the descriptor is OK
+ */
+int osrfUtilsCheckFileDescriptor( int fd );
+
#endif