diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java index db129b54533a8..798568c7daf68 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/compaction/EventTimeOrderCompactor.java @@ -95,16 +95,15 @@ protected boolean compactMessage(String topic, Map protected boolean compactBatchMessage(String topic, Map> latestForKey, RawMessage m, MessageMetadata metadata, MessageId id) { boolean deletedMessage = false; + boolean hasMessagesRetained = false; try { - int numMessagesInBatch = metadata.getNumMessagesInBatch(); - int deleteCnt = 0; - for (MessageCompactionData mcd : extractMessageCompactionDataFromBatch(m, metadata)) { if (mcd.key() == null) { if (!topicCompactionRetainNullKey) { // record delete null-key message event - deleteCnt++; mxBean.addCompactionRemovedEvent(topic); + } else { + hasMessagesRetained = true; } continue; } @@ -120,15 +119,15 @@ protected boolean compactBatchMessage(String topic, Map latestForK protected boolean compactBatchMessage(String topic, Map latestForKey, RawMessage m, MessageMetadata metadata, MessageId id) { boolean deletedMessage = false; + boolean hasMessagesRetained = false; try { - int numMessagesInBatch = metadata.getNumMessagesInBatch(); - int deleteCnt = 0; for (ImmutableTriple e : extractIdsAndKeysAndSizeFromBatch( m, metadata)) { if (e != null) { if (e.getMiddle() == null) { if (!topicCompactionRetainNullKey) { // record delete null-key message event - deleteCnt++; mxBean.addCompactionRemovedEvent(topic); + } else { + hasMessagesRetained = true; } continue; } @@ -99,14 +99,14 @@ protected boolean compactBatchMessage(String topic, Map lates if (old != null) { mxBean.addCompactionRemovedEvent(topic); } + hasMessagesRetained = true; } else { latestForKey.remove(e.getMiddle()); - deleteCnt++; mxBean.addCompactionRemovedEvent(topic); } } } - if (deleteCnt == numMessagesInBatch) { + if (!hasMessagesRetained) { deletedMessage = true; } } catch (IOException ioe) {