Skip to content

Commit be30ca9

Browse files
committed
Build a single hub manager command
Signed-off-by: Jian Qiu <[email protected]>
1 parent 4cb6e38 commit be30ca9

File tree

4 files changed

+216
-53
lines changed

4 files changed

+216
-53
lines changed

cmd/registration-operator/main.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ func newNucleusCommand() *cobra.Command {
4747
}
4848

4949
cmd.AddCommand(hub.NewHubOperatorCmd())
50+
cmd.AddCommand(hub.NewHubManagerCmd())
5051
cmd.AddCommand(spoke.NewKlusterletOperatorCmd())
5152
cmd.AddCommand(spoke.NewKlusterletAgentCmd())
5253

pkg/cmd/hub/operator.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88

99
commonoptions "open-cluster-management.io/ocm/pkg/common/options"
1010
"open-cluster-management.io/ocm/pkg/operator/operators/clustermanager"
11+
"open-cluster-management.io/ocm/pkg/singleton/hub"
1112
"open-cluster-management.io/ocm/pkg/version"
1213
)
1314

@@ -33,3 +34,16 @@ func NewHubOperatorCmd() *cobra.Command {
3334
opts.AddFlags(flags)
3435
return cmd
3536
}
37+
38+
func NewHubManagerCmd() *cobra.Command {
39+
opts := hub.NewHubOption()
40+
commonOpts := opts.CommonOption
41+
cmd := commonOpts.NewControllerCommandConfig("hub-manager", version.Get(), opts.RunManager, clock.RealClock{}).
42+
NewCommandWithContext(context.TODO())
43+
cmd.Use = "hub-manager"
44+
cmd.Short = "Start the hub manager"
45+
46+
flags := cmd.Flags()
47+
opts.AddFlags(flags)
48+
return cmd
49+
}

pkg/singleton/hub/manager.go

