Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions v1/brokers/redis/goredis.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ func (b *BrokerGR) StartConsuming(consumerTag string, concurrency int, taskProce
for i := 0; i < concurrency; i++ {
pool <- struct{}{}
}
closeGoroutineWhenReturn := make(chan int)

// A receiving goroutine keeps popping messages from the queue by BLPOP
// If the message is valid and can be unmarshaled into a proper structure
Expand All @@ -115,6 +116,9 @@ func (b *BrokerGR) StartConsuming(consumerTag string, concurrency int, taskProce

for {
select {
case <-closeGoroutineWhenReturn:
close(deliveries)
return
// A way to stop this goroutine from b.StopConsuming
case <-b.GetStopChan():
close(deliveries)
Expand All @@ -139,6 +143,8 @@ func (b *BrokerGR) StartConsuming(consumerTag string, concurrency int, taskProce

for {
select {
case <-closeGoroutineWhenReturn:
return
// A way to stop this goroutine from b.StopConsuming
case <-b.GetStopChan():
return
Expand All @@ -163,12 +169,20 @@ func (b *BrokerGR) StartConsuming(consumerTag string, concurrency int, taskProce
}()

if err := b.consume(deliveries, concurrency, taskProcessor); err != nil {
close(closeGoroutineWhenReturn)
for d := range deliveries {
b.requeueMessage(context.Background(), d)
}
return b.GetRetry(), err
}

// Waiting for any tasks being processed to finish
b.processingWG.Wait()

close(closeGoroutineWhenReturn)
for d := range deliveries {
b.requeueMessage(context.Background(), d)
}
return b.GetRetry(), nil
}

Expand All @@ -183,6 +197,19 @@ func (b *BrokerGR) StopConsuming() {
b.rclient.Close()
}

func (b *BrokerGR) requeueMessage(ctx context.Context, task []byte) {
signature := new(tasks.Signature)
decoder := json.NewDecoder(bytes.NewReader(task))
decoder.UseNumber()
if err := decoder.Decode(signature); err != nil {
log.ERROR.Print(errs.NewErrCouldNotUnmarshalTaskSignature(task, err))
}

if err := b.Publish(context.Background(), signature); err != nil {
log.ERROR.Print(err)
}
}

// Publish places a new message on the default queue
func (b *BrokerGR) Publish(ctx context.Context, signature *tasks.Signature) error {
// Adjust routing key (this decides which queue the message will be published to)
Expand Down
14 changes: 14 additions & 0 deletions v1/brokers/redis/redis.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,7 @@ func (b *Broker) StartConsuming(consumerTag string, concurrency int, taskProcess
for i := 0; i < concurrency; i++ {
pool <- struct{}{}
}
closeGoroutineWhenReturn := make(chan int)

// A receiving goroutine keeps popping messages from the queue by BLPOP
// If the message is valid and can be unmarshaled into a proper structure
Expand All @@ -106,6 +107,9 @@ func (b *Broker) StartConsuming(consumerTag string, concurrency int, taskProcess

for {
select {
case <-closeGoroutineWhenReturn:
close(deliveries)
return
// A way to stop this goroutine from b.StopConsuming
case <-b.GetStopChan():
close(deliveries)
Expand Down Expand Up @@ -139,6 +143,8 @@ func (b *Broker) StartConsuming(consumerTag string, concurrency int, taskProcess

for {
select {
case <-closeGoroutineWhenReturn:
return
// A way to stop this goroutine from b.StopConsuming
case <-b.GetStopChan():
return
Expand All @@ -163,12 +169,20 @@ func (b *Broker) StartConsuming(consumerTag string, concurrency int, taskProcess
}()

if err := b.consume(deliveries, concurrency, taskProcessor); err != nil {
close(closeGoroutineWhenReturn)
for d := range deliveries {
b.requeueMessage(d, taskProcessor)
}
return b.GetRetry(), err
}

// Waiting for any tasks being processed to finish
b.processingWG.Wait()

close(closeGoroutineWhenReturn)
for d := range deliveries {
b.requeueMessage(d, taskProcessor)
}
return b.GetRetry(), nil
}

Expand Down