Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat skip states for workflows #1075

Open
wants to merge 58 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
3909f65
happy path example
Nov 4, 2024
9e08354
progress commit
Nov 21, 2024
3a31c7f
cleanup
Nov 27, 2024
cff050f
cleanup and pass the step run repo around
Nov 27, 2024
a4bd207
happy path example
Nov 4, 2024
2e1444f
progress commit
Nov 21, 2024
26c1aba
cleanup
Nov 27, 2024
6e0c9fb
cleanup and pass the step run repo around
Nov 27, 2024
4d152cb
merge in main
Dec 2, 2024
417a689
merge
Dec 2, 2024
4305611
before refactor
Dec 2, 2024
55daf28
quick refactor
Dec 2, 2024
b9107a0
lets do the hit the step run queue from everywhere else too
Dec 2, 2024
f81f0f4
some more refactoring
Dec 3, 2024
0a0e66d
some more cleanup and refactor
Dec 3, 2024
679b5cf
cleanup the caches when we have quit the step run engine
Dec 3, 2024
a621d25
deal with the cache in the caller to prevent leaks
Dec 3, 2024
78caa84
Merge branch 'main' into feat-skip-states-for-workflows
reillyse Dec 3, 2024
877dc08
Merge branch 'feat-skip-states-for-workflows' of github.com:hatchet-d…
Dec 3, 2024
63f7076
cleanup unused fields in query, no need to update the workflow run - …
Dec 3, 2024
3852d03
crazy-dag with e2e
Dec 5, 2024
cfa4c8e
parallelize short circuiting
Dec 6, 2024
51681f7
fix the simple test and reduce noise
Dec 6, 2024
bdc4470
check for the onfailure job when we create and only then update the w…
Dec 6, 2024
f92c408
clean up
Dec 6, 2024
f8c0234
reduce noise in tests
Dec 6, 2024
607a2a0
Merge branch 'main' of github.com:hatchet-dev/hatchet
Dec 16, 2024
97a7a8e
merge
Dec 16, 2024
6735c03
some cleanup
Dec 17, 2024
ad903fe
crazy dag
Dec 17, 2024
b180036
cleanup
Dec 17, 2024
172aa64
cleanup comments and make func private
Dec 17, 2024
abcfb2c
merge
reillyse Dec 18, 2024
a9f603d
Merge branch 'main' into feat-skip-states-for-workflows
reillyse Dec 18, 2024
d602a47
generate to remove the comment
reillyse Dec 18, 2024
8347614
make the e2e check the state of the workflow run to make sure it was …
reillyse Dec 18, 2024
30aca96
merge
reillyse Dec 18, 2024
3307c74
namespace the load tests and only queue the item once
reillyse Dec 19, 2024
92a1725
not working locally lets see about actions
reillyse Dec 19, 2024
72a26fe
no delay creating workflow runs
reillyse Dec 19, 2024
e0df595
cleanup migrations
reillyse Dec 19, 2024
981f51d
make the concurrency test a bit more robust, explicitly check for the…
reillyse Dec 19, 2024
422acdb
more modifications for the concurrency test
reillyse Dec 20, 2024
4b78b2e
log and return an error
reillyse Dec 20, 2024
e7aa2f1
remove the namespace stuff to debug
reillyse Dec 20, 2024
eccb383
maybe the duplicate code is causing this
reillyse Dec 20, 2024
2942597
tighten up the tests a little
reillyse Dec 20, 2024
f382827
rewrite load tests
reillyse Dec 21, 2024
c8e9fe8
if we don't have a worker we can't register a workflow
reillyse Dec 21, 2024
677bc5b
remove debug
reillyse Dec 21, 2024
8d151e9
clean up context and go funcs
reillyse Dec 21, 2024
dc8ffff
fix the crazy dag timeout
reillyse Dec 21, 2024
761501b
add back in the timeout
reillyse Dec 21, 2024
e6bc3d3
explicitly quit the go funcs
reillyse Dec 21, 2024
99aa4b3
don't wait for the engine
reillyse Dec 21, 2024
ac6974d
wait for the engine to cleanup
reillyse Dec 21, 2024
0df3fb3
clean up the worker off the ctx
reillyse Dec 21, 2024
e08d0bb
tighten up the failure a little
reillyse Dec 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Taskfile.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ tasks:
recreate-db-from-scratch:
cmds:
- docker compose down
- docker volume rm oss_hatchet_postgres_data
- docker volume rm oss_hatchet_rabbitmq_data
- docker volume rm oss_hatchet_postgres_data || true
- docker volume rm oss_hatchet_rabbitmq_data || true
- docker compose up -d
- task: setup
- task: init-dev-env
Expand Down
41 changes: 29 additions & 12 deletions api/v1/server/handlers/workflows/trigger.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/hatchet-dev/hatchet/internal/services/shared/tasktypes"
"github.com/hatchet-dev/hatchet/pkg/repository"
"github.com/hatchet-dev/hatchet/pkg/repository/metered"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/db"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/dbsqlc"
"github.com/hatchet-dev/hatchet/pkg/repository/prisma/sqlchelpers"
Expand Down Expand Up @@ -95,21 +96,37 @@ func (t *WorkflowService) WorkflowRunCreate(ctx echo.Context, request gen.Workfl
return nil, fmt.Errorf("trigger.go could not create workflow run: %w", err)
}

