LP#1268619: websocket translator idle timeout / graceful shutdown
authorBill Erickson <berick@esilibrary.com>
Tue, 14 Jan 2014 21:22:23 +0000 (16:22 -0500)
committerGalen Charlton <gmc@esilibrary.com>
Tue, 19 Aug 2014 22:50:47 +0000 (15:50 -0700)
Added support for an idle timeout and idle check interval configuration
variables.  These allow each websocket apache process to kick off
clients that have been connected and are idle for too long, thus hogging
a process unnecessarily.

Added a SIGUSR1 signal handler which forces the idle timeout to be very
low and a short re-check period so that the client can be kicked as soon
as there are no open conversations.

Signed-off-by: Bill Erickson <berick@esilibrary.com>
Signed-off-by: Galen Charlton <gmc@esilibrary.com>
src/gateway/osrf_websocket_translator.c

index b430871..e2cfc98 100644 (file)
  *   "osrf_msg" : [<osrf_msg>, <osrf_msg>, ...]   // required
  * }
  *
- * Each translator operates with two threads.  One thread receives messages
+ * Each translator operates with three threads.  One thread receives messages
  * from the websocket client, translates, and relays them to the opensrf 
  * network. The second thread collects responses from the opensrf network and 
- * relays them back to the websocket client.
+ * relays them back to the websocket client.  The third thread inspects
+ * the idle timeout interval t see if it's time to drop the idle client.
  *
