diff --git a/v1/brokers/redis/goredis.go b/v1/brokers/redis/goredis.go index 7cf2e66b..cdcb7fb7 100644 --- a/v1/brokers/redis/goredis.go +++ b/v1/brokers/redis/goredis.go @@ -105,6 +105,7 @@ func (b *BrokerGR) StartConsuming(consumerTag string, concurrency int, taskProce for i := 0; i < concurrency; i++ { pool <- struct{}{} } + closeGoroutineWhenReturn := make(chan int) // A receiving goroutine keeps popping messages from the queue by BLPOP // If the message is valid and can be unmarshaled into a proper structure @@ -115,6 +116,9 @@ func (b *BrokerGR) StartConsuming(consumerTag string, concurrency int, taskProce for { select { + case <-closeGoroutineWhenReturn: + close(deliveries) + return // A way to stop this goroutine from b.StopConsuming case <-b.GetStopChan(): close(deliveries) @@ -139,6 +143,8 @@ func (b *BrokerGR) StartConsuming(consumerTag string, concurrency int, taskProce for { select { + case <-closeGoroutineWhenReturn: + return // A way to stop this goroutine from b.StopConsuming case <-b.GetStopChan(): return @@ -163,12 +169,20 @@ func (b *BrokerGR) StartConsuming(consumerTag string, concurrency int, taskProce }() if err := b.consume(deliveries, concurrency, taskProcessor); err != nil { + close(closeGoroutineWhenReturn) + for d := range deliveries { + b.requeueMessage(context.Background(), d) + } return b.GetRetry(), err } // Waiting for any tasks being processed to finish b.processingWG.Wait() + close(closeGoroutineWhenReturn) + for d := range deliveries { + b.requeueMessage(context.Background(), d) + } return b.GetRetry(), nil } @@ -183,6 +197,19 @@ func (b *BrokerGR) StopConsuming() { b.rclient.Close() } +func (b *BrokerGR) requeueMessage(ctx context.Context, task []byte) { + signature := new(tasks.Signature) + decoder := json.NewDecoder(bytes.NewReader(task)) + decoder.UseNumber() + if err := decoder.Decode(signature); err != nil { + log.ERROR.Print(errs.NewErrCouldNotUnmarshalTaskSignature(task, err)) + } + + if err := b.Publish(context.Background(), signature); err != nil { + log.ERROR.Print(err) + } +} + // Publish places a new message on the default queue func (b *BrokerGR) Publish(ctx context.Context, signature *tasks.Signature) error { // Adjust routing key (this decides which queue the message will be published to) diff --git a/v1/brokers/redis/redis.go b/v1/brokers/redis/redis.go index 1683f13e..3d5e70ae 100644 --- a/v1/brokers/redis/redis.go +++ b/v1/brokers/redis/redis.go @@ -96,6 +96,7 @@ func (b *Broker) StartConsuming(consumerTag string, concurrency int, taskProcess for i := 0; i < concurrency; i++ { pool <- struct{}{} } + closeGoroutineWhenReturn := make(chan int) // A receiving goroutine keeps popping messages from the queue by BLPOP // If the message is valid and can be unmarshaled into a proper structure @@ -106,6 +107,9 @@ func (b *Broker) StartConsuming(consumerTag string, concurrency int, taskProcess for { select { + case <-closeGoroutineWhenReturn: + close(deliveries) + return // A way to stop this goroutine from b.StopConsuming case <-b.GetStopChan(): close(deliveries) @@ -139,6 +143,8 @@ func (b *Broker) StartConsuming(consumerTag string, concurrency int, taskProcess for { select { + case <-closeGoroutineWhenReturn: + return // A way to stop this goroutine from b.StopConsuming case <-b.GetStopChan(): return @@ -163,12 +169,20 @@ func (b *Broker) StartConsuming(consumerTag string, concurrency int, taskProcess }() if err := b.consume(deliveries, concurrency, taskProcessor); err != nil { + close(closeGoroutineWhenReturn) + for d := range deliveries { + b.requeueMessage(d, taskProcessor) + } return b.GetRetry(), err } // Waiting for any tasks being processed to finish b.processingWG.Wait() + close(closeGoroutineWhenReturn) + for d := range deliveries { + b.requeueMessage(d, taskProcessor) + } return b.GetRetry(), nil }