From 5cabadaaaae03cbee1137a2a50f91528b2817523 Mon Sep 17 00:00:00 2001 From: Ben Dean-Kawamura Date: Thu, 2 Nov 2023 15:57:05 -0400 Subject: [PATCH] Async blocking task support Replaced `ForeignExecutor` with `BlockingTaskQueue`. BlockingTaskQueue allows a Rust closure to be scheduled on a foreign thread where blocking operations are okay. The closure runs inside the parent future, which is nice because it allows the closure to reference its outside scope. Added new tests for this in the futures fixtures. Updated the tests to check that handles are being released properly. TODO: implement dropping BackgroundQueue and releasing the handle. --- Cargo.lock | 88 +++- Cargo.toml | 1 - docs/manual/src/futures.md | 60 ++- fixtures/foreign-executor/Cargo.toml | 19 - fixtures/foreign-executor/build.rs | 7 - .../foreign-executor/src/foreign_executor.udl | 7 - fixtures/foreign-executor/src/lib.rs | 71 --- .../tests/bindings/test_foreign_executor.kts | 39 -- .../tests/bindings/test_foreign_executor.py | 50 -- .../bindings/test_foreign_executor.swift | 42 -- .../tests/test_generated_bindings.rs | 7 - fixtures/futures/Cargo.toml | 1 + fixtures/futures/src/lib.rs | 51 ++ .../futures/tests/bindings/test_futures.kts | 66 ++- .../futures/tests/bindings/test_futures.py | 55 ++ .../futures/tests/bindings/test_futures.swift | 123 ++--- fixtures/metadata/src/tests.rs | 2 +- .../gen_kotlin/blocking_task_queue.rs} | 9 +- .../src/bindings/kotlin/gen_kotlin/mod.rs | 7 +- .../src/bindings/kotlin/templates/Async.kt | 62 ++- .../templates/BlockingTaskQueueTemplate.kt | 23 + .../templates/ForeignExecutorTemplate.kt | 78 --- .../bindings/kotlin/templates/HandleMap.kt | 3 + .../src/bindings/kotlin/templates/Helpers.kt | 2 +- .../src/bindings/kotlin/templates/Types.kt | 6 +- .../gen_python/blocking_task_queue.rs} | 14 +- .../src/bindings/python/gen_python/mod.rs | 5 +- .../src/bindings/python/templates/Async.py | 73 ++- .../templates/BlockingTaskQueueTemplate.py | 21 + .../templates/ForeignExecutorTemplate.py | 63 --- .../bindings/python/templates/HandleMap.py | 3 + .../src/bindings/python/templates/Helpers.py | 3 +- .../templates/NamespaceLibraryTemplate.py | 21 - .../src/bindings/python/templates/Types.py | 4 +- .../src/bindings/python/templates/wrapper.py | 1 + .../src/bindings/ruby/gen_ruby/mod.rs | 11 +- .../{executor.rs => blocking_task_queue.rs} | 14 +- .../src/bindings/swift/gen_swift/mod.rs | 10 +- .../src/bindings/swift/templates/Async.swift | 79 ++- .../templates/BlockingTaskQueueTemplate.swift | 28 ++ .../swift/templates/BridgingHeaderTemplate.h | 14 +- .../templates/ForeignExecutorTemplate.swift | 69 --- .../bindings/swift/templates/HandleMap.swift | 6 + .../src/bindings/swift/templates/Types.swift | 4 +- uniffi_bindgen/src/interface/ffi.rs | 4 +- uniffi_bindgen/src/interface/mod.rs | 32 +- uniffi_bindgen/src/interface/universe.rs | 5 +- uniffi_bindgen/src/scaffolding/mod.rs | 2 +- uniffi_core/src/ffi/foreigncallbacks.rs | 25 +- uniffi_core/src/ffi/foreignexecutor.rs | 473 ------------------ uniffi_core/src/ffi/mod.rs | 2 - uniffi_core/src/ffi/rustfuture/future.rs | 65 ++- uniffi_core/src/ffi/rustfuture/mod.rs | 34 +- uniffi_core/src/ffi/rustfuture/scheduler.rs | 190 ++++++- uniffi_core/src/ffi/rustfuture/tests.rs | 123 ++++- uniffi_core/src/ffi_converter_impls.rs | 26 +- uniffi_core/src/metadata.rs | 2 +- uniffi_macros/src/setup_scaffolding.rs | 13 +- uniffi_meta/src/metadata.rs | 2 +- uniffi_meta/src/reader.rs | 2 +- uniffi_meta/src/types.rs | 2 +- uniffi_udl/src/resolver.rs | 2 +- 62 files changed, 1011 insertions(+), 1315 deletions(-) delete mode 100644 fixtures/foreign-executor/Cargo.toml delete mode 100644 fixtures/foreign-executor/build.rs delete mode 100644 fixtures/foreign-executor/src/foreign_executor.udl delete mode 100644 fixtures/foreign-executor/src/lib.rs delete mode 100644 fixtures/foreign-executor/tests/bindings/test_foreign_executor.kts delete mode 100644 fixtures/foreign-executor/tests/bindings/test_foreign_executor.py delete mode 100644 fixtures/foreign-executor/tests/bindings/test_foreign_executor.swift delete mode 100644 fixtures/foreign-executor/tests/test_generated_bindings.rs rename uniffi_bindgen/src/bindings/{python/gen_python/executor.rs => kotlin/gen_kotlin/blocking_task_queue.rs} (61%) create mode 100644 uniffi_bindgen/src/bindings/kotlin/templates/BlockingTaskQueueTemplate.kt delete mode 100644 uniffi_bindgen/src/bindings/kotlin/templates/ForeignExecutorTemplate.kt rename uniffi_bindgen/src/bindings/{kotlin/gen_kotlin/executor.rs => python/gen_python/blocking_task_queue.rs} (51%) create mode 100644 uniffi_bindgen/src/bindings/python/templates/BlockingTaskQueueTemplate.py delete mode 100644 uniffi_bindgen/src/bindings/python/templates/ForeignExecutorTemplate.py rename uniffi_bindgen/src/bindings/swift/gen_swift/{executor.rs => blocking_task_queue.rs} (51%) create mode 100644 uniffi_bindgen/src/bindings/swift/templates/BlockingTaskQueueTemplate.swift delete mode 100644 uniffi_bindgen/src/bindings/swift/templates/ForeignExecutorTemplate.swift delete mode 100644 uniffi_core/src/ffi/foreignexecutor.rs diff --git a/Cargo.lock b/Cargo.lock index a3a018f69b..9ed8ce0b0e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -612,26 +612,53 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0845fa252299212f0389d64ba26f34fa32cfe41588355f21ed507c59a0f64541" +[[package]] +name = "futures" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" dependencies = [ "futures-core", + "futures-sink", ] [[package]] name = "futures-core" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" +checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" + +[[package]] +name = "futures-executor" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] [[package]] name = "futures-io" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa" [[package]] name = "futures-lite" @@ -648,6 +675,47 @@ dependencies = [ "waker-fn", ] +[[package]] +name = "futures-macro" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" + +[[package]] +name = "futures-task" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" + +[[package]] +name = "futures-util" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + [[package]] name = "generator" version = "0.7.5" @@ -1737,17 +1805,11 @@ dependencies = [ "url", ] -[[package]] -name = "uniffi-fixture-foreign-executor" -version = "0.23.0" -dependencies = [ - "uniffi", -] - [[package]] name = "uniffi-fixture-futures" version = "0.21.0" dependencies = [ + "futures", "once_cell", "thiserror", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 4d919af4a0..c6e51a03bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,7 +31,6 @@ members = [ "fixtures/ext-types/lib", "fixtures/ext-types/proc-macro-lib", - "fixtures/foreign-executor", "fixtures/keywords/kotlin", "fixtures/keywords/rust", "fixtures/keywords/swift", diff --git a/docs/manual/src/futures.md b/docs/manual/src/futures.md index ba93bae87a..42fec50690 100644 --- a/docs/manual/src/futures.md +++ b/docs/manual/src/futures.md @@ -46,4 +46,62 @@ In Rust `Future` terminology this means the foreign bindings supply the "executo There are [some great API docs](https://docs.rs/uniffi_core/latest/uniffi_core/ffi/rustfuture/index.html) on the implementation that are well worth a read. -See the [foreign-executor fixture](https://github.com/mozilla/uniffi-rs/tree/main/fixtures/foreign-executor) for more implementation details. +## Blocking tasks + +Rust executors are designed around an assumption that the `Future::poll` function will return quickly. +This assumption, combined with cooperative scheduling, allows for a large number of futures to be handled by a small number of threads. +Foreign executors make similar assumptions and sometimes more extreme ones. +For example, the Python eventloop is single threaded -- if any task spends a long time between `await` points, then it will block all other tasks from progressing. + +This raises the question of how async code can interact with blocking code. +"blocking" here means code that preforms blocking IO, long-running computations without `await` breaks, etc. +To support this, UniFFI defines the `BlockingTaskQueue` type, which is a foreign object that schedules work on a thread where it's okay to block. + +On Rust, `BlockingTaskQueue` is a UniFFI type that can safely run blocking code. +It's `run_blocking` method works like tokio's [block_in_place](https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html) function. +It inputs a closure and runs it in the `BlockingTaskQueue`. +This closure can reference the outside scope (it does not need to be `'static`). +For example: + +```rust +#[derive(uniffi::Object)] +struct DataStore { + // Used to run blocking tasks + queue: uniffi::BlockingTaskQueue, + // Low-level DB object with blocking methods + db: Mutex, +} + +#[uniffi::export] +impl DataStore { + #[uniffi::constructor] + fn new(queue: uniffi::BlockingTaskQueue) -> Self { + Self { + queue, + db: Mutex::new(Database::new()) + } + } + + fn fetch_all_items(&self) -> Vec { + self.queue.run_blocking(|| self.db.lock().fetch_all_items()) + } +} +``` + +On the foreign side `BlockingTaskQueue` corresponds to a language-dependent class. + +### Kotlin +Kotlin uses `CoroutineContext` for its `BlockingTaskQueue`. +Any `CoroutineContext` will work, but `Dispatchers.IO` is usually a good choice. +A DataStore from the example above can be created with `DataStore(Dispatchers.IO)`. + +### Swift +Swift uses `DispatchQueue` for its `BlockingTaskQueue`. +The `DispatchQueue` should be concurrent for all in almost all circumstances -- the user-initiated global queue is normally a good choice. +A DataStore from the example above can be created with `DataStore(queue: DispatchQueue.global(qos: .userInitiated)`. + +### Python + +Python uses a `futures.Executor` for its `BlockingTaskQueue`. +`ThreadPoolExecutor` is typically a good choice. +A DataStore from the example above can be created with `DataStore(ThreadPoolExecutor())`. diff --git a/fixtures/foreign-executor/Cargo.toml b/fixtures/foreign-executor/Cargo.toml deleted file mode 100644 index 57d7f4fd2b..0000000000 --- a/fixtures/foreign-executor/Cargo.toml +++ /dev/null @@ -1,19 +0,0 @@ -[package] -name = "uniffi-fixture-foreign-executor" -version = "0.23.0" -edition = "2021" -license = "MPL-2.0" -publish = false - -[lib] -crate-type = ["lib", "cdylib"] -name = "uniffi_fixture_foreign_executor" - -[dependencies] -uniffi = { path = "../../uniffi", version = "0.25" } - -[build-dependencies] -uniffi = { path = "../../uniffi", version = "0.25", features = ["build"] } - -[dev-dependencies] -uniffi = { path = "../../uniffi", version = "0.25", features = ["bindgen-tests"] } diff --git a/fixtures/foreign-executor/build.rs b/fixtures/foreign-executor/build.rs deleted file mode 100644 index 908761bdea..0000000000 --- a/fixtures/foreign-executor/build.rs +++ /dev/null @@ -1,7 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -fn main() { - uniffi::generate_scaffolding("src/foreign_executor.udl").unwrap(); -} diff --git a/fixtures/foreign-executor/src/foreign_executor.udl b/fixtures/foreign-executor/src/foreign_executor.udl deleted file mode 100644 index 2933e56c6a..0000000000 --- a/fixtures/foreign-executor/src/foreign_executor.udl +++ /dev/null @@ -1,7 +0,0 @@ -namespace fixture_foreign_executor { }; - -interface ForeignExecutorTester { - constructor(ForeignExecutor executor); - [Name=new_from_sequence] - constructor(sequence executors); -}; diff --git a/fixtures/foreign-executor/src/lib.rs b/fixtures/foreign-executor/src/lib.rs deleted file mode 100644 index 20d788addf..0000000000 --- a/fixtures/foreign-executor/src/lib.rs +++ /dev/null @@ -1,71 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -use std::sync::{Arc, Mutex}; -use std::thread; -use std::time::Instant; -use uniffi::ForeignExecutor; - -pub struct ForeignExecutorTester { - executor: ForeignExecutor, - last_result: Arc>>, -} - -// All constructor have to be defined in UDL for now -impl ForeignExecutorTester { - fn new(executor: ForeignExecutor) -> Self { - Self { - executor, - last_result: Arc::new(Mutex::new(None)), - } - } - - // Test inputting the ForeignExecutor from a Vec. This tests that they can be written to a - // `RustBuffer` - fn new_from_sequence(executors: Vec) -> Self { - assert_eq!(executors.len(), 1); - Self::new(executors.into_iter().next().unwrap()) - } -} - -#[uniffi::export] -impl ForeignExecutorTester { - /// Schedule a fire-and-forget task to run the test - fn schedule_test(&self, delay: u32) { - let last_result = self.last_result.clone(); - *last_result.lock().unwrap() = None; - // Start a thread to schedule the call. This tests if the foreign bindings can handle - // schedule callbacks from a thread that they don't manage. - thread::scope(move |scope| { - scope.spawn(move || { - let start_time = Instant::now(); - let initial_thread_id = thread::current().id(); - // Schedule a call with the foreign executor - self.executor.schedule(delay, move || { - // Return data on when/where the call happened. We check that this matches the - // expectations in the foreign bindings tests - let call_happened_in_different_thread = - thread::current().id() != initial_thread_id; - let delay_ms = start_time.elapsed().as_millis() as u32; - *last_result.lock().unwrap() = Some(TestResult { - call_happened_in_different_thread, - delay_ms, - }); - }); - }); - }); - } - - fn get_last_result(&self) -> Option { - self.last_result.lock().unwrap().take() - } -} - -#[derive(uniffi::Record)] -pub struct TestResult { - pub call_happened_in_different_thread: bool, - pub delay_ms: u32, -} - -uniffi::include_scaffolding!("foreign_executor"); diff --git a/fixtures/foreign-executor/tests/bindings/test_foreign_executor.kts b/fixtures/foreign-executor/tests/bindings/test_foreign_executor.kts deleted file mode 100644 index 404c06a463..0000000000 --- a/fixtures/foreign-executor/tests/bindings/test_foreign_executor.kts +++ /dev/null @@ -1,39 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -import uniffi.fixture_foreign_executor.* -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking - - -val coroutineScope = CoroutineScope(Dispatchers.IO) -// Test scheduling calls with no delay -runBlocking { - val tester = ForeignExecutorTester(coroutineScope) - launch { - tester.scheduleTest(0U) - } - delay(100L) - val result = tester.getLastResult() ?: throw RuntimeException("ForeignExecutorTester.getLastResult() returned null") - assert(result.callHappenedInDifferentThread) - assert(result.delayMs <= 100U) - tester.close() -} - -// Test scheduling calls with a delay and using the newFromSequence constructor -runBlocking { - val tester = ForeignExecutorTester.newFromSequence(listOf(coroutineScope)) - launch { - tester.scheduleTest(100U) - } - delay(200L) - val result = tester.getLastResult() ?: throw RuntimeException("ForeignExecutorTester.getLastResult() returned null") - assert(result.callHappenedInDifferentThread) - assert(result.delayMs >= 100U) - assert(result.delayMs <= 200U) - tester.close() -} diff --git a/fixtures/foreign-executor/tests/bindings/test_foreign_executor.py b/fixtures/foreign-executor/tests/bindings/test_foreign_executor.py deleted file mode 100644 index fdd06be520..0000000000 --- a/fixtures/foreign-executor/tests/bindings/test_foreign_executor.py +++ /dev/null @@ -1,50 +0,0 @@ -# This Source Code Form is subject to the terms of the Mozilla Public -# License, v. 2.0. If a copy of the MPL was not distributed with this -# file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -import asyncio -import unittest -import weakref -from fixture_foreign_executor import ForeignExecutorTester - -class TestForeignExecutor(unittest.TestCase): - def test_schedule(self): - async def run_test(constructor, delay): - if constructor == "primary": - tester = ForeignExecutorTester(asyncio.get_running_loop()) - elif constructor == "new_from_sequence": - tester = ForeignExecutorTester.new_from_sequence([asyncio.get_running_loop()]) - else: - raise AssertionError(f"Unknown constructor: {constructor}") - tester.schedule_test(delay) - await asyncio.sleep((delay / 1000) + 0.1) - return tester.get_last_result() - - # Test no delay and lifting the foreign executor directly - result = asyncio.run(run_test("primary", 0)) - self.assertTrue(result.call_happened_in_different_thread) - self.assertTrue(result.delay_ms <= 1) - - # Test no delay and reading the foreign executor from a list - result = asyncio.run(run_test("new_from_sequence", 10)) - self.assertTrue(result.call_happened_in_different_thread) - self.assertTrue(9 <= result.delay_ms <= 11) - - def test_reference_counts(self): - # Create an event loop - loop = asyncio.new_event_loop() - loop_ref = weakref.ref(loop) - # Create ForeignExecutorTester that stores the loop - tester = ForeignExecutorTester(loop) - tester2 = ForeignExecutorTester.new_from_sequence([loop]), - # Test that testers hold a reference to the loop. After deleting the loop, the weakref should still be alive - loop.close() - del loop - self.assertNotEqual(loop_ref(), None, "ForeignExecutor didn't take a reference to the event loop") - # Deleting testers should cause the loop to be destroyed - del tester - del tester2 - self.assertEqual(loop_ref(), None, "ForeignExecutor didn't release a reference to the event loop") - -if __name__=='__main__': - unittest.main() diff --git a/fixtures/foreign-executor/tests/bindings/test_foreign_executor.swift b/fixtures/foreign-executor/tests/bindings/test_foreign_executor.swift deleted file mode 100644 index 4a230d0f60..0000000000 --- a/fixtures/foreign-executor/tests/bindings/test_foreign_executor.swift +++ /dev/null @@ -1,42 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -import Foundation -import fixture_foreign_executor - -func runTest(tester: ForeignExecutorTester, delay: UInt32) async -> TestResult { - let handle = Task { () -> TestResult in - tester.scheduleTest(delay: delay) - try! await Task.sleep(nanoseconds: numericCast((delay + 10) * 1000000)) - return tester.getLastResult()! - } - return await handle.value -} - -Task { - // Test scheduling with no delay - let result = await runTest( - tester: ForeignExecutorTester( - executor: UniFfiForeignExecutor(priority: TaskPriority.background) - ), - delay: 0 - ) - assert(result.callHappenedInDifferentThread) - assert(result.delayMs <= 1) - - // Test scheduling with delay and an executor created from a list - let result2 = await runTest( - tester: ForeignExecutorTester.newFromSequence( - executors: [UniFfiForeignExecutor(priority: TaskPriority.background)] - ), - delay: 1000 - ) - assert(result2.callHappenedInDifferentThread) - assert(result2.delayMs >= 90) - assert(result2.delayMs <= 110) -} - - - -// No need to test reference counting, since `UniFfiForeignExecutor` on Swift is just a value type diff --git a/fixtures/foreign-executor/tests/test_generated_bindings.rs b/fixtures/foreign-executor/tests/test_generated_bindings.rs deleted file mode 100644 index 5ff08c627a..0000000000 --- a/fixtures/foreign-executor/tests/test_generated_bindings.rs +++ /dev/null @@ -1,7 +0,0 @@ -uniffi::build_foreign_language_testcases!( - "tests/bindings/test_foreign_executor.py", - "tests/bindings/test_foreign_executor.kts", - // Disabled because of intermittent CI failures - // (https://github.com/mozilla/uniffi-rs/issues/1536) - // "tests/bindings/test_foreign_executor.swift", -); diff --git a/fixtures/futures/Cargo.toml b/fixtures/futures/Cargo.toml index 78b08cb689..67c850e9f0 100644 --- a/fixtures/futures/Cargo.toml +++ b/fixtures/futures/Cargo.toml @@ -16,6 +16,7 @@ path = "src/bin.rs" [dependencies] uniffi = { path = "../../uniffi", version = "0.25", features = ["tokio", "cli"] } +futures = "0.3.29" thiserror = "1.0" tokio = { version = "1.24.1", features = ["time", "sync"] } once_cell = "1.18.0" diff --git a/fixtures/futures/src/lib.rs b/fixtures/futures/src/lib.rs index 39a521495e..b3be27f551 100644 --- a/fixtures/futures/src/lib.rs +++ b/fixtures/futures/src/lib.rs @@ -11,6 +11,8 @@ use std::{ time::Duration, }; +use futures::stream::{FuturesUnordered, StreamExt}; + /// Non-blocking timer future. pub struct TimerFuture { shared_state: Arc>, @@ -326,4 +328,53 @@ pub async fn use_shared_resource(options: SharedResourceOptions) -> Result<(), A Ok(()) } +/// Async function that uses a blocking task queue to do its work +#[uniffi::export] +pub async fn calc_square(queue: uniffi::BlockingTaskQueue, value: i32) -> i32 { + queue.run_blocking(|| value * value).await +} + +/// Same as before, but this one runs multiple tasks +#[uniffi::export] +pub async fn calc_squares(queue: uniffi::BlockingTaskQueue, items: Vec) -> Vec { + // Use `FuturesUnordered` to test our blocking task queue code which is known to be a tricky API to work with. + // In particular, if we don't notify the waker then FuturesUnordered will not poll again. + let mut futures: FuturesUnordered<_> = (0..items.len()) + .into_iter() + .map(|i| { + // Test that we can use references from the surrounding scope + let items = &items; + queue.run_blocking(move || items[i] * items[i]) + }) + .collect(); + let mut results = vec![]; + while let Some(result) = futures.next().await { + results.push(result); + } + results.sort(); + results +} + +/// ...and this one uses multiple BlockingTaskQueues +#[uniffi::export] +pub async fn calc_squares_multi_queue( + queues: Vec, + items: Vec, +) -> Vec { + let mut futures: FuturesUnordered<_> = (0..items.len()) + .into_iter() + .map(|i| { + // Test that we can use references from the surrounding scope + let items = &items; + queues[i].run_blocking(move || items[i] * items[i]) + }) + .collect(); + let mut results = vec![]; + while let Some(result) = futures.next().await { + results.push(result); + } + results.sort(); + results +} + uniffi::include_scaffolding!("futures"); diff --git a/fixtures/futures/tests/bindings/test_futures.kts b/fixtures/futures/tests/bindings/test_futures.kts index 810bb40f41..61538b720c 100644 --- a/fixtures/futures/tests/bindings/test_futures.kts +++ b/fixtures/futures/tests/bindings/test_futures.kts @@ -1,9 +1,22 @@ import uniffi.fixture.futures.* +import java.util.concurrent.Executors import kotlinx.coroutines.* import kotlin.system.* +fun runAsyncTest(test: suspend CoroutineScope.() -> Unit) { + val initialBlockingTaskQueueHandleCount = uniffiBlockingTaskQueueHandleCount() + val initialPollHandleCount = uniffiPollHandleCount() + val time = runBlocking { + measureTimeMillis { + test() + } + } + assert(uniffiBlockingTaskQueueHandleCount() == initialBlockingTaskQueueHandleCount) + assert(uniffiPollHandleCount() == initialPollHandleCount) +} + // init UniFFI to get good measurements after that -runBlocking { +runAsyncTest { val time = measureTimeMillis { alwaysReady() } @@ -24,7 +37,7 @@ fun assertApproximateTime(actualTime: Long, expectedTime: Int, testName: String } // Test `always_ready`. -runBlocking { +runAsyncTest { val time = measureTimeMillis { val result = alwaysReady() @@ -35,7 +48,7 @@ runBlocking { } // Test `void`. -runBlocking { +runAsyncTest { val time = measureTimeMillis { val result = void() @@ -46,7 +59,7 @@ runBlocking { } // Test `sleep`. -runBlocking { +runAsyncTest { val time = measureTimeMillis { sleep(200U) } @@ -55,7 +68,7 @@ runBlocking { } // Test sequential futures. -runBlocking { +runAsyncTest { val time = measureTimeMillis { val resultAlice = sayAfter(100U, "Alice") val resultBob = sayAfter(200U, "Bob") @@ -68,7 +81,7 @@ runBlocking { } // Test concurrent futures. -runBlocking { +runAsyncTest { val time = measureTimeMillis { val resultAlice = async { sayAfter(100U, "Alice") } val resultBob = async { sayAfter(200U, "Bob") } @@ -81,7 +94,7 @@ runBlocking { } // Test async methods. -runBlocking { +runAsyncTest { val megaphone = newMegaphone() val time = measureTimeMillis { val resultAlice = megaphone.sayAfter(200U, "Alice") @@ -92,7 +105,7 @@ runBlocking { assertApproximateTime(time, 200, "async methods") } -runBlocking { +runAsyncTest { val megaphone = newMegaphone() val time = measureTimeMillis { val resultAlice = sayAfterWithMegaphone(megaphone, 200U, "Alice") @@ -104,7 +117,7 @@ runBlocking { } // Test async method returning optional object -runBlocking { +runAsyncTest { val megaphone = asyncMaybeNewMegaphone(true) assert(megaphone != null) @@ -113,7 +126,7 @@ runBlocking { } // Test with the Tokio runtime. -runBlocking { +runAsyncTest { val time = measureTimeMillis { val resultAlice = sayAfterWithTokio(200U, "Alice") @@ -124,7 +137,7 @@ runBlocking { } // Test fallible function/method. -runBlocking { +runAsyncTest { val time1 = measureTimeMillis { try { fallibleMe(false) @@ -189,7 +202,7 @@ runBlocking { } // Test record. -runBlocking { +runAsyncTest { val time = measureTimeMillis { val result = newMyRecord("foo", 42U) @@ -203,7 +216,7 @@ runBlocking { } // Test a broken sleep. -runBlocking { +runAsyncTest { val time = measureTimeMillis { brokenSleep(100U, 0U) // calls the waker twice immediately sleep(100U) // wait for possible failure @@ -217,7 +230,7 @@ runBlocking { // Test a future that uses a lock and that is cancelled. -runBlocking { +runAsyncTest { val time = measureTimeMillis { val job = launch { useSharedResource(SharedResourceOptions(releaseAfterMs=5000U, timeoutMs=100U)) @@ -236,7 +249,7 @@ runBlocking { } // Test a future that uses a lock and that is not cancelled. -runBlocking { +runAsyncTest { val time = measureTimeMillis { useSharedResource(SharedResourceOptions(releaseAfterMs=100U, timeoutMs=1000U)) @@ -244,3 +257,26 @@ runBlocking { } println("useSharedResource (not canceled): ${time}ms") } + +// Test blocking task queues +runAsyncTest { + withTimeout(1000) { + assert(calcSquare(Dispatchers.IO, 20) == 400) + } + + withTimeout(1000) { + assert(calcSquares(Dispatchers.IO, listOf(1, -2, 3)) == listOf(1, 4, 9)) + } + + val executors = listOf( + Executors.newSingleThreadExecutor(), + Executors.newSingleThreadExecutor(), + Executors.newSingleThreadExecutor(), + ) + withTimeout(1000) { + assert(calcSquaresMultiQueue(executors.map { it.asCoroutineDispatcher() }, listOf(1, -2, 3)) == listOf(1, 4, 9)) + } + for (executor in executors) { + executor.shutdown() + } +} diff --git a/fixtures/futures/tests/bindings/test_futures.py b/fixtures/futures/tests/bindings/test_futures.py index bfbeba86f8..3c5f0cec48 100644 --- a/fixtures/futures/tests/bindings/test_futures.py +++ b/fixtures/futures/tests/bindings/test_futures.py @@ -1,25 +1,31 @@ +import futures from futures import * +import contextlib import unittest from datetime import datetime import asyncio +from concurrent.futures import ThreadPoolExecutor def now(): return datetime.now() class TestFutures(unittest.TestCase): def test_always_ready(self): + @self.check_handle_counts() async def test(): self.assertEqual(await always_ready(), True) asyncio.run(test()) def test_void(self): + @self.check_handle_counts() async def test(): self.assertEqual(await void(), None) asyncio.run(test()) def test_sleep(self): + @self.check_handle_counts() async def test(): t0 = now() await sleep(2000) @@ -31,6 +37,7 @@ async def test(): asyncio.run(test()) def test_sequential_futures(self): + @self.check_handle_counts() async def test(): t0 = now() result_alice = await say_after(100, 'Alice') @@ -45,6 +52,7 @@ async def test(): asyncio.run(test()) def test_concurrent_tasks(self): + @self.check_handle_counts() async def test(): alice = asyncio.create_task(say_after(100, 'Alice')) bob = asyncio.create_task(say_after(200, 'Bob')) @@ -62,6 +70,7 @@ async def test(): asyncio.run(test()) def test_async_methods(self): + @self.check_handle_counts() async def test(): megaphone = new_megaphone() t0 = now() @@ -75,6 +84,7 @@ async def test(): asyncio.run(test()) def test_async_object_param(self): + @self.check_handle_counts() async def test(): megaphone = new_megaphone() t0 = now() @@ -88,6 +98,7 @@ async def test(): asyncio.run(test()) def test_with_tokio_runtime(self): + @self.check_handle_counts() async def test(): t0 = now() result_alice = await say_after_with_tokio(200, 'Alice') @@ -100,6 +111,7 @@ async def test(): asyncio.run(test()) def test_fallible(self): + @self.check_handle_counts() async def test(): result = await fallible_me(False) self.assertEqual(result, 42) @@ -124,6 +136,7 @@ async def test(): asyncio.run(test()) def test_fallible_struct(self): + @self.check_handle_counts() async def test(): megaphone = await fallible_struct(False) self.assertEqual(await megaphone.fallible_me(False), 42) @@ -137,6 +150,7 @@ async def test(): asyncio.run(test()) def test_record(self): + @self.check_handle_counts() async def test(): result = await new_my_record("foo", 42) self.assertEqual(result.__class__, MyRecord) @@ -146,6 +160,7 @@ async def test(): asyncio.run(test()) def test_cancel(self): + @self.check_handle_counts() async def test(): # Create a task task = asyncio.create_task(say_after(200, 'Alice')) @@ -163,6 +178,7 @@ async def test(): # Test a future that uses a lock and that is cancelled. def test_shared_resource_cancellation(self): + @self.check_handle_counts() async def test(): task = asyncio.create_task(use_shared_resource( SharedResourceOptions(release_after_ms=5000, timeout_ms=100))) @@ -173,10 +189,49 @@ async def test(): asyncio.run(test()) def test_shared_resource_no_cancellation(self): + @self.check_handle_counts() async def test(): await use_shared_resource(SharedResourceOptions(release_after_ms=100, timeout_ms=1000)) await use_shared_resource(SharedResourceOptions(release_after_ms=0, timeout_ms=1000)) asyncio.run(test()) + # blocking task queue tests + + def test_calc_square(self): + @self.check_handle_counts() + async def test(): + async with asyncio.timeout(1): + executor = ThreadPoolExecutor() + self.assertEqual(await calc_square(executor, 20), 400) + asyncio.run(test()) + + def test_calc_squares(self): + @self.check_handle_counts() + async def test(): + async with asyncio.timeout(1): + executor = ThreadPoolExecutor() + self.assertEqual(await calc_squares(executor, [1, -2, 3]), [1, 4, 9]) + asyncio.run(test()) + + def test_calc_squares_multi_queue(self): + @self.check_handle_counts() + async def test(): + async with asyncio.timeout(1): + executors = [ + ThreadPoolExecutor(), + ThreadPoolExecutor(), + ThreadPoolExecutor(), + ] + self.assertEqual(await calc_squares_multi_queue(executors, [1, -2, 3]), [1, 4, 9]) + asyncio.run(test()) + + @contextlib.asynccontextmanager + async def check_handle_counts(self): + initial_poll_handle_count = len(futures.UNIFFI_POLL_HANDLE_MAP) + initial_blocking_task_queue_handle_count = len(futures.UNIFFI_BLOCKING_TASK_QUEUE_HANDLE_MAP) + yield + self.assertEqual(len(futures.UNIFFI_POLL_HANDLE_MAP), initial_poll_handle_count) + self.assertEqual(len(futures.UNIFFI_BLOCKING_TASK_QUEUE_HANDLE_MAP), initial_blocking_task_queue_handle_count) + if __name__ == '__main__': unittest.main() diff --git a/fixtures/futures/tests/bindings/test_futures.swift b/fixtures/futures/tests/bindings/test_futures.swift index 20e24c40ff..54bc3aea28 100644 --- a/fixtures/futures/tests/bindings/test_futures.swift +++ b/fixtures/futures/tests/bindings/test_futures.swift @@ -3,10 +3,21 @@ import Foundation // To get `DispatchGroup` and `Date` types. var counter = DispatchGroup() -// Test `alwaysReady` -counter.enter() +func asyncTest(test: @escaping () async throws -> ()) { + let initialBlockingTaskQueueCount = uniffiBlockingTaskQueueHandleCount() + let initialPollCount = uniffiPollHandleCount() + counter.enter() + Task { + try! await test() + counter.leave() + } + counter.wait() + assert(uniffiBlockingTaskQueueHandleCount() == initialBlockingTaskQueueCount) + assert(uniffiPollHandleCount() == initialPollCount) +} -Task { +// Test `alwaysReady` +asyncTest { let t0 = Date() let result = await alwaysReady() let t1 = Date() @@ -14,40 +25,28 @@ Task { let tDelta = DateInterval(start: t0, end: t1) assert(tDelta.duration < 0.1) assert(result == true) - - counter.leave() } // Test record. -counter.enter() - -Task { +asyncTest { let result = await newMyRecord(a: "foo", b: 42) assert(result.a == "foo") assert(result.b == 42) - - counter.leave() } // Test `void` -counter.enter() - -Task { +asyncTest { let t0 = Date() await void() let t1 = Date() let tDelta = DateInterval(start: t0, end: t1) assert(tDelta.duration < 0.1) - - counter.leave() } // Test `Sleep` -counter.enter() - -Task { +asyncTest { let t0 = Date() let result = await sleep(ms: 2000) let t1 = Date() @@ -55,14 +54,10 @@ Task { let tDelta = DateInterval(start: t0, end: t1) assert(tDelta.duration > 2 && tDelta.duration < 2.1) assert(result == true) - - counter.leave() } // Test sequential futures. -counter.enter() - -Task { +asyncTest { let t0 = Date() let result_alice = await sayAfter(ms: 1000, who: "Alice") let result_bob = await sayAfter(ms: 2000, who: "Bob") @@ -72,14 +67,10 @@ Task { assert(tDelta.duration > 3 && tDelta.duration < 3.1) assert(result_alice == "Hello, Alice!") assert(result_bob == "Hello, Bob!") - - counter.leave() } // Test concurrent futures. -counter.enter() - -Task { +asyncTest { async let alice = sayAfter(ms: 1000, who: "Alice") async let bob = sayAfter(ms: 2000, who: "Bob") @@ -91,14 +82,10 @@ Task { assert(tDelta.duration > 2 && tDelta.duration < 2.1) assert(result_alice == "Hello, Alice!") assert(result_bob == "Hello, Bob!") - - counter.leave() } // Test async methods -counter.enter() - -Task { +asyncTest { let megaphone = newMegaphone() let t0 = Date() @@ -108,26 +95,18 @@ Task { let tDelta = DateInterval(start: t0, end: t1) assert(tDelta.duration > 2 && tDelta.duration < 2.1) assert(result_alice == "HELLO, ALICE!") - - counter.leave() } // Test async function returning an object -counter.enter() - -Task { +asyncTest { let megaphone = await asyncNewMegaphone() let result = try await megaphone.fallibleMe(doFail: false) assert(result == 42) - - counter.leave() } // Test with the Tokio runtime. -counter.enter() - -Task { +asyncTest { let t0 = Date() let result_alice = await sayAfterWithTokio(ms: 2000, who: "Alice") let t1 = Date() @@ -135,15 +114,11 @@ Task { let tDelta = DateInterval(start: t0, end: t1) assert(tDelta.duration > 2 && tDelta.duration < 2.1) assert(result_alice == "Hello, Alice (with Tokio)!") - - counter.leave() } // Test fallible function/method… // … which doesn't throw. -counter.enter() - -Task { +asyncTest { let t0 = Date() let result = try await fallibleMe(doFail: false) let t1 = Date() @@ -151,19 +126,15 @@ Task { let tDelta = DateInterval(start: t0, end: t1) assert(tDelta.duration > 0 && tDelta.duration < 0.1) assert(result == 42) - - counter.leave() } -Task { +asyncTest { let m = try await fallibleStruct(doFail: false) let result = try await m.fallibleMe(doFail: false) assert(result == 42) } -counter.enter() - -Task { +asyncTest { let megaphone = newMegaphone() let t0 = Date() @@ -173,14 +144,10 @@ Task { let tDelta = DateInterval(start: t0, end: t1) assert(tDelta.duration > 0 && tDelta.duration < 0.1) assert(result == 42) - - counter.leave() } // … which does throw. -counter.enter() - -Task { +asyncTest { let t0 = Date() do { @@ -195,11 +162,9 @@ Task { let tDelta = DateInterval(start: t0, end: t1) assert(tDelta.duration > 0 && tDelta.duration < 0.1) - - counter.leave() } -Task { +asyncTest { do { let _ = try await fallibleStruct(doFail: true) } catch MyError.Foo { @@ -209,9 +174,7 @@ Task { } } -counter.enter() - -Task { +asyncTest { let megaphone = newMegaphone() let t0 = Date() @@ -228,13 +191,10 @@ Task { let tDelta = DateInterval(start: t0, end: t1) assert(tDelta.duration > 0 && tDelta.duration < 0.1) - - counter.leave() } // Test a future that uses a lock and that is cancelled. -counter.enter() -Task { +asyncTest { let task = Task { try! await useSharedResource(options: SharedResourceOptions(releaseAfterMs: 100, timeoutMs: 1000)) } @@ -250,15 +210,30 @@ Task { // Try accessing the shared resource again. The initial task should release the shared resource // before the timeout expires. try! await useSharedResource(options: SharedResourceOptions(releaseAfterMs: 0, timeoutMs: 1000)) - counter.leave() } // Test a future that uses a lock and that is not cancelled. -counter.enter() -Task { +asyncTest { try! await useSharedResource(options: SharedResourceOptions(releaseAfterMs: 100, timeoutMs: 1000)) try! await useSharedResource(options: SharedResourceOptions(releaseAfterMs: 0, timeoutMs: 1000)) - counter.leave() } -counter.wait() +// Test blocking task queues +asyncTest { + let calcSquareResult = await calcSquare(queue: DispatchQueue.global(qos: .userInitiated), value: 20) + assert(calcSquareResult == 400) + + let calcSquaresResult = await calcSquares(queue: DispatchQueue.global(qos: .userInitiated), items: [1, -2, 3]) + assert(calcSquaresResult == [1, 4, 9]) + + let calcSquaresMultiQueueResult = await calcSquaresMultiQueue( + queues: [ + DispatchQueue(label: "test-queue1", attributes: DispatchQueue.Attributes.concurrent), + DispatchQueue(label: "test-queue2", attributes: DispatchQueue.Attributes.concurrent), + DispatchQueue(label: "test-queue3", attributes: DispatchQueue.Attributes.concurrent) + ], + items: [1, -2, 3] + ) + assert(calcSquaresMultiQueueResult == [1, 4, 9]) +} + diff --git a/fixtures/metadata/src/tests.rs b/fixtures/metadata/src/tests.rs index eac852cfea..5fd35be02c 100644 --- a/fixtures/metadata/src/tests.rs +++ b/fixtures/metadata/src/tests.rs @@ -114,7 +114,7 @@ mod test_type_ids { check_type_id::(Type::Float64); check_type_id::(Type::Boolean); check_type_id::(Type::String); - check_type_id::(Type::ForeignExecutor); + check_type_id::(Type::BlockingTaskQueue); } #[test] diff --git a/uniffi_bindgen/src/bindings/python/gen_python/executor.rs b/uniffi_bindgen/src/bindings/kotlin/gen_kotlin/blocking_task_queue.rs similarity index 61% rename from uniffi_bindgen/src/bindings/python/gen_python/executor.rs rename to uniffi_bindgen/src/bindings/kotlin/gen_kotlin/blocking_task_queue.rs index 2834d9e2ac..cec719b4b5 100644 --- a/uniffi_bindgen/src/bindings/python/gen_python/executor.rs +++ b/uniffi_bindgen/src/bindings/kotlin/gen_kotlin/blocking_task_queue.rs @@ -5,14 +5,15 @@ use crate::backend::CodeType; #[derive(Debug)] -pub struct ForeignExecutorCodeType; +pub struct BlockingTaskQueueCodeType; -impl CodeType for ForeignExecutorCodeType { +impl CodeType for BlockingTaskQueueCodeType { fn type_label(&self) -> String { - "asyncio.BaseEventLoop".into() + // Kotlin uses CoroutineContext for BlockingTaskQueue + "CoroutineContext".into() } fn canonical_name(&self) -> String { - "ForeignExecutor".into() + "BlockingTaskQueue".into() } } diff --git a/uniffi_bindgen/src/bindings/kotlin/gen_kotlin/mod.rs b/uniffi_bindgen/src/bindings/kotlin/gen_kotlin/mod.rs index 7cd8867d14..11aed97137 100644 --- a/uniffi_bindgen/src/bindings/kotlin/gen_kotlin/mod.rs +++ b/uniffi_bindgen/src/bindings/kotlin/gen_kotlin/mod.rs @@ -15,12 +15,12 @@ use crate::backend::{CodeType, TemplateExpression}; use crate::interface::*; use crate::BindingsConfig; +mod blocking_task_queue; mod callback_interface; mod compounds; mod custom; mod enum_; mod error; -mod executor; mod external; mod miscellany; mod object; @@ -298,7 +298,6 @@ impl KotlinCodeOracle { } FfiType::ForeignBytes => "ForeignBytes.ByValue".to_string(), FfiType::ForeignCallback => "ForeignCallback".to_string(), - FfiType::ForeignExecutorCallback => "UniFfiForeignExecutorCallback".to_string(), FfiType::RustFutureContinuationCallback => { "UniFffiRustFutureContinuationCallbackType".to_string() } @@ -364,7 +363,7 @@ impl AsCodeType for T { Type::CallbackInterface { name, .. } => { Box::new(callback_interface::CallbackInterfaceCodeType::new(name)) } - Type::ForeignExecutor => Box::new(executor::ForeignExecutorCodeType), + Type::BlockingTaskQueue => Box::new(blocking_task_queue::BlockingTaskQueueCodeType), Type::Optional { inner_type } => { Box::new(compounds::OptionalCodeType::new(*inner_type)) } @@ -542,7 +541,7 @@ pub mod filters { ) -> Result { let ffi_func = callable.ffi_rust_future_poll(ci); Ok(format!( - "{{ future, callback, continuation -> _UniFFILib.INSTANCE.{ffi_func}(future, callback, continuation) }}" + "{{ future, callback, continuation, blockingTaskQueue -> _UniFFILib.INSTANCE.{ffi_func}(future, callback, continuation, blockingTaskQueue) }}" )) } diff --git a/uniffi_bindgen/src/bindings/kotlin/templates/Async.kt b/uniffi_bindgen/src/bindings/kotlin/templates/Async.kt index f2ee0ba0d5..80ad63585a 100644 --- a/uniffi_bindgen/src/bindings/kotlin/templates/Async.kt +++ b/uniffi_bindgen/src/bindings/kotlin/templates/Async.kt @@ -1,20 +1,60 @@ // Async return type handlers -internal const val UNIFFI_RUST_FUTURE_POLL_READY = 0.toShort() -internal const val UNIFFI_RUST_FUTURE_POLL_MAYBE_READY = 1.toShort() +internal const val UNIFFI_RUST_FUTURE_POLL_READY = 0.toByte() +internal const val UNIFFI_RUST_FUTURE_POLL_MAYBE_READY = 1.toByte() -internal val uniffiContinuationHandleMap = UniffiHandleMap>() +/** + * In-progress poll of a RustFuture + * + * Lifecycle of UniffiPoll: + * * UniffiPoll is constructed with a Kotlin continuation that `uniffiRustCallAsync` awaits. + * * As part of the construction, a new handle is allocated for sending to `rust_future_poll`. + * * poll() is called, which calls the `rust_future_poll()` + * * When rust calls the continuation function we can either: + * * Consume the handle, complete the continuation, and cause `uniffiRustCallAsync` to continue + * * Run another poll in an CoroutineContext (AKA BlockingTaskQueue). + */ +internal class UniffiPoll( + val continuation: CancellableContinuation, + val rustFuture: UniffiHandle, + val pollFunc: (UniffiHandle, UniFffiRustFutureContinuationCallbackType, UniffiHandle, Long) -> Unit, +) { + val handle = uniffiPollHandleMap.newHandle(this) + + internal fun poll(BlockingTaskQueueHandle: UniffiHandle = 0) { + pollFunc(rustFuture, uniffiRustFutureContinuationCallback, handle, BlockingTaskQueueHandle) + } + + internal fun pollInContext(BlockingTaskQueueHandle: UniffiHandle) { + val coroutineContext = uniffiBlockingTaskQueueHandleMap.get(BlockingTaskQueueHandle) + CoroutineScope(coroutineContext).launch { + poll(BlockingTaskQueueHandle) + } + } + + internal fun completeContinuation(pollResult: Byte) { + continuation.resume(pollResult) + } +} + +internal val uniffiPollHandleMap = UniffiHandleMap() // FFI type for Rust future continuations internal object uniffiRustFutureContinuationCallback: UniFffiRustFutureContinuationCallbackType { - override fun callback(continuationHandle: UniffiHandle, pollResult: Short) { - uniffiContinuationHandleMap.consumeHandle(continuationHandle).resume(pollResult) + override fun callback(handle: UniffiHandle, pollResult: Byte, BlockingTaskQueueHandle: Long) { + if (pollResult == UNIFFI_RUST_FUTURE_POLL_READY || BlockingTaskQueueHandle == 0L) { + val poll = uniffiPollHandleMap.consumeHandle(handle) + poll.completeContinuation(pollResult) + } else { + val poll = uniffiPollHandleMap.get(handle) + poll.pollInContext(BlockingTaskQueueHandle) + } } } internal suspend fun uniffiRustCallAsync( rustFuture: UniffiHandle, - pollFunc: (UniffiHandle, UniFffiRustFutureContinuationCallbackType, UniffiHandle) -> Unit, + pollFunc: (UniffiHandle, UniFffiRustFutureContinuationCallbackType, UniffiHandle, Long) -> Unit, completeFunc: (UniffiHandle, RustCallStatus) -> F, freeFunc: (UniffiHandle) -> Unit, liftFunc: (F) -> T, @@ -22,12 +62,8 @@ internal suspend fun uniffiRustCallAsync( ): T { try { do { - val pollResult = suspendCancellableCoroutine { continuation -> - pollFunc( - rustFuture, - uniffiRustFutureContinuationCallback, - uniffiContinuationHandleMap.newHandle(continuation) - ) + val pollResult = suspendCancellableCoroutine { + UniffiPoll(it, rustFuture, pollFunc).poll() } } while (pollResult != UNIFFI_RUST_FUTURE_POLL_READY); @@ -39,3 +75,5 @@ internal suspend fun uniffiRustCallAsync( } } +// For testing +public fun uniffiPollHandleCount() = uniffiPollHandleMap.size diff --git a/uniffi_bindgen/src/bindings/kotlin/templates/BlockingTaskQueueTemplate.kt b/uniffi_bindgen/src/bindings/kotlin/templates/BlockingTaskQueueTemplate.kt new file mode 100644 index 0000000000..3541bb948c --- /dev/null +++ b/uniffi_bindgen/src/bindings/kotlin/templates/BlockingTaskQueueTemplate.kt @@ -0,0 +1,23 @@ +{{ self.add_import("kotlin.coroutines.CoroutineContext") }} + +internal val uniffiBlockingTaskQueueHandleMap = UniffiHandleMap() + +public object {{ ffi_converter_name }}: FfiConverter { + + override fun lower(value: CoroutineContext): UniffiHandle = uniffiBlockingTaskQueueHandleMap.newHandle(value) + + override fun lift(value: UniffiHandle): CoroutineContext = uniffiBlockingTaskQueueHandleMap.consumeHandle(value) + + override fun allocationSize(value: {{ type_name }}) = 8 + + override fun read(buf: ByteBuffer): CoroutineContext { + return lift(buf.getLong()) + } + + override fun write(value: CoroutineContext, buf: ByteBuffer) { + buf.putLong(lower(value)) + } +} + +// For testing +public fun uniffiBlockingTaskQueueHandleCount() = uniffiBlockingTaskQueueHandleMap.size diff --git a/uniffi_bindgen/src/bindings/kotlin/templates/ForeignExecutorTemplate.kt b/uniffi_bindgen/src/bindings/kotlin/templates/ForeignExecutorTemplate.kt deleted file mode 100644 index 90c84b9fc9..0000000000 --- a/uniffi_bindgen/src/bindings/kotlin/templates/ForeignExecutorTemplate.kt +++ /dev/null @@ -1,78 +0,0 @@ -{{ self.add_import("kotlinx.coroutines.CoroutineScope") }} -{{ self.add_import("kotlinx.coroutines.delay") }} -{{ self.add_import("kotlinx.coroutines.isActive") }} -{{ self.add_import("kotlinx.coroutines.launch") }} - -internal const val UNIFFI_RUST_TASK_CALLBACK_SUCCESS = 0.toByte() -internal const val UNIFFI_RUST_TASK_CALLBACK_CANCELLED = 1.toByte() -internal const val UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS = 0.toByte() -internal const val UNIFFI_FOREIGN_EXECUTOR_CALLBACK_CANCELLED = 1.toByte() -internal const val UNIFFI_FOREIGN_EXECUTOR_CALLBACK_ERROR = 2.toByte() - -// Callback function to execute a Rust task. The Kotlin code schedules these in a coroutine then -// invokes them. -internal interface UniFfiRustTaskCallback : com.sun.jna.Callback { - fun callback(rustTaskData: Pointer?, statusCode: Byte) -} - -internal object UniFfiForeignExecutorCallback : com.sun.jna.Callback { - fun callback(handle: UniffiHandle, delayMs: Int, rustTask: UniFfiRustTaskCallback?, rustTaskData: Pointer?) : Byte { - if (rustTask == null) { - FfiConverterForeignExecutor.drop(handle) - return UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS - } else { - val coroutineScope = FfiConverterForeignExecutor.lift(handle) - if (coroutineScope.isActive) { - val job = coroutineScope.launch { - if (delayMs > 0) { - delay(delayMs.toLong()) - } - rustTask.callback(rustTaskData, UNIFFI_RUST_TASK_CALLBACK_SUCCESS) - } - job.invokeOnCompletion { cause -> - if (cause != null) { - rustTask.callback(rustTaskData, UNIFFI_RUST_TASK_CALLBACK_CANCELLED) - } - } - return UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS - } else { - return UNIFFI_FOREIGN_EXECUTOR_CALLBACK_CANCELLED - } - } - } -} - -public object FfiConverterForeignExecutor: FfiConverter { - internal val handleMap = UniffiHandleMap() - - internal fun drop(handle: UniffiHandle) { - handleMap.consumeHandle(handle) - } - - internal fun register(lib: _UniFFILib) { - {%- match ci.ffi_foreign_executor_callback_set() %} - {%- when Some with (fn) %} - lib.{{ fn.name() }}(UniFfiForeignExecutorCallback) - {%- when None %} - {#- No foreign executor, we don't set anything #} - {% endmatch %} - } - - override fun allocationSize(value: CoroutineScope) = 8 - - override fun lift(value: UniffiHandle): CoroutineScope { - return handleMap.get(value) - } - - override fun read(buf: ByteBuffer): CoroutineScope { - return lift(buf.getLong()) - } - - override fun lower(value: CoroutineScope): UniffiHandle { - return handleMap.newHandle(value) - } - - override fun write(value: CoroutineScope, buf: ByteBuffer) { - buf.putLong(lower(value)) - } -} diff --git a/uniffi_bindgen/src/bindings/kotlin/templates/HandleMap.kt b/uniffi_bindgen/src/bindings/kotlin/templates/HandleMap.kt index 427d01f7ca..de323a2e74 100644 --- a/uniffi_bindgen/src/bindings/kotlin/templates/HandleMap.kt +++ b/uniffi_bindgen/src/bindings/kotlin/templates/HandleMap.kt @@ -39,6 +39,9 @@ internal class UniffiHandleMap { map.remove(key(handle)) ?: throw InternalException("Missing key in handlemap: was the handle used after being freed?") } + val size: Int + get() = map.size + companion object { // Generate map IDs that are likely to be unique private var mapIdCounter: Long = {{ ci.namespace_hash() }}.and(0x7FFF) diff --git a/uniffi_bindgen/src/bindings/kotlin/templates/Helpers.kt b/uniffi_bindgen/src/bindings/kotlin/templates/Helpers.kt index e91c8e4119..ff358c5a5a 100644 --- a/uniffi_bindgen/src/bindings/kotlin/templates/Helpers.kt +++ b/uniffi_bindgen/src/bindings/kotlin/templates/Helpers.kt @@ -80,5 +80,5 @@ private inline fun rustCall(callback: (RustCallStatus) -> U): U { // FFI type for Rust future continuations internal interface UniFffiRustFutureContinuationCallbackType : com.sun.jna.Callback { - fun callback(continuationHandle: Long, pollResult: Short); + fun callback(handle: UniffiHandle, pollResult: Byte, BlockingTaskQueueHandle: Long); } diff --git a/uniffi_bindgen/src/bindings/kotlin/templates/Types.kt b/uniffi_bindgen/src/bindings/kotlin/templates/Types.kt index 89546eac92..d693df249d 100644 --- a/uniffi_bindgen/src/bindings/kotlin/templates/Types.kt +++ b/uniffi_bindgen/src/bindings/kotlin/templates/Types.kt @@ -82,8 +82,8 @@ {%- when Type::CallbackInterface { module_path, name } %} {% include "CallbackInterfaceTemplate.kt" %} -{%- when Type::ForeignExecutor %} -{% include "ForeignExecutorTemplate.kt" %} +{%- when Type::BlockingTaskQueue %} +{% include "BlockingTaskQueueTemplate.kt" %} {%- when Type::Timestamp %} {% include "TimestampHelper.kt" %} @@ -104,6 +104,8 @@ {%- if ci.has_async_fns() %} {# Import types needed for async support #} {{ self.add_import("kotlin.coroutines.resume") }} +{{ self.add_import("kotlinx.coroutines.launch") }} {{ self.add_import("kotlinx.coroutines.suspendCancellableCoroutine") }} {{ self.add_import("kotlinx.coroutines.CancellableContinuation") }} +{{ self.add_import("kotlinx.coroutines.CoroutineScope") }} {%- endif %} diff --git a/uniffi_bindgen/src/bindings/kotlin/gen_kotlin/executor.rs b/uniffi_bindgen/src/bindings/python/gen_python/blocking_task_queue.rs similarity index 51% rename from uniffi_bindgen/src/bindings/kotlin/gen_kotlin/executor.rs rename to uniffi_bindgen/src/bindings/python/gen_python/blocking_task_queue.rs index e46058e7f1..3f23d618a0 100644 --- a/uniffi_bindgen/src/bindings/kotlin/gen_kotlin/executor.rs +++ b/uniffi_bindgen/src/bindings/python/gen_python/blocking_task_queue.rs @@ -5,19 +5,15 @@ use crate::backend::CodeType; #[derive(Debug)] -pub struct ForeignExecutorCodeType; +pub struct BlockingTaskQueueCodeType; -impl CodeType for ForeignExecutorCodeType { +impl CodeType for BlockingTaskQueueCodeType { + // On python we use an concurrent.futures.Executor for a BlockingTaskQueue fn type_label(&self) -> String { - // Kotlin uses a CoroutineScope for ForeignExecutor - "CoroutineScope".into() + "concurrent.futures.Executor".into() } fn canonical_name(&self) -> String { - "ForeignExecutor".into() - } - - fn initialization_fn(&self) -> Option { - Some("FfiConverterForeignExecutor.register".into()) + "BlockingTaskQueue".into() } } diff --git a/uniffi_bindgen/src/bindings/python/gen_python/mod.rs b/uniffi_bindgen/src/bindings/python/gen_python/mod.rs index 5e385e457b..edf2a2bdc5 100644 --- a/uniffi_bindgen/src/bindings/python/gen_python/mod.rs +++ b/uniffi_bindgen/src/bindings/python/gen_python/mod.rs @@ -15,11 +15,11 @@ use crate::backend::{CodeType, TemplateExpression}; use crate::interface::*; use crate::BindingsConfig; +mod blocking_task_queue; mod callback_interface; mod compounds; mod custom; mod enum_; -mod executor; mod external; mod miscellany; mod object; @@ -318,7 +318,6 @@ impl PythonCodeOracle { }, FfiType::ForeignBytes => "_UniffiForeignBytes".to_string(), FfiType::ForeignCallback => "_UNIFFI_FOREIGN_CALLBACK_T".to_string(), - FfiType::ForeignExecutorCallback => "_UNIFFI_FOREIGN_EXECUTOR_CALLBACK_T".to_string(), FfiType::RustFutureContinuationCallback => "_UNIFFI_FUTURE_CONTINUATION_T".to_string(), } } @@ -376,7 +375,7 @@ impl AsCodeType for T { Type::CallbackInterface { name, .. } => { Box::new(callback_interface::CallbackInterfaceCodeType::new(name)) } - Type::ForeignExecutor => Box::new(executor::ForeignExecutorCodeType), + Type::BlockingTaskQueue => Box::new(blocking_task_queue::BlockingTaskQueueCodeType), Type::Optional { inner_type } => { Box::new(compounds::OptionalCodeType::new(*inner_type)) } diff --git a/uniffi_bindgen/src/bindings/python/templates/Async.py b/uniffi_bindgen/src/bindings/python/templates/Async.py index f75bfcd821..68440aaf28 100644 --- a/uniffi_bindgen/src/bindings/python/templates/Async.py +++ b/uniffi_bindgen/src/bindings/python/templates/Async.py @@ -1,35 +1,70 @@ # RustFuturePoll values -_UNIFFI_RUST_FUTURE_POLL_READY = 0 -_UNIFFI_RUST_FUTURE_POLL_MAYBE_READY = 1 +UNIFFI_RUST_FUTURE_POLL_READY = 0 +UNIFFI_RUST_FUTURE_POLL_MAYBE_READY = 1 + +""" +In-progress poll of a RustFuture + +Lifecycle of UniffiPoll: + * UniffiPoll is constructed with a Python future that `_uniffi_rust_call_async` awaits + * As part of the construction, a new handle is allocated for sending to `rust_future_poll`. + * poll() is called, which calls the `rust_future_poll()` + * When rust calls the continuation function we can either: + * Consume the handle, complete the Python future, and cause `_uniffi_rust_call_async` to continue + * Run another poll in an futures.Executor thread (AKA BlockingTaskQueue). +""" +class UniffiPoll: + def __init__( + self, + eventloop: asyncio.AbstractEventLoop, + py_future: asyncio.Future, + rust_future: int, + ffi_poll: ctypes.CFUNCTYPE(None, ctypes.c_uint64, ctypes.c_int8, ctypes.c_uint64) + ): + self.eventloop = eventloop + self.py_future = py_future + self.rust_future = rust_future + self.ffi_poll = ffi_poll + self.handle = UNIFFI_POLL_HANDLE_MAP.new_handle(self) + + def poll(self, blocking_task_queue_handle=0): + self.ffi_poll(self.rust_future, _uniffi_continuation_callback, self.handle, blocking_task_queue_handle) + + def poll_in_executor(self, blocking_task_queue_handle): + executor = UNIFFI_BLOCKING_TASK_QUEUE_HANDLE_MAP.get(blocking_task_queue_handle) + executor.submit(self.poll, blocking_task_queue_handle) + + def complete_py_future(self, poll_code: int): + self.eventloop.call_soon_threadsafe(self._complete_py_future, poll_code) + + def _complete_py_future(self, poll_code: int): + if not self.py_future.cancelled(): + self.py_future.set_result(poll_code) # Stores futures for _uniffi_continuation_callback -_UNIFFI_CONTINUATION_HANDLE_MAP = UniffiHandleMap() +UNIFFI_POLL_HANDLE_MAP = UniffiHandleMap() # Continuation callback for async functions # lift the return value or error and resolve the future, causing the async function to resume. @_UNIFFI_FUTURE_CONTINUATION_T -def _uniffi_continuation_callback(handle, poll_code): - (eventloop, future) = _UNIFFI_CONTINUATION_HANDLE_MAP.consume_handle(handle) - eventloop.call_soon_threadsafe(_uniffi_set_future_result, future, poll_code) - -def _uniffi_set_future_result(future, poll_code): - if not future.cancelled(): - future.set_result(poll_code) +def _uniffi_continuation_callback(handle, poll_code, blocking_task_queue_handle): + if poll_code == UNIFFI_RUST_FUTURE_POLL_READY or blocking_task_queue_handle == 0: + poll = UNIFFI_POLL_HANDLE_MAP.consume_handle(handle) + poll.complete_py_future(poll_code) + elif blocking_task_queue_handle != 0: + poll = UNIFFI_POLL_HANDLE_MAP.get(handle) + poll.poll_in_executor(blocking_task_queue_handle) async def _uniffi_rust_call_async(rust_future, ffi_poll, ffi_complete, ffi_free, lift_func, error_ffi_converter): try: eventloop = asyncio.get_running_loop() - # Loop and poll until we see a _UNIFFI_RUST_FUTURE_POLL_READY value + # Loop and poll until we see a UNIFFI_RUST_FUTURE_POLL_READY value while True: - future = eventloop.create_future() - ffi_poll( - rust_future, - _uniffi_continuation_callback, - _UNIFFI_CONTINUATION_HANDLE_MAP.new_handle((eventloop, future)), - ) - poll_code = await future - if poll_code == _UNIFFI_RUST_FUTURE_POLL_READY: + py_future = eventloop.create_future() + UniffiPoll(eventloop, py_future, rust_future, ffi_poll).poll() + poll_code = await py_future + if poll_code == UNIFFI_RUST_FUTURE_POLL_READY: break return lift_func( diff --git a/uniffi_bindgen/src/bindings/python/templates/BlockingTaskQueueTemplate.py b/uniffi_bindgen/src/bindings/python/templates/BlockingTaskQueueTemplate.py new file mode 100644 index 0000000000..ab1222213b --- /dev/null +++ b/uniffi_bindgen/src/bindings/python/templates/BlockingTaskQueueTemplate.py @@ -0,0 +1,21 @@ +{{ self.add_import("concurrent.futures") }} + +UNIFFI_BLOCKING_TASK_QUEUE_HANDLE_MAP = UniffiHandleMap() +class {{ ffi_converter_name }}: + @staticmethod + def lift(handle: UniffiHandle) -> concurrent.futures.Executor: + return UNIFFI_BLOCKING_TASK_QUEUE_HANDLE_MAP.consume_handle(handle) + + @staticmethod + def lower(value: concurrent.futures.Executor) -> UniffiHandle: + if not isinstance(value, concurrent.futures.Executor): + raise TypeError("Expected concurrent.futures.Executor instance, {} found".format(type(value).__name__)) + return UNIFFI_BLOCKING_TASK_QUEUE_HANDLE_MAP.new_handle(value) + + @classmethod + def read(cls, buf: _UniffiRustBuffer): + return cls.lift(buf.read_u64()) + + @classmethod + def write(cls, value: {{ type_name }}, buf: _UniffiRustBuffer): + buf.write_u64(cls.lower(value)) diff --git a/uniffi_bindgen/src/bindings/python/templates/ForeignExecutorTemplate.py b/uniffi_bindgen/src/bindings/python/templates/ForeignExecutorTemplate.py deleted file mode 100644 index c4a28a03cf..0000000000 --- a/uniffi_bindgen/src/bindings/python/templates/ForeignExecutorTemplate.py +++ /dev/null @@ -1,63 +0,0 @@ -# FFI code for the ForeignExecutor type - -{{ self.add_import("asyncio") }} - -_UNIFFI_RUST_TASK_CALLBACK_SUCCESS = 0 -_UNIFFI_RUST_TASK_CALLBACK_CANCELLED = 1 -_UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS = 0 -_UNIFFI_FOREIGN_EXECUTOR_CALLBACK_CANCELED = 1 -_UNIFFI_FOREIGN_EXECUTOR_CALLBACK_ERROR = 2 - -class {{ ffi_converter_name }}: - _handle_map = UniffiHandleMap() - - @classmethod - def lower(cls, eventloop): - if not isinstance(eventloop, asyncio.BaseEventLoop): - raise TypeError("_uniffi_executor_callback: Expected EventLoop instance") - return cls._handle_map.new_handle(eventloop) - - @classmethod - def write(cls, eventloop, buf): - buf.write_u64(cls.lower(eventloop)) - - @classmethod - def read(cls, buf): - return cls.lift(buf.read_u64()) - - @classmethod - def lift(cls, value): - return cls._handle_map.get(value) - -@_UNIFFI_FOREIGN_EXECUTOR_CALLBACK_T -def _uniffi_executor_callback(handle, delay, task_ptr, task_data): - if task_ptr is None: - {{ ffi_converter_name }}._handle_map.consume_handle(handle) - return _UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS - else: - eventloop = {{ ffi_converter_name }}._handle_map.get(handle) - if eventloop.is_closed(): - return _UNIFFI_FOREIGN_EXECUTOR_CALLBACK_CANCELED - - callback = _UNIFFI_RUST_TASK(task_ptr) - # FIXME: there's no easy way to get a callback when an eventloop is closed. This means that - # if eventloop is called before the `call_soon_threadsafe()` calls are invoked, the call - # will never happen and we will probably leak a resource. - if delay == 0: - # This can be called from any thread, so make sure to use `call_soon_threadsafe' - eventloop.call_soon_threadsafe(callback, task_data, - _UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS) - else: - # For delayed tasks, we use `call_soon_threadsafe()` + `call_later()` to make the - # operation threadsafe - eventloop.call_soon_threadsafe(eventloop.call_later, delay / 1000.0, callback, - task_data, _UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS) - return _UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS - -# Register the callback with the scaffolding -{%- match ci.ffi_foreign_executor_callback_set() %} -{%- when Some with (fn) %} -_UniffiLib.{{ fn.name() }}(_uniffi_executor_callback) -{%- when None %} -{#- No foreign executor, we don't set anything #} -{% endmatch %} diff --git a/uniffi_bindgen/src/bindings/python/templates/HandleMap.py b/uniffi_bindgen/src/bindings/python/templates/HandleMap.py index 182e1d4ffc..baae3b7ddc 100644 --- a/uniffi_bindgen/src/bindings/python/templates/HandleMap.py +++ b/uniffi_bindgen/src/bindings/python/templates/HandleMap.py @@ -62,3 +62,6 @@ def consume_handle(self, handle: int) -> object: return self._map.pop(self._key(handle)) except KeyError: raise InternalError("handlemap key error: was the handle used after being freed?") + + def __len__(self) -> int: + return len(self._map) diff --git a/uniffi_bindgen/src/bindings/python/templates/Helpers.py b/uniffi_bindgen/src/bindings/python/templates/Helpers.py index 0569528377..445b54e39a 100644 --- a/uniffi_bindgen/src/bindings/python/templates/Helpers.py +++ b/uniffi_bindgen/src/bindings/python/templates/Helpers.py @@ -71,5 +71,4 @@ def _uniffi_check_call_status(error_ffi_converter, call_status): _UNIFFI_FOREIGN_CALLBACK_T = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_ulonglong, ctypes.c_ulong, ctypes.POINTER(ctypes.c_char), ctypes.c_int, ctypes.POINTER(_UniffiRustBuffer)) # UniFFI future continuation -_UNIFFI_FUTURE_CONTINUATION_T = ctypes.CFUNCTYPE(None, ctypes.c_uint64, ctypes.c_int8) - +_UNIFFI_FUTURE_CONTINUATION_T = ctypes.CFUNCTYPE(None, ctypes.c_uint64, ctypes.c_int8, ctypes.c_uint64) diff --git a/uniffi_bindgen/src/bindings/python/templates/NamespaceLibraryTemplate.py b/uniffi_bindgen/src/bindings/python/templates/NamespaceLibraryTemplate.py index bfcc28be81..bfbd7e159c 100644 --- a/uniffi_bindgen/src/bindings/python/templates/NamespaceLibraryTemplate.py +++ b/uniffi_bindgen/src/bindings/python/templates/NamespaceLibraryTemplate.py @@ -1,26 +1,5 @@ # Define some ctypes FFI types that we use in the library -""" -ctypes type for the foreign executor callback. This is a built-in interface for scheduling -tasks - -Args: - executor: opaque c_size_t value representing the eventloop - delay: delay in ms - task: function pointer to the task callback - task_data: void pointer to the task callback data - -Normally we should call task(task_data) after the detail. -However, when task is NULL this indicates that Rust has dropped the ForeignExecutor and we should -decrease the EventLoop refcount. -""" -_UNIFFI_FOREIGN_EXECUTOR_CALLBACK_T = ctypes.CFUNCTYPE(ctypes.c_int8, ctypes.c_uint64, ctypes.c_uint32, ctypes.c_void_p, ctypes.c_void_p) - -""" -Function pointer for a Rust task, which a callback function that takes a opaque pointer -""" -_UNIFFI_RUST_TASK = ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.c_int8) - def _uniffi_future_callback_t(return_type): """ Factory function to create callback function types for async functions diff --git a/uniffi_bindgen/src/bindings/python/templates/Types.py b/uniffi_bindgen/src/bindings/python/templates/Types.py index 84afa6bbff..9acf2badb7 100644 --- a/uniffi_bindgen/src/bindings/python/templates/Types.py +++ b/uniffi_bindgen/src/bindings/python/templates/Types.py @@ -94,8 +94,8 @@ {%- when Type::External { name, module_path, namespace, kind, tagged } %} {%- include "ExternalTemplate.py" %} -{%- when Type::ForeignExecutor %} -{%- include "ForeignExecutorTemplate.py" %} +{%- when Type::BlockingTaskQueue %} +{%- include "BlockingTaskQueueTemplate.py" %} {%- else %} {%- endmatch %} diff --git a/uniffi_bindgen/src/bindings/python/templates/wrapper.py b/uniffi_bindgen/src/bindings/python/templates/wrapper.py index 024663ae5b..3fb7f7dd8a 100644 --- a/uniffi_bindgen/src/bindings/python/templates/wrapper.py +++ b/uniffi_bindgen/src/bindings/python/templates/wrapper.py @@ -13,6 +13,7 @@ # compile the rust component. The easiest way to ensure this is to bundle the Python # helpers directly inline like we're doing here. +from dataclasses import dataclass import os import sys import ctypes diff --git a/uniffi_bindgen/src/bindings/ruby/gen_ruby/mod.rs b/uniffi_bindgen/src/bindings/ruby/gen_ruby/mod.rs index 41f8a7506e..4e7fd41096 100644 --- a/uniffi_bindgen/src/bindings/ruby/gen_ruby/mod.rs +++ b/uniffi_bindgen/src/bindings/ruby/gen_ruby/mod.rs @@ -57,7 +57,7 @@ pub fn canonical_name(t: &Type) -> String { Type::CallbackInterface { name, .. } => format!("CallbackInterface{name}"), Type::Timestamp => "Timestamp".into(), Type::Duration => "Duration".into(), - Type::ForeignExecutor => "ForeignExecutor".into(), + Type::BlockingTaskQueue => "BlockingTaskQueue".into(), // Recursive types. // These add a prefix to the name of the underlying type. // The component API definition cannot give names to recursive types, so as long as the @@ -156,9 +156,6 @@ mod filters { // Callback interfaces are not yet implemented, but this needs to return something in // order for the coverall tests to pass. FfiType::ForeignCallback => ":pointer".to_string(), - FfiType::ForeignExecutorCallback => { - unimplemented!("Foreign executors are not implemented") - } FfiType::RustFutureContinuationCallback => { unimplemented!("Async functions are not implemented") } @@ -261,7 +258,7 @@ mod filters { } Type::External { .. } => panic!("No support for external types, yet"), Type::Custom { .. } => panic!("No support for custom types, yet"), - Type::ForeignExecutor => unimplemented!("Foreign executors are not implemented"), + Type::BlockingTaskQueue => unimplemented!("BlockingTaskQueue are not implemented"), }) } @@ -297,7 +294,7 @@ mod filters { ), Type::External { .. } => panic!("No support for lowering external types, yet"), Type::Custom { .. } => panic!("No support for lowering custom types, yet"), - Type::ForeignExecutor => unimplemented!("Foreign executors are not implemented"), + Type::BlockingTaskQueue => unimplemented!("BlockingTaskQueue are not implemented"), }) } @@ -338,7 +335,7 @@ mod filters { ), Type::External { .. } => panic!("No support for lifting external types, yet"), Type::Custom { .. } => panic!("No support for lifting custom types, yet"), - Type::ForeignExecutor => unimplemented!("Foreign executors are not implemented"), + Type::BlockingTaskQueue => unimplemented!("BlockingTaskQueue are not implemented"), }) } } diff --git a/uniffi_bindgen/src/bindings/swift/gen_swift/executor.rs b/uniffi_bindgen/src/bindings/swift/gen_swift/blocking_task_queue.rs similarity index 51% rename from uniffi_bindgen/src/bindings/swift/gen_swift/executor.rs rename to uniffi_bindgen/src/bindings/swift/gen_swift/blocking_task_queue.rs index 73a373d8d9..ee79fc476b 100644 --- a/uniffi_bindgen/src/bindings/swift/gen_swift/executor.rs +++ b/uniffi_bindgen/src/bindings/swift/gen_swift/blocking_task_queue.rs @@ -5,19 +5,15 @@ use crate::backend::CodeType; #[derive(Debug)] -pub struct ForeignExecutorCodeType; +pub struct BlockingTaskQueueCodeType; -impl CodeType for ForeignExecutorCodeType { +impl CodeType for BlockingTaskQueueCodeType { fn type_label(&self) -> String { - // On Swift, we define a struct to represent a ForeignExecutor - "UniFfiForeignExecutor".into() + // On Swift, we use a DispatchQueue for BlockingTaskQueue + "DispatchQueue".into() } fn canonical_name(&self) -> String { - "ForeignExecutor".into() - } - - fn initialization_fn(&self) -> Option { - Some("uniffiInitForeignExecutor".into()) + "BlockingTaskQueue".into() } } diff --git a/uniffi_bindgen/src/bindings/swift/gen_swift/mod.rs b/uniffi_bindgen/src/bindings/swift/gen_swift/mod.rs index 87b3651728..651e6b60f3 100644 --- a/uniffi_bindgen/src/bindings/swift/gen_swift/mod.rs +++ b/uniffi_bindgen/src/bindings/swift/gen_swift/mod.rs @@ -17,11 +17,11 @@ use crate::backend::{CodeType, TemplateExpression}; use crate::interface::*; use crate::BindingsConfig; +mod blocking_task_queue; mod callback_interface; mod compounds; mod custom; mod enum_; -mod executor; mod external; mod miscellany; mod object; @@ -403,7 +403,7 @@ impl SwiftCodeOracle { Type::CallbackInterface { name, .. } => { Box::new(callback_interface::CallbackInterfaceCodeType::new(name)) } - Type::ForeignExecutor => Box::new(executor::ForeignExecutorCodeType), + Type::BlockingTaskQueue => Box::new(blocking_task_queue::BlockingTaskQueueCodeType), Type::Optional { inner_type } => { Box::new(compounds::OptionalCodeType::new(*inner_type)) } @@ -464,16 +464,13 @@ impl SwiftCodeOracle { FfiType::RustBuffer(_) => "RustBuffer".into(), FfiType::ForeignBytes => "ForeignBytes".into(), FfiType::ForeignCallback => "ForeignCallback".into(), - FfiType::ForeignExecutorCallback => "ForeignExecutorCallback".into(), FfiType::RustFutureContinuationCallback => "UniFfiRustFutureContinuation".into(), } } fn ffi_type_label(&self, ffi_type: &FfiType) -> String { match ffi_type { - FfiType::ForeignCallback - | FfiType::ForeignExecutorCallback - | FfiType::RustFutureContinuationCallback => { + FfiType::ForeignCallback | FfiType::RustFutureContinuationCallback => { format!("{} _Nonnull", self.ffi_type_label_raw(ffi_type)) } _ => self.ffi_type_label_raw(ffi_type), @@ -574,7 +571,6 @@ pub mod filters { FfiType::RustBuffer(_) => "RustBuffer".into(), FfiType::ForeignBytes => "ForeignBytes".into(), FfiType::ForeignCallback => "ForeignCallback _Nonnull".into(), - FfiType::ForeignExecutorCallback => "UniFfiForeignExecutorCallback _Nonnull".into(), FfiType::RustFutureContinuationCallback => { "UniFfiRustFutureContinuation _Nonnull".into() } diff --git a/uniffi_bindgen/src/bindings/swift/templates/Async.swift b/uniffi_bindgen/src/bindings/swift/templates/Async.swift index f7446ca5e0..00aedff07b 100644 --- a/uniffi_bindgen/src/bindings/swift/templates/Async.swift +++ b/uniffi_bindgen/src/bindings/swift/templates/Async.swift @@ -1,11 +1,75 @@ private let UNIFFI_RUST_FUTURE_POLL_READY: Int8 = 0 private let UNIFFI_RUST_FUTURE_POLL_MAYBE_READY: Int8 = 1 -fileprivate var UNIFFI_CONTINUATION_HANDLE_MAP = UniffiHandleMap>() +/** + * In-progress poll of a RustFuture + * + * Lifecycle of UniffiPoll: + * * UniffiPoll is constructed with a Swift continuation that `uniffiRustCallAsync` awaits + * * As part of the construction, a new handle is allocated for sending to `rust_future_poll`. + * * poll() is called, which calls the `rust_future_poll()` + * * When rust calls the continuation function we can either: + * * Consume the handle, complete the continuation, and cause `uniffiRustCallAsync` to continue + * * Run another poll in an CoroutineContext (AKA BlockingTaskQueue). + */ +fileprivate class UniffiPoll { + let continuation: UnsafeContinuation + let rustFuture: UInt64 + let pollFunc: (UInt64, @escaping UniFfiRustFutureContinuation, UInt64, UInt64) -> () + var handle: UInt64 + + init( + continuation: UnsafeContinuation, + rustFuture: UInt64, + pollFunc: @escaping (UInt64, @escaping UniFfiRustFutureContinuation, UInt64, UInt64) -> () + ) { + self.continuation = continuation + self.rustFuture = rustFuture + self.pollFunc = pollFunc + self.handle = 0 + } + + fileprivate func createHandle() { + self.handle = UNIFFI_POLL_HANDLE_MAP.newHandle(obj: self) + } + + fileprivate func poll(_ BlockingTaskQueueHandle: UInt64 = 0) { + pollFunc(rustFuture, uniffiFutureContinuationCallback, handle, BlockingTaskQueueHandle) + } + + fileprivate func pollInDispatchQueue(_ BlockingTaskQueueHandle: UInt64) { + let queue = UNIFFI_BLOCKING_TASK_QUEUE_HANDLE_MAP.get(handle: BlockingTaskQueueHandle) + queue.async { + self.poll(BlockingTaskQueueHandle) + } + } + + fileprivate func completeContinuation(_ pollResult: Int8) { + continuation.resume(returning: pollResult) + } +} + +fileprivate var UNIFFI_POLL_HANDLE_MAP = UniffiHandleMap() + +// Callback handlers for an async calls. These are invoked by Rust when the future is ready. They +// lift the return value or error and resume the suspended function. +fileprivate func uniffiFutureContinuationCallback( + handle: UInt64, + pollResult: Int8, + BlockingTaskQueueHandle: UInt64 +) { + if (pollResult == UNIFFI_RUST_FUTURE_POLL_READY) || (BlockingTaskQueueHandle == 0) { + let poll = UNIFFI_POLL_HANDLE_MAP.consumeHandle(handle: handle) + poll.completeContinuation(pollResult) + } else { + let poll = UNIFFI_POLL_HANDLE_MAP.get(handle: handle) + poll.pollInDispatchQueue(BlockingTaskQueueHandle) + } +} fileprivate func uniffiRustCallAsync( rustFutureFunc: () -> UInt64, - pollFunc: (UInt64, @escaping UniFfiRustFutureContinuation, UInt64) -> (), + pollFunc: @escaping (UInt64, @escaping UniFfiRustFutureContinuation, UInt64, UInt64) -> (), completeFunc: (UInt64, UnsafeMutablePointer) -> F, freeFunc: (UInt64) -> (), liftFunc: (F) throws -> T, @@ -21,7 +85,9 @@ fileprivate func uniffiRustCallAsync( var pollResult: Int8; repeat { pollResult = await withUnsafeContinuation { - pollFunc(rustFuture, uniffiFutureContinuationCallback, UNIFFI_CONTINUATION_HANDLE_MAP.newHandle(obj: $0)) + let poll = UniffiPoll(continuation: $0, rustFuture: rustFuture, pollFunc: pollFunc) + poll.createHandle() + poll.poll() } } while pollResult != UNIFFI_RUST_FUTURE_POLL_READY @@ -31,8 +97,7 @@ fileprivate func uniffiRustCallAsync( )) } -// Callback handlers for an async calls. These are invoked by Rust when the future is ready. They -// lift the return value or error and resume the suspended function. -fileprivate func uniffiFutureContinuationCallback(handle: UInt64, pollResult: Int8) { - UNIFFI_CONTINUATION_HANDLE_MAP.consumeHandle(handle: handle).resume(returning: pollResult) +// For testing +public func uniffiPollHandleCount() -> Int { + UNIFFI_POLL_HANDLE_MAP.count } diff --git a/uniffi_bindgen/src/bindings/swift/templates/BlockingTaskQueueTemplate.swift b/uniffi_bindgen/src/bindings/swift/templates/BlockingTaskQueueTemplate.swift new file mode 100644 index 0000000000..c4b80f05e8 --- /dev/null +++ b/uniffi_bindgen/src/bindings/swift/templates/BlockingTaskQueueTemplate.swift @@ -0,0 +1,28 @@ +fileprivate var UNIFFI_BLOCKING_TASK_QUEUE_HANDLE_MAP = UniffiHandleMap() + +public struct {{ ffi_converter_name }}: FfiConverter { + + typealias FfiType = UInt64 + typealias SwiftType = DispatchQueue + + public static func lift(_ handle: UInt64) throws -> DispatchQueue { + return UNIFFI_BLOCKING_TASK_QUEUE_HANDLE_MAP.consumeHandle(handle: handle) + } + + public static func lower(_ value: DispatchQueue) -> UInt64 { + return UNIFFI_BLOCKING_TASK_QUEUE_HANDLE_MAP.newHandle(obj: value) + } + + public static func read(from buf: inout (data: Data, offset: Data.Index)) throws -> DispatchQueue { + return try lift(try readInt(&buf)) + } + + public static func write(_ value: DispatchQueue, into buf: inout [UInt8]) { + writeInt(&buf, lower(value)) + } +} + +// For testing +public func uniffiBlockingTaskQueueHandleCount() -> Int { + UNIFFI_BLOCKING_TASK_QUEUE_HANDLE_MAP.count +} diff --git a/uniffi_bindgen/src/bindings/swift/templates/BridgingHeaderTemplate.h b/uniffi_bindgen/src/bindings/swift/templates/BridgingHeaderTemplate.h index 587eab591b..374d8455c7 100644 --- a/uniffi_bindgen/src/bindings/swift/templates/BridgingHeaderTemplate.h +++ b/uniffi_bindgen/src/bindings/swift/templates/BridgingHeaderTemplate.h @@ -31,18 +31,6 @@ typedef struct RustBuffer typedef int32_t (*ForeignCallback)(uint64_t, int32_t, const uint8_t *_Nonnull, int32_t, RustBuffer *_Nonnull); -// Task defined in Rust that Swift executes -typedef void (*UniFfiRustTaskCallback)(const void * _Nullable, int8_t); - -// Callback to execute Rust tasks using a Swift Task -// -// Args: -// executor: ForeignExecutor lowered into a size_t value -// delay: Delay in MS -// task: UniFfiRustTaskCallback to call -// task_data: data to pass the task callback -typedef int8_t (*UniFfiForeignExecutorCallback)(size_t, uint32_t, UniFfiRustTaskCallback _Nullable, const void * _Nullable); - typedef struct ForeignBytes { int32_t len; @@ -60,7 +48,7 @@ typedef struct RustCallStatus { #endif // def UNIFFI_SHARED_H // Continuation callback for UniFFI Futures -typedef void (*UniFfiRustFutureContinuation)(uint64_t, int8_t); +typedef void (*UniFfiRustFutureContinuation)(uint64_t, int8_t, uint64_t); // Scaffolding functions {%- for func in ci.iter_ffi_function_definitions() %} diff --git a/uniffi_bindgen/src/bindings/swift/templates/ForeignExecutorTemplate.swift b/uniffi_bindgen/src/bindings/swift/templates/ForeignExecutorTemplate.swift deleted file mode 100644 index 167e4c7546..0000000000 --- a/uniffi_bindgen/src/bindings/swift/templates/ForeignExecutorTemplate.swift +++ /dev/null @@ -1,69 +0,0 @@ -private let UNIFFI_RUST_TASK_CALLBACK_SUCCESS: Int8 = 0 -private let UNIFFI_RUST_TASK_CALLBACK_CANCELLED: Int8 = 1 -private let UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS: Int8 = 0 -private let UNIFFI_FOREIGN_EXECUTOR_CALLBACK_CANCELED: Int8 = 1 -private let UNIFFI_FOREIGN_EXECUTOR_CALLBACK_ERROR: Int8 = 2 - -// Encapsulates an executor that can run Rust tasks -// -// On Swift, `Task.detached` can handle this we just need to know what priority to send it. -public struct UniFfiForeignExecutor { - var priority: TaskPriority - - public init(priority: TaskPriority) { - self.priority = priority - } - - public init() { - self.priority = Task.currentPriority - } -} - -fileprivate struct FfiConverterForeignExecutor: FfiConverter { - typealias SwiftType = UniFfiForeignExecutor - // Rust uses a pointer to represent the FfiConverterForeignExecutor, but we only need a u8. - // let's use `Int`, which is equivalent to `size_t` - typealias FfiType = Int - - public static func lift(_ value: FfiType) throws -> SwiftType { - UniFfiForeignExecutor(priority: TaskPriority(rawValue: numericCast(value))) - } - public static func lower(_ value: SwiftType) -> FfiType { - numericCast(value.priority.rawValue) - } - - public static func read(from buf: inout (data: Data, offset: Data.Index)) throws -> SwiftType { - fatalError("FfiConverterForeignExecutor.read not implemented yet") - } - public static func write(_ value: SwiftType, into buf: inout [UInt8]) { - fatalError("FfiConverterForeignExecutor.read not implemented yet") - } -} - - -fileprivate func uniffiForeignExecutorCallback(executorHandle: Int, delayMs: UInt32, rustTask: UniFfiRustTaskCallback?, taskData: UnsafeRawPointer?) -> Int8 { - if let rustTask = rustTask { - let executor = try! FfiConverterForeignExecutor.lift(executorHandle) - Task.detached(priority: executor.priority) { - if delayMs != 0 { - let nanoseconds: UInt64 = numericCast(delayMs * 1000000) - try! await Task.sleep(nanoseconds: nanoseconds) - } - rustTask(taskData, UNIFFI_RUST_TASK_CALLBACK_SUCCESS) - } - return UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS - } else { - // When rustTask is null, we should drop the foreign executor. - // However, since its just a value type, we don't need to do anything here. - return UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS - } -} - -fileprivate func uniffiInitForeignExecutor() { - {%- match ci.ffi_foreign_executor_callback_set() %} - {%- when Some with (fn) %} - {{ fn.name() }}(uniffiForeignExecutorCallback) - {%- when None %} - {#- No foreign executor, we don't set anything #} - {% endmatch %} -} diff --git a/uniffi_bindgen/src/bindings/swift/templates/HandleMap.swift b/uniffi_bindgen/src/bindings/swift/templates/HandleMap.swift index b294eddd7f..c94b526450 100644 --- a/uniffi_bindgen/src/bindings/swift/templates/HandleMap.swift +++ b/uniffi_bindgen/src/bindings/swift/templates/HandleMap.swift @@ -18,6 +18,12 @@ fileprivate class UniffiHandleMap { // Note: foreign handles are always odd private var keyCounter: UInt64 = 1 + var count: Int { + get { + map.count + } + } + private func nextKey() -> UInt64 { let key = keyCounter keyCounter = (keyCounter + 2) & 0xFFFF_FFFF_FFFF diff --git a/uniffi_bindgen/src/bindings/swift/templates/Types.swift b/uniffi_bindgen/src/bindings/swift/templates/Types.swift index aba34f4b0b..ba4d2059c8 100644 --- a/uniffi_bindgen/src/bindings/swift/templates/Types.swift +++ b/uniffi_bindgen/src/bindings/swift/templates/Types.swift @@ -64,8 +64,8 @@ {%- when Type::CallbackInterface { name, module_path } %} {%- include "CallbackInterfaceTemplate.swift" %} -{%- when Type::ForeignExecutor %} -{%- include "ForeignExecutorTemplate.swift" %} +{%- when Type::BlockingTaskQueue %} +{%- include "BlockingTaskQueueTemplate.swift" %} {%- when Type::Custom { name, module_path, builtin } %} {%- include "CustomType.swift" %} diff --git a/uniffi_bindgen/src/interface/ffi.rs b/uniffi_bindgen/src/interface/ffi.rs index de8db86334..0b8d73d514 100644 --- a/uniffi_bindgen/src/interface/ffi.rs +++ b/uniffi_bindgen/src/interface/ffi.rs @@ -46,8 +46,6 @@ pub enum FfiType { ForeignBytes, /// Pointer to a callback function that handles all callbacks on the foreign language side. ForeignCallback, - /// Pointer to the callback function that's invoked to schedule calls with a ForeignExecutor - ForeignExecutorCallback, /// Continuation function for a Rust future RustFutureContinuationCallback, // TODO: you can imagine a richer structural typesystem here, e.g. `Ref` or something. @@ -84,7 +82,7 @@ impl From<&Type> for FfiType { // Object types interfaces are passed as opaque handles. Type::Object { .. } | Type::CallbackInterface { .. } - | Type::ForeignExecutor + | Type::BlockingTaskQueue | Type::External { kind: ExternalKind::Interface, .. diff --git a/uniffi_bindgen/src/interface/mod.rs b/uniffi_bindgen/src/interface/mod.rs index 99420a1504..7b88613dc7 100644 --- a/uniffi_bindgen/src/interface/mod.rs +++ b/uniffi_bindgen/src/interface/mod.rs @@ -463,6 +463,10 @@ impl ComponentInterface { name: "continuation_data".to_owned(), type_: FfiType::Handle, }, + FfiArgument { + name: "blocking_task_queue".to_owned(), + type_: FfiType::UInt64, + }, ], return_type: None, has_rust_call_status_arg: false, @@ -572,7 +576,6 @@ impl ComponentInterface { .chain(self.iter_rust_buffer_ffi_function_definitions()) .chain(self.iter_futures_ffi_function_definitons()) .chain(self.iter_checksum_ffi_functions()) - .chain(self.ffi_foreign_executor_callback_set()) .chain([self.ffi_uniffi_contract_version()]) } @@ -651,27 +654,6 @@ impl ComponentInterface { }) } - /// The ffi_foreign_executor_callback_set FFI function - /// - /// We only include this in the FFI if the `ForeignExecutor` type is actually used - pub fn ffi_foreign_executor_callback_set(&self) -> Option { - if self.types.contains(&Type::ForeignExecutor) { - Some(FfiFunction { - name: format!("ffi_{}_foreign_executor_callback_set", self.ffi_namespace()), - arguments: vec![FfiArgument { - name: "callback".into(), - type_: FfiType::ForeignExecutorCallback, - }], - return_type: None, - is_async: false, - has_rust_call_status_arg: false, - is_object_free_function: false, - }) - } else { - None - } - } - /// List all API checksums to check /// /// Returns a list of (export_symbol_name, checksum) items @@ -777,6 +759,12 @@ impl ComponentInterface { bail!("Conflicting type definition for \"{}\"", defn.name()); } self.types.add_known_types(defn.iter_types())?; + // Add the BlockingTaskQueue if there are any async functions. + // This isn't strictly necessary, but it simplifies the template code. + if defn.is_async() { + self.types + .add_known_type(&uniffi_meta::Type::BlockingTaskQueue)?; + } self.functions.push(defn); Ok(()) diff --git a/uniffi_bindgen/src/interface/universe.rs b/uniffi_bindgen/src/interface/universe.rs index e69d86e44f..14c7286015 100644 --- a/uniffi_bindgen/src/interface/universe.rs +++ b/uniffi_bindgen/src/interface/universe.rs @@ -83,8 +83,8 @@ impl TypeUniverse { Type::Bytes => self.add_type_definition("bytes", type_)?, Type::Timestamp => self.add_type_definition("timestamp", type_)?, Type::Duration => self.add_type_definition("duration", type_)?, - Type::ForeignExecutor => { - self.add_type_definition("ForeignExecutor", type_)?; + Type::BlockingTaskQueue => { + self.add_type_definition("BlockingTaskQueue", type_)?; } Type::Object { name, .. } | Type::Record { name, .. } @@ -119,6 +119,7 @@ impl TypeUniverse { } /// Check if a [Type] is present + #[cfg(test)] pub fn contains(&self, type_: &Type) -> bool { self.all_known_types.contains(type_) } diff --git a/uniffi_bindgen/src/scaffolding/mod.rs b/uniffi_bindgen/src/scaffolding/mod.rs index f3759cf6fa..f75b87bd21 100644 --- a/uniffi_bindgen/src/scaffolding/mod.rs +++ b/uniffi_bindgen/src/scaffolding/mod.rs @@ -45,7 +45,7 @@ mod filters { format!("std::sync::Arc<{}>", imp.rust_name_for(name)) } Type::CallbackInterface { name, .. } => format!("Box"), - Type::ForeignExecutor => "::uniffi::ForeignExecutor".into(), + Type::BlockingTaskQueue => "::uniffi::BlockingTaskQueue".into(), Type::Optional { inner_type } => { format!("std::option::Option<{}>", type_rs(inner_type)?) } diff --git a/uniffi_core/src/ffi/foreigncallbacks.rs b/uniffi_core/src/ffi/foreigncallbacks.rs index 1746489e30..43c7f3ecfa 100644 --- a/uniffi_core/src/ffi/foreigncallbacks.rs +++ b/uniffi_core/src/ffi/foreigncallbacks.rs @@ -10,7 +10,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; -use crate::{Handle, RustBuffer, RustTaskCallback}; +use crate::{Handle, RustBuffer}; /// ForeignCallback is the Rust representation of a foreign language function. /// It is the basis for all callbacks interfaces. It is registered exactly once per callback interface, @@ -37,31 +37,9 @@ pub type ForeignCallback = unsafe extern "C" fn( buf_ptr: *mut RustBuffer, ) -> i32; -/// Callback to schedule a Rust call with a `ForeignExecutor`. The bindings code registers exactly -/// one of these with the Rust code. -/// -/// Delay is an approximate amount of ms to wait before scheduling the call. Delay is usually 0, -/// which means schedule sometime soon. -/// -/// As a special case, when Rust drops the foreign executor, with `task=null`. The foreign -/// bindings should release the reference to the executor that was reserved for Rust. -/// -/// This callback can be invoked from any thread, including threads created by Rust. -/// -/// The callback should return one of the `ForeignExecutorCallbackResult` values. -pub type ForeignExecutorCallback = extern "C" fn( - executor: Handle, - delay: u32, - task: Option, - task_data: *const (), -) -> i8; - /// Store a [ForeignCallback] pointer pub(crate) struct ForeignCallbackCell(AtomicUsize); -/// Store a [ForeignExecutorCallback] pointer -pub(crate) struct ForeignExecutorCallbackCell(AtomicUsize); - /// Macro to define foreign callback types as well as the callback cell. macro_rules! impl_foreign_callback_cell { ($callback_type:ident, $cell_type:ident) => { @@ -100,4 +78,3 @@ macro_rules! impl_foreign_callback_cell { } impl_foreign_callback_cell!(ForeignCallback, ForeignCallbackCell); -impl_foreign_callback_cell!(ForeignExecutorCallback, ForeignExecutorCallbackCell); diff --git a/uniffi_core/src/ffi/foreignexecutor.rs b/uniffi_core/src/ffi/foreignexecutor.rs deleted file mode 100644 index 3f0f826644..0000000000 --- a/uniffi_core/src/ffi/foreignexecutor.rs +++ /dev/null @@ -1,473 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -//! Schedule tasks using a foreign executor. - -use std::panic; - -use crate::{ForeignExecutorCallback, ForeignExecutorCallbackCell, Handle}; - -/// Result code returned by `ForeignExecutorCallback` -#[repr(i8)] -#[derive(Debug, PartialEq, Eq)] -pub enum ForeignExecutorCallbackResult { - /// Callback was scheduled successfully - Success = 0, - /// Callback couldn't be scheduled because the foreign executor is canceled/closed. - Cancelled = 1, - /// Callback couldn't be scheduled because of some other error - Error = 2, -} - -impl ForeignExecutorCallbackResult { - /// Check the result code for the foreign executor callback - /// - /// If the result was `ForeignExecutorCallbackResult.Success`, this method returns `true`. - /// - /// If not, this method returns `false`, logging errors for any unexpected return values - pub fn check_result_code(result: i8) -> bool { - match result { - n if n == ForeignExecutorCallbackResult::Success as i8 => true, - n if n == ForeignExecutorCallbackResult::Cancelled as i8 => false, - n if n == ForeignExecutorCallbackResult::Error as i8 => { - log::error!( - "ForeignExecutorCallbackResult::Error returned by foreign executor callback" - ); - false - } - n => { - log::error!("Unknown code ({n}) returned by foreign executor callback"); - false - } - } - } -} - -// Option should use the null pointer optimization and be represented in C as a -// regular pointer. Let's check that. -static_assertions::assert_eq_size!(usize, Option); - -/// Callback for a Rust task, this is what the foreign executor invokes -/// -/// The task will be passed the `task_data` passed to `ForeignExecutorCallback` in addition to one -/// of the `RustTaskCallbackCode` values. -pub type RustTaskCallback = extern "C" fn(*const (), RustTaskCallbackCode); - -/// Passed to a `RustTaskCallback` function when the executor invokes them. -/// -/// Every `RustTaskCallback` will be invoked eventually, this code is used to distinguish the times -/// when it's invoked successfully vs times when the callback is being called because the foreign -/// executor has been cancelled / shutdown -#[repr(i8)] -#[derive(Debug, PartialEq, Eq)] -pub enum RustTaskCallbackCode { - /// Successful task callback invocation - Success = 0, - /// The `ForeignExecutor` has been cancelled. - /// - /// This signals that any progress using the executor should be halted. In particular, Futures - /// should not continue to progress. - Cancelled = 1, -} - -static FOREIGN_EXECUTOR_CALLBACK: ForeignExecutorCallbackCell = ForeignExecutorCallbackCell::new(); - -/// Set the global ForeignExecutorCallback. This is called by the foreign bindings, normally -/// during initialization. -pub fn foreign_executor_callback_set(callback: ForeignExecutorCallback) { - FOREIGN_EXECUTOR_CALLBACK.set(callback); -} - -/// Schedule Rust calls using a foreign executor -#[derive(Debug)] -pub struct ForeignExecutor { - pub(crate) handle: Handle, -} - -impl ForeignExecutor { - pub fn new(executor: Handle) -> Self { - Self { handle: executor } - } - - /// Schedule a closure to be run. - /// - /// This method can be used for "fire-and-forget" style calls, where the calling code doesn't - /// need to await the result. - /// - /// Closure requirements: - /// - Send: since the closure will likely run on a different thread - /// - 'static: since it runs at an arbitrary time, so all references need to be 'static - /// - panic::UnwindSafe: if the closure panics, it should not corrupt any data - pub fn schedule(&self, delay: u32, task: F) { - let leaked_ptr: *mut F = Box::leak(Box::new(task)); - if !schedule_raw( - self.handle, - delay, - schedule_callback::, - leaked_ptr as *const (), - ) { - // If schedule_raw() failed, drop the leaked box since `schedule_callback()` has not been - // scheduled to run. - unsafe { - drop(Box::::from_raw(leaked_ptr)); - }; - } - } - - /// Schedule a closure to be run and get a Future for the result - /// - /// Closure requirements: - /// - Send: since the closure will likely run on a different thread - /// - 'static: since it runs at an arbitrary time, so all references need to be 'static - /// - panic::UnwindSafe: if the closure panics, it should not corrupt any data - pub async fn run(&self, delay: u32, closure: F) -> T - where - F: FnOnce() -> T + Send + 'static + panic::UnwindSafe, - T: Send + 'static, - { - // Create a oneshot channel to handle the future - let (sender, receiver) = oneshot::channel(); - // We can use `AssertUnwindSafe` here because: - // - The closure is unwind safe - // - `Sender` is not marked unwind safe, maybe this is just an oversight in the oneshot - // library. However, calling `send()` and dropping the Sender should certainly be - // unwind safe. `send()` should probably not panic at all and if it does it shouldn't - // do it in a way that breaks the Receiver. - // - Calling `expect` may result in a panic, but this should should not break either the - // Sender or Receiver. - self.schedule( - delay, - panic::AssertUnwindSafe(move || { - sender.send(closure()).expect("Error sending future result") - }), - ); - receiver.await.expect("Error receiving future result") - } -} - -/// Low-level schedule interface -/// -/// When using this function, take care to ensure that the `ForeignExecutor` that holds the -/// `Handle` has not been dropped. -/// -/// Returns true if the callback was successfully scheduled -pub(crate) fn schedule_raw( - handle: Handle, - delay: u32, - callback: RustTaskCallback, - data: *const (), -) -> bool { - let result_code = (FOREIGN_EXECUTOR_CALLBACK.get())(handle, delay, Some(callback), data); - ForeignExecutorCallbackResult::check_result_code(result_code) -} - -impl Drop for ForeignExecutor { - fn drop(&mut self) { - (FOREIGN_EXECUTOR_CALLBACK.get())(self.handle, 0, None, std::ptr::null()); - } -} - -extern "C" fn schedule_callback(data: *const (), status_code: RustTaskCallbackCode) -where - F: FnOnce() + Send + 'static + panic::UnwindSafe, -{ - // No matter what, we need to call Box::from_raw() to balance the Box::leak() call. - let task = unsafe { Box::from_raw(data as *mut F) }; - // Skip running the task for the `RustTaskCallbackCode::Cancelled` code - if status_code == RustTaskCallbackCode::Success { - run_task(task); - } -} - -/// Run a scheduled task, catching any panics. -/// -/// If there are panics, then we will log a warning and return None. -fn run_task T + panic::UnwindSafe, T>(task: F) -> Option { - match panic::catch_unwind(task) { - Ok(v) => Some(v), - Err(cause) => { - let message = if let Some(s) = cause.downcast_ref::<&'static str>() { - (*s).to_string() - } else if let Some(s) = cause.downcast_ref::() { - s.clone() - } else { - "Unknown panic!".to_string() - }; - log::warn!("Error calling UniFFI callback function: {message}"); - None - } - } -} - -#[cfg(test)] -pub use test::MockEventLoop; - -#[cfg(test)] -mod test { - use super::*; - use crate::HandleAlloc; - use std::{ - future::Future, - pin::Pin, - sync::{ - atomic::{AtomicU32, Ordering}, - Arc, Mutex, Once, - }, - task::{Context, Poll, Wake, Waker}, - }; - - /// Simulate an event loop / task queue / coroutine scope on the foreign side - /// - /// This simply collects scheduled calls into a Vec for testing purposes. - /// - /// Most of the MockEventLoop methods are `pub` since it's also used by the `rustfuture` tests. - pub struct MockEventLoop { - // Wrap everything in a mutex since we typically share access to MockEventLoop via an Arc - inner: Mutex, - } - - pub struct MockEventLoopInner { - // calls that have been scheduled - calls: Vec<(u32, Option, *const ())>, - // has the event loop been shutdown? - is_shutdown: bool, - } - - unsafe impl Send for MockEventLoopInner {} - - static FOREIGN_EXECUTOR_CALLBACK_INIT: Once = Once::new(); - - impl MockEventLoop { - pub fn new() -> Arc { - // Make sure we install a foreign executor callback that can deal with mock event loops - FOREIGN_EXECUTOR_CALLBACK_INIT - .call_once(|| foreign_executor_callback_set(mock_executor_callback)); - - Arc::new(Self { - inner: Mutex::new(MockEventLoopInner { - calls: vec![], - is_shutdown: false, - }), - }) - } - - /// Create a new Handle - pub fn new_handle(self: &Arc) -> Handle { - >::new_handle(Arc::clone(self)) - } - - pub fn new_executor(self: &Arc) -> ForeignExecutor { - ForeignExecutor { - handle: self.new_handle(), - } - } - - /// Get the current number of scheduled calls - pub fn call_count(&self) -> usize { - self.inner.lock().unwrap().calls.len() - } - - /// Get the last scheduled call - pub fn last_call(&self) -> (u32, Option, *const ()) { - self.inner - .lock() - .unwrap() - .calls - .last() - .cloned() - .expect("no calls scheduled") - } - - /// Run all currently scheduled calls - pub fn run_all_calls(&self) { - let mut inner = self.inner.lock().unwrap(); - let is_shutdown = inner.is_shutdown; - for (_delay, callback, data) in inner.calls.drain(..) { - if !is_shutdown { - callback.unwrap()(data, RustTaskCallbackCode::Success); - } else { - callback.unwrap()(data, RustTaskCallbackCode::Cancelled); - } - } - } - - /// Shutdown the eventloop, causing scheduled calls and future calls to be cancelled - pub fn shutdown(&self) { - self.inner.lock().unwrap().is_shutdown = true; - } - } - - // `ForeignExecutorCallback` that we install for testing - extern "C" fn mock_executor_callback( - handle: Handle, - delay: u32, - task: Option, - task_data: *const (), - ) -> i8 { - let eventloop = >::get_arc(handle); - let mut inner = eventloop.inner.lock().unwrap(); - if inner.is_shutdown { - ForeignExecutorCallbackResult::Cancelled as i8 - } else { - inner.calls.push((delay, task, task_data)); - ForeignExecutorCallbackResult::Success as i8 - } - } - - #[test] - fn test_schedule_raw() { - extern "C" fn callback(data: *const (), _status_code: RustTaskCallbackCode) { - unsafe { - *(data as *mut u32) += 1; - } - } - - let eventloop = MockEventLoop::new(); - - let value: u32 = 0; - assert_eq!(eventloop.call_count(), 0); - - schedule_raw( - eventloop.new_handle(), - 0, - callback, - &value as *const u32 as *const (), - ); - assert_eq!(eventloop.call_count(), 1); - assert_eq!(value, 0); - - eventloop.run_all_calls(); - assert_eq!(eventloop.call_count(), 0); - assert_eq!(value, 1); - } - - #[test] - fn test_schedule() { - let eventloop = MockEventLoop::new(); - let executor = eventloop.new_executor(); - let value = Arc::new(AtomicU32::new(0)); - assert_eq!(eventloop.call_count(), 0); - - let value2 = value.clone(); - executor.schedule(0, move || { - value2.fetch_add(1, Ordering::Relaxed); - }); - assert_eq!(eventloop.call_count(), 1); - assert_eq!(value.load(Ordering::Relaxed), 0); - - eventloop.run_all_calls(); - assert_eq!(eventloop.call_count(), 0); - assert_eq!(value.load(Ordering::Relaxed), 1); - } - - #[derive(Default)] - struct MockWaker { - wake_count: AtomicU32, - } - - impl Wake for MockWaker { - fn wake(self: Arc) { - self.wake_count.fetch_add(1, Ordering::Relaxed); - } - } - - #[test] - fn test_run() { - let eventloop = MockEventLoop::new(); - let executor = eventloop.new_executor(); - let mock_waker = Arc::new(MockWaker::default()); - let waker = Waker::from(mock_waker.clone()); - let mut context = Context::from_waker(&waker); - assert_eq!(eventloop.call_count(), 0); - - let mut future = executor.run(0, move || "test-return-value"); - unsafe { - assert_eq!( - Pin::new_unchecked(&mut future).poll(&mut context), - Poll::Pending - ); - } - assert_eq!(eventloop.call_count(), 1); - assert_eq!(mock_waker.wake_count.load(Ordering::Relaxed), 0); - - eventloop.run_all_calls(); - assert_eq!(eventloop.call_count(), 0); - assert_eq!(mock_waker.wake_count.load(Ordering::Relaxed), 1); - unsafe { - assert_eq!( - Pin::new_unchecked(&mut future).poll(&mut context), - Poll::Ready("test-return-value") - ); - } - } - - #[test] - fn test_drop() { - let eventloop = MockEventLoop::new(); - let executor = eventloop.new_executor(); - - drop(executor); - // Calling drop should schedule a call with null task data. - assert_eq!(eventloop.call_count(), 1); - assert_eq!(eventloop.last_call().1, None); - } - - // Test that cancelled calls never run - #[test] - fn test_cancelled_call() { - let eventloop = MockEventLoop::new(); - let executor = eventloop.new_executor(); - // Create a shared counter - let counter = Arc::new(AtomicU32::new(0)); - // schedule increments using both `schedule()` and run()` - let counter_clone = Arc::clone(&counter); - executor.schedule(0, move || { - counter_clone.fetch_add(1, Ordering::Relaxed); - }); - let counter_clone = Arc::clone(&counter); - let future = executor.run(0, move || { - counter_clone.fetch_add(1, Ordering::Relaxed); - }); - // shutdown the eventloop before the scheduled call gets a chance to run. - eventloop.shutdown(); - // `run_all_calls()` will cause the scheduled task callbacks to run, but will pass - // `RustTaskCallbackCode::Cancelled` to it. This drop the scheduled closure without executing - // it. - eventloop.run_all_calls(); - - assert_eq!(counter.load(Ordering::Relaxed), 0); - drop(future); - } - - // Test that when scheduled calls are cancelled, the closures are dropped properly - #[test] - fn test_cancellation_drops_closures() { - let eventloop = MockEventLoop::new(); - let executor = eventloop.new_executor(); - - // Create an Arc<> that we will move into the closures to test if they are dropped or not - let arc = Arc::new(0); - let arc_clone = Arc::clone(&arc); - executor.schedule(0, move || assert_eq!(*arc_clone, 0)); - let arc_clone = Arc::clone(&arc); - let future = executor.run(0, move || assert_eq!(*arc_clone, 0)); - - // shutdown the eventloop and run the (cancelled) scheduled calls. - eventloop.shutdown(); - eventloop.run_all_calls(); - // try to schedule some more calls now that the loop has been shutdown - let arc_clone = Arc::clone(&arc); - executor.schedule(0, move || assert_eq!(*arc_clone, 0)); - let arc_clone = Arc::clone(&arc); - let future2 = executor.run(0, move || assert_eq!(*arc_clone, 0)); - - // Drop the futures so they don't hold on to any references - drop(future); - drop(future2); - - // All of these closures should have been dropped by now, there only remaining arc - // reference should be the original - assert_eq!(Arc::strong_count(&arc), 1); - } -} diff --git a/uniffi_core/src/ffi/mod.rs b/uniffi_core/src/ffi/mod.rs index f7176671bf..8e26be37b0 100644 --- a/uniffi_core/src/ffi/mod.rs +++ b/uniffi_core/src/ffi/mod.rs @@ -8,7 +8,6 @@ pub mod callbackinterface; pub mod ffidefault; pub mod foreignbytes; pub mod foreigncallbacks; -pub mod foreignexecutor; pub mod handle; pub mod rustbuffer; pub mod rustcalls; @@ -18,7 +17,6 @@ pub use callbackinterface::*; pub use ffidefault::FfiDefault; pub use foreignbytes::*; pub use foreigncallbacks::*; -pub use foreignexecutor::*; pub use handle::*; pub use rustbuffer::*; pub use rustcalls::*; diff --git a/uniffi_core/src/ffi/rustfuture/future.rs b/uniffi_core/src/ffi/rustfuture/future.rs index 4e9dc6ca10..bf04954fa9 100644 --- a/uniffi_core/src/ffi/rustfuture/future.rs +++ b/uniffi_core/src/ffi/rustfuture/future.rs @@ -16,11 +16,15 @@ //! 2a. In a loop: //! - Call [rust_future_poll] //! - Suspend the function until the [rust_future_poll] continuation function is called -//! - If the continuation was function was called with [RustFuturePoll::Ready], then break +//! - If the continuation was function was called with [super::RustFuturePoll::Ready], then break //! otherwise continue. //! 2b. If the async function is cancelled, then call [rust_future_cancel]. This causes the -//! continuation function to be called with [RustFuturePoll::Ready] and the [RustFuture] to +//! continuation function to be called with [super::RustFuturePoll::Ready] and the [RustFuture] to //! enter a cancelled state. +//! 2c. If the Rust code wants schedule work to be run in a `BlockingTaskQueue`, then the +//! continuation is called with [super::RustFuturePoll::MaybeReady] and the blocking task queue handle. +//! The foreign code is responsible for ensuring the next [rust_future_poll] call happens in +//! that blocking task queue and the handle is passed to [rust_future_poll]. //! 3. Call [rust_future_complete] to get the result of the future. //! 4. Call [rust_future_free] to free the future, ideally in a finally block. This: //! - Releases any resources held by the future @@ -85,7 +89,7 @@ use std::{ task::{Context, Poll, Wake}, }; -use super::{RustFutureContinuationCallback, RustFuturePoll, Scheduler}; +use super::{RustFutureContinuationCallback, Scheduler}; use crate::{ derive_ffi_traits, rust_call_with_out_status, FfiDefault, Handle, LowerReturn, RustCallStatus, }; @@ -204,7 +208,7 @@ where // This Mutex should never block if our code is working correctly, since there should not be // multiple threads calling [Self::poll] and/or [Self::complete] at the same time. future: Mutex>, - continuation_data: Mutex, + scheduler: Mutex, // UT is used as the generic parameter for [LowerReturn]. // Let's model this with PhantomData as a function that inputs a UT value. _phantom: PhantomData ()>, @@ -220,34 +224,39 @@ where pub(super) fn new(future: F, _tag: UT) -> Arc { Arc::new(Self { future: Mutex::new(WrappedFuture::new(future)), - continuation_data: Mutex::new(Scheduler::new()), + scheduler: Mutex::new(Scheduler::new()), _phantom: PhantomData, }) } - pub(super) fn poll(self: Arc, callback: RustFutureContinuationCallback, data: Handle) { - let ready = self.is_cancelled() || { + pub(super) fn poll( + self: Arc, + callback: RustFutureContinuationCallback, + data: Handle, + blocking_task_queue: Option, + ) { + let mut ready = self + .scheduler + .lock() + .unwrap() + .on_poll_start(blocking_task_queue); + if !ready { let mut locked = self.future.lock().unwrap(); let waker: std::task::Waker = Arc::clone(&self).into(); - locked.poll(&mut Context::from_waker(&waker)) + ready = locked.poll(&mut Context::from_waker(&waker)) }; - if ready { - callback(data, RustFuturePoll::Ready) - } else { - self.continuation_data.lock().unwrap().store(callback, data); - } - } - - pub(super) fn is_cancelled(&self) -> bool { - self.continuation_data.lock().unwrap().is_cancelled() + self.scheduler + .lock() + .unwrap() + .on_poll_end(callback, data, ready); } pub(super) fn wake(&self) { - self.continuation_data.lock().unwrap().wake(); + self.scheduler.lock().unwrap().wake(); } pub(super) fn cancel(&self) { - self.continuation_data.lock().unwrap().cancel(); + self.scheduler.lock().unwrap().cancel(); } pub(super) fn complete(&self, call_status: &mut RustCallStatus) -> T::ReturnType { @@ -256,7 +265,7 @@ where pub(super) fn free(self: Arc) { // Call cancel() to send any leftover data to the continuation callback - self.continuation_data.lock().unwrap().cancel(); + self.scheduler.lock().unwrap().cancel(); // Ensure we drop our inner future, releasing all held references self.future.lock().unwrap().free(); } @@ -291,7 +300,12 @@ where /// only create those functions for each of the 13 possible FFI return types. #[doc(hidden)] pub trait RustFutureFfi: Send + Sync { - fn ffi_poll(self: Arc, callback: RustFutureContinuationCallback, data: Handle); + fn ffi_poll( + self: Arc, + callback: RustFutureContinuationCallback, + data: Handle, + blocking_task_queue: Option, + ); fn ffi_cancel(&self); fn ffi_complete(&self, call_status: &mut RustCallStatus) -> ReturnType; fn ffi_free(self: Arc); @@ -304,8 +318,13 @@ where T: LowerReturn + Send + 'static, UT: Send + 'static, { - fn ffi_poll(self: Arc, callback: RustFutureContinuationCallback, data: Handle) { - self.poll(callback, data) + fn ffi_poll( + self: Arc, + callback: RustFutureContinuationCallback, + data: Handle, + blocking_task_queue: Option, + ) { + self.poll(callback, data, blocking_task_queue) } fn ffi_cancel(&self) { diff --git a/uniffi_core/src/ffi/rustfuture/mod.rs b/uniffi_core/src/ffi/rustfuture/mod.rs index 17630db05b..f73d3aa559 100644 --- a/uniffi_core/src/ffi/rustfuture/mod.rs +++ b/uniffi_core/src/ffi/rustfuture/mod.rs @@ -14,6 +14,8 @@ mod tests; use crate::{Handle, HandleAlloc, LowerReturn, RustCallStatus}; +pub use scheduler::BlockingTaskQueue; + /// Result code for [rust_future_poll]. This is passed to the continuation function. #[repr(i8)] #[derive(Debug, PartialEq, Eq)] @@ -28,7 +30,13 @@ pub enum RustFuturePoll { /// /// The Rust side of things calls this when the foreign side should call [rust_future_poll] again /// to continue progress on the future. -pub type RustFutureContinuationCallback = extern "C" fn(callback_data: Handle, RustFuturePoll); +/// +/// * `data` is the handle that the foreign code passed to `poll()` +/// * `poll_result` is the result of the poll +/// * `blocking_task_task_queue` is the blocking task queue handle to run the next `poll()` on or 0 if it +/// doesn't need to run in a blocking task queue. +pub type RustFutureContinuationCallback = + extern "C" fn(data: Handle, poll_result: RustFuturePoll, blocking_task_task_queue: u64); // === Public FFI API === @@ -58,17 +66,25 @@ where /// a [RustFuturePoll] value. For each [rust_future_poll] call the continuation will be called /// exactly once. /// +/// If this poll is running on a blocking task queue, then `blocking_task_queue` is the handle for it. +/// Otherwise `blocking_task_queue` is 0. +/// /// # Safety /// /// The [Handle] must not previously have been passed to [rust_future_free] pub unsafe fn rust_future_poll( - handle: Handle, + future: Handle, callback: RustFutureContinuationCallback, data: Handle, + blocking_task_queue: u64, ) where dyn RustFutureFfi: HandleAlloc, { - as HandleAlloc>::get_arc(handle).ffi_poll(callback, data) + as HandleAlloc>::get_arc(future).ffi_poll( + callback, + data, + Handle::from_raw(blocking_task_queue), + ) } /// Cancel a Rust future @@ -81,11 +97,11 @@ pub unsafe fn rust_future_poll( /// # Safety /// /// The [Handle] must not previously have been passed to [rust_future_free] -pub unsafe fn rust_future_cancel(handle: Handle) +pub unsafe fn rust_future_cancel(future: Handle) where dyn RustFutureFfi: HandleAlloc, { - as HandleAlloc>::get_arc(handle).ffi_cancel() + as HandleAlloc>::get_arc(future).ffi_cancel() } /// Complete a Rust future @@ -99,13 +115,13 @@ where /// - The `T` param must correctly correspond to the [rust_future_new] call. It must /// be `>::ReturnType` pub unsafe fn rust_future_complete( - handle: Handle, + future: Handle, out_status: &mut RustCallStatus, ) -> ReturnType where dyn RustFutureFfi: HandleAlloc, { - as HandleAlloc>::get_arc(handle).ffi_complete(out_status) + as HandleAlloc>::get_arc(future).ffi_complete(out_status) } /// Free a Rust future, dropping the strong reference and releasing all references held by the @@ -114,9 +130,9 @@ where /// # Safety /// /// The [Handle] must not previously have been passed to [rust_future_free] -pub unsafe fn rust_future_free(handle: Handle) +pub unsafe fn rust_future_free(future: Handle) where dyn RustFutureFfi: HandleAlloc, { - as HandleAlloc>::consume_handle(handle).ffi_free() + as HandleAlloc>::consume_handle(future).ffi_free() } diff --git a/uniffi_core/src/ffi/rustfuture/scheduler.rs b/uniffi_core/src/ffi/rustfuture/scheduler.rs index f2d5dc097f..34951fe45d 100644 --- a/uniffi_core/src/ffi/rustfuture/scheduler.rs +++ b/uniffi_core/src/ffi/rustfuture/scheduler.rs @@ -2,11 +2,72 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -use std::mem; +use std::{cell::RefCell, future::poll_fn, mem, task::Poll, thread_local}; use super::{RustFutureContinuationCallback, RustFuturePoll}; use crate::Handle; +/// Context of the current `RustFuture::poll` call +struct RustFutureContext { + /// Blocking task queue that the future is being polled on + current_blocking_task_queue: Option, + /// Blocking task queue that we've been asked to schedule the next poll on + scheduled_blocking_task_queue: Option, +} + +thread_local! { + static CONTEXT: RefCell = RefCell::new(RustFutureContext { + current_blocking_task_queue: None, + scheduled_blocking_task_queue: None, + }); +} + +fn with_context R, R>(operation: F) -> R { + CONTEXT.with(|context| operation(&mut *context.borrow_mut())) +} + +/// Schedule work in a blocking task queue +/// +/// The returned future will attempt to arrange for [RustFuture::poll] to be called in the +/// blocking task queue. Once [RustFuture::poll] is running in the blocking task queue, then the future +/// will be ready. +/// +/// There's one tricky issue here: how can we ensure that when the top-level task is run in the +/// blocking task queue, this future will be polled? What happens this future is a child of `join!`, +/// `FuturesUnordered` or some other Future that handles its own polling? +/// +/// We start with an assumption: if we notify the waker then this future will be polled when the +/// top-level task is polled next. If a future does not honor this then we consider it a broken +/// future. This seems fair, since that future would almost certainly break a lot of other future +/// code. +/// +/// Based on that, we can have a simple system. When we're polled: +/// * If we're running in the blocking task queue, then we return `Poll::Ready`. +/// * If not, we return `Poll::Pending` and notify the waker so that the future polls again on +/// the next top-level poll. +/// +/// Note that this can be inefficient if the code awaits multiple blocking task queues at once. We +/// can only run the next poll on one of them, but all futures will be woken up. This seems okay +/// for our intended use cases, it would be pretty odd for a library to use multiple blocking task +/// queues. The alternative would be to store the set of all pending blocking task queues, which +/// seems like complete overkill for our purposes. +pub(super) async fn schedule_in_blocking_task_queue(blocking_task_queue: Handle) { + poll_fn(|future_context| { + with_context(|poll_context| { + if poll_context.current_blocking_task_queue == Some(blocking_task_queue) { + Poll::Ready(()) + } else { + poll_context + .scheduled_blocking_task_queue + .get_or_insert(blocking_task_queue); + future_context.waker().wake_by_ref(); + Poll::Pending + } + }) + }) + .await +} + /// Schedules a [crate::RustFuture] by managing the continuation data /// /// This struct manages the continuation callback and data that comes from the foreign side. It @@ -26,7 +87,7 @@ pub(super) enum Scheduler { Empty, /// `wake()` was called when there was no continuation set. The next time `store` is called, /// the continuation should be immediately invoked with `RustFuturePoll::MaybeReady` - Waked, + ShouldWake, /// The future has been cancelled, any future `store` calls should immediately result in the /// continuation being called with `RustFuturePoll::Ready`. Cancelled, @@ -39,26 +100,85 @@ impl Scheduler { Self::Empty } - /// Store new continuation data if we are in the `Empty` state. If we are in the `Waked` or - /// `Cancelled` state, call the continuation immediately with the data. - pub(super) fn store(&mut self, callback: RustFutureContinuationCallback, data: Handle) { + /// Called at the start of a `RustFuture::poll()` call. + /// + /// Returns true if the future is already ready and can skip the `poll()` call (for example, + /// because we're cancelled). + pub(super) fn on_poll_start(&mut self, current_blocking_task_queue: Option) -> bool { + // Reset the context + with_context(|context| { + *context = RustFutureContext { + current_blocking_task_queue, + scheduled_blocking_task_queue: None, + } + }); match self { - Self::Empty => *self = Self::Set(callback, data), - Self::Set(old_callback, old_data) => { + Self::Cancelled => true, + Self::Set(callback, data) => { + // Somehow we're in poll(), but never called our callback. Something is wrong, but + // let's call the callback anyways. That way at least we're keeping our side of + // the contract. log::error!( - "store: observed `Self::Set` state. Is poll() being called from multiple threads at once?" + "Scheduler::on_poll_start: observed `Self::Set` state. Is poll() being called from multiple threads at once?" ); - old_callback(*old_data, RustFuturePoll::Ready); - *self = Self::Set(callback, data); + callback(*data, RustFuturePoll::Ready, 0); + *self = Self::Empty; + false } - Self::Waked => { + Self::ShouldWake => { + // We're about to poll now, so we can unset `ShouldWake` *self = Self::Empty; - callback(data, RustFuturePoll::MaybeReady); + false } - Self::Cancelled => { - callback(data, RustFuturePoll::Ready); + Self::Empty => false, + } + } + + /// Called at the end of a `RustFuture::poll()` call. + /// + /// We will either store the continuation callback or call it immediately. + pub(super) fn on_poll_end( + &mut self, + callback: RustFutureContinuationCallback, + data: Handle, + ready: bool, + ) { + let scheduled_blocking_task_queue = + with_context(|context| context.scheduled_blocking_task_queue); + if ready || self.is_cancelled() { + // The future is ready now, call the callback + callback(data, RustFuturePoll::Ready, 0); + } else if let Some(handle) = scheduled_blocking_task_queue { + // We were asked to schedule the future in a blocking task queue, call the callback + callback(data, RustFuturePoll::MaybeReady, handle.as_raw()); + } else { + match self { + // [Self::wake] was called during the poll + Self::ShouldWake => { + *self = Self::Empty; + callback(data, RustFuturePoll::MaybeReady, 0); + } + // Nothing special happened, store the callback to prepare for the next [Self::wake] + Self::Empty => *self = Self::Set(callback, data), + // This should never happen + Self::Set(old_callback, old_data) => { + log::error!( + "on_poll_end: observed `Self::Set` state which should never happen" + ); + old_callback(*old_data, RustFuturePoll::Ready, 0); + *self = Self::Set(callback, data); + } + // We checked [Self::is_cancelled] above. + Self::Cancelled => unreachable!(), } } + // Reset the context + with_context(|context| { + *context = RustFutureContext { + current_blocking_task_queue: None, + scheduled_blocking_task_queue: None, + } + }); } pub(super) fn wake(&mut self) { @@ -68,11 +188,11 @@ impl Scheduler { let old_data = *old_data; let callback = *callback; *self = Self::Empty; - callback(old_data, RustFuturePoll::MaybeReady); + callback(old_data, RustFuturePoll::MaybeReady, 0); } // If we were in the `Empty` state, then transition to `Waked`. The next time `store` // is called, we will immediately call the continuation. - Self::Empty => *self = Self::Waked, + Self::Empty => *self = Self::ShouldWake, // This is a no-op if we were in the `Cancelled` or `Waked` state. _ => (), } @@ -80,18 +200,42 @@ impl Scheduler { pub(super) fn cancel(&mut self) { if let Self::Set(callback, old_data) = mem::replace(self, Self::Cancelled) { - callback(old_data, RustFuturePoll::Ready); + callback(old_data, RustFuturePoll::Ready, 0); } } - pub(super) fn is_cancelled(&self) -> bool { matches!(self, Self::Cancelled) } } -// The `Handle` data pointer references an object on the foreign side. -// This object must be `Sync` in Rust terminology -- it must be safe for us to pass the pointer to the continuation callback from any thread. -// If the foreign side upholds their side of the contract, then `Scheduler` is Send + Sync. +/// Represents a foreign-managed blocking task queue that we can use to schedule futures in +/// +/// On the foreign side this is a Kotlin `CoroutineContext`, Python `Executor` or Swift +/// `DispatchQueue`. Foreign code can pass one of those objects into exported Rust functions and +/// UniFFI will create a `Handle` to represent it. The Rust code on the other side gets a +/// `BlockingTaskQueue` instance. +/// +/// Rust async code can call [BlockingTaskQueue::run_blocking] to run a closure in that +/// blocking task queue. Use this for functions with blocking operations that should not be executed +/// in a normal async context. Some examples are non-async file/network operations, long-running +/// CPU-bound tasks, blocking database operations, etc. +pub struct BlockingTaskQueue(Handle); + +impl BlockingTaskQueue { + pub fn new(handle: Handle) -> Self { + Self(handle) + } -unsafe impl Send for Scheduler {} -unsafe impl Sync for Scheduler {} + pub fn into_handle(self) -> Handle { + self.0 + } + + /// Run a closure in a blocking task queue + pub async fn run_blocking(&self, f: F) -> R + where + F: FnOnce() -> R, + { + schedule_in_blocking_task_queue(self.0).await; + f() + } +} diff --git a/uniffi_core/src/ffi/rustfuture/tests.rs b/uniffi_core/src/ffi/rustfuture/tests.rs index 475a8360e8..67c36d0bd8 100644 --- a/uniffi_core/src/ffi/rustfuture/tests.rs +++ b/uniffi_core/src/ffi/rustfuture/tests.rs @@ -65,17 +65,42 @@ fn channel() -> (Sender, Arc>) { } /// Poll a Rust future and get an OnceCell that's set when the continuation is called -fn poll(rust_future: &Arc>) -> Arc> { +fn poll(rust_future: &Arc>) -> Arc> { let cell = Arc::new(OnceCell::new()); - let handle = - as HandleAlloc>::new_handle(Arc::clone(&cell)); - rust_future.clone().ffi_poll(poll_continuation, handle); + let handle = as HandleAlloc>::new_handle( + Arc::clone(&cell), + ); + rust_future + .clone() + .ffi_poll(poll_continuation, handle, None); + cell +} + +/// Like poll, but simulate `poll()` being called from a blocking task queue +fn poll_from_blocking_task_queue( + rust_future: &Arc>, + blocking_task_queue: Handle, +) -> Arc> { + let cell = Arc::new(OnceCell::new()); + let handle = as HandleAlloc>::new_handle( + Arc::clone(&cell), + ); + rust_future + .clone() + .ffi_poll(poll_continuation, handle, Some(blocking_task_queue)); cell } -extern "C" fn poll_continuation(handle: Handle, code: RustFuturePoll) { - let cell = as HandleAlloc>::get_arc(handle); - cell.set(code).expect("Error setting OnceCell"); +extern "C" fn poll_continuation( + continuation_data: Handle, + code: RustFuturePoll, + blocking_task_queue: u64, +) { + let cell = as HandleAlloc>::get_arc( + continuation_data, + ); + cell.set((code, blocking_task_queue)) + .expect("Error setting OnceCell"); } fn complete(rust_future: Arc>) -> (RustBuffer, RustCallStatus) { @@ -84,25 +109,44 @@ fn complete(rust_future: Arc>) -> (RustBuffer, Rus (return_value, out_status_code) } +fn check_continuation_not_called(once_cell: &OnceCell<(RustFuturePoll, u64)>) { + assert_eq!(once_cell.get(), None); +} + +fn check_continuation_called( + once_cell: &OnceCell<(RustFuturePoll, u64)>, + poll_result: RustFuturePoll, +) { + assert_eq!(once_cell.get(), Some(&(poll_result, 0))); +} + +fn check_continuation_called_with_blocking_task_queue_handle( + once_cell: &OnceCell<(RustFuturePoll, u64)>, + poll_result: RustFuturePoll, + handle: Handle, +) { + assert_eq!(once_cell.get(), Some(&(poll_result, handle.as_raw()))); +} + #[test] fn test_success() { let (sender, rust_future) = channel(); // Test polling the rust future before it's ready let continuation_result = poll(&rust_future); - assert_eq!(continuation_result.get(), None); + check_continuation_not_called(&continuation_result); sender.wake(); - assert_eq!(continuation_result.get(), Some(&RustFuturePoll::MaybeReady)); + check_continuation_called(&continuation_result, RustFuturePoll::MaybeReady); // Test polling the rust future when it's ready let continuation_result = poll(&rust_future); - assert_eq!(continuation_result.get(), None); + check_continuation_not_called(&continuation_result); sender.send(Ok("All done".into())); - assert_eq!(continuation_result.get(), Some(&RustFuturePoll::MaybeReady)); + check_continuation_called(&continuation_result, RustFuturePoll::MaybeReady); // Future polls should immediately return ready let continuation_result = poll(&rust_future); - assert_eq!(continuation_result.get(), Some(&RustFuturePoll::Ready)); + check_continuation_called(&continuation_result, RustFuturePoll::Ready); // Complete the future let (return_buf, call_status) = complete(rust_future); @@ -118,12 +162,12 @@ fn test_error() { let (sender, rust_future) = channel(); let continuation_result = poll(&rust_future); - assert_eq!(continuation_result.get(), None); + check_continuation_not_called(&continuation_result); sender.send(Err("Something went wrong".into())); - assert_eq!(continuation_result.get(), Some(&RustFuturePoll::MaybeReady)); + check_continuation_called(&continuation_result, RustFuturePoll::MaybeReady); let continuation_result = poll(&rust_future); - assert_eq!(continuation_result.get(), Some(&RustFuturePoll::Ready)); + check_continuation_called(&continuation_result, RustFuturePoll::Ready); let (_, call_status) = complete(rust_future); assert_eq!(call_status.code, RustCallStatusCode::Error); @@ -145,14 +189,14 @@ fn test_cancel() { let (_sender, rust_future) = channel(); let continuation_result = poll(&rust_future); - assert_eq!(continuation_result.get(), None); + check_continuation_not_called(&continuation_result); rust_future.ffi_cancel(); // Cancellation should immediately invoke the callback with RustFuturePoll::Ready - assert_eq!(continuation_result.get(), Some(&RustFuturePoll::Ready)); + check_continuation_called(&continuation_result, RustFuturePoll::Ready); // Future polls should immediately invoke the callback with RustFuturePoll::Ready let continuation_result = poll(&rust_future); - assert_eq!(continuation_result.get(), Some(&RustFuturePoll::Ready)); + check_continuation_called(&continuation_result, RustFuturePoll::Ready); let (_, call_status) = complete(rust_future); assert_eq!(call_status.code, RustCallStatusCode::Cancelled); @@ -188,7 +232,7 @@ fn test_complete_with_stored_continuation() { let continuation_result = poll(&rust_future); rust_future.ffi_free(); - assert_eq!(continuation_result.get(), Some(&RustFuturePoll::Ready)); + check_continuation_called(&continuation_result, RustFuturePoll::Ready); } // Test what happens if we see a `wake()` call while we're polling the future. This can @@ -211,10 +255,47 @@ fn test_wake_during_poll() { let rust_future: Arc> = RustFuture::new(future, crate::UniFfiTag); let continuation_result = poll(&rust_future); // The continuation function should called immediately - assert_eq!(continuation_result.get(), Some(&RustFuturePoll::MaybeReady)); + check_continuation_called(&continuation_result, RustFuturePoll::MaybeReady); // A second poll should finish the future let continuation_result = poll(&rust_future); - assert_eq!(continuation_result.get(), Some(&RustFuturePoll::Ready)); + check_continuation_called(&continuation_result, RustFuturePoll::Ready); + let (return_buf, call_status) = complete(rust_future); + assert_eq!(call_status.code, RustCallStatusCode::Success); + assert_eq!( + >::try_lift(return_buf).unwrap(), + "All done" + ); +} + +#[test] +fn test_blocking_task() { + let blocking_task_queue_handle = Handle::from_raw_unchecked(1001); + let future = async move { + schedule_in_blocking_task_queue(blocking_task_queue_handle).await; + "All done".to_owned() + }; + let rust_future: Arc> = RustFuture::new(future, crate::UniFfiTag); + // On the first poll, the future should not be ready and it should ask to be scheduled in the + // blocking task queue + let continuation_result = poll(&rust_future); + check_continuation_called_with_blocking_task_queue_handle( + &continuation_result, + RustFuturePoll::MaybeReady, + blocking_task_queue_handle, + ); + // If we poll it again not in a blocking task queue, then we get the same result + let continuation_result = poll(&rust_future); + check_continuation_called_with_blocking_task_queue_handle( + &continuation_result, + RustFuturePoll::MaybeReady, + blocking_task_queue_handle, + ); + // When we poll it in the blocking task queue, then the future is ready + let continuation_result = + poll_from_blocking_task_queue(&rust_future, blocking_task_queue_handle); + check_continuation_called(&continuation_result, RustFuturePoll::Ready); + + // Complete the future let (return_buf, call_status) = complete(rust_future); assert_eq!(call_status.code, RustCallStatusCode::Success); assert_eq!( diff --git a/uniffi_core/src/ffi_converter_impls.rs b/uniffi_core/src/ffi_converter_impls.rs index 2460f87cb0..d3d8dd112c 100644 --- a/uniffi_core/src/ffi_converter_impls.rs +++ b/uniffi_core/src/ffi_converter_impls.rs @@ -23,7 +23,7 @@ /// "UT" means an abitrary `UniFfiTag` type. use crate::{ check_remaining, derive_ffi_traits, ffi_converter_rust_buffer_lift_and_lower, metadata, - ConvertError, FfiConverter, ForeignExecutor, Handle, Lift, LiftReturn, Lower, LowerReturn, + BlockingTaskQueue, ConvertError, FfiConverter, Handle, Lift, LiftReturn, Lower, LowerReturn, MetadataBuffer, Result, RustBuffer, UnexpectedUniFFICallbackError, }; use anyhow::bail; @@ -405,25 +405,21 @@ where .concat(V::TYPE_ID_META); } -/// FFI support for [ForeignExecutor] -/// -/// These are passed over the FFI as opaque pointer-sized types representing the foreign executor. -/// The foreign bindings may use an actual pointer to the executor object, or a usized integer -/// handle. -unsafe impl FfiConverter for ForeignExecutor { +/// FFI support for [BlockingTaskQueue] +unsafe impl FfiConverter for BlockingTaskQueue { type FfiType = Handle; // Passing these back to the foreign bindings is currently not supported - fn lower(executor: Self) -> Self::FfiType { - executor.handle + fn lower(blocking_task_queue: Self) -> Self::FfiType { + blocking_task_queue.into_handle() } - fn write(executor: Self, buf: &mut Vec) { - buf.put_u64(executor.handle.as_raw()) + fn write(blocking_task_queue: Self, buf: &mut Vec) { + buf.put_u64(blocking_task_queue.into_handle().as_raw()) } - fn try_lift(executor: Self::FfiType) -> Result { - Ok(ForeignExecutor::new(executor)) + fn try_lift(handle: Self::FfiType) -> Result { + Ok(BlockingTaskQueue::new(handle)) } fn try_read(buf: &mut &[u8]) -> Result { @@ -431,7 +427,7 @@ unsafe impl FfiConverter for ForeignExecutor { } const TYPE_ID_META: MetadataBuffer = - MetadataBuffer::from_code(metadata::codes::TYPE_FOREIGN_EXECUTOR); + MetadataBuffer::from_code(metadata::codes::TYPE_BLOCKING_TASK_QUEUE); } derive_ffi_traits!(blanket u8); @@ -448,7 +444,7 @@ derive_ffi_traits!(blanket bool); derive_ffi_traits!(blanket String); derive_ffi_traits!(blanket Duration); derive_ffi_traits!(blanket SystemTime); -derive_ffi_traits!(blanket ForeignExecutor); +derive_ffi_traits!(blanket BlockingTaskQueue); // For composite types, derive LowerReturn, LiftReturn, etc, from Lift/Lower. // diff --git a/uniffi_core/src/metadata.rs b/uniffi_core/src/metadata.rs index 770d2b36d5..d69c0449da 100644 --- a/uniffi_core/src/metadata.rs +++ b/uniffi_core/src/metadata.rs @@ -67,7 +67,7 @@ pub mod codes { pub const TYPE_CUSTOM: u8 = 22; pub const TYPE_RESULT: u8 = 23; pub const TYPE_FUTURE: u8 = 24; - pub const TYPE_FOREIGN_EXECUTOR: u8 = 25; + pub const TYPE_BLOCKING_TASK_QUEUE: u8 = 25; pub const TYPE_UNIT: u8 = 255; // Literal codes for LiteralMetadata - note that we don't support diff --git a/uniffi_macros/src/setup_scaffolding.rs b/uniffi_macros/src/setup_scaffolding.rs index fbce21a1e9..ee94c67df9 100644 --- a/uniffi_macros/src/setup_scaffolding.rs +++ b/uniffi_macros/src/setup_scaffolding.rs @@ -20,8 +20,6 @@ pub fn setup_scaffolding(namespace: String) -> Result { let ffi_rustbuffer_free_ident = format_ident!("ffi_{module_path}_rustbuffer_free"); let ffi_rustbuffer_reserve_ident = format_ident!("ffi_{module_path}_rustbuffer_reserve"); let reexport_hack_ident = format_ident!("{module_path}_uniffi_reexport_hack"); - let ffi_foreign_executor_callback_set_ident = - format_ident!("ffi_{module_path}_foreign_executor_callback_set"); let ffi_rust_future_scaffolding_fns = rust_future_scaffolding_fns(&module_path); Ok(quote! { @@ -87,13 +85,6 @@ pub fn setup_scaffolding(namespace: String) -> Result { uniffi::ffi::uniffi_rustbuffer_reserve(buf, additional, call_status) } - #[allow(clippy::missing_safety_doc, missing_docs)] - #[doc(hidden)] - #[no_mangle] - pub extern "C" fn #ffi_foreign_executor_callback_set_ident(callback: uniffi::ffi::ForeignExecutorCallback) { - uniffi::ffi::foreign_executor_callback_set(callback) - } - #ffi_rust_future_scaffolding_fns // Code to re-export the UniFFI scaffolding functions. @@ -175,8 +166,8 @@ fn rust_future_scaffolding_fns(module_path: &str) -> TokenStream { #[allow(clippy::missing_safety_doc, missing_docs)] #[doc(hidden)] #[no_mangle] - pub unsafe extern "C" fn #ffi_rust_future_poll(handle: ::uniffi::Handle, callback: ::uniffi::RustFutureContinuationCallback, data: ::uniffi::Handle) { - ::uniffi::ffi::rust_future_poll::<#return_type, crate::UniFfiTag>(handle, callback, data); + pub unsafe extern "C" fn #ffi_rust_future_poll(handle: ::uniffi::Handle, callback: ::uniffi::RustFutureContinuationCallback, data: ::uniffi::Handle, blocking_task_queue: u64) { + ::uniffi::ffi::rust_future_poll::<#return_type, crate::UniFfiTag>(handle, callback, data, blocking_task_queue); } #[allow(clippy::missing_safety_doc, missing_docs)] diff --git a/uniffi_meta/src/metadata.rs b/uniffi_meta/src/metadata.rs index 6e490a4866..a5a04bfb16 100644 --- a/uniffi_meta/src/metadata.rs +++ b/uniffi_meta/src/metadata.rs @@ -50,7 +50,7 @@ pub mod codes { pub const TYPE_CUSTOM: u8 = 22; pub const TYPE_RESULT: u8 = 23; //pub const TYPE_FUTURE: u8 = 24; - pub const TYPE_FOREIGN_EXECUTOR: u8 = 25; + pub const TYPE_BLOCKING_TASK_QUEUE: u8 = 25; pub const TYPE_UNIT: u8 = 255; // Literal codes diff --git a/uniffi_meta/src/reader.rs b/uniffi_meta/src/reader.rs index bf6525f2b5..08a078eb42 100644 --- a/uniffi_meta/src/reader.rs +++ b/uniffi_meta/src/reader.rs @@ -122,7 +122,7 @@ impl<'a> MetadataReader<'a> { codes::TYPE_STRING => Type::String, codes::TYPE_DURATION => Type::Duration, codes::TYPE_SYSTEM_TIME => Type::Timestamp, - codes::TYPE_FOREIGN_EXECUTOR => Type::ForeignExecutor, + codes::TYPE_BLOCKING_TASK_QUEUE => Type::BlockingTaskQueue, codes::TYPE_RECORD => Type::Record { module_path: self.read_string()?, name: self.read_string()?, diff --git a/uniffi_meta/src/types.rs b/uniffi_meta/src/types.rs index 24f8a6f2a8..2e81b0c835 100644 --- a/uniffi_meta/src/types.rs +++ b/uniffi_meta/src/types.rs @@ -85,7 +85,7 @@ pub enum Type { // How the object is implemented. imp: ObjectImpl, }, - ForeignExecutor, + BlockingTaskQueue, // Types defined in the component API, each of which has a string name. Record { module_path: String, diff --git a/uniffi_udl/src/resolver.rs b/uniffi_udl/src/resolver.rs index 14a7a4c6f1..1409c3a6ff 100644 --- a/uniffi_udl/src/resolver.rs +++ b/uniffi_udl/src/resolver.rs @@ -209,7 +209,7 @@ pub(crate) fn resolve_builtin_type(name: &str) -> Option { "f64" => Some(Type::Float64), "timestamp" => Some(Type::Timestamp), "duration" => Some(Type::Duration), - "ForeignExecutor" => Some(Type::ForeignExecutor), + "BlockingTaskQueue" => Some(Type::BlockingTaskQueue), _ => None, } }