Skip to content
Draft
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,7 @@ class DumpLogSegmentsTest {
)

val lastContainedLogTimestamp = 10000
val emptyOptional: Optional[java.util.List[ApiMessageAndVersion]] = Optional.empty()

Using.resource(
new RecordsSnapshotWriter.Builder()
Expand All @@ -601,7 +602,7 @@ class DumpLogSegmentsTest {
.setRawSnapshotWriter(metadataLog.createNewSnapshot(new OffsetAndEpoch(0, 0)).get)
.setKraftVersion(KRaftVersion.KRAFT_VERSION_1)
.setVoterSet(Optional.of(VoterSetTest.voterSet(VoterSetTest.voterMap(IntStream.of(1, 2, 3), true))))
.build(MetadataRecordSerde.INSTANCE)
.build(MetadataRecordSerde.INSTANCE, emptyOptional)
Copy link
Contributor

@kevin-wu24 kevin-wu24 Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We should be able to remove this. This applies to other instances where we build the RecordsSnapshotWriter.

) { snapshotWriter =>
snapshotWriter.append(metadataRecords)
snapshotWriter.freeze()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -437,14 +437,11 @@ void doFormat(BootstrapMetadata bootstrapMetadata) throws Exception {
directoryTypes.get(writeLogDir).description(), writeLogDir,
MetadataVersion.FEATURE_NAME, releaseVersion);
Files.createDirectories(Paths.get(writeLogDir));
BootstrapDirectory bootstrapDirectory = new BootstrapDirectory(writeLogDir);
bootstrapDirectory.writeBinaryFile(bootstrapMetadata);
if (directoryTypes.get(writeLogDir).isDynamicMetadataDirectory()) {
writeDynamicQuorumSnapshot(writeLogDir,
initialControllers.get(),
featureLevels.get(KRaftVersion.FEATURE_NAME),
controllerListenerName);
}
writeBoostrapSnapshot(writeLogDir,
bootstrapMetadata,
initialControllers.get(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to pass the optional for initialControllers here. We can only do a .get() if initialControllers.isPresent().

featureLevels.get(KRaftVersion.FEATURE_NAME),
controllerListenerName);
});
copier.setWriteErrorHandler((errorLogDir, e) -> {
throw new FormatterException("Error while writing meta.properties file " +
Expand Down Expand Up @@ -492,8 +489,9 @@ static DirectoryType calculate(
}
}

static void writeDynamicQuorumSnapshot(
static void writeBoostrapSnapshot(
String writeLogDir,
BootstrapMetadata bootstrapMetadata,
DynamicVoters initialControllers,
short kraftVersion,
String controllerListenerName
Expand All @@ -511,8 +509,9 @@ static void writeDynamicQuorumSnapshot(
Snapshots.BOOTSTRAP_SNAPSHOT_ID)).
setKraftVersion(KRaftVersion.fromFeatureLevel(kraftVersion)).
setVoterSet(Optional.of(voterSet));
try (RecordsSnapshotWriter<ApiMessageAndVersion> writer = builder.build(new MetadataRecordSerde())) {
try (RecordsSnapshotWriter<ApiMessageAndVersion> writer = builder.build(new MetadataRecordSerde(), Optional.of(bootstrapMetadata.records()))) {
writer.freeze();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
try (RecordsSnapshotWriter<ApiMessageAndVersion> writer = builder.build(new MetadataRecordSerde(), Optional.of(bootstrapMetadata.records()))) {
writer.freeze();
try (RecordsSnapshotWriter<ApiMessageAndVersion> writer = builder.build(new MetadataRecordSerde()))) {
writer.append(bootstrapMetadata.records());
writer.freeze();

Copy link
Contributor Author

@mannoopj mannoopj Oct 20, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wouldn't this write the bootstrap records after the control records, since in RecordsSnapshotWriter.build() is where we append the kraft records and we would be calling that ahead of writer.append in this scenario? we want the bootstrap records ahead correct?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It doesn't matter what order we write them in because KRaft only reads the control records, and the metadata module will only read the "data" records. When we read the 0-0.checkpoint back into memory, we only deal with either its control records or its data records, not both in the same code.

}

}
Copy link
Contributor

@kevin-wu24 kevin-wu24 Oct 15, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There is a clearer way to make these changes. This method is what writes the 0-0.checkpoint currently. We should pass the bootstrap metadata object here and append the metadata records using writer.append.append(bootstrapMetadata.records()) before calling writer.freeze().

}
Original file line number Diff line number Diff line change
Expand Up @@ -729,7 +729,7 @@ public Optional<SnapshotWriter<ApiMessageAndVersion>> createSnapshot(
.setLastContainedLogTimestamp(lastContainedLogTimestamp)
.setTime(new MockTime())
.setRawSnapshotWriter(createNewSnapshot(snapshotId))
.build(new MetadataRecordSerde())
.build(new MetadataRecordSerde(), Optional.empty())
);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3726,7 +3726,7 @@ public Optional<SnapshotWriter<T>> createSnapshot(
.setRawSnapshotWriter(wrappedWriter)
.setKraftVersion(partitionState.kraftVersionAtOffset(lastContainedLogOffset))
.setVoterSet(partitionState.voterSetAtOffset(lastContainedLogOffset))
.build(serde);
.build(serde, Optional.empty());
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
import org.apache.kafka.raft.VoterSet;
import org.apache.kafka.raft.internals.BatchAccumulator;
import org.apache.kafka.raft.internals.BatchAccumulator.CompletedBatch;
import org.apache.kafka.server.common.ApiMessageAndVersion;
import org.apache.kafka.server.common.KRaftVersion;
import org.apache.kafka.server.common.OffsetAndEpoch;
import org.apache.kafka.server.common.serialization.RecordSerde;
Expand Down Expand Up @@ -191,7 +192,7 @@ public Builder setVoterSet(Optional<VoterSet> voterSet) {
return this;
}

public <T> RecordsSnapshotWriter<T> build(RecordSerde<T> serde) {
public <T> RecordsSnapshotWriter<T> build(RecordSerde<T> serde, Optional<List<T>> bootstrapRecords) {
if (rawSnapshotWriter.isEmpty()) {
throw new IllegalStateException("Builder::build called without a RawSnapshotWriter");
} else if (rawSnapshotWriter.get().sizeInBytes() != 0) {
Expand All @@ -213,6 +214,8 @@ public <T> RecordsSnapshotWriter<T> build(RecordSerde<T> serde) {
serde
);

bootstrapRecords.ifPresent(writer::append);

writer.accumulator.appendControlMessages((baseOffset, epoch, compression, buffer) -> {
long now = time.milliseconds();
try (MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2224,7 +2224,7 @@ private static SnapshotWriter<String> snapshotWriter(RaftClientTestContext conte
return new RecordsSnapshotWriter.Builder()
.setTime(context.time)
.setRawSnapshotWriter(snapshot)
.build(new StringSerde());
.build(new StringSerde(), Optional.empty());
}

private static final class MemorySnapshotWriter implements RawSnapshotWriter {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ Builder withEmptySnapshot(OffsetAndEpoch snapshotId) {
.setTime(time)
.setKraftVersion(KRaftVersion.KRAFT_VERSION_0)
.setRawSnapshotWriter(log.createNewSnapshotUnchecked(snapshotId).get())
.build(SERDE)
.build(SERDE, Optional.empty())
) {
snapshot.freeze();
}
Expand Down Expand Up @@ -363,7 +363,7 @@ Builder withBootstrapSnapshot(Optional<VoterSet> voters) {
.setKraftVersion(kraftVersion)
.setVoterSet(voters);

try (RecordsSnapshotWriter<String> writer = builder.build(SERDE)) {
try (RecordsSnapshotWriter<String> writer = builder.build(SERDE, Optional.empty())) {
writer.freeze();
}
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ void testUpdateWithEmptySnapshot() {
// Create a snapshot that doesn't have any kraft.version or voter set control records
RecordsSnapshotWriter.Builder builder = new RecordsSnapshotWriter.Builder()
.setRawSnapshotWriter(log.createNewSnapshotUnchecked(new OffsetAndEpoch(10, epoch)).get());
try (RecordsSnapshotWriter<?> writer = builder.build(STRING_SERDE)) {
try (RecordsSnapshotWriter<?> writer = builder.build(STRING_SERDE, Optional.empty())) {
writer.freeze();
}
log.truncateToLatestSnapshot();
Expand Down Expand Up @@ -234,7 +234,7 @@ void testUpdateWithSnapshot() {
.setRawSnapshotWriter(log.createNewSnapshotUnchecked(new OffsetAndEpoch(10, epoch)).get())
.setKraftVersion(kraftVersion)
.setVoterSet(Optional.of(voterSet));
try (RecordsSnapshotWriter<?> writer = builder.build(STRING_SERDE)) {
try (RecordsSnapshotWriter<?> writer = builder.build(STRING_SERDE, Optional.empty())) {
writer.freeze();
}
log.truncateToLatestSnapshot();
Expand Down Expand Up @@ -272,7 +272,7 @@ void testUpdateWithSnapshotAndLogOverride() {
.setRawSnapshotWriter(log.createNewSnapshotUnchecked(snapshotId).get())
.setKraftVersion(kraftVersion)
.setVoterSet(Optional.of(snapshotVoterSet));
try (RecordsSnapshotWriter<?> writer = builder.build(STRING_SERDE)) {
try (RecordsSnapshotWriter<?> writer = builder.build(STRING_SERDE, Optional.empty())) {
writer.freeze();
}
log.truncateToLatestSnapshot();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ public void testControlRecordIterationWithKraftVersion0() {
.setRawSnapshotWriter(
new MockRawSnapshotWriter(new OffsetAndEpoch(100, 10), buffer::set)
);
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE)) {
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE, Optional.empty())) {
snapshot.append(List.of("a", "b", "c"));
snapshot.append(List.of("d", "e", "f"));
snapshot.append(List.of("g", "h", "i"));
Expand Down Expand Up @@ -221,7 +221,7 @@ public void testControlRecordIterationWithKraftVersion1() {
.setRawSnapshotWriter(
new MockRawSnapshotWriter(new OffsetAndEpoch(100, 10), buffer::set)
);
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE)) {
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE, Optional.empty())) {
snapshot.append(List.of("a", "b", "c"));
snapshot.append(List.of("d", "e", "f"));
snapshot.append(List.of("g", "h", "i"));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ void testBuilderKRaftVersion0() {
.setRawSnapshotWriter(
new MockRawSnapshotWriter(snapshotId, buffer::set)
);
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE)) {
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE, Optional.empty())) {
snapshot.freeze();
}

Expand Down Expand Up @@ -114,7 +114,7 @@ void testBuilderKRaftVersion0WithVoterSet() {
new MockRawSnapshotWriter(snapshotId, buffer::set)
);

assertThrows(IllegalStateException.class, () -> builder.build(STRING_SERDE));
assertThrows(IllegalStateException.class, () -> builder.build(STRING_SERDE, Optional.empty()));
}

@Test
Expand All @@ -133,7 +133,7 @@ void testKBuilderRaftVersion1WithVoterSet() {
.setRawSnapshotWriter(
new MockRawSnapshotWriter(snapshotId, buffer::set)
);
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE)) {
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE, Optional.empty())) {
snapshot.freeze();
}

Expand Down Expand Up @@ -191,7 +191,7 @@ void testBuilderKRaftVersion1WithoutVoterSet() {
.setRawSnapshotWriter(
new MockRawSnapshotWriter(snapshotId, buffer::set)
);
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE)) {
try (RecordsSnapshotWriter<String> snapshot = builder.build(STRING_SERDE, Optional.empty())) {
snapshot.freeze();
}

Expand Down