From 3575a07dbb9af00acb28e789b65b5fdd87742f61 Mon Sep 17 00:00:00 2001 From: Taiki Endo Date: Fri, 8 Dec 2023 01:22:51 +0900 Subject: [PATCH] FuturesUnordered: Fix clear implementation --- .../src/stream/futures_unordered/mod.rs | 21 +++-------- .../futures_unordered/ready_to_run_queue.rs | 37 ++++++------------- futures/tests/stream_futures_unordered.rs | 25 +++++++++++++ 3 files changed, 43 insertions(+), 40 deletions(-) diff --git a/futures-util/src/stream/futures_unordered/mod.rs b/futures-util/src/stream/futures_unordered/mod.rs index 0dbaea9080..dedf75dee2 100644 --- a/futures-util/src/stream/futures_unordered/mod.rs +++ b/futures-util/src/stream/futures_unordered/mod.rs @@ -558,20 +558,7 @@ impl Debug for FuturesUnordered { impl FuturesUnordered { /// Clears the set, removing all futures. pub fn clear(&mut self) { - self.clear_head_all(); - - // we just cleared all the tasks, and we have &mut self, so this is safe. - unsafe { self.ready_to_run_queue.clear() }; - - self.is_terminated.store(false, Relaxed); - } - - fn clear_head_all(&mut self) { - while !self.head_all.get_mut().is_null() { - let head = *self.head_all.get_mut(); - let task = unsafe { self.unlink(head) }; - self.release_task(task); - } + *self = Self::new(); } } @@ -581,7 +568,11 @@ impl Drop for FuturesUnordered { // associated with it. At the same time though there may be tons of // wakers flying around which contain `Task` references // inside them. We'll let those naturally get deallocated. - self.clear_head_all(); + while !self.head_all.get_mut().is_null() { + let head = *self.head_all.get_mut(); + let task = unsafe { self.unlink(head) }; + self.release_task(task); + } // Note that at this point we could still have a bunch of tasks in the // ready to run queue. None of those tasks, however, have futures diff --git a/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs b/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs index 4518705320..a924935d23 100644 --- a/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs +++ b/futures-util/src/stream/futures_unordered/ready_to_run_queue.rs @@ -85,38 +85,25 @@ impl ReadyToRunQueue { pub(super) fn stub(&self) -> *const Task { Arc::as_ptr(&self.stub) } - - // Clear the queue of tasks. - // - // Note that each task has a strong reference count associated with it - // which is owned by the ready to run queue. This method just pulls out - // tasks and drops their refcounts. - // - // # Safety - // - // - All tasks **must** have had their futures dropped already (by FuturesUnordered::clear) - // - The caller **must** guarantee unique access to `self` - pub(crate) unsafe fn clear(&self) { - loop { - // SAFETY: We have the guarantee of mutual exclusion required by `dequeue`. - match self.dequeue() { - Dequeue::Empty => break, - Dequeue::Inconsistent => abort("inconsistent in drop"), - Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)), - } - } - } } impl Drop for ReadyToRunQueue { fn drop(&mut self) { // Once we're in the destructor for `Inner` we need to clear out // the ready to run queue of tasks if there's anything left in there. - - // All tasks have had their futures dropped already by the `FuturesUnordered` - // destructor above, and we have &mut self, so this is safe. + // + // Note that each task has a strong reference count associated with it + // which is owned by the ready to run queue. All tasks should have had + // their futures dropped already by the `FuturesUnordered` destructor + // above, so we're just pulling out tasks and dropping their refcounts. unsafe { - self.clear(); + loop { + match self.dequeue() { + Dequeue::Empty => break, + Dequeue::Inconsistent => abort("inconsistent in drop"), + Dequeue::Data(ptr) => drop(Arc::from_raw(ptr)), + } + } } } } diff --git a/futures/tests/stream_futures_unordered.rs b/futures/tests/stream_futures_unordered.rs index 5ce11670c0..36d30e6208 100644 --- a/futures/tests/stream_futures_unordered.rs +++ b/futures/tests/stream_futures_unordered.rs @@ -381,3 +381,28 @@ fn clear() { tasks.clear(); assert!(!tasks.is_terminated()); } + +// https://github.com/rust-lang/futures-rs/issues/2529#issuecomment-997290279 +#[test] +fn clear_in_loop() { + const N: usize = + if cfg!(miri) || option_env!("QEMU_LD_PREFIX").is_some() { 100 } else { 10_000 }; + futures::executor::block_on(async { + async fn task() { + let (s, r) = oneshot::channel(); + std::thread::spawn(|| { + std::thread::sleep(std::time::Duration::from_micros(100)); + let _ = s.send(()); + }); + r.await.unwrap() + } + let mut futures = FuturesUnordered::new(); + for _ in 0..N { + for _ in 0..24 { + futures.push(task()); + } + let _ = futures.next().await; + futures.clear(); + } + }); +}