diff --git a/tokio/src/sync/broadcast.rs b/tokio/src/sync/broadcast.rs index c9469c0d3a0..2fecf5cd329 100644 --- a/tokio/src/sync/broadcast.rs +++ b/tokio/src/sync/broadcast.rs @@ -875,6 +875,11 @@ fn new_receiver(shared: Arc>) -> Receiver { assert!(tail.rx_cnt != MAX_RECEIVERS, "max receivers"); tail.rx_cnt = tail.rx_cnt.checked_add(1).expect("overflow"); + if tail.closed { + // Potentially need to re-open the channel, if a new receiver has been added between calls + // to poll() + tail.closed = false; + } let next = tail.pos; diff --git a/tokio/tests/sync_broadcast.rs b/tokio/tests/sync_broadcast.rs index f397782206e..23d9a178cbc 100644 --- a/tokio/tests/sync_broadcast.rs +++ b/tokio/tests/sync_broadcast.rs @@ -657,3 +657,37 @@ fn broadcast_sender_closed() { assert!(task.is_woken()); assert_ready!(task.poll()); } + +#[test] +fn broadcast_sender_closed_with_extra_subscribe() { + let (tx, rx) = broadcast::channel::<()>(1); + let rx2 = tx.subscribe(); + + let mut task = task::spawn(tx.closed()); + assert_pending!(task.poll()); + + drop(rx); + assert!(!task.is_woken()); + assert_pending!(task.poll()); + + drop(rx2); + assert!(task.is_woken()); + + let rx3 = tx.subscribe(); + assert_pending!(task.poll()); + + drop(rx3); + assert!(task.is_woken()); + assert_ready!(task.poll()); + + let mut task2 = task::spawn(tx.closed()); + assert_ready!(task2.poll()); + + let rx4 = tx.subscribe(); + let mut task3 = task::spawn(tx.closed()); + assert_pending!(task3.poll()); + + drop(rx4); + assert!(task3.is_woken()); + assert_ready!(task3.poll()); +}