Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] [broker] fix incorrect delete sub when lastActive do not update. #31

Closed
wants to merge 5 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -3229,9 +3229,11 @@ public void checkInactiveSubscriptions() {
.toMillis(nsExpirationTime == null ? defaultExpirationTime : nsExpirationTime);
if (expirationTimeMillis > 0) {
subscriptions.forEach((subName, sub) -> {
if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected()
|| sub.isReplicated()
|| isCompactionSubscription(subName)) {
if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected()) {
sub.cursor.updateLastActive();
return;
}
if (sub.isReplicated() || isCompactionSubscription(subName)) {
return;
}
if (System.currentTimeMillis() - sub.cursor.getLastActive() > expirationTimeMillis) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_LOG;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.doNothing;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.spy;
Expand Down Expand Up @@ -1313,6 +1314,66 @@ public void testCheckInactiveSubscriptionsShouldNotDeleteCompactionCursor() thro

}

@Test
public void testCheckInactiveSubscriptionWhenNoMessageToAck() throws Exception {
String namespace = "prop/testInactiveSubscriptionWhenNoMessageToAck";

try {
admin.namespaces().createNamespace(namespace);
} catch (PulsarAdminException.ConflictException e) {
// Ok.. (if test fails intermittently and namespace is already created)
}
// set enable subscription expiration.
admin.namespaces().setSubscriptionExpirationTime(namespace, 1);

String topic = "persistent://" + namespace + "/my-topic";
Producer<byte[]> producer = pulsarClient.newProducer().topic(topic).create();
producer.send("test".getBytes());
producer.close();

// create consumer to consume all messages
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
consumer.acknowledge(consumer.receive());

Optional<Topic> topicOptional = pulsar.getBrokerService().getTopic(topic, true).get();
assertTrue(topicOptional.isPresent());
PersistentTopic persistentTopic = (PersistentTopic) topicOptional.get();

// wait for 1min, but consumer is still connected all the time.
// so subscription should not be deleted.
Thread.sleep(60000);
persistentTopic.checkInactiveSubscriptions();
assertTrue(persistentTopic.getSubscriptions().containsKey("sub1"));
PersistentSubscription sub = persistentTopic.getSubscription("sub1");

// shutdown pulsar ungracefully
// disable the updateLastActive method to simulate the ungraceful shutdown
ManagedCursorImpl cursor = (ManagedCursorImpl) sub.getCursor();
ManagedCursorImpl spyCursor = Mockito.spy(cursor);
doNothing().when(spyCursor).updateLastActive();
Field cursorField = PersistentSubscription.class.getDeclaredField("cursor");
cursorField.setAccessible(true);
cursorField.set(sub, spyCursor);

// restart pulsar
consumer.close();
restartBroker();

admin.lookups().lookupTopic(topic);
topicOptional = pulsar.getBrokerService().getTopic(topic, true).get();
assertTrue(topicOptional.isPresent());
persistentTopic = (PersistentTopic) topicOptional.get();
persistentTopic.checkInactiveSubscriptions();
// wait for two seconds to complete the async task
Thread.sleep(2000);

// check if subscription is still present
assertTrue(persistentTopic.getSubscriptions().containsKey("sub1"));
sub = (PersistentSubscription) persistentTopic.getSubscription("sub1");
assertNotNull(sub);
}

/**
* Verifies brokerService should not have deadlock and successfully remove topic from topicMap on topic-failure and
* it should not introduce deadlock while performing it.
Expand Down
Loading