- * After the initial setup, all thread actions occur within a thread mutex.
- * The desired affect is a non-threaded application that uses threads for 
- * the sole purpose of having one thread listening for incoming data, while
- * a second thread listens for responses.  When either thread awakens, it's
- * the only thread in town until it goes back to sleep (i.e. listening on 
+ * After the initial setup, all thread actions occur within a thread
+ * mutex.  The desired affect is a non-threaded application that uses
+ * threads for the sole purpose of having one thread listening for
+ * incoming data, while a second thread listens for responses, and a
+ * third checks the idle timeout.  When any thread awakens, it's the
+ * only thread in town until it goes back to sleep (i.e. listening on
  * its socket for data).
  *
- * Note that with a "thread", which allows us to identify the opensrf session,
- * the caller does not need to provide a recipient address.  The "service" is
- * only required to start a new opensrf session.  After the sesession is 
- * started, all future communication is based solely on the thread.  However,
- * the "service" should be passed by the caller for all requests to ensure it
- * is properly logged in the activity log.
+ * Note that with the opensrf "thread", which allows us to identify the
+ * opensrf session, the caller does not need to provide a recipient
+ * address.  The "service" is only required to start a new opensrf
+ * session.  After the sesession is started, all future communication is
+ * based solely on the thread.  However, the "service" should be passed
+ * by the caller for all requests to ensure it is properly logged in the
+ * activity log.
+ *
+ * Every inbound and outbound message updates the last_activity_time.
+ * A separate thread wakes periodically to see if the time since the
+ * last_activity_time exceeds the configured idle_timeout_interval.  If
+ * so, a disconnect is sent to the client, completing the conversation.
+ *
+ * Configuration goes directly into the Apache envvars file.  
+ * (e.g. /etc/apache2-websockets/envvars).  As of today, it's not 
+ * possible to leverage Apache configuration directives directly,
+ * since this is not an Apache module, but a shared library loaded
+ * by an apache module.  This includes SetEnv / SetEnvIf.
+ *
+ * export OSRF_WEBSOCKET_IDLE_TIMEOUT=300
+ * export OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL=5
+ * export OSRF_WEBSOCKET_CONFIG_FILE=/openils/conf/opensrf_core.xml
+ * export OSRF_WEBSOCKET_CONFIG_CTXT=gateway
  */
 
 /**
  * down for graceful disconnects.
  */
 
+#include <stdlib.h>
+#include <signal.h>
+#include <unistd.h>
 #include "httpd.h"
+#include "http_log.h"
 #include "apr_strings.h"
 #include "apr_thread_proc.h"
 #include "apr_hash.h"
 #define RECIP_BUF_SIZE 128
 #define WEBSOCKET_TRANSLATOR_INGRESS "ws-translator-v1"
 
+
+// default values, replaced during setup (below) as needed.
+static char* config_file = "/openils/conf/opensrf_core.xml";
+static char* config_ctxt = "gateway";
+static time_t idle_timeout_interval = 300; 
+static time_t idle_check_interval = 5;
+static time_t last_activity_time = 0;
+
+// true if we've received a signal to start graceful shutdown
+static int shutdown_requested = 0; 
+static void sigusr1_handler(int sig);
+static void sigusr1_handler(int sig) {                                       
+    shutdown_requested = 1; 
+    signal(SIGUSR1, sigusr1_handler);
+    osrfLogInfo(OSRF_LOG_MARK, "WS received SIGUSR1 - Graceful Shutdown");
+}
+
 typedef struct _osrfWebsocketTranslator {
 
     /** Our handle for communicating with the caller */
@@ -106,6 +146,14 @@ typedef struct _osrfWebsocketTranslator {
     apr_thread_t *responder_thread;
 
     /**
+     * Thread responsible for checking inactivity timeout.
+     * If no activitity occurs within the configured interval,
+     * a disconnect is sent to the client and the connection
+     * is terminated.
+     */
+    apr_thread_t *idle_timeout_thread;
+
+    /**
      * All message handling code is wrapped in a thread mutex such
      * that all actions (after the initial setup) are serialized
      * to minimize the possibility of multi-threading snafus.
@@ -160,7 +208,6 @@ static void clear_cached_recipient(const char* thread) {
     }
 }
 
-
 void* osrf_responder_thread_main_body(transport_message *tmsg) {
 
     osrfList *msg_list = NULL;
@@ -251,7 +298,116 @@ void* osrf_responder_thread_main_body(transport_message *tmsg) {
 
     free(msg_string);
     jsonObjectFree(msg_wrapper);
+}
+
+/**
+ * Sleep and regularly wake to see if the process has been idle for too
+ * long.  If so, send a disconnect to the client.
+ *
+ */
+void* APR_THREAD_FUNC osrf_idle_timeout_thread_main(
+        apr_thread_t *thread, void *data) {
+
+    // sleep time defaults to the check interval, but may 
+    // be shortened during shutdown.
+    int sleep_time = idle_check_interval;
+    int shutdown_loops = 0;
+
+    while (1) {
+
+        if (apr_thread_mutex_unlock(trans->mutex) != APR_SUCCESS) {
+            osrfLogError(OSRF_LOG_MARK, "WS error un-locking thread mutex");
+            return NULL;
+        }
+
+        // note: receiving a signal (e.g. SIGUSR1) will not interrupt
+        // this sleep(), since it's running within its own thread.
+        // During graceful shtudown, we may wait up to 
+        // idle_check_interval seconds before initiating shutdown.
+        sleep(sleep_time);
+        
+        if (apr_thread_mutex_lock(trans->mutex) != APR_SUCCESS) {
+            osrfLogError(OSRF_LOG_MARK, "WS error locking thread mutex");
+            return NULL;
+        }
+
+        // no client is connected.  reset sleep time go back to sleep.
+        if (!trans->client_connected) {
+            sleep_time = idle_check_interval;
+            continue;
+        }
+
+        // do we have any active conversations with the connected client?
+        int active_count = apr_hash_count(trans->session_cache);
+
+        if (active_count) {
+
+            if (shutdown_requested) {
+                // active conversations means we can't shut down.  
+                // shorten the check interval to re-check more often.
+                shutdown_loops++;
+                osrfLogDebug(OSRF_LOG_MARK, 
+                    "WS: %d active conversation(s) found in shutdown after "
+                    "%d attempts.  Sleeping...", shutdown_loops, active_count
+                );
+
+                if (shutdown_loops > 30) {
+                    // this is clearly a long-running conversation, let's
+                    // check less frequently to avoid excessive logging.
+                    sleep_time = 3;
+                } else {
+                    sleep_time = 1;
+                }
+            } 
+
+            // active conversations means keep going.  There's no point in
+            // checking the idle time (below) if we're mid-conversation
+            continue;
+        }
+
+        // no active conversations
+             
+        if (shutdown_requested) {
+            // there's no need to reset the shutdown vars (loops/requested)
+            // SIGUSR1 is Apaches reload signal, which means this process
+            // will be going away as soon as the client is disconnected.
+
+            osrfLogInfo(OSRF_LOG_MARK,
+                "WS: no active conversations remain in shutdown; "
+                    "closing client connection");
+
+        } else { 
+            // see how long we've been idle.  If too long, kick the client
+
+            time_t now = time(NULL);
+            time_t difference = now - last_activity_time;
+
+            osrfLogDebug(OSRF_LOG_MARK, 
+                "WS has been idle for %d seconds", difference);
+
+            if (difference < idle_timeout_interval) {
+                // Last activity occurred within the idle timeout interval.
+                continue;
+            }
+
+            // idle timeout exceeded
+            osrfLogDebug(OSRF_LOG_MARK, 
+                "WS: idle timeout exceeded.  now=%d / last=%d; " 
+                "closing client connection", now, last_activity_time);
+        }
+
+
+        // send a disconnect to the client, which will come back around
+        // to cause our on_disconnect_handler to run.
+        osrfLogDebug(OSRF_LOG_MARK, "WS: sending close() to client");
+        trans->server->close(trans->server);
+
+        // client will be going away, reset sleep time
+        sleep_time = idle_check_interval;
+    }
 
+    // should never get here
+    return NULL;
 }
 
 /**
@@ -282,6 +438,7 @@ void* APR_THREAD_FUNC osrf_responder_thread_main(apr_thread_t *thread, void *dat
         osrfLogForceXid(tmsg->osrf_xid);
         osrf_responder_thread_main_body(tmsg);
         message_free(tmsg);                                                         
+        last_activity_time = time(NULL);
     }
 
     return NULL;
@@ -300,17 +457,64 @@ int child_init(const WebSocketServer *server) {
     apr_threadattr_t *thread_attr = NULL;
     apr_thread_mutex_t *mutex = NULL;
     request_rec *r = server->request(server);
-        
-    osrfLogDebug(OSRF_LOG_MARK, "WS child_init");
+
 
     // osrf_handle will already be connected if this is not the first request
     // served by this process.
     if ( !(osrf_handle = osrfSystemGetTransportClient()) ) {
-        char* config_file = "/openils/conf/opensrf_core.xml";
-        char* config_ctx = "gateway"; //TODO config
-        if (!osrfSystemBootstrapClientResc(config_file, config_ctx, "websocket")) {   
+        
+        // load config values from the env
+        char* timeout = getenv("OSRF_WEBSOCKET_IDLE_TIMEOUT");
+        if (timeout) {
+            if (!atoi(timeout)) {
+                ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
+                    "WS: invalid OSRF_WEBSOCKET_IDLE_TIMEOUT: %s", timeout);
+            } else {
+                idle_timeout_interval = (time_t) atoi(timeout);
+            }
+        }
+
+        ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
+            "WS: timeout set to %d", idle_timeout_interval);
+
+        char* interval = getenv("OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL");
+        if (interval) {
+            if (!atoi(interval)) {
+                ap_log_rerror(APLOG_MARK, APLOG_ERR, 0, r, 
+                    "WS: invalid OSRF_WEBSOCKET_IDLE_CHECK_INTERVAL: %s", 
+                    interval
+                );
+            } else {
+                idle_check_interval = (time_t) atoi(interval);
+            }
+        } 
+
+        ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
+            "WS: idle check interval set to %d", idle_check_interval);
+
+      
+        char* cfile = getenv("OSRF_WEBSOCKET_CONFIG_FILE");
+        if (cfile) {
+            ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r,
+                "WS: config file set to %s", cfile);
+            config_file = cfile;
+        }
+
+        char* ctxt = getenv("OSRF_WEBSOCKET_CONFIG_CTXT");
+        if (ctxt) {
+            ap_log_rerror(APLOG_MARK, APLOG_DEBUG, 0, r, 
+                "WS: config context set to %s", ctxt);
+            config_ctxt = ctxt;
+        }
+
+        // connect to opensrf
+        if (!osrfSystemBootstrapClientResc(
+                config_file, config_ctxt, "websocket")) {   
+
             osrfLogError(OSRF_LOG_MARK, 
-                "WS unable to bootstrap OpenSRF client with config %s", config_file); 
+                "WS unable to bootstrap OpenSRF client with config %s "
+                "and context %s", config_file, config_ctxt
+            ); 
             return 1;
         }
 
@@ -323,7 +527,6 @@ int child_init(const WebSocketServer *server) {
         return 1;
     }
 
-
     // allocate our static translator instance
     trans = (osrfWebsocketTranslator*) 
         apr_palloc(pool, sizeof(osrfWebsocketTranslator));
@@ -359,6 +562,24 @@ int child_init(const WebSocketServer *server) {
         return 1;
     }
 
+    // Create the idle timeout thread, which lives for the lifetime
+    // of the process.
+    thread = NULL; // reset
+    thread_attr = NULL; // reset
+    if ( (apr_threadattr_create(&thread_attr, trans->main_pool) == APR_SUCCESS) &&
+         (apr_threadattr_detach_set(thread_attr, 0) == APR_SUCCESS) &&
+         (apr_thread_create(&thread, thread_attr, 
+            osrf_idle_timeout_thread_main, trans, trans->main_pool) == APR_SUCCESS)) {
+
+        osrfLogDebug(OSRF_LOG_MARK, "WS created idle timeout thread");
+        trans->idle_timeout_thread = thread;
+        
+    } else {
+        osrfLogError(OSRF_LOG_MARK, "WS unable to create idle timeout thread");
+        return 1;
+    }
+
+
     if (apr_thread_mutex_create(
             &mutex, APR_THREAD_MUTEX_UNNESTED, 
             trans->main_pool) != APR_SUCCESS) {
@@ -368,6 +589,8 @@ int child_init(const WebSocketServer *server) {
 
     trans->mutex = mutex;
 
+    signal(SIGUSR1, sigusr1_handler);
+
     return APR_SUCCESS;
 }
 
@@ -377,10 +600,16 @@ int child_init(const WebSocketServer *server) {
 void* CALLBACK on_connect_handler(const WebSocketServer *server) {
     request_rec *r = server->request(server);
     apr_pool_t *pool;
+    apr_thread_t *thread = NULL;
+    apr_threadattr_t *thread_attr = NULL;
 
-    osrfLogInfo(OSRF_LOG_MARK, 
-        "WS connect from %s", r->connection->remote_ip); 
-        //"WS connect from %s", r->connection->client_ip); // apache 2.4
+#ifdef APACHE_MIN_24
+    char* client_ip = r->connection->client_ip;
+#else
+    char* client_ip = r->connection->remote_ip;
+#endif
+
+    osrfLogInfo(OSRF_LOG_MARK, "WS connect from %s", client_ip);
 
     if (!trans) {
         // first connection
@@ -399,6 +628,8 @@ void* CALLBACK on_connect_handler(const WebSocketServer *server) {
 
     trans->session_pool = pool;
     trans->client_connected = 1;
+    last_activity_time = time(NULL);
+
     return trans;
 }
 
@@ -600,6 +831,8 @@ static size_t on_message_handler_body(void *data,
     jsonObjectFree(msg_wrapper);
     free(msg_body);
 
+    last_activity_time = time(NULL);
+
     return OK;
 }
 
@@ -631,6 +864,10 @@ void CALLBACK on_disconnect_handler(
 
     osrfWebsocketTranslator *trans = (osrfWebsocketTranslator*) data;
     trans->client_connected = 0;
+
+    // timeout thread is recreated w/ each new connection
+    apr_thread_exit(trans->idle_timeout_thread, APR_SUCCESS);
+    trans->idle_timeout_thread = NULL;
     
     // ensure no errant session data is sticking around
     apr_hash_clear(trans->session_cache);
@@ -655,7 +892,8 @@ void CALLBACK on_destroy_handler(WebSocketPlugin *plugin) {
     if (trans) {
         apr_thread_exit(trans->responder_thread, APR_SUCCESS);
         apr_thread_mutex_destroy(trans->mutex);
-        apr_pool_destroy(trans->session_pool);
+        if (trans->session_pool)
+            apr_pool_destroy(trans->session_pool);
         apr_pool_destroy(trans->main_pool);
     }