47 changes: 47 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

12 changes: 11 additions & 1 deletion Cargo.toml
@@ -7,20 +7,27 @@ homepage = "https://github.com/base/tips"
repository = "https://github.com/base/tips"

[workspace]
members = ["crates/datastore", "crates/audit", "crates/ingress-rpc", "crates/maintenance", "crates/ingress-writer"]
members = ["crates/datastore", "crates/audit", "crates/ingress-rpc", "crates/maintenance", "crates/ingress-writer", "crates/rpc-exex", "crates/common"]
resolver = "2"

[workspace.dependencies]
tips-datastore = { path = "crates/datastore" }
tips-audit = { path = "crates/audit" }
tips-maintenance = { path = "crates/maintenance" }
tips-ingress-writer = { path = "crates/ingress-writer" }
tips-rpc-exex = { path = "crates/rpc-exex" }
tips-common = { path = "crates/common" }

# Reth
reth = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" }
reth-rpc-eth-types = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" }
reth-optimism-evm = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" }
base-reth-flashblocks-rpc = { git = "https://github.com/base/node-reth", rev = "a1ae148a36354c88b356f80281fef12dad9f7737" }
reth-exex = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" }
reth-optimism-node = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" }
reth-optimism-cli = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" }
reth-primitives = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" }
reth-node-api = { git = "https://github.com/paradigmxyz/reth", tag = "v1.8.1" }

# alloy
alloy-primitives = { version = "1.3.1", default-features = false, features = [
@@ -72,3 +79,6 @@ backon = "1.5.2"
op-revm = { version = "10.1.0", default-features = false }
revm-context-interface = "10.2.0"
alloy-signer-local = "1.0.36"

# tips-exex
futures = "0.3"
15 changes: 15 additions & 0 deletions crates/common/Cargo.toml
@@ -0,0 +1,15 @@
[package]
name = "tips-common"
version.workspace = true
edition.workspace = true
rust-version.workspace = true
license.workspace = true
homepage.workspace = true
repository.workspace = true

[dependencies]
op-alloy-consensus.workspace = true
alloy-primitives.workspace = true
alloy-consensus.workspace = true
tokio.workspace = true
jsonrpsee.workspace = true
13 changes: 13 additions & 0 deletions crates/common/src/lib.rs
@@ -0,0 +1,13 @@
use alloy_consensus::transaction::Recovered;
use alloy_primitives::{Address, Bytes};
use jsonrpsee::core::RpcResult;
use op_alloy_consensus::OpTxEnvelope;
use tokio::sync::oneshot;

#[derive(Debug)]
pub struct ValidationData {
pub address: Address,
pub tx: Recovered<OpTxEnvelope>,
pub data: Bytes,
pub response_tx: oneshot::Sender<RpcResult<()>>,
}
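
For readers who don't have the new rpc-exex crate open (its source is not part of this section of the diff), the sketch below illustrates the request/response contract that ValidationData encodes: a consumer drains the mpsc channel and answers each request over the embedded oneshot sender. This is a hypothetical illustration only, not the PR's RpcExEx implementation; the placeholder verdict and the name run_validation_loop are assumptions.

// Hypothetical consumer sketch; the real consumer is RpcExEx in crates/rpc-exex,
// which is not shown in this diff.
use jsonrpsee::core::RpcResult;
use tokio::sync::mpsc;

use crate::ValidationData;

pub async fn run_validation_loop(mut rx: mpsc::UnboundedReceiver<ValidationData>) {
    while let Some(request) = rx.recv().await {
        // Placeholder verdict: a real implementation would validate request.tx and
        // request.data against chain state (balance, nonce, L1 block info, ...).
        let verdict: RpcResult<()> = Ok(());

        // The RPC side may have timed out and dropped its oneshot receiver;
        // ignore that failure rather than unwrapping.
        let _ = request.response_tx.send(verdict);
    }
}
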
5 changes: 5 additions & 0 deletions crates/ingress-rpc/Cargo.toml
@@ -35,3 +35,8 @@ op-revm.workspace = true
revm-context-interface.workspace = true
alloy-signer-local.workspace = true
reth-optimism-evm.workspace = true
tips-rpc-exex.workspace = true
reth-optimism-cli.workspace = true
reth-optimism-node.workspace = true
op-alloy-rpc-types.workspace = true
tips-common.workspace = true
97 changes: 68 additions & 29 deletions crates/ingress-rpc/src/main.rs
@@ -4,15 +4,19 @@ use jsonrpsee::server::Server;
use op_alloy_network::Optimism;
use rdkafka::ClientConfig;
use rdkafka::producer::FutureProducer;
use reth_optimism_cli::{Cli, chainspec::OpChainSpecParser};
use reth_optimism_node::OpNode;
use std::fs;
use std::net::IpAddr;
use tips_common::ValidationData;
use tips_rpc_exex::RpcExEx;
use tokio::sync::mpsc;
use tracing::{info, warn};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use url::Url;

mod queue;
mod service;
mod validation;
use queue::KafkaQueuePublisher;
use service::{IngressApiServer, IngressService};

@@ -50,6 +54,10 @@ struct Config {
#[arg(long, env = "TIPS_INGRESS_LOG_LEVEL", default_value = "info")]
log_level: String,

/// Chain to connect to
#[arg(long, env = "TIPS_INGRESS_CHAIN", default_value = "base-sepolia")]
chain: String,

/// Default lifetime for sent transactions in seconds (default: 3 hours)
#[arg(
long,
@@ -59,8 +67,7 @@ struct Config {
send_transaction_default_lifetime_seconds: u64,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
fn main() -> anyhow::Result<()> {
dotenvy::dotenv().ok();

let config = Config::parse();
@@ -94,35 +101,67 @@ async fn main() -> anyhow::Result<()> {
mempool_url = %config.mempool_url
);

let provider: RootProvider<Optimism> = ProviderBuilder::new()
.disable_recommended_fillers()
.network::<Optimism>()
.connect_http(config.mempool_url);

let client_config = load_kafka_config_from_file(&config.ingress_kafka_properties)?;

let queue_producer: FutureProducer = client_config.create()?;

let queue = KafkaQueuePublisher::new(queue_producer, config.ingress_topic);

let service = IngressService::new(
provider,
config.dual_write_mempool,
queue,
config.send_transaction_default_lifetime_seconds,
);
let bind_addr = format!("{}:{}", config.address, config.port);
let args = vec![
env!("CARGO_BIN_NAME"),
"node",
"--chain",
&config.chain,
"-v",
];
Cli::<OpChainSpecParser, ()>::try_parse_from(args)?
.run(|builder, _| async move {
let provider: RootProvider<Optimism> = ProviderBuilder::new()
.disable_recommended_fillers()
.network::<Optimism>()
.connect_http(config.mempool_url.clone());

let client_config = load_kafka_config_from_file(&config.ingress_kafka_properties)
.expect("Failed to load kafka config");

let queue_producer: FutureProducer = client_config.create()?;

let queue = KafkaQueuePublisher::new(queue_producer, config.ingress_topic.clone());

// Create an mpsc channel between the service and the ExEx to forward transactions for validation
let (tx_sender, tx_receiver) = mpsc::unbounded_channel::<ValidationData>();

let service = IngressService::new(
provider,
config.dual_write_mempool,
queue,
config.send_transaction_default_lifetime_seconds,
tx_sender,
);
let bind_addr = format!("{}:{}", config.address, config.port);

let server = Server::builder().build(&bind_addr).await?;
let addr = server.local_addr()?;
let handle = server.start(service.into_rpc());
let server = Server::builder().build(&bind_addr).await?;
let addr = server.local_addr()?;
let server_handle = server.start(service.into_rpc());

info!(
message = "Ingress RPC server started",
address = %addr
);
info!(
message = "Ingress RPC server started",
address = %addr
);
let exex_handle = builder
.node(OpNode::default())
.install_exex("tips-rpc-exex", move |ctx| async move {
Ok(RpcExEx::new(ctx, tx_receiver).run())
})
.launch()
.await?;

tokio::select! {
_ = server_handle.stopped() => {
info!("Ingress RPC server stopped");
}
_ = exex_handle.wait_for_node_exit() => {
info!("RPC ExEx stopped");
}
}
Ok(())
})
.map_err(|e| anyhow::anyhow!(e))?;

handle.stopped().await;
Ok(())
}

44 changes: 37 additions & 7 deletions crates/ingress-rpc/src/service.rs
@@ -1,4 +1,3 @@
use crate::validation::{AccountInfoLookup, L1BlockInfoLookup, validate_tx};
use alloy_consensus::transaction::SignerRecoverable;
use alloy_primitives::{B256, Bytes};
use alloy_provider::{Provider, RootProvider, network::eip2718::Decodable2718};
@@ -11,6 +10,8 @@ use op_alloy_consensus::OpTxEnvelope;
use op_alloy_network::Optimism;
use reth_rpc_eth_types::EthApiError;
use std::time::{SystemTime, UNIX_EPOCH};
use tips_common::ValidationData;
use tokio::sync::{mpsc, oneshot};
use tracing::{info, warn};

use crate::queue::QueuePublisher;
@@ -35,6 +36,7 @@ pub struct IngressService<Queue> {
dual_write_mempool: bool,
queue: Queue,
send_transaction_default_lifetime_seconds: u64,
tx_sender: mpsc::UnboundedSender<ValidationData>,
}

impl<Queue> IngressService<Queue> {
@@ -43,12 +45,14 @@ impl<Queue> IngressService<Queue> {
dual_write_mempool: bool,
queue: Queue,
send_transaction_default_lifetime_seconds: u64,
tx_sender: mpsc::UnboundedSender<ValidationData>,
) -> Self {
Self {
provider,
dual_write_mempool,
queue,
send_transaction_default_lifetime_seconds,
tx_sender,
}
}
}
@@ -87,12 +91,38 @@ where
.try_into_recovered()
.map_err(|_| EthApiError::FailedToDecodeSignedTransaction.into_rpc_err())?;

let mut l1_block_info = self.provider.fetch_l1_block_info().await?;
let account = self
.provider
.fetch_account_info(transaction.signer())
.await?;
validate_tx(account, &transaction, &data, &mut l1_block_info).await?;
// Send transaction data to ExEx for validation
// TODO: in the endgame version, this would push to a Redis cluster which the "Store" ExEx
// would read from. Instead, we are implementing v1 of this by having the ingress-rpc
// forward to the rpc-exex.
let (response_tx, response_rx) = oneshot::channel();
let validation_data = ValidationData {
address: transaction.signer(),
tx: transaction.clone(),
data: data.clone(),
response_tx,
};

if let Err(e) = self.tx_sender.send(validation_data) {
warn!(message = "Failed to send transaction to ExEx", error = %e);
// TODO: should we return an error here?
}

// Wait for validation result from ExEx
match response_rx.await {
Ok(Ok(())) => {
// Validation successful, continue processing
}
Ok(Err(validation_error)) => {
// Validation failed, return the error
return Err(validation_error);
}
Err(_) => {
// Channel was dropped, ExEx is not responding
warn!(message = "ExEx validation channel dropped");
// TODO: should we return an error here?
}
}

let expiry_timestamp = SystemTime::now()
.duration_since(UNIX_EPOCH)