Skip to content

Commit 288f53c

Browse files
committed
spike backoff logic
1 parent d1dd6a6 commit 288f53c

File tree

4 files changed

+66
-26
lines changed

4 files changed

+66
-26
lines changed

Cargo.lock

Lines changed: 24 additions & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,3 +58,6 @@ aws-config = "1.1.7"
5858
aws-sdk-s3 = "1.106.0"
5959
aws-credential-types = "1.1.7"
6060
bytes = { version = "1.8.0", features = ["serde"] }
61+
62+
# tips-ingress
63+
backon = "1.5.2"

crates/ingress/Cargo.toml

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,4 +29,5 @@ rdkafka.workspace = true
2929
reth-rpc-eth-types.workspace = true
3030
serde.workspace = true
3131
serde_json.workspace = true
32-
async-trait.workspace = true
32+
async-trait.workspace = true
33+
backon.workspace = true

crates/ingress/src/queue.rs

Lines changed: 37 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,9 @@ use alloy_primitives::Address;
22
use alloy_rpc_types_mev::EthSendBundle;
33
use anyhow::{Error, Result};
44
use async_trait::async_trait;
5+
use backon::{ExponentialBuilder, Retryable};
56
use rdkafka::producer::{FutureProducer, FutureRecord};
7+
use tokio::time::Duration;
68
use tracing::{error, info};
79

810
/// A queue to buffer transactions
@@ -30,33 +32,43 @@ impl KafkaQueuePublisher {
3032
let key = sender.to_string();
3133
let payload = serde_json::to_vec(bundle)?;
3234

33-
let record = FutureRecord::to(&self.topic).key(&key).payload(&payload);
35+
let enqueue = || async {
36+
let record = FutureRecord::to(&self.topic).key(&key).payload(&payload);
3437

35-
match self
36-
.producer
37-
.send(record, tokio::time::Duration::from_secs(5))
38-
.await
39-
{
40-
Ok((partition, offset)) => {
41-
info!(
42-
sender = %sender,
43-
partition = partition,
44-
offset = offset,
45-
topic = %self.topic,
46-
"Successfully enqueued bundle"
47-
);
48-
Ok(())
49-
}
50-
Err((err, _)) => {
51-
error!(
52-
sender = %sender,
53-
error = %err,
54-
topic = %self.topic,
55-
"Failed to enqueue bundle"
56-
);
57-
Err(anyhow::anyhow!("Failed to enqueue bundle: {}", err))
38+
match self.producer.send(record, Duration::from_secs(5)).await {
39+
Ok((partition, offset)) => {
40+
info!(
41+
sender = %sender,
42+
partition = partition,
43+
offset = offset,
44+
topic = %self.topic,
45+
"Successfully enqueued bundle"
46+
);
47+
Ok(())
48+
}
49+
Err((err, _)) => {
50+
error!(
51+
sender = %sender,
52+
error = %err,
53+
topic = %self.topic,
54+
"Failed to enqueue bundle"
55+
);
56+
Err(anyhow::anyhow!("Failed to enqueue bundle: {}", err))
57+
}
5858
}
59-
}
59+
};
60+
61+
enqueue
62+
.retry(
63+
&ExponentialBuilder::default()
64+
.with_min_delay(Duration::from_millis(100))
65+
.with_max_delay(Duration::from_secs(5))
66+
.with_max_times(3),
67+
)
68+
.notify(|err: &anyhow::Error, dur: Duration| {
69+
info!("retrying to enqueue bundle {:?} after {:?}", err, dur);
70+
})
71+
.await
6072
}
6173
}
6274

0 commit comments

Comments
 (0)