Skip to content

Commit

Permalink
Async blocking task support
Browse files Browse the repository at this point in the history
Added the `BlockingTaskQueue` type BlockingTaskQueue allows a Rust
closure to be scheduled on a foreign thread where blocking operations
are okay. The closure runs inside the parent future, which is nice
because it allows the closure to reference its outside scope.

On the foreign side, a `BlockingTaskQueue` is a native type that runs a
task in some sort of thread queue (`DispatchQueue`, `CoroutineContext`,
`futures.Executor`, etc.).

Updated handlemaps to always start at 1 rather than 0, which is reserved
for a NULL handle.

Added new tests for this in the futures fixtures.  Updated the tests to
check that handles are being released properly.
  • Loading branch information
bendk committed Mar 23, 2024
1 parent 17482fa commit 6931e33
Show file tree
Hide file tree
Showing 42 changed files with 1,036 additions and 155 deletions.
59 changes: 59 additions & 0 deletions docs/manual/src/futures.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,62 @@ In this case, we need an event loop to run the Python async function, but there'
Use `uniffi_set_event_loop()` to handle this case.
It should be called before the Rust code makes the async call and passed an eventloop to use.

## Blocking tasks

Rust executors are designed around an assumption that the `Future::poll` function will return quickly.
This assumption, combined with cooperative scheduling, allows for a large number of futures to be handled by a small number of threads.
Foreign executors make similar assumptions and sometimes more extreme ones.
For example, the Python eventloop is single threaded -- if any task spends a long time between `await` points, then it will block all other tasks from progressing.

This raises the question of how async code can interact with blocking code that performs blocking IO, long-running computations without `await` breaks, etc.
UniFFI defines the `BlockingTaskQueue` type, which is a foreign object that schedules work on a thread where it's okay to block.

On Rust, `BlockingTaskQueue` is a UniFFI type that can safely run blocking code.
It's `execute` method works like tokio's [block_in_place](https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html) function.
It inputs a closure and runs it in the `BlockingTaskQueue`.
This closure can reference the outside scope (i.e. it does not need to be `'static`).
For example:

```rust
#[derive(uniffi::Object)]
struct DataStore {
// Used to run blocking tasks
queue: uniffi::BlockingTaskQueue,
// Low-level DB object with blocking methods
db: Mutex<Database>,
}

#[uniffi::export]
impl DataStore {
#[uniffi::constructor]
fn new(queue: uniffi::BlockingTaskQueue) -> Self {
Self {
queue,
db: Mutex::new(Database::new())
}
}

async fn fetch_all_items(&self) -> Vec<DbItem> {
self.queue.execute(|| self.db.lock().fetch_all_items()).await
}
}
```

On the foreign side `BlockingTaskQueue` corresponds to a language-dependent class.

### Kotlin
Kotlin uses `CoroutineContext` for its `BlockingTaskQueue`.
Any `CoroutineContext` will work, but `Dispatchers.IO` is usually a good choice.
A DataStore from the example above can be created with `DataStore(Dispatchers.IO)`.

### Swift
Swift uses `DispatchQueue` for its `BlockingTaskQueue`.
The user-initiated global queue is normally a good choice.
A DataStore from the example above can be created with `DataStore(queue: DispatchQueue.global(qos: .userInitiated)`.
The `DispatchQueue` should be concurrent.

### Python

Python uses a `futures.Executor` for its `BlockingTaskQueue`.
`ThreadPoolExecutor` is typically a good choice.
A DataStore from the example above can be created with `DataStore(ThreadPoolExecutor())`.
59 changes: 58 additions & 1 deletion fixtures/futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use std::{
time::Duration,
};

use futures::future::{AbortHandle, Abortable, Aborted};
use futures::{
future::{AbortHandle, Abortable, Aborted},
stream::{FuturesUnordered, StreamExt},
};

