bib marc record index
authorBill Erickson <berickxx@gmail.com>
Tue, 27 Aug 2019 21:50:47 +0000 (17:50 -0400)
committerBill Erickson <berickxx@gmail.com>
Mon, 13 Jun 2022 20:01:13 +0000 (16:01 -0400)
Signed-off-by: Bill Erickson <berickxx@gmail.com>
Open-ILS/src/perlmods/lib/OpenILS/Elastic.pm
Open-ILS/src/perlmods/lib/OpenILS/Elastic/BibMarc.pm [new file with mode: 0644]
Open-ILS/src/perlmods/lib/OpenILS/Elastic/BibSearch.pm
Open-ILS/src/sql/Pg/upgrade/XXXX.schema.elastic-search.sql
Open-ILS/src/support-scripts/elastic-index.pl

index 8f6c4ae..181d6c0 100644 (file)
@@ -241,6 +241,20 @@ sub search {
     return $result;
 }
 
+# Lucene has a hard limit on the size of an indexable chunk.
+# Avoid trying to index such data by lazily chopping it off
+# at 1/4 the limit to accomodate all UTF-8 chars.
+sub truncate_value {
+    my ($self, $value) = @_;
+
+    if (length(Encode::encode('UTF-8', $value)) > 32760) {
+        $value = substr($value, 0, 8190);
+    }
+
+    return $value;
+}
+
+
 
 
 1;
