Skip to content

Commit

Permalink
Fix scout heartbeat (#354)
Browse files Browse the repository at this point in the history
* fix scout heartbeat
  • Loading branch information
weilaaa committed Nov 22, 2023
1 parent 7358d91 commit e2d5e1f
Show file tree
Hide file tree
Showing 20 changed files with 250 additions and 64 deletions.
17 changes: 17 additions & 0 deletions cmd/cube/app/options/flags/ctrlmgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,5 +38,22 @@ func init() {
Name: "allow-privileged",
Destination: &CubeOpts.CtrlMgrOpts.AllowPrivileged,
},
&cli.StringFlag{
Name: "enable-controllers",
Value: "*",
Destination: &CubeOpts.CtrlMgrOpts.EnableControllers,
},
&cli.IntFlag{
Name: "scout-wait-timeout-seconds",
Destination: &CubeOpts.CtrlMgrOpts.ScoutWaitTimeoutSeconds,
Value: 20,
Usage: "timeout wait for warden report heartbeat",
},
&cli.IntFlag{
Name: "scout-initial-delay-seconds",
Destination: &CubeOpts.CtrlMgrOpts.ScoutInitialDelaySeconds,
Value: 10,
Usage: "the time that wait for warden start",
},
}...)
}
5 changes: 5 additions & 0 deletions cmd/warden/app/options/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ var (
Value: true,
Destination: &WardenOpts.GenericWardenOpts.AllowPrivileged,
},
&cli.StringFlag{
Name: "enable-controllers",
Value: "*",
Destination: &WardenOpts.GenericWardenOpts.EnableControllers,
},

