Messages lost when broker closes connection #814
Comments
Thanks @iciclespider for the detailed report.
A race condition can also trigger this for asynchronous producers. To reliably reproduce this behaviour, configure a Kafka broker with an artificially low idle client disconnect time. This affects any real-world system that only sporadically sends batches of messages, since the default idle timeout is 10 minutes.
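For anyone trying to reproduce this: the broker setting involved is `connections.max.idle.ms`, which defaults to 600000 ms (the 10 minutes mentioned above); lowering it to a few seconds on a test broker makes the disconnect easy to hit. Below is a rough reproduction sketch against pykafka's asynchronous producer; the host, topic name, batch sizes, and sleep duration are placeholders, not values from this report.

```python
import time

from pykafka import KafkaClient

# Assumes a local test broker whose connections.max.idle.ms has been lowered
# (e.g. to a few seconds) and an existing topic named "test". Host, topic,
# message counts and the sleep length are all placeholders.
client = KafkaClient(hosts="127.0.0.1:9092")
topic = client.topics[b"test"]

producer = topic.get_producer()  # asynchronous producer by default

# First batch goes out while the connection is fresh.
for i in range(100):
    producer.produce(b"batch-1 message %d" % i)

# Sleep past the broker's (shortened) idle timeout so the broker closes the
# connection from its side.
time.sleep(30)

# The second batch is queued against a connection the broker has already
# dropped; the resulting SocketDisconnectError drives the update path
# described in this issue, and some of these messages can be lost.
for i in range(100):
    producer.produce(b"batch-2 message %d" % i)

producer.stop()
```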
Thanks @andreweland for clarifying. This is one of the highest-priority issues at the moment. If anyone reading this would like to take a stab at fixing it, this is a great opportunity for contribution.
@emmett9001 do you remember why the early return was necessary (i.e. this commit)? The fix should be a case of removing that, and then locking the
Looks like #595 is the original PR containing that commit. The description has a good outline of my thinking at that time. |
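For readers following along, here is a very rough sketch of the direction being discussed. It is not pykafka's actual code: the attribute names (`running`, `lock`, `queue`) are illustrative stand-ins and the real `OwnedBroker` internals may differ. The idea is that `flush` drains whatever is queued even after the broker has been stopped, holding a lock so nothing is produced into the queue mid-drain.

```python
import threading
from collections import deque


class OwnedBrokerSketch:
    """Illustrative stand-in only, not pykafka's OwnedBroker."""

    def __init__(self):
        self.running = True
        self.lock = threading.Lock()
        self.queue = deque()

    def enqueue(self, message):
        with self.lock:
            self.queue.append(message)

    def stop(self):
        self.running = False

    def flush(self):
        # The early return under discussion looked roughly like:
        #     if not self.running:
        #         return []
        # which drops anything still queued once the broker is stopped.
        #
        # Suggested direction: always drain the queue, under the lock, so a
        # stopped broker still hands its pending messages back for requeueing.
        with self.lock:
            batch = list(self.queue)
            self.queue.clear()
        return batch
```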
Already queued messages are lost when a `SocketDisconnectError` is detected here.

This is occurring because the `Producer._update` method first closes all `OwnedBroker`s here, which results in each `OwnedBroker` setting `self.running` to False here. Then, `Producer._update` calls `Producer._setup_owned_brokers` to reestablish the broker connection. `Producer._setup_owned_brokers` (which also does its own `OwnedBroker.stop` call) calls `OwnedBroker.flush` to collect all currently queued messages here.

The attempt to collect the currently queued messages will never return any messages, because `OwnedBroker.flush` checks whether `self.running` is False and, if so, returns an empty list here. `self.running` will be False, because that `OwnedBroker` was just stopped.

I discovered this in a long-running test: over the course of queuing 3,200,000 messages, a couple of dozen messages would go missing.
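To make that sequence concrete, here is a small self-contained model of the failure path described above. It is not pykafka code; the class and its `running` flag simply mirror the behaviour described (stopping sets `running` to False, and `flush` then returns an empty list).

```python
from collections import deque


class BrokerModel:
    """Toy model of the behaviour described above, not pykafka code."""

    def __init__(self):
        self.running = True
        self.queue = deque()

    def enqueue(self, message):
        self.queue.append(message)

    def stop(self):
        self.running = False

    def flush(self):
        # Mirrors the early return described above: once stopped, flush
        # claims there is nothing left to hand over.
        if not self.running:
            return []
        batch = list(self.queue)
        self.queue.clear()
        return batch


broker = BrokerModel()
broker.enqueue(b"message-1")
broker.enqueue(b"message-2")

# Producer._update stops the broker first, then tries to collect what was
# still queued so it can be requeued on the new connection...
broker.stop()
print(broker.flush())  # [] -- the two queued messages are silently dropped
```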