Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions cmd/yurthub/app/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -93,15 +93,15 @@ type YurtHubConfiguration struct {
DiskCachePath string
ConfigManager *configuration.Manager
TenantManager tenant.Interface
TransportAndDirectClientManager transport.Interface
TransportAndDirectClientManager transport.TransportManager
LoadBalancerForLeaderHub remote.Server
PoolScopeResources []schema.GroupVersionResource
PortForMultiplexer int
NodePoolName string
}

// Complete converts *options.YurtHubOptions to *YurtHubConfiguration
func Complete(options *options.YurtHubOptions, stopCh <-chan struct{}) (*YurtHubConfiguration, error) {
func Complete(ctx context.Context, options *options.YurtHubOptions) (*YurtHubConfiguration, error) {
cfg := &YurtHubConfiguration{
NodeName: options.NodeName,
WorkingMode: util.WorkingMode(options.WorkingMode),
Expand Down Expand Up @@ -191,19 +191,19 @@ func Complete(options *options.YurtHubOptions, stopCh <-chan struct{}) (*YurtHub
us,
options.HeartbeatTimeoutSeconds,
certMgr,
stopCh,
)
if err != nil {
return nil, fmt.Errorf("could not new transport manager, %w", err)
}
transportAndClientManager.Start(ctx)

cfg.SerializerManager = serializer.NewSerializerManager()
cfg.RESTMapperManager = restMapperManager
cfg.SharedFactory = sharedFactory
cfg.DynamicSharedFactory = dynamicSharedFactory
cfg.CertManager = certMgr
cfg.TransportAndDirectClientManager = transportAndClientManager
cfg.TenantManager = tenant.New(tenantNamespce, sharedFactory, stopCh)
cfg.TenantManager = tenant.New(tenantNamespce, sharedFactory)

// create feature configurations for both cloud and edge working mode as following:
// - configuration manager: monitor yurt-hub-cfg configmap and adopting changes dynamically.
Expand Down
2 changes: 1 addition & 1 deletion cmd/yurthub/app/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ func TestComplete(t *testing.T) {
options.EnableDummyIf = false
options.HubAgentDummyIfIP = "169.254.2.1"
options.NodeIP = "127.0.0.1"
cfg, err := Complete(options, nil)
cfg, err := Complete(t.Context(), options)
if err != nil {
t.Errorf("expect no err, but got %v", err)
} else if cfg == nil {
Expand Down
25 changes: 13 additions & 12 deletions cmd/yurthub/app/start.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,13 +70,13 @@
klog.Fatalf("validate options: %v", err)
}

yurtHubCfg, err := config.Complete(yurtHubOptions, ctx.Done())
yurtHubCfg, err := config.Complete(ctx, yurtHubOptions)

Check warning on line 73 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L73

Added line #L73 was not covered by tests
if err != nil {
klog.Fatalf("complete %s configuration error, %v", projectinfo.GetHubName(), err)
}
klog.Infof("%s cfg: %#+v", projectinfo.GetHubName(), yurtHubCfg)

util.SetupDumpStackTrap(yurtHubOptions.RootDir, ctx.Done())
util.SetupDumpStackTrap(ctx, yurtHubOptions.RootDir)

Check warning on line 79 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L79

Added line #L79 was not covered by tests
klog.Infof("start watch SIGUSR1 signal")

if err := Run(ctx, yurtHubCfg); err != nil {
Expand Down Expand Up @@ -130,36 +130,38 @@
trace++

klog.Infof("%d. create health checkers for remote servers", trace)
cloudHealthChecker, err = cloudapiserver.NewCloudAPIServerHealthChecker(cfg, ctx.Done())
cloudHealthChecker, err = cloudapiserver.NewCloudAPIServerHealthChecker(ctx, cfg)

Check warning on line 133 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L133

Added line #L133 was not covered by tests
if err != nil {
return fmt.Errorf("could not new health checker for cloud kube-apiserver, %w", err)
}
trace++

klog.Infof("%d. new gc manager for node %s, and gc frequency is a random time between %d min and %d min", trace, cfg.NodeName, cfg.GCFrequency, 3*cfg.GCFrequency)
gcMgr, err := gc.NewGCManager(cfg, cloudHealthChecker, ctx.Done())
gcMgr, err := gc.NewGCManager(cfg, cloudHealthChecker)

Check warning on line 140 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L140

Added line #L140 was not covered by tests
if err != nil {
return fmt.Errorf("could not new gc manager, %w", err)
}
gcMgr.Run()
gcMgr.Run(ctx)

Check warning on line 144 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L144

Added line #L144 was not covered by tests
trace++
}

// no leader hub servers for transport manager at startup time.
// and don't filter response of request for pool scope metadata from leader hub.
transportManagerForLeaderHub, err := transport.NewTransportAndClientManager([]*url.URL{}, 2, cfg.CertManager, ctx.Done())
transportManagerForLeaderHub, err := transport.NewTransportAndClientManager([]*url.URL{}, 2, cfg.CertManager)

Check warning on line 150 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L150

Added line #L150 was not covered by tests
if err != nil {
return fmt.Errorf("could not new transport manager for leader hub, %w", err)
}
healthCheckerForLeaderHub := leaderhub.NewLeaderHubHealthChecker(20*time.Second, nil, ctx.Done())
loadBalancerForLeaderHub := remote.NewLoadBalancer("round-robin", []*url.URL{}, cacheManager, transportManagerForLeaderHub, healthCheckerForLeaderHub, nil, ctx.Done())
transportManagerForLeaderHub.Start(ctx)

Check warning on line 154 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L154

Added line #L154 was not covered by tests

healthCheckerForLeaderHub := leaderhub.NewLeaderHubHealthChecker(ctx, 20*time.Second, nil)
loadBalancerForLeaderHub := remote.NewLoadBalancer("round-robin", []*url.URL{}, cacheManager, transportManagerForLeaderHub, healthCheckerForLeaderHub, nil)

Check warning on line 157 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L156-L157

Added lines #L156 - L157 were not covered by tests

cfg.LoadBalancerForLeaderHub = loadBalancerForLeaderHub
requestMultiplexerManager := newRequestMultiplexerManager(cfg, healthCheckerForLeaderHub)

if cfg.NetworkMgr != nil {
klog.Infof("%d. start network manager for ensuing dummy interface", trace)
cfg.NetworkMgr.Run(ctx.Done())
cfg.NetworkMgr.Run(ctx)

Check warning on line 164 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L164

Added line #L164 was not covered by tests
trace++
}

Expand All @@ -173,15 +175,14 @@
cfg,
cacheManager,
cloudHealthChecker,
requestMultiplexerManager,
ctx.Done())
requestMultiplexerManager)

Check warning on line 178 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L178

Added line #L178 was not covered by tests
if err != nil {
return fmt.Errorf("could not create reverse proxy handler, %w", err)
}
trace++

klog.Infof("%d. new %s server and begin to serve", trace, projectinfo.GetHubName())
if err := server.RunYurtHubServers(cfg, yurtProxyHandler, cloudHealthChecker, ctx.Done()); err != nil {
if err := server.RunYurtHubServers(ctx, cfg, yurtProxyHandler, cloudHealthChecker); err != nil {

Check warning on line 185 in cmd/yurthub/app/start.go

View check run for this annotation

Codecov / codecov/patch

cmd/yurthub/app/start.go#L185

Added line #L185 was not covered by tests
return fmt.Errorf("could not run hub servers, %w", err)
}
default:
Expand Down
8 changes: 4 additions & 4 deletions pkg/yurthub/cachemanager/cache_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ var (

// CacheManager is an adaptor to cache runtime object data into backend storage
type CacheManager interface {
CacheResponse(req *http.Request, prc io.ReadCloser, stopCh <-chan struct{}) error
CacheResponse(req *http.Request, prc io.ReadCloser) error
QueryCache(req *http.Request) (runtime.Object, error)
CanCacheFor(req *http.Request) bool
DeleteKindFor(gvr schema.GroupVersionResource) error
Expand Down Expand Up @@ -109,11 +109,11 @@ func (cm *cacheManager) QueryCacheResult() CacheResult {
}

// CacheResponse cache response of request into backend storage
func (cm *cacheManager) CacheResponse(req *http.Request, prc io.ReadCloser, stopCh <-chan struct{}) error {
func (cm *cacheManager) CacheResponse(req *http.Request, prc io.ReadCloser) error {
ctx := req.Context()
info, _ := apirequest.RequestInfoFrom(ctx)
if isWatch(ctx) {
return cm.saveWatchObject(ctx, info, prc, stopCh)
return cm.saveWatchObject(ctx, info, prc)
}

var buf bytes.Buffer
Expand Down Expand Up @@ -359,7 +359,7 @@ func generateEmptyListObjOfGVK(listGvk schema.GroupVersionKind) (runtime.Object,
return listObj, nil
}

func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.RequestInfo, r io.ReadCloser, _ <-chan struct{}) error {
func (cm *cacheManager) saveWatchObject(ctx context.Context, info *apirequest.RequestInfo, r io.ReadCloser) error {
delObjCnt := 0
updateObjCnt := 0
addObjCnt := 0
Expand Down
6 changes: 3 additions & 3 deletions pkg/yurthub/cachemanager/cache_manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -503,7 +503,7 @@ func TestCacheGetResponse(t *testing.T) {
}
}
prc := io.NopCloser(buf)
cacheErr = yurtCM.CacheResponse(req, prc, nil)
cacheErr = yurtCM.CacheResponse(req, prc)
})

handler = proxyutil.WithRequestContentType(handler)
Expand Down Expand Up @@ -953,7 +953,7 @@ func TestCacheWatchResponse(t *testing.T) {
pw.Close()
}(pw)
rc := io.NopCloser(pr)
err = yurtCM.CacheResponse(req, rc, nil)
err = yurtCM.CacheResponse(req, rc)
})

handler = proxyutil.WithRequestContentType(handler)
Expand Down Expand Up @@ -1544,7 +1544,7 @@ func TestCacheListResponse(t *testing.T) {
}
prc := io.NopCloser(buf)
// call cache response
cacheErr = yurtCM.CacheResponse(req, prc, nil)
cacheErr = yurtCM.CacheResponse(req, prc)
})

handler = proxyutil.WithRequestContentType(handler)
Expand Down
6 changes: 2 additions & 4 deletions pkg/yurthub/configuration/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,11 +184,9 @@ func TestManager(t *testing.T) {
informerfactory := informers.NewSharedInformerFactory(client, 0)
manager := NewConfigurationManager(tc.nodeName, informerfactory)

stopCh := make(chan struct{})
informerfactory.Start(stopCh)
defer close(stopCh)
informerfactory.Start(t.Context().Done())

if ok := cache.WaitForCacheSync(stopCh, manager.HasSynced); !ok {
if ok := cache.WaitForCacheSync(t.Context().Done(), manager.HasSynced); !ok {
t.Errorf("configuration manager is not ready")
return
}
Expand Down
2 changes: 0 additions & 2 deletions pkg/yurthub/filter/approver/approver.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,12 @@ import (
type approver struct {
skipRequestUserAgentList sets.Set[string]
configManager *configuration.Manager
stopCh chan struct{}
}

func NewApprover(nodeName string, configManager *configuration.Manager) filter.Approver {
na := &approver{
skipRequestUserAgentList: sets.New[string](projectinfo.GetHubName(), util.MultiplexerProxyClientUserAgentPrefix+nodeName),
configManager: configManager,
stopCh: make(chan struct{}),
}
return na
}
Expand Down
8 changes: 4 additions & 4 deletions pkg/yurthub/filter/base/base_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,12 @@ func (noh *nopObjectHandler) SupportedResourceAndVerbs() map[string]sets.Set[str
return map[string]sets.Set[string]{}
}

func (noh *nopObjectHandler) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object {
func (noh *nopObjectHandler) Filter(obj runtime.Object) runtime.Object {
return obj
}

var (
nodesNameErr = errors.New("nodes name error")
errNodesName = errors.New("nodes name error")
)

type nopNodesErrHandler struct {
Expand All @@ -61,7 +61,7 @@ type nopNodesErrHandler struct {

func NewNopNodesErrHandler() filter.ObjectFilter {
return &nopNodesErrHandler{
err: nodesNameErr,
err: errNodesName,
}
}

Expand Down Expand Up @@ -193,7 +193,7 @@ func TestInitializers(t *testing.T) {
},
"initialize error": {
filter: NewNopNodesErrHandler(),
resultErr: nodesNameErr,
resultErr: errNodesName,
},
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/yurthub/filter/discardcloudservice/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func (sf *discardCloudServiceFilter) Name() string {
return FilterName
}

func (sf *discardCloudServiceFilter) Filter(obj runtime.Object, _ <-chan struct{}) runtime.Object {
func (sf *discardCloudServiceFilter) Filter(obj runtime.Object) runtime.Object {
switch v := obj.(type) {
case *v1.Service:
return discardCloudService(v)
Expand Down
3 changes: 1 addition & 2 deletions pkg/yurthub/filter/discardcloudservice/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -139,12 +139,11 @@ func TestFilter(t *testing.T) {
},
}

stopCh := make(<-chan struct{})
for k, tt := range testcases {
t.Run(k, func(t *testing.T) {
dcsf, _ := NewDiscardCloudServiceFilter()

newObj := dcsf.Filter(tt.responseObj, stopCh)
newObj := dcsf.Filter(tt.responseObj)
if tt.expectObj == nil {
if !util.IsNil(newObj) {
t.Errorf("RuntimeObjectFilter expect nil obj, but got %v", newObj)
Expand Down
2 changes: 1 addition & 1 deletion pkg/yurthub/filter/forwardkubesvctraffic/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (fkst *forwardKubeSVCTrafficFilter) SetMasterServicePort(portStr string) er
return nil
}

func (fkst *forwardKubeSVCTrafficFilter) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object {
func (fkst *forwardKubeSVCTrafficFilter) Filter(obj runtime.Object) runtime.Object {
switch v := obj.(type) {
case *discovery.EndpointSlice:
fkst.mutateDefaultKubernetesEps(v)
Expand Down
3 changes: 1 addition & 2 deletions pkg/yurthub/filter/forwardkubesvctraffic/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,13 +213,12 @@ func TestFilter(t *testing.T) {
},
}

stopCh := make(<-chan struct{})
for k, tt := range testcases {
t.Run(k, func(t *testing.T) {
fkst, _ := NewForwardKubeSVCTrafficFilter()
fkst.SetMasterServiceHost(tt.host)
fkst.SetMasterServicePort(tt.port)
newObj := fkst.Filter(tt.responseObject, stopCh)
newObj := fkst.Filter(tt.responseObject)
if tt.expectObject == nil {
if !util.IsNil(newObj) {
t.Errorf("Filter expect nil obj, but got %v", newObj)
Expand Down
2 changes: 1 addition & 1 deletion pkg/yurthub/filter/inclusterconfig/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func (iccf *inClusterConfigFilter) Name() string {
return FilterName
}

func (iccf *inClusterConfigFilter) Filter(obj runtime.Object, _ <-chan struct{}) runtime.Object {
func (iccf *inClusterConfigFilter) Filter(obj runtime.Object) runtime.Object {
switch v := obj.(type) {
case *v1.ConfigMap:
return mutateKubeProxyConfigMap(v)
Expand Down
3 changes: 1 addition & 2 deletions pkg/yurthub/filter/inclusterconfig/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,10 +130,9 @@ func TestRuntimeObjectFilter(t *testing.T) {
},
}

stopCh := make(<-chan struct{})
for k, tc := range testcases {
t.Run(k, func(t *testing.T) {
newObj := iccf.Filter(tc.responseObject, stopCh)
newObj := iccf.Filter(tc.responseObject)
if tc.expectObject == nil {
if !util.IsNil(newObj) {
t.Errorf("RuntimeObjectFilter expect nil obj, but got %v", newObj)
Expand Down
2 changes: 1 addition & 1 deletion pkg/yurthub/filter/initializer/initializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -134,7 +134,7 @@ func (bef *baseErrFilter) SupportedResourceAndVerbs() map[string]sets.Set[string
return map[string]sets.Set[string]{}
}

func (bef *baseErrFilter) Filter(obj runtime.Object, _ <-chan struct{}) runtime.Object {
func (bef *baseErrFilter) Filter(obj runtime.Object) runtime.Object {
return obj
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/yurthub/filter/initializer/node_initializer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ func (nop *nopNodeHandler) SupportedResourceAndVerbs() map[string]sets.Set[strin
return map[string]sets.Set[string]{}
}

func (nop *nopNodeHandler) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object {
func (nop *nopNodeHandler) Filter(obj runtime.Object) runtime.Object {
return obj
}

Expand Down
4 changes: 2 additions & 2 deletions pkg/yurthub/filter/interfaces.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Approver interface {
type ResponseFilter interface {
Name() string
// Filter is used to filter data returned from the cloud.
Filter(req *http.Request, rc io.ReadCloser, stopCh <-chan struct{}) (int, io.ReadCloser, error)
Filter(req *http.Request, rc io.ReadCloser) (int, io.ReadCloser, error)
}

// ObjectFilter is used for filtering runtime object.
Expand All @@ -52,7 +52,7 @@ type ObjectFilter interface {
Name() string
// Filter is used for filtering runtime object
// all filter logic should be located in it.
Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object
Filter(obj runtime.Object) runtime.Object
}

type FilterFinder interface {
Expand Down
2 changes: 1 addition & 1 deletion pkg/yurthub/filter/masterservice/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func (msf *masterServiceFilter) SetMasterServicePort(portStr string) error {
return nil
}

func (msf *masterServiceFilter) Filter(obj runtime.Object, _ <-chan struct{}) runtime.Object {
func (msf *masterServiceFilter) Filter(obj runtime.Object) runtime.Object {
switch v := obj.(type) {
case *v1.Service:
msf.mutateMasterService(v)
Expand Down
3 changes: 1 addition & 2 deletions pkg/yurthub/filter/masterservice/filter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,13 +161,12 @@ func TestFilter(t *testing.T) {
},
}

stopCh := make(<-chan struct{})
for k, tt := range testcases {
t.Run(k, func(t *testing.T) {
msf := &masterServiceFilter{}
msf.SetMasterServiceHost(masterServiceHost)
msf.SetMasterServicePort(masterServicePortStr)
newObj := msf.Filter(tt.responseObject, stopCh)
newObj := msf.Filter(tt.responseObject)
if tt.expectObject == nil {
if !util.IsNil(newObj) {
t.Errorf("RuntimeObjectFilter expect nil obj, but got %v", newObj)
Expand Down
2 changes: 1 addition & 1 deletion pkg/yurthub/filter/nodeportisolation/filter.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ func (nif *nodePortIsolationFilter) SetKubeClient(client kubernetes.Interface) e
return nil
}

func (nif *nodePortIsolationFilter) Filter(obj runtime.Object, stopCh <-chan struct{}) runtime.Object {
func (nif *nodePortIsolationFilter) Filter(obj runtime.Object) runtime.Object {
switch v := obj.(type) {
case *v1.Service:
return nif.isolateNodePortService(v)
Expand Down
Loading
Loading