
Commit 19b4124

migrate other services to kafka properties file

1 parent cbef6fc
File tree

12 files changed: +146 -73 lines
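
In short: each service that previously took its own TIPS_*_KAFKA_BROKERS (and, for consumers, TIPS_*_KAFKA_GROUP_ID) env vars now takes a single TIPS_*_KAFKA_PROPERTIES_FILE path and parses that file into an rdkafka ClientConfig line by line; the ingress topic is also renamed from tips-ingress-rpc to tips-ingress throughout.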

.env.example

Lines changed: 6 additions & 7 deletions

@@ -4,14 +4,13 @@ TIPS_INGRESS_PORT=8080
 TIPS_INGRESS_RPC_MEMPOOL=http://localhost:2222
 TIPS_INGRESS_DUAL_WRITE_MEMPOOL=false
 TIPS_INGRESS_KAFKA_INGRESS_PROPERTIES_FILE=/app/docker/ingress-kafka-properties
-TIPS_INGRESS_KAFKA_INGRESS_TOPIC=tips-ingress-rpc
+TIPS_INGRESS_KAFKA_INGRESS_TOPIC=tips-ingress
 TIPS_INGRESS_LOG_LEVEL=info
 TIPS_INGRESS_SEND_TRANSACTION_DEFAULT_LIFETIME_SECONDS=10800

 # Audit service configuration
-TIPS_AUDIT_KAFKA_BROKERS=localhost:9092
+TIPS_AUDIT_KAFKA_PROPERTIES_FILE=/app/docker/audit-kafka-properties
 TIPS_AUDIT_KAFKA_TOPIC=tips-audit
-TIPS_AUDIT_KAFKA_GROUP_ID=local-audit
 TIPS_AUDIT_LOG_LEVEL=info
 TIPS_AUDIT_S3_BUCKET=tips
 TIPS_AUDIT_S3_CONFIG_TYPE=manual

@@ -24,7 +23,7 @@ TIPS_AUDIT_S3_SECRET_ACCESS_KEY=minioadmin
 TIPS_MAINTENANCE_DATABASE_URL=postgresql://postgres:postgres@localhost:5432/postgres
 TIPS_MAINTENANCE_RPC_URL=http://localhost:2222
 TIPS_MAINTENANCE_RPC_POLL_INTERVAL_MS=250
-TIPS_MAINTENANCE_KAFKA_BROKERS=localhost:9092
+TIPS_MAINTENANCE_KAFKA_PROPERTIES_FILE=/app/docker/maintenance-kafka-properties
 TIPS_MAINTENANCE_FLASHBLOCKS_WS=ws://localhost:1115/ws
 TIPS_MAINTENANCE_KAFKA_TOPIC=tips-audit
 TIPS_MAINTENANCE_LOG_LEVEL=info

@@ -41,7 +40,7 @@ TIPS_UI_S3_SECRET_ACCESS_KEY=minioadmin

 # Ingress Writer
 TIPS_INGRESS_WRITER_DATABASE_URL=postgresql://postgres:postgres@localhost:5432/postgres
-TIPS_INGRESS_WRITER_KAFKA_BROKERS=localhost:9092
-TIPS_INGRESS_WRITER_KAFKA_TOPIC=tips-ingress-rpc
-TIPS_INGRESS_WRITER_KAFKA_GROUP_ID=local-writer
+TIPS_INGRESS_WRITER_KAFKA_PROPERTIES_FILE=/app/docker/ingress-writer-kafka-properties
+TIPS_INGRESS_KAFKA_TOPIC=tips-ingress
+TIPS_INGRESS_WRITER_AUDIT_TOPIC=tips-audit
 TIPS_INGRESS_WRITER_LOG_LEVEL=info
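
Each *_KAFKA_PROPERTIES_FILE points at a plain key=value rdkafka properties file, replacing the broker and group-id env vars deleted above. For a local run outside Docker, a minimal audit-consumer file might look like the sketch below — hypothetical, reusing the old env values; the committed docker/ files target host.docker.internal:9094 instead:

# hypothetical local properties file for the audit consumer
bootstrap.servers=localhost:9092
group.id=local-audit
enable.auto.commit=false
auto.offset.reset=earliest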

crates/audit/src/bin/main.rs

Lines changed: 4 additions & 8 deletions

@@ -19,15 +19,12 @@ enum S3ConfigType {
 #[derive(Parser, Debug)]
 #[command(author, version, about, long_about = None)]
 struct Args {
-    #[arg(long, env = "TIPS_AUDIT_KAFKA_BROKERS")]
-    kafka_brokers: String,
+    #[arg(long, env = "TIPS_AUDIT_KAFKA_PROPERTIES_FILE")]
+    kafka_properties_file: String,

     #[arg(long, env = "TIPS_AUDIT_KAFKA_TOPIC")]
     kafka_topic: String,

-    #[arg(long, env = "TIPS_AUDIT_KAFKA_GROUP_ID")]
-    kafka_group_id: String,
-
     #[arg(long, env = "TIPS_AUDIT_S3_BUCKET")]
     s3_bucket: String,

@@ -80,14 +77,13 @@ async fn main() -> Result<()> {
         .init();

     info!(
-        kafka_brokers = %args.kafka_brokers,
+        kafka_properties_file = %args.kafka_properties_file,
         kafka_topic = %args.kafka_topic,
-        kafka_group_id = %args.kafka_group_id,
         s3_bucket = %args.s3_bucket,
         "Starting audit archiver"
     );

-    let consumer = create_kafka_consumer(&args.kafka_brokers, &args.kafka_group_id)?;
+    let consumer = create_kafka_consumer(&args.kafka_properties_file)?;
     consumer.subscribe(&[&args.kafka_topic])?;

     let reader = KafkaMempoolReader::new(consumer, args.kafka_topic.clone())?;

crates/audit/src/reader.rs

Lines changed: 25 additions & 13 deletions

@@ -7,24 +7,36 @@ use rdkafka::{
     consumer::{Consumer, StreamConsumer},
     message::Message,
 };
+use std::fs;
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
 use tokio::time::sleep;
-use tracing::{debug, error};
-
-pub fn create_kafka_consumer(kafka_brokers: &str, group_id: &str) -> Result<StreamConsumer> {
-    let consumer: StreamConsumer = ClientConfig::new()
-        .set("group.id", group_id)
-        .set("bootstrap.servers", kafka_brokers)
-        .set("enable.partition.eof", "false")
-        .set("session.timeout.ms", "6000")
-        .set("enable.auto.commit", "false")
-        .set("auto.offset.reset", "earliest")
-        .set("fetch.wait.max.ms", "100")
-        .set("fetch.min.bytes", "1")
-        .create()?;
+use tracing::{debug, error, info};
+
+pub fn create_kafka_consumer(kafka_properties_file: &str) -> Result<StreamConsumer> {
+    let client_config = load_kafka_config_from_file(kafka_properties_file)?;
+    let consumer: StreamConsumer = client_config.create()?;
     Ok(consumer)
 }

+fn load_kafka_config_from_file(properties_file_path: &str) -> Result<ClientConfig> {
+    let kafka_properties = fs::read_to_string(properties_file_path)?;
+    info!("Kafka properties:\n{}", kafka_properties);
+
+    let mut client_config = ClientConfig::new();
+
+    for line in kafka_properties.lines() {
+        let line = line.trim();
+        if line.is_empty() || line.starts_with('#') {
+            continue;
+        }
+        if let Some((key, value)) = line.split_once('=') {
+            client_config.set(key.trim(), value.trim());
+        }
+    }
+
+    Ok(client_config)
+}
+
 pub fn assign_topic_partition(consumer: &StreamConsumer, topic: &str) -> Result<()> {
     let mut tpl = TopicPartitionList::new();
     tpl.add_partition(topic, 0);
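
The parser treats every non-empty, non-comment line as a key=value pair, splitting on the first '=' and trimming whitespace on both sides; lines without an '=' are silently skipped. A unit test along these lines would pin that behavior down (a sketch, not part of the commit — it assumes rdkafka's ClientConfig::get accessor and writes a throwaway file to the system temp dir):

#[cfg(test)]
mod tests {
    use super::load_kafka_config_from_file;
    use std::fs;

    #[test]
    fn parses_properties_and_skips_comments_and_blanks() {
        // Throwaway properties file exercising comments, blanks, and padding.
        let path = std::env::temp_dir().join("tips-test-kafka-properties");
        fs::write(
            &path,
            "# comment line\n\nbootstrap.servers=localhost:9092\n  group.id = local-audit  \n",
        )
        .unwrap();

        let config = load_kafka_config_from_file(path.to_str().unwrap()).unwrap();

        // Comments and blank lines are skipped; keys and values are trimmed.
        assert_eq!(config.get("bootstrap.servers"), Some("localhost:9092"));
        assert_eq!(config.get("group.id"), Some("local-audit"));

        fs::remove_file(&path).ok();
    }
}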

crates/ingress-rpc/src/main.rs

Lines changed: 21 additions & 13 deletions

@@ -43,7 +43,7 @@ struct Config {
     #[arg(
         long,
         env = "TIPS_INGRESS_KAFKA_INGRESS_TOPIC",
-        default_value = "tips-ingress-rpc"
+        default_value = "tips-ingress"
     )]
     ingress_topic: String,

@@ -99,18 +99,7 @@ async fn main() -> anyhow::Result<()> {
         .network::<Optimism>()
         .connect_http(config.mempool_url);

-    let kafka_properties = fs::read_to_string(&config.ingress_kafka_properties)?;
-    let mut client_config = ClientConfig::new();
-
-    for line in kafka_properties.lines() {
-        let line = line.trim();
-        if line.is_empty() || line.starts_with('#') {
-            continue;
-        }
-        if let Some((key, value)) = line.split_once('=') {
-            client_config.set(key.trim(), value.trim());
-        }
-    }
+    let client_config = load_kafka_config_from_file(&config.ingress_kafka_properties)?;

     let queue_producer: FutureProducer = client_config.create()?;

@@ -136,3 +125,22 @@ async fn main() -> anyhow::Result<()> {
     handle.stopped().await;
     Ok(())
 }
+
+fn load_kafka_config_from_file(properties_file_path: &str) -> anyhow::Result<ClientConfig> {
+    let kafka_properties = fs::read_to_string(properties_file_path)?;
+    info!("Kafka properties:\n{}", kafka_properties);
+
+    let mut client_config = ClientConfig::new();
+
+    for line in kafka_properties.lines() {
+        let line = line.trim();
+        if line.is_empty() || line.starts_with('#') {
+            continue;
+        }
+        if let Some((key, value)) = line.split_once('=') {
+            client_config.set(key.trim(), value.trim());
+        }
+    }
+
+    Ok(client_config)
+}

crates/ingress-writer/src/main.rs

Lines changed: 37 additions & 24 deletions

@@ -8,6 +8,7 @@ use rdkafka::{
     message::Message,
     producer::FutureProducer,
 };
+use std::fs;
 use tips_audit::{BundleEvent, BundleEventPublisher, KafkaBundleEventPublisher};
 use tips_datastore::{BundleDatastore, postgres::PostgresDatastore};
 use tokio::time::Duration;

@@ -20,18 +21,22 @@ struct Args {
     #[arg(long, env = "TIPS_INGRESS_WRITER_DATABASE_URL")]
     database_url: String,

-    #[arg(long, env = "TIPS_INGRESS_WRITER_KAFKA_BROKERS")]
-    kafka_brokers: String,
+    #[arg(long, env = "TIPS_INGRESS_WRITER_KAFKA_PROPERTIES_FILE")]
+    kafka_properties_file: String,

     #[arg(
         long,
-        env = "TIPS_INGRESS_WRITER_KAFKA_TOPIC",
-        default_value = "tips-ingress-rpc"
+        env = "TIPS_INGRESS_KAFKA_TOPIC",
+        default_value = "tips-ingress"
     )]
-    kafka_topic: String,
+    ingress_topic: String,

-    #[arg(long, env = "TIPS_INGRESS_WRITER_KAFKA_GROUP_ID")]
-    kafka_group_id: String,
+    #[arg(
+        long,
+        env = "TIPS_INGRESS_WRITER_AUDIT_TOPIC",
+        default_value = "tips-audit"
+    )]
+    audit_topic: String,

     #[arg(long, env = "TIPS_INGRESS_WRITER_LOG_LEVEL", default_value = "info")]
     log_level: String,

@@ -129,31 +134,20 @@ async fn main() -> Result<()> {
         .with_env_filter(&args.log_level)
         .init();

-    let mut config = ClientConfig::new();
-    config
-        .set("group.id", &args.kafka_group_id)
-        .set("bootstrap.servers", &args.kafka_brokers)
-        .set("auto.offset.reset", "earliest")
-        .set("enable.partition.eof", "false")
-        .set("session.timeout.ms", "6000")
-        .set("enable.auto.commit", "true");
-
-    let kafka_producer: FutureProducer = ClientConfig::new()
-        .set("bootstrap.servers", &args.kafka_brokers)
-        .set("message.timeout.ms", "5000")
-        .create()?;
-
-    let publisher = KafkaBundleEventPublisher::new(kafka_producer, "tips-audit".to_string());
+    let config = load_kafka_config_from_file(&args.kafka_properties_file)?;
+    let kafka_producer: FutureProducer = config.create()?;
+
+    let publisher = KafkaBundleEventPublisher::new(kafka_producer, args.audit_topic.clone());
     let consumer = config.create()?;

     let bundle_store = PostgresDatastore::connect(args.database_url).await?;
     bundle_store.run_migrations().await?;

-    let writer = IngressWriter::new(consumer, args.kafka_topic.clone(), bundle_store, publisher)?;
+    let writer = IngressWriter::new(consumer, args.ingress_topic.clone(), bundle_store, publisher)?;

     info!(
         "Ingress Writer service started, consuming from topic: {}",
-        args.kafka_topic
+        args.ingress_topic
     );
     loop {
         match writer.insert_bundle().await {

@@ -167,3 +161,22 @@ async fn main() -> Result<()> {
         }
     }
 }
+
+fn load_kafka_config_from_file(properties_file_path: &str) -> Result<ClientConfig> {
+    let kafka_properties = fs::read_to_string(properties_file_path)?;
+    info!("Kafka properties:\n{}", kafka_properties);
+
+    let mut client_config = ClientConfig::new();
+
+    for line in kafka_properties.lines() {
+        let line = line.trim();
+        if line.is_empty() || line.starts_with('#') {
+            continue;
+        }
+        if let Some((key, value)) = line.split_once('=') {
+            client_config.set(key.trim(), value.trim());
+        }
+    }
+
+    Ok(client_config)
+}
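
Note that the writer now builds both its FutureProducer and its consumer from the same ClientConfig (config.create() is called twice), so one properties file has to carry producer and consumer settings together — including a group.id, which rdkafka requires for a consumer. The committed docker/ingress-writer-kafka-properties is not shown in this view; by analogy with the audit file and the settings removed above, it would presumably look something like this hypothetical sketch:

# Kafka configuration properties for ingress-writer service (hypothetical sketch)
bootstrap.servers=host.docker.internal:9094
message.timeout.ms=5000
group.id=local-writer
auto.offset.reset=earliest
enable.partition.eof=false
session.timeout.ms=6000
enable.auto.commit=true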

crates/maintenance/src/main.rs

Lines changed: 24 additions & 6 deletions

@@ -9,6 +9,7 @@ use clap::Parser;
 use op_alloy_network::Optimism;
 use rdkafka::ClientConfig;
 use rdkafka::producer::FutureProducer;
+use std::fs;
 use std::sync::Arc;
 use tips_audit::KafkaBundleEventPublisher;
 use tips_datastore::PostgresDatastore;

@@ -19,8 +20,8 @@ use url::Url;
 #[derive(Parser, Clone)]
 #[command(author, version, about, long_about = None)]
 pub struct Args {
-    #[arg(long, env = "TIPS_MAINTENANCE_KAFKA_BROKERS")]
-    pub kafka_brokers: String,
+    #[arg(long, env = "TIPS_MAINTENANCE_KAFKA_PROPERTIES_FILE")]
+    pub kafka_properties_file: String,

     #[arg(
         long,

@@ -100,10 +101,8 @@ async fn main() -> Result<()> {

     let datastore = PostgresDatastore::connect(args.database_url.clone()).await?;

-    let kafka_producer: FutureProducer = ClientConfig::new()
-        .set("bootstrap.servers", &args.kafka_brokers)
-        .set("message.timeout.ms", "5000")
-        .create()?;
+    let client_config = load_kafka_config_from_file(&args.kafka_properties_file)?;
+    let kafka_producer: FutureProducer = client_config.create()?;

     let publisher = KafkaBundleEventPublisher::new(kafka_producer, args.kafka_topic.clone());

@@ -125,3 +124,22 @@ async fn main() -> Result<()> {

     Ok(())
 }
+
+fn load_kafka_config_from_file(properties_file_path: &str) -> Result<ClientConfig> {
+    let kafka_properties = fs::read_to_string(properties_file_path)?;
+    info!("Kafka properties:\n{}", kafka_properties);
+
+    let mut client_config = ClientConfig::new();
+
+    for line in kafka_properties.lines() {
+        let line = line.trim();
+        if line.is_empty() || line.starts_with('#') {
+            continue;
+        }
+        if let Some((key, value)) = line.split_once('=') {
+            client_config.set(key.trim(), value.trim());
+        }
+    }
+
+    Ok(client_config)
+}
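
The maintenance service only ever creates a FutureProducer from this config, so its properties file (also not shown in this view) needs only producer settings — presumably mirroring docker/ingress-kafka-properties, as in this hypothetical sketch:

# Kafka configuration properties for maintenance service (hypothetical sketch)
bootstrap.servers=host.docker.internal:9094
message.timeout.ms=5000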

docker-compose.tips.yml

Lines changed: 6 additions & 0 deletions

@@ -19,6 +19,8 @@ services:
     container_name: tips-audit
     env_file:
       - .env.docker
+    volumes:
+      - ./docker/audit-kafka-properties:/app/docker/audit-kafka-properties:ro
     restart: unless-stopped

   maintenance:

@@ -28,6 +30,8 @@ services:
     container_name: tips-maintenance
     env_file:
       - .env.docker
+    volumes:
+      - ./docker/maintenance-kafka-properties:/app/docker/maintenance-kafka-properties:ro
     restart: unless-stopped

   ui:

@@ -48,4 +52,6 @@ services:
     container_name: tips-ingress-writer
     env_file:
       - .env.docker
+    volumes:
+      - ./docker/ingress-writer-kafka-properties:/app/docker/ingress-writer-kafka-properties:ro
     restart: unless-stopped

docker-compose.yml

Lines changed: 1 addition & 1 deletion

@@ -56,7 +56,7 @@ services:
     command: |
       sh -c "
         kafka-topics --create --if-not-exists --topic tips-audit --bootstrap-server kafka:29092 --partitions 3 --replication-factor 1
-        kafka-topics --create --if-not-exists --topic tips-ingress-rpc --bootstrap-server kafka:29092 --partitions 3 --replication-factor 1
+        kafka-topics --create --if-not-exists --topic tips-ingress --bootstrap-server kafka:29092 --partitions 3 --replication-factor 1
         kafka-topics --list --bootstrap-server kafka:29092
       "
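
With this, the topic rename is applied end to end: .env.example, the clap default_value in both ingress-rpc and ingress-writer, and the bootstrap script above all now use tips-ingress.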

docker/audit-kafka-properties

Lines changed: 10 additions & 0 deletions

@@ -0,0 +1,10 @@
+# Kafka configuration properties for audit service
+bootstrap.servers=host.docker.internal:9094
+message.timeout.ms=5000
+group.id=local-audit
+enable.partition.eof=false
+session.timeout.ms=6000
+enable.auto.commit=false
+auto.offset.reset=earliest
+fetch.wait.max.ms=100
+fetch.min.bytes=1
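
This file folds together what was previously split between env vars and hard-coded Rust: bootstrap.servers replaces TIPS_AUDIT_KAFKA_BROKERS, group.id replaces the removed TIPS_AUDIT_KAFKA_GROUP_ID, and the remaining keys reproduce the consumer options that create_kafka_consumer used to set inline.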

docker/ingress-kafka-properties

Lines changed: 1 addition & 1 deletion

@@ -1,3 +1,3 @@
 # Kafka configuration properties for ingress service
-bootstrap.servers=host.docker.internal:9092
+bootstrap.servers=host.docker.internal:9094
 message.timeout.ms=5000
