--- /dev/null
+CREATE OR REPLACE FUNCTION metabib.browse(
+ search_field INT[],
+ browse_term TEXT,
+ context_org INT DEFAULT NULL,
+ context_loc_group INT DEFAULT NULL,
+ staff BOOL DEFAULT FALSE,
+ pivot_id BIGINT DEFAULT NULL,
+ result_limit INT DEFAULT 10
+) RETURNS SETOF metabib.flat_browse_entry_appearance AS $p$
+DECLARE
+ core_query TEXT;
+ back_query TEXT;
+ forward_query TEXT;
+ pivot_sort_value TEXT;
+ pivot_sort_fallback TEXT;
+ context_locations INT[];
+ browse_superpage_size INT;
+ results_skipped INT := 0;
+ back_limit INT;
+ back_to_pivot INT;
+ forward_limit INT;
+ forward_to_pivot INT;
+BEGIN
+ -- First, find the pivot if we were given a browse term but not a pivot.
+ IF pivot_id IS NULL THEN
+ pivot_id := metabib.browse_pivot(search_field, browse_term);
+ END IF;
+
+ SELECT INTO pivot_sort_value, pivot_sort_fallback
+ sort_value, value FROM metabib.browse_entry WHERE id = pivot_id;
+
+ -- Bail if we couldn't find a pivot.
+ IF pivot_sort_value IS NULL THEN
+ RETURN;
+ END IF;
+
+ -- Transform the context_loc_group argument (if any) (logc at the
+ -- TPAC layer) into a form we'll be able to use.
+ IF context_loc_group IS NOT NULL THEN
+ SELECT INTO context_locations ARRAY_AGG(location)
+ FROM asset.copy_location_group_map
+ WHERE lgroup = context_loc_group;
+ END IF;
+
+ -- Get the configured size of browse superpages.
+ SELECT INTO browse_superpage_size value -- NULL ok
+ FROM config.global_flag
+ WHERE enabled AND name = 'opac.browse.holdings_visibility_test_limit';
+
+ -- First we're going to search backward from the pivot, then we're going
+ -- to search forward. In each direction, we need two limits. At the
+ -- lesser of the two limits, we delineate the edge of the result set
+ -- we're going to return. At the greater of the two limits, we find the
+ -- pivot value that would represent an offset from the current pivot
+ -- at a distance of one "page" in either direction, where a "page" is a
+ -- result set of the size specified in the "result_limit" argument.
+ --
+ -- The two limits in each direction make four derived values in total,
+ -- and we calculate them now.
+ back_limit := CEIL(result_limit::FLOAT / 2);
+ back_to_pivot := result_limit;
+ forward_limit := result_limit / 2;
+ forward_to_pivot := result_limit - 1;
+
+ -- This is the meat of the SQL query that finds browse entries. We'll
+ -- pass this to a function which uses it with a cursor, so that individual
+ -- rows may be fetched in a loop until some condition is satisfied, without
+ -- waiting for a result set of fixed size to be collected all at once.
+ core_query := '
+SELECT mbe.id,
+ mbe.value,
+ mbe.sort_value
+ FROM metabib.browse_entry mbe
+ WHERE (
+ EXISTS ( -- are there any bibs using this mbe via the requested fields?
+ SELECT 1
+ FROM metabib.browse_entry_def_map mbedm
+ WHERE mbedm.entry = mbe.id AND mbedm.def = ANY(' || quote_literal(search_field) || ')
+ LIMIT 1
+ ) OR EXISTS ( -- are there any authorities using this mbe via the requested fields?
+ SELECT 1
+ FROM metabib.browse_entry_simple_heading_map mbeshm
+ JOIN authority.simple_heading ash ON ( mbeshm.simple_heading = ash.id )
+ JOIN authority.control_set_auth_field_metabib_field_map_refs map ON (
+ ash.atag = map.authority_field
+ AND map.metabib_field = ANY(' || quote_literal(search_field) || ')
+ )
+ JOIN authority.control_set_authority_field acsaf ON (
+ map.authority_field = acsaf.id
+ )
+ JOIN authority.heading_field ahf ON (ahf.id = acsaf.heading_field)
+ WHERE mbeshm.entry = mbe.id
+ AND ahf.heading_purpose IN (' || $$'variant'$$ || ')
+ )
+ ) AND ';
+
+ -- This is the variant of the query for browsing backward.
+ back_query := core_query ||
+ ' mbe.sort_value <= ' || quote_literal(pivot_sort_value) ||
+ ' ORDER BY mbe.sort_value DESC, mbe.value DESC ';
+
+ -- This variant browses forward.
+ forward_query := core_query ||
+ ' mbe.sort_value > ' || quote_literal(pivot_sort_value) ||
+ ' ORDER BY mbe.sort_value, mbe.value ';
+
+ -- We now call the function which applies a cursor to the provided
+ -- queries, stopping at the appropriate limits and also giving us
+ -- the next page's pivot.
+ RETURN QUERY
+ SELECT * FROM metabib.staged_browse(
+ back_query, search_field, context_org, context_locations,
+ staff, browse_superpage_size, TRUE, back_limit, back_to_pivot
+ ) UNION
+ SELECT * FROM metabib.staged_browse(
+ forward_query, search_field, context_org, context_locations,
+ staff, browse_superpage_size, FALSE, forward_limit, forward_to_pivot
+ ) ORDER BY row_number DESC;
+
+END;
+$p$ LANGUAGE PLPGSQL;
+CREATE OR REPLACE FUNCTION metabib.staged_browse(
+ query TEXT,
+ fields INT[],
+ context_org INT,
+ context_locations INT[],
+ staff BOOL,
+ browse_superpage_size INT,
+ count_up_from_zero BOOL, -- if false, count down from -1
+ result_limit INT,
+ next_pivot_pos INT
+) RETURNS SETOF metabib.flat_browse_entry_appearance AS $p$
+DECLARE
+ curs REFCURSOR;
+ rec RECORD;
+ qpfts_query TEXT;
+ aqpfts_query TEXT;
+ afields INT[];
+ bfields INT[];
+ result_row metabib.flat_browse_entry_appearance%ROWTYPE;
+ results_skipped INT := 0;
+ row_counter INT := 0;
+ row_number INT;
+ slice_start INT;
+ slice_end INT;
+ full_end INT;
+ all_records BIGINT[];
+ all_brecords BIGINT[];
+ all_arecords BIGINT[];
+ superpage_of_records BIGINT[];
+ superpage_size INT;
+ unauthorized_entry RECORD;
+BEGIN
+ IF count_up_from_zero THEN
+ row_number := 0;
+ ELSE
+ row_number := -1;
+ END IF;
+
+ OPEN curs FOR EXECUTE query;
+
+ LOOP
+ FETCH curs INTO rec;
+ IF NOT FOUND THEN
+ IF result_row.pivot_point IS NOT NULL THEN
+ RETURN NEXT result_row;
+ END IF;
+ RETURN;
+ END IF;
+
+ --Is unauthorized?
+ SELECT INTO unauthorized_entry *
+ FROM metabib.browse_entry_simple_heading_map mbeshm
+ INNER JOIN authority.simple_heading ash ON ( mbeshm.simple_heading = ash.id )
+ INNER JOIN authority.control_set_authority_field acsaf ON ( acsaf.id = ash.atag )
+ JOIN authority.heading_field ahf ON (ahf.id = acsaf.heading_field)
+ WHERE mbeshm.entry = rec.id
+ AND ahf.heading_purpose = 'variant';
+
+ -- Gather aggregate data based on the MBE row we're looking at now, authority axis
+ IF (unauthorized_entry.record IS NOT NULL) THEN
+ --unauthorized term belongs to an auth linked to a bib?
+ SELECT INTO all_arecords, result_row.sees, afields
+ ARRAY_AGG(DISTINCT abl.bib),
+ STRING_AGG(DISTINCT abl.authority::TEXT, $$,$$),
+ ARRAY_AGG(DISTINCT map.metabib_field)
+ FROM authority.bib_linking abl
+ INNER JOIN authority.control_set_auth_field_metabib_field_map_refs map ON (
+ map.authority_field = unauthorized_entry.atag
+ AND map.metabib_field = ANY(fields)
+ )
+ WHERE abl.authority = unauthorized_entry.record;
+ ELSE
+ --do usual procedure
+ SELECT INTO all_arecords, result_row.sees, afields
+ ARRAY_AGG(DISTINCT abl.bib), -- bibs to check for visibility
+ STRING_AGG(DISTINCT aal.source::TEXT, $$,$$), -- authority record ids
+ ARRAY_AGG(DISTINCT map.metabib_field) -- authority-tag-linked CMF rows
+
+ FROM metabib.browse_entry_simple_heading_map mbeshm
+ JOIN authority.simple_heading ash ON ( mbeshm.simple_heading = ash.id )
+ JOIN authority.authority_linking aal ON ( ash.record = aal.source )
+ JOIN authority.bib_linking abl ON ( aal.target = abl.authority )
+ JOIN authority.control_set_auth_field_metabib_field_map_refs map ON (
+ ash.atag = map.authority_field
+ AND map.metabib_field = ANY(fields)
+ )
+ JOIN authority.control_set_authority_field acsaf ON (
+ map.authority_field = acsaf.id
+ )
+ JOIN authority.heading_field ahf ON (ahf.id = acsaf.heading_field)
+ WHERE mbeshm.entry = rec.id
+ AND ahf.heading_purpose = 'variant';
+ END IF;
+
+ -- Gather aggregate data based on the MBE row we're looking at now, bib axis
+ SELECT INTO all_brecords, result_row.authorities, bfields
+ ARRAY_AGG(DISTINCT source),
+ STRING_AGG(DISTINCT authority::TEXT, $$,$$),
+ ARRAY_AGG(DISTINCT def)
+ FROM metabib.browse_entry_def_map
+ WHERE entry = rec.id
+ AND def = ANY(fields);
+
+ SELECT INTO result_row.fields STRING_AGG(DISTINCT x::TEXT, $$,$$) FROM UNNEST(afields || bfields) x;
+
+ result_row.sources := 0;
+ result_row.asources := 0;
+
+ -- Bib-linked vis checking
+ IF ARRAY_UPPER(all_brecords,1) IS NOT NULL THEN
+
+ full_end := ARRAY_LENGTH(all_brecords, 1);
+ superpage_size := COALESCE(browse_superpage_size, full_end);
+ slice_start := 1;
+ slice_end := superpage_size;
+
+ WHILE result_row.sources = 0 AND slice_start <= full_end LOOP
+ superpage_of_records := all_brecords[slice_start:slice_end];
+ qpfts_query :=
+ 'SELECT NULL::BIGINT AS id, ARRAY[r] AS records, ' ||
+ 'NULL AS badges, NULL::NUMERIC AS popularity, ' ||
+ '1::NUMERIC AS rel FROM (SELECT UNNEST(' ||
+ quote_literal(superpage_of_records) || '::BIGINT[]) AS r) rr';
+
+ -- We use search.query_parser_fts() for visibility testing.
+ -- We're calling it once per browse-superpage worth of records
+ -- out of the set of records related to a given mbe, until we've
+ -- either exhausted that set of records or found at least 1
+ -- visible record.
+
+ SELECT INTO result_row.sources visible
+ FROM search.query_parser_fts(
+ context_org, NULL, qpfts_query, NULL,
+ context_locations, 0, NULL, NULL, FALSE, staff, FALSE
+ ) qpfts
+ WHERE qpfts.rel IS NULL;
+
+ slice_start := slice_start + superpage_size;
+ slice_end := slice_end + superpage_size;
+ END LOOP;
+
+ -- Accurate? Well, probably.
+ result_row.accurate := browse_superpage_size IS NULL OR
+ browse_superpage_size >= full_end;
+
+ END IF;
+
+ -- Authority-linked vis checking
+ IF ARRAY_UPPER(all_arecords,1) IS NOT NULL THEN
+
+ full_end := ARRAY_LENGTH(all_arecords, 1);
+ superpage_size := COALESCE(browse_superpage_size, full_end);
+ slice_start := 1;
+ slice_end := superpage_size;
+
+ WHILE result_row.asources = 0 AND slice_start <= full_end LOOP
+ superpage_of_records := all_arecords[slice_start:slice_end];
+ qpfts_query :=
+ 'SELECT NULL::BIGINT AS id, ARRAY[r] AS records, ' ||
+ 'NULL AS badges, NULL::NUMERIC AS popularity, ' ||
+ '1::NUMERIC AS rel FROM (SELECT UNNEST(' ||
+ quote_literal(superpage_of_records) || '::BIGINT[]) AS r) rr';
+
+ -- We use search.query_parser_fts() for visibility testing.
+ -- We're calling it once per browse-superpage worth of records
+ -- out of the set of records related to a given mbe, via
+ -- authority until we've either exhausted that set of records
+ -- or found at least 1 visible record.
+
+ SELECT INTO result_row.asources visible
+ FROM search.query_parser_fts(
+ context_org, NULL, qpfts_query, NULL,
+ context_locations, 0, NULL, NULL, FALSE, staff, FALSE
+ ) qpfts
+ WHERE qpfts.rel IS NULL;
+
+ slice_start := slice_start + superpage_size;
+ slice_end := slice_end + superpage_size;
+ END LOOP;
+
+
+ -- Accurate? Well, probably.
+ result_row.aaccurate := browse_superpage_size IS NULL OR
+ browse_superpage_size >= full_end;
+
+ END IF;
+
+ IF result_row.sources > 0 OR result_row.asources > 0 THEN
+
+ -- The function that calls this function needs row_number in order
+ -- to correctly order results from two different runs of this
+ -- functions.
+ result_row.row_number := row_number;
+
+ -- Now, if row_counter is still less than limit, return a row. If
+ -- not, but it is less than next_pivot_pos, continue on without
+ -- returning actual result rows until we find
+ -- that next pivot, and return it.
+
+ IF row_counter < result_limit THEN
+ result_row.browse_entry := rec.id;
+ result_row.value := rec.value;
+
+ RETURN NEXT result_row;
+ ELSE
+ result_row.browse_entry := NULL;
+ result_row.authorities := NULL;
+ result_row.fields := NULL;
+ result_row.value := NULL;
+ result_row.sources := NULL;
+ result_row.sees := NULL;
+ result_row.accurate := NULL;
+ result_row.aaccurate := NULL;
+ result_row.pivot_point := rec.id;
+
+ IF row_counter >= next_pivot_pos THEN
+ RETURN NEXT result_row;
+ RETURN;
+ END IF;
+ END IF;
+
+ IF count_up_from_zero THEN
+ row_number := row_number + 1;
+ ELSE
+ row_number := row_number - 1;
+ END IF;
+
+ -- row_counter is different from row_number.
+ -- It simply counts up from zero so that we know when
+ -- we've reached our limit.
+ row_counter := row_counter + 1;
+ END IF;
+ END LOOP;
+END;
+$p$ LANGUAGE PLPGSQL;