1 change: 1 addition & 0 deletions .env.example
@@ -7,6 +7,7 @@ TIPS_INGRESS_DUAL_WRITE_MEMPOOL=false
TIPS_INGRESS_KAFKA_BROKERS=localhost:9092
TIPS_INGRESS_KAFKA_TOPIC=tips-audit
TIPS_INGRESS_LOG_LEVEL=info
TIPS_INGRESS_KAFKA_QUEUE_TOPIC=tips-ingress

# Audit service configuration
TIPS_AUDIT_KAFKA_BROKERS=localhost:9092
27 changes: 27 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

3 changes: 3 additions & 0 deletions Cargo.toml
@@ -58,3 +58,6 @@ aws-config = "1.1.7"
aws-sdk-s3 = "1.106.0"
aws-credential-types = "1.1.7"
bytes = { version = "1.8.0", features = ["serde"] }

# tips-ingress
backon = "1.5.2"
6 changes: 5 additions & 1 deletion crates/ingress/Cargo.toml
@@ -26,4 +26,8 @@ op-alloy-consensus.workspace = true
eyre.workspace = true
dotenvy.workspace = true
rdkafka.workspace = true
reth-rpc-eth-types.workspace = true
serde.workspace = true
serde_json.workspace = true
async-trait.workspace = true
backon.workspace = true
26 changes: 24 additions & 2 deletions crates/ingress/src/main.rs
@@ -10,7 +10,9 @@ use tracing::{info, warn};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use url::Url;

mod queue;
mod service;
use queue::KafkaQueuePublisher;
use service::{IngressApiServer, IngressService};
use tips_datastore::PostgresDatastore;

@@ -49,6 +51,14 @@ struct Config {
)]
kafka_topic: String,

/// Kafka topic for queuing transactions before the DB Writer
#[arg(
long,
env = "TIPS_INGRESS_KAFKA_QUEUE_TOPIC",
default_value = "tips-ingress"
)]
queue_topic: String,

#[arg(long, env = "TIPS_INGRESS_LOG_LEVEL", default_value = "info")]
log_level: String,
}
Expand Down Expand Up @@ -101,9 +111,21 @@ async fn main() -> anyhow::Result<()> {
.set("message.timeout.ms", "5000")
.create()?;

let queue_producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", &config.kafka_brokers)
.set("message.timeout.ms", "5000")
.create()?;

let publisher = KafkaMempoolEventPublisher::new(kafka_producer, config.kafka_topic);
let queue = KafkaQueuePublisher::new(queue_producer, config.queue_topic);

let service = IngressService::new(
provider,
bundle_store,
config.dual_write_mempool,
publisher,
queue,
);
let bind_addr = format!("{}:{}", config.address, config.port);

let server = Server::builder().build(&bind_addr).await?;
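A side note on the second producer created above: rdkafka's FutureProducer is internally reference-counted and cheap to clone, so a single client could likely serve both the audit and queue topics. A minimal sketch of that alternative, assuming the same settings as above:

let shared_producer: FutureProducer = ClientConfig::new()
    .set("bootstrap.servers", &config.kafka_brokers)
    .set("message.timeout.ms", "5000")
    .create()?;

// Cloning shares the underlying librdkafka client instead of opening a
// second broker connection; each publisher still writes to its own topic.
let queue_producer = shared_producer.clone();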
113 changes: 113 additions & 0 deletions crates/ingress/src/queue.rs
@@ -0,0 +1,113 @@
use alloy_primitives::Address;
use alloy_rpc_types_mev::EthSendBundle;
use anyhow::{Error, Result};
use async_trait::async_trait;
use backon::{ExponentialBuilder, Retryable};
use rdkafka::producer::{FutureProducer, FutureRecord};
use tokio::time::Duration;
use tracing::{error, info};

/// A queue for buffering incoming bundles before downstream processing
#[async_trait]
pub trait QueuePublisher: Send + Sync {
async fn publish(&self, bundle: &EthSendBundle, sender: Address) -> Result<()>;
}

/// Kafka-backed implementation of [`QueuePublisher`]
pub struct KafkaQueuePublisher {
producer: FutureProducer,
topic: String,
}

impl KafkaQueuePublisher {
pub fn new(producer: FutureProducer, topic: String) -> Self {
Self { producer, topic }
}

pub async fn enqueue_bundle(
&self,
bundle: &EthSendBundle,
sender: Address,
) -> Result<(), Error> {
Comment on lines +27 to +31 (Collaborator): Should we add some retry logic with a backoff to this function? Maybe we can pull in this crate: https://crates.io/crates/backon
let key = sender.to_string();
let payload = serde_json::to_vec(bundle)?;

let enqueue = || async {
let record = FutureRecord::to(&self.topic).key(&key).payload(&payload);

match self.producer.send(record, Duration::from_secs(5)).await {
Ok((partition, offset)) => {
info!(
sender = %sender,
partition = partition,
offset = offset,
topic = %self.topic,
"Successfully enqueued bundle"
);
Ok(())
}
Err((err, _)) => {
error!(
sender = %sender,
error = %err,
topic = %self.topic,
"Failed to enqueue bundle"
);
Err(anyhow::anyhow!("Failed to enqueue bundle: {}", err))
}
}
};

enqueue
.retry(
&ExponentialBuilder::default()
.with_min_delay(Duration::from_millis(100))
.with_max_delay(Duration::from_secs(5))
.with_max_times(3),
Comment on lines +64 to +66 (Contributor Author): we could tune these params later?
)
.notify(|err: &anyhow::Error, dur: Duration| {
info!("retrying to enqueue bundle {:?} after {:?}", err, dur);
})
.await
}
}
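On the tuning question above: one option would be to lift the retry knobs into the existing clap Config in main.rs and pass them through to KafkaQueuePublisher. A hypothetical sketch; the env var names, field names, and defaults are illustrative, not part of this PR:

/// Maximum number of retries when enqueuing a bundle (illustrative)
#[arg(long, env = "TIPS_INGRESS_QUEUE_RETRY_MAX_TIMES", default_value = "3")]
queue_retry_max_times: usize,

/// Minimum backoff delay between retries, in milliseconds (illustrative)
#[arg(long, env = "TIPS_INGRESS_QUEUE_RETRY_MIN_DELAY_MS", default_value = "100")]
queue_retry_min_delay_ms: u64,

The publisher could then build its ExponentialBuilder from these values instead of hard-coding them.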

#[async_trait]
impl QueuePublisher for KafkaQueuePublisher {
async fn publish(&self, bundle: &EthSendBundle, sender: Address) -> Result<()> {
self.enqueue_bundle(bundle, sender).await
}
}

#[cfg(test)]
mod tests {
use super::*;
use rdkafka::config::ClientConfig;
use tokio::time::{Duration, Instant};

fn create_test_bundle() -> EthSendBundle {
EthSendBundle::default()
}

#[tokio::test]
async fn test_backoff_retry_logic() {
// use an invalid broker address to trigger the backoff logic
let producer = ClientConfig::new()
.set("bootstrap.servers", "localhost:9999")
.set("message.timeout.ms", "100")
.create()
.expect("Producer creation failed");

let publisher = KafkaQueuePublisher::new(producer, "tips-ingress".to_string());
let bundle = create_test_bundle();
let sender = Address::ZERO;

let start = Instant::now();
let result = publisher.enqueue_bundle(&bundle, sender).await;
let elapsed = start.elapsed();

// the backoff waits at least 100ms before the first retry, so an elapsed
// time past 100ms confirms at least one retry was attempted
assert!(result.is_err());
assert!(elapsed >= Duration::from_millis(100));
}
}
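Because QueuePublisher is a trait, the service can be unit-tested without a live broker. A minimal test double, assuming the imports already at the top of this file (the NoopQueue name is hypothetical):

struct NoopQueue;

#[async_trait]
impl QueuePublisher for NoopQueue {
    // Accepts every bundle and records nothing, so IngressService tests
    // can exercise the publish path without Kafka.
    async fn publish(&self, _bundle: &EthSendBundle, _sender: Address) -> Result<()> {
        Ok(())
    }
}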
19 changes: 16 additions & 3 deletions crates/ingress/src/service.rs
@@ -15,6 +15,8 @@ use tips_audit::{MempoolEvent, MempoolEventPublisher};
use tips_datastore::BundleDatastore;
use tracing::{info, warn};

use crate::queue::QueuePublisher;

#[rpc(server, namespace = "eth")]
pub trait IngressApi {
/// `eth_sendBundle` can be used to send your bundles to the builder.
@@ -30,34 +32,38 @@ pub trait IngressApi {
async fn send_raw_transaction(&self, tx: Bytes) -> RpcResult<B256>;
}

pub struct IngressService<Store, Publisher> {
pub struct IngressService<Store, Publisher, Queue> {
provider: RootProvider<Optimism>,
datastore: Store,
dual_write_mempool: bool,
publisher: Publisher,
queue: Queue,
}

impl<Store, Publisher> IngressService<Store, Publisher> {
impl<Store, Publisher, Queue> IngressService<Store, Publisher, Queue> {
pub fn new(
provider: RootProvider<Optimism>,
datastore: Store,
dual_write_mempool: bool,
publisher: Publisher,
queue: Queue,
) -> Self {
Self {
provider,
datastore,
dual_write_mempool,
publisher,
queue,
}
}
}

#[async_trait]
impl<Store, Publisher> IngressApiServer for IngressService<Store, Publisher>
impl<Store, Publisher, Queue> IngressApiServer for IngressService<Store, Publisher, Queue>
where
Store: BundleDatastore + Sync + Send + 'static,
Publisher: MempoolEventPublisher + Sync + Send + 'static,
Queue: QueuePublisher + Sync + Send + 'static,
{
async fn send_bundle(&self, _bundle: EthSendBundle) -> RpcResult<EthBundleHash> {
warn!(
@@ -99,6 +105,13 @@ where
..Default::default()
};

// queue the bundle
let sender = transaction.signer();
if let Err(e) = self.queue.publish(&bundle, sender).await {
warn!(message = "Failed to publish Queue::enqueue_bundle", sender = %sender, error = %e);
}

// TODO: have DB Writer consume from the queue and move the insert_bundle logic there
let result = self
.datastore
.insert_bundle(bundle.clone())
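For the TODO above, the consuming side could look roughly like the sketch below: a DB Writer that reads the queue topic, decodes the serde_json payload written by KafkaQueuePublisher, and performs the insert currently done inline. The group id, error handling, and function shape are illustrative, not part of this PR:

use alloy_rpc_types_mev::EthSendBundle;
use rdkafka::config::ClientConfig;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::Message;
use tips_datastore::BundleDatastore;

async fn run_db_writer(
    brokers: &str,
    topic: &str,
    store: impl BundleDatastore,
) -> anyhow::Result<()> {
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("group.id", "tips-db-writer")
        .create()?;
    consumer.subscribe(&[topic])?;

    loop {
        let msg = consumer.recv().await?;
        if let Some(payload) = msg.payload() {
            // Mirror of the producer side: bundles are JSON-encoded.
            let bundle: EthSendBundle = serde_json::from_slice(payload)?;
            if let Err(e) = store.insert_bundle(bundle).await {
                tracing::warn!(error = ?e, "insert_bundle failed");
            }
        }
    }
}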
1 change: 1 addition & 0 deletions docker-compose.yml
@@ -55,6 +55,7 @@ services:
command: |
sh -c "
kafka-topics --create --if-not-exists --topic tips-audit --bootstrap-server kafka:29092 --partitions 3 --replication-factor 1
kafka-topics --create --if-not-exists --topic tips-ingress --bootstrap-server kafka:29092 --partitions 3 --replication-factor 1
Comment (Contributor Author): should we add a retention policy (time or size based)?

kafka-topics --list --bootstrap-server kafka:29092
"
