added list and hash code based on libJudy
authorerickson <erickson@9efc2488-bf62-4759-914b-345cdb29e865>
Mon, 3 Oct 2005 22:19:41 +0000 (22:19 +0000)
committererickson <erickson@9efc2488-bf62-4759-914b-345cdb29e865>
Mon, 3 Oct 2005 22:19:41 +0000 (22:19 +0000)
re-coded the router
added preliminary transport_group code for client redundancy (far from functional)
various twists and tweaks
fixed memory error in string_array code
update makefiles

git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@541 9efc2488-bf62-4759-914b-345cdb29e865

21 files changed:
src/Makefile
src/libstack/Makefile
src/libstack/opensrf.c
src/libstack/osrf_app_session.c
src/libstack/osrf_hash.c [new file with mode: 0644]
src/libstack/osrf_hash.h [new file with mode: 0644]
src/libstack/osrf_list.c [new file with mode: 0644]
src/libstack/osrf_list.h [new file with mode: 0644]
src/libstack/osrf_stack.c
src/libstack/osrf_system.c
src/libstack/osrf_transgroup.c [new file with mode: 0644]
src/libstack/osrf_transgroup.h [new file with mode: 0644]
src/libtransport/transport_session.c
src/router/Makefile
src/router/osrf_router.c [new file with mode: 0644]
src/router/osrf_router.h [new file with mode: 0644]
src/router/osrf_router_main.c [new file with mode: 0644]
src/utils/string_array.c
src/utils/string_array.h
src/utils/utils.c
src/utils/utils.h

index 7ed9737..864b0fc 100644 (file)
@@ -28,6 +28,9 @@ OPENSRF_TARGETS = libtransport/transport_session.o \
                                                libstack/osrf_application.o \
                                                libstack/osrf_cache.o \
                                                libstack/xml_utils.o \
+                                               libstack/osrf_transgroup.o \
+                                               libstack/osrf_list.o \
+                                               libstack/osrf_hash.o \
                                                libstack/osrf_log.o \
                                                utils/socket_bundle.o \
                                                utils/string_array.o \
@@ -50,6 +53,9 @@ OPENSRF_HEADERS = libtransport/transport_session.h \
                                                libstack/osrf_cache.h \
                                                libstack/xml_utils.h \
                                                libstack/osrf_log.h \
+                                               libstack/osrf_transgroup.h \
+                                               libstack/osrf_list.h \
+                                               libstack/osrf_hash.h \
                                                utils/socket_bundle.h \
                                                utils/string_array.h \
                                                utils/utils.h \
@@ -74,7 +80,7 @@ libopensrf.so:        objson/libobjson.so
        @echo stack
        make -C libstack
        @echo $@
-       $(CC) -shared -W1 $(LDFLAGS) -lmemcache -lobjson $(OPENSRF_TARGETS) -o $(TMPDIR)/$(LIBOPENSRF)
+       $(CC) -shared -W1 $(LDFLAGS) -lJudy -lmemcache -lobjson $(OPENSRF_TARGETS) -o $(TMPDIR)/$(LIBOPENSRF)
        @echo apps
        make -C  c-apps
 
index b6b76de..8399594 100644 (file)
@@ -3,7 +3,7 @@
 #      provided to any method is not at least as large as the 'argc' setting for the method
 
 CFLAGS +=  -DASSUME_STATELESS  -DOSRF_LOG_PARAMS -DOSRF_STRICT_PARAMS -rdynamic -fno-strict-aliasing
-LDLIBS += -lxml2 -lobjson -ldl -lmemcache
+LDLIBS += -lxml2 -lobjson -ldl -lmemcache -LJudy
 
 TARGETS = osrf_message.o \
                         osrf_app_session.o \
@@ -15,6 +15,9 @@ TARGETS = osrf_message.o \
                         osrf_application.o \
                         osrf_cache.o \
                         osrf_log.o \
+                        osrf_transgroup.o \
+                        osrf_list.o \
+                        osrf_hash.o \
                         xml_utils.o
 
 HEADERS = osrf_message.h \
@@ -27,6 +30,9 @@ HEADERS = osrf_message.h \
                         osrf_application.h \
                         osrf_cache.h \
                         osrf_log.h \
+                        osrf_transgroup.h \
+                        osrf_list.h \
+                        osrf_hash.h \
                         xml_utils.h
 
 all: xml_utils.o $(TARGETS) copy 
@@ -49,6 +55,9 @@ osrfConfig.o: osrfConfig.c osrfConfig.h xml_utils.o
 osrf_application.o: osrf_application.c osrf_application.h
 osrf_cache.o:  osrf_cache.c osrf_cache.h
 osrf_log.o:    osrf_log.c osrf_log.h
+osrf_list.o:   osrf_list.c osrf_list.h
+osrf_hash.o:   osrf_hash.c osrf_hash.h
+
 
 clean:
        /bin/rm -f *.o libopensrf_stack.so xml_utils.h xml_utils.c
index 403a3d6..995fa84 100644 (file)
@@ -1,7 +1,85 @@
 #include "osrf_system.h"
