diff --git a/Cargo.lock b/Cargo.lock index 0656bdc..211e4f8 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5586,6 +5586,7 @@ dependencies = [ "summit-finalizer", "summit-syncer", "summit-types", + "tokio-util", "tracing", ] @@ -5611,6 +5612,7 @@ dependencies = [ "rand 0.8.5", "summit-syncer", "summit-types", + "tokio-util", "tracing", ] @@ -5655,6 +5657,7 @@ dependencies = [ "metrics", "rand 0.8.5", "summit-types", + "tokio-util", "tracing", ] diff --git a/application/Cargo.toml b/application/Cargo.toml index eefe5e2..6ce9a5b 100644 --- a/application/Cargo.toml +++ b/application/Cargo.toml @@ -24,6 +24,7 @@ alloy-eips.workspace = true alloy-primitives.workspace = true governor.workspace = true rand.workspace = true +tokio-util.workspace = true alloy-transport-ipc.workspace = true # For metrics - activate with `prom` feature diff --git a/application/src/actor.rs b/application/src/actor.rs index d3f067d..2275658 100644 --- a/application/src/actor.rs +++ b/application/src/actor.rs @@ -12,6 +12,7 @@ use futures::{ future::{self, Either, try_join}, }; use rand::Rng; +use tokio_util::sync::CancellationToken; use commonware_consensus::simplex::types::View; use futures::task::{Context, Poll}; @@ -59,6 +60,7 @@ pub struct Actor< engine_client: C, built_block: Arc>>, genesis_hash: [u8; 32], + cancellation_token: CancellationToken, } impl @@ -76,6 +78,7 @@ impl { @@ -195,8 +199,12 @@ impl { + info!("application received cancellation signal, exiting"); + break; + }, sig = &mut signal => { - info!("application terminated: {}", sig.unwrap()); + info!("runtime terminated, shutting down application: {}", sig.unwrap()); break; } } @@ -346,6 +354,14 @@ impl Drop + for Actor +{ + fn drop(&mut self) { + self.cancellation_token.cancel(); + } +} + fn handle_verify(block: &Block, parent: Block) -> bool { if block.eth_parent_hash() != parent.eth_block_hash() { return false; diff --git a/application/src/config.rs b/application/src/config.rs index 9f18f03..1d59639 100644 --- a/application/src/config.rs +++ b/application/src/config.rs @@ -1,4 +1,5 @@ use summit_types::EngineClient; +use tokio_util::sync::CancellationToken; #[derive(Clone)] pub struct ApplicationConfig { @@ -11,4 +12,6 @@ pub struct ApplicationConfig { pub partition_prefix: String, pub genesis_hash: [u8; 32], + + pub cancellation_token: CancellationToken, } diff --git a/example_genesis.toml b/example_genesis.toml index 0f5441f..fd1cdc4 100644 --- a/example_genesis.toml +++ b/example_genesis.toml @@ -10,15 +10,19 @@ namespace = "_SEISMIC_BFT" [[validators]] public_key = "1be3cb06d7cc347602421fb73838534e4b54934e28959de98906d120d0799ef2" ip_address = "127.0.0.1:26600" +withdrawal_credentials = "0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266" [[validators]] public_key = "32efa16e3cd62292db529e8f4babd27724b13b397edcf2b1dbe48f416ce40f0d" ip_address = "127.0.0.1:26610" +withdrawal_credentials = "0x70997970C51812dc3A010C7d01b50e0d17dc79C8" [[validators]] public_key = "ce9b314ac9d55d28bedf543164120eecf737380015c977eaa78d59894bbccf52" ip_address = "127.0.0.1:26620" +withdrawal_credentials = "0x3C44CdDdB6a900fa2b585dd299e03d12FA4293BC" [[validators]] public_key = "f205c8c88d5d1753843dd0fc9810390efd00d6f752dd555c0ad4000bfcac2226" ip_address = "127.0.0.1:26630" +withdrawal_credentials = "0x90F79bf6EB2c4f870365E785982E1f101E93b906" diff --git a/finalizer/Cargo.toml b/finalizer/Cargo.toml index 5a4aab1..3eaf076 100644 --- a/finalizer/Cargo.toml +++ b/finalizer/Cargo.toml @@ -22,6 +22,7 @@ bytes.workspace = true futures.workspace = true governor.workspace = true rand.workspace = true +tokio-util.workspace = true tracing.workspace = true diff --git a/finalizer/src/actor.rs b/finalizer/src/actor.rs index d9680e5..cd23a7d 100644 --- a/finalizer/src/actor.rs +++ b/finalizer/src/actor.rs @@ -31,6 +31,7 @@ use summit_types::registry::Registry; use summit_types::utils::{is_last_block_of_epoch, is_penultimate_block_of_epoch}; use summit_types::{Block, BlockAuxData, Digest, FinalizedHeader, PublicKey, Signature}; use summit_types::{BlockEnvelope, EngineClient, consensus_state::ConsensusState}; +use tokio_util::sync::CancellationToken; use tracing::{info, warn}; const WRITE_BUFFER: NonZero = NZUsize!(1024 * 1024); @@ -56,6 +57,9 @@ pub struct Finalizer< validator_withdrawal_period: u64, // in blocks validator_onboarding_limit_per_block: usize, oracle: O, + public_key: PublicKey, + validator_exit: bool, + cancellation_token: CancellationToken, } impl< @@ -106,6 +110,9 @@ impl< validator_minimum_stake: cfg.validator_minimum_stake, validator_withdrawal_period: cfg.validator_withdrawal_period, validator_onboarding_limit_per_block: cfg.validator_onboarding_limit_per_block, + public_key: cfg.public_key, + validator_exit: false, + cancellation_token: cfg.cancellation_token, }, FinalizerMailbox::new(tx), ) @@ -138,7 +145,15 @@ impl< let mut last_committed_timestamp: Option = None; let mut signal = self.context.stopped().fuse(); + let cancellation_token = self.cancellation_token.clone(); + loop { + if self.validator_exit { + // If the validator was removed from the committee, trigger coordinated shutdown + info!("Validator no longer on the committee, shutting down"); + self.cancellation_token.cancel(); + break; + } select! { msg = rx_finalize_blocks.next() => { let Some((envelope, notifier)) = msg else { @@ -168,8 +183,12 @@ impl< }, } } + _ = cancellation_token.cancelled().fuse() => { + info!("finalizer received cancellation signal, exiting"); + break; + }, sig = &mut signal => { - info!("finalizer terminated: {}", sig.unwrap()); + info!("runtime terminated, shutting down finalizer: {}", sig.unwrap()); break; } } @@ -341,6 +360,17 @@ impl< account.status = ValidatorStatus::Active; } + // If the node's public key is contained in the removed validator list, + // trigger an exit + if self + .state + .removed_validators + .iter() + .any(|pk| pk == &self.public_key) + { + self.validator_exit = true; + } + // TODO(matthias): remove keys in removed_validators from state or set inactive? self.registry.update_registry( // We add a delta to the view because the views are initialized with fixed-size @@ -461,7 +491,6 @@ impl< { continue; // Skip this withdrawal request } - // If after this withdrawal the validator balance would be less than the // minimum stake, then the full validator balance is withdrawn. if account.balance @@ -713,6 +742,28 @@ impl< let height = self.state.get_latest_height(); let _ = sender.send(ConsensusStateResponse::LatestHeight(height)); } + ConsensusStateRequest::GetValidatorBalance(public_key) => { + let mut key_bytes = [0u8; 32]; + key_bytes.copy_from_slice(&public_key); + + let balance = self + .state + .validator_accounts + .get(&key_bytes) + .map(|account| account.balance); + let _ = sender.send(ConsensusStateResponse::ValidatorBalance(balance)); + } } } } + +impl< + R: Storage + Metrics + Clock + Spawner + governor::clock::Clock + Rng, + C: EngineClient, + O: NetworkOracle, +> Drop for Finalizer +{ + fn drop(&mut self) { + self.cancellation_token.cancel(); + } +} diff --git a/finalizer/src/config.rs b/finalizer/src/config.rs index 351170d..d64718e 100644 --- a/finalizer/src/config.rs +++ b/finalizer/src/config.rs @@ -2,6 +2,7 @@ use commonware_runtime::buffer::PoolRef; use summit_types::network_oracle::NetworkOracle; use summit_types::registry::Registry; use summit_types::{EngineClient, PublicKey, consensus_state::ConsensusState}; +use tokio_util::sync::CancellationToken; pub struct FinalizerConfig> { pub mailbox_size: usize, @@ -21,4 +22,7 @@ pub struct FinalizerConfig> { pub initial_state: ConsensusState, /// Protocol version for the consensus protocol pub protocol_version: u32, + /// The node's own public key + pub public_key: PublicKey, + pub cancellation_token: CancellationToken, } diff --git a/finalizer/src/ingress.rs b/finalizer/src/ingress.rs index 85a99f5..7cd7bc6 100644 --- a/finalizer/src/ingress.rs +++ b/finalizer/src/ingress.rs @@ -3,11 +3,12 @@ use futures::{ channel::{mpsc, oneshot}, }; use summit_types::{ - BlockAuxData, + BlockAuxData, PublicKey, checkpoint::Checkpoint, consensus_state_query::{ConsensusStateRequest, ConsensusStateResponse}, }; +#[allow(clippy::large_enum_variant)] pub enum FinalizerMessage { NotifyAtHeight { height: u64, @@ -89,4 +90,22 @@ impl FinalizerMailbox { }; height } + + pub async fn get_validator_balance(&self, public_key: PublicKey) -> Option { + let (response, rx) = oneshot::channel(); + let request = ConsensusStateRequest::GetValidatorBalance(public_key); + let _ = self + .sender + .clone() + .send(FinalizerMessage::QueryState { request, response }) + .await; + + let res = rx + .await + .expect("consensus state query response sender dropped"); + let ConsensusStateResponse::ValidatorBalance(balance) = res else { + unreachable!("request and response variants must match"); + }; + balance + } } diff --git a/node/Cargo.toml b/node/Cargo.toml index 5e67b04..fbea0f9 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -16,6 +16,11 @@ name = "stake-and-checkpoint" path = "src/bin/stake_and_checkpoint.rs" required-features = ["e2e"] +[[bin]] +name = "withdraw-and-exit" +path = "src/bin/withdraw_and_exit.rs" +required-features = ["e2e"] + [[bin]] name = "block-fetcher" path = "src/bin/block_fetcher.rs" diff --git a/node/src/args.rs b/node/src/args.rs index 7472eb1..09acaf7 100644 --- a/node/src/args.rs +++ b/node/src/args.rs @@ -196,7 +196,7 @@ impl Command { let genesis = Genesis::load_from_file(&flags.genesis_path).expect("Can not find genesis file"); - let mut committee: Vec<(PublicKey, SocketAddr)> = genesis + let mut committee: Vec<(PublicKey, SocketAddr, Address)> = genesis .validators .iter() .map(|v| v.try_into().expect("Invalid validator in genesis")) @@ -269,10 +269,17 @@ impl Command { .expect("This node is not on the committee") }; + let mut network_committee: Vec<(PublicKey, SocketAddr)> = committee + .into_iter() + .map(|(key, ip, _)| (key, ip)) + .collect(); let our_public_key = signer.public_key(); - if !committee.iter().any(|(key, _)| key == &our_public_key) { - committee.push((our_public_key, our_ip)); - committee.sort(); + if !network_committee + .iter() + .any(|(key, _)| key == &our_public_key) + { + network_committee.push((our_public_key, our_ip)); + network_committee.sort(); } // Configure telemetry @@ -314,7 +321,7 @@ impl Command { genesis.namespace.as_bytes(), SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), flags.port), our_ip, - committee.clone(), + network_committee.clone(), genesis.max_message_size_bytes as usize, ); p2p_cfg.mailbox_size = MAILBOX_SIZE; @@ -418,7 +425,7 @@ pub fn run_node_with_runtime( let genesis = Genesis::load_from_file(&flags.genesis_path).expect("Can not find genesis file"); - let mut committee: Vec<(PublicKey, SocketAddr)> = genesis + let mut committee: Vec<(PublicKey, SocketAddr, Address)> = genesis .validators .iter() .map(|v| v.try_into().expect("Invalid validator in genesis")) @@ -488,10 +495,17 @@ pub fn run_node_with_runtime( .expect("This node is not on the committee") }; + let mut network_committee: Vec<(PublicKey, SocketAddr)> = committee + .into_iter() + .map(|(key, ip, _)| (key, ip)) + .collect(); let our_public_key = signer.public_key(); - if !committee.iter().any(|(key, _)| key == &our_public_key) { - committee.push((our_public_key, our_ip)); - committee.sort(); + if !network_committee + .iter() + .any(|(key, _)| key == &our_public_key) + { + network_committee.push((our_public_key, our_ip)); + network_committee.sort(); } // configure network @@ -500,7 +514,7 @@ pub fn run_node_with_runtime( genesis.namespace.as_bytes(), SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), flags.port), our_ip, - committee, + network_committee, genesis.max_message_size_bytes as usize, ); p2p_cfg.mailbox_size = MAILBOX_SIZE; @@ -589,7 +603,7 @@ pub fn run_node_with_runtime( fn get_initial_state( genesis: &Genesis, - committee: &Vec<(PublicKey, SocketAddr)>, + committee: &Vec<(PublicKey, SocketAddr, Address)>, checkpoint: Option, ) -> ConsensusState { let genesis_hash: [u8; 32] = from_hex_formatted(&genesis.eth_genesis_hash) @@ -605,14 +619,14 @@ fn get_initial_state( }; let mut state = ConsensusState::new(forkchoice); // Add the genesis nodes to the consensus state with the minimum stake balance. - for (pubkey, _) in committee { + for (pubkey, _, address) in committee { let pubkey_bytes: [u8; 32] = pubkey .as_ref() .try_into() .expect("Public key must be 32 bytes"); let account = ValidatorAccount { // TODO(matthias): we have to add a withdrawal address to the genesis - withdrawal_credentials: Address::ZERO, + withdrawal_credentials: *address, balance: VALIDATOR_MINIMUM_STAKE, pending_withdrawal_amount: 0, status: ValidatorStatus::Active, diff --git a/node/src/bin/stake_and_checkpoint.rs b/node/src/bin/stake_and_checkpoint.rs index 1cb8baf..266cfe0 100644 --- a/node/src/bin/stake_and_checkpoint.rs +++ b/node/src/bin/stake_and_checkpoint.rs @@ -314,7 +314,7 @@ fn main() -> Result<(), Box> { println!("Error querying height: {}", e); } } - context.sleep(std::time::Duration::from_secs(1)).await; + context.sleep(Duration::from_secs(1)).await; } // Retrieve checkpoint from first node diff --git a/node/src/bin/withdraw_and_exit.rs b/node/src/bin/withdraw_and_exit.rs new file mode 100644 index 0000000..c1bd21d --- /dev/null +++ b/node/src/bin/withdraw_and_exit.rs @@ -0,0 +1,399 @@ +/* +This bin will start 4 reth nodes with an instance of consensus for each and keep running so you can run other tests or submit transactions + +Their rpc endpoints are localhost:8545-node_number +node0_port = 8545 +node1_port = 8544 +... +node3_port = 8542 + + +*/ +use alloy::network::{EthereumWallet, TransactionBuilder}; +use alloy::providers::{Provider, ProviderBuilder, WalletProvider}; +use alloy::rpc::types::TransactionRequest; +use alloy::signers::local::PrivateKeySigner; +use alloy_primitives::{Address, U256}; +use clap::Parser; +use commonware_codec::DecodeExt; +use commonware_runtime::{Clock, Metrics as _, Runner as _, Spawner as _, tokio as cw_tokio}; +use commonware_utils::from_hex_formatted; +use futures::{FutureExt, pin_mut}; +use std::collections::VecDeque; +use std::time::Duration; +use std::{ + fs, + io::{BufRead as _, BufReader, Write as _}, + net::{IpAddr, Ipv4Addr, SocketAddr}, + path::PathBuf, + str::FromStr as _, + thread::JoinHandle, +}; +use summit::args::{RunFlags, run_node_with_runtime}; +use summit::engine::{EPOCH_NUM_BLOCKS, VALIDATOR_MINIMUM_STAKE}; +use summit_types::PublicKey; +use summit_types::reth::Reth; +use tokio::sync::mpsc; +use tracing::Level; + +const NUM_NODES: u16 = 4; + +#[allow(unused)] +struct NodeRuntime { + thread: JoinHandle<()>, + stop_tx: mpsc::UnboundedSender<()>, +} + +#[derive(Parser, Debug)] +struct Args { + /// Path to the directory containing historical blocks for benchmarking + #[cfg(any(feature = "base-bench", feature = "bench"))] + #[arg(long)] + pub bench_block_dir: Option, + /// Path to the log directory + #[arg(long)] + pub log_dir: Option, + /// Path to the data directory for test + #[arg(long, default_value = "/tmp/summit_withdraw_test")] + pub data_dir: String, +} + +fn main() -> Result<(), Box> { + let args = Args::parse(); + + // Remove data_dir if it exists to start fresh + let data_dir_path = PathBuf::from(&args.data_dir); + if data_dir_path.exists() { + fs::remove_dir_all(&data_dir_path)?; + } + + // Create log directory if specified + if let Some(ref log_dir) = args.log_dir { + fs::remove_dir_all(log_dir)?; + fs::create_dir_all(log_dir)?; + } + + let storage_dir = data_dir_path.join("stores"); + + let cfg = cw_tokio::Config::default() + .with_tcp_nodelay(Some(true)) + .with_worker_threads(16) + .with_storage_directory(storage_dir) + .with_catch_panics(false); + let executor = cw_tokio::Runner::new(cfg); + + executor.start(|context| { + async move { + // Configure telemetry + let log_level = Level::from_str("info").expect("Invalid log level"); + cw_tokio::telemetry::init( + context.with_label("metrics"), + cw_tokio::telemetry::Logging { + level: log_level, + // todo: dont know what this does + json: false, + }, + Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 6969)), + None, + ); + + // Vec to hold all the join handles + let mut handles = VecDeque::new(); + let mut node_runtimes: Vec = Vec::new(); + // let mut read_threads = Vec::new(); + + // Start all nodes at the beginning + for x in 0..NUM_NODES { + // Start Reth + println!("******* STARTING RETH FOR NODE {x}"); + + // Create data directory if it doesn't exist + let data_dir = format!("{}/node{}/data/reth_db", args.data_dir, x); + fs::create_dir_all(&data_dir).expect("Failed to create data directory"); + + // Build and spawn reth instance + let reth_builder = Reth::new() + .instance(x + 1) + .keep_stdout() + // .genesis(serde_json::from_str(&genesis_str).expect("invalid genesis")) + .data_dir(data_dir) + .arg("--enclave.mock-server") + .arg("--enclave.endpoint-port") + .arg(format!("1744{x}")) + .arg("--auth-ipc") + .arg("--auth-ipc.path") + .arg(format!("/tmp/reth_engine_api{x}.ipc")) + .arg("--metrics") + .arg(format!("0.0.0.0:{}", 9001 + x)); + + let mut reth = reth_builder.spawn(); + + // Get stdout handle + let stdout = reth.stdout().expect("Failed to get stdout"); + + let log_dir = args.log_dir.clone(); + context.clone().spawn(async move |_| { + let reader = BufReader::new(stdout); + let mut log_file = log_dir.as_ref().map(|dir| { + fs::File::create(format!("{}/node{}.log", dir, x)) + .expect("Failed to create log file") + }); + + for line in reader.lines() { + match line { + Ok(line) => { + if let Some(ref mut file) = log_file { + writeln!(file, "[Node {}] {}", x, line) + .expect("Failed to write to log file"); + } + } + Err(_e) => { + // eprintln!("[Node {}] Error reading line: {}", x, e); + } + } + } + }); + + let _auth_port = reth.auth_port().unwrap(); + + println!("Node {} rpc address: {}", x, reth.http_port()); + + handles.push_back(reth); + + #[allow(unused_mut)] + let mut flags = get_node_flags(x.into()); + + #[cfg(any(feature = "base-bench", feature = "bench"))] + { + flags.bench_block_dir = args.bench_block_dir.clone(); + } + + // Start our consensus engine in its own runtime/thread + let (stop_tx, mut stop_rx) = mpsc::unbounded_channel(); + let data_dir_clone = args.data_dir.clone(); + let thread = std::thread::spawn(move || { + let storage_dir = PathBuf::from(&data_dir_clone).join("stores").join(format!("node{}", x)); + let cfg = cw_tokio::Config::default() + .with_tcp_nodelay(Some(true)) + .with_worker_threads(4) + .with_storage_directory(storage_dir) + .with_catch_panics(true); + let executor = cw_tokio::Runner::new(cfg); + + executor.start(|node_context| async move { + let node_handle = node_context.clone().spawn(|ctx| async move { + run_node_with_runtime(ctx, flags, None).await.unwrap(); + }); + + // Wait for stop signal or node completion + let stop_fut = stop_rx.recv().fuse(); + pin_mut!(stop_fut); + futures::select! { + _ = stop_fut => { + println!("Node {} received stop signal, shutting down runtime...", x); + node_context.stop(0, Some(Duration::from_secs(30))).await.unwrap(); + } + _ = node_handle.fuse() => { + println!("Node {} handle completed", x); + } + } + }); + }); + + node_runtimes.push(NodeRuntime { thread, stop_tx }); + } + + // Wait a bit for nodes to be ready + context.sleep(Duration::from_secs(2)).await; + + // Send a withdrawal transaction to one of the Reth instances + println!("Sending deposit transaction to node 1"); + let node0_http_port = handles[1].http_port(); + let node0_url = format!("http://localhost:{}", node0_http_port); + + // Create a test private key and signer + let private_key = "0x7c852118294e51e653712a81e05800f419141751be58f605c371e15141b007a6"; + let signer = PrivateKeySigner::from_str(private_key).expect("Failed to create signer"); + let wallet = EthereumWallet::from(signer); + + // Create provider with wallet + let provider = ProviderBuilder::new() + .wallet(wallet) + .connect_http(node0_url.parse().expect("Invalid URL")); + + let withdrawal_contract_address = Address::from_str("0x00000961Ef480Eb55e80D19ad83579A64c007002").unwrap(); + let pub_key_bytes = from_hex_formatted("f205c8c88d5d1753843dd0fc9810390efd00d6f752dd555c0ad4000bfcac2226").ok_or("PublicKey bad format").unwrap(); + let pub_key_bytes_ar: [u8; 32] = pub_key_bytes.try_into().unwrap(); + let _public_key = PublicKey::decode(&pub_key_bytes_ar[..]).map_err(|_| "Unable to decode Public Key").unwrap(); + let withdrawal_amount = VALIDATOR_MINIMUM_STAKE; + let withdrawal_fee = U256::from(1000000000000000u64); // 0.001 ETH fee + + // Check balance before withdrawal + let withdrawal_credentials = Address::from_str("0x90F79bf6EB2c4f870365E785982E1f101E93b906").unwrap(); + let balance_before = provider.get_balance(withdrawal_credentials).await.expect("Failed to get balance before withdrawal"); + println!("Withdrawal credentials balance before: {} wei", balance_before); + + send_withdrawal_transaction(&provider, withdrawal_contract_address, &pub_key_bytes_ar, withdrawal_amount, withdrawal_fee, 0) + .await + .expect("failed to send deposit transaction"); + + // Wait for all nodes to continue making progress + let epoch_end = EPOCH_NUM_BLOCKS; + println!( + "Waiting for all {} nodes to reach height {}", + NUM_NODES, epoch_end + ); + loop { + let mut all_ready = true; + for idx in 0..(NUM_NODES - 1) { + let rpc_port = get_node_flags(idx as usize).rpc_port; + match get_latest_height(rpc_port).await { + Ok(height) => { + if height < epoch_end { + all_ready = false; + println!("Node {} at height {}", idx, height); + } + } + Err(e) => { + all_ready = false; + println!("Node {} error: {}", idx, e); + } + } + } + if all_ready { + println!("All nodes have reached height {}", epoch_end); + break; + } + context.sleep(Duration::from_secs(2)).await; + } + + context.sleep(Duration::from_secs(3)).await; + + // Check that the balance was incremented on the execution layer (Reth) + let node0_http_port = handles[0].http_port(); + let node0_url = format!("http://localhost:{}", node0_http_port); + let node0_provider = ProviderBuilder::new().connect_http(node0_url.parse().expect("Invalid URL")); + + // Check + + let balance_after = node0_provider.get_balance(withdrawal_credentials).await.expect("Failed to get balance after withdrawal"); + println!("Withdrawal credentials balance after: {} wei", balance_after); + + // The withdrawal amount was VALIDATOR_MINIMUM_STAKE (32 ETH in gwei) + // Converting to wei: 32_000_000_000 gwei * 10^9 = 32 * 10^18 wei + let expected_difference = U256::from(VALIDATOR_MINIMUM_STAKE) * U256::from(1_000_000_000u64); + let actual_difference = balance_after - balance_before; + + // Allow tolerance for gas fees (0.01 ETH = 10^16 wei) + let tolerance = U256::from(10_000_000_000_000_000u64); + let lower_bound = expected_difference - tolerance; + let upper_bound = expected_difference + tolerance; + assert!(actual_difference >= lower_bound && actual_difference <= upper_bound, + "Balance difference {} is outside expected range [{}, {}]", + actual_difference, lower_bound, upper_bound); + println!("Withdrawal successful: balance increased by {} wei (expected ~{})", + actual_difference, expected_difference); + + // Check that the validator was removed from the consensus state + let rpc_port = get_node_flags(0).rpc_port; + let validator_balance = get_validator_balance(rpc_port, "f205c8c88d5d1753843dd0fc9810390efd00d6f752dd555c0ad4000bfcac2226".to_string()).await; + if let Err(msg) = validator_balance { + assert_eq!(msg.to_string(), "Validator not found"); + println!("Validator that withdrew is not on the consensus state anymore"); + } else { + panic!("Validator should not be on the consensus state anymore"); + } + + Ok(()) + } + }) +} + +async fn send_withdrawal_transaction

