From 44f6d0c30b710b184fe0fc42d85c953a90946678 Mon Sep 17 00:00:00 2001 From: William Law Date: Tue, 21 Oct 2025 13:32:57 -0400 Subject: [PATCH 01/13] spike --- Cargo.lock | 15 +++++++++++++++ Cargo.toml | 10 +++++++++- crates/rpc-exex/Cargo.toml | 19 +++++++++++++++++++ crates/rpc-exex/src/lib.rs | 32 ++++++++++++++++++++++++++++++++ 4 files changed, 75 insertions(+), 1 deletion(-) create mode 100644 crates/rpc-exex/Cargo.toml create mode 100644 crates/rpc-exex/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 4d6f59f5..c1893d76 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12585,6 +12585,21 @@ dependencies = [ "uuid", ] +[[package]] +name = "tips-rpc-exex" +version = "0.1.0" +dependencies = [ + "eyre", + "futures", + "reth", + "reth-exex", + "reth-optimism-cli", + "reth-optimism-node", + "reth-primitives", + "tokio", + "tracing", +] + [[package]] name = "tokio" version = "1.47.1" diff --git a/Cargo.toml b/Cargo.toml index 9114e9f3..d2bc89bd 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ homepage = "https://github.com/base/tips" repository = "https://github.com/base/tips" [workspace] -members = ["crates/datastore", "crates/audit", "crates/ingress-rpc", "crates/maintenance", "crates/ingress-writer"] +members = ["crates/datastore", "crates/audit", "crates/ingress-rpc", "crates/maintenance", "crates/ingress-writer", "crates/rpc-exex"] resolver = "2" [workspace.dependencies] @@ -15,12 +15,17 @@ tips-datastore = { path = "crates/datastore" } tips-audit = { path = "crates/audit" } tips-maintenance = { path = "crates/maintenance" } tips-ingress-writer = { path = "crates/ingress-writer" } +tips-rpc-exex = { path = "crates/rpc-exex" } # Reth reth = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" } reth-rpc-eth-types = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" } reth-optimism-evm = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" } base-reth-flashblocks-rpc = { git = "https://github.com/base/node-reth", rev = "a1ae148a36354c88b356f80281fef12dad9f7737" } +reth-exex = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" } +reth-optimism-node = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" } +reth-optimism-cli = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" } +reth-primitives = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" } # alloy alloy-primitives = { version = "1.3.1", default-features = false, features = [ @@ -72,3 +77,6 @@ backon = "1.5.2" op-revm = { version = "10.1.0", default-features = false } revm-context-interface = "10.2.0" alloy-signer-local = "1.0.36" + +# tips-exex +futures = "0.3" diff --git a/crates/rpc-exex/Cargo.toml b/crates/rpc-exex/Cargo.toml new file mode 100644 index 00000000..d223c1cb --- /dev/null +++ b/crates/rpc-exex/Cargo.toml @@ -0,0 +1,19 @@ +[package] +name = "tips-rpc-exex" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true + +[dependencies] +reth-exex.workspace = true +reth.workspace = true +eyre.workspace = true +tracing.workspace = true +reth-optimism-node.workspace = true +reth-optimism-cli.workspace = true +reth-primitives.workspace = true +tokio.workspace = true +futures.workspace = true diff --git a/crates/rpc-exex/src/lib.rs b/crates/rpc-exex/src/lib.rs new file mode 100644 index 00000000..e0a44435 --- /dev/null +++ b/crates/rpc-exex/src/lib.rs @@ -0,0 +1,32 @@ +use eyre::Result; +use futures::StreamExt; +use reth::api::FullNodeComponents; +use reth_exex::{ExExContext, ExExEvent, ExExNotification}; +use tracing::{debug, info}; + +pub async fn rpc_exex(mut ctx: ExExContext) -> Result<()> { + loop { + tokio::select! { + Some(notification) = ctx.notifications.next() => { + match notification { + Ok(ExExNotification::ChainCommitted { new }) => { + info!(committed_chain = ?new.range(), "Received commit"); + ctx.events.send(ExExEvent::FinishedHeight(new.tip().num_hash()))?; + } + Ok(ExExNotification::ChainReorged { old, new }) => { + info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg"); + ctx.events.send(ExExEvent::FinishedHeight(new.tip().num_hash()))?; + } + Ok(ExExNotification::ChainReverted { old }) => { + info!(reverted_chain = ?old.range(), "Received revert"); + ctx.events.send(ExExEvent::FinishedHeight(old.tip().num_hash()))?; + } + Err(e) => { + debug!(target = "tips-rpc-exex", error = %e, "Error receiving notification"); + return Err(e); + } + } + } + } + } +} From 50b4a63c782b46d62206c61e0a57d52d82ce0c41 Mon Sep 17 00:00:00 2001 From: William Law Date: Tue, 21 Oct 2025 13:51:45 -0400 Subject: [PATCH 02/13] move validation --- Cargo.lock | 12 ++++++++++++ crates/rpc-exex/Cargo.toml | 12 ++++++++++++ crates/rpc-exex/src/lib.rs | 2 ++ crates/{ingress-rpc => rpc-exex}/src/validation.rs | 0 4 files changed, 26 insertions(+) rename crates/{ingress-rpc => rpc-exex}/src/validation.rs (100%) diff --git a/Cargo.lock b/Cargo.lock index c1893d76..ef5fd0d0 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12589,13 +12589,25 @@ dependencies = [ name = "tips-rpc-exex" version = "0.1.0" dependencies = [ + "alloy-consensus", + "alloy-primitives", + "alloy-provider", + "alloy-signer-local", + "async-trait", "eyre", "futures", + "jsonrpsee 0.26.0", + "op-alloy-consensus", + "op-alloy-network", + "op-revm", "reth", "reth-exex", "reth-optimism-cli", + "reth-optimism-evm", "reth-optimism-node", "reth-primitives", + "reth-rpc-eth-types", + "revm-context-interface", "tokio", "tracing", ] diff --git a/crates/rpc-exex/Cargo.toml b/crates/rpc-exex/Cargo.toml index d223c1cb..00b30110 100644 --- a/crates/rpc-exex/Cargo.toml +++ b/crates/rpc-exex/Cargo.toml @@ -17,3 +17,15 @@ reth-optimism-cli.workspace = true reth-primitives.workspace = true tokio.workspace = true futures.workspace = true +alloy-consensus.workspace = true +alloy-primitives.workspace = true +alloy-provider.workspace = true +async-trait.workspace = true +jsonrpsee.workspace = true +op-alloy-consensus.workspace = true +op-alloy-network.workspace = true +op-revm.workspace = true +reth-optimism-evm.workspace = true +reth-rpc-eth-types.workspace = true +revm-context-interface.workspace = true +alloy-signer-local.workspace = true diff --git a/crates/rpc-exex/src/lib.rs b/crates/rpc-exex/src/lib.rs index e0a44435..1923e91e 100644 --- a/crates/rpc-exex/src/lib.rs +++ b/crates/rpc-exex/src/lib.rs @@ -4,6 +4,8 @@ use reth::api::FullNodeComponents; use reth_exex::{ExExContext, ExExEvent, ExExNotification}; use tracing::{debug, info}; +mod validation; + pub async fn rpc_exex(mut ctx: ExExContext) -> Result<()> { loop { tokio::select! { diff --git a/crates/ingress-rpc/src/validation.rs b/crates/rpc-exex/src/validation.rs similarity index 100% rename from crates/ingress-rpc/src/validation.rs rename to crates/rpc-exex/src/validation.rs From 664377780a0cef65840baa56c65a50f80db72717 Mon Sep 17 00:00:00 2001 From: William Law Date: Tue, 21 Oct 2025 14:37:35 -0400 Subject: [PATCH 03/13] setup exex --- Cargo.lock | 1 + crates/ingress-rpc/src/main.rs | 3 +- crates/rpc-exex/Cargo.toml | 1 + crates/rpc-exex/src/lib.rs | 34 --------------- crates/rpc-exex/src/main.rs | 79 ++++++++++++++++++++++++++++++++++ 5 files changed, 83 insertions(+), 35 deletions(-) delete mode 100644 crates/rpc-exex/src/lib.rs create mode 100644 crates/rpc-exex/src/main.rs diff --git a/Cargo.lock b/Cargo.lock index ef5fd0d0..4af3db95 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12594,6 +12594,7 @@ dependencies = [ "alloy-provider", "alloy-signer-local", "async-trait", + "clap", "eyre", "futures", "jsonrpsee 0.26.0", diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index 6400bc02..c6a4cab7 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -4,15 +4,16 @@ use jsonrpsee::server::Server; use op_alloy_network::Optimism; use rdkafka::ClientConfig; use rdkafka::producer::FutureProducer; +use reth_optimism_cli::{Cli, chainspec::OpChainSpecParser}; use std::fs; use std::net::IpAddr; +use tips_rpc_exex::RpcExEx; use tracing::{info, warn}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use url::Url; mod queue; mod service; -mod validation; use queue::KafkaQueuePublisher; use service::{IngressApiServer, IngressService}; diff --git a/crates/rpc-exex/Cargo.toml b/crates/rpc-exex/Cargo.toml index 00b30110..8df46821 100644 --- a/crates/rpc-exex/Cargo.toml +++ b/crates/rpc-exex/Cargo.toml @@ -29,3 +29,4 @@ reth-optimism-evm.workspace = true reth-rpc-eth-types.workspace = true revm-context-interface.workspace = true alloy-signer-local.workspace = true +clap.workspace = true diff --git a/crates/rpc-exex/src/lib.rs b/crates/rpc-exex/src/lib.rs deleted file mode 100644 index 1923e91e..00000000 --- a/crates/rpc-exex/src/lib.rs +++ /dev/null @@ -1,34 +0,0 @@ -use eyre::Result; -use futures::StreamExt; -use reth::api::FullNodeComponents; -use reth_exex::{ExExContext, ExExEvent, ExExNotification}; -use tracing::{debug, info}; - -mod validation; - -pub async fn rpc_exex(mut ctx: ExExContext) -> Result<()> { - loop { - tokio::select! { - Some(notification) = ctx.notifications.next() => { - match notification { - Ok(ExExNotification::ChainCommitted { new }) => { - info!(committed_chain = ?new.range(), "Received commit"); - ctx.events.send(ExExEvent::FinishedHeight(new.tip().num_hash()))?; - } - Ok(ExExNotification::ChainReorged { old, new }) => { - info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg"); - ctx.events.send(ExExEvent::FinishedHeight(new.tip().num_hash()))?; - } - Ok(ExExNotification::ChainReverted { old }) => { - info!(reverted_chain = ?old.range(), "Received revert"); - ctx.events.send(ExExEvent::FinishedHeight(old.tip().num_hash()))?; - } - Err(e) => { - debug!(target = "tips-rpc-exex", error = %e, "Error receiving notification"); - return Err(e); - } - } - } - } - } -} diff --git a/crates/rpc-exex/src/main.rs b/crates/rpc-exex/src/main.rs new file mode 100644 index 00000000..6ebf9b22 --- /dev/null +++ b/crates/rpc-exex/src/main.rs @@ -0,0 +1,79 @@ +use clap::Parser; +use eyre::Result; +use futures::StreamExt; +use reth::api::FullNodeComponents; +use reth::builder::Node; +use reth::providers::providers::BlockchainProvider; +use reth_exex::{ExExContext, ExExEvent, ExExNotification}; +use reth_optimism_cli::{Cli, chainspec::OpChainSpecParser}; +use reth_optimism_node::OpNode; +use reth_optimism_node::args::RollupArgs; +use tracing::{debug, info}; + +mod validation; + +pub struct RpcExEx +where + Node: FullNodeComponents, +{ + ctx: ExExContext, +} + +impl RpcExEx +where + Node: FullNodeComponents, +{ + pub fn new(ctx: ExExContext) -> Self { + Self { ctx } + } + + pub async fn run(mut self) -> Result<()> { + info!(target = "tips-rpc-exex", "Starting RPC EXEX service"); + + loop { + tokio::select! { + Some(notification) = self.ctx.notifications.next() => { + match notification { + Ok(ExExNotification::ChainCommitted { new }) => { + info!(committed_chain = ?new.range(), "Received commit"); + self.ctx.events.send(ExExEvent::FinishedHeight(new.tip().num_hash()))?; + } + Ok(ExExNotification::ChainReorged { old, new }) => { + info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg"); + self.ctx.events.send(ExExEvent::FinishedHeight(new.tip().num_hash()))?; + } + Ok(ExExNotification::ChainReverted { old }) => { + info!(reverted_chain = ?old.range(), "Received revert"); + self.ctx.events.send(ExExEvent::FinishedHeight(old.tip().num_hash()))?; + } + Err(e) => { + debug!(target = "tips-rpc-exex", error = %e, "Error receiving notification"); + return Err(e); + } + } + } + } + } + } +} + +fn main() -> Result<()> { + let rollup_args = RollupArgs { + disable_txpool_gossip: true, + ..Default::default() + }; + Cli::::parse().run(|builder, _| async move { + let handler = builder + .node(OpNode::new(rollup_args)) + .install_exex("tips-rpc-exex", move |ctx| async move { + Ok(RpcExEx::new(ctx).run()) + }) + .launch() + .await?; + + handler.wait_for_node_exit().await?; + Ok(()) + })?; + + Ok(()) +} From 93074fad9e07ea543dc2378c626c2683646e4c24 Mon Sep 17 00:00:00 2001 From: William Law Date: Tue, 21 Oct 2025 15:52:08 -0400 Subject: [PATCH 04/13] basic framework --- Cargo.lock | 2 ++ Cargo.toml | 1 + crates/rpc-exex/Cargo.toml | 2 ++ crates/rpc-exex/src/main.rs | 55 +++++++++++++++++++++++++++++++++++-- 4 files changed, 58 insertions(+), 2 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 4af3db95..744131ab 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12600,9 +12600,11 @@ dependencies = [ "jsonrpsee 0.26.0", "op-alloy-consensus", "op-alloy-network", + "op-alloy-rpc-types", "op-revm", "reth", "reth-exex", + "reth-node-api", "reth-optimism-cli", "reth-optimism-evm", "reth-optimism-node", diff --git a/Cargo.toml b/Cargo.toml index d2bc89bd..4ae6d6a5 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -26,6 +26,7 @@ reth-exex = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" } reth-optimism-node = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" } reth-optimism-cli = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" } reth-primitives = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" } +reth-node-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" } # alloy alloy-primitives = { version = "1.3.1", default-features = false, features = [ diff --git a/crates/rpc-exex/Cargo.toml b/crates/rpc-exex/Cargo.toml index 8df46821..4b3d37e3 100644 --- a/crates/rpc-exex/Cargo.toml +++ b/crates/rpc-exex/Cargo.toml @@ -30,3 +30,5 @@ reth-rpc-eth-types.workspace = true revm-context-interface.workspace = true alloy-signer-local.workspace = true clap.workspace = true +reth-node-api.workspace = true +op-alloy-rpc-types.workspace = true diff --git a/crates/rpc-exex/src/main.rs b/crates/rpc-exex/src/main.rs index 6ebf9b22..f510284b 100644 --- a/crates/rpc-exex/src/main.rs +++ b/crates/rpc-exex/src/main.rs @@ -1,13 +1,21 @@ +use alloy_consensus::constants::KECCAK_EMPTY; +use alloy_consensus::transaction::Recovered; +use alloy_primitives::Address; use clap::Parser; use eyre::Result; use futures::StreamExt; +use op_alloy_rpc_types::Transaction; +use op_revm::l1block::L1BlockInfo; use reth::api::FullNodeComponents; -use reth::builder::Node; -use reth::providers::providers::BlockchainProvider; +use reth::providers::AccountReader; use reth_exex::{ExExContext, ExExEvent, ExExNotification}; +use reth_node_api::Block; +use reth_node_api::BlockBody; use reth_optimism_cli::{Cli, chainspec::OpChainSpecParser}; +use reth_optimism_evm::extract_l1_info_from_tx; use reth_optimism_node::OpNode; use reth_optimism_node::args::RollupArgs; +use reth_primitives::RecoveredBlock; use tracing::{debug, info}; mod validation; @@ -55,6 +63,49 @@ where } } } + + pub async fn validate_tx( + &mut self, + block: &RecoveredBlock, + address: Address, + tx: &Recovered, + data: &[u8], + ) -> Result<()> + where + B: Block, + { + let mut l1_info = self.fetch_l1_block_info(block)?; + let account = self.fetch_account_info(address)?; + validation::validate_tx(account, tx, data, &mut l1_info).await?; + Ok(()) + } + + fn fetch_l1_block_info(&mut self, block: &RecoveredBlock) -> Result + where + B: Block, + { + let l1_info = extract_l1_info_from_tx( + block + .body() + .transactions() + .first() + .expect("block contains no transactions"), + )?; + Ok(l1_info) + } + + fn fetch_account_info(&mut self, address: Address) -> Result { + let account = self + .ctx + .provider() + .basic_account(&address)? + .expect("account not found"); + Ok(validation::AccountInfo { + balance: account.balance, + nonce: account.nonce, + code_hash: account.bytecode_hash.unwrap_or(KECCAK_EMPTY), + }) + } } fn main() -> Result<()> { From 6d2e44a103bb43d918151b9154d39bd0d28a3e69 Mon Sep 17 00:00:00 2001 From: William Law Date: Tue, 21 Oct 2025 16:28:35 -0400 Subject: [PATCH 05/13] put rpc-exex into ingress-rpc --- Cargo.lock | 3 +++ crates/ingress-rpc/Cargo.toml | 3 +++ crates/ingress-rpc/src/main.rs | 32 +++++++++++++++++++++++-- crates/rpc-exex/src/{main.rs => lib.rs} | 14 +++++------ 4 files changed, 42 insertions(+), 10 deletions(-) rename crates/rpc-exex/src/{main.rs => lib.rs} (88%) diff --git a/Cargo.lock b/Cargo.lock index 744131ab..29bf244b 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12530,10 +12530,13 @@ dependencies = [ "op-alloy-network", "op-revm", "rdkafka", + "reth-optimism-cli", "reth-optimism-evm", + "reth-optimism-node", "reth-rpc-eth-types", "revm-context-interface", "serde_json", + "tips-rpc-exex", "tokio", "tracing", "tracing-subscriber 0.3.20", diff --git a/crates/ingress-rpc/Cargo.toml b/crates/ingress-rpc/Cargo.toml index 13a581b3..bec13e55 100644 --- a/crates/ingress-rpc/Cargo.toml +++ b/crates/ingress-rpc/Cargo.toml @@ -35,3 +35,6 @@ op-revm.workspace = true revm-context-interface.workspace = true alloy-signer-local.workspace = true reth-optimism-evm.workspace = true +tips-rpc-exex.workspace = true +reth-optimism-cli.workspace = true +reth-optimism-node.workspace = true diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index c6a4cab7..eb025c2f 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -5,6 +5,8 @@ use op_alloy_network::Optimism; use rdkafka::ClientConfig; use rdkafka::producer::FutureProducer; use reth_optimism_cli::{Cli, chainspec::OpChainSpecParser}; +use reth_optimism_node::OpNode; +use reth_optimism_node::args::RollupArgs; use std::fs; use std::net::IpAddr; use tips_rpc_exex::RpcExEx; @@ -116,14 +118,40 @@ async fn main() -> anyhow::Result<()> { let server = Server::builder().build(&bind_addr).await?; let addr = server.local_addr()?; - let handle = server.start(service.into_rpc()); + let server_handle = server.start(service.into_rpc()); info!( message = "Ingress RPC server started", address = %addr ); - handle.stopped().await; + let rollup_args = RollupArgs { + disable_txpool_gossip: true, + ..Default::default() + }; + + Cli::::parse() + .run(|builder, _| async move { + let exex_handle = builder + .node(OpNode::new(rollup_args)) + .install_exex("tips-rpc-exex", move |ctx| async move { + Ok(RpcExEx::new(ctx).run()) + }) + .launch() + .await?; + + tokio::select! { + _ = server_handle.stopped() => { + info!("Ingress RPC server stopped"); + } + _ = exex_handle.wait_for_node_exit() => { + info!("RPC ExEx stopped"); + } + } + Ok(()) + }) + .map_err(|e| anyhow::anyhow!(e))?; + Ok(()) } diff --git a/crates/rpc-exex/src/main.rs b/crates/rpc-exex/src/lib.rs similarity index 88% rename from crates/rpc-exex/src/main.rs rename to crates/rpc-exex/src/lib.rs index f510284b..5d76b736 100644 --- a/crates/rpc-exex/src/main.rs +++ b/crates/rpc-exex/src/lib.rs @@ -1,7 +1,6 @@ use alloy_consensus::constants::KECCAK_EMPTY; use alloy_consensus::transaction::Recovered; use alloy_primitives::Address; -use clap::Parser; use eyre::Result; use futures::StreamExt; use op_alloy_rpc_types::Transaction; @@ -11,10 +10,7 @@ use reth::providers::AccountReader; use reth_exex::{ExExContext, ExExEvent, ExExNotification}; use reth_node_api::Block; use reth_node_api::BlockBody; -use reth_optimism_cli::{Cli, chainspec::OpChainSpecParser}; use reth_optimism_evm::extract_l1_info_from_tx; -use reth_optimism_node::OpNode; -use reth_optimism_node::args::RollupArgs; use reth_primitives::RecoveredBlock; use tracing::{debug, info}; @@ -36,22 +32,22 @@ where } pub async fn run(mut self) -> Result<()> { - info!(target = "tips-rpc-exex", "Starting RPC EXEX service"); + info!(target = "tips-rpc-exex", "Starting RPC ExEx service"); loop { tokio::select! { Some(notification) = self.ctx.notifications.next() => { match notification { Ok(ExExNotification::ChainCommitted { new }) => { - info!(committed_chain = ?new.range(), "Received commit"); + debug!(committed_chain = ?new.range(), "Received commit"); self.ctx.events.send(ExExEvent::FinishedHeight(new.tip().num_hash()))?; } Ok(ExExNotification::ChainReorged { old, new }) => { - info!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg"); + debug!(from_chain = ?old.range(), to_chain = ?new.range(), "Received reorg"); self.ctx.events.send(ExExEvent::FinishedHeight(new.tip().num_hash()))?; } Ok(ExExNotification::ChainReverted { old }) => { - info!(reverted_chain = ?old.range(), "Received revert"); + debug!(reverted_chain = ?old.range(), "Received revert"); self.ctx.events.send(ExExEvent::FinishedHeight(old.tip().num_hash()))?; } Err(e) => { @@ -108,6 +104,7 @@ where } } +/* fn main() -> Result<()> { let rollup_args = RollupArgs { disable_txpool_gossip: true, @@ -128,3 +125,4 @@ fn main() -> Result<()> { Ok(()) } +*/ From 437bccfa95d9a8f3b8f9a641e557b72a3b555967 Mon Sep 17 00:00:00 2001 From: William Law Date: Tue, 21 Oct 2025 17:14:34 -0400 Subject: [PATCH 06/13] add tips-common and integrate --- Cargo.lock | 12 ++++++++ Cargo.toml | 3 +- crates/common/Cargo.toml | 13 ++++++++ crates/common/src/lib.rs | 10 ++++++ crates/ingress-rpc/Cargo.toml | 2 ++ crates/ingress-rpc/src/main.rs | 9 ++++-- crates/ingress-rpc/src/service.rs | 27 +++++++++++++--- crates/rpc-exex/Cargo.toml | 1 + crates/rpc-exex/src/lib.rs | 51 ++++++++++++++----------------- 9 files changed, 92 insertions(+), 36 deletions(-) create mode 100644 crates/common/Cargo.toml create mode 100644 crates/common/src/lib.rs diff --git a/Cargo.lock b/Cargo.lock index 29bf244b..f8c87e7a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12491,6 +12491,15 @@ dependencies = [ "uuid", ] +[[package]] +name = "tips-common" +version = "0.1.0" +dependencies = [ + "alloy-consensus", + "alloy-primitives", + "op-alloy-consensus", +] + [[package]] name = "tips-datastore" version = "0.1.0" @@ -12528,6 +12537,7 @@ dependencies = [ "jsonrpsee 0.26.0", "op-alloy-consensus", "op-alloy-network", + "op-alloy-rpc-types", "op-revm", "rdkafka", "reth-optimism-cli", @@ -12536,6 +12546,7 @@ dependencies = [ "reth-rpc-eth-types", "revm-context-interface", "serde_json", + "tips-common", "tips-rpc-exex", "tokio", "tracing", @@ -12614,6 +12625,7 @@ dependencies = [ "reth-primitives", "reth-rpc-eth-types", "revm-context-interface", + "tips-common", "tokio", "tracing", ] diff --git a/Cargo.toml b/Cargo.toml index 4ae6d6a5..f1071d1b 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,7 +7,7 @@ homepage = "https://github.com/base/tips" repository = "https://github.com/base/tips" [workspace] -members = ["crates/datastore", "crates/audit", "crates/ingress-rpc", "crates/maintenance", "crates/ingress-writer", "crates/rpc-exex"] +members = ["crates/datastore", "crates/audit", "crates/ingress-rpc", "crates/maintenance", "crates/ingress-writer", "crates/rpc-exex", "crates/common"] resolver = "2" [workspace.dependencies] @@ -16,6 +16,7 @@ tips-audit = { path = "crates/audit" } tips-maintenance = { path = "crates/maintenance" } tips-ingress-writer = { path = "crates/ingress-writer" } tips-rpc-exex = { path = "crates/rpc-exex" } +tips-common = { path = "crates/common" } # Reth reth = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" } diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml new file mode 100644 index 00000000..acb64245 --- /dev/null +++ b/crates/common/Cargo.toml @@ -0,0 +1,13 @@ +[package] +name = "tips-common" +version.workspace = true +edition.workspace = true +rust-version.workspace = true +license.workspace = true +homepage.workspace = true +repository.workspace = true + +[dependencies] +op-alloy-consensus.workspace = true +alloy-primitives.workspace = true +alloy-consensus.workspace = true diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs new file mode 100644 index 00000000..ec3f279c --- /dev/null +++ b/crates/common/src/lib.rs @@ -0,0 +1,10 @@ +use alloy_consensus::transaction::Recovered; +use alloy_primitives::{Address, Bytes}; +use op_alloy_consensus::OpTxEnvelope; + +#[derive(Debug, Clone)] +pub struct ValidationData { + pub address: Address, + pub tx: Recovered, + pub data: Bytes, +} diff --git a/crates/ingress-rpc/Cargo.toml b/crates/ingress-rpc/Cargo.toml index bec13e55..5a1f5f49 100644 --- a/crates/ingress-rpc/Cargo.toml +++ b/crates/ingress-rpc/Cargo.toml @@ -38,3 +38,5 @@ reth-optimism-evm.workspace = true tips-rpc-exex.workspace = true reth-optimism-cli.workspace = true reth-optimism-node.workspace = true +op-alloy-rpc-types.workspace = true +tips-common.workspace = true diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index eb025c2f..64de1f34 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -9,7 +9,9 @@ use reth_optimism_node::OpNode; use reth_optimism_node::args::RollupArgs; use std::fs; use std::net::IpAddr; +use tips_common::ValidationData; use tips_rpc_exex::RpcExEx; +use tokio::sync::mpsc; use tracing::{info, warn}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; use url::Url; @@ -108,11 +110,15 @@ async fn main() -> anyhow::Result<()> { let queue = KafkaQueuePublisher::new(queue_producer, config.ingress_topic); + // Create mpsc channel for communication between service and exex to forward txs to validate + let (tx_sender, tx_receiver) = mpsc::unbounded_channel::(); + let service = IngressService::new( provider, config.dual_write_mempool, queue, config.send_transaction_default_lifetime_seconds, + tx_sender, ); let bind_addr = format!("{}:{}", config.address, config.port); @@ -129,13 +135,12 @@ async fn main() -> anyhow::Result<()> { disable_txpool_gossip: true, ..Default::default() }; - Cli::::parse() .run(|builder, _| async move { let exex_handle = builder .node(OpNode::new(rollup_args)) .install_exex("tips-rpc-exex", move |ctx| async move { - Ok(RpcExEx::new(ctx).run()) + Ok(RpcExEx::new(ctx, tx_receiver).run()) }) .launch() .await?; diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index a48841d4..21218952 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -1,6 +1,5 @@ -use crate::validation::{AccountInfoLookup, L1BlockInfoLookup, validate_tx}; -use alloy_consensus::transaction::SignerRecoverable; -use alloy_primitives::{B256, Bytes}; +use alloy_consensus::transaction::{Recovered, SignerRecoverable}; +use alloy_primitives::{Address, B256, Bytes}; use alloy_provider::{Provider, RootProvider, network::eip2718::Decodable2718}; use alloy_rpc_types_mev::{EthBundleHash, EthCancelBundle, EthSendBundle}; use jsonrpsee::{ @@ -11,6 +10,8 @@ use op_alloy_consensus::OpTxEnvelope; use op_alloy_network::Optimism; use reth_rpc_eth_types::EthApiError; use std::time::{SystemTime, UNIX_EPOCH}; +use tips_common::ValidationData; +use tokio::sync::mpsc; use tracing::{info, warn}; use crate::queue::QueuePublisher; @@ -35,6 +36,7 @@ pub struct IngressService { dual_write_mempool: bool, queue: Queue, send_transaction_default_lifetime_seconds: u64, + tx_sender: mpsc::UnboundedSender, } impl IngressService { @@ -43,12 +45,14 @@ impl IngressService { dual_write_mempool: bool, queue: Queue, send_transaction_default_lifetime_seconds: u64, + tx_sender: mpsc::UnboundedSender, ) -> Self { Self { provider, dual_write_mempool, queue, send_transaction_default_lifetime_seconds, + tx_sender, } } } @@ -87,12 +91,25 @@ where .try_into_recovered() .map_err(|_| EthApiError::FailedToDecodeSignedTransaction.into_rpc_err())?; - let mut l1_block_info = self.provider.fetch_l1_block_info().await?; + /*let mut l1_block_info = self.provider.fetch_l1_block_info().await?; let account = self .provider .fetch_account_info(transaction.signer()) .await?; - validate_tx(account, &transaction, &data, &mut l1_block_info).await?; + validate_tx(account, &transaction, &data, &mut l1_block_info).await?;*/ + + // Send transaction data to ExEx for validation + // TODO: in the endgame version, this would push to a Redis cluster which the "Store" ExEx + // would read from. Instead, we are implementing v1 of this by having the ingress-rpc + // forward to the rpc-exex. + let validation_data = ValidationData { + address: transaction.signer(), + tx: transaction.clone(), + data: data.clone(), + }; + if let Err(e) = self.tx_sender.send(validation_data) { + warn!(message = "Failed to send transaction to ExEx", error = %e); + } let expiry_timestamp = SystemTime::now() .duration_since(UNIX_EPOCH) diff --git a/crates/rpc-exex/Cargo.toml b/crates/rpc-exex/Cargo.toml index 4b3d37e3..4962b6ec 100644 --- a/crates/rpc-exex/Cargo.toml +++ b/crates/rpc-exex/Cargo.toml @@ -32,3 +32,4 @@ alloy-signer-local.workspace = true clap.workspace = true reth-node-api.workspace = true op-alloy-rpc-types.workspace = true +tips-common.workspace = true diff --git a/crates/rpc-exex/src/lib.rs b/crates/rpc-exex/src/lib.rs index 5d76b736..0c99e99c 100644 --- a/crates/rpc-exex/src/lib.rs +++ b/crates/rpc-exex/src/lib.rs @@ -1,17 +1,22 @@ use alloy_consensus::constants::KECCAK_EMPTY; +use alloy_consensus::private::alloy_eips::{BlockId, BlockNumberOrTag}; use alloy_consensus::transaction::Recovered; use alloy_primitives::Address; use eyre::Result; use futures::StreamExt; -use op_alloy_rpc_types::Transaction; +use op_alloy_consensus::OpTxEnvelope; use op_revm::l1block::L1BlockInfo; use reth::api::FullNodeComponents; use reth::providers::AccountReader; +use reth::providers::BlockReaderIdExt; +use reth::providers::TransactionVariant; use reth_exex::{ExExContext, ExExEvent, ExExNotification}; use reth_node_api::Block; use reth_node_api::BlockBody; use reth_optimism_evm::extract_l1_info_from_tx; use reth_primitives::RecoveredBlock; +use tips_common::ValidationData; +use tokio::sync::mpsc; use tracing::{debug, info}; mod validation; @@ -21,14 +26,18 @@ where Node: FullNodeComponents, { ctx: ExExContext, + tx_receiver: mpsc::UnboundedReceiver, } impl RpcExEx where Node: FullNodeComponents, { - pub fn new(ctx: ExExContext) -> Self { - Self { ctx } + pub fn new( + ctx: ExExContext, + tx_receiver: mpsc::UnboundedReceiver, + ) -> Self { + Self { ctx, tx_receiver } } pub async fn run(mut self) -> Result<()> { @@ -56,15 +65,24 @@ where } } } + Some(validation_data) = self.tx_receiver.recv() => { + info!(target = "tips-rpc-exex", "Received transaction data for validation"); + + let block = self.ctx + .provider() + .block_with_senders_by_id(BlockId::Number(BlockNumberOrTag::Latest), TransactionVariant::WithHash)? + .expect("latest block not found"); + self.validate_tx(&block, validation_data.address, &validation_data.tx, &validation_data.data).await? + } } } } - pub async fn validate_tx( + async fn validate_tx( &mut self, block: &RecoveredBlock, address: Address, - tx: &Recovered, + tx: &Recovered, data: &[u8], ) -> Result<()> where @@ -103,26 +121,3 @@ where }) } } - -/* -fn main() -> Result<()> { - let rollup_args = RollupArgs { - disable_txpool_gossip: true, - ..Default::default() - }; - Cli::::parse().run(|builder, _| async move { - let handler = builder - .node(OpNode::new(rollup_args)) - .install_exex("tips-rpc-exex", move |ctx| async move { - Ok(RpcExEx::new(ctx).run()) - }) - .launch() - .await?; - - handler.wait_for_node_exit().await?; - Ok(()) - })?; - - Ok(()) -} -*/ From 6bb8357c227da2091c793582985f36ce059b58b2 Mon Sep 17 00:00:00 2001 From: William Law Date: Tue, 21 Oct 2025 17:16:09 -0400 Subject: [PATCH 07/13] remove unused code --- crates/ingress-rpc/src/service.rs | 11 +---- crates/rpc-exex/src/validation.rs | 70 ++----------------------------- 2 files changed, 6 insertions(+), 75 deletions(-) diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 21218952..826319e8 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -1,5 +1,5 @@ -use alloy_consensus::transaction::{Recovered, SignerRecoverable}; -use alloy_primitives::{Address, B256, Bytes}; +use alloy_consensus::transaction::SignerRecoverable; +use alloy_primitives::{B256, Bytes}; use alloy_provider::{Provider, RootProvider, network::eip2718::Decodable2718}; use alloy_rpc_types_mev::{EthBundleHash, EthCancelBundle, EthSendBundle}; use jsonrpsee::{ @@ -91,13 +91,6 @@ where .try_into_recovered() .map_err(|_| EthApiError::FailedToDecodeSignedTransaction.into_rpc_err())?; - /*let mut l1_block_info = self.provider.fetch_l1_block_info().await?; - let account = self - .provider - .fetch_account_info(transaction.signer()) - .await?; - validate_tx(account, &transaction, &data, &mut l1_block_info).await?;*/ - // Send transaction data to ExEx for validation // TODO: in the endgame version, this would push to a Redis cluster which the "Store" ExEx // would read from. Instead, we are implementing v1 of this by having the ingress-rpc diff --git a/crates/rpc-exex/src/validation.rs b/crates/rpc-exex/src/validation.rs index cacc66d8..8f2881a7 100644 --- a/crates/rpc-exex/src/validation.rs +++ b/crates/rpc-exex/src/validation.rs @@ -1,14 +1,9 @@ -use alloy_consensus::private::alloy_eips::{BlockId, BlockNumberOrTag}; -use alloy_consensus::{Transaction, Typed2718, constants::KECCAK_EMPTY, transaction::Recovered}; -use alloy_primitives::{Address, B256, U256}; -use alloy_provider::{Provider, RootProvider}; -use async_trait::async_trait; +use alloy_consensus::{Transaction, constants::KECCAK_EMPTY, transaction::Recovered}; +use alloy_primitives::{B256, U256}; use jsonrpsee::core::RpcResult; use op_alloy_consensus::interop::CROSS_L2_INBOX_ADDRESS; -use op_alloy_network::Optimism; use op_revm::{OpSpecId, l1block::L1BlockInfo}; -use reth_optimism_evm::extract_l1_info_from_tx; -use reth_rpc_eth_types::{EthApiError, RpcInvalidTransactionError, SignError}; +use reth_rpc_eth_types::{EthApiError, RpcInvalidTransactionError}; use tracing::warn; /// Account info for a given address @@ -18,64 +13,6 @@ pub struct AccountInfo { pub code_hash: B256, } -/// Interface for fetching account info for a given address -#[async_trait] -pub trait AccountInfoLookup: Send + Sync { - async fn fetch_account_info(&self, address: Address) -> RpcResult; -} - -/// Implementation of the `AccountInfoLookup` trait for the `RootProvider` -#[async_trait] -impl AccountInfoLookup for RootProvider { - async fn fetch_account_info(&self, address: Address) -> RpcResult { - let account = self - .get_account(address) - .await - .map_err(|_| EthApiError::Signing(SignError::NoAccount))?; - Ok(AccountInfo { - balance: account.balance, - nonce: account.nonce, - code_hash: account.code_hash, - }) - } -} - -/// Interface for fetching L1 block info for a given block number -#[async_trait] -pub trait L1BlockInfoLookup: Send + Sync { - async fn fetch_l1_block_info(&self) -> RpcResult; -} - -/// Implementation of the `L1BlockInfoLookup` trait for the `RootProvider` -#[async_trait] -impl L1BlockInfoLookup for RootProvider { - async fn fetch_l1_block_info(&self) -> RpcResult { - let block = self - .get_block(BlockId::Number(BlockNumberOrTag::Latest)) - .full() - .await - .map_err(|e| { - warn!(message = "failed to fetch latest block", err = %e); - EthApiError::InternalEthError.into_rpc_err() - })? - .ok_or_else(|| { - warn!(message = "empty latest block returned"); - EthApiError::InternalEthError.into_rpc_err() - })?; - - let txs = block.transactions.clone(); - let first_tx = txs.first_transaction().ok_or_else(|| { - warn!(message = "block contains no transactions"); - EthApiError::InternalEthError.into_rpc_err() - })?; - - Ok(extract_l1_info_from_tx(&first_tx.clone()).map_err(|e| { - warn!(message = "failed to extract l1_info from tx", err = %e); - EthApiError::InternalEthError.into_rpc_err() - })?) - } -} - /// Helper function to validate a transaction. A valid transaction must satisfy the following criteria: /// - If the transaction is not EIP-4844 /// - If the transaction is not a cross chain tx @@ -164,6 +101,7 @@ mod tests { use alloy_consensus::SignableTransaction; use alloy_consensus::{Transaction, constants::KECCAK_EMPTY, transaction::SignerRecoverable}; use alloy_consensus::{TxEip1559, TxEip4844, TxEip7702}; + use alloy_primitives::Address; use alloy_primitives::{bytes, keccak256}; use alloy_signer_local::PrivateKeySigner; use op_alloy_consensus::OpTxEnvelope; From f36d78a1dba0a406a22b41fcbea7eb0dca141eca Mon Sep 17 00:00:00 2001 From: William Law Date: Wed, 22 Oct 2025 10:47:29 -0400 Subject: [PATCH 08/13] fix cli parse args --- crates/ingress-rpc/src/main.rs | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index 64de1f34..1d6391c9 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -131,14 +131,13 @@ async fn main() -> anyhow::Result<()> { address = %addr ); - let rollup_args = RollupArgs { - disable_txpool_gossip: true, - ..Default::default() - }; - Cli::::parse() + Cli::::parse() .run(|builder, _| async move { let exex_handle = builder - .node(OpNode::new(rollup_args)) + .node(OpNode::new(RollupArgs { + disable_txpool_gossip: true, + ..Default::default() + })) .install_exex("tips-rpc-exex", move |ctx| async move { Ok(RpcExEx::new(ctx, tx_receiver).run()) }) From fcb903b2feae8ccaedf00a5fdfd657fe1f8b3bda Mon Sep 17 00:00:00 2001 From: William Law Date: Wed, 22 Oct 2025 11:26:34 -0400 Subject: [PATCH 09/13] run exex --- crates/ingress-rpc/src/main.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index 1d6391c9..094c4e9b 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -131,7 +131,8 @@ async fn main() -> anyhow::Result<()> { address = %addr ); - Cli::::parse() + let args = vec!["tips-ingress-rpc", "node"]; + Cli::::try_parse_from(args)? .run(|builder, _| async move { let exex_handle = builder .node(OpNode::new(RollupArgs { From 748aadefcd7296abe3f8713b165796878118b78a Mon Sep 17 00:00:00 2001 From: William Law Date: Wed, 22 Oct 2025 12:42:37 -0400 Subject: [PATCH 10/13] make main non-async, move everything in cli runtime --- crates/ingress-rpc/src/main.rs | 65 ++++++++++++++++------------------ 1 file changed, 30 insertions(+), 35 deletions(-) diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index 094c4e9b..179d5a87 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -6,7 +6,6 @@ use rdkafka::ClientConfig; use rdkafka::producer::FutureProducer; use reth_optimism_cli::{Cli, chainspec::OpChainSpecParser}; use reth_optimism_node::OpNode; -use reth_optimism_node::args::RollupArgs; use std::fs; use std::net::IpAddr; use tips_common::ValidationData; @@ -64,8 +63,7 @@ struct Config { send_transaction_default_lifetime_seconds: u64, } -#[tokio::main] -async fn main() -> anyhow::Result<()> { +fn main() -> anyhow::Result<()> { dotenvy::dotenv().ok(); let config = Config::parse(); @@ -99,46 +97,43 @@ async fn main() -> anyhow::Result<()> { mempool_url = %config.mempool_url ); - let provider: RootProvider = ProviderBuilder::new() - .disable_recommended_fillers() - .network::() - .connect_http(config.mempool_url); - - let client_config = load_kafka_config_from_file(&config.ingress_kafka_properties)?; + let args = vec!["tips-ingress-rpc", "node", "--chain", "base-sepolia", "-v"]; + Cli::::try_parse_from(args)? + .run(|builder, _| async move { + let provider: RootProvider = ProviderBuilder::new() + .disable_recommended_fillers() + .network::() + .connect_http(config.mempool_url.clone()); - let queue_producer: FutureProducer = client_config.create()?; + let client_config = load_kafka_config_from_file(&config.ingress_kafka_properties) + .expect("Failed to load kafka config"); - let queue = KafkaQueuePublisher::new(queue_producer, config.ingress_topic); + let queue_producer: FutureProducer = client_config.create()?; - // Create mpsc channel for communication between service and exex to forward txs to validate - let (tx_sender, tx_receiver) = mpsc::unbounded_channel::(); + let queue = KafkaQueuePublisher::new(queue_producer, config.ingress_topic.clone()); - let service = IngressService::new( - provider, - config.dual_write_mempool, - queue, - config.send_transaction_default_lifetime_seconds, - tx_sender, - ); - let bind_addr = format!("{}:{}", config.address, config.port); + // Create mpsc channel for communication between service and exex to forward txs to validate + let (tx_sender, tx_receiver) = mpsc::unbounded_channel::(); - let server = Server::builder().build(&bind_addr).await?; - let addr = server.local_addr()?; - let server_handle = server.start(service.into_rpc()); + let service = IngressService::new( + provider, + config.dual_write_mempool, + queue, + config.send_transaction_default_lifetime_seconds, + tx_sender, + ); + let bind_addr = format!("{}:{}", config.address, config.port); - info!( - message = "Ingress RPC server started", - address = %addr - ); + let server = Server::builder().build(&bind_addr).await?; + let addr = server.local_addr()?; + let server_handle = server.start(service.into_rpc()); - let args = vec!["tips-ingress-rpc", "node"]; - Cli::::try_parse_from(args)? - .run(|builder, _| async move { + info!( + message = "Ingress RPC server started", + address = %addr + ); let exex_handle = builder - .node(OpNode::new(RollupArgs { - disable_txpool_gossip: true, - ..Default::default() - })) + .node(OpNode::default()) .install_exex("tips-rpc-exex", move |ctx| async move { Ok(RpcExEx::new(ctx, tx_receiver).run()) }) From e30937db97dbc6eba76d7f39cc3784d0e8fb8799 Mon Sep 17 00:00:00 2001 From: William Law Date: Wed, 22 Oct 2025 16:19:39 -0400 Subject: [PATCH 11/13] add chain --- crates/ingress-rpc/src/main.rs | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index 179d5a87..4f2e6d6a 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -54,6 +54,10 @@ struct Config { #[arg(long, env = "TIPS_INGRESS_LOG_LEVEL", default_value = "info")] log_level: String, + /// Chain to connect to + #[arg(long, env = "TIPS_INGRESS_CHAIN", default_value = "base-sepolia")] + chain: String, + /// Default lifetime for sent transactions in seconds (default: 3 hours) #[arg( long, @@ -97,7 +101,7 @@ fn main() -> anyhow::Result<()> { mempool_url = %config.mempool_url ); - let args = vec!["tips-ingress-rpc", "node", "--chain", "base-sepolia", "-v"]; + let args =vec!["tips-ingress-rpc", "node", "--chain", &config.chain, "-v"]; Cli::::try_parse_from(args)? .run(|builder, _| async move { let provider: RootProvider = ProviderBuilder::new() From e88f104f2186c97160e3357fb44b56dda2602c4d Mon Sep 17 00:00:00 2001 From: William Law Date: Wed, 22 Oct 2025 17:06:22 -0400 Subject: [PATCH 12/13] better error dont panic --- crates/ingress-rpc/src/main.rs | 10 ++++++++-- crates/rpc-exex/src/lib.rs | 5 +++-- 2 files changed, 11 insertions(+), 4 deletions(-) diff --git a/crates/ingress-rpc/src/main.rs b/crates/ingress-rpc/src/main.rs index 4f2e6d6a..5d1570a6 100644 --- a/crates/ingress-rpc/src/main.rs +++ b/crates/ingress-rpc/src/main.rs @@ -54,7 +54,7 @@ struct Config { #[arg(long, env = "TIPS_INGRESS_LOG_LEVEL", default_value = "info")] log_level: String, - /// Chain to connect to + /// Chain to connect to #[arg(long, env = "TIPS_INGRESS_CHAIN", default_value = "base-sepolia")] chain: String, @@ -101,7 +101,13 @@ fn main() -> anyhow::Result<()> { mempool_url = %config.mempool_url ); - let args =vec!["tips-ingress-rpc", "node", "--chain", &config.chain, "-v"]; + let args = vec![ + env!("CARGO_BIN_NAME"), + "node", + "--chain", + &config.chain, + "-v", + ]; Cli::::try_parse_from(args)? .run(|builder, _| async move { let provider: RootProvider = ProviderBuilder::new() diff --git a/crates/rpc-exex/src/lib.rs b/crates/rpc-exex/src/lib.rs index 0c99e99c..702593a6 100644 --- a/crates/rpc-exex/src/lib.rs +++ b/crates/rpc-exex/src/lib.rs @@ -71,7 +71,7 @@ where let block = self.ctx .provider() .block_with_senders_by_id(BlockId::Number(BlockNumberOrTag::Latest), TransactionVariant::WithHash)? - .expect("latest block not found"); + .ok_or_else(|| eyre::eyre!("latest block not found"))?; self.validate_tx(&block, validation_data.address, &validation_data.tx, &validation_data.data).await? } } @@ -98,12 +98,13 @@ where where B: Block, { + // TODO: this errors on empty blocks, need to figure out how to handle this let l1_info = extract_l1_info_from_tx( block .body() .transactions() .first() - .expect("block contains no transactions"), + .ok_or_else(|| eyre::eyre!("block contains no transactions"))?, )?; Ok(l1_info) } From 4f187358b48a32337d5f2f40b50ddf4fe1c2d10c Mon Sep 17 00:00:00 2001 From: William Law Date: Wed, 22 Oct 2025 17:49:52 -0400 Subject: [PATCH 13/13] handle err on invalid txs --- Cargo.lock | 2 ++ crates/common/Cargo.toml | 2 ++ crates/common/src/lib.rs | 5 ++++- crates/ingress-rpc/src/service.rs | 22 +++++++++++++++++++++- crates/rpc-exex/src/lib.rs | 29 +++++++++-------------------- 5 files changed, 38 insertions(+), 22 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f8c87e7a..a30d7070 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -12497,7 +12497,9 @@ version = "0.1.0" dependencies = [ "alloy-consensus", "alloy-primitives", + "jsonrpsee 0.26.0", "op-alloy-consensus", + "tokio", ] [[package]] diff --git a/crates/common/Cargo.toml b/crates/common/Cargo.toml index acb64245..99479b18 100644 --- a/crates/common/Cargo.toml +++ b/crates/common/Cargo.toml @@ -11,3 +11,5 @@ repository.workspace = true op-alloy-consensus.workspace = true alloy-primitives.workspace = true alloy-consensus.workspace = true +tokio.workspace = true +jsonrpsee.workspace = true diff --git a/crates/common/src/lib.rs b/crates/common/src/lib.rs index ec3f279c..ee8567f6 100644 --- a/crates/common/src/lib.rs +++ b/crates/common/src/lib.rs @@ -1,10 +1,13 @@ use alloy_consensus::transaction::Recovered; use alloy_primitives::{Address, Bytes}; +use jsonrpsee::core::RpcResult; use op_alloy_consensus::OpTxEnvelope; +use tokio::sync::oneshot; -#[derive(Debug, Clone)] +#[derive(Debug)] pub struct ValidationData { pub address: Address, pub tx: Recovered, pub data: Bytes, + pub response_tx: oneshot::Sender>, } diff --git a/crates/ingress-rpc/src/service.rs b/crates/ingress-rpc/src/service.rs index 826319e8..14636cc7 100644 --- a/crates/ingress-rpc/src/service.rs +++ b/crates/ingress-rpc/src/service.rs @@ -11,7 +11,7 @@ use op_alloy_network::Optimism; use reth_rpc_eth_types::EthApiError; use std::time::{SystemTime, UNIX_EPOCH}; use tips_common::ValidationData; -use tokio::sync::mpsc; +use tokio::sync::{mpsc, oneshot}; use tracing::{info, warn}; use crate::queue::QueuePublisher; @@ -95,13 +95,33 @@ where // TODO: in the endgame version, this would push to a Redis cluster which the "Store" ExEx // would read from. Instead, we are implementing v1 of this by having the ingress-rpc // forward to the rpc-exex. + let (response_tx, response_rx) = oneshot::channel(); let validation_data = ValidationData { address: transaction.signer(), tx: transaction.clone(), data: data.clone(), + response_tx, }; + if let Err(e) = self.tx_sender.send(validation_data) { warn!(message = "Failed to send transaction to ExEx", error = %e); + // TODO: error on here? + } + + // Wait for validation result from ExEx + match response_rx.await { + Ok(Ok(())) => { + // Validation successful, continue processing + } + Ok(Err(validation_error)) => { + // Validation failed, return the error + return Err(validation_error); + } + Err(_) => { + // Channel was dropped, ExEx is not responding + warn!(message = "ExEx validation channel dropped"); + // TODO: error on here? + } } let expiry_timestamp = SystemTime::now() diff --git a/crates/rpc-exex/src/lib.rs b/crates/rpc-exex/src/lib.rs index 702593a6..72ac3bd4 100644 --- a/crates/rpc-exex/src/lib.rs +++ b/crates/rpc-exex/src/lib.rs @@ -1,10 +1,8 @@ use alloy_consensus::constants::KECCAK_EMPTY; use alloy_consensus::private::alloy_eips::{BlockId, BlockNumberOrTag}; -use alloy_consensus::transaction::Recovered; use alloy_primitives::Address; use eyre::Result; use futures::StreamExt; -use op_alloy_consensus::OpTxEnvelope; use op_revm::l1block::L1BlockInfo; use reth::api::FullNodeComponents; use reth::providers::AccountReader; @@ -17,7 +15,7 @@ use reth_optimism_evm::extract_l1_info_from_tx; use reth_primitives::RecoveredBlock; use tips_common::ValidationData; use tokio::sync::mpsc; -use tracing::{debug, info}; +use tracing::{debug, info, warn}; mod validation; @@ -72,28 +70,19 @@ where .provider() .block_with_senders_by_id(BlockId::Number(BlockNumberOrTag::Latest), TransactionVariant::WithHash)? .ok_or_else(|| eyre::eyre!("latest block not found"))?; - self.validate_tx(&block, validation_data.address, &validation_data.tx, &validation_data.data).await? + + let mut l1_info = self.fetch_l1_block_info(&block)?; + let account = self.fetch_account_info(validation_data.address)?; + let res = validation::validate_tx(account, &validation_data.tx, &validation_data.data, &mut l1_info).await; + + if validation_data.response_tx.send(res).is_err() { + warn!(target = "tips-rpc-exex", "Failed to send validation response - receiver dropped"); + } } } } } - async fn validate_tx( - &mut self, - block: &RecoveredBlock, - address: Address, - tx: &Recovered, - data: &[u8], - ) -> Result<()> - where - B: Block, - { - let mut l1_info = self.fetch_l1_block_info(block)?; - let account = self.fetch_account_info(address)?; - validation::validate_tx(account, tx, data, &mut l1_info).await?; - Ok(()) - } - fn fetch_l1_block_info(&mut self, block: &RecoveredBlock) -> Result where B: Block,