diff --git a/CHANGELOG.md b/CHANGELOG.md index 9faf9bd88..5fa1efc93 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -6,6 +6,7 @@ - [BREAKING] Move block proving from Blocker Producer to the Store ([#1579](https://github.com/0xMiden/miden-node/pull/1579)). - [BREAKING] Updated miden-base dependencies to use `next` branch; renamed `NoteInputs` to `NoteStorage`, `.inputs()` to `.storage()`, and database `inputs` column to `storage` ([#1595](https://github.com/0xMiden/miden-node/pull/1595)). +- Validator now persists validated transactions ([#1614](https://github.com/0xMiden/miden-node/pull/1614)). ### Changes diff --git a/Cargo.lock b/Cargo.lock index 47c301d69..5d4da3399 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3009,8 +3009,12 @@ name = "miden-node-validator" version = "0.14.0" dependencies = [ "anyhow", + "deadpool-diesel", + "diesel", + "diesel_migrations", "miden-node-proto", "miden-node-proto-build", + "miden-node-store", "miden-node-utils", "miden-protocol", "miden-tx", diff --git a/bin/node/.env b/bin/node/.env index fc4c2793e..6bdfa9a80 100644 --- a/bin/node/.env +++ b/bin/node/.env @@ -10,7 +10,7 @@ MIDEN_NODE_STORE_RPC_URL= MIDEN_NODE_STORE_NTX_BUILDER_URL= MIDEN_NODE_STORE_BLOCK_PRODUCER_URL= MIDEN_NODE_VALIDATOR_BLOCK_PRODUCER_URL= -MIDEN_NODE_VALIDATOR_INSECURE_SECRET_KEY= +MIDEN_NODE_VALIDATOR_KEY= MIDEN_NODE_RPC_URL=http://0.0.0.0:57291 MIDEN_NODE_DATA_DIRECTORY=./ MIDEN_NODE_ENABLE_OTEL=true diff --git a/bin/node/src/commands/bundled.rs b/bin/node/src/commands/bundled.rs index 9cfc654b1..d6154a5af 100644 --- a/bin/node/src/commands/bundled.rs +++ b/bin/node/src/commands/bundled.rs @@ -8,7 +8,6 @@ use miden_node_rpc::Rpc; use miden_node_store::Store; use miden_node_utils::grpc::UrlExt; use miden_node_validator::Validator; -use miden_protocol::block::BlockSigner; use miden_protocol::crypto::dsa::ecdsa_k256_keccak::SecretKey; use miden_protocol::utils::Deserializable; use tokio::net::TcpListener; @@ -22,9 +21,10 @@ use crate::commands::{ ENV_BLOCK_PROVER_URL, ENV_ENABLE_OTEL, ENV_GENESIS_CONFIG_FILE, - ENV_VALIDATOR_INSECURE_SECRET_KEY, + ENV_VALIDATOR_KEY, INSECURE_VALIDATOR_KEY_HEX, NtxBuilderConfig, + ValidatorConfig, duration_to_human_readable_string, }; @@ -51,12 +51,12 @@ pub enum BundledCommand { /// /// If not provided, a predefined key is used. #[arg( - long = "validator.insecure.secret-key", - env = ENV_VALIDATOR_INSECURE_SECRET_KEY, - value_name = "VALIDATOR_INSECURE_SECRET_KEY", + long = "validator.key", + env = ENV_VALIDATOR_KEY, + value_name = "VALIDATOR_KEY", default_value = INSECURE_VALIDATOR_KEY_HEX )] - validator_insecure_secret_key: String, + validator_key: String, }, /// Runs all three node components in the same process. @@ -82,6 +82,9 @@ pub enum BundledCommand { #[command(flatten)] ntx_builder: NtxBuilderConfig, + #[command(flatten)] + validator: ValidatorConfig, + /// Enables the exporting of traces for OpenTelemetry. /// /// This can be further configured using environment variables as defined in the official @@ -99,15 +102,6 @@ pub enum BundledCommand { value_name = "DURATION" )] grpc_timeout: Duration, - - /// Insecure, hex-encoded validator secret key for development and testing purposes. 
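The hunks above rename the insecure development key from `--validator.insecure.secret-key` / `MIDEN_NODE_VALIDATOR_INSECURE_SECRET_KEY` to `--validator.key` / `MIDEN_NODE_VALIDATOR_KEY`. As an aside (not part of this diff), a minimal sketch of producing a non-default hex value for that variable; it assumes `SecretKey` implements `Serializable` (`to_bytes`) as the counterpart of the `read_from_bytes` path the CLI already uses to decode the flag:

```rust
use miden_protocol::crypto::dsa::ecdsa_k256_keccak::SecretKey;
use miden_protocol::utils::{Deserializable, Serializable};

fn main() -> anyhow::Result<()> {
    // Assumption: `SecretKey: Serializable`; the patch only shows the decode side.
    let key = SecretKey::random();
    let hex_key = hex::encode(key.to_bytes());
    println!("MIDEN_NODE_VALIDATOR_KEY={hex_key}");

    // Round-trip through the same decode path the CLI uses for `--validator.key`.
    let _decoded = SecretKey::read_from_bytes(&hex::decode(&hex_key)?)?;
    Ok(())
}
```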
- #[arg( - long = "validator.insecure.secret-key", - env = ENV_VALIDATOR_INSECURE_SECRET_KEY, - value_name = "VALIDATOR_INSECURE_SECRET_KEY", - default_value = INSECURE_VALIDATOR_KEY_HEX - )] - validator_insecure_secret_key: String, }, } @@ -118,14 +112,14 @@ impl BundledCommand { data_directory, accounts_directory, genesis_config_file, - validator_insecure_secret_key, + validator_key, } => { // Currently the bundled bootstrap is identical to the store's bootstrap. crate::commands::store::StoreCommand::Bootstrap { data_directory, accounts_directory, genesis_config_file, - validator_insecure_secret_key, + validator_key, } .handle() .await @@ -137,20 +131,18 @@ impl BundledCommand { data_directory, block_producer, ntx_builder, + validator, enable_otel: _, grpc_timeout, - validator_insecure_secret_key, } => { - let secret_key_bytes = hex::decode(validator_insecure_secret_key)?; - let signer = SecretKey::read_from_bytes(&secret_key_bytes)?; Self::start( rpc_url, block_prover_url, data_directory, - ntx_builder, block_producer, + ntx_builder, + validator, grpc_timeout, - signer, ) .await }, @@ -162,10 +154,10 @@ impl BundledCommand { rpc_url: Url, block_prover_url: Option, data_directory: PathBuf, - ntx_builder: NtxBuilderConfig, block_producer: BlockProducerConfig, + ntx_builder: NtxBuilderConfig, + validator: ValidatorConfig, grpc_timeout: Duration, - signer: impl BlockSigner + Send + Sync + 'static, ) -> anyhow::Result<()> { // Start listening on all gRPC urls so that inter-component connections can be created // before each component is fully started up. @@ -177,17 +169,19 @@ impl BundledCommand { .await .context("Failed to bind to RPC gRPC endpoint")?; - let block_producer_address = TcpListener::bind("127.0.0.1:0") - .await - .context("Failed to bind to block-producer gRPC endpoint")? - .local_addr() - .context("Failed to retrieve the block-producer's gRPC address")?; + let (block_producer_url, block_producer_address) = { + let socket_addr = TcpListener::bind("127.0.0.1:0") + .await + .context("Failed to bind to block-producer gRPC endpoint")? + .local_addr() + .context("Failed to retrieve the block-producer's gRPC address")?; + let url = Url::parse(&format!("http://{socket_addr}")) + .context("Failed to parse Block Producer URL")?; + (url, socket_addr) + }; - let validator_address = TcpListener::bind("127.0.0.1:0") - .await - .context("Failed to bind to validator gRPC endpoint")? - .local_addr() - .context("Failed to retrieve the validator's gRPC address")?; + // Validator URL is either specified remote, or generated local. + let (validator_url, validator_socket_address) = validator.to_addresses().await?; // Store addresses for each exposed API let store_rpc_listener = TcpListener::bind("127.0.0.1:0") @@ -231,74 +225,59 @@ impl BundledCommand { let should_start_ntx_builder = !ntx_builder.disabled; // Start block-producer. The block-producer's endpoint is available after loading completes. 
- let block_producer_id = join_set - .spawn({ - let store_url = Url::parse(&format!("http://{store_block_producer_address}")) - .context("Failed to parse URL")?; - let validator_url = Url::parse(&format!("http://{validator_address}")) - .context("Failed to parse URL")?; - async move { - BlockProducer { - block_producer_address, - store_url, - validator_url, - batch_prover_url: block_producer.batch_prover_url, - batch_interval: block_producer.batch_interval, - block_interval: block_producer.block_interval, - max_batches_per_block: block_producer.max_batches_per_block, - max_txs_per_batch: block_producer.max_txs_per_batch, - grpc_timeout, - mempool_tx_capacity: block_producer.mempool_tx_capacity, + let block_producer_id = { + let validator_url = validator_url.clone(); + join_set + .spawn({ + let store_url = Url::parse(&format!("http://{store_block_producer_address}")) + .context("Failed to parse URL")?; + async move { + BlockProducer { + block_producer_address, + store_url, + validator_url, + batch_prover_url: block_producer.batch_prover_url, + batch_interval: block_producer.batch_interval, + block_interval: block_producer.block_interval, + max_batches_per_block: block_producer.max_batches_per_block, + max_txs_per_batch: block_producer.max_txs_per_batch, + grpc_timeout, + mempool_tx_capacity: block_producer.mempool_tx_capacity, + } + .serve() + .await + .context("failed while serving block-producer component") } - .serve() - .await - .context("failed while serving block-producer component") - } - }) - .id(); + }) + .id() + }; - let validator_id = join_set - .spawn({ - async move { - Validator { - address: validator_address, + // Start RPC component. + let rpc_id = { + let block_producer_url = block_producer_url.clone(); + let validator_url = validator_url.clone(); + join_set + .spawn(async move { + let store_url = Url::parse(&format!("http://{store_rpc_address}")) + .context("Failed to parse URL")?; + Rpc { + listener: grpc_rpc, + store_url, + block_producer_url: Some(block_producer_url), + validator_url, grpc_timeout, - signer, } .serve() .await - .context("failed while serving validator component") - } - }) - .id(); - - // Start RPC component. - let rpc_id = join_set - .spawn(async move { - let store_url = Url::parse(&format!("http://{store_rpc_address}")) - .context("Failed to parse URL")?; - let block_producer_url = Url::parse(&format!("http://{block_producer_address}")) - .context("Failed to parse URL")?; - let validator_url = Url::parse(&format!("http://{validator_address}")) - .context("Failed to parse URL")?; - Rpc { - listener: grpc_rpc, - store_url, - block_producer_url: Some(block_producer_url), - validator_url, - grpc_timeout, - } - .serve() - .await - .context("failed while serving RPC component") - }) - .id(); + .context("failed while serving RPC component") + }) + .id() + }; // Lookup table so we can identify the failed component. 
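The restructuring above keeps the existing failure-attribution pattern: each component task's `Id` is recorded so the first task to exit can be reported by name. A stripped-down, self-contained sketch of that tokio pattern (component names and task bodies are placeholders):

```rust
use std::collections::HashMap;

use tokio::task::JoinSet;

#[tokio::main]
async fn main() {
    let mut join_set = JoinSet::new();

    // Spawn placeholder "components" and remember each task's id.
    let store_id = join_set.spawn(async { anyhow::Ok(()) }).id();
    let rpc_id = join_set.spawn(async { anyhow::Ok(()) }).id();
    let mut component_ids = HashMap::from([(store_id, "store"), (rpc_id, "rpc")]);

    // The join set is non-empty, so the first exit can be attributed by name.
    let (id, result) = join_set
        .join_next_with_id()
        .await
        .expect("join set is non-empty")
        .expect("placeholder tasks do not panic");
    let name = component_ids.remove(&id).unwrap_or("unknown");
    println!("{name} exited first: {result:?}");
}
```

In the patch, the same map is extended with the optional ntx-builder and validator entries, which is why the validator's id is now only inserted when a local socket was actually bound.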
let mut component_ids = HashMap::from([ (store_id, "store"), (block_producer_id, "block-producer"), - (validator_id, "validator"), (rpc_id, "rpc"), ]); @@ -306,10 +285,8 @@ impl BundledCommand { if should_start_ntx_builder { let store_ntx_builder_url = Url::parse(&format!("http://{store_ntx_builder_address}")) .context("Failed to parse URL")?; - let validator_url = Url::parse(&format!("http://{validator_address}")) - .context("Failed to parse URL")?; - let block_producer_url = Url::parse(&format!("http://{block_producer_address}")) - .context("Failed to parse URL")?; + let block_producer_url = block_producer_url.clone(); + let validator_url = validator_url.clone(); let builder_config = ntx_builder.into_builder_config( store_ntx_builder_url, @@ -331,6 +308,28 @@ impl BundledCommand { component_ids.insert(id, "ntx-builder"); } + // Start the Validator if we have bound a socket. + if let Some(address) = validator_socket_address { + let secret_key_bytes = hex::decode(validator.validator_key)?; + let signer = SecretKey::read_from_bytes(&secret_key_bytes)?; + let id = join_set + .spawn({ + async move { + Validator { + address, + grpc_timeout, + signer, + data_directory, + } + .serve() + .await + .context("failed while serving validator component") + } + }) + .id(); + component_ids.insert(id, "validator"); + } + // SAFETY: The joinset is definitely not empty. let component_result = join_set.join_next_with_id().await.unwrap(); diff --git a/bin/node/src/commands/mod.rs b/bin/node/src/commands/mod.rs index 5b1e8e52a..a4c908846 100644 --- a/bin/node/src/commands/mod.rs +++ b/bin/node/src/commands/mod.rs @@ -1,12 +1,15 @@ +use std::net::SocketAddr; use std::num::NonZeroUsize; use std::time::Duration; +use anyhow::Context; use miden_node_block_producer::{ DEFAULT_BATCH_INTERVAL, DEFAULT_BLOCK_INTERVAL, DEFAULT_MAX_BATCHES_PER_BLOCK, DEFAULT_MAX_TXS_PER_BATCH, }; +use tokio::net::TcpListener; use url::Url; pub mod block_producer; @@ -36,7 +39,7 @@ const ENV_MAX_TXS_PER_BATCH: &str = "MIDEN_MAX_TXS_PER_BATCH"; const ENV_MAX_BATCHES_PER_BLOCK: &str = "MIDEN_MAX_BATCHES_PER_BLOCK"; const ENV_MEMPOOL_TX_CAPACITY: &str = "MIDEN_NODE_MEMPOOL_TX_CAPACITY"; const ENV_NTX_SCRIPT_CACHE_SIZE: &str = "MIDEN_NTX_DATA_STORE_SCRIPT_CACHE_SIZE"; -const ENV_VALIDATOR_INSECURE_SECRET_KEY: &str = "MIDEN_NODE_VALIDATOR_INSECURE_SECRET_KEY"; +const ENV_VALIDATOR_KEY: &str = "MIDEN_NODE_VALIDATOR_KEY"; const DEFAULT_NTX_TICKER_INTERVAL: Duration = Duration::from_millis(200); const DEFAULT_TIMEOUT: Duration = Duration::from_secs(10); @@ -47,7 +50,49 @@ fn duration_to_human_readable_string(duration: Duration) -> String { humantime::format_duration(duration).to_string() } -/// Configuration for the Network Transaction Builder component +/// Configuration for the Validator component. +#[derive(clap::Args)] +pub struct ValidatorConfig { + /// Insecure, hex-encoded validator secret key for development and testing purposes. + /// Only used when the Validator URL argument is not set. + #[arg( + long = "validator.key", + env = ENV_VALIDATOR_KEY, + value_name = "VALIDATOR_KEY", + default_value = INSECURE_VALIDATOR_KEY_HEX + )] + validator_key: String, + + /// The remote Validator's gRPC URL. If unset, will default to running a Validator + /// in-process. If set, the insecure key argument is ignored. + #[arg(long = "validator.url", env = ENV_VALIDATOR_URL, value_name = "URL")] + validator_url: Option, +} + +impl ValidatorConfig { + /// Converts the [`ValidatorConfig`] into a URL and an optional [`SocketAddr`]. 
+ /// + /// If the `validator_url` is set, it returns the URL and `None` for the [`SocketAddr`]. + /// + /// If `validator_url` is not set, it binds to a random port on localhost, creates a URL, + /// and returns the URL and the bound [`SocketAddr`]. + async fn to_addresses(&self) -> anyhow::Result<(Url, Option)> { + if let Some(url) = &self.validator_url { + Ok((url.clone(), None)) + } else { + let socket_addr = TcpListener::bind("127.0.0.1:0") + .await + .context("Failed to bind to validator gRPC endpoint")? + .local_addr() + .context("Failed to retrieve the validator's gRPC address")?; + let url = Url::parse(&format!("http://{socket_addr}")) + .context("Failed to parse Validator URL")?; + Ok((url, Some(socket_addr))) + } + } +} + +/// Configuration for the Network Transaction Builder component. #[derive(clap::Args)] pub struct NtxBuilderConfig { /// Disable spawning the network transaction builder. diff --git a/bin/node/src/commands/store.rs b/bin/node/src/commands/store.rs index a78655cd9..50df572da 100644 --- a/bin/node/src/commands/store.rs +++ b/bin/node/src/commands/store.rs @@ -20,7 +20,7 @@ use crate::commands::{ ENV_BLOCK_PROVER_URL, ENV_ENABLE_OTEL, ENV_GENESIS_CONFIG_FILE, - ENV_VALIDATOR_INSECURE_SECRET_KEY, + ENV_VALIDATOR_KEY, INSECURE_VALIDATOR_KEY_HEX, duration_to_human_readable_string, }; @@ -46,14 +46,16 @@ pub enum StoreCommand { genesis_config_file: Option, /// Insecure, hex-encoded validator secret key for development and testing purposes. /// + /// Used to sign the genesis block in the bootstrap process. + /// /// If not provided, a predefined key is used. #[arg( - long = "validator.insecure.secret-key", - env = ENV_VALIDATOR_INSECURE_SECRET_KEY, - value_name = "VALIDATOR_INSECURE_SECRET_KEY", + long = "validator.key", + env = ENV_VALIDATOR_KEY, + value_name = "VALIDATOR_KEY", default_value = INSECURE_VALIDATOR_KEY_HEX )] - validator_insecure_secret_key: String, + validator_key: String, }, /// Starts the store component. @@ -109,12 +111,12 @@ impl StoreCommand { data_directory, accounts_directory, genesis_config_file, - validator_insecure_secret_key, + validator_key, } => Self::bootstrap( &data_directory, &accounts_directory, genesis_config_file.as_ref(), - validator_insecure_secret_key, + validator_key, ), StoreCommand::Start { rpc_url, @@ -192,10 +194,10 @@ impl StoreCommand { data_directory: &Path, accounts_directory: &Path, genesis_config: Option<&PathBuf>, - validator_insecure_secret_key: String, + validator_key: String, ) -> anyhow::Result<()> { // Decode the validator key. - let signer = SecretKey::read_from_bytes(&hex::decode(validator_insecure_secret_key)?)?; + let signer = SecretKey::read_from_bytes(&hex::decode(validator_key)?)?; // Parse genesis config (or default if not given). let config = genesis_config diff --git a/bin/node/src/commands/validator.rs b/bin/node/src/commands/validator.rs index f543be301..461e446c1 100644 --- a/bin/node/src/commands/validator.rs +++ b/bin/node/src/commands/validator.rs @@ -1,3 +1,4 @@ +use std::path::PathBuf; use std::time::Duration; use anyhow::Context; @@ -9,8 +10,9 @@ use url::Url; use crate::commands::{ DEFAULT_TIMEOUT, + ENV_DATA_DIRECTORY, ENV_ENABLE_OTEL, - ENV_VALIDATOR_INSECURE_SECRET_KEY, + ENV_VALIDATOR_KEY, ENV_VALIDATOR_URL, INSECURE_VALIDATOR_KEY_HEX, duration_to_human_readable_string, @@ -40,29 +42,42 @@ pub enum ValidatorCommand { )] grpc_timeout: Duration, + /// Directory in which to store the validator's data. 
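Stepping back to the `ValidatorConfig::to_addresses` helper added in `commands/mod.rs` above: the contract is that a configured remote URL disables the in-process validator, while the absence of one binds an ephemeral local socket that the bundled command must then serve. A self-contained sketch of that contract (the helper itself is crate-private, so the free function and the URL below are illustrative only):

```rust
use std::net::SocketAddr;

use anyhow::Context;
use tokio::net::TcpListener;
use url::Url;

/// Mirrors the `to_addresses` contract: a remote URL means "do not spawn a validator in
/// this process"; otherwise bind an ephemeral local port and return both URL and socket.
async fn resolve_validator(remote: Option<Url>) -> anyhow::Result<(Url, Option<SocketAddr>)> {
    if let Some(url) = remote {
        return Ok((url, None));
    }
    let socket_addr = TcpListener::bind("127.0.0.1:0")
        .await
        .context("Failed to bind to validator gRPC endpoint")?
        .local_addr()
        .context("Failed to retrieve the validator's gRPC address")?;
    let url = Url::parse(&format!("http://{socket_addr}"))
        .context("Failed to parse Validator URL")?;
    Ok((url, Some(socket_addr)))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // In-process mode: an ephemeral socket is bound and must be served by this process.
    let (url, socket) = resolve_validator(None).await?;
    println!("in-process validator at {url}, socket = {socket:?}");

    // Remote mode: the URL is used as-is and no local validator is started.
    let remote = Url::parse("http://validator.example:57292")?; // illustrative URL
    let (url, socket) = resolve_validator(Some(remote)).await?;
    assert!(socket.is_none());
    println!("remote validator at {url}");
    Ok(())
}
```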
+ #[arg(long, env = ENV_DATA_DIRECTORY, value_name = "DIR")] + data_directory: PathBuf, + /// Insecure, hex-encoded validator secret key for development and testing purposes. /// /// If not provided, a predefined key is used. - #[arg(long = "insecure.secret-key", env = ENV_VALIDATOR_INSECURE_SECRET_KEY, value_name = "INSECURE_SECRET_KEY", default_value = INSECURE_VALIDATOR_KEY_HEX)] - insecure_secret_key: String, + #[arg(long = "key", env = ENV_VALIDATOR_KEY, value_name = "VALIDATOR_KEY", default_value = INSECURE_VALIDATOR_KEY_HEX)] + validator_key: String, }, } impl ValidatorCommand { pub async fn handle(self) -> anyhow::Result<()> { let Self::Start { - url, grpc_timeout, insecure_secret_key, .. + url, + grpc_timeout, + validator_key, + data_directory, + .. } = self; let address = url.to_socket().context("Failed to extract socket address from validator URL")?; - let signer = SecretKey::read_from_bytes(hex::decode(insecure_secret_key)?.as_ref())?; + let signer = SecretKey::read_from_bytes(hex::decode(validator_key)?.as_ref())?; - Validator { address, grpc_timeout, signer } - .serve() - .await - .context("failed while serving validator component") + Validator { + address, + grpc_timeout, + signer, + data_directory, + } + .serve() + .await + .context("failed while serving validator component") } pub fn is_open_telemetry_enabled(&self) -> bool { diff --git a/crates/block-producer/src/server/tests.rs b/crates/block-producer/src/server/tests.rs index c404a2ae9..8c98e9da4 100644 --- a/crates/block-producer/src/server/tests.rs +++ b/crates/block-producer/src/server/tests.rs @@ -44,10 +44,13 @@ async fn block_producer_startup_is_robust_to_network_failures() { // start the validator task::spawn(async move { + let temp_dir = tempfile::tempdir().expect("tempdir should be created"); + let data_directory = temp_dir.path().to_path_buf(); Validator { address: validator_addr, grpc_timeout, signer: SecretKey::random(), + data_directory, } .serve() .await diff --git a/crates/store/src/db/manager.rs b/crates/store/src/db/manager.rs index fca9a33db..5ac72e0ad 100644 --- a/crates/store/src/db/manager.rs +++ b/crates/store/src/db/manager.rs @@ -36,12 +36,12 @@ impl ConnectionManagerError { /// Create a connection manager with per-connection setup /// /// Particularly, `foreign_key` checks are enabled and using a write-append-log for journaling. -pub(crate) struct ConnectionManager { +pub struct ConnectionManager { pub(crate) manager: deadpool_diesel::sqlite::Manager, } impl ConnectionManager { - pub(crate) fn new(database_path: &str) -> Self { + pub fn new(database_path: &str) -> Self { let manager = deadpool_diesel::sqlite::Manager::new( database_path.to_owned(), deadpool_diesel::sqlite::Runtime::Tokio1, @@ -78,6 +78,11 @@ impl deadpool::managed::Manager for ConnectionManager { pub(crate) fn configure_connection_on_creation( conn: &mut SqliteConnection, ) -> Result<(), ConnectionManagerError> { + // Wait up to 5 seconds for writer locks before erroring. + diesel::sql_query("PRAGMA busy_timeout=5000") + .execute(conn) + .map_err(ConnectionManagerError::ConnectionParamSetup)?; + // Enable the WAL mode. 
This allows concurrent reads while the transaction is being written, // this is required for proper synchronization of the servers in-memory and on-disk // representations (see [State::apply_block]) diff --git a/crates/store/src/db/mod.rs b/crates/store/src/db/mod.rs index a9b77eb9b..194bcfc27 100644 --- a/crates/store/src/db/mod.rs +++ b/crates/store/src/db/mod.rs @@ -224,6 +224,11 @@ impl From for NoteSyncRecord { } impl Db { + /// Creates a new database instance with the provided connection pool. + pub fn new(pool: deadpool_diesel::Pool) -> Self { + Self { pool } + } + /// Creates a new database and inserts the genesis block. #[instrument( target = COMPONENT, @@ -266,7 +271,7 @@ impl Db { } /// Create and commit a transaction with the queries added in the provided closure - pub(crate) async fn transact(&self, msg: M, query: Q) -> std::result::Result + pub async fn transact(&self, msg: M, query: Q) -> std::result::Result where Q: Send + for<'a, 't> FnOnce(&'a mut SqliteConnection) -> std::result::Result @@ -291,7 +296,7 @@ impl Db { } /// Run the query _without_ a transaction - pub(crate) async fn query(&self, msg: M, query: Q) -> std::result::Result + pub async fn query(&self, msg: M, query: Q) -> std::result::Result where Q: Send + FnOnce(&mut SqliteConnection) -> std::result::Result + 'static, R: Send + 'static, diff --git a/crates/store/src/db/models/conv.rs b/crates/store/src/db/models/conv.rs index 2e6313bf6..a52fe194d 100644 --- a/crates/store/src/db/models/conv.rs +++ b/crates/store/src/db/models/conv.rs @@ -50,7 +50,7 @@ pub struct DatabaseTypeConversionError { /// Convert from and to it's database representation and back /// /// We do not assume sanity of DB types. -pub(crate) trait SqlTypeConvert: Sized { +pub trait SqlTypeConvert: Sized { type Raw: Sized; fn to_raw_sql(self) -> Self::Raw; diff --git a/crates/store/src/lib.rs b/crates/store/src/lib.rs index 1cc028ac3..06bba2fe8 100644 --- a/crates/store/src/lib.rs +++ b/crates/store/src/lib.rs @@ -10,6 +10,10 @@ pub mod state; #[cfg(feature = "rocksdb")] pub use accounts::PersistentAccountTree; pub use accounts::{AccountTreeWithHistory, HistoricalError, InMemoryAccountTree}; +pub use db::Db; +pub use db::manager::ConnectionManager; +pub use db::models::conv::SqlTypeConvert; +pub use errors::{DatabaseError, DatabaseSetupError}; pub use genesis::GenesisState; pub use server::block_prover_client::BlockProver; pub use server::{DataDirectory, Store}; diff --git a/crates/validator/Cargo.toml b/crates/validator/Cargo.toml index 6115e7cff..26a76a2b3 100644 --- a/crates/validator/Cargo.toml +++ b/crates/validator/Cargo.toml @@ -18,8 +18,12 @@ workspace = true [dependencies] anyhow = { workspace = true } +deadpool-diesel = { workspace = true } +diesel = { workspace = true } +diesel_migrations = { workspace = true } miden-node-proto = { workspace = true } miden-node-proto-build = { features = ["internal"], workspace = true } +miden-node-store = { workspace = true } miden-node-utils = { features = ["testing"], workspace = true } miden-protocol = { workspace = true } miden-tx = { workspace = true } diff --git a/crates/validator/build.rs b/crates/validator/build.rs new file mode 100644 index 000000000..b9f947e17 --- /dev/null +++ b/crates/validator/build.rs @@ -0,0 +1,9 @@ +// This build.rs is required to trigger the `diesel_migrations::embed_migrations!` proc-macro in +// `validator/src/db/migrations.rs` to include the latest version of the migrations into the binary, see . 
+fn main() { + println!("cargo:rerun-if-changed=./src/db/migrations"); + // If we do one re-write, the default rules are disabled, + // hence we need to trigger explicitly on `Cargo.toml`. + // + println!("cargo:rerun-if-changed=Cargo.toml"); +} diff --git a/crates/validator/diesel.toml b/crates/validator/diesel.toml new file mode 100644 index 000000000..bdce9175f --- /dev/null +++ b/crates/validator/diesel.toml @@ -0,0 +1,5 @@ +# For documentation on how to configure this file, +# see https://diesel.rs/guides/configuring-diesel-cli + +[print_schema] +file = "src/db/schema.rs" diff --git a/crates/validator/src/block_validation/mod.rs b/crates/validator/src/block_validation/mod.rs index c1cab190b..143d2dee1 100644 --- a/crates/validator/src/block_validation/mod.rs +++ b/crates/validator/src/block_validation/mod.rs @@ -1,22 +1,24 @@ -use std::sync::Arc; - -use miden_protocol::block::{BlockNumber, BlockSigner, ProposedBlock}; +use miden_node_store::{DatabaseError, Db}; +use miden_protocol::block::{BlockSigner, ProposedBlock}; use miden_protocol::crypto::dsa::ecdsa_k256_keccak::Signature; use miden_protocol::errors::ProposedBlockError; -use miden_protocol::transaction::TransactionId; -use tracing::{Instrument, info_span}; +use miden_protocol::transaction::{TransactionHeader, TransactionId}; +use tracing::{info_span, instrument}; -use crate::server::ValidatedTransactions; +use crate::COMPONENT; +use crate::db::find_unvalidated_transactions; // BLOCK VALIDATION ERROR // ================================================================================================ #[derive(thiserror::Error, Debug)] pub enum BlockValidationError { - #[error("transaction {0} in block {1} has not been validated")] - TransactionNotValidated(TransactionId, BlockNumber), + #[error("block contains unvalidated transactions {0:?}")] + UnvalidatedTransactions(Vec), #[error("failed to build block")] - BlockBuildingFailed(#[from] ProposedBlockError), + BlockBuildingFailed(#[source] ProposedBlockError), + #[error("failed to select transactions")] + DatabaseError(#[source] DatabaseError), } // BLOCK VALIDATION @@ -24,33 +26,31 @@ pub enum BlockValidationError { /// Validates a block by checking that all transactions in the proposed block have been processed by /// the validator in the past. -/// -/// Removes the validated transactions from the cache upon success. +#[instrument(target = COMPONENT, skip_all, err)] pub async fn validate_block( proposed_block: ProposedBlock, signer: &S, - validated_transactions: Arc, + db: &Db, ) -> Result { - // Check that all transactions in the proposed block have been validated - let verify_span = info_span!("verify_transactions"); - for tx_header in proposed_block.transactions() { - let tx_id = tx_header.id(); - // TODO: LruCache is a poor abstraction since it locks many times. - if validated_transactions - .get(&tx_id) - .instrument(verify_span.clone()) - .await - .is_none() - { - return Err(BlockValidationError::TransactionNotValidated( - tx_id, - proposed_block.block_num(), - )); - } + // Search for any proposed transactions that have not previously been validated. + let proposed_tx_ids = + proposed_block.transactions().map(TransactionHeader::id).collect::>(); + let unvalidated_txs = db + .transact("find_unvalidated_transactions", move |conn| { + find_unvalidated_transactions(conn, &proposed_tx_ids) + }) + .await + .map_err(BlockValidationError::DatabaseError)?; + + // All proposed transactions must have been validated. 
+ if !unvalidated_txs.is_empty() { + return Err(BlockValidationError::UnvalidatedTransactions(unvalidated_txs)); } // Build the block header. - let (header, _) = proposed_block.into_header_and_body()?; + let (header, _) = proposed_block + .into_header_and_body() + .map_err(BlockValidationError::BlockBuildingFailed)?; // Sign the header. let signature = info_span!("sign_block").in_scope(|| signer.sign(&header)); diff --git a/crates/validator/src/db/migrations.rs b/crates/validator/src/db/migrations.rs new file mode 100644 index 000000000..6896082be --- /dev/null +++ b/crates/validator/src/db/migrations.rs @@ -0,0 +1,25 @@ +use diesel::SqliteConnection; +use diesel_migrations::{EmbeddedMigrations, MigrationHarness, embed_migrations}; +use miden_node_store::DatabaseError; +use tracing::instrument; + +use crate::COMPONENT; + +// The rebuild is automatically triggered by `build.rs` as described in +// . +pub const MIGRATIONS: EmbeddedMigrations = embed_migrations!("src/db/migrations"); + +#[instrument(level = "debug", target = COMPONENT, skip_all, err)] +pub fn apply_migrations(conn: &mut SqliteConnection) -> std::result::Result<(), DatabaseError> { + let migrations = conn.pending_migrations(MIGRATIONS).expect("In memory migrations never fail"); + tracing::info!(target = COMPONENT, "Applying {} migration(s)", migrations.len()); + + let Err(e) = conn.run_pending_migrations(MIGRATIONS) else { + return Ok(()); + }; + tracing::warn!(target = COMPONENT, "Failed to apply migration: {e:?}"); + conn.revert_last_migration(MIGRATIONS) + .expect("Duality is maintained by the developer"); + + Ok(()) +} diff --git a/crates/validator/src/db/migrations/2025062000000_setup/down.sql b/crates/validator/src/db/migrations/2025062000000_setup/down.sql new file mode 100644 index 000000000..e69de29bb diff --git a/crates/validator/src/db/migrations/2025062000000_setup/up.sql b/crates/validator/src/db/migrations/2025062000000_setup/up.sql new file mode 100644 index 000000000..06297a970 --- /dev/null +++ b/crates/validator/src/db/migrations/2025062000000_setup/up.sql @@ -0,0 +1,10 @@ +CREATE TABLE validated_transactions ( + id BLOB NOT NULL, + block_num INTEGER NOT NULL, + account_id BLOB NOT NULL, + "transaction" BLOB NOT NULL, -- Binary encoded ExecutedTransaction. + PRIMARY KEY (id) +) WITHOUT ROWID; + +CREATE INDEX idx_validated_transactions_account_id ON validated_transactions(account_id); +CREATE INDEX idx_validated_transactions_block_num ON validated_transactions(block_num); diff --git a/crates/validator/src/db/mod.rs b/crates/validator/src/db/mod.rs new file mode 100644 index 000000000..26fe77c83 --- /dev/null +++ b/crates/validator/src/db/mod.rs @@ -0,0 +1,99 @@ +mod migrations; +mod models; +mod schema; + +use std::collections::HashSet; +use std::path::PathBuf; + +use diesel::SqliteConnection; +use diesel::prelude::*; +use miden_node_store::{ConnectionManager, DatabaseError, DatabaseSetupError}; +use miden_protocol::transaction::TransactionId; +use miden_protocol::utils::{Deserializable, Serializable}; +use tracing::instrument; + +use crate::COMPONENT; +use crate::db::migrations::apply_migrations; +use crate::db::models::ValidatedTransactionRowInsert; +use crate::tx_validation::ValidatedTransaction; + +/// Open a connection to the DB and apply any pending migrations. 
+#[instrument(target = COMPONENT, skip_all)] +pub async fn load(database_filepath: PathBuf) -> Result { + let manager = ConnectionManager::new(database_filepath.to_str().unwrap()); + let pool = deadpool_diesel::Pool::builder(manager).max_size(16).build()?; + + tracing::info!( + target: COMPONENT, + sqlite= %database_filepath.display(), + "Connected to the database" + ); + + let db = miden_node_store::Db::new(pool); + db.query("migrations", apply_migrations).await?; + Ok(db) +} + +/// Inserts a new validated transaction into the database. +#[instrument(target = COMPONENT, skip_all, fields(tx_id = %tx_info.tx_id()), err)] +pub(crate) fn insert_transaction( + conn: &mut SqliteConnection, + tx_info: &ValidatedTransaction, +) -> Result { + let row = ValidatedTransactionRowInsert::new(tx_info); + let count = diesel::insert_into(schema::validated_transactions::table) + .values(row) + .on_conflict_do_nothing() + .execute(conn)?; + Ok(count) +} + +/// Scans the database for transaction Ids that do not exist. +/// +/// If the resulting vector is empty, all supplied transaction ids have been validated in the past. +/// +/// # Raw SQL +/// +/// ```sql +/// SELECT +/// id +/// FROM +/// validated_transactions +/// WHERE +/// id IN (?, ...) +/// ORDER BY +/// id ASC +/// ``` +#[instrument(target = COMPONENT, skip(conn), err)] +pub(crate) fn find_unvalidated_transactions( + conn: &mut SqliteConnection, + tx_ids: &[TransactionId], +) -> Result, DatabaseError> { + if tx_ids.is_empty() { + return Ok(Vec::new()); + } + + // Convert TransactionIds to bytes for query. + let tx_id_bytes: Vec> = tx_ids.iter().map(TransactionId::to_bytes).collect(); + + // Query the database for matching transactions ids. + let raw_transaction_ids = schema::validated_transactions::table + .select(schema::validated_transactions::id) + .filter(schema::validated_transactions::id.eq_any(tx_id_bytes)) + .order(schema::validated_transactions::id.asc()) + .load::>(conn) + .map_err(DatabaseError::from)?; + + // Find any requested ids that the database does not contain. + let validated_tx_ids = raw_transaction_ids + .into_iter() + .map(|raw_id| TransactionId::read_from_bytes(&raw_id)) + .collect::, _>>()?; + let mut unvalidated_tx_ids = Vec::new(); + for tx_id in tx_ids { + if !validated_tx_ids.contains(tx_id) { + unvalidated_tx_ids.push(*tx_id); + } + } + Ok(unvalidated_tx_ids) +} diff --git a/crates/validator/src/db/models.rs b/crates/validator/src/db/models.rs new file mode 100644 index 000000000..e1e67086a --- /dev/null +++ b/crates/validator/src/db/models.rs @@ -0,0 +1,27 @@ +use diesel::prelude::*; +use miden_node_store::SqlTypeConvert; +use miden_tx::utils::Serializable; + +use crate::db::schema; +use crate::tx_validation::ValidatedTransaction; + +#[derive(Debug, Clone, PartialEq, Insertable)] +#[diesel(table_name = schema::validated_transactions)] +#[diesel(check_for_backend(diesel::sqlite::Sqlite))] +pub struct ValidatedTransactionRowInsert { + pub id: Vec, + pub block_num: i64, + pub account_id: Vec, + pub transaction: Vec, +} + +impl ValidatedTransactionRowInsert { + pub fn new(tx: &ValidatedTransaction) -> Self { + Self { + id: tx.tx_id().to_bytes(), + block_num: tx.block_num().to_raw_sql(), + account_id: tx.account_id().to_bytes(), + transaction: tx.to_bytes(), + } + } +} diff --git a/crates/validator/src/db/schema.rs b/crates/validator/src/db/schema.rs new file mode 100644 index 000000000..0d299dbfd --- /dev/null +++ b/crates/validator/src/db/schema.rs @@ -0,0 +1,8 @@ +diesel::table! 
{ + validated_transactions (id, block_num, account_id, transaction) { + id -> Binary, + block_num -> BigInt, + account_id -> Binary, + transaction -> Binary, + } +} diff --git a/crates/validator/src/lib.rs b/crates/validator/src/lib.rs index a45112d27..a987304c3 100644 --- a/crates/validator/src/lib.rs +++ b/crates/validator/src/lib.rs @@ -1,4 +1,5 @@ mod block_validation; +mod db; mod server; mod tx_validation; diff --git a/crates/validator/src/server/mod.rs b/crates/validator/src/server/mod.rs index 89d28d25d..94bf41315 100644 --- a/crates/validator/src/server/mod.rs +++ b/crates/validator/src/server/mod.rs @@ -1,5 +1,5 @@ use std::net::SocketAddr; -use std::num::NonZeroUsize; +use std::path::PathBuf; use std::sync::Arc; use std::time::Duration; @@ -7,36 +7,26 @@ use anyhow::Context; use miden_node_proto::generated::validator::api_server; use miden_node_proto::generated::{self as proto}; use miden_node_proto_build::validator_api_descriptor; +use miden_node_store::Db; use miden_node_utils::ErrorReport; -use miden_node_utils::lru_cache::LruCache; use miden_node_utils::panic::catch_panic_layer_fn; use miden_node_utils::tracing::OpenTelemetrySpanExt; use miden_node_utils::tracing::grpc::grpc_trace_fn; use miden_protocol::block::{BlockSigner, ProposedBlock}; -use miden_protocol::transaction::{ - ProvenTransaction, - TransactionHeader, - TransactionId, - TransactionInputs, -}; +use miden_protocol::transaction::{ProvenTransaction, TransactionInputs}; use miden_tx::utils::{Deserializable, Serializable}; use tokio::net::TcpListener; use tokio_stream::wrappers::TcpListenerStream; use tonic::Status; use tower_http::catch_panic::CatchPanicLayer; use tower_http::trace::TraceLayer; -use tracing::{Instrument, info_span}; +use tracing::{info_span, instrument}; use crate::COMPONENT; use crate::block_validation::validate_block; +use crate::db::{insert_transaction, load}; use crate::tx_validation::validate_transaction; -/// Number of transactions to keep in the validated transactions cache. -const NUM_VALIDATED_TRANSACTIONS: NonZeroUsize = NonZeroUsize::new(10000).unwrap(); - -/// A type alias for a LRU cache that stores validated transactions. -pub type ValidatedTransactions = LruCache; - // VALIDATOR // ================================================================================ @@ -53,6 +43,9 @@ pub struct Validator { /// The signer used to sign blocks. pub signer: S, + + /// The data directory for the validator component's database files. + pub data_directory: PathBuf, } impl Validator { @@ -63,6 +56,11 @@ impl Validator { pub async fn serve(self) -> anyhow::Result<()> { tracing::info!(target: COMPONENT, endpoint=?self.address, "Initializing server"); + // Initialize database connection. + let db = load(self.data_directory.join("validator.sqlite3")) + .await + .context("failed to initialize validator database")?; + let listener = TcpListener::bind(self.address) .await .context("failed to bind to block producer address")?; @@ -86,7 +84,7 @@ impl Validator { .layer(CatchPanicLayer::custom(catch_panic_layer_fn)) .layer(TraceLayer::new_for_grpc().make_span_with(grpc_trace_fn)) .timeout(self.grpc_timeout) - .add_service(api_server::ApiServer::new(ValidatorServer::new(self.signer))) + .add_service(api_server::ApiServer::new(ValidatorServer::new(self.signer, db))) .add_service(reflection_service) .add_service(reflection_service_alpha) .serve_with_incoming(TcpListenerStream::new(listener)) @@ -103,14 +101,12 @@ impl Validator { /// Implements the gRPC API for the validator. 
struct ValidatorServer { signer: S, - validated_transactions: Arc, + db: Arc, } impl ValidatorServer { - fn new(signer: S) -> Self { - let validated_transactions = - Arc::new(ValidatedTransactions::new(NUM_VALIDATED_TRANSACTIONS)); - Self { signer, validated_transactions } + fn new(signer: S, db: Db) -> Self { + Self { signer, db: db.into() } } } @@ -128,6 +124,7 @@ impl api_server::Api for ValidatorServer } /// Receives a proven transaction, then validates and stores it. + #[instrument(target = COMPONENT, skip_all, err)] async fn submit_proven_transaction( &self, request: tonic::Request, @@ -150,17 +147,14 @@ impl api_server::Api for ValidatorServer tracing::Span::current().set_attribute("transaction.id", tx.id()); // Validate the transaction. - let validated_tx_header = validate_transaction(tx, inputs).await.map_err(|err| { + let tx_info = validate_transaction(tx, inputs).await.map_err(|err| { Status::invalid_argument(err.as_report_context("Invalid transaction")) })?; - // Register the validated transaction. - let tx_id = validated_tx_header.id(); - self.validated_transactions - .put(tx_id, validated_tx_header) - .instrument(info_span!("validated_txs.insert")) - .await; - + // Store the validated transaction. + self.db + .transact("insert_transaction", move |conn| insert_transaction(conn, &tx_info)) + .await?; Ok(tonic::Response::new(())) } @@ -181,11 +175,12 @@ impl api_server::Api for ValidatorServer // Validate the block. let signature = - validate_block(proposed_block, &self.signer, self.validated_transactions.clone()) - .await - .map_err(|err| { - tonic::Status::invalid_argument(format!("Failed to validate block: {err}",)) - })?; + validate_block(proposed_block, &self.signer, &self.db).await.map_err(|err| { + tonic::Status::invalid_argument(format!( + "Failed to validate block: {}", + err.as_report() + )) + })?; // Send the signature. info_span!("serialize").in_scope(|| { diff --git a/crates/validator/src/tx_validation/mod.rs b/crates/validator/src/tx_validation/mod.rs index 20d610aca..f2d1250a2 100644 --- a/crates/validator/src/tx_validation/mod.rs +++ b/crates/validator/src/tx_validation/mod.rs @@ -1,11 +1,15 @@ mod data_store; +mod validated_tx; pub use data_store::TransactionInputsDataStore; use miden_protocol::MIN_PROOF_SECURITY_LEVEL; use miden_protocol::transaction::{ProvenTransaction, TransactionHeader, TransactionInputs}; use miden_tx::auth::UnreachableAuth; use miden_tx::{TransactionExecutor, TransactionExecutorError, TransactionVerifier}; -use tracing::{Instrument, info_span}; +use tracing::{Instrument, info_span, instrument}; +pub use validated_tx::ValidatedTransaction; + +use crate::COMPONENT; // TRANSACTION VALIDATION ERROR // ================================================================================================ @@ -30,10 +34,11 @@ pub enum TransactionValidationError { /// provided proven transaction. /// /// Returns the header of the executed transaction if successful. 
+#[instrument(target = COMPONENT, skip_all, err)] pub async fn validate_transaction( proven_tx: ProvenTransaction, tx_inputs: TransactionInputs, -) -> Result { +) -> Result { // First, verify the transaction proof info_span!("verify").in_scope(|| { let tx_verifier = TransactionVerifier::new(MIN_PROOF_SECURITY_LEVEL); @@ -56,7 +61,7 @@ pub async fn validate_transaction( let executed_tx_header: TransactionHeader = (&executed_tx).into(); let proven_tx_header: TransactionHeader = (&proven_tx).into(); if executed_tx_header == proven_tx_header { - Ok(executed_tx_header) + Ok(ValidatedTransaction::new(executed_tx)) } else { Err(TransactionValidationError::Mismatch { proven_tx_header: proven_tx_header.into(), diff --git a/crates/validator/src/tx_validation/validated_tx.rs b/crates/validator/src/tx_validation/validated_tx.rs new file mode 100644 index 000000000..3ee7dfa45 --- /dev/null +++ b/crates/validator/src/tx_validation/validated_tx.rs @@ -0,0 +1,38 @@ +use miden_protocol::account::AccountId; +use miden_protocol::block::BlockNumber; +use miden_protocol::transaction::{ExecutedTransaction, TransactionId}; +use miden_tx::utils::Serializable; + +/// Re-executed and validated transaction that the Validator, or some ad-hoc +/// auditing procedure, might need to analyze. +/// +/// Constructed from an [`ExecutedTransaction`] that the Validator would have created while +/// re-executing and validating a [`miden_protocol::transaction::ProvenTransaction`]. +pub struct ValidatedTransaction(ExecutedTransaction); + +impl ValidatedTransaction { + /// Creates a new instance of [`ValidatedTransactionInfo`]. + pub fn new(tx: ExecutedTransaction) -> Self { + Self(tx) + } + + /// Returns ID of the transaction. + pub fn tx_id(&self) -> TransactionId { + self.0.id() + } + + /// Returns the block number in which the transaction was executed. + pub fn block_num(&self) -> BlockNumber { + self.0.block_header().block_num() + } + + /// Returns ID of the account against which this transaction was executed. + pub fn account_id(&self) -> AccountId { + self.0.account_delta().id() + } + + /// Returns the binary representation of the transaction info. + pub fn to_bytes(&self) -> Vec { + self.0.to_bytes() + } +}
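Taken together, the validator now needs a writable data directory for its SQLite file in addition to its signer and socket. A hedged end-to-end sketch of spinning the component up, modelled on the block-producer test earlier in this patch (the port, timeout, and use of `tempfile` are illustrative, not prescribed by the patch):

```rust
use std::net::SocketAddr;
use std::time::Duration;

use miden_node_validator::Validator;
use miden_protocol::crypto::dsa::ecdsa_k256_keccak::SecretKey;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    // The validator creates `validator.sqlite3` inside this directory on startup.
    let temp_dir = tempfile::tempdir()?;
    let address: SocketAddr = "127.0.0.1:0".parse()?;

    Validator {
        address,
        grpc_timeout: Duration::from_secs(10),
        signer: SecretKey::random(), // insecure random key, development/testing only
        data_directory: temp_dir.path().to_path_buf(),
    }
    .serve()
    .await
}
```

In the bundled command this local instance is only started when `--validator.url` is unset; otherwise the block-producer and RPC are pointed at the remote URL and no validator is served in-process.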