Perl pipe reading overhaul : data size header
authorBill Erickson <berick@esilibrary.com>
Mon, 13 Feb 2012 21:53:59 +0000 (16:53 -0500)
committerMike Rylander <mrylander@gmail.com>
Mon, 20 Feb 2012 19:37:20 +0000 (14:37 -0500)
The lockfile mechanism for preventing premature end of reads on child
processes suffers from one serious flaw:  if the data to write exceeds
the pipe buffer size, the parent will block on syswrite and the service
will lock up.  It's also not as effecient (for the normal case) as the
code was without the lockfile, becasue the writes and reads are
serialized.

This commit replaces the lockfile mechanism with a protocol header in
the data.  The first X (currently 12) bytes of data written to the child
process will contain the full length of the data to be written (minus
the header size).  The child now reads the data in parallel with the parent as
data is available.  If the child reads all available data (in the pipe)
but not all of the expected data, the child will go back into a select()
wait pending more data from the parent.  The process continues until all
data is read.

This same mechanism is already used to commicate status info from child
processes to the parent.

Signed-off-by: Bill Erickson <berick@esilibrary.com>
Signed-off-by: Jason Stephenson <jstephenson@mvlc.org>
Signed-off-by: Mike Rylander <mrylander@gmail.com>
src/perl/lib/OpenSRF/Server.pm
src/perl/lib/OpenSRF/System.pm

index 90a186d..32954f2 100644 (file)
@@ -24,13 +24,14 @@ use OpenSRF::Utils::Logger qw($logger);
 use OpenSRF::Transport::SlimJabber::Client;
 use Encode;
 use POSIX qw/:sys_wait_h :errno_h/;
-use Fcntl qw(:flock F_GETFL F_SETFL O_NONBLOCK);
+use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
 use Time::HiRes qw/usleep/;
 use IO::Select;
 use Socket;
 our $chatty = 1; # disable for production
 
 use constant STATUS_PIPE_DATA_SIZE => 12;