diff --git a/Open-ILS/src/perlmods/lib/OpenILS/Elastic/BibMarc.pm b/Open-ILS/src/perlmods/lib/OpenILS/Elastic/BibMarc.pm
new file mode 100644 (file)
index 0000000..bf02b66
--- /dev/null
@@ -0,0 +1,366 @@
+package OpenILS::Elastic::BibMarc;
+# ---------------------------------------------------------------
+# Copyright (C) 2019 King County Library System
+# Author: Bill Erickson <berickxx@gmail.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR code.  See the
+# GNU General Public License for more details.
+# ---------------------------------------------------------------
+use strict;
+use warnings;
+use Encode;
+use DateTime;
+use Time::HiRes qw/time/;
+use OpenSRF::Utils::Logger qw/:logger/;
+use OpenSRF::Utils::JSON;
+use OpenILS::Utils::CStoreEditor qw/:funcs/;
+use OpenILS::Utils::DateTime qw/interval_to_seconds/;
+use OpenILS::Elastic;
+use base qw/OpenILS::Elastic/;
+use Data::Dumper;
+$Data::Dumper::Indent = 0;
+
+my $INDEX_NAME = 'bib-marc';
+
+# number of bibs to index per batch.
+my $BIB_BATCH_SIZE = 500;
+
+# TODO: it's possible to apply multiple language analyzers.
+my $LANG_ANALYZER = 'english';
+
+my $BASE_INDEX_SETTINGS = {
+    analysis => {
+        analyzer => {
+            folding => {
+                filter => ['lowercase', 'asciifolding'],
+                tokenizer => 'standard'
+            }
+        },
+        normalizer =>  {
+            custom_lowercase => {
+                type => 'custom',
+                filter => ['lowercase']
+            }
+        }
+    }
+};
+
+my $BASE_PROPERTIES = {
+    source => {type => 'integer', index => 'false'},
+    create_date => {type => 'date'},
+    edit_date => {type => 'date'},
+    bib_source => {type => 'integer'},
+    marc => {
+        type => 'nested',
+        properties => {
+            tag => {type => 'keyword'},
+            subfield => {type => 'keyword'},
+            value => {type => 'text'}
+        }
+    }
+};
+
+sub index_name {
+    return $INDEX_NAME;
+}
+
+sub index {
+    my $self = shift;
+    return $self->{index} if $self->{index};
+    ($self->{index}) = grep {$_->code eq $INDEX_NAME} @{$self->indices};
+
+    if (!$self->{index}) {
+        $logger->error("No index configuration exists for '$INDEX_NAME'");
+        return undef;
+    }
+
+    return $self->{index};
+}
+
+sub create_index {
+    my ($self) = @_;
+
+    if ($self->es->indices->exists(index => $INDEX_NAME)) {
+        $logger->warn("ES index '$INDEX_NAME' already exists");
+        return;
+    }
+
+    $logger->info(
+        "ES creating index '$INDEX_NAME' on cluster '".$self->cluster."'");
+
+    my $mappings = $BASE_PROPERTIES;
+    my $settings = $BASE_INDEX_SETTINGS;
+    $settings->{number_of_replicas} = scalar(@{$self->nodes});
+    $settings->{number_of_shards} = $self->index->num_shards;
+
+    my $conf = {
+        index => $INDEX_NAME,
+        body => {settings => $settings}
+    };
+
+    $logger->info("ES creating index '$INDEX_NAME'");
+
+    # Create the base index with settings
+    eval { $self->es->indices->create($conf) };
+
+    if ($@) {
+        $logger->error("ES failed to create index cluster=".  
+            $self->cluster. "index=$INDEX_NAME error=$@");
+        warn "$@\n\n";
+        return 0;
+    }
+
+    # Create each mapping one at a time instead of en masse so we 
+    # can more easily report when mapping creation fails.
+
+    for my $field (keys %$mappings) {
+        $logger->info("ES Creating index mapping for field $field");
+
+        eval { 
+            $self->es->indices->put_mapping({
+                index => $INDEX_NAME,
+                type  => 'record',
+                body  => {properties => {$field => $mappings->{$field}}}
+            });
+        };
+
+        if ($@) {
+            my $mapjson = OpenSRF::Utils::JSON->perl2JSON($mappings->{$field});
+
+            $logger->error("ES failed to create index mapping: " .
+                "index=$INDEX_NAME field=$field error=$@ mapping=$mapjson");
+
+            warn "$@\n\n";
+            return 0;
+        }
+    }
+
+    return 1;
+}
+
+sub populate_index {
+    my ($self, $settings) = @_;
+    $settings ||= {};
+
+    my $index_count = 0;
+    my $total_indexed = 0;
+
+    # extract the database settings.
+    for my $db_key (grep {$_ =~ /^db_/} keys %$settings) {
+        $self->{$db_key} = $settings->{$db_key};
+    }
+
+    my $end_time;
+    my $duration = $settings->{max_duration};
+    if ($duration) {
+        my $seconds = interval_to_seconds($duration);
+        $end_time = DateTime->now;
+        $end_time->add(seconds => $seconds);
+    }
+
+    while (1) {
+
+        $index_count = $self->populate_bib_index_batch($settings);
+        $total_indexed += $index_count;
+
+        $logger->info("ES indexed $total_indexed bib records");
+
+        # exit if we're only indexing a single record or if the 
+        # batch indexer says there are no more records to index.
+        last if !$index_count || $settings->{index_record};
+
+        if ($end_time && DateTime->now > $end_time) {
+            $logger->info(
+                "ES index populate exiting early on max_duration $duration");
+            last;
+        }
+    } 
+
+    $logger->info("ES bib indexing complete with $total_indexed records");
+}
+
+sub get_bib_ids {
+    my ($self, $state) = @_;
+
+    # A specific record is selected for indexing.
+    return [$state->{index_record}] if $state->{index_record};
+
+    my $start_id = $state->{start_record} || 0;
+    my $stop_id = $state->{stop_record};
+    my $modified_since = $state->{modified_since};
+
+    my ($select, $from, $where);
+    if ($modified_since) {
+        $select = "SELECT id";
+        $from   = "FROM elastic.bib_last_mod_date";
+        $where  = "WHERE last_mod_date > '$modified_since'";
+    } else {
+        $select = "SELECT id";
+        $from   = "FROM biblio.record_entry";
+        $where  = "WHERE NOT deleted AND active";
+    }
+
+    $where .= " AND id >= $start_id" if $start_id;
+    $where .= " AND id <= $stop_id" if $stop_id;
+
+    # Ordering by ID is the simplest way to guarantee all requested
+    # records are processed, given that edit dates may not be unique
+    # and that we're using start_id/stop_id instead of OFFSET to
+    # define the batches.
+    my $order = "ORDER BY id";
+
+    my $sql = "$select $from $where $order LIMIT $BIB_BATCH_SIZE";
+
+    my $ids = $self->get_db_rows($sql);
+    return [ map {$_->{id}} @$ids ];
+}
+
+sub get_bib_data {
+    my ($self, $record_ids) = @_;
+
+    my $ids_str = join(',', @$record_ids);
+
+    my $sql = <<SQL;
+SELECT 
+    bre.id, 
+    bre.create_date, 
+    bre.edit_date, 
+    bre.source AS bib_source,
+    bre.deleted
+FROM biblio.record_entry bre
+WHERE bre.id IN ($ids_str)
+SQL
+
+    return $self->get_db_rows($sql);
+}
+
+sub populate_bib_index_batch {
+    my ($self, $state) = @_;
+
+    my $index_count = 0;
+
+    my $bib_ids = $self->get_bib_ids($state);
+    return 0 unless @$bib_ids;
+
+    $logger->info("ES indexing ".scalar(@$bib_ids)." records");
+
+    my $bib_data = $self->get_bib_data($bib_ids);
+
+    # Remove records that are marked deleted.
+    # This should only happen when running in refresh mode.
+
+    my @active_ids;
+    for my $bib_id (@$bib_ids) {
+
+        # Every row in the result data contains the 'deleted' value.
+        my ($field) = grep {$_->{id} == $bib_id} @$bib_data;
+
+        if ($field->{deleted} == 1) { # not 't' / 'f'
+           $self->delete_documents($bib_id); 
+        } else {
+            push(@active_ids, $bib_id);
+        }
+    }
+
+    $bib_ids = [@active_ids];
+
+    my $marc = $self->load_marc($bib_ids);
+
+    for my $bib_id (@$bib_ids) {
+
+        my ($record) = grep {$_->{id} == $bib_id} @$bib_data;
+
+        my $body = {
+            marc => $marc->{$bib_id} || [],
+            bib_source => $record->{bib_source},
+        };
+
+        ($body->{create_date} = $record->{create_date}) =~ s/ /T/g;
+        ($body->{edit_date} = $record->{edit_date}) =~ s/ /T/g;
+
+        return 0 unless $self->index_document($bib_id, $body);
+
+        $state->{start_record} = $bib_id + 1;
+        $index_count++;
+    }
+
+    $logger->info("ES indexing completed for records " . 
+        $bib_ids->[0] . '...' . $bib_ids->[-1]);
+
+    return $index_count;
+}
+
+sub load_marc {
+    my ($self, $bib_ids) = @_;
+
+    my $bib_ids_str = join(',', @$bib_ids);
+
+    my $marc_data = $self->get_db_rows(<<SQL);
+SELECT record, tag, subfield, value
+FROM metabib.full_rec
+WHERE record IN ($bib_ids_str)
+SQL
+
+    $logger->info("ES found ".scalar(@$marc_data).
+        " full record rows for current record batch");
+
+    my $marc = {};
+    for my $row (@$marc_data) {
+
+        my $value = $row->{value};
+        next unless defined $value && $value ne '';
+
+        my $subfield = $row->{subfield};
+        my $rec_id = $row->{record};
+        delete $row->{record}; # avoid adding this to the index
+
+        $row->{value} = $value = $self->truncate_value($value);
+
+        $marc->{$rec_id} = [] unless $marc->{$rec_id};
+        delete $row->{subfield} unless defined $subfield;
+
+        # Add values to existing record/tag/subfield rows.
+  
+        my $existing;
+        for my $entry (@{$marc->{$rec_id}}) {
+            next unless $entry->{tag} eq $row->{tag};
+
+            if (defined $subfield) {
+                if (defined $entry->{subfield}) {
+                    if ($subfield eq $entry->{subfield}) {
+                        $existing = $entry;
+                        last;
+                    }
+                }
+            } elsif (!defined $entry->{subfield}) {
+                # Neither has a subfield value / not all tags have subfields
+                $existing = $entry;
+                last;
+            }
+        }
+
+        if ($existing) {
+            
+            $existing->{value} = [$existing->{value}] unless ref $existing->{value};
+            push(@{$existing->{value}}, $value);
+
+        } else {
+
+            push(@{$marc->{$rec_id}}, $row);
+        }
+    }
+
+    return $marc;
+}
+
+
+1;
+
+
index f891a55..9f2d9bb 100644 (file)
@@ -346,13 +346,7 @@ sub populate_bib_index_batch {
             my $fname = $field->{name};
             my $value = $field->{value};
             $fname = "$fclass|$fname" if $fclass;
-
-            # Lucene has a hard limit on the size of an indexable chunk.
-            # Avoid trying to index such data by lazily chopping it off
-            # at 1/4 the limit to accomodate all UTF-8 chars.
-            if (length(Encode::encode('UTF-8', $value)) > 32760) {
-                $value = substr($value, 0, 8190);
-            }
+            $value = $self->truncate_value($value);
 
             if ($body->{$fname}) {
                 if (ref $body->{$fname}) {
index 7dbc25c..88ce891 100644 (file)
@@ -25,12 +25,11 @@ CREATE TABLE elastic.node (
 
 CREATE TABLE elastic.index (
     id            SERIAL  PRIMARY KEY,
-    code          TEXT    NOT NULL DEFAULT 'bib-search',
+    code          TEXT    NOT NULL, -- e.g. 'bib-search'
     cluster       TEXT    NOT NULL 
                   REFERENCES elastic.cluster (code) ON DELETE CASCADE,
     active        BOOLEAN NOT NULL DEFAULT FALSE,
     num_shards    INTEGER NOT NULL DEFAULT 1,
-    CONSTRAINT    valid_index_code CHECK (code IN ('bib-search')),
     CONSTRAINT    index_type_once_per_cluster UNIQUE (code, cluster)
 );
 
@@ -169,7 +168,7 @@ INSERT INTO elastic.node
 VALUES ('Localhost', 'localhost', 'http', 9200, TRUE, 'main');
 
 INSERT INTO elastic.index (code, active, cluster)
-VALUES ('bib-search', TRUE, 'main');
+VALUES ('bib-search', TRUE, 'main'), ('bib-marc', TRUE, 'main');
 
 COMMIT;
 
index 05ebc6b..c24aa38 100755 (executable)
@@ -6,6 +6,7 @@ use OpenSRF::Utils::JSON;
 use OpenILS::Utils::Fieldmapper;
 use OpenILS::Utils::CStoreEditor;
 use OpenILS::Elastic::BibSearch;
+use OpenILS::Elastic::BibMarc;
 
 my $help;
 my $osrf_config = '/openils/conf/opensrf_core.xml';
@@ -128,6 +129,8 @@ my $es;
 
 if ($index_name eq 'bib-search') {
     $es = OpenILS::Elastic::BibSearch->new($cluster);
+} elsif ($index_name eq 'bib-marc') {
+    $es = OpenILS::Elastic::BibMarc->new($cluster);
 }
 
 if (!$es) {