Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][client] Prevent retry topic and dead letter topic producer leaks when sending of message fails #23824

Merged
merged 9 commits into from
Jan 9, 2025
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.client.api;

import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertNotNull;
import static org.testng.Assert.assertNull;
Expand All @@ -40,9 +41,11 @@
import lombok.Cleanup;
import lombok.Data;
import org.apache.avro.reflect.Nullable;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.ConsumerBuilderImpl;
import org.apache.pulsar.client.util.RetryMessageUtil;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.awaitility.Awaitility;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -1167,4 +1170,94 @@ public void testDeadLetterPolicyDeserialize() throws Exception {
consumerBuilder.loadConf(config);
assertEquals(((ConsumerBuilderImpl)consumerBuilder).getConf().getDeadLetterPolicy(), policy);
}

@Data
static class Payload {
String number;

public Payload() {

}

public Payload(String number) {
this.number = number;
}
}

@Data
static class PayloadIncompatible {
long number;

public PayloadIncompatible() {

}

public PayloadIncompatible(long number) {
this.number = number;
}
}

// reproduce issue reported in https://github.com/apache/pulsar/issues/20635#issuecomment-1709616321
@Test
public void testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak() throws Exception {
String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns");
admin.namespaces().createNamespace(namespace);
// don't enforce schema validation
admin.namespaces().setSchemaValidationEnforced(namespace, false);
// set schema compatibility strategy to always compatible
admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);

Schema<Payload> schema = Schema.AVRO(Payload.class);
Schema<PayloadIncompatible> schemaIncompatible = Schema.AVRO(PayloadIncompatible.class);
String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace
+ "/testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak");
String dlqTopic = topic + "-DLQ";

// create topics
admin.topics().createNonPartitionedTopic(topic);
admin.topics().createNonPartitionedTopic(dlqTopic);

AtomicInteger nackCounter = new AtomicInteger(0);
Consumer<Payload> payloadConsumer = null;
try {
payloadConsumer = pulsarClient.newConsumer(schema).topic(topic)
.subscriptionType(SubscriptionType.Shared).subscriptionName("sub")
.ackTimeout(1, TimeUnit.SECONDS)
.negativeAckRedeliveryDelay(1, TimeUnit.MILLISECONDS)
.deadLetterPolicy(DeadLetterPolicy.builder().maxRedeliverCount(3).deadLetterTopic(dlqTopic).build())
.messageListener((c, msg) -> {
if (nackCounter.incrementAndGet() < 10) {
c.negativeAcknowledge(msg);
}
}).subscribe();

// send a message to the topic with the incompatible schema
PayloadIncompatible payloadIncompatible = new PayloadIncompatible(123);
try (Producer<PayloadIncompatible> producer = pulsarClient.newProducer(schemaIncompatible).topic(topic)
.create()) {
producer.send(payloadIncompatible);
}

Thread.sleep(2000L);

assertThat(pulsar.getBrokerService().getTopicReference(dlqTopic).get().getProducers().size())
.describedAs("producer count of dlq topic %s should be <= 1 so that it doesn't leak producers",
dlqTopic)
.isLessThanOrEqualTo(1);

} finally {
if (payloadConsumer != null) {
try {
payloadConsumer.close();
} catch (PulsarClientException e) {
// ignore
}
}
}

assertThat(pulsar.getBrokerService().getTopicReference(dlqTopic).get().getProducers().size())
.describedAs("producer count of dlq topic %s should be 0 here",
dlqTopic)
.isEqualTo(0);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,12 @@
*/
package org.apache.pulsar.client.api;

import static org.assertj.core.api.Assertions.assertThat;
import static org.testng.Assert.assertEquals;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand All @@ -36,11 +36,10 @@
import lombok.Data;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.reflect.Nullable;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.util.RetryMessageUtil;
import org.reflections.ReflectionUtils;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
Expand Down Expand Up @@ -617,10 +616,12 @@ public void testRetryTopicByCustomTopicName() throws Exception {

@Test(timeOut = 30000L)
public void testRetryTopicException() throws Exception {
final String topic = "persistent://my-property/my-ns/retry-topic";
String retryLetterTopic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/retry-topic");
final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/input-topic");
final int maxRedeliveryCount = 2;
final int sendMessages = 1;
// subscribe before publish
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
Expand All @@ -629,7 +630,7 @@ public void testRetryTopicException() throws Exception {
.receiverQueueSize(100)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
.retryLetterTopic(retryLetterTopic)
.build())
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
Expand All @@ -642,30 +643,16 @@ public void testRetryTopicException() throws Exception {
}
producer.close();

// mock a retry producer exception when reconsumelater is called
MultiTopicsConsumerImpl<byte[]> multiTopicsConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer;
List<ConsumerImpl<byte[]>> consumers = multiTopicsConsumer.getConsumers();
for (ConsumerImpl<byte[]> c : consumers) {
Set<Field> deadLetterPolicyField =
ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy"));

if (deadLetterPolicyField.size() != 0) {
Field field = deadLetterPolicyField.iterator().next();
field.setAccessible(true);
DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c);
deadLetterPolicy.setRetryLetterTopic("#persistent://invlaid-topic#");
}
}
admin.topics().terminateTopic(retryLetterTopic);

Message<byte[]> message = consumer.receive();
log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
try {
consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
} catch (PulsarClientException.InvalidTopicNameException e) {
assertEquals(e.getClass(), PulsarClientException.InvalidTopicNameException.class);
} catch (Exception e) {
fail("exception should be PulsarClientException.InvalidTopicNameException");
fail("exception should be PulsarClientException.TopicTerminatedException");
} catch (PulsarClientException.TopicTerminatedException e) {
// ok
}
consumer.close();
}


