From: erickson Date: Mon, 3 Oct 2005 22:19:41 +0000 (+0000) Subject: added list and hash code based on libJudy X-Git-Url: https://old-git.evergreen-ils.org/?a=commitdiff_plain;h=16ae416fa0e05beea7f6b028c5139e4a282cb6cb;p=opensrf%2Fbjwebb.git added list and hash code based on libJudy re-coded the router added preliminary transport_group code for client redundancy (far from functional) various twists and tweaks fixed memory error in string_array code update makefiles git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@541 9efc2488-bf62-4759-914b-345cdb29e865 --- diff --git a/src/Makefile b/src/Makefile index 7ed9737..864b0fc 100644 --- a/src/Makefile +++ b/src/Makefile @@ -28,6 +28,9 @@ OPENSRF_TARGETS = libtransport/transport_session.o \ libstack/osrf_application.o \ libstack/osrf_cache.o \ libstack/xml_utils.o \ + libstack/osrf_transgroup.o \ + libstack/osrf_list.o \ + libstack/osrf_hash.o \ libstack/osrf_log.o \ utils/socket_bundle.o \ utils/string_array.o \ @@ -50,6 +53,9 @@ OPENSRF_HEADERS = libtransport/transport_session.h \ libstack/osrf_cache.h \ libstack/xml_utils.h \ libstack/osrf_log.h \ + libstack/osrf_transgroup.h \ + libstack/osrf_list.h \ + libstack/osrf_hash.h \ utils/socket_bundle.h \ utils/string_array.h \ utils/utils.h \ @@ -74,7 +80,7 @@ libopensrf.so: objson/libobjson.so @echo stack make -C libstack @echo $@ - $(CC) -shared -W1 $(LDFLAGS) -lmemcache -lobjson $(OPENSRF_TARGETS) -o $(TMPDIR)/$(LIBOPENSRF) + $(CC) -shared -W1 $(LDFLAGS) -lJudy -lmemcache -lobjson $(OPENSRF_TARGETS) -o $(TMPDIR)/$(LIBOPENSRF) @echo apps make -C c-apps diff --git a/src/libstack/Makefile b/src/libstack/Makefile index b6b76de..8399594 100644 --- a/src/libstack/Makefile +++ b/src/libstack/Makefile @@ -3,7 +3,7 @@ # provided to any method is not at least as large as the 'argc' setting for the method CFLAGS += -DASSUME_STATELESS -DOSRF_LOG_PARAMS -DOSRF_STRICT_PARAMS -rdynamic -fno-strict-aliasing -LDLIBS += -lxml2 -lobjson -ldl -lmemcache +LDLIBS += -lxml2 -lobjson -ldl -lmemcache -LJudy TARGETS = osrf_message.o \ osrf_app_session.o \ @@ -15,6 +15,9 @@ TARGETS = osrf_message.o \ osrf_application.o \ osrf_cache.o \ osrf_log.o \ + osrf_transgroup.o \ + osrf_list.o \ + osrf_hash.o \ xml_utils.o HEADERS = osrf_message.h \ @@ -27,6 +30,9 @@ HEADERS = osrf_message.h \ osrf_application.h \ osrf_cache.h \ osrf_log.h \ + osrf_transgroup.h \ + osrf_list.h \ + osrf_hash.h \ xml_utils.h all: xml_utils.o $(TARGETS) copy @@ -49,6 +55,9 @@ osrfConfig.o: osrfConfig.c osrfConfig.h xml_utils.o osrf_application.o: osrf_application.c osrf_application.h osrf_cache.o: osrf_cache.c osrf_cache.h osrf_log.o: osrf_log.c osrf_log.h +osrf_list.o: osrf_list.c osrf_list.h +osrf_hash.o: osrf_hash.c osrf_hash.h + clean: /bin/rm -f *.o libopensrf_stack.so xml_utils.h xml_utils.c diff --git a/src/libstack/opensrf.c b/src/libstack/opensrf.c index 403a3d6..995fa84 100644 --- a/src/libstack/opensrf.c +++ b/src/libstack/opensrf.c @@ -1,7 +1,85 @@ #include "osrf_system.h" +#include "osrf_hash.h" +#include "osrf_list.h" + +//static void _free(void* i) { free(i); } +//static void _hfree(char* c, void* i) { free(i); } int main( int argc, char* argv[] ) { + /* + osrfHash* list = osrfNewHash(); + list->freeItem = _hfree; + + char* x = strdup("X"); + char* y = strdup("Y"); + char* z = strdup("Z"); + osrfHashSet( list, x, "test1" ); + osrfHashSet( list, y, "test2" ); + osrfHashSet( list, z, "test3" ); + + char* q = (char*) osrfHashGet( list, "test1" ); + printf( "%s\n", q ); + + q = (char*) osrfHashGet( list, "test2" ); + printf( "%s\n", q ); + + q = (char*) osrfHashGet( list, "test3" ); + printf( "%s\n", q ); + + osrfHashIterator* itr = osrfNewHashIterator(list); + char* val; + + while( (val = osrfHashIteratorNext(itr)) ) + printf("Iterated item: %s\n", val ); + + osrfHashIteratorReset(itr); + while( (val = osrfHashIteratorNext(itr)) ) + printf("Iterated item: %s\n", val ); + + printf( "Count: %lu\n", osrfHashGetCount(list)); + + osrfHashIteratorFree(itr); + + osrfHashFree(list); + + exit(1); + + osrfList* list = osrfNewList(); + list->freeItem = _free; + + char* x = strdup("X"); + char* y = strdup("Y"); + char* z = strdup("Z"); + osrfListSet( list, x, 0 ); + osrfListSet( list, y, 2 ); + osrfListSet( list, z, 4 ); + + char* q = (char*) osrfListGetIndex( list, 4 ); + printf( "%s\n", q ); + + osrfListIterator* itr = osrfNewListIterator( list ); + char* val; + + while( (val = osrfListIteratorNext(itr)) ) + printf("Found val: %s\n", val ); + + osrfListIteratorReset(itr); + printf("\n"); + while( (val = osrfListIteratorNext(itr)) ) + printf("Found val: %s\n", val ); + + osrfListIteratorFree(itr); + + printf( "Count: %lu\n", osrfListGetCount(list)); + + osrfListFree(list); + + exit(1); + */ + + + if( argc < 4 ) { fprintf(stderr, "Usage: %s \n", argv[0]); return 1; diff --git a/src/libstack/osrf_app_session.c b/src/libstack/osrf_app_session.c index 83a1644..66b105d 100644 --- a/src/libstack/osrf_app_session.c +++ b/src/libstack/osrf_app_session.c @@ -245,16 +245,14 @@ osrf_app_session* osrf_app_client_session_init( char* remote_service ) { char target_buf[512]; memset(target_buf,0,512); - //char* domain = config_value( "opensrf.bootstrap", "//%s/domains/domain1", osrf_get_config_context() ); /* just the first for now */ - //char* router_name = config_value( "opensrf.bootstrap", "//%s/router_name", osrf_get_config_context() ); osrfStringArray* arr = osrfNewStringArray(8); osrfConfigGetValueList(NULL, arr, "/domains/domain"); char* domain = osrfStringArrayGetString(arr, 0); char* router_name = osrfConfigGetValue(NULL, "/router_name"); - osrfStringArrayFree(arr); sprintf( target_buf, "%s@%s/%s", router_name, domain, remote_service ); + osrfStringArrayFree(arr); //free(domain); free(router_name); @@ -376,7 +374,7 @@ int osrf_app_session_make_req( } osrf_app_request* req = _osrf_app_request_init( session, req_msg ); - if(!_osrf_app_session_send( session, req_msg ) ) { + if(_osrf_app_session_send( session, req_msg ) ) { warning_handler( "Error sending request message [%d]", session->thread_trace ); return -1; } @@ -541,7 +539,7 @@ int osrf_app_session_connect(osrf_app_session* session){ session->state = OSRF_SESSION_CONNECTING; int ret = _osrf_app_session_send( session, con_msg ); osrf_message_free(con_msg); - if(!ret) return 0; + if(ret) return 0; time_t start = time(NULL); time_t remaining = (time_t) timeout; diff --git a/src/libstack/osrf_hash.c b/src/libstack/osrf_hash.c new file mode 100644 index 0000000..819b979 --- /dev/null +++ b/src/libstack/osrf_hash.c @@ -0,0 +1,173 @@ +#include "osrf_hash.h" + +osrfHash* osrfNewHash() { + osrfHash* hash = safe_malloc(sizeof(osrfHash)); + hash->hash = (Pvoid_t) NULL; + hash->freeItem = NULL; + return hash; +} + +void* osrfHashSet( osrfHash* hash, void* item, const char* key, ... ) { + if(!(hash && item && key )) return NULL; + + Word_t* value; + VA_LIST_TO_STRING(key); + uint8_t idx[strlen(VA_BUF) + 1]; + strcpy( idx, VA_BUF ); + + void* olditem = osrfHashRemove( hash, VA_BUF ); + + JSLI(value, hash->hash, idx); + if(value) *value = (Word_t) item; + return olditem; + +} + +void* osrfHashRemove( osrfHash* hash, const char* key, ... ) { + if(!(hash && key )) return NULL; + + VA_LIST_TO_STRING(key); + + Word_t* value; + uint8_t idx[strlen(VA_BUF) + 1]; + strcpy( idx, VA_BUF ); + void* item = NULL; + int retcode; + + JSLG( value, hash->hash, idx); + + if( value ) { + item = (void*) *value; + if(item) { + if( hash->freeItem ) { + hash->freeItem( (char*) idx, item ); + item = NULL; + } + } + } + + + JSLD( retcode, hash->hash, idx ); + + return item; +} + + +void* osrfHashGet( osrfHash* hash, const char* key, ... ) { + if(!(hash && key )) return NULL; + + VA_LIST_TO_STRING(key); + + Word_t* value; + uint8_t idx[strlen(VA_BUF) + 1]; + strcpy( idx, VA_BUF ); + + JSLG( value, hash->hash, idx ); + if(value) return (void*) *value; + return NULL; +} + + +osrfStringArray* osrfHashKeys( osrfHash* hash ) { + if(!hash) return NULL; + + Word_t* value; + uint8_t idx[OSRF_HASH_MAXKEY]; + strcpy(idx, ""); + char* key; + osrfStringArray* strings = osrfNewStringArray(8); + + JSLF( value, hash->hash, idx ); + + while( value ) { + key = (char*) idx; + osrfStringArrayAdd( strings, key ); + JSLN( value, hash->hash, idx ); + } + + return strings; +} + + +unsigned long osrfHashGetCount( osrfHash* hash ) { + if(!hash) return -1; + + Word_t* value; + unsigned long count = 0; + uint8_t idx[OSRF_HASH_MAXKEY]; + + strcpy( (char*) idx, ""); + JSLF(value, hash->hash, idx); + + while(value) { + count++; + JSLN( value, hash->hash, idx ); + } + + return count; +} + +void osrfHashFree( osrfHash* hash ) { + if(!hash) return; + + int i; + osrfStringArray* keys = osrfHashKeys( hash ); + + for( i = 0; i != keys->size; i++ ) { + char* key = (char*) osrfStringArrayGetString( keys, i ); + osrfHashRemove( hash, key ); + } + + osrfStringArrayFree(keys); + free(hash); +} + + + +osrfHashIterator* osrfNewHashIterator( osrfHash* hash ) { + if(!hash) return NULL; + osrfHashIterator* itr = safe_malloc(sizeof(osrfHashIterator)); + itr->hash = hash; + itr->current = NULL; + return itr; +} + +void* osrfHashIteratorNext( osrfHashIterator* itr ) { + if(!(itr && itr->hash)) return NULL; + + Word_t* value; + uint8_t idx[OSRF_HASH_MAXKEY]; + + if( itr->current == NULL ) { /* get the first item in the list */ + strcpy(idx, ""); + JSLF( value, itr->hash->hash, idx ); + + } else { + strcpy(idx, itr->current); + JSLN( value, itr->hash->hash, idx ); + } + + if(value) { + free(itr->current); + itr->current = strdup((char*) idx); + return (void*) *value; + } + + return NULL; + +} + +void osrfHashIteratorFree( osrfHashIterator* itr ) { + if(!itr) return; + free(itr->current); + free(itr); +} + +void osrfHashIteratorReset( osrfHashIterator* itr ) { + if(!itr) return; + free(itr->current); + itr->current = NULL; +} + + + diff --git a/src/libstack/osrf_hash.h b/src/libstack/osrf_hash.h new file mode 100644 index 0000000..abaacc5 --- /dev/null +++ b/src/libstack/osrf_hash.h @@ -0,0 +1,83 @@ +#include +#include "opensrf/utils.h" +#include "opensrf/string_array.h" + +#define OSRF_HASH_MAXKEY 256 + +struct __osrfHashStruct { + Pvoid_t hash; /* the hash */ + void (*freeItem) (char* key, void* item); /* callback for freeing stored items */ +}; +typedef struct __osrfHashStruct osrfHash; + + +struct __osrfHashIteratorStruct { + char* current; + osrfHash* hash; +}; +typedef struct __osrfHashIteratorStruct osrfHashIterator; + +/** + Allocates a new hash object + */ +osrfHash* osrfNewHash(); + +/** + Sets the given key with the given item + if "freeItem" is defined and an item already exists at the given location, + then old item is freed and the new item is put into place. + if "freeItem" is not defined and an item already exists, the old item + is returned. + @return The old item if exists and there is no 'freeItem', returns NULL + otherwise + */ +void* osrfHashSet( osrfHash* hash, void* item, const char* key, ... ); + +/** + Removes an item from the hash. + if 'freeItem' is defined it is used and NULL is returned, + else the freed item is returned + */ +void* osrfHashRemove( osrfHash* hash, const char* key, ... ); + +void* osrfHashGet( osrfHash* hash, const char* key, ... ); + + +/** + @return A list of strings representing the keys of the hash. + caller is responsible for freeing the returned string array + with osrfStringArrayFree(); + */ +osrfStringArray* osrfHashKeys( osrfHash* hash ); + +/** + Frees a hash + */ +void osrfHashFree( osrfHash* hash ); + +/** + @return The number of items in the hash + */ +unsigned long osrfHashGetCount( osrfHash* hash ); + + + + +/** + Creates a new list iterator with the given list + */ +osrfHashIterator* osrfNewHashIterator( osrfHash* hash ); + +/** + Returns the next non-NULL item in the list, return NULL when + the end of the list has been reached + */ +void* osrfHashIteratorNext( osrfHashIterator* itr ); + +/** + Deallocates the given list + */ +void osrfHashIteratorFree( osrfHashIterator* itr ); + +void osrfHashIteratorReset( osrfHashIterator* itr ); + diff --git a/src/libstack/osrf_list.c b/src/libstack/osrf_list.c new file mode 100644 index 0000000..a3d673e --- /dev/null +++ b/src/libstack/osrf_list.c @@ -0,0 +1,164 @@ +#include "osrf_list.h" + + +osrfList* osrfNewList() { + osrfList* list = safe_malloc(sizeof(osrfList)); + list->list = (Pvoid_t) NULL; + list->size = 0; + list->freeItem = NULL; + return list; +} + + +int osrfListPush( osrfList* list, void* item ) { + if(!(list && item)) return -1; + Word_t* value; + unsigned long index = -1; + JLL(value, list->list, index ); + osrfListSet( list, item, index+1 ); + return 0; +} + + +void* osrfListSet( osrfList* list, void* item, unsigned long position ) { + if(!list || position < 0) return NULL; + + Word_t* value; + void* olditem = osrfListRemove( list, position ); + + JLI( value, list->list, position ); + *value = (Word_t) item; + __osrfListSetSize( list ); + + return olditem; +} + + +void* osrfListGetIndex( osrfList* list, unsigned long position ) { + if(!list) return NULL; + + Word_t* value; + JLG( value, list->list, position ); + if(value) return (void*) *value; + return NULL; +} + +void osrfListFree( osrfList* list ) { + if(!list) return; + + Word_t* value; + unsigned long index = -1; + JLL(value, list->list, index ); + int retcode; + + while (value != NULL) { + JLD(retcode, list->list, index); + + if(list->freeItem) { + list->freeItem( (void*) *value ); + *value = (Word_t) NULL; + } + + JLP(value, list->list, index); + } + + free(list); +} + +void* osrfListRemove( osrfList* list, int position ) { + if(!list) return NULL; + + int retcode; + Word_t* value; + JLG( value, list->list, position ); + void* olditem = NULL; + + if( value ) { + + olditem = (void*) *value; + if( olditem ) { + JLD(retcode, list->list, position ); + if(retcode == 1) { + if(list->freeItem) { + list->freeItem( olditem ); + olditem = NULL; + } + __osrfListSetSize( list ); + } + } + } + + return olditem; +} + + +int osrfListFind( osrfList* list, void* addr ) { + if(!(list && addr)) return -1; + + Word_t* value; + unsigned long index = -1; + JLL(value, list->list, index ); + + while (value != NULL) { + if( (void*) *value == addr ) + return index; + JLP(value, list->list, index); + } + + return -1; +} + + + +void __osrfListSetSize( osrfList* list ) { + if(!list) return; + + Word_t* value; + unsigned long index = -1; + JLL(value, list->list, index ); + list->size = index + 1; +} + + +unsigned long osrfListGetCount( osrfList* list ) { + if(!list) return -1; + unsigned long retcode = -1; + JLC( retcode, list->list, 0, -1 ); + return retcode; +} + + +osrfListIterator* osrfNewListIterator( osrfList* list ) { + if(!list) return NULL; + osrfListIterator* itr = safe_malloc(sizeof(osrfListIterator)); + itr->list = list; + itr->current = 0; + return itr; +} + +void* osrfListIteratorNext( osrfListIterator* itr ) { + if(!(itr && itr->list)) return NULL; + + Word_t* value; + if(itr->current >= itr->list->size) return NULL; + JLF( value, itr->list->list, itr->current ); + if(value) { + itr->current++; + return (void*) *value; + } + return NULL; +} + +void osrfListIteratorFree( osrfListIterator* itr ) { + if(!itr) return; + free(itr); +} + + + +void osrfListIteratorReset( osrfListIterator* itr ) { + if(!itr) return; + itr->current = 0; +} + + diff --git a/src/libstack/osrf_list.h b/src/libstack/osrf_list.h new file mode 100644 index 0000000..9486cd0 --- /dev/null +++ b/src/libstack/osrf_list.h @@ -0,0 +1,116 @@ +#include +#include "opensrf/utils.h" +#include + +/** + Items are stored as void*'s so it's up to the user to + manage the data wisely. Also, if the 'freeItem' callback is defined for the list, + then, it will be used on any item that needs to be freed, so don't mix data + types in the list if you want magic freeing */ + +struct __osrfListStruct { + Pvoid_t list; /* the list */ + int size; /* how many items in the list including NULL items between non-NULL items */ + void (*freeItem) (void* item); /* callback for freeing stored items */ +}; +typedef struct __osrfListStruct osrfList; + + +struct __osrfListIteratorStruct { + osrfList* list; + unsigned long current; +}; +typedef struct __osrfListIteratorStruct osrfListIterator; + + +/** + Creates a new list iterator with the given list + */ +osrfListIterator* osrfNewListIterator( osrfList* list ); + +/** + Returns the next non-NULL item in the list, return NULL when + the end of the list has been reached + */ +void* osrfListIteratorNext( osrfListIterator* itr ); + +/** + Deallocates the given list + */ +void osrfListIteratorFree( osrfListIterator* itr ); + +void osrfListIteratorReset( osrfListIterator* itr ); + + +/** + Allocates a new list + @param compress If true, the list will compress empty slots on delete. If item positionality + is not important, then using this feature is reccomended to keep the list from growing indefinitely. + if item positionality is not important. + @return The allocated list + */ +osrfList* osrfNewList(); + +/** + Pushes an item onto the end of the list. This always finds the highest index + in the list and pushes the new item into the list after it. + @param list The list + @param item The item to push + @return 0 on success, -1 on failure + */ +int osrfListPush( osrfList* list, void* item ); + +/** + Puts the given item into the list at the specified position. If there + is already an item at the given position and the list has it's + "freeItem" function defined, then it will be used to free said item. + If no 'freeItem' callback is defined, then the displaced item will + be returned; + @param list The list + @param item The item to put into the list + @param position The position to place the item in + @return NULL in successfully inserting the new item and freeing + any displaced items. Returns the displaced item if no "freeItem" + callback is defined. + */ +void* osrfListSet( osrfList* list, void* item, unsigned long position ); + +/** + Returns the item at the given position + @param list The list + @param postiont the position + */ +void* osrfListGetIndex( osrfList* list, unsigned long position ); + +/** + Frees the list and all list items (if the list has a "freeItem" function defined ) + @param list The list + */ +void osrfListFree( osrfList* list ); + +/** + Removes the list item at the given index + @param list The list + @param position The position of the item to remove + @return A pointer to the item removed if "freeItem" is not defined + for this list, returns NULL if it is. + */ +void* osrfListRemove( osrfList* list, int position ); + +/** + Finds the list item whose void* is the same as the one passed in + @param list The list + @param addr The pointer connected to the list item we're to find + @return the index of the item, or -1 if the item was not found + */ +int osrfListFind( osrfList* list, void* addr ); + + +void __osrfListSetSize( osrfList* list ); + +/** + @return The number of non-null items in the list + */ +unsigned long osrfListGetCount( osrfList* list ); + + diff --git a/src/libstack/osrf_stack.c b/src/libstack/osrf_stack.c index 4fb3955..0bdca70 100644 --- a/src/libstack/osrf_stack.c +++ b/src/libstack/osrf_stack.c @@ -34,12 +34,14 @@ osrfAppSession* osrf_stack_transport_handler( transport_message* msg, char* my_s if( msg->is_error && ! msg->thread ) { warning_handler("!! Received jabber layer error for %s ... exiting\n", msg->sender ); + message_free( msg ); return NULL; } if(! msg->thread && ! msg->is_error ) { warning_handler("Received a non-error message with no thread trace... dropping"); message_free( msg ); + return NULL; } osrf_app_session* session = osrf_app_session_find_session( msg->thread ); diff --git a/src/libstack/osrf_system.c b/src/libstack/osrf_system.c index 77e5021..d6c9e85 100644 --- a/src/libstack/osrf_system.c +++ b/src/libstack/osrf_system.c @@ -144,7 +144,7 @@ int osrf_system_bootstrap_client_resc( char* config_file, char* contextnode, cha char* port = osrfConfigGetValue( NULL, "/port" ); char* unixpath = osrfConfigGetValue( NULL, "/unixpath" ); - char* domain = osrfStringArrayGetString( arr, 0 ); /* just the first for now */ + char* domain = strdup(osrfStringArrayGetString( arr, 0 )); /* just the first for now */ osrfStringArrayFree(arr); diff --git a/src/libstack/osrf_transgroup.c b/src/libstack/osrf_transgroup.c new file mode 100644 index 0000000..40bded1 --- /dev/null +++ b/src/libstack/osrf_transgroup.c @@ -0,0 +1,263 @@ +#include "osrf_transgroup.h" +#include + + +osrfTransportGroupNode* osrfNewTransportGroupNode( + char* domain, int port, char* username, char* password, char* resource ) { + + if(!(domain && port && username && password && resource)) return NULL; + + osrfTransportGroupNode* node = safe_malloc(sizeof(osrfTransportGroupNode)); + node->domain = strdup(domain); + node->port = port; + node->username = strdup(username); + node->password = strdup(password); + node->domain = strdup(domain); + node->active = 0; + node->lastsent = 0; + node->connection = client_init( domain, port, NULL, 0 ); + + return node; +} + + +osrfTransportGroup* osrfNewTransportGroup( char* router, osrfTransportGroupNode* nodes[], int count ) { + if(!nodes || !router || count < 1) return NULL; + + osrfTransportGroup* grp = safe_malloc(sizeof(osrfTransportGroup)); + grp->currentNode = 0; + grp->router = strdup(router); + grp->list = osrfNewList(1); + + int i; + for( i = 0; i != count; i++ ) osrfListPush( grp->list, nodes[i] ); + return grp; +} + + +int osrfTransportGroupConnect( osrfTransportGroup* grp ) { + if(!grp) return 0; + int i; + int active = 0; + for( i = 0; i != grp->list->size; i++ ) { + osrfTransportGroupNode* node = osrfListGetIndex( grp->list, i ); + if(client_connect( node->connection, node->username, + node->password, node->resource, 10, AUTH_DIGEST )) { + node->active = 1; + node->lastsent = time(NULL); + active++; + } + } + return active; +} + + +/* +osrfTransportGroup* osrfNewTransportGroup( char* resource ) { + + grp->username = osrfConfigGetValue( NULL, "/username" ); + grp->password = osrfConfigGetValue( NULL, "/passwd" ); + char* port = osrfConfigGetValue( NULL, "/port" ); + if(port) grp->port = atoi(port); + grp->currentNode = 0; + + if(!resource) resource = "client"; + char* host = getenv("HOSTNAME"); + if(!host) host = "localhost"; + char* res = va_list_to_string( "osrf_%s_%s_%d", resource, host, getpid() ); + + int i; + osrfStringArray* arr = osrfNewStringArray(8); + osrfConfigGetValueList(NULL, arr, "/domains/domain"); + + for( i = 0; i != arr->size; i++ ) { + char* domain = osrfStringArrayGetString( arr, i ); + if(domain) { + node->domain = strdup(domain); + node->connection = client_init( domain, grp->port, NULL, 0 ); + if(client_connect( node->connection, grp->username, grp->password, res, 10, AUTH_DIGEST )) { + node->active = 1; + node->lastsent = time(NULL); + } + osrfListPush( grp->list, node ); + } + } + + free(res); + osrfStringArrayFree(arr); + return grp; +} +*/ + + +int osrfTransportGroupSend( osrfTransportGroup* grp, transport_message* msg, char* newdomain ) { + if(!(grp && msg)) return -1; + + char domain[256]; + bzero(domain, 256); + jid_get_domain( msg->recipient, domain ); + + char msgrecip[254]; + bzero(msgrecip, 254); + jid_get_username(msg->recipient, msgrecip); + + + osrfTransportGroupNode* node = __osrfTransportGroupFindNode( grp, domain ); + + if( strcmp( msgrecip, grp->router ) ) { /* not a top level router message */ + + if(node) { + if( (client_send_message( node->connection, msg )) == 0 ) + return 0; + else + return warning_handler("Error sending message to domain %s", domain ); + } + return warning_handler("Transport group has no node for domain %s", domain ); + } + + + /* + if( type == OSRF_SERVER_NODE ) + return _osrfTGServerSend( grp, msgdom, msg ); + if( type == OSRF_CLIENT_NODE ) + return _osrfTGClientSend( grp, msgdom, msg ); + */ + + return -1; +} + +int _osrfTGServerSend( osrfTransportGroup* grp, char* domain, transport_message* msg ) { + + debug_handler("Transport group sending server message to domain %s", domain ); + + osrfTransportGroupNode* node = __osrfTransportGroupFindNode( grp, domain ); + if(node) { + if( (client_send_message( node->connection, msg )) == 0 ) + return 0; + else + return warning_handler("Error sending server response to domain %s", domain ); + } + return warning_handler("Transport group has no node for domain %s for server response", domain ); +} + + +int _osrfTGClientSend( osrfTransportGroup* grp, char* domain, transport_message* msg ) { + + debug_handler("Transport group sending client message to domain %s", domain ); + + /* first see if we have a node for the requested domain */ + osrfTransportGroupNode* node = __osrfTransportGroupFindNode( grp, domain ); + if(node && node->active) { + if( (client_send_message( node->connection, msg )) == 0 ) + return 0; + else + node->active = 0; + } + + /* if not (or it fails), try sending to the current domain */ + node = osrfListGetIndex(grp->list, grp->currentNode); + if(node && node->active) { + if( (client_send_message( node->connection, msg )) == 0 ) + return 0; + } + + /* start at the beginning and try them all ... */ + grp->currentNode = 0; + while( grp->currentNode < grp->list->size ) { + if( (node = osrfListGetIndex(grp->list, grp->currentNode++)) && node->active ) { + if( (client_send_message( node->connection, msg )) == 0 ) + return 1; + else node->active = 0; + } + } + return -1; +} + +static int __osrfTGWait( fd_set* fdset, int maxfd, int timeout ) { + if(!(fdset && maxfd)) return 0; + + struct timeval tv; + tv.tv_sec = timeout; + tv.tv_usec = 0; + int retval = 0; + + if( timeout < 0 ) { + if( (retval = select( maxfd + 1, fdset, NULL, NULL, NULL)) == -1 ) + return 0; + + } else { + if( (retval = select( maxfd + 1, fdset, NULL, NULL, &tv)) == -1 ) + return 0; + } + + return retval; +} + + +transport_message* osrfTransportGroupRecvAll( osrfTransportGroup* grp, int timeout ) { + if(!(grp && grp->list)) return NULL; + + int i; + int maxfd = 0; + osrfTransportGroupNode* node = NULL; + fd_set fdset; + FD_ZERO( &fdset ); + + for( i = 0; i != grp->list->size; i++ ) { + if( (node = osrfListGetIndex(grp->list, grp->currentNode++)) && node->active ) { + int fd = node->connection->session->sock_id; + if( fd < maxfd ) maxfd = fd; + FD_SET( fd, &fdset ); + } + } + + if( __osrfTGWait( &fdset, maxfd, timeout ) ) { + for( i = 0; i != grp->list->size; i++ ) { + if( (node = osrfListGetIndex(grp->list, grp->currentNode++)) && node->active ) { + int fd = node->connection->session->sock_id; + if( FD_ISSET( fd, &fdset ) ) { + return client_recv( node->connection, 0 ); + } + } + } + } + + return NULL; +} + +transport_message* osrfTransportGroupRecv( osrfTransportGroup* grp, char* domain, int timeout ) { + if(!(grp && domain)) return NULL; + + osrfTransportGroupNode* node = __osrfTransportGroupFindNode( grp, domain ); + if(!node && node->connection && node->connection->session) return NULL; + int fd = node->connection->session->sock_id; + + fd_set fdset; + FD_ZERO( &fdset ); + FD_SET( fd, &fdset ); + + int active = __osrfTGWait( &fdset, fd, timeout ); + if(active) return client_recv( node->connection, 0 ); + + return NULL; +} + +void osrfTransportGroupSetInactive( osrfTransportGroup* grp, char* domain ) { + if(!(grp && domain)) return; + osrfTransportGroupNode* node = __osrfTransportGroupFindNode( grp, domain ); + if(node) node->active = 0; +} + +osrfTransportGroupNode* __osrfTransportGroupFindNode( osrfTransportGroup* grp, char* domain ) { + if(!(grp && grp->list && domain)) return NULL; + int i = 0; + osrfTransportGroupNode* node = NULL; + + while( (node = (osrfTransportGroupNode*) osrfListGetIndex( grp->list, i++ )) ) + if(!strcmp(node->domain, domain)) return node; + return NULL; +} + + + + diff --git a/src/libstack/osrf_transgroup.h b/src/libstack/osrf_transgroup.h new file mode 100644 index 0000000..c9b6be1 --- /dev/null +++ b/src/libstack/osrf_transgroup.h @@ -0,0 +1,117 @@ +#include "opensrf/transport_client.h" +#include "opensrf/transport_message.h" +#include "osrf_list.h" +#include "osrfConfig.h" +#include "opensrf/utils.h" +#include + +/** + Maintains a set of transport clients for redundancy + */ + +//enum osrfTGType { OSRF_SERVER_NODE, OSRF_CLIENT_NODE }; + +struct __osrfTransportGroupStruct { + osrfList* list; /* our lisit of nodes */ + char* router; /* the login username of the router on this network */ + int currentNode; /* which node are we currently on. Used for client failover and + only gets updated on client messages where a server failed + and we need to move to the next server in the list */ +}; +typedef struct __osrfTransportGroupStruct osrfTransportGroup; + + +struct __osrfTransportGroupNode { + transport_client* connection; /* our connection to the network */ + char* domain; /* the domain we're connected to */ + char* username; /* username used to connect to the group of servers */ + char* password; /* password used to connect to the group of servers */ + char* resource; /* the login resource */ + int port; /* port used to connect to the group of servers */ + + int active; /* true if we're able to send data on this connection */ + time_t lastsent; /* the last time we sent a message */ +}; +typedef struct __osrfTransportGroupNode osrfTransportGroupNode; + + +/** + Creates a new group node + @param domain The domain we're connecting to + @param port The port to connect on + @param username The login name + @param password The login password + @param resource The login resource + @return A new transport group node + */ +osrfTransportGroupNode* osrfNewTransportGroupNode( + char* domain, int port, char* username, char* password, char* resource ); + + +/** + Allocates and initializes a new transport group. + The first node in the array is the default node for client connections. + @param router The router name shared accross the networks + @param nodes The nodes in the group. + */ +osrfTransportGroup* osrfNewTransportGroup( char* router, osrfTransportGroupNode* nodes[], int count ); + +/** + Attempts to connect all of the nodes in this group. + @param grp The transport group + @return The number of nodes successfully connected + */ +int osrfTransportGroupConnect( osrfTransportGroup* grp ); + + +/** + Sends a transport message + If the message is destined for a domain that this group does not have a connection + for, then the message is sent out through the currently selected domain. + @param grp The transport group + @param type Whether this is a client request or a server response + @param msg The message to send + @param newdomain A pre-allocated buffer in which to write the name of the + new domain if a the expected domain could not be sent to. + @return 0 on normal successful send. Returns 1 if the message was sent + to a new domain (note: this can only happen when type == OSRF_CLIENT_NODE) + Returns -1 if the message cannot be sent. + */ +int osrfTransportGroupSend( osrfTransportGroup* grp, transport_message* msg, char* newdomain ); + +int _osrfTGServerSend( osrfTransportGroup* grp, char* domain, transport_message* msg ); +int _osrfTGClientSend( osrfTransportGroup* grp, char* domain, transport_message* msg ); + +/** + Waits on all connections for inbound data. + @param grp The transport group + @param timeout How long to wait for data. 0 means check for data + but don't wait, a negative number means to wait indefinitely + @return The received message or NULL if the timeout occurred before a + message was received + */ +transport_message* osrfTransportGroupRecvAll( osrfTransportGroup* grp, int timeout ); + +/** + Waits for data from a single domain + @param grp The transport group + @param domain The domain to wait for data on + @param timeout see osrfTransportGroupRecvAll + */ +transport_message* osrfTransportGroupRecv( osrfTransportGroup* grp, char* domain, int timeout ); + +/** + Tells the group that the connect to the last message sent to the provided + domain did not make it through; + @param grp The transport group + @param comain The failed domain + */ +void osrfTransportGroupSetInactive( osrfTransportGroup* grp, char* domain ); + + +/** + Finds a node in our list of nodes + */ +osrfTransportGroupNode* __osrfTransportGroupFindNode( osrfTransportGroup* grp, char* domain ); + + diff --git a/src/libtransport/transport_session.c b/src/libtransport/transport_session.c index 8d67ae1..e2b6cc4 100644 --- a/src/libtransport/transport_session.c +++ b/src/libtransport/transport_session.c @@ -125,18 +125,16 @@ int session_wait( transport_session* session, int timeout ) { int session_send_msg( transport_session* session, transport_message* msg ) { - if( ! session ) { return 0; } + if( ! session ) { return -1; } if( ! session->state_machine->connected ) { warning_handler("State machine is not connected in send_msg()"); - return 0; + return -1; } message_prepare_xml( msg ); //tcp_send( session->sock_obj, msg->msg_xml ); - socket_send( session->sock_id, msg->msg_xml ); - - return 1; + return socket_send( session->sock_id, msg->msg_xml ); } diff --git a/src/router/Makefile b/src/router/Makefile index a0a3330..e156743 100644 --- a/src/router/Makefile +++ b/src/router/Makefile @@ -4,13 +4,19 @@ LDLIBS += -lxml2 -lopensrf -lobjson CFLAGS += -D_ROUTER all: opensrf_router +#osrf_router install: cp opensrf_router $(BINDIR) -opensrf_router: router.o - $(CC) $(CFLAGS) $(LDFLAGS) $(LDLIBS) router.o -o $@ -router.o: router.c router.h +#opensrf_router: router.o +# $(CC) $(CFLAGS) $(LDFLAGS) $(LDLIBS) router.o -o $@ +#router.o: router.c router.h + +opensrf_router: osrf_router.o osrf_router_main.o + $(CC) $(CFLAGS) $(LDFLAGS) $(LDLIBS) osrf_router.o osrf_router_main.o -o $@ +osrf_router.o: osrf_router.c osrf_router.h +osrf_router_main.o: osrf_router_main.c clean: /bin/rm -f *.o opensrf_router diff --git a/src/router/osrf_router.c b/src/router/osrf_router.c new file mode 100644 index 0000000..07609ce --- /dev/null +++ b/src/router/osrf_router.c @@ -0,0 +1,615 @@ +#include "osrf_router.h" + +#define ROUTER_SOCKFD connection->session->sock_id +#define ROUTER_REGISTER "register" +#define ROUTER_UNREGISTER "unregister" + + +#define ROUTER_REQUEST_CLASS_LIST "opensrf.router.info.class.list" + +osrfRouter* osrfNewRouter( + char* domain, char* name, + char* resource, char* password, int port, + osrfStringArray* trustedClients, osrfStringArray* trustedServers ) { + + if(!( domain && name && resource && password && port )) return NULL; + + osrfRouter* router = safe_malloc(sizeof(osrfRouter)); + router->domain = strdup(domain); + router->name = strdup(name); + router->password = strdup(password); + router->resource = strdup(resource); + router->port = port; + + router->trustedClients = trustedClients; + router->trustedServers = trustedServers; + + router->classes = osrfNewHash(); + router->classes->freeItem = &osrfRouterClassFree; + + router->connection = client_init( domain, port, NULL, 0 ); + + return router; +} + + + +int osrfRouterConnect( osrfRouter* router ) { + if(!router) return -1; + int ret = client_connect( router->connection, router->name, + router->password, router->resource, 10, AUTH_DIGEST ); + if( ret == 0 ) return -1; + return 0; +} + + +void osrfRouterRun( osrfRouter* router ) { + if(!(router && router->classes)) return; + + int routerfd = router->ROUTER_SOCKFD; + int selectret = 0; + + while(1) { + + fd_set set; + int maxfd = __osrfRouterFillFDSet( router, &set ); + int numhandled = 0; + + if( (selectret = select(maxfd + 1, &set, NULL, NULL, NULL)) < 0 ) { + warning_handler("Top level select call failed with errno %d", errno); + continue; + } + + /* see if there is a top level router message */ + + if( FD_ISSET(routerfd, &set) ) { + debug_handler("Top router socket is active: %d", routerfd ); + numhandled++; + osrfRouterHandleIncoming( router ); + } + + + /* now check each of the connected classes and see if they have data to route */ + while( numhandled < selectret ) { + + osrfRouterClass* class; + osrfHashIterator* itr = osrfNewHashIterator(router->classes); + + while( (class = osrfHashIteratorNext(itr)) ) { + + char* classname = itr->current; + + if( classname && (class = osrfRouterFindClass( router, classname )) ) { + + debug_handler("Checking %s for activity...", classname ); + + int sockfd = class->ROUTER_SOCKFD; + if(FD_ISSET( sockfd, &set )) { + debug_handler("Socket is active: %d", sockfd ); + numhandled++; + osrfRouterClassHandleIncoming( router, classname, class ); + } + } + } + + osrfHashIteratorFree(itr); + } + } +} + + +void osrfRouterHandleIncoming( osrfRouter* router ) { + if(!router) return; + + transport_message* msg = NULL; + + if( (msg = client_recv( router->connection, 0 )) ) { + + if( msg->sender ) { + + /* if the sender is not a trusted server, drop the message */ + int len = strlen(msg->sender) + 1; + char domain[len]; + bzero(domain, len); + jid_get_domain( msg->sender, domain ); + + if(osrfStringArrayContains( router->trustedServers, domain)) + osrfRouterHandleMessage( router, msg ); + else + warning_handler("Received message from un-trusted server domain %s", msg->sender); + } + + message_free(msg); + } +} + +int osrfRouterClassHandleIncoming( osrfRouter* router, char* classname, osrfRouterClass* class ) { + if(!(router && class)) return -1; + + transport_message* msg; + debug_handler("osrfRouterClassHandleIncoming()"); + + if( (msg = client_recv( class->connection, 0 )) ) { + + if( msg->sender ) { + + /* if the client is not from a trusted domain, drop the message */ + int len = strlen(msg->sender) + 1; + char domain[len]; + bzero(domain, len); + jid_get_domain( msg->sender, domain ); + + if(osrfStringArrayContains( router->trustedClients, domain)) { + + transport_message* bouncedMessage = NULL; + if( msg->is_error ) { + + /* handle bounced message */ + if( !(bouncedMessage = osrfRouterClassHandleBounce( router, classname, class, msg )) ) + return -1; /* we have no one to send the requested message to */ + + message_free( msg ); + msg = bouncedMessage; + } + osrfRouterClassHandleMessage( router, class, msg ); + + } else { + warning_handler("Received client message from untrusted client domain %s", domain ); + } + } + + message_free( msg ); + } + + return 0; +} + + + + +int osrfRouterHandleMessage( osrfRouter* router, transport_message* msg ) { + if(!(router && msg)) return -1; + + if( !msg->router_command || !strcmp(msg->router_command,"")) + return osrfRouterHandleAppRequest( router, msg ); /* assume it's an app session level request */ + + if(!msg->router_class) return -1; + + osrfRouterClass* class = NULL; + if(!strcmp(msg->router_command, ROUTER_REGISTER)) { + class = osrfRouterFindClass( router, msg->router_class ); + + info_handler("Registering class %s", msg->router_class ); + + if(!class) class = osrfRouterAddClass( router, msg->router_class ); + + if(class) { + + if( osrfRouterClassFindNode( class, msg->sender ) ) + return 0; + else + osrfRouterClassAddNode( class, msg->sender ); + + } + + } else if( !strcmp( msg->router_command, ROUTER_UNREGISTER ) ) { + + if( msg->router_class && strcmp( msg->router_class, "") ) { + info_handler("Unregistering router class %s", msg->router_class ); + osrfRouterClassRemoveNode( router, msg->router_class, msg->sender ); + } + } + + return 0; +} + + + +osrfRouterClass* osrfRouterAddClass( osrfRouter* router, char* classname ) { + if(!(router && router->classes && classname)) return NULL; + + osrfRouterClass* class = safe_malloc(sizeof(osrfRouterClass)); + class->nodes = osrfNewHash(); + class->itr = osrfNewHashIterator(class->nodes); + class->nodes->freeItem = &osrfRouterNodeFree; + class->router = router; + + class->connection = client_init( router->domain, router->port, NULL, 0 ); + + if(!client_connect( class->connection, router->name, + router->password, classname, 10, AUTH_DIGEST ) ) { + osrfRouterClassFree( classname, class ); + return NULL; + } + + osrfHashSet( router->classes, class, classname ); + return class; +} + + +int osrfRouterClassAddNode( osrfRouterClass* rclass, char* remoteId ) { + if(!(rclass && rclass->nodes && remoteId)) return -1; + + info_handler("Adding router node for remote id %s", remoteId ); + + osrfRouterNode* node = safe_malloc(sizeof(osrfRouterNode)); + node->count = 0; + node->lastMessage = NULL; + node->remoteId = strdup(remoteId); + + osrfHashSet( rclass->nodes, node, remoteId ); + return 0; +} + +/* copy off the lastMessage, remove the offending node, send error if it's tht last node + ? return NULL if it's the last node ? + */ + +transport_message* osrfRouterClassHandleBounce( + osrfRouter* router, char* classname, osrfRouterClass* rclass, transport_message* msg ) { + + debug_handler("osrfRouterClassHandleBounce()"); + + warning_handler("Received network layer error message from %s", msg->sender ); + osrfRouterNode* node = osrfRouterClassFindNode( rclass, msg->sender ); + transport_message* lastSent = NULL; + + if( node && osrfHashGetCount(rclass->nodes) == 1 ) { /* the last node is dead */ + + if( node->lastMessage ) { + warning_handler("We lost the last node in the class, responding with error and removing..."); + + transport_message* error = message_init( + node->lastMessage->body, node->lastMessage->subject, + node->lastMessage->thread, node->lastMessage->router_from, node->lastMessage->recipient ); + set_msg_error( error, "cancel", 501 ); + + /* send the error message back to the original sender */ + client_send_message( rclass->connection, error ); + message_free( error ); + } + + return NULL; + + } else { + + if( node->lastMessage ) { + debug_handler("Cloning lastMessage so next node can send it"); + lastSent = message_init( node->lastMessage->body, + node->lastMessage->subject, node->lastMessage->thread, "", node->lastMessage->router_from ); + message_set_router_info( lastSent, node->lastMessage->router_from, NULL, NULL, NULL, 0 ); + } + } + + /* remove the dead node */ + osrfRouterClassRemoveNode( router, classname, msg->sender); + return lastSent; +} + + +/** + If we get a regular message, we send it to the next node in the list of nodes + if we get an error, it's a bounce back from a previous attempt. We take the + body and thread from the last sent on the node that had the bounced message + and propogate them on to the new message being sent + */ +int osrfRouterClassHandleMessage( + osrfRouter* router, osrfRouterClass* rclass, transport_message* msg ) { + if(!(router && rclass && msg)) return -1; + + debug_handler("osrfRouterClassHandleMessage()"); + + osrfRouterNode* node = osrfHashIteratorNext( rclass->itr ); + if(!node) { + osrfHashIteratorReset(rclass->itr); + node = osrfHashIteratorNext( rclass->itr ); + } + + if(node) { + + transport_message* new_msg= message_init( msg->body, + msg->subject, msg->thread, node->remoteId, msg->sender ); + message_set_router_info( new_msg, msg->sender, NULL, NULL, NULL, 0 ); + + info_handler( "Routing message:\nfrom: [%s]\nto: [%s]", + new_msg->router_from, new_msg->recipient ); + + message_free( node->lastMessage ); + node->lastMessage = new_msg; + + if ( client_send_message( rclass->connection, new_msg ) == 0 ) + node->count++; + + else { + message_prepare_xml(new_msg); + warning_handler("Error sending message from %s to %s\n%s", + new_msg->sender, new_msg->recipient, new_msg->msg_xml ); + } + + } + + return 0; +} + + +int osrfRouterRemoveClass( osrfRouter* router, char* classname ) { + if(!(router && router->classes && classname)) return -1; + info_handler("Removing router class %s", classname ); + osrfHashRemove( router->classes, classname ); + return 0; +} + + +int osrfRouterClassRemoveNode( + osrfRouter* router, char* classname, char* remoteId ) { + + if(!(router && router->classes && classname && remoteId)) return 0; + + info_handler("Removing router node %s", remoteId ); + + osrfRouterClass* class = osrfRouterFindClass( router, classname ); + + if( class ) { + + osrfHashRemove( class->nodes, remoteId ); + if( osrfHashGetCount(class->nodes) == 0 ) { + osrfRouterRemoveClass( router, classname ); + return 1; + } + + return 0; + } + + return -1; +} + + +void osrfRouterClassFree( char* classname, void* c ) { + if(!(classname && c)) return; + osrfRouterClass* rclass = (osrfRouterClass*) c; + client_disconnect( rclass->connection ); + client_free( rclass->connection ); + + osrfHashIteratorReset( rclass->itr ); + osrfRouterNode* node; + + while( (node = osrfHashIteratorNext(rclass->itr)) ) + osrfRouterClassRemoveNode( rclass->router, classname, node->remoteId ); + + free(rclass); +} + + +void osrfRouterNodeFree( char* remoteId, void* n ) { + if(!n) return; + osrfRouterNode* node = (osrfRouterNode*) n; + free(node->remoteId); + message_free(node->lastMessage); + free(node); +} + + +void osrfRouterFree( osrfRouter* router ) { + if(!router) return; + + free(router->domain); + free(router->name); + free(router->resource); + free(router->password); + + osrfStringArrayFree( router->trustedClients ); + osrfStringArrayFree( router->trustedServers ); + + client_free( router->connection ); + free(router); +} + + + +osrfRouterClass* osrfRouterFindClass( osrfRouter* router, char* classname ) { + if(!( router && router->classes && classname )) return NULL; + return (osrfRouterClass*) osrfHashGet( router->classes, classname ); +} + + +osrfRouterNode* osrfRouterClassFindNode( osrfRouterClass* rclass, char* remoteId ) { + if(!(rclass && remoteId)) return NULL; + return (osrfRouterNode*) osrfHashGet( rclass->nodes, remoteId ); +} + + +int __osrfRouterFillFDSet( osrfRouter* router, fd_set* set ) { + if(!(router && router->classes && set)) return -1; + + FD_ZERO(set); + int maxfd = router->ROUTER_SOCKFD; + FD_SET(maxfd, set); + + int sockid; + + osrfRouterClass* class = NULL; + osrfHashIterator* itr = osrfNewHashIterator(router->classes); + + while( (class = osrfHashIteratorNext(itr)) ) { + char* classname = itr->current; + + if( classname && (class = osrfRouterFindClass( router, classname )) ) { + sockid = class->ROUTER_SOCKFD; + + if( osrfUtilsCheckFileDescriptor( sockid ) ) { + osrfRouterRemoveClass( router, classname ); + + } else { + if( sockid > maxfd ) maxfd = sockid; + FD_SET(sockid, set); + } + } + } + + osrfHashIteratorFree(itr); + return maxfd; +} + + + +int osrfRouterHandleAppRequest( osrfRouter* router, transport_message* msg ) { + + int T = 32; + osrfMessage* arr[T]; + memset(arr, 0, T ); + + int num_msgs = osrf_message_deserialize( msg->body, arr, T ); + osrfMessage* omsg = NULL; + + int i; + for( i = 0; i != num_msgs; i++ ) { + + if( !(omsg = arr[i]) ) continue; + + switch( omsg->m_type ) { + + case CONNECT: + osrfRouterRespondConnect( router, msg, omsg ); + break; + + case REQUEST: + osrfRouterProcessAppRequest( router, msg, omsg ); + break; + + default: break; + } + + osrfMessageFree( omsg ); + } + + return 0; +} + +int osrfRouterRespondConnect( osrfRouter* router, transport_message* msg, osrfMessage* omsg ) { + if(!(router && msg && omsg)) return -1; + + osrfMessage* success = osrf_message_init( STATUS, omsg->thread_trace, omsg->protocol ); + + debug_handler("router recevied a CONNECT message from %s", msg->sender ); + + osrf_message_set_status_info( + success, "osrfConnectStatus", "Connection Successful", OSRF_STATUS_OK ); + + char* data = osrf_message_serialize(success); + + transport_message* return_m = message_init( + data, "", msg->thread, msg->sender, "" ); + + client_send_message(router->connection, return_m); + + free(data); + osrf_message_free(success); + message_free(return_m); + + return 0; +} + + + +int osrfRouterProcessAppRequest( osrfRouter* router, transport_message* msg, osrfMessage* omsg ) { + + if(!(router && msg && omsg && omsg->method_name)) return -1; + + info_handler("Router received app request: %s", omsg->method_name ); + + jsonObject* jresponse = NULL; + if(!strcmp( omsg->method_name, ROUTER_REQUEST_CLASS_LIST )) { + + int i; + jresponse = jsonParseString("[]"); + + osrfStringArray* keys = osrfHashKeys( router->classes ); + for( i = 0; i != keys->size; i++ ) + jsonObjectPush( jresponse, jsonNewObject(osrfStringArrayGetString( keys, i )) ); + osrfStringArrayFree(keys); + + + } else { + + return osrfRouterHandleMethodNFound( router, msg, omsg ); + } + + + osrfRouterHandleAppResponse( router, msg, omsg, jresponse ); + jsonObjectFree(jresponse); + + return 0; + +} + + + +int osrfRouterHandleMethodNFound( + osrfRouter* router, transport_message* msg, osrfMessage* omsg ) { + + osrf_message* err = osrf_message_init( STATUS, omsg->thread_trace, 1); + osrf_message_set_status_info( err, + "osrfMethodException", "Router method not found", OSRF_STATUS_NOTFOUND ); + + char* data = osrf_message_serialize(err); + + transport_message* tresponse = message_init( + data, "", msg->thread, msg->sender, msg->recipient ); + + client_send_message(router->connection, tresponse ); + + free(data); + osrf_message_free( err ); + message_free(tresponse); + return 0; +} + + + +int osrfRouterHandleAppResponse( osrfRouter* router, + transport_message* msg, osrfMessage* omsg, jsonObject* response ) { + + if( response ) { /* send the response message */ + + osrfMessage* oresponse = osrf_message_init( + RESULT, omsg->thread_trace, omsg->protocol ); + + char* json = jsonObjectToJSON(response); + osrf_message_set_result_content( oresponse, json); + + char* data = osrf_message_serialize(oresponse); + debug_handler( "Responding to client app request with data: \n%s\n", data ); + + transport_message* tresponse = message_init( + data, "", msg->thread, msg->sender, msg->recipient ); + + client_send_message(router->connection, tresponse ); + + osrfMessageFree(oresponse); + message_free(tresponse); + free(json); + free(data); + } + + + /* now send the 'request complete' message */ + osrf_message* status = osrf_message_init( STATUS, omsg->thread_trace, 1); + osrf_message_set_status_info( status, "osrfConnectStatus", "Request Complete", OSRF_STATUS_COMPLETE ); + + char* statusdata = osrf_message_serialize(status); + + transport_message* sresponse = message_init( + statusdata, "", msg->thread, msg->sender, msg->recipient ); + client_send_message(router->connection, sresponse ); + + + free(statusdata); + osrfMessageFree(status); + message_free(sresponse); + + return 0; +} + + + + diff --git a/src/router/osrf_router.h b/src/router/osrf_router.h new file mode 100644 index 0000000..ebe2897 --- /dev/null +++ b/src/router/osrf_router.h @@ -0,0 +1,221 @@ +#include +#include +#include + +#include "opensrf/utils.h" +#include "opensrf/osrf_list.h" +#include "opensrf/osrf_hash.h" + +#include "opensrf/string_array.h" +#include "opensrf/transport_client.h" +#include "opensrf/transport_message.h" + +#include "opensrf/osrf_message.h" + + + +/* a router maintains a list of server classes */ +struct __osrfRouterStruct { + + osrfHash* classes; /* our list of server classes */ + char* domain; /* our login domain */ + char* name; + char* resource; + char* password; + int port; + + osrfStringArray* trustedClients; + osrfStringArray* trustedServers; + + transport_client* connection; +}; + +typedef struct __osrfRouterStruct osrfRouter; + + +/* a class maintains a set of server nodes */ +struct __osrfRouterClassStruct { + osrfRouter* router; /* our router handle */ + osrfHashIterator* itr; + osrfHash* nodes; + transport_client* connection; +}; +typedef struct __osrfRouterClassStruct osrfRouterClass; + +/* represents a link to a single server's inbound connection */ +struct __osrfRouterNodeStruct { + char* remoteId; /* send message to me via this login */ + int count; /* how many message have been sent to this node */ + transport_message* lastMessage; +}; +typedef struct __osrfRouterNodeStruct osrfRouterNode; + +/** + Allocates a new router. + @param domain The jabber domain to connect to + @param name The login name for the router + @param resource The login resource for the router + @param password The login password for the new router + @param port The port to connect to the jabber server on + @param trustedClients The array of client domains that we allow to send requests through us + @param trustedServers The array of server domains that we allow to register, etc. with ust. + @return The allocated router or NULL on memory error + */ +osrfRouter* osrfNewRouter( char* domain, char* name, char* resource, + char* password, int port, osrfStringArray* trustedClients, osrfStringArray* trustedServers ); + +/** + Connects the given router to the network + */ +int osrfRouterConnect( osrfRouter* router ); + +/** + Waits for incoming data to route + If this function returns, then the router's connection to the jabber server + has failed. + */ +void osrfRouterRun( osrfRouter* router ); + + +/** + Allocates and adds a new router class handler to the router's list of handlers. + Also connects the class handler to the network at @domain/ + @param router The current router instance + @param classname The name of the class this node handles. + @return 0 on success, -1 on connection error. + */ +osrfRouterClass* osrfRouterAddClass( osrfRouter* router, char* classname ); + +/** + Adds a new server node to the given class. + @param rclass The Router class to add the node to + @param remoteId The remote login of this node + @return 0 on success, -1 on generic error + */ +int osrfRouterClassAddNode( osrfRouterClass* rclass, char* remoteId ); + + +/** + Handles top level router messages + @return 0 on success + */ +int osrfRouterHandleMessage( osrfRouter* router, transport_message* msg ); + + +/** + Handles class level requests + @return 0 on success + */ +int osrfRouterClassHandleMessage( osrfRouter* router, + osrfRouterClass* rclass, transport_message* msg ); + +/** + Removes a given class from the router, freeing as it goes + */ +int osrfRouterRemoveClass( osrfRouter* router, char* classname ); + +/** + Removes the given node from the class. Also, if this is that last node in the set, + removes the class from the router + @return 0 on successful removal with no class removal + @return 1 on successful remove with class removal + @return -1 error on removal + */ +int osrfRouterClassRemoveNode( osrfRouter* router, char* classname, char* remoteId ); + +/** + Frees a router class object + Takes a void* since it is freed by the hash code + */ +void osrfRouterClassFree( char* classname, void* rclass ); + +/** + Frees a router node object + Takes a void* since it is freed by the list code + */ +void osrfRouterNodeFree( char* remoteId, void* node ); + + +/** + Frees a router + */ +void osrfRouterFree( osrfRouter* router ); + +/** + Finds the class associated with the given class name in the list of classes + */ +osrfRouterClass* osrfRouterFindClass( osrfRouter* router, char* classname ); + +/** + Finds the router node within this class with the given remote id + */ +osrfRouterNode* osrfRouterClassFindNode( osrfRouterClass* rclass, char* remoteId ); + + +/** + Clears and populates the provided fd_set* with file descriptors + from the router's top level connection as well as each of the + router class connections + @return The largest file descriptor found in the filling process + */ +int __osrfRouterFillFDSet( osrfRouter* router, fd_set* set ); + + + +/** + Utility method for handling incoming requests to the router + and making sure the sender is allowed. + */ +void osrfRouterHandleIncoming( osrfRouter* router ); + +/** + Utility method for handling incoming requests to a router class, + makes sure sender is a trusted client + */ +int osrfRouterClassHandleIncoming( osrfRouter* router, char* classname, osrfRouterClass* class ); + +/* handles case where router node is not longer reachable. copies over the + data from the last sent message and returns a newly crafted suitable for treating + as a newly inconing message. Removes the dead node and If there are no more + nodes to send the new message to, returns NULL. + */ +transport_message* osrfRouterClassHandleBounce( + osrfRouter* router, char* classname, osrfRouterClass* rclass, transport_message* msg ); + + + +/** + handles messages that don't have a 'router_command' set. They are assumed to + be app request messages + */ +int osrfRouterHandleAppRequest( osrfRouter* router, transport_message* msg ); + + +/** + Handles connects, disconnects, etc. + */ +int osrfRouterHandeStatusMessage( osrfRouter* router, transport_message* msg ); + + +/** + Handles REQUEST messages + */ +int osrfRouterHandleRequestMessage( osrfRouter* router, transport_message* msg ); + + + +int osrfRouterHandleAppRequest( osrfRouter* router, transport_message* msg ); + + +int osrfRouterRespondConnect( osrfRouter* router, transport_message* msg, osrfMessage* omsg ); + + + +int osrfRouterProcessAppRequest( osrfRouter* router, transport_message* msg, osrfMessage* omsg ); + +int osrfRouterHandleAppResponse( osrfRouter* router, + transport_message* msg, osrfMessage* omsg, jsonObject* response ); + + +int osrfRouterHandleMethodNFound( osrfRouter* router, transport_message* msg, osrfMessage* omsg ); + diff --git a/src/router/osrf_router_main.c b/src/router/osrf_router_main.c new file mode 100644 index 0000000..c7fd903 --- /dev/null +++ b/src/router/osrf_router_main.c @@ -0,0 +1,104 @@ +#include "osrf_router.h" +#include "opensrf/osrfConfig.h" +#include "opensrf/utils.h" +#include "opensrf/logging.h" +#include + +osrfRouter* __osrfRouter = NULL; + +void routerSignalHandler( int signal ) { + warning_handler("Received signal [%d], cleaning up...", signal ); + osrfConfigCleanup(); + osrfRouterFree(__osrfRouter); + log_free(); +} + +static int __setupRouter( char* config, char* context ); + + +int main( int argc, char* argv[] ) { + + if( argc < 3 ) { + fatal_handler( "Usage: %s ", argv[0] ); + exit(0); + } + + char* config = strdup( argv[1] ); + char* context = strdup( argv[2] ); + init_proc_title( argc, argv ); + set_proc_title( "OpenSRF Router" ); + + return __setupRouter( config, context ); + free(config); + free(context); + +} + +int __setupRouter( char* config, char* context ) { + + fprintf(stderr, "Launching router with config %s and config context %s", config, context ); + osrfConfig* cfg = osrfConfigInit( config, context ); + osrfConfigSetDefaultConfig(cfg); + + + char* server = osrfConfigGetValue(NULL, "/transport/server"); + char* port = osrfConfigGetValue(NULL, "/transport/port"); + char* username = osrfConfigGetValue(NULL, "/transport/username"); + char* password = osrfConfigGetValue(NULL, "/transport/password"); + char* resource = osrfConfigGetValue(NULL, "/transport/resource"); + + /* set up the logger */ + char* level = osrfConfigGetValue(NULL, "/loglevel"); + char* log_file = osrfConfigGetValue(NULL, "/logfile"); + + int llevel = 1; + if(level) llevel = atoi(level); + + if(!log_init( llevel, log_file )) + fprintf(stderr, "Unable to init logging, going to stderr...\n" ); + + free(level); + free(log_file); + + info_handler( "Router connecting as: server: %s port: %s " + "user: %s resource: %s", server, port, username, resource ); + + int iport = 0; + if(port) iport = atoi( port ); + + osrfStringArray* tclients = osrfNewStringArray(4); + osrfStringArray* tservers = osrfNewStringArray(4); + osrfConfigGetValueList(NULL, tservers, "/trusted_domains/server" ); + osrfConfigGetValueList(NULL, tclients, "/trusted_domains/client" ); + + int i; + for( i = 0; i != tservers->size; i++ ) + info_handler( "Router adding trusted server: %s", osrfStringArrayGetString( tservers, i ) ); + + for( i = 0; i != tclients->size; i++ ) + info_handler( "Router adding trusted client: %s", osrfStringArrayGetString( tclients, i ) ); + + osrfRouter* router = osrfNewRouter( server, + username, resource, password, iport, tclients, tservers ); + + signal(SIGHUP,routerSignalHandler); + signal(SIGINT,routerSignalHandler); + signal(SIGTERM,routerSignalHandler); + + if( (osrfRouterConnect(router)) != 0 ) { + fprintf(stderr, "!!!! Unable to connect router to jabber server %s... exiting", server ); + return -1; + } + + free(server); free(port); + free(username); free(password); + + __osrfRouter = router; + daemonize(); + osrfRouterRun( router ); + + return -1; + +} + + diff --git a/src/utils/string_array.c b/src/utils/string_array.c index 1428a22..ffae2fc 100644 --- a/src/utils/string_array.c +++ b/src/utils/string_array.c @@ -64,8 +64,24 @@ void osrfStringArrayFree(osrfStringArray* arr) { void string_array_destroy(string_array* arr) { if(arr) { int i = 0; - while( i++ < arr->size ) free(arr->array[i]); + while( i < arr->size ) free(arr->array[i++]); free(arr->array); free(arr); } } + + +int osrfStringArrayContains( osrfStringArray* arr, char* string ) { + if(!(arr && string)) return 0; + + int i; + for( i = 0; i != arr->size; i++ ) { + char* str = osrfStringArrayGetString(arr, i); + if(str) { + if(!strcmp(str, string)) return 1; + } + } + + return 0; +} + diff --git a/src/utils/string_array.h b/src/utils/string_array.h index d0867aa..3a7efbd 100644 --- a/src/utils/string_array.h +++ b/src/utils/string_array.h @@ -26,6 +26,9 @@ void osrfStringArrayAdd(osrfStringArray*, char* string); char* string_array_get_string(osrfStringArray* arr, int index); char* osrfStringArrayGetString(osrfStringArray* arr, int index); +/* returns true if this array contains the given string */ +int osrfStringArrayContains( osrfStringArray* arr, char* string ); + void string_array_destroy(osrfStringArray*); void osrfStringArrayFree(osrfStringArray*); diff --git a/src/utils/utils.c b/src/utils/utils.c index 9c42578..cc1e284 100644 --- a/src/utils/utils.c +++ b/src/utils/utils.c @@ -15,6 +15,7 @@ GNU General Public License for more details. */ #include "utils.h" +#include inline void* safe_malloc( int size ) { void* ptr = (void*) malloc( size ); @@ -427,4 +428,20 @@ char* md5sum( char* text, ... ) { } +int osrfUtilsCheckFileDescriptor( int fd ) { + + fd_set tmpset; + FD_ZERO(&tmpset); + FD_SET(fd, &tmpset); + + struct timeval tv; + tv.tv_sec = 0; + tv.tv_usec = 0; + + if( select(fd + 1, &tmpset, NULL, NULL, &tv) == -1 ) { + if( errno == EBADF ) return -1; + } + + return 0; +} diff --git a/src/utils/utils.h b/src/utils/utils.h index 1737c29..999d608 100644 --- a/src/utils/utils.h +++ b/src/utils/utils.h @@ -184,4 +184,11 @@ char* file_to_string(const char* filename); char* md5sum( char* text, ... ); +/** + Checks the validity of the file descriptor + returns -1 if the file descriptor is invalid + returns 0 if the descriptor is OK + */ +int osrfUtilsCheckFileDescriptor( int fd ); + #endif