Skip to content

Commit

Permalink
support redis cluster
Browse files Browse the repository at this point in the history
  • Loading branch information
HDT3213 committed Aug 20, 2023
1 parent a8447bd commit 023e160
Show file tree
Hide file tree
Showing 7 changed files with 341 additions and 143 deletions.
41 changes: 37 additions & 4 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ Core Advantages:
- Auto retry failed messages
- Works out of the box, Config Nothing and Deploy Nothing, A Redis is all you need.
- Natively adapted to the distributed environment, messages processed concurrently on multiple machines
. workers can be added, removed or migrated at any time
. Workers can be added, removed or migrated at any time
- Support Redis Cluster for high availability

## Install

Expand All @@ -26,8 +27,6 @@ DelayQueue requires a Go version with modules support. Run following command lin
go get github.com/hdt3213/delayqueue
```

> if you are using github.com/go-redis/redis/v8 please use `go get github.com/hdt3213/delayqueue@v8`
## Get Started

```go
Expand Down Expand Up @@ -69,6 +68,9 @@ func main() {
}
```

> if you are using github.com/go-redis/redis/v8 please use `go get github.com/hdt3213/delayqueue@v8`
> If you are using redis client other than go-redis, you could wrap your redis client into [RedisCli](https://pkg.go.dev/github.com/hdt3213/delayqueue#RedisCli) interface
## Options

```go
Expand Down Expand Up @@ -125,7 +127,38 @@ WithDefaultRetryCount customizes the max number of retry, it effects of messages

use WithRetryCount during DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg to specific retry count of particular message

# More Details
## Cluster

If you are using Redis Cluster, please use `NewQueueOnCluster`

```go
redisCli := redis.NewClusterClient(&redis.ClusterOptions{
Addrs: []string{
"127.0.0.1:7000",
"127.0.0.1:7001",
"127.0.0.1:7002",
},
})
callback := func(s string) bool {
return true
}
queue := NewQueueOnCluster("test", redisCli, callback)
```

If you are using transparent clusters, such as codis, twemproxy, or the redis of cluster architecture on aliyun, tencentcloud,
just use `NewQueue` and enable hash tag

```go
redisCli := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})
callback := func(s string) bool {
return true
}
queue := delayqueue.NewQueue("example", redisCli, callback, UseHashTagKey())
```

## More Details

Here is the complete flowchart:

Expand Down
34 changes: 34 additions & 0 deletions README_CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ DelayQueue 的主要优势:
- 自动重试处理失败的消息
- 开箱即用, 无需部署或安装中间件, 只需要一个 Redis 即可工作
- 原生适配分布式环境, 可在多台机器上并发的处理消息. 可以随时增加、减少或迁移 Worker
- 支持各类 Redis 集群

# 安装

Expand Down Expand Up @@ -64,6 +65,9 @@ func main() {
}
```

> 如果您仍在使用 redis/v8 请使用 v8 分支: `go get github.com/hdt3213/delayqueue@v8`
> 如果您在使用其他的 redis 客户端, 可以将其包装到 [RedisCli](https://pkg.go.dev/github.com/hdt3213/delayqueue#RedisCli) 接口中
# 选项

```go
Expand Down Expand Up @@ -117,6 +121,36 @@ WithDefaultRetryCount(count uint)

在调用 DelayQueue.SendScheduleMsg or DelayQueue.SendDelayMsg 发送消息时,可以调用 WithRetryCount 为这条消息单独指定重试次数。

# 集群

如果需要在 Redis Cluster 上工作, 请使用 `NewQueueOnCluster`:

```go
redisCli := redis.NewClusterClient(&redis.ClusterOptions{
Addrs: []string{
"127.0.0.1:7000",
"127.0.0.1:7001",
"127.0.0.1:7002",
},
})
callback := func(s string) bool {
return true
}
queue := NewQueueOnCluster("test", redisCli, callback)
```

如果是阿里云,腾讯云的 Redis 集群版或 codis, twemproxy 这类透明式的集群, 使用 `NewQueue` 并启用 UseHashTagKey() 即可:

```go
redisCli := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})
callback := func(s string) bool {
return true
}
queue := delayqueue.NewQueue("example", redisCli, callback, UseHashTagKey())
```

# 更多细节

完整流程如图所示:
Expand Down
97 changes: 54 additions & 43 deletions delayqueue.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,11 @@
package delayqueue

