inputs/redpanda: add fetch_max_wait option #3100
Merged
+63
−0
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
kgo.FetchMaxWait is a config option supported by franz-go. It makes it possible to use kgo.FetchMinBytes to force big batches, but have a rather low max wait time to fill the batch. This makes it possible to force the broker to send big batches if possible, but still wait only for a short time if there's not enough data.
Why add this option now?
This is especially important with the redpanda input, as it's using ordered franz-go. It will only send batches, if the previous batch with the partition has been consumed. If the broker keeps sending very small batches, e.g. size 1, it's likely to stall batched outputs. I could reproduce locally by using a producer that sends lots of batches of size 1.
I tried to overcome this ordering limitation by using batching in my output, but it doesn't work in this specific case. It will only add more records to the batch, if the previous batch of the partition was consumed, so in the extreme case of getting one record per kafka batch, for only one partition, i can't overcome it, rpcn will do only one record at a time.
Using kgo.FetchMinBytes in combination with kgo.FetchMaxWait can solve this problem.
But in any case, it is a useful tuning knob offered by franz-go, but also the standard Java client.