@@ -58,66 +58,68 @@ func (r *CLBBindingReconciler[T]) sync(ctx context.Context, bd T) (result ctrl.R
5858 }
5959 }
6060 // 确保所有端口都已分配且绑定 obj
61- if newResult , err := r .ensureCLBBinding (ctx , bd ); err != nil {
61+ if err := r .ensureCLBBinding (ctx , bd ); err != nil {
6262 errCause := errors .Cause (err )
63+ // 1. 扩容了 lb、或者正在扩容,忽略,因为会自动触发对账。
64+ // 2. 端口不足无法分配、端口池不存在,忽略,因为如果端口池不改正或扩容 lb,无法重试成功。
65+ // 3. lb 被删除或监听器被删除,自动移除了 status 中的记录,需重新入队对账。
6366 switch errCause {
64- case portpool .ErrNewLBCreated , portpool .ErrNewLBCreating : // 扩容了 lb,或者正在扩容,忽略,因为会自动触发对账
67+ case portpool .ErrNewLBCreated , portpool .ErrNewLBCreating , ErrNeedRetry :
68+ return result , nil
69+ case portpool .ErrNoPortAvailable :
70+ r .Recorder .Event (bd .GetObject (), corev1 .EventTypeWarning , "NoPortAvailable" , "no port available in port pool, please add clb to port pool" )
71+ return result , nil
72+ case portpool .ErrPoolNotFound :
73+ r .Recorder .Event (bd .GetObject (), corev1 .EventTypeWarning , "PoolNotFound" , "port pool not found, please check the port pool name" )
6574 return result , nil
6675 }
6776 // 如果是被云 API 限流(默认每秒 20 qps 限制),1s 后重新入队
6877 if clb .IsRequestLimitExceededError (errCause ) {
6978 result .RequeueAfter = time .Second
7079 return result , nil
7180 }
72- // 其它非资源冲突的错误,将错误记录到状态中方便排障
73- if ! apierrors .IsConflict (errCause ) {
81+
82+ if apierrors .IsConflict (errCause ) { // 资源冲突错误,直接重新入队触发重试
83+ result .Requeue = true
84+ return result , nil
85+ } else { // 其它非资源冲突的错误,将错误记录到 event 和状态中方便排障
86+ r .Recorder .Event (bd .GetObject (), corev1 .EventTypeWarning , "SyncFailed" , errCause .Error ())
7487 if status .State != networkingv1alpha1 .CLBBindingStateFailed {
7588 status .State = networkingv1alpha1 .CLBBindingStateFailed
76- status .Message = err .Error ()
89+ status .Message = errCause .Error ()
7790 if err := r .Status ().Update (ctx , bd .GetObject ()); err != nil {
7891 return result , errors .WithStack (err )
7992 }
8093 }
81- // lb 已不存在,没必要重新入队对账,保持 Failed 状态即可。
94+ // lb 已不存在,没必要重新入队对账,不返回错误, 保持 Failed 状态即可。
8295 if clb .IsLbIdNotFoundError (errCause ) {
8396 return result , nil
8497 }
98+ // 其它错误,返回错误触发重试
8599 return result , errors .WithStack (err )
86100 }
87- } else {
88- if newResult != nil {
89- result = * newResult
90- }
91101 }
92102 return result , err
93103}
94104
95- func (r * CLBBindingReconciler [T ]) ensureCLBBinding (ctx context.Context , bd clbbinding.CLBBinding ) ( result * ctrl. Result , err error ) {
105+ func (r * CLBBindingReconciler [T ]) ensureCLBBinding (ctx context.Context , bd clbbinding.CLBBinding ) error {
96106 // 确保依赖的端口池和 CLB 都存在,如果已删除则释放端口并更新状态
97107 if err := r .ensurePoolAndCLB (ctx , bd ); err != nil {
98- return result , errors .WithStack (err )
108+ return errors .WithStack (err )
99109 }
100110 // 确保所有端口都被分配
101- if result , err := r .ensurePortAllocated (ctx , bd ); err != nil {
102- return result , errors .WithStack (err )
103- } else {
104- if result != nil {
105- return result , nil
106- }
111+ if err := r .ensurePortAllocated (ctx , bd ); err != nil {
112+ return errors .WithStack (err )
107113 }
108114 // 确保所有监听器都已创建
109- if result , err = r .ensureListeners (ctx , bd ); err != nil {
110- return result , errors .WithStack (err )
111- } else {
112- if result != nil {
113- return result , nil
114- }
115+ if err := r .ensureListeners (ctx , bd ); err != nil {
116+ return errors .WithStack (err )
115117 }
116- // 确保所有监听器都已绑定到 obj
118+ // 确保所有监听器都已绑定到 backend
117119 if err := r .ensureBackendBindings (ctx , bd ); err != nil {
118- return result , errors .WithStack (err )
120+ return errors .WithStack (err )
119121 }
120- return result , nil
122+ return nil
121123}
122124
123125func (r * CLBBindingReconciler [T ]) ensurePoolAndCLB (ctx context.Context , bd clbbinding.CLBBinding ) error {
@@ -151,17 +153,19 @@ func (r *CLBBindingReconciler[T]) ensurePoolAndCLB(ctx context.Context, bd clbbi
151153 return nil
152154}
153155
156+ var ErrNeedRetry = errors .New ("need retry" )
157+
154158// TODO: 优化性能:一次性查询所有监听器信息
155- func (r * CLBBindingReconciler [T ]) ensureListeners (ctx context.Context , bd clbbinding.CLBBinding ) (result * ctrl.Result , err error ) {
156- log .FromContext (ctx ).V (10 ).Info ("ensureListeners" )
159+ func (r * CLBBindingReconciler [T ]) ensureListeners (ctx context.Context , bd clbbinding.CLBBinding ) error {
157160 newBindings := []networkingv1alpha1.PortBindingStatus {}
158161 needUpdate := false
162+ needRetry := false
159163 status := bd .GetStatus ()
160164 for i := range status .PortBindings {
161165 binding := & status .PortBindings [i ]
162- op , err := r .ensureListener (ctx , binding )
166+ op , err := r .ensureListener (ctx , bd , binding )
163167 if err != nil {
164- return result , errors .WithStack (err )
168+ return errors .WithStack (err )
165169 }
166170 switch op {
167171 case util .StatusOpNone :
@@ -171,67 +175,51 @@ func (r *CLBBindingReconciler[T]) ensureListeners(ctx context.Context, bd clbbin
171175 newBindings = append (newBindings , * binding )
172176 case util .StatusOpDelete :
173177 needUpdate = true
174- result = & ctrl.Result {}
175- result .Requeue = true
178+ needRetry = true
176179 }
177180 }
178181 if needUpdate {
179182 status .PortBindings = newBindings
180183 if err := r .Status ().Update (ctx , bd .GetObject ()); err != nil {
181- return result , errors .WithStack (err )
184+ return errors .WithStack (err )
182185 }
183186 }
184- return result , nil
187+ if needRetry {
188+ return ErrNeedRetry
189+ }
190+ return nil
185191}
186192
187193func (r * CLBBindingReconciler [T ]) ensureBackendBindings (ctx context.Context , bd clbbinding.CLBBinding ) error {
188194 status := bd .GetStatus ()
189- log .FromContext (ctx ).V (10 ).Info ("ensureBackendBindings" )
190- ensureWaitForBackend := func () error {
191- if err := bd .EnsureWaitBackendState (ctx , r .Client ); err != nil {
192- return errors .WithStack (err )
193- }
194- needUpdate := false
195- for i := range status .PortBindings {
196- binding := & status .PortBindings [i ]
197- if binding .ListenerId != "" {
198- if err := clb .DeregisterAllTargets (ctx , binding .Region , binding .LoadbalancerId , binding .ListenerId ); err != nil {
199- return errors .WithStack (err )
200- }
201- needUpdate = true
202- }
203- }
204- if needUpdate {
205- if err := r .Status ().Update (ctx , bd .GetObject ()); err != nil {
206- return errors .WithStack (err )
207- }
208- }
209- return nil
210- }
211195 backend , err := bd .GetAssociatedObject (ctx , r .Client )
212196 if err != nil {
213- if apierrors .IsNotFound (err ) { // 后端不存在,一般不存在,因为clbbinding 也会被 gc 自动清理,除非是手动管理 clbbinding
214- if err := ensureWaitForBackend ( ); err != nil {
197+ if apierrors .IsNotFound (errors . Cause ( err )) { // 后端不存在,一般是网络隔离场景,保持待绑定状态(正常情况 clbbinding 的 OwnerReference 是 pod/node,它们被清理后 clbbinding 也会被 gc 自动清理)
198+ if err = r . ensureState ( ctx , bd , networkingv1alpha1 . CLBBindingStateNoBackend ); err != nil {
215199 return errors .WithStack (err )
216200 }
217201 return nil
218202 }
203+ // 其它错误,直接返回
204+ return errors .WithStack (err )
219205 }
220- if backend .GetIP () == "" { // 等待 obj 分配 IP
221- if err := ensureWaitForBackend (); err != nil {
206+ if backend .GetIP () == "" { // 等待 backend 分配 IP
207+ r .Recorder .Event (bd .GetObject (), corev1 .EventTypeNormal , "WaitBackend" , "wait backend network to be ready" )
208+ if err = r .ensureState (ctx , bd , networkingv1alpha1 .CLBBindingStateWaitBackend ); err != nil {
222209 return errors .WithStack (err )
223210 }
224211 return nil
225212 }
226- // obj 准备就绪,将 CLB 监听器绑定到 obj
213+ // backend 准备就绪,将 CLB 监听器绑定到 backend
227214 for i := range status .PortBindings {
228215 binding := & status .PortBindings [i ]
229- if err := r .ensurePortBound (ctx , backend , binding ); err != nil {
216+ if err := r .ensurePortBound (ctx , bd , backend , binding ); err != nil {
230217 return errors .WithStack (err )
231218 }
232219 }
233- // 所有端口都已绑定,更新状态并将绑定信息写入 obj 注解
220+ // 所有端口都已绑定,更新状态并将绑定信息写入 backend 注解
234221 if status .State != networkingv1alpha1 .CLBBindingStateBound {
222+ r .Recorder .Event (bd .GetObject (), corev1 .EventTypeNormal , "AllBound" , "all targets bound to listener" )
235223 status .State = networkingv1alpha1 .CLBBindingStateBound
236224 status .Message = ""
237225 if err := r .Status ().Update (ctx , bd .GetObject ()); err != nil {
@@ -316,14 +304,13 @@ func (r *CLBBindingReconciler[T]) ensureBackendStatusAnnotation(ctx context.Cont
316304 if err := kube .PatchMap (ctx , r .Client , backend .GetObject (), patchMap ); err != nil {
317305 return errors .WithStack (err )
318306 }
319- log .FromContext (ctx ).V (10 ).Info ("patch clb port mapping status success" , "value" , string (val ))
307+ r .Recorder .Event (bd .GetObject (), corev1 .EventTypeNormal , "PatchAnnotation" , "clb port mapping result annotation is been patched" )
308+ log .FromContext (ctx ).V (3 ).Info ("patch clb port mapping status success" , "value" , string (val ))
320309 return nil
321310}
322311
323- func (r * CLBBindingReconciler [T ]) ensureListener (ctx context.Context , binding * networkingv1alpha1.PortBindingStatus ) (op util.StatusOp , err error ) {
324- log .FromContext (ctx ).V (10 ).Info ("ensureListener" , "port" , binding .Port , "protocol" , binding .Protocol )
312+ func (r * CLBBindingReconciler [T ]) ensureListener (ctx context.Context , bd clbbinding.CLBBinding , binding * networkingv1alpha1.PortBindingStatus ) (op util.StatusOp , err error ) {
325313 createListener := func () {
326- log .FromContext (ctx ).V (10 ).Info ("create listener" )
327314 var lisId string
328315 lisId , err = clb .CreateListenerTryBatch (
329316 ctx ,
@@ -335,12 +322,13 @@ func (r *CLBBindingReconciler[T]) ensureListener(ctx context.Context, binding *n
335322 "" ,
336323 )
337324 if err != nil {
338- err = errors .Wrapf (err , "failed to create listener %d/%s" , binding .Port , binding .Protocol )
325+ r .Recorder .Eventf (bd .GetObject (), corev1 .EventTypeWarning , "CreateListener" , "failed to create clb listener for %d/%s: %s" , binding .Port , binding .Protocol , err .Error ())
326+ err = errors .Wrapf (err , "failed to create clb listener %d/%s" , binding .Port , binding .Protocol )
339327 return
340328 } else { // 创建监听器成功,更新状态
341329 binding .ListenerId = lisId
342330 op = util .StatusOpUpdate
343- log . FromContext ( ctx ). V ( 10 ). Info ( " create listener success" , "listenerId" , lisId )
331+ r . Recorder . Eventf ( bd . GetObject (), corev1 . EventTypeNormal , "CreateListener" , " create clb listener success for %d/%s: %d/%s " , binding . Port , binding . Protocol , binding . LoadbalancerPort , lisId )
344332 }
345333 }
346334 var lis * clb.Listener
@@ -359,7 +347,6 @@ func (r *CLBBindingReconciler[T]) ensureListener(ctx context.Context, binding *n
359347 return
360348 } else {
361349 if lis == nil { // 还未创建监听器,执行创建
362- log .FromContext (ctx ).V (10 ).Info ("listener not create yet" )
363350 createListener ()
364351 } else { // 已创建监听器,检查是否符合预期
365352 if lis .ListenerId != binding .ListenerId { // id 不匹配,包括还未写入 id 的情况,更新下 id
@@ -372,8 +359,7 @@ func (r *CLBBindingReconciler[T]) ensureListener(ctx context.Context, binding *n
372359 return
373360}
374361
375- func (r * CLBBindingReconciler [T ]) ensurePortBound (ctx context.Context , backend clbbinding.Backend , binding * networkingv1alpha1.PortBindingStatus ) error {
376- log .FromContext (ctx ).V (10 ).Info ("ensurePortBound" , "port" , binding .Port , "protocol" , binding .Protocol )
362+ func (r * CLBBindingReconciler [T ]) ensurePortBound (ctx context.Context , bd clbbinding.CLBBinding , backend clbbinding.Backend , binding * networkingv1alpha1.PortBindingStatus ) error {
377363 targets , err := clb .DescribeTargetsTryBatch (ctx , binding .Region , binding .LoadbalancerId , binding .ListenerId )
378364 if err != nil {
379365 return errors .WithStack (err )
@@ -389,19 +375,18 @@ func (r *CLBBindingReconciler[T]) ensurePortBound(ctx context.Context, backend c
389375 alreadyAdded = true
390376 } else {
391377 targetToDelete = append (targetToDelete , target )
392- log .FromContext (ctx ).V (10 ).Info ("remove unexpected target" , "got" , target , "expect" , backendTarget )
393378 }
394379 }
395380 // 清理多余的 rs
396381 if len (targetToDelete ) > 0 {
397- log . FromContext ( ctx ). V ( 10 ). Info ( "deregister targets" , "targets " , targetToDelete )
382+ r . Recorder . Eventf ( bd . GetObject (), corev1 . EventTypeNormal , "DeregisterTarget" , "remove unexpected target: %v " , targetToDelete )
398383 if err := clb .DeregisterTargetsForListenerTryBatch (ctx , binding .Region , binding .LoadbalancerId , binding .ListenerId , targetToDelete ... ); err != nil {
399384 return errors .WithStack (err )
400385 }
401386 }
402387 // 绑定后端
403388 if ! alreadyAdded {
404- log . FromContext ( ctx ). V ( 10 ). Info ( "register target" , "target" , backendTarget )
389+ r . Recorder . Eventf ( bd . GetObject (), corev1 . EventTypeNormal , "RegisterTarget" , "register target %v to %d/%s " , backendTarget , binding . LoadbalancerPort , binding . ListenerId )
405390 startTime := time .Now ()
406391 if err := clb .RegisterTarget (ctx , binding .Region , binding .LoadbalancerId , binding .ListenerId , backendTarget ); err != nil {
407392 return errors .WithStack (err )
@@ -412,7 +397,7 @@ func (r *CLBBindingReconciler[T]) ensurePortBound(ctx context.Context, backend c
412397 return nil
413398}
414399
415- func (r * CLBBindingReconciler [T ]) ensurePortAllocated (ctx context.Context , bd clbbinding.CLBBinding ) ( result * ctrl. Result , err error ) {
400+ func (r * CLBBindingReconciler [T ]) ensurePortAllocated (ctx context.Context , bd clbbinding.CLBBinding ) error {
416401 status := bd .GetStatus ()
417402 bindings := make (map [portKey ]* networkingv1alpha1.PortBindingStatus )
418403 bds := []networkingv1alpha1.PortBindingStatus {}
@@ -435,7 +420,7 @@ func (r *CLBBindingReconciler[T]) ensurePortAllocated(ctx context.Context, bd cl
435420 if haveLbRemoved {
436421 status .PortBindings = bds
437422 if err := r .Status ().Update (ctx , bd .GetObject ()); err != nil {
438- return result , errors .WithStack (err )
423+ return errors .WithStack (err )
439424 }
440425 }
441426 var allocatedPorts portpool.PortAllocations
@@ -476,20 +461,7 @@ LOOP_PORT:
476461 // 未分配端口,执行分配
477462 allocated , err := portpool .Allocator .Allocate (ctx , port .Pools , port .Protocol , util .GetValue (port .UseSamePortAcrossPools ))
478463 if err != nil {
479- causeErr := errors .Cause (err )
480- if causeErr == portpool .ErrNoPortAvailable || causeErr == portpool .ErrPoolNotFound { // 端口不足,或端口池不存在,在 event 里告警,不返回错误
481- msg := causeErr .Error ()
482- r .Recorder .Event (bd .GetObject (), corev1 .EventTypeWarning , "AllocatePortFailed" , msg )
483- if status .State != networkingv1alpha1 .CLBBindingStateFailed {
484- status .State = networkingv1alpha1 .CLBBindingStateFailed
485- status .Message = msg
486- }
487- if err := r .Status ().Update (ctx , bd .GetObject ()); err != nil {
488- return result , errors .WithStack (err )
489- }
490- return result , nil
491- }
492- return result , errors .WithStack (err )
464+ return errors .WithStack (err )
493465 }
494466 for _ , allocatedPort := range allocated {
495467 binding := networkingv1alpha1.PortBindingStatus {
@@ -514,7 +486,7 @@ LOOP_PORT:
514486 for _ , binding := range bindings {
515487 _ , err := clb .DeleteListenerByPort (ctx , binding .Region , binding .LoadbalancerId , int64 (binding .LoadbalancerPort ), binding .Protocol )
516488 if err != nil {
517- return result , errors .WithStack (err )
489+ return errors .WithStack (err )
518490 }
519491 }
520492 statuses := []networkingv1alpha1.PortBindingStatus {}
@@ -532,15 +504,15 @@ LOOP_PORT:
532504 }
533505
534506 if len (allocatedPorts ) == 0 && len (bindings ) == 0 { // 没有新端口分配,也没有多余端口需要删除,直接返回
535- return result , nil
507+ return nil
536508 }
537509 // 将已分配的端口写入 status
538510 if err := r .Status ().Update (ctx , bd .GetObject ()); err != nil {
539511 // 更新状态失败,释放已分配端口
540512 allocatedPorts .Release ()
541- return result , errors .WithStack (err )
513+ return errors .WithStack (err )
542514 }
543- return result , nil
515+ return nil
544516}
545517
546518func portFromPortBindingStatus (status * networkingv1alpha1.PortBindingStatus ) portpool.ProtocolPort {
0 commit comments