Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't invoke the RustFutureContinuationCallback while holding the mutex #1901

Closed
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 7 additions & 7 deletions uniffi_core/src/ffi/rustfuture/future.rs
Original file line number Diff line number Diff line change
Expand Up @@ -202,7 +202,7 @@ where
// This Mutex should never block if our code is working correctly, since there should not be
// multiple threads calling [Self::poll] and/or [Self::complete] at the same time.
future: Mutex<WrappedFuture<F, T, UT>>,
scheduler: Mutex<Scheduler>,
scheduler: Scheduler,
// UT is used as the generic parameter for [LowerReturn].
// Let's model this with PhantomData as a function that inputs a UT value.
_phantom: PhantomData<fn(UT) -> ()>,
Expand All @@ -218,7 +218,7 @@ where
pub(super) fn new(future: F, _tag: UT) -> Arc<Self> {
Arc::new(Self {
future: Mutex::new(WrappedFuture::new(future)),
scheduler: Mutex::new(Scheduler::new()),
scheduler: Scheduler::new(),
_phantom: PhantomData,
})
}
Expand All @@ -232,20 +232,20 @@ where
if ready {
callback(data, RustFuturePoll::Ready)
} else {
self.scheduler.lock().unwrap().store(callback, data);
self.scheduler.store(callback, data);
}
}

pub(super) fn is_cancelled(&self) -> bool {
self.scheduler.lock().unwrap().is_cancelled()
self.scheduler.is_cancelled()
}

pub(super) fn wake(&self) {
self.scheduler.lock().unwrap().wake();
self.scheduler.wake();
}

pub(super) fn cancel(&self) {
self.scheduler.lock().unwrap().cancel();
self.scheduler.cancel();
}

