
Consumer


This page provides information specific to the Consumer class. Refer also to the general client wiki page which covers information relevant to all client types.

Basics

Kafka is quite different from traditional messaging systems such as RabbitMQ!

If you're new to Kafka, check out the following resources for more information about how consumption works in Kafka:

Configuration

https://github.com/edenhill/librdkafka/blob/master/CONFIGURATION.md

If you are in doubt, don't change values away from the defaults - there is a lot of potential to cause unintended consequences. The choice of some of the default values is arguably not quite optimal (they remain as they are for compatibility reasons), but on the whole they are good, and you won't go far wrong sticking to them.
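For illustration, a minimal configuration that leaves everything else at its default might look like the following sketch (the broker address and group id are placeholder values):

```csharp
using Confluent.Kafka;

// Minimal consumer configuration - only the essentials are set and
// everything else is left at its default. The broker address and
// group id below are placeholder values.
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "example-group",
    // Where to start reading when the group has no committed offset.
    AutoOffsetReset = AutoOffsetReset.Earliest
};
```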

Consumer Groups

  • Consumers work in groups. At any given time, one and only one consumer in a group will be assigned to read from each partition of a subscribed-to topic.

  • You aren't required to use consumer groups. You can assign specific partitions directly using the Assign method (see the sketch after this list).

  • A group-rebalance is a stop-the-world event.

  • You must specify a GroupId in your configuration. You must do this even if you don't utilize any group functionality.
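To make the distinction concrete, here is a sketch of both approaches (topic, partition and group names are placeholders):

```csharp
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    // GroupId is required even if you never use group functionality.
    GroupId = "example-group"
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();

// Option 1: join a consumer group. Partitions of the topic are divided
// among the group's members automatically.
consumer.Subscribe("example-topic");

// Option 2: bypass the group and read partition 0 directly from the start.
// consumer.Assign(new TopicPartitionOffset("example-topic", 0, Offset.Beginning));
```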

High Level Architecture

The Consumer API sits at a much higher level of abstraction than the Kafka protocol, which is used to communicate with the cluster.

When you call Consume, you are pulling messages from a local in-memory queue - you are not directly sending requests to the cluster. Behind the scenes, the client orchestrates connecting to the required brokers, automatically adjusting in the case of leader changes, etc.

Although your application consumes messages one by one, the client pulls messages from brokers in batches for efficiency. By default, caching of messages on the client is very aggressive (optimized for high throughput). You may want to reduce this in some scenarios, for example to limit client-side memory usage. The configuration property you want is QueuedMaxMessagesKbytes.
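For example, to reduce the amount of data buffered on the client (the value below is illustrative only - see CONFIGURATION.md, linked above, for the default and valid range):

```csharp
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "example-group",
    // Maps to librdkafka's queued.max.messages.kbytes. Lowering it bounds
    // client-side buffering at the cost of less aggressive prefetching.
    QueuedMaxMessagesKbytes = 16384
};
```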

Errors

There are two types of error in Kafka - retryable and non-retryable. Generally, the client will automatically recover from any retryable error without the application developer needing to do anything.

Generally, errors exposed via the error callback or in the log represent retryable errors and are for informational purposes only.

However, if an error has the IsFatal flag set to true (this should generally never happen), the consumer is in an unrecoverable state and must be re-created.
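As a sketch, a consume loop might distinguish the two cases like this (the recovery logic itself is application-specific):

```csharp
using System;
using System.Threading;
using Confluent.Kafka;

// Assumes `consumer` was built as in the sketches above.
using var cts = new CancellationTokenSource();
try
{
    var cr = consumer.Consume(cts.Token);
    // ... process cr.Message ...
}
catch (ConsumeException e) when (!e.Error.IsFatal)
{
    // Retryable / informational - the client recovers by itself.
    Console.WriteLine($"Consume error: {e.Error.Reason}");
}
catch (ConsumeException e)
{
    // Fatal - this consumer instance is unusable and must be re-created.
    Console.WriteLine($"Fatal consume error: {e.Error.Reason}");
}
```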

Partition Assignments

Unfortunately, the term 'assign' is used for more than one purpose. There are two related concepts:

  1. The assignment given to a particular consumer by the consumer group.
  2. The set of partitions the consumer actually reads from.

With the Java client, these are always the same. The .NET client is more flexible - it allows you to override the assignment given to you by the group to be whatever you want. This is an advanced, uncommonly used feature. It might be useful, for example, to add a control message partition to be read by every consumer in addition to the assigned partitions.
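A sketch of how that might look, assuming a hypothetical control topic: the partitions-assigned handler can return a modified set of partitions/offsets to consume in place of the group-provided assignment.

```csharp
using System.Linq;
using Confluent.Kafka;

// `config` as in the earlier sketches; "control-topic" is hypothetical.
using var consumer = new ConsumerBuilder<Ignore, string>(config)
    .SetPartitionsAssignedHandler((c, partitions) =>
    {
        // Start from the assignment the group gave us...
        var assignment = partitions
            .Select(tp => new TopicPartitionOffset(tp, Offset.Unset))
            .ToList();
        // ...and additionally read a control partition on every member.
        assignment.Add(new TopicPartitionOffset("control-topic", 0, Offset.Beginning));
        return assignment;
    })
    .Build();

consumer.Subscribe("example-topic");
```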

Committing Offsets

The docs for the .NET client on the Confluent website go into a fair amount of detail about how to commit offsets:

https://docs.confluent.io/current/clients/dotnet.html#auto-offset-commit

TL;DR: you should probably use StoreOffset, with EnableAutoOffsetStore set to false and EnableAutoCommit set to true (the default).
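A sketch of that pattern: the offset for a message is 'stored' (in client memory) only after the message has been processed, and the client then periodically commits whatever has been stored, giving at-least-once semantics.

```csharp
using System.Threading;
using Confluent.Kafka;

var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "example-group",
    EnableAutoCommit = true,       // the default: periodically commit stored offsets
    EnableAutoOffsetStore = false  // don't store offsets automatically on Consume
};

using var consumer = new ConsumerBuilder<Ignore, string>(config).Build();
consumer.Subscribe("example-topic");
using var cts = new CancellationTokenSource();

while (!cts.IsCancellationRequested)
{
    var cr = consumer.Consume(cts.Token);
    // ... process cr.Message.Value ...

    // Only after processing succeeds, mark the offset as eligible for
    // the next auto-commit.
    consumer.StoreOffset(cr);
}
```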

Question: I want to synchronously commit offsets after each consumed message. It's very slow. How do I make it fast?

librdkafka uses the same broker connection for both commits and fetching messages, so a commit may be backed up behind a long-poll blocking Fetch request. The long-poll behavior of fetch requests is configured with the fetch.wait.max.ms property, which defaults to 100ms. You can decrease that value to reduce offset commit latency at the expense of a larger number of Fetch requests (depending on your traffic pattern). Also see https://github.com/edenhill/librdkafka/issues/1787
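The corresponding .NET configuration property is FetchWaitMaxMs; as a sketch (the value is illustrative only):

```csharp
var config = new ConsumerConfig
{
    BootstrapServers = "localhost:9092",
    GroupId = "example-group",
    // Maps to librdkafka's fetch.wait.max.ms. Shorter long-polls block
    // commits for less time, at the cost of more frequent Fetch requests.
    FetchWaitMaxMs = 10
};
```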

todo: is this information still current?

What are the trade-offs regarding the number of .NET client consumer instances?

All else equal, you should aim to fully utilize your Kafka client instances (whether consumer or producer). The primary advantage is that this enables better batching, which is a huge performance win because there is a large per-request protocol overhead.

There is also overhead associated with each open connection to a broker; however, this is negligible up to several thousand connections.