@@ -4,6 +4,7 @@ use jsonrpsee::server::Server;
44use op_alloy_network:: Optimism ;
55use rdkafka:: ClientConfig ;
66use rdkafka:: producer:: FutureProducer ;
7+ use std:: fs;
78use std:: net:: IpAddr ;
89use tracing:: { info, warn} ;
910use tracing_subscriber:: { layer:: SubscriberExt , util:: SubscriberInitExt } ;
@@ -35,24 +36,16 @@ struct Config {
3536 dual_write_mempool : bool ,
3637
3738 /// Kafka brokers for publishing mempool events
38- #[ arg( long, env = "TIPS_INGRESS_KAFKA_BROKERS" ) ]
39- kafka_brokers : String ,
40-
41- /// Kafka topic for publishing mempool events
42- #[ arg(
43- long,
44- env = "TIPS_INGRESS_KAFKA_TOPIC" ,
45- default_value = "mempool-events"
46- ) ]
47- kafka_topic : String ,
39+ #[ arg( long, env = "TIPS_INGRESS_KAFKA_INGRESS_PROPERTIES_FILE" ) ]
40+ ingress_kafka_properties : String ,
4841
4942 /// Kafka topic for queuing transactions before the DB Writer
5043 #[ arg(
5144 long,
52- env = "TIPS_INGRESS_KAFKA_QUEUE_TOPIC " ,
45+ env = "TIPS_INGRESS_KAFKA_INGRESS_TOPIC " ,
5346 default_value = "tips-ingress-rpc"
5447 ) ]
55- queue_topic : String ,
48+ ingress_topic : String ,
5649
5750 #[ arg( long, env = "TIPS_INGRESS_LOG_LEVEL" , default_value = "info" ) ]
5851 log_level : String ,
@@ -106,12 +99,22 @@ async fn main() -> anyhow::Result<()> {
10699 . network :: < Optimism > ( )
107100 . connect_http ( config. mempool_url ) ;
108101
109- let queue_producer: FutureProducer = ClientConfig :: new ( )
110- . set ( "bootstrap.servers" , & config. kafka_brokers )
111- . set ( "message.timeout.ms" , "5000" )
112- . create ( ) ?;
102+ let kafka_properties = fs:: read_to_string ( & config. ingress_kafka_properties ) ?;
103+ let mut client_config = ClientConfig :: new ( ) ;
104+
105+ for line in kafka_properties. lines ( ) {
106+ let line = line. trim ( ) ;
107+ if line. is_empty ( ) || line. starts_with ( '#' ) {
108+ continue ;
109+ }
110+ if let Some ( ( key, value) ) = line. split_once ( '=' ) {
111+ client_config. set ( key. trim ( ) , value. trim ( ) ) ;
112+ }
113+ }
114+
115+ let queue_producer: FutureProducer = client_config. create ( ) ?;
113116
114- let queue = KafkaQueuePublisher :: new ( queue_producer, config. queue_topic ) ;
117+ let queue = KafkaQueuePublisher :: new ( queue_producer, config. ingress_topic ) ;
115118
116119 let service = IngressService :: new (
117120 provider,
0 commit comments