Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
4859d81
[runtime/deterministic] Fix `create_pool`
clabby Dec 25, 2025
96c730b
[parallel] Introduce data parallelism abstractions
clabby Dec 25, 2025
928e08e
docs and tests
clabby Dec 25, 2025
8685e10
integrate into coding
clabby Dec 25, 2025
07bb569
`fold_init` + map convenience functions
clabby Dec 25, 2025
95e64ca
refine docs
clabby Dec 25, 2025
d768a72
cfg-if
clabby Dec 25, 2025
ec92d96
dedup fold/fold_init
clabby Dec 25, 2025
4aef69c
complete coding integration
clabby Dec 25, 2025
a1cceb6
lint
clabby Dec 25, 2025
2585585
crate metadata + `no_std` checks
clabby Dec 25, 2025
e54551f
integrate into math
clabby Dec 26, 2025
ae1d610
new cryptography port (with bls12381 refactor)
clabby Jan 3, 2026
305fc54
only register current thread if not already contained in a thread pool
clabby Jan 3, 2026
dccc9cb
handle cryptography fallout (`reshare` + `consensus`)
clabby Jan 3, 2026
a0828a1
nit
clabby Jan 3, 2026
5f01e74
Allow supplying thread pool in bls threshold scheme
clabby Jan 4, 2026
3b515fa
Make threshold scheme generic over `Strategy`
clabby Jan 5, 2026
9b2624b
Disable default features for `parallel` in workspace
clabby Jan 5, 2026
3a203f9
review
clabby Jan 5, 2026
cf92733
Remove `IntoStrategyIterator` in favor of `ParallelBridge`
clabby Jan 6, 2026
9b931b0
remove `Strategy::join`
clabby Jan 6, 2026
59fb94e
simplify tests
clabby Jan 6, 2026
3c204ad
Upfront allocation over `par_bridge`
clabby Jan 6, 2026
5f19a75
Add helper for constructing a thread pool
clabby Jan 6, 2026
90e46b9
Use `impl Strategy` in `Space::msm`
clabby Jan 6, 2026
c24ec88
Rename `Strategy` -> `Parallel`, `Parallel` -> `ParallelRayon`, `Sequ…
clabby Jan 6, 2026
3952835
new description
clabby Jan 6, 2026
a9384e0
review
clabby Jan 6, 2026
848f48f
Move `create_pool` to `{tokio, deterministic}::Context`
clabby Jan 6, 2026
9d0ff05
change names (again.)
clabby Jan 6, 2026
674bcba
Add `create_strategy helper`
clabby Jan 6, 2026
c2d8af5
rename trait (again)
clabby Jan 6, 2026
da4d93a
Remove dangling `create_pool` methods
clabby Jan 6, 2026
6cc61ae
doc tests
clabby Jan 6, 2026
075777b
rm deterministic comment on dedicated threads
clabby Jan 6, 2026
92d9540
rebase
clabby Jan 6, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .github/scripts/check_no_std.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ no_std_packages=(
commonware-utils
commonware-cryptography
commonware-storage
commonware-parallel
)

target="riscv32imac-unknown-none-elf"
Expand Down
5 changes: 5 additions & 0 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,11 @@ jobs:
continue-on-error: true
env:
CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }}
- name: Publish parallel
run: cargo publish --manifest-path parallel/Cargo.toml
continue-on-error: true
env:
CARGO_REGISTRY_TOKEN: ${{ secrets.CARGO_REGISTRY_TOKEN }}
- name: Publish codec
run: cargo publish --manifest-path codec/Cargo.toml
continue-on-error: true
Expand Down
21 changes: 20 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ members = [
"macros",
"math",
"p2p",
"parallel",
"resolver",
"runtime",
"storage",
Expand Down Expand Up @@ -89,6 +90,7 @@ commonware-deployer = { version = "0.0.64", path = "deployer", default-features
commonware-macros = { version = "0.0.64", path = "macros" }
commonware-math = { version = "0.0.64", path = "math", default-features = false }
commonware-p2p = { version = "0.0.64", path = "p2p" }
commonware-parallel = { version = "0.0.64", path = "parallel", default-features = false }
commonware-resolver = { version = "0.0.64", path = "resolver" }
commonware-runtime = { version = "0.0.64", path = "runtime" }
commonware-storage = { version = "0.0.64", path = "storage", default-features = false }
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ _Primitives are designed for deployment in adversarial environments. If you find
* [deployer](./deployer/README.md): Deploy infrastructure across cloud providers.
* [math](./math/README.md): Create and manipulate mathematical objects.
* [p2p](./p2p/README.md): Communicate with authenticated peers over encrypted connections.
* [parallel](./parallel/README.md): Parallelize fold operations with pluggable execution strategies.
* [resolver](./resolver/README.md): Resolve data identified by a fixed-length key.
* [runtime](./runtime/README.md): Execute asynchronous tasks with a configurable scheduler.
* [storage](./storage/README.md): Persist and retrieve data from an abstract store.
Expand Down
1 change: 1 addition & 0 deletions coding/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ bytes.workspace = true
commonware-codec = { workspace = true, features = ["std"] }
commonware-cryptography = { workspace = true, features = ["std"] }
commonware-math.workspace = true
commonware-parallel = { workspace = true, features = ["std"] }
commonware-storage = { workspace = true, features = ["std"] }
commonware-utils = { workspace = true, features = ["std"] }
futures.workspace = true
Expand Down
1 change: 1 addition & 0 deletions coding/fuzz/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ cargo-fuzz = true
[dependencies]
arbitrary = { workspace = true, features = ["derive"] }
commonware-cryptography.workspace = true
commonware-parallel = { workspace = true, features = ["std"] }
libfuzzer-sys.workspace = true
rand_chacha.workspace = true

Expand Down
7 changes: 4 additions & 3 deletions coding/fuzz/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,9 @@
use arbitrary::{Arbitrary, Unstructured};
use commonware_coding::{Config, Scheme};
use commonware_parallel::Sequential;
use std::iter;

const CONCURRENCY: usize = 1;
const STRATEGY: Sequential = Sequential;

#[derive(Debug)]
struct Shuffle {
Expand Down Expand Up @@ -70,7 +71,7 @@ pub fn fuzz<S: Scheme>(input: FuzzInput) {
minimum_shards: min,
extra_shards: recovery,
};
let (commitment, shards) = S::encode(&config, data.as_slice(), CONCURRENCY).unwrap();
let (commitment, shards) = S::encode(&config, data.as_slice(), &STRATEGY).unwrap();
assert_eq!(shards.len(), (recovery + min) as usize);
// We don't use enumerate to get u16s.
let mut shards = (0u16..).zip(shards).collect::<Vec<_>>();
Expand All @@ -97,7 +98,7 @@ pub fn fuzz<S: Scheme>(input: FuzzInput) {
&commitment,
my_checking_data,
&checked_shards,
CONCURRENCY,
&STRATEGY,
)
.unwrap();
assert_eq!(&decoded, &data);
Expand Down
46 changes: 35 additions & 11 deletions coding/src/benches/bench.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
use commonware_coding::{Config, Scheme};
use commonware_parallel::{Rayon, Sequential};
use commonware_utils::NZUsize;
use criterion::{criterion_main, BatchSize, Criterion};
use rand::{RngCore, SeedableRng as _};
use rand_chacha::ChaCha8Rng;
Expand All @@ -18,6 +20,7 @@ pub(crate) fn benchmark_encode_generic<S: Scheme>(name: &str, c: &mut Criterion)
minimum_shards: min as u16,
extra_shards: (chunks - min) as u16,
};
let strategy = Rayon::new(NZUsize!(conc)).unwrap();
c.bench_function(
&format!("{name}/msg_len={data_length} chunks={chunks} conc={conc}"),
|b| {
Expand All @@ -28,7 +31,13 @@ pub(crate) fn benchmark_encode_generic<S: Scheme>(name: &str, c: &mut Criterion)
rng.fill_bytes(&mut data);
data
},
|data| S::encode(&config, data.as_slice(), conc),
|data| {
if conc > 1 {
S::encode(&config, data.as_slice(), &strategy).unwrap()
} else {
S::encode(&config, data.as_slice(), &Sequential).unwrap()
}
},
BatchSize::SmallInput,
);
},
Expand All @@ -49,6 +58,7 @@ pub(crate) fn benchmark_decode_generic<S: Scheme>(name: &str, c: &mut Criterion)
minimum_shards: min as u16,
extra_shards: (chunks - min) as u16,
};
let strategy = Rayon::new(NZUsize!(conc)).unwrap();
c.bench_function(
&format!("{name}/msg_len={data_length} chunks={chunks} conc={conc}"),
|b| {
Expand All @@ -59,8 +69,11 @@ pub(crate) fn benchmark_decode_generic<S: Scheme>(name: &str, c: &mut Criterion)
rng.fill_bytes(&mut data);

// Encode data
let (commitment, mut shards) =
S::encode(&config, data.as_slice(), conc).unwrap();
let (commitment, mut shards) = if conc > 1 {
S::encode(&config, data.as_slice(), &strategy).unwrap()
} else {
S::encode(&config, data.as_slice(), &Sequential).unwrap()
};

let my_shard = shards.pop().unwrap();
let reshards = shards
Expand Down Expand Up @@ -99,14 +112,25 @@ pub(crate) fn benchmark_decode_generic<S: Scheme>(name: &str, c: &mut Criterion)
.unwrap()
})
.collect::<Vec<_>>();
S::decode(
&config,
&commitment,
checking_data,
&checked_shards,
conc,
)
.unwrap();
if conc > 1 {
S::decode(
&config,
&commitment,
checking_data,
&checked_shards,
&strategy,
)
.unwrap()
} else {
S::decode(
&config,
&commitment,
checking_data,
&checked_shards,
&Sequential,
)
.unwrap()
}
},
BatchSize::SmallInput,
);
Expand Down
6 changes: 3 additions & 3 deletions coding/src/benches/bench_size.rs
Original file line number Diff line number Diff line change
@@ -1,10 +1,11 @@
use commonware_codec::EncodeSize as _;
use commonware_coding::{Config, NoCoding, ReedSolomon, Scheme, Zoda};
use commonware_cryptography::Sha256;
use commonware_parallel::Sequential;
use rand::{RngCore as _, SeedableRng as _};
use rand_chacha::ChaCha8Rng;

const CONCURRENCY: usize = 1;
const STRATEGY: Sequential = Sequential;

fn benchmark_size<S: Scheme>(name: &str) {
let mut rng = ChaCha8Rng::seed_from_u64(0);
Expand All @@ -24,8 +25,7 @@ fn benchmark_size<S: Scheme>(name: &str) {
data
};

let (commitment, mut shards) =
S::encode(&config, data.as_slice(), CONCURRENCY).unwrap();
let (commitment, mut shards) = S::encode(&config, data.as_slice(), &STRATEGY).unwrap();
let shard = shards.pop().unwrap();
println!(
"{} (shard)/msg_len={} chunks={}: {} B",
Expand Down
20 changes: 11 additions & 9 deletions coding/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

use bytes::Buf;
use commonware_codec::{Codec, FixedSize, Read, Write};
use commonware_parallel::Strategy;
use std::fmt::Debug;

mod reed_solomon;
Expand Down Expand Up @@ -83,16 +84,17 @@ pub struct CodecConfig {
/// ```
/// use commonware_coding::{Config, ReedSolomon, Scheme as _};
/// use commonware_cryptography::Sha256;
/// use commonware_parallel::Sequential;
///
/// const CONCURRENCY: usize = 1;
/// const STRATEGY: Sequential = Sequential;
///
/// type RS = ReedSolomon<Sha256>;
///
/// let config = Config { minimum_shards: 2, extra_shards: 1 };
/// let data = b"Hello!";
/// // Turn the data into shards, and a commitment to those shards.
/// let (commitment, shards) =
/// RS::encode(&config, data.as_slice(), CONCURRENCY).unwrap();
/// RS::encode(&config, data.as_slice(), &STRATEGY).unwrap();
///
/// // Each person produces reshards, their own checked shard, and checking data
/// // to check other peoples reshards.
Expand All @@ -113,11 +115,11 @@ pub struct CodecConfig {
/// checked_shards.push(RS::check(&config, &commitment, &checking_data, i as u16, reshard).unwrap())
/// }
///
/// let data2 = RS::decode(&config, &commitment, checking_data, &checked_shards[..2], CONCURRENCY).unwrap();
/// let data2 = RS::decode(&config, &commitment, checking_data, &checked_shards[..2], &STRATEGY).unwrap();
/// assert_eq!(&data[..], &data2[..]);
///
/// // Decoding works with different shards, with a guarantee to get the same result.
/// let data3 = RS::decode(&config, &commitment, checking_data, &checked_shards[1..], CONCURRENCY).unwrap();
/// let data3 = RS::decode(&config, &commitment, checking_data, &checked_shards[1..], &STRATEGY).unwrap();
/// assert_eq!(&data[..], &data3[..]);
/// ```
pub trait Scheme: Debug + Clone + Send + Sync + 'static {
Expand Down Expand Up @@ -148,7 +150,7 @@ pub trait Scheme: Debug + Clone + Send + Sync + 'static {
fn encode(
config: &Config,
data: impl Buf,
concurrency: usize,
strategy: &impl Strategy,
) -> Result<(Self::Commitment, Vec<Self::Shard>), Self::Error>;

/// Take your own shard, check it, and produce a [Scheme::ReShard] to forward to others.
Expand Down Expand Up @@ -197,7 +199,7 @@ pub trait Scheme: Debug + Clone + Send + Sync + 'static {
commitment: &Self::Commitment,
checking_data: Self::CheckingData,
shards: &[Self::CheckedShard],
concurrency: usize,
strategy: &impl Strategy,
) -> Result<Vec<u8>, Self::Error>;
}

Expand All @@ -214,9 +216,9 @@ mod test {
use crate::reed_solomon::ReedSolomon;
use commonware_codec::Encode;
use commonware_cryptography::Sha256;
use commonware_parallel::Sequential;
use std::cmp::Reverse;

const CONCURRENCY: usize = 1;
const MAX_DATA_BYTES: usize = 1 << 31;

fn general_test<S: Scheme>(
Expand Down Expand Up @@ -249,7 +251,7 @@ mod test {
let read_cfg = CodecConfig {
maximum_shard_size: MAX_DATA_BYTES,
};
let (commitment, shards) = S::encode(&config, data, CONCURRENCY).unwrap();
let (commitment, shards) = S::encode(&config, data, &Sequential).unwrap();
// Pick out the packets we want, in reverse order.
let ((_, _, checking_data, my_checked_shard, _), other_packets) = {
let mut out = shards
Expand Down Expand Up @@ -283,7 +285,7 @@ mod test {
&commitment,
checking_data,
&checked_shards,
CONCURRENCY,
&Sequential,
)
.unwrap();
assert_eq!(&decoded, data, "{name} failed");
Expand Down
Loading
Loading