Skip to content
3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions application/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ alloy-eips.workspace = true
alloy-primitives.workspace = true
governor.workspace = true
rand.workspace = true
tokio-util.workspace = true
alloy-transport-ipc.workspace = true

# For metrics - activate with `prom` feature
Expand Down
18 changes: 17 additions & 1 deletion application/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ use futures::{
future::{self, Either, try_join},
};
use rand::Rng;
use tokio_util::sync::CancellationToken;

use commonware_consensus::simplex::types::View;
use futures::task::{Context, Poll};
Expand Down Expand Up @@ -59,6 +60,7 @@ pub struct Actor<
engine_client: C,
built_block: Arc<Mutex<Option<Block>>>,
genesis_hash: [u8; 32],
cancellation_token: CancellationToken,
}

impl<R: Storage + Metrics + Clock + Spawner + governor::clock::Clock + Rng, C: EngineClient>
Expand All @@ -76,6 +78,7 @@ impl<R: Storage + Metrics + Clock + Spawner + governor::clock::Clock + Rng, C: E
engine_client: cfg.engine_client,
built_block: Arc::new(Mutex::new(None)),
genesis_hash,
cancellation_token: cfg.cancellation_token,
},
Mailbox::new(tx),
)
Expand All @@ -88,6 +91,7 @@ impl<R: Storage + Metrics + Clock + Spawner + governor::clock::Clock + Rng, C: E
pub async fn run(mut self, mut syncer: SyncerMailbox, mut finalizer: FinalizerMailbox) {
let rand_id: u8 = rand::random();
let mut signal = self.context.stopped().fuse();
let cancellation_token = self.cancellation_token.clone();
loop {
select! {
message = self.mailbox.next() => {
Expand Down Expand Up @@ -195,8 +199,12 @@ impl<R: Storage + Metrics + Clock + Spawner + governor::clock::Clock + Rng, C: E
}
}
},
_ = cancellation_token.cancelled() => {
info!("application received cancellation signal, exiting");
break;
},
sig = &mut signal => {
info!("application terminated: {}", sig.unwrap());
info!("runtime terminated, shutting down application: {}", sig.unwrap());
break;
}
}
Expand Down Expand Up @@ -346,6 +354,14 @@ impl<R: Storage + Metrics + Clock + Spawner + governor::clock::Clock + Rng, C: E
}
}

impl<R: Storage + Metrics + Clock + Spawner + governor::clock::Clock + Rng, C: EngineClient> Drop
for Actor<R, C>
{
fn drop(&mut self) {
self.cancellation_token.cancel();
}
}

fn handle_verify(block: &Block, parent: Block) -> bool {
if block.eth_parent_hash() != parent.eth_block_hash() {
return false;
Expand Down
3 changes: 3 additions & 0 deletions application/src/config.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
use summit_types::EngineClient;
use tokio_util::sync::CancellationToken;

#[derive(Clone)]
pub struct ApplicationConfig<C: EngineClient> {
Expand All @@ -11,4 +12,6 @@ pub struct ApplicationConfig<C: EngineClient> {
pub partition_prefix: String,

pub genesis_hash: [u8; 32],

pub cancellation_token: CancellationToken,
}
4 changes: 4 additions & 0 deletions example_genesis.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,19 @@ namespace = "_SEISMIC_BFT"
[[validators]]
public_key = "1be3cb06d7cc347602421fb73838534e4b54934e28959de98906d120d0799ef2"
ip_address = "127.0.0.1:26600"
withdrawal_credentials = "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266"

[[validators]]
public_key = "32efa16e3cd62292db529e8f4babd27724b13b397edcf2b1dbe48f416ce40f0d"
ip_address = "127.0.0.1:26610"
withdrawal_credentials = "0x70997970C51812dc3A010C7d01b50e0d17dc79C8"

[[validators]]
public_key = "ce9b314ac9d55d28bedf543164120eecf737380015c977eaa78d59894bbccf52"
ip_address = "127.0.0.1:26620"
withdrawal_credentials = "0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC"

[[validators]]
public_key = "f205c8c88d5d1753843dd0fc9810390efd00d6f752dd555c0ad4000bfcac2226"
ip_address = "127.0.0.1:26630"
withdrawal_credentials = "0x90F79bf6EB2c4f870365E785982E1f101E93b906"
1 change: 1 addition & 0 deletions finalizer/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ bytes.workspace = true
futures.workspace = true
governor.workspace = true
rand.workspace = true
tokio-util.workspace = true
tracing.workspace = true


Expand Down
55 changes: 53 additions & 2 deletions finalizer/src/actor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use summit_types::registry::Registry;
use summit_types::utils::{is_last_block_of_epoch, is_penultimate_block_of_epoch};
use summit_types::{Block, BlockAuxData, Digest, FinalizedHeader, PublicKey, Signature};
use summit_types::{BlockEnvelope, EngineClient, consensus_state::ConsensusState};
use tokio_util::sync::CancellationToken;
use tracing::{info, warn};

const WRITE_BUFFER: NonZero<usize> = NZUsize!(1024 * 1024);
Expand All @@ -56,6 +57,9 @@ pub struct Finalizer<
validator_withdrawal_period: u64, // in blocks
validator_onboarding_limit_per_block: usize,
oracle: O,
public_key: PublicKey,
validator_exit: bool,
cancellation_token: CancellationToken,
}

impl<
Expand Down Expand Up @@ -106,6 +110,9 @@ impl<
validator_minimum_stake: cfg.validator_minimum_stake,
validator_withdrawal_period: cfg.validator_withdrawal_period,
validator_onboarding_limit_per_block: cfg.validator_onboarding_limit_per_block,
public_key: cfg.public_key,
validator_exit: false,
cancellation_token: cfg.cancellation_token,
},
FinalizerMailbox::new(tx),
)
Expand Down Expand Up @@ -138,7 +145,15 @@ impl<

