3 changes: 3 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default.

5 changes: 4 additions & 1 deletion crates/ingress/Cargo.toml
@@ -26,4 +26,7 @@ op-alloy-consensus.workspace = true
eyre.workspace = true
dotenvy.workspace = true
rdkafka.workspace = true
reth-rpc-eth-types.workspace = true
serde.workspace = true
serde_json.workspace = true
async-trait.workspace = true
26 changes: 24 additions & 2 deletions crates/ingress/src/main.rs
@@ -10,7 +10,9 @@ use tracing::{info, warn};
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
use url::Url;

mod queue;
mod service;
use queue::KafkaQueuePublisher;
use service::{IngressApiServer, IngressService};
use tips_datastore::PostgresDatastore;

@@ -49,6 +51,14 @@ struct Config {
    )]
    kafka_topic: String,

    /// Kafka topic for queuing transactions before the DB Writer
    #[arg(
        long,
        env = "TIPS_INGRESS_KAFKA_QUEUE_TOPIC",
        default_value = "transaction-queue"
    )]
    queue_topic: String,

    #[arg(long, env = "TIPS_INGRESS_LOG_LEVEL", default_value = "info")]
    log_level: String,
}
@@ -101,9 +111,21 @@ async fn main() -> anyhow::Result<()> {
.set("message.timeout.ms", "5000")
.create()?;

let publisher = KafkaMempoolEventPublisher::new(kafka_producer, config.kafka_topic);
let queue_producer: FutureProducer = ClientConfig::new()
.set("bootstrap.servers", &config.kafka_brokers)
.set("message.timeout.ms", "5000")
.create()?;

let service = IngressService::new(provider, bundle_store, config.dual_write_mempool, publisher);
let publisher = KafkaMempoolEventPublisher::new(kafka_producer, config.kafka_topic);
let queue = KafkaQueuePublisher::new(queue_producer, config.queue_topic);

let service = IngressService::new(
provider,
bundle_store,
config.dual_write_mempool,
publisher,
queue,
);
let bind_addr = format!("{}:{}", config.address, config.port);

let server = Server::builder().build(&bind_addr).await?;
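The two producers in main.rs are built with identical settings. A small helper would remove that duplication. A minimal sketch, assuming only the rdkafka API already used in this diff; `make_producer` is a hypothetical name, not part of the PR:

```rust
use rdkafka::producer::FutureProducer;
use rdkafka::ClientConfig;

/// Hypothetical helper: build a producer with the shared settings.
fn make_producer(brokers: &str) -> anyhow::Result<FutureProducer> {
    let producer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("message.timeout.ms", "5000")
        .create()?;
    Ok(producer)
}

// Usage in main():
// let kafka_producer: FutureProducer = make_producer(&config.kafka_brokers)?;
// let queue_producer: FutureProducer = make_producer(&config.kafka_brokers)?;
```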
68 changes: 68 additions & 0 deletions crates/ingress/src/queue.rs
@@ -0,0 +1,68 @@
use alloy_primitives::Address;
use alloy_rpc_types_mev::EthSendBundle;
use anyhow::{Error, Result};
use async_trait::async_trait;
use rdkafka::producer::{FutureProducer, FutureRecord};
use tracing::{error, info};

/// Publishes bundles to a queue, buffering transactions for downstream consumers.
#[async_trait]
pub trait QueuePublisher: Send + Sync {
    async fn publish(&self, bundle: &EthSendBundle, sender: Address) -> Result<()>;
}

/// A Kafka-backed [`QueuePublisher`] that buffers transactions.
pub struct KafkaQueuePublisher {
    producer: FutureProducer,
    topic: String,
}

impl KafkaQueuePublisher {
    pub fn new(producer: FutureProducer, topic: String) -> Self {
        Self { producer, topic }
    }

    pub async fn enqueue_bundle(
        &self,
        bundle: &EthSendBundle,
        sender: Address,
    ) -> Result<(), Error> {
        // Review comment (Collaborator) on lines +27 to +31: "Should we add some
        // retry logic with a backoff to this function? Maybe we can pull in this
        // crate: https://crates.io/crates/backon" (see the retry sketch after
        // this file's diff).
        let key = sender.to_string();
        let payload = serde_json::to_vec(bundle)?;

        let record = FutureRecord::to(&self.topic).key(&key).payload(&payload);

        match self
            .producer
            .send(record, tokio::time::Duration::from_secs(5))
            .await
        {
            Ok((partition, offset)) => {
                info!(
                    sender = %sender,
                    partition = partition,
                    offset = offset,
                    topic = %self.topic,
                    "Successfully enqueued bundle"
                );
                Ok(())
            }
            Err((err, _)) => {
                error!(
                    sender = %sender,
                    error = %err,
                    topic = %self.topic,
                    "Failed to enqueue bundle"
                );
                Err(anyhow::anyhow!("Failed to enqueue bundle: {}", err))
            }
        }
    }
}

#[async_trait]
impl QueuePublisher for KafkaQueuePublisher {
    async fn publish(&self, bundle: &EthSendBundle, sender: Address) -> Result<()> {
        self.enqueue_bundle(bundle, sender).await
    }
}
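On the review comment above: a minimal sketch of how the trait impl could wrap `enqueue_bundle` with the suggested `backon` crate. The backon 1.x API and the policy of three attempts with exponential backoff are assumptions, not part of this PR:

```rust
use backon::{ExponentialBuilder, Retryable};

// Hypothetical variant of the `QueuePublisher` impl above: retry the enqueue
// with exponential backoff before surfacing the error to the caller.
#[async_trait]
impl QueuePublisher for KafkaQueuePublisher {
    async fn publish(&self, bundle: &EthSendBundle, sender: Address) -> Result<()> {
        (|| async { self.enqueue_bundle(bundle, sender).await })
            .retry(ExponentialBuilder::default().with_max_times(3))
            .await
    }
}
```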
19 changes: 16 additions & 3 deletions crates/ingress/src/service.rs
@@ -15,6 +15,8 @@ use tips_audit::{MempoolEvent, MempoolEventPublisher};
use tips_datastore::BundleDatastore;
use tracing::{info, warn};

use crate::queue::QueuePublisher;

#[rpc(server, namespace = "eth")]
pub trait IngressApi {
    /// `eth_sendBundle` can be used to send your bundles to the builder.
@@ -30,34 +32,38 @@ pub trait IngressApi {
    async fn send_raw_transaction(&self, tx: Bytes) -> RpcResult<B256>;
}

pub struct IngressService<Store, Publisher, Queue> {
    provider: RootProvider<Optimism>,
    datastore: Store,
    dual_write_mempool: bool,
    publisher: Publisher,
    queue: Queue,
}

impl<Store, Publisher, Queue> IngressService<Store, Publisher, Queue> {
    pub fn new(
        provider: RootProvider<Optimism>,
        datastore: Store,
        dual_write_mempool: bool,
        publisher: Publisher,
        queue: Queue,
    ) -> Self {
        Self {
            provider,
            datastore,
            dual_write_mempool,
            publisher,
            queue,
        }
    }
}

#[async_trait]
impl<Store, Publisher, Queue> IngressApiServer for IngressService<Store, Publisher, Queue>
where
    Store: BundleDatastore + Sync + Send + 'static,
    Publisher: MempoolEventPublisher + Sync + Send + 'static,
    Queue: QueuePublisher + Sync + Send + 'static,
{
    async fn send_bundle(&self, _bundle: EthSendBundle) -> RpcResult<EthBundleHash> {
        warn!(
@@ -99,6 +105,13 @@ where
            ..Default::default()
        };

        // Queue the bundle for the DB Writer.
        let sender = transaction.signer();
        if let Err(e) = self.queue.publish(&bundle, sender).await {
            warn!(message = "Failed to enqueue bundle", sender = %sender, error = %e);
        }

        // TODO: have the DB Writer consume from the queue and move the insert_bundle logic there.
        let result = self
            .datastore
            .insert_bundle(bundle.clone())
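The TODO above points at a DB Writer that consumes from the queue. A rough sketch of that consumer loop, assuming only rdkafka's StreamConsumer API; the group id, function name, and the commented-out `insert_bundle` call are illustrative, not part of this PR:

```rust
use alloy_rpc_types_mev::EthSendBundle;
use rdkafka::consumer::{Consumer, StreamConsumer};
use rdkafka::{ClientConfig, Message};

// Hypothetical DB Writer loop: consume the JSON-encoded bundles written by
// KafkaQueuePublisher and persist them, per the TODO above.
async fn run_db_writer(brokers: &str, topic: &str) -> anyhow::Result<()> {
    let consumer: StreamConsumer = ClientConfig::new()
        .set("bootstrap.servers", brokers)
        .set("group.id", "tips-db-writer")
        .create()?;
    consumer.subscribe(&[topic])?;

    loop {
        let msg = consumer.recv().await?;
        if let Some(payload) = msg.payload() {
            let bundle: EthSendBundle = serde_json::from_slice(payload)?;
            // datastore.insert_bundle(bundle).await?; // moved here per the TODO
            let _ = bundle;
        }
    }
}
```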