@@ -28,12 +28,20 @@ import (
28
28
apierrors "k8s.io/apimachinery/pkg/api/errors"
29
29
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
30
30
"k8s.io/apimachinery/pkg/labels"
31
+ "k8s.io/apimachinery/pkg/types"
32
+ "sigs.k8s.io/controller-runtime/pkg/builder"
31
33
"sigs.k8s.io/controller-runtime/pkg/client"
34
+ "sigs.k8s.io/controller-runtime/pkg/handler"
32
35
"sigs.k8s.io/controller-runtime/pkg/manager"
36
+ "sigs.k8s.io/controller-runtime/pkg/reconcile"
33
37
"slices"
34
38
"time"
35
39
)
36
40
41
+ const (
42
+ waitCacheTimeout = 10 * time .Second
43
+ )
44
+
37
45
// Actor reconciles CN Claim
38
46
type Actor struct {
39
47
clientMgr * hacli.HAKeeperClientManager
@@ -146,7 +154,7 @@ func (r *Actor) doClaimCN(ctx *recon.Context[*v1alpha1.CNClaim], orphans []corev
146
154
return nil , errors .Wrap (err , "error list idle Pods" )
147
155
}
148
156
149
- slices . SortFunc ( idleCNs , priorityFunc ( c ) )
157
+ sortCNByPriority ( c , idleCNs )
150
158
for i := range idleCNs {
151
159
pod := & idleCNs [i ]
152
160
pod .Labels [v1alpha1 .CNPodPhaseLabel ] = v1alpha1 .CNPodPhaseBound
@@ -191,7 +199,32 @@ func (r *Actor) bindPod(ctx *recon.Context[*v1alpha1.CNClaim], pod *corev1.Pod,
191
199
}
192
200
193
201
func (r * Actor ) Sync (ctx * recon.Context [* v1alpha1.CNClaim ]) error {
194
- // TODO: monitor pod health
202
+ c := ctx .Obj
203
+ switch c .Status .Phase {
204
+ case v1alpha1 .CNClaimPhasePending :
205
+ return errors .Errorf ("CN Claim %s/%s is pending, should bind it first" , c .Namespace , c .Name )
206
+ case v1alpha1 .CNClaimPhaseLost :
207
+ return nil
208
+ case v1alpha1 .CNClaimPhaseBound , v1alpha1 .CNClaimPhaseOutdated :
209
+ // noop
210
+ default :
211
+ return errors .Errorf ("CN Claim %s/%s is in unknown phase %s" , c .Namespace , c .Name , c .Status .Phase )
212
+ }
213
+ pod := & corev1.Pod {}
214
+ err := ctx .Get (types.NamespacedName {Namespace : c .Namespace , Name : c .Spec .PodName }, pod )
215
+ if err != nil {
216
+ if apierrors .IsNotFound (err ) {
217
+ if c .Status .BoundTime != nil && time .Since (c .Status .BoundTime .Time ) < waitCacheTimeout {
218
+ return recon .ErrReSync ("pod status may be not update to date, wait" , waitCacheTimeout )
219
+ }
220
+ c .Status .Phase = v1alpha1 .CNClaimPhaseLost
221
+ return nil
222
+ }
223
+ }
224
+ if pod .Status .Phase == corev1 .PodSucceeded || pod .Status .Phase == corev1 .PodUnknown {
225
+ c .Status .Phase = v1alpha1 .CNClaimPhaseLost
226
+ return nil
227
+ }
195
228
return nil
196
229
}
197
230
@@ -223,7 +256,7 @@ func (r *Actor) Finalize(ctx *recon.Context[*v1alpha1.CNClaim]) (bool, error) {
223
256
// set the CN Pod to draining phase and let the draining process handle recycling
224
257
if err := ctx .Patch (& cn , func () error {
225
258
cn .Labels [v1alpha1 .CNPodPhaseLabel ] = v1alpha1 .CNPodPhaseDraining
226
- delete (cn .Labels , v1alpha1 .ClaimOwnerNameLabel )
259
+ delete (cn .Labels , v1alpha1 .PodClaimedByLabel )
227
260
if cn .Annotations == nil {
228
261
cn .Annotations = map [string ]string {}
229
262
}
@@ -268,7 +301,24 @@ func (r *Actor) patchStore(ctx *recon.Context[*v1alpha1.CNClaim], pod *corev1.Po
268
301
}
269
302
270
303
func (r * Actor ) Start (mgr manager.Manager ) error {
271
- return recon .Setup [* v1alpha1.CNClaim ](& v1alpha1.CNClaim {}, "cn-claim-manager" , mgr , r )
304
+ return recon .Setup [* v1alpha1.CNClaim ](& v1alpha1.CNClaim {}, "cn-claim-manager" , mgr , r , recon .WithBuildFn (func (b * builder.Builder ) {
305
+ b .Watches (& corev1.Pod {}, handler .EnqueueRequestsFromMapFunc (func (ctx context.Context , object client.Object ) []reconcile.Request {
306
+ pod , ok := object .(* corev1.Pod )
307
+ if ! ok {
308
+ return nil
309
+ }
310
+ claimName , ok := pod .Labels [v1alpha1 .PodClaimedByLabel ]
311
+ if ! ok {
312
+ return nil
313
+ }
314
+ return []reconcile.Request {{
315
+ NamespacedName : types.NamespacedName {
316
+ Namespace : pod .Namespace ,
317
+ Name : claimName ,
318
+ },
319
+ }}
320
+ }))
321
+ }))
272
322
}
273
323
274
324
func toStoreStatus (cn * metadata.CNService ) v1alpha1.CNStoreStatus {
@@ -293,6 +343,10 @@ func toStoreStatus(cn *metadata.CNService) v1alpha1.CNStoreStatus {
293
343
}
294
344
}
295
345
346
+ func sortCNByPriority (c * v1alpha1.CNClaim , pods []corev1.Pod ) {
347
+ slices .SortFunc (pods , priorityFunc (c ))
348
+ }
349
+
296
350
func priorityFunc (c * v1alpha1.CNClaim ) func (a , b corev1.Pod ) int {
297
351
return func (a , b corev1.Pod ) int {
298
352
// 1. claim the previously used pod first
0 commit comments