diff --git a/queue/client.go b/queue/client.go index 69b5813..780dcfb 100644 --- a/queue/client.go +++ b/queue/client.go @@ -43,16 +43,11 @@ func (c *Client) Len(ctx context.Context, name string) (int64, error) { return lenScript.RunRO(ctx, c.rdb, []string{name}).Int64() } -// Lag calculates the aggregate lag (that is, number of messages that have not -// yet been read) of the consumer group for the given queue. It adds up the -// consumer group's lag of all the streams in the queue, based on the value -// reported by XINFO GROUPS for the stream. If any of the streams report a -// `nil` lag, Lag returns an error. -// -// As a side effect, Lag creates the consumer group if it does not already -// exist. -func (c *Client) Lag(ctx context.Context, queue string, group string) (int64, error) { - return lagScript.Run(ctx, c.rdb, []string{queue}, group).Int64() +// PendingCount counts the aggregate pending entries (that is, number of +// messages that have been read but not acknowledged) of the consumer group for +// the given queue, as reported by XPENDING. +func (c *Client) PendingCount(ctx context.Context, queue string, group string) (int64, error) { + return pendingCountScript.RunRO(ctx, c.rdb, []string{queue}, group).Int64() } // Read a single message from the queue. If the Block field of args is diff --git a/queue/client_test.go b/queue/client_test.go index fe97643..51bb292 100644 --- a/queue/client_test.go +++ b/queue/client_test.go @@ -67,10 +67,6 @@ func TestClientIntegration(t *testing.T) { ids := make(map[string]struct{}) for i := range 15 { - lag, err := client.Lag(ctx, "test", "mygroup") - require.NoError(t, err) - assert.EqualValues(t, 15-i, lag) - msg, err := client.Read(ctx, &queue.ReadArgs{ Name: "test", Group: "mygroup", @@ -81,6 +77,10 @@ func TestClientIntegration(t *testing.T) { assert.Contains(t, msg.Values, "type") assert.Contains(t, msg.Values, "id") ids[msg.Values["id"].(string)] = struct{}{} + + pendingCount, err := client.PendingCount(ctx, "test", "mygroup") + require.NoError(t, err) + assert.EqualValues(t, i+1, pendingCount) } // We should have read all the messages we enqueued @@ -99,11 +99,6 @@ func TestClientIntegration(t *testing.T) { length, err = client.Len(ctx, "test") require.NoError(t, err) assert.EqualValues(t, 15, length) - - // But Lag is now 0 (because we've XREADGROUPed everything) - lag, err := client.Lag(ctx, "test", "mygroup") - require.NoError(t, err) - assert.EqualValues(t, 0, lag) } // Check that the Block option works as expected diff --git a/queue/lag.lua b/queue/lag.lua deleted file mode 100644 index 534aac7..0000000 --- a/queue/lag.lua +++ /dev/null @@ -1,46 +0,0 @@ --- Lag commands take the form --- --- EVALSHA sha 1 key group --- --- Note: strictly, it is illegal for a script to manipulate keys that are not --- explicitly passed to EVAL{,SHA}, but in practice this is fine as long as all --- keys are on the same server (e.g. in cluster scenarios). In our case a single --- queue, which may be composed of multiple streams and metadata keys, is always --- on the same server. - -local base = KEYS[1] -local group = ARGV[1] - -local key_meta = base .. ':meta' - -local streams = tonumber(redis.call('HGET', key_meta, 'streams') or 1) -local result = 0 - -for idx = 0, streams-1 do - local stream = base .. ':s' .. idx - - -- the group must exist for us to measure its lag. we create it here; if it - -- already exists this returns a BUSYGROUP error which we ignore - redis.pcall('XGROUP', 'CREATE', stream, group, '0', 'MKSTREAM') - - local info = redis.pcall('XINFO', 'GROUPS', stream) - if info['err'] == 'ERR no such key' then - -- if the stream doesn't exist, treat it as zero lag - elseif info['err'] then - return redis.error_reply(info['err']..' accessing '..stream) - else - for i,v in ipairs(info) do - if v[2] == group then - if not v[12] then - -- lag can be nil; we propagate this to the caller - return redis.error_reply('ERR unknown lag for group '..group..' on stream '..stream) - end - - result = result + v[12] - break - end - end - end -end - -return result diff --git a/queue/pendingcount.lua b/queue/pendingcount.lua new file mode 100644 index 0000000..9bb8f8e --- /dev/null +++ b/queue/pendingcount.lua @@ -0,0 +1,34 @@ +-- pendingcount commands take the form +-- +-- EVALSHA sha 1 key group +-- +-- Note: strictly, it is illegal for a script to manipulate keys that are not +-- explicitly passed to EVAL{,SHA}, but in practice this is fine as long as all +-- keys are on the same server (e.g. in cluster scenarios). In our case a single +-- queue, which may be composed of multiple streams and metadata keys, is always +-- on the same server. + +local base = KEYS[1] +local group = ARGV[1] + +local key_meta = base .. ':meta' + +local streams = tonumber(redis.call('HGET', key_meta, 'streams') or 1) +local result = 0 + +for idx = 0, streams-1 do + local stream = base .. ':s' .. idx + + local info = redis.pcall('XPENDING', stream, group) + if info['err'] then + if string.match(info['err'], '^NOGROUP ') then + -- if either the stream or group don't exist, there are zero pending entries + else + return redis.error_reply(info['err']..' accessing '..stream) + end + else + result = result + info[1] + end +end + +return result diff --git a/queue/queue.go b/queue/queue.go index 1003ec0..a996a3f 100644 --- a/queue/queue.go +++ b/queue/queue.go @@ -44,9 +44,9 @@ var ( lenCmd string lenScript = redis.NewScript(lenCmd) - //go:embed lag.lua - lagCmd string - lagScript = redis.NewScript(lagCmd) + //go:embed pendingcount.lua + pendingCountCmd string + pendingCountScript = redis.NewScript(pendingCountCmd) //go:embed read.lua readCmd string @@ -61,7 +61,7 @@ func prepare(ctx context.Context, rdb redis.Cmdable) error { if err := lenScript.Load(ctx, rdb).Err(); err != nil { return err } - if err := lagScript.Load(ctx, rdb).Err(); err != nil { + if err := pendingCountScript.Load(ctx, rdb).Err(); err != nil { return err } if err := readScript.Load(ctx, rdb).Err(); err != nil {