Consolidate bib search index builder
authorBill Erickson <berickxx@gmail.com>
Wed, 2 Oct 2019 18:03:32 +0000 (14:03 -0400)
committerBill Erickson <berickxx@gmail.com>
Mon, 3 Feb 2020 22:13:59 +0000 (17:13 -0500)
Signed-off-by: Bill Erickson <berickxx@gmail.com>
Open-ILS/src/perlmods/lib/OpenILS/Elastic/Bib.pm [deleted file]
Open-ILS/src/perlmods/lib/OpenILS/Elastic/Bib/Search.pm [deleted file]
Open-ILS/src/perlmods/lib/OpenILS/Elastic/BibSearch.pm [new file with mode: 0644]
Open-ILS/src/support-scripts/elastic-index.pl

diff --git a/Open-ILS/src/perlmods/lib/OpenILS/Elastic/Bib.pm b/Open-ILS/src/perlmods/lib/OpenILS/Elastic/Bib.pm
deleted file mode 100644 (file)
index c85811d..0000000
+++ /dev/null
@@ -1,123 +0,0 @@
-package OpenILS::Elastic::Bib;
-# ---------------------------------------------------------------
-# Copyright (C) 2019 King County Library System
-# Author: Bill Erickson <berickxx@gmail.com>
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR code.  See the
-# GNU General Public License for more details.
-# ---------------------------------------------------------------
-use strict;
-use warnings;
-use Encode;
-use DateTime;
-use Time::HiRes qw/time/;
-use OpenSRF::Utils::Logger qw/:logger/;
-use OpenSRF::Utils::JSON;
-use OpenILS::Utils::CStoreEditor qw/:funcs/;
-use OpenILS::Utils::DateTime qw/interval_to_seconds/;
-use OpenILS::Elastic;
-use base qw/OpenILS::Elastic/;
-
-# default number of bibs to index per batch.
-my $DEFAULT_BIB_BATCH_SIZE = 500;
-
-sub index {
-    my $self = shift;
-    return $self->{index} if $self->{index};
-    ($self->{index}) = grep {$_->code eq $self->index_name} @{$self->indices};
-
-    $logger->error("No ndex configured named ".$self->index_name) unless $self->{index};
-
-    return $self->{index};
-}
-
-
-# Add data to the bib-search index
-sub populate_index {
-    my ($self, $settings) = @_;
-    $settings ||= {};
-
-    my $index_count = 0;
-    my $total_indexed = 0;
-
-    # extract the database settings.
-    for my $db_key (grep {$_ =~ /^db_/} keys %$settings) {
-        $self->{$db_key} = $settings->{$db_key};
-    }
-
-    my $end_time;
-    my $duration = $settings->{max_duration};
-    if ($duration) {
-        my $seconds = interval_to_seconds($duration);
-        $end_time = DateTime->now;
-        $end_time->add(seconds => $seconds);
-    }
-
-    while (1) {
-
-        $index_count = $self->populate_bib_index_batch($settings);
-        $total_indexed += $index_count;
-
-        $logger->info("ES indexed $total_indexed bib records");
-
-        # exit if we're only indexing a single record or if the 
-        # batch indexer says there are no more records to index.
-        last if !$index_count || $settings->{index_record};
-
-        if ($end_time && DateTime->now > $end_time) {
-            $logger->info(
-                "ES index populate exiting early on max_duration $duration");
-            last;
-        }
-    } 
-
-    $logger->info("ES bib indexing complete with $total_indexed records");
-}
-
-sub get_bib_ids {
-    my ($self, $state) = @_;
-
-    # A specific record is selected for indexing.
-    return [$state->{index_record}] if $state->{index_record};
-
-    my $start_id = $state->{start_record} || 0;
-    my $stop_id = $state->{stop_record};
-    my $modified_since = $state->{modified_since};
-    my $batch_size = $state->{batch_size} || $DEFAULT_BIB_BATCH_SIZE;
-
-    my ($select, $from, $where);
-    if ($modified_since) {
-        $select = "SELECT id";
-        $from   = "FROM elastic.bib_last_mod_date";
-        $where  = "WHERE last_mod_date > '$modified_since'";
-    } else {
-        $select = "SELECT id";
-        $from   = "FROM biblio.record_entry";
-        $where  = "WHERE NOT deleted AND active";
-    }
-
-    $where .= " AND id >= $start_id" if $start_id;
-    $where .= " AND id <= $stop_id" if $stop_id;
-
-    # Ordering by ID is the simplest way to guarantee all requested
-    # records are processed, given that edit dates may not be unique
-    # and that we're using start_id/stop_id instead of OFFSET to
-    # define the batches.
-    my $order = "ORDER BY id";
-
-    my $sql = "$select $from $where $order LIMIT $batch_size";
-
-    my $ids = $self->get_db_rows($sql);
-    return [ map {$_->{id}} @$ids ];
-}
-
-1;
-
-
diff --git a/Open-ILS/src/perlmods/lib/OpenILS/Elastic/Bib/Search.pm b/Open-ILS/src/perlmods/lib/OpenILS/Elastic/Bib/Search.pm
deleted file mode 100644 (file)
index 26d882b..0000000
+++ /dev/null
@@ -1,639 +0,0 @@
-package OpenILS::Elastic::Bib::Search;
-# ---------------------------------------------------------------
-# Copyright (C) 2018 King County Library System
-# Author: Bill Erickson <berickxx@gmail.com>
-#
-# This program is free software; you can redistribute it and/or
-# modify it under the terms of the GNU General Public License
-# as published by the Free Software Foundation; either version 2
-# of the License, or (at your option) any later version.
-
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR code.  See the
-# GNU General Public License for more details.
-# ---------------------------------------------------------------
-use strict;
-use warnings;
-use Encode;
-use DateTime;
-use Time::HiRes qw/time/;
-use Clone 'clone';
-use Business::ISBN;
-use Business::ISSN;
-use OpenSRF::Utils::Logger qw/:logger/;
-use OpenSRF::Utils::JSON;
-use OpenILS::Utils::CStoreEditor qw/:funcs/;
-use OpenILS::Utils::DateTime qw/interval_to_seconds/;
-use OpenILS::Elastic::Bib;
-use base qw/OpenILS::Elastic::Bib/;
-
-my $INDEX_NAME = 'bib-search';
-
-my $BASE_INDEX_SETTINGS = {
-    analysis => {
-        analyzer => {
-            folding => {
-                filter => ['lowercase', 'asciifolding'],
-                tokenizer => 'standard'
-            }
-        },
-        normalizer =>  {
-            custom_lowercase => {
-                type => 'custom',
-                filter => ['lowercase']
-            }
-        }
-    }
-};
-
-# Well-known bib-search index properties
-my $BASE_PROPERTIES = {
-    bib_source  => {type => 'integer'},
-    create_date => {type => 'date'},
-    edit_date   => {type => 'date'},
-    metarecord  => {type => 'integer'},
-
-    # Holdings summaries.  For bib-search, we don't need
-    # copy-specific details, only aggregate visibility information.
-    holdings => {
-        type => 'nested',
-        properties => {
-            status => {type => 'integer'},
-            circ_lib => {type => 'integer'},
-            location => {type => 'integer'},
-            circulate => {type => 'boolean'},
-            opac_visible => {type => 'boolean'}
-        }
-    },
-    marc => {
-        type => 'nested',
-        properties => {
-            tag => {
-                type => 'keyword',
-                normalizer => 'custom_lowercase'
-            },
-            subfield => {
-                type => 'keyword',
-                normalizer => 'custom_lowercase'
-            },
-            value => {
-                type => 'text',
-                fields => {
-                    text_folded => {
-                        type => 'text',
-                        analyzer => 'folding'
-                    }
-                }
-            }
-        }
-    },
-
-    # Make it possible to search across all fields in a search group.
-    # Values from grouped fields are copied into the group field.
-    # Here we make some assumptions about the general purpose of
-    # each group.
-    # Note the ignore_above only affects the 'keyword' version of the
-    # field, the assumption being text that large would solely be
-    # searched via 'text' indexes.
-    title => {
-        type => 'keyword',
-        ignore_above => 256,
-        normalizer => 'custom_lowercase',
-        fields => {
-            text => {type => 'text'},
-            text_folded => {type => 'text', analyzer => 'folding'}
-        }
-    },
-    author => {
-        type => 'keyword',
-        ignore_above => 256,
-        normalizer => 'custom_lowercase',
-        fields => {
-            text => {type => 'text'},
-            text_folded => {type => 'text', analyzer => 'folding'}
-        }
-    },
-    subject => {
-        type => 'keyword',
-        ignore_above => 256,
-        normalizer => 'custom_lowercase',
-        fields => {
-            text => {type => 'text'},
-            text_folded => {type => 'text', analyzer => 'folding'}
-        }
-    },
-    series => {
-        type => 'keyword',
-        ignore_above => 256,
-        normalizer => 'custom_lowercase',
-        fields => {
-            text => {type => 'text'},
-            text_folded => {type => 'text', analyzer => 'folding'}
-        }
-    },
-    keyword => {
-        # term (aka "keyword") searches are not used on the 
-        # keyword field, but we index it just the same (sans lowercase) 
-        # for structural consistency with other group fields.
-        type => 'keyword',
-        ignore_above => 256,
-        fields => {
-            text => {type => 'text'},
-            text_folded => {type => 'text', analyzer => 'folding'}
-        }
-    },
-    identifier => {
-        # Avoid full-text indexing on identifier fields.
-        type => 'keyword',
-        ignore_above => 256,
-        normalizer => 'custom_lowercase',
-    },
-
-    # Create some shortcut indexes for streamlining query_string searches.
-    ti => {type => 'text'},
-    au => {type => 'text'},
-    se => {type => 'text'},
-    su => {type => 'text'},
-    kw => {type => 'text'},
-    id => {
-        type => 'keyword',
-        ignore_above => 256
-    }
-};
-
-my %SHORT_GROUP_MAP = (
-    title => 'ti',
-    author => 'au',
-    subject => 'su',
-    series => 'se',
-    keyword => 'kw',
-    identifier => 'id'
-);
-
-sub index_name {
-    return $INDEX_NAME;
-}
-
-# TODO: add index-specific language analyzers to DB config
-sub language_analyzers {
-    return ("english");
-}
-
-sub create_index_properties {
-    my ($self, $custom_properties) = @_;
-
-    if ($custom_properties) {
-        $logger->info("ES generating index mappings from custom file $custom_properties");
-
-        my $json;
-        {
-            local $/=undef;
-
-            if (!open(MAPPING_FILE, $custom_properties)) {
-                $logger->error("ES cannot open mappings file: $!");
-                return undef;
-            }
-
-            $json = <MAPPING_FILE>;
-            close MAPPING_FILE;
-        }
-
-        my $struct = OpenSRF::Utils::JSON->JSON2perl($json);
-        return $struct->{'bib-search'}->{mappings}->{record}->{properties};
-    }
-
-    my $properties = $BASE_PROPERTIES;
-
-    # Add the language analyzers to the MARC mappings
-    for my $lang_analyzer ($self->language_analyzers) {
-        $properties->{marc}->{properties}->{value}->{fields}->{"text_$lang_analyzer"} = {
-            type => 'text',
-            analyzer => $lang_analyzer
-        };
-
-        # Apply language analysis to grouped fields, however skip
-        # the 'author' and 'identifier' groups since it makes less sense to 
-        # language-analyze proper names and identifiers.
-        $properties->{$_}->{fields}->{"text_$lang_analyzer"} = {
-            type => 'text',
-            analyzer => $lang_analyzer
-        } foreach qw/title subject series keyword/;
-    }
-
-    # elastic.bib_field has no primary key field, so retrieve_all won't work.
-    # Note the name value may be repeated across search group depending
-    # on local configuration.
-    my $fields = new_editor()->search_elastic_bib_field({name => {'!=' => undef}});
-
-    for my $field (@$fields) {
-
-        my $field_name = $field->name;
-        my $search_group = $field->search_group;
-        $field_name = "$search_group|$field_name" if $search_group;
-
-        my $def;
-
-        if ($search_group) {
-
-            # Use the same fields and analysis as the 'grouped' field.
-            $def = clone($properties->{$search_group});
-            $def->{copy_to} = [$search_group, $SHORT_GROUP_MAP{$search_group}];
-
-            # Apply ranking boost to each analysis variation.
-            my $flds = $def->{fields};
-            if ($flds && (my $boost = ($field->weight || 1)) > 1) {
-                $flds->{$_}->{boost} = $boost foreach keys %$flds;
-            }
-
-        } else {
-
-            # Non-grouped fields are used for filtering and sorting, so
-            # they don't need as much processing.
-
-            $def = {
-                type => 'keyword',
-                ignore_above => 256,
-                normalizer => 'custom_lowercase'
-            };
-        }
-
-        if ($field->facet_field eq 't' && $def->{fields}) {
-            # Facet fields are used for aggregation which requires
-            # an additional unaltered keyword field.
-            $def->{fields}->{facet} = {
-                type => 'keyword',
-                ignore_above => 256
-            };
-        }
-
-        $logger->debug("ES adding field $field_name: ". 
-            OpenSRF::Utils::JSON->perl2JSON($def));
-
-        $properties->{$field_name} = $def;
-    }
-
-    return $properties;
-}
-
-sub create_index {
-    my ($self, $custom_properties) = @_;
-
-    if ($self->es->indices->exists(index => $INDEX_NAME)) {
-        $logger->warn("ES index '$INDEX_NAME' already exists");
-        return;
-    }
-
-    $logger->info(
-        "ES creating index '$INDEX_NAME' on cluster '".$self->cluster."'");
-
-    my $properties = $self->create_index_properties($custom_properties);
-
-    my $settings = $BASE_INDEX_SETTINGS;
-    $settings->{number_of_replicas} = scalar(@{$self->nodes});
-    $settings->{number_of_shards} = $self->index->num_shards;
-
-    my $conf = {
-        index => $INDEX_NAME,
-        body => {settings => $settings}
-    };
-
-    $logger->info("ES creating index '$INDEX_NAME'");
-
-    # Create the base index with settings
-    eval { $self->es->indices->create($conf) };
-
-    if ($@) {
-        $logger->error("ES failed to create index cluster=".  
-            $self->cluster. "index=$INDEX_NAME error=$@");
-        warn "$@\n\n";
-        return 0;
-    }
-
-    # Create each mapping one at a time instead of en masse so we 
-    # can more easily report when mapping creation fails.
-
-    for my $field (keys %$properties) {
-        $logger->info("ES Creating index mapping for field $field");
-
-        eval { 
-            $self->es->indices->put_mapping({
-                index => $INDEX_NAME,
-                type  => 'record',
-                body  => {dynamic => 'false', properties => {$field => $properties->{$field}}}
-            });
-        };
-
-        if ($@) {
-            my $mapjson = OpenSRF::Utils::JSON->perl2JSON($properties->{$field});
-
-            $logger->error("ES failed to create index mapping: " .
-                "index=$INDEX_NAME field=$field error=$@ mapping=$mapjson");
-
-            warn "$@\n\n";
-            return 0;
-        }
-    }
-
-    return 1;
-}
-
-sub get_bib_data {
-    my ($self, $record_ids) = @_;
-
-    my $ids_str = join(',', @$record_ids);
-
-    my $sql = <<SQL;
-SELECT DISTINCT ON (bre.id, search_group, name, value)
-    bre.id, 
-    bre.create_date, 
-    bre.edit_date, 
-    bre.source AS bib_source,
-    bre.deleted,
-    mmrsm.metarecord,
-    (elastic.bib_record_properties(bre.id)).*
-FROM biblio.record_entry bre
-LEFT JOIN metabib.metarecord_source_map mmrsm ON (mmrsm.source = bre.id)
-WHERE bre.id IN ($ids_str)
-SQL
-
-    return $self->get_db_rows($sql);
-}
-
-sub populate_bib_index_batch {
-    my ($self, $state) = @_;
-
-    my $index_count = 0;
-
-    my $bib_ids = $self->get_bib_ids($state);
-    return 0 unless @$bib_ids;
-
-    $logger->info("ES indexing ".scalar(@$bib_ids)." records");
-
-    my $bib_data = $self->get_bib_data($bib_ids);
-
-    # Remove records that are marked deleted.
-    # This should only happen when running in refresh mode.
-
-    my @active_ids;
-    for my $bib_id (@$bib_ids) {
-
-        # Every row in the result data contains the 'deleted' value.
-        my ($field) = grep {$_->{id} == $bib_id} @$bib_data;
-
-        if ($field->{deleted} == 1) { # not 't' / 'f'
-           $self->delete_documents($bib_id); 
-        } else {
-            push(@active_ids, $bib_id);
-        }
-    }
-
-    $bib_ids = [@active_ids];
-
-    my $holdings = $self->load_holdings($bib_ids);
-    my $marc = $self->load_marc($bib_ids);
-    my $def = $self->get_index_def;
-
-    # Ask ES what the index properties are so we can avoid passing data
-    # that will not be indexed, since ES will store the data on the source
-    # object even if it's not indexed.  This reduces bulk.
-    my $properties =
-        $def->{$self->index_name}->{mappings}->{record}->{properties};
-
-    for my $bib_id (@$bib_ids) {
-
-        my $body = {
-            marc => $marc->{$bib_id} || [],
-            holdings => $holdings->{$bib_id} || []
-        };
-
-        # there are multiple rows per bib in the data list.
-        my @fields = grep {$_->{id} == $bib_id} @$bib_data;
-
-        my $first = 1;
-        for my $field (@fields) {
-        
-            if ($first) {
-                $first = 0;
-                # some values are repeated per field. 
-                # extract them from the first entry.
-                $body->{bib_source} = $field->{bib_source};
-                $body->{metarecord} = $field->{metarecord};
-
-                # ES ikes the "T" separator for ISO dates
-                ($body->{create_date} = $field->{create_date}) =~ s/ /T/g;
-                ($body->{edit_date} = $field->{edit_date}) =~ s/ /T/g;
-            }
-
-            my $fclass = $field->{search_group};
-            my $fname = $field->{name};
-            my $value = $field->{value};
-            $fname = "$fclass|$fname" if $fclass;
-            $value = $self->truncate_value($value);
-
-            if ($fname eq 'identifier|isbn') {
-                index_isbns($body, $value);
-            } elsif ($fname eq 'identifier|issn') {
-                index_issns($body, $value);
-            } else {
-                append_field_value($body, $fname, $value, $properties);
-            }
-        }
-
-        return 0 unless $self->index_document($bib_id, $body);
-
-        $state->{start_record} = $bib_id + 1;
-        $index_count++;
-    }
-
-    $logger->info("ES indexing completed for records " . 
-        $bib_ids->[0] . '...' . $bib_ids->[-1]);
-
-    return $index_count;
-}
-
-
-# Indexes ISBN10, ISBN13, and formatted values of both (with hyphens)
-sub index_isbns {
-    my ($body, $value) = @_;
-    return unless $value;
-    
-    my %seen; # deduplicate values
-
-    # Chop up the collected raw values into parts and let
-    # Business::* tell us which parts looks like ISBNs.
-    for my $token (split(/ /, $value)) {
-        my $isbn = Business::ISBN->new($token);
-        if ($isbn && $isbn->is_valid) {
-            if ($isbn->as_isbn10) {
-                $seen{$isbn->as_isbn10->isbn} = 1;
-                $seen{$isbn->as_isbn10->as_string} = 1;
-            }
-            if ($isbn->as_isbn13) {
-                $seen{$isbn->as_isbn13->isbn} = 1;
-                $seen{$isbn->as_isbn13->as_string} = 1;
-            }
-        }
-    }
-
-    append_field_value($body, 'identifier|isbn', $_) foreach keys %seen;
-}
-
-# Indexes ISSN values with and wihtout hyphen formatting.
-sub index_issns {
-    my ($body, $value) = @_;
-    return unless $value;
-
-    my %seen; # deduplicate values
-    
-    # Chop up the collected raw values into parts and let
-    # Business::* tell us which parts looks valid.
-    for my $token (split(/ /, $value)) {
-        my $issn = Business::ISSN->new($token);
-        if ($issn && $issn->is_valid) {
-            # no option in business::issn to get the unformatted value.
-            (my $unformatted = $issn->as_string) =~ s/-//g;
-            $seen{$unformatted} = 1;
-            $seen{$issn->as_string} = 1;
-        }
-    }
-
-    append_field_value($body, 'identifier|issn', $_) foreach keys %seen;
-}
-
-sub append_field_value {
-    my ($body, $fname, $value, $properties) = @_;
-
-    # Confirm the data is wanted in the index before passing to ES to
-    # reduce the overall data footprint.
-    return unless $properties->{$fname};
-
-    if ($body->{$fname}) {
-        if (ref $body->{$fname}) {
-            # Three or more values encountered for field.
-            # Add to the list.
-            push(@{$body->{$fname}}, $value);
-        } else {
-            # Second value encountered for field.
-            # Upgrade to array storage.
-            $body->{$fname} = [$body->{$fname}, $value];
-        }
-    } else {
-        # First value encountered for field.
-        # Assume for now there will only be one value.
-        $body->{$fname} = $value
-    }
-}
-
-# Load holdings summary blobs for requested bibs
-sub load_holdings {
-    my ($self, $bib_ids) = @_;
-
-    my $bib_ids_str = join(',', @$bib_ids);
-
-    my $copy_data = $self->get_db_rows(<<SQL);
-SELECT 
-    COUNT(*) AS count,
-    acn.record, 
-    acp.status AS status, 
-    acp.circ_lib AS circ_lib, 
-    acp.location AS location,
-    acp.circulate AS circulate,
-    acp.opac_visible AS opac_visible
-FROM asset.copy acp
-JOIN asset.call_number acn ON acp.call_number = acn.id
-WHERE 
-    NOT acp.deleted AND
-    NOT acn.deleted AND
-    acn.record IN ($bib_ids_str)
-GROUP BY 2, 3, 4, 5, 6, 7
-SQL
-
-    $logger->info("ES found ".scalar(@$copy_data).
-        " holdings summaries for current record batch");
-
-    my $holdings = {};
-    for my $copy (@$copy_data) {
-
-        $holdings->{$copy->{record}} = [] 
-            unless $holdings->{$copy->{record}};
-
-        push(@{$holdings->{$copy->{record}}}, {
-            count => $copy->{count},
-            status => $copy->{status},
-            circ_lib => $copy->{circ_lib},
-            location => $copy->{location},
-            circulate => $copy->{circulate} ? 'true' : 'false',
-            opac_visbile => $copy->{opac_visible} ? 'true' : 'false'
-        });
-    }
-
-    return $holdings;
-}
-
-sub load_marc {
-    my ($self, $bib_ids) = @_;
-
-    my $bib_ids_str = join(',', @$bib_ids);
-
-    my $marc_data = $self->get_db_rows(<<SQL);
-SELECT record, tag, subfield, value
-FROM metabib.real_full_rec
-WHERE record IN ($bib_ids_str)
-SQL
-
-    $logger->info("ES found ".scalar(@$marc_data).
-        " MARC rows for current record batch");
-
-    my $marc = {};
-    for my $row (@$marc_data) {
-
-        my $value = $row->{value};
-        next unless defined $value && $value ne '';
-
-        my $subfield = $row->{subfield};
-        my $rec_id = $row->{record};
-        delete $row->{record}; # avoid adding this to the index
-
-        $row->{value} = $value = $self->truncate_value($value);
-
-        $marc->{$rec_id} = [] unless $marc->{$rec_id};
-        delete $row->{subfield} unless defined $subfield;
-
-        # Add values to existing record/tag/subfield rows.
-  
-        my $existing;
-        for my $entry (@{$marc->{$rec_id}}) {
-            next unless $entry->{tag} eq $row->{tag};
-
-            if (defined $subfield) {
-                if (defined $entry->{subfield}) {
-                    if ($subfield eq $entry->{subfield}) {
-                        $existing = $entry;
-                        last;
-                    }
-                }
-            } elsif (!defined $entry->{subfield}) {
-                # Neither has a subfield value / not all tags have subfields
-                $existing = $entry;
-                last;
-            }
-        }
-
-        if ($existing) {
-            
-            $existing->{value} = [$existing->{value}] unless ref $existing->{value};
-            push(@{$existing->{value}}, $value);
-
-        } else {
-
-            push(@{$marc->{$rec_id}}, $row);
-        }
-    }
-
-    return $marc;
-}
-
-1;
-
-
diff --git a/Open-ILS/src/perlmods/lib/OpenILS/Elastic/BibSearch.pm b/Open-ILS/src/perlmods/lib/OpenILS/Elastic/BibSearch.pm
new file mode 100644 (file)
index 0000000..d43f4df
--- /dev/null
@@ -0,0 +1,731 @@
+package OpenILS::Elastic::BibSearch;
+# ---------------------------------------------------------------
+# Copyright (C) 2019 King County Library System
+# Author: Bill Erickson <berickxx@gmail.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR code.  See the
+# GNU General Public License for more details.
+# ---------------------------------------------------------------
+use strict;
+use warnings;
+use Encode;
+use DateTime;
+use Time::HiRes qw/time/;
+use OpenSRF::Utils::Logger qw/:logger/;
+use OpenSRF::Utils::JSON;
+use OpenILS::Utils::CStoreEditor qw/:funcs/;
+use OpenILS::Utils::DateTime qw/interval_to_seconds/;
+use OpenILS::Elastic;
+use base qw/OpenILS::Elastic/;
+
+# default number of bibs to index per batch.
+my $DEFAULT_BIB_BATCH_SIZE = 500;
+
+my $INDEX_NAME = 'bib-search';
+
+my $BASE_INDEX_SETTINGS = {
+    analysis => {
+        analyzer => {
+            folding => {
+                filter => ['lowercase', 'asciifolding'],
+                tokenizer => 'standard'
+            }
+        },
+        normalizer =>  {
+            custom_lowercase => {
+                type => 'custom',
+                filter => ['lowercase']
+            }
+        }
+    }
+};
+
+# Well-known bib-search index properties
+my $BASE_PROPERTIES = {
+    bib_source  => {type => 'integer'},
+    create_date => {type => 'date'},
+    edit_date   => {type => 'date'},
+    metarecord  => {type => 'integer'},
+
+    # Holdings summaries.  For bib-search, we don't need
+    # copy-specific details, only aggregate visibility information.
+    holdings => {
+        type => 'nested',
+        properties => {
+            status => {type => 'integer'},
+            circ_lib => {type => 'integer'},
+            location => {type => 'integer'},
+            circulate => {type => 'boolean'},
+            opac_visible => {type => 'boolean'}
+        }
+    },
+    marc => {
+        type => 'nested',
+        properties => {
+            tag => {
+                type => 'keyword',
+                normalizer => 'custom_lowercase'
+            },
+            subfield => {
+                type => 'keyword',
+                normalizer => 'custom_lowercase'
+            },
+            value => {
+                type => 'text',
+                fields => {
+                    text_folded => {
+                        type => 'text',
+                        analyzer => 'folding'
+                    }
+                }
+            }
+        }
+    },
+
+    # Make it possible to search across all fields in a search group.
+    # Values from grouped fields are copied into the group field.
+    # Here we make some assumptions about the general purpose of
+    # each group.
+    # Note the ignore_above only affects the 'keyword' version of the
+    # field, the assumption being text that large would solely be
+    # searched via 'text' indexes.
+    title => {
+        type => 'keyword',
+        ignore_above => 256,
+        normalizer => 'custom_lowercase',
+        fields => {
+            text => {type => 'text'},
+            text_folded => {type => 'text', analyzer => 'folding'}
+        }
+    },
+    author => {
+        type => 'keyword',
+        ignore_above => 256,
+        normalizer => 'custom_lowercase',
+        fields => {
+            text => {type => 'text'},
+            text_folded => {type => 'text', analyzer => 'folding'}
+        }
+    },
+    subject => {
+        type => 'keyword',
+        ignore_above => 256,
+        normalizer => 'custom_lowercase',
+        fields => {
+            text => {type => 'text'},
+            text_folded => {type => 'text', analyzer => 'folding'}
+        }
+    },
+    series => {
+        type => 'keyword',
+        ignore_above => 256,
+        normalizer => 'custom_lowercase',
+        fields => {
+            text => {type => 'text'},
+            text_folded => {type => 'text', analyzer => 'folding'}
+        }
+    },
+    keyword => {
+        # term (aka "keyword") searches are not used on the 
+        # keyword field, but we index it just the same (sans lowercase) 
+        # for structural consistency with other group fields.
+        type => 'keyword',
+        ignore_above => 256,
+        fields => {
+            text => {type => 'text'},
+            text_folded => {type => 'text', analyzer => 'folding'}
+        }
+    },
+    identifier => {
+        # Avoid full-text indexing on identifier fields.
+        type => 'keyword',
+        ignore_above => 256,
+        normalizer => 'custom_lowercase',
+    },
+
+    # Create some shortcut indexes for streamlining query_string searches.
+    ti => {type => 'text'},
+    au => {type => 'text'},
+    se => {type => 'text'},
+    su => {type => 'text'},
+    kw => {type => 'text'},
+    id => {
+        type => 'keyword',
+        ignore_above => 256
+    }
+};
+
+my %SHORT_GROUP_MAP = (
+    title => 'ti',
+    author => 'au',
+    subject => 'su',
+    series => 'se',
+    keyword => 'kw',
+    identifier => 'id'
+);
+
+sub index_name {
+    return $INDEX_NAME;
+}
+
+# TODO: add index-specific language analyzers to DB config
+sub language_analyzers {
+    return ("english");
+}
+
+sub create_index_properties {
+    my ($self, $custom_properties) = @_;
+
+    if ($custom_properties) {
+        $logger->info("ES generating index mappings from custom file $custom_properties");
+
+        my $json;
+        {
+            local $/=undef;
+
+            if (!open(MAPPING_FILE, $custom_properties)) {
+                $logger->error("ES cannot open mappings file: $!");
+                return undef;
+            }
+
+            $json = <MAPPING_FILE>;
+            close MAPPING_FILE;
+        }
+
+        my $struct = OpenSRF::Utils::JSON->JSON2perl($json);
+        return $struct->{'bib-search'}->{mappings}->{record}->{properties};
+    }
+
+    my $properties = $BASE_PROPERTIES;
+
+    # Add the language analyzers to the MARC mappings
+    for my $lang_analyzer ($self->language_analyzers) {
+        $properties->{marc}->{properties}->{value}->{fields}->{"text_$lang_analyzer"} = {
+            type => 'text',
+            analyzer => $lang_analyzer
+        };
+
+        # Apply language analysis to grouped fields, however skip
+        # the 'author' and 'identifier' groups since it makes less sense to 
+        # language-analyze proper names and identifiers.
+        $properties->{$_}->{fields}->{"text_$lang_analyzer"} = {
+            type => 'text',
+            analyzer => $lang_analyzer
+        } foreach qw/title subject series keyword/;
+    }
+
+    # elastic.bib_field has no primary key field, so retrieve_all won't work.
+    # Note the name value may be repeated across search group depending
+    # on local configuration.
+    my $fields = new_editor()->search_elastic_bib_field({name => {'!=' => undef}});
+
+    for my $field (@$fields) {
+
+        my $field_name = $field->name;
+        my $search_group = $field->search_group;
+        $field_name = "$search_group|$field_name" if $search_group;
+
+        my $def;
+
+        if ($search_group) {
+
+            # Use the same fields and analysis as the 'grouped' field.
+            $def = clone($properties->{$search_group});
+            $def->{copy_to} = [$search_group, $SHORT_GROUP_MAP{$search_group}];
+
+            # Apply ranking boost to each analysis variation.
+            my $flds = $def->{fields};
+            if ($flds && (my $boost = ($field->weight || 1)) > 1) {
+                $flds->{$_}->{boost} = $boost foreach keys %$flds;
+            }
+
+        } else {
+
+            # Non-grouped fields are used for filtering and sorting, so
+            # they don't need as much processing.
+
+            $def = {
+                type => 'keyword',
+                ignore_above => 256,
+                normalizer => 'custom_lowercase'
+            };
+        }
+
+        if ($field->facet_field eq 't' && $def->{fields}) {
+            # Facet fields are used for aggregation which requires
+            # an additional unaltered keyword field.
+            $def->{fields}->{facet} = {
+                type => 'keyword',
+                ignore_above => 256
+            };
+        }
+
+        $logger->debug("ES adding field $field_name: ". 
+            OpenSRF::Utils::JSON->perl2JSON($def));
+
+        $properties->{$field_name} = $def;
+    }
+
+    return $properties;
+}
+
+sub create_index {
+    my ($self, $custom_properties) = @_;
+
+    if ($self->es->indices->exists(index => $INDEX_NAME)) {
+        $logger->warn("ES index '$INDEX_NAME' already exists");
+        return;
+    }
+
+    $logger->info(
+        "ES creating index '$INDEX_NAME' on cluster '".$self->cluster."'");
+
+    my $properties = $self->create_index_properties($custom_properties);
+
+    my $settings = $BASE_INDEX_SETTINGS;
+    $settings->{number_of_replicas} = scalar(@{$self->nodes});
+    $settings->{number_of_shards} = $self->index->num_shards;
+
+    my $conf = {
+        index => $INDEX_NAME,
+        body => {settings => $settings}
+    };
+
+    $logger->info("ES creating index '$INDEX_NAME'");
+
+    # Create the base index with settings
+    eval { $self->es->indices->create($conf) };
+
+    if ($@) {
+        $logger->error("ES failed to create index cluster=".  
+            $self->cluster. "index=$INDEX_NAME error=$@");
+        warn "$@\n\n";
+        return 0;
+    }
+
+    # Create each mapping one at a time instead of en masse so we 
+    # can more easily report when mapping creation fails.
+
+    for my $field (keys %$properties) {
+        $logger->info("ES Creating index mapping for field $field");
+
+        eval { 
+            $self->es->indices->put_mapping({
+                index => $INDEX_NAME,
+                type  => 'record',
+                body  => {dynamic => 'false', properties => {$field => $properties->{$field}}}
+            });
+        };
+
+        if ($@) {
+            my $mapjson = OpenSRF::Utils::JSON->perl2JSON($properties->{$field});
+
+            $logger->error("ES failed to create index mapping: " .
+                "index=$INDEX_NAME field=$field error=$@ mapping=$mapjson");
+
+            warn "$@\n\n";
+            return 0;
+        }
+    }
+
+    return 1;
+}
+
+sub get_bib_data {
+    my ($self, $record_ids) = @_;
+
+    my $ids_str = join(',', @$record_ids);
+
+    my $sql = <<SQL;
+SELECT DISTINCT ON (bre.id, search_group, name, value)
+    bre.id, 
+    bre.create_date, 
+    bre.edit_date, 
+    bre.source AS bib_source,
+    bre.deleted,
+    mmrsm.metarecord,
+    (elastic.bib_record_properties(bre.id)).*
+FROM biblio.record_entry bre
+LEFT JOIN metabib.metarecord_source_map mmrsm ON (mmrsm.source = bre.id)
+WHERE bre.id IN ($ids_str)
+SQL
+
+    return $self->get_db_rows($sql);
+}
+
+sub populate_bib_index_batch {
+    my ($self, $state) = @_;
+
+    my $index_count = 0;
+
+    my $bib_ids = $self->get_bib_ids($state);
+    return 0 unless @$bib_ids;
+
+    $logger->info("ES indexing ".scalar(@$bib_ids)." records");
+
+    my $bib_data = $self->get_bib_data($bib_ids);
+
+    # Remove records that are marked deleted.
+    # This should only happen when running in refresh mode.
+
+    my @active_ids;
+    for my $bib_id (@$bib_ids) {
+
+        # Every row in the result data contains the 'deleted' value.
+        my ($field) = grep {$_->{id} == $bib_id} @$bib_data;
+
+        if ($field->{deleted} == 1) { # not 't' / 'f'
+           $self->delete_documents($bib_id); 
+        } else {
+            push(@active_ids, $bib_id);
+        }
+    }
+
+    $bib_ids = [@active_ids];
+
+    my $holdings = $self->load_holdings($bib_ids);
+    my $marc = $self->load_marc($bib_ids);
+    my $def = $self->get_index_def;
+
+    # Ask ES what the index properties are so we can avoid passing data
+    # that will not be indexed, since ES will store the data on the source
+    # object even if it's not indexed.  This reduces bulk.
+    my $properties =
+        $def->{$self->index_name}->{mappings}->{record}->{properties};
+
+    for my $bib_id (@$bib_ids) {
+
+        my $body = {
+            marc => $marc->{$bib_id} || [],
+            holdings => $holdings->{$bib_id} || []
+        };
+
+        # there are multiple rows per bib in the data list.
+        my @fields = grep {$_->{id} == $bib_id} @$bib_data;
+
+        my $first = 1;
+        for my $field (@fields) {
+        
+            if ($first) {
+                $first = 0;
+                # some values are repeated per field. 
+                # extract them from the first entry.
+                $body->{bib_source} = $field->{bib_source};
+                $body->{metarecord} = $field->{metarecord};
+
+                # ES ikes the "T" separator for ISO dates
+                ($body->{create_date} = $field->{create_date}) =~ s/ /T/g;
+                ($body->{edit_date} = $field->{edit_date}) =~ s/ /T/g;
+            }
+
+            my $fclass = $field->{search_group};
+            my $fname = $field->{name};
+            my $value = $field->{value};
+            $fname = "$fclass|$fname" if $fclass;
+            $value = $self->truncate_value($value);
+
+            if ($fname eq 'identifier|isbn') {
+                index_isbns($body, $value);
+            } elsif ($fname eq 'identifier|issn') {
+                index_issns($body, $value);
+            } else {
+                append_field_value($body, $fname, $value, $properties);
+            }
+        }
+
+        return 0 unless $self->index_document($bib_id, $body);
+
+        $state->{start_record} = $bib_id + 1;
+        $index_count++;
+    }
+
+    $logger->info("ES indexing completed for records " . 
+        $bib_ids->[0] . '...' . $bib_ids->[-1]);
+
+    return $index_count;
+}
+
+
+# Indexes ISBN10, ISBN13, and formatted values of both (with hyphens)
+sub index_isbns {
+    my ($body, $value) = @_;
+    return unless $value;
+    
+    my %seen; # deduplicate values
+
+    # Chop up the collected raw values into parts and let
+    # Business::* tell us which parts looks like ISBNs.
+    for my $token (split(/ /, $value)) {
+        my $isbn = Business::ISBN->new($token);
+        if ($isbn && $isbn->is_valid) {
+            if ($isbn->as_isbn10) {
+                $seen{$isbn->as_isbn10->isbn} = 1;
+                $seen{$isbn->as_isbn10->as_string} = 1;
+            }
+            if ($isbn->as_isbn13) {
+                $seen{$isbn->as_isbn13->isbn} = 1;
+                $seen{$isbn->as_isbn13->as_string} = 1;
+            }
+        }
+    }
+
+    append_field_value($body, 'identifier|isbn', $_) foreach keys %seen;
+}
+
+# Indexes ISSN values with and wihtout hyphen formatting.
+sub index_issns {
+    my ($body, $value) = @_;
+    return unless $value;
+
+    my %seen; # deduplicate values
+    
+    # Chop up the collected raw values into parts and let
+    # Business::* tell us which parts looks valid.
+    for my $token (split(/ /, $value)) {
+        my $issn = Business::ISSN->new($token);
+        if ($issn && $issn->is_valid) {
+            # no option in business::issn to get the unformatted value.
+            (my $unformatted = $issn->as_string) =~ s/-//g;
+            $seen{$unformatted} = 1;
+            $seen{$issn->as_string} = 1;
+        }
+    }
+
+    append_field_value($body, 'identifier|issn', $_) foreach keys %seen;
+}
+
+sub append_field_value {
+    my ($body, $fname, $value, $properties) = @_;
+
+    # Confirm the data is wanted in the index before passing to ES to
+    # reduce the overall data footprint.
+    return unless $properties->{$fname};
+
+    if ($body->{$fname}) {
+        if (ref $body->{$fname}) {
+            # Three or more values encountered for field.
+            # Add to the list.
+            push(@{$body->{$fname}}, $value);
+        } else {
+            # Second value encountered for field.
+            # Upgrade to array storage.
+            $body->{$fname} = [$body->{$fname}, $value];
+        }
+    } else {
+        # First value encountered for field.
+        # Assume for now there will only be one value.
+        $body->{$fname} = $value
+    }
+}
+
+# Load holdings summary blobs for requested bibs
+sub load_holdings {
+    my ($self, $bib_ids) = @_;
+
+    my $bib_ids_str = join(',', @$bib_ids);
+
+    my $copy_data = $self->get_db_rows(<<SQL);
+SELECT 
+    COUNT(*) AS count,
+    acn.record, 
+    acp.status AS status, 
+    acp.circ_lib AS circ_lib, 
+    acp.location AS location,
+    acp.circulate AS circulate,
+    acp.opac_visible AS opac_visible
+FROM asset.copy acp
+JOIN asset.call_number acn ON acp.call_number = acn.id
+WHERE 
+    NOT acp.deleted AND
+    NOT acn.deleted AND
+    acn.record IN ($bib_ids_str)
+GROUP BY 2, 3, 4, 5, 6, 7
+SQL
+
+    $logger->info("ES found ".scalar(@$copy_data).
+        " holdings summaries for current record batch");
+
+    my $holdings = {};
+    for my $copy (@$copy_data) {
+
+        $holdings->{$copy->{record}} = [] 
+            unless $holdings->{$copy->{record}};
+
+        push(@{$holdings->{$copy->{record}}}, {
+            count => $copy->{count},
+            status => $copy->{status},
+            circ_lib => $copy->{circ_lib},
+            location => $copy->{location},
+            circulate => $copy->{circulate} ? 'true' : 'false',
+            opac_visbile => $copy->{opac_visible} ? 'true' : 'false'
+        });
+    }
+
+    return $holdings;
+}
+
+sub load_marc {
+    my ($self, $bib_ids) = @_;
+
+    my $bib_ids_str = join(',', @$bib_ids);
+
+    my $marc_data = $self->get_db_rows(<<SQL);
+SELECT record, tag, subfield, value
+FROM metabib.real_full_rec
+WHERE record IN ($bib_ids_str)
+SQL
+
+    $logger->info("ES found ".scalar(@$marc_data).
+        " MARC rows for current record batch");
+
+    my $marc = {};
+    for my $row (@$marc_data) {
+
+        my $value = $row->{value};
+        next unless defined $value && $value ne '';
+
+        my $subfield = $row->{subfield};
+        my $rec_id = $row->{record};
+        delete $row->{record}; # avoid adding this to the index
+
+        $row->{value} = $value = $self->truncate_value($value);
+
+        $marc->{$rec_id} = [] unless $marc->{$rec_id};
+        delete $row->{subfield} unless defined $subfield;
+
+        # Add values to existing record/tag/subfield rows.
+  
+        my $existing;
+        for my $entry (@{$marc->{$rec_id}}) {
+            next unless $entry->{tag} eq $row->{tag};
+
+            if (defined $subfield) {
+                if (defined $entry->{subfield}) {
+                    if ($subfield eq $entry->{subfield}) {
+                        $existing = $entry;
+                        last;
+                    }
+                }
+            } elsif (!defined $entry->{subfield}) {
+                # Neither has a subfield value / not all tags have subfields
+                $existing = $entry;
+                last;
+            }
+        }
+
+        if ($existing) {
+            
+            $existing->{value} = [$existing->{value}] unless ref $existing->{value};
+            push(@{$existing->{value}}, $value);
+
+        } else {
+
+            push(@{$marc->{$rec_id}}, $row);
+        }
+    }
+
+    return $marc;
+}
+
+
+
+sub index {
+    my $self = shift;
+    return $self->{index} if $self->{index};
+    ($self->{index}) = grep {$_->code eq $self->index_name} @{$self->indices};
+
+    $logger->error("No ndex configured named ".$self->index_name) unless $self->{index};
+
+    return $self->{index};
+}
+
+
+# Add data to the bib-search index
+sub populate_index {
+    my ($self, $settings) = @_;
+    $settings ||= {};
+
+    my $index_count = 0;
+    my $total_indexed = 0;
+
+    # extract the database settings.
+    for my $db_key (grep {$_ =~ /^db_/} keys %$settings) {
+        $self->{$db_key} = $settings->{$db_key};
+    }
+
+    my $end_time;
+    my $duration = $settings->{max_duration};
+    if ($duration) {
+        my $seconds = interval_to_seconds($duration);
+        $end_time = DateTime->now;
+        $end_time->add(seconds => $seconds);
+    }
+
+    while (1) {
+
+        $index_count = $self->populate_bib_index_batch($settings);
+        $total_indexed += $index_count;
+
+        $logger->info("ES indexed $total_indexed bib records");
+
+        # exit if we're only indexing a single record or if the 
+        # batch indexer says there are no more records to index.
+        last if !$index_count || $settings->{index_record};
+
+        if ($end_time && DateTime->now > $end_time) {
+            $logger->info(
+                "ES index populate exiting early on max_duration $duration");
+            last;
+        }
+    } 
+
+    $logger->info("ES bib indexing complete with $total_indexed records");
+}
+
+sub get_bib_ids {
+    my ($self, $state) = @_;
+
+    # A specific record is selected for indexing.
+    return [$state->{index_record}] if $state->{index_record};
+
+    my $start_id = $state->{start_record} || 0;
+    my $stop_id = $state->{stop_record};
+    my $modified_since = $state->{modified_since};
+    my $batch_size = $state->{batch_size} || $DEFAULT_BIB_BATCH_SIZE;
+
+    my ($select, $from, $where);
+    if ($modified_since) {
+        $select = "SELECT id";
+        $from   = "FROM elastic.bib_last_mod_date";
+        $where  = "WHERE last_mod_date > '$modified_since'";
+    } else {
+        $select = "SELECT id";
+        $from   = "FROM biblio.record_entry";
+        $where  = "WHERE NOT deleted AND active";
+    }
+
+    $where .= " AND id >= $start_id" if $start_id;
+    $where .= " AND id <= $stop_id" if $stop_id;
+
+    # Ordering by ID is the simplest way to guarantee all requested
+    # records are processed, given that edit dates may not be unique
+    # and that we're using start_id/stop_id instead of OFFSET to
+    # define the batches.
+    my $order = "ORDER BY id";
+
+    my $sql = "$select $from $where $order LIMIT $batch_size";
+
+    my $ids = $self->get_db_rows($sql);
+    return [ map {$_->{id}} @$ids ];
+}
+
+1;
+
+
index 754a84f..4eb4308 100755 (executable)
@@ -5,7 +5,7 @@ use Getopt::Long;
 use OpenSRF::Utils::JSON;
 use OpenILS::Utils::Fieldmapper;
 use OpenILS::Utils::CStoreEditor;
-use OpenILS::Elastic::Bib::Search;
+use OpenILS::Elastic::BibSearch;
 
 my $help;
 my $osrf_config = '/openils/conf/opensrf_core.xml';
@@ -146,7 +146,7 @@ OpenILS::Utils::CStoreEditor::init();
 my $es;
 
 if ($index_name eq 'bib-search') {
-    $es = OpenILS::Elastic::Bib::Search->new($cluster);
+    $es = OpenILS::Elastic::BibSearch->new($cluster);
 }
 
 if (!$es) {