// rotate flags
&cli.StringFlag{
Expand Down
19 changes: 19 additions & 0 deletions docs/changelog.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,22 @@
# v1.7.9

## Bugfix
- Fix scout heartbeat[#354](https://github.com/kubecube-io/KubeCube/pull/354)

## Dependencies

- hnc v1.0
- nginx-ingress v0.46.0
- helm 3.5
- metrics-server v0.4.1
- elasticsearch 7.8
- kubecube-monitoring 15.4.8
- thanos 3.18.0
- logseer v1.0.0
- logagent v1.0.0
- kubecube-audit v1.2.0
- kubecube-webconsole v1.2.4

# v1.7.8

## Bugfix
Expand Down
14 changes: 8 additions & 6 deletions pkg/ctrlmgr/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,14 +17,16 @@ limitations under the License.
package ctrlmgr

type Config struct {
KubernetesConfig string

AllowPrivileged bool

LeaderElect bool

KubernetesConfig string
AllowPrivileged bool
LeaderElect bool
WebhookCert string
WebhookServerPort int
EnableControllers string
// ScoutWaitTimeoutSeconds that heartbeat not receive timeout
ScoutWaitTimeoutSeconds int
// ScoutInitialDelaySeconds the time that wait for warden start
ScoutInitialDelaySeconds int
}

func (c *Config) Validate() []error {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

v12 "github.com/kubecube-io/kubecube/pkg/apis/user/v1"
"github.com/kubecube-io/kubecube/pkg/clog"
"github.com/kubecube-io/kubecube/pkg/ctrlmgr/options"
"github.com/kubecube-io/kubecube/pkg/utils/constants"
)

Expand Down Expand Up @@ -111,7 +112,7 @@ func (r *ClusterRoleBindingReconciler) syncUserOnDelete(ctx context.Context, nam
return ctrl.Result{}, nil
}

func SetupClusterRoleBindingReconcilerWithManager(mgr ctrl.Manager) error {
func SetupClusterRoleBindingReconcilerWithManager(mgr ctrl.Manager, _ *options.Options) error {
r, err := newClusterRoleBindingReconciler(mgr)
if err != nil {
return err
Expand Down
3 changes: 2 additions & 1 deletion pkg/ctrlmgr/controllers/binding/rolebinding_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

v12 "github.com/kubecube-io/kubecube/pkg/apis/user/v1"
"github.com/kubecube-io/kubecube/pkg/clog"
"github.com/kubecube-io/kubecube/pkg/ctrlmgr/options"
"github.com/kubecube-io/kubecube/pkg/utils/constants"
)

Expand Down Expand Up @@ -129,7 +130,7 @@ func (r *RoleBindingReconciler) syncUserOnDelete(ctx context.Context, name, name
return ctrl.Result{}, nil
}

func SetupRoleBindingReconcilerWithManager(mgr ctrl.Manager) error {
func SetupRoleBindingReconcilerWithManager(mgr ctrl.Manager, _ *options.Options) error {
r, err := newRoleBindingReconciler(mgr)
if err != nil {
return err
Expand Down
23 changes: 14 additions & 9 deletions pkg/ctrlmgr/controllers/cluster/cluster_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

clusterv1 "github.com/kubecube-io/kubecube/pkg/apis/cluster/v1"
"github.com/kubecube-io/kubecube/pkg/clog"
"github.com/kubecube-io/kubecube/pkg/ctrlmgr/options"
"github.com/kubecube-io/kubecube/pkg/multicluster"
"github.com/kubecube-io/kubecube/pkg/utils"
"github.com/kubecube-io/kubecube/pkg/utils/kubeconfig"
Expand All @@ -54,23 +55,27 @@ type ClusterReconciler struct {
Scheme *runtime.Scheme
// todo: remove this field in the future
pivotCluster *clusterv1.Cluster

// retryQueue holds all retrying cluster that has the way to stop retrying
retryQueue sync.Map

// Affected is a channel of event.GenericEvent (see "Watching Channels" in
// https://book-v1.book.kubebuilder.io/beyond_basics/controller_watches.html) that is used to
// enqueue additional objects that need updating.
Affected chan event.GenericEvent
// WaitTimeoutSeconds that heartbeat not receive timeout
ScoutWaitTimeoutSeconds int
// InitialDelaySeconds the time that wait for warden start
ScoutInitialDelaySeconds int
}

func newReconciler(mgr manager.Manager) (*ClusterReconciler, error) {
func newReconciler(mgr manager.Manager, opts *options.Options) (*ClusterReconciler, error) {
log = clog.WithName("cluster")

r := &ClusterReconciler{
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Affected: make(chan event.GenericEvent),
Client: mgr.GetClient(),
Scheme: mgr.GetScheme(),
Affected: make(chan event.GenericEvent),
ScoutWaitTimeoutSeconds: opts.ScoutWaitTimeoutSeconds,
ScoutInitialDelaySeconds: opts.ScoutInitialDelaySeconds,
}
return r, nil
}
Expand Down Expand Up @@ -147,7 +152,7 @@ func (r *ClusterReconciler) syncCluster(ctx context.Context, cluster clusterv1.C

// generate internal cluster for current cluster and add
// it to the cache of multi cluster manager
err = multicluster.AddInternalClusterWithScout(cluster)
err = multicluster.AddInternalClusterWithScoutOpts(cluster, r.ScoutInitialDelaySeconds, r.ScoutWaitTimeoutSeconds)
if err != nil {
log.Error(err.Error())
_ = utils.UpdateClusterStatusByState(ctx, r.Client, &cluster, clusterv1.ClusterInitFailed)
Expand Down Expand Up @@ -221,8 +226,8 @@ func (r *ClusterReconciler) enqueue(cluster clusterv1.Cluster) {
}

// SetupWithManager sets up the controller with the Manager.
func SetupWithManager(mgr ctrl.Manager) error {
r, err := newReconciler(mgr)
func SetupWithManager(mgr ctrl.Manager, opts *options.Options) error {
r, err := newReconciler(mgr, opts)
if err != nil {
return err
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ import (

quotav1 "github.com/kubecube-io/kubecube/pkg/apis/quota/v1"
"github.com/kubecube-io/kubecube/pkg/clog"
"github.com/kubecube-io/kubecube/pkg/ctrlmgr/options"
"github.com/kubecube-io/kubecube/pkg/quota"
"github.com/kubecube-io/kubecube/pkg/quota/cube"
)
Expand Down Expand Up @@ -195,7 +196,7 @@ func (r *CubeResourceQuotaReconciler) ifUpdateUsed(hard, used v1.ResourceList) (
}

// SetupWithManager sets up the controller with the Manager.
func SetupWithManager(mgr ctrl.Manager) error {
func SetupWithManager(mgr ctrl.Manager, _ *options.Options) error {
r, err := newReconciler(mgr)
if err != nil {
return err
Expand Down
26 changes: 15 additions & 11 deletions pkg/ctrlmgr/controllers/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package controllers

import (
"errors"
"github.com/kubecube-io/kubecube/pkg/ctrlmgr/options"

"k8s.io/apimachinery/pkg/api/meta"
"sigs.k8s.io/controller-runtime/pkg/manager"

Expand All @@ -26,25 +28,27 @@ import (
cluster "github.com/kubecube-io/kubecube/pkg/ctrlmgr/controllers/cluster"
"github.com/kubecube-io/kubecube/pkg/ctrlmgr/controllers/quota"
user "github.com/kubecube-io/kubecube/pkg/ctrlmgr/controllers/user"
"github.com/kubecube-io/kubecube/pkg/utils/ctrlopts"
)

// todo: change set func if need

var setupFns []func(manager manager.Manager) error
var setupFns = make(ctrlopts.ControllerInitFns)

func init() {
// setup controllers
setupFns = append(setupFns, cluster.SetupWithManager)
setupFns = append(setupFns, user.SetupWithManager)
setupFns = append(setupFns, quota.SetupWithManager)
setupFns = append(setupFns, binding.SetupClusterRoleBindingReconcilerWithManager)
setupFns = append(setupFns, binding.SetupRoleBindingReconcilerWithManager)
setupFns["cluster"] = cluster.SetupWithManager
setupFns["user"] = user.SetupWithManager
setupFns["cuberesourcequota"] = quota.SetupWithManager
setupFns["clusterrolebinding"] = binding.SetupClusterRoleBindingReconcilerWithManager
setupFns["rolebinding"] = binding.SetupRoleBindingReconcilerWithManager
}

// SetupWithManager set up controllers into manager
func SetupWithManager(m manager.Manager) error {
for _, f := range setupFns {
if err := f(m); err != nil {
func SetupWithManager(m manager.Manager, controllers string, opts *options.Options) error {
for name, f := range setupFns {
if !ctrlopts.IsControllerEnabled(name, ctrlopts.ParseControllers(controllers)) {
continue
}
if err := f(m, opts); err != nil {
var kindMatchErr *meta.NoKindMatchError
if errors.As(err, &kindMatchErr) {
clog.Warn("CRD %v is not installed, its controller will dry run!", kindMatchErr.GroupKind)
Expand Down
3 changes: 2 additions & 1 deletion pkg/ctrlmgr/controllers/user/user_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/client"

userv1 "github.com/kubecube-io/kubecube/pkg/apis/user/v1"
"github.com/kubecube-io/kubecube/pkg/ctrlmgr/options"
)

var _ reconcile.Reconciler = &UserReconciler{}
Expand Down Expand Up @@ -64,7 +65,7 @@ func (r *UserReconciler) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.
}

// SetupWithManager sets up the controller with the Manager.
func SetupWithManager(mgr ctrl.Manager) error {
func SetupWithManager(mgr ctrl.Manager, _ *options.Options) error {
r, err := newReconciler(mgr)
if err != nil {
return err
Expand Down
5 changes: 3 additions & 2 deletions pkg/ctrlmgr/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/kubecube-io/kubecube/pkg/apis"
"github.com/kubecube-io/kubecube/pkg/clog"
"github.com/kubecube-io/kubecube/pkg/ctrlmgr/controllers"
"github.com/kubecube-io/kubecube/pkg/ctrlmgr/options"
"github.com/kubecube-io/kubecube/pkg/ctrlmgr/webhooks"
"github.com/kubecube-io/kubecube/pkg/multicluster"
"github.com/kubecube-io/kubecube/pkg/utils/env"
Expand Down Expand Up @@ -75,7 +76,7 @@ func NewCtrlMgrWithOpts(options *Config) *ControllerManager {
clog.Fatal("unable to set up controller manager: %v", err)
}

syncMgr, err := multicluster.NewSyncMgrWithDefaultSetting(cfg, true)
syncMgr, err := multicluster.NewSyncMgrWithScoutSetting(cfg, options.ScoutInitialDelaySeconds, options.ScoutWaitTimeoutSeconds)
if err != nil {
clog.Fatal("unable to set up subsidiary sync manager: %v", err)
}
Expand All @@ -84,7 +85,7 @@ func NewCtrlMgrWithOpts(options *Config) *ControllerManager {
}

func (m *ControllerManager) Initialize() error {
err := controllers.SetupWithManager(m.CtrlMgr)
err := controllers.SetupWithManager(m.CtrlMgr, m.EnableControllers, &options.Options{ScoutWaitTimeoutSeconds: m.ScoutWaitTimeoutSeconds, ScoutInitialDelaySeconds: m.ScoutInitialDelaySeconds})
if err != nil {
return err
}
Expand Down
30 changes: 30 additions & 0 deletions pkg/ctrlmgr/options/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
/*
Copyright 2023 KubeCube Authors
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/

package options

// Options here for avoid import cycle, remove it as soon as we found better way.
type Options struct {
KubernetesConfig string
AllowPrivileged bool
LeaderElect bool
WebhookCert string
WebhookServerPort int
// ScoutWaitTimeoutSeconds that heartbeat not receive timeout
ScoutWaitTimeoutSeconds int
// ScoutInitialDelaySeconds the time that wait for warden start
ScoutInitialDelaySeconds int
}
18 changes: 12 additions & 6 deletions pkg/multicluster/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -334,18 +334,24 @@ func (m *MultiClustersMgr) FuzzyCopy() map[string]*FuzzyCluster {
}

// AddInternalClusterWithScout build internal cluster of cluster and add it
// to multi cluster manager with scout
// to multi cluster manager with scout by default opts.
func AddInternalClusterWithScout(cluster clusterv1.Cluster) error {
return addInternalCluster(cluster, true)
return addInternalCluster(cluster, true, 0, 0)
}

// AddInternalClusterWithScoutOpts build internal cluster of cluster and add it
// to multi cluster manager with scout opts.
func AddInternalClusterWithScoutOpts(cluster clusterv1.Cluster, scoutInitialDelaySeconds, scoutWaitTimeoutSeconds int) error {
return addInternalCluster(cluster, true, scoutInitialDelaySeconds, scoutWaitTimeoutSeconds)
}

// AddInternalCluster build internal cluster of cluster and add it
// to multi cluster manager without scout
// to multi cluster manager without scout.
func AddInternalCluster(cluster clusterv1.Cluster) error {
return addInternalCluster(cluster, false)
return addInternalCluster(cluster, false, 0, 0)
}

func addInternalCluster(cluster clusterv1.Cluster, withScout bool) error {
func addInternalCluster(cluster clusterv1.Cluster, withScout bool, scoutInitialDelaySeconds, scoutWaitTimeoutSeconds int) error {
_, err := ManagerImpl.Get(cluster.Name)
if err == nil {
// return Immediately if active internal cluster exist
Expand All @@ -362,7 +368,7 @@ func addInternalCluster(cluster clusterv1.Cluster, withScout bool) error {
if err != nil {
return err
}
c.Scout = scout.NewScout(cluster.Name, 0, 0, localCluster.Client.Direct(), c.StopCh)
c.Scout = scout.NewScout(cluster.Name, scoutInitialDelaySeconds, scoutWaitTimeoutSeconds, localCluster.Client.Direct(), c.StopCh)
}

err = ManagerImpl.Add(cluster.Name, c)
Expand Down
10 changes: 7 additions & 3 deletions pkg/multicluster/scout/scout.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ func (s *Scout) ClusterHealth() v1.ClusterState {

// Collect will scout a specified warden of cluster
func (s *Scout) Collect(ctx context.Context) {
clog.Info("watch heartbeat for cluster %v, initial delay: %v, wait timeout: %v", s.Cluster, s.InitialDelaySeconds, s.WaitTimeoutSeconds)

ticker := time.NewTicker(time.Duration(s.WaitTimeoutSeconds) * time.Second)
defer ticker.Stop()
for {
Expand Down Expand Up @@ -163,14 +165,17 @@ func (s *Scout) illWarden(ctx context.Context) {
return
}

if cluster.Status.LastHeartbeat != nil {
s.LastHeartbeat = cluster.Status.LastHeartbeat.Time
}

if !isDisconnected(cluster, s.WaitTimeoutSeconds) {
// going here means cluster heartbeat is normal

if s.clusterState != v1.ClusterNormal {
clog.Info("cluster %v connected", cluster.Name)
}

s.LastHeartbeat = cluster.Status.LastHeartbeat.Time
s.clusterState = v1.ClusterNormal
return
}
Expand All @@ -182,10 +187,9 @@ func (s *Scout) illWarden(ctx context.Context) {
state := v1.ClusterAbnormal
obj.Status.State = &state
obj.Status.Reason = reason
obj.Status.LastHeartbeat = &metav1.Time{Time: s.LastHeartbeat}
}

clog.Warn("%v, last heartbeat: %v", reason, s.LastHeartbeat)
clog.Warn("%v, last heartbeat: %v", reason, cluster.Status.LastHeartbeat.Time)

err := utils.UpdateClusterStatus(ctx, s.client, cluster, updateFn)
if err != nil {
Expand Down
Loading

0 comments on commit e2d5e1f

Please sign in to comment.