Skip to content

Commit

Permalink
Don't invoke the RustFutureContinuationCallback while holding the mutex
Browse files Browse the repository at this point in the history
As described in the comments, this could lead to a deadlock if the
foreign code immediately polled the future.  This doesn't currently
happen with any of the core bindings, but it might with other bindings
and it seems like a footgun in general.

To accomplish this, I made a separate class to handle the state and
moved the mutex to only wrap that state.  The state mutation happens
with the mutex locked and the callback invocation happens after it's
unlocked.
  • Loading branch information
bendk committed Dec 13, 2023
1 parent 4f4c989 commit 3f7095b
Show file tree
Hide file tree
Showing 2 changed files with 117 additions and 32 deletions.
14 changes: 7 additions & 7 deletions uniffi_core/src/ffi/rustfuture/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ where
// This Mutex should never block if our code is working correctly, since there should not be
// multiple threads calling [Self::poll] and/or [Self::complete] at the same time.
future: Mutex<WrappedFuture<F, T, UT>>,
scheduler: Mutex<Scheduler>,
scheduler: Scheduler,
// UT is used as the generic parameter for [LowerReturn].
// Let's model this with PhantomData as a function that inputs a UT value.
_phantom: PhantomData<fn(UT) -> ()>,
Expand All @@ -218,7 +218,7 @@ where
pub(super) fn new(future: F, _tag: UT) -> Arc<Self> {
Arc::new(Self {
future: Mutex::new(WrappedFuture::new(future)),
scheduler: Mutex::new(Scheduler::new()),
scheduler: Scheduler::new(),
_phantom: PhantomData,
})
}
Expand All @@ -232,20 +232,20 @@ where
if ready {
callback(data, RustFuturePoll::Ready)
} else {
self.scheduler.lock().unwrap().store(callback, data);
self.scheduler.store(callback, data);
}
}

pub(super) fn is_cancelled(&self) -> bool {
self.scheduler.lock().unwrap().is_cancelled()
self.scheduler.is_cancelled()
}

pub(super) fn wake(&self) {
self.scheduler.lock().unwrap().wake();
self.scheduler.wake();
}

pub(super) fn cancel(&self) {
self.scheduler.lock().unwrap().cancel();
self.scheduler.cancel();
}

