Skip to content

Commit e642573

Browse files
committed
Build a single hub manager command
Signed-off-by: Jian Qiu <[email protected]>
1 parent 4cb6e38 commit e642573

File tree

6 files changed

+340
-54
lines changed

6 files changed

+340
-54
lines changed

cmd/registration-operator/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func newNucleusCommand() *cobra.Command {
4747
}
4848

4949
cmd.AddCommand(hub.NewHubOperatorCmd())
50+
cmd.AddCommand(hub.NewHubManagerCmd())
5051
cmd.AddCommand(spoke.NewKlusterletOperatorCmd())
5152
cmd.AddCommand(spoke.NewKlusterletAgentCmd())
5253

pkg/cmd/hub/operator.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
commonoptions "open-cluster-management.io/ocm/pkg/common/options"
1010
"open-cluster-management.io/ocm/pkg/operator/operators/clustermanager"
11+
"open-cluster-management.io/ocm/pkg/singleton/hub"
1112
"open-cluster-management.io/ocm/pkg/version"
1213
)
1314

@@ -33,3 +34,16 @@ func NewHubOperatorCmd() *cobra.Command {
3334
opts.AddFlags(flags)
3435
return cmd
3536
}
37+
38+
func NewHubManagerCmd() *cobra.Command {
39+
opts := hub.NewHubOption()
40+
commonOpts := opts.CommonOption
41+
cmd := commonOpts.NewControllerCommandConfig("hub-manager", version.Get(), opts.RunManager, clock.RealClock{}).
42+
NewCommandWithContext(context.TODO())
43+
cmd.Use = "hub-manager"
44+
cmd.Short = "Start the hub manager"
45+
46+
flags := cmd.Flags()
47+
opts.AddFlags(flags)
48+
return cmd
49+
}

