Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix use after free of task in FuturesUnordered when dropped future panics #2886

Merged
merged 2 commits into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
68 changes: 48 additions & 20 deletions futures-util/src/stream/futures_unordered/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -255,16 +255,6 @@ impl<Fut> FuturesUnordered<Fut> {
// `wake` from doing any work in the future
let prev = task.queued.swap(true, SeqCst);

// Drop the future, even if it hasn't finished yet. This is safe
// because we're dropping the future on the thread that owns
// `FuturesUnordered`, which correctly tracks `Fut`'s lifetimes and
// such.
unsafe {
// Set to `None` rather than `take()`ing to prevent moving the
// future.
*task.future.get() = None;
}

// If the queued flag was previously set, then it means that this task
// is still in our internal ready to run queue. We then transfer
// ownership of our reference count to the ready to run queue, and it'll
Expand All @@ -276,8 +266,25 @@ impl<Fut> FuturesUnordered<Fut> {
// enqueue the task, so our task will never see the ready to run queue
// again. The task itself will be deallocated once all reference counts
// have been dropped elsewhere by the various wakers that contain it.
if prev {
mem::forget(task);
//
// Use ManuallyDrop to transfer the reference count ownership before
// dropping the future so unwinding won't release the reference count.
let md_slot;
let task = if prev {
md_slot = mem::ManuallyDrop::new(task);
&*md_slot
} else {
&task
};
Comment on lines +270 to +278
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the main fix


// Drop the future, even if it hasn't finished yet. This is safe
// because we're dropping the future on the thread that owns
// `FuturesUnordered`, which correctly tracks `Fut`'s lifetimes and
// such.
unsafe {
// Set to `None` rather than `take()`ing to prevent moving the
// future.
*task.future.get() = None;
}
}

Expand Down Expand Up @@ -566,15 +573,36 @@ impl<Fut> FuturesUnordered<Fut> {

impl<Fut> Drop for FuturesUnordered<Fut> {
fn drop(&mut self) {
// When a `FuturesUnordered` is dropped we want to drop all futures
// associated with it. At the same time though there may be tons of
// wakers flying around which contain `Task<Fut>` references
// inside them. We'll let those naturally get deallocated.
while !self.head_all.get_mut().is_null() {
let head = *self.head_all.get_mut();
let task = unsafe { self.unlink(head) };
self.release_task(task);
// Before the strong reference to the queue is dropped we need all
// futures to be dropped. See note at the bottom of this method.
//
// This ensures we drop all futures even in the case of one drop
// panicking and unwinding.
//
// If a second future panics, that will trigger an abort from panicking
// while panicking.
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If possible, I would prefer to leak the remaining elements after the first panic has occurred. That is the same behavior as in the standard library and crossbeam's queues/channels (rust-lang/rust#108164 (comment)).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure I can change it to that by leaking a clone of the queue Arc, it's simpler.

I wasn't sure what would be ideal here so I just went with the behavior of Vec/struct fields/slices/etc. I don't have a particular preference, but I'm a bit curious about the divergence here. I guess the rationale is to prefer leaking over an abort that stops the whole process? Is there something about queues/channels that makes them special here or is it just that we have the freedom to select this preferred behavior.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm miri is complaining about the leak.

With

MIRIFLAGS="-Zmiri-strict-provenance -Zmiri-symbolic-alignment-check -Zmiri-disable-isolation" cargo +nightly miri nextest run --workspace --all-features -- panic_on_drop_fut

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added a separate miri invocation for the test that ignores leaks

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Btw there might be some potential to make the miri CI faster using nextest

struct DropGuard<'a, Fut>(&'a mut FuturesUnordered<Fut>);
impl<Fut> DropGuard<'_, Fut> {
fn drop_futures(&mut self) {
// When a `FuturesUnordered` is dropped we want to drop all futures
// associated with it. At the same time though there may be tons of
// wakers flying around which contain `Task<Fut>` references
// inside them. We'll let those naturally get deallocated.
while !self.0.head_all.get_mut().is_null() {
let head = *self.0.head_all.get_mut();
let task = unsafe { self.0.unlink(head) };
self.0.release_task(task);
}
}
}
impl<Fut> Drop for DropGuard<'_, Fut> {
fn drop(&mut self) {
self.drop_futures();
}
}
let mut guard = DropGuard(self);
guard.drop_futures();
mem::forget(guard); // no need to check head_all again
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was added to make sure all futures are dropped if one panics. Although it looks like tasks have a check for this in their drop that aborts. At least with this, leaking and aborts may be avoided if only one future panics.


// Note that at this point we could still have a bunch of tasks in the
// ready to run queue. None of those tasks, however, have futures
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@ pub(super) struct ReadyToRunQueue<Fut> {
/// An MPSC queue into which the tasks containing the futures are inserted
/// whenever the future inside is scheduled for polling.
impl<Fut> ReadyToRunQueue<Fut> {
// FIXME: this takes raw pointer without safety conditions.

/// The enqueue function from the 1024cores intrusive MPSC queue algorithm.
pub(super) fn enqueue(&self, task: *const Task<Fut>) {
unsafe {
Expand Down
23 changes: 23 additions & 0 deletions futures/tests/stream_futures_unordered.rs
Original file line number Diff line number Diff line change
Expand Up @@ -406,3 +406,26 @@ fn clear_in_loop() {
}
});
}

// https://github.com/rust-lang/futures-rs/issues/2863#issuecomment-2219441515
#[test]
#[should_panic]
fn panic_on_drop_fut() {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minimal example taken from issue. This failed under miri before the changes to fix it.

struct BadFuture;

impl Drop for BadFuture {
fn drop(&mut self) {
panic!()
}
}

impl Future for BadFuture {
type Output = ();

fn poll(self: Pin<&mut Self>, _cx: &mut Context<'_>) -> Poll<Self::Output> {
Poll::Pending
}
}

FuturesUnordered::default().push(BadFuture);
}