Skip to content

Commit 884e5c6

Browse files
committed
combine publish/enqueue fns
1 parent 0771f7b commit 884e5c6

File tree

1 file changed

+6
-14
lines changed

1 file changed

+6
-14
lines changed

crates/ingress-rpc/src/queue.rs

Lines changed: 6 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
use alloy_primitives::B256;
22
use alloy_rpc_types_mev::EthSendBundle;
3-
use anyhow::{Error, Result};
3+
use anyhow::Result;
44
use async_trait::async_trait;
55
use backon::{ExponentialBuilder, Retryable};
66
use rdkafka::producer::{FutureProducer, FutureRecord};
@@ -23,12 +23,11 @@ impl KafkaQueuePublisher {
2323
pub fn new(producer: FutureProducer, topic: String) -> Self {
2424
Self { producer, topic }
2525
}
26+
}
2627

27-
pub async fn enqueue_bundle(
28-
&self,
29-
bundle: &EthSendBundle,
30-
bundle_hash: &B256,
31-
) -> Result<(), Error> {
28+
#[async_trait]
29+
impl QueuePublisher for KafkaQueuePublisher {
30+
async fn publish(&self, bundle: &EthSendBundle, bundle_hash: &B256) -> Result<()> {
3231
let key = bundle_hash.to_string();
3332
let payload = serde_json::to_vec(bundle)?;
3433

@@ -72,13 +71,6 @@ impl KafkaQueuePublisher {
7271
}
7372
}
7473

75-
#[async_trait]
76-
impl QueuePublisher for KafkaQueuePublisher {
77-
async fn publish(&self, bundle: &EthSendBundle, bundle_hash: &B256) -> Result<()> {
78-
self.enqueue_bundle(bundle, bundle_hash).await
79-
}
80-
}
81-
8274
#[cfg(test)]
8375
mod tests {
8476
use super::*;
@@ -103,7 +95,7 @@ mod tests {
10395
let bundle_hash = bundle.bundle_hash();
10496

10597
let start = Instant::now();
106-
let result = publisher.enqueue_bundle(&bundle, &bundle_hash).await;
98+
let result = publisher.publish(&bundle, &bundle_hash).await;
10799
let elapsed = start.elapsed();
108100

109101
// the backoff tries at minimum 100ms, so verify we tried at least once

0 commit comments

Comments
 (0)