From 58a28e9208ddbb1c9bf2c546f1b332e565f02526 Mon Sep 17 00:00:00 2001 From: Mike Rylander Date: Fri, 15 Aug 2014 19:12:32 -0400 Subject: [PATCH] LP#1339190: Let workers hang around for a bit for higher-rate clients For some clients, such as AMH (sorters), the per-message connections cause too much latency. So, instead, we'll let their backends hang around for a while. This is controlled by a new attribute on the server-params element, worker-keepalive, as a peer to the personality value. This is measured in seconds and the default is 5. A value of 0 here will disable this feature altogether. Signed-off-by: Mike Rylander --- SIPServer.pm | 163 ++++++++++++++++++++++++++++++++++------------------------- 1 file changed, 94 insertions(+), 69 deletions(-) diff --git a/SIPServer.pm b/SIPServer.pm index 5dd402f..1f876ab 100755 --- a/SIPServer.pm +++ b/SIPServer.pm @@ -29,6 +29,7 @@ use Net::Server::Multiplex; use Net::Server::PreFork; use Net::Server::Proto; use IO::Socket::INET; +use IO::Pipe; use Socket qw(:crlf SOL_SOCKET SO_KEEPALIVE IPPROTO_TCP TCP_KEEPALIVE); use IO::String; use Socket qw(:crlf); @@ -106,12 +107,14 @@ push @parms, # can be used here to limit the number of concurrent in-flight requests # to avoid a fork-bomb DoS situation. The default is 256. # +my $worker_keepalive = 5; my $max_concurrent = 256; if (defined($config->{'server-params'})) { while (my ($key, $val) = each %{$config->{'server-params'}}) { push @parms, $key . '=' . $val; @ISA = ('Net::Server::'.$val) if ($key eq 'personality'); $max_concurrent = $val if ($key eq 'max-concurrent'); + $worker_keepalive = $val if ($key eq 'worker-keepalive'); } } @@ -194,17 +197,6 @@ sub process_request { # Multiplex. -sub REAPER { - for (keys(%kid_hash)) { - if ( my $reaped = waitpid($_, WNOHANG) > 0 ) { - # Mourning... done. - $kid_count--; - delete $kid_hash{$_}; - } - } - $SIG{CHLD} = sub { REAPER() }; -} - sub init_cache { return $cache if $cache; @@ -274,18 +266,28 @@ sub check_pending_connections { if @pending_connections; } +sub REAPER { + for (keys(%kid_hash)) { + if ( my $reaped = waitpid($_, WNOHANG) > 0 ) { + # Mourning... done. + $kid_count--; + if ($active_connections{$kid_hash{$_}}) { + if ($active_connections{$kid_hash{$_}}{worker_pipe}) { + delete $active_connections{$kid_hash{$_}}{worker_pipe}; + } + } + delete $kid_hash{$_}; + } + } + $SIG{CHLD} = sub { REAPER() }; +} + sub mux_input { my $mself = shift; my $mux = shift; my $mux_fh = shift; my $str_ref = shift; - # clone the mux string into a file handle - my $str_fh = IO::String->new(''.$$str_ref); - - # clear read data from the mux string ref - $$str_ref = ''; - my ($peeraddr, $peerport) = ( $mself->{net_server}->{server}->{peeraddr}, $mself->{net_server}->{server}->{peerport} @@ -341,8 +343,13 @@ sub mux_input { return; } - # We stick this here, assuming success. Cleanup comes later via PERMAFROST(). - $active_connections{$conn_id} = { id => $conn_id, transport => $transport, net_server => $self }; + # We stick this here, assuming success. Cleanup comes later via memcache and reaper. + $active_connections{$conn_id} = { + id => $conn_id, + transport => $transport, + net_server => $self, + worker_pipe => IO::Pipe->new + }; my $pid = fork(); if (!defined($pid) or $pid < 0) { @@ -351,6 +358,7 @@ sub mux_input { } if ($pid == 0) { # in kid + $active_connections{$conn_id}{worker_pipe}->reader; $cache = undef; # don't use the same cache handle as our parent. my $cache_data = {id => $conn_id}; @@ -395,70 +403,79 @@ sub mux_input { syslog('LOG_DEBUG', "Child $$ / $conn_id kicking of login process"); - eval { &$transport($self, $str_fh) }; + eval { &$transport($self, $active_connections{$conn_id}{worker_pipe}) }; if ($@) { syslog('LOG_ERR', "ILS login error: $@"); $self->{login_complete}->(0) unless $self->{login_complete_called}; } + $self->sip_protocol_loop($active_connections{$conn_id}{worker_pipe}, $worker_keepalive); + exit(0); } else { + my $fh = $active_connections{$conn_id}{worker_pipe}; + print $fh $$str_ref; push(@pending_connections, $pid); - $kid_hash{$pid} = 1; + $kid_hash{$pid} = $conn_id; $kid_count++; } - # nothing else for the parent to do until login completes - return; # NEXT CUSTOMER PLEASE STEP UP - } - - $self = $active_connections{$conn_id}->{net_server}; - - my $pid = fork(); - if (!defined($pid) or $pid < 0) { - syslog('LOG_ERR', "Unable to fork new child process $!"); - return; - } - - if ($pid == 0) { # in kid - - syslog("LOG_DEBUG", "multi: $conn_id to be processed by child $$"); - - # build the connection we deleted after logging in - $self->{ils}->use; # module name in the parent - $self->{ils} = $self->{ils}->new($self->{institution}, $self->{account}, $self->{state}); - - # MUX mode only works with protocol version 2, because it assumes - # a SIP login has occured. However, since the login occured - # within a different now-dead process, the previously modified - # protocol_version is lost. Re-apply it globally here. - $protocol_version = 2; - - if (!$self->{ils}) { - syslog('LOG_ERR', "Unable to build ILS module in mux child"); - exit(0); - } - - # build the connection we deleted after logging in - my $input = Sip::read_SIP_packet($str_fh); - $input =~ s/[\r\n]+$//sm; # Strip off any trailing line ends + } else { - my $status = Sip::MsgType::handle($input, $self, ''); + $self = $active_connections{$conn_id}->{net_server}; + + if ($active_connections{$conn_id}{worker_pipe}) { + my $fh = $active_connections{$conn_id}{worker_pipe}; + print $fh $$str_ref; - if (!$status) { - syslog("LOG_ERR", "raw_transport: failed to handle %s", substr($input,0,2)); - die "sip_protocol_loop: failed Sip::MsgType::handle('$input', $self, '')"; + } else { # waited too long, kid and pipe are gone + $active_connections{$conn_id}{worker_pipe} = IO::Pipe->new; + + my $pid = fork(); + if (!defined($pid) or $pid < 0) { + syslog('LOG_ERR', "Unable to fork new child process $!"); + return; + } + + if ($pid == 0) { # in kid + $active_connections{$conn_id}{worker_pipe}->reader; + + syslog("LOG_DEBUG", "multi: $conn_id to be processed by child $$"); + + # build the connection we deleted after logging in + $self->{ils}->use; # module name in the parent + $self->{ils} = $self->{ils}->new($self->{institution}, $self->{account}, $self->{state}); + + # MUX mode only works with protocol version 2, because it assumes + # a SIP login has occured. However, since the login occured + # within a different now-dead process, the previously modified + # protocol_version is lost. Re-apply it globally here. + $protocol_version = 2; + + if (!$self->{ils}) { + syslog('LOG_ERR', "Unable to build ILS module in mux child"); + exit(0); + } + + $self->sip_protocol_loop($active_connections{$conn_id}{worker_pipe}, $worker_keepalive); + + exit(0); + + } else { # in parent + $active_connections{$conn_id}{worker_pipe}->writer; + my $fh = $active_connections{$conn_id}{worker_pipe}; + print $fh $$str_ref; + $kid_count++; + $kid_hash{$pid} = $conn_id; + syslog("LOG_DEBUG", "multi: $conn_id forked child $pid; $kid_count total"); + } } + } - exit(0); - - } else { # in parent - $kid_count++; - $kid_hash{$pid} = 1; - syslog("LOG_DEBUG", "multi: $conn_id forked child $pid; $kid_count total"); - } + # clear read data from the mux string ref + $$str_ref = ''; } # client disconnected, remove the active connection @@ -598,11 +615,13 @@ sub http_transport { # processes are the same: sub sip_protocol_loop { my $self = shift; + my $fh = shift || *STDIN; + my $keepalive = shift; my $expect; my $service = $self->{service}; my $config = $self->{config}; my $input; - my $timeout = $self->{service}->{timeout} || $config->{timeout} || 0; + my $timeout = $keepalive || $self->{service}->{timeout} || $config->{timeout} || 0; # Now that the terminal has logged in, the first message # we recieve must be an SC_STATUS message. But it might be @@ -621,12 +640,15 @@ sub sip_protocol_loop { $expect = ''; alarm $timeout; # First loop timeout - while ( $input = Sip::read_SIP_packet(*STDIN) ) { + while ( $input = Sip::read_SIP_packet($fh) ) { alarm 0; # Don't timeout while we are processing $input =~ s/[\r\n]+$//sm; # Strip off any trailing line ends my $status = Sip::MsgType::handle($input, $self, $expect); - next if $status eq REQUEST_ACS_RESEND; + if ($status eq REQUEST_ACS_RESEND) { + alarm $timeout; + next; + } if (!$status) { syslog("LOG_ERR", "raw_transport: failed to handle %s", substr($input,0,2)); @@ -638,8 +660,11 @@ sub sip_protocol_loop { die "sip_protocol_loop: exiting: expected '$expect', received '$status'"; } + last if (defined $keepalive && !$keepalive); + # We successfully received and processed what we were expecting $expect = ''; alarm $timeout; # Next loop timeout + } } -- 2.11.0