Skip to content

Commit

Permalink
checkpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
bendk committed Dec 12, 2023
1 parent e53e509 commit 2914ce2
Show file tree
Hide file tree
Showing 18 changed files with 667 additions and 151 deletions.
81 changes: 75 additions & 6 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

60 changes: 60 additions & 0 deletions docs/manual/src/futures.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,63 @@ This code uses `asyncio` to drive the future to completion, while our exposed fu
In Rust `Future` terminology this means the foreign bindings supply the "executor" - think event-loop, or async runtime. In this example it's `asyncio`. There's no requirement for a Rust event loop.

There are [some great API docs](https://docs.rs/uniffi_core/latest/uniffi_core/ffi/rustfuture/index.html) on the implementation that are well worth a read.

## Blocking tasks

Rust executors are designed around an assumption that the `Future::poll` function will return quickly.
This assumption, combined with cooperative scheduling, allows for a large number of futures to be handled by a small number of threads.
Foreign executors make similar assumptions and sometimes more extreme ones.
For example, the Python eventloop is single threaded -- if any task spends a long time between `await` points, then it will block all other tasks from progressing.

This raises the question of how async code can interact with blocking code.
"blocking" here means code that preforms blocking IO, long-running computations without `await` breaks, etc.
To support this, UniFFI defines the `BlockingTaskQueue` type, which is a foreign object that schedules work on a thread where it's okay to block.

On Rust, `BlockingTaskQueue` is a UniFFI type that can safely run blocking code.
It's `execute` method works like tokio's [block_in_place](https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html) function.
It inputs a closure and runs it in the `BlockingTaskQueue`.
This closure can reference the outside scope (it does not need to be `'static`).
For example:

```rust
#[derive(uniffi::Object)]
struct DataStore {
// Used to run blocking tasks
queue: uniffi::BlockingTaskQueue,
// Low-level DB object with blocking methods
db: Mutex<Database>,
}

#[uniffi::export]
impl DataStore {
#[uniffi::constructor]
fn new(queue: uniffi::BlockingTaskQueue) -> Self {
Self {
queue,
db: Mutex::new(Database::new())
}
}

fn fetch_all_items(&self) -> Vec<DbItem> {
self.queue.execute(|| self.db.lock().fetch_all_items())
}
}
```

On the foreign side `BlockingTaskQueue` corresponds to a language-dependent class.

### Kotlin
Kotlin uses `CoroutineContext` for its `BlockingTaskQueue`.
Any `CoroutineContext` will work, but `Dispatchers.IO` is usually a good choice.
A DataStore from the example above can be created with `DataStore(Dispatchers.IO)`.

### Swift
Swift uses `DispatchQueue` for its `BlockingTaskQueue`.
The `DispatchQueue` should be concurrent for all in almost all circumstances -- the user-initiated global queue is normally a good choice.
A DataStore from the example above can be created with `DataStore(queue: DispatchQueue.global(qos: .userInitiated)`.

### Python

Python uses a `futures.Executor` for its `BlockingTaskQueue`.
`ThreadPoolExecutor` is typically a good choice.
A DataStore from the example above can be created with `DataStore(ThreadPoolExecutor())`.
1 change: 1 addition & 0 deletions fixtures/futures/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ path = "src/bin.rs"

[dependencies]
uniffi = { path = "../../uniffi", version = "0.25", features = ["tokio", "cli"] }
futures = "0.3.29"
thiserror = "1.0"
tokio = { version = "1.24.1", features = ["time", "sync"] }
once_cell = "1.18.0"
Expand Down
51 changes: 51 additions & 0 deletions fixtures/futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ use std::{
time::Duration,
};

use futures::stream::{FuturesUnordered, StreamExt};

/// Non-blocking timer future.
pub struct TimerFuture {
shared_state: Arc<Mutex<SharedState>>,
Expand Down Expand Up @@ -326,4 +328,53 @@ pub async fn use_shared_resource(options: SharedResourceOptions) -> Result<(), A
Ok(())
}

/// Async function that uses a blocking task queue to do its work
#[uniffi::export]
pub async fn calc_square(queue: uniffi::BlockingTaskQueue, value: i32) -> i32 {
queue.execute(|| value * value).await
}

/// Same as before, but this one runs multiple tasks
#[uniffi::export]
pub async fn calc_squares(queue: uniffi::BlockingTaskQueue, items: Vec<i32>) -> Vec<i32> {
// Use `FuturesUnordered` to test our blocking task queue code which is known to be a tricky API to work with.
// In particular, if we don't notify the waker then FuturesUnordered will not poll again.
let mut futures: FuturesUnordered<_> = (0..items.len())
.into_iter()
.map(|i| {
// Test that we can use references from the surrounding scope
let items = &items;
queue.execute(move || items[i] * items[i])
})
.collect();
let mut results = vec![];
while let Some(result) = futures.next().await {
results.push(result);
}
results.sort();
results
}

/// ...and this one uses multiple BlockingTaskQueues
#[uniffi::export]
pub async fn calc_squares_multi_queue(
queues: Vec<uniffi::BlockingTaskQueue>,
items: Vec<i32>,
) -> Vec<i32> {
let mut futures: FuturesUnordered<_> = (0..items.len())
.into_iter()
.map(|i| {
// Test that we can use references from the surrounding scope
let items = &items;
queues[i].execute(move || items[i] * items[i])
})
.collect();
let mut results = vec![];
while let Some(result) = futures.next().await {
results.push(result);
}
results.sort();
results
}

uniffi::include_scaffolding!("futures");
Loading

0 comments on commit 2914ce2

Please sign in to comment.