perl websocket client WIP
authorBill Erickson <berickxx@gmail.com>
Mon, 7 Mar 2016 16:47:43 +0000 (11:47 -0500)
committerBill Erickson <berickxx@gmail.com>
Mon, 7 Mar 2016 16:47:43 +0000 (11:47 -0500)
Signed-off-by: Bill Erickson <berickxx@gmail.com>
src/extras/perl-websocket-client.pl
src/perl/lib/OpenSRF/Transport/WebSockets.pm [new file with mode: 0644]
src/perl/lib/OpenSRF/Transport/WebSockets/Client.pm [new file with mode: 0644]
src/perl/lib/OpenSRF/Transport/WebSockets/Message.pm [new file with mode: 0644]

index be06d88..2931bf6 100755 (executable)
@@ -1,15 +1,22 @@
 #!/usr/bin/perl
 use strict;
 use warnings;
-use IO::Socket::SSL; # for params
-use OpenSRF::Utils::WebSocketsClient;
+use OpenSRF::System;                                                           
+use OpenSRF::AppSession;
 use OpenSRF::DomainObject::oilsMethod;
 use OpenSRF::DomainObject::oilsMessage;
+use OpenSRF::Transport::WebSockets::Client;
+
+my $osrf_config = '/openils/conf/opensrf_core.xml';
+
+# note: for now this still connects to jabber.
+OpenSRF::System->bootstrap_client(config_file => $osrf_config);                
 
 my $method = OpenSRF::DomainObject::oilsMethod->new(
     method => 'opensrf.system.echo',
     params => ['hello, world']
 );
