Skip to content

Commit

Permalink
Fix:测试用例添加结束判断
Browse files Browse the repository at this point in the history
  • Loading branch information
by1aN authored and HDT3213 committed Mar 26, 2024
1 parent 2880c32 commit b4e9e62
Showing 1 changed file with 11 additions and 5 deletions.
16 changes: 11 additions & 5 deletions delayqueue_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -167,22 +167,28 @@ func TestDelayQueue_StopConsume(t *testing.T) {
}

func TestDelayQueue_AsyncConsume(t *testing.T) {
size := 10
redisCli := redis.NewClient(&redis.Options{
Addr: "127.0.0.1:6379",
})
redisCli.FlushDB(context.Background())
queue := NewQueue("exampleAsync", redisCli, func(payload string) bool {
// callback returns true to confirm successful consumption.
// If callback returns false or not return within maxConsumeDuration, DelayQueue will re-deliver this message
var queue *DelayQueue
var received int
queue = NewQueue("exampleAsync", redisCli, func(payload string) bool {
println(payload)
received++
if received == size {
queue.StopConsume()
t.Log("send stop signal")
}
return true
}).WithDefaultRetryCount(1)

// send schedule message
go func() {
for {
time.Sleep(time.Second * 1)
err := queue.SendScheduleMsg(time.Now().String(), time.Now().Add(time.Second*2))
time.Sleep(time.Millisecond * 500)
err := queue.SendScheduleMsg(time.Now().String(), time.Now().Add(time.Second*1))
if err != nil {
panic(err)
}
Expand Down

0 comments on commit b4e9e62

Please sign in to comment.