Skip to content

Commit f21efc0

Browse files
authored
[ingress] introduce buffer queue (#8)
* spike queue * queue before datastore insert_bundle * remove unused dep * feedback * spike backoff logic * spike test
1 parent 015a5d9 commit f21efc0

File tree

8 files changed

+190
-6
lines changed

8 files changed

+190
-6
lines changed

.env.example

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ TIPS_INGRESS_DUAL_WRITE_MEMPOOL=false
77
TIPS_INGRESS_KAFKA_BROKERS=localhost:9092
88
TIPS_INGRESS_KAFKA_TOPIC=tips-audit
99
TIPS_INGRESS_LOG_LEVEL=info
10+
TIPS_INGRESS_KAFKA_QUEUE_TOPIC=tips-ingress
1011

1112
# Audit service configuration
1213
TIPS_AUDIT_KAFKA_BROKERS=localhost:9092

Cargo.lock

Lines changed: 27 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,6 @@ aws-config = "1.1.7"
5858
aws-sdk-s3 = "1.106.0"
5959
aws-credential-types = "1.1.7"
6060
bytes = { version = "1.8.0", features = ["serde"] }
61+
62+
# tips-ingress
63+
backon = "1.5.2"

crates/ingress/Cargo.toml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,4 +26,8 @@ op-alloy-consensus.workspace = true
2626
eyre.workspace = true
2727
dotenvy.workspace = true
2828
rdkafka.workspace = true
29-
reth-rpc-eth-types.workspace = true
29+
reth-rpc-eth-types.workspace = true
30+
serde.workspace = true
31+
serde_json.workspace = true
32+
async-trait.workspace = true
33+
backon.workspace = true

crates/ingress/src/main.rs

Lines changed: 24 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,9 @@ use tracing::{info, warn};
1010
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
1111
use url::Url;
1212

13+
mod queue;
1314
mod service;
15+
use queue::KafkaQueuePublisher;
1416
use service::{IngressApiServer, IngressService};
1517
use tips_datastore::PostgresDatastore;
1618

@@ -49,6 +51,14 @@ struct Config {
4951
)]
5052
kafka_topic: String,
5153

