Add Marque.pm.in in support-scripts.
authorJason Stephenson <jstephenson@mvlc.org>
Sat, 21 Sep 2013 18:13:47 +0000 (14:13 -0400)
committerThomas Berezansky <tsbere@mvlc.org>
Tue, 1 Oct 2013 16:56:49 +0000 (12:56 -0400)
This commit creates Marque.pm.in in the support-scripts  src directory.
Marque.pm.in will be transformed into Marque.pm at build time.  Marque.pm
replaces the current marc_export scipt with a faster, DBI based alternative.
marc_export will become a symlink to Marque.pm in the ${prefix}/bin dir.

This commit also adds an upgrade script to create indexes on create_date and
edit_date in authority.record_entry.  This makes performance of the new
--since option acceptable when used on authorities.

At the moment, most features of the original marc_export are still supported.
Only mfhd export and the progress report have been omitted at this time.  A
future commit will add support for mfhd.  I am not a fan of progress output
from command line applications, though I may add that feature back in with
a switch to turn it off.

See the release notes (to be written in an upcoming commit) for design notes,
and performance information.

This is a squashed, rebased commit of previous work.

Signed-off-by: Jason Stephenson <jstephenson@mvlc.org>
Open-ILS/src/sql/Pg/011.schema.authority.sql
Open-ILS/src/sql/Pg/upgrade/XXXX.index.authority_record_entry.sql [new file with mode: 0644]
Open-ILS/src/support-scripts/Marque.pm.in [new file with mode: 0644]

index 17fe07d..0882050 100644 (file)
@@ -135,6 +135,8 @@ CREATE TABLE authority.record_entry (
 );
 CREATE INDEX authority_record_entry_creator_idx ON authority.record_entry ( creator );
 CREATE INDEX authority_record_entry_editor_idx ON authority.record_entry ( editor );
+CREATE INDEX authority_record_entry_create_date_idx ON authority.record_entry ( create_date );
+CREATE INDEX authority_record_entry_edit_date_idx ON authority.record_entry ( edit_date );
 CREATE INDEX authority_record_deleted_idx ON authority.record_entry(deleted) WHERE deleted IS FALSE OR deleted = false;
 CREATE TRIGGER a_marcxml_is_well_formed BEFORE INSERT OR UPDATE ON authority.record_entry FOR EACH ROW EXECUTE PROCEDURE biblio.check_marcxml_well_formed();
 CREATE TRIGGER b_maintain_901 BEFORE INSERT OR UPDATE ON authority.record_entry FOR EACH ROW EXECUTE PROCEDURE evergreen.maintain_901();
