use existing indexed values; refactoring
authorBill Erickson <berickxx@gmail.com>
Fri, 26 Oct 2018 15:43:38 +0000 (11:43 -0400)
committerBill Erickson <berickxx@gmail.com>
Fri, 6 Sep 2019 18:21:21 +0000 (14:21 -0400)
Signed-off-by: Bill Erickson <berickxx@gmail.com>
Open-ILS/src/perlmods/lib/OpenILS/Application/Search/Biblio.pm
Open-ILS/src/perlmods/lib/OpenILS/Application/Search/Elastic.pm [new file with mode: 0644]
Open-ILS/src/perlmods/lib/OpenILS/Elastic.pm
Open-ILS/src/perlmods/lib/OpenILS/Elastic/BibSearch.pm
Open-ILS/src/sql/Pg/upgrade/XXXX.schema.elastic-search.sql

index 4581f17..9c7bc6a 100644 (file)
@@ -10,7 +10,7 @@ use OpenSRF::Utils::SettingsClient;
 use OpenILS::Utils::CStoreEditor q/:funcs/;
 use OpenSRF::Utils::Cache;
 use Encode;
-use OpenILS::Elastic::BibSearch;
+use OpenILS::Application::Search::Elastic;
 
 use OpenSRF::Utils::Logger qw/:logger/;
 
