@@ -17,9 +17,9 @@ class ConsumerConfig(KafkaConfigBase, BaseSettings):
17
17
batch_consume_max_time_seconds : Optional [int ] = 10
18
18
batch_consume_store_messages : bool = False
19
19
heartbeat_timeout_ms : int = 4 * 60 // 2 * 1_000 # TODO document that this pairs with timeout_minutes
20
+ message_singleton_max_mb : int = 2
20
21
message_batch_max_mb : int = 5
21
22
message_queue_max_mb : int = 20
22
- message_total_max_mb : int = 2
23
23
poll_timeout_seconds : int = 5
24
24
timeout_minutes : int = 4 # TODO document that this pairs with heartbeat_timeout_ms
25
25
@@ -31,10 +31,10 @@ def as_client_dict(self):
31
31
return {
32
32
"auto.commit.interval.ms" : self .auto_commit_interval_seconds * 1_000 ,
33
33
"auto.offset.reset" : self .auto_offset_reset ,
34
- "fetch.max.bytes" : self .message_total_max_mb * (2 ** 20 ),
34
+ "fetch.max.bytes" : self .message_batch_max_mb * (2 ** 20 ),
35
35
"heartbeat.interval.ms" : (self .heartbeat_timeout_ms // 5 ) - ms_tolerance , # 5 failed heartbeats == bad consumer.
36
36
"max.poll.interval.ms" : self .timeout_minutes * 60_000 , # Max time between poll() calls before considered dead.
37
- "message.max.bytes" : self .message_batch_max_mb * (2 ** 20 ),
37
+ "message.max.bytes" : self .message_singleton_max_mb * (2 ** 20 ),
38
38
"queued.max.messages.kbytes" : self .message_queue_max_mb * (2 ** 10 ),
39
39
"session.timeout.ms" : self .heartbeat_timeout_ms , # need at least 1 heartbeat within "session" time to be considered alive;
40
40
}
0 commit comments