LP#1339190: Let workers hang around for a bit for higher-rate clients
authorMike Rylander <mrylander@gmail.com>
Fri, 15 Aug 2014 23:12:32 +0000 (19:12 -0400)
committerMike Rylander <mrylander@gmail.com>
Fri, 15 Aug 2014 23:12:32 +0000 (19:12 -0400)
For some clients, such as AMH (sorters), the per-message connections
cause too much latency.  So, instead, we'll let their backends hang
around for a while.  This is controlled by a new attribute on the
server-params element, worker-keepalive, as a peer to the personality
value.

This is measured in seconds and the default is 5. A value of 0 here
will disable this feature altogether.

Signed-off-by: Mike Rylander <mrylander@gmail.com>
SIPServer.pm

index 5dd402f..1f876ab 100755 (executable)
@@ -29,6 +29,7 @@ use Net::Server::Multiplex;
 use Net::Server::PreFork;
 use Net::Server::Proto;
 use IO::Socket::INET;
+use IO::Pipe;
 use Socket qw(:crlf SOL_SOCKET SO_KEEPALIVE IPPROTO_TCP TCP_KEEPALIVE);
 use IO::String;
 use Socket qw(:crlf);
@@ -106,12 +107,14 @@ push @parms,
 # can be used here to limit the number of concurrent in-flight requests
 # to avoid a fork-bomb DoS situation.  The default is 256.
 #
+my $worker_keepalive = 5;
 my $max_concurrent = 256;
 if (defined($config->{'server-params'})) {
     while (my ($key, $val) = each %{$config->{'server-params'}}) {
         push @parms, $key . '=' . $val;
         @ISA = ('Net::Server::'.$val) if ($key eq 'personality');
         $max_concurrent = $val if ($key eq 'max-concurrent');
+        $worker_keepalive = $val if ($key eq 'worker-keepalive');
     }
 }
 
