Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: step run partitioning by status #1067

Open
wants to merge 4 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Taskfile.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,8 @@ tasks:
recreate-db-from-scratch:
cmds:
- docker compose down
- docker volume rm oss_hatchet_postgres_data
- docker volume rm oss_hatchet_rabbitmq_data
- docker volume rm oss_hatchet_postgres_data || true
- docker volume rm oss_hatchet_rabbitmq_data || true
- docker compose up -d
- task: setup
- task: init-dev-env
Expand Down
28 changes: 13 additions & 15 deletions examples/bulk_workflows/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,23 @@ func main() {
panic(fmt.Errorf("error creating client: %w", err))
}

_, err = registerWorkflow(c, workflowName)
w, err := registerWorkflow(c, workflowName)

if err != nil {
panic(fmt.Errorf("error registering workflow: %w", err))
}

quantity := 999
cleanup, err := w.Start()
fmt.Println("Starting the worker")

if err != nil {
panic(fmt.Errorf("error starting worker: %w", err))
}

quantity := 600

overallStart := time.Now()
iterations := 10
iterations := 1000
for i := 0; i < iterations; i++ {
startTime := time.Now()

Expand All @@ -60,6 +67,8 @@ func main() {
panic(err)
}
fmt.Printf("Time taken to queue %dth bulk workflow: %v\n", i, time.Since(startTime))

time.Sleep(1 * time.Second)
}
fmt.Println("Overall time taken: ", time.Since(overallStart))
fmt.Printf("That is %d workflows per second\n", int(float64(quantity*iterations)/time.Since(overallStart).Seconds()))
Expand All @@ -76,18 +85,6 @@ func main() {

// I want to start the wofklow worker here

w, err := registerWorkflow(c, workflowName)
if err != nil {
panic(fmt.Errorf("error creating worker: %w", err))
}

cleanup, err := w.Start()
fmt.Println("Starting the worker")

if err != nil {
panic(fmt.Errorf("error starting worker: %w", err))
}

<-ch

if err := cleanup(); err != nil {
Expand All @@ -102,6 +99,7 @@ func registerWorkflow(c client.Client, workflowName string) (w *worker.Worker, e
worker.WithClient(
c,
),
worker.WithMaxRuns(200),
)
if err != nil {
return nil, fmt.Errorf("error creating worker: %w", err)
Expand Down
21 changes: 4 additions & 17 deletions hack/db/atlas-apply.sh
Original file line number Diff line number Diff line change
Expand Up @@ -41,26 +41,13 @@ if [[ ! "$DATABASE_URL" =~ sslmode ]]; then
fi

echo "DATABASE_URL: $DATABASE_URL"
# Check for prisma migrations
MIGRATION_NAME=$(psql "$DATABASE_URL" -t -c "SELECT migration_name FROM _prisma_migrations ORDER BY started_at DESC LIMIT 1;" 2>/dev/null | xargs)
MIGRATION_NAME=$(echo $MIGRATION_NAME | cut -d'_' -f1)

echo "Migration name: $MIGRATION_NAME"
echo "Applying migrations via atlas..."

if [ $? -eq 0 ] && [ -n "$MIGRATION_NAME" ]; then
echo "Using existing prisma migration: $MIGRATION_NAME"
atlas migrate apply \
--url "$DATABASE_URL" \
--dir "file://sql/migrations"

atlas migrate apply \
--url "$DATABASE_URL" \
--baseline "$MIGRATION_NAME" \
--dir "file://sql/migrations"
else
echo "No prisma migration found. Applying migrations via atlas..."

atlas migrate apply \
--url "$DATABASE_URL" \
--dir "file://sql/migrations"
fi

# if either of the above commands failed, exit with an error
if [ $? -ne 0 ]; then
Expand Down
1 change: 0 additions & 1 deletion pkg/repository/prisma/dbsqlc/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions pkg/repository/prisma/dbsqlc/step_runs.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

41 changes: 13 additions & 28 deletions pkg/repository/prisma/dbsqlc/workflow_runs.sql.go

Large diffs are not rendered by default.

3 changes: 1 addition & 2 deletions pkg/repository/prisma/dbsqlc/workflows.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading