LP#1339190 Add support for the "Multiplex" personality
authorMike Rylander <mrylander@gmail.com>
Wed, 11 Sep 2013 19:52:00 +0000 (15:52 -0400)
committerMike Rylander <mrylander@gmail.com>
Fri, 5 Dec 2014 17:17:54 +0000 (12:17 -0500)
We use Net::Server::Multiplex to reduce resource consumption.  In
this mode, SIPServer will maintain connection state in the main
listening process and fork workers as needed to handle individual
requests.

Initial implementation by Mike Rylander, with a conversion from
File::Queue to Memcache, and LOTS of debugging, by Bill Erickson.

Some highlights:

* A fork fence for max concurrent in-flight requests

* Allow the ILS to save state in IO::Multiplex mode

* Optimistic login using fork-and-check

When a SIP child process is spawned to handle a new connection login,
the pending login is tracked in the parent process (by PID) and the
child indicates to the parent that the login has succeeded by storing
login success/failure plus some state information in memcache.  Any time
the parent wakes up to process a message, it checks for completed logins
so they can be resolved as OK in the parent and the state information
is extracted and stored for future conversation with the resolved client.

* Let workers hang around for a bit for higher-rate clients

For some clients, such as AMH (sorters), the per-message connections
cause too much latency.  So, instead, we'll let their backends hang
around for a while.  This is controlled by a new attribute on the
server-params element, worker-keepalive, as a peer to the personality
value.  This value can also be set on the institution and login elements
to support finer-grained keepalive tuning.

This is measured in seconds and the default is 5. A value of 0 here
will disable this feature altogether.

Signed-off-by: Mike Rylander <mrylander@gmail.com>
Signed-off-by: Bill Erickson <berick@esilibrary.com>
Signed-off-by: Blake GH <blake@mobiusconsortium.org>
SIPServer.pm
SIPconfig.xml
Sip.pm
Sip/Configuration.pm
Sip/MsgType.pm

index 00dc93a..9e598b6 100755 (executable)
@@ -1,7 +1,10 @@
 #
 # Copyright (C) 2006-2008  Georgia Public Library Service
+# Copyright (C) 2013-2014  Equinox Software, Inc.
 # 
 # Author: David J. Fiander
+# Author: Mike Rylander
+# Author: Bill Erickson
 # 
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of version 2 of the GNU General Public
@@ -23,18 +26,24 @@ use strict;
 use warnings;
 use Exporter;
 use Sys::Syslog qw(syslog);
+use Net::Server::Multiplex;
 use Net::Server::PreFork;
 use Net::Server::Proto;
 use IO::Socket::INET;
+use IO::Pipe;
 use Socket qw(:crlf SOL_SOCKET SO_KEEPALIVE IPPROTO_TCP TCP_KEEPALIVE);
 use Data::Dumper;              # For debugging
 require UNIVERSAL::require;
+use POSIX qw/:sys_wait_h :errno_h/;
 
-#use Sip qw(readline);
+use Sip qw($protocol_version);
 use Sip::Constants qw(:all);
 use Sip::Configuration;
 use Sip::Checksum qw(checksum verify_cksum);
 use Sip::MsgType;
+use Time::HiRes qw/time/;
+
+use Cache::Memcached;
 
 use constant LOG_SIP => "local6"; # Local alias for the logging facility
 
@@ -82,19 +91,43 @@ push @parms,
     "syslog_facility=" . LOG_SIP;
 
 #
-# Server Management: set parameters for the Net::Server::PreFork
-# module.  The module silently ignores parameters that it doesn't
+# Server Management: set parameters for the Net::Server personality
+# chosen, defaulting to PreFork.
+#
+# The PreFork module silently ignores parameters that it doesn't
 # recognize, and complains about invalid values for parameters
 # that it does.
 #
+# The Fork module only cares about max_servers, for our purposes, which
+# defaults to 256.
+#
+# The Multiplex module ignores all runtime params, and triggers an
+# alternate implementation of the processing loop.  See the Net::Server
+# personality documentation for details. The max-concurrent parameter
+# can be used here to limit the number of concurrent in-flight requests
+# to avoid a fork-bomb DoS situation.  The default is 256.
+#
+my $worker_keepalive = 5;
+my $max_concurrent = 256;
 if (defined($config->{'server-params'})) {
     while (my ($key, $val) = each %{$config->{'server-params'}}) {
         push @parms, $key . '=' . $val;
+        @ISA = ('Net::Server::'.$val) if ($key eq 'personality');
+        $max_concurrent = $val if ($key eq 'max-concurrent');
+        $worker_keepalive = $val if ($key eq 'worker-keepalive');
     }
 }
 
 print Dumper(@parms);
 
+# initialize all remaining global variables before 
+# going into listen mode.
+my %kid_hash;
+my $kid_count = 0;
+my $cache;
+my @pending_connections;
+my %active_connections;
+
 #
 # This is the main event.
 SIPServer->run(@parms);
@@ -104,7 +137,8 @@ SIPServer->run(@parms);
 #
 
 # process_request is the callback used by Net::Server to handle
