From: erickson Date: Wed, 1 Sep 2010 00:17:12 +0000 (+0000) Subject: Replace Net::Server with local pre-forking server X-Git-Url: https://old-git.evergreen-ils.org/?a=commitdiff_plain;h=b88bdf98e74d52f5c875a8db5c1422f297eabd1b;p=opensrf%2Fbjwebb.git Replace Net::Server with local pre-forking server Support max/min children and max/min spare children For more, see http://libmail.georgialibraries.org/pipermail/open-ils-dev/2010-May/006068.html git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@2016 9efc2488-bf62-4759-914b-345cdb29e865 --- diff --git a/bin/opensrf-perl.pl.in b/bin/opensrf-perl.pl.in index 5751eed..cc99759 100755 --- a/bin/opensrf-perl.pl.in +++ b/bin/opensrf-perl.pl.in @@ -110,6 +110,7 @@ sub do_init { # start a specific service sub do_start { my $service = shift; + if(-e get_pid_file($service)) { msg("$service is already running"); return; @@ -117,19 +118,9 @@ sub do_start { load_settings() if $service eq 'opensrf.settings'; - my $sclient = OpenSRF::Utils::SettingsClient->new; - my $apps = $sclient->config_value("activeapps", "appname"); - OpenSRF::Transport::PeerHandle->retrieve->disconnect; - if(grep { $_ eq $service } @hosted_services) { return unless do_daemon($service); - launch_net_server($service); - launch_listener($service); - $0 = "OpenSRF controller [$service]"; - while(my $pid = waitpid(-1, 0)) { - last if $pid == -1; - $logger->debug("Cleaning up Perl $service process $pid"); - } + OpenSRF::System->run_service($service); } msg("$service is not configured to run on $hostname"); @@ -169,6 +160,9 @@ sub do_daemon { close STDIN; close STDOUT; close STDERR; + open STDIN, '/dev/null'; + open STDERR, '>/dev/null'; `echo $$ > $pid_file`; return 1; } @@ -184,29 +178,6 @@ sub load_settings { $parser->get_server_config($conf->env->hostname); } -# starts up the unix::server master process -sub launch_net_server { - my $service = shift; - push @OpenSRF::UnixServer::ISA, 'Net::Server::PreFork'; - unless(OpenSRF::Utils::safe_fork()) { - $0 = "OpenSRF Drone [$service]"; - OpenSRF::UnixServer->new($service)->serve; - exit; - } - return 1; -} - -# starts up the inbound listener process -sub launch_listener { - my $service = shift; - unless(OpenSRF::Utils::safe_fork()) { - $0 = "OpenSRF listener [$service]"; - OpenSRF::Transport::Listener->new($service)->initialize->listen; - exit; - } - return 1; -} - sub msg { my $m = shift; my $v = shift; diff --git a/src/perl/lib/OpenSRF/Application.pm b/src/perl/lib/OpenSRF/Application.pm index 9971d45..4a36baf 100644 --- a/src/perl/lib/OpenSRF/Application.pm +++ b/src/perl/lib/OpenSRF/Application.pm @@ -12,7 +12,6 @@ use Time::HiRes qw/time/; use OpenSRF::EX qw/:try/; use Carp; use OpenSRF::Utils::JSON; -#use OpenSRF::UnixServer; # to get the server class from UnixServer::App sub DESTROY{}; diff --git a/src/perl/lib/OpenSRF/Server.pm b/src/perl/lib/OpenSRF/Server.pm new file mode 100644 index 0000000..85cd5fa --- /dev/null +++ b/src/perl/lib/OpenSRF/Server.pm @@ -0,0 +1,596 @@ +# ---------------------------------------------------------------- +# Copyright (C) 2010 Equinox Software, Inc. +# Bill Erickson +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# ---------------------------------------------------------------- +package OpenSRF::Server; +use strict; +use warnings; +use OpenSRF::Transport; +use OpenSRF::Application; +use OpenSRF::Utils::Config; +use OpenSRF::Transport::PeerHandle; +use OpenSRF::Utils::SettingsClient; +use OpenSRF::Utils::Logger qw($logger); +use OpenSRF::Transport::SlimJabber::Client; +use POSIX qw/:sys_wait_h :errno_h/; +use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK); +use IO::Select; +use Socket; +our $chatty = 1; # disable for production + +use constant STATUS_PIPE_DATA_SIZE => 12; + +sub new { + my($class, $service, %args) = @_; + my $self = bless(\%args, $class); + + $self->{service} = $service; # service name + $self->{num_children} = 0; # number of child processes + $self->{osrf_handle} = undef; # xmpp handle + $self->{routers} = []; # list of registered routers + $self->{active_list} = []; # list of active children + $self->{idle_list} = []; # list of idle children + + $self->{min_spare_children} ||= 0; + + $self->{max_spare_children} = $self->{min_spare_children} + 1 if + $self->{max_spare_children} and + $self->{max_spare_children} <= $self->{min_spare_children}; + + return $self; +} + +# ---------------------------------------------------------------- +# Disconnects from routers and waits for child processes to exit. +# ---------------------------------------------------------------- +sub cleanup { + my $self = shift; + my $no_exit = shift; + + $logger->info("server: shutting down and cleaning up..."); + + # don't get sidetracked by signals while we're cleaning up. + # it could result in unexpected behavior with list traversal + $SIG{CHLD} = 'IGNORE'; + + # terminate the child processes + $self->kill_child($_) for + (@{$self->{idle_list}}, @{$self->{active_list}}); + + # de-register routers + $self->unregister_routers; + + $self->{osrf_handle}->disconnect; + + # clean up our dead children + $self->reap_children(1); + + exit(0) unless $no_exit; +} + + +# ---------------------------------------------------------------- +# Waits on the jabber socket for inbound data from the router. +# Each new message is passed off to a child process for handling. +# At regular intervals, wake up for min/max spare child maintenance +# ---------------------------------------------------------------- +sub run { + my $self = shift; + + $logger->set_service($self->{service}); + + $SIG{$_} = sub { $self->cleanup; } for (qw/INT TERM QUIT/); + $SIG{CHLD} = sub { $self->reap_children(); }; + + $self->spawn_children; + $self->build_osrf_handle; + $self->register_routers; + my $wait_time = 1; + + # main server loop + while(1) { + + $self->check_status; + $self->{child_died} = 0; + + my $msg = $self->{osrf_handle}->process($wait_time); + + # we woke up for any reason, reset the wait time to allow + # for idle maintenance as necessary + $wait_time = 1; + + if($msg) { + + if(my $child = pop(@{$self->{idle_list}})) { + + # we have an idle child to handle the request + $chatty and $logger->internal("server: passing request to idle child $child"); + push(@{$self->{active_list}}, $child); + $self->write_child($child, $msg); + + } elsif($self->{num_children} < $self->{max_children}) { + + # spawning a child to handle the request + $chatty and $logger->internal("server: spawning child to handle request"); + $self->write_child($self->spawn_child(1), $msg); + + } else { + + $logger->warn("server: no children available, waiting..."); + $self->check_status(1); # block until child is available + + my $child = pop(@{$self->{idle_list}}); + push(@{$self->{active_list}}, $child); + $self->write_child($child, $msg); + } + + } else { + + # don't perform idle maint immediately when woken by SIGCHLD + unless($self->{child_died}) { + + # when we hit equilibrium, there's no need for regular + # maintenance, so set wait_time to 'forever' + $wait_time = -1 unless $self->perform_idle_maintenance; + } + } + } +} + +# ---------------------------------------------------------------- +# Launch a new spare child or kill an extra spare child. To +# prevent large-scale spawning or die-offs, spawn or kill only +# 1 process per idle maintenance loop. +# Returns true if any idle maintenance occurred, 0 otherwise +# ---------------------------------------------------------------- +sub perform_idle_maintenance { + my $self = shift; + + # spawn 1 spare child per maintenance loop if necessary + if( $self->{min_spare_children} and + $self->{num_children} < $self->{max_children} and + scalar(@{$self->{idle_list}}) < $self->{min_spare_children} ) { + + $chatty and $logger->internal("server: spawning spare child"); + $self->spawn_child; + return 1; + + # kill 1 excess spare child per maintenance loop if necessary + } elsif($self->{max_spare_children} and + $self->{num_children} > $self->{min_children} and + scalar(@{$self->{idle_list}}) > $self->{max_spare_children} ) { + + $chatty and $logger->internal("server: killing spare child"); + $self->kill_child; + return 1; + } + + return 0; +} + +sub kill_child { + my $self = shift; + my $child = shift || pop(@{$self->{idle_list}}) or return; + $chatty and $logger->internal("server: killing child $child"); + kill('TERM', $child->{pid}); +} + +# ---------------------------------------------------------------- +# Jabber connection inbound message arrive on. +# ---------------------------------------------------------------- +sub build_osrf_handle { + my $self = shift; + + my $conf = OpenSRF::Utils::Config->current; + my $username = $conf->bootstrap->username; + my $password = $conf->bootstrap->passwd; + my $domain = $conf->bootstrap->domain; + my $port = $conf->bootstrap->port; + my $resource = $self->{service} . '_listener_' . $conf->env->hostname; + + $logger->debug("server: inbound connecting as $username\@$domain/$resource on port $port"); + + $self->{osrf_handle} = + OpenSRF::Transport::SlimJabber::Client->new( + username => $username, + resource => $resource, + password => $password, + host => $domain, + port => $port, + ); + + $self->{osrf_handle}->initialize; +} + + +# ---------------------------------------------------------------- +# Sends request data to a child process +# ---------------------------------------------------------------- +sub write_child { + my($self, $child, $msg) = @_; + my $xml = $msg->to_xml; + syswrite($child->{pipe_to_child}, $xml); +} + +# ---------------------------------------------------------------- +# Checks to see if any child process has reported its availability +# In blocking mode, blocks until a child has reported. +# ---------------------------------------------------------------- +sub check_status { + my($self, $block) = @_; + + my $read_set = IO::Select->new; + $read_set->add($_->{pipe_to_child}) for @{$self->{active_list}}; + + my @handles = $read_set->can_read(($block) ? undef : 0) or return; + + my $pid = ''; + my @pids; + for my $pipe (@handles) { + sysread($pipe, $pid, STATUS_PIPE_DATA_SIZE) or next; + push(@pids, int($pid)); + } + + $chatty and $logger->internal("server: ".scalar(@pids)." children reporting for duty: (@pids)"); + + my $child; + my @new_actives; + + # move the children from the active list to the idle list + for my $proc (@{$self->{active_list}}) { + if(grep { $_ == $proc->{pid} } @pids) { + push(@{$self->{idle_list}}, $proc); + } else { + push(@new_actives, $proc); + } + } + + $self->{active_list} = [@new_actives]; + + $chatty and $logger->internal(sprintf( + "server: %d idle and %d active children after status update", + scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}}))); +} + +# ---------------------------------------------------------------- +# Cleans up any child processes that have exited. +# In shutdown mode, block until all children have washed ashore +# ---------------------------------------------------------------- +sub reap_children { + my($self, $shutdown) = @_; + $self->{child_died} = 1; + + while(1) { + + my $pid = waitpid(-1, ($shutdown) ? 0 : WNOHANG); + return if $pid <= 0; + + $chatty and $logger->internal("server: reaping child $pid"); + + my ($child) = grep {$_->{pid} == $pid} (@{$self->{active_list}}, @{$self->{idle_list}}); + + close($child->{pipe_to_parent}); + close($child->{pipe_to_child}); + delete $child->{$_} for keys %$child; # destroy with a vengeance + + $self->{num_children}--; + $self->{active_list} = [ grep { $_->{pid} != $pid } @{$self->{active_list}} ]; + $self->{idle_list} = [ grep { $_->{pid} != $pid } @{$self->{idle_list}} ]; + } + + $self->spawn_children unless $shutdown; + + $chatty and $logger->internal(sprintf( + "server: %d idle and %d active children after reap_children", + scalar(@{$self->{idle_list}}), scalar(@{$self->{active_list}}))); + +} + +# ---------------------------------------------------------------- +# Spawn up to max_children processes +# ---------------------------------------------------------------- +sub spawn_children { + my $self = shift; + $self->spawn_child while $self->{num_children} < $self->{min_children}; +} + +# ---------------------------------------------------------------- +# Spawns a new child. If $active is set, the child goes directly +# into the active_list. +# ---------------------------------------------------------------- +sub spawn_child { + my($self, $active) = @_; + + my $child = OpenSRF::Server::Child->new($self); + + # socket for sending message data to the child + if(!socketpair( + $child->{pipe_to_child}, + $child->{pipe_to_parent}, + AF_UNIX, SOCK_STREAM, PF_UNSPEC)) { + $logger->error("server: error creating data socketpair: $!"); + return undef; + } + + $child->{pipe_to_child}->autoflush(1); + $child->{pipe_to_parent}->autoflush(1); + + $child->{pid} = fork(); + + if($child->{pid}) { # parent process + $self->{num_children}++; + + + if($active) { + push(@{$self->{active_list}}, $child); + } else { + push(@{$self->{idle_list}}, $child); + } + + $chatty and $logger->internal("server: server spawned child $child with ".$self->{num_children}." total children"); + + return $child; + + } else { # child process + + $SIG{$_} = 'DEFAULT' for (qw/INT TERM QUIT HUP/); + + $child->{pid} = $$; + eval { + $child->init; + $child->run; + OpenSRF::Transport::PeerHandle->retrieve->disconnect; + }; + $logger->error("server: child process died: $@") if $@; + exit(0); + } +} + +# ---------------------------------------------------------------- +# Sends the register command to the configured routers +# ---------------------------------------------------------------- +sub register_routers { + my $self = shift; + + my $conf = OpenSRF::Utils::Config->current; + my $routers = $conf->bootstrap->routers; + my $router_name = $conf->bootstrap->router_name; + my @targets; + + for my $router (@$routers) { + if(ref $router) { + + if( !$router->{services} || + !$router->{services}->{service} || + ( + ref($router->{services}->{service}) eq 'ARRAY' and + grep { $_ eq $self->{service} } @{$router->{services}->{service}} + ) || $router->{services}->{service} eq $self->{service}) { + + my $name = $router->{name}; + my $domain = $router->{domain}; + push(@targets, "$name\@$domain/router"); + } + + } else { + push(@targets, "$router_name\@$router/router"); + } + } + + foreach (@targets) { + $logger->info("server: registering with router $_"); + $self->{osrf_handle}->send( + to => $_, + body => 'registering', + router_command => 'register', + router_class => $self->{service} + ); + } + + $self->{routers} = \@targets; +} + +# ---------------------------------------------------------------- +# Sends the unregister command to any routers we have registered +# with. +# ---------------------------------------------------------------- +sub unregister_routers { + my $self = shift; + return unless $self->{osrf_handle}->tcp_connected; + + for my $router (@{$self->{routers}}) { + $logger->info("server: disconnecting from router $router"); + $self->{osrf_handle}->send( + to => $router, + body => "unregistering", + router_command => "unregister", + router_class => $self->{service} + ); + } +} + + +package OpenSRF::Server::Child; +use strict; +use warnings; +use OpenSRF::Transport; +use OpenSRF::Application; +use OpenSRF::Transport::PeerHandle; +use OpenSRF::Transport::SlimJabber::XMPPMessage; +use OpenSRF::Utils::Logger qw($logger); +use OpenSRF::DomainObject::oilsResponse qw/:status/; +use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK); +use Time::HiRes qw(time); +use POSIX qw/:sys_wait_h :errno_h/; + +use overload '""' => sub { return '[' . shift()->{pid} . ']'; }; + +sub new { + my($class, $parent) = @_; + my $self = bless({}, $class); + $self->{pid} = 0; # my process ID + $self->{parent} = $parent; # Controller parent process + $self->{num_requests} = 0; # total serviced requests + return $self; +} + +sub set_nonblock { + my($self, $fh) = @_; + my $flags = fcntl($fh, F_GETFL, 0); + fcntl($fh, F_SETFL, $flags | O_NONBLOCK); +} + +sub set_block { + my($self, $fh) = @_; + my $flags = fcntl($fh, F_GETFL, 0); + $flags &= ~O_NONBLOCK; + fcntl($fh, F_SETFL, $flags); +} + +# ---------------------------------------------------------------- +# Connects to Jabber and runs the application child_init +# ---------------------------------------------------------------- +sub init { + my $self = shift; + my $service = $self->{parent}->{service}; + $0 = "OpenSRF Drone [$service]"; + OpenSRF::Transport::PeerHandle->construct($service); + OpenSRF::Application->application_implementation->child_init + if (OpenSRF::Application->application_implementation->can('child_init')); +} + +# ---------------------------------------------------------------- +# Waits for messages from the parent process, handles the message, +# then goes into the keepalive loop if this is a stateful session. +# When max_requests is hit, the process exits. +# ---------------------------------------------------------------- +sub run { + my $self = shift; + my $network = OpenSRF::Transport::PeerHandle->retrieve; + + # main child run loop. Ends when this child hits max requests. + while(1) { + + my $data = $self->wait_for_request or next; + + # Update process name to show activity + my $orig_name = $0; + $0 = "$0*"; + + # Discard extraneous data from the jabber socket + if(!$network->flush_socket()) { + $logger->error("server: network disconnected! child dropping request and exiting: $data"); + exit; + } + + my $session = OpenSRF::Transport->handler( + $self->{parent}->{service}, + OpenSRF::Transport::SlimJabber::XMPPMessage->new(xml => $data) + ); + + $self->keepalive_loop($session); + + last if ++$self->{num_requests} == $self->{parent}->{max_requests}; + + # Tell the parent process we are available to process requests + $self->send_status; + + # Repair process name + $0 = $orig_name; + } + + $chatty and $logger->internal("server: child process shutting down after reaching max_requests"); + + OpenSRF::Application->application_implementation->child_exit + if (OpenSRF::Application->application_implementation->can('child_exit')); +} + +# ---------------------------------------------------------------- +# waits for a request data on the parent pipe and returns it. +# ---------------------------------------------------------------- +sub wait_for_request { + my $self = shift; + + my $data = ''; + my $read_size = 1024; + my $nonblock = 0; + + while(1) { + # Start out blocking, when data is available, read it all + + my $buf = ''; + my $n = sysread($self->{pipe_to_parent}, $buf, $read_size); + + unless(defined $n) { + $logger->error("server: error reading data pipe: $!") unless EAGAIN == $!; + last; + } + + last if $n <= 0; # no data left to read + + $data .= $buf; + + last if $n < $read_size; # done reading all data + + $self->set_nonblock($self->{pipe_to_parent}) unless $nonblock; + $nonblock = 1; + } + + $self->set_block($self->{pipe_to_parent}) if $nonblock; + return $data; +} + + +# ---------------------------------------------------------------- +# If this is a stateful opensrf session, wait up to $keepalive +# seconds for subsequent requests from the client +# ---------------------------------------------------------------- +sub keepalive_loop { + my($self, $session) = @_; + my $keepalive = $self->{parent}->{keepalive}; + + while($session->state and $session->state == $session->CONNECTED) { + + unless( $session->queue_wait($keepalive) ) { + + # client failed to disconnect before timeout + $logger->info("server: no request was received in $keepalive seconds, exiting stateful session"); + + my $res = OpenSRF::DomainObject::oilsConnectStatus->new( + status => "Disconnected on timeout", + statusCode => STATUS_TIMEOUT + ); + + $session->status($res); + $session->state($session->DISCONNECTED); + last; + } + } + + $chatty and $logger->internal("server: child done with request(s)"); + $session->kill_me; +} + +# ---------------------------------------------------------------- +# Report our availability to our parent process +# ---------------------------------------------------------------- +sub send_status { + my $self = shift; + syswrite( + $self->{pipe_to_parent}, + sprintf("%*s", OpenSRF::Server::STATUS_PIPE_DATA_SIZE, $self->{pid}) + ); +} + + +1; diff --git a/src/perl/lib/OpenSRF/System.pm b/src/perl/lib/OpenSRF/System.pm index 346cf23..6329a10 100644 --- a/src/perl/lib/OpenSRF/System.pm +++ b/src/perl/lib/OpenSRF/System.pm @@ -2,10 +2,9 @@ package OpenSRF::System; use strict; use warnings; use OpenSRF; use base 'OpenSRF'; -use OpenSRF::Utils::Logger qw(:level); +use OpenSRF::Utils::Logger qw($logger); use OpenSRF::Transport::Listener; use OpenSRF::Transport; -use OpenSRF::UnixServer; use OpenSRF::Utils; use OpenSRF::EX qw/:try/; use POSIX qw/setsid :sys_wait_h/; @@ -13,7 +12,7 @@ use OpenSRF::Utils::Config; use OpenSRF::Utils::SettingsParser; use OpenSRF::Utils::SettingsClient; use OpenSRF::Application; -use Net::Server::PreFork; +use OpenSRF::Server; my $bootstrap_config_file; sub import { @@ -67,4 +66,47 @@ sub connected { return 0; } +sub run_service { + my($class, $service) = @_; + + $0 = "OpenSRF Listener [$service]"; + + # temp connection to use for application initialization + OpenSRF::System->bootstrap_client(client_name => "system_client"); + + my $sclient = OpenSRF::Utils::SettingsClient->new; + my $getval = sub { $sclient->config_value(apps => $service => @_); }; + + my $impl = $getval->('implementation'); + + OpenSRF::Application::server_class($service); + OpenSRF::Application->application_implementation($impl); + OpenSRF::Utils::JSON->register_class_hint(name => $impl, hint => $service, type => 'hash'); + OpenSRF::Application->application_implementation->initialize() + if (OpenSRF::Application->application_implementation->can('initialize')); + + # kill the temp connection + OpenSRF::Transport::PeerHandle->retrieve->disconnect; + + my $server = OpenSRF::Server->new( + $service, + keepalive => $getval->('keepalive') || 5, + max_requests => $getval->(unix_config => 'max_requests') || 10000, + max_children => $getval->(unix_config => 'max_children') || 20, + min_children => $getval->(unix_config => 'min_children') || 1, + min_spare_children => $getval->(unix_config => 'min_spare_children'), + max_spare_children => $getval->(unix_config => 'max_spare_children') + ); + + while(1) { + eval { $server->run; }; + # we only arrive here if the server died a painful death + $logger->error("server: died with error $@"); + $server->cleanup(1); + $logger->info("server: restarting after fatal crash..."); + sleep 2; + } +} + + 1; diff --git a/src/perl/lib/OpenSRF/Transport/SlimJabber/Client.pm b/src/perl/lib/OpenSRF/Transport/SlimJabber/Client.pm index f30abf5..2db8be7 100644 --- a/src/perl/lib/OpenSRF/Transport/SlimJabber/Client.pm +++ b/src/perl/lib/OpenSRF/Transport/SlimJabber/Client.pm @@ -8,12 +8,7 @@ use OpenSRF::Utils::Config; use OpenSRF::Utils::Logger qw/$logger/; use OpenSRF::Transport::SlimJabber::XMPPReader; use OpenSRF::Transport::SlimJabber::XMPPMessage; -use IO::Socket::UNIX; -use FreezeThaw qw/freeze/; - -sub DESTROY{ - shift()->disconnect; -} +use IO::Socket::INET; =head1 NAME @@ -153,6 +148,7 @@ sub initialize { unless ( $self->reader->connected ); $self->xmpp_id("$username\@$host/$resource"); + $logger->debug("Created XMPP connection " . $self->xmpp_id); return $self; } diff --git a/src/perl/lib/OpenSRF/Transport/SlimJabber/Inbound.pm b/src/perl/lib/OpenSRF/Transport/SlimJabber/Inbound.pm deleted file mode 100644 index 898a528..0000000 --- a/src/perl/lib/OpenSRF/Transport/SlimJabber/Inbound.pm +++ /dev/null @@ -1,201 +0,0 @@ -package OpenSRF::Transport::SlimJabber::Inbound; -use strict;use warnings; -use base qw/OpenSRF::Transport::SlimJabber::Client/; -use OpenSRF::EX qw(:try); -use OpenSRF::Utils::Logger qw(:level); -use OpenSRF::Utils::SettingsClient; -use OpenSRF::Utils::Config; -use Time::HiRes qw/usleep/; -use FreezeThaw qw/freeze/; - -my $logger = "OpenSRF::Utils::Logger"; - -=head1 Description - -This is the jabber connection where all incoming client requests will be accepted. -This connection takes the data, passes it off to the system then returns to take -more data. Connection params are all taken from the config file and the values -retreived are based on the $app name passed into new(). - -This service should be loaded at system startup. - -=cut - -{ - my $unix_sock; - sub unix_sock { return $unix_sock; } - my $instance; - - sub new { - my( $class, $app ) = @_; - $class = ref( $class ) || $class; - if( ! $instance ) { - - my $conf = OpenSRF::Utils::Config->current; - my $domain = $conf->bootstrap->domain; - $logger->error("use of is deprecated") if $conf->bootstrap->domains; - - my $username = $conf->bootstrap->username; - my $password = $conf->bootstrap->passwd; - my $port = $conf->bootstrap->port; - my $host = $domain; - my $resource = $app . '_listener_at_' . $conf->env->hostname; - - my $no_router = 0; # make this a config entry if we want to use it - if($no_router) { - # no router, only one listener running.. - $username = "router"; - $resource = $app; - } - - OpenSRF::Utils::Logger->transport("Inbound as $username, $password, $resource, $host, $port\n", INTERNAL ); - - my $self = __PACKAGE__->SUPER::new( - username => $username, - resource => $resource, - password => $password, - host => $host, - port => $port, - ); - - $self->{app} = $app; - - my $client = OpenSRF::Utils::SettingsClient->new(); - my $f = $client->config_value("dirs", "sock"); - $unix_sock = join( "/", $f, - $client->config_value("apps", $app, "unix_config", "unix_sock" )); - bless( $self, $class ); - $instance = $self; - } - return $instance; - } - -} - -sub DESTROY { - my $self = shift; - for my $router (@{$self->{routers}}) { - if($self->tcp_connected()) { - $logger->info("disconnecting from router $router"); - $self->send( to => $router, body => "registering", - router_command => "unregister" , router_class => $self->{app} ); - } - } -} - -my $sig_pipe = 0; - -sub listen { - my $self = shift; - - $self->{routers} = []; - - try { - - my $conf = OpenSRF::Utils::Config->current; - my $router_name = $conf->bootstrap->router_name; - my $routers = $conf->bootstrap->routers; - $logger->info("loading router info $routers"); - - for my $router (@$routers) { - if(ref $router) { - if( !$router->{services} || - !$router->{services}->{service} || - ( - ref($router->{services}->{service}) eq 'ARRAY' and - grep { $_ eq $self->{app} } @{$router->{services}->{service}} ) || - $router->{services}->{service} eq $self->{app}) { - - my $name = $router->{name}; - my $domain = $router->{domain}; - my $target = "$name\@$domain/router"; - push(@{$self->{routers}}, $target); - $logger->info( $self->{app} . " connecting to router $target"); - $self->send( to => $target, body => "registering", router_command => "register" , router_class => $self->{app} ); - } - } else { - my $target = "$router_name\@$router/router"; - push(@{$self->{routers}}, $target); - $logger->info( $self->{app} . " connecting to router $target"); - $self->send( to => $target, body => "registering", router_command => "register" , router_class => $self->{app} ); - } - } - - } catch Error with { - my $err = shift; - $logger->error($self->{app} . ": No routers defined: $err"); - # no routers defined - }; - - my $app = $self->{app}; - - $logger->info("$app inbound: going into listen loop" ); - - while(1) { - - my $sock = $self->unix_sock(); - my $o; - - try { - $o = $self->process(-1); - - if(!$o){ - $logger->error("$app inbound: received no data from the Jabber socket in process()"); - usleep(100000); # otherwise we loop and pound syslog logger with errors - } - - } catch OpenSRF::EX::JabberDisconnected with { - - $logger->error("$app inbound: process lost its jabber connection. Attempting to reconnect..."); - $self->initialize; - $o = undef; - }; - - next unless $o; - - while(1) { - # keep trying to deliver the message until we succeed - - my $socket = IO::Socket::UNIX->new( Peer => $sock ); - - unless($socket and $socket->connected) { - $logger->error("$app inbound: unable to connect to inbound socket $sock: $!"); - usleep(50000); # 50 msec - next; - } - - # block until the pipe is ready for writing - my $outfile = ''; - vec($outfile, $socket->fileno, 1) = 1; - my $nfound = select(undef, $outfile, undef, undef); - - next unless $nfound; # should not happen since we're blocking - - if($nfound == -1) { # select failed - $logger->error("$app inbound: unable to write to socket: $!"); - usleep(50000); # 50 msec - next; - } - - $sig_pipe = 0; - local $SIG{'PIPE'} = sub { $sig_pipe = 1; }; - print $socket freeze($o); - - if($sig_pipe) { - # The attempt to write to the socket failed. Wait a short time then try again. - # Don't bother closing the socket, it will only cause grief - $logger->error("$app inbound: got SIGPIPE, will retry after a short wait..."); - usleep(50000); # 50 msec - next; - } - - $socket->close; - last; - } - } - - $logger->error("$app inbound: exited process loop"); -} - -1; - diff --git a/src/perl/lib/OpenSRF/Transport/SlimJabber/PeerConnection.pm b/src/perl/lib/OpenSRF/Transport/SlimJabber/PeerConnection.pm index 040ffb8..c41517c 100644 --- a/src/perl/lib/OpenSRF/Transport/SlimJabber/PeerConnection.pm +++ b/src/perl/lib/OpenSRF/Transport/SlimJabber/PeerConnection.pm @@ -60,8 +60,6 @@ sub new { OpenSRF::EX::Config->throw( "JPeer could not load all necessary values from config" ) unless ( $username and $password and $resource and $host and $port ); - OpenSRF::Utils::Logger->transport( "Built Peer with", INTERNAL ); - my $self = __PACKAGE__->SUPER::new( username => $username, resource => $resource, diff --git a/src/perl/lib/OpenSRF/UnixServer.pm b/src/perl/lib/OpenSRF/UnixServer.pm deleted file mode 100644 index c4b48c8..0000000 --- a/src/perl/lib/OpenSRF/UnixServer.pm +++ /dev/null @@ -1,266 +0,0 @@ -package OpenSRF::UnixServer; -use strict; use warnings; -use base qw/OpenSRF/; -use OpenSRF::EX qw(:try); -use OpenSRF::Utils::Logger qw(:level $logger); -use OpenSRF::Transport::PeerHandle; -use OpenSRF::Application; -use OpenSRF::AppSession; -use OpenSRF::DomainObject::oilsResponse qw/:status/; -use OpenSRF::System; -use OpenSRF::Utils::SettingsClient; -use Time::HiRes qw(time); -use OpenSRF::Utils::JSON; -use vars qw/@ISA $app/; -use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK); -use Carp; -use FreezeThaw qw/thaw/; - -use IO::Socket::INET; -use IO::Socket::UNIX; - -sub DESTROY { confess "Dying $$"; } - -=head1 What am I - -All inbound messages are passed on to the UnixServer for processing. -We take the data, close the Unix socket, and pass the data on to our abstract -'process()' method. - -Our purpose is to 'multiplex' a single TCP connection into multiple 'client' connections. -So when you pass data down the Unix socket to us, we have been preforked and waiting -to disperse new data among us. - -=cut - -sub app { return $app; } - -{ - - sub new { - my( $class, $app1 ) = @_; - if( ! $app1 ) { - throw OpenSRF::EX::InvalidArg( "UnixServer requires an app name to run" ); - } - $app = $app1; - my $self = bless( {}, $class ); -# my $client = OpenSRF::Utils::SettingsClient->new(); -# if( $client->config_value("server_type") !~ /fork/i || -# OpenSRF::Utils::Config->current->bootstrap->settings_config ) { -# warn "Calling hooks for non-prefork\n"; -# $self->configure_hook(); -# $self->child_init_hook(); -# } - return $self; - } - -} - -=head2 process_request() - -Takes the incoming data, closes the Unix socket and hands the data untouched -to the abstract process() method. This method is implemented in our subclasses. - -=cut - -sub process_request { - - my $self = shift; - my $data; my $d; - while( $d = ) { $data .= $d; } - - my $orig = $0; - $0 = "$0*"; - - if( ! $data or ! defined( $data ) or $data eq "" ) { - close($self->{server}->{client}); - $logger->debug("Unix child received empty data from socket", ERROR); - $0 = $orig; - return; - } - - - if( ! close( $self->{server}->{client} ) ) { - $logger->debug( "Error closing Unix socket: $!", ERROR ); - } - - my $app = $self->app(); - $logger->transport( "UnixServer for $app received $data", INTERNAL ); - - # -------------------------------------------------------------- - # Drop all data from the socket before coninuting to process - # -------------------------------------------------------------- - my $ph = OpenSRF::Transport::PeerHandle->retrieve; - if(!$ph->flush_socket()) { - $logger->error("We received a request ". - "and we are no longer connected to the jabber network. ". - "We will go away and drop this request: $data"); - exit; - } - - ($data) = thaw($data); - my $app_session = OpenSRF::Transport->handler( $self->app(), $data ); - - if(!ref($app_session)) { - $logger->transport( "Did not receive AppSession from transport handler, returning...", WARN ); - $0 = $orig; - return; - } - - if($app_session->stateless and $app_session->state != $app_session->CONNECTED()){ - $logger->debug("Exiting keepalive for stateless session / orig = $orig"); - $app_session->kill_me; - $0 = $orig; - return; - } - - - my $client = OpenSRF::Utils::SettingsClient->new(); - my $keepalive = $client->config_value("apps", $self->app(), "keepalive"); - - my $req_counter = 0; - while( $app_session and - $app_session->state and - $app_session->state != $app_session->DISCONNECTED() and - $app_session->find( $app_session->session_id ) ) { - - - my $before = time; - $logger->debug( "UnixServer calling queue_wait $keepalive", INTERNAL ); - $app_session->queue_wait( $keepalive ); - $logger->debug( "after queue wait $keepalive", INTERNAL ); - my $after = time; - - if( ($after - $before) >= $keepalive ) { - - my $res = OpenSRF::DomainObject::oilsConnectStatus->new( - status => "Disconnected on timeout", - statusCode => STATUS_TIMEOUT); - $app_session->status($res); - $app_session->state( $app_session->DISCONNECTED() ); - last; - } - - } - - my $x = 0; - while( $app_session && $app_session->queue_wait(0) ) { - $logger->debug( "Looping on zombies " . $x++ , DEBUG); - } - - $logger->debug( "Timed out, disconnected, or authentication failed" ); - $app_session->kill_me if ($app_session); - - $0 = $orig; -} - - -sub serve { - my( $self ) = @_; - - my $app = $self->app(); - $logger->set_service($app); - - $0 = "OpenSRF master [$app]"; - - my $client = OpenSRF::Utils::SettingsClient->new(); - my @base = ('apps', $app, 'unix_config' ); - - my $min_servers = $client->config_value(@base, 'min_children'); - my $max_servers = $client->config_value(@base, "max_children" ); - my $min_spare = $client->config_value(@base, "min_spare_children" ); - my $max_spare = $client->config_value(@base, "max_spare_children" ); - my $max_requests = $client->config_value(@base, "max_requests" ); - # fwiw, these file paths are (obviously) not portable - my $log_file = join("/", $client->config_value("dirs", "log"), $client->config_value(@base, "unix_log" )); - my $port = join("/", $client->config_value("dirs", "sock"), $client->config_value(@base, "unix_sock" )); - my $pid_file = join("/", $client->config_value("dirs", "pid"), $client->config_value(@base, "unix_pid" )); - - $min_spare ||= $min_servers; - $max_spare ||= $max_servers; - $max_requests ||= 1000; - - $logger->info("UnixServer: min=$min_servers, max=$max_servers, min_spare=$min_spare ". - "max_spare=$max_spare, max_req=$max_requests, log_file=$log_file, port=$port, pid_file=$pid_file"); - - $self->run( - min_servers => $min_servers, - max_servers => $max_servers, - min_spare_servers => $min_spare, - max_spare_servers => $max_spare, - max_requests => $max_requests, - log_file => $log_file, - port => $port, - proto => 'unix', - pid_file => $pid_file, - ); - -} - - -sub configure_hook { - my $self = shift; - my $app = $self->app; - - # boot a client - OpenSRF::System->bootstrap_client( client_name => "system_client" ); - - $logger->debug( "Setting application implementation for $app", DEBUG ); - my $client = OpenSRF::Utils::SettingsClient->new(); - my $imp = $client->config_value("apps", $app, "implementation"); - OpenSRF::Application::server_class($app); - OpenSRF::Application->application_implementation( $imp ); - OpenSRF::Utils::JSON->register_class_hint( name => $imp, hint => $app, type => "hash" ); - OpenSRF::Application->application_implementation->initialize() - if (OpenSRF::Application->application_implementation->can('initialize')); - - if( $client->config_value("server_type") !~ /fork/i ) { - $self->child_init_hook(); - } - - my $con = OpenSRF::Transport::PeerHandle->retrieve; - if($con) { - $con->disconnect; - } - - return OpenSRF::Application->application_implementation; -} - -sub child_init_hook { - - $0 =~ s/master/drone/g; - - if ($ENV{OPENSRF_PROFILE}) { - my $file = $0; - $file =~ s/\W/_/go; - eval "use Devel::Profiler output_file => '/tmp/profiler_$file.out', buffer_size => 0;"; - if ($@) { - $logger->debug("Could not load Devel::Profiler: $@",ERROR); - } else { - $0 .= ' [PROFILING]'; - $logger->debug("Running under Devel::Profiler", INFO); - } - } - - my $self = shift; - -# $logger->transport( -# "Creating PeerHandle from UnixServer child_init_hook", INTERNAL ); - OpenSRF::Transport::PeerHandle->construct( $self->app() ); - $logger->transport( "PeerHandle Created from UnixServer child_init_hook", INTERNAL ); - - OpenSRF::Application->application_implementation->child_init - if (OpenSRF::Application->application_implementation->can('child_init')); - - return OpenSRF::Transport::PeerHandle->retrieve; -} - -sub child_finish_hook { - $logger->debug("attempting to call child exit handler..."); - OpenSRF::Application->application_implementation->child_exit - if (OpenSRF::Application->application_implementation->can('child_exit')); -} - - -1; -