This patch replaces the regex-based XML stream parsing mechanism with an XML::Parser...
authorerickson <erickson@9efc2488-bf62-4759-914b-345cdb29e865>
Mon, 31 Mar 2008 20:47:41 +0000 (20:47 +0000)
committererickson <erickson@9efc2488-bf62-4759-914b-345cdb29e865>
Mon, 31 Mar 2008 20:47:41 +0000 (20:47 +0000)
The API and parsing behavior should behave identically
This requires a new OpenSRF Perl dependency -> FreezeThaw

git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@1296 9efc2488-bf62-4759-914b-345cdb29e865

src/perlmods/OpenSRF/Transport.pm
src/perlmods/OpenSRF/Transport/SlimJabber/Client.pm
src/perlmods/OpenSRF/Transport/SlimJabber/Inbound.pm
src/perlmods/OpenSRF/Transport/SlimJabber/MessageWrapper.pm
src/perlmods/OpenSRF/Transport/SlimJabber/PeerConnection.pm
src/perlmods/OpenSRF/Transport/SlimJabber/XMPPMessage.pm [new file with mode: 0644]
src/perlmods/OpenSRF/Transport/SlimJabber/XMPPReader.pm [new file with mode: 0644]
src/perlmods/OpenSRF/UnixServer.pm

