From 62ba7aa1012dd0d82e40b4e901c980c35f4cbc8a Mon Sep 17 00:00:00 2001 From: Bill Erickson Date: Fri, 12 May 2023 12:40:16 -0400 Subject: [PATCH] LP2017941 Remove more Jabber refs / libs Among other thibngs, makes troubleshooting a bit easier. Signed-off-by: Bill Erickson --- src/perl/MANIFEST | 10 +- src/perl/lib/OpenSRF/EX.pm | 17 +- src/perl/lib/OpenSRF/Server.pm | 4 +- src/perl/lib/OpenSRF/Transport.pm | 6 +- src/perl/lib/OpenSRF/Transport/PeerHandle.pm | 2 +- src/perl/lib/OpenSRF/Transport/SlimJabber.pm | 14 - .../lib/OpenSRF/Transport/SlimJabber/Client.pm | 224 ------------ .../OpenSRF/Transport/SlimJabber/MessageWrapper.pm | 72 ---- .../OpenSRF/Transport/SlimJabber/PeerConnection.pm | 103 ------ .../OpenSRF/Transport/SlimJabber/XMPPMessage.pm | 139 -------- .../lib/OpenSRF/Transport/SlimJabber/XMPPReader.pm | 393 --------------------- src/perl/t/07-Transport.t | 12 +- 12 files changed, 17 insertions(+), 979 deletions(-) delete mode 100644 src/perl/lib/OpenSRF/Transport/SlimJabber.pm delete mode 100644 src/perl/lib/OpenSRF/Transport/SlimJabber/Client.pm delete mode 100644 src/perl/lib/OpenSRF/Transport/SlimJabber/MessageWrapper.pm delete mode 100644 src/perl/lib/OpenSRF/Transport/SlimJabber/PeerConnection.pm delete mode 100644 src/perl/lib/OpenSRF/Transport/SlimJabber/XMPPMessage.pm delete mode 100644 src/perl/lib/OpenSRF/Transport/SlimJabber/XMPPReader.pm diff --git a/src/perl/MANIFEST b/src/perl/MANIFEST index 2f8a129..48ffcf0 100644 --- a/src/perl/MANIFEST +++ b/src/perl/MANIFEST @@ -26,12 +26,10 @@ lib/OpenSRF/System.pm lib/OpenSRF/Transport.pm lib/OpenSRF/Transport/Listener.pm lib/OpenSRF/Transport/PeerHandle.pm -lib/OpenSRF/Transport/SlimJabber.pm -lib/OpenSRF/Transport/SlimJabber/Client.pm -lib/OpenSRF/Transport/SlimJabber/MessageWrapper.pm -lib/OpenSRF/Transport/SlimJabber/PeerConnection.pm -lib/OpenSRF/Transport/SlimJabber/XMPPMessage.pm -lib/OpenSRF/Transport/SlimJabber/XMPPReader.pm +lib/OpenSRF/Transport/Redis/Client.pm +lib/OpenSRF/Transport/Redis/Message.pm +lib/OpenSRF/Transport/Redis/BusConnection.pm +lib/OpenSRF/Transport/Redis/PeerConnection.pm lib/OpenSRF/Utils.pm lib/OpenSRF/Utils/Cache.pm lib/OpenSRF/Utils/Config.pm diff --git a/src/perl/lib/OpenSRF/EX.pm b/src/perl/lib/OpenSRF/EX.pm index c1ae701..6492b0c 100644 --- a/src/perl/lib/OpenSRF/EX.pm +++ b/src/perl/lib/OpenSRF/EX.pm @@ -30,11 +30,11 @@ a new() method that takes a message and a message() method that returns that mes =head2 Synopsis - throw OpenSRF::EX::Jabber ("I Am Dying"); + throw OpenSRF::EX::Transport ("I Am Dying"); OpenSRF::EX::InvalidArg->throw( "Another way" ); - my $je = OpenSRF::EX::Jabber->new( "I Cannot Connect" ); + my $je = OpenSRF::EX::Transport->new( "I Cannot Connect" ); $je->throw(); @@ -147,19 +147,6 @@ our $ex_msg_header = "System PANIC"; # Some basic exceptions # ------------------------------------------------------------------- -package OpenSRF::EX::Jabber; -use base 'OpenSRF::EX::ERROR'; -our $ex_msg_header = "Jabber Exception"; - -package OpenSRF::EX::JabberDisconnected; -use base 'OpenSRF::EX::ERROR'; -our $ex_msg_header = "JabberDisconnected Exception"; - -=head2 OpenSRF::EX::Jabber - -Thrown when there is a problem using the Jabber service - -=cut package OpenSRF::EX::Transport; use base 'OpenSRF::EX::ERROR'; diff --git a/src/perl/lib/OpenSRF/Server.pm b/src/perl/lib/OpenSRF/Server.pm index 0535fae..8782963 100644 --- a/src/perl/lib/OpenSRF/Server.pm +++ b/src/perl/lib/OpenSRF/Server.pm @@ -299,7 +299,7 @@ sub write_child { local $SIG{'PIPE'} = sub { $self->{sig_pipe} = 1; }; # In rare cases a child can die between creation and first - # write, typically a result of a jabber connect error. Before + # write, typically a result of a bus connect error. Before # sending data to each child, confirm it's still alive. If it's # not, log the error and drop the message to prevent the parent # process from dying. @@ -638,7 +638,7 @@ sub run { my $orig_name = $0; $0 = "$0*"; - # Discard extraneous data from the jabber socket + # Discard extraneous data from the bus if(!$network->flush_socket()) { $logger->error("server: network disconnected! child dropping request and exiting: $data"); exit; diff --git a/src/perl/lib/OpenSRF/Transport.pm b/src/perl/lib/OpenSRF/Transport.pm index a222c04..72d3b61 100644 --- a/src/perl/lib/OpenSRF/Transport.pm +++ b/src/perl/lib/OpenSRF/Transport.pm @@ -105,8 +105,8 @@ sub handler { $logger->transport( "Transport::handler() creating \n$body", INTERNAL ); - # We need to disconnect the session if we got a jabber error on the client side. For - # server side, we'll just tear down the session and go away. + # We need to disconnect the session if we got a bus error on the client + # side. For server side, we'll just tear down the session and go away. if (defined($type) and $type eq 'error') { # If we're a server if( $app_session->endpoint == $app_session->SERVER() ) { @@ -119,7 +119,7 @@ sub handler { #$app_session->push_resend( $app_session->app_request( # $doc->documentElement->firstChild->threadTrace ) ); $logger->debug( - "Got Jabber error on client connection $remote_id, nothing we can do..", ERROR ); + "Got error on client connection $remote_id, nothing we can do..", ERROR ); return 1; } } diff --git a/src/perl/lib/OpenSRF/Transport/PeerHandle.pm b/src/perl/lib/OpenSRF/Transport/PeerHandle.pm index e263971..4dff5a5 100644 --- a/src/perl/lib/OpenSRF/Transport/PeerHandle.pm +++ b/src/perl/lib/OpenSRF/Transport/PeerHandle.pm @@ -1,7 +1,7 @@ package OpenSRF::Transport::PeerHandle; use OpenSRF::Utils::Logger qw(:level); use OpenSRF::EX; -use base qw/OpenSRF::Transport::SlimJabber::PeerConnection/; +use base qw/OpenSRF::Transport::Redis::PeerConnection/; use vars '@ISA'; my $peer; diff --git a/src/perl/lib/OpenSRF/Transport/SlimJabber.pm b/src/perl/lib/OpenSRF/Transport/SlimJabber.pm deleted file mode 100644 index aa460b5..0000000 --- a/src/perl/lib/OpenSRF/Transport/SlimJabber.pm +++ /dev/null @@ -1,14 +0,0 @@ -package OpenSRF::Transport::SlimJabber; -use base qw/OpenSRF::Transport/; - -=head2 OpenSRF::Transport::SlimJabber - -Implements the Transport interface for providing the system with appropriate -classes for handling transport layer messaging - -=cut - - -sub get_peer_client { return "OpenSRF::Transport::SlimJabber::PeerConnection"; } - -1; diff --git a/src/perl/lib/OpenSRF/Transport/SlimJabber/Client.pm b/src/perl/lib/OpenSRF/Transport/SlimJabber/Client.pm deleted file mode 100644 index cc19698..0000000 --- a/src/perl/lib/OpenSRF/Transport/SlimJabber/Client.pm +++ /dev/null @@ -1,224 +0,0 @@ -package OpenSRF::Transport::SlimJabber::Client; - -use strict; -use warnings; - -use OpenSRF::EX; -use OpenSRF::Utils::Config; -use OpenSRF::Utils::Logger qw/$logger/; -use OpenSRF::Transport::SlimJabber::XMPPReader; -use OpenSRF::Transport::SlimJabber::XMPPMessage; -use IO::Socket::INET; - -=head1 NAME - -OpenSRF::Transport::SlimJabber::Client - -=head1 SYNOPSIS - - - -=head1 DESCRIPTION - - - -=cut - -=head1 METHODS - -=head2 new - -=cut - -sub new { - my( $class, %params ) = @_; - my $self = bless({}, ref($class) || $class); - $self->params(\%params); - return $self; -} - -=head2 reader - -=cut - -sub reader { - my($self, $reader) = @_; - $self->{reader} = $reader if $reader; - return $self->{reader}; -} - -=head2 params - -=cut - -sub params { - my($self, $params) = @_; - $self->{params} = $params if $params; - return $self->{params}; -} - -=head2 socket - -=cut - -sub socket { - my($self, $socket) = @_; - $self->{socket} = $socket if $socket; - return $self->{socket}; -} - -=head2 disconnect - -=cut - -sub disconnect { - my $self = shift; - $self->reader->disconnect if $self->reader; -} - - -=head2 gather - -=cut - -sub gather { - my $self = shift; - $self->process( 0 ); -} - -# ------------------------------------------------- - -=head2 tcp_connected - -=cut - -sub tcp_connected { - my $self = shift; - return $self->reader->tcp_connected if $self->reader; - return 0; -} - -sub connected { - my $self = shift; - return $self->reader->connected if $self->reader; - return 0; -} - - -=head2 send - -=cut - -sub send { - my $self = shift; - my $msg = OpenSRF::Transport::SlimJabber::XMPPMessage->new(@_); - $msg->osrf_xid($logger->get_osrf_xid); - $msg->from($self->xmpp_id); - my $xml = $msg->to_xml; - { - use bytes; - my $len = length($xml); - if ($len >= $self->{msg_size_warn}) { - $logger->warn("Sending large message of $len bytes to " . $msg->to) - } - } - $self->reader->send($xml); -} - -=head2 initialize - -=cut - -sub initialize { - - my $self = shift; - - my $host = $self->params->{host}; - my $port = $self->params->{port}; - my $username = $self->params->{username}; - my $resource = $self->params->{resource}; - my $password = $self->params->{password}; - - my $conf = OpenSRF::Utils::Config->current; - - $self->{msg_size_warn} = $conf->bootstrap->msg_size_warn || 1800000; - - my $tail = "_$$"; - $tail = "" if !$conf->bootstrap->router_name and $username eq "router"; - $resource = "$resource$tail"; - - my $socket = IO::Socket::INET->new( - PeerHost => $host, - PeerPort => int($port), - Proto => 'tcp' ); - - throw OpenSRF::EX::Jabber("Could not open TCP socket to Jabber server: $@") - unless ( $socket and $socket->connected ); - - $self->socket($socket); - $self->reader(OpenSRF::Transport::SlimJabber::XMPPReader->new($socket)); - $self->reader->connect($host, $username, $password, $resource); - - throw OpenSRF::EX::Jabber("Could not authenticate with Jabber server: $@") - unless ( $self->reader->connected ); - - $self->xmpp_id("$username\@$host/$resource"); - $logger->debug(sub{return "Created XMPP connection " . $self->xmpp_id }); - return $self; -} - - -# Our full login: username@host/resource -sub xmpp_id { - my($self, $xmpp_id) = @_; - $self->{xmpp_id} = $xmpp_id if $xmpp_id; - return $self->{xmpp_id}; -} - - -=head2 construct - -=cut - -sub construct { - my( $class, $app ) = @_; - $class->peer_handle($class->new( $app )->initialize()); -} - - -=head2 process - -=cut - -sub process { - my($self, $timeout) = @_; - - $timeout ||= 0; - $timeout = int($timeout); - - unless( $self->reader and $self->reader->connected ) { - throw OpenSRF::EX::JabberDisconnected - ("This JabberClient instance is no longer connected to the server "); - } - - return $self->reader->wait_msg($timeout); -} - - -=head2 flush_socket - -Sets the socket to O_NONBLOCK, reads all of the data off of the -socket, the restores the sockets flags. Returns 1 on success, 0 if -the socket isn't connected. - -=cut - -sub flush_socket { - my $self = shift; - return 0 unless $self->reader; - return $self->reader->flush_socket; -} - -1; - - diff --git a/src/perl/lib/OpenSRF/Transport/SlimJabber/MessageWrapper.pm b/src/perl/lib/OpenSRF/Transport/SlimJabber/MessageWrapper.pm deleted file mode 100644 index 0fa95c5..0000000 --- a/src/perl/lib/OpenSRF/Transport/SlimJabber/MessageWrapper.pm +++ /dev/null @@ -1,72 +0,0 @@ -package OpenSRF::Transport::SlimJabber::MessageWrapper; -use strict; use warnings; -use OpenSRF::Transport::SlimJabber::XMPPMessage; - -# ---------------------------------------------------------- -# Legacy wrapper for XMPPMessage -# ---------------------------------------------------------- - -sub new { - my $class = shift; - my $msg = shift; - return bless({msg => $msg}, ref($class) || $class); -} - -sub msg { - my($self, $msg) = @_; - $self->{msg} = $msg if $msg; - return $self->{msg}; -} - -sub toString { - return $_[0]->msg->to_xml; -} - -sub get_body { - return $_[0]->msg->body; -} - -sub get_sess_id { - return $_[0]->msg->thread; -} - -sub get_msg_type { - return $_[0]->msg->type; -} - -sub get_remote_id { - return $_[0]->msg->from; -} - -sub setType { - $_[0]->msg->type(shift()); -} - -sub setTo { - $_[0]->msg->to(shift()); -} - -sub setThread { - $_[0]->msg->thread(shift()); -} - -sub setBody { - $_[0]->msg->body(shift()); -} - -sub set_router_command { - $_[0]->msg->router_command(shift()); -} -sub set_router_class { - $_[0]->msg->router_class(shift()); -} - -sub set_osrf_xid { - $_[0]->msg->osrf_xid(shift()); -} - -sub get_osrf_xid { - return $_[0]->msg->osrf_xid; -} - -1; diff --git a/src/perl/lib/OpenSRF/Transport/SlimJabber/PeerConnection.pm b/src/perl/lib/OpenSRF/Transport/SlimJabber/PeerConnection.pm deleted file mode 100644 index e4f4749..0000000 --- a/src/perl/lib/OpenSRF/Transport/SlimJabber/PeerConnection.pm +++ /dev/null @@ -1,103 +0,0 @@ -package OpenSRF::Transport::SlimJabber::PeerConnection; -use strict; -use base qw/OpenSRF::Transport::SlimJabber::Client/; -use OpenSRF::Utils::Config; -use OpenSRF::Utils::Logger qw(:level); -use OpenSRF::EX qw/:try/; - -=head1 Description - -Represents a single connection to a remote peer. The -Jabber values are loaded from the config file. - -Subclasses OpenSRF::Transport::SlimJabber::Client. - -=cut - -=head2 new() - - new( $appname ); - - The $appname parameter tells this class how to find the correct - Jabber username, password, etc to connect to the server. - -=cut - -our %apps_hash; -our $_singleton_connection; - -sub retrieve { - my( $class, $app ) = @_; - return $_singleton_connection; -} - -sub reset { - return unless $_singleton_connection; - $_singleton_connection->disconnect; - $_singleton_connection = undef; -} - - -sub new { - my( $class, $app ) = @_; - - my $peer_con = $class->retrieve; - return $peer_con if ($peer_con and $peer_con->tcp_connected); - - my $config = OpenSRF::Utils::Config->current; - - if( ! $config ) { - throw OpenSRF::EX::Config( "No suitable config found for PeerConnection" ); - } - - my $conf = OpenSRF::Utils::Config->current; - my $domain = $conf->bootstrap->domain; - my $h = $conf->env->hostname; - OpenSRF::Utils::Logger->error("use of is deprecated") if $conf->bootstrap->domains; - - my $username = $conf->bootstrap->username; - my $password = $conf->bootstrap->passwd; - my $port = $conf->bootstrap->port; - my $resource = "${app}_drone_at_$h"; - my $host = $domain; # XXX for now... - - if( $app eq "client" ) { $resource = "client_at_$h"; } - - OpenSRF::EX::Config->throw( "JPeer could not load all necessary values from config" ) - unless ( $username and $password and $resource and $host and $port ); - - my $self = __PACKAGE__->SUPER::new( - username => $username, - resource => $resource, - password => $password, - host => $host, - port => $port, - ); - - bless( $self, $class ); - - $self->app($app); - - $_singleton_connection = $self; - $apps_hash{$app} = $self; - - return $_singleton_connection; - return $apps_hash{$app}; -} - -sub process { - my $self = shift; - my $val = $self->SUPER::process(@_); - return 0 unless $val; - return OpenSRF::Transport->handler($self->app, $val); -} - -sub app { - my $self = shift; - my $app = shift; - $self->{app} = $app if $app; - return $self->{app}; -} - -1; - diff --git a/src/perl/lib/OpenSRF/Transport/SlimJabber/XMPPMessage.pm b/src/perl/lib/OpenSRF/Transport/SlimJabber/XMPPMessage.pm deleted file mode 100644 index 0fc4124..0000000 --- a/src/perl/lib/OpenSRF/Transport/SlimJabber/XMPPMessage.pm +++ /dev/null @@ -1,139 +0,0 @@ -package OpenSRF::Transport::SlimJabber::XMPPMessage; -use strict; use warnings; -use OpenSRF::Utils::Logger qw/$logger/; -use OpenSRF::EX qw/:try/; -use strict; use warnings; -use XML::LibXML; - -use constant JABBER_MESSAGE => - "". - "". - "%s%s"; - -sub new { - my $class = shift; - my %args = @_; - my $self = bless({}, $class); - - if($args{xml}) { - $self->parse_xml($args{xml}); - - } else { - $self->{to} = $args{to} || ''; - $self->{from} = $args{from} || ''; - $self->{thread} = $args{thread} || ''; - $self->{body} = $args{body} || ''; - $self->{osrf_xid} = $args{osrf_xid} || ''; - $self->{router_command} = $args{router_command} || ''; - $self->{router_class} = $args{router_class} || ''; - } - - return $self; -} - -sub to { - my($self, $to) = @_; - $self->{to} = $to if defined $to; - return $self->{to}; -} -sub from { - my($self, $from) = @_; - $self->{from} = $from if defined $from; - return $self->{from}; -} -sub thread { - my($self, $thread) = @_; - $self->{thread} = $thread if defined $thread; - return $self->{thread}; -} -sub body { - my($self, $body) = @_; - $self->{body} = $body if defined $body; - return $self->{body}; -} -sub status { - my($self, $status) = @_; - $self->{status} = $status if defined $status; - return $self->{status}; -} -sub type { - my($self, $type) = @_; - $self->{type} = $type if defined $type; - return $self->{type}; -} -sub err_type { - my($self, $err_type) = @_; - $self->{err_type} = $err_type if defined $err_type; - return $self->{err_type}; -} -sub err_code { - my($self, $err_code) = @_; - $self->{err_code} = $err_code if defined $err_code; - return $self->{err_code}; -} -sub osrf_xid { - my($self, $osrf_xid) = @_; - $self->{osrf_xid} = $osrf_xid if defined $osrf_xid; - return $self->{osrf_xid}; -} -sub router_command { - my($self, $router_command) = @_; - $self->{router_command} = $router_command if defined $router_command; - return $self->{router_command}; -} -sub router_class { - my($self, $router_class) = @_; - $self->{router_class} = $router_class if defined $router_class; - return $self->{router_class}; -} - - -sub to_xml { - my $self = shift; - - my $body = $self->{body}; - $body =~ s/&/&/sog; - $body =~ s//>/sog; - - return sprintf( - JABBER_MESSAGE, - $self->{to}, - $self->{from}, - $self->{router_command}, - $self->{router_class}, - $self->{osrf_xid}, - $self->{thread}, - $body - ); -} - -sub parse_xml { - my($self, $xml) = @_; - my($doc, $err); - - try { - $doc = XML::LibXML->new->parse_string($xml); - } catch Error with { - my $err = shift; - $logger->error("Error parsing message xml: $xml --- $err"); - }; - throw $err if $err; - - my $root = $doc->documentElement; - my $osrf_node = $root->findnodes('/message/opensrf')->shift; - - $self->{body} = $root->findnodes('/message/body').''; - $self->{thread} = $root->findnodes('/message/thread').''; - - $self->{from} = $osrf_node->getAttribute('router_from'); - $self->{from} = $root->getAttribute('from') unless $self->{from}; - - $self->{to} = $root->getAttribute('to'); - - $self->{type} = $root->getAttribute('type'); - $self->{osrf_xid} = $osrf_node->getAttribute('osrf_xid'); -} - - -1; diff --git a/src/perl/lib/OpenSRF/Transport/SlimJabber/XMPPReader.pm b/src/perl/lib/OpenSRF/Transport/SlimJabber/XMPPReader.pm deleted file mode 100644 index 0a84ae1..0000000 --- a/src/perl/lib/OpenSRF/Transport/SlimJabber/XMPPReader.pm +++ /dev/null @@ -1,393 +0,0 @@ -package OpenSRF::Transport::SlimJabber::XMPPReader; -use strict; use warnings; -use XML::Parser; -use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK); -use Time::HiRes qw/time/; -use OpenSRF::Transport::SlimJabber::XMPPMessage; -use OpenSRF::Utils::Logger qw/$logger/; -use OpenSRF::EX; - -# ----------------------------------------------------------- -# Connect, disconnect, and authentication messsage templates -# ----------------------------------------------------------- -use constant JABBER_CONNECT => - ""; - -use constant JABBER_BASIC_AUTH => - "" . - "%s%s%s"; - -use constant JABBER_DISCONNECT => ""; - - -# ----------------------------------------------------------- -# XMPP Stream states -# ----------------------------------------------------------- -use constant DISCONNECTED => 1; -use constant CONNECT_RECV => 2; -use constant CONNECTED => 3; - - -# ----------------------------------------------------------- -# XMPP Message states -# ----------------------------------------------------------- -use constant IN_NOTHING => 1; -use constant IN_BODY => 2; -use constant IN_THREAD => 3; -use constant IN_STATUS => 4; - - -# ----------------------------------------------------------- -# Constructor, getter/setters -# ----------------------------------------------------------- -sub new { - my $class = shift; - my $socket = shift; - - my $self = bless({}, $class); - - $self->{queue} = []; - $self->{stream_state} = DISCONNECTED; - $self->{xml_state} = IN_NOTHING; - $self->socket($socket); - - my $p = new XML::Parser(Handlers => { - Start => \&start_element, - End => \&end_element, - Char => \&characters, - }); - - $self->parser($p->parse_start); # create a push parser - $self->parser->{_parent_} = $self; - $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new; - return $self; -} - -sub push_msg { - my($self, $msg) = @_; - push(@{$self->{queue}}, $msg) if $msg; -} - -sub next_msg { - my $self = shift; - return shift @{$self->{queue}}; -} - -sub peek_msg { - my $self = shift; - return (@{$self->{queue}} > 0); -} - -sub parser { - my($self, $parser) = @_; - $self->{parser} = $parser if $parser; - return $self->{parser}; -} - -sub socket { - my($self, $socket) = @_; - $self->{socket} = $socket if $socket; - return $self->{socket}; -} - -sub stream_state { - my($self, $stream_state) = @_; - $self->{stream_state} = $stream_state if $stream_state; - return $self->{stream_state}; -} - -sub xml_state { - my($self, $xml_state) = @_; - $self->{xml_state} = $xml_state if $xml_state; - return $self->{xml_state}; -} - -sub message { - my($self, $message) = @_; - $self->{message} = $message if $message; - return $self->{message}; -} - - -# ----------------------------------------------------------- -# Stream and connection handling methods -# ----------------------------------------------------------- - -sub connect { - my($self, $domain, $username, $password, $resource) = @_; - - $self->send(sprintf(JABBER_CONNECT, $domain)); - $self->wait(10); - - unless($self->{stream_state} == CONNECT_RECV) { - $logger->error("No initial XMPP response from server"); - return 0; - } - - $self->send(sprintf(JABBER_BASIC_AUTH, $username, $password, $resource)); - $self->wait(10); - - unless($self->connected) { - $logger->error('XMPP connect failed'); - return 0; - } - - return 1; -} - -sub disconnect { - my $self = shift; - return unless $self->socket; - if($self->tcp_connected) { - $self->send(JABBER_DISCONNECT); - shutdown($self->socket, 2); - } - close($self->socket); -} - -# ----------------------------------------------------------- -# returns true if this stream is connected to the server -# ----------------------------------------------------------- -sub connected { - my $self = shift; - return ($self->tcp_connected and $self->{stream_state} == CONNECTED); -} - -# ----------------------------------------------------------- -# returns true if the socket is connected -# ----------------------------------------------------------- -sub tcp_connected { - my $self = shift; - return ($self->socket and $self->socket->connected); -} - -# ----------------------------------------------------------- -# sends pre-formated XML -# ----------------------------------------------------------- -sub send { - my($self, $xml) = @_; - - local $SIG{'PIPE'} = sub { - $logger->error("Disconnected from Jabber server, exiting immediately"); - exit(99); - }; - $self->{socket}->print($xml); -} - -# ----------------------------------------------------------- -# Puts a file handle into blocking mode -# ----------------------------------------------------------- -sub set_block { - my $fh = shift; - my $flags = fcntl($fh, F_GETFL, 0); - $flags &= ~O_NONBLOCK; - fcntl($fh, F_SETFL, $flags); -} - - -# ----------------------------------------------------------- -# Puts a file handle into non-blocking mode -# ----------------------------------------------------------- -sub set_nonblock { - my $fh = shift; - my $flags = fcntl($fh, F_GETFL, 0); - fcntl($fh, F_SETFL, $flags | O_NONBLOCK); -} - - -sub wait { - my($self, $timeout) = @_; - - return $self->next_msg if $self->peek_msg; - - $timeout ||= 0; - $timeout = undef if $timeout < 0; - my $socket = $self->{socket}; - - set_block($socket); - - # build the select readset - my $infile = ''; - vec($infile, $socket->fileno, 1) = 1; - - my $nfound; - if (!OpenSRF->OSRF_APACHE_REQUEST_OBJ || $timeout <= 1.0) { - $nfound = select($infile, undef, undef, $timeout); - } else { - $timeout -= 1.0; - for ( - my $sleep = 1.0; - $timeout >= 0.0; - do { - $sleep = $timeout < 1.0 ? $timeout : 1.0; - $timeout -= 1.0; - } - ) { - $nfound = select($infile, undef, undef, $sleep); - last if $nfound; - if ( - OpenSRF->OSRF_APACHE_REQUEST_OBJ && - OpenSRF->OSRF_APACHE_REQUEST_OBJ->connection->aborted - ) { - # Should this be more severe? Die or throw error? - $logger->warn("Upstream Apache client disconnected, aborting."); - last; - }; - } - } - return undef if !$nfound or $nfound == -1; - - # now slurp the data off the socket - my $buf; - my $read_size = 1024; - my $nonblock = 0; - my $nbytes; - my $first_read = 1; - - while($nbytes = sysread($socket, $buf, $read_size)) { - $self->{parser}->parse_more($buf) if $buf; - if($nbytes < $read_size or $self->peek_msg) { - set_block($socket) if $nonblock; - last; - } - set_nonblock($socket) unless $nonblock; - $nonblock = 1; - $first_read = 0; - } - - if ($first_read and defined $nbytes and $nbytes == 0) { - # if the first read on an active socket is 0 bytes, - # the socket has been disconnected from the remote end. - $self->{stream_state} = DISCONNECTED; - $logger->error("Disconnected from Jabber server"); - throw OpenSRF::EX::Jabber("Disconnected from Jabber server"); - } - - return $self->next_msg; -} - -# ----------------------------------------------------------- -# Waits up to timeout seconds for a fully-formed XMPP -# message to arrive. If timeout is < 0, waits indefinitely -# ----------------------------------------------------------- -sub wait_msg { - my($self, $timeout) = @_; - my $xml; - - $timeout = 0 unless defined $timeout; - - if($timeout < 0) { - while(1) { - return $xml if $xml = $self->wait($timeout); - } - - } else { - while($timeout >= 0) { - my $start = time; - return $xml if $xml = $self->wait($timeout); - $timeout -= time - $start; - } - } - - return undef; -} - - -# ----------------------------------------------------------- -# SAX Handlers -# ----------------------------------------------------------- - - -sub start_element { - my($parser, $name, %attrs) = @_; - my $self = $parser->{_parent_}; - - if($name eq 'message') { - - my $msg = $self->{message}; - $msg->{to} = $attrs{'to'}; - $msg->{from} = $attrs{from}; - $msg->{type} = $attrs{type}; - - } elsif($name eq 'opensrf') { - - # These will be authoritative if they exist - my $msg = $self->{message}; - $msg->{from} = $attrs{router_from} if $attrs{router_from}; - $msg->{osrf_xid} = $attrs{'osrf_xid'}; - - } elsif($name eq 'body') { - $self->{xml_state} = IN_BODY; - - } elsif($name eq 'thread') { - $self->{xml_state} = IN_THREAD; - - } elsif($name eq 'stream:stream') { - $self->{stream_state} = CONNECT_RECV; - - } elsif($name eq 'iq') { - if($attrs{type} and $attrs{type} eq 'result') { - $self->{stream_state} = CONNECTED; - } - - } elsif($name eq 'status') { - $self->{xml_state } = IN_STATUS; - - } elsif($name eq 'stream:error') { - $self->{stream_state} = DISCONNECTED; - - } elsif($name eq 'error') { - $self->{message}->{err_type} = $attrs{'type'}; - $self->{message}->{err_code} = $attrs{'code'}; - } -} - -sub characters { - my($parser, $chars) = @_; - my $self = $parser->{_parent_}; - my $state = $self->{xml_state}; - - if($state == IN_BODY) { - $self->{message}->{body} .= $chars; - - } elsif($state == IN_THREAD) { - $self->{message}->{thread} .= $chars; - - } elsif($state == IN_STATUS) { - $self->{message}->{status} .= $chars; - } -} - -sub end_element { - my($parser, $name) = @_; - my $self = $parser->{_parent_}; - $self->{xml_state} = IN_NOTHING; - - if($name eq 'message') { - $self->push_msg($self->{message}); - $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new; - - } elsif($name eq 'stream:stream') { - $self->{stream_state} = DISCONNECTED; - } -} - - -# read all the data on the jabber socket through the -# parser and drop the resulting message -sub flush_socket { - my $self = shift; - return 0 unless $self->connected; - - while ($self->wait(0)) { - # TODO remove this log line - $logger->info("flushing data from socket..."); - } - - return $self->connected; -} - - - -1; - diff --git a/src/perl/t/07-Transport.t b/src/perl/t/07-Transport.t index e8271f8..0d71a94 100644 --- a/src/perl/t/07-Transport.t +++ b/src/perl/t/07-Transport.t @@ -1,6 +1,6 @@ #!perl -T -use Test::More tests => 9; +use Test::More tests => 7; BEGIN { use_ok( 'OpenSRF::Transport' ); @@ -8,9 +8,7 @@ BEGIN { use_ok( 'OpenSRF::Transport::Listener' ); use_ok( 'OpenSRF::Transport::PeerHandle' ); -use_ok( 'OpenSRF::Transport::SlimJabber' ); -use_ok( 'OpenSRF::Transport::SlimJabber::Client' ); -use_ok( 'OpenSRF::Transport::SlimJabber::MessageWrapper' ); -use_ok( 'OpenSRF::Transport::SlimJabber::PeerConnection' ); -use_ok( 'OpenSRF::Transport::SlimJabber::XMPPMessage' ); -use_ok( 'OpenSRF::Transport::SlimJabber::XMPPReader' ); +use_ok( 'OpenSRF::Transport::Redis::Client' ); +use_ok( 'OpenSRF::Transport::Redis::Message' ); +use_ok( 'OpenSRF::Transport::Redis::PeerConnection' ); +use_ok( 'OpenSRF::Transport::Redis::BusConnection' ); -- 2.11.0