@@ -18,12 +18,14 @@ import (
1818 ctrl "sigs.k8s.io/controller-runtime"
1919 "sigs.k8s.io/controller-runtime/pkg/client"
2020 "sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"
21+ "sigs.k8s.io/controller-runtime/pkg/event"
2122 "sigs.k8s.io/controller-runtime/pkg/log"
2223
2324 networkingv1alpha1 "github.com/imroc/tke-extend-network-controller/api/v1alpha1"
2425 "github.com/imroc/tke-extend-network-controller/internal/clbbinding"
2526 "github.com/imroc/tke-extend-network-controller/internal/portpool"
2627 "github.com/imroc/tke-extend-network-controller/pkg/clb"
28+ "github.com/imroc/tke-extend-network-controller/pkg/eventsource"
2729 "github.com/imroc/tke-extend-network-controller/pkg/kube"
2830 "github.com/imroc/tke-extend-network-controller/pkg/util"
2931 "github.com/pkg/errors"
@@ -56,7 +58,7 @@ func (r *CLBBindingReconciler[T]) sync(ctx context.Context, bd T) (result ctrl.R
5658 }
5759 }
5860 // 确保所有端口都已分配且绑定 obj
59- if err := r .ensureCLBBinding (ctx , bd ); err != nil {
61+ if newResult , err := r .ensureCLBBinding (ctx , bd ); err != nil {
6062 // 如果是等待端口池扩容 CLB,确保状态为 WaitForLB,并重新入队,以便在 CLB 扩容完成后能自动分配端口并绑定 obj
6163 if errors .Is (err , portpool .ErrWaitLBScale ) {
6264 result .RequeueAfter = 3 * time .Second
@@ -78,33 +80,44 @@ func (r *CLBBindingReconciler[T]) sync(ctx context.Context, bd T) (result ctrl.R
7880 }
7981 // lb 已不存在,没必要重新入队对账,保持 Failed 状态即可。
8082 if clb .IsLbIdNotFoundError (errors .Cause (err )) {
81- log .FromContext (ctx ).Error (err , "CLB is not exists" )
8283 return result , nil
8384 }
8485 return result , errors .WithStack (err )
8586 }
87+ } else {
88+ if newResult != nil {
89+ result = * newResult
90+ }
8691 }
8792 return result , err
8893}
8994
90- func (r * CLBBindingReconciler [T ]) ensureCLBBinding (ctx context.Context , bd clbbinding.CLBBinding ) error {
95+ func (r * CLBBindingReconciler [T ]) ensureCLBBinding (ctx context.Context , bd clbbinding.CLBBinding ) ( result * ctrl. Result , err error ) {
9196 // 确保依赖的端口池和 CLB 都存在,如果已删除则释放端口并更新状态
9297 if err := r .ensurePoolAndCLB (ctx , bd ); err != nil {
93- return err
98+ return result , errors . WithStack ( err )
9499 }
95100 // 确保所有端口都被分配
96- if err := r .ensurePortAllocated (ctx , bd ); err != nil {
97- return errors .WithStack (err )
101+ if result , err := r .ensurePortAllocated (ctx , bd ); err != nil {
102+ return result , errors .WithStack (err )
103+ } else {
104+ if result != nil {
105+ return result , nil
106+ }
98107 }
99108 // 确保所有监听器都已创建
100- if err := r .ensureListeners (ctx , bd ); err != nil {
101- return errors .WithStack (err )
109+ if result , err = r .ensureListeners (ctx , bd ); err != nil {
110+ return result , errors .WithStack (err )
111+ } else {
112+ if result != nil {
113+ return result , nil
114+ }
102115 }
103116 // 确保所有监听器都已绑定到 obj
104117 if err := r .ensureBackendBindings (ctx , bd ); err != nil {
105- return errors .WithStack (err )
118+ return result , errors .WithStack (err )
106119 }
107- return nil
120+ return result , nil
108121}
109122
110123func (r * CLBBindingReconciler [T ]) ensurePoolAndCLB (ctx context.Context , bd clbbinding.CLBBinding ) error {
@@ -138,22 +151,37 @@ func (r *CLBBindingReconciler[T]) ensurePoolAndCLB(ctx context.Context, bd clbbi
138151 return nil
139152}
140153
141- func (r * CLBBindingReconciler [T ]) ensureListeners (ctx context.Context , bd clbbinding.CLBBinding ) error {
154+ // TODO: 优化性能:一次性查询所有监听器信息
155+ func (r * CLBBindingReconciler [T ]) ensureListeners (ctx context.Context , bd clbbinding.CLBBinding ) (result * ctrl.Result , err error ) {
142156 log .FromContext (ctx ).V (10 ).Info ("ensureListeners" )
157+ newBindings := []networkingv1alpha1.PortBindingStatus {}
158+ needUpdate := false
143159 status := bd .GetStatus ()
144160 for i := range status .PortBindings {
145161 binding := & status .PortBindings [i ]
146- needUpdate , err := r .ensureListener (ctx , binding )
162+ op , err := r .ensureListener (ctx , binding )
147163 if err != nil {
148- return errors .WithStack (err )
164+ return result , errors .WithStack (err )
149165 }
150- if needUpdate {
151- if err := r .Status ().Update (ctx , bd .GetObject ()); err != nil {
152- return errors .WithStack (err )
153- }
166+ switch op {
167+ case util .StatusOpNone :
168+ newBindings = append (newBindings , * binding )
169+ case util .StatusOpUpdate :
170+ needUpdate = true
171+ newBindings = append (newBindings , * binding )
172+ case util .StatusOpDelete :
173+ needUpdate = true
174+ result = & ctrl.Result {}
175+ result .Requeue = true
154176 }
155177 }
156- return nil
178+ if needUpdate {
179+ status .PortBindings = newBindings
180+ if err := r .Status ().Update (ctx , bd .GetObject ()); err != nil {
181+ return result , errors .WithStack (err )
182+ }
183+ }
184+ return result , nil
157185}
158186
159187func (r * CLBBindingReconciler [T ]) ensureBackendBindings (ctx context.Context , bd clbbinding.CLBBinding ) error {
@@ -292,7 +320,7 @@ func (r *CLBBindingReconciler[T]) ensureBackendStatusAnnotation(ctx context.Cont
292320 return nil
293321}
294322
295- func (r * CLBBindingReconciler [T ]) ensureListener (ctx context.Context , binding * networkingv1alpha1.PortBindingStatus ) (needUpdate bool , err error ) {
323+ func (r * CLBBindingReconciler [T ]) ensureListener (ctx context.Context , binding * networkingv1alpha1.PortBindingStatus ) (op util. StatusOp , err error ) {
296324 log .FromContext (ctx ).V (10 ).Info ("ensureListener" , "port" , binding .Port , "protocol" , binding .Protocol )
297325 createListener := func () {
298326 log .FromContext (ctx ).V (10 ).Info ("create listener" )
@@ -311,13 +339,23 @@ func (r *CLBBindingReconciler[T]) ensureListener(ctx context.Context, binding *n
311339 return
312340 } else { // 创建监听器成功,更新状态
313341 binding .ListenerId = lisId
314- needUpdate = true
342+ op = util . StatusOpUpdate
315343 log .FromContext (ctx ).V (10 ).Info ("create listener success" , "listenerId" , lisId )
316344 }
317345 }
318346 var lis * clb.Listener
319347 if lis , err = clb .GetListenerByIdOrPort (ctx , binding .Region , binding .LoadbalancerId , binding .ListenerId , int64 (binding .LoadbalancerPort ), binding .Protocol ); err != nil {
320- err = errors .Wrapf (err , "failed to get listener by port %d/%s" , binding .Port , binding .Protocol )
348+ if clb .IsLbIdNotFoundError (errors .Cause (err )) { // lb 已删除,通知关联的端口池重新对账
349+ pp := & networkingv1alpha1.CLBPortPool {}
350+ pp .Name = binding .Pool
351+ eventsource .PortPool <- event.TypedGenericEvent [client.Object ]{
352+ Object : pp ,
353+ }
354+ op = util .StatusOpDelete
355+ err = errors .WithStack (err )
356+ } else {
357+ err = errors .Wrapf (err , "failed to get listener by port %d/%s" , binding .Port , binding .Protocol )
358+ }
321359 return
322360 } else {
323361 if lis == nil { // 还未创建监听器,执行创建
@@ -327,7 +365,7 @@ func (r *CLBBindingReconciler[T]) ensureListener(ctx context.Context, binding *n
327365 if lis .ListenerId != binding .ListenerId { // id 不匹配,包括还未写入 id 的情况,更新下 id
328366 log .FromContext (ctx ).V (10 ).Info ("listenerId not match, need update" , "expect" , binding .ListenerId , "actual" , lis .ListenerId )
329367 binding .ListenerId = lis .ListenerId
330- needUpdate = true
368+ op = util . StatusOpUpdate
331369 }
332370 }
333371 }
@@ -374,7 +412,7 @@ func (r *CLBBindingReconciler[T]) ensurePortBound(ctx context.Context, backend c
374412 return nil
375413}
376414
377- func (r * CLBBindingReconciler [T ]) ensurePortAllocated (ctx context.Context , bd clbbinding.CLBBinding ) error {
415+ func (r * CLBBindingReconciler [T ]) ensurePortAllocated (ctx context.Context , bd clbbinding.CLBBinding ) ( result * ctrl. Result , err error ) {
378416 status := bd .GetStatus ()
379417 bindings := make (map [portKey ]* networkingv1alpha1.PortBindingStatus )
380418 bds := []networkingv1alpha1.PortBindingStatus {}
@@ -397,7 +435,7 @@ func (r *CLBBindingReconciler[T]) ensurePortAllocated(ctx context.Context, bd cl
397435 if haveLbRemoved {
398436 status .PortBindings = bds
399437 if err := r .Status ().Update (ctx , bd .GetObject ()); err != nil {
400- return errors .WithStack (err )
438+ return result , errors .WithStack (err )
401439 }
402440 }
403441 var allocatedPorts portpool.PortAllocations
@@ -438,11 +476,22 @@ LOOP_PORT:
438476 // 未分配端口,执行分配
439477 allocated , err := portpool .Allocator .Allocate (ctx , port .Pools , port .Protocol , util .GetValue (port .UseSamePortAcrossPools ))
440478 if err != nil {
441- if errors .Is (err , portpool .ErrNoPortAvailable ) { // 端口不足,在 event 里告警,不返回错误
442- r .Recorder .Event (bd .GetObject (), corev1 .EventTypeWarning , "NoPortAvailable" , "no port available in port pool" )
443- return nil
479+ causeErr := errors .Cause (err )
480+ if causeErr == portpool .ErrNoPortAvailable || causeErr == portpool .ErrPoolNotFound { // 端口不足,或端口池不存在,在 event 里告警,不返回错误
481+ msg := causeErr .Error ()
482+ r .Recorder .Event (bd .GetObject (), corev1 .EventTypeWarning , "AllocatePortFailed" , msg )
483+ if status .State != networkingv1alpha1 .CLBBindingStateFailed {
484+ status .State = networkingv1alpha1 .CLBBindingStateFailed
485+ status .Message = msg
486+ }
487+ if err := r .Status ().Update (ctx , bd .GetObject ()); err != nil {
488+ return result , errors .WithStack (err )
489+ }
490+ result = & ctrl.Result {}
491+ result .RequeueAfter = 2 * time .Second
492+ return result , nil
444493 }
445- return errors .WithStack (err )
494+ return result , errors .WithStack (err )
446495 }
447496 for _ , allocatedPort := range allocated {
448497 binding := networkingv1alpha1.PortBindingStatus {
@@ -467,7 +516,7 @@ LOOP_PORT:
467516 for _ , binding := range bindings {
468517 _ , err := clb .DeleteListenerByPort (ctx , binding .Region , binding .LoadbalancerId , int64 (binding .LoadbalancerPort ), binding .Protocol )
469518 if err != nil {
470- return errors .WithStack (err )
519+ return result , errors .WithStack (err )
471520 }
472521 }
473522 statuses := []networkingv1alpha1.PortBindingStatus {}
@@ -485,15 +534,15 @@ LOOP_PORT:
485534 }
486535
487536 if len (allocatedPorts ) == 0 && len (bindings ) == 0 { // 没有新端口分配,也没有多余端口需要删除,直接返回
488- return nil
537+ return result , nil
489538 }
490539 // 将已分配的端口写入 status
491540 if err := r .Status ().Update (ctx , bd .GetObject ()); err != nil {
492541 // 更新状态失败,释放已分配端口
493542 allocatedPorts .Release ()
494- return errors .WithStack (err )
543+ return result , errors .WithStack (err )
495544 }
496- return nil
545+ return result , nil
497546}
498547
499548func portFromPortBindingStatus (status * networkingv1alpha1.PortBindingStatus ) portpool.ProtocolPort {
0 commit comments