-
Notifications
You must be signed in to change notification settings - Fork 3
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[queue] create PendingCount (to replace Lag) #160
Conversation
This adds a new PendingCount command which returns the aggregate number of pending messages in a queue. ----- I tried working with Lag but it has the problem that it is fundamentally heuristic, based on keeping a track of two counters: - how many messages have ever been added to the stream - how many messages have ever been read by the given consumer group "Lag" is then the difference between these counters - the number of messages that have been added to the stream but not read by the consumer group. Unfortunately, these counters can desync horribly if you ever XDEL a message before it gets read by a consumer group. This is because deleting a message does not decrement the "added to stream" counter, nor does it increment the "read by consumer group" counter. Currently we rely on being able to XDEL messages. Fixing that would be nontrivial. But we can measure the size of the PEL with XPENDING, and we can measure the length of the stream with XLEN, so we can calculate lag as PendingCount() - Len().
This should be |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thinking about your PR description, this has the opposite problem in that it requires we do call XDEL
on everything after we're done with it. But since that's the behaviour of our system currently, and that behaviour is relied on by our autoscaler for working out how many pods to run, I think that's fine!
er.. yes |
This is just like #160, but we calculate length and pending count atomically. Before this change, I was subtracting the pending count from the length to get a "waiting messages" metric; but as these numbers could not be measured atomically I sometimes saw negative waiting messages. This changes PendingCount() into Stats() which returns both length and pending count.
This is just like #160, but we calculate length and pending count atomically. Before this change, I was subtracting the pending count from the length to get a "waiting messages" metric; but as these numbers could not be measured atomically I sometimes saw negative waiting messages. This changes PendingCount() into Stats() which returns both length and pending count.
This adds a new PendingCount command which returns the aggregate number of pending messages in a queue.
I tried working with Lag but it has the problem that it is fundamentally heuristic, based on keeping a track of two counters:
"Lag" is then the difference between these counters - the number of messages that have been added to the stream but not read by the consumer group.
Unfortunately, these counters can desync horribly if you ever XDEL a message before it gets read by a consumer group. This is because deleting a message does not decrement the "added to stream" counter, nor does it increment the "read by consumer group" counter.
Currently we rely on being able to XDEL messages. Fixing that would be nontrivial.
But we can measure the size of the PEL with XPENDING, and we can measure the length of the stream with XLEN, so we can calculate lag as Len() - PendingCount().