LP#1269911: Teach QueryParser new tricks
authorMike Rylander <mrylander@gmail.com>
Wed, 15 Jan 2014 21:00:52 +0000 (16:00 -0500)
committerDan Wells <dbw2@calvin.edu>
Fri, 21 Feb 2014 20:38:52 +0000 (15:38 -0500)
QP Needs to be made aware of several new structures in the database.

First, we have added a new sort-supporting table called metabib.record_sorter
which holds values extracted by crad.sorter=true attrs.  This is used instead
of the mrd.attrs->"something" hstore composite.

Next, we teach QP how to convert from a list of user-supplied values across
many dynamic filters (based on crad) into an intarray query of ids extracted
from config.coded_value_map (in the case of controlled attributes) or
metabib.uncontrolled_record_attr_value (in the case of, you guessed it,
uncontrolled attributes).  This query is applied against the vlist column
of metabib.record_attr_vector_list, which is GIN indexed for speed.

Finally, metabib.record_attr is now a view over metabib.record_attr_vector_list
and is consequently going to be slow for general use.  We restrict
its inclusion in the core query to only the case of a during() filter
which requires access to the value of a bib's Date2 field.  For the
other common case, requiring access to the Date1 field, we instead
use the pubdate sort value now stored in metabib.record_sorter.  We
might consider making the specific sorter attribute used configurable
so that we can change the definition of pubdate down the road, but it
starts out (and generally stays) defined as equivalent to Date1.

Signed-off-by: Mike Rylander <mrylander@gmail.com>
Signed-off-by: Dan Wells <dbw2@calvin.edu>
Open-ILS/src/perlmods/lib/OpenILS/Application/Storage/Driver/Pg/QueryParser.pm

index 7d0aae0..147b165 100644 (file)
@@ -695,6 +695,40 @@ use Data::Dumper;
 use OpenILS::Application::AppUtils;
 my $apputils = "OpenILS::Application::AppUtils";
 
