From 96afe187d28ea59738f85e91cfd4c3bf9556627f Mon Sep 17 00:00:00 2001 From: Bill Erickson Date: Sun, 10 Jun 2018 15:43:12 -0400 Subject: [PATCH] Websocket stdio / websocketd experiment Signed-off-by: Bill Erickson --- src/websocket-stdio/osrf-websocket-stdio.c | 99 ++++++++++++++++++++---------- 1 file changed, 68 insertions(+), 31 deletions(-) diff --git a/src/websocket-stdio/osrf-websocket-stdio.c b/src/websocket-stdio/osrf-websocket-stdio.c index f4879bd..c02e6ff 100644 --- a/src/websocket-stdio/osrf-websocket-stdio.c +++ b/src/websocket-stdio/osrf-websocket-stdio.c @@ -18,6 +18,7 @@ GNU General Public License for more details. #include #include #include +#include #include #include #include @@ -78,9 +79,19 @@ static void read_from_stdin(); static void relay_stdin_message(const char*); static char* extract_inbound_messages(); static void read_from_osrf(); +static void read_one_osrf_message(transport_message*); +static int shut_it_down(int); + +static void sigint_handler(int sig) { + osrfLogInfo(OSRF_LOG_MARK, "WS received SIGINT - graceful shutdown"); + shut_it_down(0); +} int main() { + // Handle shutdown signal + signal(SIGINT, sigint_handler); + // stdin buffer is cleared with each read and reused. stdin_buf = buffer_init(512); @@ -122,26 +133,33 @@ int main() { } else { osrfLogError(OSRF_LOG_MARK, - "WS exiting on irrecoverable select() error"); + "WS select() failed with [%s]. Exiting", strerror(errno)); } - // No way to fix this one, exit program. - exit(1); + // No way to fix this one, get outta here. + shut_it_down(1); } if (FD_ISSET(stdin_no, &fds)) { + osrfLogDebug(OSRF_LOG_MARK, "STDIN active"); read_from_stdin(); } if (FD_ISSET(osrf_no, &fds)) { + osrfLogDebug(OSRF_LOG_MARK, "XMPP active"); read_from_osrf(); } } + return shut_it_down(0); +} + +static int shut_it_down(int stat) { osrfHashFree(stateful_session_cache); buffer_free(stdin_buf); osrf_system_shutdown(); - return 0; + exit(stat); + return stat; } @@ -150,7 +168,7 @@ static void child_init() { if (!osrf_system_bootstrap_client(config_file, config_ctxt) ) { fprintf(stderr, "Cannot boostrap OSRF\n"); - exit(1); + shut_it_down(1); } osrf_handle = osrfSystemGetTransportClient(); @@ -168,17 +186,36 @@ static void child_init() { // Relay messages from STDIN to OpenSRF -// Reads one message then returns +// Reads one message then returns to allow for response to intermingle +// with a long series of requests. static void read_from_stdin() { + char char_buf[1]; char c; - while ( (c = getchar()) ) { + while (1) { + int stat = read(fileno(stdin), char_buf, 1); + + if (stat < 0) { + + if (errno == EAGAIN) { + // read interrupted. Return to main loop to resume. + return; + } + + osrfLogError(OSRF_LOG_MARK, + "WS STDIN read failed with [%s]. Exiting", strerror(errno)); + shut_it_down(1); + return; + } - if (c == EOF) { + if (stat == 0) { // EOF osrfLogInfo(OSRF_LOG_MARK, "WS exiting on disconnect"); - exit(0); + shut_it_down(0); + return; } + c = char_buf[0]; + if (c == '\n') { // end of current message if (stdin_buf->n_used > 0) { @@ -288,7 +325,7 @@ static void relay_stdin_message(const char* msg_string) { if (client_send_message(osrf_handle, tmsg) != 0) { osrfLogError(OSRF_LOG_MARK, "WS failed sending data to OpenSRF, exiting"); - exit(1); + shut_it_down(1); } osrfLogClearXid(); @@ -394,32 +431,31 @@ static char* extract_inbound_messages( // Relay messages from OpenSRF to STDIN // Relays all available messages static void read_from_osrf() { + transport_message* tmsg = NULL; - osrfList *msg_list = NULL; - osrfMessage *one_msg = NULL; - char* recipToFree = NULL; - int i; - - // At this point we know the osrf socket has data, read - // up to one message with no waiting. - transport_message* tmsg = client_recv(osrf_handle, 0); - - if (!tmsg) { - // tmsg can only be NULL if the underlying select() call is - // interrupted or the jabber socket connection was severed. - - if (client_connected(osrf_handle) && - socket_connected(osrf_handle->session->sock_id)) { - return; // back to main select loop - } - - // Socket connection was broken. Send disconnect to client, - // causing on_disconnect_handler to run and cleanup. + // Double check the socket connection before continuing. + if (!client_connected(osrf_handle) || + !socket_connected(osrf_handle->session->sock_id)) { osrfLogWarning(OSRF_LOG_MARK, "WS: Jabber socket disconnected, exiting"); + shut_it_down(1); + } - exit(1); + // Once client_recv is called all data waiting on the socket is read. + // This means we can't return to the main select() loop after each message, + // because any subsequent messages will get stuck in the opensrf queue. + // Instead, process all available messages. + while ( (tmsg = client_recv(osrf_handle, 0)) ) { + read_one_osrf_message(tmsg); + message_free(tmsg); } +} + +static void read_one_osrf_message(transport_message* tmsg) { + osrfList *msg_list = NULL; + osrfMessage *one_msg = NULL; + char* recipToFree = NULL; + int i; osrfLogDebug(OSRF_LOG_MARK, "WS received opensrf response for thread=%s", tmsg->thread); @@ -510,6 +546,7 @@ static void read_from_osrf() { free(msg_string); jsonObjectFree(msg_wrapper); + } -- 2.11.0