diff --git a/Cargo.lock b/Cargo.lock index 36c596d..0656bdc 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -54,6 +54,29 @@ version = "0.2.21" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923" +[[package]] +name = "alloy" +version = "1.0.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2e2a5d689ccd182f1d138a61f081841b905034e0089f5278f6c200f2bcdab00a" +dependencies = [ + "alloy-consensus", + "alloy-contract", + "alloy-core", + "alloy-eips", + "alloy-genesis", + "alloy-network", + "alloy-provider", + "alloy-rpc-client", + "alloy-rpc-types", + "alloy-serde", + "alloy-signer", + "alloy-signer-local", + "alloy-transport", + "alloy-transport-http", + "alloy-trie", +] + [[package]] name = "alloy-chains" version = "0.2.5" @@ -104,6 +127,57 @@ dependencies = [ "serde", ] +[[package]] +name = "alloy-contract" +version = "1.0.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "de217ab604f1bcfa2e3b0aff86d50812d5931d47522f9f0a949cc263ec2d108e" +dependencies = [ + "alloy-consensus", + "alloy-dyn-abi", + "alloy-json-abi", + "alloy-network", + "alloy-network-primitives", + "alloy-primitives", + "alloy-provider", + "alloy-rpc-types-eth", + "alloy-sol-types", + "alloy-transport", + "futures", + "futures-util", + "serde_json", + "thiserror 2.0.12", +] + +[[package]] +name = "alloy-core" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad31216895d27d307369daa1393f5850b50bbbd372478a9fa951c095c210627e" +dependencies = [ + "alloy-dyn-abi", + "alloy-json-abi", + "alloy-primitives", + "alloy-rlp", + "alloy-sol-types", +] + +[[package]] +name = "alloy-dyn-abi" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7b95b3deca680efc7e9cba781f1a1db352fa1ea50e6384a514944dcf4419e652" +dependencies = [ + "alloy-json-abi", + "alloy-primitives", + "alloy-sol-type-parser", + "alloy-sol-types", + "itoa", + "serde", + "serde_json", + "winnow", +] + [[package]] name = "alloy-eip2124" version = "0.2.0" @@ -166,15 +240,16 @@ dependencies = [ [[package]] name = "alloy-genesis" -version = "1.0.19" +version = "1.0.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "696a83af273bfc512e02693bd4b5056c8c57898328bd0ce594013fb864de4dcf" +checksum = "33ba1cbc25a07e0142e8875fcbe80e1fdb02be8160ae186b90f4b9a69a72ed2b" dependencies = [ "alloy-eips", "alloy-primitives", "alloy-serde", "alloy-trie", "serde", + "serde_with", ] [[package]] @@ -415,6 +490,18 @@ dependencies = [ "wasmtimer", ] +[[package]] +name = "alloy-rpc-types" +version = "1.0.30" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "39676beaa50db545cf15447fc94ec5513b64e85a48357a0625b9a04aef08a910" +dependencies = [ + "alloy-primitives", + "alloy-rpc-types-eth", + "alloy-serde", + "serde", +] + [[package]] name = "alloy-rpc-types-any" version = "1.0.30" @@ -495,9 +582,9 @@ dependencies = [ [[package]] name = "alloy-signer-local" -version = "1.0.19" +version = "1.0.30" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6611724477b6b914202392c2f2d9dfd5c88e4eb8b1422ef90ac6cda3649b62a6" +checksum = "7bdeec36c8d9823102b571b3eab8b323e053dc19c12da14a9687bd474129bf2a" dependencies = [ "alloy-consensus", "alloy-network", @@ -529,6 +616,7 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "34d99282e7c9ef14eb62727981a985a01869e586d1dec729d3bb33679094c100" dependencies = [ + "alloy-json-abi", "alloy-sol-macro-input", "const-hex", "heck", @@ -547,12 +635,14 @@ version = "1.2.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "eda029f955b78e493360ee1d7bd11e1ab9f2a220a5715449babc79d6d0a01105" dependencies = [ + "alloy-json-abi", "const-hex", "dunce", "heck", "macro-string", "proc-macro2", "quote", + "serde_json", "syn 2.0.104", "syn-solidity", ] @@ -5414,6 +5504,7 @@ checksum = "734676eb262c623cec13c3155096e08d1f8f29adce39ba17948b18dad1e54142" name = "summit" version = "0.0.0" dependencies = [ + "alloy", "alloy-consensus", "alloy-eips", "alloy-network", @@ -5436,6 +5527,7 @@ dependencies = [ "console-subscriber", "dashmap 6.1.0", "dirs 6.0.0", + "ethereum_ssz", "eyre", "futures", "governor", @@ -5508,6 +5600,7 @@ dependencies = [ "commonware-codec", "commonware-consensus", "commonware-cryptography", + "commonware-resolver", "commonware-runtime", "commonware-storage", "commonware-utils", @@ -5529,6 +5622,7 @@ dependencies = [ "axum 0.8.4", "commonware-codec", "commonware-cryptography", + "commonware-runtime", "commonware-utils", "dirs 5.0.1", "ethereum_ssz", @@ -5570,6 +5664,7 @@ version = "0.0.0" dependencies = [ "alloy-consensus", "alloy-eips", + "alloy-genesis", "alloy-primitives", "alloy-provider", "alloy-rpc-types-engine", @@ -5579,17 +5674,22 @@ dependencies = [ "commonware-codec", "commonware-consensus", "commonware-cryptography", + "commonware-p2p", "commonware-resolver", + "commonware-runtime", "commonware-utils", "dirs 6.0.0", "ethereum_ssz", "futures", + "libc", "op-alloy-network", "op-alloy-rpc-types-engine", + "rand 0.8.5", "serde", "serde_json", "toml", "tracing", + "url", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index 8c3f56d..759bef2 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -27,6 +27,7 @@ commonware-macros = "0.0.62" alloy-consensus = "1.0.12" alloy-eips = { version = "1.0.19", features = ["ssz"] } +alloy-genesis = "1.0.9" alloy-network = "1.0.9" alloy-node-bindings = "1.0.9" alloy-rpc-client = { version = "1.0.9", features = [ @@ -41,6 +42,7 @@ alloy-transport-http = { version = "1.0.9", features = ["hyper", "jwt-auth"] } alloy-provider = { version = "1.0.9", features = ["hyper", "engine-api","ipc"] } alloy-primitives = "1.2.1" alloy-transport-ipc = { version = "1.0.9" } +url = "2.5" futures = "0.3.31" http = "1.0" diff --git a/application/src/actor.rs b/application/src/actor.rs index 6613ebf..d3f067d 100644 --- a/application/src/actor.rs +++ b/application/src/actor.rs @@ -7,7 +7,7 @@ use commonware_macros::select; use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage}; use commonware_utils::SystemTimeExt; use futures::{ - StreamExt as _, + FutureExt, StreamExt as _, channel::{mpsc, oneshot}, future::{self, Either, try_join}, }; @@ -87,105 +87,117 @@ impl { - info!("Handling message Genesis"); - let _ = response.send(self.genesis_hash.into()); - } - Message::Propose { - view, - parent, - mut response, - } => { - info!("{rand_id} Handling message Propose view: {}", view); - - let built = self.built_block.clone(); - select! { - res = self.handle_proposal(parent, &mut syncer,&mut finalizer, view) => { - match res { - Ok(block) => { - // store block - let digest = block.digest(); - { - let mut built = built.lock().expect("locked poisoned"); - *built = Some(block); + let mut signal = self.context.stopped().fuse(); + loop { + select! { + message = self.mailbox.next() => { + let Some(message) = message else { + break; + }; + match message { + Message::Genesis { response } => { + info!("Handling message Genesis"); + let _ = response.send(self.genesis_hash.into()); + } + Message::Propose { + view, + parent, + mut response, + } => { + info!("{rand_id} Handling message Propose view: {}", view); + + let built = self.built_block.clone(); + select! { + res = self.handle_proposal(parent, &mut syncer,&mut finalizer, view) => { + match res { + Ok(block) => { + // store block + let digest = block.digest(); + { + let mut built = built.lock().expect("locked poisoned"); + *built = Some(block); + } + + // send digest to consensus + let _ = response.send(digest); + }, + Err(e) => warn!("Failed to create a block for height {view}: {e}") } - - // send digest to consensus - let _ = response.send(digest); }, - Err(e) => warn!("Failed to create a block for height {view}: {e}") - } - }, - _ = oneshot_closed_future(&mut response) => { - // simplex dropped receiver - warn!(view, "proposal aborted"); + _ = oneshot_closed_future(&mut response) => { + // simplex dropped receiver + warn!(view, "proposal aborted"); + } + } + } + Message::Broadcast { payload } => { + info!("{rand_id} Handling message Broadcast"); + let Some(built_block) = + self.built_block.lock().expect("poisoned mutex").clone() + else { + warn!("Asked to broadcast a block with no built block"); + continue; + }; + // todo(dalton): This should be a hard assert but for testing im just going to log + if payload != built_block.digest() { + error!( + "The payload we were asked to broadcast is different then our built block" + ); } - } - } - Message::Broadcast { payload } => { - info!("{rand_id} Handling message Broadcast"); - let Some(built_block) = - self.built_block.lock().expect("poisoned mutex").clone() - else { - warn!("Asked to broadcast a block with no built block"); - continue; - }; - // todo(dalton): This should be a hard assert but for testing im just going to log - if payload != built_block.digest() { - error!( - "The payload we were asked to broadcast is different then our built block" - ); - } - - syncer.broadcast(built_block).await; - } - - Message::Verify { - view, - parent, - payload, - mut response, - } => { - info!("{rand_id} Handling message Verify view: {}", view); - // Get the parent block - let parent_request = if parent.1 == self.genesis_hash.into() { - Either::Left(future::ready(Ok(Block::genesis(self.genesis_hash)))) - } else { - Either::Right(syncer.get(Some(parent.0), parent.1).await) - }; - - let block_request = syncer.get(None, payload).await; - - // Wait for the blocks to be available or the request to be cancelled in a separate task (to - // continue processing other messages) - self.context.with_label("verify").spawn({ - let mut syncer = syncer.clone(); - move |_| async move { - let requester = try_join(parent_request, block_request); - select! { - result = requester => { - let (parent, block) = result.unwrap(); - - if handle_verify(&block, parent) { - // persist valid block - syncer.store_verified(view, block).await; + syncer.broadcast(built_block).await; + } - // respond - let _ = response.send(true); - } else { - info!("Unsucceful vote"); - let _ = response.send(false); + Message::Verify { + view, + parent, + payload, + mut response, + } => { + info!("{rand_id} Handling message Verify view: {}", view); + // Get the parent block + let parent_request = if parent.1 == self.genesis_hash.into() { + Either::Left(future::ready(Ok(Block::genesis(self.genesis_hash)))) + } else { + Either::Right(syncer.get(Some(parent.0), parent.1).await) + }; + + let block_request = syncer.get(None, payload).await; + + // Wait for the blocks to be available or the request to be cancelled in a separate task (to + // continue processing other messages) + self.context.with_label("verify").spawn({ + let mut syncer = syncer.clone(); + move |_| async move { + let requester = try_join(parent_request, block_request); + select! { + result = requester => { + let (parent, block) = result.unwrap(); + + if handle_verify(&block, parent) { + + // persist valid block + syncer.store_verified(view, block).await; + + // respond + let _ = response.send(true); + } else { + info!("Unsucceful vote"); + let _ = response.send(false); + } + }, + _ = oneshot_closed_future(&mut response) => { + warn!(view, "verify aborted"); + } } - }, - _ = oneshot_closed_future(&mut response) => { - warn!(view, "verify aborted"); } - } + }); } - }); + } + }, + sig = &mut signal => { + info!("application terminated: {}", sig.unwrap()); + break; } } } diff --git a/finalizer/Cargo.toml b/finalizer/Cargo.toml index d843ee7..5a4aab1 100644 --- a/finalizer/Cargo.toml +++ b/finalizer/Cargo.toml @@ -12,6 +12,7 @@ commonware-consensus.workspace = true commonware-cryptography.workspace = true commonware-runtime.workspace = true commonware-storage.workspace = true +commonware-resolver.workspace = true commonware-utils.workspace = true alloy-eips.workspace = true diff --git a/finalizer/src/actor.rs b/finalizer/src/actor.rs index 0fce48b..d9680e5 100644 --- a/finalizer/src/actor.rs +++ b/finalizer/src/actor.rs @@ -7,11 +7,12 @@ use alloy_primitives::hex; use alloy_rpc_types_engine::ForkchoiceState; use commonware_codec::{DecodeExt as _, ReadExt as _}; use commonware_cryptography::{Hasher, Sha256, Verifier as _}; +use commonware_resolver::p2p::Coordinator; use commonware_runtime::{Clock, Handle, Metrics, Spawner, Storage}; use commonware_storage::translator::TwoCap; use commonware_utils::{NZU64, NZUsize, hex}; use futures::channel::{mpsc, oneshot}; -use futures::{StreamExt as _, select}; +use futures::{FutureExt, StreamExt as _, select}; #[cfg(feature = "prom")] use metrics::{counter, histogram}; #[cfg(debug_assertions)] @@ -25,6 +26,7 @@ use summit_types::account::{ValidatorAccount, ValidatorStatus}; use summit_types::checkpoint::Checkpoint; use summit_types::consensus_state_query::{ConsensusStateRequest, ConsensusStateResponse}; use summit_types::execution_request::ExecutionRequest; +use summit_types::network_oracle::NetworkOracle; use summit_types::registry::Registry; use summit_types::utils::{is_last_block_of_epoch, is_penultimate_block_of_epoch}; use summit_types::{Block, BlockAuxData, Digest, FinalizedHeader, PublicKey, Signature}; @@ -32,10 +34,12 @@ use summit_types::{BlockEnvelope, EngineClient, consensus_state::ConsensusState} use tracing::{info, warn}; const WRITE_BUFFER: NonZero = NZUsize!(1024 * 1024); +const REGISTRY_CHANGE_VIEW_DELTA: u64 = 5; pub struct Finalizer< R: Storage + Metrics + Clock + Spawner + governor::clock::Clock + Rng, C: EngineClient, + O: NetworkOracle, > { mailbox: mpsc::Receiver, pending_height_notifys: BTreeMap>>, @@ -51,12 +55,16 @@ pub struct Finalizer< validator_minimum_stake: u64, // in gwei validator_withdrawal_period: u64, // in blocks validator_onboarding_limit_per_block: usize, + oracle: O, } -impl - Finalizer +impl< + R: Storage + Metrics + Clock + Spawner + governor::clock::Clock + Rng, + C: EngineClient, + O: NetworkOracle, +> Finalizer { - pub async fn new(context: R, cfg: FinalizerConfig) -> (Self, FinalizerMailbox) { + pub async fn new(context: R, cfg: FinalizerConfig) -> (Self, FinalizerMailbox) { let (tx, rx) = mpsc::channel(cfg.mailbox_size); // todo(dalton) pull mailbox size from config let state_cfg = StateConfig { log_journal_partition: format!("{}-finalizer_state-log", cfg.db_prefix), @@ -72,17 +80,13 @@ impl = None; + let mut signal = self.context.stopped().fuse(); loop { select! { msg = rx_finalize_blocks.next() => { @@ -162,6 +168,10 @@ impl { + info!("finalizer terminated: {}", sig.unwrap()); + break; + } } } } @@ -321,13 +331,30 @@ impl { +pub struct FinalizerConfig> { pub mailbox_size: usize, pub db_prefix: String, pub engine_client: C, pub registry: Registry, + pub oracle: O, pub epoch_num_of_blocks: u64, pub validator_max_withdrawals_per_block: usize, pub validator_minimum_stake: u64, // in gwei @@ -17,7 +18,7 @@ pub struct FinalizerConfig { pub buffer_pool: PoolRef, pub genesis_hash: [u8; 32], /// Optional initial state to initialize the finalizer with - pub initial_state: Option, + pub initial_state: ConsensusState, /// Protocol version for the consensus protocol pub protocol_version: u32, } diff --git a/node/Cargo.toml b/node/Cargo.toml index 2730dc3..5e67b04 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -11,6 +11,11 @@ path = "src/bin/testnet.rs" name = "genesis" path = "src/bin/genesis.rs" +[[bin]] +name = "stake-and-checkpoint" +path = "src/bin/stake_and_checkpoint.rs" +required-features = ["e2e"] + [[bin]] name = "block-fetcher" path = "src/bin/block_fetcher.rs" @@ -46,6 +51,11 @@ dirs.workspace = true tokio = { version = "1.45.1", features = ["sync"] } tokio-util.workspace = true +# stake and checkpoint bin deps +alloy = "1.0.23" +ethereum_ssz.workspace = true +reqwest.workspace = true + # testnet bin deps alloy-node-bindings.workspace = true alloy-network = { workspace = true } @@ -113,3 +123,4 @@ tokio-console = ["console-subscriber"] jemalloc = ["dep:tikv-jemalloc-ctl"] base-bench = ["summit-application/base-bench", "summit-types/base-bench"] bench = ["summit-application/bench", "summit-types/bench"] +e2e = [] diff --git a/node/src/args.rs b/node/src/args.rs index 24043bc..7472eb1 100644 --- a/node/src/args.rs +++ b/node/src/args.rs @@ -13,6 +13,10 @@ use commonware_runtime::{Handle, Metrics as _, Runner, Spawner as _, tokio}; use summit_rpc::{PathSender, start_rpc_server, start_rpc_server_for_genesis}; use tokio_util::sync::CancellationToken; +use alloy_primitives::{Address, B256}; +use alloy_rpc_types_engine::ForkchoiceState; +use commonware_codec::ReadExt; +use commonware_utils::from_hex_formatted; use futures::{channel::oneshot, future::try_join_all}; use governor::Quota; use std::{ @@ -26,8 +30,13 @@ use summit_types::engine_client::base_benchmarking::HistoricalEngineClient; #[cfg(feature = "bench")] use summit_types::engine_client::benchmarking::EthereumHistoricalEngineClient; +use crate::config::MAILBOX_SIZE; +use crate::engine::VALIDATOR_MINIMUM_STAKE; #[cfg(not(any(feature = "bench", feature = "base-bench")))] use summit_types::RethEngineClient; +use summit_types::account::{ValidatorAccount, ValidatorStatus}; +use summit_types::consensus_state::ConsensusState; +use summit_types::network_oracle::DiscoveryOracle; use summit_types::{Genesis, PublicKey, utils::get_expanded_path}; use tracing::{Level, error}; @@ -107,6 +116,9 @@ pub struct RunFlags { default_value_t = String::from("./example_genesis.toml") )] pub genesis_path: String, + /// IP address for this node (optional, will use genesis if not provided) + #[arg(long)] + pub ip: Option, } impl Command { @@ -190,7 +202,18 @@ impl Command { .map(|v| v.try_into().expect("Invalid validator in genesis")) .collect(); committee.sort(); - let peers: Vec = committee.iter().map(|v| v.0.clone()).collect(); + + let initial_state = get_initial_state(&genesis, &committee, None); + let mut peers: Vec = initial_state + .validator_accounts + .iter() + .filter(|(_, acc)| !(acc.status == ValidatorStatus::Inactive)) + .map(|(v, _)| { + let mut key_bytes = &v[..]; + PublicKey::read(&mut key_bytes).expect("failed to parse public key") + }) + .collect(); + peers.sort(); let engine_ipc_path = get_expanded_path(&flags.engine_ipc_path) .expect("failed to expand engine ipc path"); @@ -229,26 +252,28 @@ impl Command { let engine_client = RethEngineClient::new(engine_ipc_path.to_string_lossy().to_string()).await; - let config = EngineConfig::get_engine_config( - engine_client, - signer, - peers.clone(), - flags.db_prefix.clone(), - &genesis, - None, - ) - .unwrap(); + let our_ip = if let Some(ref ip_str) = flags.ip { + ip_str + .parse::() + .expect("Invalid IP address format") + } else { + committee + .iter() + .find_map(|v| { + if v.0 == signer.public_key() { + Some(v.1) + } else { + None + } + }) + .expect("This node is not on the committee") + }; - let our_ip = committee - .iter() - .find_map(|v| { - if v.0 == config.signer.public_key() { - Some(v.1) - } else { - None - } - }) - .expect("This node is not on the committee"); + let our_public_key = signer.public_key(); + if !committee.iter().any(|(key, _)| key == &our_public_key) { + committee.push((our_public_key, our_ip)); + committee.sort(); + } // Configure telemetry let log_level = Level::from_str(&flags.log_level).expect("Invalid log level"); @@ -279,29 +304,39 @@ impl Command { .parse::() .unwrap(); let config = MetricServerConfig::new(listen_addr, hooks); - MetricServer::new(config).serve().await.unwrap(); + let stop_signal = context.stopped(); + MetricServer::new(config).serve(stop_signal).await.unwrap(); } // configure network - let mut p2p_cfg = authenticated::discovery::Config::aggressive( - config.signer.clone(), + signer.clone(), genesis.namespace.as_bytes(), SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), flags.port), our_ip, committee.clone(), genesis.max_message_size_bytes as usize, ); - p2p_cfg.mailbox_size = config.mailbox_size; + p2p_cfg.mailbox_size = MAILBOX_SIZE; // Start p2p let (mut network, mut oracle) = authenticated::discovery::Network::new(context.with_label("network"), p2p_cfg); // Provide authorized peers - oracle - .register(0, committee.into_iter().map(|(key, _)| key).collect()) - .await; + oracle.register(0, peers.clone()).await; + + let oracle = DiscoveryOracle::new(oracle); + let config = EngineConfig::get_engine_config( + engine_client, + oracle, + signer, + peers, + flags.db_prefix.clone(), + &genesis, + initial_state, + ) + .unwrap(); // Register pending channel let pending_limit = Quota::per_second(NonZeroU32::new(128).unwrap()); @@ -332,8 +367,11 @@ impl Command { // Start RPC server let key_path = flags.key_path.clone(); let rpc_port = flags.rpc_port; + let stop_signal = context.stopped(); let rpc_handle = context.with_label("rpc").spawn(move |_context| async move { - if let Err(e) = start_rpc_server(finalizer_mailbox, key_path, rpc_port).await { + if let Err(e) = + start_rpc_server(finalizer_mailbox, key_path, rpc_port, stop_signal).await + { error!("RPC server failed: {}", e); } }); @@ -346,7 +384,11 @@ impl Command { } } -pub fn run_node_with_runtime(context: tokio::Context, flags: RunFlags) -> Handle<()> { +pub fn run_node_with_runtime( + context: tokio::Context, + flags: RunFlags, + checkpoint: Option, +) -> Handle<()> { context.spawn(async move |context| { let signer = expect_signer(&flags.key_path); @@ -383,7 +425,17 @@ pub fn run_node_with_runtime(context: tokio::Context, flags: RunFlags) -> Handle .collect(); committee.sort(); - let peers: Vec = committee.iter().map(|v| v.0.clone()).collect(); + let initial_state = get_initial_state(&genesis, &committee, checkpoint); + let mut peers: Vec = initial_state + .validator_accounts + .iter() + .filter(|(_, acc)| !(acc.status == ValidatorStatus::Inactive)) + .map(|(v, _)| { + let mut key_bytes = &v[..]; + PublicKey::read(&mut key_bytes).expect("failed to parse public key") + }) + .collect(); + peers.sort(); let engine_ipc_path = get_expanded_path(&flags.engine_ipc_path).expect("failed to expand engine ipc path"); @@ -419,44 +471,59 @@ pub fn run_node_with_runtime(context: tokio::Context, flags: RunFlags) -> Handle let engine_client = RethEngineClient::new(engine_ipc_path.to_string_lossy().to_string()).await; - let config = EngineConfig::get_engine_config( - engine_client, - signer, - peers.clone(), - flags.db_prefix.clone(), - &genesis, - None, - ) - .unwrap(); + let our_ip = if let Some(ref ip_str) = flags.ip { + ip_str + .parse::() + .expect("Invalid IP address format") + } else { + committee + .iter() + .find_map(|v| { + if v.0 == signer.public_key() { + Some(v.1) + } else { + None + } + }) + .expect("This node is not on the committee") + }; - let our_ip = committee - .iter() - .find_map(|v| { - if v.0 == config.signer.public_key() { - Some(v.1) - } else { - None - } - }) - .expect("This node is not on the committee"); + let our_public_key = signer.public_key(); + if !committee.iter().any(|(key, _)| key == &our_public_key) { + committee.push((our_public_key, our_ip)); + committee.sort(); + } // configure network - - let mut p2p_cfg = authenticated::lookup::Config::aggressive( - config.signer.clone(), + let mut p2p_cfg = authenticated::discovery::Config::aggressive( + signer.clone(), genesis.namespace.as_bytes(), SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), flags.port), our_ip, + committee, genesis.max_message_size_bytes as usize, ); - p2p_cfg.mailbox_size = config.mailbox_size; + p2p_cfg.mailbox_size = MAILBOX_SIZE; // Start p2p let (mut network, mut oracle) = - authenticated::lookup::Network::new(context.with_label("network"), p2p_cfg); + authenticated::discovery::Network::new(context.with_label("network"), p2p_cfg); // Provide authorized peers - oracle.register(0, committee).await; + oracle.register(0, peers.clone()).await; + + let oracle = DiscoveryOracle::new(oracle); + + let config = EngineConfig::get_engine_config( + engine_client, + oracle, + signer, + peers, + flags.db_prefix.clone(), + &genesis, + initial_state, + ) + .unwrap(); // Register pending channel let pending_limit = Quota::per_second(NonZeroU32::new(128).unwrap()); @@ -494,17 +561,21 @@ pub fn run_node_with_runtime(context: tokio::Context, flags: RunFlags) -> Handle let listen_addr = format!("0.0.0.0:{}", flags.prom_port) .parse::() .unwrap(); + let stop_signal = context.stopped(); let config = MetricServerConfig::new(listen_addr, hooks); - MetricServer::new(config).serve().await.unwrap(); + MetricServer::new(config).serve(stop_signal).await.unwrap(); } // Start RPC server let key_path = flags.key_path.clone(); let rpc_port = flags.rpc_port; + let stop_signal = context.stopped(); let rpc_handle = context .with_label("rpc_genesis") .spawn(move |_context| async move { - if let Err(e) = start_rpc_server(finalizer_mailbox, key_path, rpc_port).await { + if let Err(e) = + start_rpc_server(finalizer_mailbox, key_path, rpc_port, stop_signal).await + { error!("RPC server failed: {}", e); } }); @@ -515,3 +586,45 @@ pub fn run_node_with_runtime(context: tokio::Context, flags: RunFlags) -> Handle } }) } + +fn get_initial_state( + genesis: &Genesis, + committee: &Vec<(PublicKey, SocketAddr)>, + checkpoint: Option, +) -> ConsensusState { + let genesis_hash: [u8; 32] = from_hex_formatted(&genesis.eth_genesis_hash) + .map(|hash_bytes| hash_bytes.try_into()) + .expect("bad eth_genesis_hash") + .expect("bad eth_genesis_hash"); + let genesis_hash: B256 = genesis_hash.into(); + checkpoint.unwrap_or_else(|| { + let forkchoice = ForkchoiceState { + head_block_hash: genesis_hash, + safe_block_hash: genesis_hash, + finalized_block_hash: genesis_hash, + }; + let mut state = ConsensusState::new(forkchoice); + // Add the genesis nodes to the consensus state with the minimum stake balance. + for (pubkey, _) in committee { + let pubkey_bytes: [u8; 32] = pubkey + .as_ref() + .try_into() + .expect("Public key must be 32 bytes"); + let account = ValidatorAccount { + // TODO(matthias): we have to add a withdrawal address to the genesis + withdrawal_credentials: Address::ZERO, + balance: VALIDATOR_MINIMUM_STAKE, + pending_withdrawal_amount: 0, + status: ValidatorStatus::Active, + // TODO(matthias): this index is comes from the deposit contract. + // Since there is no deposit transaction for the genesis nodes, the index will still be + // 0 for the deposit contract. Right now we only use this index to avoid counting the same deposit request twice. + // Since we set the index to 0 here, we cannot rely on the uniqueness. The first actual deposit request will have + // index 0 as well. + last_deposit_index: 0, + }; + state.validator_accounts.insert(pubkey_bytes, account); + } + state + }) +} diff --git a/node/src/bin/genesis.rs b/node/src/bin/genesis.rs index 4895446..4f3b630 100644 --- a/node/src/bin/genesis.rs +++ b/node/src/bin/genesis.rs @@ -11,7 +11,7 @@ use std::fs; use std::path::Path; use summit_types::PublicKey; -const DEFAULT_GENESIS_FILE: &'static str = "./example_genesis.toml"; +const DEFAULT_GENESIS_FILE: &str = "./example_genesis.toml"; #[derive(Debug, Serialize, Deserialize)] pub struct GenesisConfig { @@ -44,8 +44,7 @@ pub struct Validator { impl Validator { pub fn ed25519_pubkey(&self) -> PublicKey { let pubkey_bytes = from_hex(&self.public_key).unwrap(); - let pubkey = PublicKey::decode(&pubkey_bytes[..]).unwrap(); - pubkey + PublicKey::decode(&pubkey_bytes[..]).unwrap() } } @@ -110,7 +109,7 @@ fn main() -> Result<(), Box> { // Write the updated genesis config let updated_genesis = toml::to_string_pretty(&genesis_config)?; - fs::write(&format!("{}/genesis.toml", args.out_dir), updated_genesis)?; + fs::write(format!("{}/genesis.toml", args.out_dir), updated_genesis)?; println!("Updated genesis config at {}", args.out_dir); println!("\nSetup complete for {} nodes", node_count); diff --git a/node/src/bin/stake_and_checkpoint.rs b/node/src/bin/stake_and_checkpoint.rs new file mode 100644 index 0000000..1cb8baf --- /dev/null +++ b/node/src/bin/stake_and_checkpoint.rs @@ -0,0 +1,827 @@ +/* +This bin will start 4 reth nodes with an instance of consensus for each and keep running so you can run other tests or submit transactions + +Their rpc endpoints are localhost:8545-node_number +node0_port = 8545 +node1_port = 8544 +... +node3_port = 8542 + + +*/ +use alloy::hex::FromHex; +use alloy::network::{EthereumWallet, TransactionBuilder}; +use alloy::providers::{Provider, ProviderBuilder, WalletProvider}; +use alloy::rpc::types::TransactionRequest; +use alloy::signers::local::PrivateKeySigner; +use alloy_primitives::{Address, U256, keccak256}; +use clap::Parser; +use commonware_cryptography::Sha256; +use commonware_cryptography::{Hasher, PrivateKeyExt, Signer, ed25519::PrivateKey}; +use commonware_runtime::{Clock, Metrics as _, Runner as _, Spawner as _, tokio as cw_tokio}; +use commonware_utils::from_hex_formatted; +use futures::{FutureExt, pin_mut}; +use ssz::Decode; +use std::collections::VecDeque; +use std::time::Duration; +use std::{ + fs, + io::{BufRead as _, BufReader, Write as _}, + net::{IpAddr, Ipv4Addr, SocketAddr}, + path::PathBuf, + str::FromStr as _, + thread::JoinHandle, +}; +use summit::args::{RunFlags, run_node_with_runtime}; +use summit::engine::{PROTOCOL_VERSION, VALIDATOR_MINIMUM_STAKE}; +use summit_types::checkpoint::Checkpoint; +use summit_types::consensus_state::ConsensusState; +use summit_types::execution_request::DepositRequest; +use summit_types::reth::Reth; +use tokio::sync::mpsc; +use tracing::Level; + +const NUM_NODES: u16 = 4; + +struct NodeRuntime { + thread: JoinHandle<()>, + stop_tx: mpsc::UnboundedSender<()>, +} + +#[derive(Parser, Debug)] +struct Args { + /// Path to the directory containing historical blocks for benchmarking + #[cfg(any(feature = "base-bench", feature = "bench"))] + #[arg(long)] + pub bench_block_dir: Option, + /// Path to the log directory + #[arg(long)] + pub log_dir: Option, + /// Path to the data directory for test + #[arg(long, default_value = "/tmp/summit_checkpointing_test")] + pub data_dir: String, + /// Height at which the joining node will download the checkpoint + #[arg(long, default_value_t = 1000)] + pub checkpoint_height: u64, + /// Height that all nodes must reach for the test to succeed + #[arg(long, default_value_t = 2000)] + pub stop_height: u64, +} + +fn main() -> Result<(), Box> { + let args = Args::parse(); + + // Remove data_dir if it exists to start fresh + let data_dir_path = PathBuf::from(&args.data_dir); + if data_dir_path.exists() { + fs::remove_dir_all(&data_dir_path)?; + } + + // Create log directory if specified + if let Some(ref log_dir) = args.log_dir { + fs::remove_dir_all(log_dir)?; + fs::create_dir_all(log_dir)?; + } + + let storage_dir = data_dir_path.join("stores"); + + let cfg = cw_tokio::Config::default() + .with_tcp_nodelay(Some(true)) + .with_worker_threads(16) + .with_storage_directory(storage_dir) + .with_catch_panics(false); + let executor = cw_tokio::Runner::new(cfg); + + executor.start(|context| { + async move { + // Configure telemetry + let log_level = Level::from_str("info").expect("Invalid log level"); + cw_tokio::telemetry::init( + context.with_label("metrics"), + cw_tokio::telemetry::Logging { + level: log_level, + // todo: dont know what this does + json: false, + }, + Some(SocketAddr::new(IpAddr::V4(Ipv4Addr::UNSPECIFIED), 6969)), + None, + ); + + // Vec to hold all the join handles + let mut handles = VecDeque::new(); + let mut node_runtimes: Vec = Vec::new(); + // let mut read_threads = Vec::new(); + + // Start all nodes at the beginning + for x in 0..NUM_NODES { + // Start Reth + println!("******* STARTING RETH FOR NODE {x}"); + + // Create data directory if it doesn't exist + let data_dir = format!("{}/node{}/data/reth_db", args.data_dir, x); + fs::create_dir_all(&data_dir).expect("Failed to create data directory"); + + // Build and spawn reth instance + let reth_builder = Reth::new() + .instance(x + 1) + .keep_stdout() + // .genesis(serde_json::from_str(&genesis_str).expect("invalid genesis")) + .data_dir(data_dir) + .arg("--enclave.mock-server") + .arg("--enclave.endpoint-port") + .arg(format!("1744{x}")) + .arg("--auth-ipc") + .arg("--auth-ipc.path") + .arg(format!("/tmp/reth_engine_api{x}.ipc")) + .arg("--metrics") + .arg(format!("0.0.0.0:{}", 9001 + x)); + + let mut reth = reth_builder.spawn(); + + // Get stdout handle + let stdout = reth.stdout().expect("Failed to get stdout"); + + let log_dir = args.log_dir.clone(); + context.clone().spawn(async move |_| { + let reader = BufReader::new(stdout); + let mut log_file = log_dir.as_ref().map(|dir| { + fs::File::create(format!("{}/node{}.log", dir, x)) + .expect("Failed to create log file") + }); + + for line in reader.lines() { + match line { + Ok(line) => { + if let Some(ref mut file) = log_file { + writeln!(file, "[Node {}] {}", x, line) + .expect("Failed to write to log file"); + } + } + Err(_e) => { + // eprintln!("[Node {}] Error reading line: {}", x, e); + } + } + } + }); + + let _auth_port = reth.auth_port().unwrap(); + + println!("Node {} rpc address: {}", x, reth.http_port()); + + handles.push_back(reth); + + #[allow(unused_mut)] + let mut flags = get_node_flags(x.into()); + + #[cfg(any(feature = "base-bench", feature = "bench"))] + { + flags.bench_block_dir = args.bench_block_dir.clone(); + } + + // Start our consensus engine in its own runtime/thread + let (stop_tx, mut stop_rx) = mpsc::unbounded_channel(); + let data_dir_clone = args.data_dir.clone(); + let thread = std::thread::spawn(move || { + let storage_dir = PathBuf::from(&data_dir_clone).join("stores").join(format!("node{}", x)); + let cfg = cw_tokio::Config::default() + .with_tcp_nodelay(Some(true)) + .with_worker_threads(4) + .with_storage_directory(storage_dir) + .with_catch_panics(true); + let executor = cw_tokio::Runner::new(cfg); + + executor.start(|node_context| async move { + let node_handle = node_context.clone().spawn(|ctx| async move { + run_node_with_runtime(ctx, flags, None).await.unwrap(); + }); + + // Wait for stop signal or node completion + let stop_fut = stop_rx.recv().fuse(); + pin_mut!(stop_fut); + futures::select! { + _ = stop_fut => { + println!("Node {} received stop signal, shutting down runtime...", x); + node_context.stop(0, Some(Duration::from_secs(30))).await.unwrap(); + } + _ = node_handle.fuse() => { + println!("Node {} handle completed", x); + } + } + }); + }); + + node_runtimes.push(NodeRuntime { thread, stop_tx }); + } + + // Wait a bit for nodes to be ready + context.sleep(Duration::from_secs(5)).await; + + // Send a deposit transaction to node0 + println!("Sending deposit transaction to node 0"); + let node0_http_port = handles[0].http_port(); + let node0_url = format!("http://localhost:{}", node0_http_port); + + // Create a test private key and signer + let private_key = "0xac0974bec39a17e36ba4a6b4d238ff944bacb478cbed5efcae784d7bf4f2ff80"; + let signer = PrivateKeySigner::from_str(private_key).expect("Failed to create signer"); + let wallet = EthereumWallet::from(signer); + + // Create provider with wallet + let provider = ProviderBuilder::new() + .wallet(wallet) + .connect_http(node0_url.parse().expect("Invalid URL")); + + // Deposit contract address (you'll need to set this to the actual address) + let deposit_contract = + Address::from_hex("0x00000000219ab540356cBB839Cbe05303d7705Fa").unwrap(); + + // Create test deposit parameters + // Generate a deterministic ed25519 key pair and get the public key + let ed25519_private_key = PrivateKey::from_seed(100); + let ed25519_public_key = ed25519_private_key.public_key(); + let ed25519_pubkey_bytes: [u8; 32] = ed25519_public_key.to_vec().try_into().unwrap(); + + // Withdrawal credentials (32 bytes) - 0x01 prefix for execution address withdrawal + // Format: 0x01 || 0x00...00 (11 bytes) || execution_address (20 bytes) + let mut withdrawal_credentials = [0u8; 32]; + withdrawal_credentials[0] = 0x01; // ETH1 withdrawal prefix + // Bytes 1-11 remain zero + // Set the last 20 bytes to the withdrawal address (using the same address as the sender) + let withdrawal_address = + Address::from_hex("0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266").unwrap(); + withdrawal_credentials[12..32].copy_from_slice(withdrawal_address.as_slice()); + + // Generate a random BLS signature (96 bytes) - for testing purposes only + + let amount = VALIDATOR_MINIMUM_STAKE; + + let deposit_request = DepositRequest { + pubkey: ed25519_public_key, + withdrawal_credentials, + amount, + signature: [0; 64], + index: 0, // not included in the signature + }; + + let protocol_version_digest = Sha256::hash(&PROTOCOL_VERSION.to_le_bytes()); + let message = deposit_request.as_message(protocol_version_digest); + //let signature: [u8; 64] = ed25519_private_key.sign(None, &message).as_ref().try_into().unwrap(); + let signature = ed25519_private_key.sign(None, &message); + let mut padded_signature = [0u8; 96]; + padded_signature[32..96].copy_from_slice(signature.as_ref()); + + /* + pub struct DepositRequest { + pub pubkey: PublicKey, // Validator ED25519 public key + pub withdrawal_credentials: [u8; 32], // Either hash of the BLS pubkey, or Ethereum address + pub amount: u64, // Amount in gwei + pub signature: [u8; 64], // ED signature + pub index: u64, + } + */ + + // Convert VALIDATOR_MINIMUM_STAKE (in gwei) to wei + let deposit_amount = U256::from(amount) * U256::from(1_000_000_000u64); // gwei to wei + + send_deposit_transaction( + &provider, + deposit_contract, + deposit_amount, + &ed25519_pubkey_bytes, + &withdrawal_credentials, + &padded_signature, + 0, // nonce + ) + .await + .expect("failed to send deposit transaction"); + + // Wait for nodes to reach checkpoint height + println!( + "Waiting for nodes to reach checkpoint height {}", + args.checkpoint_height + ); + let node0_rpc_port = get_node_flags(0).rpc_port; + loop { + match get_latest_height(node0_rpc_port).await { + Ok(height) if height >= args.checkpoint_height => { + println!("Nodes reached checkpoint height {}", height); + break; + } + Ok(height) => { + println!("Node 0 at height {}", height); + } + Err(e) => { + println!("Error querying height: {}", e); + } + } + context.sleep(std::time::Duration::from_secs(1)).await; + } + + // Retrieve checkpoint from first node + println!("Retrieving checkpoint from node 0"); + let checkpoint_state = loop { + match get_checkpoint(node0_rpc_port).await { + Ok(Some(checkpoint)) => { + let state = ConsensusState::try_from(&checkpoint) + .expect("Failed to parse checkpoint"); + println!("Retrieved checkpoint at height {}", state.latest_height); + break state; + } + Ok(None) => { + println!("Checkpoint not yet available"); + } + Err(e) => { + println!("Error retrieving checkpoint: {}", e); + } + } + context.sleep(Duration::from_secs(1)).await; + }; + + // Start the joining Reth node + let x = NUM_NODES; + let num_nodes = NUM_NODES + 1; + println!("******* STARTING RETH FOR NODE {} (joining node)", x); + let data_dir = format!("{}/node{}/data/reth_db", args.data_dir, x); + fs::create_dir_all(&data_dir).expect("Failed to create data directory"); + + // Copy db and static_files from node0 to initialize the joining node + + // Stop node0's consensus engine first (to avoid IPC errors) + let source_node = 0; + println!("Stopping node{} consensus engine...", source_node); + let node0_runtime = node_runtimes.remove(source_node); + + // Send stop signal and wait for runtime to shut down gracefully + node0_runtime.stop_tx.send(()).expect("Failed to send stop signal"); + println!("Waiting for node{} runtime to shut down...", source_node); + let _ = context.clone().spawn_blocking(false, move |_| { + node0_runtime.thread.join().expect("Failed to join node0 thread"); + }).await; + + // Give OS time to release ports (P2P sockets can take time to close) + println!("Waiting for ports to be released..."); + context.sleep(Duration::from_secs(3)).await; + + // Stop source reth instance and wait for graceful shutdown + let mut snapshot_reth = handles.pop_front().expect("No reth instance to snapshot"); + println!("Sending SIGTERM to node{} Reth and waiting for shutdown...", source_node); + snapshot_reth.terminate_and_wait().expect("Failed to terminate reth"); + println!("Node{} shut down successfully", source_node); + + let source_data_dir = format!("{}/node{}/data/reth_db", args.data_dir, source_node); + + println!("Copying db from node{} to node{}", source_node, x); + let source_db = format!("{}/db", source_data_dir); + let dest_db = format!("{}/db", data_dir); + copy_dir_all(&source_db, &dest_db).expect("Failed to copy db directory"); + + println!("Copying static_files from node{} to node{}", source_node, x); + let source_static = format!("{}/static_files", source_data_dir); + let dest_static = format!("{}/static_files", data_dir); + copy_dir_all(&source_static, &dest_static) + .expect("Failed to copy static_files directory"); + + // Restart nodeß's reth instance + //let reth_builder = Reth::new() + // .instance((source_node + 1) as u16) + // .keep_stdout() + // // .genesis(serde_json::from_str(&genesis_str).expect("invalid genesis")) + // .data_dir(source_data_dir.clone()) + // .arg("--enclave.mock-server") + // .arg("--enclave.endpoint-port") + // .arg(format!("1744{source_node}")) + // .arg("--auth-ipc") + // .arg("--auth-ipc.path") + // .arg(format!("/tmp/reth_engine_api{source_node}.ipc")) + // .arg("--metrics") + // .arg(format!("0.0.0.0:{}", 9001 + source_node)); + //let reth = reth_builder.spawn(); + //handles.push_front(reth); + + // Restart node0's consensus engine in a new runtime/thread + //println!("Restarting node{} consensus engine...", source_node); + //let (stop_tx, mut stop_rx) = mpsc::unbounded_channel(); + //let data_dir_clone = args.data_dir.clone(); + //let thread = std::thread::spawn(move || { + // let storage_dir = PathBuf::from(&data_dir_clone).join("stores").join(format!("node{}", source_node)); + // let cfg = cw_tokio::Config::default() + // .with_tcp_nodelay(Some(true)) + // .with_worker_threads(4) + // .with_storage_directory(storage_dir) + // .with_catch_panics(true); + // let executor = cw_tokio::Runner::new(cfg); + + // executor.start(|node_context| async move { + // let flags = get_node_flags(source_node); + // let node_handle = node_context.clone().spawn(|ctx| async move { + // run_node_with_runtime(ctx, flags, None).await.unwrap(); + // }); + + // // Wait for stop signal or node completion + // let stop_fut = stop_rx.recv().fuse(); + // pin_mut!(stop_fut); + // futures::select! { + // _ = stop_fut => { + // println!("Node {} received stop signal, shutting down runtime...", source_node); + // node_context.stop(0, Some(Duration::from_secs(30))).await.unwrap(); + // } + // _ = node_handle.fuse() => { + // println!("Node {} handle completed", source_node); + // } + // } + // }); + //}); + //node_runtimes.insert(source_node, NodeRuntime { thread, stop_tx }); + + // Start node4's reth instance + let reth_builder = Reth::new() + .instance(x + 1) + .keep_stdout() + .data_dir(data_dir) + .arg("--enclave.mock-server") + .arg("--enclave.endpoint-port") + .arg(format!("1744{x}")) + .arg("--auth-ipc") + .arg("--auth-ipc.path") + .arg(format!("/tmp/reth_engine_api{x}.ipc")) + .arg("--metrics") + .arg(format!("0.0.0.0:{}", 9001 + x)); + + let mut reth = reth_builder.spawn(); + + let stdout = reth.stdout().expect("Failed to get stdout"); + + let log_dir = args.log_dir.clone(); + context.clone().spawn(async move |_| { + let reader = BufReader::new(stdout); + let mut log_file = log_dir.as_ref().map(|dir| { + fs::File::create(format!("{}/node{}.log", dir, x)) + .expect("Failed to create log file") + }); + + for line in reader.lines() { + match line { + Ok(line) => { + if let Some(ref mut file) = log_file { + writeln!(file, "[Node {}] {}", x, line) + .expect("Failed to write to log file"); + } + } + Err(_e) => {} + } + } + }); + + println!("Node {} rpc address: {}", x, reth.http_port()); + handles.push_back(reth); + + // Start the 4th consensus node with checkpoint + #[allow(unused_mut)] + let mut flags = get_node_flags(x.into()); + + #[cfg(any(feature = "base-bench", feature = "bench"))] + { + flags.bench_block_dir = args.bench_block_dir.clone(); + } + + let signer_path = format!("{}/node{}/data/key.pem", args.data_dir, x); + let encoded_priv_key = ed25519_private_key.to_string(); + fs::write(&signer_path, encoded_priv_key).expect("Unable to write private key to disk"); + flags.key_path = signer_path; + flags.ip = Some("127.0.0.1:26640".to_string()); + + println!( + "Starting consensus engine for node {} with checkpoint", + ed25519_private_key.public_key() + ); + + // Start the joining node in its own runtime/thread + let (stop_tx, mut stop_rx) = mpsc::unbounded_channel(); + let data_dir_clone = args.data_dir.clone(); + let thread = std::thread::spawn(move || { + let storage_dir = PathBuf::from(&data_dir_clone).join("stores").join(format!("node{}", x)); + let cfg = cw_tokio::Config::default() + .with_tcp_nodelay(Some(true)) + .with_worker_threads(4) + .with_storage_directory(storage_dir) + .with_catch_panics(true); + let executor = cw_tokio::Runner::new(cfg); + + executor.start(|node_context| async move { + let node_handle = node_context.clone().spawn(|ctx| async move { + run_node_with_runtime(ctx, flags, Some(checkpoint_state)).await.unwrap(); + }); + + // Wait for stop signal or node completion + let stop_fut = stop_rx.recv().fuse(); + pin_mut!(stop_fut); + futures::select! { + _ = stop_fut => { + println!("Node {} received stop signal, shutting down runtime...", x); + node_context.stop(0, Some(Duration::from_secs(30))).await.unwrap(); + } + _ = node_handle.fuse() => { + println!("Node {} handle completed", x); + } + } + }); + }); + + node_runtimes.push(NodeRuntime { thread, stop_tx }); + + // Wait for all nodes to continue making progress + println!( + "Waiting for all {} nodes to reach height {}", + num_nodes, args.stop_height + ); + loop { + let mut all_ready = true; + //for idx in 0..num_nodes { + // Skip node0 + for idx in 1..num_nodes { + let rpc_port = get_node_flags(idx as usize).rpc_port; + match get_latest_height(rpc_port).await { + Ok(height) => { + if height < args.stop_height { + all_ready = false; + println!("Node {} at height {}", idx, height); + } + } + Err(e) => { + all_ready = false; + println!("Node {} error: {}", idx, e); + } + } + } + if all_ready { + println!("All nodes have reached target height!"); + break; + } + context.sleep(Duration::from_secs(2)).await; + } + + println!("Test completed successfully!"); + + // Send stop signals to all nodes first + println!("Sending stop signals to all {} nodes...", node_runtimes.len()); + for (idx, node_runtime) in node_runtimes.iter().enumerate() { + println!("Sending stop signal to node index {}...", idx); + let _ = node_runtime.stop_tx.send(()); + } + + // Now wait for all threads to finish + println!("Waiting for all nodes to shut down..."); + for (idx, node_runtime) in node_runtimes.into_iter().enumerate() { + println!("Waiting for node index {} to join...", idx); + let _ = context.clone().spawn_blocking(false, move |_| { + match node_runtime.thread.join() { + Ok(_) => println!("Node index {} thread joined successfully", idx), + Err(e) => println!("Node index {} thread join failed: {:?}", idx, e), + } + }).await; + } + + println!("All nodes shut down cleanly"); + Ok(()) + } + }) +} + +fn copy_dir_all(src: &str, dst: &str) -> std::io::Result<()> { + fs::create_dir_all(dst)?; + for entry in fs::read_dir(src)? { + let entry = entry?; + let ty = entry.file_type()?; + let src_path = entry.path(); + let dst_path = PathBuf::from(dst).join(entry.file_name()); + + if ty.is_dir() { + copy_dir_all( + src_path.to_str().expect("Invalid path"), + dst_path.to_str().expect("Invalid path"), + )?; + } else { + fs::copy(&src_path, &dst_path)?; + } + } + Ok(()) +} + +async fn get_latest_height(rpc_port: u16) -> Result> { + let url = format!("http://localhost:{}/get_latest_height", rpc_port); + let response = reqwest::get(&url).await?.text().await?; + Ok(response.parse()?) +} + +async fn get_checkpoint(rpc_port: u16) -> Result, Box> { + let url = format!("http://localhost:{}/get_checkpoint", rpc_port); + let response = reqwest::get(&url).await; + + match response { + Ok(resp) if resp.status().is_success() => { + let hex_str = resp.text().await?; + let bytes = from_hex_formatted(&hex_str).ok_or("Failed to decode hex")?; + let checkpoint = Checkpoint::from_ssz_bytes(&bytes) + .map_err(|e| format!("Failed to decode checkpoint: {:?}", e))?; + Ok(Some(checkpoint)) + } + _ => Ok(None), + } +} + +async fn send_deposit_transaction

( + provider: &P, + deposit_contract_address: Address, + deposit_amount: U256, + ed25519_pubkey: &[u8; 32], + withdrawal_credentials: &[u8; 32], + signature: &[u8; 96], + nonce: u64, +) -> Result<(), Box> +where + P: Provider + WalletProvider, +{ + // Left-pad ed25519 key to 48 bytes for the contract (prepend zeros) + let mut padded_pubkey = [0u8; 48]; + padded_pubkey[16..48].copy_from_slice(ed25519_pubkey); + + // Compute the correct deposit data root for this transaction + let deposit_data_root = compute_deposit_data_root( + ed25519_pubkey, + withdrawal_credentials, + deposit_amount, + signature, + ); + + // Create deposit function call data: deposit(bytes,bytes,bytes,bytes32) + let function_selector = &keccak256("deposit(bytes,bytes,bytes,bytes32)")[0..4]; + let mut call_data = function_selector.to_vec(); + + // ABI encode parameters - calculate offsets for 4 parameters (3 dynamic + 1 fixed) + let offset_to_pubkey = 4 * 32; + let offset_to_withdrawal_creds = offset_to_pubkey + 32 + padded_pubkey.len().div_ceil(32) * 32; + let offset_to_signature = + offset_to_withdrawal_creds + 32 + withdrawal_credentials.len().div_ceil(32) * 32; + + // Add parameter offsets + let mut offset_bytes = vec![0u8; 32]; + offset_bytes[28..32].copy_from_slice(&(offset_to_pubkey as u32).to_be_bytes()); + call_data.extend_from_slice(&offset_bytes); + + offset_bytes.fill(0); + offset_bytes[28..32].copy_from_slice(&(offset_to_withdrawal_creds as u32).to_be_bytes()); + call_data.extend_from_slice(&offset_bytes); + + offset_bytes.fill(0); + offset_bytes[28..32].copy_from_slice(&(offset_to_signature as u32).to_be_bytes()); + call_data.extend_from_slice(&offset_bytes); + + // Add the fixed bytes32 parameter (deposit_data_root) + call_data.extend_from_slice(&deposit_data_root); + + // Add dynamic data + let mut length_bytes = [0u8; 32]; + + // Padded pubkey (48 bytes) - already padded to 48, need to pad to next 32-byte boundary (64) + length_bytes[28..32].copy_from_slice(&(padded_pubkey.len() as u32).to_be_bytes()); + call_data.extend_from_slice(&length_bytes); + call_data.extend_from_slice(&padded_pubkey); + call_data.extend_from_slice(&[0u8; 16]); // Pad 48 to 64 bytes (next 32-byte boundary) + + // Withdrawal credentials (32 bytes) - already aligned + length_bytes.fill(0); + length_bytes[28..32].copy_from_slice(&(withdrawal_credentials.len() as u32).to_be_bytes()); + call_data.extend_from_slice(&length_bytes); + call_data.extend_from_slice(withdrawal_credentials); + + // Signature (96 bytes) - already aligned to 32-byte boundary + length_bytes.fill(0); + length_bytes[28..32].copy_from_slice(&(signature.len() as u32).to_be_bytes()); + call_data.extend_from_slice(&length_bytes); + call_data.extend_from_slice(signature); + + let tx_request = TransactionRequest::default() + .with_to(deposit_contract_address) + .with_value(deposit_amount) + .with_input(call_data) + .with_gas_limit(500_000) + .with_gas_price(1_000_000_000) // 1 gwei + .with_nonce(nonce); + + match provider.send_transaction(tx_request).await { + Ok(pending) => { + println!("Transaction sent: {}", pending.tx_hash()); + match pending.get_receipt().await { + Ok(receipt) => { + println!("Receipt: {:?}", receipt); + Ok(()) + } + Err(e) => panic!("Transaction failed: {e}"), + } + } + Err(e) => panic!("Error sending transaction: {}", e), + } +} + +fn compute_deposit_data_root( + ed25519_pubkey: &[u8; 32], + withdrawal_credentials: &[u8; 32], + amount: U256, + signature: &[u8; 96], +) -> [u8; 32] { + /* + bytes32 pubkey_root = sha256(abi.encodePacked(pubkey, bytes16(0))); + bytes32 signature_root = sha256(abi.encodePacked( + sha256(abi.encodePacked(signature[:64])), + sha256(abi.encodePacked(signature[64:], bytes32(0))) + )); + bytes32 node = sha256(abi.encodePacked( + sha256(abi.encodePacked(pubkey_root, withdrawal_credentials)), + sha256(abi.encodePacked(amount, bytes24(0), signature_root)) + )); + */ + + // Left-pad ed25519 key to 48 bytes (prepend zeros) + let mut padded_pubkey = [0u8; 48]; + padded_pubkey[16..48].copy_from_slice(ed25519_pubkey); + + // 1. pubkey_root = sha256(padded_pubkey || bytes16(0)) + let mut hasher = Sha256::new(); + hasher.update(&padded_pubkey); + hasher.update(&[0u8; 16]); // bytes16(0) + let pubkey_root = hasher.finalize(); + + // 2. signature_root = sha256(sha256(signature[0:64]) || sha256(signature[64:96] || bytes32(0))) + let mut hasher = Sha256::new(); + hasher.update(&signature[0..64]); + let sig_part1 = hasher.finalize(); + + let mut hasher = Sha256::new(); + hasher.update(&signature[64..96]); + hasher.update(&[0u8; 32]); // bytes32(0) + let sig_part2 = hasher.finalize(); + + let mut hasher = Sha256::new(); + hasher.update(&sig_part1); + hasher.update(&sig_part2); + let signature_root = hasher.finalize(); + + // 3. Convert amount to 8-byte little-endian (gwei) + let amount_gwei = amount / U256::from(10).pow(U256::from(9)); // Convert wei to gwei + let amount_u64 = amount_gwei.to::(); // Convert to u64 (should fit for reasonable amounts) + let amount_bytes = amount_u64.to_le_bytes(); // 8 bytes little-endian + + // 4. node = sha256(sha256(pubkey_root || withdrawal_credentials) || sha256(amount || bytes24(0) || signature_root)) + let mut hasher = Sha256::new(); + hasher.update(&pubkey_root); + hasher.update(withdrawal_credentials); + let left_node = hasher.finalize(); + + let mut hasher = Sha256::new(); + hasher.update(&amount_bytes); + hasher.update(&[0u8; 24]); // bytes24(0) + hasher.update(&signature_root); + let right_node = hasher.finalize(); + + let mut hasher = Sha256::new(); + hasher.update(&left_node); + hasher.update(&right_node); + let deposit_data_root = hasher.finalize(); + + let digest_bytes: &[u8] = deposit_data_root.as_ref(); + digest_bytes + .try_into() + .expect("SHA-256 digest is always 32 bytes") +} + +fn get_node_flags(node: usize) -> RunFlags { + let path = format!("testnet/node{node}/"); + + RunFlags { + key_path: format!("{path}key.pem"), + store_path: format!("{path}db"), + port: (26600 + (node * 10)) as u16, + prom_port: (28600 + (node * 10)) as u16, + rpc_port: (3030 + (node * 10)) as u16, + worker_threads: 2, + log_level: "debug".into(), + db_prefix: format!("{node}-quarts"), + genesis_path: "./example_genesis.toml".into(), + engine_ipc_path: format!("/tmp/reth_engine_api{node}.ipc"), + #[cfg(any(feature = "base-bench", feature = "bench"))] + bench_block_dir: None, + ip: None, + } +} + +/* +This test only works if the deposit contract is deployed. The contract can be added as a pre-deploy to the Reth genesis like this: + +"0x00000000219ab540356cBB839Cbe05303d7705Fa": { + "code": "0x60806040526004361061003f5760003560e01c806301ffc9a71461004457806322895118146100b6578063621fd130146101e3578063c5f2892f14610273575b600080fd5b34801561005057600080fd5b5061009c6004803603602081101561006757600080fd5b8101908080357bffffffffffffffffffffffffffffffffffffffffffffffffffffffff1916906020019092919050505061029e565b604051808215151515815260200191505060405180910390f35b6101e1600480360360808110156100cc57600080fd5b81019080803590602001906401000000008111156100e957600080fd5b8201836020820111156100fb57600080fd5b8035906020019184600183028401116401000000008311171561011d57600080fd5b90919293919293908035906020019064010000000081111561013e57600080fd5b82018360208201111561015057600080fd5b8035906020019184600183028401116401000000008311171561017257600080fd5b90919293919293908035906020019064010000000081111561019357600080fd5b8201836020820111156101a557600080fd5b803590602001918460018302840111640100000000831117156101c757600080fd5b909192939192939080359060200190929190505050610370565b005b3480156101ef57600080fd5b506101f8610fd0565b6040518080602001828103825283818151815260200191508051906020019080838360005b8381101561023857808201518184015260208101905061021d565b50505050905090810190601f1680156102655780820380516001836020036101000a031916815260200191505b509250505060405180910390f35b34801561027f57600080fd5b50610288610fe2565b6040518082815260200191505060405180910390f35b60007f01ffc9a7000000000000000000000000000000000000000000000000000000007bffffffffffffffffffffffffffffffffffffffffffffffffffffffff1916827bffffffffffffffffffffffffffffffffffffffffffffffffffffffff1916148061036957507f85640907000000000000000000000000000000000000000000000000000000007bffffffffffffffffffffffffffffffffffffffffffffffffffffffff1916827bffffffffffffffffffffffffffffffffffffffffffffffffffffffff1916145b9050919050565b603087879050146103cc576040517f08c379a00000000000000000000000000000000000000000000000000000000081526004018080602001828103825260268152602001806116ec6026913960400191505060405180910390fd5b60208585905014610428576040517f08c379a00000000000000000000000000000000000000000000000000000000081526004018080602001828103825260368152602001806116836036913960400191505060405180910390fd5b60608383905014610484576040517f08c379a000000000000000000000000000000000000000000000000000000000815260040180806020018281038252602981526020018061175f6029913960400191505060405180910390fd5b670de0b6b3a76400003410156104e5576040517f08c379a00000000000000000000000000000000000000000000000000000000081526004018080602001828103825260268152602001806117396026913960400191505060405180910390fd5b6000633b9aca0034816104f457fe5b061461054b576040517f08c379a00000000000000000000000000000000000000000000000000000000081526004018080602001828103825260338152602001806116b96033913960400191505060405180910390fd5b6000633b9aca00348161055a57fe5b04905067ffffffffffffffff80168111156105c0576040517f08c379a00000000000000000000000000000000000000000000000000000000081526004018080602001828103825260278152602001806117126027913960400191505060405180910390fd5b60606105cb82611314565b90507f649bbc62d0e31342afea4e5cd82d4049e7e1ee912fc0889aa790803be39038c589898989858a8a610600602054611314565b60405180806020018060200180602001806020018060200186810386528e8e82818152602001925080828437600081840152601f19601f82011690508083019250505086810385528c8c82818152602001925080828437600081840152601f19601f82011690508083019250505086810384528a818151815260200191508051906020019080838360005b838110156106a657808201518184015260208101905061068b565b50505050905090810190601f1680156106d35780820380516001836020036101000a031916815260200191505b508681038352898982818152602001925080828437600081840152601f19601f820116905080830192505050868103825287818151815260200191508051906020019080838360005b8381101561073757808201518184015260208101905061071c565b50505050905090810190601f1680156107645780820380516001836020036101000a031916815260200191505b509d505050505050505050505050505060405180910390a1600060028a8a600060801b6040516020018084848082843780830192505050826fffffffffffffffffffffffffffffffff19166fffffffffffffffffffffffffffffffff1916815260100193505050506040516020818303038152906040526040518082805190602001908083835b6020831061080e57805182526020820191506020810190506020830392506107eb565b6001836020036101000a038019825116818451168082178552505050505050905001915050602060405180830381855afa158015610850573d6000803e3d6000fd5b5050506040513d602081101561086557600080fd5b8101908080519060200190929190505050905060006002808888600090604092610891939291906115da565b6040516020018083838082843780830192505050925050506040516020818303038152906040526040518082805190602001908083835b602083106108eb57805182526020820191506020810190506020830392506108c8565b6001836020036101000a038019825116818451168082178552505050505050905001915050602060405180830381855afa15801561092d573d6000803e3d6000fd5b5050506040513d602081101561094257600080fd5b8101908080519060200190929190505050600289896040908092610968939291906115da565b6000801b604051602001808484808284378083019250505082815260200193505050506040516020818303038152906040526040518082805190602001908083835b602083106109cd57805182526020820191506020810190506020830392506109aa565b6001836020036101000a038019825116818451168082178552505050505050905001915050602060405180830381855afa158015610a0f573d6000803e3d6000fd5b5050506040513d6020811015610a2457600080fd5b810190808051906020019092919050505060405160200180838152602001828152602001925050506040516020818303038152906040526040518082805190602001908083835b60208310610a8e5780518252602082019150602081019050602083039250610a6b565b6001836020036101000a038019825116818451168082178552505050505050905001915050602060405180830381855afa158015610ad0573d6000803e3d6000fd5b5050506040513d6020811015610ae557600080fd5b810190808051906020019092919050505090506000600280848c8c604051602001808481526020018383808284378083019250505093505050506040516020818303038152906040526040518082805190602001908083835b60208310610b615780518252602082019150602081019050602083039250610b3e565b6001836020036101000a038019825116818451168082178552505050505050905001915050602060405180830381855afa158015610ba3573d6000803e3d6000fd5b5050506040513d6020811015610bb857600080fd5b8101908080519060200190929190505050600286600060401b866040516020018084805190602001908083835b60208310610c085780518252602082019150602081019050602083039250610be5565b6001836020036101000a0380198251168184511680821785525050505050509050018367ffffffffffffffff191667ffffffffffffffff1916815260180182815260200193505050506040516020818303038152906040526040518082805190602001908083835b60208310610c935780518252602082019150602081019050602083039250610c70565b6001836020036101000a038019825116818451168082178552505050505050905001915050602060405180830381855afa158015610cd5573d6000803e3d6000fd5b5050506040513d6020811015610cea57600080fd5b810190808051906020019092919050505060405160200180838152602001828152602001925050506040516020818303038152906040526040518082805190602001908083835b60208310610d545780518252602082019150602081019050602083039250610d31565b6001836020036101000a038019825116818451168082178552505050505050905001915050602060405180830381855afa158015610d96573d6000803e3d6000fd5b5050506040513d6020811015610dab57600080fd5b81019080805190602001909291905050509050858114610e16576040517f08c379a000000000000000000000000000000000000000000000000000000000815260040180806020018281038252605481526020018061162f6054913960600191505060405180910390fd5b6001602060020a0360205410610e77576040517f08c379a000000000000000000000000000000000000000000000000000000000815260040180806020018281038252602181526020018061160e6021913960400191505060405180910390fd5b60016020600082825401925050819055506000602054905060008090505b6020811015610fb75760018083161415610ec8578260008260208110610eb757fe5b018190555050505050505050610fc7565b600260008260208110610ed757fe5b01548460405160200180838152602001828152602001925050506040516020818303038152906040526040518082805190602001908083835b60208310610f335780518252602082019150602081019050602083039250610f10565b6001836020036101000a038019825116818451168082178552505050505050905001915050602060405180830381855afa158015610f75573d6000803e3d6000fd5b5050506040513d6020811015610f8a57600080fd5b8101908080519060200190929190505050925060028281610fa757fe5b0491508080600101915050610e95565b506000610fc057fe5b5050505050505b50505050505050565b6060610fdd602054611314565b905090565b6000806000602054905060008090505b60208110156111d057600180831614156110e05760026000826020811061101557fe5b01548460405160200180838152602001828152602001925050506040516020818303038152906040526040518082805190602001908083835b60208310611071578051825260208201915060208101905060208303925061104e565b6001836020036101000a038019825116818451168082178552505050505050905001915050602060405180830381855afa1580156110b3573d6000803e3d6000fd5b5050506040513d60208110156110c857600080fd5b810190808051906020019092919050505092506111b6565b600283602183602081106110f057fe5b015460405160200180838152602001828152602001925050506040516020818303038152906040526040518082805190602001908083835b6020831061114b5780518252602082019150602081019050602083039250611128565b6001836020036101000a038019825116818451168082178552505050505050905001915050602060405180830381855afa15801561118d573d6000803e3d6000fd5b5050506040513d60208110156111a257600080fd5b810190808051906020019092919050505092505b600282816111c057fe5b0491508080600101915050610ff2565b506002826111df602054611314565b600060401b6040516020018084815260200183805190602001908083835b6020831061122057805182526020820191506020810190506020830392506111fd565b6001836020036101000a0380198251168184511680821785525050505050509050018267ffffffffffffffff191667ffffffffffffffff1916815260180193505050506040516020818303038152906040526040518082805190602001908083835b602083106112a55780518252602082019150602081019050602083039250611282565b6001836020036101000a038019825116818451168082178552505050505050905001915050602060405180830381855afa1580156112e7573d6000803e3d6000fd5b5050506040513d60208110156112fc57600080fd5b81019080805190602001909291905050509250505090565b6060600867ffffffffffffffff8111801561132e57600080fd5b506040519080825280601f01601f1916602001820160405280156113615781602001600182028036833780820191505090505b50905060008260c01b90508060076008811061137957fe5b1a60f81b8260008151811061138a57fe5b60200101907effffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff1916908160001a905350806006600881106113c657fe5b1a60f81b826001815181106113d757fe5b60200101907effffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff1916908160001a9053508060056008811061141357fe5b1a60f81b8260028151811061142457fe5b60200101907effffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff1916908160001a9053508060046008811061146057fe5b1a60f81b8260038151811061147157fe5b60200101907effffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff1916908160001a905350806003600881106114ad57fe5b1a60f81b826004815181106114be57fe5b60200101907effffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff1916908160001a905350806002600881106114fa57fe5b1a60f81b8260058151811061150b57fe5b60200101907effffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff1916908160001a9053508060016008811061154757fe5b1a60f81b8260068151811061155857fe5b60200101907effffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff1916908160001a9053508060006008811061159457fe5b1a60f81b826007815181106115a557fe5b60200101907effffffffffffffffffffffffffffffffffffffffffffffffffffffffffffff1916908160001a90535050919050565b600080858511156115ea57600080fd5b838611156115f757600080fd5b600185028301915084860390509450949250505056fe4465706f736974436f6e74726163743a206d65726b6c6520747265652066756c6c4465706f736974436f6e74726163743a207265636f6e7374727563746564204465706f7369744461746120646f6573206e6f74206d6174636820737570706c696564206465706f7369745f646174615f726f6f744465706f736974436f6e74726163743a20696e76616c6964207769746864726177616c5f63726564656e7469616c73206c656e6774684465706f736974436f6e74726163743a206465706f7369742076616c7565206e6f74206d756c7469706c65206f6620677765694465706f736974436f6e74726163743a20696e76616c6964207075626b6579206c656e6774684465706f736974436f6e74726163743a206465706f7369742076616c756520746f6f20686967684465706f736974436f6e74726163743a206465706f7369742076616c756520746f6f206c6f774465706f736974436f6e74726163743a20696e76616c6964207369676e6174757265206c656e677468a2646970667358221220061922152bf33e33341dc256ce0c64bb49c53fc5bbc7d9cc77b02b9623906e9364736f6c634300060b0033", + "balance": "0x0" +} + +Also this Address 0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266 should have enough funds to send a transaction. + + */ diff --git a/node/src/bin/testnet.rs b/node/src/bin/testnet.rs index f969b92..9c6501d 100644 --- a/node/src/bin/testnet.rs +++ b/node/src/bin/testnet.rs @@ -167,7 +167,8 @@ fn main() -> Result<(), Box> { } // Start our consensus engine - let handle = run_node_with_runtime(context.with_label(&format!("node{x}")), flags); + let handle = + run_node_with_runtime(context.with_label(&format!("node{x}")), flags, None); consensus_handles.push(handle); } @@ -204,5 +205,6 @@ fn get_node_flags(node: usize) -> RunFlags { engine_ipc_path: format!("/tmp/reth_engine_api{node}.ipc"), #[cfg(any(feature = "base-bench", feature = "bench"))] bench_block_dir: None, + ip: None, } } diff --git a/node/src/config.rs b/node/src/config.rs index 8f40406..a27f6d2 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -4,26 +4,26 @@ use crate::keys::read_ed_key_from_path; use anyhow::{Context, Result}; use commonware_utils::from_hex_formatted; use governor::Quota; -use summit_types::checkpoint::Checkpoint; +use summit_types::consensus_state::ConsensusState; +use summit_types::network_oracle::NetworkOracle; use summit_types::{EngineClient, Genesis, PrivateKey, PublicKey}; - /* DEFAULTS */ pub const PENDING_CHANNEL: u32 = 0; pub const RESOLVER_CHANNEL: u32 = 1; pub const BROADCASTER_CHANNEL: u32 = 2; pub const BACKFILLER_CHANNEL: u32 = 3; +pub const MAILBOX_SIZE: usize = 16384; const FETCH_TIMEOUT: Duration = Duration::from_secs(5); const FETCH_CONCURRENT: usize = 4; const MAX_FETCH_COUNT: usize = 16; const MAX_FETCH_SIZE: usize = 512 * 1024; -const MAILBOX_SIZE: usize = 16384; const DEQUE_SIZE: usize = 10; pub const MESSAGE_BACKLOG: usize = 16384; const BACKFILL_QUOTA: u32 = 10; // in seconds const FETCH_RATE_P2P: u32 = 128; // in seconds -pub struct EngineConfig { +pub struct EngineConfig> { pub engine_client: C, pub partition_prefix: String, pub signer: PrivateKey, @@ -32,6 +32,8 @@ pub struct EngineConfig { pub backfill_quota: Quota, pub deque_size: usize, + pub oracle: O, + pub leader_timeout: Duration, pub notarization_timeout: Duration, pub nullify_retry: Duration, @@ -46,23 +48,25 @@ pub struct EngineConfig { pub namespace: String, pub genesis_hash: [u8; 32], - pub checkpoint: Option, + pub initial_state: ConsensusState, } -impl EngineConfig { +impl> EngineConfig { pub fn get_engine_config( engine_client: C, + oracle: O, signer: PrivateKey, participants: Vec, db_prefix: String, genesis: &Genesis, - checkpoint: Option, + initial_state: ConsensusState, ) -> Result { Ok(Self { engine_client, partition_prefix: db_prefix, signer, participants, + oracle, mailbox_size: MAILBOX_SIZE, backfill_quota: Quota::per_second(NonZeroU32::new(BACKFILL_QUOTA).unwrap()), deque_size: DEQUE_SIZE, @@ -81,7 +85,7 @@ impl EngineConfig { .map(|hash_bytes| hash_bytes.try_into()) .expect("bad eth_genesis_hash") .expect("bad eth_genesis_hash"), - checkpoint, + initial_state, }) } } diff --git a/node/src/engine.rs b/node/src/engine.rs index 8ed272f..daab1e6 100644 --- a/node/src/engine.rs +++ b/node/src/engine.rs @@ -15,6 +15,7 @@ use summit_application::ApplicationConfig; use summit_finalizer::actor::Finalizer; use summit_finalizer::{FinalizerConfig, FinalizerMailbox}; use summit_syncer::Orchestrator; +use summit_types::network_oracle::NetworkOracle; use summit_types::registry::Registry; use summit_types::{Block, Digest, EngineClient, PrivateKey, PublicKey}; use tracing::{error, warn}; @@ -38,9 +39,11 @@ pub const VALIDATOR_MINIMUM_STAKE: u64 = 32_000_000_000; // in gwei pub const VALIDATOR_WITHDRAWAL_PERIOD: u64 = 5; #[cfg(not(debug_assertions))] const VALIDATOR_WITHDRAWAL_PERIOD: u64 = 100; +#[cfg(all(feature = "e2e", not(debug_assertions)))] +pub const EPOCH_NUM_BLOCKS: u64 = 50; #[cfg(debug_assertions)] pub const EPOCH_NUM_BLOCKS: u64 = 10; -#[cfg(not(debug_assertions))] +#[cfg(all(not(debug_assertions), not(feature = "e2e")))] const EPOCH_NUM_BLOCKS: u64 = 1000; const VALIDATOR_MAX_WITHDRAWALS_PER_BLOCK: usize = 16; // @@ -48,6 +51,7 @@ const VALIDATOR_MAX_WITHDRAWALS_PER_BLOCK: usize = 16; pub struct Engine< E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics, C: EngineClient, + O: NetworkOracle, > { context: E, application: summit_application::Actor, @@ -55,7 +59,7 @@ pub struct Engine< buffer_mailbox: buffered::Mailbox, syncer: summit_syncer::Actor, syncer_mailbox: summit_syncer::Mailbox, - finalizer: Finalizer, + finalizer: Finalizer, pub finalizer_mailbox: FinalizerMailbox, orchestrator: Orchestrator, simplex: Simplex< @@ -71,23 +75,17 @@ pub struct Engine< sync_height: u64, } -impl - Engine +impl< + E: Clock + GClock + Rng + CryptoRng + Spawner + Storage + Metrics, + C: EngineClient, + O: NetworkOracle, +> Engine { - pub async fn new(context: E, cfg: EngineConfig) -> Self { + pub async fn new(context: E, cfg: EngineConfig) -> Self { let registry = Registry::new(cfg.participants.clone()); let buffer_pool = PoolRef::new(BUFFER_POOL_PAGE_SIZE, BUFFER_POOL_CAPACITY); - // Convert checkpoint to ConsensusState if provided - let initial_state = cfg.checkpoint.as_ref().map(|checkpoint| { - summit_types::consensus_state::ConsensusState::try_from(checkpoint) - .expect("failed to load consensus state from checkpoint") - }); - - let sync_height = initial_state - .as_ref() - .map(|state| state.latest_height) - .unwrap_or(0); + let sync_height = cfg.initial_state.latest_height; // create application let (application, application_mailbox) = summit_application::Actor::new( @@ -135,6 +133,7 @@ impl eyre::Result<()> { + pub async fn serve(&self, stop_signal: Signal) -> eyre::Result<()> { let MetricServerConfig { listen_addr, hooks } = &self.config; let hooks = hooks.clone(); self.start_endpoint( *listen_addr, Arc::new(move || hooks.iter().for_each(|hook| hook())), + stop_signal, ) .await .wrap_err("Could not start Prometheus endpoint")?; @@ -61,36 +64,45 @@ impl MetricServer { &self, listen_addr: SocketAddr, hook: Arc, + stop_signal: Signal, ) -> eyre::Result<()> { let listener = tokio::net::TcpListener::bind(listen_addr) .await .wrap_err("Could not bind to address")?; - //task_executor.spawn_with_graceful_shutdown_signal(|mut signal| async move { tokio::spawn(async move { + let mut stop_signal = stop_signal.fuse(); loop { - let Ok((stream, _remote_addr)) = listener.accept().await else { - tracing::error!("failed to accept connection"); - continue; - }; - - let handle = install_prometheus_recorder(); - let hook = hook.clone(); - let service = tower::service_fn(move |_| { - (hook)(); - let metrics = handle.handle().render(); - let mut response = Response::new(metrics); - response - .headers_mut() - .insert(CONTENT_TYPE, HeaderValue::from_static("text/plain")); - async move { Ok::<_, Infallible>(response) } - }); - - tokio::task::spawn(async move { - let _ = jsonrpsee_server::serve(stream, service) - .await - .inspect_err(|error| tracing::debug!(%error, "failed to serve request")); - }); + tokio::select! { + result = listener.accept() => { + let Ok((stream, _remote_addr)) = result else { + tracing::error!("failed to accept connection"); + continue; + }; + + let handle = install_prometheus_recorder(); + let hook = hook.clone(); + let service = tower::service_fn(move |_| { + (hook)(); + let metrics = handle.handle().render(); + let mut response = Response::new(metrics); + response + .headers_mut() + .insert(CONTENT_TYPE, HeaderValue::from_static("text/plain")); + async move { Ok::<_, Infallible>(response) } + }); + + tokio::task::spawn(async move { + let _ = jsonrpsee_server::serve(stream, service) + .await + .inspect_err(|error| tracing::debug!(%error, "failed to serve request")); + }); + } + sig = &mut stop_signal => { + tracing::info!("Metrics server shutting down: {}", sig.unwrap()); + break; + } + } } }); @@ -195,6 +207,7 @@ const fn describe_io_stats() {} #[cfg(test)] mod tests { use super::*; + use commonware_runtime::signal::Stopper; use reqwest::Client; use socket2::{Domain, Socket, Type}; use std::net::{SocketAddr, TcpListener}; @@ -216,7 +229,10 @@ mod tests { let listen_addr = get_random_available_addr(); let config = MetricServerConfig::new(listen_addr, hooks); - MetricServer::new(config).serve().await.unwrap(); + let stopper = Stopper::new(); + let signal = stopper.stopped(); + + MetricServer::new(config).serve(signal).await.unwrap(); // Send request to the metrics endpoint let url = format!("http://{listen_addr}"); diff --git a/node/src/test_harness/common.rs b/node/src/test_harness/common.rs index da634bc..35dd87c 100644 --- a/node/src/test_harness/common.rs +++ b/node/src/test_harness/common.rs @@ -1,10 +1,11 @@ use commonware_cryptography::{Hasher, PrivateKeyExt, Sha256, Signer}; -use crate::engine::PROTOCOL_VERSION; +use crate::engine::{PROTOCOL_VERSION, VALIDATOR_MINIMUM_STAKE}; use crate::test_harness::mock_engine_client::MockEngineNetwork; use crate::{config::EngineConfig, engine::Engine}; use alloy_eips::eip7685::Requests; -use alloy_primitives::{Address, Bytes}; +use alloy_primitives::{Address, B256, Bytes}; +use alloy_rpc_types_engine::ForkchoiceState; use commonware_codec::Write; use commonware_p2p::simulated::{self, Link, Network, Oracle, Receiver, Sender}; use commonware_runtime::{ @@ -18,8 +19,10 @@ use std::{ collections::{HashMap, HashSet}, num::NonZeroU32, }; -use summit_types::checkpoint::Checkpoint; +use summit_types::account::{ValidatorAccount, ValidatorStatus}; +use summit_types::consensus_state::ConsensusState; use summit_types::execution_request::{DepositRequest, ExecutionRequest, WithdrawalRequest}; +use summit_types::network_oracle::NetworkOracle; use summit_types::{Digest, EngineClient, PrivateKey, PublicKey}; pub const GENESIS_HASH: &str = "0x683713729fcb72be6f3d8b88c8cda3e10569d73b9640d3bf6f5184d94bd97616"; @@ -156,6 +159,13 @@ pub fn run_until_height( .try_into() .expect("failed to convert genesis hash"); let engine_client_network = MockEngineNetwork::new(genesis_hash); + let initial_state = get_initial_state( + genesis_hash, + &validators, + None, + None, + VALIDATOR_MINIMUM_STAKE, + ); // Create instances let mut public_keys = HashSet::new(); @@ -173,12 +183,13 @@ pub fn run_until_height( let config = get_default_engine_config( engine_client, + DummyOracle::default(), uid.clone(), genesis_hash, namespace, signer, validators.clone(), - None, + initial_state.clone(), ); let engine = Engine::new(context.with_label(&uid), config).await; @@ -253,6 +264,48 @@ pub fn get_domain() -> Digest { Sha256::hash(&PROTOCOL_VERSION.to_le_bytes()) } +pub fn get_initial_state( + genesis_hash: [u8; 32], + committee: &Vec, + withdrawal_credentials: Option<&Vec

>, + checkpoint: Option, + balance: u64, +) -> ConsensusState { + let addresses = vec![Address::ZERO; committee.len()]; + let addresses = withdrawal_credentials.unwrap_or(&addresses); + let genesis_hash: B256 = genesis_hash.into(); + checkpoint.unwrap_or_else(|| { + let forkchoice = ForkchoiceState { + head_block_hash: genesis_hash, + safe_block_hash: genesis_hash, + finalized_block_hash: genesis_hash, + }; + let mut state = ConsensusState::new(forkchoice); + // Add the genesis nodes to the consensus state with the minimum stake balance. + for (pubkey, address) in committee.iter().zip(addresses.iter()) { + let pubkey_bytes: [u8; 32] = pubkey + .as_ref() + .try_into() + .expect("Public key must be 32 bytes"); + let account = ValidatorAccount { + // TODO(matthias): we have to add a withdrawal address to the genesis + withdrawal_credentials: *address, + balance, + pending_withdrawal_amount: 0, + status: ValidatorStatus::Active, + // TODO(matthias): this index is comes from the deposit contract. + // Since there is no deposit transaction for the genesis nodes, the index will still be + // 0 for the deposit contract. Right now we only use this index to avoid counting the same deposit request twice. + // Since we set the index to 0 here, we cannot rely on the uniqueness. The first actual deposit request will have + // index 0 as well. + last_deposit_index: 0, + }; + state.validator_accounts.insert(pubkey_bytes, account); + } + state + }) +} + /// Parse a substring from a metric name using XML-like tags /// /// # Arguments @@ -409,17 +462,19 @@ pub fn execution_requests_to_requests(execution_requests: Vec) /// /// # Returns /// * `EngineConfig` - A fully configured engine config with sensible defaults for testing -pub fn get_default_engine_config( +pub fn get_default_engine_config>( engine_client: C, + oracle: O, partition_prefix: String, genesis_hash: [u8; 32], namespace: String, signer: PrivateKey, participants: Vec, - checkpoint: Option, -) -> EngineConfig { + initial_state: ConsensusState, +) -> EngineConfig { EngineConfig { engine_client, + oracle, partition_prefix, genesis_hash, namespace, @@ -438,6 +493,18 @@ pub fn get_default_engine_config( _max_fetch_size: 1024 * 512, fetch_concurrent: 10, fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()), - checkpoint, + initial_state, } } + +pub struct DummyOracle {} + +impl Default for DummyOracle { + fn default() -> Self { + Self {} + } +} + +impl NetworkOracle for DummyOracle { + async fn register(&mut self, _index: u64, _peers: Vec) {} +} diff --git a/node/src/tests/checkpointing.rs b/node/src/tests/checkpointing.rs index bd3a4ae..dbdffab 100644 --- a/node/src/tests/checkpointing.rs +++ b/node/src/tests/checkpointing.rs @@ -1,6 +1,6 @@ -use crate::engine::{EPOCH_NUM_BLOCKS, Engine}; +use crate::engine::{EPOCH_NUM_BLOCKS, Engine, VALIDATOR_MINIMUM_STAKE}; use crate::test_harness::common; -use crate::test_harness::common::get_default_engine_config; +use crate::test_harness::common::{DummyOracle, get_default_engine_config, get_initial_state}; use crate::test_harness::mock_engine_client::MockEngineNetworkBuilder; use commonware_cryptography::{PrivateKeyExt, Signer}; use commonware_macros::test_traced; @@ -12,7 +12,6 @@ use commonware_utils::from_hex_formatted; use std::collections::{HashMap, HashSet}; use std::time::Duration; use summit_types::PrivateKey; -use summit_types::checkpoint::Checkpoint; use summit_types::consensus_state::ConsensusState; #[test_traced("INFO")] @@ -63,6 +62,13 @@ fn test_checkpoint_created() { let stop_height = EPOCH_NUM_BLOCKS + 1; let engine_client_network = MockEngineNetworkBuilder::new(genesis_hash).build(); + let initial_state = get_initial_state( + genesis_hash, + &validators, + None, + None, + VALIDATOR_MINIMUM_STAKE, + ); // Create instances let mut public_keys = HashSet::new(); @@ -80,12 +86,13 @@ fn test_checkpoint_created() { let config = get_default_engine_config( engine_client, + DummyOracle::default(), uid.clone(), genesis_hash, namespace, signer, validators.clone(), - None, + initial_state.clone(), ); let engine = Engine::new(context.with_label(&uid), config).await; consensus_state_queries.insert(idx, engine.finalizer_mailbox.clone()); @@ -215,6 +222,13 @@ fn test_previous_header_hash_matches() { let stop_height = EPOCH_NUM_BLOCKS + 1; let engine_client_network = MockEngineNetworkBuilder::new(genesis_hash).build(); + let initial_state = get_initial_state( + genesis_hash, + &validators, + None, + None, + VALIDATOR_MINIMUM_STAKE, + ); // Create instances let mut public_keys = HashSet::new(); @@ -232,12 +246,13 @@ fn test_previous_header_hash_matches() { let config = get_default_engine_config( engine_client, + DummyOracle::default(), uid.clone(), genesis_hash, namespace, signer, validators.clone(), - None, + initial_state.clone(), ); let engine = Engine::new(context.with_label(&uid), config).await; consensus_state_queries.insert(idx, engine.finalizer_mailbox.clone()); @@ -376,23 +391,23 @@ fn test_single_engine_with_checkpoint() { let mut consensus_state = ConsensusState::default(); consensus_state.set_latest_height(50); // Set a specific height - // Create a checkpoint from the consensus state - let checkpoint = Checkpoint::new(&consensus_state); - // Configure engine with the checkpoint let public_key = signer.public_key(); let uid = format!("validator-{public_key}"); let namespace = String::from("_SEISMIC_BFT"); let engine_client = engine_client_network.create_client(uid.clone()); + let latest_height = consensus_state.latest_height; + let config = get_default_engine_config( engine_client, + DummyOracle::default(), uid.clone(), genesis_hash, namespace, signer, validators.clone(), - Some(checkpoint.clone()), + consensus_state, ); let engine = Engine::new(context.with_label(&uid), config).await; @@ -412,9 +427,9 @@ fn test_single_engine_with_checkpoint() { // The finalizer should have been initialized with our checkpoint at height 50 // Since consensus is running, the height might be >= 50 assert!( - current_height >= consensus_state.latest_height, + current_height >= latest_height, "Expected height >= {}, got {}", - consensus_state.latest_height, + latest_height, current_height ); @@ -472,6 +487,13 @@ fn test_node_joins_later_with_checkpoint() { .expect("failed to convert genesis hash"); let engine_client_network = MockEngineNetworkBuilder::new(genesis_hash).build(); + let initial_state = get_initial_state( + genesis_hash, + &validators, + None, + None, + VALIDATOR_MINIMUM_STAKE, + ); // Create instances let mut public_keys = HashSet::new(); @@ -493,12 +515,13 @@ fn test_node_joins_later_with_checkpoint() { let config = get_default_engine_config( engine_client, + DummyOracle::default(), uid.clone(), genesis_hash, namespace, signer, validators.clone(), - None, + initial_state.clone(), ); let engine = Engine::new(context.with_label(&uid), config).await; consensus_state_queries.insert(idx, engine.finalizer_mailbox.clone()); @@ -557,12 +580,13 @@ fn test_node_joins_later_with_checkpoint() { let config = get_default_engine_config( engine_client, + DummyOracle::default(), uid.clone(), genesis_hash, namespace, signer_joining_later, validators.clone(), - Some(checkpoint), + consensus_state, ); let engine = Engine::new(context.with_label(&uid), config).await; @@ -684,6 +708,13 @@ fn test_node_joins_later_with_checkpoint_not_in_genesis() { .expect("failed to convert genesis hash"); let engine_client_network = MockEngineNetworkBuilder::new(genesis_hash).build(); + let initial_state = get_initial_state( + genesis_hash, + &validators, + None, + None, + VALIDATOR_MINIMUM_STAKE, + ); // Create instances let mut public_keys = HashSet::new(); @@ -705,12 +736,13 @@ fn test_node_joins_later_with_checkpoint_not_in_genesis() { let config = get_default_engine_config( engine_client, + DummyOracle::default(), uid.clone(), genesis_hash, namespace, signer, initial_validators.to_vec(), - None, + initial_state.clone(), ); let engine = Engine::new(context.with_label(&uid), config).await; consensus_state_queries.insert(idx, engine.finalizer_mailbox.clone()); @@ -769,12 +801,13 @@ fn test_node_joins_later_with_checkpoint_not_in_genesis() { let config = get_default_engine_config( engine_client, + DummyOracle::default(), uid.clone(), genesis_hash, namespace, signer_joining_later, initial_validators.to_vec(), - Some(checkpoint), + consensus_state, ); let engine = Engine::new(context.with_label(&uid), config).await; diff --git a/node/src/tests/execution_requests.rs b/node/src/tests/execution_requests.rs index b51ab9e..1199928 100644 --- a/node/src/tests/execution_requests.rs +++ b/node/src/tests/execution_requests.rs @@ -1,6 +1,6 @@ use crate::engine::{EPOCH_NUM_BLOCKS, Engine, VALIDATOR_MINIMUM_STAKE}; use crate::test_harness::common; -use crate::test_harness::common::get_default_engine_config; +use crate::test_harness::common::{DummyOracle, get_default_engine_config, get_initial_state}; use crate::test_harness::mock_engine_client::MockEngineNetworkBuilder; use alloy_primitives::{Address, hex}; use commonware_cryptography::{PrivateKeyExt, Signer}; @@ -85,6 +85,7 @@ fn test_deposit_request_single() { let engine_client_network = MockEngineNetworkBuilder::new(genesis_hash) .with_execution_requests(execution_requests_map) .build(); + let initial_state = get_initial_state(genesis_hash, &validators, None, None, 0); // Create instances let mut public_keys = HashSet::new(); @@ -102,12 +103,13 @@ fn test_deposit_request_single() { let config = get_default_engine_config( engine_client, + DummyOracle::default(), uid.clone(), genesis_hash, namespace, signer, validators.clone(), - None, + initial_state.clone(), ); let engine = Engine::new(context.with_label(&uid), config).await; consensus_state_queries.insert(idx, engine.finalizer_mailbox.clone()); @@ -271,6 +273,8 @@ fn test_deposit_request_top_up() { let engine_client_network = MockEngineNetworkBuilder::new(genesis_hash) .with_execution_requests(execution_requests_map) .build(); + // Set the validator balance to 0 + let initial_state = get_initial_state(genesis_hash, &validators, None, None, 0); // Create instances let mut public_keys = HashSet::new(); @@ -288,12 +292,13 @@ fn test_deposit_request_top_up() { let config = get_default_engine_config( engine_client, + DummyOracle::default(), uid.clone(), genesis_hash, namespace, signer, validators.clone(), - None, + initial_state.clone(), ); let engine = Engine::new(context.with_label(&uid), config).await; consensus_state_queries.insert(idx, engine.finalizer_mailbox.clone()); @@ -385,7 +390,7 @@ fn test_deposit_and_withdrawal_request_single() { // Adds a deposit request to the block at height 5, and then adds a withdrawal request // to the block at height 7. // It is verified that the validator balance is correctly decremented after the withdrawal, - // and that the withdrawal request that is send to the execution layer matches the + // and that the withdrawal request that is sent to the execution layer matches the // withdrawal request (execution request) that was initially added to block 7. let n = 10; let link = Link { @@ -433,7 +438,7 @@ fn test_deposit_and_withdrawal_request_single() { // Create a single deposit request using the helper let (test_deposit, _) = common::create_deposit_request( - 1, + n as u64, // use a private key seed that doesn't exist on the consensus state VALIDATOR_MINIMUM_STAKE, common::get_domain(), None, @@ -455,7 +460,7 @@ fn test_deposit_and_withdrawal_request_single() { let requests2 = common::execution_requests_to_requests(execution_requests2); // Create execution requests map (add deposit to block 5) - // The deposit request will processed after 10 blocks because `EPOCH_NUM_BLOCKS` + // The deposit request will be processed after 10 blocks because `EPOCH_NUM_BLOCKS` // is set to 10 in debug mode. // The withdrawal request should be added after block 10, otherwise it will be ignored, because // the account doesn't exist yet. @@ -469,6 +474,7 @@ fn test_deposit_and_withdrawal_request_single() { let engine_client_network = MockEngineNetworkBuilder::new(genesis_hash) .with_execution_requests(execution_requests_map) .build(); + let initial_state = get_initial_state(genesis_hash, &validators, None, None, 0); // Create instances let mut public_keys = HashSet::new(); @@ -486,12 +492,13 @@ fn test_deposit_and_withdrawal_request_single() { let config = get_default_engine_config( engine_client, + DummyOracle::default(), uid.clone(), genesis_hash, namespace, signer, validators.clone(), - None, + initial_state.clone(), ); let engine = Engine::new(context.with_label(&uid), config).await; consensus_state_queries.insert(idx, engine.finalizer_mailbox.clone()); @@ -639,7 +646,7 @@ fn test_partial_withdrawal_balance_below_minimum_stake() { // Create a single deposit request using the helper let (test_deposit, _) = common::create_deposit_request( - 1, + n as u64, VALIDATOR_MINIMUM_STAKE, common::get_domain(), None, @@ -681,6 +688,13 @@ fn test_partial_withdrawal_balance_below_minimum_stake() { let engine_client_network = MockEngineNetworkBuilder::new(genesis_hash) .with_execution_requests(execution_requests_map) .build(); + let initial_state = get_initial_state( + genesis_hash, + &validators, + None, + None, + VALIDATOR_MINIMUM_STAKE, + ); // Create instances let mut public_keys = HashSet::new(); @@ -698,12 +712,13 @@ fn test_partial_withdrawal_balance_below_minimum_stake() { let config = get_default_engine_config( engine_client, + DummyOracle::default(), uid.clone(), genesis_hash, namespace, signer, validators.clone(), - None, + initial_state.clone(), ); let engine = Engine::new(context.with_label(&uid), config).await; consensus_state_queries.insert(idx, engine.finalizer_mailbox.clone()); @@ -877,7 +892,7 @@ fn test_deposit_less_than_min_stake_and_withdrawal() { let requests2 = common::execution_requests_to_requests(execution_requests2); // Create execution requests map (add deposit to block 5) - // The deposit request will processed after 10 blocks because `EPOCH_NUM_BLOCKS` + // The deposit request will be processed after 10 blocks because `EPOCH_NUM_BLOCKS` // is set to 10 in debug mode. // The withdrawal request should be added after block 10, otherwise it will be ignored, because // the account doesn't exist yet. @@ -891,6 +906,8 @@ fn test_deposit_less_than_min_stake_and_withdrawal() { let engine_client_network = MockEngineNetworkBuilder::new(genesis_hash) .with_execution_requests(execution_requests_map) .build(); + // Set the validator balance to 0 + let initial_state = get_initial_state(genesis_hash, &validators, None, None, 0); // Create instances let mut public_keys = HashSet::new(); @@ -908,12 +925,13 @@ fn test_deposit_less_than_min_stake_and_withdrawal() { let config = get_default_engine_config( engine_client, + DummyOracle::default(), uid.clone(), genesis_hash, namespace, signer, validators.clone(), - None, + initial_state.clone(), ); let engine = Engine::new(context.with_label(&uid), config).await; consensus_state_queries.insert(idx, engine.finalizer_mailbox.clone()); @@ -970,16 +988,14 @@ fn test_deposit_less_than_min_stake_and_withdrawal() { if metric.ends_with("withdrawal_validator_balance") { let balance = value.parse::().unwrap(); // Parse the pubkey from the metric name using helper function - if let Some(ed_pubkey_hex) = common::parse_metric_substring(metric, "pubkey") { - let creds = - common::parse_metric_substring(metric, "creds").expect("creds missing"); - assert_eq!(creds, hex::encode(test_withdrawal.source_address)); - assert_eq!(ed_pubkey_hex, test_deposit.pubkey.to_string()); - assert_eq!(balance, test_deposit.amount - test_withdrawal.amount); - processed_requests.insert(metric.to_string()); - } else { - println!("{}: {} (failed to parse pubkey)", metric, value); - } + let ed_pubkey_hex = common::parse_metric_substring(metric, "pubkey") + .expect(&format!("{}: {} (failed to parse pubkey)", metric, value)); + let creds = + common::parse_metric_substring(metric, "creds").expect("creds missing"); + assert_eq!(creds, hex::encode(test_withdrawal.source_address)); + assert_eq!(ed_pubkey_hex, test_deposit.pubkey.to_string()); + assert_eq!(balance, test_deposit.amount - test_withdrawal.amount); + processed_requests.insert(metric.to_string()); } if processed_requests.len() as u32 >= n && height_reached.len() as u32 == n { success = true; @@ -1116,6 +1132,8 @@ fn test_deposit_and_withdrawal_request_multiple() { let engine_client_network = MockEngineNetworkBuilder::new(genesis_hash) .with_execution_requests(execution_requests_map) .build(); + // Set the validator balance to 0 + let initial_state = get_initial_state(genesis_hash, &validators, None, None, 0); // Create instances let mut public_keys = HashSet::new(); @@ -1133,12 +1151,13 @@ fn test_deposit_and_withdrawal_request_multiple() { let config = get_default_engine_config( engine_client, + DummyOracle::default(), uid.clone(), genesis_hash, namespace, signer, validators.clone(), - None, + initial_state.clone(), ); let engine = Engine::new(context.with_label(&uid), config).await; consensus_state_queries.insert(idx, engine.finalizer_mailbox.clone()); @@ -1328,6 +1347,8 @@ fn test_deposit_request_invalid_signature() { let engine_client_network = MockEngineNetworkBuilder::new(genesis_hash) .with_execution_requests(execution_requests_map) .build(); + // Set the validator balance to 0 + let initial_state = get_initial_state(genesis_hash, &validators, None, None, 0); // Create instances let mut public_keys = HashSet::new(); @@ -1345,12 +1366,13 @@ fn test_deposit_request_invalid_signature() { let config = get_default_engine_config( engine_client, + DummyOracle::default(), uid.clone(), genesis_hash, namespace, signer, validators.clone(), - None, + initial_state.clone(), ); let engine = Engine::new(context.with_label(&uid), config).await; consensus_state_queries.insert(idx, engine.finalizer_mailbox.clone()); diff --git a/node/src/tests/syncer.rs b/node/src/tests/syncer.rs index 83bb0e4..c8b6314 100644 --- a/node/src/tests/syncer.rs +++ b/node/src/tests/syncer.rs @@ -1,6 +1,6 @@ -use crate::engine::{EPOCH_NUM_BLOCKS, Engine}; +use crate::engine::{EPOCH_NUM_BLOCKS, Engine, VALIDATOR_MINIMUM_STAKE}; use crate::test_harness::common; -use crate::test_harness::common::get_default_engine_config; +use crate::test_harness::common::{DummyOracle, get_default_engine_config, get_initial_state}; use crate::test_harness::mock_engine_client::MockEngineNetworkBuilder; use commonware_cryptography::{PrivateKeyExt, Signer}; use commonware_macros::test_traced; @@ -63,6 +63,13 @@ fn test_node_joins_later_no_checkpoint() { .expect("failed to convert genesis hash"); let engine_client_network = MockEngineNetworkBuilder::new(genesis_hash).build(); + let initial_state = get_initial_state( + genesis_hash, + &validators, + None, + None, + VALIDATOR_MINIMUM_STAKE, + ); // Create instances let mut public_keys = HashSet::new(); @@ -84,12 +91,13 @@ fn test_node_joins_later_no_checkpoint() { let config = get_default_engine_config( engine_client, + DummyOracle::default(), uid.clone(), genesis_hash, namespace, signer, validators.clone(), - None, + initial_state.clone(), ); let engine = Engine::new(context.with_label(&uid), config).await; consensus_state_queries.insert(idx, engine.finalizer_mailbox.clone()); @@ -134,12 +142,13 @@ fn test_node_joins_later_no_checkpoint() { let config = get_default_engine_config( engine_client, + DummyOracle::default(), uid.clone(), genesis_hash, namespace, signer_joining_later, validators.clone(), - None, + initial_state, // pass initial state (start from genesis) ); let engine = Engine::new(context.with_label(&uid), config).await; @@ -261,6 +270,13 @@ fn test_node_joins_later_no_checkpoint_not_in_genesis() { .expect("failed to convert genesis hash"); let engine_client_network = MockEngineNetworkBuilder::new(genesis_hash).build(); + let initial_state = get_initial_state( + genesis_hash, + &validators, + None, + None, + VALIDATOR_MINIMUM_STAKE, + ); // Create instances let mut public_keys = HashSet::new(); @@ -282,12 +298,13 @@ fn test_node_joins_later_no_checkpoint_not_in_genesis() { let config = get_default_engine_config( engine_client, + DummyOracle::default(), uid.clone(), genesis_hash, namespace, signer, initial_validators.to_vec(), - None, + initial_state.clone(), ); let engine = Engine::new(context.with_label(&uid), config).await; consensus_state_queries.insert(idx, engine.finalizer_mailbox.clone()); @@ -334,12 +351,13 @@ fn test_node_joins_later_no_checkpoint_not_in_genesis() { // since historical blocks were finalized by only those 4 validators let config = get_default_engine_config( engine_client, + DummyOracle::default(), uid.clone(), genesis_hash, namespace, signer_joining_later, initial_validators.to_vec(), - None, + initial_state, // pass initial state (start from genesis) ); let engine = Engine::new(context.with_label(&uid), config).await; diff --git a/rpc/Cargo.toml b/rpc/Cargo.toml index 33b3286..de5c57f 100644 --- a/rpc/Cargo.toml +++ b/rpc/Cargo.toml @@ -18,6 +18,7 @@ summit-types = { workspace = true } commonware-cryptography = { workspace = true } commonware-utils = { workspace = true } commonware-codec = { workspace = true } +commonware-runtime = { workspace = true } ethereum_ssz.workspace = true dirs = "5.0.1" serde = { workspace = true } diff --git a/rpc/src/lib.rs b/rpc/src/lib.rs index 8606e42..79a7393 100644 --- a/rpc/src/lib.rs +++ b/rpc/src/lib.rs @@ -1,8 +1,8 @@ pub mod routes; -use std::sync::Mutex; - use crate::routes::RpcRoutes; +use commonware_runtime::signal::Signal; use futures::channel::oneshot; +use std::sync::Mutex; use summit_finalizer::FinalizerMailbox; use tokio::net::TcpListener; use tokio_util::sync::CancellationToken; @@ -25,6 +25,7 @@ pub async fn start_rpc_server( finalizer_mailbox: FinalizerMailbox, key_path: String, port: u16, + stop_signal: Signal, ) -> anyhow::Result<()> { let state = RpcState::new(key_path, finalizer_mailbox); @@ -33,8 +34,12 @@ pub async fn start_rpc_server( let listener = TcpListener::bind(format!("0.0.0.0:{port}")).await?; println!("RPC Server listening on http://0.0.0.0:{port}"); - - axum::serve(listener, server).await?; + axum::serve(listener, server) + .with_graceful_shutdown(async move { + let sig = stop_signal.await.unwrap(); + println!("RPC server stopped: {sig}"); + }) + .await?; Ok(()) } diff --git a/rpc/src/routes.rs b/rpc/src/routes.rs index e91a49d..3c87788 100644 --- a/rpc/src/routes.rs +++ b/rpc/src/routes.rs @@ -24,6 +24,7 @@ impl RpcRoutes { .route("/health", get(Self::handle_health_check)) .route("/get_public_key", get(Self::handle_get_pub_key)) .route("/get_checkpoint", get(Self::handle_get_checkpoint)) + .route("/get_latest_height", get(Self::handle_latest_height)) .with_state(state) } @@ -71,6 +72,14 @@ impl RpcRoutes { Ok(hex(&encoded)) } + async fn handle_latest_height(State(state): State>) -> Result { + Ok(state + .finalizer_mailbox + .get_latest_height() + .await + .to_string()) + } + async fn handle_send_genesis( State(state): State>, body: String, diff --git a/syncer/src/actor.rs b/syncer/src/actor.rs index 874af84..be461f9 100644 --- a/syncer/src/actor.rs +++ b/syncer/src/actor.rs @@ -22,14 +22,14 @@ use commonware_storage::{ translator::TwoCap, }; use commonware_utils::NZU64; -use futures::{StreamExt as _, channel::mpsc}; +use futures::{FutureExt, StreamExt as _, channel::mpsc}; use governor::Quota; #[cfg(feature = "prom")] use metrics::histogram; use rand::Rng; use summit_types::registry::Registry; use summit_types::{Block, Digest, Finalized, Notarized, PublicKey, Signature}; -use tracing::{debug, warn}; +use tracing::{debug, info, warn}; const PRUNABLE_ITEMS_PER_SECTION: NonZero = NZU64!(4_096); const IMMUTABLE_ITEMS_PER_SECTION: NonZero = NZU64!(262_144); @@ -217,6 +217,7 @@ impl Acto let mut requested_blocks = BTreeSet::new(); let mut last_view_processed: u64 = 0; let mut outstanding_notarize = BTreeSet::new(); + let mut signal = self.context.stopped().fuse(); loop { // Cancel useless requests let mut to_cancel = Vec::new(); @@ -742,6 +743,10 @@ impl Acto } }, } + }, + sig = &mut signal => { + info!("syncer terminated: {}", sig.unwrap()); + break; } } } diff --git a/types/Cargo.toml b/types/Cargo.toml index 694af64..3a166b0 100644 --- a/types/Cargo.toml +++ b/types/Cargo.toml @@ -9,6 +9,8 @@ commonware-consensus.workspace = true commonware-codec.workspace = true commonware-utils.workspace = true commonware-resolver.workspace = true +commonware-p2p.workspace = true +commonware-runtime.workspace = true alloy-rpc-types-engine.workspace = true alloy-eips.workspace = true @@ -18,6 +20,11 @@ alloy-provider.workspace = true alloy-transport-ipc.workspace = true futures.workspace = true + +# for reth.rs module (e2e feature only) +alloy-genesis = { workspace = true, optional = true } +rand = { workspace = true, optional = true } +url = { workspace = true, optional = true } anyhow.workspace = true dirs.workspace = true ethereum_ssz.workspace = true @@ -31,7 +38,9 @@ serde_json = { workspace = true, optional = true } # for benchmarking op-alloy-network = { version = "0.19.1", optional = true } op-alloy-rpc-types-engine = { version = "0.19.1", optional = true } +libc = { version = "0.2.174", optional = true} [features] base-bench = ["op-alloy-network", "op-alloy-rpc-types-engine","serde_json"] -bench = ["serde_json"] \ No newline at end of file +bench = ["serde_json"] +e2e = ["libc", "alloy-genesis", "rand", "url"] \ No newline at end of file diff --git a/types/src/lib.rs b/types/src/lib.rs index fa76a25..6417320 100644 --- a/types/src/lib.rs +++ b/types/src/lib.rs @@ -7,7 +7,10 @@ pub mod engine_client; pub mod execution_request; pub mod genesis; pub mod header; +pub mod network_oracle; pub mod registry; +#[cfg(feature = "e2e")] +pub mod reth; pub mod utils; pub mod withdrawal; diff --git a/types/src/network_oracle.rs b/types/src/network_oracle.rs new file mode 100644 index 0000000..8ad0d39 --- /dev/null +++ b/types/src/network_oracle.rs @@ -0,0 +1,23 @@ +use commonware_cryptography::PublicKey; +use commonware_p2p::authenticated::discovery::Oracle; +use commonware_runtime::{Metrics, Spawner}; + +pub trait NetworkOracle: Send + Sync + 'static { + fn register(&mut self, index: u64, peers: Vec) -> impl Future + Send; +} + +pub struct DiscoveryOracle { + oracle: Oracle, +} + +impl DiscoveryOracle { + pub fn new(oracle: Oracle) -> Self { + Self { oracle } + } +} + +impl NetworkOracle for DiscoveryOracle { + async fn register(&mut self, index: u64, peers: Vec) { + self.oracle.register(index, peers).await; + } +} diff --git a/types/src/reth.rs b/types/src/reth.rs new file mode 100644 index 0000000..2fe211f --- /dev/null +++ b/types/src/reth.rs @@ -0,0 +1,646 @@ +//! Utilities for launching a Reth dev-mode instance. +//! Mostly taken from: https://docs.rs/alloy/latest/alloy/node_bindings/struct.RethInstance.html + +use alloy_genesis::Genesis; +use rand::Rng; +use std::{ + borrow::Cow, + ffi::OsString, + fs::create_dir, + io::{BufRead, BufReader}, + net::SocketAddr, + path::PathBuf, + process::{Child, ChildStdout, Command, Stdio}, + time::{Duration, Instant}, +}; +use url::Url; + +/// Node startup timeout +const NODE_STARTUP_TIMEOUT: Duration = Duration::from_secs(30); + +/// The exposed APIs +const API: &str = "eth,net,web3,txpool,trace,rpc,reth,ots,admin,debug"; + +/// The reth command +const RETH: &str = "reth"; + +/// The default HTTP port for Reth. +const DEFAULT_HTTP_PORT: u16 = 8545; + +/// The default WS port for Reth. +const DEFAULT_WS_PORT: u16 = 8546; + +/// The default auth port for Reth. +const DEFAULT_AUTH_PORT: u16 = 8551; + +/// The default P2P port for Reth. +const DEFAULT_P2P_PORT: u16 = 30303; + +/// Extract value from a log line given a key (copied from alloy-node-bindings utils.rs) +fn extract_value<'a>(key: &str, line: &'a str) -> Option<&'a str> { + let mut key_equal = Cow::from(key); + let mut key_colon = Cow::from(key); + + // Prepare both key variants + if !key_equal.ends_with('=') { + key_equal = format!("{}=", key).into(); + } + if !key_colon.ends_with(": ") { + key_colon = format!("{}: ", key).into(); + } + + // Try to find the key with '=' + if let Some(pos) = line.find(key_equal.as_ref()) { + let start = pos + key_equal.len(); + let end = line[start..] + .find(' ') + .map(|i| start + i) + .unwrap_or(line.len()); + if start <= line.len() && end <= line.len() { + return Some(line[start..end].trim()); + } + } + + // If not found, try to find the key with ': ' + if let Some(pos) = line.find(key_colon.as_ref()) { + let start = pos + key_colon.len(); + let end = line[start..] + .find(',') + .map(|i| start + i) + .unwrap_or(line.len()); + if start <= line.len() && end <= line.len() { + return Some(line[start..end].trim()); + } + } + + // If neither variant matches, return None + None +} + +/// Extract endpoint from a log line (copied from alloy-node-bindings utils.rs) +fn extract_endpoint(key: &str, line: &str) -> Option { + extract_value(key, line) + .map(|val| val.trim_start_matches("Some(").trim_end_matches(')')) + .and_then(|val| val.parse().ok()) +} + +/// Node error types +#[derive(Debug)] +pub enum NodeError { + /// No stdout available + NoStdout, + /// Spawn error + SpawnError(std::io::Error), + /// Create dir error + CreateDirError(std::io::Error), + /// Timeout + Timeout, + /// Read line error + ReadLineError(std::io::Error), + /// Fatal error + Fatal(String), +} + +impl std::fmt::Display for NodeError { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + NodeError::NoStdout => write!(f, "No stdout available"), + NodeError::SpawnError(e) => write!(f, "Spawn error: {}", e), + NodeError::CreateDirError(e) => write!(f, "Create dir error: {}", e), + NodeError::Timeout => write!(f, "Timeout"), + NodeError::ReadLineError(e) => write!(f, "Read line error: {}", e), + NodeError::Fatal(s) => write!(f, "Fatal error: {}", s), + } + } +} + +impl std::error::Error for NodeError {} + +/// A Reth instance. Will close the instance when dropped. +/// +/// Construct this using [`Reth`]. +#[derive(Debug)] +pub struct RethInstance { + pid: Child, + instance: u16, + http_port: u16, + ws_port: u16, + auth_port: Option, + p2p_port: Option, + ipc: Option, + data_dir: Option, + genesis: Option, +} + +impl RethInstance { + /// Returns the instance number of this instance. + pub const fn instance(&self) -> u16 { + self.instance + } + + /// Returns the HTTP port of this instance. + pub const fn http_port(&self) -> u16 { + self.http_port + } + + /// Returns the WS port of this instance. + pub const fn ws_port(&self) -> u16 { + self.ws_port + } + + /// Returns the auth port of this instance. + pub const fn auth_port(&self) -> Option { + self.auth_port + } + + /// Returns the p2p port of this instance. + /// If discovery is disabled, this will be `None`. + pub const fn p2p_port(&self) -> Option { + self.p2p_port + } + + /// Returns the HTTP endpoint of this instance. + #[doc(alias = "http_endpoint")] + pub fn endpoint(&self) -> String { + format!("http://localhost:{}", self.http_port) + } + + /// Returns the Websocket endpoint of this instance. + pub fn ws_endpoint(&self) -> String { + format!("ws://localhost:{}", self.ws_port) + } + + /// Returns the IPC endpoint of this instance. + pub fn ipc_endpoint(&self) -> String { + self.ipc + .clone() + .map_or_else(|| "reth.ipc".to_string(), |ipc| ipc.display().to_string()) + } + + /// Returns the HTTP endpoint url of this instance. + #[doc(alias = "http_endpoint_url")] + pub fn endpoint_url(&self) -> Url { + Url::parse(&self.endpoint()).unwrap() + } + + /// Returns the Websocket endpoint url of this instance. + pub fn ws_endpoint_url(&self) -> Url { + Url::parse(&self.ws_endpoint()).unwrap() + } + + /// Returns the path to this instances' data directory. + pub const fn data_dir(&self) -> Option<&PathBuf> { + self.data_dir.as_ref() + } + + /// Returns the genesis configuration used to configure this instance + pub const fn genesis(&self) -> Option<&Genesis> { + self.genesis.as_ref() + } + + /// Takes the stdout contained in the child process. + /// + /// This leaves a `None` in its place, so calling methods that require a stdout to be present + /// will fail if called after this. + pub fn stdout(&mut self) -> Result { + self.pid.stdout.take().ok_or(NodeError::NoStdout) + } + + /// Send SIGTERM to gracefully shutdown the Reth process and wait for it to exit + pub fn terminate_and_wait(&mut self) -> std::io::Result<()> { + // Send SIGTERM for graceful shutdown + unsafe { + libc::kill(self.pid.id() as i32, libc::SIGTERM); + } + + // Wait for the process to exit + self.pid.wait()?; + Ok(()) + } +} + +impl Drop for RethInstance { + fn drop(&mut self) { + //self.pid.kill().expect("could not kill reth"); + //signal::kill(Pid::from_raw(child.id() as i32), Signal::SIGTERM).unwrap(); + unsafe { + libc::kill(self.pid.id() as i32, libc::SIGTERM); + } + } +} + +/// Builder for launching `reth`. +/// +/// # Panics +/// +/// If `spawn` is called without `reth` being available in the user's $PATH +/// +/// # Example +/// +/// ```no_run +/// use summit_types::reth::Reth; +/// +/// let port = 8545u16; +/// let url = format!("http://localhost:{}", port).to_string(); +/// +/// let reth = Reth::new().instance(1).block_time("12sec").spawn(); +/// +/// drop(reth); // this will kill the instance +/// ``` +#[derive(Clone, Debug, Default)] +#[must_use = "This Builder struct does nothing unless it is `spawn`ed"] +pub struct Reth { + dev: bool, + http_port: u16, + ws_port: u16, + auth_port: u16, + p2p_port: u16, + block_time: Option, + instance: u16, + discovery_enabled: bool, + program: Option, + ipc_path: Option, + ipc_enabled: bool, + data_dir: Option, + chain_or_path: Option, + genesis: Option, + args: Vec, + keep_stdout: bool, +} + +impl Reth { + /// Creates an empty Reth builder. + /// + /// The instance number is set to a random number between 1 and 200 by default to reduce the + /// odds of port conflicts. This can be changed with [`Reth::instance`]. Set to 0 to use the + /// default ports. 200 is the maximum number of instances that can be run set by Reth. + pub fn new() -> Self { + Self { + dev: false, + http_port: DEFAULT_HTTP_PORT, + ws_port: DEFAULT_WS_PORT, + auth_port: DEFAULT_AUTH_PORT, + p2p_port: DEFAULT_P2P_PORT, + block_time: None, + instance: rand::thread_rng().gen_range(1..200), + discovery_enabled: true, + program: None, + ipc_path: None, + ipc_enabled: false, + data_dir: None, + chain_or_path: None, + genesis: None, + args: Vec::new(), + keep_stdout: false, + } + } + + /// Creates a Reth builder which will execute `reth` at the given path. + /// + /// # Example + /// + /// ``` + /// use summit_types::reth::Reth; + /// # fn a() { + /// let reth = Reth::at("../reth/target/release/reth").spawn(); + /// + /// println!("Reth running at `{}`", reth.endpoint()); + /// # } + /// ``` + pub fn at(path: impl Into) -> Self { + Self::new().path(path) + } + + /// Sets the `path` to the `reth` executable + /// + /// By default, it's expected that `reth` is in `$PATH`, see also + /// [`std::process::Command::new()`] + pub fn path>(mut self, path: T) -> Self { + self.program = Some(path.into()); + self + } + + /// Enable `dev` mode for the Reth instance. + pub const fn dev(mut self) -> Self { + self.dev = true; + self + } + + /// Sets the HTTP port for the Reth instance. + /// Note: this resets the instance number to 0 to allow for custom ports. + pub const fn http_port(mut self, http_port: u16) -> Self { + self.http_port = http_port; + self.instance = 0; + self + } + + /// Sets the WS port for the Reth instance. + /// Note: this resets the instance number to 0 to allow for custom ports. + pub const fn ws_port(mut self, ws_port: u16) -> Self { + self.ws_port = ws_port; + self.instance = 0; + self + } + + /// Sets the auth port for the Reth instance. + /// Note: this resets the instance number to 0 to allow for custom ports. + pub const fn auth_port(mut self, auth_port: u16) -> Self { + self.auth_port = auth_port; + self.instance = 0; + self + } + + /// Sets the p2p port for the Reth instance. + /// Note: this resets the instance number to 0 to allow for custom ports. + pub const fn p2p_port(mut self, p2p_port: u16) -> Self { + self.p2p_port = p2p_port; + self.instance = 0; + self + } + + /// Sets the block time for the Reth instance. + /// Parses strings using + /// This is only used if `dev` mode is enabled. + pub fn block_time(mut self, block_time: &str) -> Self { + self.block_time = Some(block_time.to_string()); + self + } + + /// Disables discovery for the Reth instance. + pub const fn disable_discovery(mut self) -> Self { + self.discovery_enabled = false; + self + } + + /// Sets the chain name or path to a chain spec for the Reth instance. + /// Passed through to `reth --chain `. + pub fn chain_or_path(mut self, chain_or_path: &str) -> Self { + self.chain_or_path = Some(chain_or_path.to_string()); + self + } + + /// Enable IPC for the Reth instance. + pub const fn enable_ipc(mut self) -> Self { + self.ipc_enabled = true; + self + } + + /// Sets the instance number for the Reth instance. Set to 0 to use the default ports. + /// By default, a random number between 1 and 200 is used. + pub const fn instance(mut self, instance: u16) -> Self { + self.instance = instance; + self + } + + /// Sets the IPC path for the socket. + pub fn ipc_path>(mut self, path: T) -> Self { + self.ipc_path = Some(path.into()); + self + } + + /// Sets the data directory for reth. + pub fn data_dir>(mut self, path: T) -> Self { + self.data_dir = Some(path.into()); + self + } + + /// Sets the `genesis.json` for the Reth instance. + /// + /// If this is set, reth will be initialized with `reth init` and the `--datadir` option will be + /// set to the same value as `data_dir`. + /// + /// This is destructive and will overwrite any existing data in the data directory. + pub fn genesis(mut self, genesis: Genesis) -> Self { + self.genesis = Some(genesis); + self + } + + /// Keep the handle to reth's stdout in order to read from it. + /// + /// Caution: if the stdout handle isn't used, this can end up blocking. + pub const fn keep_stdout(mut self) -> Self { + self.keep_stdout = true; + self + } + + /// Adds an argument to pass to `reth`. + /// + /// Pass any arg that is not supported by the builder. + pub fn arg>(mut self, arg: T) -> Self { + self.args.push(arg.into()); + self + } + + /// Adds multiple arguments to pass to `reth`. + /// + /// Pass any args that is not supported by the builder. + pub fn args(mut self, args: I) -> Self + where + I: IntoIterator, + S: Into, + { + for arg in args { + self = self.arg(arg); + } + self + } + + /// Consumes the builder and spawns `reth`. + /// + /// # Panics + /// + /// If spawning the instance fails at any point. + #[track_caller] + pub fn spawn(self) -> RethInstance { + self.try_spawn().unwrap() + } + + /// Consumes the builder and spawns `reth`. If spawning fails, returns an error. + pub fn try_spawn(self) -> Result { + let bin_path = self + .program + .as_ref() + .map_or_else(|| RETH.as_ref(), |bin| bin.as_os_str()) + .to_os_string(); + let mut cmd = Command::new(&bin_path); + // `reth` uses stdout for its logs + cmd.stdout(Stdio::piped()); + + // Use Reth's `node` subcommand. + cmd.arg("node"); + + // Set the ports if they are not the default. + if self.http_port != DEFAULT_HTTP_PORT { + cmd.arg("--http.port").arg(self.http_port.to_string()); + } + + if self.ws_port != DEFAULT_WS_PORT { + cmd.arg("--ws.port").arg(self.ws_port.to_string()); + } + + if self.auth_port != DEFAULT_AUTH_PORT { + cmd.arg("--authrpc.port").arg(self.auth_port.to_string()); + } + + if self.p2p_port != DEFAULT_P2P_PORT { + cmd.arg("--discovery.port").arg(self.p2p_port.to_string()); + } + + // If the `dev` flag is set, enable it. + if self.dev { + // Enable the dev mode. + // This mode uses a local proof-of-authority consensus engine with either fixed block + // times or automatically mined blocks. + // Disables network discovery and enables local http server. + // Prefunds 20 accounts derived by mnemonic "test test test test test test test test + // test test test junk" with 10 000 ETH each. + cmd.arg("--dev"); + + // If the block time is set, use it. + if let Some(block_time) = self.block_time { + cmd.arg("--dev.block-time").arg(block_time); + } + } + + // If IPC is not enabled on the builder, disable it. + if !self.ipc_enabled { + cmd.arg("--ipcdisable"); + } + + // Open the HTTP API. + cmd.arg("--http"); + cmd.arg("--http.api").arg(API); + + // Open the WS API. + cmd.arg("--ws"); + cmd.arg("--ws.api").arg(API); + + // Configure the IPC path if it is set. + if let Some(ipc) = &self.ipc_path { + cmd.arg("--ipcpath").arg(ipc); + } + + // If the instance is set, use it. + // Set the `instance` to 0 to use the default ports. + // By defining a custom `http_port`, `ws_port`, `auth_port`, or `p2p_port`, the instance + // number will be set to 0 automatically. + if self.instance > 0 { + cmd.arg("--instance").arg(self.instance.to_string()); + } + + if let Some(data_dir) = &self.data_dir { + cmd.arg("--datadir").arg(data_dir); + + // create the directory if it doesn't exist + if !data_dir.exists() { + create_dir(data_dir).map_err(NodeError::CreateDirError)?; + } + } + + if self.discovery_enabled { + // Verbosity is required to read the P2P port from the logs. + cmd.arg("--verbosity").arg("-vvv"); + } else { + cmd.arg("--disable-discovery"); + cmd.arg("--no-persist-peers"); + } + + if let Some(chain_or_path) = self.chain_or_path { + cmd.arg("--chain").arg(chain_or_path); + } + + // Disable color output to make parsing logs easier. + cmd.arg("--color").arg("never"); + + // Add any additional arguments. + cmd.args(self.args); + + let mut child = cmd.spawn().map_err(NodeError::SpawnError)?; + + let stdout = child.stdout.take().ok_or(NodeError::NoStdout)?; + + let start = Instant::now(); + let mut reader = BufReader::new(stdout); + + let mut http_port = 0; + let mut ws_port = 0; + let mut auth_port = 0; + let mut p2p_port = 0; + + let mut ports_started = false; + let mut p2p_started = !self.discovery_enabled; + + loop { + if start + NODE_STARTUP_TIMEOUT <= Instant::now() { + let _ = child.kill(); + return Err(NodeError::Timeout); + } + + let mut line = String::with_capacity(120); + reader + .read_line(&mut line) + .map_err(NodeError::ReadLineError)?; + + if line.contains("RPC HTTP server started") { + if let Some(addr) = extract_endpoint("url=", &line) { + http_port = addr.port(); + } + } + + if line.contains("RPC WS server started") { + if let Some(addr) = extract_endpoint("url=", &line) { + ws_port = addr.port(); + } + } + + if line.contains("RPC auth server started") { + if let Some(addr) = extract_endpoint("url=", &line) { + auth_port = addr.port(); + } + } + + // Encountered a critical error, exit early. + if line.contains("ERROR") { + let _ = child.kill(); + return Err(NodeError::Fatal(line)); + } + + if http_port != 0 && ws_port != 0 && auth_port != 0 { + ports_started = true; + } + + if self.discovery_enabled { + if line.contains("Updated local ENR") { + if let Some(port) = extract_endpoint("IpV4 UDP Socket", &line) { + p2p_port = port.port(); + p2p_started = true; + } + } + } else { + p2p_started = true; + } + + // If all ports have started we are ready to be queried. + if ports_started && p2p_started { + break; + } + } + + if self.keep_stdout { + // re-attach the stdout handle if requested + child.stdout = Some(reader.into_inner()); + } + + Ok(RethInstance { + pid: child, + instance: self.instance, + http_port, + ws_port, + p2p_port: (p2p_port != 0).then_some(p2p_port), + ipc: self.ipc_path, + data_dir: self.data_dir, + auth_port: Some(auth_port), + genesis: self.genesis, + }) + } +}