diff --git a/v1/brokers/sqs/sqs.go b/v1/brokers/sqs/sqs.go index f811e16e5..a55caf573 100644 --- a/v1/brokers/sqs/sqs.go +++ b/v1/brokers/sqs/sqs.go @@ -200,13 +200,15 @@ func (b *Broker) consumeOne(delivery *awssqs.ReceiveMessageOutput, taskProcessor return errors.New("received empty message, the delivery is " + delivery.GoString()) } + qURL := b.defaultQueueURL() + sig := new(tasks.Signature) decoder := json.NewDecoder(strings.NewReader(*delivery.Messages[0].Body)) decoder.UseNumber() if err := decoder.Decode(sig); err != nil { log.ERROR.Printf("unmarshal error. the delivery is %v", delivery) // if the unmarshal fails, remove the delivery from the queue - if delErr := b.deleteOne(delivery); delErr != nil { + if delErr := b.deleteOne(delivery, qURL); delErr != nil { log.ERROR.Printf("error when deleting the delivery. delivery is %v, Error=%s", delivery, delErr) } return err @@ -215,11 +217,16 @@ func (b *Broker) consumeOne(delivery *awssqs.ReceiveMessageOutput, taskProcessor sig.SQSReceiptHandle = *delivery.Messages[0].ReceiptHandle } + if sig.RoutingKey != "" { + routingKeyQueueUrl := b.GetConfig().Broker + "/" + sig.RoutingKey + qURL = &routingKeyQueueUrl + } + // If the task is not registered return an error // and leave the message in the queue if !b.IsTaskRegistered(sig.Name) { if sig.IgnoreWhenTaskNotRegistered { - b.deleteOne(delivery) + b.deleteOne(delivery, qURL) } return fmt.Errorf("task %s is not registered", sig.Name) } @@ -233,17 +240,16 @@ func (b *Broker) consumeOne(delivery *awssqs.ReceiveMessageOutput, taskProcessor return err } // Delete message after successfully consuming and processing the message - if err = b.deleteOne(delivery); err != nil { + if err = b.deleteOne(delivery, qURL); err != nil { log.ERROR.Printf("error when deleting the delivery. delivery is %v, Error=%s", delivery, err) } return err } // deleteOne is a method delete a delivery from AWS SQS -func (b *Broker) deleteOne(delivery *awssqs.ReceiveMessageOutput) error { - qURL := b.defaultQueueURL() +func (b *Broker) deleteOne(delivery *awssqs.ReceiveMessageOutput, queueUrl *string) error { _, err := b.service.DeleteMessage(&awssqs.DeleteMessageInput{ - QueueUrl: qURL, + QueueUrl: queueUrl, ReceiptHandle: delivery.Messages[0].ReceiptHandle, })