support concurrency setting in RemoteClusterStatusChecker
Signed-off-by: Bruce Ma <[email protected]>
mars1024 committed Jul 5, 2022
1 parent 98a66b8 commit ecc26a4
Showing 6 changed files with 117 additions and 85 deletions.
2 changes: 1 addition & 1 deletion go.mod
@@ -25,6 +25,7 @@ require (
github.com/vishvananda/netlink v1.1.1-0.20210330154013-f5de75959ad5
github.com/vishvananda/netns v0.0.0-20211101163701-50045581ed74
golang.org/x/sys v0.0.0-20220209214540-3681064d5158
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac
google.golang.org/protobuf v1.27.1
k8s.io/api v0.23.6
k8s.io/apimachinery v0.23.6
@@ -86,7 +87,6 @@ require (
golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f // indirect
golang.org/x/term v0.0.0-20210927222741-03fcf44c2211 // indirect
golang.org/x/text v0.3.7 // indirect
golang.org/x/time v0.0.0-20210723032227-1f47c861a9ac // indirect
gomodules.xyz/jsonpatch/v2 v2.2.0 // indirect
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20211203200212-54befc351ae9 // indirect
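The go.mod change above promotes `golang.org/x/time` from an indirect to a direct dependency: the checker below now constructs a token-bucket limiter from its `rate` package. A minimal sketch of the `rate` API the new code relies on, with an illustrative loop that is not part of the commit:

```go
package main

import (
	"context"
	"fmt"
	"time"

	"golang.org/x/time/rate"
)

func main() {
	// Token bucket refilled at 10 tokens/s with burst capacity 100, the same
	// shape the checker wraps in its BucketRateLimiter.
	limiter := rate.NewLimiter(rate.Limit(10), 100)

	for i := 0; i < 3; i++ {
		// Wait blocks until a token is available or the context is cancelled.
		if err := limiter.Wait(context.Background()); err != nil {
			fmt.Println("wait cancelled:", err)
			return
		}
		fmt.Println("token acquired at", time.Now().Format(time.RFC3339Nano))
	}
}
```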
29 changes: 0 additions & 29 deletions pkg/controllers/multicluster/event.go

This file was deleted.

33 changes: 17 additions & 16 deletions pkg/controllers/multicluster/manager.go
@@ -36,7 +36,7 @@ func RegisterToManager(ctx context.Context, mgr manager.Manager, options Registe
options.ConcurrencyMap = map[string]int{}
}

clusterCheckEvent := make(chan ClusterCheckEvent, 5)
clusterStatusCheckChan := make(chan string, 10)

uuidMutex, err := NewUUIDMutexFromClient(ctx, mgr.GetClient())
if err != nil {
@@ -60,26 +60,27 @@ }
}

if err = (&RemoteClusterReconciler{
Context: ctx,
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor(ControllerRemoteCluster + "Controller"),
UUIDMutex: uuidMutex,
DaemonHub: daemonHub,
LocalManager: mgr,
Event: clusterCheckEvent,
ControllerConcurrency: concurrency.ControllerConcurrency(options.ConcurrencyMap[ControllerRemoteCluster]),
Context: ctx,
Client: mgr.GetClient(),
Recorder: mgr.GetEventRecorderFor(ControllerRemoteCluster + "Controller"),
UUIDMutex: uuidMutex,
DaemonHub: daemonHub,
LocalManager: mgr,
ClusterStatusCheckChan: clusterStatusCheckChan,
ControllerConcurrency: concurrency.ControllerConcurrency(options.ConcurrencyMap[ControllerRemoteCluster]),
}).SetupWithManager(mgr); err != nil {
return fmt.Errorf("unable to inject controller %s: %v", ControllerRemoteCluster, err)
}

if err = mgr.Add(&RemoteClusterStatusChecker{
Client: mgr.GetClient(),
Logger: mgr.GetLogger().WithName("checker").WithName(CheckerRemoteClusterStatus),
CheckPeriod: 30 * time.Second,
DaemonHub: daemonHub,
Checker: clusterStatusChecker,
Event: clusterCheckEvent,
Recorder: mgr.GetEventRecorderFor(CheckerRemoteClusterStatus + "Checker"),
Client: mgr.GetClient(),
Logger: mgr.GetLogger().WithName("checker").WithName(CheckerRemoteClusterStatus),
CheckPeriod: 30 * time.Second,
DaemonHub: daemonHub,
Checker: clusterStatusChecker,
ClusterStatusCheckChan: clusterStatusCheckChan,
Recorder: mgr.GetEventRecorderFor(CheckerRemoteClusterStatus + "Checker"),
Concurrency: concurrency.ControllerConcurrency(options.ConcurrencyMap[CheckerRemoteClusterStatus]),
}); err != nil {
return fmt.Errorf("unable to inject checker %s: %v", CheckerRemoteClusterStatus, err)
}
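The wiring above swaps the typed `ClusterCheckEvent` channel for a buffered `chan string` of capacity 10, handing the reconciler the send-only side and the checker the receive-only side so the compiler enforces the producer/consumer split. A minimal, self-contained sketch of that handoff, with illustrative names not taken from the commit:

```go
package main

import (
	"fmt"
	"sync"
)

// producer models the reconciler side: it can only send cluster names.
type producer struct{ notify chan<- string }

// consumer models the checker side: it can only receive cluster names.
type consumer struct{ events <-chan string }

func main() {
	ch := make(chan string, 10) // same buffer size the manager allocates

	p := producer{notify: ch}
	c := consumer{events: ch}

	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		for name := range c.events {
			fmt.Println("checking cluster:", name)
		}
	}()

	p.notify <- "cluster-a"
	p.notify <- "cluster-b"
	close(ch) // the real checker shuts down via ctx.Done() instead
	wg.Wait()
}
```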
8 changes: 2 additions & 6 deletions pkg/controllers/multicluster/remotecluster_controller.go
@@ -55,7 +55,7 @@ type RemoteClusterReconciler struct {

DaemonHub managerruntime.DaemonHub

Event chan<- ClusterCheckEvent
ClusterStatusCheckChan chan<- string

LocalManager manager.Manager

@@ -184,11 +184,7 @@ func (r *RemoteClusterReconciler) guardDaemon(ctx context.Context, name string,
}

// notify the status checker to check and run this cluster
r.Event <- ClusterCheckEvent{
Context: ctx,
Name: name,
DaemonID: daemonID,
}
r.ClusterStatusCheckChan <- name
return nil
}

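One design note on the send above: `r.ClusterStatusCheckChan <- name` blocks whenever the 10-slot buffer is full and no checker worker is draining it. The commit accepts that, since the checker's select loop normally keeps the channel empty. A hypothetical non-blocking variant (not what the commit does) could drop the nudge and let the periodic `enqueueAll` pass cover the cluster:

```go
package main

import "fmt"

// notifyStatusCheck is a hypothetical non-blocking alternative to the
// blocking send in guardDaemon above.
func notifyStatusCheck(ch chan<- string, name string) {
	select {
	case ch <- name:
	default:
		// Buffer full: dropping is safe because the checker's periodic
		// enqueueAll pass re-enqueues every cluster on the next tick.
		fmt.Println("status check channel full, deferring to periodic pass for", name)
	}
}

func main() {
	ch := make(chan string, 10)
	notifyStatusCheck(ch, "cluster-a")
	fmt.Println("queued notifications:", len(ch))
}
```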
128 changes: 96 additions & 32 deletions pkg/controllers/multicluster/remoteclusterstatus_checker.go
@@ -19,17 +19,22 @@ package multicluster
import (
"context"
"fmt"
"sync"
"time"

"github.com/go-logr/logr"
"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
"k8s.io/client-go/tools/record"
"k8s.io/client-go/util/workqueue"
"sigs.k8s.io/controller-runtime/pkg/client"
"sigs.k8s.io/controller-runtime/pkg/controller/controllerutil"

multiclusterv1 "github.com/alibaba/hybridnet/pkg/apis/multicluster/v1"
"github.com/alibaba/hybridnet/pkg/controllers/concurrency"
"github.com/alibaba/hybridnet/pkg/controllers/multicluster/clusterchecker"
"github.com/alibaba/hybridnet/pkg/controllers/utils"
"github.com/alibaba/hybridnet/pkg/managerruntime"
@@ -45,41 +50,97 @@ const (

type RemoteClusterStatusChecker struct {
client.Client
Logger logr.Logger

CheckPeriod time.Duration

DaemonHub managerruntime.DaemonHub

Checker clusterchecker.Checker
Logger logr.Logger
Recorder record.EventRecorder

Event <-chan ClusterCheckEvent
CheckPeriod time.Duration
Checker clusterchecker.Checker
ClusterStatusCheckChan <-chan string
Queue workqueue.RateLimitingInterface
DaemonHub managerruntime.DaemonHub

Recorder record.EventRecorder
Concurrency concurrency.ControllerConcurrency
}

func (r *RemoteClusterStatusChecker) Start(ctx context.Context) error {
r.Logger.Info("remote cluster status checker is starting")

ticker := time.NewTicker(r.CheckPeriod)
// initialize work queue if nil
if r.Queue == nil {
r.Queue = workqueue.NewNamedRateLimitingQueue(
workqueue.NewMaxOfRateLimiter(
workqueue.NewItemExponentialFailureRateLimiter(time.Second, r.CheckPeriod),
&workqueue.BucketRateLimiter{
Limiter: rate.NewLimiter(rate.Limit(10), 100),
},
), CheckerRemoteClusterStatus)
}

go func() {
<-ctx.Done()
r.Queue.ShutDown()
}()

// launch checking workers to process resources
r.Logger.Info("starting checking workers", "count", r.Concurrency.Max())

// starting workers based on concurrency
wg := &sync.WaitGroup{}
wg.Add(r.Concurrency.Max())
for i := 0; i < r.Concurrency.Max(); i++ {
go func() {
defer wg.Done()
for r.processNext(ctx) {
}
}()
}

ticker := time.NewTicker(r.CheckPeriod)
for {
select {
case <-ticker.C:
r.Logger.V(1).Info("cron job for all clusters")
r.crontab(ctx)
case event := <-r.Event:
r.Logger.Info("single job for one cluster registration")
r.checkClusterStatus(event.Context, event.Name, event.DaemonID)
r.Logger.V(1).Info("all clusters check from cronjob")
r.enqueueAll(ctx)
case clusterName := <-r.ClusterStatusCheckChan:
r.Logger.Info("single cluster check event from channel", "cluster", clusterName)
r.enqueue(clusterName)
case <-ctx.Done():
ticker.Stop()
r.Logger.Info("remote cluster status checker is stopping")
r.Logger.Info("remote cluster status checker is stopping, waiting for all workers to finish")
wg.Wait()
r.Logger.Info("all checking workers finished")
return nil
}
}
}

func (r *RemoteClusterStatusChecker) crontab(ctx context.Context) {
func (r *RemoteClusterStatusChecker) processNext(ctx context.Context) bool {
obj, shutdown := r.Queue.Get()
if shutdown {
return false
}

defer r.Queue.Done(obj)

clusterName, ok := obj.(string)
if !ok {
r.Logger.Error(nil, "cluster name is not a valid string", "type", fmt.Sprintf("%T", obj))
r.Queue.Forget(obj)
return true
}

if err := r.checkClusterStatus(ctx, clusterName); err != nil {
r.Logger.V(1).Info("requeue cluster", "cluster", clusterName)
r.Queue.AddRateLimited(clusterName)
return true
}

r.Queue.Forget(obj)
return true
}

func (r *RemoteClusterStatusChecker) enqueueAll(ctx context.Context) {
defer utilruntime.HandleCrash()

remoteClusterList, err := utils.ListRemoteClusters(ctx, r)
@@ -88,33 +149,33 @@ func (r *RemoteClusterStatusChecker) crontab(ctx context.Context) {
return
}

nameDaemonIDMap := make(map[string]managerruntime.DaemonID)

for i := range remoteClusterList.Items {
var remoteCluster = &remoteClusterList.Items[i]
if len(remoteCluster.Status.UUID) > 0 {
nameDaemonIDMap[remoteCluster.Name] = managerruntime.DaemonID(remoteCluster.Status.UUID)
}
}

r.Logger.V(1).Info("check remote cluster status periodically", "clusters", nameDaemonIDMap)

for name, id := range nameDaemonIDMap {
r.checkClusterStatus(ctx, name, id)
r.enqueue(remoteClusterList.Items[i].Name)
}
}

func (r *RemoteClusterStatusChecker) enqueue(name string) {
r.Queue.Add(name)
}

func (r *RemoteClusterStatusChecker) checkClusterStatus(ctx context.Context, name string, daemonID managerruntime.DaemonID) {
func (r *RemoteClusterStatusChecker) checkClusterStatus(ctx context.Context, name string) error {
start := time.Now()
defer func() {
metrics.RemoteClusterStatusCheckDuration.WithLabelValues(name).Observe(time.Since(start).Seconds())
}()

remoteCluster, err := utils.GetRemoteCluster(ctx, r, name)
if err != nil {
// TODO: handle fetch error here
return
if errors.IsNotFound(err) {
// if the cluster does not exist, just ignore it
return nil
}
return fmt.Errorf("fail to get remote cluster: %v", err)
}

daemonID := managerruntime.DaemonID(remoteCluster.Status.UUID)
if len(daemonID) == 0 {
return fmt.Errorf("unexpected empty cluster uuid")
}

_, err = controllerutil.CreateOrPatch(ctx, r, remoteCluster, func() (err error) {
@@ -212,8 +273,11 @@ func (r *RemoteClusterStatusChecker) checkClusterStatus(ctx context.Context, nam

if err != nil {
r.Recorder.Event(remoteCluster, corev1.EventTypeWarning, "CheckStatusFail", err.Error())
r.Logger.Error(err, "unable to check cluster status", "RemoteCluster", name)
r.Logger.Error(err, "fail to check cluster status", "cluster", name)
} else {
r.Logger.V(1).Info("check cluster status successfully", "cluster", name)
}
return err
}

func (r *RemoteClusterStatusChecker) getManagerRuntimeByDaemonID(daemonID managerruntime.DaemonID) (managerruntime.ManagerRuntime, error) {
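The queue built in `Start` combines a per-item exponential backoff (1s base, capped at `CheckPeriod`) with a global token bucket via `NewMaxOfRateLimiter`, which returns the larger of the two delays; `Forget` resets the per-item backoff after a successful check. A standalone sketch probing those delays, assuming the 30-second `CheckPeriod` the manager passes:

```go
package main

import (
	"fmt"
	"time"

	"golang.org/x/time/rate"
	"k8s.io/client-go/util/workqueue"
)

func main() {
	// Same limiter composition the checker builds: per-item exponential
	// backoff from 1s capped at 30s, combined with a 10 QPS / burst-100
	// token bucket; MaxOf picks whichever delay is longer.
	limiter := workqueue.NewMaxOfRateLimiter(
		workqueue.NewItemExponentialFailureRateLimiter(time.Second, 30*time.Second),
		&workqueue.BucketRateLimiter{Limiter: rate.NewLimiter(rate.Limit(10), 100)},
	)

	// Each failed check calls AddRateLimited, which consults When: the
	// per-item delay doubles (1s, 2s, 4s, ...) until it hits the 30s cap.
	for i := 0; i < 7; i++ {
		fmt.Printf("retry %d delayed by %v\n", i+1, limiter.When("cluster-a"))
	}
	limiter.Forget("cluster-a") // a successful check resets the backoff
}
```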
2 changes: 1 addition & 1 deletion pkg/controllers/networking/ipam_controller_test.go
@@ -56,7 +56,7 @@ var _ = Describe("IPAM controller integration test suite", func() {
defer manager.RUnlock()
return len(manager.Networks)
}()).
WithTimeout(10 * time.Second).
WithTimeout(30 * time.Second).
WithPolling(time.Second).
Should(Equal(2))
