Skip to content

Conversation

apoorvmittal10
Copy link
Contributor

@apoorvmittal10 apoorvmittal10 commented Oct 17, 2025

Updating params for share partition tests for readability.

@github-actions github-actions bot added triage PRs from the community core Kafka Broker tests Test fixes (including flaky tests) KIP-932 Queues for Kafka labels Oct 17, 2025
@apoorvmittal10 apoorvmittal10 added ci-approved and removed triage PRs from the community labels Oct 17, 2025
Copy link
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apoorvmittal10 thanks for this patch


private MemoryRecords createMemoryRecords(long baseOffset, int numRecords) {
try (MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(numRecords, baseOffset)) {
try (MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(baseOffset, numRecords)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    public static MemoryRecordsBuilder memoryRecordsBuilder(int numOfRecords, long startOffset) {
        return memoryRecordsBuilder(ByteBuffer.allocate(1024), numOfRecords, startOffset);
    }

it seems this change is incorrect.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems I missed pushing the changes in file: ShareFetchTestUtils.java. There were other local changes and missed the file. Pushed now.

private MemoryRecords createMemoryRecords(Map<Long, Integer> recordsPerOffset) {
ByteBuffer buffer = ByteBuffer.allocate(1024);
recordsPerOffset.forEach((offset, numOfRecords) -> memoryRecordsBuilder(buffer, numOfRecords, offset).close());
recordsPerOffset.forEach((offset, numOfRecords) -> memoryRecordsBuilder(buffer, offset, numOfRecords).close());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

    public static MemoryRecordsBuilder memoryRecordsBuilder(ByteBuffer buffer, int numOfRecords, long startOffset) {
        MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE,
            TimestampType.CREATE_TIME, startOffset, 2);
        for (int i = 0; i < numOfRecords; i++) {
            builder.appendWithOffset(startOffset + i, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes());
        }
        return builder;
    }

ditto

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same, pushed the file.

@apoorvmittal10
Copy link
Contributor Author

@chia7712 Can you please re-review.

Copy link
Member

@chia7712 chia7712 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@apoorvmittal10 thanks for this patch. While this PR does not change every order, that should be fine if all tests pass. However, some comments are now incorrect

memoryRecords(2, 1).records().forEach(builder::append);
}
// Do not include batch from offset 5. And compact batch starting at offset 20.
try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

line#7559 needs to be updated too

fetchAcquiredRecords(sharePartition, memoryRecords(10, 12), 12);

sharePartition.acknowledge(MEMBER_ID, List.of(
new ShareAcknowledgementBatch(5, 11, List.of((byte) 2)),
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please check line#7224

MemoryRecords records1 = memoryRecords(3);
String memberId1 = MEMBER_ID;
String memberId2 = "member-2";

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please take a look at line#7176

public void testCanAcquireRecordsReturnsTrue() {
SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build();

assertEquals(0, sharePartition.startOffset());
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

please check line#6769 - it has incorrect comment now

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

ci-approved core Kafka Broker KIP-932 Queues for Kafka tests Test fixes (including flaky tests)

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants