Skip to content

Commit 2a42823

Browse files
committed
move to sync fn input in executor, ditch everything but spawn_blocking/custom/current_context, tweak defaults to enable concurrency limiting + use spawn_blocking for tokio feature, add more docs
1 parent be60053 commit 2a42823

26 files changed

+809
-1265
lines changed

Cargo.toml

Lines changed: 9 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -6,28 +6,28 @@ license = "MIT"
66
repository = "https://github.com/jlizen/compute-heavy-future-executor"
77
homepage = "https://github.com/jlizen/compute-heavy-future-executor"
88
rust-version = "1.70"
9-
exclude = ["/.github", "/examples", "/scripts"]
9+
exclude = ["/.github", "/Exampless", "/scripts"]
1010
readme = "README.md"
11-
description = "Additional executor patterns for handling compute-bounded, blocking futures."
11+
description = "Executor patterns for handling compute-bounded calls inside async contexts."
1212
categories = ["asynchronous"]
1313

1414
[features]
15-
tokio = ["tokio/rt"]
16-
tokio_block_in_place = ["tokio", "tokio/rt-multi-thread"]
17-
secondary_tokio_runtime = ["tokio", "tokio/rt-multi-thread", "dep:libc", "dep:num_cpus"]
15+
default = ["tokio"]
16+
tokio = ["tokio/rt",]
1817

1918
[dependencies]
20-
libc = { version = "0.2.168", optional = true }
2119
log = "0.4.22"
22-
num_cpus = { version = "1.0", optional = true }
23-
tokio = { version = "1.0", features = ["macros", "sync"] }
20+
num_cpus = "1.0"
21+
tokio = { version = "1.0", features = ["sync"] }
2422

2523
[dev-dependencies]
26-
tokio = { version = "1.0", features = ["full"]}
24+
tokio = { version = "1", features = ["full"]}
2725
futures-util = "0.3.31"
26+
rayon = "1"
2827

2928
[package.metadata.docs.rs]
3029
all-features = true
30+
rustdoc-args = ["--cfg", "docsrs"]
3131

3232

3333
[lints.rust]

README.md

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,89 @@
11
# compute-heavy-future-executor
22
Experimental crate that adds additional executor patterns to use with frequently blocking futures.
3+
4+
Today, when library authors are write async APIs, they don't have a good way to handle long-running sync segments.
5+
6+
An application author can use selective handling such as `tokio::task::spawn_blocking()` along with concurrency control to delegate sync segments to blocking threads. Or, they might send the work to a `rayon` threadpool.
7+
8+
But, library authors generally don't have this flexibility. As, they generally want to be agnostic across runtime. Or, even if they are `tokio`-specific, they generally don't want to call `tokio::task::spawn_blocking()` as it is
9+
suboptimal without extra configuration (concurrency control) as well as highly opinionated to send the work across threads.
10+
11+
This library aims to solve this problem by providing libray authors a static, globally scoped strategy that they can delegate blocking sync work to without drawing any conclusions about handling.
12+
13+
And then, the applications using the library can either rely on the default strategy that this package provides, or tune them with their preferred approach.
14+
15+
## Usage - Library Authors
16+
For library authors, it's as simple as adding a dependency enabling `compute-heavy-future-executor` (perhaps behind a feature flag).
17+
18+
The below will default to 'current context' execution (ie non-op) unless the caller enables the tokio feature.
19+
```
20+
[dependencies]
21+
compute-heavy-future-executor = { version = "0.1", default-features = false }
22+
```
23+
24+
Meanwhile to be slightly more opinionated, the below will enable usage of `spawn_blocking` with concurrency control
25+
by default unless the caller opts out:
26+
```
27+
[dependencies]
28+
compute-heavy-future-executor = { version = "0.1" }
29+
```
30+
31+
And then wrap any sync work by passing it as a closure to a global `execute_sync()` call:
32+
33+
```
34+
use compute_heavy_future_executor::execute_sync;
35+
36+
fn sync_work(input: String)-> u8 {
37+
std::thread::sleep(std::time::Duration::from_secs(5));
38+
println!("{input}");
39+
5
40+
}
41+
pub async fn a_future_that_has_blocking_sync_work() -> u8 {
42+
// relies on caller-specified strategy for translating execute_sync into a future that won't
43+
// block the current worker thread
44+
execute_sync(move || { sync_work("foo".to_string()) }).await.unwrap()
45+
}
46+
47+
```
48+
49+
## Usage - Application owners
50+
Application authors can benefit from this crate with no application code changes, if you are using
51+
a library that is itself using this crate.
52+
53+
If you want to customize the strategy beyond defaults, they can add
54+
`compute-heavy-future-executor` to their dependencies:
55+
56+
```
57+
[dependencies]
58+
// enables tokio and therefore spawn_blocking strategy by default
59+
compute-heavy-future-executor = { version = "0.1" }
60+
// used for example with custom executor
61+
rayon = "1"
62+
```
63+
64+
And then configure your global strategy as desired. For instance, see below for usage of rayon
65+
instead of `spawn_blocking()`.
66+
67+
```
68+
use std::sync::OnceLock;
69+
use rayon::ThreadPool;
70+
71+
use compute_heavy_future_executor::{
72+
global_sync_strategy_builder, CustomExecutorSyncClosure,
73+
};
74+
75+
static THREADPOOL: OnceLock<ThreadPool> = OnceLock::new();
76+
77+
fn initialize_strategy() {
78+
THREADPOOL.set(|| rayon::ThreadPoolBuilder::default().build().unwrap());
79+
80+
let custom_closure: CustomExecutorSyncClosure =
81+
Box::new(|f| Box::new(async move { Ok(THREADPOOL.get().unwrap().spawn(f)) }));
82+
83+
global_sync_strategy_builder()
84+
// probably no need for max concurrency as rayon already is defaulting to a thread per core
85+
// and using a task queue
86+
.initialize_custom_executor(custom_closure).unwrap();
87+
}
88+
89+
```

