@@ -78,3 +78,51 @@ impl QueuePublisher for KafkaQueuePublisher {
7878 self . enqueue_bundle ( bundle, sender) . await
7979 }
8080}
81+
82+ #[ cfg( test) ]
83+ mod tests {
84+ use super :: * ;
85+ use rdkafka:: config:: ClientConfig ;
86+ use tokio:: time:: { Duration , Instant } ;
87+
88+ fn create_test_bundle ( ) -> EthSendBundle {
89+ EthSendBundle :: default ( )
90+ }
91+
92+ #[ tokio:: test]
93+ async fn test_successful_enqueue ( ) {
94+ let producer = ClientConfig :: new ( )
95+ . set ( "bootstrap.servers" , "localhost:9092" )
96+ . set ( "message.timeout.ms" , "5000" )
97+ . create ( )
98+ . expect ( "Producer creation failed" ) ;
99+ let publisher = KafkaQueuePublisher :: new ( producer, "tips-ingress" . to_string ( ) ) ;
100+ let bundle = create_test_bundle ( ) ;
101+ let sender = Address :: ZERO ;
102+
103+ let result = publisher. enqueue_bundle ( & bundle, sender) . await ;
104+ assert ! ( result. is_ok( ) ) ;
105+ }
106+
107+ #[ tokio:: test]
108+ async fn test_backoff_retry_logic ( ) {
109+ // use an invalid broker address to trigger the backoff logic
110+ let producer = ClientConfig :: new ( )
111+ . set ( "bootstrap.servers" , "localhost:9999" )
112+ . set ( "message.timeout.ms" , "100" )
113+ . create ( )
114+ . expect ( "Producer creation failed" ) ;
115+
116+ let publisher = KafkaQueuePublisher :: new ( producer, "tips-ingress" . to_string ( ) ) ;
117+ let bundle = create_test_bundle ( ) ;
118+ let sender = Address :: ZERO ;
119+
120+ let start = Instant :: now ( ) ;
121+ let result = publisher. enqueue_bundle ( & bundle, sender) . await ;
122+ let elapsed = start. elapsed ( ) ;
123+
124+ // the backoff tries at minimum 100ms, so verify we tried at least once
125+ assert ! ( result. is_err( ) ) ;
126+ assert ! ( elapsed >= Duration :: from_millis( 100 ) ) ;
127+ }
128+ }
0 commit comments