// send to workflow processing queue
err = t.config.MessageQueue.AddMessage(
ctx.Request().Context(),
msgqueue.WORKFLOW_PROCESSING_QUEUE,
tasktypes.WorkflowRunQueuedToTask(
sqlchelpers.UUIDToStr(createdWorkflowRun.TenantId),
sqlchelpers.UUIDToStr(createdWorkflowRun.ID),
),
)
if !prisma.CanShortCircuit(createdWorkflowRun.Row) {
// send to workflow processing queue
err = t.config.MessageQueue.AddMessage(
ctx.Request().Context(),
msgqueue.WORKFLOW_PROCESSING_QUEUE,
tasktypes.WorkflowRunQueuedToTask(
sqlchelpers.UUIDToStr(createdWorkflowRun.Row.WorkflowRun.TenantId),
sqlchelpers.UUIDToStr(createdWorkflowRun.Row.WorkflowRun.ID),
),
)

if err != nil {
return nil, fmt.Errorf("could not add workflow run to queue: %w", err)
if err != nil {
return nil, fmt.Errorf("could not add workflow run to queue: %w", err)
}
}

workflowRun, err := t.config.APIRepository.WorkflowRun().GetWorkflowRunById(ctx.Request().Context(), tenant.ID, sqlchelpers.UUIDToStr(createdWorkflowRun.ID))
for _, queueName := range createdWorkflowRun.StepRunQueueNames {

if schedPartitionId, ok := tenant.SchedulerPartitionID(); ok {
err = t.config.MessageQueue.AddMessage(
ctx.Request().Context(),
msgqueue.QueueTypeFromPartitionIDAndController(schedPartitionId, msgqueue.Scheduler),
tasktypes.CheckTenantQueueToTask(tenant.ID, queueName, true, false),
)

if err != nil {
t.config.Logger.Err(err).Msg("could not add message to scheduler partition queue")
}
}
}
workflowRun, err := t.config.APIRepository.WorkflowRun().GetWorkflowRunById(ctx.Request().Context(), tenant.ID, sqlchelpers.UUIDToStr(createdWorkflowRun.Row.WorkflowRun.ID))

if err != nil {
return nil, fmt.Errorf("could not get workflow run: %w", err)
Expand Down
2 changes: 2 additions & 0 deletions cmd/hatchet-engine/engine/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -370,6 +370,7 @@ func runV0Config(ctx context.Context, sc *server.ServerConfig) ([]Teardown, erro
admin.WithRepository(sc.EngineRepository),
admin.WithMessageQueue(sc.MessageQueue),
admin.WithEntitlementsRepository(sc.EntitlementRepository),
admin.WithLogger(sc.Logger),
)
if err != nil {
return nil, fmt.Errorf("could not create admin service: %w", err)
Expand Down Expand Up @@ -730,6 +731,7 @@ func runV1Config(ctx context.Context, sc *server.ServerConfig) ([]Teardown, erro
admin.WithRepository(sc.EngineRepository),
admin.WithMessageQueue(sc.MessageQueue),
admin.WithEntitlementsRepository(sc.EntitlementRepository),
admin.WithLogger(sc.Logger),
)

if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions examples/bulk_imports/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,9 @@ func run() (func() error, error) {

var events []client.EventWithAdditionalMetadata

// 20000 times to test the bulk push
// 999 (max amount) times to test the bulk push

for i := 0; i < 20000; i++ {
for i := 0; i < 999; i++ {
testEvent := userCreateEvent{
Username: "echo-test",
UserID: "1234 " + fmt.Sprint(i),
Expand Down
86 changes: 74 additions & 12 deletions examples/concurrency/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,27 +30,63 @@ func main() {
}

events := make(chan string, 50)
wfrIds := make(chan *client.Workflow, 50)
interrupt := cmdutils.InterruptChan()
c, err := client.New()

cleanup, err := run(events)
if err != nil {
log.Fatalf("error creating client: %v", err)
}
cleanup, err := run(c, events, wfrIds)
if err != nil {
panic(err)
}
selectLoop:
for {
select {

case <-interrupt:
log.Print("Interrupted")
break selectLoop
case wfrId := <-wfrIds:
log.Printf("Workflow run id: %s", wfrId.WorkflowRunId())
wfResult, err := wfrId.Result()
if err != nil {

<-interrupt
if err.Error() == "step output for step-one not found" {
log.Printf("Step output for step-one not found because it was cancelled due to CANCELLED_BY_CONCURRENCY_LIMIT")
continue
}
panic(fmt.Errorf("error getting workflow run result: %w", err))
}

stepOneOutput := &stepOneOutput{}

err = wfResult.StepOutput("step-one", stepOneOutput)

if err != nil {
if err.Error() == "step run failed: this step run was cancelled due to CANCELLED_BY_CONCURRENCY_LIMIT" {
log.Printf("Workflow run was cancelled due to CANCELLED_BY_CONCURRENCY_LIMIT")
continue
}
if err.Error() == "step output for step-one not found" {
log.Printf("Step output for step-one not found because it was cancelled due to CANCELLED_BY_CONCURRENCY_LIMIT")
continue
}
panic(fmt.Errorf("error getting workflow run result: %w", err))
}
case e := <-events:
log.Printf("Event: %s", e)
}
}

if err := cleanup(); err != nil {

panic(fmt.Errorf("error cleaning up: %w", err))
}
}

func run(events chan<- string) (func() error, error) {
c, err := client.New()

if err != nil {
return nil, fmt.Errorf("error creating client: %w", err)
}
func run(c client.Client, events chan<- string, wfrIds chan<- *client.Workflow) (func() error, error) {

w, err := worker.NewWorker(
worker.WithClient(
Expand All @@ -74,7 +110,8 @@ func run(events chan<- string) (func() error, error) {
err = ctx.WorkflowInput(input)

// we sleep to simulate a long running task
time.Sleep(10 * time.Second)

time.Sleep(7 * time.Second)

if err != nil {

Expand All @@ -98,7 +135,11 @@ func run(events chan<- string) (func() error, error) {
err = ctx.StepOutput("step-one", input)

if err != nil {
return nil, err

if err.Error() == "step run failed: this step run was cancelled due to CANCELLED_BY_CONCURRENCY_LIMIT" {
return nil, nil
}

}

if ctx.Err() != nil {
Expand All @@ -125,18 +166,39 @@ func run(events chan<- string) (func() error, error) {
"test": "test",
},
}

// I want some to be in Running and some to be in Pending so we cancel both

go func() {
// do this 10 times to test concurrency
for i := 0; i < 10; i++ {
for i := 0; i < 7; i++ {

wfr_id, err := c.Admin().RunWorkflow("simple-concurrency", testEvent)

log.Println("Starting workflow run id: ", wfr_id)
log.Println("Starting workflow run id: ", wfr_id.WorkflowRunId())

if err != nil {
panic(fmt.Errorf("error running workflow: %w", err))
}

wfrIds <- wfr_id
time.Sleep(400 * time.Millisecond)
}
}()
go func() {
// do this 10 times to test concurrency
for i := 0; i < 13; i++ {

wfr_id, err := c.Admin().RunWorkflow("simple-concurrency", testEvent)

log.Println("Starting workflow run id: ", wfr_id.WorkflowRunId())

if err != nil {
panic(fmt.Errorf("error running workflow: %w", err))
}

wfrIds <- wfr_id

}
}()

Expand Down
91 changes: 84 additions & 7 deletions examples/concurrency/main_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,49 +4,126 @@ package main

import (
"context"
"fmt"
"sync"
"testing"
"time"

"github.com/stretchr/testify/assert"

"github.com/hatchet-dev/hatchet/internal/testutils"
"github.com/hatchet-dev/hatchet/pkg/client"
)

func TestConcurrency(t *testing.T) {
	// TestConcurrency runs the concurrency example workflow many times and
	// verifies that the concurrency limit lets exactly one run succeed while
	// the rest are cancelled with CANCELLED_BY_CONCURRENCY_LIMIT.
	testutils.Prepare(t)

	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()

	events := make(chan string, 50)
	wfrIds := make(chan *client.Workflow, 50)
	c, err := client.New()

	if err != nil {
		panic("error creating client: " + err.Error())
	}
	cleanup, err := run(c, events, wfrIds)
	if err != nil {
		t.Fatalf("run() error = %v", err)
	}

	var items []string

	// workflowRunIds is appended to from the result-collector goroutines
	// below, so all writes must hold mu. (The bare append was a data race.)
	var (
		mu             sync.Mutex
		workflowRunIds []*client.WorkflowResult
	)
	var wg sync.WaitGroup
	done := make(chan struct{})
outer:
	for {

		select {
		case item := <-events:
			items = append(items, item)
			if len(items) > 2 {
				fmt.Println("got more than 2 events")
				break outer
			}
		case <-ctx.Done():
			fmt.Println("context done")
			break outer

		case wfrId := <-wfrIds:
			fmt.Println("got wfr id")
			// Add must happen before the goroutine starts; calling Add
			// inside the goroutine races with wg.Wait() below, which could
			// observe a zero counter and return early.
			wg.Add(1)
			go func(workflow *client.Workflow) {
				defer wg.Done()
				wfr, err := workflow.Result()
				if err != nil {
					panic(fmt.Errorf("error getting workflow run result: %w", err))
				}
				mu.Lock()
				workflowRunIds = append(workflowRunIds, wfr)
				mu.Unlock()
			}(wfrId)

		}
	}

	go func() {
		wg.Wait()
		close(done)
	}()

	select {

	case <-time.After(20 * time.Second):
		t.Fatalf("timed out waiting for workflow results")
	case <-done:

	}

	// our workflow run ids should have only one succeeded everyone else should have failed
	stateCount := make(map[string]int)

	// All collector goroutines have finished (done is closed), so reading
	// workflowRunIds without the mutex is safe here.
	if len(workflowRunIds) != 20 {
		t.Fatalf("expected 20 workflow run ids, got %d", len(workflowRunIds))
	}

	for _, wfrId := range workflowRunIds {
		state, err := getWorkflowStateForWorkflowRunId(c, ctx, wfrId)

		fmt.Println("state: ", state)
		if err != nil {
			t.Fatalf("error getting workflow state: %v", err)
		}
		stateCount[state]++
	}

	assert.Equal(t, 1, stateCount["SUCCEEDED"])
	assert.Equal(t, 19, stateCount["CANCELLED_BY_CONCURRENCY_LIMIT"])

	if err := cleanup(); err != nil {
		t.Fatalf("cleanup() error = %v", err)
	}

}

// getWorkflowStateForWorkflowRunId classifies a finished workflow run by
// inspecting its step-one output: it returns "SUCCEEDED" when the output is
// readable, "CANCELLED_BY_CONCURRENCY_LIMIT" when the step was cancelled by
// the concurrency limit (or never produced output), and a non-nil error for
// any other failure.
//
// NOTE(review): the client and ctx parameters are currently unused; they are
// kept to avoid changing existing call sites — confirm whether they can be
// dropped.
func getWorkflowStateForWorkflowRunId(client client.Client, ctx context.Context, wfr *client.WorkflowResult) (string, error) {

	stepOneOutput := &stepOneOutput{}

	err := wfr.StepOutput("step-one", stepOneOutput)
	if err != nil {

		if err.Error() == "step run failed: this step run was cancelled due to CANCELLED_BY_CONCURRENCY_LIMIT" {
			return "CANCELLED_BY_CONCURRENCY_LIMIT", nil
		}

		// this happens if we cancel before the workflow is run
		if err.Error() == "step output for step-one not found" {
			return "CANCELLED_BY_CONCURRENCY_LIMIT", nil
		}

		// fmt.Println does not interpret format verbs (and %w is only valid
		// in fmt.Errorf), so use Printf with %v to actually render the error.
		fmt.Printf("error getting step output: %v\n", err)
		return "", err
	}

	return "SUCCEEDED", nil
}
Loading
Loading