@@ -1159,7 +1159,10 @@ sub staged_search {
 
     # TODO TODO check settings/db to see if elasticsearch is 
     # enabled for bib-search.
-    #return elastic_search($search_hash->{query}, $user_offset, $user_limit);
+    if (1) {
+        return OpenILS::Application::Search::Elastic->bib_search(
+            $search_hash->{query}, $user_offset, $user_limit);
+    }
 
     # we're grabbing results on a per-superpage basis, which means the 
     # limit and offset should coincide with superpage boundaries
@@ -1311,6 +1314,7 @@ sub staged_search {
 }
 
 
+my $elastic_fields;
 sub elastic_search {
     my ($query, $offset, $limit) = @_;
 
@@ -1329,7 +1333,6 @@ sub elastic_search {
         }
     }
 
-
     my $elastic_query = {
         # Fetch only the bib ID field from each source document
         _source => ['id'],
@@ -1347,57 +1350,9 @@ sub elastic_search {
         }
     };
 
-    # No need to filter on holdings lib when searching globally
-    # (i.e. depth = 0)
-    if ($calls{site}) {
-
-        my $types = $U->get_org_types;
-        my $org = $U->find_org_by_shortname($U->get_org_tree, $calls{site});
-        my ($type) = grep {$_->id == $org->ou_type} @$types;
-        my $depth = $calls{depth} || $type->depth;
-
-        # No holdings-level circ lib filter needed when searching globally
-        if ($depth > 0) {
-
-            # TODO
-            # this makes a cstore call, but could easily come from cache.
-            my $org_ids = $U->get_org_descendants($org->id, $depth);
-
-            # Add a boolean OR-filter on holdings circ lib and optionally
-            # add a boolean AND-filter on copy status for availability
-            # checking.
-            $elastic_query->{query}->{bool}->{filter} = {
-                nested => {
-                    path => 'holdings',
-                    query => {bool => {should => []}}
-                }
-            };
-
-            my $should = 
-                $elastic_query->{query}{bool}{filter}{nested}{query}{bool}{should};
-
-            for my $org_id (@$org_ids) {
-
-                # Ensure at least one copy exists at the selected org unit
-                my $and = {
-                    bool => {
-                        must => [
-                            {term => {'holdings.circ_lib' => $org_id}}
-                        ]
-                    }
-                };
-
-                # When limiting to available, ensure at least one of the
-                # above copies is in status 0 or 7.
-                # TODO: consult config.copy_status.is_available
-                push(
-                    @{$and->{bool}{must}}, 
-                    {terms => {'holdings.status' => [0, 7]}}
-                ) if $available;
-
-                push(@$should, $and);
-            }
-        }
+    if (my $sn = $calls{site}) {
+        elastic_add_holdings_filter(
+            $elastic_query, $sn, $calls{depth}, $available);
     }
 
     if (my $key = $calls{sort}) {
@@ -1408,19 +1363,17 @@ sub elastic_search {
         my $dir = $descending ? 'desc' : 'asc';
         if ($key =~ /title/) {
             $elastic_query->{sort} = [
-                {'title|sort' => $dir},
-                {'title|maintitle.raw' => $dir}
+                {'titlesort' => $dir},
             ];
             
         } elsif ($key =~ /author/) {
             $elastic_query->{sort} = [
-                {'author|sort' => $dir},
-                {'author|first_author.raw' => $dir}
+                {'authorsort' => $dir},
             ];
 
         } elsif ($key =~ /pubdate/) {
             $elastic_query->{sort} = [
-                {'identifier|pub_date' => $dir}
+                {'pubdate' => $dir}
             ];
         }
     }
@@ -1438,7 +1391,88 @@ sub elastic_search {
                 grep {defined $_} @{$results->{hits}->{hits}}
         ]
     };
+}
+
+# avoid repetitive calls to DB for org info.
+my %org_data_cache = (by_shortname => {}, ancestors_at => {});
+
+sub elastic_add_holdings_filter {
+    my ($elastic_query, $shortname, $depth, $available) = @_;
+
+    if (!$org_data_cache{by_shortname}{$shortname}) {
+        $org_data_cache{by_shortname}{$shortname} = 
+            $U->find_org_by_shortname($U->get_org_tree, $shortname);
+    }
+
+    my $org = $org_data_cache{by_shortname}{$shortname};
+
+    my $types = $U->get_org_types; # pulls from cache
+    my ($type) = grep {$_->id == $org->ou_type} @$types;
+
+    $depth = defined $depth ? min($depth, $type->depth) : $type->depth;
+
+    if ($depth > 0) {
 
+        if (!$org_data_cache{ancestors_at}{$shortname}) {
+            $org_data_cache{ancestors_at}{$shortname} = {};
+        }
+
+        if (!$org_data_cache{ancestors_at}{$shortname}{$depth}) {
+            $org_data_cache{ancestors_at}{$shortname}{$depth} = 
+                $U->get_org_descendants($org->id, $depth);
+        }
+
+        my $org_ids = $org_data_cache{ancestors_at}{$shortname}{$depth};
+
+        # Add a boolean OR-filter on holdings circ lib and optionally
+        # add a boolean AND-filter on copy status for availability
+        # checking.
+        $elastic_query->{query}->{bool}->{filter} = {
+            nested => {
+                path => 'holdings',
+                query => {bool => {should => []}}
+            }
+        };
+
+        my $should = 
+            $elastic_query->{query}{bool}{filter}{nested}{query}{bool}{should};
+
+        for my $org_id (@$org_ids) {
+
+            # Ensure at least one copy exists at the selected org unit
+            my $and = {
+                bool => {
+                    must => [
+                        {term => {'holdings.circ_lib' => $org_id}}
+                    ]
+                }
+            };
+
+            # When limiting to available, ensure at least one of the
+            # above copies is in status 0 or 7.
+            # TODO: consult config.copy_status.is_available
+            push(
+                @{$and->{bool}{must}}, 
+                {terms => {'holdings.status' => [0, 7]}}
+            ) if $available;
+
+            push(@$should, $and);
+        }
+
+    } elsif ($available) {
+        # Limit to results that have an available copy, but don't worry
+        # about where the copy lives, since we're searching globally.
+
+        $elastic_query->{query}->{bool}->{filter} = {
+            nested => {
+                path => 'holdings',
+                query => {bool => {must => [
+                    # TODO: consult config.copy_status.is_available
+                    {terms => {'holdings.status' => [0, 7]}}
+                ]}}
+            }
+        };
+    }
 }
 
 sub fetch_display_fields {
diff --git a/Open-ILS/src/perlmods/lib/OpenILS/Application/Search/Elastic.pm b/Open-ILS/src/perlmods/lib/OpenILS/Application/Search/Elastic.pm
new file mode 100644 (file)
index 0000000..df0fc9c
--- /dev/null
@@ -0,0 +1,210 @@
+package OpenILS::Application::Search::Elastic;
+# ---------------------------------------------------------------
+# Copyright (C) 2018 King County Library System
+# Author: Bill Erickson <berickxx@gmail.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+# ---------------------------------------------------------------
+use strict; 
+use warnings;
+use OpenSRF::Utils::JSON;
+use OpenSRF::Utils::Logger qw/:logger/;
+use OpenILS::Utils::Fieldmapper;
+use OpenSRF::Utils::SettingsClient;
+use OpenILS::Utils::CStoreEditor q/:funcs/;
+use OpenILS::Elastic::BibSearch;
+use List::Util qw/min/;
+
+use OpenILS::Application::AppUtils;
+my $U = "OpenILS::Application::AppUtils";
+
+# bib fields defined in the elastic bib-search index
+my $bib_search_fields;
+
+# avoid repetitive calls to DB for org info.
+my %org_data_cache = (by_shortname => {}, ancestors_at => {});
+
+# Translate a bib search API call into something consumable by Elasticsearch
+# Translate search results into a structure consistent with a bib search
+# API response.
+sub bib_search {
+    my ($class, $query, $offset, $limit) = @_;
+
+    if (!$bib_search_fields) {
+        # gather fields and flesh with crad / cmf
+    }
+
+    my $elastic_query = translate_elastic_query($query, $offset, $limit);
+
+    my $es = OpenILS::Elastic::BibSearch->new('main');
+
+    $es->connect;
+    my $results = $es->search($elastic_query);
+
+    return {count => 0} unless $results;
+
+    return {
+        count => $results->{hits}->{total},
+        ids => [
+            map { [$_->{_id}, undef, $_->{_score}] } 
+                grep {defined $_} @{$results->{hits}->{hits}}
+        ]
+    };
+}
+
+sub translate_elastic_query {
+    my ($query, $offset, $limit) = @_;
+
+    my ($available) = ($query =~ s/(\#available)//g);
+    my ($descending) = ($query =~ s/(\#descending)//g);
+
+    my @funcs = qw/site depth sort item_lang/; # todo add others
+    my %calls;
+
+    for my $func (@funcs) {
+        my ($val) = ($query =~ /$func\(([^\)]+)\)/);
+        if (defined $val) {
+            # scrub from query string
+            $query =~ s/$func\(([^\)]+)\)//g;
+            $calls{$func} = $val;
+        }
+    }
+
+    my $elastic_query = {
+        # Fetch only the bib ID field from each source document
+        _source => ['id'],
+        size => $limit,
+        from => $offset,
+        query => {
+            bool => {
+                must => {
+                    query_string => {
+                        default_field => 'keyword',
+                        query => $query
+                    }
+                }
+            }
+        }
+    };
+
+    if (my $sn = $calls{site}) {
+        add_elastic_holdings_filter(
+            $elastic_query, $sn, $calls{depth}, $available);
+    }
+
+    if (my $key = $calls{sort}) {
+
+        # These sort fields match the default display field entries.
+        # TODO: index fields specific to sorting
+
+        my $dir = $descending ? 'desc' : 'asc';
+        if ($key =~ /title/) {
+            $elastic_query->{sort} = [
+                {'titlesort' => $dir},
+            ];
+            
+        } elsif ($key =~ /author/) {
+            $elastic_query->{sort} = [
+                {'authorsort' => $dir},
+            ];
+
+        } elsif ($key =~ /pubdate/) {
+            $elastic_query->{sort} = [
+                {'pubdate' => $dir}
+            ];
+        }
+    }
+
+    return $elastic_query;
+}
+
+
+sub add_elastic_holdings_filter {
+    my ($elastic_query, $shortname, $depth, $available) = @_;
+
+    if (!$org_data_cache{by_shortname}{$shortname}) {
+        $org_data_cache{by_shortname}{$shortname} = 
+            $U->find_org_by_shortname($U->get_org_tree, $shortname);
+    }
+
+    my $org = $org_data_cache{by_shortname}{$shortname};
+
+    my $types = $U->get_org_types; # pulls from cache
+    my ($type) = grep {$_->id == $org->ou_type} @$types;
+
+    $depth = defined $depth ? min($depth, $type->depth) : $type->depth;
+
+    if ($depth > 0) {
+
+        if (!$org_data_cache{ancestors_at}{$shortname}) {
+            $org_data_cache{ancestors_at}{$shortname} = {};
+        }
+
+        if (!$org_data_cache{ancestors_at}{$shortname}{$depth}) {
+            $org_data_cache{ancestors_at}{$shortname}{$depth} = 
+                $U->get_org_descendants($org->id, $depth);
+        }
+
+        my $org_ids = $org_data_cache{ancestors_at}{$shortname}{$depth};
+
+        # Add a boolean OR-filter on holdings circ lib and optionally
+        # add a boolean AND-filter on copy status for availability
+        # checking.
+        $elastic_query->{query}->{bool}->{filter} = {
+            nested => {
+                path => 'holdings',
+                query => {bool => {should => []}}
+            }
+        };
+
+        my $should = 
+            $elastic_query->{query}{bool}{filter}{nested}{query}{bool}{should};
+
+        for my $org_id (@$org_ids) {
+
+            # Ensure at least one copy exists at the selected org unit
+            my $and = {
+                bool => {
+                    must => [
+                        {term => {'holdings.circ_lib' => $org_id}}
+                    ]
+                }
+            };
+
+            # When limiting to available, ensure at least one of the
+            # above copies is in status 0 or 7.
+            # TODO: consult config.copy_status.is_available
+            push(
+                @{$and->{bool}{must}}, 
+                {terms => {'holdings.status' => [0, 7]}}
+            ) if $available;
+
+            push(@$should, $and);
+        }
+
+    } elsif ($available) {
+        # Limit to results that have an available copy, but don't worry
+        # about where the copy lives, since we're searching globally.
+
+        $elastic_query->{query}->{bool}->{filter} = {
+            nested => {
+                path => 'holdings',
+                query => {bool => {must => [
+                    # TODO: consult config.copy_status.is_available
+                    {terms => {'holdings.status' => [0, 7]}}
+                ]}}
+            }
+        };
+    }
+}
+
+1;
+
index 72d6609..7e00882 100644 (file)
@@ -95,11 +95,11 @@ sub load_config {
     my $self = shift;
     my $cluster = $self->cluster;
 
-    $self->{servers} = $self->get_db_rows(
+    $self->{nodes} = $self->get_db_rows(
         "SELECT * FROM elastic.node WHERE cluster = '$cluster' AND active");
 
-    unless (@{$self->{servers}}) {
-        $logger->error("ES no servers defined for cluster $cluster");
+    unless (@{$self->{nodes}}) {
+        $logger->error("ES no nodes defined for cluster $cluster");
         return;
     }
 
@@ -117,14 +117,19 @@ sub connect {
     $self->load_config;
 
     my @nodes;
-    for my $server (@{$self->{servers}}) {
+    for my $server (@{$self->{nodes}}) {
         push(@nodes, sprintf("%s://%s:%d", 
             $server->{proto}, $server->{host}, $server->{port}));
     }
 
     $logger->info("ES connecting to nodes @nodes");
 
-    $self->{es} = Search::Elasticsearch->new(nodes => \@nodes)
+    eval { $self->{es} = Search::Elasticsearch->new(nodes => \@nodes) };
+
+    if ($@) {
+        $logger->error("ES failed to connect to @nodes: $@");
+        return;
+    }
 }
 
 sub delete_index {
@@ -136,8 +141,10 @@ sub delete_index {
         $logger->info(
             "ES deleting index '$index' on cluster '".$self->cluster."'");
         $self->es->indices->delete(index => $index);
+
     } else {
-        $logger->warn("ES index '$index' does not exist");
+        $logger->warn("ES index '$index' ".
+            "does not exist in cluster '".$self->cluster."'");
     }
 }
 
index 5e3c6b3..6728233 100644 (file)
@@ -15,19 +15,15 @@ package OpenILS::Elastic::BibSearch;
 # ---------------------------------------------------------------
 use strict;
 use warnings;
-use Clone qw/clone/;
-use DBI;
-use XML::LibXML;
-use XML::LibXSLT;
 use OpenSRF::Utils::Logger qw/:logger/;
-use OpenILS::Elastic;
 use OpenSRF::Utils::JSON;
+use OpenILS::Elastic;
 use base qw/OpenILS::Elastic/;
 
 my $INDEX_NAME = 'bib-search';
 
 # number of bibs to index per batch.
-my $BIB_BATCH_SIZE = 1000;
+my $BIB_BATCH_SIZE = 500;
 
 # TODO: it's possible to apply multiple language analyzers.
 my $LANG_ANALYZER = 'english';
@@ -63,8 +59,8 @@ my $BASE_PROPERTIES = {
     },
 
     # Combo fields for field-class level searches.
-    # The value for every (for eaxmple) title|* field will be copied
-    # to the "title" field for searching accross all title entries.
+    # The value for every (for example) title|* search field will be 
+    # copied to the "title" field for searching accross all title entries.
     title => {
         type => 'text',
         analyzer => $LANG_ANALYZER,
@@ -152,31 +148,35 @@ sub create_index {
             };
 
             if ($field->{facet_field} || $field->{sorter}) {
-                # If it's also a facet field, add a keyword version
-                # of the field to use for aggregation
+                # If it's also a sort/facet field, add a keyword version
+                # of the field to use for sorting and aggregation
                 $def->{fields}{raw} = {type => 'keyword'};
+            }
 
-                if ($search_group) {
-                    # Fields in a search group are "copy_to"'ed the 
-                    # group definition
-                    $def->{copy_to} = $search_group;
-                }
+            if ($search_group) {
+                # Fields in a search group are copied to the group field
+                # for searching acrosss all fields of a given type.
+                $def->{copy_to} = $search_group;
             }
 
         } else {
-            # Fields that are only used for aggregation and sorting
-            # and filtering get no full-text treatment.
+            # Non-search fields -- used for sorting, aggregation,
+            # and "code" (raw value) searches -- are only indexed
+            # as (non-analyzed) keyword fields.
             $def = {type => 'keyword'};
         }
 
-        $logger->info("ES adding field $field_name: ". 
+        # Apply field boost.
+        $def->{boost} = $field->{weight} if ($field->{weight} || 1) > 1;
+
+        $logger->debug("ES adding field $field_name: ". 
             OpenSRF::Utils::JSON->perl2JSON($def));
 
         $mappings->{$field_name} = $def;
     }
 
     my $settings = $BASE_INDEX_SETTINGS;
-    $settings->{number_of_replicas} = scalar(@{$self->{servers}});
+    $settings->{number_of_replicas} = scalar(@{$self->{nodes}});
     $settings->{number_of_shards} = $self->index->{num_shards};
 
     my $conf = {
@@ -184,8 +184,6 @@ sub create_index {
         body => {
             settings => $settings,
             mappings => {record => {properties => $mappings}}
-            # document type (i.e. 'record') deprecated in v6
-            #mappings => {properties => $mappings}
         }
     };
 
@@ -222,72 +220,104 @@ sub populate_index {
     $logger->info("ES bib indexing complete with $total_indexed records");
 }
 
-# TODO add support for last_edit_date for partial re-indexing
-sub get_bib_records {
+sub get_bib_ids {
     my ($self, $state, $record_id) = @_;
+    return [$record_id] if $record_id;
+
+    # TODO add support for last_edit_date
+    my $last_id = $state->{last_bib_id};
 
     my $sql = <<SQL;
-SELECT bre.id, bre.create_date, bre.edit_date, bre.source AS bib_source
+SELECT bre.id
 FROM biblio.record_entry bre
-SQL
-
-    if ($record_id) {
-        $sql .= " WHERE bre.id = $record_id"
-    } else {
-        my $last_id = $state->{last_bib_id};
-        $sql .= <<SQL;
 WHERE NOT bre.deleted AND bre.active AND bre.id > $last_id
 ORDER BY bre.edit_date, bre.id LIMIT $BIB_BATCH_SIZE
 SQL
-    }
+
+    my $ids = $self->get_db_rows($sql);
+    return [ map {$_->{id}} @$ids ];
+}
+
+sub get_bib_data {
+    my ($self, $record_ids) = @_;
+
+    my $ids_str = join(',', @$record_ids);
+
+    my $sql = <<SQL;
+SELECT 
+    bre.id, 
+    bre.create_date, 
+    bre.edit_date, 
+    bre.source AS bib_source,
+    (elastic.bib_record_properties(bre.id)).*
+FROM biblio.record_entry bre
+WHERE id IN ($ids_str)
+SQL
 
     return $self->get_db_rows($sql);
 }
 
-# TODO partial re-index
 sub populate_bib_search_index_page {
     my ($self, $state) = @_;
 
     my $index_count = 0;
     my $last_id = $state->{last_bib_id};
 
-    my $bib_data = $self->get_bib_records($state);
-    return 0 unless @$bib_data;
+    my $bib_ids = $self->get_bib_ids($state);
+    return 0 unless @$bib_ids;
 
-    my $bib_ids = [ map {$_->{id}} @$bib_data ];
+    my $bib_data = $self->get_bib_data($bib_ids);
 
     my $holdings = $self->load_holdings($bib_ids);
 
-    my $fields = $self->get_db_rows(                                           
-        'SELECT * FROM elastic.bib_index_properties');                         
-
-    for my $bib (@$bib_data) {
-        my $bib_id = $bib->{id};
+    for my $bib_id (@$bib_ids) {
 
         my $body = {
-            bib_source => $bib->{bib_source},
             holdings => $holdings->{$bib_id} || []
         };
 
-        for my $df (q/create_date edit_date/) {
-            next unless $bib->{$df};
-            # ES wants ISO dates with the 'T' separator
-            (my $val = $bib->{$df}) =~ s/ /T/g;
-            $body->{$df} = $val;
-        }
-
-        my $fields = $self->get_db_rows(
-            "SELECT * FROM elastic.bib_record_properties($bib_id)");
+        # there are multiple rows per bib in the data list.
+        my @fields = grep {$_->{id} == $bib_id} @$bib_data;
+
+        my $first = 1;
+        for my $field (@fields) {
+        
+            if ($first) {
+                $first = 0;
+                # some values are repeated per field. 
+                # extract them from the first entry.
+                $body->{bib_source} = $field->{bib_source};
+
+                # ES ikes the "T" separator for ISO dates
+                ($body->{create_date} = $field->{create_date}) =~ s/ /T/g;
+                ($body->{edit_date} = $field->{edit_date}) =~ s/ /T/g;
+            }
 
-        for my $field (@$fields) {
             my $fclass = $field->{search_group};
             my $fname = $field->{name};
             $fname = "$fclass|$fname" if $fclass;
-            $body->{$fname} = $field->{value}
+
+            if ($body->{$fname}) {
+                if (ref $body->{$fname}) {
+                    # Three or more values encountered for field.
+                    # Add to the list.
+                    push(@{$body->{$fname}}, $field->{value});
+                } else {
+                    # Second value encountered for field.
+                    # Upgrade to array storage.
+                    $body->{$fname} = [
+                        $body->{$fname},
+                        $field->{value}
+                    ]
+                }
+            } else {
+                # First value encountered for field.
+                # Assume for now there will only be one value.
+                $body->{$fname} = $field->{value}
+            }
         }
 
-        return 0 unless 
-            $self->index_document($bib_id, $body);
+        return 0 unless $self->index_document($bib_id, $body);
 
         $state->{last_bib_id} = $bib_id;
         $index_count++;
index f0557f6..5125c5a 100644 (file)
@@ -53,6 +53,7 @@ CREATE OR REPLACE VIEW elastic.bib_index_properties AS
             cmf.field_class AS search_group,
             FALSE AS sorter,
             TRUE AS multi,
+            -- always treat identifier fields as non-search fields.
             (cmf.field_class <> 'identifier' AND cmf.search_field) AS search_field,
             cmf.facet_field,
             cmf.weight
@@ -60,6 +61,8 @@ CREATE OR REPLACE VIEW elastic.bib_index_properties AS
         WHERE cmf.search_field OR cmf.facet_field
     ) fields;
 
+-- Note this could be done with a view, but pushing the bib ID
+-- filter down to the base filter makes it a lot faster.
 CREATE OR REPLACE FUNCTION elastic.bib_record_properties(bre_id BIGINT) 
     RETURNS TABLE (
         search_group TEXT,
@@ -71,18 +74,43 @@ CREATE OR REPLACE FUNCTION elastic.bib_record_properties(bre_id BIGINT)
 DECLARE
 BEGIN
     RETURN QUERY EXECUTE $$
-        SELECT record.* FROM (
-            SELECT NULL::TEXT AS search_group, crad.name, mrs.source, mrs.value
+        SELECT DISTINCT record.* FROM (
+
+            -- record sorter values
+            SELECT 
+                NULL::TEXT AS search_group, 
+                crad.name, 
+                mrs.source, 
+                mrs.value
             FROM metabib.record_sorter mrs
             JOIN config.record_attr_definition crad ON (crad.name = mrs.attr)
             WHERE mrs.source = $$ || QUOTE_LITERAL(bre_id) || $$
             UNION
-            SELECT NULL::TEXT AS search_group, crad.name, mraf.id AS source, mraf.value
+
+            -- record attributes
+            SELECT 
+                NULL::TEXT AS search_group, 
+                crad.name, 
+                mraf.id AS source, 
+                mraf.value
             FROM metabib.record_attr_flat mraf
             JOIN config.record_attr_definition crad ON (crad.name = mraf.attr)
             WHERE mraf.id = $$ || QUOTE_LITERAL(bre_id) || $$
             UNION
-            SELECT cmf.field_class AS search_group, cmf.name, mfe.source, mfe.value
+
+            -- metabib field entries
+            SELECT 
+                cmf.field_class AS search_group, 
+                cmf.name, 
+                mfe.source, 
+                -- Index individual values instead of string-joined values
+                -- so they may be treated individually.  This is useful,
+                -- for example, when aggregating on individual subjects.
+                CASE WHEN cmf.joiner IS NOT NULL THEN
+                    REGEXP_SPLIT_TO_TABLE(mfe.value, cmf.joiner)
+                ELSE
+                    mfe.value
+                END AS value
             FROM (
                 SELECT * FROM metabib.title_field_entry UNION 
                 SELECT * FROM metabib.author_field_entry UNION