DLQ - Help wanted #10

Open
jairhenrique opened this issue Aug 14, 2020 · 13 comments

@jairhenrique

@Bogdanp I've created an actor like this.

@dramatiq.actor(
    max_retries=10,
    min_backoff=(1 * MINUTES),
    max_backoff=(10 * MINUTES),
    throws=(FooBar, BarFoo),
    queue_name="my_queue",
)
def process_finish(reseller_id: str):
    ...

This process hit an error and the worker exceeded the number of retries: [dramatiq.middleware.retries.Retries] [WARNING] Retries exceeded for message 'd3c95e20-40b6-47ce-877b-fff43bcd5a57'. However, the message did not go to the DLQ 🤔.

The default MAX_RECEIVES in the DLQ setup is 5.

Environment

python = 3.8.5
django-dramatiq = 0.9.1
dramatiq = 1.9.0
dramatiq-sqs = 0.0.11

Configs

DRAMATIQ_BROKER = {
    "BROKER": "dramatiq_sqs.broker.SQSBroker",
    "OPTIONS": {
        "namespace": "my_namespace",
        "dead_letter": True,
        "region_name": os.getenv("AWS_REGION_NAME", "fake"),
        "aws_access_key_id": os.getenv("AWS_ACCESS_KEY_ID", "fake"),
        "aws_secret_access_key": os.getenv("AWS_SECRET_ACCESS_KEY", "fake"),
    },
    "MIDDLEWARE": [
        "dramatiq.middleware.AgeLimit",
        "dramatiq.middleware.TimeLimit",
        "dramatiq.middleware.Callbacks",
        "dramatiq.middleware.Retries",
        "my_project.middleware.dramatiq.log_message.LogMessageMiddleware",
    ],
}

@jairhenrique
Author

Hi @pjsier!
I saw that you implemented the DLQ support. Is it working for you?

@jairhenrique
Author

I think I found out what happens.
The Retries middleware adds more information to the message, including the error traceback.
SQS probably does not count it as the same message because the message body is different.

@jairhenrique
Author

@Bogdanp @pjsier

Whether processing a message succeeds or fails, the message is dropped from the queue.

    def ack(self, message: "_SQSMessage") -> None:
        message._sqs_message.delete()
        self.message_refc -= 1

    #: Messages are added to DLQ by SQS redrive policy, so no actions are necessary
    nack = ack

So the receive count that SQS uses to route messages to the DLQ will never work.

SQS has three states of message (https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-visibility-timeout.html):

An Amazon SQS message has three basic states:
   1. Sent to a queue by a producer.
   2. Received from the queue by a consumer. (if the message is not deleted within the visibility timeout period, it becomes visible again and the next receive increases its receive count)
   3. Deleted from the queue.

So, when I delete a message on nack and enqueue a new message with the retry count incremented, this is a new message with a new receive count.
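The effect described above can be reproduced without AWS. The following is a toy in-memory model, not the SQS or dramatiq_sqs implementation: ToyMessage, ToyQueue and both simulate_* functions are made-up names for illustration. It assumes that a message is moved to the DLQ on the first receive attempt after its receive count has reached max_receives.

```python
from dataclasses import dataclass, field
from typing import List, Optional, Tuple


@dataclass(eq=False)
class ToyMessage:
    """Hypothetical stand-in for one SQS message."""
    body: str
    receive_count: int = 0


@dataclass
class ToyQueue:
    """Hypothetical in-memory queue with a redrive-like policy."""
    max_receives: int
    messages: List[ToyMessage] = field(default_factory=list)
    dead_letters: List[ToyMessage] = field(default_factory=list)

    def send(self, body: str) -> ToyMessage:
        message = ToyMessage(body)  # a new message starts with receive_count 0
        self.messages.append(message)
        return message

    def receive(self) -> Optional[ToyMessage]:
        while self.messages:
            message = self.messages[0]
            if message.receive_count >= self.max_receives:
                # Assumed redrive behaviour: exhausted messages go to the DLQ.
                self.dead_letters.append(self.messages.pop(0))
                continue
            message.receive_count += 1
            return message
        return None

    def delete(self, message: ToyMessage) -> None:
        self.messages.remove(message)


def simulate_delete_and_requeue(max_retries: int, max_receives: int) -> Tuple[ToyQueue, int]:
    """Every failure deletes the message and enqueues a fresh copy."""
    queue = ToyQueue(max_receives)
    queue.send("job")
    highest_count = 0
    for attempt in range(max_retries + 1):
        message = queue.receive()
        if message is None:
            break
        highest_count = max(highest_count, message.receive_count)
        queue.delete(message)             # nack == ack: the original is deleted
        if attempt < max_retries:
            queue.send(message.body)      # the retry is a brand-new message
    return queue, highest_count


def simulate_leave_in_queue(attempts: int, max_receives: int) -> ToyQueue:
    """Every failure leaves the message in place so it is received again."""
    queue = ToyQueue(max_receives)
    queue.send("job")
    for _ in range(attempts):
        if queue.receive() is None:
            break
    return queue
```

In this model, simulate_delete_and_requeue(max_retries=10, max_receives=5) never sees a receive count above 1 and leaves the DLQ empty, while simulate_leave_in_queue(attempts=10, max_receives=5) ends with the message in dead_letters.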

Maybe the solution is for the nack method to check whether the message's queue has a DLQ configured and, if so, not delete the message. Or maybe the Retries middleware can't be used with a DLQ.

What is your opinion?

@Bogdanp
Owner

Bogdanp commented Aug 20, 2020

Thanks for digging into this! Unfortunately, I haven't had time to look into this, but I'll try to take a look this weekend.

@liveFreeOrCode

Checking in on this. I'm looking to implement the SQS broker, but we use the Retries middleware pretty heavily.

@januszm

januszm commented Jul 11, 2021

@jairhenrique were you able to solve this issue? Have you considered not using Retries and instead not deleting the message, letting it regain visibility and be retrieved by a worker again? I haven't tried retries with Dramatiq yet, but I have the impression that since, as you write, the Retries mechanism in practice creates a new message, the SQS DLQ makes no sense if Retries is turned on at all. So we have a situation here: we use either the DLQ or Retries. Am I getting it right?

@chintan-synapse

chintan-synapse commented Mar 3, 2022

@januszm I tried using Retries along with the following nack update:

    def nack(self, message: "_SQSMessage") -> None:
        self.message_refc -= 1

which does the following:

  • If I set max_retries > 0, Retries keeps creating new messages and deleting old ones, and eventually stops creating new messages after hitting max_retries (I have kept it at 0 and let the visibility timeout expire).
  • The consumer continues consuming the last retried message up to max_receives times (SQS redrive policy).
  • The redrive policy kicks in and the message is moved to the DLQ.

Note: The default MAX_VISIBILITY_TIMEOUT for subsequent receive_message() calls on SQS is set too high (7200 sec), so you might want to reconfigure that as well.

@jairhenrique
Author

@januszm @chintan-synapse I tried it:

import argparse
import dramatiq
import random
import sys

from dramatiq_sqs import SQSBroker

broker = SQSBroker(
    namespace="dramatiq_sqs_tests",
    middleware=[],
    dead_letter=True,
    aws_access_key_id="xxxx",
    aws_secret_access_key="xxx",
    region_name="us-east-1",
)
dramatiq.set_broker(broker)

@dramatiq.actor
def add(x, y):
    raise Exception('fake')

And I rewrote _SQSConsumer with:

    def ack(self, message: "_SQSMessage") -> None:
        message._sqs_message.delete()
        self.message_refc -= 1

    #: Messages are added to DLQ by SQS redrive policy, so no actions are necessary
    def nack(self, message: "_SQSMessage") -> None:
        print('NACK')

When I raise an Exception, ack is called and the message is deleted by message._sqs_message.delete().

I'm trying to figure out the best way to not ack the message and not call .delete()

@jairhenrique
Author

@januszm @chintan-synapse @Bogdanp I opened this PR on dramatiq.

If it makes sense, I will change _SQSConsumer to:

    def ack(self, message: "_SQSMessage") -> None:
        message._sqs_message.delete()
        self.message_refc -= 1

    def nack(self, message: "_SQSMessage") -> None:
        self.message_refc -= 1

@jairhenrique
Author

With this implementation, the Retries middleware doesn't work with SQS :(

    def ack(self, message: "_SQSMessage") -> None:
        message._sqs_message.delete()
        self.message_refc -= 1

    def nack(self, message: "_SQSMessage") -> None:
        self.message_refc -= 1

I'll keep thinking about a solution.

@shayts7

shayts7 commented Oct 29, 2023

Hi guys, we use dramatiq with SQS and do see value in retries + DLQ: that way there is more certainty that the messages that enter the DLQ are real bugs and not runtime problems...
Do you know whether a solution can be provided?

@jamie-chang-globality

We've got a workaround: forcing max_receives to be 1 and overriding nack.

    def nack(self, message: _SQSMessage) -> None:
        """
        Set the visibility timeout to 0 so the message immediately becomes
        available to be received again.
        `max_receives` is assumed to be 1, so that the message will
        enter the DLQ after one single `nack`.

        Retries can now be handled by the `Retries` middleware as opposed to SQS.
        """

        message._sqs_message.change_visibility(VisibilityTimeout=0)

@DHUKK

DHUKK commented Jan 28, 2025

Current situation

It looks like the core Dramatiq implementation of retries will not work with the SQS native retries and dead letter queues (i.e. via the RedrivePolicy and maxReceiveCount configuration).

The core Dramatiq Retries middleware always re-queues a new message and then always acks the original message.
With the current dramatiq_sqs ack + nack implementation, the original message is always deleted from the queue after processing, whether it failed or succeeded.

SQS maxReceiveCount only works if the exact same message is received multiple times and not deleted/re-queued, as the re-queued message is technically a different message.
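For reference, the native configuration mentioned above is a queue attribute named RedrivePolicy whose value is a JSON string. The sketch below only builds that attribute; the function name and the ARN in the usage note are made up for illustration, and the default of 5 is taken from the MAX_RECEIVES value quoted earlier in this thread.

```python
import json
from typing import Dict


def build_redrive_attributes(dead_letter_arn: str, max_receives: int = 5) -> Dict[str, str]:
    """Build the queue attributes for an SQS redrive policy.

    Hypothetical helper, not part of dramatiq_sqs. SQS expects the policy
    as a JSON-encoded string under the "RedrivePolicy" attribute.
    """
    policy = {
        "deadLetterTargetArn": dead_letter_arn,
        "maxReceiveCount": str(max_receives),
    }
    return {"RedrivePolicy": json.dumps(policy)}
```

With boto3, the resulting dict would typically be passed as the Attributes argument when creating the queue or updating its attributes, for example with build_redrive_attributes("arn:aws:sqs:us-east-1:123456789012:my_queue_dlq"), where the ARN is a placeholder.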

Workaround

Using @jamie-chang-globality's workaround does work to an extent (see limitation below) but still relies on the core retry logic that deletes and re-queues a message.

Having max_receives==1 also has a limitation: if the consumer is interrupted or crashes during execution of a task, the message will end up on the DLQ without any retries being attempted.

Suggestion

I believe we could have an SQS retry middleware that replaces the core middleware when using SQS.

If a DLQ is configured:

  • The middleware would act similarly but would not queue a new message.
  • The nack implementation would just call message._sqs_message.change_visibility(VisibilityTimeout=0)

If a DLQ is not configured:

  • The middleware would check ApproximateReceiveCount of the message instead of message.options["retries"]
  • The nack implementation would delete the message

This could allow the use of native SQS retries and DLQ routing.
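A minimal sketch of the two nack branches from the suggestion above, assuming the wrapped object behaves like a boto3 SQS Message resource (which provides change_visibility and delete). The function name nack_sqs_message, the dlq_configured flag and the FakeSQSMessage test double are hypothetical and do not exist in dramatiq_sqs.

```python
from typing import Any, List, Tuple


def nack_sqs_message(sqs_message: Any, dlq_configured: bool) -> None:
    """Hypothetical nack that defers to the SQS redrive policy when a DLQ exists."""
    if dlq_configured:
        # Keep the message; make it visible again right away so that its
        # receive count keeps growing until the redrive policy applies.
        sqs_message.change_visibility(VisibilityTimeout=0)
    else:
        # No DLQ to route to, so drop the failed message.
        sqs_message.delete()


class FakeSQSMessage:
    """Test double that records calls instead of talking to AWS."""

    def __init__(self) -> None:
        self.calls: List[Tuple[str, Any]] = []

    def change_visibility(self, VisibilityTimeout: int) -> None:
        self.calls.append(("change_visibility", VisibilityTimeout))

    def delete(self) -> None:
        self.calls.append(("delete", None))
```

A visibility timeout of 0 means an immediate retry with no backoff; a real implementation would probably want a delay here, which this sketch does not attempt.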

The functional change that this would likely have on retries is that it would mean that the retried message would:

  • not be re-queued at the back of the queue
  • not contain data about the number of retries/traceback.
    • ApproximateReceiveCount can be used for retry count etc.
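As a rough illustration of the last point, the retry count could be derived from the message's system attributes. This is a sketch under assumptions: SQS only returns ApproximateReceiveCount when it is requested on receive, the value arrives as a string, and the helper name retries_from_receive_count is made up.

```python
from typing import Mapping, Optional


def retries_from_receive_count(attributes: Optional[Mapping[str, str]]) -> int:
    """Derive a retry count from SQS message attributes (hypothetical helper).

    The first delivery has a receive count of 1, which corresponds to
    zero retries. Missing attributes are treated as a first delivery.
    """
    if not attributes:
        return 0
    receives = int(attributes.get("ApproximateReceiveCount", "1"))
    return max(receives - 1, 0)
```

As the attribute name says, the count is approximate, so a middleware built on it should not assume an exact value.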

Alternative

An alternative would be to remove the native support for the DLQ redrive policy and manually move tasks to DLQs.

@Bogdanp Do you have any opinions on this? As it functionally changes the concept of deleting and re-queueing messages for retries.
