Among other thibngs, makes troubleshooting a bit easier.
Signed-off-by: Bill Erickson <berickxx@gmail.com>
lib/OpenSRF/Transport.pm
lib/OpenSRF/Transport/Listener.pm
lib/OpenSRF/Transport/PeerHandle.pm
-lib/OpenSRF/Transport/SlimJabber.pm
-lib/OpenSRF/Transport/SlimJabber/Client.pm
-lib/OpenSRF/Transport/SlimJabber/MessageWrapper.pm
-lib/OpenSRF/Transport/SlimJabber/PeerConnection.pm
-lib/OpenSRF/Transport/SlimJabber/XMPPMessage.pm
-lib/OpenSRF/Transport/SlimJabber/XMPPReader.pm
+lib/OpenSRF/Transport/Redis/Client.pm
+lib/OpenSRF/Transport/Redis/Message.pm
+lib/OpenSRF/Transport/Redis/BusConnection.pm
+lib/OpenSRF/Transport/Redis/PeerConnection.pm
lib/OpenSRF/Utils.pm
lib/OpenSRF/Utils/Cache.pm
lib/OpenSRF/Utils/Config.pm
=head2 Synopsis
- throw OpenSRF::EX::Jabber ("I Am Dying");
+ throw OpenSRF::EX::Transport ("I Am Dying");
OpenSRF::EX::InvalidArg->throw( "Another way" );
- my $je = OpenSRF::EX::Jabber->new( "I Cannot Connect" );
+ my $je = OpenSRF::EX::Transport->new( "I Cannot Connect" );
$je->throw();
# Some basic exceptions
# -------------------------------------------------------------------
-package OpenSRF::EX::Jabber;
-use base 'OpenSRF::EX::ERROR';
-our $ex_msg_header = "Jabber Exception";
-
-package OpenSRF::EX::JabberDisconnected;
-use base 'OpenSRF::EX::ERROR';
-our $ex_msg_header = "JabberDisconnected Exception";
-
-=head2 OpenSRF::EX::Jabber
-
-Thrown when there is a problem using the Jabber service
-
-=cut
package OpenSRF::EX::Transport;
use base 'OpenSRF::EX::ERROR';
local $SIG{'PIPE'} = sub { $self->{sig_pipe} = 1; };
# In rare cases a child can die between creation and first
- # write, typically a result of a jabber connect error. Before
+ # write, typically a result of a bus connect error. Before
# sending data to each child, confirm it's still alive. If it's
# not, log the error and drop the message to prevent the parent
# process from dying.
my $orig_name = $0;
$0 = "$0*";
- # Discard extraneous data from the jabber socket
+ # Discard extraneous data from the bus
if(!$network->flush_socket()) {
$logger->error("server: network disconnected! child dropping request and exiting: $data");
exit;
$logger->transport( "Transport::handler() creating \n$body", INTERNAL );
- # We need to disconnect the session if we got a jabber error on the client side. For
- # server side, we'll just tear down the session and go away.
+ # We need to disconnect the session if we got a bus error on the client
+ # side. For server side, we'll just tear down the session and go away.
if (defined($type) and $type eq 'error') {
# If we're a server
if( $app_session->endpoint == $app_session->SERVER() ) {
#$app_session->push_resend( $app_session->app_request(
# $doc->documentElement->firstChild->threadTrace ) );
$logger->debug(
- "Got Jabber error on client connection $remote_id, nothing we can do..", ERROR );
+ "Got error on client connection $remote_id, nothing we can do..", ERROR );
return 1;
}
}
package OpenSRF::Transport::PeerHandle;
use OpenSRF::Utils::Logger qw(:level);
use OpenSRF::EX;
-use base qw/OpenSRF::Transport::SlimJabber::PeerConnection/;
+use base qw/OpenSRF::Transport::Redis::PeerConnection/;
use vars '@ISA';
my $peer;
+++ /dev/null
-package OpenSRF::Transport::SlimJabber;
-use base qw/OpenSRF::Transport/;
-
-=head2 OpenSRF::Transport::SlimJabber
-
-Implements the Transport interface for providing the system with appropriate
-classes for handling transport layer messaging
-
-=cut
-
-
-sub get_peer_client { return "OpenSRF::Transport::SlimJabber::PeerConnection"; }
-
-1;
+++ /dev/null
-package OpenSRF::Transport::SlimJabber::Client;
-
-use strict;
-use warnings;
-
-use OpenSRF::EX;
-use OpenSRF::Utils::Config;
-use OpenSRF::Utils::Logger qw/$logger/;
-use OpenSRF::Transport::SlimJabber::XMPPReader;
-use OpenSRF::Transport::SlimJabber::XMPPMessage;
-use IO::Socket::INET;
-
-=head1 NAME
-
-OpenSRF::Transport::SlimJabber::Client
-
-=head1 SYNOPSIS
-
-
-
-=head1 DESCRIPTION
-
-
-
-=cut
-
-=head1 METHODS
-
-=head2 new
-
-=cut
-
-sub new {
- my( $class, %params ) = @_;
- my $self = bless({}, ref($class) || $class);
- $self->params(\%params);
- return $self;
-}
-
-=head2 reader
-
-=cut
-
-sub reader {
- my($self, $reader) = @_;
- $self->{reader} = $reader if $reader;
- return $self->{reader};
-}
-
-=head2 params
-
-=cut
-
-sub params {
- my($self, $params) = @_;
- $self->{params} = $params if $params;
- return $self->{params};
-}
-
-=head2 socket
-
-=cut
-
-sub socket {
- my($self, $socket) = @_;
- $self->{socket} = $socket if $socket;
- return $self->{socket};
-}
-
-=head2 disconnect
-
-=cut
-
-sub disconnect {
- my $self = shift;
- $self->reader->disconnect if $self->reader;
-}
-
-
-=head2 gather
-
-=cut
-
-sub gather {
- my $self = shift;
- $self->process( 0 );
-}
-
-# -------------------------------------------------
-
-=head2 tcp_connected
-
-=cut
-
-sub tcp_connected {
- my $self = shift;
- return $self->reader->tcp_connected if $self->reader;
- return 0;
-}
-
-sub connected {
- my $self = shift;
- return $self->reader->connected if $self->reader;
- return 0;
-}
-
-
-=head2 send
-
-=cut
-
-sub send {
- my $self = shift;
- my $msg = OpenSRF::Transport::SlimJabber::XMPPMessage->new(@_);
- $msg->osrf_xid($logger->get_osrf_xid);
- $msg->from($self->xmpp_id);
- my $xml = $msg->to_xml;
- {
- use bytes;
- my $len = length($xml);
- if ($len >= $self->{msg_size_warn}) {
- $logger->warn("Sending large message of $len bytes to " . $msg->to)
- }
- }
- $self->reader->send($xml);
-}
-
-=head2 initialize
-
-=cut
-
-sub initialize {
-
- my $self = shift;
-
- my $host = $self->params->{host};
- my $port = $self->params->{port};
- my $username = $self->params->{username};
- my $resource = $self->params->{resource};
- my $password = $self->params->{password};
-
- my $conf = OpenSRF::Utils::Config->current;
-
- $self->{msg_size_warn} = $conf->bootstrap->msg_size_warn || 1800000;
-
- my $tail = "_$$";
- $tail = "" if !$conf->bootstrap->router_name and $username eq "router";
- $resource = "$resource$tail";
-
- my $socket = IO::Socket::INET->new(
- PeerHost => $host,
- PeerPort => int($port),
- Proto => 'tcp' );
-
- throw OpenSRF::EX::Jabber("Could not open TCP socket to Jabber server: $@")
- unless ( $socket and $socket->connected );
-
- $self->socket($socket);
- $self->reader(OpenSRF::Transport::SlimJabber::XMPPReader->new($socket));
- $self->reader->connect($host, $username, $password, $resource);
-
- throw OpenSRF::EX::Jabber("Could not authenticate with Jabber server: $@")
- unless ( $self->reader->connected );
-
- $self->xmpp_id("$username\@$host/$resource");
- $logger->debug(sub{return "Created XMPP connection " . $self->xmpp_id });
- return $self;
-}
-
-
-# Our full login: username@host/resource
-sub xmpp_id {
- my($self, $xmpp_id) = @_;
- $self->{xmpp_id} = $xmpp_id if $xmpp_id;
- return $self->{xmpp_id};
-}
-
-
-=head2 construct
-
-=cut
-
-sub construct {
- my( $class, $app ) = @_;
- $class->peer_handle($class->new( $app )->initialize());
-}
-
-
-=head2 process
-
-=cut
-
-sub process {
- my($self, $timeout) = @_;
-
- $timeout ||= 0;
- $timeout = int($timeout);
-
- unless( $self->reader and $self->reader->connected ) {
- throw OpenSRF::EX::JabberDisconnected
- ("This JabberClient instance is no longer connected to the server ");
- }
-
- return $self->reader->wait_msg($timeout);
-}
-
-
-=head2 flush_socket
-
-Sets the socket to O_NONBLOCK, reads all of the data off of the
-socket, the restores the sockets flags. Returns 1 on success, 0 if
-the socket isn't connected.
-
-=cut
-
-sub flush_socket {
- my $self = shift;
- return 0 unless $self->reader;
- return $self->reader->flush_socket;
-}
-
-1;
-
-
+++ /dev/null
-package OpenSRF::Transport::SlimJabber::MessageWrapper;
-use strict; use warnings;
-use OpenSRF::Transport::SlimJabber::XMPPMessage;
-
-# ----------------------------------------------------------
-# Legacy wrapper for XMPPMessage
-# ----------------------------------------------------------
-
-sub new {
- my $class = shift;
- my $msg = shift;
- return bless({msg => $msg}, ref($class) || $class);
-}
-
-sub msg {
- my($self, $msg) = @_;
- $self->{msg} = $msg if $msg;
- return $self->{msg};
-}
-
-sub toString {
- return $_[0]->msg->to_xml;
-}
-
-sub get_body {
- return $_[0]->msg->body;
-}
-
-sub get_sess_id {
- return $_[0]->msg->thread;
-}
-
-sub get_msg_type {
- return $_[0]->msg->type;
-}
-
-sub get_remote_id {
- return $_[0]->msg->from;
-}
-
-sub setType {
- $_[0]->msg->type(shift());
-}
-
-sub setTo {
- $_[0]->msg->to(shift());
-}
-
-sub setThread {
- $_[0]->msg->thread(shift());
-}
-
-sub setBody {
- $_[0]->msg->body(shift());
-}
-
-sub set_router_command {
- $_[0]->msg->router_command(shift());
-}
-sub set_router_class {
- $_[0]->msg->router_class(shift());
-}
-
-sub set_osrf_xid {
- $_[0]->msg->osrf_xid(shift());
-}
-
-sub get_osrf_xid {
- return $_[0]->msg->osrf_xid;
-}
-
-1;
+++ /dev/null
-package OpenSRF::Transport::SlimJabber::PeerConnection;
-use strict;
-use base qw/OpenSRF::Transport::SlimJabber::Client/;
-use OpenSRF::Utils::Config;
-use OpenSRF::Utils::Logger qw(:level);
-use OpenSRF::EX qw/:try/;
-
-=head1 Description
-
-Represents a single connection to a remote peer. The
-Jabber values are loaded from the config file.
-
-Subclasses OpenSRF::Transport::SlimJabber::Client.
-
-=cut
-
-=head2 new()
-
- new( $appname );
-
- The $appname parameter tells this class how to find the correct
- Jabber username, password, etc to connect to the server.
-
-=cut
-
-our %apps_hash;
-our $_singleton_connection;
-
-sub retrieve {
- my( $class, $app ) = @_;
- return $_singleton_connection;
-}
-
-sub reset {
- return unless $_singleton_connection;
- $_singleton_connection->disconnect;
- $_singleton_connection = undef;
-}
-
-
-sub new {
- my( $class, $app ) = @_;
-
- my $peer_con = $class->retrieve;
- return $peer_con if ($peer_con and $peer_con->tcp_connected);
-
- my $config = OpenSRF::Utils::Config->current;
-
- if( ! $config ) {
- throw OpenSRF::EX::Config( "No suitable config found for PeerConnection" );
- }
-
- my $conf = OpenSRF::Utils::Config->current;
- my $domain = $conf->bootstrap->domain;
- my $h = $conf->env->hostname;
- OpenSRF::Utils::Logger->error("use of <domains/> is deprecated") if $conf->bootstrap->domains;
-
- my $username = $conf->bootstrap->username;
- my $password = $conf->bootstrap->passwd;
- my $port = $conf->bootstrap->port;
- my $resource = "${app}_drone_at_$h";
- my $host = $domain; # XXX for now...
-
- if( $app eq "client" ) { $resource = "client_at_$h"; }
-
- OpenSRF::EX::Config->throw( "JPeer could not load all necessary values from config" )
- unless ( $username and $password and $resource and $host and $port );
-
- my $self = __PACKAGE__->SUPER::new(
- username => $username,
- resource => $resource,
- password => $password,
- host => $host,
- port => $port,
- );
-
- bless( $self, $class );
-
- $self->app($app);
-
- $_singleton_connection = $self;
- $apps_hash{$app} = $self;
-
- return $_singleton_connection;
- return $apps_hash{$app};
-}
-
-sub process {
- my $self = shift;
- my $val = $self->SUPER::process(@_);
- return 0 unless $val;
- return OpenSRF::Transport->handler($self->app, $val);
-}
-
-sub app {
- my $self = shift;
- my $app = shift;
- $self->{app} = $app if $app;
- return $self->{app};
-}
-
-1;
-
+++ /dev/null
-package OpenSRF::Transport::SlimJabber::XMPPMessage;
-use strict; use warnings;
-use OpenSRF::Utils::Logger qw/$logger/;
-use OpenSRF::EX qw/:try/;
-use strict; use warnings;
-use XML::LibXML;
-
-use constant JABBER_MESSAGE =>
- "<message to='%s' from='%s'>".
- "<opensrf router_command='%s' router_class='%s' osrf_xid='%s'/>".
- "<thread>%s</thread><body>%s</body></message>";
-
-sub new {
- my $class = shift;
- my %args = @_;
- my $self = bless({}, $class);
-
- if($args{xml}) {
- $self->parse_xml($args{xml});
-
- } else {
- $self->{to} = $args{to} || '';
- $self->{from} = $args{from} || '';
- $self->{thread} = $args{thread} || '';
- $self->{body} = $args{body} || '';
- $self->{osrf_xid} = $args{osrf_xid} || '';
- $self->{router_command} = $args{router_command} || '';
- $self->{router_class} = $args{router_class} || '';
- }
-
- return $self;
-}
-
-sub to {
- my($self, $to) = @_;
- $self->{to} = $to if defined $to;
- return $self->{to};
-}
-sub from {
- my($self, $from) = @_;
- $self->{from} = $from if defined $from;
- return $self->{from};
-}
-sub thread {
- my($self, $thread) = @_;
- $self->{thread} = $thread if defined $thread;
- return $self->{thread};
-}
-sub body {
- my($self, $body) = @_;
- $self->{body} = $body if defined $body;
- return $self->{body};
-}
-sub status {
- my($self, $status) = @_;
- $self->{status} = $status if defined $status;
- return $self->{status};
-}
-sub type {
- my($self, $type) = @_;
- $self->{type} = $type if defined $type;
- return $self->{type};
-}
-sub err_type {
- my($self, $err_type) = @_;
- $self->{err_type} = $err_type if defined $err_type;
- return $self->{err_type};
-}
-sub err_code {
- my($self, $err_code) = @_;
- $self->{err_code} = $err_code if defined $err_code;
- return $self->{err_code};
-}
-sub osrf_xid {
- my($self, $osrf_xid) = @_;
- $self->{osrf_xid} = $osrf_xid if defined $osrf_xid;
- return $self->{osrf_xid};
-}
-sub router_command {
- my($self, $router_command) = @_;
- $self->{router_command} = $router_command if defined $router_command;
- return $self->{router_command};
-}
-sub router_class {
- my($self, $router_class) = @_;
- $self->{router_class} = $router_class if defined $router_class;
- return $self->{router_class};
-}
-
-
-sub to_xml {
- my $self = shift;
-
- my $body = $self->{body};
- $body =~ s/&/&/sog;
- $body =~ s/</</sog;
- $body =~ s/>/>/sog;
-
- return sprintf(
- JABBER_MESSAGE,
- $self->{to},
- $self->{from},
- $self->{router_command},
- $self->{router_class},
- $self->{osrf_xid},
- $self->{thread},
- $body
- );
-}
-
-sub parse_xml {
- my($self, $xml) = @_;
- my($doc, $err);
-
- try {
- $doc = XML::LibXML->new->parse_string($xml);
- } catch Error with {
- my $err = shift;
- $logger->error("Error parsing message xml: $xml --- $err");
- };
- throw $err if $err;
-
- my $root = $doc->documentElement;
- my $osrf_node = $root->findnodes('/message/opensrf')->shift;
-
- $self->{body} = $root->findnodes('/message/body').'';
- $self->{thread} = $root->findnodes('/message/thread').'';
-
- $self->{from} = $osrf_node->getAttribute('router_from');
- $self->{from} = $root->getAttribute('from') unless $self->{from};
-
- $self->{to} = $root->getAttribute('to');
-
- $self->{type} = $root->getAttribute('type');
- $self->{osrf_xid} = $osrf_node->getAttribute('osrf_xid');
-}
-
-
-1;
+++ /dev/null
-package OpenSRF::Transport::SlimJabber::XMPPReader;
-use strict; use warnings;
-use XML::Parser;
-use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
-use Time::HiRes qw/time/;
-use OpenSRF::Transport::SlimJabber::XMPPMessage;
-use OpenSRF::Utils::Logger qw/$logger/;
-use OpenSRF::EX;
-
-# -----------------------------------------------------------
-# Connect, disconnect, and authentication messsage templates
-# -----------------------------------------------------------
-use constant JABBER_CONNECT =>
- "<stream:stream to='%s' xmlns='jabber:client' xmlns:stream='http://etherx.jabber.org/streams'>";
-
-use constant JABBER_BASIC_AUTH =>
- "<iq id='123' type='set'><query xmlns='jabber:iq:auth'>" .
- "<username>%s</username><password>%s</password><resource>%s</resource></query></iq>";
-
-use constant JABBER_DISCONNECT => "</stream:stream>";
-
-
-# -----------------------------------------------------------
-# XMPP Stream states
-# -----------------------------------------------------------
-use constant DISCONNECTED => 1;
-use constant CONNECT_RECV => 2;
-use constant CONNECTED => 3;
-
-
-# -----------------------------------------------------------
-# XMPP Message states
-# -----------------------------------------------------------
-use constant IN_NOTHING => 1;
-use constant IN_BODY => 2;
-use constant IN_THREAD => 3;
-use constant IN_STATUS => 4;
-
-
-# -----------------------------------------------------------
-# Constructor, getter/setters
-# -----------------------------------------------------------
-sub new {
- my $class = shift;
- my $socket = shift;
-
- my $self = bless({}, $class);
-
- $self->{queue} = [];
- $self->{stream_state} = DISCONNECTED;
- $self->{xml_state} = IN_NOTHING;
- $self->socket($socket);
-
- my $p = new XML::Parser(Handlers => {
- Start => \&start_element,
- End => \&end_element,
- Char => \&characters,
- });
-
- $self->parser($p->parse_start); # create a push parser
- $self->parser->{_parent_} = $self;
- $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new;
- return $self;
-}
-
-sub push_msg {
- my($self, $msg) = @_;
- push(@{$self->{queue}}, $msg) if $msg;
-}
-
-sub next_msg {
- my $self = shift;
- return shift @{$self->{queue}};
-}
-
-sub peek_msg {
- my $self = shift;
- return (@{$self->{queue}} > 0);
-}
-
-sub parser {
- my($self, $parser) = @_;
- $self->{parser} = $parser if $parser;
- return $self->{parser};
-}
-
-sub socket {
- my($self, $socket) = @_;
- $self->{socket} = $socket if $socket;
- return $self->{socket};
-}
-
-sub stream_state {
- my($self, $stream_state) = @_;
- $self->{stream_state} = $stream_state if $stream_state;
- return $self->{stream_state};
-}
-
-sub xml_state {
- my($self, $xml_state) = @_;
- $self->{xml_state} = $xml_state if $xml_state;
- return $self->{xml_state};
-}
-
-sub message {
- my($self, $message) = @_;
- $self->{message} = $message if $message;
- return $self->{message};
-}
-
-
-# -----------------------------------------------------------
-# Stream and connection handling methods
-# -----------------------------------------------------------
-
-sub connect {
- my($self, $domain, $username, $password, $resource) = @_;
-
- $self->send(sprintf(JABBER_CONNECT, $domain));
- $self->wait(10);
-
- unless($self->{stream_state} == CONNECT_RECV) {
- $logger->error("No initial XMPP response from server");
- return 0;
- }
-
- $self->send(sprintf(JABBER_BASIC_AUTH, $username, $password, $resource));
- $self->wait(10);
-
- unless($self->connected) {
- $logger->error('XMPP connect failed');
- return 0;
- }
-
- return 1;
-}
-
-sub disconnect {
- my $self = shift;
- return unless $self->socket;
- if($self->tcp_connected) {
- $self->send(JABBER_DISCONNECT);
- shutdown($self->socket, 2);
- }
- close($self->socket);
-}
-
-# -----------------------------------------------------------
-# returns true if this stream is connected to the server
-# -----------------------------------------------------------
-sub connected {
- my $self = shift;
- return ($self->tcp_connected and $self->{stream_state} == CONNECTED);
-}
-
-# -----------------------------------------------------------
-# returns true if the socket is connected
-# -----------------------------------------------------------
-sub tcp_connected {
- my $self = shift;
- return ($self->socket and $self->socket->connected);
-}
-
-# -----------------------------------------------------------
-# sends pre-formated XML
-# -----------------------------------------------------------
-sub send {
- my($self, $xml) = @_;
-
- local $SIG{'PIPE'} = sub {
- $logger->error("Disconnected from Jabber server, exiting immediately");
- exit(99);
- };
- $self->{socket}->print($xml);
-}
-
-# -----------------------------------------------------------
-# Puts a file handle into blocking mode
-# -----------------------------------------------------------
-sub set_block {
- my $fh = shift;
- my $flags = fcntl($fh, F_GETFL, 0);
- $flags &= ~O_NONBLOCK;
- fcntl($fh, F_SETFL, $flags);
-}
-
-
-# -----------------------------------------------------------
-# Puts a file handle into non-blocking mode
-# -----------------------------------------------------------
-sub set_nonblock {
- my $fh = shift;
- my $flags = fcntl($fh, F_GETFL, 0);
- fcntl($fh, F_SETFL, $flags | O_NONBLOCK);
-}
-
-
-sub wait {
- my($self, $timeout) = @_;
-
- return $self->next_msg if $self->peek_msg;
-
- $timeout ||= 0;
- $timeout = undef if $timeout < 0;
- my $socket = $self->{socket};
-
- set_block($socket);
-
- # build the select readset
- my $infile = '';
- vec($infile, $socket->fileno, 1) = 1;
-
- my $nfound;
- if (!OpenSRF->OSRF_APACHE_REQUEST_OBJ || $timeout <= 1.0) {
- $nfound = select($infile, undef, undef, $timeout);
- } else {
- $timeout -= 1.0;
- for (
- my $sleep = 1.0;
- $timeout >= 0.0;
- do {
- $sleep = $timeout < 1.0 ? $timeout : 1.0;
- $timeout -= 1.0;
- }
- ) {
- $nfound = select($infile, undef, undef, $sleep);
- last if $nfound;
- if (
- OpenSRF->OSRF_APACHE_REQUEST_OBJ &&
- OpenSRF->OSRF_APACHE_REQUEST_OBJ->connection->aborted
- ) {
- # Should this be more severe? Die or throw error?
- $logger->warn("Upstream Apache client disconnected, aborting.");
- last;
- };
- }
- }
- return undef if !$nfound or $nfound == -1;
-
- # now slurp the data off the socket
- my $buf;
- my $read_size = 1024;
- my $nonblock = 0;
- my $nbytes;
- my $first_read = 1;
-
- while($nbytes = sysread($socket, $buf, $read_size)) {
- $self->{parser}->parse_more($buf) if $buf;
- if($nbytes < $read_size or $self->peek_msg) {
- set_block($socket) if $nonblock;
- last;
- }
- set_nonblock($socket) unless $nonblock;
- $nonblock = 1;
- $first_read = 0;
- }
-
- if ($first_read and defined $nbytes and $nbytes == 0) {
- # if the first read on an active socket is 0 bytes,
- # the socket has been disconnected from the remote end.
- $self->{stream_state} = DISCONNECTED;
- $logger->error("Disconnected from Jabber server");
- throw OpenSRF::EX::Jabber("Disconnected from Jabber server");
- }
-
- return $self->next_msg;
-}
-
-# -----------------------------------------------------------
-# Waits up to timeout seconds for a fully-formed XMPP
-# message to arrive. If timeout is < 0, waits indefinitely
-# -----------------------------------------------------------
-sub wait_msg {
- my($self, $timeout) = @_;
- my $xml;
-
- $timeout = 0 unless defined $timeout;
-
- if($timeout < 0) {
- while(1) {
- return $xml if $xml = $self->wait($timeout);
- }
-
- } else {
- while($timeout >= 0) {
- my $start = time;
- return $xml if $xml = $self->wait($timeout);
- $timeout -= time - $start;
- }
- }
-
- return undef;
-}
-
-
-# -----------------------------------------------------------
-# SAX Handlers
-# -----------------------------------------------------------
-
-
-sub start_element {
- my($parser, $name, %attrs) = @_;
- my $self = $parser->{_parent_};
-
- if($name eq 'message') {
-
- my $msg = $self->{message};
- $msg->{to} = $attrs{'to'};
- $msg->{from} = $attrs{from};
- $msg->{type} = $attrs{type};
-
- } elsif($name eq 'opensrf') {
-
- # These will be authoritative if they exist
- my $msg = $self->{message};
- $msg->{from} = $attrs{router_from} if $attrs{router_from};
- $msg->{osrf_xid} = $attrs{'osrf_xid'};
-
- } elsif($name eq 'body') {
- $self->{xml_state} = IN_BODY;
-
- } elsif($name eq 'thread') {
- $self->{xml_state} = IN_THREAD;
-
- } elsif($name eq 'stream:stream') {
- $self->{stream_state} = CONNECT_RECV;
-
- } elsif($name eq 'iq') {
- if($attrs{type} and $attrs{type} eq 'result') {
- $self->{stream_state} = CONNECTED;
- }
-
- } elsif($name eq 'status') {
- $self->{xml_state } = IN_STATUS;
-
- } elsif($name eq 'stream:error') {
- $self->{stream_state} = DISCONNECTED;
-
- } elsif($name eq 'error') {
- $self->{message}->{err_type} = $attrs{'type'};
- $self->{message}->{err_code} = $attrs{'code'};
- }
-}
-
-sub characters {
- my($parser, $chars) = @_;
- my $self = $parser->{_parent_};
- my $state = $self->{xml_state};
-
- if($state == IN_BODY) {
- $self->{message}->{body} .= $chars;
-
- } elsif($state == IN_THREAD) {
- $self->{message}->{thread} .= $chars;
-
- } elsif($state == IN_STATUS) {
- $self->{message}->{status} .= $chars;
- }
-}
-
-sub end_element {
- my($parser, $name) = @_;
- my $self = $parser->{_parent_};
- $self->{xml_state} = IN_NOTHING;
-
- if($name eq 'message') {
- $self->push_msg($self->{message});
- $self->{message} = OpenSRF::Transport::SlimJabber::XMPPMessage->new;
-
- } elsif($name eq 'stream:stream') {
- $self->{stream_state} = DISCONNECTED;
- }
-}
-
-
-# read all the data on the jabber socket through the
-# parser and drop the resulting message
-sub flush_socket {
- my $self = shift;
- return 0 unless $self->connected;
-
- while ($self->wait(0)) {
- # TODO remove this log line
- $logger->info("flushing data from socket...");
- }
-
- return $self->connected;
-}
-
-
-
-1;
-
#!perl -T
-use Test::More tests => 9;
+use Test::More tests => 7;
BEGIN {
use_ok( 'OpenSRF::Transport' );
use_ok( 'OpenSRF::Transport::Listener' );
use_ok( 'OpenSRF::Transport::PeerHandle' );
-use_ok( 'OpenSRF::Transport::SlimJabber' );
-use_ok( 'OpenSRF::Transport::SlimJabber::Client' );
-use_ok( 'OpenSRF::Transport::SlimJabber::MessageWrapper' );
-use_ok( 'OpenSRF::Transport::SlimJabber::PeerConnection' );
-use_ok( 'OpenSRF::Transport::SlimJabber::XMPPMessage' );
-use_ok( 'OpenSRF::Transport::SlimJabber::XMPPReader' );
+use_ok( 'OpenSRF::Transport::Redis::Client' );
+use_ok( 'OpenSRF::Transport::Redis::Message' );
+use_ok( 'OpenSRF::Transport::Redis::PeerConnection' );
+use_ok( 'OpenSRF::Transport::Redis::BusConnection' );