LP#1698206: Eliminate Staged Search
authorMike Rylander <mrylander@gmail.com>
Thu, 15 Jun 2017 19:54:40 +0000 (15:54 -0400)
committerKathy Lussier <klussier@masslnc.org>
Mon, 28 Aug 2017 15:14:53 +0000 (11:14 -0400)
=== Background
Evergreen stores all data, including that useful for patron and staff search,
in a normalized schema that is time and space efficient for transactional use
cases, and provides guarantees on data integrity.  In addition, development is
made simpler than would be the case otherwise and arbitrary reporting is made
possible.

However, this structure is not effective for direct, SQL-only search
functionality in a hierarchical, consortial dataset.  This is a problem that
is relatively unique to Evergreen, as it is most often employed to host and
serve large consortia with overlapping bibliographic datasets and
non-overlapping item and location datasets.  Other search engines, including
those built into other ILSs, do not generally have to account for
hierarchically organized location visibility concerns as a primary use case.
In other words, because it provides functionality that requires a hierarchical
view of non-bibliographic data, a problem space for Evergreen is essentially
nonexistent in competing products.

Evergreen's search infrastructure has evolved over the years.  In its current
form, the software first performs a full text search against extracted
bibliographic data and limits this initial internal result set to a
configurable size.  It then investigates the visibility of each result on
several non-bibliographic axes.  These visibility tests take up the
preponderance of CPU time spent in search, with full text search of the
bibliographic data generally completing within milliseconds. The main reason
this multi-stage mechanism is used is that there are many visibility axes and
attempting to join all required data sources together in a single query will
cause the search use case to perform very poorly.  A previous attempt to
create a pure SQL search mechanism failed for this reason.

A significant drawback of the current approach is that the costs imposed by
visibility filtering search results using normalized non-bibliographic data,
either in-query or separated from the main full-text query as it is today,
make it necessary to place limits on the number of database rows matched by
full-text query constructs.  This in turn can cause searches to omit results
in certain situations, such as a large consortium consisting of a few large
libraries and many small libraries.

However, it has been shown possible to overcome this performance issue by
providing an extensible way to collect all visibility related information
together into a small number of novel data structures with a compact in-memory
representation and very fast comparison functions.  In this way, we are able
to use pure SQL search strategies and therefore avoid result visibility
problems while also benefiting from improvements to the core PostgreSQL
database engine.  Further, this will open the door to indexing improvements,
such as removal of the need for duplicate data storage, or the use of non-XML
data storage schemes, which could reduce resource requirements and have a
direct, positive effect on patron and staff search experience.

=== Overview of existing search logic

. Construct core bibliographic search query
. Collect non-bibliographic filtering criteria
. Pass query and filters to a database function
. Calculate hierarchical location information for visibility testing
. Open cursor over core query, limited to *superpage_size * max_superpages* records
. Core query implements bib-level sorting
. For each result
.. NEXT if not on requested superpage
.. Check deleted flag, based on search type
.. Check transcendence
... Return result if true
.. Check for direct Located URI in scope
... Return result if exists
.. Check copy status + (circ lib | owning lib) based on modifier
.. Check peer bib copy status + (circ lib | owning lib) based on modifier
.. Check copy location based on filter
.. Check peer bib copy location based on filter
.. General copy visibility checks
... If NOT staff
.... Check for OPAC visible copies (trigger-maintained materialization)
.... Check for peer bib OPAC visible copies
... If staff
.... Confirm no copies here
.... Confirm no peer bib map
.... Confirm no copies anywhere
.... Confirm no Located URIs elsewhere
.. Return result if not excluded
. Calculate summary row

=== Overview of new mechanism
Record and copy information (everything checked in *(7)* above) is collected
into a novel data structure that allows all visibility-indicating criteria to
be flattened to integer arrays.  This is facilitated by a database trigger in
much the same way that basic OPAC copy visibility is collected for copies
today.

Most identifiers in Evergreen are stored as signed integers of either 32 or 64
bits.  The smaller 32 bit space allows for approximately two billion positive
entries, but all identifiers for table rows that are used as visibility axes
fall into a range of between one and one million for all applicable use cases,
and all identifiers of interest are positive.  Therefore, we can make use of
the most significant bits in an integer value to create a per-axis namespacing
mask.  When applied to the idenfitifer for a visibility axis identifier, this
mask allows two values that are identical across axis to be identified as
unique within a combined set of all values.

Sepcifically, we retain the four most significant bits of the integer space
and create from that 16 potential bitmasks for per-axis segregation of
identifiers.  Further, we separate copy-centered axes and bibliographic
record-centered attributes into two separate columns for storage purposes,
which means we can use the same four bits for different purposes within each
copy or bib set.

In order to implement existing visibility tests with this infrastructure, six
copy axes and two record axes are used from the possible 16 from each set.
See the search.calculate_visibility_attribute() for details.  By using 32 bit
integers we can collect all of the bitmasked values of each type (copy or bib)
into a single integer array and leverage the Postgres intarray extension to
test all axes at once.

At search time, required and user-requested visibility restrictions are
converted to *query_int* values. Results are directly filtered based on these
calculated *query_int* values.  This works in a way analogous to record
attribute filtering, avoiding the need to test statuses, circ and owning
library visibility, copy locations and location groups, copy OPAC visibility,
peer bibliographic record, Located URIs, or bibliographic record sources
directly.

=== Minimum Postgres version requirement
Due to features, particularly functions, available only in 9.4 and newer that
are key to the performance of the new method, Postgres 9.4 will need to be the
new lowest supported version for use with Evergreen.  While some of the new
features and functions could be implemented as user-defined functions in
PL/PGSQL, they would not be fast enough to make this pure-SQL search viable.

Among the important improvements that Postgres 9.4 and newer versions bring to
Evergreen are:

* Version 9.4 improved GIN indexes in ways that directly benefit Evergreen, as well as how anti-joins are planned which matters for some Evergreen searches.
* Version 9.5 introduced many general performance improvements, especially for joins and sorting, and brought planner improvements that impact complex queries such as those generated by this code.
* Version 9.6 delivered more general performance improvements, particularly for large servers such as those that Evergreen databases tend to live on, as well as more improvements to GIN indexes, executor changes that can avoid unnecessary work in search queries, new built-in full-text phrase searching, and initial parallel query execution.

=== Performance
The cost of the non-bibliographic filter value caching maintenance process is
10-40% faster than existing partial caching logic which it would replace.

The new code achieves up to 10% faster search times than the old, suboptimal
mechanism time for broad searches.  The new code is faster for more selective
searches, often by up to 90% faster.  In both broad and narrow search cases
the new mechanism performs with complete accuracy and does not miss
small-collection hits in large consortia as the existing code does.

Unsurprisingly, and in addition to the above improvements, performance is
improved marginally as each successive Postgres version at and beyond 9.4.

=== Page rendering changes
Previously, Evergreen would request the record details for a user-visible page
of results in parallel, and then, serially, request the facet data for the
result set.  Now, the facet data is requested asyncronously in the background
and then a single feed containing all records on a result page is requested
syncronously.  By parallelizing the result and facet metadata, page rendering
time is cut down significantly.  Concurrent requests of the same bibliographic
record are shared between apache backends to reduce result request time, and by
making one request instead of ten simultaineously, database load is reduced.  A
performance improvement of up to 20% in post-search page rendering time is seen
from this change.

Additionally, cross-apache caching of ancillary data, such as the coded value
map and other data, via memcache significantly reduces the average page
rendering time not just for result pages, but most pages generated by
Evergreen.  An additional performance improvement of up to 50% in post-search
page rendering time is seen from this change.

While these changes are not directly related to the removal staged search, they
touch areas impacted by core search changes and provided enough improvement
that implementing them concurrently with the elimination of staged search
seemed optimal.

=== User visible configuration changes
The stock configuration now provides an increased value for *max_superpages*
in opensrf.xml.  The default is now 100, and the *superpage_size* remains
1000, for a total limit of 100,000 hits per search.  This is not a limit on
visibility per se, as all records are visibility tested and ranked before
limiting, but simply a limit on the number of pages a user could click through
before reaching the end of the presented result list.

=== Tuning sensitivity
User-level timeouts are still possible with both the old and new code, given a
large enough dataset, a broad enough query, and a cold cache.  However, the
*gin_fuzzy_search_limit* GUC can be used to set a time cap on the new
mechanism. See https://www.postgresql.org/docs/9.6/static/gin-tips.html for
background, though the suggested values in the documentation are significantly
lower than would be readily useful for a large Evergreen instance.

Because it uses a more complex query structure, the new mechanism is somewhat
more sensitive to Postgres tuning in general.  In particular, lowering
*random_page_cost* from the default of *4.0* to a more reasonable *2.0* is
important for proper query planning.  For Evergreen use cases where the search
indexes and relevant tables are kept in RAM or SSDs are used for storage, this
value is acceptable and useful in general.

=== Funding and development
This project was funded by MassLNC and developed by Equinox Open Library
Initiative.

Signed-off-by: Mike Rylander <mrylander@gmail.com>
Signed-off-by: Galen Charlton <gmc@equinoxinitiative.org>
Signed-off-by: Kathy Lussier <klussier@masslnc.org>
Conflicts:
Open-ILS/src/perlmods/lib/OpenILS/WWW/EGCatLoader/Util.pm

Signed-off-by: Kathy Lussier <klussier@masslnc.org>
Open-ILS/examples/opensrf.xml.example
Open-ILS/src/perlmods/lib/OpenILS/Application/AppUtils.pm
Open-ILS/src/perlmods/lib/OpenILS/Application/Search/Biblio.pm
Open-ILS/src/perlmods/lib/OpenILS/Application/Storage/Driver/Pg/QueryParser.pm
Open-ILS/src/perlmods/lib/OpenILS/Application/Storage/Publisher/metabib.pm
Open-ILS/src/perlmods/lib/OpenILS/WWW/EGCatLoader/Util.pm
Open-ILS/src/sql/Pg/990.schema.unapi.sql
Open-ILS/src/sql/Pg/upgrade/XXXX.schema.copy_vis_attr_cache.sql [new file with mode: 0644]
Open-ILS/src/templates/opac/parts/result/paginate.tt2

index bbf0c52..48a3714 100644 (file)
@@ -683,7 +683,7 @@ vim:et:ts=4:sw=4:
                     <superpage_size>1000</superpage_size>
 
                     <!-- How many superpages to consider for searching overall. -->
-                    <max_superpages>10</max_superpages>
+                    <max_superpages>100</max_superpages>
 
                     <!-- zip code database file -->
                     <!--<zips_file>LOCALSTATEDIR/data/zips.txt</zips_file>-->
index a56443d..a8661cc 100644 (file)
@@ -763,6 +763,18 @@ sub find_org {
     return undef;
 }
 
