Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[queue] remove legacy #158

Merged
merged 2 commits into from
Dec 4, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
53 changes: 0 additions & 53 deletions queue/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,59 +238,6 @@ func TestClientReadIntegration(t *testing.T) {
}
}

func TestClientReadLegacyStreamIntegration(t *testing.T) {
ctx := test.Context(t)
rdb := test.Redis(ctx, t)

ttl := 24 * time.Hour
client := queue.NewClient(rdb, ttl)
require.NoError(t, client.Prepare(ctx))

// Prepare a queue with 4 streams
require.NoError(t, rdb.HSet(ctx, "myqueue:meta", "streams", 4).Err())

// But also populate the default stream
for i := range 10 {
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: "myqueue",
Values: map[string]any{
"idx": fmt.Sprintf("default-%d", i),
},
}).Err())
}

for i := range 4 {
for j := range 10 {
require.NoError(t, rdb.XAdd(ctx, &redis.XAddArgs{
Stream: fmt.Sprintf("myqueue:s%d", i),
Values: map[string]any{
"idx": fmt.Sprintf("%d-%d", i, j),
},
}).Err())
}
}

msgs := make(map[string]struct{})
for {
msg, err := client.Read(ctx, &queue.ReadArgs{
Name: "myqueue",
Group: "mygroup",
Consumer: "mygroup:123",
})
if errors.Is(err, queue.Empty) {
break
}
require.NoError(t, err)
msgs[msg.Values["idx"].(string)] = struct{}{}
}

ttl, err := rdb.TTL(ctx, "myqueue:meta").Result()
require.NoError(t, err)
assert.Greater(t, ttl, 23*time.Hour)

assert.Len(t, msgs, 50)
}

func TestClientWriteIntegration(t *testing.T) {
ctx := test.Context(t)
rdb := test.Redis(ctx, t)
Expand Down
6 changes: 1 addition & 5 deletions queue/len.lua
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,7 @@ local key_meta = base .. ':meta'
local streams = tonumber(redis.call('HGET', key_meta, 'streams') or 1)
local result = 0

-- LEGACY: Include the base stream. This can be removed once everything is using
-- new stream names.
result = result + redis.call('XLEN', base)

for idx = 0, streams do
for idx = 0, streams-1 do
result = result + redis.call('XLEN', base .. ':s' .. idx)
end

Expand Down
35 changes: 4 additions & 31 deletions queue/read.lua
Original file line number Diff line number Diff line change
Expand Up @@ -25,30 +25,13 @@ local key_meta = base .. ':meta'
-- How many streams are available to read?
local streams = tonumber(redis.call('HGET', key_meta, 'streams') or 1)

-- LEGACY: Check if a stream exists at the old name. If it does, it will be
-- checked alongside all other streams.
--
-- This can be removed once everything is writing to new stream names.
local check_default_stream = redis.call('XLEN', base) > 0

-- Loop over streams to find a message
local function hasprefix(str, prefix)
return string.sub(str, 1, string.len(prefix)) == prefix
end

local function checkstream (stream)
local grp
-- TODO: Remove this once we've migrated to using the new group everywhere.
-- This allows us to stop using the stream name as the consumer group name,
-- because new streams (`:sN`) will use the provided group name, while the old
-- stream will just use its own name as the group, which is the current
-- behavior.
if stream == base then
grp = base
else
grp = group
end
local reply = redis.pcall('XREADGROUP', 'GROUP', grp, consumer, 'COUNT', 1, 'STREAMS', stream, '>')
local reply = redis.pcall('XREADGROUP', 'GROUP', group, consumer, 'COUNT', 1, 'STREAMS', stream, '>')
-- false means a null reply from XREADGROUP, which means the stream is empty
if not reply then
return reply
Expand All @@ -59,10 +42,10 @@ local function checkstream (stream)
end

if hasprefix(reply['err'], 'NOGROUP No such key') then
redis.call('XGROUP', 'CREATE', stream, grp, '0', 'MKSTREAM')
redis.call('XGROUP', 'CREATE', stream, group, '0', 'MKSTREAM')
redis.call('EXPIRE', stream, ttl)
-- and try again, just once
return redis.pcall('XREADGROUP', 'GROUP', grp, consumer, 'COUNT', 1, 'STREAMS', stream, '>')
return redis.pcall('XREADGROUP', 'GROUP', group, consumer, 'COUNT', 1, 'STREAMS', stream, '>')
end

return reply
Expand All @@ -76,19 +59,9 @@ end
-- is appropriately wrapped before using it.
local offset = tonumber(redis.call('HGET', key_meta, 'offset') or 0)

for idx = 0, streams do
for idx = 0, streams-1 do
local streamid = (offset + idx) % streams

-- LEGACY: for now, if we're checking stream 0, also check the default stream.
if streamid == 0 and check_default_stream then
local reply = checkstream(base)
if reply then
redis.call('HSET', key_meta, 'offset', (offset + idx + 1) % streams)
redis.call('EXPIRE', key_meta, ttl)
return reply
end
end

local reply = checkstream(base .. ':s' .. streamid)
if reply then
redis.call('HSET', key_meta, 'offset', (offset + idx + 1) % streams)
Expand Down
Loading