Skip to content

Commit

Permalink
Fix invalid tests in RetryTopicTest and fix MultiTopicsConsumerImpl.c…
Browse files Browse the repository at this point in the history
…lose
  • Loading branch information
lhotari committed Jan 9, 2025
1 parent 16c82e4 commit 1e509d0
Show file tree
Hide file tree
Showing 3 changed files with 51 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -39,11 +38,8 @@
import org.apache.avro.reflect.Nullable;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.util.RetryMessageUtil;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.reflections.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -620,10 +616,12 @@ public void testRetryTopicByCustomTopicName() throws Exception {

@Test(timeOut = 30000L)
public void testRetryTopicException() throws Exception {
final String topic = "persistent://my-property/my-ns/retry-topic";
String retryLetterTopic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/retry-topic");
final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/input-topic");
final int maxRedeliveryCount = 2;
final int sendMessages = 1;
// subscribe before publish
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
Expand All @@ -632,7 +630,7 @@ public void testRetryTopicException() throws Exception {
.receiverQueueSize(100)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
.retryLetterTopic(retryLetterTopic)
.build())
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
Expand All @@ -645,30 +643,16 @@ public void testRetryTopicException() throws Exception {
}
producer.close();

// mock a retry producer exception when reconsumelater is called
MultiTopicsConsumerImpl<byte[]> multiTopicsConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer;
List<ConsumerImpl<byte[]>> consumers = multiTopicsConsumer.getConsumers();
for (ConsumerImpl<byte[]> c : consumers) {
Set<Field> deadLetterPolicyField =
ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy"));

if (deadLetterPolicyField.size() != 0) {
Field field = deadLetterPolicyField.iterator().next();
field.setAccessible(true);
DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c);
deadLetterPolicy.setRetryLetterTopic("#persistent://invlaid-topic#");
}
}
admin.topics().terminateTopic(retryLetterTopic);

Message<byte[]> message = consumer.receive();
log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
try {
consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
} catch (PulsarClientException.InvalidTopicNameException e) {
assertEquals(e.getClass(), PulsarClientException.InvalidTopicNameException.class);
} catch (Exception e) {
fail("exception should be PulsarClientException.InvalidTopicNameException");
fail("exception should be PulsarClientException.TopicTerminatedException");
} catch (PulsarClientException.TopicTerminatedException e) {
// ok
}
consumer.close();
}


