diff --git a/assets/databases/heimdall/build/heimdall.lst b/assets/databases/heimdall/build/heimdall.lst index 37fb01c..95d3413 100644 --- a/assets/databases/heimdall/build/heimdall.lst +++ b/assets/databases/heimdall/build/heimdall.lst @@ -8,6 +8,7 @@ tables/command_cluster_tags.sql tables/clusters.sql tables/cluster_tags.sql tables/jobs.sql +tables/jobs_status_system_job_id_idx.sql tables/job_tags.sql tables/job_statuses.sql tables/job_cluster_tags.sql diff --git a/assets/databases/heimdall/tables/jobs_status_system_job_id_idx.sql b/assets/databases/heimdall/tables/jobs_status_system_job_id_idx.sql new file mode 100644 index 0000000..064ead8 --- /dev/null +++ b/assets/databases/heimdall/tables/jobs_status_system_job_id_idx.sql @@ -0,0 +1 @@ +create index concurrently if not exists jobs_status_system_job_id_idx on jobs (job_status_id, system_job_id); diff --git a/internal/pkg/heimdall/job.go b/internal/pkg/heimdall/job.go index 4789f82..ccd11c2 100644 --- a/internal/pkg/heimdall/job.go +++ b/internal/pkg/heimdall/job.go @@ -159,7 +159,11 @@ func (h *Heimdall) runJob(ctx context.Context, j *job.Job, command *command.Comm // Check if context was canceled and mark status appropriately if pluginCtx.Err() != nil { - j.Status = jobStatus.Canceling // janitor will update to canceled when resources are cleaned up + if j.IsSync { + j.Status = jobStatus.Canceled // no need to send to janitor + } else { + j.Status = jobStatus.Canceling // janitor will update to canceled when resources are cleaned up + } runJobMethod.LogAndCountError(pluginCtx.Err(), command.Name, cluster.Name) return nil } diff --git a/internal/pkg/janitor/queries/stale_and_canceling_jobs_select.sql b/internal/pkg/janitor/queries/stale_and_canceling_jobs_select.sql index 11a5bc8..36a7183 100644 --- a/internal/pkg/janitor/queries/stale_and_canceling_jobs_select.sql +++ b/internal/pkg/janitor/queries/stale_and_canceling_jobs_select.sql @@ -1,27 +1,38 @@ -select +with candidate_ids as ( + -- canceling jobs + select j.system_job_id + from jobs j + where j.job_status_id = 7 + + union + + -- stale active jobs (drive from active_jobs) + select aj.system_job_id + from active_jobs aj + where aj.last_heartbeat > 0 + and aj.last_heartbeat < (extract(epoch from now())::int - $1) +), +picked as ( + select j.system_job_id, j.job_id, j.job_status_id, - cm.command_id, - cl.cluster_id -from - jobs j - left join active_jobs aj on j.system_job_id = aj.system_job_id - join commands cm on cm.system_command_id = j.job_command_id - join clusters cl on cl.system_cluster_id = j.job_cluster_id -where - ( - -- Stale jobs: must be in active_jobs and have heartbeat timeout - (aj.system_job_id is not null and aj.last_heartbeat > 0 and extract(epoch from now())::int - $1 > aj.last_heartbeat) - or - -- Canceling jobs: status is CANCELING - j.job_status_id = 7 - ) -order by - j.system_job_id -for update of j - skip locked -limit - $2 -; + j.job_command_id, + j.job_cluster_id + from jobs j + join candidate_ids c on c.system_job_id = j.system_job_id + order by j.system_job_id + for update of j skip locked + limit $2 +) +select + p.system_job_id, + p.job_id, + p.job_status_id, + cm.command_id, + cl.cluster_id +from picked p +join commands cm on cm.system_command_id = p.job_command_id +join clusters cl on cl.system_cluster_id = p.job_cluster_id +order by p.system_job_id;