From e170736868d0f916b9c7e2dbd7de5bcc7529f969 Mon Sep 17 00:00:00 2001 From: Evan Rittenhouse Date: Sat, 2 Nov 2024 20:08:29 -0500 Subject: [PATCH] Add extra test with sneaky subscribe() We have to re-open the channel if a new receiver has subscribed after closing the channel. --- tokio/src/sync/broadcast.rs | 5 +++++ tokio/tests/sync_broadcast.rs | 34 ++++++++++++++++++++++++++++++++++ 2 files changed, 39 insertions(+) diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index 07d631cbb8c..f92acf3bbfd 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -876,6 +876,11 @@ fn new_receiver(shared: Arc>) -> Receiver { assert!(tail.rx_cnt != MAX_RECEIVERS, "max receivers"); tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow"); + if tail.closed { + // Potentially need to re-open the channel, if a new receiver has been added between calls + // to poll() + tail.closed = false; + } let next = tail.pos; diff --git a/tokio/tests/sync_broadcast.rs b/tokio/tests/sync_broadcast.rs index 3c1b41dae1c..2153555694b 100644 --- a/tokio/tests/sync_broadcast.rs +++ b/tokio/tests/sync_broadcast.rs @@ -673,3 +673,37 @@ fn broadcast_sender_closed() { assert!(task.is_woken()); assert_ready!(task.poll()); } + +#[test] +fn broadcast_sender_closed_with_extra_subscribe() { + let (tx, rx) = broadcast::channel::<()>(1); + let rx2 = tx.subscribe(); + + let mut task = task::spawn(tx.closed()); + assert_pending!(task.poll()); + + drop(rx); + assert!(!task.is_woken()); + assert_pending!(task.poll()); + + drop(rx2); + assert!(task.is_woken()); + + let rx3 = tx.subscribe(); + assert_pending!(task.poll()); + + drop(rx3); + assert!(task.is_woken()); + assert_ready!(task.poll()); + + let mut task2 = task::spawn(tx.closed()); + assert_ready!(task2.poll()); + + let rx4 = tx.subscribe(); + let mut task3 = task::spawn(tx.closed()); + assert_pending!(task3.poll()); + + drop(rx4); + assert!(task3.is_woken()); + assert_ready!(task3.poll()); +}