Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
31 commits
Select commit Hold shift + click to select a range
39fb8e1
Support ConnectToBackingGroups
sandreim Oct 3, 2025
47923b6
fix origin
sandreim Oct 3, 2025
0ede4bb
add disconnect mechanism
sandreim Oct 3, 2025
eb51a3e
move collator_protocol_helper
sandreim Oct 4, 2025
17d1c6c
comment
sandreim Oct 4, 2025
06c444f
remove helper from service
sandreim Oct 4, 2025
8fe9db5
unused
sandreim Oct 4, 2025
3b63265
unused deps in crates
sandreim Oct 4, 2025
bda8c38
Update from github-actions[bot] running command 'prdoc generate --bum…
github-actions[bot] Oct 4, 2025
9cfd857
fix tests
sandreim Oct 4, 2025
e954ea4
Merge branch 'sandreim/collator_protocol_notifications' of github.com…
sandreim Oct 4, 2025
c2dce30
Update from github-actions[bot] running command 'fmt'
github-actions[bot] Oct 4, 2025
2d78334
fmt
sandreim Oct 6, 2025
6bec872
add offset
sandreim Oct 6, 2025
3deacd3
Merge branch 'sandreim/collator_protocol_notifications' of github.com…
sandreim Oct 6, 2025
b977e32
fmt
sandreim Oct 6, 2025
f0a76d5
prdoc
sandreim Oct 6, 2025
0dbdd9c
move log
sandreim Oct 6, 2025
699b00a
impl disconnect
sandreim Oct 6, 2025
597a50c
fix some stuff
sandreim Oct 6, 2025
bf35a8e
add some tests for connect/disconnect
sandreim Oct 6, 2025
334fd92
fix cargo toml
sandreim Oct 6, 2025
9b7b37f
fix prdoc
sandreim Oct 6, 2025
af9421f
Merge branch 'alexggh/cleanup_connecting_to_backing_group' of github.…
sandreim Oct 6, 2025
89fb028
review feedback
sandreim Oct 7, 2025
cae1c25
fix comment
sandreim Oct 7, 2025
471b898
fix test comments
sandreim Oct 7, 2025
b4e6db2
review
sandreim Oct 9, 2025
85c0597
fix validator disconnect
sandreim Oct 9, 2025
dbcd11a
make Basti happy
sandreim Oct 9, 2025
d3afbeb
fix comments
sandreim Oct 9, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions cumulus/client/consensus/aura/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ workspace = true
async-trait = { workspace = true }
codec = { features = ["derive"], workspace = true, default-features = true }
futures = { workspace = true }
futures-timer = { workspace = true }
parking_lot = { workspace = true }
schnellru = { workspace = true }
tokio = { workspace = true, features = ["macros"] }
Expand Down
71 changes: 51 additions & 20 deletions cumulus/client/consensus/aura/src/collators/lookahead.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,22 +45,26 @@ use polkadot_node_subsystem::messages::CollationGenerationMessage;
use polkadot_overseer::Handle as OverseerHandle;
use polkadot_primitives::{CollatorPair, Id as ParaId, OccupiedCoreAssumption};

use crate::{collator as collator_util, collators::claim_queue_at, export_pov_to_path};
use crate::{
collator as collator_util,
collators::{claim_queue_at, collator_protocol_helper},
export_pov_to_path,
};
use futures::prelude::*;
use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf};
use sc_consensus::BlockImport;
use sp_api::ProvideRuntimeApi;
use sp_application_crypto::AppPublic;
use sp_blockchain::HeaderBackend;
use sp_consensus_aura::{AuraApi, Slot};
use sp_core::crypto::Pair;
use sp_core::{crypto::Pair, traits::SpawnNamed};
use sp_inherents::CreateInherentDataProviders;
use sp_keystore::KeystorePtr;
use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member};
use std::{path::PathBuf, sync::Arc, time::Duration};

