diff --git a/Cargo.lock b/Cargo.lock index a3a018f69b..9ed8ce0b0e 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -612,26 +612,53 @@ version = "2.9.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "0845fa252299212f0389d64ba26f34fa32cfe41588355f21ed507c59a0f64541" +[[package]] +name = "futures" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "da0290714b38af9b4a7b094b8a37086d1b4e61f2df9122c3cad2577669145335" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + [[package]] name = "futures-channel" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "955518d47e09b25bbebc7a18df10b81f0c766eaf4c4f1cccef2fca5f2a4fb5f2" +checksum = "ff4dd66668b557604244583e3e1e1eada8c5c2e96a6d0d6653ede395b78bbacb" dependencies = [ "futures-core", + "futures-sink", ] [[package]] name = "futures-core" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4bca583b7e26f571124fe5b7561d49cb2868d79116cfa0eefce955557c6fee8c" +checksum = "eb1d22c66e66d9d72e1758f0bd7d4fd0bee04cad842ee34587d68c07e45d088c" + +[[package]] +name = "futures-executor" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0f4fb8693db0cf099eadcca0efe2a5a22e4550f98ed16aba6c48700da29597bc" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] [[package]] name = "futures-io" -version = "0.3.28" +version = "0.3.29" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4fff74096e71ed47f8e023204cfd0aa1289cd54ae5430a9523be060cdb849964" +checksum = "8bf34a163b5c4c52d0478a4d757da8fb65cabef42ba90515efee0f6f9fa45aaa" [[package]] name = "futures-lite" @@ -648,6 +675,47 @@ dependencies = [ "waker-fn", ] +[[package]] +name = "futures-macro" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "53b153fd91e4b0147f4aced87be237c98248656bb01050b96bf3ee89220a8ddb" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e36d3378ee38c2a36ad710c5d30c2911d752cb941c00c72dbabfb786a7970817" + +[[package]] +name = "futures-task" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "efd193069b0ddadc69c46389b740bbccdd97203899b48d09c5f7969591d6bae2" + +[[package]] +name = "futures-util" +version = "0.3.29" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a19526d624e703a3179b3d322efec918b6246ea0fa51d41124525f00f1cc8104" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + [[package]] name = "generator" version = "0.7.5" @@ -1737,17 +1805,11 @@ dependencies = [ "url", ] -[[package]] -name = "uniffi-fixture-foreign-executor" -version = "0.23.0" -dependencies = [ - "uniffi", -] - [[package]] name = "uniffi-fixture-futures" version = "0.21.0" dependencies = [ + "futures", "once_cell", "thiserror", "tokio", diff --git a/Cargo.toml b/Cargo.toml index 4d919af4a0..c6e51a03bb 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -31,7 +31,6 @@ members = [ "fixtures/ext-types/lib", "fixtures/ext-types/proc-macro-lib", - "fixtures/foreign-executor", "fixtures/keywords/kotlin", "fixtures/keywords/rust", "fixtures/keywords/swift", diff --git a/docs/manual/src/futures.md b/docs/manual/src/futures.md index ba93bae87a..374a902432 100644 --- a/docs/manual/src/futures.md +++ b/docs/manual/src/futures.md @@ -46,4 +46,62 @@ In Rust `Future` terminology this means the foreign bindings supply the "executo There are [some great API docs](https://docs.rs/uniffi_core/latest/uniffi_core/ffi/rustfuture/index.html) on the implementation that are well worth a read. -See the [foreign-executor fixture](https://github.com/mozilla/uniffi-rs/tree/main/fixtures/foreign-executor) for more implementation details. +## Blocking tasks + +Rust executors are designed around an assumption that the `Future::poll` function will return quickly. +This assumption, combined with cooperative scheduling, allows for a large number of futures to be handled by a small number of threads. +Foreign executors make similar assumptions and sometimes more extreme ones. +For example, the Python eventloop is single threaded -- if any task spends a long time between `await` points, then it will block all other tasks from progressing. + +This raises the question of how async code can interact with blocking code. +"blocking" here means code that preforms blocking IO, long-running computations without `await` breaks, etc. +To support this, UniFFI defines the `BlockingTaskQueue` type, which is a foreign object that schedules work on a thread where it's okay to block. + +On Rust, `BlockingTaskQueue` is a UniFFI type that can safely run blocking code. +It's `run_blocking` method works like tokio's [block_in_place](https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html) function. +It inputs a closure and runs it in the `BlockingTaskQueue`. +This closure can reference the outside scope (it does not need to be `'static`). +For example: + +```rust +#[derive(uniffi::Object)] +struct DataStore { + // Used to run blocking tasks + queue: uniffi::BlockingTaskQueue, + // Low-level DB object with blocking methods + db: Mutex, +} + +#[uniffi::export] +impl DataStore { + #[uniffi::constructor] + fn new(queue: uniffi::BlockingTaskQueue) -> Self { + Self { + queue, + db: Mutex::new(Database::new()) + } + } + + fn fetch_all_items(&self) -> Vec { + queue.run_blocking(|| db.lock().fetch_all_items()) + } +} +``` + +On the foreign side `BlockingTaskQueue` corresponds to a language-dependent class. + +### Kotlin +Kotlin uses `CoroutineContext` for its `BlockingTaskQueue`. +Any `CoroutineContext` will work, but `Dispatchers.IO` is usually a good choice. +A DataStore from the example above can be created with `DataStore(Dispatchers.IO)`. + +### Swift +Swift uses `DispatchQueue` for its `BlockingTaskQueue`. +The `DispatchQueue` should be concurrent for all in almost all circumstances -- the user-initiated global queue is normally a good choice. +A DataStore from the example above can be created with `DataStore(queue: DispatchQueue.global(qos: .userInitiated)`. + +### Python + +Python uses a `futures.Executor` for its `BlockingTaskQueue`. +`ThreadPoolExecutor` is typically a good choice. +A DataStore from the example above can be created with `DataStore(ThreadPoolExecutor())`. diff --git a/fixtures/foreign-executor/Cargo.toml b/fixtures/foreign-executor/Cargo.toml deleted file mode 100644 index 57d7f4fd2b..0000000000 --- a/fixtures/foreign-executor/Cargo.toml +++ /dev/null @@ -1,19 +0,0 @@ -[package] -name = "uniffi-fixture-foreign-executor" -version = "0.23.0" -edition = "2021" -license = "MPL-2.0" -publish = false - -[lib] -crate-type = ["lib", "cdylib"] -name = "uniffi_fixture_foreign_executor" - -[dependencies] -uniffi = { path = "../../uniffi", version = "0.25" } - -[build-dependencies] -uniffi = { path = "../../uniffi", version = "0.25", features = ["build"] } - -[dev-dependencies] -uniffi = { path = "../../uniffi", version = "0.25", features = ["bindgen-tests"] } diff --git a/fixtures/foreign-executor/build.rs b/fixtures/foreign-executor/build.rs deleted file mode 100644 index 908761bdea..0000000000 --- a/fixtures/foreign-executor/build.rs +++ /dev/null @@ -1,7 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -fn main() { - uniffi::generate_scaffolding("src/foreign_executor.udl").unwrap(); -} diff --git a/fixtures/foreign-executor/src/foreign_executor.udl b/fixtures/foreign-executor/src/foreign_executor.udl deleted file mode 100644 index 2933e56c6a..0000000000 --- a/fixtures/foreign-executor/src/foreign_executor.udl +++ /dev/null @@ -1,7 +0,0 @@ -namespace fixture_foreign_executor { }; - -interface ForeignExecutorTester { - constructor(ForeignExecutor executor); - [Name=new_from_sequence] - constructor(sequence executors); -}; diff --git a/fixtures/foreign-executor/src/lib.rs b/fixtures/foreign-executor/src/lib.rs deleted file mode 100644 index 20d788addf..0000000000 --- a/fixtures/foreign-executor/src/lib.rs +++ /dev/null @@ -1,71 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -use std::sync::{Arc, Mutex}; -use std::thread; -use std::time::Instant; -use uniffi::ForeignExecutor; - -pub struct ForeignExecutorTester { - executor: ForeignExecutor, - last_result: Arc>>, -} - -// All constructor have to be defined in UDL for now -impl ForeignExecutorTester { - fn new(executor: ForeignExecutor) -> Self { - Self { - executor, - last_result: Arc::new(Mutex::new(None)), - } - } - - // Test inputting the ForeignExecutor from a Vec. This tests that they can be written to a - // `RustBuffer` - fn new_from_sequence(executors: Vec) -> Self { - assert_eq!(executors.len(), 1); - Self::new(executors.into_iter().next().unwrap()) - } -} - -#[uniffi::export] -impl ForeignExecutorTester { - /// Schedule a fire-and-forget task to run the test - fn schedule_test(&self, delay: u32) { - let last_result = self.last_result.clone(); - *last_result.lock().unwrap() = None; - // Start a thread to schedule the call. This tests if the foreign bindings can handle - // schedule callbacks from a thread that they don't manage. - thread::scope(move |scope| { - scope.spawn(move || { - let start_time = Instant::now(); - let initial_thread_id = thread::current().id(); - // Schedule a call with the foreign executor - self.executor.schedule(delay, move || { - // Return data on when/where the call happened. We check that this matches the - // expectations in the foreign bindings tests - let call_happened_in_different_thread = - thread::current().id() != initial_thread_id; - let delay_ms = start_time.elapsed().as_millis() as u32; - *last_result.lock().unwrap() = Some(TestResult { - call_happened_in_different_thread, - delay_ms, - }); - }); - }); - }); - } - - fn get_last_result(&self) -> Option { - self.last_result.lock().unwrap().take() - } -} - -#[derive(uniffi::Record)] -pub struct TestResult { - pub call_happened_in_different_thread: bool, - pub delay_ms: u32, -} - -uniffi::include_scaffolding!("foreign_executor"); diff --git a/fixtures/foreign-executor/tests/bindings/test_foreign_executor.kts b/fixtures/foreign-executor/tests/bindings/test_foreign_executor.kts deleted file mode 100644 index 404c06a463..0000000000 --- a/fixtures/foreign-executor/tests/bindings/test_foreign_executor.kts +++ /dev/null @@ -1,39 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -import uniffi.fixture_foreign_executor.* -import kotlinx.coroutines.CoroutineScope -import kotlinx.coroutines.Dispatchers -import kotlinx.coroutines.delay -import kotlinx.coroutines.launch -import kotlinx.coroutines.runBlocking - - -val coroutineScope = CoroutineScope(Dispatchers.IO) -// Test scheduling calls with no delay -runBlocking { - val tester = ForeignExecutorTester(coroutineScope) - launch { - tester.scheduleTest(0U) - } - delay(100L) - val result = tester.getLastResult() ?: throw RuntimeException("ForeignExecutorTester.getLastResult() returned null") - assert(result.callHappenedInDifferentThread) - assert(result.delayMs <= 100U) - tester.close() -} - -// Test scheduling calls with a delay and using the newFromSequence constructor -runBlocking { - val tester = ForeignExecutorTester.newFromSequence(listOf(coroutineScope)) - launch { - tester.scheduleTest(100U) - } - delay(200L) - val result = tester.getLastResult() ?: throw RuntimeException("ForeignExecutorTester.getLastResult() returned null") - assert(result.callHappenedInDifferentThread) - assert(result.delayMs >= 100U) - assert(result.delayMs <= 200U) - tester.close() -} diff --git a/fixtures/foreign-executor/tests/bindings/test_foreign_executor.py b/fixtures/foreign-executor/tests/bindings/test_foreign_executor.py deleted file mode 100644 index fdd06be520..0000000000 --- a/fixtures/foreign-executor/tests/bindings/test_foreign_executor.py +++ /dev/null @@ -1,50 +0,0 @@ -# This Source Code Form is subject to the terms of the Mozilla Public -# License, v. 2.0. If a copy of the MPL was not distributed with this -# file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -import asyncio -import unittest -import weakref -from fixture_foreign_executor import ForeignExecutorTester - -class TestForeignExecutor(unittest.TestCase): - def test_schedule(self): - async def run_test(constructor, delay): - if constructor == "primary": - tester = ForeignExecutorTester(asyncio.get_running_loop()) - elif constructor == "new_from_sequence": - tester = ForeignExecutorTester.new_from_sequence([asyncio.get_running_loop()]) - else: - raise AssertionError(f"Unknown constructor: {constructor}") - tester.schedule_test(delay) - await asyncio.sleep((delay / 1000) + 0.1) - return tester.get_last_result() - - # Test no delay and lifting the foreign executor directly - result = asyncio.run(run_test("primary", 0)) - self.assertTrue(result.call_happened_in_different_thread) - self.assertTrue(result.delay_ms <= 1) - - # Test no delay and reading the foreign executor from a list - result = asyncio.run(run_test("new_from_sequence", 10)) - self.assertTrue(result.call_happened_in_different_thread) - self.assertTrue(9 <= result.delay_ms <= 11) - - def test_reference_counts(self): - # Create an event loop - loop = asyncio.new_event_loop() - loop_ref = weakref.ref(loop) - # Create ForeignExecutorTester that stores the loop - tester = ForeignExecutorTester(loop) - tester2 = ForeignExecutorTester.new_from_sequence([loop]), - # Test that testers hold a reference to the loop. After deleting the loop, the weakref should still be alive - loop.close() - del loop - self.assertNotEqual(loop_ref(), None, "ForeignExecutor didn't take a reference to the event loop") - # Deleting testers should cause the loop to be destroyed - del tester - del tester2 - self.assertEqual(loop_ref(), None, "ForeignExecutor didn't release a reference to the event loop") - -if __name__=='__main__': - unittest.main() diff --git a/fixtures/foreign-executor/tests/bindings/test_foreign_executor.swift b/fixtures/foreign-executor/tests/bindings/test_foreign_executor.swift deleted file mode 100644 index 4a230d0f60..0000000000 --- a/fixtures/foreign-executor/tests/bindings/test_foreign_executor.swift +++ /dev/null @@ -1,42 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -import Foundation -import fixture_foreign_executor - -func runTest(tester: ForeignExecutorTester, delay: UInt32) async -> TestResult { - let handle = Task { () -> TestResult in - tester.scheduleTest(delay: delay) - try! await Task.sleep(nanoseconds: numericCast((delay + 10) * 1000000)) - return tester.getLastResult()! - } - return await handle.value -} - -Task { - // Test scheduling with no delay - let result = await runTest( - tester: ForeignExecutorTester( - executor: UniFfiForeignExecutor(priority: TaskPriority.background) - ), - delay: 0 - ) - assert(result.callHappenedInDifferentThread) - assert(result.delayMs <= 1) - - // Test scheduling with delay and an executor created from a list - let result2 = await runTest( - tester: ForeignExecutorTester.newFromSequence( - executors: [UniFfiForeignExecutor(priority: TaskPriority.background)] - ), - delay: 1000 - ) - assert(result2.callHappenedInDifferentThread) - assert(result2.delayMs >= 90) - assert(result2.delayMs <= 110) -} - - - -// No need to test reference counting, since `UniFfiForeignExecutor` on Swift is just a value type diff --git a/fixtures/foreign-executor/tests/test_generated_bindings.rs b/fixtures/foreign-executor/tests/test_generated_bindings.rs deleted file mode 100644 index 5ff08c627a..0000000000 --- a/fixtures/foreign-executor/tests/test_generated_bindings.rs +++ /dev/null @@ -1,7 +0,0 @@ -uniffi::build_foreign_language_testcases!( - "tests/bindings/test_foreign_executor.py", - "tests/bindings/test_foreign_executor.kts", - // Disabled because of intermittent CI failures - // (https://github.com/mozilla/uniffi-rs/issues/1536) - // "tests/bindings/test_foreign_executor.swift", -); diff --git a/fixtures/futures/Cargo.toml b/fixtures/futures/Cargo.toml index 78b08cb689..67c850e9f0 100644 --- a/fixtures/futures/Cargo.toml +++ b/fixtures/futures/Cargo.toml @@ -16,6 +16,7 @@ path = "src/bin.rs" [dependencies] uniffi = { path = "../../uniffi", version = "0.25", features = ["tokio", "cli"] } +futures = "0.3.29" thiserror = "1.0" tokio = { version = "1.24.1", features = ["time", "sync"] } once_cell = "1.18.0" diff --git a/fixtures/futures/src/lib.rs b/fixtures/futures/src/lib.rs index 39a521495e..b3be27f551 100644 --- a/fixtures/futures/src/lib.rs +++ b/fixtures/futures/src/lib.rs @@ -11,6 +11,8 @@ use std::{ time::Duration, }; +use futures::stream::{FuturesUnordered, StreamExt}; + /// Non-blocking timer future. pub struct TimerFuture { shared_state: Arc>, @@ -326,4 +328,53 @@ pub async fn use_shared_resource(options: SharedResourceOptions) -> Result<(), A Ok(()) } +/// Async function that uses a blocking task queue to do its work +#[uniffi::export] +pub async fn calc_square(queue: uniffi::BlockingTaskQueue, value: i32) -> i32 { + queue.run_blocking(|| value * value).await +} + +/// Same as before, but this one runs multiple tasks +#[uniffi::export] +pub async fn calc_squares(queue: uniffi::BlockingTaskQueue, items: Vec) -> Vec { + // Use `FuturesUnordered` to test our blocking task queue code which is known to be a tricky API to work with. + // In particular, if we don't notify the waker then FuturesUnordered will not poll again. + let mut futures: FuturesUnordered<_> = (0..items.len()) + .into_iter() + .map(|i| { + // Test that we can use references from the surrounding scope + let items = &items; + queue.run_blocking(move || items[i] * items[i]) + }) + .collect(); + let mut results = vec![]; + while let Some(result) = futures.next().await { + results.push(result); + } + results.sort(); + results +} + +/// ...and this one uses multiple BlockingTaskQueues +#[uniffi::export] +pub async fn calc_squares_multi_queue( + queues: Vec, + items: Vec, +) -> Vec { + let mut futures: FuturesUnordered<_> = (0..items.len()) + .into_iter() + .map(|i| { + // Test that we can use references from the surrounding scope + let items = &items; + queues[i].run_blocking(move || items[i] * items[i]) + }) + .collect(); + let mut results = vec![]; + while let Some(result) = futures.next().await { + results.push(result); + } + results.sort(); + results +} + uniffi::include_scaffolding!("futures"); diff --git a/fixtures/futures/tests/bindings/test_futures.kts b/fixtures/futures/tests/bindings/test_futures.kts index 810bb40f41..61538b720c 100644 --- a/fixtures/futures/tests/bindings/test_futures.kts +++ b/fixtures/futures/tests/bindings/test_futures.kts @@ -1,9 +1,22 @@ import uniffi.fixture.futures.* +import java.util.concurrent.Executors import kotlinx.coroutines.* import kotlin.system.* +fun runAsyncTest(test: suspend CoroutineScope.() -> Unit) { + val initialBlockingTaskQueueHandleCount = uniffiBlockingTaskQueueHandleCount() + val initialPollHandleCount = uniffiPollHandleCount() + val time = runBlocking { + measureTimeMillis { + test() + } + } + assert(uniffiBlockingTaskQueueHandleCount() == initialBlockingTaskQueueHandleCount) + assert(uniffiPollHandleCount() == initialPollHandleCount) +} + // init UniFFI to get good measurements after that -runBlocking { +runAsyncTest { val time = measureTimeMillis { alwaysReady() } @@ -24,7 +37,7 @@ fun assertApproximateTime(actualTime: Long, expectedTime: Int, testName: String } // Test `always_ready`. -runBlocking { +runAsyncTest { val time = measureTimeMillis { val result = alwaysReady() @@ -35,7 +48,7 @@ runBlocking { } // Test `void`. -runBlocking { +runAsyncTest { val time = measureTimeMillis { val result = void() @@ -46,7 +59,7 @@ runBlocking { } // Test `sleep`. -runBlocking { +runAsyncTest { val time = measureTimeMillis { sleep(200U) } @@ -55,7 +68,7 @@ runBlocking { } // Test sequential futures. -runBlocking { +runAsyncTest { val time = measureTimeMillis { val resultAlice = sayAfter(100U, "Alice") val resultBob = sayAfter(200U, "Bob") @@ -68,7 +81,7 @@ runBlocking { } // Test concurrent futures. -runBlocking { +runAsyncTest { val time = measureTimeMillis { val resultAlice = async { sayAfter(100U, "Alice") } val resultBob = async { sayAfter(200U, "Bob") } @@ -81,7 +94,7 @@ runBlocking { } // Test async methods. -runBlocking { +runAsyncTest { val megaphone = newMegaphone() val time = measureTimeMillis { val resultAlice = megaphone.sayAfter(200U, "Alice") @@ -92,7 +105,7 @@ runBlocking { assertApproximateTime(time, 200, "async methods") } -runBlocking { +runAsyncTest { val megaphone = newMegaphone() val time = measureTimeMillis { val resultAlice = sayAfterWithMegaphone(megaphone, 200U, "Alice") @@ -104,7 +117,7 @@ runBlocking { } // Test async method returning optional object -runBlocking { +runAsyncTest { val megaphone = asyncMaybeNewMegaphone(true) assert(megaphone != null) @@ -113,7 +126,7 @@ runBlocking { } // Test with the Tokio runtime. -runBlocking { +runAsyncTest { val time = measureTimeMillis { val resultAlice = sayAfterWithTokio(200U, "Alice") @@ -124,7 +137,7 @@ runBlocking { } // Test fallible function/method. -runBlocking { +runAsyncTest { val time1 = measureTimeMillis { try { fallibleMe(false) @@ -189,7 +202,7 @@ runBlocking { } // Test record. -runBlocking { +runAsyncTest { val time = measureTimeMillis { val result = newMyRecord("foo", 42U) @@ -203,7 +216,7 @@ runBlocking { } // Test a broken sleep. -runBlocking { +runAsyncTest { val time = measureTimeMillis { brokenSleep(100U, 0U) // calls the waker twice immediately sleep(100U) // wait for possible failure @@ -217,7 +230,7 @@ runBlocking { // Test a future that uses a lock and that is cancelled. -runBlocking { +runAsyncTest { val time = measureTimeMillis { val job = launch { useSharedResource(SharedResourceOptions(releaseAfterMs=5000U, timeoutMs=100U)) @@ -236,7 +249,7 @@ runBlocking { } // Test a future that uses a lock and that is not cancelled. -runBlocking { +runAsyncTest { val time = measureTimeMillis { useSharedResource(SharedResourceOptions(releaseAfterMs=100U, timeoutMs=1000U)) @@ -244,3 +257,26 @@ runBlocking { } println("useSharedResource (not canceled): ${time}ms") } + +// Test blocking task queues +runAsyncTest { + withTimeout(1000) { + assert(calcSquare(Dispatchers.IO, 20) == 400) + } + + withTimeout(1000) { + assert(calcSquares(Dispatchers.IO, listOf(1, -2, 3)) == listOf(1, 4, 9)) + } + + val executors = listOf( + Executors.newSingleThreadExecutor(), + Executors.newSingleThreadExecutor(), + Executors.newSingleThreadExecutor(), + ) + withTimeout(1000) { + assert(calcSquaresMultiQueue(executors.map { it.asCoroutineDispatcher() }, listOf(1, -2, 3)) == listOf(1, 4, 9)) + } + for (executor in executors) { + executor.shutdown() + } +} diff --git a/fixtures/futures/tests/bindings/test_futures.py b/fixtures/futures/tests/bindings/test_futures.py index bfbeba86f8..3c5f0cec48 100644 --- a/fixtures/futures/tests/bindings/test_futures.py +++ b/fixtures/futures/tests/bindings/test_futures.py @@ -1,25 +1,31 @@ +import futures from futures import * +import contextlib import unittest from datetime import datetime import asyncio +from concurrent.futures import ThreadPoolExecutor def now(): return datetime.now() class TestFutures(unittest.TestCase): def test_always_ready(self): + @self.check_handle_counts() async def test(): self.assertEqual(await always_ready(), True) asyncio.run(test()) def test_void(self): + @self.check_handle_counts() async def test(): self.assertEqual(await void(), None) asyncio.run(test()) def test_sleep(self): + @self.check_handle_counts() async def test(): t0 = now() await sleep(2000) @@ -31,6 +37,7 @@ async def test(): asyncio.run(test()) def test_sequential_futures(self): + @self.check_handle_counts() async def test(): t0 = now() result_alice = await say_after(100, 'Alice') @@ -45,6 +52,7 @@ async def test(): asyncio.run(test()) def test_concurrent_tasks(self): + @self.check_handle_counts() async def test(): alice = asyncio.create_task(say_after(100, 'Alice')) bob = asyncio.create_task(say_after(200, 'Bob')) @@ -62,6 +70,7 @@ async def test(): asyncio.run(test()) def test_async_methods(self): + @self.check_handle_counts() async def test(): megaphone = new_megaphone() t0 = now() @@ -75,6 +84,7 @@ async def test(): asyncio.run(test()) def test_async_object_param(self): + @self.check_handle_counts() async def test(): megaphone = new_megaphone() t0 = now() @@ -88,6 +98,7 @@ async def test(): asyncio.run(test()) def test_with_tokio_runtime(self): + @self.check_handle_counts() async def test(): t0 = now() result_alice = await say_after_with_tokio(200, 'Alice') @@ -100,6 +111,7 @@ async def test(): asyncio.run(test()) def test_fallible(self): + @self.check_handle_counts() async def test(): result = await fallible_me(False) self.assertEqual(result, 42) @@ -124,6 +136,7 @@ async def test(): asyncio.run(test()) def test_fallible_struct(self): + @self.check_handle_counts() async def test(): megaphone = await fallible_struct(False) self.assertEqual(await megaphone.fallible_me(False), 42) @@ -137,6 +150,7 @@ async def test(): asyncio.run(test()) def test_record(self): + @self.check_handle_counts() async def test(): result = await new_my_record("foo", 42) self.assertEqual(result.__class__, MyRecord) @@ -146,6 +160,7 @@ async def test(): asyncio.run(test()) def test_cancel(self): + @self.check_handle_counts() async def test(): # Create a task task = asyncio.create_task(say_after(200, 'Alice')) @@ -163,6 +178,7 @@ async def test(): # Test a future that uses a lock and that is cancelled. def test_shared_resource_cancellation(self): + @self.check_handle_counts() async def test(): task = asyncio.create_task(use_shared_resource( SharedResourceOptions(release_after_ms=5000, timeout_ms=100))) @@ -173,10 +189,49 @@ async def test(): asyncio.run(test()) def test_shared_resource_no_cancellation(self): + @self.check_handle_counts() async def test(): await use_shared_resource(SharedResourceOptions(release_after_ms=100, timeout_ms=1000)) await use_shared_resource(SharedResourceOptions(release_after_ms=0, timeout_ms=1000)) asyncio.run(test()) + # blocking task queue tests + + def test_calc_square(self): + @self.check_handle_counts() + async def test(): + async with asyncio.timeout(1): + executor = ThreadPoolExecutor() + self.assertEqual(await calc_square(executor, 20), 400) + asyncio.run(test()) + + def test_calc_squares(self): + @self.check_handle_counts() + async def test(): + async with asyncio.timeout(1): + executor = ThreadPoolExecutor() + self.assertEqual(await calc_squares(executor, [1, -2, 3]), [1, 4, 9]) + asyncio.run(test()) + + def test_calc_squares_multi_queue(self): + @self.check_handle_counts() + async def test(): + async with asyncio.timeout(1): + executors = [ + ThreadPoolExecutor(), + ThreadPoolExecutor(), + ThreadPoolExecutor(), + ] + self.assertEqual(await calc_squares_multi_queue(executors, [1, -2, 3]), [1, 4, 9]) + asyncio.run(test()) + + @contextlib.asynccontextmanager + async def check_handle_counts(self): + initial_poll_handle_count = len(futures.UNIFFI_POLL_HANDLE_MAP) + initial_blocking_task_queue_handle_count = len(futures.UNIFFI_BLOCKING_TASK_QUEUE_HANDLE_MAP) + yield + self.assertEqual(len(futures.UNIFFI_POLL_HANDLE_MAP), initial_poll_handle_count) + self.assertEqual(len(futures.UNIFFI_BLOCKING_TASK_QUEUE_HANDLE_MAP), initial_blocking_task_queue_handle_count) + if __name__ == '__main__': unittest.main() diff --git a/fixtures/futures/tests/bindings/test_futures.swift b/fixtures/futures/tests/bindings/test_futures.swift index 20e24c40ff..54bc3aea28 100644 --- a/fixtures/futures/tests/bindings/test_futures.swift +++ b/fixtures/futures/tests/bindings/test_futures.swift @@ -3,10 +3,21 @@ import Foundation // To get `DispatchGroup` and `Date` types. var counter = DispatchGroup() -// Test `alwaysReady` -counter.enter() +func asyncTest(test: @escaping () async throws -> ()) { + let initialBlockingTaskQueueCount = uniffiBlockingTaskQueueHandleCount() + let initialPollCount = uniffiPollHandleCount() + counter.enter() + Task { + try! await test() + counter.leave() + } + counter.wait() + assert(uniffiBlockingTaskQueueHandleCount() == initialBlockingTaskQueueCount) + assert(uniffiPollHandleCount() == initialPollCount) +} -Task { +// Test `alwaysReady` +asyncTest { let t0 = Date() let result = await alwaysReady() let t1 = Date() @@ -14,40 +25,28 @@ Task { let tDelta = DateInterval(start: t0, end: t1) assert(tDelta.duration < 0.1) assert(result == true) - - counter.leave() } // Test record. -counter.enter() - -Task { +asyncTest { let result = await newMyRecord(a: "foo", b: 42) assert(result.a == "foo") assert(result.b == 42) - - counter.leave() } // Test `void` -counter.enter() - -Task { +asyncTest { let t0 = Date() await void() let t1 = Date() let tDelta = DateInterval(start: t0, end: t1) assert(tDelta.duration < 0.1) - - counter.leave() } // Test `Sleep` -counter.enter() - -Task { +asyncTest { let t0 = Date() let result = await sleep(ms: 2000) let t1 = Date() @@ -55,14 +54,10 @@ Task { let tDelta = DateInterval(start: t0, end: t1) assert(tDelta.duration > 2 && tDelta.duration < 2.1) assert(result == true) - - counter.leave() } // Test sequential futures. -counter.enter() - -Task { +asyncTest { let t0 = Date() let result_alice = await sayAfter(ms: 1000, who: "Alice") let result_bob = await sayAfter(ms: 2000, who: "Bob") @@ -72,14 +67,10 @@ Task { assert(tDelta.duration > 3 && tDelta.duration < 3.1) assert(result_alice == "Hello, Alice!") assert(result_bob == "Hello, Bob!") - - counter.leave() } // Test concurrent futures. -counter.enter() - -Task { +asyncTest { async let alice = sayAfter(ms: 1000, who: "Alice") async let bob = sayAfter(ms: 2000, who: "Bob") @@ -91,14 +82,10 @@ Task { assert(tDelta.duration > 2 && tDelta.duration < 2.1) assert(result_alice == "Hello, Alice!") assert(result_bob == "Hello, Bob!") - - counter.leave() } // Test async methods -counter.enter() - -Task { +asyncTest { let megaphone = newMegaphone() let t0 = Date() @@ -108,26 +95,18 @@ Task { let tDelta = DateInterval(start: t0, end: t1) assert(tDelta.duration > 2 && tDelta.duration < 2.1) assert(result_alice == "HELLO, ALICE!") - - counter.leave() } // Test async function returning an object -counter.enter() - -Task { +asyncTest { let megaphone = await asyncNewMegaphone() let result = try await megaphone.fallibleMe(doFail: false) assert(result == 42) - - counter.leave() } // Test with the Tokio runtime. -counter.enter() - -Task { +asyncTest { let t0 = Date() let result_alice = await sayAfterWithTokio(ms: 2000, who: "Alice") let t1 = Date() @@ -135,15 +114,11 @@ Task { let tDelta = DateInterval(start: t0, end: t1) assert(tDelta.duration > 2 && tDelta.duration < 2.1) assert(result_alice == "Hello, Alice (with Tokio)!") - - counter.leave() } // Test fallible function/method… // … which doesn't throw. -counter.enter() - -Task { +asyncTest { let t0 = Date() let result = try await fallibleMe(doFail: false) let t1 = Date() @@ -151,19 +126,15 @@ Task { let tDelta = DateInterval(start: t0, end: t1) assert(tDelta.duration > 0 && tDelta.duration < 0.1) assert(result == 42) - - counter.leave() } -Task { +asyncTest { let m = try await fallibleStruct(doFail: false) let result = try await m.fallibleMe(doFail: false) assert(result == 42) } -counter.enter() - -Task { +asyncTest { let megaphone = newMegaphone() let t0 = Date() @@ -173,14 +144,10 @@ Task { let tDelta = DateInterval(start: t0, end: t1) assert(tDelta.duration > 0 && tDelta.duration < 0.1) assert(result == 42) - - counter.leave() } // … which does throw. -counter.enter() - -Task { +asyncTest { let t0 = Date() do { @@ -195,11 +162,9 @@ Task { let tDelta = DateInterval(start: t0, end: t1) assert(tDelta.duration > 0 && tDelta.duration < 0.1) - - counter.leave() } -Task { +asyncTest { do { let _ = try await fallibleStruct(doFail: true) } catch MyError.Foo { @@ -209,9 +174,7 @@ Task { } } -counter.enter() - -Task { +asyncTest { let megaphone = newMegaphone() let t0 = Date() @@ -228,13 +191,10 @@ Task { let tDelta = DateInterval(start: t0, end: t1) assert(tDelta.duration > 0 && tDelta.duration < 0.1) - - counter.leave() } // Test a future that uses a lock and that is cancelled. -counter.enter() -Task { +asyncTest { let task = Task { try! await useSharedResource(options: SharedResourceOptions(releaseAfterMs: 100, timeoutMs: 1000)) } @@ -250,15 +210,30 @@ Task { // Try accessing the shared resource again. The initial task should release the shared resource // before the timeout expires. try! await useSharedResource(options: SharedResourceOptions(releaseAfterMs: 0, timeoutMs: 1000)) - counter.leave() } // Test a future that uses a lock and that is not cancelled. -counter.enter() -Task { +asyncTest { try! await useSharedResource(options: SharedResourceOptions(releaseAfterMs: 100, timeoutMs: 1000)) try! await useSharedResource(options: SharedResourceOptions(releaseAfterMs: 0, timeoutMs: 1000)) - counter.leave() } -counter.wait() +// Test blocking task queues +asyncTest { + let calcSquareResult = await calcSquare(queue: DispatchQueue.global(qos: .userInitiated), value: 20) + assert(calcSquareResult == 400) + + let calcSquaresResult = await calcSquares(queue: DispatchQueue.global(qos: .userInitiated), items: [1, -2, 3]) + assert(calcSquaresResult == [1, 4, 9]) + + let calcSquaresMultiQueueResult = await calcSquaresMultiQueue( + queues: [ + DispatchQueue(label: "test-queue1", attributes: DispatchQueue.Attributes.concurrent), + DispatchQueue(label: "test-queue2", attributes: DispatchQueue.Attributes.concurrent), + DispatchQueue(label: "test-queue3", attributes: DispatchQueue.Attributes.concurrent) + ], + items: [1, -2, 3] + ) + assert(calcSquaresMultiQueueResult == [1, 4, 9]) +} + diff --git a/fixtures/metadata/src/tests.rs b/fixtures/metadata/src/tests.rs index eac852cfea..5fd35be02c 100644 --- a/fixtures/metadata/src/tests.rs +++ b/fixtures/metadata/src/tests.rs @@ -114,7 +114,7 @@ mod test_type_ids { check_type_id::(Type::Float64); check_type_id::(Type::Boolean); check_type_id::(Type::String); - check_type_id::(Type::ForeignExecutor); + check_type_id::(Type::BlockingTaskQueue); } #[test] diff --git a/uniffi_bindgen/src/bindings/python/gen_python/executor.rs b/uniffi_bindgen/src/bindings/kotlin/gen_kotlin/blocking_task_queue.rs similarity index 61% rename from uniffi_bindgen/src/bindings/python/gen_python/executor.rs rename to uniffi_bindgen/src/bindings/kotlin/gen_kotlin/blocking_task_queue.rs index 2834d9e2ac..cec719b4b5 100644 --- a/uniffi_bindgen/src/bindings/python/gen_python/executor.rs +++ b/uniffi_bindgen/src/bindings/kotlin/gen_kotlin/blocking_task_queue.rs @@ -5,14 +5,15 @@ use crate::backend::CodeType; #[derive(Debug)] -pub struct ForeignExecutorCodeType; +pub struct BlockingTaskQueueCodeType; -impl CodeType for ForeignExecutorCodeType { +impl CodeType for BlockingTaskQueueCodeType { fn type_label(&self) -> String { - "asyncio.BaseEventLoop".into() + // Kotlin uses CoroutineContext for BlockingTaskQueue + "CoroutineContext".into() } fn canonical_name(&self) -> String { - "ForeignExecutor".into() + "BlockingTaskQueue".into() } } diff --git a/uniffi_bindgen/src/bindings/kotlin/gen_kotlin/mod.rs b/uniffi_bindgen/src/bindings/kotlin/gen_kotlin/mod.rs index 7cd8867d14..11aed97137 100644 --- a/uniffi_bindgen/src/bindings/kotlin/gen_kotlin/mod.rs +++ b/uniffi_bindgen/src/bindings/kotlin/gen_kotlin/mod.rs @@ -15,12 +15,12 @@ use crate::backend::{CodeType, TemplateExpression}; use crate::interface::*; use crate::BindingsConfig; +mod blocking_task_queue; mod callback_interface; mod compounds; mod custom; mod enum_; mod error; -mod executor; mod external; mod miscellany; mod object; @@ -298,7 +298,6 @@ impl KotlinCodeOracle { } FfiType::ForeignBytes => "ForeignBytes.ByValue".to_string(), FfiType::ForeignCallback => "ForeignCallback".to_string(), - FfiType::ForeignExecutorCallback => "UniFfiForeignExecutorCallback".to_string(), FfiType::RustFutureContinuationCallback => { "UniFffiRustFutureContinuationCallbackType".to_string() } @@ -364,7 +363,7 @@ impl AsCodeType for T { Type::CallbackInterface { name, .. } => { Box::new(callback_interface::CallbackInterfaceCodeType::new(name)) } - Type::ForeignExecutor => Box::new(executor::ForeignExecutorCodeType), + Type::BlockingTaskQueue => Box::new(blocking_task_queue::BlockingTaskQueueCodeType), Type::Optional { inner_type } => { Box::new(compounds::OptionalCodeType::new(*inner_type)) } @@ -542,7 +541,7 @@ pub mod filters { ) -> Result { let ffi_func = callable.ffi_rust_future_poll(ci); Ok(format!( - "{{ future, callback, continuation -> _UniFFILib.INSTANCE.{ffi_func}(future, callback, continuation) }}" + "{{ future, callback, continuation, blockingTaskQueue -> _UniFFILib.INSTANCE.{ffi_func}(future, callback, continuation, blockingTaskQueue) }}" )) } diff --git a/uniffi_bindgen/src/bindings/kotlin/templates/Async.kt b/uniffi_bindgen/src/bindings/kotlin/templates/Async.kt index f2ee0ba0d5..80ad63585a 100644 --- a/uniffi_bindgen/src/bindings/kotlin/templates/Async.kt +++ b/uniffi_bindgen/src/bindings/kotlin/templates/Async.kt @@ -1,20 +1,60 @@ // Async return type handlers -internal const val UNIFFI_RUST_FUTURE_POLL_READY = 0.toShort() -internal const val UNIFFI_RUST_FUTURE_POLL_MAYBE_READY = 1.toShort() +internal const val UNIFFI_RUST_FUTURE_POLL_READY = 0.toByte() +internal const val UNIFFI_RUST_FUTURE_POLL_MAYBE_READY = 1.toByte() -internal val uniffiContinuationHandleMap = UniffiHandleMap>() +/** + * In-progress poll of a RustFuture + * + * Lifecycle of UniffiPoll: + * * UniffiPoll is constructed with a Kotlin continuation that `uniffiRustCallAsync` awaits. + * * As part of the construction, a new handle is allocated for sending to `rust_future_poll`. + * * poll() is called, which calls the `rust_future_poll()` + * * When rust calls the continuation function we can either: + * * Consume the handle, complete the continuation, and cause `uniffiRustCallAsync` to continue + * * Run another poll in an CoroutineContext (AKA BlockingTaskQueue). + */ +internal class UniffiPoll( + val continuation: CancellableContinuation, + val rustFuture: UniffiHandle, + val pollFunc: (UniffiHandle, UniFffiRustFutureContinuationCallbackType, UniffiHandle, Long) -> Unit, +) { + val handle = uniffiPollHandleMap.newHandle(this) + + internal fun poll(BlockingTaskQueueHandle: UniffiHandle = 0) { + pollFunc(rustFuture, uniffiRustFutureContinuationCallback, handle, BlockingTaskQueueHandle) + } + + internal fun pollInContext(BlockingTaskQueueHandle: UniffiHandle) { + val coroutineContext = uniffiBlockingTaskQueueHandleMap.get(BlockingTaskQueueHandle) + CoroutineScope(coroutineContext).launch { + poll(BlockingTaskQueueHandle) + } + } + + internal fun completeContinuation(pollResult: Byte) { + continuation.resume(pollResult) + } +} + +internal val uniffiPollHandleMap = UniffiHandleMap() // FFI type for Rust future continuations internal object uniffiRustFutureContinuationCallback: UniFffiRustFutureContinuationCallbackType { - override fun callback(continuationHandle: UniffiHandle, pollResult: Short) { - uniffiContinuationHandleMap.consumeHandle(continuationHandle).resume(pollResult) + override fun callback(handle: UniffiHandle, pollResult: Byte, BlockingTaskQueueHandle: Long) { + if (pollResult == UNIFFI_RUST_FUTURE_POLL_READY || BlockingTaskQueueHandle == 0L) { + val poll = uniffiPollHandleMap.consumeHandle(handle) + poll.completeContinuation(pollResult) + } else { + val poll = uniffiPollHandleMap.get(handle) + poll.pollInContext(BlockingTaskQueueHandle) + } } } internal suspend fun uniffiRustCallAsync( rustFuture: UniffiHandle, - pollFunc: (UniffiHandle, UniFffiRustFutureContinuationCallbackType, UniffiHandle) -> Unit, + pollFunc: (UniffiHandle, UniFffiRustFutureContinuationCallbackType, UniffiHandle, Long) -> Unit, completeFunc: (UniffiHandle, RustCallStatus) -> F, freeFunc: (UniffiHandle) -> Unit, liftFunc: (F) -> T, @@ -22,12 +62,8 @@ internal suspend fun uniffiRustCallAsync( ): T { try { do { - val pollResult = suspendCancellableCoroutine { continuation -> - pollFunc( - rustFuture, - uniffiRustFutureContinuationCallback, - uniffiContinuationHandleMap.newHandle(continuation) - ) + val pollResult = suspendCancellableCoroutine { + UniffiPoll(it, rustFuture, pollFunc).poll() } } while (pollResult != UNIFFI_RUST_FUTURE_POLL_READY); @@ -39,3 +75,5 @@ internal suspend fun uniffiRustCallAsync( } } +// For testing +public fun uniffiPollHandleCount() = uniffiPollHandleMap.size diff --git a/uniffi_bindgen/src/bindings/kotlin/templates/BlockingTaskQueueTemplate.kt b/uniffi_bindgen/src/bindings/kotlin/templates/BlockingTaskQueueTemplate.kt new file mode 100644 index 0000000000..3541bb948c --- /dev/null +++ b/uniffi_bindgen/src/bindings/kotlin/templates/BlockingTaskQueueTemplate.kt @@ -0,0 +1,23 @@ +{{ self.add_import("kotlin.coroutines.CoroutineContext") }} + +internal val uniffiBlockingTaskQueueHandleMap = UniffiHandleMap() + +public object {{ ffi_converter_name }}: FfiConverter { + + override fun lower(value: CoroutineContext): UniffiHandle = uniffiBlockingTaskQueueHandleMap.newHandle(value) + + override fun lift(value: UniffiHandle): CoroutineContext = uniffiBlockingTaskQueueHandleMap.consumeHandle(value) + + override fun allocationSize(value: {{ type_name }}) = 8 + + override fun read(buf: ByteBuffer): CoroutineContext { + return lift(buf.getLong()) + } + + override fun write(value: CoroutineContext, buf: ByteBuffer) { + buf.putLong(lower(value)) + } +} + +// For testing +public fun uniffiBlockingTaskQueueHandleCount() = uniffiBlockingTaskQueueHandleMap.size diff --git a/uniffi_bindgen/src/bindings/kotlin/templates/ForeignExecutorTemplate.kt b/uniffi_bindgen/src/bindings/kotlin/templates/ForeignExecutorTemplate.kt deleted file mode 100644 index 90c84b9fc9..0000000000 --- a/uniffi_bindgen/src/bindings/kotlin/templates/ForeignExecutorTemplate.kt +++ /dev/null @@ -1,78 +0,0 @@ -{{ self.add_import("kotlinx.coroutines.CoroutineScope") }} -{{ self.add_import("kotlinx.coroutines.delay") }} -{{ self.add_import("kotlinx.coroutines.isActive") }} -{{ self.add_import("kotlinx.coroutines.launch") }} - -internal const val UNIFFI_RUST_TASK_CALLBACK_SUCCESS = 0.toByte() -internal const val UNIFFI_RUST_TASK_CALLBACK_CANCELLED = 1.toByte() -internal const val UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS = 0.toByte() -internal const val UNIFFI_FOREIGN_EXECUTOR_CALLBACK_CANCELLED = 1.toByte() -internal const val UNIFFI_FOREIGN_EXECUTOR_CALLBACK_ERROR = 2.toByte() - -// Callback function to execute a Rust task. The Kotlin code schedules these in a coroutine then -// invokes them. -internal interface UniFfiRustTaskCallback : com.sun.jna.Callback { - fun callback(rustTaskData: Pointer?, statusCode: Byte) -} - -internal object UniFfiForeignExecutorCallback : com.sun.jna.Callback { - fun callback(handle: UniffiHandle, delayMs: Int, rustTask: UniFfiRustTaskCallback?, rustTaskData: Pointer?) : Byte { - if (rustTask == null) { - FfiConverterForeignExecutor.drop(handle) - return UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS - } else { - val coroutineScope = FfiConverterForeignExecutor.lift(handle) - if (coroutineScope.isActive) { - val job = coroutineScope.launch { - if (delayMs > 0) { - delay(delayMs.toLong()) - } - rustTask.callback(rustTaskData, UNIFFI_RUST_TASK_CALLBACK_SUCCESS) - } - job.invokeOnCompletion { cause -> - if (cause != null) { - rustTask.callback(rustTaskData, UNIFFI_RUST_TASK_CALLBACK_CANCELLED) - } - } - return UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS - } else { - return UNIFFI_FOREIGN_EXECUTOR_CALLBACK_CANCELLED - } - } - } -} - -public object FfiConverterForeignExecutor: FfiConverter { - internal val handleMap = UniffiHandleMap() - - internal fun drop(handle: UniffiHandle) { - handleMap.consumeHandle(handle) - } - - internal fun register(lib: _UniFFILib) { - {%- match ci.ffi_foreign_executor_callback_set() %} - {%- when Some with (fn) %} - lib.{{ fn.name() }}(UniFfiForeignExecutorCallback) - {%- when None %} - {#- No foreign executor, we don't set anything #} - {% endmatch %} - } - - override fun allocationSize(value: CoroutineScope) = 8 - - override fun lift(value: UniffiHandle): CoroutineScope { - return handleMap.get(value) - } - - override fun read(buf: ByteBuffer): CoroutineScope { - return lift(buf.getLong()) - } - - override fun lower(value: CoroutineScope): UniffiHandle { - return handleMap.newHandle(value) - } - - override fun write(value: CoroutineScope, buf: ByteBuffer) { - buf.putLong(lower(value)) - } -} diff --git a/uniffi_bindgen/src/bindings/kotlin/templates/HandleMap.kt b/uniffi_bindgen/src/bindings/kotlin/templates/HandleMap.kt index 427d01f7ca..de323a2e74 100644 --- a/uniffi_bindgen/src/bindings/kotlin/templates/HandleMap.kt +++ b/uniffi_bindgen/src/bindings/kotlin/templates/HandleMap.kt @@ -39,6 +39,9 @@ internal class UniffiHandleMap { map.remove(key(handle)) ?: throw InternalException("Missing key in handlemap: was the handle used after being freed?") } + val size: Int + get() = map.size + companion object { // Generate map IDs that are likely to be unique private var mapIdCounter: Long = {{ ci.namespace_hash() }}.and(0x7FFF) diff --git a/uniffi_bindgen/src/bindings/kotlin/templates/Helpers.kt b/uniffi_bindgen/src/bindings/kotlin/templates/Helpers.kt index e91c8e4119..ff358c5a5a 100644 --- a/uniffi_bindgen/src/bindings/kotlin/templates/Helpers.kt +++ b/uniffi_bindgen/src/bindings/kotlin/templates/Helpers.kt @@ -80,5 +80,5 @@ private inline fun rustCall(callback: (RustCallStatus) -> U): U { // FFI type for Rust future continuations internal interface UniFffiRustFutureContinuationCallbackType : com.sun.jna.Callback { - fun callback(continuationHandle: Long, pollResult: Short); + fun callback(handle: UniffiHandle, pollResult: Byte, BlockingTaskQueueHandle: Long); } diff --git a/uniffi_bindgen/src/bindings/kotlin/templates/Types.kt b/uniffi_bindgen/src/bindings/kotlin/templates/Types.kt index 89546eac92..d693df249d 100644 --- a/uniffi_bindgen/src/bindings/kotlin/templates/Types.kt +++ b/uniffi_bindgen/src/bindings/kotlin/templates/Types.kt @@ -82,8 +82,8 @@ {%- when Type::CallbackInterface { module_path, name } %} {% include "CallbackInterfaceTemplate.kt" %} -{%- when Type::ForeignExecutor %} -{% include "ForeignExecutorTemplate.kt" %} +{%- when Type::BlockingTaskQueue %} +{% include "BlockingTaskQueueTemplate.kt" %} {%- when Type::Timestamp %} {% include "TimestampHelper.kt" %} @@ -104,6 +104,8 @@ {%- if ci.has_async_fns() %} {# Import types needed for async support #} {{ self.add_import("kotlin.coroutines.resume") }} +{{ self.add_import("kotlinx.coroutines.launch") }} {{ self.add_import("kotlinx.coroutines.suspendCancellableCoroutine") }} {{ self.add_import("kotlinx.coroutines.CancellableContinuation") }} +{{ self.add_import("kotlinx.coroutines.CoroutineScope") }} {%- endif %} diff --git a/uniffi_bindgen/src/bindings/kotlin/gen_kotlin/executor.rs b/uniffi_bindgen/src/bindings/python/gen_python/blocking_task_queue.rs similarity index 51% rename from uniffi_bindgen/src/bindings/kotlin/gen_kotlin/executor.rs rename to uniffi_bindgen/src/bindings/python/gen_python/blocking_task_queue.rs index e46058e7f1..3f23d618a0 100644 --- a/uniffi_bindgen/src/bindings/kotlin/gen_kotlin/executor.rs +++ b/uniffi_bindgen/src/bindings/python/gen_python/blocking_task_queue.rs @@ -5,19 +5,15 @@ use crate::backend::CodeType; #[derive(Debug)] -pub struct ForeignExecutorCodeType; +pub struct BlockingTaskQueueCodeType; -impl CodeType for ForeignExecutorCodeType { +impl CodeType for BlockingTaskQueueCodeType { + // On python we use an concurrent.futures.Executor for a BlockingTaskQueue fn type_label(&self) -> String { - // Kotlin uses a CoroutineScope for ForeignExecutor - "CoroutineScope".into() + "concurrent.futures.Executor".into() } fn canonical_name(&self) -> String { - "ForeignExecutor".into() - } - - fn initialization_fn(&self) -> Option { - Some("FfiConverterForeignExecutor.register".into()) + "BlockingTaskQueue".into() } } diff --git a/uniffi_bindgen/src/bindings/python/gen_python/mod.rs b/uniffi_bindgen/src/bindings/python/gen_python/mod.rs index 5e385e457b..edf2a2bdc5 100644 --- a/uniffi_bindgen/src/bindings/python/gen_python/mod.rs +++ b/uniffi_bindgen/src/bindings/python/gen_python/mod.rs @@ -15,11 +15,11 @@ use crate::backend::{CodeType, TemplateExpression}; use crate::interface::*; use crate::BindingsConfig; +mod blocking_task_queue; mod callback_interface; mod compounds; mod custom; mod enum_; -mod executor; mod external; mod miscellany; mod object; @@ -318,7 +318,6 @@ impl PythonCodeOracle { }, FfiType::ForeignBytes => "_UniffiForeignBytes".to_string(), FfiType::ForeignCallback => "_UNIFFI_FOREIGN_CALLBACK_T".to_string(), - FfiType::ForeignExecutorCallback => "_UNIFFI_FOREIGN_EXECUTOR_CALLBACK_T".to_string(), FfiType::RustFutureContinuationCallback => "_UNIFFI_FUTURE_CONTINUATION_T".to_string(), } } @@ -376,7 +375,7 @@ impl AsCodeType for T { Type::CallbackInterface { name, .. } => { Box::new(callback_interface::CallbackInterfaceCodeType::new(name)) } - Type::ForeignExecutor => Box::new(executor::ForeignExecutorCodeType), + Type::BlockingTaskQueue => Box::new(blocking_task_queue::BlockingTaskQueueCodeType), Type::Optional { inner_type } => { Box::new(compounds::OptionalCodeType::new(*inner_type)) } diff --git a/uniffi_bindgen/src/bindings/python/templates/Async.py b/uniffi_bindgen/src/bindings/python/templates/Async.py index f75bfcd821..68440aaf28 100644 --- a/uniffi_bindgen/src/bindings/python/templates/Async.py +++ b/uniffi_bindgen/src/bindings/python/templates/Async.py @@ -1,35 +1,70 @@ # RustFuturePoll values -_UNIFFI_RUST_FUTURE_POLL_READY = 0 -_UNIFFI_RUST_FUTURE_POLL_MAYBE_READY = 1 +UNIFFI_RUST_FUTURE_POLL_READY = 0 +UNIFFI_RUST_FUTURE_POLL_MAYBE_READY = 1 + +""" +In-progress poll of a RustFuture + +Lifecycle of UniffiPoll: + * UniffiPoll is constructed with a Python future that `_uniffi_rust_call_async` awaits + * As part of the construction, a new handle is allocated for sending to `rust_future_poll`. + * poll() is called, which calls the `rust_future_poll()` + * When rust calls the continuation function we can either: + * Consume the handle, complete the Python future, and cause `_uniffi_rust_call_async` to continue + * Run another poll in an futures.Executor thread (AKA BlockingTaskQueue). +""" +class UniffiPoll: + def __init__( + self, + eventloop: asyncio.AbstractEventLoop, + py_future: asyncio.Future, + rust_future: int, + ffi_poll: ctypes.CFUNCTYPE(None, ctypes.c_uint64, ctypes.c_int8, ctypes.c_uint64) + ): + self.eventloop = eventloop + self.py_future = py_future + self.rust_future = rust_future + self.ffi_poll = ffi_poll + self.handle = UNIFFI_POLL_HANDLE_MAP.new_handle(self) + + def poll(self, blocking_task_queue_handle=0): + self.ffi_poll(self.rust_future, _uniffi_continuation_callback, self.handle, blocking_task_queue_handle) + + def poll_in_executor(self, blocking_task_queue_handle): + executor = UNIFFI_BLOCKING_TASK_QUEUE_HANDLE_MAP.get(blocking_task_queue_handle) + executor.submit(self.poll, blocking_task_queue_handle) + + def complete_py_future(self, poll_code: int): + self.eventloop.call_soon_threadsafe(self._complete_py_future, poll_code) + + def _complete_py_future(self, poll_code: int): + if not self.py_future.cancelled(): + self.py_future.set_result(poll_code) # Stores futures for _uniffi_continuation_callback -_UNIFFI_CONTINUATION_HANDLE_MAP = UniffiHandleMap() +UNIFFI_POLL_HANDLE_MAP = UniffiHandleMap() # Continuation callback for async functions # lift the return value or error and resolve the future, causing the async function to resume. @_UNIFFI_FUTURE_CONTINUATION_T -def _uniffi_continuation_callback(handle, poll_code): - (eventloop, future) = _UNIFFI_CONTINUATION_HANDLE_MAP.consume_handle(handle) - eventloop.call_soon_threadsafe(_uniffi_set_future_result, future, poll_code) - -def _uniffi_set_future_result(future, poll_code): - if not future.cancelled(): - future.set_result(poll_code) +def _uniffi_continuation_callback(handle, poll_code, blocking_task_queue_handle): + if poll_code == UNIFFI_RUST_FUTURE_POLL_READY or blocking_task_queue_handle == 0: + poll = UNIFFI_POLL_HANDLE_MAP.consume_handle(handle) + poll.complete_py_future(poll_code) + elif blocking_task_queue_handle != 0: + poll = UNIFFI_POLL_HANDLE_MAP.get(handle) + poll.poll_in_executor(blocking_task_queue_handle) async def _uniffi_rust_call_async(rust_future, ffi_poll, ffi_complete, ffi_free, lift_func, error_ffi_converter): try: eventloop = asyncio.get_running_loop() - # Loop and poll until we see a _UNIFFI_RUST_FUTURE_POLL_READY value + # Loop and poll until we see a UNIFFI_RUST_FUTURE_POLL_READY value while True: - future = eventloop.create_future() - ffi_poll( - rust_future, - _uniffi_continuation_callback, - _UNIFFI_CONTINUATION_HANDLE_MAP.new_handle((eventloop, future)), - ) - poll_code = await future - if poll_code == _UNIFFI_RUST_FUTURE_POLL_READY: + py_future = eventloop.create_future() + UniffiPoll(eventloop, py_future, rust_future, ffi_poll).poll() + poll_code = await py_future + if poll_code == UNIFFI_RUST_FUTURE_POLL_READY: break return lift_func( diff --git a/uniffi_bindgen/src/bindings/python/templates/BlockingTaskQueueTemplate.py b/uniffi_bindgen/src/bindings/python/templates/BlockingTaskQueueTemplate.py new file mode 100644 index 0000000000..ab1222213b --- /dev/null +++ b/uniffi_bindgen/src/bindings/python/templates/BlockingTaskQueueTemplate.py @@ -0,0 +1,21 @@ +{{ self.add_import("concurrent.futures") }} + +UNIFFI_BLOCKING_TASK_QUEUE_HANDLE_MAP = UniffiHandleMap() +class {{ ffi_converter_name }}: + @staticmethod + def lift(handle: UniffiHandle) -> concurrent.futures.Executor: + return UNIFFI_BLOCKING_TASK_QUEUE_HANDLE_MAP.consume_handle(handle) + + @staticmethod + def lower(value: concurrent.futures.Executor) -> UniffiHandle: + if not isinstance(value, concurrent.futures.Executor): + raise TypeError("Expected concurrent.futures.Executor instance, {} found".format(type(value).__name__)) + return UNIFFI_BLOCKING_TASK_QUEUE_HANDLE_MAP.new_handle(value) + + @classmethod + def read(cls, buf: _UniffiRustBuffer): + return cls.lift(buf.read_u64()) + + @classmethod + def write(cls, value: {{ type_name }}, buf: _UniffiRustBuffer): + buf.write_u64(cls.lower(value)) diff --git a/uniffi_bindgen/src/bindings/python/templates/ForeignExecutorTemplate.py b/uniffi_bindgen/src/bindings/python/templates/ForeignExecutorTemplate.py deleted file mode 100644 index c4a28a03cf..0000000000 --- a/uniffi_bindgen/src/bindings/python/templates/ForeignExecutorTemplate.py +++ /dev/null @@ -1,63 +0,0 @@ -# FFI code for the ForeignExecutor type - -{{ self.add_import("asyncio") }} - -_UNIFFI_RUST_TASK_CALLBACK_SUCCESS = 0 -_UNIFFI_RUST_TASK_CALLBACK_CANCELLED = 1 -_UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS = 0 -_UNIFFI_FOREIGN_EXECUTOR_CALLBACK_CANCELED = 1 -_UNIFFI_FOREIGN_EXECUTOR_CALLBACK_ERROR = 2 - -class {{ ffi_converter_name }}: - _handle_map = UniffiHandleMap() - - @classmethod - def lower(cls, eventloop): - if not isinstance(eventloop, asyncio.BaseEventLoop): - raise TypeError("_uniffi_executor_callback: Expected EventLoop instance") - return cls._handle_map.new_handle(eventloop) - - @classmethod - def write(cls, eventloop, buf): - buf.write_u64(cls.lower(eventloop)) - - @classmethod - def read(cls, buf): - return cls.lift(buf.read_u64()) - - @classmethod - def lift(cls, value): - return cls._handle_map.get(value) - -@_UNIFFI_FOREIGN_EXECUTOR_CALLBACK_T -def _uniffi_executor_callback(handle, delay, task_ptr, task_data): - if task_ptr is None: - {{ ffi_converter_name }}._handle_map.consume_handle(handle) - return _UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS - else: - eventloop = {{ ffi_converter_name }}._handle_map.get(handle) - if eventloop.is_closed(): - return _UNIFFI_FOREIGN_EXECUTOR_CALLBACK_CANCELED - - callback = _UNIFFI_RUST_TASK(task_ptr) - # FIXME: there's no easy way to get a callback when an eventloop is closed. This means that - # if eventloop is called before the `call_soon_threadsafe()` calls are invoked, the call - # will never happen and we will probably leak a resource. - if delay == 0: - # This can be called from any thread, so make sure to use `call_soon_threadsafe' - eventloop.call_soon_threadsafe(callback, task_data, - _UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS) - else: - # For delayed tasks, we use `call_soon_threadsafe()` + `call_later()` to make the - # operation threadsafe - eventloop.call_soon_threadsafe(eventloop.call_later, delay / 1000.0, callback, - task_data, _UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS) - return _UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS - -# Register the callback with the scaffolding -{%- match ci.ffi_foreign_executor_callback_set() %} -{%- when Some with (fn) %} -_UniffiLib.{{ fn.name() }}(_uniffi_executor_callback) -{%- when None %} -{#- No foreign executor, we don't set anything #} -{% endmatch %} diff --git a/uniffi_bindgen/src/bindings/python/templates/HandleMap.py b/uniffi_bindgen/src/bindings/python/templates/HandleMap.py index 182e1d4ffc..baae3b7ddc 100644 --- a/uniffi_bindgen/src/bindings/python/templates/HandleMap.py +++ b/uniffi_bindgen/src/bindings/python/templates/HandleMap.py @@ -62,3 +62,6 @@ def consume_handle(self, handle: int) -> object: return self._map.pop(self._key(handle)) except KeyError: raise InternalError("handlemap key error: was the handle used after being freed?") + + def __len__(self) -> int: + return len(self._map) diff --git a/uniffi_bindgen/src/bindings/python/templates/Helpers.py b/uniffi_bindgen/src/bindings/python/templates/Helpers.py index 0569528377..445b54e39a 100644 --- a/uniffi_bindgen/src/bindings/python/templates/Helpers.py +++ b/uniffi_bindgen/src/bindings/python/templates/Helpers.py @@ -71,5 +71,4 @@ def _uniffi_check_call_status(error_ffi_converter, call_status): _UNIFFI_FOREIGN_CALLBACK_T = ctypes.CFUNCTYPE(ctypes.c_int, ctypes.c_ulonglong, ctypes.c_ulong, ctypes.POINTER(ctypes.c_char), ctypes.c_int, ctypes.POINTER(_UniffiRustBuffer)) # UniFFI future continuation -_UNIFFI_FUTURE_CONTINUATION_T = ctypes.CFUNCTYPE(None, ctypes.c_uint64, ctypes.c_int8) - +_UNIFFI_FUTURE_CONTINUATION_T = ctypes.CFUNCTYPE(None, ctypes.c_uint64, ctypes.c_int8, ctypes.c_uint64) diff --git a/uniffi_bindgen/src/bindings/python/templates/NamespaceLibraryTemplate.py b/uniffi_bindgen/src/bindings/python/templates/NamespaceLibraryTemplate.py index bfcc28be81..bfbd7e159c 100644 --- a/uniffi_bindgen/src/bindings/python/templates/NamespaceLibraryTemplate.py +++ b/uniffi_bindgen/src/bindings/python/templates/NamespaceLibraryTemplate.py @@ -1,26 +1,5 @@ # Define some ctypes FFI types that we use in the library -""" -ctypes type for the foreign executor callback. This is a built-in interface for scheduling -tasks - -Args: - executor: opaque c_size_t value representing the eventloop - delay: delay in ms - task: function pointer to the task callback - task_data: void pointer to the task callback data - -Normally we should call task(task_data) after the detail. -However, when task is NULL this indicates that Rust has dropped the ForeignExecutor and we should -decrease the EventLoop refcount. -""" -_UNIFFI_FOREIGN_EXECUTOR_CALLBACK_T = ctypes.CFUNCTYPE(ctypes.c_int8, ctypes.c_uint64, ctypes.c_uint32, ctypes.c_void_p, ctypes.c_void_p) - -""" -Function pointer for a Rust task, which a callback function that takes a opaque pointer -""" -_UNIFFI_RUST_TASK = ctypes.CFUNCTYPE(None, ctypes.c_void_p, ctypes.c_int8) - def _uniffi_future_callback_t(return_type): """ Factory function to create callback function types for async functions diff --git a/uniffi_bindgen/src/bindings/python/templates/Types.py b/uniffi_bindgen/src/bindings/python/templates/Types.py index 84afa6bbff..9acf2badb7 100644 --- a/uniffi_bindgen/src/bindings/python/templates/Types.py +++ b/uniffi_bindgen/src/bindings/python/templates/Types.py @@ -94,8 +94,8 @@ {%- when Type::External { name, module_path, namespace, kind, tagged } %} {%- include "ExternalTemplate.py" %} -{%- when Type::ForeignExecutor %} -{%- include "ForeignExecutorTemplate.py" %} +{%- when Type::BlockingTaskQueue %} +{%- include "BlockingTaskQueueTemplate.py" %} {%- else %} {%- endmatch %} diff --git a/uniffi_bindgen/src/bindings/python/templates/wrapper.py b/uniffi_bindgen/src/bindings/python/templates/wrapper.py index 024663ae5b..3fb7f7dd8a 100644 --- a/uniffi_bindgen/src/bindings/python/templates/wrapper.py +++ b/uniffi_bindgen/src/bindings/python/templates/wrapper.py @@ -13,6 +13,7 @@ # compile the rust component. The easiest way to ensure this is to bundle the Python # helpers directly inline like we're doing here. +from dataclasses import dataclass import os import sys import ctypes diff --git a/uniffi_bindgen/src/bindings/ruby/gen_ruby/mod.rs b/uniffi_bindgen/src/bindings/ruby/gen_ruby/mod.rs index 41f8a7506e..4e7fd41096 100644 --- a/uniffi_bindgen/src/bindings/ruby/gen_ruby/mod.rs +++ b/uniffi_bindgen/src/bindings/ruby/gen_ruby/mod.rs @@ -57,7 +57,7 @@ pub fn canonical_name(t: &Type) -> String { Type::CallbackInterface { name, .. } => format!("CallbackInterface{name}"), Type::Timestamp => "Timestamp".into(), Type::Duration => "Duration".into(), - Type::ForeignExecutor => "ForeignExecutor".into(), + Type::BlockingTaskQueue => "BlockingTaskQueue".into(), // Recursive types. // These add a prefix to the name of the underlying type. // The component API definition cannot give names to recursive types, so as long as the @@ -156,9 +156,6 @@ mod filters { // Callback interfaces are not yet implemented, but this needs to return something in // order for the coverall tests to pass. FfiType::ForeignCallback => ":pointer".to_string(), - FfiType::ForeignExecutorCallback => { - unimplemented!("Foreign executors are not implemented") - } FfiType::RustFutureContinuationCallback => { unimplemented!("Async functions are not implemented") } @@ -261,7 +258,7 @@ mod filters { } Type::External { .. } => panic!("No support for external types, yet"), Type::Custom { .. } => panic!("No support for custom types, yet"), - Type::ForeignExecutor => unimplemented!("Foreign executors are not implemented"), + Type::BlockingTaskQueue => unimplemented!("BlockingTaskQueue are not implemented"), }) } @@ -297,7 +294,7 @@ mod filters { ), Type::External { .. } => panic!("No support for lowering external types, yet"), Type::Custom { .. } => panic!("No support for lowering custom types, yet"), - Type::ForeignExecutor => unimplemented!("Foreign executors are not implemented"), + Type::BlockingTaskQueue => unimplemented!("BlockingTaskQueue are not implemented"), }) } @@ -338,7 +335,7 @@ mod filters { ), Type::External { .. } => panic!("No support for lifting external types, yet"), Type::Custom { .. } => panic!("No support for lifting custom types, yet"), - Type::ForeignExecutor => unimplemented!("Foreign executors are not implemented"), + Type::BlockingTaskQueue => unimplemented!("BlockingTaskQueue are not implemented"), }) } } diff --git a/uniffi_bindgen/src/bindings/swift/gen_swift/executor.rs b/uniffi_bindgen/src/bindings/swift/gen_swift/blocking_task_queue.rs similarity index 51% rename from uniffi_bindgen/src/bindings/swift/gen_swift/executor.rs rename to uniffi_bindgen/src/bindings/swift/gen_swift/blocking_task_queue.rs index 73a373d8d9..ee79fc476b 100644 --- a/uniffi_bindgen/src/bindings/swift/gen_swift/executor.rs +++ b/uniffi_bindgen/src/bindings/swift/gen_swift/blocking_task_queue.rs @@ -5,19 +5,15 @@ use crate::backend::CodeType; #[derive(Debug)] -pub struct ForeignExecutorCodeType; +pub struct BlockingTaskQueueCodeType; -impl CodeType for ForeignExecutorCodeType { +impl CodeType for BlockingTaskQueueCodeType { fn type_label(&self) -> String { - // On Swift, we define a struct to represent a ForeignExecutor - "UniFfiForeignExecutor".into() + // On Swift, we use a DispatchQueue for BlockingTaskQueue + "DispatchQueue".into() } fn canonical_name(&self) -> String { - "ForeignExecutor".into() - } - - fn initialization_fn(&self) -> Option { - Some("uniffiInitForeignExecutor".into()) + "BlockingTaskQueue".into() } } diff --git a/uniffi_bindgen/src/bindings/swift/gen_swift/mod.rs b/uniffi_bindgen/src/bindings/swift/gen_swift/mod.rs index 87b3651728..651e6b60f3 100644 --- a/uniffi_bindgen/src/bindings/swift/gen_swift/mod.rs +++ b/uniffi_bindgen/src/bindings/swift/gen_swift/mod.rs @@ -17,11 +17,11 @@ use crate::backend::{CodeType, TemplateExpression}; use crate::interface::*; use crate::BindingsConfig; +mod blocking_task_queue; mod callback_interface; mod compounds; mod custom; mod enum_; -mod executor; mod external; mod miscellany; mod object; @@ -403,7 +403,7 @@ impl SwiftCodeOracle { Type::CallbackInterface { name, .. } => { Box::new(callback_interface::CallbackInterfaceCodeType::new(name)) } - Type::ForeignExecutor => Box::new(executor::ForeignExecutorCodeType), + Type::BlockingTaskQueue => Box::new(blocking_task_queue::BlockingTaskQueueCodeType), Type::Optional { inner_type } => { Box::new(compounds::OptionalCodeType::new(*inner_type)) } @@ -464,16 +464,13 @@ impl SwiftCodeOracle { FfiType::RustBuffer(_) => "RustBuffer".into(), FfiType::ForeignBytes => "ForeignBytes".into(), FfiType::ForeignCallback => "ForeignCallback".into(), - FfiType::ForeignExecutorCallback => "ForeignExecutorCallback".into(), FfiType::RustFutureContinuationCallback => "UniFfiRustFutureContinuation".into(), } } fn ffi_type_label(&self, ffi_type: &FfiType) -> String { match ffi_type { - FfiType::ForeignCallback - | FfiType::ForeignExecutorCallback - | FfiType::RustFutureContinuationCallback => { + FfiType::ForeignCallback | FfiType::RustFutureContinuationCallback => { format!("{} _Nonnull", self.ffi_type_label_raw(ffi_type)) } _ => self.ffi_type_label_raw(ffi_type), @@ -574,7 +571,6 @@ pub mod filters { FfiType::RustBuffer(_) => "RustBuffer".into(), FfiType::ForeignBytes => "ForeignBytes".into(), FfiType::ForeignCallback => "ForeignCallback _Nonnull".into(), - FfiType::ForeignExecutorCallback => "UniFfiForeignExecutorCallback _Nonnull".into(), FfiType::RustFutureContinuationCallback => { "UniFfiRustFutureContinuation _Nonnull".into() } diff --git a/uniffi_bindgen/src/bindings/swift/templates/Async.swift b/uniffi_bindgen/src/bindings/swift/templates/Async.swift index f7446ca5e0..00aedff07b 100644 --- a/uniffi_bindgen/src/bindings/swift/templates/Async.swift +++ b/uniffi_bindgen/src/bindings/swift/templates/Async.swift @@ -1,11 +1,75 @@ private let UNIFFI_RUST_FUTURE_POLL_READY: Int8 = 0 private let UNIFFI_RUST_FUTURE_POLL_MAYBE_READY: Int8 = 1 -fileprivate var UNIFFI_CONTINUATION_HANDLE_MAP = UniffiHandleMap>() +/** + * In-progress poll of a RustFuture + * + * Lifecycle of UniffiPoll: + * * UniffiPoll is constructed with a Swift continuation that `uniffiRustCallAsync` awaits + * * As part of the construction, a new handle is allocated for sending to `rust_future_poll`. + * * poll() is called, which calls the `rust_future_poll()` + * * When rust calls the continuation function we can either: + * * Consume the handle, complete the continuation, and cause `uniffiRustCallAsync` to continue + * * Run another poll in an CoroutineContext (AKA BlockingTaskQueue). + */ +fileprivate class UniffiPoll { + let continuation: UnsafeContinuation + let rustFuture: UInt64 + let pollFunc: (UInt64, @escaping UniFfiRustFutureContinuation, UInt64, UInt64) -> () + var handle: UInt64 + + init( + continuation: UnsafeContinuation, + rustFuture: UInt64, + pollFunc: @escaping (UInt64, @escaping UniFfiRustFutureContinuation, UInt64, UInt64) -> () + ) { + self.continuation = continuation + self.rustFuture = rustFuture + self.pollFunc = pollFunc + self.handle = 0 + } + + fileprivate func createHandle() { + self.handle = UNIFFI_POLL_HANDLE_MAP.newHandle(obj: self) + } + + fileprivate func poll(_ BlockingTaskQueueHandle: UInt64 = 0) { + pollFunc(rustFuture, uniffiFutureContinuationCallback, handle, BlockingTaskQueueHandle) + } + + fileprivate func pollInDispatchQueue(_ BlockingTaskQueueHandle: UInt64) { + let queue = UNIFFI_BLOCKING_TASK_QUEUE_HANDLE_MAP.get(handle: BlockingTaskQueueHandle) + queue.async { + self.poll(BlockingTaskQueueHandle) + } + } + + fileprivate func completeContinuation(_ pollResult: Int8) { + continuation.resume(returning: pollResult) + } +} + +fileprivate var UNIFFI_POLL_HANDLE_MAP = UniffiHandleMap() + +// Callback handlers for an async calls. These are invoked by Rust when the future is ready. They +// lift the return value or error and resume the suspended function. +fileprivate func uniffiFutureContinuationCallback( + handle: UInt64, + pollResult: Int8, + BlockingTaskQueueHandle: UInt64 +) { + if (pollResult == UNIFFI_RUST_FUTURE_POLL_READY) || (BlockingTaskQueueHandle == 0) { + let poll = UNIFFI_POLL_HANDLE_MAP.consumeHandle(handle: handle) + poll.completeContinuation(pollResult) + } else { + let poll = UNIFFI_POLL_HANDLE_MAP.get(handle: handle) + poll.pollInDispatchQueue(BlockingTaskQueueHandle) + } +} fileprivate func uniffiRustCallAsync( rustFutureFunc: () -> UInt64, - pollFunc: (UInt64, @escaping UniFfiRustFutureContinuation, UInt64) -> (), + pollFunc: @escaping (UInt64, @escaping UniFfiRustFutureContinuation, UInt64, UInt64) -> (), completeFunc: (UInt64, UnsafeMutablePointer) -> F, freeFunc: (UInt64) -> (), liftFunc: (F) throws -> T, @@ -21,7 +85,9 @@ fileprivate func uniffiRustCallAsync( var pollResult: Int8; repeat { pollResult = await withUnsafeContinuation { - pollFunc(rustFuture, uniffiFutureContinuationCallback, UNIFFI_CONTINUATION_HANDLE_MAP.newHandle(obj: $0)) + let poll = UniffiPoll(continuation: $0, rustFuture: rustFuture, pollFunc: pollFunc) + poll.createHandle() + poll.poll() } } while pollResult != UNIFFI_RUST_FUTURE_POLL_READY @@ -31,8 +97,7 @@ fileprivate func uniffiRustCallAsync( )) } -// Callback handlers for an async calls. These are invoked by Rust when the future is ready. They -// lift the return value or error and resume the suspended function. -fileprivate func uniffiFutureContinuationCallback(handle: UInt64, pollResult: Int8) { - UNIFFI_CONTINUATION_HANDLE_MAP.consumeHandle(handle: handle).resume(returning: pollResult) +// For testing +public func uniffiPollHandleCount() -> Int { + UNIFFI_POLL_HANDLE_MAP.count } diff --git a/uniffi_bindgen/src/bindings/swift/templates/BlockingTaskQueueTemplate.swift b/uniffi_bindgen/src/bindings/swift/templates/BlockingTaskQueueTemplate.swift new file mode 100644 index 0000000000..c4b80f05e8 --- /dev/null +++ b/uniffi_bindgen/src/bindings/swift/templates/BlockingTaskQueueTemplate.swift @@ -0,0 +1,28 @@ +fileprivate var UNIFFI_BLOCKING_TASK_QUEUE_HANDLE_MAP = UniffiHandleMap() + +public struct {{ ffi_converter_name }}: FfiConverter { + + typealias FfiType = UInt64 + typealias SwiftType = DispatchQueue + + public static func lift(_ handle: UInt64) throws -> DispatchQueue { + return UNIFFI_BLOCKING_TASK_QUEUE_HANDLE_MAP.consumeHandle(handle: handle) + } + + public static func lower(_ value: DispatchQueue) -> UInt64 { + return UNIFFI_BLOCKING_TASK_QUEUE_HANDLE_MAP.newHandle(obj: value) + } + + public static func read(from buf: inout (data: Data, offset: Data.Index)) throws -> DispatchQueue { + return try lift(try readInt(&buf)) + } + + public static func write(_ value: DispatchQueue, into buf: inout [UInt8]) { + writeInt(&buf, lower(value)) + } +} + +// For testing +public func uniffiBlockingTaskQueueHandleCount() -> Int { + UNIFFI_BLOCKING_TASK_QUEUE_HANDLE_MAP.count +} diff --git a/uniffi_bindgen/src/bindings/swift/templates/BridgingHeaderTemplate.h b/uniffi_bindgen/src/bindings/swift/templates/BridgingHeaderTemplate.h index 587eab591b..374d8455c7 100644 --- a/uniffi_bindgen/src/bindings/swift/templates/BridgingHeaderTemplate.h +++ b/uniffi_bindgen/src/bindings/swift/templates/BridgingHeaderTemplate.h @@ -31,18 +31,6 @@ typedef struct RustBuffer typedef int32_t (*ForeignCallback)(uint64_t, int32_t, const uint8_t *_Nonnull, int32_t, RustBuffer *_Nonnull); -// Task defined in Rust that Swift executes -typedef void (*UniFfiRustTaskCallback)(const void * _Nullable, int8_t); - -// Callback to execute Rust tasks using a Swift Task -// -// Args: -// executor: ForeignExecutor lowered into a size_t value -// delay: Delay in MS -// task: UniFfiRustTaskCallback to call -// task_data: data to pass the task callback -typedef int8_t (*UniFfiForeignExecutorCallback)(size_t, uint32_t, UniFfiRustTaskCallback _Nullable, const void * _Nullable); - typedef struct ForeignBytes { int32_t len; @@ -60,7 +48,7 @@ typedef struct RustCallStatus { #endif // def UNIFFI_SHARED_H // Continuation callback for UniFFI Futures -typedef void (*UniFfiRustFutureContinuation)(uint64_t, int8_t); +typedef void (*UniFfiRustFutureContinuation)(uint64_t, int8_t, uint64_t); // Scaffolding functions {%- for func in ci.iter_ffi_function_definitions() %} diff --git a/uniffi_bindgen/src/bindings/swift/templates/ForeignExecutorTemplate.swift b/uniffi_bindgen/src/bindings/swift/templates/ForeignExecutorTemplate.swift deleted file mode 100644 index 167e4c7546..0000000000 --- a/uniffi_bindgen/src/bindings/swift/templates/ForeignExecutorTemplate.swift +++ /dev/null @@ -1,69 +0,0 @@ -private let UNIFFI_RUST_TASK_CALLBACK_SUCCESS: Int8 = 0 -private let UNIFFI_RUST_TASK_CALLBACK_CANCELLED: Int8 = 1 -private let UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS: Int8 = 0 -private let UNIFFI_FOREIGN_EXECUTOR_CALLBACK_CANCELED: Int8 = 1 -private let UNIFFI_FOREIGN_EXECUTOR_CALLBACK_ERROR: Int8 = 2 - -// Encapsulates an executor that can run Rust tasks -// -// On Swift, `Task.detached` can handle this we just need to know what priority to send it. -public struct UniFfiForeignExecutor { - var priority: TaskPriority - - public init(priority: TaskPriority) { - self.priority = priority - } - - public init() { - self.priority = Task.currentPriority - } -} - -fileprivate struct FfiConverterForeignExecutor: FfiConverter { - typealias SwiftType = UniFfiForeignExecutor - // Rust uses a pointer to represent the FfiConverterForeignExecutor, but we only need a u8. - // let's use `Int`, which is equivalent to `size_t` - typealias FfiType = Int - - public static func lift(_ value: FfiType) throws -> SwiftType { - UniFfiForeignExecutor(priority: TaskPriority(rawValue: numericCast(value))) - } - public static func lower(_ value: SwiftType) -> FfiType { - numericCast(value.priority.rawValue) - } - - public static func read(from buf: inout (data: Data, offset: Data.Index)) throws -> SwiftType { - fatalError("FfiConverterForeignExecutor.read not implemented yet") - } - public static func write(_ value: SwiftType, into buf: inout [UInt8]) { - fatalError("FfiConverterForeignExecutor.read not implemented yet") - } -} - - -fileprivate func uniffiForeignExecutorCallback(executorHandle: Int, delayMs: UInt32, rustTask: UniFfiRustTaskCallback?, taskData: UnsafeRawPointer?) -> Int8 { - if let rustTask = rustTask { - let executor = try! FfiConverterForeignExecutor.lift(executorHandle) - Task.detached(priority: executor.priority) { - if delayMs != 0 { - let nanoseconds: UInt64 = numericCast(delayMs * 1000000) - try! await Task.sleep(nanoseconds: nanoseconds) - } - rustTask(taskData, UNIFFI_RUST_TASK_CALLBACK_SUCCESS) - } - return UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS - } else { - // When rustTask is null, we should drop the foreign executor. - // However, since its just a value type, we don't need to do anything here. - return UNIFFI_FOREIGN_EXECUTOR_CALLBACK_SUCCESS - } -} - -fileprivate func uniffiInitForeignExecutor() { - {%- match ci.ffi_foreign_executor_callback_set() %} - {%- when Some with (fn) %} - {{ fn.name() }}(uniffiForeignExecutorCallback) - {%- when None %} - {#- No foreign executor, we don't set anything #} - {% endmatch %} -} diff --git a/uniffi_bindgen/src/bindings/swift/templates/HandleMap.swift b/uniffi_bindgen/src/bindings/swift/templates/HandleMap.swift index b294eddd7f..c94b526450 100644 --- a/uniffi_bindgen/src/bindings/swift/templates/HandleMap.swift +++ b/uniffi_bindgen/src/bindings/swift/templates/HandleMap.swift @@ -18,6 +18,12 @@ fileprivate class UniffiHandleMap { // Note: foreign handles are always odd private var keyCounter: UInt64 = 1 + var count: Int { + get { + map.count + } + } + private func nextKey() -> UInt64 { let key = keyCounter keyCounter = (keyCounter + 2) & 0xFFFF_FFFF_FFFF diff --git a/uniffi_bindgen/src/bindings/swift/templates/Types.swift b/uniffi_bindgen/src/bindings/swift/templates/Types.swift index aba34f4b0b..ba4d2059c8 100644 --- a/uniffi_bindgen/src/bindings/swift/templates/Types.swift +++ b/uniffi_bindgen/src/bindings/swift/templates/Types.swift @@ -64,8 +64,8 @@ {%- when Type::CallbackInterface { name, module_path } %} {%- include "CallbackInterfaceTemplate.swift" %} -{%- when Type::ForeignExecutor %} -{%- include "ForeignExecutorTemplate.swift" %} +{%- when Type::BlockingTaskQueue %} +{%- include "BlockingTaskQueueTemplate.swift" %} {%- when Type::Custom { name, module_path, builtin } %} {%- include "CustomType.swift" %} diff --git a/uniffi_bindgen/src/interface/ffi.rs b/uniffi_bindgen/src/interface/ffi.rs index de8db86334..0b8d73d514 100644 --- a/uniffi_bindgen/src/interface/ffi.rs +++ b/uniffi_bindgen/src/interface/ffi.rs @@ -46,8 +46,6 @@ pub enum FfiType { ForeignBytes, /// Pointer to a callback function that handles all callbacks on the foreign language side. ForeignCallback, - /// Pointer to the callback function that's invoked to schedule calls with a ForeignExecutor - ForeignExecutorCallback, /// Continuation function for a Rust future RustFutureContinuationCallback, // TODO: you can imagine a richer structural typesystem here, e.g. `Ref` or something. @@ -84,7 +82,7 @@ impl From<&Type> for FfiType { // Object types interfaces are passed as opaque handles. Type::Object { .. } | Type::CallbackInterface { .. } - | Type::ForeignExecutor + | Type::BlockingTaskQueue | Type::External { kind: ExternalKind::Interface, .. diff --git a/uniffi_bindgen/src/interface/mod.rs b/uniffi_bindgen/src/interface/mod.rs index 99420a1504..7b88613dc7 100644 --- a/uniffi_bindgen/src/interface/mod.rs +++ b/uniffi_bindgen/src/interface/mod.rs @@ -463,6 +463,10 @@ impl ComponentInterface { name: "continuation_data".to_owned(), type_: FfiType::Handle, }, + FfiArgument { + name: "blocking_task_queue".to_owned(), + type_: FfiType::UInt64, + }, ], return_type: None, has_rust_call_status_arg: false, @@ -572,7 +576,6 @@ impl ComponentInterface { .chain(self.iter_rust_buffer_ffi_function_definitions()) .chain(self.iter_futures_ffi_function_definitons()) .chain(self.iter_checksum_ffi_functions()) - .chain(self.ffi_foreign_executor_callback_set()) .chain([self.ffi_uniffi_contract_version()]) } @@ -651,27 +654,6 @@ impl ComponentInterface { }) } - /// The ffi_foreign_executor_callback_set FFI function - /// - /// We only include this in the FFI if the `ForeignExecutor` type is actually used - pub fn ffi_foreign_executor_callback_set(&self) -> Option { - if self.types.contains(&Type::ForeignExecutor) { - Some(FfiFunction { - name: format!("ffi_{}_foreign_executor_callback_set", self.ffi_namespace()), - arguments: vec![FfiArgument { - name: "callback".into(), - type_: FfiType::ForeignExecutorCallback, - }], - return_type: None, - is_async: false, - has_rust_call_status_arg: false, - is_object_free_function: false, - }) - } else { - None - } - } - /// List all API checksums to check /// /// Returns a list of (export_symbol_name, checksum) items @@ -777,6 +759,12 @@ impl ComponentInterface { bail!("Conflicting type definition for \"{}\"", defn.name()); } self.types.add_known_types(defn.iter_types())?; + // Add the BlockingTaskQueue if there are any async functions. + // This isn't strictly necessary, but it simplifies the template code. + if defn.is_async() { + self.types + .add_known_type(&uniffi_meta::Type::BlockingTaskQueue)?; + } self.functions.push(defn); Ok(()) diff --git a/uniffi_bindgen/src/interface/universe.rs b/uniffi_bindgen/src/interface/universe.rs index e69d86e44f..14c7286015 100644 --- a/uniffi_bindgen/src/interface/universe.rs +++ b/uniffi_bindgen/src/interface/universe.rs @@ -83,8 +83,8 @@ impl TypeUniverse { Type::Bytes => self.add_type_definition("bytes", type_)?, Type::Timestamp => self.add_type_definition("timestamp", type_)?, Type::Duration => self.add_type_definition("duration", type_)?, - Type::ForeignExecutor => { - self.add_type_definition("ForeignExecutor", type_)?; + Type::BlockingTaskQueue => { + self.add_type_definition("BlockingTaskQueue", type_)?; } Type::Object { name, .. } | Type::Record { name, .. } @@ -119,6 +119,7 @@ impl TypeUniverse { } /// Check if a [Type] is present + #[cfg(test)] pub fn contains(&self, type_: &Type) -> bool { self.all_known_types.contains(type_) } diff --git a/uniffi_bindgen/src/scaffolding/mod.rs b/uniffi_bindgen/src/scaffolding/mod.rs index f3759cf6fa..f75b87bd21 100644 --- a/uniffi_bindgen/src/scaffolding/mod.rs +++ b/uniffi_bindgen/src/scaffolding/mod.rs @@ -45,7 +45,7 @@ mod filters { format!("std::sync::Arc<{}>", imp.rust_name_for(name)) } Type::CallbackInterface { name, .. } => format!("Box"), - Type::ForeignExecutor => "::uniffi::ForeignExecutor".into(), + Type::BlockingTaskQueue => "::uniffi::BlockingTaskQueue".into(), Type::Optional { inner_type } => { format!("std::option::Option<{}>", type_rs(inner_type)?) } diff --git a/uniffi_core/src/ffi/foreigncallbacks.rs b/uniffi_core/src/ffi/foreigncallbacks.rs index 1746489e30..43c7f3ecfa 100644 --- a/uniffi_core/src/ffi/foreigncallbacks.rs +++ b/uniffi_core/src/ffi/foreigncallbacks.rs @@ -10,7 +10,7 @@ use std::sync::atomic::{AtomicUsize, Ordering}; -use crate::{Handle, RustBuffer, RustTaskCallback}; +use crate::{Handle, RustBuffer}; /// ForeignCallback is the Rust representation of a foreign language function. /// It is the basis for all callbacks interfaces. It is registered exactly once per callback interface, @@ -37,31 +37,9 @@ pub type ForeignCallback = unsafe extern "C" fn( buf_ptr: *mut RustBuffer, ) -> i32; -/// Callback to schedule a Rust call with a `ForeignExecutor`. The bindings code registers exactly -/// one of these with the Rust code. -/// -/// Delay is an approximate amount of ms to wait before scheduling the call. Delay is usually 0, -/// which means schedule sometime soon. -/// -/// As a special case, when Rust drops the foreign executor, with `task=null`. The foreign -/// bindings should release the reference to the executor that was reserved for Rust. -/// -/// This callback can be invoked from any thread, including threads created by Rust. -/// -/// The callback should return one of the `ForeignExecutorCallbackResult` values. -pub type ForeignExecutorCallback = extern "C" fn( - executor: Handle, - delay: u32, - task: Option, - task_data: *const (), -) -> i8; - /// Store a [ForeignCallback] pointer pub(crate) struct ForeignCallbackCell(AtomicUsize); -/// Store a [ForeignExecutorCallback] pointer -pub(crate) struct ForeignExecutorCallbackCell(AtomicUsize); - /// Macro to define foreign callback types as well as the callback cell. macro_rules! impl_foreign_callback_cell { ($callback_type:ident, $cell_type:ident) => { @@ -100,4 +78,3 @@ macro_rules! impl_foreign_callback_cell { } impl_foreign_callback_cell!(ForeignCallback, ForeignCallbackCell); -impl_foreign_callback_cell!(ForeignExecutorCallback, ForeignExecutorCallbackCell); diff --git a/uniffi_core/src/ffi/foreignexecutor.rs b/uniffi_core/src/ffi/foreignexecutor.rs deleted file mode 100644 index 3f0f826644..0000000000 --- a/uniffi_core/src/ffi/foreignexecutor.rs +++ /dev/null @@ -1,473 +0,0 @@ -/* This Source Code Form is subject to the terms of the Mozilla Public - * License, v. 2.0. If a copy of the MPL was not distributed with this - * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ - -//! Schedule tasks using a foreign executor. - -use std::panic; - -use crate::{ForeignExecutorCallback, ForeignExecutorCallbackCell, Handle}; - -/// Result code returned by `ForeignExecutorCallback` -#[repr(i8)] -#[derive(Debug, PartialEq, Eq)] -pub enum ForeignExecutorCallbackResult { - /// Callback was scheduled successfully - Success = 0, - /// Callback couldn't be scheduled because the foreign executor is canceled/closed. - Cancelled = 1, - /// Callback couldn't be scheduled because of some other error - Error = 2, -} - -impl ForeignExecutorCallbackResult { - /// Check the result code for the foreign executor callback - /// - /// If the result was `ForeignExecutorCallbackResult.Success`, this method returns `true`. - /// - /// If not, this method returns `false`, logging errors for any unexpected return values - pub fn check_result_code(result: i8) -> bool { - match result { - n if n == ForeignExecutorCallbackResult::Success as i8 => true, - n if n == ForeignExecutorCallbackResult::Cancelled as i8 => false, - n if n == ForeignExecutorCallbackResult::Error as i8 => { - log::error!( - "ForeignExecutorCallbackResult::Error returned by foreign executor callback" - ); - false - } - n => { - log::error!("Unknown code ({n}) returned by foreign executor callback"); - false - } - } - } -} - -// Option should use the null pointer optimization and be represented in C as a -// regular pointer. Let's check that. -static_assertions::assert_eq_size!(usize, Option); - -/// Callback for a Rust task, this is what the foreign executor invokes -/// -/// The task will be passed the `task_data` passed to `ForeignExecutorCallback` in addition to one -/// of the `RustTaskCallbackCode` values. -pub type RustTaskCallback = extern "C" fn(*const (), RustTaskCallbackCode); - -/// Passed to a `RustTaskCallback` function when the executor invokes them. -/// -/// Every `RustTaskCallback` will be invoked eventually, this code is used to distinguish the times -/// when it's invoked successfully vs times when the callback is being called because the foreign -/// executor has been cancelled / shutdown -#[repr(i8)] -#[derive(Debug, PartialEq, Eq)] -pub enum RustTaskCallbackCode { - /// Successful task callback invocation - Success = 0, - /// The `ForeignExecutor` has been cancelled. - /// - /// This signals that any progress using the executor should be halted. In particular, Futures - /// should not continue to progress. - Cancelled = 1, -} - -static FOREIGN_EXECUTOR_CALLBACK: ForeignExecutorCallbackCell = ForeignExecutorCallbackCell::new(); - -/// Set the global ForeignExecutorCallback. This is called by the foreign bindings, normally -/// during initialization. -pub fn foreign_executor_callback_set(callback: ForeignExecutorCallback) { - FOREIGN_EXECUTOR_CALLBACK.set(callback); -} - -/// Schedule Rust calls using a foreign executor -#[derive(Debug)] -pub struct ForeignExecutor { - pub(crate) handle: Handle, -} - -impl ForeignExecutor { - pub fn new(executor: Handle) -> Self { - Self { handle: executor } - } - - /// Schedule a closure to be run. - /// - /// This method can be used for "fire-and-forget" style calls, where the calling code doesn't - /// need to await the result. - /// - /// Closure requirements: - /// - Send: since the closure will likely run on a different thread - /// - 'static: since it runs at an arbitrary time, so all references need to be 'static - /// - panic::UnwindSafe: if the closure panics, it should not corrupt any data - pub fn schedule(&self, delay: u32, task: F) { - let leaked_ptr: *mut F = Box::leak(Box::new(task)); - if !schedule_raw( - self.handle, - delay, - schedule_callback::, - leaked_ptr as *const (), - ) { - // If schedule_raw() failed, drop the leaked box since `schedule_callback()` has not been - // scheduled to run. - unsafe { - drop(Box::::from_raw(leaked_ptr)); - }; - } - } - - /// Schedule a closure to be run and get a Future for the result - /// - /// Closure requirements: - /// - Send: since the closure will likely run on a different thread - /// - 'static: since it runs at an arbitrary time, so all references need to be 'static - /// - panic::UnwindSafe: if the closure panics, it should not corrupt any data - pub async fn run(&self, delay: u32, closure: F) -> T - where - F: FnOnce() -> T + Send + 'static + panic::UnwindSafe, - T: Send + 'static, - { - // Create a oneshot channel to handle the future - let (sender, receiver) = oneshot::channel(); - // We can use `AssertUnwindSafe` here because: - // - The closure is unwind safe - // - `Sender` is not marked unwind safe, maybe this is just an oversight in the oneshot - // library. However, calling `send()` and dropping the Sender should certainly be - // unwind safe. `send()` should probably not panic at all and if it does it shouldn't - // do it in a way that breaks the Receiver. - // - Calling `expect` may result in a panic, but this should should not break either the - // Sender or Receiver. - self.schedule( - delay, - panic::AssertUnwindSafe(move || { - sender.send(closure()).expect("Error sending future result") - }), - ); - receiver.await.expect("Error receiving future result") - } -} - -/// Low-level schedule interface -/// -/// When using this function, take care to ensure that the `ForeignExecutor` that holds the -/// `Handle` has not been dropped. -/// -/// Returns true if the callback was successfully scheduled -pub(crate) fn schedule_raw( - handle: Handle, - delay: u32, - callback: RustTaskCallback, - data: *const (), -) -> bool { - let result_code = (FOREIGN_EXECUTOR_CALLBACK.get())(handle, delay, Some(callback), data); - ForeignExecutorCallbackResult::check_result_code(result_code) -} - -impl Drop for ForeignExecutor { - fn drop(&mut self) { - (FOREIGN_EXECUTOR_CALLBACK.get())(self.handle, 0, None, std::ptr::null()); - } -} - -extern "C" fn schedule_callback(data: *const (), status_code: RustTaskCallbackCode) -where - F: FnOnce() + Send + 'static + panic::UnwindSafe, -{ - // No matter what, we need to call Box::from_raw() to balance the Box::leak() call. - let task = unsafe { Box::from_raw(data as *mut F) }; - // Skip running the task for the `RustTaskCallbackCode::Cancelled` code - if status_code == RustTaskCallbackCode::Success { - run_task(task); - } -} - -/// Run a scheduled task, catching any panics. -/// -/// If there are panics, then we will log a warning and return None. -fn run_task T + panic::UnwindSafe, T>(task: F) -> Option { - match panic::catch_unwind(task) { - Ok(v) => Some(v), - Err(cause) => { - let message = if let Some(s) = cause.downcast_ref::<&'static str>() { - (*s).to_string() - } else if let Some(s) = cause.downcast_ref::() { - s.clone() - } else { - "Unknown panic!".to_string() - }; - log::warn!("Error calling UniFFI callback function: {message}"); - None - } - } -} - -#[cfg(test)] -pub use test::MockEventLoop; - -#[cfg(test)] -mod test { - use super::*; - use crate::HandleAlloc; - use std::{ - future::Future, - pin::Pin, - sync::{ - atomic::{AtomicU32, Ordering}, - Arc, Mutex, Once, - }, - task::{Context, Poll, Wake, Waker}, - }; - - /// Simulate an event loop / task queue / coroutine scope on the foreign side - /// - /// This simply collects scheduled calls into a Vec for testing purposes. - /// - /// Most of the MockEventLoop methods are `pub` since it's also used by the `rustfuture` tests. - pub struct MockEventLoop { - // Wrap everything in a mutex since we typically share access to MockEventLoop via an Arc - inner: Mutex, - } - - pub struct MockEventLoopInner { - // calls that have been scheduled - calls: Vec<(u32, Option, *const ())>, - // has the event loop been shutdown? - is_shutdown: bool, - } - - unsafe impl Send for MockEventLoopInner {} - - static FOREIGN_EXECUTOR_CALLBACK_INIT: Once = Once::new(); - - impl MockEventLoop { - pub fn new() -> Arc { - // Make sure we install a foreign executor callback that can deal with mock event loops - FOREIGN_EXECUTOR_CALLBACK_INIT - .call_once(|| foreign_executor_callback_set(mock_executor_callback)); - - Arc::new(Self { - inner: Mutex::new(MockEventLoopInner { - calls: vec![], - is_shutdown: false, - }), - }) - } - - /// Create a new Handle - pub fn new_handle(self: &Arc) -> Handle { - >::new_handle(Arc::clone(self)) - } - - pub fn new_executor(self: &Arc) -> ForeignExecutor { - ForeignExecutor { - handle: self.new_handle(), - } - } - - /// Get the current number of scheduled calls - pub fn call_count(&self) -> usize { - self.inner.lock().unwrap().calls.len() - } - - /// Get the last scheduled call - pub fn last_call(&self) -> (u32, Option, *const ()) { - self.inner - .lock() - .unwrap() - .calls - .last() - .cloned() - .expect("no calls scheduled") - } - - /// Run all currently scheduled calls - pub fn run_all_calls(&self) { - let mut inner = self.inner.lock().unwrap(); - let is_shutdown = inner.is_shutdown; - for (_delay, callback, data) in inner.calls.drain(..) { - if !is_shutdown { - callback.unwrap()(data, RustTaskCallbackCode::Success); - } else { - callback.unwrap()(data, RustTaskCallbackCode::Cancelled); - } - } - } - - /// Shutdown the eventloop, causing scheduled calls and future calls to be cancelled - pub fn shutdown(&self) { - self.inner.lock().unwrap().is_shutdown = true; - } - } - - // `ForeignExecutorCallback` that we install for testing - extern "C" fn mock_executor_callback( - handle: Handle, - delay: u32, - task: Option, - task_data: *const (), - ) -> i8 { - let eventloop = >::get_arc(handle); - let mut inner = eventloop.inner.lock().unwrap(); - if inner.is_shutdown { - ForeignExecutorCallbackResult::Cancelled as i8 - } else { - inner.calls.push((delay, task, task_data)); - ForeignExecutorCallbackResult::Success as i8 - } - } - - #[test] - fn test_schedule_raw() { - extern "C" fn callback(data: *const (), _status_code: RustTaskCallbackCode) { - unsafe { - *(data as *mut u32) += 1; - } - } - - let eventloop = MockEventLoop::new(); - - let value: u32 = 0; - assert_eq!(eventloop.call_count(), 0); - - schedule_raw( - eventloop.new_handle(), - 0, - callback, - &value as *const u32 as *const (), - ); - assert_eq!(eventloop.call_count(), 1); - assert_eq!(value, 0); - - eventloop.run_all_calls(); - assert_eq!(eventloop.call_count(), 0); - assert_eq!(value, 1); - } - - #[test] - fn test_schedule() { - let eventloop = MockEventLoop::new(); - let executor = eventloop.new_executor(); - let value = Arc::new(AtomicU32::new(0)); - assert_eq!(eventloop.call_count(), 0); - - let value2 = value.clone(); - executor.schedule(0, move || { - value2.fetch_add(1, Ordering::Relaxed); - }); - assert_eq!(eventloop.call_count(), 1); - assert_eq!(value.load(Ordering::Relaxed), 0); - - eventloop.run_all_calls(); - assert_eq!(eventloop.call_count(), 0); - assert_eq!(value.load(Ordering::Relaxed), 1); - } - - #[derive(Default)] - struct MockWaker { - wake_count: AtomicU32, - } - - impl Wake for MockWaker { - fn wake(self: Arc) { - self.wake_count.fetch_add(1, Ordering::Relaxed); - } - } - - #[test] - fn test_run() { - let eventloop = MockEventLoop::new(); - let executor = eventloop.new_executor(); - let mock_waker = Arc::new(MockWaker::default()); - let waker = Waker::from(mock_waker.clone()); - let mut context = Context::from_waker(&waker); - assert_eq!(eventloop.call_count(), 0); - - let mut future = executor.run(0, move || "test-return-value"); - unsafe { - assert_eq!( - Pin::new_unchecked(&mut future).poll(&mut context), - Poll::Pending - ); - } - assert_eq!(eventloop.call_count(), 1); - assert_eq!(mock_waker.wake_count.load(Ordering::Relaxed), 0); - - eventloop.run_all_calls(); - assert_eq!(eventloop.call_count(), 0); - assert_eq!(mock_waker.wake_count.load(Ordering::Relaxed), 1); - unsafe { - assert_eq!( - Pin::new_unchecked(&mut future).poll(&mut context), - Poll::Ready("test-return-value") - ); - } - } - - #[test] - fn test_drop() { - let eventloop = MockEventLoop::new(); - let executor = eventloop.new_executor(); - - drop(executor); - // Calling drop should schedule a call with null task data. - assert_eq!(eventloop.call_count(), 1); - assert_eq!(eventloop.last_call().1, None); - } - - // Test that cancelled calls never run - #[test] - fn test_cancelled_call() { - let eventloop = MockEventLoop::new(); - let executor = eventloop.new_executor(); - // Create a shared counter - let counter = Arc::new(AtomicU32::new(0)); - // schedule increments using both `schedule()` and run()` - let counter_clone = Arc::clone(&counter); - executor.schedule(0, move || { - counter_clone.fetch_add(1, Ordering::Relaxed); - }); - let counter_clone = Arc::clone(&counter); - let future = executor.run(0, move || { - counter_clone.fetch_add(1, Ordering::Relaxed); - }); - // shutdown the eventloop before the scheduled call gets a chance to run. - eventloop.shutdown(); - // `run_all_calls()` will cause the scheduled task callbacks to run, but will pass - // `RustTaskCallbackCode::Cancelled` to it. This drop the scheduled closure without executing - // it. - eventloop.run_all_calls(); - - assert_eq!(counter.load(Ordering::Relaxed), 0); - drop(future); - } - - // Test that when scheduled calls are cancelled, the closures are dropped properly - #[test] - fn test_cancellation_drops_closures() { - let eventloop = MockEventLoop::new(); - let executor = eventloop.new_executor(); - - // Create an Arc<> that we will move into the closures to test if they are dropped or not - let arc = Arc::new(0); - let arc_clone = Arc::clone(&arc); - executor.schedule(0, move || assert_eq!(*arc_clone, 0)); - let arc_clone = Arc::clone(&arc); - let future = executor.run(0, move || assert_eq!(*arc_clone, 0)); - - // shutdown the eventloop and run the (cancelled) scheduled calls. - eventloop.shutdown(); - eventloop.run_all_calls(); - // try to schedule some more calls now that the loop has been shutdown - let arc_clone = Arc::clone(&arc); - executor.schedule(0, move || assert_eq!(*arc_clone, 0)); - let arc_clone = Arc::clone(&arc); - let future2 = executor.run(0, move || assert_eq!(*arc_clone, 0)); - - // Drop the futures so they don't hold on to any references - drop(future); - drop(future2); - - // All of these closures should have been dropped by now, there only remaining arc - // reference should be the original - assert_eq!(Arc::strong_count(&arc), 1); - } -} diff --git a/uniffi_core/src/ffi/mod.rs b/uniffi_core/src/ffi/mod.rs index f7176671bf..8e26be37b0 100644 --- a/uniffi_core/src/ffi/mod.rs +++ b/uniffi_core/src/ffi/mod.rs @@ -8,7 +8,6 @@ pub mod callbackinterface; pub mod ffidefault; pub mod foreignbytes; pub mod foreigncallbacks; -pub mod foreignexecutor; pub mod handle; pub mod rustbuffer; pub mod rustcalls; @@ -18,7 +17,6 @@ pub use callbackinterface::*; pub use ffidefault::FfiDefault; pub use foreignbytes::*; pub use foreigncallbacks::*; -pub use foreignexecutor::*; pub use handle::*; pub use rustbuffer::*; pub use rustcalls::*; diff --git a/uniffi_core/src/ffi/rustfuture/future.rs b/uniffi_core/src/ffi/rustfuture/future.rs index 4e9dc6ca10..bf04954fa9 100644 --- a/uniffi_core/src/ffi/rustfuture/future.rs +++ b/uniffi_core/src/ffi/rustfuture/future.rs @@ -16,11 +16,15 @@ //! 2a. In a loop: //! - Call [rust_future_poll] //! - Suspend the function until the [rust_future_poll] continuation function is called -//! - If the continuation was function was called with [RustFuturePoll::Ready], then break +//! - If the continuation was function was called with [super::RustFuturePoll::Ready], then break //! otherwise continue. //! 2b. If the async function is cancelled, then call [rust_future_cancel]. This causes the -//! continuation function to be called with [RustFuturePoll::Ready] and the [RustFuture] to +//! continuation function to be called with [super::RustFuturePoll::Ready] and the [RustFuture] to //! enter a cancelled state. +//! 2c. If the Rust code wants schedule work to be run in a `BlockingTaskQueue`, then the +//! continuation is called with [super::RustFuturePoll::MaybeReady] and the blocking task queue handle. +//! The foreign code is responsible for ensuring the next [rust_future_poll] call happens in +//! that blocking task queue and the handle is passed to [rust_future_poll]. //! 3. Call [rust_future_complete] to get the result of the future. //! 4. Call [rust_future_free] to free the future, ideally in a finally block. This: //! - Releases any resources held by the future @@ -85,7 +89,7 @@ use std::{ task::{Context, Poll, Wake}, }; -use super::{RustFutureContinuationCallback, RustFuturePoll, Scheduler}; +use super::{RustFutureContinuationCallback, Scheduler}; use crate::{ derive_ffi_traits, rust_call_with_out_status, FfiDefault, Handle, LowerReturn, RustCallStatus, }; @@ -204,7 +208,7 @@ where // This Mutex should never block if our code is working correctly, since there should not be // multiple threads calling [Self::poll] and/or [Self::complete] at the same time. future: Mutex>, - continuation_data: Mutex, + scheduler: Mutex, // UT is used as the generic parameter for [LowerReturn]. // Let's model this with PhantomData as a function that inputs a UT value. _phantom: PhantomData ()>, @@ -220,34 +224,39 @@ where pub(super) fn new(future: F, _tag: UT) -> Arc { Arc::new(Self { future: Mutex::new(WrappedFuture::new(future)), - continuation_data: Mutex::new(Scheduler::new()), + scheduler: Mutex::new(Scheduler::new()), _phantom: PhantomData, }) } - pub(super) fn poll(self: Arc, callback: RustFutureContinuationCallback, data: Handle) { - let ready = self.is_cancelled() || { + pub(super) fn poll( + self: Arc, + callback: RustFutureContinuationCallback, + data: Handle, + blocking_task_queue: Option, + ) { + let mut ready = self + .scheduler + .lock() + .unwrap() + .on_poll_start(blocking_task_queue); + if !ready { let mut locked = self.future.lock().unwrap(); let waker: std::task::Waker = Arc::clone(&self).into(); - locked.poll(&mut Context::from_waker(&waker)) + ready = locked.poll(&mut Context::from_waker(&waker)) }; - if ready { - callback(data, RustFuturePoll::Ready) - } else { - self.continuation_data.lock().unwrap().store(callback, data); - } - } - - pub(super) fn is_cancelled(&self) -> bool { - self.continuation_data.lock().unwrap().is_cancelled() + self.scheduler + .lock() + .unwrap() + .on_poll_end(callback, data, ready); } pub(super) fn wake(&self) { - self.continuation_data.lock().unwrap().wake(); + self.scheduler.lock().unwrap().wake(); } pub(super) fn cancel(&self) { - self.continuation_data.lock().unwrap().cancel(); + self.scheduler.lock().unwrap().cancel(); } pub(super) fn complete(&self, call_status: &mut RustCallStatus) -> T::ReturnType { @@ -256,7 +265,7 @@ where pub(super) fn free(self: Arc) { // Call cancel() to send any leftover data to the continuation callback - self.continuation_data.lock().unwrap().cancel(); + self.scheduler.lock().unwrap().cancel(); // Ensure we drop our inner future, releasing all held references self.future.lock().unwrap().free(); } @@ -291,7 +300,12 @@ where /// only create those functions for each of the 13 possible FFI return types. #[doc(hidden)] pub trait RustFutureFfi: Send + Sync { - fn ffi_poll(self: Arc, callback: RustFutureContinuationCallback, data: Handle); + fn ffi_poll( + self: Arc, + callback: RustFutureContinuationCallback, + data: Handle, + blocking_task_queue: Option, + ); fn ffi_cancel(&self); fn ffi_complete(&self, call_status: &mut RustCallStatus) -> ReturnType; fn ffi_free(self: Arc); @@ -304,8 +318,13 @@ where T: LowerReturn + Send + 'static, UT: Send + 'static, { - fn ffi_poll(self: Arc, callback: RustFutureContinuationCallback, data: Handle) { - self.poll(callback, data) + fn ffi_poll( + self: Arc, + callback: RustFutureContinuationCallback, + data: Handle, + blocking_task_queue: Option, + ) { + self.poll(callback, data, blocking_task_queue) } fn ffi_cancel(&self) { diff --git a/uniffi_core/src/ffi/rustfuture/mod.rs b/uniffi_core/src/ffi/rustfuture/mod.rs index 17630db05b..f73d3aa559 100644 --- a/uniffi_core/src/ffi/rustfuture/mod.rs +++ b/uniffi_core/src/ffi/rustfuture/mod.rs @@ -14,6 +14,8 @@ mod tests; use crate::{Handle, HandleAlloc, LowerReturn, RustCallStatus}; +pub use scheduler::BlockingTaskQueue; + /// Result code for [rust_future_poll]. This is passed to the continuation function. #[repr(i8)] #[derive(Debug, PartialEq, Eq)] @@ -28,7 +30,13 @@ pub enum RustFuturePoll { /// /// The Rust side of things calls this when the foreign side should call [rust_future_poll] again /// to continue progress on the future. -pub type RustFutureContinuationCallback = extern "C" fn(callback_data: Handle, RustFuturePoll); +/// +/// * `data` is the handle that the foreign code passed to `poll()` +/// * `poll_result` is the result of the poll +/// * `blocking_task_task_queue` is the blocking task queue handle to run the next `poll()` on or 0 if it +/// doesn't need to run in a blocking task queue. +pub type RustFutureContinuationCallback = + extern "C" fn(data: Handle, poll_result: RustFuturePoll, blocking_task_task_queue: u64); // === Public FFI API === @@ -58,17 +66,25 @@ where /// a [RustFuturePoll] value. For each [rust_future_poll] call the continuation will be called /// exactly once. /// +/// If this poll is running on a blocking task queue, then `blocking_task_queue` is the handle for it. +/// Otherwise `blocking_task_queue` is 0. +/// /// # Safety /// /// The [Handle] must not previously have been passed to [rust_future_free] pub unsafe fn rust_future_poll( - handle: Handle, + future: Handle, callback: RustFutureContinuationCallback, data: Handle, + blocking_task_queue: u64, ) where dyn RustFutureFfi: HandleAlloc, { - as HandleAlloc>::get_arc(handle).ffi_poll(callback, data) + as HandleAlloc>::get_arc(future).ffi_poll( + callback, + data, + Handle::from_raw(blocking_task_queue), + ) } /// Cancel a Rust future @@ -81,11 +97,11 @@ pub unsafe fn rust_future_poll( /// # Safety /// /// The [Handle] must not previously have been passed to [rust_future_free] -pub unsafe fn rust_future_cancel(handle: Handle) +pub unsafe fn rust_future_cancel(future: Handle) where dyn RustFutureFfi: HandleAlloc, { - as HandleAlloc>::get_arc(handle).ffi_cancel() + as HandleAlloc>::get_arc(future).ffi_cancel() } /// Complete a Rust future @@ -99,13 +115,13 @@ where /// - The `T` param must correctly correspond to the [rust_future_new] call. It must /// be `>::ReturnType` pub unsafe fn rust_future_complete( - handle: Handle, + future: Handle, out_status: &mut RustCallStatus, ) -> ReturnType where dyn RustFutureFfi: HandleAlloc, { - as HandleAlloc>::get_arc(handle).ffi_complete(out_status) + as HandleAlloc>::get_arc(future).ffi_complete(out_status) } /// Free a Rust future, dropping the strong reference and releasing all references held by the @@ -114,9 +130,9 @@ where /// # Safety /// /// The [Handle] must not previously have been passed to [rust_future_free] -pub unsafe fn rust_future_free(handle: Handle) +pub unsafe fn rust_future_free(future: Handle) where dyn RustFutureFfi: HandleAlloc, { - as HandleAlloc>::consume_handle(handle).ffi_free() + as HandleAlloc>::consume_handle(future).ffi_free() } diff --git a/uniffi_core/src/ffi/rustfuture/scheduler.rs b/uniffi_core/src/ffi/rustfuture/scheduler.rs index f2d5dc097f..34951fe45d 100644 --- a/uniffi_core/src/ffi/rustfuture/scheduler.rs +++ b/uniffi_core/src/ffi/rustfuture/scheduler.rs @@ -2,11 +2,72 @@ * License, v. 2.0. If a copy of the MPL was not distributed with this * file, You can obtain one at http://mozilla.org/MPL/2.0/. */ -use std::mem; +use std::{cell::RefCell, future::poll_fn, mem, task::Poll, thread_local}; use super::{RustFutureContinuationCallback, RustFuturePoll}; use crate::Handle; +/// Context of the current `RustFuture::poll` call +struct RustFutureContext { + /// Blocking task queue that the future is being polled on + current_blocking_task_queue: Option, + /// Blocking task queue that we've been asked to schedule the next poll on + scheduled_blocking_task_queue: Option, +} + +thread_local! { + static CONTEXT: RefCell = RefCell::new(RustFutureContext { + current_blocking_task_queue: None, + scheduled_blocking_task_queue: None, + }); +} + +fn with_context R, R>(operation: F) -> R { + CONTEXT.with(|context| operation(&mut *context.borrow_mut())) +} + +/// Schedule work in a blocking task queue +/// +/// The returned future will attempt to arrange for [RustFuture::poll] to be called in the +/// blocking task queue. Once [RustFuture::poll] is running in the blocking task queue, then the future +/// will be ready. +/// +/// There's one tricky issue here: how can we ensure that when the top-level task is run in the +/// blocking task queue, this future will be polled? What happens this future is a child of `join!`, +/// `FuturesUnordered` or some other Future that handles its own polling? +/// +/// We start with an assumption: if we notify the waker then this future will be polled when the +/// top-level task is polled next. If a future does not honor this then we consider it a broken +/// future. This seems fair, since that future would almost certainly break a lot of other future +/// code. +/// +/// Based on that, we can have a simple system. When we're polled: +/// * If we're running in the blocking task queue, then we return `Poll::Ready`. +/// * If not, we return `Poll::Pending` and notify the waker so that the future polls again on +/// the next top-level poll. +/// +/// Note that this can be inefficient if the code awaits multiple blocking task queues at once. We +/// can only run the next poll on one of them, but all futures will be woken up. This seems okay +/// for our intended use cases, it would be pretty odd for a library to use multiple blocking task +/// queues. The alternative would be to store the set of all pending blocking task queues, which +/// seems like complete overkill for our purposes. +pub(super) async fn schedule_in_blocking_task_queue(blocking_task_queue: Handle) { + poll_fn(|future_context| { + with_context(|poll_context| { + if poll_context.current_blocking_task_queue == Some(blocking_task_queue) { + Poll::Ready(()) + } else { + poll_context + .scheduled_blocking_task_queue + .get_or_insert(blocking_task_queue); + future_context.waker().wake_by_ref(); + Poll::Pending + } + }) + }) + .await +} + /// Schedules a [crate::RustFuture] by managing the continuation data /// /// This struct manages the continuation callback and data that comes from the foreign side. It @@ -26,7 +87,7 @@ pub(super) enum Scheduler { Empty, /// `wake()` was called when there was no continuation set. The next time `store` is called, /// the continuation should be immediately invoked with `RustFuturePoll::MaybeReady` - Waked, + ShouldWake, /// The future has been cancelled, any future `store` calls should immediately result in the /// continuation being called with `RustFuturePoll::Ready`. Cancelled, @@ -39,26 +100,85 @@ impl Scheduler { Self::Empty } - /// Store new continuation data if we are in the `Empty` state. If we are in the `Waked` or - /// `Cancelled` state, call the continuation immediately with the data. - pub(super) fn store(&mut self, callback: RustFutureContinuationCallback, data: Handle) { + /// Called at the start of a `RustFuture::poll()` call. + /// + /// Returns true if the future is already ready and can skip the `poll()` call (for example, + /// because we're cancelled). + pub(super) fn on_poll_start(&mut self, current_blocking_task_queue: Option) -> bool { + // Reset the context + with_context(|context| { + *context = RustFutureContext { + current_blocking_task_queue, + scheduled_blocking_task_queue: None, + } + }); match self { - Self::Empty => *self = Self::Set(callback, data), - Self::Set(old_callback, old_data) => { + Self::Cancelled => true, + Self::Set(callback, data) => { + // Somehow we're in poll(), but never called our callback. Something is wrong, but + // let's call the callback anyways. That way at least we're keeping our side of + // the contract. log::error!( - "store: observed `Self::Set` state. Is poll() being called from multiple threads at once?" + "Scheduler::on_poll_start: observed `Self::Set` state. Is poll() being called from multiple threads at once?" ); - old_callback(*old_data, RustFuturePoll::Ready); - *self = Self::Set(callback, data); + callback(*data, RustFuturePoll::Ready, 0); + *self = Self::Empty; + false } - Self::Waked => { + Self::ShouldWake => { + // We're about to poll now, so we can unset `ShouldWake` *self = Self::Empty; - callback(data, RustFuturePoll::MaybeReady); + false } - Self::Cancelled => { - callback(data, RustFuturePoll::Ready); + Self::Empty => false, + } + } + + /// Called at the end of a `RustFuture::poll()` call. + /// + /// We will either store the continuation callback or call it immediately. + pub(super) fn on_poll_end( + &mut self, + callback: RustFutureContinuationCallback, + data: Handle, + ready: bool, + ) { + let scheduled_blocking_task_queue = + with_context(|context| context.scheduled_blocking_task_queue); + if ready || self.is_cancelled() { + // The future is ready now, call the callback + callback(data, RustFuturePoll::Ready, 0); + } else if let Some(handle) = scheduled_blocking_task_queue { + // We were asked to schedule the future in a blocking task queue, call the callback + callback(data, RustFuturePoll::MaybeReady, handle.as_raw()); + } else { + match self { + // [Self::wake] was called during the poll + Self::ShouldWake => { + *self = Self::Empty; + callback(data, RustFuturePoll::MaybeReady, 0); + } + // Nothing special happened, store the callback to prepare for the next [Self::wake] + Self::Empty => *self = Self::Set(callback, data), + // This should never happen + Self::Set(old_callback, old_data) => { + log::error!( + "on_poll_end: observed `Self::Set` state which should never happen" + ); + old_callback(*old_data, RustFuturePoll::Ready, 0); + *self = Self::Set(callback, data); + } + // We checked [Self::is_cancelled] above. + Self::Cancelled => unreachable!(), } } + // Reset the context + with_context(|context| { + *context = RustFutureContext { + current_blocking_task_queue: None, + scheduled_blocking_task_queue: None, + } + }); } pub(super) fn wake(&mut self) { @@ -68,11 +188,11 @@ impl Scheduler { let old_data = *old_data; let callback = *callback; *self = Self::Empty; - callback(old_data, RustFuturePoll::MaybeReady); + callback(old_data, RustFuturePoll::MaybeReady, 0); } // If we were in the `Empty` state, then transition to `Waked`. The next time `store` // is called, we will immediately call the continuation. - Self::Empty => *self = Self::Waked, + Self::Empty => *self = Self::ShouldWake, // This is a no-op if we were in the `Cancelled` or `Waked` state. _ => (), } @@ -80,18 +200,42 @@ impl Scheduler { pub(super) fn cancel(&mut self) { if let Self::Set(callback, old_data) = mem::replace(self, Self::Cancelled) { - callback(old_data, RustFuturePoll::Ready); + callback(old_data, RustFuturePoll::Ready, 0); } } - pub(super) fn is_cancelled(&self) -> bool { matches!(self, Self::Cancelled) } } -// The `Handle` data pointer references an object on the foreign side. -// This object must be `Sync` in Rust terminology -- it must be safe for us to pass the pointer to the continuation callback from any thread. -// If the foreign side upholds their side of the contract, then `Scheduler` is Send + Sync. +/// Represents a foreign-managed blocking task queue that we can use to schedule futures in +/// +/// On the foreign side this is a Kotlin `CoroutineContext`, Python `Executor` or Swift +/// `DispatchQueue`. Foreign code can pass one of those objects into exported Rust functions and +/// UniFFI will create a `Handle` to represent it. The Rust code on the other side gets a +/// `BlockingTaskQueue` instance. +/// +/// Rust async code can call [BlockingTaskQueue::run_blocking] to run a closure in that +/// blocking task queue. Use this for functions with blocking operations that should not be executed +/// in a normal async context. Some examples are non-async file/network operations, long-running +/// CPU-bound tasks, blocking database operations, etc. +pub struct BlockingTaskQueue(Handle); + +impl BlockingTaskQueue { + pub fn new(handle: Handle) -> Self { + Self(handle) + } -unsafe impl Send for Scheduler {} -unsafe impl Sync for Scheduler {} + pub fn into_handle(self) -> Handle { + self.0 + } + + /// Run a closure in a blocking task queue + pub async fn run_blocking(&self, f: F) -> R + where + F: FnOnce() -> R, + { + schedule_in_blocking_task_queue(self.0).await; + f() + } +} diff --git a/uniffi_core/src/ffi/rustfuture/tests.rs b/uniffi_core/src/ffi/rustfuture/tests.rs index 475a8360e8..67c36d0bd8 100644 --- a/uniffi_core/src/ffi/rustfuture/tests.rs +++ b/uniffi_core/src/ffi/rustfuture/tests.rs @@ -65,17 +65,42 @@ fn channel() -> (Sender, Arc>) { } /// Poll a Rust future and get an OnceCell that's set when the continuation is called -fn poll(rust_future: &Arc>) -> Arc> { +fn poll(rust_future: &Arc>) -> Arc> { let cell = Arc::new(OnceCell::new()); - let handle = - as HandleAlloc>::new_handle(Arc::clone(&cell)); - rust_future.clone().ffi_poll(poll_continuation, handle); + let handle = as HandleAlloc>::new_handle( + Arc::clone(&cell), + ); + rust_future + .clone() + .ffi_poll(poll_continuation, handle, None); + cell +} + +/// Like poll, but simulate `poll()` being called from a blocking task queue +fn poll_from_blocking_task_queue( + rust_future: &Arc>, + blocking_task_queue: Handle, +) -> Arc> { + let cell = Arc::new(OnceCell::new()); + let handle = as HandleAlloc>::new_handle( + Arc::clone(&cell), + ); + rust_future + .clone() + .ffi_poll(poll_continuation, handle, Some(blocking_task_queue)); cell } -extern "C" fn poll_continuation(handle: Handle, code: RustFuturePoll) { - let cell = as HandleAlloc>::get_arc(handle); - cell.set(code).expect("Error setting OnceCell"); +extern "C" fn poll_continuation( + continuation_data: Handle, + code: RustFuturePoll, + blocking_task_queue: u64, +) { + let cell = as HandleAlloc>::get_arc( + continuation_data, + ); + cell.set((code, blocking_task_queue)) + .expect("Error setting OnceCell"); } fn complete(rust_future: Arc>) -> (RustBuffer, RustCallStatus) { @@ -84,25 +109,44 @@ fn complete(rust_future: Arc>) -> (RustBuffer, Rus (return_value, out_status_code) } +fn check_continuation_not_called(once_cell: &OnceCell<(RustFuturePoll, u64)>) { + assert_eq!(once_cell.get(), None); +} + +fn check_continuation_called( + once_cell: &OnceCell<(RustFuturePoll, u64)>, + poll_result: RustFuturePoll, +) { + assert_eq!(once_cell.get(), Some(&(poll_result, 0))); +} + +fn check_continuation_called_with_blocking_task_queue_handle( + once_cell: &OnceCell<(RustFuturePoll, u64)>, + poll_result: RustFuturePoll, + handle: Handle, +) { + assert_eq!(once_cell.get(), Some(&(poll_result, handle.as_raw()))); +} + #[test] fn test_success() { let (sender, rust_future) = channel(); // Test polling the rust future before it's ready let continuation_result = poll(&rust_future); - assert_eq!(continuation_result.get(), None); + check_continuation_not_called(&continuation_result); sender.wake(); - assert_eq!(continuation_result.get(), Some(&RustFuturePoll::MaybeReady)); + check_continuation_called(&continuation_result, RustFuturePoll::MaybeReady); // Test polling the rust future when it's ready let continuation_result = poll(&rust_future); - assert_eq!(continuation_result.get(), None); + check_continuation_not_called(&continuation_result); sender.send(Ok("All done".into())); - assert_eq!(continuation_result.get(), Some(&RustFuturePoll::MaybeReady)); + check_continuation_called(&continuation_result, RustFuturePoll::MaybeReady); // Future polls should immediately return ready let continuation_result = poll(&rust_future); - assert_eq!(continuation_result.get(), Some(&RustFuturePoll::Ready)); + check_continuation_called(&continuation_result, RustFuturePoll::Ready); // Complete the future let (return_buf, call_status) = complete(rust_future); @@ -118,12 +162,12 @@ fn test_error() { let (sender, rust_future) = channel(); let continuation_result = poll(&rust_future); - assert_eq!(continuation_result.get(), None); + check_continuation_not_called(&continuation_result); sender.send(Err("Something went wrong".into())); - assert_eq!(continuation_result.get(), Some(&RustFuturePoll::MaybeReady)); + check_continuation_called(&continuation_result, RustFuturePoll::MaybeReady); let continuation_result = poll(&rust_future); - assert_eq!(continuation_result.get(), Some(&RustFuturePoll::Ready)); + check_continuation_called(&continuation_result, RustFuturePoll::Ready); let (_, call_status) = complete(rust_future); assert_eq!(call_status.code, RustCallStatusCode::Error); @@ -145,14 +189,14 @@ fn test_cancel() { let (_sender, rust_future) = channel(); let continuation_result = poll(&rust_future); - assert_eq!(continuation_result.get(), None); + check_continuation_not_called(&continuation_result); rust_future.ffi_cancel(); // Cancellation should immediately invoke the callback with RustFuturePoll::Ready - assert_eq!(continuation_result.get(), Some(&RustFuturePoll::Ready)); + check_continuation_called(&continuation_result, RustFuturePoll::Ready); // Future polls should immediately invoke the callback with RustFuturePoll::Ready let continuation_result = poll(&rust_future); - assert_eq!(continuation_result.get(), Some(&RustFuturePoll::Ready)); + check_continuation_called(&continuation_result, RustFuturePoll::Ready); let (_, call_status) = complete(rust_future); assert_eq!(call_status.code, RustCallStatusCode::Cancelled); @@ -188,7 +232,7 @@ fn test_complete_with_stored_continuation() { let continuation_result = poll(&rust_future); rust_future.ffi_free(); - assert_eq!(continuation_result.get(), Some(&RustFuturePoll::Ready)); + check_continuation_called(&continuation_result, RustFuturePoll::Ready); } // Test what happens if we see a `wake()` call while we're polling the future. This can @@ -211,10 +255,47 @@ fn test_wake_during_poll() { let rust_future: Arc> = RustFuture::new(future, crate::UniFfiTag); let continuation_result = poll(&rust_future); // The continuation function should called immediately - assert_eq!(continuation_result.get(), Some(&RustFuturePoll::MaybeReady)); + check_continuation_called(&continuation_result, RustFuturePoll::MaybeReady); // A second poll should finish the future let continuation_result = poll(&rust_future); - assert_eq!(continuation_result.get(), Some(&RustFuturePoll::Ready)); + check_continuation_called(&continuation_result, RustFuturePoll::Ready); + let (return_buf, call_status) = complete(rust_future); + assert_eq!(call_status.code, RustCallStatusCode::Success); + assert_eq!( + >::try_lift(return_buf).unwrap(), + "All done" + ); +} + +#[test] +fn test_blocking_task() { + let blocking_task_queue_handle = Handle::from_raw_unchecked(1001); + let future = async move { + schedule_in_blocking_task_queue(blocking_task_queue_handle).await; + "All done".to_owned() + }; + let rust_future: Arc> = RustFuture::new(future, crate::UniFfiTag); + // On the first poll, the future should not be ready and it should ask to be scheduled in the + // blocking task queue + let continuation_result = poll(&rust_future); + check_continuation_called_with_blocking_task_queue_handle( + &continuation_result, + RustFuturePoll::MaybeReady, + blocking_task_queue_handle, + ); + // If we poll it again not in a blocking task queue, then we get the same result + let continuation_result = poll(&rust_future); + check_continuation_called_with_blocking_task_queue_handle( + &continuation_result, + RustFuturePoll::MaybeReady, + blocking_task_queue_handle, + ); + // When we poll it in the blocking task queue, then the future is ready + let continuation_result = + poll_from_blocking_task_queue(&rust_future, blocking_task_queue_handle); + check_continuation_called(&continuation_result, RustFuturePoll::Ready); + + // Complete the future let (return_buf, call_status) = complete(rust_future); assert_eq!(call_status.code, RustCallStatusCode::Success); assert_eq!( diff --git a/uniffi_core/src/ffi_converter_impls.rs b/uniffi_core/src/ffi_converter_impls.rs index 2460f87cb0..d3d8dd112c 100644 --- a/uniffi_core/src/ffi_converter_impls.rs +++ b/uniffi_core/src/ffi_converter_impls.rs @@ -23,7 +23,7 @@ /// "UT" means an abitrary `UniFfiTag` type. use crate::{ check_remaining, derive_ffi_traits, ffi_converter_rust_buffer_lift_and_lower, metadata, - ConvertError, FfiConverter, ForeignExecutor, Handle, Lift, LiftReturn, Lower, LowerReturn, + BlockingTaskQueue, ConvertError, FfiConverter, Handle, Lift, LiftReturn, Lower, LowerReturn, MetadataBuffer, Result, RustBuffer, UnexpectedUniFFICallbackError, }; use anyhow::bail; @@ -405,25 +405,21 @@ where .concat(V::TYPE_ID_META); } -/// FFI support for [ForeignExecutor] -/// -/// These are passed over the FFI as opaque pointer-sized types representing the foreign executor. -/// The foreign bindings may use an actual pointer to the executor object, or a usized integer -/// handle. -unsafe impl FfiConverter for ForeignExecutor { +/// FFI support for [BlockingTaskQueue] +unsafe impl FfiConverter for BlockingTaskQueue { type FfiType = Handle; // Passing these back to the foreign bindings is currently not supported - fn lower(executor: Self) -> Self::FfiType { - executor.handle + fn lower(blocking_task_queue: Self) -> Self::FfiType { + blocking_task_queue.into_handle() } - fn write(executor: Self, buf: &mut Vec) { - buf.put_u64(executor.handle.as_raw()) + fn write(blocking_task_queue: Self, buf: &mut Vec) { + buf.put_u64(blocking_task_queue.into_handle().as_raw()) } - fn try_lift(executor: Self::FfiType) -> Result { - Ok(ForeignExecutor::new(executor)) + fn try_lift(handle: Self::FfiType) -> Result { + Ok(BlockingTaskQueue::new(handle)) } fn try_read(buf: &mut &[u8]) -> Result { @@ -431,7 +427,7 @@ unsafe impl FfiConverter for ForeignExecutor { } const TYPE_ID_META: MetadataBuffer = - MetadataBuffer::from_code(metadata::codes::TYPE_FOREIGN_EXECUTOR); + MetadataBuffer::from_code(metadata::codes::TYPE_BLOCKING_TASK_QUEUE); } derive_ffi_traits!(blanket u8); @@ -448,7 +444,7 @@ derive_ffi_traits!(blanket bool); derive_ffi_traits!(blanket String); derive_ffi_traits!(blanket Duration); derive_ffi_traits!(blanket SystemTime); -derive_ffi_traits!(blanket ForeignExecutor); +derive_ffi_traits!(blanket BlockingTaskQueue); // For composite types, derive LowerReturn, LiftReturn, etc, from Lift/Lower. // diff --git a/uniffi_core/src/metadata.rs b/uniffi_core/src/metadata.rs index 770d2b36d5..d69c0449da 100644 --- a/uniffi_core/src/metadata.rs +++ b/uniffi_core/src/metadata.rs @@ -67,7 +67,7 @@ pub mod codes { pub const TYPE_CUSTOM: u8 = 22; pub const TYPE_RESULT: u8 = 23; pub const TYPE_FUTURE: u8 = 24; - pub const TYPE_FOREIGN_EXECUTOR: u8 = 25; + pub const TYPE_BLOCKING_TASK_QUEUE: u8 = 25; pub const TYPE_UNIT: u8 = 255; // Literal codes for LiteralMetadata - note that we don't support diff --git a/uniffi_macros/src/setup_scaffolding.rs b/uniffi_macros/src/setup_scaffolding.rs index fbce21a1e9..ee94c67df9 100644 --- a/uniffi_macros/src/setup_scaffolding.rs +++ b/uniffi_macros/src/setup_scaffolding.rs @@ -20,8 +20,6 @@ pub fn setup_scaffolding(namespace: String) -> Result { let ffi_rustbuffer_free_ident = format_ident!("ffi_{module_path}_rustbuffer_free"); let ffi_rustbuffer_reserve_ident = format_ident!("ffi_{module_path}_rustbuffer_reserve"); let reexport_hack_ident = format_ident!("{module_path}_uniffi_reexport_hack"); - let ffi_foreign_executor_callback_set_ident = - format_ident!("ffi_{module_path}_foreign_executor_callback_set"); let ffi_rust_future_scaffolding_fns = rust_future_scaffolding_fns(&module_path); Ok(quote! { @@ -87,13 +85,6 @@ pub fn setup_scaffolding(namespace: String) -> Result { uniffi::ffi::uniffi_rustbuffer_reserve(buf, additional, call_status) } - #[allow(clippy::missing_safety_doc, missing_docs)] - #[doc(hidden)] - #[no_mangle] - pub extern "C" fn #ffi_foreign_executor_callback_set_ident(callback: uniffi::ffi::ForeignExecutorCallback) { - uniffi::ffi::foreign_executor_callback_set(callback) - } - #ffi_rust_future_scaffolding_fns // Code to re-export the UniFFI scaffolding functions. @@ -175,8 +166,8 @@ fn rust_future_scaffolding_fns(module_path: &str) -> TokenStream { #[allow(clippy::missing_safety_doc, missing_docs)] #[doc(hidden)] #[no_mangle] - pub unsafe extern "C" fn #ffi_rust_future_poll(handle: ::uniffi::Handle, callback: ::uniffi::RustFutureContinuationCallback, data: ::uniffi::Handle) { - ::uniffi::ffi::rust_future_poll::<#return_type, crate::UniFfiTag>(handle, callback, data); + pub unsafe extern "C" fn #ffi_rust_future_poll(handle: ::uniffi::Handle, callback: ::uniffi::RustFutureContinuationCallback, data: ::uniffi::Handle, blocking_task_queue: u64) { + ::uniffi::ffi::rust_future_poll::<#return_type, crate::UniFfiTag>(handle, callback, data, blocking_task_queue); } #[allow(clippy::missing_safety_doc, missing_docs)] diff --git a/uniffi_meta/src/metadata.rs b/uniffi_meta/src/metadata.rs index 6e490a4866..a5a04bfb16 100644 --- a/uniffi_meta/src/metadata.rs +++ b/uniffi_meta/src/metadata.rs @@ -50,7 +50,7 @@ pub mod codes { pub const TYPE_CUSTOM: u8 = 22; pub const TYPE_RESULT: u8 = 23; //pub const TYPE_FUTURE: u8 = 24; - pub const TYPE_FOREIGN_EXECUTOR: u8 = 25; + pub const TYPE_BLOCKING_TASK_QUEUE: u8 = 25; pub const TYPE_UNIT: u8 = 255; // Literal codes diff --git a/uniffi_meta/src/reader.rs b/uniffi_meta/src/reader.rs index bf6525f2b5..08a078eb42 100644 --- a/uniffi_meta/src/reader.rs +++ b/uniffi_meta/src/reader.rs @@ -122,7 +122,7 @@ impl<'a> MetadataReader<'a> { codes::TYPE_STRING => Type::String, codes::TYPE_DURATION => Type::Duration, codes::TYPE_SYSTEM_TIME => Type::Timestamp, - codes::TYPE_FOREIGN_EXECUTOR => Type::ForeignExecutor, + codes::TYPE_BLOCKING_TASK_QUEUE => Type::BlockingTaskQueue, codes::TYPE_RECORD => Type::Record { module_path: self.read_string()?, name: self.read_string()?, diff --git a/uniffi_meta/src/types.rs b/uniffi_meta/src/types.rs index 24f8a6f2a8..2e81b0c835 100644 --- a/uniffi_meta/src/types.rs +++ b/uniffi_meta/src/types.rs @@ -85,7 +85,7 @@ pub enum Type { // How the object is implemented. imp: ObjectImpl, }, - ForeignExecutor, + BlockingTaskQueue, // Types defined in the component API, each of which has a string name. Record { module_path: String, diff --git a/uniffi_udl/src/resolver.rs b/uniffi_udl/src/resolver.rs index 14a7a4c6f1..1409c3a6ff 100644 --- a/uniffi_udl/src/resolver.rs +++ b/uniffi_udl/src/resolver.rs @@ -209,7 +209,7 @@ pub(crate) fn resolve_builtin_type(name: &str) -> Option { "f64" => Some(Type::Float64), "timestamp" => Some(Type::Timestamp), "duration" => Some(Type::Duration), - "ForeignExecutor" => Some(Type::ForeignExecutor), + "BlockingTaskQueue" => Some(Type::BlockingTaskQueue), _ => None, } }