pub(super) fn complete(&self, call_status: &mut RustCallStatus) -> T::ReturnType {
Expand All @@ -254,7 +254,7 @@ where

pub(super) fn free(self: Arc<Self>) {
// Call cancel() to send any leftover data to the continuation callback
self.scheduler.lock().unwrap().cancel();
self.scheduler.cancel();
// Ensure we drop our inner future, releasing all held references
self.future.lock().unwrap().free();
}
Expand Down
135 changes: 110 additions & 25 deletions uniffi_core/src/ffi/rustfuture/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
* License, v. 2.0. If a copy of the MPL was not distributed with this
* file, You can obtain one at http://mozilla.org/MPL/2.0/. */

use std::mem;
use std::{mem, sync::Mutex};

use super::{RustFutureContinuationCallback, RustFuturePoll};

Expand All @@ -20,7 +20,12 @@ use super::{RustFutureContinuationCallback, RustFuturePoll};
/// state, invoking any future callbacks as soon as they're stored.

#[derive(Debug)]
pub(super) enum Scheduler {
pub(super) struct Scheduler {
state: Mutex<SchedulerState>,
}

#[derive(Debug)]
pub(super) enum SchedulerState {
/// No continuations set, neither wake() nor cancel() called.
Empty,
/// `wake()` was called when there was no continuation set. The next time `store` is called,
Expand All @@ -33,58 +38,138 @@ pub(super) enum Scheduler {
Set(RustFutureContinuationCallback, *const ()),
}

impl Scheduler {
pub(super) fn new() -> Self {
Self::Empty
/// Encapsulates a call to a RustFutureContinuationCallback
struct CallbackCall {
callback: RustFutureContinuationCallback,
data: *const (),
poll_data: RustFuturePoll,
}

impl CallbackCall {
fn new(
callback: RustFutureContinuationCallback,
data: *const (),
poll_data: RustFuturePoll,
) -> Self {
Self {
callback,
data,
poll_data,
}
}

/// Store new continuation data if we are in the `Empty` state. If we are in the `Waked` or
/// `Cancelled` state, call the continuation immediately with the data.
pub(super) fn store(&mut self, callback: RustFutureContinuationCallback, data: *const ()) {
fn invoke(self) {
(self.callback)(self.data, self.poll_data)
}
}

/// The SchedulerState impl contains all the ways to mutate the inner state field.
///
/// All methods return an `Option<CallbackCall>` rather than invoking the callback directly.
/// This is important, since the Mutex is locked while inside these methods. If we called the
/// callback directly, the foreign code could poll the future again, which would try to lock the
/// mutex again and lead to a deadlock.
impl SchedulerState {
fn store(
&mut self,
callback: RustFutureContinuationCallback,
data: *const (),
) -> Option<CallbackCall> {
match self {
Self::Empty => *self = Self::Set(callback, data),
Self::Empty => {
*self = Self::Set(callback, data);
None
}
Self::Set(old_callback, old_data) => {
log::error!(
"store: observed `Self::Set` state. Is poll() being called from multiple threads at once?"
"store: observed `SchedulerState::Set` state. Is poll() being called from multiple threads at once?"
);
old_callback(*old_data, RustFuturePoll::Ready);
let call = CallbackCall::new(*old_callback, *old_data, RustFuturePoll::MaybeReady);
*self = Self::Set(callback, data);
Some(call)
}
Self::Waked => {
*self = Self::Empty;
callback(data, RustFuturePoll::MaybeReady);
}
Self::Cancelled => {
callback(data, RustFuturePoll::Ready);
Some(CallbackCall::new(
callback,
data,
RustFuturePoll::MaybeReady,
))
}
Self::Cancelled => Some(CallbackCall::new(callback, data, RustFuturePoll::Ready)),
}
}

pub(super) fn wake(&mut self) {
fn wake(&mut self) -> Option<CallbackCall> {
match self {
// If we had a continuation set, then call it and transition to the `Empty` state.
Self::Set(callback, old_data) => {
SchedulerState::Set(callback, old_data) => {
let old_data = *old_data;
let callback = *callback;
*self = Self::Empty;
callback(old_data, RustFuturePoll::MaybeReady);
*self = SchedulerState::Empty;
Some(CallbackCall::new(
callback,
old_data,
RustFuturePoll::MaybeReady,
))
}
// If we were in the `Empty` state, then transition to `Waked`. The next time `store`
// is called, we will immediately call the continuation.
Self::Empty => *self = Self::Waked,
SchedulerState::Empty => {
*self = SchedulerState::Waked;
None
}
// This is a no-op if we were in the `Cancelled` or `Waked` state.
_ => (),
_ => None,
}
}

fn cancel(&mut self) -> Option<CallbackCall> {
if let SchedulerState::Set(callback, old_data) =
mem::replace(self, SchedulerState::Cancelled)
{
Some(CallbackCall::new(callback, old_data, RustFuturePoll::Ready))
} else {
None
}
}
}

impl Scheduler {
pub(super) fn new() -> Self {
Self {
state: Mutex::new(SchedulerState::Empty),
}
}

pub(super) fn cancel(&mut self) {
if let Self::Set(callback, old_data) = mem::replace(self, Self::Cancelled) {
callback(old_data, RustFuturePoll::Ready);
/// Call a method on the inner state field
///
/// If it returns a callback to invoke, then make the call after releasing the mutex.
fn call_state_method(&self, f: impl Fn(&mut SchedulerState) -> Option<CallbackCall>) {
let mut state = self.state.lock().unwrap();
let callback_call = f(&mut state);
drop(state);
if let Some(callback_call) = callback_call {
callback_call.invoke()
}
}

/// Store new continuation data if we are in the `Empty` state. If we are in the `Waked` or
/// `Cancelled` state, call the continuation immediately with the data.
pub(super) fn store(&self, callback: RustFutureContinuationCallback, data: *const ()) {
self.call_state_method(|state| state.store(callback, data))
}

pub(super) fn wake(&self) {
self.call_state_method(SchedulerState::wake)
}

pub(super) fn cancel(&self) {
self.call_state_method(SchedulerState::cancel)
}

pub(super) fn is_cancelled(&self) -> bool {
matches!(self, Self::Cancelled)
matches!(*self.state.lock().unwrap(), SchedulerState::Cancelled)
}
}

Expand Down