From 617bbb040dc3b3951dbc5124a03263b60a61dcff Mon Sep 17 00:00:00 2001
From: Lei Liu
Date: Wed, 25 Jun 2025 19:13:20 +0800
Subject: [PATCH] [yurthub] enhancements and fixes for function cancellation

* remove a bunch of unused `stopCh` variables in functions
* stop binding the lifecycle variable (stopCh) to manager-type structs
* prefer using context to control cancellation of long-running functions
* fix some stopCh channels that were never closed
* derive the context of test targets from `testing.T` (t.Context()) in test code

Signed-off-by: Lei Liu
---
 cmd/yurthub/app/config/config.go | 8 +++---
 cmd/yurthub/app/config/config_test.go | 2 +-
 cmd/yurthub/app/start.go | 25 ++++++++++---------
 pkg/yurthub/cachemanager/cache_manager.go | 8 +++---
 .../cachemanager/cache_manager_test.go | 6 ++---
 pkg/yurthub/configuration/manager_test.go | 6 ++---
 pkg/yurthub/filter/approver/approver.go | 2 --
 pkg/yurthub/filter/base/base_test.go | 8 +++---
 .../filter/discardcloudservice/filter.go | 2 +-
 .../filter/discardcloudservice/filter_test.go | 3 +--
 .../filter/forwardkubesvctraffic/filter.go | 2 +-
 .../forwardkubesvctraffic/filter_test.go | 3 +--
 pkg/yurthub/filter/inclusterconfig/filter.go | 2 +-
 .../filter/inclusterconfig/filter_test.go | 3 +--
 .../filter/initializer/initializer_test.go | 2 +-
 .../initializer/node_initializer_test.go | 2 +-
 pkg/yurthub/filter/interfaces.go | 4 +--
 pkg/yurthub/filter/masterservice/filter.go | 2 +-
 .../filter/masterservice/filter_test.go | 3 +--
 .../filter/nodeportisolation/filter.go | 2 +-
 .../filter/nodeportisolation/filter_test.go | 4 +--
 pkg/yurthub/filter/objectfilter/chain.go | 4 +--
 pkg/yurthub/filter/responsefilter/filter.go | 17 ++++++-------
 .../filter/responsefilter/filter_test.go | 12 ++++-----
 .../filter/serviceenvupdater/filter.go | 2 +-
 .../filter/serviceenvupdater/filter_test.go | 5 ++--
 pkg/yurthub/filter/servicetopology/filter.go | 2 +-
 .../filter/servicetopology/filter_test.go | 3 +--
 pkg/yurthub/gc/gc.go | 10 +++-----
 .../cloudapiserver/health_checker.go | 9 ++++---
 .../cloudapiserver/health_checker_test.go | 7 +++---
 .../healthchecker/leaderhub/leader_hub.go | 5 ++--
 .../leaderhub/leader_hub_test.go | 4 +--
 pkg/yurthub/multiplexer/filterstore.go | 6 ++---
 pkg/yurthub/multiplexer/filterwatch.go | 2 +-
 pkg/yurthub/network/network.go | 5 ++--
 pkg/yurthub/otaupdate/ota.go | 2 +-
 pkg/yurthub/otaupdate/ota_test.go | 3 ++-
 pkg/yurthub/proxy/autonomy/autonomy.go | 4 +--
 pkg/yurthub/proxy/local/local.go | 5 ++--
 .../proxy/multiplexer/multiplexerproxy.go | 4 +--
 .../multiplexer/multiplexerproxy_test.go | 15 +++++------
 .../testing/fake_endpointslicesfilter.go | 2 +-
 .../proxy/nonresourcerequest/nonresource.go | 2 +-
 .../nonresourcerequest/nonresource_test.go | 3 ++-
 pkg/yurthub/proxy/proxy.go | 8 +++---
 pkg/yurthub/proxy/remote/loadbalancer.go | 19 ++++++--------
 pkg/yurthub/proxy/remote/loadbalancer_test.go | 17 ++++++-------
 pkg/yurthub/proxy/remote/remote.go | 5 +---
 pkg/yurthub/proxy/util/util.go | 2 +-
 pkg/yurthub/proxy/util/util_test.go | 8 +++---
 pkg/yurthub/server/server.go | 16 ++++++------
 pkg/yurthub/tenant/tenant.go | 13 +++++-----
 pkg/yurthub/transport/fake_transport.go | 5 +++-
 pkg/yurthub/transport/transport.go | 18 ++++++-------
 pkg/yurthub/util/connrotation_test.go | 5 +---
 pkg/yurthub/util/dumpstack.go | 5 ++--
 pkg/yurthub/util/dumpstack_test.go | 4 +--
 58 files changed, 160 insertions(+), 197 deletions(-)

diff --git a/cmd/yurthub/app/config/config.go b/cmd/yurthub/app/config/config.go
index 64bfe28c897..177a2bdbfb1 100644
--- a/cmd/yurthub/app/config/config.go
+++ 
b/cmd/yurthub/app/config/config.go @@ -93,7 +93,7 @@ type YurtHubConfiguration struct { DiskCachePath string ConfigManager *configuration.Manager TenantManager tenant.Interface - TransportAndDirectClientManager transport.Interface + TransportAndDirectClientManager transport.TransportManager LoadBalancerForLeaderHub remote.Server PoolScopeResources []schema.GroupVersionResource PortForMultiplexer int @@ -101,7 +101,7 @@ type YurtHubConfiguration struct { } // Complete converts *options.YurtHubOptions to *YurtHubConfiguration -func Complete(options *options.YurtHubOptions, stopCh <-chan struct{}) (*YurtHubConfiguration, error) { +func Complete(ctx context.Context, options *options.YurtHubOptions) (*YurtHubConfiguration, error) { cfg := &YurtHubConfiguration{ NodeName: options.NodeName, WorkingMode: util.WorkingMode(options.WorkingMode), @@ -191,11 +191,11 @@ func Complete(options *options.YurtHubOptions, stopCh <-chan struct{}) (*YurtHub us, options.HeartbeatTimeoutSeconds, certMgr, - stopCh, ) if err != nil { return nil, fmt.Errorf("could not new transport manager, %w", err) } + transportAndClientManager.Start(ctx) cfg.SerializerManager = serializer.NewSerializerManager() cfg.RESTMapperManager = restMapperManager @@ -203,7 +203,7 @@ func Complete(options *options.YurtHubOptions, stopCh <-chan struct{}) (*YurtHub cfg.DynamicSharedFactory = dynamicSharedFactory cfg.CertManager = certMgr cfg.TransportAndDirectClientManager = transportAndClientManager - cfg.TenantManager = tenant.New(tenantNamespce, sharedFactory, stopCh) + cfg.TenantManager = tenant.New(tenantNamespce, sharedFactory) // create feature configurations for both cloud and edge working mode as following: // - configuration manager: monitor yurt-hub-cfg configmap and adopting changes dynamically. diff --git a/cmd/yurthub/app/config/config_test.go b/cmd/yurthub/app/config/config_test.go index 52f4a1362ea..7267bf24113 100644 --- a/cmd/yurthub/app/config/config_test.go +++ b/cmd/yurthub/app/config/config_test.go @@ -39,7 +39,7 @@ func TestComplete(t *testing.T) { options.EnableDummyIf = false options.HubAgentDummyIfIP = "169.254.2.1" options.NodeIP = "127.0.0.1" - cfg, err := Complete(options, nil) + cfg, err := Complete(t.Context(), options) if err != nil { t.Errorf("expect no err, but got %v", err) } else if cfg == nil { diff --git a/cmd/yurthub/app/start.go b/cmd/yurthub/app/start.go index 23f8065fea3..8274326c1f5 100644 --- a/cmd/yurthub/app/start.go +++ b/cmd/yurthub/app/start.go @@ -70,13 +70,13 @@ func NewCmdStartYurtHub(ctx context.Context) *cobra.Command { klog.Fatalf("validate options: %v", err) } - yurtHubCfg, err := config.Complete(yurtHubOptions, ctx.Done()) + yurtHubCfg, err := config.Complete(ctx, yurtHubOptions) if err != nil { klog.Fatalf("complete %s configuration error, %v", projectinfo.GetHubName(), err) } klog.Infof("%s cfg: %#+v", projectinfo.GetHubName(), yurtHubCfg) - util.SetupDumpStackTrap(yurtHubOptions.RootDir, ctx.Done()) + util.SetupDumpStackTrap(ctx, yurtHubOptions.RootDir) klog.Infof("start watch SIGUSR1 signal") if err := Run(ctx, yurtHubCfg); err != nil { @@ -130,36 +130,38 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error { trace++ klog.Infof("%d. 
create health checkers for remote servers", trace) - cloudHealthChecker, err = cloudapiserver.NewCloudAPIServerHealthChecker(cfg, ctx.Done()) + cloudHealthChecker, err = cloudapiserver.NewCloudAPIServerHealthChecker(ctx, cfg) if err != nil { return fmt.Errorf("could not new health checker for cloud kube-apiserver, %w", err) } trace++ klog.Infof("%d. new gc manager for node %s, and gc frequency is a random time between %d min and %d min", trace, cfg.NodeName, cfg.GCFrequency, 3*cfg.GCFrequency) - gcMgr, err := gc.NewGCManager(cfg, cloudHealthChecker, ctx.Done()) + gcMgr, err := gc.NewGCManager(cfg, cloudHealthChecker) if err != nil { return fmt.Errorf("could not new gc manager, %w", err) } - gcMgr.Run() + gcMgr.Run(ctx) trace++ } // no leader hub servers for transport manager at startup time. // and don't filter response of request for pool scope metadata from leader hub. - transportManagerForLeaderHub, err := transport.NewTransportAndClientManager([]*url.URL{}, 2, cfg.CertManager, ctx.Done()) + transportManagerForLeaderHub, err := transport.NewTransportAndClientManager([]*url.URL{}, 2, cfg.CertManager) if err != nil { return fmt.Errorf("could not new transport manager for leader hub, %w", err) } - healthCheckerForLeaderHub := leaderhub.NewLeaderHubHealthChecker(20*time.Second, nil, ctx.Done()) - loadBalancerForLeaderHub := remote.NewLoadBalancer("round-robin", []*url.URL{}, cacheManager, transportManagerForLeaderHub, healthCheckerForLeaderHub, nil, ctx.Done()) + transportManagerForLeaderHub.Start(ctx) + + healthCheckerForLeaderHub := leaderhub.NewLeaderHubHealthChecker(ctx, 20*time.Second, nil) + loadBalancerForLeaderHub := remote.NewLoadBalancer("round-robin", []*url.URL{}, cacheManager, transportManagerForLeaderHub, healthCheckerForLeaderHub, nil) cfg.LoadBalancerForLeaderHub = loadBalancerForLeaderHub requestMultiplexerManager := newRequestMultiplexerManager(cfg, healthCheckerForLeaderHub) if cfg.NetworkMgr != nil { klog.Infof("%d. start network manager for ensuing dummy interface", trace) - cfg.NetworkMgr.Run(ctx.Done()) + cfg.NetworkMgr.Run(ctx) trace++ } @@ -173,15 +175,14 @@ func Run(ctx context.Context, cfg *config.YurtHubConfiguration) error { cfg, cacheManager, cloudHealthChecker, - requestMultiplexerManager, - ctx.Done()) + requestMultiplexerManager) if err != nil { return fmt.Errorf("could not create reverse proxy handler, %w", err) } trace++ klog.Infof("%d. 
new %s server and begin to serve", trace, projectinfo.GetHubName()) - if err := server.RunYurtHubServers(cfg, yurtProxyHandler, cloudHealthChecker, ctx.Done()); err != nil { + if err := server.RunYurtHubServers(ctx, cfg, yurtProxyHandler, cloudHealthChecker); err != nil { return fmt.Errorf("could not run hub servers, %w", err) } default: diff --git a/pkg/yurthub/cachemanager/cache_manager.go b/pkg/yurthub/cachemanager/cache_manager.go index 2a0253e0157..c7af29d1759 100644 --- a/pkg/yurthub/cachemanager/cache_manager.go +++ b/pkg/yurthub/cachemanager/cache_manager.go @@ -60,7 +60,7 @@ var ( // CacheManager is an adaptor to cache runtime object data into backend storage type CacheManager interface { - CacheResponse(req *http.Request, prc io.ReadCloser, stopCh <-chan struct{}) error + CacheResponse(req *http.Request, prc io.ReadCloser) error QueryCache(req *http.Request) (runtime.Object, error) CanCacheFor(req *http.Request) bool DeleteKindFor(gvr schema.GroupVersionResource) error @@ -109,11 +109,11 @@ func (cm *cacheManager) QueryCacheResult() CacheResult { } // CacheResponse cache response of request into backend storage -func (cm *cacheManager) CacheResponse(req *http.Request, prc io.ReadCloser, stopCh <-chan struct{}) error { +func (cm *cacheManager) CacheResponse(req *http.Request, prc io.ReadCloser) error { ctx := req.Context() info, _ := apirequest.RequestInfoFrom(ctx) if isWatch(ctx) { - return cm.saveWatchObject(ctx, info, prc, stopCh) + return cm.saveWatchObject(ctx, info, prc) } var buf bytes.Buffer @@ -359,7 +359,7 @@ func generateEmptyListObjOfGVK(listGvk schema.GroupVersionKind) (runtime.Object, return listObj, nil } -func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.RequestInfo, r io.ReadCloser, _ <-chan struct{}) error { +func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.RequestInfo, r io.ReadCloser) error { delObjCnt := 0 updateObjCnt := 0 addObjCnt := 0 diff --git a/pkg/yurthub/cachemanager/cache_manager_test.go b/pkg/yurthub/cachemanager/cache_manager_test.go index bbcfeadf4aa..30a980482af 100644 --- a/pkg/yurthub/cachemanager/cache_manager_test.go +++ b/pkg/yurthub/cachemanager/cache_manager_test.go @@ -503,7 +503,7 @@ func TestCacheGetResponse(t *testing.T) { } } prc := io.NopCloser(buf) - cacheErr = yurtCM.CacheResponse(req, prc, nil) + cacheErr = yurtCM.CacheResponse(req, prc) }) handler = proxyutil.WithRequestContentType(handler) @@ -953,7 +953,7 @@ func TestCacheWatchResponse(t *testing.T) { pw.Close() }(pw) rc := io.NopCloser(pr) - err = yurtCM.CacheResponse(req, rc, nil) + err = yurtCM.CacheResponse(req, rc) }) handler = proxyutil.WithRequestContentType(handler) @@ -1544,7 +1544,7 @@ func TestCacheListResponse(t *testing.T) { } prc := io.NopCloser(buf) // call cache response - cacheErr = yurtCM.CacheResponse(req, prc, nil) + cacheErr = yurtCM.CacheResponse(req, prc) }) handler = proxyutil.WithRequestContentType(handler) diff --git a/pkg/yurthub/configuration/manager_test.go b/pkg/yurthub/configuration/manager_test.go index 85f138be62e..c332c2c0260 100644 --- a/pkg/yurthub/configuration/manager_test.go +++ b/pkg/yurthub/configuration/manager_test.go @@ -184,11 +184,9 @@ func TestManager(t *testing.T) { informerfactory := informers.NewSharedInformerFactory(client, 0) manager := NewConfigurationManager(tc.nodeName, informerfactory) - stopCh := make(chan struct{}) - informerfactory.Start(stopCh) - defer close(stopCh) + informerfactory.Start(t.Context().Done()) - if ok := cache.WaitForCacheSync(stopCh, 
manager.HasSynced); !ok { + if ok := cache.WaitForCacheSync(t.Context().Done(), manager.HasSynced); !ok { t.Errorf("configuration manager is not ready") return } diff --git a/pkg/yurthub/filter/approver/approver.go b/pkg/yurthub/filter/approver/approver.go index bea39ef6462..e9f7282d1c5 100644 --- a/pkg/yurthub/filter/approver/approver.go +++ b/pkg/yurthub/filter/approver/approver.go @@ -30,14 +30,12 @@ import ( type approver struct { skipRequestUserAgentList sets.Set[string] configManager *configuration.Manager - stopCh chan struct{} } func NewApprover(nodeName string, configManager *configuration.Manager) filter.Approver { na := &approver{ skipRequestUserAgentList: sets.New[string](projectinfo.GetHubName(), util.MultiplexerProxyClientUserAgentPrefix+nodeName), configManager: configManager, - stopCh: make(chan struct{}), } return na } diff --git a/pkg/yurthub/filter/base/base_test.go b/pkg/yurthub/filter/base/base_test.go index ee5a6fd9ad4..2b4ce959743 100644 --- a/pkg/yurthub/filter/base/base_test.go +++ b/pkg/yurthub/filter/base/base_test.go @@ -46,12 +46,12 @@ func (noh *nopObjectHandler) SupportedResourceAndVerbs() map[string]sets.Set[str return map[string]sets.Set[string]{} } -func (noh *nopObjectHandler) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object { +func (noh *nopObjectHandler) Filter(obj runtime.Object) runtime.Object { return obj } var ( - nodesNameErr = errors.New("nodes name error") + errNodesName = errors.New("nodes name error") ) type nopNodesErrHandler struct { @@ -61,7 +61,7 @@ type nopNodesErrHandler struct { func NewNopNodesErrHandler() filter.ObjectFilter { return &nopNodesErrHandler{ - err: nodesNameErr, + err: errNodesName, } } @@ -193,7 +193,7 @@ func TestInitializers(t *testing.T) { }, "initialize error": { filter: NewNopNodesErrHandler(), - resultErr: nodesNameErr, + resultErr: errNodesName, }, } diff --git a/pkg/yurthub/filter/discardcloudservice/filter.go b/pkg/yurthub/filter/discardcloudservice/filter.go index fc75377496f..78ae91232ca 100644 --- a/pkg/yurthub/filter/discardcloudservice/filter.go +++ b/pkg/yurthub/filter/discardcloudservice/filter.go @@ -61,7 +61,7 @@ func (sf *discardCloudServiceFilter) Name() string { return FilterName } -func (sf *discardCloudServiceFilter) Filter(obj runtime.Object, _ <-chan struct{}) runtime.Object { +func (sf *discardCloudServiceFilter) Filter(obj runtime.Object) runtime.Object { switch v := obj.(type) { case *v1.Service: return discardCloudService(v) diff --git a/pkg/yurthub/filter/discardcloudservice/filter_test.go b/pkg/yurthub/filter/discardcloudservice/filter_test.go index 38bccf891b5..68f9af7e855 100644 --- a/pkg/yurthub/filter/discardcloudservice/filter_test.go +++ b/pkg/yurthub/filter/discardcloudservice/filter_test.go @@ -139,12 +139,11 @@ func TestFilter(t *testing.T) { }, } - stopCh := make(<-chan struct{}) for k, tt := range testcases { t.Run(k, func(t *testing.T) { dcsf, _ := NewDiscardCloudServiceFilter() - newObj := dcsf.Filter(tt.responseObj, stopCh) + newObj := dcsf.Filter(tt.responseObj) if tt.expectObj == nil { if !util.IsNil(newObj) { t.Errorf("RuntimeObjectFilter expect nil obj, but got %v", newObj) diff --git a/pkg/yurthub/filter/forwardkubesvctraffic/filter.go b/pkg/yurthub/filter/forwardkubesvctraffic/filter.go index 8120493176e..b85109b8e21 100644 --- a/pkg/yurthub/filter/forwardkubesvctraffic/filter.go +++ b/pkg/yurthub/filter/forwardkubesvctraffic/filter.go @@ -77,7 +77,7 @@ func (fkst *forwardKubeSVCTrafficFilter) SetMasterServicePort(portStr string) er return nil } -func 
(fkst *forwardKubeSVCTrafficFilter) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object { +func (fkst *forwardKubeSVCTrafficFilter) Filter(obj runtime.Object) runtime.Object { switch v := obj.(type) { case *discovery.EndpointSlice: fkst.mutateDefaultKubernetesEps(v) diff --git a/pkg/yurthub/filter/forwardkubesvctraffic/filter_test.go b/pkg/yurthub/filter/forwardkubesvctraffic/filter_test.go index 3b2c5e8b47a..d3a16b63da3 100644 --- a/pkg/yurthub/filter/forwardkubesvctraffic/filter_test.go +++ b/pkg/yurthub/filter/forwardkubesvctraffic/filter_test.go @@ -213,13 +213,12 @@ func TestFilter(t *testing.T) { }, } - stopCh := make(<-chan struct{}) for k, tt := range testcases { t.Run(k, func(t *testing.T) { fkst, _ := NewForwardKubeSVCTrafficFilter() fkst.SetMasterServiceHost(tt.host) fkst.SetMasterServicePort(tt.port) - newObj := fkst.Filter(tt.responseObject, stopCh) + newObj := fkst.Filter(tt.responseObject) if tt.expectObject == nil { if !util.IsNil(newObj) { t.Errorf("Filter expect nil obj, but got %v", newObj) diff --git a/pkg/yurthub/filter/inclusterconfig/filter.go b/pkg/yurthub/filter/inclusterconfig/filter.go index 5396964aa55..f85c99e6db8 100644 --- a/pkg/yurthub/filter/inclusterconfig/filter.go +++ b/pkg/yurthub/filter/inclusterconfig/filter.go @@ -56,7 +56,7 @@ func (iccf *inClusterConfigFilter) Name() string { return FilterName } -func (iccf *inClusterConfigFilter) Filter(obj runtime.Object, _ <-chan struct{}) runtime.Object { +func (iccf *inClusterConfigFilter) Filter(obj runtime.Object) runtime.Object { switch v := obj.(type) { case *v1.ConfigMap: return mutateKubeProxyConfigMap(v) diff --git a/pkg/yurthub/filter/inclusterconfig/filter_test.go b/pkg/yurthub/filter/inclusterconfig/filter_test.go index 4f7a2d7e00c..84f36c23ecc 100644 --- a/pkg/yurthub/filter/inclusterconfig/filter_test.go +++ b/pkg/yurthub/filter/inclusterconfig/filter_test.go @@ -130,10 +130,9 @@ func TestRuntimeObjectFilter(t *testing.T) { }, } - stopCh := make(<-chan struct{}) for k, tc := range testcases { t.Run(k, func(t *testing.T) { - newObj := iccf.Filter(tc.responseObject, stopCh) + newObj := iccf.Filter(tc.responseObject) if tc.expectObject == nil { if !util.IsNil(newObj) { t.Errorf("RuntimeObjectFilter expect nil obj, but got %v", newObj) diff --git a/pkg/yurthub/filter/initializer/initializer_test.go b/pkg/yurthub/filter/initializer/initializer_test.go index 4671605b391..8f768f7278c 100644 --- a/pkg/yurthub/filter/initializer/initializer_test.go +++ b/pkg/yurthub/filter/initializer/initializer_test.go @@ -134,7 +134,7 @@ func (bef *baseErrFilter) SupportedResourceAndVerbs() map[string]sets.Set[string return map[string]sets.Set[string]{} } -func (bef *baseErrFilter) Filter(obj runtime.Object, _ <-chan struct{}) runtime.Object { +func (bef *baseErrFilter) Filter(obj runtime.Object) runtime.Object { return obj } diff --git a/pkg/yurthub/filter/initializer/node_initializer_test.go b/pkg/yurthub/filter/initializer/node_initializer_test.go index 93ed041229d..e92d57287e7 100644 --- a/pkg/yurthub/filter/initializer/node_initializer_test.go +++ b/pkg/yurthub/filter/initializer/node_initializer_test.go @@ -49,7 +49,7 @@ func (nop *nopNodeHandler) SupportedResourceAndVerbs() map[string]sets.Set[strin return map[string]sets.Set[string]{} } -func (nop *nopNodeHandler) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object { +func (nop *nopNodeHandler) Filter(obj runtime.Object) runtime.Object { return obj } diff --git a/pkg/yurthub/filter/interfaces.go b/pkg/yurthub/filter/interfaces.go index 
cc55bd9811c..ac8c5c91aad 100644 --- a/pkg/yurthub/filter/interfaces.go +++ b/pkg/yurthub/filter/interfaces.go @@ -42,7 +42,7 @@ type Approver interface { type ResponseFilter interface { Name() string // Filter is used to filter data returned from the cloud. - Filter(req *http.Request, rc io.ReadCloser, stopCh <-chan struct{}) (int, io.ReadCloser, error) + Filter(req *http.Request, rc io.ReadCloser) (int, io.ReadCloser, error) } // ObjectFilter is used for filtering runtime object. @@ -52,7 +52,7 @@ type ObjectFilter interface { Name() string // Filter is used for filtering runtime object // all filter logic should be located in it. - Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object + Filter(obj runtime.Object) runtime.Object } type FilterFinder interface { diff --git a/pkg/yurthub/filter/masterservice/filter.go b/pkg/yurthub/filter/masterservice/filter.go index 4753ddb03e4..4a7333ec91e 100644 --- a/pkg/yurthub/filter/masterservice/filter.go +++ b/pkg/yurthub/filter/masterservice/filter.go @@ -71,7 +71,7 @@ func (msf *masterServiceFilter) SetMasterServicePort(portStr string) error { return nil } -func (msf *masterServiceFilter) Filter(obj runtime.Object, _ <-chan struct{}) runtime.Object { +func (msf *masterServiceFilter) Filter(obj runtime.Object) runtime.Object { switch v := obj.(type) { case *v1.Service: msf.mutateMasterService(v) diff --git a/pkg/yurthub/filter/masterservice/filter_test.go b/pkg/yurthub/filter/masterservice/filter_test.go index a7c8bf5e9d0..01de119b940 100644 --- a/pkg/yurthub/filter/masterservice/filter_test.go +++ b/pkg/yurthub/filter/masterservice/filter_test.go @@ -161,13 +161,12 @@ func TestFilter(t *testing.T) { }, } - stopCh := make(<-chan struct{}) for k, tt := range testcases { t.Run(k, func(t *testing.T) { msf := &masterServiceFilter{} msf.SetMasterServiceHost(masterServiceHost) msf.SetMasterServicePort(masterServicePortStr) - newObj := msf.Filter(tt.responseObject, stopCh) + newObj := msf.Filter(tt.responseObject) if tt.expectObject == nil { if !util.IsNil(newObj) { t.Errorf("RuntimeObjectFilter expect nil obj, but got %v", newObj) diff --git a/pkg/yurthub/filter/nodeportisolation/filter.go b/pkg/yurthub/filter/nodeportisolation/filter.go index 54b73f3df2f..365e83b58e6 100644 --- a/pkg/yurthub/filter/nodeportisolation/filter.go +++ b/pkg/yurthub/filter/nodeportisolation/filter.go @@ -77,7 +77,7 @@ func (nif *nodePortIsolationFilter) SetKubeClient(client kubernetes.Interface) e return nil } -func (nif *nodePortIsolationFilter) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object { +func (nif *nodePortIsolationFilter) Filter(obj runtime.Object) runtime.Object { switch v := obj.(type) { case *v1.Service: return nif.isolateNodePortService(v) diff --git a/pkg/yurthub/filter/nodeportisolation/filter_test.go b/pkg/yurthub/filter/nodeportisolation/filter_test.go index cb574d856ac..f7b75fb4a5a 100644 --- a/pkg/yurthub/filter/nodeportisolation/filter_test.go +++ b/pkg/yurthub/filter/nodeportisolation/filter_test.go @@ -356,8 +356,6 @@ func TestFilter(t *testing.T) { }, } - stopCh := make(<-chan struct{}) - for k, tc := range testcases { t.Run(k, func(t *testing.T) { nif := &nodePortIsolationFilter{} @@ -371,7 +369,7 @@ func TestFilter(t *testing.T) { nif.client = client } - newObj := nif.Filter(tc.responseObj, stopCh) + newObj := nif.Filter(tc.responseObj) if tc.expectObj == nil { if !util.IsNil(newObj) { t.Errorf("RuntimeObjectFilter expect nil obj, but got %v", newObj) diff --git a/pkg/yurthub/filter/objectfilter/chain.go 
b/pkg/yurthub/filter/objectfilter/chain.go index 4f070a45517..cf2b74b0b33 100644 --- a/pkg/yurthub/filter/objectfilter/chain.go +++ b/pkg/yurthub/filter/objectfilter/chain.go @@ -47,9 +47,9 @@ func (chain filterChain) SupportedResourceAndVerbs() map[string]sets.Set[string] return map[string]sets.Set[string]{} } -func (chain filterChain) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object { +func (chain filterChain) Filter(obj runtime.Object) runtime.Object { for i := range chain { - obj = chain[i].Filter(obj, stopCh) + obj = chain[i].Filter(obj) if yurtutil.IsNil(obj) { break } diff --git a/pkg/yurthub/filter/responsefilter/filter.go b/pkg/yurthub/filter/responsefilter/filter.go index 70a5a50cb8d..fac9c9f017d 100644 --- a/pkg/yurthub/filter/responsefilter/filter.go +++ b/pkg/yurthub/filter/responsefilter/filter.go @@ -45,7 +45,6 @@ type filterReadCloser struct { isWatch bool isList bool ownerName string - stopCh <-chan struct{} } // newFilterReadCloser create an filterReadCloser object @@ -54,8 +53,7 @@ func newFilterReadCloser( sm *serializer.SerializerManager, rc io.ReadCloser, objectFilter filter.ObjectFilter, - ownerName string, - stopCh <-chan struct{}) (int, io.ReadCloser, error) { + ownerName string) (int, io.ReadCloser, error) { ctx := req.Context() info, _ := apirequest.RequestInfoFrom(ctx) respContentType, _ := util.RespContentTypeFrom(ctx) @@ -74,7 +72,6 @@ func newFilterReadCloser( isWatch: info.Verb == "watch", isList: info.Verb == "list", ownerName: ownerName, - stopCh: stopCh, } if frc.isWatch { @@ -134,11 +131,11 @@ func (frc *filterReadCloser) objectResponseFilter(rc io.ReadCloser) (*bytes.Buff if frc.isList { items, err := meta.ExtractList(obj) if err != nil || len(items) == 0 { - obj = frc.objectFilter.Filter(obj, frc.stopCh) + obj = frc.objectFilter.Filter(obj) } else { list := make([]runtime.Object, 0) for i := range items { - newObj := frc.objectFilter.Filter(items[i], frc.stopCh) + newObj := frc.objectFilter.Filter(items[i]) if !yurtutil.IsNil(newObj) { list = append(list, newObj) } @@ -149,7 +146,7 @@ func (frc *filterReadCloser) objectResponseFilter(rc io.ReadCloser) (*bytes.Buff } } } else { - obj = frc.objectFilter.Filter(obj, frc.stopCh) + obj = frc.objectFilter.Filter(obj) } if yurtutil.IsNil(obj) { klog.Warningf("filter %s doesn't work correctly, response is discarded completely in list request.", frc.ownerName) @@ -178,7 +175,7 @@ func (frc *filterReadCloser) streamResponseFilter(rc io.ReadCloser, ch chan *byt newObj := obj // BOOKMARK and ERROR response are unnecessary to filter if !(watchType == watch.Bookmark || watchType == watch.Error) { - if newObj = frc.objectFilter.Filter(obj, frc.stopCh); yurtutil.IsNil(newObj) { + if newObj = frc.objectFilter.Filter(obj); yurtutil.IsNil(newObj) { // if an object is removed in the filter chain, it means that this object is not needed // to return back to clients(like kube-proxy). but in order to update the client's local cache, // it's a good idea to return a watch.Deleted event to clients and make clients to remove this object in local cache. 
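The comment above captures the key invariant of the stream filter: an object dropped by the filter chain is not silently discarded from a watch response; it is re-emitted as a watch.Deleted event so clients (like kube-proxy) evict it from their local caches. A minimal sketch of that pattern, using illustrative names rather than the real yurthub types:

package filtersketch

import (
	"k8s.io/apimachinery/pkg/runtime"
	"k8s.io/apimachinery/pkg/watch"
)

// filterWatchEvent applies objectFilter to one decoded watch event. If the
// filter drops the object, the event is rewritten as watch.Deleted still
// carrying the original object, so a watcher removes the stale copy from
// its local cache instead of keeping it forever.
func filterWatchEvent(ev watch.Event, objectFilter func(runtime.Object) runtime.Object) watch.Event {
	// BOOKMARK and ERROR events carry no user object that needs filtering.
	if ev.Type == watch.Bookmark || ev.Type == watch.Error {
		return ev
	}
	if newObj := objectFilter(ev.Object); newObj != nil {
		return watch.Event{Type: ev.Type, Object: newObj}
	}
	return watch.Event{Type: watch.Deleted, Object: ev.Object}
}

The code in this file additionally decodes each event from the response stream and re-encodes the rewritten event with the negotiated serializer before handing it back to the client.
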
@@ -226,6 +223,6 @@ func (rf *responseFilter) Name() string { return rf.objectFilter.Name() } -func (rf *responseFilter) Filter(req *http.Request, rc io.ReadCloser, stopCh <-chan struct{}) (int, io.ReadCloser, error) { - return newFilterReadCloser(req, rf.serializerManager, rc, rf.objectFilter, rf.objectFilter.Name(), stopCh) +func (rf *responseFilter) Filter(req *http.Request, rc io.ReadCloser) (int, io.ReadCloser, error) { + return newFilterReadCloser(req, rf.serializerManager, rc, rf.objectFilter, rf.objectFilter.Name()) } diff --git a/pkg/yurthub/filter/responsefilter/filter_test.go b/pkg/yurthub/filter/responsefilter/filter_test.go index efa860dbcde..cde842aa1c8 100644 --- a/pkg/yurthub/filter/responsefilter/filter_test.go +++ b/pkg/yurthub/filter/responsefilter/filter_test.go @@ -77,7 +77,7 @@ func (noh *nopObjectHandler) SupportedResourceAndVerbs() map[string]sets.Set[str return map[string]sets.Set[string]{} } -func (noh *nopObjectHandler) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object { +func (noh *nopObjectHandler) Filter(obj runtime.Object) runtime.Object { return obj } @@ -85,7 +85,6 @@ func TestFilterReadCloser_Read_List(t *testing.T) { resolver := newTestRequestInfoResolver() sm := serializer.NewSerializerManager() handler := &nopObjectHandler{} - stopCh := make(chan struct{}) testcases := map[string]struct { path string @@ -229,7 +228,7 @@ func TestFilterReadCloser_Read_List(t *testing.T) { buf := bytes.NewBuffer(listBytes) rc := io.NopCloser(buf) - size, newRc, err := newFilterReadCloser(req, sm, rc, handler, "foo", stopCh) + size, newRc, err := newFilterReadCloser(req, sm, rc, handler, "foo") if err != nil { t.Errorf("failed new filter readcloser, %v", err) } @@ -269,7 +268,6 @@ func TestFilterReadCloser_Read_Watch(t *testing.T) { resolver := newTestRequestInfoResolver() sm := serializer.NewSerializerManager() handler := &nopObjectHandler{} - stopCh := make(chan struct{}) testcases := map[string]struct { path string @@ -367,7 +365,7 @@ func TestFilterReadCloser_Read_Watch(t *testing.T) { } rc := io.NopCloser(&buf) - _, newRc, err := newFilterReadCloser(req, sm, rc, handler, "foo", stopCh) + _, newRc, err := newFilterReadCloser(req, sm, rc, handler, "foo") if err != nil { t.Errorf("failed new filter readcloser, %v", err) } @@ -2671,7 +2669,7 @@ func TestResponseFilterForListRequest(t *testing.T) { ctx = hubutil.WithRespContentType(ctx, tc.accept) req = req.WithContext(ctx) rc := io.NopCloser(buf) - _, newReadCloser, filterErr = responseFilter.Filter(req, rc, nil) + _, newReadCloser, filterErr = responseFilter.Filter(req, rc) }) handler = util.WithRequestClientComponent(handler) @@ -2886,7 +2884,7 @@ func TestResponseFilterForWatchRequest(t *testing.T) { ctx = hubutil.WithRespContentType(ctx, tc.accept) req = req.WithContext(ctx) rc := io.NopCloser(buf) - _, newReadCloser, filterErr = responseFilter.Filter(req, rc, nil) + _, newReadCloser, filterErr = responseFilter.Filter(req, rc) }) handler = util.WithRequestClientComponent(handler) diff --git a/pkg/yurthub/filter/serviceenvupdater/filter.go b/pkg/yurthub/filter/serviceenvupdater/filter.go index b501c9de522..443c45c0f55 100644 --- a/pkg/yurthub/filter/serviceenvupdater/filter.go +++ b/pkg/yurthub/filter/serviceenvupdater/filter.go @@ -63,7 +63,7 @@ func (sef *serviceEnvUpdaterFilter) SetMasterServicePort(port string) error { return nil } -func (sef *serviceEnvUpdaterFilter) Filter(obj runtime.Object, _ <-chan struct{}) runtime.Object { +func (sef *serviceEnvUpdaterFilter) Filter(obj runtime.Object) 
runtime.Object { switch v := obj.(type) { case *corev1.Pod: return sef.mutatePodEnv(v) diff --git a/pkg/yurthub/filter/serviceenvupdater/filter_test.go b/pkg/yurthub/filter/serviceenvupdater/filter_test.go index 092c33a630a..f653bf417cb 100644 --- a/pkg/yurthub/filter/serviceenvupdater/filter_test.go +++ b/pkg/yurthub/filter/serviceenvupdater/filter_test.go @@ -249,14 +249,13 @@ func TestFilterServiceEnvUpdater(t *testing.T) { }, }, } - stopCh := make(<-chan struct{}) for _, tc := range tests { t.Run(tc.name, func(t *testing.T) { pef, _ := NewServiceEnvUpdaterFilter() pef.SetMasterServiceHost(masterHost) pef.SetMasterServicePort(masterPort) - newObj := pef.Filter(tc.requestObj, stopCh) + newObj := pef.Filter(tc.requestObj) if tc.expectedObj == nil { if !util.IsNil(newObj) { @@ -281,7 +280,7 @@ func TestFilterNonPodReq(t *testing.T) { }, } pef, _ := NewServiceEnvUpdaterFilter() - newObj := pef.Filter(serviceReq, make(<-chan struct{})) + newObj := pef.Filter(serviceReq) if !reflect.DeepEqual(newObj, serviceReq) { t.Errorf("RuntimeObjectFilter got error, expected: \n%v\nbut got: \n%v\n", serviceReq, newObj) diff --git a/pkg/yurthub/filter/servicetopology/filter.go b/pkg/yurthub/filter/servicetopology/filter.go index 95c17ec2d0b..7178db731a2 100644 --- a/pkg/yurthub/filter/servicetopology/filter.go +++ b/pkg/yurthub/filter/servicetopology/filter.go @@ -128,7 +128,7 @@ func (stf *serviceTopologyFilter) resolveNodePoolName() string { return stf.nodePoolName } -func (stf *serviceTopologyFilter) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object { +func (stf *serviceTopologyFilter) Filter(obj runtime.Object) runtime.Object { switch v := obj.(type) { case *v1.Endpoints, *discoveryV1beta1.EndpointSlice, *discoveryv1.EndpointSlice: return stf.serviceTopologyHandler(v) diff --git a/pkg/yurthub/filter/servicetopology/filter_test.go b/pkg/yurthub/filter/servicetopology/filter_test.go index 5c1f654911b..5f10ee60a47 100644 --- a/pkg/yurthub/filter/servicetopology/filter_test.go +++ b/pkg/yurthub/filter/servicetopology/filter_test.go @@ -3339,8 +3339,7 @@ func TestFilter(t *testing.T) { yurtFactory.Start(stopper2) yurtFactory.WaitForCacheSync(stopper2) - stopCh := make(<-chan struct{}) - newObj := stf.Filter(tt.responseObject, stopCh) + newObj := stf.Filter(tt.responseObject) if util.IsNil(newObj) { t.Errorf("empty object is returned") } diff --git a/pkg/yurthub/gc/gc.go b/pkg/yurthub/gc/gc.go index a5d8a9ce373..4c5b7ed5af7 100644 --- a/pkg/yurthub/gc/gc.go +++ b/pkg/yurthub/gc/gc.go @@ -45,15 +45,14 @@ var ( type GCManager struct { store cachemanager.StorageWrapper healthChecker healthchecker.Interface - clientManager transport.Interface + clientManager transport.TransportManager nodeName string eventsGCFrequency time.Duration lastTime time.Time - stopCh <-chan struct{} } // NewGCManager creates a *GCManager object -func NewGCManager(cfg *config.YurtHubConfiguration, healthChecker healthchecker.Interface, stopCh <-chan struct{}) (*GCManager, error) { +func NewGCManager(cfg *config.YurtHubConfiguration, healthChecker healthchecker.Interface) (*GCManager, error) { gcFrequency := cfg.GCFrequency if gcFrequency == 0 { gcFrequency = defaultEventGcInterval @@ -65,14 +64,13 @@ func NewGCManager(cfg *config.YurtHubConfiguration, healthChecker healthchecker. 
healthChecker: healthChecker, clientManager: cfg.TransportAndDirectClientManager, eventsGCFrequency: time.Duration(gcFrequency) * time.Minute, - stopCh: stopCh, } mgr.gcPodsWhenRestart() return mgr, nil } // Run starts GCManager -func (m *GCManager) Run() { +func (m *GCManager) Run(ctx context.Context) { // run gc events after a time duration between eventsGCFrequency and 3 * eventsGCFrequency m.lastTime = time.Now() go wait.JitterUntil(func() { @@ -91,7 +89,7 @@ func (m *GCManager) Run() { m.gcEvents(kubeClient, "kubelet") m.gcEvents(kubeClient, "kube-proxy") - }, m.eventsGCFrequency, 2, true, m.stopCh) + }, m.eventsGCFrequency, 2, true, ctx.Done()) } func (m *GCManager) gcPodsWhenRestart() { diff --git a/pkg/yurthub/healthchecker/cloudapiserver/health_checker.go b/pkg/yurthub/healthchecker/cloudapiserver/health_checker.go index 02e66908cf6..31f4e7433c3 100644 --- a/pkg/yurthub/healthchecker/cloudapiserver/health_checker.go +++ b/pkg/yurthub/healthchecker/cloudapiserver/health_checker.go @@ -17,6 +17,7 @@ limitations under the License. package cloudapiserver import ( + "context" "fmt" "net/url" "strconv" @@ -51,7 +52,7 @@ type cloudAPIServerHealthChecker struct { } // NewCloudAPIServerHealthChecker returns a health checker for verifying cloud kube-apiserver status. -func NewCloudAPIServerHealthChecker(cfg *config.YurtHubConfiguration, stopCh <-chan struct{}) (healthchecker.Interface, error) { +func NewCloudAPIServerHealthChecker(ctx context.Context, cfg *config.YurtHubConfiguration) (healthchecker.Interface, error) { hc := &cloudAPIServerHealthChecker{ probers: make(map[string]healthchecker.BackendProber), remoteServers: cfg.RemoteServers, @@ -71,7 +72,7 @@ func NewCloudAPIServerHealthChecker(cfg *config.YurtHubConfiguration, stopCh <-c hc.getLastNodeLease) } if len(hc.probers) != 0 { - go hc.run(stopCh) + go hc.run(ctx) } return hc, nil } @@ -116,13 +117,13 @@ func (hc *cloudAPIServerHealthChecker) UpdateBackends(servers []*url.URL) { // do nothing } -func (hc *cloudAPIServerHealthChecker) run(stopCh <-chan struct{}) { +func (hc *cloudAPIServerHealthChecker) run(ctx context.Context) { intervalTicker := time.NewTicker(time.Duration(hc.heartbeatInterval) * time.Second) defer intervalTicker.Stop() for { select { - case <-stopCh: + case <-ctx.Done(): klog.Infof("exit normally in health check loop.") return case <-intervalTicker.C: diff --git a/pkg/yurthub/healthchecker/cloudapiserver/health_checker_test.go b/pkg/yurthub/healthchecker/cloudapiserver/health_checker_test.go index c14a93bfd5a..f0da9d626cf 100644 --- a/pkg/yurthub/healthchecker/cloudapiserver/health_checker_test.go +++ b/pkg/yurthub/healthchecker/cloudapiserver/health_checker_test.go @@ -223,7 +223,6 @@ func TestNewCloudAPIServerHealthChecker(t *testing.T) { for k, tt := range testcases { t.Run(k, func(t *testing.T) { - stopCh := make(chan struct{}) cfg := &config.YurtHubConfiguration{ RemoteServers: tt.remoteServers, StorageWrapper: cachemanager.NewStorageWrapper(store), @@ -242,9 +241,11 @@ func TestNewCloudAPIServerHealthChecker(t *testing.T) { cl.PrependReactor("get", "leases", tt.getReactor[i]) fakeClients[tt.remoteServers[i].String()] = cl } + cfg.TransportAndDirectClientManager = transport.NewFakeTransportManager(http.StatusOK, fakeClients) + cfg.TransportAndDirectClientManager.Start(t.Context()) - checker, _ := NewCloudAPIServerHealthChecker(cfg, stopCh) + checker, _ := NewCloudAPIServerHealthChecker(t.Context(), cfg) // wait for the probe completed time.Sleep(time.Duration(5*len(tt.remoteServers)) * time.Second) @@ 
-257,8 +258,6 @@ func TestNewCloudAPIServerHealthChecker(t *testing.T) { if checker.IsHealthy() != tt.serverHealthy { t.Errorf("expect all servers healthy status %v, but got %v", tt.serverHealthy, checker.IsHealthy()) } - - close(stopCh) }) } diff --git a/pkg/yurthub/healthchecker/leaderhub/leader_hub.go b/pkg/yurthub/healthchecker/leaderhub/leader_hub.go index 3a238c7426f..2b317b4e61f 100644 --- a/pkg/yurthub/healthchecker/leaderhub/leader_hub.go +++ b/pkg/yurthub/healthchecker/leaderhub/leader_hub.go @@ -17,6 +17,7 @@ limitations under the License. package leaderhub import ( + "context" "net" "net/url" "sync" @@ -34,7 +35,7 @@ type leaderHubHealthChecker struct { pingFunc func(*url.URL) bool } -func NewLeaderHubHealthChecker(checkerInterval time.Duration, pingFunc func(*url.URL) bool, stopCh <-chan struct{}) healthchecker.Interface { +func NewLeaderHubHealthChecker(ctx context.Context, checkerInterval time.Duration, pingFunc func(*url.URL) bool) healthchecker.Interface { if pingFunc == nil { pingFunc = pingServer } @@ -45,7 +46,7 @@ func NewLeaderHubHealthChecker(checkerInterval time.Duration, pingFunc func(*url checkInterval: checkerInterval, pingFunc: pingFunc, } - go hc.startHealthCheck(stopCh) + go hc.startHealthCheck(ctx.Done()) return hc } diff --git a/pkg/yurthub/healthchecker/leaderhub/leader_hub_test.go b/pkg/yurthub/healthchecker/leaderhub/leader_hub_test.go index e5f186454d2..b9e9fde40ab 100644 --- a/pkg/yurthub/healthchecker/leaderhub/leader_hub_test.go +++ b/pkg/yurthub/healthchecker/leaderhub/leader_hub_test.go @@ -133,8 +133,7 @@ func TestLeaderHubHealthChecker(t *testing.T) { for k, tc := range testcases { t.Run(k, func(t *testing.T) { - stopCh := make(chan struct{}) - hc := NewLeaderHubHealthChecker(2*time.Second, tc.pingFunc, stopCh) + hc := NewLeaderHubHealthChecker(t.Context(), 2*time.Second, tc.pingFunc) hc.UpdateBackends(tc.servers) assert.Equal(t, hc.IsHealthy(), tc.expectedIsHealthy, "IsHealthy result is not equal") @@ -150,7 +149,6 @@ func TestLeaderHubHealthChecker(t *testing.T) { assert.Equal(t, hc.BackendIsHealthy(u), isHealthy, "BackendIsHealthy result is not equal after updated") } } - close(stopCh) }) } } diff --git a/pkg/yurthub/multiplexer/filterstore.go b/pkg/yurthub/multiplexer/filterstore.go index 7464994804f..84d44eb6436 100644 --- a/pkg/yurthub/multiplexer/filterstore.go +++ b/pkg/yurthub/multiplexer/filterstore.go @@ -78,7 +78,7 @@ func (fs *filterStore) List(ctx context.Context, options *metainternalversion.Li return result, nil } -func (fs *filterStore) filterListObject(ctx context.Context, obj runtime.Object, filter filter.ObjectFilter) (runtime.Object, error) { +func (fs *filterStore) filterListObject(_ context.Context, obj runtime.Object, filter filter.ObjectFilter) (runtime.Object, error) { if yurtutil.IsNil(filter) { return obj, nil } @@ -86,12 +86,12 @@ func (fs *filterStore) filterListObject(ctx context.Context, obj runtime.Object, items, err := meta.ExtractList(obj) if err != nil || len(items) == 0 { - return filter.Filter(obj, ctx.Done()), nil + return filter.Filter(obj), nil } list := make([]runtime.Object, 0) for _, item := range items { - newObj := filter.Filter(item, ctx.Done()) + newObj := filter.Filter(item) if !yurtutil.IsNil(newObj) { list = append(list, newObj) } diff --git a/pkg/yurthub/multiplexer/filterwatch.go b/pkg/yurthub/multiplexer/filterwatch.go index b79836f5f1b..39ff8f4a845 100644 --- a/pkg/yurthub/multiplexer/filterwatch.go +++ b/pkg/yurthub/multiplexer/filterwatch.go @@ -75,7 +75,7 @@ func (f *filterWatch) receive() 
{ } if !(result.Type == watch.Bookmark || result.Type == watch.Error) { - if newObj = f.filter.Filter(newObj, f.done); yurtutil.IsNil(newObj) { + if newObj = f.filter.Filter(newObj); yurtutil.IsNil(newObj) { watchType = watch.Deleted newObj = result.Object } diff --git a/pkg/yurthub/network/network.go b/pkg/yurthub/network/network.go index 03cc7a3368a..f8259149d17 100644 --- a/pkg/yurthub/network/network.go +++ b/pkg/yurthub/network/network.go @@ -17,6 +17,7 @@ limitations under the License. package network import ( + "context" "net" "time" @@ -49,14 +50,14 @@ func NewNetworkManager(options *options.YurtHubOptions) (*NetworkManager, error) return m, nil } -func (m *NetworkManager) Run(stopCh <-chan struct{}) { +func (m *NetworkManager) Run(ctx context.Context) { go func() { ticker := time.NewTicker(SyncNetworkPeriod * time.Second) defer ticker.Stop() for { select { - case <-stopCh: + case <-ctx.Done(): klog.Infof("exit network manager run goroutine normally") err := m.ifController.DeleteDummyInterface(m.dummyIfName) if err != nil { diff --git a/pkg/yurthub/otaupdate/ota.go b/pkg/yurthub/otaupdate/ota.go index bcfb7de7515..91ee6b3b144 100644 --- a/pkg/yurthub/otaupdate/ota.go +++ b/pkg/yurthub/otaupdate/ota.go @@ -177,7 +177,7 @@ func preCheck(clientset kubernetes.Interface, namespace, podName, nodeName strin } // HealthyCheck checks if cloud-edge is disconnected before ota update handle, ota update is not allowed when disconnected -func HealthyCheck(healthChecker healthchecker.Interface, clientManager transport.Interface, nodeName string, handler OTAHandler) http.Handler { +func HealthyCheck(healthChecker healthchecker.Interface, clientManager transport.TransportManager, nodeName string, handler OTAHandler) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { var kubeClient kubernetes.Interface if yurtutil.IsNil(healthChecker) { diff --git a/pkg/yurthub/otaupdate/ota_test.go b/pkg/yurthub/otaupdate/ota_test.go index aa7380d7531..bdc1bbbb99a 100644 --- a/pkg/yurthub/otaupdate/ota_test.go +++ b/pkg/yurthub/otaupdate/ota_test.go @@ -149,10 +149,11 @@ func TestHealthyCheck(t *testing.T) { t.Errorf("certificates are not ready, %v", err) } - clientManager, err := transport.NewTransportAndClientManager(remoteServers, 10, certManager, context.Background().Done()) + clientManager, err := transport.NewTransportAndClientManager(remoteServers, 10, certManager) if err != nil { t.Fatalf("could not new transport manager, %v", err) } + clientManager.Start(t.Context()) req, err := http.NewRequest("POST", "", nil) if err != nil { diff --git a/pkg/yurthub/proxy/autonomy/autonomy.go b/pkg/yurthub/proxy/autonomy/autonomy.go index 73be58707f4..1c9ad870aac 100644 --- a/pkg/yurthub/proxy/autonomy/autonomy.go +++ b/pkg/yurthub/proxy/autonomy/autonomy.go @@ -52,13 +52,13 @@ var ( type AutonomyProxy struct { cacheMgr cachemanager.CacheManager healthChecker healthchecker.Interface - clientManager transport.Interface + clientManager transport.TransportManager cacheFailedCount *int32 } func NewAutonomyProxy( healthChecker healthchecker.Interface, - clientManager transport.Interface, + clientManager transport.TransportManager, cacheMgr cachemanager.CacheManager, ) *AutonomyProxy { return &AutonomyProxy{ diff --git a/pkg/yurthub/proxy/local/local.go b/pkg/yurthub/proxy/local/local.go index d2ecdec9c27..a111f81e8b2 100644 --- a/pkg/yurthub/proxy/local/local.go +++ b/pkg/yurthub/proxy/local/local.go @@ -120,11 +120,10 @@ func (lp *LocalProxy) localPost(w http.ResponseWriter, req 
*http.Request) error
 	if info.Resource == "events" && len(reqContentType) != 0 {
 		ctx = hubutil.WithRespContentType(ctx, reqContentType)
 		req = req.WithContext(ctx)
-		stopCh := make(chan struct{})
 		rc, prc := hubutil.NewDualReadCloser(req, req.Body, false)
-		go func(req *http.Request, prc io.ReadCloser, stopCh <-chan struct{}) {
-			klog.V(2).Infof("cache events when cluster is unhealthy, %v", lp.cacheMgr.CacheResponse(req, prc, stopCh))
-		}(req, prc, stopCh)
+		go func(req *http.Request, prc io.ReadCloser) {
+			klog.V(2).Infof("cache events when cluster is unhealthy, %v", lp.cacheMgr.CacheResponse(req, prc))
+		}(req, prc)
 		req.Body = rc
 	}
diff --git a/pkg/yurthub/proxy/multiplexer/multiplexerproxy.go b/pkg/yurthub/proxy/multiplexer/multiplexerproxy.go
index 47288e9fa09..1f129873654 100644
--- a/pkg/yurthub/proxy/multiplexer/multiplexerproxy.go
+++ b/pkg/yurthub/proxy/multiplexer/multiplexerproxy.go
@@ -47,7 +47,6 @@ const (
 type multiplexerProxy struct {
 	requestsMultiplexerManager *multiplexer.MultiplexerManager
 	restMapperManager          *hubmeta.RESTMapperManager
-	stop                       <-chan struct{}
 }

 func init() {
@@ -56,9 +55,8 @@ func init() {
 	v1.AddToScheme(scheme.Scheme)
 }

-func NewMultiplexerProxy(multiplexerManager *multiplexer.MultiplexerManager, restMapperMgr *hubmeta.RESTMapperManager, stop <-chan struct{}) http.Handler {
+func NewMultiplexerProxy(multiplexerManager *multiplexer.MultiplexerManager, restMapperMgr *hubmeta.RESTMapperManager) http.Handler {
 	return &multiplexerProxy{
-		stop:                       stop,
 		requestsMultiplexerManager: multiplexerManager,
 		restMapperManager:          restMapperMgr,
 	}
diff --git a/pkg/yurthub/proxy/multiplexer/multiplexerproxy_test.go b/pkg/yurthub/proxy/multiplexer/multiplexerproxy_test.go
index d735c87f8ef..6f42f26d2ba 100644
--- a/pkg/yurthub/proxy/multiplexer/multiplexerproxy_test.go
+++ b/pkg/yurthub/proxy/multiplexer/multiplexerproxy_test.go
@@ -19,7 +19,6 @@ package multiplexer

 import (
 	"bytes"
-	"context"
 	"fmt"
 	"net/http"
 	"net/http/httptest"
@@ -162,7 +161,7 @@ func TestShareProxy_ServeHTTP_LIST(t *testing.T) {
 			}

 			healthChecher := fakeHealthChecker.NewFakeChecker(map[*url.URL]bool{})
-			loadBalancer := remote.NewLoadBalancer("round-robin", []*url.URL{}, nil, nil, healthChecher, nil, context.Background().Done())
+			loadBalancer := remote.NewLoadBalancer("round-robin", []*url.URL{}, nil, nil, healthChecher, nil)
 			dsm := multiplexerstorage.NewDummyStorageManager(mockCacheMap())
 			cfg := &config.YurtHubConfiguration{
 				PoolScopeResources: poolScopeResources,
@@ -179,13 +178,12 @@ func TestShareProxy_ServeHTTP_LIST(t *testing.T) {
 					Resource: "endpointslices",
 				})
 			}
-			stopCh := make(chan struct{})
-			if ok := cache.WaitForCacheSync(stopCh, informerSynced); !ok {
+			if ok := cache.WaitForCacheSync(t.Context().Done(), informerSynced); !ok {
 				t.Errorf("configuration manager is not ready")
 				return
 			}

-			sp := NewMultiplexerProxy(rmm, restMapperManager, make(<-chan struct{}))
+			sp := NewMultiplexerProxy(rmm, restMapperManager)

 			sp.ServeHTTP(w, newEndpointSliceListRequest(tc.url, tc.objectFilter))

@@ -340,7 +338,7 @@ func TestShareProxy_ServeHTTP_WATCH(t *testing.T) {
 	} {
 		t.Run(k, func(t *testing.T) {
 			healthChecher := fakeHealthChecker.NewFakeChecker(map[*url.URL]bool{})
-			loadBalancer := remote.NewLoadBalancer("round-robin", []*url.URL{}, nil, nil, healthChecher, nil, context.Background().Done())
+			loadBalancer := remote.NewLoadBalancer("round-robin", []*url.URL{}, nil, nil, healthChecher, nil)
 			dsm := multiplexerstorage.NewDummyStorageManager(mockCacheMap())
 			cfg := &config.YurtHubConfiguration{
@@ -358,13 +356,12 @@ func TestShareProxy_ServeHTTP_WATCH(t
*testing.T) { Resource: "endpointslices", }) } - stopCh := make(chan struct{}) - if ok := cache.WaitForCacheSync(stopCh, informerSynced); !ok { + if ok := cache.WaitForCacheSync(t.Context().Done(), informerSynced); !ok { t.Errorf("configuration manager is not ready") return } - sp := NewMultiplexerProxy(rmm, restMapperManager, make(<-chan struct{})) + sp := NewMultiplexerProxy(rmm, restMapperManager) req := newWatchEndpointSliceRequest(tc.url, tc.objectFilter) w := newWatchResponse() diff --git a/pkg/yurthub/proxy/multiplexer/testing/fake_endpointslicesfilter.go b/pkg/yurthub/proxy/multiplexer/testing/fake_endpointslicesfilter.go index 1fae9f8e444..6cf92014533 100644 --- a/pkg/yurthub/proxy/multiplexer/testing/fake_endpointslicesfilter.go +++ b/pkg/yurthub/proxy/multiplexer/testing/fake_endpointslicesfilter.go @@ -31,7 +31,7 @@ func (ie *IgnoreEndpointslicesWithNodeName) Name() string { // Filter is used for filtering runtime object // all filter logic should be located in it. -func (ie *IgnoreEndpointslicesWithNodeName) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object { +func (ie *IgnoreEndpointslicesWithNodeName) Filter(obj runtime.Object) runtime.Object { endpointslice, ok := obj.(*discovery.EndpointSlice) if !ok { return obj diff --git a/pkg/yurthub/proxy/nonresourcerequest/nonresource.go b/pkg/yurthub/proxy/nonresourcerequest/nonresource.go index 7e821ac956b..ea4591e2fd9 100644 --- a/pkg/yurthub/proxy/nonresourcerequest/nonresource.go +++ b/pkg/yurthub/proxy/nonresourcerequest/nonresource.go @@ -60,7 +60,7 @@ func WrapNonResourceHandler(proxyHandler http.Handler, config *config.YurtHubCon return wrapMux } -func localCacheHandler(handler NonResourceHandler, healthChecker healthchecker.Interface, clientManager transport.Interface, sw cachemanager.StorageWrapper, path string) http.Handler { +func localCacheHandler(handler NonResourceHandler, healthChecker healthchecker.Interface, clientManager transport.TransportManager, sw cachemanager.StorageWrapper, path string) http.Handler { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { // if cloud kube-apiserver is healthy, forward non resource request to cloud kube-apiserver // otherwise serve non resource request by local cache. 
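The comment above is the entire contract of localCacheHandler: while the health checker reports the cloud kube-apiserver healthy, forward the non-resource request to it; once it is unhealthy, answer from the local cache. A rough sketch of that branch, with isHealthy, forward, and readCache as hypothetical stand-ins for the health checker, the reverse proxy, and the storage wrapper:

package nonresourcesketch

import "net/http"

// serveWithFallback forwards requests to the cloud while it is reachable and
// falls back to locally cached bytes once it is not.
func serveWithFallback(isHealthy func() bool, forward http.Handler, readCache func(path string) ([]byte, error)) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		if isHealthy() {
			// cloud kube-apiserver is healthy: proxy the request through.
			forward.ServeHTTP(w, r)
			return
		}
		// cloud is unreachable: serve the last cached copy if one exists.
		data, err := readCache(r.URL.Path)
		if err != nil {
			http.Error(w, "cloud unreachable and no local cache", http.StatusServiceUnavailable)
			return
		}
		w.Header().Set("Content-Type", "application/json")
		w.Write(data)
	})
}
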
diff --git a/pkg/yurthub/proxy/nonresourcerequest/nonresource_test.go b/pkg/yurthub/proxy/nonresourcerequest/nonresource_test.go index 5e9c0129d5c..823653897c4 100644 --- a/pkg/yurthub/proxy/nonresourcerequest/nonresource_test.go +++ b/pkg/yurthub/proxy/nonresourcerequest/nonresource_test.go @@ -108,10 +108,11 @@ func TestLocalCacheHandler(t *testing.T) { t.Errorf("certificates are not ready, %v", err) } - transportManager, err := transport.NewTransportAndClientManager(remoteServers, 10, certManager, context.Background().Done()) + transportManager, err := transport.NewTransportAndClientManager(remoteServers, 10, certManager) if err != nil { t.Fatalf("could not new transport manager, %v", err) } + transportManager.Start(t.Context()) testcases := map[string]struct { path string diff --git a/pkg/yurthub/proxy/proxy.go b/pkg/yurthub/proxy/proxy.go index a71053eb4f7..5a930cff717 100644 --- a/pkg/yurthub/proxy/proxy.go +++ b/pkg/yurthub/proxy/proxy.go @@ -64,8 +64,7 @@ func NewYurtReverseProxyHandler( yurtHubCfg *config.YurtHubConfiguration, localCacheMgr cachemanager.CacheManager, cloudHealthChecker healthchecker.Interface, - requestMultiplexerManager *basemultiplexer.MultiplexerManager, - stopCh <-chan struct{}) (http.Handler, error) { + requestMultiplexerManager *basemultiplexer.MultiplexerManager) (http.Handler, error) { cfg := &server.Config{ LegacyAPIGroupPrefixes: sets.NewString(server.DefaultLegacyAPIPrefix), } @@ -77,8 +76,7 @@ func NewYurtReverseProxyHandler( localCacheMgr, yurtHubCfg.TransportAndDirectClientManager, cloudHealthChecker, - yurtHubCfg.FilterFinder, - stopCh) + yurtHubCfg.FilterFinder) var localProxy, autonomyProxy http.Handler if !yurtutil.IsNil(cloudHealthChecker) && !yurtutil.IsNil(localCacheMgr) { @@ -97,7 +95,7 @@ func NewYurtReverseProxyHandler( ) } - multiplexerProxy := multiplexer.NewMultiplexerProxy(requestMultiplexerManager, yurtHubCfg.RESTMapperManager, stopCh) + multiplexerProxy := multiplexer.NewMultiplexerProxy(requestMultiplexerManager, yurtHubCfg.RESTMapperManager) yurtProxy := &yurtReverseProxy{ cfg: yurtHubCfg, diff --git a/pkg/yurthub/proxy/remote/loadbalancer.go b/pkg/yurthub/proxy/remote/loadbalancer.go index c4a3a8020a8..1ad14a480ce 100644 --- a/pkg/yurthub/proxy/remote/loadbalancer.go +++ b/pkg/yurthub/proxy/remote/loadbalancer.go @@ -261,10 +261,9 @@ type LoadBalancer struct { strategy LoadBalancingStrategy localCacheMgr cachemanager.CacheManager filterFinder filter.FilterFinder - transportMgr transport.Interface + transportMgr transport.TransportManager healthChecker healthchecker.Interface mode string - stopCh <-chan struct{} } // NewLoadBalancer creates a loadbalancer for specified remote servers @@ -272,17 +271,15 @@ func NewLoadBalancer( lbMode string, remoteServers []*url.URL, localCacheMgr cachemanager.CacheManager, - transportMgr transport.Interface, + transportMgr transport.TransportManager, healthChecker healthchecker.Interface, - filterFinder filter.FilterFinder, - stopCh <-chan struct{}) *LoadBalancer { + filterFinder filter.FilterFinder) *LoadBalancer { lb := &LoadBalancer{ mode: lbMode, localCacheMgr: localCacheMgr, filterFinder: filterFinder, transportMgr: transportMgr, healthChecker: healthChecker, - stopCh: stopCh, } // initialize backends @@ -295,7 +292,7 @@ func NewLoadBalancer( func (lb *LoadBalancer) UpdateBackends(remoteServers []*url.URL) { newBackends := make([]*RemoteProxy, 0, len(remoteServers)) for _, server := range remoteServers { - proxy, err := NewRemoteProxy(server, lb.modifyResponse, lb.errorHandler, 
lb.transportMgr, lb.stopCh)
+		proxy, err := NewRemoteProxy(server, lb.modifyResponse, lb.errorHandler, lb.transportMgr)
 		if err != nil {
 			klog.Errorf("could not create proxy for backend %s, %v", server.String(), err)
 			continue
@@ -387,7 +384,7 @@ func (lb *LoadBalancer) modifyResponse(resp *http.Response) error {
 	if !yurtutil.IsNil(lb.filterFinder) {
 		if responseFilter, ok := lb.filterFinder.FindResponseFilter(req); ok {
 			wrapBody, needUncompressed := hubutil.NewGZipReaderCloser(resp.Header, resp.Body, req, "filter")
-			size, filterRc, err := responseFilter.Filter(req, wrapBody, lb.stopCh)
+			size, filterRc, err := responseFilter.Filter(req, wrapBody)
 			if err != nil {
 				klog.Errorf("could not filter response for %s, %v", hubutil.ReqString(req), err)
 				return err
@@ -440,12 +437,12 @@ func (lb *LoadBalancer) cacheResponse(req *http.Request, resp *http.Response) {
 		// cache the response at local.
 		rc, prc := hubutil.NewDualReadCloser(req, resp.Body, true)
 
-		go func(req *http.Request, prc io.ReadCloser, stopCh <-chan struct{}) {
-			if err := lb.localCacheMgr.CacheResponse(req, prc, stopCh); err != nil && !errors.Is(err, io.EOF) &&
+		go func(req *http.Request, prc io.ReadCloser) {
+			if err := lb.localCacheMgr.CacheResponse(req, prc); err != nil && !errors.Is(err, io.EOF) &&
 				!errors.Is(err, context.Canceled) {
 				klog.Errorf("lb could not cache req %s in local cache, %v", hubutil.ReqString(req), err)
 			}
-		}(req, prc, req.Context().Done())
+		}(req, prc)
 		resp.Body = rc
 	}
 }
diff --git a/pkg/yurthub/proxy/remote/loadbalancer_test.go b/pkg/yurthub/proxy/remote/loadbalancer_test.go
index 13397226752..268e64513b5 100644
--- a/pkg/yurthub/proxy/remote/loadbalancer_test.go
+++ b/pkg/yurthub/proxy/remote/loadbalancer_test.go
@@ -17,7 +17,6 @@ limitations under the License.
 package remote
 
 import (
-	"context"
 	"net/http"
 	"net/url"
 	"sort"
@@ -30,14 +29,6 @@ import (
 	"github.com/openyurtio/openyurt/pkg/yurthub/transport"
 )
 
-var (
-	neverStop    <-chan struct{}     = context.Background().Done()
-	transportMgr transport.Interface = transport.NewFakeTransportManager(
-		http.StatusOK,
-		map[string]kubernetes.Interface{},
-	)
-)
-
 func sortURLs(urls []*url.URL) {
 	sort.Slice(urls, func(i, j int) bool {
 		return urls[i].Host < urls[j].Host
@@ -255,7 +246,13 @@ func TestLoadBalancingStrategy(t *testing.T) {
 			sortURLs(servers)
 			klog.Infof("servers: %+v", servers)
 
-			lb := NewLoadBalancer(tc.lbMode, servers, nil, transportMgr, checker, nil, neverStop)
+			var transportMgr transport.TransportManager = transport.NewFakeTransportManager(
+				http.StatusOK,
+				map[string]kubernetes.Interface{},
+			)
+
+			transportMgr.Start(t.Context())
+			lb := NewLoadBalancer(tc.lbMode, servers, nil, transportMgr, checker, nil)
 
 			for i, host := range tc.results {
 				strategy := lb.CurrentStrategy()
diff --git a/pkg/yurthub/proxy/remote/remote.go b/pkg/yurthub/proxy/remote/remote.go
index 268a1b823e0..d9fb62c11fa 100644
--- a/pkg/yurthub/proxy/remote/remote.go
+++ b/pkg/yurthub/proxy/remote/remote.go
@@ -41,7 +41,6 @@ type RemoteProxy struct {
 	bearerTransport      http.RoundTripper
 	upgradeHandler       *proxy.UpgradeAwareHandler
 	bearerUpgradeHandler *proxy.UpgradeAwareHandler
-	stopCh               <-chan struct{}
 }
 
 type responder struct{}
@@ -55,8 +54,7 @@ func (r *responder) Error(w http.ResponseWriter, req *http.Request, err error) {
 func NewRemoteProxy(remoteServer *url.URL,
 	modifyResponse func(*http.Response) error,
 	errhandler func(http.ResponseWriter, *http.Request, error),
-	transportMgr transport.Interface,
-	stopCh <-chan struct{}) (*RemoteProxy, error) {
+	transportMgr transport.TransportManager) (*RemoteProxy, error) {
 	currentTransport := transportMgr.CurrentTransport()
 	if currentTransport == nil {
 		return nil, fmt.Errorf("could not get current transport when init proxy backend(%s)", remoteServer.String())
@@ -78,7 +76,6 @@ func NewRemoteProxy(remoteServer *url.URL,
 		bearerTransport:      bearerTransport,
 		upgradeHandler:       upgradeAwareHandler,
 		bearerUpgradeHandler: bearerUpgradeAwareHandler,
-		stopCh:               stopCh,
 	}
 
 	proxyBackend.reverseProxy.Transport = proxyBackend
diff --git a/pkg/yurthub/proxy/util/util.go b/pkg/yurthub/proxy/util/util.go
index 20bcf94fa04..a838a584754 100644
--- a/pkg/yurthub/proxy/util/util.go
+++ b/pkg/yurthub/proxy/util/util.go
@@ -366,7 +366,7 @@ func WithSaTokenSubstitute(handler http.Handler, tenantMgr tenant.Interface) htt
 
 					if tenantNs, _, err := serviceaccount.SplitUsername(oldClaim.Subject); err == nil {
 
-						if tenantMgr.GetTenantNs() != tenantNs && tenantNs == "kube-system" && tenantMgr.WaitForCacheSync() { // token is not from tenant's namespace
+						if tenantMgr.GetTenantNs() != tenantNs && tenantNs == "kube-system" && tenantMgr.WaitForCacheSync(req.Context()) { // token is not from tenant's namespace
 							req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", tenantMgr.GetTenantToken()))
 							klog.V(2).Infof("replace token, old: %s, new: %s", oldToken, tenantMgr.GetTenantToken())
 						}
diff --git a/pkg/yurthub/proxy/util/util_test.go b/pkg/yurthub/proxy/util/util_test.go
index 9b650f60217..3d56db0fcf3 100644
--- a/pkg/yurthub/proxy/util/util_test.go
+++ b/pkg/yurthub/proxy/util/util_test.go
@@ -122,7 +122,7 @@ func TestWithRequestForPoolScopeMetadata(t *testing.T) {
 	}
 
 	healthChecher := fakeHealthChecker.NewFakeChecker(map[*url.URL]bool{})
-	loadBalancer := remote.NewLoadBalancer("round-robin", []*url.URL{}, nil, nil, healthChecher, nil, context.Background().Done())
+	loadBalancer := remote.NewLoadBalancer("round-robin", []*url.URL{}, nil, nil, healthChecher, nil)
 	cfg := &config.YurtHubConfiguration{
 		PoolScopeResources: poolScopeResources,
 		RESTMapperManager:  restMapperManager,
@@ -580,8 +580,7 @@ func TestWithSaTokenSubstitute(t *testing.T) {
 
 	resolver := newTestRequestInfoResolver()
 
-	stopCh := make(<-chan struct{})
-	tenantMgr := tenant.New("myspace", nil, stopCh)
+	tenantMgr := tenant.New("myspace", nil)
 
 	data := make(map[string][]byte)
 	data["token"] = []byte(tenantToken)
@@ -670,8 +669,7 @@ func TestWithSaTokenSubstituteTenantTokenEmpty(t *testing.T) {
 
 	resolver := newTestRequestInfoResolver()
 
-	stopCh := make(<-chan struct{})
-	tenantMgr := tenant.New("myspace", nil, stopCh)
+	tenantMgr := tenant.New("myspace", nil)
 
 	data := make(map[string][]byte)
 	data["token"] = []byte(tenantToken)
diff --git a/pkg/yurthub/server/server.go b/pkg/yurthub/server/server.go
index 07441c40ecf..b0f9ba3023b 100644
--- a/pkg/yurthub/server/server.go
+++ b/pkg/yurthub/server/server.go
@@ -17,6 +17,7 @@ limitations under the License.
 package server
 
 import (
+	"context"
 	"fmt"
 	"net/http"
 
@@ -36,42 +37,41 @@ import (
 )
 
 // RunYurtHubServers is used to start up all servers for yurthub
-func RunYurtHubServers(cfg *config.YurtHubConfiguration,
+func RunYurtHubServers(ctx context.Context, cfg *config.YurtHubConfiguration,
 	proxyHandler http.Handler,
-	healthChecker healthchecker.Interface,
-	stopCh <-chan struct{}) error {
+	healthChecker healthchecker.Interface) error {
 	hubServerHandler := mux.NewRouter()
 	registerHandlers(hubServerHandler, cfg, healthChecker)
 
 	// start yurthub http server for serving metrics, pprof.
 	if cfg.YurtHubServerServing != nil {
-		if err := cfg.YurtHubServerServing.Serve(hubServerHandler, 0, stopCh); err != nil {
+		if err := cfg.YurtHubServerServing.Serve(hubServerHandler, 0, ctx.Done()); err != nil {
 			return err
 		}
 	}
 
 	// start yurthub proxy servers for forwarding requests to cloud kube-apiserver
 	if cfg.YurtHubProxyServerServing != nil {
-		if err := cfg.YurtHubProxyServerServing.Serve(proxyHandler, 0, stopCh); err != nil {
+		if err := cfg.YurtHubProxyServerServing.Serve(proxyHandler, 0, ctx.Done()); err != nil {
 			return err
 		}
 	}
 
 	if cfg.YurtHubDummyProxyServerServing != nil {
-		if err := cfg.YurtHubDummyProxyServerServing.Serve(proxyHandler, 0, stopCh); err != nil {
+		if err := cfg.YurtHubDummyProxyServerServing.Serve(proxyHandler, 0, ctx.Done()); err != nil {
 			return err
 		}
 	}
 
 	if cfg.YurtHubSecureProxyServerServing != nil {
-		if _, _, err := cfg.YurtHubSecureProxyServerServing.Serve(proxyHandler, 0, stopCh); err != nil {
+		if _, _, err := cfg.YurtHubSecureProxyServerServing.Serve(proxyHandler, 0, ctx.Done()); err != nil {
 			return err
 		}
 	}
 
 	if cfg.YurtHubMultiplexerServerServing != nil {
-		if _, _, err := cfg.YurtHubMultiplexerServerServing.Serve(proxyHandler, 0, stopCh); err != nil {
+		if _, _, err := cfg.YurtHubMultiplexerServerServing.Serve(proxyHandler, 0, ctx.Done()); err != nil {
 			return err
 		}
 	}
diff --git a/pkg/yurthub/tenant/tenant.go b/pkg/yurthub/tenant/tenant.go
index d5863b0ebd3..54151c5bd28 100644
--- a/pkg/yurthub/tenant/tenant.go
+++ b/pkg/yurthub/tenant/tenant.go
@@ -16,6 +16,7 @@ limitations under the License.
 package tenant
 
 import (
+	"context"
 	"sync"
 
 	v1 "k8s.io/api/core/v1"
@@ -29,7 +30,7 @@ type Interface interface {
 
 	GetTenantToken() string
 
-	WaitForCacheSync() bool
+	WaitForCacheSync(context.Context) bool
 
 	SetSecret(sec *v1.Secret)
 }
@@ -41,14 +42,12 @@ type tenantManager struct {
 
 	TenantNs string
 
-	StopCh <-chan struct{}
-
 	IsSynced bool
 
 	mutex sync.Mutex
 }
 
-func (mgr *tenantManager) WaitForCacheSync() bool {
+func (mgr *tenantManager) WaitForCacheSync(ctx context.Context) bool {
 	if mgr.IsSynced || mgr.TenantSecret != nil { //try to do sync for just one time, fast return
 		return true
 	}
@@ -57,18 +56,18 @@ func (mgr *tenantManager) WaitForCacheSync() bool {
 	mgr.mutex.Lock()
 	defer mgr.mutex.Unlock()
 
-	mgr.IsSynced = cache.WaitForCacheSync(mgr.StopCh, mgr.secretSynced)
+	mgr.IsSynced = cache.WaitForCacheSync(ctx.Done(), mgr.secretSynced)
 
 	return mgr.IsSynced
 }
 
-func New(tenantNs string, factory informers.SharedInformerFactory, stopCh <-chan struct{}) Interface {
+func New(tenantNs string, factory informers.SharedInformerFactory) Interface {
 	klog.Infof("parse tenant ns: %s", tenantNs)
 	if tenantNs == "" {
 		return nil
 	}
 
-	tenantMgr := &tenantManager{TenantNs: tenantNs, StopCh: stopCh}
+	tenantMgr := &tenantManager{TenantNs: tenantNs}
 
 	if factory != nil {
 		informer := factory.InformerFor(&v1.Secret{}, nil) //get registered secret informer
diff --git a/pkg/yurthub/transport/fake_transport.go b/pkg/yurthub/transport/fake_transport.go
index 8b8a9ea6cfd..a2f84b5f5c0 100644
--- a/pkg/yurthub/transport/fake_transport.go
+++ b/pkg/yurthub/transport/fake_transport.go
@@ -17,6 +17,7 @@ limitations under the License.
 package transport
 
 import (
+	"context"
 	"net/http"
 	"net/url"
 
@@ -39,7 +40,7 @@ type fakeTransportManager struct {
 	serverToClientset map[string]kubernetes.Interface
 }
 
-func NewFakeTransportManager(code int, fakeClients map[string]kubernetes.Interface) Interface {
+func NewFakeTransportManager(code int, fakeClients map[string]kubernetes.Interface) TransportManager {
 	return &fakeTransportManager{
 		nop:               &nopRoundTrip{code: code},
 		serverToClientset: fakeClients,
@@ -75,3 +76,5 @@ func (f *fakeTransportManager) GetDirectClientsetAtRandom() kubernetes.Interface
 func (f *fakeTransportManager) ListDirectClientset() map[string]kubernetes.Interface {
 	return f.serverToClientset
 }
+
+func (f *fakeTransportManager) Start(_ context.Context) {}
diff --git a/pkg/yurthub/transport/transport.go b/pkg/yurthub/transport/transport.go
index 794582212cb..15af22b794c 100644
--- a/pkg/yurthub/transport/transport.go
+++ b/pkg/yurthub/transport/transport.go
@@ -17,6 +17,7 @@ limitations under the License.
 package transport
 
 import (
+	"context"
 	"crypto/tls"
 	"fmt"
 	"net/http"
@@ -41,8 +42,8 @@ type CertGetter interface {
 	GetCAData() []byte
 }
 
-// Interface is an transport interface for managing clients that used to connecting kube-apiserver
-type Interface interface {
+// TransportManager is an interface for managing the clients that are used to connect to kube-apiserver
+type TransportManager interface {
 	// CurrentTransport get transport that used by load balancer
 	// and can be used by multiple goroutines concurrently.
 	CurrentTransport() http.RoundTripper
@@ -56,6 +57,9 @@ type Interface interface {
 	GetDirectClientsetAtRandom() kubernetes.Interface
 	// ListDirectClientset returns all clientsets
 	ListDirectClientset() map[string]kubernetes.Interface
+
+	// Start starts the manager's internal work.
+	Start(context.Context)
 }
 
 type transportAndClientManager struct {
@@ -64,12 +68,11 @@ type transportAndClientManager struct {
 	certGetter        CertGetter
 	closeAll          func()
 	close             func(string)
-	stopCh            <-chan struct{}
 	serverToClientset map[string]kubernetes.Interface
 }
 
 // NewTransportManager create a transport interface object.
-func NewTransportAndClientManager(servers []*url.URL, timeout int, certGetter CertGetter, stopCh <-chan struct{}) (Interface, error) {
+func NewTransportAndClientManager(servers []*url.URL, timeout int, certGetter CertGetter) (TransportManager, error) {
 	caData := certGetter.GetCAData()
 	if len(caData) == 0 {
 		return nil, fmt.Errorf("ca cert data was not prepared when new transport")
@@ -110,7 +113,6 @@ func NewTransportAndClientManager(servers []*url.URL, timeout int, certGetter Ce
 		certGetter:        certGetter,
 		closeAll:          d.CloseAll,
 		close:             d.Close,
-		stopCh:            stopCh,
 		serverToClientset: make(map[string]kubernetes.Interface),
 	}
 
@@ -131,8 +133,6 @@ func NewTransportAndClientManager(servers []*url.URL, timeout int, certGetter Ce
 		}
 	}
 
-	tcm.start()
-
 	return tcm, nil
}
 
@@ -168,7 +168,7 @@ func (tcm *transportAndClientManager) ListDirectClientset() map[string]kubernete
 	return tcm.serverToClientset
 }
 
-func (tcm *transportAndClientManager) start() {
+func (tcm *transportAndClientManager) Start(ctx context.Context) {
 	lastCert := tcm.certGetter.GetAPIServerClientCert()
 
 	go wait.Until(func() {
@@ -196,7 +196,7 @@
 			// certificate expired or deleted unintentionally, just wait for cert updated by bootstrap config, do nothing
 			klog.Warningf("certificate expired or deleted unintentionally")
 		}
-	}, 10*time.Second, tcm.stopCh)
+	}, 10*time.Second, ctx.Done())
 }
 
 func tlsConfig(current func() *tls.Certificate, caData []byte) (*tls.Config, error) {
diff --git a/pkg/yurthub/util/connrotation_test.go b/pkg/yurthub/util/connrotation_test.go
index 032e7799c54..2cff480692c 100644
--- a/pkg/yurthub/util/connrotation_test.go
+++ b/pkg/yurthub/util/connrotation_test.go
@@ -29,9 +29,6 @@ func TestNewDialer(t *testing.T) {
 	notExistLocalAddress := ":13998"
 	network := "tcp"
 	requestMessage := []byte("some message")
-	stopCh := make(chan struct{})
-
-	defer close(stopCh)
 
 	go func(stopCh <-chan struct{}) {
 		l, err := net.Listen(network, localAddress)
@@ -60,7 +57,7 @@ func TestNewDialer(t *testing.T) {
 			}
 		}
 
-	}(stopCh)
+	}(t.Context().Done())
 
 	time.Sleep(time.Second)
diff --git a/pkg/yurthub/util/dumpstack.go b/pkg/yurthub/util/dumpstack.go
index 5df324a5a99..aa035d66252 100644
--- a/pkg/yurthub/util/dumpstack.go
+++ b/pkg/yurthub/util/dumpstack.go
@@ -17,6 +17,7 @@ limitations under the License.
 package util
 
 import (
+	"context"
 	"fmt"
 	"os"
 	"os/signal"
@@ -27,7 +28,7 @@ import (
 	"k8s.io/klog/v2"
 )
 
-func SetupDumpStackTrap(logDir string, stopCh <-chan struct{}) {
+func SetupDumpStackTrap(ctx context.Context, logDir string) {
 	c := make(chan os.Signal, 1)
 	signal.Notify(c, syscall.SIGUSR1)
 
@@ -36,7 +37,7 @@
 		select {
 		case <-c:
 			dumpStacks(true, logDir)
-		case <-stopCh:
+		case <-ctx.Done():
 			return
 		}
 	}
diff --git a/pkg/yurthub/util/dumpstack_test.go b/pkg/yurthub/util/dumpstack_test.go
index 4b4b8f05bf3..09275dabc9e 100644
--- a/pkg/yurthub/util/dumpstack_test.go
+++ b/pkg/yurthub/util/dumpstack_test.go
@@ -29,10 +29,8 @@ import (
 func TestSetupDumpStackTrap(t *testing.T) {
 	logDir := "/tmp"
-	stopCh := make(chan struct{})
-	defer close(stopCh)
 
-	SetupDumpStackTrap(logDir, stopCh)
+	SetupDumpStackTrap(t.Context(), logDir)
 
 	proc, err := os.FindProcess(os.Getpid())
 	assert.NoError(t, err)
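Note: after this change, constructing the transport manager no longer spawns the
cert-rotation goroutine; callers opt in with Start(ctx), and cancelling the context
replaces closing a stopCh. A minimal sketch of the intended wiring (the servers,
timeout, and certGetter values below are illustrative, not taken from this patch):

	// Illustrative caller: context cancellation now stops the manager's
	// background loop instead of a dedicated stop channel.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	tcm, err := transport.NewTransportAndClientManager(servers, 10, certGetter)
	if err != nil {
		return fmt.Errorf("could not new transport manager, %w", err)
	}
	tcm.Start(ctx) // cert-rotation loop exits once ctx is cancelled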
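Note: the test changes rely on t.Context() (added in Go 1.24), which is cancelled
automatically when the test finishes, so the stopCh/defer close(stopCh) bookkeeping
can be dropped. A hypothetical test showing the pattern with the fake manager from
this patch:

	func TestStartTiedToTest(t *testing.T) {
		tm := transport.NewFakeTransportManager(http.StatusOK, map[string]kubernetes.Interface{})
		// The fake's Start is a no-op; with the real manager, the background
		// loop observes t.Context().Done() and exits when the test ends.
		tm.Start(t.Context())
	}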