@@ -3,24 +3,20 @@ package endpointcontroller
3
3
import (
4
4
"context"
5
5
"fmt"
6
- "strings "
6
+ "reflect "
7
7
8
8
v1 "k8s.io/api/core/v1"
9
9
apierrors "k8s.io/apimachinery/pkg/api/errors"
10
10
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
11
- "k8s.io/apimachinery/pkg/types"
12
11
"k8s.io/client-go/kubernetes"
13
12
"k8s.io/client-go/tools/record"
14
13
"k8s.io/client-go/util/retry"
15
14
"k8s.io/klog/v2"
16
15
controllerruntime "sigs.k8s.io/controller-runtime"
17
- "sigs.k8s.io/controller-runtime/pkg/builder"
18
16
"sigs.k8s.io/controller-runtime/pkg/client"
19
17
"sigs.k8s.io/controller-runtime/pkg/controller"
20
- "sigs.k8s.io/controller-runtime/pkg/event"
21
18
"sigs.k8s.io/controller-runtime/pkg/handler"
22
19
"sigs.k8s.io/controller-runtime/pkg/manager"
23
- "sigs.k8s.io/controller-runtime/pkg/predicate"
24
20
"sigs.k8s.io/controller-runtime/pkg/reconcile"
25
21
"sigs.k8s.io/controller-runtime/pkg/source"
26
22
@@ -30,153 +26,176 @@ import (
30
26
"github.com/kosmos.io/kosmos/pkg/utils"
31
27
)
32
28
29
+ type NodeGetter interface {
30
+ GetAPIServerNodes (client kubernetes.Interface , namespace string ) (* v1.NodeList , error )
31
+ }
32
+
33
+ type RealNodeGetter struct {}
34
+
35
+ func (r * RealNodeGetter ) GetAPIServerNodes (client kubernetes.Interface , namespace string ) (* v1.NodeList , error ) {
36
+ return util .GetAPIServerNodes (client , namespace )
37
+ }
38
+
33
39
type APIServerExternalSyncController struct {
34
40
client.Client
35
41
EventRecorder record.EventRecorder
42
+ KubeClient kubernetes.Interface
43
+ NodeGetter NodeGetter
36
44
}
37
45
38
46
const APIServerExternalSyncControllerName string = "api-server-external-service-sync-controller"
39
47
40
48
func (e * APIServerExternalSyncController ) SetupWithManager (mgr manager.Manager ) error {
41
- skipEvent := func (obj client.Object ) bool {
42
- return strings .Contains (obj .GetName (), "apiserver" ) && obj .GetNamespace () != ""
43
- }
44
-
45
49
return controllerruntime .NewControllerManagedBy (mgr ).
46
50
Named (APIServerExternalSyncControllerName ).
47
51
WithOptions (controller.Options {MaxConcurrentReconciles : 5 }).
48
- For (& v1.Endpoints {},
49
- builder .WithPredicates (predicate.Funcs {
50
- CreateFunc : func (createEvent event.CreateEvent ) bool {
51
- return skipEvent (createEvent .Object )
52
- },
53
- UpdateFunc : func (updateEvent event.UpdateEvent ) bool { return skipEvent (updateEvent .ObjectNew ) },
54
- DeleteFunc : func (deleteEvent event.DeleteEvent ) bool { return false },
55
- })).
56
- Watches (& source.Kind {Type : & v1alpha1.VirtualCluster {}}, handler .EnqueueRequestsFromMapFunc (e .newVirtualClusterMapFunc ())).
52
+ Watches (& source.Kind {Type : & v1.Pod {}}, handler .EnqueueRequestsFromMapFunc (e .newPodMapFunc ())).
57
53
Complete (e )
58
54
}
59
55
60
- func (e * APIServerExternalSyncController ) newVirtualClusterMapFunc () handler.MapFunc {
61
- return func (a client.Object ) []reconcile.Request {
62
- var requests []reconcile.Request
63
- vcluster := a .(* v1alpha1.VirtualCluster )
64
-
65
- // Join the Reconcile queue only if the status of the vcluster is Completed
66
- if vcluster .Status .Phase == v1alpha1 .Completed {
67
- klog .V (4 ).Infof ("api-server-external-sync-controller: virtualcluster change to completed: %s" , vcluster .Name )
68
- // Add the vcluster to the Reconcile queue
69
- requests = append (requests , reconcile.Request {
70
- NamespacedName : types.NamespacedName {
71
- Name : vcluster .Name ,
72
- Namespace : vcluster .Namespace ,
56
+ func (e * APIServerExternalSyncController ) newPodMapFunc () handler.MapFunc {
57
+ return func (obj client.Object ) []reconcile.Request {
58
+ pod , ok := obj .(* v1.Pod )
59
+
60
+ if ! ok {
61
+ klog .Warningf ("Object is not a Pod, skipping: %v" , obj )
62
+ return nil
63
+ }
64
+
65
+ // If the pod contains the specified label virtualCluster-app=apiserver,it indicates that it belongs to vc.
66
+ if val , exists := pod .Labels [constants .Label ]; exists && val == constants .LabelValue {
67
+ return []reconcile.Request {
68
+ {
69
+ NamespacedName : client.ObjectKey {
70
+ Name : pod .Name ,
71
+ Namespace : pod .Namespace ,
72
+ },
73
73
},
74
- })
74
+ }
75
75
}
76
- return requests
76
+
77
+ return nil
77
78
}
78
79
}
79
80
80
- func (e * APIServerExternalSyncController ) SyncAPIServerExternalEPS (ctx context.Context , k8sClient kubernetes.Interface ) error {
81
- kubeEndpoints , err := k8sClient .CoreV1 ().Endpoints (constants .DefaultNs ).Get (ctx , "kubernetes" , metav1.GetOptions {})
82
- if err != nil {
83
- klog .Errorf ("Error getting endpoints: %v" , err )
84
- return err
85
- }
86
- klog .V (4 ).Infof ("Endpoints for service 'kubernetes': %v" , kubeEndpoints )
87
- for _ , subset := range kubeEndpoints .Subsets {
88
- for _ , address := range subset .Addresses {
89
- klog .V (4 ).Infof ("IP: %s" , address .IP )
90
- }
81
+ func (e * APIServerExternalSyncController ) SyncAPIServerExternalEndpoints (ctx context.Context , k8sClient kubernetes.Interface , vc * v1alpha1.VirtualCluster ) error {
82
+ if e .NodeGetter == nil {
83
+ return fmt .Errorf ("NodeGetter is nil" )
91
84
}
92
85
93
- if len (kubeEndpoints .Subsets ) != 1 {
94
- return fmt .Errorf ("eps %s Subsets length is not 1" , "kubernetes" )
86
+ nodes , err := e .NodeGetter .GetAPIServerNodes (e .KubeClient , vc .Namespace )
87
+ if err != nil {
88
+ return fmt .Errorf ("failed to get API server nodes: %w" , err )
95
89
}
96
90
97
- if kubeEndpoints .Subsets [0 ].Addresses == nil || len (kubeEndpoints .Subsets [0 ].Addresses ) == 0 {
98
- klog .Errorf ("eps %s Addresses length is nil" , "kubernetes" )
99
- return err
91
+ if len (nodes .Items ) == 0 {
92
+ return fmt .Errorf ("no API server nodes found in the cluster" )
100
93
}
101
94
102
- apiServerExternalEndpoints , err := k8sClient .CoreV1 ().Endpoints (constants .DefaultNs ).Get (ctx , constants .APIServerExternalService , metav1.GetOptions {})
103
- if err != nil && ! apierrors .IsNotFound (err ) {
104
- klog .Errorf ("failed to get endpoints for %s : %v" , constants .APIServerExternalService , err )
105
- return err
95
+ var addresses []v1.EndpointAddress
96
+ for _ , node := range nodes .Items {
97
+ for _ , address := range node .Status .Addresses {
98
+ if address .Type == v1 .NodeInternalIP {
99
+ addresses = append (addresses , v1.EndpointAddress {
100
+ IP : address .Address ,
101
+ })
102
+ }
103
+ }
106
104
}
107
105
108
- updateEPS := apiServerExternalEndpoints .DeepCopy ()
109
-
110
- if apiServerExternalEndpoints != nil {
111
- klog .V (4 ).Infof ("apiServerExternalEndpoints: %v" , apiServerExternalEndpoints )
112
- } else {
113
- klog .V (4 ).Info ("apiServerExternalEndpoints is nil" )
106
+ if len (addresses ) == 0 {
107
+ return fmt .Errorf ("no internal IP addresses found for the API server nodes" )
114
108
}
115
109
116
- if updateEPS != nil {
117
- klog .V (4 ).Infof ("updateEPS: %v" , updateEPS )
118
- } else {
119
- klog .V (4 ).Info ("updateEPS is nil" )
110
+ apiServerPort , ok := vc .Status .PortMap [constants .APIServerPortKey ]
111
+ if ! ok {
112
+ return fmt .Errorf ("failed to get API server port from VirtualCluster status" )
120
113
}
114
+ klog .V (4 ).Infof ("API server port: %d" , apiServerPort )
121
115
122
- if len (updateEPS .Subsets ) == 1 && len (updateEPS .Subsets [0 ].Addresses ) == 1 {
123
- ip := kubeEndpoints .Subsets [0 ].Addresses [0 ].IP
124
- klog .V (4 ).Infof ("IP address: %s" , ip )
125
- updateEPS .Subsets [0 ].Addresses [0 ].IP = ip
116
+ newEndpoint := & v1.Endpoints {
117
+ ObjectMeta : metav1.ObjectMeta {
118
+ Name : constants .APIServerExternalService ,
119
+ Namespace : constants .KosmosNs ,
120
+ },
121
+ Subsets : []v1.EndpointSubset {
122
+ {
123
+ Addresses : addresses ,
124
+ Ports : []v1.EndpointPort {
125
+ {
126
+ Name : "https" ,
127
+ Port : apiServerPort ,
128
+ Protocol : v1 .ProtocolTCP ,
129
+ },
130
+ },
131
+ },
132
+ },
133
+ }
126
134
127
- if _ , err := k8sClient .CoreV1 ().Endpoints (constants .DefaultNs ).Update (ctx , updateEPS , metav1.UpdateOptions {}); err != nil {
128
- klog .Errorf ("failed to update endpoints for api-server-external-service: %v" , err )
129
- return err
135
+ //avoid unnecessary updates
136
+ return retry .RetryOnConflict (retry .DefaultRetry , func () error {
137
+ currentEndpoint , err := k8sClient .CoreV1 ().Endpoints (constants .KosmosNs ).Get (ctx , constants .APIServerExternalService , metav1.GetOptions {})
138
+ if apierrors .IsNotFound (err ) {
139
+ _ , err := k8sClient .CoreV1 ().Endpoints (constants .KosmosNs ).Create (ctx , newEndpoint , metav1.CreateOptions {})
140
+ if err != nil {
141
+ return fmt .Errorf ("failed to create api-server-external-service endpoint: %w" , err )
142
+ }
143
+ klog .V (4 ).Info ("Created api-server-external-service Endpoint" )
144
+ return nil
145
+ } else if err != nil {
146
+ return fmt .Errorf ("failed to get existing api-server-external-service endpoint: %w" , err )
130
147
}
131
- } else {
132
- klog .ErrorS (err , "Unexpected format of endpoints for api-server-external-service" , "endpoint_data" , updateEPS )
133
- return err
134
- }
135
148
136
- return nil
149
+ // determine if an update is needed
150
+ if ! reflect .DeepEqual (currentEndpoint .Subsets , newEndpoint .Subsets ) {
151
+ _ , err := k8sClient .CoreV1 ().Endpoints (constants .KosmosNs ).Update (ctx , newEndpoint , metav1.UpdateOptions {})
152
+ if err != nil {
153
+ return fmt .Errorf ("failed to update api-server-external-service endpoint: %w" , err )
154
+ }
155
+ klog .V (4 ).Info ("Updated api-server-external-service Endpoint" )
156
+ } else {
157
+ klog .V (4 ).Info ("No changes detected in Endpoint, skipping update" )
158
+ }
159
+ return nil
160
+ })
137
161
}
138
162
139
163
func (e * APIServerExternalSyncController ) Reconcile (ctx context.Context , request reconcile.Request ) (reconcile.Result , error ) {
140
164
klog .V (4 ).Infof ("============ %s start to reconcile %s ============" , APIServerExternalSyncControllerName , request .NamespacedName )
141
165
defer klog .V (4 ).Infof ("============ %s finish to reconcile %s ============" , APIServerExternalSyncControllerName , request .NamespacedName )
142
166
143
- var virtualClusterList v1alpha1.VirtualClusterList
144
- if err := e .List (ctx , & virtualClusterList ); err != nil {
145
- if apierrors .IsNotFound (err ) {
146
- return reconcile.Result {}, nil
147
- }
148
- klog .V (4 ).Infof ("query virtualcluster failed: %v" , err )
167
+ var vcList v1alpha1.VirtualClusterList
168
+ if err := e .List (ctx , & vcList , client .InNamespace (request .NamespacedName .Namespace )); err != nil {
169
+ klog .Errorf ("Failed to list VirtualClusters in namespace %s: %v" , request .NamespacedName .Namespace , err )
149
170
return reconcile.Result {RequeueAfter : utils .DefaultRequeueTime }, nil
150
171
}
151
- var targetVirtualCluster v1alpha1.VirtualCluster
152
- hasVirtualCluster := false
153
- for _ , vc := range virtualClusterList .Items {
154
- if vc .Namespace == request .Namespace {
155
- targetVirtualCluster = vc
156
- klog .V (4 ).Infof ("virtualcluster %s found" , targetVirtualCluster .Name )
157
- hasVirtualCluster = true
158
- break
159
- }
160
- }
161
- if ! hasVirtualCluster {
162
- klog .V (4 ).Infof ("virtualcluster %s not found" , request .Namespace )
172
+
173
+ if len (vcList .Items ) == 0 {
174
+ klog .V (4 ).Infof ("No VirtualCluster found in namespace %s" , request .NamespacedName .Namespace )
163
175
return reconcile.Result {}, nil
164
176
}
165
177
166
- if targetVirtualCluster .Status .Phase != v1alpha1 .Completed {
178
+ // A namespace should correspond to only one virtual cluster (vc). If it corresponds to multiple vcs, it indicates an error.
179
+ if len (vcList .Items ) > 1 {
180
+ klog .Errorf ("Multiple VirtualClusters found in namespace %s, expected only one" , request .NamespacedName .Namespace )
181
+ return reconcile.Result {RequeueAfter : utils .DefaultRequeueTime }, nil
182
+ }
183
+
184
+ vc := vcList .Items [0 ]
185
+
186
+ if vc .Status .Phase != v1alpha1 .Completed {
187
+ klog .V (4 ).Infof ("VirtualCluster %s is not in Completed phase" , vc .Name )
167
188
return reconcile.Result {}, nil
168
189
}
169
190
170
- k8sClient , err := util .GenerateKubeclient (& targetVirtualCluster )
191
+ k8sClient , err := util .GenerateKubeclient (& vc )
171
192
if err != nil {
172
- klog .Errorf ("virtualcluster %s crd kubernetes client failed : %v" , targetVirtualCluster .Name , err )
193
+ klog .Errorf ("Failed to generate Kubernetes client for VirtualCluster %s : %v" , vc .Name , err )
173
194
return reconcile.Result {}, nil
174
195
}
175
196
176
- if err := retry .RetryOnConflict (retry .DefaultRetry , func () error {
177
- return e .SyncAPIServerExternalEPS (ctx , k8sClient )
178
- }); err != nil {
179
- klog .Errorf ("virtualcluster %s sync apiserver external endpoints failed: %v" , targetVirtualCluster .Name , err )
197
+ if err := e .SyncAPIServerExternalEndpoints (ctx , k8sClient , & vc ); err != nil {
198
+ klog .Errorf ("Failed to sync apiserver external Endpoints: %v" , err )
180
199
return reconcile.Result {RequeueAfter : utils .DefaultRequeueTime }, nil
181
200
}
182
201
0 commit comments