perl websocket client WIP user/berick/perl-websockets-client
authorBill Erickson <berickxx@gmail.com>
Thu, 10 Mar 2016 19:36:45 +0000 (14:36 -0500)
committerBill Erickson <berickxx@gmail.com>
Thu, 10 Mar 2016 19:36:45 +0000 (14:36 -0500)
Signed-off-by: Bill Erickson <berickxx@gmail.com>
src/extras/perl-websocket-client.pl
src/perl/lib/OpenSRF/AppSession.pm
src/perl/lib/OpenSRF/System.pm
src/perl/lib/OpenSRF/Transport/PeerHandle.pm
src/perl/lib/OpenSRF/Transport/WebSockets/Client.pm
src/perl/lib/OpenSRF/Transport/WebSockets/Message.pm

index 8302c17..d9e1278 100755 (executable)
 # -------------------------------------------------------------
 use strict;
 use warnings;
+use OpenSRF::System;
+use OpenSRF::AppSession;
 use OpenSRF::DomainObject::oilsMethod;
 use OpenSRF::DomainObject::oilsMessage;
 use OpenSRF::Transport::WebSockets::Client;
 
+my $osrf_config = '/openils/conf/opensrf_core_ws.xml';
+
+OpenSRF::Transport->message_envelope(
+    "OpenSRF::Transport::WebSockets::Message");
+
+OpenSRF::Transport::PeerHandle->peer_class(
+    "OpenSRF::Transport::WebSockets::Client");
+
+OpenSRF::System->bootstrap_client(config_file => $osrf_config);
+
+my $ses = OpenSRF::AppSession->create('open-ils.auth');
+my $req = $ses->request(
+    'opensrf.system.echo', 'hello, world', "g'night, moon");
+
+while (1) {
+    my $resp;
+    eval { $resp = $req->recv };
+    if ($@) {
+        warn "Request failed: $@\n";
+        last;
+    }
+    last unless $resp;
+    print "Received: " . $resp->content . "\n";
+}
+
+
+
+
+
+
+__DATA__
+
+# direct/manual approach
+
 my $method = OpenSRF::DomainObject::oilsMethod->new(
     method => 'opensrf.system.echo',
     params => ['hello, world']
index 9e371c7..c1760c3 100644 (file)
@@ -207,6 +207,11 @@ sub get_app_targets {
        my $app = shift;
 
        my $conf = OpenSRF::Utils::Config->current;
+
+    # Direct routing bypasses routers and always uses
+    # the top-level service name as the recipient.
+    return $app if $conf->bootstrap->direct_routing;
+
        my $router_name = $conf->bootstrap->router_name || 'router';
        my $domain = $conf->bootstrap->domain;
        $logger->error("use of <domains/> is deprecated") if $conf->bootstrap->domains;
@@ -240,13 +245,7 @@ sub create {
                                
        $logger->debug( "AppSession creating new client session for $app", DEBUG );
 
-       my $stateless = 0;
-       my $c = OpenSRF::Utils::SettingsClient->new();
-       # we can get an infinite loop if we're grabbing the settings and we
-       # need the settings to grab the settings...
-       if($app ne "opensrf.settings" || $c->has_config()) { 
-               $stateless = $c->config_value("apps", $app, "stateless");
-       }
+       my $stateless = 1;
 
        my $sess_id = time . rand( $$ );
        while ( $class->find($sess_id) ) {
index 39aeaf9..3c3f44e 100644 (file)
@@ -32,8 +32,15 @@ sub load_bootstrap_config {
 
     OpenSRF::Utils::Config->load(config_file => $bootstrap_config_file);
     OpenSRF::Utils::JSON->register_class_hint(name => "OpenSRF::Application", hint => "method", type => "hash");
-    OpenSRF::Transport->message_envelope("OpenSRF::Transport::SlimJabber::MessageWrapper");
-    OpenSRF::Transport::PeerHandle->set_peer_client("OpenSRF::Transport::SlimJabber::PeerConnection");
+
+    OpenSRF::Transport->message_envelope(
+        "OpenSRF::Transport::SlimJabber::MessageWrapper")
+        unless OpenSRF::Transport->message_envelope;
+
+    OpenSRF::Transport::PeerHandle->peer_class(
+        "OpenSRF::Transport::SlimJabber::PeerConnection")
+        unless OpenSRF::Transport::PeerHandle->peer_class;
+
     OpenSRF::Application->server_class('client');
     # Read in a shared portion of the config file
     # for later use in log parameter redaction
index e263971..1e163cc 100644 (file)
@@ -5,6 +5,7 @@ use base qw/OpenSRF::Transport::SlimJabber::PeerConnection/;
 use vars '@ISA';
 
 my $peer;
+my $peer_class;
 
 =head2 peer_handle( $handle )
 
@@ -26,15 +27,17 @@ and that module is 'used' and unshifted into @ISA.  We now have that
 classes capabilities.  
 
 =cut
-sub set_peer_client {
-       my( $class, $peer ) = @_;
-       if( $peer ) {
-               $peer->use;
+sub peer_class {
+       my( $class, $peer_cls ) = @_;
+       if( $peer_cls ) {
+               $peer_cls->use;
                if( $@ ) {
                        throw OpenSRF::EX::PANIC ( "Unable to set peer client: $@" );
                }
-               unshift @ISA, $peer;
+               unshift @ISA, $peer_cls;
+        $peer_class = $peer_cls;
        }
+    return $peer_class;
 }
 
 1;
index 8af2f36..5528dfc 100644 (file)
@@ -98,6 +98,8 @@ sub connect {
     my $url = sprintf("wss://%s:%d", $self->{host}, $self->{port});
     $url .= '/' . $self->{path} if $self->{path};
 
+    $logger->info("WebSockets connecting to $url");
+
     my $client = Protocol::WebSocket::Client->new(
         url => $url,
         on_write => sub {
@@ -119,8 +121,11 @@ sub connect {
         # add resposnes to the response queue
         read => sub {
             my ($client, $data) = @_;
-            push(@{$self->{responses}}, 
-                OpenSRF::Transport::WebSockets::Message->new(json => $data));
+            my $msg = 
+                OpenSRF::Transport::WebSockets::Message->new(json => $data);
+            # all received messages come from the service we're talking to.
+            $msg->from($self->app);
+            push(@{$self->{responses}}, $msg);
         }
     );
 
@@ -192,7 +197,9 @@ sub connected {
 # OpenSRF method; checks for received messages.
 sub process {
     my $self = shift;
-    return $self->recv(@_);
+    my $val = $self->recv(@_);
+    return 0 unless $val;
+       return OpenSRF::Transport->handler($self->app, $val);
 }
 
 sub next_msg {
@@ -231,7 +238,9 @@ sub recv {
 
     $timeout ||= 0;
     $timeout = undef if $timeout < 0;
+
     my $socket = $self->{socket};
+    return undef unless $socket;
 
     set_block($socket);
     
@@ -251,7 +260,7 @@ sub recv {
 
     while($nbytes = sysread($socket, $buf, $read_size)) {
         $self->{client}->read($buf);
-        if($nbytes < $read_size or $self->peek_msg) {
+        if($nbytes < $read_size || $self->peek_msg) {
             set_block($socket) if $nonblock;
             last;
         }
index beb5784..cd2d777 100644 (file)
@@ -16,6 +16,7 @@ package OpenSRF::Transport::WebSockets::Message;
 use strict; 
 use warnings;
 use OpenSRF::Utils::JSON;
+use OpenSRF::Utils::Logger qw/$logger/;
 
 sub new {
     my $class = shift;
@@ -26,11 +27,15 @@ sub new {
         $self->parse_json($args{json});
     } else {
         $self->{to}     = $args{to};
+        $self->{from}   = $args{from};
         $self->{body}   = $args{body}; # oilsMessage array as JSON
         $self->{msg}    = $args{msg}; # oilsMessage array as objects
         $self->{thread} = $args{thread} || $$ . rand();
     }
 
+    $logger->debug(
+        "WebSocket::Message building message with thread".$self->{thread});
+
     return $self;
 }
 
@@ -40,6 +45,19 @@ sub to {
     return $self->{to};
 }
 
+sub from {
+    my ($self, $from) = @_;
+    $self->{from} = $from if defined $from;
+    return $self->{from} || ''; # unused by websockets
+}
+
+sub type {
+    my ($self, $type) = @_;
+    $self->{type} = $type if defined $type;
+    return $self->{type} || ''; # unused by websockets
+}
+
+
 sub thread {
     my ($self, $thread) = @_;
     $self->{thread} = $thread if defined $thread;