+use constant WRITE_PIPE_DATA_SIZE  => 12;
 
 sub new {
     my($class, $service, %args) = @_;
@@ -48,9 +49,6 @@ sub new {
     $self->{stderr_log} = $self->{stderr_log_path} . "/${service}_stderr.log" 
         if $self->{stderr_log_path};
 
-    $self->{lock_file} = 
-        sprintf("%s/%s_$$.lock", $self->{lock_file_path}, $self->{service});
-
     $self->{min_spare_children} ||= 0;
 
     $self->{max_spare_children} = $self->{min_spare_children} + 1 if
@@ -85,19 +83,9 @@ sub cleanup {
     # clean up our dead children
     $self->reap_children(1);
 
-    # clean up the lock file
-    close($self->{lock_file_handle});
-    unlink($self->{lock_file});
-
     exit(0) unless $no_exit;
 }
 
-sub open_lock_file {
-    my $self = shift;
-    open($self->{lock_file_handle}, '>>', $self->{lock_file})
-        or die "server: cannot open lock file ".$self->{lock_file} ." : $!\n";
-}
-
 # ----------------------------------------------------------------
 # Waits on the jabber socket for inbound data from the router.
 # Each new message is passed off to a child process for handling.
@@ -114,7 +102,6 @@ sub run {
     $self->spawn_children;
     $self->build_osrf_handle;
     $self->register_routers;
-    $self->open_lock_file;
     my $wait_time = 1;
 
     # main server loop
@@ -253,8 +240,10 @@ sub write_child {
     my($self, $child, $msg) = @_;
     my $xml = encode_utf8(decode_utf8($msg->to_xml));
 
-    flock($self->{lock_file_handle}, LOCK_EX) or 
-        $logger->error("server: cannot flock : $!");
+    # tell the child how much data to expect, minus the header
+    my $write_size;
+    {use bytes; $write_size = length($xml)}
+    $write_size = sprintf("%*s", WRITE_PIPE_DATA_SIZE, $write_size);
 
     for (0..2) {
 
@@ -262,16 +251,13 @@ sub write_child {
         local $SIG{'PIPE'} = sub { $self->{sig_pipe} = 1; };
 
         # send message to child data pipe
-        syswrite($child->{pipe_to_child}, $xml);
+        syswrite($child->{pipe_to_child}, $write_size . $xml);
 
         last unless $self->{sig_pipe};
         $logger->error("server: got SIGPIPE writing to $child, retrying...");
         usleep(50000); # 50 msec
     }
 
-    flock($self->{lock_file_handle}, LOCK_UN) or 
-        $logger->error("server: cannot de-flock : $!");
-
     $logger->error("server: unable to send request message to child $child") if $self->{sig_pipe};
 }
 
@@ -508,7 +494,7 @@ use OpenSRF::Transport::PeerHandle;
 use OpenSRF::Transport::SlimJabber::XMPPMessage;
 use OpenSRF::Utils::Logger qw($logger);
 use OpenSRF::DomainObject::oilsResponse qw/:status/;
-use Fcntl qw(:flock F_GETFL F_SETFL O_NONBLOCK);
+use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
 use Time::HiRes qw(time usleep);
 use POSIX qw/:sys_wait_h :errno_h/;
 
@@ -537,26 +523,11 @@ sub set_block {
     fcntl($fh, F_SETFL, $flags);
 }
 
-sub open_lock_file {
-    my $self = shift;
-
-    # close our copy of the parent's lock file since we won't be using it
-    close($self->{parent}->{lock_file_handle}) 
-        if $self->{parent}->{lock_file_handle};
-
-    my $fname = $self->{parent}->{lock_file};
-    unless (open($self->{lock_file_handle}, '>>', $fname)) {
-        $logger->error("server: child cannot open lock file $fname : $!");
-        die "server: child cannot open lock file $fname : $!\n";
-    }
-}
-
 # ----------------------------------------------------------------
 # Connects to Jabber and runs the application child_init
 # ----------------------------------------------------------------
 sub init {
     my $self = shift;
-    $self->open_lock_file;
     my $service = $self->{parent}->{service};
     $0 = "OpenSRF Drone [$service]";
     OpenSRF::Transport::PeerHandle->construct($service);
@@ -617,59 +588,64 @@ sub wait_for_request {
     my $self = shift;
 
     my $data = '';
-    my $read_size = 1024;
+    my $buf_size = 4096;
     my $nonblock = 0;
     my $read_pipe = $self->{pipe_to_parent};
-
-    # wait for some data to start arriving
-    my $read_set = IO::Select->new;
-    $read_set->add($read_pipe);
+    my $data_size;
+    my $total_read;
+    my $first_read = 1;
 
     while (1) {
-        # if can_read is interrupted while blocking, 
-        # go back and wait again until it it succeeds.
-        last if $read_set->can_read;
-    }
 
-    # Parent has started sending data to us.
-    # Wait for the parent to write all data to the pipe.  Then, immediately release 
-    # the lock so the parent can start writing data to other child processes.  
-    # Note: there is no danger of a subsequent message coming from the parent on 
-    # the same pipe, since this child is now marked as active.
-    flock($self->{lock_file_handle}, LOCK_EX) or $logger->error("server: cannot flock : $!");
-    flock($self->{lock_file_handle}, LOCK_UN) or $logger->error("server: cannot de-flock : $!");
-
-    # we have all data now so all reads can be done in non-blocking mode
-    $self->set_nonblock($read_pipe);
+        # wait for some data to start arriving
+        my $read_set = IO::Select->new;
+        $read_set->add($read_pipe);
+    
+        while (1) {
+            # if can_read is interrupted while blocking, 
+            # go back and wait again until it succeeds.
+            last if $read_set->can_read;
+        }
 
-    while(1) {
-        my $sig_pipe = 0;
-        local $SIG{'PIPE'} = sub { $sig_pipe = 1 };
+        # parent started writing, let's start reading
+        $self->set_nonblock($read_pipe);
 
-        my $buf = '';
-        my $n = sysread($self->{pipe_to_parent}, $buf, $read_size);
+        while (1) {
+            # pull as much data from the pipe as possible
 
-        unless(defined $n) {
+            my $buf = '';
+            my $bytes_read = sysread($self->{pipe_to_parent}, $buf, $buf_size);
 
-            if ($sig_pipe) {
-                $logger->info("server: $self got SIGPIPE reading data from parent, retrying...");
-                usleep(50000); # 50 msec
-                next;
+            unless(defined $bytes_read) {
+                $logger->error("server: error reading data pipe: $!") unless EAGAIN == $!;
+                last;
             }
 
-            $logger->error("server: error reading data pipe: $!") unless EAGAIN == $!; 
-            last;
+            last if $bytes_read <= 0; # no more data available for reading
+
+            $total_read += $bytes_read;
+            $data .= $buf;
         }
 
-        last if $n <= 0; # no data left to read
+        # we've read all the data currently available on the pipe.
+        # let's see if we're done.
+
+        if ($first_read) {
+            # extract the data size and remove the size header
+            my $wps_size = OpenSRF::Server::WRITE_PIPE_DATA_SIZE;
+            $data_size = int(substr($data, 0, $wps_size)) + $wps_size;
+            $data = substr($data, $wps_size);
+            $first_read = 0;
+        }
 
-        $data .= $buf;
+        $self->set_block($self->{pipe_to_parent});
 
-        last if $n < $read_size; # done reading all data
+        if ($total_read == $data_size) {
+            # we've read all the data. Nothing left to do
+            last;
+        }
     }
 
-    # return to blocking mode
-    $self->set_block($self->{pipe_to_parent});
     return $data;
 }
 
index a720d26..7fd7195 100644 (file)
@@ -99,8 +99,7 @@ sub run_service {
         min_children =>  $getval->(unix_config => 'min_children') || 1,
         min_spare_children =>  $getval->(unix_config => 'min_spare_children'),
         max_spare_children =>  $getval->(unix_config => 'max_spare_children'),
-        stderr_log_path => $stderr_path,
-        lock_file_path => $pid_dir
+        stderr_log_path => $stderr_path
     );
 
     while(1) {