+our %_dfilter_controlled_cache = ();
+
+sub dynamic_filter_compile {
+    my ($self, $filter, $params, $negate) = @_;
+    my $e = OpenILS::Utils::CStoreEditor->new;
+
+    $negate = $negate ? '!' : '';
+
+    if (!exists($_dfilter_controlled_cache{$filter})) {
+        my $crad = $e->retrieve_config_record_attr_definition($filter);
+        my $ccvm_list = $e->search_config_coded_value_map({ctype =>$filter});
+
+        $_dfilter_controlled_cache{$filter} = $crad->to_bare_hash;
+        $_dfilter_controlled_cache{$filter}{controlled} = scalar @$ccvm_list;
+    }
+
+    my $method = $_dfilter_controlled_cache{$filter}{controlled} ?
+        'search_config_coded_value_map' : 'search_metabib_uncontrolled_record_attr_value';
+    my $attr_field = $_dfilter_controlled_cache{$filter}{controlled} ?
+        'ctype' : 'attr';
+    my $value_field = $_dfilter_controlled_cache{$filter}{controlled} ?
+        'code' : 'value';
+
+    return sprintf('%s(%s)', $negate,
+        join(
+            '|', 
+            map {
+                $_->id
+            } @{
+                $e->$method({ $attr_field => $filter, $value_field => $params })
+            }
+        )
+    );
+}
 
 sub toSQL {
     my $self = shift;
@@ -736,14 +770,31 @@ sub toSQL {
         $sort_filter = 'rel';
     }
 
+    my $lang_join = '';
     if (($filters{preferred_language} || $self->QueryParser->default_preferred_language) && ($filters{preferred_language_multiplier} || $self->QueryParser->default_preferred_language_multiplier)) {
+    
         my $pl = $self->QueryParser->quote_value( $filters{preferred_language} ? $filters{preferred_language} : $self->QueryParser->default_preferred_language );
+        $$flat_plan{with} .= ',' if $$flat_plan{with};
+        $$flat_plan{with} .= "lang_with AS (SELECT id FROM config.coded_value_map WHERE ctype = 'item_lang' AND code = $pl)";
+        $lang_join = ",lang_with";
+
         my $plw = $filters{preferred_language_multiplier} ? $filters{preferred_language_multiplier} : $self->QueryParser->default_preferred_language_multiplier;
-        $rel = "($rel * COALESCE( NULLIF( FIRST(mrd.attrs \@> hstore('item_lang', $pl)), FALSE )::INT * $plw, 1))";
+        $rel = "($rel * COALESCE( NULLIF( FIRST(mrv.vlist \@> ARRAY[lang_with.id]), FALSE )::INT * $plw, 1))";
+        $$flat_plan{uses_mrv} = 1;
     }
     $rel = "1.0/($rel)::NUMERIC";
 
-    my $mra_join = 'INNER JOIN metabib.record_attr mrd ON m.source = mrd.id';
+    my $mrv_join = '';
+    if ($$flat_plan{uses_mrv}) {
+        $mrv_join = 'INNER JOIN metabib.record_attr_vector_list mrv ON m.source = mrv.source';
+    }
+
+    my $mra_join = '';
+    if ($$flat_plan{uses_mrd}) {
+        $mra_join = 'INNER JOIN metabib.record_attr mrd ON m.source = mrd.id';
+    }
+
+    my $pubdate_join = "LEFT JOIN metabib.record_sorter pubdate_t ON m.source = pubdate_t.source AND attr = 'pubdate'";
 
     my $bre_join = '';
     if ($self->find_modifier('deleted')) {
@@ -763,7 +814,7 @@ sub toSQL {
     $nullpos = 'NULLS FIRST' if ($self->find_modifier('nullsfirst'));
 
     if (grep {$_ eq $sort_filter} @{$self->QueryParser->dynamic_sorters}) {
-        $rank = "FIRST(mrd.attrs->'$sort_filter')"
+        $rank = "FIRST((SELECT value FROM metabib.record_sorter rbr WHERE rbr.source = m.source and attr = '$sort_filter'))"
     } elsif ($sort_filter eq 'create_date') {
         $rank = "FIRST((SELECT create_date FROM biblio.record_entry rbr WHERE rbr.id = m.source))";
     } elsif ($sort_filter eq 'edit_date') {
@@ -800,11 +851,14 @@ SELECT  $key AS id,
         $agg_records,
         $rel AS rel,
         $rank AS rank, 
-        FIRST(mrd.attrs->'date1') AS tie_break
+        FIRST(pubdate_t.value) AS tie_break
   FROM  metabib.metarecord_source_map m
+        $$flat_plan{from}
+        $pubdate_join
         $mra_join
+        $mrv_join
         $bre_join
-        $$flat_plan{from}
+        $lang_join
   WHERE 1=1
         $flat_where
   GROUP BY 1
@@ -826,6 +880,8 @@ sub flatten {
     my $where = shift || '';
     my $with = '';
     my $uses_bre = 0;
+    my $uses_mrd = 0;
+    my $uses_mrv = 0;
 
     my @rank_list;
     for my $node ( @{$self->query_nodes} ) {
@@ -986,6 +1042,8 @@ sub flatten {
                 }
 
                 $uses_bre = $$subnode{uses_bre};
+                $uses_mrd = $$subnode{uses_mrd};
+                $uses_mrv = $$subnode{uses_mrv};
             }
         } else {
 
@@ -999,40 +1057,37 @@ sub flatten {
     }
 
     my $joiner = "\n" . ${spc} x ( $self->plan_level + 5 ) . ($self->joiner eq '&' ? 'AND ' : 'OR ');
+
+    my @dlist = ();
     # for each dynamic filter, build more of the WHERE clause
     for my $filter (@{$self->filters}) {
         my $NOT = $filter->negate ? 'NOT ' : '';
         if (grep { $_ eq $filter->name } @{ $self->QueryParser->dynamic_filters }) {
 
-            warn "flatten(): processing dynamic filter ". $filter->name ."\n"
-                if $self->QueryParser->debug;
-
-            # bool joiner for intra-plan nodes/filters
-            $where .= $joiner if $where ne '';
-
-            my @fargs = @{$filter->args};
             my $fname = $filter->name;
             $fname = 'item_lang' if $fname eq 'language'; #XXX filter aliases 
 
-            $where .= sprintf(
-                "${NOT}COALESCE((mrd.attrs->'%s') IN (%s), false)", $fname, 
-                join(',', map { $self->QueryParser->quote_value($_) } @fargs)
-            );
-
-            warn "flatten(): filter where => $where\n"
+            warn "flatten(): processing dynamic filter ". $filter->name ."\n"
                 if $self->QueryParser->debug;
+
+            my $vlist_query = $self->dynamic_filter_compile( $fname, $filter->args, $filter->negate );
+
+            # bool joiner for intra-plan nodes/filters
+            push(@dlist, $self->joiner) if @dlist;
+            push(@dlist, $vlist_query);
+            $uses_mrv = 1;
         } else {
             if ($filter->name eq 'before') {
                 if (@{$filter->args} == 1) {
                     $where .= $joiner if $where ne '';
-                    $where .= "${NOT}COALESCE((mrd.attrs->'date1') <= "
+                    $where .= "${NOT}COALESCE(pubdate_t.value <= "
                            . $self->QueryParser->quote_value($filter->args->[0])
                            . ", false)";
                 }
             } elsif ($filter->name eq 'after') {
                 if (@{$filter->args} == 1) {
                     $where .= $joiner if $where ne '';
-                    $where .= "${NOT}COALESCE((mrd.attrs->'date1') >= "
+                    $where .= "${NOT}COALESCE(pubdate_t.value >= "
                            . $self->QueryParser->quote_value($filter->args->[0])
                            . ", false)";
                 }
@@ -1041,12 +1096,13 @@ sub flatten {
                     $where .= $joiner if $where ne '';
                     $where .= "${NOT}COALESCE("
                            . $self->QueryParser->quote_value($filter->args->[0])
-                           . " BETWEEN (mrd.attrs->'date1') AND (mrd.attrs->'date2'), false)";
+                           . " BETWEEN pubdate_t.value AND (mrd.attrs->'date2'), false)";
+                    $uses_mrd = 1;
                 }
             } elsif ($filter->name eq 'between') {
                 if (@{$filter->args} == 2) {
                     $where .= $joiner if $where ne '';
-                    $where .= "${NOT}COALESCE((mrd.attrs->'date1') BETWEEN "
+                    $where .= "${NOT}COALESCE(pubdate_t.value BETWEEN "
                            . $self->QueryParser->quote_value($filter->args->[0])
                            . " AND "
                            . $self->QueryParser->quote_value($filter->args->[1])
@@ -1170,9 +1226,27 @@ sub flatten {
             }
         }
     }
+
+    if (@dlist) {
+
+        $where .= $joiner if $where ne '';
+        $where .= sprintf(
+            'mrv.vlist @@ \'%s\'',
+            join('', @dlist)
+        );
+    }
+
     warn "flatten(): full filter where => $where\n" if $self->QueryParser->debug;
 
-    return { rank_list => \@rank_list, from => $from, where => $where,  with => $with, uses_bre => $uses_bre };
+    return {
+        rank_list => \@rank_list,
+        from => $from,
+        where => $where,
+        with => $with,
+        uses_bre => $uses_bre,
+        uses_mrv => $uses_mrv,
+        uses_mrd => $uses_mrd
+    };
 }