Provisional upgrade script for composite attributes
authorMike Rylander <mrylander@gmail.com>
Tue, 21 Jan 2014 17:52:34 +0000 (12:52 -0500)
committerMike Rylander <mrylander@gmail.com>
Wed, 29 Jan 2014 18:24:50 +0000 (13:24 -0500)
Various baseline fixes including syntax repairs, use of
JSON::XS for debian squeez support, correction of plperlu
SPI usage.

Signed-off-by: Bill Erickson <berick@esilibrary.com>
Signed-off-by: Mike Rylander <mrylander@gmail.com>
Open-ILS/src/sql/Pg/002.schema.config.sql
Open-ILS/src/sql/Pg/030.schema.metabib.sql
Open-ILS/src/sql/Pg/upgrade/YYYY.schema.CRA.sql [new file with mode: 0644]

index 1ae319a..1a58f55 100644 (file)
@@ -852,8 +852,8 @@ END;
 $f$ LANGUAGE PLPGSQL;
 
 CREATE TABLE config.composite_attr_entry_definition(
-    coded_value PRIMARY KEY NOT NULL REFERENCES config.coded_value_map (id) ON UPDATE CASCADE ON DELETE CASCADE,
-    definition  TEXT        NOT NULL -- JSON
+    coded_value INT  PRIMARY KEY NOT NULL REFERENCES config.coded_value_map (id) ON UPDATE CASCADE ON DELETE CASCADE,
+    definition  TEXT    NOT NULL -- JSON
 );
 
 -- List applied db patches that are deprecated by (and block the application of) my_db_patch
index c23a3ac..ad9943a 100644 (file)
@@ -296,19 +296,18 @@ CREATE VIEW metabib.full_attr_id_map AS
     SELECT id, attr, value FROM metabib.composite_attr_id_map;
 
 
-CREATE FUNCTION metabib.compile_composite_attr ( cattr_id INT ) RETURNS query_int AS $func$
+CREATE OR REPLACE FUNCTION metabib.compile_composite_attr ( cattr_id INT ) RETURNS query_int AS $func$
 
-    use JSON;
+    use JSON::XS;
     my $cid = shift;
 
     my $cattr = spi_exec_query(
-        "SELECT * FROM config.composite_attr_entry_defintion WHERE id = $cid"
+        "SELECT * FROM config.composite_attr_entry_definition WHERE coded_value = $cid"
     )->{rows}[0];
 
     die("Composite attribute not found with an id of $cid") unless $cattr;
 
