File tree Expand file tree Collapse file tree 1 file changed +6
-13
lines changed
pulsar-broker/src/test/java/org/apache/pulsar/client/api Expand file tree Collapse file tree 1 file changed +6
-13
lines changed Original file line number Diff line number Diff line change @@ -262,8 +262,10 @@ public void testNonDurableSubscriptionBackLogAfterTopicUnload() throws Exception
262262 String subName = "test-sub" ;
263263
264264 admin .topics ().createNonPartitionedTopic (topicName );
265+ @ Cleanup
265266 Producer <byte []> producer = pulsarClient .newProducer ().topic (topicName ).create ();
266267
268+ @ Cleanup
267269 Consumer <byte []> consumer = pulsarClient .newConsumer ()
268270 .topic (topicName )
269271 .subscriptionName (subName )
@@ -274,23 +276,14 @@ public void testNonDurableSubscriptionBackLogAfterTopicUnload() throws Exception
274276 String message = "my-message-" + i ;
275277 producer .send (message .getBytes ());
276278 }
277- producer .close ();
278279
279280 assertEquals (admin .topics ().getStats (topicName ).getSubscriptions ().get (subName ).getMsgBacklog (), 10 );
280281
281282 // 2. receive the message
282- Thread t = new Thread (() -> {
283- while (true ) {
284- Message <byte []> msg ;
285- try {
286- msg = consumer .receive ();
287- consumer .acknowledge (msg );
288- } catch (PulsarClientException e ) {
289- throw new RuntimeException (e );
290- }
291- }
292- });
293- t .start ();
283+ for (int i = 0 ; i < 10 ; i ++) {
284+ Message <byte []> msg = consumer .receive ();
285+ consumer .acknowledge (msg );
286+ }
294287
295288 // 3. consumed all messages and the msgBacklog is 0
296289 Awaitility .await ().untilAsserted (() ->
You can’t perform that action at this time.
0 commit comments