import (
"context"
"errors"
"fmt"
"github.com/google/uuid"
"github.com/redis/go-redis/v9"
"log"
"strconv"
"sync"
"time"
)
Expand All @@ -14,14 +14,15 @@ import (
type DelayQueue struct {
// name for this Queue. Make sure the name is unique in redis database
name string
redisCli *redis.Client
redisCli RedisCli
cb func(string) bool
pendingKey string // sorted set: message id -> delivery time
readyKey string // list
unAckKey string // sorted set: message id -> retry time
retryKey string // list
retryCountKey string // hash: message id -> remain retry count
garbageKey string // set: message id
useHashTag bool
ticker *time.Ticker
logger *log.Logger
close chan struct{}
Expand All @@ -31,8 +32,24 @@ type DelayQueue struct {
defaultRetryCount uint
fetchInterval time.Duration
fetchLimit uint
concurrent uint
}

concurrent uint
// NilErr represents redis nil
var NilErr = errors.New("nil")

// RedisCli is abstraction for redis client, required commands only not all commands
type RedisCli interface {
Eval(script string, keys []string, args []interface{}) (interface{}, error) // args should be string, integer or float
Set(key string, value string, expiration time.Duration) error
Get(key string) (string, error)
Del(keys []string) error
HSet(key string, field string, value string) error
HDel(key string, fields []string) error
SMembers(key string) ([]string, error)
SRem(key string, members []string) error
ZAdd(key string, values map[string]float64) error
ZRem(key string, fields []string) error
}

type hashTagKeyOpt int
Expand All @@ -45,9 +62,9 @@ func UseHashTagKey() interface{} {
return hashTagKeyOpt(1)
}

// NewQueue creates a new queue, use DelayQueue.StartConsume to consume or DelayQueue.SendScheduleMsg to publish message
// NewQueue0 creates a new queue, use DelayQueue.StartConsume to consume or DelayQueue.SendScheduleMsg to publish message
// callback returns true to confirm successful consumption. If callback returns false or not return within maxConsumeDuration, DelayQueue will re-deliver this message
func NewQueue(name string, cli *redis.Client, callback func(string) bool, opts ...interface{}) *DelayQueue {
func NewQueue0(name string, cli RedisCli, callback func(string) bool, opts ...interface{}) *DelayQueue {
if name == "" {
panic("name is required")
}
Expand Down Expand Up @@ -80,6 +97,7 @@ func NewQueue(name string, cli *redis.Client, callback func(string) bool, opts .
retryKey: keyPrefix + ":retry",
retryCountKey: keyPrefix + ":retry:cnt",
garbageKey: keyPrefix + ":garbage",
useHashTag: useHashTag,
close: make(chan struct{}, 1),
maxConsumeDuration: 5 * time.Second,
msgTTL: time.Hour,
Expand Down Expand Up @@ -132,6 +150,9 @@ func (q *DelayQueue) WithDefaultRetryCount(count uint) *DelayQueue {
}

func (q *DelayQueue) genMsgKey(idStr string) string {
if q.useHashTag {
return "{dp:" + q.name + "}" + ":msg:" + idStr
}
return "dp:" + q.name + ":msg:" + idStr
}

Expand Down Expand Up @@ -165,21 +186,20 @@ func (q *DelayQueue) SendScheduleMsg(payload string, t time.Time, opts ...interf
}
// generate id
idStr := uuid.Must(uuid.NewRandom()).String()
ctx := context.Background()
now := time.Now()
// store msg
msgTTL := t.Sub(now) + q.msgTTL // delivery + q.msgTTL
err := q.redisCli.Set(ctx, q.genMsgKey(idStr), payload, msgTTL).Err()
err := q.redisCli.Set(q.genMsgKey(idStr), payload, msgTTL)
if err != nil {
return fmt.Errorf("store msg failed: %v", err)
}
// store retry count
err = q.redisCli.HSet(ctx, q.retryCountKey, idStr, retryCount).Err()
err = q.redisCli.HSet(q.retryCountKey, idStr, strconv.Itoa(int(retryCount)))
if err != nil {
return fmt.Errorf("store retry count failed: %v", err)
}
// put to pending
err = q.redisCli.ZAdd(ctx, q.pendingKey, redis.Z{Score: float64(t.Unix()), Member: idStr}).Err()
err = q.redisCli.ZAdd(q.pendingKey, map[string]float64{idStr: float64(t.Unix())})
if err != nil {
return fmt.Errorf("push to pending failed: %v", err)
}
Expand Down Expand Up @@ -214,10 +234,9 @@ redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- remove msgs from pendi

func (q *DelayQueue) pending2Ready() error {
now := time.Now().Unix()
ctx := context.Background()
keys := []string{q.pendingKey, q.readyKey}
err := q.redisCli.Eval(ctx, pending2ReadyScript, keys, now).Err()
if err != nil && err != redis.Nil {
_, err := q.redisCli.Eval(pending2ReadyScript, keys, []interface{}{now})
if err != nil && err != NilErr {
return fmt.Errorf("pending2ReadyScript failed: %v", err)
}
return nil
Expand All @@ -235,10 +254,9 @@ return msg

func (q *DelayQueue) ready2Unack() (string, error) {
retryTime := time.Now().Add(q.maxConsumeDuration).Unix()
ctx := context.Background()
keys := []string{q.readyKey, q.unAckKey}
ret, err := q.redisCli.Eval(ctx, ready2UnackScript, keys, retryTime).Result()
if err == redis.Nil {
ret, err := q.redisCli.Eval(ready2UnackScript, keys, []interface{}{retryTime})
if err == NilErr {
return "", err
}
if err != nil {
Expand All @@ -253,11 +271,10 @@ func (q *DelayQueue) ready2Unack() (string, error) {

func (q *DelayQueue) retry2Unack() (string, error) {
retryTime := time.Now().Add(q.maxConsumeDuration).Unix()
ctx := context.Background()
keys := []string{q.retryKey, q.unAckKey}
ret, err := q.redisCli.Eval(ctx, ready2UnackScript, keys, retryTime, q.retryKey, q.unAckKey).Result()
if err == redis.Nil {
return "", redis.Nil
ret, err := q.redisCli.Eval(ready2UnackScript, keys, []interface{}{retryTime, q.retryKey, q.unAckKey})
if err == NilErr {
return "", NilErr
}
if err != nil {
return "", fmt.Errorf("ready2UnackScript failed: %v", err)
Expand All @@ -270,9 +287,8 @@ func (q *DelayQueue) retry2Unack() (string, error) {
}

func (q *DelayQueue) callback(idStr string) error {
ctx := context.Background()
payload, err := q.redisCli.Get(ctx, q.genMsgKey(idStr)).Result()
if err == redis.Nil {
payload, err := q.redisCli.Get(q.genMsgKey(idStr))
if err == NilErr {
return nil
}
if err != nil {
Expand Down Expand Up @@ -326,24 +342,21 @@ func (q *DelayQueue) batchCallback(ids []string) {
}

func (q *DelayQueue) ack(idStr string) error {
ctx := context.Background()
err := q.redisCli.ZRem(ctx, q.unAckKey, idStr).Err()
err := q.redisCli.ZRem(q.unAckKey, []string{idStr})
if err != nil {
return fmt.Errorf("remove from unack failed: %v", err)
}
// msg key has ttl, ignore result of delete
_ = q.redisCli.Del(ctx, q.genMsgKey(idStr)).Err()
q.redisCli.HDel(ctx, q.retryCountKey, idStr)
_ = q.redisCli.Del([]string{q.genMsgKey(idStr)})
_ = q.redisCli.HDel(q.retryCountKey, []string{idStr})
return nil
}

func (q *DelayQueue) nack(idStr string) error {
ctx := context.Background()
// update retry time as now, unack2Retry will move it to retry immediately
err := q.redisCli.ZAdd(ctx, q.unAckKey, redis.Z{
Member: idStr,
Score: float64(time.Now().Unix()),
}).Err()
err := q.redisCli.ZAdd(q.unAckKey, map[string]float64{
idStr: float64(time.Now().Unix()),
})
if err != nil {
return fmt.Errorf("negative ack failed: %v", err)
}
Expand Down Expand Up @@ -392,19 +405,17 @@ redis.call('ZRemRangeByScore', KEYS[1], '0', ARGV[1]) -- remove msgs from unack
`

func (q *DelayQueue) unack2Retry() error {
ctx := context.Background()
keys := []string{q.unAckKey, q.retryCountKey, q.retryKey, q.garbageKey}
now := time.Now()
err := q.redisCli.Eval(ctx, unack2RetryScript, keys, now.Unix()).Err()
if err != nil && err != redis.Nil {
_, err := q.redisCli.Eval(unack2RetryScript, keys, []interface{}{now.Unix()})
if err != nil && err != NilErr {
return fmt.Errorf("unack to retry script failed: %v", err)
}
return nil
}

func (q *DelayQueue) garbageCollect() error {
ctx := context.Background()
msgIds, err := q.redisCli.SMembers(ctx, q.garbageKey).Result()
msgIds, err := q.redisCli.SMembers(q.garbageKey)
if err != nil {
return fmt.Errorf("smembers failed: %v", err)
}
Expand All @@ -416,12 +427,12 @@ func (q *DelayQueue) garbageCollect() error {
for _, idStr := range msgIds {
msgKeys = append(msgKeys, q.genMsgKey(idStr))
}
err = q.redisCli.Del(ctx, msgKeys...).Err()
if err != nil && err != redis.Nil {
err = q.redisCli.Del(msgKeys)
if err != nil && err != NilErr {
return fmt.Errorf("del msgs failed: %v", err)
}
err = q.redisCli.SRem(ctx, q.garbageKey, msgIds).Err()
if err != nil && err != redis.Nil {
err = q.redisCli.SRem(q.garbageKey, msgIds)
if err != nil && err != NilErr {
return fmt.Errorf("remove from garbage key failed: %v", err)
}
return nil
Expand All @@ -437,7 +448,7 @@ func (q *DelayQueue) consume() error {
ids := make([]string, 0, q.fetchLimit)
for {
idStr, err := q.ready2Unack()
if err == redis.Nil { // consumed all
if err == NilErr { // consumed all
break
}
if err != nil {
Expand All @@ -464,7 +475,7 @@ func (q *DelayQueue) consume() error {
ids = make([]string, 0, q.fetchLimit)
for {
idStr, err := q.retry2Unack()
if err == redis.Nil { // consumed all
if err == NilErr { // consumed all
break
}
if err != nil {
Expand Down
Loading

0 comments on commit 023e160

Please sign in to comment.