From 39fb8e12bc3c8607b833e52360f783a795068720 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Fri, 3 Oct 2025 18:17:25 +0300 Subject: [PATCH 01/37] Support ConnectToBackingGroups Signed-off-by: Andrei Sandu --- Cargo.lock | 6 ++ cumulus/client/service/Cargo.toml | 6 ++ cumulus/client/service/src/lib.rs | 66 +++++++++++++++++++ .../polkadot-omni-node/lib/src/nodes/aura.rs | 35 ++++++++-- .../src/collator_side/mod.rs | 40 ++++++++++- .../src/validator_side/mod.rs | 12 ++++ polkadot/node/subsystem-types/src/messages.rs | 6 ++ 7 files changed, 164 insertions(+), 7 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 62b2015942dfc..4be25ebdeaf82 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4577,10 +4577,14 @@ dependencies = [ "cumulus-relay-chain-minimal-node", "cumulus-relay-chain-streams", "futures", + "parity-scale-codec", + "polkadot-node-subsystem", + "polkadot-overseer", "polkadot-primitives", "prometheus", "sc-client-api", "sc-consensus", + "sc-consensus-aura", "sc-network", "sc-network-sync", "sc-network-transactions", @@ -4593,8 +4597,10 @@ dependencies = [ "sp-api", "sp-blockchain", "sp-consensus", + "sp-consensus-aura", "sp-core 28.0.0", "sp-io", + "sp-keystore", "sp-runtime", "sp-transaction-pool", ] diff --git a/cumulus/client/service/Cargo.toml b/cumulus/client/service/Cargo.toml index 3ea36d70b42b6..739b30270adaf 100644 --- a/cumulus/client/service/Cargo.toml +++ b/cumulus/client/service/Cargo.toml @@ -15,10 +15,12 @@ workspace = true async-channel = { workspace = true } futures = { workspace = true } prometheus = { workspace = true } +codec = { workspace = true, default-features = true } # Substrate sc-client-api = { workspace = true, default-features = true } sc-consensus = { workspace = true, default-features = true } +sc-consensus-aura = { workspace = true, default-features = true } sc-network = { workspace = true, default-features = true } sc-network-sync = { workspace = true, default-features = true } sc-network-transactions = { workspace = true, default-features = true } @@ -31,13 +33,17 @@ sc-utils = { workspace = true, default-features = true } sp-api = { workspace = true, default-features = true } sp-blockchain = { workspace = true, default-features = true } sp-consensus = { workspace = true, default-features = true } +sp-consensus-aura = { workspace = true, default-features = true } sp-core = { workspace = true, default-features = true } sp-io = { workspace = true, default-features = true } sp-runtime = { workspace = true, default-features = true } sp-transaction-pool = { workspace = true, default-features = true } +sp-keystore = { workspace = true, default-features = true } # Polkadot polkadot-primitives = { workspace = true, default-features = true } +polkadot-node-subsystem = { workspace = true, default-features = true } +polkadot-overseer = { workspace = true, default-features = true } # Cumulus cumulus-client-cli = { workspace = true, default-features = true } diff --git a/cumulus/client/service/src/lib.rs b/cumulus/client/service/src/lib.rs index 62199f2704162..aecfd50778273 100644 --- a/cumulus/client/service/src/lib.rs +++ b/cumulus/client/service/src/lib.rs @@ -59,6 +59,18 @@ use std::{ time::{Duration, Instant}, }; +use polkadot_overseer::Handle as OverseerHandle; +use sc_consensus_aura::AuraApi; +use sp_consensus_aura::Slot; + +use sp_core::Pair; +use sp_keystore::KeystorePtr; +use sp_runtime::DigestItem; + +use codec::Codec; +use polkadot_node_subsystem::messages::CollatorProtocolMessage; +use sc_consensus_aura::{standalone::claim_slot, CompatibleDigestItem}; + /// Host functions that should be used in parachain nodes. /// /// Contains the standard substrate host functions, as well as a @@ -73,6 +85,7 @@ pub type ParachainHostFunctions = ( // In practice here we expect no more than one queued messages. const RECOVERY_CHAN_SIZE: usize = 8; const LOG_TARGET_SYNC: &str = "sync::cumulus"; +const LOG_TARGET_COLLATOR_HELPER: &str = "aura::collator-helper"; /// A hint about how long the node should wait before attempting to recover missing block data /// from the data availability layer. @@ -458,6 +471,59 @@ where Err("Stopping following imported blocks. Could not determine parachain target block".into()) } +/// Task for triggering backing group connections early. +pub async fn collator_protocol_helper( + client: Arc, + keystore: KeystorePtr, + mut overseer_handle: OverseerHandle, +) where + Client: HeaderBackend + + Send + + Sync + + BlockBackend + + BlockchainEvents + + ProvideRuntimeApi + + 'static, + Client::Api: AuraApi, + P: Pair + Send + Sync, + P::Public: Codec, +{ + let mut import_notifications = client.import_notification_stream().fuse(); + + log::debug!(target: LOG_TARGET_COLLATOR_HELPER, "Started collator protocol helper"); + + while let Some(notification) = import_notifications.next().await { + // Determine if this node is the next author + let digest = notification.header.digest(); + let slot = digest + .logs() + .iter() + .find_map(|log| >::as_aura_pre_digest(log)); + + log::debug!(target: LOG_TARGET_COLLATOR_HELPER, "Imported block with slot: {:?}", slot); + + if let Some(slot) = slot { + let authorities = + client.runtime_api().authorities(notification.header.hash()).unwrap_or_default(); + + // Determine if this node is the next author. + if claim_slot::

(slot + 2, &authorities, &keystore).await.is_some() { + log::debug!(target: LOG_TARGET_COLLATOR_HELPER, "Our slot comes next, sending preconnect message"); + + // Send a message to the collator protocol to pre-connect to backing groups + overseer_handle + .send_msg( + CollatorProtocolMessage::ConnectToBackingGroups, + "SlotBasedBlockImport", + ) + .await; + } + } + + // TODO: Send disconnect after we've imported the last block of our slot. + } +} + /// Task for logging candidate events and some related metrics. async fn parachain_informant( para_id: ParaId, diff --git a/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs b/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs index 39234f5adf6a2..baade2b2e66c9 100644 --- a/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs +++ b/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs @@ -48,7 +48,7 @@ use cumulus_client_consensus_aura::{ use cumulus_client_consensus_proposer::ProposerInterface; use cumulus_client_consensus_relay_chain::Verifier as RelayChainVerifier; #[allow(deprecated)] -use cumulus_client_service::CollatorSybilResistance; +use cumulus_client_service::{collator_protocol_helper, CollatorSybilResistance}; use cumulus_primitives_core::{relay_chain::ValidationCode, GetParachainInfo, ParaId}; use cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface}; use futures::prelude::*; @@ -217,7 +217,8 @@ where + pallet_transaction_payment_rpc::TransactionPaymentRuntimeApi + substrate_frame_rpc_system::AccountNonceApi + GetParachainInfo, - AuraId: AuraIdT + Sync, + AuraId: AuraIdT + Sync + Send, + ::Pair: Send + Sync, { if extra_args.authoring_policy == AuthoringPolicy::SlotBased { Box::new(AuraNode::< @@ -299,7 +300,8 @@ impl, RuntimeApi, AuraId> where RuntimeApi: ConstructNodeRuntimeApi>, RuntimeApi::RuntimeApi: AuraRuntimeApi, - AuraId: AuraIdT + Sync, + AuraId: AuraIdT + Sync + Send, + ::Pair: Send + Sync, { fn start_consensus( client: Arc>, @@ -320,7 +322,7 @@ where relay_chain_slot_duration: Duration, para_id: ParaId, collator_key: CollatorPair, - _overseer_handle: OverseerHandle, + overseer_handle: OverseerHandle, announce_block: Arc>) + Send + Sync>, backend: Arc>, node_extra_args: NodeExtraArgs, @@ -341,6 +343,17 @@ where client.clone(), ); + // Spawn the collator protocol helper task + task_manager.spawn_essential_handle().spawn( + "collator-protocol-helper", + None, + collator_protocol_helper::<_, _, ::Pair>( + client.clone(), + keystore.clone(), + overseer_handle, + ), + ); + let client_for_aura = client.clone(); let params = SlotBasedParams { create_inherent_data_providers: move |_, ()| async move { Ok(()) }, @@ -430,7 +443,8 @@ impl, RuntimeApi, AuraId> where RuntimeApi: ConstructNodeRuntimeApi>, RuntimeApi::RuntimeApi: AuraRuntimeApi, - AuraId: AuraIdT + Sync, + AuraId: AuraIdT + Sync + Send, + ::Pair: Send + Sync, { fn start_consensus( client: Arc>, @@ -465,6 +479,17 @@ where client.clone(), ); + // Spawn the collator protocol helper task + task_manager.spawn_essential_handle().spawn( + "collator-protocol-helper", + None, + collator_protocol_helper::<_, _, ::Pair>( + client.clone(), + keystore.clone(), + overseer_handle.clone(), + ), + ); + let params = aura::ParamsWithExport { export_pov: node_extra_args.export_pov, params: AuraParams { diff --git a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs index 9f22ea0c76a63..523f35b8def3f 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs @@ -346,6 +346,9 @@ struct State { /// An utility for tracking all collations produced by the collator. collation_tracker: CollationTracker, + + /// Should we be connected to backers ? + connect_to_backers: bool, } impl State { @@ -373,6 +376,7 @@ impl State { advertisement_timeouts: Default::default(), reputation, collation_tracker: Default::default(), + connect_to_backers: false, } } } @@ -796,6 +800,29 @@ async fn process_msg( use CollatorProtocolMessage::*; match msg { + ConnectToBackingGroups => { + // This message is used to instruct the collator to pre-connect to backing groups. + // For now, we do not need to take any action here. + gum::debug!( + target: LOG_TARGET, + "Received PreConnectToBackingGroups message." + ); + state.connect_to_backers = true; + + if let Some(para_id) = state.collating_on { + connect_to_validators(ctx, &state.implicit_view, &state.per_relay_parent, para_id) + .await; + } + }, + DisconnectFromBackingGroups => { + // This message is used to instruct the collator to disconnect from backing groups. + // For now, we do not need to take any action here. + gum::debug!( + target: LOG_TARGET, + "Received DisconnectFromBackingGroups message." + ); + state.connect_to_backers = false; + }, CollateOn(id) => { state.collating_on = Some(id); state.implicit_view = Some(ImplicitView::new(Some(id))); @@ -1270,8 +1297,15 @@ async fn handle_network_msg( handle_our_view_change(ctx, runtime, state, view).await?; // Connect only if we are collating on a para. if let Some(para_id) = state.collating_on { - connect_to_validators(ctx, &state.implicit_view, &state.per_relay_parent, para_id) + if state.connect_to_backers { + connect_to_validators( + ctx, + &state.implicit_view, + &state.per_relay_parent, + para_id, + ) .await; + } } }, PeerMessage(remote, msg) => { @@ -1763,7 +1797,9 @@ async fn run_inner( // Connect only if we are collating on a para. if let Some(para_id) = state.collating_on { - connect_to_validators(&mut ctx, &state.implicit_view, &state.per_relay_parent, para_id).await; + if state.connect_to_backers { + connect_to_validators(&mut ctx, &state.implicit_view, &state.per_relay_parent, para_id).await; + } } gum::trace!( diff --git a/polkadot/node/network/collator-protocol/src/validator_side/mod.rs b/polkadot/node/network/collator-protocol/src/validator_side/mod.rs index 8ed62bb32208d..bc7e5ab87dec5 100644 --- a/polkadot/node/network/collator-protocol/src/validator_side/mod.rs +++ b/polkadot/node/network/collator-protocol/src/validator_side/mod.rs @@ -1464,6 +1464,18 @@ async fn process_msg( let _timer = state.metrics.time_process_msg(); match msg { + ConnectToBackingGroups => { + gum::warn!( + target: LOG_TARGET, + "PreConnectToBackingGroups message is not expected on the validator side of the protocol", + ); + }, + DisconnectFromBackingGroups => { + gum::warn!( + target: LOG_TARGET, + "DisconnectFromBackingGroups message is not expected on the validator side of the protocol", + ); + }, CollateOn(id) => { gum::warn!( target: LOG_TARGET, diff --git a/polkadot/node/subsystem-types/src/messages.rs b/polkadot/node/subsystem-types/src/messages.rs index 28d8c0ebf7675..6866979fa8dce 100644 --- a/polkadot/node/subsystem-types/src/messages.rs +++ b/polkadot/node/subsystem-types/src/messages.rs @@ -262,6 +262,12 @@ pub enum CollatorProtocolMessage { /// /// The hash is the relay parent. Seconded(Hash, SignedFullStatement), + /// A message sent by Cumulus consensus engine to the collator protocol to + /// pre-connect to backing groups at all allowed relay parents. + ConnectToBackingGroups, + /// A message sent by Cumulus consensus engine to the collator protocol to + /// disconnect from backing groups. + DisconnectFromBackingGroups, } impl Default for CollatorProtocolMessage { From 47923b6f1751a6d5e3ff9c7b6c8e24049765d9e8 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Fri, 3 Oct 2025 18:25:53 +0300 Subject: [PATCH 02/37] fix origin Signed-off-by: Andrei Sandu --- cumulus/client/service/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cumulus/client/service/src/lib.rs b/cumulus/client/service/src/lib.rs index aecfd50778273..345822bdad6ef 100644 --- a/cumulus/client/service/src/lib.rs +++ b/cumulus/client/service/src/lib.rs @@ -514,7 +514,7 @@ pub async fn collator_protocol_helper( overseer_handle .send_msg( CollatorProtocolMessage::ConnectToBackingGroups, - "SlotBasedBlockImport", + "CollatorProtocolHelper", ) .await; } From 0ede4bb6fdc4ec04d8f34d141229b67cc48d0b02 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Fri, 3 Oct 2025 19:05:05 +0300 Subject: [PATCH 03/37] add disconnect mechanism Signed-off-by: Andrei Sandu --- cumulus/client/service/src/lib.rs | 42 +++++++++++++++++++++++-------- 1 file changed, 32 insertions(+), 10 deletions(-) diff --git a/cumulus/client/service/src/lib.rs b/cumulus/client/service/src/lib.rs index 345822bdad6ef..e9227533da6dc 100644 --- a/cumulus/client/service/src/lib.rs +++ b/cumulus/client/service/src/lib.rs @@ -492,6 +492,9 @@ pub async fn collator_protocol_helper( log::debug!(target: LOG_TARGET_COLLATOR_HELPER, "Started collator protocol helper"); + // Our own slot number, known in advance. + let mut our_slot = None; + while let Some(notification) = import_notifications.next().await { // Determine if this node is the next author let digest = notification.header.digest(); @@ -502,25 +505,44 @@ pub async fn collator_protocol_helper( log::debug!(target: LOG_TARGET_COLLATOR_HELPER, "Imported block with slot: {:?}", slot); - if let Some(slot) = slot { - let authorities = - client.runtime_api().authorities(notification.header.hash()).unwrap_or_default(); + let Some(slot) = slot else { + continue; + }; + + let authorities = + client.runtime_api().authorities(notification.header.hash()).unwrap_or_default(); - // Determine if this node is the next author. - if claim_slot::

(slot + 2, &authorities, &keystore).await.is_some() { - log::debug!(target: LOG_TARGET_COLLATOR_HELPER, "Our slot comes next, sending preconnect message"); + // Check if our slot has passed and we are not expected to author again in next slot. + match (our_slot, claim_slot::

(slot + 1, &authorities, &keystore).await.is_none()) { + (Some(last_slot), true) if slot > last_slot => { + log::debug!(target: LOG_TARGET_COLLATOR_HELPER, "Our slot {} has passed, current slot is {}, sending disconnect message", last_slot, slot); - // Send a message to the collator protocol to pre-connect to backing groups + // Send a message to the collator protocol to stop pre-connecting to backing + // groups overseer_handle .send_msg( - CollatorProtocolMessage::ConnectToBackingGroups, + CollatorProtocolMessage::DisconnectFromBackingGroups, "CollatorProtocolHelper", ) .await; - } + + our_slot = None; + }, + _ => {}, } - // TODO: Send disconnect after we've imported the last block of our slot. + // Check if our slot is coming up next. This means that there is still another slot + // before our turn. + let target_slot = slot + 2; + if claim_slot::

(target_slot, &authorities, &keystore).await.is_some() { + log::debug!(target: LOG_TARGET_COLLATOR_HELPER, "Our slot {} comes next, sending preconnect message ", target_slot ); + // Send a message to the collator protocol to pre-connect to backing groups + overseer_handle + .send_msg(CollatorProtocolMessage::ConnectToBackingGroups, "CollatorProtocolHelper") + .await; + + our_slot = Some(target_slot); + } } } From eb51a3e2f005577a091acc0a0432570141a3c98a Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Sat, 4 Oct 2025 12:58:34 +0300 Subject: [PATCH 04/37] move collator_protocol_helper Signed-off-by: Andrei Sandu --- cumulus/client/consensus/aura/src/collator.rs | 92 +++++++++++++++++++ .../polkadot-omni-node/lib/src/nodes/aura.rs | 3 +- 2 files changed, 94 insertions(+), 1 deletion(-) diff --git a/cumulus/client/consensus/aura/src/collator.rs b/cumulus/client/consensus/aura/src/collator.rs index e372162f21332..c4d348c0ad319 100644 --- a/cumulus/client/consensus/aura/src/collator.rs +++ b/cumulus/client/consensus/aura/src/collator.rs @@ -433,3 +433,95 @@ where Ok(block_import_params) } + +/// Task for triggering backing group connections early. +/// +/// This helper monitors block imports and proactively connects to backing groups +/// when the collator's slot is approaching, improving network connectivity for +/// slot-based collation. +pub async fn collator_protocol_helper( + client: std::sync::Arc, + keystore: sp_keystore::KeystorePtr, + mut overseer_handle: cumulus_relay_chain_interface::OverseerHandle, +) where + Block: sp_runtime::traits::Block, + Client: sc_client_api::HeaderBackend + + Send + + Sync + + sc_client_api::BlockBackend + + sc_client_api::BlockchainEvents + + ProvideRuntimeApi + + 'static, + Client::Api: AuraApi, + P: sp_core::Pair + Send + Sync, + P::Public: Codec, +{ + use polkadot_node_subsystem::messages::CollatorProtocolMessage; + use sc_consensus_aura::CompatibleDigestItem; + use sp_runtime::DigestItem; + + let mut import_notifications = client.import_notification_stream().fuse(); + + tracing::debug!(target: crate::LOG_TARGET, "Started collator protocol helper"); + + // Our own slot number, known in advance. + let mut our_slot = None; + + while let Some(notification) = import_notifications.next().await { + // Determine if this node is the next author + let digest = notification.header.digest(); + let slot = digest + .logs() + .iter() + .find_map(|log| >::as_aura_pre_digest(log)); + + tracing::debug!(target: crate::LOG_TARGET, "Imported block with slot: {:?}", slot); + + let Some(slot) = slot else { + continue; + }; + + let authorities = + client.runtime_api().authorities(notification.header.hash()).unwrap_or_default(); + + // Check if our slot has passed and we are not expected to author again in next slot. + match ( + our_slot, + aura_internal::claim_slot::

(slot + 1, &authorities, &keystore) + .await + .is_none(), + ) { + (Some(last_slot), true) if slot > last_slot => { + tracing::debug!(target: crate::LOG_TARGET, "Our slot {} has passed, current slot is {}, sending disconnect message", last_slot, slot); + + // Send a message to the collator protocol to stop pre-connecting to backing + // groups + overseer_handle + .send_msg( + CollatorProtocolMessage::DisconnectFromBackingGroups, + "CollatorProtocolHelper", + ) + .await; + + our_slot = None; + }, + _ => {}, + } + + // Check if our slot is coming up next. This means that there is still another slot + // before our turn. + let target_slot = slot + 2; + if aura_internal::claim_slot::

(target_slot, &authorities, &keystore) + .await + .is_some() + { + tracing::debug!(target: crate::LOG_TARGET, "Our slot {} comes next, sending preconnect message ", target_slot ); + // Send a message to the collator protocol to pre-connect to backing groups + overseer_handle + .send_msg(CollatorProtocolMessage::ConnectToBackingGroups, "CollatorProtocolHelper") + .await; + + our_slot = Some(target_slot); + } + } +} diff --git a/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs b/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs index baade2b2e66c9..28c5301fcf343 100644 --- a/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs +++ b/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs @@ -39,6 +39,7 @@ use cumulus_client_consensus_aura::collators::slot_based::{ self as slot_based, Params as SlotBasedParams, }; use cumulus_client_consensus_aura::{ + collator::collator_protocol_helper, collators::{ lookahead::{self as aura, Params as AuraParams}, slot_based::{SlotBasedBlockImport, SlotBasedBlockImportHandle}, @@ -48,7 +49,7 @@ use cumulus_client_consensus_aura::{ use cumulus_client_consensus_proposer::ProposerInterface; use cumulus_client_consensus_relay_chain::Verifier as RelayChainVerifier; #[allow(deprecated)] -use cumulus_client_service::{collator_protocol_helper, CollatorSybilResistance}; +use cumulus_client_service::CollatorSybilResistance; use cumulus_primitives_core::{relay_chain::ValidationCode, GetParachainInfo, ParaId}; use cumulus_relay_chain_interface::{OverseerHandle, RelayChainInterface}; use futures::prelude::*; From 17d1c6c53225fcf01d6c32f0c18664b4028cff55 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Sat, 4 Oct 2025 13:03:25 +0300 Subject: [PATCH 05/37] comment Signed-off-by: Andrei Sandu --- cumulus/client/consensus/aura/src/collator.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/cumulus/client/consensus/aura/src/collator.rs b/cumulus/client/consensus/aura/src/collator.rs index c4d348c0ad319..952433853305d 100644 --- a/cumulus/client/consensus/aura/src/collator.rs +++ b/cumulus/client/consensus/aura/src/collator.rs @@ -437,8 +437,7 @@ where /// Task for triggering backing group connections early. /// /// This helper monitors block imports and proactively connects to backing groups -/// when the collator's slot is approaching, improving network connectivity for -/// slot-based collation. +/// when the collator's slot is approaching, improving network connectivity. pub async fn collator_protocol_helper( client: std::sync::Arc, keystore: sp_keystore::KeystorePtr, From 06c444f2e71f0442917cc0d447df5365b1a96241 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Sat, 4 Oct 2025 13:03:52 +0300 Subject: [PATCH 06/37] remove helper from service Signed-off-by: Andrei Sandu --- cumulus/client/service/src/lib.rs | 86 ------------------------------- 1 file changed, 86 deletions(-) diff --git a/cumulus/client/service/src/lib.rs b/cumulus/client/service/src/lib.rs index e9227533da6dc..79fd63192971b 100644 --- a/cumulus/client/service/src/lib.rs +++ b/cumulus/client/service/src/lib.rs @@ -60,16 +60,6 @@ use std::{ }; use polkadot_overseer::Handle as OverseerHandle; -use sc_consensus_aura::AuraApi; -use sp_consensus_aura::Slot; - -use sp_core::Pair; -use sp_keystore::KeystorePtr; -use sp_runtime::DigestItem; - -use codec::Codec; -use polkadot_node_subsystem::messages::CollatorProtocolMessage; -use sc_consensus_aura::{standalone::claim_slot, CompatibleDigestItem}; /// Host functions that should be used in parachain nodes. /// @@ -85,7 +75,6 @@ pub type ParachainHostFunctions = ( // In practice here we expect no more than one queued messages. const RECOVERY_CHAN_SIZE: usize = 8; const LOG_TARGET_SYNC: &str = "sync::cumulus"; -const LOG_TARGET_COLLATOR_HELPER: &str = "aura::collator-helper"; /// A hint about how long the node should wait before attempting to recover missing block data /// from the data availability layer. @@ -471,81 +460,6 @@ where Err("Stopping following imported blocks. Could not determine parachain target block".into()) } -/// Task for triggering backing group connections early. -pub async fn collator_protocol_helper( - client: Arc, - keystore: KeystorePtr, - mut overseer_handle: OverseerHandle, -) where - Client: HeaderBackend - + Send - + Sync - + BlockBackend - + BlockchainEvents - + ProvideRuntimeApi - + 'static, - Client::Api: AuraApi, - P: Pair + Send + Sync, - P::Public: Codec, -{ - let mut import_notifications = client.import_notification_stream().fuse(); - - log::debug!(target: LOG_TARGET_COLLATOR_HELPER, "Started collator protocol helper"); - - // Our own slot number, known in advance. - let mut our_slot = None; - - while let Some(notification) = import_notifications.next().await { - // Determine if this node is the next author - let digest = notification.header.digest(); - let slot = digest - .logs() - .iter() - .find_map(|log| >::as_aura_pre_digest(log)); - - log::debug!(target: LOG_TARGET_COLLATOR_HELPER, "Imported block with slot: {:?}", slot); - - let Some(slot) = slot else { - continue; - }; - - let authorities = - client.runtime_api().authorities(notification.header.hash()).unwrap_or_default(); - - // Check if our slot has passed and we are not expected to author again in next slot. - match (our_slot, claim_slot::

(slot + 1, &authorities, &keystore).await.is_none()) { - (Some(last_slot), true) if slot > last_slot => { - log::debug!(target: LOG_TARGET_COLLATOR_HELPER, "Our slot {} has passed, current slot is {}, sending disconnect message", last_slot, slot); - - // Send a message to the collator protocol to stop pre-connecting to backing - // groups - overseer_handle - .send_msg( - CollatorProtocolMessage::DisconnectFromBackingGroups, - "CollatorProtocolHelper", - ) - .await; - - our_slot = None; - }, - _ => {}, - } - - // Check if our slot is coming up next. This means that there is still another slot - // before our turn. - let target_slot = slot + 2; - if claim_slot::

(target_slot, &authorities, &keystore).await.is_some() { - log::debug!(target: LOG_TARGET_COLLATOR_HELPER, "Our slot {} comes next, sending preconnect message ", target_slot ); - // Send a message to the collator protocol to pre-connect to backing groups - overseer_handle - .send_msg(CollatorProtocolMessage::ConnectToBackingGroups, "CollatorProtocolHelper") - .await; - - our_slot = Some(target_slot); - } - } -} - /// Task for logging candidate events and some related metrics. async fn parachain_informant( para_id: ParaId, From 8fe9db55ab6eb9ef4012a01def6c394b29791133 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Sat, 4 Oct 2025 13:05:49 +0300 Subject: [PATCH 07/37] unused Signed-off-by: Andrei Sandu --- cumulus/client/service/src/lib.rs | 2 -- 1 file changed, 2 deletions(-) diff --git a/cumulus/client/service/src/lib.rs b/cumulus/client/service/src/lib.rs index 79fd63192971b..62199f2704162 100644 --- a/cumulus/client/service/src/lib.rs +++ b/cumulus/client/service/src/lib.rs @@ -59,8 +59,6 @@ use std::{ time::{Duration, Instant}, }; -use polkadot_overseer::Handle as OverseerHandle; - /// Host functions that should be used in parachain nodes. /// /// Contains the standard substrate host functions, as well as a From 3b632659d135ebf0452359e95ac66d40069c15e8 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Sat, 4 Oct 2025 13:17:19 +0300 Subject: [PATCH 08/37] unused deps in crates Signed-off-by: Andrei Sandu --- Cargo.lock | 5 ----- cumulus/client/service/Cargo.toml | 5 ----- 2 files changed, 10 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4be25ebdeaf82..26f4df522c18f 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4577,14 +4577,11 @@ dependencies = [ "cumulus-relay-chain-minimal-node", "cumulus-relay-chain-streams", "futures", - "parity-scale-codec", - "polkadot-node-subsystem", "polkadot-overseer", "polkadot-primitives", "prometheus", "sc-client-api", "sc-consensus", - "sc-consensus-aura", "sc-network", "sc-network-sync", "sc-network-transactions", @@ -4597,10 +4594,8 @@ dependencies = [ "sp-api", "sp-blockchain", "sp-consensus", - "sp-consensus-aura", "sp-core 28.0.0", "sp-io", - "sp-keystore", "sp-runtime", "sp-transaction-pool", ] diff --git a/cumulus/client/service/Cargo.toml b/cumulus/client/service/Cargo.toml index 739b30270adaf..bcbf3c1ee536e 100644 --- a/cumulus/client/service/Cargo.toml +++ b/cumulus/client/service/Cargo.toml @@ -15,12 +15,10 @@ workspace = true async-channel = { workspace = true } futures = { workspace = true } prometheus = { workspace = true } -codec = { workspace = true, default-features = true } # Substrate sc-client-api = { workspace = true, default-features = true } sc-consensus = { workspace = true, default-features = true } -sc-consensus-aura = { workspace = true, default-features = true } sc-network = { workspace = true, default-features = true } sc-network-sync = { workspace = true, default-features = true } sc-network-transactions = { workspace = true, default-features = true } @@ -33,16 +31,13 @@ sc-utils = { workspace = true, default-features = true } sp-api = { workspace = true, default-features = true } sp-blockchain = { workspace = true, default-features = true } sp-consensus = { workspace = true, default-features = true } -sp-consensus-aura = { workspace = true, default-features = true } sp-core = { workspace = true, default-features = true } sp-io = { workspace = true, default-features = true } sp-runtime = { workspace = true, default-features = true } sp-transaction-pool = { workspace = true, default-features = true } -sp-keystore = { workspace = true, default-features = true } # Polkadot polkadot-primitives = { workspace = true, default-features = true } -polkadot-node-subsystem = { workspace = true, default-features = true } polkadot-overseer = { workspace = true, default-features = true } # Cumulus From bda8c384e1649f41b77ed0d4d66c2fd31a556ea0 Mon Sep 17 00:00:00 2001 From: "cmd[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Sat, 4 Oct 2025 10:25:03 +0000 Subject: [PATCH 09/37] Update from github-actions[bot] running command 'prdoc generate --bump major' --- prdoc/pr_9929.prdoc | 19 +++++++++++++++++++ 1 file changed, 19 insertions(+) create mode 100644 prdoc/pr_9929.prdoc diff --git a/prdoc/pr_9929.prdoc b/prdoc/pr_9929.prdoc new file mode 100644 index 0000000000000..9494bd809a28e --- /dev/null +++ b/prdoc/pr_9929.prdoc @@ -0,0 +1,19 @@ +title: '[WIP] Cumulus: pre-connect to backers before own slot' +doc: +- audience: Todo + description: |- + On top of https://github.com/paritytech/polkadot-sdk/pull/9178. Implements a mechanism to pre-connect to backers, see https://github.com/paritytech/polkadot-sdk/issues/9767#issuecomment-3306292493 + + TODO: + - [ ] improve logic to handle 24s slots, we'd want to start connecting later than the begging of the prev author slot. +crates: +- name: cumulus-client-service + bump: major +- name: polkadot-omni-node-lib + bump: major +- name: polkadot-collator-protocol + bump: major +- name: polkadot-node-subsystem-types + bump: major +- name: cumulus-client-consensus-aura + bump: major From 9cfd857dc093207b329f164a6a87d456d7cf1117 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Sat, 4 Oct 2025 15:45:03 +0300 Subject: [PATCH 10/37] fix tests Signed-off-by: Andrei Sandu --- .../src/collator_side/tests/mod.rs | 29 +++++++++++++++++++ .../tests/prospective_parachains.rs | 8 +++++ 2 files changed, 37 insertions(+) diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs index 1627e93c75dec..9c3423ba176b2 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs @@ -567,6 +567,7 @@ fn v1_protocol_rejected() { ReputationAggregator::new(|_| true), |mut test_harness| async move { let virtual_overseer = &mut test_harness.virtual_overseer; + overseer_send(virtual_overseer, CollatorProtocolMessage::ConnectToBackingGroups).await; overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id)) .await; @@ -623,6 +624,9 @@ fn advertise_and_send_collation() { let mut virtual_overseer = test_harness.virtual_overseer; let mut req_v2_cfg = test_harness.req_v2_cfg; + overseer_send(&mut virtual_overseer, CollatorProtocolMessage::ConnectToBackingGroups) + .await; + overseer_send( &mut virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id), @@ -823,6 +827,9 @@ fn delay_reputation_change() { let mut virtual_overseer = test_harness.virtual_overseer; let mut req_v2_cfg = test_harness.req_v2_cfg; + overseer_send(&mut virtual_overseer, CollatorProtocolMessage::ConnectToBackingGroups) + .await; + overseer_send( &mut virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id), @@ -976,6 +983,12 @@ fn collators_declare_to_connected_peers() { let peer = test_state.validator_peer_id[0]; let validator_id = test_state.current_group_validator_authority_ids()[0].clone(); + overseer_send( + &mut test_harness.virtual_overseer, + CollatorProtocolMessage::ConnectToBackingGroups, + ) + .await; + overseer_send( &mut test_harness.virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id), @@ -1024,6 +1037,8 @@ fn collations_are_only_advertised_to_validators_with_correct_view() { let peer2 = test_state.current_group_validator_peer_ids()[1]; let validator_id2 = test_state.current_group_validator_authority_ids()[1].clone(); + overseer_send(virtual_overseer, CollatorProtocolMessage::ConnectToBackingGroups).await; + overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id)) .await; @@ -1099,6 +1114,8 @@ fn collate_on_two_different_relay_chain_blocks() { let peer2 = test_state.current_group_validator_peer_ids()[1]; let validator_id2 = test_state.current_group_validator_authority_ids()[1].clone(); + overseer_send(virtual_overseer, CollatorProtocolMessage::ConnectToBackingGroups).await; + overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id)) .await; @@ -1189,6 +1206,8 @@ fn validator_reconnect_does_not_advertise_a_second_time() { let peer = test_state.current_group_validator_peer_ids()[0]; let validator_id = test_state.current_group_validator_authority_ids()[0].clone(); + overseer_send(virtual_overseer, CollatorProtocolMessage::ConnectToBackingGroups).await; + overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id)) .await; @@ -1253,6 +1272,8 @@ fn collators_reject_declare_messages() { let peer = test_state.current_group_validator_peer_ids()[0]; let validator_id = test_state.current_group_validator_authority_ids()[0].clone(); + overseer_send(virtual_overseer, CollatorProtocolMessage::ConnectToBackingGroups).await; + overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id)) .await; @@ -1320,6 +1341,8 @@ where let virtual_overseer = &mut test_harness.virtual_overseer; let req_cfg = &mut test_harness.req_v2_cfg; + overseer_send(virtual_overseer, CollatorProtocolMessage::ConnectToBackingGroups).await; + overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id)) .await; @@ -1471,6 +1494,9 @@ fn connect_to_group_in_view() { let mut virtual_overseer = test_harness.virtual_overseer; let mut req_cfg = test_harness.req_v2_cfg; + overseer_send(&mut virtual_overseer, CollatorProtocolMessage::ConnectToBackingGroups) + .await; + overseer_send( &mut virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id), @@ -1602,6 +1628,9 @@ fn connect_with_no_cores_assigned() { let mut virtual_overseer = test_harness.virtual_overseer; let req_cfg = test_harness.req_v2_cfg; + overseer_send(&mut virtual_overseer, CollatorProtocolMessage::ConnectToBackingGroups) + .await; + overseer_send( &mut virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id), diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs index 48f40fec6f05c..b307d0bf97186 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs @@ -341,6 +341,8 @@ fn distribute_collation_from_implicit_view(#[case] validator_sends_view_first: b |mut test_harness| async move { let virtual_overseer = &mut test_harness.virtual_overseer; + overseer_send(virtual_overseer, CollatorProtocolMessage::ConnectToBackingGroups).await; + // Set collating para id. overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id)) .await; @@ -516,6 +518,8 @@ fn distribute_collation_up_to_limit() { // Grandparent of head `a`. let head_b = Hash::from_low_u64_be(130); + overseer_send(virtual_overseer, CollatorProtocolMessage::ConnectToBackingGroups).await; + // Set collating para id. overseer_send(virtual_overseer, CollatorProtocolMessage::CollateOn(test_state.para_id)) .await; @@ -644,6 +648,8 @@ fn send_parent_head_data_for_elastic_scaling() { let head_b = Hash::from_low_u64_be(129); let head_b_num: u32 = 63; + overseer_send(&mut virtual_overseer, CollatorProtocolMessage::ConnectToBackingGroups).await; + // Set collating para id. overseer_send( &mut virtual_overseer, @@ -773,6 +779,8 @@ fn advertise_and_send_collation_by_hash() { let head_b = Hash::from_low_u64_be(129); let head_b_num: u32 = 63; + overseer_send(&mut virtual_overseer, CollatorProtocolMessage::ConnectToBackingGroups).await; + // Set collating para id. overseer_send( &mut virtual_overseer, From c2dce30db1974faed59f2a603828e8e6b83ae3d8 Mon Sep 17 00:00:00 2001 From: "cmd[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Sat, 4 Oct 2025 14:17:20 +0000 Subject: [PATCH 11/37] Update from github-actions[bot] running command 'fmt' --- cumulus/client/service/Cargo.toml | 2 +- .../src/collator_side/tests/prospective_parachains.rs | 6 ++++-- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/cumulus/client/service/Cargo.toml b/cumulus/client/service/Cargo.toml index bcbf3c1ee536e..ccd866b46e314 100644 --- a/cumulus/client/service/Cargo.toml +++ b/cumulus/client/service/Cargo.toml @@ -37,8 +37,8 @@ sp-runtime = { workspace = true, default-features = true } sp-transaction-pool = { workspace = true, default-features = true } # Polkadot -polkadot-primitives = { workspace = true, default-features = true } polkadot-overseer = { workspace = true, default-features = true } +polkadot-primitives = { workspace = true, default-features = true } # Cumulus cumulus-client-cli = { workspace = true, default-features = true } diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs index b307d0bf97186..cde84136d3342 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs @@ -648,7 +648,8 @@ fn send_parent_head_data_for_elastic_scaling() { let head_b = Hash::from_low_u64_be(129); let head_b_num: u32 = 63; - overseer_send(&mut virtual_overseer, CollatorProtocolMessage::ConnectToBackingGroups).await; + overseer_send(&mut virtual_overseer, CollatorProtocolMessage::ConnectToBackingGroups) + .await; // Set collating para id. overseer_send( @@ -779,7 +780,8 @@ fn advertise_and_send_collation_by_hash() { let head_b = Hash::from_low_u64_be(129); let head_b_num: u32 = 63; - overseer_send(&mut virtual_overseer, CollatorProtocolMessage::ConnectToBackingGroups).await; + overseer_send(&mut virtual_overseer, CollatorProtocolMessage::ConnectToBackingGroups) + .await; // Set collating para id. overseer_send( From 2d78334763a8e09c3cb64fca4d268bbdffa5baac Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 6 Oct 2025 11:48:22 +0300 Subject: [PATCH 12/37] fmt Signed-off-by: Andrei Sandu --- .../src/collator_side/tests/prospective_parachains.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs index b307d0bf97186..cde84136d3342 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/tests/prospective_parachains.rs @@ -648,7 +648,8 @@ fn send_parent_head_data_for_elastic_scaling() { let head_b = Hash::from_low_u64_be(129); let head_b_num: u32 = 63; - overseer_send(&mut virtual_overseer, CollatorProtocolMessage::ConnectToBackingGroups).await; + overseer_send(&mut virtual_overseer, CollatorProtocolMessage::ConnectToBackingGroups) + .await; // Set collating para id. overseer_send( @@ -779,7 +780,8 @@ fn advertise_and_send_collation_by_hash() { let head_b = Hash::from_low_u64_be(129); let head_b_num: u32 = 63; - overseer_send(&mut virtual_overseer, CollatorProtocolMessage::ConnectToBackingGroups).await; + overseer_send(&mut virtual_overseer, CollatorProtocolMessage::ConnectToBackingGroups) + .await; // Set collating para id. overseer_send( From 6bec87250bf537e168fa7fda416931a60cf20ba1 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 6 Oct 2025 11:52:47 +0300 Subject: [PATCH 13/37] add offset Signed-off-by: Andrei Sandu --- Cargo.lock | 1 + cumulus/client/consensus/aura/Cargo.toml | 1 + cumulus/client/consensus/aura/src/collator.rs | 64 ++++++++++++++++--- .../polkadot-omni-node/lib/src/nodes/aura.rs | 16 +++-- 4 files changed, 66 insertions(+), 16 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 26f4df522c18f..acc5a141b6e28 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4358,6 +4358,7 @@ dependencies = [ "cumulus-test-client", "cumulus-test-relay-sproof-builder", "futures", + "futures-timer", "parity-scale-codec", "parking_lot 0.12.3", "polkadot-node-primitives", diff --git a/cumulus/client/consensus/aura/Cargo.toml b/cumulus/client/consensus/aura/Cargo.toml index 8dca303ffebdb..6dd43eeeaa40c 100644 --- a/cumulus/client/consensus/aura/Cargo.toml +++ b/cumulus/client/consensus/aura/Cargo.toml @@ -15,6 +15,7 @@ workspace = true async-trait = { workspace = true } codec = { features = ["derive"], workspace = true, default-features = true } futures = { workspace = true } +futures-timer = { workspace = true } parking_lot = { workspace = true } schnellru = { workspace = true } tokio = { workspace = true, features = ["macros"] } diff --git a/cumulus/client/consensus/aura/src/collator.rs b/cumulus/client/consensus/aura/src/collator.rs index 952433853305d..2b082692277af 100644 --- a/cumulus/client/consensus/aura/src/collator.rs +++ b/cumulus/client/consensus/aura/src/collator.rs @@ -59,6 +59,13 @@ use sp_state_machine::StorageChanges; use sp_timestamp::Timestamp; use std::{error::Error, time::Duration}; +/// The slot offset to start pre-connecting to backing groups. Represented as number +/// of seconds before own slot starts. +const PRE_CONNECT_SLOT_OFFSET: Duration = Duration::from_secs(6); + +/// Task name for the collator protocol helper. +pub const COLLATOR_PROTOCOL_HELPER_TASK_GROUP: &str = "collator-protocol-helper"; + /// Parameters for instantiating a [`Collator`]. pub struct Params { /// A builder for inherent data builders. @@ -438,10 +445,11 @@ where /// /// This helper monitors block imports and proactively connects to backing groups /// when the collator's slot is approaching, improving network connectivity. -pub async fn collator_protocol_helper( +pub async fn collator_protocol_helper( client: std::sync::Arc, keystore: sp_keystore::KeystorePtr, mut overseer_handle: cumulus_relay_chain_interface::OverseerHandle, + spawn_handle: Spawner, ) where Block: sp_runtime::traits::Block, Client: sc_client_api::HeaderBackend @@ -454,6 +462,7 @@ pub async fn collator_protocol_helper( Client::Api: AuraApi, P: sp_core::Pair + Send + Sync, P::Public: Codec, + Spawner: sp_core::traits::SpawnNamed, { use polkadot_node_subsystem::messages::CollatorProtocolMessage; use sc_consensus_aura::CompatibleDigestItem; @@ -480,6 +489,12 @@ pub async fn collator_protocol_helper( continue; }; + // Determine current slot duration. + let Ok(slot_duration) = client.runtime_api().slot_duration(notification.header.hash()) + else { + continue; + }; + let authorities = client.runtime_api().authorities(notification.header.hash()).unwrap_or_default(); @@ -512,15 +527,46 @@ pub async fn collator_protocol_helper( let target_slot = slot + 2; if aura_internal::claim_slot::

(target_slot, &authorities, &keystore) .await - .is_some() + .is_none() { - tracing::debug!(target: crate::LOG_TARGET, "Our slot {} comes next, sending preconnect message ", target_slot ); - // Send a message to the collator protocol to pre-connect to backing groups - overseer_handle - .send_msg(CollatorProtocolMessage::ConnectToBackingGroups, "CollatorProtocolHelper") - .await; - - our_slot = Some(target_slot); + continue; } + + tracing::debug!(target: crate::LOG_TARGET, "Our slot {} is due soon", target_slot ); + + // Determine our own slot timestamp. + let Some(own_slot_ts) = target_slot.timestamp(slot_duration) else { + tracing::warn!(target: crate::LOG_TARGET, "Failed to get own slot duration"); + + continue; + }; + + let pre_connect_delay = own_slot_ts + .saturating_sub(*Timestamp::current()) + .saturating_sub(PRE_CONNECT_SLOT_OFFSET.as_millis() as u64); + + tracing::debug!(target: crate::LOG_TARGET, "Pre-connecting to backing groups in {}ms", pre_connect_delay); + + let mut overseer_handle_clone = overseer_handle.clone(); + spawn_handle.spawn( + "send-pre-connect-message", + Some(COLLATOR_PROTOCOL_HELPER_TASK_GROUP), + async move { + tracing::debug!(target: crate::LOG_TARGET, "Sending pre-connect message"); + + if pre_connect_delay > 0 { + futures_timer::Delay::new(std::time::Duration::from_millis(pre_connect_delay)).await; + } + // Send a message to the collator protocol to pre-connect to backing groups + overseer_handle_clone + .send_msg( + CollatorProtocolMessage::ConnectToBackingGroups, + "CollatorProtocolHelper", + ) + .await; + } + .boxed(), + ); + our_slot = Some(target_slot); } } diff --git a/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs b/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs index 28c5301fcf343..6ec7267eb3e06 100644 --- a/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs +++ b/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs @@ -39,7 +39,7 @@ use cumulus_client_consensus_aura::collators::slot_based::{ self as slot_based, Params as SlotBasedParams, }; use cumulus_client_consensus_aura::{ - collator::collator_protocol_helper, + collator::{collator_protocol_helper, COLLATOR_PROTOCOL_HELPER_TASK_GROUP}, collators::{ lookahead::{self as aura, Params as AuraParams}, slot_based::{SlotBasedBlockImport, SlotBasedBlockImportHandle}, @@ -346,12 +346,13 @@ where // Spawn the collator protocol helper task task_manager.spawn_essential_handle().spawn( - "collator-protocol-helper", - None, - collator_protocol_helper::<_, _, ::Pair>( + COLLATOR_PROTOCOL_HELPER_TASK_GROUP, + Some(COLLATOR_PROTOCOL_HELPER_TASK_GROUP), + collator_protocol_helper::<_, _, ::Pair, _>( client.clone(), keystore.clone(), overseer_handle, + task_manager.spawn_handle(), ), ); @@ -482,12 +483,13 @@ where // Spawn the collator protocol helper task task_manager.spawn_essential_handle().spawn( - "collator-protocol-helper", - None, - collator_protocol_helper::<_, _, ::Pair>( + COLLATOR_PROTOCOL_HELPER_TASK_GROUP, + Some(COLLATOR_PROTOCOL_HELPER_TASK_GROUP), + collator_protocol_helper::<_, _, ::Pair, _>( client.clone(), keystore.clone(), overseer_handle.clone(), + task_manager.spawn_handle(), ), ); From b977e320084ce7158f15ec2ed45a773c1d0a6a02 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 6 Oct 2025 12:01:50 +0300 Subject: [PATCH 14/37] fmt Signed-off-by: Andrei Sandu --- cumulus/client/consensus/aura/src/collator.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cumulus/client/consensus/aura/src/collator.rs b/cumulus/client/consensus/aura/src/collator.rs index 2b082692277af..b7408c4b283c9 100644 --- a/cumulus/client/consensus/aura/src/collator.rs +++ b/cumulus/client/consensus/aura/src/collator.rs @@ -555,7 +555,8 @@ pub async fn collator_protocol_helper( tracing::debug!(target: crate::LOG_TARGET, "Sending pre-connect message"); if pre_connect_delay > 0 { - futures_timer::Delay::new(std::time::Duration::from_millis(pre_connect_delay)).await; + futures_timer::Delay::new(std::time::Duration::from_millis(pre_connect_delay)) + .await; } // Send a message to the collator protocol to pre-connect to backing groups overseer_handle_clone From f0a76d5396b725aaab08145d86a08078c6bfdbbe Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 6 Oct 2025 12:13:39 +0300 Subject: [PATCH 15/37] prdoc Signed-off-by: Andrei Sandu --- prdoc/pr_9929.prdoc | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/prdoc/pr_9929.prdoc b/prdoc/pr_9929.prdoc index 9494bd809a28e..742172b90bb07 100644 --- a/prdoc/pr_9929.prdoc +++ b/prdoc/pr_9929.prdoc @@ -1,19 +1,17 @@ -title: '[WIP] Cumulus: pre-connect to backers before own slot' +title: 'Cumulus: pre-connect to backers before own slot' doc: -- audience: Todo +- audience: Node Dev description: |- - On top of https://github.com/paritytech/polkadot-sdk/pull/9178. Implements a mechanism to pre-connect to backers, see https://github.com/paritytech/polkadot-sdk/issues/9767#issuecomment-3306292493 - - TODO: - - [ ] improve logic to handle 24s slots, we'd want to start connecting later than the begging of the prev author slot. + Implements a mechanism to pre-connect to backers, see https://github.com/paritytech/polkadot-sdk/issues/9767#issuecomment-3306292493. + Improve backing group connectivity. crates: - name: cumulus-client-service - bump: major + bump: minor - name: polkadot-omni-node-lib - bump: major + bump: minor - name: polkadot-collator-protocol - bump: major + bump: patch - name: polkadot-node-subsystem-types bump: major - name: cumulus-client-consensus-aura - bump: major + bump: minor From 0dbdd9c6f70486d4ef016443c536095a3a4946d1 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 6 Oct 2025 12:13:56 +0300 Subject: [PATCH 16/37] move log Signed-off-by: Andrei Sandu --- cumulus/client/consensus/aura/src/collator.rs | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/cumulus/client/consensus/aura/src/collator.rs b/cumulus/client/consensus/aura/src/collator.rs index b7408c4b283c9..1d9ff1abf4e9b 100644 --- a/cumulus/client/consensus/aura/src/collator.rs +++ b/cumulus/client/consensus/aura/src/collator.rs @@ -552,12 +552,13 @@ pub async fn collator_protocol_helper( "send-pre-connect-message", Some(COLLATOR_PROTOCOL_HELPER_TASK_GROUP), async move { - tracing::debug!(target: crate::LOG_TARGET, "Sending pre-connect message"); - if pre_connect_delay > 0 { futures_timer::Delay::new(std::time::Duration::from_millis(pre_connect_delay)) .await; } + + tracing::debug!(target: crate::LOG_TARGET, "Sending pre-connect message"); + // Send a message to the collator protocol to pre-connect to backing groups overseer_handle_clone .send_msg( From 699b00a8d9e663b8f58b501f8988822c61722be2 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 6 Oct 2025 13:26:14 +0300 Subject: [PATCH 17/37] impl disconnect Signed-off-by: Andrei Sandu --- .../src/collator_side/mod.rs | 111 +++++++++++++----- 1 file changed, 79 insertions(+), 32 deletions(-) diff --git a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs index 523f35b8def3f..0c151f5e18505 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs @@ -406,7 +406,15 @@ async fn distribute_collation( // We should already be connected to the validators, but if we aren't, we will try to connect to // them now. - connect_to_validators(ctx, &state.implicit_view, &state.per_relay_parent, id).await; + update_validator_connections( + ctx, + &state.peer_ids, + &state.implicit_view, + &state.per_relay_parent, + id, + true, + ) + .await; let per_relay_parent = match state.per_relay_parent.get_mut(&candidate_relay_parent) { Some(per_relay_parent) => per_relay_parent, @@ -671,14 +679,15 @@ fn list_of_backing_validators_in_view( backing_validators.into_iter().collect() } -/// Updates a set of connected validators based on their advertisement-bits -/// in a validators buffer. +/// Connect or disconnect to/from all backers at all viable relay parents. #[overseer::contextbounds(CollatorProtocol, prefix = self::overseer)] -async fn connect_to_validators( +async fn update_validator_connections( ctx: &mut Context, + peer_ids: &HashMap>, implicit_view: &Option, per_relay_parent: &HashMap, para_id: ParaId, + connect: bool, ) { let cores_assigned = has_assigned_cores(implicit_view, per_relay_parent); // If no cores are assigned to the para, we still need to send a ConnectToValidators request to @@ -690,22 +699,36 @@ async fn connect_to_validators( Vec::new() }; - gum::trace!( - target: LOG_TARGET, - ?cores_assigned, - "Sending connection request to validators: {:?}", - validator_ids, - ); - // ignore address resolution failure // will reissue a new request on new collation let (failed, _) = oneshot::channel(); - ctx.send_message(NetworkBridgeTxMessage::ConnectToValidators { - validator_ids, - peer_set: PeerSet::Collation, - failed, - }) - .await; + + let msg = if connect { + gum::trace!( + target: LOG_TARGET, + ?cores_assigned, + "Sending connection request to validators: {:?}", + validator_ids, + ); + NetworkBridgeTxMessage::ConnectToValidators { + validator_ids, + peer_set: PeerSet::Collation, + failed, + } + } else { + // Get all connected peer_ids on the Collation peer set. + let connected_validator_peer_ids: Vec<_> = peer_ids.keys().cloned().collect(); + + gum::trace!( + target: LOG_TARGET, + ?cores_assigned, + "Disconnecting from validators: {:?}", + connected_validator_peer_ids, + ); + NetworkBridgeTxMessage::DisconnectPeers(connected_validator_peer_ids, PeerSet::Collation) + }; + + ctx.send_message(msg).await; } /// Advertise collation to the given `peer`. @@ -810,8 +833,15 @@ async fn process_msg( state.connect_to_backers = true; if let Some(para_id) = state.collating_on { - connect_to_validators(ctx, &state.implicit_view, &state.per_relay_parent, para_id) - .await; + update_validator_connections( + ctx, + &state.peer_ids, + &state.implicit_view, + &state.per_relay_parent, + para_id, + state.connect_to_backers, + ) + .await; } }, DisconnectFromBackingGroups => { @@ -822,6 +852,18 @@ async fn process_msg( "Received DisconnectFromBackingGroups message." ); state.connect_to_backers = false; + + if let Some(para_id) = state.collating_on { + update_validator_connections( + ctx, + &state.peer_ids, + &state.implicit_view, + &state.per_relay_parent, + para_id, + state.connect_to_backers, + ) + .await; + } }, CollateOn(id) => { state.collating_on = Some(id); @@ -1297,15 +1339,15 @@ async fn handle_network_msg( handle_our_view_change(ctx, runtime, state, view).await?; // Connect only if we are collating on a para. if let Some(para_id) = state.collating_on { - if state.connect_to_backers { - connect_to_validators( - ctx, - &state.implicit_view, - &state.per_relay_parent, - para_id, - ) - .await; - } + update_validator_connections( + ctx, + &state.peer_ids, + &state.implicit_view, + &state.per_relay_parent, + para_id, + state.connect_to_backers, + ) + .await; } }, PeerMessage(remote, msg) => { @@ -1794,12 +1836,17 @@ async fn run_inner( ); } _ = reconnect_timeout => { - // Connect only if we are collating on a para. if let Some(para_id) = state.collating_on { - if state.connect_to_backers { - connect_to_validators(&mut ctx, &state.implicit_view, &state.per_relay_parent, para_id).await; - } + update_validator_connections( + &mut ctx, + &state.peer_ids, + &state.implicit_view, + &state.per_relay_parent, + para_id, + state.connect_to_backers, + ) + .await; } gum::trace!( From 597a50c523ecfa8daada7612c1e0fe153ba02fbd Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 6 Oct 2025 13:31:24 +0300 Subject: [PATCH 18/37] fix some stuff Signed-off-by: Andrei Sandu --- cumulus/client/consensus/aura/src/collator.rs | 10 +++------- 1 file changed, 3 insertions(+), 7 deletions(-) diff --git a/cumulus/client/consensus/aura/src/collator.rs b/cumulus/client/consensus/aura/src/collator.rs index 1d9ff1abf4e9b..a65adfd6ec940 100644 --- a/cumulus/client/consensus/aura/src/collator.rs +++ b/cumulus/client/consensus/aura/src/collator.rs @@ -38,12 +38,13 @@ use cumulus_primitives_core::{ use cumulus_relay_chain_interface::RelayChainInterface; use polkadot_node_primitives::{Collation, MaybeCompressedPoV}; +use polkadot_node_subsystem::messages::CollatorProtocolMessage; use polkadot_primitives::{Header as PHeader, Id as ParaId}; use crate::collators::RelayParentData; use futures::prelude::*; use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy, StateAction}; -use sc_consensus_aura::standalone as aura_internal; +use sc_consensus_aura::{standalone as aura_internal, CompatibleDigestItem}; use sp_api::ProvideRuntimeApi; use sp_application_crypto::AppPublic; use sp_consensus::BlockOrigin; @@ -455,19 +456,14 @@ pub async fn collator_protocol_helper( Client: sc_client_api::HeaderBackend + Send + Sync - + sc_client_api::BlockBackend - + sc_client_api::BlockchainEvents + ProvideRuntimeApi + + sc_client_api::BlockchainEvents + 'static, Client::Api: AuraApi, P: sp_core::Pair + Send + Sync, P::Public: Codec, Spawner: sp_core::traits::SpawnNamed, { - use polkadot_node_subsystem::messages::CollatorProtocolMessage; - use sc_consensus_aura::CompatibleDigestItem; - use sp_runtime::DigestItem; - let mut import_notifications = client.import_notification_stream().fuse(); tracing::debug!(target: crate::LOG_TARGET, "Started collator protocol helper"); From bf35a8e661894e777260d3bf845c6dd3fdcdae0c Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 6 Oct 2025 18:04:05 +0300 Subject: [PATCH 19/37] add some tests for connect/disconnect Signed-off-by: Andrei Sandu --- Cargo.lock | 1 + .../node/network/collator-protocol/Cargo.toml | 2 + .../src/collator_side/mod.rs | 4 + .../src/collator_side/tests/mod.rs | 260 +++++++++++++++++- 4 files changed, 266 insertions(+), 1 deletion(-) diff --git a/Cargo.lock b/Cargo.lock index acc5a141b6e28..1bede150f60e7 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -15048,6 +15048,7 @@ dependencies = [ "fatality", "futures", "futures-timer", + "itertools 0.11.0", "parity-scale-codec", "polkadot-node-network-protocol", "polkadot-node-primitives", diff --git a/polkadot/node/network/collator-protocol/Cargo.toml b/polkadot/node/network/collator-protocol/Cargo.toml index cd9ea08446a1f..ea41065ecb0a6 100644 --- a/polkadot/node/network/collator-protocol/Cargo.toml +++ b/polkadot/node/network/collator-protocol/Cargo.toml @@ -49,6 +49,8 @@ sp-keyring = { workspace = true, default-features = true } polkadot-node-subsystem-test-helpers = { workspace = true } polkadot-primitives-test-helpers = { workspace = true } +itertools = { workspace = true } + [features] default = [] diff --git a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs index 0c151f5e18505..9b78ad7684952 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs @@ -719,6 +719,10 @@ async fn update_validator_connections( // Get all connected peer_ids on the Collation peer set. let connected_validator_peer_ids: Vec<_> = peer_ids.keys().cloned().collect(); + if connected_validator_peer_ids.is_empty() { + return + } + gum::trace!( target: LOG_TARGET, ?cores_assigned, diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs index 9c3423ba176b2..182709c658a05 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs @@ -32,13 +32,14 @@ use sp_core::crypto::Pair; use sp_keyring::Sr25519Keyring; use sp_runtime::traits::AppVerify; +use itertools::Itertools; use polkadot_node_network_protocol::{ peer_set::CollationVersion, request_response::{ v2::{CollationFetchingRequest, CollationFetchingResponse}, IncomingRequest, ReqProtocolNames, }, - view, + view, ObservedRole, }; use polkadot_node_primitives::BlockData; use polkadot_node_subsystem::{ @@ -1700,3 +1701,260 @@ fn connect_with_no_cores_assigned() { }, ); } + +#[test] +fn no_connection_without_preconnect_message() { + let test_state = TestState::default(); + let local_peer_id = test_state.local_peer_id; + let collator_pair = test_state.collator_pair.clone(); + + test_harness( + local_peer_id, + collator_pair, + ReputationAggregator::new(|_| true), + |test_harness| async move { + let mut virtual_overseer = test_harness.virtual_overseer; + let req_cfg = test_harness.req_v2_cfg; + + // NOTE: We intentionally DO NOT send ConnectToBackingGroups here + // to verify that connections are not made without the pre-connect message. + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::CollateOn(test_state.para_id), + ) + .await; + + // Update view without expecting any connections (None parameter) + update_view( + None, // No connections should be made + &test_state, + &mut virtual_overseer, + vec![(test_state.relay_parent, 10)], + 1, + ) + .await; + + // Verify that no ConnectToValidators message was sent + // by attempting to receive a message with a short timeout. + // We should either timeout or receive messages that are NOT ConnectToValidators. + let timeout = Duration::from_millis(250); + match overseer_recv_with_timeout(&mut virtual_overseer, timeout).await { + None => { + // Timeout is fine - no messages were sent + }, + Some(msg) => { + // If we received a message, it should NOT be ConnectToValidators + panic!("Unexpected message was sent by subsystem: {:?}", msg); + }, + } + + TestHarness { virtual_overseer, req_v2_cfg: req_cfg } + }, + ); +} + +#[test] +fn distribute_collation_forces_connect() { + let test_state = TestState::default(); + let local_peer_id = test_state.local_peer_id; + let collator_pair = test_state.collator_pair.clone(); + + test_harness( + local_peer_id, + collator_pair, + ReputationAggregator::new(|_| true), + |test_harness| async move { + let mut virtual_overseer = test_harness.virtual_overseer; + let req_cfg = test_harness.req_v2_cfg; + + // NOTE: We intentionally DO NOT send ConnectToBackingGroups here + // to verify that connections are not made without the pre-connect message. + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::CollateOn(test_state.para_id), + ) + .await; + + // Update view without expecting any connections (None parameter) + update_view( + None, // No connections should be made + &test_state, + &mut virtual_overseer, + vec![(test_state.relay_parent, 10)], + 1, + ) + .await; + + // Verify that no ConnectToValidators message was sent + // by attempting to receive a message with a short timeout. + // We should either timeout or receive messages that are NOT ConnectToValidators. + let timeout = Duration::from_millis(250); + match overseer_recv_with_timeout(&mut virtual_overseer, timeout).await { + None => { + // Timeout is fine - no messages were sent + }, + Some(msg) => { + // If we received a message, it should NOT be ConnectToValidators + panic!("Unexpected message was sent by subsystem: {:?}", msg); + }, + } + + // Distribute a collation + let _ = distribute_collation( + &mut virtual_overseer, + test_state.current_group_validator_authority_ids(), + &test_state, + test_state.relay_parent, + ) + .await; + + for (val, peer) in test_state + .current_group_validator_authority_ids() + .into_iter() + .zip(test_state.current_group_validator_peer_ids()) + { + connect_peer(&mut virtual_overseer, peer, CollationVersion::V2, Some(val.clone())) + .await; + } + + // Expect advertisement for the candidate + expect_declare_msg( + &mut virtual_overseer, + &test_state, + &test_state.current_group_validator_peer_ids()[0], + ) + .await; + + TestHarness { virtual_overseer, req_v2_cfg: req_cfg } + }, + ); +} + +#[test] +fn connect_advertise_disconnect_three_backing_groups() { + // Create a test state with 3 non-empty backing groups + let mut test_state = TestState::default(); + let para_id = test_state.para_id; + + // We have 5 validators total (indices 0-4) + // Group 0: validators [0, 1] + // Group 1: validators [2, 3] + // Group 2: validators [4] + test_state.session_info.validator_groups = vec![ + vec![ValidatorIndex(0), ValidatorIndex(1)], + vec![ValidatorIndex(2), ValidatorIndex(3)], + vec![ValidatorIndex(4)], + vec![], + ] + .into_iter() + .collect(); + + // Assign our para_id to 3 cores (0, 1, 2) which will map to 3 groups + test_state.claim_queue.clear(); + test_state.claim_queue.insert(CoreIndex(0), [para_id].into_iter().collect()); + test_state.claim_queue.insert(CoreIndex(1), [para_id].into_iter().collect()); + test_state.claim_queue.insert(CoreIndex(2), [para_id].into_iter().collect()); + test_state.claim_queue.insert(CoreIndex(3), VecDeque::new()); + + let local_peer_id = test_state.local_peer_id; + let collator_pair = test_state.collator_pair.clone(); + + test_harness( + local_peer_id, + collator_pair, + ReputationAggregator::new(|_| true), + |test_harness| async move { + let mut virtual_overseer = test_harness.virtual_overseer; + let req_cfg = test_harness.req_v2_cfg; + + // Send the pre-connect message + overseer_send(&mut virtual_overseer, CollatorProtocolMessage::ConnectToBackingGroups) + .await; + + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::CollateOn(test_state.para_id), + ) + .await; + + // Get validators from all 3 backing groups + let mut expected_validators = Vec::new(); + for core_idx in [0, 1, 2] { + let validators = test_state.validator_authority_ids_for_core(CoreIndex(core_idx)); + expected_validators.extend(validators); + } + + // Remove duplicates while preserving order + let mut seen = std::collections::HashSet::new(); + expected_validators.retain(|v| seen.insert(v.clone())); + + // Verify we're connecting to all 5 validators (from 3 groups) + // Group 0 (Core 0): 2 validators + // Group 1 (Core 1): 2 validators + // Group 2 (Core 2): 1 validator + assert_eq!( + expected_validators.len(), + 5, + "Expected 5 unique validators from 3 backing groups" + ); + + // Update view and expect connections to all validators from all 3 backing groups + update_view( + Some(expected_validators.clone()), + &test_state, + &mut virtual_overseer, + vec![(test_state.relay_parent, 10)], + 1, + ) + .await; + + // Generate NetworkBridgeEvent::PeerConnected messages for each expected validator peer + // Use some random peer ids + let validator_peer_ids: Vec<_> = + (0..expected_validators.len()).map(|_| PeerId::random()).sorted().collect(); + + for (auth_id, peer_id) in expected_validators.iter().zip(validator_peer_ids.iter()) { + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::NetworkBridgeUpdate( + NetworkBridgeEvent::PeerConnected( + *peer_id, + ObservedRole::Authority, + CollationVersion::V2.into(), + Some(HashSet::from([auth_id.clone()])), + ), + ), + ) + .await; + } + + // Expect collation advertisement for each validator + for peer_id in validator_peer_ids.iter() { + expect_declare_msg(&mut virtual_overseer, &test_state, peer_id).await; + } + + // Send the disconnect message + overseer_send( + &mut virtual_overseer, + CollatorProtocolMessage::DisconnectFromBackingGroups, + ) + .await; + + // Expect a DisconnectPeers for all connected peers + assert_matches!( + overseer_recv(&mut virtual_overseer).await, + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::DisconnectPeers( + peer_ids, + PeerSet::Collation, + )) => { + // We should disconnect from all peers we were connected to + assert_eq!(peer_ids.into_iter().sorted().collect::>(), validator_peer_ids, "Expected to disconnect from all validators"); + } + ); + + TestHarness { virtual_overseer, req_v2_cfg: req_cfg } + }, + ); +} From 334fd921f14ce4974919915a91a22efe948f9852 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 6 Oct 2025 18:32:18 +0300 Subject: [PATCH 20/37] fix cargo toml Signed-off-by: Andrei Sandu --- polkadot/node/network/collator-protocol/Cargo.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/polkadot/node/network/collator-protocol/Cargo.toml b/polkadot/node/network/collator-protocol/Cargo.toml index ea41065ecb0a6..daa76f3c01111 100644 --- a/polkadot/node/network/collator-protocol/Cargo.toml +++ b/polkadot/node/network/collator-protocol/Cargo.toml @@ -47,9 +47,9 @@ sc-network = { workspace = true, default-features = true } sp-core = { features = ["std"], workspace = true, default-features = true } sp-keyring = { workspace = true, default-features = true } +itertools = { workspace = true } polkadot-node-subsystem-test-helpers = { workspace = true } polkadot-primitives-test-helpers = { workspace = true } -itertools = { workspace = true } [features] From 9b7b37f904d80b4c71feb592d48ee4add6c10d15 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Mon, 6 Oct 2025 18:34:38 +0300 Subject: [PATCH 21/37] fix prdoc Signed-off-by: Andrei Sandu --- prdoc/pr_9929.prdoc | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/prdoc/pr_9929.prdoc b/prdoc/pr_9929.prdoc index 742172b90bb07..af9dec337b381 100644 --- a/prdoc/pr_9929.prdoc +++ b/prdoc/pr_9929.prdoc @@ -1,9 +1,10 @@ -title: 'Cumulus: pre-connect to backers before own slot' +title: Pre-connect to backers before own slot doc: - audience: Node Dev description: |- Implements a mechanism to pre-connect to backers, see https://github.com/paritytech/polkadot-sdk/issues/9767#issuecomment-3306292493. Improve backing group connectivity. + crates: - name: cumulus-client-service bump: minor From 89fb028e1d2782946c8e2cbbd9d2153e7667d0e0 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Tue, 7 Oct 2025 11:40:43 +0300 Subject: [PATCH 22/37] review feedback Signed-off-by: Andrei Sandu --- cumulus/client/consensus/aura/src/collator.rs | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/cumulus/client/consensus/aura/src/collator.rs b/cumulus/client/consensus/aura/src/collator.rs index a65adfd6ec940..3d6795148266d 100644 --- a/cumulus/client/consensus/aura/src/collator.rs +++ b/cumulus/client/consensus/aura/src/collator.rs @@ -474,10 +474,9 @@ pub async fn collator_protocol_helper( while let Some(notification) = import_notifications.next().await { // Determine if this node is the next author let digest = notification.header.digest(); - let slot = digest - .logs() - .iter() - .find_map(|log| >::as_aura_pre_digest(log)); + + let slot = + digest.convert_first(>::as_aura_pre_digest); tracing::debug!(target: crate::LOG_TARGET, "Imported block with slot: {:?}", slot); @@ -515,6 +514,11 @@ pub async fn collator_protocol_helper( our_slot = None; }, + (Some(_), false) => { + // `last_slot` is `Some` means we alredy sent pre-connect message, no need to + // proceed further. + continue + }, _ => {}, } From cae1c25e1d07cc9e50da624f180dccf466d77632 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Tue, 7 Oct 2025 11:42:32 +0300 Subject: [PATCH 23/37] fix comment Signed-off-by: Andrei Sandu --- cumulus/client/consensus/aura/src/collator.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cumulus/client/consensus/aura/src/collator.rs b/cumulus/client/consensus/aura/src/collator.rs index 3d6795148266d..fc02c80cc954d 100644 --- a/cumulus/client/consensus/aura/src/collator.rs +++ b/cumulus/client/consensus/aura/src/collator.rs @@ -515,7 +515,7 @@ pub async fn collator_protocol_helper( our_slot = None; }, (Some(_), false) => { - // `last_slot` is `Some` means we alredy sent pre-connect message, no need to + // `our_slot` is `Some` means we alredy sent pre-connect message, no need to // proceed further. continue }, From 471b8980e0d36e81ae629ee5d25b53f66f5ce8db Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Tue, 7 Oct 2025 11:48:12 +0300 Subject: [PATCH 24/37] fix test comments Signed-off-by: Andrei Sandu --- .../collator-protocol/src/collator_side/tests/mod.rs | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs index 182709c658a05..36d1c7820eaae 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs @@ -1744,7 +1744,7 @@ fn no_connection_without_preconnect_message() { // Timeout is fine - no messages were sent }, Some(msg) => { - // If we received a message, it should NOT be ConnectToValidators + // No message expected here panic!("Unexpected message was sent by subsystem: {:?}", msg); }, } @@ -1789,14 +1789,14 @@ fn distribute_collation_forces_connect() { // Verify that no ConnectToValidators message was sent // by attempting to receive a message with a short timeout. - // We should either timeout or receive messages that are NOT ConnectToValidators. + // We expect timeout here. let timeout = Duration::from_millis(250); match overseer_recv_with_timeout(&mut virtual_overseer, timeout).await { None => { // Timeout is fine - no messages were sent }, Some(msg) => { - // If we received a message, it should NOT be ConnectToValidators + // No message expected here panic!("Unexpected message was sent by subsystem: {:?}", msg); }, } From b4e6db2681071d9e62702064225f6514c541ae65 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Thu, 9 Oct 2025 17:17:47 +0300 Subject: [PATCH 25/37] review Signed-off-by: Andrei Sandu --- cumulus/client/consensus/aura/src/collator.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/cumulus/client/consensus/aura/src/collator.rs b/cumulus/client/consensus/aura/src/collator.rs index fc02c80cc954d..9bba5b69da4b1 100644 --- a/cumulus/client/consensus/aura/src/collator.rs +++ b/cumulus/client/consensus/aura/src/collator.rs @@ -552,10 +552,8 @@ pub async fn collator_protocol_helper( "send-pre-connect-message", Some(COLLATOR_PROTOCOL_HELPER_TASK_GROUP), async move { - if pre_connect_delay > 0 { - futures_timer::Delay::new(std::time::Duration::from_millis(pre_connect_delay)) - .await; - } + futures_timer::Delay::new(std::time::Duration::from_millis(pre_connect_delay)) + .await; tracing::debug!(target: crate::LOG_TARGET, "Sending pre-connect message"); From 85c05979847f0fdf172b14002fb51b9ae2cbc648 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Thu, 9 Oct 2025 18:07:37 +0300 Subject: [PATCH 26/37] fix validator disconnect Signed-off-by: Andrei Sandu --- .../collator-protocol/src/collator_side/mod.rs | 10 ++++++++-- .../src/collator_side/tests/mod.rs | 16 +++++++++------- 2 files changed, 17 insertions(+), 9 deletions(-) diff --git a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs index 9b78ad7684952..57f3361a26d6c 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs @@ -716,7 +716,8 @@ async fn update_validator_connections( failed, } } else { - // Get all connected peer_ids on the Collation peer set. + // Get all connected peer_ids on the Collation peer set + // This is just for logging purposes. let connected_validator_peer_ids: Vec<_> = peer_ids.keys().cloned().collect(); if connected_validator_peer_ids.is_empty() { @@ -729,7 +730,12 @@ async fn update_validator_connections( "Disconnecting from validators: {:?}", connected_validator_peer_ids, ); - NetworkBridgeTxMessage::DisconnectPeers(connected_validator_peer_ids, PeerSet::Collation) + + NetworkBridgeTxMessage::ConnectToValidators { + validator_ids: vec![], + peer_set: PeerSet::Collation, + failed, + } }; ctx.send_message(msg).await; diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs index 36d1c7820eaae..754a8c960f55a 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs @@ -1942,15 +1942,17 @@ fn connect_advertise_disconnect_three_backing_groups() { ) .await; - // Expect a DisconnectPeers for all connected peers + // Expect a DisconnectPeers for all connected validators assert_matches!( overseer_recv(&mut virtual_overseer).await, - AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::DisconnectPeers( - peer_ids, - PeerSet::Collation, - )) => { - // We should disconnect from all peers we were connected to - assert_eq!(peer_ids.into_iter().sorted().collect::>(), validator_peer_ids, "Expected to disconnect from all validators"); + AllMessages::NetworkBridgeTx(NetworkBridgeTxMessage::ConnectToValidators{ + validator_ids, + peer_set, + failed: _, + }) => { + // We should disconnect from all validators we were connected to + assert_eq!(validator_ids, vec![], "Expected to disconnect from all validators"); + assert_eq!(peer_set, PeerSet::Collation); } ); From dbcd11afed2fcd49acf0e2ab696be419783b2623 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Thu, 9 Oct 2025 20:48:33 +0300 Subject: [PATCH 27/37] make Basti happy Signed-off-by: Andrei Sandu --- cumulus/client/consensus/aura/src/collator.rs | 139 +----------------- .../consensus/aura/src/collators/lookahead.rs | 71 ++++++--- .../consensus/aura/src/collators/mod.rs | 108 +++++++++++++- .../slot_based/block_builder_task.rs | 54 ++++++- .../aura/src/collators/slot_based/mod.rs | 12 +- .../polkadot-omni-node/lib/src/nodes/aura.rs | 35 +---- 6 files changed, 222 insertions(+), 197 deletions(-) diff --git a/cumulus/client/consensus/aura/src/collator.rs b/cumulus/client/consensus/aura/src/collator.rs index 9bba5b69da4b1..e372162f21332 100644 --- a/cumulus/client/consensus/aura/src/collator.rs +++ b/cumulus/client/consensus/aura/src/collator.rs @@ -38,13 +38,12 @@ use cumulus_primitives_core::{ use cumulus_relay_chain_interface::RelayChainInterface; use polkadot_node_primitives::{Collation, MaybeCompressedPoV}; -use polkadot_node_subsystem::messages::CollatorProtocolMessage; use polkadot_primitives::{Header as PHeader, Id as ParaId}; use crate::collators::RelayParentData; use futures::prelude::*; use sc_consensus::{BlockImport, BlockImportParams, ForkChoiceStrategy, StateAction}; -use sc_consensus_aura::{standalone as aura_internal, CompatibleDigestItem}; +use sc_consensus_aura::standalone as aura_internal; use sp_api::ProvideRuntimeApi; use sp_application_crypto::AppPublic; use sp_consensus::BlockOrigin; @@ -60,13 +59,6 @@ use sp_state_machine::StorageChanges; use sp_timestamp::Timestamp; use std::{error::Error, time::Duration}; -/// The slot offset to start pre-connecting to backing groups. Represented as number -/// of seconds before own slot starts. -const PRE_CONNECT_SLOT_OFFSET: Duration = Duration::from_secs(6); - -/// Task name for the collator protocol helper. -pub const COLLATOR_PROTOCOL_HELPER_TASK_GROUP: &str = "collator-protocol-helper"; - /// Parameters for instantiating a [`Collator`]. pub struct Params { /// A builder for inherent data builders. @@ -441,132 +433,3 @@ where Ok(block_import_params) } - -/// Task for triggering backing group connections early. -/// -/// This helper monitors block imports and proactively connects to backing groups -/// when the collator's slot is approaching, improving network connectivity. -pub async fn collator_protocol_helper( - client: std::sync::Arc, - keystore: sp_keystore::KeystorePtr, - mut overseer_handle: cumulus_relay_chain_interface::OverseerHandle, - spawn_handle: Spawner, -) where - Block: sp_runtime::traits::Block, - Client: sc_client_api::HeaderBackend - + Send - + Sync - + ProvideRuntimeApi - + sc_client_api::BlockchainEvents - + 'static, - Client::Api: AuraApi, - P: sp_core::Pair + Send + Sync, - P::Public: Codec, - Spawner: sp_core::traits::SpawnNamed, -{ - let mut import_notifications = client.import_notification_stream().fuse(); - - tracing::debug!(target: crate::LOG_TARGET, "Started collator protocol helper"); - - // Our own slot number, known in advance. - let mut our_slot = None; - - while let Some(notification) = import_notifications.next().await { - // Determine if this node is the next author - let digest = notification.header.digest(); - - let slot = - digest.convert_first(>::as_aura_pre_digest); - - tracing::debug!(target: crate::LOG_TARGET, "Imported block with slot: {:?}", slot); - - let Some(slot) = slot else { - continue; - }; - - // Determine current slot duration. - let Ok(slot_duration) = client.runtime_api().slot_duration(notification.header.hash()) - else { - continue; - }; - - let authorities = - client.runtime_api().authorities(notification.header.hash()).unwrap_or_default(); - - // Check if our slot has passed and we are not expected to author again in next slot. - match ( - our_slot, - aura_internal::claim_slot::

(slot + 1, &authorities, &keystore) - .await - .is_none(), - ) { - (Some(last_slot), true) if slot > last_slot => { - tracing::debug!(target: crate::LOG_TARGET, "Our slot {} has passed, current slot is {}, sending disconnect message", last_slot, slot); - - // Send a message to the collator protocol to stop pre-connecting to backing - // groups - overseer_handle - .send_msg( - CollatorProtocolMessage::DisconnectFromBackingGroups, - "CollatorProtocolHelper", - ) - .await; - - our_slot = None; - }, - (Some(_), false) => { - // `our_slot` is `Some` means we alredy sent pre-connect message, no need to - // proceed further. - continue - }, - _ => {}, - } - - // Check if our slot is coming up next. This means that there is still another slot - // before our turn. - let target_slot = slot + 2; - if aura_internal::claim_slot::

(target_slot, &authorities, &keystore) - .await - .is_none() - { - continue; - } - - tracing::debug!(target: crate::LOG_TARGET, "Our slot {} is due soon", target_slot ); - - // Determine our own slot timestamp. - let Some(own_slot_ts) = target_slot.timestamp(slot_duration) else { - tracing::warn!(target: crate::LOG_TARGET, "Failed to get own slot duration"); - - continue; - }; - - let pre_connect_delay = own_slot_ts - .saturating_sub(*Timestamp::current()) - .saturating_sub(PRE_CONNECT_SLOT_OFFSET.as_millis() as u64); - - tracing::debug!(target: crate::LOG_TARGET, "Pre-connecting to backing groups in {}ms", pre_connect_delay); - - let mut overseer_handle_clone = overseer_handle.clone(); - spawn_handle.spawn( - "send-pre-connect-message", - Some(COLLATOR_PROTOCOL_HELPER_TASK_GROUP), - async move { - futures_timer::Delay::new(std::time::Duration::from_millis(pre_connect_delay)) - .await; - - tracing::debug!(target: crate::LOG_TARGET, "Sending pre-connect message"); - - // Send a message to the collator protocol to pre-connect to backing groups - overseer_handle_clone - .send_msg( - CollatorProtocolMessage::ConnectToBackingGroups, - "CollatorProtocolHelper", - ) - .await; - } - .boxed(), - ); - our_slot = Some(target_slot); - } -} diff --git a/cumulus/client/consensus/aura/src/collators/lookahead.rs b/cumulus/client/consensus/aura/src/collators/lookahead.rs index 55835ef4dcb8b..ad68893e97550 100644 --- a/cumulus/client/consensus/aura/src/collators/lookahead.rs +++ b/cumulus/client/consensus/aura/src/collators/lookahead.rs @@ -45,7 +45,11 @@ use polkadot_node_subsystem::messages::CollationGenerationMessage; use polkadot_overseer::Handle as OverseerHandle; use polkadot_primitives::{CollatorPair, Id as ParaId, OccupiedCoreAssumption}; -use crate::{collator as collator_util, collators::claim_queue_at, export_pov_to_path}; +use crate::{ + collator as collator_util, + collators::{claim_queue_at, collator_protocol_helper}, + export_pov_to_path, +}; use futures::prelude::*; use sc_client_api::{backend::AuxStore, BlockBackend, BlockOf}; use sc_consensus::BlockImport; @@ -53,14 +57,14 @@ use sp_api::ProvideRuntimeApi; use sp_application_crypto::AppPublic; use sp_blockchain::HeaderBackend; use sp_consensus_aura::{AuraApi, Slot}; -use sp_core::crypto::Pair; +use sp_core::{crypto::Pair, traits::SpawnNamed}; use sp_inherents::CreateInherentDataProviders; use sp_keystore::KeystorePtr; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member}; use std::{path::PathBuf, sync::Arc, time::Duration}; /// Parameters for [`run`]. -pub struct Params { +pub struct Params { /// Inherent data providers. Only non-consensus inherent data should be provided, i.e. /// the timestamp, slot, and paras inherents should be omitted, as they are set by this /// collator. @@ -96,11 +100,13 @@ pub struct Params { /// The maximum percentage of the maximum PoV size that the collator can use. /// It will be removed once is fixed. pub max_pov_percentage: Option, + /// Spawner for spawning tasks. + pub spawner: Spawner, } /// Run async-backing-friendly Aura. -pub fn run( - params: Params, +pub fn run( + params: Params, ) -> impl Future + Send + 'static where Block: BlockT, @@ -122,17 +128,21 @@ where Proposer: ProposerInterface + Send + Sync + 'static, CS: CollatorServiceInterface + Send + Sync + 'static, CHP: consensus_common::ValidationCodeHashProvider + Send + 'static, - P: Pair, + Spawner: SpawnNamed + Clone + 'static, + P: Pair + Send + Sync + 'static, P::Public: AppPublic + Member + Codec, P::Signature: TryFrom> + Member + Codec, { - run_with_export::<_, P, _, _, _, _, _, _, _, _>(ParamsWithExport { params, export_pov: None }) + run_with_export::<_, P, _, _, _, _, _, _, _, _, _>(ParamsWithExport { + params, + export_pov: None, + }) } /// Parameters for [`run_with_export`]. -pub struct ParamsWithExport { +pub struct ParamsWithExport { /// The parameters. - pub params: Params, + pub params: Params, /// When set, the collator will export every produced `POV` to this folder. pub export_pov: Option, @@ -142,7 +152,7 @@ pub struct ParamsWithExport( +pub fn run_with_export( ParamsWithExport { mut params, export_pov }: ParamsWithExport< BI, CIDP, @@ -152,6 +162,7 @@ pub fn run_with_export, ) -> impl Future + Send + 'static where @@ -174,7 +185,8 @@ where Proposer: ProposerInterface + Send + Sync + 'static, CS: CollatorServiceInterface + Send + Sync + 'static, CHP: consensus_common::ValidationCodeHashProvider + Send + 'static, - P: Pair, + Spawner: SpawnNamed + Clone + 'static, + P: Pair + Send + Sync + 'static, P::Public: AppPublic + Member + Codec, P::Signature: TryFrom> + Member + Codec, { @@ -215,6 +227,8 @@ where collator_util::Collator::::new(params) }; + let mut our_slot = None; + while let Some(relay_parent_header) = import_notifications.next().await { let relay_parent = relay_parent_header.hash(); @@ -290,14 +304,18 @@ where relay_chain_slot_duration = ?params.relay_chain_slot_duration, "Adjusted relay-chain slot to parachain slot" ); - Some(super::can_build_upon::<_, _, P>( + Some(( slot_now, - relay_slot, - timestamp, - block_hash, - included_block.hash(), - para_client, - &keystore, + slot_duration, + super::can_build_upon::<_, _, P>( + slot_now, + relay_slot, + timestamp, + block_hash, + included_block.hash(), + para_client, + &keystore, + ), )) }; @@ -316,8 +334,21 @@ where // scheduled chains this ensures that the backlog will grow steadily. for n_built in 0..2 { let slot_claim = match can_build_upon(parent_hash) { - Some(fut) => match fut.await { - None => break, + Some((current_slot, slot_duration, fut)) => match fut.await { + None => { + our_slot = collator_protocol_helper::<_, _, P, _>( + params.para_client.clone(), + params.keystore.clone(), + params.overseer_handle.clone(), + params.spawner.clone(), + parent_hash, + slot_duration, + current_slot, + our_slot, + ) + .await; + break + }, Some(c) => c, }, None => break, diff --git a/cumulus/client/consensus/aura/src/collators/mod.rs b/cumulus/client/consensus/aura/src/collators/mod.rs index 2ce38d0c07b68..4e1c14a42ad6d 100644 --- a/cumulus/client/consensus/aura/src/collators/mod.rs +++ b/cumulus/client/consensus/aura/src/collators/mod.rs @@ -27,7 +27,8 @@ use cumulus_client_consensus_common::{self as consensus_common, ParentSearchPara use cumulus_primitives_aura::{AuraUnincludedSegmentApi, Slot}; use cumulus_primitives_core::{relay_chain::Header as RelayHeader, BlockT}; use cumulus_relay_chain_interface::RelayChainInterface; -use polkadot_node_subsystem::messages::RuntimeApiRequest; +use futures::prelude::*; +use polkadot_node_subsystem::messages::{CollatorProtocolMessage, RuntimeApiRequest}; use polkadot_node_subsystem_util::runtime::ClaimQueueSnapshot; use polkadot_primitives::{ Hash as RelayHash, Id as ParaId, OccupiedCoreAssumption, ValidationCodeHash, @@ -35,9 +36,11 @@ use polkadot_primitives::{ }; use sc_consensus_aura::{standalone as aura_internal, AuraApi}; use sp_api::{ApiExt, ProvideRuntimeApi, RuntimeApiInfo}; +use sp_consensus_aura::SlotDuration; use sp_core::Pair; use sp_keystore::KeystorePtr; use sp_timestamp::Timestamp; +use std::time::Duration; pub mod basic; pub mod lookahead; @@ -52,6 +55,109 @@ pub mod slot_based; // sanity check. const PARENT_SEARCH_DEPTH: usize = 30; +/// The slot offset to start pre-connecting to backing groups. Represented as number +/// of seconds before own slot starts. +const PRE_CONNECT_SLOT_OFFSET: Duration = Duration::from_secs(6); + +/// Task name for the collator protocol helper. +pub const COLLATOR_PROTOCOL_HELPER_TASK_GROUP: &str = "collator-protocol-helper"; + +/// Helper for triggering backing group connections early. +/// +/// Returns the updated `our_slot` value. +pub async fn collator_protocol_helper( + client: std::sync::Arc, + keystore: sp_keystore::KeystorePtr, + mut overseer_handle: cumulus_relay_chain_interface::OverseerHandle, + spawn_handle: Spawner, + best_block: Block::Hash, + slot_duration: SlotDuration, + current_slot: Slot, + our_slot: Option, +) -> Option +where + Block: sp_runtime::traits::Block, + Client: sc_client_api::HeaderBackend + Send + Sync + ProvideRuntimeApi + 'static, + Client::Api: AuraApi, + P: sp_core::Pair + Send + Sync, + P::Public: Codec, + Spawner: sp_core::traits::SpawnNamed, +{ + let authorities = client.runtime_api().authorities(best_block).unwrap_or_default(); + + // Check if our slot has passed and we are not expected to author again in next slot. + match ( + our_slot, + aura_internal::claim_slot::

(current_slot + 1, &authorities, &keystore) + .await + .is_none(), + ) { + (Some(last_slot), true) if current_slot > last_slot => { + tracing::debug!(target: crate::LOG_TARGET, "Our slot {} has passed, current slot is {}, sending disconnect message", last_slot, current_slot); + + // Send a message to the collator protocol to stop pre-connecting to backing + // groups + overseer_handle + .send_msg( + CollatorProtocolMessage::DisconnectFromBackingGroups, + "CollatorProtocolHelper", + ) + .await; + + return None; + }, + (Some(_), false) => { + // `our_slot` is `Some` means we alredy sent pre-connect message, no need to + // proceed further. + return our_slot + }, + _ => {}, + } + + // Check if our slot is coming up next. This means that there is still another slot + // before our turn. + let target_slot = current_slot + 2; + if aura_internal::claim_slot::

(target_slot, &authorities, &keystore) + .await + .is_none() + { + return our_slot + } + + tracing::debug!(target: crate::LOG_TARGET, "Our slot {} is due soon", target_slot ); + + // Determine our own slot timestamp. + let Some(own_slot_ts) = target_slot.timestamp(slot_duration) else { + tracing::warn!(target: crate::LOG_TARGET, "Failed to get own slot timestamp"); + + return our_slot; + }; + + let pre_connect_delay = own_slot_ts + .saturating_sub(*Timestamp::current()) + .saturating_sub(PRE_CONNECT_SLOT_OFFSET.as_millis() as u64); + + tracing::debug!(target: crate::LOG_TARGET, "Pre-connecting to backing groups in {}ms", pre_connect_delay); + + let mut overseer_handle_clone = overseer_handle.clone(); + spawn_handle.spawn( + "send-pre-connect-message", + Some(COLLATOR_PROTOCOL_HELPER_TASK_GROUP), + async move { + futures_timer::Delay::new(std::time::Duration::from_millis(pre_connect_delay)).await; + + tracing::debug!(target: crate::LOG_TARGET, "Sending pre-connect message"); + + // Send a message to the collator protocol to pre-connect to backing groups + overseer_handle_clone + .send_msg(CollatorProtocolMessage::ConnectToBackingGroups, "CollatorProtocolHelper") + .await; + } + .boxed(), + ); + Some(target_slot) +} + /// Check the `local_validation_code_hash` against the validation code hash in the relay chain /// state. /// diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs b/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs index e6360a3c8408e..bb2260e94fc57 100644 --- a/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs +++ b/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs @@ -21,7 +21,7 @@ use super::CollatorMessage; use crate::{ collator as collator_util, collators::{ - check_validation_code_or_log, + check_validation_code_or_log, collator_protocol_helper, slot_based::{ relay_chain_data_cache::{RelayChainData, RelayChainDataCache}, slot_timer::{SlotInfo, SlotTimer}, @@ -50,7 +50,7 @@ use sp_api::ProvideRuntimeApi; use sp_application_crypto::AppPublic; use sp_blockchain::HeaderBackend; use sp_consensus_aura::AuraApi; -use sp_core::crypto::Pair; +use sp_core::{crypto::Pair, traits::SpawnNamed}; use sp_inherents::CreateInherentDataProviders; use sp_keystore::KeystorePtr; use sp_runtime::traits::{Block as BlockT, Header as HeaderT, Member, Zero}; @@ -67,6 +67,7 @@ pub struct BuilderTaskParams< CHP, Proposer, CS, + Spawner, > { /// Inherent data providers. Only non-consensus inherent data should be provided, i.e. /// the timestamp, slot, and paras inherents should be omitted, as they are set by this @@ -107,11 +108,38 @@ pub struct BuilderTaskParams< /// The maximum percentage of the maximum PoV size that the collator can use. /// It will be removed once https://github.com/paritytech/polkadot-sdk/issues/6020 is fixed. pub max_pov_percentage: Option, + /// Spawner for spawning tasks. + pub spawn_handle: Spawner, + /// Handle to the overseer for sending messages. + pub overseer_handle: cumulus_relay_chain_interface::OverseerHandle, } /// Run block-builder. -pub fn run_block_builder( - params: BuilderTaskParams, +pub fn run_block_builder< + Block, + P, + BI, + CIDP, + Client, + Backend, + RelayClient, + CHP, + Proposer, + CS, + Spawner, +>( + params: BuilderTaskParams< + Block, + BI, + CIDP, + Client, + Backend, + RelayClient, + CHP, + Proposer, + CS, + Spawner, + >, ) -> impl Future + Send + 'static where Block: BlockT, @@ -134,9 +162,10 @@ where Proposer: ProposerInterface + Send + Sync + 'static, CS: CollatorServiceInterface + Send + Sync + 'static, CHP: consensus_common::ValidationCodeHashProvider + Send + 'static, - P: Pair, + P: Pair + Send + Sync + 'static, P::Public: AppPublic + Member + Codec, P::Signature: TryFrom> + Member + Codec, + Spawner: SpawnNamed + Clone + 'static, { async move { tracing::info!(target: LOG_TARGET, "Starting slot-based block-builder task."); @@ -156,6 +185,8 @@ where para_backend, slot_offset, max_pov_percentage, + spawn_handle, + overseer_handle, } = params; let mut slot_timer = SlotTimer::<_, _, P>::new_with_offset( @@ -179,7 +210,7 @@ where }; let mut relay_chain_data_cache = RelayChainDataCache::new(relay_client.clone(), para_id); - + let mut our_slot = None; loop { // We wait here until the next slot arrives. if slot_timer.wait_until_next_slot().await.is_err() { @@ -319,6 +350,17 @@ where slot = ?para_slot.slot, "Not building block." ); + our_slot = collator_protocol_helper::<_, _, P, _>( + para_client.clone(), + keystore.clone(), + overseer_handle.clone(), + spawn_handle.clone(), + best_hash, + para_slot_duration, + para_slot.slot, + our_slot, + ) + .await; continue }, }; diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/mod.rs b/cumulus/client/consensus/aura/src/collators/slot_based/mod.rs index c939fb8d1275a..fb09079d024d5 100644 --- a/cumulus/client/consensus/aura/src/collators/slot_based/mod.rs +++ b/cumulus/client/consensus/aura/src/collators/slot_based/mod.rs @@ -172,10 +172,10 @@ pub fn run + Send + Sync + 'static, CS: CollatorServiceInterface + Send + Sync + Clone + 'static, CHP: consensus_common::ValidationCodeHashProvider + Send + 'static, - P: Pair + 'static, + P: Pair + Send + Sync + 'static, P::Public: AppPublic + Member + Codec, P::Signature: TryFrom> + Member + Codec, - Spawner: SpawnNamed, + Spawner: SpawnNamed + Clone + 'static, { let Params { create_inherent_data_providers, @@ -213,6 +213,10 @@ pub fn run(collator_task_params); + let overseer_handle = relay_client + .overseer_handle() + .expect("Relay chain interface should provide overseer handle"); + let block_builder_params = block_builder_task::BuilderTaskParams { create_inherent_data_providers, block_import, @@ -229,10 +233,12 @@ pub fn run(block_builder_params); + run_block_builder::(block_builder_params); spawner.spawn_blocking( "slot-based-block-builder", diff --git a/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs b/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs index 6ec7267eb3e06..70d4084a28992 100644 --- a/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs +++ b/cumulus/polkadot-omni-node/lib/src/nodes/aura.rs @@ -39,7 +39,6 @@ use cumulus_client_consensus_aura::collators::slot_based::{ self as slot_based, Params as SlotBasedParams, }; use cumulus_client_consensus_aura::{ - collator::{collator_protocol_helper, COLLATOR_PROTOCOL_HELPER_TASK_GROUP}, collators::{ lookahead::{self as aura, Params as AuraParams}, slot_based::{SlotBasedBlockImport, SlotBasedBlockImportHandle}, @@ -250,7 +249,8 @@ impl, RuntimeApi, AuraId> where RuntimeApi: ConstructNodeRuntimeApi>, RuntimeApi::RuntimeApi: AuraRuntimeApi, - AuraId: AuraIdT + Sync, + AuraId: AuraIdT + Sync + Send, + ::Pair: Send + Sync, { #[docify::export_content] fn launch_slot_based_collator( @@ -279,7 +279,7 @@ where CHP: cumulus_client_consensus_common::ValidationCodeHashProvider + Send + 'static, Proposer: ProposerInterface + Send + Sync + 'static, CS: CollatorServiceInterface + Send + Sync + Clone + 'static, - Spawner: SpawnNamed, + Spawner: SpawnNamed + Clone + 'static, { slot_based::run::::Pair, _, _, _, _, _, _, _, _, _>( params_with_export, @@ -323,7 +323,7 @@ where relay_chain_slot_duration: Duration, para_id: ParaId, collator_key: CollatorPair, - overseer_handle: OverseerHandle, + _overseer_handle: OverseerHandle, announce_block: Arc>) + Send + Sync>, backend: Arc>, node_extra_args: NodeExtraArgs, @@ -344,18 +344,6 @@ where client.clone(), ); - // Spawn the collator protocol helper task - task_manager.spawn_essential_handle().spawn( - COLLATOR_PROTOCOL_HELPER_TASK_GROUP, - Some(COLLATOR_PROTOCOL_HELPER_TASK_GROUP), - collator_protocol_helper::<_, _, ::Pair, _>( - client.clone(), - keystore.clone(), - overseer_handle, - task_manager.spawn_handle(), - ), - ); - let client_for_aura = client.clone(); let params = SlotBasedParams { create_inherent_data_providers: move |_, ()| async move { Ok(()) }, @@ -481,18 +469,6 @@ where client.clone(), ); - // Spawn the collator protocol helper task - task_manager.spawn_essential_handle().spawn( - COLLATOR_PROTOCOL_HELPER_TASK_GROUP, - Some(COLLATOR_PROTOCOL_HELPER_TASK_GROUP), - collator_protocol_helper::<_, _, ::Pair, _>( - client.clone(), - keystore.clone(), - overseer_handle.clone(), - task_manager.spawn_handle(), - ), - ); - let params = aura::ParamsWithExport { export_pov: node_extra_args.export_pov, params: AuraParams { @@ -517,12 +493,13 @@ where authoring_duration: Duration::from_millis(2000), reinitialize: false, max_pov_percentage: node_extra_args.max_pov_percentage, + spawner: task_manager.spawn_handle(), }, }; let fut = async move { wait_for_aura(client).await; - aura::run_with_export::::Pair, _, _, _, _, _, _, _, _>( + aura::run_with_export::::Pair, _, _, _, _, _, _, _, _, _>( params, ) .await; From d3afbebd6868150d397968194fc658f32606c6c0 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Thu, 9 Oct 2025 20:51:22 +0300 Subject: [PATCH 28/37] fix comments Signed-off-by: Andrei Sandu --- .../node/network/collator-protocol/src/collator_side/mod.rs | 4 ---- 1 file changed, 4 deletions(-) diff --git a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs index 57f3361a26d6c..1ade4528c75da 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs @@ -834,8 +834,6 @@ async fn process_msg( match msg { ConnectToBackingGroups => { - // This message is used to instruct the collator to pre-connect to backing groups. - // For now, we do not need to take any action here. gum::debug!( target: LOG_TARGET, "Received PreConnectToBackingGroups message." @@ -855,8 +853,6 @@ async fn process_msg( } }, DisconnectFromBackingGroups => { - // This message is used to instruct the collator to disconnect from backing groups. - // For now, we do not need to take any action here. gum::debug!( target: LOG_TARGET, "Received DisconnectFromBackingGroups message." From d4b17d9414406c2eebe50aefbd06c2477be8229e Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Fri, 10 Oct 2025 11:19:16 +0300 Subject: [PATCH 29/37] fix cumulus-test-service Signed-off-by: Andrei Sandu --- cumulus/test/service/src/lib.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cumulus/test/service/src/lib.rs b/cumulus/test/service/src/lib.rs index d0d2325c21b6c..cd57f9b1bb188 100644 --- a/cumulus/test/service/src/lib.rs +++ b/cumulus/test/service/src/lib.rs @@ -495,9 +495,10 @@ where authoring_duration: Duration::from_millis(2000), reinitialize: false, max_pov_percentage: None, + spawner: task_manager.spawn_handle(), }; - let fut = aura::run::(params); + let fut = aura::run::(params); task_manager.spawn_essential_handle().spawn("aura", None, fut); } } From 920a015072050773692c8ba31523f349f102b82e Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Fri, 10 Oct 2025 11:48:05 +0300 Subject: [PATCH 30/37] fix node tenplate Signed-off-by: Andrei Sandu --- templates/parachain/node/src/service.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/templates/parachain/node/src/service.rs b/templates/parachain/node/src/service.rs index 4ce1bba376e8d..ad67f97d6bc57 100644 --- a/templates/parachain/node/src/service.rs +++ b/templates/parachain/node/src/service.rs @@ -219,10 +219,12 @@ fn start_consensus( authoring_duration: Duration::from_millis(2000), reinitialize: false, max_pov_percentage: None, + spawner: task_manager.spawn_handle(), }; - let fut = aura::run::( - params, - ); + let fut = + aura::run::( + params, + ); task_manager.spawn_essential_handle().spawn("aura", None, fut); Ok(()) From ff8f2c79a87160fedabe75e5ac8b2e059f645844 Mon Sep 17 00:00:00 2001 From: Andrei Sandu <54316454+sandreim@users.noreply.github.com> Date: Fri, 10 Oct 2025 11:49:43 +0300 Subject: [PATCH 31/37] Update polkadot/node/network/collator-protocol/Cargo.toml Co-authored-by: Alexandru Gheorghe <49718502+alexggh@users.noreply.github.com> --- polkadot/node/network/collator-protocol/Cargo.toml | 1 - 1 file changed, 1 deletion(-) diff --git a/polkadot/node/network/collator-protocol/Cargo.toml b/polkadot/node/network/collator-protocol/Cargo.toml index daa76f3c01111..92592045c0e44 100644 --- a/polkadot/node/network/collator-protocol/Cargo.toml +++ b/polkadot/node/network/collator-protocol/Cargo.toml @@ -51,7 +51,6 @@ itertools = { workspace = true } polkadot-node-subsystem-test-helpers = { workspace = true } polkadot-primitives-test-helpers = { workspace = true } - [features] default = [] experimental-collator-protocol = ["async-trait", "tokio"] From 339e80ebcac0dc66ce6207a03cb81cf77ec72c5f Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Fri, 10 Oct 2025 11:53:03 +0300 Subject: [PATCH 32/37] to much spawning Signed-off-by: Andrei Sandu --- cumulus/client/consensus/aura/src/collators/lookahead.rs | 2 +- .../aura/src/collators/slot_based/block_builder_task.rs | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/cumulus/client/consensus/aura/src/collators/lookahead.rs b/cumulus/client/consensus/aura/src/collators/lookahead.rs index ad68893e97550..a99c3fb9801f1 100644 --- a/cumulus/client/consensus/aura/src/collators/lookahead.rs +++ b/cumulus/client/consensus/aura/src/collators/lookahead.rs @@ -100,7 +100,7 @@ pub struct Params is fixed. pub max_pov_percentage: Option, - /// Spawner for spawning tasks. + /// Required for pre-connecting to backing groups task. pub spawner: Spawner, } diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs b/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs index bb2260e94fc57..192cb4740d77b 100644 --- a/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs +++ b/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs @@ -108,7 +108,7 @@ pub struct BuilderTaskParams< /// The maximum percentage of the maximum PoV size that the collator can use. /// It will be removed once https://github.com/paritytech/polkadot-sdk/issues/6020 is fixed. pub max_pov_percentage: Option, - /// Spawner for spawning tasks. + /// Required for pre-connecting to backing groups task. pub spawn_handle: Spawner, /// Handle to the overseer for sending messages. pub overseer_handle: cumulus_relay_chain_interface::OverseerHandle, From fb0e4635f6ce852ceffca035f222edecb8a5d67e Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Fri, 10 Oct 2025 12:19:03 +0300 Subject: [PATCH 33/37] review Signed-off-by: Andrei Sandu --- .../consensus/aura/src/collators/lookahead.rs | 12 ++++---- .../consensus/aura/src/collators/mod.rs | 12 ++++---- .../slot_based/block_builder_task.rs | 30 +++++++++---------- 3 files changed, 27 insertions(+), 27 deletions(-) diff --git a/cumulus/client/consensus/aura/src/collators/lookahead.rs b/cumulus/client/consensus/aura/src/collators/lookahead.rs index a99c3fb9801f1..bcc50e8078c41 100644 --- a/cumulus/client/consensus/aura/src/collators/lookahead.rs +++ b/cumulus/client/consensus/aura/src/collators/lookahead.rs @@ -47,7 +47,7 @@ use polkadot_primitives::{CollatorPair, Id as ParaId, OccupiedCoreAssumption}; use crate::{ collator as collator_util, - collators::{claim_queue_at, collator_protocol_helper}, + collators::{claim_queue_at, update_backing_group_connections}, export_pov_to_path, }; use futures::prelude::*; @@ -336,11 +336,11 @@ where let slot_claim = match can_build_upon(parent_hash) { Some((current_slot, slot_duration, fut)) => match fut.await { None => { - our_slot = collator_protocol_helper::<_, _, P, _>( - params.para_client.clone(), - params.keystore.clone(), - params.overseer_handle.clone(), - params.spawner.clone(), + our_slot = update_backing_group_connections::<_, _, P, _>( + ¶ms.para_client, + ¶ms.keystore, + &mut params.overseer_handle.clone(), + ¶ms.spawner, parent_hash, slot_duration, current_slot, diff --git a/cumulus/client/consensus/aura/src/collators/mod.rs b/cumulus/client/consensus/aura/src/collators/mod.rs index 4e1c14a42ad6d..292f0dcf5bdc0 100644 --- a/cumulus/client/consensus/aura/src/collators/mod.rs +++ b/cumulus/client/consensus/aura/src/collators/mod.rs @@ -65,11 +65,11 @@ pub const COLLATOR_PROTOCOL_HELPER_TASK_GROUP: &str = "collator-protocol-helper" /// Helper for triggering backing group connections early. /// /// Returns the updated `our_slot` value. -pub async fn collator_protocol_helper( - client: std::sync::Arc, - keystore: sp_keystore::KeystorePtr, - mut overseer_handle: cumulus_relay_chain_interface::OverseerHandle, - spawn_handle: Spawner, +pub async fn update_backing_group_connections( + client: &std::sync::Arc, + keystore: &sp_keystore::KeystorePtr, + overseer_handle: &mut cumulus_relay_chain_interface::OverseerHandle, + spawn_handle: &Spawner, best_block: Block::Hash, slot_duration: SlotDuration, current_slot: Slot, @@ -81,7 +81,7 @@ where Client::Api: AuraApi, P: sp_core::Pair + Send + Sync, P::Public: Codec, - Spawner: sp_core::traits::SpawnNamed, + Spawner: sp_core::traits::SpawnNamed + Clone, { let authorities = client.runtime_api().authorities(best_block).unwrap_or_default(); diff --git a/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs b/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs index 192cb4740d77b..ec992bc956d6b 100644 --- a/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs +++ b/cumulus/client/consensus/aura/src/collators/slot_based/block_builder_task.rs @@ -21,12 +21,12 @@ use super::CollatorMessage; use crate::{ collator as collator_util, collators::{ - check_validation_code_or_log, collator_protocol_helper, + check_validation_code_or_log, slot_based::{ relay_chain_data_cache::{RelayChainData, RelayChainDataCache}, slot_timer::{SlotInfo, SlotTimer}, }, - RelayParentData, + update_backing_group_connections, RelayParentData, }, LOG_TARGET, }; @@ -340,21 +340,21 @@ where Some(slot) => slot, None => { tracing::debug!( - target: crate::LOG_TARGET, - unincluded_segment_len = parent.depth, - relay_parent = ?relay_parent, - relay_parent_num = %relay_parent_header.number(), - included_hash = ?included_header_hash, - included_num = %included_header.number(), - parent = ?parent_hash, - slot = ?para_slot.slot, + target: crate::LOG_TARGET, + unincluded_segment_len = parent.depth, + relay_parent = ?relay_parent, + relay_parent_num = %relay_parent_header.number(), + included_hash = ?included_header_hash, + included_num = %included_header.number(), + parent = ?parent_hash, + slot = ?para_slot.slot, "Not building block." ); - our_slot = collator_protocol_helper::<_, _, P, _>( - para_client.clone(), - keystore.clone(), - overseer_handle.clone(), - spawn_handle.clone(), + our_slot = update_backing_group_connections::<_, _, P, _>( + ¶_client, + &keystore, + &mut overseer_handle.clone(), + &spawn_handle, best_hash, para_slot_duration, para_slot.slot, From 5261e54f6f3d8cf71aee7065f2bc35abd34d33f4 Mon Sep 17 00:00:00 2001 From: Andrei Sandu Date: Fri, 10 Oct 2025 17:50:23 +0300 Subject: [PATCH 34/37] test comment Signed-off-by: Andrei Sandu --- .../network/collator-protocol/src/collator_side/tests/mod.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs index 754a8c960f55a..38e089dafc5c2 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/tests/mod.rs @@ -1737,7 +1737,6 @@ fn no_connection_without_preconnect_message() { // Verify that no ConnectToValidators message was sent // by attempting to receive a message with a short timeout. - // We should either timeout or receive messages that are NOT ConnectToValidators. let timeout = Duration::from_millis(250); match overseer_recv_with_timeout(&mut virtual_overseer, timeout).await { None => { From 05a498914aed758d6c94829e68e457966214db38 Mon Sep 17 00:00:00 2001 From: Andrei Sandu <54316454+sandreim@users.noreply.github.com> Date: Fri, 10 Oct 2025 19:00:56 +0300 Subject: [PATCH 35/37] Update cumulus/client/consensus/aura/src/collators/mod.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- cumulus/client/consensus/aura/src/collators/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cumulus/client/consensus/aura/src/collators/mod.rs b/cumulus/client/consensus/aura/src/collators/mod.rs index a5ec43bcac242..f08a0d1278150 100644 --- a/cumulus/client/consensus/aura/src/collators/mod.rs +++ b/cumulus/client/consensus/aura/src/collators/mod.rs @@ -69,7 +69,7 @@ pub const COLLATOR_PROTOCOL_HELPER_TASK_GROUP: &str = "collator-protocol-helper" /// Helper for triggering backing group connections early. /// /// Returns the updated `our_slot` value. -pub async fn update_backing_group_connections( +pub(crate) async fn update_backing_group_connections( client: &std::sync::Arc, keystore: &sp_keystore::KeystorePtr, overseer_handle: &mut cumulus_relay_chain_interface::OverseerHandle, From 762e1d5940936497de77dbfcd27e0542bd521ad5 Mon Sep 17 00:00:00 2001 From: Andrei Sandu <54316454+sandreim@users.noreply.github.com> Date: Fri, 10 Oct 2025 19:01:09 +0300 Subject: [PATCH 36/37] Update polkadot/node/network/collator-protocol/src/collator_side/mod.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- .../network/collator-protocol/src/collator_side/mod.rs | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs index ad08bd741e514..75fd28fcd7f01 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs @@ -774,7 +774,12 @@ async fn update_validator_connections( "Disconnecting from validators: {:?}", connected_validator_peer_ids, ); - + // Disconnect from all connected validators on the `Collation` protocol. + NetworkBridgeTxMessage::ConnectToValidators { + validator_ids: vec![], + peer_set: PeerSet::Collation, + failed, + } NetworkBridgeTxMessage::ConnectToValidators { validator_ids: vec![], peer_set: PeerSet::Collation, From c07e11738dd948159c7af8cefa70a112c87164af Mon Sep 17 00:00:00 2001 From: Andrei Sandu <54316454+sandreim@users.noreply.github.com> Date: Fri, 10 Oct 2025 19:24:34 +0300 Subject: [PATCH 37/37] Update cumulus/client/consensus/aura/src/collators/mod.rs MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Co-authored-by: Bastian Köcher --- cumulus/client/consensus/aura/src/collators/mod.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cumulus/client/consensus/aura/src/collators/mod.rs b/cumulus/client/consensus/aura/src/collators/mod.rs index f08a0d1278150..f854068c0c824 100644 --- a/cumulus/client/consensus/aura/src/collators/mod.rs +++ b/cumulus/client/consensus/aura/src/collators/mod.rs @@ -87,7 +87,7 @@ where P::Public: Codec, Spawner: sp_core::traits::SpawnNamed + Clone, { - let authorities = client.runtime_api().authorities(best_block).unwrap_or_default(); + let authorities = client.runtime_api().authorities(best_block).ok()?; // Check if our slot has passed and we are not expected to author again in next slot. match (