54+
/// Kafka topic for queuing transactions before the DB Writer
55+
#[arg(
56+
long,
57+
env = "TIPS_INGRESS_KAFKA_QUEUE_TOPIC",
58+
default_value = "tips-ingress"
59+
)]
60+
queue_topic: String,
61+
5262
#[arg(long, env = "TIPS_INGRESS_LOG_LEVEL", default_value = "info")]
5363
log_level: String,
5464
}
@@ -101,9 +111,21 @@ async fn main() -> anyhow::Result<()> {
101111
.set("message.timeout.ms", "5000")
102112
.create()?;
103113

104-
let publisher = KafkaMempoolEventPublisher::new(kafka_producer, config.kafka_topic);
114+
let queue_producer: FutureProducer = ClientConfig::new()
115+
.set("bootstrap.servers", &config.kafka_brokers)
116+
.set("message.timeout.ms", "5000")
117+
.create()?;
105118

106-
let service = IngressService::new(provider, bundle_store, config.dual_write_mempool, publisher);
119+
let publisher = KafkaMempoolEventPublisher::new(kafka_producer, config.kafka_topic);
120+
let queue = KafkaQueuePublisher::new(queue_producer, config.queue_topic);
121+
122+
let service = IngressService::new(
123+
provider,
124+
bundle_store,
125+
config.dual_write_mempool,
126+
publisher,
127+
queue,
128+
);
107129
let bind_addr = format!("{}:{}", config.address, config.port);
108130

109131
let server = Server::builder().build(&bind_addr).await?;

crates/ingress/src/queue.rs

Lines changed: 113 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,113 @@
1+
use alloy_primitives::Address;
2+
use alloy_rpc_types_mev::EthSendBundle;
3+
use anyhow::{Error, Result};
4+
use async_trait::async_trait;
5+
use backon::{ExponentialBuilder, Retryable};
6+
use rdkafka::producer::{FutureProducer, FutureRecord};
7+
use tokio::time::Duration;
8+
use tracing::{error, info};
9+
10+
/// A queue that buffers incoming bundles before they are written to the
/// datastore, decoupling request ingestion from the DB write path.
#[async_trait]
pub trait QueuePublisher: Send + Sync {
    /// Publishes `bundle` to the queue, keyed by the `sender` address.
    ///
    /// Implementations choose their own delivery/retry semantics; an `Err`
    /// means the bundle could not be enqueued after all attempts.
    async fn publish(&self, bundle: &EthSendBundle, sender: Address) -> Result<()>;
}
15+
16+
/// Kafka-backed [`QueuePublisher`] that publishes JSON-serialized bundles
/// to a single topic.
pub struct KafkaQueuePublisher {
    /// Async Kafka producer used for all sends.
    producer: FutureProducer,
    /// Destination Kafka topic (e.g. `tips-ingress`).
    topic: String,
}
21+
22+
impl KafkaQueuePublisher {
23+
pub fn new(producer: FutureProducer, topic: String) -> Self {
24+
Self { producer, topic }
25+
}
26+
27+
pub async fn enqueue_bundle(
28+
&self,
29+
bundle: &EthSendBundle,
30+
sender: Address,
31+
) -> Result<(), Error> {
32+
let key = sender.to_string();
33+
let payload = serde_json::to_vec(bundle)?;
34+
35+
let enqueue = || async {
36+
let record = FutureRecord::to(&self.topic).key(&key).payload(&payload);
37+
38+
match self.producer.send(record, Duration::from_secs(5)).await {
39+
Ok((partition, offset)) => {
40+
info!(
41+
sender = %sender,
42+
partition = partition,
43+
offset = offset,
44+
topic = %self.topic,
45+
"Successfully enqueued bundle"
46+
);
47+
Ok(())
48+
}
49+
Err((err, _)) => {
50+
error!(
51+
sender = %sender,
52+
error = %err,
53+
topic = %self.topic,
54+
"Failed to enqueue bundle"
55+
);
56+
Err(anyhow::anyhow!("Failed to enqueue bundle: {}", err))
57+
}
58+
}
59+
};
60+
61+
enqueue
62+
.retry(
63+
&ExponentialBuilder::default()
64+
.with_min_delay(Duration::from_millis(100))
65+
.with_max_delay(Duration::from_secs(5))
66+
.with_max_times(3),
67+
)
68+
.notify(|err: &anyhow::Error, dur: Duration| {
69+
info!("retrying to enqueue bundle {:?} after {:?}", err, dur);
70+
})
71+
.await
72+
}
73+
}
74+
75+
#[async_trait]
impl QueuePublisher for KafkaQueuePublisher {
    /// Delegates to [`KafkaQueuePublisher::enqueue_bundle`], inheriting its
    /// retry-with-backoff behavior.
    async fn publish(&self, bundle: &EthSendBundle, sender: Address) -> Result<()> {
        self.enqueue_bundle(bundle, sender).await
    }
}
81+
82+
#[cfg(test)]
mod tests {
    use super::*;
    use rdkafka::config::ClientConfig;
    use tokio::time::{Duration, Instant};

    /// Minimal bundle fixture; contents are irrelevant to the retry test.
    fn create_test_bundle() -> EthSendBundle {
        EthSendBundle::default()
    }

    /// Verifies that a failing send goes through the backoff path rather
    /// than returning immediately.
    #[tokio::test]
    async fn test_backoff_retry_logic() {
        // use an invalid broker address to trigger the backoff logic
        // (nothing listens on localhost:9999, so every send attempt fails
        // after the 100ms message timeout)
        let producer = ClientConfig::new()
            .set("bootstrap.servers", "localhost:9999")
            .set("message.timeout.ms", "100")
            .create()
            .expect("Producer creation failed");

        let publisher = KafkaQueuePublisher::new(producer, "tips-ingress".to_string());
        let bundle = create_test_bundle();
        let sender = Address::ZERO;

        let start = Instant::now();
        let result = publisher.enqueue_bundle(&bundle, sender).await;
        let elapsed = start.elapsed();

        // the backoff tries at minimum 100ms, so verify we tried at least once
        // NOTE(review): this only proves one backoff delay elapsed; with
        // with_max_times(3) and exponential delays the total wait should be
        // substantially larger (~700ms+) — consider tightening the bound to
        // actually assert multiple retries occurred. TODO confirm backon's
        // default backoff factor before doing so.
        assert!(result.is_err());
        assert!(elapsed >= Duration::from_millis(100));
    }
}

crates/ingress/src/service.rs

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,8 @@ use tips_audit::{MempoolEvent, MempoolEventPublisher};
1515
use tips_datastore::BundleDatastore;
1616
use tracing::{info, warn};
1717

18+
use crate::queue::QueuePublisher;
19+
1820
#[rpc(server, namespace = "eth")]
1921
pub trait IngressApi {
2022
/// `eth_sendBundle` can be used to send your bundles to the builder.
@@ -30,34 +32,38 @@ pub trait IngressApi {
3032
async fn send_raw_transaction(&self, tx: Bytes) -> RpcResult<B256>;
3133
}
3234

33-
pub struct IngressService<Store, Publisher> {
35+
pub struct IngressService<Store, Publisher, Queue> {
3436
provider: RootProvider<Optimism>,
3537
datastore: Store,
3638
dual_write_mempool: bool,
3739
publisher: Publisher,
40+
queue: Queue,
3841
}
3942

40-
impl<Store, Publisher> IngressService<Store, Publisher> {
43+
impl<Store, Publisher, Queue> IngressService<Store, Publisher, Queue> {
4144
pub fn new(
4245
provider: RootProvider<Optimism>,
4346
datastore: Store,
4447
dual_write_mempool: bool,
4548
publisher: Publisher,
49+
queue: Queue,
4650
) -> Self {
4751
Self {
4852
provider,
4953
datastore,
5054
dual_write_mempool,
5155
publisher,
56+
queue,
5257
}
5358
}
5459
}
5560

5661
#[async_trait]
57-
impl<Store, Publisher> IngressApiServer for IngressService<Store, Publisher>
62+
impl<Store, Publisher, Queue> IngressApiServer for IngressService<Store, Publisher, Queue>
5863
where
5964
Store: BundleDatastore + Sync + Send + 'static,
6065
Publisher: MempoolEventPublisher + Sync + Send + 'static,
66+
Queue: QueuePublisher + Sync + Send + 'static,
6167
{
6268
async fn send_bundle(&self, _bundle: EthSendBundle) -> RpcResult<EthBundleHash> {
6369
warn!(
@@ -99,6 +105,13 @@ where
99105
..Default::default()
100106
};
101107

108+
// queue the bundle
109+
let sender = transaction.signer();
110+
if let Err(e) = self.queue.publish(&bundle, sender).await {
111+
warn!(message = "Failed to publish Queue::enqueue_bundle", sender = %sender, error = %e);
112+
}
113+
114+
// TODO: have DB Writer consume from the queue and move the insert_bundle logic there
102115
let result = self
103116
.datastore
104117
.insert_bundle(bundle.clone())

docker-compose.yml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,7 @@ services:
5555
command: |
5656
sh -c "
5757
kafka-topics --create --if-not-exists --topic tips-audit --bootstrap-server kafka:29092 --partitions 3 --replication-factor 1
58+
kafka-topics --create --if-not-exists --topic tips-ingress --bootstrap-server kafka:29092 --partitions 3 --replication-factor 1
5859
kafka-topics --list --bootstrap-server kafka:29092
5960
"
6061

0 commit comments

Comments
 (0)