Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Async blocking task queue #1837

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
59 changes: 59 additions & 0 deletions docs/manual/src/futures.md
Original file line number Diff line number Diff line change
Expand Up @@ -93,3 +93,62 @@ In this case, we need an event loop to run the Python async function, but there'
Use `uniffi_set_event_loop()` to handle this case.
It should be called before the Rust code makes the async call and passed an eventloop to use.

## Blocking tasks

Rust executors are designed around an assumption that the `Future::poll` function will return quickly.
This assumption, combined with cooperative scheduling, allows for a large number of futures to be handled by a small number of threads.
Foreign executors make similar assumptions and sometimes more extreme ones.
For example, the Python eventloop is single threaded -- if any task spends a long time between `await` points, then it will block all other tasks from progressing.

This raises the question of how async code can interact with blocking code that performs blocking IO, long-running computations without `await` breaks, etc.
UniFFI defines the `BlockingTaskQueue` type, which is a foreign object that schedules work on a thread where it's okay to block.

On Rust, `BlockingTaskQueue` is a UniFFI type that can safely run blocking code.
It's `execute` method works like tokio's [block_in_place](https://docs.rs/tokio/latest/tokio/task/fn.block_in_place.html) function.
It inputs a closure and runs it in the `BlockingTaskQueue`.
This closure can reference the outside scope (i.e. it does not need to be `'static`).
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure what this method should be named. I chose execute simply because I hadn't seen it used in other places so it didn't have the baggage of other names. I almost want to name in block_in_place since it matches the semantics of the tokio method. But the "in_place" part feels weird since we don't have a spawn_blocking method to contrast with.

For example:

```rust
#[derive(uniffi::Object)]
struct DataStore {
// Used to run blocking tasks
queue: uniffi::BlockingTaskQueue,
// Low-level DB object with blocking methods
db: Mutex<Database>,
}

#[uniffi::export]
impl DataStore {
#[uniffi::constructor]
fn new(queue: uniffi::BlockingTaskQueue) -> Self {
Self {
queue,
db: Mutex::new(Database::new())
}
}

async fn fetch_all_items(&self) -> Vec<DbItem> {
self.queue.execute(|| self.db.lock().fetch_all_items()).await
}
}
```

On the foreign side `BlockingTaskQueue` corresponds to a language-dependent class.

### Kotlin
Kotlin uses `CoroutineContext` for its `BlockingTaskQueue`.
Any `CoroutineContext` will work, but `Dispatchers.IO` is usually a good choice.
A DataStore from the example above can be created with `DataStore(Dispatchers.IO)`.

### Swift
Swift uses `DispatchQueue` for its `BlockingTaskQueue`.
The user-initiated global queue is normally a good choice.
A DataStore from the example above can be created with `DataStore(queue: DispatchQueue.global(qos: .userInitiated)`.
The `DispatchQueue` should be concurrent.

### Python

Python uses a `futures.Executor` for its `BlockingTaskQueue`.
`ThreadPoolExecutor` is typically a good choice.
A DataStore from the example above can be created with `DataStore(ThreadPoolExecutor())`.
59 changes: 58 additions & 1 deletion fixtures/futures/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,10 @@ use std::{
time::Duration,
};

use futures::future::{AbortHandle, Abortable, Aborted};
use futures::{
future::{AbortHandle, Abortable, Aborted},
stream::{FuturesUnordered, StreamExt},
};

/// Non-blocking timer future.
pub struct TimerFuture {
Expand Down Expand Up @@ -456,4 +459,58 @@ async fn cancel_delay_using_trait(obj: Arc<dyn AsyncParser>, delay_ms: i32) {
assert_eq!(future.await, Err(Aborted));
}

/// Async function that uses a blocking task queue to do its work
#[uniffi::export]
pub async fn calc_square(queue: uniffi::BlockingTaskQueue, value: i32) -> i32 {
queue.execute(|| value * value).await
}

/// Same as before, but this one runs multiple tasks
#[uniffi::export]
pub async fn calc_squares(queue: uniffi::BlockingTaskQueue, items: Vec<i32>) -> Vec<i32> {
// Use `FuturesUnordered` to test our blocking task queue code which is known to be a tricky API to work with.
// In particular, if we don't notify the waker then FuturesUnordered will not poll again.
let mut futures: FuturesUnordered<_> = (0..items.len())
.map(|i| {
// Test that we can use references from the surrounding scope
let items = &items;
queue.execute(move || items[i] * items[i])
})
.collect();
let mut results = vec![];
while let Some(result) = futures.next().await {
results.push(result);
}
results.sort();
results
}

/// ...and this one uses multiple BlockingTaskQueues
#[uniffi::export]
pub async fn calc_squares_multi_queue(
queues: Vec<uniffi::BlockingTaskQueue>,
items: Vec<i32>,
) -> Vec<i32> {
let mut futures: FuturesUnordered<_> = (0..items.len())
.map(|i| {
// Test that we can use references from the surrounding scope
let items = &items;
queues[i].execute(move || items[i] * items[i])
})
.collect();
let mut results = vec![];
while let Some(result) = futures.next().await {
results.push(result);
}
results.sort();
results
}

/// Like calc_square, but it clones the BlockingTaskQueue first then drops both copies. Used to
/// test that a) the clone works and b) we correctly drop the references.
#[uniffi::export]
pub async fn calc_square_with_clone(queue: uniffi::BlockingTaskQueue, value: i32) -> i32 {
queue.clone().execute(|| value * value).await
}

uniffi::include_scaffolding!("futures");
73 changes: 58 additions & 15 deletions fixtures/futures/tests/bindings/test_futures.kts
Original file line number Diff line number Diff line change
@@ -1,9 +1,22 @@
import uniffi.fixture.futures.*
import java.util.concurrent.Executors
import kotlinx.coroutines.*
import kotlin.system.*

fun runAsyncTest(test: suspend CoroutineScope.() -> Unit) {
val initialBlockingTaskQueueHandleCount = uniffiBlockingTaskQueueHandleCount()
val initialPollHandleCount = uniffiPollHandleCount()
val time = runBlocking {
measureTimeMillis {
test()
}
}
assert(uniffiBlockingTaskQueueHandleCount() == initialBlockingTaskQueueHandleCount)
assert(uniffiPollHandleCount() == initialPollHandleCount)
}

// init UniFFI to get good measurements after that
runBlocking {
runAsyncTest {
val time = measureTimeMillis {
alwaysReady()
}
Expand All @@ -24,7 +37,7 @@ fun assertApproximateTime(actualTime: Long, expectedTime: Int, testName: String
}

// Test `always_ready`.
runBlocking {
runAsyncTest {
val time = measureTimeMillis {
val result = alwaysReady()

Expand All @@ -35,7 +48,7 @@ runBlocking {
}

// Test `void`.
runBlocking {
runAsyncTest {
val time = measureTimeMillis {
val result = void()

Expand All @@ -46,7 +59,7 @@ runBlocking {
}

// Test `sleep`.
runBlocking {
runAsyncTest {
val time = measureTimeMillis {
sleep(200U)
}
Expand All @@ -55,7 +68,7 @@ runBlocking {
}

// Test sequential futures.
runBlocking {
runAsyncTest {
val time = measureTimeMillis {
val resultAlice = sayAfter(100U, "Alice")
val resultBob = sayAfter(200U, "Bob")
Expand All @@ -68,7 +81,7 @@ runBlocking {
}

// Test concurrent futures.
runBlocking {
runAsyncTest {
val time = measureTimeMillis {
val resultAlice = async { sayAfter(100U, "Alice") }
val resultBob = async { sayAfter(200U, "Bob") }
Expand All @@ -81,7 +94,7 @@ runBlocking {
}

// Test async methods.
runBlocking {
runAsyncTest {
val megaphone = newMegaphone()
val time = measureTimeMillis {
val resultAlice = megaphone.sayAfter(200U, "Alice")
Expand All @@ -92,7 +105,7 @@ runBlocking {
assertApproximateTime(time, 200, "async methods")
}

runBlocking {
runAsyncTest {
val megaphone = newMegaphone()
val time = measureTimeMillis {
val resultAlice = sayAfterWithMegaphone(megaphone, 200U, "Alice")
Expand All @@ -104,7 +117,7 @@ runBlocking {
}

// Test async method returning optional object
runBlocking {
runAsyncTest {
val megaphone = asyncMaybeNewMegaphone(true)
assert(megaphone != null)

Expand Down Expand Up @@ -213,7 +226,7 @@ runBlocking {


// Test with the Tokio runtime.
runBlocking {
runAsyncTest {
val time = measureTimeMillis {
val resultAlice = sayAfterWithTokio(200U, "Alice")

Expand All @@ -224,7 +237,7 @@ runBlocking {
}

// Test fallible function/method.
runBlocking {
runAsyncTest {
val time1 = measureTimeMillis {
try {
fallibleMe(false)
Expand Down Expand Up @@ -289,7 +302,7 @@ runBlocking {
}

// Test record.
runBlocking {
runAsyncTest {
val time = measureTimeMillis {
val result = newMyRecord("foo", 42U)

Expand All @@ -303,7 +316,7 @@ runBlocking {
}

// Test a broken sleep.
runBlocking {
runAsyncTest {
val time = measureTimeMillis {
brokenSleep(100U, 0U) // calls the waker twice immediately
sleep(100U) // wait for possible failure
Expand All @@ -317,7 +330,7 @@ runBlocking {


// Test a future that uses a lock and that is cancelled.
runBlocking {
runAsyncTest {
val time = measureTimeMillis {
val job = launch {
useSharedResource(SharedResourceOptions(releaseAfterMs=5000U, timeoutMs=100U))
Expand All @@ -336,11 +349,41 @@ runBlocking {
}

// Test a future that uses a lock and that is not cancelled.
runBlocking {
runAsyncTest {
val time = measureTimeMillis {
useSharedResource(SharedResourceOptions(releaseAfterMs=100U, timeoutMs=1000U))

useSharedResource(SharedResourceOptions(releaseAfterMs=0U, timeoutMs=1000U))
}
println("useSharedResource (not canceled): ${time}ms")
}

// Test blocking task queues
runAsyncTest {
withTimeout(1000) {
assert(calcSquare(Dispatchers.IO, 20) == 400)
}

withTimeout(1000) {
assert(calcSquares(Dispatchers.IO, listOf(1, -2, 3)) == listOf(1, 4, 9))
}

val executors = listOf(
Executors.newSingleThreadExecutor(),
Executors.newSingleThreadExecutor(),
Executors.newSingleThreadExecutor(),
)
withTimeout(1000) {
assert(calcSquaresMultiQueue(executors.map { it.asCoroutineDispatcher() }, listOf(1, -2, 3)) == listOf(1, 4, 9))
}
for (executor in executors) {
executor.shutdown()
}
}

// Test blocking task queue cloning
runAsyncTest {
withTimeout(1000) {
assert(calcSquareWithClone(Dispatchers.IO, 20) == 400)
}
}
Loading