This repository has been archived by the owner on Sep 12, 2023. It is now read-only.

Commit

Skip some logic for terminated jobs and add a PodGroup reconcile loop (#93)

* Skip the activeDeadline and backoffLimit checks if the job has terminated

This is originally from kubeflow/training-operator#1111

Signed-off-by: Jiaxin Shan <[email protected]>
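
For context, the control flow this bullet describes is: once a job has already succeeded or failed, ReconcileJobs cleans up and returns early, so the activeDeadline and backoffLimit checks further down are never evaluated for terminated jobs. Below is a minimal, self-contained Go sketch of that flow; JobStatus, isTerminated, and reconcile are simplified stand-ins for illustration, not the kubeflow/common API.

package main

import "fmt"

// JobStatus is a simplified stand-in for the real job status type.
type JobStatus struct {
	Phase string // "Succeeded", "Failed", or "Running"
}

func isTerminated(s JobStatus) bool {
	return s.Phase == "Succeeded" || s.Phase == "Failed"
}

// reconcile mirrors the intended control flow: terminated jobs are cleaned up
// and returned early, so the backoffLimit/activeDeadline handling only ever
// runs for jobs that are still active.
func reconcile(s JobStatus) error {
	if isTerminated(s) {
		fmt.Println("terminated: delete pods/services, finalize status, return early")
		return nil
	}
	fmt.Println("active: check backoffLimit/activeDeadline, then reconcile pods and services")
	return nil
}

func main() {
	_ = reconcile(JobStatus{Phase: "Succeeded"})
	_ = reconcile(JobStatus{Phase: "Running"})
}

The diff below implements this by handling IsSucceeded/IsFailed right after the pods and services are listed, and returning before the retry bookkeeping starts.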

* Add PodGroup reconcile logic

This is missing in kubeflow/common. We need it to make sure minAvailableReplicas is correct in the PodGroup for each training job.

Signed-off-by: Jiaxin Shan <[email protected]>
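
The reconcile step this bullet adds runs on the non-terminated path when gang scheduling is enabled: the job's PodGroup is created or re-synced so that minAvailable tracks the job's total replica count (minAvailableReplicas := totalReplicas in the diff below). Here is a rough, self-contained sketch of that idea; PodGroup, the in-memory store, and syncPodGroup are simplified stand-ins, not the real SyncPodGroup implementation.

package main

import "fmt"

// PodGroup is a simplified stand-in for the scheduler's PodGroup object.
type PodGroup struct {
	Name         string
	MinAvailable int32
}

// syncPodGroup creates the PodGroup if it does not exist yet, and otherwise
// reconciles MinAvailable back to the desired total replica count.
func syncPodGroup(store map[string]*PodGroup, jobName string, totalReplicas int32) *PodGroup {
	pg, ok := store[jobName]
	if !ok {
		pg = &PodGroup{Name: jobName, MinAvailable: totalReplicas}
		store[jobName] = pg
		return pg
	}
	if pg.MinAvailable != totalReplicas {
		// e.g. the job spec was scaled since the PodGroup was first created
		pg.MinAvailable = totalReplicas
	}
	return pg
}

func main() {
	store := map[string]*PodGroup{}
	pg := syncPodGroup(store, "tfjob-example", 4)
	fmt.Printf("created %s with minAvailable=%d\n", pg.Name, pg.MinAvailable)

	pg = syncPodGroup(store, "tfjob-example", 6)
	fmt.Printf("reconciled %s to minAvailable=%d\n", pg.Name, pg.MinAvailable)
}

In the actual controller the PodGroup lives in the cluster rather than in a map, but the invariant is the same: every reconcile pass re-asserts minAvailable = totalReplicas.
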
Jeffwan authored May 17, 2020
1 parent 4d01e68 commit a63fa3d
Showing 1 changed file with 70 additions and 36 deletions.
106 changes: 70 additions & 36 deletions pkg/controller.v1/common/job.go
@@ -91,8 +91,6 @@ func (jc *JobController) ReconcileJobs(
 	}
 	log.Infof("Reconciling for job %s", metaObject.GetName())
 
-	oldStatus := jobStatus.DeepCopy()
-
 	pods, err := jc.Controller.GetPodsForJob(job)
 	if err != nil {
 		log.Warnf("GetPodsForJob error %v", err)
@@ -105,6 +103,45 @@
 		return err
 	}
 
+	oldStatus := jobStatus.DeepCopy()
+	if commonutil.IsSucceeded(jobStatus) || commonutil.IsFailed(jobStatus) {
+		// If the Job is succeed or failed, delete all pods and services.
+		if err := jc.DeletePodsAndServices(runPolicy, job, pods); err != nil {
+			return err
+		}
+
+		if err := jc.CleanupJob(runPolicy, jobStatus, job); err != nil {
+			return err
+		}
+
+		if jc.Config.EnableGangScheduling {
+			jc.Recorder.Event(runtimeObject, v1.EventTypeNormal, "JobTerminated", "Job has been terminated. Deleting PodGroup")
+			if err := jc.DeletePodGroup(metaObject); err != nil {
+				jc.Recorder.Eventf(runtimeObject, v1.EventTypeWarning, "FailedDeletePodGroup", "Error deleting: %v", err)
+				return err
+			} else {
+				jc.Recorder.Eventf(runtimeObject, v1.EventTypeNormal, "SuccessfulDeletePodGroup", "Deleted PodGroup: %v", jobName)
+			}
+		}
+
+		// At this point the pods may have been deleted.
+		// 1) If the job succeeded, we manually set the replica status.
+		// 2) If any replicas are still active, set their status to succeeded.
+		if commonutil.IsSucceeded(jobStatus) {
+			for rtype := range jobStatus.ReplicaStatuses {
+				jobStatus.ReplicaStatuses[rtype].Succeeded += jobStatus.ReplicaStatuses[rtype].Active
+				jobStatus.ReplicaStatuses[rtype].Active = 0
+			}
+		}
+
+		// No need to update the job status if the status hasn't changed since last time.
+		if !reflect.DeepEqual(*oldStatus, jobStatus) {
+			return jc.Controller.UpdateJobStatusInApiServer(job, &jobStatus)
+		}
+
+		return nil
+	}
+
 	// retrieve the previous number of retry
 	previousRetry := jc.WorkQueue.NumRequeues(jobKey)
 
@@ -143,8 +180,9 @@ func (jc *JobController) ReconcileJobs(
 		jobExceedsLimit = true
 	}
 
-	// If the Job is terminated, delete all pods and services.
-	if commonutil.IsSucceeded(jobStatus) || commonutil.IsFailed(jobStatus) || jobExceedsLimit {
+	if jobExceedsLimit {
+		// If the Job exceeds backoff limit or is past active deadline
+		// delete all pods and services, then set the status to failed
 		if err := jc.DeletePodsAndServices(runPolicy, job, pods); err != nil {
 			return err
 		}
@@ -160,48 +198,44 @@
 				return err
 			} else {
 				jc.Recorder.Eventf(runtimeObject, v1.EventTypeNormal, "SuccessfulDeletePodGroup", "Deleted PodGroup: %v", jobName)
-
 			}
 		}
 
-		if jobExceedsLimit {
-			jc.Recorder.Event(runtimeObject, v1.EventTypeNormal, commonutil.JobFailedReason, failureMessage)
-			if jobStatus.CompletionTime == nil {
-				now := metav1.Now()
-				jobStatus.CompletionTime = &now
-			}
-			err := commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, commonutil.JobFailedReason, failureMessage)
-			if err != nil {
-				log.Infof("Append job condition error: %v", err)
-				return err
-			}
+		jc.Recorder.Event(runtimeObject, v1.EventTypeNormal, commonutil.JobFailedReason, failureMessage)
+		if jobStatus.CompletionTime == nil {
+			now := metav1.Now()
+			jobStatus.CompletionTime = &now
+		}
+		if err := commonutil.UpdateJobConditions(&jobStatus, apiv1.JobFailed, commonutil.JobFailedReason, failureMessage); err != nil {
+			log.Infof("Append job condition error: %v", err)
+			return err
 		}
 
-		// At this point the pods may have been deleted.
-		// 1) If the job succeeded, we manually set the replica status.
-		// 2) If any replicas are still active, set their status to succeeded.
-		if commonutil.IsSucceeded(jobStatus) {
-			for rtype := range jobStatus.ReplicaStatuses {
-				jobStatus.ReplicaStatuses[rtype].Succeeded += jobStatus.ReplicaStatuses[rtype].Active
-				jobStatus.ReplicaStatuses[rtype].Active = 0
+		return jc.Controller.UpdateJobStatusInApiServer(job, &jobStatus)
+	} else {
+		// General cases which need to reconcile
+		if jc.Config.EnableGangScheduling {
+			minAvailableReplicas := totalReplicas
+			_, err := jc.SyncPodGroup(metaObject, minAvailableReplicas)
+			if err != nil {
+				log.Warnf("Sync PodGroup %v: %v", jobKey, err)
 			}
 		}
-		return jc.Controller.UpdateJobStatusInApiServer(job, &jobStatus)
-	}
 
-	// Diff current active pods/services with replicas.
-	for rtype, spec := range replicas {
-		err := jc.Controller.ReconcilePods(metaObject, &jobStatus, pods, rtype, spec, replicas)
-		if err != nil {
-			log.Warnf("ReconcilePods error %v", err)
-			return err
-		}
+		// Diff current active pods/services with replicas.
+		for rtype, spec := range replicas {
+			err := jc.Controller.ReconcilePods(metaObject, &jobStatus, pods, rtype, spec, replicas)
+			if err != nil {
+				log.Warnf("ReconcilePods error %v", err)
+				return err
+			}
 
-		err = jc.Controller.ReconcileServices(metaObject, services, rtype, spec)
+			err = jc.Controller.ReconcileServices(metaObject, services, rtype, spec)
 
-		if err != nil {
-			log.Warnf("ReconcileServices error %v", err)
-			return err
+			if err != nil {
+				log.Warnf("ReconcileServices error %v", err)
+				return err
+			}
 		}
 	}
 