-
Notifications
You must be signed in to change notification settings - Fork 12
Consumer
1. Kafka Consumer Config
2. Kafka Consumer Sampler
To Consume/Read a message from a Kafka topic you need to add Consumer components to the testplan.
- The Kafka Consumer config is responsible to hold the connection information, which includes security and other properties required to talk to the broker.
- The Kafka Consumer Sampler helps to read messages from the topic with the connection established using Config element.
Right click on Test Plan -> Add -> Config Element -> Kafka Consumer Config
Provide a Variable name to export the connection object (Which will be used in Sampler element)
Provide the Kafka connection configs (list of Brokers with comma separated)
Provide a Group ID (Make it unique, to define the group your consumer belongs to)
Define the topic name where you want to send the message (Case sensitive)
No Of Messages to Poll - This allows you to define the number of messages to read within a request (Defaults to 1)
Select the right security to connect to brokers (This will be completely based on how Kafka security is defined)
Auto Commit - This will set the offset as read, once the message is consumed
Select the right security to connect to brokers (This will be completely based on how Kafka security is defined)
For JAAS Security, You need to add the below key and value to the Additional Properties
Config key: sasl.jaas.config
Config value: org.apache.kafka.common.security.scram.ScramLoginModule required username="<USERNAME>" password="<PASSWORD>";
Right click on Test Plan -> Add -> Sampler -> Kafka Consumer Sampler
Use the same Variable name which was defined in the config element
Poll timeout - This helps to set the polling timeout for consumer to read from topic (Defaults to 100 ms)
Commit Type - Defines the Commit type (Sync/Async)
Supported Consumer properties which can be added to Additional Properties field.
Property | Available Options | Default |
---|---|---|
auto.commit.interval.ms | positive integer | 5000 |
auto.offset.reset | [earliest, latest, none] | latest |
bootstrap.servers | comma-separated host:port pairs | localhost:9092 |
check.crcs | [true, false] | true |
client.id | string | "" |
connections.max.idle.ms | positive long | 540000 |
enable.auto.commit | [true, false] | true |
exclude.internal.topics | [true, false] | true |
fetch.max.bytes | positive long | 52428800 |
fetch.max.wait.ms | non-negative integer | 500 |
fetch.min.bytes | non-negative integer | 1 |
group.id | string | "" |
heartbeat.interval.ms | positive integer | 3000 |
interceptor.classes | fully-qualified class names | [] |
isolation.level | [read_uncommitted, read_committed] | read_uncommitted |
key.deserializer | fully-qualified class name | org.apache.kafka.common.serialization.StringDeserializer |
max.partition.fetch.bytes | positive integer | 1048576 |
max.poll.interval.ms | positive long | 300000 |
max.poll.records | positive integer | 500 |
metadata.max.age.ms | positive long | 300000 |
metadata.fetch.timeout.ms | positive long | 60000 |
receive.buffer.bytes | positive integer | 32768 |
reconnect.backoff.ms | non-negative long | 50 |
request.timeout.ms | positive integer | 30000 |
retry.backoff.ms | non-negative long | 100 |
sasl.jaas.config | string | null |
sasl.kerberos.kinit.cmd | string | /usr/bin/kinit |
sasl.kerberos.min.time.before.relogin | positive long | 60000 |
sasl.kerberos.service.name | string | null |
sasl.mechanism | [GSSAPI, PLAIN, SCRAM-SHA-256, SCRAM-SHA-512] | GSSAPI |
security.protocol | [PLAINTEXT, SSL, SASL_PLAINTEXT, SASL_SSL] | PLAINTEXT |
send.buffer.bytes | positive integer | 131072 |
session.timeout.ms | positive integer | 10000 |
value.deserializer | fully-qualified class name | org.apache.kafka.common.serialization.StringDeserializer |