From 79a87dc3b07e4d46c3235ee59ff0774f3387306c Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Tue, 30 Sep 2025 11:36:36 +0200 Subject: [PATCH 1/5] Add statement store benchmarks --- Cargo.lock | 1 + substrate/client/statement-store/Cargo.toml | 6 + .../benches/statement_store.rs | 309 ++++++++++++++++++ substrate/client/statement-store/src/lib.rs | 3 +- 4 files changed, 318 insertions(+), 1 deletion(-) create mode 100644 substrate/client/statement-store/benches/statement_store.rs diff --git a/Cargo.lock b/Cargo.lock index e213136785be5..df4247cd6a8f9 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -20793,6 +20793,7 @@ dependencies = [ name = "sc-statement-store" version = "10.0.0" dependencies = [ + "criterion", "log", "parity-db", "parking_lot 0.12.3", diff --git a/substrate/client/statement-store/Cargo.toml b/substrate/client/statement-store/Cargo.toml index c0219b294cede..9953ad367ade8 100644 --- a/substrate/client/statement-store/Cargo.toml +++ b/substrate/client/statement-store/Cargo.toml @@ -32,3 +32,9 @@ tokio = { features = ["time"], workspace = true, default-features = true } [dev-dependencies] sp-tracing = { workspace = true } tempfile = { workspace = true } +criterion = { workspace = true, default-features = true } + +[[bench]] +name = "statement_store" +harness = false +required-features = [] diff --git a/substrate/client/statement-store/benches/statement_store.rs b/substrate/client/statement-store/benches/statement_store.rs new file mode 100644 index 0000000000000..01854ab119a75 --- /dev/null +++ b/substrate/client/statement-store/benches/statement_store.rs @@ -0,0 +1,309 @@ +// This file is part of Substrate. + +// Copyright (C) Parity Technologies (UK) Ltd. +// SPDX-License-Identifier: GPL-3.0-or-later WITH Classpath-exception-2.0 + +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. + +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU General Public License for more details. + +// You should have received a copy of the GNU General Public License +// along with this program. If not, see . + +use criterion::{criterion_group, criterion_main, Criterion}; +use sc_statement_store::Store; +use sp_core::Pair; +use sp_statement_store::{ + runtime_api::{InvalidStatement, ValidStatement, ValidateStatement}, + DecryptionKey, Proof, SignatureVerificationResult, Statement, StatementSource, StatementStore, + SubmitResult, Topic, +}; +use std::sync::Arc; + +type Extrinsic = sp_runtime::OpaqueExtrinsic; +type Hash = sp_core::H256; +type Hashing = sp_runtime::traits::BlakeTwo256; +type BlockNumber = u64; +type Header = sp_runtime::generic::Header; +type Block = sp_runtime::generic::Block; + +const CORRECT_BLOCK_HASH: [u8; 32] = [1u8; 32]; +const STATEMENT_DATA_SIZE: usize = 256; +const INITIAL_STATEMENTS: usize = 1_000; +const OPERATIONS_COUNT: usize = 100; + +#[derive(Clone)] +struct TestClient; + +struct RuntimeApi { + _inner: TestClient, +} + +impl sp_api::ProvideRuntimeApi for TestClient { + type Api = RuntimeApi; + fn runtime_api(&self) -> sp_api::ApiRef { + RuntimeApi { _inner: self.clone() }.into() + } +} + +sp_api::mock_impl_runtime_apis! { + impl ValidateStatement for RuntimeApi { + fn validate_statement( + _source: StatementSource, + statement: Statement, + ) -> std::result::Result { + match statement.verify_signature() { + SignatureVerificationResult::Valid(_) => + Ok(ValidStatement { max_count: 100_000, max_size: 1_000_000 }), + SignatureVerificationResult::Invalid => Err(InvalidStatement::BadProof), + SignatureVerificationResult::NoSignature => { + if let Some(Proof::OnChain { block_hash, .. }) = statement.proof() { + if block_hash == &CORRECT_BLOCK_HASH { + Ok(ValidStatement { max_count: 100_000, max_size: 1_000_000 }) + } else { + Err(InvalidStatement::BadProof) + } + } else { + Err(InvalidStatement::BadProof) + } + }, + } + } + } +} + +impl sp_blockchain::HeaderBackend for TestClient { + fn header(&self, _hash: Hash) -> sp_blockchain::Result> { + unimplemented!() + } + fn info(&self) -> sp_blockchain::Info { + sp_blockchain::Info { + best_hash: CORRECT_BLOCK_HASH.into(), + best_number: 0, + genesis_hash: Default::default(), + finalized_hash: CORRECT_BLOCK_HASH.into(), + finalized_number: 1, + finalized_state: None, + number_leaves: 0, + block_gap: None, + } + } + fn status(&self, _hash: Hash) -> sp_blockchain::Result { + unimplemented!() + } + fn number(&self, _hash: Hash) -> sp_blockchain::Result> { + unimplemented!() + } + fn hash(&self, _number: BlockNumber) -> sp_blockchain::Result> { + unimplemented!() + } +} + +fn topic(data: u64) -> Topic { + let mut topic: Topic = Default::default(); + topic[0..8].copy_from_slice(&data.to_le_bytes()); + topic +} + +fn dec_key(data: u64) -> DecryptionKey { + let mut dec_key: DecryptionKey = Default::default(); + dec_key[0..8].copy_from_slice(&data.to_le_bytes()); + dec_key +} + +fn create_signed_statement( + id: u64, + topics: &[Topic], + dec_key: Option, + keypair: &sp_core::ed25519::Pair, +) -> Statement { + let mut statement = Statement::new(); + let mut data = vec![0u8; STATEMENT_DATA_SIZE]; + data[0..8].copy_from_slice(&id.to_le_bytes()); + statement.set_plain_data(data); + + for (i, topic) in topics.iter().enumerate() { + statement.set_topic(i, *topic); + } + + if let Some(key) = dec_key { + statement.set_decryption_key(key); + } + + statement.sign_ed25519_private(keypair); + statement +} + +fn setup_store(keypair: &sp_core::ed25519::Pair) -> (Store, tempfile::TempDir) { + let temp_dir = tempfile::Builder::new().tempdir().expect("Error creating test dir"); + let client = Arc::new(TestClient); + let mut path: std::path::PathBuf = temp_dir.path().into(); + path.push("db"); + let keystore = Arc::new(sc_keystore::LocalKeystore::in_memory()); + let store = Store::new(&path, Default::default(), client, keystore, None).unwrap(); + + for i in 0..INITIAL_STATEMENTS { + let topics = if i % 10 == 0 { vec![topic(0), topic(1)] } else { vec![] }; + let dec_key = if i % 5 == 0 { Some(dec_key(42)) } else { None }; + let statement = create_signed_statement(i as u64, &topics, dec_key, &keypair); + store.submit(statement, StatementSource::Local); + } + + (store, temp_dir) +} + +fn bench_submit(c: &mut Criterion) { + let keypair = sp_core::ed25519::Pair::from_string("//Bench", None).unwrap(); + let statements: Vec<_> = (INITIAL_STATEMENTS..INITIAL_STATEMENTS + OPERATIONS_COUNT) + .map(|i| create_signed_statement(i as u64, &[], None, &keypair)) + .collect(); + + c.bench_function("submit", |b| { + b.iter_batched( + || { + let (store, _temp) = setup_store(&keypair); + (store, _temp) + }, + |(store, _temp)| { + for statement in statements.clone() { + let result = store.submit(statement, StatementSource::Local); + assert!(matches!(result, SubmitResult::New(_))); + } + }, + criterion::BatchSize::LargeInput, + ) + }); +} + +fn bench_remove(c: &mut Criterion) { + let keypair = sp_core::ed25519::Pair::from_string("//Bench", None).unwrap(); + + c.bench_function("remove", |b| { + b.iter_batched( + || { + let (store, _temp) = setup_store(&keypair); + let hashes: Vec<_> = store + .statements() + .unwrap() + .into_iter() + .take(OPERATIONS_COUNT) + .map(|(hash, _)| hash) + .collect(); + (store, hashes, _temp) + }, + |(store, hashes, _temp)| { + for hash in hashes { + let _ = store.remove(&hash); + } + }, + criterion::BatchSize::LargeInput, + ) + }); +} + +fn bench_statement_lookup(c: &mut Criterion) { + let keypair = sp_core::ed25519::Pair::from_string("//Bench", None).unwrap(); + + c.bench_function("statement_lookup", |b| { + b.iter_batched( + || { + let (store, _temp) = setup_store(&keypair); + let hashes: Vec<_> = store + .statements() + .unwrap() + .into_iter() + .take(OPERATIONS_COUNT) + .map(|(hash, _)| hash) + .collect(); + (store, hashes, _temp) + }, + |(store, hashes, _temp)| { + for hash in hashes { + let _ = store.statement(&hash); + } + }, + criterion::BatchSize::LargeInput, + ) + }); +} + +fn bench_statements_all(c: &mut Criterion) { + let keypair = sp_core::ed25519::Pair::from_string("//Bench", None).unwrap(); + let (store, _temp) = setup_store(&keypair); + + c.bench_function("statements_all", |b| { + b.iter(|| { + let _ = store.statements(); + }) + }); +} + +fn bench_broadcasts(c: &mut Criterion) { + let keypair = sp_core::ed25519::Pair::from_string("//Bench", None).unwrap(); + let (store, _temp) = setup_store(&keypair); + let topics = vec![topic(0), topic(1)]; + + c.bench_function("broadcasts", |b| { + b.iter(|| { + let _ = store.broadcasts(&topics); + }) + }); +} + +fn bench_posted(c: &mut Criterion) { + let keypair = sp_core::ed25519::Pair::from_string("//Bench", None).unwrap(); + let (store, _temp) = setup_store(&keypair); + let key = dec_key(42); + + c.bench_function("posted", |b| { + b.iter(|| { + let _ = store.posted(&[], key); + }) + }); +} + +fn bench_maintain(c: &mut Criterion) { + let keypair = sp_core::ed25519::Pair::from_string("//Bench", None).unwrap(); + + c.bench_function("maintain", |b| { + b.iter_batched( + || { + let (store, _temp) = setup_store(&keypair); + // Mark statements for expiration by removing them + let hashes: Vec<_> = store + .statements() + .unwrap() + .into_iter() + .take(OPERATIONS_COUNT) + .map(|(hash, _)| hash) + .collect(); + for hash in hashes { + let _ = store.remove(&hash); + } + (store, _temp) + }, + |(store, _temp)| { + store.maintain(); + }, + criterion::BatchSize::LargeInput, + ) + }); +} + +criterion_group!( + benches, + bench_submit, + bench_remove, + bench_statement_lookup, + bench_statements_all, + bench_broadcasts, + bench_posted, + bench_maintain +); +criterion_main!(benches); diff --git a/substrate/client/statement-store/src/lib.rs b/substrate/client/statement-store/src/lib.rs index e58440eccbbe8..18c65511e054b 100644 --- a/substrate/client/statement-store/src/lib.rs +++ b/substrate/client/statement-store/src/lib.rs @@ -517,7 +517,8 @@ impl Store { /// Create a new instance. /// `path` will be used to open a statement database or create a new one if it does not exist. - fn new( + #[doc(hidden)] + pub fn new( path: &std::path::Path, options: Options, client: Arc, From d8d3a2074816e4864dd15513d8b56ebb246d890d Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Tue, 30 Sep 2025 16:22:49 +0200 Subject: [PATCH 2/5] Use multi thread --- .../benches/statement_store.rs | 105 ++++++++++++++---- 1 file changed, 84 insertions(+), 21 deletions(-) diff --git a/substrate/client/statement-store/benches/statement_store.rs b/substrate/client/statement-store/benches/statement_store.rs index 01854ab119a75..de057e9fe1ea3 100644 --- a/substrate/client/statement-store/benches/statement_store.rs +++ b/substrate/client/statement-store/benches/statement_store.rs @@ -36,7 +36,9 @@ type Block = sp_runtime::generic::Block; const CORRECT_BLOCK_HASH: [u8; 32] = [1u8; 32]; const STATEMENT_DATA_SIZE: usize = 256; const INITIAL_STATEMENTS: usize = 1_000; -const OPERATIONS_COUNT: usize = 100; +const NUM_THREADS: usize = 64; +const OPS_PER_THREAD: usize = 10; +const TOTAL_OPS: usize = NUM_THREADS * OPS_PER_THREAD; #[derive(Clone)] struct TestClient; @@ -160,7 +162,7 @@ fn setup_store(keypair: &sp_core::ed25519::Pair) -> (Store, tempfile::TempDir) { fn bench_submit(c: &mut Criterion) { let keypair = sp_core::ed25519::Pair::from_string("//Bench", None).unwrap(); - let statements: Vec<_> = (INITIAL_STATEMENTS..INITIAL_STATEMENTS + OPERATIONS_COUNT) + let statements: Vec<_> = (INITIAL_STATEMENTS..INITIAL_STATEMENTS + TOTAL_OPS) .map(|i| create_signed_statement(i as u64, &[], None, &keypair)) .collect(); @@ -168,13 +170,23 @@ fn bench_submit(c: &mut Criterion) { b.iter_batched( || { let (store, _temp) = setup_store(&keypair); - (store, _temp) + (Arc::new(store), _temp) }, |(store, _temp)| { - for statement in statements.clone() { - let result = store.submit(statement, StatementSource::Local); - assert!(matches!(result, SubmitResult::New(_))); - } + std::thread::scope(|s| { + for thread_id in 0..NUM_THREADS { + let store = store.clone(); + let start = thread_id * OPS_PER_THREAD; + let end = start + OPS_PER_THREAD; + let thread_statements = statements[start..end].to_vec(); + s.spawn(move || { + for statement in thread_statements { + let result = store.submit(statement, StatementSource::Local); + assert!(matches!(result, SubmitResult::New(_))); + } + }); + } + }); }, criterion::BatchSize::LargeInput, ) @@ -192,15 +204,25 @@ fn bench_remove(c: &mut Criterion) { .statements() .unwrap() .into_iter() - .take(OPERATIONS_COUNT) + .take(TOTAL_OPS) .map(|(hash, _)| hash) .collect(); - (store, hashes, _temp) + (Arc::new(store), hashes, _temp) }, |(store, hashes, _temp)| { - for hash in hashes { - let _ = store.remove(&hash); - } + std::thread::scope(|s| { + for thread_id in 0..NUM_THREADS { + let store = store.clone(); + let start = thread_id * OPS_PER_THREAD; + let end = start + OPS_PER_THREAD; + let thread_hashes = hashes[start..end].to_vec(); + s.spawn(move || { + for hash in thread_hashes { + let _ = store.remove(&hash); + } + }); + } + }); }, criterion::BatchSize::LargeInput, ) @@ -218,15 +240,25 @@ fn bench_statement_lookup(c: &mut Criterion) { .statements() .unwrap() .into_iter() - .take(OPERATIONS_COUNT) + .take(TOTAL_OPS) .map(|(hash, _)| hash) .collect(); - (store, hashes, _temp) + (Arc::new(store), hashes, _temp) }, |(store, hashes, _temp)| { - for hash in hashes { - let _ = store.statement(&hash); - } + std::thread::scope(|s| { + for thread_id in 0..NUM_THREADS { + let store = store.clone(); + let start = thread_id * OPS_PER_THREAD; + let end = start + OPS_PER_THREAD; + let thread_hashes = hashes[start..end].to_vec(); + s.spawn(move || { + for hash in thread_hashes { + let _ = store.statement(&hash); + } + }); + } + }); }, criterion::BatchSize::LargeInput, ) @@ -236,10 +268,20 @@ fn bench_statement_lookup(c: &mut Criterion) { fn bench_statements_all(c: &mut Criterion) { let keypair = sp_core::ed25519::Pair::from_string("//Bench", None).unwrap(); let (store, _temp) = setup_store(&keypair); + let store = Arc::new(store); c.bench_function("statements_all", |b| { b.iter(|| { - let _ = store.statements(); + std::thread::scope(|s| { + for _ in 0..NUM_THREADS { + let store = store.clone(); + s.spawn(move || { + for _ in 0..OPS_PER_THREAD { + let _ = store.statements(); + } + }); + } + }); }) }); } @@ -247,11 +289,22 @@ fn bench_statements_all(c: &mut Criterion) { fn bench_broadcasts(c: &mut Criterion) { let keypair = sp_core::ed25519::Pair::from_string("//Bench", None).unwrap(); let (store, _temp) = setup_store(&keypair); + let store = Arc::new(store); let topics = vec![topic(0), topic(1)]; c.bench_function("broadcasts", |b| { b.iter(|| { - let _ = store.broadcasts(&topics); + std::thread::scope(|s| { + for _ in 0..NUM_THREADS { + let store = store.clone(); + let topics = topics.clone(); + s.spawn(move || { + for _ in 0..OPS_PER_THREAD { + let _ = store.broadcasts(&topics); + } + }); + } + }); }) }); } @@ -259,11 +312,21 @@ fn bench_broadcasts(c: &mut Criterion) { fn bench_posted(c: &mut Criterion) { let keypair = sp_core::ed25519::Pair::from_string("//Bench", None).unwrap(); let (store, _temp) = setup_store(&keypair); + let store = Arc::new(store); let key = dec_key(42); c.bench_function("posted", |b| { b.iter(|| { - let _ = store.posted(&[], key); + std::thread::scope(|s| { + for _ in 0..NUM_THREADS { + let store = store.clone(); + s.spawn(move || { + for _ in 0..OPS_PER_THREAD { + let _ = store.posted(&[], key); + } + }); + } + }); }) }); } @@ -280,7 +343,7 @@ fn bench_maintain(c: &mut Criterion) { .statements() .unwrap() .into_iter() - .take(OPERATIONS_COUNT) + .take(TOTAL_OPS) .map(|(hash, _)| hash) .collect(); for hash in hashes { From d58b0ba2842f129fa47589e299fba00ac38660df Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Tue, 30 Sep 2025 16:32:09 +0200 Subject: [PATCH 3/5] Add mixed workload --- .../benches/statement_store.rs | 41 ++++++++++++++++++- 1 file changed, 40 insertions(+), 1 deletion(-) diff --git a/substrate/client/statement-store/benches/statement_store.rs b/substrate/client/statement-store/benches/statement_store.rs index de057e9fe1ea3..609641c508358 100644 --- a/substrate/client/statement-store/benches/statement_store.rs +++ b/substrate/client/statement-store/benches/statement_store.rs @@ -359,6 +359,44 @@ fn bench_maintain(c: &mut Criterion) { }); } +fn bench_mixed_workload(c: &mut Criterion) { + let keypair = sp_core::ed25519::Pair::from_string("//Bench", None).unwrap(); + let statements: Vec<_> = (INITIAL_STATEMENTS..INITIAL_STATEMENTS + TOTAL_OPS) + .map(|i| create_signed_statement(i as u64, &[topic(0), topic(1)], None, &keypair)) + .collect(); + + c.bench_function("mixed_workload", |b| { + b.iter_batched( + || { + let (store, _temp) = setup_store(&keypair); + (Arc::new(store), _temp) + }, + |(store, _temp)| { + std::thread::scope(|s| { + for thread_id in 0..NUM_THREADS { + let store = store.clone(); + let start = thread_id * OPS_PER_THREAD; + let end = start + OPS_PER_THREAD; + let thread_statements = statements[start..end].to_vec(); + let topics = vec![topic(0), topic(1)]; + s.spawn(move || { + for statement in thread_statements { + // Submit a statement + let result = store.submit(statement, StatementSource::Local); + assert!(matches!(result, SubmitResult::New(_))); + + // Query broadcasts + let _ = store.broadcasts(&topics); + } + }); + } + }); + }, + criterion::BatchSize::LargeInput, + ) + }); +} + criterion_group!( benches, bench_submit, @@ -367,6 +405,7 @@ criterion_group!( bench_statements_all, bench_broadcasts, bench_posted, - bench_maintain + bench_maintain, + bench_mixed_workload ); criterion_main!(benches); From 78802790c0fda42ed3d218f42b761507cd029a92 Mon Sep 17 00:00:00 2001 From: "cmd[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Tue, 30 Sep 2025 16:18:27 +0000 Subject: [PATCH 4/5] Update from github-actions[bot] running command 'prdoc --audience node_dev --bump none' --- prdoc/pr_9884.prdoc | 14 ++++++++++++++ 1 file changed, 14 insertions(+) create mode 100644 prdoc/pr_9884.prdoc diff --git a/prdoc/pr_9884.prdoc b/prdoc/pr_9884.prdoc new file mode 100644 index 0000000000000..12fe580a2c3a6 --- /dev/null +++ b/prdoc/pr_9884.prdoc @@ -0,0 +1,14 @@ +title: Add criterion benches for statement-store +doc: +- audience: Node Dev + description: |- + # Description + + Adds micro benches simulating concurrent work with the statement-store. + + ## Integration + + This PR does not not affect downstream projects. +crates: +- name: sc-statement-store + bump: none From 4122e222ea9bdf09990e1290a68b7a13375f6804 Mon Sep 17 00:00:00 2001 From: Andrei Eres Date: Fri, 3 Oct 2025 10:10:21 +0200 Subject: [PATCH 5/5] fmt --- substrate/client/statement-store/Cargo.toml | 2 +- .../benches/statement_store.rs | 70 +++++++++---------- 2 files changed, 36 insertions(+), 36 deletions(-) diff --git a/substrate/client/statement-store/Cargo.toml b/substrate/client/statement-store/Cargo.toml index 9953ad367ade8..04edc6d0e74b9 100644 --- a/substrate/client/statement-store/Cargo.toml +++ b/substrate/client/statement-store/Cargo.toml @@ -30,9 +30,9 @@ sp-statement-store = { workspace = true, default-features = true } tokio = { features = ["time"], workspace = true, default-features = true } [dev-dependencies] +criterion = { workspace = true, default-features = true } sp-tracing = { workspace = true } tempfile = { workspace = true } -criterion = { workspace = true, default-features = true } [[bench]] name = "statement_store" diff --git a/substrate/client/statement-store/benches/statement_store.rs b/substrate/client/statement-store/benches/statement_store.rs index 609641c508358..6d8cc3ca51c26 100644 --- a/substrate/client/statement-store/benches/statement_store.rs +++ b/substrate/client/statement-store/benches/statement_store.rs @@ -360,41 +360,41 @@ fn bench_maintain(c: &mut Criterion) { } fn bench_mixed_workload(c: &mut Criterion) { - let keypair = sp_core::ed25519::Pair::from_string("//Bench", None).unwrap(); - let statements: Vec<_> = (INITIAL_STATEMENTS..INITIAL_STATEMENTS + TOTAL_OPS) - .map(|i| create_signed_statement(i as u64, &[topic(0), topic(1)], None, &keypair)) - .collect(); - - c.bench_function("mixed_workload", |b| { - b.iter_batched( - || { - let (store, _temp) = setup_store(&keypair); - (Arc::new(store), _temp) - }, - |(store, _temp)| { - std::thread::scope(|s| { - for thread_id in 0..NUM_THREADS { - let store = store.clone(); - let start = thread_id * OPS_PER_THREAD; - let end = start + OPS_PER_THREAD; - let thread_statements = statements[start..end].to_vec(); - let topics = vec![topic(0), topic(1)]; - s.spawn(move || { - for statement in thread_statements { - // Submit a statement - let result = store.submit(statement, StatementSource::Local); - assert!(matches!(result, SubmitResult::New(_))); - - // Query broadcasts - let _ = store.broadcasts(&topics); - } - }); - } - }); - }, - criterion::BatchSize::LargeInput, - ) - }); + let keypair = sp_core::ed25519::Pair::from_string("//Bench", None).unwrap(); + let statements: Vec<_> = (INITIAL_STATEMENTS..INITIAL_STATEMENTS + TOTAL_OPS) + .map(|i| create_signed_statement(i as u64, &[topic(0), topic(1)], None, &keypair)) + .collect(); + + c.bench_function("mixed_workload", |b| { + b.iter_batched( + || { + let (store, _temp) = setup_store(&keypair); + (Arc::new(store), _temp) + }, + |(store, _temp)| { + std::thread::scope(|s| { + for thread_id in 0..NUM_THREADS { + let store = store.clone(); + let start = thread_id * OPS_PER_THREAD; + let end = start + OPS_PER_THREAD; + let thread_statements = statements[start..end].to_vec(); + let topics = vec![topic(0), topic(1)]; + s.spawn(move || { + for statement in thread_statements { + // Submit a statement + let result = store.submit(statement, StatementSource::Local); + assert!(matches!(result, SubmitResult::New(_))); + + // Query broadcasts + let _ = store.broadcasts(&topics); + } + }); + } + }); + }, + criterion::BatchSize::LargeInput, + ) + }); } criterion_group!(