Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions internal/common/ingest/testfixtures/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ const (
)

var (
RunIndex = int64(0)
PartitionMarkerGroupIdUuid = uuid.MustParse(PartitionMarkerGroupId)
PriorityClassName = "test-priority"
Groups = []string{"group1", "group2"}
Expand Down
39 changes: 39 additions & 0 deletions internal/common/preemption/utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
package preemption

import (
"strconv"

"github.com/armadaproject/armada/internal/server/configuration"
)

// AreRetriesEnabled determines whether preemption retries are enabled at the job level. Also returns whether the
// annotation was set.
func AreRetriesEnabled(annotations map[string]string) (enabled bool, annotationSet bool) {
	value, ok := annotations[configuration.PreemptionRetryEnabledAnnotation]
	if !ok {
		// Annotation absent: retries are neither enabled nor configured at the job level.
		return false, false
	}
	parsed, err := strconv.ParseBool(value)
	if err != nil {
		// Annotation present but unparseable: report it as set, treat retries as disabled.
		return false, true
	}
	return parsed, true
}

// GetMaxRetryCount gets the max preemption retry count at a job level. Also returns whether the
// annotation was set. A missing, malformed, or negative annotation value yields a count of 0.
func GetMaxRetryCount(annotations map[string]string) (maxRetryCount uint, annotationSet bool) {
	preemptionRetryCountMaxStr, exists := annotations[configuration.PreemptionRetryCountMaxAnnotation]
	if !exists {
		return 0, false
	}
	// Use ParseUint rather than Atoi + uint() conversion: Atoi accepts negative
	// values, and uint(-1) would wrap around to a huge retry count. ParseUint
	// rejects negatives and out-of-range values, falling back to 0.
	parsed, err := strconv.ParseUint(preemptionRetryCountMaxStr, 10, 32)
	if err != nil {
		return 0, true
	}
	return uint(parsed), true
}
1 change: 1 addition & 0 deletions internal/executor/domain/pod_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package domain
const (
JobId = "armada_job_id"
JobRunId = "armada_job_run_id"
JobRunIndex = "armada_job_run_index"
PodNumber = "armada_pod_number"
PodCount = "armada_pod_count"
JobSetId = "armada_jobset_id"
Expand Down
13 changes: 7 additions & 6 deletions internal/executor/util/kubernetes_object.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,11 +116,12 @@ func CreatePodFromExecutorApiJob(job *executorapi.JobRunLease, defaults *configu
}

labels := util.MergeMaps(job.Job.ObjectMeta.Labels, map[string]string{
domain.JobId: jobId,
domain.JobRunId: runId,
domain.Queue: job.Queue,
domain.PodNumber: strconv.Itoa(0),
domain.PodCount: strconv.Itoa(1),
domain.JobId: jobId,
domain.JobRunId: runId,
domain.JobRunIndex: strconv.Itoa(int(job.JobRunIndex)),
domain.Queue: job.Queue,
domain.PodNumber: strconv.Itoa(0),
domain.PodCount: strconv.Itoa(1),
})
annotation := util.MergeMaps(job.Job.ObjectMeta.Annotations, map[string]string{
domain.JobSetId: job.Jobset,
Expand All @@ -132,7 +133,7 @@ func CreatePodFromExecutorApiJob(job *executorapi.JobRunLease, defaults *configu

pod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: common.PodNamePrefix + job.Job.JobId + "-" + strconv.Itoa(0),
Name: common.PodNamePrefix + job.Job.JobId + "-" + strconv.Itoa(0) + "-" + strconv.FormatUint(uint64(job.JobRunIndex), 10),
Labels: labels,
Annotations: annotation,
Namespace: job.Job.ObjectMeta.Namespace,
Expand Down
24 changes: 14 additions & 10 deletions internal/executor/util/kubernetes_objects_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package util

import (
"fmt"
"strconv"
"testing"

"github.com/armadaproject/armada/internal/common/util"
Expand Down Expand Up @@ -142,12 +143,14 @@ func makePodSpec() *v1.PodSpec {
func TestCreatePodFromExecutorApiJob(t *testing.T) {
runId := uuid.NewString()
jobId := util.NewULID()
runIndex := 0

validJobLease := &executorapi.JobRunLease{
JobRunId: runId,
Queue: "queue",
Jobset: "job-set",
User: "user",
JobRunId: runId,
JobRunIndex: uint32(runIndex),
Queue: "queue",
Jobset: "job-set",
User: "user",
Job: &armadaevents.SubmitJob{
ObjectMeta: &armadaevents.ObjectMeta{
Labels: map[string]string{},
Expand All @@ -167,14 +170,15 @@ func TestCreatePodFromExecutorApiJob(t *testing.T) {

expectedPod := &v1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: fmt.Sprintf("armada-%s-0", jobId),
Name: fmt.Sprintf("armada-%s-0-%d", jobId, runIndex),
Namespace: "test-namespace",
Labels: map[string]string{
domain.JobId: jobId,
domain.JobRunId: runId,
domain.Queue: "queue",
domain.PodNumber: "0",
domain.PodCount: "1",
domain.JobId: jobId,
domain.JobRunId: runId,
domain.JobRunIndex: strconv.Itoa(runIndex),
domain.Queue: "queue",
domain.PodNumber: "0",
domain.PodCount: "1",
},
Annotations: map[string]string{
domain.JobSetId: "job-set",
Expand Down
18 changes: 12 additions & 6 deletions internal/scheduler/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -170,15 +170,21 @@ func (srv *ExecutorApi) LeaseJobRuns(stream executorapi.ExecutorApi_LeaseJobRuns
}
}

var runIndex uint32 = 0
if lease.RunIndex != nil {
runIndex = uint32(*lease.RunIndex)
}

err := stream.Send(&executorapi.LeaseStreamMessage{
Event: &executorapi.LeaseStreamMessage_Lease{
Lease: &executorapi.JobRunLease{
JobRunId: lease.RunID,
Queue: lease.Queue,
Jobset: lease.JobSet,
User: lease.UserID,
Groups: groups,
Job: submitMsg,
JobRunId: lease.RunID,
JobRunIndex: runIndex,
Queue: lease.Queue,
Jobset: lease.JobSet,
User: lease.UserID,
Groups: groups,
Job: submitMsg,
},
},
})
Expand Down
7 changes: 7 additions & 0 deletions internal/scheduler/configuration/configuration.go
Original file line number Diff line number Diff line change
Expand Up @@ -248,6 +248,8 @@ type SchedulingConfig struct {
Pools []PoolConfig
ExperimentalIndicativePricing ExperimentalIndicativePricing
ExperimentalIndicativeShare ExperimentalIndicativeShare
// Default preemption retry settings, so you don't have to annotate every job individually.
DefaultPreemptionRetry PreemptionRetryConfig
}

const (
Expand Down Expand Up @@ -354,3 +356,8 @@ type PriorityOverrideConfig struct {
ServiceUrl string
ForceNoTls bool
}

// PreemptionRetryConfig holds the cluster-wide default settings for preemption
// retries, used when jobs do not carry their own retry annotations.
type PreemptionRetryConfig struct {
	// Enabled turns preemption retries on by default.
	Enabled bool
	// DefaultMaxRetryCount is the default cap on retry attempts.
	// NOTE(review): nil presumably means no default cap is applied — confirm against consumers.
	DefaultMaxRetryCount *uint
}
11 changes: 7 additions & 4 deletions internal/scheduler/database/job_repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ type hasSerial interface {

type JobRunLease struct {
RunID string
RunIndex *int64
Queue string
Pool string
JobSet string
Expand Down Expand Up @@ -54,7 +55,7 @@ type JobRepository interface {
CountReceivedPartitions(ctx *armadacontext.Context, groupId uuid.UUID) (uint32, error)

// FindInactiveRuns returns a slice containing all dbRuns that the scheduler does not currently consider active
// Runs are inactive if they don't exist or if they have succeeded, failed or been cancelled
// Runs are inactive if they don't exist or if they have succeeded, failed, preempted or been cancelled
FindInactiveRuns(ctx *armadacontext.Context, runIds []string) ([]string, error)

// FetchJobRunLeases fetches new job runs for a given executor. A maximum of maxResults rows will be returned, while run
Expand Down Expand Up @@ -293,7 +294,7 @@ func (r *PostgresJobRepository) FetchJobUpdates(ctx *armadacontext.Context, jobS
}

// FindInactiveRuns returns a slice containing all dbRuns that the scheduler does not currently consider active
// Runs are inactive if they don't exist or if they have succeeded, failed or been cancelled
// Runs are inactive if they don't exist or if they have succeeded, failed, preempted or been cancelled
func (r *PostgresJobRepository) FindInactiveRuns(ctx *armadacontext.Context, runIds []string) ([]string, error) {
var inactiveRuns []string
err := pgx.BeginTxFunc(ctx, r.db, pgx.TxOptions{
Expand All @@ -313,6 +314,7 @@ func (r *PostgresJobRepository) FindInactiveRuns(ctx *armadacontext.Context, run
WHERE runs.run_id IS NULL
OR runs.succeeded = true
OR runs.failed = true
OR runs.preempted = true
OR runs.cancelled = true;`

rows, err := tx.Query(ctx, fmt.Sprintf(query, tmpTable))
Expand Down Expand Up @@ -351,7 +353,7 @@ func (r *PostgresJobRepository) FetchJobRunLeases(ctx *armadacontext.Context, ex
}

query := `
SELECT jr.run_id, jr.node, j.queue, j.job_set, jr.pool, j.user_id, j.groups, j.submit_message, jr.pod_requirements_overlay
SELECT jr.run_id, jr.run_index, jr.node, j.queue, j.job_set, jr.pool, j.user_id, j.groups, j.submit_message, jr.pod_requirements_overlay
FROM runs jr
LEFT JOIN %s as tmp ON (tmp.run_id = jr.run_id)
JOIN jobs j
Expand All @@ -361,6 +363,7 @@ func (r *PostgresJobRepository) FetchJobRunLeases(ctx *armadacontext.Context, ex
AND jr.succeeded = false
AND jr.failed = false
AND jr.cancelled = false
AND jr.preempted = false
ORDER BY jr.serial
LIMIT %d;
`
Expand All @@ -372,7 +375,7 @@ func (r *PostgresJobRepository) FetchJobRunLeases(ctx *armadacontext.Context, ex
defer rows.Close()
for rows.Next() {
run := JobRunLease{}
err = rows.Scan(&run.RunID, &run.Node, &run.Queue, &run.JobSet, &run.Pool, &run.UserID, &run.Groups, &run.SubmitMessage, &run.PodRequirementsOverlay)
err = rows.Scan(&run.RunID, &run.RunIndex, &run.Node, &run.Queue, &run.JobSet, &run.Pool, &run.UserID, &run.Groups, &run.SubmitMessage, &run.PodRequirementsOverlay)
if err != nil {
return errors.WithStack(err)
}
Expand Down
17 changes: 17 additions & 0 deletions internal/scheduler/database/job_repository_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -553,6 +553,15 @@ func TestFindInactiveRuns(t *testing.T) {
},
expectedInactive: []string{runIds[1]},
},
"run preempted": {
runsToCheck: runIds,
dbRuns: []Run{
{RunID: runIds[0]},
{RunID: runIds[1], Preempted: true},
{RunID: runIds[2]},
},
expectedInactive: []string{runIds[1]},
},
"run missing": {
runsToCheck: runIds,
dbRuns: []Run{
Expand Down Expand Up @@ -654,6 +663,14 @@ func TestFetchJobRunLeases(t *testing.T) {
Pool: "test-pool",
Succeeded: true, // should be ignored as terminal
},
{
RunID: uuid.NewString(),
JobID: dbJobs[0].JobID,
JobSet: "test-jobset",
Executor: executorName,
Pool: "test-pool",
Preempted: true, // should be ignored as terminal
},
}
expectedLeases := make([]*JobRunLease, 4)
for i := range expectedLeases {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
ALTER TABLE runs ADD COLUMN IF NOT EXISTS run_index bigint;
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

During testing we found that the old scheduler ingester was trying to write rows with a null value, which blocked ingestion. To make this smoother, we've made the column nullable and now handle null values in the application layer.

1 change: 1 addition & 0 deletions internal/scheduler/database/models.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 6 additions & 3 deletions internal/scheduler/database/query.sql.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Loading