pkg/registration/hub/manager.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,7 @@ func (m *HubManagerOptions) RunControllerManagerWithInformers(
349349
go workInformers.Start(ctx.Done())
350350
go kubeInformers.Start(ctx.Done())
351351
go addOnInformers.Start(ctx.Done())
352-
if features.HubMutableFeatureGate.Enabled(ocmfeature.DefaultClusterSet) {
352+
if features.HubMutableFeatureGate.Enabled(ocmfeature.ClusterProfile) {
353353
go clusterProfileInformers.Start(ctx.Done())
354354
}
355355

pkg/singleton/hub/manager.go

Lines changed: 176 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,176 @@
1+
package hub
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/openshift/library-go/pkg/controller/controllercmd"
8+
"github.com/spf13/pflag"
9+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10+
"k8s.io/client-go/dynamic"
11+
"k8s.io/client-go/dynamic/dynamicinformer"
12+
kubeinformers "k8s.io/client-go/informers"
13+
"k8s.io/client-go/kubernetes"
14+
"k8s.io/client-go/metadata"
15+
"k8s.io/client-go/rest"
16+
"k8s.io/klog/v2"
17+
cpclientset "sigs.k8s.io/cluster-inventory-api/client/clientset/versioned"
18+
cpinformerv1alpha1 "sigs.k8s.io/cluster-inventory-api/client/informers/externalversions"
19+
20+
addonclient "open-cluster-management.io/api/client/addon/clientset/versioned"
21+
addoninformers "open-cluster-management.io/api/client/addon/informers/externalversions"
22+
clusterv1client "open-cluster-management.io/api/client/cluster/clientset/versioned"
23+
clusterv1informers "open-cluster-management.io/api/client/cluster/informers/externalversions"
24+
workv1client "open-cluster-management.io/api/client/work/clientset/versioned"
25+
workv1informers "open-cluster-management.io/api/client/work/informers/externalversions"
26+
clusterv1 "open-cluster-management.io/api/cluster/v1"
27+
ocmfeature "open-cluster-management.io/api/feature"
28+
29+
"open-cluster-management.io/ocm/pkg/addon"
30+
commonoptions "open-cluster-management.io/ocm/pkg/common/options"
31+
"open-cluster-management.io/ocm/pkg/features"
32+
placementcontrollers "open-cluster-management.io/ocm/pkg/placement/controllers"
33+
registrationhub "open-cluster-management.io/ocm/pkg/registration/hub"
34+
workhub "open-cluster-management.io/ocm/pkg/work/hub"
35+
)
36+
37+
type HubOption struct {
38+
CommonOption *commonoptions.Options
39+
registrationOption *registrationhub.HubManagerOptions
40+
workOption *workhub.WorkHubManagerOptions
41+
}
42+
43+
func NewHubOption() *HubOption {
44+
return &HubOption{
45+
CommonOption: commonoptions.NewOptions(),
46+
registrationOption: registrationhub.NewHubManagerOptions(),
47+
workOption: workhub.NewWorkHubManagerOptions(),
48+
}
49+
}
50+
51+
func (o *HubOption) AddFlags(fs *pflag.FlagSet) {
52+
o.CommonOption.AddFlags(fs)
53+
o.registrationOption.AddFlags(fs)
54+
o.workOption.AddFlags(fs)
55+
}
56+
57+
func (o *HubOption) RunManager(ctx context.Context, controllerContext *controllercmd.ControllerContext) error {
58+
kubeClient, err := kubernetes.NewForConfig(controllerContext.KubeConfig)
59+
if err != nil {
60+
return err
61+
}
62+
63+
dynamicClient, err := dynamic.NewForConfig(controllerContext.KubeConfig)
64+
if err != nil {
65+
return err
66+
}
67+
68+
// copy a separate config for gc controller and increase the gc controller's throughput.
69+
metadataKubeConfig := rest.CopyConfig(controllerContext.KubeConfig)
70+
metadataKubeConfig.QPS = controllerContext.KubeConfig.QPS * 2
71+
metadataKubeConfig.Burst = controllerContext.KubeConfig.Burst * 2
72+
metadataClient, err := metadata.NewForConfig(metadataKubeConfig)
73+
if err != nil {
74+
return err
75+
}
76+
77+
clusterClient, err := clusterv1client.NewForConfig(controllerContext.KubeConfig)
78+
if err != nil {
79+
return err
80+
}
81+
82+
clusterProfileClient, err := cpclientset.NewForConfig(controllerContext.KubeConfig)
83+
if err != nil {
84+
return err
85+
}
86+
87+
workClient, err := workv1client.NewForConfig(controllerContext.KubeConfig)
88+
if err != nil {
89+
return err
90+
}
91+
92+
addOnClient, err := addonclient.NewForConfig(controllerContext.KubeConfig)
93+
if err != nil {
94+
return err
95+
}
96+
97+
clusterInformers := clusterv1informers.NewSharedInformerFactory(clusterClient, 30*time.Minute)
98+
clusterProfileInformers := cpinformerv1alpha1.NewSharedInformerFactory(clusterProfileClient, 30*time.Minute)
99+
workInformers := workv1informers.NewSharedInformerFactory(workClient, 30*time.Minute)
100+
replicaSetInformers := workv1informers.NewSharedInformerFactory(workClient, 30*time.Minute)
101+
kubeInfomers := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 30*time.Minute, kubeinformers.WithTweakListOptions(
102+
func(listOptions *metav1.ListOptions) {
103+
// Note all kube resources managed by registration should have the cluster label, and should not have
104+
// the addon label.
105+
selector := &metav1.LabelSelector{
106+
MatchExpressions: []metav1.LabelSelectorRequirement{
107+
{
108+
Key: clusterv1.ClusterNameLabelKey,
109+
Operator: metav1.LabelSelectorOpExists,
110+
},
111+
},
112+
}
113+
listOptions.LabelSelector = metav1.FormatLabelSelector(selector)
114+
}))
115+
addOnInformers := addoninformers.NewSharedInformerFactory(addOnClient, 30*time.Minute)
116+
dynamicInformers := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 30*time.Minute)
117+
118+
// Create an error channel to collect errors from controllers
119+
errCh := make(chan error, 4)
120+
121+
// start registration controller
122+
go func() {
123+
err := o.registrationOption.RunControllerManagerWithInformers(
124+
ctx, controllerContext,
125+
kubeClient, metadataClient, clusterClient, clusterProfileClient, addOnClient,
126+
kubeInfomers, clusterInformers, clusterProfileInformers, workInformers, addOnInformers)
127+
if err != nil {
128+
klog.Errorf("failed to start registration controller: %v", err)
129+
errCh <- err
130+
}
131+
}()
132+
133+
// start placement controller
134+
go func() {
135+
err := placementcontrollers.RunControllerManagerWithInformers(
136+
ctx, controllerContext, kubeClient, clusterClient, clusterInformers)
137+
if err != nil {
138+
klog.Errorf("failed to start placement controller: %v", err)
139+
errCh <- err
140+
}
141+
}()
142+
143+
// start addon controller
144+
if features.HubMutableFeatureGate.Enabled(ocmfeature.AddonManagement) {
145+
go func() {
146+
err := addon.RunControllerManagerWithInformers(
147+
ctx, controllerContext, kubeClient, addOnClient, workClient,
148+
clusterInformers, addOnInformers, workInformers, dynamicInformers)
149+
if err != nil {
150+
klog.Errorf("failed to start addon controller: %v", err)
151+
errCh <- err
152+
}
153+
}()
154+
}
155+
156+
// start work controller
157+
if features.HubMutableFeatureGate.Enabled(ocmfeature.ManifestWorkReplicaSet) {
158+
go func() {
159+
hubConfig := workhub.NewWorkHubManagerConfig(o.workOption)
160+
err := hubConfig.RunControllerManagerWithInformers(
161+
ctx, controllerContext, workClient, replicaSetInformers, workInformers, clusterInformers)
162+
if err != nil {
163+
klog.Errorf("failed to start work controller: %v", err)
164+
errCh <- err
165+
}
166+
}()
167+
}
168+
169+
// Wait for context cancellation or first error
170+
select {
171+
case <-ctx.Done():
172+
return nil
173+
case err := <-errCh:
174+
return err
175+
}
176+
}

