From 32601db31f9ea8a4be79709e4294c5db4b071e22 Mon Sep 17 00:00:00 2001 From: erickson Date: Mon, 31 Mar 2008 20:47:41 +0000 Subject: [PATCH] This patch replaces the regex-based XML stream parsing mechanism with an XML::Parser (expat) based parser. The API and parsing behavior should behave identically This requires a new OpenSRF Perl dependency -> FreezeThaw git-svn-id: svn://svn.open-ils.org/OpenSRF/trunk@1296 9efc2488-bf62-4759-914b-345cdb29e865 --- src/perlmods/OpenSRF/Transport.pm | 17 +- .../OpenSRF/Transport/SlimJabber/Client.pm | 613 ++------------------- .../OpenSRF/Transport/SlimJabber/Inbound.pm | 11 +- .../OpenSRF/Transport/SlimJabber/MessageWrapper.pm | 107 +--- .../OpenSRF/Transport/SlimJabber/PeerConnection.pm | 26 +- .../OpenSRF/Transport/SlimJabber/XMPPMessage.pm | 134 +++++ .../OpenSRF/Transport/SlimJabber/XMPPReader.pm | 350 ++++++++++++ src/perlmods/OpenSRF/UnixServer.pm | 2 + 8 files changed, 585 insertions(+), 675 deletions(-) create mode 100644 src/perlmods/OpenSRF/Transport/SlimJabber/XMPPMessage.pm create mode 100644 src/perlmods/OpenSRF/Transport/SlimJabber/XMPPReader.pm diff --git a/src/perlmods/OpenSRF/Transport.pm b/src/perlmods/OpenSRF/Transport.pm index 4629e57..69e803e 100644 --- a/src/perlmods/OpenSRF/Transport.pm +++ b/src/perlmods/OpenSRF/Transport.pm @@ -81,16 +81,13 @@ sub handler { $logger->transport( "Transport handler() received $data", INTERNAL ); - # pass data to the message envelope - my $helper = OpenSRF::Transport::SlimJabber::MessageWrapper->new( $data ); + my $remote_id = $data->from; + my $sess_id = $data->thread; + my $body = $data->body; + my $type = $data->type; - # Extract message information - my $remote_id = $helper->get_remote_id(); - my $sess_id = $helper->get_sess_id(); - my $body = $helper->get_body(); - my $type = $helper->get_msg_type(); + $logger->set_osrf_xid($data->osrf_xid); - $logger->set_osrf_xid($helper->get_osrf_xid); if (defined($type) and $type eq 'error') { throw OpenSRF::EX::Session ("$remote_id IS NOT CONNECTED TO THE NETWORK!!!"); @@ -129,8 +126,8 @@ sub handler { eval { $doc = OpenSRF::Utils::JSON->JSON2perl($body); }; if( $@ ) { - $logger->transport( "Received bogus JSON: $@", INFO ); - $logger->transport( "Bogus JSON data: \n $body \n", INTERNAL ); + $logger->warn("Received bogus JSON: $@"); + $logger->warn("Bogus JSON data: $body"); my $res = OpenSRF::DomainObject::oilsXMLParseError->new( status => "JSON Parse Error --- $body\n\n$@" ); $app_session->status($res); diff --git a/src/perlmods/OpenSRF/Transport/SlimJabber/Client.pm b/src/perlmods/OpenSRF/Transport/SlimJabber/Client.pm index 0447da3..c136c2c 100644 --- a/src/perlmods/OpenSRF/Transport/SlimJabber/Client.pm +++ b/src/perlmods/OpenSRF/Transport/SlimJabber/Client.pm @@ -1,595 +1,126 @@ package OpenSRF::Transport::SlimJabber::Client; use strict; use warnings; use OpenSRF::EX; -use base qw( OpenSRF ); -use OpenSRF::Utils::Logger qw(:level); use OpenSRF::Utils::Config; -use Time::HiRes qw(ualarm); -use OpenSRF::Utils::Config; - -use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK); -use IO::Socket::INET; +use OpenSRF::Utils::Logger qw/$logger/; +use OpenSRF::Transport::SlimJabber::XMPPReader; +use OpenSRF::Transport::SlimJabber::XMPPMessage; use IO::Socket::UNIX; - -=head1 Description - -OpenSRF::Transport::SlimJabber::Client - -Home-brewed slimmed down jabber connection agent. Supports SSL connections -with a config file options: - - transport->server->sslport # the ssl port - transport->server->ssl # is this ssl? - -=cut - -my $logger = "OpenSRF::Utils::Logger"; +use FreezeThaw qw/freeze/; sub DESTROY{ - my $self = shift; - $self->disconnect; + shift()->disconnect; } -sub disconnect{ - my $self = shift; - my $socket = $self->{_socket}; - if( $socket and $socket->connected() ) { - print $socket ""; - close( $socket ); - } -} - - -=head2 new() - -Creates a new Client object. - -debug and log_file are not required if you don't care to log the activity, -however all other parameters are. - -%params: - - username - resource - password - debug - log_file - -=cut - sub new { - my( $class, %params ) = @_; - - $class = ref( $class ) || $class; - - my $port = $params{'port'} || return undef; - my $username = $params{'username'} || return undef; - my $resource = $params{'resource'} || return undef; - my $password = $params{'password'} || return undef; - my $host = $params{'host'} || return undef; - - my $jid = "$username\@$host\/$resource"; - - my $self = bless {} => $class; - - $self->jid( $jid ); - $self->host( $host ); - $self->port( $port ); - $self->username( $username ); - $self->resource( $resource ); - $self->password( $password ); - $self->{temp_buffer} = ""; - - $logger->transport( "Creating Client instance: $host:$port, $username, $resource", - $logger->INFO ); - + my $self = bless({}, ref($class) || $class); + $self->params(\%params); return $self; } -# clears the tmp buffer as well as the TCP buffer -sub buffer_reset { - - my $self = shift; - $self->{temp_buffer} = ""; - my $fh = $self->{_socket}; - set_nonblock( $fh ); - my $t_buf = ""; - while( sysread( $fh, $t_buf, 4096 ) ) {} - set_block( $fh ); +sub reader { + my($self, $reader) = @_; + $self->{reader} = $reader if $reader; + return $self->{reader}; } -# ------------------------------------------------- - -=head2 gather() - -Gathers all Jabber messages sitting in the collection queue -and hands them each to their respective callbacks. This call -does not block (calls Process(0)) -=cut - -sub gather { my $self = shift; $self->process( 0 ); } - -# ------------------------------------------------- - -=head2 listen() - -Blocks and gathers incoming messages as they arrive. Does not return -unless an error occurs. - -Throws an OpenSRF::EX::JabberException if the call to Process ever fails. - -=cut -sub listen { - my $self = shift; - - my $sock = $self->unix_sock(); - my $socket = IO::Socket::UNIX->new( Peer => $sock ); - $logger->transport( "Unix Socket opened by Listener", INTERNAL ); - - throw OpenSRF::EX::Socket( "Unable to connect to UnixServer: socket-file: $sock \n :=> $! " ) - unless ($socket->connected); - - while(1) { - my $o = $self->process( -1 ); - $logger->transport( "Call to process() in listener returned:\n $o", INTERNAL ); - if( ! defined( $o ) ) { - throw OpenSRF::EX::Jabber( "Listen Loop failed at 'process()'" ); - } - print $socket $o; - - } - throw OpenSRF::EX::Socket( "How did we get here?!?!" ); +sub params { + my($self, $params) = @_; + $self->{params} = $params if $params; + return $self->{params}; } -sub set_nonblock { - my $fh = shift; - my $flags = fcntl($fh, F_GETFL, 0) - or die "Can't get flags for the socket: $!\n"; - - $logger->transport( "Setting NONBLOCK: original flags: $flags", INTERNAL ); - - fcntl($fh, F_SETFL, $flags | O_NONBLOCK) - or die "Can't set flags for the socket: $!\n"; - - return $flags; +sub socket { + my($self, $socket) = @_; + $self->{socket} = $socket if $socket; + return $self->{socket}; } -sub reset_fl { - my $fh = shift; - my $flags = shift; - $logger->transport( "Restoring BLOCK: to flags $flags", INTERNAL ); - fcntl($fh, F_SETFL, $flags) if defined $flags; +sub disconnect { + my $self = shift; + $self->reader->disconnect if $self->reader; } -sub set_block { - my $fh = shift; - - my $flags = fcntl($fh, F_GETFL, 0) - or die "Can't get flags for the socket: $!\n"; - $flags &= ~O_NONBLOCK; - - fcntl($fh, F_SETFL, $flags) - or die "Can't set flags for the socket: $!\n"; +sub gather { + my $self = shift; + $self->process( 0 ); } - -sub timed_read { - my ($self, $timeout) = @_; - $timeout = defined($timeout) ? int($timeout) : undef; - - $logger->transport( "Temp Buffer Contained: \n". $self->{temp_buffer}, INTERNAL) if $self->{temp_buffer}; - if( $self->can( "app" ) ) { - $logger->transport( "timed_read called for ".$self->app.", I am: ".$self->jid, INTERNAL ); - } - - # See if there is a complete message in the temp_buffer - # that we can return - if( $self->{temp_buffer} ) { - my $buffer = $self->{temp_buffer}; - my $complete = 0; - $self->{temp_buffer} = ''; - - my ($tag) = ($buffer =~ /<([^\s\?\>]+)/o); - $self->{last_tag} = $tag; - $logger->transport("Using tag: $tag ", INTERNAL); - - if ( $buffer =~ /^(.*?<\/$tag>){1}(.*)/s) { - $buffer = $1; - $self->{temp_buffer} = $2; - $complete++; - $logger->transport( "completed read with $buffer", INTERNAL ); - } elsif ( $buffer =~ /^<$tag[^>]*?\/>(.*)/) { - $self->{temp_buffer} = $1; - $complete++; - $logger->transport( "completed read with $buffer", INTERNAL ); - } else { - $self->{temp_buffer} = $buffer; - } - - if( $buffer and $complete ) { - return $buffer; - } - - } - ############ - - my $fh = $self->{_socket}; - - unless( $fh and $fh->connected ) { - throw OpenSRF::EX::Socket ("Attempted read on closed socket", ERROR ); - } - - $logger->transport( "Temp Buffer After first attempt: \n ".$self->{temp_buffer}, INTERNAL) if $self->{temp_buffer}; - - my $flags; - if (defined($timeout) && !$timeout) { - $flags = set_nonblock( $fh ); - } - - $timeout ||= 0; - $logger->transport( "Calling timed_read with timetout $timeout", INTERNAL ); - - - my $complete = 0; - my $first_read = 1; - my $xml = ''; - eval { - my $tag = ''; - eval { - no warnings; - local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required - - # alarm needs a number greater => 1. - my $alarm_timeout = $timeout; - if( $alarm_timeout > 0 and $alarm_timeout < 1 ) { - $alarm_timeout = 1; - } - alarm $alarm_timeout; - do { - - my $buffer = $self->{temp_buffer}; - $self->{temp_buffer} = ''; - ##### - - # This code is no longer in use - #my $ff = fcntl($fh, F_GETFL, 0); - #if ($ff == ($ff | O_NONBLOCK) and $timeout > 0 ) { - #throw OpenSRF::EX::ERROR ("File flags are set to NONBLOCK but timeout is $timeout", ERROR ); - #} - - my $t_buf = ""; - my $read_size = 1024; my $f = 0; - while( my $n = sysread( $fh, $t_buf, $read_size ) ) { - - unless( $fh->connected ) { - OpenSRF::EX::JabberDisconnected->throw( - "Lost jabber client in timed_read()"); - } - - # XXX Change me to debug/internal at some point, this is for testing... - # XXX Found a race condition where reading >= $read_size bytes of data - # will fail if the log line below is removed. - $logger->info("timed_read() read $n bytes of data"); - - - $buffer .= $t_buf; - if( $n < $read_size ) { - #reset_fl( $fh, $f ) if $f; - set_block( $fh ); - last; - } - # see if there is any more data to grab... - $f = set_nonblock( $fh ); - } - - #sysread($fh, $buffer, 2048, length($buffer) ); - #sysread( $fh, $t_buf, 2048 ); - #$buffer .= $t_buf; - - ##### - $logger->transport(" Got [$buffer] from the socket", INTERNAL); - - if ($first_read) { - $logger->transport(" First read Buffer\n [$buffer]", INTERNAL); - ($tag) = ($buffer =~ /<([^\s\?\>\/]+){1}/o); - $self->{last_tag} = $tag; - $first_read--; - $logger->transport("Using tag: $tag ", INTERNAL); - } - - if (!$first_read && $buffer =~ /^(.*?<\/$tag>){1}(.*)/s) { - $buffer = $1; - $self->{temp_buffer} = $2; - $complete++; - $logger->transport( "completed read with $buffer", INTERNAL ); - } elsif (!$first_read && $buffer =~ /^<$tag[^>]*?\/>(.*)/) { - $self->{temp_buffer} = $1; - $complete++; - $logger->transport( "completed read with $buffer", INTERNAL ); - } - - $xml .= $buffer; - - } while (!$complete && $xml); - alarm(0); - }; - alarm(0); - }; - - $logger->transport( "XML Read: $xml", INTERNAL ); - #reset_fl( $fh, $flags) if defined $flags; - set_block( $fh ) if defined $flags; - - if ($complete) { - return $xml; - } - if( $@ ) { - return undef; - } - return ""; -} - - # ------------------------------------------------- sub tcp_connected { - my $self = shift; - return 1 if ($self->{_socket} and $self->{_socket}->connected); - return 0; + return $self->reader->tcp_connected if $self->reader; + return 0; } -sub password { - my( $self, $password ) = @_; - $self->{'oils:password'} = $password if $password; - return $self->{'oils:password'}; -} -# ------------------------------------------------- - -sub username { - my( $self, $username ) = @_; - $self->{'oils:username'} = $username if $username; - return $self->{'oils:username'}; -} - -# ------------------------------------------------- - -sub resource { - my( $self, $resource ) = @_; - $self->{'oils:resource'} = $resource if $resource; - return $self->{'oils:resource'}; -} - -# ------------------------------------------------- - -sub jid { - my( $self, $jid ) = @_; - $self->{'oils:jid'} = $jid if $jid; - return $self->{'oils:jid'}; -} - -sub port { - my( $self, $port ) = @_; - $self->{'oils:port'} = $port if $port; - return $self->{'oils:port'}; -} - -sub host { - my( $self, $host ) = @_; - $self->{'oils:host'} = $host if $host; - return $self->{'oils:host'}; -} - -# ------------------------------------------------- - -=head2 send() - - Sends a Jabber message. - - %params: - to - The JID of the recipient - thread - The Jabber thread - body - The body of the message - -=cut sub send { my $self = shift; - my %params = @_; - - my $to = $params{'to'} || return undef; - my $body = $params{'body'} || return undef; - my $thread = $params{'thread'} || ""; - my $router_command = $params{'router_command'} || ""; - my $router_class = $params{'router_class'} || ""; - - my $msg = OpenSRF::Transport::SlimJabber::MessageWrapper->new; - - $msg->setTo( $to ); - $msg->setThread( $thread ) if $thread; - $msg->setBody( $body ); - $msg->set_router_command( $router_command ); - $msg->set_router_class( $router_class ); - $msg->set_osrf_xid($logger->get_osrf_xid); - - $logger->transport( - "JabberClient Sending message to $to with thread $thread and body: \n$body", INTERNAL ); - - my $soc = $self->{_socket}; - unless( $soc and $soc->connected ) { - throw OpenSRF::EX::Jabber ("No longer connected to jabber server"); - } - print $soc $msg->toString; - - $logger->transport( - "JabberClient Sent message to $to with thread $thread and body: \n$body", INTERNAL ); + my $msg = OpenSRF::Transport::SlimJabber::XMPPMessage->new(@_); + $self->reader->send($msg->to_xml); } - -=head2 inintialize() - -Connect to the server and log in. - -Throws an OpenSRF::EX::JabberException if we cannot connect -to the server or if the authentication fails. - -=cut - -# --- The logging lines have been commented out until we decide -# on which log files we're using. - sub initialize { my $self = shift; - my $jid = $self->jid; - my $host = $self->host; - my $port = $self->port; - my $username = $self->username; - my $resource = $self->resource; - my $password = $self->password; + my $host = $self->params->{host}; + my $port = $self->params->{port}; + my $username = $self->params->{username}; + my $resource = $self->params->{resource}; + my $password = $self->params->{password}; - my $stream = <<" XML"; - - XML + my $jid = "$username\@$host/$resource"; my $conf = OpenSRF::Utils::Config->current; - my $tail = "_$$"; - if(!$conf->bootstrap->router_name && $username eq "router") { - $tail = ""; - } - - my $auth = <<" XML"; - - -$username -$password -${resource}$tail - - - XML - - my $sock_type = 'IO::Socket::INET'; - - # if port is a string, then we're connecting to a UNIX socket - unless( $port =~ /^\d+$/ ) { - $sock_type = 'IO::Socket::UNIX'; - } - - # --- 5 tries to connect to the jabber server - my $socket; - for(1..5) { - $socket = $sock_type->new( PeerHost => $host, - PeerPort => $port, - Peer => $port, - Proto => 'tcp' ); - $logger->debug( "$jid: $_ connect attempt to $host:$port"); - last if ( $socket and $socket->connected ); - $logger->warn( "$jid: Failed to connect to server...$host:$port (Try # $_)"); - sleep 3; - } - - unless ( $socket and $socket->connected ) { - throw OpenSRF::EX::Jabber( " Could not connect to Jabber server: $!" ); - } - $logger->transport( "Logging into jabber as $jid " . - "from " . ref( $self ), DEBUG ); - - print $socket $stream; - - my $buffer; - eval { - eval { - local $SIG{ALRM} = sub { die "alarm\n" }; # NB: \n required - alarm 3; - sysread($socket, $buffer, 4096); - $logger->transport( "Login buffer 1: $buffer", INTERNAL ); - alarm(0); - }; - alarm(0); - }; - - print $socket $auth; - - if( $socket and $socket->connected() ) { - $self->{_socket} = $socket; - } else { - throw OpenSRF::EX::Jabber( " ** Unable to connect to Jabber server", ERROR ); - } + my $tail = "_$$"; + $tail = "" if !$conf->bootstrap->router_name and $username eq "router"; + $resource = "$resource$tail"; + my $socket = IO::Socket::INET->new( + PeerHost => $host, + PeerPort => $port, + Peer => $port, + Proto => 'tcp' ); - $buffer = $self->timed_read(10); + throw OpenSRF::EX::Jabber("Could not open TCP socket to Jabber server: $!") + unless ( $socket and $socket->connected ); - if( $buffer ) {$logger->transport( "Login buffer 2: $buffer", INTERNAL );} + $self->socket($socket); + $self->reader(OpenSRF::Transport::SlimJabber::XMPPReader->new($socket)); + $self->reader->connect($host, $username, $password, $resource); - if( $buffer and $buffer =~ /type=["\']result["\']/ ) { - $logger->transport( " * $jid: Jabber authenticated and connected", DEBUG ); - } else { - if( !$buffer ) { $buffer = " "; } - $socket->close; - throw OpenSRF::EX::Jabber( " * $jid: Unable to authenticate: $buffer", ERROR ); - } + throw OpenSRF::EX::Jabber("Could not authenticate with Jabber server: $!") + unless ( $self->reader->connected ); return $self; } + sub construct { my( $class, $app ) = @_; - $logger->transport("Constructing new Jabber connection for $app, my class $class", INTERNAL ); - $class->peer_handle( - $class->new( $app )->initialize() ); + $class->peer_handle($class->new( $app )->initialize()); } -sub process { - my( $self, $timeout ) = @_; +sub process { + my($self, $timeout) = @_; $timeout ||= 0; $timeout = int($timeout); - undef $timeout if ( $timeout < 0 ); - - unless( $self->{_socket}->connected ) { - OpenSRF::EX::JabberDisconnected->throw( - "This JabberClient instance is no longer connected to the server " . - $self->username . " : " . $self->resource, ERROR ); - } - - my $val = $self->timed_read( $timeout ); - - $timeout = "FOREVER" unless ( defined $timeout ); - - if ( ! defined( $val ) ) { - OpenSRF::EX::Jabber->throw( - "Call to Client->timed_read( $timeout ) failed", ERROR ); - } elsif ( ! $val ) { - $logger->transport( - "Call to Client->timed_read( $timeout ) returned 0 bytes of data", INTERNAL ); - } elsif ( $val ) { - $logger->transport( - "Call to Client->timed_read( $timeout ) successfully returned data", INTERNAL ); - } - my $t = $self->{last_tag}; - - if( $t and $val ) { - my @msgs = $val =~ /(<$t[^>]*>.*?<\/$t>)/g; - $val = shift(@msgs); - - if (@msgs) { - my $tmp = $self->{temp_buffer}; - - $self->{temp_buffer} = ''; - $self->{temp_buffer} .= $_ for (@msgs); - $self->{temp_buffer} .= $tmp; - } + unless( $self->reader and $self->reader->connected ) { + throw OpenSRF::EX::JabberDisconnected + ("This JabberClient instance is no longer connected to the server "); } - return $val; + return $self->reader->wait_msg($timeout); } @@ -599,34 +130,10 @@ sub process { # Returns 1 on success, 0 if the socket isn't connected # -------------------------------------------------------------- sub flush_socket { - my $self = shift; - my $socket = $self->{_socket}; - - if( $socket ) { - - my $buf; - my $flags = fcntl($socket, F_GETFL, 0); - - fcntl($socket, F_SETFL, $flags | O_NONBLOCK); - while( my $n = sysread( $socket, $buf, 8192 ) ) { - $logger->debug("flush_socket dropped $n bytes of data"); - if(!$socket->connected()) { - $logger->error("flush_socket dropped data on disconnected socket: $buf"); - } - } - fcntl($socket, F_SETFL, $flags); - - return 0 unless $socket->connected(); - - return 1; - - } else { - - return 0; - } + return $self->reader->flush_socket; } +1; -1; diff --git a/src/perlmods/OpenSRF/Transport/SlimJabber/Inbound.pm b/src/perlmods/OpenSRF/Transport/SlimJabber/Inbound.pm index 90436fe..9194927 100644 --- a/src/perlmods/OpenSRF/Transport/SlimJabber/Inbound.pm +++ b/src/perlmods/OpenSRF/Transport/SlimJabber/Inbound.pm @@ -6,6 +6,7 @@ use OpenSRF::Utils::Logger qw(:level); use OpenSRF::Utils::SettingsClient; use OpenSRF::Utils::Config; use Time::HiRes qw/usleep/; +use FreezeThaw qw/freeze/; my $logger = "OpenSRF::Utils::Logger"; @@ -20,9 +21,6 @@ This service should be loaded at system startup. =cut -# XXX This will be overhauled to connect as a component instead of as -# a user. all in good time, though. - { my $unix_sock; sub unix_sock { return $unix_sock; } @@ -98,7 +96,6 @@ sub listen { $logger->info("loading router info $routers"); for my $router (@$routers) { - if(ref $router) { if( !$router->{services} || grep { $_ eq $self->{app} } @{$router->{services}->{service}} ) { my $name = $router->{name}; @@ -134,9 +131,7 @@ sub listen { $logger->debug("Inbound listener calling process()"); try { - $o = $self->process( -1 ); - - $logger->debug("Inbound listener received ".length($o)." bytes of data"); + $o = $self->process(-1); if(!$o){ $logger->error( @@ -158,7 +153,7 @@ sub listen { throw OpenSRF::EX::Socket( "Unable to connect to UnixServer: socket-file: $sock \n :=> $! " ) unless ($socket->connected); - print $socket $o; + print $socket freeze($o); $socket->close; } } diff --git a/src/perlmods/OpenSRF/Transport/SlimJabber/MessageWrapper.pm b/src/perlmods/OpenSRF/Transport/SlimJabber/MessageWrapper.pm index 6375f6d..0fa95c5 100644 --- a/src/perlmods/OpenSRF/Transport/SlimJabber/MessageWrapper.pm +++ b/src/perlmods/OpenSRF/Transport/SlimJabber/MessageWrapper.pm @@ -1,125 +1,72 @@ package OpenSRF::Transport::SlimJabber::MessageWrapper; -use XML::LibXML; -use OpenSRF::EX qw/:try/; -use OpenSRF::Utils::Logger qw/$logger/; +use strict; use warnings; +use OpenSRF::Transport::SlimJabber::XMPPMessage; + +# ---------------------------------------------------------- +# Legacy wrapper for XMPPMessage +# ---------------------------------------------------------- sub new { my $class = shift; - $class = ref($class) || $class; - - my $xml = shift; - - my ($doc, $msg); - if ($xml) { - my $err; - - try { - $doc = XML::LibXML->new->parse_string($xml); - } catch Error with { - $err = shift; - warn "MessageWrapper received bad XML : error = $err\nXML = $xml\n"; - $logger->error("MessageWrapper received bad XML : error = $err : XML = $xml"); - }; - throw $err if $err; - - $msg = $doc->documentElement; - } else { - $doc = XML::LibXML::Document->createDocument; - $msg = $doc->createElement( 'message' ); - $doc->setDocumentElement( $msg ); - } - - - my $self = { msg_node => $msg }; - - return bless $self => $class; + my $msg = shift; + return bless({msg => $msg}, ref($class) || $class); +} + +sub msg { + my($self, $msg) = @_; + $self->{msg} = $msg if $msg; + return $self->{msg}; } sub toString { - my $self = shift; - if( $self->{msg_node} ) { - return $self->{msg_node}->toString(@_); - } - return ""; + return $_[0]->msg->to_xml; } sub get_body { - my $self = shift; - my ($t_body) = grep {$_->nodeName eq 'body'} $self->{msg_node}->childNodes; - if( $t_body ) { - my $body = $t_body->textContent; - return $body; - } - return ""; + return $_[0]->msg->body; } sub get_sess_id { - my $self = shift; - my ($t_node) = grep {$_->nodeName eq 'thread'} $self->{msg_node}->childNodes; - if( $t_node ) { - return $t_node->textContent; - } - return ""; + return $_[0]->msg->thread; } sub get_msg_type { - my $self = shift; - $self->{msg_node}->getAttribute( 'type' ); + return $_[0]->msg->type; } sub get_remote_id { - my $self = shift; - - # - my $rid = $self->{msg_node}->getAttribute( 'router_from' ); - return $rid if $rid; - - return $self->{msg_node}->getAttribute( 'from' ); + return $_[0]->msg->from; } sub setType { - my $self = shift; - $self->{msg_node}->setAttribute( type => shift ); + $_[0]->msg->type(shift()); } sub setTo { - my $self = shift; - $self->{msg_node}->setAttribute( to => shift ); + $_[0]->msg->to(shift()); } sub setThread { - my $self = shift; - $self->{msg_node}->appendTextChild( thread => shift ); + $_[0]->msg->thread(shift()); } sub setBody { - my $self = shift; - my $body = shift; - $self->{msg_node}->appendTextChild( body => $body ); + $_[0]->msg->body(shift()); } sub set_router_command { - my( $self, $router_command ) = @_; - if( $router_command ) { - $self->{msg_node}->setAttribute( router_command => $router_command ); - } + $_[0]->msg->router_command(shift()); } sub set_router_class { - my( $self, $router_class ) = @_; - if( $router_class ) { - $self->{msg_node}->setAttribute( router_class => $router_class ); - } + $_[0]->msg->router_class(shift()); } sub set_osrf_xid { - my( $self, $xid ) = @_; - $self->{msg_node}->setAttribute( osrf_xid => $xid ); + $_[0]->msg->osrf_xid(shift()); } - sub get_osrf_xid { - my $self = shift; - $self->{msg_node}->getAttribute('osrf_xid'); + return $_[0]->msg->osrf_xid; } 1; diff --git a/src/perlmods/OpenSRF/Transport/SlimJabber/PeerConnection.pm b/src/perlmods/OpenSRF/Transport/SlimJabber/PeerConnection.pm index 5c5b959..7c59456 100644 --- a/src/perlmods/OpenSRF/Transport/SlimJabber/PeerConnection.pm +++ b/src/perlmods/OpenSRF/Transport/SlimJabber/PeerConnection.pm @@ -29,15 +29,9 @@ our $_singleton_connection; sub retrieve { my( $class, $app ) = @_; return $_singleton_connection; -# my @keys = keys %apps_hash; -# OpenSRF::Utils::Logger->transport( -# "Requesting peer for $app and we have @keys", INFO ); -# return $apps_hash{$app}; } - -# !! In here we use the bootstrap config .... sub new { my( $class, $app ) = @_; @@ -63,12 +57,6 @@ sub new { if( $app eq "client" ) { $resource = "client_at_$h"; } -# unless ( $conf->bootstrap->router_name ) { -# $username = 'router'; -# $resource = $app; -# } - - OpenSRF::EX::Config->throw( "JPeer could not load all necesarry values from config" ) unless ( $username and $password and $resource and $host and $port ); @@ -94,26 +82,16 @@ sub new { } sub process { - my $self = shift; my $val = $self->SUPER::process(@_); return 0 unless $val; - - OpenSRF::Utils::Logger->transport( "Calling transport handler for ".$self->app." with: $val", INTERNAL ); - my $t; - $t = OpenSRF::Transport->handler($self->app, $val); - - return $t; + return OpenSRF::Transport->handler($self->app, $val); } sub app { my $self = shift; my $app = shift; - if( $app ) { - OpenSRF::Utils::Logger->transport( "PEER changing app to $app: ".$self->jid, INTERNAL ); - } - - $self->{app} = $app if ($app); + $self->{app} = $app if $app; return $self->{app}; } diff --git a/src/perlmods/OpenSRF/Transport/SlimJabber/XMPPMessage.pm b/src/perlmods/OpenSRF/Transport/SlimJabber/XMPPMessage.pm new file mode 100644 index 0000000..9bd5328 --- /dev/null +++ b/src/perlmods/OpenSRF/Transport/SlimJabber/XMPPMessage.pm @@ -0,0 +1,134 @@ +package OpenSRF::Transport::SlimJabber::XMPPMessage; +use strict; use warnings; +use OpenSRF::Utils::Logger qw/$logger/; +use OpenSRF::EX qw/:try/; +use strict; use warnings; +use XML::LibXML; + +use constant JABBER_MESSAGE => + "". + "%s%s"; + +sub new { + my $class = shift; + my %args = @_; + my $self = bless({}, $class); + + if($args{xml}) { + $self->parse_xml($args{xml}); + + } else { + $self->{to} = $args{to} || ''; + $self->{from} = $args{from} || ''; + $self->{thread} = $args{thread} || ''; + $self->{body} = $args{body} || ''; + $self->{osrf_xid} = $args{osrf_xid} || ''; + $self->{router_command} = $args{router_command} || ''; + $self->{router_class} = $args{router_class} || ''; + } + + return $self; +} + +sub to { + my($self, $to) = @_; + $self->{to} = $to if defined $to; + return $self->{to}; +} +sub from { + my($self, $from) = @_; + $self->{from} = $from if defined $from; + return $self->{from}; +} +sub thread { + my($self, $thread) = @_; + $self->{thread} = $thread if defined $thread; + return $self->{thread}; +} +sub body { + my($self, $body) = @_; + $self->{body} = $body if defined $body; + return $self->{body}; +} +sub status { + my($self, $status) = @_; + $self->{status} = $status if defined $status; + return $self->{status}; +} +sub type { + my($self, $type) = @_; + $self->{type} = $type if defined $type; + return $self->{type}; +} +sub err_type { + my($self, $err_type) = @_; + $self->{err_type} = $err_type if defined $err_type; + return $self->{err_type}; +} +sub err_code { + my($self, $err_code) = @_; + $self->{err_code} = $err_code if defined $err_code; + return $self->{err_code}; +} +sub osrf_xid { + my($self, $osrf_xid) = @_; + $self->{osrf_xid} = $osrf_xid if defined $osrf_xid; + return $self->{osrf_xid}; +} +sub router_command { + my($self, $router_command) = @_; + $self->{router_command} = $router_command if defined $router_command; + return $self->{router_command}; +} +sub router_class { + my($self, $router_class) = @_; + $self->{router_class} = $router_class if defined $router_class; + return $self->{router_class}; +} + + +sub to_xml { + my $self = shift; + + my $body = $self->{body}; + $body =~ s/&/&/sog; + $body =~ s//>/sog; + + return sprintf( + JABBER_MESSAGE, + $self->{to}, + $self->{from}, + $self->{router_command}, + $self->{router_class}, + $self->{osrf_xid}, + $self->{thread}, + $body + ); +} + +sub parse_xml { + my($self, $xml) = @_; + my($doc, $err); + + try { + $doc = XML::LibXML->new->parse_string($xml); + } catch Error with { + my $err = shift; + $logger->error("Error parsing message xml: $xml --- $err"); + }; + throw $err if $err; + + my $root = $doc->documentElement; + + $self->{body} = $root->findnodes('/message/body').''; + $self->{thread} = $root->findnodes('/message/thread').''; + $self->{from} = $root->getAttribute('router_from'); + $self->{from} = $root->getAttribute('from') unless $self->{from}; + $self->{to} = $root->getAttribute('to'); + $self->{type} = $root->getAttribute('type'); + $self->{osrf_xid} = $root->getAttribute('osrf_xid'); +} + + +1; diff --git a/src/perlmods/OpenSRF/Transport/SlimJabber/XMPPReader.pm b/src/perlmods/OpenSRF/Transport/SlimJabber/XMPPReader.pm new file mode 100644 index 0000000..b0705ab --- /dev/null +++ b/src/perlmods/OpenSRF/Transport/SlimJabber/XMPPReader.pm @@ -0,0 +1,350 @@ +package OpenSRF::Transport::SlimJabber::XMPPReader; +use strict; use warnings; +use XML::Parser; +use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK); +use Time::HiRes qw/time/; +use OpenSRF::Transport::SlimJabber::XMPPMessage; +use OpenSRF::Utils::Logger qw/$logger/; + +# ----------------------------------------------------------- +# Connect, disconnect, and authentication messsage templates +# ----------------------------------------------------------- +use constant JABBER_CONNECT => + ""; + +use constant JABBER_BASIC_AUTH => + "" . + "%s%s%s"; + +use constant JABBER_DISCONNECT => ""; + + +# ----------------------------------------------------------- +# XMPP Stream states +# ----------------------------------------------------------- +use constant DISCONNECTED => 1; +use constant CONNECT_RECV => 2; +use constant CONNECTED => 3; + + +# ----------------------------------------------------------- +# XMPP Message states +# ----------------------------------------------------------- +use constant IN_NOTHING => 1; +use constant IN_BODY => 2; +use constant IN_THREAD => 3; +use constant IN_STATUS => 4; + + +# ----------------------------------------------------------- +# Constructor, getter/setters +# ----------------------------------------------------------- +sub new { + my $class = shift; + my $socket = shift; + + my $self = bless({}, $class); + + $self->{queue} = []; + $self->{stream_state} = DISCONNECTED; + $self->{xml_state} = IN_NOTHING; + $self->socket($socket); + + my $p = new XML::Parser(Handlers => { + Start => \&start_element, + End => \&end_element, + Char => \&characters, + }); + + $self->parser($p->parse_start); # create a push parser + $self->parser->{_parent_} = $self; + $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new; + return $self; +} + +sub push_msg { + my($self, $msg) = @_; + push(@{$self->{queue}}, $msg) if $msg; +} + +sub next_msg { + my $self = shift; + return shift @{$self->{queue}}; +} + +sub peek_msg { + my $self = shift; + return (@{$self->{queue}} > 0); +} + +sub parser { + my($self, $parser) = @_; + $self->{parser} = $parser if $parser; + return $self->{parser}; +} + +sub socket { + my($self, $socket) = @_; + $self->{socket} = $socket if $socket; + return $self->{socket}; +} + +sub stream_state { + my($self, $stream_state) = @_; + $self->{stream_state} = $stream_state if $stream_state; + return $self->{stream_state}; +} + +sub xml_state { + my($self, $xml_state) = @_; + $self->{xml_state} = $xml_state if $xml_state; + return $self->{xml_state}; +} + +sub message { + my($self, $message) = @_; + $self->{message} = $message if $message; + return $self->{message}; +} + + +# ----------------------------------------------------------- +# Stream and connection handling methods +# ----------------------------------------------------------- + +sub connect { + my($self, $domain, $username, $password, $resource) = @_; + + $self->send(sprintf(JABBER_CONNECT, $domain)); + $self->wait(10); + + unless($self->{stream_state} == CONNECT_RECV) { + $logger->error("No initial XMPP response from server"); + return 0; + } + + $self->send(sprintf(JABBER_BASIC_AUTH, $username, $password, $resource)); + $self->wait(10); + + unless($self->connected) { + $logger->error('XMPP connect failed'); + return 0; + } + + return 1; +} + +sub disconnect { + my $self = shift; + $self->send(JABBER_DISCONNECT); + shutdown($self->socket, 2); + close($self->socket); +} + +# ----------------------------------------------------------- +# returns true if this stream is connected to the server +# ----------------------------------------------------------- +sub connected { + my $self = shift; + return ($self->tcp_connected and $self->{stream_state} == CONNECTED); +} + +# ----------------------------------------------------------- +# returns true if the socket is connected +# ----------------------------------------------------------- +sub tcp_connected { + my $self = shift; + return ($self->socket and $self->socket->connected); +} + +# ----------------------------------------------------------- +# sends pre-formated XML +# ----------------------------------------------------------- +sub send { + my($self, $xml) = @_; + $self->{socket}->print($xml); +} + +# ----------------------------------------------------------- +# Puts a file handle into blocking mode +# ----------------------------------------------------------- +sub set_block { + my $fh = shift; + my $flags = fcntl($fh, F_GETFL, 0); + $flags &= ~O_NONBLOCK; + fcntl($fh, F_SETFL, $flags); +} + + +# ----------------------------------------------------------- +# Puts a file handle into non-blocking mode +# ----------------------------------------------------------- +sub set_nonblock { + my $fh = shift; + my $flags = fcntl($fh, F_GETFL, 0); + fcntl($fh, F_SETFL, $flags | O_NONBLOCK); +} + + +sub wait { + my($self, $timeout) = @_; + + return $self->next_msg if $self->peek_msg; + + $timeout ||= 0; + $timeout = undef if $timeout < 0; + my $socket = $self->{socket}; + + set_block($socket); + + # build the select readset + my $infile = ''; + vec($infile, $socket->fileno, 1) = 1; + return undef unless select($infile, undef, undef, $timeout); + + # now slurp the data off the socket + my $buf; + my $read_size = 1024; + while(my $n = sysread($socket, $buf, $read_size)) { + $self->{parser}->parse_more($buf) if $buf; + if($n < $read_size or $self->peek_msg) { + set_block($socket); + last; + } + set_nonblock($socket); + } + + return $self->next_msg; +} + +# ----------------------------------------------------------- +# Waits up to timeout seconds for a fully-formed XMPP +# message to arrive. If timeout is < 0, waits indefinitely +# ----------------------------------------------------------- +sub wait_msg { + my($self, $timeout) = @_; + my $xml; + + $timeout = 0 unless defined $timeout; + + if($timeout < 0) { + while(1) { + return $xml if $xml = $self->wait($timeout); + } + + } else { + while($timeout >= 0) { + my $start = time; + return $xml if $xml = $self->wait($timeout); + $timeout -= time - $start; + } + } + + return undef; +} + + +# ----------------------------------------------------------- +# SAX Handlers +# ----------------------------------------------------------- + + +sub start_element { + my($parser, $name, %attrs) = @_; + my $self = $parser->{_parent_}; + + if($name eq 'message') { + + my $msg = $self->{message}; + $msg->{to} = $attrs{'to'}; + $msg->{from} = $attrs{router_from} if $attrs{router_from}; + $msg->{from} = $attrs{from} unless $msg->{from}; + $msg->{osrf_xid} = $attrs{'osrf_xid'}; + $msg->{type} = $attrs{type}; + + } elsif($name eq 'body') { + $self->{xml_state} = IN_BODY; + + } elsif($name eq 'thread') { + $self->{xml_state} = IN_THREAD; + + } elsif($name eq 'stream:stream') { + $self->{stream_state} = CONNECT_RECV; + + } elsif($name eq 'iq') { + if($attrs{type} and $attrs{type} eq 'result') { + $self->{stream_state} = CONNECTED; + } + + } elsif($name eq 'status') { + $self->{xml_state } = IN_STATUS; + + } elsif($name eq 'stream:error') { + $self->{stream_state} = DISCONNECTED; + + } elsif($name eq 'error') { + $self->{message}->{err_type} = $attrs{'type'}; + $self->{message}->{err_code} = $attrs{'code'}; + $self->{stream_state} = DISCONNECTED; + } +} + +sub characters { + my($parser, $chars) = @_; + my $self = $parser->{_parent_}; + my $state = $self->{xml_state}; + + if($state == IN_BODY) { + $self->{message}->{body} .= $chars; + + } elsif($state == IN_THREAD) { + $self->{message}->{thread} .= $chars; + + } elsif($state == IN_STATUS) { + $self->{message}->{status} .= $chars; + } +} + +sub end_element { + my($parser, $name) = @_; + my $self = $parser->{_parent_}; + $self->{xml_state} = IN_NOTHING; + + if($name eq 'message') { + $self->push_msg($self->{message}); + $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new; + + } elsif($name eq 'stream:stream') { + $self->{stream_state} = DISCONNECTED; + } +} + +sub flush_socket { + my $self = shift; + my $socket = $self->socket; + return 0 unless $socket and $socket->connected; + + my $flags = fcntl($socket, F_GETFL, 0); + fcntl($socket, F_SETFL, $flags | O_NONBLOCK); + + while( my $n = sysread( $socket, my $buf, 8192 ) ) { + $logger->debug("flush_socket dropped $n bytes of data"); + $logger->error("flush_socket dropped data on disconnected socket: $buf") + unless($socket->connected); + } + + fcntl($socket, F_SETFL, $flags); + return 0 unless $socket->connected; + return 1; +} + + + + + +1; + + + + + diff --git a/src/perlmods/OpenSRF/UnixServer.pm b/src/perlmods/OpenSRF/UnixServer.pm index 2792db5..c4b48c8 100644 --- a/src/perlmods/OpenSRF/UnixServer.pm +++ b/src/perlmods/OpenSRF/UnixServer.pm @@ -14,6 +14,7 @@ use OpenSRF::Utils::JSON; use vars qw/@ISA $app/; use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK); use Carp; +use FreezeThaw qw/thaw/; use IO::Socket::INET; use IO::Socket::UNIX; @@ -97,6 +98,7 @@ sub process_request { exit; } + ($data) = thaw($data); my $app_session = OpenSRF::Transport->handler( $self->app(), $data ); if(!ref($app_session)) { -- 2.11.0