diff --git a/consensus/src/simplex/actors/batcher/actor.rs b/consensus/src/simplex/actors/batcher/actor.rs
index 0dd5c20830..c2e9d971ff 100644
--- a/consensus/src/simplex/actors/batcher/actor.rs
+++ b/consensus/src/simplex/actors/batcher/actor.rs
@@ -3,7 +3,7 @@ use crate::{
     simplex::{
         actors::voter,
         interesting,
-        metrics::Inbound,
+        metrics::{Inbound, Peer},
        scheme::Scheme,
         types::{Activity, Certificate, Vote},
     },
@@ -15,12 +15,17 @@ use commonware_macros::select;
 use commonware_p2p::{utils::codec::WrappedReceiver, Blocker, Receiver};
 use commonware_runtime::{
     spawn_cell,
-    telemetry::metrics::histogram::{self, Buckets},
+    telemetry::metrics::{
+        histogram::{self, Buckets},
+        status::GaugeExt,
+    },
     Clock, ContextCell, Handle, Metrics, Spawner,
 };
 use commonware_utils::ordered::{Quorum, Set};
 use futures::{channel::mpsc, StreamExt};
-use prometheus_client::metrics::{counter::Counter, family::Family, histogram::Histogram};
+use prometheus_client::metrics::{
+    counter::Counter, family::Family, gauge::Gauge, histogram::Histogram,
+};
 use rand_core::CryptoRngCore;
 use std::{collections::BTreeMap, sync::Arc};
 use tracing::{debug, trace, warn};
@@ -49,6 +54,7 @@ pub struct Actor<
     added: Counter,
     verified: Counter,
     inbound_messages: Family<Inbound, Counter>,
+    latest_vote: Family<Peer, Gauge>,
     batch_size: Histogram,
     verify_latency: histogram::Timed<E>,
     recover_latency: histogram::Timed<E>,
@@ -79,6 +85,15 @@ impl<
             "number of inbound messages",
             inbound_messages.clone(),
         );
+        let latest_vote = Family::<Peer, Gauge>::default();
+        context.register(
+            "latest_vote",
+            "view of latest vote received per peer",
+            latest_vote.clone(),
+        );
+        for participant in cfg.scheme.participants().iter() {
+            latest_vote.get_or_create(&Peer::new(participant)).set(0);
+        }
         context.register(
             "batch_size",
             "number of messages in a signature verification batch",
@@ -99,12 +114,11 @@
         // TODO(#1833): Metrics should use the post-start context
         let clock = Arc::new(context.clone());
         let (sender, receiver) = mpsc::channel(cfg.mailbox_size);
-        let participants = cfg.scheme.participants().clone();

         (
             Self {
                 context: ContextCell::new(context),
-                participants,
+                participants: cfg.scheme.participants().clone(),

                 scheme: cfg.scheme,
                 blocker: cfg.blocker,
@@ -119,6 +133,7 @@
                 added,
                 verified,
                 inbound_messages,
+                latest_vote,
                 batch_size,
                 verify_latency: histogram::Timed::new(verify_latency, clock.clone()),
                 recover_latency: histogram::Timed::new(recover_latency, clock),
@@ -397,12 +412,19 @@
                     }
                     // Add the vote to the verifier
+                    let peer = Peer::new(&sender);
                     if work
                         .entry(view)
                         .or_insert_with(|| self.new_round())
                         .add_network(sender, message)
                         .await
                     {
                         self.added.inc();
+
+                        // Update the per-peer latest vote metric (only if higher than the current value)
+                        let _ = self
+                            .latest_vote
+                            .get_or_create(&peer)
+                            .try_set_max(view.get());
                     }
                     updated_view = view;
                 },
diff --git a/consensus/src/simplex/actors/batcher/mod.rs b/consensus/src/simplex/actors/batcher/mod.rs
index 8cc7a8e1aa..77bb1018e6 100644
--- a/consensus/src/simplex/actors/batcher/mod.rs
+++ b/consensus/src/simplex/actors/batcher/mod.rs
@@ -1402,4 +1402,238 @@ mod tests {
         votes_skipped_for_finalized_views(ed25519::fixture);
         votes_skipped_for_finalized_views(secp256r1::fixture);
     }
+
+    fn latest_vote_metric_tracking<S, F>(mut fixture: F)
+    where
+        S: Scheme,
+        F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
+    {
+        let n = 5;
+        let quorum_size = quorum(n) as usize;
+        let namespace = b"batcher_test".to_vec();
+        let epoch = Epoch::new(333);
+        let executor = deterministic::Runner::timed(Duration::from_secs(10));
+        executor.start(|mut context| async move {
+            // Create the simulated network
+            let (network, mut oracle) = Network::new(
+                context.with_label("network"),
+                NConfig {
+                    max_size: 1024 * 1024,
+                    disconnect_on_block: true,
+                    tracked_peer_sets: None,
+                },
+            );
+            network.start();
+
+            // Get the participants
+            let Fixture {
+                participants,
+                schemes,
+                ..
+            } = fixture(&mut context, &namespace, n);
+
+            // Set up the reporter mock
+            let reporter_cfg = mocks::reporter::Config {
+                participants: schemes[0].participants().clone(),
+                scheme: schemes[0].clone(),
+                elector: Default::default(),
+            };
+            let reporter =
+                mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);
+
+            // Initialize the batcher actor (participant 0)
+            let me = participants[0].clone();
+            let batcher_context = context.with_label("batcher");
+            let batcher_cfg = Config {
+                scheme: schemes[0].clone(),
+                blocker: oracle.control(me.clone()),
+                reporter: reporter.clone(),
+                activity_timeout: ViewDelta::new(10),
+                skip_timeout: ViewDelta::new(5),
+                epoch,
+                mailbox_size: 128,
+            };
+            let (batcher, mut batcher_mailbox) = Actor::new(batcher_context.clone(), batcher_cfg);
+
+            // Verify every participant is initialized to view 0 in the metric
+            let buffer = batcher_context.encode();
+            for participant in &participants {
+                let expected = format!("latest_vote{{peer=\"{}\"}} 0", participant);
+                assert!(
+                    buffer.contains(&expected),
+                    "Expected metric for participant {} to be initialized to 0, got: {}",
+                    participant,
+                    buffer
+                );
+            }
+
+            // Create the voter mailbox the batcher sends to
+            let (voter_sender, mut voter_receiver) =
+                mpsc::channel::<voter::Message<S>>(1024);
+            let voter_mailbox = voter::Mailbox::new(voter_sender);
+
+            let (_vote_sender, vote_receiver) = oracle
+                .control(me.clone())
+                .register(0, TEST_QUOTA)
+                .await
+                .unwrap();
+            let (_certificate_sender, certificate_receiver) = oracle
+                .control(me.clone())
+                .register(1, TEST_QUOTA)
+                .await
+                .unwrap();
+
+            // Register the participants on the network and set up links
+            let link = Link {
+                latency: Duration::from_millis(1),
+                jitter: Duration::from_millis(0),
+                success_rate: 1.0,
+            };
+            let mut participant_senders = Vec::new();
+            for (i, pk) in participants.iter().enumerate() {
+                if i == 0 {
+                    participant_senders.push(None);
+                    continue;
+                }
+                let (sender, _receiver) = oracle
+                    .control(pk.clone())
+                    .register(0, TEST_QUOTA)
+                    .await
+                    .unwrap();
+                oracle
+                    .add_link(pk.clone(), me.clone(), link.clone())
+                    .await
+                    .unwrap();
+                participant_senders.push(Some(sender));
+            }
+
+            // Start the batcher
+            batcher.start(voter_mailbox, vote_receiver, certificate_receiver);
+
+            // Initialize the batcher at view 5 with participant 1 as leader
+            let view = View::new(5);
+            let leader = 1u32;
+            let active = batcher_mailbox.update(view, leader, View::zero()).await;
+            assert!(active);
+
+            // Build a proposal and send enough votes to reach quorum
+            let round = Round::new(epoch, view);
+            let proposal = Proposal::new(round, View::zero(), Sha256::hash(b"test_payload"));
+
+            // Send votes from participants 1 through quorum_size-1 (our own vote, participant 0's, is sent separately below)
+            for i in 1..quorum_size {
+                let vote = Notarize::sign(&schemes[i], proposal.clone()).unwrap();
+                if let Some(ref mut sender) = participant_senders[i] {
+                    sender
+                        .send(
+                            Recipients::One(me.clone()),
+                            Vote::Notarize(vote).encode(),
+                            true,
+                        )
+                        .await
+                        .unwrap();
+                }
+            }
+
+            // Send our own vote to complete the quorum
+            let our_vote = Notarize::sign(&schemes[0], proposal.clone()).unwrap();
+            batcher_mailbox
+                .constructed(Vote::Notarize(our_vote))
+                .await;
+
+            // Give the network time to deliver and the batcher time to process votes and construct the certificate
+            context.sleep(Duration::from_millis(100)).await;
+
+            // Drain voter messages until the notarization certificate arrives
+            loop {
+                let output = voter_receiver.next().await.unwrap();
+                match output {
+                    voter::Message::Proposal(_) => continue,
+                    voter::Message::Verified(Certificate::Notarization(n), _) => {
+                        assert_eq!(n.view(), view, "Should construct notarization");
+                        break;
+                    }
+                    _ => panic!("Unexpected message type"),
+                }
+            }
+
+            // Verify votes were tracked for the participants who voted
+            let buffer = batcher_context.encode();
+            for (i, participant) in participants.iter().enumerate().take(quorum_size).skip(1) {
+                let expected = format!("latest_vote{{peer=\"{}\"}} 5", participant);
+                assert!(
+                    buffer.contains(&expected),
+                    "Expected participant {} to have latest_vote=5, got: {}",
+                    i,
+                    buffer
+                );
+            }
+
+            // Now send a vote from a participant who hasn't voted yet (after quorum).
+            // This checks that votes are still tracked even after certificate construction.
+            let late_voter = quorum_size;
+            let late_vote = Notarize::sign(&schemes[late_voter], proposal.clone()).unwrap();
+            if let Some(ref mut sender) = participant_senders[late_voter] {
+                sender
+                    .send(
+                        Recipients::One(me.clone()),
+                        Vote::Notarize(late_vote).encode(),
+                        true,
+                    )
+                    .await
+                    .unwrap();
+            }
+
+            // Give the network time to deliver
+            context.sleep(Duration::from_millis(100)).await;
+
+            // Verify the late vote was still tracked
+            let buffer = batcher_context.encode();
+            let expected_late = format!("latest_vote{{peer=\"{}\"}} 5", participants[late_voter]);
+            assert!(
+                buffer.contains(&expected_late),
+                "Expected late voter (participant {}) to have latest_vote=5 even after quorum, got: {}",
+                late_voter,
+                buffer
+            );
+
+            // Send a vote for a LOWER view (view 3) from participant 1, who already voted at view 5,
+            // to verify the metric doesn't decrease
+            let view3 = View::new(3);
+            let round3 = Round::new(epoch, view3);
+            let proposal3 = Proposal::new(round3, View::zero(), Sha256::hash(b"payload3"));
+            let vote_v3 = Notarize::sign(&schemes[1], proposal3).unwrap();
+            if let Some(ref mut sender) = participant_senders[1] {
+                sender
+                    .send(
+                        Recipients::One(me.clone()),
+                        Vote::Notarize(vote_v3).encode(),
+                        true,
+                    )
+                    .await
+                    .unwrap();
+            }
+
+            context.sleep(Duration::from_millis(100)).await;
+
+            // Verify participant 1 STILL has latest_vote = 5 (not decreased to 3)
+            let buffer = batcher_context.encode();
+            let expected_v5 = format!("latest_vote{{peer=\"{}\"}} 5", participants[1]);
+            assert!(
+                buffer.contains(&expected_v5),
+                "Expected participant 1 to still have latest_vote=5 after receiving a lower-view vote, got: {}",
+                buffer
+            );
+        });
+    }
+
+    #[test_traced]
+    fn test_latest_vote_metric_tracking() {
+        latest_vote_metric_tracking(bls12381_threshold::fixture::<MinPk>);
+        latest_vote_metric_tracking(bls12381_threshold::fixture::<MinSig>);
+        latest_vote_metric_tracking(bls12381_multisig::fixture::<MinPk>);
+        latest_vote_metric_tracking(bls12381_multisig::fixture::<MinSig>);
+        latest_vote_metric_tracking(ed25519::fixture);
+        latest_vote_metric_tracking(secp256r1::fixture);
+    }
 }
diff --git a/consensus/src/simplex/metrics.rs b/consensus/src/simplex/metrics.rs
index f3663da31e..eb482fe638 100644
--- a/consensus/src/simplex/metrics.rs
+++ b/consensus/src/simplex/metrics.rs
@@ -1,6 +1,19 @@
 use commonware_utils::Array;
 use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue};

+#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
+pub struct Peer {
+    pub peer: String,
+}
+
+impl Peer {
+    pub fn new(peer: &impl Array) -> Self {
+        Self {
+            peer: peer.to_string(),
+        }
+    }
+}
+
 #[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)]
 pub enum MessageType {
     Notarize,
diff --git a/runtime/src/telemetry/metrics/status.rs b/runtime/src/telemetry/metrics/status.rs
index e86d017586..74688e73cc 100644
--- a/runtime/src/telemetry/metrics/status.rs
+++ b/runtime/src/telemetry/metrics/status.rs
@@ -4,6 +4,7 @@ use prometheus_client::{
     encoding::{EncodeLabelSet, EncodeLabelValue},
     metrics::{counter::Counter as DefaultCounter, family::Family, gauge::Gauge},
 };
+use std::sync::atomic::Ordering;

 /// Metric label that indicates status.
 #[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
@@ -87,6 +88,10 @@ impl Drop for CounterGuard {
 pub trait GaugeExt {
     /// Sets the [`Gauge`] using a value convertible to `i64`, if conversion is not lossy, returning the previous value if successful.
     fn try_set<T: TryInto<i64>>(&self, val: T) -> Result<i64, T::Error>;
+
+    /// Atomically sets the [`Gauge`] to the maximum of its current value and the provided value,
+    /// if conversion is not lossy, returning the previous value if successful.
+    fn try_set_max<T: TryInto<i64> + Copy>(&self, val: T) -> Result<i64, T::Error>;
 }

 impl GaugeExt for Gauge {
@@ -96,4 +101,9 @@ fn try_set<T: TryInto<i64>>(&self, val: T) -> Result<i64, T::Error> {
         let val = val.try_into()?;
         let out = self.set(val);
         Ok(out)
     }
+
+    fn try_set_max<T: TryInto<i64> + Copy>(&self, val: T) -> Result<i64, T::Error> {
+        let val = val.try_into()?;
+        Ok(self.inner().fetch_max(val, Ordering::Relaxed))
+    }
 }
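
Why `fetch_max` rather than read-then-set: the whole update is a single `AtomicI64::fetch_max`, so two votes racing on the same peer's gauge can never leave it lower than the highest view observed, and `Ordering::Relaxed` suffices because nothing else synchronizes on the gauge. A minimal standalone sketch of the semantics (the `set_max` helper is hypothetical; it assumes only `prometheus_client`'s default `Gauge` and the `inner()` accessor the patch already uses):

```rust
use prometheus_client::metrics::gauge::Gauge;
use std::sync::atomic::Ordering;

// Hypothetical helper mirroring what `GaugeExt::try_set_max` does once the
// input has been converted to `i64`: `fetch_max` atomically stores
// max(current, val) and returns the previous value.
fn set_max(gauge: &Gauge, val: i64) -> i64 {
    gauge.inner().fetch_max(val, Ordering::Relaxed)
}

fn main() {
    let gauge = Gauge::default();
    assert_eq!(set_max(&gauge, 5), 0); // raises 0 -> 5, previous value was 0
    assert_eq!(set_max(&gauge, 3), 5); // stale lower view: gauge stays at 5
    assert_eq!(gauge.get(), 5);
}
```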
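On the pre-initialization loop in `Actor::new`: seeding every participant's series with `set(0)` means a peer that never votes still exports `latest_vote{peer="..."} 0`, so a stalled peer shows up as a flat series rather than a missing one, and queries over the family see a complete vector from startup. This is what the test asserts immediately after constructing the batcher, before any votes are sent.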