diff --git a/Cargo.lock b/Cargo.lock index 08b6cb4..fd22013 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5783,6 +5783,7 @@ dependencies = [ "rand 0.8.5", "serde", "serde_json", + "tokio", "toml", "tracing", "url", diff --git a/finalizer/src/actor.rs b/finalizer/src/actor.rs index 24974aa..bc9bd4f 100644 --- a/finalizer/src/actor.rs +++ b/finalizer/src/actor.rs @@ -1,3 +1,4 @@ +use crate::archive::backup_with_enclave; use crate::db::{Config as StateConfig, FinalizerState}; use crate::{FinalizerConfig, FinalizerMailbox, FinalizerMessage}; use alloy_eips::eip4895::Withdrawal; @@ -39,7 +40,7 @@ use summit_types::utils::{is_last_block_of_epoch, is_penultimate_block_of_epoch} use summit_types::{Block, BlockAuxData, Digest, FinalizedHeader, PublicKey, Signature}; use summit_types::{EngineClient, consensus_state::ConsensusState}; use tokio_util::sync::CancellationToken; -use tracing::{debug, info, warn}; +use tracing::{debug, error, info, warn}; const WRITE_BUFFER: NonZero = NZUsize!(1024 * 1024); @@ -57,6 +58,7 @@ pub struct Finalizer< S: Signer, V: Variant, > { + archive_mode: bool, mailbox: mpsc::Receiver, Block>>, pending_height_notifys: BTreeMap<(u64, Digest), Vec>>, context: ContextCell, @@ -137,6 +139,7 @@ impl< ( Self { + archive_mode: cfg.archive_mode, context: ContextCell::new(context), mailbox: rx, engine_client: cfg.engine_client, @@ -303,7 +306,7 @@ impl< "executing finalized block directly (no prior fork state)" ); execute_block( - &self.engine_client, + &mut self.engine_client, &self.context, &block, &mut self.canonical_state, @@ -445,6 +448,18 @@ impl< } } + if self.archive_mode { + // Should always be there + if let Some(checkpoint) = &self.canonical_state.pending_checkpoint { + if let Err(e) = + backup_with_enclave(self.canonical_state.epoch, checkpoint.clone()) + { + // This shouldnt be critical but it should be logged + error!("Unable to backup with enclave: {}", e); + } + } + } + #[cfg(feature = "prom")] let db_operations_start = Instant::now(); // This pending checkpoint should always exist, because it was created at the previous height. @@ -591,7 +606,7 @@ impl< // Execute the block into the cloned parent state execute_block( - &self.engine_client, + &mut self.engine_client, &self.context, &block, &mut fork_state, @@ -769,7 +784,7 @@ async fn execute_block< V: Variant, R: Storage + Metrics + Clock + Spawner + governor::clock::Clock + Rng, >( - engine_client: &C, + engine_client: &mut C, context: &ContextCell, block: &Block, state: &mut ConsensusState, diff --git a/finalizer/src/archive.rs b/finalizer/src/archive.rs new file mode 100644 index 0000000..ba14961 --- /dev/null +++ b/finalizer/src/archive.rs @@ -0,0 +1,50 @@ +use std::{ + io::{Read as _, Write as _}, + os::unix::net::UnixStream, +}; + +use commonware_codec::Encode as _; +use summit_types::checkpoint::Checkpoint; +use tracing::info; + +const ENCLAVE_SOCKET_PATH: &str = "/tmp/reth_enclave_socket.ipc"; + +pub(crate) fn backup_with_enclave(epoch: u64, checkpoint: Checkpoint) -> std::io::Result<()> { + info!("Starting backup procedure with enclave for epoch {epoch}"); + // Connect to socket + let mut stream = UnixStream::connect(ENCLAVE_SOCKET_PATH)?; + + info!("Connected to enclave unix socket"); + // Send epoch that is being backed up + stream.write_all(&epoch.to_le_bytes())?; + info!("Sent current epoch to enclave, waiting for ack"); + // wait for ack + wait_for_ack(&mut stream)?; + // send checkpoint length + let checkpoint_bytes = checkpoint.encode(); + + let len = checkpoint_bytes.len() as u32; + stream.write_all(&len.to_le_bytes())?; + + // send rest of data + stream.write_all(&checkpoint_bytes)?; + // block until final ack + wait_for_ack(&mut stream)?; + + Ok(()) +} + +fn wait_for_ack(stream: &mut UnixStream) -> std::io::Result<()> { + let mut buffer = [0; 3]; + stream.read_exact(&mut buffer)?; + + if &buffer == b"ACK" { + println!("Received ACK"); + Ok(()) + } else { + Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Expected ACK but got something else", + )) + } +} diff --git a/finalizer/src/config.rs b/finalizer/src/config.rs index c3c2ffe..47a9589 100644 --- a/finalizer/src/config.rs +++ b/finalizer/src/config.rs @@ -7,6 +7,7 @@ use summit_types::{EngineClient, PublicKey, consensus_state::ConsensusState}; use tokio_util::sync::CancellationToken; pub struct FinalizerConfig, V: Variant> { + pub archive_mode: bool, pub mailbox_size: usize, pub db_prefix: String, pub engine_client: C, diff --git a/finalizer/src/lib.rs b/finalizer/src/lib.rs index d92c241..f518091 100644 --- a/finalizer/src/lib.rs +++ b/finalizer/src/lib.rs @@ -3,4 +3,5 @@ pub use ingress::*; pub mod config; pub use config::*; pub mod actor; +mod archive; pub mod db; diff --git a/node/src/args.rs b/node/src/args.rs index e10cdbb..666a748 100644 --- a/node/src/args.rs +++ b/node/src/args.rs @@ -124,9 +124,18 @@ pub struct RunFlags { /// Path to a checkpoint file #[arg(long)] pub checkpoint_path: Option, + + /// Path to a checkpoint file. If not there summit will start normally + #[arg(long)] + pub checkpoint_or_default: Option, + /// IP address for this node (optional, will use genesis if not provided) #[arg(long)] pub ip: Option, + + /// Start this mode on archive mode and store a checkpoint for every epoch + #[arg(long)] + pub archive_mode: bool, } impl Command { @@ -165,15 +174,31 @@ impl Command { console_subscriber::init(); } - let maybe_checkpoint = flags.checkpoint_path.as_ref().map(|path| { - // TODO(matthias): verify the checkpoint - let checkpoint_bytes: Vec = - std::fs::read(path).expect("failed to read checkpoint from disk"); - let checkpoint = - Checkpoint::from_ssz_bytes(&checkpoint_bytes).expect("failed to parse checkpoint"); - ConsensusState::try_from(checkpoint) - .expect("failed to create consensus state from checkpoint") - }); + let maybe_checkpoint = if let Some(checkpoint) = &flags.checkpoint_or_default { + if std::fs::exists(checkpoint).unwrap_or_default() { + // TODO(matthias): verify the checkpoint + let checkpoint_bytes: Vec = + std::fs::read(checkpoint).expect("failed to read checkpoint from disk"); + let checkpoint = Checkpoint::from_ssz_bytes(&checkpoint_bytes) + .expect("failed to parse checkpoint"); + let state = ConsensusState::try_from(checkpoint) + .expect("failed to create consensus state from checkpoint"); + + Some(state) + } else { + None + } + } else { + flags.checkpoint_path.as_ref().map(|path| { + // TODO(matthias): verify the checkpoint + let checkpoint_bytes: Vec = + std::fs::read(path).expect("failed to read checkpoint from disk"); + let checkpoint = Checkpoint::from_ssz_bytes(&checkpoint_bytes) + .expect("failed to parse checkpoint"); + ConsensusState::try_from(checkpoint) + .expect("failed to create consensus state from checkpoint") + }) + }; let store_path = get_expanded_path(&flags.store_path).expect("Invalid store path"); let key_store = expect_key_store(&flags.key_store_path); @@ -342,6 +367,7 @@ impl Command { flags.db_prefix.clone(), &genesis, initial_state, + flags.archive_mode, ) .unwrap(); @@ -533,6 +559,7 @@ pub fn run_node_local( flags.db_prefix.clone(), &genesis, initial_state, + flags.archive_mode, ) .unwrap(); diff --git a/node/src/bin/execute_blocks.rs b/node/src/bin/execute_blocks.rs index 136c1d6..32a2680 100644 --- a/node/src/bin/execute_blocks.rs +++ b/node/src/bin/execute_blocks.rs @@ -73,7 +73,7 @@ async fn main() -> Result<()> { let client = HistoricalEngineClient::new(engine_ipc_path.clone(), block_dir.clone()).await; #[allow(unused)] #[cfg(feature = "bench")] - let client = EthereumHistoricalEngineClient::new(engine_ipc_path, block_dir).await; + let mut client = EthereumHistoricalEngineClient::new(engine_ipc_path, block_dir).await; // Load and commit blocks to Reth let genesis_hash: [u8; 32] = from_hex_formatted(genesis_hash_str) diff --git a/node/src/bin/stake_and_checkpoint.rs b/node/src/bin/stake_and_checkpoint.rs index 72985a0..d8fdc36 100644 --- a/node/src/bin/stake_and_checkpoint.rs +++ b/node/src/bin/stake_and_checkpoint.rs @@ -19,7 +19,6 @@ use clap::Parser; use commonware_cryptography::Sha256; use commonware_cryptography::{Hasher, PrivateKeyExt, Signer, bls12381, ed25519::PrivateKey}; use commonware_runtime::{Clock, Metrics as _, Runner as _, Spawner as _, tokio as cw_tokio}; -use commonware_utils::from_hex_formatted; use futures::{FutureExt, pin_mut}; use ssz::Decode; use std::collections::VecDeque; @@ -40,6 +39,7 @@ use summit_types::consensus_state::ConsensusState; use summit_types::execution_request::DepositRequest; use summit_types::execution_request::compute_deposit_data_root; use summit_types::reth::Reth; +use summit_types::rpc::CheckpointRes; use tokio::sync::mpsc; use tracing::Level; @@ -328,7 +328,7 @@ fn main() -> Result<(), Box> { // Retrieve checkpoint from first node println!("Retrieving checkpoint from node 0"); let checkpoint_state = loop { - match get_checkpoint(node0_rpc_port).await { + match get_latest_checkpoint(node0_rpc_port).await { Ok(Some(checkpoint)) => { let state = ConsensusState::try_from(&checkpoint) .expect("Failed to parse checkpoint"); @@ -629,15 +629,17 @@ async fn get_latest_height(rpc_port: u16) -> Result Result, Box> { - let url = format!("http://localhost:{}/get_checkpoint", rpc_port); +async fn get_latest_checkpoint( + rpc_port: u16, +) -> Result, Box> { + let url = format!("http://localhost:{}/get_latest_checkpoint", rpc_port); let response = reqwest::get(&url).await; match response { Ok(resp) if resp.status().is_success() => { - let hex_str = resp.text().await?; - let bytes = from_hex_formatted(&hex_str).ok_or("Failed to decode hex")?; - let checkpoint = Checkpoint::from_ssz_bytes(&bytes) + let checkpoint_resp: CheckpointRes = resp.json().await?; + // let bytes = from_hex_formatted(&hex_str).ok_or("Failed to decode hex")?; + let checkpoint = Checkpoint::from_ssz_bytes(&checkpoint_resp.checkpoint) .map_err(|e| format!("Failed to decode checkpoint: {:?}", e))?; Ok(Some(checkpoint)) } @@ -770,6 +772,7 @@ fn get_node_flags(node: usize) -> RunFlags { let path = format!("testnet/node{node}/"); RunFlags { + archive_mode: false, key_store_path: path.clone(), store_path: format!("{path}db"), port: (26600 + (node * 10)) as u16, @@ -784,6 +787,7 @@ fn get_node_flags(node: usize) -> RunFlags { #[cfg(any(feature = "base-bench", feature = "bench"))] bench_block_dir: None, checkpoint_path: None, + checkpoint_or_default: None, ip: None, } } diff --git a/node/src/bin/testnet.rs b/node/src/bin/testnet.rs index 8f52d34..6b7c063 100644 --- a/node/src/bin/testnet.rs +++ b/node/src/bin/testnet.rs @@ -204,6 +204,8 @@ fn get_node_flags(node: usize) -> RunFlags { #[cfg(any(feature = "base-bench", feature = "bench"))] bench_block_dir: None, checkpoint_path: None, + checkpoint_or_default: None, ip: None, + archive_mode: false, } } diff --git a/node/src/bin/withdraw_and_exit.rs b/node/src/bin/withdraw_and_exit.rs index 4ab02ad..d4cb958 100644 --- a/node/src/bin/withdraw_and_exit.rs +++ b/node/src/bin/withdraw_and_exit.rs @@ -382,6 +382,7 @@ fn get_node_flags(node: usize) -> RunFlags { let path = format!("testnet/node{node}/"); RunFlags { + archive_mode: false, key_store_path: path.clone(), store_path: format!("{path}db"), port: (26600 + (node * 10)) as u16, @@ -396,6 +397,7 @@ fn get_node_flags(node: usize) -> RunFlags { #[cfg(any(feature = "base-bench", feature = "bench"))] bench_block_dir: None, checkpoint_path: None, + checkpoint_or_default: None, ip: None, } } diff --git a/node/src/config.rs b/node/src/config.rs index dc5b897..c1468ad 100644 --- a/node/src/config.rs +++ b/node/src/config.rs @@ -55,6 +55,7 @@ pub struct EngineConfig> @@ -68,6 +69,7 @@ impl> db_prefix: String, genesis: &Genesis, initial_state: ConsensusState, + archive_mode: bool, ) -> Result { Ok(Self { engine_client, @@ -94,6 +96,7 @@ impl> .expect("bad eth_genesis_hash") .expect("bad eth_genesis_hash"), initial_state, + archive_mode, }) } } diff --git a/node/src/engine.rs b/node/src/engine.rs index 31f3316..edc44e0 100644 --- a/node/src/engine.rs +++ b/node/src/engine.rs @@ -271,6 +271,7 @@ where let (finalizer, initial_state, finalizer_mailbox) = Finalizer::new( context.with_label("finalizer"), FinalizerConfig { + archive_mode: cfg.archive_mode, mailbox_size: cfg.mailbox_size, db_prefix: cfg.partition_prefix.clone(), engine_client: cfg.engine_client, diff --git a/node/src/test_harness/common.rs b/node/src/test_harness/common.rs index c82b4e3..1045b45 100644 --- a/node/src/test_harness/common.rs +++ b/node/src/test_harness/common.rs @@ -536,6 +536,7 @@ pub fn get_default_engine_config< fetch_concurrent: 10, fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()), initial_state, + archive_mode: false, } } diff --git a/node/src/test_harness/mock_engine_client.rs b/node/src/test_harness/mock_engine_client.rs index 00da37e..561974b 100644 --- a/node/src/test_harness/mock_engine_client.rs +++ b/node/src/test_harness/mock_engine_client.rs @@ -312,7 +312,7 @@ impl MockEngineState { impl EngineClient for MockEngineClient { #[allow(unused)] async fn start_building_block( - &self, + &mut self, fork_choice_state: ForkchoiceState, timestamp: u64, withdrawals: Vec, @@ -372,7 +372,7 @@ impl EngineClient for MockEngineClient { Some(payload_id) } - async fn get_payload(&self, payload_id: PayloadId) -> ExecutionPayloadEnvelopeV4 { + async fn get_payload(&mut self, payload_id: PayloadId) -> ExecutionPayloadEnvelopeV4 { let state = self.state.lock().unwrap(); state @@ -382,7 +382,7 @@ impl EngineClient for MockEngineClient { .expect("Payload ID not found") } - async fn check_payload(&self, block: &Block) -> PayloadStatus { + async fn check_payload(&mut self, block: &Block) -> PayloadStatus { let mut state = self.state.lock().unwrap(); if state.force_invalid { @@ -418,7 +418,7 @@ impl EngineClient for MockEngineClient { status } - async fn commit_hash(&self, fork_choice_state: ForkchoiceState) { + async fn commit_hash(&mut self, fork_choice_state: ForkchoiceState) { let mut state = self.state.lock().unwrap(); // Update current head @@ -634,7 +634,7 @@ mod tests { #[tokio::test] async fn test_basic_engine_client() { let genesis_hash = [0; 32]; - let client = MockEngineClient::new( + let mut client = MockEngineClient::new( "test".to_string(), genesis_hash, Arc::new(Mutex::new(HashMap::new())), @@ -705,8 +705,8 @@ mod tests { let genesis_hash = [0; 32]; let network = MockEngineNetwork::new(genesis_hash); - let client1 = network.create_client("client1".to_string()); - let client2 = network.create_client("client2".to_string()); + let mut client1 = network.create_client("client1".to_string()); + let mut client2 = network.create_client("client2".to_string()); // Start in consensus assert!(network.verify_consensus(None, None).is_ok()); @@ -787,10 +787,10 @@ mod tests { // Simulate 3 rounds of block production for round in 1..=3 { - let producer = match round % 3 { - 1 => &client1, - 2 => &client2, - _ => &client3, + let mut producer = match round % 3 { + 1 => client1.clone(), + 2 => client2.clone(), + _ => client3.clone(), }; // Producer builds a block @@ -810,7 +810,7 @@ mod tests { ) .await .unwrap(); - let envelope = producer.get_payload(payload_id).await; + let envelope = producer.get_payload(payload_id.clone()).await; let new_block = envelope.envelope_inner.execution_payload.clone(); let new_fork_choice = ForkchoiceState { @@ -823,7 +823,7 @@ mod tests { producer.commit_hash(new_fork_choice).await; // Simulate network propagation - all other clients get the block via Engine API - for client in [&client1, &client2, &client3] { + for mut client in [client1.clone(), client2.clone(), client3.clone()] { if client.client_id() != producer.client_id() { // Each client validates the block (like receiving it from network) let block_for_validation = Block::::compute_digest( @@ -870,8 +870,8 @@ mod tests { let genesis_hash = [0; 32]; let network = MockEngineNetwork::new(genesis_hash); - let client1 = network.create_client("client1".to_string()); - let client2 = network.create_client("client2".to_string()); + let mut client1 = network.create_client("client1".to_string()); + let mut client2 = network.create_client("client2".to_string()); // Create a withdrawal for testing let withdrawal = Withdrawal { @@ -958,8 +958,8 @@ mod tests { let genesis_hash = [0; 32]; let network = MockEngineNetwork::new(genesis_hash); - let client1 = network.create_client("client1".to_string()); - let client2 = network.create_client("client2".to_string()); + let mut client1 = network.create_client("client1".to_string()); + let mut client2 = network.create_client("client2".to_string()); let client3 = network.create_client("client3".to_string()); // Start in consensus diff --git a/rpc/src/routes.rs b/rpc/src/routes.rs index f904a44..4e5bf2b 100644 --- a/rpc/src/routes.rs +++ b/rpc/src/routes.rs @@ -1,5 +1,6 @@ use std::sync::Arc; +use crate::{GenesisRpcState, PathSender, RpcState}; use alloy_primitives::{Address, U256, hex::FromHex as _}; use axum::{ Json, Router, @@ -10,17 +11,16 @@ use commonware_codec::{DecodeExt as _, Encode as _}; use commonware_consensus::Block as ConsensusBlock; use commonware_consensus::simplex::signing_scheme::Scheme; use commonware_cryptography::{Committable, Hasher as _, Sha256, Signer as _}; -use commonware_utils::{from_hex_formatted, hex}; +use commonware_utils::from_hex_formatted; use serde::{Deserialize, Serialize}; use ssz::Encode; use summit_types::{ KeyPaths, PROTOCOL_VERSION, PublicKey, execution_request::{DepositRequest, compute_deposit_data_root}, + rpc::{CheckpointInfoRes, CheckpointRes}, utils::get_expanded_path, }; -use crate::{GenesisRpcState, PathSender, RpcState}; - #[derive(Serialize)] struct PublicKeysResponse { node: String, @@ -54,7 +54,18 @@ impl RpcRoutes { Router::new() .route("/health", get(Self::handle_health_check)) .route("/get_public_keys", get(Self::handle_get_pub_keys::)) - .route("/get_checkpoint", get(Self::handle_get_checkpoint::)) + .route( + "/get_checkpoint/{epoch}", + get(Self::handle_get_checkpoint::), + ) + .route( + "/get_latest_checkpoint", + get(Self::handle_get_latest_checkpoint), + ) + .route( + "/get_latest_checkpoint_info", + get(Self::handle_get_latest_checkpoint_info), + ) .route( "/get_latest_height", get(Self::handle_latest_height::), @@ -188,18 +199,55 @@ impl RpcRoutes { async fn handle_get_checkpoint( State(state): State>>, - ) -> Result { + Path(epoch): Path, + ) -> Result, String> { + let maybe_checkpoint = state.finalizer_mailbox.clone().get_checkpoint(epoch).await; + let Some(checkpoint) = maybe_checkpoint else { + return Err("checkpoint not found".into()); + }; + + Ok(Json(CheckpointRes { + checkpoint: checkpoint.data.into(), + digest: checkpoint.digest.0, + epoch, + })) + } + + async fn handle_get_latest_checkpoint( + State(state): State>>, + ) -> Result, String> { let maybe_checkpoint = state .finalizer_mailbox .clone() .get_latest_checkpoint() .await; - let (Some(checkpoint), _) = maybe_checkpoint else { + let (Some(checkpoint), epoch) = maybe_checkpoint else { return Err("checkpoint not found".into()); }; - let encoded = checkpoint.as_ssz_bytes(); - Ok(hex(&encoded)) + Ok(Json(CheckpointRes { + checkpoint: checkpoint.as_ssz_bytes(), + digest: checkpoint.digest.0, + epoch, + })) + } + + async fn handle_get_latest_checkpoint_info( + State(state): State>>, + ) -> Result, String> { + let maybe_checkpoint = state + .finalizer_mailbox + .clone() + .get_latest_checkpoint() + .await; + let (Some(checkpoint), epoch) = maybe_checkpoint else { + return Err("checkpoint not found".into()); + }; + + Ok(Json(CheckpointInfoRes { + epoch, + digest: checkpoint.digest.0, + })) } async fn handle_latest_height( diff --git a/types/Cargo.toml b/types/Cargo.toml index a5b0d7f..2dc2ce3 100644 --- a/types/Cargo.toml +++ b/types/Cargo.toml @@ -30,6 +30,7 @@ dirs.workspace = true ethereum_ssz.workspace = true serde.workspace = true toml.workspace = true +tokio.workspace = true tracing.workspace = true bytes.workspace = true diff --git a/types/src/engine_client.rs b/types/src/engine_client.rs index 2c29dcc..e610000 100644 --- a/types/src/engine_client.rs +++ b/types/src/engine_client.rs @@ -32,7 +32,7 @@ use std::future::Future; pub trait EngineClient: Clone + Send + Sync + 'static { fn start_building_block( - &self, + &mut self, fork_choice_state: ForkchoiceState, timestamp: u64, withdrawals: Vec, @@ -40,34 +40,58 @@ pub trait EngineClient: Clone + Send + Sync + 'static { ) -> impl Future> + Send; fn get_payload( - &self, + &mut self, payload_id: PayloadId, ) -> impl Future + Send; fn check_payload( - &self, + &mut self, block: &Block, ) -> impl Future + Send; - fn commit_hash(&self, fork_choice_state: ForkchoiceState) -> impl Future + Send; + fn commit_hash( + &mut self, + fork_choice_state: ForkchoiceState, + ) -> impl Future + Send; } #[derive(Clone)] pub struct RethEngineClient { + engine_ipc_path: String, provider: RootProvider, } impl RethEngineClient { pub async fn new(engine_ipc_path: String) -> Self { - let ipc = IpcConnect::new(engine_ipc_path); + let ipc = IpcConnect::new(engine_ipc_path.clone()); let provider = ProviderBuilder::default().connect_ipc(ipc).await.unwrap(); - Self { provider } + Self { + provider, + engine_ipc_path, + } + } + + pub async fn wait_until_reconnect_available(&mut self) { + loop { + let ipc = IpcConnect::new(self.engine_ipc_path.clone()); + + match ProviderBuilder::default().connect_ipc(ipc).await { + Ok(provider) => { + self.provider = provider; + break; + } + Err(e) => { + error!("Failed to connect to IPC, retrying: {}", e); + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } + } + } } } impl EngineClient for RethEngineClient { async fn start_building_block( - &self, + &mut self, fork_choice_state: ForkchoiceState, timestamp: u64, withdrawals: Vec, @@ -82,11 +106,22 @@ impl EngineClient for RethEngineClient { // todo(dalton): we should make this something that we can associate with the simplex height parent_beacon_block_root: Some([1; 32].into()), }; - let res = self + + let res = match self .provider - .fork_choice_updated_v3(fork_choice_state, Some(payload_attributes)) + .fork_choice_updated_v3(fork_choice_state, Some(payload_attributes.clone())) .await - .unwrap(); + { + Ok(res) => res, + Err(e) if e.is_transport_error() => { + self.wait_until_reconnect_available().await; + self.provider + .fork_choice_updated_v3(fork_choice_state, Some(payload_attributes)) + .await + .expect("Failed to update fork choice after reconnect") + } + Err(_) => panic!("Unable to get a response"), + }; if res.is_invalid() { error!("invalid returned for forkchoice state {fork_choice_state:?}: {res:?}"); @@ -98,12 +133,23 @@ impl EngineClient for RethEngineClient { res.payload_id } - async fn get_payload(&self, payload_id: PayloadId) -> ExecutionPayloadEnvelopeV4 { - self.provider.get_payload_v4(payload_id).await.unwrap() + async fn get_payload(&mut self, payload_id: PayloadId) -> ExecutionPayloadEnvelopeV4 { + match self.provider.get_payload_v4(payload_id).await { + Ok(res) => res, + Err(e) if e.is_transport_error() => { + self.wait_until_reconnect_available().await; + self.provider + .get_payload_v4(payload_id) + .await + .expect("Failed to get payload after reconnect") + } + Err(_) => panic!("Unable to get a response"), + } } - async fn check_payload(&self, block: &Block) -> PayloadStatus { - self.provider + async fn check_payload(&mut self, block: &Block) -> PayloadStatus { + match self + .provider .new_payload_v4( block.payload.clone(), Vec::new(), @@ -111,14 +157,40 @@ impl EngineClient for RethEngineClient { block.execution_requests.clone(), ) .await - .unwrap() + { + Ok(res) => res, + Err(e) if e.is_transport_error() => { + self.wait_until_reconnect_available().await; + self.provider + .new_payload_v4( + block.payload.clone(), + Vec::new(), + [1; 32].into(), + block.execution_requests.clone(), + ) + .await + .expect("Failed to check payload after reconnect") + } + Err(_) => panic!("Unable to get a response"), + } } - async fn commit_hash(&self, fork_choice_state: ForkchoiceState) { - self.provider + async fn commit_hash(&mut self, fork_choice_state: ForkchoiceState) { + let _ = match self + .provider .fork_choice_updated_v3(fork_choice_state, None) .await - .unwrap(); + { + Ok(res) => res, + Err(e) if e.is_transport_error() => { + self.wait_until_reconnect_available().await; + self.provider + .fork_choice_updated_v3(fork_choice_state, None) + .await + .expect("Failed to get payload after reconnect") + } + Err(_) => panic!("Unable to get a response"), + }; } } @@ -170,7 +242,7 @@ pub mod base_benchmarking { impl EngineClient for HistoricalEngineClient { async fn start_building_block( - &self, + &mut self, fork_choice_state: ForkchoiceState, _timestamp: u64, _withdrawals: Vec, @@ -188,7 +260,7 @@ pub mod base_benchmarking { } } - async fn get_payload(&self, payload_id: PayloadId) -> ExecutionPayloadEnvelopeV4 { + async fn get_payload(&mut self, payload_id: PayloadId) -> ExecutionPayloadEnvelopeV4 { let block_num = u64::from_le_bytes(payload_id.0.into()); let filename = self .block_index @@ -221,7 +293,10 @@ pub mod base_benchmarking { } } - async fn check_payload(&self, block: &Block) -> PayloadStatus { + async fn check_payload( + &mut self, + block: &Block, + ) -> PayloadStatus { let timestamp = block.payload.payload_inner.payload_inner.timestamp; let canyon_activation = 1704992401u64; // January 11, 2024 - Canyon activation on Base @@ -282,7 +357,7 @@ pub mod base_benchmarking { } } - async fn commit_hash(&self, fork_choice_state: ForkchoiceState) { + async fn commit_hash(&mut self, fork_choice_state: ForkchoiceState) { self.provider .fork_choice_updated_v3(fork_choice_state, None) .await @@ -362,7 +437,7 @@ pub mod benchmarking { impl EngineClient for EthereumHistoricalEngineClient { async fn start_building_block( - &self, + &mut self, _fork_choice_state: ForkchoiceState, _timestamp: u64, _withdrawals: Vec, @@ -372,7 +447,7 @@ pub mod benchmarking { Some(PayloadId::new(next_block_num.to_le_bytes())) } - async fn get_payload(&self, payload_id: PayloadId) -> ExecutionPayloadEnvelopeV4 { + async fn get_payload(&mut self, payload_id: PayloadId) -> ExecutionPayloadEnvelopeV4 { let block_num = u64::from_le_bytes(payload_id.0.into()); let filename = format!("block-{block_num}"); let file_path = self.block_dir.join(filename); @@ -398,7 +473,10 @@ pub mod benchmarking { } } - async fn check_payload(&self, block: &Block) -> PayloadStatus { + async fn check_payload( + &mut self, + block: &Block, + ) -> PayloadStatus { // For Ethereum, use standard engine_newPayloadV4 without Optimism-specific logic self.provider .new_payload_v4( @@ -411,7 +489,7 @@ pub mod benchmarking { .unwrap() } - async fn commit_hash(&self, fork_choice_state: ForkchoiceState) { + async fn commit_hash(&mut self, fork_choice_state: ForkchoiceState) { self.provider .fork_choice_updated_v3(fork_choice_state, None) .await diff --git a/types/src/lib.rs b/types/src/lib.rs index 7797e6f..650ba88 100644 --- a/types/src/lib.rs +++ b/types/src/lib.rs @@ -12,6 +12,7 @@ pub mod keystore; pub mod network_oracle; #[cfg(feature = "e2e")] pub mod reth; +pub mod rpc; pub mod scheme; pub mod utils; pub mod withdrawal; diff --git a/types/src/rpc.rs b/types/src/rpc.rs new file mode 100644 index 0000000..7905507 --- /dev/null +++ b/types/src/rpc.rs @@ -0,0 +1,14 @@ +use serde::{Deserialize, Serialize}; + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct CheckpointRes { + pub checkpoint: Vec, + pub digest: [u8; 32], + pub epoch: u64, +} + +#[derive(Debug, Serialize, Deserialize, Clone)] +pub struct CheckpointInfoRes { + pub epoch: u64, + pub digest: [u8; 32], +}