Skip to content

Commit

Permalink
Added modules DcServer, DcClient and simple examples of how to use both.
Browse files Browse the repository at this point in the history
  • Loading branch information
Donnie Cameron committed Jun 1, 2009
1 parent 95aba5a commit b0813a9
Show file tree
Hide file tree
Showing 5 changed files with 169 additions and 0 deletions.
33 changes: 33 additions & 0 deletions DcClient.pm
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
package DcClient;

use IO::Socket;
use strict;
use warnings;

sub new {
# Params: host, port
my ($proto, %param)= @_;
my $class= ref($proto) || $proto;
bless +{%param} => $class;
}

sub stop_server {shift->query('$self->stop')}

sub query {
my ($self, $query)= @_;
my ($buffer, $reply)= ('', '');
my $socket= new IO::Socket::INET(
PeerAddr => $self->{host} || 'localhost',
PeerPort => $self->{port} || 8191,
Proto => 'tcp');
die "$!. Is the server running?\n" unless $socket;
print $socket $query . "\n.\n";
while($buffer= <$socket>) {
$reply.= $buffer;
last if $reply =~ s/\n\.\n$//;
}
$socket->shutdown(2);
$reply;
}

1;
121 changes: 121 additions & 0 deletions DcServer.pm
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package DcServer;

use lib '.';
use threads;
use threads::shared;
use Thread::Queue;
use IO::Socket;
use Time::HiRes qw/sleep/;
use strict;
use warnings;

my $stop :shared;
my $accept_queue= Thread::Queue->new;
my $closed_queue= Thread::Queue->new;

sub new {
# Params: host, port, thread_count, eom_marker, main_yield, main_cb,
# done_cb, processor_cb
my ($proto, %param)= @_;
my $class= ref($proto) || $proto;
bless +{
socket_defaults => +{
LocalHost => $param{host} || 'localhost',
LocalPort => $param{port} || 8191},
thread_count => $param{thread_count} || 10,
main_yield => $param{main_yield} || 5,
main_cb => $param{main_cb} || sub {},
done_cb => $param{done_cb} || sub {},
processor_cb => $param{processor_cb} || \&processor,
eom_marker => $param{eom_marker} || "\\n\\.\\n",
thread_pool => undef
} => $class;
}

# This callback (for processor_cb) does sommething stupid with the string
# that the client sends to the server, then returns the new string. This
# code hopefully illustrates how to put together a callback function for
# processing data from clients.
sub processor {
my ($data, $ip, $tid, $fnstop)= @_;
"[tid=$tid; ip=$ip] " . join('', reverse(split //, $data));
}

sub start {
my $self= shift;

# Start a thread to dispatch incoming requests
threads->create(sub {$self->accept_requests})->detach;

# Start the thread pool to handle dispatched requests
for (1 .. $self->{thread_count}) {
threads->create(sub {$self->request_handler})->detach}

# Start a loop for performing tasks in the background, while
# handling requests
$self->main_loop;

$self->{done_cb}->();
}

sub stop {
my $self= shift;
$stop= 1;
}

sub main_loop {
my $self= shift;
my $counter= 1;
until($stop) {
$self->{main_cb}->($counter++, sub {$self->stop});
sleep $self->{main_yield};
}
}

sub accept_requests {
my $self= shift;
my ($csocket, $n, %socket);
my $lsocket= new IO::Socket::INET(
%{$self->{socket_defaults}},
Proto => 'tcp',
Listen => 1,
Reuse => 1);
die "Can't create listerner socket. Server can't start. $!." unless $lsocket;
until($stop) {
$csocket= $lsocket->accept;
$n= fileno $csocket;
$socket{$n}= $csocket;
$accept_queue->enqueue($n . ' ' . inet_ntoa($csocket->peeraddr));
while($n= $closed_queue->dequeue_nb) {
$socket{$n}->shutdown(2);
delete $socket{$n}}}
$lsocket->shutdown(2);
print "Thread ", threads->tid, " terminated.\n";
}

sub request_handler {
my $self= shift;
my ($n, $ip, $data);
my ($receive_time, $process_time, $send_time);
until($stop) {
($n, $ip)= split / /, $accept_queue->dequeue;
next unless $n;
open my $socket, '+<&=' . $n or die $!;
if(defined($data= $self->receive_client_request($socket))) {
print $socket $self->{processor_cb}->(
$data, $ip, threads->tid, sub {$self->stop}
), "\n.\n"}
close $socket;
$closed_queue->enqueue($n)}
}

sub receive_client_request {
my ($self, $socket)= @_;
my ($eom, $buffer, $data)= $self->{eom_marker};
while($buffer= <$socket>) {
$data.= $buffer;
last if $data =~ s/$eom$//}
$data
}

1;
2 changes: 2 additions & 0 deletions README
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
See http://donnieknows.com/blog/perl-sockets-swimming-thread-pool

5 changes: 5 additions & 0 deletions client.pl
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
use DcClient;
my $message= shift;
die "Usage: perl client.pl string\n" unless defined($message);
my $c= DcClient->new;
print "$message => ", $c->query($message), "\n";
8 changes: 8 additions & 0 deletions server.pl
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
use DcServer;
my $server= DcServer->new(processor_cb => \&reverse_text);
$server->start;

sub reverse_text {
my $data= shift;
join('', reverse(split //, $data));
}

0 comments on commit b0813a9

Please sign in to comment.