File tree Expand file tree Collapse file tree 1 file changed +33
-0
lines changed
Expand file tree Collapse file tree 1 file changed +33
-0
lines changed Original file line number Diff line number Diff line change @@ -78,3 +78,36 @@ impl QueuePublisher for KafkaQueuePublisher {
7878 self . enqueue_bundle ( bundle, sender) . await
7979 }
8080}
81+
82+ #[ cfg( test) ]
83+ mod tests {
84+ use super :: * ;
85+ use rdkafka:: config:: ClientConfig ;
86+ use tokio:: time:: { Duration , Instant } ;
87+
88+ fn create_test_bundle ( ) -> EthSendBundle {
89+ EthSendBundle :: default ( )
90+ }
91+
92+ #[ tokio:: test]
93+ async fn test_backoff_retry_logic ( ) {
94+ // use an invalid broker address to trigger the backoff logic
95+ let producer = ClientConfig :: new ( )
96+ . set ( "bootstrap.servers" , "localhost:9999" )
97+ . set ( "message.timeout.ms" , "100" )
98+ . create ( )
99+ . expect ( "Producer creation failed" ) ;
100+
101+ let publisher = KafkaQueuePublisher :: new ( producer, "tips-ingress" . to_string ( ) ) ;
102+ let bundle = create_test_bundle ( ) ;
103+ let sender = Address :: ZERO ;
104+
105+ let start = Instant :: now ( ) ;
106+ let result = publisher. enqueue_bundle ( & bundle, sender) . await ;
107+ let elapsed = start. elapsed ( ) ;
108+
109+ // the backoff tries at minimum 100ms, so verify we tried at least once
110+ assert ! ( result. is_err( ) ) ;
111+ assert ! ( elapsed >= Duration :: from_millis( 100 ) ) ;
112+ }
113+ }
You can’t perform that action at this time.
0 commit comments