$rec->editor($e->requestor->id);
$rec->edit_date('now');
$rec->marc($marc);
+
+ my $inline_ingest = $e->retrieve_config_global_flag('ingest.queued.biblio.update.marc_edit_inline');
+ $inline_ingest = ($inline_ingest and $U->is_true($inline_ingest->enabled));
+
+ $e->json_query(
+ {from => [ 'action.set_queued_ingest_force', 'ingest.queued.biblio.update.disabled' ]}
+ ) if ($inline_ingest);
+
$e->update_biblio_record_entry($rec) or return $e->die_event;
+ $e->json_query(
+ {from => ['action.clear_queued_ingest_force']}
+ ) if ($inline_ingest);
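+ # The set/clear of the queued-ingest force flag above makes the DB
+ # ingest triggers process this one update inline even when queued
+ # ingest is enabled globally (see action.set_queued_ingest_force below).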
+
return $rec;
}
$record->edit_date('now');
$record->marc($marc);
+ my $inline_ingest = $e->retrieve_config_global_flag('ingest.queued.biblio.insert.marc_edit_inline');
+ $inline_ingest = ($inline_ingest and $U->is_true($inline_ingest->enabled));
+
+ $e->json_query(
+ {from => [ 'action.set_queued_ingest_force', 'ingest.queued.biblio.insert.disabled' ]}
+ ) if ($inline_ingest);
+
$record = $e->create_biblio_record_entry($record) or return $e->die_event;
+ $e->json_query(
+ {from => ['action.clear_queued_ingest_force']}
+ ) if ($inline_ingest);
+
if($use_id) {
my $existing = $e->search_biblio_record_entry(
{
END;
$func$ LANGUAGE PLPGSQL;
-CREATE OR REPLACE FUNCTION metabib.reingest_metabib_field_entries(
+CREATE OR REPLACE FUNCTION metabib.reingest_metabib_field_entries(
bib_id BIGINT,
- skip_facet BOOL DEFAULT FALSE,
+ skip_facet BOOL DEFAULT FALSE,
skip_display BOOL DEFAULT FALSE,
- skip_browse BOOL DEFAULT FALSE,
+ skip_browse BOOL DEFAULT FALSE,
skip_search BOOL DEFAULT FALSE,
only_fields INT[] DEFAULT '{}'::INT[]
) RETURNS VOID AS $func$
IF NOT FOUND THEN
IF NOT b_skip_search THEN
FOR fclass IN SELECT * FROM config.metabib_class LOOP
- -- RAISE NOTICE 'Emptying out %', fclass.name;
- EXECUTE $$DELETE FROM metabib.$$ || fclass.name || $$_field_entry WHERE source = $$ || bib_id;
+ EXECUTE $$DELETE FROM metabib.$$ || fclass.name || $$_field_entry WHERE source = $$ || bib_id || $$ AND field = ANY($1)$$ USING field_list;
END LOOP;
END IF;
IF NOT b_skip_facet THEN
- DELETE FROM metabib.facet_entry WHERE source = bib_id;
+ DELETE FROM metabib.facet_entry WHERE source = bib_id AND field = ANY(field_list);
END IF;
IF NOT b_skip_display THEN
- DELETE FROM metabib.display_entry WHERE source = bib_id;
+ DELETE FROM metabib.display_entry WHERE source = bib_id AND field = ANY(field_list);
END IF;
IF NOT b_skip_browse THEN
- DELETE FROM metabib.browse_entry_def_map WHERE source = bib_id;
+ DELETE FROM metabib.browse_entry_def_map WHERE source = bib_id AND def = ANY(field_list);
END IF;
END IF;
FOR ind_data IN SELECT * FROM biblio.extract_metabib_field_entry( bib_id, ' ', field_types, field_list ) LOOP
- -- don't store what has been normalized away
+ -- don't store what has been normalized away
CONTINUE WHEN ind_data.value IS NULL;
IF ind_data.field < 0 THEN
CONTINUE WHEN ind_data.sort_value IS NULL;
value_prepped := metabib.browse_normalize(ind_data.value, ind_data.field);
- IF ind_data.browse_nocase THEN
+ mbe_row.id := NULL; -- clear any hit left over from a previous loop pass
+ IF ind_data.browse_nocase THEN -- for "nocase" browse definitions, look for a preexisting row that matches case-insensitively on value and use that
SELECT INTO mbe_row * FROM metabib.browse_entry
WHERE evergreen.lowercase(value) = evergreen.lowercase(value_prepped) AND sort_value = ind_data.sort_value
ORDER BY sort_value, value LIMIT 1; -- gotta pick something, I guess
- ELSE
- SELECT INTO mbe_row * FROM metabib.browse_entry
- WHERE value = value_prepped AND sort_value = ind_data.sort_value;
END IF;
- IF FOUND THEN
+ IF mbe_row.id IS NOT NULL THEN -- asked to check for, and found, a "nocase" version to use
mbe_id := mbe_row.id;
- ELSE
+ ELSE -- otherwise, an UPSERT-protected variant
INSERT INTO metabib.browse_entry
( value, sort_value ) VALUES
- ( value_prepped, ind_data.sort_value );
-
- mbe_id := CURRVAL('metabib.browse_entry_id_seq'::REGCLASS);
+ ( value_prepped, ind_data.sort_value )
+ ON CONFLICT (sort_value, value) DO UPDATE SET sort_value = EXCLUDED.sort_value -- must update a row to return an existing id
+ RETURNING id INTO mbe_id;
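+ -- (DO NOTHING would return no row for a preexisting entry; the no-op
+ -- DO UPDATE is what lets RETURNING hand back the existing id.)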
END IF;
INSERT INTO metabib.browse_entry_def_map (entry, def, source, authority)
norm_attr_value := '{}'::TEXT[];
attr_vector_tmp := '{}'::INT[];
- SELECT * INTO ccvm_row FROM config.coded_value_map c WHERE c.ctype = attr_def.name LIMIT 1;
+ SELECT * INTO ccvm_row FROM config.coded_value_map c WHERE c.ctype = attr_def.name LIMIT 1;
IF attr_def.tag IS NOT NULL THEN -- tag (and optional subfield list) selection
SELECT ARRAY_AGG(value) INTO attr_value
FROM (SELECT * FROM metabib.full_rec ORDER BY tag, subfield) AS x
WHERE record = rid
AND tag LIKE attr_def.tag
AND CASE
- WHEN attr_def.sf_list IS NOT NULL
+ WHEN attr_def.sf_list IS NOT NULL
THEN POSITION(subfield IN attr_def.sf_list) > 0
ELSE TRUE
END
IF NOT jump_past AND attr_def.xpath IS NOT NULL THEN -- and xpath expression
SELECT INTO xfrm * FROM config.xml_transform WHERE name = attr_def.format;
-
+
-- See if we can skip the XSLT ... it's expensive
IF prev_xfrm IS NULL OR prev_xfrm <> xfrm.name THEN
-- Can't skip the transform
ELSE
transformed_xml := rmarc;
END IF;
-
+
prev_xfrm := xfrm.name;
END IF;
norm_attr_value := norm_attr_value || tmp_val;
END IF;
END LOOP;
-
+
IF attr_def.filter THEN
-- Create unknown uncontrolled values and find the IDs of the values
IF ccvm_row.id IS NULL THEN
requested attrs, and then add back the new
set of attr values. */
- IF ARRAY_LENGTH(pattr_list, 1) > 0 THEN
+ IF ARRAY_LENGTH(pattr_list, 1) > 0 THEN
SELECT vlist INTO attr_vector_tmp FROM metabib.record_attr_vector_list WHERE source = rid;
SELECT attr_vector_tmp - ARRAY_AGG(id::INT) INTO attr_vector_tmp FROM metabib.full_attr_id_map WHERE attr = ANY (pattr_list);
attr_vector := attr_vector || attr_vector_tmp;
END LOOP;
IF ARRAY_LENGTH(attr_vector, 1) > 0 THEN
- IF rdeleted THEN -- initial insert OR revivication
- DELETE FROM metabib.record_attr_vector_list WHERE source = rid;
- INSERT INTO metabib.record_attr_vector_list (source, vlist) VALUES (rid, attr_vector);
- ELSE
- UPDATE metabib.record_attr_vector_list SET vlist = attr_vector WHERE source = rid;
- END IF;
+ INSERT INTO metabib.record_attr_vector_list (source, vlist) VALUES (rid, attr_vector)
+ ON CONFLICT (source) DO UPDATE SET vlist = EXCLUDED.vlist;
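+ -- The upsert replaces the old delete-then-insert vs. update branching;
+ -- it relies on the uniqueness of metabib.record_attr_vector_list.source.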
END IF;
END;
$func$ LANGUAGE PLPGSQL;
-
--- AFTER UPDATE OR INSERT trigger for biblio.record_entry
-CREATE OR REPLACE FUNCTION biblio.indexing_ingest_or_delete () RETURNS TRIGGER AS $func$
+CREATE OR REPLACE FUNCTION metabib.indexing_delete (bib biblio.record_entry, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$
DECLARE
tmp_bool BOOL;
+ diag_detail TEXT;
+ diag_context TEXT;
BEGIN
+ PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.preserve_on_delete' AND enabled;
+ tmp_bool := FOUND;
- IF NEW.deleted THEN -- If this bib is deleted
-
- PERFORM * FROM config.internal_flag WHERE
- name = 'ingest.metarecord_mapping.preserve_on_delete' AND enabled;
-
- tmp_bool := FOUND; -- Just in case this is changed by some other statement
+ PERFORM metabib.remap_metarecord_for_bib(bib.id, bib.fingerprint, TRUE, tmp_bool);
- PERFORM metabib.remap_metarecord_for_bib( NEW.id, NEW.fingerprint, TRUE, tmp_bool );
-
- IF NOT tmp_bool THEN
- -- One needs to keep these around to support searches
- -- with the #deleted modifier, so one should turn on the named
- -- internal flag for that functionality.
- DELETE FROM metabib.record_attr_vector_list WHERE source = NEW.id;
- END IF;
-
- DELETE FROM authority.bib_linking WHERE bib = NEW.id; -- Avoid updating fields in bibs that are no longer visible
- DELETE FROM biblio.peer_bib_copy_map WHERE peer_record = NEW.id; -- Separate any multi-homed items
- DELETE FROM metabib.browse_entry_def_map WHERE source = NEW.id; -- Don't auto-suggest deleted bibs
- RETURN NEW; -- and we're done
+ IF NOT tmp_bool THEN
+ -- One needs to keep these around to support searches
+ -- with the #deleted modifier, so one should turn on the named
+ -- internal flag for that functionality.
+ DELETE FROM metabib.record_attr_vector_list WHERE source = bib.id;
END IF;
- IF TG_OP = 'UPDATE' AND OLD.deleted IS FALSE THEN -- re-ingest?
- PERFORM * FROM config.internal_flag WHERE name = 'ingest.reingest.force_on_same_marc' AND enabled;
+ DELETE FROM authority.bib_linking abl WHERE abl.bib = bib.id; -- Avoid updating fields in bibs that are no longer visible
+ DELETE FROM biblio.peer_bib_copy_map WHERE peer_record = bib.id; -- Separate any multi-homed items
+ DELETE FROM metabib.browse_entry_def_map WHERE source = bib.id; -- Don't auto-suggest deleted bibs
- IF NOT FOUND AND OLD.marc = NEW.marc THEN -- don't do anything if the MARC didn't change
- RETURN NEW;
- END IF;
- END IF;
+ RETURN TRUE;
+EXCEPTION WHEN OTHERS THEN
+ GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL,
+ diag_context = PG_EXCEPTION_CONTEXT;
+ RAISE WARNING '%\n%', diag_detail, diag_context;
+ RETURN FALSE;
+END;
+$func$ LANGUAGE PLPGSQL;
+
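+-- Illustrative call, mirroring how the queue processor below invokes it:
+--   SELECT metabib.indexing_delete(r.*, NULL)
+--     FROM biblio.record_entry r WHERE r.id = <bib id>;
+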
+CREATE OR REPLACE FUNCTION metabib.indexing_update (bib biblio.record_entry, insert_only BOOL DEFAULT FALSE, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$
+DECLARE
+ skip_facet BOOL := FALSE;
+ skip_display BOOL := FALSE;
+ skip_browse BOOL := FALSE;
+ skip_search BOOL := FALSE;
+ skip_auth BOOL := FALSE;
+ skip_full BOOL := FALSE;
+ skip_attrs BOOL := FALSE;
+ skip_luri BOOL := FALSE;
+ skip_mrmap BOOL := FALSE;
+ only_attrs TEXT[] := NULL;
+ only_fields INT[] := '{}'::INT[];
+ diag_detail TEXT;
+ diag_context TEXT;
+BEGIN
-- Record authority linking
+ SELECT extra LIKE '%skip_authority%' INTO skip_auth;
PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_linking' AND enabled;
- IF NOT FOUND THEN
- PERFORM biblio.map_authority_linking( NEW.id, NEW.marc );
+ IF NOT FOUND AND NOT skip_auth THEN
+ PERFORM biblio.map_authority_linking( bib.id, bib.marc );
END IF;
-- Flatten and insert the mfr data
+ SELECT extra LIKE '%skip_full_rec%' INTO skip_full;
PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_metabib_full_rec' AND enabled;
- IF NOT FOUND THEN
- PERFORM metabib.reingest_metabib_full_rec(NEW.id);
+ IF NOT FOUND AND NOT skip_full THEN
+ PERFORM metabib.reingest_metabib_full_rec(bib.id);
+ END IF;
- -- Now we pull out attribute data, which is dependent on the mfr for all but XPath-based fields
- PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_metabib_rec_descriptor' AND enabled;
- IF NOT FOUND THEN
- PERFORM metabib.reingest_record_attributes(NEW.id, NULL, NEW.marc, TG_OP = 'INSERT' OR OLD.deleted);
+ -- Now we pull out attribute data, which is dependent on the mfr for all but XPath-based fields
+ SELECT extra LIKE '%skip_attrs%' INTO skip_attrs;
+ PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_metabib_rec_descriptor' AND enabled;
+ IF NOT FOUND AND NOT skip_attrs THEN
+ IF extra ~ 'attr\(\s*(\w[ ,\w]*?)\s*\)' THEN
+ SELECT REGEXP_SPLIT_TO_ARRAY(
+ (REGEXP_MATCHES(extra, 'attr\(\s*(\w[ ,\w]*?)\s*\)'))[1],
+ '\s*,\s*'
+ ) INTO only_attrs;
END IF;
+
+ PERFORM metabib.reingest_record_attributes(bib.id, only_attrs, bib.marc, insert_only);
END IF;
-- Gather and insert the field entry data
- PERFORM metabib.reingest_metabib_field_entries(NEW.id);
+ SELECT extra LIKE '%skip_facet%' INTO skip_facet;
+ SELECT extra LIKE '%skip_display%' INTO skip_display;
+ SELECT extra LIKE '%skip_browse%' INTO skip_browse;
+ SELECT extra LIKE '%skip_search%' INTO skip_search;
+
+ IF extra ~ 'field_list\(\s*(\d[ ,\d]+)\s*\)' THEN
+ SELECT REGEXP_SPLIT_TO_ARRAY(
+ (REGEXP_MATCHES(extra, 'field_list\(\s*(\d[ ,\d]+)\s*\)'))[1],
+ '\s*,\s*'
+ )::INT[] INTO only_fields;
+ END IF;
+
+ IF NOT skip_facet OR NOT skip_display OR NOT skip_browse OR NOT skip_search THEN
+ PERFORM metabib.reingest_metabib_field_entries(bib.id, skip_facet, skip_display, skip_browse, skip_search, only_fields);
+ END IF;
-- Located URI magic
+ SELECT extra LIKE '%skip_luri%' INTO skip_luri;
PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_located_uri' AND enabled;
- IF NOT FOUND THEN PERFORM biblio.extract_located_uris( NEW.id, NEW.marc, NEW.editor ); END IF;
+ IF NOT FOUND AND NOT skip_luri THEN PERFORM biblio.extract_located_uris( bib.id, bib.marc, bib.editor ); END IF;
-- (re)map metarecord-bib linking
- IF TG_OP = 'INSERT' THEN -- if not deleted and performing an insert, check for the flag
+ SELECT extra LIKE '%skip_mrmap%' INTO skip_mrmap;
+ IF insert_only THEN -- if not deleted and performing an insert, check for the flag
PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.skip_on_insert' AND enabled;
- IF NOT FOUND THEN
- PERFORM metabib.remap_metarecord_for_bib( NEW.id, NEW.fingerprint );
+ IF NOT FOUND AND NOT skip_mrmap THEN
+ PERFORM metabib.remap_metarecord_for_bib( bib.id, bib.fingerprint );
END IF;
ELSE -- we're doing an update, and we're not deleted, remap
PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.skip_on_update' AND enabled;
+ IF NOT FOUND AND NOT skip_mrmap THEN
+ PERFORM metabib.remap_metarecord_for_bib( bib.id, bib.fingerprint );
+ END IF;
+ END IF;
+
+ RETURN TRUE;
+EXCEPTION WHEN OTHERS THEN
+ GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL,
+ diag_context = PG_EXCEPTION_CONTEXT;
+ RAISE WARNING '%\n%', diag_detail, diag_context;
+ RETURN FALSE;
+END;
+$func$ LANGUAGE PLPGSQL;
+
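+-- The "extra" argument is a free-form directive string scanned with LIKE and
+-- regexes; a sketch (the attribute names and field IDs here are illustrative):
+--   SELECT metabib.indexing_update(r.*, FALSE,
+--       'skip_browse skip_search attr(item_type,item_form) field_list(6,8)')
+--     FROM biblio.record_entry r WHERE r.id = <bib id>;
+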
+CREATE OR REPLACE FUNCTION authority.indexing_delete (auth authority.record_entry, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$
+DECLARE
+ tmp_bool BOOL;
+ diag_detail TEXT;
+ diag_context TEXT;
+BEGIN
+ DELETE FROM authority.bib_linking WHERE authority = auth.id; -- Avoid updating fields in bibs that are no longer visible
+ DELETE FROM authority.full_rec WHERE record = auth.id; -- Avoid validating fields against deleted authority records
+ DELETE FROM authority.simple_heading WHERE record = auth.id;
+ -- Should we remove matching $0 from controlled fields at the same time?
+
+ -- XXX What do we do about the actual linking subfields present in
+ -- authority records that target this one when this happens?
+ DELETE FROM authority.authority_linking WHERE source = auth.id OR target = auth.id;
+
+ RETURN TRUE;
+EXCEPTION WHEN OTHERS THEN
+ GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL,
+ diag_context = PG_EXCEPTION_CONTEXT;
+ RAISE WARNING '%\n%', diag_detail, diag_context;
+ RETURN FALSE;
+END;
+$func$ LANGUAGE PLPGSQL;
+
+CREATE OR REPLACE FUNCTION authority.indexing_update (auth authority.record_entry, insert_only BOOL DEFAULT FALSE, old_heading TEXT DEFAULT NULL) RETURNS BOOL AS $func$
+DECLARE
+ ashs authority.simple_heading%ROWTYPE;
+ mbe_row metabib.browse_entry%ROWTYPE;
+ mbe_id BIGINT;
+ ash_id BIGINT;
+ diag_detail TEXT;
+ diag_context TEXT;
+BEGIN
+
+ -- Unless there's a setting stopping us, propagate these updates to any linked bib records when the heading changes
+ PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_auto_update' AND enabled;
+
+ IF NOT FOUND AND auth.heading <> old_heading THEN
+ PERFORM authority.propagate_changes(auth.id);
+ END IF;
+
+ IF NOT insert_only THEN
+ DELETE FROM authority.authority_linking WHERE source = auth.id;
+ DELETE FROM authority.simple_heading WHERE record = auth.id;
+ END IF;
+
+ INSERT INTO authority.authority_linking (source, target, field)
+ SELECT source, target, field FROM authority.calculate_authority_linking(
+ auth.id, auth.control_set, auth.marc::XML
+ );
+
+ FOR ashs IN SELECT * FROM authority.simple_heading_set(auth.marc) LOOP
+
+ INSERT INTO authority.simple_heading (record,atag,value,sort_value,thesaurus)
+ VALUES (ashs.record, ashs.atag, ashs.value, ashs.sort_value, ashs.thesaurus);
+ ash_id := CURRVAL('authority.simple_heading_id_seq'::REGCLASS);
+
+ SELECT INTO mbe_row * FROM metabib.browse_entry
+ WHERE value = ashs.value AND sort_value = ashs.sort_value;
+
+ IF FOUND THEN
+ mbe_id := mbe_row.id;
+ ELSE
+ INSERT INTO metabib.browse_entry
+ ( value, sort_value ) VALUES
+ ( ashs.value, ashs.sort_value );
+
+ mbe_id := CURRVAL('metabib.browse_entry_id_seq'::REGCLASS);
+ END IF;
+
+ INSERT INTO metabib.browse_entry_simple_heading_map (entry,simple_heading) VALUES (mbe_id,ash_id);
+
+ END LOOP;
+
+ -- Flatten and insert the afr data
+ PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_full_rec' AND enabled;
+ IF NOT FOUND THEN
+ PERFORM authority.reingest_authority_full_rec(auth.id);
+ PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_rec_descriptor' AND enabled;
IF NOT FOUND THEN
- PERFORM metabib.remap_metarecord_for_bib( NEW.id, NEW.fingerprint );
+ PERFORM authority.reingest_authority_rec_descriptor(auth.id);
END IF;
END IF;
- RETURN NEW;
+ RETURN TRUE;
+EXCEPTION WHEN OTHERS THEN
+ GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL,
+ diag_context = PG_EXCEPTION_CONTEXT;
+ RAISE WARNING '%\n%', diag_detail, diag_context;
+ RETURN FALSE;
END;
$func$ LANGUAGE PLPGSQL;
hold INT NOT NULL REFERENCES action.hold_request (id) ON UPDATE CASCADE ON DELETE CASCADE
);
+CREATE TABLE action.ingest_queue (
+ id SERIAL PRIMARY KEY,
+ created TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+ run_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+ who INT REFERENCES actor.usr (id) ON UPDATE CASCADE ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED,
+ start_time TIMESTAMPTZ,
+ end_time TIMESTAMPTZ,
+ threads INT,
+ why TEXT
+);
+
+CREATE TABLE action.ingest_queue_entry (
+ id BIGSERIAL PRIMARY KEY,
+ record BIGINT NOT NULL, -- points to a record id of the appropriate record_type
+ record_type TEXT NOT NULL,
+ action TEXT NOT NULL,
+ run_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+ state_data TEXT NOT NULL DEFAULT '',
+ queue INT REFERENCES action.ingest_queue (id) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
+ override_by BIGINT REFERENCES action.ingest_queue_entry (id) ON UPDATE CASCADE ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED,
+ ingest_time TIMESTAMPTZ,
+ fail_time TIMESTAMPTZ
+);
+CREATE UNIQUE INDEX record_pending_once ON action.ingest_queue_entry (record_type,record,state_data) WHERE ingest_time IS NULL AND override_by IS NULL;
+CREATE INDEX entry_override_by_idx ON action.ingest_queue_entry (override_by) WHERE override_by IS NOT NULL;
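+-- Together these allow at most one live (un-ingested, un-overridden) entry
+-- per (record_type, record, state_data) and speed up override-chain lookups.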
+
+CREATE OR REPLACE FUNCTION action.enqueue_ingest_entry (
+ record_id BIGINT,
+ rtype TEXT DEFAULT 'biblio',
+ when_to_run TIMESTAMPTZ DEFAULT NOW(),
+ queue_id INT DEFAULT NULL,
+ ingest_action TEXT DEFAULT 'update', -- will be the most common?
+ old_state_data TEXT DEFAULT ''
+) RETURNS BOOL AS $F$
+DECLARE
+ new_entry action.ingest_queue_entry%ROWTYPE;
+ prev_del_entry action.ingest_queue_entry%ROWTYPE;
+ diag_detail TEXT;
+ diag_context TEXT;
+BEGIN
+
+ IF ingest_action = 'delete' THEN
+ -- first see if there is an outstanding entry
+ SELECT * INTO prev_del_entry
+ FROM action.ingest_queue_entry qe
+ WHERE qe.record = record_id
+ AND qe.state_data = old_state_data
+ AND qe.record_type = rtype
+ AND qe.ingest_time IS NULL
+ AND qe.override_by IS NULL;
+ END IF;
+
+ WITH existing_queue_entry_cte AS (
+ SELECT queue_id AS queue,
+ rtype AS record_type,
+ record_id AS record,
+ qe.id AS override_by,
+ ingest_action AS action,
+ q.run_at AS run_at,
+ old_state_data AS state_data
+ FROM action.ingest_queue_entry qe
+ JOIN action.ingest_queue q ON (qe.queue = q.id)
+ WHERE qe.record = record_id
+ AND q.end_time IS NULL
+ AND qe.record_type = rtype
+ AND qe.state_data = old_state_data
+ AND qe.ingest_time IS NULL
+ AND qe.fail_time IS NULL
+ AND qe.override_by IS NULL
+ ), existing_nonqueue_entry_cte AS (
+ SELECT queue_id AS queue,
+ rtype AS record_type,
+ record_id AS record,
+ qe.id AS override_by,
+ ingest_action AS action,
+ qe.run_at AS run_at,
+ old_state_data AS state_data
+ FROM action.ingest_queue_entry qe
+ WHERE qe.record = record_id
+ AND qe.queue IS NULL
+ AND qe.record_type = rtype
+ AND qe.state_data = old_state_data
+ AND qe.ingest_time IS NULL
+ AND qe.fail_time IS NULL
+ AND qe.override_by IS NULL
+ ), new_entry_cte AS (
+ SELECT * FROM existing_queue_entry_cte
+ UNION ALL
+ SELECT * FROM existing_nonqueue_entry_cte
+ UNION ALL
+ SELECT queue_id, rtype, record_id, NULL, ingest_action, COALESCE(when_to_run,NOW()), old_state_data
+ ), insert_entry_cte AS (
+ INSERT INTO action.ingest_queue_entry
+ (queue, record_type, record, override_by, action, run_at, state_data)
+ SELECT queue, record_type, record, override_by, action, run_at, state_data FROM new_entry_cte
+ ORDER BY 4 NULLS LAST, 6
+ LIMIT 1
+ RETURNING *
+ ) SELECT * INTO new_entry FROM insert_entry_cte;
+
+ IF prev_del_entry.id IS NOT NULL THEN -- later delete overrides earlier unapplied entry
+ UPDATE action.ingest_queue_entry
+ SET override_by = new_entry.id
+ WHERE id = prev_del_entry.id;
+
+ UPDATE action.ingest_queue_entry
+ SET override_by = NULL
+ WHERE id = new_entry.id;
+
+ ELSIF new_entry.override_by IS NOT NULL THEN
+ RETURN TRUE; -- already handled, don't notify
+ END IF;
+
+ NOTIFY queued_ingest;
+
+ RETURN TRUE;
+EXCEPTION WHEN OTHERS THEN
+ GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL,
+ diag_context = PG_EXCEPTION_CONTEXT;
+ RAISE WARNING '%\n%', diag_detail, diag_context;
+ RETURN FALSE;
+END;
+$F$ LANGUAGE PLPGSQL;
+
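+-- Illustrative direct enqueue of a bib update (normally the triggers below
+-- take care of this):
+--   SELECT action.enqueue_ingest_entry(<bib id>, 'biblio', NOW(), NULL, 'update', '');
+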
+CREATE OR REPLACE FUNCTION action.process_ingest_queue_entry (qeid BIGINT) RETURNS BOOL AS $func$
+DECLARE
+ ingest_success BOOL := NULL;
+ qe action.ingest_queue_entry%ROWTYPE;
+BEGIN
+
+ SELECT * INTO qe FROM action.ingest_queue_entry WHERE id = qeid;
+ IF qe.ingest_time IS NOT NULL OR qe.override_by IS NOT NULL THEN
+ RETURN TRUE; -- Already done
+ END IF;
+
+ IF qe.action = 'delete' THEN
+ IF qe.record_type = 'biblio' THEN
+ SELECT metabib.indexing_delete(r.*, qe.state_data) INTO ingest_success FROM biblio.record_entry r WHERE r.id = qe.record;
+ ELSIF qe.record_type = 'authority' THEN
+ SELECT authority.indexing_delete(r.*, qe.state_data) INTO ingest_success FROM authority.record_entry r WHERE r.id = qe.record;
+ END IF;
+ ELSE
+ IF qe.record_type = 'biblio' THEN
+ IF qe.action = 'propagate' THEN
+ SELECT authority.apply_propagate_changes(qe.state_data::BIGINT, qe.record) INTO ingest_success;
+ ELSE
+ SELECT metabib.indexing_update(r.*, qe.action = 'insert', qe.state_data) INTO ingest_success FROM biblio.record_entry r WHERE r.id = qe.record;
+ END IF;
+ ELSIF qe.record_type = 'authority' THEN
+ SELECT authority.indexing_update(r.*, qe.action = 'insert', qe.state_data) INTO ingest_success FROM authority.record_entry r WHERE r.id = qe.record;
+ END IF;
+ END IF;
+
+ IF NOT ingest_success THEN
+ UPDATE action.ingest_queue_entry SET fail_time = NOW() WHERE id = qe.id;
+ PERFORM * FROM config.internal_flag WHERE name = 'ingest.queued.abort_on_error' AND enabled;
+ IF FOUND THEN
+ RAISE EXCEPTION 'Ingest action of % on %.record_entry % for queue entry % failed', qe.action, qe.record_type, qe.record, qe.id;
+ ELSE
+ RAISE WARNING 'Ingest action of % on %.record_entry % for queue entry % failed', qe.action, qe.record_type, qe.record, qe.id;
+ END IF;
+ ELSE
+ UPDATE action.ingest_queue_entry SET ingest_time = NOW() WHERE id = qe.id;
+ END IF;
+
+ RETURN ingest_success;
+END;
+$func$ LANGUAGE PLPGSQL;
+
+
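+-- A queued-ingest consumer is expected to LISTEN on the queued_ingest
+-- channel and then drain due entries; a minimal sketch:
+--   SELECT action.process_ingest_queue_entry(id)
+--     FROM action.ingest_queue_entry
+--     WHERE run_at <= NOW() AND ingest_time IS NULL
+--       AND fail_time IS NULL AND override_by IS NULL
+--     ORDER BY run_at, id;
+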
+CREATE OR REPLACE FUNCTION action.complete_duplicated_entries () RETURNS TRIGGER AS $F$
+BEGIN
+ IF NEW.ingest_time IS NOT NULL THEN
+ UPDATE action.ingest_queue_entry SET ingest_time = NEW.ingest_time WHERE override_by = NEW.id;
+ END IF;
+
+ RETURN NULL;
+END;
+$F$ LANGUAGE PLPGSQL;
+
+CREATE TRIGGER complete_duplicated_entries_trigger
+ AFTER UPDATE ON action.ingest_queue_entry
+ FOR EACH ROW WHEN (NEW.override_by IS NULL)
+ EXECUTE PROCEDURE action.complete_duplicated_entries();
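+-- When a live entry completes, stamp the same ingest_time on everything it
+-- overrode so superseded work is not re-attempted.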
+
+CREATE OR REPLACE FUNCTION action.set_ingest_queue(INT) RETURNS VOID AS $$
+ $_SHARED{"ingest_queue_id"} = $_[0];
+$$ LANGUAGE plperlu;
+
+CREATE OR REPLACE FUNCTION action.get_ingest_queue() RETURNS INT AS $$
+ return $_SHARED{"ingest_queue_id"};
+$$ LANGUAGE plperlu;
+
+CREATE OR REPLACE FUNCTION action.clear_ingest_queue() RETURNS VOID AS $$
+ delete($_SHARED{"ingest_queue_id"});
+$$ LANGUAGE plperlu;
+
+CREATE OR REPLACE FUNCTION action.set_queued_ingest_force(TEXT) RETURNS VOID AS $$
+ $_SHARED{"ingest_queue_force"} = $_[0];
+$$ LANGUAGE plperlu;
+
+CREATE OR REPLACE FUNCTION action.get_queued_ingest_force() RETURNS TEXT AS $$
+ return $_SHARED{"ingest_queue_force"};
+$$ LANGUAGE plperlu;
+
+CREATE OR REPLACE FUNCTION action.clear_queued_ingest_force() RETURNS VOID AS $$
+ delete($_SHARED{"ingest_queue_force"});
+$$ LANGUAGE plperlu;
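+-- These helpers stash session-local state in plperl's %_SHARED hash.
+-- Typical bracketing, as used by authority.apply_propagate_changes below:
+--   SELECT action.set_queued_ingest_force('ingest.queued.biblio.update.disabled');
+--   -- ... UPDATE biblio.record_entry ... (ingested inline) ...
+--   SELECT action.clear_queued_ingest_force();
+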
+
+CREATE OR REPLACE FUNCTION authority.propagate_changes
+ (aid BIGINT, bid BIGINT) RETURNS BIGINT AS $func$
+DECLARE
+ queuing_success BOOL := FALSE;
+BEGIN
+
+ PERFORM 1 FROM config.global_flag
+ WHERE name IN ('ingest.queued.all','ingest.queued.authority.propagate')
+ AND enabled;
+
+ IF FOUND THEN
+ -- XXX enqueue special 'propagate' bib action
+ SELECT action.enqueue_ingest_entry( bid, 'biblio', NOW(), NULL, 'propagate', aid::TEXT) INTO queuing_success;
+
+ IF queuing_success THEN
+ RETURN aid;
+ END IF;
+ END IF;
+
+ PERFORM authority.apply_propagate_changes(aid, bid);
+ RETURN aid;
+END;
+$func$ LANGUAGE PLPGSQL;
+
+CREATE OR REPLACE FUNCTION authority.apply_propagate_changes
+ (aid BIGINT, bid BIGINT) RETURNS BIGINT AS $func$
+DECLARE
+ bib_forced BOOL := FALSE;
+ bib_rec biblio.record_entry%ROWTYPE;
+ new_marc TEXT;
+BEGIN
+
+ SELECT INTO bib_rec * FROM biblio.record_entry WHERE id = bid;
+
+ new_marc := vandelay.merge_record_xml(
+ bib_rec.marc, authority.generate_overlay_template(aid));
+
+ IF new_marc = bib_rec.marc THEN
+ -- Authority record change had no impact on this bib record.
+ -- Nothing left to do.
+ RETURN aid;
+ END IF;
+
+ PERFORM 1 FROM config.global_flag
+ WHERE name = 'ingest.disable_authority_auto_update_bib_meta'
+ AND enabled;
+
+ IF NOT FOUND THEN
+ -- update the bib record editor and edit_date
+ bib_rec.editor := (
+ SELECT editor FROM authority.record_entry WHERE id = aid);
+ bib_rec.edit_date = NOW();
+ END IF;
+
+ PERFORM action.set_queued_ingest_force('ingest.queued.biblio.update.disabled');
+
+ UPDATE biblio.record_entry SET
+ marc = new_marc,
+ editor = bib_rec.editor,
+ edit_date = bib_rec.edit_date
+ WHERE id = bid;
+
+ PERFORM action.clear_queued_ingest_force();
+
+ RETURN aid;
+
+END;
+$func$ LANGUAGE PLPGSQL;
+
+CREATE OR REPLACE FUNCTION evergreen.indexing_ingest_or_delete () RETURNS TRIGGER AS $func$
+DECLARE
+ old_state_data TEXT := '';
+ new_action TEXT;
+ queuing_force TEXT;
+ queuing_flag_name TEXT;
+ queuing_flag BOOL := FALSE;
+ queuing_success BOOL := FALSE;
+ ingest_success BOOL := FALSE;
+ ingest_queue INT;
+BEGIN
+
+ -- Identify the ingest action type
+ IF TG_OP = 'UPDATE' THEN
+
+ -- Gather type-specific data for later use
+ IF TG_TABLE_SCHEMA = 'authority' THEN
+ old_state_data = OLD.heading;
+ END IF;
+
+ IF NOT OLD.deleted THEN -- maybe reingest?
+ IF NEW.deleted THEN
+ new_action = 'delete'; -- nope, delete
+ ELSE
+ new_action = 'update'; -- yes, update
+ END IF;
+ ELSIF NOT NEW.deleted THEN
+ new_action = 'insert'; -- revivify, AKA insert
+ ELSE
+ RETURN NEW; -- was and is still deleted, don't ingest
+ END IF;
+ ELSIF TG_OP = 'INSERT' THEN
+ new_action = 'insert'; -- brand new
+ ELSE
+ RETURN OLD; -- really deleting the record
+ END IF;
+
+ queuing_flag_name := 'ingest.queued.'||TG_TABLE_SCHEMA||'.'||new_action;
+ -- See if we should be queuing anything
+ SELECT enabled INTO queuing_flag
+ FROM config.internal_flag
+ WHERE name IN ('ingest.queued.all','ingest.queued.'||TG_TABLE_SCHEMA||'.all', queuing_flag_name)
+ AND enabled
+ LIMIT 1;
+
+ SELECT action.get_queued_ingest_force() INTO queuing_force;
+ IF queuing_flag IS NULL AND queuing_force = queuing_flag_name THEN
+ queuing_flag := TRUE;
+ END IF;
+
+ -- Specific queuing actions can also be forcibly disabled (e.g., by a caller or during authority propagation)
+ IF queuing_force = queuing_flag_name||'.disabled' THEN
+ queuing_flag := FALSE;
+ END IF;
+
+ -- And if we should be queuing ...
+ IF queuing_flag THEN
+ ingest_queue := action.get_ingest_queue();
+
+ -- ... but this is NOT a named or forced queue request (marc editor update, say, or vandelay overlay)...
+ IF queuing_force IS NULL AND ingest_queue IS NULL AND new_action = 'update' THEN -- re-ingest?
+
+ PERFORM * FROM config.internal_flag WHERE name = 'ingest.reingest.force_on_same_marc' AND enabled;
+
+ -- ... then don't do anything if ingest.reingest.force_on_same_marc is not enabled and the MARC hasn't changed
+ IF NOT FOUND AND OLD.marc = NEW.marc THEN
+ RETURN NEW;
+ END IF;
+ END IF;
+
+ -- Otherwise, attempt to enqueue
+ SELECT action.enqueue_ingest_entry( NEW.id, TG_TABLE_SCHEMA, NOW(), ingest_queue, new_action, old_state_data) INTO queuing_success;
+ END IF;
+
+ -- If queuing was not requested, or failed for some reason, do it live.
+ IF NOT queuing_success THEN
+ IF queuing_flag THEN
+ RAISE WARNING 'Enqueuing of %.record_entry % for ingest failed, attempting direct ingest', TG_TABLE_SCHEMA, NEW.id;
+ END IF;
+
+ IF new_action = 'delete' THEN
+ IF TG_TABLE_SCHEMA = 'biblio' THEN
+ SELECT metabib.indexing_delete(NEW.*, old_state_data) INTO ingest_success;
+ ELSIF TG_TABLE_SCHEMA = 'authority' THEN
+ SELECT authority.indexing_delete(NEW.*, old_state_data) INTO ingest_success;
+ END IF;
+ ELSE
+ IF TG_TABLE_SCHEMA = 'biblio' THEN
+ SELECT metabib.indexing_update(NEW.*, new_action = 'insert', old_state_data) INTO ingest_success;
+ ELSIF TG_TABLE_SCHEMA = 'authority' THEN
+ SELECT authority.indexing_update(NEW.*, new_action = 'insert', old_state_data) INTO ingest_success;
+ END IF;
+ END IF;
+
+ IF NOT ingest_success THEN
+ PERFORM * FROM config.internal_flag WHERE name = 'ingest.queued.abort_on_error' AND enabled;
+ IF FOUND THEN
+ RAISE EXCEPTION 'Ingest of %.record_entry % failed', TG_TABLE_SCHEMA, NEW.id;
+ ELSE
+ RAISE WARNING 'Ingest of %.record_entry % failed', TG_TABLE_SCHEMA, NEW.id;
+ END IF;
+ END IF;
+ END IF;
+
+ RETURN NEW;
+END;
+$func$ LANGUAGE PLPGSQL;
+
+CREATE TRIGGER aaa_indexing_ingest_or_delete AFTER INSERT OR UPDATE ON biblio.record_entry FOR EACH ROW EXECUTE PROCEDURE evergreen.indexing_ingest_or_delete ();
+CREATE TRIGGER aaa_auth_ingest_or_delete AFTER INSERT OR UPDATE ON authority.record_entry FOR EACH ROW EXECUTE PROCEDURE evergreen.indexing_ingest_or_delete ();
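+-- The aaa_ prefix keeps these firing ahead of other row-level triggers,
+-- since PostgreSQL fires same-event triggers in name order.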
+
COMMIT;
)
);
+INSERT INTO config.global_flag (name, enabled, label) VALUES (
+ 'ingest.queued.max_threads', TRUE,
+ oils_i18n_gettext(
+ 'ingest.queued.max_threads',
+ 'Queued Ingest: Maximum number of database workers allowed for queued ingest processes',
+ 'cgf',
+ 'label'
+ )),(
+ 'ingest.queued.abort_on_error', FALSE,
+ oils_i18n_gettext(
+ 'ingest.queued.abort_on_error',
+ 'Queued Ingest: Abort transaction on ingest error rather than simply logging an error',
+ 'cgf',
+ 'label'
+ )),(
+ 'ingest.queued.authority.propagate', FALSE,
+ oils_i18n_gettext(
+ 'ingest.queued.authority.propagate',
+ 'Queued Ingest: Queue all bib record updates on authority change propagation, even if bib queuing is not generally enabled',
+ 'cgf',
+ 'label'
+ )),(
+ 'ingest.queued.all', FALSE,
+ oils_i18n_gettext(
+ 'ingest.queued.all',
+ 'Queued Ingest: Use Queued Ingest for all bib and authority record ingest',
+ 'cgf',
+ 'label'
+ )),(
+ 'ingest.queued.biblio.all', FALSE,
+ oils_i18n_gettext(
+ 'ingest.queued.biblio.all',
+ 'Queued Ingest: Use Queued Ingest for all bib record ingest',
+ 'cgf',
+ 'label'
+ )),(
+ 'ingest.queued.authority.all', FALSE,
+ oils_i18n_gettext(
+ 'ingest.queued.authority.all',
+ 'Queued Ingest: Use Queued Ingest for all authority record ingest',
+ 'cgf',
+ 'label'
+ )),(
+ 'ingest.queued.biblio.insert.marc_edit_inline', TRUE,
+ oils_i18n_gettext(
+ 'ingest.queued.biblio.insert.marc_edit_inline',
+ 'Queued Ingest: Do NOT use Queued Ingest when creating a new bib, or undeleting a bib, via the MARC editor',
+ 'cgf',
+ 'label'
+ )),(
+ 'ingest.queued.biblio.insert', FALSE,
+ oils_i18n_gettext(
+ 'ingest.queued.biblio.insert',
+ 'Queued Ingest: Use Queued Ingest for bib record ingest on insert and undelete',
+ 'cgf',
+ 'label'
+ )),(
+ 'ingest.queued.authority.insert', FALSE,
+ oils_i18n_gettext(
+ 'ingest.queued.authority.insert',
+ 'Queued Ingest: Use Queued Ingest for authority record ingest on insert and undelete',
+ 'cgf',
+ 'label'
+ )),(
+ 'ingest.queued.biblio.update.marc_edit_inline', TRUE,
+ oils_i18n_gettext(
+ 'ingest.queued.biblio.update.marc_edit_inline',
+ 'Queued Ingest: Do NOT use Queued Ingest when editing bib records via the MARC editor',
+ 'cgf',
+ 'label'
+ )),(
+ 'ingest.queued.biblio.update', FALSE,
+ oils_i18n_gettext(
+ 'ingest.queued.biblio.update',
+ 'Queued Ingest: Use Queued Ingest for bib record ingest on update',
+ 'cgf',
+ 'label'
+ )),(
+ 'ingest.queued.authority.update', FALSE,
+ oils_i18n_gettext(
+ 'ingest.queued.authority.update',
+ 'Queued Ingest: Use Queued Ingest for authority record ingest on update',
+ 'cgf',
+ 'label'
+ )),(
+ 'ingest.queued.biblio.delete', FALSE,
+ oils_i18n_gettext(
+ 'ingest.queued.biblio.delete',
+ 'Queued Ingest: Use Queued Ingest for bib record ingest on delete',
+ 'cgf',
+ 'label'
+ )),(
+ 'ingest.queued.authority.delete', FALSE,
+ oils_i18n_gettext(
+ 'ingest.queued.authority.delete',
+ 'Queued Ingest: Use Queued Ingest for authority record ingest on delete',
+ 'cgf',
+ 'label'
+ )
+);
+
+UPDATE config.global_flag SET value = '20' WHERE name = 'ingest.queued.max_threads';
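+-- Seed a default cap of 20 concurrent queued-ingest database workers; the
+-- flag's value column holds the limit.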
-- Ingest triggers
CREATE TRIGGER fingerprint_tgr BEFORE INSERT OR UPDATE ON biblio.record_entry FOR EACH ROW EXECUTE PROCEDURE biblio.fingerprint_trigger ('eng','BKS');
-CREATE TRIGGER aaa_indexing_ingest_or_delete AFTER INSERT OR UPDATE ON biblio.record_entry FOR EACH ROW EXECUTE PROCEDURE biblio.indexing_ingest_or_delete ();
CREATE TRIGGER bbb_simple_rec_trigger AFTER INSERT OR UPDATE OR DELETE ON biblio.record_entry FOR EACH ROW EXECUTE PROCEDURE reporter.simple_rec_trigger ();
CREATE TRIGGER map_thesaurus_to_control_set BEFORE INSERT OR UPDATE ON authority.record_entry FOR EACH ROW EXECUTE PROCEDURE authority.map_thesaurus_to_control_set ();
-CREATE TRIGGER aaa_auth_ingest_or_delete AFTER INSERT OR UPDATE ON authority.record_entry FOR EACH ROW EXECUTE PROCEDURE authority.indexing_ingest_or_delete ();
-- Utility routines, callable via cstore
--- /dev/null
+BEGIN;
+
+INSERT INTO config.global_flag (name, enabled, label) VALUES (
+ 'ingest.queued.max_threads', TRUE,
+ oils_i18n_gettext(
+ 'ingest.queued.max_threads',
+ 'Queued Ingest: Maximum number of database workers allowed for queued ingest processes',
+ 'cgf',
+ 'label'
+ )),(
+ 'ingest.queued.abort_on_error', FALSE,
+ oils_i18n_gettext(
+ 'ingest.queued.abort_on_error',
+ 'Queued Ingest: Abort transaction on ingest error rather than simply logging an error',
+ 'cgf',
+ 'label'
+ )),(
+ 'ingest.queued.authority.propagate', FALSE,
+ oils_i18n_gettext(
+ 'ingest.queued.authority.propagate',
+ 'Queued Ingest: Queue all bib record updates on authority change propagation, even if bib queuing is not generally enabled',
+ 'cgf',
+ 'label'
+ )),(
+ 'ingest.queued.all', FALSE,
+ oils_i18n_gettext(
+ 'ingest.queued.all',
+ 'Queued Ingest: Use Queued Ingest for all bib and authority record ingest',
+ 'cgf',
+ 'label'
+ )),(
+ 'ingest.queued.biblio.all', FALSE,
+ oils_i18n_gettext(
+ 'ingest.queued.biblio.all',
+ 'Queued Ingest: Use Queued Ingest for all bib record ingest',
+ 'cgf',
+ 'label'
+ )),(
+ 'ingest.queued.authority.all', FALSE,
+ oils_i18n_gettext(
+ 'ingest.queued.authority.all',
+ 'Queued Ingest: Use Queued Ingest for all authority record ingest',
+ 'cgf',
+ 'label'
+ )),(
+ 'ingest.queued.biblio.insert.marc_edit_inline', TRUE,
+ oils_i18n_gettext(
+ 'ingest.queued.biblio.insert.marc_edit_inline',
+ 'Queued Ingest: Do NOT use Queued Ingest when creating a new bib, or undeleting a bib, via the MARC editor',
+ 'cgf',
+ 'label'
+ )),(
+ 'ingest.queued.biblio.insert', FALSE,
+ oils_i18n_gettext(
+ 'ingest.queued.biblio.insert',
+ 'Queued Ingest: Use Queued Ingest for bib record ingest on insert and undelete',
+ 'cgf',
+ 'label'
+ )),(
+ 'ingest.queued.authority.insert', FALSE,
+ oils_i18n_gettext(
+ 'ingest.queued.authority.insert',
+ 'Queued Ingest: Use Queued Ingest for authority record ingest on insert and undelete',
+ 'cgf',
+ 'label'
+ )),(
+ 'ingest.queued.biblio.update.marc_edit_inline', TRUE,
+ oils_i18n_gettext(
+ 'ingest.queued.biblio.update.marc_edit_inline',
+ 'Queued Ingest: Do NOT use Queued Ingest when editing bib records via the MARC editor',
+ 'cgf',
+ 'label'
+ )),(
+ 'ingest.queued.biblio.update', FALSE,
+ oils_i18n_gettext(
+ 'ingest.queued.biblio.update',
+ 'Queued Ingest: Use Queued Ingest for bib record ingest on update',
+ 'cgf',
+ 'label'
+ )),(
+ 'ingest.queued.authority.update', FALSE,
+ oils_i18n_gettext(
+ 'ingest.queued.authority.update',
+ 'Queued Ingest: Use Queued Ingest for authority record ingest on update',
+ 'cgf',
+ 'label'
+ )),(
+ 'ingest.queued.biblio.delete', FALSE,
+ oils_i18n_gettext(
+ 'ingest.queued.biblio.delete',
+ 'Queued Ingest: Use Queued Ingest for bib record ingest on delete',
+ 'cgf',
+ 'label'
+ )),(
+ 'ingest.queued.authority.delete', FALSE,
+ oils_i18n_gettext(
+ 'ingest.queued.authority.delete',
+ 'Queued Ingest: Use Queued Ingest for authority record ingest on delete',
+ 'cgf',
+ 'label'
+ )
+);
+
+UPDATE config.global_flag SET value = '20' WHERE name = 'ingest.queued.max_threads';
+
+CREATE TABLE action.ingest_queue (
+ id SERIAL PRIMARY KEY,
+ created TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+ run_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+ who INT REFERENCES actor.usr (id) ON UPDATE CASCADE ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED,
+ start_time TIMESTAMPTZ,
+ end_time TIMESTAMPTZ,
+ threads INT,
+ why TEXT
+);
+
+CREATE TABLE action.ingest_queue_entry (
+ id BIGSERIAL PRIMARY KEY,
+ record BIGINT NOT NULL, -- points to a record id of the appropriate record_type
+ record_type TEXT NOT NULL,
+ action TEXT NOT NULL,
+ run_at TIMESTAMPTZ NOT NULL DEFAULT NOW(),
+ state_data TEXT NOT NULL DEFAULT '',
+ queue INT REFERENCES action.ingest_queue (id) ON UPDATE CASCADE ON DELETE CASCADE DEFERRABLE INITIALLY DEFERRED,
+ override_by BIGINT REFERENCES action.ingest_queue_entry (id) ON UPDATE CASCADE ON DELETE SET NULL DEFERRABLE INITIALLY DEFERRED,
+ ingest_time TIMESTAMPTZ,
+ fail_time TIMESTAMPTZ
+);
+CREATE UNIQUE INDEX record_pending_once ON action.ingest_queue_entry (record_type,record,state_data) WHERE ingest_time IS NULL AND override_by IS NULL;
+CREATE INDEX entry_override_by_idx ON action.ingest_queue_entry (override_by) WHERE override_by IS NOT NULL;
+
+CREATE OR REPLACE FUNCTION action.enqueue_ingest_entry (
+ record_id BIGINT,
+ rtype TEXT DEFAULT 'biblio',
+ when_to_run TIMESTAMPTZ DEFAULT NOW(),
+ queue_id INT DEFAULT NULL,
+ ingest_action TEXT DEFAULT 'update', -- will be the most common?
+ old_state_data TEXT DEFAULT ''
+) RETURNS BOOL AS $F$
+DECLARE
+ new_entry action.ingest_queue_entry%ROWTYPE;
+ prev_del_entry action.ingest_queue_entry%ROWTYPE;
+ diag_detail TEXT;
+ diag_context TEXT;
+BEGIN
+
+ IF ingest_action = 'delete' THEN
+ -- first see if there is an outstanding entry
+ SELECT * INTO prev_del_entry
+ FROM action.ingest_queue_entry qe
+ WHERE qe.record = record_id
+ AND qe.state_data = old_state_data
+ AND qe.record_type = rtype
+ AND qe.ingest_time IS NULL
+ AND qe.override_by IS NULL;
+ END IF;
+
+ WITH existing_queue_entry_cte AS (
+ SELECT queue_id AS queue,
+ rtype AS record_type,
+ record_id AS record,
+ qe.id AS override_by,
+ ingest_action AS action,
+ q.run_at AS run_at,
+ old_state_data AS state_data
+ FROM action.ingest_queue_entry qe
+ JOIN action.ingest_queue q ON (qe.queue = q.id)
+ WHERE qe.record = record_id
+ AND q.end_time IS NULL
+ AND qe.record_type = rtype
+ AND qe.state_data = old_state_data
+ AND qe.ingest_time IS NULL
+ AND qe.fail_time IS NULL
+ AND qe.override_by IS NULL
+ ), existing_nonqueue_entry_cte AS (
+ SELECT queue_id AS queue,
+ rtype AS record_type,
+ record_id AS record,
+ qe.id AS override_by,
+ ingest_action AS action,
+ qe.run_at AS run_at,
+ old_state_data AS state_data
+ FROM action.ingest_queue_entry qe
+ WHERE qe.record = record_id
+ AND qe.queue IS NULL
+ AND qe.record_type = rtype
+ AND qe.state_data = old_state_data
+ AND qe.ingest_time IS NULL
+ AND qe.fail_time IS NULL
+ AND qe.override_by IS NULL
+ ), new_entry_cte AS (
+ SELECT * FROM existing_queue_entry_cte
+ UNION ALL
+ SELECT * FROM existing_nonqueue_entry_cte
+ UNION ALL
+ SELECT queue_id, rtype, record_id, NULL, ingest_action, COALESCE(when_to_run,NOW()), old_state_data
+ ), insert_entry_cte AS (
+ INSERT INTO action.ingest_queue_entry
+ (queue, record_type, record, override_by, action, run_at, state_data)
+ SELECT queue, record_type, record, override_by, action, run_at, state_data FROM new_entry_cte
+ ORDER BY 4 NULLS LAST, 6
+ LIMIT 1
+ RETURNING *
+ ) SELECT * INTO new_entry FROM insert_entry_cte;
+
+ IF prev_del_entry.id IS NOT NULL THEN -- later delete overrides earlier unapplied entry
+ UPDATE action.ingest_queue_entry
+ SET override_by = new_entry.id
+ WHERE id = prev_del_entry.id;
+
+ UPDATE action.ingest_queue_entry
+ SET override_by = NULL
+ WHERE id = new_entry.id;
+
+ ELSIF new_entry.override_by IS NOT NULL THEN
+ RETURN TRUE; -- already handled, don't notify
+ END IF;
+
+ NOTIFY queued_ingest;
+
+ RETURN TRUE;
+EXCEPTION WHEN OTHERS THEN
+ GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL,
+ diag_context = PG_EXCEPTION_CONTEXT;
+ RAISE WARNING '%\n%', diag_detail, diag_context;
+ RETURN FALSE;
+END;
+$F$ LANGUAGE PLPGSQL;
+
+CREATE OR REPLACE FUNCTION action.process_ingest_queue_entry (qeid BIGINT) RETURNS BOOL AS $func$
+DECLARE
+ ingest_success BOOL := NULL;
+ qe action.ingest_queue_entry%ROWTYPE;
+BEGIN
+
+ SELECT * INTO qe FROM action.ingest_queue_entry WHERE id = qeid;
+ IF qe.ingest_time IS NOT NULL OR qe.override_by IS NOT NULL THEN
+ RETURN TRUE; -- Already done
+ END IF;
+
+ IF qe.action = 'delete' THEN
+ IF qe.record_type = 'biblio' THEN
+ SELECT metabib.indexing_delete(r.*, qe.state_data) INTO ingest_success FROM biblio.record_entry r WHERE r.id = qe.record;
+ ELSIF qe.record_type = 'authority' THEN
+ SELECT authority.indexing_delete(r.*, qe.state_data) INTO ingest_success FROM authority.record_entry r WHERE r.id = qe.record;
+ END IF;
+ ELSE
+ IF qe.record_type = 'biblio' THEN
+ IF qe.action = 'propagate' THEN
+ SELECT authority.apply_propagate_changes(qe.state_data::BIGINT, qe.record) INTO ingest_success;
+ ELSE
+ SELECT metabib.indexing_update(r.*, qe.action = 'insert', qe.state_data) INTO ingest_success FROM biblio.record_entry r WHERE r.id = qe.record;
+ END IF;
+ ELSIF qe.record_type = 'authority' THEN
+ SELECT authority.indexing_update(r.*, qe.action = 'insert', qe.state_data) INTO ingest_success FROM authority.record_entry r WHERE r.id = qe.record;
+ END IF;
+ END IF;
+
+ IF NOT ingest_success THEN
+ UPDATE action.ingest_queue_entry SET fail_time = NOW() WHERE id = qe.id;
+ PERFORM * FROM config.internal_flag WHERE name = 'ingest.queued.abort_on_error' AND enabled;
+ IF FOUND THEN
+ RAISE EXCEPTION 'Ingest action of % on %.record_entry % for queue entry % failed', qe.action, qe.record_type, qe.record, qe.id;
+ ELSE
+ RAISE WARNING 'Ingest action of % on %.record_entry % for queue entry % failed', qe.action, qe.record_type, qe.record, qe.id;
+ END IF;
+ ELSE
+ UPDATE action.ingest_queue_entry SET ingest_time = NOW() WHERE id = qe.id;
+ END IF;
+
+ RETURN ingest_success;
+END;
+$func$ LANGUAGE PLPGSQL;
+
+
+CREATE OR REPLACE FUNCTION action.complete_duplicated_entries () RETURNS TRIGGER AS $F$
+BEGIN
+ IF NEW.ingest_time IS NOT NULL THEN
+ UPDATE action.ingest_queue_entry SET ingest_time = NEW.ingest_time WHERE override_by = NEW.id;
+ END IF;
+
+ RETURN NULL;
+END;
+$F$ LANGUAGE PLPGSQL;
+
+CREATE TRIGGER complete_duplicated_entries_trigger
+ AFTER UPDATE ON action.ingest_queue_entry
+ FOR EACH ROW WHEN (NEW.override_by IS NULL)
+ EXECUTE PROCEDURE action.complete_duplicated_entries();
+
+CREATE OR REPLACE FUNCTION action.set_ingest_queue(INT) RETURNS VOID AS $$
+ $_SHARED{"ingest_queue_id"} = $_[0];
+$$ LANGUAGE plperlu;
+
+CREATE OR REPLACE FUNCTION action.get_ingest_queue() RETURNS INT AS $$
+ return $_SHARED{"ingest_queue_id"};
+$$ LANGUAGE plperlu;
+
+CREATE OR REPLACE FUNCTION action.clear_ingest_queue() RETURNS VOID AS $$
+ delete($_SHARED{"ingest_queue_id"});
+$$ LANGUAGE plperlu;
+
+CREATE OR REPLACE FUNCTION action.set_queued_ingest_force(TEXT) RETURNS VOID AS $$
+ $_SHARED{"ingest_queue_force"} = $_[0];
+$$ LANGUAGE plperlu;
+
+CREATE OR REPLACE FUNCTION action.get_queued_ingest_force() RETURNS TEXT AS $$
+ return $_SHARED{"ingest_queue_force"};
+$$ LANGUAGE plperlu;
+
+CREATE OR REPLACE FUNCTION action.clear_queued_ingest_force() RETURNS VOID AS $$
+ delete($_SHARED{"ingest_queue_force"});
+$$ LANGUAGE plperlu;
+
+------------------ ingest functions ------------------
+
+CREATE OR REPLACE FUNCTION metabib.indexing_delete (bib biblio.record_entry, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$
+DECLARE
+ tmp_bool BOOL;
+ diag_detail TEXT;
+ diag_context TEXT;
+BEGIN
+ PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.preserve_on_delete' AND enabled;
+ tmp_bool := FOUND;
+
+ PERFORM metabib.remap_metarecord_for_bib(bib.id, bib.fingerprint, TRUE, tmp_bool);
+
+ IF NOT tmp_bool THEN
+ -- One needs to keep these around to support searches
+ -- with the #deleted modifier, so one should turn on the named
+ -- internal flag for that functionality.
+ DELETE FROM metabib.record_attr_vector_list WHERE source = bib.id;
+ END IF;
+
+ DELETE FROM authority.bib_linking abl WHERE abl.bib = bib.id; -- Avoid updating fields in bibs that are no longer visible
+ DELETE FROM biblio.peer_bib_copy_map WHERE peer_record = bib.id; -- Separate any multi-homed items
+ DELETE FROM metabib.browse_entry_def_map WHERE source = bib.id; -- Don't auto-suggest deleted bibs
+
+ RETURN TRUE;
+EXCEPTION WHEN OTHERS THEN
+ GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL,
+ diag_context = PG_EXCEPTION_CONTEXT;
+ RAISE WARNING '%\n%', diag_detail, diag_context;
+ RETURN FALSE;
+END;
+$func$ LANGUAGE PLPGSQL;
+
+CREATE OR REPLACE FUNCTION metabib.indexing_update (bib biblio.record_entry, insert_only BOOL DEFAULT FALSE, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$
+DECLARE
+ skip_facet BOOL := FALSE;
+ skip_display BOOL := FALSE;
+ skip_browse BOOL := FALSE;
+ skip_search BOOL := FALSE;
+ skip_auth BOOL := FALSE;
+ skip_full BOOL := FALSE;
+ skip_attrs BOOL := FALSE;
+ skip_luri BOOL := FALSE;
+ skip_mrmap BOOL := FALSE;
+ only_attrs TEXT[] := NULL;
+ only_fields INT[] := '{}'::INT[];
+ diag_detail TEXT;
+ diag_context TEXT;
+BEGIN
+
+ -- Record authority linking
+ SELECT extra LIKE '%skip_authority%' INTO skip_auth;
+ PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_linking' AND enabled;
+ IF NOT FOUND AND NOT skip_auth THEN
+ PERFORM biblio.map_authority_linking( bib.id, bib.marc );
+ END IF;
+
+ -- Flatten and insert the mfr data
+ SELECT extra LIKE '%skip_full_rec%' INTO skip_full;
+ PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_metabib_full_rec' AND enabled;
+ IF NOT FOUND AND NOT skip_full THEN
+ PERFORM metabib.reingest_metabib_full_rec(bib.id);
+ END IF;
+
+ -- Now we pull out attribute data, which is dependent on the mfr for all but XPath-based fields
+ SELECT extra LIKE '%skip_attrs%' INTO skip_attrs;
+ PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_metabib_rec_descriptor' AND enabled;
+ IF NOT FOUND AND NOT skip_attrs THEN
+ IF extra ~ 'attr\(\s*(\w[ ,\w]*?)\s*\)' THEN
+ SELECT REGEXP_SPLIT_TO_ARRAY(
+ (REGEXP_MATCHES(extra, 'attr\(\s*(\w[ ,\w]*?)\s*\)'))[1],
+ '\s*,\s*'
+ ) INTO only_attrs;
+ END IF;
+
+ PERFORM metabib.reingest_record_attributes(bib.id, only_attrs, bib.marc, insert_only);
+ END IF;
+
+ -- Gather and insert the field entry data
+ SELECT extra LIKE '%skip_facet%' INTO skip_facet;
+ SELECT extra LIKE '%skip_display%' INTO skip_display;
+ SELECT extra LIKE '%skip_browse%' INTO skip_browse;
+ SELECT extra LIKE '%skip_search%' INTO skip_search;
+
+ IF extra ~ 'field_list\(\s*(\d[ ,\d]+)\s*\)' THEN
+ SELECT REGEXP_SPLIT_TO_ARRAY(
+ (REGEXP_MATCHES(extra, 'field_list\(\s*(\d[ ,\d]+)\s*\)'))[1],
+ '\s*,\s*'
+ )::INT[] INTO only_fields;
+ END IF;
+
+ IF NOT skip_facet OR NOT skip_display OR NOT skip_browse OR NOT skip_search THEN
+ PERFORM metabib.reingest_metabib_field_entries(bib.id, skip_facet, skip_display, skip_browse, skip_search, only_fields);
+ END IF;
+
+ -- Located URI magic
+ SELECT extra LIKE '%skip_luri%' INTO skip_luri;
+ PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_located_uri' AND enabled;
+ IF NOT FOUND AND NOT skip_luri THEN PERFORM biblio.extract_located_uris( bib.id, bib.marc, bib.editor ); END IF;
+
+ -- (re)map metarecord-bib linking
+ SELECT extra LIKE '%skip_mrmap%' INTO skip_mrmap;
+ IF insert_only THEN -- if not deleted and performing an insert, check for the flag
+ PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.skip_on_insert' AND enabled;
+ IF NOT FOUND AND NOT skip_mrmap THEN
+ PERFORM metabib.remap_metarecord_for_bib( bib.id, bib.fingerprint );
+ END IF;
+ ELSE -- we're doing an update, and we're not deleted, remap
+ PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.skip_on_update' AND enabled;
+ IF NOT FOUND AND NOT skip_mrmap THEN
+ PERFORM metabib.remap_metarecord_for_bib( bib.id, bib.fingerprint );
+ END IF;
+ END IF;
+
+ RETURN TRUE;
+EXCEPTION WHEN OTHERS THEN
+ GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL,
+ diag_context = PG_EXCEPTION_CONTEXT;
+ RAISE WARNING '%\n%', diag_detail, diag_context;
+ RETURN FALSE;
+END;
+$func$ LANGUAGE PLPGSQL;
+
+CREATE OR REPLACE FUNCTION authority.indexing_delete (auth authority.record_entry, extra TEXT DEFAULT NULL) RETURNS BOOL AS $func$
+DECLARE
+ tmp_bool BOOL;
+ diag_detail TEXT;
+ diag_context TEXT;
+BEGIN
+ DELETE FROM authority.bib_linking WHERE authority = auth.id; -- Avoid updating fields in bibs that are no longer visible
+ DELETE FROM authority.full_rec WHERE record = auth.id; -- Avoid validating fields against deleted authority records
+ DELETE FROM authority.simple_heading WHERE record = auth.id;
+ -- Should we remove matching $0 from controlled fields at the same time?
+
+ -- XXX What do we do about the actual linking subfields present in
+ -- authority records that target this one when this happens?
+ DELETE FROM authority.authority_linking WHERE source = auth.id OR target = auth.id;
+
+ RETURN TRUE;
+EXCEPTION WHEN OTHERS THEN
+ GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL,
+ diag_context = PG_EXCEPTION_CONTEXT;
+ RAISE WARNING '%\n%', diag_detail, diag_context;
+ RETURN FALSE;
+END;
+$func$ LANGUAGE PLPGSQL;
+
+
+CREATE OR REPLACE FUNCTION authority.indexing_update (auth authority.record_entry, insert_only BOOL DEFAULT FALSE, old_heading TEXT DEFAULT NULL) RETURNS BOOL AS $func$
+DECLARE
+ ashs authority.simple_heading%ROWTYPE;
+ mbe_row metabib.browse_entry%ROWTYPE;
+ mbe_id BIGINT;
+ ash_id BIGINT;
+ diag_detail TEXT;
+ diag_context TEXT;
+BEGIN
+
+ -- Unless there's a setting stopping us, propagate these updates to any linked bib records when the heading changes
+ PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_auto_update' AND enabled;
+
+ IF NOT FOUND AND auth.heading <> old_heading THEN
+ PERFORM authority.propagate_changes(auth.id);
+ END IF;
+
+ IF NOT insert_only THEN
+ DELETE FROM authority.authority_linking WHERE source = auth.id;
+ DELETE FROM authority.simple_heading WHERE record = auth.id;
+ END IF;
+
+ INSERT INTO authority.authority_linking (source, target, field)
+ SELECT source, target, field FROM authority.calculate_authority_linking(
+ auth.id, auth.control_set, auth.marc::XML
+ );
+
+ FOR ashs IN SELECT * FROM authority.simple_heading_set(auth.marc) LOOP
+
+ INSERT INTO authority.simple_heading (record,atag,value,sort_value,thesaurus)
+ VALUES (ashs.record, ashs.atag, ashs.value, ashs.sort_value, ashs.thesaurus);
+ ash_id := CURRVAL('authority.simple_heading_id_seq'::REGCLASS);
+
+ SELECT INTO mbe_row * FROM metabib.browse_entry
+ WHERE value = ashs.value AND sort_value = ashs.sort_value;
+
+ IF FOUND THEN
+ mbe_id := mbe_row.id;
+ ELSE
+ INSERT INTO metabib.browse_entry
+ ( value, sort_value ) VALUES
+ ( ashs.value, ashs.sort_value );
+
+ mbe_id := CURRVAL('metabib.browse_entry_id_seq'::REGCLASS);
+ END IF;
+
+ INSERT INTO metabib.browse_entry_simple_heading_map (entry,simple_heading) VALUES (mbe_id,ash_id);
+
+ END LOOP;
+
+ -- Flatten and insert the afr data
+ PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_full_rec' AND enabled;
+ IF NOT FOUND THEN
+ PERFORM authority.reingest_authority_full_rec(auth.id);
+ PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_rec_descriptor' AND enabled;
+ IF NOT FOUND THEN
+ PERFORM authority.reingest_authority_rec_descriptor(auth.id);
+ END IF;
+ END IF;
+
+ RETURN TRUE;
+EXCEPTION WHEN OTHERS THEN
+ GET STACKED DIAGNOSTICS diag_detail = PG_EXCEPTION_DETAIL,
+ diag_context = PG_EXCEPTION_CONTEXT;
+ RAISE WARNING '%\n%', diag_detail, diag_context;
+ RETURN FALSE;
+END;
+$func$ LANGUAGE PLPGSQL;
+
+CREATE OR REPLACE FUNCTION evergreen.indexing_ingest_or_delete () RETURNS TRIGGER AS $func$
+DECLARE
+ old_state_data TEXT := '';
+ new_action TEXT;
+ queuing_force TEXT;
+ queuing_flag_name TEXT;
+ queuing_flag BOOL := FALSE;
+ queuing_success BOOL := FALSE;
+ ingest_success BOOL := FALSE;
+ ingest_queue INT;
+BEGIN
+
+ -- Identify the ingest action type
+ IF TG_OP = 'UPDATE' THEN
+
+ -- Gather type-specific data for later use
+ IF TG_TABLE_SCHEMA = 'authority' THEN
+ old_state_data = OLD.heading;
+ END IF;
+
+ IF NOT OLD.deleted THEN -- maybe reingest?
+ IF NEW.deleted THEN
+ new_action = 'delete'; -- nope, delete
+ ELSE
+ new_action = 'update'; -- yes, update
+ END IF;
+ ELSIF NOT NEW.deleted THEN
+ new_action = 'insert'; -- revivify, AKA insert
+ ELSE
+ RETURN NEW; -- was and is still deleted, don't ingest
+ END IF;
+ ELSIF TG_OP = 'INSERT' THEN
+ new_action = 'insert'; -- brand new
+ ELSE
+ RETURN OLD; -- really deleting the record
+ END IF;
+
+ queuing_flag_name := 'ingest.queued.'||TG_TABLE_SCHEMA||'.'||new_action;
+ -- See if we should be queuing anything
+ SELECT enabled INTO queuing_flag
+ FROM config.internal_flag
+ WHERE name IN ('ingest.queued.all','ingest.queued.'||TG_TABLE_SCHEMA||'.all', queuing_flag_name)
+ AND enabled
+ LIMIT 1;
+
+ SELECT action.get_queued_ingest_force() INTO queuing_force;
+ IF queuing_flag IS NULL AND queuing_force = queuing_flag_name THEN
+ queuing_flag := TRUE;
+ END IF;
+
+ -- Specific queuing actions can also be forcibly disabled (e.g., by a caller or during authority propagation)
+ IF queuing_force = queuing_flag_name||'.disabled' THEN
+ queuing_flag := FALSE;
+ END IF;
+
+ -- And if we should be queuing ...
+ IF queuing_flag THEN
+ ingest_queue := action.get_ingest_queue();
+
+ -- ... but this is NOT a named or forced queue request (marc editor update, say, or vandelay overlay)...
+ IF queuing_force IS NULL AND ingest_queue IS NULL AND new_action = 'update' THEN -- re-ingest?
+
+ PERFORM * FROM config.internal_flag WHERE name = 'ingest.reingest.force_on_same_marc' AND enabled;
+
+ -- ... then don't do anything if ingest.reingest.force_on_same_marc is not enabled and the MARC hasn't changed
+ IF NOT FOUND AND OLD.marc = NEW.marc THEN
+ RETURN NEW;
+ END IF;
+ END IF;
+
+ -- Otherwise, attempt to enqueue
+ SELECT action.enqueue_ingest_entry( NEW.id, TG_TABLE_SCHEMA, NOW(), ingest_queue, new_action, old_state_data) INTO queuing_success;
+ END IF;
+
+ -- If queuing was not requested, or failed for some reason, do it live.
+ IF NOT queuing_success THEN
+ IF queuing_flag THEN
+ RAISE WARNING 'Enqueuing of %.record_entry % for ingest failed, attempting direct ingest', TG_TABLE_SCHEMA, NEW.id;
+ END IF;
+
+ IF new_action = 'delete' THEN
+ IF TG_TABLE_SCHEMA = 'biblio' THEN
+ SELECT metabib.indexing_delete(NEW.*, old_state_data) INTO ingest_success;
+ ELSIF TG_TABLE_SCHEMA = 'authority' THEN
+ SELECT authority.indexing_delete(NEW.*, old_state_data) INTO ingest_success;
+ END IF;
+ ELSE
+ IF TG_TABLE_SCHEMA = 'biblio' THEN
+ SELECT metabib.indexing_update(NEW.*, new_action = 'insert', old_state_data) INTO ingest_success;
+ ELSIF TG_TABLE_SCHEMA = 'authority' THEN
+ SELECT authority.indexing_update(NEW.*, new_action = 'insert', old_state_data) INTO ingest_success;
+ END IF;
+ END IF;
+
+ IF NOT ingest_success THEN
+ PERFORM * FROM config.internal_flag WHERE name = 'ingest.queued.abort_on_error' AND enabled;
+ IF FOUND THEN
+ RAISE EXCEPTION 'Ingest of %.record_entry % failed', TG_TABLE_SCHEMA, NEW.id;
+ ELSE
+ RAISE WARNING 'Ingest of %.record_entry % failed', TG_TABLE_SCHEMA, NEW.id;
+ END IF;
+ END IF;
+ END IF;
+
+ RETURN NEW;
+END;
+$func$ LANGUAGE PLPGSQL;
+
+DROP TRIGGER aaa_indexing_ingest_or_delete ON biblio.record_entry;
+DROP TRIGGER aaa_auth_ingest_or_delete ON authority.record_entry;
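+-- The old schema-specific trigger functions (biblio./authority.
+-- indexing_ingest_or_delete) are superseded by the single
+-- evergreen.indexing_ingest_or_delete() above, so the triggers are
+-- dropped and recreated to point at it.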
+
+CREATE TRIGGER aaa_indexing_ingest_or_delete AFTER INSERT OR UPDATE ON biblio.record_entry FOR EACH ROW EXECUTE PROCEDURE evergreen.indexing_ingest_or_delete ();
+CREATE TRIGGER aaa_auth_ingest_or_delete AFTER INSERT OR UPDATE ON authority.record_entry FOR EACH ROW EXECUTE PROCEDURE evergreen.indexing_ingest_or_delete ();
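+
+-- Example (assuming the stock 'ingest.queued.*' flag rows are seeded): to
+-- have biblio updates queued rather than ingested inline, enable the flag
+-- whose name the trigger constructs above:
+--   UPDATE config.global_flag SET enabled = TRUE
+--   WHERE name = 'ingest.queued.biblio.update';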
+
+CREATE OR REPLACE FUNCTION metabib.reingest_record_attributes (rid BIGINT, pattr_list TEXT[] DEFAULT NULL, prmarc TEXT DEFAULT NULL, rdeleted BOOL DEFAULT TRUE) RETURNS VOID AS $func$
+DECLARE
+ transformed_xml TEXT;
+ rmarc TEXT := prmarc;
+ tmp_val TEXT;
+ prev_xfrm TEXT;
+ normalizer RECORD;
+ xfrm config.xml_transform%ROWTYPE;
+ attr_vector INT[] := '{}'::INT[];
+ attr_vector_tmp INT[];
+ attr_list TEXT[] := pattr_list;
+ attr_value TEXT[];
+ norm_attr_value TEXT[];
+ tmp_xml TEXT;
+ tmp_array TEXT[];
+ attr_def config.record_attr_definition%ROWTYPE;
+ ccvm_row config.coded_value_map%ROWTYPE;
+ jump_past BOOL;
+BEGIN
+
+ IF attr_list IS NULL OR rdeleted THEN -- need to do the full dance on INSERT or undelete
+ SELECT ARRAY_AGG(name) INTO attr_list FROM config.record_attr_definition
+ WHERE (
+ tag IS NOT NULL OR
+ fixed_field IS NOT NULL OR
+ xpath IS NOT NULL OR
+ phys_char_sf IS NOT NULL OR
+ composite
+ ) AND (
+ filter OR sorter
+ );
+ END IF;
+
+ IF rmarc IS NULL THEN
+ SELECT marc INTO rmarc FROM biblio.record_entry WHERE id = rid;
+ END IF;
+
+ FOR attr_def IN SELECT * FROM config.record_attr_definition WHERE NOT composite AND name = ANY( attr_list ) ORDER BY format LOOP
+
+ jump_past := FALSE; -- This gets set when we are non-multi and have found something
+ attr_value := '{}'::TEXT[];
+ norm_attr_value := '{}'::TEXT[];
+ attr_vector_tmp := '{}'::INT[];
+
+ SELECT * INTO ccvm_row FROM config.coded_value_map c WHERE c.ctype = attr_def.name LIMIT 1;
+
+ IF attr_def.tag IS NOT NULL THEN -- tag (and optional subfield list) selection
+ SELECT ARRAY_AGG(value) INTO attr_value
+ FROM (SELECT * FROM metabib.full_rec ORDER BY tag, subfield) AS x
+ WHERE record = rid
+ AND tag LIKE attr_def.tag
+ AND CASE
+ WHEN attr_def.sf_list IS NOT NULL
+ THEN POSITION(subfield IN attr_def.sf_list) > 0
+ ELSE TRUE
+ END
+ GROUP BY tag
+ ORDER BY tag;
+
+ IF NOT attr_def.multi THEN
+ attr_value := ARRAY[ARRAY_TO_STRING(attr_value, COALESCE(attr_def.joiner,' '))];
+ jump_past := TRUE;
+ END IF;
+ END IF;
+
+ IF NOT jump_past AND attr_def.fixed_field IS NOT NULL THEN -- a named fixed field, see config.marc21_ff_pos_map.fixed_field
+ attr_value := attr_value || vandelay.marc21_extract_fixed_field_list(rmarc, attr_def.fixed_field);
+
+ IF NOT attr_def.multi THEN
+ attr_value := ARRAY[attr_value[1]];
+ jump_past := TRUE;
+ END IF;
+ END IF;
+
+ IF NOT jump_past AND attr_def.xpath IS NOT NULL THEN -- an XPath expression
+
+ SELECT INTO xfrm * FROM config.xml_transform WHERE name = attr_def.format;
+
+ -- See if we can skip the XSLT ... it's expensive
+ IF prev_xfrm IS NULL OR prev_xfrm <> xfrm.name THEN
+ -- Can't skip the transform
+ IF xfrm.xslt <> '---' THEN
+ transformed_xml := oils_xslt_process(rmarc,xfrm.xslt);
+ ELSE
+ transformed_xml := rmarc;
+ END IF;
+
+ prev_xfrm := xfrm.name;
+ END IF;
+
+ IF xfrm.name IS NULL THEN
+ -- just grab the marcxml (empty) transform
+ SELECT INTO xfrm * FROM config.xml_transform WHERE xslt = '---' LIMIT 1;
+ prev_xfrm := xfrm.name;
+ END IF;
+
+ FOR tmp_xml IN SELECT UNNEST(oils_xpath(attr_def.xpath, transformed_xml, ARRAY[ARRAY[xfrm.prefix, xfrm.namespace_uri]])) LOOP
+ tmp_val := oils_xpath_string(
+ '//*',
+ tmp_xml,
+ COALESCE(attr_def.joiner,' '),
+ ARRAY[ARRAY[xfrm.prefix, xfrm.namespace_uri]]
+ );
+ IF tmp_val IS NOT NULL AND BTRIM(tmp_val) <> '' THEN
+ attr_value := attr_value || tmp_val;
+ EXIT WHEN NOT attr_def.multi;
+ END IF;
+ END LOOP;
+ END IF;
+
+ IF NOT jump_past AND attr_def.phys_char_sf IS NOT NULL THEN -- a named Physical Characteristic, see config.marc21_physical_characteristic_*_map
+ SELECT ARRAY_AGG(m.value) INTO tmp_array
+ FROM vandelay.marc21_physical_characteristics(rmarc) v
+ LEFT JOIN config.marc21_physical_characteristic_value_map m ON (m.id = v.value)
+ WHERE v.subfield = attr_def.phys_char_sf AND (m.value IS NOT NULL AND BTRIM(m.value) <> '')
+ AND ( ccvm_row.id IS NULL OR ( ccvm_row.id IS NOT NULL AND v.id IS NOT NULL) );
+
+ attr_value := attr_value || tmp_array;
+
+ IF NOT attr_def.multi THEN
+ attr_value := ARRAY[attr_value[1]];
+ END IF;
+
+ END IF;
+
+ -- apply index normalizers to attr_value
+ FOR tmp_val IN SELECT value FROM UNNEST(attr_value) x(value) LOOP
+ FOR normalizer IN
+ SELECT n.func AS func,
+ n.param_count AS param_count,
+ m.params AS params
+ FROM config.index_normalizer n
+ JOIN config.record_attr_index_norm_map m ON (m.norm = n.id)
+ WHERE attr = attr_def.name
+ ORDER BY m.pos LOOP
+ EXECUTE 'SELECT ' || normalizer.func || '(' ||
+ COALESCE( quote_literal( tmp_val ), 'NULL' ) ||
+ CASE
+ WHEN normalizer.param_count > 0
+ THEN ',' || REPLACE(REPLACE(BTRIM(normalizer.params,'[]'),E'\'',E'\\\''),E'"',E'\'')
+ ELSE ''
+ END ||
+ ')' INTO tmp_val;
+
+ END LOOP;
+ IF tmp_val IS NOT NULL AND tmp_val <> '' THEN
+ -- note that a string that contains only blanks
+ -- is a valid value for some attributes
+ norm_attr_value := norm_attr_value || tmp_val;
+ END IF;
+ END LOOP;
+
+ IF attr_def.filter THEN
+ -- Create unknown uncontrolled values and find the IDs of the values
+ IF ccvm_row.id IS NULL THEN
+ FOR tmp_val IN SELECT value FROM UNNEST(norm_attr_value) x(value) LOOP
+ IF tmp_val IS NOT NULL AND BTRIM(tmp_val) <> '' THEN
+ BEGIN -- use subtransaction to isolate unique constraint violations
+ INSERT INTO metabib.uncontrolled_record_attr_value ( attr, value ) VALUES ( attr_def.name, tmp_val );
+ EXCEPTION WHEN unique_violation THEN END;
+ END IF;
+ END LOOP;
+
+ SELECT ARRAY_AGG(id) INTO attr_vector_tmp FROM metabib.uncontrolled_record_attr_value WHERE attr = attr_def.name AND value = ANY( norm_attr_value );
+ ELSE
+ SELECT ARRAY_AGG(id) INTO attr_vector_tmp FROM config.coded_value_map WHERE ctype = attr_def.name AND code = ANY( norm_attr_value );
+ END IF;
+
+ -- Add the new value to the vector
+ attr_vector := attr_vector || attr_vector_tmp;
+ END IF;
+
+ IF attr_def.sorter THEN
+ DELETE FROM metabib.record_sorter WHERE source = rid AND attr = attr_def.name;
+ IF norm_attr_value[1] IS NOT NULL THEN
+ INSERT INTO metabib.record_sorter (source, attr, value) VALUES (rid, attr_def.name, norm_attr_value[1]);
+ END IF;
+ END IF;
+
+ END LOOP;
+
+/* We may need to rewrite the vlist to contain
+ the intersection of new values for requested
+ attrs and old values for ignored attrs. To
+ do this, we take the old attr vlist and
+ subtract any values that are valid for the
+ requested attrs, and then add back the new
+ set of attr values. */
+
+ IF ARRAY_LENGTH(pattr_list, 1) > 0 THEN
+ SELECT vlist INTO attr_vector_tmp FROM metabib.record_attr_vector_list WHERE source = rid;
+ SELECT attr_vector_tmp - ARRAY_AGG(id::INT) INTO attr_vector_tmp FROM metabib.full_attr_id_map WHERE attr = ANY (pattr_list);
+ attr_vector := attr_vector || attr_vector_tmp;
+ END IF;
+
+ -- On to composite attributes, now that the record attrs have been pulled. Processed in name order, so later composite
+ -- attributes can depend on earlier ones.
+ PERFORM metabib.compile_composite_attr_cache_init();
+ FOR attr_def IN SELECT * FROM config.record_attr_definition WHERE composite AND name = ANY( attr_list ) ORDER BY name LOOP
+
+ FOR ccvm_row IN SELECT * FROM config.coded_value_map c WHERE c.ctype = attr_def.name ORDER BY value LOOP
+
+ tmp_val := metabib.compile_composite_attr( ccvm_row.id );
+ CONTINUE WHEN tmp_val IS NULL OR tmp_val = ''; -- nothing to do
+
+ IF attr_def.filter THEN
+ IF attr_vector @@ tmp_val::query_int THEN
+ attr_vector = attr_vector + intset(ccvm_row.id);
+ EXIT WHEN NOT attr_def.multi;
+ END IF;
+ END IF;
+
+ IF attr_def.sorter THEN
+ IF attr_vector @@ tmp_val::query_int THEN
+ DELETE FROM metabib.record_sorter WHERE source = rid AND attr = attr_def.name;
+ INSERT INTO metabib.record_sorter (source, attr, value) VALUES (rid, attr_def.name, ccvm_row.code);
+ END IF;
+ END IF;
+
+ END LOOP;
+
+ END LOOP;
+
+ IF ARRAY_LENGTH(attr_vector, 1) > 0 THEN
+ INSERT INTO metabib.record_attr_vector_list (source, vlist) VALUES (rid, attr_vector)
+ ON CONFLICT (source) DO UPDATE SET vlist = EXCLUDED.vlist;
+ END IF;
+
+END;
+
+$func$ LANGUAGE PLPGSQL;
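+
+-- Usage sketch: rebuild every filter/sorter attribute for record 1 from its
+-- stored MARC:
+--   SELECT metabib.reingest_record_attributes(1);
+-- To honor a narrowed attribute list, pass rdeleted as FALSE (it defaults to
+-- TRUE, which forces the full list, per the first IF above); 'item_lang' is
+-- just an example of a stock config.record_attr_definition name:
+--   SELECT metabib.reingest_record_attributes(1, '{item_lang}'::TEXT[], NULL, FALSE);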
+
+CREATE OR REPLACE FUNCTION authority.propagate_changes
+ (aid BIGINT, bid BIGINT) RETURNS BIGINT AS $func$
+DECLARE
+ queuing_success BOOL := FALSE;
+BEGIN
+
+ PERFORM 1 FROM config.global_flag
+ WHERE name IN ('ingest.queued.all','ingest.queued.authority.propagate')
+ AND enabled;
+
+ IF FOUND THEN
+ -- XXX enqueue special 'propagate' bib action
+ SELECT action.enqueue_ingest_entry( bid, 'biblio', NOW(), NULL, 'propagate', aid::TEXT) INTO queuing_success;
+
+ IF queuing_success THEN
+ RETURN aid;
+ END IF;
+ END IF;
+
+ PERFORM authority.apply_propagate_changes(aid, bid);
+ RETURN aid;
+END;
+$func$ LANGUAGE PLPGSQL;
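+
+-- With the propagate flag enabled, authority.propagate_changes() above only
+-- enqueues the work; the actual overlay lives in
+-- authority.apply_propagate_changes(), below, so it can run either inline
+-- (the fallback above) or later, when the queued entry is processed.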
+
+CREATE OR REPLACE FUNCTION authority.apply_propagate_changes
+ (aid BIGINT, bid BIGINT) RETURNS BIGINT AS $func$
+DECLARE
+ bib_forced BOOL := FALSE;
+ bib_rec biblio.record_entry%ROWTYPE;
+ new_marc TEXT;
+BEGIN
+
+ SELECT INTO bib_rec * FROM biblio.record_entry WHERE id = bid;
+
+ new_marc := vandelay.merge_record_xml(
+ bib_rec.marc, authority.generate_overlay_template(aid));
+
+ IF new_marc = bib_rec.marc THEN
+ -- Authority record change had no impact on this bib record.
+ -- Nothing left to do.
+ RETURN aid;
+ END IF;
+
+ PERFORM 1 FROM config.global_flag
+ WHERE name = 'ingest.disable_authority_auto_update_bib_meta'
+ AND enabled;
+
+ IF NOT FOUND THEN
+ -- update the bib record editor and edit_date
+ bib_rec.editor := (
+ SELECT editor FROM authority.record_entry WHERE id = aid);
+ bib_rec.edit_date := NOW();
+ END IF;
+
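+ -- Force the bib update below to bypass queued ingest (the trigger above
+ -- honors the '.disabled' force), so applying propagation ingests the bib
+ -- directly instead of re-queuing it.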
+ PERFORM action.set_queued_ingest_force('ingest.queued.biblio.update.disabled');
+
+ UPDATE biblio.record_entry SET
+ marc = new_marc,
+ editor = bib_rec.editor,
+ edit_date = bib_rec.edit_date
+ WHERE id = bid;
+
+ PERFORM action.clear_queued_ingest_force();
+
+ RETURN aid;
+
+END;
+$func$ LANGUAGE PLPGSQL;
+
+CREATE OR REPLACE FUNCTION metabib.reingest_metabib_field_entries(
+ bib_id BIGINT,
+ skip_facet BOOL DEFAULT FALSE,
+ skip_display BOOL DEFAULT FALSE,
+ skip_browse BOOL DEFAULT FALSE,
+ skip_search BOOL DEFAULT FALSE,
+ only_fields INT[] DEFAULT '{}'::INT[]
+) RETURNS VOID AS $func$
+DECLARE
+ fclass RECORD;
+ ind_data metabib.field_entry_template%ROWTYPE;
+ mbe_row metabib.browse_entry%ROWTYPE;
+ mbe_id BIGINT;
+ b_skip_facet BOOL;
+ b_skip_display BOOL;
+ b_skip_browse BOOL;
+ b_skip_search BOOL;
+ value_prepped TEXT;
+ field_list INT[] := only_fields;
+ field_types TEXT[] := '{}'::TEXT[];
+BEGIN
+
+ IF field_list = '{}'::INT[] THEN
+ SELECT ARRAY_AGG(id) INTO field_list FROM config.metabib_field;
+ END IF;
+
+ SELECT COALESCE(NULLIF(skip_facet, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name = 'ingest.skip_facet_indexing' AND enabled)) INTO b_skip_facet;
+ SELECT COALESCE(NULLIF(skip_display, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name = 'ingest.skip_display_indexing' AND enabled)) INTO b_skip_display;
+ SELECT COALESCE(NULLIF(skip_browse, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name = 'ingest.skip_browse_indexing' AND enabled)) INTO b_skip_browse;
+ SELECT COALESCE(NULLIF(skip_search, FALSE), EXISTS (SELECT enabled FROM config.internal_flag WHERE name = 'ingest.skip_search_indexing' AND enabled)) INTO b_skip_search;
+
+ IF NOT b_skip_facet THEN field_types := field_types || '{facet}'; END IF;
+ IF NOT b_skip_display THEN field_types := field_types || '{display}'; END IF;
+ IF NOT b_skip_browse THEN field_types := field_types || '{browse}'; END IF;
+ IF NOT b_skip_search THEN field_types := field_types || '{search}'; END IF;
+
+ PERFORM * FROM config.internal_flag WHERE name = 'ingest.assume_inserts_only' AND enabled;
+ IF NOT FOUND THEN
+ IF NOT b_skip_search THEN
+ FOR fclass IN SELECT * FROM config.metabib_class LOOP
+ EXECUTE $$DELETE FROM metabib.$$ || fclass.name || $$_field_entry WHERE source = $$ || bib_id || $$ AND field = ANY($1)$$ USING field_list;
+ END LOOP;
+ END IF;
+ IF NOT b_skip_facet THEN
+ DELETE FROM metabib.facet_entry WHERE source = bib_id AND field = ANY(field_list);
+ END IF;
+ IF NOT b_skip_display THEN
+ DELETE FROM metabib.display_entry WHERE source = bib_id AND field = ANY(field_list);
+ END IF;
+ IF NOT b_skip_browse THEN
+ DELETE FROM metabib.browse_entry_def_map WHERE source = bib_id AND def = ANY(field_list);
+ END IF;
+ END IF;
+
+ FOR ind_data IN SELECT * FROM biblio.extract_metabib_field_entry( bib_id, ' ', field_types, field_list ) LOOP
+
+ -- don't store what has been normalized away
+ CONTINUE WHEN ind_data.value IS NULL;
+
+ IF ind_data.field < 0 THEN
+ ind_data.field = -1 * ind_data.field;
+ END IF;
+
+ IF ind_data.facet_field AND NOT b_skip_facet THEN
+ INSERT INTO metabib.facet_entry (field, source, value)
+ VALUES (ind_data.field, ind_data.source, ind_data.value);
+ END IF;
+
+ IF ind_data.display_field AND NOT b_skip_display THEN
+ INSERT INTO metabib.display_entry (field, source, value)
+ VALUES (ind_data.field, ind_data.source, ind_data.value);
+ END IF;
+
+
+ IF ind_data.browse_field AND NOT b_skip_browse THEN
+ -- A caveat about this SELECT: this should take care of replacing
+ -- old mbe rows when data changes, but not if normalization (by
+ -- which I mean specifically the output of
+ -- evergreen.oils_tsearch2()) changes. It may or may not be
+ -- expensive to add a comparison of index_vector to index_vector
+ -- to the WHERE clause below.
+
+ CONTINUE WHEN ind_data.sort_value IS NULL;
+
+ value_prepped := metabib.browse_normalize(ind_data.value, ind_data.field);
+ IF ind_data.browse_nocase THEN -- for "nocase" browse definitions, look for a preexisting row that matches case-insensitively on value and use that
+ SELECT INTO mbe_row * FROM metabib.browse_entry
+ WHERE evergreen.lowercase(value) = evergreen.lowercase(value_prepped) AND sort_value = ind_data.sort_value
+ ORDER BY sort_value, value LIMIT 1; -- gotta pick something, I guess
+ END IF;
+
+ IF mbe_row.id IS NOT NULL THEN -- asked to check for, and found, a "nocase" version to use
+ mbe_id := mbe_row.id;
+ ELSE -- otherwise, an UPSERT-protected variant
+ INSERT INTO metabib.browse_entry
+ ( value, sort_value ) VALUES
+ ( value_prepped, ind_data.sort_value )
+ ON CONFLICT (sort_value, value) DO UPDATE SET sort_value = EXCLUDED.sort_value -- must update a row to return an existing id
+ RETURNING id INTO mbe_id;
+ END IF;
+
+ INSERT INTO metabib.browse_entry_def_map (entry, def, source, authority)
+ VALUES (mbe_id, ind_data.field, ind_data.source, ind_data.authority);
+ END IF;
+
+ IF ind_data.search_field AND NOT b_skip_search THEN
+ -- Avoid inserting duplicate rows
+ EXECUTE 'SELECT 1 FROM metabib.' || ind_data.field_class ||
+ '_field_entry WHERE field = $1 AND source = $2 AND value = $3'
+ INTO mbe_id USING ind_data.field, ind_data.source, ind_data.value;
+ -- RAISE NOTICE 'Search for an already matching row returned %', mbe_id;
+ IF mbe_id IS NULL THEN
+ EXECUTE $$
+ INSERT INTO metabib.$$ || ind_data.field_class || $$_field_entry (field, source, value)
+ VALUES ($$ ||
+ quote_literal(ind_data.field) || $$, $$ ||
+ quote_literal(ind_data.source) || $$, $$ ||
+ quote_literal(ind_data.value) ||
+ $$);$$;
+ END IF;
+ END IF;
+
+ END LOOP;
+
+ IF NOT b_skip_search THEN
+ PERFORM metabib.update_combined_index_vectors(bib_id);
+ PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_symspell_reification' AND enabled;
+ IF NOT FOUND THEN
+ PERFORM search.symspell_dictionary_reify();
+ END IF;
+ END IF;
+
+ RETURN;
+END;
+$func$ LANGUAGE PLPGSQL;
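+
+-- Usage sketch: rebuild all field entries for record 1:
+--   SELECT metabib.reingest_metabib_field_entries(1);
+-- or rebuild only browse entries for selected fields (hypothetical
+-- config.metabib_field IDs 8 and 16), using PostgreSQL named notation:
+--   SELECT metabib.reingest_metabib_field_entries(1,
+--     skip_facet => TRUE, skip_display => TRUE, skip_search => TRUE,
+--     only_fields => '{8,16}'::INT[]);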
+
+COMMIT;
+
--- /dev/null
+#!/usr/bin/perl
+# ---------------------------------------------------------------
+# Copyright © 2022 Equinox Open Library Initiative, INC.
+# Mike Rylander <mrylander@gmail.com>
+#
+# This program is free software; you can redistribute it and/or modify
+# it under the terms of the GNU General Public License as published by
+# the Free Software Foundation; either version 2 of the License, or
+# (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU General Public License for more details.
+# ---------------------------------------------------------------
+
+use strict;
+use warnings;
+use DBI;
+use Getopt::Long;
+use Time::HiRes qw/usleep time/;
+use IO::Handle;
+use POSIX ":sys_wait_h";
+use Socket;
+use OpenSRF::Utils ':daemon';
+
+my $raise_db_error = 0;
+# Globals for the command line options:
+
+my $opt_lockfile = '/tmp/queued-ingest-coordinator-LOCK';
+my $opt_logfile = '/tmp/queued-ingest-coordinator-LOG';
+my $daemon = 0; # do we go into the background?
+my $stop = 0; # stop a running coordinator, if the lock file is there
+my $chatty = 0; # do we yell into the void?
+my $opt_max_child; # max number of parallel worker processes, from the command line
+my $max_child = 20; # default cap on the number of parallel worker processes
+my $max_child_force; # allow --max-child to exceed the default cap
+
+# for enqueuing mode
+my $start_id; # start processing at this record ID.
+my $end_id; # stop processing when this record ID is reached.
+my $opt_pipe; # Read record ids from STDIN.
+my $stats_only; # Just report on QI processing stats/progress
+my $totals_only; # Only give top-line aggregate stats, no per day breakdown
+my $stats_queues; # Provide a list of queue-specific stats
+my $stats_since = '001-01-01'; # Default "since" time for stats (year 1, effectively no limit)
+my $queue; # queue id, either pre-existing or created based on params
+my $queue_type = 'biblio'; # type of records for batch enqueuing
+my $queue_why; # description for this reingest queue
+my $queue_action = 'update'; # what action is happening to the records: update, insert, delete, propagate
+my $queue_state_data = ''; # State data required for queue entry processing
+my $queue_owner; # Owner of the queue
+my $queue_run_at; # When the queue should begin processing
+my $queue_threads; # parallelism for this queue (capped at max_child)
+my $skip_browse = 0; # Skip the browse reingest.
+my $skip_attrs = 0; # Skip the record attributes reingest.
+my $skip_search = 0; # Skip the search reingest.
+my $skip_facets = 0; # Skip the facets reingest.
+my $skip_display = 0; # Skip the display reingest.
+my $skip_full_rec = 0; # Skip the full_rec reingest.
+my $skip_authority = 0; # Skip the authority reingest.
+my $skip_luri = 0; # Skip the luri reingest.
+my $skip_mrmap = 0; # Skip the metarecord remapping.
+my $record_attrs = []; # Record attributes to limit a reingest to.
+my $metabib_fields = []; # Metabib fields to limit a reingest to.
+my $input_records = []; # Records supplied via CLI switch.
+my $pingest = ''; # Special "pingest" flag, supplying an EG user name as queue owner.
+
+my $help; # show help text
+
+# Database connection options with defaults:
+my $db_user = $ENV{PGUSER} || 'evergreen';
+my $db_host = $ENV{PGHOST} || 'localhost';
+my $db_db = $ENV{PGDATABASE} || 'evergreen';
+my $db_pw = $ENV{PGPASSWORD} || 'evergreen';
+my $db_port = $ENV{PGPORT} || 5432;
+
+GetOptions(
+ 'lock-file=s' => \$opt_lockfile,
+ 'log-file=s' => \$opt_logfile,
+ 'dbuser=s' => \$db_user,
+ 'dbhost=s' => \$db_host,
+ 'dbname=s' => \$db_db,
+ 'dbpw=s' => \$db_pw,
+ 'dbport=i' => \$db_port,
+ 'max-child=i' => \$opt_max_child,
+ 'max-child-force' => \$max_child_force,
+ 'stats' => \$stats_only,
+ 'totals-only' => \$totals_only,
+ 'queue-stats' => \$stats_queues,
+ 'since=s' => \$stats_since,
+ 'queue=i' => \$queue,
+ 'queue-action=s' => \$queue_action,
+ 'queue-name=s' => \$queue_why,
+ 'queue-type=s' => \$queue_type,
+ 'queue-owner=s' => \$queue_owner,
+ 'queue-run-at=s' => \$queue_run_at,
+ 'queue-threads=i' => \$queue_threads,
+ 'queue-state-data=s'=> \$queue_state_data,
+ 'skip-browse' => \$skip_browse,
+ 'skip-attrs' => \$skip_attrs,
+ 'skip-search' => \$skip_search,
+ 'skip-facets' => \$skip_facets,
+ 'skip-display' => \$skip_display,
+ 'skip-full_rec' => \$skip_full_rec,
+ 'skip-authority' => \$skip_authority,
+ 'skip-luri' => \$skip_luri,
+ 'skip-mr-map' => \$skip_mrmap,
+ 'attr=s@' => \$record_attrs,
+ 'field=s@' => \$metabib_fields,
+ 'record=s@' => \$input_records,
+ 'start-id=i' => \$start_id,
+ 'end-id=i' => \$end_id,
+ 'pipe' => \$opt_pipe,
+ 'pingest=s' => \$pingest,
+ 'coordinator' => \$daemon,
+ 'stop' => \$stop,
+ 'chatty' => \$chatty,
+ 'help' => \$help
+) or help();
+
+sub help {
+ print <<HELP;
+
+ # Enqueue records 1-500000 for reingest later, just one worker for the queue
+ $0 --queue-threads 1 \\
+    --queue-type biblio \\
+    --queue-run-at tomorrow \\
+    --queue-owner admin \\
+    --queue-name "slowly updating records due to new RDA attributes" \\
+    --start-id 1 --end-id 500000
+
+ # Start the background worker
+ $0 --coordinator --max-child $max_child
+
+ # Stop the background worker
+ $0 --coordinator --stop
+
+ # Process whatever you can Right Now
+ $0 --max-child $max_child
+
+ # Process a single queue Right Now
+ $0 --queue 1234 --max-child $max_child
+
+ # Stats on Queued Ingest processing so far today
+ $0 --stats --since today --totals-only
+
+ General options
+
+ --help
+ Show this help text.
+
+ --queue
+ ID of an existing queue to report on, process, or enqueue into
+
+
+ Reporting options
+
+ --stats
+ Request statistical information about Queued Ingest processing.
+ This option is required for other report options to work.
+
+ --totals-only
+ Only present aggregate total statistics for the reported time
+ rather than a daily breakdown.
+
+ --since
+ Limit statistics to processing that was scheduled to happen
+ at or after this timestamp. Normal PostgreSQL shorthand for
+ timestamps are allowed, such as "today" or "yesterday".
+ Default: no limit
+
+ --queue-stats
+ Provide a per-queue breakdown of processing statistics for
+ the requested time.
+
+ --queue
+ ID of a queue about which to report a breakdown of
+ processing statistics.
+
+
+ Processing options
+
+ --coordinator
+ Start the background watcher.
+ This option conflicts with --pipe, --start-id, and --end-id.
+
+ --max-child
+ Max number of database workers to use for entries without a
+ named queue, or when enqueuing to a named queue, the number
+ of database workers to use for queue processing.
+
+
+ Enqueuing options
+
+ --queue-name
+ Name for a new queue
+
+ --queue-type
+ Type of records to be enqueued; biblio or authority
+ Default: biblio
+
+ --queue-action
+ Action triggering the record queuing; insert, update, delete,
+ propagate
+ Default: update
+
+ --queue-owner
+ User name of the owner of a new queue
+
+ --queue-run-at
+ ISO timestamp at which a queue should begin processing
+ Default: NOW
+
+ --queue-threads
+ Processing concurrency for a new queue, capped at $max_child
+ Default: 1
+
+ --queue-state-data
+ Any state data required for queue entry processing. For
+ instance, the authority record ID when enqueuing a bib
+ record for authority propagation with the "propagate"
+ action.
+
+ ** If none of the name, owner, or run-at options are specified,
+ records will be enqueued for processing without a named ingest
+ queue.
+
+ --start-id
+ Start processing at this record ID.
+
+ --end-id
+ Stop processing when this record ID is reached.
+
+ --pipe
+ Read record IDs to reingest from standard input.
+ This option conflicts with --start-id and/or --end-id.
+
+HELP
+ exit;
+}
+
+help() if $help;
+
+my $dsn = "dbi:Pg:dbname=$db_db;host=$db_host;port=$db_port;application_name='queued_ingest';sslmode=allow";
+
+my $main_dbh = DBI->connect(
+ $dsn, $db_user, $db_pw,
+ { AutoCommit => 1,
+ pg_expand_array => 0,
+ pg_enable_utf8 => 1,
+ pg_bool_tf => 0,
+ RaiseError => $raise_db_error
+ }
+) || die "Could not connect to the database\n";
+
+my $configured_max_child = $main_dbh->selectcol_arrayref(
+ "SELECT value FROM config.global_flag WHERE name = 'ingest.queued.max_threads'"
+)->[0] || 20;
+$max_child = $configured_max_child if (!$opt_max_child);
+
+
+if (defined($opt_max_child) && $opt_max_child > 20 && !$max_child_force) {
+ warn('Max Child > 20 ... no, sorry');
+ help();
+}
+
+if ($opt_max_child) {
+ $max_child = $opt_max_child;
+}
+
+if ($max_child <= 0) {
+ $max_child = 20;
+}
+
+if ($opt_pipe && ($start_id || $end_id)) {
+ warn('Mutually exclusive options: either pipe or start/end range');
+ help();
+}
+
+if ($daemon && ($start_id || $end_id || $opt_pipe)) {
+ warn('Mutually exclusive options: cannot start or stop the Coordinator in Enqueuing mode');
+ help();
+}
+
+if (!$daemon && $stop) {
+ warn('Option --stop can only be used with the --coordinator option');
+ help();
+}
+
+if ($daemon && $queue) {
+ warn('Mutually exclusive options: cannot start or stop the Coordinator in one-shot processing mode');
+ help();
+}
+
+if ($queue_type && !(grep {$_ eq $queue_type} qw/biblio authority/)) {
+ warn('Valid queue types are biblio and authority');
+ help();
+}
+
+if (!(grep {$_ eq $queue_action} qw/insert update delete propagate/)) {
+ warn('Valid queue actions are: insert, update, delete, propagate');
+ help();
+}
+
+if ($queue && ($queue_owner || $queue_why || $queue_threads || $queue_run_at)) {
+ warn('Mutually exclusive options: specify a queue id OR queue creation values');
+ help();
+}
+
+
+if ($daemon) { # background mode; we need a lockfile
+
+ if ($stop) {
+ die "Lockfile $opt_lockfile does not exist, is the coordinator running?\n" unless (-e $opt_lockfile);
+
+ open(F, "<$opt_lockfile") or die "Unable to open lockfile $opt_lockfile for reading, wrong user?\n";
+ my $old_pid = <F>;
+ close F;
+
+ if ($old_pid) {
+ if (kill(0,$old_pid)) {
+ my $dead_count = kill(9,$old_pid);
+ if ($dead_count) {
+ warn "Coordinator process terminated, removing lock file $opt_lockfile\n";
+ unlink($opt_lockfile) or die "Could not remove lock file $opt_lockfile\n";
+ } else {
+ die "Could not kill coordinator process $old_pid\n";
+ }
+ } else {
+ warn "Coordinator process not running, removing stale lock file\n";
+ unlink($opt_lockfile) or die "Could not remove lock file $opt_lockfile\n";
+ }
+ } else {
+ warn "Coordinator lock file empty, removing lock file $opt_lockfile\n";
+ unlink($opt_lockfile) or die "Could not remove lock file $opt_lockfile\n";
+ }
+
+ exit;
+ }
+
+ # check the lockfile
+ die "I'm already running with lock-file $opt_lockfile\n" if (-e $opt_lockfile);
+
+ $main_dbh->disconnect;
+
+ daemonize("Queued Ingest Coordinator") if ($daemon);
+
+ # set the lockfile
+ open(F, ">$opt_lockfile") or die "Unable to open lockfile $opt_lockfile for writing\n";
+ print F $$;
+ close F;
+
+ open(STDERR, ">>$opt_logfile") if ($opt_logfile);
+}
+
+my $start_time = time;
+my %stats;
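+# %stats tracks success/fail counts overall and per record-type/action, plus
+# per-second buckets that report_stats() uses to estimate throughput.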
+
+sub reset_stats {
+ %stats = (
+ total => {
+ }, biblio => {
+ insert => {},
+ update => {},
+ delete => {},
+ propagate => {}
+ }, authority => {
+ insert => {},
+ update => {},
+ delete => {}
+ }, seconds => {}
+ );
+}
+
+reset_stats();
+
+my %processors;
+my %queues_in_progress;
+
+my $db_connections_in_use = 0;
+
+if ($start_id || $end_id || $opt_pipe || @$input_records) { # enqueuing mode
+
+ if ($pingest) { # special mode that sets up two queues that can run in parallel
+
+ my $no_browse = $skip_browse;
+ my $orig_stat_data = $queue_state_data;
+
+ # set up the first queue
+ $queue = undef;
+ $queue_threads //= 4;
+ $queue_type = 'biblio';
+ $queue_action = 'update';
+ $queue_why = 'pingest - fields and attributes queue';
+ $queue_owner = $pingest;
+
+ # for pingest mode, always skip authority and luri, and skip browse in the first queue
+ $skip_browse = 1;
+ $skip_authority = 1;
+ $skip_luri = 1;
+
+ my $record_list = enqueue_input();
+ report_stats('Enqueuing '.$queue_why);
+
+ if (!$no_browse and @$record_list) { # user didn't ask to skip browse reingest
+ # set up the second queue
+ $queue = undef;
+ $queue_threads //= 4;
+ $queue_why = 'pingest - browse queue';
+ $queue_state_data = $orig_stat_data;
+
+ $skip_browse = 0;
+ $skip_attrs = 1;
+ $skip_search = 1;
+ $skip_facets = 1;
+ $skip_display = 1;
+ $skip_full_rec = 1;
+ $skip_mrmap = 1;
+
+ reset_stats();
+
+ enqueue_input($record_list);
+ report_stats('Enqueuing '.$queue_why);
+ }
+
+ } else { # just a regular, user-defined QI request
+ enqueue_input();
+ report_stats('Enqueuing');
+ }
+
+
+} elsif ($queue && !$stats_only) { # single queue processing mode
+
+ my $q = gather_one_queue($queue);
+ process_one_queue($q->{id}, $max_child);
+ complete_outstanding_queue($q->{id});
+ report_stats('Queue Processing');
+
+} elsif (!$daemon && !$stats_only) { # special case: foreground single process, end after
+
+ my @dbhs = create_processor_dbhs($max_child);
+ my $clear = 0;
+
+ my $new_queues = gather_outstanding_queues(); # array ref of queues
+ for my $q (@$new_queues) {
+ process_one_queue($q->{id}, $max_child, \@dbhs);
+ complete_outstanding_queue($q->{id});
+ report_stats('Queue and Entry Processing', $clear++);
+ }
+
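+ # Note: the literal string 'NULL' is interpolated into the query as
+ # "LIMIT NULL", i.e. no limit; this one-shot mode drains every orphan entry.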
+ my $new_entries = gather_outstanding_nonqueue_entries('NULL'); # array ref of queue entries
+ my @eids = map { $$_{id} } @$new_entries;
+ while (my @current_subset = splice(@eids, 0, 10 * $max_child)) {
+ process_input_list(\@current_subset, \@dbhs);
+ report_stats('Queue and Entry Processing', $clear++);
+ }
+
+} elsif($stats_only) {
+ my @output;
+
+ my $q_query = <<" SQL";
+SELECT run_at::DATE AS scheduled_date,
+ SUM((start_time IS NULL)::INT) AS pending,
+ SUM((start_time IS NOT NULL AND end_time IS NULL)::INT) AS ongoing,
+ SUM((end_time IS NOT NULL)::INT) AS complete,
+ COUNT(*) AS total
+ FROM action.ingest_queue
+ WHERE run_at >= ?
+ GROUP BY ROLLUP (1)
+ ORDER BY 1
+ SQL
+
+ if (!$queue) { # all queues in the time range
+ my $qstat_rows = $main_dbh->selectall_arrayref($q_query, { Slice => {} }, $stats_since);
+ if (@$qstat_rows > 1) {
+ $qstat_rows = [ $$qstat_rows[-1] ] if ($totals_only);
+
+ push @output, "Named Queue processing stats";
+ push @output, "============================","";
+ for my $row ( @$qstat_rows ) {
+ push @output, "* Scheduled processing date: $$row{scheduled_date}" if ($$row{scheduled_date});
+ push @output, " Totals"," ------" unless ($$row{scheduled_date});
+
+ push @output, " Pending: $$row{pending}";
+ push @output, " Ongoing: $$row{ongoing}";
+ push @output, " Complete: $$row{complete}";
+
+ push @output,"";
+ push @output,'-'x50;
+ push @output,"";
+ }
+
+ push @output,"";
+ }
+ }
+
+ if ($stats_queues || $queue) {
+ $q_query = <<" SQL";
+SELECT q.*,
+ u.usrname,
+ SUM((e.ingest_time IS NULL AND e.override_by IS NULL)::INT) AS pending,
+ SUM((e.override_by IS NOT NULL)::INT) AS overridden,
+ SUM((e.ingest_time IS NOT NULL)::INT) AS complete,
+ SUM((e.fail_time IS NOT NULL)::INT) AS failed,
+ COUNT(e.id) AS events
+ FROM action.ingest_queue q
+ JOIN actor.usr u ON (q.who=u.id)
+ LEFT JOIN action.ingest_queue_entry e ON (e.queue=q.id)
+ WHERE q.XX ?
+ GROUP BY 1,2,3,4,5,6,7,8,9
+ ORDER BY q.run_at, q.id
+ SQL
+
+ my $param = $stats_since;
+ if ($queue) {
+ $param = $queue;
+ $q_query =~ s/XX/id =/;
+ } else {
+ $q_query =~ s/XX/run_at >=/;
+ }
+
+ my $qstat_rows = $main_dbh->selectall_arrayref($q_query, { Slice => {} }, $param);
+ if (@$qstat_rows) {
+
+ push @output, "Named Queue details";
+ push @output, "===================","";
+ for my $row ( @$qstat_rows ) {
+ push @output, "* Queue id: $$row{id} | Threads: $$row{threads} | Owner: $$row{usrname}";
+ push @output, "* Reason: $$row{why}";
+ push @output, "* Create time: $$row{created}";
+ push @output, "* Scheduled start time: $$row{run_at}";
+ push @output, " - Started: $$row{start_time}";
+ push @output, " - Ended : $$row{end_time}","";
+ push @output, " Pending: $$row{pending}";
+ push @output, " Overridden: $$row{overridden}";
+ push @output, " Complete: $$row{complete}";
+ push @output, " Failed: $$row{failed}";
+ push @output, " Total: $$row{events}";
+ push @output, "Percent complete: " . sprintf('%.2f',(($$row{complete} + 1.0) / ($$row{events} - $$row{failed} + 1.0)) * 100.0);
+
+ push @output,"";
+ push @output,'-'x50;
+ push @output,"";
+ }
+
+ push @output,"";
+ }
+ }
+
+ if (!$queue) {
+ my $e_query = <<" SQL";
+SELECT run_at::DATE AS scheduled_date,
+ record_type,
+ action,
+ SUM((ingest_time IS NULL AND fail_time IS NULL AND override_by IS NULL AND queue IS NOT NULL)::INT) AS pending_with_queue,
+ SUM((ingest_time IS NULL AND fail_time IS NULL AND override_by IS NULL AND queue IS NULL)::INT) AS pending_without_queue,
+ SUM((ingest_time IS NULL AND fail_time IS NULL AND override_by IS NULL)::INT) AS pending,
+ SUM((ingest_time IS NOT NULL AND override_by IS NULL)::INT) AS processed,
+ SUM((ingest_time IS NOT NULL AND override_by IS NOT NULL)::INT) AS overridden,
+ SUM((fail_time IS NOT NULL)::INT) AS failed,
+ SUM((ingest_time IS NOT NULL)::INT) AS complete,
+ COUNT(*) AS total
+ FROM action.ingest_queue_entry
+ WHERE run_at >= ?
+ GROUP BY 2,3, ROLLUP (1)
+ ORDER BY 1,2,3
+ SQL
+
+ my $estat_rows = $main_dbh->selectall_arrayref($e_query, { Slice => {} }, $stats_since);
+ if (@$estat_rows > 1) {
+ $estat_rows = [ grep { !defined($$_{scheduled_date}) } @$estat_rows ] if ($totals_only);
+
+ push @output, "Record processing stats";
+ push @output, "============================","";
+ for my $row ( @$estat_rows ) {
+ push @output, "* Scheduled processing date: $$row{scheduled_date}" if ($$row{scheduled_date});
+ push @output, " Record Type: $$row{record_type}; Action: $$row{action}";
+ push @output, " Totals"," ------" unless ($$row{scheduled_date});
+
+ push @output, " Pending: $$row{pending}";
+ push @output, " ... $$row{pending_with_queue} with a named queue" if $$row{pending};
+ push @output, " ... $$row{pending_without_queue} without a named queue" if $$row{pending};
+ push @output, " Processed: $$row{processed}";
+ push @output, " Overridden: $$row{overridden}";
+ push @output, " Complete: $$row{complete}";
+ push @output, " Failed: $$row{failed}";
+ push @output, " Total: $$row{total}";
+
+ push @output,"";
+ push @output,'-'x50;
+ push @output,"";
+ }
+ }
+
+ push @output,"";
+ }
+
+ print join("\n", @output);
+
+} elsif($daemon) { # background processing
+
+ $SIG{CHLD} = \&REAPER;
+
+ my %queues_in_waiting;
+ my @orphan_entries;
+
+ my $pid = spawn_kid();
+ my $orphan_processor = $processors{$pid};
+ $orphan_processor->{state} = 0; # will be decremented (made true) once used
+ print {$orphan_processor->{pipe}} "n:Orphan processor\n"; # set parallelism
+
+ refresh_main_dbh()->do('LISTEN queued_ingest');
+
+ my $loops = 1;
+ while ($loops) {
+
+ warn_if_chatty("starting processing loop");
+
+ my @complete_processors = grep { $_->{state} == 3 } values %processors;
+ warn_if_chatty("".scalar(@complete_processors)." complete for cleanup");
+ for my $p (@complete_processors) {
+
+ if (my $dead_pid = $$p{DEAD}) {
+ warn_if_chatty("processor $dead_pid already cleaned up, final flush");
+ delete $processors{$dead_pid}{$_} for keys %{$processors{$dead_pid}};
+ delete $processors{$dead_pid};
+ next;
+ }
+
+ $$p{DEAD} = $$p{pid};
+ warn_if_chatty("phase 1 cleanup of processor $$p{DEAD}");
+ $db_connections_in_use -= $$p{threads};
+
+ if ($$p{queues}) {
+ for my $q (keys %{$$p{queues}}) {
+ warn_if_chatty("processor $$p{pid} finished processing queue $q");
+ delete $queues_in_progress{$q}{processors}{$$p{pid}};
+ if (!scalar(keys(%{$queues_in_progress{$q}{processors}}))) { # that was the last processor for the queue
+ warn_if_chatty("queue $q complete");
+ complete_outstanding_queue($q);
+ delete $queues_in_progress{$q};
+ delete $$p{queues}{$q};
+ }
+ }
+ }
+ }
+
+ warn_if_chatty("".scalar(keys(%queues_in_progress))." queues in progress");
+ warn_if_chatty("checking for new queues");
+
+ my $new_queues = gather_outstanding_queues(); # array ref of queues
+ if (@$new_queues) {
+ warn_if_chatty("".scalar(@$new_queues)." new queues");
+ $queues_in_waiting{$_->{id}} = $_ for @$new_queues;
+
+ my @ready_kids = grep { $_->{state} == 1 } values %processors;
+ if ((my $needed = scalar(@$new_queues) - scalar(@ready_kids)) > 0) { # only spawn when short-handed
+ warn_if_chatty("spawning $needed new processors");
+ spawn_kid() while ($needed--);
+ }
+
+ @ready_kids = grep { $_->{state} == 1 } values %processors;
+
+ my @sorted_queues = sort {$$a{run_at} cmp $$b{run_at}} values %queues_in_waiting;
+ for my $q (@sorted_queues) {
+
+ my $local_max = $max_child - $db_connections_in_use;
+ if ($local_max > 0) { # we have connections available
+ my $ready = shift @ready_kids;
+ next unless $ready;
+
+ # cap at unused max if more threads were requested
+ $$q{threads} = $local_max if ($$q{threads} > $local_max);
+ $ready->{threads} = $$q{threads};
+ $ready->{state} = 2; # running now
+ $ready->{queues}{$$q{id}} = 1;
+
+ $queues_in_progress{$$q{id}} = delete $queues_in_waiting{$$q{id}};
+ $queues_in_progress{$$q{id}}{processors}{$ready->{pid}} = 1;
+
+ $db_connections_in_use += $$q{threads};
+ print {$ready->{pipe}} "n:Queue $$q{id}\n";
+ print {$ready->{pipe}} "p:$$q{threads}\n";
+ print {$ready->{pipe}} "q:$$q{id}\n";
+ shutdown($ready->{pipe},2);
+ } else {
+ warn_if_chatty("no db connections available, we'll wait");
+ }
+ }
+ }
+
+ warn_if_chatty("checking orphan processor");
+
+ if ($orphan_processor->{state} == 3) { # replace it
+
+ warn_if_chatty("replacing orphan processor, it is finished");
+
+ $pid = spawn_kid();
+ $orphan_processor = $processors{$pid};
+ $orphan_processor->{state} = 0; # will be decremented (made true) once used
+ print {$orphan_processor->{pipe}} "n:Orphan processor\n"; # set parallelism
+ }
+
+ warn_if_chatty("gathering orphan entries");
+
+ my $new_entries = gather_outstanding_nonqueue_entries(10 * $max_child); # array ref of queue entries
+ if ($new_entries and @$new_entries) {
+ warn_if_chatty("".scalar(@$new_entries)." new entries");
+ if ($orphan_processor->{state}) { # already processing some entries
+ warn_if_chatty("orphan processor is busy, wait and loop");
+ } else {
+ my $local_max = $max_child - $db_connections_in_use;
+ if ($local_max > 0) {
+ $orphan_processor->{state}--;
+ $db_connections_in_use += $local_max;
+ $orphan_processor->{threads} = $local_max;
+ $orphan_processor->{entries} = [ map { $$_{id} } @$new_entries ];
+ print {$orphan_processor->{pipe}} "p:$local_max\n"; # set parallelism
+ print {$orphan_processor->{pipe}} "e:$_\n" for (@{$orphan_processor->{entries}});
+ print {$orphan_processor->{pipe}} "!!\n";
+ shutdown($orphan_processor->{pipe},2);
+ } else {
+ warn_if_chatty("no db connections available for the orphan processor, wait and loop");
+ }
+ }
+ }
+
+ my $inner_loops = 0;
+ warn_if_chatty("waiting for LISTEN notifications");
+ until (defined(refresh_main_dbh()->pg_notifies)) {
+ sleep(1);
+ $inner_loops++;
+ if ($inner_loops >= 10) {
+ last; # loop after 10s at most
+ }
+ }
+
+ $loops++;
+ }
+}
+
+exit;
+
+sub refresh_main_dbh {
+ unless ($main_dbh->ping) {
+ $main_dbh = DBI->connect(
+ $dsn, $db_user, $db_pw,
+ { AutoCommit => 1,
+ pg_expand_array => 0,
+ pg_enable_utf8 => 1,
+ pg_bool_tf => 0,
+ RaiseError => $raise_db_error
+ }
+ ) || die "Could not connect to the database\n";
+ $main_dbh->do('LISTEN queued_ingest') if ($daemon);
+ }
+ return $main_dbh;
+}
+
+sub REAPER {
+ local $!; # don't let waitpid() overwrite current error
+ warn_if_chatty("REAPER called");
+ while ((my $pid = waitpid(-1, WNOHANG)) > 0) {
+ warn_if_chatty("reaping kid $pid");
+ $processors{$pid}{state} = 3;
+ }
+ $SIG{CHLD} = \&REAPER;
+}
+
+sub create_processor_dbhs {
+ my $count = shift;
+
+ my @dbhs;
+ while (scalar(@dbhs) < $count) {
+ push @dbhs, DBI->connect(
+ $dsn, $db_user, $db_pw,
+ { AutoCommit => 1,
+ pg_expand_array => 0,
+ pg_enable_utf8 => 1,
+ pg_bool_tf => 0,
+ RaiseError => $raise_db_error
+ }
+ );
+ }
+
+ return @dbhs;
+}
+
+sub complete_outstanding_queue {
+ my $qid = shift;
+
+ return refresh_main_dbh()->do(
+ 'UPDATE action.ingest_queue SET end_time = NOW()'.
+ ' WHERE id=? RETURNING *',
+ {}, $qid
+ );
+}
+
+sub gather_one_queue {
+ my $qid = shift;
+ my $q = refresh_main_dbh()->selectrow_hashref(
+ 'UPDATE action.ingest_queue SET start_time = NOW()'.
+ ' WHERE id = ? RETURNING *',
+ {},$qid
+ );
+
+ return $q;
+}
+
+sub gather_outstanding_queues {
+ my $qs = refresh_main_dbh()->selectall_hashref(
+ 'UPDATE action.ingest_queue SET start_time = NOW()'.
+ ' WHERE start_time IS NULL AND run_at <= NOW()'.
+ ' RETURNING *',
+ 'id'
+ );
+
+ for my $q (values %$qs) {
+ $q->{threads} ||= 1;
+ }
+
+ return [values %$qs];
+}
+
+sub gather_outstanding_nonqueue_entries {
+ my $limit = shift;
+ return refresh_main_dbh()->selectall_arrayref(
+ "SELECT * FROM action.ingest_queue_entry".
+ " WHERE queue IS NULL".
+ " AND run_at <= NOW()".
+ " AND override_by IS NULL".
+ " AND ingest_time IS NULL".
+ " AND fail_time IS NULL".
+ " ORDER BY run_at, id".
+ " LIMIT $limit",
+ { Slice => {} }
+ );
+}
+
+sub spawn_kid {
+ my $parent;
+ my $child;
+
+ socketpair($child, $parent, AF_UNIX, SOCK_STREAM, PF_UNSPEC)
+ || die "socketpair: $!";
+
+ $parent->autoflush(1);
+ $child->autoflush(1);
+
+ my $kid_pid = fork() //
+ die "Could not fork worker process";
+
+ if ($kid_pid) {
+ close($parent);
+ $processors{$kid_pid} = {
+ pipe => $child,
+ pid => $kid_pid,
+ state => 1
+ };
+ } else {
+ set_psname("Queued Ingest worker - waiting");
+ open(STDERR, ">>$opt_logfile") if ($opt_logfile);
+ $SIG{CHLD} = 'IGNORE';
+ close($child);
+ process_commands_from_parent($parent);
+ warn_if_chatty("finished processing commands from parent, exiting");
+ report_stats("Entry Processing - worker $$",0,1);
+ exit;
+ }
+
+ return $kid_pid;
+}
+
+sub warn_if_chatty {
+ return unless $chatty;
+
+ my $msg = shift;
+ my $time = time;
+ warn "$time [$$] $msg\n";
+}
+
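+# Commands arrive from the coordinator one per line over the socketpair:
+#   n:<name>  - set this worker's process title
+#   p:<count> - use <count> database handles (parallelism) for subsequent work
+#   q:<id>    - process all outstanding entries of queue <id>, then exit
+#   e:<id>    - buffer queue entry <id> for processing
+#   ##        - process the buffered entries, then wait for more commands
+#   !!        - process the buffered entries, then exit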
+sub process_commands_from_parent {
+ my $ppipe = shift;
+
+ my $stop_after = 0;
+ while (1) {
+ my @input;
+ my $dbh_count = 1;
+ my $cont = 0;
+ while (<$ppipe>) {
+ $cont = 1;
+ chomp;
+ if (/^q:(\d+)$/) { # we were passed a queue id
+ my $queue = $1;
+ $stop_after = 1;
+ warn_if_chatty("processing queue $queue, should exit after");
+ process_one_queue($queue,$dbh_count);
+ warn_if_chatty("processing queue $queue complete");
+ } elsif (/^n:(.+)$/) { # we were passed a process name
+ set_psname("Queued Ingest worker - ". $1);
+ } elsif (/^p:(\d+)$/) { # we were passed a dbh count (parallelism)
+ $dbh_count = $1 || 1;
+ warn_if_chatty("parallelism set to $dbh_count");
+ } elsif (/^e:(\d+)$/) { # we were passed an entry id
+ my $entry = $1;
+ push @input, $entry;
+ } elsif (/^##$/) { # This is the "process those, but then wait" command
+ last;
+ } elsif (/^!!$/) { # This is the "end of input, process and exit" command
+ warn_if_chatty("end of the command stream, should exit after");
+ $stop_after = 1;
+ last;
+ }
+ }
+
+ # we have a list of entries to process
+ if (my $icount = scalar(@input)) {
+ my @dbhs = create_processor_dbhs($dbh_count);
+ warn_if_chatty("processing $icount entries...");
+ process_input_list(\@input, \@dbhs);
+ warn_if_chatty("processing $icount entries complete");
+ }
+
+ last if $stop_after || !$cont;
+ }
+
+ close($ppipe);
+}
+
+sub process_one_queue {
+ my $qid = shift;
+ my $dbh_count = shift || 1;
+ my $dbh_list = shift;
+
+ return unless $qid;
+
+ my @dbhs = $dbh_list ? @$dbh_list : create_processor_dbhs($dbh_count);
+ my @input = @{$dbhs[0]->selectcol_arrayref(
+ 'SELECT id FROM action.ingest_queue_entry'.
+ ' WHERE queue = ?'.
+ ' AND override_by IS NULL'.
+ ' AND ingest_time IS NULL'.
+ ' AND fail_time IS NULL',
+ {}, $qid
+ )};
+
+ return unless @input;
+ return process_input_list(\@input, \@dbhs);
+}
+
+sub process_entry_begin {
+ my $entry_id = shift;
+ my $ms = shift;
+ my $dbh = shift;
+
+
+ my $entry = $dbh->selectrow_hashref(
+ "SELECT * FROM action.ingest_queue_entry WHERE id = ?", undef, $entry_id
+ );
+
+ if (!$entry || !$$entry{id}) { # entry row may be gone; hand the dbh back unused
+ return $dbh;
+ }
+
+ my $sth = $dbh->prepare(
+ "SELECT action.process_ingest_queue_entry(?)", {pg_async => 1}
+ );
+
+ if (!$sth->execute($entry_id)) {
+ $stats{total}{fail}++;
+ $stats{$$entry{record_type}}{$$entry{action}}{fail}++;
+
+ my $current_second = CORE::time;
+ $stats{seconds}{$current_second}{fail}++;
+ $stats{seconds}{$current_second}{total}++;
+
+ return $dbh;
+ }
+
+ $$ms{$entry_id} = {
+ entry => $entry,
+ dbh => $dbh,
+ sth => $sth
+ };
+
+ return undef;
+}
+
+sub process_entry_complete {
+ my $eid = shift;
+ my $ms = shift;
+
+ if ($$ms{$eid}{sth}->pg_ready) {
+ $$ms{$eid}{sth}->pg_result;
+ my ($success) = $$ms{$eid}{sth}->fetchrow_array;
+ $$ms{$eid}{sth}->finish;
+
+ $success = $success ? 'success' : 'fail';
+ $stats{total}{$success}++;
+ $stats{$$ms{$eid}{entry}{record_type}}{$$ms{$eid}{entry}{action}}{$success}++;
+
+ my $current_second = CORE::time;
+ $stats{seconds}{$current_second}{$success}++;
+ $stats{seconds}{$current_second}{total}++;
+
+ my $dbh = delete $$ms{$eid}{dbh};
+ delete $$ms{$eid}{$_} for keys %{$$ms{$eid}};
+ delete $$ms{$eid};
+ return $dbh;
+ }
+
+ return undef;
+}
+
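+# Fan the available DB handles out over the entry list: each entry's ingest
+# starts asynchronously (pg_async, in process_entry_begin) and its handle
+# returns to the pool once pg_ready reports a result, so up to scalar(@$dbhs)
+# entries are in flight at once.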
+sub process_input_list {
+ my $input = shift;
+ my $dbhs = shift;
+
+ my %microstate;
+ while (@$input || keys(%microstate)) {
+
+ # do we have an idle worker, and work to do?
+ if (@$input && scalar(@$dbhs)) {
+ my $entry_id = shift @$input;
+ my $dbh = shift @$dbhs;
+ my $failed_dbh = process_entry_begin($entry_id, \%microstate, $dbh);
+ if ($failed_dbh) {
+ push @$dbhs, $failed_dbh;
+ next;
+ }
+ }
+
+ # look at them in ascending order, as the oldest will have
+ # been running the longest and is more likely to be finished
+ my @entries = sort {$a <=> $b} keys %microstate;
+ for my $eid (@entries) {
+ my $success_dbh = process_entry_complete($eid, \%microstate);
+ if ($success_dbh) {
+ push @$dbhs, $success_dbh;
+ }
+ }
+
+ usleep(10000) if (keys %microstate); # ~0.01 seconds
+ }
+
+ return $dbhs;
+}
+
+sub report_stats {
+ my $label = shift;
+ my $clear = shift;
+ my $warn = shift;
+ my $runtime = time - $start_time;
+ my @seconds_list = sort keys %{$stats{seconds}};
+ my $first_second = $seconds_list[0] // 0; # guard against an empty stats hash
+ my $last_second = $seconds_list[-1] // 0;
+ my $processing_seconds = ($last_second - $first_second) + 1.0;
+
+ system('clear') if $clear;
+
+ print "$label stats:\n" unless $warn;
+ warn "$label stats:\n" if $warn;
+ for my $type (qw/biblio authority/) {
+ for my $action ( sort keys %{$stats{$type}} ) {
+ my $part = $stats{$type}{$action};
+ next unless (defined($$part{success}) || defined($$part{fail}));
+
+ $$part{success} //= 0;
+ $$part{fail} //= 0;
+
+ my $subtotal = $$part{success} + $$part{fail};
+ print " * Type: $type\n - Action: $action :: $$part{success}/$$part{fail}/$subtotal\n" unless $warn;
+ warn " * Type: $type\n - Action: $action :: $$part{success}/$$part{fail}/$subtotal\n" if $warn;
+ }
+ }
+
+ $stats{total}{success} //= 0;
+ $stats{total}{fail} //= 0;
+
+ my $per_sec = ($stats{total}{success} + $stats{total}{fail}) / $processing_seconds;
+
+ print " * Timing: $runtime runtime, $processing_seconds seconds processing, $per_sec per second\n".
+ " * Total: $stats{total}{success} succeeded, $stats{total}{fail} failed\n" unless $warn;
+ warn " * Timing: $runtime runtime, $processing_seconds seconds processing, $per_sec per second\n".
+ " * Total: $stats{total}{success} succeeded, $stats{total}{fail} failed\n" if $warn;
+}
+
+sub enqueue_input {
+ my $predestined_input = shift;
+ my @input;
+
+ if ($predestined_input and @$predestined_input) {
+ @input = @$predestined_input;
+ } elsif ($opt_pipe) {
+ while (<STDIN>) {
+ # Assume any string of digits is an id.
+ if (my @subs = /([0-9]+)/g) {
+ push(@input, @subs);
+ }
+ }
+ } elsif (@$input_records) {
+ @input = grep { /^\d+$/ } @$input_records;
+ } else {
+ my $q = "SELECT id FROM $queue_type.record_entry WHERE NOT DELETED";
+ if ($start_id && $end_id) {
+ $q .= " AND id BETWEEN $start_id AND $end_id";
+ } elsif ($start_id) {
+ $q .= " AND id >= $start_id";
+ } elsif ($end_id) {
+ $q .= " AND id <= $end_id";
+ }
+ $q .= ' ORDER BY id ASC';
+
+ @input = @{$main_dbh->selectcol_arrayref($q)};
+ }
+
+ $main_dbh->begin_work;
+ if ($queue_why || $queue_threads || $queue_run_at) {
+ my $rows = $main_dbh->do(
+ 'INSERT INTO action.ingest_queue (why,threads,run_at) VALUES(?,?,?)',
+ undef, $queue_why, $queue_threads || 1, $queue_run_at || 'NOW'
+ );
+ if ($rows && $queue_owner) {
+ $queue = $main_dbh->last_insert_id(undef,'action','ingest_queue',undef);
+ if ($queue && $queue_owner) {
+ my $uid = $main_dbh->selectcol_arrayref(
+ 'SELECT id FROM actor.usr WHERE usrname = ?',
+ undef, $queue_owner
+ )->[0];
+ if ($uid) {
+ $rows = $main_dbh->do(
+ 'UPDATE action.ingest_queue SET who = ? WHERE id = ?',
+ undef, $uid, $queue
+ );
+ }
+ }
+ }
+ }
+
+ my $q_obj = {};
+ if ($queue) {
+ $q_obj = $main_dbh->selectrow_hashref("SELECT * FROM action.ingest_queue WHERE id=$queue") || {};
+ }
+ $queue = $q_obj->{id} || '0';
+
+ if ($queue_type eq 'biblio' and $queue_action eq 'update') {
+ $queue_state_data .= ';skip_browse' if $skip_browse;
+ $queue_state_data .= ';skip_attrs' if $skip_attrs;
+ $queue_state_data .= ';skip_search' if $skip_search;
+ $queue_state_data .= ';skip_facets' if $skip_facets;
+ $queue_state_data .= ';skip_display' if $skip_display;
+ $queue_state_data .= ';skip_full_rec' if $skip_full_rec;
+ $queue_state_data .= ';skip_authority' if $skip_authority;
+ $queue_state_data .= ';skip_luri' if $skip_luri;
+ $queue_state_data .= ';skip_mrmap' if $skip_mrmap;
+
+ $queue_state_data .= ';attr_list('.join(',',@$record_attrs).')' if @$record_attrs;
+ $queue_state_data .= ';field_list('.join(',',@$metabib_fields).')' if @$metabib_fields;
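+ # e.g. the assembled state data might read ';skip_browse;field_list(8,16)'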
+ }
+
+ my $qid = $q_obj->{id};
+ my $run_at = $q_obj->{run_at} || 'NOW';
+ for my $rid (@input) {
+ my $success = $main_dbh->selectcol_arrayref(
+ 'SELECT action.enqueue_ingest_entry(?, ?, ?, ?, ?, ?)',
+ {},
+ $rid, $queue_type, $run_at, $qid, $queue_action, $queue_state_data
+ )->[0];
+
+ my $update_key = $success ? 'success' : 'fail';
+ $stats{total}{$update_key}++;
+ $stats{$queue_type}{$queue_action}{$update_key}++;
+
+ my $current_second = CORE::time;
+ $stats{seconds}{$current_second}{$update_key}++;
+ $stats{seconds}{$current_second}{total}++;
+ }
+ $main_dbh->commit;
+
+ return \@input;
+}
+