Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
180 changes: 91 additions & 89 deletions tools/src/main/java/org/apache/kafka/tools/VerifiableConsumer.java
Original file line number Diff line number Diff line change
Expand Up @@ -505,123 +505,125 @@ public long maxOffset() {

private static ArgumentParser argParser() {
ArgumentParser parser = ArgumentParsers
.newArgumentParser("verifiable-consumer")
.defaultHelp(true)
.description("This tool consumes messages from a specific topic and emits consumer events (e.g. group rebalances, received messages, and offsets committed) as JSON objects to STDOUT.");
.newArgumentParser("verifiable-consumer")
.defaultHelp(true)
.description("This tool consumes messages from a specific topic and emits consumer events (e.g. group rebalances, received messages, and offsets committed) as JSON objects to STDOUT.");
MutuallyExclusiveGroup connectionGroup = parser.addMutuallyExclusiveGroup("Connection Group")
.description("Group of arguments for connection to brokers")
.required(true);
.description("Group of arguments for connection to brokers")
.required(true);
connectionGroup.addArgument("--bootstrap-server")
.action(store())
.required(true)
.type(String.class)
.metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
.dest("bootstrapServer")
.help("The server(s) to connect to. Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
.action(store())
.required(true)
.type(String.class)
.dest("bootstrapServer")
.metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
.help("The server(s) to connect to. Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");

parser.addArgument("--topic")
.action(store())
.required(true)
.type(String.class)
.metavar("TOPIC")
.help("Consumes messages from this topic.");
.action(store())
.required(true)
.type(String.class)
.metavar("TOPIC")
.help("Consumes messages from this topic.");

parser.addArgument("--group-protocol")
.action(store())
.required(false)
.type(String.class)
.setDefault(ConsumerConfig.DEFAULT_GROUP_PROTOCOL)
.metavar("GROUP_PROTOCOL")
.dest("groupProtocol")
.help(String.format("Group protocol (must be one of %s)", Arrays.stream(GroupProtocol.values())
.map(Object::toString).collect(Collectors.joining(", "))));
.action(store())
.required(false)
.type(String.class)
.setDefault(ConsumerConfig.DEFAULT_GROUP_PROTOCOL)
.dest("groupProtocol")
.metavar("GROUP-PROTOCOL")
.help(String.format("Group protocol (must be one of %s)", Arrays.stream(GroupProtocol.values())
.map(Object::toString).collect(Collectors.joining(", "))));

parser.addArgument("--group-remote-assignor")
.action(store())
.required(false)
.type(String.class)
.setDefault(ConsumerConfig.DEFAULT_GROUP_REMOTE_ASSIGNOR)
.metavar("GROUP_REMOTE_ASSIGNOR")
.dest("groupRemoteAssignor")
.help(String.format("Group remote assignor; only used if the group protocol is %s", GroupProtocol.CONSUMER.name()));
.action(store())
.required(false)
.type(String.class)
.setDefault(ConsumerConfig.DEFAULT_GROUP_REMOTE_ASSIGNOR)
.dest("groupRemoteAssignor")
.metavar("GROUP-REMOTE-ASSIGNOR")
.help(String.format("Group remote assignor; only used if the group protocol is %s", GroupProtocol.CONSUMER.name()));

parser.addArgument("--group-id")
.action(store())
.required(true)
.type(String.class)
.metavar("GROUP_ID")
.dest("groupId")
.help("The groupId shared among members of the consumer group");
.action(store())
.required(true)
.type(String.class)
.dest("groupId")
.metavar("GROUP-ID")
.help("The group id of the consumer group");

parser.addArgument("--group-instance-id")
.action(store())
.required(false)
.type(String.class)
.metavar("GROUP_INSTANCE_ID")
.dest("groupInstanceId")
.help("A unique identifier of the consumer instance");
.action(store())
.required(false)
.type(String.class)
.dest("groupInstanceId")
.metavar("GROUP-INSTANCE-ID")
.help("A unique identifier of the consumer instance");

parser.addArgument("--max-messages")
.action(store())
.required(false)
.type(Integer.class)
.setDefault(-1)
.metavar("MAX-MESSAGES")
.dest("maxMessages")
.help("Consume this many messages. If -1 (the default), the consumer will consume until the process is killed externally");
.action(store())
.required(false)
.type(Integer.class)
.setDefault(-1)
.dest("maxMessages")
.metavar("MAX-MESSAGES")
.help("Consume this many messages. If -1 (the default), the consumer will consume until the process is killed externally");

parser.addArgument("--session-timeout")
.action(store())
.required(false)
.type(Integer.class)
.metavar("TIMEOUT_MS")
.dest("sessionTimeout")
.help("Set the consumer's session timeout, note that this configuration is not supported when group protocol is consumer");
.action(store())
.required(false)
.type(Integer.class)
.dest("sessionTimeout")
.metavar("TIMEOUT-MS")
.help("Set the consumer's session timeout, note that this configuration is not supported when group protocol is consumer");

parser.addArgument("--verbose")
.action(storeTrue())
.type(Boolean.class)
.metavar("VERBOSE")
.help("Enable to log individual consumed records");
.action(storeTrue())
.type(Boolean.class)
.metavar("VERBOSE")
.help("Enable to log individual consumed records");

parser.addArgument("--enable-autocommit")
.action(storeTrue())
.type(Boolean.class)
.metavar("ENABLE-AUTOCOMMIT")
.dest("useAutoCommit")
.help("Enable offset auto-commit on consumer");
.action(storeTrue())
.type(Boolean.class)
.dest("useAutoCommit")
.metavar("ENABLE-AUTOCOMMIT")
.help("Enable offset auto-commit on consumer");

parser.addArgument("--reset-policy")
.action(store())
.required(false)
.setDefault("earliest")
.type(String.class)
.dest("resetPolicy")
.help("Set reset policy (must be either 'earliest', 'latest', or 'none')");
.action(store())
.required(false)
.setDefault("earliest")
.type(String.class)
.dest("resetPolicy")
.metavar("RESET-POLICY")
.help("Set reset policy (must be either 'earliest', 'latest', or 'none')");

parser.addArgument("--assignment-strategy")
.action(store())
.required(false)
.setDefault(RangeAssignor.class.getName())
.type(String.class)
.dest("assignmentStrategy")
.help(String.format("Set assignment strategy (e.g. %s); only used if the group protocol is %s", RoundRobinAssignor.class.getName(), GroupProtocol.CLASSIC.name()));
.action(store())
.required(false)
.setDefault(RangeAssignor.class.getName())
.type(String.class)
.dest("assignmentStrategy")
.metavar("ASSIGNMENT-STRATEGY")
.help(String.format("Set assignment strategy (e.g. %s); only used if the group protocol is %s", RoundRobinAssignor.class.getName(), GroupProtocol.CLASSIC.name()));

parser.addArgument("--consumer.config")
.action(store())
.required(false)
.type(String.class)
.metavar("CONFIG-FILE")
.help("(DEPRECATED) Consumer config properties file" +
"This option will be removed in a future version. Use --command-config instead");
.action(store())
.required(false)
.type(String.class)
.metavar("CONFIG-FILE")
.help("(DEPRECATED) Consumer config properties file. " +
"This option will be removed in a future version. Use --command-config instead");

parser.addArgument("--command-config")
.action(store())
.required(false)
.type(String.class)
.metavar("CONFIG-FILE")
.dest("commandConfigFile")
.help("Config properties file (config options shared with command line parameters will be overridden).");
.action(store())
.required(false)
.type(String.class)
.metavar("CONFIG-FILE")
.dest("commandConfigFile")
.help("Config properties file (config options shared with command line parameters will be overridden).");

return parser;
}
Expand Down
98 changes: 49 additions & 49 deletions tools/src/main/java/org/apache/kafka/tools/VerifiableProducer.java
Original file line number Diff line number Diff line change
Expand Up @@ -110,69 +110,69 @@ public VerifiableProducer(KafkaProducer<String, String> producer, String topic,
/** Get the command-line argument parser. */
private static ArgumentParser argParser() {
ArgumentParser parser = ArgumentParsers
.newArgumentParser("verifiable-producer")
.defaultHelp(true)
.description("This tool produces increasing integers to the specified topic and prints JSON metadata to stdout on each \"send\" request, making externally visible which messages have been acked and which have not.");
.newArgumentParser("verifiable-producer")
.defaultHelp(true)
.description("This tool produces increasing integers to the specified topic and prints JSON metadata to stdout on each \"send\" request, making externally visible which messages have been acked and which have not.");

parser.addArgument("--topic")
.action(store())
.required(true)
.type(String.class)
.metavar("TOPIC")
.help("Produce messages to this topic.");
.action(store())
.required(true)
.type(String.class)
.metavar("TOPIC")
.help("Produce messages to this topic.");
MutuallyExclusiveGroup connectionGroup = parser.addMutuallyExclusiveGroup("Connection Group")
.description("Group of arguments for connection to brokers")
.required(true);
.description("Group of arguments for connection to brokers")
.required(true);
connectionGroup.addArgument("--bootstrap-server")
.action(store())
.required(false)
.type(String.class)
.metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
.dest("bootstrapServer")
.help("REQUIRED: The server(s) to connect to. Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");
.action(store())
.required(false)
.type(String.class)
.dest("bootstrapServer")
.metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
.help("REQUIRED: The server(s) to connect to. Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");

parser.addArgument("--max-messages")
.action(store())
.required(false)
.setDefault(-1)
.type(Integer.class)
.metavar("MAX-MESSAGES")
.dest("maxMessages")
.help("Produce this many messages. If -1, produce messages until the process is killed externally.");
.action(store())
.required(false)
.setDefault(-1)
.type(Integer.class)
.dest("maxMessages")
.metavar("MAX-MESSAGES")
.help("Produce this many messages. If -1, produce messages until the process is killed externally.");

parser.addArgument("--throughput")
.action(store())
.required(false)
.setDefault(-1)
.type(Integer.class)
.metavar("THROUGHPUT")
.help("If set >= 0, throttle maximum message throughput to *approximately* THROUGHPUT messages/sec.");
.action(store())
.required(false)
.setDefault(-1)
.type(Integer.class)
.metavar("THROUGHPUT")
.help("If set >= 0, throttle maximum message throughput to *approximately* THROUGHPUT messages/sec.");

parser.addArgument("--acks")
.action(store())
.required(false)
.setDefault(-1)
.type(Integer.class)
.choices(0, 1, -1)
.metavar("ACKS")
.help("Acks required on each produced message. See Kafka docs on acks for details.");
.action(store())
.required(false)
.setDefault(-1)
.type(Integer.class)
.choices(0, 1, -1)
.metavar("ACKS")
.help("Acks required on each produced message. See Kafka docs on acks for details.");

parser.addArgument("--producer.config")
.action(store())
.required(false)
.type(String.class)
.metavar("CONFIG-FILE")
.help("(DEPRECATED) Producer config properties file. " +
"This option will be removed in a future version. Use --command-config instead.");
.action(store())
.required(false)
.type(String.class)
.metavar("CONFIG-FILE")
.help("(DEPRECATED) Producer config properties file. " +
"This option will be removed in a future version. Use --command-config instead.");

parser.addArgument("--message-create-time")
.action(store())
.required(false)
.setDefault(-1L)
.type(Long.class)
.metavar("CREATETIME")
.dest("createTime")
.help("Send messages with creation time starting at the arguments value, in milliseconds since epoch");
.action(store())
.required(false)
.setDefault(-1L)
.type(Long.class)
.metavar("CREATE-TIME")
.dest("createTime")
.help("Send messages with creation time starting at the arguments value, in milliseconds since epoch");

parser.addArgument("--value-prefix")
.action(store())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -499,8 +499,8 @@ private static ArgumentParser argParser() {
.action(store())
.required(true)
.type(String.class)
.metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
.dest("bootstrapServer")
.metavar("HOST1:PORT1[,HOST2:PORT2[...]]")
.help("The server(s) to connect to. Comma-separated list of Kafka brokers in the form HOST1:PORT1,HOST2:PORT2,...");

parser.addArgument("--topic")
Expand All @@ -514,17 +514,17 @@ private static ArgumentParser argParser() {
.action(store())
.required(true)
.type(String.class)
.metavar("GROUP_ID")
.dest("groupId")
.help("The groupId shared among members of the share group");
.metavar("GROUP-ID")
.help("The group id of the share group");

parser.addArgument("--max-messages")
.action(store())
.required(false)
.type(Integer.class)
.setDefault(-1)
.metavar("MAX-MESSAGES")
.dest("maxMessages")
.metavar("MAX-MESSAGES")
.help("Consume this many messages. If -1 (the default), the share consumers will consume until the process is killed externally");

parser.addArgument("--verbose")
Expand All @@ -539,6 +539,7 @@ private static ArgumentParser argParser() {
.setDefault("auto")
.type(String.class)
.dest("acknowledgementMode")
.metavar("ACKNOWLEDGEMENT-MODE")
.help("Acknowledgement mode for the share consumers (must be either 'auto', 'sync' or 'async')");

parser.addArgument("--offset-reset-strategy")
Expand All @@ -547,14 +548,15 @@ private static ArgumentParser argParser() {
.setDefault("")
.type(String.class)
.dest("offsetResetStrategy")
.help("Set share group reset strategy (must be either 'earliest' or 'latest')");
.metavar("OFFSET-RESET-STRATEGY")
.help("Share group offset reset strategy (must be either 'earliest' or 'latest')");

parser.addArgument("--command-config")
.action(store())
.required(false)
.type(String.class)
.dest("commandConfig")
.metavar("CONFIG_FILE")
.metavar("CONFIG-FILE")
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Kafka currently uses two parser libraryies: joptsimple and argparse4j. As a result, not all tools support metavariables. I'm not sure why Kafka relies on both parsers, but it would be beneficial to unify them to ensure a consistent codebase and output message.

@AndrewJSchofield @lianetm @kirktrue @m1a2st WDYT?

Copy link
Collaborator

@Yunyung Yunyung Sep 3, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this is a bit annoying. If we have to choose between the two, we should probably use joptsimple everywhere, since most scripts already rely on CommandDefaultOptions, which works well with CommandLineUtils.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

While I agree in principle, the syntax supported by kafka-producer-perf-test is a bit different. We might accidentally cause existing commands to fail. So, care needed here for sure.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have opened https://issues.apache.org/jira/browse/KAFKA-19670

Since argparse4j provides more powerful features, we could consider using it for the codebase

.help("Config properties file (config options shared with command line parameters will be overridden).");

return parser;
Expand Down