Skip to content

Commit 43311b3

Browse files
committed
use bundle hash as key
1 parent cd3d9cf commit 43311b3

File tree

2 files changed

+17
-23
lines changed

2 files changed

+17
-23
lines changed

crates/ingress-rpc/src/queue.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,4 @@
1-
use alloy_primitives::Address;
1+
use alloy_primitives::B256;
22
use alloy_rpc_types_mev::EthSendBundle;
33
use anyhow::{Error, Result};
44
use async_trait::async_trait;
@@ -10,7 +10,7 @@ use tracing::{error, info};
1010
/// A queue to buffer transactions
1111
#[async_trait]
1212
pub trait QueuePublisher: Send + Sync {
13-
async fn publish(&self, bundle: &EthSendBundle, sender: Address) -> Result<()>;
13+
async fn publish(&self, bundle: &EthSendBundle, bundle_hash: &B256) -> Result<()>;
1414
}
1515

1616
/// A queue to buffer transactions
@@ -27,9 +27,9 @@ impl KafkaQueuePublisher {
2727
pub async fn enqueue_bundle(
2828
&self,
2929
bundle: &EthSendBundle,
30-
sender: Address,
30+
bundle_hash: &B256,
3131
) -> Result<(), Error> {
32-
let key = sender.to_string();
32+
let key = bundle_hash.to_string();
3333
let payload = serde_json::to_vec(bundle)?;
3434

3535
let enqueue = || async {
@@ -38,7 +38,7 @@ impl KafkaQueuePublisher {
3838
match self.producer.send(record, Duration::from_secs(5)).await {
3939
Ok((partition, offset)) => {
4040
info!(
41-
sender = %sender,
41+
bundle_hash = %bundle_hash,
4242
partition = partition,
4343
offset = offset,
4444
topic = %self.topic,
@@ -48,7 +48,7 @@ impl KafkaQueuePublisher {
4848
}
4949
Err((err, _)) => {
5050
error!(
51-
sender = %sender,
51+
bundle_hash = %bundle_hash,
5252
error = %err,
5353
topic = %self.topic,
5454
"Failed to enqueue bundle"
@@ -74,8 +74,8 @@ impl KafkaQueuePublisher {
7474

7575
#[async_trait]
7676
impl QueuePublisher for KafkaQueuePublisher {
77-
async fn publish(&self, bundle: &EthSendBundle, sender: Address) -> Result<()> {
78-
self.enqueue_bundle(bundle, sender).await
77+
async fn publish(&self, bundle: &EthSendBundle, bundle_hash: &B256) -> Result<()> {
78+
self.enqueue_bundle(bundle, bundle_hash).await
7979
}
8080
}
8181

@@ -100,10 +100,10 @@ mod tests {
100100

101101
let publisher = KafkaQueuePublisher::new(producer, "tips-ingress-rpc".to_string());
102102
let bundle = create_test_bundle();
103-
let sender = Address::ZERO;
103+
let bundle_hash = bundle.bundle_hash();
104104

105105
let start = Instant::now();
106-
let result = publisher.enqueue_bundle(&bundle, sender).await;
106+
let result = publisher.enqueue_bundle(&bundle, &bundle_hash).await;
107107
let elapsed = start.elapsed();
108108

109109
// the backoff tries at minimum 100ms, so verify we tried at least once

crates/ingress-rpc/src/service.rs

Lines changed: 7 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,6 @@ where
8686
}
8787

8888
// Decode and validate all transactions
89-
let mut decoded_txs = Vec::new();
9089
let mut total_gas = 0u64;
9190
for tx_data in &bundle.txs {
9291
if tx_data.is_empty() {
@@ -103,8 +102,6 @@ where
103102

104103
// Add gas limit to total
105104
total_gas = total_gas.saturating_add(transaction.gas_limit());
106-
107-
decoded_txs.push(transaction);
108105
}
109106

110107
// Check max gas limit for the entire bundle
@@ -115,21 +112,18 @@ where
115112
.into_rpc_err());
116113
}
117114

118-
// For now, we'll use the first transaction's signer as the bundle sender
119-
// In a real implementation, you might want different logic here
120-
let sender = decoded_txs[0].signer();
121-
122115
// Queue the bundle
123-
if let Err(e) = self.queue.publish(&bundle, sender).await {
124-
warn!(message = "Failed to publish bundle to queue", sender = %sender, error = %e);
116+
let bundle_hash = bundle.bundle_hash();
117+
if let Err(e) = self.queue.publish(&bundle, &bundle_hash).await {
118+
warn!(message = "Failed to publish bundle to queue", bundle_hash = %bundle_hash, error = %e);
125119
return Err(EthApiError::InvalidParams("Failed to queue bundle".into()).into_rpc_err());
126120
}
127121

128122
info!(
129123
message = "queued bundle",
124+
bundle_hash = %bundle_hash,
130125
tx_count = bundle.txs.len(),
131126
total_gas = total_gas,
132-
sender = %sender
133127
);
134128

135129
Ok(EthBundleHash {
@@ -181,9 +175,9 @@ where
181175
};
182176

183177
// queue the bundle
184-
let sender = transaction.signer();
185-
if let Err(e) = self.queue.publish(&bundle, sender).await {
186-
warn!(message = "Failed to publish Queue::enqueue_bundle", sender = %sender, error = %e);
178+
let bundle_hash = bundle.bundle_hash();
179+
if let Err(e) = self.queue.publish(&bundle, &bundle_hash).await {
180+
warn!(message = "Failed to publish Queue::enqueue_bundle", bundle_hash = %bundle_hash, error = %e);
187181
}
188182

189183
info!(message="queued singleton bundle", txn_hash=%transaction.tx_hash());

0 commit comments

Comments
 (0)