+sub find_org_by_shortname {
+    my( $self, $org_tree, $shortname )  = @_;
+    return undef unless $org_tree and defined $shortname;
+    return $org_tree if ( $org_tree->shortname eq $shortname );
+    return undef unless ref($org_tree->children);
+    for my $c (@{$org_tree->children}) {
+        my $o = $self->find_org_by_shortname($c, $shortname);
+        return $o if $o;
+    }
+    return undef;
+}
+
 sub fetch_non_cat_type_by_name_and_org {
     my( $self, $name, $orgId ) = @_;
     $logger->debug("Fetching non cat type $name at org $orgId");
@@ -1462,6 +1474,12 @@ sub get_org_tree {
     return $tree;
 }
 
+sub get_global_flag {
+    my($self, $flag) = @_;
+    return undef unless ($flag);
+    return OpenILS::Utils::CStoreEditor->new->retrieve_config_global_flag($flag);
+}
+
 sub get_org_descendants {
     my($self, $org_id, $depth) = @_;
 
index 670076b..361b359 100644 (file)
@@ -1409,7 +1409,7 @@ sub retrieve_cached_facets {
 
     eval {
         local $SIG{ALRM} = sub {die};
-        alarm(2); # we'll sleep for as much as 2s
+        alarm(4); # we'll sleep for as much as 4s
         do {
             die if $cache->get_cache($key . '_COMPLETE');
         } while (sleep(0.05));
index babaa46..f643d5f 100644 (file)
@@ -841,10 +841,10 @@ sub toSQL {
         $bre_join = 'INNER JOIN biblio.record_entry bre ON m.source = bre.id AND bre.deleted';
         # The above suffices for filters too when the #deleted modifier
         # is in use.
-    } elsif ($$flat_plan{uses_bre}) {
+    } elsif ($$flat_plan{uses_bre} or !$self->find_modifier('staff')) {
         $bre_join = 'INNER JOIN biblio.record_entry bre ON m.source = bre.id';
     }
-    
+
     my $desc = 'ASC';
     $desc = 'DESC' if ($self->find_modifier('descending'));
 
@@ -963,13 +963,110 @@ sub toSQL {
     my $key = 'm.source';
     $key = 'm.metarecord' if (grep {$_->name eq 'metarecord' or $_->name eq 'metabib'} @{$self->modifiers});
 
-    my $core_limit = $self->QueryParser->core_limit || 25000;
-    $core_limit = 1 if($self->find_modifier('lucky'));
+    my $core_limit = $self->QueryParser->core_limit || 'NULL';
+    if ($self->find_modifier('lucky')) {
+        $filters{check_limit} = 1;
+        $filters{skip_check} = 0;
+       $core_limit = 1;
+    }
+
 
     my $flat_where = $$flat_plan{where};
     if ($flat_where ne '') {
         $flat_where = "AND (\n" . ${spc} x 5 . $flat_where . "\n" . ${spc} x 4 . ")";
     }
+
+    my $final_c_attr_test;
+    my $c_attr_join = '';
+    my $c_vis_test = '';
+    my $pc_vis_test = '';
+
+    # copy visibility testing
+    if (!$self->find_modifier('staff')) {
+        $pc_vis_test = "c_attrs";
+        $c_attr_join = ",c_attr"
+    }
+
+    if ($self->find_modifier('available')) {
+        push @{$$flat_plan{vis_filter}{'c_attr'}},
+            "search.calculate_visibility_attribute_test('status','{0,7,12}')";
+    }
+
+    if (@{$$flat_plan{vis_filter}{c_attr}}) {
+        $c_vis_test = join(",",@{$$flat_plan{vis_filter}{c_attr}});
+        $c_attr_join = ',c_attr';
+    }
+
+    if ($c_vis_test or $pc_vis_test) {
+        my $vis_test = '';
+
+        if ($c_vis_test and $pc_vis_test) {
+            $vis_test = $pc_vis_test . ",". $c_vis_test;
+        } elsif ($pc_vis_test) {
+            $vis_test = $pc_vis_test;
+        } else {
+            $vis_test = $c_vis_test;
+        }
+
+        # WITH-clause just generates vis test
+        $$flat_plan{with} .= "\n," if $$flat_plan{with};
+        $$flat_plan{with} .= "c_attr AS (SELECT (ARRAY_TO_STRING(ARRAY[$vis_test],'&'))::query_int AS vis_test FROM asset.patron_default_visibility_mask() x)";
+
+        $final_c_attr_test = 'EXISTS (SELECT 1 FROM asset.copy_vis_attr_cache WHERE record = m.source AND vis_attr_vector @@ c_attr.vis_test)';
+        if (!$pc_vis_test) { # staff search
+            $final_c_attr_test = '(' . $final_c_attr_test . ' OR NOT EXISTS (SELECT 1 FROM asset.copy_vis_attr_cache WHERE record = m.source))';
+        }
+    }
+    my $final_b_attr_test;
+    my $b_attr_join = '';
+    my $b_vis_test = '';
+    my $pb_vis_test = '';
+
+    # bib visibility testing
+    if (!$self->find_modifier('staff')) {
+        $pb_vis_test = "b_attrs";
+        $b_attr_join = ",b_attr"
+    }
+
+    if (@{$$flat_plan{vis_filter}{b_attr}}) {
+        $b_attr_join = ',b_attr ';
+        $b_vis_test = join("||'&'||",@{$$flat_plan{vis_filter}{b_attr}});
+    }
+
+    if ($b_vis_test or $pb_vis_test) {
+        my $vis_test = '';
+
+        if ($b_vis_test and $pb_vis_test) {
+            $vis_test = $pb_vis_test . ",". $b_vis_test;
+        } elsif ($pb_vis_test) {
+            $vis_test = $pb_vis_test;
+        } else {
+            $vis_test = $b_vis_test;
+        }
+
+        # WITH-clause just generates vis test
+        $$flat_plan{with} .= "\n," if $$flat_plan{with};
+        $$flat_plan{with} .= "b_attr AS (SELECT (ARRAY_TO_STRING(ARRAY[$vis_test],'&'))::query_int AS vis_test FROM asset.patron_default_visibility_mask() x)";
+
+        # These are magic numbers... see: search.calculate_visibility_attribute() UDF
+        $final_b_attr_test = '(b_attr.vis_test IS NULL OR bre.vis_attr_vector @@ b_attr.vis_test)';
+        if (!$pb_vis_test) { # staff search
+            $final_b_attr_test .= " OR NOT ( int4range(0,268435455,'[]') @> ANY(bre.vis_attr_vector) )";
+        }
+    }
+
+    if ($final_c_attr_test or $final_b_attr_test) { # something...
+        if ($final_c_attr_test and $final_b_attr_test) { # both!
+            my $plan = "($final_c_attr_test) OR ($final_b_attr_test)";
+            $flat_where .= "\n" . ${spc} x 4 . "AND (\n" . ${spc} x 5 .  $plan .  "\n" . ${spc} x 4 . ")";
+        } elsif ($final_c_attr_test) { # just copies...
+            $flat_where .= "\n" . ${spc} x 4 . "AND (\n" . ${spc} x 5 .  $final_c_attr_test .  "\n" . ${spc} x 4 . ")";
+        } else { # just bibs...
+            $flat_where .= "\n" . ${spc} x 4 . "AND (\n" . ${spc} x 5 .  $final_b_attr_test .  "\n" . ${spc} x 4 . ")";
+        }
+    }
+
     my $with = $$flat_plan{with};
     $with= "\nWITH $with" if $with;
 
@@ -982,27 +1079,45 @@ sub toSQL {
     }
 
     my $sql = <<SQL;
+WITH w AS (
+
 $with
-SELECT  $key AS id,
-        $agg_records,
-        (${rel})::NUMERIC AS rel,
-        $rank AS rank, 
-        FIRST(pubdate_t.value) AS tie_break,
-        STRING_AGG(ARRAY_TO_STRING(pop_with.badges,','),',') AS badges,
-        AVG(COALESCE(pop_with.total_score::NUMERIC,0.0::NUMERIC))::NUMERIC(2,1) AS popularity
-  FROM  metabib.metarecord_source_map m
-        $$flat_plan{from}
-        $mra_join
-        $mrv_join
-        $bre_join
-        $pop_join
-        $pubdate_join
-        $lang_join
-  WHERE 1=1
-        $flat_where
-  GROUP BY 1
-  ORDER BY 4 $desc $nullpos, $pop_extra_sort 5 DESC $nullpos, 3 DESC
-  LIMIT $core_limit
+SELECT  id,
+        rel,
+        CASE WHEN cardinality(records) = 1 THEN records[1] ELSE NULL END AS record,
+        NULL::INT AS total,
+        NULL::INT AS checked,
+        NULL::INT AS visible,
+        NULL::INT AS deleted,
+        NULL::INT AS excluded,
+        badges,
+        popularity
+  FROM  (SELECT $key AS id,
+                $agg_records,
+                ${rel}::NUMERIC AS rel,
+                $rank AS rank, 
+                FIRST(pubdate_t.value) AS tie_break,
+                STRING_AGG(ARRAY_TO_STRING(pop_with.badges,','),',') AS badges,
+                AVG(COALESCE(pop_with.total_score::NUMERIC,0.0::NUMERIC))::NUMERIC(2,1) AS popularity
+          FROM  metabib.metarecord_source_map m
+                $$flat_plan{from}
+                $mra_join
+                $mrv_join
+                $bre_join
+                $pop_join
+                $pubdate_join
+                $lang_join
+                $c_attr_join
+                $b_attr_join
+          WHERE 1=1
+                $flat_where
+          GROUP BY 1
+          ORDER BY 4 $desc $nullpos, $pop_extra_sort 5 DESC $nullpos, 3 DESC
+          LIMIT $core_limit
+        ) AS core_query
+) (SELECT * FROM w LIMIT $filters{check_limit} OFFSET $filters{skip_check})
+        UNION ALL
+  SELECT NULL,NULL,NULL,COUNT(*),COUNT(*),COUNT(*),0,0,NULL,NULL FROM w;
 SQL
 
     warn $sql if $self->QueryParser->debug;
@@ -1018,6 +1133,7 @@ sub flatten {
     my $from = shift || '';
     my $where = shift || '';
     my $with = '';
+    my %vis_filter = ( c_attr => [], b_attr => [] );
     my $uses_bre = 0;
     my $uses_mrd = 0;
     my $uses_mrv = 0;
@@ -1197,6 +1313,11 @@ sub flatten {
 
     my $joiner = "\n" . ${spc} x ( $self->plan_level + 5 ) . ($self->joiner eq '&' ? 'AND ' : 'OR ');
 
+    my ($depth_filter) = grep { $_->name eq 'depth' } @{$self->filters};
+    if ($depth_filter and @{$depth_filter->args} == 1) {
+        $depth_filter = $depth_filter->args->[0];
+    }
+
     my @dlist = ();
     my $common = 0;
     # for each dynamic filter, build more of the WHERE clause
@@ -1357,6 +1478,58 @@ sub flatten {
                     $where .= "$key ${NOT}IN (" . join(',', map { $self->QueryParser->quote_value($_) } @{$filter->args}) . ')';
                 }
 
+            } elsif ($filter->name eq 'site') {
+                if (@{$filter->args} == 1) {
+                    if (!defined($depth_filter) or $depth_filter > 0) { # no point in filtering by "all"
+                        my $sitename = $filter->args->[0];
+
+                        my $ot = $U->get_org_tree;
+                        my $site_org = $U->find_org_by_shortname($ot, $sitename);
+
+                        if ($site_org and $site_org->id != $ot->id) { # no point in filtering by "all"
+                            my $dorgs = $U->get_org_descendants($site_org->id, $depth_filter);
+                            my $aorgs = $U->get_org_ancestors($site_org->id);
+
+                            my $negate = $filter->negate ? 'TRUE' : 'FALSE';
+                            push @{$vis_filter{'c_attr'}},
+                                "search.calculate_visibility_attribute_test('circ_lib','{".join(',', @$dorgs)."}',$negate)";
+
+                            my $lorgs = [@$aorgs];
+                            my $luri_as_copy_gf = $U->get_global_flag('opac.located_uri.act_as_copy');
+                            push @$lorgs, @$dorgs if (
+                                $luri_as_copy_gf
+                                and $U->is_true($luri_as_copy_gf->enabled)
+                                and $U->is_true($luri_as_copy_gf->value)
+                            );
+
+                            $uses_bre = 1;
+                            push @{$vis_filter{'b_attr'}},
+                                "search.calculate_visibility_attribute_test('luri_org','{".join(',', @$lorgs)."}',$negate)";
+                        }
+                    }
+                }
+
+            } elsif ($filter->name eq 'locations') {
+                if (@{$filter->args} > 0) {
+                    my $negate = $filter->negate ? 'TRUE' : 'FALSE';
+                    push @{$vis_filter{'c_attr'}},
+                        "search.calculate_visibility_attribute_test('location','{".join(',', @{$filter->args})."}',$negate)";
+                }
+
+            } elsif ($filter->name eq 'location_groups') {
+                if (@{$filter->args} > 0) {
+                    my $negate = $filter->negate ? 'TRUE' : 'FALSE';
+                    push @{$vis_filter{'c_attr'}},
+                        "search.calculate_visibility_attribute_test('location_group','{".join(',', @{$filter->args})."}',$negate)";
+                }
+
+            } elsif ($filter->name eq 'statuses') {
+                if (@{$filter->args} > 0) {
+                    my $negate = $filter->negate ? 'TRUE' : 'FALSE';
+                    push @{$vis_filter{'c_attr'}},
+                        "search.calculate_visibility_attribute_test('status','{".join(',', @{$filter->args})."}',$negate)";
+                }
+
             } elsif ($filter->name eq 'has_browse_entry') {
                 if (@{$filter->args} >= 2) {
                     my $entry = int(shift @{$filter->args});
@@ -1401,13 +1574,11 @@ sub flatten {
                     }
                 }
             } elsif ($filter->name eq 'bib_source') {
-                $uses_bre = 1;
-
                 if (@{$filter->args} > 0) {
-                    $where .= $joiner if $where ne '';
-                    $where .= "${NOT}COALESCE(bre.source IN ("
-                           . join(',', map { $self->QueryParser->quote_value($_) } @{ $filter->args })
-                           . "), false)";
+                    $uses_bre = 1;
+                    my $negate = $filter->negate ? 'TRUE' : 'FALSE';
+                    push @{$vis_filter{'c_attr'}},
+                        "search.calculate_visibility_attribute_test('source','{".join(',', @{$filter->args})."}',$negate)";
                 }
             } elsif ($filter->name eq 'from_metarecord') {
                 if (@{$filter->args} > 0) {
@@ -1442,6 +1613,7 @@ sub flatten {
         from => $from,
         where => $where,
         with => $with,
+        vis_filter => \%vis_filter,
         uses_bre => $uses_bre,
         uses_mrv => $uses_mrv,
         uses_mrd => $uses_mrd
index d8a65c1..2e724de 100644 (file)
@@ -3001,7 +3001,7 @@ sub query_parser_fts {
 
 
     # gather the limit or default to 10
-    my $limit = $args{check_limit} || 'NULL';
+    my $limit = $args{check_limit};
     if (my ($filter) = $query->parse_tree->find_filter('limit')) {
             $limit = $filter->args->[0] if (@{$filter->args});
     }
@@ -3011,7 +3011,7 @@ sub query_parser_fts {
 
 
     # gather the offset or default to 0
-    my $offset = $args{skip_check} || $args{offset} || 0;
+    my $offset = $args{skip_check} || $args{offset};
     if (my ($filter) = $query->parse_tree->find_filter('offset')) {
             $offset = $filter->args->[0] if (@{$filter->args});
     }
@@ -3077,7 +3077,8 @@ sub query_parser_fts {
 
     my $param_search_ou = $ou;
     my $param_depth = $depth; $param_depth = 'NULL' unless (defined($depth) and length($depth) > 0 );
-    my $param_core_query = "\$core_query_$$\$" . $query->parse_tree->toSQL . "\$core_query_$$\$";
+#    my $param_core_query = "\$core_query_$$\$" . $query->parse_tree->toSQL . "\$core_query_$$\$";
+    my $param_core_query = $query->parse_tree->toSQL;
     my $param_statuses = '$${' . join(',', map { s/\$//go; "\"$_\""} @statuses) . '}$$';
     my $param_locations = '$${' . join(',', map { s/\$//go; "\"$_\""} @location) . '}$$';
     my $staff = ($self->api_name =~ /staff/ or $query->parse_tree->find_modifier('staff')) ? "'t'" : "'f'";
@@ -3085,22 +3086,27 @@ sub query_parser_fts {
     my $metarecord = ($self->api_name =~ /metabib/ or $query->parse_tree->find_modifier('metabib') or $query->parse_tree->find_modifier('metarecord')) ? "'t'" : "'f'";
     my $param_pref_ou = $pref_ou || 'NULL';
 
+#    my $sth = metabib::metarecord_source_map->db_Main->prepare(<<"    SQL");
+#        SELECT  * -- bib search: $args{query}
+#          FROM  search.query_parser_fts(
+#                    $param_search_ou\:\:INT,
+#                    $param_depth\:\:INT,
+#                    $param_core_query\:\:TEXT,
+#                    $param_statuses\:\:INT[],
+#                    $param_locations\:\:INT[],
+#                    $param_offset\:\:INT,
+#                    $param_check\:\:INT,
+#                    $param_limit\:\:INT,
+#                    $metarecord\:\:BOOL,
+#                    $staff\:\:BOOL,
+#                    $deleted_search\:\:BOOL,
+#                    $param_pref_ou\:\:INT
+#                );
+#    SQL
+
     my $sth = metabib::metarecord_source_map->db_Main->prepare(<<"    SQL");
-        SELECT  * -- bib search: $args{query}
-          FROM  search.query_parser_fts(
-                    $param_search_ou\:\:INT,
-                    $param_depth\:\:INT,
-                    $param_core_query\:\:TEXT,
-                    $param_statuses\:\:INT[],
-                    $param_locations\:\:INT[],
-                    $param_offset\:\:INT,
-                    $param_check\:\:INT,
-                    $param_limit\:\:INT,
-                    $metarecord\:\:BOOL,
-                    $staff\:\:BOOL,
-                    $deleted_search\:\:BOOL,
-                    $param_pref_ou\:\:INT
-                );
+        -- bib search: $args{query}
+        $param_core_query
     SQL
 
     $sth->execute;
@@ -3268,6 +3274,27 @@ sub query_parser_fts_wrapper {
         }
     }
 
+    # gather the limit or default to 10
+    my $limit = delete($args{check_limit}) || $base_plan->superpage_size;
+    if (my ($filter) = $base_plan->parse_tree->find_filter('limit')) {
+            $limit = $filter->args->[0] if (@{$filter->args});
+    }
+    if (my ($filter) = $base_plan->parse_tree->find_filter('check_limit')) {
+            $limit = $filter->args->[0] if (@{$filter->args});
+    }
+
+    # gather the offset or default to 0
+    my $offset = delete($args{skip_check}) || delete($args{offset}) || 0;
+    if (my ($filter) = $base_plan->parse_tree->find_filter('offset')) {
+            $offset = $filter->args->[0] if (@{$filter->args});
+    }
+    if (my ($filter) = $base_plan->parse_tree->find_filter('skip_check')) {
+            $offset = $filter->args->[0] if (@{$filter->args});
+    }
+
+
+    $query = "check_limit($limit) $query" if (defined $limit);
+    $query = "skip_check($offset) $query" if (defined $offset);
     $query = "estimation_strategy($args{estimation_strategy}) $query" if ($args{estimation_strategy});
     $query = "badge_orgs($borgs) $query" if ($borgs);
 
@@ -3275,9 +3302,9 @@ sub query_parser_fts_wrapper {
     $query = "site($args{org_unit}) $query" if ($args{org_unit});
     $query = "depth($args{depth}) $query" if (defined($args{depth}));
     $query = "sort($args{sort}) $query" if ($args{sort});
-    $query = "limit($args{limit}) $query" if ($args{limit});
     $query = "core_limit($args{core_limit}) $query" if ($args{core_limit});
-    $query = "skip_check($args{skip_check}) $query" if ($args{skip_check});
+#    $query = "limit($args{limit}) $query" if ($args{limit});
+#    $query = "skip_check($args{skip_check}) $query" if ($args{skip_check});
     $query = "superpage($args{superpage}) $query" if ($args{superpage});
     $query = "offset($args{offset}) $query" if ($args{offset});
     $query = "#metarecord $query" if ($self->api_name =~ /metabib/);
index eb268dc..9bed9cd 100644 (file)
@@ -55,6 +55,7 @@ sub child_init {
 sub init_ro_object_cache {
     my $self = shift;
     my $ctx = $self->ctx;
+    my $memcache ||= OpenSRF::Utils::Cache->new('global');
 
     # reset org unit setting cache on each page load to avoid the
     # requirement of reloading apache with each org-setting change
@@ -87,12 +88,21 @@ sub init_ro_object_cache {
         my $get_key = "get_$hint";
         my $search_key = "search_$hint";
 
+        my $memcache_key = join('.', 'EGWeb',$locale,$hint) . '.';
+
         # Retrieve the full set of objects with class $hint
         $locale_subs->{$list_key} = sub {
+            my $from_memcache = 0;
+            my $list = $memcache->get_cache($memcache_key.'list');
+            if ($list) {
+                $cache{list}{$locale}{$hint} = $list;
+                $from_memcache = 1;
+            }
             my $method = "retrieve_all_$eclass";
             my $e = new_editor();
             $cache{list}{$locale}{$hint} = $e->$method() unless $cache{list}{$locale}{$hint};
             undef $e;
+            $memcache->put_cache($memcache_key.'list',$cache{list}{$locale}{$hint}) unless $from_memcache;
             return $cache{list}{$locale}{$hint};
         };
 
@@ -336,13 +346,23 @@ my $unapi_cache;
 sub get_records_and_facets {
     my ($self, $rec_ids, $facet_key, $unapi_args) = @_;
 
+    # collect the facet data
+    my $search = OpenSRF::AppSession->create('open-ils.search');
+    my $facet_req;
+    if ($facet_key) {
+        $facet_req = $search->request(
+            'open-ils.search.facet_cache.retrieve', $facet_key
+        );
+    }
+
     $unapi_args ||= {};
     $unapi_args->{site} ||= $self->ctx->{aou_tree}->()->shortname;
     $unapi_args->{depth} ||= $self->ctx->{aou_tree}->()->ou_type->depth;
     $unapi_args->{flesh_depth} ||= 5;
 
     my $is_meta = delete $unapi_args->{metarecord};
-    my $unapi_type = $is_meta ? 'unapi.mmr' : 'unapi.bre';
+    #my $unapi_type = $is_meta ? 'unapi.mmr' : 'unapi.bre';
+    my $unapi_type = $is_meta ? 'unapi.metabib_virtual_record_feed' : 'unapi.biblio_record_entry_feed';
 
     $unapi_cache ||= OpenSRF::Utils::Cache->new('global');
     my $unapi_cache_key_suffix = join(
@@ -356,134 +376,44 @@ sub get_records_and_facets {
 
     my %tmp_data;
     my $outer_self = $self;
-    $self->timelog("get_records_and_facets(): about to call multisession");
-    my $ses = OpenSRF::MultiSession->new(
-        app => 'open-ils.cstore',
-        cap => 10, # XXX config
-        success_handler => sub {
-            my($self, $req) = @_;
-            my $data = $req->{response}->[0]->content;
-
-            $outer_self->timelog("get_records_and_facets(): got response content");
-
-            # Protect against requests for non-existent records
-            return unless $data->{$unapi_type};
-
-            my $xml = XML::LibXML->new->parse_string($data->{$unapi_type})->documentElement;
-
-            $outer_self->timelog("get_records_and_facets(): parsed xml");
-            # Protect against legacy invalid MARCXML that might not have a 901c
-            my $bre_id;
-            my $mmr_id;
-            my $bre_id_nodes =  $xml->find('*[@tag="901"]/*[@code="c"]');
-            if ($bre_id_nodes) {
-                $bre_id =  $bre_id_nodes->[0]->textContent;
-            } else {
-                $logger->warn("Missing 901 subfield 'c' in " . $xml->toString());
-            }
-
-            if ($is_meta) {
-                # extract metarecord ID from mmr.unapi tag
-                for my $node ($xml->getElementsByTagName('abbr')) {
-                    my $title = $node->getAttribute('title');
-                    ($mmr_id = $title) =~ 
-                        s/tag:open-ils.org:U2\@mmr\/(\d+)\/.*/$1/g;
-                    last if $mmr_id;
-                }
-            }
 
-            my $rec_id = $mmr_id ? $mmr_id : $bre_id;
-            $tmp_data{$rec_id} = {
-                id => $rec_id, 
-                bre_id => $bre_id, 
-                mmr_id => $mmr_id,
-                marc_xml => $xml
-            };
-
-            if ($rec_id) {
-                # Let other backends grab our data now that we're done.
-                my $key = 'TPAC_unapi_cache_'.$rec_id.'_'.$unapi_cache_key_suffix;
-                my $cache_data = $unapi_cache->get_cache($key);
-                if ($$cache_data{running}) {
-                    $unapi_cache->put_cache($key, {
-                        bre_id => $bre_id,
-                        mmr_id => $mmr_id,
-                        id => $rec_id, 
-                        marc_xml => $data->{$unapi_type} 
-                    }, 10);
-                }
-            }
-
-            $outer_self->timelog("get_records_and_facets(): end of success handler");
-        }
-    );
-
-    $self->timelog("get_records_and_facets(): about to call ".
-        "$unapi_type via json_query (rec_ids has " . scalar(@$rec_ids));
-
-    my @loop_recs = uniq @$rec_ids;
-    my %rec_timeout;
+    my $sdepth = $unapi_args->{flesh_depth};
+    my $slimit = "acn=>$sdepth,acp=>$sdepth";
+    $slimit .= ",bre=>$sdepth" if $is_meta;
+    my $flesh = $unapi_args->{flesh} || '';
 
-    while (my $bid = shift @loop_recs) {
+    # tag the record with the MR id
+    $flesh =~ s/}$/,mmr.unapi}/g if $is_meta;
 
-        sleep(0.1) if $rec_timeout{$bid};
+    my $ses = OpenSRF::AppSession->create('open-ils.cstore');
 
+    my @loop_recs;
+    for my $bid (@$rec_ids) {
         my $unapi_cache_key = 'TPAC_unapi_cache_'.$bid.'_'.$unapi_cache_key_suffix;
-        my $unapi_data = $unapi_cache->get_cache($unapi_cache_key) || {};
-
-        if ($unapi_data->{running}) { #cache entry from ongoing, concurrent retrieval
-            if (!$rec_timeout{$bid}) {
-                $rec_timeout{$bid} = time() + 10;
-            }
-
-            if ( time() > $rec_timeout{$bid} ) { # we've waited too long. just do it
-                $unapi_data = {};
-                delete $rec_timeout{$bid};
-            } else { # we'll pause next time around to let this one try again
-                push(@loop_recs, $bid);
-                next;
-            }
-        }
+        my $unapi_data = $unapi_cache->get_cache($unapi_cache_key);
 
-        if ($unapi_data->{marc_xml}) { # we got data from the cache
+        if (!$unapi_data || $unapi_data->{running}) { #cache entry not done yet, get our own copy
+            push(@loop_recs, $bid);
+        } else {
             $unapi_data->{marc_xml} = XML::LibXML->new->parse_string($unapi_data->{marc_xml})->documentElement;
             $tmp_data{$unapi_data->{id}} = $unapi_data;
-        } else { # we're the first or we timed out. success_handler will populate the real value
-            $unapi_cache->put_cache($unapi_cache_key, { running => $$ }, 10);
-
-            my $sdepth = $unapi_args->{flesh_depth};
-            my $slimit = "acn=>$sdepth,acp=>$sdepth";
-            $slimit .= ",bre=>$sdepth" if $is_meta;
-            my $flesh = $unapi_args->{flesh} || '';
-
-            # tag the record with the MR id
-            $flesh =~ s/}$/,mmr.unapi}/g if $is_meta;
-
-            $ses->request(
-                'open-ils.cstore.json_query',
-                 {from => [
-                    $unapi_type, $bid, 'marcxml','record', $flesh,
-                    $unapi_args->{site}, 
-                    $unapi_args->{depth}, 
-                    $slimit,
-                    undef, undef, $unapi_args->{pref_lib}
-                ]}
-            );
         }
     }
 
-    # gather up the unapi recs
-    $ses->session_wait(1);
-    $self->timelog("get_records_and_facets():past session wait");
+    my $unapi_req = $ses->request(
+        'open-ils.cstore.json_query',
+         {from => [
+            $unapi_type, '{'.join(',',@loop_recs).'}', 'marcxml', $flesh,
+            $unapi_args->{site}, 
+            $unapi_args->{depth}, 
+            $slimit,
+            undef, undef, $unapi_args->{pref_lib}
+        ]}
+    );
 
     my $facets = {};
-    if ($facet_key) {
+    if ($facet_req) {
         $self->timelog("get_records_and_facets():almost ready to fetch facets");
-        # collect the facet data
-        my $search = OpenSRF::AppSession->create('open-ils.search');
-        my $facet_req = $search->request(
-            'open-ils.search.facet_cache.retrieve', $facet_key
-        );
 
         my $tmp_facets = $facet_req->gather(1);
         $self->timelog("get_records_and_facets(): gathered facet data");
@@ -508,10 +438,66 @@ sub get_records_and_facets {
             }
         }
         $self->timelog("get_records_and_facets(): gathered/sorted facet data");
-        $search->kill_me;
     } else {
         $facets = undef;
     }
+    $search->kill_me;
+
+    my $data = $unapi_req->gather(1);
+
+    $outer_self->timelog("get_records_and_facets(): got response content");
+
+    # Protect against requests for non-existent records
+    return unless $data->{$unapi_type};
+
+    my $doc = XML::LibXML->new->parse_string($data->{$unapi_type})->documentElement;
+
+    $outer_self->timelog("get_records_and_facets(): parsed xml");
+    for my $xml ($doc->getElementsByTagName('record')) {
+        $xml = XML::LibXML->new->parse_string($xml->toString)->documentElement;
+
+        # Protect against legacy invalid MARCXML that might not have a 901c
+        my $bre_id;
+        my $mmr_id;
+        my $bre_id_nodes =  $xml->find('*[@tag="901"]/*[@code="c"]');
+        if ($bre_id_nodes) {
+            $bre_id =  $bre_id_nodes->[0]->textContent;
+        } else {
+            $logger->warn("Missing 901 subfield 'c' in " . $xml->toString());
+        }
+    
+        if ($is_meta) {
+            # extract metarecord ID from mmr.unapi tag
+            for my $node ($xml->getElementsByTagName('abbr')) {
+                my $title = $node->getAttribute('title');
+                ($mmr_id = $title) =~ 
+                    s/tag:open-ils.org:U2\@mmr\/(\d+)\/.*/$1/g;
+                last if $mmr_id;
+            }
+        }
+    
+        my $rec_id = $mmr_id ? $mmr_id : $bre_id;
+        $tmp_data{$rec_id} = {
+            id => $rec_id, 
+            bre_id => $bre_id, 
+            mmr_id => $mmr_id,
+            marc_xml => $xml
+        };
+    
+        if ($rec_id) {
+            # Let other backends grab our data now that we're done.
+            my $key = 'TPAC_unapi_cache_'.$rec_id.'_'.$unapi_cache_key_suffix;
+            my $cache_data = $unapi_cache->get_cache($key);
+            if ($$cache_data{running}) {
+                $unapi_cache->put_cache($key, {
+                    bre_id => $bre_id,
+                    mmr_id => $mmr_id,
+                    id => $rec_id, 
+                    marc_xml => $xml->toString
+                }, 10);
+            }
+        }
+    }
 
     return ($facets, map { $tmp_data{$_} } @$rec_ids);
 }
index b1a407a..617946a 100644 (file)
@@ -321,6 +321,8 @@ RETURNS XML AS $F$ SELECT NULL::XML $F$ LANGUAGE SQL STABLE;
 
 CREATE OR REPLACE FUNCTION unapi.biblio_record_entry_feed ( id_list BIGINT[], format TEXT, includes TEXT[], org TEXT, depth INT DEFAULT NULL, slimit HSTORE DEFAULT NULL, soffset HSTORE DEFAULT NULL, include_xmlns BOOL DEFAULT TRUE, title TEXT DEFAULT NULL, description TEXT DEFAULT NULL, creator TEXT DEFAULT NULL, update_ts TEXT DEFAULT NULL, unapi_url TEXT DEFAULT NULL, header_xml XML DEFAULT NULL ) RETURNS XML AS $F$ SELECT NULL::XML $F$ LANGUAGE SQL STABLE;
 
+CREATE OR REPLACE FUNCTION unapi.metabib_virtual_record_feed ( id_list BIGINT[], format TEXT, includes TEXT[], org TEXT, depth INT DEFAULT NULL, slimit HSTORE DEFAULT NULL, soffset HSTORE DEFAULT NULL, include_xmlns BOOL DEFAULT TRUE, title TEXT DEFAULT NULL, description TEXT DEFAULT NULL, creator TEXT DEFAULT NULL, update_ts TEXT DEFAULT NULL, unapi_url TEXT DEFAULT NULL, header_xml XML DEFAULT NULL ) RETURNS XML AS $F$ SELECT NULL::XML $F$ LANGUAGE SQL STABLE;
+
 CREATE OR REPLACE FUNCTION unapi.memoize (classname TEXT, obj_id BIGINT, format TEXT,  ename TEXT, includes TEXT[], org TEXT, depth INT DEFAULT NULL, slimit HSTORE DEFAULT NULL, soffset HSTORE DEFAULT NULL, include_xmlns BOOL DEFAULT TRUE ) RETURNS XML AS $F$
 DECLARE
     key     TEXT;
@@ -407,6 +409,65 @@ BEGIN
 END;
 $F$ LANGUAGE PLPGSQL STABLE;
 
+CREATE OR REPLACE FUNCTION unapi.metabib_virtual_record_feed ( id_list BIGINT[], format TEXT, includes TEXT[], org TEXT, depth INT DEFAULT NULL, slimit HSTORE DEFAULT NULL, soffset HSTORE DEFAULT NULL, include_xmlns BOOL DEFAULT TRUE, title TEXT DEFAULT NULL, description TEXT DEFAULT NULL, creator TEXT DEFAULT NULL, update_ts TEXT DEFAULT NULL, unapi_url TEXT DEFAULT NULL, header_xml XML DEFAULT NULL ) RETURNS XML AS $F$
+DECLARE
+    layout          unapi.bre_output_layout%ROWTYPE;
+    transform       config.xml_transform%ROWTYPE;
+    item_format     TEXT;
+    tmp_xml         TEXT;
+    xmlns_uri       TEXT := 'http://open-ils.org/spec/feed-xml/v1';
+    ouid            INT;
+    element_list    TEXT[];
+BEGIN
+
+    IF org = '-' OR org IS NULL THEN
+        SELECT shortname INTO org FROM evergreen.org_top();
+    END IF;
+
+    SELECT id INTO ouid FROM actor.org_unit WHERE shortname = org;
+    SELECT * INTO layout FROM unapi.bre_output_layout WHERE name = format;
+
+    IF layout.name IS NULL THEN
+        RETURN NULL::XML;
+    END IF;
+
+    SELECT * INTO transform FROM config.xml_transform WHERE name = layout.transform;
+    xmlns_uri := COALESCE(transform.namespace_uri,xmlns_uri);
+
+    -- Gather the bib xml
+    SELECT XMLAGG( unapi.mmr(i, format, '', includes, org, depth, slimit, soffset, include_xmlns)) INTO tmp_xml FROM UNNEST( id_list ) i;
+
+    IF layout.title_element IS NOT NULL THEN
+        EXECUTE 'SELECT XMLCONCAT( XMLELEMENT( name '|| layout.title_element ||', XMLATTRIBUTES( $1 AS xmlns), $3), $2)' INTO tmp_xml USING xmlns_uri, tmp_xml::XML, title;
+    END IF;
+
+    IF layout.description_element IS NOT NULL THEN
+        EXECUTE 'SELECT XMLCONCAT( XMLELEMENT( name '|| layout.description_element ||', XMLATTRIBUTES( $1 AS xmlns), $3), $2)' INTO tmp_xml USING xmlns_uri, tmp_xml::XML, description;
+    END IF;
+
+    IF layout.creator_element IS NOT NULL THEN
+        EXECUTE 'SELECT XMLCONCAT( XMLELEMENT( name '|| layout.creator_element ||', XMLATTRIBUTES( $1 AS xmlns), $3), $2)' INTO tmp_xml USING xmlns_uri, tmp_xml::XML, creator;
+    END IF;
+
+    IF layout.update_ts_element IS NOT NULL THEN
+        EXECUTE 'SELECT XMLCONCAT( XMLELEMENT( name '|| layout.update_ts_element ||', XMLATTRIBUTES( $1 AS xmlns), $3), $2)' INTO tmp_xml USING xmlns_uri, tmp_xml::XML, update_ts;
+    END IF;
+
+    IF unapi_url IS NOT NULL THEN
+        EXECUTE $$SELECT XMLCONCAT( XMLELEMENT( name link, XMLATTRIBUTES( 'http://www.w3.org/1999/xhtml' AS xmlns, 'unapi-server' AS rel, $1 AS href, 'unapi' AS title)), $2)$$ INTO tmp_xml USING unapi_url, tmp_xml::XML;
+    END IF;
+
+    IF header_xml IS NOT NULL THEN tmp_xml := XMLCONCAT(header_xml,tmp_xml::XML); END IF;
+
+    element_list := regexp_split_to_array(layout.feed_top,E'\\.');
+    FOR i IN REVERSE ARRAY_UPPER(element_list, 1) .. 1 LOOP
+        EXECUTE 'SELECT XMLELEMENT( name '|| quote_ident(element_list[i]) ||', XMLATTRIBUTES( $1 AS xmlns), $2)' INTO tmp_xml USING xmlns_uri, tmp_xml::XML;
+    END LOOP;
+
+    RETURN tmp_xml::XML;
+END;
+$F$ LANGUAGE PLPGSQL STABLE;
+
 CREATE OR REPLACE FUNCTION unapi.bre (
     obj_id BIGINT,
     format TEXT,
diff --git a/Open-ILS/src/sql/Pg/upgrade/XXXX.schema.copy_vis_attr_cache.sql b/Open-ILS/src/sql/Pg/upgrade/XXXX.schema.copy_vis_attr_cache.sql
new file mode 100644 (file)
index 0000000..906c82a
--- /dev/null
@@ -0,0 +1,859 @@
+BEGIN;
+
+-- Thist change drops a needless join and saves 10-15% in time cost
+CREATE OR REPLACE FUNCTION search.facets_for_record_set(ignore_facet_classes text[], hits bigint[]) RETURNS TABLE(id integer, value text, count bigint)
+AS $f$
+    SELECT id, value, count
+      FROM (
+        SELECT  mfae.field AS id,
+                mfae.value,
+                COUNT(DISTINCT mfae.source),
+                row_number() OVER (
+                    PARTITION BY mfae.field ORDER BY COUNT(DISTINCT mfae.source) DESC
+                ) AS rownum
+          FROM  metabib.facet_entry mfae
+                JOIN config.metabib_field cmf ON (cmf.id = mfae.field)
+          WHERE mfae.source = ANY ($2)
+                AND cmf.facet_field
+                AND cmf.field_class NOT IN (SELECT * FROM unnest($1))
+          GROUP by 1, 2
+      ) all_facets
+      WHERE rownum <= (
+        SELECT COALESCE(
+            (SELECT value::INT FROM config.global_flag WHERE name = 'search.max_facets_per_field' AND enabled),
+            1000
+        )
+      );
+$f$ LANGUAGE SQL;
+
+CREATE OR REPLACE FUNCTION unapi.metabib_virtual_record_feed ( id_list BIGINT[], format TEXT, includes TEXT[], org TEXT, depth INT DEFAULT NULL, slimit HSTORE DEFAULT NULL, soffset HSTORE DEFAULT NULL, include_xmlns BOOL DEFAULT TRUE, title TEXT DEFAULT NULL, description TEXT DEFAULT NULL, creator TEXT DEFAULT NULL, update_ts TEXT DEFAULT NULL, unapi_url TEXT DEFAULT NULL, header_xml XML DEFAULT NULL ) RETURNS XML AS $F$
+DECLARE
+    layout          unapi.bre_output_layout%ROWTYPE;
+    transform       config.xml_transform%ROWTYPE;
+    item_format     TEXT;
+    tmp_xml         TEXT;
+    xmlns_uri       TEXT := 'http://open-ils.org/spec/feed-xml/v1';
+    ouid            INT;
+    element_list    TEXT[];
+BEGIN
+
+    IF org = '-' OR org IS NULL THEN
+        SELECT shortname INTO org FROM evergreen.org_top();
+    END IF;
+
+    SELECT id INTO ouid FROM actor.org_unit WHERE shortname = org;
+    SELECT * INTO layout FROM unapi.bre_output_layout WHERE name = format;
+
+    IF layout.name IS NULL THEN
+        RETURN NULL::XML;
+    END IF;
+
+    SELECT * INTO transform FROM config.xml_transform WHERE name = layout.transform;
+    xmlns_uri := COALESCE(transform.namespace_uri,xmlns_uri);
+
+    -- Gather the bib xml
+    SELECT XMLAGG( unapi.mmr(i, format, '', includes, org, depth, slimit, soffset, include_xmlns)) INTO tmp_xml FROM UNNEST( id_list ) i;
+
+    IF layout.title_element IS NOT NULL THEN
+        EXECUTE 'SELECT XMLCONCAT( XMLELEMENT( name '|| layout.title_element ||', XMLATTRIBUTES( $1 AS xmlns), $3), $2)' INTO tmp_xml USING xmlns_uri, tmp_xml::XML, title;
+    END IF;
+
+    IF layout.description_element IS NOT NULL THEN
+        EXECUTE 'SELECT XMLCONCAT( XMLELEMENT( name '|| layout.description_element ||', XMLATTRIBUTES( $1 AS xmlns), $3), $2)' INTO tmp_xml USING xmlns_uri, tmp_xml::XML, description;
+    END IF;
+
+    IF layout.creator_element IS NOT NULL THEN
+        EXECUTE 'SELECT XMLCONCAT( XMLELEMENT( name '|| layout.creator_element ||', XMLATTRIBUTES( $1 AS xmlns), $3), $2)' INTO tmp_xml USING xmlns_uri, tmp_xml::XML, creator;
+    END IF;
+
+    IF layout.update_ts_element IS NOT NULL THEN
+        EXECUTE 'SELECT XMLCONCAT( XMLELEMENT( name '|| layout.update_ts_element ||', XMLATTRIBUTES( $1 AS xmlns), $3), $2)' INTO tmp_xml USING xmlns_uri, tmp_xml::XML, update_ts;
+    END IF;
+
+    IF unapi_url IS NOT NULL THEN
+        EXECUTE $$SELECT XMLCONCAT( XMLELEMENT( name link, XMLATTRIBUTES( 'http://www.w3.org/1999/xhtml' AS xmlns, 'unapi-server' AS rel, $1 AS href, 'unapi' AS title)), $2)$$ INTO tmp_xml USING unapi_url, tmp_xml::XML;
+    END IF;
+
+    IF header_xml IS NOT NULL THEN tmp_xml := XMLCONCAT(header_xml,tmp_xml::XML); END IF;
+
+    element_list := regexp_split_to_array(layout.feed_top,E'\\.');
+    FOR i IN REVERSE ARRAY_UPPER(element_list, 1) .. 1 LOOP
+        EXECUTE 'SELECT XMLELEMENT( name '|| quote_ident(element_list[i]) ||', XMLATTRIBUTES( $1 AS xmlns), $2)' INTO tmp_xml USING xmlns_uri, tmp_xml::XML;
+    END LOOP;
+
+    RETURN tmp_xml::XML;
+END;
+$F$ LANGUAGE PLPGSQL STABLE;
+
+CREATE TABLE asset.copy_vis_attr_cache (
+    id              BIGSERIAL   PRIMARY KEY,
+    record          BIGINT      NOT NULL, -- No FKEYs, managed by user triggers.
+    target_copy     BIGINT      NOT NULL,
+    vis_attr_vector INT[]
+);
+CREATE INDEX copy_vis_attr_cache_record_idx ON asset.copy_vis_attr_cache (record);
+CREATE INDEX copy_vis_attr_cache_copy_idx ON asset.copy_vis_attr_cache (target_copy);
+
+ALTER TABLE biblio.record_entry ADD COLUMN vis_attr_vector INT[];
+
+CREATE OR REPLACE FUNCTION search.calculate_visibility_attribute ( value INT, attr TEXT ) RETURNS INT AS $f$
+SELECT  ((CASE $2
+
+            WHEN 'luri_org'         THEN 0 -- "b" attr
+            WHEN 'bib_source'       THEN 1 -- "b" attr
+
+            WHEN 'copy_flags'       THEN 0 -- "c" attr
+            WHEN 'owning_lib'       THEN 1 -- "c" attr
+            WHEN 'circ_lib'         THEN 2 -- "c" attr
+            WHEN 'status'           THEN 3 -- "c" attr
+            WHEN 'location'         THEN 4 -- "c" attr
+            WHEN 'location_group'   THEN 5 -- "c" attr
+
+        END) << 28 ) | $1;
+
+/* copy_flags bit positions, LSB-first:
+
+ 0: asset.copy.opac_visible
+
+
+   When adding flags, you must update asset.all_visible_flags()
+
+   Because bib and copy values are stored separately, we can reuse
+   shifts, saving us some space. We could probably take back a bit
+   too, but I'm not sure its worth squeezing that last one out. We'd
+   be left with just 2 slots for copy attrs, rather than 10.
+*/
+
+$f$ LANGUAGE SQL IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION search.calculate_visibility_attribute_list ( attr TEXT, value INT[] ) RETURNS INT[] AS $f$
+    SELECT ARRAY_AGG(search.calculate_visibility_attribute(x, $1)) FROM UNNEST($2) AS X;
+$f$ LANGUAGE SQL IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION search.calculate_visibility_attribute_test ( attr TEXT, value INT[], negate BOOL DEFAULT FALSE ) RETURNS TEXT AS $f$
+    SELECT  CASE WHEN $3 THEN '!' ELSE '' END || '(' || ARRAY_TO_STRING(search.calculate_visibility_attribute_list($1,$2),'|') || ')';
+$f$ LANGUAGE SQL IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION asset.calculate_copy_visibility_attribute_set ( copy_id BIGINT ) RETURNS INT[] AS $f$
+DECLARE
+    copy_row    asset.copy%ROWTYPE;
+    lgroup_map  asset.copy_location_group_map%ROWTYPE;
+    attr_set    INT[];
+BEGIN
+    SELECT * INTO copy_row FROM asset.copy WHERE id = copy_id;
+
+    attr_set := attr_set || search.calculate_visibility_attribute(copy_row.opac_visible::INT, 'copy_flags');
+    attr_set := attr_set || search.calculate_visibility_attribute(copy_row.circ_lib, 'circ_lib');
+    attr_set := attr_set || search.calculate_visibility_attribute(copy_row.status, 'status');
+    attr_set := attr_set || search.calculate_visibility_attribute(copy_row.location, 'location');
+
+    SELECT  ARRAY_APPEND(
+                attr_set,
+                search.calculate_visibility_attribute(owning_lib, 'owning_lib')
+            ) INTO attr_set
+      FROM  asset.call_number
+      WHERE id = copy_row.call_number;
+
+    FOR lgroup_map IN SELECT * FROM asset.copy_location_group_map WHERE location = copy_row.location LOOP
+        attr_set := attr_set || search.calculate_visibility_attribute(lgroup_map.lgroup, 'location_group');
+    END LOOP;
+
+    RETURN attr_set;
+END;
+$f$ LANGUAGE PLPGSQL;
+
+CREATE OR REPLACE FUNCTION biblio.calculate_bib_visibility_attribute_set ( bib_id BIGINT ) RETURNS INT[] AS $f$
+DECLARE
+    bib_row     biblio.record_entry%ROWTYPE;
+    cn_row      asset.call_number%ROWTYPE;
+    attr_set    INT[];
+BEGIN
+    SELECT * INTO bib_row FROM biblio.record_entry WHERE id = bib_id;
+
+    IF bib_row.source IS NOT NULL THEN
+        attr_set := attr_set || search.calculate_visibility_attribute(bib_row.source, 'bib_source');
+    END IF;
+
+    FOR cn_row IN
+        SELECT  cn.*
+          FROM  asset.call_number cn
+                JOIN asset.uri_call_number_map m ON (cn.id = m.call_number)
+                JOIN asset.uri u ON (u.id = m.uri)
+          WHERE cn.record = bib_id
+                AND cn.label = '##URI##'
+                AND u.active
+    LOOP
+        attr_set := attr_set || search.calculate_visibility_attribute(cn_row.owning_lib, 'luri_org');
+    END LOOP;
+
+    RETURN attr_set;
+END;
+$f$ LANGUAGE PLPGSQL;
+
+CREATE OR REPLACE FUNCTION asset.cache_copy_visibility () RETURNS TRIGGER as $func$
+DECLARE
+    ocn     asset.call_number%ROWTYPE;
+    ncn     asset.call_number%ROWTYPE;
+    cid     BIGINT;
+BEGIN
+
+    IF TG_TABLE_NAME = 'peer_bib_copy_map' THEN -- Only needs ON INSERT OR DELETE, so handle separately
+        IF TG_OP = 'INSERT' THEN
+            INSERT INTO asset.copy_vis_attr_cache (record, target_copy, vis_attr_vector) VALUES (
+                NEW.peer_record,
+                NEW.target_copy,
+                asset.calculate_copy_visibility_attribute_set(NEW.target_copy)
+            );
+
+            RETURN NEW;
+        ELSIF TG_OP = 'DELETE' THEN
+            DELETE FROM asset.copy_vis_attr_cache
+              WHERE record = NEW.peer_record AND target_copy = NEW.target_copy;
+
+            RETURN OLD;
+        END IF;
+    END IF;
+
+    IF TG_OP = 'INSERT' THEN -- Handles ON INSERT. ON UPDATE is below.
+        IF TG_TABLE_NAME IN ('copy', 'unit') THEN
+            SELECT * INTO ncn FROM asset.call_number cn WHERE id = NEW.call_number;
+            INSERT INTO asset.copy_vis_attr_cache (record, target_copy, vis_attr_vector) VALUES (
+                ncn.record,
+                NEW.target_copy,
+                asset.calculate_copy_visibility_attribute_set(NEW.id)
+            );
+        ELSIF TG_TABLE_NAME = 'record_entry' THEN
+            NEW.vis_attr_vector := biblio.calculate_bib_visibility_attribute_set(NEW.id);
+        END IF;
+
+        RETURN NEW;
+    END IF;
+
+    -- handle items first, since with circulation activity
+    -- their statuses change frequently
+    IF TG_TABLE_NAME IN ('copy', 'unit') THEN -- This handles ON UPDATE OR DELETE. ON INSERT above
+
+        IF TG_OP = 'DELETE' THEN -- Shouldn't get here, normally
+            DELETE FROM asset.copy_vis_attr_cache WHERE target_copy = OLD.id;
+            RETURN OLD;
+        END IF;
+
+        SELECT * INTO ncn FROM asset.call_number cn WHERE id = NEW.call_number;
+
+        IF OLD.deleted <> NEW.deleted THEN
+            IF NEW.deleted THEN
+                DELETE FROM asset.copy_vis_attr_cache WHERE target_copy = OLD.id;
+            ELSE
+                INSERT INTO asset.copy_vis_attr_cache (record, target_copy, vis_attr_vector) VALUES (
+                    ncn.record,
+                    NEW.id,
+                    asset.calculate_copy_visibility_attribute_set(NEW.id)
+                );
+            END IF;
+
+            RETURN NEW;
+        ELSIF OLD.call_number  <> NEW.call_number THEN
+            SELECT * INTO ocn FROM asset.call_number cn WHERE id = OLD.call_number;
+
+            IF ncn.record <> ocn.record THEN
+                UPDATE  biblio.record_entry
+                  SET   vis_attr_vector = biblio.calculate_bib_visibility_attribute_set(ncn.record)
+                  WHERE id = ocn.record;
+            END IF;
+        END IF;
+
+        IF OLD.location     <> NEW.location OR
+           OLD.status       <> NEW.status OR
+           OLD.opac_visible <> NEW.opac_visible OR
+           OLD.circ_lib     <> NEW.circ_lib
+        THEN
+            -- any of these could change visibility, but
+            -- we'll save some queries and not try to calculate
+            -- the change directly
+            UPDATE  asset.copy_vis_attr_cache
+              SET   target_copy = NEW.id,
+                    vis_attr_vector = asset.calculate_copy_visibility_attribute_set(NEW.id)
+              WHERE target_copy = OLD.id;
+
+        END IF;
+
+    ELSIF TG_TABLE_NAME = 'call_number' THEN -- Only ON UPDATE. Copy handler will deal with ON INSERT OR DELETE.
+
+        IF OLD.record <> NEW.record THEN
+            IF NEW.label = '##URI##' THEN
+                UPDATE  biblio.record_entry
+                  SET   vis_attr_vector = biblio.calculate_bib_visibility_attribute_set(OLD.record)
+                  WHERE id = OLD.record;
+
+                UPDATE  biblio.record_entry
+                  SET   vis_attr_vector = biblio.calculate_bib_visibility_attribute_set(NEW.record)
+                  WHERE id = NEW.record;
+            END IF;
+
+            UPDATE  asset.copy_vis_attr_cache
+              SET   record = NEW.record,
+                    vis_attr_vector = asset.calculate_copy_visibility_attribute_set(target_copy)
+              WHERE target_copy IN (SELECT id FROM asset.copy WHERE call_number = NEW.id)
+                    AND record = OLD.record;
+
+        ELSIF OLD.owning_lib <> NEW.owning_lib THEN
+            UPDATE  asset.copy_vis_attr_cache
+              SET   vis_attr_vector = asset.calculate_copy_visibility_attribute_set(target_copy)
+              WHERE target_copy IN (SELECT id FROM asset.copy WHERE call_number = NEW.id)
+                    AND record = NEW.record;
+
+            IF NEW.label = '##URI##' THEN
+                UPDATE  biblio.record_entry
+                  SET   vis_attr_vector = biblio.calculate_bib_visibility_attribute_set(OLD.record)
+                  WHERE id = OLD.record;
+            END IF;
+        END IF;
+
+    ELSIF TG_TABLE_NAME = 'record_entry' THEN -- Only handles ON UPDATE OR DELETE
+
+        IF TG_OP = 'DELETE' THEN -- Shouldn't get here, normally
+            DELETE FROM asset.copy_vis_attr_cache WHERE record = OLD.id;
+            RETURN OLD;
+        ELSIF OLD.source <> NEW.source THEN
+            NEW.vis_attr_vector := biblio.calculate_bib_visibility_attribute_set(NEW.id);
+        END IF;
+
+    END IF;
+
+    RETURN NEW;
+END;
+$func$ LANGUAGE PLPGSQL;
+
+
+-- Helper functions for use in constructing searches --
+
+CREATE OR REPLACE FUNCTION asset.all_visible_flags () RETURNS TEXT AS $f$
+    SELECT  '(' || ARRAY_TO_STRING(ARRAY_AGG(search.calculate_visibility_attribute(1 << x, 'copy_flags')),'&') || ')'
+      FROM  GENERATE_SERIES(0,0) AS x; -- increment as new flags are added.
+$f$ LANGUAGE SQL STABLE;
+
+CREATE OR REPLACE FUNCTION asset.visible_orgs (otype TEXT) RETURNS TEXT AS $f$
+    SELECT  '(' || ARRAY_TO_STRING(ARRAY_AGG(search.calculate_visibility_attribute(id, $1)),'|') || ')'
+      FROM  actor.org_unit
+      WHERE opac_visible;
+$f$ LANGUAGE SQL STABLE;
+
+CREATE OR REPLACE FUNCTION asset.invisible_orgs (otype TEXT) RETURNS TEXT AS $f$
+    SELECT  '!(' || ARRAY_TO_STRING(ARRAY_AGG(search.calculate_visibility_attribute(id, $1)),'|') || ')'
+      FROM  actor.org_unit
+      WHERE NOT opac_visible;
+$f$ LANGUAGE SQL STABLE;
+
+-- Bib-oriented defaults for search
+CREATE OR REPLACE FUNCTION asset.bib_source_default () RETURNS TEXT AS $f$
+    SELECT  '(' || ARRAY_TO_STRING(ARRAY_AGG(search.calculate_visibility_attribute(id, 'bib_source')),'|') || ')'
+      FROM  config.bib_source
+      WHERE transcendant;
+$f$ LANGUAGE SQL IMMUTABLE;
+
+CREATE OR REPLACE FUNCTION asset.luri_org_default () RETURNS TEXT AS $f$
+    SELECT  * FROM asset.invisible_orgs('luri_org');
+$f$ LANGUAGE SQL STABLE;
+
+-- Copy-oriented defaults for search
+CREATE OR REPLACE FUNCTION asset.location_group_default () RETURNS TEXT AS $f$
+    SELECT  '!(' || ARRAY_TO_STRING(ARRAY_AGG(search.calculate_visibility_attribute(id, 'location_group')),'|') || ')'
+      FROM  asset.copy_location_group
+      WHERE NOT opac_visible;
+$f$ LANGUAGE SQL STABLE;
+
+CREATE OR REPLACE FUNCTION asset.location_default () RETURNS TEXT AS $f$
+    SELECT  '!(' || ARRAY_TO_STRING(ARRAY_AGG(search.calculate_visibility_attribute(id, 'location')),'|') || ')'
+      FROM  asset.copy_location
+      WHERE NOT opac_visible;
+$f$ LANGUAGE SQL STABLE;
+
+CREATE OR REPLACE FUNCTION asset.status_default () RETURNS TEXT AS $f$
+    SELECT  '!(' || ARRAY_TO_STRING(ARRAY_AGG(search.calculate_visibility_attribute(id, 'status')),'|') || ')'
+      FROM  config.copy_status
+      WHERE NOT opac_visible;
+$f$ LANGUAGE SQL STABLE;
+
+CREATE OR REPLACE FUNCTION asset.owning_lib_default () RETURNS TEXT AS $f$
+    SELECT  * FROM asset.invisible_orgs('owning_lib');
+$f$ LANGUAGE SQL STABLE;
+
+CREATE OR REPLACE FUNCTION asset.circ_lib_default () RETURNS TEXT AS $f$
+    SELECT  * FROM asset.invisible_orgs('circ_lib');
+$f$ LANGUAGE SQL STABLE;
+
+CREATE OR REPLACE FUNCTION asset.patron_default_visibility_mask () RETURNS TABLE (b_attrs TEXT, c_attrs TEXT)  AS $f$
+DECLARE
+    copy_flags      TEXT; -- "c" attr
+
+    owning_lib      TEXT; -- "c" attr
+    circ_lib        TEXT; -- "c" attr
+    status          TEXT; -- "c" attr
+    location        TEXT; -- "c" attr
+    location_group  TEXT; -- "c" attr
+
+    luri_org        TEXT; -- "b" attr
+    bib_sources     TEXT; -- "b" attr
+BEGIN
+    copy_flags      := asset.all_visible_flags(); -- Will always have at least one
+
+    owning_lib      := NULLIF(asset.owning_lib_default(),'!()');
+    
+    circ_lib        := NULLIF(asset.circ_lib_default(),'!()');
+    status          := NULLIF(asset.status_default(),'!()');
+    location        := NULLIF(asset.location_default(),'!()');
+    location_group  := NULLIF(asset.location_group_default(),'!()');
+
+    luri_org        := NULLIF(asset.luri_org_default(),'!()');
+    bib_sources     := NULLIF(asset.bib_source_default(),'()');
+
+    RETURN QUERY SELECT
+        '('||ARRAY_TO_STRING(
+            ARRAY[luri_org,bib_sources],
+            '|'
+        )||')',
+        '('||ARRAY_TO_STRING(
+            ARRAY[copy_flags,owning_lib,circ_lib,status,location,location_group]::TEXT[],
+            '&'
+        )||')';
+END;
+$f$ LANGUAGE PLPGSQL STABLE ROWS 1;
+
+CREATE OR REPLACE FUNCTION metabib.suggest_browse_entries(raw_query_text text, search_class text, headline_opts text, visibility_org integer, query_limit integer, normalization integer)
+ RETURNS TABLE(value text, field integer, buoyant_and_class_match boolean, field_match boolean, field_weight integer, rank real, buoyant boolean, match text)
+AS $f$
+DECLARE
+    prepared_query_texts    TEXT[];
+    query                   TSQUERY;
+    plain_query             TSQUERY;
+    opac_visibility_join    TEXT;
+    search_class_join       TEXT;
+    r_fields                RECORD;
+BEGIN
+    prepared_query_texts := metabib.autosuggest_prepare_tsquery(raw_query_text);
+
+    query := TO_TSQUERY('keyword', prepared_query_texts[1]);
+    plain_query := TO_TSQUERY('keyword', prepared_query_texts[2]);
+
+    visibility_org := NULLIF(visibility_org,-1);
+    IF visibility_org IS NOT NULL THEN
+        opac_visibility_join := '
+    JOIN asset.copy_vis_attr_cache acvac ON (acvac.record = x.source)
+    JOIN vm ON (acvac.vis_attr_vector @@ vm.c_attrs::query_int)
+';
+    ELSE
+        opac_visibility_join := '';
+    END IF;
+
+    -- The following determines whether we only provide suggestsons matching
+    -- the user's selected search_class, or whether we show other suggestions
+    -- too. The reason for MIN() is that for search_classes like
+    -- 'title|proper|uniform' you would otherwise get multiple rows.  The
+    -- implication is that if title as a class doesn't have restrict,
+    -- nor does the proper field, but the uniform field does, you're going
+    -- to get 'false' for your overall evaluation of 'should we restrict?'
+    -- To invert that, change from MIN() to MAX().
+
+    SELECT
+        INTO r_fields
+            MIN(cmc.restrict::INT) AS restrict_class,
+            MIN(cmf.restrict::INT) AS restrict_field
+        FROM metabib.search_class_to_registered_components(search_class)
+            AS _registered (field_class TEXT, field INT)
+        JOIN
+            config.metabib_class cmc ON (cmc.name = _registered.field_class)
+        LEFT JOIN
+            config.metabib_field cmf ON (cmf.id = _registered.field);
+
+    -- evaluate 'should we restrict?'
+    IF r_fields.restrict_field::BOOL OR r_fields.restrict_class::BOOL THEN
+        search_class_join := '
+    JOIN
+        metabib.search_class_to_registered_components($2)
+        AS _registered (field_class TEXT, field INT) ON (
+            (_registered.field IS NULL AND
+                _registered.field_class = cmf.field_class) OR
+            (_registered.field = cmf.id)
+        )
+    ';
+    ELSE
+        search_class_join := '
+    LEFT JOIN
+        metabib.search_class_to_registered_components($2)
+        AS _registered (field_class TEXT, field INT) ON (
+            _registered.field_class = cmc.name
+        )
+    ';
+    END IF;
+
+    RETURN QUERY EXECUTE '
+WITH vm AS ( SELECT * FROM asset.patron_default_visibility_mask() ),
+     mbe AS (SELECT * FROM metabib.browse_entry WHERE index_vector @@ $1 LIMIT 10000)
+SELECT  DISTINCT
+        x.value,
+        x.id,
+        x.push,
+        x.restrict,
+        x.weight,
+        x.ts_rank_cd,
+        x.buoyant,
+        TS_HEADLINE(value, $7, $3)
+  FROM  (SELECT DISTINCT
+                mbe.value,
+                cmf.id,
+                cmc.buoyant AND _registered.field_class IS NOT NULL AS push,
+                _registered.field = cmf.id AS restrict,
+                cmf.weight,
+                TS_RANK_CD(mbe.index_vector, $1, $6),
+                cmc.buoyant,
+                mbedm.source
+          FROM  metabib.browse_entry_def_map mbedm
+                JOIN mbe ON (mbe.id = mbedm.entry)
+                JOIN config.metabib_field cmf ON (cmf.id = mbedm.def)
+                JOIN config.metabib_class cmc ON (cmf.field_class = cmc.name)
+                '  || search_class_join || '
+          ORDER BY 3 DESC, 4 DESC NULLS LAST, 5 DESC, 6 DESC, 7 DESC, 1 ASC
+          LIMIT 1000) AS x
+        ' || opac_visibility_join || '
+  ORDER BY 3 DESC, 4 DESC NULLS LAST, 5 DESC, 6 DESC, 7 DESC, 1 ASC
+  LIMIT $5
+'   -- sic, repeat the order by clause in the outer select too
+    USING
+        query, search_class, headline_opts,
+        visibility_org, query_limit, normalization, plain_query
+        ;
+
+    -- sort order:
+    --  buoyant AND chosen class = match class
+    --  chosen field = match field
+    --  field weight
+    --  rank
+    --  buoyancy
+    --  value itself
+
+END;
+$f$ LANGUAGE plpgsql ROWS 10;
+
+CREATE OR REPLACE FUNCTION metabib.browse(search_field integer[], browse_term text, context_org integer DEFAULT NULL::integer, context_loc_group integer DEFAULT NULL::integer, staff boolean DEFAULT false, pivot_id bigint DEFAULT NULL::bigint, result_limit integer DEFAULT 10)
+ RETURNS SETOF metabib.flat_browse_entry_appearance
+AS $f$
+DECLARE
+    core_query              TEXT;
+    back_query              TEXT;
+    forward_query           TEXT;
+    pivot_sort_value        TEXT;
+    pivot_sort_fallback     TEXT;
+    context_locations       INT[];
+    browse_superpage_size   INT;
+    results_skipped         INT := 0;
+    back_limit              INT;
+    back_to_pivot           INT;
+    forward_limit           INT;
+    forward_to_pivot        INT;
+BEGIN
+    -- First, find the pivot if we were given a browse term but not a pivot.
+    IF pivot_id IS NULL THEN
+        pivot_id := metabib.browse_pivot(search_field, browse_term);
+    END IF;
+
+    SELECT INTO pivot_sort_value, pivot_sort_fallback
+        sort_value, value FROM metabib.browse_entry WHERE id = pivot_id;
+
+    -- Bail if we couldn't find a pivot.
+    IF pivot_sort_value IS NULL THEN
+        RETURN;
+    END IF;
+
+    -- Transform the context_loc_group argument (if any) (logc at the
+    -- TPAC layer) into a form we'll be able to use.
+    IF context_loc_group IS NOT NULL THEN
+        SELECT INTO context_locations ARRAY_AGG(location)
+            FROM asset.copy_location_group_map
+            WHERE lgroup = context_loc_group;
+    END IF;
+
+    -- Get the configured size of browse superpages.
+    SELECT INTO browse_superpage_size COALESCE(value::INT,100)     -- NULL ok
+        FROM config.global_flag
+        WHERE enabled AND name = 'opac.browse.holdings_visibility_test_limit';
+
+    -- First we're going to search backward from the pivot, then we're going
+    -- to search forward.  In each direction, we need two limits.  At the
+    -- lesser of the two limits, we delineate the edge of the result set
+    -- we're going to return.  At the greater of the two limits, we find the
+    -- pivot value that would represent an offset from the current pivot
+    -- at a distance of one "page" in either direction, where a "page" is a
+    -- result set of the size specified in the "result_limit" argument.
+    --
+    -- The two limits in each direction make four derived values in total,
+    -- and we calculate them now.
+    back_limit := CEIL(result_limit::FLOAT / 2);
+    back_to_pivot := result_limit;
+    forward_limit := result_limit / 2;
+    forward_to_pivot := result_limit - 1;
+
+    -- This is the meat of the SQL query that finds browse entries.  We'll
+    -- pass this to a function which uses it with a cursor, so that individual
+    -- rows may be fetched in a loop until some condition is satisfied, without
+    -- waiting for a result set of fixed size to be collected all at once.
+    core_query := '
+SELECT  mbe.id,
+        mbe.value,
+        mbe.sort_value
+  FROM  metabib.browse_entry mbe
+  WHERE (
+            EXISTS ( -- are there any bibs using this mbe via the requested fields?
+                SELECT  1
+                  FROM  metabib.browse_entry_def_map mbedm
+                  WHERE mbedm.entry = mbe.id AND mbedm.def = ANY(' || quote_literal(search_field) || ')
+            ) OR EXISTS ( -- are there any authorities using this mbe via the requested fields?
+                SELECT  1
+                  FROM  metabib.browse_entry_simple_heading_map mbeshm
+                        JOIN authority.simple_heading ash ON ( mbeshm.simple_heading = ash.id )
+                        JOIN authority.control_set_auth_field_metabib_field_map_refs map ON (
+                            ash.atag = map.authority_field
+                            AND map.metabib_field = ANY(' || quote_literal(search_field) || ')
+                        )
+                  WHERE mbeshm.entry = mbe.id
+            )
+        ) AND ';
+
+    -- This is the variant of the query for browsing backward.
+    back_query := core_query ||
+        ' mbe.sort_value <= ' || quote_literal(pivot_sort_value) ||
+    ' ORDER BY mbe.sort_value DESC, mbe.value DESC LIMIT 1000';
+
+    -- This variant browses forward.
+    forward_query := core_query ||
+        ' mbe.sort_value > ' || quote_literal(pivot_sort_value) ||
+    ' ORDER BY mbe.sort_value, mbe.value LIMIT 1000';
+
+    -- We now call the function which applies a cursor to the provided
+    -- queries, stopping at the appropriate limits and also giving us
+    -- the next page's pivot.
+    RETURN QUERY
+        SELECT * FROM metabib.staged_browse(
+            back_query, search_field, context_org, context_locations,
+            staff, browse_superpage_size, TRUE, back_limit, back_to_pivot
+        ) UNION
+        SELECT * FROM metabib.staged_browse(
+            forward_query, search_field, context_org, context_locations,
+            staff, browse_superpage_size, FALSE, forward_limit, forward_to_pivot
+        ) ORDER BY row_number DESC;
+
+END;
+$f$ LANGUAGE plpgsql ROWS 10;
+
+CREATE OR REPLACE FUNCTION metabib.staged_browse(query text, fields integer[], context_org integer, context_locations integer[], staff boolean, browse_superpage_size integer, count_up_from_zero boolean, result_limit integer, next_pivot_pos integer)
+ RETURNS SETOF metabib.flat_browse_entry_appearance
+AS $f$
+DECLARE
+    curs                    REFCURSOR;
+    rec                     RECORD;
+    qpfts_query             TEXT;
+    aqpfts_query            TEXT;
+    afields                 INT[];
+    bfields                 INT[];
+    result_row              metabib.flat_browse_entry_appearance%ROWTYPE;
+    results_skipped         INT := 0;
+    row_counter             INT := 0;
+    row_number              INT;
+    slice_start             INT;
+    slice_end               INT;
+    full_end                INT;
+    all_records             BIGINT[];
+    all_brecords             BIGINT[];
+    all_arecords            BIGINT[];
+    superpage_of_records    BIGINT[];
+    superpage_size          INT;
+    c_tests                 TEXT := '';
+    b_tests                 TEXT := '';
+    c_orgs                  INT[];
+BEGIN
+    IF count_up_from_zero THEN
+        row_number := 0;
+    ELSE
+        row_number := -1;
+    END IF;
+
+    IF NOT staff THEN
+        SELECT x.c_attrs, x.b_attrs INTO c_tests, b_tests FROM asset.patron_default_visibility_mask() x;
+    END IF;
+
+    IF c_tests <> '' THEN c_tests := c_tests || '&'; END IF;
+    IF b_tests <> '' THEN b_tests := b_tests || '&'; END IF;
+
+    SELECT ARRAY_AGG(id) INTO c_orgs FROM actor.org_unit_descendants(context_org);
+    
+    c_tests := c_tests || search.calculate_visibility_attribute_test('circ_lib',c_orgs)
+               || '&' || search.calculate_visibility_attribute_test('owning_lib',c_orgs);
+    
+    PERFORM 1 FROM config.internal_flag WHERE enabled AND name = 'opac.located_uri.act_as_copy';
+    IF FOUND THEN
+        b_tests := b_tests || search.calculate_visibility_attribute_test(
+            'luri_org',
+            (SELECT ARRAY_AGG(id) FROM actor.org_unit_full_path(context_org) x)
+        );
+    ELSE
+        b_tests := b_tests || search.calculate_visibility_attribute_test(
+            'luri_org',
+            (SELECT ARRAY_AGG(id) FROM actor.org_unit_ancestors(context_org) x)
+        );
+    END IF;
+
+    IF context_locations THEN
+        IF c_tests <> '' THEN c_tests := c_tests || '&'; END IF;
+        c_tests := c_tests || search.calculate_visibility_attribute_test('location',context_locations);
+    END IF;
+
+    OPEN curs NO SCROLL FOR EXECUTE query;
+
+    LOOP
+        FETCH curs INTO rec;
+        IF NOT FOUND THEN
+            IF result_row.pivot_point IS NOT NULL THEN
+                RETURN NEXT result_row;
+            END IF;
+            RETURN;
+        END IF;
+
+        -- Gather aggregate data based on the MBE row we're looking at now, authority axis
+        SELECT INTO all_arecords, result_row.sees, afields
+                ARRAY_AGG(DISTINCT abl.bib), -- bibs to check for visibility
+                STRING_AGG(DISTINCT aal.source::TEXT, $$,$$), -- authority record ids
+                ARRAY_AGG(DISTINCT map.metabib_field) -- authority-tag-linked CMF rows
+
+          FROM  metabib.browse_entry_simple_heading_map mbeshm
+                JOIN authority.simple_heading ash ON ( mbeshm.simple_heading = ash.id )
+                JOIN authority.authority_linking aal ON ( ash.record = aal.source )
+                JOIN authority.bib_linking abl ON ( aal.target = abl.authority )
+                JOIN authority.control_set_auth_field_metabib_field_map_refs map ON (
+                    ash.atag = map.authority_field
+                    AND map.metabib_field = ANY(fields)
+                )
+          WHERE mbeshm.entry = rec.id;
+
+        -- Gather aggregate data based on the MBE row we're looking at now, bib axis
+        SELECT INTO all_brecords, result_row.authorities, bfields
+                ARRAY_AGG(DISTINCT source),
+                STRING_AGG(DISTINCT authority::TEXT, $$,$$),
+                ARRAY_AGG(DISTINCT def)
+          FROM  metabib.browse_entry_def_map
+          WHERE entry = rec.id
+                AND def = ANY(fields);
+
+        SELECT INTO result_row.fields STRING_AGG(DISTINCT x::TEXT, $$,$$) FROM UNNEST(afields || bfields) x;
+
+        result_row.sources := 0;
+        result_row.asources := 0;
+
+        -- Bib-linked vis checking
+        IF ARRAY_UPPER(all_brecords,1) IS NOT NULL THEN
+
+            SELECT  INTO result_row.sources COUNT(DISTINCT b.id)
+              FROM  biblio.record_entry b
+                    JOIN asset.copy_vis_attr_cache acvac ON (acvac.record = b.id)
+              WHERE b.id = ANY(all_brecords[1:browse_superpage_size])
+                    AND (
+                        acvac.vis_attr_vector @@ c_tests::query_int
+                        OR b.vis_attr_vector @@ b_tests::query_int
+                    );
+
+            result_row.accurate := TRUE;
+
+        END IF;
+
+        -- Authority-linked vis checking
+        IF ARRAY_UPPER(all_arecords,1) IS NOT NULL THEN
+
+            SELECT  INTO result_row.asources COUNT(DISTINCT b.id)
+              FROM  biblio.record_entry b
+                    JOIN asset.copy_vis_attr_cache acvac ON (acvac.record = b.id)
+              WHERE b.id = ANY(all_arecords[1:browse_superpage_size])
+                    AND (
+                        acvac.vis_attr_vector @@ c_tests::query_int
+                        OR b.vis_attr_vector @@ b_tests::query_int
+                    );
+
+            result_row.aaccurate := TRUE;
+
+        END IF;
+
+        IF result_row.sources > 0 OR result_row.asources > 0 THEN
+
+            -- The function that calls this function needs row_number in order
+            -- to correctly order results from two different runs of this
+            -- functions.
+            result_row.row_number := row_number;
+
+            -- Now, if row_counter is still less than limit, return a row.  If
+            -- not, but it is less than next_pivot_pos, continue on without
+            -- returning actual result rows until we find
+            -- that next pivot, and return it.
+
+            IF row_counter < result_limit THEN
+                result_row.browse_entry := rec.id;
+                result_row.value := rec.value;
+
+                RETURN NEXT result_row;
+            ELSE
+                result_row.browse_entry := NULL;
+                result_row.authorities := NULL;
+                result_row.fields := NULL;
+                result_row.value := NULL;
+                result_row.sources := NULL;
+                result_row.sees := NULL;
+                result_row.accurate := NULL;
+                result_row.aaccurate := NULL;
+                result_row.pivot_point := rec.id;
+
+                IF row_counter >= next_pivot_pos THEN
+                    RETURN NEXT result_row;
+                    RETURN;
+                END IF;
+            END IF;
+
+            IF count_up_from_zero THEN
+                row_number := row_number + 1;
+            ELSE
+                row_number := row_number - 1;
+            END IF;
+
+            -- row_counter is different from row_number.
+            -- It simply counts up from zero so that we know when
+            -- we've reached our limit.
+            row_counter := row_counter + 1;
+        END IF;
+    END LOOP;
+END;
+$f$ LANGUAGE plpgsql ROWS 10;
+
+DROP TRIGGER IF EXISTS a_opac_vis_mat_view_tgr ON biblio.peer_bib_copy_map;
+DROP TRIGGER IF EXISTS a_opac_vis_mat_view_tgr ON biblio.record_entry;
+DROP TRIGGER IF EXISTS a_opac_vis_mat_view_tgr ON asset.copy;
+DROP TRIGGER IF EXISTS a_opac_vis_mat_view_tgr ON asset.call_number;
+DROP TRIGGER IF EXISTS a_opac_vis_mat_view_tgr ON asset.copy_location;
+DROP TRIGGER IF EXISTS a_opac_vis_mat_view_tgr ON serial.unit;
+DROP TRIGGER IF EXISTS a_opac_vis_mat_view_tgr ON config.copy_status;
+DROP TRIGGER IF EXISTS a_opac_vis_mat_view_tgr ON actor.org_unit;
+
+-- Upgrade the data!
+INSERT INTO asset.copy_vis_attr_cache (target_copy, record, vis_attr_vector)
+    SELECT  cp.id,
+            cn.record,
+            asset.calculate_copy_visibility_attribute_set(cp.id)
+      FROM  asset.copy cp
+            JOIN asset.call_number cn ON (cp.call_number = cn.id);
+
+UPDATE biblio.record_entry SET vis_attr_vector = biblio.calculate_bib_visibility_attribute_set(id);
+
+CREATE TRIGGER z_opac_vis_mat_view_tgr BEFORE INSERT OR UPDATE ON biblio.record_entry FOR EACH ROW EXECUTE PROCEDURE asset.cache_copy_visibility();
+CREATE TRIGGER z_opac_vis_mat_view_tgr AFTER INSERT OR DELETE ON biblio.peer_bib_copy_map FOR EACH ROW EXECUTE PROCEDURE asset.cache_copy_visibility();
+CREATE TRIGGER z_opac_vis_mat_view_tgr AFTER UPDATE ON asset.call_number FOR EACH ROW EXECUTE PROCEDURE asset.cache_copy_visibility();
+CREATE TRIGGER z_opac_vis_mat_view_del_tgr BEFORE DELETE ON asset.copy FOR EACH ROW EXECUTE PROCEDURE asset.cache_copy_visibility();
+CREATE TRIGGER z_opac_vis_mat_view_del_tgr BEFORE DELETE ON serial.unit FOR EACH ROW EXECUTE PROCEDURE asset.cache_copy_visibility();
+CREATE TRIGGER z_opac_vis_mat_view_tgr AFTER INSERT OR UPDATE ON asset.copy FOR EACH ROW EXECUTE PROCEDURE asset.cache_copy_visibility();
+CREATE TRIGGER z_opac_vis_mat_view_tgr AFTER INSERT OR UPDATE ON serial.unit FOR EACH ROW EXECUTE PROCEDURE asset.cache_copy_visibility();
+
+COMMIT;
+
index 2130759..00e1fb2 100644 (file)
@@ -5,7 +5,7 @@
                 [%~ |l('<span class="result_count_number">' _ ctx.result_start _'</span>',
                 '<span class="result_count_number">' _ ctx.result_stop _ '</span>',
                 '<span class="result_count_number">' _ ctx.hit_count _ '</span>')  ~%]
-                Results [_1] - [_2] of about [_3]
+                Results [_1] - [_2] of [_3]
                 [%~ END %]
                 <span class='padding-left-6'>
                     [%~ |l('<span class="result_count_number">' _ (page + 1) _ '</span>',