diff --git a/polkadot/node/network/collator-protocol/src/collator_side/metrics.rs b/polkadot/node/network/collator-protocol/src/collator_side/metrics.rs index 1781284fbeb71..eae6971edac88 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/metrics.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/metrics.rs @@ -15,7 +15,7 @@ // along with Polkadot. If not, see . use std::{ - collections::{HashMap, HashSet}, + collections::HashMap, time::{Duration, Instant}, }; @@ -231,179 +231,202 @@ pub(crate) const MAX_BACKING_DELAY: BlockNumber = 3; // Paras availability period. In practice, candidates time out in exceptional situations. pub(crate) const MAX_AVAILABILITY_DELAY: BlockNumber = 10; -// Collations are kept in the tracker, until they are included or expired +/// Collations are kept in the tracker until they are (possibly) finalized or expired. #[derive(Default)] pub(crate) struct CollationTracker { - // Keep track of collation expiration block number. - expire: HashMap>, - // All un-expired collation entries + /// All un-expired collation entries entries: HashMap, } impl CollationTracker { - // Mark a tracked collation as backed and return the stats. - // After this call, the collation is no longer tracked. To measure - // inclusion time call `track` again with the returned stats. - // - // Block built on top of N is earliest backed at N + 1. - // Returns `None` if the collation is not tracked. + /// Mark a tracked collation as backed. + /// + /// Block built on top of N is earliest backed at N + 1.
pub fn collation_backed( &mut self, block_number: BlockNumber, leaf: H256, receipt: CandidateReceipt, - metrics: &Metrics, - ) -> Option { + ) { let head = receipt.descriptor.para_head(); + let Some(entry) = self.entries.get_mut(&head) else { + gum::debug!( + target: crate::LOG_TARGET_STATS, + ?head, + "Backed collation not found in tracker", + ); + return; + }; - self.entries.remove(&head).map(|mut entry| { - let para_id = receipt.descriptor.para_id(); - let relay_parent = receipt.descriptor.relay_parent(); - - entry.backed_at = Some(block_number); + if entry.backed().is_some() { + gum::debug!( + target: crate::LOG_TARGET_STATS, + ?head, + "Collation already backed in a fork, skipping", + ); + return + } + entry.set_backed_at(block_number); + if let Some(latency) = entry.backed() { // Observe the backing latency since the collation was fetched. let maybe_latency = entry.backed_latency_metric.take().map(|metric| metric.stop_and_record()); - gum::debug!( target: crate::LOG_TARGET_STATS, - latency_blocks = ?entry.backed(), + latency_blocks = ?latency, latency_time = ?maybe_latency, relay_block = ?leaf, - ?relay_parent, - ?para_id, + relay_parent = ?entry.relay_parent, + para_id = ?receipt.descriptor.para_id(), ?head, "A fetched collation was backed on relay chain", ); - - metrics.on_collation_backed( - (block_number.saturating_sub(entry.relay_parent_number)) as f64, - ); - - entry - }) + } } - // Mark a previously backed collation as included and return the stats. - // After this call, the collation is no longer trackable. - // - // Block built on top of N is earliest backed at N + 1. - // Returns `None` if there collation is not in tracker. + /// Mark a previously backed collation as included. + /// + /// Block built on top of N is earliest included at N + 2. 
pub fn collation_included( &mut self, block_number: BlockNumber, leaf: H256, receipt: CandidateReceipt, - metrics: &Metrics, - ) -> Option { + ) { let head = receipt.descriptor.para_head(); + let Some(entry) = self.entries.get_mut(&head) else { + gum::debug!( + target: crate::LOG_TARGET_STATS, + ?head, + "Included collation not found in tracker", + ); + return; + }; - self.entries.remove(&head).map(|mut entry| { - entry.included_at = Some(block_number); - - if let Some(latency) = entry.included() { - metrics.on_collation_included(latency as f64); - - let para_id = receipt.descriptor.para_id(); - let relay_parent = receipt.descriptor.relay_parent(); - - gum::debug!( - target: crate::LOG_TARGET_STATS, - ?latency, - relay_block = ?leaf, - ?relay_parent, - ?para_id, - head = ?receipt.descriptor.para_head(), - "Collation included on relay chain", - ); - } + if entry.included().is_some() { + gum::debug!( + target: crate::LOG_TARGET_STATS, + ?head, + "Collation already included in a fork, skipping", + ); + return + } - entry - }) + entry.set_included_at(block_number); + if let Some(latency) = entry.included() { + gum::debug!( + target: crate::LOG_TARGET_STATS, + ?latency, + relay_block = ?leaf, + relay_parent = ?entry.relay_parent, + para_id = ?receipt.descriptor.para_id(), + head = ?receipt.descriptor.para_head(), + "Collation included on relay chain", + ); + } } - // Returns all the collations that have expired at `block_number`. + /// Returns all the collations that have expired at `block_number`. 
pub fn drain_expired(&mut self, block_number: BlockNumber) -> Vec { - let Some(expired) = self.expire.remove(&block_number) else { - // No collations built on all seen relay parents at height `block_number` - return Vec::new() - }; - + let expired = self + .entries + .iter() + .filter_map(|(head, entry)| entry.is_tracking_expired(block_number).then_some(*head)) + .collect::>(); expired .iter() .filter_map(|head| self.entries.remove(head)) .map(|mut entry| { - entry.expired_at = Some(block_number); + entry.set_expired_at(block_number); entry }) .collect::>() } - // Track a collation for a given period of time (TTL). TTL depends - // on the collation state. - // Collation is evicted after it expires. + /// Drain and return all collations that are possibly finalized at `block_number`. + /// + /// We only track the inclusion block number, not the inclusion block hash. + /// There is a small chance that a collation was included in a fork that is not finalized. + pub fn drain_finalized(&mut self, block_number: BlockNumber) -> Vec { + let finalized = self + .entries + .iter() + .filter_map(|(head, entry)| entry.is_possibly_finalized(block_number).then_some(*head)) + .collect::>(); + finalized + .iter() + .filter_map(|head| self.entries.remove(head)) + .collect::>() + } + + /// Track a collation for a given period of time (TTL). TTL depends + /// on the collation state. + /// Collation is evicted after it expires. pub fn track(&mut self, mut stats: CollationStats) { - // Check the state of collation to compute ttl. - let ttl = if stats.fetch_latency().is_none() { - // Disable the fetch timer, to prevent bogus observe on drop. - if let Some(fetch_latency_metric) = stats.fetch_latency_metric.take() { - fetch_latency_metric.stop_and_discard(); - } - // Collation was never fetched, expires ASAP - 0 - } else if stats.backed().is_none() { - MAX_BACKING_DELAY - } else if stats.included().is_none() { - // Set expiration date relative to relay parent block. 
- stats.backed().unwrap_or_default() + MAX_AVAILABILITY_DELAY - } else { - // If block included no reason to track it. - return - }; + // Disable the fetch timer, to prevent bogus observe on drop. + if let Some(fetch_latency_metric) = stats.fetch_latency_metric.take() { + fetch_latency_metric.stop_and_discard(); + } + + if let Some(entry) = self + .entries + .values() + .find(|entry| entry.relay_parent_number == stats.relay_parent_number) + { + gum::debug!( + target: crate::LOG_TARGET_STATS, + ?stats.relay_parent_number, + ?stats.relay_parent, + entry_relay_parent = ?entry.relay_parent, + "Collation built on a fork", + ); + } - self.expire - .entry(stats.relay_parent_number + ttl) - .and_modify(|heads| { - heads.insert(stats.head); - }) - .or_insert_with(|| HashSet::from_iter(vec![stats.head].into_iter())); self.entries.insert(stats.head, stats); } } -// Information about how collations live their lives. +/// Information about how collations live their lives. pub(crate) struct CollationStats { - // The pre-backing collation status information + /// The pre-backing collation status information pre_backing_status: CollationStatus, - // The block header hash. + /// The block header hash. head: Hash, - // The relay parent on top of which collation was built + /// The relay parent on top of which collation was built relay_parent_number: BlockNumber, - // The expiration block number if expired. + /// The relay parent hash. + relay_parent: Hash, + /// The expiration block number if expired. expired_at: Option, - // The backed block number. + /// The backed block number. backed_at: Option, - // The included block number if backed. + /// The included block number if backed. included_at: Option, - // The collation fetch time. + /// The collation fetch time. fetched_at: Option, - // Advertisement time + /// Advertisement time advertised_at: Instant, - // The collation fetch latency (seconds). + /// The collation fetch latency (seconds). 
fetch_latency_metric: Option, - // The collation backing latency (seconds). Duration since collation fetched - // until the import of a relay chain block where collation is backed. + /// The collation backing latency (seconds). Duration since collation fetched + /// until the import of a relay chain block where collation is backed. backed_latency_metric: Option, } impl CollationStats { /// Create new empty instance. - pub fn new(head: Hash, relay_parent_number: BlockNumber, metrics: &Metrics) -> Self { + pub fn new( + head: Hash, + relay_parent_number: BlockNumber, + relay_parent: Hash, + metrics: &Metrics, + ) -> Self { Self { pre_backing_status: CollationStatus::Created, head, relay_parent_number, + relay_parent, advertised_at: std::time::Instant::now(), backed_at: None, expired_at: None, @@ -414,6 +437,11 @@ impl CollationStats { } } + /// Returns the hash and number of the relay parent. + pub fn relay_parent(&self) -> (Hash, BlockNumber) { + (self.relay_parent, self.relay_parent_number) + } + /// Returns the age at which the collation expired. pub fn expired(&self) -> Option { let expired_at = self.expired_at?; @@ -449,6 +477,21 @@ impl CollationStats { self.fetched_at = Some(fetched_at); } + /// Set the block number at which the collation was backed. + pub fn set_backed_at(&mut self, backed_at: BlockNumber) { + self.backed_at = Some(backed_at); + } + + /// Set the block number at which the collation was included. + pub fn set_included_at(&mut self, included_at: BlockNumber) { + self.included_at = Some(included_at); + } + + /// Set the block number at which the collation expired. + pub fn set_expired_at(&mut self, expired_at: BlockNumber) { + self.expired_at = Some(expired_at); + } + /// Sets the pre-backing status of the collation.
pub fn set_pre_backing_status(&mut self, status: CollationStatus) { self.pre_backing_status = status; @@ -468,6 +511,56 @@ impl CollationStats { pub fn set_backed_latency_metric(&mut self, timer: Option) { self.backed_latency_metric = timer; } + + /// Returns the time to live for the collation. + pub fn tracking_ttl(&self) -> BlockNumber { + if self.fetch_latency().is_none() { + 0 // Collation was never fetched, expires ASAP + } else if self.backed().is_none() { + MAX_BACKING_DELAY + } else if self.included().is_none() { + self.backed().expect("backed, checked above") + MAX_AVAILABILITY_DELAY + } else { + 0 // If block included no reason to track it. + } + } + + /// Returns the state of the collation at the moment of expiry. + pub fn expiry_state(&self) -> &'static str { + if self.fetch_latency().is_none() { + // If collation was not fetched, we rely on the status provided + // by the collator protocol. + self.pre_backing_status().label() + } else if self.backed().is_none() { + "fetched" + } else if self.included().is_none() { + "backed" + } else { + "none" + } + } + + /// Returns true if the collation is expired. + pub fn is_tracking_expired(&self, current_block: BlockNumber) -> bool { + // Don't expire included collations + if self.included().is_some() { + return false + } + let expiry_block = self.relay_parent_number + self.tracking_ttl(); + expiry_block <= current_block + } + + /// Check if this collation is possibly finalized based on block number. + /// + /// Returns `true` if the collation was included at or before `last_finalized`. + /// + /// We only track the inclusion block number, not the inclusion block hash. + /// There is a small chance that a collation was included in a fork that is not finalized. 
+ pub fn is_possibly_finalized(&self, last_finalized: BlockNumber) -> bool { + self.included_at + .map(|included_at| included_at <= last_finalized) + .unwrap_or_default() + } } impl Drop for CollationStats { diff --git a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs index a9d4116e132ac..13893c7f221f7 100644 --- a/polkadot/node/network/collator-protocol/src/collator_side/mod.rs +++ b/polkadot/node/network/collator-protocol/src/collator_side/mod.rs @@ -39,7 +39,9 @@ use polkadot_node_network_protocol::{ }; use polkadot_node_primitives::{CollationSecondedSignal, PoV, Statement}; use polkadot_node_subsystem::{ - messages::{CollatorProtocolMessage, NetworkBridgeEvent, NetworkBridgeTxMessage}, + messages::{ + ChainApiMessage, CollatorProtocolMessage, NetworkBridgeEvent, NetworkBridgeTxMessage, + }, overseer, FromOrchestra, OverseerSignal, }; use polkadot_node_subsystem_util::{ @@ -103,6 +105,9 @@ const MAX_UNSHARED_UPLOAD_TIME: Duration = Duration::from_millis(150); /// Validators are obtained from [`ValidatorGroupsBuffer::validators_to_connect`]. const RECONNECT_AFTER_LEAF_TIMEOUT: Duration = Duration::from_secs(4); +/// Maximum number of parallel requests to send to the Chain API. +const MAX_PARALLEL_CHAIN_API_REQUESTS: usize = 10; + /// Future that when resolved indicates that we should update reserved peer-set /// of validators we want to be connected to. 
/// @@ -518,7 +523,7 @@ async fn distribute_collation( core_index, stats: per_relay_parent .block_number - .map(|n| CollationStats::new(para_head, n, &state.metrics)), + .map(|n| CollationStats::new(para_head, n, candidate_relay_parent, &state.metrics)), }, ); @@ -1271,7 +1276,6 @@ async fn process_block_events( leaf: Hash, maybe_block_number: Option, para_id: ParaId, - metrics: &Metrics, ) { if let Ok(events) = get_candidate_events(ctx.sender(), leaf).await { let Some(block_number) = maybe_block_number else { @@ -1292,7 +1296,7 @@ async fn process_block_events( if receipt.descriptor.para_id() != para_id { continue } - collation_tracker.collation_included(block_number, leaf, receipt, metrics); + collation_tracker.collation_included(block_number, leaf, receipt); }, CandidateEvent::CandidateBacked(receipt, _, _, _) => { if receipt.descriptor.para_id() != para_id { @@ -1300,14 +1304,7 @@ async fn process_block_events( } let Some(block_number) = maybe_block_number else { continue }; - let Some(stats) = - collation_tracker.collation_backed(block_number, leaf, receipt, metrics) - else { - continue - }; - - // Continue measuring inclusion latency. - collation_tracker.track(stats); + collation_tracker.collation_backed(block_number, leaf, receipt); }, _ => { // do not care about other events @@ -1344,15 +1341,7 @@ async fn handle_our_view_change( .per_relay_parent .insert(*leaf, PerRelayParent::new(para_id, claim_queue, block_number)); - process_block_events( - ctx, - &mut state.collation_tracker, - *leaf, - block_number, - para_id, - &state.metrics, - ) - .await; + process_block_events(ctx, &mut state.collation_tracker, *leaf, block_number, para_id).await; let allowed_ancestry = implicit_view .known_allowed_relay_parents_under(leaf, state.collating_on) .unwrap_or_default(); @@ -1473,6 +1462,9 @@ fn process_out_of_view_collation( collation_tracker.track(stats); } +/// Process collations that have expired. +/// +/// Collations are no longer tracked after this call.
fn process_expired_collations( expired_collations: Vec, removed: Hash, @@ -1480,18 +1472,7 @@ fn process_expired_collations( metrics: &Metrics, ) { for expired_collation in expired_collations { - let collation_state = if expired_collation.fetch_latency().is_none() { - // If collation was not fetched, we rely on the status provided - // by the collator protocol. - expired_collation.pre_backing_status().label() - } else if expired_collation.backed().is_none() { - "fetched" - } else if expired_collation.included().is_none() { - "backed" - } else { - "none" - }; - + let collation_state = expired_collation.expiry_state(); let age = expired_collation.expired().unwrap_or_default(); gum::debug!( target: crate::LOG_TARGET_STATS, @@ -1503,10 +1484,93 @@ fn process_expired_collations( "Collation expired", ); + // Report metrics for collations that were expired after being backed. + // The only way this can happen is if they time out availability. + if let Some(latency) = expired_collation.backed() { + metrics.on_collation_backed(latency as f64); + } metrics.on_collation_expired(age as f64, collation_state); } } +/// Process collations that may have been finalized on the relay chain. +/// +/// Collations are no longer tracked after this call. +/// +/// ## Limitations +/// +/// We only track the inclusion block number, not the inclusion block hash. +/// This means a collation included in a fork that was later dropped could still be counted as +/// finalized. This is acceptable for metrics because such cases are rare. 
+async fn process_possibly_finalized_collations( + collations: Vec, + last_finalized: (Hash, BlockNumber), + sender: &mut impl overseer::SubsystemSender, + metrics: &Metrics, +) { + if collations.is_empty() { + return + } + + let (last_hash, last_number) = last_finalized; + let mut blocks_to_request = collations + .iter() + .map(|stats| stats.relay_parent().1) + .filter(|n| *n != last_number) // No need to request `last_hash` for `last_number` + .collect::>(); + blocks_to_request.sort_unstable(); + blocks_to_request.dedup(); + + gum::debug!(target: LOG_TARGET_STATS, ?blocks_to_request, "Collations possibly finalized on blocks"); + + let mut finalized = vec![(last_hash, last_number)]; + + for chunk in blocks_to_request.as_slice().chunks(MAX_PARALLEL_CHAIN_API_REQUESTS) { + let futures = chunk.iter().map(|&bn| { + let mut sender = sender.clone(); + async move { + let (tx, rx) = oneshot::channel(); + sender.send_message(ChainApiMessage::FinalizedBlockHash(bn, tx)).await; + match rx.await { + Ok(Ok(Some(bh))) => Some((bh, bn)), + _ => { + gum::warn!(target: LOG_TARGET_STATS, block_number = ?bn, "Can't request hash for the finalized block from ChainApi"); + None + }, + } + } + }); + finalized.extend(futures::future::join_all(futures).await.into_iter().flatten()); + } + + for collation in collations { + if !finalized.contains(&collation.relay_parent()) { + // Omit collations built on forks: they're dropped but it's expected + gum::debug!( + target: crate::LOG_TARGET_STATS, + relay_parent = ?collation.relay_parent(), + head = ?collation.head(), + "Collation is built on a fork, skipping", + ); + continue; + } + + // Report metrics for finalized collations + if let Some(latency) = collation.backed() { + metrics.on_collation_backed(latency as f64); + } + if let Some(latency) = collation.included() { + metrics.on_collation_included(latency as f64); + } + gum::debug!( + target: crate::LOG_TARGET_STATS, + relay_parent = ?collation.relay_parent(), + head = ?collation.head(), + 
"Collation finalized, stop tracking", + ); + } +} + /// The collator protocol collator side main loop. #[overseer::contextbounds(CollatorProtocol, prefix = crate::overseer)] pub(crate) async fn run( @@ -1569,7 +1633,10 @@ async fn run_inner( *reconnect_timeout = futures_timer::Delay::new(RECONNECT_AFTER_LEAF_TIMEOUT).fuse(); } } - FromOrchestra::Signal(BlockFinalized(..)) => {} + FromOrchestra::Signal(BlockFinalized(hash, number)) => { + let possibly_finalized = state.collation_tracker.drain_finalized(number); + process_possibly_finalized_collations(possibly_finalized, (hash, number), ctx.sender(), &metrics).await; + } FromOrchestra::Signal(Conclude) => return Ok(()), }, CollationSendResult { relay_parent, candidate_hash, peer_id, timed_out } = diff --git a/prdoc/pr_9319.prdoc b/prdoc/pr_9319.prdoc new file mode 100644 index 0000000000000..1b329c78f1ec0 --- /dev/null +++ b/prdoc/pr_9319.prdoc @@ -0,0 +1,10 @@ +title: 'Collation metrics: exclude drops of fork-based collations to improve metrics + accuracy' +doc: +- audience: Node Dev + description: |- + Improves the accuracy of the collation metrics excluding drops of fork-based collations expected by design for lookahead collators. + Metrics are now only sent for collations that were either finalized or dropped. +crates: +- name: polkadot-collator-protocol + bump: patch