+#include "osrf_hash.h"
+#include "osrf_list.h"
+
+//static void _free(void* i) { free(i); }
+//static void _hfree(char* c, void* i) { free(i); }
 
 int main( int argc, char* argv[] ) {
 
+       /*
+       osrfHash* list = osrfNewHash();
+       list->freeItem = _hfree;
+
+       char* x = strdup("X");
+       char* y = strdup("Y");
+       char* z = strdup("Z");
+       osrfHashSet( list, x, "test1" );
+       osrfHashSet( list, y, "test2" );
+       osrfHashSet( list, z, "test3" );
+
+       char* q = (char*) osrfHashGet( list, "test1" );
+       printf( "%s\n", q );
+
+       q = (char*) osrfHashGet( list, "test2" );
+       printf( "%s\n", q );
+
+       q = (char*) osrfHashGet( list, "test3" );
+       printf( "%s\n", q );
+
+       osrfHashIterator* itr = osrfNewHashIterator(list);
+       char* val;
+
+       while( (val = osrfHashIteratorNext(itr)) )
+               printf("Iterated item: %s\n", val );
+
+       osrfHashIteratorReset(itr);
+       while( (val = osrfHashIteratorNext(itr)) )
+               printf("Iterated item: %s\n", val );
+
+       printf( "Count: %lu\n", osrfHashGetCount(list));
+
+       osrfHashIteratorFree(itr);
+
+       osrfHashFree(list);
+
+       exit(1);
+
+       osrfList* list = osrfNewList();
+       list->freeItem = _free;
+
+       char* x = strdup("X");
+       char* y = strdup("Y");
+       char* z = strdup("Z");
+       osrfListSet( list, x, 0 );
+       osrfListSet( list, y, 2 );
+       osrfListSet( list, z, 4 );
+
+       char* q = (char*) osrfListGetIndex( list, 4 );
+       printf( "%s\n", q );
+
+       osrfListIterator* itr = osrfNewListIterator( list );
+       char* val;
+
+       while( (val = osrfListIteratorNext(itr)) ) 
+               printf("Found val: %s\n", val );
+
+       osrfListIteratorReset(itr);
+       printf("\n");
+       while( (val = osrfListIteratorNext(itr)) ) 
+               printf("Found val: %s\n", val );
+
+       osrfListIteratorFree(itr);
+
+       printf( "Count: %lu\n", osrfListGetCount(list));
+
+       osrfListFree(list);
+
+       exit(1);
+       */
+
+
+
        if( argc < 4 ) {
                fprintf(stderr, "Usage: %s <host> <bootstrap_config> <config_context>\n", argv[0]);
                return 1;
index 83a1644..66b105d 100644 (file)
@@ -245,16 +245,14 @@ osrf_app_session* osrf_app_client_session_init( char* remote_service ) {
 
        char target_buf[512];
        memset(target_buf,0,512);
-       //char* domain  = config_value( "opensrf.bootstrap", "//%s/domains/domain1", osrf_get_config_context() ); /* just the first for now */
-       //char* router_name = config_value( "opensrf.bootstrap", "//%s/router_name", osrf_get_config_context() );
 
        osrfStringArray* arr = osrfNewStringArray(8);
        osrfConfigGetValueList(NULL, arr, "/domains/domain");
        char* domain = osrfStringArrayGetString(arr, 0);
        char* router_name = osrfConfigGetValue(NULL, "/router_name");
-       osrfStringArrayFree(arr);
        
        sprintf( target_buf, "%s@%s/%s",  router_name, domain, remote_service );
+       osrfStringArrayFree(arr);
        //free(domain);
        free(router_name);
 
@@ -376,7 +374,7 @@ int osrf_app_session_make_req(
        }
 
        osrf_app_request* req = _osrf_app_request_init( session, req_msg );
-       if(!_osrf_app_session_send( session, req_msg ) ) {
+       if(_osrf_app_session_send( session, req_msg ) ) {
                warning_handler( "Error sending request message [%d]", session->thread_trace );
                return -1;
        }
@@ -541,7 +539,7 @@ int osrf_app_session_connect(osrf_app_session* session){
        session->state = OSRF_SESSION_CONNECTING;
        int ret = _osrf_app_session_send( session, con_msg );
        osrf_message_free(con_msg);
-       if(!ret)        return 0;
+       if(ret) return 0;
 
        time_t start = time(NULL);      
        time_t remaining = (time_t) timeout;
diff --git a/src/libstack/osrf_hash.c b/src/libstack/osrf_hash.c
new file mode 100644 (file)
index 0000000..819b979
--- /dev/null
@@ -0,0 +1,173 @@
+#include "osrf_hash.h"
+
+osrfHash* osrfNewHash() {
+       osrfHash* hash = safe_malloc(sizeof(osrfHash));
+       hash->hash = (Pvoid_t) NULL;
+       hash->freeItem = NULL;
+       return hash;
+}
+
+void* osrfHashSet( osrfHash* hash, void* item, const char* key, ... ) {
+       if(!(hash && item && key )) return NULL;
+
+       Word_t* value;
+       VA_LIST_TO_STRING(key);
+       uint8_t idx[strlen(VA_BUF) + 1];
+       strcpy( idx, VA_BUF );
+
+       void* olditem = osrfHashRemove( hash, VA_BUF );
+
+       JSLI(value, hash->hash, idx);
+       if(value) *value = (Word_t) item;
+       return olditem;
+       
+}
+
+void* osrfHashRemove( osrfHash* hash, const char* key, ... ) {
+       if(!(hash && key )) return NULL;
+
+       VA_LIST_TO_STRING(key);
+
+       Word_t* value;
+       uint8_t idx[strlen(VA_BUF) + 1];
+       strcpy( idx, VA_BUF );
+       void* item = NULL;
+       int retcode;
+
+       JSLG( value, hash->hash,  idx);
+
+       if( value ) {
+               item = (void*) *value;
+               if(item) {
+                       if( hash->freeItem ) {
+                               hash->freeItem( (char*) idx, item ); 
+                               item = NULL;
+                       }
+               }
+       }
+
+
+       JSLD( retcode, hash->hash, idx );
+
+       return item;
+}
+
+
+void* osrfHashGet( osrfHash* hash, const char* key, ... ) {
+       if(!(hash && key )) return NULL;
+
+       VA_LIST_TO_STRING(key);
+
+       Word_t* value;
+       uint8_t idx[strlen(VA_BUF) + 1];
+       strcpy( idx, VA_BUF );
+
+       JSLG( value, hash->hash, idx );
+       if(value) return (void*) *value;
+       return NULL;
+}
+
+
+osrfStringArray* osrfHashKeys( osrfHash* hash ) {
+       if(!hash) return NULL;
+
+       Word_t* value;
+       uint8_t idx[OSRF_HASH_MAXKEY];
+       strcpy(idx, "");
+       char* key;
+       osrfStringArray* strings = osrfNewStringArray(8);
+
+       JSLF( value, hash->hash, idx );
+
+       while( value ) {
+               key = (char*) idx;
+               osrfStringArrayAdd( strings, key );
+               JSLN( value, hash->hash, idx );
+       }
+
+       return strings;
+}
+
+
+unsigned long osrfHashGetCount( osrfHash* hash ) {
+       if(!hash) return -1;
+
+       Word_t* value;
+       unsigned long count = 0;
+       uint8_t idx[OSRF_HASH_MAXKEY];
+
+       strcpy( (char*) idx, "");
+       JSLF(value, hash->hash, idx);
+
+       while(value) {
+               count++;
+               JSLN( value, hash->hash, idx );
+       }
+
+       return count;
+}
+
+void osrfHashFree( osrfHash* hash ) {
+       if(!hash) return;
+
+       int i;
+       osrfStringArray* keys = osrfHashKeys( hash );
+
+       for( i = 0; i != keys->size; i++ )  {
+               char* key = (char*) osrfStringArrayGetString( keys, i );
+               osrfHashRemove( hash, key );
+       }
+
+       osrfStringArrayFree(keys);
+       free(hash);
+}
+
+
+
+osrfHashIterator* osrfNewHashIterator( osrfHash* hash ) {
+       if(!hash) return NULL;
+       osrfHashIterator* itr = safe_malloc(sizeof(osrfHashIterator));
+       itr->hash = hash;
+       itr->current = NULL;
+       return itr;
+}
+
+void* osrfHashIteratorNext( osrfHashIterator* itr ) {
+       if(!(itr && itr->hash)) return NULL;
+
+       Word_t* value;
+       uint8_t idx[OSRF_HASH_MAXKEY];
+
+       if( itr->current == NULL ) { /* get the first item in the list */
+               strcpy(idx, "");
+               JSLF( value, itr->hash->hash, idx );
+
+       } else {
+               strcpy(idx, itr->current);
+               JSLN( value, itr->hash->hash, idx );
+       }
+
+       if(value) {
+               free(itr->current);
+               itr->current = strdup((char*) idx);
+               return (void*) *value;
+       }
+
+       return NULL;
+
+}
+
+void osrfHashIteratorFree( osrfHashIterator* itr ) {
+       if(!itr) return;
+       free(itr->current);
+       free(itr);
+}
+
+void osrfHashIteratorReset( osrfHashIterator* itr ) {
+       if(!itr) return;
+       free(itr->current);
+       itr->current = NULL;
+}
+
+
+
diff --git a/src/libstack/osrf_hash.h b/src/libstack/osrf_hash.h
new file mode 100644 (file)
index 0000000..abaacc5
--- /dev/null
@@ -0,0 +1,83 @@
+#include <Judy.h>
+#include "opensrf/utils.h"
+#include "opensrf/string_array.h"
+
+#define OSRF_HASH_MAXKEY 256
+
+struct __osrfHashStruct {
+       Pvoid_t hash;                                                   /* the hash */
+       void (*freeItem) (char* key, void* item);       /* callback for freeing stored items */
+};
+typedef struct __osrfHashStruct osrfHash;
+
+
+struct __osrfHashIteratorStruct {
+       char* current;
+       osrfHash* hash;
+};
+typedef struct __osrfHashIteratorStruct osrfHashIterator;
+
+/**
+  Allocates a new hash object
+  */
+osrfHash* osrfNewHash();
+
+/**
+  Sets the given key with the given item
+  if "freeItem" is defined and an item already exists at the given location, 
+  then old item is freed and the new item is put into place.
+  if "freeItem" is not defined and an item already exists, the old item
+  is returned.
+  @return The old item if exists and there is no 'freeItem', returns NULL
+  otherwise
+  */
+void* osrfHashSet( osrfHash* hash, void* item, const char* key, ... );
+
+/**
+  Removes an item from the hash.
+  if 'freeItem' is defined it is used and NULL is returned,
+  else the freed item is returned
+  */
+void* osrfHashRemove( osrfHash* hash, const char* key, ... );
+
+void* osrfHashGet( osrfHash* hash, const char* key, ... );
+
+
+/**
+  @return A list of strings representing the keys of the hash. 
+  caller is responsible for freeing the returned string array 
+  with osrfStringArrayFree();
+  */
+osrfStringArray* osrfHashKeys( osrfHash* hash );
+
+/**
+  Frees a hash
+  */
+void osrfHashFree( osrfHash* hash );
+
+/**
+  @return The number of items in the hash
+  */
+unsigned long osrfHashGetCount( osrfHash* hash );
+
+
+
+
+/**
+  Creates a new list iterator with the given list
+  */
+osrfHashIterator* osrfNewHashIterator( osrfHash* hash );
+
+/**
+  Returns the next non-NULL item in the list, return NULL when
+  the end of the list has been reached
+  */
+void* osrfHashIteratorNext( osrfHashIterator* itr );
+
+/**
+  Deallocates the given list
+  */
+void osrfHashIteratorFree( osrfHashIterator* itr );
+
+void osrfHashIteratorReset( osrfHashIterator* itr );
+
diff --git a/src/libstack/osrf_list.c b/src/libstack/osrf_list.c
new file mode 100644 (file)
index 0000000..a3d673e
--- /dev/null
@@ -0,0 +1,164 @@
+#include "osrf_list.h"
+
+
+osrfList* osrfNewList() {
+       osrfList* list = safe_malloc(sizeof(osrfList));
+       list->list = (Pvoid_t) NULL;
+       list->size = 0;
+       list->freeItem = NULL;
+       return list;
+}
+
+
+int osrfListPush( osrfList* list, void* item ) {
+       if(!(list && item)) return -1;
+       Word_t* value;
+       unsigned long index = -1;
+       JLL(value, list->list, index );
+       osrfListSet( list, item, index+1 );
+       return 0;
+}
+
+
+void* osrfListSet( osrfList* list, void* item, unsigned long position ) {
+       if(!list || position < 0) return NULL;
+
+       Word_t* value;
+       void* olditem = osrfListRemove( list, position );
+
+       JLI( value, list->list, position ); 
+       *value = (Word_t) item;
+       __osrfListSetSize( list );
+
+       return olditem;
+}
+
+
+void* osrfListGetIndex( osrfList* list, unsigned long position ) {
+       if(!list) return NULL;
+
+       Word_t* value;
+       JLG( value, list->list, position );
+       if(value) return (void*) *value;
+       return NULL;
+}
+
+void osrfListFree( osrfList* list ) {
+       if(!list) return;
+
+       Word_t* value;
+       unsigned long index = -1;
+       JLL(value, list->list, index );
+       int retcode;
+
+       while (value != NULL) {
+               JLD(retcode, list->list, index);
+
+               if(list->freeItem) {
+                       list->freeItem( (void*) *value );
+                       *value = (Word_t) NULL;
+               }
+
+               JLP(value, list->list, index);
+       }               
+
+       free(list);
+}
+
+void* osrfListRemove( osrfList* list, int position ) {
+       if(!list) return NULL;
+
+       int retcode;
+       Word_t* value;
+       JLG( value, list->list, position );
+       void* olditem = NULL;
+
+       if( value ) {
+
+               olditem = (void*) *value;
+               if( olditem ) {
+                       JLD(retcode, list->list, position );
+                       if(retcode == 1) {
+                               if(list->freeItem) {
+                                       list->freeItem( olditem );
+                                       olditem = NULL;
+                               }
+                               __osrfListSetSize( list );
+                       }
+               }
+       }
+
+       return olditem;
+}
+
+
+int osrfListFind( osrfList* list, void* addr ) {
+       if(!(list && addr)) return -1;
+
+       Word_t* value;
+       unsigned long index = -1;
+       JLL(value, list->list, index );
+
+       while (value != NULL) {
+               if( (void*) *value == addr )
+                       return index;
+               JLP(value, list->list, index);
+       }
+
+       return -1;
+}
+
+
+
+void __osrfListSetSize( osrfList* list ) {
+       if(!list) return;
+
+       Word_t* value;
+       unsigned long index = -1;
+       JLL(value, list->list, index );
+       list->size = index + 1;
+}
+
+
+unsigned long osrfListGetCount( osrfList* list ) {
+       if(!list) return -1;
+       unsigned long retcode = -1;
+       JLC( retcode, list->list, 0, -1 );
+       return retcode;
+}
+
+
+osrfListIterator* osrfNewListIterator( osrfList* list ) {
+       if(!list) return NULL;
+       osrfListIterator* itr = safe_malloc(sizeof(osrfListIterator));
+       itr->list = list;
+       itr->current = 0;
+       return itr;
+}
+
+void* osrfListIteratorNext( osrfListIterator* itr ) {
+       if(!(itr && itr->list)) return NULL;
+
+       Word_t* value;
+       if(itr->current >= itr->list->size) return NULL;
+       JLF( value, itr->list->list, itr->current );
+       if(value) {
+               itr->current++;
+               return (void*) *value;
+       }
+       return NULL;
+}
+
+void osrfListIteratorFree( osrfListIterator* itr ) {
+       if(!itr) return;
+       free(itr);
+}
+
+
+
+void osrfListIteratorReset( osrfListIterator* itr ) {
+       if(!itr) return;
+       itr->current = 0;
+}
+
+
diff --git a/src/libstack/osrf_list.h b/src/libstack/osrf_list.h
new file mode 100644 (file)
index 0000000..9486cd0
--- /dev/null
@@ -0,0 +1,116 @@
+#include <stdio.h>
+#include "opensrf/utils.h"
+#include <Judy.h>
+
+/**
+  Items are stored as void*'s so it's up to the user to
+  manage the data wisely.  Also, if the 'freeItem' callback is defined for the list,
+  then, it will be used on any item that needs to be freed, so don't mix data
+  types in the list if you want magic freeing */
+
+struct __osrfListStruct {
+       Pvoid_t list;                                                   /* the list */
+       int size;                                                               /* how many items in the list including NULL items between non-NULL items */    
+       void (*freeItem) (void* item);  /* callback for freeing stored items */
+};
+typedef struct __osrfListStruct osrfList;
+
+
+struct __osrfListIteratorStruct {
+       osrfList* list;
+       unsigned long current;
+};
+typedef struct __osrfListIteratorStruct osrfListIterator;
+
+
+/**
+  Creates a new list iterator with the given list
+  */
+osrfListIterator* osrfNewListIterator( osrfList* list );
+
+/**
+  Returns the next non-NULL item in the list, return NULL when
+  the end of the list has been reached
+  */
+void* osrfListIteratorNext( osrfListIterator* itr );
+
+/**
+  Deallocates the given list
+  */
+void osrfListIteratorFree( osrfListIterator* itr );
+
+void osrfListIteratorReset( osrfListIterator* itr );
+
+
+/**
+  Allocates a new list
+  @param compress If true, the list will compress empty slots on delete.  If item positionality
+  is not important, then using this feature is reccomended to keep the list from growing indefinitely.
+  if item positionality is not important.
+  @return The allocated list
+  */
+osrfList* osrfNewList();
+
+/**
+  Pushes an item onto the end of the list.  This always finds the highest index
+  in the list and pushes the new item into the list after it.
+  @param list The list
+  @param item The item to push
+  @return 0 on success, -1 on failure
+  */
+int osrfListPush( osrfList* list, void* item );
+
+/**
+  Puts the given item into the list at the specified position.  If there
+  is already an item at the given position and the list has it's 
+  "freeItem" function defined, then it will be used to free said item.
+  If no 'freeItem' callback is defined, then the displaced item will
+  be returned;
+  @param list The list
+  @param item The item to put into the list
+  @param position The position to place the item in
+  @return NULL in successfully inserting the new item and freeing
+  any displaced items.  Returns the displaced item if no "freeItem"
+  callback is defined.
+       */
+void* osrfListSet( osrfList* list, void* item, unsigned long position );
+
+/**
+  Returns the item at the given position
+  @param list The list
+  @param postiont the position
+  */
+void* osrfListGetIndex( osrfList* list, unsigned long  position );
+
+/**
+  Frees the list and all list items (if the list has a "freeItem" function defined )
+  @param list The list
+  */
+void osrfListFree( osrfList* list );
+
+/**
+  Removes the list item at the given index
+  @param list The list
+  @param position The position of the item to remove
+  @return A pointer to the item removed if "freeItem" is not defined
+  for this list, returns NULL if it is.
+  */
+void* osrfListRemove( osrfList* list, int position );
+
+/**
+  Finds the list item whose void* is the same as the one passed in
+  @param list The list
+  @param addr The pointer connected to the list item we're to find
+  @return the index of the item, or -1 if the item was not found
+  */
+int osrfListFind( osrfList* list, void* addr );
+
+
+void __osrfListSetSize( osrfList* list );
+
+/**
+  @return The number of non-null items in the list
+  */
+unsigned long osrfListGetCount( osrfList* list );
+
+
index 4fb3955..0bdca70 100644 (file)
@@ -34,12 +34,14 @@ osrfAppSession* osrf_stack_transport_handler( transport_message* msg, char* my_s
 
        if( msg->is_error && ! msg->thread ) {
                warning_handler("!! Received jabber layer error for %s ... exiting\n", msg->sender );
+               message_free( msg );
                return NULL;
        }
 
        if(! msg->thread  && ! msg->is_error ) {
                warning_handler("Received a non-error message with no thread trace... dropping");
                message_free( msg );
+               return NULL;
        }
 
        osrf_app_session* session = osrf_app_session_find_session( msg->thread );
index 77e5021..d6c9e85 100644 (file)
@@ -144,7 +144,7 @@ int osrf_system_bootstrap_client_resc( char* config_file, char* contextnode, cha
        char* port                      = osrfConfigGetValue( NULL, "/port" );
        char* unixpath          = osrfConfigGetValue( NULL, "/unixpath" );
 
-       char* domain = osrfStringArrayGetString( arr, 0 ); /* just the first for now */
+       char* domain = strdup(osrfStringArrayGetString( arr, 0 )); /* just the first for now */
        osrfStringArrayFree(arr);
 
 
diff --git a/src/libstack/osrf_transgroup.c b/src/libstack/osrf_transgroup.c
new file mode 100644 (file)
index 0000000..40bded1
--- /dev/null
@@ -0,0 +1,263 @@
+#include "osrf_transgroup.h"
+#include <sys/select.h>
+
+
+osrfTransportGroupNode* osrfNewTransportGroupNode( 
+               char* domain, int port, char* username, char* password, char* resource ) {
+
+       if(!(domain && port && username && password && resource)) return NULL;
+
+       osrfTransportGroupNode* node = safe_malloc(sizeof(osrfTransportGroupNode));
+       node->domain    = strdup(domain);
+       node->port              = port;
+       node->username = strdup(username);
+       node->password = strdup(password);
+       node->domain    = strdup(domain);
+       node->active    = 0;
+       node->lastsent  = 0;
+       node->connection = client_init( domain, port, NULL, 0 );
+
+       return node;
+}
+
+
+osrfTransportGroup* osrfNewTransportGroup( char* router, osrfTransportGroupNode* nodes[], int count ) {
+       if(!nodes || !router || count < 1) return NULL;
+
+       osrfTransportGroup* grp = safe_malloc(sizeof(osrfTransportGroup));
+       grp->currentNode                        = 0;
+       grp->router                                     = strdup(router);
+       grp->list                                       = osrfNewList(1);
+
+       int i;
+       for( i = 0; i != count; i++ ) osrfListPush( grp->list, nodes[i] );
+       return grp;
+}
+
+
+int osrfTransportGroupConnect( osrfTransportGroup* grp ) {
+       if(!grp) return 0;
+       int i;
+       int active = 0;
+       for( i = 0; i != grp->list->size; i++ ) {
+               osrfTransportGroupNode* node = osrfListGetIndex( grp->list, i );
+               if(client_connect( node->connection, node->username, 
+                                       node->password, node->resource, 10, AUTH_DIGEST )) {
+                       node->active = 1;
+                       node->lastsent = time(NULL);
+                       active++;
+               }
+       }
+       return active;
+}
+
+
+/*
+osrfTransportGroup* osrfNewTransportGroup( char* resource ) {
+
+       grp->username                           = osrfConfigGetValue( NULL, "/username" );
+       grp->password                           = osrfConfigGetValue( NULL, "/passwd" );
+       char* port                                      = osrfConfigGetValue( NULL, "/port" );
+       if(port) grp->port              = atoi(port);
+       grp->currentNode                        = 0;
+
+       if(!resource) resource = "client";
+       char* host = getenv("HOSTNAME");
+       if(!host) host = "localhost";
+       char* res = va_list_to_string( "osrf_%s_%s_%d", resource, host, getpid() ); 
+
+       int i;
+       osrfStringArray* arr = osrfNewStringArray(8); 
+       osrfConfigGetValueList(NULL, arr, "/domains/domain");
+
+       for( i = 0; i != arr->size; i++ ) {
+               char* domain = osrfStringArrayGetString( arr, i ); 
+               if(domain) {
+                       node->domain = strdup(domain);
+                       node->connection = client_init( domain, grp->port, NULL, 0 );
+                       if(client_connect( node->connection, grp->username, grp->password, res, 10, AUTH_DIGEST )) {
+                               node->active = 1;
+                               node->lastsent = time(NULL);
+                       }
+                       osrfListPush( grp->list, node );
+               }
+       }
+
+       free(res);
+       osrfStringArrayFree(arr);
+       return grp;
+}
+*/
+
+
+int osrfTransportGroupSend( osrfTransportGroup* grp, transport_message* msg, char* newdomain ) {
+       if(!(grp && msg)) return -1;
+
+       char domain[256];
+       bzero(domain, 256);
+       jid_get_domain( msg->recipient, domain );
+
+       char msgrecip[254];
+       bzero(msgrecip, 254);
+       jid_get_username(msg->recipient, msgrecip);
+
+
+       osrfTransportGroupNode* node = __osrfTransportGroupFindNode( grp, domain );
+
+       if( strcmp( msgrecip, grp->router ) ) { /* not a top level router message */
+
+               if(node) {
+                       if( (client_send_message( node->connection, msg )) == 0 )
+                               return 0;
+                       else 
+                               return warning_handler("Error sending message to domain %s", domain );
+               }
+               return warning_handler("Transport group has no node for domain %s", domain );
+       }
+
+
+       /*
+       if( type == OSRF_SERVER_NODE )
+               return _osrfTGServerSend( grp, msgdom, msg );
+       if( type == OSRF_CLIENT_NODE )
+               return _osrfTGClientSend( grp, msgdom, msg );
+               */
+
+       return -1;
+}
+
+int _osrfTGServerSend( osrfTransportGroup* grp, char* domain, transport_message* msg ) {
+
+       debug_handler("Transport group sending server message to domain %s", domain );
+
+       osrfTransportGroupNode* node = __osrfTransportGroupFindNode( grp, domain );
+       if(node) {
+               if( (client_send_message( node->connection, msg )) == 0 )
+                       return 0;
+               else 
+                       return warning_handler("Error sending server response to domain %s", domain );
+       }
+       return warning_handler("Transport group has no node for domain %s for server response", domain );
+}
+
+
+int _osrfTGClientSend( osrfTransportGroup* grp, char* domain, transport_message* msg ) {
+
+       debug_handler("Transport group sending client message to domain %s", domain );
+
+       /* first see if we have a node for the requested domain */
+       osrfTransportGroupNode* node = __osrfTransportGroupFindNode( grp, domain );
+       if(node && node->active) {
+               if( (client_send_message( node->connection, msg )) == 0 )
+                       return 0;
+               else
+                       node->active = 0;
+       }
+
+       /* if not (or it fails), try sending to the current domain */
+       node = osrfListGetIndex(grp->list, grp->currentNode);
+       if(node && node->active) {
+               if( (client_send_message( node->connection, msg )) == 0 )
+                       return 0;
+       }
+
+       /* start at the beginning and try them all ... */
+       grp->currentNode = 0;
+       while( grp->currentNode < grp->list->size ) {
+               if( (node = osrfListGetIndex(grp->list, grp->currentNode++)) && node->active ) {
+                       if( (client_send_message( node->connection, msg )) == 0 ) 
+                               return 1;
+                       else node->active = 0;
+               }
+       }
+       return -1;
+}
+
+static int __osrfTGWait( fd_set* fdset, int maxfd, int timeout ) {
+       if(!(fdset && maxfd)) return 0;
+
+       struct timeval tv;
+       tv.tv_sec = timeout;
+       tv.tv_usec = 0;
+       int retval = 0;
+
+       if( timeout < 0 ) {
+               if( (retval = select( maxfd + 1, fdset, NULL, NULL, NULL)) == -1 ) 
+                       return 0;
+
+       } else {
+               if( (retval = select( maxfd + 1, fdset, NULL, NULL, &tv)) == -1 ) 
+                       return 0;
+       }
+
+       return retval;
+}
+
+
+transport_message* osrfTransportGroupRecvAll( osrfTransportGroup* grp, int timeout ) {
+       if(!(grp && grp->list)) return NULL;
+
+       int i;
+       int maxfd = 0;
+       osrfTransportGroupNode* node = NULL;
+       fd_set fdset;
+       FD_ZERO( &fdset );
+
+       for( i = 0; i != grp->list->size; i++ ) {
+               if( (node = osrfListGetIndex(grp->list, grp->currentNode++)) && node->active ) {
+                       int fd = node->connection->session->sock_id;
+                       if( fd < maxfd ) maxfd = fd;
+                       FD_SET( fd, &fdset );
+               }
+       }
+
+       if( __osrfTGWait( &fdset, maxfd, timeout ) ) {
+               for( i = 0; i != grp->list->size; i++ ) {
+                       if( (node = osrfListGetIndex(grp->list, grp->currentNode++)) && node->active ) {
+                               int fd = node->connection->session->sock_id;
+                               if( FD_ISSET( fd, &fdset ) ) {
+                                       return client_recv( node->connection, 0 );
+                               }
+                       }
+               }
+       }
+
+       return NULL;
+}
+
+transport_message* osrfTransportGroupRecv( osrfTransportGroup* grp, char* domain, int timeout ) {
+       if(!(grp && domain)) return NULL;
+
+       osrfTransportGroupNode* node = __osrfTransportGroupFindNode( grp, domain );
+       if(!node && node->connection && node->connection->session) return NULL;
+       int fd = node->connection->session->sock_id;
+
+       fd_set fdset;
+       FD_ZERO( &fdset );
+       FD_SET( fd, &fdset );
+
+       int active = __osrfTGWait( &fdset, fd, timeout );
+       if(active) return client_recv( node->connection, 0 );
+       
+       return NULL;
+}
+
+void osrfTransportGroupSetInactive( osrfTransportGroup* grp, char* domain ) {
+       if(!(grp && domain)) return;
+       osrfTransportGroupNode* node = __osrfTransportGroupFindNode( grp, domain );
+       if(node) node->active = 0;
+}
+
+osrfTransportGroupNode* __osrfTransportGroupFindNode( osrfTransportGroup* grp, char* domain ) {
+       if(!(grp && grp->list && domain)) return NULL;
+       int i = 0; 
+       osrfTransportGroupNode* node = NULL;
+
+       while( (node = (osrfTransportGroupNode*) osrfListGetIndex( grp->list, i++ )) ) 
+               if(!strcmp(node->domain, domain)) return node;
+       return NULL;
+}
+
+
+
+
diff --git a/src/libstack/osrf_transgroup.h b/src/libstack/osrf_transgroup.h
new file mode 100644 (file)
index 0000000..c9b6be1
--- /dev/null
@@ -0,0 +1,117 @@
+#include "opensrf/transport_client.h"
+#include "opensrf/transport_message.h"
+#include "osrf_list.h"
+#include "osrfConfig.h"
+#include "opensrf/utils.h"
+#include <time.h>
+
+/**
+  Maintains a set of transport clients for redundancy
+  */
+
+//enum osrfTGType { OSRF_SERVER_NODE, OSRF_CLIENT_NODE };
+
+struct __osrfTransportGroupStruct {
+       osrfList* list; /* our lisit of nodes */
+       char* router;                                                   /* the login username of the router on this network */
+       int currentNode;        /* which node are we currently on.  Used for client failover and
+                                                               only gets updated on client messages where a server failed 
+                                                               and we need to move to the next server in the list */
+};
+typedef struct __osrfTransportGroupStruct osrfTransportGroup;
+
+
+struct __osrfTransportGroupNode {
+       transport_client* connection;           /* our connection to the network */
+       char* domain;                                                   /* the domain we're connected to */
+       char* username;                                         /* username used to connect to the group of servers */
+       char* password;                                         /* password used to connect to the group of servers */
+       char* resource;                                         /* the login resource */
+       int port;                                                               /* port used to connect to the group of servers */
+
+       int active;                                                             /* true if we're able to send data on this connection */
+       time_t lastsent;                                                /* the last time we sent a message */
+};
+typedef struct __osrfTransportGroupNode osrfTransportGroupNode;
+
+
+/**
+  Creates a new group node
+  @param domain The domain we're connecting to
+  @param port The port to connect on
+  @param username The login name
+  @param password The login password
+  @param resource The login resource
+  @return A new transport group node
+  */
+osrfTransportGroupNode* osrfNewTransportGroupNode( 
+               char* domain, int port, char* username, char* password, char* resource );
+
+
+/**
+  Allocates and initializes a new transport group.
+  The first node in the array is the default node for client connections.
+  @param router The router name shared accross the networks
+  @param nodes The nodes in the group.
+  */
+osrfTransportGroup* osrfNewTransportGroup( char* router, osrfTransportGroupNode* nodes[], int count );
+
+/**
+  Attempts to connect all of the nodes in this group.
+  @param grp The transport group
+  @return The number of nodes successfully connected
+  */
+int osrfTransportGroupConnect( osrfTransportGroup* grp );
+
+
+/**
+  Sends a transport message
+  If the message is destined for a domain that this group does not have a connection
+  for, then the message is sent out through the currently selected domain.
+  @param grp The transport group
+  @param type Whether this is a client request or a server response
+  @param msg The message to send 
+  @param newdomain A pre-allocated buffer in which to write the name of the 
+  new domain if a the expected domain could not be sent to.
+  @return 0 on normal successful send.  Returns 1 if the message was sent
+  to a new domain (note: this can only happen when type == OSRF_CLIENT_NODE)
+  Returns -1 if the message cannot be sent.  
+  */
+int osrfTransportGroupSend( osrfTransportGroup* grp, transport_message* msg, char* newdomain );
+
+int _osrfTGServerSend( osrfTransportGroup* grp, char* domain, transport_message* msg );
+int _osrfTGClientSend( osrfTransportGroup* grp, char* domain, transport_message* msg );
+
+/**
+  Waits on all connections for inbound data.
+  @param grp The transport group
+  @param timeout How long to wait for data.  0 means check for data
+  but don't wait, a negative number means to wait indefinitely
+  @return The received message or NULL if the timeout occurred before a 
+  message was received 
+ */
+transport_message* osrfTransportGroupRecvAll( osrfTransportGroup* grp, int timeout );
+
+/**
+  Waits for data from a single domain
+  @param grp The transport group
+  @param domain The domain to wait for data on
+  @param timeout see osrfTransportGroupRecvAll
+  */
+transport_message* osrfTransportGroupRecv( osrfTransportGroup* grp, char* domain, int timeout );
+
+/**
+  Tells the group that the connect to the last message sent to the provided
+  domain did not make it through;
+  @param grp The transport group
+  @param comain The failed domain
+  */
+void osrfTransportGroupSetInactive( osrfTransportGroup* grp, char* domain );
+
+
+/**
+  Finds a node in our list of nodes 
+  */
+osrfTransportGroupNode* __osrfTransportGroupFindNode( osrfTransportGroup* grp, char* domain );
+
+
index 8d67ae1..e2b6cc4 100644 (file)
@@ -125,18 +125,16 @@ int session_wait( transport_session* session, int timeout ) {
 int session_send_msg( 
                transport_session* session, transport_message* msg ) {
 
-       if( ! session ) { return 0; }
+       if( ! session ) { return -1; }
 
        if( ! session->state_machine->connected ) {
                warning_handler("State machine is not connected in send_msg()");
-               return 0;
+               return -1;
        }
 
        message_prepare_xml( msg );
        //tcp_send( session->sock_obj, msg->msg_xml );
-       socket_send( session->sock_id, msg->msg_xml );
-
-       return 1;
+       return socket_send( session->sock_id, msg->msg_xml );
 
 }
 
index a0a3330..e156743 100644 (file)
@@ -4,13 +4,19 @@ LDLIBS        += -lxml2 -lopensrf -lobjson
 CFLAGS += -D_ROUTER
 
 all: opensrf_router 
+#osrf_router
 
 install: 
        cp opensrf_router $(BINDIR)
 
-opensrf_router:        router.o 
-       $(CC) $(CFLAGS) $(LDFLAGS) $(LDLIBS) router.o -o $@
-router.o:      router.c router.h
+#opensrf_router:       router.o 
+#      $(CC) $(CFLAGS) $(LDFLAGS) $(LDLIBS) router.o -o $@
+#router.o:     router.c router.h
+
+opensrf_router:        osrf_router.o osrf_router_main.o
+       $(CC) $(CFLAGS) $(LDFLAGS) $(LDLIBS) osrf_router.o osrf_router_main.o -o $@
+osrf_router.o: osrf_router.c osrf_router.h
+osrf_router_main.o: osrf_router_main.c 
 
 clean:
        /bin/rm -f *.o opensrf_router
diff --git a/src/router/osrf_router.c b/src/router/osrf_router.c
new file mode 100644 (file)
index 0000000..07609ce
--- /dev/null
@@ -0,0 +1,615 @@
+#include "osrf_router.h"
+
+#define ROUTER_SOCKFD connection->session->sock_id
+#define ROUTER_REGISTER "register"
+#define ROUTER_UNREGISTER "unregister"
+
+
+#define ROUTER_REQUEST_CLASS_LIST "opensrf.router.info.class.list"
+
+osrfRouter* osrfNewRouter( 
+               char* domain, char* name, 
+               char* resource, char* password, int port, 
+               osrfStringArray* trustedClients, osrfStringArray* trustedServers ) {
+
+       if(!( domain && name && resource && password && port )) return NULL;
+
+       osrfRouter* router      = safe_malloc(sizeof(osrfRouter));
+       router->domain                  = strdup(domain);
+       router->name                    = strdup(name);
+       router->password                = strdup(password);
+       router->resource                = strdup(resource);
+       router->port                    = port;
+
+       router->trustedClients = trustedClients;
+       router->trustedServers = trustedServers;
+
+       router->classes = osrfNewHash(); 
+       router->classes->freeItem = &osrfRouterClassFree;
+
+       router->connection = client_init( domain, port, NULL, 0 );
+
+       return router;
+}
+
+
+
+int osrfRouterConnect( osrfRouter* router ) {
+       if(!router) return -1;
+       int ret = client_connect( router->connection, router->name, 
+                       router->password, router->resource, 10, AUTH_DIGEST );
+       if( ret == 0 ) return -1;
+       return 0;
+}
+
+
+void osrfRouterRun( osrfRouter* router ) {
+       if(!(router && router->classes)) return;
+
+       int routerfd = router->ROUTER_SOCKFD;
+       int selectret = 0;
+
+       while(1) {
+
+               fd_set set;
+               int maxfd = __osrfRouterFillFDSet( router, &set );
+               int numhandled = 0;
+
+               if( (selectret = select(maxfd + 1, &set, NULL, NULL, NULL)) < 0 ) {
+                       warning_handler("Top level select call failed with errno %d", errno);
+                       continue;
+               }
+
+               /* see if there is a top level router message */
+
+               if( FD_ISSET(routerfd, &set) ) {
+                       debug_handler("Top router socket is active: %d", routerfd );
+                       numhandled++;
+                       osrfRouterHandleIncoming( router );
+               }
+
+
+               /* now check each of the connected classes and see if they have data to route */
+               while( numhandled < selectret ) {
+
+                       osrfRouterClass* class;
+                       osrfHashIterator* itr = osrfNewHashIterator(router->classes);
+
+                       while( (class = osrfHashIteratorNext(itr)) ) {
+
+                               char* classname = itr->current;
+
+                               if( classname && (class = osrfRouterFindClass( router, classname )) ) {
+
+                                       debug_handler("Checking %s for activity...", classname );
+
+                                       int sockfd = class->ROUTER_SOCKFD;
+                                       if(FD_ISSET( sockfd, &set )) {
+                                               debug_handler("Socket is active: %d", sockfd );
+                                               numhandled++;
+                                               osrfRouterClassHandleIncoming( router, classname, class );
+                                       }
+                               }
+                       }
+
+                       osrfHashIteratorFree(itr);
+               }
+       }
+}
+
+
+void osrfRouterHandleIncoming( osrfRouter* router ) {
+       if(!router) return;
+
+       transport_message* msg = NULL;
+
+       if( (msg = client_recv( router->connection, 0 )) ) { 
+
+               if( msg->sender ) {
+
+                       /* if the sender is not a trusted server, drop the message */
+                       int len = strlen(msg->sender) + 1;
+                       char domain[len];
+                       bzero(domain, len);
+                       jid_get_domain( msg->sender, domain );
+
+                       if(osrfStringArrayContains( router->trustedServers, domain)) 
+                               osrfRouterHandleMessage( router, msg );
+                        else 
+                               warning_handler("Received message from un-trusted server domain %s", msg->sender);
+               }
+
+               message_free(msg);
+       }
+}
+
+int osrfRouterClassHandleIncoming( osrfRouter* router, char* classname, osrfRouterClass* class ) {
+       if(!(router && class)) return -1;
+
+       transport_message* msg;
+       debug_handler("osrfRouterClassHandleIncoming()");
+
+       if( (msg = client_recv( class->connection, 0 )) ) {
+
+               if( msg->sender ) {
+
+                       /* if the client is not from a trusted domain, drop the message */
+                       int len = strlen(msg->sender) + 1;
+                       char domain[len];
+                       bzero(domain, len);
+                       jid_get_domain( msg->sender, domain );
+
+                       if(osrfStringArrayContains( router->trustedClients, domain)) {
+
+                               transport_message* bouncedMessage = NULL;
+                               if( msg->is_error )  {
+
+                                       /* handle bounced message */
+                                       if( !(bouncedMessage = osrfRouterClassHandleBounce( router, classname, class, msg )) ) 
+                                               return -1; /* we have no one to send the requested message to */
+
+                                       message_free( msg );
+                                       msg = bouncedMessage;
+                               }
+                               osrfRouterClassHandleMessage( router, class, msg );
+
+                       } else {
+                               warning_handler("Received client message from untrusted client domain %s", domain );
+                       }
+               }
+
+               message_free( msg );
+       }
+
+       return 0;
+}
+
+
+
+
+int osrfRouterHandleMessage( osrfRouter* router, transport_message* msg ) {
+       if(!(router && msg)) return -1;
+
+       if( !msg->router_command || !strcmp(msg->router_command,"")) 
+               return osrfRouterHandleAppRequest( router, msg ); /* assume it's an app session level request */
+
+       if(!msg->router_class) return -1;
+
+       osrfRouterClass* class = NULL;
+       if(!strcmp(msg->router_command, ROUTER_REGISTER)) {
+               class = osrfRouterFindClass( router, msg->router_class );
+
+               info_handler("Registering class %s", msg->router_class );
+
+               if(!class) class = osrfRouterAddClass( router, msg->router_class );
+
+               if(class) { 
+
+                       if( osrfRouterClassFindNode( class, msg->sender ) )
+                               return 0;
+                       else 
+                               osrfRouterClassAddNode( class, msg->sender );
+
+               } 
+
+       } else if( !strcmp( msg->router_command, ROUTER_UNREGISTER ) ) {
+
+               if( msg->router_class && strcmp( msg->router_class, "") ) {
+                       info_handler("Unregistering router class %s", msg->router_class );
+                       osrfRouterClassRemoveNode( router, msg->router_class, msg->sender );
+               }
+       }
+
+       return 0;
+}
+
+
+
+osrfRouterClass* osrfRouterAddClass( osrfRouter* router, char* classname ) {
+       if(!(router && router->classes && classname)) return NULL;
+
+       osrfRouterClass* class = safe_malloc(sizeof(osrfRouterClass));
+       class->nodes = osrfNewHash();
+       class->itr = osrfNewHashIterator(class->nodes);
+       class->nodes->freeItem = &osrfRouterNodeFree;
+       class->router   = router;
+
+       class->connection = client_init( router->domain, router->port, NULL, 0 );
+
+       if(!client_connect( class->connection, router->name, 
+                       router->password, classname, 10, AUTH_DIGEST ) ) {
+               osrfRouterClassFree( classname, class );
+               return NULL;
+       }
+       
+       osrfHashSet( router->classes, class, classname );
+       return class;
+}
+
+
+int osrfRouterClassAddNode( osrfRouterClass* rclass, char* remoteId ) {
+       if(!(rclass && rclass->nodes && remoteId)) return -1;
+
+       info_handler("Adding router node for remote id %s", remoteId );
+
+       osrfRouterNode* node = safe_malloc(sizeof(osrfRouterNode));
+       node->count = 0;
+       node->lastMessage = NULL;
+       node->remoteId = strdup(remoteId);
+
+       osrfHashSet( rclass->nodes, node, remoteId );
+       return 0;
+}
+
+/* copy off the lastMessage, remove the offending node, send error if it's tht last node 
+       ? return NULL if it's the last node ?
+ */
+
+transport_message* osrfRouterClassHandleBounce( 
+               osrfRouter* router, char* classname, osrfRouterClass* rclass, transport_message* msg ) {
+
+       debug_handler("osrfRouterClassHandleBounce()");
+
+       warning_handler("Received network layer error message from %s", msg->sender );
+       osrfRouterNode* node = osrfRouterClassFindNode( rclass, msg->sender );
+       transport_message* lastSent = NULL;
+
+       if( node && osrfHashGetCount(rclass->nodes) == 1 ) { /* the last node is dead */
+
+               if( node->lastMessage ) {
+                       warning_handler("We lost the last node in the class, responding with error and removing...");
+       
+                       transport_message* error = message_init( 
+                               node->lastMessage->body, node->lastMessage->subject, 
+                               node->lastMessage->thread, node->lastMessage->router_from, node->lastMessage->recipient );
+                       set_msg_error( error, "cancel", 501 );
+       
+                       /* send the error message back to the original sender */
+                       client_send_message( rclass->connection, error );
+                       message_free( error );
+               }
+       
+               return NULL;
+       
+       } else { 
+
+               if( node->lastMessage ) {
+                       debug_handler("Cloning lastMessage so next node can send it");
+                       lastSent = message_init( node->lastMessage->body,
+                               node->lastMessage->subject, node->lastMessage->thread, "", node->lastMessage->router_from );
+                       message_set_router_info( lastSent, node->lastMessage->router_from, NULL, NULL, NULL, 0 );
+               }
+       }
+
+       /* remove the dead node */
+       osrfRouterClassRemoveNode( router, classname, msg->sender);
+       return lastSent;
+}
+
+
+/**
+  If we get a regular message, we send it to the next node in the list of nodes
+  if we get an error, it's a bounce back from a previous attempt.  We take the
+  body and thread from the last sent on the node that had the bounced message
+  and propogate them on to the new message being sent
+  */
+int osrfRouterClassHandleMessage( 
+               osrfRouter* router, osrfRouterClass* rclass, transport_message* msg ) {
+       if(!(router && rclass && msg)) return -1;
+
+       debug_handler("osrfRouterClassHandleMessage()");
+
+       osrfRouterNode* node = osrfHashIteratorNext( rclass->itr );
+       if(!node) {
+               osrfHashIteratorReset(rclass->itr);
+               node = osrfHashIteratorNext( rclass->itr );
+       }
+
+       if(node) {
+
+               transport_message* new_msg= message_init(       msg->body, 
+                               msg->subject, msg->thread, node->remoteId, msg->sender );
+               message_set_router_info( new_msg, msg->sender, NULL, NULL, NULL, 0 );
+
+               info_handler( "Routing message:\nfrom: [%s]\nto: [%s]", 
+                               new_msg->router_from, new_msg->recipient );
+
+               message_free( node->lastMessage );
+               node->lastMessage = new_msg;
+
+               if ( client_send_message( rclass->connection, new_msg ) == 0 ) 
+                       node->count++;
+
+               else {
+                       message_prepare_xml(new_msg);
+                       warning_handler("Error sending message from %s to %s\n%s", 
+                                       new_msg->sender, new_msg->recipient, new_msg->msg_xml );
+               }
+
+       } 
+
+       return 0;
+}
+
+
+int osrfRouterRemoveClass( osrfRouter* router, char* classname ) {
+       if(!(router && router->classes && classname)) return -1;
+       info_handler("Removing router class %s", classname );
+       osrfHashRemove( router->classes, classname );
+       return 0;
+}
+
+
+int osrfRouterClassRemoveNode( 
+               osrfRouter* router, char* classname, char* remoteId ) {
+
+       if(!(router && router->classes && classname && remoteId)) return 0;
+
+       info_handler("Removing router node %s", remoteId );
+
+       osrfRouterClass* class = osrfRouterFindClass( router, classname );
+
+       if( class ) {
+
+               osrfHashRemove( class->nodes, remoteId );
+               if( osrfHashGetCount(class->nodes) == 0 ) {
+                       osrfRouterRemoveClass( router, classname );
+                       return 1;
+               }
+
+               return 0;
+       }
+
+       return -1;
+}
+
+
+void osrfRouterClassFree( char* classname, void* c ) {
+       if(!(classname && c)) return;
+       osrfRouterClass* rclass = (osrfRouterClass*) c;
+       client_disconnect( rclass->connection );        
+       client_free( rclass->connection );      
+
+       osrfHashIteratorReset( rclass->itr );
+       osrfRouterNode* node;
+
+       while( (node = osrfHashIteratorNext(rclass->itr)) ) 
+               osrfRouterClassRemoveNode( rclass->router, classname, node->remoteId );
+
+       free(rclass);
+}
+
+
+void osrfRouterNodeFree( char* remoteId, void* n ) {
+       if(!n) return;
+       osrfRouterNode* node = (osrfRouterNode*) n;
+       free(node->remoteId);
+       message_free(node->lastMessage);
+       free(node);
+}
+
+
+void osrfRouterFree( osrfRouter* router ) {
+       if(!router) return;
+
+       free(router->domain);           
+       free(router->name);
+       free(router->resource);
+       free(router->password);
+
+       osrfStringArrayFree( router->trustedClients );
+       osrfStringArrayFree( router->trustedServers );
+
+       client_free( router->connection );
+       free(router);
+}
+
+
+
+osrfRouterClass* osrfRouterFindClass( osrfRouter* router, char* classname ) {
+       if(!( router && router->classes && classname )) return NULL;
+       return (osrfRouterClass*) osrfHashGet( router->classes, classname );
+}
+
+
+osrfRouterNode* osrfRouterClassFindNode( osrfRouterClass* rclass, char* remoteId ) {
+       if(!(rclass && remoteId))  return NULL;
+       return (osrfRouterNode*) osrfHashGet( rclass->nodes, remoteId );
+}
+
+
+int __osrfRouterFillFDSet( osrfRouter* router, fd_set* set ) {
+       if(!(router && router->classes && set)) return -1;
+
+       FD_ZERO(set);
+       int maxfd = router->ROUTER_SOCKFD;
+       FD_SET(maxfd, set);
+
+       int sockid;
+
+       osrfRouterClass* class = NULL;
+       osrfHashIterator* itr = osrfNewHashIterator(router->classes);
+
+       while( (class = osrfHashIteratorNext(itr)) ) {
+               char* classname = itr->current;
+
+               if( classname && (class = osrfRouterFindClass( router, classname )) ) {
+                       sockid = class->ROUTER_SOCKFD;
+       
+                       if( osrfUtilsCheckFileDescriptor( sockid ) ) {
+                               osrfRouterRemoveClass( router, classname );
+       
+                       } else {
+                               if( sockid > maxfd ) maxfd = sockid;
+                               FD_SET(sockid, set);
+                       }
+               }
+       }
+
+       osrfHashIteratorFree(itr);
+       return maxfd;
+}
+
+
+
+int osrfRouterHandleAppRequest( osrfRouter* router, transport_message* msg ) {
+
+       int T = 32;
+       osrfMessage* arr[T];
+       memset(arr, 0, T );
+
+       int num_msgs = osrf_message_deserialize( msg->body, arr, T );
+       osrfMessage* omsg = NULL;
+
+       int i;
+       for( i = 0; i != num_msgs; i++ ) {
+
+               if( !(omsg = arr[i]) ) continue;
+
+               switch( omsg->m_type ) {
+
+                       case CONNECT:
+                               osrfRouterRespondConnect( router, msg, omsg );
+                               break;
+
+                       case REQUEST:
+                               osrfRouterProcessAppRequest( router, msg, omsg );
+                               break;
+
+                       default: break;
+               }
+
+               osrfMessageFree( omsg );
+       }
+
+       return 0;
+}
+
+int osrfRouterRespondConnect( osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
+       if(!(router && msg && omsg)) return -1;
+
+       osrfMessage* success = osrf_message_init( STATUS, omsg->thread_trace, omsg->protocol );
+
+       debug_handler("router recevied a CONNECT message from %s", msg->sender );
+
+       osrf_message_set_status_info( 
+               success, "osrfConnectStatus", "Connection Successful", OSRF_STATUS_OK );
+
+       char* data      = osrf_message_serialize(success);
+
+       transport_message* return_m = message_init( 
+               data, "", msg->thread, msg->sender, "" );
+
+       client_send_message(router->connection, return_m);
+
+       free(data);
+       osrf_message_free(success);
+       message_free(return_m);
+
+       return 0;
+}
+
+
+
+int osrfRouterProcessAppRequest( osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
+
+       if(!(router && msg && omsg && omsg->method_name)) return -1;
+
+       info_handler("Router received app request: %s", omsg->method_name );
+
+       jsonObject* jresponse = NULL;
+       if(!strcmp( omsg->method_name, ROUTER_REQUEST_CLASS_LIST )) {
+
+               int i;
+               jresponse = jsonParseString("[]");
+
+               osrfStringArray* keys = osrfHashKeys( router->classes );
+               for( i = 0; i != keys->size; i++ )
+                       jsonObjectPush( jresponse, jsonNewObject(osrfStringArrayGetString( keys, i )) );
+               osrfStringArrayFree(keys);
+
+
+       } else {
+
+               return osrfRouterHandleMethodNFound( router, msg, omsg );
+       }
+
+
+       osrfRouterHandleAppResponse( router, msg, omsg, jresponse );
+       jsonObjectFree(jresponse); 
+
+       return 0;
+
+}
+
+
+
+int osrfRouterHandleMethodNFound( 
+               osrfRouter* router, transport_message* msg, osrfMessage* omsg ) {
+
+       osrf_message* err = osrf_message_init( STATUS, omsg->thread_trace, 1);
+               osrf_message_set_status_info( err, 
+                               "osrfMethodException", "Router method not found", OSRF_STATUS_NOTFOUND );
+
+               char* data =  osrf_message_serialize(err);
+
+               transport_message* tresponse = message_init(
+                               data, "", msg->thread, msg->sender, msg->recipient );
+
+               client_send_message(router->connection, tresponse );
+
+               free(data);
+               osrf_message_free( err );
+               message_free(tresponse);
+               return 0;
+}
+
+
+
+int osrfRouterHandleAppResponse( osrfRouter* router, 
+       transport_message* msg, osrfMessage* omsg, jsonObject* response ) {
+
+       if( response ) { /* send the response message */
+
+               osrfMessage* oresponse = osrf_message_init(
+                               RESULT, omsg->thread_trace, omsg->protocol );
+       
+               char* json = jsonObjectToJSON(response);
+               osrf_message_set_result_content( oresponse, json);
+       
+               char* data =  osrf_message_serialize(oresponse);
+               debug_handler( "Responding to client app request with data: \n%s\n", data );
+
+               transport_message* tresponse = message_init(
+                               data, "", msg->thread, msg->sender, msg->recipient );
+       
+               client_send_message(router->connection, tresponse );
+
+               osrfMessageFree(oresponse); 
+               message_free(tresponse);
+               free(json);
+               free(data);
+       }
+
+
+       /* now send the 'request complete' message */
+       osrf_message* status = osrf_message_init( STATUS, omsg->thread_trace, 1);
+       osrf_message_set_status_info( status, "osrfConnectStatus", "Request Complete", OSRF_STATUS_COMPLETE );
+
+       char* statusdata = osrf_message_serialize(status);
+
+       transport_message* sresponse = message_init(
+                       statusdata, "", msg->thread, msg->sender, msg->recipient );
+       client_send_message(router->connection, sresponse );
+
+
+       free(statusdata);
+       osrfMessageFree(status);
+       message_free(sresponse);
+
+       return 0;
+}
+
+
+
+
diff --git a/src/router/osrf_router.h b/src/router/osrf_router.h
new file mode 100644 (file)
index 0000000..ebe2897
--- /dev/null
@@ -0,0 +1,221 @@
+#include <sys/select.h>
+#include <signal.h>
+#include <stdio.h>
+
+#include "opensrf/utils.h"
+#include "opensrf/osrf_list.h"
+#include "opensrf/osrf_hash.h"
+
+#include "opensrf/string_array.h"
+#include "opensrf/transport_client.h"
+#include "opensrf/transport_message.h"
+
+#include "opensrf/osrf_message.h"
+
+
+
+/* a router maintains a list of server classes */
+struct __osrfRouterStruct {
+
+       osrfHash* classes;      /* our list of server classes */
+       char* domain;                   /* our login domain */
+       char* name;
+       char* resource;
+       char* password;
+       int port;
+
+       osrfStringArray* trustedClients;
+       osrfStringArray* trustedServers;
+
+       transport_client* connection;
+};
+
+typedef struct __osrfRouterStruct osrfRouter;
+
+
+/* a class maintains a set of server nodes */
+struct __osrfRouterClassStruct {
+       osrfRouter* router; /* our router handle */
+       osrfHashIterator* itr;
+       osrfHash* nodes;
+       transport_client* connection;
+};
+typedef struct __osrfRouterClassStruct osrfRouterClass;
+
+/* represents a link to a single server's inbound connection */
+struct __osrfRouterNodeStruct {
+       char* remoteId; /* send message to me via this login */
+       int count;                      /* how many message have been sent to this node */
+       transport_message* lastMessage;
+};
+typedef struct __osrfRouterNodeStruct osrfRouterNode;
+
+/**
+  Allocates a new router.  
+  @param domain The jabber domain to connect to
+  @param name The login name for the router
+  @param resource The login resource for the router
+  @param password The login password for the new router
+  @param port The port to connect to the jabber server on
+  @param trustedClients The array of client domains that we allow to send requests through us
+  @param trustedServers The array of server domains that we allow to register, etc. with ust.
+  @return The allocated router or NULL on memory error
+  */
+osrfRouter* osrfNewRouter( char* domain, char* name, char* resource, 
+       char* password, int port, osrfStringArray* trustedClients, osrfStringArray* trustedServers );
+
+/**
+  Connects the given router to the network
+  */
+int osrfRouterConnect( osrfRouter* router );
+
+/**
+  Waits for incoming data to route
+  If this function returns, then the router's connection to the jabber server
+  has failed.
+  */
+void osrfRouterRun( osrfRouter* router );
+
+
+/**
+  Allocates and adds a new router class handler to the router's list of handlers.
+  Also connects the class handler to the network at <routername>@domain/<classname>
+  @param router The current router instance
+  @param classname The name of the class this node handles.
+  @return 0 on success, -1 on connection error.
+  */
+osrfRouterClass* osrfRouterAddClass( osrfRouter* router, char* classname );
+
+/**
+  Adds a new server node to the given class.
+  @param rclass The Router class to add the node to
+  @param remoteId The remote login of this node
+  @return 0 on success, -1 on generic error
+  */
+int osrfRouterClassAddNode( osrfRouterClass* rclass, char* remoteId );
+
+
+/**
+  Handles top level router messages
+  @return 0 on success
+  */
+int osrfRouterHandleMessage( osrfRouter* router, transport_message* msg );
+
+
+/**
+  Handles class level requests
+  @return 0 on success
+  */
+int osrfRouterClassHandleMessage( osrfRouter* router, 
+               osrfRouterClass* rclass, transport_message* msg );
+
+/**
+  Removes a given class from the router, freeing as it goes
+  */
+int osrfRouterRemoveClass( osrfRouter* router, char* classname );
+
+/**
+  Removes the given node from the class.  Also, if this is that last node in the set,
+  removes the class from the router 
+  @return 0 on successful removal with no class removal
+  @return 1 on successful remove with class removal
+  @return -1 error on removal
+ */
+int osrfRouterClassRemoveNode( osrfRouter* router, char* classname, char* remoteId );
+
+/**
+  Frees a router class object
+  Takes a void* since it is freed by the hash code
+  */
+void osrfRouterClassFree( char* classname, void* rclass );
+
+/**
+  Frees a router node object 
+  Takes a void* since it is freed by the list code
+  */
+void osrfRouterNodeFree( char* remoteId, void* node );
+
+
+/**
+  Frees a router
+  */
+void osrfRouterFree( osrfRouter* router );
+
+/**
+  Finds the class associated with the given class name in the list of classes
+  */
+osrfRouterClass* osrfRouterFindClass( osrfRouter* router, char* classname );
+
+/**
+  Finds the router node within this class with the given remote id 
+  */
+osrfRouterNode* osrfRouterClassFindNode( osrfRouterClass* rclass, char* remoteId );
+
+
+/**
+  Clears and populates the provided fd_set* with file descriptors
+  from the router's top level connection as well as each of the
+  router class connections
+  @return The largest file descriptor found in the filling process
+  */
+int __osrfRouterFillFDSet( osrfRouter* router, fd_set* set );
+
+
+
+/**
+  Utility method for handling incoming requests to the router
+  and making sure the sender is allowed.
+  */
+void osrfRouterHandleIncoming( osrfRouter* router );
+
+/**
+       Utility method for handling incoming requests to a router class,
+       makes sure sender is a trusted client
+       */
+int osrfRouterClassHandleIncoming( osrfRouter* router, char* classname,  osrfRouterClass* class );
+
+/* handles case where router node is not longer reachable.  copies over the
+       data from the last sent message and returns a newly crafted suitable for treating
+       as a newly inconing message.  Removes the dead node and If there are no more
+       nodes to send the new message to, returns NULL.
+       */
+transport_message* osrfRouterClassHandleBounce(
+               osrfRouter* router, char* classname, osrfRouterClass* rclass, transport_message* msg );
+
+
+
+/**
+  handles messages that don't have a 'router_command' set.  They are assumed to
+  be app request messages 
+  */
+int osrfRouterHandleAppRequest( osrfRouter* router, transport_message* msg );
+
+
+/**
+  Handles connects, disconnects, etc.
+  */
+int osrfRouterHandeStatusMessage( osrfRouter* router, transport_message* msg );
+
+
+/**
+  Handles REQUEST messages 
+  */
+int osrfRouterHandleRequestMessage( osrfRouter* router, transport_message* msg );
+
+
+
+int osrfRouterHandleAppRequest( osrfRouter* router, transport_message* msg );
+
+
+int osrfRouterRespondConnect( osrfRouter* router, transport_message* msg, osrfMessage* omsg );
+
+
+
+int osrfRouterProcessAppRequest( osrfRouter* router, transport_message* msg, osrfMessage* omsg );
+
+int osrfRouterHandleAppResponse( osrfRouter* router, 
+               transport_message* msg, osrfMessage* omsg, jsonObject* response );
+
+
+int osrfRouterHandleMethodNFound( osrfRouter* router, transport_message* msg, osrfMessage* omsg );
+
diff --git a/src/router/osrf_router_main.c b/src/router/osrf_router_main.c
new file mode 100644 (file)
index 0000000..c7fd903
--- /dev/null
@@ -0,0 +1,104 @@
+#include "osrf_router.h"
+#include "opensrf/osrfConfig.h"
+#include "opensrf/utils.h"
+#include "opensrf/logging.h"
+#include <signal.h>
+
+osrfRouter* __osrfRouter = NULL;
+
+void routerSignalHandler( int signal ) {
+       warning_handler("Received signal [%d], cleaning up...", signal );
+       osrfConfigCleanup();
+       osrfRouterFree(__osrfRouter);
+       log_free();
+}
+
+static int __setupRouter( char* config, char* context );
+
+
+int main( int argc, char* argv[] ) {
+
+       if( argc < 3 ) {
+               fatal_handler( "Usage: %s <path_to_config_file> <config_context>", argv[0] );
+               exit(0);
+       }
+
+       char* config = strdup( argv[1] );
+       char* context = strdup( argv[2] );
+       init_proc_title( argc, argv );
+       set_proc_title( "OpenSRF Router" );
+
+       return __setupRouter( config, context );
+       free(config);
+       free(context);
+
+}
+
+int __setupRouter( char* config, char* context ) {
+
+       fprintf(stderr, "Launching router with config %s and config context %s", config, context );
+       osrfConfig* cfg = osrfConfigInit( config, context );
+       osrfConfigSetDefaultConfig(cfg);
+
+
+       char* server                    = osrfConfigGetValue(NULL, "/transport/server");
+       char* port                              = osrfConfigGetValue(NULL, "/transport/port");
+       char* username                  = osrfConfigGetValue(NULL, "/transport/username");
+       char* password                  = osrfConfigGetValue(NULL, "/transport/password");
+       char* resource                  = osrfConfigGetValue(NULL, "/transport/resource");
+
+       /* set up the logger */
+       char* level = osrfConfigGetValue(NULL, "/loglevel");
+       char* log_file = osrfConfigGetValue(NULL, "/logfile");
+
+       int llevel = 1;
+       if(level) llevel = atoi(level);
+
+       if(!log_init( llevel, log_file )) 
+               fprintf(stderr, "Unable to init logging, going to stderr...\n" );
+
+       free(level);
+       free(log_file);
+
+       info_handler( "Router connecting as: server: %s port: %s "
+                       "user: %s resource: %s", server, port, username, resource );
+
+       int iport = 0;
+       if(port)        iport = atoi( port );
+
+       osrfStringArray* tclients = osrfNewStringArray(4);
+       osrfStringArray* tservers = osrfNewStringArray(4);
+       osrfConfigGetValueList(NULL, tservers, "/trusted_domains/server" );
+       osrfConfigGetValueList(NULL, tclients, "/trusted_domains/client" );
+
+       int i;
+       for( i = 0; i != tservers->size; i++ ) 
+               info_handler( "Router adding trusted server: %s", osrfStringArrayGetString( tservers, i ) );
+
+       for( i = 0; i != tclients->size; i++ ) 
+               info_handler( "Router adding trusted client: %s", osrfStringArrayGetString( tclients, i ) );
+
+       osrfRouter* router = osrfNewRouter( server, 
+                       username, resource, password, iport, tclients, tservers );
+       
+       signal(SIGHUP,routerSignalHandler);
+       signal(SIGINT,routerSignalHandler);
+       signal(SIGTERM,routerSignalHandler);
+
+       if( (osrfRouterConnect(router)) != 0 ) {
+               fprintf(stderr, "!!!! Unable to connect router to jabber server %s... exiting", server );
+               return -1;
+       }
+
+       free(server); free(port); 
+       free(username); free(password); 
+
+       __osrfRouter = router;
+       daemonize();
+       osrfRouterRun( router );
+
+       return -1;
+
+}
+
+
index 1428a22..ffae2fc 100644 (file)
@@ -64,8 +64,24 @@ void osrfStringArrayFree(osrfStringArray* arr) {
 void string_array_destroy(string_array* arr) {
        if(arr) {
                int i = 0;
-               while( i++ < arr->size ) free(arr->array[i]);
+               while( i < arr->size ) free(arr->array[i++]);
                free(arr->array);
                free(arr);
        }
 }
+
+
+int osrfStringArrayContains( osrfStringArray* arr, char* string ) {
+       if(!(arr && string)) return 0;
+       
+       int i;
+       for( i = 0; i != arr->size; i++ ) {
+               char* str = osrfStringArrayGetString(arr, i);
+               if(str) {
+                       if(!strcmp(str, string)) return 1;
+               }
+       }
+
+       return 0;
+}
+
index d0867aa..3a7efbd 100644 (file)
@@ -26,6 +26,9 @@ void osrfStringArrayAdd(osrfStringArray*, char* string);
 char* string_array_get_string(osrfStringArray* arr, int index);
 char* osrfStringArrayGetString(osrfStringArray* arr, int index);
 
+/* returns true if this array contains the given string */
+int osrfStringArrayContains( osrfStringArray* arr, char* string );
+
 
 void string_array_destroy(osrfStringArray*);
 void osrfStringArrayFree(osrfStringArray*);
index 9c42578..cc1e284 100644 (file)
@@ -15,6 +15,7 @@ GNU General Public License for more details.
 */
 
 #include "utils.h"
+#include <errno.h>
 
 inline void* safe_malloc( int size ) {
        void* ptr = (void*) malloc( size );
@@ -427,4 +428,20 @@ char* md5sum( char* text, ... ) {
 
 }
 
+int osrfUtilsCheckFileDescriptor( int fd ) {
+
+       fd_set tmpset;
+       FD_ZERO(&tmpset);
+       FD_SET(fd, &tmpset);
+
+       struct timeval tv;
+       tv.tv_sec = 0;
+       tv.tv_usec = 0;
+
+       if( select(fd + 1, &tmpset, NULL, NULL, &tv) == -1 ) {
+               if( errno == EBADF ) return -1;
+       }
+
+       return 0;
+}
 
index 1737c29..999d608 100644 (file)
@@ -184,4 +184,11 @@ char* file_to_string(const char* filename);
 char* md5sum( char* text, ... );
 
 
+/**
+  Checks the validity of the file descriptor
+  returns -1 if the file descriptor is invalid
+  returns 0 if the descriptor is OK
+  */
+int osrfUtilsCheckFileDescriptor( int fd );
+
 #endif