From 3f7095bec52b98bc2f71b54b38a0094f2d89a38b Mon Sep 17 00:00:00 2001 From: Ben Dean-Kawamura Date: Wed, 13 Dec 2023 15:07:23 -0500 Subject: [PATCH] Don't invoke the RustFutureContinuationCallback while holding the mutex As described in the comments, this could lead to a deadlock if the foreign code immediately polled the future. This doesn't currently happen with any of the core bindings, but it might with other bindings and it seems like a footgun in general. To accomplish this, I made a separate class to handle the state and moved the mutex to only wrap that state. The state mutation happens with the mutex locked and the callback invocation happens after it's unlocked. --- uniffi_core/src/ffi/rustfuture/future.rs | 14 +- uniffi_core/src/ffi/rustfuture/scheduler.rs | 135 ++++++++++++++++---- 2 files changed, 117 insertions(+), 32 deletions(-) diff --git a/uniffi_core/src/ffi/rustfuture/future.rs b/uniffi_core/src/ffi/rustfuture/future.rs index b104b20a32..ed136d38de 100644 --- a/uniffi_core/src/ffi/rustfuture/future.rs +++ b/uniffi_core/src/ffi/rustfuture/future.rs @@ -202,7 +202,7 @@ where // This Mutex should never block if our code is working correctly, since there should not be // multiple threads calling [Self::poll] and/or [Self::complete] at the same time. future: Mutex>, - scheduler: Mutex, + scheduler: Scheduler, // UT is used as the generic parameter for [LowerReturn]. // Let's model this with PhantomData as a function that inputs a UT value. _phantom: PhantomData ()>, @@ -218,7 +218,7 @@ where pub(super) fn new(future: F, _tag: UT) -> Arc { Arc::new(Self { future: Mutex::new(WrappedFuture::new(future)), - scheduler: Mutex::new(Scheduler::new()), + scheduler: Scheduler::new(), _phantom: PhantomData, }) } @@ -232,20 +232,20 @@ where if ready { callback(data, RustFuturePoll::Ready) } else { - self.scheduler.lock().unwrap().store(callback, data); + self.scheduler.store(callback, data); } } pub(super) fn is_cancelled(&self) -> bool { - self.scheduler.lock().unwrap().is_cancelled() + self.scheduler.is_cancelled() } pub(super) fn wake(&self) { - self.scheduler.lock().unwrap().wake(); + self.scheduler.wake(); } pub(super) fn cancel(&self) { - self.scheduler.lock().unwrap().cancel(); + self.scheduler.cancel(); } pub(super) fn complete(&self, call_status: &mut RustCallStatus) -> T::ReturnType { @@ -254,7 +254,7 @@ where pub(super) fn free(self: Arc) { // Call cancel() to send any leftover data to the continuation callback - self.scheduler.lock().unwrap().cancel(); + self.scheduler.cancel(); // Ensure we drop our inner future, releasing all held references self.future.lock().unwrap().free(); } diff --git a/uniffi_core/src/ffi/rustfuture/scheduler.rs b/uniffi_core/src/ffi/rustfuture/scheduler.rs index aae5a0c1cf..c2d5c41275 100644 --- a/uniffi_core/src/ffi/rustfuture/scheduler.rs +++ b/uniffi_core/src/ffi/rustfuture/scheduler.rs @@ -2,7 +2,7 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -use std::mem; +use std::{mem, sync::Mutex}; use super::{RustFutureContinuationCallback, RustFuturePoll}; @@ -20,7 +20,12 @@ use super::{RustFutureContinuationCallback, RustFuturePoll}; /// state, invoking any future callbacks as soon as they're stored. #[derive(Debug)] -pub(super) enum Scheduler { +pub(super) struct Scheduler { + state: Mutex, +} + +#[derive(Debug)] +pub(super) enum SchedulerState { /// No continuations set, neither wake() nor cancel() called. Empty, /// `wake()` was called when there was no continuation set. The next time `store` is called, @@ -33,58 +38,138 @@ pub(super) enum Scheduler { Set(RustFutureContinuationCallback, *const ()), } -impl Scheduler { - pub(super) fn new() -> Self { - Self::Empty +/// Encapsulates a call to a RustFutureContinuationCallback +struct CallbackCall { + callback: RustFutureContinuationCallback, + data: *const (), + poll_data: RustFuturePoll, +} + +impl CallbackCall { + fn new( + callback: RustFutureContinuationCallback, + data: *const (), + poll_data: RustFuturePoll, + ) -> Self { + Self { + callback, + data, + poll_data, + } } - /// Store new continuation data if we are in the `Empty` state. If we are in the `Waked` or - /// `Cancelled` state, call the continuation immediately with the data. - pub(super) fn store(&mut self, callback: RustFutureContinuationCallback, data: *const ()) { + fn invoke(self) { + (self.callback)(self.data, self.poll_data) + } +} + +/// The SchedulerState impl contains all the ways to mutate the inner state field. +/// +/// All methods return an `Option` rather than invoking the callback directly. +/// This is important, since the Mutex is locked while inside these methods. If we called the +/// callback directly, the foreign code could poll the future again, which would try to lock the +/// mutex again and lead to a deadlock. +impl SchedulerState { + fn store( + &mut self, + callback: RustFutureContinuationCallback, + data: *const (), + ) -> Option { match self { - Self::Empty => *self = Self::Set(callback, data), + Self::Empty => { + *self = Self::Set(callback, data); + None + } Self::Set(old_callback, old_data) => { log::error!( - "store: observed `Self::Set` state. Is poll() being called from multiple threads at once?" + "store: observed `SchedulerState::Set` state. Is poll() being called from multiple threads at once?" ); - old_callback(*old_data, RustFuturePoll::Ready); + let call = CallbackCall::new(*old_callback, *old_data, RustFuturePoll::Ready); *self = Self::Set(callback, data); + Some(call) } Self::Waked => { *self = Self::Empty; - callback(data, RustFuturePoll::MaybeReady); - } - Self::Cancelled => { - callback(data, RustFuturePoll::Ready); + Some(CallbackCall::new( + callback, + data, + RustFuturePoll::MaybeReady, + )) } + Self::Cancelled => Some(CallbackCall::new(callback, data, RustFuturePoll::Ready)), } } - pub(super) fn wake(&mut self) { + fn wake(&mut self) -> Option { match self { // If we had a continuation set, then call it and transition to the `Empty` state. - Self::Set(callback, old_data) => { + SchedulerState::Set(callback, old_data) => { let old_data = *old_data; let callback = *callback; - *self = Self::Empty; - callback(old_data, RustFuturePoll::MaybeReady); + *self = SchedulerState::Empty; + Some(CallbackCall::new( + callback, + old_data, + RustFuturePoll::MaybeReady, + )) } // If we were in the `Empty` state, then transition to `Waked`. The next time `store` // is called, we will immediately call the continuation. - Self::Empty => *self = Self::Waked, + SchedulerState::Empty => { + *self = SchedulerState::Waked; + None + } // This is a no-op if we were in the `Cancelled` or `Waked` state. - _ => (), + _ => None, + } + } + + fn cancel(&mut self) -> Option { + if let SchedulerState::Set(callback, old_data) = + mem::replace(self, SchedulerState::Cancelled) + { + Some(CallbackCall::new(callback, old_data, RustFuturePoll::Ready)) + } else { + None + } + } +} + +impl Scheduler { + pub(super) fn new() -> Self { + Self { + state: Mutex::new(SchedulerState::Empty), } } - pub(super) fn cancel(&mut self) { - if let Self::Set(callback, old_data) = mem::replace(self, Self::Cancelled) { - callback(old_data, RustFuturePoll::Ready); + /// Call a method on the inner state field + /// + /// If it returns a callback to invoke, then make the call after releasing the mutex. + fn call_state_method(&self, f: impl Fn(&mut SchedulerState) -> Option) { + let mut state = self.state.lock().unwrap(); + let callback_call = f(&mut state); + drop(state); + if let Some(callback_call) = callback_call { + callback_call.invoke() } } + /// Store new continuation data if we are in the `Empty` state. If we are in the `Waked` or + /// `Cancelled` state, call the continuation immediately with the data. + pub(super) fn store(&self, callback: RustFutureContinuationCallback, data: *const ()) { + self.call_state_method(|state| state.store(callback, data)) + } + + pub(super) fn wake(&self) { + self.call_state_method(SchedulerState::wake) + } + + pub(super) fn cancel(&self) { + self.call_state_method(SchedulerState::cancel) + } + pub(super) fn is_cancelled(&self) -> bool { - matches!(self, Self::Cancelled) + matches!(*self.state.lock().unwrap(), SchedulerState::Cancelled) } }