Skip to content
Merged
Show file tree
Hide file tree
Changes from 13 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
247 changes: 165 additions & 82 deletions polkadot/node/network/collator-protocol/src/collator_side/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
// along with Polkadot. If not, see <http://www.gnu.org/licenses/>.

use std::{
collections::{HashMap, HashSet},
collections::HashMap,
time::{Duration, Instant},
};

Expand Down Expand Up @@ -234,140 +234,152 @@ pub(crate) const MAX_AVAILABILITY_DELAY: BlockNumber = 10;
// Collations are kept in the tracker, until they are included or expired
#[derive(Default)]
pub(crate) struct CollationTracker {
// Keep track of collation expiration block number.
expire: HashMap<BlockNumber, HashSet<Hash>>,
// All un-expired collation entries
entries: HashMap<Hash, CollationStats>,
}

impl CollationTracker {
// Mark a tracked collation as backed and return the stats.
// After this call, the collation is no longer tracked. To measure
// inclusion time call `track` again with the returned stats.
// Mark a tracked collation as backed.
//
// Block built on top of N is earliest backed at N + 1.
// Returns `None` if the collation is not tracked.
pub fn collation_backed(
&mut self,
block_number: BlockNumber,
leaf: H256,
receipt: CandidateReceipt,
metrics: &Metrics,
) -> Option<CollationStats> {
) {
let head = receipt.descriptor.para_head();
let Some(entry) = self.entries.get_mut(&head) else {
gum::debug!(
target: crate::LOG_TARGET_STATS,
?head,
"Backed collation not found in tracker",
);
return;
};

self.entries.remove(&head).map(|mut entry| {
let para_id = receipt.descriptor.para_id();
let relay_parent = receipt.descriptor.relay_parent();

entry.backed_at = Some(block_number);
if entry.backed().is_some() {
gum::debug!(
target: crate::LOG_TARGET_STATS,
?head,
"Collation already backed in a fork, skipping",
);
return
}

entry.set_backed_at(block_number);
if let Some(latency) = entry.backed() {
// Observe the backing latency since the collation was fetched.
let maybe_latency =
entry.backed_latency_metric.take().map(|metric| metric.stop_and_record());

gum::debug!(
target: crate::LOG_TARGET_STATS,
latency_blocks = ?entry.backed(),
latency_blocks = ?latency,
latency_time = ?maybe_latency,
relay_block = ?leaf,
?relay_parent,
?para_id,
relay_parent = ?entry.relay_parent,
para_id = ?receipt.descriptor.para_id(),
?head,
"A fetched collation was backed on relay chain",
);

metrics.on_collation_backed(
(block_number.saturating_sub(entry.relay_parent_number)) as f64,
);

entry
})
}
}

// Mark a previously backed collation as included and return the stats.
// After this call, the collation is no longer trackable.
// Mark a previously backed collation as included.
//
// Block built on top of N is earliest backed at N + 1.
// Returns `None` if there collation is not in tracker.
// Block built on top of N is earliest included at N + 2.
pub fn collation_included(
&mut self,
block_number: BlockNumber,
leaf: H256,
receipt: CandidateReceipt,
metrics: &Metrics,
) -> Option<CollationStats> {
) {
let head = receipt.descriptor.para_head();
let Some(entry) = self.entries.get_mut(&head) else {
gum::debug!(
target: crate::LOG_TARGET_STATS,
?head,
"Included collation not found in tracker",
);
return;
};

self.entries.remove(&head).map(|mut entry| {
entry.included_at = Some(block_number);

if let Some(latency) = entry.included() {
metrics.on_collation_included(latency as f64);

let para_id = receipt.descriptor.para_id();
let relay_parent = receipt.descriptor.relay_parent();

gum::debug!(
target: crate::LOG_TARGET_STATS,
?latency,
relay_block = ?leaf,
?relay_parent,
?para_id,
head = ?receipt.descriptor.para_head(),
"Collation included on relay chain",
);
}
if entry.included().is_some() {
gum::debug!(
target: crate::LOG_TARGET_STATS,
?head,
"Collation already included in a fork, skipping",
);
return
}

entry
})
entry.set_included_at(block_number);
if let Some(latency) = entry.included() {
gum::debug!(
target: crate::LOG_TARGET_STATS,
?latency,
relay_block = ?leaf,
relay_parent = ?entry.relay_parent,
para_id = ?receipt.descriptor.para_id(),
head = ?receipt.descriptor.para_head(),
"Collation included on relay chain",
);
}
}

// Returns all the collations that have expired at `block_number`.
pub fn drain_expired(&mut self, block_number: BlockNumber) -> Vec<CollationStats> {
let Some(expired) = self.expire.remove(&block_number) else {
// No collations built on all seen relay parents at height `block_number`
return Vec::new()
};

let expired = self
.entries
.iter()
.filter_map(|(head, entry)| entry.is_tracking_expired(block_number).then_some(*head))
.collect::<Vec<_>>();
expired
.iter()
.filter_map(|head| self.entries.remove(head))
.map(|mut entry| {
entry.expired_at = Some(block_number);
entry.set_expired_at(block_number);
entry
})
.collect::<Vec<_>>()
}

// Removes and returns every tracked collation that `is_possibly_finalized`
// reports as possibly finalized at `block_number`.
pub fn drain_finalized(&mut self, block_number: BlockNumber) -> Vec<CollationStats> {
    // First pass: gather the heads to evict. The map cannot be mutated
    // while it is being iterated.
    let mut finalized_heads = Vec::new();
    for (head, stats) in self.entries.iter() {
        if stats.is_possibly_finalized(block_number) {
            finalized_heads.push(*head);
        }
    }

    // Second pass: take the matching entries out of the tracker, in the
    // same order in which they were found.
    let mut drained = Vec::new();
    for head in finalized_heads.iter() {
        if let Some(stats) = self.entries.remove(head) {
            drained.push(stats);
        }
    }
    drained
}

// Track a collation for a given period of time (TTL). TTL depends
// on the collation state.
// Collation is evicted after it expires.
pub fn track(&mut self, mut stats: CollationStats) {
// Check the state of collation to compute ttl.
let ttl = if stats.fetch_latency().is_none() {
// Disable the fetch timer, to prevent bogus observe on drop.
if let Some(fetch_latency_metric) = stats.fetch_latency_metric.take() {
fetch_latency_metric.stop_and_discard();
}
// Collation was never fetched, expires ASAP
0
} else if stats.backed().is_none() {
MAX_BACKING_DELAY
} else if stats.included().is_none() {
// Set expiration date relative to relay parent block.
stats.backed().unwrap_or_default() + MAX_AVAILABILITY_DELAY
} else {
// If block included no reason to track it.
return
};
// Disable the fetch timer, to prevent bogus observe on drop.
if let Some(fetch_latency_metric) = stats.fetch_latency_metric.take() {
fetch_latency_metric.stop_and_discard();
}

if let Some(entry) = self
.entries
.values()
.find(|entry| entry.relay_parent_number == stats.relay_parent_number)
{
gum::debug!(
target: crate::LOG_TARGET_STATS,
?stats.relay_parent_number,
?stats.relay_parent,
entry_relay_parent = ?entry.relay_parent,
"Collation built on a fork",
);
}

self.expire
.entry(stats.relay_parent_number + ttl)
.and_modify(|heads| {
heads.insert(stats.head);
})
.or_insert_with(|| HashSet::from_iter(vec![stats.head].into_iter()));
self.entries.insert(stats.head, stats);
}
}
Expand All @@ -380,6 +392,8 @@ pub(crate) struct CollationStats {
head: Hash,
// The relay parent on top of which collation was built
relay_parent_number: BlockNumber,
// The relay parent hash.
relay_parent: Hash,
// The expiration block number if expired.
expired_at: Option<BlockNumber>,
// The backed block number.
Expand All @@ -399,11 +413,17 @@ pub(crate) struct CollationStats {

impl CollationStats {
/// Create new empty instance.
pub fn new(head: Hash, relay_parent_number: BlockNumber, metrics: &Metrics) -> Self {
pub fn new(
head: Hash,
relay_parent_number: BlockNumber,
relay_parent: Hash,
metrics: &Metrics,
) -> Self {
Self {
pre_backing_status: CollationStatus::Created,
head,
relay_parent_number,
relay_parent,
advertised_at: std::time::Instant::now(),
backed_at: None,
expired_at: None,
Expand All @@ -414,6 +434,11 @@ impl CollationStats {
}
}

/// Returns the relay parent of this collation as a `(hash, block number)` pair.
pub fn relay_parent(&self) -> (Hash, BlockNumber) {
    let hash = self.relay_parent;
    let number = self.relay_parent_number;
    (hash, number)
}

/// Returns the age at which the collation expired.
pub fn expired(&self) -> Option<BlockNumber> {
let expired_at = self.expired_at?;
Expand Down Expand Up @@ -449,6 +474,20 @@ impl CollationStats {
self.fetched_at = Some(fetched_at);
}

/// Set the block number at which the collation was backed.
///
/// Overwrites any previously stored value.
pub fn set_backed_at(&mut self, backed_at: BlockNumber) {
self.backed_at = Some(backed_at);
}

/// Set the block number at which the collation was included.
///
/// Overwrites any previously stored value.
pub fn set_included_at(&mut self, included_at: BlockNumber) {
self.included_at = Some(included_at);
}

/// Set the block number at which the collation expired.
///
/// Overwrites any previously stored value.
pub fn set_expired_at(&mut self, expired_at: BlockNumber) {
self.expired_at = Some(expired_at);
}

/// Sets the pre-backing status of the collation.
pub fn set_pre_backing_status(&mut self, status: CollationStatus) {
self.pre_backing_status = status;
Expand All @@ -468,6 +507,50 @@ impl CollationStats {
pub fn set_backed_latency_metric(&mut self, timer: Option<HistogramTimer>) {
// Overwrites whatever timer was stored before; passing `None` clears it.
// NOTE(review): a timer replaced here is dropped, which presumably records an
// observation (the tracker calls `stop_and_discard` on the fetch timer to avoid
// a bogus observe on drop) - confirm this is intended.
self.backed_latency_metric = timer;
}

/// Returns the time to live of the collation, in blocks.
///
/// `is_tracking_expired` adds this value to the relay parent number to get
/// the block at which tracking stops. The value depends on how far the
/// collation has progressed:
/// - never fetched: `0`, so it expires as soon as possible;
/// - fetched but not backed: `MAX_BACKING_DELAY`;
/// - backed but not included: `backed()` plus `MAX_AVAILABILITY_DELAY`;
/// - included: `0`, as there is no reason to keep tracking it.
pub fn tracking_ttl(&self) -> BlockNumber {
    if self.fetch_latency().is_none() {
        // Collation was never fetched, expires ASAP.
        return 0
    }

    // Fetched, but backing has not been observed yet.
    let Some(backed) = self.backed() else { return MAX_BACKING_DELAY };

    if self.included().is_some() {
        // If block included no reason to track it.
        return 0
    }

    backed + MAX_AVAILABILITY_DELAY
}

/// Returns a label for the last stage the collation reached when it expired.
///
/// A collation that was never fetched reports the label of its pre-backing
/// status. Otherwise the result is `"fetched"`, `"backed"`, or `"none"` when
/// the collation was already included.
pub fn expiry_state(&self) -> &'static str {
    if self.fetch_latency().is_none() {
        // Not fetched: the only information available is the status
        // provided by the collator protocol.
        return self.pre_backing_status().label()
    }
    if self.backed().is_none() {
        return "fetched"
    }
    if self.included().is_none() {
        return "backed"
    }
    "none"
}

/// Returns true if tracking of the collation has expired at `current_block`.
///
/// Included collations never expire. For all others, tracking expires once
/// `current_block` reaches the relay parent number plus `tracking_ttl()`.
pub fn is_tracking_expired(&self, current_block: BlockNumber) -> bool {
    // The `&&` short-circuits, so the expiry block is only computed for
    // collations that are not included yet.
    self.included().is_none() &&
        self.relay_parent_number + self.tracking_ttl() <= current_block
}

pub fn is_possibly_finalized(&self, last_finalized: BlockNumber) -> bool {
self.included_at
.map(|included_at| included_at <= last_finalized)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If we see some collation included, it doesn't really mean it is finalized, because we don't even know if it is on the fork that is getting finalized. However, this should still work fine, because the collation should eventually be backed/included, as it was clearly backed offchain. The only issue is that the relay parent expires.

Please add some docs about the limitations of the measurement we are doing.

.unwrap_or_default()
}
}

impl Drop for CollationStats {
Expand Down
Loading
Loading