Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
69 changes: 46 additions & 23 deletions pkg/controllers/updaterun/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,15 +104,29 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
// Emit the update run status metric based on status conditions in the updateRun.
defer emitUpdateRunStatusMetric(updateRun)

state := updateRun.GetUpdateRunSpec().State

var updatingStageIndex int
var toBeUpdatedBindings, toBeDeletedBindings []placementv1beta1.BindingObj
updateRunStatus := updateRun.GetUpdateRunStatus()
initCond := meta.FindStatusCondition(updateRunStatus.Conditions, string(placementv1beta1.StagedUpdateRunConditionInitialized))
if !condition.IsConditionStatusTrue(initCond, updateRun.GetGeneration()) {
// Check if initialized regardless of generation.
// The updateRun spec fields are immutable except for the state field. When the state changes,
// the update run generation increments, but we don't need to reinitialize since initialization is a one-time setup.
if !(initCond != nil && initCond.Status == metav1.ConditionTrue) {
// Check if initialization failed for the current generation.
if condition.IsConditionStatusFalse(initCond, updateRun.GetGeneration()) {
klog.V(2).InfoS("The updateRun has failed to initialize", "errorMsg", initCond.Message, "updateRun", runObjRef)
return runtime.Result{}, nil
}

if initCond == nil {
// Update the status to indicate that the updateRun is initializing.
// Requeue immediately to continue with initialization.
klog.V(2).InfoS("The updateRun is initializing", "state", state, "updateRun", runObjRef)
return runtime.Result{RequeueAfter: utils.DefaultRequeueAfterDuration}, r.recordUpdateRunInitializing(ctx, updateRun)
}

var initErr error
if toBeUpdatedBindings, toBeDeletedBindings, initErr = r.initialize(ctx, updateRun); initErr != nil {
klog.ErrorS(initErr, "Failed to initialize the updateRun", "updateRun", runObjRef)
Expand All @@ -122,10 +136,10 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
}
return runtime.Result{}, initErr
}
updatingStageIndex = 0 // start from the first stage.
klog.V(2).InfoS("Initialized the updateRun", "updateRun", runObjRef)
updatingStageIndex = 0 // start from the first stage (typically for Initialize or Execute states).
klog.V(2).InfoS("Initialized the updateRun", "state", state, "updateRun", runObjRef)
} else {
klog.V(2).InfoS("The updateRun is initialized", "updateRun", runObjRef)
klog.V(2).InfoS("The updateRun is initialized", "state", state, "updateRun", runObjRef)
// Check if the updateRun is finished.
finishedCond := meta.FindStatusCondition(updateRunStatus.Conditions, string(placementv1beta1.StagedUpdateRunConditionSucceeded))
if condition.IsConditionStatusTrue(finishedCond, updateRun.GetGeneration()) || condition.IsConditionStatusFalse(finishedCond, updateRun.GetGeneration()) {
Expand All @@ -151,28 +165,37 @@ func (r *Reconciler) Reconcile(ctx context.Context, req runtime.Request) (runtim
}

// Execute the updateRun.
klog.V(2).InfoS("Continue to execute the updateRun", "updatingStageIndex", updatingStageIndex, "updateRun", runObjRef)
finished, waitTime, execErr := r.execute(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings, toBeDeletedBindings)
if errors.Is(execErr, errStagedUpdatedAborted) {
// errStagedUpdatedAborted cannot be retried.
return runtime.Result{}, r.recordUpdateRunFailed(ctx, updateRun, execErr.Error())
}
if state == placementv1beta1.StateStarted {
klog.V(2).InfoS("Continue to execute the updateRun", "state", state, "updatingStageIndex", updatingStageIndex, "updateRun", runObjRef)
finished, waitTime, execErr := r.execute(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings, toBeDeletedBindings)
if errors.Is(execErr, errStagedUpdatedAborted) {
// errStagedUpdatedAborted cannot be retried.
return runtime.Result{}, r.recordUpdateRunFailed(ctx, updateRun, execErr.Error())
}

if finished {
klog.V(2).InfoS("The updateRun is completed", "updateRun", runObjRef)
return runtime.Result{}, r.recordUpdateRunSucceeded(ctx, updateRun)
}
if finished {
klog.V(2).InfoS("The updateRun is completed", "updateRun", runObjRef)
return runtime.Result{}, r.recordUpdateRunSucceeded(ctx, updateRun)
}

// The execution is not finished yet or it encounters a retriable error.
// We need to record the status and requeue.
if updateErr := r.recordUpdateRunStatus(ctx, updateRun); updateErr != nil {
return runtime.Result{}, updateErr
}
klog.V(2).InfoS("The updateRun is not finished yet", "requeueWaitTime", waitTime, "execErr", execErr, "updateRun", runObjRef)
if execErr != nil {
return runtime.Result{}, execErr
// The execution is not finished yet or it encounters a retriable error.
// We need to record the status and requeue.
if updateErr := r.recordUpdateRunStatus(ctx, updateRun); updateErr != nil {
return runtime.Result{}, updateErr
}
klog.V(2).InfoS("The updateRun is not finished yet", "requeueWaitTime", waitTime, "execErr", execErr, "updateRun", runObjRef)
if execErr != nil {
return runtime.Result{}, execErr
}
if waitTime == 0 {
// The update run is not finished yet, so a zero waitTime must be replaced with the default requeue duration:
// we rely on RequeueAfter only, since Requeue is deprecated.
return runtime.Result{RequeueAfter: utils.DefaultRequeueAfterDuration}, nil
}
return runtime.Result{RequeueAfter: waitTime}, nil
}
return runtime.Result{Requeue: true, RequeueAfter: waitTime}, nil
klog.V(2).InfoS("The updateRun is not started, waiting to be started", "state", state, "updateRun", runObjRef)
return runtime.Result{}, nil
}

// handleDelete handles the deletion of the updateRun object.
Expand Down
21 changes: 21 additions & 0 deletions pkg/controllers/updaterun/controller_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -272,6 +272,16 @@ func generateMetricsLabels(
}
}

// generateInitializationSucceededMetric builds a metric for the given updateRun whose labels
// describe the Initialized condition with status True and the initialize-succeeded reason.
// The gauge value is the current wall-clock time expressed in seconds.
func generateInitializationSucceededMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
	// Labels are computed first, before the timestamp is sampled.
	labels := generateMetricsLabels(
		updateRun,
		string(placementv1beta1.StagedUpdateRunConditionInitialized),
		string(metav1.ConditionTrue),
		condition.UpdateRunInitializeSucceededReason,
	)
	// Convert the nanosecond timestamp into (fractional) seconds.
	nowInSeconds := float64(time.Now().UnixNano()) / 1e9
	gauge := &prometheusclientmodel.Gauge{Value: ptr.To(nowInSeconds)}
	return &prometheusclientmodel.Metric{Label: labels, Gauge: gauge}
}

func generateInitializationFailedMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
return &prometheusclientmodel.Metric{
Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionInitialized),
Expand All @@ -282,6 +292,16 @@ func generateInitializationFailedMetric(updateRun *placementv1beta1.ClusterStage
}
}

// generateInitializationUnknownMetric builds a metric for the given updateRun whose labels
// describe the Initialized condition with status Unknown and the initializing reason.
// The gauge value is the current wall-clock time expressed in seconds.
func generateInitializationUnknownMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
	// Labels are computed first, before the timestamp is sampled.
	labels := generateMetricsLabels(
		updateRun,
		string(placementv1beta1.StagedUpdateRunConditionInitialized),
		string(metav1.ConditionUnknown),
		condition.UpdateRunInitializingReason,
	)
	// Convert the nanosecond timestamp into (fractional) seconds.
	nowInSeconds := float64(time.Now().UnixNano()) / 1e9
	gauge := &prometheusclientmodel.Gauge{Value: ptr.To(nowInSeconds)}
	return &prometheusclientmodel.Metric{Label: labels, Gauge: gauge}
}

func generateProgressingMetric(updateRun *placementv1beta1.ClusterStagedUpdateRun) *prometheusclientmodel.Metric {
return &prometheusclientmodel.Metric{
Label: generateMetricsLabels(updateRun, string(placementv1beta1.StagedUpdateRunConditionProgressing),
Expand Down Expand Up @@ -341,6 +361,7 @@ func generateTestClusterStagedUpdateRun() *placementv1beta1.ClusterStagedUpdateR
PlacementName: testCRPName,
ResourceSnapshotIndex: testResourceSnapshotIndex,
StagedUpdateStrategyName: testUpdateStrategyName,
State: placementv1beta1.StateStarted,
},
}
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/controllers/updaterun/execution.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

placementv1beta1 "github.com/kubefleet-dev/kubefleet/apis/placement/v1beta1"
"github.com/kubefleet-dev/kubefleet/pkg/utils"
bindingutils "github.com/kubefleet-dev/kubefleet/pkg/utils/binding"
"github.com/kubefleet-dev/kubefleet/pkg/utils/condition"
"github.com/kubefleet-dev/kubefleet/pkg/utils/controller"
Expand Down Expand Up @@ -418,7 +419,7 @@ func (r *Reconciler) checkAfterStageTasksStatus(ctx context.Context, updatingSta
}
}
if passed {
afterStageWaitTime = 0
afterStageWaitTime = utils.DefaultRequeueAfterDuration
}
return passed, afterStageWaitTime, nil
}
Expand Down
Loading
Loading