Skip to content

Commit

Permalink
[queue] remove legacy (#158)
Browse files Browse the repository at this point in the history
* [queue] Fix off-by-one error

Our loop bounds were wrong here.

This isn't a big deal in len.lua because XLEN on a nonexistent key returns 0.

It's not a big deal in read.lua because we immediately take (offset + idx) %
streams, folding into the range [0, streams) anyway.  It just means we check the
original stream twice in the loop.

However this breaks if you copy-paste the pattern for some other purpose, like
calling XINFO GROUPS on a key.  That command errors if the key doesn't exist.

* [queue] Remove legacy code for checking old base streams

This is now fully rolled out, we have no base streams any more.  I have checked
this in all our clusters with:

    KEYS input:prediction:??-????????????????????????????????

and found a bunch of empty streams with no TTLs.  I manually deleted them.
  • Loading branch information
philandstuff authored Dec 4, 2024
1 parent dfa00e7 commit 3490956
Show file tree
Hide file tree
Showing 3 changed files with 5 additions and 89 deletions.
53 changes: 0 additions & 53 deletions queue/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,59 +238,6 @@ func TestClientReadIntegration(t *testing.T) {
}
}

func TestClientReadLegacyStreamIntegration(t *testing.T) {
ctx := test.Context(t)
rdb := test.Redis(ctx, t)

ttl := 24 * time.Hour
client := queue.NewClient(rdb, ttl)
require.NoError(t, client.Prepare(ctx))

// Prepare a queue with 4 streams
require.NoError(t, rdb.HSet(ctx, "myqueue:meta", "streams", 4).Err())

// But also populate the default stream
for i := range 10 {
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: "myqueue",
Values: map[string]any{
"idx": fmt.Sprintf("default-%d", i),
},
}).Err())
}

for i := range 4 {
for j := range 10 {
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: fmt.Sprintf("myqueue:s%d", i),
Values: map[string]any{
"idx": fmt.Sprintf("%d-%d", i, j),
},
}).Err())
}
}

msgs := make(map[string]struct{})
for {
msg, err := client.Read(ctx, &queue.ReadArgs{
Name: "myqueue",
Group: "mygroup",
Consumer: "mygroup:123",
})
if errors.Is(err, queue.Empty) {
break
}
require.NoError(t, err)
msgs[msg.Values["idx"].(string)] = struct{}{}
}

ttl, err := rdb.TTL(ctx, "myqueue:meta").Result()
require.NoError(t, err)
assert.Greater(t, ttl, 23*time.Hour)

assert.Len(t, msgs, 50)
}

func TestClientWriteIntegration(t *testing.T) {
ctx := test.Context(t)
rdb := test.Redis(ctx, t)
Expand Down
6 changes: 1 addition & 5 deletions queue/len.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@ local key_meta = base .. ':meta'
local streams = tonumber(redis.call('HGET', key_meta, 'streams') or 1)
local result = 0

-- LEGACY: Include the base stream. This can be removed once everything is using
-- new stream names.
result = result + redis.call('XLEN', base)

for idx = 0, streams do
for idx = 0, streams-1 do
result = result + redis.call('XLEN', base .. ':s' .. idx)
end

Expand Down
35 changes: 4 additions & 31 deletions queue/read.lua
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,13 @@ local key_meta = base .. ':meta'
-- How many streams are available to read?
local streams = tonumber(redis.call('HGET', key_meta, 'streams') or 1)

-- LEGACY: Check if a stream exists at the old name. If it does, it will be
-- checked alongside all other streams.
--
-- This can be removed once everything is writing to new stream names.
local check_default_stream = redis.call('XLEN', base) > 0

-- Loop over streams to find a message
local function hasprefix(str, prefix)
return string.sub(str, 1, string.len(prefix)) == prefix
end

local function checkstream (stream)
local grp
-- TODO: Remove this once we've migrated to using the new group everywhere.
-- This allows us to stop using the stream name as the consumer group name,
-- because new streams (`:sN`) will use the provided group name, while the old
-- stream will just use its own name as the group, which is the current
-- behavior.
if stream == base then
grp = base
else
grp = group
end
local reply = redis.pcall('XREADGROUP', 'GROUP', grp, consumer, 'COUNT', 1, 'STREAMS', stream, '>')
local reply = redis.pcall('XREADGROUP', 'GROUP', group, consumer, 'COUNT', 1, 'STREAMS', stream, '>')
-- false means a null reply from XREADGROUP, which means the stream is empty
if not reply then
return reply
Expand All @@ -59,10 +42,10 @@ local function checkstream (stream)
end

if hasprefix(reply['err'], 'NOGROUP No such key') then
redis.call('XGROUP', 'CREATE', stream, grp, '0', 'MKSTREAM')
redis.call('XGROUP', 'CREATE', stream, group, '0', 'MKSTREAM')
redis.call('EXPIRE', stream, ttl)
-- and try again, just once
return redis.pcall('XREADGROUP', 'GROUP', grp, consumer, 'COUNT', 1, 'STREAMS', stream, '>')
return redis.pcall('XREADGROUP', 'GROUP', group, consumer, 'COUNT', 1, 'STREAMS', stream, '>')
end

return reply
Expand All @@ -76,19 +59,9 @@ end
-- is appropriately wrapped before using it.
local offset = tonumber(redis.call('HGET', key_meta, 'offset') or 0)

for idx = 0, streams do
for idx = 0, streams-1 do
local streamid = (offset + idx) % streams

-- LEGACY: for now, if we're checking stream 0, also check the default stream.
if streamid == 0 and check_default_stream then
local reply = checkstream(base)
if reply then
redis.call('HSET', key_meta, 'offset', (offset + idx + 1) % streams)
redis.call('EXPIRE', key_meta, ttl)
return reply
end
end

local reply = checkstream(base .. ':s' .. streamid)
if reply then
redis.call('HSET', key_meta, 'offset', (offset + idx + 1) % streams)
Expand Down

0 comments on commit 3490956

Please sign in to comment.