Skip to content

Conversation

coderzc
Copy link
Member

@coderzc coderzc commented Sep 17, 2025

Fixes #xyz

Main Issue: #xyz

PIP: #xyz

Motivation

Since the messages that were previously compacted were not included in the batch extract results, it is inaccurate to determine whether this batchMessage needs to be deleted by counting the number of message deletions.
This will increase the range of messages read in phaseTwo of compaction.

protected boolean compactBatchMessage(String topic, Map<String, MessageId> latestForKey,
RawMessage m, MessageMetadata metadata, MessageId id) {
boolean deletedMessage = false;
try {
int numMessagesInBatch = metadata.getNumMessagesInBatch();
int deleteCnt = 0;
for (ImmutableTriple<MessageId, String, Integer> e : extractIdsAndKeysAndSizeFromBatch(
m, metadata)) {
if (e != null) {
if (e.getMiddle() == null) {
if (!topicCompactionRetainNullKey) {
// record delete null-key message event
deleteCnt++;
mxBean.addCompactionRemovedEvent(topic);
}
continue;
}
if (e.getRight() > 0) {
MessageId old = latestForKey.put(e.getMiddle(), e.getLeft());
if (old != null) {
mxBean.addCompactionRemovedEvent(topic);
}
} else {
latestForKey.remove(e.getMiddle());
deleteCnt++;
mxBean.addCompactionRemovedEvent(topic);
}
}
}
if (deleteCnt == numMessagesInBatch) {
deletedMessage = true;
}
} catch (IOException ioe) {
log.info(
"Error decoding batch for message {}. Whole batch will be included in output",
id, ioe);
}
return deletedMessage;
}

if (!smm.isCompactedOut()) {
messageCompactionDataList.add(new MessageCompactionData(id,
smm.hasPartitionKey() ? smm.getPartitionKey() : null,
smm.hasPayloadSize() ? smm.getPayloadSize() : 0, smm.getEventTime()));

Modifications

Counts whether no messages have been retained instead of message deletion counts.

Verifying this change

  • Make sure that the change passes the CI checks.

(Please pick either of the following options)

This change is a trivial rework / code cleanup without any test coverage.

(or)

This change is already covered by existing tests, such as (please describe tests).

(or)

This change added tests and can be verified as follows:

(example:)

  • Added integration tests for end-to-end deployment with large payloads (10MB)
  • Extended integration test for recovery after broker failure

Does this pull request potentially affect one of the following parts:

If the box was checked, please highlight the changes

  • Dependencies (add or upgrade a dependency)
  • The public API
  • The schema
  • The default values of configurations
  • The threading model
  • The binary protocol
  • The REST endpoints
  • The admin CLI options
  • The metrics
  • Anything that affects deployment

Documentation

  • doc
  • doc-required
  • doc-not-needed
  • doc-complete

Matching PR in forked repository

PR in forked repository:

Copy link

@coderzc Please add the following content to your PR description and select a checkbox:

- [ ] `doc` <!-- Your PR contains doc changes -->
- [ ] `doc-required` <!-- Your PR changes impact docs and you will update later -->
- [ ] `doc-not-needed` <!-- Your PR changes do not impact docs -->
- [ ] `doc-complete` <!-- Docs have been already added -->

@github-actions github-actions bot added doc-not-needed Your PR changes do not impact docs and removed doc-label-missing labels Sep 17, 2025
@coderzc coderzc marked this pull request as draft September 17, 2025 03:42
@coderzc coderzc changed the title [fix][broker] Fixed the logic of judging whether batchMessage is deleted in phaseOne of compaction [improve][broker] optimize the logic of judging whether batchMessage is deleted in phaseOne of compaction Sep 17, 2025
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
doc-not-needed Your PR changes do not impact docs
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant