Skip to content

Commit 8ef9e05

Browse files
committed
move publish to own func
1 parent 1a89d83 commit 8ef9e05

File tree

2 files changed

+21
-14
lines changed

2 files changed

+21
-14
lines changed

README.md

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,8 @@ The main entry point that provides a JSON-RPC API for receiving transactions and
2525
### 🔨 Maintenance (`crates/maintenance`)
2626
A service that maintains the health of the TIPS DataStore, by removing stale or included bundles.
2727

28+
### ✍️ Writer (`crates/writer`)
29+
A service that consumes bundles from Kafka and persists them to the datastore.
30+
2831
### 🖥️ UI (`ui`)
2932
A debug UI for viewing the state of the bundle store and S3.

crates/writer/src/main.rs

Lines changed: 18 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ use rdkafka::{
1111
use tips_audit::{KafkaMempoolEventPublisher, MempoolEvent, MempoolEventPublisher};
1212
use tips_datastore::{BundleDatastore, postgres::PostgresDatastore};
1313
use tokio::time::Duration;
14-
use tracing::{debug, error, info};
14+
use tracing::{debug, error, info, warn};
1515
use uuid::Uuid;
1616

1717
#[derive(Parser)]
@@ -59,7 +59,7 @@ where
5959
})
6060
}
6161

62-
async fn insert_bundle(&self) -> Result<Uuid> {
62+
async fn insert_bundle(&self) -> Result<(Uuid, EthSendBundle)> {
6363
match self.queue_consumer.recv().await {
6464
Ok(message) => {
6565
let payload = message
@@ -93,24 +93,27 @@ where
9393
.await
9494
.map_err(|e| anyhow::anyhow!("Failed to insert bundle after retries: {e}"))?;
9595

96-
if let Err(e) = self
97-
.publisher
98-
.publish(MempoolEvent::Created {
99-
bundle_id,
100-
bundle: bundle.clone(),
101-
})
102-
.await
103-
{
104-
error!(error = %e, bundle_id = %bundle_id, "Failed to publish MempoolEvent::Created");
105-
}
106-
Ok(bundle_id)
96+
Ok((bundle_id, bundle))
10797
}
10898
Err(e) => {
10999
error!(error = %e, "Error receiving message from Kafka");
110100
Err(e.into())
111101
}
112102
}
113103
}
104+
105+
async fn publish(&self, bundle_id: Uuid, bundle: &EthSendBundle) {
106+
if let Err(e) = self
107+
.publisher
108+
.publish(MempoolEvent::Created {
109+
bundle_id,
110+
bundle: bundle.clone(),
111+
})
112+
.await
113+
{
114+
warn!(error = %e, bundle_id = %bundle_id, "Failed to publish MempoolEvent::Created");
115+
}
116+
}
114117
}
115118

116119
#[tokio::main]
@@ -150,8 +153,9 @@ async fn main() -> Result<()> {
150153
);
151154
loop {
152155
match writer.insert_bundle().await {
153-
Ok(bundle_id) => {
156+
Ok((bundle_id, bundle)) => {
154157
info!(bundle_id = %bundle_id, "Successfully inserted bundle");
158+
writer.publish(bundle_id, &bundle).await;
155159
}
156160
Err(e) => {
157161
error!(error = %e, "Failed to process bundle");

0 commit comments

Comments
 (0)