Lines changed: 163 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,163 @@
1+
package hub
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/openshift/library-go/pkg/controller/controllercmd"
8+
"github.com/spf13/pflag"
9+
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
10+
"k8s.io/client-go/dynamic"
11+
"k8s.io/client-go/dynamic/dynamicinformer"
12+
kubeinformers "k8s.io/client-go/informers"
13+
"k8s.io/client-go/kubernetes"
14+
"k8s.io/client-go/metadata"
15+
"k8s.io/client-go/rest"
16+
"k8s.io/klog/v2"
17+
cpclientset "sigs.k8s.io/cluster-inventory-api/client/clientset/versioned"
18+
cpinformerv1alpha1 "sigs.k8s.io/cluster-inventory-api/client/informers/externalversions"
19+
20+
addonclient "open-cluster-management.io/api/client/addon/clientset/versioned"
21+
addoninformers "open-cluster-management.io/api/client/addon/informers/externalversions"
22+
clusterv1client "open-cluster-management.io/api/client/cluster/clientset/versioned"
23+
clusterv1informers "open-cluster-management.io/api/client/cluster/informers/externalversions"
24+
workv1client "open-cluster-management.io/api/client/work/clientset/versioned"
25+
workv1informers "open-cluster-management.io/api/client/work/informers/externalversions"
26+
clusterv1 "open-cluster-management.io/api/cluster/v1"
27+
ocmfeature "open-cluster-management.io/api/feature"
28+
29+
"open-cluster-management.io/ocm/pkg/addon"
30+
commonoptions "open-cluster-management.io/ocm/pkg/common/options"
31+
"open-cluster-management.io/ocm/pkg/features"
32+
placementcontrollers "open-cluster-management.io/ocm/pkg/placement/controllers"
33+
registrationhub "open-cluster-management.io/ocm/pkg/registration/hub"
34+
workhub "open-cluster-management.io/ocm/pkg/work/hub"
35+
)
36+
37+
type HubOption struct {
38+
CommonOption *commonoptions.Options
39+
registrationOption *registrationhub.HubManagerOptions
40+
workOption *workhub.WorkHubManagerOptions
41+
}
42+
43+
func NewHubOption() *HubOption {
44+
return &HubOption{
45+
CommonOption: commonoptions.NewOptions(),
46+
registrationOption: registrationhub.NewHubManagerOptions(),
47+
workOption: workhub.NewWorkHubManagerOptions(),
48+
}
49+
}
50+
51+
func (o *HubOption) AddFlags(fs *pflag.FlagSet) {
52+
o.CommonOption.AddFlags(fs)
53+
o.registrationOption.AddFlags(fs)
54+
o.workOption.AddFlags(fs)
55+
}
56+
57+
func (o *HubOption) RunManager(ctx context.Context, controllerContext *controllercmd.ControllerContext) error {
58+
kubeClient, err := kubernetes.NewForConfig(controllerContext.KubeConfig)
59+
if err != nil {
60+
return err
61+
}
62+
63+
dynamicClient, err := dynamic.NewForConfig(controllerContext.KubeConfig)
64+
if err != nil {
65+
return err
66+
}
67+
68+
// copy a separate config for gc controller and increase the gc controller's throughput.
69+
metadataKubeConfig := rest.CopyConfig(controllerContext.KubeConfig)
70+
metadataKubeConfig.QPS = controllerContext.KubeConfig.QPS * 2
71+
metadataKubeConfig.Burst = controllerContext.KubeConfig.Burst * 2
72+
metadataClient, err := metadata.NewForConfig(metadataKubeConfig)
73+
if err != nil {
74+
return err
75+
}
76+
77+
clusterClient, err := clusterv1client.NewForConfig(controllerContext.KubeConfig)
78+
if err != nil {
79+
return err
80+
}
81+
82+
clusterProfileClient, err := cpclientset.NewForConfig(controllerContext.KubeConfig)
83+
if err != nil {
84+
return err
85+
}
86+
87+
workClient, err := workv1client.NewForConfig(controllerContext.KubeConfig)
88+
if err != nil {
89+
return err
90+
}
91+
92+
addOnClient, err := addonclient.NewForConfig(controllerContext.KubeConfig)
93+
if err != nil {
94+
return err
95+
}
96+
97+
clusterInformers := clusterv1informers.NewSharedInformerFactory(clusterClient, 30*time.Minute)
98+
clusterProfileInformers := cpinformerv1alpha1.NewSharedInformerFactory(clusterProfileClient, 30*time.Minute)
99+
workInformers := workv1informers.NewSharedInformerFactory(workClient, 30*time.Minute)
100+
kubeInfomers := kubeinformers.NewSharedInformerFactoryWithOptions(kubeClient, 30*time.Minute, kubeinformers.WithTweakListOptions(
101+
func(listOptions *metav1.ListOptions) {
102+
// Note all kube resources managed by registration should have the cluster label, and should not have
103+
// the addon label.
104+
selector := &metav1.LabelSelector{
105+
MatchExpressions: []metav1.LabelSelectorRequirement{
106+
{
107+
Key: clusterv1.ClusterNameLabelKey,
108+
Operator: metav1.LabelSelectorOpExists,
109+
},
110+
},
111+
}
112+
listOptions.LabelSelector = metav1.FormatLabelSelector(selector)
113+
}))
114+
addOnInformers := addoninformers.NewSharedInformerFactory(addOnClient, 30*time.Minute)
115+
dynamicInformers := dynamicinformer.NewDynamicSharedInformerFactory(dynamicClient, 30*time.Minute)
116+
117+
// start registration controller
118+
go func() {
119+
err := o.registrationOption.RunControllerManagerWithInformers(
120+
ctx, controllerContext,
121+
kubeClient, metadataClient, clusterClient, clusterProfileClient, addOnClient,
122+
kubeInfomers, clusterInformers, clusterProfileInformers, workInformers, addOnInformers)
123+
if err != nil {
124+
klog.Fatal(err)
125+
}
126+
}()
127+
128+
// start placement controller
129+
go func() {
130+
err := placementcontrollers.RunControllerManagerWithInformers(
131+
ctx, controllerContext, kubeClient, clusterClient, clusterInformers)
132+
if err != nil {
133+
klog.Fatal(err)
134+
}
135+
}()
136+
137+
// start addon controller
138+
if features.HubMutableFeatureGate.Enabled(ocmfeature.AddonManagement) {
139+
go func() {
140+
err := addon.RunControllerManagerWithInformers(
141+
ctx, controllerContext, kubeClient, addOnClient, workClient,
142+
clusterInformers, addOnInformers, workInformers, dynamicInformers)
143+
if err != nil {
144+
klog.Fatal(err)
145+
}
146+
}()
147+
}
148+
149+
// start work controller
150+
if features.HubMutableFeatureGate.Enabled(ocmfeature.ManifestWorkReplicaSet) {
151+
go func() {
152+
hubConfig := workhub.NewWorkHubManagerConfig(o.workOption)
153+
err := hubConfig.RunControllerManagerWithInformers(
154+
ctx, controllerContext, workClient, workInformers, workInformers, clusterInformers)
155+
if err != nil {
156+
klog.Fatal(err)
157+
}
158+
}()
159+
}
160+
161+
<-ctx.Done()
162+
return nil
163+
}

