Skip to content

Commit

Permalink
Merge branch 'kazuho/pacer' into kazuho/jumpstart
Browse files Browse the repository at this point in the history
  • Loading branch information
kazuho committed Mar 9, 2024
2 parents ec38df0 + fc7e0bd commit 9ba8b41
Show file tree
Hide file tree
Showing 5 changed files with 124 additions and 35 deletions.
2 changes: 1 addition & 1 deletion CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ ADD_EXECUTABLE(udpfw t/udpfw.c)

ADD_CUSTOM_TARGET(check env BINARY_DIR=${CMAKE_CURRENT_BINARY_DIR} WITH_DTRACE=${WITH_DTRACE} prove --exec "sh -c" -v ${CMAKE_CURRENT_BINARY_DIR}/*.t t/*.t
WORKING_DIRECTORY ${CMAKE_CURRENT_SOURCE_DIR}
DEPENDS cli test.t)
DEPENDS cli udpfw test.t)

ADD_CUSTOM_TARGET(format clang-format -i `git ls-files include lib src t | egrep '\\.[ch]$$'`)

Expand Down
28 changes: 15 additions & 13 deletions include/quicly/pacer.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,7 @@ extern "C" {
/**
* Simple pacer. The design guarantees that the formula below is met for any given pacer-restricted period:
*
* flow_rate * duration + burst_credit <= bytes_sent < flow_rate * duration + burst_credit + mtu
*
* where `burst_credit` is defined as:
*
* burst_credit = max((9 * mtu + 1) - flow_rate, 0)
*
* and that the sender never sends more than max(10*mtu, flow_rate) per every time slice.
* flow_rate * duration + 8 * mtu <= bytes_sent < flow_rate * duration + 10 * mtu
*/
typedef struct st_quicly_pacer_t {
/**
Expand All @@ -52,7 +46,8 @@ typedef struct st_quicly_pacer_t {
size_t bytes_sent;
} quicly_pacer_t;

#define QUICLY_PACER_CALC_BURST_BYTES(mtu) ((size_t)(mtu)*9 + 1)
#define QUICLY_PACER_BURST_LOW 8 /* lower bound in packets */
#define QUICLY_PACER_BURST_HIGH 10 /* high bound in packets */

/**
* resets the pacer
Expand Down Expand Up @@ -86,7 +81,7 @@ inline void quicly_pacer_reset(quicly_pacer_t *pacer)
inline int64_t quicly_pacer_can_send_at(quicly_pacer_t *pacer, uint32_t bytes_per_msec, uint16_t mtu)
{
/* return "now" if we have room in current msec */
size_t burst_size = QUICLY_PACER_CALC_BURST_BYTES(mtu);
size_t burst_size = QUICLY_PACER_BURST_LOW * mtu + 1;
size_t burst_credit = burst_size > bytes_per_msec ? burst_size - bytes_per_msec : 0;
if (pacer->bytes_sent < bytes_per_msec + burst_credit)
return 0;
Expand All @@ -106,8 +101,8 @@ inline uint64_t quicly_pacer_get_window(quicly_pacer_t *pacer, int64_t now, uint
if (now < can_send_at)
return 0;

/* Calculate burst window, as max(10mtu, bytes_per_msec) */
size_t burst_window = QUICLY_PACER_CALC_BURST_BYTES(mtu);
/* Calculate the upper bound of burst window (the size is later rounded up) */
size_t burst_window = (QUICLY_PACER_BURST_HIGH - 1) * mtu + 1;
if (burst_window < bytes_per_msec)
burst_window = bytes_per_msec;

Expand All @@ -118,11 +113,18 @@ inline uint64_t quicly_pacer_get_window(quicly_pacer_t *pacer, int64_t now, uint
uint64_t window, delta = (now - pacer->at) * bytes_per_msec;
if (pacer->bytes_sent > delta) {
pacer->bytes_sent -= delta;
window = burst_window > pacer->bytes_sent ? burst_window - pacer->bytes_sent : 1;
if (burst_window > pacer->bytes_sent) {
window = (burst_window - pacer->bytes_sent + mtu - 1) / mtu;
if (window < 2)
window = 2;
} else {
window = 2;
}
} else {
pacer->bytes_sent = 0;
window = burst_window;
window = (burst_window + mtu - 1) / mtu;
}
window *= mtu;

pacer->at = now;

Expand Down
7 changes: 4 additions & 3 deletions lib/quicly.c
Original file line number Diff line number Diff line change
Expand Up @@ -5047,9 +5047,10 @@ static int do_send(quicly_conn_t *conn, quicly_send_context_t *s)
conn->egress.send_ack_at = INT64_MAX; /* we have sent ACKs for every epoch (or before address validation) */
int can_send_stream_data = scheduler_can_send(conn);
update_send_alarm(conn, can_send_stream_data, 1);
update_cc_limited(conn, can_send_stream_data && (s->num_datagrams == s->max_datagrams ||
conn->egress.loss.sentmap.bytes_in_flight >= conn->egress.cc.cwnd ||
pacer_can_send_at(conn) > conn->stash.now));
update_cc_limited(conn, can_send_stream_data && conn->super.remote.address_validation.validated &&
(s->num_datagrams == s->max_datagrams ||
conn->egress.loss.sentmap.bytes_in_flight >= conn->egress.cc.cwnd ||
pacer_can_send_at(conn) > conn->stash.now));
if (s->num_datagrams != 0)
update_idle_timeout(conn, 0);
}
Expand Down
99 changes: 94 additions & 5 deletions t/e2e.t
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,14 @@ use strict;
use warnings;
use Digest::MD5 qw(md5_hex);
use File::Temp qw(tempdir);
use IO::Select;
use IO::Socket::INET;
use JSON;
use Net::EmptyPort qw(empty_port);
use POSIX ":sys_wait_h";
use Scope::Guard qw(scope_guard);
use Test::More;
use Time::HiRes qw(sleep);
use Time::HiRes qw(sleep time);

sub complex ($$;$) {
my $s = shift;
Expand All @@ -27,10 +28,15 @@ sub complex ($$;$) {

$ENV{BINARY_DIR} ||= ".";
my $cli = "$ENV{BINARY_DIR}/cli";
my $udpfw = "$ENV{BINARY_DIR}/udpfw";
my $port = empty_port({
host => "127.0.0.1",
proto => "udp",
});
my $udpfw_port = empty_port({
host => "127.0.0.1",
proto => "udp",
});
my $tempdir = tempdir(CLEANUP => 1);

subtest "hello" => sub {
Expand Down Expand Up @@ -347,6 +353,83 @@ subtest "raw-certificates-ec" => sub {
is $resp, "hello world\n";
};

subtest "slow-start" => sub {
# spawn udpfw that applies 100ms RTT but otherwise nothing
my $udpfw_guard = spawn_process(
["sh", "-c", "exec $udpfw -b 100 -i 1 -p 0 -B 100 -I 1 -P 100000 -l $udpfw_port 127.0.0.1 $port > /dev/null 2>&1"],
$udpfw_port,
);

# read first $size bytes from client $cli (which would be the payload received) and check RT
my $doit = sub {
my ($size, $rt_min, $rt_max) = @_;
subtest "${size}B" => sub {
my $start_at = time;
open my $fh, "-|", "$cli -p /$size 127.0.0.1 $udpfw_port 2>&1"
or die "failed to launch $cli:$!";
for (my $total_read = 0; $total_read < $size;) {
IO::Select->new($fh)->can_read(); # block until the command writes something
my $nread = sysread $fh, my $buf, 65536;
die "failed to read from pipe, got $nread:$!"
unless $nread > 0;
$total_read += $nread;
}
my $elapsed = time - $start_at;
diag $elapsed;
cmp_ok $rt_min * 0.1, '<=', $elapsed, "RT >= $rt_min";
cmp_ok $rt_max * 0.1, '>=', $elapsed, "RT <= $rt_max";
};
};

my $each_cc = sub {
my $cb = shift;
for my $cc (qw(reno pico cubic)) {
subtest $cc => sub {
$cb->($cc);
};
}
};

subtest "no-pacing" => sub {
$each_cc->(sub {
my $cc = shift;
subtest "respect-app-limited" => sub {
plan skip_all => "Cubic TODO respect app-limited"
if $cc eq "cubic";
my $guard = spawn_server("-C", "$cc:10");
# tail of 1st, 2nd, and 3rd batch fits into each round trip
$doit->(@$_)
for ([14000, 2, 2.5], [45000, 3, 3.5], [72000, 4, 4.5]);
};
subtest "disregard-app-limited" => sub {
my $guard = spawn_server("-C", "$cc:10", "--disregard-app-limited");
# tail of 1st, 2nd, and 3rd batch fits into each round trip
$doit->(@$_)
for ([16000, 2, 2.5], [48000, 3, 3.5], [72000, 4, 4.5]);
};
});
};

subtest "pacing" => sub {
$each_cc->(sub {
my $cc = shift;
subtest "respect-app-limited" => sub {
plan skip_all => "Cubic TODO respect app-limited"
if $cc eq "cubic";
my $guard = spawn_server("-C", "$cc:20:p");
# check head of 1st and 3rd batch, tail of 1st and 2nd
$doit->(@$_)
for ([1000, 2, 2.3], [28000, 2.3, 3], [85000, 3.3, 4], [89000, 4, 4.5]);
};
subtest "disregard-app-limited" => sub {
my $guard = spawn_server("-C", "$cc:20:p", "--disregard-app-limited");
# tail of 1st, 2nd, and 3rd batch fits into each round trip
$doit->(@$_)
for ([1000, 2, 2.3], [30000, 2.3, 3], [87000, 3.3, 4], [96000, 4, 4.5]);
};
});
}
};

done_testing;

Expand All @@ -357,16 +440,22 @@ sub spawn_server {
} else {
@cmd = ($cli, "-k", "t/assets/server.key", "-c", "t/assets/server.crt", @_, "127.0.0.1", $port);
}
spawn_process(\@cmd, $port);
}

sub spawn_process {
my ($cmd, $listen_port) = @_;

my $pid = fork;
die "fork failed:$!"
unless defined $pid;
if ($pid == 0) {
exec @cmd;
die "failed to exec $cmd[0]:$?";
exec @$cmd;
die "failed to exec @{[$cmd->[0]]}:$?";
}
while (`netstat -na` !~ /^udp.*\s127\.0\.0\.1[\.:]$port\s/m) {
while (`netstat -na` !~ /^udp.*\s(127\.0\.0\.1|0\.0\.0\.0|\*)[\.:]$listen_port\s/m) {
if (waitpid($pid, WNOHANG) == $pid) {
die "failed to launch server";
die "failed to launch @{[$cmd->[0]]}";
}
sleep 0.1;
}
Expand Down
23 changes: 10 additions & 13 deletions t/pacer.c
Original file line number Diff line number Diff line change
Expand Up @@ -108,26 +108,23 @@ static void test_medium(void)

static void test_slow(void)
{
const uint32_t bytes_per_msec = 500; /* one packet every 2.4ms */
const uint32_t bytes_per_msec = 700;
quicly_pacer_t pacer;
int64_t now = 1;

quicly_pacer_reset(&pacer);

now = test_pattern(&pacer, now, bytes_per_msec,
(const struct pattern[]){
{1, 10 * mtu, 10 * mtu}, /* borrow 1199 bytes */
{4, 1 * mtu, 1 * mtu}, /* borrowing 899 bytes, after 3ms */
{6, 1 * mtu, 1 * mtu}, /* borrowing 1099 bytes */
{9, 1 * mtu, 1 * mtu}, /* borrowing 799 bytes, after 3ms */
{11, 1 * mtu, 1 * mtu}, /* borrowing 999 bytes */
{13, 1 * mtu, 1 * mtu}, /* borrowing 1199 bytes */
{16, 1 * mtu, 1 * mtu}, /* borrowing 899 bytes, after 3ms */
{18, 1 * mtu, 1 * mtu}, /* borrowing 1099 bytes */
{21, 1 * mtu, 1 * mtu}, /* borrowing 799 bytes, after 3ms */
{23, 1 * mtu, 1 * mtu}, /* borrowing 999 bytes */
{25, 1 * mtu, 1 * mtu}, /* borrowing 1199 bytes */
{28, 1 * mtu, 1 * mtu}, /* borrowing 899 bytes, after 3ms */
{1, 10 * mtu, 10 * mtu}, /* borrow 12000 bytes */
{5, 2 * mtu, 2 * mtu}, /* borrowing 11600 bytes after 4ms */
{8, 2 * mtu, 2 * mtu}, /* borrowing 11900 bytes after 3ms */
{12, 2 * mtu, 2 * mtu}, /* borrowing 11500 bytes after 4ms */
{15, 2 * mtu, 2 * mtu}, /* borrowing 11800 bytes after 3ms */
{19, 2 * mtu, 2 * mtu}, /* borrowing 11400 bytes after 4ms */
{22, 2 * mtu, 2 * mtu}, /* borrowing 11700 bytes after 3ms */
{25, 2 * mtu, 2 * mtu}, /* borrowing 12000 bytes after 3ms */
{29, 2 * mtu, 2 * mtu}, /* borrowing 11600 bytes after 4ms */
{0},
});
}
Expand Down

0 comments on commit 9ba8b41

Please sign in to comment.