Provisional upgrade script for MVF
authorMike Rylander <mrylander@gmail.com>
Wed, 15 Jan 2014 21:48:13 +0000 (16:48 -0500)
committerMike Rylander <mrylander@gmail.com>
Wed, 29 Jan 2014 18:21:22 +0000 (13:21 -0500)
Signed-off-by: Mike Rylander <mrylander@gmail.com>
Open-ILS/src/sql/Pg/upgrade/XXXX.schema.MVF.sql [new file with mode: 0644]

diff --git a/Open-ILS/src/sql/Pg/upgrade/XXXX.schema.MVF.sql b/Open-ILS/src/sql/Pg/upgrade/XXXX.schema.MVF.sql
new file mode 100644 (file)
index 0000000..0eee625
--- /dev/null
@@ -0,0 +1,383 @@
+BEGIN;
+
+CREATE EXTENSION intarray;
+
+ALTER TABLE config.record_attr_definition ADD COLUMN multi NOT NULL DEFAULT TRUE;
+
+UPDATE  config.record_attr_definition
+  SET   multi = FALSE
+  WHERE name IN ('bib_level','control_type','pubdate','cat_form','enc_level','item_type','titlesort','authorsort');
+
+CREATE OR REPLACE FUNCTION vandelay.marc21_physical_characteristics( marc TEXT) RETURNS SETOF biblio.marc21_physical_characteristics AS $func$
+DECLARE
+    rowid   INT := 0;
+    _007    TEXT;
+    ptype   config.marc21_physical_characteristic_type_map%ROWTYPE;
+    psf     config.marc21_physical_characteristic_subfield_map%ROWTYPE;
+    pval    config.marc21_physical_characteristic_value_map%ROWTYPE;
+    retval  biblio.marc21_physical_characteristics%ROWTYPE;
+BEGIN
+
+    FOR _007 IN SELECT oils_xpath_string('//*', value) FROM UNNEST(oils_xpath('//*[@tag="007"]', marc)) x(value) LOOP
+        IF _007 IS NOT NULL AND _007 <> '' THEN
+            SELECT * INTO ptype FROM config.marc21_physical_characteristic_type_map WHERE ptype_key = SUBSTRING( _007, 1, 1 );
+
+            IF ptype.ptype_key IS NOT NULL THEN
+                FOR psf IN SELECT * FROM config.marc21_physical_characteristic_subfield_map WHERE ptype_key = ptype.ptype_key LOOP
+                    SELECT * INTO pval FROM config.marc21_physical_characteristic_value_map WHERE ptype_subfield = psf.id AND value = SUBSTRING( _007, psf.start_pos + 1, psf.length );
+
+                    IF pval.id IS NOT NULL THEN
+                        rowid := rowid + 1;
+                        retval.id := rowid;
+                        retval.ptype := ptype.ptype_key;
+                        retval.subfield := psf.id;
+                        retval.value := pval.id;
+                        RETURN NEXT retval;
+                    END IF;
+
+                END LOOP;
+            END IF;
+        END IF;
+    END LOOP;
+
+    RETURN;
+END;
+$func$ LANGUAGE PLPGSQL;
+
+-- DECREMENTING serial starts at -1
+CREATE SEQUENCE metabib.uncontrolled_record_attr_value_id_seq INCREMENT BY -1;
+
+CREATE TABLE metabib.uncontrolled_record_attr_value (
+    id      BIGINT  PRIMARY KEY DEFAULT nextval('metabib.uncontrolled_record_attr_value_id_seq'),
+    attr    TEXT    NOT NULL REFERENCES config.record_attr_definition (name),
+    value   TEXT    NOT NULL
+);
+CREATE UNIQUE INDEX muv_once_idx ON metabib.uncontrolled_record_attr_value (attr,value);
+
+CREATE TABLE metabib.record_attr_vector_list (
+    source  BIGINT  PRIMARY KEY REFERNECES  biblio.record_entry (id),
+    vlist   INT[]   NOT NULL -- stores id from ccvm AND murav
+);
+CREATE INDEX mrca_vlist_idx ON metabib.record_attr_vector_list USING gin ( vlist gin__int_ops );
+
+CREATE TABLE metabib.record_sorter (
+    id      BIGSERIAL   PRIMARY KEY,
+    source  BIGINT      NOT NULL REFERENCES biblio.record_entry (id) ON DELETE CASCADE,
+    attr    TEXT        NOT NULL REFERENCES config.record_attr_definition (name) ON DELETE CASCADE,
+    value   TEXT        NOT NULL
+);
+CREATE INDEX metabib_sorter_source_idx ON metabib.record_sorter (source); -- we may not need one of this or the next ... stats will tell
+CREATE INDEX metabib_sorter_s_a_idx ON metabib.record_sorter (source, attr);
+CREATE INDEX metabib_sorter_a_v_idx ON metabib.record_sorter (attr, value);
+
+DROP TABLE metabib.record_attr;
+
+CREATE TYPE metbib.record_attr_type AS (
+    id      BIGINT,
+    attrs   HSTORE
+);
+
+-- Back-compat view ... we're moving to an INTARRAY world
+CREATE VIEW metabib.record_attr AS
+    SELECT  v.source AS id,
+            hstore( ARRAY_AGG( ARRAY[ COALESCE(c.ctype,u.attr), COALESCE(c.value,u.value) ] ) )
+      FROM  metabib.record_attr_vector_list v
+            LEFT JOIN metabib.uncontrolled_record_attr_value u ON ( u.id = ANY( v.vlist ) )
+            LEFT JOIN config.coded_value_map c ON ( c.id = ANY( v.vlist ) )
+      WHERE c.id IS NOT NULL OR u.id IS NOT NULL
+      GROUP BY 1;
+CREATE OR REPLACE FUNCTION vandelay.marc21_extract_fixed_field_list( marc TEXT, ff TEXT ) RETURNS TEXT[] AS $func$
+DECLARE
+    rtype       TEXT;
+    ff_pos      RECORD;
+    tag_data    RECORD;
+    val         TEXT;
+    collection  TEXT[] := '{}'::TEXT[];
+BEGIN
+    rtype := (vandelay.marc21_record_type( marc )).code;
+    FOR ff_pos IN SELECT * FROM config.marc21_ff_pos_map WHERE fixed_field = ff AND rec_type = rtype ORDER BY tag DESC LOOP
+        IF ff_pos.tag = 'ldr' THEN
+            val := oils_xpath_string('//*[local-name()="leader"]', marc);
+            IF val IS NOT NULL THEN
+                val := SUBSTRING( val, ff_pos.start_pos + 1, ff_pos.length );
+                collection := collection || val;
+            END IF;
+        ELSE
+            FOR tag_data IN SELECT value FROM UNNEST( oils_xpath( '//*[@tag="' || UPPER(ff_pos.tag) || '"]/text()', marc ) ) x(value) LOOP
+                val := SUBSTRING( tag_data.value, ff_pos.start_pos + 1, ff_pos.length );
+                collection := collection || val;
+            END LOOP;
+        END IF;
+        val := REPEAT( ff_pos.default_val, ff_pos.length );
+        collection := collection || val;
+    END LOOP;
+
+    RETURN collection;
+END;
+$func$ LANGUAGE PLPGSQL;
+
+CREATE OR REPLACE FUNCTION biblio.marc21_extract_fixed_field_list( rid BIGINT, ff TEXT ) RETURNS TEXT[] AS $func$
+    SELECT * FROM vandelay.marc21_extract_fixed_field_list( (SELECT marc FROM biblio.record_entry WHERE id = $1), $2 );
+$func$ LANGUAGE SQL;
+
+CREATE OR REPLACE FUNCTION metabib.reingest_record_attributes (rid BIGINT, pattr_list TEXT[] DEFAULT NULL, prmarc TEXT DEFAULT NULL, rdeleted BOOL DEFAULT TRUE) RETURNS VOID AS $func$
+DECLARE
+    transformed_xml TEXT;
+    rmarc           TEXT := prmarc;
+    tmp_val         TEXT;
+    prev_xfrm       TEXT;
+    normalizer      RECORD;
+    xfrm            config.xml_transform%ROWTYPE;
+    attr_vector     INT[] := '{}'::INT[];
+    attr_vector_tmp INT[];
+    attr_list       TEXT[] := pattr_list;
+    attr_value      TEXT[];
+    norm_attr_value TEXT[];
+    tmp_xml         XML;
+    attr_def        config.record_attr_definition%ROWTYPE;
+    ccvm_row        config.code_value_map%ROWTYPE;
+BEGIN
+
+    IF attr_list IS NULL OR rdeleted THEN -- need to do the full dance on INSERT or undelete
+        SELECT ARRAY_AGG(name) INTO attr_list FROM config.record_attr_definition;
+    END IF;
+
+    IF rmarc IS NULL THEN
+        SELECT marc INTO rmarc FROM biblio.record_entry WHERE id = rid;
+    END IF;
+
+    FOR attr_def IN SELECT * FROM config.record_attr_definition WHERE name = ANY( attr_list ) ORDER BY format LOOP
+
+        attr_value := '{}'::TEXT[]
+        norm_attr_value := '{}'::TEXT[]
+        attr_vector_tmp := '{}'::INT[]
+
+        SELECT * INTO ccvm_row FROM config.code_value_map c WHERE c.ctype = attr_def.name; 
+
+        -- tag+sf attrs only support SVF
+        IF attr_def.tag IS NOT NULL THEN -- tag (and optional subfield list) selection
+            SELECT  ARRAY[ARRAY_TO_STRING(ARRAY_AGG(value), COALESCE(attr_def.joiner,' '))] INTO attr_value
+              FROM  (SELECT * FROM metabib.full_rec ORDER BY tag, subfield) AS x
+              WHERE record = rid
+                    AND tag LIKE attr_def.tag
+                    AND CASE
+                        WHEN attr_def.sf_list IS NOT NULL 
+                            THEN POSITION(subfield IN attr_def.sf_list) > 0
+                        ELSE TRUE
+                    END
+              GROUP BY tag
+              ORDER BY tag
+              LIMIT 1;
+
+        ELSIF attr_def.fixed_field IS NOT NULL THEN -- a named fixed field, see config.marc21_ff_pos_map.fixed_field
+            attr_value := vandelay.marc21_extract_fixed_field_list(rmarc, attr_def.fixed_field);
+
+            IF NOT attr_def.multi THEN
+                attr_value := ARRAY[attr_value[1]];
+            END IF;
+
+        ELSIF attr_def.xpath IS NOT NULL THEN -- and xpath expression
+
+            SELECT INTO xfrm * FROM config.xml_transform WHERE name = attr_def.format;
+        
+            -- See if we can skip the XSLT ... it's expensive
+            IF prev_xfrm IS NULL OR prev_xfrm <> xfrm.name THEN
+                -- Can't skip the transform
+                IF xfrm.xslt <> '---' THEN
+                    transformed_xml := oils_xslt_process(rmarc,xfrm.xslt);
+                ELSE
+                    transformed_xml := rmarc;
+                END IF;
+    
+                prev_xfrm := xfrm.name;
+            END IF;
+
+            IF xfrm.name IS NULL THEN
+                -- just grab the marcxml (empty) transform
+                SELECT INTO xfrm * FROM config.xml_transform WHERE xslt = '---' LIMIT 1;
+                prev_xfrm := xfrm.name;
+            END IF;
+
+            FOR tmp_xml IN SELECT XPATH(attr_def.xpath, transformed_xml, ARRAY[ARRAY[xfrm.prefix, xfrm.namespace_uri]]) LOOP
+                attr_value := attr_value ||
+                                oils_xpath_string(
+                                    '//*',
+                                    tmp_xml::TEXT,
+                                    COALESCE(attr_def.joiner,' '),
+                                    ARRAY[ARRAY[xfrm.prefix, xfrm.namespace_uri]]
+                                );
+                EXIT WHEN NOT attr_def.multi;
+            END LOOP;
+
+        ELSIF attr_def.phys_char_sf IS NOT NULL THEN -- a named Physical Characteristic, see config.marc21_physical_characteristic_*_map
+            SELECT  ARRAY_AGG(m.value) INTO attr_vlue
+              FROM  vandelay.marc21_physical_characteristics(rmarc) v
+                    LEFT JOIN config.marc21_physical_characteristic_value_map m ON (m.id = v.value)
+              WHERE v.subfield = attr_def.phys_char_sf
+                    AND ( ccvm.id IS NULL OR ( ccvm.id IS NOT NULL AND v.id IS NOT NULL) );
+
+            IF NOT attr_def.multi THEN
+                attr_value := ARRAY[attr_value[1]];
+            END IF;
+
+        END IF;
+
+                -- apply index normalizers to attr_value
+        FOR tmp_val IN SELECT value FROM UNNEST(attr_value) x(value);
+            FOR normalizer IN
+                SELECT  n.func AS func,
+                        n.param_count AS param_count,
+                        m.params AS params
+                  FROM  config.index_normalizer n
+                        JOIN config.record_attr_index_norm_map m ON (m.norm = n.id)
+                  WHERE attr = attr_def.name
+                  ORDER BY m.pos LOOP
+                    EXECUTE 'SELECT ' || normalizer.func || '(' ||
+                    COALESCE( quote_literal( tmp_val ), 'NULL' ) ||
+                        CASE
+                            WHEN normalizer.param_count > 0
+                                THEN ',' || REPLACE(REPLACE(BTRIM(normalizer.params,'[]'),E'\'',E'\\\''),E'"',E'\'')
+                                ELSE ''
+                            END ||
+                    ')' INTO tmp_val;
+
+            END LOOP;
+            norm_attr_value := norm_attr_value || tmp_val;
+        END LOOP;
+        
+        IF attr_def.filter THEN
+            -- Create unknown uncontrolled values and find the IDs of the values
+            IF ccvm.id IS NULL THEN
+                FOR tmp_val FROM SELECT value FROM UNNEST(norm_attr_value) x(value) LOOP
+                    BEGIN; -- use subtransaction to isolate unique constraint violations
+                        INSERT INTO metabib.uncontrolled_record_attr_value ( attr, value ) VALUES ( attr_def.name, tmp_val );
+                    EXCEPTION WHEN unique_violation THEN END;
+                END LOOP;
+
+                SELECT ARRAY_AGG(id) INTO attr_vector_tmp FROM metabib.uncontrolled_record_attr_value WHERE attr = attr_def.name AND value = ANY( norm_attr_value );
+            ELSE
+                SELECT ARRAY_AGG(id) INTO attr_vector_tmp FROM config.coded_value_map WHERE ctype = attr_def.name AND value = ANY( norm_attr_value );
+            END IF;
+
+            -- Add the new value to the vector
+            attr_vector := attr_vector || attr_vector_tmp;
+        END IF;
+
+        IF attr_def.sorter THEN
+            DELETE FROM metabib.record_sorter WHERE source = rid AND attr = attr_def.name;
+            INSERT INTO metabib.record_sorter (source, attr, value) VALUES (rid, attr_def.name, norm_attr_value[1]);
+        END IF;
+
+    END LOOP;
+
+    IF ARRAY_LENGTH(vlist, 1) > 0 THEN
+/* We may need to rewrite the vlist to contain
+   the intersection of new values for requested
+   attrs and old values for ignored attrs. To
+   do this, we take the old attr vlist and
+   subtract any values that are valid for the
+   requested attrs, and then add back the new
+   set of attr values. */
+        IF ARRAY_LENGTH(pattr_list, 1) > 0 THEN 
+            SELECT vlist INTO attr_vector_tmp FROM metabib.record_attr_vector_list WHERE source = rid;
+            SELECT attr_vector_tmp - ARRAY_AGG(id) INTO attr_vector_tmp FROM metabib.uncontrolled_record_attr_value WHERE attr = ANY (pattr_list);
+            SELECT attr_vector_tmp - ARRAY_AGG(id) INTO attr_vector_tmp FROM config.coded_value_map WHERE ctype = ANY (pattr_list);
+            attr_vector := attr_vector || attr_vector_tmp;
+        END IF;
+
+        IF rdeleted THEN -- initial insert OR revivication
+            DELETE FROM metabib.record_attr_vector_list WHERE source = rid;
+            INSERT INTO metabib.record_attr_vector_list (source, vlist) VALUES (rid, attr_vector);
+            ELSE
+            UPDATE metabib.record_attr_vector_list SET vlist = attr_vector WHERE source = rid;
+        END IF;
+    END IF;
+
+END;
+
+$$ LANGUAGE PLPGSQL;
+
+
+-- AFTER UPDATE OR INSERT trigger for biblio.record_entry
+CREATE OR REPLACE FUNCTION biblio.indexing_ingest_or_delete () RETURNS TRIGGER AS $func$
+BEGIN
+
+    IF NEW.deleted IS TRUE THEN -- If this bib is deleted
+        PERFORM * FROM config.internal_flag WHERE
+            name = 'ingest.metarecord_mapping.preserve_on_delete' AND enabled;
+        IF NOT FOUND THEN
+            -- One needs to keep these around to support searches
+            -- with the #deleted modifier, so one should turn on the named
+            -- internal flag for that functionality.
+            DELETE FROM metabib.metarecord_source_map WHERE source = NEW.id;
+            DELETE FROM metabib.record_attr_vector_list WHERE source = NEW.id;
+        END IF;
+
+        DELETE FROM authority.bib_linking WHERE bib = NEW.id; -- Avoid updating fields in bibs that are no longer visible
+        DELETE FROM biblio.peer_bib_copy_map WHERE peer_record = NEW.id; -- Separate any multi-homed items
+        DELETE FROM metabib.browse_entry_def_map WHERE source = NEW.id; -- Don't auto-suggest deleted bibs
+        RETURN NEW; -- and we're done
+            END IF;
+
+    IF TG_OP = 'UPDATE' THEN -- re-ingest?
+        PERFORM * FROM config.internal_flag WHERE name = 'ingest.reingest.force_on_same_marc' AND enabled;
+
+        IF NOT FOUND AND OLD.marc = NEW.marc THEN -- don't do anything if the MARC didn't change
+            RETURN NEW;
+        END IF;
+    END IF;
+
+    -- Record authority linking
+    PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_authority_linking' AND enabled;
+    IF NOT FOUND THEN
+        PERFORM biblio.map_authority_linking( NEW.id, NEW.marc );
+    END IF;
+
+    -- Flatten and insert the mfr data
+    PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_metabib_full_rec' AND enabled;
+    IF NOT FOUND THEN
+        PERFORM metabib.reingest_metabib_full_rec(NEW.id);
+
+        -- Now we pull out attribute data, which is dependent on the mfr for all but XPath-based fields
+        PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_metabib_rec_descriptor' AND enabled;
+        IF NOT FOUND THEN
+            PERFORM metabib.reingest_record_attributes(NEW.id, NULL, NEW.marc, TG_OP = 'INSERT' OR OLD.deleted);
+        END IF;
+    END IF;
+
+    -- Gather and insert the field entry data
+    PERFORM metabib.reingest_metabib_field_entries(NEW.id);
+
+    -- Located URI magic
+    IF TG_OP = 'INSERT' THEN
+        PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_located_uri' AND enabled;
+        IF NOT FOUND THEN
+            PERFORM biblio.extract_located_uris( NEW.id, NEW.marc, NEW.editor );
+        END IF;
+    ELSE
+        PERFORM * FROM config.internal_flag WHERE name = 'ingest.disable_located_uri' AND enabled;
+        IF NOT FOUND THEN
+            PERFORM biblio.extract_located_uris( NEW.id, NEW.marc, NEW.editor );
+        END IF;
+    END IF;
+
+    -- (re)map metarecord-bib linking
+    IF TG_OP = 'INSERT' THEN -- if not deleted and performing an insert, check for the flag
+        PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.skip_on_insert' AND enabled;
+        IF NOT FOUND THEN
+            PERFORM metabib.remap_metarecord_for_bib( NEW.id, NEW.fingerprint );
+        END IF;
+    ELSE -- we're doing an update, and we're not deleted, remap
+        PERFORM * FROM config.internal_flag WHERE name = 'ingest.metarecord_mapping.skip_on_update' AND enabled;
+        IF NOT FOUND THEN
+            PERFORM metabib.remap_metarecord_for_bib( NEW.id, NEW.fingerprint );
+        END IF;
+    END IF;
+
+    RETURN NEW;
+END;
+$func$ LANGUAGE PLPGSQL;
+
+COMMIT;
+