
Commit 8c82258

chore: migrate to kafka props files for configs (#25)
* migrate to kafka properties file
* migrate other services to kafka properties file
* fmt
1 parent d5b9371 commit 8c82258

12 files changed: +161 −80 lines changed


.env.example

Lines changed: 7 additions & 9 deletions
@@ -3,16 +3,14 @@ TIPS_INGRESS_ADDRESS=0.0.0.0
 TIPS_INGRESS_PORT=8080
 TIPS_INGRESS_RPC_MEMPOOL=http://localhost:2222
 TIPS_INGRESS_DUAL_WRITE_MEMPOOL=false
-TIPS_INGRESS_KAFKA_BROKERS=localhost:9092
-TIPS_INGRESS_KAFKA_TOPIC=tips-audit
+TIPS_INGRESS_KAFKA_INGRESS_PROPERTIES_FILE=/app/docker/ingress-kafka-properties
+TIPS_INGRESS_KAFKA_INGRESS_TOPIC=tips-ingress
 TIPS_INGRESS_LOG_LEVEL=info
-TIPS_INGRESS_KAFKA_QUEUE_TOPIC=tips-ingress-rpc
 TIPS_INGRESS_SEND_TRANSACTION_DEFAULT_LIFETIME_SECONDS=10800

 # Audit service configuration
-TIPS_AUDIT_KAFKA_BROKERS=localhost:9092
+TIPS_AUDIT_KAFKA_PROPERTIES_FILE=/app/docker/audit-kafka-properties
 TIPS_AUDIT_KAFKA_TOPIC=tips-audit
-TIPS_AUDIT_KAFKA_GROUP_ID=local-audit
 TIPS_AUDIT_LOG_LEVEL=info
 TIPS_AUDIT_S3_BUCKET=tips
 TIPS_AUDIT_S3_CONFIG_TYPE=manual
@@ -25,7 +23,7 @@ TIPS_AUDIT_S3_SECRET_ACCESS_KEY=minioadmin
 TIPS_MAINTENANCE_DATABASE_URL=postgresql://postgres:postgres@localhost:5432/postgres
 TIPS_MAINTENANCE_RPC_URL=http://localhost:2222
 TIPS_MAINTENANCE_RPC_POLL_INTERVAL_MS=250
-TIPS_MAINTENANCE_KAFKA_BROKERS=localhost:9092
+TIPS_MAINTENANCE_KAFKA_PROPERTIES_FILE=/app/docker/maintenance-kafka-properties
 TIPS_MAINTENANCE_FLASHBLOCKS_WS=ws://localhost:1115/ws
 TIPS_MAINTENANCE_KAFKA_TOPIC=tips-audit
 TIPS_MAINTENANCE_LOG_LEVEL=info
@@ -42,7 +40,7 @@ TIPS_UI_S3_SECRET_ACCESS_KEY=minioadmin

 # Ingress Writer
 TIPS_INGRESS_WRITER_DATABASE_URL=postgresql://postgres:postgres@localhost:5432/postgres
-TIPS_INGRESS_WRITER_KAFKA_BROKERS=localhost:9092
-TIPS_INGRESS_WRITER_KAFKA_TOPIC=tips-ingress-rpc
-TIPS_INGRESS_WRITER_KAFKA_GROUP_ID=local-writer
+TIPS_INGRESS_WRITER_KAFKA_PROPERTIES_FILE=/app/docker/ingress-writer-kafka-properties
+TIPS_INGRESS_KAFKA_TOPIC=tips-ingress
+TIPS_INGRESS_WRITER_AUDIT_TOPIC=tips-audit
 TIPS_INGRESS_WRITER_LOG_LEVEL=info
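The properties files referenced by these paths ship with the commit but fall outside the diffs shown on this page. As a rough sketch only (hypothetical contents, assuming standard librdkafka key names), the ingress file would carry the broker address that TIPS_INGRESS_KAFKA_BROKERS used to provide, plus the producer timeout that was previously hardcoded in ingress-rpc:

# docker/ingress-kafka-properties (hypothetical example; actual file not shown in this view)
bootstrap.servers=localhost:9092
message.timeout.ms=5000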

crates/audit/src/bin/main.rs

Lines changed: 4 additions & 8 deletions
@@ -19,15 +19,12 @@ enum S3ConfigType {
 #[derive(Parser, Debug)]
 #[command(author, version, about, long_about = None)]
 struct Args {
-    #[arg(long, env = "TIPS_AUDIT_KAFKA_BROKERS")]
-    kafka_brokers: String,
+    #[arg(long, env = "TIPS_AUDIT_KAFKA_PROPERTIES_FILE")]
+    kafka_properties_file: String,

     #[arg(long, env = "TIPS_AUDIT_KAFKA_TOPIC")]
     kafka_topic: String,

-    #[arg(long, env = "TIPS_AUDIT_KAFKA_GROUP_ID")]
-    kafka_group_id: String,
-
     #[arg(long, env = "TIPS_AUDIT_S3_BUCKET")]
     s3_bucket: String,

@@ -80,14 +77,13 @@ async fn main() -> Result<()> {
         .init();

     info!(
-        kafka_brokers = %args.kafka_brokers,
+        kafka_properties_file = %args.kafka_properties_file,
         kafka_topic = %args.kafka_topic,
-        kafka_group_id = %args.kafka_group_id,
         s3_bucket = %args.s3_bucket,
         "Starting audit archiver"
     );

-    let consumer = create_kafka_consumer(&args.kafka_brokers, &args.kafka_group_id)?;
+    let consumer = create_kafka_consumer(&args.kafka_properties_file)?;
     consumer.subscribe(&[&args.kafka_topic])?;

     let reader = KafkaMempoolReader::new(consumer, args.kafka_topic.clone())?;

crates/audit/src/reader.rs

Lines changed: 25 additions & 13 deletions
@@ -7,24 +7,36 @@ use rdkafka::{
     consumer::{Consumer, StreamConsumer},
     message::Message,
 };
+use std::fs;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use tokio::time::sleep;
-use tracing::{debug, error};
-
-pub fn create_kafka_consumer(kafka_brokers: &str, group_id: &str) -> Result<StreamConsumer> {
-    let consumer: StreamConsumer = ClientConfig::new()
-        .set("group.id", group_id)
-        .set("bootstrap.servers", kafka_brokers)
-        .set("enable.partition.eof", "false")
-        .set("session.timeout.ms", "6000")
-        .set("enable.auto.commit", "false")
-        .set("auto.offset.reset", "earliest")
-        .set("fetch.wait.max.ms", "100")
-        .set("fetch.min.bytes", "1")
-        .create()?;
+use tracing::{debug, error, info};
+
+pub fn create_kafka_consumer(kafka_properties_file: &str) -> Result<StreamConsumer> {
+    let client_config = load_kafka_config_from_file(kafka_properties_file)?;
+    let consumer: StreamConsumer = client_config.create()?;
     Ok(consumer)
 }

+fn load_kafka_config_from_file(properties_file_path: &str) -> Result<ClientConfig> {
+    let kafka_properties = fs::read_to_string(properties_file_path)?;
+    info!("Kafka properties:\n{}", kafka_properties);
+
+    let mut client_config = ClientConfig::new();
+
+    for line in kafka_properties.lines() {
+        let line = line.trim();
+        if line.is_empty() || line.starts_with('#') {
+            continue;
+        }
+        if let Some((key, value)) = line.split_once('=') {
+            client_config.set(key.trim(), value.trim());
+        }
+    }
+
+    Ok(client_config)
+}
+
 pub fn assign_topic_partition(consumer: &StreamConsumer, topic: &str) -> Result<()> {
     let mut tpl = TopicPartitionList::new();
     tpl.add_partition(topic, 0);
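The loader understands a minimal Java-style .properties format: one key=value pair per line, whitespace trimmed, blank lines and #-comments skipped; it does not handle escape sequences or line continuations. Everything create_kafka_consumer used to hardcode, including the group id that TIPS_AUDIT_KAFKA_GROUP_ID supplied, must now come from this file. A hypothetical docker/audit-kafka-properties mirroring the deleted settings (the real file is not visible in this view):

# docker/audit-kafka-properties (hypothetical; mirrors the removed hardcoded consumer config)
bootstrap.servers=localhost:9092
group.id=local-audit
enable.partition.eof=false
session.timeout.ms=6000
enable.auto.commit=false
auto.offset.reset=earliest
fetch.wait.max.ms=100
fetch.min.bytes=1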

crates/ingress-rpc/src/main.rs

Lines changed: 29 additions & 18 deletions
@@ -4,6 +4,7 @@ use jsonrpsee::server::Server;
 use op_alloy_network::Optimism;
 use rdkafka::ClientConfig;
 use rdkafka::producer::FutureProducer;
+use std::fs;
 use std::net::IpAddr;
 use tracing::{info, warn};
 use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
@@ -35,24 +36,16 @@ struct Config {
     dual_write_mempool: bool,

     /// Kafka brokers for publishing mempool events
-    #[arg(long, env = "TIPS_INGRESS_KAFKA_BROKERS")]
-    kafka_brokers: String,
-
-    /// Kafka topic for publishing mempool events
-    #[arg(
-        long,
-        env = "TIPS_INGRESS_KAFKA_TOPIC",
-        default_value = "mempool-events"
-    )]
-    kafka_topic: String,
+    #[arg(long, env = "TIPS_INGRESS_KAFKA_INGRESS_PROPERTIES_FILE")]
+    ingress_kafka_properties: String,

     /// Kafka topic for queuing transactions before the DB Writer
     #[arg(
         long,
-        env = "TIPS_INGRESS_KAFKA_QUEUE_TOPIC",
-        default_value = "tips-ingress-rpc"
+        env = "TIPS_INGRESS_KAFKA_INGRESS_TOPIC",
+        default_value = "tips-ingress"
     )]
-    queue_topic: String,
+    ingress_topic: String,

     #[arg(long, env = "TIPS_INGRESS_LOG_LEVEL", default_value = "info")]
     log_level: String,
@@ -106,12 +99,11 @@ async fn main() -> anyhow::Result<()> {
         .network::<Optimism>()
         .connect_http(config.mempool_url);

-    let queue_producer: FutureProducer = ClientConfig::new()
-        .set("bootstrap.servers", &config.kafka_brokers)
-        .set("message.timeout.ms", "5000")
-        .create()?;
+    let client_config = load_kafka_config_from_file(&config.ingress_kafka_properties)?;
+
+    let queue_producer: FutureProducer = client_config.create()?;

-    let queue = KafkaQueuePublisher::new(queue_producer, config.queue_topic);
+    let queue = KafkaQueuePublisher::new(queue_producer, config.ingress_topic);

     let service = IngressService::new(
         provider,
@@ -133,3 +125,22 @@ async fn main() -> anyhow::Result<()> {
     handle.stopped().await;
     Ok(())
 }
+
+fn load_kafka_config_from_file(properties_file_path: &str) -> anyhow::Result<ClientConfig> {
+    let kafka_properties = fs::read_to_string(properties_file_path)?;
+    info!("Kafka properties:\n{}", kafka_properties);
+
+    let mut client_config = ClientConfig::new();
+
+    for line in kafka_properties.lines() {
+        let line = line.trim();
+        if line.is_empty() || line.starts_with('#') {
+            continue;
+        }
+        if let Some((key, value)) = line.split_once('=') {
+            client_config.set(key.trim(), value.trim());
+        }
+    }
+
+    Ok(client_config)
+}

crates/ingress-writer/src/main.rs

Lines changed: 39 additions & 25 deletions
@@ -8,6 +8,7 @@ use rdkafka::{
     message::Message,
     producer::FutureProducer,
 };
+use std::fs;
 use tips_audit::{BundleEvent, BundleEventPublisher, KafkaBundleEventPublisher};
 use tips_datastore::{BundleDatastore, postgres::PostgresDatastore};
 use tokio::time::Duration;
@@ -20,18 +21,18 @@ struct Args {
     #[arg(long, env = "TIPS_INGRESS_WRITER_DATABASE_URL")]
     database_url: String,

-    #[arg(long, env = "TIPS_INGRESS_WRITER_KAFKA_BROKERS")]
-    kafka_brokers: String,
+    #[arg(long, env = "TIPS_INGRESS_WRITER_KAFKA_PROPERTIES_FILE")]
+    kafka_properties_file: String,
+
+    #[arg(long, env = "TIPS_INGRESS_KAFKA_TOPIC", default_value = "tips-ingress")]
+    ingress_topic: String,

     #[arg(
         long,
-        env = "TIPS_INGRESS_WRITER_KAFKA_TOPIC",
-        default_value = "tips-ingress-rpc"
+        env = "TIPS_INGRESS_WRITER_AUDIT_TOPIC",
+        default_value = "tips-audit"
     )]
-    kafka_topic: String,
-
-    #[arg(long, env = "TIPS_INGRESS_WRITER_KAFKA_GROUP_ID")]
-    kafka_group_id: String,
+    audit_topic: String,

     #[arg(long, env = "TIPS_INGRESS_WRITER_LOG_LEVEL", default_value = "info")]
     log_level: String,
@@ -129,31 +130,25 @@ async fn main() -> Result<()> {
         .with_env_filter(&args.log_level)
         .init();

-    let mut config = ClientConfig::new();
-    config
-        .set("group.id", &args.kafka_group_id)
-        .set("bootstrap.servers", &args.kafka_brokers)
-        .set("auto.offset.reset", "earliest")
-        .set("enable.partition.eof", "false")
-        .set("session.timeout.ms", "6000")
-        .set("enable.auto.commit", "true");
-
-    let kafka_producer: FutureProducer = ClientConfig::new()
-        .set("bootstrap.servers", &args.kafka_brokers)
-        .set("message.timeout.ms", "5000")
-        .create()?;
-
-    let publisher = KafkaBundleEventPublisher::new(kafka_producer, "tips-audit".to_string());
+    let config = load_kafka_config_from_file(&args.kafka_properties_file)?;
+    let kafka_producer: FutureProducer = config.create()?;
+
+    let publisher = KafkaBundleEventPublisher::new(kafka_producer, args.audit_topic.clone());
     let consumer = config.create()?;

     let bundle_store = PostgresDatastore::connect(args.database_url).await?;
     bundle_store.run_migrations().await?;

-    let writer = IngressWriter::new(consumer, args.kafka_topic.clone(), bundle_store, publisher)?;
+    let writer = IngressWriter::new(
+        consumer,
+        args.ingress_topic.clone(),
+        bundle_store,
+        publisher,
+    )?;

     info!(
         "Ingress Writer service started, consuming from topic: {}",
-        args.kafka_topic
+        args.ingress_topic
     );
     loop {
         match writer.insert_bundle().await {
@@ -167,3 +162,22 @@ async fn main() -> Result<()> {
         }
     }
 }
+
+fn load_kafka_config_from_file(properties_file_path: &str) -> Result<ClientConfig> {
+    let kafka_properties = fs::read_to_string(properties_file_path)?;
+    info!("Kafka properties:\n{}", kafka_properties);
+
+    let mut client_config = ClientConfig::new();
+
+    for line in kafka_properties.lines() {
+        let line = line.trim();
+        if line.is_empty() || line.starts_with('#') {
+            continue;
+        }
+        if let Some((key, value)) = line.split_once('=') {
+            client_config.set(key.trim(), value.trim());
+        }
+    }
+
+    Ok(client_config)
+}
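Note that the writer now derives both of its Kafka clients from the same ClientConfig: config.create() is called once for the FutureProducer and once for the consumer, so a single properties file has to hold producer and consumer settings side by side. A hypothetical docker/ingress-writer-kafka-properties merging the two deleted hardcoded configs (the actual file is not shown in this view):

# docker/ingress-writer-kafka-properties (hypothetical; merges the removed producer and consumer configs)
bootstrap.servers=localhost:9092
group.id=local-writer
auto.offset.reset=earliest
enable.partition.eof=false
session.timeout.ms=6000
enable.auto.commit=true
message.timeout.ms=5000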

crates/maintenance/src/main.rs

Lines changed: 24 additions & 6 deletions
@@ -9,6 +9,7 @@ use clap::Parser;
 use op_alloy_network::Optimism;
 use rdkafka::ClientConfig;
 use rdkafka::producer::FutureProducer;
+use std::fs;
 use std::sync::Arc;
 use tips_audit::KafkaBundleEventPublisher;
 use tips_datastore::PostgresDatastore;
@@ -19,8 +20,8 @@ use url::Url;
 #[derive(Parser, Clone)]
 #[command(author, version, about, long_about = None)]
 pub struct Args {
-    #[arg(long, env = "TIPS_MAINTENANCE_KAFKA_BROKERS")]
-    pub kafka_brokers: String,
+    #[arg(long, env = "TIPS_MAINTENANCE_KAFKA_PROPERTIES_FILE")]
+    pub kafka_properties_file: String,

     #[arg(
         long,
@@ -100,10 +101,8 @@ async fn main() -> Result<()> {

     let datastore = PostgresDatastore::connect(args.database_url.clone()).await?;

-    let kafka_producer: FutureProducer = ClientConfig::new()
-        .set("bootstrap.servers", &args.kafka_brokers)
-        .set("message.timeout.ms", "5000")
-        .create()?;
+    let client_config = load_kafka_config_from_file(&args.kafka_properties_file)?;
+    let kafka_producer: FutureProducer = client_config.create()?;

     let publisher = KafkaBundleEventPublisher::new(kafka_producer, args.kafka_topic.clone());

@@ -125,3 +124,22 @@ async fn main() -> Result<()> {

     Ok(())
 }
+
+fn load_kafka_config_from_file(properties_file_path: &str) -> Result<ClientConfig> {
+    let kafka_properties = fs::read_to_string(properties_file_path)?;
+    info!("Kafka properties:\n{}", kafka_properties);
+
+    let mut client_config = ClientConfig::new();
+
+    for line in kafka_properties.lines() {
+        let line = line.trim();
+        if line.is_empty() || line.starts_with('#') {
+            continue;
+        }
+        if let Some((key, value)) = line.split_once('=') {
+            client_config.set(key.trim(), value.trim());
+        }
+    }
+
+    Ok(client_config)
+}

docker-compose.tips.yml

Lines changed: 8 additions & 0 deletions
@@ -8,6 +8,8 @@ services:
       - "8080:8080"
     env_file:
       - .env.docker
+    volumes:
+      - ./docker/ingress-kafka-properties:/app/docker/ingress-kafka-properties:ro
     restart: unless-stopped

   audit:
@@ -17,6 +19,8 @@ services:
     container_name: tips-audit
     env_file:
       - .env.docker
+    volumes:
+      - ./docker/audit-kafka-properties:/app/docker/audit-kafka-properties:ro
     restart: unless-stopped

   maintenance:
@@ -26,6 +30,8 @@ services:
     container_name: tips-maintenance
     env_file:
       - .env.docker
+    volumes:
+      - ./docker/maintenance-kafka-properties:/app/docker/maintenance-kafka-properties:ro
     restart: unless-stopped

   ui:
@@ -46,4 +52,6 @@ services:
     container_name: tips-ingress-writer
     env_file:
       - .env.docker
+    volumes:
+      - ./docker/ingress-writer-kafka-properties:/app/docker/ingress-writer-kafka-properties:ro
     restart: unless-stopped
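Each read-only mount's container-side path is exactly the path that the matching *_PROPERTIES_FILE variable points at in .env.example, e.g. for the ingress service:

volume:  ./docker/ingress-kafka-properties -> /app/docker/ingress-kafka-properties (ro)
env:     TIPS_INGRESS_KAFKA_INGRESS_PROPERTIES_FILE=/app/docker/ingress-kafka-properties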