diff --git a/Open-ILS/src/sql/Pg/upgrade/XXXX.index.authority_record_entry.sql b/Open-ILS/src/sql/Pg/upgrade/XXXX.index.authority_record_entry.sql
new file mode 100644 (file)
index 0000000..1c8e03c
--- /dev/null
@@ -0,0 +1,2 @@
+CREATE INDEX authority_record_entry_create_date_idx ON authority.record_entry ( create_date );
+CREATE INDEX authority_record_entry_edit_date_idx ON authority.record_entry ( edit_date );
diff --git a/Open-ILS/src/support-scripts/Marque.pm.in b/Open-ILS/src/support-scripts/Marque.pm.in
new file mode 100644 (file)
index 0000000..7d187f3
--- /dev/null
@@ -0,0 +1,751 @@
+#!/usr/bin/perl
+# ---------------------------------------------------------------
+# Copyright © 2013 Merrimack Valley Library Consortium
+# Jason Stephenson <jstephenson@mvlc.org>
+#
+# This program is part of Evergreen; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License as
+# published by the Free Software Foundation; either version 2 of the
+# License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+# ---------------------------------------------------------------
+use strict;
+use warnings;
+use OpenILS::Utils::Fieldmapper;
+use OpenILS::Application::AppUtils;
+use OpenSRF::Utils::JSON;
+use MARC::Field;
+use MARC::Record;
+use MARC::File::XML (BinaryEncoding => 'UTF-8');
+use Date::Manip::Date;
+my $U = 'OpenILS::Application::AppUtils';
+
+binmode(STDERR, ':utf8');
+
+package Marque;
+
+our $config = Marque::Config->new();
+Fieldmapper->import(IDL => $config->option_value('xml-idl'));
+
+# Look for passed in ids:
+my @ids = ();
+if ($config->need_ids()) {
+    print STDERR "Waiting for input\n";
+    while (my $i = <>) {
+        push @ids, $i if ($i =~ /^\s*[0-9]+\s*$/);
+    }
+}
+
+my $exporter;
+if ($config->option_value('type') eq 'authority') {
+    $exporter = Marque::Authority->new(\@ids);
+} else {
+    $exporter = Marque::Biblio->new(\@ids);
+}
+
+Marque::Output::output($exporter);
+
+# ------------------------------------------------------------------
+package Marque::Config;
+
+use Getopt::Long;
+use List::MoreUtils qw(none);
+use OpenSRF::System;
+use OpenSRF::Utils::SettingsClient;
+
+use constant FORMATS => qw(USMARC UNIMARC XML BRE ARE);
+use constant STORES => qw(reporter cstore storage);
+use constant TYPES => qw(authority biblio);
+
+
+sub new {
+    my $class = shift;
+
+    my $self = {};
+
+    # For command line options.
+    my %opts;
+
+    # set some default values
+    $opts{'format'} = 'USMARC';
+    $opts{'encoding'} = 'MARC8';
+    $opts{'type'} = 'biblio';
+    $opts{'money'} = '$';
+    $opts{'timeout'} = 0;
+    $opts{'config'} = '@sysconfdir@/opensrf_core.xml';
+    $opts{'store'} = 'reporter';
+
+    GetOptions(\%opts,
+               'help',
+               'items',
+               'mfhd',
+               'all',
+               'replace_001',
+               'location=s',
+               'money=s',
+               'config=s',
+               'format=s',
+               'type=s',
+               'xml-idl=s',
+               'encoding=s',
+               'timeout=i',
+               'library=s@',
+               'since=s',
+               'store=s',
+               'debug');
+
+    if ($opts{help}) {
+        print <<"HELP";
+This script exports MARC authority, bibliographic, and serial holdings
+records from an Evergreen database. 
+
+Input to this script can consist of a list of record IDs, with one record ID
+per line, corresponding to the record ID in the Evergreen database table of
+your requested record type.
+
+Alternately, passing the --all option will attempt to export all records of
+the specified type from the Evergreen database. The --all option starts at
+record ID 1 and increments the ID by 1 until the largest ID in the database
+is retrieved. This may not be very efficient for databases with large gaps
+in their ID sequences.
+
+Usage: $0 [options]
+ --help or -h       This screen.
+ --config or -c     Configuration file [@sysconfdir@/opensrf_core.xml]
+ --format or -f     Output format (USMARC, UNIMARC, XML, BRE, ARE) [USMARC]
+ --encoding or -e   Output encoding (UTF-8, ISO-8859-?, MARC8) [MARC8]
+ --xml-idl or -x    Location of the IDL XML
+ --timeout          Remains for backward compatibility. No loner used.
+ --type or -t       Record type (BIBLIO, AUTHORITY) [BIBLIO]
+ --all or -a        Export all records; ignores input list
+ --replace_001      Replace the 001 field value with the record ID
+ --store            Use the given storage backend to connect to the database.
+                    Choices are (reporter, cstore, storage) [reporter]
+ --since            Export records modified since a certain date and time.
+
+ Additional options for type = 'BIBLIO':
+ --items or -i      Include items (holdings) in the output
+ --money            Currency symbol to use in item price field [\$]
+ --mfhd             Export serial MFHD records for associated bib records
+                    Not compatible with --format=BRE
+ --location or -l   MARC Location Code for holdings from
+                    http://www.loc.gov/marc/organizations/orgshome.html
+ --library          Export the bibliographic records that have attached
+                    holdings for the listed library or libraries as
+                    identified by shortname
+
+Examples:
+
+To export a set of USMARC records in a file named "output_file" based on the
+IDs contained in a file named "list_of_ids":
+  cat list_of_ids | $0 > output_file
+
+To export a set of MARC21XML authority records in a file named "output.xml"
+for all authority records in the database:
+  $0 --format XML --type AUTHORITY --all > output.xml
+
+To export a set of USMARC bibliographic records encoded in UTF-8 in a file
+named "sys1_bibs.mrc" based on records which have attached callnumbers for the
+libraries with the short names "BR1" and "BR2":
+
+  $0 --library BR1 --library BR2 --encoding UTF-8 > sys1_bibs.mrc
+
+HELP
+        exit;
+    }
+
+    OpenSRF::System->bootstrap_client( config_file => $opts{config} );
+    my $sclient = OpenSRF::Utils::SettingsClient->new();
+    unless ($opts{'xml-idl'}) {
+        $opts{'xml-idl'} = $sclient->config_value('IDL');
+    }
+
+    # Validate some of the settings.
+    if ($opts{all} && $opts{library}) {
+        die('Incompatible arguments: you cannot combine a request for all ' .
+                'records with a request for records by library');
+    }
+    if ($opts{all} && $opts{since}) {
+        die('Incompatible arguments: you cannot combine a request for all ' .
+                'records with a request for records added or changed since a certain date');
+    }
+    $opts{type} = lc($opts{type});
+    if (none {$_ eq $opts{type}} (TYPES)) {
+        die "Please select a supported type.  ".
+            "Right now that means one of [".
+                join('|',(FORMATS)). "]\n";
+    }
+    $opts{format} = uc($opts{format});
+    if (none {$_ eq $opts{format}} (FORMATS)) {
+        die "Please select a supported format.  ".
+            "Right now that means one of [".
+                join('|',(FORMATS)). "]\n";
+    }
+
+    if ($opts{format} eq 'ARE' && $opts{type} ne 'authority') {
+        die "Format ARE is not compatible with type " . $opts{type};
+    }
+    if ($opts{format} eq 'BRE' && $opts{type} ne 'biblio') {
+        die "Format BRE is not compatible with type " . $opts{type};
+    }
+    if ($opts{format} eq 'BRE' && $opts{items}) {
+        die "Format BRE is not compatible with exporting holdings."
+    }
+
+    if ($opts{mfhd}) {
+        # Remove the next line when I actually figure out how to work
+        # this into the biblio extractor.
+        die "MFHD export not presently supported.";
+        if ($opts{type} ne 'biblio') {
+            die "MFHD export only works with bibliographic records.";
+        } elsif ($opts{format} eq 'BRE') {
+            die "MFHD export incompatible with format BRE.";
+        } elsif (!$opts{all}) {
+            die "MFHD export only work when exporting all records.";
+        }
+    }
+
+    $opts{store} = lc($opts{store});
+    if (none {$_ eq $opts{store}} (STORES)) {
+        die "Please select a supported store.  ".
+            "Right now that means one of [".
+                join('|',(STORES)). "]\n";
+    } else {
+        my $app;
+        if ($opts{store} eq 'reporter') {
+            $app = 'open-ils.reporter-store';
+        } else {
+            $app = 'open-ils.' . $opts{store};
+        }
+        if ($app eq 'open-ils.storage') {
+            $self->{dbsettings} = $sclient->config_value(
+                apps => $app => app_settings => databases => 'database');
+        } else {
+            $self->{dbsettings} = $sclient->config_value(
+                apps => $app => app_settings => 'database');
+        }
+    }
+    $opts{encoding} = uc($opts{encoding});
+
+    $self->{'options'} = \%opts;
+    bless $self, $class;
+    return $self;
+}
+
+sub option_value {
+    my ($self, $option) = @_;
+    return $self->{options}->{$option};
+}
+
+sub database_settings {
+    my $self = shift;
+    return $self->{dbsettings};
+}
+
+sub need_ids {
+    my $self = shift;
+    my $rv = 1;
+
+    $rv = 0 if ($self->{options}->{all});
+    $rv = 0 if ($self->{options}->{since});
+    $rv = 0 if ($self->{options}->{library});
+
+    return $rv;
+}
+
+# ------------------------------------------------------------------
+# This package exists to get a connection to the database.  Since
+# we'll need one for both biblio records and authorities, we've made a
+# single subpackage with a function so that we don't have to duplicate
+# code.
+package Marque::Connector;
+
+use DBI;
+
+# Pass a Marque::Config object's database_settings return value into
+# this to get a DBI connection.
+# ex:
+# my $db = Marque::Connector::connect($config->database_settings);
+sub connect {
+    my $args = shift;
+
+    # Build a connect string from the args.
+    my $connect_string = 'DBI:Pg:';
+    $connect_string .= 'dbname=' . $args->{db} . ';';
+    $connect_string .= 'host=' . $args->{host} . ';';
+    $connect_string .= 'port=' . $args->{port};
+
+    my $db_handle = DBI->connect($connect_string,
+                                 $args->{user}, $args->{pw});
+    return $db_handle;
+}
+
+# A function to get the date into a format better for PostgreSQL.
+sub db_date {
+    my $input = shift;
+    my $date;
+    if (ref($input) eq 'Date::Manip::Date') {
+        $date = $input;
+    } else {
+        $date = Date::Manip::Date->new();
+        if ($date->parse($input)) {
+            die "Can't parse date $input";
+        }
+    }
+    return $date->printf("%Y-%m-%dT%H:%M:%S%z");
+ }
+
+# ------------------------------------------------------------------
+# You would typically have the next two packages inherit from a common
+# superclass, but ineritance doesn't seem to work when all packages
+# are in single file, so we have some duplicated code between these
+# two.
+
+# Get bibliographic records from the database.
+package Marque::Biblio;
+
+sub new {
+    my $class = shift;
+    my $idlist = shift;
+    my $self = {idlist => $idlist};
+    $self->{handle} = Marque::Connector::connect(
+        $Marque::config->database_settings);
+    $self->{since_date} = Date::Manip::Date->new;
+    $self->{since_date}->parse($Marque::config->option_value('since'));
+
+    # We need multiple fieldmapper classes depending on our
+    # options. We'll just get the information that we'll need for them
+    # all right here instead of only fetching the information when
+    # needed.
+    $self->{breClass} = Fieldmapper::class_for_hint('bre');
+    $self->{acnClass} = Fieldmapper::class_for_hint('acn');
+    $self->{acpClass} = Fieldmapper::class_for_hint('acp');
+    $self->{sreClass} = Fieldmapper::class_for_hint('sre');
+
+    bless $self, $class;
+    return $self;
+}
+
+sub build_query {
+    my $self = shift;
+
+    # Get the field names and tables for our classes. We add the fully
+    # qualified table names to the fields so that the joins will work.
+    my $breTable = $self->{breClass}->Table();
+    my @breFields = map {$breTable . '.' . $_} $self->{breClass}->real_fields();
+    my $acnTable = $self->{acnClass}->Table();
+    my $acpTable = $self->{acpClass}->Table();
+
+    # Now we build the query in pieces:
+
+    # We always select the bre fields:
+    my $select = 'select ' . join(',', @breFields);
+    # We always use the bre table.
+    my $from = "from $breTable";
+
+    # If have the libraries or items options, we need to join the
+    # asset.call_number table. If we have both, this variable checks
+    # that it has already been joined so we don't create an invalid
+    # query.
+    my $acn_joined = 0;
+    # Join to the acn table as needed for the library option.
+    if ($Marque::config->option_value('library')) {
+        $acn_joined = 1;
+        $from .= <<ACN_JOIN;
+
+join $acnTable on $acnTable.record = $breTable.id
+and $acnTable.deleted = 'f'
+join (select id from actor.org_unit where shortname in (
+ACN_JOIN
+
+        $from .= join(',',
+                      map {"'$_'"} @{$Marque::config->option_value('library')});
+        $from .= ") aou on $acnTable.owning_lib = aou.id";
+    }
+
+    if ($Marque::config->option_value('items')) {
+        unless ($acn_joined) {
+            $from .= "\njoin $acnTable on $acnTable.record = $breTable.id";
+            $from .= "\nand $acnTable.deleted = 'f'";
+        }
+        $from .= "\njoin $acpTable on $acpTable.call_number = $acnTable.id";
+        $from .= "\nand $acpTable.deleted = 'f'";
+    }
+
+    # The where really depends on a few options:
+    my $where = 'where ';
+    # We fill in the where as necessary.
+    if ($self->{idlist} && @{$self->{idlist}}) {
+        $where .= "$breTable.id in (" . join(',', @{$self->{idlist}}) . ')';
+    } elsif ($Marque::config->option_value('since')) {
+        my $since_str = Marque::Connector::db_date($self->{since_date});
+        $where .= "$breTable.edit_date > '$since_str'";
+        $where .= " or $breTable.create_date > '$since_str'";
+    } else {
+        # We want all non-deleted records.
+        $where .= "$breTable.deleted = 'f'";
+    }
+
+    $self->{query} = $select . "\n" . $from . "\n" . $where;
+}
+
+sub execute_query {
+    my $self = shift;
+    $self->build_query() unless ($self->{query});
+    $self->{sth} = $self->{handle}->prepare($self->{query});
+    return $self->{sth}->execute;
+}
+
+sub next {
+    my $self = shift;
+    my $output;
+
+    my @data = $self->{sth}->fetchrow_array;
+    if (@data) {
+        my $bre = $self->{breClass}->new(\@data);
+        if ($Marque::config->option_value('format') eq 'BRE') {
+            $output = OpenSRF::Utils::JSON->perl2JSON($bre);
+        } else {
+            my $r;
+            eval {
+                $r = MARC::Record->new_from_xml($bre->marc(),
+                                                $Marque::config->option_value('encoding'),
+                                                $Marque::config->option_value('format'));
+            };
+            if ($@) {
+                print STDERR "Error in authority record " . $bre->id() . "\n";
+                print STDERR "$@\n";
+                import MARC::File::XML; # Reset SAX Parser.
+                return $self->next();
+            }
+            if ($Marque::config->option_value('replace_001')) {
+                my $tcn = $r->field('001');
+                if ($tcn) {
+                    $tcn->update($bre->id());
+                } else {
+                    $tcn = MARC::Field->new('001', $bre->id());
+                    $r->insert_fields_ordered($tcn);
+                }
+            }
+            if ($Marque::config->option_value('since')) {
+                my $leader = $r->leader();
+                if ($U->is_true($bre->deleted())) {
+                    $leader = substr($leader, 5, 1, 'd');
+                    $r->leader($leader);
+                } else {
+                    my $create_date = Date::Manip::Date->new;
+                    $create_date->parse($bre->create_date());
+                    my $edit_date = Date::Manip::Date->new;
+                    $edit_date->parse($bre->edit_date());
+                    if ($self->{since_date}->cmp($create_date) < 0) {
+                        $leader = substr($leader, 5, 1, 'n');
+                        $r->leader($leader);
+                    } elsif ($self->{since_date}->cmp($edit_date) < 0) {
+                        $leader = substr($leader, 5, 1, 'c');
+                        $r->leader($leader);
+                    }
+                }
+            }
+            if ($Marque::config->option_value('items')) {
+                my @acps = $self->acps_for_bre($bre);
+                foreach my $acp (@acps) {
+                    my $location = $Marque::config->option_value('location');
+                    my $price = ($acp->price() ? $Marque::config->option_value('money').$acp->price() : '');
+                    $r->insert_grouped_field(
+                        MARC::Field->new(
+                            '852', '4', ' ',
+                            ($location ? ('a' => $location) : ()),
+                            b => $acp->call_number()->owning_lib()->shortname(),
+                            b => $acp->circ_lib()->shortname(),
+                            c => $acp->copy_location()->name(),
+                            j => $acp->call_number()->label(),
+                            ($acp->circ_modifier() ? (g => $acp->circ_modifier()) : ()),
+                            p => $acp->barcode(),
+                            ($price ? (y => $price) : ()),
+                            ($acp->copy_number() ? (t => $acp->copy_number()) : ()),
+                            ($U->is_true($acp->ref()) ? (x => 'reference') : ()),
+                            (!$U->is_true($acp->holdable()) ? (x => 'unholdable') : ()),
+                            (!$U->is_true($acp->circulate()) ? (x => 'noncirculating') : ()),
+                            (!$U->is_true($acp->opac_visible()) ? (x => 'hidden') : ())
+                        )
+                    );
+                }
+            }
+            if ($Marque::config->option_value('format') eq 'XML') {
+                $output = $r->as_xml_record;
+                $output =~ s/^<\?.+?\?>$//mo;
+            } else {
+                $output = $r->as_usmarc;
+            }
+        }
+    }
+
+    return $output;
+}
+
+# Returns a list of aou objects in an array.
+sub orgs {
+    my $self = shift;
+    unless ($self->{orgs} && @{$self->{orgs}}) {
+        my $fmClass = Fieldmapper::class_for_hint('aou');
+        my @classFields = $fmClass->real_fields();
+        my $classTable = $fmClass->Table();
+        my $query = 'select ' . join(',', @classFields);
+        $query .= "\nfrom $classTable";
+        my $sth = $self->{handle}->prepare($query);
+        if ($sth->execute()) {
+            my $result = $sth->fetchall_arrayref();
+            my @orgs = map {$fmClass->new($_)} @{$result};
+            $self->{orgs} = \@orgs;
+        } else {
+            $self->{orgs} = [];
+        }
+    }
+    return @{$self->{orgs}};
+}
+
+# Returns an array of acpl objects.
+sub shelves {
+    my $self = shift;
+
+    unless ($self->{shelves} && @{$self->{shelves}}) {
+        my $fmClass = Fieldmapper::class_for_hint('acpl');
+        my @classFields = $fmClass->real_fields();
+        my $classTable = $fmClass->Table();
+        my $query = 'select ' . join(',', @classFields);
+        $query .= "\nfrom $classTable";
+        my $result = $self->{handle}->selectall_arrayref($query);
+        my @shelves = map {$fmClass->new($_)} @{$result};
+        $self->{shelves} = \@shelves;
+    }
+
+    return @{$self->{shelves}};
+}
+
+# Returns an array of acn objects for a given bre object or id.
+sub acns_for_bre {
+    my $self = shift;
+    my $bre = shift;
+    $bre = $bre->id() if (ref($bre));
+
+    unless ($self->{acnHandle}) {
+        my $query = "select " . join(',', $self->{acnClass}->real_fields());
+        $query .= "\nfrom " . $self->{acnClass}->Table();
+        $query .= "\nwhere record = ? and deleted = 'f'";
+        if ($Marque::config->option_value('library')) {
+            $query .= "\nand owning_lib in (";
+            $query .= "select id from actor.org_unit where shortname in (";
+            $query .= join(',', map {"'$_'"} @{$Marque::config->option_value('library')});
+            $query .= "))";
+        }
+        $self->{acnHandle} = $self->{handle}->prepare($query);
+    }
+
+    if ($self->{acnHandle}->execute($bre)) {
+        my $result = $self->{acnHandle}->fetchall_arrayref();
+        return map {$self->{acnClass}->new($_)} @{$result};
+    }
+
+    # If for some reason, we don't find anything.
+    return undef;
+}
+
+# Returns an array of acp objects for a given bre object or id.
+sub acps_for_bre {
+    my $self = shift;
+    my $bre = shift;
+    $bre = $bre->id() if (ref($bre));
+
+    my @orgs = $self->orgs();
+    my @locations = $self->shelves();
+
+    my @acns = $self->acns_for_bre($bre);
+    if (@acns) {
+        my $query = 'select ' . join(',', $self->{acpClass}->real_fields());
+        $query .= "\nfrom " . $self->{acpClass}->Table();
+        $query .= "\nwhere deleted = 'f' and call_number in (";
+        $query .= join(',', map {$_->id()} @acns);
+        $query .= ")";
+        my $result = $self->{handle}->selectall_arrayref($query);
+        if ($result && @{$result}) {
+            my @acps = map {$self->{acpClass}->new($_)} @{$result};
+            return map {
+                my $cn = $_->call_number();
+                my $clib = $_->circ_lib();
+                my $loc = $_->copy_loaction();
+                my ($org) = grep {$_->id() == $clib} @orgs;
+                my ($acn) = grep {$_->id() == $cn} @acns;
+                my ($location) = grep {$_->id() == $loc} @locations;
+                my $olib = $acn->owning_lib();
+                my ($owner) = grep {$_->id() == $olib} @orgs;
+                $acn->owning_lib($owner);
+                $_->call_number($acn);
+                $_->circ_lib($org);
+                $_->copy_location($location);
+            } @acps;
+        }
+    }
+
+    # If for some reason, we don't find anything.
+    return undef;
+}
+
+# Get authority records from the database.
+package Marque::Authority;
+
+sub new {
+    my $class = shift;
+    my $idlist = shift;
+    my $self = {idlist => $idlist};
+    $self->{handle} = Marque::Connector::connect(
+        $Marque::config->database_settings);
+    $self->{fmClass} = Fieldmapper::class_for_hint('are');
+    $self->{since_date} = Date::Manip::Date->new;
+    $self->{since_date}->parse($Marque::config->option_value('since'));
+    bless $self, $class;
+    return $self;
+}
+
+sub build_query {
+    my $self = shift;
+
+    # Get the information for are object from the Fieldmapper:
+        my @fields  = $self->{fmClass}->real_fields();
+    my $table = $self->{fmClass}->Table();
+
+    # Build the actual query.
+    my $select = "select " . join(',', @fields);
+    my $from = "from $table";
+    my $where = 'where ';
+
+    # If we have an idlist, we pretty much ignore anything else.
+    if ($self->{idlist} && @{$self->{idlist}}) {
+        $where .= 'id in (' . join(',', @{$self->{idlist}}) . ')';
+    } elsif ($Marque::config->option_value('since')) {
+        my $since_str = Marque::Connector::db_date($self->{since_date});
+        $where .= "edit_date > '$since_str'";
+        $where .= " or create_date > '$since_str'";
+    } else {
+        # We want all non-deleted records.
+        $where .= "deleted = 'f'";
+    }
+
+    $self->{query} = $select . "\n" . $from . "\n" . $where;
+}
+
+sub execute_query {
+    my $self = shift;
+    $self->build_query() unless ($self->{query});
+    $self->{sth} = $self->{handle}->prepare($self->{query});
+    return $self->{sth}->execute;
+}
+
+sub next {
+    my $self = shift;
+    my $output;
+    my @data = $self->{sth}->fetchrow_array;
+
+    if (@data) {
+        my $format = $Marque::config->option_value('format');
+        my $are = $self->{fmClass}->new(\@data);
+        if ($format eq 'ARE') {
+            $output = OpenSRF::Utils::JSON->perl2JSON($are);
+        } else {
+            my $r;
+            eval {
+                $r = MARC::Record->new_from_xml($are->marc(),
+                                                $Marque::config->option_value('encoding'),
+                                                $Marque::config->option_value('format'));
+            };
+            if ($@) {
+                print STDERR "Error in authority record " . $are->id() . "\n";
+                print STDERR "$@\n";
+                import MARC::File::XML; # Reset SAX Parser.
+                return $self->next();
+            }
+            if ($Marque::config->option_value('replace_001')) {
+                my $tcn = $r->field('001');
+                if ($tcn) {
+                    $tcn->update($are->id());
+                } else {
+                    $tcn = MARC::Field->new('001', $are->id());
+                    $r->insert_fields_ordered($tcn);
+                }
+            }
+            if ($Marque::config->option_value('since')) {
+                my $leader = $r->leader();
+                if ($U->is_true($are->deleted())) {
+                    $leader = substr($leader, 5, 1, 'd');
+                    $r->leader($leader);
+                } else {
+                    my $create_date = Date::Manip::Date->new;
+                    $create_date->parse($are->create_date());
+                    my $edit_date = Date::Manip::Date->new;
+                    $edit_date->parse($are->edit_date());
+                    if ($self->{since_date}->cmp($create_date) < 0) {
+                        $leader = substr($leader, 5, 1, 'n');
+                        $r->leader($leader);
+                    } elsif ($self->{since_date}->cmp($edit_date) < 0) {
+                        $leader = substr($leader, 5, 1, 'c');
+                        $r->leader($leader);
+                    }
+                }
+            }
+            if ($Marque::config->option_value('format') eq 'XML') {
+                $output = $r->as_xml_record;
+                $output =~ s/^<\?.+?\?>$//mo;
+            } else {
+                $output = $r->as_usmarc;
+            }
+        }
+    }
+
+    return $output;
+}
+
+# ------------------------------------------------------------------
+# Since the ultimate output is largely independent of the type of the
+# records, we use a single subpackage to group our output routines.
+package Marque::Output;
+
+sub output {
+    my $extractor = shift;
+    if ($extractor->execute_query) {
+        if ($Marque::config->option_value('encoding') eq 'UTF-8') {
+            binmode(STDOUT, ':utf8');
+        } else {
+            binmode(STDOUT, ':raw');
+        }
+
+        &preamble;
+        while (my $output = $extractor->next()) {
+            print $output;
+        }
+        &postamble;
+    } else {
+        print STDERR $extractor->{query} if ($Marque::config->option_value('debug'));
+        die "Database query failed!";
+    }
+}
+
+sub preamble {
+    if ($Marque::config->option_value('format') eq 'XML') {
+        my $encoding = $Marque::config->option_value('encoding');
+        print <<PREAMBLE;
+<?xml version="1.0" encoding="$encoding"?>
+<collection xmlns='http://www.loc.gov/MARC21/slim'>
+PREAMBLE
+    }
+}
+
+sub postamble {
+    if ($Marque::config->option_value('format') eq 'XML') {
+        print "</collection>\n";
+    }
+}
+
+1;