Skip to content

Commit 68a9e3d

Browse files
authored
feat: auto-tune (dynamic) stream receive window (#176)
- Send Yamux' Pings on an interval to measure the connection round-trip-time. - Dynamically grow the stream receive window based on the round-trip-time and the estimated bandwidth.
1 parent 16ffe54 commit 68a9e3d

File tree

15 files changed

+769
-158
lines changed

15 files changed

+769
-158
lines changed

CHANGELOG.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,8 @@
11
# 0.13.0
22

3+
- Introduce dynamic stream receive window auto-tuning.
4+
While low-resourced deployments maintain the benefit of small buffers, high resource deployments eventually end-up with a window of roughly the bandwidth-delay-product (ideal) and are thus able to use the entire available bandwidth.
5+
See [PR 176](https://github.com/libp2p/rust-yamux/pull/176) for performance results and details on the implementation.
36
- Remove `WindowUpdateMode`.
47
Behavior will always be `WindowUpdateMode::OnRead`, thus enabling flow-control and enforcing backpressure.
58
See [PR 178](https://github.com/libp2p/rust-yamux/pull/178).

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
[workspace]
2-
members = ["yamux", "test-harness"]
2+
members = ["yamux", "test-harness", "quickcheck-ext"]
33
resolver = "2"

quickcheck-ext/Cargo.toml

Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
[package]
2+
name = "quickcheck-ext"
3+
version = "0.1.0"
4+
edition = "2021"
5+
publish = false
6+
license = "Unlicense/MIT"
7+
8+
[package.metadata.release]
9+
release = false
10+
11+
[dependencies]
12+
quickcheck = "1"
13+
num-traits = "0.2"

quickcheck-ext/src/lib.rs

Lines changed: 46 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,46 @@
1+
#![cfg_attr(docsrs, feature(doc_cfg, doc_auto_cfg))]
2+
3+
pub use quickcheck::*;
4+
5+
use core::ops::Range;
6+
use num_traits::sign::Unsigned;
7+
8+
pub trait GenRange {
9+
fn gen_range<T: Unsigned + Arbitrary + Copy>(&mut self, _range: Range<T>) -> T;
10+
11+
fn gen_index(&mut self, ubound: usize) -> usize {
12+
if ubound <= (core::u32::MAX as usize) {
13+
self.gen_range(0..ubound as u32) as usize
14+
} else {
15+
self.gen_range(0..ubound)
16+
}
17+
}
18+
}
19+
20+
impl GenRange for Gen {
21+
fn gen_range<T: Unsigned + Arbitrary + Copy>(&mut self, range: Range<T>) -> T {
22+
<T as Arbitrary>::arbitrary(self) % (range.end - range.start) + range.start
23+
}
24+
}
25+
26+
pub trait SliceRandom {
27+
fn shuffle<T>(&mut self, arr: &mut [T]);
28+
fn choose_multiple<'a, T>(
29+
&mut self,
30+
arr: &'a [T],
31+
amount: usize,
32+
) -> std::iter::Take<std::vec::IntoIter<&'a T>> {
33+
let mut v: Vec<&T> = arr.iter().collect();
34+
self.shuffle(&mut v);
35+
v.into_iter().take(amount)
36+
}
37+
}
38+
39+
impl SliceRandom for Gen {
40+
fn shuffle<T>(&mut self, arr: &mut [T]) {
41+
for i in (1..arr.len()).rev() {
42+
// invariant: elements with index > i have been locked in place.
43+
arr.swap(i, self.gen_index(i + 1));
44+
}
45+
}
46+
}

test-harness/Cargo.toml

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@ publish = false
77
[dependencies]
88
yamux = { path = "../yamux" }
99
futures = "0.3.4"
10-
quickcheck = "1.0"
10+
quickcheck = { package = "quickcheck-ext", path = "../quickcheck-ext" }
1111
tokio = { version = "1.0", features = ["net", "rt-multi-thread", "macros", "time"] }
1212
tokio-util = { version = "0.7", features = ["compat"] }
1313
anyhow = "1"
@@ -17,7 +17,6 @@ log = "0.4.17"
1717
criterion = "0.5"
1818
env_logger = "0.10"
1919
futures = "0.3.4"
20-
quickcheck = "1.0"
2120
tokio = { version = "1.0", features = ["net", "rt-multi-thread", "macros", "time"] }
2221
tokio-util = { version = "0.7", features = ["compat"] }
2322
constrained-connection = "0.1"

test-harness/src/lib.rs

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ where
7676
.try_for_each_concurrent(None, |mut stream| async move {
7777
{
7878
let (mut r, mut w) = AsyncReadExt::split(&mut stream);
79-
futures::io::copy(&mut r, &mut w).await?;
79+
futures::io::copy(&mut r, &mut w).await.unwrap();
8080
}
8181
stream.close().await?;
8282
Ok(())
@@ -447,9 +447,21 @@ pub struct TestConfig(pub Config);
447447

448448
impl Arbitrary for TestConfig {
449449
fn arbitrary(g: &mut Gen) -> Self {
450+
use quickcheck::GenRange;
451+
450452
let mut c = Config::default();
453+
let max_num_streams = 512;
454+
451455
c.set_read_after_close(Arbitrary::arbitrary(g));
452-
c.set_receive_window(256 * 1024 + u32::arbitrary(g) % (768 * 1024));
456+
c.set_max_num_streams(max_num_streams);
457+
if bool::arbitrary(g) {
458+
c.set_max_connection_receive_window(Some(
459+
g.gen_range(max_num_streams * (yamux::DEFAULT_CREDIT as usize)..usize::MAX),
460+
));
461+
} else {
462+
c.set_max_connection_receive_window(None);
463+
}
464+
453465
TestConfig(c)
454466
}
455467
}

test-harness/tests/ack_backlog.rs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -197,8 +197,8 @@ where
197197
this.worker_streams.push(ping_pong(stream.unwrap()).boxed());
198198
continue;
199199
}
200-
(Poll::Ready(_), Some(_)) => {
201-
panic!("should not be able to open stream if server hasn't acknowledged existing streams")
200+
(Poll::Ready(e), Some(_)) => {
201+
panic!("should not be able to open stream if server hasn't acknowledged existing streams: {:?}", e)
202202
}
203203
(Poll::Pending, None) => {}
204204
}

yamux/Cargo.toml

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,7 @@ repository = "https://github.com/paritytech/yamux"
1010
edition = "2021"
1111

1212
[dependencies]
13-
futures = { version = "0.3.12", default-features = false, features = ["std"] }
13+
futures = { version = "0.3.12", default-features = false, features = ["std", "executor"] }
1414
log = "0.4.8"
1515
nohash-hasher = "0.2"
1616
parking_lot = "0.12"
@@ -20,4 +20,4 @@ pin-project = "1.1.0"
2020

2121
[dev-dependencies]
2222
futures = { version = "0.3.12", default-features = false, features = ["executor"] }
23-
quickcheck = "1.0"
23+
quickcheck = { package = "quickcheck-ext", path = "../quickcheck-ext" }

yamux/src/connection.rs

Lines changed: 51 additions & 49 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@
1414
1515
mod cleanup;
1616
mod closing;
17+
mod rtt;
1718
mod stream;
1819

1920
use crate::tagged_stream::TaggedStream;
@@ -287,8 +288,15 @@ struct Active<T> {
287288

288289
pending_frames: VecDeque<Frame<()>>,
289290
new_outbound_stream_waker: Option<Waker>,
290-
}
291291

292+
rtt: rtt::Rtt,
293+
294+
/// A stream's `max_stream_receive_window` can grow beyond [`DEFAULT_CREDIT`], see
295+
/// [`Stream::next_window_update`]. This field is the sum of the bytes by which all streams'
296+
/// `max_stream_receive_window` have each exceeded [`DEFAULT_CREDIT`]. Used to enforce
297+
/// [`Config::max_connection_receive_window`].
298+
accumulated_max_stream_windows: Arc<Mutex<usize>>,
299+
}
292300
/// `Stream` to `Connection` commands.
293301
#[derive(Debug)]
294302
pub(crate) enum StreamCommand {
@@ -300,15 +308,13 @@ pub(crate) enum StreamCommand {
300308

301309
/// Possible actions as a result of incoming frame handling.
302310
#[derive(Debug)]
303-
enum Action {
311+
pub(crate) enum Action {
304312
/// Nothing to be done.
305313
None,
306314
/// A new stream has been opened by the remote.
307315
New(Stream),
308316
/// A ping should be answered.
309317
Ping(Frame<Ping>),
310-
/// A stream should be reset.
311-
Reset(Frame<Data>),
312318
/// The connection should be terminated.
313319
Terminate(Frame<GoAway>),
314320
}
@@ -341,7 +347,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
341347
fn new(socket: T, cfg: Config, mode: Mode) -> Self {
342348
let id = Id::random();
343349
log::debug!("new connection: {} ({:?})", id, mode);
344-
let socket = frame::Io::new(id, socket, cfg.max_buffer_size).fuse();
350+
let socket = frame::Io::new(id, socket).fuse();
345351
Active {
346352
id,
347353
mode,
@@ -356,6 +362,8 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
356362
},
357363
pending_frames: VecDeque::default(),
358364
new_outbound_stream_waker: None,
365+
rtt: rtt::Rtt::new(),
366+
accumulated_max_stream_windows: Default::default(),
359367
}
360368
}
361369

@@ -376,6 +384,14 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
376384
fn poll(&mut self, cx: &mut Context<'_>) -> Poll<Result<Stream>> {
377385
loop {
378386
if self.socket.poll_ready_unpin(cx).is_ready() {
387+
// Note `next_ping` does not register a waker and thus if not called regularly (idle
388+
// connection) no ping is sent. This is deliberate as an idle connection does not
389+
// need RTT measurements to increase its stream receive window.
390+
if let Some(frame) = self.rtt.next_ping() {
391+
self.socket.start_send_unpin(frame.into())?;
392+
continue;
393+
}
394+
379395
if let Some(frame) = self.pending_frames.pop_front() {
380396
self.socket.start_send_unpin(frame)?;
381397
continue;
@@ -439,20 +455,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
439455
log::trace!("{}: creating new outbound stream", self.id);
440456

441457
let id = self.next_stream_id()?;
442-
let extra_credit = self.config.receive_window - DEFAULT_CREDIT;
443-
444-
if extra_credit > 0 {
445-
let mut frame = Frame::window_update(id, extra_credit);
446-
frame.header_mut().syn();
447-
log::trace!("{}/{}: sending initial {}", self.id, id, frame.header());
448-
self.pending_frames.push_back(frame.into());
449-
}
450-
451-
let mut stream = self.make_new_outbound_stream(id, self.config.receive_window);
452-
453-
if extra_credit == 0 {
454-
stream.set_flag(stream::Flag::Syn)
455-
}
458+
let stream = self.make_new_outbound_stream(id);
456459

457460
log::debug!("{}: new outbound {} of {}", self.id, stream, self);
458461
self.streams.insert(id, stream.clone_shared());
@@ -537,7 +540,9 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
537540
fn on_frame(&mut self, frame: Frame<()>) -> Result<Option<Stream>> {
538541
log::trace!("{}: received: {}", self.id, frame.header());
539542

540-
if frame.header().flags().contains(header::ACK) {
543+
if frame.header().flags().contains(header::ACK)
544+
&& matches!(frame.header().tag(), Tag::Data | Tag::WindowUpdate)
545+
{
541546
let id = frame.header().stream_id();
542547
if let Some(stream) = self.streams.get(&id) {
543548
stream
@@ -565,10 +570,6 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
565570
log::trace!("{}/{}: pong", self.id, f.header().stream_id());
566571
self.pending_frames.push_back(f.into());
567572
}
568-
Action::Reset(f) => {
569-
log::trace!("{}/{}: sending reset", self.id, f.header().stream_id());
570-
self.pending_frames.push_back(f.into());
571-
}
572573
Action::Terminate(f) => {
573574
log::trace!("{}: sending term", self.id);
574575
self.pending_frames.push_back(f.into());
@@ -620,23 +621,22 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
620621
log::error!("{}: maximum number of streams reached", self.id);
621622
return Action::Terminate(Frame::internal_error());
622623
}
623-
let mut stream = self.make_new_inbound_stream(stream_id, DEFAULT_CREDIT);
624+
let stream = self.make_new_inbound_stream(stream_id, DEFAULT_CREDIT);
624625
{
625626
let mut shared = stream.shared();
626627
if is_finish {
627628
shared.update_state(self.id, stream_id, State::RecvClosed);
628629
}
629-
shared.window = shared.window.saturating_sub(frame.body_len());
630+
shared.consume_receive_window(frame.body_len());
630631
shared.buffer.push(frame.into_body());
631632
}
632-
stream.set_flag(stream::Flag::Ack);
633633
self.streams.insert(stream_id, stream.clone_shared());
634634
return Action::New(stream);
635635
}
636636

637637
if let Some(s) = self.streams.get_mut(&stream_id) {
638638
let mut shared = s.lock();
639-
if frame.body().len() > shared.window as usize {
639+
if frame.body_len() > shared.receive_window() {
640640
log::error!(
641641
"{}/{}: frame body larger than window of stream",
642642
self.id,
@@ -647,18 +647,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
647647
if is_finish {
648648
shared.update_state(self.id, stream_id, State::RecvClosed);
649649
}
650-
let max_buffer_size = self.config.max_buffer_size;
651-
if shared.buffer.len() >= max_buffer_size {
652-
log::error!(
653-
"{}/{}: buffer of stream grows beyond limit",
654-
self.id,
655-
stream_id
656-
);
657-
let mut header = Header::data(stream_id, 0);
658-
header.rst();
659-
return Action::Reset(Frame::new(header));
660-
}
661-
shared.window = shared.window.saturating_sub(frame.body_len());
650+
shared.consume_receive_window(frame.body_len());
662651
shared.buffer.push(frame.into_body());
663652
if let Some(w) = shared.reader.take() {
664653
w.wake()
@@ -718,8 +707,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
718707
}
719708

720709
let credit = frame.header().credit() + DEFAULT_CREDIT;
721-
let mut stream = self.make_new_inbound_stream(stream_id, credit);
722-
stream.set_flag(stream::Flag::Ack);
710+
let stream = self.make_new_inbound_stream(stream_id, credit);
723711

724712
if is_finish {
725713
stream
@@ -732,7 +720,7 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
732720

733721
if let Some(s) = self.streams.get_mut(&stream_id) {
734722
let mut shared = s.lock();
735-
shared.credit += frame.header().credit();
723+
shared.increase_send_window_by(frame.header().credit());
736724
if is_finish {
737725
shared.update_state(self.id, stream_id, State::RecvClosed);
738726
}
@@ -761,15 +749,14 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
761749
fn on_ping(&mut self, frame: &Frame<Ping>) -> Action {
762750
let stream_id = frame.header().stream_id();
763751
if frame.header().flags().contains(header::ACK) {
764-
// pong
765-
return Action::None;
752+
return self.rtt.handle_pong(frame.nonce());
766753
}
767754
if stream_id == CONNECTION_ID || self.streams.contains_key(&stream_id) {
768755
let mut hdr = Header::ping(frame.header().nonce());
769756
hdr.ack();
770757
return Action::Ping(Frame::new(hdr));
771758
}
772-
log::trace!(
759+
log::debug!(
773760
"{}/{}: ping for unknown stream, possibly dropped earlier: {:?}",
774761
self.id,
775762
stream_id,
@@ -794,10 +781,18 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
794781
waker.wake();
795782
}
796783

797-
Stream::new_inbound(id, self.id, config, credit, sender)
784+
Stream::new_inbound(
785+
id,
786+
self.id,
787+
config,
788+
credit,
789+
sender,
790+
self.rtt.clone(),
791+
self.accumulated_max_stream_windows.clone(),
792+
)
798793
}
799794

800-
fn make_new_outbound_stream(&mut self, id: StreamId, window: u32) -> Stream {
795+
fn make_new_outbound_stream(&mut self, id: StreamId) -> Stream {
801796
let config = self.config.clone();
802797

803798
let (sender, receiver) = mpsc::channel(10); // 10 is an arbitrary number.
@@ -806,7 +801,14 @@ impl<T: AsyncRead + AsyncWrite + Unpin> Active<T> {
806801
waker.wake();
807802
}
808803

809-
Stream::new_outbound(id, self.id, config, window, sender)
804+
Stream::new_outbound(
805+
id,
806+
self.id,
807+
config,
808+
sender,
809+
self.rtt.clone(),
810+
self.accumulated_max_stream_windows.clone(),
811+
)
810812
}
811813

812814
fn next_stream_id(&mut self) -> Result<StreamId> {

0 commit comments

Comments
 (0)