WIP: pingest-in-QI user/miker/pingest-in-queued-ingest
authorMike Rylander <mrylander@gmail.com>
Mon, 19 Sep 2022 14:55:53 +0000 (10:55 -0400)
committerMike Rylander <mrylander@gmail.com>
Mon, 19 Sep 2022 14:55:53 +0000 (10:55 -0400)
Signed-off-by: Mike Rylander <mrylander@gmail.com>
Open-ILS/src/sql/Pg/030.schema.metabib.sql
Open-ILS/src/sql/Pg/upgrade/XXXX.schema.queued_ingest.sql
Open-ILS/src/support-scripts/ingest_ctl

index a5c806c..113f309 100644 (file)
@@ -1076,6 +1076,7 @@ BEGIN
 
     PERFORM * FROM config.internal_flag WHERE name = 'ingest.assume_inserts_only' AND enabled;
     IF NOT FOUND THEN
+        -- XXX Need to include field_list in the WHERE clauses below!
         IF NOT b_skip_search THEN
             FOR fclass IN SELECT * FROM config.metabib_class LOOP
                 -- RAISE NOTICE 'Emptying out %', fclass.name;
index 33767d9..a78ead9 100644 (file)
@@ -341,42 +341,80 @@ END;
 $func$ LANGUAGE PLPGSQL;
 
 CREATE OR REPLACE FUNCTION metabib.indexing_update (bib biblio.record_entry, insert_only BOOL DEFAULT FALSE, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$
+DECLARE
+    skip_facet   BOOL   := FALSE;
+    skip_display BOOL   := FALSE;
+    skip_browse  BOOL   := FALSE;
+    skip_search  BOOL   := FALSE;
+    skip_auth    BOOL   := FALSE;
+    skip_full    BOOL   := FALSE;
+    skip_attrs   BOOL   := FALSE;
+    skip_luri    BOOL   := FALSE;
+    skip_mrmap   BOOL   := FALSE;
+    only_attrs   TEXT[] := NULL;
+    only_fields  INT[]  := '{}'::INT[];
 BEGIN
 
     -- Record authority linking
+    SELECT extra LIKE '%skip_authority%' INTO skip_auth;
     PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_linking' AND enabled;
-    IF NOT FOUND THEN
+    IF NOT FOUND AND NOT skip_auth THEN
         PERFORM biblio.map_authority_linking( bib.id, bib.marc );
     END IF;
 
     -- Flatten and insert the mfr data
+    SELECT extra LIKE '%skip_full_rec%' INTO skip_full;
     PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_metabib_full_rec' AND enabled;
-    IF NOT FOUND THEN
+    IF NOT FOUND AND NOT skip_full THEN
         PERFORM metabib.reingest_metabib_full_rec(bib.id);
+    END IF;
 
-        -- Now we pull out attribute data, which is dependent on the mfr for all but XPath-based fields
-        PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_metabib_rec_descriptor' AND enabled;
-        IF NOT FOUND THEN
-            PERFORM metabib.reingest_record_attributes(bib.id, NULL, bib.marc, insert_only);
+    -- Now we pull out attribute data, which is dependent on the mfr for all but XPath-based fields
+    SELECT extra LIKE '%skip_attrs%' INTO skip_attrs;
+    PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_metabib_rec_descriptor' AND enabled;
+    IF NOT FOUND AND NOT skip_attrs THEN
+        IF extra ~ 'attr\(\s*(\w[ ,\w]*?)\s*\)' THEN
+            SELECT REGEXP_SPLIT_TO_ARRAY(
+                (REGEXP_MATCHES(extra, 'field_list\(\s*(\w[ ,\w]*?)\s*\)'))[1],
+                '\s*,\s*'
+            ) INTO only_attrs;
         END IF;
+
+        PERFORM metabib.reingest_record_attributes(bib.id, only_attrs, bib.marc, insert_only);
     END IF;
 
     -- Gather and insert the field entry data
-    PERFORM metabib.reingest_metabib_field_entries(bib.id);
+    SELECT extra LIKE '%skip_facet%' INTO skip_facet;
+    SELECT extra LIKE '%skip_display%' INTO skip_display;
+    SELECT extra LIKE '%skip_browse%' INTO skip_browse;
+    SELECT extra LIKE '%skip_search%' INTO skip_search;
+
+    IF extra ~ 'field_list\(\s*(\d[ ,\d]+)\s*\)' THEN
+        SELECT REGEXP_SPLIT_TO_ARRAY(
+            (REGEXP_MATCHES(extra, 'field_list\(\s*(\d[ ,\d]+)\s*\)'))[1],
+            '\s*,\s*'
+        )::INT[] INTO only_fields;
+    END IF;
+
+    IF NOT skip_facet OR NOT skip_display OR NOT skip_browse OR NOT skip_search THEN
+        PERFORM metabib.reingest_metabib_field_entries(bib.id, skip_facet, skip_display, skip_browse, skip_search, only_fields);
+    END IF;
 
     -- Located URI magic
+    SELECT extra LIKE '%skip_luri%' INTO skip_luri;
     PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_located_uri' AND enabled;
-    IF NOT FOUND THEN PERFORM biblio.extract_located_uris( bib.id, bib.marc, bib.editor ); END IF;
+    IF NOT FOUND AND NOT skip_luri THEN PERFORM biblio.extract_located_uris( bib.id, bib.marc, bib.editor ); END IF;
 
     -- (re)map metarecord-bib linking
+    SELECT extra LIKE '%skip_mrmap%' INTO skip_mrmap;
     IF insert_only THEN -- if not deleted and performing an insert, check for the flag
         PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.skip_on_insert' AND enabled;
-        IF NOT FOUND THEN
+        IF NOT FOUND AND NOT skip_mrmap THEN
             PERFORM metabib.remap_metarecord_for_bib( bib.id, bib.fingerprint );
         END IF;
     ELSE -- we're doing an update, and we're not deleted, remap
         PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.skip_on_update' AND enabled;
-        IF NOT FOUND THEN
+        IF NOT FOUND AND NOT skip_mrmap THEN
             PERFORM metabib.remap_metarecord_for_bib( bib.id, bib.fingerprint );
         END IF;
     END IF;
index 2a38fe8..33692fc 100755 (executable)
@@ -51,6 +51,19 @@ my $queue_state_data = '';  # State data required for queue entry processing
 my $queue_owner;            # Owner of the queue 
 my $queue_run_at;           # Owner of the queue 
 my $queue_threads;          # parallelism for this queue (capped at max_child)
+my $skip_browse = 0;        # Skip the browse reingest.
+my $skip_attrs = 0;         # Skip the record attributes reingest.
+my $skip_search = 0;        # Skip the search reingest.
+my $skip_facets = 0;        # Skip the facets reingest.
+my $skip_display = 0;       # Skip the display reingest.
+my $skip_full_rec = 0;      # Skip the full_rec reingest.
+my $skip_authority = 0;     # Skip the authority reingest.
+my $skip_luri = 0;          # Skip the luri reingest.
+my $skip_mrmap = 0;         # Skip the metarecord remapping.
+my $record_attrs = [];      # Skip the metarecord remapping.
+my $metabib_fields = [];    # Skip the metarecord remapping.
+my $input_records = [];     # Records supplied via CLI switch.
+my $pingest = '';           # Special "pingest" flag, supplying an EG user name as queue owner.
 
 my $help;         # show help text
 
@@ -83,9 +96,22 @@ GetOptions(
     'queue-run-at=s'    => \$queue_run_at,
     'queue-threads=i'   => \$queue_threads,
     'queue-state-data=s'=> \$queue_state_data,
+    'skip-browse'       => \$skip_browse,
+    'skip-attrs'        => \$skip_attrs,
+    'skip-search'       => \$skip_search,
+    'skip-facets'       => \$skip_facets,
+    'skip-display'      => \$skip_display,
+    'skip-full_rec'     => \$skip_full_rec,
+    'skip-authority'    => \$skip_authority,
+    'skip-luri'         => \$skip_luri,
+    'skip-mr-map'       => \$skip_mrmap,
+    'attr=s@'           => \$record_attrs,
+    'field=s@'          => \$metabib_fields,
+    'record=s@'         => \$input_records,
     'start-id=i'        => \$start_id,
     'end-id=i'          => \$end_id,
     'pipe'              => \$opt_pipe,
+    'pingest=s'         => \$pingest,
     'coordinator'       => \$daemon,
     'chatty'            => \$chatty,
     'help'              => \$help
@@ -291,7 +317,10 @@ if ($daemon) { # background mode, we need a lockfile;
 }
 
 my $start_time = time;
-my %stats = (
+my %stats;
+
+sub reset_stats {
+  %stats = (
     total => {
     }, biblio    => {
         insert    => {},
@@ -303,17 +332,65 @@ my %stats = (
         update    => {},
         delete    => {}
     }, seconds => {}
-);
+  );
+}
+
+reset_stats();
 
 my %processors;
 my %queues_in_progress;
 
 my $db_connections_in_use = 0;
 
-if ($start_id || $end_id || $opt_pipe) { # enqueuing mode
+if ($start_id || $end_id || $opt_pipe || @$input_records) { # enqueuing mode
+
+    if ($pingest) { # special mode that sets up two queues that can run in parallel
+
+        my $no_browse = $skip_browse;
+        my $orig_stat_data = $queue_state_data;
+
+        # set up the first queue
+        $queue = undef;
+        $queue_threads //= 4;
+        $queue_type = 'biblio';
+        $queue_action = 'update';
+        $queue_why = 'pingest - fields and attributes queue';
+        $queue_owner = $pingest;
+
+        # for pingest mode, always skip authority and luri, and skip browse in the first queue
+        $skip_browse = 1;
+        $skip_authority = 1;
+        $skip_luri = 1;
+        
+        my $record_list = enqueue_input();
+        report_stats('Enqueuing '.$queue_why);
+
+        if (!$no_browse and @$record_list) { # user didn't ask to skip browse reingest
+            # set up the second queue
+            $queue = undef;
+            $queue_threads //= 4;
+            $queue_why = 'pingest - browse queue';
+            $queue_state_data = $orig_stat_data;
+
+            $skip_browse = 0;
+            $skip_attrs = 1;
+            $skip_search = 1;
+            $skip_facets = 1;
+            $skip_display = 1;
+            $skip_full_rec = 1;
+            $skip_mrmap = 1;
+
+            reset_stats();
+
+            enqueue_input($record_list);
+            report_stats('Enqueuing '.$queue_why);
+        }
+
+    } else { # just a regular, user-defined QI request
+        enqueue_input();
+        report_stats('Enqueuing');
+    }
 
-    enqueue_input();
-    report_stats('Enqueuing');
 
 } elsif ($queue && !$stats_only) { # single queue processing mode
 
@@ -972,15 +1049,20 @@ sub report_stats {
 }
 
 sub enqueue_input {
+    my $predestined_input = shift;
     my @input;
 
-    if ($opt_pipe) {
+    if ($predestined_input and @$predestined_input) {
+        @input = @$predestined_input;
+    } elsif ($opt_pipe) {
         while (<STDIN>) {
             # Assume any string of digits is an id.
             if (my @subs = /([0-9]+)/g) {
                 push(@input, @subs);
             }
         }
+    } elsif (@$input_records) {
+        @input = grep { /^\d+$/ } @$input_records;
     } else {
         my $q = "SELECT id FROM $queue_type.record_entry WHERE NOT DELETED";
         if ($start_id && $end_id) {
@@ -1024,6 +1106,21 @@ sub enqueue_input {
     }
     $queue = $q_obj->{id} || '0'; 
 
+    if ($queue_type eq 'biblio' and $queue_action eq 'update') {
+        $queue_state_data .= ';skip_browse' if $skip_browse;
+        $queue_state_data .= ';skip_attrs' if $skip_attrs;
+        $queue_state_data .= ';skip_search' if $skip_search;
+        $queue_state_data .= ';skip_facets' if $skip_facets;
+        $queue_state_data .= ';skip_display' if $skip_display;
+        $queue_state_data .= ';skip_full_rec' if $skip_full_rec;
+        $queue_state_data .= ';skip_authority' if $skip_authority;
+        $queue_state_data .= ';skip_luri' if $skip_luri;
+        $queue_state_data .= ';skip_mrmap' if $skip_mrmap;
+
+        $queue_state_data .= ';attr_list('.join(',',@$record_attrs).')' if @$record_attrs;
+        $queue_state_data .= ';field_list('.join(',',@$metabib_fields).')' if @$metabib_fields;
+    }
+
     my $qid = $q_obj->{id};
     my $run_at = $q_obj->{run_at} || 'NOW';
     for my $rid (@input) {
@@ -1043,5 +1140,6 @@ sub enqueue_input {
     }
     $main_dbh->commit;
 
+    return \@input;
 }