Skip to content

Commit

Permalink
Async blocking task support
Browse files Browse the repository at this point in the history
Added the `BlockingTaskQueue` type BlockingTaskQueue allows a Rust
closure to be scheduled on a foreign thread where blocking operations
are okay. The closure runs inside the parent future, which is nice
because it allows the closure to reference its outside scope.

On the foreign side, a `BlockingTaskQueue` is a native type that runs a
task in some sort of thread queue (`DispatchQueue`, `CoroutineContext`,
`futures.Executor`, etc.).

Added new tests for this in the futures fixtures.  Updated the tests to
check that handles are being released properly.
  • Loading branch information
bendk committed Dec 13, 2023
1 parent 5308aa0 commit 6f3fa1d
Show file tree
Hide file tree
Showing 52 changed files with 1,274 additions and 315 deletions.
81 changes: 75 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 60 additions & 0 deletions docs/manual/src/futures.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,63 @@ This code uses `asyncio` to drive the future to completion, while our exposed fu
In Rust `Future` terminology this means the foreign bindings supply the "executor" - think event-loop, or async runtime. In this example it's `asyncio`. There's no requirement for a Rust event loop.

There are [some great API docs](https://docs.rs/uniffi_core/latest/uniffi_core/ffi/rustfuture/index.html) on the implementation that are well worth a read.

## Blocking tasks

Rust executors are designed around an assumption that the `Future::poll` function will return quickly.
This assumption, combined with cooperative scheduling, allows for a large number of futures to be handled by a small number of threads.
Foreign executors make similar assumptions and sometimes more extreme ones.
For example, the Python eventloop is single threaded -- if any task spends a long time between `await` points, then it will block all other tasks from progressing.

This raises the question of how async code can interact with blocking code that performs blocking IO, long-running computations without `await` breaks, etc.
UniFFI defines the `BlockingTaskQueue` type, which is a foreign object that schedules work on a thread where it's okay to block.

On Rust, `BlockingTaskQueue` is a UniFFI type that can safely run blocking code.
It's `execute` method works like tokio's [block_in_place](https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html) function.
It inputs a closure and runs it in the `BlockingTaskQueue`.
This closure can reference the outside scope (i.e. it does not need to be `'static`).
For example:

```rust
#[derive(uniffi::Object)]
struct DataStore {
// Used to run blocking tasks
queue: uniffi::BlockingTaskQueue,
// Low-level DB object with blocking methods
db: Mutex<Database>,
}

#[uniffi::export]
impl DataStore {
#[uniffi::constructor]
fn new(queue: uniffi::BlockingTaskQueue) -> Self {
Self {
queue,
db: Mutex::new(Database::new())
}
}

fn fetch_all_items(&self) -> Vec<DbItem> {
self.queue.execute(|| self.db.lock().fetch_all_items())
}
}
```

On the foreign side `BlockingTaskQueue` corresponds to a language-dependent class.

### Kotlin
Kotlin uses `CoroutineContext` for its `BlockingTaskQueue`.
Any `CoroutineContext` will work, but `Dispatchers.IO` is usually a good choice.
A DataStore from the example above can be created with `DataStore(Dispatchers.IO)`.

### Swift
Swift uses `DispatchQueue` for its `BlockingTaskQueue`.
The user-initiated global queue is normally a good choice.
A DataStore from the example above can be created with `DataStore(queue: DispatchQueue.global(qos: .userInitiated)`.
The `DispatchQueue` should be concurrent.

### Python

Python uses a `futures.Executor` for its `BlockingTaskQueue`.
`ThreadPoolExecutor` is typically a good choice.
A DataStore from the example above can be created with `DataStore(ThreadPoolExecutor())`.
1 change: 1 addition & 0 deletions fixtures/futures/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ path = "src/bin.rs"

[dependencies]
uniffi = { path = "../../uniffi", version = "0.25", features = ["tokio", "cli"] }
futures = "0.3.29"
thiserror = "1.0"
tokio = { version = "1.24.1", features = ["time", "sync"] }
once_cell = "1.18.0"
Expand Down
56 changes: 56 additions & 0 deletions fixtures/futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use std::{
time::Duration,
};

use futures::stream::{FuturesUnordered, StreamExt};

/// Non-blocking timer future.
pub struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
Expand Down Expand Up @@ -326,4 +328,58 @@ pub async fn use_shared_resource(options: SharedResourceOptions) -> Result<(), A
Ok(())
}

/// Async function that uses a blocking task queue to do its work
#[uniffi::export]
pub async fn calc_square(queue: uniffi::BlockingTaskQueue, value: i32) -> i32 {
queue.execute(|| value * value).await
}

/// Same as before, but this one runs multiple tasks
#[uniffi::export]
pub async fn calc_squares(queue: uniffi::BlockingTaskQueue, items: Vec<i32>) -> Vec<i32> {
// Use `FuturesUnordered` to test our blocking task queue code which is known to be a tricky API to work with.
// In particular, if we don't notify the waker then FuturesUnordered will not poll again.
let mut futures: FuturesUnordered<_> = (0..items.len())
.map(|i| {
// Test that we can use references from the surrounding scope
let items = &items;
queue.execute(move || items[i] * items[i])
})
.collect();
let mut results = vec![];
while let Some(result) = futures.next().await {
results.push(result);
}
results.sort();
results
}

/// ...and this one uses multiple BlockingTaskQueues
#[uniffi::export]
pub async fn calc_squares_multi_queue(
queues: Vec<uniffi::BlockingTaskQueue>,
items: Vec<i32>,
) -> Vec<i32> {
let mut futures: FuturesUnordered<_> = (0..items.len())
.map(|i| {
// Test that we can use references from the surrounding scope
let items = &items;
queues[i].execute(move || items[i] * items[i])
})
.collect();
let mut results = vec![];
while let Some(result) = futures.next().await {
results.push(result);
}
results.sort();
results
}

/// Like calc_square, but it clones the BlockingTaskQueue first then drops both copies. Used to
/// test that a) the clone works and b) we correctly drop the references.
#[uniffi::export]
pub async fn calc_square_with_clone(queue: uniffi::BlockingTaskQueue, value: i32) -> i32 {
queue.clone().execute(|| value * value).await
}

uniffi::include_scaffolding!("futures");
Loading

0 comments on commit 6f3fa1d

Please sign in to comment.