@@ -57,6 +57,17 @@ def initialize(cluster:, logger:, instrumenter:, group:, offset_manager:, sessio
5757
5858 # The maximum number of bytes to fetch from a single partition, by topic.
5959 @max_bytes = { }
60+
61+ # Hash containing offsets for each topic and partition that has the
62+ # automatically_mark_as_processed feature disabled. The offset manager is only active
63+ # when everything is supposed to happen automatically. Otherwise we need to keep track of the
64+ # offset manually in memory at all times.
65+ # The key structure for this equals an array with topic and partition [topic, partition]
66+ # The value is equal to the offset of the last message we've received
67+ # @note It won't be updated in case user marks message as processed, because for the case
68+ # when user commits message other than last in a batch, this would make ruby-kafka refetch
69+ # some already consumed messages
70+ @current_offsets = Hash . new { |h , k | h [ k ] = { } }
6071 end
6172
6273 # Subscribes the consumer to a topic.
@@ -94,6 +105,7 @@ def subscribe(topic, default_offset: nil, start_from_beginning: true, max_bytes_
# Stops the consumer loop and disconnects from the Kafka cluster.
#
# Sets the running flag to false so the consumer loop exits after the
# current iteration, then closes the cluster's broker connections so no
# sockets are leaked once the consumer is stopped.
#
# @return [nil]
def stop
  @running = false
  @cluster.disconnect
end
98110
99111 # Pause processing of a specific topic partition.
@@ -180,7 +192,11 @@ def paused?(topic, partition)
180192 # @return [nil]
181193 def each_message ( min_bytes : 1 , max_wait_time : 1 , automatically_mark_as_processed : true )
182194 consumer_loop do
183- batches = fetch_batches ( min_bytes : min_bytes , max_wait_time : max_wait_time )
195+ batches = fetch_batches (
196+ min_bytes : min_bytes ,
197+ max_wait_time : max_wait_time ,
198+ automatically_mark_as_processed : automatically_mark_as_processed
199+ )
184200
185201 batches . each do |batch |
186202 batch . messages . each do |message |
@@ -196,6 +212,7 @@ def each_message(min_bytes: 1, max_wait_time: 1, automatically_mark_as_processed
196212
197213 begin
198214 yield message
215+ @current_offsets [ message . topic ] [ message . partition ] = message . offset
199216 rescue => e
200217 location = "#{ message . topic } /#{ message . partition } at offset #{ message . offset } "
201218 backtrace = e . backtrace . join ( "\n " )
@@ -216,6 +233,8 @@ def each_message(min_bytes: 1, max_wait_time: 1, automatically_mark_as_processed
216233
217234 # We may not have received any messages, but it's still a good idea to
218235 # commit offsets if we've processed messages in the last set of batches.
236+ # This also ensures the offsets are retained if we haven't read any messages
237+ # since the offset retention period has elapsed.
219238 @offset_manager . commit_offsets_if_necessary
220239 end
221240 end
@@ -244,7 +263,11 @@ def each_message(min_bytes: 1, max_wait_time: 1, automatically_mark_as_processed
244263 # @return [nil]
245264 def each_batch ( min_bytes : 1 , max_wait_time : 1 , automatically_mark_as_processed : true )
246265 consumer_loop do
247- batches = fetch_batches ( min_bytes : min_bytes , max_wait_time : max_wait_time )
266+ batches = fetch_batches (
267+ min_bytes : min_bytes ,
268+ max_wait_time : max_wait_time ,
269+ automatically_mark_as_processed : automatically_mark_as_processed
270+ )
248271
249272 batches . each do |batch |
250273 unless batch . empty?
@@ -259,6 +282,7 @@ def each_batch(min_bytes: 1, max_wait_time: 1, automatically_mark_as_processed:
259282
260283 begin
261284 yield batch
285+ @current_offsets [ batch . topic ] [ batch . partition ] = batch . last_offset
262286 rescue => e
263287 offset_range = ( batch . first_offset ..batch . last_offset )
264288 location = "#{ batch . topic } /#{ batch . partition } in offset range #{ offset_range } "
@@ -279,6 +303,12 @@ def each_batch(min_bytes: 1, max_wait_time: 1, automatically_mark_as_processed:
279303
280304 return if !@running
281305 end
306+
307+ # We may not have received any messages, but it's still a good idea to
308+ # commit offsets if we've processed messages in the last set of batches.
309+ # This also ensures the offsets are retained if we haven't read any messages
310+ # since the offset retention period has elapsed.
311+ @offset_manager . commit_offsets_if_necessary
282312 end
283313 end
284314
@@ -370,15 +400,13 @@ def join_group
370400 end
371401 end
372402
373- def fetch_batches ( min_bytes :, max_wait_time :)
403+ def fetch_batches ( min_bytes :, max_wait_time :, automatically_mark_as_processed : )
374404 join_group unless @group . member?
375405
376406 subscribed_partitions = @group . subscribed_partitions
377407
378408 @heartbeat . send_if_necessary
379409
380- raise NoPartitionsAssignedError if subscribed_partitions . empty?
381-
382410 operation = FetchOperation . new (
383411 cluster : @cluster ,
384412 logger : @logger ,
@@ -388,7 +416,18 @@ def fetch_batches(min_bytes:, max_wait_time:)
388416
389417 subscribed_partitions . each do |topic , partitions |
390418 partitions . each do |partition |
391- offset = @offset_manager . next_offset_for ( topic , partition )
419+ if automatically_mark_as_processed
420+ offset = @offset_manager . next_offset_for ( topic , partition )
421+ else
422+ # When automatic marking is off, the first poll needs to be based on the last committed
423+ # offset from Kafka, that's why we fallback in case of nil (it may not be 0)
424+ if @current_offsets [ topic ] . key? ( partition )
425+ offset = @current_offsets [ topic ] [ partition ] + 1
426+ else
427+ offset = @offset_manager . next_offset_for ( topic , partition )
428+ end
429+ end
430+
392431 max_bytes = @max_bytes . fetch ( topic )
393432
394433 if paused? ( topic , partition )
@@ -401,6 +440,13 @@ def fetch_batches(min_bytes:, max_wait_time:)
401440 end
402441
403442 operation . execute
443+ rescue NoPartitionsToFetchFrom
444+ backoff = max_wait_time > 0 ? max_wait_time : 1
445+
446+ @logger . info "There are no partitions to fetch from, sleeping for #{ backoff } s"
447+ sleep backoff
448+
449+ retry
404450 rescue OffsetOutOfRange => e
405451 @logger . error "Invalid offset for #{ e . topic } /#{ e . partition } , resetting to default offset"
406452
0 commit comments