From 922c75b25da67efaaed10c2885b06edc5fc1a426 Mon Sep 17 00:00:00 2001 From: Bill Erickson Date: Mon, 7 Mar 2016 11:47:43 -0500 Subject: [PATCH] perl websocket client WIP Signed-off-by: Bill Erickson --- src/extras/perl-websocket-client.pl | 31 ++- src/perl/lib/OpenSRF/Transport/WebSockets.pm | 12 + .../lib/OpenSRF/Transport/WebSockets/Client.pm | 278 +++++++++++++++++++++ .../lib/OpenSRF/Transport/WebSockets/Message.pm | 136 ++++++++++ 4 files changed, 445 insertions(+), 12 deletions(-) create mode 100644 src/perl/lib/OpenSRF/Transport/WebSockets.pm create mode 100644 src/perl/lib/OpenSRF/Transport/WebSockets/Client.pm create mode 100644 src/perl/lib/OpenSRF/Transport/WebSockets/Message.pm diff --git a/src/extras/perl-websocket-client.pl b/src/extras/perl-websocket-client.pl index be06d88..2931bf6 100755 --- a/src/extras/perl-websocket-client.pl +++ b/src/extras/perl-websocket-client.pl @@ -1,15 +1,22 @@ #!/usr/bin/perl use strict; use warnings; -use IO::Socket::SSL; # for params -use OpenSRF::Utils::WebSocketsClient; +use OpenSRF::System; +use OpenSRF::AppSession; use OpenSRF::DomainObject::oilsMethod; use OpenSRF::DomainObject::oilsMessage; +use OpenSRF::Transport::WebSockets::Client; + +my $osrf_config = '/openils/conf/opensrf_core.xml'; + +# note: for now this still connects to jabber. +OpenSRF::System->bootstrap_client(config_file => $osrf_config); my $method = OpenSRF::DomainObject::oilsMethod->new( method => 'opensrf.system.echo', params => ['hello, world'] ); + my $msg = OpenSRF::DomainObject::oilsMessage->new( type => 'REQUEST', api_level => 1, @@ -19,24 +26,24 @@ my $msg = OpenSRF::DomainObject::oilsMessage->new( ); # connects to localhost by default -my $client = OpenSRF::Utils::WebSocketsClient->new( - ssl_params => { # testing only - SSL_verify_mode => SSL_VERIFY_NONE - } +my $client = OpenSRF::Transport::WebSockets::Client->new( + 'no-op', + host => 'localhost', + port => 7682, + ssl_no_verify => 1, ); die "cound not connect\n" unless $client->connect; -$client->send({ - service => 'open-ils.auth', +$client->send( + to => 'open-ils.auth', thread => rand(), - osrf_msg => [$msg] -}); + msg => [$msg] +); my $response = $client->recv(-1); -my $messages = $response->{osrf_msg}; -for my $msg (@$messages) { +for my $msg (@{$response->{msg}}) { print "received: " . OpenSRF::Utils::JSON->perl2JSON($msg) . "\n\n"; } diff --git a/src/perl/lib/OpenSRF/Transport/WebSockets.pm b/src/perl/lib/OpenSRF/Transport/WebSockets.pm new file mode 100644 index 0000000..4754e65 --- /dev/null +++ b/src/perl/lib/OpenSRF/Transport/WebSockets.pm @@ -0,0 +1,12 @@ +package OpenSRF::Transport::WebSockets; +use base qw/OpenSRF::Transport/; + +sub get_peer_client { + return "OpenSRF::Transport::WebSockets::Client"; +} + +sub get_msg_envelope { + return "OpenSRF::Transport::WebSockets::Message"; +} + +1; diff --git a/src/perl/lib/OpenSRF/Transport/WebSockets/Client.pm b/src/perl/lib/OpenSRF/Transport/WebSockets/Client.pm new file mode 100644 index 0000000..587aa98 --- /dev/null +++ b/src/perl/lib/OpenSRF/Transport/WebSockets/Client.pm @@ -0,0 +1,278 @@ +package OpenSRF::Transport::WebSockets::Client; +# ------------------------------------------------------------- +# Copyright (C) 2016, King County Library System +# Bill Erickson +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# ------------------------------------------------------------- +use strict; +use warnings; +use IO::Socket::SSL; +use Protocol::WebSocket::Client; +use Fcntl qw(F_GETFL F_SETFL O_NONBLOCK); +use OpenSRF::Utils::Config; +use OpenSRF::Utils::Logger qw/$logger/; +use OpenSRF::Transport::WebSockets::Message; +use Data::Dumper; +$Data::Dumper::INDENT=0; + + +our $connection; + +sub retrieve { + return $connection; +} + +sub reset { + return unless $connection; + $connection->disconnect; + $connection = undef; +} + + +sub new { + my ($class, $app, %args) = @_; + + return $connection if + $connection && $connection->{socket}->connected; + + my $conf = OpenSRF::Utils::Config->current; + + my $self = { + service => $app, + host => $args{host} || $conf->bootstrap->domain || 'localhost', + port => $args{port} || $conf->bootstrap->domain || 7682, + path => $args{path} || $conf->bootstrap->path || '/osrf-websocket-translator', + connected => 0, + responses => [], + ssl_params => {} + }; + + $self->{ssl_params} = {SSL_verify_mode => SSL_VERIFY_NONE} + if ($args{ssl_no_verify} || $conf->bootstrap->ssl_no_verify); + + $connection = bless($self, $class); + return $connection; +} + + +sub connect { + my $self = shift; + + $self->connect_socket or return undef; + + my $url = sprintf("wss://%s:%d", $self->{host}, $self->{port}); + $url .= '/' . $self->{path} if $self->{path}; + + my $client = Protocol::WebSocket::Client->new( + url => $url, + on_write => sub { + my ($client, $data) = @_; + $self->{socket}->print($data); + }, + on_connect => sub { + my $client = shift; + $self->{connected} = 1; + }, + on_error => sub { + my ($client, $error) = @_; + $logger->error("WebSockets error: $error"); + $self->disconnect; + } + ); + + $client->on( + # add resposnes to the response queue + read => sub { + my ($client, $data) = @_; + push(@{$self->{responses}}, + OpenSRF::Transport::WebSockets::Message->new(json => $data)); + } + ); + + $self->{client} = $client; # needed in connect/recv callback + + $client->connect; + + while (!$self->{connected}) { $self->recv(-1) } + + return 1; +} + +sub connect_socket { + my $self = shift; + + my $sock_args = { + PeerHost => $self->{host}, + PeerPort => int($self->{port}), + Proto => 'tcp', + %{$self->{ssl_params}} + }; + + $logger->debug("WebSocket socket args: " . Dumper($sock_args)); + warn "WebSocket socket args: " . Dumper($sock_args) . "\n"; + + my $sock = IO::Socket::SSL->new(%$sock_args); + + return $self->{socket} = $sock if $sock && $sock->connected; + + warn "SSL socket connection failed to ". + $self->{host}.":".$self->{port}." => $!\n"; + + return undef; +} + + +sub initialize { + my $self = shift; + return $self->connect; +} + +sub construct { + my ($class, $service) = @_; + $class->peer_handle($class->new($service)->initialize); +} + +sub app { + my $self = shift; + my $app = shift; + $self->{service} = $app if $app; + return $self->{service}; +} + + +# true if a connection exists and the TCP port is open. +sub tcp_connected { + return + $connection && + $connection->{socket} && + $connection->{socket}->connected; +} + +# true if we have completed the websockets handshake w/ the server +sub connected { + my $self = shift; + return $self->tcp_connected && $self->{connected}; +} + +# returns a message if one is availabe or becomes available +# within the timeout provided. A timeout value of 0 or undef +# means check for messages, but don't wait. A timeout of -1 means +# wait until a message arrives. A timeout of > 0 means to wait +# the many seconds for a message to arrive. +sub process { + my $self = shift; + return $self->recv(@_); +} + +sub next_msg { + my $self = shift; + return shift @{$self->{responses}}; +} + +sub peek_msg { + my $self = shift; + return (@{$self->{responses}} > 0); +} + +# ----------------------------------------------------------- +# Puts a file handle into blocking mode +# ----------------------------------------------------------- +sub set_block { + my $fh = shift; + my $flags = fcntl($fh, F_GETFL, 0); + $flags &= ~O_NONBLOCK; + fcntl($fh, F_SETFL, $flags); +} + + +# ----------------------------------------------------------- +# Puts a file handle into non-blocking mode +# ----------------------------------------------------------- +sub set_nonblock { + my $fh = shift; + my $flags = fcntl($fh, F_GETFL, 0); + fcntl($fh, F_SETFL, $flags | O_NONBLOCK); +} + +sub recv { + my ($self, $timeout) = @_; + return $self->next_msg if $self->peek_msg; + + $timeout ||= 0; + $timeout = undef if $timeout < 0; + my $socket = $self->{socket}; + + set_block($socket); + + # build the select readset + my $infile = ''; + vec($infile, $socket->fileno, 1) = 1; + + my $nfound = select($infile, undef, undef, $timeout); + return undef if !$nfound or $nfound == -1; + + # now slurp the data off the socket + my $buf; + my $read_size = 1024; + my $nonblock = 0; + my $nbytes; + my $first_read = 1; + + while($nbytes = sysread($socket, $buf, $read_size)) { + $self->{client}->read($buf); + if($nbytes < $read_size or $self->peek_msg) { + set_block($socket) if $nonblock; + last; + } + set_nonblock($socket) unless $nonblock; + $nonblock = 1; + $first_read = 0; + } + + if ($first_read and defined $nbytes and $nbytes == 0) { + # if the first read on an active socket is 0 bytes, + # the socket has been disconnected from the remote end. + die "Disconnected from WebSockets server\n"; + } + + return $self->next_msg; +} + + +sub send { + my $self = shift; + my $msg = OpenSRF::Transport::WebSockets::Message->new(@_); + + $msg->osrf_xid($logger->get_osrf_xid); + my $json = $msg->serialize; + + if (my $warn_size = $self->{msg_size_warn}) { + use bytes; + my $len = length($json); + $logger->warn("Sending large message of $len bytes") + if $len >= $warn_size; + } + + $self->{client}->write($json); +} + +sub disconnect { + my $self = shift; + $self->{connected} = 0; + return unless $self->{client}; + $self->{client}->disconnect; + delete $self->{client}; +} + + +1; + diff --git a/src/perl/lib/OpenSRF/Transport/WebSockets/Message.pm b/src/perl/lib/OpenSRF/Transport/WebSockets/Message.pm new file mode 100644 index 0000000..1078028 --- /dev/null +++ b/src/perl/lib/OpenSRF/Transport/WebSockets/Message.pm @@ -0,0 +1,136 @@ +package OpenSRF::Transport::WebSockets::Message; +# ------------------------------------------------------------- +# Copyright (C) 2016, King County Library System +# Bill Erickson +# +# This program is free software; you can redistribute it and/or +# modify it under the terms of the GNU General Public License +# as published by the Free Software Foundation; either version 2 +# of the License, or (at your option) any later version. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU General Public License for more details. +# ------------------------------------------------------------- +use strict; +use warnings; +use OpenSRF::Utils::JSON; + +sub new { + my $class = shift; + warn "Message: @_\n"; + my %args = @_; + my $self = bless({}, $class); + + if ($args{json}) { + $self->parse_json($args{json}); + } else { + $self->{to} = $args{to}; + $self->{body} = $args{body}; # oilsMessage array as JSON + $self->{msg} = $args{msg}; # oilsMessage array as objects + $self->{thread} = $args{thread} || $$ . rand(); + } + + return $self; +} + +sub to { + my ($self, $to) = @_; + $self->{to} = $to if defined $to; + return $self->{to}; +} + +sub thread { + my ($self, $thread) = @_; + $self->{thread} = $thread if defined $thread; + return $self->{thread}; +} + +# contains a JSON-ified oilsMessage +sub body { + my ($self, $body) = @_; + if (defined $body) { + $self->{body} = $body; + } elsif (!$self->{body} && $self->{msg}) { + $self->{body} = OpenSRF::Utils::JSON->perl2JSON($self->{msg}); + } + return $self->{body}; +} + +sub osrf_xid { + my($self, $osrf_xid) = @_; + $self->{osrf_xid} = $osrf_xid if defined $osrf_xid; + return $self->{osrf_xid}; +} + +sub serialize { + my $self = shift; + return OpenSRF::Utils::JSON->perl2JSON({ + service => $self->{to}, + thread => $self->{thread}, + osrf_msg => + # to JSON-ify this message, must first un-JSON-ify the body. + # TODO: teach the upper layers to avoid unnecessary + # JSON translations. + $self->{msg} ? $self->{msg} : + OpenSRF::Utils::JSON->JSON2perl($self->{body}) + }); +} + +sub parse_json { + my ($self, $json) = @_; + my $blob = OpenSRF::Utils::JSON->JSON2perl($json); + $self->{to} = $blob->{service}; + $self->{thread} = $blob->{thread}; + # NOTE: see TODO above re: extra JSON translations + $self->{msg} = $blob->{osrf_msg}; +} + +sub msg { + my($self, $msg) = @_; + $self->{msg} = $msg if $msg; + return $self->{msg}; +} + +# TODO: these came from MessageWrapper... do we need any of these? + +sub toString { +} + +sub get_body { +} + +sub get_sess_id { +} + +sub get_msg_type { +} + +sub get_remote_id { +} + +sub setType { +} + +sub setTo { +} + +sub setThread { +} + +sub setBody { +} + +sub set_router_command { +} +sub set_router_class { +} + +sub set_osrf_xid { +} + +sub get_osrf_xid { +} + +1; -- 2.11.0