+
 my $msg = OpenSRF::DomainObject::oilsMessage->new(
     type => 'REQUEST',
     api_level => 1,
@@ -19,24 +26,24 @@ my $msg = OpenSRF::DomainObject::oilsMessage->new(
 );
 
 # connects to localhost by default
-my $client = OpenSRF::Utils::WebSocketsClient->new(
-    ssl_params => { # testing only
-        SSL_verify_mode => SSL_VERIFY_NONE
-    }
+my $client = OpenSRF::Transport::WebSockets::Client->new(
+    'no-op',
+    host => 'localhost',
+    port => 7682,
+    ssl_no_verify => 1,
 );
 
 die "cound not connect\n" unless $client->connect;
 
-$client->send({
-    service => 'open-ils.auth',
+$client->send(
+    to => 'open-ils.auth',
     thread => rand(),
-    osrf_msg => [$msg]
-});
+    msg => [$msg]
+);
 
 my $response = $client->recv(-1);
-my $messages = $response->{osrf_msg};
 
-for my $msg (@$messages) {
+for my $msg (@{$response->{msg}}) {
     print "received: " . 
         OpenSRF::Utils::JSON->perl2JSON($msg) . "\n\n";
 }
diff --git a/src/perl/lib/OpenSRF/Transport/WebSockets.pm b/src/perl/lib/OpenSRF/Transport/WebSockets.pm
new file mode 100644 (file)
index 0000000..4754e65
--- /dev/null
@@ -0,0 +1,12 @@
+package OpenSRF::Transport::WebSockets;
+use base qw/OpenSRF::Transport/;
+
+sub get_peer_client { 
+    return "OpenSRF::Transport::WebSockets::Client"; 
+}
+
+sub get_msg_envelope { 
+    return "OpenSRF::Transport::WebSockets::Message"; 
+}
+
+1;
diff --git a/src/perl/lib/OpenSRF/Transport/WebSockets/Client.pm b/src/perl/lib/OpenSRF/Transport/WebSockets/Client.pm
new file mode 100644 (file)
index 0000000..587aa98
--- /dev/null
@@ -0,0 +1,278 @@
+package OpenSRF::Transport::WebSockets::Client;
+# -------------------------------------------------------------
+# Copyright (C) 2016, King County Library System
+# Bill Erickson <berickxx@gmail.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+# -------------------------------------------------------------
+use strict;
+use warnings;
+use IO::Socket::SSL;
+use Protocol::WebSocket::Client;
+use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK);
+use OpenSRF::Utils::Config;
+use OpenSRF::Utils::Logger qw/$logger/;
+use OpenSRF::Transport::WebSockets::Message;
+use Data::Dumper;
+$Data::Dumper::INDENT=0;
+
+
+our $connection;
+
+sub retrieve { 
+    return $connection;
+}
+
+sub reset {
+    return unless $connection;
+    $connection->disconnect;
+    $connection = undef;
+}
+
+
+sub new {
+    my ($class, $app, %args) = @_;
+
+    return $connection if 
+        $connection && $connection->{socket}->connected;
+
+    my $conf = OpenSRF::Utils::Config->current;
+
+    my $self = {
+        service => $app,
+        host => $args{host} || $conf->bootstrap->domain || 'localhost',
+        port => $args{port} || $conf->bootstrap->domain || 7682,
+        path => $args{path} || $conf->bootstrap->path || '/osrf-websocket-translator',
+        connected => 0,
+        responses => [],
+        ssl_params => {}
+    };
+
+    $self->{ssl_params} = {SSL_verify_mode => SSL_VERIFY_NONE}
+        if ($args{ssl_no_verify} || $conf->bootstrap->ssl_no_verify);
+
+    $connection = bless($self, $class);
+    return $connection;
+}
+
+
+sub connect {
+    my $self = shift;
+
+    $self->connect_socket or return undef;
+
+    my $url = sprintf("wss://%s:%d", $self->{host}, $self->{port});
+    $url .= '/' . $self->{path} if $self->{path};
+
+    my $client = Protocol::WebSocket::Client->new(
+        url => $url,
+        on_write => sub {
+            my ($client, $data) = @_;
+            $self->{socket}->print($data);
+        },
+        on_connect => sub {
+            my $client = shift;
+            $self->{connected} = 1;
+        },
+        on_error => sub {
+            my ($client, $error) = @_;
+            $logger->error("WebSockets error: $error");
+            $self->disconnect;
+        }
+    );
+
+    $client->on(
+        # add resposnes to the response queue
+        read => sub {
+            my ($client, $data) = @_;
+            push(@{$self->{responses}}, 
+                OpenSRF::Transport::WebSockets::Message->new(json => $data));
+        }
+    );
+
+    $self->{client} = $client; # needed in connect/recv callback
+
+    $client->connect;
+
+    while (!$self->{connected}) { $self->recv(-1) }
+
+    return 1;
+}
+
+sub connect_socket {
+    my $self = shift;
+
+    my $sock_args = {
+        PeerHost => $self->{host},
+        PeerPort => int($self->{port}),
+        Proto    => 'tcp',
+        %{$self->{ssl_params}}
+    };
+
+    $logger->debug("WebSocket socket args: " . Dumper($sock_args));
+    warn "WebSocket socket args: " . Dumper($sock_args) . "\n";
+
+    my $sock = IO::Socket::SSL->new(%$sock_args);
+
+    return $self->{socket} = $sock if $sock && $sock->connected;
+
+    warn "SSL socket connection failed to ".
+        $self->{host}.":".$self->{port}." => $!\n";
+
+    return undef;
+}
+
+
+sub initialize {
+    my $self = shift;
+    return $self->connect;
+}
+
+sub construct {
+    my ($class, $service) = @_;
+       $class->peer_handle($class->new($service)->initialize);
+}
+
+sub app {
+    my $self = shift;
+    my $app = shift;
+    $self->{service} = $app if $app;
+    return $self->{service};
+}
+
+
+# true if a connection exists and the TCP port is open.
+sub tcp_connected {
+    return 
+        $connection && 
+        $connection->{socket} &&
+        $connection->{socket}->connected;
+}
+
+# true if we have completed the websockets handshake w/ the server
+sub connected {
+    my $self = shift;
+    return $self->tcp_connected && $self->{connected};
+}
+
+# returns a message if one is availabe or becomes available 
+# within the timeout provided.  A timeout value of 0 or undef
+# means check for messages, but don't wait.  A timeout of -1 means
+# wait until a message arrives.  A timeout of > 0 means to wait 
+# the many seconds for a message to arrive.
+sub process {
+    my $self = shift;
+    return $self->recv(@_);
+}
+
+sub next_msg {
+    my $self = shift;
+    return shift @{$self->{responses}};
+}
+
+sub peek_msg {
+    my $self = shift;
+    return (@{$self->{responses}} > 0);
+}
+
+# -----------------------------------------------------------
+# Puts a file handle into blocking mode
+# -----------------------------------------------------------
+sub set_block {
+    my $fh = shift;
+    my  $flags = fcntl($fh, F_GETFL, 0);
+    $flags &= ~O_NONBLOCK;
+    fcntl($fh, F_SETFL, $flags);
+}
+
+
+# -----------------------------------------------------------
+# Puts a file handle into non-blocking mode
+# -----------------------------------------------------------
+sub set_nonblock {
+    my $fh = shift;
+    my  $flags = fcntl($fh, F_GETFL, 0);
+    fcntl($fh, F_SETFL, $flags | O_NONBLOCK);
+}
+
+sub recv {
+    my ($self, $timeout) = @_;
+    return $self->next_msg if $self->peek_msg;
+
+    $timeout ||= 0;
+    $timeout = undef if $timeout < 0;
+    my $socket = $self->{socket};
+
+    set_block($socket);
+    
+    # build the select readset
+    my $infile = '';
+    vec($infile, $socket->fileno, 1) = 1;
+
+    my $nfound = select($infile, undef, undef, $timeout);
+    return undef if !$nfound or $nfound == -1;
+
+    # now slurp the data off the socket
+    my $buf;
+    my $read_size = 1024;
+    my $nonblock = 0;
+    my $nbytes;
+    my $first_read = 1;
+
+    while($nbytes = sysread($socket, $buf, $read_size)) {
+        $self->{client}->read($buf);
+        if($nbytes < $read_size or $self->peek_msg) {
+            set_block($socket) if $nonblock;
+            last;
+        }
+        set_nonblock($socket) unless $nonblock;
+        $nonblock = 1;
+        $first_read = 0;
+    }
+
+    if ($first_read and defined $nbytes and $nbytes == 0) {
+        # if the first read on an active socket is 0 bytes, 
+        # the socket has been disconnected from the remote end. 
+        die "Disconnected from WebSockets server\n";
+    }
+
+    return $self->next_msg;
+}
+
+
+sub send {
+       my $self = shift;
+    my $msg = OpenSRF::Transport::WebSockets::Message->new(@_);
+
+    $msg->osrf_xid($logger->get_osrf_xid);
+    my $json = $msg->serialize;
+
+    if (my $warn_size = $self->{msg_size_warn}) {
+        use bytes; 
+        my $len = length($json);
+        $logger->warn("Sending large message of $len bytes")
+            if $len >= $warn_size;
+    }
+
+    $self->{client}->write($json);
+}
+
+sub disconnect {
+    my $self = shift;
+    $self->{connected} = 0;
+    return unless $self->{client};
+    $self->{client}->disconnect;
+    delete $self->{client};
+}
+
+
+1;
+
diff --git a/src/perl/lib/OpenSRF/Transport/WebSockets/Message.pm b/src/perl/lib/OpenSRF/Transport/WebSockets/Message.pm
new file mode 100644 (file)
index 0000000..1078028
--- /dev/null
@@ -0,0 +1,136 @@
+package OpenSRF::Transport::WebSockets::Message;
+# -------------------------------------------------------------
+# Copyright (C) 2016, King County Library System
+# Bill Erickson <berickxx@gmail.com>
+#
+# This program is free software; you can redistribute it and/or
+# modify it under the terms of the GNU General Public License
+# as published by the Free Software Foundation; either version 2
+# of the License, or (at your option) any later version.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
+# GNU General Public License for more details.
+# -------------------------------------------------------------
+use strict; 
+use warnings;
+use OpenSRF::Utils::JSON;
+
+sub new {
+    my $class = shift;
+    warn "Message: @_\n";
+    my %args = @_;
+    my $self = bless({}, $class);
+
+    if ($args{json}) {
+        $self->parse_json($args{json});
+    } else {
+        $self->{to}     = $args{to};
+        $self->{body}   = $args{body}; # oilsMessage array as JSON
+        $self->{msg}    = $args{msg}; # oilsMessage array as objects
+        $self->{thread} = $args{thread} || $$ . rand();
+    }
+
+    return $self;
+}
+
+sub to {
+    my ($self, $to) = @_;
+    $self->{to} = $to if defined $to;
+    return $self->{to};
+}
+
+sub thread {
+    my ($self, $thread) = @_;
+    $self->{thread} = $thread if defined $thread;
+    return $self->{thread};
+}
+
+# contains a JSON-ified oilsMessage
+sub body {
+    my ($self, $body) = @_;
+    if (defined $body) {
+        $self->{body} = $body;
+    } elsif (!$self->{body} && $self->{msg}) {
+        $self->{body} = OpenSRF::Utils::JSON->perl2JSON($self->{msg});
+    }
+    return $self->{body};
+}
+
+sub osrf_xid {
+    my($self, $osrf_xid) = @_;
+    $self->{osrf_xid} = $osrf_xid if defined $osrf_xid;
+    return $self->{osrf_xid};
+}
+
+sub serialize {
+    my $self = shift;
+    return OpenSRF::Utils::JSON->perl2JSON({
+        service => $self->{to},
+        thread => $self->{thread},
+        osrf_msg => 
+            # to JSON-ify this message, must first un-JSON-ify the body.
+            # TODO: teach the upper layers to avoid unnecessary 
+            # JSON translations.
+            $self->{msg} ? $self->{msg} :
+                OpenSRF::Utils::JSON->JSON2perl($self->{body})
+    });
+}
+
+sub parse_json {
+    my ($self, $json) = @_;
+    my $blob = OpenSRF::Utils::JSON->JSON2perl($json);
+    $self->{to} = $blob->{service};
+    $self->{thread} = $blob->{thread};
+    # NOTE: see TODO above re: extra JSON translations
+    $self->{msg} = $blob->{osrf_msg};
+}
+
+sub msg {
+    my($self, $msg) = @_;
+    $self->{msg} = $msg if $msg;
+    return $self->{msg};
+}
+
+# TODO: these came from MessageWrapper... do we need any of these?
+
+sub toString {
+}
+
+sub get_body {
+}
+
+sub get_sess_id {
+}
+
+sub get_msg_type {
+}
+
+sub get_remote_id {
+}
+
+sub setType {
+}
+
+sub setTo {
+}
+
+sub setThread {
+}
+
+sub setBody {
+}
+
+sub set_router_command {
+}
+sub set_router_class {
+}
+
+sub set_osrf_xid {
+}
+
+sub get_osrf_xid {
+}
+
+1;