index 4629e57..69e803e 100644 (file)
@@ -81,16 +81,13 @@ sub handler {
 
        $logger->transport( "Transport handler() received $data", INTERNAL );
 
-       # pass data to the message envelope 
-       my $helper = OpenSRF::Transport::SlimJabber::MessageWrapper->new( $data );
+       my $remote_id   = $data->from;
+       my $sess_id     = $data->thread;
+       my $body        = $data->body;
+       my $type        = $data->type;
 
-       # Extract message information
-       my $remote_id   = $helper->get_remote_id();
-       my $sess_id     = $helper->get_sess_id();
-       my $body        = $helper->get_body();
-       my $type        = $helper->get_msg_type();
+       $logger->set_osrf_xid($data->osrf_xid);
 
-       $logger->set_osrf_xid($helper->get_osrf_xid);
 
        if (defined($type) and $type eq 'error') {
                throw OpenSRF::EX::Session ("$remote_id IS NOT CONNECTED TO THE NETWORK!!!");
@@ -129,8 +126,8 @@ sub handler {
        eval { $doc = OpenSRF::Utils::JSON->JSON2perl($body); };
        if( $@ ) {
 
-               $logger->transport( "Received bogus JSON: $@", INFO );
-               $logger->transport( "Bogus JSON data: \n $body \n", INTERNAL );
+               $logger->warn("Received bogus JSON: $@");
+               $logger->warn("Bogus JSON data: $body");
                my $res = OpenSRF::DomainObject::oilsXMLParseError->new( status => "JSON Parse Error --- $body\n\n$@" );
 
                $app_session->status($res);
index 0447da3..c136c2c 100644 (file)
 package OpenSRF::Transport::SlimJabber::Client;
 use strict; use warnings;
 use OpenSRF::EX;
-use base qw( OpenSRF );
-use OpenSRF::Utils::Logger qw(:level);
 use OpenSRF::Utils::Config;
-use Time::HiRes qw(ualarm);
-use OpenSRF::Utils::Config;
-
-use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
-use IO::Socket::INET;
+use OpenSRF::Utils::Logger qw/$logger/;
+use OpenSRF::Transport::SlimJabber::XMPPReader;
+use OpenSRF::Transport::SlimJabber::XMPPMessage;
 use IO::Socket::UNIX;
-
-=head1 Description
-
-OpenSRF::Transport::SlimJabber::Client
-
-Home-brewed slimmed down jabber connection agent. Supports SSL connections
-with a config file options:
-
-  transport->server->sslport # the ssl port
-  transport->server->ssl  # is this ssl?
-
-=cut
-
-my $logger = "OpenSRF::Utils::Logger";
+use FreezeThaw qw/freeze/;
 
 sub DESTROY{
-       my $self = shift;
-       $self->disconnect;
+    shift()->disconnect;
 }
 
-sub disconnect{
-       my $self = shift;
-       my $socket = $self->{_socket};
-       if( $socket and $socket->connected() ) {
-               print $socket "</stream:stream>";
-               close( $socket );
-       }
-}
-
-
-=head2 new()
-
-Creates a new Client object.
-
-debug and log_file are not required if you don't care to log the activity, 
-however all other parameters are.
-
-%params:
-
-       username
-       resource        
-       password
-       debug    
-       log_file
-
-=cut
-
 sub new {
-
        my( $class, %params ) = @_;
-
-       $class = ref( $class ) || $class;
-
-       my $port                        = $params{'port'}                       || return undef;
-       my $username    = $params{'username'}   || return undef;
-       my $resource    = $params{'resource'}   || return undef;
-       my $password    = $params{'password'}   || return undef;
-       my $host                        = $params{'host'}                       || return undef;
-
-       my $jid = "$username\@$host\/$resource";
-
-       my $self = bless {} => $class;
-
-       $self->jid( $jid );
-       $self->host( $host );
-       $self->port( $port );
-       $self->username( $username );
-       $self->resource( $resource );
-       $self->password( $password );
-       $self->{temp_buffer} = "";
-
-       $logger->transport( "Creating Client instance: $host:$port, $username, $resource",
-                       $logger->INFO );
-
+    my $self = bless({}, ref($class) || $class);
+    $self->params(\%params);
        return $self;
 }
 
-# clears the tmp buffer as well as the TCP buffer
-sub buffer_reset { 
-
-       my $self = shift;
-       $self->{temp_buffer} = ""; 
 
-       my $fh = $self->{_socket};
-       set_nonblock( $fh );
-       my $t_buf = "";
-       while( sysread( $fh, $t_buf, 4096 ) ) {} 
-       set_block( $fh );
+sub reader {
+    my($self, $reader) = @_;
+    $self->{reader} = $reader if $reader;
+    return $self->{reader};
 }
-# -------------------------------------------------
-
-=head2 gather()
-
-Gathers all Jabber messages sitting in the collection queue 
-and hands them each to their respective callbacks.  This call
-does not block (calls Process(0))
 
-=cut
-
-sub gather { my $self = shift; $self->process( 0 ); }
-
-# -------------------------------------------------
-
-=head2 listen()
-
-Blocks and gathers incoming messages as they arrive.  Does not return
-unless an error occurs.
-
-Throws an OpenSRF::EX::JabberException if the call to Process ever fails.
-
-=cut
-sub listen {
-       my $self = shift;
-
-       my $sock = $self->unix_sock();
-       my $socket = IO::Socket::UNIX->new( Peer => $sock  );
-       $logger->transport( "Unix Socket opened by Listener", INTERNAL );
-       
-       throw OpenSRF::EX::Socket( "Unable to connect to UnixServer: socket-file: $sock \n :=> $! " )
-               unless ($socket->connected);
-               
-       while(1) {
-               my $o = $self->process( -1 );
-               $logger->transport( "Call to process() in listener returned:\n $o", INTERNAL );
-               if( ! defined( $o ) ) {
-                       throw OpenSRF::EX::Jabber( "Listen Loop failed at 'process()'" );
-               }
-               print $socket $o;
-
-       }
-       throw OpenSRF::EX::Socket( "How did we get here?!?!" );
+sub params {
+    my($self, $params) = @_;
+    $self->{params} = $params if $params;
+    return $self->{params};
 }
 
-sub set_nonblock {
-       my $fh = shift;
-       my      $flags = fcntl($fh, F_GETFL, 0)
-               or die "Can't get flags for the socket: $!\n";
-
-       $logger->transport( "Setting NONBLOCK: original flags: $flags", INTERNAL );
-
-       fcntl($fh, F_SETFL, $flags | O_NONBLOCK)
-               or die "Can't set flags for the socket: $!\n";
-
-       return $flags;
+sub socket {
+    my($self, $socket) = @_;
+    $self->{socket} = $socket if $socket;
+    return $self->{socket};
 }
 
-sub reset_fl {
-       my $fh = shift;
-       my $flags = shift;
-       $logger->transport( "Restoring BLOCK: to flags $flags", INTERNAL );
-       fcntl($fh, F_SETFL, $flags) if defined $flags;
+sub disconnect {
+    my $self = shift;
+       $self->reader->disconnect if $self->reader;
 }
 
-sub set_block {
-       my $fh = shift;
-
-       my      $flags = fcntl($fh, F_GETFL, 0)
-               or die "Can't get flags for the socket: $!\n";
 
-       $flags &= ~O_NONBLOCK;
-
-       fcntl($fh, F_SETFL, $flags)
-               or die "Can't set flags for the socket: $!\n";
+sub gather { 
+    my $self = shift; 
+    $self->process( 0 ); 
 }
 
-
-sub timed_read {
-       my ($self, $timeout) = @_;
-    $timeout = defined($timeout) ? int($timeout) : undef;
-
-       $logger->transport( "Temp Buffer Contained: \n". $self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};
-       if( $self->can( "app" ) ) {
-               $logger->transport( "timed_read called for ".$self->app.", I am: ".$self->jid, INTERNAL );
-       }
-
-       # See if there is a complete message in the temp_buffer
-       # that we can return
-       if( $self->{temp_buffer} ) {
-               my $buffer = $self->{temp_buffer};
-               my $complete = 0;
-               $self->{temp_buffer} = '';
-
-               my ($tag) = ($buffer =~ /<([^\s\?\>]+)/o);
-               $self->{last_tag} = $tag;
-               $logger->transport("Using tag: $tag  ", INTERNAL);
-
-               if ( $buffer =~ /^(.*?<\/$tag>){1}(.*)/s) {
-                       $buffer = $1;
-                       $self->{temp_buffer} = $2;
-                       $complete++;
-                       $logger->transport( "completed read with $buffer", INTERNAL );
-               } elsif ( $buffer =~ /^<$tag[^>]*?\/>(.*)/) {
-                       $self->{temp_buffer} = $1;
-                       $complete++;
-                       $logger->transport( "completed read with $buffer", INTERNAL );
-               } else {
-                       $self->{temp_buffer} = $buffer;
-               }
-                               
-               if( $buffer and $complete ) {
-                       return $buffer;
-               }
-
-       }
-       ############
-
-       my $fh = $self->{_socket};
-
-       unless( $fh and $fh->connected ) {
-               throw OpenSRF::EX::Socket ("Attempted read on closed socket", ERROR );
-       }
-
-       $logger->transport( "Temp Buffer After first attempt: \n ".$self->{temp_buffer}, INTERNAL) if $self->{temp_buffer};
-
-       my $flags;
-       if (defined($timeout) && !$timeout) {
-               $flags = set_nonblock( $fh );
-       }
-
-       $timeout ||= 0;
-       $logger->transport( "Calling timed_read with timetout $timeout", INTERNAL );
-
-
-       my $complete = 0;
-       my $first_read = 1;
-       my $xml = '';
-       eval {
-               my $tag = '';
-               eval {
-                       no warnings;
-                       local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
-
-                       # alarm needs a number greater => 1.
-                       my $alarm_timeout = $timeout;
-                       if( $alarm_timeout > 0 and $alarm_timeout < 1 ) {
-                               $alarm_timeout = 1;
-                       }
-                       alarm $alarm_timeout;
-                       do {    
-
-                               my $buffer = $self->{temp_buffer};
-                               $self->{temp_buffer} = '';
-                               #####
-
-                               # This code is no longer in use
-                               #my $ff =  fcntl($fh, F_GETFL, 0);
-                               #if ($ff == ($ff | O_NONBLOCK) and $timeout > 0 ) {
-                                       #throw OpenSRF::EX::ERROR ("File flags are set to NONBLOCK but timeout is $timeout", ERROR );
-                               #}
-
-                               my $t_buf = "";
-                               my $read_size = 1024; my $f = 0;
-                               while( my $n = sysread( $fh, $t_buf, $read_size ) ) {
-
-                                       unless( $fh->connected ) {
-                                               OpenSRF::EX::JabberDisconnected->throw(
-                                                       "Lost jabber client in timed_read()");
-                                       }
-
-                                       # XXX Change me to debug/internal at some point, this is for testing...
-                                       # XXX Found a race condition where reading >= $read_size bytes of data
-                                       # will fail if the log line below is removed.
-                                       $logger->info("timed_read() read $n bytes of data");
-
-
-                                       $buffer .= $t_buf;
-                                       if( $n < $read_size ) {
-                                               #reset_fl( $fh, $f ) if $f;
-                                               set_block( $fh );
-                                               last;
-                                       }
-                                       # see if there is any more data to grab...
-                                       $f = set_nonblock( $fh );
-                               }
-
-                               #sysread($fh, $buffer, 2048, length($buffer) );
-                               #sysread( $fh, $t_buf, 2048 );
-                               #$buffer .= $t_buf;
-
-                               #####
-                               $logger->transport(" Got [$buffer] from the socket", INTERNAL);
-
-                               if ($first_read) {
-                                       $logger->transport(" First read Buffer\n [$buffer]", INTERNAL);
-                                       ($tag) = ($buffer =~ /<([^\s\?\>\/]+){1}/o);
-                                       $self->{last_tag} = $tag;
-                                       $first_read--;
-                                       $logger->transport("Using tag: $tag  ", INTERNAL);
-                               }
-
-                               if (!$first_read && $buffer =~ /^(.*?<\/$tag>){1}(.*)/s) {
-                                       $buffer = $1;
-                                       $self->{temp_buffer} = $2;
-                                       $complete++;
-                                       $logger->transport( "completed read with $buffer", INTERNAL );
-                               } elsif (!$first_read && $buffer =~ /^<$tag[^>]*?\/>(.*)/) {
-                                       $self->{temp_buffer} = $1;
-                                       $complete++;
-                                       $logger->transport( "completed read with $buffer", INTERNAL );
-                               }
-                               
-                               $xml .= $buffer;
-
-                       } while (!$complete && $xml);
-                       alarm(0);
-               };
-               alarm(0);
-       };
-
-       $logger->transport( "XML Read: $xml", INTERNAL );
-       #reset_fl( $fh, $flags) if defined $flags;
-       set_block( $fh ) if defined $flags;
-
-       if ($complete) {
-               return $xml;
-       }
-       if( $@ ) {
-               return undef;
-       }
-       return "";
-}
-
-
 # -------------------------------------------------
 
 sub tcp_connected {
-
        my $self = shift;
-       return 1 if ($self->{_socket} and $self->{_socket}->connected);
-       return 0;
+    return $self->reader->tcp_connected if $self->reader;
+    return 0;
 }
 
-sub password {
-       my( $self, $password ) = @_;
-       $self->{'oils:password'} = $password if $password;
-       return $self->{'oils:password'};
-}
 
-# -------------------------------------------------
-
-sub username {
-       my( $self, $username ) = @_;
-       $self->{'oils:username'} = $username if $username;
-       return $self->{'oils:username'};
-}
-       
-# -------------------------------------------------
-
-sub resource {
-       my( $self, $resource ) = @_;
-       $self->{'oils:resource'} = $resource if $resource;
-       return $self->{'oils:resource'};
-}
-
-# -------------------------------------------------
-
-sub jid {
-       my( $self, $jid ) = @_;
-       $self->{'oils:jid'} = $jid if $jid;
-       return $self->{'oils:jid'};
-}
-
-sub port {
-       my( $self, $port ) = @_;
-       $self->{'oils:port'} = $port if $port;
-       return $self->{'oils:port'};
-}
-
-sub host {
-       my( $self, $host ) = @_;
-       $self->{'oils:host'} = $host if $host;
-       return $self->{'oils:host'};
-}
-
-# -------------------------------------------------
-
-=head2 send()
-
-       Sends a Jabber message.
-       
-       %params:
-               to                      - The JID of the recipient
-               thread  - The Jabber thread
-               body            - The body of the message
-
-=cut
 
 sub send {
        my $self = shift;
-       my %params = @_;
-
-       my $to = $params{'to'} || return undef;
-       my $body = $params{'body'} || return undef;
-       my $thread = $params{'thread'} || "";
-       my $router_command = $params{'router_command'} || "";
-       my $router_class = $params{'router_class'} || "";
-
-       my $msg = OpenSRF::Transport::SlimJabber::MessageWrapper->new;
-
-       $msg->setTo( $to );
-       $msg->setThread( $thread ) if $thread;
-       $msg->setBody( $body );
-       $msg->set_router_command( $router_command );
-       $msg->set_router_class( $router_class );
-    $msg->set_osrf_xid($logger->get_osrf_xid);
-
-       $logger->transport( 
-                       "JabberClient Sending message to $to with thread $thread and body: \n$body", INTERNAL );
-
-       my $soc = $self->{_socket};
-       unless( $soc and $soc->connected ) {
-               throw OpenSRF::EX::Jabber ("No longer connected to jabber server");
-       }
-       print $soc $msg->toString;
-
-       $logger->transport( 
-                       "JabberClient Sent message to $to with thread $thread and body: \n$body", INTERNAL );
+    my $msg = OpenSRF::Transport::SlimJabber::XMPPMessage->new(@_);
+    $self->reader->send($msg->to_xml);
 }
 
-
-=head2 inintialize()
-
-Connect to the server and log in.  
-
-Throws an OpenSRF::EX::JabberException if we cannot connect
-to the server or if the authentication fails.
-
-=cut
-
-# --- The logging lines have been commented out until we decide 
-# on which log files we're using.
-
 sub initialize {
 
        my $self = shift;
 
-       my $jid         = $self->jid; 
-       my $host        = $self->host; 
-       my $port        = $self->port; 
-       my $username    = $self->username;
-       my $resource    = $self->resource;
-       my $password    = $self->password;
+       my $host        = $self->params->{host}; 
+       my $port        = $self->params->{port}; 
+       my $username    = $self->params->{username};
+       my $resource    = $self->params->{resource};
+       my $password    = $self->params->{password};
 
-       my $stream = <<"        XML";
-<stream:stream to='$host' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>
-       XML
+    my $jid = "$username\@$host/$resource";
 
        my $conf = OpenSRF::Utils::Config->current;
-       my $tail = "_$$";
-       if(!$conf->bootstrap->router_name && $username eq "router") {
-               $tail = "";
-       }
-
-       my $auth = <<"  XML";
-<iq id='123' type='set'>
-<query xmlns='jabber:iq:auth'>
-<username>$username</username>
-<password>$password</password>
-<resource>${resource}$tail</resource>
-</query>
-</iq>
-       XML
-
-       my $sock_type = 'IO::Socket::INET';
-       
-       # if port is a string, then we're connecting to a UNIX socket
-       unless( $port =~ /^\d+$/ ) {
-               $sock_type = 'IO::Socket::UNIX';
-       }
-
-       # --- 5 tries to connect to the jabber server
-       my $socket;
-       for(1..5) {
-               $socket = $sock_type->new( PeerHost => $host,
-                                          PeerPort => $port,
-                                          Peer => $port,
-                                          Proto    => 'tcp' );
-               $logger->debug( "$jid: $_ connect attempt to $host:$port");
-               last if ( $socket and $socket->connected );
-               $logger->warn( "$jid: Failed to connect to server...$host:$port (Try # $_)");
-               sleep 3;
-       }
-
-       unless ( $socket and $socket->connected ) {
-               throw OpenSRF::EX::Jabber( " Could not connect to Jabber server: $!" );
-       }
 
-       $logger->transport( "Logging into jabber as $jid " .
-                       "from " . ref( $self ), DEBUG );
-
-       print $socket $stream;
-
-       my $buffer;
-       eval {
-               eval {
-                       local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required
-                       alarm 3;
-                       sysread($socket, $buffer, 4096);
-                       $logger->transport( "Login buffer 1: $buffer", INTERNAL );
-                       alarm(0);
-               };
-               alarm(0);
-       };
-
-       print $socket $auth;
-
-       if( $socket and $socket->connected() ) {
-               $self->{_socket} = $socket;
-       } else {
-               throw OpenSRF::EX::Jabber( " ** Unable to connect to Jabber server", ERROR );
-       }
+       my $tail = "_$$";
+       $tail = "" if !$conf->bootstrap->router_name and $username eq "router";
+    $resource = "$resource$tail";
 
+    my $socket = IO::Socket::INET->new(
+        PeerHost => $host,
+        PeerPort => $port,
+        Peer => $port,
+        Proto  => 'tcp' );
 
-       $buffer = $self->timed_read(10);
+    throw OpenSRF::EX::Jabber("Could not open TCP socket to Jabber server: $!")
+           unless ( $socket and $socket->connected );
 
-       if( $buffer ) {$logger->transport( "Login buffer 2: $buffer", INTERNAL );}
+    $self->socket($socket);
+    $self->reader(OpenSRF::Transport::SlimJabber::XMPPReader->new($socket));
+    $self->reader->connect($host, $username, $password, $resource);
 
-       if( $buffer and $buffer =~ /type=["\']result["\']/ ) { 
-               $logger->transport( " * $jid: Jabber authenticated and connected", DEBUG );
-       } else {
-               if( !$buffer ) { $buffer = " "; }
-               $socket->close;
-               throw OpenSRF::EX::Jabber( " * $jid: Unable to authenticate: $buffer", ERROR );
-       }
+    throw OpenSRF::EX::Jabber("Could not authenticate with Jabber server: $!")
+           unless ( $self->reader->connected );
 
        return $self;
 }
 
+
 sub construct {
        my( $class, $app ) = @_;
-       $logger->transport("Constructing new Jabber connection for $app, my class $class", INTERNAL );
-       $class->peer_handle( 
-                       $class->new( $app )->initialize() );
+       $class->peer_handle($class->new( $app )->initialize());
 }
 
-sub process {
 
-       my( $self, $timeout ) = @_;
+sub process {
+       my($self, $timeout) = @_;
 
        $timeout ||= 0;
     $timeout = int($timeout);
-       undef $timeout if ( $timeout < 0 );
-
-       unless( $self->{_socket}->connected ) {
-               OpenSRF::EX::JabberDisconnected->throw( 
-                 "This JabberClient instance is no longer connected to the server " . 
-                 $self->username . " : " . $self->resource, ERROR );
-       }
-
-       my $val = $self->timed_read( $timeout );
-
-       $timeout = "FOREVER" unless ( defined $timeout );
-       
-       if ( ! defined( $val ) ) {
-               OpenSRF::EX::Jabber->throw( 
-                 "Call to Client->timed_read( $timeout ) failed", ERROR );
-       } elsif ( ! $val ) {
-               $logger->transport( 
-                       "Call to Client->timed_read( $timeout ) returned 0 bytes of data", INTERNAL );
-       } elsif ( $val ) {
-               $logger->transport( 
-                       "Call to Client->timed_read( $timeout ) successfully returned data", INTERNAL );
-       }
 
-       my $t = $self->{last_tag};
-
-       if( $t and $val ) {
-               my @msgs = $val =~ /(<$t[^>]*>.*?<\/$t>)/g;
-               $val = shift(@msgs);
-       
-               if (@msgs) {
-                       my $tmp = $self->{temp_buffer};
-       
-                       $self->{temp_buffer} = '';
-                       $self->{temp_buffer} .= $_ for (@msgs);
-                       $self->{temp_buffer} .= $tmp;
-               }
+       unless( $self->reader and $self->reader->connected ) {
+        throw OpenSRF::EX::JabberDisconnected 
+            ("This JabberClient instance is no longer connected to the server ");
        }
 
-       return $val;
+    return $self->reader->wait_msg($timeout);
 }
 
 
@@ -599,34 +130,10 @@ sub process {
 # Returns 1 on success, 0 if the socket isn't connected
 # --------------------------------------------------------------
 sub flush_socket {
-
        my $self = shift;
-       my $socket = $self->{_socket};
-
-       if( $socket ) {
-
-               my $buf;
-               my      $flags = fcntl($socket, F_GETFL, 0);
-
-               fcntl($socket, F_SETFL, $flags | O_NONBLOCK);
-               while( my $n = sysread( $socket, $buf, 8192 ) ) {
-                       $logger->debug("flush_socket dropped $n bytes of data");
-                       if(!$socket->connected()) {
-                               $logger->error("flush_socket dropped data on disconnected socket: $buf");
-                       }
-               }
-               fcntl($socket, F_SETFL, $flags);
-
-               return 0 unless $socket->connected();
-
-               return 1;
-
-       } else {
-
-               return 0;
-       }
+    return $self->reader->flush_socket;
 }
 
+1;
 
 
-1;
index 90436fe..9194927 100644 (file)
@@ -6,6 +6,7 @@ use OpenSRF::Utils::Logger qw(:level);
 use OpenSRF::Utils::SettingsClient;
 use OpenSRF::Utils::Config;
 use Time::HiRes qw/usleep/;
+use FreezeThaw qw/freeze/;
 
 my $logger = "OpenSRF::Utils::Logger";
 
@@ -20,9 +21,6 @@ This service should be loaded at system startup.
 
 =cut
 
-# XXX This will be overhauled to connect as a component instead of as
-# a user.  all in good time, though.
-
 {
        my $unix_sock;
        sub unix_sock { return $unix_sock; }
@@ -98,7 +96,6 @@ sub listen {
         $logger->info("loading router info $routers");
 
         for my $router (@$routers) {
-
             if(ref $router) {
                 if( !$router->{services} || grep { $_ eq $self->{app} } @{$router->{services}->{service}} ) {
                     my $name = $router->{name};
@@ -134,9 +131,7 @@ sub listen {
                $logger->debug("Inbound listener calling process()");
 
                try {
-                       $o = $self->process( -1 );
-
-                       $logger->debug("Inbound listener received ".length($o)." bytes of data");
+                       $o = $self->process(-1);
 
                        if(!$o){
                                $logger->error(
@@ -158,7 +153,7 @@ sub listen {
                        throw OpenSRF::EX::Socket( 
                                "Unable to connect to UnixServer: socket-file: $sock \n :=> $! " )
                                unless ($socket->connected);
-                       print $socket $o;
+                       print $socket freeze($o);
                        $socket->close;
                } 
        }
index 6375f6d..0fa95c5 100644 (file)
 package OpenSRF::Transport::SlimJabber::MessageWrapper;
-use XML::LibXML;
-use OpenSRF::EX qw/:try/;
-use OpenSRF::Utils::Logger qw/$logger/;
+use strict; use warnings;
+use OpenSRF::Transport::SlimJabber::XMPPMessage;
+
+# ----------------------------------------------------------
+# Legacy wrapper for XMPPMessage
+# ----------------------------------------------------------
 
 sub new {
        my $class = shift;
-       $class = ref($class) || $class;
-
-       my $xml = shift;
-
-       my ($doc, $msg);
-       if ($xml) {
-               my $err;
-
-               try {
-                       $doc = XML::LibXML->new->parse_string($xml);
-               } catch Error with {
-                       $err = shift; 
-                       warn "MessageWrapper received bad XML : error = $err\nXML = $xml\n";
-                       $logger->error("MessageWrapper received bad XML : error = $err : XML = $xml");
-               };
-               throw $err if $err;
-
-               $msg = $doc->documentElement;
-       } else {
-               $doc = XML::LibXML::Document->createDocument;
-               $msg = $doc->createElement( 'message' );
-               $doc->setDocumentElement( $msg );
-       }
-
-       
-       my $self = { msg_node => $msg };
-
-       return bless $self => $class;
+    my $msg = shift;
+    return bless({msg => $msg}, ref($class) || $class);
+}
+
+sub msg {
+    my($self, $msg) = @_;
+    $self->{msg} = $msg if $msg;
+    return $self->{msg};
 }
 
 sub toString {
-       my $self = shift;
-       if( $self->{msg_node} ) {
-               return $self->{msg_node}->toString(@_);
-       }
-       return "";
+    return $_[0]->msg->to_xml;
 }
 
 sub get_body {
-       my $self = shift;
-       my ($t_body) = grep {$_->nodeName eq 'body'} $self->{msg_node}->childNodes;
-       if( $t_body ) {
-               my $body = $t_body->textContent;
-               return $body;
-       }
-       return "";
+    return $_[0]->msg->body;
 }
 
 sub get_sess_id {
-       my $self = shift;
-       my ($t_node) = grep {$_->nodeName eq 'thread'} $self->{msg_node}->childNodes;
-       if( $t_node ) {
-               return $t_node->textContent;
-       }
-       return "";
+    return $_[0]->msg->thread;
 }
 
 sub get_msg_type {
-       my $self = shift;
-       $self->{msg_node}->getAttribute( 'type' );
+    return $_[0]->msg->type;
 }
 
 sub get_remote_id {
-       my $self = shift;
-
-       #
-       my $rid = $self->{msg_node}->getAttribute( 'router_from' );
-       return $rid if $rid;
-
-       return $self->{msg_node}->getAttribute( 'from' );
+    return $_[0]->msg->from;
 }
 
 sub setType {
-       my $self = shift;
-       $self->{msg_node}->setAttribute( type => shift );
+    $_[0]->msg->type(shift());
 }
 
 sub setTo {
-       my $self = shift;
-       $self->{msg_node}->setAttribute( to => shift );
+    $_[0]->msg->to(shift());
 }
 
 sub setThread {
-       my $self = shift;
-       $self->{msg_node}->appendTextChild( thread => shift );
+    $_[0]->msg->thread(shift());
 }
 
 sub setBody {
-       my $self = shift;
-       my $body = shift;
-       $self->{msg_node}->appendTextChild( body => $body );
+    $_[0]->msg->body(shift());
 }
 
 sub set_router_command {
-       my( $self, $router_command ) = @_;
-       if( $router_command ) {
-               $self->{msg_node}->setAttribute( router_command => $router_command );
-       }
+    $_[0]->msg->router_command(shift());
 }
 sub set_router_class {
-       my( $self, $router_class ) = @_;
-       if( $router_class ) {
-               $self->{msg_node}->setAttribute( router_class => $router_class );
-       }
+    $_[0]->msg->router_class(shift());
 }
 
 sub set_osrf_xid {
-   my( $self, $xid ) = @_;
-   $self->{msg_node}->setAttribute( osrf_xid => $xid );
+    $_[0]->msg->osrf_xid(shift());
 }
 
-
 sub get_osrf_xid {
-   my $self = shift;
-   $self->{msg_node}->getAttribute('osrf_xid');
+   return $_[0]->msg->osrf_xid;
 }
 
 1;
index 5c5b959..7c59456 100644 (file)
@@ -29,15 +29,9 @@ our $_singleton_connection;
 sub retrieve { 
        my( $class, $app ) = @_;
        return $_singleton_connection;
-#      my @keys = keys %apps_hash;
-#      OpenSRF::Utils::Logger->transport( 
-#                      "Requesting peer for $app and we have @keys", INFO );
-#      return $apps_hash{$app};
 }
 
 
-
-# !! In here we use the bootstrap config ....
 sub new {
        my( $class, $app ) = @_;
 
@@ -63,12 +57,6 @@ sub new {
 
        if( $app eq "client" ) { $resource = "client_at_$h"; }
 
-#      unless ( $conf->bootstrap->router_name ) {
-#              $username = 'router';
-#              $resource = $app;
-#      }
-
-
        OpenSRF::EX::Config->throw( "JPeer could not load all necesarry values from config" )
                unless ( $username and $password and $resource and $host and $port );
 
@@ -94,26 +82,16 @@ sub new {
 }
 
 sub process {
-
        my $self = shift;
        my $val = $self->SUPER::process(@_);
        return 0 unless $val;
-
-       OpenSRF::Utils::Logger->transport( "Calling transport handler for ".$self->app." with: $val", INTERNAL );
-       my $t;
-       $t = OpenSRF::Transport->handler($self->app, $val);
-
-       return $t;
+       return OpenSRF::Transport->handler($self->app, $val);
 }
 
 sub app {
        my $self = shift;
        my $app = shift;
-       if( $app ) {
-               OpenSRF::Utils::Logger->transport( "PEER changing app to $app: ".$self->jid, INTERNAL );
-       }
-
-       $self->{app} = $app if ($app);
+       $self->{app} = $app if $app;
        return $self->{app};
 }
 
diff --git a/src/perlmods/OpenSRF/Transport/SlimJabber/XMPPMessage.pm b/src/perlmods/OpenSRF/Transport/SlimJabber/XMPPMessage.pm
new file mode 100644 (file)
index 0000000..9bd5328
--- /dev/null
@@ -0,0 +1,134 @@
+package OpenSRF::Transport::SlimJabber::XMPPMessage;
+use strict; use warnings;
+use OpenSRF::Utils::Logger qw/$logger/;
+use OpenSRF::EX qw/:try/;
+use strict; use warnings;
+use XML::LibXML;
+
+use constant JABBER_MESSAGE =>
+    "<message to='%s' from='%s' router_command='%s' router_class='%s' osrf_xid='%s'>".
+    "<thread>%s</thread><body>%s</body></message>";
+
+sub new {
+    my $class = shift;
+    my %args = @_;
+    my $self = bless({}, $class);
+
+    if($args{xml}) {
+        $self->parse_xml($args{xml});
+
+    } else {
+        $self->{to} = $args{to} || '';
+        $self->{from} = $args{from} || '';
+        $self->{thread} = $args{thread} || '';
+        $self->{body} = $args{body} || '';
+        $self->{osrf_xid} = $args{osrf_xid} || '';
+        $self->{router_command} = $args{router_command} || '';
+        $self->{router_class} = $args{router_class} || '';
+    }
+
+    return $self;
+}
+
+sub to {
+    my($self, $to) = @_;
+    $self->{to} = $to if defined $to;
+    return $self->{to};
+}
+sub from {
+    my($self, $from) = @_;
+    $self->{from} = $from if defined $from;
+    return $self->{from};
+}
+sub thread {
+    my($self, $thread) = @_;
+    $self->{thread} = $thread if defined $thread;
+    return $self->{thread};
+}
+sub body {
+    my($self, $body) = @_;
+    $self->{body} = $body if defined $body;
+    return $self->{body};
+}
+sub status {
+    my($self, $status) = @_;
+    $self->{status} = $status if defined $status;
+    return $self->{status};
+}
+sub type {
+    my($self, $type) = @_;
+    $self->{type} = $type if defined $type;
+    return $self->{type};
+}
+sub err_type {
+    my($self, $err_type) = @_;
+    $self->{err_type} = $err_type if defined $err_type;
+    return $self->{err_type};
+}
+sub err_code {
+    my($self, $err_code) = @_;
+    $self->{err_code} = $err_code if defined $err_code;
+    return $self->{err_code};
+}
+sub osrf_xid {
+    my($self, $osrf_xid) = @_;
+    $self->{osrf_xid} = $osrf_xid if defined $osrf_xid;
+    return $self->{osrf_xid};
+}
+sub router_command {
+    my($self, $router_command) = @_;
+    $self->{router_command} = $router_command if defined $router_command;
+    return $self->{router_command};
+}
+sub router_class {
+    my($self, $router_class) = @_;
+    $self->{router_class} = $router_class if defined $router_class;
+    return $self->{router_class};
+}
+
+
+sub to_xml {
+    my $self = shift;
+
+    my $body = $self->{body};
+    $body =~ s/&/&amp;/sog;
+    $body =~ s/</&lt;/sog;
+    $body =~ s/>/&gt;/sog;
+
+    return sprintf(
+        JABBER_MESSAGE,
+        $self->{to},
+        $self->{from},
+        $self->{router_command},
+        $self->{router_class},
+        $self->{osrf_xid},
+        $self->{thread},
+        $body
+    );
+}
+
+sub parse_xml {
+    my($self, $xml) = @_;
+    my($doc, $err);
+
+    try {
+        $doc = XML::LibXML->new->parse_string($xml);
+    } catch Error with {
+        my $err = shift;
+        $logger->error("Error parsing message xml: $xml --- $err");
+    };
+    throw $err if $err;
+
+    my $root = $doc->documentElement;
+
+    $self->{body} = $root->findnodes('/message/body').'';
+    $self->{thread} = $root->findnodes('/message/thread').'';
+    $self->{from} = $root->getAttribute('router_from');
+    $self->{from} = $root->getAttribute('from') unless $self->{from};
+    $self->{to} = $root->getAttribute('to');
+    $self->{type} = $root->getAttribute('type');
+    $self->{osrf_xid} = $root->getAttribute('osrf_xid');
+}
+
+
+1;
diff --git a/src/perlmods/OpenSRF/Transport/SlimJabber/XMPPReader.pm b/src/perlmods/OpenSRF/Transport/SlimJabber/XMPPReader.pm
new file mode 100644 (file)
index 0000000..b0705ab
--- /dev/null
@@ -0,0 +1,350 @@
+package OpenSRF::Transport::SlimJabber::XMPPReader;
+use strict; use warnings;
+use XML::Parser;
+use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
+use Time::HiRes qw/time/;
+use OpenSRF::Transport::SlimJabber::XMPPMessage;
+use OpenSRF::Utils::Logger qw/$logger/;
+
+# -----------------------------------------------------------
+# Connect, disconnect, and authentication messsage templates
+# -----------------------------------------------------------
+use constant JABBER_CONNECT =>
+    "<stream:stream to='%s' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>";
+
+use constant JABBER_BASIC_AUTH =>
+    "<iq id='123' type='set'><query xmlns='jabber:iq:auth'>" .
+    "<username>%s</username><password>%s</password><resource>%s</resource></query></iq>";
+
+use constant JABBER_DISCONNECT => "</stream:stream>";
+
+
+# -----------------------------------------------------------
+# XMPP Stream states
+# -----------------------------------------------------------
+use constant DISCONNECTED   => 1;
+use constant CONNECT_RECV   => 2;
+use constant CONNECTED      => 3;
+
+
+# -----------------------------------------------------------
+# XMPP Message states
+# -----------------------------------------------------------
+use constant IN_NOTHING => 1;
+use constant IN_BODY    => 2;
+use constant IN_THREAD  => 3;
+use constant IN_STATUS  => 4;
+
+
+# -----------------------------------------------------------
+# Constructor, getter/setters
+# -----------------------------------------------------------
+sub new {
+    my $class = shift;
+    my $socket = shift;
+
+    my $self = bless({}, $class);
+
+    $self->{queue} = [];
+    $self->{stream_state} = DISCONNECTED;
+    $self->{xml_state} = IN_NOTHING;
+    $self->socket($socket);
+
+    my $p = new XML::Parser(Handlers => {
+        Start => \&start_element,
+        End   => \&end_element,
+        Char  => \&characters,
+    });
+
+    $self->parser($p->parse_start); # create a push parser
+    $self->parser->{_parent_} = $self;
+    $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new;
+    return $self;
+}
+
+sub push_msg {
+    my($self, $msg) = @_; 
+    push(@{$self->{queue}}, $msg) if $msg;
+}
+
+sub next_msg {
+    my $self = shift;
+    return shift @{$self->{queue}};
+}
+
+sub peek_msg {
+    my $self = shift;
+    return (@{$self->{queue}} > 0);
+}
+
+sub parser {
+    my($self, $parser) = @_;
+    $self->{parser} = $parser if $parser;
+    return $self->{parser};
+}
+
+sub socket {
+    my($self, $socket) = @_;
+    $self->{socket} = $socket if $socket;
+    return $self->{socket};
+}
+
+sub stream_state {
+    my($self, $stream_state) = @_;
+    $self->{stream_state} = $stream_state if $stream_state;
+    return $self->{stream_state};
+}
+
+sub xml_state {
+    my($self, $xml_state) = @_;
+    $self->{xml_state} = $xml_state if $xml_state;
+    return $self->{xml_state};
+}
+
+sub message {
+    my($self, $message) = @_;
+    $self->{message} = $message if $message;
+    return $self->{message};
+}
+
+
+# -----------------------------------------------------------
+# Stream and connection handling methods
+# -----------------------------------------------------------
+
+sub connect {
+    my($self, $domain, $username, $password, $resource) = @_;
+    
+    $self->send(sprintf(JABBER_CONNECT, $domain));
+    $self->wait(10);
+
+    unless($self->{stream_state} == CONNECT_RECV) {
+        $logger->error("No initial XMPP response from server");
+        return 0;
+    }
+
+    $self->send(sprintf(JABBER_BASIC_AUTH, $username, $password, $resource));
+    $self->wait(10);
+
+    unless($self->connected) {
+        $logger->error('XMPP connect failed');
+        return 0;
+    }
+
+    return 1;
+}
+
+sub disconnect {
+    my $self = shift;
+    $self->send(JABBER_DISCONNECT); 
+    shutdown($self->socket, 2);
+    close($self->socket);
+}
+
+# -----------------------------------------------------------
+# returns true if this stream is connected to the server
+# -----------------------------------------------------------
+sub connected {
+    my $self = shift;
+    return ($self->tcp_connected and $self->{stream_state} == CONNECTED);
+}
+
+# -----------------------------------------------------------
+# returns true if the socket is connected
+# -----------------------------------------------------------
+sub tcp_connected {
+    my $self = shift;
+    return ($self->socket and $self->socket->connected);
+}
+
+# -----------------------------------------------------------
+# sends pre-formated XML
+# -----------------------------------------------------------
+sub send {
+    my($self, $xml) = @_;
+    $self->{socket}->print($xml);
+}
+
+# -----------------------------------------------------------
+# Puts a file handle into blocking mode
+# -----------------------------------------------------------
+sub set_block {
+    my $fh = shift;
+    my  $flags = fcntl($fh, F_GETFL, 0);
+    $flags &= ~O_NONBLOCK;
+    fcntl($fh, F_SETFL, $flags);
+}
+
+
+# -----------------------------------------------------------
+# Puts a file handle into non-blocking mode
+# -----------------------------------------------------------
+sub set_nonblock {
+    my $fh = shift;
+    my  $flags = fcntl($fh, F_GETFL, 0);
+    fcntl($fh, F_SETFL, $flags | O_NONBLOCK);
+}
+
+
+sub wait {
+    my($self, $timeout) = @_;
+     
+    return $self->next_msg if $self->peek_msg;
+
+    $timeout ||= 0;
+    $timeout = undef if $timeout < 0;
+    my $socket = $self->{socket};
+
+    set_block($socket);
+    
+    # build the select readset
+    my $infile = '';
+    vec($infile, $socket->fileno, 1) = 1;
+    return undef unless select($infile, undef, undef, $timeout);
+
+    # now slurp the data off the socket
+    my $buf;
+    my $read_size = 1024;
+    while(my $n = sysread($socket, $buf, $read_size)) {
+        $self->{parser}->parse_more($buf) if $buf;
+        if($n < $read_size or $self->peek_msg) {
+            set_block($socket);
+            last;
+        }
+        set_nonblock($socket);
+    }
+
+    return $self->next_msg;
+}
+
+# -----------------------------------------------------------
+# Waits up to timeout seconds for a fully-formed XMPP
+# message to arrive.  If timeout is < 0, waits indefinitely
+# -----------------------------------------------------------
+sub wait_msg {
+    my($self, $timeout) = @_;
+    my $xml;
+
+    $timeout = 0 unless defined $timeout;
+
+    if($timeout < 0) {
+        while(1) {
+            return $xml if $xml = $self->wait($timeout); 
+        }
+
+    } else {
+        while($timeout >= 0) {
+            my $start = time;
+            return $xml if $xml = $self->wait($timeout); 
+            $timeout -= time - $start;
+        }
+    }
+
+    return undef;
+}
+
+
+# -----------------------------------------------------------
+# SAX Handlers
+# -----------------------------------------------------------
+
+
+sub start_element {
+    my($parser, $name, %attrs) = @_;
+    my $self = $parser->{_parent_};
+
+    if($name eq 'message') {
+
+        my $msg = $self->{message};
+        $msg->{to} = $attrs{'to'};
+        $msg->{from} = $attrs{router_from} if $attrs{router_from};
+        $msg->{from} = $attrs{from} unless $msg->{from};
+        $msg->{osrf_xid} = $attrs{'osrf_xid'};
+        $msg->{type} = $attrs{type};
+
+    } elsif($name eq 'body') {
+        $self->{xml_state} = IN_BODY;
+
+    } elsif($name eq 'thread') {
+        $self->{xml_state} = IN_THREAD;
+
+    } elsif($name eq 'stream:stream') {
+        $self->{stream_state} = CONNECT_RECV;
+
+    } elsif($name eq 'iq') {
+        if($attrs{type} and $attrs{type} eq 'result') {
+            $self->{stream_state} = CONNECTED;
+        }
+
+    } elsif($name eq 'status') {
+        $self->{xml_state } = IN_STATUS;
+
+    } elsif($name eq 'stream:error') {
+        $self->{stream_state} = DISCONNECTED;
+
+    } elsif($name eq 'error') {
+        $self->{message}->{err_type} = $attrs{'type'};
+        $self->{message}->{err_code} = $attrs{'code'};
+        $self->{stream_state} = DISCONNECTED;
+    }
+}
+
+sub characters {
+    my($parser, $chars) = @_;
+    my $self = $parser->{_parent_};
+    my $state = $self->{xml_state};
+
+    if($state == IN_BODY) {
+        $self->{message}->{body} .= $chars;
+
+    } elsif($state == IN_THREAD) {
+        $self->{message}->{thread} .= $chars;
+
+    } elsif($state == IN_STATUS) {
+        $self->{message}->{status} .= $chars;
+    }
+}
+
+sub end_element {
+    my($parser, $name) = @_;
+    my $self = $parser->{_parent_};
+    $self->{xml_state} = IN_NOTHING;
+
+    if($name eq 'message') {
+        $self->push_msg($self->{message});
+        $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new;
+
+    } elsif($name eq 'stream:stream') {
+        $self->{stream_state} = DISCONNECTED;
+    }
+}
+
+sub flush_socket {
+       my $self = shift;
+       my $socket = $self->socket;
+    return 0 unless $socket and $socket->connected;
+
+    my $flags = fcntl($socket, F_GETFL, 0);
+    fcntl($socket, F_SETFL, $flags | O_NONBLOCK);
+
+    while( my $n = sysread( $socket, my $buf, 8192 ) ) {
+        $logger->debug("flush_socket dropped $n bytes of data");
+        $logger->error("flush_socket dropped data on disconnected socket: $buf")
+            unless($socket->connected);
+    }
+
+    fcntl($socket, F_SETFL, $flags);
+    return 0 unless $socket->connected;
+    return 1;
+}
+
+
+
+
+
+1;
+
+
+
+
+
index 2792db5..c4b48c8 100644 (file)
@@ -14,6 +14,7 @@ use OpenSRF::Utils::JSON;
 use vars qw/@ISA $app/;
 use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
 use Carp;
+use FreezeThaw qw/thaw/;
 
 use IO::Socket::INET;
 use IO::Socket::UNIX;
@@ -97,6 +98,7 @@ sub process_request {
                exit;
        }
 
+    ($data) = thaw($data);
        my $app_session = OpenSRF::Transport->handler( $self->app(), $data );
 
        if(!ref($app_session)) {