From 924e960d0a0931e34c372037573df9b0439cb183 Mon Sep 17 00:00:00 2001 From: Mathieu Parent Date: Fri, 9 Apr 2010 14:40:33 +0200 Subject: [PATCH] perl-skinny: enhance test - Use thread for: keepalive, receive and send - Run indefinitevly --- src/mod/endpoints/mod_skinny/Net/Skinny.pm | 36 +++--- .../endpoints/mod_skinny/Net/Skinny/Client.pm | 94 ++++++++++++++ .../mod_skinny/Net/Skinny/Message.pm | 118 ++++++++++++------ .../mod_skinny/Net/Skinny/Protocol.pm | 3 +- src/mod/endpoints/mod_skinny/test-skinny.pl | 12 +- 5 files changed, 199 insertions(+), 64 deletions(-) create mode 100644 src/mod/endpoints/mod_skinny/Net/Skinny/Client.pm diff --git a/src/mod/endpoints/mod_skinny/Net/Skinny.pm b/src/mod/endpoints/mod_skinny/Net/Skinny.pm index e0ae460eac..c4ee1992ad 100644 --- a/src/mod/endpoints/mod_skinny/Net/Skinny.pm +++ b/src/mod/endpoints/mod_skinny/Net/Skinny.pm @@ -6,26 +6,25 @@ package Net::Skinny; use strict; use warnings; -use IO::Socket; + +require IO::Socket; + use Net::Skinny::Protocol qw/:all/; -our(@ISA); -@ISA = qw(IO::Socket::INET); +our @ISA = qw(IO::Socket::INET); sub new { shift->SUPER::new(PeerPort => 2000, @_); } -sub send_data +sub send_raw { my $self = shift; my $type = shift; - my $data = shift; - my $len = length($data)+4; + my $raw = shift; + my $len = length($raw)+4; printf "Sending message (length=%d, type=%s (%X))", $len, Net::Skinny::Protocol::skinny_message_type2str($type), $type; - $self->send( - pack("VVV", $len, 0, $type). - $data); + $self->send(pack("VVV", $len, 0, $type).$raw); printf ".\n"; } @@ -33,11 +32,8 @@ sub send_message { my $self = shift; my $type = shift; - return Net::Skinny::Message->new( - $self, - $type, - @_ - )->send(); + my $message = Net::Skinny::Message->new($type, @_); + return $self->send_raw($message->type(), $message->raw()); } sub receive_message @@ -58,20 +54,26 @@ sub receive_message printf "type=%s (%X))", Net::Skinny::Protocol::skinny_message_type2str($type), $type; if($len > 4) { $self->recv($buf, $len-4); + } else { + $buf = ''; } printf ".\n"; + return Net::Skinny::Message->new_raw($type, $buf); } sub sleep { my $self = shift; my $t = shift; - + my %args = @_; + $args{'quiet'} = 0 if not $args{'quiet'}; printf "Sleeping %d seconds", $t; while(--$t){ sleep(1); - printf "." if $t % 10; - printf "_" unless $t % 10; + if(!$args{'quiet'}) { + printf "." if $t % 10; + printf "_" unless $t % 10; + } } printf ".\n"; } diff --git a/src/mod/endpoints/mod_skinny/Net/Skinny/Client.pm b/src/mod/endpoints/mod_skinny/Net/Skinny/Client.pm new file mode 100644 index 0000000000..ee063cc911 --- /dev/null +++ b/src/mod/endpoints/mod_skinny/Net/Skinny/Client.pm @@ -0,0 +1,94 @@ +# Copyright (c) 2010 Mathieu Parent . +# All rights reserved. This program is free software; you can redistribute it +# and/or modify it under the same terms as Perl itself. + +package Net::Skinny::Client; + +use strict; +use warnings; + +use Config; +use threads; +use threads::shared; +use Thread::Queue; + +require Net::Skinny; +use Net::Skinny::Protocol qw/:all/; +use Net::Skinny::Message; + +our(@ISA); +@ISA = qw(Net::Skinny); + +my $keep_alive_thread; +my $keep_alives :shared; +our $kept_self; +my $messages_send_queue; +my $messages_receive_queue; + +$Config{useithreads} or die('Recompile Perl with threads to run this program.'); + +sub new { + $kept_self = shift->SUPER::new(@_); + $messages_send_queue = Thread::Queue->new(); + $messages_receive_queue = Thread::Queue->new(); + threads->create(\&send_messages_thread_func); + threads->create(\&receive_messages_thread_func); + return $kept_self; +} + +sub send_message { + my $self = shift; + $messages_send_queue->enqueue(\@_); +} + +sub receive_message { + my $self = shift; + my $message = $messages_receive_queue->dequeue(); + if($message->type() == 0x100) {#keepaliveack + if(1) { + lock($keep_alives); + $keep_alives--; + } + $message = $messages_receive_queue->dequeue(); + } + return $message; +} + +sub launch_keep_alive_thread +{ + if(!$keep_alive_thread) { + $keep_alive_thread = threads->create(\&keep_alive_thread_func); + } else { + print "keep-alive thread is already running\n"; + } + return $keep_alive_thread; +} + +sub keep_alive_thread_func +{ + while($kept_self) { + if(1) { + lock($keep_alives); + $keep_alives++; + $kept_self->send_message(KEEP_ALIVE_MESSAGE); + } #mutex unlocked + $kept_self->sleep(30, quiet => 0); + } +} + +sub send_messages_thread_func +{ + while(my $message = $messages_send_queue->dequeue()) { + my $type = shift @$message; + $kept_self->SUPER::send_message($type, @$message); + } +} + +sub receive_messages_thread_func +{ + while(1) { + $messages_receive_queue->enqueue($kept_self->SUPER::receive_message()); + } +} + +1; diff --git a/src/mod/endpoints/mod_skinny/Net/Skinny/Message.pm b/src/mod/endpoints/mod_skinny/Net/Skinny/Message.pm index 6a1a0ef1cf..5955216e1c 100644 --- a/src/mod/endpoints/mod_skinny/Net/Skinny/Message.pm +++ b/src/mod/endpoints/mod_skinny/Net/Skinny/Message.pm @@ -7,54 +7,96 @@ package Net::Skinny::Message; use strict; use warnings; +use threads; +use threads::shared; + use Net::Skinny::Protocol qw/:all/; -use Data::Dumper; - -require Exporter; -our @ISA = qw(Exporter); -our @EXPORT = qw(send); - -sub new { +sub new_empty { my $class = shift; my $self = {}; bless $self, $class; - $self->{'socket'} = shift; - $self->{'type'} = shift; - %{$self->{'data'}} = @_; - return $ self; + $self->{'type'} = undef; + $self->{'data'} = undef; + $self->{'raw'} = undef; + return $self; } -sub send { +sub new { + my $self = shift->new_empty(); + $self->type(shift); + $self->data(@_) if @_; + return $self; +} + +sub new_raw { + my $self = shift->new_empty(); + $self->type(shift); + $self->raw(shift); + return $self; +} + +sub type +{ my $self = shift; - my $struct = Net::Skinny::Protocol::skinny_message_struct($self->{'type'}); - my $raw = ''; - my $parsed_count = 0; - for my $info ( @$struct) { - last if !defined($self->{'data'}{@$info[1]}); - if(@$info[0] eq 'char') { - $raw .= pack("a".@$info[2], $self->{'data'}{@$info[1]}); - } elsif(@$info[0] eq 'uint32_t') { - $raw .= pack("V".@$info[2], $self->{'data'}{@$info[1]}); - } elsif(@$info[0] eq 'uint16_t') { - $raw .= pack("n".@$info[2], $self->{'data'}{@$info[1]}); - } elsif(@$info[0] eq 'struct in_addr') { - $raw .= pack("V".@$info[2], $self->{'data'}{@$info[1]}); - } elsif(@$info[0] eq 'struct station_capabilities') { - $raw .= $self->{'data'}{@$info[1]}; - } else { - printf "Unknown type: %s\n", @$info[0]; - return; - } - $parsed_count++; + my $type = @_ ? shift : undef; + if(defined($type)) { + $self->{'type'} = $type; } - if($parsed_count != scalar(keys %{$self->{'data'}})) { - printf "Incomplete message (type=%s (%X)) %d out of %d\n", Net::Skinny::Protocol::skinny_message_type2str($self->{'type'}), $self->{'type'}, - $parsed_count, scalar(keys %{$self->{'data'}}); - print Dumper(@$struct); - return; + return $self->{'type'}; +} + +sub data +{ + my $self = shift; + my @data = @_; + if(@data) { + %{$self->{'data'}} = @data; + $self->{'raw'} = undef; + } elsif(!defined($self->{'data'})) { + printf "Conversion from raw to data not implemented\n"; } - $self->{'socket'}->send_data($self->{'type'}, $raw); + return $self->{'data'}; +} + +sub raw +{ + my $self = shift; + my $raw = shift || undef; + if(defined($raw)) { + $self->{'raw'} = $raw; + $self->{'data'} = undef; + } + if(!defined($self->{'raw'})) { + my $struct = Net::Skinny::Protocol::skinny_message_struct($self->{'type'}); + my $raw = ''; + my $parsed_count = 0; + for my $info ( @$struct) { + last if !defined($self->{'data'}{@$info[1]}); + if(@$info[0] eq 'char') { + $raw .= pack("a".@$info[2], $self->{'data'}{@$info[1]}); + } elsif(@$info[0] eq 'uint32_t') { + $raw .= pack("V".@$info[2], $self->{'data'}{@$info[1]}); + } elsif(@$info[0] eq 'uint16_t') { + $raw .= pack("n".@$info[2], $self->{'data'}{@$info[1]}); + } elsif(@$info[0] eq 'struct in_addr') { + $raw .= pack("V".@$info[2], $self->{'data'}{@$info[1]}); + } elsif(@$info[0] eq 'struct station_capabilities') { + $raw .= $self->{'data'}{@$info[1]}; + } else { + printf "Unknown type: %s\n", @$info[0]; + return; + } + $parsed_count++; + } + if($parsed_count != scalar(keys %{$self->{'data'}})) { + printf "Incomplete message (type=%s (%X)) %d out of %d\n", Net::Skinny::Protocol::skinny_message_type2str($self->{'type'}), $self->{'type'}, + $parsed_count, scalar(keys %{$self->{'data'}}); + return; + } + $self->{'raw'} = $raw; + } + return $self->{'raw'}; } 1; diff --git a/src/mod/endpoints/mod_skinny/Net/Skinny/Protocol.pm b/src/mod/endpoints/mod_skinny/Net/Skinny/Protocol.pm index bfe0202787..90de304817 100644 --- a/src/mod/endpoints/mod_skinny/Net/Skinny/Protocol.pm +++ b/src/mod/endpoints/mod_skinny/Net/Skinny/Protocol.pm @@ -8,7 +8,6 @@ use strict; no strict "refs"; use warnings; use Carp; -use Data::Dumper; require Exporter; our @ISA = qw(Exporter); @@ -69,7 +68,6 @@ sub _find { printf "Unparsed line '%s' in %s\n", $_, $struct_name; } } - #print "$name: ".Dumper($struct{$name}); } } @sub{@_}; @@ -77,6 +75,7 @@ sub _find { sub skinny_message_type2str { my $message_type = shift; + return "UndefinedTypeMessage" if !defined($message_type); keys %const; while (my ($key, $value) = each %const) { diff --git a/src/mod/endpoints/mod_skinny/test-skinny.pl b/src/mod/endpoints/mod_skinny/test-skinny.pl index 390466f530..a8c7bb5d68 100644 --- a/src/mod/endpoints/mod_skinny/test-skinny.pl +++ b/src/mod/endpoints/mod_skinny/test-skinny.pl @@ -15,6 +15,7 @@ use Sys::Hostname; use Net::Skinny; use Net::Skinny::Protocol qw/:all/; use Net::Skinny::Message; +use Net::Skinny::Client; #Config my $skinny_server = hostname; @@ -23,13 +24,13 @@ my $device_ip = 10+256*(11+256*(12+256*13)); # 10.11.12.13 #====== $| = 1; -my $socket = Net::Skinny->new( +my $socket = Net::Skinny::Client->new( PeerAddr => $skinny_server, PeerPort => 2000, ); if(!$socket) { - print "Unable to connect to server\n"; + printf "Unable to connect to server %s\n", $skinny_server; exit 1; } # ============================================================================= @@ -84,11 +85,8 @@ $socket->send_message( count => 2 ); -for(my $i = 0; $i < 1; $i++) { - $socket->sleep(5); - $socket->send_message(KEEP_ALIVE_MESSAGE); - $socket->receive_message(); # keepaliveack -} +$socket->launch_keep_alive_thread(); + $socket->sleep(5); #NewCall