From 6c095f7db4975fb30466a97f7fcf8b02a9cd137d Mon Sep 17 00:00:00 2001 From: Philip Potter Date: Mon, 9 Dec 2024 16:18:06 +0000 Subject: [PATCH] [queue] create PendingCount (to replace Lag) (#160) This adds a new PendingCount command which returns the aggregate number of pending messages in a queue. ----- I tried working with Lag but it has the problem that it is fundamentally heuristic, based on keeping a track of two counters: - how many messages have ever been added to the stream - how many messages have ever been read by the given consumer group "Lag" is then the difference between these counters - the number of messages that have been added to the stream but not read by the consumer group. Unfortunately, these counters can desync horribly if you ever XDEL a message before it gets read by a consumer group. This is because deleting a message does not decrement the "added to stream" counter, nor does it increment the "read by consumer group" counter. Currently we rely on being able to XDEL messages. Fixing that would be nontrivial. But we can measure the size of the PEL with XPENDING, and we can measure the length of the stream with XLEN, so we can calculate lag as PendingCount() - Len(). --- queue/client.go | 15 +++++--------- queue/client_test.go | 13 ++++-------- queue/lag.lua | 46 ------------------------------------------ queue/pendingcount.lua | 34 +++++++++++++++++++++++++++++++ queue/queue.go | 8 ++++---- 5 files changed, 47 insertions(+), 69 deletions(-) delete mode 100644 queue/lag.lua create mode 100644 queue/pendingcount.lua diff --git a/queue/client.go b/queue/client.go index 69b5813..780dcfb 100644 --- a/queue/client.go +++ b/queue/client.go @@ -43,16 +43,11 @@ func (c *Client) Len(ctx context.Context, name string) (int64, error) { return lenScript.RunRO(ctx, c.rdb, []string{name}).Int64() } -// Lag calculates the aggregate lag (that is, number of messages that have not -// yet been read) of the consumer group for the given queue. It adds up the -// consumer group's lag of all the streams in the queue, based on the value -// reported by XINFO GROUPS for the stream. If any of the streams report a -// `nil` lag, Lag returns an error. -// -// As a side effect, Lag creates the consumer group if it does not already -// exist. -func (c *Client) Lag(ctx context.Context, queue string, group string) (int64, error) { - return lagScript.Run(ctx, c.rdb, []string{queue}, group).Int64() +// PendingCount counts the aggregate pending entries (that is, number of +// messages that have been read but not acknowledged) of the consumer group for +// the given queue, as reported by XPENDING. +func (c *Client) PendingCount(ctx context.Context, queue string, group string) (int64, error) { + return pendingCountScript.RunRO(ctx, c.rdb, []string{queue}, group).Int64() } // Read a single message from the queue. If the Block field of args is diff --git a/queue/client_test.go b/queue/client_test.go index fe97643..51bb292 100644 --- a/queue/client_test.go +++ b/queue/client_test.go @@ -67,10 +67,6 @@ func TestClientIntegration(t *testing.T) { ids := make(map[string]struct{}) for i := range 15 { - lag, err := client.Lag(ctx, "test", "mygroup") - require.NoError(t, err) - assert.EqualValues(t, 15-i, lag) - msg, err := client.Read(ctx, &queue.ReadArgs{ Name: "test", Group: "mygroup", @@ -81,6 +77,10 @@ func TestClientIntegration(t *testing.T) { assert.Contains(t, msg.Values, "type") assert.Contains(t, msg.Values, "id") ids[msg.Values["id"].(string)] = struct{}{} + + pendingCount, err := client.PendingCount(ctx, "test", "mygroup") + require.NoError(t, err) + assert.EqualValues(t, i+1, pendingCount) } // We should have read all the messages we enqueued @@ -99,11 +99,6 @@ func TestClientIntegration(t *testing.T) { length, err = client.Len(ctx, "test") require.NoError(t, err) assert.EqualValues(t, 15, length) - - // But Lag is now 0 (because we've XREADGROUPed everything) - lag, err := client.Lag(ctx, "test", "mygroup") - require.NoError(t, err) - assert.EqualValues(t, 0, lag) } // Check that the Block option works as expected diff --git a/queue/lag.lua b/queue/lag.lua deleted file mode 100644 index 534aac7..0000000 --- a/queue/lag.lua +++ /dev/null @@ -1,46 +0,0 @@ --- Lag commands take the form --- --- EVALSHA sha 1 key group --- --- Note: strictly, it is illegal for a script to manipulate keys that are not --- explicitly passed to EVAL{,SHA}, but in practice this is fine as long as all --- keys are on the same server (e.g. in cluster scenarios). In our case a single --- queue, which may be composed of multiple streams and metadata keys, is always --- on the same server. - -local base = KEYS[1] -local group = ARGV[1] - -local key_meta = base .. ':meta' - -local streams = tonumber(redis.call('HGET', key_meta, 'streams') or 1) -local result = 0 - -for idx = 0, streams-1 do - local stream = base .. ':s' .. idx - - -- the group must exist for us to measure its lag. we create it here; if it - -- already exists this returns a BUSYGROUP error which we ignore - redis.pcall('XGROUP', 'CREATE', stream, group, '0', 'MKSTREAM') - - local info = redis.pcall('XINFO', 'GROUPS', stream) - if info['err'] == 'ERR no such key' then - -- if the stream doesn't exist, treat it as zero lag - elseif info['err'] then - return redis.error_reply(info['err']..' accessing '..stream) - else - for i,v in ipairs(info) do - if v[2] == group then - if not v[12] then - -- lag can be nil; we propagate this to the caller - return redis.error_reply('ERR unknown lag for group '..group..' on stream '..stream) - end - - result = result + v[12] - break - end - end - end -end - -return result diff --git a/queue/pendingcount.lua b/queue/pendingcount.lua new file mode 100644 index 0000000..9bb8f8e --- /dev/null +++ b/queue/pendingcount.lua @@ -0,0 +1,34 @@ +-- pendingcount commands take the form +-- +-- EVALSHA sha 1 key group +-- +-- Note: strictly, it is illegal for a script to manipulate keys that are not +-- explicitly passed to EVAL{,SHA}, but in practice this is fine as long as all +-- keys are on the same server (e.g. in cluster scenarios). In our case a single +-- queue, which may be composed of multiple streams and metadata keys, is always +-- on the same server. + +local base = KEYS[1] +local group = ARGV[1] + +local key_meta = base .. ':meta' + +local streams = tonumber(redis.call('HGET', key_meta, 'streams') or 1) +local result = 0 + +for idx = 0, streams-1 do + local stream = base .. ':s' .. idx + + local info = redis.pcall('XPENDING', stream, group) + if info['err'] then + if string.match(info['err'], '^NOGROUP ') then + -- if either the stream or group don't exist, there are zero pending entries + else + return redis.error_reply(info['err']..' accessing '..stream) + end + else + result = result + info[1] + end +end + +return result diff --git a/queue/queue.go b/queue/queue.go index 1003ec0..a996a3f 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -44,9 +44,9 @@ var ( lenCmd string lenScript = redis.NewScript(lenCmd) - //go:embed lag.lua - lagCmd string - lagScript = redis.NewScript(lagCmd) + //go:embed pendingcount.lua + pendingCountCmd string + pendingCountScript = redis.NewScript(pendingCountCmd) //go:embed read.lua readCmd string @@ -61,7 +61,7 @@ func prepare(ctx context.Context, rdb redis.Cmdable) error { if err := lenScript.Load(ctx, rdb).Err(); err != nil { return err } - if err := lagScript.Load(ctx, rdb).Err(); err != nil { + if err := pendingCountScript.Load(ctx, rdb).Err(); err != nil { return err } if err := readScript.Load(ctx, rdb).Err(); err != nil {