
RabbitMQ Workflow had to be deactivated #9499

Open
barn4k opened this issue May 23, 2024 · 10 comments

barn4k commented May 23, 2024

Bug Description

Yesterday I found that one of my workflows started sending workflow errors with the message:

There was a problem with the trigger node "Start", for that reason did the workflow had to be deactivated

I started digging into the issue and didn't find anything suspicious. There was no failed execution either, but the workflow keeps sending that error every 30 minutes.
I have a Wait node configured to wait 1 minute before fetching some data from an external service; if the result is still in the pending state, the workflow asks for the status again and waits another minute. There is a hard limit of 30 waits, after which the workflow finishes and treats the result as timed out.
[screenshot]
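The Wait-loop logic described above amounts to a bounded polling loop. A minimal Python sketch (the `check_status` callable is a hypothetical stand-in for the external service):

```python
import time

MAX_WAITS = 30     # hard limit, mirrors the workflow's "If 31 runs" check
WAIT_SECONDS = 60  # the Wait node's 1-minute interval

def poll_until_done(check_status, wait_seconds=WAIT_SECONDS):
    """Poll an external service until it leaves the 'pending' state,
    giving up after MAX_WAITS attempts and reporting a timeout."""
    for _ in range(MAX_WAITS):
        status = check_status()
        if status != "pending":
            return status
        time.sleep(wait_seconds)
    return "timeout"
```

In the worst case this loop runs for roughly 30 minutes, which is exactly the window after which the broker drops the connection.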

I suppose the issue is with the RabbitMQ trigger node, which is set to remove the message after the Execution Finishes. Because the Wait node waits only 1 minute at a time, the execution is never offloaded, so the RabbitMQ node keeps holding the message (it remains in the unacked state in the broker). After 30 minutes the broker terminates the connection, considering it stuck, so even if the workflow finishes successfully, the RabbitMQ node fails to delete the message from the queue because the connection has already been terminated.
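For reference, the 30-minute window described above matches RabbitMQ's delivery acknowledgement timeout (`consumer_timeout`), which defaults to 30 minutes in recent broker versions. A sketch of raising it in `rabbitmq.conf`, assuming you administer the broker yourself:

```ini
# rabbitmq.conf
# Raise the delivery acknowledgement timeout from the 30-minute default.
# Value is in milliseconds; 3600000 = 1 hour.
consumer_timeout = 3600000
```

Note that this only widens the window; it does not remove the race if an execution runs even longer than the new timeout.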

Error message ('Start' node here is the RabbitMQ node):

[
  {
    "trigger": {
      "error": {
        "message": "There was a problem with the trigger node \"Start\", for that reason did the workflow had to be deactivated",
        "timestamp": 1716456899271,
        "name": "WorkflowActivationError",
        "context": {},
        "cause": {}
      },
      "mode": "trigger"
    },
    "workflow": {
      "id": "fyg5VC8YuQnsibw9",
      "name": "0029 ATP - Sub - Scanned device status"
    }
  }
]

There was only one message in the queue, but it seems that after 30 minutes (the broker's default timeout) the RabbitMQ node reconnected to the broker and grabbed the same message once more.
[screenshot]

To Reproduce

Copy the workflow below and send a message to RabbitMQ:

{
  "meta": {
    "templateCredsSetupCompleted": true,
    "instanceId": "b2b258e8cf8e4b51c801b3bc5a9906c37aad0545d9cdbdf096a6542b5d192385"
  },
  "nodes": [
    {
      "parameters": {
        "queue": "test_queue30m",
        "options": {
          "assertQueue": true,
          "acknowledge": "executionFinishes"
        }
      },
      "id": "f0b44c22-09df-4bad-8dac-47903fd4ea28",
      "name": "RabbitMQ Trigger",
      "type": "n8n-nodes-base.rabbitmqTrigger",
      "typeVersion": 1,
      "position": [
        820,
        380
      ],
      "credentials": {
        "rabbitmq": {
          "id": "aS74IeKoi3bRoXCp",
          "name": "RabbitMQ account"
        }
      }
    },
    {
      "parameters": {},
      "id": "483097bf-cc14-4c4d-83af-b6723a6f088c",
      "name": "End",
      "type": "n8n-nodes-base.noOp",
      "typeVersion": 1,
      "position": [
        1760,
        360
      ]
    },
    {
      "parameters": {},
      "id": "0940be7e-d70a-462e-a9be-0ee0cb43b8c5",
      "name": "Do Smth",
      "type": "n8n-nodes-base.noOp",
      "typeVersion": 1,
      "position": [
        1040,
        380
      ]
    },
    {
      "parameters": {
        "amount": 1,
        "unit": "minutes"
      },
      "id": "6e734c0e-9429-4a6f-9920-c3f1ece2dd59",
      "name": "Wait 1m",
      "type": "n8n-nodes-base.wait",
      "typeVersion": 1.1,
      "position": [
        1240,
        380
      ],
      "webhookId": "9bde1337-f90a-4922-8609-84e051492d5e"
    },
    {
      "parameters": {
        "conditions": {
          "options": {
            "caseSensitive": true,
            "leftValue": "",
            "typeValidation": "strict"
          },
          "conditions": [
            {
              "id": "e159b403-3981-4349-bc37-681015a6132e",
              "leftValue": "={{ $runIndex }}",
              "rightValue": 31,
              "operator": {
                "type": "number",
                "operation": "gt"
              }
            }
          ],
          "combinator": "and"
        },
        "options": {}
      },
      "id": "ba006715-40cd-4364-b3c9-e7fa7152df65",
      "name": "If 31 runs",
      "type": "n8n-nodes-base.if",
      "typeVersion": 2,
      "position": [
        1460,
        380
      ]
    }
  ],
  "connections": {
    "RabbitMQ Trigger": {
      "main": [
        [
          {
            "node": "Do Smth",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Do Smth": {
      "main": [
        [
          {
            "node": "Wait 1m",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "Wait 1m": {
      "main": [
        [
          {
            "node": "If 31 runs",
            "type": "main",
            "index": 0
          }
        ]
      ]
    },
    "If 31 runs": {
      "main": [
        [
          {
            "node": "End",
            "type": "main",
            "index": 0
          }
        ],
        [
          {
            "node": "Wait 1m",
            "type": "main",
            "index": 0
          }
        ]
      ]
    }
  },
  "pinData": {}
}

After 30 minutes an error is sent to the error trigger, but the execution remains in the In Progress state and doesn't fail. The message is grabbed again and the workflow starts once more.

Expected behavior

There should be no duplicated messages: during the first execution the RabbitMQ node should keep the message (and keep the connection up), and after the execution finishes it should remove the message from the queue. That is not happening right now, because the first execution released the message (the broker force-terminated the connection on the server side) and the next execution picked the message up again.

The current workaround is to set the node to remove the message Immediately, but in that case the Parallel Message Processing Limit option is not available (it is removed from the option list).

Operating System

Docker

n8n Version

1.41.1

Node.js Version

docker

Database

PostgreSQL

Execution mode

main (default)

Joffcom (Member) commented May 29, 2024

Hey @barn4k,

Thanks for the report, I am testing this now.

Joffcom (Member) commented May 31, 2024

Hey @barn4k,

So it looks to be as you said: RabbitMQ is timing out, so the trigger node can't use the connection to delete the message. In cases like this, if you are expecting a long-running execution, using the RabbitMQ node with the delete operation would be the best approach.

[screenshot]

Then add the delete node to the end and you are good to go.
[screenshot]

barn4k (Author) commented May 31, 2024

Actually, I think that one won't help 😄

When the connection drops, the message becomes available again and will be grabbed by a new execution. So by the time the Delete node steps in, there is no message left to delete (it is in use by the next execution), and the Delete node will still execute successfully even though it doesn't delete anything :)

I've found only one way to avoid it: using the "Immediately" option to delete the message from the queue. But in this case, I can't set the "Parallel message processing limit".

So maybe it's possible to set the execution into the error state when the connection is dropped?

Nevertheless, I'll try the Delete option, maybe it works differently somehow.

barn4k (Author) commented May 31, 2024

Hmm... it seems the situation with the Delete RMQ node is a little different: the messages have been read, but they never leave the unacked state, as the n8n node never signals that it finished processing the message.

Joffcom (Member) commented Jun 3, 2024

@barn4k that is interesting, in my test case it worked. Do you have the main node set to use the delete node, a bit like with Webhook / Respond to Webhook?

barn4k (Author) commented Jun 5, 2024

@Joffcom yep, same issue with it.
[screenshot]

[screenshot]

It says that it has deleted the message but keeps fetching it constantly.
[screenshot]

The only way it can be resolved is by using the "Immediately" Delete option.

Joffcom (Member) commented Jun 5, 2024

Hey @barn4k,

That is odd. Is the message actually being deleted?

barn4k (Author) commented Jun 5, 2024

I suppose the logic behind it is the same:

  • A message arrives in the RMQ broker
  • The RMQ trigger node grabs it (a session is opened and the message moves to the unacked state)
  • The workflow does some long-running work
  • The RMQ broker forces the connection to close after 30 minutes, releasing the message
  • A new execution starts because the message is available in the channel again
  • The old execution reaches its end stage after the session has already been closed and the message has been picked up by a different execution
  • The old execution reaches the RMQ Delete node, but the message it handled is no longer there (due to the closed connection). The node reports the message as deleted, but it cannot actually delete it, because the same message is already being processed by the new execution (you can see in the screenshot below that the new execution started before the old one ended, for the same message)

[screenshot]
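The sequence above can be reproduced with a toy model of the broker's ack-timeout behaviour (no real broker or n8n involved; this is just an illustration of the race):

```python
class ToyBroker:
    """Minimal model of per-message delivery with a consumer ack timeout."""

    def __init__(self, consumer_timeout):
        self.consumer_timeout = consumer_timeout  # in "minutes"
        self.queue = []
        self.unacked = {}  # delivery_tag -> message
        self.next_tag = 0

    def publish(self, message):
        self.queue.append(message)

    def deliver(self):
        """Hand the next message to a consumer; it stays unacked."""
        message = self.queue.pop(0)
        self.next_tag += 1
        self.unacked[self.next_tag] = message
        return self.next_tag, message

    def tick(self, held_for):
        """If deliveries stay unacked past the timeout, close the channel
        and requeue the messages (what RabbitMQ does on ack timeout)."""
        if held_for > self.consumer_timeout:
            for tag, message in list(self.unacked.items()):
                del self.unacked[tag]
                self.queue.append(message)
            return True  # channel was closed
        return False

    def ack(self, tag):
        """Acking succeeds only if the delivery is still tracked."""
        return self.unacked.pop(tag, None) is not None


broker = ToyBroker(consumer_timeout=30)
broker.publish("msg-1")

tag1, _ = broker.deliver()                 # execution 1 grabs the message
channel_closed = broker.tick(held_for=31)  # workflow runs > 30 "minutes"
tag2, _ = broker.deliver()                 # execution 2 grabs the SAME message
```

After the timeout tick, execution 1's ack is lost (its delivery is gone) while execution 2 still holds the same message, which matches the duplicate processing visible in the screenshot.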

barn4k (Author) commented Jun 5, 2024

From my perspective, the possible ways to eliminate the issue are below:

  1. Set a longer timeout for the broker service (e.g. 1 hour)
  2. Set the Delete RMQ node somewhere before the long-term chain
  3. Set the RMQ node to remove the message Immediately
  4. Refactor the RMQ trigger behavior so it keeps sending keep-alive packets until the execution is done.

The first three options are not actually a fix; they only work around how the node currently behaves. In that case the docs should be updated to describe this behavior.

Joffcom (Member) commented Jun 6, 2024

I wonder if this comes down to how we execute nodes internally, which may mean option 4 would be a fairly large core change (in theory).

I will do some digging in the amqplib docs to see if there is an option we can pass to the connection. I think a warning in the node about the broker timeout could be a good starting point.