src/block_in_place.rs

Lines changed: 0 additions & 45 deletions
This file was deleted.

src/concurrency_limit.rs

Lines changed: 6 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -23,19 +23,13 @@ impl ConcurrencyLimit {
2323
/// Internally turns errors into a no-op (`None`) and outputs log lines.
2424
pub(crate) async fn acquire_permit(&self) -> Option<OwnedSemaphorePermit> {
2525
match self.semaphore.clone() {
26-
Some(semaphore) => {
27-
match semaphore
28-
.acquire_owned()
29-
.await
30-
.map_err(|err| Error::Semaphore(err))
31-
{
32-
Ok(permit) => Some(permit),
33-
Err(err) => {
34-
log::error!("failed to acquire permit: {err}");
35-
None
36-
}
26+
Some(semaphore) => match semaphore.acquire_owned().await.map_err(Error::Semaphore) {
27+
Ok(permit) => Some(permit),
28+
Err(err) => {
29+
log::error!("failed to acquire permit: {err}");
30+
None
3731
}
38-
}
32+
},
3933
None => None,
4034
}
4135
}

src/custom_executor.rs

Lines changed: 0 additions & 55 deletions
This file was deleted.

src/error.rs

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -2,33 +2,30 @@ use core::fmt;
22

33
use crate::ExecutorStrategy;
44

5+
/// An error from the custom executor
56
#[non_exhaustive]
67
#[derive(Debug)]
78
pub enum Error {
9+
/// Executor has already had a global strategy configured.
810
AlreadyInitialized(ExecutorStrategy),
9-
InvalidConfig(InvalidConfig),
11+
/// Issue listening on the custom executor response channel.
1012
RecvError(tokio::sync::oneshot::error::RecvError),
13+
/// Error enforcing concurrency
1114
Semaphore(tokio::sync::AcquireError),
15+
/// Dynamic error from the custom executor closure
1216
BoxError(Box<dyn std::error::Error + Send + Sync>),
1317
#[cfg(feature = "tokio")]
18+
/// Background spawn blocking task panicked
1419
JoinError(tokio::task::JoinError),
1520
}
1621

17-
#[derive(Debug)]
18-
pub struct InvalidConfig {
19-
pub field: &'static str,
20-
pub received: String,
21-
pub expected: &'static str,
22-
}
23-
2422
impl fmt::Display for Error {
2523
fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
2624
match self {
2725
Error::AlreadyInitialized(strategy) => write!(
2826
f,
29-
"global strategy is already initialzed with strategy: {strategy:#?}"
27+
"global strategy is already initialized with strategy: {strategy:#?}"
3028
),
31-
Error::InvalidConfig(err) => write!(f, "invalid config: {err:#?}"),
3229
Error::BoxError(err) => write!(f, "custom executor error: {err}"),
3330
Error::RecvError(err) => write!(f, "error in custom executor response channel: {err}"),
3431
Error::Semaphore(err) => write!(
Lines changed: 9 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,6 @@
1-
use crate::{concurrency_limit::ConcurrencyLimit, error::Error, ComputeHeavyFutureExecutor};
1+
use crate::{concurrency_limit::ConcurrencyLimit, error::Error};
2+
3+
use super::ExecuteSync;
24

35
pub(crate) struct CurrentContextExecutor {
46
concurrency_limit: ConcurrencyLimit,
@@ -12,16 +14,15 @@ impl CurrentContextExecutor {
1214
}
1315
}
1416

15-
impl ComputeHeavyFutureExecutor for CurrentContextExecutor {
16-
async fn execute<F, O>(&self, fut: F) -> Result<O, Error>
17+
impl ExecuteSync for CurrentContextExecutor {
18+
async fn execute_sync<F, R>(&self, f: F) -> Result<R, Error>
1719
where
18-
F: std::future::Future<Output = O> + Send + 'static,
19-
O: Send + 'static,
20+
F: FnOnce() -> R + Send + 'static,
21+
R: Send + 'static,
2022
{
2123
let _permit = self.concurrency_limit.acquire_permit().await;
2224

23-
Ok(fut.await)
24-
25-
// implicit permit drop
25+
Ok(f())
26+
// permit implicitly drops
2627
}
2728
}

src/executor/custom_executor.rs

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,62 @@
1+
use std::future::Future;
2+
3+
use crate::{concurrency_limit::ConcurrencyLimit, error::Error};
4+
5+
use super::ExecuteSync;
6+
7+
/// A closure that accepts an arbitrary sync function and returns a future that executes it.
8+
/// The Custom Executor will implicitly wrap the input function in a oneshot
9+
/// channel to erase its input/output type.
10+
pub type CustomExecutorSyncClosure = Box<
11+
dyn Fn(
12+
Box<dyn FnOnce() + Send + 'static>,
13+
) -> Box<
14+
dyn Future<Output = Result<(), Box<dyn std::error::Error + Send + Sync>>>
15+
+ Send
16+
+ 'static,
17+
> + Send
18+
+ Sync,
19+
>;
20+
21+
pub(crate) struct CustomExecutor {
22+
closure: CustomExecutorSyncClosure,
23+
concurrency_limit: ConcurrencyLimit,
24+
}
25+
26+
impl CustomExecutor {
27+
pub(crate) fn new(closure: CustomExecutorSyncClosure, max_concurrency: Option<usize>) -> Self {
28+
Self {
29+
closure,
30+
concurrency_limit: ConcurrencyLimit::new(max_concurrency),
31+
}
32+
}
33+
}
34+
35+
impl ExecuteSync for CustomExecutor {
36+
// the compiler correctly is pointing out that the custom closure isn't guaranteed to call f.
37+
// but, we leave that to the implementer to guarantee since we are limited by working with static signatures
38+
#[allow(unused_variables)]
39+
async fn execute_sync<F, R>(&self, f: F) -> Result<R, Error>
40+
where
41+
F: FnOnce() -> R + Send + 'static,
42+
R: Send + 'static,
43+
{
44+
let _permit = self.concurrency_limit.acquire_permit().await;
45+
46+
let (tx, rx) = tokio::sync::oneshot::channel();
47+
48+
let wrapped_input_closure = Box::new(|| {
49+
let res = f();
50+
if tx.send(res).is_err() {
51+
log::trace!("custom sync executor foreground dropped before it could receive the result of the sync closure");
52+
}
53+
});
54+
55+
Box::into_pin((self.closure)(wrapped_input_closure))
56+
.await
57+
.map_err(Error::BoxError)?;
58+
59+
rx.await.map_err(Error::RecvError)
60+
// permit implicitly drops
61+
}
62+
}

0 commit comments

Comments
 (0)