diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java index 69c7f404fdd57..98c698995a0f4 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentTopic.java @@ -3229,9 +3229,11 @@ public void checkInactiveSubscriptions() { .toMillis(nsExpirationTime == null ? defaultExpirationTime : nsExpirationTime); if (expirationTimeMillis > 0) { subscriptions.forEach((subName, sub) -> { - if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected() - || sub.isReplicated() - || isCompactionSubscription(subName)) { + if (sub.dispatcher != null && sub.dispatcher.isConsumerConnected()) { + sub.cursor.updateLastActive(); + return; + } + if (sub.isReplicated() || isCompactionSubscription(subName)) { return; } if (System.currentTimeMillis() - sub.cursor.getLastActive() > expirationTimeMillis) { diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java index be1221b7fab41..199bff7ef561e 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/service/BrokerServiceTest.java @@ -22,6 +22,7 @@ import static org.apache.pulsar.common.naming.SystemTopicNames.TRANSACTION_COORDINATOR_LOG; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.any; +import static org.mockito.Mockito.doNothing; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.spy; @@ -1313,6 +1314,66 @@ public void testCheckInactiveSubscriptionsShouldNotDeleteCompactionCursor() thro } + @Test + public void testCheckInactiveSubscriptionWhenNoMessageToAck() throws Exception { + String namespace = "prop/testInactiveSubscriptionWhenNoMessageToAck"; + + try { + admin.namespaces().createNamespace(namespace); + } catch (PulsarAdminException.ConflictException e) { + // Ok.. (if test fails intermittently and namespace is already created) + } + // set enable subscription expiration. + admin.namespaces().setSubscriptionExpirationTime(namespace, 1); + + String topic = "persistent://" + namespace + "/my-topic"; + Producer producer = pulsarClient.newProducer().topic(topic).create(); + producer.send("test".getBytes()); + producer.close(); + + // create consumer to consume all messages + Consumer consumer = pulsarClient.newConsumer().topic(topic).subscriptionName("sub1") + .subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe(); + consumer.acknowledge(consumer.receive()); + + Optional topicOptional = pulsar.getBrokerService().getTopic(topic, true).get(); + assertTrue(topicOptional.isPresent()); + PersistentTopic persistentTopic = (PersistentTopic) topicOptional.get(); + + // wait for 1min, but consumer is still connected all the time. + // so subscription should not be deleted. + Thread.sleep(60000); + persistentTopic.checkInactiveSubscriptions(); + assertTrue(persistentTopic.getSubscriptions().containsKey("sub1")); + PersistentSubscription sub = persistentTopic.getSubscription("sub1"); + + // shutdown pulsar ungracefully + // disable the updateLastActive method to simulate the ungraceful shutdown + ManagedCursorImpl cursor = (ManagedCursorImpl) sub.getCursor(); + ManagedCursorImpl spyCursor = Mockito.spy(cursor); + doNothing().when(spyCursor).updateLastActive(); + Field cursorField = PersistentSubscription.class.getDeclaredField("cursor"); + cursorField.setAccessible(true); + cursorField.set(sub, spyCursor); + + // restart pulsar + consumer.close(); + restartBroker(); + + admin.lookups().lookupTopic(topic); + topicOptional = pulsar.getBrokerService().getTopic(topic, true).get(); + assertTrue(topicOptional.isPresent()); + persistentTopic = (PersistentTopic) topicOptional.get(); + persistentTopic.checkInactiveSubscriptions(); + // wait for two seconds to complete the async task + Thread.sleep(2000); + + // check if subscription is still present + assertTrue(persistentTopic.getSubscriptions().containsKey("sub1")); + sub = (PersistentSubscription) persistentTopic.getSubscription("sub1"); + assertNotNull(sub); + } + /** * Verifies brokerService should not have deadlock and successfully remove topic from topicMap on topic-failure and * it should not introduce deadlock while performing it.