
initial workflow callback support #46

Draft · wants to merge 1 commit into main

Conversation

@jkeifer (Member) commented Mar 11, 2022

What is this?

The issue of how to handle processing items dependent on n other items seems to be a recurring theme lately when discussing how best to structure cirrus workflows modeling such dependencies. Some purpose-built solutions have been suggested for particular instances of this issue, but a larger question has been asked: can we find a solution worthy of integration into cirrus to support this use case more generally?

An idea: workflow callbacks

In thinking through this problem, I stumbled upon support within step functions for task tokens, which can be used to pause a step function execution until a token is returned. It works like this:

  • Step function sends a message to SQS with a task token
  • A lambda or some other resource handles the messages in that queue
  • When the response is ready, something calls SendTaskSuccess or SendTaskFailure, as appropriate, with the specified token
  • On success, the step function resumes at the next step; on failure, the step function fails the current step (a sketch of the waiting state follows this list)
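A minimal sketch of what the waiting step could look like, expressed here as a Python dict in Amazon States Language form. The queue URL and state names are placeholders, not existing cirrus resources:

```python
# Sketch only: a task state that publishes the process payload plus a task
# token to SQS, then pauses the execution until SendTaskSuccess or
# SendTaskFailure is called with that token.
wait_for_dependency_state = {
    "Type": "Task",
    "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
    "Parameters": {
        "QueueUrl": "<cirrus process queue URL>",  # placeholder
        "MessageBody": {
            "payload.$": "$",                 # the dependency's process payload
            "task_token.$": "$$.Task.Token",  # token identifying this paused task
        },
    },
    "Next": "DoSomething",                    # placeholder next state
}
```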

Within cirrus, we could use this mechanism to wait on items to be published from other workflows. For example, the queue could be the cirrus process SQS and the message could be the cirrus process payload for the child workflow. That payload would include a task token corresponding to the waiting step function task.

If we could guarantee a one-to-one relation between workflows in this process, we could simply have update-state call SendTaskSuccess or SendTaskFailure using the task token from the input payload, if one is present. However, we have no such guarantees: multiple workflows might be dependent on the same items, so we have to add some complexity to handle these possible many-to-one relations.

I propose we use a dynamodb table to track these relations. The schema would look something like (a sketch of the table definition follows this list):

  • payload_id: the ID from a workflow payload (partition key)
  • task_token: the token of the waiting step function task (sort key)
  • workflow_state: the final state of the workflow corresponding to payload_id
  • expiration_time: a field that tracks whether the callback has been executed and drives a table TTL to expire records (so the table does not grow indefinitely, but records stick around for a limited time as part of an audit trail)
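A sketch of how such a table could be created with boto3; the table name is hypothetical, and the TTL attribute matches the schema above:

```python
import boto3

dynamodb = boto3.client("dynamodb")

# Sketch only: dependency-tracking table keyed on (payload_id, task_token),
# with a stream so a lambda can react to workflow_state updates.
dynamodb.create_table(
    TableName="cirrus-workflow-callbacks",  # hypothetical name
    AttributeDefinitions=[
        {"AttributeName": "payload_id", "AttributeType": "S"},
        {"AttributeName": "task_token", "AttributeType": "S"},
    ],
    KeySchema=[
        {"AttributeName": "payload_id", "KeyType": "HASH"},   # partition key
        {"AttributeName": "task_token", "KeyType": "RANGE"},  # sort key
    ],
    BillingMode="PAY_PER_REQUEST",
    StreamSpecification={"StreamEnabled": True, "StreamViewType": "NEW_IMAGE"},
)

# workflow_state and expiration_time are plain attributes; enabling a TTL on
# expiration_time lets executed callbacks age out of the table.
dynamodb.update_time_to_live(
    TableName="cirrus-workflow-callbacks",
    TimeToLiveSpecification={"Enabled": True, "AttributeName": "expiration_time"},
)
```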

We could use a table stream to have a lambda process all records when workflow_state is set and make the callback to SendTaskSuccess or SendTaskFailure, setting expiration_time when that happens. Then update-state would only have to query for all records in this table with the payload_id and update them with the workflow's final state.
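A rough sketch of both halves of that flow; the function and table names are hypothetical and error handling is omitted:

```python
import json
import time

import boto3
from boto3.dynamodb.conditions import Key

sfn = boto3.client("stepfunctions")
table = boto3.resource("dynamodb").Table("cirrus-workflow-callbacks")  # hypothetical name

TTL_SECONDS = 60 * 60 * 24 * 60  # keep executed callbacks around ~2 months for auditing


def stream_handler(event, context):
    """Stream-triggered lambda: fire the callback once a record reaches a
    terminal workflow_state, then stamp expiration_time."""
    for record in event["Records"]:
        image = record["dynamodb"].get("NewImage", {})
        payload_id = image.get("payload_id", {}).get("S")
        token = image.get("task_token", {}).get("S")
        state = image.get("workflow_state", {}).get("S")

        if state == "COMPLETED":
            sfn.send_task_success(taskToken=token, output=json.dumps({"payload_id": payload_id}))
        elif state == "INVALID":
            sfn.send_task_failure(taskToken=token, error="DependencyInvalid")
        else:
            continue  # PROCESSING or not yet set: another update is coming

        table.update_item(
            Key={"payload_id": payload_id, "task_token": token},
            UpdateExpression="SET expiration_time = :exp",
            ExpressionAttributeValues={":exp": int(time.time()) + TTL_SECONDS},
        )


def record_workflow_final_state(payload_id, final_state):
    """update-state side: mark every waiting row for this payload with the
    workflow's final state; the stream handler above does the rest."""
    resp = table.query(KeyConditionExpression=Key("payload_id").eq(payload_id))
    for item in resp["Items"]:
        table.update_item(
            Key={"payload_id": payload_id, "task_token": item["task_token"]},
            UpdateExpression="SET workflow_state = :s",
            ExpressionAttributeValues={":s": final_state},
        )
```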

process would persist a task token from an input payload to that dynamo table regardless of whether it kicks off a workflow. We could map the workflow_state values to the same ones used for payload items, where PROCESSING means another update is coming, COMPLETED means to send a success notification, and INVALID triggers a failure notification (the other states encountered in process would trigger a workflow run and would therefore default to PROCESSING).
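A sketch of that process-side behavior under those assumptions; the payload fields and table name are hypothetical:

```python
import boto3

table = boto3.resource("dynamodb").Table("cirrus-workflow-callbacks")  # hypothetical name


def record_callback(payload: dict, initial_state: str) -> None:
    """Sketch only: if the incoming process payload carries a task token,
    persist the relation whether or not a workflow actually gets started."""
    token = payload.get("task_token")
    if not token:
        return  # no one is waiting on this payload

    table.put_item(
        Item={
            "payload_id": payload["id"],
            "task_token": token,
            # COMPLETED / INVALID trigger an immediate callback via the table
            # stream; states that lead to a workflow run default to PROCESSING.
            "workflow_state": initial_state,
        }
    )
```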

Here is a diagram of this flow; not sure if it makes sense or not, though:

     workflow 1
       start
         |
         |
        find
    dependencies
         |
         |
     Parallel
     +---+---+
     |   |   |
     |   |   |
   create payload
     |   |   |
     |   |   |
    send payload
  to cirrus process  ---+-->  process (payload 123, callback a)  --->  workflow 2  ---> update-state
  (await callback)      |       |                                                                |
     |   |   |          +-->  process (payload 234, callback b) (already completed)              |
     |   |   |          |       | |                                                              |
     +-+-+-+-+          +-->  process (payload 345, callback c) (already in progress)            |
         |                      | | |                                                            |
         |                      | | |                                                            |
    do something                | | |     Dependency tracking table                              |
         |                      | | |     (payload id, callback token, workflow state, expiration time)
         |                      | | |                                                            |
        end                     | | +-->  345, c, PROCESSING, null  -------->  wait for update-state from that workflow
                                | |                                                              |
                                | +---->  234, b, COMPLETED, now + 2mo  ---->  lambda to send success to b
                                |                                                                |
                                +------>  123, a, PROCESSING, null  <---update to COMPLETED------+
                                                                          |
                                                                          +->  lambda to send success to a

Handling multiple dependencies within a workflow

It should be possible to use dynamic parallelism (a Map state) to create as many parallel tasks as required to queue up all the other workflows, waiting until they all complete before resuming the dependent root workflow.
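A sketch of what that could look like as a Map state wrapping the waiting task from earlier; the field and state names are placeholders:

```python
# Sketch only: fan out over the dependency payloads, pausing each iteration
# on its own task token until the corresponding workflow calls back.
queue_dependencies_state = {
    "Type": "Map",
    "ItemsPath": "$.dependency_payloads",  # placeholder input field
    "Iterator": {
        "StartAt": "SendAndAwait",
        "States": {
            "SendAndAwait": {
                "Type": "Task",
                "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
                "Parameters": {
                    "QueueUrl": "<cirrus process queue URL>",  # placeholder
                    "MessageBody": {
                        "payload.$": "$",
                        "task_token.$": "$$.Task.Token",
                    },
                },
                "End": True,
            }
        },
    },
    "Next": "DoSomething",  # placeholder: resume the dependent workflow
}
```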

Preventing hanging workflows

It would be possible to combine the waiting tasks with a timeout, so if the dependencies do not complete in a reasonable time the root workflow fails, surfacing the issue.
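For example (a sketch reusing the placeholder task shape from above), a TimeoutSeconds on the waiting state would fail it if no callback arrives in time:

```python
# Sketch only: the waiting task, with a timeout so the state (and thus the
# root workflow) fails if no callback arrives within, say, a day.
wait_with_timeout = {
    "Type": "Task",
    "Resource": "arn:aws:states:::sqs:sendMessage.waitForTaskToken",
    "TimeoutSeconds": 86400,  # fail after 24 hours without a callback
    "Parameters": {
        "QueueUrl": "<cirrus process queue URL>",  # placeholder
        "MessageBody": {"payload.$": "$", "task_token.$": "$$.Task.Token"},
    },
    "Next": "DoSomething",  # placeholder
}
```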

It would also be possible to have tooling/alarms looking for long-running step function executions.

Potential Negatives

Race conditions

Depending on how we implement this, we could end up with a risk of race conditions. I think that by using the table stream and always having process add a row to the table we could mitigate that potential entirely, but I might be overlooking some edge cases.

Complexity

This seems like it might be a complex solution, but we need some external state tracking to make this work, so I’m not sure any alternatives would be any less complex.

AWS-centric

It could be hard to implement a mechanism like this on alternative platforms.

Capacity limitations

An advantage of this solution is that it does not require any daemon-like process to poll for state updates or handle events, outside the step function platform itself. Therefore we do not incur any costs for a workflow waiting on dependencies, aside from the fact that each waiting execution counts against the 1,000,000 open step function executions limit (and it appears this limit can be increased upon request).

In high-volume workflow situations, it could therefore be possible to cause a deadlock where so many workflows are waiting on items to be processed that there is no capacity left to start the workflows that would process those items. I don’t see a good solution to this issue beyond preventing it in the first place.

As a best practice, then, I would suggest not relying solely on this workflow dependency relationship to trigger processing for dependencies in high-volume situations. Generally it is better to have a process that ensures all lower-layer items are as present as possible using an alternative feeder method, and to rely on this dependency relationship only for a minimal number of items on “the edge” of the other feeder’s processing.

@jkeifer jkeifer self-assigned this Mar 11, 2022
@jkeifer jkeifer marked this pull request as draft March 11, 2022 17:57
@matthewhanson matthewhanson mentioned this pull request Apr 11, 2022
@jkeifer (Member, Author) commented Apr 25, 2022

Spent some time on a mermaid chart, wanted to keep it somewhere:

```mermaid
flowchart LR
  subgraph Main Workflow
    1(Start Workflow)
    3(Process Callback Items)
    subgraph Callback A
      a1(Start A)
      a4(Await A)
    end
    subgraph Callback B
      b1(Start B)
      b4(Await B)
    end
  end
    subgraph Workflow A
      a2(Start)
      a3(End)
    end
    subgraph Workflow B
      b2(Start)
      b3(End)
    end
  1 --> a1
  1 --> b1
  a1 --> a2
  a2 --> a3
  a3 --> a4
  a4 --> 3
  a1 --> a4
  b1 --> b4
  b1 --> b2
  b2 --> b3
  b3 --> b4
  b4 --> 3
```
