Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

23 changes: 19 additions & 4 deletions finalizer/src/actor.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::archive::backup_with_enclave;
use crate::db::{Config as StateConfig, FinalizerState};
use crate::{FinalizerConfig, FinalizerMailbox, FinalizerMessage};
use alloy_eips::eip4895::Withdrawal;
Expand Down Expand Up @@ -39,7 +40,7 @@ use summit_types::utils::{is_last_block_of_epoch, is_penultimate_block_of_epoch}
use summit_types::{Block, BlockAuxData, Digest, FinalizedHeader, PublicKey, Signature};
use summit_types::{EngineClient, consensus_state::ConsensusState};
use tokio_util::sync::CancellationToken;
use tracing::{debug, info, warn};
use tracing::{debug, error, info, warn};

const WRITE_BUFFER: NonZero<usize> = NZUsize!(1024 * 1024);

Expand All @@ -57,6 +58,7 @@ pub struct Finalizer<
S: Signer<PublicKey = PublicKey>,
V: Variant,
> {
archive_mode: bool,
mailbox: mpsc::Receiver<FinalizerMessage<bls12381_multisig::Scheme<PublicKey, V>, Block<S, V>>>,
pending_height_notifys: BTreeMap<(u64, Digest), Vec<oneshot::Sender<()>>>,
context: ContextCell<R>,
Expand Down Expand Up @@ -137,6 +139,7 @@ impl<

(
Self {
archive_mode: cfg.archive_mode,
context: ContextCell::new(context),
mailbox: rx,
engine_client: cfg.engine_client,
Expand Down Expand Up @@ -303,7 +306,7 @@ impl<
"executing finalized block directly (no prior fork state)"
);
execute_block(
&self.engine_client,
&mut self.engine_client,
&self.context,
&block,
&mut self.canonical_state,
Expand Down Expand Up @@ -445,6 +448,18 @@ impl<
}
}

if self.archive_mode {
// Should always be there
if let Some(checkpoint) = &self.canonical_state.pending_checkpoint {
if let Err(e) =
backup_with_enclave(self.canonical_state.epoch, checkpoint.clone())
{
// This shouldnt be critical but it should be logged
error!("Unable to backup with enclave: {}", e);
}
}
}

#[cfg(feature = "prom")]
let db_operations_start = Instant::now();
// This pending checkpoint should always exist, because it was created at the previous height.
Expand Down Expand Up @@ -591,7 +606,7 @@ impl<

// Execute the block into the cloned parent state
execute_block(
&self.engine_client,
&mut self.engine_client,
&self.context,
&block,
&mut fork_state,
Expand Down Expand Up @@ -769,7 +784,7 @@ async fn execute_block<
V: Variant,
R: Storage + Metrics + Clock + Spawner + governor::clock::Clock + Rng,
>(
engine_client: &C,
engine_client: &mut C,
context: &ContextCell<R>,
block: &Block<S, V>,
state: &mut ConsensusState,
Expand Down
50 changes: 50 additions & 0 deletions finalizer/src/archive.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
use std::{
io::{Read as _, Write as _},
os::unix::net::UnixStream,
};

use commonware_codec::Encode as _;
use summit_types::checkpoint::Checkpoint;
use tracing::info;

const ENCLAVE_SOCKET_PATH: &str = "/tmp/reth_enclave_socket.ipc";

pub(crate) fn backup_with_enclave(epoch: u64, checkpoint: Checkpoint) -> std::io::Result<()> {
info!("Starting backup procedure with enclave for epoch {epoch}");
// Connect to socket
let mut stream = UnixStream::connect(ENCLAVE_SOCKET_PATH)?;

info!("Connected to enclave unix socket");
// Send epoch that is being backed up
stream.write_all(&epoch.to_le_bytes())?;
info!("Sent current epoch to enclave, waiting for ack");
// wait for ack
wait_for_ack(&mut stream)?;
// send checkpoint length
let checkpoint_bytes = checkpoint.encode();

let len = checkpoint_bytes.len() as u32;
stream.write_all(&len.to_le_bytes())?;

// send rest of data
stream.write_all(&checkpoint_bytes)?;
// block until final ack
wait_for_ack(&mut stream)?;

Ok(())
}

fn wait_for_ack(stream: &mut UnixStream) -> std::io::Result<()> {
let mut buffer = [0; 3];
stream.read_exact(&mut buffer)?;

if &buffer == b"ACK" {
println!("Received ACK");
Ok(())
} else {
Err(std::io::Error::new(
std::io::ErrorKind::InvalidData,
"Expected ACK but got something else",
))
}
}
1 change: 1 addition & 0 deletions finalizer/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use summit_types::{EngineClient, PublicKey, consensus_state::ConsensusState};
use tokio_util::sync::CancellationToken;

pub struct FinalizerConfig<C: EngineClient, O: NetworkOracle<PublicKey>, V: Variant> {
pub archive_mode: bool,
pub mailbox_size: usize,
pub db_prefix: String,
pub engine_client: C,
Expand Down
1 change: 1 addition & 0 deletions finalizer/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,5 @@ pub use ingress::*;
pub mod config;
pub use config::*;
pub mod actor;
mod archive;
pub mod db;
45 changes: 36 additions & 9 deletions node/src/args.rs
Original file line number Diff line number Diff line change
Expand Up @@ -124,9 +124,18 @@ pub struct RunFlags {
/// Path to a checkpoint file
#[arg(long)]
pub checkpoint_path: Option<String>,

/// Path to a checkpoint file. If not there summit will start normally
#[arg(long)]
pub checkpoint_or_default: Option<String>,

/// IP address for this node (optional, will use genesis if not provided)
#[arg(long)]
pub ip: Option<String>,

/// Start this mode on archive mode and store a checkpoint for every epoch
#[arg(long)]
pub archive_mode: bool,
}

impl Command {
Expand Down Expand Up @@ -165,15 +174,31 @@ impl Command {
console_subscriber::init();
}

let maybe_checkpoint = flags.checkpoint_path.as_ref().map(|path| {
// TODO(matthias): verify the checkpoint
let checkpoint_bytes: Vec<u8> =
std::fs::read(path).expect("failed to read checkpoint from disk");
let checkpoint =
Checkpoint::from_ssz_bytes(&checkpoint_bytes).expect("failed to parse checkpoint");
ConsensusState::try_from(checkpoint)
.expect("failed to create consensus state from checkpoint")
});
let maybe_checkpoint = if let Some(checkpoint) = &flags.checkpoint_or_default {
if std::fs::exists(checkpoint).unwrap_or_default() {
// TODO(matthias): verify the checkpoint
let checkpoint_bytes: Vec<u8> =
std::fs::read(checkpoint).expect("failed to read checkpoint from disk");
let checkpoint = Checkpoint::from_ssz_bytes(&checkpoint_bytes)
.expect("failed to parse checkpoint");
let state = ConsensusState::try_from(checkpoint)
.expect("failed to create consensus state from checkpoint");

Some(state)
} else {
None
}
} else {
flags.checkpoint_path.as_ref().map(|path| {
// TODO(matthias): verify the checkpoint
let checkpoint_bytes: Vec<u8> =
std::fs::read(path).expect("failed to read checkpoint from disk");
let checkpoint = Checkpoint::from_ssz_bytes(&checkpoint_bytes)
.expect("failed to parse checkpoint");
ConsensusState::try_from(checkpoint)
.expect("failed to create consensus state from checkpoint")
})
};

let store_path = get_expanded_path(&flags.store_path).expect("Invalid store path");
let key_store = expect_key_store(&flags.key_store_path);
Expand Down Expand Up @@ -342,6 +367,7 @@ impl Command {
flags.db_prefix.clone(),
&genesis,
initial_state,
flags.archive_mode,
)
.unwrap();

Expand Down Expand Up @@ -533,6 +559,7 @@ pub fn run_node_local(
flags.db_prefix.clone(),
&genesis,
initial_state,
flags.archive_mode,
)
.unwrap();

Expand Down
2 changes: 1 addition & 1 deletion node/src/bin/execute_blocks.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ async fn main() -> Result<()> {
let client = HistoricalEngineClient::new(engine_ipc_path.clone(), block_dir.clone()).await;
#[allow(unused)]
#[cfg(feature = "bench")]
let client = EthereumHistoricalEngineClient::new(engine_ipc_path, block_dir).await;
let mut client = EthereumHistoricalEngineClient::new(engine_ipc_path, block_dir).await;

// Load and commit blocks to Reth
let genesis_hash: [u8; 32] = from_hex_formatted(genesis_hash_str)
Expand Down
18 changes: 11 additions & 7 deletions node/src/bin/stake_and_checkpoint.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ use clap::Parser;
use commonware_cryptography::Sha256;
use commonware_cryptography::{Hasher, PrivateKeyExt, Signer, bls12381, ed25519::PrivateKey};
use commonware_runtime::{Clock, Metrics as _, Runner as _, Spawner as _, tokio as cw_tokio};
use commonware_utils::from_hex_formatted;
use futures::{FutureExt, pin_mut};
use ssz::Decode;
use std::collections::VecDeque;
Expand All @@ -40,6 +39,7 @@ use summit_types::consensus_state::ConsensusState;
use summit_types::execution_request::DepositRequest;
use summit_types::execution_request::compute_deposit_data_root;
use summit_types::reth::Reth;
use summit_types::rpc::CheckpointRes;
use tokio::sync::mpsc;
use tracing::Level;

Expand Down Expand Up @@ -328,7 +328,7 @@ fn main() -> Result<(), Box<dyn std::error::Error>> {
// Retrieve checkpoint from first node
println!("Retrieving checkpoint from node 0");
let checkpoint_state = loop {
match get_checkpoint(node0_rpc_port).await {
match get_latest_checkpoint(node0_rpc_port).await {
Ok(Some(checkpoint)) => {
let state = ConsensusState::try_from(&checkpoint)
.expect("Failed to parse checkpoint");
Expand Down Expand Up @@ -629,15 +629,17 @@ async fn get_latest_height(rpc_port: u16) -> Result<u64, Box<dyn std::error::Err
Ok(response.parse()?)
}

async fn get_checkpoint(rpc_port: u16) -> Result<Option<Checkpoint>, Box<dyn std::error::Error>> {
let url = format!("http://localhost:{}/get_checkpoint", rpc_port);
async fn get_latest_checkpoint(
rpc_port: u16,
) -> Result<Option<Checkpoint>, Box<dyn std::error::Error>> {
let url = format!("http://localhost:{}/get_latest_checkpoint", rpc_port);
let response = reqwest::get(&url).await;

match response {
Ok(resp) if resp.status().is_success() => {
let hex_str = resp.text().await?;
let bytes = from_hex_formatted(&hex_str).ok_or("Failed to decode hex")?;
let checkpoint = Checkpoint::from_ssz_bytes(&bytes)
let checkpoint_resp: CheckpointRes = resp.json().await?;
// let bytes = from_hex_formatted(&hex_str).ok_or("Failed to decode hex")?;
let checkpoint = Checkpoint::from_ssz_bytes(&checkpoint_resp.checkpoint)
.map_err(|e| format!("Failed to decode checkpoint: {:?}", e))?;
Ok(Some(checkpoint))
}
Expand Down Expand Up @@ -770,6 +772,7 @@ fn get_node_flags(node: usize) -> RunFlags {
let path = format!("testnet/node{node}/");

RunFlags {
archive_mode: false,
key_store_path: path.clone(),
store_path: format!("{path}db"),
port: (26600 + (node * 10)) as u16,
Expand All @@ -784,6 +787,7 @@ fn get_node_flags(node: usize) -> RunFlags {
#[cfg(any(feature = "base-bench", feature = "bench"))]
bench_block_dir: None,
checkpoint_path: None,
checkpoint_or_default: None,
ip: None,
}
}
Expand Down
2 changes: 2 additions & 0 deletions node/src/bin/testnet.rs
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,8 @@ fn get_node_flags(node: usize) -> RunFlags {
#[cfg(any(feature = "base-bench", feature = "bench"))]
bench_block_dir: None,
checkpoint_path: None,
checkpoint_or_default: None,
ip: None,
archive_mode: false,
}
}
2 changes: 2 additions & 0 deletions node/src/bin/withdraw_and_exit.rs
Original file line number Diff line number Diff line change
Expand Up @@ -382,6 +382,7 @@ fn get_node_flags(node: usize) -> RunFlags {
let path = format!("testnet/node{node}/");

RunFlags {
archive_mode: false,
key_store_path: path.clone(),
store_path: format!("{path}db"),
port: (26600 + (node * 10)) as u16,
Expand All @@ -396,6 +397,7 @@ fn get_node_flags(node: usize) -> RunFlags {
#[cfg(any(feature = "base-bench", feature = "bench"))]
bench_block_dir: None,
checkpoint_path: None,
checkpoint_or_default: None,
ip: None,
}
}
3 changes: 3 additions & 0 deletions node/src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ pub struct EngineConfig<C: EngineClient, S: Signer + ZeroizeOnDrop, O: NetworkOr
pub genesis_hash: [u8; 32],

pub initial_state: ConsensusState,
pub archive_mode: bool,
}

impl<C: EngineClient, S: Signer + ZeroizeOnDrop, O: NetworkOracle<S::PublicKey>>
Expand All @@ -68,6 +69,7 @@ impl<C: EngineClient, S: Signer + ZeroizeOnDrop, O: NetworkOracle<S::PublicKey>>
db_prefix: String,
genesis: &Genesis,
initial_state: ConsensusState,
archive_mode: bool,
) -> Result<Self> {
Ok(Self {
engine_client,
Expand All @@ -94,6 +96,7 @@ impl<C: EngineClient, S: Signer + ZeroizeOnDrop, O: NetworkOracle<S::PublicKey>>
.expect("bad eth_genesis_hash")
.expect("bad eth_genesis_hash"),
initial_state,
archive_mode,
})
}
}
Expand Down
1 change: 1 addition & 0 deletions node/src/engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,7 @@ where
let (finalizer, initial_state, finalizer_mailbox) = Finalizer::new(
context.with_label("finalizer"),
FinalizerConfig {
archive_mode: cfg.archive_mode,
mailbox_size: cfg.mailbox_size,
db_prefix: cfg.partition_prefix.clone(),
engine_client: cfg.engine_client,
Expand Down
1 change: 1 addition & 0 deletions node/src/test_harness/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -536,6 +536,7 @@ pub fn get_default_engine_config<
fetch_concurrent: 10,
fetch_rate_per_peer: Quota::per_second(NonZeroU32::new(10).unwrap()),
initial_state,
archive_mode: false,
}
}

Expand Down
Loading