pkg/singleton/hub/manager_test.go

Lines changed: 110 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,110 @@
1+
package hub
2+
3+
import (
4+
"context"
5+
"path/filepath"
6+
"testing"
7+
"time"
8+
9+
"github.com/onsi/ginkgo/v2"
10+
"github.com/onsi/gomega"
11+
"github.com/openshift/library-go/pkg/controller/controllercmd"
12+
"k8s.io/client-go/kubernetes/scheme"
13+
"k8s.io/client-go/rest"
14+
"sigs.k8s.io/controller-runtime/pkg/envtest"
15+
logf "sigs.k8s.io/controller-runtime/pkg/log"
16+
"sigs.k8s.io/controller-runtime/pkg/log/zap"
17+
18+
ocmfeature "open-cluster-management.io/api/feature"
19+
workapiv1 "open-cluster-management.io/api/work/v1"
20+
21+
"open-cluster-management.io/ocm/pkg/features"
22+
"open-cluster-management.io/ocm/test/integration/util"
23+
)
24+
25+
var testEnv *envtest.Environment
26+
var sourceConfigFileName string
27+
var cfg *rest.Config
28+
29+
var CRDPaths = []string{
30+
// hub
31+
"../../../vendor/open-cluster-management.io/api/work/v1/0000_00_work.open-cluster-management.io_manifestworks.crd.yaml",
32+
"../../../vendor/open-cluster-management.io/api/work/v1alpha1/0000_00_work.open-cluster-management.io_manifestworkreplicasets.crd.yaml",
33+
filepath.Join("../../../", "vendor", "open-cluster-management.io", "api", "cluster", "v1"),
34+
filepath.Join("../../../", "vendor", "open-cluster-management.io", "api", "cluster", "v1beta1"),
35+
filepath.Join("../../../", "vendor", "open-cluster-management.io", "api", "cluster", "v1beta2"),
36+
filepath.Join("../../../", "vendor", "open-cluster-management.io", "api", "cluster", "v1alpha1"),
37+
filepath.Join("../../../", "vendor", "open-cluster-management.io", "api", "addon", "v1alpha1"),
38+
}
39+
40+
func TestWorkManager(t *testing.T) {
41+
gomega.RegisterFailHandler(ginkgo.Fail)
42+
ginkgo.RunSpecs(t, "Singleton Hub Manager Suite")
43+
}
44+
45+
var _ = ginkgo.BeforeSuite(func() {
46+
logf.SetLogger(zap.New(zap.WriteTo(ginkgo.GinkgoWriter), zap.UseDevMode(true)))
47+
ginkgo.By("bootstrapping test environment")
48+
var err error
49+
50+
// start a kube-apiserver
51+
testEnv = &envtest.Environment{
52+
ErrorIfCRDPathMissing: true,
53+
CRDDirectoryPaths: CRDPaths,
54+
}
55+
cfg, err = testEnv.Start()
56+
gomega.Expect(err).ToNot(gomega.HaveOccurred())
57+
gomega.Expect(cfg).ToNot(gomega.BeNil())
58+
59+
err = workapiv1.Install(scheme.Scheme)
60+
gomega.Expect(err).NotTo(gomega.HaveOccurred())
61+
62+
err = features.HubMutableFeatureGate.Add(ocmfeature.DefaultHubRegistrationFeatureGates)
63+
gomega.Expect(err).ToNot(gomega.HaveOccurred())
64+
65+
err = features.HubMutableFeatureGate.Add(ocmfeature.DefaultHubWorkFeatureGates)
66+
gomega.Expect(err).ToNot(gomega.HaveOccurred())
67+
68+
err = features.HubMutableFeatureGate.Add(ocmfeature.DefaultHubAddonManagerFeatureGates)
69+
gomega.Expect(err).ToNot(gomega.HaveOccurred())
70+
71+
// enable ManagedClusterAutoApproval feature gate
72+
err = features.HubMutableFeatureGate.Set("ManagedClusterAutoApproval=true")
73+
gomega.Expect(err).NotTo(gomega.HaveOccurred())
74+
75+
// enable resourceCleanup feature gate
76+
err = features.HubMutableFeatureGate.Set("ResourceCleanup=true")
77+
gomega.Expect(err).NotTo(gomega.HaveOccurred())
78+
79+
err = features.HubMutableFeatureGate.Set("ManifestWorkReplicaSet=true")
80+
gomega.Expect(err).NotTo(gomega.HaveOccurred())
81+
})
82+
83+
var _ = ginkgo.AfterSuite(func() {
84+
ginkgo.By("tearing down the test environment")
85+
86+
err := testEnv.Stop()
87+
gomega.Expect(err).ToNot(gomega.HaveOccurred())
88+
})
89+
90+
var _ = ginkgo.Describe("start hub manager", func() {
91+
ginkgo.It("start hub manager", func() {
92+
ctx, stopHub := context.WithCancel(context.Background())
93+
opts := NewHubOption()
94+
opts.workOption.WorkDriver = "kube"
95+
opts.workOption.WorkDriverConfig = sourceConfigFileName
96+
opts.registrationOption.ClusterAutoApprovalUsers = []string{util.AutoApprovalBootstrapUser}
97+
98+
// start hub controller
99+
go func() {
100+
err := opts.RunManager(ctx, &controllercmd.ControllerContext{
101+
KubeConfig: cfg,
102+
EventRecorder: util.NewIntegrationTestEventRecorder("hub"),
103+
})
104+
gomega.Expect(err).NotTo(gomega.HaveOccurred())
105+
}()
106+
107+
time.Sleep(5 * time.Second)
108+
stopHub()
109+
})
110+
})

0 commit comments

Comments
 (0)