Skip to content

Commit

Permalink
Async blocking task support
Browse files Browse the repository at this point in the history
Added the `BlockingTaskQueue` type BlockingTaskQueue allows a Rust
closure to be scheduled on a foreign thread where blocking operations
are okay. The closure runs inside the parent future, which is nice
because it allows the closure to reference its outside scope.

On the foreign side, a `BlockingTaskQueue` is a native type that runs a
task in some sort of thread queue (`DispatchQueue`, `CoroutineContext`,
`futures.Executor`, etc.).

Updated handlemaps to always start at 1 rather than 0, which is reserved
for a NULL handle.

Added new tests for this in the futures fixtures.  Updated the tests to
check that handles are being released properly.
  • Loading branch information
bendk committed Mar 12, 2024
1 parent 075efd9 commit 5a7ee85
Show file tree
Hide file tree
Showing 44 changed files with 1,112 additions and 156 deletions.
81 changes: 75 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 60 additions & 0 deletions docs/manual/src/futures.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,3 +71,63 @@ pub trait SayAfterTrait: Send + Sync {
async fn say_after(&self, ms: u16, who: String) -> String;
}
```

## Blocking tasks

Rust executors are designed around an assumption that the `Future::poll` function will return quickly.
This assumption, combined with cooperative scheduling, allows for a large number of futures to be handled by a small number of threads.
Foreign executors make similar assumptions and sometimes more extreme ones.
For example, the Python eventloop is single threaded -- if any task spends a long time between `await` points, then it will block all other tasks from progressing.

This raises the question of how async code can interact with blocking code that performs blocking IO, long-running computations without `await` breaks, etc.
UniFFI defines the `BlockingTaskQueue` type, which is a foreign object that schedules work on a thread where it's okay to block.

On Rust, `BlockingTaskQueue` is a UniFFI type that can safely run blocking code.
It's `execute` method works like tokio's [block_in_place](https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html) function.
It inputs a closure and runs it in the `BlockingTaskQueue`.
This closure can reference the outside scope (i.e. it does not need to be `'static`).
For example:

```rust
#[derive(uniffi::Object)]
struct DataStore {
// Used to run blocking tasks
queue: uniffi::BlockingTaskQueue,
// Low-level DB object with blocking methods
db: Mutex<Database>,
}

#[uniffi::export]
impl DataStore {
#[uniffi::constructor]
fn new(queue: uniffi::BlockingTaskQueue) -> Self {
Self {
queue,
db: Mutex::new(Database::new())
}
}

async fn fetch_all_items(&self) -> Vec<DbItem> {
self.queue.execute(|| self.db.lock().fetch_all_items()).await
}
}
```

On the foreign side `BlockingTaskQueue` corresponds to a language-dependent class.

### Kotlin
Kotlin uses `CoroutineContext` for its `BlockingTaskQueue`.
Any `CoroutineContext` will work, but `Dispatchers.IO` is usually a good choice.
A DataStore from the example above can be created with `DataStore(Dispatchers.IO)`.

### Swift
Swift uses `DispatchQueue` for its `BlockingTaskQueue`.
The user-initiated global queue is normally a good choice.
A DataStore from the example above can be created with `DataStore(queue: DispatchQueue.global(qos: .userInitiated)`.
The `DispatchQueue` should be concurrent.

### Python

Python uses a `futures.Executor` for its `BlockingTaskQueue`.
`ThreadPoolExecutor` is typically a good choice.
A DataStore from the example above can be created with `DataStore(ThreadPoolExecutor())`.
1 change: 1 addition & 0 deletions fixtures/futures/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ path = "src/bin.rs"
[dependencies]
uniffi = { workspace = true, features = ["tokio", "cli"] }
async-trait = "0.1"
futures = "0.3.29"
thiserror = "1.0"
tokio = { version = "1.24.1", features = ["time", "sync"] }
once_cell = "1.18.0"
Expand Down
56 changes: 56 additions & 0 deletions fixtures/futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use std::{
time::Duration,
};

use futures::stream::{FuturesUnordered, StreamExt};

/// Non-blocking timer future.
pub struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
Expand Down Expand Up @@ -387,4 +389,58 @@ fn get_say_after_udl_traits() -> Vec<Arc<dyn SayAfterUdlTrait>> {
vec![Arc::new(SayAfterImpl1), Arc::new(SayAfterImpl2)]
}

/// Async function that uses a blocking task queue to do its work
#[uniffi::export]
pub async fn calc_square(queue: uniffi::BlockingTaskQueue, value: i32) -> i32 {
queue.execute(|| value * value).await
}

/// Same as before, but this one runs multiple tasks
#[uniffi::export]
pub async fn calc_squares(queue: uniffi::BlockingTaskQueue, items: Vec<i32>) -> Vec<i32> {
// Use `FuturesUnordered` to test our blocking task queue code which is known to be a tricky API to work with.
// In particular, if we don't notify the waker then FuturesUnordered will not poll again.
let mut futures: FuturesUnordered<_> = (0..items.len())
.map(|i| {
// Test that we can use references from the surrounding scope
let items = &items;
queue.execute(move || items[i] * items[i])
})
.collect();
let mut results = vec![];
while let Some(result) = futures.next().await {
results.push(result);
}
results.sort();
results
}

/// ...and this one uses multiple BlockingTaskQueues
#[uniffi::export]
pub async fn calc_squares_multi_queue(
queues: Vec<uniffi::BlockingTaskQueue>,
items: Vec<i32>,
) -> Vec<i32> {
let mut futures: FuturesUnordered<_> = (0..items.len())
.map(|i| {
// Test that we can use references from the surrounding scope
let items = &items;
queues[i].execute(move || items[i] * items[i])
})
.collect();
let mut results = vec![];
while let Some(result) = futures.next().await {
results.push(result);
}
results.sort();
results
}

/// Like calc_square, but it clones the BlockingTaskQueue first then drops both copies. Used to
/// test that a) the clone works and b) we correctly drop the references.
#[uniffi::export]
pub async fn calc_square_with_clone(queue: uniffi::BlockingTaskQueue, value: i32) -> i32 {
queue.clone().execute(|| value * value).await
}

uniffi::include_scaffolding!("futures");
Loading

0 comments on commit 5a7ee85

Please sign in to comment.