Skip to content

Commit ef58f74

Browse files
committed
move publisher to ingresswriter
1 parent d5ba259 commit ef58f74

File tree

6 files changed

+40
-29
lines changed

6 files changed

+40
-29
lines changed

Cargo.lock

Lines changed: 1 addition & 0 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

crates/ingress/src/main.rs

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -3,10 +3,8 @@ use clap::Parser;
33
use jsonrpsee::server::Server;
44
use op_alloy_network::Optimism;
55
use rdkafka::ClientConfig;
6-
use rdkafka::consumer::Consumer;
76
use rdkafka::producer::FutureProducer;
87
use std::net::IpAddr;
9-
use tips_audit::{KafkaMempoolEventPublisher, create_kafka_consumer};
108
use tracing::{info, warn};
119
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
1210
use url::Url;
@@ -99,20 +97,14 @@ async fn main() -> anyhow::Result<()> {
9997
.network::<Optimism>()
10098
.connect_http(config.mempool_url);
10199

102-
let kafka_producer: FutureProducer = ClientConfig::new()
103-
.set("bootstrap.servers", &config.kafka_brokers)
104-
.set("message.timeout.ms", "5000")
105-
.create()?;
106-
107100
let queue_producer: FutureProducer = ClientConfig::new()
108101
.set("bootstrap.servers", &config.kafka_brokers)
109102
.set("message.timeout.ms", "5000")
110103
.create()?;
111104

112-
let publisher = KafkaMempoolEventPublisher::new(kafka_producer, config.kafka_topic);
113105
let queue = KafkaQueuePublisher::new(queue_producer, config.queue_topic.clone());
114106

115-
let service = IngressService::new(provider, config.dual_write_mempool, publisher, queue);
107+
let service = IngressService::new(provider, config.dual_write_mempool, queue);
116108
let bind_addr = format!("{}:{}", config.address, config.port);
117109

118110
let server = Server::builder().build(&bind_addr).await?;

crates/ingress/src/service.rs

Lines changed: 4 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,6 @@ use jsonrpsee::{
1111
use op_alloy_consensus::OpTxEnvelope;
1212
use op_alloy_network::Optimism;
1313
use reth_rpc_eth_types::EthApiError;
14-
use tips_audit::{MempoolEvent, MempoolEventPublisher};
1514
use tracing::{info, warn};
1615

1716
use crate::queue::QueuePublisher;
@@ -31,33 +30,25 @@ pub trait IngressApi {
3130
async fn send_raw_transaction(&self, tx: Bytes) -> RpcResult<B256>;
3231
}
3332

34-
pub struct IngressService<Publisher, Queue> {
33+
pub struct IngressService<Queue> {
3534
provider: RootProvider<Optimism>,
3635
dual_write_mempool: bool,
37-
publisher: Publisher,
3836
queue: Queue,
3937
}
4038

41-
impl<Publisher, Queue> IngressService<Publisher, Queue> {
42-
pub fn new(
43-
provider: RootProvider<Optimism>,
44-
dual_write_mempool: bool,
45-
publisher: Publisher,
46-
queue: Queue,
47-
) -> Self {
39+
impl<Queue> IngressService<Queue> {
40+
pub fn new(provider: RootProvider<Optimism>, dual_write_mempool: bool, queue: Queue) -> Self {
4841
Self {
4942
provider,
5043
dual_write_mempool,
51-
publisher,
5244
queue,
5345
}
5446
}
5547
}
5648

5749
#[async_trait]
58-
impl<Publisher, Queue> IngressApiServer for IngressService<Publisher, Queue>
50+
impl<Queue> IngressApiServer for IngressService<Queue>
5951
where
60-
Publisher: MempoolEventPublisher + Sync + Send + 'static,
6152
Queue: QueuePublisher + Sync + Send + 'static,
6253
{
6354
async fn send_bundle(&self, _bundle: EthSendBundle) -> RpcResult<EthBundleHash> {

crates/writer/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ path = "src/main.rs"
99

1010
[dependencies]
1111
tips-datastore.workspace = true
12+
tips-audit.workspace=true
1213
alloy-rpc-types-mev.workspace = true
1314
tokio.workspace = true
1415
tracing.workspace = true

crates/writer/src/main.rs

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -6,15 +6,16 @@ use rdkafka::{
66
config::ClientConfig,
77
consumer::{Consumer, StreamConsumer},
88
message::Message,
9+
producer::FutureProducer,
910
};
11+
use tips_audit::{KafkaMempoolEventPublisher, MempoolEvent, MempoolEventPublisher};
1012
use tips_datastore::{BundleDatastore, postgres::PostgresDatastore};
1113
use tokio::time::Duration;
1214
use tracing::{debug, error, info};
1315
use uuid::Uuid;
1416

1517
#[derive(Parser)]
16-
#[command(name = "tips-writer")]
17-
#[command(about = "TIPS Writer Service - Consumes bundles from Kafka and writes to datastore")]
18+
#[command(author, version, about, long_about = None)]
1819
struct Args {
1920
#[arg(long, env = "TIPS_WRITER_DATABASE_URL")]
2021
database_url: String,
@@ -33,24 +34,28 @@ struct Args {
3334
}
3435

3536
/// IngressWriter consumes bundles sent from the Ingress service and writes them to the datastore
36-
pub struct IngressWriter<Store> {
37+
pub struct IngressWriter<Store, Publisher> {
3738
queue_consumer: StreamConsumer,
3839
datastore: Store,
40+
publisher: Publisher,
3941
}
4042

41-
impl<Store> IngressWriter<Store>
43+
impl<Store, Publisher> IngressWriter<Store, Publisher>
4244
where
4345
Store: BundleDatastore + Send + Sync + 'static,
46+
Publisher: MempoolEventPublisher + Sync + Send + 'static,
4447
{
4548
pub fn new(
4649
queue_consumer: StreamConsumer,
4750
queue_topic: String,
4851
datastore: Store,
52+
publisher: Publisher,
4953
) -> Result<Self> {
5054
queue_consumer.subscribe(&[queue_topic.as_str()])?;
5155
Ok(Self {
5256
queue_consumer,
5357
datastore,
58+
publisher,
5459
})
5560
}
5661

@@ -75,7 +80,7 @@ where
7580
.map_err(|e| anyhow::anyhow!("Failed to insert bundle: {e}"))
7681
};
7782

78-
insert
83+
let bundle_id = insert
7984
.retry(
8085
&ExponentialBuilder::default()
8186
.with_min_delay(Duration::from_millis(100))
@@ -86,7 +91,19 @@ where
8691
info!("Retrying to insert bundle {:?} after {:?}", err, dur);
8792
})
8893
.await
89-
.map_err(|e| anyhow::anyhow!("Failed to insert bundle after retries: {e}"))
94+
.map_err(|e| anyhow::anyhow!("Failed to insert bundle after retries: {e}"))?;
95+
96+
if let Err(e) = self
97+
.publisher
98+
.publish(MempoolEvent::Created {
99+
bundle_id,
100+
bundle: bundle.clone(),
101+
})
102+
.await
103+
{
104+
error!(error = %e, bundle_id = %bundle_id, "Failed to publish MempoolEvent::Created");
105+
}
106+
Ok(bundle_id)
90107
}
91108
Err(e) => {
92109
error!(error = %e, "Error receiving message from Kafka");
@@ -114,9 +131,15 @@ async fn main() -> Result<()> {
114131
.set("session.timeout.ms", "6000")
115132
.set("enable.auto.commit", "true");
116133

134+
let kafka_producer: FutureProducer = ClientConfig::new()
135+
.set("bootstrap.servers", &args.kafka_brokers)
136+
.set("message.timeout.ms", "5000")
137+
.create()?;
138+
139+
let publisher = KafkaMempoolEventPublisher::new(kafka_producer, "tips-audit".to_string());
117140
let consumer = config.create()?;
118141
let datastore = PostgresDatastore::connect(args.database_url).await?;
119-
let writer = IngressWriter::new(consumer, args.kafka_topic.clone(), datastore)?;
142+
let writer = IngressWriter::new(consumer, args.kafka_topic.clone(), datastore, publisher)?;
120143

121144
info!(
122145
"Ingress Writer service started, consuming from topic: {}",

justfile

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,5 +46,8 @@ ingress:
4646
maintenance:
4747
cargo run --bin tips-maintenance
4848

49+
writer:
50+
cargo run --bin tips-writer
51+
4952
ui:
5053
cd ui && yarn dev

0 commit comments

Comments (0)