2 changes: 2 additions & 0 deletions cmd/registration-operator/main.go
@@ -47,6 +47,8 @@ func newNucleusCommand() *cobra.Command {
}

cmd.AddCommand(hub.NewHubOperatorCmd())
cmd.AddCommand(hub.NewHubManagerCmd())
cmd.AddCommand(hub.NewWebhookCmd())
Comment on lines +50 to +51

Contributor

What commands are these replacing?

Member Author

I was thinking of introducing a singleton mode in the clustermanager as well, so we do not deploy the registration/placement/work controllers and webhooks separately, but only two deployments: a single controller and a webhook.

Contributor

Oh gotcha. It would be helpful to have some comments or command descriptions that explain that. As a newcomer to the project, it is often difficult to tell which command/manager is which for spoke singleton vs. separate.

Member Author

yeah, that is a good suggestion.
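
One way to do that (a sketch only, not part of this PR; the Long text and comments below are illustrative) is to give the singleton command a longer description spelling out which separate deployments it replaces:

package hub

import (
	"context"

	"github.com/spf13/cobra"
	"k8s.io/utils/clock"

	"open-cluster-management.io/ocm/pkg/singleton/hub"
	"open-cluster-management.io/ocm/pkg/version"
)

// Sketch of NewHubManagerCmd with an added Long description; only the
// description text is new, the rest mirrors the command added in this PR.
func NewHubManagerCmd() *cobra.Command {
	opts := hub.NewHubOption()
	cmd := opts.CommonOption.NewControllerCommandConfig(
		"hub-manager", version.Get(), opts.RunManager, clock.RealClock{}).
		NewCommandWithContext(context.TODO())
	cmd.Use = "hub-manager"
	cmd.Short = "Start the hub manager"
	// Singleton mode: one process running the registration, placement, work and
	// addon hub controllers; the webhooks still run separately via "webhook-server".
	cmd.Long = "Start the singleton hub manager, which runs the registration, placement, " +
		"work and addon hub controllers in a single process instead of separate deployments."

	flags := cmd.Flags()
	opts.AddFlags(flags)
	return cmd
}

A similar note on NewWebhookCmd's Short/Long could mention that it serves both the registration and work webhooks.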

cmd.AddCommand(spoke.NewKlusterletOperatorCmd())
cmd.AddCommand(spoke.NewKlusterletAgentCmd())

36 changes: 36 additions & 0 deletions pkg/cmd/hub/operator.go
@@ -5,9 +5,11 @@ import (

"github.com/spf13/cobra"
"k8s.io/utils/clock"
ctrl "sigs.k8s.io/controller-runtime"

commonoptions "open-cluster-management.io/ocm/pkg/common/options"
"open-cluster-management.io/ocm/pkg/operator/operators/clustermanager"
"open-cluster-management.io/ocm/pkg/singleton/hub"
"open-cluster-management.io/ocm/pkg/version"
)

@@ -33,3 +35,37 @@ func NewHubOperatorCmd() *cobra.Command {
	opts.AddFlags(flags)
	return cmd
}

func NewHubManagerCmd() *cobra.Command {
	opts := hub.NewHubOption()
	commonOpts := opts.CommonOption
	cmd := commonOpts.NewControllerCommandConfig("hub-manager", version.Get(), opts.RunManager, clock.RealClock{}).
		NewCommandWithContext(context.TODO())
	cmd.Use = "hub-manager"
	cmd.Short = "Start the hub manager"

	flags := cmd.Flags()
	opts.AddFlags(flags)
	return cmd
}

func NewWebhookCmd() *cobra.Command {
	webhookOptions := commonoptions.NewWebhookOptions()
	opts := hub.NewWebhookOptions()
	cmd := &cobra.Command{
		Use:   "webhook-server",
		Short: "Start the registration webhook server",
		RunE: func(c *cobra.Command, args []string) error {
			if err := opts.SetupWebhookServer(webhookOptions); err != nil {
				return err
			}
			return webhookOptions.RunWebhookServer(ctrl.SetupSignalHandler())
		},
	}

	flags := cmd.Flags()
	opts.AddFlags(flags)
	webhookOptions.AddFlags(flags)

	return cmd
}
176 changes: 176 additions & 0 deletions pkg/singleton/hub/manager.go
@@ -0,0 +1,176 @@
package hub

import (
"context"
"time"

"github.com/openshift/library-go/pkg/controller/controllercmd"
"github.com/spf13/pflag"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/dynamic/dynamicinformer"
kubeinformers "k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/metadata"
"k8s.io/client-go/rest"
"k8s.io/klog/v2"
cpclientset "sigs.k8s.io/cluster-inventory-api/client/clientset/versioned"
cpinformerv1alpha1 "sigs.k8s.io/cluster-inventory-api/client/informers/externalversions"

addonclient "open-cluster-management.io/api/client/addon/clientset/versioned"
addoninformers "open-cluster-management.io/api/client/addon/informers/externalversions"
clusterv1client "open-cluster-management.io/api/client/cluster/clientset/versioned"
clusterv1informers "open-cluster-management.io/api/client/cluster/informers/externalversions"
workv1client "open-cluster-management.io/api/client/work/clientset/versioned"
workv1informers "open-cluster-management.io/api/client/work/informers/externalversions"
clusterv1 "open-cluster-management.io/api/cluster/v1"
ocmfeature "open-cluster-management.io/api/feature"

"open-cluster-management.io/ocm/pkg/addon"
commonoptions "open-cluster-management.io/ocm/pkg/common/options"
"open-cluster-management.io/ocm/pkg/features"
placementcontrollers "open-cluster-management.io/ocm/pkg/placement/controllers"
registrationhub "open-cluster-management.io/ocm/pkg/registration/hub"
workhub "open-cluster-management.io/ocm/pkg/work/hub"
)

type HubOption struct {
	CommonOption       *commonoptions.Options
	registrationOption *registrationhub.HubManagerOptions
	workOption         *workhub.WorkHubManagerOptions
}

func NewHubOption() *HubOption {
	return &HubOption{
		CommonOption:       commonoptions.NewOptions(),
		registrationOption: registrationhub.NewHubManagerOptions(),
		workOption:         workhub.NewWorkHubManagerOptions(),
	}
}

func (o *HubOption) AddFlags(fs *pflag.FlagSet) {
	o.CommonOption.AddFlags(fs)
	o.registrationOption.AddFlags(fs)
	o.workOption.AddFlags(fs)
}

func (o *HubOption) RunManager(ctx context.Context, controllerContext *controllercmd.ControllerContext) error {
	kubeClient, err := kubernetes.NewForConfig(controllerContext.KubeConfig)
	if err != nil {
		return err
	}

	dynamicClient, err := dynamic.NewForConfig(controllerContext.KubeConfig)
	if err != nil {
		return err
	}

	// copy a separate config for gc controller and increase the gc controller's throughput.
	metadataKubeConfig := rest.CopyConfig(controllerContext.KubeConfig)
	metadataKubeConfig.QPS = controllerContext.KubeConfig.QPS * 2
	metadataKubeConfig.Burst = controllerContext.KubeConfig.Burst * 2
	metadataClient, err := metadata.NewForConfig(metadataKubeConfig)
	if err != nil {
		return err
	}

	clusterClient, err := clusterv1client.NewForConfig(controllerContext.KubeConfig)
	if err != nil {
		return err
	}

	clusterProfileClient, err := cpclientset.NewForConfig(controllerContext.KubeConfig)
	if err != nil {
		return err
	}

	workClient, err := workv1client.NewForConfig(controllerContext.KubeConfig)
	if err != nil {
		return err
	}

	addOnClient, err := addonclient.NewForConfig(controllerContext.KubeConfig)
	if err != nil {
		return err
	}

	clusterInformers := clusterv1informers.NewSharedInformerFactory(clusterClient, 30*time.Minute)
	clusterProfileInformers := cpinformerv1alpha1.NewSharedInformerFactory(clusterProfileClient, 30*time.Minute)
	workInformers := workv1informers.NewSharedInformerFactory(workClient, 30*time.Minute)
	replicaSetInformers := workv1informers.NewSharedInformerFactory(workClient, 30*time.Minute)
	kubeInfomers := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 30*time.Minute, kubeinformers.WithTweakListOptions(
		func(listOptions *metav1.ListOptions) {
			// Note all kube resources managed by registration should have the cluster label, and should not have
			// the addon label.
			selector := &metav1.LabelSelector{
				MatchExpressions: []metav1.LabelSelectorRequirement{
					{
						Key:      clusterv1.ClusterNameLabelKey,
						Operator: metav1.LabelSelectorOpExists,
					},
				},
			}
			listOptions.LabelSelector = metav1.FormatLabelSelector(selector)
		}))
	addOnInformers := addoninformers.NewSharedInformerFactory(addOnClient, 30*time.Minute)
	dynamicInformers := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 30*time.Minute)

	// Create an error channel to collect errors from controllers
	errCh := make(chan error, 4)

	// start registration controller
	go func() {
		err := o.registrationOption.RunControllerManagerWithInformers(
			ctx, controllerContext,
			kubeClient, metadataClient, clusterClient, clusterProfileClient, addOnClient,
			kubeInfomers, clusterInformers, clusterProfileInformers, workInformers, addOnInformers)
		if err != nil {
			klog.Errorf("failed to start registration controller: %v", err)
			errCh <- err
		}
	}()

	// start placement controller
	go func() {
		err := placementcontrollers.RunControllerManagerWithInformers(
			ctx, controllerContext, kubeClient, clusterClient, clusterInformers)
		if err != nil {
			klog.Errorf("failed to start placement controller: %v", err)
			errCh <- err
		}
	}()

	// start addon controller
	if features.HubMutableFeatureGate.Enabled(ocmfeature.AddonManagement) {
		go func() {
			err := addon.RunControllerManagerWithInformers(
				ctx, controllerContext, kubeClient, addOnClient, workClient,
				clusterInformers, addOnInformers, workInformers, dynamicInformers)
			if err != nil {
				klog.Errorf("failed to start addon controller: %v", err)
				errCh <- err
			}
		}()
	}

	// start work controller
	if features.HubMutableFeatureGate.Enabled(ocmfeature.ManifestWorkReplicaSet) {
		go func() {
			hubConfig := workhub.NewWorkHubManagerConfig(o.workOption)
			err := hubConfig.RunControllerManagerWithInformers(
				ctx, controllerContext, workClient, replicaSetInformers, workInformers, clusterInformers)
			if err != nil {
				klog.Errorf("failed to start work controller: %v", err)
				errCh <- err
			}
		}()
	}

	// Wait for context cancellation or first error
	select {
	case <-ctx.Done():
		return nil
	case err := <-errCh:
		return err
	}
}
110 changes: 110 additions & 0 deletions pkg/singleton/hub/manager_test.go
@@ -0,0 +1,110 @@
package hub

import (
"context"
"path/filepath"
"testing"
"time"

"github.com/onsi/ginkgo/v2"
"github.com/onsi/gomega"
"github.com/openshift/library-go/pkg/controller/controllercmd"
"k8s.io/client-go/kubernetes/scheme"
"k8s.io/client-go/rest"
"sigs.k8s.io/controller-runtime/pkg/envtest"
logf "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/log/zap"

ocmfeature "open-cluster-management.io/api/feature"
workapiv1 "open-cluster-management.io/api/work/v1"

"open-cluster-management.io/ocm/pkg/features"
"open-cluster-management.io/ocm/test/integration/util"
)

var testEnv *envtest.Environment
var sourceConfigFileName string
var cfg *rest.Config

var CRDPaths = []string{
	// hub
	"../../../vendor/open-cluster-management.io/api/work/v1/0000_00_work.open-cluster-management.io_manifestworks.crd.yaml",
	"../../../vendor/open-cluster-management.io/api/work/v1alpha1/0000_00_work.open-cluster-management.io_manifestworkreplicasets.crd.yaml",
	filepath.Join("../../../", "vendor", "open-cluster-management.io", "api", "cluster", "v1"),
	filepath.Join("../../../", "vendor", "open-cluster-management.io", "api", "cluster", "v1beta1"),
	filepath.Join("../../../", "vendor", "open-cluster-management.io", "api", "cluster", "v1beta2"),
	filepath.Join("../../../", "vendor", "open-cluster-management.io", "api", "cluster", "v1alpha1"),
	filepath.Join("../../../", "vendor", "open-cluster-management.io", "api", "addon", "v1alpha1"),
}

func TestWorkManager(t *testing.T) {
	gomega.RegisterFailHandler(ginkgo.Fail)
	ginkgo.RunSpecs(t, "Singleton Hub Manager Suite")
}

var _ = ginkgo.BeforeSuite(func() {
	logf.SetLogger(zap.New(zap.WriteTo(ginkgo.GinkgoWriter), zap.UseDevMode(true)))
	ginkgo.By("bootstrapping test environment")
	var err error

	// start a kube-apiserver
	testEnv = &envtest.Environment{
		ErrorIfCRDPathMissing: true,
		CRDDirectoryPaths:     CRDPaths,
	}
	cfg, err = testEnv.Start()
	gomega.Expect(err).ToNot(gomega.HaveOccurred())
	gomega.Expect(cfg).ToNot(gomega.BeNil())

	err = workapiv1.Install(scheme.Scheme)
	gomega.Expect(err).NotTo(gomega.HaveOccurred())

	err = features.HubMutableFeatureGate.Add(ocmfeature.DefaultHubRegistrationFeatureGates)
	gomega.Expect(err).ToNot(gomega.HaveOccurred())

	err = features.HubMutableFeatureGate.Add(ocmfeature.DefaultHubWorkFeatureGates)
	gomega.Expect(err).ToNot(gomega.HaveOccurred())

	err = features.HubMutableFeatureGate.Add(ocmfeature.DefaultHubAddonManagerFeatureGates)
	gomega.Expect(err).ToNot(gomega.HaveOccurred())

	// enable ManagedClusterAutoApproval feature gate
	err = features.HubMutableFeatureGate.Set("ManagedClusterAutoApproval=true")
	gomega.Expect(err).NotTo(gomega.HaveOccurred())

	// enable resourceCleanup feature gate
	err = features.HubMutableFeatureGate.Set("ResourceCleanup=true")
	gomega.Expect(err).NotTo(gomega.HaveOccurred())

	err = features.HubMutableFeatureGate.Set("ManifestWorkReplicaSet=true")
	gomega.Expect(err).NotTo(gomega.HaveOccurred())
})

var _ = ginkgo.AfterSuite(func() {
	ginkgo.By("tearing down the test environment")

	err := testEnv.Stop()
	gomega.Expect(err).ToNot(gomega.HaveOccurred())
})

var _ = ginkgo.Describe("start hub manager", func() {
	ginkgo.It("start hub manager", func() {
		ctx, stopHub := context.WithCancel(context.Background())
		opts := NewHubOption()
		opts.workOption.WorkDriver = "kube"
		opts.workOption.WorkDriverConfig = sourceConfigFileName
		opts.registrationOption.ClusterAutoApprovalUsers = []string{util.AutoApprovalBootstrapUser}

		// start hub controller
		go func() {
			err := opts.RunManager(ctx, &controllercmd.ControllerContext{
				KubeConfig:    cfg,
				EventRecorder: util.NewIntegrationTestEventRecorder("hub"),
			})
			gomega.Expect(err).NotTo(gomega.HaveOccurred())
		}()

		time.Sleep(5 * time.Second)
		stopHub()
	})
})
Comment on lines +90 to +110

🛠️ Refactor suggestion

Improve test reliability and verification.

The test has several areas for improvement:

  1. Arbitrary sleep duration: The 5-second sleep is arbitrary and could make tests flaky in CI environments.
  2. Lack of startup verification: The test doesn't verify that the manager actually started successfully before stopping it.
  3. Minimal verification: Only checks that no error occurred during startup/shutdown.

Consider this improved approach:

 var _ = ginkgo.Describe("start hub manager", func() {
-	ginkgo.It("start hub manager", func() {
+	ginkgo.It("should start and stop hub manager successfully", func() {
 		ctx, stopHub := context.WithCancel(context.Background())
+		defer stopHub()
+		
 		opts := NewHubOption()
 		opts.workOption.WorkDriver = "kube"
 		opts.workOption.WorkDriverConfig = sourceConfigFileName
 		opts.registrationOption.ClusterAutoApprovalUsers = []string{util.AutoApprovalBootstrapUser}

+		// Channel to capture startup errors
+		errChan := make(chan error, 1)
+		
 		// start hub controller
 		go func() {
 			err := opts.RunManager(ctx, &controllercmd.ControllerContext{
 				KubeConfig:    cfg,
 				EventRecorder: util.NewIntegrationTestEventRecorder("hub"),
 			})
-			gomega.Expect(err).NotTo(gomega.HaveOccurred())
+			errChan <- err
 		}()

-		time.Sleep(5 * time.Second)
-		stopHub()
+		// Wait for either startup completion or timeout
+		select {
+		case err := <-errChan:
+			if err != nil {
+				ginkgo.Fail("Hub manager failed to start: " + err.Error())
+			}
+		case <-time.After(10 * time.Second):
+			// Manager started successfully, now stop it
+			stopHub()
+			// Wait for graceful shutdown
+			select {
+			case err := <-errChan:
+				gomega.Expect(err).To(gomega.Or(gomega.BeNil(), gomega.MatchError("context canceled")))
+			case <-time.After(5 * time.Second):
+				ginkgo.Fail("Hub manager did not shutdown gracefully")
+			}
+		}
 	})
 })
🤖 Prompt for AI Agents
In pkg/singleton/hub/manager_test.go around lines 92 to 112, replace the
arbitrary 5-second sleep with a synchronization mechanism such as a channel or
wait group to confirm the manager has started successfully before proceeding.
Add explicit verification steps to ensure the manager is running as expected,
not just that no error occurred. This can include checking for expected side
effects or states after startup. Finally, enhance the test assertions to cover
both startup and shutdown phases more robustly to improve reliability and reduce
flakiness in CI.
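
As a concrete sketch of the "expected side effects" idea (not part of this PR — it assumes the registration hub controller writes finalizers or status conditions on an accepted ManagedCluster, which is the behavior the probe would need to match), the test could create a ManagedCluster and poll for the controller's reaction instead of sleeping:

// waitForHubManagerReady is a hypothetical helper for manager_test.go. It creates a
// ManagedCluster and waits until the hub manager's controllers react to it. Extra
// imports needed: metav1, clusterv1, and clusterv1client (the same packages manager.go uses).
func waitForHubManagerReady(ctx context.Context, cfg *rest.Config) {
	clusterClient, err := clusterv1client.NewForConfig(cfg)
	gomega.Expect(err).ToNot(gomega.HaveOccurred())

	mc := &clusterv1.ManagedCluster{
		ObjectMeta: metav1.ObjectMeta{Name: "startup-probe-cluster"},
		Spec:       clusterv1.ManagedClusterSpec{HubAcceptsClient: true},
	}
	_, err = clusterClient.ClusterV1().ManagedClusters().Create(ctx, mc, metav1.CreateOptions{})
	gomega.Expect(err).ToNot(gomega.HaveOccurred())

	gomega.Eventually(func() bool {
		got, err := clusterClient.ClusterV1().ManagedClusters().Get(ctx, mc.Name, metav1.GetOptions{})
		if err != nil {
			return false
		}
		// Any finalizer or status condition written by the controllers indicates
		// they are up and reconciling.
		return len(got.Finalizers) > 0 || len(got.Status.Conditions) > 0
	}, 30*time.Second, time.Second).Should(gomega.BeTrue())
}

The It block could then call waitForHubManagerReady(ctx, cfg) before stopHub() in place of the fixed sleep.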

36 changes: 36 additions & 0 deletions pkg/singleton/hub/webhook.go
@@ -0,0 +1,36 @@
package hub

import (
"github.com/spf13/pflag"

commonoptions "open-cluster-management.io/ocm/pkg/common/options"
registrationwebhook "open-cluster-management.io/ocm/pkg/registration/webhook"
workwebhook "open-cluster-management.io/ocm/pkg/work/webhook"
)

// WebhookOptions wraps the options for the registration and work webhooks so
// that both can be served by a single webhook server.
type WebhookOptions struct {
	workWebhookOptions *workwebhook.Options
}

// NewWebhookOptions constructs a new set of default options for webhook.
func NewWebhookOptions() *WebhookOptions {
	return &WebhookOptions{
		workWebhookOptions: workwebhook.NewOptions(),
	}
}

func (c *WebhookOptions) AddFlags(fs *pflag.FlagSet) {
	c.workWebhookOptions.AddFlags(fs)
}

func (c *WebhookOptions) SetupWebhookServer(opts *commonoptions.WebhookOptions) error {
	if err := registrationwebhook.SetupWebhookServer(opts); err != nil {
		return err
	}
	if err := c.workWebhookOptions.SetupWebhookServer(opts); err != nil {
		return err
	}

	return nil
}