pub(super) fn complete(&self, call_status: &mut RustCallStatus) -> T::ReturnType {
Expand All @@ -254,7 +254,7 @@ where

pub(super) fn free(self: Arc<Self>) {
// Call cancel() to send any leftover data to the continuation callback
self.scheduler.lock().unwrap().cancel();
self.scheduler.cancel();
// Ensure we drop our inner future, releasing all held references
self.future.lock().unwrap().free();
}
Expand Down
135 changes: 110 additions & 25 deletions uniffi_core/src/ffi/rustfuture/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use std::mem;
use std::{mem, sync::Mutex};

use super::{RustFutureContinuationCallback, RustFuturePoll};

Expand All @@ -20,7 +20,12 @@ use super::{RustFutureContinuationCallback, RustFuturePoll};
/// state, invoking any future callbacks as soon as they're stored.
#[derive(Debug)]
pub(super) enum Scheduler {
pub(super) struct Scheduler {
state: Mutex<SchedulerState>,
}

#[derive(Debug)]
pub(super) enum SchedulerState {
/// No continuations set, neither wake() nor cancel() called.
Empty,
/// `wake()` was called when there was no continuation set. The next time `store` is called,
Expand All @@ -33,58 +38,138 @@ pub(super) enum Scheduler {
Set(RustFutureContinuationCallback, *const ()),
}

impl Scheduler {
pub(super) fn new() -> Self {
Self::Empty
/// Encapsulates a call to a RustFutureContinuationCallback
struct CallbackCall {
callback: RustFutureContinuationCallback,
data: *const (),
poll_data: RustFuturePoll,
}

impl CallbackCall {
fn new(
callback: RustFutureContinuationCallback,
data: *const (),
poll_data: RustFuturePoll,
) -> Self {
Self {
callback,
data,
poll_data,
}
}

/// Store new continuation data if we are in the `Empty` state. If we are in the `Waked` or
/// `Cancelled` state, call the continuation immediately with the data.
pub(super) fn store(&mut self, callback: RustFutureContinuationCallback, data: *const ()) {
fn invoke(self) {
(self.callback)(self.data, self.poll_data)
}
}

/// The SchedulerState impl contains all the ways to mutate the inner state field.
///
/// All methods return an `Option<CallbackCall>` rather than invoking the callback directly.
/// This is important, since the Mutex is locked while inside these methods. If we called the
/// callback directly, the foreign code could poll the future again, which would try to lock the
/// mutex again and lead to a deadlock.
impl SchedulerState {
fn store(
&mut self,
callback: RustFutureContinuationCallback,
data: *const (),
) -> Option<CallbackCall> {
match self {
Self::Empty => *self = Self::Set(callback, data),
Self::Empty => {
*self = Self::Set(callback, data);
None
}
Self::Set(old_callback, old_data) => {
log::error!(
"store: observed `Self::Set` state. Is poll() being called from multiple threads at once?"
"store: observed `SchedulerState::Set` state. Is poll() being called from multiple threads at once?"
);
old_callback(*old_data, RustFuturePoll::Ready);
let call = CallbackCall::new(*old_callback, *old_data, RustFuturePoll::Ready);
*self = Self::Set(callback, data);
Some(call)
}
Self::Waked => {
*self = Self::Empty;
callback(data, RustFuturePoll::MaybeReady);
}
Self::Cancelled => {
callback(data, RustFuturePoll::Ready);
Some(CallbackCall::new(
callback,
data,
RustFuturePoll::MaybeReady,
))
}
Self::Cancelled => Some(CallbackCall::new(callback, data, RustFuturePoll::Ready)),
}
}

pub(super) fn wake(&mut self) {
fn wake(&mut self) -> Option<CallbackCall> {
match self {
// If we had a continuation set, then call it and transition to the `Empty` state.
Self::Set(callback, old_data) => {
SchedulerState::Set(callback, old_data) => {
let old_data = *old_data;
let callback = *callback;
*self = Self::Empty;
callback(old_data, RustFuturePoll::MaybeReady);
*self = SchedulerState::Empty;
Some(CallbackCall::new(
callback,
old_data,
RustFuturePoll::MaybeReady,
))
}
// If we were in the `Empty` state, then transition to `Waked`. The next time `store`
// is called, we will immediately call the continuation.
Self::Empty => *self = Self::Waked,
SchedulerState::Empty => {
*self = SchedulerState::Waked;
None
}
// This is a no-op if we were in the `Cancelled` or `Waked` state.
_ => (),
_ => None,
}
}

fn cancel(&mut self) -> Option<CallbackCall> {
if let SchedulerState::Set(callback, old_data) =
mem::replace(self, SchedulerState::Cancelled)
{
Some(CallbackCall::new(callback, old_data, RustFuturePoll::Ready))
} else {
None
}
}
}

impl Scheduler {
pub(super) fn new() -> Self {
Self {
state: Mutex::new(SchedulerState::Empty),
}
}

pub(super) fn cancel(&mut self) {
if let Self::Set(callback, old_data) = mem::replace(self, Self::Cancelled) {
callback(old_data, RustFuturePoll::Ready);
/// Call a method on the inner state field
///
/// If it returns a callback to invoke, then make the call after releasing the mutex.
fn call_state_method(&self, f: impl Fn(&mut SchedulerState) -> Option<CallbackCall>) {
let mut state = self.state.lock().unwrap();
let callback_call = f(&mut state);
drop(state);
if let Some(callback_call) = callback_call {
callback_call.invoke()
}
}

/// Store new continuation data if we are in the `Empty` state. If we are in the `Waked` or
/// `Cancelled` state, call the continuation immediately with the data.
pub(super) fn store(&self, callback: RustFutureContinuationCallback, data: *const ()) {
self.call_state_method(|state| state.store(callback, data))
}

pub(super) fn wake(&self) {
self.call_state_method(SchedulerState::wake)
}

pub(super) fn cancel(&self) {
self.call_state_method(SchedulerState::cancel)
}

pub(super) fn is_cancelled(&self) -> bool {
matches!(self, Self::Cancelled)
matches!(*self.state.lock().unwrap(), SchedulerState::Cancelled)
}
}

Expand Down

0 comments on commit 3f7095b

Please sign in to comment.