From: Mike Rylander Date: Fri, 4 Mar 2022 15:41:07 +0000 (-0500) Subject: LP#1979071: Queued Ingest X-Git-Url: https://old-git.evergreen-ils.org/?a=commitdiff_plain;h=a52ea2edb5f6308f31a28fdd6d4a46bacae9a94d;p=Evergreen.git LP#1979071: Queued Ingest This feature allows for the separation of bib and authority record updates and the search (and other) indexing that occurs when a record is modified in some way. The Queued Ingest mechanism consists of several parts working together: * A set of configuration flags that control when ingest should be performed immediately, and when it can be deferred until after the transaction commits and control is returned to the user. * Refactoring of the in-database ingest triggers to separate deciding what should happen to a record given a data modification event, and when/how that process should take place. * A set of queuing tables used to track which records are to be processed and in what ways, when that processing was requested, and the ability to group processing requests into named queues that can report who made a processing request and for what purpose. * A Queued Ingest Coordinator that runs in the background monitoring the queuing tables for activity and processes records as they are enqueued. This can run on any server that can connect to the database and has the OpenSRF Perl modules installed. * A command line tool to be usedby administrators to enqueue records for Queued Ingest processing, to create named queues, and to process enqueued records either in one queue or all outstanding enqueued entries. This tool can also report on the status of requested Queued Ingest processing, whether pending, ongoing, or complete, either for all time or since a particular date and time. The queuing tables added here are not yet included in the IDL for reporting or Staff interface construction. They can be added as the need arises. To test Queued Ingest: * Upgrade the database schema using the appropriate upgrade script. * Install OpenILS/src/support-scripts/ingest_ctl in the usual OpenSRF bin directory. * Start the Queued Ingest Coordinator by using the --coordinator parameter to ingest_ctl along with the relevant database parameters. Use the --help parameter for more details. * Enable the Global Flag called 'ingest.queued.all' * Edit, import, and delete some bib and authority records, and use the --stats parameter to ingest_ctl to see entries being processed. Signed-off-by: Mike Rylander Signed-off-by: Bill Erickson Signed-off-by: Galen Charlton --- diff --git a/Open-ILS/src/perlmods/lib/OpenILS/Application/Cat/BibCommon.pm b/Open-ILS/src/perlmods/lib/OpenILS/Application/Cat/BibCommon.pm index d10cfd1f1d..2114b0746a 100644 --- a/Open-ILS/src/perlmods/lib/OpenILS/Application/Cat/BibCommon.pm +++ b/Open-ILS/src/perlmods/lib/OpenILS/Application/Cat/BibCommon.pm @@ -71,8 +71,20 @@ sub biblio_record_replace_marc { $rec->editor($e->requestor->id); $rec->edit_date('now'); $rec->marc($marc); + + my $inline_ingest = $e->retrieve_config_global_flag('ingest.queued.biblio.update.marc_edit_inline'); + $inline_ingest = ($inline_ingest and $U->is_true($inline_ingest->enabled)); + + $e->json_query( + {from => [ 'action.set_queued_ingest_force', 'ingest.queued.biblio.update.disabled' ]} + ) if ($inline_ingest); + $e->update_biblio_record_entry($rec) or return $e->die_event; + $e->json_query( + {from => ['action.clear_queued_ingest_force']} + ) if ($inline_ingest); + return $rec; } @@ -114,8 +126,19 @@ sub biblio_record_xml_import { $record->edit_date('now'); $record->marc($marc); + my $inline_ingest = $e->retrieve_config_global_flag('ingest.queued.biblio.insert.marc_edit_inline'); + $inline_ingest = ($inline_ingest and $U->is_true($inline_ingest->enabled)); + + $e->json_query( + {from => [ 'action.set_queued_ingest_force', 'ingest.queued.biblio.insert.disabled' ]} + ) if ($inline_ingest); + $record = $e->create_biblio_record_entry($record) or return $e->die_event; + $e->json_query( + {from => ['action.clear_queued_ingest_force']} + ) if ($inline_ingest); + if($use_id) { my $existing = $e->search_biblio_record_entry( { diff --git a/Open-ILS/src/sql/Pg/030.schema.metabib.sql b/Open-ILS/src/sql/Pg/030.schema.metabib.sql index 40cd60ecaa..0b80ca986e 100644 --- a/Open-ILS/src/sql/Pg/030.schema.metabib.sql +++ b/Open-ILS/src/sql/Pg/030.schema.metabib.sql @@ -1038,11 +1038,11 @@ BEGIN END; $func$ LANGUAGE PLPGSQL; -CREATE OR REPLACE FUNCTION metabib.reingest_metabib_field_entries( +CREATE OR REPLACE FUNCTION metabib.reingest_metabib_field_entries( bib_id BIGINT, - skip_facet BOOL DEFAULT FALSE, + skip_facet BOOL DEFAULT FALSE, skip_display BOOL DEFAULT FALSE, - skip_browse BOOL DEFAULT FALSE, + skip_browse BOOL DEFAULT FALSE, skip_search BOOL DEFAULT FALSE, only_fields INT[] DEFAULT '{}'::INT[] ) RETURNS VOID AS $func$ @@ -1078,24 +1078,23 @@ BEGIN IF NOT FOUND THEN IF NOT b_skip_search THEN FOR fclass IN SELECT * FROM config.metabib_class LOOP - -- RAISE NOTICE 'Emptying out %', fclass.name; - EXECUTE $$DELETE FROM metabib.$$ || fclass.name || $$_field_entry WHERE source = $$ || bib_id; + EXECUTE $$DELETE FROM metabib.$$ || fclass.name || $$_field_entry WHERE source = $$ || bib_id || $$ AND field = ANY($1)$$ USING field_list; END LOOP; END IF; IF NOT b_skip_facet THEN - DELETE FROM metabib.facet_entry WHERE source = bib_id; + DELETE FROM metabib.facet_entry WHERE source = bib_id AND field = ANY(field_list); END IF; IF NOT b_skip_display THEN - DELETE FROM metabib.display_entry WHERE source = bib_id; + DELETE FROM metabib.display_entry WHERE source = bib_id AND field = ANY(field_list); END IF; IF NOT b_skip_browse THEN - DELETE FROM metabib.browse_entry_def_map WHERE source = bib_id; + DELETE FROM metabib.browse_entry_def_map WHERE source = bib_id AND def = ANY(field_list); END IF; END IF; FOR ind_data IN SELECT * FROM biblio.extract_metabib_field_entry( bib_id, ' ', field_types, field_list ) LOOP - -- don't store what has been normalized away + -- don't store what has been normalized away CONTINUE WHEN ind_data.value IS NULL; IF ind_data.field < 0 THEN @@ -1124,23 +1123,20 @@ BEGIN CONTINUE WHEN ind_data.sort_value IS NULL; value_prepped := metabib.browse_normalize(ind_data.value, ind_data.field); - IF ind_data.browse_nocase THEN + IF ind_data.browse_nocase THEN -- for "nocase" browse definions, look for a preexisting row that matches case-insensitively on value and use that SELECT INTO mbe_row * FROM metabib.browse_entry WHERE evergreen.lowercase(value) = evergreen.lowercase(value_prepped) AND sort_value = ind_data.sort_value ORDER BY sort_value, value LIMIT 1; -- gotta pick something, I guess - ELSE - SELECT INTO mbe_row * FROM metabib.browse_entry - WHERE value = value_prepped AND sort_value = ind_data.sort_value; END IF; - IF FOUND THEN + IF mbe_row.id IS NOT NULL THEN -- asked to check for, and found, a "nocase" version to use mbe_id := mbe_row.id; - ELSE + ELSE -- otherwise, an UPSERT-protected variant INSERT INTO metabib.browse_entry ( value, sort_value ) VALUES - ( value_prepped, ind_data.sort_value ); - - mbe_id := CURRVAL('metabib.browse_entry_id_seq'::REGCLASS); + ( value_prepped, ind_data.sort_value ) + ON CONFLICT (sort_value, value) DO UPDATE SET sort_value = EXCLUDED.sort_value -- must update a row to return an existing id + RETURNING id INTO mbe_id; END IF; INSERT INTO metabib.browse_entry_def_map (entry, def, source, authority) @@ -1697,7 +1693,7 @@ BEGIN norm_attr_value := '{}'::TEXT[]; attr_vector_tmp := '{}'::INT[]; - SELECT * INTO ccvm_row FROM config.coded_value_map c WHERE c.ctype = attr_def.name LIMIT 1; + SELECT * INTO ccvm_row FROM config.coded_value_map c WHERE c.ctype = attr_def.name LIMIT 1; IF attr_def.tag IS NOT NULL THEN -- tag (and optional subfield list) selection SELECT ARRAY_AGG(value) INTO attr_value @@ -1705,7 +1701,7 @@ BEGIN WHERE record = rid AND tag LIKE attr_def.tag AND CASE - WHEN attr_def.sf_list IS NOT NULL + WHEN attr_def.sf_list IS NOT NULL THEN POSITION(subfield IN attr_def.sf_list) > 0 ELSE TRUE END @@ -1730,7 +1726,7 @@ BEGIN IF NOT jump_past AND attr_def.xpath IS NOT NULL THEN -- and xpath expression SELECT INTO xfrm * FROM config.xml_transform WHERE name = attr_def.format; - + -- See if we can skip the XSLT ... it's expensive IF prev_xfrm IS NULL OR prev_xfrm <> xfrm.name THEN -- Can't skip the transform @@ -1739,7 +1735,7 @@ BEGIN ELSE transformed_xml := rmarc; END IF; - + prev_xfrm := xfrm.name; END IF; @@ -1804,7 +1800,7 @@ BEGIN norm_attr_value := norm_attr_value || tmp_val; END IF; END LOOP; - + IF attr_def.filter THEN -- Create unknown uncontrolled values and find the IDs of the values IF ccvm_row.id IS NULL THEN @@ -1842,7 +1838,7 @@ BEGIN requested attrs, and then add back the new set of attr values. */ - IF ARRAY_LENGTH(pattr_list, 1) > 0 THEN + IF ARRAY_LENGTH(pattr_list, 1) > 0 THEN SELECT vlist INTO attr_vector_tmp FROM metabib.record_attr_vector_list WHERE source = rid; SELECT attr_vector_tmp - ARRAY_AGG(id::INT) INTO attr_vector_tmp FROM metabib.full_attr_id_map WHERE attr = ANY (pattr_list); attr_vector := attr_vector || attr_vector_tmp; @@ -1877,94 +1873,225 @@ BEGIN END LOOP; IF ARRAY_LENGTH(attr_vector, 1) > 0 THEN - IF rdeleted THEN -- initial insert OR revivication - DELETE FROM metabib.record_attr_vector_list WHERE source = rid; - INSERT INTO metabib.record_attr_vector_list (source, vlist) VALUES (rid, attr_vector); - ELSE - UPDATE metabib.record_attr_vector_list SET vlist = attr_vector WHERE source = rid; - END IF; + INSERT INTO metabib.record_attr_vector_list (source, vlist) VALUES (rid, attr_vector) + ON CONFLICT (source) DO UPDATE SET vlist = EXCLUDED.vlist; END IF; END; $func$ LANGUAGE PLPGSQL; - --- AFTER UPDATE OR INSERT trigger for biblio.record_entry -CREATE OR REPLACE FUNCTION biblio.indexing_ingest_or_delete () RETURNS TRIGGER AS $func$ +CREATE OR REPLACE FUNCTION metabib.indexing_delete (bib biblio.record_entry, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$ DECLARE tmp_bool BOOL; + diag_detail TEXT; + diag_context TEXT; BEGIN + PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.preserve_on_delete' AND enabled; + tmp_bool := FOUND; - IF NEW.deleted THEN -- If this bib is deleted - - PERFORM * FROM config.internal_flag WHERE - name = 'ingest.metarecord_mapping.preserve_on_delete' AND enabled; - - tmp_bool := FOUND; -- Just in case this is changed by some other statement + PERFORM metabib.remap_metarecord_for_bib(bib.id, bib.fingerprint, TRUE, tmp_bool); - PERFORM metabib.remap_metarecord_for_bib( NEW.id, NEW.fingerprint, TRUE, tmp_bool ); - - IF NOT tmp_bool THEN - -- One needs to keep these around to support searches - -- with the #deleted modifier, so one should turn on the named - -- internal flag for that functionality. - DELETE FROM metabib.record_attr_vector_list WHERE source = NEW.id; - END IF; - - DELETE FROM authority.bib_linking WHERE bib = NEW.id; -- Avoid updating fields in bibs that are no longer visible - DELETE FROM biblio.peer_bib_copy_map WHERE peer_record = NEW.id; -- Separate any multi-homed items - DELETE FROM metabib.browse_entry_def_map WHERE source = NEW.id; -- Don't auto-suggest deleted bibs - RETURN NEW; -- and we're done + IF NOT tmp_bool THEN + -- One needs to keep these around to support searches + -- with the #deleted modifier, so one should turn on the named + -- internal flag for that functionality. + DELETE FROM metabib.record_attr_vector_list WHERE source = bib.id; END IF; - IF TG_OP = 'UPDATE' AND OLD.deleted IS FALSE THEN -- re-ingest? - PERFORM * FROM config.internal_flag WHERE name = 'ingest.reingest.force_on_same_marc' AND enabled; + DELETE FROM authority.bib_linking WHERE bib = bib.id; -- Avoid updating fields in bibs that are no longer visible + DELETE FROM biblio.peer_bib_copy_map WHERE peer_record = bib.id; -- Separate any multi-homed items + DELETE FROM metabib.browse_entry_def_map WHERE source = bib.id; -- Don't auto-suggest deleted bibs - IF NOT FOUND AND OLD.marc = NEW.marc THEN -- don't do anything if the MARC didn't change - RETURN NEW; - END IF; - END IF; + RETURN TRUE; +EXCEPTION WHEN OTHERS THEN + GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL, + diag_context = PG_EXCEPTION_CONTEXT; + RAISE WARNING '%\n%', diag_detail, diag_context; + RETURN FALSE; +END; +$func$ LANGUAGE PLPGSQL; + +CREATE OR REPLACE FUNCTION metabib.indexing_update (bib biblio.record_entry, insert_only BOOL DEFAULT FALSE, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$ +DECLARE + skip_facet BOOL := FALSE; + skip_display BOOL := FALSE; + skip_browse BOOL := FALSE; + skip_search BOOL := FALSE; + skip_auth BOOL := FALSE; + skip_full BOOL := FALSE; + skip_attrs BOOL := FALSE; + skip_luri BOOL := FALSE; + skip_mrmap BOOL := FALSE; + only_attrs TEXT[] := NULL; + only_fields INT[] := '{}'::INT[]; + diag_detail TEXT; + diag_context TEXT; +BEGIN -- Record authority linking + SELECT extra LIKE '%skip_authority%' INTO skip_auth; PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_linking' AND enabled; - IF NOT FOUND THEN - PERFORM biblio.map_authority_linking( NEW.id, NEW.marc ); + IF NOT FOUND AND NOT skip_auth THEN + PERFORM biblio.map_authority_linking( bib.id, bib.marc ); END IF; -- Flatten and insert the mfr data + SELECT extra LIKE '%skip_full_rec%' INTO skip_full; PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_metabib_full_rec' AND enabled; - IF NOT FOUND THEN - PERFORM metabib.reingest_metabib_full_rec(NEW.id); + IF NOT FOUND AND NOT skip_full THEN + PERFORM metabib.reingest_metabib_full_rec(bib.id); + END IF; - -- Now we pull out attribute data, which is dependent on the mfr for all but XPath-based fields - PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_metabib_rec_descriptor' AND enabled; - IF NOT FOUND THEN - PERFORM metabib.reingest_record_attributes(NEW.id, NULL, NEW.marc, TG_OP = 'INSERT' OR OLD.deleted); + -- Now we pull out attribute data, which is dependent on the mfr for all but XPath-based fields + SELECT extra LIKE '%skip_attrs%' INTO skip_attrs; + PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_metabib_rec_descriptor' AND enabled; + IF NOT FOUND AND NOT skip_attrs THEN + IF extra ~ 'attr\(\s*(\w[ ,\w]*?)\s*\)' THEN + SELECT REGEXP_SPLIT_TO_ARRAY( + (REGEXP_MATCHES(extra, 'attr\(\s*(\w[ ,\w]*?)\s*\)'))[1], + '\s*,\s*' + ) INTO only_attrs; END IF; + + PERFORM metabib.reingest_record_attributes(bib.id, only_attrs, bib.marc, insert_only); END IF; -- Gather and insert the field entry data - PERFORM metabib.reingest_metabib_field_entries(NEW.id); + SELECT extra LIKE '%skip_facet%' INTO skip_facet; + SELECT extra LIKE '%skip_display%' INTO skip_display; + SELECT extra LIKE '%skip_browse%' INTO skip_browse; + SELECT extra LIKE '%skip_search%' INTO skip_search; + + IF extra ~ 'field_list\(\s*(\d[ ,\d]+)\s*\)' THEN + SELECT REGEXP_SPLIT_TO_ARRAY( + (REGEXP_MATCHES(extra, 'field_list\(\s*(\d[ ,\d]+)\s*\)'))[1], + '\s*,\s*' + )::INT[] INTO only_fields; + END IF; + + IF NOT skip_facet OR NOT skip_display OR NOT skip_browse OR NOT skip_search THEN + PERFORM metabib.reingest_metabib_field_entries(bib.id, skip_facet, skip_display, skip_browse, skip_search, only_fields); + END IF; -- Located URI magic + SELECT extra LIKE '%skip_luri%' INTO skip_luri; PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_located_uri' AND enabled; - IF NOT FOUND THEN PERFORM biblio.extract_located_uris( NEW.id, NEW.marc, NEW.editor ); END IF; + IF NOT FOUND AND NOT skip_luri THEN PERFORM biblio.extract_located_uris( bib.id, bib.marc, bib.editor ); END IF; -- (re)map metarecord-bib linking - IF TG_OP = 'INSERT' THEN -- if not deleted and performing an insert, check for the flag + SELECT extra LIKE '%skip_mrmap%' INTO skip_mrmap; + IF insert_only THEN -- if not deleted and performing an insert, check for the flag PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.skip_on_insert' AND enabled; - IF NOT FOUND THEN - PERFORM metabib.remap_metarecord_for_bib( NEW.id, NEW.fingerprint ); + IF NOT FOUND AND NOT skip_mrmap THEN + PERFORM metabib.remap_metarecord_for_bib( bib.id, bib.fingerprint ); END IF; ELSE -- we're doing an update, and we're not deleted, remap PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.skip_on_update' AND enabled; + IF NOT FOUND AND NOT skip_mrmap THEN + PERFORM metabib.remap_metarecord_for_bib( bib.id, bib.fingerprint ); + END IF; + END IF; + + RETURN TRUE; +EXCEPTION WHEN OTHERS THEN + GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL, + diag_context = PG_EXCEPTION_CONTEXT; + RAISE WARNING '%\n%', diag_detail, diag_context; + RETURN FALSE; +END; +$func$ LANGUAGE PLPGSQL; + +CREATE OR REPLACE FUNCTION authority.indexing_delete (auth authority.record_entry, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$ +DECLARE + tmp_bool BOOL; + diag_detail TEXT; + diag_context TEXT; +BEGIN + DELETE FROM authority.bib_linking WHERE authority = NEW.id; -- Avoid updating fields in bibs that are no longer visible + DELETE FROM authority.full_rec WHERE record = NEW.id; -- Avoid validating fields against deleted authority records + DELETE FROM authority.simple_heading WHERE record = NEW.id; + -- Should remove matching $0 from controlled fields at the same time? + + -- XXX What do we about the actual linking subfields present in + -- authority records that target this one when this happens? + DELETE FROM authority.authority_linking WHERE source = NEW.id OR target = NEW.id; + + RETURN TRUE; +EXCEPTION WHEN OTHERS THEN + GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL, + diag_context = PG_EXCEPTION_CONTEXT; + RAISE WARNING '%\n%', diag_detail, diag_context; + RETURN FALSE; +END; +$func$ LANGUAGE PLPGSQL; + +CREATE OR REPLACE FUNCTION authority.indexing_update (auth authority.record_entry, insert_only BOOL DEFAULT FALSE, old_heading TEXT DEFAULT NULL) RETURNS BOOL AS $func$ +DECLARE + ashs authority.simple_heading%ROWTYPE; + mbe_row metabib.browse_entry%ROWTYPE; + mbe_id BIGINT; + ash_id BIGINT; + diag_detail TEXT; + diag_context TEXT; +BEGIN + + -- Unless there's a setting stopping us, propagate these updates to any linked bib records when the heading changes + PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_auto_update' AND enabled; + + IF NOT FOUND AND auth.heading <> old_heading THEN + PERFORM authority.propagate_changes(auth.id); + END IF; + + IF NOT insert_only THEN + DELETE FROM authority.authority_linking WHERE source = auth.id; + DELETE FROM authority.simple_heading WHERE record = auth.id; + END IF; + + INSERT INTO authority.authority_linking (source, target, field) + SELECT source, target, field FROM authority.calculate_authority_linking( + auth.id, auth.control_set, auth.marc::XML + ); + + FOR ashs IN SELECT * FROM authority.simple_heading_set(auth.marc) LOOP + + INSERT INTO authority.simple_heading (record,atag,value,sort_value,thesaurus) + VALUES (ashs.record, ashs.atag, ashs.value, ashs.sort_value, ashs.thesaurus); + ash_id := CURRVAL('authority.simple_heading_id_seq'::REGCLASS); + + SELECT INTO mbe_row * FROM metabib.browse_entry + WHERE value = ashs.value AND sort_value = ashs.sort_value; + + IF FOUND THEN + mbe_id := mbe_row.id; + ELSE + INSERT INTO metabib.browse_entry + ( value, sort_value ) VALUES + ( ashs.value, ashs.sort_value ); + + mbe_id := CURRVAL('metabib.browse_entry_id_seq'::REGCLASS); + END IF; + + INSERT INTO metabib.browse_entry_simple_heading_map (entry,simple_heading) VALUES (mbe_id,ash_id); + + END LOOP; + + -- Flatten and insert the afr data + PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_full_rec' AND enabled; + IF NOT FOUND THEN + PERFORM authority.reingest_authority_full_rec(auth.id); + PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_rec_descriptor' AND enabled; IF NOT FOUND THEN - PERFORM metabib.remap_metarecord_for_bib( NEW.id, NEW.fingerprint ); + PERFORM authority.reingest_authority_rec_descriptor(auth.id); END IF; END IF; - RETURN NEW; + RETURN TRUE; +EXCEPTION WHEN OTHERS THEN + GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL, + diag_context = PG_EXCEPTION_CONTEXT; + RAISE WARNING '%\n%', diag_detail, diag_context; + RETURN FALSE; END; $func$ LANGUAGE PLPGSQL; diff --git a/Open-ILS/src/sql/Pg/090.schema.action.sql b/Open-ILS/src/sql/Pg/090.schema.action.sql index 5a86cd3353..1c5572a0e5 100644 --- a/Open-ILS/src/sql/Pg/090.schema.action.sql +++ b/Open-ILS/src/sql/Pg/090.schema.action.sql @@ -1797,5 +1797,394 @@ CREATE TABLE action.batch_hold_event_map ( hold INT NOT NULL REFERENCES action.hold_request (id) ON UPDATE CASCADE ON DELETE CASCADE ); +CREATE TABLE action.ingest_queue ( + id SERIAL PRIMARY KEY, + created TIMESTAMPTZ NOT NULL DEFAULT NOW(), + run_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + who INT REFERENCES actor.usr (id) ON UPDATE CASCADE ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED, + start_time TIMESTAMPTZ, + end_time TIMESTAMPTZ, + threads INT, + why TEXT +); + +CREATE TABLE action.ingest_queue_entry ( + id BIGSERIAL PRIMARY KEY, + record BIGINT NOT NULL, -- points to a record id of the appropriate record_type + record_type TEXT NOT NULL, + action TEXT NOT NULL, + run_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + state_data TEXT NOT NULL DEFAULT '', + queue INT REFERENCES action.ingest_queue (id) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED, + override_by BIGINT REFERENCES action.ingest_queue_entry (id) ON UPDATE CASCADE ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED, + ingest_time TIMESTAMPTZ, + fail_time TIMESTAMPTZ +); +CREATE UNIQUE INDEX record_pending_once ON action.ingest_queue_entry (record_type,record,state_data) WHERE ingest_time IS NULL AND override_by IS NULL; +CREATE INDEX entry_override_by_idx ON action.ingest_queue_entry (override_by) WHERE override_by IS NOT NULL; + +CREATE OR REPLACE FUNCTION action.enqueue_ingest_entry ( + record_id BIGINT, + rtype TEXT DEFAULT 'biblio', + when_to_run TIMESTAMPTZ DEFAULT NOW(), + queue_id INT DEFAULT NULL, + ingest_action TEXT DEFAULT 'update', -- will be the most common? + old_state_data TEXT DEFAULT '' +) RETURNS BOOL AS $F$ +DECLARE + new_entry action.ingest_queue_entry%ROWTYPE; + prev_del_entry action.ingest_queue_entry%ROWTYPE; + diag_detail TEXT; + diag_context TEXT; +BEGIN + + IF ingest_action = 'delete' THEN + -- first see if there is an outstanding entry + SELECT * INTO prev_del_entry + FROM action.ingest_queue_entry + WHERE qe.record = record_id + AND qe.state_date = old_state_data + AND qe.record_type = rtype + AND qe.ingest_time IS NULL + AND qe.override_by IS NULL; + END IF; + + WITH existing_queue_entry_cte AS ( + SELECT queue_id AS queue, + rtype AS record_type, + record_id AS record, + qe.id AS override_by, + ingest_action AS action, + q.run_at AS run_at, + old_state_data AS state_data + FROM action.ingest_queue_entry qe + JOIN action.ingest_queue q ON (qe.queue = q.id) + WHERE qe.record = record_id + AND q.end_time IS NULL + AND qe.record_type = rtype + AND qe.state_data = old_state_data + AND qe.ingest_time IS NULL + AND qe.fail_time IS NULL + AND qe.override_by IS NULL + ), existing_nonqueue_entry_cte AS ( + SELECT queue_id AS queue, + rtype AS record_type, + record_id AS record, + qe.id AS override_by, + ingest_action AS action, + qe.run_at AS run_at, + old_state_data AS state_data + FROM action.ingest_queue_entry qe + WHERE qe.record = record_id + AND qe.queue IS NULL + AND qe.record_type = rtype + AND qe.state_data = old_state_data + AND qe.ingest_time IS NULL + AND qe.fail_time IS NULL + AND qe.override_by IS NULL + ), new_entry_cte AS ( + SELECT * FROM existing_queue_entry_cte + UNION ALL + SELECT * FROM existing_nonqueue_entry_cte + UNION ALL + SELECT queue_id, rtype, record_id, NULL, ingest_action, COALESCE(when_to_run,NOW()), old_state_data + ), insert_entry_cte AS ( + INSERT INTO action.ingest_queue_entry + (queue, record_type, record, override_by, action, run_at, state_data) + SELECT queue, record_type, record, override_by, action, run_at, state_data FROM new_entry_cte + ORDER BY 4 NULLS LAST, 6 + LIMIT 1 + RETURNING * + ) SELECT * INTO new_entry FROM insert_entry_cte; + + IF prev_del_entry.id IS NOT NULL THEN -- later delete overrides earlier unapplied entry + UPDATE action.ingest_queue_entry + SET override_by = new_entry.id + WHERE id = prev_del_entry.id; + + UPDATE action.ingest_queue_entry + SET override_by = NULL + WHERE id = new_entry.id; + + ELSIF new_entry.override_by IS NOT NULL THEN + RETURN TRUE; -- already handled, don't notify + END IF; + + NOTIFY queued_ingest; + + RETURN TRUE; +EXCEPTION WHEN OTHERS THEN + GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL, + diag_context = PG_EXCEPTION_CONTEXT; + RAISE WARNING '%\n%', diag_detail, diag_context; + RETURN FALSE; +END; +$F$ LANGUAGE PLPGSQL; + +CREATE OR REPLACE FUNCTION action.process_ingest_queue_entry (qeid BIGINT) RETURNS BOOL AS $func$ +DECLARE + ingest_success BOOL := NULL; + qe action.ingest_queue_entry%ROWTYPE; +BEGIN + + SELECT * INTO qe FROM action.ingest_queue_entry WHERE id = qeid; + IF qe.ingest_time IS NOT NULL OR qe.override_by IS NOT NULL THEN + RETURN TRUE; -- Already done + END IF; + + IF qe.action = 'delete' THEN + IF qe.record_type = 'biblio' THEN + SELECT metabib.indexing_delete(r.*, qe.state_data) INTO ingest_success FROM biblio.record_entry r WHERE r.id = qe.record; + ELSIF qe.record_type = 'authority' THEN + SELECT authority.indexing_delete(r.*, qe.state_data) INTO ingest_success FROM authority.record_entry r WHERE r.id = qe.record; + END IF; + ELSE + IF qe.record_type = 'biblio' THEN + IF qe.action = 'propagate' THEN + SELECT authority.apply_propagate_changes(qe.state_data::BIGINT, qe.record) INTO ingest_success; + ELSE + SELECT metabib.indexing_update(r.*, qe.action = 'insert', qe.state_data) INTO ingest_success FROM biblio.record_entry r WHERE r.id = qe.record; + END IF; + ELSIF qe.record_type = 'authority' THEN + SELECT authority.indexing_update(r.*, qe.action = 'insert', qe.state_data) INTO ingest_success FROM authority.record_entry r WHERE r.id = qe.record; + END IF; + END IF; + + IF NOT ingest_success THEN + UPDATE action.ingest_queue_entry SET fail_time = NOW() WHERE id = qe.id; + PERFORM * FROM config.internal_flag WHERE name = 'ingest.queued.abort_on_error' AND enabled; + IF FOUND THEN + RAISE EXCEPTION 'Ingest action of % on %.record_entry % for queue entry % failed', qe.action, qe.record_type, qe.record, qe.id; + ELSE + RAISE WARNING 'Ingest action of % on %.record_entry % for queue entry % failed', qe.action, qe.record_type, qe.record, qe.id; + END IF; + ELSE + UPDATE action.ingest_queue_entry SET ingest_time = NOW() WHERE id = qe.id; + END IF; + + RETURN ingest_success; +END; +$func$ LANGUAGE PLPGSQL; + + +CREATE OR REPLACE FUNCTION action.complete_duplicated_entries () RETURNS TRIGGER AS $F$ +BEGIN + IF NEW.ingest_time IS NOT NULL THEN + UPDATE action.ingest_queue_entry SET ingest_time = NEW.ingest_time WHERE override_by = NEW.id; + END IF; + + RETURN NULL; +END; +$F$ LANGUAGE PLPGSQL; + +CREATE TRIGGER complete_duplicated_entries_trigger + AFTER UPDATE ON action.ingest_queue_entry + FOR EACH ROW WHEN (NEW.override_by IS NULL) + EXECUTE PROCEDURE action.complete_duplicated_entries(); + +CREATE OR REPLACE FUNCTION action.set_ingest_queue(INT) RETURNS VOID AS $$ + $_SHARED{"ingest_queue_id"} = $_[0]; +$$ LANGUAGE plperlu; + +CREATE OR REPLACE FUNCTION action.get_ingest_queue() RETURNS INT AS $$ + return $_SHARED{"ingest_queue_id"}; +$$ LANGUAGE plperlu; + +CREATE OR REPLACE FUNCTION action.clear_ingest_queue() RETURNS VOID AS $$ + delete($_SHARED{"ingest_queue_id"}); +$$ LANGUAGE plperlu; + +CREATE OR REPLACE FUNCTION action.set_queued_ingest_force(TEXT) RETURNS VOID AS $$ + $_SHARED{"ingest_queue_force"} = $_[0]; +$$ LANGUAGE plperlu; + +CREATE OR REPLACE FUNCTION action.get_queued_ingest_force() RETURNS TEXT AS $$ + return $_SHARED{"ingest_queue_force"}; +$$ LANGUAGE plperlu; + +CREATE OR REPLACE FUNCTION action.clear_queued_ingest_force() RETURNS VOID AS $$ + delete($_SHARED{"ingest_queue_force"}); +$$ LANGUAGE plperlu; + +CREATE OR REPLACE FUNCTION authority.propagate_changes + (aid BIGINT, bid BIGINT) RETURNS BIGINT AS $func$ +DECLARE + queuing_success BOOL := FALSE; +BEGIN + + PERFORM 1 FROM config.global_flag + WHERE name IN ('ingest.queued.all','ingest.queued.authority.propagate') + AND enabled; + + IF FOUND THEN + -- XXX enqueue special 'propagate' bib action + SELECT action.enqueue_ingest_entry( bid, 'biblio', NOW(), 'propagate', aid::TEXT) INTO queuing_success; + + IF queuing_success THEN + RETURN aid; + END IF; + END IF; + + PERFORM authority.apply_propagate_changes(aid, bid); + RETURN aid; +END; +$func$ LANGUAGE PLPGSQL; + +CREATE OR REPLACE FUNCTION authority.apply_propagate_changes + (aid BIGINT, bid BIGINT) RETURNS BIGINT AS $func$ +DECLARE + bib_forced BOOL := FALSE; + bib_rec biblio.record_entry%ROWTYPE; + new_marc TEXT; +BEGIN + + SELECT INTO bib_rec * FROM biblio.record_entry WHERE id = bid; + + new_marc := vandelay.merge_record_xml( + bib_rec.marc, authority.generate_overlay_template(aid)); + + IF new_marc = bib_rec.marc THEN + -- Authority record change had no impact on this bib record. + -- Nothing left to do. + RETURN aid; + END IF; + + PERFORM 1 FROM config.global_flag + WHERE name = 'ingest.disable_authority_auto_update_bib_meta' + AND enabled; + + IF NOT FOUND THEN + -- update the bib record editor and edit_date + bib_rec.editor := ( + SELECT editor FROM authority.record_entry WHERE id = aid); + bib_rec.edit_date = NOW(); + END IF; + + PERFORM action.set_queued_ingest_force('ingest.queued.biblio.update.disabled'); + + UPDATE biblio.record_entry SET + marc = new_marc, + editor = bib_rec.editor, + edit_date = bib_rec.edit_date + WHERE id = bid; + + PERFORM action.clear_queued_ingest_force(); + + RETURN aid; + +END; +$func$ LANGUAGE PLPGSQL; + +CREATE OR REPLACE FUNCTION evergreen.indexing_ingest_or_delete () RETURNS TRIGGER AS $func$ +DECLARE + old_state_data TEXT := ''; + new_action TEXT; + queuing_force TEXT; + queuing_flag_name TEXT; + queuing_flag BOOL := FALSE; + queuing_success BOOL := FALSE; + ingest_success BOOL := FALSE; + ingest_queue INT; +BEGIN + + -- Identify the ingest action type + IF TG_OP = 'UPDATE' THEN + + -- Gather type-specific data for later use + IF TG_TABLE_SCHEMA = 'authority' THEN + old_state_data = OLD.heading; + END IF; + + IF NOT OLD.deleted THEN -- maybe reingest? + IF NEW.deleted THEN + new_action = 'delete'; -- nope, delete + ELSE + new_action = 'update'; -- yes, update + END IF; + ELSIF NOT NEW.deleted THEN + new_action = 'insert'; -- revivify, AKA insert + ELSE + RETURN NEW; -- was and is still deleted, don't ingest + END IF; + ELSIF TG_OP = 'INSERT' THEN + new_action = 'insert'; -- brand new + ELSE + RETURN OLD; -- really deleting the record + END IF; + + queuing_flag_name := 'ingest.queued.'||TG_TABLE_SCHEMA||'.'||new_action; + -- See if we should be queuing anything + SELECT enabled INTO queuing_flag + FROM config.internal_flag + WHERE name IN ('ingest.queued.all','ingest.queued.'||TG_TABLE_SCHEMA||'.all', queuing_flag_name) + AND enabled + LIMIT 1; + + SELECT action.get_queued_ingest_force() INTO queuing_force; + IF queuing_flag IS NULL AND queuing_force = queuing_flag_name THEN + queuing_flag := TRUE; + END IF; + + -- you (or part of authority propagation) can forcibly disable specific queuing actions + IF queuing_force = queuing_flag_name||'.disabled' THEN + queuing_flag := FALSE; + END IF; + + -- And if we should be queuing ... + IF queuing_flag THEN + ingest_queue := action.get_ingest_queue(); + + -- ... but this is NOT a named or forced queue request (marc editor update, say, or vandelay overlay)... + IF queuing_force IS NULL AND ingest_queue IS NULL AND new_action = 'update' THEN -- re-ingest? + + PERFORM * FROM config.internal_flag WHERE name = 'ingest.reingest.force_on_same_marc' AND enabled; + + -- ... then don't do anything if ingest.reingest.force_on_same_marc is not enabled and the MARC hasn't changed + IF NOT FOUND AND OLD.marc = NEW.marc THEN + RETURN NEW; + END IF; + END IF; + + -- Otherwise, attempt to enqueue + SELECT action.enqueue_ingest_entry( NEW.id, TG_TABLE_SCHEMA, NOW(), ingest_queue, new_action, old_state_data) INTO queuing_success; + END IF; + + -- If queuing was not requested, or failed for some reason, do it live. + IF NOT queuing_success THEN + IF queuing_flag THEN + RAISE WARNING 'Enqueuing of %.record_entry % for ingest failed, attempting direct ingest', TG_TABLE_SCHEMA, NEW.id; + END IF; + + IF new_action = 'delete' THEN + IF TG_TABLE_SCHEMA = 'biblio' THEN + SELECT metabib.indexing_delete(NEW.*, old_state_data) INTO ingest_success; + ELSIF TG_TABLE_SCHEMA = 'authority' THEN + SELECT authority.indexing_delete(NEW.*, old_state_data) INTO ingest_success; + END IF; + ELSE + IF TG_TABLE_SCHEMA = 'biblio' THEN + SELECT metabib.indexing_update(NEW.*, new_action = 'insert', old_state_data) INTO ingest_success; + ELSIF TG_TABLE_SCHEMA = 'authority' THEN + SELECT authority.indexing_update(NEW.*, new_action = 'insert', old_state_data) INTO ingest_success; + END IF; + END IF; + + IF NOT ingest_success THEN + PERFORM * FROM config.internal_flag WHERE name = 'ingest.queued.abort_on_error' AND enabled; + IF FOUND THEN + RAISE EXCEPTION 'Ingest of %.record_entry % failed', TG_TABLE_SCHEMA, NEW.id; + ELSE + RAISE WARNING 'Ingest of %.record_entry % failed', TG_TABLE_SCHEMA, NEW.id; + END IF; + END IF; + END IF; + + RETURN NEW; +END; +$func$ LANGUAGE PLPGSQL; + +CREATE TRIGGER aaa_indexing_ingest_or_delete AFTER INSERT OR UPDATE ON biblio.record_entry FOR EACH ROW EXECUTE PROCEDURE evergreen.indexing_ingest_or_delete (); +CREATE TRIGGER aaa_auth_ingest_or_delete AFTER INSERT OR UPDATE ON authority.record_entry FOR EACH ROW EXECUTE PROCEDURE evergreen.indexing_ingest_or_delete (); + COMMIT; diff --git a/Open-ILS/src/sql/Pg/950.data.seed-values.sql b/Open-ILS/src/sql/Pg/950.data.seed-values.sql index 20ba28ca48..1e0c6f65ba 100644 --- a/Open-ILS/src/sql/Pg/950.data.seed-values.sql +++ b/Open-ILS/src/sql/Pg/950.data.seed-values.sql @@ -23170,3 +23170,105 @@ VALUES ( ) ); +INSERT INTO config.global_flag (name, enabled, label) VALUES ( + 'ingest.queued.max_threads', TRUE, + oils_i18n_gettext( + 'ingest.queued.max_threads', + 'Queued Ingest: Maximum number of database workers allowed for queued ingest processes', + 'cgf', + 'label' + )),( + 'ingest.queued.abort_on_error', FALSE, + oils_i18n_gettext( + 'ingest.queued.abort_on_error', + 'Queued Ingest: Abort transaction on ingest error rather than simply logging an error', + 'cgf', + 'label' + )),( + 'ingest.queued.authority.propagate', FALSE, + oils_i18n_gettext( + 'ingest.queued.authority.propagate', + 'Queued Ingest: Queue all bib record updates on authority change propagation, even if bib queuing is not generally enabled', + 'cgf', + 'label' + )),( + 'ingest.queued.all', FALSE, + oils_i18n_gettext( + 'ingest.queued.all', + 'Queued Ingest: Use Queued Ingest for all bib and authority record ingest', + 'cgf', + 'label' + )),( + 'ingest.queued.biblio.all', FALSE, + oils_i18n_gettext( + 'ingest.queued.biblio.all', + 'Queued Ingest: Use Queued Ingest for all bib record ingest', + 'cgf', + 'label' + )),( + 'ingest.queued.authority.all', FALSE, + oils_i18n_gettext( + 'ingest.queued.authority.all', + 'Queued Ingest: Use Queued Ingest for all authority record ingest', + 'cgf', + 'label' + )),( + 'ingest.queued.biblio.insert.marc_edit_inline', TRUE, + oils_i18n_gettext( + 'ingest.queued.biblio.insert.marc_edit_inline', + 'Queued Ingest: Do NOT use Queued Ingest when creating a new bib, or undeleting a bib, via the MARC editor', + 'cgf', + 'label' + )),( + 'ingest.queued.biblio.insert', FALSE, + oils_i18n_gettext( + 'ingest.queued.biblio.insert', + 'Queued Ingest: Use Queued Ingest for bib record ingest on insert and undelete', + 'cgf', + 'label' + )),( + 'ingest.queued.authority.insert', FALSE, + oils_i18n_gettext( + 'ingest.queued.authority.insert', + 'Queued Ingest: Use Queued Ingest for authority record ingest on insert and undelete', + 'cgf', + 'label' + )),( + 'ingest.queued.biblio.update.marc_edit_inline', TRUE, + oils_i18n_gettext( + 'ingest.queued.biblio.update.marc_edit_inline', + 'Queued Ingest: Do NOT Use Queued Ingest when editing bib records via the MARC Editor', + 'cgf', + 'label' + )),( + 'ingest.queued.biblio.update', FALSE, + oils_i18n_gettext( + 'ingest.queued.biblio.update', + 'Queued Ingest: Use Queued Ingest for bib record ingest on update', + 'cgf', + 'label' + )),( + 'ingest.queued.authority.update', FALSE, + oils_i18n_gettext( + 'ingest.queued.authority.update', + 'Queued Ingest: Use Queued Ingest for authority record ingest on update', + 'cgf', + 'label' + )),( + 'ingest.queued.biblio.delete', FALSE, + oils_i18n_gettext( + 'ingest.queued.biblio.delete', + 'Queued Ingest: Use Queued Ingest for bib record ingest on delete', + 'cgf', + 'label' + )),( + 'ingest.queued.authority.delete', FALSE, + oils_i18n_gettext( + 'ingest.queued.authority.delete', + 'Queued Ingest: Use Queued Ingest for authority record ingest on delete', + 'cgf', + 'label' + ) +); + +UPDATE config.global_flag SET value = '20' WHERE name = 'ingest.queued.max_threads'; diff --git a/Open-ILS/src/sql/Pg/999.functions.global.sql b/Open-ILS/src/sql/Pg/999.functions.global.sql index 9060299219..d115364c54 100644 --- a/Open-ILS/src/sql/Pg/999.functions.global.sql +++ b/Open-ILS/src/sql/Pg/999.functions.global.sql @@ -1568,11 +1568,9 @@ $func$ LANGUAGE PLPGSQL; -- Ingest triggers CREATE TRIGGER fingerprint_tgr BEFORE INSERT OR UPDATE ON biblio.record_entry FOR EACH ROW EXECUTE PROCEDURE biblio.fingerprint_trigger ('eng','BKS'); -CREATE TRIGGER aaa_indexing_ingest_or_delete AFTER INSERT OR UPDATE ON biblio.record_entry FOR EACH ROW EXECUTE PROCEDURE biblio.indexing_ingest_or_delete (); CREATE TRIGGER bbb_simple_rec_trigger AFTER INSERT OR UPDATE OR DELETE ON biblio.record_entry FOR EACH ROW EXECUTE PROCEDURE reporter.simple_rec_trigger (); CREATE TRIGGER map_thesaurus_to_control_set BEFORE INSERT OR UPDATE ON authority.record_entry FOR EACH ROW EXECUTE PROCEDURE authority.map_thesaurus_to_control_set (); -CREATE TRIGGER aaa_auth_ingest_or_delete AFTER INSERT OR UPDATE ON authority.record_entry FOR EACH ROW EXECUTE PROCEDURE authority.indexing_ingest_or_delete (); -- Utility routines, callable via cstore diff --git a/Open-ILS/src/sql/Pg/upgrade/XXXX.schema.queued_ingest.sql b/Open-ILS/src/sql/Pg/upgrade/XXXX.schema.queued_ingest.sql new file mode 100644 index 0000000000..13abd49d35 --- /dev/null +++ b/Open-ILS/src/sql/Pg/upgrade/XXXX.schema.queued_ingest.sql @@ -0,0 +1,1084 @@ +BEGIN; + +INSERT INTO config.global_flag (name, enabled, label) VALUES ( + 'ingest.queued.max_threads', TRUE, + oils_i18n_gettext( + 'ingest.queued.max_threads', + 'Queued Ingest: Maximum number of database workers allowed for queued ingest processes', + 'cgf', + 'label' + )),( + 'ingest.queued.abort_on_error', FALSE, + oils_i18n_gettext( + 'ingest.queued.abort_on_error', + 'Queued Ingest: Abort transaction on ingest error rather than simply logging an error', + 'cgf', + 'label' + )),( + 'ingest.queued.authority.propagate', FALSE, + oils_i18n_gettext( + 'ingest.queued.authority.propagate', + 'Queued Ingest: Queue all bib record updates on authority change propagation, even if bib queuing is not generally enabled', + 'cgf', + 'label' + )),( + 'ingest.queued.all', FALSE, + oils_i18n_gettext( + 'ingest.queued.all', + 'Queued Ingest: Use Queued Ingest for all bib and authority record ingest', + 'cgf', + 'label' + )),( + 'ingest.queued.biblio.all', FALSE, + oils_i18n_gettext( + 'ingest.queued.biblio.all', + 'Queued Ingest: Use Queued Ingest for all bib record ingest', + 'cgf', + 'label' + )),( + 'ingest.queued.authority.all', FALSE, + oils_i18n_gettext( + 'ingest.queued.authority.all', + 'Queued Ingest: Use Queued Ingest for all authority record ingest', + 'cgf', + 'label' + )),( + 'ingest.queued.biblio.insert.marc_edit_inline', TRUE, + oils_i18n_gettext( + 'ingest.queued.biblio.insert.marc_edit_inline', + 'Queued Ingest: Do NOT use Queued Ingest when creating a new bib, or undeleting a bib, via the MARC editor', + 'cgf', + 'label' + )),( + 'ingest.queued.biblio.insert', FALSE, + oils_i18n_gettext( + 'ingest.queued.biblio.insert', + 'Queued Ingest: Use Queued Ingest for bib record ingest on insert and undelete', + 'cgf', + 'label' + )),( + 'ingest.queued.authority.insert', FALSE, + oils_i18n_gettext( + 'ingest.queued.authority.insert', + 'Queued Ingest: Use Queued Ingest for authority record ingest on insert and undelete', + 'cgf', + 'label' + )),( + 'ingest.queued.biblio.update.marc_edit_inline', TRUE, + oils_i18n_gettext( + 'ingest.queued.biblio.update.marc_edit_inline', + 'Queued Ingest: Do NOT Use Queued Ingest when editing bib records via the MARC Editor', + 'cgf', + 'label' + )),( + 'ingest.queued.biblio.update', FALSE, + oils_i18n_gettext( + 'ingest.queued.biblio.update', + 'Queued Ingest: Use Queued Ingest for bib record ingest on update', + 'cgf', + 'label' + )),( + 'ingest.queued.authority.update', FALSE, + oils_i18n_gettext( + 'ingest.queued.authority.update', + 'Queued Ingest: Use Queued Ingest for authority record ingest on update', + 'cgf', + 'label' + )),( + 'ingest.queued.biblio.delete', FALSE, + oils_i18n_gettext( + 'ingest.queued.biblio.delete', + 'Queued Ingest: Use Queued Ingest for bib record ingest on delete', + 'cgf', + 'label' + )),( + 'ingest.queued.authority.delete', FALSE, + oils_i18n_gettext( + 'ingest.queued.authority.delete', + 'Queued Ingest: Use Queued Ingest for authority record ingest on delete', + 'cgf', + 'label' + ) +); + +UPDATE config.global_flag SET value = '20' WHERE name = 'ingest.queued.max_threads'; + +CREATE TABLE action.ingest_queue ( + id SERIAL PRIMARY KEY, + created TIMESTAMPTZ NOT NULL DEFAULT NOW(), + run_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + who INT REFERENCES actor.usr (id) ON UPDATE CASCADE ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED, + start_time TIMESTAMPTZ, + end_time TIMESTAMPTZ, + threads INT, + why TEXT +); + +CREATE TABLE action.ingest_queue_entry ( + id BIGSERIAL PRIMARY KEY, + record BIGINT NOT NULL, -- points to a record id of the appropriate record_type + record_type TEXT NOT NULL, + action TEXT NOT NULL, + run_at TIMESTAMPTZ NOT NULL DEFAULT NOW(), + state_data TEXT NOT NULL DEFAULT '', + queue INT REFERENCES action.ingest_queue (id) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED, + override_by BIGINT REFERENCES action.ingest_queue_entry (id) ON UPDATE CASCADE ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED, + ingest_time TIMESTAMPTZ, + fail_time TIMESTAMPTZ +); +CREATE UNIQUE INDEX record_pending_once ON action.ingest_queue_entry (record_type,record,state_data) WHERE ingest_time IS NULL AND override_by IS NULL; +CREATE INDEX entry_override_by_idx ON action.ingest_queue_entry (override_by) WHERE override_by IS NOT NULL; + +CREATE OR REPLACE FUNCTION action.enqueue_ingest_entry ( + record_id BIGINT, + rtype TEXT DEFAULT 'biblio', + when_to_run TIMESTAMPTZ DEFAULT NOW(), + queue_id INT DEFAULT NULL, + ingest_action TEXT DEFAULT 'update', -- will be the most common? + old_state_data TEXT DEFAULT '' +) RETURNS BOOL AS $F$ +DECLARE + new_entry action.ingest_queue_entry%ROWTYPE; + prev_del_entry action.ingest_queue_entry%ROWTYPE; + diag_detail TEXT; + diag_context TEXT; +BEGIN + + IF ingest_action = 'delete' THEN + -- first see if there is an outstanding entry + SELECT * INTO prev_del_entry + FROM action.ingest_queue_entry + WHERE qe.record = record_id + AND qe.state_date = old_state_data + AND qe.record_type = rtype + AND qe.ingest_time IS NULL + AND qe.override_by IS NULL; + END IF; + + WITH existing_queue_entry_cte AS ( + SELECT queue_id AS queue, + rtype AS record_type, + record_id AS record, + qe.id AS override_by, + ingest_action AS action, + q.run_at AS run_at, + old_state_data AS state_data + FROM action.ingest_queue_entry qe + JOIN action.ingest_queue q ON (qe.queue = q.id) + WHERE qe.record = record_id + AND q.end_time IS NULL + AND qe.record_type = rtype + AND qe.state_data = old_state_data + AND qe.ingest_time IS NULL + AND qe.fail_time IS NULL + AND qe.override_by IS NULL + ), existing_nonqueue_entry_cte AS ( + SELECT queue_id AS queue, + rtype AS record_type, + record_id AS record, + qe.id AS override_by, + ingest_action AS action, + qe.run_at AS run_at, + old_state_data AS state_data + FROM action.ingest_queue_entry qe + WHERE qe.record = record_id + AND qe.queue IS NULL + AND qe.record_type = rtype + AND qe.state_data = old_state_data + AND qe.ingest_time IS NULL + AND qe.fail_time IS NULL + AND qe.override_by IS NULL + ), new_entry_cte AS ( + SELECT * FROM existing_queue_entry_cte + UNION ALL + SELECT * FROM existing_nonqueue_entry_cte + UNION ALL + SELECT queue_id, rtype, record_id, NULL, ingest_action, COALESCE(when_to_run,NOW()), old_state_data + ), insert_entry_cte AS ( + INSERT INTO action.ingest_queue_entry + (queue, record_type, record, override_by, action, run_at, state_data) + SELECT queue, record_type, record, override_by, action, run_at, state_data FROM new_entry_cte + ORDER BY 4 NULLS LAST, 6 + LIMIT 1 + RETURNING * + ) SELECT * INTO new_entry FROM insert_entry_cte; + + IF prev_del_entry.id IS NOT NULL THEN -- later delete overrides earlier unapplied entry + UPDATE action.ingest_queue_entry + SET override_by = new_entry.id + WHERE id = prev_del_entry.id; + + UPDATE action.ingest_queue_entry + SET override_by = NULL + WHERE id = new_entry.id; + + ELSIF new_entry.override_by IS NOT NULL THEN + RETURN TRUE; -- already handled, don't notify + END IF; + + NOTIFY queued_ingest; + + RETURN TRUE; +EXCEPTION WHEN OTHERS THEN + GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL, + diag_context = PG_EXCEPTION_CONTEXT; + RAISE WARNING '%\n%', diag_detail, diag_context; + RETURN FALSE; +END; +$F$ LANGUAGE PLPGSQL; + +CREATE OR REPLACE FUNCTION action.process_ingest_queue_entry (qeid BIGINT) RETURNS BOOL AS $func$ +DECLARE + ingest_success BOOL := NULL; + qe action.ingest_queue_entry%ROWTYPE; +BEGIN + + SELECT * INTO qe FROM action.ingest_queue_entry WHERE id = qeid; + IF qe.ingest_time IS NOT NULL OR qe.override_by IS NOT NULL THEN + RETURN TRUE; -- Already done + END IF; + + IF qe.action = 'delete' THEN + IF qe.record_type = 'biblio' THEN + SELECT metabib.indexing_delete(r.*, qe.state_data) INTO ingest_success FROM biblio.record_entry r WHERE r.id = qe.record; + ELSIF qe.record_type = 'authority' THEN + SELECT authority.indexing_delete(r.*, qe.state_data) INTO ingest_success FROM authority.record_entry r WHERE r.id = qe.record; + END IF; + ELSE + IF qe.record_type = 'biblio' THEN + IF qe.action = 'propagate' THEN + SELECT authority.apply_propagate_changes(qe.state_data::BIGINT, qe.record) INTO ingest_success; + ELSE + SELECT metabib.indexing_update(r.*, qe.action = 'insert', qe.state_data) INTO ingest_success FROM biblio.record_entry r WHERE r.id = qe.record; + END IF; + ELSIF qe.record_type = 'authority' THEN + SELECT authority.indexing_update(r.*, qe.action = 'insert', qe.state_data) INTO ingest_success FROM authority.record_entry r WHERE r.id = qe.record; + END IF; + END IF; + + IF NOT ingest_success THEN + UPDATE action.ingest_queue_entry SET fail_time = NOW() WHERE id = qe.id; + PERFORM * FROM config.internal_flag WHERE name = 'ingest.queued.abort_on_error' AND enabled; + IF FOUND THEN + RAISE EXCEPTION 'Ingest action of % on %.record_entry % for queue entry % failed', qe.action, qe.record_type, qe.record, qe.id; + ELSE + RAISE WARNING 'Ingest action of % on %.record_entry % for queue entry % failed', qe.action, qe.record_type, qe.record, qe.id; + END IF; + ELSE + UPDATE action.ingest_queue_entry SET ingest_time = NOW() WHERE id = qe.id; + END IF; + + RETURN ingest_success; +END; +$func$ LANGUAGE PLPGSQL; + + +CREATE OR REPLACE FUNCTION action.complete_duplicated_entries () RETURNS TRIGGER AS $F$ +BEGIN + IF NEW.ingest_time IS NOT NULL THEN + UPDATE action.ingest_queue_entry SET ingest_time = NEW.ingest_time WHERE override_by = NEW.id; + END IF; + + RETURN NULL; +END; +$F$ LANGUAGE PLPGSQL; + +CREATE TRIGGER complete_duplicated_entries_trigger + AFTER UPDATE ON action.ingest_queue_entry + FOR EACH ROW WHEN (NEW.override_by IS NULL) + EXECUTE PROCEDURE action.complete_duplicated_entries(); + +CREATE OR REPLACE FUNCTION action.set_ingest_queue(INT) RETURNS VOID AS $$ + $_SHARED{"ingest_queue_id"} = $_[0]; +$$ LANGUAGE plperlu; + +CREATE OR REPLACE FUNCTION action.get_ingest_queue() RETURNS INT AS $$ + return $_SHARED{"ingest_queue_id"}; +$$ LANGUAGE plperlu; + +CREATE OR REPLACE FUNCTION action.clear_ingest_queue() RETURNS VOID AS $$ + delete($_SHARED{"ingest_queue_id"}); +$$ LANGUAGE plperlu; + +CREATE OR REPLACE FUNCTION action.set_queued_ingest_force(TEXT) RETURNS VOID AS $$ + $_SHARED{"ingest_queue_force"} = $_[0]; +$$ LANGUAGE plperlu; + +CREATE OR REPLACE FUNCTION action.get_queued_ingest_force() RETURNS TEXT AS $$ + return $_SHARED{"ingest_queue_force"}; +$$ LANGUAGE plperlu; + +CREATE OR REPLACE FUNCTION action.clear_queued_ingest_force() RETURNS VOID AS $$ + delete($_SHARED{"ingest_queue_force"}); +$$ LANGUAGE plperlu; + +------------------ ingest functions ------------------ + +CREATE OR REPLACE FUNCTION metabib.indexing_delete (bib biblio.record_entry, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$ +DECLARE + tmp_bool BOOL; + diag_detail TEXT; + diag_context TEXT; +BEGIN + PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.preserve_on_delete' AND enabled; + tmp_bool := FOUND; + + PERFORM metabib.remap_metarecord_for_bib(bib.id, bib.fingerprint, TRUE, tmp_bool); + + IF NOT tmp_bool THEN + -- One needs to keep these around to support searches + -- with the #deleted modifier, so one should turn on the named + -- internal flag for that functionality. + DELETE FROM metabib.record_attr_vector_list WHERE source = bib.id; + END IF; + + DELETE FROM authority.bib_linking WHERE bib = bib.id; -- Avoid updating fields in bibs that are no longer visible + DELETE FROM biblio.peer_bib_copy_map WHERE peer_record = bib.id; -- Separate any multi-homed items + DELETE FROM metabib.browse_entry_def_map WHERE source = bib.id; -- Don't auto-suggest deleted bibs + + RETURN TRUE; +EXCEPTION WHEN OTHERS THEN + GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL, + diag_context = PG_EXCEPTION_CONTEXT; + RAISE WARNING '%\n%', diag_detail, diag_context; + RETURN FALSE; +END; +$func$ LANGUAGE PLPGSQL; + +CREATE OR REPLACE FUNCTION metabib.indexing_update (bib biblio.record_entry, insert_only BOOL DEFAULT FALSE, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$ +DECLARE + skip_facet BOOL := FALSE; + skip_display BOOL := FALSE; + skip_browse BOOL := FALSE; + skip_search BOOL := FALSE; + skip_auth BOOL := FALSE; + skip_full BOOL := FALSE; + skip_attrs BOOL := FALSE; + skip_luri BOOL := FALSE; + skip_mrmap BOOL := FALSE; + only_attrs TEXT[] := NULL; + only_fields INT[] := '{}'::INT[]; + diag_detail TEXT; + diag_context TEXT; +BEGIN + + -- Record authority linking + SELECT extra LIKE '%skip_authority%' INTO skip_auth; + PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_linking' AND enabled; + IF NOT FOUND AND NOT skip_auth THEN + PERFORM biblio.map_authority_linking( bib.id, bib.marc ); + END IF; + + -- Flatten and insert the mfr data + SELECT extra LIKE '%skip_full_rec%' INTO skip_full; + PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_metabib_full_rec' AND enabled; + IF NOT FOUND AND NOT skip_full THEN + PERFORM metabib.reingest_metabib_full_rec(bib.id); + END IF; + + -- Now we pull out attribute data, which is dependent on the mfr for all but XPath-based fields + SELECT extra LIKE '%skip_attrs%' INTO skip_attrs; + PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_metabib_rec_descriptor' AND enabled; + IF NOT FOUND AND NOT skip_attrs THEN + IF extra ~ 'attr\(\s*(\w[ ,\w]*?)\s*\)' THEN + SELECT REGEXP_SPLIT_TO_ARRAY( + (REGEXP_MATCHES(extra, 'attr\(\s*(\w[ ,\w]*?)\s*\)'))[1], + '\s*,\s*' + ) INTO only_attrs; + END IF; + + PERFORM metabib.reingest_record_attributes(bib.id, only_attrs, bib.marc, insert_only); + END IF; + + -- Gather and insert the field entry data + SELECT extra LIKE '%skip_facet%' INTO skip_facet; + SELECT extra LIKE '%skip_display%' INTO skip_display; + SELECT extra LIKE '%skip_browse%' INTO skip_browse; + SELECT extra LIKE '%skip_search%' INTO skip_search; + + IF extra ~ 'field_list\(\s*(\d[ ,\d]+)\s*\)' THEN + SELECT REGEXP_SPLIT_TO_ARRAY( + (REGEXP_MATCHES(extra, 'field_list\(\s*(\d[ ,\d]+)\s*\)'))[1], + '\s*,\s*' + )::INT[] INTO only_fields; + END IF; + + IF NOT skip_facet OR NOT skip_display OR NOT skip_browse OR NOT skip_search THEN + PERFORM metabib.reingest_metabib_field_entries(bib.id, skip_facet, skip_display, skip_browse, skip_search, only_fields); + END IF; + + -- Located URI magic + SELECT extra LIKE '%skip_luri%' INTO skip_luri; + PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_located_uri' AND enabled; + IF NOT FOUND AND NOT skip_luri THEN PERFORM biblio.extract_located_uris( bib.id, bib.marc, bib.editor ); END IF; + + -- (re)map metarecord-bib linking + SELECT extra LIKE '%skip_mrmap%' INTO skip_mrmap; + IF insert_only THEN -- if not deleted and performing an insert, check for the flag + PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.skip_on_insert' AND enabled; + IF NOT FOUND AND NOT skip_mrmap THEN + PERFORM metabib.remap_metarecord_for_bib( bib.id, bib.fingerprint ); + END IF; + ELSE -- we're doing an update, and we're not deleted, remap + PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.skip_on_update' AND enabled; + IF NOT FOUND AND NOT skip_mrmap THEN + PERFORM metabib.remap_metarecord_for_bib( bib.id, bib.fingerprint ); + END IF; + END IF; + + RETURN TRUE; +EXCEPTION WHEN OTHERS THEN + GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL, + diag_context = PG_EXCEPTION_CONTEXT; + RAISE WARNING '%\n%', diag_detail, diag_context; + RETURN FALSE; +END; +$func$ LANGUAGE PLPGSQL; + +CREATE OR REPLACE FUNCTION authority.indexing_delete (auth authority.record_entry, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$ +DECLARE + tmp_bool BOOL; + diag_detail TEXT; + diag_context TEXT; +BEGIN + DELETE FROM authority.bib_linking WHERE authority = NEW.id; -- Avoid updating fields in bibs that are no longer visible + DELETE FROM authority.full_rec WHERE record = NEW.id; -- Avoid validating fields against deleted authority records + DELETE FROM authority.simple_heading WHERE record = NEW.id; + -- Should remove matching $0 from controlled fields at the same time? + + -- XXX What do we about the actual linking subfields present in + -- authority records that target this one when this happens? + DELETE FROM authority.authority_linking WHERE source = NEW.id OR target = NEW.id; + + RETURN TRUE; +EXCEPTION WHEN OTHERS THEN + GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL, + diag_context = PG_EXCEPTION_CONTEXT; + RAISE WARNING '%\n%', diag_detail, diag_context; + RETURN FALSE; +END; +$func$ LANGUAGE PLPGSQL; + + +CREATE OR REPLACE FUNCTION authority.indexing_update (auth authority.record_entry, insert_only BOOL DEFAULT FALSE, old_heading TEXT DEFAULT NULL) RETURNS BOOL AS $func$ +DECLARE + ashs authority.simple_heading%ROWTYPE; + mbe_row metabib.browse_entry%ROWTYPE; + mbe_id BIGINT; + ash_id BIGINT; + diag_detail TEXT; + diag_context TEXT; +BEGIN + + -- Unless there's a setting stopping us, propagate these updates to any linked bib records when the heading changes + PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_auto_update' AND enabled; + + IF NOT FOUND AND auth.heading <> old_heading THEN + PERFORM authority.propagate_changes(auth.id); + END IF; + + IF NOT insert_only THEN + DELETE FROM authority.authority_linking WHERE source = auth.id; + DELETE FROM authority.simple_heading WHERE record = auth.id; + END IF; + + INSERT INTO authority.authority_linking (source, target, field) + SELECT source, target, field FROM authority.calculate_authority_linking( + auth.id, auth.control_set, auth.marc::XML + ); + + FOR ashs IN SELECT * FROM authority.simple_heading_set(auth.marc) LOOP + + INSERT INTO authority.simple_heading (record,atag,value,sort_value,thesaurus) + VALUES (ashs.record, ashs.atag, ashs.value, ashs.sort_value, ashs.thesaurus); + ash_id := CURRVAL('authority.simple_heading_id_seq'::REGCLASS); + + SELECT INTO mbe_row * FROM metabib.browse_entry + WHERE value = ashs.value AND sort_value = ashs.sort_value; + + IF FOUND THEN + mbe_id := mbe_row.id; + ELSE + INSERT INTO metabib.browse_entry + ( value, sort_value ) VALUES + ( ashs.value, ashs.sort_value ); + + mbe_id := CURRVAL('metabib.browse_entry_id_seq'::REGCLASS); + END IF; + + INSERT INTO metabib.browse_entry_simple_heading_map (entry,simple_heading) VALUES (mbe_id,ash_id); + + END LOOP; + + -- Flatten and insert the afr data + PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_full_rec' AND enabled; + IF NOT FOUND THEN + PERFORM authority.reingest_authority_full_rec(auth.id); + PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_rec_descriptor' AND enabled; + IF NOT FOUND THEN + PERFORM authority.reingest_authority_rec_descriptor(auth.id); + END IF; + END IF; + + RETURN TRUE; +EXCEPTION WHEN OTHERS THEN + GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL, + diag_context = PG_EXCEPTION_CONTEXT; + RAISE WARNING '%\n%', diag_detail, diag_context; + RETURN FALSE; +END; +$func$ LANGUAGE PLPGSQL; + +CREATE OR REPLACE FUNCTION evergreen.indexing_ingest_or_delete () RETURNS TRIGGER AS $func$ +DECLARE + old_state_data TEXT := ''; + new_action TEXT; + queuing_force TEXT; + queuing_flag_name TEXT; + queuing_flag BOOL := FALSE; + queuing_success BOOL := FALSE; + ingest_success BOOL := FALSE; + ingest_queue INT; +BEGIN + + -- Identify the ingest action type + IF TG_OP = 'UPDATE' THEN + + -- Gather type-specific data for later use + IF TG_TABLE_SCHEMA = 'authority' THEN + old_state_data = OLD.heading; + END IF; + + IF NOT OLD.deleted THEN -- maybe reingest? + IF NEW.deleted THEN + new_action = 'delete'; -- nope, delete + ELSE + new_action = 'update'; -- yes, update + END IF; + ELSIF NOT NEW.deleted THEN + new_action = 'insert'; -- revivify, AKA insert + ELSE + RETURN NEW; -- was and is still deleted, don't ingest + END IF; + ELSIF TG_OP = 'INSERT' THEN + new_action = 'insert'; -- brand new + ELSE + RETURN OLD; -- really deleting the record + END IF; + + queuing_flag_name := 'ingest.queued.'||TG_TABLE_SCHEMA||'.'||new_action; + -- See if we should be queuing anything + SELECT enabled INTO queuing_flag + FROM config.internal_flag + WHERE name IN ('ingest.queued.all','ingest.queued.'||TG_TABLE_SCHEMA||'.all', queuing_flag_name) + AND enabled + LIMIT 1; + + SELECT action.get_queued_ingest_force() INTO queuing_force; + IF queuing_flag IS NULL AND queuing_force = queuing_flag_name THEN + queuing_flag := TRUE; + END IF; + + -- you (or part of authority propagation) can forcibly disable specific queuing actions + IF queuing_force = queuing_flag_name||'.disabled' THEN + queuing_flag := FALSE; + END IF; + + -- And if we should be queuing ... + IF queuing_flag THEN + ingest_queue := action.get_ingest_queue(); + + -- ... but this is NOT a named or forced queue request (marc editor update, say, or vandelay overlay)... + IF queuing_force IS NULL AND ingest_queue IS NULL AND new_action = 'update' THEN -- re-ingest? + + PERFORM * FROM config.internal_flag WHERE name = 'ingest.reingest.force_on_same_marc' AND enabled; + + -- ... then don't do anything if ingest.reingest.force_on_same_marc is not enabled and the MARC hasn't changed + IF NOT FOUND AND OLD.marc = NEW.marc THEN + RETURN NEW; + END IF; + END IF; + + -- Otherwise, attempt to enqueue + SELECT action.enqueue_ingest_entry( NEW.id, TG_TABLE_SCHEMA, NOW(), ingest_queue, new_action, old_state_data) INTO queuing_success; + END IF; + + -- If queuing was not requested, or failed for some reason, do it live. + IF NOT queuing_success THEN + IF queuing_flag THEN + RAISE WARNING 'Enqueuing of %.record_entry % for ingest failed, attempting direct ingest', TG_TABLE_SCHEMA, NEW.id; + END IF; + + IF new_action = 'delete' THEN + IF TG_TABLE_SCHEMA = 'biblio' THEN + SELECT metabib.indexing_delete(NEW.*, old_state_data) INTO ingest_success; + ELSIF TG_TABLE_SCHEMA = 'authority' THEN + SELECT authority.indexing_delete(NEW.*, old_state_data) INTO ingest_success; + END IF; + ELSE + IF TG_TABLE_SCHEMA = 'biblio' THEN + SELECT metabib.indexing_update(NEW.*, new_action = 'insert', old_state_data) INTO ingest_success; + ELSIF TG_TABLE_SCHEMA = 'authority' THEN + SELECT authority.indexing_update(NEW.*, new_action = 'insert', old_state_data) INTO ingest_success; + END IF; + END IF; + + IF NOT ingest_success THEN + PERFORM * FROM config.internal_flag WHERE name = 'ingest.queued.abort_on_error' AND enabled; + IF FOUND THEN + RAISE EXCEPTION 'Ingest of %.record_entry % failed', TG_TABLE_SCHEMA, NEW.id; + ELSE + RAISE WARNING 'Ingest of %.record_entry % failed', TG_TABLE_SCHEMA, NEW.id; + END IF; + END IF; + END IF; + + RETURN NEW; +END; +$func$ LANGUAGE PLPGSQL; + +DROP TRIGGER aaa_indexing_ingest_or_delete ON biblio.record_entry; +DROP TRIGGER aaa_auth_ingest_or_delete ON authority.record_entry; + +CREATE TRIGGER aaa_indexing_ingest_or_delete AFTER INSERT OR UPDATE ON biblio.record_entry FOR EACH ROW EXECUTE PROCEDURE evergreen.indexing_ingest_or_delete (); +CREATE TRIGGER aaa_auth_ingest_or_delete AFTER INSERT OR UPDATE ON authority.record_entry FOR EACH ROW EXECUTE PROCEDURE evergreen.indexing_ingest_or_delete (); + +CREATE OR REPLACE FUNCTION metabib.reingest_record_attributes (rid BIGINT, pattr_list TEXT[] DEFAULT NULL, prmarc TEXT DEFAULT NULL, rdeleted BOOL DEFAULT TRUE) RETURNS VOID AS $func$ +DECLARE + transformed_xml TEXT; + rmarc TEXT := prmarc; + tmp_val TEXT; + prev_xfrm TEXT; + normalizer RECORD; + xfrm config.xml_transform%ROWTYPE; + attr_vector INT[] := '{}'::INT[]; + attr_vector_tmp INT[]; + attr_list TEXT[] := pattr_list; + attr_value TEXT[]; + norm_attr_value TEXT[]; + tmp_xml TEXT; + tmp_array TEXT[]; + attr_def config.record_attr_definition%ROWTYPE; + ccvm_row config.coded_value_map%ROWTYPE; + jump_past BOOL; +BEGIN + + IF attr_list IS NULL OR rdeleted THEN -- need to do the full dance on INSERT or undelete + SELECT ARRAY_AGG(name) INTO attr_list FROM config.record_attr_definition + WHERE ( + tag IS NOT NULL OR + fixed_field IS NOT NULL OR + xpath IS NOT NULL OR + phys_char_sf IS NOT NULL OR + composite + ) AND ( + filter OR sorter + ); + END IF; + + IF rmarc IS NULL THEN + SELECT marc INTO rmarc FROM biblio.record_entry WHERE id = rid; + END IF; + + FOR attr_def IN SELECT * FROM config.record_attr_definition WHERE NOT composite AND name = ANY( attr_list ) ORDER BY format LOOP + + jump_past := FALSE; -- This gets set when we are non-multi and have found something + attr_value := '{}'::TEXT[]; + norm_attr_value := '{}'::TEXT[]; + attr_vector_tmp := '{}'::INT[]; + + SELECT * INTO ccvm_row FROM config.coded_value_map c WHERE c.ctype = attr_def.name LIMIT 1; + + IF attr_def.tag IS NOT NULL THEN -- tag (and optional subfield list) selection + SELECT ARRAY_AGG(value) INTO attr_value + FROM (SELECT * FROM metabib.full_rec ORDER BY tag, subfield) AS x + WHERE record = rid + AND tag LIKE attr_def.tag + AND CASE + WHEN attr_def.sf_list IS NOT NULL + THEN POSITION(subfield IN attr_def.sf_list) > 0 + ELSE TRUE + END + GROUP BY tag + ORDER BY tag; + + IF NOT attr_def.multi THEN + attr_value := ARRAY[ARRAY_TO_STRING(attr_value, COALESCE(attr_def.joiner,' '))]; + jump_past := TRUE; + END IF; + END IF; + + IF NOT jump_past AND attr_def.fixed_field IS NOT NULL THEN -- a named fixed field, see config.marc21_ff_pos_map.fixed_field + attr_value := attr_value || vandelay.marc21_extract_fixed_field_list(rmarc, attr_def.fixed_field); + + IF NOT attr_def.multi THEN + attr_value := ARRAY[attr_value[1]]; + jump_past := TRUE; + END IF; + END IF; + + IF NOT jump_past AND attr_def.xpath IS NOT NULL THEN -- and xpath expression + + SELECT INTO xfrm * FROM config.xml_transform WHERE name = attr_def.format; + + -- See if we can skip the XSLT ... it's expensive + IF prev_xfrm IS NULL OR prev_xfrm <> xfrm.name THEN + -- Can't skip the transform + IF xfrm.xslt <> '---' THEN + transformed_xml := oils_xslt_process(rmarc,xfrm.xslt); + ELSE + transformed_xml := rmarc; + END IF; + + prev_xfrm := xfrm.name; + END IF; + + IF xfrm.name IS NULL THEN + -- just grab the marcxml (empty) transform + SELECT INTO xfrm * FROM config.xml_transform WHERE xslt = '---' LIMIT 1; + prev_xfrm := xfrm.name; + END IF; + + FOR tmp_xml IN SELECT UNNEST(oils_xpath(attr_def.xpath, transformed_xml, ARRAY[ARRAY[xfrm.prefix, xfrm.namespace_uri]])) LOOP + tmp_val := oils_xpath_string( + '//*', + tmp_xml, + COALESCE(attr_def.joiner,' '), + ARRAY[ARRAY[xfrm.prefix, xfrm.namespace_uri]] + ); + IF tmp_val IS NOT NULL AND BTRIM(tmp_val) <> '' THEN + attr_value := attr_value || tmp_val; + EXIT WHEN NOT attr_def.multi; + END IF; + END LOOP; + END IF; + + IF NOT jump_past AND attr_def.phys_char_sf IS NOT NULL THEN -- a named Physical Characteristic, see config.marc21_physical_characteristic_*_map + SELECT ARRAY_AGG(m.value) INTO tmp_array + FROM vandelay.marc21_physical_characteristics(rmarc) v + LEFT JOIN config.marc21_physical_characteristic_value_map m ON (m.id = v.value) + WHERE v.subfield = attr_def.phys_char_sf AND (m.value IS NOT NULL AND BTRIM(m.value) <> '') + AND ( ccvm_row.id IS NULL OR ( ccvm_row.id IS NOT NULL AND v.id IS NOT NULL) ); + + attr_value := attr_value || tmp_array; + + IF NOT attr_def.multi THEN + attr_value := ARRAY[attr_value[1]]; + END IF; + + END IF; + + -- apply index normalizers to attr_value + FOR tmp_val IN SELECT value FROM UNNEST(attr_value) x(value) LOOP + FOR normalizer IN + SELECT n.func AS func, + n.param_count AS param_count, + m.params AS params + FROM config.index_normalizer n + JOIN config.record_attr_index_norm_map m ON (m.norm = n.id) + WHERE attr = attr_def.name + ORDER BY m.pos LOOP + EXECUTE 'SELECT ' || normalizer.func || '(' || + COALESCE( quote_literal( tmp_val ), 'NULL' ) || + CASE + WHEN normalizer.param_count > 0 + THEN ',' || REPLACE(REPLACE(BTRIM(normalizer.params,'[]'),E'\'',E'\\\''),E'"',E'\'') + ELSE '' + END || + ')' INTO tmp_val; + + END LOOP; + IF tmp_val IS NOT NULL AND tmp_val <> '' THEN + -- note that a string that contains only blanks + -- is a valid value for some attributes + norm_attr_value := norm_attr_value || tmp_val; + END IF; + END LOOP; + + IF attr_def.filter THEN + -- Create unknown uncontrolled values and find the IDs of the values + IF ccvm_row.id IS NULL THEN + FOR tmp_val IN SELECT value FROM UNNEST(norm_attr_value) x(value) LOOP + IF tmp_val IS NOT NULL AND BTRIM(tmp_val) <> '' THEN + BEGIN -- use subtransaction to isolate unique constraint violations + INSERT INTO metabib.uncontrolled_record_attr_value ( attr, value ) VALUES ( attr_def.name, tmp_val ); + EXCEPTION WHEN unique_violation THEN END; + END IF; + END LOOP; + + SELECT ARRAY_AGG(id) INTO attr_vector_tmp FROM metabib.uncontrolled_record_attr_value WHERE attr = attr_def.name AND value = ANY( norm_attr_value ); + ELSE + SELECT ARRAY_AGG(id) INTO attr_vector_tmp FROM config.coded_value_map WHERE ctype = attr_def.name AND code = ANY( norm_attr_value ); + END IF; + + -- Add the new value to the vector + attr_vector := attr_vector || attr_vector_tmp; + END IF; + + IF attr_def.sorter THEN + DELETE FROM metabib.record_sorter WHERE source = rid AND attr = attr_def.name; + IF norm_attr_value[1] IS NOT NULL THEN + INSERT INTO metabib.record_sorter (source, attr, value) VALUES (rid, attr_def.name, norm_attr_value[1]); + END IF; + END IF; + + END LOOP; + +/* We may need to rewrite the vlist to contain + the intersection of new values for requested + attrs and old values for ignored attrs. To + do this, we take the old attr vlist and + subtract any values that are valid for the + requested attrs, and then add back the new + set of attr values. */ + + IF ARRAY_LENGTH(pattr_list, 1) > 0 THEN + SELECT vlist INTO attr_vector_tmp FROM metabib.record_attr_vector_list WHERE source = rid; + SELECT attr_vector_tmp - ARRAY_AGG(id::INT) INTO attr_vector_tmp FROM metabib.full_attr_id_map WHERE attr = ANY (pattr_list); + attr_vector := attr_vector || attr_vector_tmp; + END IF; + + -- On to composite attributes, now that the record attrs have been pulled. Processed in name order, so later composite + -- attributes can depend on earlier ones. + PERFORM metabib.compile_composite_attr_cache_init(); + FOR attr_def IN SELECT * FROM config.record_attr_definition WHERE composite AND name = ANY( attr_list ) ORDER BY name LOOP + + FOR ccvm_row IN SELECT * FROM config.coded_value_map c WHERE c.ctype = attr_def.name ORDER BY value LOOP + + tmp_val := metabib.compile_composite_attr( ccvm_row.id ); + CONTINUE WHEN tmp_val IS NULL OR tmp_val = ''; -- nothing to do + + IF attr_def.filter THEN + IF attr_vector @@ tmp_val::query_int THEN + attr_vector = attr_vector + intset(ccvm_row.id); + EXIT WHEN NOT attr_def.multi; + END IF; + END IF; + + IF attr_def.sorter THEN + IF attr_vector @@ tmp_val THEN + DELETE FROM metabib.record_sorter WHERE source = rid AND attr = attr_def.name; + INSERT INTO metabib.record_sorter (source, attr, value) VALUES (rid, attr_def.name, ccvm_row.code); + END IF; + END IF; + + END LOOP; + + END LOOP; + + IF ARRAY_LENGTH(attr_vector, 1) > 0 THEN + INSERT INTO metabib.record_attr_vector_list (source, vlist) VALUES (rid, attr_vector) + ON CONFLICT (source) DO UPDATE SET vlist = EXCLUDED.vlist; + END IF; + +END; + +$func$ LANGUAGE PLPGSQL; + +CREATE OR REPLACE FUNCTION authority.propagate_changes + (aid BIGINT, bid BIGINT) RETURNS BIGINT AS $func$ +DECLARE + queuing_success BOOL := FALSE; +BEGIN + + PERFORM 1 FROM config.global_flag + WHERE name IN ('ingest.queued.all','ingest.queued.authority.propagate') + AND enabled; + + IF FOUND THEN + -- XXX enqueue special 'propagate' bib action + SELECT action.enqueue_ingest_entry( bid, 'biblio', NOW(), 'propagate', aid::TEXT) INTO queuing_success; + + IF queuing_success THEN + RETURN aid; + END IF; + END IF; + + PERFORM authority.apply_propagate_changes(aid, bid); + RETURN aid; +END; +$func$ LANGUAGE PLPGSQL; + +CREATE OR REPLACE FUNCTION authority.apply_propagate_changes + (aid BIGINT, bid BIGINT) RETURNS BIGINT AS $func$ +DECLARE + bib_forced BOOL := FALSE; + bib_rec biblio.record_entry%ROWTYPE; + new_marc TEXT; +BEGIN + + SELECT INTO bib_rec * FROM biblio.record_entry WHERE id = bid; + + new_marc := vandelay.merge_record_xml( + bib_rec.marc, authority.generate_overlay_template(aid)); + + IF new_marc = bib_rec.marc THEN + -- Authority record change had no impact on this bib record. + -- Nothing left to do. + RETURN aid; + END IF; + + PERFORM 1 FROM config.global_flag + WHERE name = 'ingest.disable_authority_auto_update_bib_meta' + AND enabled; + + IF NOT FOUND THEN + -- update the bib record editor and edit_date + bib_rec.editor := ( + SELECT editor FROM authority.record_entry WHERE id = aid); + bib_rec.edit_date = NOW(); + END IF; + + PERFORM action.set_queued_ingest_force('ingest.queued.biblio.update.disabled'); + + UPDATE biblio.record_entry SET + marc = new_marc, + editor = bib_rec.editor, + edit_date = bib_rec.edit_date + WHERE id = bid; + + PERFORM action.clear_queued_ingest_force(); + + RETURN aid; + +END; +$func$ LANGUAGE PLPGSQL; + +CREATE OR REPLACE FUNCTION metabib.reingest_metabib_field_entries( + bib_id BIGINT, + skip_facet BOOL DEFAULT FALSE, + skip_display BOOL DEFAULT FALSE, + skip_browse BOOL DEFAULT FALSE, + skip_search BOOL DEFAULT FALSE, + only_fields INT[] DEFAULT '{}'::INT[] +) RETURNS VOID AS $func$ +DECLARE + fclass RECORD; + ind_data metabib.field_entry_template%ROWTYPE; + mbe_row metabib.browse_entry%ROWTYPE; + mbe_id BIGINT; + b_skip_facet BOOL; + b_skip_display BOOL; + b_skip_browse BOOL; + b_skip_search BOOL; + value_prepped TEXT; + field_list INT[] := only_fields; + field_types TEXT[] := '{}'::TEXT[]; +BEGIN + + IF field_list = '{}'::INT[] THEN + SELECT ARRAY_AGG(id) INTO field_list FROM config.metabib_field; + END IF; + + SELECT COALESCE(NULLIF(skip_facet, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name = 'ingest.skip_facet_indexing' AND enabled)) INTO b_skip_facet; + SELECT COALESCE(NULLIF(skip_display, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name = 'ingest.skip_display_indexing' AND enabled)) INTO b_skip_display; + SELECT COALESCE(NULLIF(skip_browse, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name = 'ingest.skip_browse_indexing' AND enabled)) INTO b_skip_browse; + SELECT COALESCE(NULLIF(skip_search, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name = 'ingest.skip_search_indexing' AND enabled)) INTO b_skip_search; + + IF NOT b_skip_facet THEN field_types := field_types || '{facet}'; END IF; + IF NOT b_skip_display THEN field_types := field_types || '{display}'; END IF; + IF NOT b_skip_browse THEN field_types := field_types || '{browse}'; END IF; + IF NOT b_skip_search THEN field_types := field_types || '{search}'; END IF; + + PERFORM * FROM config.internal_flag WHERE name = 'ingest.assume_inserts_only' AND enabled; + IF NOT FOUND THEN + IF NOT b_skip_search THEN + FOR fclass IN SELECT * FROM config.metabib_class LOOP + EXECUTE $$DELETE FROM metabib.$$ || fclass.name || $$_field_entry WHERE source = $$ || bib_id || $$ AND field = ANY($1)$$ USING field_list; + END LOOP; + END IF; + IF NOT b_skip_facet THEN + DELETE FROM metabib.facet_entry WHERE source = bib_id AND field = ANY(field_list); + END IF; + IF NOT b_skip_display THEN + DELETE FROM metabib.display_entry WHERE source = bib_id AND field = ANY(field_list); + END IF; + IF NOT b_skip_browse THEN + DELETE FROM metabib.browse_entry_def_map WHERE source = bib_id AND def = ANY(field_list); + END IF; + END IF; + + FOR ind_data IN SELECT * FROM biblio.extract_metabib_field_entry( bib_id, ' ', field_types, field_list ) LOOP + + -- don't store what has been normalized away + CONTINUE WHEN ind_data.value IS NULL; + + IF ind_data.field < 0 THEN + ind_data.field = -1 * ind_data.field; + END IF; + + IF ind_data.facet_field AND NOT b_skip_facet THEN + INSERT INTO metabib.facet_entry (field, source, value) + VALUES (ind_data.field, ind_data.source, ind_data.value); + END IF; + + IF ind_data.display_field AND NOT b_skip_display THEN + INSERT INTO metabib.display_entry (field, source, value) + VALUES (ind_data.field, ind_data.source, ind_data.value); + END IF; + + + IF ind_data.browse_field AND NOT b_skip_browse THEN + -- A caveat about this SELECT: this should take care of replacing + -- old mbe rows when data changes, but not if normalization (by + -- which I mean specifically the output of + -- evergreen.oils_tsearch2()) changes. It may or may not be + -- expensive to add a comparison of index_vector to index_vector + -- to the WHERE clause below. + + CONTINUE WHEN ind_data.sort_value IS NULL; + + value_prepped := metabib.browse_normalize(ind_data.value, ind_data.field); + IF ind_data.browse_nocase THEN -- for "nocase" browse definions, look for a preexisting row that matches case-insensitively on value and use that + SELECT INTO mbe_row * FROM metabib.browse_entry + WHERE evergreen.lowercase(value) = evergreen.lowercase(value_prepped) AND sort_value = ind_data.sort_value + ORDER BY sort_value, value LIMIT 1; -- gotta pick something, I guess + END IF; + + IF mbe_row.id IS NOT NULL THEN -- asked to check for, and found, a "nocase" version to use + mbe_id := mbe_row.id; + ELSE -- otherwise, an UPSERT-protected variant + INSERT INTO metabib.browse_entry + ( value, sort_value ) VALUES + ( value_prepped, ind_data.sort_value ) + ON CONFLICT (sort_value, value) DO UPDATE SET sort_value = EXCLUDED.sort_value -- must update a row to return an existing id + RETURNING id INTO mbe_id; + END IF; + + INSERT INTO metabib.browse_entry_def_map (entry, def, source, authority) + VALUES (mbe_id, ind_data.field, ind_data.source, ind_data.authority); + END IF; + + IF ind_data.search_field AND NOT b_skip_search THEN + -- Avoid inserting duplicate rows + EXECUTE 'SELECT 1 FROM metabib.' || ind_data.field_class || + '_field_entry WHERE field = $1 AND source = $2 AND value = $3' + INTO mbe_id USING ind_data.field, ind_data.source, ind_data.value; + -- RAISE NOTICE 'Search for an already matching row returned %', mbe_id; + IF mbe_id IS NULL THEN + EXECUTE $$ + INSERT INTO metabib.$$ || ind_data.field_class || $$_field_entry (field, source, value) + VALUES ($$ || + quote_literal(ind_data.field) || $$, $$ || + quote_literal(ind_data.source) || $$, $$ || + quote_literal(ind_data.value) || + $$);$$; + END IF; + END IF; + + END LOOP; + + IF NOT b_skip_search THEN + PERFORM metabib.update_combined_index_vectors(bib_id); + PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_symspell_reification' AND enabled; + IF NOT FOUND THEN + PERFORM search.symspell_dictionary_reify(); + END IF; + END IF; + + RETURN; +END; +$func$ LANGUAGE PLPGSQL; + +COMMIT; + diff --git a/Open-ILS/src/support-scripts/ingest_ctl b/Open-ILS/src/support-scripts/ingest_ctl new file mode 100755 index 0000000000..66311b2134 --- /dev/null +++ b/Open-ILS/src/support-scripts/ingest_ctl @@ -0,0 +1,1183 @@ +#!/usr/bin/perl +# --------------------------------------------------------------- +# Copyright © 2022 Equinox Open Library Initiative, INC. +# Mike Rylander +# +# This program is free software; you can redistribute it and/or modify +# it under the terms of the GNU General Public License as published by +# the Free Software Foundation; either version 2 of the License, or +# (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# --------------------------------------------------------------- + +use strict; +use warnings; +use DBI; +use Getopt::Long; +use Time::HiRes qw/usleep time/; +use IO::Handle; +use POSIX ":sys_wait_h"; +use Socket; +use OpenSRF::Utils ':daemon'; + +my $raise_db_error = 0; +# Globals for the command line options: + +my $opt_lockfile = '/tmp/queued-ingest-coordinator-LOCK'; +my $opt_logfile = '/tmp/queued-ingest-coordinator-LOG'; +my $daemon = 0; # do we go into the background? +my $stop = 0; # stop a running coordinator, if the lock file is there +my $chatty = 0; # do we yell into the void? +my $opt_max_child; # max number of parallel worker processes +my $max_child = 20; # max number of parallel worker processes +my $max_child_force; # max number of parallel worker processes + +# for enqueuing mode +my $start_id; # start processing at this record ID. +my $end_id; # stop processing when this record ID is reached. +my $opt_pipe; # Read record ids from STDIN. +my $stats_only; # Just report on QI processing stats/progress +my $totals_only; # Only give top-line aggregate stats, no per day breakdown +my $stats_queues; # Provide a list of queue-specfic stats +my $stats_since='001-01-01';# Default "since" time for stats +my $queue; # queue id, either pre-existing or created based on params +my $queue_type = 'biblio'; # type of records for batch enqueuing +my $queue_why; # description for this reingest queue +my $queue_action = 'update';# what action is happening to the records: update, insert, delete, propagate +my $queue_state_data = ''; # State data required for queue entry processing +my $queue_owner; # Owner of the queue +my $queue_run_at; # Owner of the queue +my $queue_threads; # parallelism for this queue (capped at max_child) +my $skip_browse = 0; # Skip the browse reingest. +my $skip_attrs = 0; # Skip the record attributes reingest. +my $skip_search = 0; # Skip the search reingest. +my $skip_facets = 0; # Skip the facets reingest. +my $skip_display = 0; # Skip the display reingest. +my $skip_full_rec = 0; # Skip the full_rec reingest. +my $skip_authority = 0; # Skip the authority reingest. +my $skip_luri = 0; # Skip the luri reingest. +my $skip_mrmap = 0; # Skip the metarecord remapping. +my $record_attrs = []; # Skip the metarecord remapping. +my $metabib_fields = []; # Skip the metarecord remapping. +my $input_records = []; # Records supplied via CLI switch. +my $pingest = ''; # Special "pingest" flag, supplying an EG user name as queue owner. + +my $help; # show help text + +# Database connection options with defaults: +my $db_user = $ENV{PGUSER} || 'evergreen'; +my $db_host = $ENV{PGHOST} || 'localhost'; +my $db_db = $ENV{PGDATABASE} || 'evergreen'; +my $db_pw = $ENV{PGPASSWORD} || 'evergreen'; +my $db_port = $ENV{PGPORT} || 5432; + +GetOptions( + 'lock-file=s' => \$opt_lockfile, + 'log-file=s' => \$opt_logfile, + 'dbuser=s' => \$db_user, + 'dbhost=s' => \$db_host, + 'dbname=s' => \$db_db, + 'dbpw=s' => \$db_pw, + 'dbport=i' => \$db_port, + 'max-child=i' => \$opt_max_child, + 'max-child-force' => \$max_child_force, + 'stats' => \$stats_only, + 'totals-only' => \$totals_only, + 'queue-stats' => \$stats_queues, + 'since=s' => \$stats_since, + 'queue=i' => \$queue, + 'queue-action=s' => \$queue_action, + 'queue-name=s' => \$queue_why, + 'queue-type=s' => \$queue_type, + 'queue-owner=s' => \$queue_owner, + 'queue-run-at=s' => \$queue_run_at, + 'queue-threads=i' => \$queue_threads, + 'queue-state-data=s'=> \$queue_state_data, + 'skip-browse' => \$skip_browse, + 'skip-attrs' => \$skip_attrs, + 'skip-search' => \$skip_search, + 'skip-facets' => \$skip_facets, + 'skip-display' => \$skip_display, + 'skip-full_rec' => \$skip_full_rec, + 'skip-authority' => \$skip_authority, + 'skip-luri' => \$skip_luri, + 'skip-mr-map' => \$skip_mrmap, + 'attr=s@' => \$record_attrs, + 'field=s@' => \$metabib_fields, + 'record=s@' => \$input_records, + 'start-id=i' => \$start_id, + 'end-id=i' => \$end_id, + 'pipe' => \$opt_pipe, + 'pingest=s' => \$pingest, + 'coordinator' => \$daemon, + 'stop' => \$stop, + 'chatty' => \$chatty, + 'help' => \$help +) or help(); + +sub help { + print <connect( + $dsn, $db_user, $db_pw, + { AutoCommit => 1, + pg_expand_array => 0, + pg_enable_utf8 => 1, + pg_bool_tf => 0, + RaiseError => $raise_db_error + } +) || die "Could not connect to the database\n"; + +my $configured_max_child = $main_dbh->selectcol_arrayref( + "SELECT value FROM config.global_flag WHERE name = 'ingest.queued.max_threads'" +)->[0] || 20; +$max_child = $configured_max_child if (!$opt_max_child); + + +if (defined($opt_max_child) && $opt_max_child > 20 && !$max_child_force) { + warn('Max Child > 20 ... no, sorry'); + help(); +} + +if ($opt_max_child) { + $max_child = $opt_max_child; +} + +if ($max_child <= 0) { + $max_child = 20; +} + +if ($opt_pipe && ($start_id || $end_id)) { + warn('Mutually exclusive options: either pipe or start/end range'); + help(); +} + +if ($daemon && ($start_id || $end_id || $opt_pipe)) { + warn('Mutually exclusive options: cannot start or stop the Coordinator in Enqueuing mode'); + help(); +} + +if (!$daemon && $stop) { + warn('Option --stop can only be used with the --coordinator option'); + help(); +} + +if ($daemon && $queue) { + warn('Mutually exclusive options: cannot start or stop the Coordinator in one-shot processing mode'); + help(); +} + +if ($queue_type && !(grep {$_ eq $queue_type} qw/biblio authority/)) { + warn('Valid queue types are biblio and authority'); + help(); +} + +if (!(grep {$_ eq $queue_action} qw/insert update delete propagate/)) { + warn('Valid queue actions are: insert, update, delete, propagate'); + help(); +} + +if ($queue && ($queue_owner || $queue_why || $queue_threads || $queue_run_at)) { + warn('Mutually exclusive options: specify a queue id OR queue creation values'); + help(); +} + + +if ($daemon) { # background mode, we need a lockfile; + + if ($stop) { + die "Lockfile $opt_lockfile does not exist, is the coordinator running?\n" unless (-e $opt_lockfile); + + open(F, "<$opt_lockfile") or die "Unable to open lockfile $opt_lockfile for reading, wrong user?\n"; + my $old_pid = ; + close F; + + if ($old_pid) { + if (kill(0,$old_pid)) { + my $dead_count = kill(9,$old_pid); + if ($dead_count) { + warn "Coordinator process terminated, removing lock file $opt_lockfile\n"; + unlink($opt_lockfile) or die "Could not remove lock file $opt_lockfile\n"; + } else { + die "Could not kill coordinator process $old_pid\n"; + } + } else { + warn "Coordinator process not running, removing stale lock file\n"; + unlink($opt_lockfile) or die "Could not remove lock file $opt_lockfile\n"; + } + } else { + warn "Coordinator lock file empty, removing lock file $opt_lockfile\n"; + unlink($opt_lockfile) or die "Could not remove lock file $opt_lockfile\n"; + } + + exit; + } + + # check the lockfile + die "I'm already running with lock-file $opt_lockfile\n" if (-e $opt_lockfile); + + $main_dbh->disconnect; + + daemonize("Queued Ingest Coordinator") if ($daemon); + + # set the lockfile + open(F, ">$opt_lockfile") or die "Unable to open lockfile $opt_lockfile for writing\n"; + print F $$; + close F; + + open(STDERR, ">>$opt_logfile") if ($opt_logfile); +} + +my $start_time = time; +my %stats; + +sub reset_stats { + %stats = ( + total => { + }, biblio => { + insert => {}, + update => {}, + delete => {}, + propagate => {} + }, authority => { + insert => {}, + update => {}, + delete => {} + }, seconds => {} + ); +} + +reset_stats(); + +my %processors; +my %queues_in_progress; + +my $db_connections_in_use = 0; + +if ($start_id || $end_id || $opt_pipe || @$input_records) { # enqueuing mode + + if ($pingest) { # special mode that sets up two queues that can run in parallel + + my $no_browse = $skip_browse; + my $orig_stat_data = $queue_state_data; + + # set up the first queue + $queue = undef; + $queue_threads //= 4; + $queue_type = 'biblio'; + $queue_action = 'update'; + $queue_why = 'pingest - fields and attributes queue'; + $queue_owner = $pingest; + + # for pingest mode, always skip authority and luri, and skip browse in the first queue + $skip_browse = 1; + $skip_authority = 1; + $skip_luri = 1; + + my $record_list = enqueue_input(); + report_stats('Enqueuing '.$queue_why); + + if (!$no_browse and @$record_list) { # user didn't ask to skip browse reingest + # set up the second queue + $queue = undef; + $queue_threads //= 4; + $queue_why = 'pingest - browse queue'; + $queue_state_data = $orig_stat_data; + + $skip_browse = 0; + $skip_attrs = 1; + $skip_search = 1; + $skip_facets = 1; + $skip_display = 1; + $skip_full_rec = 1; + $skip_mrmap = 1; + + reset_stats(); + + enqueue_input($record_list); + report_stats('Enqueuing '.$queue_why); + } + + } else { # just a regular, user-defined QI request + enqueue_input(); + report_stats('Enqueuing'); + } + + +} elsif ($queue && !$stats_only) { # single queue processing mode + + my $q = gather_one_queue($queue); + process_one_queue($q->{id}, $max_child); + complete_outstanding_queue($q->{id}); + report_stats('Queue Processing'); + +} elsif (!$daemon && !$stats_only) { # special case: foreground single process, end after + + my @dbhs = create_processor_dbhs($max_child); + my $clear = 0; + + my $new_queues = gather_outstanding_queues(); # array ref of queues + for my $q (@$new_queues) { + process_one_queue($q->{id}, $max_child, \@dbhs); + complete_outstanding_queue($q->{id}); + report_stats('Queue and Entry Processing', $clear++); + } + + my $new_entries = gather_outstanding_nonqueue_entries('NULL'); # array ref of queue entries + my @eids = map { $$_{id} } @$new_entries; + while (my @current_subset = splice(@eids, 0, 10 * $max_child)) { + process_input_list(\@current_subset, \@dbhs); + report_stats('Queue and Entry Processing', $clear++); + } + +} elsif($stats_only) { + my @output; + + my $q_query = <<" SQL"; +SELECT run_at::DATE AS scheduled_date, + SUM((start_time IS NULL)::INT) AS pending, + SUM((start_time IS NOT NULL AND end_time IS NULL)::INT) AS ongoing, + SUM((end_time IS NOT NULL)::INT) AS complete, + COUNT(*) AS total + FROM action.ingest_queue + WHERE run_at >= ? + GROUP BY ROLLUP (1) + ORDER BY 1 + SQL + + if (!$queue) { # all queues in the time range + my $qstat_rows = $main_dbh->selectall_arrayref($q_query, { Slice => {} }, $stats_since); + if (@$qstat_rows > 1) { + $qstat_rows = [ $$qstat_rows[-1] ] if ($totals_only); + + push @output, "Named Queue processing stats"; + push @output, "============================",""; + for my $row ( @$qstat_rows ) { + push @output, "* Scheduled processing date: $$row{scheduled_date}" if ($$row{scheduled_date}); + push @output, " Totals"," ------" unless ($$row{scheduled_date}); + + push @output, " Pending: $$row{pending}"; + push @output, " Ongoing: $$row{ongoing}"; + push @output, " Complete: $$row{complete}"; + + push @output,""; + push @output,'-'x50; + push @output,""; + } + + push @output,""; + } + } + + if ($stats_queues || $queue) { + $q_query = <<" SQL"; +SELECT q.*, + u.usrname, + SUM((e.ingest_time IS NULL AND e.override_by IS NULL)::INT) AS pending, + SUM((e.override_by IS NOT NULL)::INT) AS overridden, + SUM((e.ingest_time IS NOT NULL)::INT) AS complete, + SUM((e.fail_time IS NOT NULL)::INT) AS failed, + COUNT(e.id) AS events + FROM action.ingest_queue q + JOIN actor.usr u ON (q.who=u.id) + LEFT JOIN action.ingest_queue_entry e ON (e.queue=q.id) + WHERE q.XX ? + GROUP BY 1,2,3,4,5,6,7,8,9 + ORDER BY q.run_at, q.id + SQL + + my $param = $stats_since; + if ($queue) { + $param = $queue; + $q_query =~ s/XX/id =/; + } else { + $q_query =~ s/XX/run_at >=/; + } + + my $qstat_rows = $main_dbh->selectall_arrayref($q_query, { Slice => {} }, $param); + if (@$qstat_rows) { + + push @output, "Named Queue details"; + push @output, "===================",""; + for my $row ( @$qstat_rows ) { + push @output, "* Queue id: $$row{id} | Threads: $$row{threads} | Owner: $$row{usrname}"; + push @output, "* Reason: $$row{why}"; + push @output, "* Create time: $$row{created}"; + push @output, "* Scheduled start time: $$row{run_at}"; + push @output, " - Started: $$row{start_time}"; + push @output, " - Ended : $$row{end_time}",""; + push @output, " Pending: $$row{pending}"; + push @output, " Overridden: $$row{overridden}"; + push @output, " Complete: $$row{complete}"; + push @output, " Failed: $$row{failed}"; + push @output, " Total: $$row{events}"; + push @output, "Percent complete: " . sprintf('%.2f',(($$row{complete} + 1.0) / ($$row{events} - $$row{failed} + 1.0)) * 100.0); + + push @output,""; + push @output,'-'x50; + push @output,""; + } + + push @output,""; + } + } + + if (!$queue) { + my $e_query = <<" SQL"; +SELECT run_at::DATE AS scheduled_date, + record_type, + action, + SUM((ingest_time IS NULL AND fail_time IS NULL AND override_by IS NULL AND queue IS NOT NULL)::INT) AS pending_with_queue, + SUM((ingest_time IS NULL AND fail_time IS NULL AND override_by IS NULL AND queue IS NULL)::INT) AS pending_without_queue, + SUM((ingest_time IS NULL AND fail_time IS NULL AND override_by IS NULL)::INT) AS pending, + SUM((ingest_time IS NOT NULL AND override_by IS NULL)::INT) AS processed, + SUM((ingest_time IS NOT NULL AND override_by IS NOT NULL)::INT) AS overridden, + SUM((fail_time IS NOT NULL)::INT) AS failed, + SUM((ingest_time IS NOT NULL)::INT) AS complete, + COUNT(*) AS total + FROM action.ingest_queue_entry + WHERE run_at >= ? + GROUP BY 2,3, ROLLUP (1) + ORDER BY 1,2,3 + SQL + + my $estat_rows = $main_dbh->selectall_arrayref($e_query, { Slice => {} }, $stats_since); + if (@$estat_rows > 1) { + $estat_rows = [ grep { !defined($$_{scheduled_date}) } @$estat_rows ] if ($totals_only); + + push @output, "Record processing stats"; + push @output, "============================",""; + for my $row ( @$estat_rows ) { + push @output, "* Scheduled processing date: $$row{scheduled_date}" if ($$row{scheduled_date}); + push @output, " Record Type: $$row{record_type}; Action: $$row{action}"; + push @output, " Totals"," ------" unless ($$row{scheduled_date}); + + push @output, " Pending: $$row{pending}"; + push @output, " ... $$row{pending_with_queue} with a named queue" if $$row{pending}; + push @output, " ... $$row{pending_without_queue} without a named queue" if $$row{pending}; + push @output, " Processed: $$row{processed}"; + push @output, " Overridden: $$row{overridden}"; + push @output, " Complete: $$row{complete}"; + push @output, " Failed: $$row{failed}"; + push @output, " Total: $$row{total}"; + + push @output,""; + push @output,'-'x50; + push @output,""; + } + } + + push @output,""; + } + + print join("\n", @output); + +} elsif($daemon) { # background processing + + $SIG{CHLD} = \&REAPER; + + my %queues_in_waiting; + my @orphan_entries; + + my $pid = spawn_kid(); + my $orphan_processor = $processors{$pid}; + $orphan_processor->{state} = 0; # will be decremented (made true) once used + print {$orphan_processor->{pipe}} "n:Orphan processor\n"; # set parallelism + + refresh_main_dbh()->do('LISTEN queued_ingest'); + + my $loops = 1; + while ($loops) { + + warn_if_chatty("starting processing loop"); + + my @complete_processors = grep { $_->{state} == 3 } values %processors; + warn_if_chatty("".scalar(@complete_processors)." complete for cleanup"); + for my $p (@complete_processors) { + + if (my $dead_pid = $$p{DEAD}) { + warn_if_chatty("processor $dead_pid already cleaned up, final flush"); + delete $processors{$dead_pid}{$_} for keys %{$processors{$dead_pid}}; + delete $processors{$dead_pid}; + next; + } + + $$p{DEAD} = $$p{pid}; + warn_if_chatty("phase 1 cleanup of processor $$p{DEAD}"); + $db_connections_in_use -= $$p{threads}; + + if ($$p{queues}) { + for my $q (keys %{$$p{queues}}) { + warn_if_chatty("processor $$p{pid} finished processing queue $q"); + delete $queues_in_progress{$q}{processors}{$$p{pid}}; + if (!scalar(keys(%{$queues_in_progress{$q}{processors}}))) { # that was the last processor for the queue + warn_if_chatty("queue $q complete"); + complete_outstanding_queue($q); + delete $queues_in_progress{$q}; + delete $$p{queues}{$q}; + } + } + } + } + + warn_if_chatty("".scalar(keys(%queues_in_progress))." queues in progress"); + warn_if_chatty("checking for new queues"); + + my $new_queues = gather_outstanding_queues(); # array ref of queues + if (@$new_queues) { + warn_if_chatty("".scalar(@$new_queues)." new queues"); + $queues_in_waiting{$_->{id}} = $_ for @$new_queues; + + my @ready_kids = grep { $_->{state} == 1 } values %processors; + if (my $needed = scalar(@$new_queues) - scalar(@ready_kids)) { + warn_if_chatty("spawning $needed new processors"); + spawn_kid() while ($needed--); + } + + @ready_kids = grep { $_->{state} == 1 } values %processors; + + my @sorted_queues = sort {$$a{run_at} cmp $$b{run_at}} values %queues_in_waiting; + for my $q (@sorted_queues) { + + my $local_max = $max_child - $db_connections_in_use; + if ($local_max > 0) { # we have connections available + my $ready = shift @ready_kids; + next unless $ready; + + # cap at unused max if more threads were requested + $$q{threads} = $local_max if ($$q{threads} > $local_max); + $ready->{threads} = $$q{threads}; + $ready->{state} = 2; # running now + $ready->{queues}{$$q{id}} = 1; + + $queues_in_progress{$$q{id}} = delete $queues_in_waiting{$$q{id}}; + $queues_in_progress{$$q{id}}{processors}{$ready->{pid}} = 1; + + $db_connections_in_use += $$q{threads}; + print {$ready->{pipe}} "n:Queue $$q{id}\n"; + print {$ready->{pipe}} "p:$$q{threads}\n"; + print {$ready->{pipe}} "q:$$q{id}\n"; + shutdown($ready->{pipe},2); + } else { + warn_if_chatty("no db connections available, we'll wait"); + } + } + } + + warn_if_chatty("checking orphan processor"); + + if ($orphan_processor->{state} == 3) { # replace it + + warn_if_chatty("replacing orphan processor, it is finished"); + + $pid = spawn_kid(); + $orphan_processor = $processors{$pid}; + $orphan_processor->{state} = 0; # will be decremented (made true) once used + print {$orphan_processor->{pipe}} "n:Orphan processor\n"; # set parallelism + } + + warn_if_chatty("gathering orphan entries"); + + my $new_entries = gather_outstanding_nonqueue_entries(10 * $max_child); # array ref of queue entries + if ($new_entries and @$new_entries) { + warn_if_chatty("".scalar(@$new_entries)." new entries"); + if ($orphan_processor->{state}) { # already processing some entries + warn_if_chatty("orphan processor is busy, wait and loop"); + } else { + my $local_max = $max_child - $db_connections_in_use; + if ($local_max > 0) { + $orphan_processor->{state}--; + $db_connections_in_use += $local_max; + $orphan_processor->{threads} = $local_max; + $orphan_processor->{entries} = [ map { $$_{id} } @$new_entries ]; + print {$orphan_processor->{pipe}} "p:$local_max\n"; # set parallelism + print {$orphan_processor->{pipe}} "e:$_\n" for (@{$orphan_processor->{entries}}); + print {$orphan_processor->{pipe}} "!!\n"; + shutdown($orphan_processor->{pipe},2); + } else { + warn_if_chatty("no db connections available for the orphan processor, wait and loop"); + } + } + } + + my $inner_loops = 0; + warn_if_chatty("waiting for LISTEN notifications"); + until (defined(refresh_main_dbh()->pg_notifies)) { + sleep(1); + $inner_loops++; + if ($inner_loops >= 10) { + last; # loop after 10s at most + } + } + + $loops++; + } +} + +exit; + +sub refresh_main_dbh { + unless ($main_dbh->ping) { + $main_dbh = DBI->connect( + $dsn, $db_user, $db_pw, + { AutoCommit => 1, + pg_expand_array => 0, + pg_enable_utf8 => 1, + pg_bool_tf => 0, + RaiseError => $raise_db_error + } + ) || die "Could not connect to the database\n"; + $main_dbh->do('LISTEN queued_ingest') if ($daemon); + } + return $main_dbh; +} + +sub REAPER { + local $!; # don't let waitpid() overwrite current error + warn_if_chatty("REAPER called"); + while ((my $pid = waitpid(-1, WNOHANG)) > 0) { + warn_if_chatty("reaping kid $pid"); + $processors{$pid}{state} = 3; + } + $SIG{CHLD} = \&REAPER; +} + +sub create_processor_dbhs { + my $count = shift; + + my @dbhs; + while (scalar(@dbhs) < $count) { + push @dbhs, DBI->connect( + $dsn, $db_user, $db_pw, + { AutoCommit => 1, + pg_expand_array => 0, + pg_enable_utf8 => 1, + pg_bool_tf => 0, + RaiseError => $raise_db_error + } + ); + } + + return @dbhs; +} + +sub complete_outstanding_queue { + my $qid = shift; + + return refresh_main_dbh()->do( + 'UPDATE action.ingest_queue SET end_time = NOW()'. + ' WHERE id=? RETURNING *', + {}, $qid + ); +} + +sub gather_one_queue { + my $qid = shift; + my $q = refresh_main_dbh()->selectrow_hashref( + 'UPDATE action.ingest_queue SET start_time = NOW()'. + ' WHERE id = ? RETURNING *', + {},$qid + ); + + return $q; +} + +sub gather_outstanding_queues { + my $qs = refresh_main_dbh()->selectall_hashref( + 'UPDATE action.ingest_queue SET start_time = NOW()'. + ' WHERE start_time IS NULL AND run_at <= NOW()'. + ' RETURNING *', + 'id' + ); + + for my $q (values %$qs) { + $q->{threads} ||= 1; + } + + return [values %$qs]; +} + +sub gather_outstanding_nonqueue_entries { + my $limit = shift; + return refresh_main_dbh()->selectall_arrayref( + "SELECT * FROM action.ingest_queue_entry". + " WHERE queue IS NULL". + " AND run_at <= NOW()". + " AND override_by IS NULL". + " AND ingest_time IS NULL". + " AND fail_time IS NULL". + " ORDER BY run_at, id". + " LIMIT $limit", + { Slice => {} } + ); +} + +sub spawn_kid { + my $parent; + my $child; + + socketpair($child, $parent, AF_UNIX, SOCK_STREAM, PF_UNSPEC) + || die "socketpair: $!"; + + $parent->autoflush(1); + $child->autoflush(1); + + my $kid_pid = fork() // + die "Could not fork worker process"; + + if ($kid_pid) { + close($parent); + $processors{$kid_pid} = { + pipe => $child, + pid => $kid_pid, + state => 1 + }; + } else { + set_psname("Queued Ingest worker - waiting"); + open(STDERR, ">>$opt_logfile") if ($opt_logfile); + $SIG{CHLD} = 'IGNORE'; + close($child); + process_commands_from_parent($parent); + warn_if_chatty("finished processing commands from parent, exiting"); + report_stats("Entry Processing - worker $$",0,1); + exit; + } + + return $kid_pid; +} + +sub warn_if_chatty { + return unless $chatty; + + my $msg = shift; + my $time = time; + warn "$time [$$] $msg\n"; +} + +sub process_commands_from_parent { + my $ppipe = shift; + + my $stop_after = 0; + while (1) { + my @input; + my $dbh_count = 1; + my $cont = 0; + while (<$ppipe>) { + $cont = 1; + chomp; + if (/^q:(\d+)$/) { # we were passed a queue id + my $queue = $1; + $stop_after = 1; + warn_if_chatty("processing queue $queue, should exit after"); + process_one_queue($queue,$dbh_count); + warn_if_chatty("processing queue $queue complete"); + } elsif (/^n:(.+)$/) { # we were passed a process name + set_psname("Queued Ingest worker - ". $1); + } elsif (/^p:(\d+)$/) { # we were passed a dbh count (parallelism) + $dbh_count = $1 || 1; + warn_if_chatty("parallelism set to $dbh_count"); + } elsif (/^e:(\d+)$/) { # we were passed an entry id + my $entry = $1; + push @input, $entry; + } elsif (/^##$/) { # This is the "process those, but then wait" command + last; + } elsif (/^!!$/) { # This is the "end of input, process and exit" command + warn_if_chatty("end of the command stream, should exit after"); + $stop_after = 1; + last; + } + } + + # we have a list of entries to process + if (my $icount = scalar(@input)) { + my @dbhs = create_processor_dbhs($dbh_count); + warn_if_chatty("processing $icount entries..."); + process_input_list(\@input, \@dbhs); + warn_if_chatty("processing $icount entries complete"); + } + + last if $stop_after || !$cont; + } + + close($ppipe); +} + +sub process_one_queue { + my $qid = shift; + my $dbh_count = shift || 1; + my $dbh_list = shift; + + return unless $qid; + + my @dbhs = $dbh_list ? @$dbh_list : create_processor_dbhs($dbh_count); + my @input = @{$dbhs[0]->selectcol_arrayref( + 'SELECT id FROM action.ingest_queue_entry'. + ' WHERE queue = ?'. + ' AND override_by IS NULL'. + ' AND ingest_time IS NULL'. + ' AND fail_time IS NULL', + {}, $qid + )}; + + return unless @input; + return process_input_list(\@input, \@dbhs); +} + +sub process_entry_begin { + my $entry_id = shift; + my $ms = shift; + my $dbh = shift; + + + my $entry = $dbh->selectrow_hashref( + "SELECT * FROM action.ingest_queue_entry WHERE id = ?", undef, $entry_id + ); + + if (!$$entry{id}) { + return $dbh; + } + + my $sth = $dbh->prepare( + "SELECT action.process_ingest_queue_entry(?)", {pg_async => 1} + ); + + if (!$sth->execute($entry_id)) { + $stats{total}{fail}++; + $stats{$$entry{record_type}}{$$entry{action}}{fail}++; + + my $current_second = CORE::time; + $stats{seconds}{$current_second}{fail}++; + $stats{seconds}{$current_second}{total}++; + + return $dbh; + } + + $$ms{$entry_id} = { + entry => $entry, + dbh => $dbh, + sth => $sth + }; + + return undef; +} + +sub process_entry_complete { + my $eid = shift; + my $ms = shift; + + if ($$ms{$eid}{sth}->pg_ready) { + $$ms{$eid}{sth}->pg_result; + my ($success) = $$ms{$eid}{sth}->fetchrow_array; + $$ms{$eid}{sth}->finish; + + $success = $success ? 'success' : 'fail'; + $stats{total}{$success}++; + $stats{$$ms{$eid}{entry}{record_type}}{$$ms{$eid}{entry}{action}}{$success}++; + + my $current_second = CORE::time; + $stats{seconds}{$current_second}{$success}++; + $stats{seconds}{$current_second}{total}++; + + my $dbh = delete $$ms{$eid}{dbh}; + delete $$ms{$eid}{$_} for keys %{$$ms{$eid}}; + delete $$ms{$eid}; + return $dbh; + } + + return undef; +} + +sub process_input_list { + my $input = shift; + my $dbhs = shift; + + my %microstate; + while (@$input || keys(%microstate)) { + + # do we have an idle worker, and work to do? + if (@$input && scalar(@$dbhs)) { + my $entry_id = shift @$input; + my $dbh = shift @$dbhs; + my $failed_dbh = process_entry_begin($entry_id, \%microstate, $dbh); + if ($failed_dbh) { + push @$dbhs, $failed_dbh; + next; + } + } + + # look at them in ascending order, as the oldest will have + # been running the longest and is more likely to be finished + my @entries = sort {$a <=> $b} keys %microstate; + for my $eid (@entries) { + my $success_dbh = process_entry_complete($eid, \%microstate); + if ($success_dbh) { + push @$dbhs, $success_dbh; + } + } + + usleep(10000) if (keys %microstate); # ~0.01 seconds + } + + return $dbhs; +} + +sub report_stats { + my $label = shift; + my $clear = shift; + my $warn = shift; + my $runtime = time - $start_time; + my @seconds_list = sort keys %{$stats{seconds}}; + my $first_second = $seconds_list[0]; + my $last_second = $seconds_list[-1]; + my $processing_seconds = ($last_second - $first_second) + 1.0; + + system('clear') if $clear; + + print "$label stats:\n" unless $warn; + warn "$label stats:\n" if $warn; + for my $type (qw/biblio authority/) { + for my $action ( sort keys %{$stats{$type}} ) { + my $part = $stats{$type}{$action}; + next unless (defined($$part{success}) || defined($$part{fail})); + + $$part{success} //= 0; + $$part{fail} //= 0; + + my $subtotal = $$part{success} + $$part{fail}; + print " * Type: $type\n - Action: $action :: $$part{success}/$$part{fail}/$subtotal\n" unless $warn; + warn " * Type: $type\n - Action: $action :: $$part{success}/$$part{fail}/$subtotal\n" if $warn; + } + } + + $stats{total}{success} //= 0; + $stats{total}{fail} //= 0; + + my $per_sec = ($stats{total}{success} + $stats{total}{fail}) / $processing_seconds; + + print " * Timing: $runtime runtime, $processing_seconds seconds processing, $per_sec per second\n". + " * Total: $stats{total}{success} succeeded, $stats{total}{fail} failed\n" unless $warn; + warn " * Timing: $runtime runtime, $processing_seconds seconds processing, $per_sec per second\n". + " * Total: $stats{total}{success} succeeded, $stats{total}{fail} failed\n" if $warn; +} + +sub enqueue_input { + my $predestined_input = shift; + my @input; + + if ($predestined_input and @$predestined_input) { + @input = @$predestined_input; + } elsif ($opt_pipe) { + while () { + # Assume any string of digits is an id. + if (my @subs = /([0-9]+)/g) { + push(@input, @subs); + } + } + } elsif (@$input_records) { + @input = grep { /^\d+$/ } @$input_records; + } else { + my $q = "SELECT id FROM $queue_type.record_entry WHERE NOT DELETED"; + if ($start_id && $end_id) { + $q .= " AND id BETWEEN $start_id AND $end_id"; + } elsif ($start_id) { + $q .= " AND id >= $start_id"; + } elsif ($end_id) { + $q .= " AND id <= $end_id"; + } + $q .= ' ORDER BY id ASC'; + + @input = @{$main_dbh->selectcol_arrayref($q)}; + } + + $main_dbh->begin_work; + if ($queue_why || $queue_threads || $queue_run_at) { + my $rows = $main_dbh->do( + 'INSERT INTO action.ingest_queue (why,threads,run_at) VALUES(?,?,?)', + undef, $queue_why, $queue_threads || 1, $queue_run_at || 'NOW' + ); + if ($rows && $queue_owner) { + $queue = $main_dbh->last_insert_id(undef,'action','ingest_queue',undef); + if ($queue && $queue_owner) { + my $uid = $main_dbh->selectcol_arrayref( + 'SELECT id FROM actor.usr WHERE usrname = ?', + undef, $queue_owner + )->[0]; + if ($uid) { + $rows = $main_dbh->do( + 'UPDATE action.ingest_queue SET who = ? WHERE id = ?', + undef, $uid, $queue + ); + } + } + } + } + + my $q_obj = {}; + if ($queue) { + $q_obj = $main_dbh->selectrow_hashref("SELECT * FROM action.ingest_queue WHERE id=$queue") || {}; + } + $queue = $q_obj->{id} || '0'; + + if ($queue_type eq 'biblio' and $queue_action eq 'update') { + $queue_state_data .= ';skip_browse' if $skip_browse; + $queue_state_data .= ';skip_attrs' if $skip_attrs; + $queue_state_data .= ';skip_search' if $skip_search; + $queue_state_data .= ';skip_facets' if $skip_facets; + $queue_state_data .= ';skip_display' if $skip_display; + $queue_state_data .= ';skip_full_rec' if $skip_full_rec; + $queue_state_data .= ';skip_authority' if $skip_authority; + $queue_state_data .= ';skip_luri' if $skip_luri; + $queue_state_data .= ';skip_mrmap' if $skip_mrmap; + + $queue_state_data .= ';attr_list('.join(',',@$record_attrs).')' if @$record_attrs; + $queue_state_data .= ';field_list('.join(',',@$metabib_fields).')' if @$metabib_fields; + } + + my $qid = $q_obj->{id}; + my $run_at = $q_obj->{run_at} || 'NOW'; + for my $rid (@input) { + my $success = $main_dbh->selectcol_arrayref( + 'SELECT action.enqueue_ingest_entry(?, ?, ?, ?, ?, ?)', + {}, + $rid, $queue_type, $run_at, $qid, $queue_action, $queue_state_data + )->[0]; + + my $update_key = $success ? 'success' : 'fail'; + $stats{total}{$update_key}++; + $stats{$queue_type}{$queue_action}{$update_key}++; + + my $current_second = CORE::time; + $stats{seconds}{$current_second}{$update_key}++; + $stats{seconds}{$current_second}{total}++; + } + $main_dbh->commit; + + return \@input; +} +