From: erickson Date: Thu, 24 Feb 2005 20:04:10 +0000 (+0000) Subject: OK. This is the first early C version of the OpenSRF stack. It's highly X-Git-Tag: osrf_rel_2_0_1~1763 X-Git-Url: https://old-git.evergreen-ils.org/?a=commitdiff_plain;h=f9b39f21b19ba8f16af815c6e91c5b1262aa4619;p=OpenSRF.git OK. This is the first early C version of the OpenSRF stack. It's highly incomplete at this point. In particular, included in the stack code is is the osrf_message code wich the router now needs. Added a top level src makefile Modified router to respond to app level requests. The only available method curently returns a list of attached router classes. git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@109 9efc2488-bf62-4759-914b-345cdb29e865 --- diff --git a/include/opensrf/osrf_app_session.h b/include/opensrf/osrf_app_session.h new file mode 100644 index 0000000..fe4d824 --- /dev/null +++ b/include/opensrf/osrf_app_session.h @@ -0,0 +1,192 @@ +#include "libjson/json.h" +#include "opensrf/transport_client.h" +#include "opensrf/generic_utils.h" +#include "osrf_message.h" +#include "osrf_system.h" + +#ifndef OSRF_APP_SESSION +#define OSRF_APP_SESSION + + +#define DEF_RECV_TIMEOUT 6 /* receive timeout */ +#define DEF_QUEUE_SIZE + +enum OSRF_SESSION_STATE { OSRF_SESSION_CONNECTING, OSRF_SESSION_CONNECTED, OSRF_SESSION_DISCONNECTED }; +enum OSRF_SESSION_TYPE { OSRF_SESSION_SERVER, OSRF_SESSION_CLIENT }; + +struct osrf_app_request_struct { + /** Our controlling session */ + struct osrf_app_session_struct* session; + + /** our "id" */ + int request_id; + /** True if we have received a 'request complete' message from our request */ + int complete; + /** Our original request payload */ + osrf_message* payload; + /** List of responses to our request */ + osrf_message* result; + + /** So we can be listified */ + struct osrf_app_request_struct* next; +}; +typedef struct osrf_app_request_struct osrf_app_request; + +struct osrf_app_session_struct { + + /** Our messag passing object */ + transport_client* transport_handle; + /** Cache of active app_request objects */ + osrf_app_request* request_queue; + + /** The original remote id of the remote service we're talking to */ + char* orig_remote_id; + /** The current remote id of the remote service we're talking to */ + char* remote_id; + + /** Who we're talking to */ + char* remote_service; + + /** The current request thread_trace */ + int thread_trace; + /** Our ID */ + char* session_id; + + /** The connect state */ + enum OSRF_SESSION_STATE state; + + /** SERVER or CLIENT */ + enum OSRF_SESSION_TYPE type; + + /** So we can be listified */ + struct osrf_app_session_struct* next; +}; +typedef struct osrf_app_session_struct osrf_app_session; + + + +// -------------------------------------------------------------------------- +// PUBLIC API *** +// -------------------------------------------------------------------------- + +/** Allocates a initializes a new app_session */ +osrf_app_session* osrf_app_client_session_init( char* remote_service ); + +/** Allocates and initializes a new server session. The global session cache + * is checked to see if this session already exists, if so, it's returned + */ +osrf_app_session* osrf_app_server_session_init( + char* session_id, char* our_app, char* remote_service, char* remote_id ); + +/** returns a session from the global session hash */ +osrf_app_session* osrf_app_session_find_session( char* session_id ); + +/** Builds a new app_request object with the given payload andn returns + * the id of the request. This id is then used to perform work on the + * requeset. + */ +int osrf_app_session_make_request( + osrf_app_session* session, json* params, char* method_name, int protocol ); + +/** Sets the given request to complete state */ +void osrf_app_session_set_complete( osrf_app_session* session, int request_id ); + +/** Returns true if the given request is complete */ +int osrf_app_session_complete( osrf_app_session* session, int request_id ); + +/** Does a recv call on the given request */ +osrf_message* osrf_app_session_request_recv( + osrf_app_session* session, int request_id, int timeout ); + +/** Removes the request from the request set and frees the reqest */ +void osrf_app_session_request_finish( osrf_app_session* session, int request_id ); + +/** Resends the orginal request with the given request id */ +int osrf_app_session_request_resend( osrf_app_session*, int request_id ); + +/** Resets the remote connection target to that of the original*/ +void osrf_app_session_reset_remote( osrf_app_session* ); + +/** Sets the remote target to 'remote_id' */ +void osrf_app_session_set_remote( osrf_app_session* session, char* remote_id ); + +/** pushes the given message into the result list of the app_request + * whose request_id matches the messages thread_trace + */ +int osrf_app_session_push_queue( osrf_app_session*, osrf_message* msg ); + +/** Attempts to connect to the remote service. Returns 1 on successful + * connection, 0 otherwise. + */ +int osrf_app_session_connect( osrf_app_session* ); + +/** Sends a disconnect message to the remote service. No response is expected */ +int osrf_app_session_disconnect( osrf_app_session* ); + +/** Waits up to 'timeout' seconds for some data to arrive. + * Any data that arrives will be processed according to its + * payload and message type. This method will return after + * any data has arrived. + */ +int osrf_app_session_queue_wait( osrf_app_session*, int timeout ); + +/** Disconnects (if client), frees any attached app_reuqests, removes the session from the + * global session cache and frees the session. Needless to say, only call this when the + * session is completey done. + */ +void osrf_app_session_destroy ( osrf_app_session* ); + + + +// -------------------------------------------------------------------------- +// -------------------------------------------------------------------------- +// Request functions +// -------------------------------------------------------------------------- + +/** Allocations and initializes a new app_request object */ +osrf_app_request* _osrf_app_request_init( osrf_app_session* session, osrf_message* msg ); + +/** Frees memory used by an app_request object */ +void _osrf_app_request_free( osrf_app_request * req ); + +/** Pushes the given message onto the list of 'responses' to this request */ +void _osrf_app_request_push_queue( osrf_app_request*, osrf_message* payload ); + +/** Checks the receive queue for messages. If any are found, the first + * is popped off and returned. Otherwise, this method will wait at most timeout + * seconds for a message to appear in the receive queue. Once it arrives it is returned. + * If no messages arrive in the timeout provided, null is returned. + */ +osrf_message* _osrf_app_request_recv( osrf_app_request* req, int timeout ); + +/** Resend this requests original request message */ +int _osrf_app_request_resend( osrf_app_request* req ); + + +// -------------------------------------------------------------------------- +// -------------------------------------------------------------------------- +// Session functions +// -------------------------------------------------------------------------- + +/** Returns the app_request with the given thread_trace (request_id) */ +osrf_app_request* _osrf_app_session_get_request( osrf_app_session*, int thread_trace ); + +/** frees memory held by a session. Note: We delete all requests in the request list */ +void _osrf_app_session_free( osrf_app_session* ); + +/** adds a session to the global session cache */ +void _osrf_app_session_push_session( osrf_app_session* ); + +/** removes from global session cache */ +void _osrf_app_session_remove_session( char* session_id ); + +/** Adds an app_request to the request set */ +void _osrf_app_session_push_request( osrf_app_session*, osrf_app_request* req ); + +/** Removes an app_request from this session request set, freeing the request object */ +void _osrf_app_session_remove_request( osrf_app_session*, osrf_app_request* req ); + +/** Send the given message */ +int _osrf_app_session_send( osrf_app_session*, osrf_message* msg ); + +#endif diff --git a/include/opensrf/osrf_message.h b/include/opensrf/osrf_message.h new file mode 100644 index 0000000..9533cfa --- /dev/null +++ b/include/opensrf/osrf_message.h @@ -0,0 +1,75 @@ +#include "libjson/json.h" +#include "opensrf/generic_utils.h" + +#ifndef osrf_message_h +#define osrf_message_h + +#define OSRF_XML_NAMESPACE "http://open-ils.org/xml/namespaces/oils_v1" + +#define OSRF_STATUS_CONTINUE 100 + +#define OSRF_STATUS_OK 200 +#define OSRF_STATUS_ACCEPTED 202 +#define OSRF_STATUS_COMPLETE 205 + +#define OSRF_STATUS_REDIRECTED 307 + +#define OSRF_STATUS_BADREQUEST 400 +#define OSRF_STATUS_UNAUTHORIZED 401 +#define OSRF_STATUS_FORBIDDEN 403 +#define OSRF_STATUS_NOTFOUND 404 +#define OSRF_STATUS_NOTALLOWED 405 +#define OSRF_STATUS_TIMEOUT 408 +#define OSRF_STATUS_EXPFAILED 417 + +#define OSRF_STATUS_INTERNALSERVERERROR 500 +#define OSRF_STATUS_NOTIMPLEMENTED 501 +#define OSRF_STATUS_VERSIONNOTSUPPORTED 505 + + +enum M_TYPE { CONNECT, REQUEST, RESULT, STATUS, DISCONNECT }; + +struct osrf_message_struct { + + enum M_TYPE m_type; + int thread_trace; + int protocol; + + /* if we're a STATUS message */ + char* status_name; + + /* if we're a STATUS or RESULT */ + char* status_text; + int status_code; + + /* if we're a RESULT */ + json* result_content; + + /* if we're a REQUEST */ + char* method_name; + json* params; + + /* in case anyone wants to make a list of us. + we won't touch this variable */ + struct osrf_message_struct* next; + +}; +typedef struct osrf_message_struct osrf_message; + + +osrf_message* osrf_message_init( enum M_TYPE type, int thread_trace, int protocol ); +void osrf_message_set_request_info( osrf_message*, char* param_name, json* params ); +void osrf_message_set_status_info( osrf_message*, char* status_name, char* status_text, int status_code ); +void osrf_message_set_result_content( osrf_message*, json* result_content ); +void osrf_message_free( osrf_message* ); +char* osrf_message_to_xml( osrf_message* ); +/** Pushes any message retreived from the xml into the 'msgs' array. + * it is assumed that 'msgs' has beenn pre-allocated. + * Returns the number of message that are in the buffer. + */ +int osrf_message_from_xml( char* xml, osrf_message* msgs[] ); + + + + +#endif diff --git a/include/opensrf/osrf_stack.h b/include/opensrf/osrf_stack.h new file mode 100644 index 0000000..23b358e --- /dev/null +++ b/include/opensrf/osrf_stack.h @@ -0,0 +1,17 @@ +#include "opensrf/transport_client.h" +#include "osrf_message.h" +#include "osrf_app_session.h" + +#ifndef OSRF_STACK_H +#define OSRF_STACK_H + +/* the max number of oilsMessage blobs present in any one root packet */ +#define OSRF_MAX_MSGS_PER_PACKET 16 +// ----------------------------------------------------------------------------- + +int osrf_stack_transport_handler( transport_message* msg ); +int osrf_stack_message_handler( osrf_app_session* session, osrf_message* msg ); +int osrf_stack_application_handler( osrf_app_session* session, osrf_message*msg ); + + +#endif diff --git a/include/opensrf/osrf_system.h b/include/opensrf/osrf_system.h new file mode 100644 index 0000000..7023395 --- /dev/null +++ b/include/opensrf/osrf_system.h @@ -0,0 +1,24 @@ +#include "opensrf/transport_client.h" + +#ifndef OSRF_SYSTEM_H +#define OSRF_SYSTEM_H + +/** Connects to jabber. Returns 1 on success, 0 on failure */ +int osrf_system_bootstrap_client(); + +/** Useful for managing multiple connections. Any clients added should + * live through the duration of the process so there are no cleanup procedures + * as of yet + */ +struct transport_client_cache_struct { + transport_client* client; + char* service; + struct transport_client_cache_struct* next; +}; +typedef struct transport_client_cache_struct transport_client_cache; + +void osrf_system_push_transport_client( transport_client* client, char* service ); +transport_client* osrf_system_get_transport_client( char* service ); + + +#endif diff --git a/src/Makefile b/src/Makefile new file mode 100644 index 0000000..ba444d2 --- /dev/null +++ b/src/Makefile @@ -0,0 +1,24 @@ +# TOP level 'src' makefile for OpenSRF + + +all: router + + +transport: + make -C libtransport + +json: + make -C libjson + +stack: json transport + make -C libstack + +router: stack + make -C router + + +clean: + make -C libtransport clean + make -C libjson clean + make -C libstack clean + make -C router clean diff --git a/src/libstack/Makefile b/src/libstack/Makefile new file mode 100644 index 0000000..ff6e99c --- /dev/null +++ b/src/libstack/Makefile @@ -0,0 +1,29 @@ +LIB_DIR = ../../lib +CC_OPTS = -Wall -O2 -I /usr/include/libxml2 -I /usr/include/libxml2/libxml -I ../../include +LIB_SOURCES = osrf_message.c osrf_app_session.c osrf_stack.c osrf_system.c +LIB_TARGETS = osrf_message.o osrf_app_session.o osrf_stack.o osrf_system.o +EXE_LD_OPTS = -L $(LIB_DIR) -lxml2 -lopensrf_transport -lopensrf_stack -ljson +CC = gcc + +all: client sys_client + +sys_client: sys_client.c lib + $(CC) $(CC_OPTS) $(EXE_LD_OPTS) *.o sys_client.c -o sys_client + +client: client.c lib + $(CC) $(CC_OPTS) $(EXE_LD_OPTS) *.o client.c -o client + +# ---------------------------------------------------------------- + +lib: libjson libopensrf_transport + $(CC) -c $(CC_OPTS) $(LIB_SOURCES) + $(CC) -shared -W1 $(LIB_TARGETS) -o $(LIB_DIR)/libopensrf_stack.so + +libjson: + make -C ../libjson + +libopensrf_transport: + make -C ../libtransport + +clean: + /bin/rm -f *.o sys_client client diff --git a/src/libstack/osrf_app_session.c b/src/libstack/osrf_app_session.c new file mode 100644 index 0000000..956aac7 --- /dev/null +++ b/src/libstack/osrf_app_session.c @@ -0,0 +1,548 @@ +#include "opensrf/osrf_app_session.h" +#include + +/* the global app_session cache */ +osrf_app_session* app_session_cache; + + + +// -------------------------------------------------------------------------- +// -------------------------------------------------------------------------- +// Request API +// -------------------------------------------------------------------------- + +/** Allocations and initializes a new app_request object */ +osrf_app_request* _osrf_app_request_init( + osrf_app_session* session, osrf_message* msg ) { + + osrf_app_request* req = + (osrf_app_request*) safe_malloc(sizeof(osrf_app_request)); + + req->session = session; + req->request_id = msg->thread_trace; + req->complete = 0; + req->payload = msg; + req->result = NULL; + + return req; + +} + +/** Frees memory used by an app_request object */ +void _osrf_app_request_free( osrf_app_request * req ){ + if( req == NULL ) return; + osrf_message* cur_msg = req->result; + while( cur_msg != NULL ) { + osrf_message* next_msg = cur_msg->next; + osrf_message_free( cur_msg ); + cur_msg = next_msg; + } + osrf_message_free( req->payload ); + free( req ); +} + +/** Pushes the given message onto the list of 'responses' to this request */ +void _osrf_app_request_push_queue( osrf_app_request* req, osrf_message* result ){ + if(req == NULL || result == NULL) return; + debug_handler( "App Session pushing [%d] onto request queue", result->thread_trace ); + if(req->result == NULL) + req->result = result; + else { + result->next = req->result; + req->result = result; + } +} + +/** Removes this app_request from our session request set */ +void osrf_app_session_request_finish( + osrf_app_session* session, int req_id ){ + + if(session == NULL) return; + osrf_app_request* req = _osrf_app_session_get_request( session, req_id ); + if(req == NULL) return; + _osrf_app_session_remove_request( req->session, req ); + _osrf_app_request_free( req ); +} + +/** Checks the receive queue for messages. If any are found, the first + * is popped off and returned. Otherwise, this method will wait at most timeout + * seconds for a message to appear in the receive queue. Once it arrives it is returned. + * If no messages arrive in the timeout provided, null is returned. + */ +osrf_message* _osrf_app_request_recv( osrf_app_request* req, int timeout ) { + + if(req == NULL) return NULL; + + if( req->result != NULL ) { + debug_handler("app_request receive already has a message, returning it"); + /* pop off the first message in the list */ + osrf_message* tmp_msg = req->result->next; + req->result = tmp_msg; + return req->result; + } + + time_t start = time(NULL); + time_t remaining = (time_t) timeout; + + while( remaining >= 0 ) { + /* tell the session to wait for stuff */ + debug_handler( "In app_request receive with remaining time [%d]", (int) remaining ); + osrf_app_session_queue_wait( req->session, (int) remaining ); + + if( req->result != NULL ) { /* if we received anything */ + /* pop off the first message in the list */ + debug_handler( "app_request_recv received a message, returning it"); + osrf_message* ret_msg = req->result; + osrf_message* tmp_msg = ret_msg->next; + req->result = tmp_msg; + return ret_msg; + } + if( req->complete ) + return NULL; + + remaining -= (int) (time(NULL) - start); + } + + return NULL; +} + +/** Resend this requests original request message */ +int _osrf_app_request_resend( osrf_app_request* req ) { + if(req == NULL) return 0; + debug_handler( "Resending request [%d]", req->request_id ); + return _osrf_app_session_send( req->session, req->payload ); +} + + + +// -------------------------------------------------------------------------- +// -------------------------------------------------------------------------- +// Session API +// -------------------------------------------------------------------------- + +/** returns a session from the global session hash */ +osrf_app_session* osrf_app_session_find_session( char* session_id ) { + osrf_app_session* ptr = app_session_cache; + debug_handler("Searching for session in global cache with id [%s]", session_id ); + while( ptr != NULL ) { + if( !strcmp(ptr->session_id,session_id) ) + return ptr; + ptr = ptr->next; + } + return NULL; +} + + +/** adds a session to the global session cache */ +void _osrf_app_session_push_session( osrf_app_session* session ) { + + if( app_session_cache == NULL ) { + app_session_cache = session; + return; + } + + osrf_app_session* ptr = app_session_cache; + debug_handler( "Pushing [%s] onto global session cache", session->session_id ); + while( ptr != NULL ) { + if( !strcmp(ptr->session_id, session->session_id) ) + return; + if( ptr->next == NULL ) { + ptr->next = session; + return; + } + ptr = ptr->next; + } +} + + +/** unlinks from global session cache */ +void _osrf_app_session_remove_session( char* session_id ) { + + if( app_session_cache == NULL ) + return; + + debug_handler( "App Session removing session [%s] from global cache", session_id ); + if( !strcmp(app_session_cache->session_id, session_id) ) { + if( app_session_cache->next != NULL ) { + osrf_app_session* next = app_session_cache->next; + app_session_cache = next; + return; + } else { + app_session_cache = NULL; + return; + } + } + + if( app_session_cache->next == NULL ) + return; + + osrf_app_session* prev = app_session_cache; + osrf_app_session* ptr = prev->next; + while( ptr != NULL ) { + if( ptr->session_id == session_id ) { + osrf_app_session* tmp = ptr->next; + prev->next = tmp; + return; + } + ptr = ptr->next; + } +} + +/** Allocates a initializes a new app_session */ + +osrf_app_session* osrf_app_client_session_init( char* remote_service ) { + osrf_app_session* session = safe_malloc(sizeof(osrf_app_session)); + + session->transport_handle = osrf_system_get_transport_client( "client" ); + if( session->transport_handle == NULL ) { + warning_handler("No transport client for service 'client'"); + return NULL; + } + session->request_queue = NULL; + session->remote_id = strdup("router@judy/math"); /*XXX config value */ + session->orig_remote_id = strdup(session->remote_id); + + /* build a chunky, random session id */ + char id[256]; + memset(id,0,256); + srand((int)time(NULL)); + sprintf(id, "%d.%d%d", rand(), (int)time(NULL), getpid()); + session->session_id = strdup(id); + debug_handler( "Building a new client session with id [%s]", session->session_id ); + + session->thread_trace = 0; + session->state = OSRF_SESSION_DISCONNECTED; + session->type = OSRF_SESSION_CLIENT; + session->next = NULL; + _osrf_app_session_push_session( session ); + return session; +} + +osrf_app_session* osrf_app_server_session_init( + char* session_id, char* our_app, char* remote_service, char* remote_id ) { + + osrf_app_session* session = osrf_app_session_find_session( session_id ); + if(session) + return session; + + debug_handler( "Building a new server session with id [%s]", session_id ); + + session = safe_malloc(sizeof(osrf_app_session)); + + session->transport_handle = osrf_system_get_transport_client( our_app ); + if( session->transport_handle == NULL ) { + warning_handler("No transport client for service '%s'", our_app ); + return NULL; + } + session->request_queue = NULL; + session->remote_id = strdup(remote_id); + session->orig_remote_id = strdup(remote_id); + session->session_id = strdup(session_id); + + session->thread_trace = 0; + session->state = OSRF_SESSION_DISCONNECTED; + session->type = OSRF_SESSION_CLIENT; + session->next = NULL; + + _osrf_app_session_push_session( session ); + return session; + +} + + + +/** frees memory held by a session */ +void _osrf_app_session_free( osrf_app_session* session ){ + if(session==NULL) + return; + + free(session->remote_id); + free(session->orig_remote_id); + free(session); +} + + +int osrf_app_session_make_request( + osrf_app_session* session, json* params, char* method_name, int protocol ) { + if(session == NULL) return -1; + + osrf_message* req_msg = osrf_message_init( REQUEST, ++(session->thread_trace), protocol ); + osrf_message_set_request_info( req_msg, method_name, params ); + osrf_app_request* req = _osrf_app_request_init( session, req_msg ); + if(!_osrf_app_session_send( session, req_msg ) ) { + warning_handler( "Error sending request message [%d]", session->thread_trace ); + return -1; + } + + _osrf_app_session_push_request( session, req ); + return req->request_id; +} + +/** Adds an app_request to the request set */ +void _osrf_app_session_push_request( osrf_app_session* session, osrf_app_request* req ){ + if(session == NULL || req == NULL) + return; + + debug_handler( "Pushing [%d] onto requeust queue for session [%s]", + req->request_id, session->session_id ); + + if(session->request_queue == NULL) + session->request_queue = req; + else { + osrf_app_request* req2 = session->request_queue->next; + session->request_queue = req; + req->next = req2; + } +} + + + +/** Removes an app_request from this session request set */ +void _osrf_app_session_remove_request( osrf_app_session* session, osrf_app_request* req ){ + if(session == NULL || req == NULL) + return; + + if(session->request_queue == NULL) + return; + + debug_handler("Removing request [%d] from session [%s]", + req->request_id, session->session_id ); + osrf_app_request* first = session->request_queue; + if(first->request_id == req->request_id) { + if(first->next == NULL) { /* only one in the list */ + session->request_queue = NULL; + } else { + osrf_app_request* tmp = first->next; + session->request_queue = tmp; + } + } + + osrf_app_request* lead = first->next; + + while( lead != NULL ) { + if(lead->request_id == req->request_id) { + osrf_app_request* tmp = lead->next; + first->next = tmp; + return; + } + first = lead; + lead = lead->next; + } +} + + +void osrf_app_session_set_complete( osrf_app_session* session, int request_id ) { + if(session == NULL) + return; + + osrf_app_request* req = _osrf_app_session_get_request( session, request_id ); + if(req) req->complete = 1; +} + +int osrf_app_session_request_complete( osrf_app_session* session, int request_id ) { + if(session == NULL) + return 0; + osrf_app_request* req = _osrf_app_session_get_request( session, request_id ); + return req->complete; +} + +/** Returns the app_request with the given request_id (request_id) */ +osrf_app_request* _osrf_app_session_get_request( + osrf_app_session* session, int request_id ){ + if(session == NULL) + return NULL; + + debug_handler( "App Session searching for request [%d] in request queue",request_id ); + osrf_app_request* req = session->request_queue; + while( req != NULL ) { + if(req->request_id == request_id) + return req; + req = req->next; + } + return NULL; +} + + +/** Resets the remote connection id to that of the original*/ +void osrf_app_session_reset_remote( osrf_app_session* session ){ + if( session==NULL ) + return; + + free(session->remote_id); + debug_handler( "App Session [%s] resetting remote it to %s", + session->session_id, session->orig_remote_id ); + + session->remote_id = strdup(session->orig_remote_id); +} + +void osrf_app_session_set_remote( osrf_app_session* session, char* remote_id ) { + if(session == NULL) + return; + if( session->remote_id ) + free(session->remote_id ); + session->remote_id = strdup( remote_id ); +} + +/** pushes the given message into the result list of the app_request + with the given request_id */ +int osrf_app_session_push_queue( + osrf_app_session* session, osrf_message* msg ){ + + if(session == NULL || msg == NULL) + return 0; + + debug_handler( "AppSession pushing result for [%d] onto request payload queue", + msg->thread_trace ); + + osrf_app_request* req = session->request_queue; + + if(req == NULL) { + warning_handler( "app_session has no app_requests in its queue yet we have a result for [%d]", msg->thread_trace ); + return 0; + } + + debug_handler( "The first request in the request queue has tt [%d]", req->request_id ); + + while( req != NULL ) { + if(req->request_id == msg->thread_trace) { + debug_handler( "Found app_request for tt [%d]", msg->thread_trace ); + _osrf_app_request_push_queue( req, msg ); + return 1; + } + req = req->next; + } + + return 0; + +} + +/** Attempts to connect to the remote service */ +int osrf_app_session_connect(osrf_app_session* session){ + + if(session == NULL) + return 0; + + int timeout = 5; /* XXX CONFIG VALUE */ + + debug_handler( "AppSession connecting to %s", session->remote_id ); + + /* defaulting to protocol 1 for now */ + osrf_message* con_msg = osrf_message_init( CONNECT, session->thread_trace, 1 ); + osrf_app_session_reset_remote( session ); + session->state = OSRF_SESSION_CONNECTING; + if(!_osrf_app_session_send( session, con_msg ) ) + return 0; + + time_t start = time(NULL); + time_t remaining = (time_t) timeout; + + while( session->state != OSRF_SESSION_CONNECTED && remaining >= 0 ) { + osrf_app_session_queue_wait( session, remaining ); + remaining -= (int) (time(NULL) - start); + } + + if(session->state != OSRF_SESSION_CONNECTED) + return 0; + + return 1; +} + + + +/** Disconnects from the remote service */ +int osrf_app_session_disconnect( osrf_app_session* session){ + if(session == NULL) + return 1; + + if(session->state == OSRF_SESSION_DISCONNECTED) + return 1; + debug_handler( "AppSession disconnecting from %s", session->remote_id ); + + osrf_message* dis_msg = osrf_message_init( DISCONNECT, session->thread_trace, 1 ); + session->state = OSRF_SESSION_DISCONNECTED; + _osrf_app_session_send( session, dis_msg ); + + osrf_message_free( dis_msg ); + osrf_app_session_reset_remote( session ); + return 1; +} + +int osrf_app_session_request_resend( osrf_app_session* session, int req_id ) { + osrf_app_request* req = _osrf_app_session_get_request( session, req_id ); + return _osrf_app_request_resend( req ); +} + +/** Send the given message */ +int _osrf_app_session_send( osrf_app_session* session, osrf_message* msg ){ + if(session == NULL) return 0; + int ret_val= 0; + + osrf_app_session_queue_wait( session, 0 ); + debug_handler( "AppSession sending type %d, and thread_trace %d", + msg->m_type, msg->thread_trace ); + + if( (msg->m_type != CONNECT) && (msg->m_type != DISCONNECT) && + (session->state != OSRF_SESSION_CONNECTED) ) { + if(!osrf_app_session_connect( session )) + return 0; + } + + char* xml = osrf_message_to_xml(msg); + + transport_message* t_msg = message_init( + xml, "", session->session_id, session->remote_id, NULL ); + + debug_handler("Sending XML:\n%s", xml ); + ret_val = client_send_message( session->transport_handle, t_msg ); + message_free( t_msg ); + + return ret_val; +} + +/** Waits up to 'timeout' seconds for some data to arrive. + * Any data that arrives will be processed according to its + * payload and message type. This method will return after + * any data has arrived. + */ +int osrf_app_session_queue_wait( osrf_app_session* session, int timeout ){ + if(session == NULL) return 0; + int ret_val = 0; + debug_handler( "AppSession in queue_wait with timeout %d", timeout ); + ret_val = osrf_stack_process(session->transport_handle, timeout ); + return ret_val; +} + +/** Disconnects (if client) and removes the given session from the global session cache + * ! This free's all attached app_requests ! + */ +void osrf_app_session_destroy ( osrf_app_session* session ){ + if(session == NULL) return; + + debug_handler( "AppSession [%s] destroying self and deleting requests", session->session_id ); + if(session->type == OSRF_SESSION_CLIENT) { /* disconnect if we're a client */ + osrf_message* dis_msg = osrf_message_init( DISCONNECT, session->thread_trace, 1 ); + _osrf_app_session_send( session, dis_msg ); + //osrf_app_session_reset_remote( session ); + } + //session->state = OSRF_SESSION_DISCONNECTED; + _osrf_app_session_remove_session(session->session_id); + debug_handler("AppSession [%s] removed from cache", session->session_id ); + + osrf_app_request* req; + while( session->request_queue != NULL ) { + req = session->request_queue->next; + _osrf_app_request_free( session->request_queue ); + session->request_queue = req; + } + + _osrf_app_session_free( session ); +} + +osrf_message* osrf_app_session_request_recv( + osrf_app_session* session, int req_id, int timeout ) { + if(req_id < 0 || session == NULL) + return NULL; + osrf_app_request* req = _osrf_app_session_get_request( session, req_id ); + return _osrf_app_request_recv( req, timeout ); +} + diff --git a/src/libstack/osrf_message.c b/src/libstack/osrf_message.c new file mode 100644 index 0000000..eb51063 --- /dev/null +++ b/src/libstack/osrf_message.c @@ -0,0 +1,513 @@ +#include "opensrf/osrf_message.h" + + + +/* +int main() { + + char* x = ""; + */ + + /* + char* x = "" + "" + "" + "" + "" + "" + "" + "" + + "" + "" + "" + "" + "" + "" + "" + + ""; + */ + +/* + osrf_message* arr[4]; + memset(arr, 0, 4); + int ret = osrf_message_from_xml( x, arr ); + fprintf(stderr, "RET: %d\n", ret ); + if(ret<=0) + fatal_handler( "none parsed" ); + + osrf_message* xml_msg = arr[0]; + printf("Message name: %s\nstatus %s, \nstatusCode %d\n", xml_msg->status_name, xml_msg->status_text, xml_msg->status_code ); + +// xml_msg = arr[1]; +// printf("Message 2 status %s, statusCode %d\n", xml_msg->status_text, xml_msg->status_code ); + + + return 0; +} +*/ + + + + /* + osrf_message* msg = osrf_message_init( STATUS, 1, 1 ); +// osrf_message* msg = osrf_message_init( CONNECT, 1, 1 ); + //osrf_message* msg = osrf_message_init( REQUEST, 1, 1 ); + osrf_message_set_status_info( msg, "oilsConnectStatus", "Connection Succsesful", 200 ); + + json* params = json_object_new_array(); + json_object_array_add(params, json_object_new_int(1)); + json_object_array_add(params, json_object_new_int(2)); + + osrf_message_set_request_info( msg, "add", params ); + //osrf_message_set_result_content( msg, params ); + json_object_put( params ); + + char* xml = osrf_message_to_xml( msg ); + printf( "\n\nMessage as XML\n%s", xml ); + + osrf_message* xml_msg = osrf_message_from_xml( xml ); + + printf( "Message stuff \n\ntype %d" + "\nthread_trace %d \nprotocol %d " + "\nstatus_name %s" + "\nstatus_text %s\nstatus_code %d" + "\nresult_content %s \nparams %s" + "\n", xml_msg->m_type, + xml_msg->thread_trace, xml_msg->protocol, xml_msg->status_name, + xml_msg->status_text, xml_msg->status_code, + json_object_to_json_string( xml_msg->result_content), + json_object_to_json_string(xml_msg->params) + ); + + + free(xml); + osrf_message_free( msg ); + osrf_message_free( xml_msg ); + return 0; + +} +*/ + + +osrf_message* osrf_message_init( enum M_TYPE type, int thread_trace, int protocol ) { + + osrf_message* msg = safe_malloc(sizeof(osrf_message)); + msg->m_type = type; + msg->thread_trace = thread_trace; + msg->protocol = protocol; + msg->next = NULL; + + return msg; +} + + +void osrf_message_set_request_info( osrf_message* msg, char* method_name, json* json_params ) { + if( msg == NULL || method_name == NULL || json_params == NULL ) + fatal_handler( "Bad params to osrf_message_set_request_params()" ); + + msg->params = json_tokener_parse(json_object_to_json_string(json_params)); + msg->method_name = strdup( method_name ); +} + + + +void osrf_message_set_status_info( + osrf_message* msg, char* status_name, char* status_text, int status_code ) { + + if( msg == NULL ) + fatal_handler( "Bad params to osrf_message_set_status_info()" ); + + if( msg->m_type == STATUS ) + if( status_name != NULL ) + msg->status_name = strdup( status_name ); + + if( status_text != NULL ) + msg->status_text = strdup( status_text ); + + msg->status_code = status_code; +} + + +void osrf_message_set_result_content( osrf_message* msg, json* result_content ) { + if( msg == NULL ) + fatal_handler( "Bad params to osrf_message_set_result_content()" ); + msg->result_content = json_tokener_parse(json_object_to_json_string(result_content)); +} + + + +void osrf_message_free( osrf_message* msg ) { + if( msg == NULL ) + warning_handler( "Trying to delete NULL osrf_message" ); + + if( msg->status_name != NULL ) + free(msg->status_name); + + if( msg->status_text != NULL ) + free(msg->status_text); + + if( msg->result_content != NULL ) + json_object_put( msg->result_content ); + + if( msg->method_name != NULL ) + free(msg->method_name); + + if( msg->params != NULL ) + json_object_put( msg->params ); + + free(msg); +} + + + +/* here's where things get interesting */ +char* osrf_message_to_xml( osrf_message* msg ) { + + if( msg == NULL ) + return NULL; + + int bufsize; + xmlChar* xmlbuf; + char* encoded_msg; + + xmlKeepBlanksDefault(0); + + xmlNodePtr message_node; + xmlNodePtr type_node; + xmlNodePtr thread_trace_node; + xmlNodePtr protocol_node; + xmlNodePtr status_node; + xmlNodePtr status_text_node; + xmlNodePtr status_code_node; + xmlNodePtr method_node; + xmlNodePtr method_name_node; + xmlNodePtr params_node; + xmlNodePtr result_node; + xmlNodePtr content_node; + + + xmlDocPtr doc = xmlReadDoc( + BAD_CAST "" + "", + NULL, NULL, XML_PARSE_NSCLEAN ); + + message_node = xmlDocGetRootElement(doc)->children; /* since it's the only child */ + type_node = xmlNewChild( message_node, NULL, BAD_CAST "domainObjectAttr", NULL ); + thread_trace_node = xmlNewChild( message_node, NULL, BAD_CAST "domainObjectAttr", NULL ); + protocol_node = xmlNewChild( message_node, NULL, BAD_CAST "domainObjectAttr", NULL ); + + char tt[64]; + memset(tt,0,64); + sprintf(tt,"%d",msg->thread_trace); + xmlSetProp( thread_trace_node, BAD_CAST "name", BAD_CAST "threadTrace" ); + xmlSetProp( thread_trace_node, BAD_CAST "value", BAD_CAST tt ); + + char prot[64]; + memset(prot,0,64); + sprintf(prot,"%d",msg->protocol); + xmlSetProp( protocol_node, BAD_CAST "name", BAD_CAST "protocol" ); + xmlSetProp( protocol_node, BAD_CAST "value", BAD_CAST prot ); + + switch(msg->m_type) { + + case CONNECT: + xmlSetProp( type_node, BAD_CAST "name", BAD_CAST "type" ); + xmlSetProp( type_node, BAD_CAST "value", BAD_CAST "CONNECT" ); + break; + + case DISCONNECT: + xmlSetProp( type_node, BAD_CAST "type", "DISCONNECT" ); + break; + + case STATUS: + + xmlSetProp( type_node, BAD_CAST "name", BAD_CAST "type" ); + xmlSetProp( type_node, BAD_CAST "value", BAD_CAST "STATUS" ); + status_node = xmlNewChild( message_node, NULL, BAD_CAST "domainObject", NULL ); + xmlSetProp( status_node, BAD_CAST "name", BAD_CAST msg->status_name ); + + status_text_node = xmlNewChild( status_node, NULL, BAD_CAST "domainObjectAttr", NULL ); + xmlSetProp( status_text_node, BAD_CAST "name", BAD_CAST "status" ); + xmlSetProp( status_text_node, BAD_CAST "value", BAD_CAST msg->status_text); + + status_code_node = xmlNewChild( status_node, NULL, BAD_CAST "domainObjectAttr", NULL ); + xmlSetProp( status_code_node, BAD_CAST "name", BAD_CAST "statusCode" ); + + char sc[64]; + memset(sc,0,64); + sprintf(sc,"%d",msg->status_code); + xmlSetProp( status_code_node, BAD_CAST "value", BAD_CAST sc); + + break; + + case REQUEST: + + xmlSetProp( type_node, BAD_CAST "name", "type" ); + xmlSetProp( type_node, BAD_CAST "value", "REQUEST" ); + method_node = xmlNewChild( message_node, NULL, BAD_CAST "domainObject", NULL ); + xmlSetProp( method_node, BAD_CAST "name", BAD_CAST "oilsMethod" ); + + if( msg->method_name != NULL ) { + + method_name_node = xmlNewChild( method_node, NULL, BAD_CAST "domainObjectAttr", NULL ); + xmlSetProp( method_name_node, BAD_CAST "name", BAD_CAST "method" ); + xmlSetProp( method_name_node, BAD_CAST "value", BAD_CAST msg->method_name ); + + if( msg->params != NULL ) { + params_node = xmlNewChild( method_node, NULL, + BAD_CAST "params", BAD_CAST json_object_to_json_string( msg->params ) ); + } + } + + break; + + case RESULT: + + xmlSetProp( type_node, BAD_CAST "name", BAD_CAST "type" ); + xmlSetProp( type_node, BAD_CAST "value", BAD_CAST "RESULT" ); + result_node = xmlNewChild( message_node, NULL, BAD_CAST "domainObject", NULL ); + xmlSetProp( result_node, BAD_CAST "name", BAD_CAST "oilsResult" ); + + status_text_node = xmlNewChild( result_node, NULL, BAD_CAST "domainObjectAttr", NULL ); + xmlSetProp( status_text_node, BAD_CAST "name", BAD_CAST "status" ); + xmlSetProp( status_text_node, BAD_CAST "value", BAD_CAST msg->status_text); + + status_code_node = xmlNewChild( result_node, NULL, BAD_CAST "domainObjectAttr", NULL ); + xmlSetProp( status_code_node, BAD_CAST "name", BAD_CAST "statusCode" ); + + char stc[64]; + memset(stc,0,64); + sprintf(stc,"%d",msg->status_code); + xmlSetProp( status_code_node, BAD_CAST "value", BAD_CAST stc); + + content_node = xmlNewChild( result_node, NULL, + BAD_CAST "domainObject", BAD_CAST json_object_to_json_string( msg->result_content ) ); + xmlSetProp( content_node, BAD_CAST "name", BAD_CAST "oilsScalar" ); + + break; + + default: + warning_handler( "Recieved bogus message type" ); + return NULL; + } + + + // ----------------------------------------------------- + // Dump the XML doc to a string and remove the + // xml declaration + // ----------------------------------------------------- + + /* passing in a '1' means we want to retain the formatting */ + xmlDocDumpFormatMemory( doc, &xmlbuf, &bufsize, 0 ); + encoded_msg = strdup( (char*) xmlbuf ); + + if( encoded_msg == NULL ) + fatal_handler("message_to_xml(): Out of Memory"); + + xmlFree(xmlbuf); + xmlFreeDoc( doc ); + xmlCleanupParser(); + + + /*** remove the XML declaration */ + int len = strlen(encoded_msg); + char tmp[len]; + memset( tmp, 0, len ); + int i; + int found_at = 0; + + /* when we reach the first >, take everything after it */ + for( i = 0; i!= len; i++ ) { + if( encoded_msg[i] == 62) { /* ascii > */ + + /* found_at holds the starting index of the rest of the doc*/ + found_at = i + 1; + break; + } + } + + if( found_at ) { + /* move the shortened doc into the tmp buffer */ + strncpy( tmp, encoded_msg + found_at, len - found_at ); + /* move the tmp buffer back into the allocated space */ + memset( encoded_msg, 0, len ); + strcpy( encoded_msg, tmp ); + } + + return encoded_msg; + +} + + +int osrf_message_from_xml( char* xml, osrf_message* msgs[] ) { + + if(!xml) return 0; + + xmlKeepBlanksDefault(0); + + xmlNodePtr message_node; + xmlDocPtr doc = xmlReadDoc( + BAD_CAST xml, NULL, NULL, XML_PARSE_NSCLEAN ); + + xmlNodePtr root =xmlDocGetRootElement(doc); + if(!root) { + warning_handler( "Attempt to build message from incomplete xml %s", xml ); + return 0; + } + + int msg_index = 0; + message_node = root->children; /* since it's the only child */ + + if(!message_node) { + warning_handler( "Attempt to build message from incomplete xml %s", xml ); + return 0; + } + + while( message_node != NULL ) { + + xmlNodePtr cur_node = message_node->children; + osrf_message* new_msg = safe_malloc(sizeof(osrf_message)); + + + while( cur_node ) { + + xmlChar* name = NULL; + xmlChar* value = NULL; + + /* we're a domainObjectAttr */ + if( !strcmp((char*)cur_node->name,"domainObjectAttr" )) { + name = xmlGetProp( cur_node, BAD_CAST "name"); + + if(name) { + + value = xmlGetProp( cur_node, BAD_CAST "value" ); + if(value) { + + if( (!strcmp((char*)name, "type")) ) { /* what type are we? */ + + if(!strcmp((char*)value, "CONNECT")) + new_msg->m_type = CONNECT; + + if(!strcmp((char*)value, "DISCONNECT")) + new_msg->m_type = DISCONNECT; + + if(!strcmp((char*)value, "STATUS")) + new_msg->m_type = STATUS; + + if(!strcmp((char*)value, "REQUEST")) + new_msg->m_type = REQUEST; + + if(!strcmp((char*)value, "RESULT")) + new_msg->m_type = RESULT; + + } else if((!strcmp((char*)name, "threadTrace"))) { + new_msg->thread_trace = atoi( (char*) value ); + + } else if((!strcmp((char*)name, "protocol"))) { + new_msg->protocol = atoi( (char*) value ); + } + + xmlFree(value); + } + xmlFree(name); + } + } + + /* we're a domainObject */ + if( !strcmp((char*)cur_node->name,"domainObject" )) { + + name = xmlGetProp( cur_node, BAD_CAST "name"); + + if(name) { + + if( !strcmp(name,"oilsMethod") ) { + + xmlNodePtr meth_node = cur_node->children; + + while( meth_node != NULL ) { + + if( !strcmp((char*)meth_node->name,"domainObjectAttr" )) { + char* meth_name = xmlGetProp( meth_node, BAD_CAST "value" ); + if(meth_name) { + new_msg->method_name = strdup(meth_name); + xmlFree(meth_name); + } + } + + if( !strcmp((char*)meth_node->name,"params" ) && meth_node->children->content ) + new_msg->params = json_object_new_string( meth_node->children->content ); + //new_msg->params = json_tokener_parse(ng(json_params)); + + meth_node = meth_node->next; + } + } //oilsMethod + + if( !strcmp(name,"oilsResult") || new_msg->m_type == STATUS ) { + + xmlNodePtr result_nodes = cur_node->children; + + while( result_nodes ) { + + if(!strcmp(result_nodes->name,"domainObjectAttr")) { + + xmlChar* result_attr_name = xmlGetProp( result_nodes, BAD_CAST "name"); + if(result_attr_name) { + xmlChar* result_attr_value = xmlGetProp( result_nodes, BAD_CAST "value" ); + + if( result_attr_value ) { + if((!strcmp((char*)result_attr_name, "status"))) + new_msg->status_text = strdup((char*) result_attr_value ); + + else if((!strcmp((char*)result_attr_name, "statusCode"))) + new_msg->status_code = atoi((char*) result_attr_value ); + xmlFree(result_attr_value); + } + + xmlFree(result_attr_name); + } + + } + + + if(!strcmp(result_nodes->name,"domainObject")) { + xmlChar* r_name = xmlGetProp( result_nodes, BAD_CAST "name" ); + if(r_name) { + if( !strcmp((char*)r_name,"oilsScalar") && result_nodes->children->content ) + new_msg->result_content = json_object_new_string( result_nodes->children->content ); + xmlFree(r_name); + } + } + result_nodes = result_nodes->next; + } + } + + if( new_msg->m_type == STATUS ) { new_msg->status_name = strdup(name); } + xmlFree(name); + } + } + + /* we're a params node */ + if( !strcmp((char*)cur_node->name,"params" )) { + + } + + cur_node = cur_node->next; + } + + msgs[msg_index] = new_msg; + msg_index++; + message_node = message_node->next; + + } // while message_node != null + + xmlCleanupCharEncodingHandlers(); + xmlFreeDoc( doc ); + xmlCleanupParser(); + + return msg_index; + +} + + diff --git a/src/libstack/osrf_stack.c b/src/libstack/osrf_stack.c new file mode 100644 index 0000000..f5f5c77 --- /dev/null +++ b/src/libstack/osrf_stack.c @@ -0,0 +1,152 @@ +#include "opensrf/osrf_stack.h" + +osrf_message* _do_client( osrf_app_session*, osrf_message* ); +osrf_message* _do_server( osrf_app_session*, osrf_message* ); + +int osrf_stack_process( transport_client* client, int timeout ) { + transport_message* msg = client_recv( client, timeout ); + if(msg == NULL) return 0; + return osrf_stack_transport_handler( msg ); +} + + + +// ----------------------------------------------------------------------------- +// Entry point into the stack +// ----------------------------------------------------------------------------- +int osrf_stack_transport_handler( transport_message* msg ) { + + debug_handler( "Transport handler received new message \nfrom %s " + "to %s with body \n\n%s\n", msg->sender, msg->recipient, msg->body ); + + osrf_app_session* session = osrf_app_session_find_session( msg->thread ); + + if( session == NULL ) /* we must be a server, build a new session */ + fatal_handler( "Server sessions not implemented yet ..." ); + + osrf_app_session_set_remote( session, msg->sender ); + osrf_message* arr[OSRF_MAX_MSGS_PER_PACKET]; + memset(arr, 0, OSRF_MAX_MSGS_PER_PACKET ); + int num_msgs = osrf_message_from_xml( msg->body, arr ); + + /* XXX ERROR CHECKING, BAD XML, ETC... */ + int i; + for( i = 0; i != num_msgs; i++ ) { + osrf_stack_message_handler( session, arr[i] ); + } + + return 1; +} + +int osrf_stack_message_handler( osrf_app_session* session, osrf_message* msg ) { + if(session == NULL || msg == NULL) + return 0; + + osrf_message* ret_msg; + if( session->type == OSRF_SESSION_CLIENT ) + ret_msg = _do_client( session, msg ); + else + ret_msg= _do_server( session, msg ); + + if(ret_msg) + osrf_stack_application_handler( session, msg ); + + return 1; + +} + +/** If we return a message, that message should be passed up the stack, + * if we return NULL, we're finished for now... + */ +osrf_message* _do_client( osrf_app_session* session, osrf_message* msg ) { + if(session == NULL || msg == NULL) + return NULL; + + //osrf_message* new_msg; + + if( msg->m_type == STATUS ) { + + switch( msg->status_code ) { + + case OSRF_STATUS_OK: + session->state = OSRF_SESSION_CONNECTED; + return NULL; + + case OSRF_STATUS_COMPLETE: + osrf_app_session_set_complete( session, msg->thread_trace ); + return NULL; + + case OSRF_STATUS_REDIRECTED: + osrf_app_session_reset_remote( session ); + session->state = OSRF_SESSION_DISCONNECTED; + osrf_app_session_request_resend( session, msg->thread_trace ); + return NULL; + + case OSRF_STATUS_EXPFAILED: + osrf_app_session_reset_remote( session ); + session->state = OSRF_SESSION_DISCONNECTED; + osrf_app_session_request_resend( session, msg->thread_trace ); + return NULL; + + case OSRF_STATUS_INTERNALSERVERERROR: /*XXX we need to propogate these... */ + /* + new_msg = osrf_message_init( RESULT, msg->thread_trace, msg->protocol ); + osrf_message_set_status_info( new_msg, + msg->status_name, msg->status_text, msg->status_code ); + osrf_message_set_result_content( new_msg, json_object_new_string("HELP") ); + warning_handler( "Received an INTERNAL SERVER ERROR from the server for tt %d", + msg->thread_trace); + return new_msg; + */ + return NULL; + + case OSRF_STATUS_TIMEOUT: + osrf_app_session_reset_remote( session ); + session->state = OSRF_SESSION_DISCONNECTED; + osrf_app_session_request_resend( session, msg->thread_trace ); + return NULL; + + + default: + warning_handler("We don't know what to do with the provided message code: %d", msg->status_code ); + } + + return NULL; + + } else if( msg->m_type == RESULT ) + return msg; + + return NULL; + +} + + +/** If we return a message, that message should be passed up the stack, + * if we return NULL, we're finished for now... + */ +osrf_message* _do_server( osrf_app_session* session, osrf_message* msg ) { + if(session == NULL || msg == NULL) + return NULL; + + warning_handler( "We dont' do servers yet !!" ); + + return msg; +} + + + + +int osrf_stack_application_handler( osrf_app_session* session, osrf_message* msg ) { + if(session == NULL || msg == NULL) + return 0; + + if(msg->m_type == RESULT) { + osrf_app_session_push_queue( session, msg ); + return 1; + } + + warning_handler( "application_handler can't handle whatever you sent, type %d", msg->m_type); + + return 1; + +} diff --git a/src/libstack/osrf_system.c b/src/libstack/osrf_system.c new file mode 100644 index 0000000..ec310ee --- /dev/null +++ b/src/libstack/osrf_system.c @@ -0,0 +1,49 @@ +#include "opensrf/osrf_system.h" + + +int osrf_system_bootstrap_client() { + // XXX config values + transport_client* client = client_init( "judy", 5222, 0 ); + char buf[256]; + memset(buf,0,256); + char* host = getenv("HOSTNAME"); + sprintf(buf, "client_%s_%d", host, getpid() ); + if(client_connect( client, "system_client","jkjkasdf", buf, 10, AUTH_DIGEST )) { + /* push ourselves into the client cache */ + osrf_system_push_transport_client( client, "client" ); + return 1; + } + return 0; +} + +// ----------------------------------------------------------------------------- +// Some client caching utility methods +transport_client_cache* client_cache; + +void osrf_system_push_transport_client( transport_client* client, char* service ) { + if(client == NULL || service == NULL) return; + transport_client_cache* new = (transport_client_cache*) safe_malloc(sizeof(transport_client_cache)); + new->service = strdup(service); + new->client = client; + if(client_cache == NULL) + client_cache = new; + else { + transport_client_cache* tmp = client_cache->next; + client_cache = new; + new->next = tmp; + } +} + +transport_client* osrf_system_get_transport_client( char* service ) { + if(service == NULL) return NULL; + transport_client_cache* cur = client_cache; + while(cur != NULL) { + if( !strcmp(cur->service, service)) + return cur->client; + cur = cur->next; + } + return NULL; +} +// ----------------------------------------------------------------------------- + + diff --git a/src/router/Makefile b/src/router/Makefile index 09aa6e5..cabf1f5 100644 --- a/src/router/Makefile +++ b/src/router/Makefile @@ -3,11 +3,14 @@ CC = gcc CC_OPTS = -Wall -O2 -I /usr/include/libxml2 -I /usr/include/libxml2/libxml -I ../../include -LD_OPTS = -lxml2 +LD_OPTS = -L ../../lib -lxml2 -lopensrf_stack -ljson LP=../libtransport LIB_SOURCES = $(LP)/generic_utils.c $(LP)/transport_socket.c $(LP)/transport_session.c $(LP)/transport_message.c $(LP)/transport_client.c $(LP)/sha.c -all: router router_query +all: router router_query router_register + +router_register: router_register.c + $(CC) $(CC_OPTS) -L ../../lib -lopensrf_transport -lxml2 router_register.c -o router_register router_query: router_query.c $(CC) $(CC_OPTS) -L ../../lib -lopensrf_transport -lxml2 router_query.c -o router_query diff --git a/src/router/router.c b/src/router/router.c index cfb0dfa..aeca719 100644 --- a/src/router/router.c +++ b/src/router/router.c @@ -2,12 +2,12 @@ #include #include - +#define ROUTER_MAX_MSGS_PER_PACKET 12 char* router_resource; transport_router_registrar* routt; - void _build_trusted_sites( transport_router_registrar* router ); + void sig_hup_handler( int a ) { router_registrar_free( routt ); config_reader_free(); @@ -330,6 +330,10 @@ int router_registrar_handle_msg( transport_router_registrar* router_registrar, t if( router_registrar == NULL || msg == NULL ) { return 0; } + if( msg->router_command == NULL || !strcmp(msg->router_command,"") ) { + return router_registrar_handle_app_request( router_registrar, msg ); + } + // user issued a ruoter query /* a query command has router_command="query" and the actual query type is the content of the message */ @@ -834,6 +838,114 @@ int router_return_server_info( } +int router_registrar_handle_app_request( + transport_router_registrar* router, transport_message* msg ) { + + osrf_message* arr[ROUTER_MAX_MSGS_PER_PACKET]; + memset(arr, 0, ROUTER_MAX_MSGS_PER_PACKET ); + int num_msgs = osrf_message_from_xml( msg->body, arr ); + + int i; + for( i = 0; i != num_msgs; i++ ) { + + osrf_message* omsg = arr[i]; + osrf_message* success; + + char* newxml = osrf_message_to_xml(omsg); + debug_handler( "Received potential app request from client:\n%s\n", newxml ); + free(newxml); + + if(omsg->m_type == CONNECT) { + + success = osrf_message_init( + STATUS, omsg->thread_trace, omsg->protocol ); + + osrf_message_set_status_info( + success, "oilsConnectStatus", "Connection Successful", OSRF_STATUS_OK ); + + } else if( omsg->m_type == REQUEST ) { + + success = router_registrar_process_app_request( router, omsg ); + + } else if(omsg->m_type == DISCONNECT) { + success = NULL; + + } else { + + success = osrf_message_init( + STATUS, omsg->thread_trace, omsg->protocol ); + osrf_message_set_status_info( + success, "oilsMethodException", "Method Not Found", OSRF_STATUS_NOTFOUND ); + } + + + /* now send our new message back */ + if(success) { + + char* xml = osrf_message_to_xml(success); + debug_handler( "Sending XML to client app request:\n%s\n", xml ); + transport_message* return_m = message_init( + xml, "", msg->thread, msg->sender, "" ); + + + client_send_message(router->jabber->t_client, return_m); + + free(xml); + osrf_message_free(success); + } + + } + + return 1; + +} + + + +osrf_message* router_registrar_process_app_request( + transport_router_registrar* router, osrf_message* omsg ) { + + + if( omsg->method_name == NULL ) + return NULL; + + json* result_content = NULL; + + debug_handler( "Received method from client: %s", omsg->method_name ); + + if(!strcmp(omsg->method_name,"router.info.class.list")) { + + debug_handler("Processing router.info.class.list request"); + + result_content = json_object_new_array(); + server_class_node* cur_class = router->server_class_list; + while( cur_class != NULL ) { + + debug_handler("Adding %s to request list", cur_class->server_class); + + json_object_array_add( + result_content, json_object_new_string(cur_class->server_class)); + cur_class = cur_class->next; + } + if( json_object_array_length(result_content) < 1 ) + result_content = NULL; + + } + + if( result_content == NULL ) + return NULL; + + osrf_message* success = osrf_message_init( + RESULT, omsg->thread_trace, omsg->protocol ); + osrf_message_set_result_content( success, result_content ); + json_object_put(result_content); + + return success; +} + + + + int router_registrar_free( transport_router_registrar* router_registrar ) { if( router_registrar == NULL ) return 0; jabber_connect_free( router_registrar->jabber ); diff --git a/src/router/router.h b/src/router/router.h index 2ab1aa7..fed3fd2 100644 --- a/src/router/router.h +++ b/src/router/router.h @@ -1,5 +1,7 @@ #include "opensrf/transport_client.h" #include "opensrf/transport_message.h" +#include "opensrf/osrf_message.h" + #include #include @@ -93,6 +95,7 @@ struct server_class_node_struct { }; typedef struct server_class_node_struct server_class_node; + // ---------------------------------------------------------------------- // Top level router_registrar object. Maintains the list of // server_class_nodes and the top level router jabber connection. @@ -121,8 +124,6 @@ struct transport_router_registrar_struct { typedef struct transport_router_registrar_struct transport_router_registrar; - - // ---------------------------------------------------------------------- // Returns an allocated transport_router_registrar. The user is responsible for // freeing the allocated memory with router_registrar_free() @@ -235,6 +236,15 @@ void listen_loop( transport_router_registrar* router ); int router_return_server_info( transport_router_registrar* router, transport_message* msg ); int remove_server_class( transport_router_registrar* router, server_class_node* class ); + + + +int router_registrar_handle_app_request( transport_router_registrar*, transport_message* msg ); + +osrf_message* router_registrar_process_app_request( + transport_router_registrar* , osrf_message* omsg ); + + // ---------------------------------------------------------------------- // Adds a handler for the SIGUSR1 that we send to wake all the // listening threads.