From fdbfe66ce0bbfbc17b0da5f4d14553ff4d1b76eb Mon Sep 17 00:00:00 2001 From: Mike Rylander Date: Fri, 4 Mar 2022 10:38:12 -0500 Subject: [PATCH] LP#1931737: DYM can cause deadlocks w/ parallel ingest This patch causes all symspell dictionary updates to occur at then end of metabib search field updates in one go, which allows Postgres' INSERT ... ON CONFLICT mechanism to properly lock and serialize changes when necessary. Signed-off-by: Mike Rylander Signed-off-by: Jane Sandberg Signed-off-by: Jason Stephenson Signed-off-by: blake --- Open-ILS/src/sql/Pg/030.schema.metabib.sql | 1 + Open-ILS/src/sql/Pg/300.schema.staged_search.sql | 179 +++++++-------- .../upgrade/WWWW.schema.dym_update_and_reify.sql | 240 +++++++++++++++++++++ 3 files changed, 318 insertions(+), 102 deletions(-) create mode 100644 Open-ILS/src/sql/Pg/upgrade/WWWW.schema.dym_update_and_reify.sql diff --git a/Open-ILS/src/sql/Pg/030.schema.metabib.sql b/Open-ILS/src/sql/Pg/030.schema.metabib.sql index 58163faad0..ac35fc73d3 100644 --- a/Open-ILS/src/sql/Pg/030.schema.metabib.sql +++ b/Open-ILS/src/sql/Pg/030.schema.metabib.sql @@ -1168,6 +1168,7 @@ BEGIN IF NOT b_skip_search THEN PERFORM metabib.update_combined_index_vectors(bib_id); + PERFORM search.symspell_dictionary_reify(); -- NOTE: we only use search data for symspell today END IF; RETURN; diff --git a/Open-ILS/src/sql/Pg/300.schema.staged_search.sql b/Open-ILS/src/sql/Pg/300.schema.staged_search.sql index b3b41f9af1..868ce5fb3a 100644 --- a/Open-ILS/src/sql/Pg/300.schema.staged_search.sql +++ b/Open-ILS/src/sql/Pg/300.schema.staged_search.sql @@ -1140,6 +1140,72 @@ CREATE TABLE search.symspell_dictionary ( identifier_suggestions TEXT[] ) WITH (fillfactor = 80); +-- INSERT-only table that catches updates to be reconciled +CREATE UNLOGGED TABLE search.symspell_dictionary_updates ( + transaction_id BIGINT, + keyword_count INT NOT NULL DEFAULT 0, + title_count INT NOT NULL DEFAULT 0, + author_count INT NOT NULL DEFAULT 0, + subject_count INT NOT NULL DEFAULT 0, + series_count INT NOT NULL DEFAULT 0, + identifier_count INT NOT NULL DEFAULT 0, + + prefix_key TEXT NOT NULL, + + keyword_suggestions TEXT[], + title_suggestions TEXT[], + author_suggestions TEXT[], + subject_suggestions TEXT[], + series_suggestions TEXT[], + identifier_suggestions TEXT[] +); +CREATE INDEX symspell_dictionary_updates_tid_idx ON search.symspell_dictionary_updates (transaction_id); + +CREATE OR REPLACE FUNCTION search.symspell_dictionary_reify () RETURNS SETOF search.symspell_dictionary AS $f$ + WITH new_rows AS ( + DELETE FROM search.symspell_dictionary_updates WHERE transaction_id = txid_current() RETURNING * + ), computed_rows AS ( -- this collapses the rows deleted into the format we need for UPSERT + SELECT SUM(keyword_count) AS keyword_count, + SUM(title_count) AS title_count, + SUM(author_count) AS author_count, + SUM(subject_count) AS subject_count, + SUM(series_count) AS series_count, + SUM(identifier_count) AS identifier_count, + + prefix_key, + + ARRAY_REMOVE(ARRAY_AGG(DISTINCT keyword_suggestions[1]), NULL) AS keyword_suggestions, + ARRAY_REMOVE(ARRAY_AGG(DISTINCT title_suggestions[1]), NULL) AS title_suggestions, + ARRAY_REMOVE(ARRAY_AGG(DISTINCT author_suggestions[1]), NULL) AS author_suggestions, + ARRAY_REMOVE(ARRAY_AGG(DISTINCT subject_suggestions[1]), NULL) AS subject_suggestions, + ARRAY_REMOVE(ARRAY_AGG(DISTINCT series_suggestions[1]), NULL) AS series_suggestions, + ARRAY_REMOVE(ARRAY_AGG(DISTINCT identifier_suggestions[1]), NULL) AS identifier_suggestions + FROM new_rows + GROUP BY prefix_key + ) + INSERT INTO search.symspell_dictionary AS d SELECT * FROM computed_rows + ON CONFLICT (prefix_key) DO UPDATE SET + keyword_count = GREATEST(0, d.keyword_count + EXCLUDED.keyword_count), + keyword_suggestions = evergreen.text_array_merge_unique(EXCLUDED.keyword_suggestions,d.keyword_suggestions), + + title_count = GREATEST(0, d.title_count + EXCLUDED.title_count), + title_suggestions = evergreen.text_array_merge_unique(EXCLUDED.title_suggestions,d.title_suggestions), + + author_count = GREATEST(0, d.author_count + EXCLUDED.author_count), + author_suggestions = evergreen.text_array_merge_unique(EXCLUDED.author_suggestions,d.author_suggestions), + + subject_count = GREATEST(0, d.subject_count + EXCLUDED.subject_count), + subject_suggestions = evergreen.text_array_merge_unique(EXCLUDED.subject_suggestions,d.subject_suggestions), + + series_count = GREATEST(0, d.series_count + EXCLUDED.series_count), + series_suggestions = evergreen.text_array_merge_unique(EXCLUDED.series_suggestions,d.series_suggestions), + + identifier_count = GREATEST(0, d.identifier_count + EXCLUDED.identifier_count), + identifier_suggestions = evergreen.text_array_merge_unique(EXCLUDED.identifier_suggestions,d.identifier_suggestions) + RETURNING *; +$f$ LANGUAGE SQL; + + CREATE OR REPLACE FUNCTION search.symspell_parse_words ( phrase TEXT ) RETURNS SETOF TEXT AS $F$ SELECT UNNEST(x) FROM regexp_matches($1, '([[:alnum:]]+''*[[:alnum:]]*)', 'g') x; @@ -1646,107 +1712,6 @@ BEGIN END; $F$ LANGUAGE PLPGSQL; -CREATE OR REPLACE FUNCTION search.symspell_build_and_merge_entries ( - full_input TEXT, - source_class TEXT, - old_input TEXT DEFAULT NULL, - include_phrases BOOL DEFAULT FALSE -) RETURNS SETOF search.symspell_dictionary AS $F$ -DECLARE - new_entry RECORD; - conflict_entry RECORD; -BEGIN - - IF full_input = old_input THEN -- neither NULL, and are the same - RETURN; - END IF; - - FOR new_entry IN EXECUTE $q$ - SELECT count, - prefix_key, - s AS suggestions - FROM (SELECT prefix_key, - ARRAY_AGG(DISTINCT $q$ || source_class || $q$_suggestions[1]) s, - SUM($q$ || source_class || $q$_count) count - FROM search.symspell_build_entries($1, $2, $3, $4) - GROUP BY 1) x - $q$ USING full_input, source_class, old_input, include_phrases - LOOP - EXECUTE $q$ - SELECT prefix_key, - $q$ || source_class || $q$_suggestions suggestions, - $q$ || source_class || $q$_count count - FROM search.symspell_dictionary - WHERE prefix_key = $1 $q$ - INTO conflict_entry - USING new_entry.prefix_key; - - IF new_entry.count <> 0 THEN -- Real word, and count changed - IF conflict_entry.prefix_key IS NOT NULL THEN -- we'll be updating - IF conflict_entry.count > 0 THEN -- it's a real word - RETURN QUERY EXECUTE $q$ - UPDATE search.symspell_dictionary - SET $q$ || source_class || $q$_count = $2 - WHERE prefix_key = $1 - RETURNING * $q$ - USING new_entry.prefix_key, GREATEST(0, new_entry.count + conflict_entry.count); - ELSE -- it was a prefix key or delete-emptied word before - IF conflict_entry.suggestions @> new_entry.suggestions THEN -- already have all suggestions here... - RETURN QUERY EXECUTE $q$ - UPDATE search.symspell_dictionary - SET $q$ || source_class || $q$_count = $2 - WHERE prefix_key = $1 - RETURNING * $q$ - USING new_entry.prefix_key, GREATEST(0, new_entry.count); - ELSE -- new suggestion! - RETURN QUERY EXECUTE $q$ - UPDATE search.symspell_dictionary - SET $q$ || source_class || $q$_count = $2, - $q$ || source_class || $q$_suggestions = $3 - WHERE prefix_key = $1 - RETURNING * $q$ - USING new_entry.prefix_key, GREATEST(0, new_entry.count), evergreen.text_array_merge_unique(conflict_entry.suggestions,new_entry.suggestions); - END IF; - END IF; - ELSE - -- We keep the on-conflict clause just in case... - RETURN QUERY EXECUTE $q$ - INSERT INTO search.symspell_dictionary AS d ( - $q$ || source_class || $q$_count, - prefix_key, - $q$ || source_class || $q$_suggestions - ) VALUES ( $1, $2, $3 ) ON CONFLICT (prefix_key) DO - UPDATE SET $q$ || source_class || $q$_count = d.$q$ || source_class || $q$_count + EXCLUDED.$q$ || source_class || $q$_count, - $q$ || source_class || $q$_suggestions = evergreen.text_array_merge_unique(d.$q$ || source_class || $q$_suggestions, EXCLUDED.$q$ || source_class || $q$_suggestions) - RETURNING * $q$ - USING new_entry.count, new_entry.prefix_key, new_entry.suggestions; - END IF; - ELSE -- key only, or no change - IF conflict_entry.prefix_key IS NOT NULL THEN -- we'll be updating - IF NOT conflict_entry.suggestions @> new_entry.suggestions THEN -- There are new suggestions - RETURN QUERY EXECUTE $q$ - UPDATE search.symspell_dictionary - SET $q$ || source_class || $q$_suggestions = $2 - WHERE prefix_key = $1 - RETURNING * $q$ - USING new_entry.prefix_key, evergreen.text_array_merge_unique(conflict_entry.suggestions,new_entry.suggestions); - END IF; - ELSE - RETURN QUERY EXECUTE $q$ - INSERT INTO search.symspell_dictionary AS d ( - $q$ || source_class || $q$_count, - prefix_key, - $q$ || source_class || $q$_suggestions - ) VALUES ( $1, $2, $3 ) ON CONFLICT (prefix_key) DO -- key exists, suggestions may be added due to this entry - UPDATE SET $q$ || source_class || $q$_suggestions = evergreen.text_array_merge_unique(d.$q$ || source_class || $q$_suggestions, EXCLUDED.$q$ || source_class || $q$_suggestions) - RETURNING * $q$ - USING new_entry.count, new_entry.prefix_key, new_entry.suggestions; - END IF; - END IF; - END LOOP; -END; -$F$ LANGUAGE PLPGSQL; - CREATE OR REPLACE FUNCTION search.symspell_maintain_entries () RETURNS TRIGGER AS $f$ DECLARE search_class TEXT; @@ -1763,7 +1728,17 @@ BEGIN old_value := OLD.value; END IF; - PERFORM * FROM search.symspell_build_and_merge_entries(new_value, search_class, old_value); + IF new_value = old_value THEN + -- same, move along + ELSE + INSERT INTO search.symspell_dictionary_updates + SELECT txid_current(), * + FROM search.symspell_build_entries( + new_value, + search_class, + old_value + ); + END IF; RETURN NULL; -- always fired AFTER END; diff --git a/Open-ILS/src/sql/Pg/upgrade/WWWW.schema.dym_update_and_reify.sql b/Open-ILS/src/sql/Pg/upgrade/WWWW.schema.dym_update_and_reify.sql new file mode 100644 index 0000000000..8ff177232d --- /dev/null +++ b/Open-ILS/src/sql/Pg/upgrade/WWWW.schema.dym_update_and_reify.sql @@ -0,0 +1,240 @@ +BEGIN; + +-- INSERT-only table that catches dictionary updates to be reconciled +CREATE UNLOGGED TABLE search.symspell_dictionary_updates ( + transaction_id BIGINT, + keyword_count INT NOT NULL DEFAULT 0, + title_count INT NOT NULL DEFAULT 0, + author_count INT NOT NULL DEFAULT 0, + subject_count INT NOT NULL DEFAULT 0, + series_count INT NOT NULL DEFAULT 0, + identifier_count INT NOT NULL DEFAULT 0, + + prefix_key TEXT NOT NULL, + + keyword_suggestions TEXT[], + title_suggestions TEXT[], + author_suggestions TEXT[], + subject_suggestions TEXT[], + series_suggestions TEXT[], + identifier_suggestions TEXT[] +); +CREATE INDEX symspell_dictionary_updates_tid_idx ON search.symspell_dictionary_updates (transaction_id); + +-- Function that collects this transactions additions to the unlogged update table +CREATE OR REPLACE FUNCTION search.symspell_dictionary_reify () RETURNS SETOF search.symspell_dictionary AS $f$ + WITH new_rows AS ( + DELETE FROM search.symspell_dictionary_updates WHERE transaction_id = txid_current() RETURNING * + ), computed_rows AS ( -- this collapses the rows deleted into the format we need for UPSERT + SELECT SUM(keyword_count) AS keyword_count, + SUM(title_count) AS title_count, + SUM(author_count) AS author_count, + SUM(subject_count) AS subject_count, + SUM(series_count) AS series_count, + SUM(identifier_count) AS identifier_count, + + prefix_key, + + ARRAY_REMOVE(ARRAY_AGG(DISTINCT keyword_suggestions[1]), NULL) AS keyword_suggestions, + ARRAY_REMOVE(ARRAY_AGG(DISTINCT title_suggestions[1]), NULL) AS title_suggestions, + ARRAY_REMOVE(ARRAY_AGG(DISTINCT author_suggestions[1]), NULL) AS author_suggestions, + ARRAY_REMOVE(ARRAY_AGG(DISTINCT subject_suggestions[1]), NULL) AS subject_suggestions, + ARRAY_REMOVE(ARRAY_AGG(DISTINCT series_suggestions[1]), NULL) AS series_suggestions, + ARRAY_REMOVE(ARRAY_AGG(DISTINCT identifier_suggestions[1]), NULL) AS identifier_suggestions + FROM new_rows + GROUP BY prefix_key + ) + INSERT INTO search.symspell_dictionary AS d SELECT * FROM computed_rows + ON CONFLICT (prefix_key) DO UPDATE SET + keyword_count = GREATEST(0, d.keyword_count + EXCLUDED.keyword_count), + keyword_suggestions = evergreen.text_array_merge_unique(EXCLUDED.keyword_suggestions,d.keyword_suggestions), + + title_count = GREATEST(0, d.title_count + EXCLUDED.title_count), + title_suggestions = evergreen.text_array_merge_unique(EXCLUDED.title_suggestions,d.title_suggestions), + + author_count = GREATEST(0, d.author_count + EXCLUDED.author_count), + author_suggestions = evergreen.text_array_merge_unique(EXCLUDED.author_suggestions,d.author_suggestions), + + subject_count = GREATEST(0, d.subject_count + EXCLUDED.subject_count), + subject_suggestions = evergreen.text_array_merge_unique(EXCLUDED.subject_suggestions,d.subject_suggestions), + + series_count = GREATEST(0, d.series_count + EXCLUDED.series_count), + series_suggestions = evergreen.text_array_merge_unique(EXCLUDED.series_suggestions,d.series_suggestions), + + identifier_count = GREATEST(0, d.identifier_count + EXCLUDED.identifier_count), + identifier_suggestions = evergreen.text_array_merge_unique(EXCLUDED.identifier_suggestions,d.identifier_suggestions) + RETURNING *; +$f$ LANGUAGE SQL; + +-- simplified metabib.*_field_entry trigger that stages updates for reification in one go +CREATE OR REPLACE FUNCTION search.symspell_maintain_entries () RETURNS TRIGGER AS $f$ +DECLARE + search_class TEXT; + new_value TEXT := NULL; + old_value TEXT := NULL; +BEGIN + search_class := COALESCE(TG_ARGV[0], SPLIT_PART(TG_TABLE_NAME,'_',1)); + + IF TG_OP IN ('INSERT', 'UPDATE') THEN + new_value := NEW.value; + END IF; + + IF TG_OP IN ('DELETE', 'UPDATE') THEN + old_value := OLD.value; + END IF; + + IF new_value = old_value THEN + -- same, move along + ELSE + INSERT INTO search.symspell_dictionary_updates + SELECT txid_current(), * + FROM search.symspell_build_entries( + new_value, + search_class, + old_value + ); + END IF; + + RETURN NULL; -- always fired AFTER +END; +$f$ LANGUAGE PLPGSQL; + +CREATE OR REPLACE FUNCTION metabib.reingest_metabib_field_entries( + bib_id BIGINT, + skip_facet BOOL DEFAULT FALSE, + skip_display BOOL DEFAULT FALSE, + skip_browse BOOL DEFAULT FALSE, + skip_search BOOL DEFAULT FALSE, + only_fields INT[] DEFAULT '{}'::INT[] +) RETURNS VOID AS $func$ +DECLARE + fclass RECORD; + ind_data metabib.field_entry_template%ROWTYPE; + mbe_row metabib.browse_entry%ROWTYPE; + mbe_id BIGINT; + b_skip_facet BOOL; + b_skip_display BOOL; + b_skip_browse BOOL; + b_skip_search BOOL; + value_prepped TEXT; + field_list INT[] := only_fields; + field_types TEXT[] := '{}'::TEXT[]; +BEGIN + + IF field_list = '{}'::INT[] THEN + SELECT ARRAY_AGG(id) INTO field_list FROM config.metabib_field; + END IF; + + SELECT COALESCE(NULLIF(skip_facet, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name = 'ingest.skip_facet_indexing' AND enabled)) INTO b_skip_facet; + SELECT COALESCE(NULLIF(skip_display, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name = 'ingest.skip_display_indexing' AND enabled)) INTO b_skip_display; + SELECT COALESCE(NULLIF(skip_browse, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name = 'ingest.skip_browse_indexing' AND enabled)) INTO b_skip_browse; + SELECT COALESCE(NULLIF(skip_search, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name = 'ingest.skip_search_indexing' AND enabled)) INTO b_skip_search; + + IF NOT b_skip_facet THEN field_types := field_types || '{facet}'; END IF; + IF NOT b_skip_display THEN field_types := field_types || '{display}'; END IF; + IF NOT b_skip_browse THEN field_types := field_types || '{browse}'; END IF; + IF NOT b_skip_search THEN field_types := field_types || '{search}'; END IF; + + PERFORM * FROM config.internal_flag WHERE name = 'ingest.assume_inserts_only' AND enabled; + IF NOT FOUND THEN + IF NOT b_skip_search THEN + FOR fclass IN SELECT * FROM config.metabib_class LOOP + -- RAISE NOTICE 'Emptying out %', fclass.name; + EXECUTE $$DELETE FROM metabib.$$ || fclass.name || $$_field_entry WHERE source = $$ || bib_id; + END LOOP; + END IF; + IF NOT b_skip_facet THEN + DELETE FROM metabib.facet_entry WHERE source = bib_id; + END IF; + IF NOT b_skip_display THEN + DELETE FROM metabib.display_entry WHERE source = bib_id; + END IF; + IF NOT b_skip_browse THEN + DELETE FROM metabib.browse_entry_def_map WHERE source = bib_id; + END IF; + END IF; + + FOR ind_data IN SELECT * FROM biblio.extract_metabib_field_entry( bib_id, ' ', field_types, field_list ) LOOP + + -- don't store what has been normalized away + CONTINUE WHEN ind_data.value IS NULL; + + IF ind_data.field < 0 THEN + ind_data.field = -1 * ind_data.field; + END IF; + + IF ind_data.facet_field AND NOT b_skip_facet THEN + INSERT INTO metabib.facet_entry (field, source, value) + VALUES (ind_data.field, ind_data.source, ind_data.value); + END IF; + + IF ind_data.display_field AND NOT b_skip_display THEN + INSERT INTO metabib.display_entry (field, source, value) + VALUES (ind_data.field, ind_data.source, ind_data.value); + END IF; + + + IF ind_data.browse_field AND NOT b_skip_browse THEN + -- A caveat about this SELECT: this should take care of replacing + -- old mbe rows when data changes, but not if normalization (by + -- which I mean specifically the output of + -- evergreen.oils_tsearch2()) changes. It may or may not be + -- expensive to add a comparison of index_vector to index_vector + -- to the WHERE clause below. + + CONTINUE WHEN ind_data.sort_value IS NULL; + + value_prepped := metabib.browse_normalize(ind_data.value, ind_data.field); + IF ind_data.browse_nocase THEN + SELECT INTO mbe_row * FROM metabib.browse_entry + WHERE evergreen.lowercase(value) = evergreen.lowercase(value_prepped) AND sort_value = ind_data.sort_value + ORDER BY sort_value, value LIMIT 1; -- gotta pick something, I guess + ELSE + SELECT INTO mbe_row * FROM metabib.browse_entry + WHERE value = value_prepped AND sort_value = ind_data.sort_value; + END IF; + + IF FOUND THEN + mbe_id := mbe_row.id; + ELSE + INSERT INTO metabib.browse_entry + ( value, sort_value ) VALUES + ( value_prepped, ind_data.sort_value ); + + mbe_id := CURRVAL('metabib.browse_entry_id_seq'::REGCLASS); + END IF; + + INSERT INTO metabib.browse_entry_def_map (entry, def, source, authority) + VALUES (mbe_id, ind_data.field, ind_data.source, ind_data.authority); + END IF; + + IF ind_data.search_field AND NOT b_skip_search THEN + -- Avoid inserting duplicate rows + EXECUTE 'SELECT 1 FROM metabib.' || ind_data.field_class || + '_field_entry WHERE field = $1 AND source = $2 AND value = $3' + INTO mbe_id USING ind_data.field, ind_data.source, ind_data.value; + -- RAISE NOTICE 'Search for an already matching row returned %', mbe_id; + IF mbe_id IS NULL THEN + EXECUTE $$ + INSERT INTO metabib.$$ || ind_data.field_class || $$_field_entry (field, source, value) + VALUES ($$ || + quote_literal(ind_data.field) || $$, $$ || + quote_literal(ind_data.source) || $$, $$ || + quote_literal(ind_data.value) || + $$);$$; + END IF; + END IF; + + END LOOP; + + IF NOT b_skip_search THEN + PERFORM metabib.update_combined_index_vectors(bib_id); + PERFORM search.symspell_dictionary_reify(); + END IF; + + RETURN; +END; +$func$ LANGUAGE PLPGSQL; + +COMMIT; + -- 2.11.0