let mut last_committed_timestamp: Option<Instant> = None;
let mut signal = self.context.stopped().fuse();
let cancellation_token = self.cancellation_token.clone();

loop {
if self.validator_exit {
// If the validator was removed from the committee, trigger coordinated shutdown
info!("Validator no longer on the committee, shutting down");
self.cancellation_token.cancel();
break;
}
select! {
msg = rx_finalize_blocks.next() => {
let Some((envelope, notifier)) = msg else {
Expand Down Expand Up @@ -168,8 +183,12 @@ impl<
},
}
}
_ = cancellation_token.cancelled().fuse() => {
info!("finalizer received cancellation signal, exiting");
break;
},
sig = &mut signal => {
info!("finalizer terminated: {}", sig.unwrap());
info!("runtime terminated, shutting down finalizer: {}", sig.unwrap());
break;
}
}
Expand Down Expand Up @@ -341,6 +360,17 @@ impl<
account.status = ValidatorStatus::Active;
}

// If the node's public key is contained in the removed validator list,
// trigger an exit
if self
.state
.removed_validators
.iter()
.any(|pk| pk == &self.public_key)
{
self.validator_exit = true;
}

// TODO(matthias): remove keys in removed_validators from state or set inactive?
self.registry.update_registry(
// We add a delta to the view because the views are initialized with fixed-size
Expand Down Expand Up @@ -461,7 +491,6 @@ impl<
{
continue; // Skip this withdrawal request
}

// If after this withdrawal the validator balance would be less than the
// minimum stake, then the full validator balance is withdrawn.
if account.balance
Expand Down Expand Up @@ -713,6 +742,28 @@ impl<
let height = self.state.get_latest_height();
let _ = sender.send(ConsensusStateResponse::LatestHeight(height));
}
ConsensusStateRequest::GetValidatorBalance(public_key) => {
let mut key_bytes = [0u8; 32];
key_bytes.copy_from_slice(&public_key);

let balance = self
.state
.validator_accounts
.get(&key_bytes)
.map(|account| account.balance);
let _ = sender.send(ConsensusStateResponse::ValidatorBalance(balance));
}
}
}
}

impl<
R: Storage + Metrics + Clock + Spawner + governor::clock::Clock + Rng,
C: EngineClient,
O: NetworkOracle<PublicKey>,
> Drop for Finalizer<R, C, O>
{
fn drop(&mut self) {
self.cancellation_token.cancel();
}
}
4 changes: 4 additions & 0 deletions finalizer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use commonware_runtime::buffer::PoolRef;
use summit_types::network_oracle::NetworkOracle;
use summit_types::registry::Registry;
use summit_types::{EngineClient, PublicKey, consensus_state::ConsensusState};
use tokio_util::sync::CancellationToken;

pub struct FinalizerConfig<C: EngineClient, O: NetworkOracle<PublicKey>> {
pub mailbox_size: usize,
Expand All @@ -21,4 +22,7 @@ pub struct FinalizerConfig<C: EngineClient, O: NetworkOracle<PublicKey>> {
pub initial_state: ConsensusState,
/// Protocol version for the consensus protocol
pub protocol_version: u32,
/// The node's own public key
pub public_key: PublicKey,
pub cancellation_token: CancellationToken,
}
21 changes: 20 additions & 1 deletion finalizer/src/ingress.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,12 @@ use futures::{
channel::{mpsc, oneshot},
};
use summit_types::{
BlockAuxData,
BlockAuxData, PublicKey,
checkpoint::Checkpoint,
consensus_state_query::{ConsensusStateRequest, ConsensusStateResponse},
};

#[allow(clippy::large_enum_variant)]
pub enum FinalizerMessage {
NotifyAtHeight {
height: u64,
Expand Down Expand Up @@ -89,4 +90,22 @@ impl FinalizerMailbox {
};
height
}

pub async fn get_validator_balance(&self, public_key: PublicKey) -> Option<u64> {
let (response, rx) = oneshot::channel();
let request = ConsensusStateRequest::GetValidatorBalance(public_key);
let _ = self
.sender
.clone()
.send(FinalizerMessage::QueryState { request, response })
.await;

let res = rx
.await
.expect("consensus state query response sender dropped");
let ConsensusStateResponse::ValidatorBalance(balance) = res else {
unreachable!("request and response variants must match");
};
balance
}
}
5 changes: 5 additions & 0 deletions node/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,11 @@ name = "stake-and-checkpoint"
path = "src/bin/stake_and_checkpoint.rs"
required-features = ["e2e"]

[[bin]]
name = "withdraw-and-exit"
path = "src/bin/withdraw_and_exit.rs"
required-features = ["e2e"]

[[bin]]
name = "block-fetcher"
path = "src/bin/block_fetcher.rs"
Expand Down
Loading