Skip to content

Commit

Permalink
KAFKA-17393: Remove log.message.format.version/message.format.version…
Browse files Browse the repository at this point in the history
… (KIP-724) (#18267)

Based on [KIP-724](https://cwiki.apache.org/confluence/display/KAFKA/KIP-724%3A+Drop+support+for+message+formats+v0+and+v1), the `log.message.format.version` and `message.format.version` can be removed in 4.0.

These configs effectively a no-op with inter-broker protocol version 3.0 or higher
since Apache Kafka 3.0, so the impact should be minimal.

Reviewers: Ismael Juma <[email protected]>
  • Loading branch information
FrankYang0529 authored Dec 21, 2024
1 parent 8bd3746 commit b4be178
Show file tree
Hide file tree
Showing 40 changed files with 218 additions and 1,028 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -198,26 +198,6 @@ public class TopicConfig {
public static final String PREALLOCATE_DOC = "True if we should preallocate the file on disk when " +
"creating a new log segment.";

/**
* @deprecated since 3.0, removal planned in 4.0. The default value for this config is appropriate
* for most situations.
*/
@Deprecated
public static final String MESSAGE_FORMAT_VERSION_CONFIG = "message.format.version";

/**
* @deprecated since 3.0, removal planned in 4.0. The default value for this config is appropriate
* for most situations.
*/
@Deprecated
public static final String MESSAGE_FORMAT_VERSION_DOC = "[DEPRECATED] Specify the message format version the broker " +
"will use to append messages to the logs. The value of this config is always assumed to be `3.0` if " +
"`inter.broker.protocol.version` is 3.0 or higher (the actual config value is ignored). Otherwise, the value should " +
"be a valid ApiVersion. Some examples are: 0.10.0, 1.1, 2.8, 3.0. By setting a particular message format version, the " +
"user is certifying that all the existing messages on disk are smaller or equal than the specified version. Setting " +
"this value incorrectly will cause consumers with older versions to break as they will receive messages with a format " +
"that they don't understand.";

public static final String MESSAGE_TIMESTAMP_TYPE_CONFIG = "message.timestamp.type";
public static final String MESSAGE_TIMESTAMP_TYPE_DOC = "Define whether the timestamp in the message is " +
"message create time or log append time.";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -857,14 +857,13 @@ private void maybeAllocateNewBatch(
) {
if (currentBatch == null) {
LogConfig logConfig = partitionWriter.config(tp);
byte magic = logConfig.recordVersion().value;
int maxBatchSize = logConfig.maxMessageSize();
long prevLastWrittenOffset = coordinator.lastWrittenOffset();
ByteBuffer buffer = bufferSupplier.get(maxBatchSize);

MemoryRecordsBuilder builder = new MemoryRecordsBuilder(
buffer,
magic,
RecordBatch.CURRENT_MAGIC_VALUE,
compression,
TimestampType.CREATE_TIME,
0L,
Expand Down
10 changes: 1 addition & 9 deletions core/src/main/scala/kafka/admin/ConfigCommand.scala
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import kafka.server.DynamicConfig
import kafka.utils.Implicits._
import kafka.utils.Logging
import org.apache.kafka.clients.admin.{Admin, AlterClientQuotasOptions, AlterConfigOp, AlterConfigsOptions, ConfigEntry, DescribeClusterOptions, DescribeConfigsOptions, ListTopicsOptions, ScramCredentialInfo, UserScramCredentialDeletion, UserScramCredentialUpsertion, ScramMechanism => PublicScramMechanism}
import org.apache.kafka.common.config.{ConfigResource, TopicConfig}
import org.apache.kafka.common.config.ConfigResource
import org.apache.kafka.common.errors.{InvalidConfigurationException, UnsupportedVersionException}
import org.apache.kafka.common.internals.Topic
import org.apache.kafka.common.protocol.ApiKeys
Expand All @@ -36,7 +36,6 @@ import org.apache.kafka.server.config.{ConfigType, QuotaConfig}
import org.apache.kafka.server.util.{CommandDefaultOptions, CommandLineUtils}
import org.apache.kafka.storage.internals.log.LogConfig

import scala.annotation.nowarn
import scala.jdk.CollectionConverters._
import scala.collection._

Expand Down Expand Up @@ -95,8 +94,6 @@ object ConfigCommand extends Logging {
}
}


@nowarn("cat=deprecation")
def parseConfigsToBeAdded(opts: ConfigCommandOptions): Properties = {
val props = new Properties
if (opts.options.has(opts.addConfigFile)) {
Expand All @@ -115,11 +112,6 @@ object ConfigCommand extends Logging {
//Create properties, parsing square brackets from values if necessary
configsToBeAdded.foreach(pair => props.setProperty(pair(0).trim, pair(1).replaceAll("\\[?\\]?", "").trim))
}
if (props.containsKey(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)) {
System.out.println(s"WARNING: The configuration ${TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG}=${props.getProperty(TopicConfig.MESSAGE_FORMAT_VERSION_CONFIG)} is specified. " +
"This configuration will be ignored if the version is newer than the inter.broker.protocol.version specified in the broker or " +
"if the inter.broker.protocol.version is 3.0 or newer. This configuration is deprecated and it will be removed in Apache Kafka 4.0.")
}
validatePropsKey(props)
props
}
Expand Down
Loading

0 comments on commit b4be178

Please sign in to comment.