Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -506,7 +506,7 @@ static void writeDynamicQuorumSnapshot(
VoterSet voterSet = initialControllers.toVoterSet(controllerListenerName);
RecordsSnapshotWriter.Builder builder = new RecordsSnapshotWriter.Builder().
setLastContainedLogTimestamp(Time.SYSTEM.milliseconds()).
setMaxBatchSize(KafkaRaftClient.MAX_BATCH_SIZE_BYTES).
setMaxBatchSizeBytes(KafkaRaftClient.MAX_BATCH_SIZE_BYTES).
setRawSnapshotWriter(FileRawSnapshotWriter.create(
clusterMetadataDirectory.toPath(),
Snapshots.BOOTSTRAP_SNAPSHOT_ID)).
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3781,7 +3781,7 @@ public Optional<SnapshotWriter<T>> createSnapshot(
return new RecordsSnapshotWriter.Builder()
.setLastContainedLogTimestamp(lastContainedLogTimestamp)
.setTime(time)
.setMaxBatchSize(MAX_BATCH_SIZE_BYTES)
.setMaxBatchSizeBytes(MAX_BATCH_SIZE_BYTES)
.setMemoryPool(memoryPool)
.setRawSnapshotWriter(wrappedWriter)
.setKraftVersion(partitionState.kraftVersionAtOffset(lastContainedLogOffset))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ MemoryRecords create(
private final int epoch;
private final Time time;
private final int lingerMs;
private final int maxBatchSize;
private final int maxBatchSizeBytes;
private final int maxNumberOfBatches;
private final Compression compression;
private final MemoryPool memoryPool;
Expand All @@ -82,7 +82,7 @@ public BatchAccumulator(
int epoch,
long baseOffset,
int lingerMs,
int maxBatchSize,
int maxBatchSizeBytes,
int maxNumberOfBatches,
MemoryPool memoryPool,
Time time,
Expand All @@ -91,7 +91,7 @@ public BatchAccumulator(
) {
this.epoch = epoch;
this.lingerMs = lingerMs;
this.maxBatchSize = maxBatchSize;
this.maxBatchSizeBytes = maxBatchSizeBytes;
this.maxNumberOfBatches = maxNumberOfBatches;
this.memoryPool = memoryPool;
this.time = time;
Expand Down Expand Up @@ -182,12 +182,12 @@ private BatchBuilder<T> maybeAllocateBatch(

if (currentBatch != null) {
OptionalInt bytesNeeded = currentBatch.bytesNeeded(records, serializationCache);
if (bytesNeeded.isPresent() && bytesNeeded.getAsInt() > maxBatchSize) {
if (bytesNeeded.isPresent() && bytesNeeded.getAsInt() > maxBatchSizeBytes) {
throw new RecordBatchTooLargeException(
String.format(
"The total record(s) size of %d exceeds the maximum allowed batch size of %d",
bytesNeeded.getAsInt(),
maxBatchSize
maxBatchSizeBytes
)
);
} else if (bytesNeeded.isPresent()) {
Expand Down Expand Up @@ -231,7 +231,7 @@ public void allowDrain() {
public long appendControlMessages(MemoryRecordsCreator valueCreator) {
appendLock.lock();
try {
ByteBuffer buffer = memoryPool.tryAllocate(maxBatchSize);
ByteBuffer buffer = memoryPool.tryAllocate(maxBatchSizeBytes);
if (buffer != null) {
try {
forceDrain();
Expand Down Expand Up @@ -421,7 +421,7 @@ private void maybeCompleteDrain() {
}

private void startNewBatch() {
ByteBuffer buffer = memoryPool.tryAllocate(maxBatchSize);
ByteBuffer buffer = memoryPool.tryAllocate(maxBatchSizeBytes);
if (buffer != null) {
currentBatch = new BatchBuilder<>(
buffer,
Expand All @@ -430,7 +430,7 @@ private void startNewBatch() {
nextOffset,
time.milliseconds(),
epoch,
maxBatchSize
maxBatchSizeBytes
);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,7 +44,7 @@ public final class RecordsSnapshotWriter<T> implements SnapshotWriter<T> {

private RecordsSnapshotWriter(
RawSnapshotWriter snapshot,
int maxBatchSize,
int maxBatchSizeBytes,
MemoryPool memoryPool,
Time time,
Compression compression,
Expand All @@ -57,7 +57,7 @@ private RecordsSnapshotWriter(
snapshot.snapshotId().epoch(),
0,
Integer.MAX_VALUE,
maxBatchSize,
maxBatchSizeBytes,
10, // maxNumberOfBatches
memoryPool,
time,
Expand Down Expand Up @@ -145,7 +145,7 @@ public static final class Builder {
private long lastContainedLogTimestamp = 0;
private Compression compression = Compression.NONE;
private Time time = Time.SYSTEM;
private int maxBatchSize = 1024;
private int maxBatchSizeBytes = 1024;
private MemoryPool memoryPool = MemoryPool.NONE;
private KRaftVersion kraftVersion = KRaftVersion.KRAFT_VERSION_1;
private Optional<VoterSet> voterSet = Optional.empty();
Expand All @@ -166,8 +166,8 @@ public Builder setTime(Time time) {
return this;
}

public Builder setMaxBatchSize(int maxBatchSize) {
this.maxBatchSize = maxBatchSize;
public Builder setMaxBatchSizeBytes(int maxBatchSizeBytes) {
this.maxBatchSizeBytes = maxBatchSizeBytes;
return this;
}

Expand Down Expand Up @@ -206,7 +206,7 @@ public <T> RecordsSnapshotWriter<T> build(RecordSerde<T> serde) {

RecordsSnapshotWriter<T> writer = new RecordsSnapshotWriter<>(
rawSnapshotWriter.get(),
maxBatchSize,
maxBatchSizeBytes,
memoryPool,
time,
compression,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,13 +50,13 @@ final class RecordsSnapshotWriterTest {
@Test
void testBuilderKRaftVersion0() {
OffsetAndEpoch snapshotId = new OffsetAndEpoch(100, 10);
int maxBatchSize = 1024;
int maxBatchSizeBytes = 1024;
AtomicReference<ByteBuffer> buffer = new AtomicReference<>(null);
RecordsSnapshotWriter.Builder builder = new RecordsSnapshotWriter.Builder()
.setKraftVersion(KRaftVersion.KRAFT_VERSION_0)
.setVoterSet(Optional.empty())
.setTime(new MockTime())
.setMaxBatchSize(maxBatchSize)
.setMaxBatchSizeBytes(maxBatchSizeBytes)
.setRawSnapshotWriter(
new MockRawSnapshotWriter(snapshotId, buffer::set)
);
Expand All @@ -68,7 +68,7 @@ void testBuilderKRaftVersion0() {
new MockRawSnapshotReader(snapshotId, buffer.get()),
STRING_SERDE,
BufferSupplier.NO_CACHING,
maxBatchSize,
maxBatchSizeBytes,
true,
new LogContext()
)
Expand Down Expand Up @@ -100,7 +100,7 @@ void testBuilderKRaftVersion0() {
@Test
void testBuilderKRaftVersion0WithVoterSet() {
OffsetAndEpoch snapshotId = new OffsetAndEpoch(100, 10);
int maxBatchSize = 1024;
int maxBatchSizeBytes = 1024;
VoterSet voterSet = VoterSetTest.voterSet(
new HashMap<>(VoterSetTest.voterMap(IntStream.of(1, 2, 3), true))
);
Expand All @@ -109,7 +109,7 @@ void testBuilderKRaftVersion0WithVoterSet() {
.setKraftVersion(KRaftVersion.KRAFT_VERSION_0)
.setVoterSet(Optional.of(voterSet))
.setTime(new MockTime())
.setMaxBatchSize(maxBatchSize)
.setMaxBatchSizeBytes(maxBatchSizeBytes)
.setRawSnapshotWriter(
new MockRawSnapshotWriter(snapshotId, buffer::set)
);
Expand All @@ -120,7 +120,7 @@ void testBuilderKRaftVersion0WithVoterSet() {
@Test
void testKBuilderRaftVersion1WithVoterSet() {
OffsetAndEpoch snapshotId = new OffsetAndEpoch(100, 10);
int maxBatchSize = 1024;
int maxBatchSizeBytes = 1024;
VoterSet voterSet = VoterSetTest.voterSet(
new HashMap<>(VoterSetTest.voterMap(IntStream.of(1, 2, 3), true))
);
Expand All @@ -129,7 +129,7 @@ void testKBuilderRaftVersion1WithVoterSet() {
.setKraftVersion(KRaftVersion.KRAFT_VERSION_1)
.setVoterSet(Optional.of(voterSet))
.setTime(new MockTime())
.setMaxBatchSize(maxBatchSize)
.setMaxBatchSizeBytes(maxBatchSizeBytes)
.setRawSnapshotWriter(
new MockRawSnapshotWriter(snapshotId, buffer::set)
);
Expand All @@ -141,7 +141,7 @@ void testKBuilderRaftVersion1WithVoterSet() {
new MockRawSnapshotReader(snapshotId, buffer.get()),
STRING_SERDE,
BufferSupplier.NO_CACHING,
maxBatchSize,
maxBatchSizeBytes,
true,
new LogContext()
)
Expand Down Expand Up @@ -181,13 +181,13 @@ void testKBuilderRaftVersion1WithVoterSet() {
@Test
void testBuilderKRaftVersion1WithoutVoterSet() {
OffsetAndEpoch snapshotId = new OffsetAndEpoch(100, 10);
int maxBatchSize = 1024;
int maxBatchSizeBytes = 1024;
AtomicReference<ByteBuffer> buffer = new AtomicReference<>(null);
RecordsSnapshotWriter.Builder builder = new RecordsSnapshotWriter.Builder()
.setKraftVersion(KRaftVersion.KRAFT_VERSION_1)
.setVoterSet(Optional.empty())
.setTime(new MockTime())
.setMaxBatchSize(maxBatchSize)
.setMaxBatchSizeBytes(maxBatchSizeBytes)
.setRawSnapshotWriter(
new MockRawSnapshotWriter(snapshotId, buffer::set)
);
Expand All @@ -199,7 +199,7 @@ void testBuilderKRaftVersion1WithoutVoterSet() {
new MockRawSnapshotReader(snapshotId, buffer.get()),
STRING_SERDE,
BufferSupplier.NO_CACHING,
maxBatchSize,
maxBatchSizeBytes,
true,
new LogContext()
)
Expand Down