pkg/work/hub/manager.go

Lines changed: 38 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -6,13 +6,12 @@ import (
66

77
"github.com/openshift/library-go/pkg/controller/controllercmd"
88
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
9-
"k8s.io/client-go/tools/clientcmd"
109

1110
clusterclientset "open-cluster-management.io/api/client/cluster/clientset/versioned"
1211
clusterinformers "open-cluster-management.io/api/client/cluster/informers/externalversions"
1312
workclientset "open-cluster-management.io/api/client/work/clientset/versioned"
1413
workinformers "open-cluster-management.io/api/client/work/informers/externalversions"
15-
workv1informer "open-cluster-management.io/api/client/work/informers/externalversions/work/v1"
14+
workv1informers "open-cluster-management.io/api/client/work/informers/externalversions/work/v1"
1615
workapplier "open-cluster-management.io/sdk-go/pkg/apis/work/v1/applier"
1716
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/options"
1817
"open-cluster-management.io/sdk-go/pkg/cloudevents/clients/work"
@@ -47,7 +46,7 @@ func (c *WorkHubManagerConfig) RunWorkHubManager(ctx context.Context, controller
4746
clusterInformerFactory := clusterinformers.NewSharedInformerFactory(hubClusterClient, 30*time.Minute)
4847

4948
// build a hub work client for ManifestWorkReplicaSets
50-
replicaSetsClient, err := workclientset.NewForConfig(controllerContext.KubeConfig)
49+
workClient, err := workclientset.NewForConfig(controllerContext.KubeConfig)
5150
if err != nil {
5251
return err
5352
}
@@ -68,28 +67,38 @@ func (c *WorkHubManagerConfig) RunWorkHubManager(ctx context.Context, controller
6867
},
6968
)
7069

71-
var workClient workclientset.Interface
72-
var watcherStore *store.SourceInformerWatcherStore
70+
workInformers := workinformers.NewSharedInformerFactoryWithOptions(workClient, 30*time.Minute, workInformOption)
71+
replicaSetInformers := workinformers.NewSharedInformerFactory(workClient, 30*time.Minute)
7372

74-
if c.workOptions.WorkDriver == "kube" {
75-
config := controllerContext.KubeConfig
76-
if c.workOptions.WorkDriverConfig != "" {
77-
config, err = clientcmd.BuildConfigFromFlags("", c.workOptions.WorkDriverConfig)
78-
if err != nil {
79-
return err
80-
}
81-
}
73+
return c.RunControllerManagerWithInformers(
74+
ctx,
75+
controllerContext,
76+
workClient,
77+
replicaSetInformers,
78+
workInformers,
79+
clusterInformerFactory,
80+
)
81+
}
8282

83-
workClient, err = workclientset.NewForConfig(config)
84-
if err != nil {
85-
return err
86-
}
83+
func (c *WorkHubManagerConfig) RunControllerManagerWithInformers(
84+
ctx context.Context,
85+
controllerContext *controllercmd.ControllerContext,
86+
workClient workclientset.Interface,
87+
replicaSetInformers workinformers.SharedInformerFactory,
88+
workInformers workinformers.SharedInformerFactory,
89+
clusterInformers clusterinformers.SharedInformerFactory,
90+
) error {
91+
var manifestWorkClient workclientset.Interface
92+
var manifestWorkInformer workv1informers.ManifestWorkInformer
93+
94+
if c.workOptions.WorkDriver == "kube" {
95+
manifestWorkClient = workClient
96+
manifestWorkInformer = workInformers.Work().V1().ManifestWorks()
8797
} else {
8898
// For cloudevents drivers, we build ManifestWork client that implements the
8999
// ManifestWorkInterface and ManifestWork informer based on different driver configuration.
90100
// Refer to Event Based Manifestwork proposal in enhancements repo to get more details.
91-
92-
watcherStore = store.NewSourceInformerWatcherStore(ctx)
101+
watcherStore := store.NewSourceInformerWatcherStore(ctx)
93102

94103
_, config, err := generic.NewConfigLoader(c.workOptions.WorkDriver, c.workOptions.WorkDriverConfig).
95104
LoadConfig()
@@ -106,52 +115,28 @@ func (c *WorkHubManagerConfig) RunWorkHubManager(ctx context.Context, controller
106115
return err
107116
}
108117

109-
workClient = clientHolder.WorkInterface()
110-
}
118+
manifestWorkClient = clientHolder.WorkInterface()
111119

112-
factory := workinformers.NewSharedInformerFactoryWithOptions(workClient, 30*time.Minute, workInformOption)
113-
informer := factory.Work().V1().ManifestWorks()
114-
115-
// For cloudevents work client, we use the informer store as the client store
116-
if watcherStore != nil {
117-
watcherStore.SetInformer(informer.Informer())
120+
ceInformers := workinformers.NewSharedInformerFactoryWithOptions(workClient, 30*time.Minute)
121+
manifestWorkInformer = ceInformers.Work().V1().ManifestWorks()
122+
watcherStore.SetInformer(manifestWorkInformer.Informer())
118123
}
119124

120-
return RunControllerManagerWithInformers(
121-
ctx,
122-
controllerContext,
123-
replicaSetsClient,
124-
workClient,
125-
informer,
126-
clusterInformerFactory,
127-
)
128-
}
129-
130-
func RunControllerManagerWithInformers(
131-
ctx context.Context,
132-
controllerContext *controllercmd.ControllerContext,
133-
replicaSetClient workclientset.Interface,
134-
workClient workclientset.Interface,
135-
workInformer workv1informer.ManifestWorkInformer,
136-
clusterInformers clusterinformers.SharedInformerFactory,
137-
) error {
138-
replicaSetInformerFactory := workinformers.NewSharedInformerFactory(replicaSetClient, 30*time.Minute)
139-
140125
manifestWorkReplicaSetController := manifestworkreplicasetcontroller.NewManifestWorkReplicaSetController(
141126
controllerContext.EventRecorder,
142-
replicaSetClient,
143-
workapplier.NewWorkApplierWithTypedClient(workClient, workInformer.Lister()),
144-
replicaSetInformerFactory.Work().V1alpha1().ManifestWorkReplicaSets(),
145-
workInformer,
127+
workClient,
128+
workapplier.NewWorkApplierWithTypedClient(manifestWorkClient, manifestWorkInformer.Lister()),
129+
replicaSetInformers.Work().V1alpha1().ManifestWorkReplicaSets(),
130+
manifestWorkInformer,
146131
clusterInformers.Cluster().V1beta1().Placements(),
147132
clusterInformers.Cluster().V1beta1().PlacementDecisions(),
148133
)
149134

150135
go clusterInformers.Start(ctx.Done())
151-
go replicaSetInformerFactory.Start(ctx.Done())
136+
go replicaSetInformers.Start(ctx.Done())
152137
go manifestWorkReplicaSetController.Run(ctx, 5)
153138

154-
go workInformer.Informer().Run(ctx.Done())
139+
go manifestWorkInformer.Informer().Run(ctx.Done())
155140

156141
<-ctx.Done()
157142
return nil

0 commit comments

Comments
 (0)