}
void osrfChatCleanupClients( osrfChatServer* server ) {
- if(server) {
- osrfListFree(server->deadNodes);
- server->deadNodes = osrfNewList();
- }
+ if(!server) return;
+ osrfListFree(server->deadNodes);
+ server->deadNodes = osrfNewList();
}
int osrfChatSendRaw( osrfChatNode* node, char* msgXML ) {
if(!(node && msgXML)) return -1;
- return socket_send( node->sockid, msgXML );
+ /* wait at most 3 second for this client to take our data */
+ return socket_send_timeout( node->sockid, msgXML, 3000000 );
}
-
-
-
void osrfChatNodeFinish( osrfChatServer* server, osrfChatNode* node ) {
if(!(server && node)) return;
osrfChatSendRaw( node, "</stream:stream>");
osrfLogInfo( OSRF_LOG_MARK, "Sending message on local connection\nfrom: %s\nto: %s", fromAddr, toAddr );
osrfChatNode* tonode = osrfHashGet(cs->nodeHash, toAddr);
if(tonode) {
- osrfChatSendRaw( tonode, msgXML );
+
+ /* if we can't send to the recipient (recipient is gone or too busy,
+ * we drop the recipient and inform the sender that the recipient
+ * is no more */
+ if( osrfChatSendRaw( tonode, msgXML ) < 0 ) {
+
+ osrfChatRemoveNode( cs, tonode );
+ char* xml = va_list_to_string( OSRF_CHAT_NO_RECIPIENT, toAddr, fromAddr );
+
+ osrfLogError( OSRF_LOG_MARK, "Node failed to function. "
+ "Responding to caller with error: %s", toAddr);
+
+
+ if( osrfChatSendRaw( node, xml ) < 0 ) {
+ osrfLogError(OSRF_LOG_MARK, "Sending node is now gone..removing");
+ osrfChatRemoveNode( cs, node );
+ }
+ free(xml);
+ }
} else {
/* send an error message saying we don't have this connection */
osrfLogInfo( OSRF_LOG_MARK, "We have no connection for %s", toAddr);
char* xml = va_list_to_string( OSRF_CHAT_NO_RECIPIENT, toAddr, fromAddr );
- osrfChatSendRaw( node, xml );
+ if( osrfChatSendRaw( node, xml ) < 0 )
+ osrfChatRemoveNode( cs, node );
free(xml);
}
if(tonode) {
if( tonode->state == OSRF_CHAT_STATE_CONNECTED ) {
osrfLogDebug( OSRF_LOG_MARK, "Routing message to server %s", dombuf);
- osrfChatSendRaw( tonode, msgXML );
+
+ if( osrfChatSendRaw( tonode, msgXML ) < 0 ) {
+ osrfLogError( OSRF_LOG_MARK, "Node failed to function: %s", toAddr);
+ char* xml = va_list_to_string( OSRF_CHAT_NO_RECIPIENT, toAddr, fromAddr );
+ if( osrfChatSendRaw( node, xml ) < 0 )
+ osrfChatRemoveNode( cs, node );
+ free(xml);
+ osrfChatRemoveNode( cs, tonode );
+ }
} else {
osrfLogInfo( OSRF_LOG_MARK, "Received s2s message and we're still trying to connect...caching");
char* username;
char* authkey; /* when doing any auth negotiation, this is the auth seed hash */
-
osrfList* msgs; /* if we're a server node we may have a pool of messages waiting to be delivered */
xmlParserCtxtPtr parserCtx;
};
typedef struct __osrfChatNodeStruct osrfChatNode;
+/*
struct __osrfChatS2SMessageStruct {
char* toAddr;
char* msgXML;
};
typedef struct __osrfChatS2SMessageStruct osrfChatS2SMessage;
+*/
struct __osrfChatServerStruct {
osrfHash* nodeHash; /* sometimes we need hash (remote id) lookup, sometimes we need socket id lookup */
/* sends the given data to the given socket */
int socket_send(int sock_fd, const char* data) {
- osrfLogInternal( OSRF_LOG_MARK, "socket_bundle sending to %d data %s",
- sock_fd, data);
+ return _socket_send( sock_fd, data, 0);
+}
+
+
+int _socket_send(int sock_fd, const char* data, int flags) {
signal(SIGPIPE, SIG_IGN); /* in case a unix socket was closed */
- if( send( sock_fd, data, strlen(data), 0 ) < 0 ) {
- osrfLogWarning( OSRF_LOG_MARK, "tcp_server_send(): Error sending data" );
+
+ size_t r = send( sock_fd, data, strlen(data), flags );
+
+ if( r == -1 ) {
+ osrfLogWarning( OSRF_LOG_MARK, "tcp_server_send(): Error sending data with return %d", r );
+ osrfLogWarning( OSRF_LOG_MARK, "Last Sys Error: %s", strerror(errno));
return -1;
}
return 0;
}
-/* disconnects the node with the given sock_fd and removes
- it from the socket set */
-void socket_disconnect(socket_manager* mgr, int sock_fd) {
- osrfLogDebug( OSRF_LOG_MARK, "Closing socket %d", sock_fd);
+int socket_send_nowait( int sock_fd, const char* data) {
+ return _socket_send( sock_fd, data, MSG_DONTWAIT);
+}
- if( close( sock_fd ) == -1 )
- osrfLogWarning( OSRF_LOG_MARK, "socket_disconnect(): Error closing socket, removing anyway" );
- if(mgr != NULL)
- socket_remove_node(mgr, sock_fd);
-
+/*
+ * Waits at most usecs microseconds for the send buffer of the given
+ * socket to accept new data. This does not guarantee that the
+ * socket will accept all the data we want to give it.
+ */
+int socket_send_timeout( int sock_fd, const char* data, int usecs ) {
+
+ fd_set write_set;
+ FD_ZERO( &write_set );
+ FD_SET( sock_fd, &write_set );
+
+ int mil = 1000000;
+ int secs = (int) usecs / mil;
+ usecs = usecs - (secs * mil);
+
+ struct timeval tv;
+ tv.tv_sec = secs;
+ tv.tv_usec = usecs;
+
+ osrfLogInfo(OSRF_LOG_MARK, "Socket waiting on select before send");
+ int ret = select( sock_fd + 1, NULL, &write_set, NULL, &tv);
+ osrfLogInfo(OSRF_LOG_MARK, "Socket done waiting");
+
+ if( ret > 0 ) return _socket_send( sock_fd, data, 0);
+
+ osrfLogError(OSRF_LOG_MARK, "socket_send_timeout(): "
+ "timed out on send for socket %d after %d secs, %d usecs", sock_fd, secs, usecs );
+
+ return -1;
+}
+
+
+/* disconnects the node with the given sock_fd and removes
+ it from the socket set */
+void socket_disconnect(socket_manager* mgr, int sock_fd) {
+ osrfLogInternal( OSRF_LOG_MARK, "Closing socket %d", sock_fd);
+ close( sock_fd );
+ socket_remove_node(mgr, sock_fd);
}
/* sends the given data to the given socket. returns 0 on success, -1 otherwise */
int socket_send(int sock_fd, const char* data);
+/* utility method */
+int _socket_send(int sock_fd, const char* data, int flags);
+
+
+/* sends the given data to the given socket.
+ * sets the send flag MSG_DONTWAIT which will allow the
+ * process to continue even if the socket buffer is full
+ * returns 0 on success, -1 otherwise */
+int socket_send_nowait( int sock_fd, const char* data);
+
+/* waits at most usecs microseconds for the socket buffer to
+ * be available */
+int socket_send_timeout( int sock_fd, const char* data, int usecs );
+
/* disconnects the node with the given sock_fd and removes
it from the socket set */
void socket_disconnect(socket_manager*, int sock_fd);
#include "utils.h"
#include <errno.h>
+
inline void* safe_malloc( int size ) {
void* ptr = (void*) malloc( size );
if( ptr == NULL ) {