diff --git a/Cargo.lock b/Cargo.lock index 4d6f59f5..a30d7070 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12491,6 +12491,17 @@ dependencies = [ "uuid", ] +[[package]] +name = "tips-common" +version = "0.1.0" +dependencies = [ + "alloy-consensus", + "alloy-primitives", + "jsonrpsee 0.26.0", + "op-alloy-consensus", + "tokio", +] + [[package]] name = "tips-datastore" version = "0.1.0" @@ -12528,12 +12539,17 @@ dependencies = [ "jsonrpsee 0.26.0", "op-alloy-consensus", "op-alloy-network", + "op-alloy-rpc-types", "op-revm", "rdkafka", + "reth-optimism-cli", "reth-optimism-evm", + "reth-optimism-node", "reth-rpc-eth-types", "revm-context-interface", "serde_json", + "tips-common", + "tips-rpc-exex", "tokio", "tracing", "tracing-subscriber 0.3.20", @@ -12585,6 +12601,37 @@ dependencies = [ "uuid", ] +[[package]] +name = "tips-rpc-exex" +version = "0.1.0" +dependencies = [ + "alloy-consensus", + "alloy-primitives", + "alloy-provider", + "alloy-signer-local", + "async-trait", + "clap", + "eyre", + "futures", + "jsonrpsee 0.26.0", + "op-alloy-consensus", + "op-alloy-network", + "op-alloy-rpc-types", + "op-revm", + "reth", + "reth-exex", + "reth-node-api", + "reth-optimism-cli", + "reth-optimism-evm", + "reth-optimism-node", + "reth-primitives", + "reth-rpc-eth-types", + "revm-context-interface", + "tips-common", + "tokio", + "tracing", +] + [[package]] name = "tokio" version = "1.47.1" diff --git a/Cargo.toml b/Cargo.toml index 9114e9f3..f1071d1b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ homepage = "https://github.com/base/tips" repository = "https://github.com/base/tips" [workspace] -members = ["crates/datastore", "crates/audit", "crates/ingress-rpc", "crates/maintenance", "crates/ingress-writer"] +members = ["crates/datastore", "crates/audit", "crates/ingress-rpc", "crates/maintenance", "crates/ingress-writer", "crates/rpc-exex", "crates/common"] resolver = "2" [workspace.dependencies] @@ -15,12 +15,19 @@ tips-datastore = { path = "crates/datastore" } tips-audit = { path = "crates/audit" } tips-maintenance = { path = "crates/maintenance" } tips-ingress-writer = { path = "crates/ingress-writer" } +tips-rpc-exex = { path = "crates/rpc-exex" } +tips-common = { path = "crates/common" } # Reth reth = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" } reth-rpc-eth-types = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" } reth-optimism-evm = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" } base-reth-flashblocks-rpc = { git = "https://github.com/base/node-reth", rev = "a1ae148a36354c88b356f80281fef12dad9f7737" } +reth-exex = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" } +reth-optimism-node = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" } +reth-optimism-cli = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" } +reth-primitives = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" } +reth-node-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" } # alloy alloy-primitives = { version = "1.3.1", default-features = false, features = [ @@ -72,3 +79,6 @@ backon = "1.5.2" op-revm = { version = "10.1.0", default-features = false } revm-context-interface = "10.2.0" alloy-signer-local = "1.0.36" + +# tips-exex +futures = "0.3" diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml new file mode 100644 index 00000000..99479b18 --- /dev/null +++ b/crates/common/Cargo.toml @@ -0,0 +1,15 @@ +[package] +name = "tips-common" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true + +[dependencies] +op-alloy-consensus.workspace = true +alloy-primitives.workspace = true +alloy-consensus.workspace = true +tokio.workspace = true +jsonrpsee.workspace = true diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs new file mode 100644 index 00000000..ee8567f6 --- /dev/null +++ b/crates/common/src/lib.rs @@ -0,0 +1,13 @@ +use alloy_consensus::transaction::Recovered; +use alloy_primitives::{Address, Bytes}; +use jsonrpsee::core::RpcResult; +use op_alloy_consensus::OpTxEnvelope; +use tokio::sync::oneshot; + +#[derive(Debug)] +pub struct ValidationData { + pub address: Address, + pub tx: Recovered, + pub data: Bytes, + pub response_tx: oneshot::Sender>, +} diff --git a/crates/ingress-rpc/Cargo.toml b/crates/ingress-rpc/Cargo.toml index 13a581b3..5a1f5f49 100644 --- a/crates/ingress-rpc/Cargo.toml +++ b/crates/ingress-rpc/Cargo.toml @@ -35,3 +35,8 @@ op-revm.workspace = true revm-context-interface.workspace = true alloy-signer-local.workspace = true reth-optimism-evm.workspace = true +tips-rpc-exex.workspace = true +reth-optimism-cli.workspace = true +reth-optimism-node.workspace = true +op-alloy-rpc-types.workspace = true +tips-common.workspace = true diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index 6400bc02..5d1570a6 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -4,15 +4,19 @@ use jsonrpsee::server::Server; use op_alloy_network::Optimism; use rdkafka::ClientConfig; use rdkafka::producer::FutureProducer; +use reth_optimism_cli::{Cli, chainspec::OpChainSpecParser}; +use reth_optimism_node::OpNode; use std::fs; use std::net::IpAddr; +use tips_common::ValidationData; +use tips_rpc_exex::RpcExEx; +use tokio::sync::mpsc; use tracing::{info, warn}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use url::Url; mod queue; mod service; -mod validation; use queue::KafkaQueuePublisher; use service::{IngressApiServer, IngressService}; @@ -50,6 +54,10 @@ struct Config { #[arg(long, env = "TIPS_INGRESS_LOG_LEVEL", default_value = "info")] log_level: String, + /// Chain to connect to + #[arg(long, env = "TIPS_INGRESS_CHAIN", default_value = "base-sepolia")] + chain: String, + /// Default lifetime for sent transactions in seconds (default: 3 hours) #[arg( long, @@ -59,8 +67,7 @@ struct Config { send_transaction_default_lifetime_seconds: u64, } -#[tokio::main] -async fn main() -> anyhow::Result<()> { +fn main() -> anyhow::Result<()> { dotenvy::dotenv().ok(); let config = Config::parse(); @@ -94,35 +101,67 @@ async fn main() -> anyhow::Result<()> { mempool_url = %config.mempool_url ); - let provider: RootProvider = ProviderBuilder::new() - .disable_recommended_fillers() - .network::() - .connect_http(config.mempool_url); - - let client_config = load_kafka_config_from_file(&config.ingress_kafka_properties)?; - - let queue_producer: FutureProducer = client_config.create()?; - - let queue = KafkaQueuePublisher::new(queue_producer, config.ingress_topic); - - let service = IngressService::new( - provider, - config.dual_write_mempool, - queue, - config.send_transaction_default_lifetime_seconds, - ); - let bind_addr = format!("{}:{}", config.address, config.port); + let args = vec![ + env!("CARGO_BIN_NAME"), + "node", + "--chain", + &config.chain, + "-v", + ]; + Cli::::try_parse_from(args)? + .run(|builder, _| async move { + let provider: RootProvider = ProviderBuilder::new() + .disable_recommended_fillers() + .network::() + .connect_http(config.mempool_url.clone()); + + let client_config = load_kafka_config_from_file(&config.ingress_kafka_properties) + .expect("Failed to load kafka config"); + + let queue_producer: FutureProducer = client_config.create()?; + + let queue = KafkaQueuePublisher::new(queue_producer, config.ingress_topic.clone()); + + // Create mpsc channel for communication between service and exex to forward txs to validate + let (tx_sender, tx_receiver) = mpsc::unbounded_channel::(); + + let service = IngressService::new( + provider, + config.dual_write_mempool, + queue, + config.send_transaction_default_lifetime_seconds, + tx_sender, + ); + let bind_addr = format!("{}:{}", config.address, config.port); - let server = Server::builder().build(&bind_addr).await?; - let addr = server.local_addr()?; - let handle = server.start(service.into_rpc()); + let server = Server::builder().build(&bind_addr).await?; + let addr = server.local_addr()?; + let server_handle = server.start(service.into_rpc()); - info!( - message = "Ingress RPC server started", - address = %addr - ); + info!( + message = "Ingress RPC server started", + address = %addr + ); + let exex_handle = builder + .node(OpNode::default()) + .install_exex("tips-rpc-exex", move |ctx| async move { + Ok(RpcExEx::new(ctx, tx_receiver).run()) + }) + .launch() + .await?; + + tokio::select! { + _ = server_handle.stopped() => { + info!("Ingress RPC server stopped"); + } + _ = exex_handle.wait_for_node_exit() => { + info!("RPC ExEx stopped"); + } + } + Ok(()) + }) + .map_err(|e| anyhow::anyhow!(e))?; - handle.stopped().await; Ok(()) } diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index a48841d4..14636cc7 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -1,4 +1,3 @@ -use crate::validation::{AccountInfoLookup, L1BlockInfoLookup, validate_tx}; use alloy_consensus::transaction::SignerRecoverable; use alloy_primitives::{B256, Bytes}; use alloy_provider::{Provider, RootProvider, network::eip2718::Decodable2718}; @@ -11,6 +10,8 @@ use op_alloy_consensus::OpTxEnvelope; use op_alloy_network::Optimism; use reth_rpc_eth_types::EthApiError; use std::time::{SystemTime, UNIX_EPOCH}; +use tips_common::ValidationData; +use tokio::sync::{mpsc, oneshot}; use tracing::{info, warn}; use crate::queue::QueuePublisher; @@ -35,6 +36,7 @@ pub struct IngressService { dual_write_mempool: bool, queue: Queue, send_transaction_default_lifetime_seconds: u64, + tx_sender: mpsc::UnboundedSender, } impl IngressService { @@ -43,12 +45,14 @@ impl IngressService { dual_write_mempool: bool, queue: Queue, send_transaction_default_lifetime_seconds: u64, + tx_sender: mpsc::UnboundedSender, ) -> Self { Self { provider, dual_write_mempool, queue, send_transaction_default_lifetime_seconds, + tx_sender, } } } @@ -87,12 +91,38 @@ where .try_into_recovered() .map_err(|_| EthApiError::FailedToDecodeSignedTransaction.into_rpc_err())?; - let mut l1_block_info = self.provider.fetch_l1_block_info().await?; - let account = self - .provider - .fetch_account_info(transaction.signer()) - .await?; - validate_tx(account, &transaction, &data, &mut l1_block_info).await?; + // Send transaction data to ExEx for validation + // TODO: in the endgame version, this would push to a Redis cluster which the "Store" ExEx + // would read from. Instead, we are implementing v1 of this by having the ingress-rpc + // forward to the rpc-exex. + let (response_tx, response_rx) = oneshot::channel(); + let validation_data = ValidationData { + address: transaction.signer(), + tx: transaction.clone(), + data: data.clone(), + response_tx, + }; + + if let Err(e) = self.tx_sender.send(validation_data) { + warn!(message = "Failed to send transaction to ExEx", error = %e); + // TODO: error on here? + } + + // Wait for validation result from ExEx + match response_rx.await { + Ok(Ok(())) => { + // Validation successful, continue processing + } + Ok(Err(validation_error)) => { + // Validation failed, return the error + return Err(validation_error); + } + Err(_) => { + // Channel was dropped, ExEx is not responding + warn!(message = "ExEx validation channel dropped"); + // TODO: error on here? + } + } let expiry_timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) diff --git a/crates/rpc-exex/Cargo.toml b/crates/rpc-exex/Cargo.toml new file mode 100644 index 00000000..4962b6ec --- /dev/null +++ b/crates/rpc-exex/Cargo.toml @@ -0,0 +1,35 @@ +[package] +name = "tips-rpc-exex" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true + +[dependencies] +reth-exex.workspace = true +reth.workspace = true +eyre.workspace = true +tracing.workspace = true +reth-optimism-node.workspace = true +reth-optimism-cli.workspace = true +reth-primitives.workspace = true +tokio.workspace = true +futures.workspace = true +alloy-consensus.workspace = true +alloy-primitives.workspace = true +alloy-provider.workspace = true +async-trait.workspace = true +jsonrpsee.workspace = true +op-alloy-consensus.workspace = true +op-alloy-network.workspace = true +op-revm.workspace = true +reth-optimism-evm.workspace = true +reth-rpc-eth-types.workspace = true +revm-context-interface.workspace = true +alloy-signer-local.workspace = true +clap.workspace = true +reth-node-api.workspace = true +op-alloy-rpc-types.workspace = true +tips-common.workspace = true diff --git a/crates/rpc-exex/src/lib.rs b/crates/rpc-exex/src/lib.rs new file mode 100644 index 00000000..72ac3bd4 --- /dev/null +++ b/crates/rpc-exex/src/lib.rs @@ -0,0 +1,113 @@ +use alloy_consensus::constants::KECCAK_EMPTY; +use alloy_consensus::private::alloy_eips::{BlockId, BlockNumberOrTag}; +use alloy_primitives::Address; +use eyre::Result; +use futures::StreamExt; +use op_revm::l1block::L1BlockInfo; +use reth::api::FullNodeComponents; +use reth::providers::AccountReader; +use reth::providers::BlockReaderIdExt; +use reth::providers::TransactionVariant; +use reth_exex::{ExExContext, ExExEvent, ExExNotification}; +use reth_node_api::Block; +use reth_node_api::BlockBody; +use reth_optimism_evm::extract_l1_info_from_tx; +use reth_primitives::RecoveredBlock; +use tips_common::ValidationData; +use tokio::sync::mpsc; +use tracing::{debug, info, warn}; + +mod validation; + +pub struct RpcExEx +where + Node: FullNodeComponents, +{ + ctx: ExExContext, + tx_receiver: mpsc::UnboundedReceiver, +} + +impl RpcExEx +where + Node: FullNodeComponents, +{ + pub fn new( + ctx: ExExContext, + tx_receiver: mpsc::UnboundedReceiver, + ) -> Self { + Self { ctx, tx_receiver } + } + + pub async fn run(mut self) -> Result<()> { + info!(target = "tips-rpc-exex", "Starting RPC ExEx service"); + + loop { + tokio::select! { + Some(notification) = self.ctx.notifications.next() => { + match notification { + Ok(ExExNotification::ChainCommitted { new }) => { + debug!(committed_chain = ?new.range(), "Received commit"); + self.ctx.events.send(ExExEvent::FinishedHeight(new.tip().num_hash()))?; + } + Ok(ExExNotification::ChainReorged { old, new }) => { + debug!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg"); + self.ctx.events.send(ExExEvent::FinishedHeight(new.tip().num_hash()))?; + } + Ok(ExExNotification::ChainReverted { old }) => { + debug!(reverted_chain = ?old.range(), "Received revert"); + self.ctx.events.send(ExExEvent::FinishedHeight(old.tip().num_hash()))?; + } + Err(e) => { + debug!(target = "tips-rpc-exex", error = %e, "Error receiving notification"); + return Err(e); + } + } + } + Some(validation_data) = self.tx_receiver.recv() => { + info!(target = "tips-rpc-exex", "Received transaction data for validation"); + + let block = self.ctx + .provider() + .block_with_senders_by_id(BlockId::Number(BlockNumberOrTag::Latest), TransactionVariant::WithHash)? + .ok_or_else(|| eyre::eyre!("latest block not found"))?; + + let mut l1_info = self.fetch_l1_block_info(&block)?; + let account = self.fetch_account_info(validation_data.address)?; + let res = validation::validate_tx(account, &validation_data.tx, &validation_data.data, &mut l1_info).await; + + if validation_data.response_tx.send(res).is_err() { + warn!(target = "tips-rpc-exex", "Failed to send validation response - receiver dropped"); + } + } + } + } + } + + fn fetch_l1_block_info(&mut self, block: &RecoveredBlock) -> Result + where + B: Block, + { + // TODO: this errors on empty blocks, need to figure out how to handle this + let l1_info = extract_l1_info_from_tx( + block + .body() + .transactions() + .first() + .ok_or_else(|| eyre::eyre!("block contains no transactions"))?, + )?; + Ok(l1_info) + } + + fn fetch_account_info(&mut self, address: Address) -> Result { + let account = self + .ctx + .provider() + .basic_account(&address)? + .expect("account not found"); + Ok(validation::AccountInfo { + balance: account.balance, + nonce: account.nonce, + code_hash: account.bytecode_hash.unwrap_or(KECCAK_EMPTY), + }) + } +} diff --git a/crates/ingress-rpc/src/validation.rs b/crates/rpc-exex/src/validation.rs similarity index 85% rename from crates/ingress-rpc/src/validation.rs rename to crates/rpc-exex/src/validation.rs index cacc66d8..8f2881a7 100644 --- a/crates/ingress-rpc/src/validation.rs +++ b/crates/rpc-exex/src/validation.rs @@ -1,14 +1,9 @@ -use alloy_consensus::private::alloy_eips::{BlockId, BlockNumberOrTag}; -use alloy_consensus::{Transaction, Typed2718, constants::KECCAK_EMPTY, transaction::Recovered}; -use alloy_primitives::{Address, B256, U256}; -use alloy_provider::{Provider, RootProvider}; -use async_trait::async_trait; +use alloy_consensus::{Transaction, constants::KECCAK_EMPTY, transaction::Recovered}; +use alloy_primitives::{B256, U256}; use jsonrpsee::core::RpcResult; use op_alloy_consensus::interop::CROSS_L2_INBOX_ADDRESS; -use op_alloy_network::Optimism; use op_revm::{OpSpecId, l1block::L1BlockInfo}; -use reth_optimism_evm::extract_l1_info_from_tx; -use reth_rpc_eth_types::{EthApiError, RpcInvalidTransactionError, SignError}; +use reth_rpc_eth_types::{EthApiError, RpcInvalidTransactionError}; use tracing::warn; /// Account info for a given address @@ -18,64 +13,6 @@ pub struct AccountInfo { pub code_hash: B256, } -/// Interface for fetching account info for a given address -#[async_trait] -pub trait AccountInfoLookup: Send + Sync { - async fn fetch_account_info(&self, address: Address) -> RpcResult; -} - -/// Implementation of the `AccountInfoLookup` trait for the `RootProvider` -#[async_trait] -impl AccountInfoLookup for RootProvider { - async fn fetch_account_info(&self, address: Address) -> RpcResult { - let account = self - .get_account(address) - .await - .map_err(|_| EthApiError::Signing(SignError::NoAccount))?; - Ok(AccountInfo { - balance: account.balance, - nonce: account.nonce, - code_hash: account.code_hash, - }) - } -} - -/// Interface for fetching L1 block info for a given block number -#[async_trait] -pub trait L1BlockInfoLookup: Send + Sync { - async fn fetch_l1_block_info(&self) -> RpcResult; -} - -/// Implementation of the `L1BlockInfoLookup` trait for the `RootProvider` -#[async_trait] -impl L1BlockInfoLookup for RootProvider { - async fn fetch_l1_block_info(&self) -> RpcResult { - let block = self - .get_block(BlockId::Number(BlockNumberOrTag::Latest)) - .full() - .await - .map_err(|e| { - warn!(message = "failed to fetch latest block", err = %e); - EthApiError::InternalEthError.into_rpc_err() - })? - .ok_or_else(|| { - warn!(message = "empty latest block returned"); - EthApiError::InternalEthError.into_rpc_err() - })?; - - let txs = block.transactions.clone(); - let first_tx = txs.first_transaction().ok_or_else(|| { - warn!(message = "block contains no transactions"); - EthApiError::InternalEthError.into_rpc_err() - })?; - - Ok(extract_l1_info_from_tx(&first_tx.clone()).map_err(|e| { - warn!(message = "failed to extract l1_info from tx", err = %e); - EthApiError::InternalEthError.into_rpc_err() - })?) - } -} - /// Helper function to validate a transaction. A valid transaction must satisfy the following criteria: /// - If the transaction is not EIP-4844 /// - If the transaction is not a cross chain tx @@ -164,6 +101,7 @@ mod tests { use alloy_consensus::SignableTransaction; use alloy_consensus::{Transaction, constants::KECCAK_EMPTY, transaction::SignerRecoverable}; use alloy_consensus::{TxEip1559, TxEip4844, TxEip7702}; + use alloy_primitives::Address; use alloy_primitives::{bytes, keccak256}; use alloy_signer_local::PrivateKeySigner; use op_alloy_consensus::OpTxEnvelope;