From: Bill Erickson Date: Fri, 30 Aug 2019 21:08:31 +0000 (-0400) Subject: Consolidate some indexing code X-Git-Url: https://old-git.evergreen-ils.org/?a=commitdiff_plain;h=a6782567c679461e97869e21d7baaeb48e181156;p=working%2FEvergreen.git Consolidate some indexing code Signed-off-by: Bill Erickson --- diff --git a/Open-ILS/src/perlmods/lib/OpenILS/Application/Search/ElasticMapper.pm b/Open-ILS/src/perlmods/lib/OpenILS/Application/Search/ElasticMapper.pm index d737e2ea8b..c218b868b0 100644 --- a/Open-ILS/src/perlmods/lib/OpenILS/Application/Search/ElasticMapper.pm +++ b/Open-ILS/src/perlmods/lib/OpenILS/Application/Search/ElasticMapper.pm @@ -20,8 +20,8 @@ use OpenSRF::Utils::Logger qw/:logger/; use OpenILS::Utils::Fieldmapper; use OpenSRF::Utils::SettingsClient; use OpenILS::Utils::CStoreEditor q/:funcs/; -use OpenILS::Elastic::BibSearch; -use OpenILS::Elastic::BibMarc; +use OpenILS::Elastic::Bib::Search; +use OpenILS::Elastic::Bib::Marc; use List::Util qw/min/; use Digest::MD5 qw(md5_hex); @@ -112,7 +112,7 @@ sub bib_search { my ($elastic_query, $cache_key) = compile_elastic_query($query, $staff, $offset, $limit); - my $es = OpenILS::Elastic::BibSearch->new('main'); + my $es = OpenILS::Elastic::Bib::Search->new('main'); $es->connect; my $results = $es->search($elastic_query); @@ -629,7 +629,7 @@ sub marc_search { my $elastic_query = compile_elastic_marc_query($args, $staff, $offset, $limit); - my $es = OpenILS::Elastic::BibMarc->new('main'); + my $es = OpenILS::Elastic::Bib::Marc->new('main'); $es->connect; my $results = $es->search($elastic_query); diff --git a/Open-ILS/src/perlmods/lib/OpenILS/Elastic/Bib.pm b/Open-ILS/src/perlmods/lib/OpenILS/Elastic/Bib.pm new file mode 100644 index 0000000000..7d05749903 --- /dev/null +++ b/Open-ILS/src/perlmods/lib/OpenILS/Elastic/Bib.pm @@ -0,0 +1,122 @@ +package OpenILS::Elastic::Bib; +# --------------------------------------------------------------- +# Copyright (C) 2019 King County Library System +# Author: Bill 
Erickson +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# --------------------------------------------------------------- +use strict; +use warnings; +use Encode; +use DateTime; +use Time::HiRes qw/time/; +use OpenSRF::Utils::Logger qw/:logger/; +use OpenSRF::Utils::JSON; +use OpenILS::Utils::CStoreEditor qw/:funcs/; +use OpenILS::Utils::DateTime qw/interval_to_seconds/; +use OpenILS::Elastic; +use base qw/OpenILS::Elastic/; + +# number of bibs to index per batch. +my $BIB_BATCH_SIZE = 500; + +# Returns the entry from $self->indices whose code equals index_name(), +# caching it in $self->{index}. Logs an error and returns undef when +# no such entry is found. +sub index { + my $self = shift; + return $self->{index} if $self->{index}; + ($self->{index}) = grep {$_->code eq $self->index_name} @{$self->indices}; + + $logger->error("No index configured named ".$self->index_name) unless $self->{index}; + + return $self->{index}; +} + + +# Add data to the index in batches via populate_bib_index_batch(), +# which the Bib::Search and Bib::Marc subclasses each implement. +sub populate_index { + my ($self, $settings) = @_; + $settings ||= {}; + + my $index_count = 0; + my $total_indexed = 0; + + # extract the database settings. 
+ for my $db_key (grep {$_ =~ /^db_/} keys %$settings) { + $self->{$db_key} = $settings->{$db_key}; + } + + my $end_time; + my $duration = $settings->{max_duration}; + if ($duration) { + my $seconds = interval_to_seconds($duration); + $end_time = DateTime->now; + $end_time->add(seconds => $seconds); + } + + while (1) { + + $index_count = $self->populate_bib_index_batch($settings); + $total_indexed += $index_count; + + $logger->info("ES indexed $total_indexed bib records"); + + # exit if we're only indexing a single record or if the + # batch indexer says there are no more records to index. + last if !$index_count || $settings->{index_record}; + + if ($end_time && DateTime->now > $end_time) { + $logger->info( + "ES index populate exiting early on max_duration $duration"); + last; + } + } + + $logger->info("ES bib indexing complete with $total_indexed records"); +} + +sub get_bib_ids { + my ($self, $state) = @_; + + # A specific record is selected for indexing. + return [$state->{index_record}] if $state->{index_record}; + + my $start_id = $state->{start_record} || 0; + my $stop_id = $state->{stop_record}; + my $modified_since = $state->{modified_since}; + + my ($select, $from, $where); + if ($modified_since) { + $select = "SELECT id"; + $from = "FROM elastic.bib_last_mod_date"; + $where = "WHERE last_mod_date > '$modified_since'"; + } else { + $select = "SELECT id"; + $from = "FROM biblio.record_entry"; + $where = "WHERE NOT deleted AND active"; + } + + $where .= " AND id >= $start_id" if $start_id; + $where .= " AND id <= $stop_id" if $stop_id; + + # Ordering by ID is the simplest way to guarantee all requested + # records are processed, given that edit dates may not be unique + # and that we're using start_id/stop_id instead of OFFSET to + # define the batches. 
+ my $order = "ORDER BY id"; + + my $sql = "$select $from $where $order LIMIT $BIB_BATCH_SIZE"; + + my $ids = $self->get_db_rows($sql); + return [ map {$_->{id}} @$ids ]; +} + +1; + + diff --git a/Open-ILS/src/perlmods/lib/OpenILS/Elastic/Bib/Marc.pm b/Open-ILS/src/perlmods/lib/OpenILS/Elastic/Bib/Marc.pm new file mode 100644 index 0000000000..dae48fa5ac --- /dev/null +++ b/Open-ILS/src/perlmods/lib/OpenILS/Elastic/Bib/Marc.pm @@ -0,0 +1,298 @@ +package OpenILS::Elastic::Bib::Marc; +use base 'OpenILS::Elastic::Bib'; +# --------------------------------------------------------------- +# Copyright (C) 2019 King County Library System +# Author: Bill Erickson +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR code. See the +# GNU General Public License for more details. +# --------------------------------------------------------------- +use strict; +use warnings; +use Encode; +use DateTime; +use Time::HiRes qw/time/; +use OpenSRF::Utils::Logger qw/:logger/; +use OpenSRF::Utils::JSON; +use OpenILS::Utils::CStoreEditor qw/:funcs/; +use OpenILS::Utils::DateTime qw/interval_to_seconds/; +use OpenILS::Elastic::Bib; +use base qw/OpenILS::Elastic::Bib/; + +my $INDEX_NAME = 'bib-marc'; + +# TODO: it's possible to apply multiple language analyzers. 
+my $LANG_ANALYZER = 'english'; + +my $BASE_INDEX_SETTINGS = { + analysis => { + analyzer => { + folding => { + filter => ['lowercase', 'asciifolding'], + tokenizer => 'standard' + } + }, + normalizer => { + custom_lowercase => { + type => 'custom', + filter => ['lowercase'] + } + } + } +}; + +my $BASE_PROPERTIES = { + source => {type => 'integer', index => 'false'}, + create_date => {type => 'date'}, + edit_date => {type => 'date'}, + bib_source => {type => 'integer'}, + marc => { + type => 'nested', + properties => { + # tag is assumed to be composed of numbers, so no lowercase. + tag => {type => 'keyword'}, + subfield => { + type => 'keyword', + fields => { + lower => { + type => 'keyword', + normalizer => 'custom_lowercase' + } + } + }, + value => { + type => 'keyword', + fields => { + lower => { + type => 'keyword', + normalizer => 'custom_lowercase' + }, + text => { + type => 'text', + analyzer => $LANG_ANALYZER + }, + text_folded => { + type => 'text', + analyzer => 'folding' + } + } + } + + } + } +}; + +sub index_name { + return $INDEX_NAME; +} + +sub create_index { + my ($self) = @_; + + if ($self->es->indices->exists(index => $INDEX_NAME)) { + $logger->warn("ES index '$INDEX_NAME' already exists"); + return; + } + + $logger->info( + "ES creating index '$INDEX_NAME' on cluster '".$self->cluster."'"); + + my $mappings = $BASE_PROPERTIES; + my $settings = $BASE_INDEX_SETTINGS; + $settings->{number_of_replicas} = scalar(@{$self->nodes}); + $settings->{number_of_shards} = $self->index->num_shards; + + my $conf = { + index => $INDEX_NAME, + body => {settings => $settings} + }; + + $logger->info("ES creating index '$INDEX_NAME'"); + + # Create the base index with settings + eval { $self->es->indices->create($conf) }; + + if ($@) { + $logger->error("ES failed to create index cluster=". + $self->cluster. 
"index=$INDEX_NAME error=$@"); + warn "$@\n\n"; + return 0; + } + + # Create each mapping one at a time instead of en masse so we + # can more easily report when mapping creation fails. + + for my $field (keys %$mappings) { + $logger->info("ES Creating index mapping for field $field"); + + eval { + $self->es->indices->put_mapping({ + index => $INDEX_NAME, + type => 'record', + body => {properties => {$field => $mappings->{$field}}} + }); + }; + + if ($@) { + my $mapjson = OpenSRF::Utils::JSON->perl2JSON($mappings->{$field}); + + $logger->error("ES failed to create index mapping: " . + "index=$INDEX_NAME field=$field error=$@ mapping=$mapjson"); + + warn "$@\n\n"; + return 0; + } + } + + return 1; +} + +sub get_bib_data { + my ($self, $record_ids) = @_; + + my $ids_str = join(',', @$record_ids); + + my $sql = <get_db_rows($sql); +} + +sub populate_bib_index_batch { + my ($self, $state) = @_; + + my $index_count = 0; + + my $bib_ids = $self->get_bib_ids($state); + return 0 unless @$bib_ids; + + $logger->info("ES indexing ".scalar(@$bib_ids)." records"); + + my $bib_data = $self->get_bib_data($bib_ids); + + # Remove records that are marked deleted. + # This should only happen when running in refresh mode. + + my @active_ids; + for my $bib_id (@$bib_ids) { + + # Every row in the result data contains the 'deleted' value. 
+ my ($field) = grep {$_->{id} == $bib_id} @$bib_data; + + if ($field->{deleted} == 1) { # not 't' / 'f' + $self->delete_documents($bib_id); + } else { + push(@active_ids, $bib_id); + } + } + + $bib_ids = [@active_ids]; + + my $marc = $self->load_marc($bib_ids); + + for my $bib_id (@$bib_ids) { + + my ($record) = grep {$_->{id} == $bib_id} @$bib_data; + + my $body = { + marc => $marc->{$bib_id} || [], + bib_source => $record->{bib_source}, + }; + + ($body->{create_date} = $record->{create_date}) =~ s/ /T/g; + ($body->{edit_date} = $record->{edit_date}) =~ s/ /T/g; + + return 0 unless $self->index_document($bib_id, $body); + + $state->{start_record} = $bib_id + 1; + $index_count++; + } + + $logger->info("ES indexing completed for records " . + $bib_ids->[0] . '...' . $bib_ids->[-1]); + + return $index_count; +} + +sub load_marc { + my ($self, $bib_ids) = @_; + + my $bib_ids_str = join(',', @$bib_ids); + + my $marc_data = $self->get_db_rows(<info("ES found ".scalar(@$marc_data). + " full record rows for current record batch"); + + my $marc = {}; + for my $row (@$marc_data) { + + my $value = $row->{value}; + next unless defined $value && $value ne ''; + + my $subfield = $row->{subfield}; + my $rec_id = $row->{record}; + delete $row->{record}; # avoid adding this to the index + + $row->{value} = $value = $self->truncate_value($value); + + $marc->{$rec_id} = [] unless $marc->{$rec_id}; + delete $row->{subfield} unless defined $subfield; + + # Add values to existing record/tag/subfield rows. 
+ + my $existing; + for my $entry (@{$marc->{$rec_id}}) { + next unless $entry->{tag} eq $row->{tag}; + + if (defined $subfield) { + if (defined $entry->{subfield}) { + if ($subfield eq $entry->{subfield}) { + $existing = $entry; + last; + } + } + } elsif (!defined $entry->{subfield}) { + # Neither has a subfield value / not all tags have subfields + $existing = $entry; + last; + } + } + + if ($existing) { + + $existing->{value} = [$existing->{value}] unless ref $existing->{value}; + push(@{$existing->{value}}, $value); + + } else { + + push(@{$marc->{$rec_id}}, $row); + } + } + + return $marc; +} + + +1; + + diff --git a/Open-ILS/src/perlmods/lib/OpenILS/Elastic/Bib/Search.pm b/Open-ILS/src/perlmods/lib/OpenILS/Elastic/Bib/Search.pm new file mode 100644 index 0000000000..37e75bcee1 --- /dev/null +++ b/Open-ILS/src/perlmods/lib/OpenILS/Elastic/Bib/Search.pm @@ -0,0 +1,343 @@ +package OpenILS::Elastic::Bib::Search; +# --------------------------------------------------------------- +# Copyright (C) 2018 King County Library System +# Author: Bill Erickson +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. + +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR code. See the +# GNU General Public License for more details. 
+# --------------------------------------------------------------- +use strict; +use warnings; +use Encode; +use DateTime; +use Time::HiRes qw/time/; +use OpenSRF::Utils::Logger qw/:logger/; +use OpenSRF::Utils::JSON; +use OpenILS::Utils::CStoreEditor qw/:funcs/; +use OpenILS::Utils::DateTime qw/interval_to_seconds/; +use OpenILS::Elastic::Bib; +use base qw/OpenILS::Elastic::Bib/; + +my $INDEX_NAME = 'bib-search'; + +# number of bibs to index per batch. +my $BIB_BATCH_SIZE = 500; + +# TODO: it's possible to apply multiple language analyzers. +my $LANG_ANALYZER = 'english'; + +my $BASE_INDEX_SETTINGS = { + analysis => { + analyzer => { + folding => { + filter => ['lowercase', 'asciifolding'], + tokenizer => 'standard' + } + }, + normalizer => { + custom_lowercase => { + type => 'custom', + filter => ['lowercase'] + } + } + } +}; + +# Well-known bib-search index properties +my $BASE_PROPERTIES = { + source => {type => 'integer', index => 'false'}, + create_date => {type => 'date', index => 'false'}, + edit_date => {type => 'date', index => 'false'}, + + # Holdings summaries. For bib-search, we don't need + # copy-specific details, only aggregate visibility information. 
+ holdings => { + type => 'nested', + properties => { + status => {type => 'integer'}, + circ_lib => {type => 'integer'}, + location => {type => 'integer'}, + circulate => {type => 'boolean'}, + opac_visible => {type => 'boolean'} + } + } +}; + +sub index_name { + return $INDEX_NAME; +} + +sub create_index { + my ($self) = @_; + + if ($self->es->indices->exists(index => $INDEX_NAME)) { + $logger->warn("ES index '$INDEX_NAME' already exists"); + return; + } + + $logger->info( + "ES creating index '$INDEX_NAME' on cluster '".$self->cluster."'"); + + my $mappings = $BASE_PROPERTIES; + + my $fields = new_editor()->retrieve_all_elastic_bib_field(); + + for my $field (@$fields) { + + my $field_name = $field->name; + my $search_group = $field->search_group; + $field_name = "$search_group|$field_name" if $search_group; + + # Every field gets a keyword index (default) for aggregation and + # a lower-case keyword index (.lower) for sorting and certain + # types of searches (exact match, starts with) + my $def = { + type => 'keyword', + fields => { + lower => { + type => 'keyword', + normalizer => 'custom_lowercase' + } + } + }; + + if ($field->search_field eq 't') { + # Search fields also get full text indexing and analysis + # plus a "folded" variation for ascii folded searches. + + $def->{fields}->{text} = { + type => 'text', + analyzer => $LANG_ANALYZER + }; + + $def->{fields}->{text_folded} = { + type => 'text', + analyzer => 'folding' + }; + } + + # Apply field boost. + $def->{boost} = $field->weight if ($field->weight || 1) > 1; + + $logger->debug("ES adding field $field_name: ". 
+ OpenSRF::Utils::JSON->perl2JSON($def)); + + $mappings->{$field_name} = $def; + } + + my $settings = $BASE_INDEX_SETTINGS; + $settings->{number_of_replicas} = scalar(@{$self->nodes}); + $settings->{number_of_shards} = $self->index->num_shards; + + my $conf = { + index => $INDEX_NAME, + body => {settings => $settings} + }; + + + $logger->info("ES creating index '$INDEX_NAME'"); + + # Create the base index with settings + eval { $self->es->indices->create($conf) }; + + if ($@) { + $logger->error("ES failed to create index cluster=". + $self->cluster. "index=$INDEX_NAME error=$@"); + warn "$@\n\n"; + return 0; + } + + # Create each mapping one at a time instead of en masse so we + # can more easily report when mapping creation fails. + + for my $field (keys %$mappings) { + $logger->info("ES Creating index mapping for field $field"); + + eval { + $self->es->indices->put_mapping({ + index => $INDEX_NAME, + type => 'record', + body => {properties => {$field => $mappings->{$field}}} + }); + }; + + if ($@) { + my $mapjson = OpenSRF::Utils::JSON->perl2JSON($mappings->{$field}); + + $logger->error("ES failed to create index mapping: " . + "index=$INDEX_NAME field=$field error=$@ mapping=$mapjson"); + + warn "$@\n\n"; + return 0; + } + } + + return 1; +} + +sub get_bib_data { + my ($self, $record_ids) = @_; + + my $ids_str = join(',', @$record_ids); + + my $sql = <get_db_rows($sql); +} + +sub populate_bib_index_batch { + my ($self, $state) = @_; + + my $index_count = 0; + + my $bib_ids = $self->get_bib_ids($state); + return 0 unless @$bib_ids; + + $logger->info("ES indexing ".scalar(@$bib_ids)." records"); + + my $bib_data = $self->get_bib_data($bib_ids); + + # Remove records that are marked deleted. + # This should only happen when running in refresh mode. + + my @active_ids; + for my $bib_id (@$bib_ids) { + + # Every row in the result data contains the 'deleted' value. 
+ my ($field) = grep {$_->{id} == $bib_id} @$bib_data; + + if ($field->{deleted} == 1) { # not 't' / 'f' + $self->delete_documents($bib_id); + } else { + push(@active_ids, $bib_id); + } + } + + $bib_ids = [@active_ids]; + + my $holdings = $self->load_holdings($bib_ids); + + for my $bib_id (@$bib_ids) { + + my $body = { + holdings => $holdings->{$bib_id} || [] + }; + + # there are multiple rows per bib in the data list. + my @fields = grep {$_->{id} == $bib_id} @$bib_data; + + my $first = 1; + for my $field (@fields) { + + if ($first) { + $first = 0; + # some values are repeated per field. + # extract them from the first entry. + $body->{bib_source} = $field->{bib_source}; + + # ES likes the "T" separator for ISO dates + ($body->{create_date} = $field->{create_date}) =~ s/ /T/g; + ($body->{edit_date} = $field->{edit_date}) =~ s/ /T/g; + } + + my $fclass = $field->{search_group}; + my $fname = $field->{name}; + my $value = $field->{value}; + $fname = "$fclass|$fname" if $fclass; + $value = $self->truncate_value($value); + + if ($body->{$fname}) { + if (ref $body->{$fname}) { + # Three or more values encountered for field. + # Add to the list. + push(@{$body->{$fname}}, $value); + } else { + # Second value encountered for field. + # Upgrade to array storage. + $body->{$fname} = [$body->{$fname}, $value]; + } + } else { + # First value encountered for field. + # Assume for now there will only be one value. + $body->{$fname} = $value + } + } + + return 0 unless $self->index_document($bib_id, $body); + + $state->{start_record} = $bib_id + 1; + $index_count++; + } + + $logger->info("ES indexing completed for records " . + $bib_ids->[0] . '...' . $bib_ids->[-1]); + + return $index_count; +} + +# Load holdings summary blobs for requested bibs. +# Returns a hashref mapping each record ID to an arrayref of +# holdings summary hashes built from the rows in $copy_data. +sub load_holdings { + my ($self, $bib_ids) = @_; + + my $bib_ids_str = join(',', @$bib_ids); + + my $copy_data = $self->get_db_rows(<info("ES found ".scalar(@$copy_data). 
+ " holdings summaries for current record batch"); + + my $holdings = {}; + for my $copy (@$copy_data) { + + $holdings->{$copy->{record}} = [] + unless $holdings->{$copy->{record}}; + + push(@{$holdings->{$copy->{record}}}, { + count => $copy->{count}, + status => $copy->{status}, + circ_lib => $copy->{circ_lib}, + location => $copy->{location}, + circulate => $copy->{circulate} ? 'true' : 'false', + # key must match the 'opac_visible' property declared + # for holdings in $BASE_PROPERTIES. + opac_visible => $copy->{opac_visible} ? 'true' : 'false' + }); + } + + return $holdings; +} + +1; + + diff --git a/Open-ILS/src/perlmods/lib/OpenILS/Elastic/BibMarc.pm b/Open-ILS/src/perlmods/lib/OpenILS/Elastic/BibMarc.pm deleted file mode 100644 index 86bfc9f425..0000000000 --- a/Open-ILS/src/perlmods/lib/OpenILS/Elastic/BibMarc.pm +++ /dev/null @@ -1,392 +0,0 @@ -package OpenILS::Elastic::BibMarc; -# --------------------------------------------------------------- -# Copyright (C) 2019 King County Library System -# Author: Bill Erickson -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR code. See the -# GNU General Public License for more details. -# --------------------------------------------------------------- -use strict; -use warnings; -use Encode; -use DateTime; -use Time::HiRes qw/time/; -use OpenSRF::Utils::Logger qw/:logger/; -use OpenSRF::Utils::JSON; -use OpenILS::Utils::CStoreEditor qw/:funcs/; -use OpenILS::Utils::DateTime qw/interval_to_seconds/; -use OpenILS::Elastic; -use base qw/OpenILS::Elastic/; -use Data::Dumper; -$Data::Dumper::Indent = 0; - -my $INDEX_NAME = 'bib-marc'; - -# number of bibs to index per batch. 
-my $BIB_BATCH_SIZE = 500; - -# TODO: it's possible to apply multiple language analyzers. -my $LANG_ANALYZER = 'english'; - -my $BASE_INDEX_SETTINGS = { - analysis => { - analyzer => { - folding => { - filter => ['lowercase', 'asciifolding'], - tokenizer => 'standard' - } - }, - normalizer => { - custom_lowercase => { - type => 'custom', - filter => ['lowercase'] - } - } - } -}; - -my $BASE_PROPERTIES = { - source => {type => 'integer', index => 'false'}, - create_date => {type => 'date'}, - edit_date => {type => 'date'}, - bib_source => {type => 'integer'}, - marc => { - type => 'nested', - properties => { - # tag is assumed to be composed of numbers, so no lowercase. - tag => {type => 'keyword'}, - subfield => { - type => 'keyword', - fields => { - lower => { - type => 'keyword', - normalizer => 'custom_lowercase' - } - } - }, - value => { - type => 'keyword', - fields => { - lower => { - type => 'keyword', - normalizer => 'custom_lowercase' - }, - text => { - type => 'text', - analyzer => $LANG_ANALYZER - }, - text_folded => { - type => 'text', - analyzer => 'folding' - } - } - } - - } - } -}; - -sub index_name { - return $INDEX_NAME; -} - -sub index { - my $self = shift; - return $self->{index} if $self->{index}; - ($self->{index}) = grep {$_->code eq $INDEX_NAME} @{$self->indices}; - - if (!$self->{index}) { - $logger->error("No index configuration exists for '$INDEX_NAME'"); - return undef; - } - - return $self->{index}; -} - -sub create_index { - my ($self) = @_; - - if ($self->es->indices->exists(index => $INDEX_NAME)) { - $logger->warn("ES index '$INDEX_NAME' already exists"); - return; - } - - $logger->info( - "ES creating index '$INDEX_NAME' on cluster '".$self->cluster."'"); - - my $mappings = $BASE_PROPERTIES; - my $settings = $BASE_INDEX_SETTINGS; - $settings->{number_of_replicas} = scalar(@{$self->nodes}); - $settings->{number_of_shards} = $self->index->num_shards; - - my $conf = { - index => $INDEX_NAME, - body => {settings => $settings} - }; - - 
$logger->info("ES creating index '$INDEX_NAME'"); - - # Create the base index with settings - eval { $self->es->indices->create($conf) }; - - if ($@) { - $logger->error("ES failed to create index cluster=". - $self->cluster. "index=$INDEX_NAME error=$@"); - warn "$@\n\n"; - return 0; - } - - # Create each mapping one at a time instead of en masse so we - # can more easily report when mapping creation fails. - - for my $field (keys %$mappings) { - $logger->info("ES Creating index mapping for field $field"); - - eval { - $self->es->indices->put_mapping({ - index => $INDEX_NAME, - type => 'record', - body => {properties => {$field => $mappings->{$field}}} - }); - }; - - if ($@) { - my $mapjson = OpenSRF::Utils::JSON->perl2JSON($mappings->{$field}); - - $logger->error("ES failed to create index mapping: " . - "index=$INDEX_NAME field=$field error=$@ mapping=$mapjson"); - - warn "$@\n\n"; - return 0; - } - } - - return 1; -} - -sub populate_index { - my ($self, $settings) = @_; - $settings ||= {}; - - my $index_count = 0; - my $total_indexed = 0; - - # extract the database settings. - for my $db_key (grep {$_ =~ /^db_/} keys %$settings) { - $self->{$db_key} = $settings->{$db_key}; - } - - my $end_time; - my $duration = $settings->{max_duration}; - if ($duration) { - my $seconds = interval_to_seconds($duration); - $end_time = DateTime->now; - $end_time->add(seconds => $seconds); - } - - while (1) { - - $index_count = $self->populate_bib_index_batch($settings); - $total_indexed += $index_count; - - $logger->info("ES indexed $total_indexed bib records"); - - # exit if we're only indexing a single record or if the - # batch indexer says there are no more records to index. 
- last if !$index_count || $settings->{index_record}; - - if ($end_time && DateTime->now > $end_time) { - $logger->info( - "ES index populate exiting early on max_duration $duration"); - last; - } - } - - $logger->info("ES bib indexing complete with $total_indexed records"); -} - -sub get_bib_ids { - my ($self, $state) = @_; - - # A specific record is selected for indexing. - return [$state->{index_record}] if $state->{index_record}; - - my $start_id = $state->{start_record} || 0; - my $stop_id = $state->{stop_record}; - my $modified_since = $state->{modified_since}; - - my ($select, $from, $where); - if ($modified_since) { - $select = "SELECT id"; - $from = "FROM elastic.bib_last_mod_date"; - $where = "WHERE last_mod_date > '$modified_since'"; - } else { - $select = "SELECT id"; - $from = "FROM biblio.record_entry"; - $where = "WHERE NOT deleted AND active"; - } - - $where .= " AND id >= $start_id" if $start_id; - $where .= " AND id <= $stop_id" if $stop_id; - - # Ordering by ID is the simplest way to guarantee all requested - # records are processed, given that edit dates may not be unique - # and that we're using start_id/stop_id instead of OFFSET to - # define the batches. - my $order = "ORDER BY id"; - - my $sql = "$select $from $where $order LIMIT $BIB_BATCH_SIZE"; - - my $ids = $self->get_db_rows($sql); - return [ map {$_->{id}} @$ids ]; -} - -sub get_bib_data { - my ($self, $record_ids) = @_; - - my $ids_str = join(',', @$record_ids); - - my $sql = <get_db_rows($sql); -} - -sub populate_bib_index_batch { - my ($self, $state) = @_; - - my $index_count = 0; - - my $bib_ids = $self->get_bib_ids($state); - return 0 unless @$bib_ids; - - $logger->info("ES indexing ".scalar(@$bib_ids)." records"); - - my $bib_data = $self->get_bib_data($bib_ids); - - # Remove records that are marked deleted. - # This should only happen when running in refresh mode. - - my @active_ids; - for my $bib_id (@$bib_ids) { - - # Every row in the result data contains the 'deleted' value. 
- my ($field) = grep {$_->{id} == $bib_id} @$bib_data; - - if ($field->{deleted} == 1) { # not 't' / 'f' - $self->delete_documents($bib_id); - } else { - push(@active_ids, $bib_id); - } - } - - $bib_ids = [@active_ids]; - - my $marc = $self->load_marc($bib_ids); - - for my $bib_id (@$bib_ids) { - - my ($record) = grep {$_->{id} == $bib_id} @$bib_data; - - my $body = { - marc => $marc->{$bib_id} || [], - bib_source => $record->{bib_source}, - }; - - ($body->{create_date} = $record->{create_date}) =~ s/ /T/g; - ($body->{edit_date} = $record->{edit_date}) =~ s/ /T/g; - - return 0 unless $self->index_document($bib_id, $body); - - $state->{start_record} = $bib_id + 1; - $index_count++; - } - - $logger->info("ES indexing completed for records " . - $bib_ids->[0] . '...' . $bib_ids->[-1]); - - return $index_count; -} - -sub load_marc { - my ($self, $bib_ids) = @_; - - my $bib_ids_str = join(',', @$bib_ids); - - my $marc_data = $self->get_db_rows(<info("ES found ".scalar(@$marc_data). - " full record rows for current record batch"); - - my $marc = {}; - for my $row (@$marc_data) { - - my $value = $row->{value}; - next unless defined $value && $value ne ''; - - my $subfield = $row->{subfield}; - my $rec_id = $row->{record}; - delete $row->{record}; # avoid adding this to the index - - $row->{value} = $value = $self->truncate_value($value); - - $marc->{$rec_id} = [] unless $marc->{$rec_id}; - delete $row->{subfield} unless defined $subfield; - - # Add values to existing record/tag/subfield rows. 
- - my $existing; - for my $entry (@{$marc->{$rec_id}}) { - next unless $entry->{tag} eq $row->{tag}; - - if (defined $subfield) { - if (defined $entry->{subfield}) { - if ($subfield eq $entry->{subfield}) { - $existing = $entry; - last; - } - } - } elsif (!defined $entry->{subfield}) { - # Neither has a subfield value / not all tags have subfields - $existing = $entry; - last; - } - } - - if ($existing) { - - $existing->{value} = [$existing->{value}] unless ref $existing->{value}; - push(@{$existing->{value}}, $value); - - } else { - - push(@{$marc->{$rec_id}}, $row); - } - } - - return $marc; -} - - -1; - - diff --git a/Open-ILS/src/perlmods/lib/OpenILS/Elastic/BibSearch.pm b/Open-ILS/src/perlmods/lib/OpenILS/Elastic/BibSearch.pm deleted file mode 100644 index 9f2d9bbc89..0000000000 --- a/Open-ILS/src/perlmods/lib/OpenILS/Elastic/BibSearch.pm +++ /dev/null @@ -1,480 +0,0 @@ -package OpenILS::Elastic::BibSearch; -# --------------------------------------------------------------- -# Copyright (C) 2018 King County Library System -# Author: Bill Erickson -# -# This program is free software; you can redistribute it and/or -# modify it under the terms of the GNU General Public License -# as published by the Free Software Foundation; either version 2 -# of the License, or (at your option) any later version. - -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR code. See the -# GNU General Public License for more details. 
-# --------------------------------------------------------------- -use strict; -use warnings; -use Encode; -use DateTime; -use Time::HiRes qw/time/; -use OpenSRF::Utils::Logger qw/:logger/; -use OpenSRF::Utils::JSON; -use OpenILS::Utils::CStoreEditor qw/:funcs/; -use OpenILS::Utils::DateTime qw/interval_to_seconds/; -use OpenILS::Elastic; -use base qw/OpenILS::Elastic/; - -my $INDEX_NAME = 'bib-search'; - -# number of bibs to index per batch. -my $BIB_BATCH_SIZE = 500; - -# TODO: it's possible to apply multiple language analyzers. -my $LANG_ANALYZER = 'english'; - -my $BASE_INDEX_SETTINGS = { - analysis => { - analyzer => { - folding => { - filter => ['lowercase', 'asciifolding'], - tokenizer => 'standard' - } - }, - normalizer => { - custom_lowercase => { - type => 'custom', - filter => ['lowercase'] - } - } - } -}; - -# Well-known bib-search index properties -my $BASE_PROPERTIES = { - source => {type => 'integer', index => 'false'}, - create_date => {type => 'date', index => 'false'}, - edit_date => {type => 'date', index => 'false'}, - - # Holdings summaries. For bib-search, we don't need - # copy-specific details, only aggregate visibility information. 
- holdings => { - type => 'nested', - properties => { - status => {type => 'integer'}, - circ_lib => {type => 'integer'}, - location => {type => 'integer'}, - circulate => {type => 'boolean'}, - opac_visible => {type => 'boolean'} - } - } -}; - -sub index_name { - return $INDEX_NAME; -} - -sub index { - my $self = shift; - return $self->{index} if $self->{index}; - ($self->{index}) = grep {$_->code eq $INDEX_NAME} @{$self->indices}; - return $self->{index}; -} - -sub create_index { - my ($self) = @_; - - if ($self->es->indices->exists(index => $INDEX_NAME)) { - $logger->warn("ES index '$INDEX_NAME' already exists"); - return; - } - - $logger->info( - "ES creating index '$INDEX_NAME' on cluster '".$self->cluster."'"); - - my $mappings = $BASE_PROPERTIES; - - my $fields = new_editor()->retrieve_all_elastic_bib_field(); - - for my $field (@$fields) { - - my $field_name = $field->name; - my $search_group = $field->search_group; - $field_name = "$search_group|$field_name" if $search_group; - - # Every field gets a keyword index (default) for aggregation and - # a lower-case keyword index (.lower) for sorting and certain - # types of searches (exact match, starts with) - my $def = { - type => 'keyword', - fields => { - lower => { - type => 'keyword', - normalizer => 'custom_lowercase' - } - } - }; - - if ($field->search_field eq 't') { - # Search fields also get full text indexing and analysis - # plus a "folded" variation for ascii folded searches. - - $def->{fields}->{text} = { - type => 'text', - analyzer => $LANG_ANALYZER - }; - - $def->{fields}->{text_folded} = { - type => 'text', - analyzer => 'folding' - }; - } - - # Apply field boost. - $def->{boost} = $field->weight if ($field->weight || 1) > 1; - - $logger->debug("ES adding field $field_name: ". 
- OpenSRF::Utils::JSON->perl2JSON($def)); - - $mappings->{$field_name} = $def; - } - - my $settings = $BASE_INDEX_SETTINGS; - $settings->{number_of_replicas} = scalar(@{$self->nodes}); - $settings->{number_of_shards} = $self->index->num_shards; - - my $conf = { - index => $INDEX_NAME, - body => {settings => $settings} - }; - - - $logger->info("ES creating index '$INDEX_NAME'"); - - # Create the base index with settings - eval { $self->es->indices->create($conf) }; - - if ($@) { - $logger->error("ES failed to create index cluster=". - $self->cluster. "index=$INDEX_NAME error=$@"); - warn "$@\n\n"; - return 0; - } - - # Create each mapping one at a time instead of en masse so we - # can more easily report when mapping creation fails. - - for my $field (keys %$mappings) { - $logger->info("ES Creating index mapping for field $field"); - - eval { - $self->es->indices->put_mapping({ - index => $INDEX_NAME, - type => 'record', - body => {properties => {$field => $mappings->{$field}}} - }); - }; - - if ($@) { - my $mapjson = OpenSRF::Utils::JSON->perl2JSON($mappings->{$field}); - - $logger->error("ES failed to create index mapping: " . - "index=$INDEX_NAME field=$field error=$@ mapping=$mapjson"); - - warn "$@\n\n"; - return 0; - } - } - - return 1; -} - -# Add data to the bib-search index -sub populate_index { - my ($self, $settings) = @_; - $settings ||= {}; - - my $index_count = 0; - my $total_indexed = 0; - - # extract the database settings. 
- for my $db_key (grep {$_ =~ /^db_/} keys %$settings) { - $self->{$db_key} = $settings->{$db_key}; - } - - my $end_time; - my $duration = $settings->{max_duration}; - if ($duration) { - my $seconds = interval_to_seconds($duration); - $end_time = DateTime->now; - $end_time->add(seconds => $seconds); - } - - while (1) { - - $index_count = $self->populate_bib_index_batch($settings); - $total_indexed += $index_count; - - $logger->info("ES indexed $total_indexed bib records"); - - # exit if we're only indexing a single record or if the - # batch indexer says there are no more records to index. - last if !$index_count || $settings->{index_record}; - - if ($end_time && DateTime->now > $end_time) { - $logger->info( - "ES index populate exiting early on max_duration $duration"); - last; - } - } - - $logger->info("ES bib indexing complete with $total_indexed records"); -} - -sub get_bib_ids { - my ($self, $state) = @_; - - # A specific record is selected for indexing. - return [$state->{index_record}] if $state->{index_record}; - - my $start_id = $state->{start_record} || 0; - my $stop_id = $state->{stop_record}; - my $modified_since = $state->{modified_since}; - - my ($select, $from, $where); - if ($modified_since) { - $select = "SELECT id"; - $from = "FROM elastic.bib_last_mod_date"; - $where = "WHERE last_mod_date > '$modified_since'"; - } else { - $select = "SELECT id"; - $from = "FROM biblio.record_entry"; - $where = "WHERE NOT deleted AND active"; - } - - $where .= " AND id >= $start_id" if $start_id; - $where .= " AND id <= $stop_id" if $stop_id; - - # Ordering by ID is the simplest way to guarantee all requested - # records are processed, given that edit dates may not be unique - # and that we're using start_id/stop_id instead of OFFSET to - # define the batches. 
- my $order = "ORDER BY id"; - - my $sql = "$select $from $where $order LIMIT $BIB_BATCH_SIZE"; - - my $ids = $self->get_db_rows($sql); - return [ map {$_->{id}} @$ids ]; -} - -sub get_bib_data { - my ($self, $record_ids) = @_; - - my $ids_str = join(',', @$record_ids); - - my $sql = <get_db_rows($sql); -} - -sub populate_bib_index_batch { - my ($self, $state) = @_; - - my $index_count = 0; - - my $bib_ids = $self->get_bib_ids($state); - return 0 unless @$bib_ids; - - $logger->info("ES indexing ".scalar(@$bib_ids)." records"); - - my $bib_data = $self->get_bib_data($bib_ids); - - # Remove records that are marked deleted. - # This should only happen when running in refresh mode. - - my @active_ids; - for my $bib_id (@$bib_ids) { - - # Every row in the result data contains the 'deleted' value. - my ($field) = grep {$_->{id} == $bib_id} @$bib_data; - - if ($field->{deleted} == 1) { # not 't' / 'f' - $self->delete_documents($bib_id); - } else { - push(@active_ids, $bib_id); - } - } - - $bib_ids = [@active_ids]; - - my $holdings = $self->load_holdings($bib_ids); - - for my $bib_id (@$bib_ids) { - - my $body = { - holdings => $holdings->{$bib_id} || [] - }; - - # there are multiple rows per bib in the data list. - my @fields = grep {$_->{id} == $bib_id} @$bib_data; - - my $first = 1; - for my $field (@fields) { - - if ($first) { - $first = 0; - # some values are repeated per field. - # extract them from the first entry. - $body->{bib_source} = $field->{bib_source}; - - # ES ikes the "T" separator for ISO dates - ($body->{create_date} = $field->{create_date}) =~ s/ /T/g; - ($body->{edit_date} = $field->{edit_date}) =~ s/ /T/g; - } - - my $fclass = $field->{search_group}; - my $fname = $field->{name}; - my $value = $field->{value}; - $fname = "$fclass|$fname" if $fclass; - $value = $self->truncate_value($value); - - if ($body->{$fname}) { - if (ref $body->{$fname}) { - # Three or more values encountered for field. - # Add to the list. 
- push(@{$body->{$fname}}, $value); - } else { - # Second value encountered for field. - # Upgrade to array storage. - $body->{$fname} = [$body->{$fname}, $value]; - } - } else { - # First value encountered for field. - # Assume for now there will only be one value. - $body->{$fname} = $value - } - } - - return 0 unless $self->index_document($bib_id, $body); - - $state->{start_record} = $bib_id + 1; - $index_count++; - } - - $logger->info("ES indexing completed for records " . - $bib_ids->[0] . '...' . $bib_ids->[-1]); - - return $index_count; -} - -# Load holdings summary blobs for requested bibs -sub load_holdings { - my ($self, $bib_ids) = @_; - - my $bib_ids_str = join(',', @$bib_ids); - - my $copy_data = $self->get_db_rows(<info("ES found ".scalar(@$copy_data). - " holdings summaries for current record batch"); - - my $holdings = {}; - for my $copy (@$copy_data) { - - $holdings->{$copy->{record}} = [] - unless $holdings->{$copy->{record}}; - - push(@{$holdings->{$copy->{record}}}, { - count => $copy->{count}, - status => $copy->{status}, - circ_lib => $copy->{circ_lib}, - location => $copy->{location}, - circulate => $copy->{circulate} ? 'true' : 'false', - opac_visbile => $copy->{opac_visible} ? 'true' : 'false' - }); - } - - return $holdings; -} - -# Example pulling marc tag/subfield data. -# TODO: Create a separate bib-marc index if needed. -sub load_marc { - my ($self, $bib_ids) = @_; - - my $bib_ids_str = join(',', @$bib_ids); - - my $marc_data = $self->get_db_rows(<info("ES found ".scalar(@$marc_data). - " full record rows for current record batch"); - - my $marc = {}; - for my $row (@$marc_data) { - - my $rec_id = $row->{record}; - next unless defined $row->{value} && $row->{value} ne ''; - - $marc->{$rec_id} = [] unless $marc->{$rec_id}; - delete $row->{subfield} unless defined $row->{subfield}; - - # Add values to existing record/tag/subfield rows. 
- - my ($existing) = grep { - $_->{record} == $row->{record} && - $_->{tag} eq $row->{tag} && ( - (not defined $_->{subfield} && not defined $row->{subfield}) || - ($_->{subfield} eq $row->{subfield}) - ) - } @{$marc->{$rec_id}}; - - if ($existing) { - - $existing->{subfield} = [$existing->{subfield}] - unless ref $existing->{subfield}; - push(@{$existing->{subfield}}, $row->{value}); - - } else { - - push(@{$marc->{$rec_id}}, $row); - } - } - - return $marc; -} - - - -1; - - diff --git a/Open-ILS/src/support-scripts/elastic-index.pl b/Open-ILS/src/support-scripts/elastic-index.pl index c24aa38e20..80d3ef46d7 100755 --- a/Open-ILS/src/support-scripts/elastic-index.pl +++ b/Open-ILS/src/support-scripts/elastic-index.pl @@ -5,8 +5,8 @@ use Getopt::Long; use OpenSRF::Utils::JSON; use OpenILS::Utils::Fieldmapper; use OpenILS::Utils::CStoreEditor; -use OpenILS::Elastic::BibSearch; -use OpenILS::Elastic::BibMarc; +use OpenILS::Elastic::Bib::Search; +use OpenILS::Elastic::Bib::Marc; my $help; my $osrf_config = '/openils/conf/opensrf_core.xml'; @@ -128,9 +128,9 @@ OpenILS::Utils::CStoreEditor::init(); my $es; if ($index_name eq 'bib-search') { - $es = OpenILS::Elastic::BibSearch->new($cluster); + $es = OpenILS::Elastic::Bib::Search->new($cluster); } elsif ($index_name eq 'bib-marc') { - $es = OpenILS::Elastic::BibMarc->new($cluster); + $es = OpenILS::Elastic::Bib::Marc->new($cluster); } if (!$es) {