Skip to content

Commit

Permalink
Remove invalid test cases
Browse files Browse the repository at this point in the history
  • Loading branch information
lhotari committed Jan 9, 2025
1 parent 615dd41 commit 9500a90
Showing 1 changed file with 0 additions and 129 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -23,33 +23,25 @@
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertNull;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;
import java.lang.reflect.Field;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import lombok.Data;
import org.apache.avro.AvroRuntimeException;
import org.apache.avro.reflect.Nullable;
import org.apache.pulsar.broker.BrokerTestUtil;
import org.apache.pulsar.client.api.schema.GenericRecord;
import org.apache.pulsar.client.impl.ConsumerImpl;
import org.apache.pulsar.client.impl.MultiTopicsConsumerImpl;
import org.apache.pulsar.client.util.RetryMessageUtil;
import org.apache.pulsar.common.policies.data.SchemaCompatibilityStrategy;
import org.reflections.ReflectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.Test;
import org.testng.collections.Lists;

@Test(groups = "broker-api")
public class RetryTopicTest extends ProducerConsumerBase {
Expand Down Expand Up @@ -617,61 +609,6 @@ public void testRetryTopicByCustomTopicName() throws Exception {
checkConsumer.close();
}


@Test(timeOut = 30000L)
public void testRetryTopicException() throws Exception {
final String topic = "persistent://my-property/my-ns/retry-topic";
final int maxRedeliveryCount = 2;
final int sendMessages = 1;
// subscribe before publish
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.receiverQueueSize(100)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
.build())
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topic)
.create();
for (int i = 0; i < sendMessages; i++) {
producer.send(String.format("Hello Pulsar [%d]", i).getBytes());
}
producer.close();

// mock a retry producer exception when reconsumelater is called
MultiTopicsConsumerImpl<byte[]> multiTopicsConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer;
List<ConsumerImpl<byte[]>> consumers = multiTopicsConsumer.getConsumers();
for (ConsumerImpl<byte[]> c : consumers) {
Set<Field> deadLetterPolicyField =
ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy"));

if (deadLetterPolicyField.size() != 0) {
Field field = deadLetterPolicyField.iterator().next();
field.setAccessible(true);
DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c);
deadLetterPolicy.setRetryLetterTopic("#persistent://invlaid-topic#");
}
}
Message<byte[]> message = consumer.receive();
log.info("consumer received message : {} {}", message.getMessageId(), new String(message.getData()));
try {
consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
} catch (PulsarClientException.InvalidTopicNameException e) {
assertEquals(e.getClass(), PulsarClientException.InvalidTopicNameException.class);
} catch (Exception e) {
fail("exception should be PulsarClientException.InvalidTopicNameException");
}
consumer.close();
}


@Test(timeOut = 30000L)
public void testRetryProducerWillCloseByConsumer() throws Exception {
final String topicName = "persistent://my-property/my-ns/tp_" + UUID.randomUUID().toString();
Expand Down Expand Up @@ -718,72 +655,6 @@ public void testRetryProducerWillCloseByConsumer() throws Exception {
admin.topics().delete(topicDLQ, false);
}


@Test(timeOut = 30000L)
public void testRetryTopicExceptionWithConcurrent() throws Exception {
final String topic = "persistent://my-property/my-ns/retry-topic";
final int maxRedeliveryCount = 2;
final int sendMessages = 10;
// subscribe before publish
Consumer<byte[]> consumer = pulsarClient.newConsumer(Schema.BYTES)
.topic(topic)
.subscriptionName("my-subscription")
.subscriptionType(SubscriptionType.Shared)
.enableRetry(true)
.receiverQueueSize(100)
.deadLetterPolicy(DeadLetterPolicy.builder()
.maxRedeliverCount(maxRedeliveryCount)
.retryLetterTopic("persistent://my-property/my-ns/my-subscription-custom-Retry")
.build())
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest)
.subscribe();

Producer<byte[]> producer = pulsarClient.newProducer(Schema.BYTES)
.topic(topic)
.create();
for (int i = 0; i < sendMessages; i++) {
producer.newMessage().key("1").value(String.format("Hello Pulsar [%d]", i).getBytes()).send();
}
producer.close();

// mock a retry producer exception when reconsumelater is called
MultiTopicsConsumerImpl<byte[]> multiTopicsConsumer = (MultiTopicsConsumerImpl<byte[]>) consumer;
List<ConsumerImpl<byte[]>> consumers = multiTopicsConsumer.getConsumers();
for (ConsumerImpl<byte[]> c : consumers) {
Set<Field> deadLetterPolicyField =
ReflectionUtils.getAllFields(c.getClass(), ReflectionUtils.withName("deadLetterPolicy"));

if (deadLetterPolicyField.size() != 0) {
Field field = deadLetterPolicyField.iterator().next();
field.setAccessible(true);
DeadLetterPolicy deadLetterPolicy = (DeadLetterPolicy) field.get(c);
deadLetterPolicy.setRetryLetterTopic("#persistent://invalid-topic#");
}
}

List<Message<byte[]>> messages = Lists.newArrayList();
for (int i = 0; i < sendMessages; i++) {
messages.add(consumer.receive());
}

// mock call the reconsumeLater method concurrently
CountDownLatch latch = new CountDownLatch(messages.size());
for (Message<byte[]> message : messages) {
new Thread(() -> {
try {
consumer.reconsumeLater(message, 1, TimeUnit.SECONDS);
} catch (Exception ignore) {

} finally {
latch.countDown();
}
}).start();
}

latch.await();
consumer.close();
}

@Data
static class Payload {
String number;
Expand Down

0 comments on commit 9500a90

Please sign in to comment.