use OpenSRF::Transport::SlimJabber::Client;
use Encode;
use POSIX qw/:sys_wait_h :errno_h/;
-use Fcntl qw(:flock F_GETFL F_SETFL O_NONBLOCK);
+use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
use Time::HiRes qw/usleep/;
use IO::Select;
use Socket;
our $chatty = 1; # disable for production
use constant STATUS_PIPE_DATA_SIZE => 12;
+use constant WRITE_PIPE_DATA_SIZE => 12;
sub new {
my($class, $service, %args) = @_;
$self->{stderr_log} = $self->{stderr_log_path} . "/${service}_stderr.log"
if $self->{stderr_log_path};
- $self->{lock_file} =
- sprintf("%s/%s_$$.lock", $self->{lock_file_path}, $self->{service});
-
$self->{min_spare_children} ||= 0;
$self->{max_spare_children} = $self->{min_spare_children} + 1 if
# clean up our dead children
$self->reap_children(1);
- # clean up the lock file
- close($self->{lock_file_handle});
- unlink($self->{lock_file});
-
exit(0) unless $no_exit;
}
-sub open_lock_file {
- my $self = shift;
- open($self->{lock_file_handle}, '>>', $self->{lock_file})
- or die "server: cannot open lock file ".$self->{lock_file} ." : $!\n";
-}
-
# ----------------------------------------------------------------
# Waits on the jabber socket for inbound data from the router.
# Each new message is passed off to a child process for handling.
$self->spawn_children;
$self->build_osrf_handle;
$self->register_routers;
- $self->open_lock_file;
my $wait_time = 1;
# main server loop
my($self, $child, $msg) = @_;
my $xml = encode_utf8(decode_utf8($msg->to_xml));
- flock($self->{lock_file_handle}, LOCK_EX) or
- $logger->error("server: cannot flock : $!");
+ # tell the child how much data to expect, minus the header
+ my $write_size;
+ {use bytes; $write_size = length($xml)}
+ $write_size = sprintf("%*s", WRITE_PIPE_DATA_SIZE, $write_size);
for (0..2) {
local $SIG{'PIPE'} = sub { $self->{sig_pipe} = 1; };
# send message to child data pipe
- syswrite($child->{pipe_to_child}, $xml);
+ syswrite($child->{pipe_to_child}, $write_size . $xml);
last unless $self->{sig_pipe};
$logger->error("server: got SIGPIPE writing to $child, retrying...");
usleep(50000); # 50 msec
}
- flock($self->{lock_file_handle}, LOCK_UN) or
- $logger->error("server: cannot de-flock : $!");
-
$logger->error("server: unable to send request message to child $child") if $self->{sig_pipe};
}
use OpenSRF::Transport::SlimJabber::XMPPMessage;
use OpenSRF::Utils::Logger qw($logger);
use OpenSRF::DomainObject::oilsResponse qw/:status/;
-use Fcntl qw(:flock F_GETFL F_SETFL O_NONBLOCK);
+use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
use Time::HiRes qw(time usleep);
use POSIX qw/:sys_wait_h :errno_h/;
fcntl($fh, F_SETFL, $flags);
}
-sub open_lock_file {
- my $self = shift;
-
- # close our copy of the parent's lock file since we won't be using it
- close($self->{parent}->{lock_file_handle})
- if $self->{parent}->{lock_file_handle};
-
- my $fname = $self->{parent}->{lock_file};
- unless (open($self->{lock_file_handle}, '>>', $fname)) {
- $logger->error("server: child cannot open lock file $fname : $!");
- die "server: child cannot open lock file $fname : $!\n";
- }
-}
-
# ----------------------------------------------------------------
# Connects to Jabber and runs the application child_init
# ----------------------------------------------------------------
sub init {
my $self = shift;
- $self->open_lock_file;
my $service = $self->{parent}->{service};
$0 = "OpenSRF Drone [$service]";
OpenSRF::Transport::PeerHandle->construct($service);
my $self = shift;
my $data = '';
- my $read_size = 1024;
+ my $buf_size = 4096;
my $nonblock = 0;
my $read_pipe = $self->{pipe_to_parent};
-
- # wait for some data to start arriving
- my $read_set = IO::Select->new;
- $read_set->add($read_pipe);
+ my $data_size;
+ my $total_read;
+ my $first_read = 1;
while (1) {
- # if can_read is interrupted while blocking,
- # go back and wait again until it it succeeds.
- last if $read_set->can_read;
- }
- # Parent has started sending data to us.
- # Wait for the parent to write all data to the pipe. Then, immediately release
- # the lock so the parent can start writing data to other child processes.
- # Note: there is no danger of a subsequent message coming from the parent on
- # the same pipe, since this child is now marked as active.
- flock($self->{lock_file_handle}, LOCK_EX) or $logger->error("server: cannot flock : $!");
- flock($self->{lock_file_handle}, LOCK_UN) or $logger->error("server: cannot de-flock : $!");
-
- # we have all data now so all reads can be done in non-blocking mode
- $self->set_nonblock($read_pipe);
+ # wait for some data to start arriving
+ my $read_set = IO::Select->new;
+ $read_set->add($read_pipe);
+
+ while (1) {
+ # if can_read is interrupted while blocking,
+ # go back and wait again until it succeeds.
+ last if $read_set->can_read;
+ }
- while(1) {
- my $sig_pipe = 0;
- local $SIG{'PIPE'} = sub { $sig_pipe = 1 };
+ # parent started writing, let's start reading
+ $self->set_nonblock($read_pipe);
- my $buf = '';
- my $n = sysread($self->{pipe_to_parent}, $buf, $read_size);
+ while (1) {
+ # pull as much data from the pipe as possible
- unless(defined $n) {
+ my $buf = '';
+ my $bytes_read = sysread($self->{pipe_to_parent}, $buf, $buf_size);
- if ($sig_pipe) {
- $logger->info("server: $self got SIGPIPE reading data from parent, retrying...");
- usleep(50000); # 50 msec
- next;
+ unless(defined $bytes_read) {
+ $logger->error("server: error reading data pipe: $!") unless EAGAIN == $!;
+ last;
}
- $logger->error("server: error reading data pipe: $!") unless EAGAIN == $!;
- last;
+ last if $bytes_read <= 0; # no more data available for reading
+
+ $total_read += $bytes_read;
+ $data .= $buf;
}
- last if $n <= 0; # no data left to read
+ # we've read all the data currently available on the pipe.
+ # let's see if we're done.
+
+ if ($first_read) {
+ # extract the data size and remove the size header
+ my $wps_size = OpenSRF::Server::WRITE_PIPE_DATA_SIZE;
+ $data_size = int(substr($data, 0, $wps_size)) + $wps_size;
+ $data = substr($data, $wps_size);
+ $first_read = 0;
+ }
- $data .= $buf;
+ $self->set_block($self->{pipe_to_parent});
- last if $n < $read_size; # done reading all data
+ if ($total_read == $data_size) {
+ # we've read all the data. Nothing left to do
+ last;
+ }
}
- # return to blocking mode
- $self->set_block($self->{pipe_to_parent});
return $data;
}