Batch fine generator caching/parallel optimizations user/berick/fine-gen-cache-parallel
authorBill Erickson <berickxx@gmail.com>
Wed, 5 Jul 2017 16:21:51 +0000 (12:21 -0400)
committerBill Erickson <berickxx@gmail.com>
Fri, 21 Jul 2017 15:05:03 +0000 (11:05 -0400)
* Fine generator caches org unit setting values per instance.  Once
  cached, the number of cstore calls per transaction is reduced by
  3 to 5 calls, depending on settings, when running in batch mode.

* Fine generator disconnects from cstore after processing each
  transaction giving cstores a chance to recycle and avoid memory
  gobbling on huge batches.

* Fine generator now collects parallel batches of transactions directly
  within the server-side generator API instead of requiring the caller
  to collect transactions up front for individual processing.  This lets
  us take advantage of the new org setting caching.

* Fine generator script improvements:
** Arguments are passed via GetOpt, with support for legacy-style
   opensrf config and lockfile name passing (with warning).
** supports a --parallel option to override the value from opensrf.xml.
** Sets OSRF_LOG_CLIENT for log tracing.

Signed-off-by: Bill Erickson <berickxx@gmail.com>
Open-ILS/src/perlmods/lib/OpenILS/Application/Circ/CircCommon.pm
Open-ILS/src/perlmods/lib/OpenILS/Application/Storage/Publisher/action.pm
Open-ILS/src/support-scripts/fine_generator.pl