/// Non-blocking timer future.
pub struct TimerFuture {
Expand Down Expand Up @@ -456,4 +459,58 @@ async fn cancel_delay_using_trait(obj: Arc<dyn AsyncParser>, delay_ms: i32) {
assert_eq!(future.await, Err(Aborted));
}

/// Async function that uses a blocking task queue to do its work
#[uniffi::export]
pub async fn calc_square(queue: uniffi::BlockingTaskQueue, value: i32) -> i32 {
queue.execute(|| value * value).await
}

/// Same as before, but this one runs multiple tasks
#[uniffi::export]
pub async fn calc_squares(queue: uniffi::BlockingTaskQueue, items: Vec<i32>) -> Vec<i32> {
// Use `FuturesUnordered` to test our blocking task queue code which is known to be a tricky API to work with.
// In particular, if we don't notify the waker then FuturesUnordered will not poll again.
let mut futures: FuturesUnordered<_> = (0..items.len())
.map(|i| {
// Test that we can use references from the surrounding scope
let items = &items;
queue.execute(move || items[i] * items[i])
})
.collect();
let mut results = vec![];
while let Some(result) = futures.next().await {
results.push(result);
}
results.sort();
results
}

/// ...and this one uses multiple BlockingTaskQueues
#[uniffi::export]
pub async fn calc_squares_multi_queue(
queues: Vec<uniffi::BlockingTaskQueue>,
items: Vec<i32>,
) -> Vec<i32> {
let mut futures: FuturesUnordered<_> = (0..items.len())
.map(|i| {
// Test that we can use references from the surrounding scope
let items = &items;
queues[i].execute(move || items[i] * items[i])
})
.collect();
let mut results = vec![];
while let Some(result) = futures.next().await {
results.push(result);
}
results.sort();
results
}

/// Like calc_square, but it clones the BlockingTaskQueue first then drops both copies. Used to
/// test that a) the clone works and b) we correctly drop the references.
#[uniffi::export]
pub async fn calc_square_with_clone(queue: uniffi::BlockingTaskQueue, value: i32) -> i32 {
queue.clone().execute(|| value * value).await
}

uniffi::include_scaffolding!("futures");
73 changes: 58 additions & 15 deletions fixtures/futures/tests/bindings/test_futures.kts
Original file line number Diff line number Diff line change
@@ -1,9 +1,22 @@
import uniffi.fixture.futures.*
import java.util.concurrent.Executors
import kotlinx.coroutines.*
import kotlin.system.*

fun runAsyncTest(test: suspend CoroutineScope.() -> Unit) {
val initialBlockingTaskQueueHandleCount = uniffiBlockingTaskQueueHandleCount()
val initialPollHandleCount = uniffiPollHandleCount()
val time = runBlocking {
measureTimeMillis {
test()
}
}
assert(uniffiBlockingTaskQueueHandleCount() == initialBlockingTaskQueueHandleCount)
assert(uniffiPollHandleCount() == initialPollHandleCount)
}

// init UniFFI to get good measurements after that
runBlocking {
runAsyncTest {
val time = measureTimeMillis {
alwaysReady()
}
Expand All @@ -24,7 +37,7 @@ fun assertApproximateTime(actualTime: Long, expectedTime: Int, testName: String
}

// Test `always_ready`.
runBlocking {
runAsyncTest {
val time = measureTimeMillis {
val result = alwaysReady()

Expand All @@ -35,7 +48,7 @@ runBlocking {
}

// Test `void`.
runBlocking {
runAsyncTest {
val time = measureTimeMillis {
val result = void()

Expand All @@ -46,7 +59,7 @@ runBlocking {
}

// Test `sleep`.
runBlocking {
runAsyncTest {
val time = measureTimeMillis {
sleep(200U)
}
Expand All @@ -55,7 +68,7 @@ runBlocking {
}

// Test sequential futures.
runBlocking {
runAsyncTest {
val time = measureTimeMillis {
val resultAlice = sayAfter(100U, "Alice")
val resultBob = sayAfter(200U, "Bob")
Expand All @@ -68,7 +81,7 @@ runBlocking {
}

// Test concurrent futures.
runBlocking {
runAsyncTest {
val time = measureTimeMillis {
val resultAlice = async { sayAfter(100U, "Alice") }
val resultBob = async { sayAfter(200U, "Bob") }
Expand All @@ -81,7 +94,7 @@ runBlocking {
}

// Test async methods.
runBlocking {
runAsyncTest {
val megaphone = newMegaphone()
val time = measureTimeMillis {
val resultAlice = megaphone.sayAfter(200U, "Alice")
Expand All @@ -92,7 +105,7 @@ runBlocking {
assertApproximateTime(time, 200, "async methods")
}

runBlocking {
runAsyncTest {
val megaphone = newMegaphone()
val time = measureTimeMillis {
val resultAlice = sayAfterWithMegaphone(megaphone, 200U, "Alice")
Expand All @@ -104,7 +117,7 @@ runBlocking {
}

// Test async method returning optional object
runBlocking {
runAsyncTest {
val megaphone = asyncMaybeNewMegaphone(true)
assert(megaphone != null)

Expand Down Expand Up @@ -213,7 +226,7 @@ runBlocking {


// Test with the Tokio runtime.
runBlocking {
runAsyncTest {
val time = measureTimeMillis {
val resultAlice = sayAfterWithTokio(200U, "Alice")

Expand All @@ -224,7 +237,7 @@ runBlocking {
}

// Test fallible function/method.
runBlocking {
runAsyncTest {
val time1 = measureTimeMillis {
try {
fallibleMe(false)
Expand Down Expand Up @@ -289,7 +302,7 @@ runBlocking {
}

// Test record.
runBlocking {
runAsyncTest {
val time = measureTimeMillis {
val result = newMyRecord("foo", 42U)

Expand All @@ -303,7 +316,7 @@ runBlocking {
}

// Test a broken sleep.
runBlocking {
runAsyncTest {
val time = measureTimeMillis {
brokenSleep(100U, 0U) // calls the waker twice immediately
sleep(100U) // wait for possible failure
Expand All @@ -317,7 +330,7 @@ runBlocking {


// Test a future that uses a lock and that is cancelled.
runBlocking {
runAsyncTest {
val time = measureTimeMillis {
val job = launch {
useSharedResource(SharedResourceOptions(releaseAfterMs=5000U, timeoutMs=100U))
Expand All @@ -336,11 +349,41 @@ runBlocking {
}

// Test a future that uses a lock and that is not cancelled.
runBlocking {
runAsyncTest {
val time = measureTimeMillis {
useSharedResource(SharedResourceOptions(releaseAfterMs=100U, timeoutMs=1000U))

useSharedResource(SharedResourceOptions(releaseAfterMs=0U, timeoutMs=1000U))
}
println("useSharedResource (not canceled): ${time}ms")
}

// Test blocking task queues
runAsyncTest {
withTimeout(1000) {
assert(calcSquare(Dispatchers.IO, 20) == 400)
}

withTimeout(1000) {
assert(calcSquares(Dispatchers.IO, listOf(1, -2, 3)) == listOf(1, 4, 9))
}

val executors = listOf(
Executors.newSingleThreadExecutor(),
Executors.newSingleThreadExecutor(),
Executors.newSingleThreadExecutor(),
)
withTimeout(1000) {
assert(calcSquaresMultiQueue(executors.map { it.asCoroutineDispatcher() }, listOf(1, -2, 3)) == listOf(1, 4, 9))
}
for (executor in executors) {
executor.shutdown()
}
}

// Test blocking task queue cloning
runAsyncTest {
withTimeout(1000) {
assert(calcSquareWithClone(Dispatchers.IO, 20) == 400)
}
}
Loading

0 comments on commit 6931e33

Please sign in to comment.