Skip to content

Commit

Permalink
fix: manual-reconciliation should suspend currect job (#184)
Browse files Browse the repository at this point in the history
  • Loading branch information
aclevername authored Jun 27, 2024
1 parent 1f5a889 commit 4c6d1db
Show file tree
Hide file tree
Showing 2 changed files with 35 additions and 6 deletions.
26 changes: 20 additions & 6 deletions lib/workflow/reconciler.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,13 +117,23 @@ func ReconcileConfigure(opts Opts) (bool, error) {
pipeline := opts.Resources[pipelineIndex]
opts.logger = originalLogger.WithName(pipeline.Name)

isManualReconciliation := isManualReconciliation(opts.parentObject.GetLabels())
if jobIsForPipeline(pipeline, mostRecentJob) {
if isRunning(mostRecentJob) {
if isManualReconciliation {
err := suspendJob(opts.ctx, opts.client, mostRecentJob)
if err != nil {
opts.logger.Error(err, "failed to suspend Job", "job", mostRecentJob.GetName())
return true, err
}
return true, nil
}

opts.logger.Info("Job already inflight for Pipeline, waiting for it to complete", "job", mostRecentJob.Name, "pipeline", pipeline.Name)
return true, nil
}

if isManualReconciliation(opts.parentObject.GetLabels()) {
if isManualReconciliation {
opts.logger.Info("Pipeline running due to manual reconciliation", "pipeline", pipeline.Name)
return createConfigurePipeline(opts, pipelineIndex, pipeline)
}
Expand All @@ -146,12 +156,9 @@ func ReconcileConfigure(opts Opts) (bool, error) {
// have any active pods)
if isRunning(mostRecentJob) {
opts.logger.Info("Job already inflight for another workflow, suspending it", "job", mostRecentJob.Name)
trueBool := true
patch := client.MergeFrom(mostRecentJob.DeepCopy())
mostRecentJob.Spec.Suspend = &trueBool
err := opts.client.Patch(opts.ctx, mostRecentJob, patch)
err := suspendJob(opts.ctx, opts.client, mostRecentJob)
if err != nil {
opts.logger.Error(err, "failed to patch Job", "job", mostRecentJob.GetName())
opts.logger.Error(err, "failed to suspend Job", "job", mostRecentJob.GetName())
}
return true, nil
}
Expand All @@ -161,6 +168,13 @@ func ReconcileConfigure(opts Opts) (bool, error) {
return createConfigurePipeline(opts, pipelineIndex, pipeline)
}

func suspendJob(ctx context.Context, c client.Client, job *batchv1.Job) error {
trueBool := true
patch := client.MergeFrom(job.DeepCopy())
job.Spec.Suspend = &trueBool
return c.Patch(ctx, job, patch)
}

func getLabelsForPipelineJob(pipeline v1alpha1.PipelineJobResources) map[string]string {
labels := pipeline.Job.DeepCopy().GetLabels()
return labels
Expand Down
15 changes: 15 additions & 0 deletions lib/workflow/reconciler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,21 @@ var _ = Describe("Workflow Reconciler", func() {
It("returns true", func() {
Expect(requeue).To(BeTrue())
})

When("a new manual reconciliation request is made", func() {
It("cancels the current job (allowing a new job to be queued up)", func() {
uPromise.SetLabels(map[string]string{
"kratix.io/manual-reconciliation": "true",
})
opts := workflow.NewOpts(ctx, fakeK8sClient, logger, uPromise, workflowPipelines, "promise")
requeue, err := workflow.ReconcileConfigure(opts)

Expect(err).NotTo(HaveOccurred())
Expect(requeue).To(BeTrue())
Expect(listJobs(namespace)).To(HaveLen(1))
Expect(*listJobs(namespace)[0].Spec.Suspend).To(BeTrue())
})
})
})

Context("and the job is completed", func() {
Expand Down

0 comments on commit 4c6d1db

Please sign in to comment.