From: Bill Erickson Date: Mon, 11 Jun 2018 22:27:22 +0000 (-0400) Subject: Websocketd more comments; remove some reduntant code X-Git-Url: https://old-git.evergreen-ils.org/?a=commitdiff_plain;h=4992307937477bfff22f31e9b027346bd0d55ec3;p=working%2FOpenSRF.git Websocketd more comments; remove some reduntant code Signed-off-by: Bill Erickson --- diff --git a/src/websocket-stdio/osrf-websocket-stdio.c b/src/websocket-stdio/osrf-websocket-stdio.c index 4474c96..662285a 100644 --- a/src/websocket-stdio/osrf-websocket-stdio.c +++ b/src/websocket-stdio/osrf-websocket-stdio.c @@ -1,14 +1,14 @@ /* -------------------------------------------------------------------- * Copyright (C) 2018 King County Library Service * Bill Erickson - * + * * Code borrows heavily from osrf_websocket_translator.c - * + * * This program is free software; you can redistribute it and/or * modify it under the terms of the GNU General Public License * as published by the Free Software Foundation; either version 2 * of the License, or (at your option) any later version. - * + * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the @@ -49,7 +49,7 @@ // practice, this number will be very small, rarely reaching double // digits. This is just a security back-stop. A client trying to open // this many connections is almost certainly attempting to DOS the -// gateway / server. +// gateway / server. #define MAX_ACTIVE_STATEFUL_SESSIONS 64 // Message exceeding this size are discarded. @@ -57,23 +57,28 @@ // ~10M #define MAX_MESSAGE_SIZE 10485760 -// After processing any message this size or larger, free and +// After processing any message this size or larger, free and // recreate the stdin buffer to release the memory. // ~100k #define RESET_MESSAGE_SIZE 102400 // default values, replaced during setup (below) as needed. -static char* config_file = "/openils/conf/opensrf_core.xml"; +static char* config_file = "/openils/conf/opensrf_core.xml"; static char* config_ctxt = "gateway"; static char* osrf_router = NULL; static char* osrf_domain = NULL; // Cache of opensrf thread strings and back-end receipients. -// Tracking this here means the caller only needs to track/know the thread. +// Tracking this here means the caller only needs to track the thread. +// It also means we don't have to expose internal XMPP IDs static osrfHash* stateful_session_cache = NULL; +// Message on STDIN go into our reusable buffer static growing_buffer* stdin_buf = NULL; +// OpenSRF XMPP connection handle static transport_client* osrf_handle = NULL; -static char recipient_buf[RECIP_BUF_SIZE]; // reusable recipient buffer +// Reusable string buf for recipient addresses +static char recipient_buf[RECIP_BUF_SIZE]; +// Websocket client IP address (for logging) static char* client_ip = NULL; static void rebuild_stdin_buffer(); @@ -87,29 +92,28 @@ static void read_one_osrf_message(transport_message*); static int shut_it_down(int); static void release_hash_string(char*, void*); -static void sigint_handler(int sig) { +// Websocketd sends SIGINT for shutdown, followed by SIGTERM +// if SIGINT takes too long. +static void sigint_handler(int sig) { osrfLogInfo(OSRF_LOG_MARK, "WS received SIGINT - graceful shutdown"); shut_it_down(0); } int main(int argc, char* argv[]) { - // Handle shutdown signal + // Handle shutdown signal -- only needed once. signal(SIGINT, sigint_handler); - rebuild_stdin_buffer(); - - // Connect to OpenSRF - // exits on error. + // Connect to OpenSR -- exits on error child_init(argc, argv); // Disable output buffering. setbuf(stdout, NULL); + rebuild_stdin_buffer(); // The main loop waits for data to be available on both STDIN - // (inbound websocket data) and the OpenSRF XMPP socket (outbound - // replies). - + // (websocket client request) and the OpenSRF XMPP socket + // (replies returning to the websocket client). fd_set fds; int stdin_no = fileno(stdin); int osrf_no = osrf_handle->session->sock_id; @@ -122,25 +126,19 @@ int main(int argc, char* argv[]) { FD_SET(osrf_no, &fds); FD_SET(stdin_no, &fds); + // Wait indefinitely for activity to process sel_resp = select(maxfd + 1, &fds, NULL, NULL, NULL); - if (sel_resp < 0) { + if (sel_resp < 0) { // error if (errno == EINTR) { // Interrupted by a signal. Start the loop over. continue; } - if (errno == EBADF) { - osrfLogError(OSRF_LOG_MARK, - "WS exiting on bad file descriptor"); - - } else { - osrfLogError(OSRF_LOG_MARK, - "WS select() failed with [%s]. Exiting", strerror(errno)); - } + osrfLogError(OSRF_LOG_MARK, + "WS select() failed with [%s]. Exiting", strerror(errno)); - // No way to fix this one, get outta here. shut_it_down(1); } @@ -168,13 +166,14 @@ static void rebuild_stdin_buffer() { static int shut_it_down(int stat) { osrfHashFree(stateful_session_cache); buffer_free(stdin_buf); - osrf_system_shutdown(); + osrf_system_shutdown(); // clean XMPP disconnect exit(stat); return stat; } -// Connect to opensrf +// Connect to OpenSRF/XMPP +// Apply settings and command line args. static void child_init(int argc, char* argv[]) { if (argc > 1) { @@ -207,9 +206,9 @@ static void release_hash_string(char* key, void* str) { } -// Relay messages from STDIN to OpenSRF -// Reads one message then returns to allow for responses to intermingle -// with a long series of requests. +// Relay websocket client messages from STDIN to OpenSRF. Reads one +// message then returns, allowing responses to intermingle with long +// series of requests. static void read_from_stdin() { char char_buf[1]; char c; @@ -225,10 +224,15 @@ static void read_from_stdin() { if (errno == EAGAIN) { // read interrupted. Return to main loop to resume. + // Returning here will leave any in-progress message in + // the stdin_buf. We return to the main select loop + // to confirm we really have more data to read and to + // perform additional error checking on the stream. return; } - osrfLogError(OSRF_LOG_MARK, + // All other errors reading STDIN are considered fatal. + osrfLogError(OSRF_LOG_MARK, "WS STDIN read failed with [%s]. Exiting", strerror(errno)); shut_it_down(1); return; @@ -245,7 +249,7 @@ static void read_from_stdin() { if (c == '\n') { // end of current message if (stdin_buf->n_used >= MAX_MESSAGE_SIZE) { - osrfLogError(OSRF_LOG_MARK, + osrfLogError(OSRF_LOG_MARK, "WS message exceeded MAX_MESSAGE_SIZE, discarding"); rebuild_stdin_buffer(); return; @@ -271,8 +275,9 @@ static void read_from_stdin() { } else { if (stdin_buf->n_used >= MAX_MESSAGE_SIZE) { - // Message exceeds max message size. - // Continue reading and discarding data. + // Message exceeds max message size. Continue reading + // and discarding data. NOTE: don't reset stdin_buf + // here becase we check n_used again once reading is done. continue; } @@ -282,6 +287,7 @@ static void read_from_stdin() { } } +// Relays a single websocket request to the OpenSRF/XMPP network. static void relay_stdin_message(const char* msg_string) { jsonObject *msg_wrapper = NULL; // free me @@ -389,6 +395,8 @@ static void relay_stdin_message(const char* msg_string) { free(msg_body); } +// Turn the OpenSRF message JSON into a set of osrfMessage's for +// analysis, ingress application, and logging. static char* extract_inbound_messages( const char* service, const char* thread, const jsonObject *osrf_msg) { @@ -399,7 +407,7 @@ static char* extract_inbound_messages( // here we do an extra json round-trip to get the data // in a form osrf_message_deserialize can understand - // TODO: consider a version of osrf_message_init which can + // TODO: consider a version of osrf_message_init which can // accept a jsonObject* instead of a JSON string. char *osrf_msg_json = jsonObjectToJSON(osrf_msg); osrf_message_deserialize(osrf_msg_json, msg_list, num_msgs); @@ -435,12 +443,13 @@ static char* extract_inbound_messages( char* finalMsg = osrfMessageSerializeBatch(msg_list, num_msgs); // clean up our messages - for (i = 0; i < num_msgs; i++) + for (i = 0; i < num_msgs; i++) osrfMessageFree(msg_list[i]); return finalMsg; } +// All REQUESTs are logged as activity. static void log_request(const char* service, osrfMessage* msg) { const jsonObject* params = msg->_params; @@ -481,7 +490,7 @@ static void log_request(const char* service, osrfMessage* msg) { -// Relay messages from OpenSRF to STDIN +// Relay response messages from OpenSRF to STDIN // Relays all available messages static void read_from_osrf() { transport_message* tmsg = NULL; @@ -489,23 +498,23 @@ static void read_from_osrf() { // Double check the socket connection before continuing. if (!client_connected(osrf_handle) || !socket_connected(osrf_handle->session->sock_id)) { - osrfLogWarning(OSRF_LOG_MARK, + osrfLogWarning(OSRF_LOG_MARK, "WS: Jabber socket disconnected, exiting"); shut_it_down(1); } - // Once client_recv is called all data waiting on the socket is read. - // This means we can't return to the main select() loop after each message, - // because any subsequent messages will get stuck in the opensrf receive - // queue. Process all available messages. + // Once client_recv is called all data waiting on the socket is + // read. This means we can't return to the main select() loop after + // each message, because any subsequent messages will get stuck in + // the opensrf receive queue. Process all available messages. while ( (tmsg = client_recv(osrf_handle, 0)) ) { if (tmsg->is_error) { - // tmsg here is bounced message, likely the result of a XMPP - // cancel code 503 (forbidden) from an attempt to send a - // message to a service that's not available on the public - // XMPP domain. Recovery not possible. - osrfLogError(OSRF_LOG_MARK, + // tmsg here is likely a bounced message, possibly the result + // of a XMPP cancel code 503 (forbidden) from an attempt to + // send a message to a service that's not available on the + // public XMPP domain. Treat as irrecoverable. + osrfLogError(OSRF_LOG_MARK, "WS XMPP error [%d] occured, exiting", tmsg->error_code); shut_it_down(1); return; @@ -516,6 +525,8 @@ static void read_from_osrf() { } } +// Process a single OpenSRF response message and print the reponse +// to STDOUT for delivery to the websocket client. static void read_one_osrf_message(transport_message* tmsg) { osrfList *msg_list = NULL; osrfMessage *one_msg = NULL; @@ -542,7 +553,7 @@ static void read_one_osrf_message(transport_message* tmsg) { if (!osrfHashGet(stateful_session_cache, tmsg->thread)) { - unsigned long ses_size = + unsigned long ses_size = osrfHashGetCount(stateful_session_cache); if (ses_size < MAX_ACTIVE_STATEFUL_SESSIONS) { @@ -579,23 +590,15 @@ static void read_one_osrf_message(transport_message* tmsg) { // the individual osrfMessage's will be freed along with it osrfListFree(msg_list); - // relay the response messages to the client + // Pack the response into a websocket wrapper message. jsonObject *msg_wrapper = NULL; char *msg_string = NULL; - - // build the wrapper object msg_wrapper = jsonNewObject(NULL); + jsonObjectSetKey(msg_wrapper, "thread", jsonNewObject(tmsg->thread)); jsonObjectSetKey(msg_wrapper, "log_xid", jsonNewObject(tmsg->osrf_xid)); jsonObjectSetKey(msg_wrapper, "osrf_msg", jsonParseRaw(tmsg->body)); - if (tmsg->is_error) { - osrfLogError(OSRF_LOG_MARK, - "WS received jabber error message in response to thread=%s", - tmsg->thread); - jsonObjectSetKey(msg_wrapper, "transport_error", jsonNewBoolObject(1)); - } - msg_string = jsonObjectToJSONRaw(msg_wrapper); // Send the JSON to STDOUT @@ -603,11 +606,6 @@ static void read_one_osrf_message(transport_message* tmsg) { free(msg_string); jsonObjectFree(msg_wrapper); - } - - - -