Expand Down Expand Up @@ -721,10 +705,12 @@ public void testRetryProducerWillCloseByConsumer() throws Exception {

@Test(timeOut = 30000L)
public void testRetryTopicExceptionWithConcurrent() throws Exception {
final String topic = "persistent://my-property/my-ns/retry-topic";
String retryLetterTopic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/retry-topic");
final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/input-topic");
final int maxRedeliveryCount = 2;
final int sendMessages = 10;
// subscribe before publish
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
Expand All @@ -733,7 +719,7 @@ public void testRetryTopicExceptionWithConcurrent() throws Exception {
.receiverQueueSize(100)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
.retryLetterTopic(retryLetterTopic)
.build())
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
Expand All @@ -742,24 +728,11 @@ public void testRetryTopicExceptionWithConcurrent() throws Exception {
.topic(topic)
.create();
for (int i = 0; i < sendMessages; i++) {
producer.newMessage().key("1").value(String.format("Hello Pulsar [%d]", i).getBytes()).send();
producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
}
producer.close();

// mock a retry producer exception when reconsumelater is called
MultiTopicsConsumerImpl<byte[]> multiTopicsConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer;
List<ConsumerImpl<byte[]>> consumers = multiTopicsConsumer.getConsumers();
for (ConsumerImpl<byte[]> c : consumers) {
Set<Field> deadLetterPolicyField =
ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy"));

if (deadLetterPolicyField.size() != 0) {
Field field = deadLetterPolicyField.iterator().next();
field.setAccessible(true);
DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c);
deadLetterPolicy.setRetryLetterTopic("#persistent://invalid-topic#");
}
}
admin.topics().terminateTopic(retryLetterTopic);

List<Message<byte[]>> messages = Lists.newArrayList();
for (int i = 0; i < sendMessages; i++) {
Expand All @@ -772,15 +745,17 @@ public void testRetryTopicExceptionWithConcurrent() throws Exception {
new Thread(() -> {
try {
consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
} catch (Exception ignore) {

} finally {
} catch (PulsarClientException.TopicTerminatedException e) {
// ok
latch.countDown();
} catch (PulsarClientException e) {
// unexpected exception
fail("unexpected exception", e);
}
}).start();
}

latch.await();
latch.await(sendMessages, TimeUnit.SECONDS);
consumer.close();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1109,10 +1109,29 @@ public void connectionFailed(PulsarClientException exception) {
public synchronized CompletableFuture<Void> closeAsync() {
CompletableFuture<Void> closeFuture = new CompletableFuture<>();

ArrayList<CompletableFuture<Void>> closeFutures = new ArrayList<>(4);
closeFutures.add(closeFuture);
if (retryLetterProducer != null) {
closeFutures.add(retryLetterProducer.thenCompose(p -> p.closeAsync()).whenComplete((ignore, ex) -> {
if (ex != null) {
log.warn("Exception ignored in closing retryLetterProducer of consumer", ex);
}
}));
}
if (deadLetterProducer != null) {
closeFutures.add(deadLetterProducer.thenCompose(p -> p.closeAsync()).whenComplete((ignore, ex) -> {
if (ex != null) {
log.warn("Exception ignored in closing deadLetterProducer of consumer", ex);
}
}));
}
CompletableFuture<Void> compositeCloseFuture = FutureUtil.waitForAll(closeFutures);


if (getState() == State.Closing || getState() == State.Closed) {
closeConsumerTasks();
failPendingReceive().whenComplete((r, t) -> closeFuture.complete(null));
return closeFuture;
return compositeCloseFuture;
}

consumersClosedCounter.increment();
Expand All @@ -1124,7 +1143,7 @@ public synchronized CompletableFuture<Void> closeAsync() {
deregisterFromClientCnx();
client.cleanupConsumer(this);
failPendingReceive().whenComplete((r, t) -> closeFuture.complete(null));
return closeFuture;
return compositeCloseFuture;
}

stats.getStatTimeout().ifPresent(Timeout::cancel);
Expand All @@ -1151,23 +1170,7 @@ public synchronized CompletableFuture<Void> closeAsync() {
});
}

ArrayList<CompletableFuture<Void>> closeFutures = new ArrayList<>(4);
closeFutures.add(closeFuture);
if (retryLetterProducer != null) {
closeFutures.add(retryLetterProducer.thenCompose(p -> p.closeAsync()).whenComplete((ignore, ex) -> {
if (ex != null) {
log.warn("Exception ignored in closing retryLetterProducer of consumer", ex);
}
}));
}
if (deadLetterProducer != null) {
closeFutures.add(deadLetterProducer.thenCompose(p -> p.closeAsync()).whenComplete((ignore, ex) -> {
if (ex != null) {
log.warn("Exception ignored in closing deadLetterProducer of consumer", ex);
}
}));
}
return FutureUtil.waitForAll(closeFutures);
return compositeCloseFuture;
}

private void cleanupAtClose(CompletableFuture<Void> closeFuture, Throwable exception) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -638,7 +638,14 @@ public CompletableFuture<Void> closeAsync() {

CompletableFuture<Void> closeFuture = new CompletableFuture<>();
List<CompletableFuture<Void>> futureList = consumers.values().stream()
.map(ConsumerImpl::closeAsync).collect(Collectors.toList());
.map(consumer -> consumer.closeAsync().exceptionally(t -> {
Throwable cause = FutureUtil.unwrapCompletionException(t);
if (!(cause instanceof PulsarClientException.AlreadyClosedException)) {
log.warn("[{}] [{}] Error closing individual consumer", consumer.getTopic(),
consumer.getSubscription(), cause);
}
return null;
})).collect(Collectors.toList());

FutureUtil.waitForAll(futureList)
.thenComposeAsync((r) -> {
Expand Down

0 comments on commit 1e509d0

Please sign in to comment.