From 1ea3cc4a2604ce0b22b292fca558ff16b0fc62a7 Mon Sep 17 00:00:00 2001 From: Apoorv Mittal Date: Fri, 17 Oct 2025 18:53:04 +0100 Subject: [PATCH 1/3] MINOR: Correcting the order of params in share fetch/partition tests --- .../server/share/ShareFetchUtilsTest.java | 4 +- .../server/share/SharePartitionTest.java | 552 +++++++++--------- 2 files changed, 278 insertions(+), 278 deletions(-) diff --git a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java index e3a77158dafc4..3f1ae5293d513 100644 --- a/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java +++ b/core/src/test/java/kafka/server/share/ShareFetchUtilsTest.java @@ -636,7 +636,7 @@ public Stream provideArguments(ExtensionContext context) th } private MemoryRecords createMemoryRecords(long baseOffset, int numRecords) { - try (MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(numRecords, baseOffset)) { + try (MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(baseOffset, numRecords)) { return recordsBuilder.build(); } } @@ -657,7 +657,7 @@ public Stream provideArguments(ExtensionContext context) th private MemoryRecords createMemoryRecords(Map recordsPerOffset) { ByteBuffer buffer = ByteBuffer.allocate(1024); - recordsPerOffset.forEach((offset, numOfRecords) -> memoryRecordsBuilder(buffer, numOfRecords, offset).close()); + recordsPerOffset.forEach((offset, numOfRecords) -> memoryRecordsBuilder(buffer, offset, numOfRecords).close()); buffer.flip(); return MemoryRecords.readableRecords(buffer); diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index 25432b4ae15e7..bf868cae92961 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -1124,7 +1124,7 @@ public void testMaybeInitializeAndAcquire() { // Create a single batch record that covers the entire range from 10 to 30 of initial read gap. // The records in the batch are from 10 to 49. - MemoryRecords records = memoryRecords(40, 10); + MemoryRecords records = memoryRecords(10, 40); // Set max fetch records to 1, records will be acquired till the first gap is encountered. List acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( MEMBER_ID, @@ -1240,7 +1240,7 @@ public void testMaybeInitializeAndAcquireWithHigherMaxFetchRecords() { // Create a single batch record that covers the entire range from 10 to 30 of initial read gap. // The records in the batch are from 10 to 49. - MemoryRecords records = memoryRecords(40, 10); + MemoryRecords records = memoryRecords(10, 40); // Set max fetch records to 500, all records should be acquired. List acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( MEMBER_ID, @@ -1318,7 +1318,7 @@ public void testMaybeInitializeAndAcquireWithFetchBatchLastOffsetWithinCachedBat // Create a single batch record that ends in between the cached batch and the fetch offset is // post startOffset. - MemoryRecords records = memoryRecords(16, 12); + MemoryRecords records = memoryRecords(12, 16); // Set max fetch records to 500, records should be acquired till the last offset of the fetched batch. List acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( MEMBER_ID, @@ -1396,7 +1396,7 @@ public void testMaybeInitializeAndAcquireWithFetchBatchPriorStartOffset() { assertEquals(30L, sharePartition.persisterReadResultGapWindow().endOffset()); // Create a single batch record where first offset is prior startOffset. - MemoryRecords records = memoryRecords(16, 6); + MemoryRecords records = memoryRecords(6, 16); // Set max fetch records to 500, records should be acquired till the last offset of the fetched batch. List acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( MEMBER_ID, @@ -1466,13 +1466,13 @@ public void testMaybeInitializeAndAcquireWithMultipleBatches() { // Create multiple batch records that covers the entire range from 5 to 30 of initial read gap. // The records in the batch are from 5 to 49. ByteBuffer buffer = ByteBuffer.allocate(4096); - memoryRecordsBuilder(buffer, 2, 5).close(); - memoryRecordsBuilder(buffer, 1, 8).close(); - memoryRecordsBuilder(buffer, 2, 10).close(); - memoryRecordsBuilder(buffer, 6, 13).close(); - memoryRecordsBuilder(buffer, 3, 19).close(); - memoryRecordsBuilder(buffer, 9, 22).close(); - memoryRecordsBuilder(buffer, 19, 31).close(); + memoryRecordsBuilder(buffer, 5, 2).close(); + memoryRecordsBuilder(buffer, 8, 1).close(); + memoryRecordsBuilder(buffer, 10, 2).close(); + memoryRecordsBuilder(buffer, 13, 6).close(); + memoryRecordsBuilder(buffer, 19, 3).close(); + memoryRecordsBuilder(buffer, 22, 9).close(); + memoryRecordsBuilder(buffer, 31, 19).close(); buffer.flip(); MemoryRecords records = MemoryRecords.readableRecords(buffer); // Set max fetch records to 1, records will be acquired till the first gap is encountered. @@ -1631,11 +1631,11 @@ public void testMaybeInitializeAndAcquireWithMultipleBatchesAndLastOffsetWithinC // Create multiple batch records that ends in between the cached batch and the fetch offset is // post startOffset. ByteBuffer buffer = ByteBuffer.allocate(4096); - memoryRecordsBuilder(buffer, 2, 7).close(); - memoryRecordsBuilder(buffer, 2, 10).close(); - memoryRecordsBuilder(buffer, 6, 13).close(); + memoryRecordsBuilder(buffer, 7, 2).close(); + memoryRecordsBuilder(buffer, 10, 2).close(); + memoryRecordsBuilder(buffer, 13, 6).close(); // Though 19 offset is a gap but still be acquired. - memoryRecordsBuilder(buffer, 8, 20).close(); + memoryRecordsBuilder(buffer, 20, 8).close(); buffer.flip(); MemoryRecords records = MemoryRecords.readableRecords(buffer); // Set max fetch records to 500, records should be acquired till the last offset of the fetched batch. @@ -1787,7 +1787,7 @@ public void testAcquireMultipleRecords() throws InterruptedException { .withState(SharePartitionState.ACTIVE) .withSharePartitionMetrics(sharePartitionMetrics) .build(); - MemoryRecords records = memoryRecords(5, 10); + MemoryRecords records = memoryRecords(10, 5); List acquiredRecordsList = fetchAcquiredRecords(sharePartition, records, 3L, 5); @@ -1865,9 +1865,9 @@ public void testAcquireWithMultipleBatchesAndMaxFetchRecords() throws Interrupte // Create 3 batches of records. ByteBuffer buffer = ByteBuffer.allocate(4096); - memoryRecordsBuilder(buffer, 5, 10).close(); + memoryRecordsBuilder(buffer, 10, 5).close(); memoryRecordsBuilder(buffer, 15, 15).close(); - memoryRecordsBuilder(buffer, 15, 30).close(); + memoryRecordsBuilder(buffer, 30, 15).close(); buffer.flip(); @@ -1904,7 +1904,7 @@ public void testAcquireWithMultipleBatchesAndMaxFetchRecords() throws Interrupte @Test public void testAcquireMultipleRecordsWithOverlapAndNewBatch() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - MemoryRecords records = memoryRecords(5, 0); + MemoryRecords records = memoryRecords(5); List acquiredRecordsList = fetchAcquiredRecords(sharePartition, records, 3, 5); @@ -1912,7 +1912,7 @@ public void testAcquireMultipleRecordsWithOverlapAndNewBatch() { assertEquals(5, sharePartition.nextFetchOffset()); // Add records from 0-9 offsets, 5-9 should be acquired and 0-4 should be ignored. - records = memoryRecords(10, 0); + records = memoryRecords(10); acquiredRecordsList = fetchAcquiredRecords(sharePartition, records, 3, 5); assertArrayEquals(expectedAcquiredRecords(memoryRecords(5, 5), 1).toArray(), acquiredRecordsList.toArray()); @@ -1923,7 +1923,7 @@ public void testAcquireMultipleRecordsWithOverlapAndNewBatch() { @Test public void testAcquireSameBatchAgain() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - MemoryRecords records = memoryRecords(5, 10); + MemoryRecords records = memoryRecords(10, 5); List acquiredRecordsList = fetchAcquiredRecords(sharePartition, records, 3, 5); @@ -1937,7 +1937,7 @@ public void testAcquireSameBatchAgain() { assertEquals(15, sharePartition.nextFetchOffset()); // Send subset of the same batch again, no records should be returned. - MemoryRecords subsetRecords = memoryRecords(2, 10); + MemoryRecords subsetRecords = memoryRecords(10, 2); acquiredRecordsList = fetchAcquiredRecords(sharePartition, subsetRecords, 3, 0); // No records should be returned as the batch is already acquired. @@ -1996,10 +1996,10 @@ public void testAcquireWithBatchSizeAndMultipleBatches() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); // Create 4 batches of records. ByteBuffer buffer = ByteBuffer.allocate(4096); - memoryRecordsBuilder(buffer, 5, 2).close(); - memoryRecordsBuilder(buffer, 5, 10).close(); - memoryRecordsBuilder(buffer, 7, 15).close(); - memoryRecordsBuilder(buffer, 6, 22).close(); + memoryRecordsBuilder(buffer, 2, 5).close(); + memoryRecordsBuilder(buffer, 10, 5).close(); + memoryRecordsBuilder(buffer, 15, 7).close(); + memoryRecordsBuilder(buffer, 22, 6).close(); buffer.flip(); MemoryRecords records = MemoryRecords.readableRecords(buffer); @@ -2047,9 +2047,9 @@ public void testAcquireWithBatchSizeAndMaxFetchRecords() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); // Create 3 batches of records. ByteBuffer buffer = ByteBuffer.allocate(4096); - memoryRecordsBuilder(buffer, 5, 0).close(); - memoryRecordsBuilder(buffer, 15, 5).close(); - memoryRecordsBuilder(buffer, 15, 20).close(); + memoryRecordsBuilder(buffer, 0, 5).close(); + memoryRecordsBuilder(buffer, 5, 15).close(); + memoryRecordsBuilder(buffer, 20, 15).close(); buffer.flip(); MemoryRecords records = MemoryRecords.readableRecords(buffer); List acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( @@ -2087,7 +2087,7 @@ public void testAcquireSingleBatchWithBatchSizeAndEndOffsetLargerThanBatchFirstO SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); sharePartition.updateCacheAndOffsets(8L); - MemoryRecords records = memoryRecords(10, 5); + MemoryRecords records = memoryRecords(5, 10); List acquiredRecordsList = fetchAcquiredRecords(sharePartition.acquire( MEMBER_ID, 5 /* Batch size */, @@ -2115,8 +2115,8 @@ public void testAcquireWithBatchSizeAndEndOffsetLargerThanBatchFirstOffset() // Create 2 batches of records. ByteBuffer buffer = ByteBuffer.allocate(4096); - memoryRecordsBuilder(buffer, 8, 2).close(); - memoryRecordsBuilder(buffer, 7, 10).close(); + memoryRecordsBuilder(buffer, 2, 8).close(); + memoryRecordsBuilder(buffer, 10, 7).close(); buffer.flip(); MemoryRecords records = MemoryRecords.readableRecords(buffer); @@ -2157,8 +2157,8 @@ public void testAcquireBatchSkipWithBatchSizeAndEndOffsetLargerThanFirstBatch() // Create 2 batches of records. ByteBuffer buffer = ByteBuffer.allocate(4096); - memoryRecordsBuilder(buffer, 8, 2).close(); - memoryRecordsBuilder(buffer, 7, 10).close(); + memoryRecordsBuilder(buffer, 2, 8).close(); + memoryRecordsBuilder(buffer, 10, 7).close(); buffer.flip(); MemoryRecords records = MemoryRecords.readableRecords(buffer); @@ -2193,7 +2193,7 @@ public void testAcquireWithMaxInFlightRecordsAndTryAcquireNewBatch() { BATCH_SIZE, 500 /* Max fetch records */, DEFAULT_FETCH_OFFSET, - fetchPartitionData(memoryRecords(10, 0), 0), + fetchPartitionData(memoryRecords(10), 0), FETCH_ISOLATION_HWM), 10); // Validate all 10 records will be acquired as the maxInFlightRecords is 20. @@ -2202,10 +2202,10 @@ public void testAcquireWithMaxInFlightRecordsAndTryAcquireNewBatch() { // Create 4 batches of records. ByteBuffer buffer = ByteBuffer.allocate(4096); - memoryRecordsBuilder(buffer, 5, 10).close(); - memoryRecordsBuilder(buffer, 10, 15).close(); - memoryRecordsBuilder(buffer, 5, 25).close(); - memoryRecordsBuilder(buffer, 2, 30).close(); + memoryRecordsBuilder(buffer, 10, 5).close(); + memoryRecordsBuilder(buffer, 15, 10).close(); + memoryRecordsBuilder(buffer, 25, 5).close(); + memoryRecordsBuilder(buffer, 30, 2).close(); buffer.flip(); @@ -2251,10 +2251,10 @@ public void testAcquireWithMaxInFlightRecordsAndReleaseLastOffset() { // Create 4 batches of records. ByteBuffer buffer = ByteBuffer.allocate(4096); - memoryRecordsBuilder(buffer, 5, 10).close(); - memoryRecordsBuilder(buffer, 10, 15).close(); - memoryRecordsBuilder(buffer, 5, 25).close(); - memoryRecordsBuilder(buffer, 3, 30).close(); + memoryRecordsBuilder(buffer, 10, 5).close(); + memoryRecordsBuilder(buffer, 15, 10).close(); + memoryRecordsBuilder(buffer, 25, 5).close(); + memoryRecordsBuilder(buffer, 30, 3).close(); buffer.flip(); @@ -2482,7 +2482,7 @@ public void testAcknowledgeSingleRecordBatch() { .withState(SharePartitionState.ACTIVE) .build(); - MemoryRecords records1 = memoryRecords(1, 0); + MemoryRecords records1 = memoryRecords(1); MemoryRecords records2 = memoryRecords(1, 1); // Another batch is acquired because if there is only 1 batch, and it is acknowledged, the batch will be removed from cachedState @@ -2515,7 +2515,7 @@ public void testAcknowledgeMultipleRecordBatch() { .withReplicaManager(replicaManager) .withState(SharePartitionState.ACTIVE) .build(); - MemoryRecords records = memoryRecords(10, 5); + MemoryRecords records = memoryRecords(5, 10); List acquiredRecordsList = fetchAcquiredRecords(sharePartition, records, 10); assertEquals(1, acquiredRecordsList.size()); @@ -2536,9 +2536,9 @@ public void testAcknowledgeMultipleRecordBatch() { @Test public void testAcknowledgeMultipleRecordBatchWithGapOffsets() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - MemoryRecords records1 = memoryRecords(2, 5); + MemoryRecords records1 = memoryRecords(5, 2); // Untracked gap of 3 offsets from 7-9. - MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(5, 10); + MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(10, 5); // Gap from 15-17 offsets. recordsBuilder.appendWithOffset(18, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes()); MemoryRecords records2 = recordsBuilder.build(); @@ -2589,9 +2589,9 @@ public void testAcknowledgeMultipleRecordBatchWithGapOffsets() { @Test public void testAcknowledgeMultipleSubsetRecordBatchWithGapOffsets() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - MemoryRecords records1 = memoryRecords(2, 5); + MemoryRecords records1 = memoryRecords(5, 2); // Untracked gap of 3 offsets from 7-9. - MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(2, 10); + MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(10, 2); // Gap from 12-13 offsets. recordsBuilder.appendWithOffset(14, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes()); // Gap for 15 offset. @@ -2678,13 +2678,13 @@ public void testAcknowledgeOutOfRangeCachedDataFirstBatch() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); // Create data for the batch with offsets 0-4. - MemoryRecords records = memoryRecords(5, 0); + MemoryRecords records = memoryRecords(5); List acquiredRecordsList = fetchAcquiredRecords(sharePartition, records, 5); assertEquals(1, acquiredRecordsList.size()); // Create data for the batch with offsets 20-24. - records = memoryRecords(5, 20); + records = memoryRecords(20, 5); acquiredRecordsList = fetchAcquiredRecords(sharePartition, records, 5); assertEquals(1, acquiredRecordsList.size()); @@ -2699,7 +2699,7 @@ public void testAcknowledgeOutOfRangeCachedDataFirstBatch() { assertFutureThrows(InvalidRequestException.class, ackResult); // Create data for the batch with offsets 5-10. - records = memoryRecords(6, 5); + records = memoryRecords(5, 6); acquiredRecordsList = fetchAcquiredRecords(sharePartition, records, 6); assertEquals(1, acquiredRecordsList.size()); @@ -2776,8 +2776,8 @@ public void testAcknowledgeWhenOffsetNotAcquired() { public void testAcknowledgeRollbackWithFullBatchError() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); MemoryRecords records1 = memoryRecords(5, 5); - MemoryRecords records2 = memoryRecords(5, 10); - MemoryRecords records3 = memoryRecords(5, 15); + MemoryRecords records2 = memoryRecords(10, 5); + MemoryRecords records3 = memoryRecords(15, 5); List acquiredRecordsList = fetchAcquiredRecords(sharePartition, records1, 5); assertEquals(1, acquiredRecordsList.size()); @@ -2814,8 +2814,8 @@ public void testAcknowledgeRollbackWithFullBatchError() { public void testAcknowledgeRollbackWithSubsetError() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); MemoryRecords records1 = memoryRecords(5, 5); - MemoryRecords records2 = memoryRecords(5, 10); - MemoryRecords records3 = memoryRecords(5, 15); + MemoryRecords records2 = memoryRecords(10, 5); + MemoryRecords records3 = memoryRecords(15, 5); List acquiredRecordsList = fetchAcquiredRecords(sharePartition, records1, 5); assertEquals(1, acquiredRecordsList.size()); @@ -2853,7 +2853,7 @@ public void testAcknowledgeRollbackWithSubsetError() { @Test public void testAcquireReleasedRecord() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - MemoryRecords records = memoryRecords(5, 10); + MemoryRecords records = memoryRecords(10, 5); List acquiredRecordsList = fetchAcquiredRecords(sharePartition, records, 5); @@ -2889,13 +2889,13 @@ public void testAcquireReleasedRecord() { public void testAcquireReleasedRecordMultipleBatches() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); // First fetch request with 5 records starting from offset 10. - MemoryRecords records1 = memoryRecords(5, 10); + MemoryRecords records1 = memoryRecords(10, 5); // Second fetch request with 5 records starting from offset 15. - MemoryRecords records2 = memoryRecords(5, 15); + MemoryRecords records2 = memoryRecords(15, 5); // Third fetch request with 5 records starting from offset 23, gap of 3 offsets. - MemoryRecords records3 = memoryRecords(5, 23); + MemoryRecords records3 = memoryRecords(23, 5); // Fourth fetch request with 5 records starting from offset 28. - MemoryRecords records4 = memoryRecords(5, 28); + MemoryRecords records4 = memoryRecords(28, 5); List acquiredRecordsList = fetchAcquiredRecords(sharePartition, records1, 5); @@ -2975,7 +2975,7 @@ public void testAcquireReleasedRecordMultipleBatches() { assertEquals(15, sharePartition.nextFetchOffset()); // Acquire partial records from batch 2. - MemoryRecords subsetRecords = memoryRecords(2, 17); + MemoryRecords subsetRecords = memoryRecords(17, 2); acquiredRecordsList = fetchAcquiredRecords(sharePartition, subsetRecords, 2); assertArrayEquals(expectedAcquiredRecords(17, 18, 2).toArray(), acquiredRecordsList.toArray()); @@ -2984,7 +2984,7 @@ public void testAcquireReleasedRecordMultipleBatches() { // Acquire partial records from record 4 to further test if the next fetch offset move // accordingly once complete record 2 is also acquired. - subsetRecords = memoryRecords(1, 28); + subsetRecords = memoryRecords(28, 1); acquiredRecordsList = fetchAcquiredRecords(sharePartition, subsetRecords, 1); assertArrayEquals(expectedAcquiredRecords(28, 28, 2).toArray(), acquiredRecordsList.toArray()); @@ -3021,7 +3021,7 @@ public void testAcquireGapAtBeginningAndRecordsFetchedFromGap() { sharePartition.maybeInitialize(); // All records fetched are part of the gap. The gap is from 11 to 20, fetched offsets are 11 to 15. - MemoryRecords records = memoryRecords(5, 11); + MemoryRecords records = memoryRecords(11, 5); List acquiredRecordsList = fetchAcquiredRecords(sharePartition, records, 5); @@ -3060,7 +3060,7 @@ public void testAcquireGapAtBeginningAndFetchedRecordsOverlapInFlightBatches() { sharePartition.maybeInitialize(); // Fetched offsets overlap the inFlight batches. The gap is from 11 to 20, but fetched records are from 11 to 25. - MemoryRecords records = memoryRecords(15, 11); + MemoryRecords records = memoryRecords(11, 15); List acquiredRecordsList = fetchAcquiredRecords(sharePartition, records, 10); @@ -3105,7 +3105,7 @@ public void testAcquireGapAtBeginningAndFetchedRecordsOverlapInFlightAvailableBa sharePartition.maybeInitialize(); // Fetched offsets overlap the inFlight batches. The gap is from 11 to 20, but fetched records are from 11 to 25. - MemoryRecords records = memoryRecords(15, 11); + MemoryRecords records = memoryRecords(11, 15); List acquiredRecordsList = fetchAcquiredRecords(sharePartition, records, 15); @@ -3153,7 +3153,7 @@ public void testAcquireWhenCachedStateContainsGapsAndRecordsFetchedFromNonGapOff // Fetched records are part of inFlightBatch 11-20 with state AVAILABLE. Fetched offsets also overlap the // inFlight batches. The gap is from 11 to 20, but fetched records are from 11 to 25. - MemoryRecords records = memoryRecords(15, 11); + MemoryRecords records = memoryRecords(11, 15); List acquiredRecordsList = fetchAcquiredRecords(sharePartition, records, 15); @@ -3197,7 +3197,7 @@ public void testAcquireGapAtBeginningAndFetchedRecordsOverlapMultipleInFlightBat sharePartition.maybeInitialize(); - MemoryRecords records = memoryRecords(75, 11); + MemoryRecords records = memoryRecords(11, 75); List acquiredRecordsList = fetchAcquiredRecords(sharePartition, records, 55); @@ -3253,7 +3253,7 @@ public void testAcquireGapAtBeginningAndFetchedRecordsEndJustBeforeGap() { sharePartition.maybeInitialize(); - MemoryRecords records = memoryRecords(20, 11); + MemoryRecords records = memoryRecords(11, 20); List acquiredRecordsList = fetchAcquiredRecords(sharePartition, records, 20); @@ -3298,7 +3298,7 @@ public void testAcquireGapAtBeginningAndFetchedRecordsIncludeGapOffsetsAtEnd() { sharePartition.maybeInitialize(); - MemoryRecords records = memoryRecords(65, 11); + MemoryRecords records = memoryRecords(11, 65); List acquiredRecordsList = fetchAcquiredRecords(sharePartition, records, 45); @@ -3350,9 +3350,9 @@ public void testAcquireWhenRecordsFetchedFromGapAndMaxFetchRecordsIsExceeded() { // Creating 3 batches of records with a total of 8 records ByteBuffer buffer = ByteBuffer.allocate(4096); - memoryRecordsBuilder(buffer, 3, 21).close(); - memoryRecordsBuilder(buffer, 3, 24).close(); - memoryRecordsBuilder(buffer, 2, 27).close(); + memoryRecordsBuilder(buffer, 21, 3).close(); + memoryRecordsBuilder(buffer, 24, 3).close(); + memoryRecordsBuilder(buffer, 27, 2).close(); buffer.flip(); MemoryRecords records = MemoryRecords.readableRecords(buffer); @@ -3498,8 +3498,8 @@ public void testAcquireWhenRecordsFetchedFromGapAndPartitionContainsNaturalGaps( sharePartition.maybeInitialize(); ByteBuffer buffer = ByteBuffer.allocate(4096); - memoryRecordsBuilder(buffer, 11, 10).close(); - memoryRecordsBuilder(buffer, 21, 30).close(); + memoryRecordsBuilder(buffer, 10, 11).close(); + memoryRecordsBuilder(buffer, 30, 21).close(); buffer.flip(); MemoryRecords records = MemoryRecords.readableRecords(buffer); @@ -3548,8 +3548,8 @@ public void testAcquireCachedStateInitialGapMatchesWithActualPartitionGap() { // Creating 2 batches starting from 21, such that there is a natural gap from 11 to 20 ByteBuffer buffer = ByteBuffer.allocate(4096); - memoryRecordsBuilder(buffer, 15, 21).close(); - memoryRecordsBuilder(buffer, 25, 36).close(); + memoryRecordsBuilder(buffer, 21, 15).close(); + memoryRecordsBuilder(buffer, 36, 25).close(); buffer.flip(); MemoryRecords records = MemoryRecords.readableRecords(buffer); @@ -3592,8 +3592,8 @@ public void testAcquireCachedStateInitialGapOverlapsWithActualPartitionGap() { // Creating 2 batches starting from 16, such that there is a natural gap from 11 to 15 ByteBuffer buffer = ByteBuffer.allocate(4096); - memoryRecordsBuilder(buffer, 20, 16).close(); - memoryRecordsBuilder(buffer, 25, 36).close(); + memoryRecordsBuilder(buffer, 16, 20).close(); + memoryRecordsBuilder(buffer, 36, 25).close(); buffer.flip(); MemoryRecords records = MemoryRecords.readableRecords(buffer); @@ -3764,7 +3764,7 @@ public void testAcquisitionLockForAcquiringMultipleRecords() throws InterruptedE .withState(SharePartitionState.ACTIVE) .withSharePartitionMetrics(sharePartitionMetrics) .build(); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 5); assertEquals(1, sharePartition.timer().size()); assertNotNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask()); @@ -3792,13 +3792,13 @@ public void testAcquisitionLockForAcquiringMultipleRecordsWithOverlapAndNewBatch .withSharePartitionMetrics(sharePartitionMetrics) .build(); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 0), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(5), 5); assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask()); assertEquals(1, sharePartition.timer().size()); // Add records from 0-9 offsets, 5-9 should be acquired and 0-4 should be ignored. - fetchAcquiredRecords(sharePartition, memoryRecords(10, 0), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(10), 5); assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask()); assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask()); @@ -3827,7 +3827,7 @@ public void testAcquisitionLockForAcquiringSameBatchAgain() throws InterruptedEx .withState(SharePartitionState.ACTIVE) .build(); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 5); assertNotNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask()); assertEquals(1, sharePartition.timer().size()); @@ -3842,7 +3842,7 @@ public void testAcquisitionLockForAcquiringSameBatchAgain() throws InterruptedEx () -> assertionFailedMessage(sharePartition, Map.of(10L, List.of()))); // Acquire the same batch again. - fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 5); // Acquisition lock timeout task should be created on re-acquire action. assertNotNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask()); @@ -3853,7 +3853,7 @@ public void testAcquisitionLockForAcquiringSameBatchAgain() throws InterruptedEx public void testAcquisitionLockOnAcknowledgingSingleRecordBatch() throws InterruptedException { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - fetchAcquiredRecords(sharePartition, memoryRecords(1, 0), 1); + fetchAcquiredRecords(sharePartition, memoryRecords(1), 1); assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask()); assertEquals(1, sharePartition.timer().size()); @@ -3885,7 +3885,7 @@ public void testAcquisitionLockOnAcknowledgingSingleRecordBatch() throws Interru public void testAcquisitionLockOnAcknowledgingMultipleRecordBatch() throws InterruptedException { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 10); assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask()); assertEquals(1, sharePartition.timer().size()); @@ -3916,13 +3916,13 @@ public void testAcquisitionLockOnAcknowledgingMultipleRecordBatchWithGapOffsets( .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) .withState(SharePartitionState.ACTIVE) .build(); - MemoryRecords records1 = memoryRecords(2, 5); + MemoryRecords records1 = memoryRecords(5, 2); // Untracked gap of 3 offsets from 7-9. - MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(5, 10); + MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(10, 5); // Gap from 15-17 offsets. recordsBuilder.appendWithOffset(18, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes()); MemoryRecords records2 = recordsBuilder.build(); - MemoryRecords records3 = memoryRecords(2, 1); + MemoryRecords records3 = memoryRecords(1, 2); fetchAcquiredRecords(sharePartition, records3, 2); @@ -3972,7 +3972,7 @@ public void testAcquisitionLockForAcquiringSubsetBatchAgain() throws Interrupted .withState(SharePartitionState.ACTIVE) .build(); - fetchAcquiredRecords(sharePartition, memoryRecords(8, 10), 8); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 8), 8); assertNotNull(sharePartition.cachedState().get(10L).batchAcquisitionLockTimeoutTask()); assertEquals(1, sharePartition.timer().size()); @@ -3989,7 +3989,7 @@ public void testAcquisitionLockForAcquiringSubsetBatchAgain() throws Interrupted () -> assertionFailedMessage(sharePartition, Map.of(10L, List.of()))); // Acquire subset of records again. - fetchAcquiredRecords(sharePartition, memoryRecords(3, 12), 3); + fetchAcquiredRecords(sharePartition, memoryRecords(12, 3), 3); // Acquisition lock timeout task should be created only on offsets which have been acquired again. assertNull(sharePartition.cachedState().get(10L).offsetState().get(10L).acquisitionLockTimeoutTask()); @@ -4038,9 +4038,9 @@ public void testAcquisitionLockOnAcknowledgingMultipleSubsetRecordBatchWithGapOf .withDefaultAcquisitionLockTimeoutMs(ACQUISITION_LOCK_TIMEOUT_MS) .withState(SharePartitionState.ACTIVE) .build(); - MemoryRecords records1 = memoryRecords(2, 5); + MemoryRecords records1 = memoryRecords(5, 2); // Untracked gap of 3 offsets from 7-9. - MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(2, 10); + MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(10, 2); // Gap from 12-13 offsets. recordsBuilder.appendWithOffset(14, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes()); // Gap for 15 offset. @@ -4136,9 +4136,9 @@ public void testAcquisitionLockTimeoutCauseMaxDeliveryCountExceed() throws Inter .withState(SharePartitionState.ACTIVE) .build(); - // Adding memoryRecords(10, 0) in the sharePartition to make sure that SPSO doesn't move forward when delivery count of records2 + // Adding memoryRecords(10) in the sharePartition to make sure that SPSO doesn't move forward when delivery count of records2 // exceed the max delivery count. - fetchAcquiredRecords(sharePartition, memoryRecords(10, 0), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(10), 10); fetchAcquiredRecords(sharePartition, memoryRecords(10, 10), 10); @@ -4185,7 +4185,7 @@ public void testAcquisitionLockTimeoutCauseSPSOMoveForward() throws InterruptedE .withState(SharePartitionState.ACTIVE) .build(); - fetchAcquiredRecords(sharePartition, memoryRecords(10, 0), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(10), 10); assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask()); assertEquals(1, sharePartition.timer().size()); @@ -4201,7 +4201,7 @@ public void testAcquisitionLockTimeoutCauseSPSOMoveForward() throws InterruptedE DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, () -> assertionFailedMessage(sharePartition, Map.of(0L, List.of()))); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 0), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(5), 5); assertNotNull(sharePartition.cachedState().get(0L).offsetState().get(0L).acquisitionLockTimeoutTask()); assertNotNull(sharePartition.cachedState().get(0L).offsetState().get(1L).acquisitionLockTimeoutTask()); @@ -4260,7 +4260,7 @@ public void testAcquisitionLockTimeoutCauseSPSOMoveForwardAndClearCachedState() .withState(SharePartitionState.ACTIVE) .build(); - fetchAcquiredRecords(sharePartition, memoryRecords(10, 0), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(10), 10); assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask()); assertEquals(1, sharePartition.timer().size()); @@ -4275,7 +4275,7 @@ public void testAcquisitionLockTimeoutCauseSPSOMoveForwardAndClearCachedState() DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, () -> assertionFailedMessage(sharePartition, Map.of(0L, List.of()))); - fetchAcquiredRecords(sharePartition, memoryRecords(10, 0), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(10), 10); assertNotNull(sharePartition.cachedState().get(0L).batchAcquisitionLockTimeoutTask()); assertEquals(1, sharePartition.timer().size()); @@ -4404,7 +4404,7 @@ public void testAcquisitionLockOnBatchWithWriteShareGroupStateFailure() throws I PartitionFactory.newPartitionErrorData(0, Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message()))))); Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); - fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 10); assertEquals(1, sharePartition.timer().size()); assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask()); @@ -4437,7 +4437,7 @@ public void testAcquisitionLockOnOffsetWithWriteShareGroupStateFailure() throws PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()))))); Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); - fetchAcquiredRecords(sharePartition, memoryRecords(6, 5), 6); + fetchAcquiredRecords(sharePartition, memoryRecords(5, 6), 6); assertEquals(1, sharePartition.timer().size()); assertNotNull(sharePartition.cachedState().get(5L).batchAcquisitionLockTimeoutTask()); @@ -4479,7 +4479,7 @@ public void testAcquisitionLockOnOffsetWithWriteShareGroupStateFailure() throws public void testReleaseSingleRecordBatch() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - fetchAcquiredRecords(sharePartition, memoryRecords(1, 0), 1); + fetchAcquiredRecords(sharePartition, memoryRecords(1), 1); CompletableFuture releaseResult = sharePartition.releaseAcquiredRecords(MEMBER_ID); assertNull(releaseResult.join()); @@ -4497,7 +4497,7 @@ public void testReleaseSingleRecordBatch() { public void testReleaseMultipleRecordBatch() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 10); CompletableFuture releaseResult = sharePartition.releaseAcquiredRecords(MEMBER_ID); assertNull(releaseResult.join()); @@ -4513,10 +4513,10 @@ public void testReleaseMultipleRecordBatch() { @Test public void testReleaseMultipleAcknowledgedRecordBatch() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - MemoryRecords records0 = memoryRecords(5, 0); - MemoryRecords records1 = memoryRecords(2, 5); + MemoryRecords records0 = memoryRecords(5); + MemoryRecords records1 = memoryRecords(5, 2); // Untracked gap of 3 offsets from 7-9. - MemoryRecords records2 = memoryRecords(9, 10); + MemoryRecords records2 = memoryRecords(10, 9); fetchAcquiredRecords(sharePartition, records0, 5); fetchAcquiredRecords(sharePartition, records1, 2); @@ -4538,10 +4538,10 @@ public void testReleaseMultipleAcknowledgedRecordBatch() { @Test public void testReleaseAcknowledgedMultipleSubsetRecordBatch() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - MemoryRecords records1 = memoryRecords(2, 5); + MemoryRecords records1 = memoryRecords(5, 2); // Untracked gap of 3 offsets from 7-9. - MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(2, 10); + MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(10, 2); // Gap from 12-13 offsets. recordsBuilder.appendWithOffset(14, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes()); // Gap for 15 offset. @@ -4590,9 +4590,9 @@ public void testReleaseAcknowledgedMultipleSubsetRecordBatch() { @Test public void testReleaseAcquiredRecordsWithAnotherMember() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - MemoryRecords records1 = memoryRecords(1, 5); + MemoryRecords records1 = memoryRecords(5, 1); // Untracked gap of 3 offsets from 7-9. - MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(2, 10); + MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(10, 2); // Gap from 12-13 offsets. recordsBuilder.appendWithOffset(14, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes()); // Gap for 15 offset. @@ -4660,9 +4660,9 @@ public void testReleaseAcquiredRecordsWithAnotherMember() { @Test public void testReleaseAcquiredRecordsWithAnotherMemberAndSubsetAcknowledged() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - MemoryRecords records1 = memoryRecords(2, 5); + MemoryRecords records1 = memoryRecords(5, 2); // Untracked gap of 3 offsets from 7-9. - MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(2, 10); + MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(10, 2); // Gap from 12-13 offsets. recordsBuilder.appendWithOffset(14, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes()); // Gap for 15 offset. @@ -4776,9 +4776,9 @@ public void testMaxDeliveryCountLimitNotExceededForRecordsSubsetAfterReleaseAcqu .withState(SharePartitionState.ACTIVE) .build(); - fetchAcquiredRecords(sharePartition, memoryRecords(10, 0), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(10), 10); - MemoryRecords records2 = memoryRecords(5, 10); + MemoryRecords records2 = memoryRecords(10, 5); fetchAcquiredRecords(sharePartition, records2, 5); sharePartition.acknowledge(MEMBER_ID, List.of( @@ -4803,11 +4803,11 @@ public void testMaxDeliveryCountLimitNotExceededForRecordsSubsetAfterReleaseAcqu .withState(SharePartitionState.ACTIVE) .build(); // First fetch request with 5 records starting from offset 10. - MemoryRecords records1 = memoryRecords(5, 10); + MemoryRecords records1 = memoryRecords(10, 5); // Second fetch request with 5 records starting from offset 15. - MemoryRecords records2 = memoryRecords(5, 15); + MemoryRecords records2 = memoryRecords(15, 5); // third fetch request with 5 records starting from offset20. - MemoryRecords records3 = memoryRecords(5, 20); + MemoryRecords records3 = memoryRecords(20, 5); fetchAcquiredRecords(sharePartition, records1, 5); fetchAcquiredRecords(sharePartition, records2, 5); @@ -4864,11 +4864,11 @@ public void testMaxDeliveryCountLimitExceededForRecordsSubsetCacheCleared() { .withState(SharePartitionState.ACTIVE) .build(); // First fetch request with 5 records starting from offset 10. - MemoryRecords records1 = memoryRecords(5, 10); + MemoryRecords records1 = memoryRecords(10, 5); // Second fetch request with 5 records starting from offset 15. - MemoryRecords records2 = memoryRecords(5, 15); + MemoryRecords records2 = memoryRecords(15, 5); // Third fetch request with 5 records starting from offset 20. - MemoryRecords records3 = memoryRecords(5, 20); + MemoryRecords records3 = memoryRecords(20, 5); fetchAcquiredRecords(sharePartition, records1, 5); fetchAcquiredRecords(sharePartition, records2, 5); @@ -4900,7 +4900,7 @@ public void testMaxDeliveryCountLimitExceededForRecordsSubsetCacheCleared() { public void testReleaseAcquiredRecordsSubsetWithAnotherMember() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - fetchAcquiredRecords(sharePartition, memoryRecords(7, 5), 7); + fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 7); sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(5, 7, List.of((byte) 1)))); @@ -4937,7 +4937,7 @@ public void testReleaseBatchWithWriteShareGroupStateFailure() { PartitionFactory.newPartitionErrorData(0, Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message()))))); Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); - fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 10); CompletableFuture releaseResult = sharePartition.releaseAcquiredRecords(MEMBER_ID); assertTrue(releaseResult.isCompletedExceptionally()); @@ -4965,7 +4965,7 @@ public void testReleaseOffsetWithWriteShareGroupStateFailure() { PartitionFactory.newPartitionErrorData(0, Errors.NONE.code(), Errors.NONE.message()))))); Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); - fetchAcquiredRecords(sharePartition, memoryRecords(6, 5), 6); + fetchAcquiredRecords(sharePartition, memoryRecords(5, 6), 6); sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(8, 9, List.of((byte) 1)))); @@ -5001,7 +5001,7 @@ public void testReleaseOffsetWithWriteShareGroupStateFailure() { public void testAcquisitionLockOnReleasingMultipleRecordBatch() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 10); CompletableFuture releaseResult = sharePartition.releaseAcquiredRecords(MEMBER_ID); assertNull(releaseResult.join()); @@ -5022,9 +5022,9 @@ public void testAcquisitionLockOnReleasingAcknowledgedMultipleSubsetRecordBatchW SharePartition sharePartition = SharePartitionBuilder.builder() .withState(SharePartitionState.ACTIVE) .build(); - MemoryRecords records1 = memoryRecords(2, 5); + MemoryRecords records1 = memoryRecords(5, 2); // Untracked gap of 3 offsets from 7-9. - MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(2, 10); + MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(10, 2); // Gap from 12-13 offsets. recordsBuilder.appendWithOffset(14, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes()); // Gap for 15 offset. @@ -5109,13 +5109,13 @@ public void testLsoMovementOnInitializationSharePartition() { public void testLsoMovementForArchivingBatches() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 12), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 17), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 22), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 27), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 32), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(2, 5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(7, 5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(12, 5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(17, 5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(22, 5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(27, 5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(32, 5), 5); sharePartition.acknowledge(MEMBER_ID, List.of( new ShareAcknowledgementBatch(2, 6, List.of((byte) 1)), @@ -5164,10 +5164,10 @@ public void testLsoMovementForArchivingAllAvailableBatches() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); // A client acquires 4 batches, 11 -> 20, 21 -> 30, 31 -> 40, 41 -> 50. - fetchAcquiredRecords(sharePartition, memoryRecords(10, 11), 10); - fetchAcquiredRecords(sharePartition, memoryRecords(10, 21), 10); - fetchAcquiredRecords(sharePartition, memoryRecords(10, 31), 10); - fetchAcquiredRecords(sharePartition, memoryRecords(10, 41), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(11, 10), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(21, 10), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(31, 10), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(41, 10), 10); // After the acknowledgements, the state of share partition will be: // 1. 11 -> 20: AVAILABLE @@ -5217,10 +5217,10 @@ public void testLsoMovementForArchivingAllAvailableOffsets() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); // A client acquires 4 batches, 11 -> 20, 21 -> 30, 31 -> 40, 41 -> 50. - fetchAcquiredRecords(sharePartition, memoryRecords(10, 11), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(11, 10), 10); fetchAcquiredRecords(sharePartition, memoryRecords(10, 21), 10); - fetchAcquiredRecords(sharePartition, memoryRecords(10, 31), 10); - fetchAcquiredRecords(sharePartition, memoryRecords(10, 41), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(31, 10), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(41, 10), 10); // After the acknowledgements, the share partition state will be: // 1. 11 -> 20: AVAILABLE @@ -5279,8 +5279,8 @@ public void testLsoMovementForArchivingAllAvailableOffsets() { public void testLsoMovementForArchivingOffsets() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(2, 5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(7, 5), 5); sharePartition.acknowledge(MEMBER_ID, List.of( new ShareAcknowledgementBatch(4, 8, List.of((byte) 1)))); @@ -5327,8 +5327,8 @@ public void testLsoMovementForArchivingOffsets() { public void testLsoMovementForArchivingOffsetsWithStartAndEndBatchesNotFullMatches() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(2, 5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(7, 5), 5); // LSO is at 4. sharePartition.updateCacheAndOffsets(4); @@ -5363,8 +5363,8 @@ public void testLsoMovementForArchivingOffsetsWithStartAndEndBatchesNotFullMatch public void testLsoMovementForArchivingOffsetsWithStartOffsetNotFullMatches() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(2, 5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(7, 5), 5); // LSO is at 4. sharePartition.updateCacheAndOffsets(4); @@ -5399,8 +5399,8 @@ public void testLsoMovementForArchivingOffsetsWithStartOffsetNotFullMatches() { public void testLsoMovementForArchivingOffsetsWithStartOffsetNotFullMatchesPostAcceptAcknowledgement() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(2, 5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(7, 5), 5); // LSO is at 4. sharePartition.updateCacheAndOffsets(4); @@ -5445,8 +5445,8 @@ public void testLsoMovementForArchivingOffsetsWithStartOffsetNotFullMatchesPostA public void testLsoMovementForArchivingOffsetsWithStartOffsetNotFullMatchesPostReleaseAcknowledgement() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(2, 5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(7, 5), 5); // LSO is at 4. sharePartition.updateCacheAndOffsets(4); @@ -5491,8 +5491,8 @@ public void testLsoMovementForArchivingOffsetsWithStartOffsetNotFullMatchesPostR public void testLsoMovementToEndOffset() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(2, 5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(7, 5), 5); // Acknowledge with RELEASE action. sharePartition.acknowledge(MEMBER_ID, List.of( @@ -5523,8 +5523,8 @@ public void testLsoMovementToEndOffset() { public void testLsoMovementToEndOffsetWhereEndOffsetIsAvailable() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(2, 5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(7, 5), 5); // Acknowledge with RELEASE action. sharePartition.acknowledge(MEMBER_ID, List.of( @@ -5556,8 +5556,8 @@ public void testLsoMovementToEndOffsetWhereEndOffsetIsAvailable() { public void testLsoMovementAheadOfEndOffsetPostAcknowledgement() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(2, 5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(7, 5), 5); // Acknowledge with RELEASE action. sharePartition.acknowledge(MEMBER_ID, List.of( @@ -5588,8 +5588,8 @@ public void testLsoMovementAheadOfEndOffsetPostAcknowledgement() { public void testLsoMovementAheadOfEndOffset() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(2, 5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(7, 5), 5); // LSO is at 14. sharePartition.updateCacheAndOffsets(14); @@ -5610,11 +5610,11 @@ public void testLsoMovementAheadOfEndOffset() { public void testLsoMovementWithGapsInCachedStateMap() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - MemoryRecords records1 = memoryRecords(5, 2); + MemoryRecords records1 = memoryRecords(2, 5); // Gap of 7-9. - MemoryRecords records2 = memoryRecords(5, 10); + MemoryRecords records2 = memoryRecords(10, 5); // Gap of 15-19. - MemoryRecords records3 = memoryRecords(5, 20); + MemoryRecords records3 = memoryRecords(20, 5); fetchAcquiredRecords(sharePartition, records1, 5); fetchAcquiredRecords(sharePartition, records2, 5); @@ -5642,9 +5642,9 @@ public void testLsoMovementWithGapsInCachedStateMap() { public void testLsoMovementWithGapsInCachedStateMapAndAcknowledgedBatch() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - MemoryRecords records1 = memoryRecords(5, 2); + MemoryRecords records1 = memoryRecords(2, 5); // Gap of 7-9. - MemoryRecords records2 = memoryRecords(5, 10); + MemoryRecords records2 = memoryRecords(10, 5); fetchAcquiredRecords(sharePartition, records1, 5); fetchAcquiredRecords(sharePartition, records2, 5); @@ -5672,9 +5672,9 @@ public void testLsoMovementWithGapsInCachedStateMapAndAcknowledgedBatch() { public void testLsoMovementPostGapsInAcknowledgements() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - MemoryRecords records1 = memoryRecords(2, 5); + MemoryRecords records1 = memoryRecords(5, 2); // Untracked gap of 3 offsets from 7-9. - MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(5, 10); + MemoryRecordsBuilder recordsBuilder = memoryRecordsBuilder(10, 5); // Gap from 15-17 offsets. recordsBuilder.appendWithOffset(18, 0L, TestUtils.randomString(10).getBytes(), TestUtils.randomString(10).getBytes()); MemoryRecords records2 = recordsBuilder.build(); @@ -5718,11 +5718,11 @@ public void testReleaseAcquiredRecordsBatchesPostStartOffsetMovement() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 5); - sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 15, fetchPartitionData(memoryRecords(5, 15)), FETCH_ISOLATION_HWM); + sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 15, fetchPartitionData(memoryRecords(15, 5)), FETCH_ISOLATION_HWM); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 20), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(20, 5), 5); fetchAcquiredRecords(sharePartition, memoryRecords(5, 25), 5); fetchAcquiredRecords(sharePartition, memoryRecords(5, 30), 5); fetchAcquiredRecords(sharePartition, memoryRecords(5, 35), 5); @@ -5793,7 +5793,7 @@ public void testReleaseAcquiredRecordsBatchesPostStartOffsetMovementToStartOfBat SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 5); // LSO is at 10. sharePartition.updateCacheAndOffsets(10); @@ -5820,7 +5820,7 @@ public void testReleaseAcquiredRecordsBatchesPostStartOffsetMovementToMiddleOfBa SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 5); // LSO is at 11. sharePartition.updateCacheAndOffsets(11); @@ -5853,7 +5853,7 @@ public void testReleaseAcquiredRecordsDecreaseDeliveryCount() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 5); sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(12, 13, List.of((byte) 1)))); @@ -5902,11 +5902,11 @@ public void testAcquisitionLockTimeoutForBatchesPostStartOffsetMovement() throws .build(); fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 5); - sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 15, fetchPartitionData(memoryRecords(5, 15)), FETCH_ISOLATION_HWM); + sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 15, fetchPartitionData(memoryRecords(15, 5)), FETCH_ISOLATION_HWM); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 20), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(20, 5), 5); fetchAcquiredRecords(sharePartition, memoryRecords(5, 25), 5); fetchAcquiredRecords(sharePartition, memoryRecords(5, 30), 5); fetchAcquiredRecords(sharePartition, memoryRecords(5, 35), 5); @@ -5976,7 +5976,7 @@ public void testAcquisitionLockTimeoutForBatchesPostStartOffsetMovementToStartOf .build(); fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 5); // LSO is at 10. sharePartition.updateCacheAndOffsets(10); @@ -6005,7 +6005,7 @@ public void testAcquisitionLockTimeoutForBatchesPostStartOffsetMovementToMiddleO .build(); fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 5); // LSO is at 11. sharePartition.updateCacheAndOffsets(11); @@ -6083,8 +6083,8 @@ public void testScheduleAcquisitionLockTimeoutValueUpdatesSuccessfully() { public void testAcknowledgeBatchAndOffsetPostLsoMovement() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(2, 5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 5); // LSO is at 12. sharePartition.updateCacheAndOffsets(12); @@ -6138,9 +6138,9 @@ public void testAcknowledgeBatchAndOffsetPostLsoMovement() { public void testAcknowledgeBatchPostLsoMovement() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 20), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(2, 5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(20, 5), 5); // LSO is at 14. sharePartition.updateCacheAndOffsets(14); @@ -6194,7 +6194,7 @@ public void testLsoMovementThenAcquisitionLockTimeoutThenAcknowledge() throws In .withState(SharePartitionState.ACTIVE) .build(); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(2, 5), 5); // LSO is at 7. sharePartition.updateCacheAndOffsets(7); @@ -6216,7 +6216,7 @@ public void testLsoMovementThenAcquisitionLockTimeoutThenAcknowledge() throws In DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, () -> assertionFailedMessage(sharePartition, Map.of())); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 5); assertEquals(15, sharePartition.nextFetchOffset()); assertEquals(10, sharePartition.startOffset()); @@ -6244,7 +6244,7 @@ public void testLsoMovementThenAcquisitionLockTimeoutThenAcknowledgeBatchLastOff .withState(SharePartitionState.ACTIVE) .build(); - fetchAcquiredRecords(sharePartition, memoryRecords(2, 1), 2); + fetchAcquiredRecords(sharePartition, memoryRecords(1, 2), 2); // LSO is at 3. sharePartition.updateCacheAndOffsets(3); @@ -6521,7 +6521,7 @@ public void testWriteShareGroupStateWithNoOpStatePersister() { public void testMaybeUpdateCachedStateWhenAcknowledgeTypeAccept() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - fetchAcquiredRecords(sharePartition, memoryRecords(250, 0), 250); + fetchAcquiredRecords(sharePartition, memoryRecords(250), 250); assertFalse(sharePartition.canAcquireRecords()); @@ -6541,7 +6541,7 @@ public void testMaybeUpdateCachedStateWhenAcknowledgeTypeAccept() { public void testMaybeUpdateCachedStateWhenAcknowledgeTypeReject() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - fetchAcquiredRecords(sharePartition, memoryRecords(250, 0), 250); + fetchAcquiredRecords(sharePartition, memoryRecords(250), 250); assertFalse(sharePartition.canAcquireRecords()); @@ -6561,7 +6561,7 @@ public void testMaybeUpdateCachedStateWhenAcknowledgeTypeReject() { public void testMaybeUpdateCachedStateWhenAcknowledgeTypeRelease() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - fetchAcquiredRecords(sharePartition, memoryRecords(250, 0), 250); + fetchAcquiredRecords(sharePartition, memoryRecords(250), 250); assertFalse(sharePartition.canAcquireRecords()); sharePartition.acknowledge(MEMBER_ID, List.of( @@ -6585,7 +6585,7 @@ public void testMaybeUpdateCachedStateWhenAcknowledgementsFromBeginningForBatchS .withState(SharePartitionState.ACTIVE) .build(); - fetchAcquiredRecords(sharePartition, memoryRecords(15, 0), 15); + fetchAcquiredRecords(sharePartition, memoryRecords(15), 15); assertTrue(sharePartition.canAcquireRecords()); fetchAcquiredRecords(sharePartition, memoryRecords(15, 15), 15); @@ -6610,7 +6610,7 @@ public void testMaybeUpdateCachedStateWhenAcknowledgementsFromBeginningForEntire .withMaxInflightRecords(20) .withState(SharePartitionState.ACTIVE) .build(); - fetchAcquiredRecords(sharePartition, memoryRecords(15, 0), 15); + fetchAcquiredRecords(sharePartition, memoryRecords(15), 15); assertTrue(sharePartition.canAcquireRecords()); fetchAcquiredRecords(sharePartition, memoryRecords(15, 15), 15); @@ -6636,7 +6636,7 @@ public void testMaybeUpdateCachedStateWhenAcknowledgementsInBetween() { .withState(SharePartitionState.ACTIVE) .build(); - fetchAcquiredRecords(sharePartition, memoryRecords(15, 0), 15); + fetchAcquiredRecords(sharePartition, memoryRecords(15), 15); assertTrue(sharePartition.canAcquireRecords()); fetchAcquiredRecords(sharePartition, memoryRecords(15, 15), 15); @@ -6665,7 +6665,7 @@ public void testMaybeUpdateCachedStateWhenAllRecordsInCachedStateAreAcknowledged .withState(SharePartitionState.ACTIVE) .build(); - fetchAcquiredRecords(sharePartition, memoryRecords(15, 0), 15); + fetchAcquiredRecords(sharePartition, memoryRecords(15), 15); assertTrue(sharePartition.canAcquireRecords()); fetchAcquiredRecords(sharePartition, memoryRecords(15, 15), 15); @@ -6687,13 +6687,13 @@ public void testMaybeUpdateCachedStateMultipleAcquisitionsAndAcknowledgements() .withState(SharePartitionState.ACTIVE) .build(); - fetchAcquiredRecords(sharePartition, memoryRecords(20, 0), 20); + fetchAcquiredRecords(sharePartition, memoryRecords(20), 20); assertTrue(sharePartition.canAcquireRecords()); fetchAcquiredRecords(sharePartition, memoryRecords(20, 20), 20); assertTrue(sharePartition.canAcquireRecords()); - fetchAcquiredRecords(sharePartition, memoryRecords(20, 40), 20); + fetchAcquiredRecords(sharePartition, memoryRecords(40, 20), 20); assertTrue(sharePartition.canAcquireRecords()); // First Acknowledgement for the first batch of records 0-19. @@ -6719,7 +6719,7 @@ public void testMaybeUpdateCachedStateMultipleAcquisitionsAndAcknowledgements() assertEquals(79, sharePartition.endOffset()); assertEquals(80, sharePartition.nextFetchOffset()); - fetchAcquiredRecords(sharePartition, memoryRecords(100, 80), 100); + fetchAcquiredRecords(sharePartition, memoryRecords(80, 100), 100); assertFalse(sharePartition.canAcquireRecords()); // Final Acknowledgement, all records are acknowledged here. @@ -6796,7 +6796,7 @@ public void testCanAcquireRecordsReturnsTrue() { assertEquals(0, sharePartition.startOffset()); assertEquals(0, sharePartition.endOffset()); - fetchAcquiredRecords(sharePartition, memoryRecords(150, 0), 150); + fetchAcquiredRecords(sharePartition, memoryRecords(150), 150); assertTrue(sharePartition.canAcquireRecords()); assertEquals(0, sharePartition.startOffset()); @@ -6810,10 +6810,10 @@ public void testCanAcquireRecordsChangeResponsePostAcknowledgement() { assertEquals(0, sharePartition.startOffset()); assertEquals(0, sharePartition.endOffset()); - fetchAcquiredRecords(sharePartition, memoryRecords(150, 0), 150); + fetchAcquiredRecords(sharePartition, memoryRecords(150), 150); assertTrue(sharePartition.canAcquireRecords()); - fetchAcquiredRecords(sharePartition, memoryRecords(100, 150), 100); + fetchAcquiredRecords(sharePartition, memoryRecords(150, 100), 100); assertFalse(sharePartition.canAcquireRecords()); assertEquals(0, sharePartition.startOffset()); assertEquals(249, sharePartition.endOffset()); @@ -6830,12 +6830,12 @@ public void testCanAcquireRecordsChangeResponsePostAcknowledgement() { public void testCanAcquireRecordsAfterReleaseAcknowledgement() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - fetchAcquiredRecords(sharePartition, memoryRecords(150, 0), 150); + fetchAcquiredRecords(sharePartition, memoryRecords(150), 150); assertTrue(sharePartition.canAcquireRecords()); assertEquals(0, sharePartition.startOffset()); assertEquals(149, sharePartition.endOffset()); - fetchAcquiredRecords(sharePartition, memoryRecords(100, 150), 100); + fetchAcquiredRecords(sharePartition, memoryRecords(150, 100), 100); assertFalse(sharePartition.canAcquireRecords()); assertEquals(0, sharePartition.startOffset()); assertEquals(249, sharePartition.endOffset()); @@ -6854,12 +6854,12 @@ public void testCanAcquireRecordsAfterReleaseAcknowledgement() { public void testCanAcquireRecordsAfterArchiveAcknowledgement() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - fetchAcquiredRecords(sharePartition, memoryRecords(150, 0), 150); + fetchAcquiredRecords(sharePartition, memoryRecords(150), 150); assertTrue(sharePartition.canAcquireRecords()); assertEquals(0, sharePartition.startOffset()); assertEquals(149, sharePartition.endOffset()); - fetchAcquiredRecords(sharePartition, memoryRecords(100, 150), 100); + fetchAcquiredRecords(sharePartition, memoryRecords(150, 100), 100); assertFalse(sharePartition.canAcquireRecords()); assertEquals(0, sharePartition.startOffset()); assertEquals(249, sharePartition.endOffset()); @@ -6877,12 +6877,12 @@ public void testCanAcquireRecordsAfterArchiveAcknowledgement() { public void testCanAcquireRecordsAfterAcceptAcknowledgement() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - fetchAcquiredRecords(sharePartition, memoryRecords(150, 0), 150); + fetchAcquiredRecords(sharePartition, memoryRecords(150), 150); assertTrue(sharePartition.canAcquireRecords()); assertEquals(0, sharePartition.startOffset()); assertEquals(149, sharePartition.endOffset()); - fetchAcquiredRecords(sharePartition, memoryRecords(100, 150), 100); + fetchAcquiredRecords(sharePartition, memoryRecords(150, 100), 100); assertFalse(sharePartition.canAcquireRecords()); assertEquals(0, sharePartition.startOffset()); assertEquals(249, sharePartition.endOffset()); @@ -6912,7 +6912,7 @@ public void testAcknowledgeBatchWithWriteShareGroupStateFailure() { PartitionFactory.newPartitionErrorData(0, Errors.UNKNOWN_TOPIC_OR_PARTITION.code(), Errors.UNKNOWN_TOPIC_OR_PARTITION.message()))))); Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); - fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 10); CompletableFuture ackResult = sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(5, 14, List.of((byte) 1)))); @@ -6941,7 +6941,7 @@ public void testAcknowledgeOffsetWithWriteShareGroupStateFailure() { PartitionFactory.newPartitionErrorData(0, Errors.GROUP_ID_NOT_FOUND.code(), Errors.GROUP_ID_NOT_FOUND.message()))))); Mockito.when(persister.writeState(Mockito.any())).thenReturn(CompletableFuture.completedFuture(writeShareGroupStateResult)); - fetchAcquiredRecords(sharePartition, memoryRecords(6, 5), 6); + fetchAcquiredRecords(sharePartition, memoryRecords(5, 6), 6); CompletableFuture ackResult = sharePartition.acknowledge( MEMBER_ID, List.of(new ShareAcknowledgementBatch(8, 10, List.of((byte) 3)))); @@ -6968,7 +6968,7 @@ public void testAcknowledgeOffsetWithWriteShareGroupStateFailure() { public void testAcknowledgeSubsetWithAnotherMember() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - fetchAcquiredRecords(sharePartition, memoryRecords(7, 5), 7); + fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 7); sharePartition.acknowledge(MEMBER_ID, List.of(new ShareAcknowledgementBatch(5, 7, List.of((byte) 1)))); @@ -6985,9 +6985,9 @@ public void testAcknowledgeWithAnotherMemberRollbackBatchError() { fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5); - sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 10, fetchPartitionData(memoryRecords(5, 10)), FETCH_ISOLATION_HWM); + sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 10, fetchPartitionData(memoryRecords(10, 5)), FETCH_ISOLATION_HWM); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 15), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(15, 5), 5); CompletableFuture ackResult = sharePartition.acknowledge(MEMBER_ID, List.of( new ShareAcknowledgementBatch(5, 9, List.of((byte) 2)), @@ -7015,8 +7015,8 @@ public void testAcknowledgeWithAnotherMemberRollbackSubsetError() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 10), 5); - sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 15, fetchPartitionData(memoryRecords(5, 15)), FETCH_ISOLATION_HWM); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 5), 5); + sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 15, fetchPartitionData(memoryRecords(15, 5)), FETCH_ISOLATION_HWM); CompletableFuture ackResult = sharePartition.acknowledge(MEMBER_ID, List.of( new ShareAcknowledgementBatch(5, 9, List.of((byte) 2)), @@ -7045,7 +7045,7 @@ public void testMaxDeliveryCountLimitExceededForRecordBatch() { .withMaxDeliveryCount(2) .withState(SharePartitionState.ACTIVE) .build(); - MemoryRecords records = memoryRecords(10, 5); + MemoryRecords records = memoryRecords(5, 10); fetchAcquiredRecords(sharePartition, records, 10); sharePartition.acknowledge(MEMBER_ID, List.of( @@ -7069,9 +7069,9 @@ public void testMaxDeliveryCountLimitExceededForRecordsSubset() { .withState(SharePartitionState.ACTIVE) .build(); // First fetch request with 5 records starting from offset 10. - MemoryRecords records1 = memoryRecords(5, 10); + MemoryRecords records1 = memoryRecords(10, 5); // Second fetch request with 5 records starting from offset 15. - MemoryRecords records2 = memoryRecords(5, 15); + MemoryRecords records2 = memoryRecords(15, 5); fetchAcquiredRecords(sharePartition, records1, 5); fetchAcquiredRecords(sharePartition, records2, 5); @@ -7103,14 +7103,14 @@ public void testMaxDeliveryCountLimitExceededForRecordsSubsetAndCachedStateNotCl .withState(SharePartitionState.ACTIVE) .build(); // First fetch request with 5 records starting from offset 0. - MemoryRecords records1 = memoryRecords(5, 0); + MemoryRecords records1 = memoryRecords(5); fetchAcquiredRecords(sharePartition, records1, 5); sharePartition.acknowledge(MEMBER_ID, new ArrayList<>(List.of( new ShareAcknowledgementBatch(0, 1, List.of((byte) 2))))); // Send next batch from offset 0, only 2 records should be acquired. - fetchAcquiredRecords(sharePartition, memoryRecords(2, 0), 2); + fetchAcquiredRecords(sharePartition, memoryRecords(2), 2); sharePartition.acknowledge(MEMBER_ID, List.of( new ShareAcknowledgementBatch(0, 4, List.of((byte) 2)))); @@ -7129,7 +7129,7 @@ public void testMaxDeliveryCountLimitExceededForRecordsSubsetAndCachedStateNotCl @Test public void testNextFetchOffsetPostAcquireAndAcknowledgeFunctionality() { SharePartition sharePartition = SharePartitionBuilder.builder().withState(SharePartitionState.ACTIVE).build(); - MemoryRecords records1 = memoryRecords(10, 0); + MemoryRecords records1 = memoryRecords(10); String memberId1 = "memberId-1"; String memberId2 = "memberId-2"; @@ -7162,7 +7162,7 @@ public void testNextFetchOffsetWithMultipleConsumers() { .withMaxInflightRecords(100) .withState(SharePartitionState.ACTIVE) .build(); - MemoryRecords records1 = memoryRecords(3, 0); + MemoryRecords records1 = memoryRecords(3); String memberId1 = MEMBER_ID; String memberId2 = "member-2"; @@ -7190,7 +7190,7 @@ public void testNumberOfWriteCallsOnUpdates() { .withState(SharePartitionState.ACTIVE) .build()); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(2, 5), 5); sharePartition.acknowledge(MEMBER_ID, List.of( new ShareAcknowledgementBatch(2, 6, List.of((byte) 1)))); // Acknowledge records will induce 1 write state RPC call via function isWriteShareGroupStateSuccessful. @@ -7208,7 +7208,7 @@ public void testReacquireSubsetWithAnotherMember() { MemoryRecords records1 = memoryRecords(5, 5); fetchAcquiredRecords(sharePartition, records1, 5); - fetchAcquiredRecords(sharePartition, memoryRecords(12, 10), 12); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 12), 12); sharePartition.acknowledge(MEMBER_ID, List.of( new ShareAcknowledgementBatch(5, 11, List.of((byte) 2)), @@ -7295,7 +7295,7 @@ public void testAcquireWithWriteShareGroupStateDelay() { // persister.writeState RPC will not complete instantaneously due to which commit won't happen for acknowledged offsets. Mockito.when(persister.writeState(Mockito.any())).thenReturn(future); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 0), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(5), 5); fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5); List acknowledgementBatches = new ArrayList<>(); @@ -7314,7 +7314,7 @@ public void testAcquireWithWriteShareGroupStateDelay() { // Even though offsets 2-3, 5-9 are in available state, but they won't be acquired since they are still in transition from ACQUIRED // to AVAILABLE state as the write state RPC has not completed yet, so the commit hasn't happened yet. - fetchAcquiredRecords(sharePartition, memoryRecords(15, 0), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(15), 5); assertEquals(3, sharePartition.cachedState().size()); assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).offsetState().get(0L).state()); @@ -7328,7 +7328,7 @@ public void testAcquireWithWriteShareGroupStateDelay() { // persister.writeState RPC will complete now. This is going to commit all the acknowledged batches. Hence, their // rollBack state will become null and they will be available for acquire again. future.complete(writeShareGroupStateResult); - fetchAcquiredRecords(sharePartition, memoryRecords(15, 0), 7); + fetchAcquiredRecords(sharePartition, memoryRecords(15), 7); assertEquals(3, sharePartition.cachedState().size()); assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).offsetState().get(0L).state()); assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(0L).offsetState().get(1L).state()); @@ -7381,7 +7381,7 @@ public void testCacheUpdateWhenBatchHasOngoingTransition() { // Acquire a single batch. fetchAcquiredRecords( sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 21, - fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM + fetchPartitionData(memoryRecords(21, 10)), FETCH_ISOLATION_HWM ), 10 ); @@ -7425,7 +7425,7 @@ public void testCacheUpdateWhenOffsetStateHasOngoingTransition() { // Acquire a single batch. fetchAcquiredRecords( sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 21, - fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM + fetchPartitionData(memoryRecords(21, 10)), FETCH_ISOLATION_HWM ), 10 ); @@ -7482,9 +7482,9 @@ public void testAcquireAndAcknowledgeWithRecordsAheadOfRecordBatchStartOffset() try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 5, 2)) { // Append records from offset 10. - memoryRecords(2, 10).records().forEach(builder::append); + memoryRecords(10, 2).records().forEach(builder::append); // Append records from offset 15. - memoryRecords(2, 15).records().forEach(builder::append); + memoryRecords(15, 2).records().forEach(builder::append); } buffer.flip(); MemoryRecords records = MemoryRecords.readableRecords(buffer); @@ -7535,17 +7535,17 @@ public void testAcquireWhenBatchesAreRemovedFromBetweenInSubsequentFetchData() { // Create 3 batches of records for a single acquire. ByteBuffer buffer = ByteBuffer.allocate(4096); - memoryRecordsBuilder(buffer, 5, 0).close(); - memoryRecordsBuilder(buffer, 15, 5).close(); - memoryRecordsBuilder(buffer, 15, 20).close(); + memoryRecordsBuilder(buffer, 0, 5).close(); + memoryRecordsBuilder(buffer, 5, 15).close(); + memoryRecordsBuilder(buffer, 20, 15).close(); buffer.flip(); MemoryRecords records = MemoryRecords.readableRecords(buffer); // Acquire batch (0-34) which shall create single cache entry. fetchAcquiredRecords(sharePartition, records, 35); // Acquire another 3 individual batches of records. - fetchAcquiredRecords(sharePartition, memoryRecords(5, 40), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 45), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(15, 50), 15); + fetchAcquiredRecords(sharePartition, memoryRecords(40, 5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(45, 5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(50, 15), 15); // Release all batches in the cache. sharePartition.releaseAcquiredRecords(MEMBER_ID); // Validate cache has 4 entries. @@ -7562,9 +7562,9 @@ public void testAcquireWhenBatchesAreRemovedFromBetweenInSubsequentFetchData() { try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 20, 2)) { // Append 2 records for 20 offset batch starting from offset 20. - memoryRecords(2, 20).records().forEach(builder::append); + memoryRecords(20, 2).records().forEach(builder::append); // And append 2 records matching the end offset of the batch. - memoryRecords(2, 33).records().forEach(builder::append); + memoryRecords(33, 2).records().forEach(builder::append); } // Send the full batch at offset 40. memoryRecordsBuilder(buffer, 5, 40).close(); @@ -7572,11 +7572,11 @@ public void testAcquireWhenBatchesAreRemovedFromBetweenInSubsequentFetchData() { try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 50, 2)) { // Append 5 records for 50 offset batch starting from offset 51. - memoryRecords(5, 51).records().forEach(builder::append); + memoryRecords(51, 5).records().forEach(builder::append); // Append 2 records for in middle of the batch. - memoryRecords(2, 58).records().forEach(builder::append); + memoryRecords(58, 2).records().forEach(builder::append); // And append 1 record prior to the end offset. - memoryRecords(1, 63).records().forEach(builder::append); + memoryRecords(63, 1).records().forEach(builder::append); } buffer.flip(); records = MemoryRecords.readableRecords(buffer); @@ -7626,9 +7626,9 @@ public void testAcquireWhenBatchesRemovedForFetchOffset() { .withState(SharePartitionState.ACTIVE) .build(); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 0), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(5), 5); fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(15, 10), 15); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 15), 15); // Release the batches in the cache. sharePartition.releaseAcquiredRecords(MEMBER_ID); // Validate cache has 3 entries. @@ -7680,7 +7680,7 @@ public void testAcquireWhenBatchesRemovedForFetchOffsetWithinBatch() { .build(); fetchAcquiredRecords(sharePartition, memoryRecords(5, 5), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(15, 10), 15); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 15), 15); // Acknowledge subset of the first batch offsets. sharePartition.acknowledge(MEMBER_ID, List.of( // Accept the 3 offsets of first batch. @@ -7691,7 +7691,7 @@ public void testAcquireWhenBatchesRemovedForFetchOffsetWithinBatch() { assertEquals(2, sharePartition.cachedState().size()); // Mark fetch offset within the first batch to 8, first available offset. - fetchAcquiredRecords(sharePartition, memoryRecords(15, 10), 8, 0, 15); + fetchAcquiredRecords(sharePartition, memoryRecords(10, 15), 8, 0, 15); assertEquals(25, sharePartition.nextFetchOffset()); // The next fetch offset has been updated, but the start offset should remain unchanged since // the acquire operation only marks offsets as archived. The start offset will be correctly @@ -7722,9 +7722,9 @@ public void testAcquireWhenBatchesRemovedForFetchOffsetForSameCachedBatch() { // Create 3 batches of records for a single acquire. ByteBuffer buffer = ByteBuffer.allocate(4096); - memoryRecordsBuilder(buffer, 5, 0).close(); - memoryRecordsBuilder(buffer, 15, 5).close(); - memoryRecordsBuilder(buffer, 15, 20).close(); + memoryRecordsBuilder(buffer, 0, 5).close(); + memoryRecordsBuilder(buffer, 5, 15).close(); + memoryRecordsBuilder(buffer, 20, 15).close(); buffer.flip(); MemoryRecords records = MemoryRecords.readableRecords(buffer); @@ -7802,8 +7802,8 @@ public void testFilterRecordBatchesFromAcquiredRecords() { new AcquiredRecords().setFirstOffset(20).setLastOffset(25).setDeliveryCount((short) 1) ); List recordBatches1 = List.of( - memoryRecordsBuilder(3, 2).build().batches().iterator().next(), - memoryRecordsBuilder(3, 12).build().batches().iterator().next() + memoryRecordsBuilder(2, 3).build().batches().iterator().next(), + memoryRecordsBuilder(12, 3).build().batches().iterator().next() ); assertEquals( List.of( @@ -7821,8 +7821,8 @@ public void testFilterRecordBatchesFromAcquiredRecords() { new AcquiredRecords().setFirstOffset(31).setLastOffset(40).setDeliveryCount((short) 3) ); List recordBatches2 = List.of( - memoryRecordsBuilder(21, 5).build().batches().iterator().next(), - memoryRecordsBuilder(5, 31).build().batches().iterator().next() + memoryRecordsBuilder(5, 21).build().batches().iterator().next(), + memoryRecordsBuilder(31, 5).build().batches().iterator().next() ); assertEquals( List.of( @@ -7840,8 +7840,8 @@ public void testFilterRecordBatchesFromAcquiredRecords() { new AcquiredRecords().setFirstOffset(0).setLastOffset(19).setDeliveryCount((short) 1) ); List recordBatches3 = List.of( - memoryRecordsBuilder(1, 8).build().batches().iterator().next(), - memoryRecordsBuilder(1, 18).build().batches().iterator().next() + memoryRecordsBuilder(8, 1).build().batches().iterator().next(), + memoryRecordsBuilder(18, 1).build().batches().iterator().next() ); assertEquals( @@ -7861,12 +7861,12 @@ public void testAcquireWithReadCommittedIsolationLevel() { .build()); ByteBuffer buffer = ByteBuffer.allocate(4096); - memoryRecordsBuilder(buffer, 5, 10).close(); - memoryRecordsBuilder(buffer, 5, 15).close(); - memoryRecordsBuilder(buffer, 15, 20).close(); - memoryRecordsBuilder(buffer, 8, 50).close(); - memoryRecordsBuilder(buffer, 10, 58).close(); - memoryRecordsBuilder(buffer, 5, 70).close(); + memoryRecordsBuilder(buffer, 10, 5).close(); + memoryRecordsBuilder(buffer, 15, 5).close(); + memoryRecordsBuilder(buffer, 20, 15).close(); + memoryRecordsBuilder(buffer, 50, 8).close(); + memoryRecordsBuilder(buffer, 58, 10).close(); + memoryRecordsBuilder(buffer, 70, 5).close(); buffer.flip(); MemoryRecords records = MemoryRecords.readableRecords(buffer); @@ -7876,9 +7876,9 @@ public void testAcquireWithReadCommittedIsolationLevel() { // We won't be utilizing the aborted transactions passed in fetchPartitionData. when(sharePartition.fetchAbortedTransactionRecordBatches(fetchPartitionData.records.batches(), fetchPartitionData.abortedTransactions.get())).thenReturn( List.of( - memoryRecordsBuilder(5, 10).build().batches().iterator().next(), - memoryRecordsBuilder(10, 58).build().batches().iterator().next(), - memoryRecordsBuilder(5, 70).build().batches().iterator().next() + memoryRecordsBuilder(10, 5).build().batches().iterator().next(), + memoryRecordsBuilder(58, 10).build().batches().iterator().next(), + memoryRecordsBuilder(70, 5).build().batches().iterator().next() ) ); @@ -8199,7 +8199,7 @@ public void testAcquireWhenBatchHasOngoingTransition() { // Acquire a single batch with member-1. fetchAcquiredRecords( sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 21, - fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM + fetchPartitionData(memoryRecords(21, 10)), FETCH_ISOLATION_HWM ), 10 ); @@ -8222,7 +8222,7 @@ public void testAcquireWhenBatchHasOngoingTransition() { // transition for this batch. fetchAcquiredRecords( sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 21, - fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM + fetchPartitionData(memoryRecords(21, 10)), FETCH_ISOLATION_HWM ), 0 ); @@ -8239,7 +8239,7 @@ public void testAcquireWhenBatchHasOngoingTransition() { // Acquire the same batch with member-2. 10 records will be acquired. fetchAcquiredRecords( sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 21, - fetchPartitionData(memoryRecords(10, 21)), FETCH_ISOLATION_HWM + fetchPartitionData(memoryRecords(21, 10)), FETCH_ISOLATION_HWM ), 10 ); assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(21L).batchState()); @@ -8258,7 +8258,7 @@ public void testNextFetchOffsetWhenBatchHasOngoingTransition() { // Acquire a single batch 0-9 with member-1. fetchAcquiredRecords( sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 0, - fetchPartitionData(memoryRecords(10, 0)), FETCH_ISOLATION_HWM + fetchPartitionData(memoryRecords(10)), FETCH_ISOLATION_HWM ), 10 ); @@ -8316,7 +8316,7 @@ public void testNextFetchOffsetWhenOffsetsHaveOngoingTransition() { // Acquire a single batch 0-50 with member-1. fetchAcquiredRecords( sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 0, - fetchPartitionData(memoryRecords(50, 0)), FETCH_ISOLATION_HWM + fetchPartitionData(memoryRecords(50)), FETCH_ISOLATION_HWM ), 50 ); @@ -8370,8 +8370,8 @@ public void testAcquisitionLockTimeoutWithConcurrentAcknowledgement() throws Int // Create 2 batches of records. ByteBuffer buffer = ByteBuffer.allocate(4096); - memoryRecordsBuilder(buffer, 5, 0).close(); - memoryRecordsBuilder(buffer, 15, 5).close(); + memoryRecordsBuilder(buffer, 0, 5).close(); + memoryRecordsBuilder(buffer, 5, 15).close(); buffer.flip(); @@ -8472,8 +8472,8 @@ public void testLsoMovementWithWriteStateRPCFailuresInAcknowledgement() { .withPersister(persister) .build(); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(2, 5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(7, 5), 5); // Validate that there is no ongoing transition. assertFalse(sharePartition.cachedState().get(2L).batchHasOngoingStateTransition()); @@ -8546,7 +8546,7 @@ public void testAcquisitionLockTimeoutWithWriteStateRPCFailure() throws Interrup fetchAcquiredRecords( sharePartition.acquire(MEMBER_ID, BATCH_SIZE, MAX_FETCH_RECORDS, 0, - fetchPartitionData(memoryRecords(2, 0)), FETCH_ISOLATION_HWM + fetchPartitionData(memoryRecords(2)), FETCH_ISOLATION_HWM ), 2 ); @@ -8609,8 +8609,8 @@ public void testRecordArchivedWithWriteStateRPCFailure() throws InterruptedExcep .withPersister(persister) .build(); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(2, 5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(7, 5), 5); // Futures which will be completed later, so the batch state has ongoing transition. CompletableFuture future1 = new CompletableFuture<>(); @@ -8659,8 +8659,8 @@ public void testRecordArchivedWithWriteStateRPCFailure() throws InterruptedExcep // Acquisition lock timeout task has run already and next fetch offset is moved to 2. assertEquals(2, sharePartition.nextFetchOffset()); // Send the same batches again. - fetchAcquiredRecords(sharePartition, memoryRecords(5, 2), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 7), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(2, 5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(7, 5), 5); future1 = new CompletableFuture<>(); future2 = new CompletableFuture<>(); @@ -8789,11 +8789,11 @@ private List fetchAcquiredRecords(ShareAcquiredRecords shareAcq } private MemoryRecords memoryRecords(int numOfRecords) { - return memoryRecords(numOfRecords, 0); + return memoryRecords(0, numOfRecords); } - private MemoryRecords memoryRecords(int numOfRecords, long startOffset) { - try (MemoryRecordsBuilder builder = memoryRecordsBuilder(numOfRecords, startOffset)) { + private MemoryRecords memoryRecords(long startOffset, int numOfRecords) { + try (MemoryRecordsBuilder builder = memoryRecordsBuilder(startOffset, numOfRecords)) { return builder.build(); } } From 81d0f96f57641e13266ca8e6f265608c854d4ba9 Mon Sep 17 00:00:00 2001 From: Apoorv Mittal Date: Fri, 17 Oct 2025 19:33:45 +0100 Subject: [PATCH 2/3] Adding missed file --- .../server/share/fetch/ShareFetchTestUtils.java | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java b/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java index db3aa45d6876a..4764347d3b998 100644 --- a/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java +++ b/server/src/test/java/org/apache/kafka/server/share/fetch/ShareFetchTestUtils.java @@ -79,7 +79,7 @@ public static void validateRotatedListEquals( public static FileRecords createFileRecords(Map recordsPerOffset) throws IOException { FileRecords fileRecords = FileRecords.open(tempFile()); for (Entry entry : recordsPerOffset.entrySet()) { - try (MemoryRecordsBuilder records = memoryRecordsBuilder(entry.getValue(), entry.getKey())) { + try (MemoryRecordsBuilder records = memoryRecordsBuilder(entry.getKey(), entry.getValue())) { fileRecords.append(records.build()); } } @@ -89,23 +89,23 @@ public static FileRecords createFileRecords(Map recordsPerOffset) /** * Create a memory records builder with the given number of records and start offset. * - * @param numOfRecords The number of records to create. * @param startOffset The start offset of the records. + * @param numOfRecords The number of records to create. * @return The memory records builder. */ - public static MemoryRecordsBuilder memoryRecordsBuilder(int numOfRecords, long startOffset) { - return memoryRecordsBuilder(ByteBuffer.allocate(1024), numOfRecords, startOffset); + public static MemoryRecordsBuilder memoryRecordsBuilder(long startOffset, int numOfRecords) { + return memoryRecordsBuilder(ByteBuffer.allocate(1024), startOffset, numOfRecords); } /** * Create a memory records builder with the number of records and start offset, in the given buffer. * * @param buffer The buffer to write the records to. - * @param numOfRecords The number of records to create. * @param startOffset The start offset of the records. + * @param numOfRecords The number of records to create. * @return The memory records builder. */ - public static MemoryRecordsBuilder memoryRecordsBuilder(ByteBuffer buffer, int numOfRecords, long startOffset) { + public static MemoryRecordsBuilder memoryRecordsBuilder(ByteBuffer buffer, long startOffset, int numOfRecords) { MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, startOffset, 2); for (int i = 0; i < numOfRecords; i++) { From f185383f4ed2e354cf864104aefa37b85c1eef83 Mon Sep 17 00:00:00 2001 From: Apoorv Mittal Date: Mon, 20 Oct 2025 13:31:49 +0100 Subject: [PATCH 3/3] Correcting additional params --- .../server/share/SharePartitionTest.java | 64 +++++++++---------- 1 file changed, 32 insertions(+), 32 deletions(-) diff --git a/core/src/test/java/kafka/server/share/SharePartitionTest.java b/core/src/test/java/kafka/server/share/SharePartitionTest.java index bf868cae92961..b3068abbedaa0 100644 --- a/core/src/test/java/kafka/server/share/SharePartitionTest.java +++ b/core/src/test/java/kafka/server/share/SharePartitionTest.java @@ -1716,10 +1716,10 @@ public void testMaybeInitializeAndAcquireWithMultipleBatchesPriorStartOffset() { // Create multiple batch records where multiple batches base offsets are prior startOffset. ByteBuffer buffer = ByteBuffer.allocate(4096); - memoryRecordsBuilder(buffer, 2, 3).close(); - memoryRecordsBuilder(buffer, 1, 6).close(); - memoryRecordsBuilder(buffer, 4, 8).close(); - memoryRecordsBuilder(buffer, 10, 13).close(); + memoryRecordsBuilder(buffer, 3, 2).close(); + memoryRecordsBuilder(buffer, 6, 1).close(); + memoryRecordsBuilder(buffer, 8, 4).close(); + memoryRecordsBuilder(buffer, 13, 10).close(); buffer.flip(); MemoryRecords records = MemoryRecords.readableRecords(buffer); // Set max fetch records to 500, records should be acquired till the last offset of the fetched batch. @@ -2234,7 +2234,7 @@ public void testAcquireWithMaxInFlightRecordsAndTryAcquireNewBatch() { BATCH_SIZE, 500 /* Max fetch records */, 25 /* Fetch Offset */, - fetchPartitionData(memoryRecords(10, 25), 10), + fetchPartitionData(memoryRecords(25, 10), 10), FETCH_ISOLATION_HWM), 0); @@ -3401,8 +3401,8 @@ public void testAcquireMaxFetchRecordsExceededAfterAcquiringGaps() { // Creating 3 batches of records with a total of 8 records ByteBuffer buffer = ByteBuffer.allocate(4096); - memoryRecordsBuilder(buffer, 10, 11).close(); - memoryRecordsBuilder(buffer, 10, 21).close(); + memoryRecordsBuilder(buffer, 11, 10).close(); + memoryRecordsBuilder(buffer, 21, 10).close(); buffer.flip(); MemoryRecords records = MemoryRecords.readableRecords(buffer); @@ -3450,8 +3450,8 @@ public void testAcquireMaxFetchRecordsExceededBeforeAcquiringGaps() { // Creating 3 batches of records with a total of 8 records ByteBuffer buffer = ByteBuffer.allocate(4096); - memoryRecordsBuilder(buffer, 10, 11).close(); - memoryRecordsBuilder(buffer, 20, 21).close(); + memoryRecordsBuilder(buffer, 11, 10).close(); + memoryRecordsBuilder(buffer, 21, 20).close(); buffer.flip(); MemoryRecords records = MemoryRecords.readableRecords(buffer); @@ -3638,9 +3638,9 @@ public void testAcquireCachedStateGapInBetweenOverlapsWithActualPartitionGap() { // Creating 3 batches starting from 11, such that there is a natural gap from 26 to 30 ByteBuffer buffer = ByteBuffer.allocate(4096); - memoryRecordsBuilder(buffer, 10, 11).close(); - memoryRecordsBuilder(buffer, 15, 21).close(); - memoryRecordsBuilder(buffer, 20, 41).close(); + memoryRecordsBuilder(buffer, 11, 10).close(); + memoryRecordsBuilder(buffer, 21, 15).close(); + memoryRecordsBuilder(buffer, 41, 20).close(); buffer.flip(); MemoryRecords records = MemoryRecords.readableRecords(buffer); @@ -3692,7 +3692,7 @@ public void testAcquireWhenRecordsFetchedAfterGapsAreFetched() { sharePartition.maybeInitialize(); // Fetched records are from 21 to 35 - MemoryRecords records = memoryRecords(15, 21); + MemoryRecords records = memoryRecords(21, 15); List acquiredRecordsList = fetchAcquiredRecords(sharePartition, records, 10); // Since the gap if only from 21 to 30 and the next batch is ARCHIVED, only 10 gap offsets will be acquired as a single batch @@ -3712,7 +3712,7 @@ public void testAcquireWhenRecordsFetchedAfterGapsAreFetched() { assertEquals(40, persisterReadResultGapWindow.endOffset()); // Fetching from the nextFetchOffset so that endOffset moves ahead - records = memoryRecords(15, 41); + records = memoryRecords(41, 15); acquiredRecordsList = fetchAcquiredRecords(sharePartition, records, 15); @@ -5218,7 +5218,7 @@ public void testLsoMovementForArchivingAllAvailableOffsets() { // A client acquires 4 batches, 11 -> 20, 21 -> 30, 31 -> 40, 41 -> 50. fetchAcquiredRecords(sharePartition, memoryRecords(11, 10), 10); - fetchAcquiredRecords(sharePartition, memoryRecords(10, 21), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(21, 10), 10); fetchAcquiredRecords(sharePartition, memoryRecords(31, 10), 10); fetchAcquiredRecords(sharePartition, memoryRecords(41, 10), 10); @@ -5723,9 +5723,9 @@ public void testReleaseAcquiredRecordsBatchesPostStartOffsetMovement() { sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 15, fetchPartitionData(memoryRecords(15, 5)), FETCH_ISOLATION_HWM); fetchAcquiredRecords(sharePartition, memoryRecords(20, 5), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 25), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 30), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 35), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(25, 5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(30, 5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(35, 5), 5); // Acknowledge records. sharePartition.acknowledge(MEMBER_ID, List.of( @@ -5907,9 +5907,9 @@ public void testAcquisitionLockTimeoutForBatchesPostStartOffsetMovement() throws sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 15, fetchPartitionData(memoryRecords(15, 5)), FETCH_ISOLATION_HWM); fetchAcquiredRecords(sharePartition, memoryRecords(20, 5), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 25), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 30), 5); - fetchAcquiredRecords(sharePartition, memoryRecords(5, 35), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(25, 5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(30, 5), 5); + fetchAcquiredRecords(sharePartition, memoryRecords(35, 5), 5); // Acknowledge records. sharePartition.acknowledge(MEMBER_ID, List.of( @@ -6266,8 +6266,8 @@ public void testLsoMovementThenAcquisitionLockTimeoutThenAcknowledgeBatchLastOff DEFAULT_MAX_WAIT_ACQUISITION_LOCK_TIMEOUT_MS, () -> assertionFailedMessage(sharePartition, Map.of())); - fetchAcquiredRecords(sharePartition, memoryRecords(2, 3), 2); - fetchAcquiredRecords(sharePartition, memoryRecords(3, 5), 3); + fetchAcquiredRecords(sharePartition, memoryRecords(3, 2), 2); + fetchAcquiredRecords(sharePartition, memoryRecords(5, 3), 3); assertEquals(8, sharePartition.nextFetchOffset()); assertEquals(3, sharePartition.startOffset()); @@ -6705,7 +6705,7 @@ public void testMaybeUpdateCachedStateMultipleAcquisitionsAndAcknowledgements() assertEquals(59, sharePartition.endOffset()); assertEquals(60, sharePartition.nextFetchOffset()); - fetchAcquiredRecords(sharePartition, memoryRecords(20, 60), 20); + fetchAcquiredRecords(sharePartition, memoryRecords(60, 20), 20); assertTrue(sharePartition.canAcquireRecords()); sharePartition.acknowledge(MEMBER_ID, List.of( @@ -6732,7 +6732,7 @@ public void testMaybeUpdateCachedStateMultipleAcquisitionsAndAcknowledgements() assertEquals(180, sharePartition.endOffset()); assertEquals(180, sharePartition.nextFetchOffset()); - fetchAcquiredRecords(sharePartition, memoryRecords(20, 180), 20); + fetchAcquiredRecords(sharePartition, memoryRecords(180, 20), 20); assertEquals(1, sharePartition.cachedState().size()); assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(180L).batchState()); @@ -6766,7 +6766,7 @@ public void testMaybeUpdateCachedStateGapAfterLastOffsetAcknowledged() { sharePartition.maybeInitialize(); // Acquiring the first AVAILABLE batch from 11 to 20 - fetchAcquiredRecords(sharePartition, memoryRecords(10, 11), 10); + fetchAcquiredRecords(sharePartition, memoryRecords(11, 10), 10); assertTrue(sharePartition.canAcquireRecords()); // Sending acknowledgement for the first batch from 11 to 20 @@ -7173,7 +7173,7 @@ public void testNextFetchOffsetWithMultipleConsumers() { new ShareAcknowledgementBatch(0, 2, List.of((byte) 2)))); assertEquals(0, sharePartition.nextFetchOffset()); - sharePartition.acquire(memberId2, BATCH_SIZE, MAX_FETCH_RECORDS, 3, fetchPartitionData(memoryRecords(2, 3)), FETCH_ISOLATION_HWM); + sharePartition.acquire(memberId2, BATCH_SIZE, MAX_FETCH_RECORDS, 3, fetchPartitionData(memoryRecords(3, 2)), FETCH_ISOLATION_HWM); assertEquals(0, sharePartition.nextFetchOffset()); sharePartition.acquire(memberId1, BATCH_SIZE, MAX_FETCH_RECORDS, DEFAULT_FETCH_OFFSET, fetchPartitionData(records1), FETCH_ISOLATION_HWM); @@ -7221,7 +7221,7 @@ public void testReacquireSubsetWithAnotherMember() { assertEquals(10, sharePartition.nextFetchOffset()); // Reacquire with another member. - sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 10, fetchPartitionData(memoryRecords(7, 10)), FETCH_ISOLATION_HWM); + sharePartition.acquire("member-2", BATCH_SIZE, MAX_FETCH_RECORDS, 10, fetchPartitionData(memoryRecords(10, 7)), FETCH_ISOLATION_HWM); assertEquals(17, sharePartition.nextFetchOffset()); assertEquals(RecordState.ACQUIRED, sharePartition.cachedState().get(5L).batchState()); @@ -7556,7 +7556,7 @@ public void testAcquireWhenBatchesAreRemovedFromBetweenInSubsequentFetchData() { try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 0, 2)) { // Append only 2 records for 0 offset batch starting from offset 1. - memoryRecords(2, 1).records().forEach(builder::append); + memoryRecords(1, 2).records().forEach(builder::append); } // Do not include batch from offset 5. And compact batch starting at offset 20. try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, @@ -7567,7 +7567,7 @@ public void testAcquireWhenBatchesAreRemovedFromBetweenInSubsequentFetchData() { memoryRecords(33, 2).records().forEach(builder::append); } // Send the full batch at offset 40. - memoryRecordsBuilder(buffer, 5, 40).close(); + memoryRecordsBuilder(buffer, 40, 5).close(); // Do not include batch from offset 45. And compact the batch at offset 50. try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 50, 2)) { @@ -7639,7 +7639,7 @@ public void testAcquireWhenBatchesRemovedForFetchOffset() { try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 5, 2)) { // Append only 4 records for 5th offset batch starting from offset 6. - memoryRecords(4, 6).records().forEach(builder::append); + memoryRecords(6, 4).records().forEach(builder::append); } buffer.flip(); MemoryRecords records = MemoryRecords.readableRecords(buffer); @@ -7740,7 +7740,7 @@ public void testAcquireWhenBatchesRemovedForFetchOffsetForSameCachedBatch() { try (MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, Compression.NONE, TimestampType.CREATE_TIME, 5, 2)) { // Append only 4 records for 5th offset batch starting from offset 6. - memoryRecords(4, 6).records().forEach(builder::append); + memoryRecords(6, 4).records().forEach(builder::append); } buffer.flip(); records = MemoryRecords.readableRecords(buffer);