-# an incoming connection request.
+# an incoming connection request when the peronsality is either
+# Fork or PreFork.
 
 sub process_request {
     my $self = shift;
@@ -158,12 +192,352 @@ sub process_request {
     syslog("LOG_INFO", '%s: shutting down', $transport);
 }
 
+# mux_input is the callback used by Net::Server to handle
+# an incoming connection request when the peronsality is 
+# Multiplex.
+
+
+sub init_cache {
+    return $cache if $cache;
+
+    if (!$config->{cache}) {
+        syslog('LOG_ERR', "Cache servers needed");
+        return;
+    }
+    my $servers = $config->{cache}->{server};
+    syslog('LOG_DEBUG', "Cache servers: @$servers");
+
+    $cache = Cache::Memcached->new({servers => $servers}) or
+        syslog('LOG_ERR', "Unable to initialize memcache: @$servers");
+
+    return $cache;
+}
+
+# In the parent, pending connections are tracked as an array of PIDs.
+# As each child process completes the login dance, it plops some
+# info into memcache for us to pickup and copy into our active
+# connections.  No memcache entry means the child login dance
+# is still in progress.
+sub check_pending_connections {
+    return unless @pending_connections;
+
+    init_cache();
+
+    syslog('LOG_DEBUG', 
+        "multi: pending connections to inspect: @pending_connections");
+
+    # get_multi will return all completed login blobs
+    my @keys = map { "sip_pending_auth_$_" } @pending_connections;
+    my $values = $cache->get_multi(@keys);
+
+    for my $key (keys %$values) {
+        my $VAR1; # for Dump() -> eval;
+        eval $values->{$key}; # Data::Dumper->Dump string
+
+        my $id = $VAR1->{id}; # conn_id
+        $active_connections{$id}{net_server_parts} = $VAR1->{net_server_parts};
+
+        if ($VAR1->{success}) {
+            if ($active_connections{$id}{net_server_parts}{state}) {
+                local $Data::Dumper::Indent = 0;
+                syslog('LOG_DEBUG', "multi: conn_id=$id has state: ".
+                    Dumper($active_connections{$id}{net_server_parts}{state}));
+            }
+
+        } else {
+            syslog('LOG_INFO', "Child $id failed SIP login; removing connection");
+            delete $active_connections{$id};
+        }
+
+        # clean up ---
+
+        syslog('LOG_DEBUG', 
+            "multi: pending connection for conn_id=$id resolved");
+        $cache->delete($key);
+        @pending_connections = grep {$_ ne $id} @pending_connections;
+    }
+
+    syslog('LOG_DEBUG', 
+        "multi: connections still pending after check: @pending_connections")
+        if @pending_connections;
+
+    if (0) {
+        # useful for debugging connection-specific state information
+        local $Data::Dumper::Indent = 0;
+        for my $conn_id (keys %active_connections) {
+            syslog('LOG_DEBUG', "Connection $conn_id has state "
+                .Dumper($active_connections{$conn_id}{net_server_parts}{state}));
+        }
+    }
+}
+
+sub sig_chld {
+    if ( !scalar(keys(%kid_hash))) { # not using mux mode
+        1 while waitpid(-1, WNOHANG) > 0;
+    } else {
+        for (keys(%kid_hash)) {
+            if ( my $reaped = waitpid($_, WNOHANG) > 0 ) {
+                syslog('LOG_DEBUG', "Reaping child $_");
+                # Mourning... done.
+                $kid_count--;
+                # note: in some cases (when the primary connection is severed),
+                # the active connection is cleaned up in mux_close.  
+                if ($active_connections{$kid_hash{$_}}) {
+                    if ($active_connections{$kid_hash{$_}}{worker_pipe}) {
+                        syslog('LOG_DEBUG', "Closing worker pipe after timeout for: $kid_hash{$_}");
+                        delete $active_connections{$kid_hash{$_}}{worker_pipe};
+                    }
+                }
+                delete $kid_hash{$_};
+            }
+        }
+    }
+}
+
+sub mux_connection {
+    my ($mself, $fh) = @_;
+
+    my ($peeraddr, $peerport) = (
+        $mself->{net_server}->{server}->{peeraddr},
+        $mself->{net_server}->{server}->{peerport}
+    );
+
+    # create a new connection ID for this MUX handler.
+    $mself->{conn_id} = "$peeraddr:$peerport\@" . time();
+    syslog('LOG_DEBUG', "New connection created: ".$mself->{conn_id});
+}
+
+sub mux_input {
+    my $mself = shift;
+    my $mux = shift;
+    my $mux_fh = shift;
+    my $str_ref = shift;
+
+    my $conn_id = $mself->{conn_id}; # see mux_connection
+
+    # and process any pending logins
+    check_pending_connections();
+
+    my $c = scalar(keys %active_connections);
+    syslog("LOG_DEBUG", "multi: inbound message on connection $conn_id; $c total");
+
+    if ($kid_count >= $max_concurrent) {
+        # XXX should we say something to the client? maybe wait and try again?
+        syslog('LOG_ERR', "Unwilling to fork new child process, at least $max_concurrent already ongoing");
+        return;
+    }
+
+    my $self;
+    if (!$active_connections{$conn_id}) { # Brand new connection, log them in
+        $self = $mself->{net_server};
+
+        my ($sockaddr, $port, $proto);
+    
+        $self->{config} = $config;
+    
+        $sockaddr = $self->{server}->{sockaddr};
+        $port     = $self->{server}->{sockport};
+        $proto    = $self->{server}->{client}->NS_proto();
+    
+        syslog('LOG_INFO', "New client $conn_id connecting to $sockaddr on port $port and proto $proto");
+    
+        $self->{service} = $config->find_service( $sockaddr, $port, $proto );
+    
+        if (! defined($self->{service})) {
+            syslog( "LOG_ERR", "process_request: Unrecognized server connection: %s:%s/%s",
+                $sockaddr, $port, $proto );
+            syslog('LOG_ERR', "process_request: Bad server connection");
+            return;
+        }
+    
+        my $transport = $transports{ $self->{service}->{transport} };
+    
+        if ( !defined($transport) ) {
+            syslog("LOG_WARNING", "Unknown transport, dropping");
+            return;
+        }
+
+        # We stick this here, assuming success. Cleanup comes later via memcache and reaper.
+        $active_connections{$conn_id} = {
+            id => $conn_id,
+            transport => $transport,
+            net_server => $self,
+            worker_pipe => IO::Pipe->new
+        };
+        # This is kind of kinky, but allows us to avoid requiring Socket::Linux.
+        # A simple "Socket::Linux"->use won't suffice since we need access to
+        # all of it's bareword constants as well.
+        eval <<'        EVAL';
+        use Socket::Linux qw(TCP_KEEPINTVL TCP_KEEPIDLE TCP_KEEPCNT);
+        setsockopt($self->{server}->{client}, SOL_SOCKET,  SO_KEEPALIVE, 1);
+        setsockopt($self->{server}->{client}, IPPROTO_TCP, TCP_KEEPIDLE, 120);
+        setsockopt($self->{server}->{client}, IPPROTO_TCP, TCP_KEEPINTVL, 10);
+        EVAL
+
+        my $pid = fork();
+        if (!defined($pid) or $pid < 0) {
+            syslog('LOG_ERR', "Unable to fork new child process $!");
+            return;
+        }
+
+        if ($pid == 0) { # in kid
+            $active_connections{$conn_id}{worker_pipe}->reader;
+
+            $cache = undef; # don't use the same cache handle as our parent.
+            my $cache_data = {id => $conn_id};
+
+            # Once the login dance is complete in SipMsg, login_complete() is
+            # called so that we may cache the results before the login response
+            # message is delivered to the client.  
+            $self->{login_complete} = sub {
+                my $status = shift;
+
+                if ($status) { # login OK
+
+                    $self->{state} = $self->{ils}->state() if (UNIVERSAL::can($self->{ils},'state'));
+
+                    $cache_data->{success} = 1;
+                    $cache_data->{net_server_parts} = {
+                        map { ($_ => $$self{$_}) } qw/state institution account/
+                    };
+
+                    # Stash the ILS module somewhere handy for later
+                    $cache_data->{net_server_parts}{ils} = ref($self->{ils});
+
+                } else {
+                    $cache_data->{success} = 0;
+                }
+
+                init_cache()->set(
+                    "sip_pending_auth_$conn_id", 
+                    Data::Dumper->Dump([$cache_data]),
+                    # Our cache entry is only inspected when the parent process
+                    # wakes up from an inbound request.  If this is the last child
+                    # to connect before a long period of inactivity, our cache
+                    # entry may sit unnattended for some time, hence the
+                    # 12 hour cache timeout.  XXX: make it configurable?
+                    43200 # 12 hours
+                );
+
+                $self->{login_complete_called} = 1;
+            };
+
+            syslog('LOG_DEBUG', "Child $$ / $conn_id kicking off login process");
+
+            eval { &$transport($self, $active_connections{$conn_id}{worker_pipe}) };
+
+            if ($@) {
+                syslog('LOG_ERR', "ILS login error: $@");
+                $self->{login_complete}->(0) unless $self->{login_complete_called};
+            }
+
+            $self->sip_protocol_loop(
+                $active_connections{$conn_id}{worker_pipe},
+                $self->{account}->{'worker-keepalive'}
+                    // $self->{institution}->{'worker-keepalive'}
+                    // $worker_keepalive
+            );
+
+            exit(0);
+
+        } else {
+            my $fh = $active_connections{$conn_id}{worker_pipe};
+            $fh->writer;
+            $fh->autoflush;
+            print $fh $$str_ref;
+            push(@pending_connections, $conn_id);
+            $kid_hash{$pid} = $conn_id;
+            $kid_count++;
+        }
+
+    } else {
+
+        $self = $active_connections{$conn_id}->{net_server};
+        my $ns_parts = $active_connections{$conn_id}->{net_server_parts};
+
+        if ($active_connections{$conn_id}{worker_pipe}) {
+            syslog('LOG_DEBUG', "multi: parent writing msg to existing child process");
+            my $fh = $active_connections{$conn_id}{worker_pipe};
+            print $fh $$str_ref;
+
+        } else { # waited too long, kid and pipe are gone
+            $active_connections{$conn_id}{worker_pipe} = IO::Pipe->new;
+            syslog('LOG_DEBUG', "multi: parent creating new pipe for existing connection");
+    
+            my $pid = fork();
+            if (!defined($pid) or $pid < 0) {
+                syslog('LOG_ERR', "Unable to fork new child process $!");
+                return;
+            }
+        
+            if ($pid == 0) { # in kid
+                $active_connections{$conn_id}{worker_pipe}->reader;
+        
+                syslog("LOG_DEBUG", "multi: $conn_id to be processed by child $$");
+        
+                # build the connection we deleted after logging in
+                $ns_parts->{ils}->use; # module name in the parent
+                $self->{$_} = $ns_parts->{$_} for keys %$ns_parts;
+                $self->{ils} = $ns_parts->{ils}->new(
+                    $ns_parts->{institution}, $ns_parts->{account}, $ns_parts->{state});
+        
+                # MUX mode only works with protocol version 2, because it assumes
+                # a SIP login has occured.  However, since the login occured 
+                # within a different now-dead process, the previously modified
+                # protocol_version is lost.  Re-apply it globally here.
+                $protocol_version = 2;
+        
+                if (!$self->{ils}) {
+                    syslog('LOG_ERR', "Unable to build ILS module in mux child");
+                    exit(0);
+                }
+        
+                $self->sip_protocol_loop(
+                    $active_connections{$conn_id}{worker_pipe},
+                    $self->{account}->{'worker-keepalive'}
+                        // $self->{institution}->{'worker-keepalive'}
+                        // $worker_keepalive
+                );
+
+       
+                exit(0);
+        
+            } else { # in parent
+                $active_connections{$conn_id}{worker_pipe}->writer;
+                my $fh = $active_connections{$conn_id}{worker_pipe};
+                $fh->autoflush;
+                print $fh $$str_ref;
+                $kid_count++;
+                $kid_hash{$pid} = $conn_id;
+                syslog("LOG_DEBUG", "multi: $conn_id forked child $pid; $kid_count total");
+            } 
+        }
+    }
+
+    # clear read data from the mux string ref
+    $$str_ref = '';
+}
+
+# client disconnected, remove the active connection
+sub mux_close {
+    my ($self, $mux, $fh) = @_;
+    my $conn_id = $self->{conn_id};
+
+    delete $active_connections{$conn_id};
+    syslog("LOG_DEBUG", "multi: mux_close cleaning up child: $conn_id; ". 
+        scalar(keys %active_connections)." remain");
+}
+
+
 #
 # Transports
 #
 
 sub raw_transport {
     my $self = shift;
+    my $fh = shift || *STDIN;
+
     my ($uid, $pwd);
     my $input;
     my $service = $self->{service};
@@ -177,7 +551,7 @@ sub raw_transport {
 
     while ($strikes--) {
         alarm $timeout;
-        $input = Sip::read_SIP_packet(*STDIN);
+        $input = Sip::read_SIP_packet($fh);
         alarm 0;
 
         if (!$input) {
@@ -194,6 +568,7 @@ sub raw_transport {
                 die 'raw_transport: sending SC status before login not enabled, exiting';
             }
             Sip::MsgType::handle($input, $self, SC_STATUS);
+            $strikes++; # it's allowed, don't charge for it
             next;
         }
         last if Sip::MsgType::handle($input, $self, LOGIN);
@@ -211,11 +586,12 @@ sub raw_transport {
     syslog("LOG_DEBUG", "raw_transport: uname/inst: '%s/%s'",
         $self->{account}->{id},
         $self->{account}->{institution});
-
 }
 
 sub telnet_transport {
     my $self = shift;
+    my $fh = shift || *STDIN;
+
     my ($uid, $pwd);
     my $strikes = 3;
     my $account = undef;
@@ -233,12 +609,12 @@ sub telnet_transport {
     while ($strikes--) {
         print "login: ";
         alarm $timeout;
-        $uid = <STDIN>;
+        $uid = <$fh>;
         alarm 0;
 
         print "password: ";
         alarm $timeout;
-        $pwd = <STDIN>;
+        $pwd = <$fh>;
         alarm 0;
 
         $uid =~ s/[\r\n]+$//;
@@ -280,11 +656,13 @@ sub http_transport {
 # processes are the same:
 sub sip_protocol_loop {
     my $self = shift;
+    my $fh = shift || *STDIN;
+    my $keepalive = shift;
     my $expect;
     my $service = $self->{service};
     my $config  = $self->{config};
     my $input;
-    my $timeout = $self->{service}->{timeout} || $config->{timeout} || 0;
+    my $timeout = $keepalive || $self->{service}->{timeout} || $config->{timeout} || 0;
 
     # Now that the terminal has logged in, the first message
     # we recieve must be an SC_STATUS message.  But it might be
@@ -303,12 +681,15 @@ sub sip_protocol_loop {
     $expect = '';
 
     alarm $timeout; # First loop timeout
-    while ( $input = Sip::read_SIP_packet(*STDIN) ) {
+    while ( $input = Sip::read_SIP_packet($fh) ) {
         alarm 0; # Don't timeout while we are processing
         $input =~ s/[\r\n]+$//sm;    # Strip off any trailing line ends
 
         my $status = Sip::MsgType::handle($input, $self, $expect);
-        next if $status eq REQUEST_ACS_RESEND;
+        if ($status eq REQUEST_ACS_RESEND) {
+            alarm $timeout;
+            next;
+        }
 
         if (!$status) {
             syslog("LOG_ERR", "raw_transport: failed to handle %s", substr($input,0,2));
@@ -320,8 +701,11 @@ sub sip_protocol_loop {
             die "sip_protocol_loop: exiting: expected '$expect', received '$status'";
         }
 
+        last if (defined $keepalive && !$keepalive);
+
         # We successfully received and processed what we were expecting
         $expect = '';
         alarm $timeout; # Next loop timeout
+
     }
 }
index f9e3cf3..ce5d939 100644 (file)
@@ -1,8 +1,10 @@
 <!--
 #
 # Copyright (C) 2006-2008  Georgia Public Library Service
+# Copyright (C) 2013 Equinox Software, Inc.
 # 
 # Author: David J. Fiander
+# Author: Mike Rylander
 # 
 # This program is free software; you can redistribute it and/or
 # modify it under the terms of version 2 of the GNU General Public
 
   <error-detect enabled="true" />
 
-  <!-- Set Net::Server::PreFork runtime parameters -->
+  <!-- Set Net::Server runtime parameters.  "personality" may -->
+  <!-- be either PreFork or Mulitplex. -->
   <server-params
+           personality='PreFork'
            min_servers='1'
            min_spare_servers='0' />
   
       timeout="60" />
   </listeners>
 
+  <!-- One or more Memecache servers are required for Mulitplex mode. -->
+  <!-- Cache server(s) are ignored in non-Multiplex mode -->
+  <cache>
+    <server>127.0.0.1:11211</server>
+  </cache>
+
   <accounts>
       <login id="scclient" password="clientpwd" institution="UWOLS">
       </login>
diff --git a/Sip.pm b/Sip.pm
index a687cd7..35ba419 100644 (file)
--- a/Sip.pm
+++ b/Sip.pm
@@ -241,14 +241,16 @@ sub write_msg {
         $msg .= checksum($msg);
     }
 
+    my $outmsg = "$msg\r";
 
     if ($file) {
-        print $file "$msg\r";
+        print $file $outmsg;
     } else {
-        print "$msg\r";
-        syslog("LOG_INFO", "OUTPUT MSG: '$msg'");
+        my $rv = POSIX::write(fileno(STDOUT), $outmsg, length($outmsg));
+        syslog("LOG_ERR", "Error writing to STDOUT $!") unless $rv;
     }
 
+    syslog("LOG_INFO", "OUTPUT MSG: '$msg'");
     $last_response = $msg;
 }
 
index a673fb9..ce9144e 100644 (file)
@@ -38,14 +38,14 @@ my $parser = new XML::Simple(
     KeyAttr => {
         login       => '+id',
         institution => '+id',
-        service     => '+port'
+        service     => '+port',
     },
     GroupTags => {
         listeners    => 'service',
         accounts     => 'login',
         institutions => 'institution',
     },
-    ForceArray => [ 'service', 'login', 'institution' ],
+    ForceArray => [ 'service', 'login', 'institution', 'server' ],
     ValueAttr  => {
         'error-detect' => 'enabled',
         'timeout'      => 'value',
index 543838e..779d41d 100644 (file)
@@ -859,6 +859,8 @@ sub handle_login {
         _load_ils_handler($server, $uid);
     }
 
+    $server->{login_complete}->($status) if $server->{login_complete};
+
     $self->write_msg(LOGIN_RESP . $status);
 
     return $status ? LOGIN : '';