Expand Down Expand Up @@ -718,10 +705,12 @@ public void testRetryProducerWillCloseByConsumer() throws Exception {

@Test(timeOut = 30000L)
public void testRetryTopicExceptionWithConcurrent() throws Exception {
final String topic = "persistent://my-property/my-ns/retry-topic";
String retryLetterTopic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/retry-topic");
final String topic = BrokerTestUtil.newUniqueName("persistent://my-property/my-ns/input-topic");
final int maxRedeliveryCount = 2;
final int sendMessages = 10;
// subscribe before publish
@Cleanup
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
Expand All @@ -730,7 +719,7 @@ public void testRetryTopicExceptionWithConcurrent() throws Exception {
.receiverQueueSize(100)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
.retryLetterTopic(retryLetterTopic)
.build())
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();
Expand All @@ -739,24 +728,11 @@ public void testRetryTopicExceptionWithConcurrent() throws Exception {
.topic(topic)
.create();
for (int i = 0; i < sendMessages; i++) {
producer.newMessage().key("1").value(String.format("Hello Pulsar [%d]", i).getBytes()).send();
producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
}
producer.close();

// mock a retry producer exception when reconsumelater is called
MultiTopicsConsumerImpl<byte[]> multiTopicsConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer;
List<ConsumerImpl<byte[]>> consumers = multiTopicsConsumer.getConsumers();
for (ConsumerImpl<byte[]> c : consumers) {
Set<Field> deadLetterPolicyField =
ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy"));

if (deadLetterPolicyField.size() != 0) {
Field field = deadLetterPolicyField.iterator().next();
field.setAccessible(true);
DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c);
deadLetterPolicy.setRetryLetterTopic("#persistent://invalid-topic#");
}
}
admin.topics().terminateTopic(retryLetterTopic);

List<Message<byte[]>> messages = Lists.newArrayList();
for (int i = 0; i < sendMessages; i++) {
Expand All @@ -769,16 +745,114 @@ public void testRetryTopicExceptionWithConcurrent() throws Exception {
new Thread(() -> {
try {
consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
} catch (Exception ignore) {

} finally {
} catch (PulsarClientException.TopicTerminatedException e) {
// ok
latch.countDown();
} catch (PulsarClientException e) {
// unexpected exception
fail("unexpected exception", e);
}
}).start();
}

latch.await();
latch.await(sendMessages, TimeUnit.SECONDS);
consumer.close();
}

@Data
static class Payload {
String number;

public Payload() {

}

public Payload(String number) {
this.number = number;
}
}

@Data
static class PayloadIncompatible {
long number;

public PayloadIncompatible() {

}

public PayloadIncompatible(long number) {
this.number = number;
}
}

// reproduce similar issue as reported in https://github.com/apache/pulsar/issues/20635#issuecomment-1709616321
// but for retry topic
@Test
public void testCloseRetryLetterTopicProducerOnExceptionToPreventProducerLeak() throws Exception {
String namespace = BrokerTestUtil.newUniqueName("my-property/my-ns");
admin.namespaces().createNamespace(namespace);
// don't enforce schema validation
admin.namespaces().setSchemaValidationEnforced(namespace, false);
// set schema compatibility strategy to always compatible
admin.namespaces().setSchemaCompatibilityStrategy(namespace, SchemaCompatibilityStrategy.ALWAYS_COMPATIBLE);

Schema<Payload> schema = Schema.AVRO(Payload.class);
Schema<PayloadIncompatible> schemaIncompatible = Schema.AVRO(
PayloadIncompatible.class);
String topic = BrokerTestUtil.newUniqueName("persistent://" + namespace
+ "/testCloseDeadLetterTopicProducerOnExceptionToPreventProducerLeak");
String dlqTopic = topic + "-DLQ";
String retryTopic = topic + "-RETRY";

// create topics
admin.topics().createNonPartitionedTopic(topic);
admin.topics().createNonPartitionedTopic(dlqTopic);
admin.topics().createNonPartitionedTopic(retryTopic);

Consumer<Payload> payloadConsumer = null;
try {
payloadConsumer = pulsarClient.newConsumer(schema).topic(topic)
.subscriptionType(SubscriptionType.Shared).subscriptionName("sub")
.ackTimeout(1, TimeUnit.SECONDS)
.negativeAckRedeliveryDelay(1, TimeUnit.MILLISECONDS)
.enableRetry(true)
.deadLetterPolicy(DeadLetterPolicy.builder().retryLetterTopic(retryTopic).maxRedeliverCount(3)
.deadLetterTopic(dlqTopic).build())
.messageListener((c, msg) -> {
try {
c.reconsumeLater(msg, 1, TimeUnit.MILLISECONDS);
} catch (PulsarClientException e) {
throw new RuntimeException(e);
}
}).subscribe();

// send a message to the topic with the incompatible schema
PayloadIncompatible payloadIncompatible = new PayloadIncompatible(123);
try (Producer<PayloadIncompatible> producer = pulsarClient.newProducer(schemaIncompatible).topic(topic)
.create()) {
producer.send(payloadIncompatible);
}

Thread.sleep(2000L);

assertThat(pulsar.getBrokerService().getTopicReference(retryTopic).get().getProducers().size())
.describedAs("producer count of retry topic %s should be <= 1 so that it doesn't leak producers",
retryTopic)
.isLessThanOrEqualTo(1);

} finally {
if (payloadConsumer != null) {
try {
payloadConsumer.close();
} catch (PulsarClientException e) {
// ignore
}
}
}

assertThat(pulsar.getBrokerService().getTopicReference(retryTopic).get().getProducers().size())
.describedAs("producer count of retry topic %s should be 0 here",
retryTopic)
.isEqualTo(0);
}
}
Loading
Loading