index 1675712..ed58009 100644 (file)
@@ -219,13 +219,17 @@ sub create_bill {
 }
 
 sub extend_grace_period {
-    my($class, $circ_lib, $due_date, $grace_period, $e, $h) = @_;
+    my($class, $circ_lib, $due_date, $grace_period, $e, $h, $ous_cache) = @_;
+    $ous_cache ||= {};
+
     if ($grace_period >= 86400) { # Only extend grace periods greater than or equal to a full day
         my $parser = DateTime::Format::ISO8601->new;
         my $due_dt = $parser->parse_datetime( cleanse_ISO8601( $due_date ) );
         my $due = $due_dt->epoch;
 
-        my $grace_extend = $U->ou_ancestor_setting_value($circ_lib, 'circ.grace.extend');
+        my $grace_extend = $class->get_cached_ou_setting(
+            $e, $ous_cache, $circ_lib, 'circ.grace.extend');
+
         $e = new_editor() if (!$e);
         $h = $e->retrieve_actor_org_unit_hours_of_operation($circ_lib) if (!$h);
         if ($grace_extend and $h) { 
@@ -251,10 +255,14 @@ sub extend_grace_period {
             } else {
                 # Extra nice grace periods
                 # AKA, merge closed dates trailing the grace period into the grace period
-                my $grace_extend_into_closed = $U->ou_ancestor_setting_value($circ_lib, 'circ.grace.extend.into_closed');
+
+                my $grace_extend_into_closed = $class->get_cached_ou_setting(
+                    $e, $ous_cache, $circ_lib, 'circ.grace.extend.into_closed');
+
                 $due += 86400 if $grace_extend_into_closed;
 
-                my $grace_extend_all = $U->ou_ancestor_setting_value($circ_lib, 'circ.grace.extend.all');
+                my $grace_extend_all = $class->get_cached_ou_setting(
+                    $e, $ous_cache, $circ_lib, 'circ.grace.extend.all');
 
                 if ( $grace_extend_all ) {
                     # Start checking the day after the item was due
@@ -425,6 +433,17 @@ sub seconds_to_interval_hash {
         return %output;
 }
 
+# return org setting value, using the value found in $cache if available.
+sub get_cached_ou_setting {
+    my ($class, $e, $cache, $org_id, $setting) = @_;
+    $cache ||= {};
+    $cache->{$org_id} = {} unless $cache->{$org_id};
+    $cache->{$org_id}->{$setting} = 
+        $U->ou_ancestor_setting_value($org_id, $setting)
+        unless exists $cache->{$org_id}->{$setting};
+    return $cache->{$org_id}->{$setting};
+}
+
 sub generate_fines {
     my ($class, $args) = @_;
     my $circs = $args->{circs};
@@ -444,6 +463,9 @@ sub generate_fines {
 
     my %hoo = map { ( $_->id => $_ ) } @{ $e->retrieve_all_actor_org_unit_hours_of_operation };
 
+    # cache org unit setting values per fine generator instance.
+    my $ous_cache = {};
+
     my $handling_resvs = 0;
     for my $c (@$circs) {
 
@@ -483,9 +505,13 @@ sub generate_fines {
 
         eval {
 
-            # Clean up after previous transaction.  
-            # This is a no-op if there is no open transaction.
-            $e->xact_rollback if $commit;
+            # Clean up after previous transaction and disconnect from
+            # cstore drone.  Forcing a disconnect helps prevent memory
+            # exhaustion when processing huge batches of transactions.
+            # Note the disconnect is forced here instead of during
+            # $e->xact_commit to ensure a disconnect happens with every
+            # transaction regardless of whether a billing was created.
+            $e->rollback if $commit;
 
             $logger->info(sprintf("Processing $ctype %d...", $c->id));
 
@@ -546,7 +572,10 @@ sub generate_fines {
                 $logger->info( "Potential first billing for circ ".$c->id );
                 $last_fine = $due;
 
-                $grace_period = extend_grace_period($class, $c->$circ_lib_method,$c->$due_date_method,$grace_period,undef,$hoo{$c->$circ_lib_method});
+                $grace_period = extend_grace_period(
+                    $class, $c->$circ_lib_method, $c->$due_date_method, 
+                    $grace_period, $e, $hoo{$c->$circ_lib_method}, $ous_cache
+                );
             }
 
             return if ($last_fine > $now);
@@ -568,13 +597,13 @@ sub generate_fines {
             my $recurring_fine = int($c->$recurring_fine_method * 100);
             my $max_fine = int($c->max_fine * 100);
 
-            my $skip_closed_check = $U->ou_ancestor_setting_value(
-                $c->$circ_lib_method, 'circ.fines.charge_when_closed');
-            $skip_closed_check = $U->is_true($skip_closed_check);
+            my $skip_closed_check = $U->is_true(
+                $class->get_cached_ou_setting($e, $ous_cache, 
+                    $c->$circ_lib_method, 'circ.fines.charge_when_closed'));
 
-            my $truncate_to_max_fine = $U->ou_ancestor_setting_value(
-                $c->$circ_lib_method, 'circ.fines.truncate_to_max_fine');
-            $truncate_to_max_fine = $U->is_true($truncate_to_max_fine);
+            my $truncate_to_max_fine = $U->is_true(
+                $class->get_cached_ou_setting($e, $ous_cache, 
+                    $c->$circ_lib_method, 'circ.fines.truncate_to_max_fine'));
 
             my ($latest_billing_ts, $latest_amount) = ('',0);
             for (my $bill = 1; $bill <= $pending_fine_count; $bill++) {
index c6118f2..a3d52d4 100644 (file)
@@ -153,11 +153,17 @@ __PACKAGE__->register_method(
 # of transaction the ID refers to without having to query the DB.
 # skip_no_fines - filter out transactions which will never be billed, 
 # e.g. circs with a $0 max fine or $0 recurring fine.
+#
+# parallel_count -- number of parallel processes handling these circs
+# parallel_slot  -- which batch of parallel circs to process.  
+#     value is 1..$parallel_count
 sub overdue_circs {
     my $upper_interval = shift || '1 millennium';
     my $idlist = shift;
     my $partition = shift;
     my $skip_no_fines = shift;
+    my $parallel_count = shift || 1;
+    my $parallel_slot = shift || 1;
 
     # Only retrieve ID's in the initial query if that's all the caller needs.
     my $contents = $idlist ? 'id' : '*';
@@ -165,6 +171,12 @@ sub overdue_circs {
     my $fines_filter = $skip_no_fines ? 
         'AND recurring_fine <> 0 AND max_fine <> 0' : '';
 
+    my $id_filter = '';
+    if ($parallel_count > 1) {
+        $parallel_slot--; # translate to zero-based slot numbers
+        $id_filter = "AND MOD(id, $parallel_count) = $parallel_slot";
+    }
+
     my $c_t = action::circulation->table;
 
     my $sql = <<"    SQL";
@@ -172,6 +184,7 @@ sub overdue_circs {
           FROM  $c_t
           WHERE stop_fines IS NULL
             $fines_filter
+            $id_filter
             AND due_date < ( CURRENT_TIMESTAMP - grace_period )
             AND fine_interval < ?::INTERVAL
     SQL
@@ -202,6 +215,9 @@ sub overdue_circs {
 
     push @circs, map { $idlist ? $_->{id} : booking::reservation->construct($_) } $sth->fetchall_hash;
 
+    $logger->info("parallel=$parallel_count slot=$parallel_slot ".
+        "processing ".scalar(@circs)." transactions");
+
     return @circs;
 }
 
@@ -998,6 +1014,8 @@ sub generate_fines {
     my $self = shift;
     my $client = shift;
     my $circ_id = shift;
+    my $parallel_count = shift || 1;
+    my $parallel_slot = shift;
 
     my $circs;
     my $editor = new_editor;
@@ -1014,7 +1032,8 @@ sub generate_fines {
             $circs = $editor->search_booking_reservation->search_where( { id => $circ_id, return_time => undef, cancel_time => undef } );
         }
     } else {
-        $circs = [overdue_circs(undef, 1, 1, 1)];
+        $circs = 
+            [overdue_circs(undef, 1, 1, 1, $parallel_count, $parallel_slot)];
     }
 
     return OpenILS::Application::Circ::CircCommon->generate_fines({circs => $circs, conn => $client})
index 5d0ac8e..b6e6988 100755 (executable)
 
 use strict; 
 use warnings;
-use OpenSRF::Utils::JSON;
+use Getopt::Long;
 use OpenSRF::System;
+use OpenSRF::AppSession;
 use OpenSRF::Utils::SettingsClient;
-use OpenSRF::MultiSession;
+$ENV{OSRF_LOG_CLIENT} = 1;
 
-my $config = shift || die "bootstrap config required\n";
-my $lockfile = shift || "/tmp/generate_fines-LOCK";
-my $grace = shift;
+my $help;
+my $osrf_config = '/openils/conf/opensrf_core.xml';
+my $lockfile = '/tmp/generate_fines-LOCK';
+my $parallel = 0; # default to opensrf.xml value
 
-if (defined($grace)) {
-    die "Grace period is now defined in the database. It should not be passed to the fine generator.";
+GetOptions(
+    'help'                  => \$help,
+    'osrf-config=s'         => \$osrf_config,
+    'lockfile=s'            => \$lockfile,
+    'parallel=i'            => \$parallel,
+) || die "\nSee --help for more\n";
+
+
+# Support legacy (pre-get-opt) argument passing with warnings.
+my $legacy_config = shift;
+if ($legacy_config) {
+    warn "Loading opensrf config: $legacy_config.\n".
+        "Use --osrf-config instead to silence this warning\n";
+    $osrf_config = $legacy_config;
+}
+
+my $legacy_lockfile = shift;
+if ($legacy_lockfile) {
+    warn "Loading lockfile: $legacy_lockfile.\n".
+        "Use --lockfile instead to silence this warning\n";
+    $lockfile = $legacy_lockfile;
+}
+
+sub help {
+    print <<HELP;
+
+Batch fine generator
+
+$0 --osrf-config $osrf_config --lockfile $lockfile --parallel 3 
+
+Options
+
+    --osrf-config [/openils/conf/opensrf_core.xml] 
+        OpenSRF config file.
+
+    --lockfile [/tmp/generate_fines-LOCK]
+        Full path to lock file
+
+    --parallel
+        Number of parallel fine generator processes to run.
+        When set, this overrides the value from opensrf.xml
+
+    --verbose
+        Print process counts
+HELP
+
+    exit(0);
 }
+
+help() if $help;
+
+
+# If a lockfile exists and the PID in the file is in fact running, exit.
+# If the lockfile references a dead PID, just replace it.
 if (-e $lockfile) {
-        open(F,$lockfile);
-        my $pid = <F>;
-        close F;
-
-        open(F,'/bin/ps axo pid|');
-        while ( my $p = <F>) {
-                chomp($p);
-                if ($p =~ s/\s*(\d+)$/$1/o && $p == $pid) {
-                        die "I seem to be running already at pid $pid.  If not, try again\n";
-                }
+    open(F,$lockfile);
+    my $pid = <F>;
+    close F;
+
+    open(F,'/bin/ps axo pid|');
+    while (my $p = <F>) {
+        chomp($p);
+        if ($p =~ s/\s*(\d+)$/$1/o && $p == $pid) {
+            die "I seem to be running already at pid $pid.  If not, try again\n";
         }
-        close F;
+    }
+    close F;
 }
 
-open(F, ">$lockfile");
+open(F, ">$lockfile") or die "Cannot open $lockfile : $!\n";
 print F $$;
 close F;
 
-OpenSRF::System->bootstrap_client( config_file => $config );
-my $settings = OpenSRF::Utils::SettingsClient->new;
-my $parallel = $settings->config_value( fine_generator => 'parallel' ) || 1; 
-
-if ($parallel == 1) {
-
-    my $r = OpenSRF::AppSession
-            ->create( 'open-ils.storage' )
-            ->request( 'open-ils.storage.action.circulation.overdue.generate_fines' );
+sub init {
+    OpenSRF::System->bootstrap_client(config_file => $osrf_config);
+    Fieldmapper->import(
+        IDL => OpenSRF::Utils::SettingsClient->new->config_value("IDL"));
 
-    while (!$r->complete) { $r->recv };
+    if (!$parallel) {
+        my $settings = OpenSRF::Utils::SettingsClient->new;
+        $parallel = $settings->config_value(fine_generator => 'parallel') || 1;
+    }
+}
 
-} else {
+eval { # Make sure we can delete the lock file.
 
-    my $multi_generator = OpenSRF::MultiSession->new(
-        app => 'open-ils.storage', 
-        cap => $parallel, 
-        api_level => 1,
-    );
+    init();
 
-    my $storage = OpenSRF::AppSession->create("open-ils.storage");
-    my $r = $storage->request('open-ils.storage.action.circulation.overdue.id_list');
-    while (my $resp = $r->recv(timeout => 600)) {
-        my $circ_id = $resp->content;
-        $multi_generator->request( 'open-ils.storage.action.circulation.overdue.generate_fines', $circ_id );
+    # Launch one fine generator request per parallel count.
+    my @reqs;
+    for my $slot (1..$parallel) {
+        my $r = OpenSRF::AppSession->create('open-ils.storage')->request(
+            'open-ils.storage.action.circulation.overdue.generate_fines',
+            undef, $parallel, $slot
+        );
+        push(@reqs, $r);
     }
-    $storage->disconnect();
-    $multi_generator->session_wait(1);
-    $multi_generator->disconnect;
 
-}
+    # Pull responses from the response queue and discard until complete.
+    # The first recv response can take a while to arrive.
+    while (@reqs) {
+        foreach (@reqs) {
+            $_->recv(timeout => 600);
+            die $_->failed . "\n" if $_->failed;
+        }
+        @reqs = grep {!$_->complete} @reqs;
+    }
+};
 
 unlink $lockfile;
+