/// Parameters for [`run`].
pub struct Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS> {
pub struct Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner> {
/// Inherent data providers. Only non-consensus inherent data should be provided, i.e.
/// the timestamp, slot, and paras inherents should be omitted, as they are set by this
/// collator.
Expand Down Expand Up @@ -96,11 +100,13 @@ pub struct Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS> {
/// The maximum percentage of the maximum PoV size that the collator can use.
/// It will be removed once <https://github.com/paritytech/polkadot-sdk/issues/6020> is fixed.
pub max_pov_percentage: Option<u32>,
/// Spawner for spawning tasks.
pub spawner: Spawner,
}

/// Run async-backing-friendly Aura.
pub fn run<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS>(
params: Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS>,
pub fn run<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner>(
params: Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner>,
) -> impl Future<Output = ()> + Send + 'static
where
Block: BlockT,
Expand All @@ -122,17 +128,21 @@ where
Proposer: ProposerInterface<Block> + Send + Sync + 'static,
CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
CHP: consensus_common::ValidationCodeHashProvider<Block::Hash> + Send + 'static,
P: Pair,
Spawner: SpawnNamed + Clone + 'static,
P: Pair + Send + Sync + 'static,
P::Public: AppPublic + Member + Codec,
P::Signature: TryFrom<Vec<u8>> + Member + Codec,
{
run_with_export::<_, P, _, _, _, _, _, _, _, _>(ParamsWithExport { params, export_pov: None })
run_with_export::<_, P, _, _, _, _, _, _, _, _, _>(ParamsWithExport {
params,
export_pov: None,
})
}

/// Parameters for [`run_with_export`].
pub struct ParamsWithExport<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS> {
pub struct ParamsWithExport<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner> {
/// The parameters.
pub params: Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS>,
pub params: Params<BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner>,

/// When set, the collator will export every produced `POV` to this folder.
pub export_pov: Option<PathBuf>,
Expand All @@ -142,7 +152,7 @@ pub struct ParamsWithExport<BI, CIDP, Client, Backend, RClient, CHP, Proposer, C
///
/// This is exactly the same as [`run`], but it supports the optional export of each produced `POV`
/// to the file system.
pub fn run_with_export<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS>(
pub fn run_with_export<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Proposer, CS, Spawner>(
ParamsWithExport { mut params, export_pov }: ParamsWithExport<
BI,
CIDP,
Expand All @@ -152,6 +162,7 @@ pub fn run_with_export<Block, P, BI, CIDP, Client, Backend, RClient, CHP, Propos
CHP,
Proposer,
CS,
Spawner,
>,
) -> impl Future<Output = ()> + Send + 'static
where
Expand All @@ -174,7 +185,8 @@ where
Proposer: ProposerInterface<Block> + Send + Sync + 'static,
CS: CollatorServiceInterface<Block> + Send + Sync + 'static,
CHP: consensus_common::ValidationCodeHashProvider<Block::Hash> + Send + 'static,
P: Pair,
Spawner: SpawnNamed + Clone + 'static,
P: Pair + Send + Sync + 'static,
P::Public: AppPublic + Member + Codec,
P::Signature: TryFrom<Vec<u8>> + Member + Codec,
{
Expand Down Expand Up @@ -215,6 +227,8 @@ where
collator_util::Collator::<Block, P, _, _, _, _, _>::new(params)
};

let mut our_slot = None;

while let Some(relay_parent_header) = import_notifications.next().await {
let relay_parent = relay_parent_header.hash();

Expand Down Expand Up @@ -290,14 +304,18 @@ where
relay_chain_slot_duration = ?params.relay_chain_slot_duration,
"Adjusted relay-chain slot to parachain slot"
);
Some(super::can_build_upon::<_, _, P>(
Some((
slot_now,
relay_slot,
timestamp,
block_hash,
included_block.hash(),
para_client,
&keystore,
slot_duration,
super::can_build_upon::<_, _, P>(
slot_now,
relay_slot,
timestamp,
block_hash,
included_block.hash(),
para_client,
&keystore,
),
))
};

Expand All @@ -316,8 +334,21 @@ where
// scheduled chains this ensures that the backlog will grow steadily.
for n_built in 0..2 {
let slot_claim = match can_build_upon(parent_hash) {
Some(fut) => match fut.await {
None => break,
Some((current_slot, slot_duration, fut)) => match fut.await {
None => {
our_slot = collator_protocol_helper::<_, _, P, _>(
params.para_client.clone(),
params.keystore.clone(),
params.overseer_handle.clone(),
params.spawner.clone(),
parent_hash,
slot_duration,
current_slot,
our_slot,
)
.await;
break
},
Some(c) => c,
},
None => break,
Expand Down
108 changes: 107 additions & 1 deletion cumulus/client/consensus/aura/src/collators/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,17 +27,20 @@ use cumulus_client_consensus_common::{self as consensus_common, ParentSearchPara
use cumulus_primitives_aura::{AuraUnincludedSegmentApi, Slot};
use cumulus_primitives_core::{relay_chain::Header as RelayHeader, BlockT};
use cumulus_relay_chain_interface::RelayChainInterface;
use polkadot_node_subsystem::messages::RuntimeApiRequest;
use futures::prelude::*;
use polkadot_node_subsystem::messages::{CollatorProtocolMessage, RuntimeApiRequest};
use polkadot_node_subsystem_util::runtime::ClaimQueueSnapshot;
use polkadot_primitives::{
Hash as RelayHash, Id as ParaId, OccupiedCoreAssumption, ValidationCodeHash,
DEFAULT_SCHEDULING_LOOKAHEAD,
};
use sc_consensus_aura::{standalone as aura_internal, AuraApi};
use sp_api::{ApiExt, ProvideRuntimeApi, RuntimeApiInfo};
use sp_consensus_aura::SlotDuration;
use sp_core::Pair;
use sp_keystore::KeystorePtr;
use sp_timestamp::Timestamp;
use std::time::Duration;

pub mod basic;
pub mod lookahead;
Expand All @@ -52,6 +55,109 @@ pub mod slot_based;
// sanity check.
const PARENT_SEARCH_DEPTH: usize = 30;

/// The slot offset to start pre-connecting to backing groups. Represented as number
/// of seconds before own slot starts.
const PRE_CONNECT_SLOT_OFFSET: Duration = Duration::from_secs(6);

/// Task name for the collator protocol helper.
pub const COLLATOR_PROTOCOL_HELPER_TASK_GROUP: &str = "collator-protocol-helper";

/// Helper for triggering backing group connections early.
///
/// Returns the updated `our_slot` value.
pub async fn collator_protocol_helper<Block, Client, P, Spawner>(
client: std::sync::Arc<Client>,
keystore: sp_keystore::KeystorePtr,
mut overseer_handle: cumulus_relay_chain_interface::OverseerHandle,
spawn_handle: Spawner,
best_block: Block::Hash,
slot_duration: SlotDuration,
current_slot: Slot,
our_slot: Option<Slot>,
) -> Option<Slot>
where
Block: sp_runtime::traits::Block,
Client: sc_client_api::HeaderBackend<Block> + Send + Sync + ProvideRuntimeApi<Block> + 'static,
Client::Api: AuraApi<Block, P::Public>,
P: sp_core::Pair + Send + Sync,
P::Public: Codec,
Spawner: sp_core::traits::SpawnNamed,
{
let authorities = client.runtime_api().authorities(best_block).unwrap_or_default();

// Check if our slot has passed and we are not expected to author again in next slot.
match (
our_slot,
aura_internal::claim_slot::<P>(current_slot + 1, &authorities, &keystore)
.await
.is_none(),
) {
(Some(last_slot), true) if current_slot > last_slot => {
tracing::debug!(target: crate::LOG_TARGET, "Our slot {} has passed, current slot is {}, sending disconnect message", last_slot, current_slot);

// Send a message to the collator protocol to stop pre-connecting to backing
// groups
overseer_handle
.send_msg(
CollatorProtocolMessage::DisconnectFromBackingGroups,
"CollatorProtocolHelper",
)
.await;

return None;
},
(Some(_), false) => {
// `our_slot` is `Some` means we alredy sent pre-connect message, no need to
// proceed further.
return our_slot
},
_ => {},
}

// Check if our slot is coming up next. This means that there is still another slot
// before our turn.
let target_slot = current_slot + 2;
if aura_internal::claim_slot::<P>(target_slot, &authorities, &keystore)
.await
.is_none()
{
return our_slot
}

tracing::debug!(target: crate::LOG_TARGET, "Our slot {} is due soon", target_slot );

// Determine our own slot timestamp.
let Some(own_slot_ts) = target_slot.timestamp(slot_duration) else {
tracing::warn!(target: crate::LOG_TARGET, "Failed to get own slot timestamp");

return our_slot;
};

let pre_connect_delay = own_slot_ts
.saturating_sub(*Timestamp::current())
.saturating_sub(PRE_CONNECT_SLOT_OFFSET.as_millis() as u64);

tracing::debug!(target: crate::LOG_TARGET, "Pre-connecting to backing groups in {}ms", pre_connect_delay);

let mut overseer_handle_clone = overseer_handle.clone();
spawn_handle.spawn(
"send-pre-connect-message",
Some(COLLATOR_PROTOCOL_HELPER_TASK_GROUP),
async move {
futures_timer::Delay::new(std::time::Duration::from_millis(pre_connect_delay)).await;

tracing::debug!(target: crate::LOG_TARGET, "Sending pre-connect message");

// Send a message to the collator protocol to pre-connect to backing groups
overseer_handle_clone
.send_msg(CollatorProtocolMessage::ConnectToBackingGroups, "CollatorProtocolHelper")
.await;
}
.boxed(),
);
Some(target_slot)
}

/// Check the `local_validation_code_hash` against the validation code hash in the relay chain
/// state.
///
Expand Down
Loading
Loading