@@ -22,12 +22,15 @@ import (
2222 "fmt"
2323 "reflect"
2424 "strconv"
25+ "strings"
2526 "time"
2627
2728 apierrors "k8s.io/apimachinery/pkg/api/errors"
2829 "k8s.io/apimachinery/pkg/api/meta"
2930 metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3031 "k8s.io/apimachinery/pkg/types"
32+ utilerrors "k8s.io/apimachinery/pkg/util/errors"
33+ "k8s.io/apimachinery/pkg/util/intstr"
3134 "k8s.io/klog/v2"
3235 "sigs.k8s.io/controller-runtime/pkg/client"
3336
@@ -67,8 +70,12 @@ func (r *Reconciler) execute(
 
 	updateRunStatus := updateRun.GetUpdateRunStatus()
 	if updatingStageIndex < len(updateRunStatus.StagesStatus) {
+		maxConcurrency, err := calculateMaxConcurrencyValue(updateRunStatus, updatingStageIndex)
+		if err != nil {
+			return false, 0, err
+		}
 		updatingStage := &updateRunStatus.StagesStatus[updatingStageIndex]
-		waitTime, execErr := r.executeUpdatingStage(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings)
+		waitTime, execErr := r.executeUpdatingStage(ctx, updateRun, updatingStageIndex, toBeUpdatedBindings, maxConcurrency)
 		if errors.Is(execErr, errStagedUpdatedAborted) {
 			markStageUpdatingFailed(updatingStage, updateRun.GetGeneration(), execErr.Error())
 			return true, waitTime, execErr
@@ -91,6 +98,7 @@ func (r *Reconciler) executeUpdatingStage(
 	updateRun placementv1beta1.UpdateRunObj,
 	updatingStageIndex int,
 	toBeUpdatedBindings []placementv1beta1.BindingObj,
+	maxConcurrency int,
 ) (time.Duration, error) {
 	updateRunStatus := updateRun.GetUpdateRunStatus()
 	updateRunSpec := updateRun.GetUpdateRunSpec()
@@ -105,25 +113,30 @@ func (r *Reconciler) executeUpdatingStage(
 		bindingSpec := binding.GetBindingSpec()
 		toBeUpdatedBindingsMap[bindingSpec.TargetCluster] = binding
 	}
-	finishedClusterCount := 0
 
-	// Go through each cluster in the stage and check if it's updated.
-	for i := range updatingStageStatus.Clusters {
+	finishedClusterCount := 0
+	clusterUpdatingCount := 0
+	var stuckClusterNames []string
+	var clusterUpdateErrors []error
+	// Go through each cluster in the stage and check if it's updating/succeeded/failed.
+	for i := 0; i < len(updatingStageStatus.Clusters) && clusterUpdatingCount < maxConcurrency; i++ {
 		clusterStatus := &updatingStageStatus.Clusters[i]
-		clusterStartedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted))
 		clusterUpdateSucceededCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionSucceeded))
-		if condition.IsConditionStatusFalse(clusterUpdateSucceededCond, updateRun.GetGeneration()) {
-			// The cluster is marked as failed to update.
-			failedErr := fmt.Errorf("the cluster `%s` in the stage %s has failed", clusterStatus.ClusterName, updatingStageStatus.StageName)
-			klog.ErrorS(failedErr, "The cluster has failed to be updated", "updateRun", updateRunRef)
-			return 0, fmt.Errorf("%w: %s", errStagedUpdatedAborted, failedErr.Error())
-		}
 		if condition.IsConditionStatusTrue(clusterUpdateSucceededCond, updateRun.GetGeneration()) {
 			// The cluster has been updated successfully.
 			finishedClusterCount++
 			continue
 		}
-		// The cluster is either updating or not started yet.
+		clusterUpdatingCount++
+		if condition.IsConditionStatusFalse(clusterUpdateSucceededCond, updateRun.GetGeneration()) {
+			// The cluster is marked as failed to update. It still counts as an updating cluster since it has not finished, so that no more than maxConcurrency clusters are processed in this round.
+			failedErr := fmt.Errorf("the cluster `%s` in the stage %s has failed", clusterStatus.ClusterName, updatingStageStatus.StageName)
+			klog.ErrorS(failedErr, "The cluster has failed to be updated", "updateRun", updateRunRef)
+			clusterUpdateErrors = append(clusterUpdateErrors, fmt.Errorf("%w: %s", errStagedUpdatedAborted, failedErr.Error()))
+			continue
+		}
+		// The cluster needs to be processed.
+		clusterStartedCond := meta.FindStatusCondition(clusterStatus.Conditions, string(placementv1beta1.ClusterUpdatingConditionStarted))
 		binding := toBeUpdatedBindingsMap[clusterStatus.ClusterName]
 		if !condition.IsConditionStatusTrue(clusterStartedCond, updateRun.GetGeneration()) {
 			// The cluster has not started updating yet.
@@ -138,11 +151,13 @@ func (r *Reconciler) executeUpdatingStage(
 				bindingSpec.ApplyStrategy = updateRunStatus.ApplyStrategy
 				if err := r.Client.Update(ctx, binding); err != nil {
 					klog.ErrorS(err, "Failed to update binding to be bound with the matching spec of the updateRun", "binding", klog.KObj(binding), "updateRun", updateRunRef)
-					return 0, controller.NewUpdateIgnoreConflictError(err)
+					clusterUpdateErrors = append(clusterUpdateErrors, controller.NewUpdateIgnoreConflictError(err))
+					continue
 				}
 				klog.V(2).InfoS("Updated the status of a binding to bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
 				if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil {
-					return 0, err
+					clusterUpdateErrors = append(clusterUpdateErrors, err)
+					continue
 				}
 			} else {
 				klog.V(2).InfoS("Found the first binding that is updating but the cluster status has not been updated", "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
@@ -151,29 +166,33 @@ func (r *Reconciler) executeUpdatingStage(
 					bindingSpec.State = placementv1beta1.BindingStateBound
 					if err := r.Client.Update(ctx, binding); err != nil {
 						klog.ErrorS(err, "Failed to update a binding to be bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
-						return 0, controller.NewUpdateIgnoreConflictError(err)
+						clusterUpdateErrors = append(clusterUpdateErrors, controller.NewUpdateIgnoreConflictError(err))
+						continue
 					}
 					klog.V(2).InfoS("Updated the status of a binding to bound", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
 					if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil {
-						return 0, err
+						clusterUpdateErrors = append(clusterUpdateErrors, err)
+						continue
 					}
 				} else if !condition.IsConditionStatusTrue(meta.FindStatusCondition(binding.GetBindingStatus().Conditions, string(placementv1beta1.ResourceBindingRolloutStarted)), binding.GetGeneration()) {
 					klog.V(2).InfoS("The binding is bound and up-to-date but the generation is updated by the scheduler, update rolloutStarted status again", "binding", klog.KObj(binding), "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
 					if err := r.updateBindingRolloutStarted(ctx, binding, updateRun); err != nil {
-						return 0, err
+						clusterUpdateErrors = append(clusterUpdateErrors, err)
+						continue
 					}
 				} else {
 					if _, updateErr := checkClusterUpdateResult(binding, clusterStatus, updatingStageStatus, updateRun); updateErr != nil {
-						return clusterUpdatingWaitTime, updateErr
+						clusterUpdateErrors = append(clusterUpdateErrors, updateErr)
+						continue
 					}
 				}
 			}
 			markClusterUpdatingStarted(clusterStatus, updateRun.GetGeneration())
 			if finishedClusterCount == 0 {
 				markStageUpdatingStarted(updatingStageStatus, updateRun.GetGeneration())
 			}
-			// No need to continue as we only support one cluster updating at a time for now.
-			return clusterUpdatingWaitTime, nil
+			// Continue to the next cluster so that up to maxConcurrency clusters are processed in this round.
+			continue
 		}
 
 		// Now the cluster has to be updating, the binding should point to the right resource snapshot and the binding should be bound.
@@ -190,24 +209,35 @@ func (r *Reconciler) executeUpdatingStage(
190209 "bindingSpecInSync" , inSync , "bindingState" , bindingSpec .State ,
191210 "bindingRolloutStarted" , rolloutStarted , "binding" , klog .KObj (binding ), "updateRun" , updateRunRef )
192211 markClusterUpdatingFailed (clusterStatus , updateRun .GetGeneration (), preemptedErr .Error ())
193- return 0 , fmt .Errorf ("%w: %s" , errStagedUpdatedAborted , preemptedErr .Error ())
212+ clusterUpdateErrors = append (clusterUpdateErrors , fmt .Errorf ("%w: %s" , errStagedUpdatedAborted , preemptedErr .Error ()))
213+ continue
194214 }
195215
196216 finished , updateErr := checkClusterUpdateResult (binding , clusterStatus , updatingStageStatus , updateRun )
217+ if updateErr != nil {
218+ clusterUpdateErrors = append (clusterUpdateErrors , updateErr )
219+ }
197220 if finished {
198221 finishedClusterCount ++
199- markUpdateRunProgressing ( updateRun )
200- continue
+			// The cluster has finished successfully, so another cluster can be processed in this round.
+			clusterUpdatingCount--
 		} else {
 			// If cluster update has been running for more than "updateRunStuckThreshold", mark the update run as stuck.
 			timeElapsed := time.Since(clusterStartedCond.LastTransitionTime.Time)
 			if timeElapsed > updateRunStuckThreshold {
 				klog.V(2).InfoS("Time waiting for cluster update to finish passes threshold, mark the update run as stuck", "time elapsed", timeElapsed, "threshold", updateRunStuckThreshold, "cluster", clusterStatus.ClusterName, "stage", updatingStageStatus.StageName, "updateRun", updateRunRef)
-				markUpdateRunStuck(updateRun, updatingStageStatus.StageName, clusterStatus.ClusterName)
+				stuckClusterNames = append(stuckClusterNames, clusterStatus.ClusterName)
 			}
 		}
-		// No need to continue as we only support one cluster updating at a time for now.
-		return clusterUpdatingWaitTime, updateErr
+	}
+
+	// After processing up to maxConcurrency clusters, check whether the update run needs to be marked as stuck or progressing.
+	aggregateUpdateRunStatus(updateRun, updatingStageStatus.StageName, stuckClusterNames)
+
+	// Aggregate and return errors.
+	if len(clusterUpdateErrors) > 0 {
+		// Even though the errors are aggregated, the caller can still use errors.Is to check whether any of them is a staged update aborted error.
+		return 0, utilerrors.NewAggregate(clusterUpdateErrors)
 	}
 
 	if finishedClusterCount == len(updatingStageStatus.Clusters) {
@@ -232,6 +262,7 @@ func (r *Reconciler) executeUpdatingStage(
 		}
 		return waitTime, nil
 	}
+	// Some clusters are still updating.
 	return clusterUpdatingWaitTime, nil
 }
 
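Since the per-cluster failures are now collected and returned as one aggregate, the errors.Is(execErr, errStagedUpdatedAborted) check in execute keeps working: apimachinery's Aggregate matches a target against every wrapped error. A minimal standalone sketch of that behavior, assuming a reasonably recent apimachinery; the sentinel message and cluster names below are illustrative, not the ones defined in this file:

package main

import (
	"errors"
	"fmt"

	utilerrors "k8s.io/apimachinery/pkg/util/errors"
)

// Illustrative stand-in for the errStagedUpdatedAborted sentinel used in this file.
var errStagedUpdatedAborted = errors.New("cannot continue the staged update run")

func main() {
	// Simulate the failures collected from one round of cluster processing,
	// one of which wraps the abort sentinel.
	clusterUpdateErrors := []error{
		fmt.Errorf("%w: the cluster `member-1` in the stage canary has failed", errStagedUpdatedAborted),
		errors.New("failed to update the binding for cluster `member-2`"),
	}

	aggErr := utilerrors.NewAggregate(clusterUpdateErrors)

	// Aggregate implements Is by visiting every wrapped error, so the caller's
	// errors.Is check still detects the abort sentinel inside the aggregate.
	fmt.Println(errors.Is(aggErr, errStagedUpdatedAborted)) // true
}
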
@@ -431,6 +462,35 @@ func (r *Reconciler) updateApprovalRequestAccepted(ctx context.Context, appReq p
 	return nil
 }
 
+// calculateMaxConcurrencyValue calculates the actual max concurrency value for a stage.
+// It converts the IntOrString maxConcurrency (which can be an integer or percentage) to an integer value
+// based on the total number of clusters in the stage. The value is rounded down, with a minimum of 1.
+func calculateMaxConcurrencyValue(status *placementv1beta1.UpdateRunStatus, stageIndex int) (int, error) {
+	specifiedMaxConcurrency := status.UpdateStrategySnapshot.Stages[stageIndex].MaxConcurrency
+	clusterCount := len(status.StagesStatus[stageIndex].Clusters)
+	// Scale maxConcurrency against the number of clusters in the stage, rounding down.
+	maxConcurrencyValue, err := intstr.GetScaledValueFromIntOrPercent(specifiedMaxConcurrency, clusterCount, false)
+	if err != nil {
+		return 0, err
+	}
+	// Handle the case where maxConcurrency is specified as a percentage but scales down to 0.
+	if maxConcurrencyValue == 0 {
+		maxConcurrencyValue = 1
+	}
+	return maxConcurrencyValue, nil
+}
+
+// aggregateUpdateRunStatus aggregates the status of the update run based on the cluster update status.
+// It marks the update run as stuck if any cluster is stuck; otherwise it marks the update run as progressing.
+func aggregateUpdateRunStatus(updateRun placementv1beta1.UpdateRunObj, stageName string, stuckClusterNames []string) {
+	if len(stuckClusterNames) > 0 {
+		markUpdateRunStuck(updateRun, stageName, strings.Join(stuckClusterNames, ", "))
+	} else {
+		// If there is no stuck cluster but some progress has been made, mark the update run as progressing.
+		markUpdateRunProgressing(updateRun)
+	}
+}
+
 // isBindingSyncedWithClusterStatus checks if the binding is up-to-date with the cluster status.
 func isBindingSyncedWithClusterStatus(resourceSnapshotName string, updateRun placementv1beta1.UpdateRunObj, binding placementv1beta1.BindingObj, cluster *placementv1beta1.ClusterUpdatingStatus) bool {
 	bindingSpec := binding.GetBindingSpec()
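For reference, the scaling behavior that calculateMaxConcurrencyValue above relies on: intstr.GetScaledValueFromIntOrPercent returns integer values unchanged and scales percentage values against the total, rounding down when the last argument is false, which is why the result is clamped to a minimum of 1 afterwards. A small illustrative sketch, assuming a recent apimachinery; the cluster count and percentages are made up:

package main

import (
	"fmt"

	"k8s.io/apimachinery/pkg/util/intstr"
)

func main() {
	clusterCount := 7

	// An integer maxConcurrency is returned as-is, regardless of the cluster count.
	asInt := intstr.FromInt32(3)
	v, _ := intstr.GetScaledValueFromIntOrPercent(&asInt, clusterCount, false)
	fmt.Println(v) // 3

	// A percentage is scaled against the cluster count and rounded down.
	half := intstr.FromString("50%")
	v, _ = intstr.GetScaledValueFromIntOrPercent(&half, clusterCount, false)
	fmt.Println(v) // 3 (50% of 7, rounded down)

	// A small percentage can scale down to 0, which calculateMaxConcurrencyValue
	// clamps to 1 so that at least one cluster is updated per round.
	small := intstr.FromString("10%")
	v, _ = intstr.GetScaledValueFromIntOrPercent(&small, clusterCount, false)
	fmt.Println(v) // 0 before clamping
}
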
@@ -544,14 +604,14 @@ func markUpdateRunProgressingIfNotWaitingOrStuck(updateRun placementv1beta1.Upda
 }
 
 // markUpdateRunStuck marks the updateRun as stuck in memory.
-func markUpdateRunStuck(updateRun placementv1beta1.UpdateRunObj, stageName, clusterName string) {
+func markUpdateRunStuck(updateRun placementv1beta1.UpdateRunObj, stageName, clusterNames string) {
 	updateRunStatus := updateRun.GetUpdateRunStatus()
 	meta.SetStatusCondition(&updateRunStatus.Conditions, metav1.Condition{
 		Type:               string(placementv1beta1.StagedUpdateRunConditionProgressing),
 		Status:             metav1.ConditionFalse,
 		ObservedGeneration: updateRun.GetGeneration(),
 		Reason:             condition.UpdateRunStuckReason,
-		Message:            fmt.Sprintf("The updateRun is stuck waiting for cluster %s in stage %s to finish updating, please check placement status for potential errors", clusterName, stageName),
+		Message:            fmt.Sprintf("The updateRun is stuck waiting for cluster(s) %s in stage %s to finish updating, please check placement status for potential errors", clusterNames, stageName),
 	})
 }
 