-    my $plan = spi_prepare('SELECT * FROM metabib.full_attr_id_map WHERE attr = $1 AND value = $2', qw/TEXT TEXT/);
-    my $def = from_json $cattr->{definition};
+    my $def = decode_json $cattr->{definition};
 
     sub recurse {
         my $d = shift;
@@ -317,16 +316,18 @@ CREATE FUNCTION metabib.compile_composite_attr ( cattr_id INT ) RETURNS query_in
 
         if (ref $d eq 'HASH') { # node or AND
             if (exists $d->{_attr}) { # it is a node
-                return spi_query_prepared(
+                my $plan = spi_prepare('SELECT * FROM metabib.full_attr_id_map WHERE attr = $1 AND value = $2', qw/TEXT TEXT/);
+                return spi_exec_prepared(
                     $plan, {limit => 1}, $d->{_attr}, $d->{_val}
                 )->{rows}[0]{id};
+                spi_freeplan($plan);
             } elsif (exists $d->{_not} && scalar(keys(%$d)) == 1) { # it is a NOT
                 return '!' . recurse($$d{_not});
             } else { # an AND list
                 @list = map { recurse($$d{$_}) } sort keys %$d;
             }
-        } elsif (ref $d eq 'ARRAY')
-            $j = '|'
+        } elsif (ref $d eq 'ARRAY') {
+            $j = '|';
             @list = map { recurse($_) } @$d;
         }
         return '(' . join($j,@list) . ')' if @list;
@@ -335,7 +336,7 @@ CREATE FUNCTION metabib.compile_composite_attr ( cattr_id INT ) RETURNS query_in
 
     return recurse($def);
 
-$func$ STRICT STABLE IMMUTABLE LANGUAGE plperlu;
+$func$ STRICT IMMUTABLE LANGUAGE plperlu;
 
 CREATE TABLE metabib.record_attr_vector_list (
     source  BIGINT  PRIMARY KEY REFERENCES biblio.record_entry (id),
@@ -376,7 +377,7 @@ CREATE VIEW metabib.record_attr_flat AS
             m.attr,
             m.value
       FROM  metabib.full_attr_id_map m
-            JOIN  metabib.record_attr_vector_list v ( m.id = ANY( v.vlist ) )
+            JOIN  metabib.record_attr_vector_list v ON ( m.id = ANY( v.vlist ) );
 
 CREATE VIEW metabib.record_attr AS
     SELECT id, HSTORE( ARRAY_AGG( attr ), ARRAY_AGG( value ) ) AS attrs FROM metabib.record_attr_flat GROUP BY 1;
@@ -1566,7 +1567,7 @@ BEGIN
         FOR ccvm_row IN SELECT * FROM config.coded_value_map c WHERE c.ctype = attr_def.name ORDER BY value LOOP
 
             tmp_val := metabib.compile_composite_attr( ccvm_row.id );
-            NEXT WHEN tmp_val IS NULL OR tmp_val = ''; -- nothing to do
+            CONTINUE WHEN tmp_val IS NULL OR tmp_val = ''; -- nothing to do
 
             IF attr_def.filter THEN
                 IF attr_vector @@ tmp_val::query_int THEN
diff --git a/Open-ILS/src/sql/Pg/upgrade/YYYY.schema.CRA.sql b/Open-ILS/src/sql/Pg/upgrade/YYYY.schema.CRA.sql
new file mode 100644 (file)
index 0000000..cdc3e78
--- /dev/null
@@ -0,0 +1,289 @@
+BEGIN;
+
+ALTER TABLE config.record_attr_definition ADD COLUMN composite BOOL NOT NULL DEFAULT FALSE;
+
+CREATE TABLE config.composite_attr_entry_definition(
+    coded_value INT  PRIMARY KEY NOT NULL REFERENCES config.coded_value_map (id) ON UPDATE CASCADE ON DELETE CASCADE,
+    definition  TEXT NOT NULL -- JSON
+);
+
+CREATE OR REPLACE VIEW metabib.record_attr_id_map AS
+    SELECT id, attr, value FROM metabib.uncontrolled_record_attr_value
+        UNION
+    SELECT  c.id, c.ctype AS attr, c.code AS value
+      FROM  config.coded_value_map c
+            JOIN config.record_attr_definition d ON (d.name = c.ctype AND NOT d.composite);
+
+CREATE VIEW metabib.composite_attr_id_map AS
+    SELECT  c.id, c.ctype AS attr, c.code AS value
+      FROM  config.coded_value_map c
+            JOIN config.record_attr_definition d ON (d.name = c.ctype AND d.composite);
+
+CREATE OR REPLACE VIEW metabib.full_attr_id_map AS
+    SELECT id, attr, value FROM metabib.record_attr_id_map
+        UNION
+    SELECT id, attr, value FROM metabib.composite_attr_id_map;
+
+
+CREATE OR REPLACE FUNCTION metabib.compile_composite_attr ( cattr_id INT ) RETURNS query_int AS $func$
+
+    use JSON::XS;
+    my $cid = shift;
+
+    my $cattr = spi_exec_query(
+        "SELECT * FROM config.composite_attr_entry_definition WHERE coded_value = $cid"
+    )->{rows}[0];
+
+    die("Composite attribute not found with an id of $cid") unless $cattr;
+
+    my $def = decode_json $cattr->{definition};
+
+    sub recurse {
+        my $d = shift;
+        my $j = '&';
+        my @list;
+
+        if (ref $d eq 'HASH') { # node or AND
+            if (exists $d->{_attr}) { # it is a node
+                my $plan = spi_prepare('SELECT * FROM metabib.full_attr_id_map WHERE attr = $1 AND value = $2', qw/TEXT TEXT/);
+                return spi_exec_prepared(
+                    $plan, {limit => 1}, $d->{_attr}, $d->{_val}
+                )->{rows}[0]{id};
+                spi_freeplan($plan);
+            } elsif (exists $d->{_not} && scalar(keys(%$d)) == 1) { # it is a NOT
+                return '!' . recurse($$d{_not});
+            } else { # an AND list
+                @list = map { recurse($$d{$_}) } sort keys %$d;
+            }
+        } elsif (ref $d eq 'ARRAY') {
+            $j = '|';
+            @list = map { recurse($_) } @$d;
+        }
+        return '(' . join($j,@list) . ')' if @list;
+        return '';
+    }
+
+    return recurse($def);
+
+$func$ STRICT IMMUTABLE LANGUAGE plperlu;
+
+CREATE OR REPLACE VIEW metabib.record_attr_flat AS
+    SELECT  v.source AS id,
+            m.attr,
+            m.value
+      FROM  metabib.full_attr_id_map m
+            JOIN  metabib.record_attr_vector_list v ON ( m.id = ANY( v.vlist ) );
+
+CREATE OR REPLACE FUNCTION metabib.reingest_record_attributes (rid BIGINT, pattr_list TEXT[] DEFAULT NULL, prmarc TEXT DEFAULT NULL, rdeleted BOOL DEFAULT TRUE) RETURNS VOID AS $func$
+DECLARE
+    transformed_xml TEXT;
+    rmarc           TEXT := prmarc;
+    tmp_val         TEXT;
+    prev_xfrm       TEXT;
+    normalizer      RECORD;
+    xfrm            config.xml_transform%ROWTYPE;
+    attr_vector     INT[] := '{}'::INT[];
+    attr_vector_tmp INT[];
+    attr_list       TEXT[] := pattr_list;
+    attr_value      TEXT[];
+    norm_attr_value TEXT[];
+    tmp_xml         XML;
+    attr_def        config.record_attr_definition%ROWTYPE;
+    ccvm_row        config.coded_value_map%ROWTYPE;
+BEGIN
+
+    IF attr_list IS NULL OR rdeleted THEN -- need to do the full dance on INSERT or undelete
+        SELECT ARRAY_AGG(name) INTO attr_list FROM config.record_attr_definition;
+    END IF;
+
+    IF rmarc IS NULL THEN
+        SELECT marc INTO rmarc FROM biblio.record_entry WHERE id = rid;
+    END IF;
+
+    FOR attr_def IN SELECT * FROM config.record_attr_definition WHERE NOT composite AND name = ANY( attr_list ) ORDER BY format LOOP
+
+        attr_value := '{}'::TEXT[];
+        norm_attr_value := '{}'::TEXT[];
+        attr_vector_tmp := '{}'::INT[];
+
+        SELECT * INTO ccvm_row FROM config.coded_value_map c WHERE c.ctype = attr_def.name LIMIT 1; 
+
+        -- tag+sf attrs only support SVF
+        IF attr_def.tag IS NOT NULL THEN -- tag (and optional subfield list) selection
+            SELECT  ARRAY[ARRAY_TO_STRING(ARRAY_AGG(value), COALESCE(attr_def.joiner,' '))] INTO attr_value
+              FROM  (SELECT * FROM metabib.full_rec ORDER BY tag, subfield) AS x
+              WHERE record = rid
+                    AND tag LIKE attr_def.tag
+                    AND CASE
+                        WHEN attr_def.sf_list IS NOT NULL 
+                            THEN POSITION(subfield IN attr_def.sf_list) > 0
+                        ELSE TRUE
+                    END
+              GROUP BY tag
+              ORDER BY tag
+              LIMIT 1;
+
+        ELSIF attr_def.fixed_field IS NOT NULL THEN -- a named fixed field, see config.marc21_ff_pos_map.fixed_field
+            attr_value := vandelay.marc21_extract_fixed_field_list(rmarc, attr_def.fixed_field);
+
+            IF NOT attr_def.multi THEN
+                attr_value := ARRAY[attr_value[1]];
+            END IF;
+
+        ELSIF attr_def.xpath IS NOT NULL THEN -- and xpath expression
+
+            SELECT INTO xfrm * FROM config.xml_transform WHERE name = attr_def.format;
+        
+            -- See if we can skip the XSLT ... it's expensive
+            IF prev_xfrm IS NULL OR prev_xfrm <> xfrm.name THEN
+                -- Can't skip the transform
+                IF xfrm.xslt <> '---' THEN
+                    transformed_xml := oils_xslt_process(rmarc,xfrm.xslt);
+                ELSE
+                    transformed_xml := rmarc;
+                END IF;
+    
+                prev_xfrm := xfrm.name;
+            END IF;
+
+            IF xfrm.name IS NULL THEN
+                -- just grab the marcxml (empty) transform
+                SELECT INTO xfrm * FROM config.xml_transform WHERE xslt = '---' LIMIT 1;
+                prev_xfrm := xfrm.name;
+            END IF;
+
+            FOR tmp_xml IN SELECT XPATH(attr_def.xpath, transformed_xml, ARRAY[ARRAY[xfrm.prefix, xfrm.namespace_uri]]) LOOP
+                tmp_val := oils_xpath_string(
+                                '//*',
+                                tmp_xml::TEXT,
+                                COALESCE(attr_def.joiner,' '),
+                                ARRAY[ARRAY[xfrm.prefix, xfrm.namespace_uri]]
+                            );
+                IF tmp_val IS NOT NULL AND BTRIM(tmp_val) <> '' THEN
+                    attr_value := attr_value || tmp_val;
+                    EXIT WHEN NOT attr_def.multi;
+                END IF;
+            END LOOP;
+
+        ELSIF attr_def.phys_char_sf IS NOT NULL THEN -- a named Physical Characteristic, see config.marc21_physical_characteristic_*_map
+            SELECT  ARRAY_AGG(m.value) INTO attr_value
+              FROM  vandelay.marc21_physical_characteristics(rmarc) v
+                    LEFT JOIN config.marc21_physical_characteristic_value_map m ON (m.id = v.value)
+              WHERE v.subfield = attr_def.phys_char_sf AND (m.value IS NOT NULL AND BTRIM(m.value) <> '')
+                    AND ( ccvm_row.id IS NULL OR ( ccvm_row.id IS NOT NULL AND v.id IS NOT NULL) );
+
+            IF NOT attr_def.multi THEN
+                attr_value := ARRAY[attr_value[1]];
+            END IF;
+
+        END IF;
+
+                -- apply index normalizers to attr_value
+        FOR tmp_val IN SELECT value FROM UNNEST(attr_value) x(value) LOOP
+            FOR normalizer IN
+                SELECT  n.func AS func,
+                        n.param_count AS param_count,
+                        m.params AS params
+                  FROM  config.index_normalizer n
+                        JOIN config.record_attr_index_norm_map m ON (m.norm = n.id)
+                  WHERE attr = attr_def.name
+                  ORDER BY m.pos LOOP
+                    EXECUTE 'SELECT ' || normalizer.func || '(' ||
+                    COALESCE( quote_literal( tmp_val ), 'NULL' ) ||
+                        CASE
+                            WHEN normalizer.param_count > 0
+                                THEN ',' || REPLACE(REPLACE(BTRIM(normalizer.params,'[]'),E'\'',E'\\\''),E'"',E'\'')
+                                ELSE ''
+                            END ||
+                    ')' INTO tmp_val;
+
+            END LOOP;
+            IF tmp_val IS NOT NULL AND BTRIM(tmp_val) <> '' THEN
+                norm_attr_value := norm_attr_value || tmp_val;
+            END IF;
+        END LOOP;
+        
+        IF attr_def.filter THEN
+            -- Create unknown uncontrolled values and find the IDs of the values
+            IF ccvm_row.id IS NULL THEN
+                FOR tmp_val IN SELECT value FROM UNNEST(norm_attr_value) x(value) LOOP
+                    IF tmp_val IS NOT NULL AND BTRIM(tmp_val) <> '' THEN
+                        BEGIN -- use subtransaction to isolate unique constraint violations
+                            INSERT INTO metabib.uncontrolled_record_attr_value ( attr, value ) VALUES ( attr_def.name, tmp_val );
+                        EXCEPTION WHEN unique_violation THEN END;
+                    END IF;
+                END LOOP;
+
+                SELECT ARRAY_AGG(id) INTO attr_vector_tmp FROM metabib.uncontrolled_record_attr_value WHERE attr = attr_def.name AND value = ANY( norm_attr_value );
+            ELSE
+                SELECT ARRAY_AGG(id) INTO attr_vector_tmp FROM config.coded_value_map WHERE ctype = attr_def.name AND code = ANY( norm_attr_value );
+            END IF;
+
+            -- Add the new value to the vector
+            attr_vector := attr_vector || attr_vector_tmp;
+        END IF;
+
+        IF attr_def.sorter THEN
+            DELETE FROM metabib.record_sorter WHERE source = rid AND attr = attr_def.name;
+            INSERT INTO metabib.record_sorter (source, attr, value) VALUES (rid, attr_def.name, norm_attr_value[1]);
+        END IF;
+
+    END LOOP;
+
+/* We may need to rewrite the vlist to contain
+   the intersection of new values for requested
+   attrs and old values for ignored attrs. To
+   do this, we take the old attr vlist and
+   subtract any values that are valid for the
+   requested attrs, and then add back the new
+   set of attr values. */
+
+        IF ARRAY_LENGTH(pattr_list, 1) > 0 THEN 
+            SELECT vlist INTO attr_vector_tmp FROM metabib.record_attr_vector_list WHERE source = rid;
+            SELECT attr_vector_tmp - ARRAY_AGG(id) INTO attr_vector_tmp FROM metabib.full_attr_id_map WHERE attr = ANY (pattr_list);
+            attr_vector := attr_vector || attr_vector_tmp;
+        END IF;
+
+    -- On to composite attributes, now that the record attrs have been pulled.  Processed in name order, so later composite
+    -- attributes can depend on earlier ones.
+    FOR attr_def IN SELECT * FROM config.record_attr_definition WHERE composite AND name = ANY( attr_list ) ORDER BY name LOOP
+
+        FOR ccvm_row IN SELECT * FROM config.coded_value_map c WHERE c.ctype = attr_def.name ORDER BY value LOOP
+
+            tmp_val := metabib.compile_composite_attr( ccvm_row.id );
+            CONTINUE WHEN tmp_val IS NULL OR tmp_val = ''; -- nothing to do
+
+            IF attr_def.filter THEN
+                IF attr_vector @@ tmp_val::query_int THEN
+                    attr_vector = attr_vector + intset(ccvm_row.id);
+                    EXIT WHEN NOT attr_def.multi;
+                END IF;
+            END IF;
+
+            IF attr_def.sorter THEN
+                IF attr_vector ~~ tmp_val THEN
+                    DELETE FROM metabib.record_sorter WHERE source = rid AND attr = attr_def.name;
+                    INSERT INTO metabib.record_sorter (source, attr, value) VALUES (rid, attr_def.name, ccvm_row.code);
+                END IF;
+            END IF;
+
+        END LOOP;
+
+    END LOOP;
+
+    IF ARRAY_LENGTH(attr_vector, 1) > 0 THEN
+        IF rdeleted THEN -- initial insert OR revivication
+            DELETE FROM metabib.record_attr_vector_list WHERE source = rid;
+            INSERT INTO metabib.record_attr_vector_list (source, vlist) VALUES (rid, attr_vector);
+        ELSE
+            UPDATE metabib.record_attr_vector_list SET vlist = attr_vector WHERE source = rid;
+        END IF;
+    END IF;
+
+END;
+
+$func$ LANGUAGE PLPGSQL;
+
+COMMIT;
+