( + provider: &P, + withdrawal_contract_address: Address, + //validator_pubkey: &[u8; 48], + ed25519_pubkey: &[u8; 32], + withdrawal_amount: u64, // Amount in gwei + withdrawal_fee: U256, // Current fee required by the contract + nonce: u64, +) -> Result<(), Box> +where + P: Provider + WalletProvider, +{ + // Left-pad ed25519 key to 48 bytes for the contract (prepend zeros) + let mut padded_pubkey = [0u8; 48]; + padded_pubkey[16..48].copy_from_slice(ed25519_pubkey); + + // EIP-7002: Input is exactly 56 bytes: validator_pubkey (48 bytes) + amount (8 bytes, big-endian uint64) + let mut call_data = Vec::with_capacity(56); + + // Add validator pubkey (48 bytes) + call_data.extend_from_slice(&padded_pubkey); + + // Add withdrawal amount (8 bytes, big-endian uint64) + call_data.extend_from_slice(&withdrawal_amount.to_be_bytes()); + + let tx_request = TransactionRequest::default() + .to(withdrawal_contract_address) + .value(withdrawal_fee) // Must send enough ETH to cover withdrawal request fee + .input(call_data.into()) + .with_gas_limit(500_000) // Lower gas limit for simpler operation + .with_gas_price(1_000_000_000) // 1 gwei + .with_nonce(nonce); + + match provider.send_transaction(tx_request).await { + Ok(pending) => { + println!("Transaction sent: {}", pending.tx_hash()); + match pending.get_receipt().await { + Ok(receipt) => { + println!("Receipt: {:?}", receipt); + Ok(()) + } + Err(e) => panic!("Transaction failed: {e}"), + } + } + Err(e) => panic!("Error sending transaction: {}", e), + } +} + +async fn get_latest_height(rpc_port: u16) -> Result> { + let url = format!("http://localhost:{}/get_latest_height", rpc_port); + let response = reqwest::get(&url).await?.text().await?; + Ok(response.parse()?) +} + +async fn get_validator_balance( + rpc_port: u16, + public_key: String, +) -> Result> { + let url = format!( + "http://localhost:{}/get_validator_balance?public_key={}", + rpc_port, public_key + ); + let response = reqwest::get(&url).await?.text().await?; + let Ok(balance) = response.parse() else { + return Err(response.into()); + }; + Ok(balance) +} + +fn get_node_flags(node: usize) -> RunFlags { + let path = format!("testnet/node{node}/"); + + RunFlags { + key_path: format!("{path}key.pem"), + store_path: format!("{path}db"), + port: (26600 + (node * 10)) as u16, + prom_port: (28600 + (node * 10)) as u16, + rpc_port: (3030 + (node * 10)) as u16, + worker_threads: 2, + log_level: "debug".into(), + db_prefix: format!("{node}-quarts"), + genesis_path: "./example_genesis.toml".into(), + engine_ipc_path: format!("/tmp/reth_engine_api{node}.ipc"), + #[cfg(any(feature = "base-bench", feature = "bench"))] + bench_block_dir: None, + ip: None, + } +} diff --git a/node/src/engine.rs b/node/src/engine.rs index daab1e6..d652064 100644 --- a/node/src/engine.rs +++ b/node/src/engine.rs @@ -6,6 +6,7 @@ use commonware_p2p::{Receiver, Sender}; use commonware_runtime::buffer::PoolRef; use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage}; use commonware_utils::NZUsize; +use futures::FutureExt; use futures::channel::mpsc; use futures::future::try_join_all; use governor::clock::Clock as GClock; @@ -18,7 +19,8 @@ use summit_syncer::Orchestrator; use summit_types::network_oracle::NetworkOracle; use summit_types::registry::Registry; use summit_types::{Block, Digest, EngineClient, PrivateKey, PublicKey}; -use tracing::{error, warn}; +use tokio_util::sync::CancellationToken; +use tracing::{error, info, warn}; pub const PROTOCOL_VERSION: u32 = 1; @@ -35,9 +37,11 @@ const BUFFER_POOL_CAPACITY: NonZero = NZUsize!(8_192); // 32MB const VALIDATOR_ONBOARDING_LIMIT_PER_BLOCK: usize = 3; pub const VALIDATOR_MINIMUM_STAKE: u64 = 32_000_000_000; // in gwei -#[cfg(debug_assertions)] +#[cfg(feature = "e2e")] +pub const VALIDATOR_WITHDRAWAL_PERIOD: u64 = 10; +#[cfg(all(debug_assertions, not(feature = "e2e")))] pub const VALIDATOR_WITHDRAWAL_PERIOD: u64 = 5; -#[cfg(not(debug_assertions))] +#[cfg(all(not(debug_assertions), not(feature = "e2e")))] const VALIDATOR_WITHDRAWAL_PERIOD: u64 = 100; #[cfg(all(feature = "e2e", not(debug_assertions)))] pub const EPOCH_NUM_BLOCKS: u64 = 50; @@ -73,6 +77,7 @@ pub struct Engine< >, sync_height: u64, + cancellation_token: CancellationToken, } impl< @@ -87,6 +92,8 @@ impl< let sync_height = cfg.initial_state.latest_height; + let cancellation_token = CancellationToken::new(); + // create application let (application, application_mailbox) = summit_application::Actor::new( context.with_label("application"), @@ -95,6 +102,7 @@ impl< mailbox_size: cfg.mailbox_size, partition_prefix: cfg.partition_prefix.clone(), genesis_hash: cfg.genesis_hash, + cancellation_token: cancellation_token.clone(), }, ) .await; @@ -121,6 +129,7 @@ impl< activity_timeout: cfg.activity_timeout, namespace: cfg.namespace.clone(), buffer_pool: buffer_pool.clone(), + cancellation_token: cancellation_token.clone(), }; let (syncer, syncer_mailbox, orchestrator) = summit_syncer::Actor::new(context.with_label("syncer"), syncer_config).await; @@ -143,6 +152,8 @@ impl< genesis_hash: cfg.genesis_hash, initial_state: cfg.initial_state, protocol_version: PROTOCOL_VERSION, + public_key: cfg.signer.public_key(), + cancellation_token: cancellation_token.clone(), }, ) .await; @@ -187,6 +198,7 @@ impl< finalizer_mailbox, orchestrator, sync_height, + cancellation_token, } } @@ -261,19 +273,32 @@ impl< // start simplex let simplex_handle = self.simplex.start(voter_network, resolver_network); - // Wait for any actor to finish - if let Err(e) = try_join_all(vec![ + // Wait for either all actors to finish or cancellation signal + let actors_fut = try_join_all(vec![ app_handle, buffer_handle, finalizer_handle, syncer_handle, simplex_handle, ]) - .await - { - error!(?e, "engine failed"); - } else { - warn!("engine stopped"); + .fuse(); + let cancellation_fut = self.cancellation_token.cancelled().fuse(); + futures::pin_mut!(actors_fut, cancellation_fut); + + futures::select! { + result = actors_fut => { + if let Err(e) = result { + error!(?e, "engine failed"); + } else { + warn!("engine stopped"); + } + } + _ = cancellation_fut => { + info!("cancellation triggered, waiting for actors to finish"); + if let Err(e) = actors_fut.await { + error!(?e, "engine failed during graceful shutdown"); + } + } } } } diff --git a/rpc/src/routes.rs b/rpc/src/routes.rs index 3c87788..25d607f 100644 --- a/rpc/src/routes.rs +++ b/rpc/src/routes.rs @@ -2,17 +2,23 @@ use std::sync::Arc; use axum::{ Router, - extract::State, + extract::{Query, State}, routing::{get, post}, }; use commonware_codec::DecodeExt as _; use commonware_cryptography::Signer; use commonware_utils::{from_hex_formatted, hex}; +use serde::Deserialize; use ssz::Encode; -use summit_types::{PrivateKey, utils::get_expanded_path}; +use summit_types::{PrivateKey, PublicKey, utils::get_expanded_path}; use crate::{GenesisRpcState, PathSender, RpcState}; +#[derive(Deserialize)] +struct ValidatorBalanceQuery { + public_key: String, +} + pub(crate) struct RpcRoutes; impl RpcRoutes { @@ -25,6 +31,10 @@ impl RpcRoutes { .route("/get_public_key", get(Self::handle_get_pub_key)) .route("/get_checkpoint", get(Self::handle_get_checkpoint)) .route("/get_latest_height", get(Self::handle_latest_height)) + .route( + "/get_validator_balance", + get(Self::handle_get_validator_balance), + ) .with_state(state) } @@ -80,6 +90,27 @@ impl RpcRoutes { .to_string()) } + async fn handle_get_validator_balance( + State(state): State>, + Query(params): Query, + ) -> Result { + // Parse the public key from hex string + let key_bytes = + from_hex_formatted(¶ms.public_key).ok_or("Invalid hex format for public key")?; + let public_key = + PublicKey::decode(&*key_bytes).map_err(|_| "Unable to decode public key")?; + + let balance = state + .finalizer_mailbox + .get_validator_balance(public_key) + .await; + + match balance { + Some(balance) => Ok(balance.to_string()), + None => Err("Validator not found".to_string()), + } + } + async fn handle_send_genesis( State(state): State>, body: String, diff --git a/syncer/Cargo.toml b/syncer/Cargo.toml index 91506a1..9bb0445 100644 --- a/syncer/Cargo.toml +++ b/syncer/Cargo.toml @@ -21,6 +21,7 @@ bytes.workspace = true futures.workspace = true governor.workspace = true rand.workspace = true +tokio-util.workspace = true tracing.workspace = true # For metrics - activate with `prom` feature diff --git a/syncer/src/actor.rs b/syncer/src/actor.rs index be461f9..9e885aa 100644 --- a/syncer/src/actor.rs +++ b/syncer/src/actor.rs @@ -29,6 +29,7 @@ use metrics::histogram; use rand::Rng; use summit_types::registry::Registry; use summit_types::{Block, Digest, Finalized, Notarized, PublicKey, Signature}; +use tokio_util::sync::CancellationToken; use tracing::{debug, info, warn}; const PRUNABLE_ITEMS_PER_SECTION: NonZero = NZU64!(4_096); @@ -62,6 +63,7 @@ pub struct Actor Actor { @@ -162,6 +164,7 @@ impl Acto activity_timeout: config.activity_timeout, namespace: config.namespace, orchestrator_mailbox, + cancellation_token: config.cancellation_token, }, Mailbox::new(tx), Orchestrator::new(orchestrator_sender), @@ -744,11 +747,21 @@ impl Acto }, } }, + _ = self.cancellation_token.cancelled() => { + info!("syncer received cancellation signal, exiting"); + break; + }, sig = &mut signal => { - info!("syncer terminated: {}", sig.unwrap()); + info!("runtime terminated terminated, shutting down syncer: {}", sig.unwrap()); break; } } } } } + +impl Drop for Actor { + fn drop(&mut self) { + self.cancellation_token.cancel(); + } +} diff --git a/syncer/src/lib.rs b/syncer/src/lib.rs index f639d89..0df9146 100644 --- a/syncer/src/lib.rs +++ b/syncer/src/lib.rs @@ -5,6 +5,7 @@ use commonware_runtime::buffer::PoolRef; pub use ingress::*; use summit_types::PublicKey; use summit_types::registry::Registry; +use tokio_util::sync::CancellationToken; pub mod coordinator; pub mod handler; @@ -28,4 +29,6 @@ pub struct Config { pub namespace: String, pub buffer_pool: PoolRef, + + pub cancellation_token: CancellationToken, } diff --git a/types/src/consensus_state_query.rs b/types/src/consensus_state_query.rs index a8f1d3a..fc5a5ed 100644 --- a/types/src/consensus_state_query.rs +++ b/types/src/consensus_state_query.rs @@ -1,3 +1,4 @@ +use crate::PublicKey; use crate::checkpoint::Checkpoint; use futures::SinkExt; use futures::channel::{mpsc, oneshot}; @@ -5,11 +6,13 @@ use futures::channel::{mpsc, oneshot}; pub enum ConsensusStateRequest { GetCheckpoint, GetLatestHeight, + GetValidatorBalance(PublicKey), } pub enum ConsensusStateResponse { Checkpoint(Option), LatestHeight(u64), + ValidatorBalance(Option), } /// Used to send queries to the application finalizer to query the consensus state. @@ -76,4 +79,18 @@ impl ConsensusStateQuery { }; height } + + pub async fn get_validator_balance(&self, public_key: PublicKey) -> Option { + let (tx, rx) = oneshot::channel(); + let req = ConsensusStateRequest::GetValidatorBalance(public_key); + let _ = self.sender.clone().send((req, tx)).await; + + let res = rx + .await + .expect("consensus state query response sender dropped"); + let ConsensusStateResponse::ValidatorBalance(balance) = res else { + unreachable!("request and response variants must match"); + }; + balance + } } diff --git a/types/src/genesis.rs b/types/src/genesis.rs index 5ed18b8..13f2c67 100644 --- a/types/src/genesis.rs +++ b/types/src/genesis.rs @@ -1,4 +1,5 @@ use crate::PublicKey; +use alloy_primitives::Address; use commonware_codec::DecodeExt; use commonware_utils::from_hex_formatted; use serde::{Deserialize, Serialize}; @@ -41,17 +42,21 @@ pub struct Genesis { pub struct Validator { pub public_key: String, pub ip_address: String, + pub withdrawal_credentials: String, } -impl TryInto<(PublicKey, SocketAddr)> for &Validator { +impl TryInto<(PublicKey, SocketAddr, Address)> for &Validator { type Error = String; - fn try_into(self) -> Result<(PublicKey, SocketAddr), Self::Error> { + fn try_into(self) -> Result<(PublicKey, SocketAddr, Address), Self::Error> { let pub_key_bytes = from_hex_formatted(&self.public_key).ok_or("PublicKey bad format")?; Ok(( PublicKey::decode(&*pub_key_bytes).map_err(|_| "Unable to decode Public Key")?, self.ip_address.parse().map_err(|_| "Invalid ip address")?, + self.withdrawal_credentials + .parse() + .map_err(|_| "Invalid withdrawal credentials")?, )) } }