@@ -194,17 +197,6 @@ sub process_request {
 # Multiplex.
 
 
-sub REAPER {
-  for (keys(%kid_hash)) {
-    if ( my $reaped = waitpid($_, WNOHANG) > 0 ) {
-      # Mourning... done.
-      $kid_count--;
-      delete $kid_hash{$_};
-    }
-  }
-  $SIG{CHLD} = sub { REAPER() };
-}
-
 sub init_cache {
     return $cache if $cache;
 
@@ -274,18 +266,28 @@ sub check_pending_connections {
         if @pending_connections;
 }
 
+sub REAPER {
+    for (keys(%kid_hash)) {
+        if ( my $reaped = waitpid($_, WNOHANG) > 0 ) {
+            # Mourning... done.
+            $kid_count--;
+            if ($active_connections{$kid_hash{$_}}) {
+                if ($active_connections{$kid_hash{$_}}{worker_pipe}) {
+                    delete $active_connections{$kid_hash{$_}}{worker_pipe};
+                }
+            }
+            delete $kid_hash{$_};
+        }
+    }
+    $SIG{CHLD} = sub { REAPER() };
+}
+
 sub mux_input {
     my $mself = shift;
     my $mux = shift;
     my $mux_fh = shift;
     my $str_ref = shift;
 
-    # clone the mux string into a file handle
-    my $str_fh = IO::String->new(''.$$str_ref);
-
-    # clear read data from the mux string ref
-    $$str_ref = '';
-
     my ($peeraddr, $peerport) = (
         $mself->{net_server}->{server}->{peeraddr},
         $mself->{net_server}->{server}->{peerport}
@@ -341,8 +343,13 @@ sub mux_input {
             return;
         }
 
-        # We stick this here, assuming success. Cleanup comes later via PERMAFROST().
-        $active_connections{$conn_id} = { id => $conn_id, transport => $transport, net_server => $self };
+        # We stick this here, assuming success. Cleanup comes later via memcache and reaper.
+        $active_connections{$conn_id} = {
+            id => $conn_id,
+            transport => $transport,
+            net_server => $self,
+            worker_pipe => IO::Pipe->new
+        };
  
         my $pid = fork();
         if (!defined($pid) or $pid < 0) {
@@ -351,6 +358,7 @@ sub mux_input {
         }
 
         if ($pid == 0) { # in kid
+            $active_connections{$conn_id}{worker_pipe}->reader;
 
             $cache = undef; # don't use the same cache handle as our parent.
             my $cache_data = {id => $conn_id};
@@ -395,70 +403,79 @@ sub mux_input {
 
             syslog('LOG_DEBUG', "Child $$ / $conn_id kicking of login process");
 
-            eval { &$transport($self, $str_fh) };
+            eval { &$transport($self, $active_connections{$conn_id}{worker_pipe}) };
 
             if ($@) {
                 syslog('LOG_ERR', "ILS login error: $@");
                 $self->{login_complete}->(0) unless $self->{login_complete_called};
             }
 
+            $self->sip_protocol_loop($active_connections{$conn_id}{worker_pipe}, $worker_keepalive);
+
             exit(0);
 
         } else {
+            my $fh = $active_connections{$conn_id}{worker_pipe};
+            print $fh $$str_ref;
             push(@pending_connections, $pid);
-            $kid_hash{$pid} = 1;
+            $kid_hash{$pid} = $conn_id;
             $kid_count++;
         }
 
-        # nothing else for the parent to do until login completes
-        return; # NEXT CUSTOMER PLEASE STEP UP
-    }
-
-    $self = $active_connections{$conn_id}->{net_server};
-
-    my $pid = fork();
-    if (!defined($pid) or $pid < 0) {
-        syslog('LOG_ERR', "Unable to fork new child process $!");
-        return;
-    }
-
-    if ($pid == 0) { # in kid
-
-        syslog("LOG_DEBUG", "multi: $conn_id to be processed by child $$");
-
-        # build the connection we deleted after logging in
-        $self->{ils}->use; # module name in the parent
-        $self->{ils} = $self->{ils}->new($self->{institution}, $self->{account}, $self->{state});
-
-        # MUX mode only works with protocol version 2, because it assumes
-        # a SIP login has occured.  However, since the login occured 
-        # within a different now-dead process, the previously modified
-        # protocol_version is lost.  Re-apply it globally here.
-        $protocol_version = 2;
-
-        if (!$self->{ils}) {
-            syslog('LOG_ERR', "Unable to build ILS module in mux child");
-            exit(0);
-        }
-
-        # build the connection we deleted after logging in
-        my $input = Sip::read_SIP_packet($str_fh);
-        $input =~ s/[\r\n]+$//sm;    # Strip off any trailing line ends
+    } else {
 
-        my $status = Sip::MsgType::handle($input, $self, '');
+        $self = $active_connections{$conn_id}->{net_server};
+    
+        if ($active_connections{$conn_id}{worker_pipe}) {
+            my $fh = $active_connections{$conn_id}{worker_pipe};
+            print $fh $$str_ref;
 
-        if (!$status) {
-            syslog("LOG_ERR", "raw_transport: failed to handle %s", substr($input,0,2));
-            die "sip_protocol_loop: failed Sip::MsgType::handle('$input', $self, '')";
+        } else { # waited too long, kid and pipe are gone
+            $active_connections{$conn_id}{worker_pipe} = IO::Pipe->new;
+    
+            my $pid = fork();
+            if (!defined($pid) or $pid < 0) {
+                syslog('LOG_ERR', "Unable to fork new child process $!");
+                return;
+            }
+        
+            if ($pid == 0) { # in kid
+                $active_connections{$conn_id}{worker_pipe}->reader;
+        
+                syslog("LOG_DEBUG", "multi: $conn_id to be processed by child $$");
+        
+                # build the connection we deleted after logging in
+                $self->{ils}->use; # module name in the parent
+                $self->{ils} = $self->{ils}->new($self->{institution}, $self->{account}, $self->{state});
+        
+                # MUX mode only works with protocol version 2, because it assumes
+                # a SIP login has occured.  However, since the login occured 
+                # within a different now-dead process, the previously modified
+                # protocol_version is lost.  Re-apply it globally here.
+                $protocol_version = 2;
+        
+                if (!$self->{ils}) {
+                    syslog('LOG_ERR', "Unable to build ILS module in mux child");
+                    exit(0);
+                }
+        
+                $self->sip_protocol_loop($active_connections{$conn_id}{worker_pipe}, $worker_keepalive);
+       
+                exit(0);
+        
+            } else { # in parent
+                $active_connections{$conn_id}{worker_pipe}->writer;
+                my $fh = $active_connections{$conn_id}{worker_pipe};
+                print $fh $$str_ref;
+                $kid_count++;
+                $kid_hash{$pid} = $conn_id;
+                syslog("LOG_DEBUG", "multi: $conn_id forked child $pid; $kid_count total");
+            } 
         }
+    }
 
-        exit(0);
-
-    } else { # in parent
-        $kid_count++;
-        $kid_hash{$pid} = 1;
-        syslog("LOG_DEBUG", "multi: $conn_id forked child $pid; $kid_count total");
-    } 
+    # clear read data from the mux string ref
+    $$str_ref = '';
 }
 
 # client disconnected, remove the active connection
@@ -598,11 +615,13 @@ sub http_transport {
 # processes are the same:
 sub sip_protocol_loop {
     my $self = shift;
+    my $fh = shift || *STDIN;
+    my $keepalive = shift;
     my $expect;
     my $service = $self->{service};
     my $config  = $self->{config};
     my $input;
-    my $timeout = $self->{service}->{timeout} || $config->{timeout} || 0;
+    my $timeout = $keepalive || $self->{service}->{timeout} || $config->{timeout} || 0;
 
     # Now that the terminal has logged in, the first message
     # we recieve must be an SC_STATUS message.  But it might be
@@ -621,12 +640,15 @@ sub sip_protocol_loop {
     $expect = '';
 
     alarm $timeout; # First loop timeout
-    while ( $input = Sip::read_SIP_packet(*STDIN) ) {
+    while ( $input = Sip::read_SIP_packet($fh) ) {
         alarm 0; # Don't timeout while we are processing
         $input =~ s/[\r\n]+$//sm;    # Strip off any trailing line ends
 
         my $status = Sip::MsgType::handle($input, $self, $expect);
-        next if $status eq REQUEST_ACS_RESEND;
+        if ($status eq REQUEST_ACS_RESEND) {
+            alarm $timeout;
+            next;
+        }
 
         if (!$status) {
             syslog("LOG_ERR", "raw_transport: failed to handle %s", substr($input,0,2));
@@ -638,8 +660,11 @@ sub sip_protocol_loop {
             die "sip_protocol_loop: exiting: expected '$expect', received '$status'";
         }
 
+        last if (defined $keepalive && !$keepalive);
+
         # We successfully received and processed what we were expecting
         $expect = '';
         alarm $timeout; # Next loop timeout
+
     }
 }