From: Bill Erickson
Date: Tue, 29 May 2018 21:39:20 +0000 (-0400)
Subject: JBAS-2036 Import pingest; add weekly batches
X-Git-Url: https://old-git.evergreen-ils.org/?a=commitdiff_plain;h=5eaa2185561c721ab0af88d724f41dd037b41c62;p=working%2FEvergreen.git

JBAS-2036 Import pingest; add weekly batches

Import the pingest script for safekeeping. Adds support for a new
--week-batch option for reingesting all bib records over a 7-day period.

Signed-off-by: Bill Erickson
---

diff --git a/KCLS/utility-scripts/CRONTAB b/KCLS/utility-scripts/CRONTAB
index 133070c9e8..c516d073fe 100644
--- a/KCLS/utility-scripts/CRONTAB
+++ b/KCLS/utility-scripts/CRONTAB
@@ -172,3 +172,7 @@ BACKSTAGE_PASSWORD = BSPASS
 
 # Monthly UMS user balance summary export
 0 3 1 * * . ~/.bashrc && cd $SCRIPT_DIR/ums_user_balances/ && ./ums_user_balances.sh
+# Re-ingest all bib records over a 1-week period
+# Note: --skip-* options are also available to limit reingest actions
+# 10 0 * * * . ~/.bashrc && cd $SCRIPT_DIR/bib_reingest/ && ./pingest.pl --week-batch
+
diff --git a/KCLS/utility-scripts/bib_reingest/pingest.pl b/KCLS/utility-scripts/bib_reingest/pingest.pl
new file mode 100755
index 0000000000..c13ae8fc46
--- /dev/null
+++ b/KCLS/utility-scripts/bib_reingest/pingest.pl
@@ -0,0 +1,400 @@
+#!/usr/bin/perl
+# ---------------------------------------------------------------
+# Copyright © 2013,2014 Merrimack Valley Library Consortium
+# Jason Stephenson
+#
+# Heavily modified by Bill Erickson
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+# ---------------------------------------------------------------
+# This guy parallelizes a reingest.
+use strict;
+use warnings;
+use DBI;
+use Getopt::Long;
+use DateTime;
+use Sys::Syslog qw(syslog openlog);
+
+# Globals for the command line options:
+
+# You will want to adjust the next two based on your database size,
+# i.e. the number of bib records, as well as the number of cores on
+# your database server. Using roughly number of cores/2 doesn't seem
+# to have much impact in off-peak times.
+my $batch_size = 100; # records processed per batch
+my $max_child = 2;    # max number of parallel worker processes
+
+my $skip_browse; # Skip the browse reingest.
+my $skip_attrs;  # Skip the record attributes reingest.
+my $skip_search; # Skip the search reingest.
+my $skip_facets; # Skip the facets reingest.
+my $min_id;      # Start processing at this bib ID.
+my $max_id;      # Stop processing when this bib ID is reached.
+my $max_duration; # max processing duration in seconds
+my $newest_first; # Process records in descending order of edit_date
+my $sort_id_desc; # Sort by record ID descending
+my $log_stdout;   # Clone log messages to stdout
+my $help;         # Show help text
+my $week_batch;   # True if performing a one-week reingest cycle
+
+my $syslog_facility = 'LOCAL6'; # matches Evergreen gateway
+my $syslog_ops = 'pid';
+my $syslog_ident = 'pingest';
+
+GetOptions(
+    'batch-size=i' => \$batch_size,
+    'max-child=i' => \$max_child,
+    'skip-browse' => \$skip_browse,
+    'skip-attrs' => \$skip_attrs,
+    'skip-search' => \$skip_search,
+    'skip-facets' => \$skip_facets,
+    'min-id=i' => \$min_id,
+    'max-id=i' => \$max_id,
+    'newest-first' => \$newest_first,
+    'sort-id-desc' => \$sort_id_desc,
+    'max-duration=i' => \$max_duration,
+    'log-stdout' => \$log_stdout,
+    'week-batch' => \$week_batch,
+    'help' => \$help
+);
+
+sub help {
+    print <<HELP;
+Parallel bib record reingest.
+
+Usage: pingest.pl [options]
+
+    --batch-size    Number of records processed per batch
+    --max-child     Max number of parallel worker processes
+    --skip-browse   Skip the browse reingest
+    --skip-attrs    Skip the record attributes reingest
+    --skip-search   Skip the search reingest
+    --skip-facets   Skip the facets reingest
+    --min-id        Start processing at this bib ID
+    --max-id        Stop processing when this bib ID is reached
+    --newest-first  Process records in descending order of edit_date
+    --sort-id-desc  Sort records by ID descending
+    --max-duration  Max processing duration in seconds
+    --week-batch    Reingest only today's one-seventh of the bib records
+    --log-stdout    Clone log messages to stdout
+    --help          Show this help text
+HELP
+    exit 0;
+}
+
+help() if $help;
+
+openlog($syslog_ident, $syslog_ops, $syslog_facility);
+
+# Log to syslog; clone messages to stdout as requested; die on fatal errors.
+sub announce {
+    my ($level, $msg, $die) = @_;
+    syslog($level, $msg);
+
+    my $date_str = DateTime->now(time_zone => 'local')->strftime('%F %T');
+    my $msg_str = "$date_str [$$] $level $msg\n";
+
+    if ($die) {
+        die $msg_str;
+
+    } else {
+        if ($level eq 'ERR' or $level eq 'WARNING') {
+            # always clone error messages to stdout
+            warn $msg_str; # avoid dupe messages
+        } elsif ($log_stdout) {
+            print $msg_str;
+        }
+    }
+}
+
+announce('DEBUG',
+    "pingest starting with batch-size $batch_size and max_child $max_child");
+
+my $where = "WHERE deleted = 'f'";
+if ($min_id && $max_id) {
+    $where .= " AND id BETWEEN $min_id AND $max_id";
+} elsif ($min_id) {
+    $where .= " AND id >= $min_id";
+} elsif ($max_id) {
+    $where .= " AND id <= $max_id";
+}
+
+my $order_by = 'ORDER BY ';
+if ($newest_first) {
+    $order_by .= 'edit_date DESC, id DESC';
+} elsif ($sort_id_desc) {
+    $order_by .= 'id DESC';
+} else {
+    $order_by .= 'id ASC';
+}
+
+# "Gimme the keys! I'll drive!"
+my $q = <<END_OF_Q;
+SELECT id
+FROM biblio.record_entry
+$where
+$order_by
+END_OF_Q
+
+my $start_time = time;
+
+# True once --max-duration seconds have elapsed since startup.
+sub duration_expired {
+    return 0 unless $max_duration;
+    return 1 if (time - $start_time) >= $max_duration;
+    return 0;
+}
+
+# Batch bookkeeping: the current batch, the number of batches, the
+# list of batches, and the separate browse-only record list.
+my ($count, $lists, $records) = (0, 0, []);
+my @lol = ();
+my @blist = ();
+
+# All of the DBI->connect() calls in this file assume that you have
+# configured the PGHOST, PGPORT, PGDATABASE, PGUSER, and PGPASSWORD
+# variables in your execution environment. If you have not, you have
+# two options:
+#
+# 1) configure them
+#
+# 2) edit the DBI->connect() calls in this program so that it can
+# connect to your database.
+my $dbh = DBI->connect('DBI:Pg:');
+my $dow_start_pos = 0;
+my $dow_end_pos;
+
+# When processing a week batch, first find the day-of-week cut-off boundaries.
+if ($week_batch) {
+    my $count_query = "SELECT COUNT(id) FROM biblio.record_entry $where";
+    my $result = $dbh->selectall_arrayref($count_query);
+    my $bib_count = $result->[0][0];
+    my $bucket_size = int($bib_count / 7);
+    my $dow = DateTime->now->day_of_week - 1; # day_of_week: Monday is 1
+    $dow_start_pos = $dow * $bucket_size;
+    $dow_end_pos = $dow_start_pos + $bucket_size;
+    # The final day of the week picks up any remainder left over from
+    # the integer division above.
+    $dow_end_pos = $bib_count if $dow == 6;
+
+    announce('INFO', "Processing bib ID dow start_pos=$dow_start_pos ".
+        "to end_pos=$dow_end_pos");
+}
+
+my $results = $dbh->selectall_arrayref($q);
+my $week_counter = -1;
+foreach my $r (@$results) {
+
+    if ($week_batch) {
+        $week_counter++;
+        if ($week_counter < $dow_start_pos ||
+            $week_counter >= $dow_end_pos) {
+            # In weekly batch mode, ignore records that do not fall
+            # within today's day-of-week bucket.
+            next;
+        }
+    }
+
+    my $record = $r->[0];
+    push(@blist, $record); # separate list for the browse-only ingest
+    push(@$records, $record);
+    if (++$count == $batch_size) {
+        $lol[$lists++] = $records;
+        $count = 0;
+        $records = [];
+    }
+}
+$lol[$lists++] = $records if ($count); # Last batch is likely to be small.
+$dbh->disconnect();
+
+announce('INFO', 'Processing '.scalar(@blist).' bib records');
+
+# We're going to reuse $count to keep track of the total number of
+# batches processed.
+$count = 0;
+
+# @running keeps track of the running child processes.
+my @running = ();
+
+# We start the browse-only ingest before starting the other ingests.
+browse_ingest(@blist) unless ($skip_browse);
+
+# We loop until we have processed all of the batches stored in @lol
+# or the maximum processing duration has been reached.
+my $last_record;
+while ($count < $lists) {
+    my $duration_expired = duration_expired();
+
+    if (scalar(@lol) && scalar(@running) < $max_child && !$duration_expired) {
+        # Reuse $records for the lulz.
+        $records = shift(@lol);
+        $last_record = $records->[-1];
+        if ($skip_search && $skip_facets && $skip_attrs) {
+            $count++;
+        } else {
+            reingest($records);
+        }
+    } else {
+        my $pid = wait();
+        if (grep {$_ == $pid} @running) {
+            @running = grep {$_ != $pid} @running;
+            $count++;
+            announce('DEBUG', "reingest() processed $count of ".
+                "$lists batches; last record = $last_record");
+        }
+    }
+
+    if ($duration_expired && scalar(@running) == 0) {
+        announce('INFO',
+            "reingest() stopping on $last_record at max duration");
+        exit(0);
+    }
+}
+
+# This subroutine forks a process to do the browse-only ingest on the
+# @blist above. It cannot be parallelized itself, but it can run in
+# parallel with the other ingests.
+sub browse_ingest {
+    my @list = @_;
+    my $pid = fork();
+    if (!defined($pid)) {
+        die "failed to spawn child";
+    } elsif ($pid > 0) {
+        # Add our browser to the list of running children.
+        push(@running, $pid);
+        # Increment the number of lists, because this list was not
+        # previously counted.
+        $lists++;
+    } elsif ($pid == 0) {
+        my $dbh = DBI->connect('DBI:Pg:');
+        my $sth = $dbh->prepare("SELECT metabib.reingest_metabib_field_entries(?, TRUE, FALSE, TRUE)");
+        my $total = scalar(@list);
+        my $count = 0;
+        foreach (@list) {
+            if ($sth->execute($_)) {
+                my $crap = $sth->fetchall_arrayref();
+            } else {
+                announce('WARNING', "Browse ingest failed for record $_");
+            }
+            if (duration_expired()) {
+                announce('INFO',
+                    "browse_ingest() stopping on record $_ on max duration");
+                last;
+            }
+
+            announce('DEBUG', "browse_ingest() processed $count ".
+                "of $total records; last record = $_")
+                if ++$count % $batch_size == 0;
+        }
+        $dbh->disconnect();
+        exit(0);
+    }
+}
+
+# Fork a child to do the other reingests:
+
+sub reingest {
+    my $list = shift;
+    my $pid = fork();
+    if (!defined($pid)) {
+        die "Failed to spawn a child";
+    } elsif ($pid > 0) {
+        push(@running, $pid);
+    } elsif ($pid == 0) {
+        my $dbh = DBI->connect('DBI:Pg:');
+        reingest_attributes($dbh, $list) unless ($skip_attrs);
+        reingest_field_entries($dbh, $list)
+            unless ($skip_facets && $skip_search);
+        $dbh->disconnect();
+        exit(0);
+    }
+}
+
+# Reingest metabib field entries on a list of records.
+sub reingest_field_entries {
+    my $dbh = shift;
+    my $list = shift;
+    my $sth = $dbh->prepare("SELECT metabib.reingest_metabib_field_entries(?, ?, TRUE, ?)");
+    # The DB function takes "skip" flags, so pass the --skip-* options
+    # through as booleans.
+    $sth->bind_param(2, ($skip_facets) ? 1 : 0);
+    $sth->bind_param(3, ($skip_search) ? 1 : 0);
+    foreach (@$list) {
+        $sth->bind_param(1, $_);
+        if ($sth->execute()) {
+            my $crap = $sth->fetchall_arrayref();
+        } else {
+            announce('WARNING',
+                "metabib.reingest_metabib_field_entries failed for record $_");
+        }
+    }
+}
+
+# Reingest record attributes on a list of records.
+sub reingest_attributes {
+    my $dbh = shift;
+    my $list = shift;
+    my $sth = $dbh->prepare(<<END_OF_INGEST
+SELECT metabib.reingest_record_attributes(rm.id, NULL, rm.marc)
+FROM biblio.record_entry rm
+WHERE rm.id = ?
+END_OF_INGEST
+    );
+    foreach (@$list) {
+        $sth->bind_param(1, $_);
+        if ($sth->execute()) {
+            my $crap = $sth->fetchall_arrayref();
+        } else {
+            announce('WARNING',
+                "metabib.reingest_record_attributes failed for record $_");
+        }
+    }
+}
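
A note on the --week-batch math (not part of the patch above): the option
slices the ID-ordered result set into seven roughly equal position ranges
and reingests only the range for the current day of the week, so a full
pass over the bibs completes across seven nightly runs. The sketch below
restates that arithmetic on its own; the bib count and day-of-week value
are made-up inputs standing in for what pingest.pl reads from
biblio.record_entry and DateTime.

    #!/usr/bin/perl
    # Standalone sketch of the --week-batch bucket arithmetic; for
    # illustration only. $bib_count and $dow are hypothetical values.
    use strict;
    use warnings;

    my $bib_count = 700_123;  # pretend non-deleted bib count
    my $dow       = 3;        # pretend day index: 0 = Monday .. 6 = Sunday

    my $bucket_size = int($bib_count / 7);
    my $start_pos   = $dow * $bucket_size;
    my $end_pos     = $start_pos + $bucket_size;
    $end_pos = $bib_count if $dow == 6;  # last day absorbs the remainder

    printf "Day %d covers list positions %d through %d (%d records)\n",
        $dow, $start_pos, $end_pos - 1, $end_pos - $start_pos;

Splitting on positions in the sorted result set, rather than on raw bib ID
ranges, keeps the seven daily batches close to the same size even when the
ID sequence is sparse.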