Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 27 additions & 5 deletions consensus/src/simplex/actors/batcher/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::{
simplex::{
actors::voter,
interesting,
metrics::Inbound,
metrics::{Inbound, Peer},
scheme::Scheme,
types::{Activity, Certificate, Vote},
},
Expand All @@ -15,12 +15,17 @@ use commonware_macros::select;
use commonware_p2p::{utils::codec::WrappedReceiver, Blocker, Receiver};
use commonware_runtime::{
spawn_cell,
telemetry::metrics::histogram::{self, Buckets},
telemetry::metrics::{
histogram::{self, Buckets},
status::GaugeExt,
},
Clock, ContextCell, Handle, Metrics, Spawner,
};
use commonware_utils::ordered::{Quorum, Set};
use futures::{channel::mpsc, StreamExt};
use prometheus_client::metrics::{counter::Counter, family::Family, histogram::Histogram};
use prometheus_client::metrics::{
counter::Counter, family::Family, gauge::Gauge, histogram::Histogram,
};
use rand_core::CryptoRngCore;
use std::{collections::BTreeMap, sync::Arc};
use tracing::{debug, trace, warn};
Expand Down Expand Up @@ -49,6 +54,7 @@ pub struct Actor<
added: Counter,
verified: Counter,
inbound_messages: Family<Inbound, Counter>,
latest_vote: Family<Peer, Gauge>,
batch_size: Histogram,
verify_latency: histogram::Timed<E>,
recover_latency: histogram::Timed<E>,
Expand Down Expand Up @@ -79,6 +85,15 @@ impl<
"number of inbound messages",
inbound_messages.clone(),
);
let latest_vote = Family::<Peer, Gauge>::default();
context.register(
"latest_vote",
"view of latest vote received per peer",
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This may be annoying to work with in practice in something like reshare, because we namespace metrics by epoch (so you'd have to update your dashboard constantly, I think?)

But that should be considered in a follow-up.

latest_vote.clone(),
);
for participant in cfg.scheme.participants().iter() {
latest_vote.get_or_create(&Peer::new(participant)).set(0);
}
context.register(
"batch_size",
"number of messages in a signature verification batch",
Expand All @@ -99,12 +114,11 @@ impl<
// TODO(#1833): Metrics should use the post-start context
let clock = Arc::new(context.clone());
let (sender, receiver) = mpsc::channel(cfg.mailbox_size);
let participants = cfg.scheme.participants().clone();
(
Self {
context: ContextCell::new(context),

participants,
participants: cfg.scheme.participants().clone(),
scheme: cfg.scheme,

blocker: cfg.blocker,
Expand All @@ -119,6 +133,7 @@ impl<
added,
verified,
inbound_messages,
latest_vote,
batch_size,
verify_latency: histogram::Timed::new(verify_latency, clock.clone()),
recover_latency: histogram::Timed::new(recover_latency, clock),
Expand Down Expand Up @@ -397,12 +412,19 @@ impl<
}

// Add the vote to the verifier
let peer = Peer::new(&sender);
if work
.entry(view)
.or_insert_with(|| self.new_round())
.add_network(sender, message)
.await {
self.added.inc();

// Update per-peer latest vote metric (only if higher than current)
let _ = self
.latest_vote
.get_or_create(&peer)
.try_set_max(view.get());
Copy link
Contributor Author

@patrick-ogrady patrick-ogrady Jan 6, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opted to move this into post-network validation to avoid duplicating its checks (to ensure, for example, that a peer is in the participant set — thank you, bugbot).

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This shouldn't happen with our current p2p but we should be defensive.

}
updated_view = view;
},
Expand Down
234 changes: 234 additions & 0 deletions consensus/src/simplex/actors/batcher/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1402,4 +1402,238 @@ mod tests {
votes_skipped_for_finalized_views(ed25519::fixture);
votes_skipped_for_finalized_views(secp256r1::fixture);
}

/// Verifies end-to-end tracking of the per-peer `latest_vote` gauge:
/// - every participant starts at 0 when the batcher is constructed,
/// - the gauge advances to the view of each vote the batcher accepts,
/// - votes arriving after a certificate has been formed are still tracked,
/// - the gauge never decreases when a lower-view vote arrives.
fn latest_vote_metric_tracking<S, F>(mut fixture: F)
where
    S: Scheme<Sha256Digest, PublicKey = PublicKey>,
    F: FnMut(&mut deterministic::Context, &[u8], u32) -> Fixture<S>,
{
    let n = 5;
    let quorum_size = quorum(n) as usize;
    let namespace = b"batcher_test".to_vec();
    let epoch = Epoch::new(333);
    let executor = deterministic::Runner::timed(Duration::from_secs(10));
    executor.start(|mut context| async move {
        // Create simulated network
        let (network, mut oracle) = Network::new(
            context.with_label("network"),
            NConfig {
                max_size: 1024 * 1024,
                disconnect_on_block: true,
                tracked_peer_sets: None,
            },
        );
        network.start();

        // Get participants
        let Fixture {
            participants,
            schemes,
            ..
        } = fixture(&mut context, &namespace, n);

        // Setup reporter mock
        let reporter_cfg = mocks::reporter::Config {
            participants: schemes[0].participants().clone(),
            scheme: schemes[0].clone(),
            elector: <RoundRobin>::default(),
        };
        let reporter =
            mocks::reporter::Reporter::new(context.with_label("reporter"), reporter_cfg);

        // Initialize batcher actor (participant 0)
        let me = participants[0].clone();
        let batcher_context = context.with_label("batcher");
        let batcher_cfg = Config {
            scheme: schemes[0].clone(),
            blocker: oracle.control(me.clone()),
            reporter: reporter.clone(),
            activity_timeout: ViewDelta::new(10),
            skip_timeout: ViewDelta::new(5),
            epoch,
            mailbox_size: 128,
        };
        let (batcher, mut batcher_mailbox) = Actor::new(batcher_context.clone(), batcher_cfg);

        // Verify all participants are initialized to view 0 in the metric
        let buffer = batcher_context.encode();
        for participant in &participants {
            let expected = format!("latest_vote{{peer=\"{}\"}} 0", participant);
            assert!(
                buffer.contains(&expected),
                "Expected metric for participant {} to be initialized to 0, got: {}",
                participant,
                buffer
            );
        }

        // Create voter mailbox for batcher to send to
        let (voter_sender, mut voter_receiver) =
            mpsc::channel::<voter::Message<S, Sha256Digest>>(1024);
        let voter_mailbox = voter::Mailbox::new(voter_sender);

        // Channel 0 carries votes, channel 1 carries certificates.
        let (_vote_sender, vote_receiver) = oracle
            .control(me.clone())
            .register(0, TEST_QUOTA)
            .await
            .unwrap();
        let (_certificate_sender, certificate_receiver) = oracle
            .control(me.clone())
            .register(1, TEST_QUOTA)
            .await
            .unwrap();

        // Register participants on the network and set up links
        let link = Link {
            latency: Duration::from_millis(1),
            jitter: Duration::from_millis(0),
            success_rate: 1.0,
        };
        let mut participant_senders = Vec::new();
        for (i, pk) in participants.iter().enumerate() {
            if i == 0 {
                // Keep indices aligned with `participants`: slot 0 is us and never sends.
                participant_senders.push(None);
                continue;
            }
            let (sender, _receiver) = oracle
                .control(pk.clone())
                .register(0, TEST_QUOTA)
                .await
                .unwrap();
            oracle
                .add_link(pk.clone(), me.clone(), link.clone())
                .await
                .unwrap();
            participant_senders.push(Some(sender));
        }

        // Start the batcher
        batcher.start(voter_mailbox, vote_receiver, certificate_receiver);

        // Initialize batcher with view 5, participant 1 as leader
        let view = View::new(5);
        let leader = 1u32;
        let active = batcher_mailbox.update(view, leader, View::zero()).await;
        assert!(active);

        // Build proposal and send enough votes to reach quorum
        let round = Round::new(epoch, view);
        let proposal = Proposal::new(round, View::zero(), Sha256::hash(b"test_payload"));

        // Send votes from participants 1 through quorum_size-1 (excluding 0, our own)
        for i in 1..quorum_size {
            let vote = Notarize::sign(&schemes[i], proposal.clone()).unwrap();
            if let Some(ref mut sender) = participant_senders[i] {
                sender
                    .send(
                        Recipients::One(me.clone()),
                        Vote::Notarize(vote).encode(),
                        true,
                    )
                    .await
                    .unwrap();
            }
        }

        // Send our own vote to complete the quorum
        let our_vote = Notarize::sign(&schemes[0], proposal.clone()).unwrap();
        batcher_mailbox
            .constructed(Vote::Notarize(our_vote))
            .await;

        // Give network time to deliver and batcher time to process and construct certificate
        context.sleep(Duration::from_millis(100)).await;

        // Receive proposal and certificate
        loop {
            let output = voter_receiver.next().await.unwrap();
            match output {
                voter::Message::Proposal(_) => continue,
                voter::Message::Verified(Certificate::Notarization(n), _) => {
                    assert_eq!(n.view(), view, "Should construct notarization");
                    break;
                }
                _ => panic!("Unexpected message type"),
            }
        }

        // Verify votes were tracked for participants who voted
        let buffer = batcher_context.encode();
        for (i, participant) in participants.iter().enumerate().take(quorum_size).skip(1) {
            let expected = format!("latest_vote{{peer=\"{}\"}} 5", participant);
            assert!(
                buffer.contains(&expected),
                "Expected participant {} to have latest_vote=5, got: {}",
                i,
                buffer
            );
        }

        // Now send a vote from a participant who hasn't voted yet (after quorum)
        // This tests that votes are still tracked even after certificate construction
        let late_voter = quorum_size;
        let late_vote = Notarize::sign(&schemes[late_voter], proposal.clone()).unwrap();
        if let Some(ref mut sender) = participant_senders[late_voter] {
            sender
                .send(
                    Recipients::One(me.clone()),
                    Vote::Notarize(late_vote).encode(),
                    true,
                )
                .await
                .unwrap();
        }

        // Give network time to deliver
        context.sleep(Duration::from_millis(100)).await;

        // Verify the late vote was still tracked
        let buffer = batcher_context.encode();
        let expected_late = format!("latest_vote{{peer=\"{}\"}} 5", participants[late_voter]);
        assert!(
            buffer.contains(&expected_late),
            "Expected late voter (participant {}) to have latest_vote=5 even after quorum, got: {}",
            late_voter,
            buffer
        );

        // Send a vote for a LOWER view (view 3) from participant 1 who already voted at view 5
        // to verify the metric doesn't decrease
        let view3 = View::new(3);
        let round3 = Round::new(epoch, view3);
        let proposal3 = Proposal::new(round3, View::zero(), Sha256::hash(b"payload3"));
        let vote_v3 = Notarize::sign(&schemes[1], proposal3).unwrap();
        if let Some(ref mut sender) = participant_senders[1] {
            sender
                .send(
                    Recipients::One(me.clone()),
                    Vote::Notarize(vote_v3).encode(),
                    true,
                )
                .await
                .unwrap();
        }

        context.sleep(Duration::from_millis(100)).await;

        // Verify participant 1 STILL has latest_vote = 5 (not decreased to 3)
        let buffer = batcher_context.encode();
        let expected_v5 = format!("latest_vote{{peer=\"{}\"}} 5", participants[1]);
        assert!(
            buffer.contains(&expected_v5),
            "Expected participant 1 to still have latest_vote=5 after receiving lower view vote, got: {}",
            buffer
        );
    });
}

// Runs the latest_vote metric test against every supported signing scheme
// (BLS threshold/multisig in both MinPk and MinSig variants, ed25519, secp256r1).
#[test_traced]
fn test_latest_vote_metric_tracking() {
    latest_vote_metric_tracking(bls12381_threshold::fixture::<MinPk, _>);
    latest_vote_metric_tracking(bls12381_threshold::fixture::<MinSig, _>);
    latest_vote_metric_tracking(bls12381_multisig::fixture::<MinPk, _>);
    latest_vote_metric_tracking(bls12381_multisig::fixture::<MinSig, _>);
    latest_vote_metric_tracking(ed25519::fixture);
    latest_vote_metric_tracking(secp256r1::fixture);
}
}
13 changes: 13 additions & 0 deletions consensus/src/simplex/metrics.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,19 @@
use commonware_utils::Array;
use prometheus_client::encoding::{EncodeLabelSet, EncodeLabelValue};

/// Metric label set identifying a single peer (e.g. for per-peer gauges
/// such as `latest_vote`).
#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
pub struct Peer {
    /// String rendering of the peer's public key, emitted as the `peer` label value.
    pub peer: String,
}

impl Peer {
    /// Builds a [`Peer`] label set from any array-like public key by
    /// rendering it as a string.
    pub fn new(peer: &impl Array) -> Self {
        let peer = peer.to_string();
        Self { peer }
    }
}

#[derive(Clone, Debug, Hash, PartialEq, Eq, EncodeLabelValue)]
pub enum MessageType {
Notarize,
Expand Down
10 changes: 10 additions & 0 deletions runtime/src/telemetry/metrics/status.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use prometheus_client::{
encoding::{EncodeLabelSet, EncodeLabelValue},
metrics::{counter::Counter as DefaultCounter, family::Family, gauge::Gauge},
};
use std::sync::atomic::Ordering;

/// Metric label that indicates status.
#[derive(Clone, Copy, Debug, Hash, PartialEq, Eq, EncodeLabelSet)]
Expand Down Expand Up @@ -87,6 +88,10 @@ impl Drop for CounterGuard {
pub trait GaugeExt {
    /// Sets the [`Gauge`] using a value convertible to `i64`, if conversion is not lossy, returning the previous value if successful.
    fn try_set<T: TryInto<i64>>(&self, val: T) -> Result<i64, T::Error>;

    /// Atomically raises the [`Gauge`] to the maximum of its current value and `val`.
    /// Returns the previous value on success; if `val` does not fit in an `i64`, the
    /// conversion error is returned and the gauge is left unchanged.
    fn try_set_max<T: TryInto<i64> + Copy>(&self, val: T) -> Result<i64, T::Error>;
}

impl GaugeExt for Gauge {
Expand All @@ -96,4 +101,9 @@ impl GaugeExt for Gauge {
let out = self.set(val);
Ok(out)
}

fn try_set_max<T: TryInto<i64> + Copy>(&self, val: T) -> Result<i64, T::Error> {
    // Convert first so a lossy value bails out before touching the gauge.
    let candidate: i64 = val.try_into()?;
    // fetch_max atomically stores max(current, candidate) and yields the prior value.
    let previous = self.inner().fetch_max(candidate, Ordering::Relaxed);
    Ok(previous)
}
}
Loading