diff --git a/uniffi_core/src/ffi/rustfuture/future.rs b/uniffi_core/src/ffi/rustfuture/future.rs index b104b20a32..ed136d38de 100644 --- a/uniffi_core/src/ffi/rustfuture/future.rs +++ b/uniffi_core/src/ffi/rustfuture/future.rs @@ -202,7 +202,7 @@ where // This Mutex should never block if our code is working correctly, since there should not be // multiple threads calling [Self::poll] and/or [Self::complete] at the same time. future: Mutex>, - scheduler: Mutex, + scheduler: Scheduler, // UT is used as the generic parameter for [LowerReturn]. // Let's model this with PhantomData as a function that inputs a UT value. _phantom: PhantomData ()>, @@ -218,7 +218,7 @@ where pub(super) fn new(future: F, _tag: UT) -> Arc { Arc::new(Self { future: Mutex::new(WrappedFuture::new(future)), - scheduler: Mutex::new(Scheduler::new()), + scheduler: Scheduler::new(), _phantom: PhantomData, }) } @@ -232,20 +232,20 @@ where if ready { callback(data, RustFuturePoll::Ready) } else { - self.scheduler.lock().unwrap().store(callback, data); + self.scheduler.store(callback, data); } } pub(super) fn is_cancelled(&self) -> bool { - self.scheduler.lock().unwrap().is_cancelled() + self.scheduler.is_cancelled() } pub(super) fn wake(&self) { - self.scheduler.lock().unwrap().wake(); + self.scheduler.wake(); } pub(super) fn cancel(&self) { - self.scheduler.lock().unwrap().cancel(); + self.scheduler.cancel(); } pub(super) fn complete(&self, call_status: &mut RustCallStatus) -> T::ReturnType { @@ -254,7 +254,7 @@ where pub(super) fn free(self: Arc) { // Call cancel() to send any leftover data to the continuation callback - self.scheduler.lock().unwrap().cancel(); + self.scheduler.cancel(); // Ensure we drop our inner future, releasing all held references self.future.lock().unwrap().free(); } diff --git a/uniffi_core/src/ffi/rustfuture/scheduler.rs b/uniffi_core/src/ffi/rustfuture/scheduler.rs index aae5a0c1cf..c2d5c41275 100644 --- a/uniffi_core/src/ffi/rustfuture/scheduler.rs +++ b/uniffi_core/src/ffi/rustfuture/scheduler.rs @@ -2,7 +2,7 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -use std::mem; +use std::{mem, sync::Mutex}; use super::{RustFutureContinuationCallback, RustFuturePoll}; @@ -20,7 +20,12 @@ use super::{RustFutureContinuationCallback, RustFuturePoll}; /// state, invoking any future callbacks as soon as they're stored. #[derive(Debug)] -pub(super) enum Scheduler { +pub(super) struct Scheduler { + state: Mutex, +} + +#[derive(Debug)] +pub(super) enum SchedulerState { /// No continuations set, neither wake() nor cancel() called. Empty, /// `wake()` was called when there was no continuation set. The next time `store` is called, @@ -33,58 +38,138 @@ pub(super) enum Scheduler { Set(RustFutureContinuationCallback, *const ()), } -impl Scheduler { - pub(super) fn new() -> Self { - Self::Empty +/// Encapsulates a call to a RustFutureContinuationCallback +struct CallbackCall { + callback: RustFutureContinuationCallback, + data: *const (), + poll_data: RustFuturePoll, +} + +impl CallbackCall { + fn new( + callback: RustFutureContinuationCallback, + data: *const (), + poll_data: RustFuturePoll, + ) -> Self { + Self { + callback, + data, + poll_data, + } } - /// Store new continuation data if we are in the `Empty` state. If we are in the `Waked` or - /// `Cancelled` state, call the continuation immediately with the data. - pub(super) fn store(&mut self, callback: RustFutureContinuationCallback, data: *const ()) { + fn invoke(self) { + (self.callback)(self.data, self.poll_data) + } +} + +/// The SchedulerState impl contains all the ways to mutate the inner state field. +/// +/// All methods return an `Option` rather than invoking the callback directly. +/// This is important, since the Mutex is locked while inside these methods. If we called the +/// callback directly, the foreign code could poll the future again, which would try to lock the +/// mutex again and lead to a deadlock. +impl SchedulerState { + fn store( + &mut self, + callback: RustFutureContinuationCallback, + data: *const (), + ) -> Option { match self { - Self::Empty => *self = Self::Set(callback, data), + Self::Empty => { + *self = Self::Set(callback, data); + None + } Self::Set(old_callback, old_data) => { log::error!( - "store: observed `Self::Set` state. Is poll() being called from multiple threads at once?" + "store: observed `SchedulerState::Set` state. Is poll() being called from multiple threads at once?" ); - old_callback(*old_data, RustFuturePoll::Ready); + let call = CallbackCall::new(*old_callback, *old_data, RustFuturePoll::Ready); *self = Self::Set(callback, data); + Some(call) } Self::Waked => { *self = Self::Empty; - callback(data, RustFuturePoll::MaybeReady); - } - Self::Cancelled => { - callback(data, RustFuturePoll::Ready); + Some(CallbackCall::new( + callback, + data, + RustFuturePoll::MaybeReady, + )) } + Self::Cancelled => Some(CallbackCall::new(callback, data, RustFuturePoll::Ready)), } } - pub(super) fn wake(&mut self) { + fn wake(&mut self) -> Option { match self { // If we had a continuation set, then call it and transition to the `Empty` state. - Self::Set(callback, old_data) => { + SchedulerState::Set(callback, old_data) => { let old_data = *old_data; let callback = *callback; - *self = Self::Empty; - callback(old_data, RustFuturePoll::MaybeReady); + *self = SchedulerState::Empty; + Some(CallbackCall::new( + callback, + old_data, + RustFuturePoll::MaybeReady, + )) } // If we were in the `Empty` state, then transition to `Waked`. The next time `store` // is called, we will immediately call the continuation. - Self::Empty => *self = Self::Waked, + SchedulerState::Empty => { + *self = SchedulerState::Waked; + None + } // This is a no-op if we were in the `Cancelled` or `Waked` state. - _ => (), + _ => None, + } + } + + fn cancel(&mut self) -> Option { + if let SchedulerState::Set(callback, old_data) = + mem::replace(self, SchedulerState::Cancelled) + { + Some(CallbackCall::new(callback, old_data, RustFuturePoll::Ready)) + } else { + None + } + } +} + +impl Scheduler { + pub(super) fn new() -> Self { + Self { + state: Mutex::new(SchedulerState::Empty), } } - pub(super) fn cancel(&mut self) { - if let Self::Set(callback, old_data) = mem::replace(self, Self::Cancelled) { - callback(old_data, RustFuturePoll::Ready); + /// Call a method on the inner state field + /// + /// If it returns a callback to invoke, then make the call after releasing the mutex. + fn call_state_method(&self, f: impl Fn(&mut SchedulerState) -> Option) { + let mut state = self.state.lock().unwrap(); + let callback_call = f(&mut state); + drop(state); + if let Some(callback_call) = callback_call { + callback_call.invoke() } } + /// Store new continuation data if we are in the `Empty` state. If we are in the `Waked` or + /// `Cancelled` state, call the continuation immediately with the data. + pub(super) fn store(&self, callback: RustFutureContinuationCallback, data: *const ()) { + self.call_state_method(|state| state.store(callback, data)) + } + + pub(super) fn wake(&self) { + self.call_state_method(SchedulerState::wake) + } + + pub(super) fn cancel(&self) { + self.call_state_method(SchedulerState::cancel) + } + pub(super) fn is_cancelled(&self) -> bool { - matches!(self, Self::Cancelled) + matches!(*self.state.lock().unwrap(), SchedulerState::Cancelled) } }