Skip to content

Commit 530d15f

Browse files
Merge pull request #2874 from ncdc/committer/workload/namespace
🌱 Switch workload/namespace controller to use committer
2 parents 5e384c8 + 35e42f5 commit 530d15f

9 files changed

+206
-376
lines changed

pkg/reconciler/workload/namespace/namespace_controller.go

Lines changed: 42 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -18,35 +18,31 @@ package namespace
1818

1919
import (
2020
"context"
21-
"encoding/json"
2221
"fmt"
2322
"time"
2423

25-
jsonpatch "github.com/evanphx/json-patch"
2624
kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache"
2725
kcpcorev1informers "github.com/kcp-dev/client-go/informers/core/v1"
2826
kcpkubernetesclientset "github.com/kcp-dev/client-go/kubernetes"
29-
corev1listers "github.com/kcp-dev/client-go/listers/core/v1"
3027
"github.com/kcp-dev/logicalcluster/v3"
3128

3229
corev1 "k8s.io/api/core/v1"
33-
"k8s.io/apimachinery/pkg/api/equality"
3430
"k8s.io/apimachinery/pkg/api/errors"
35-
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
3631
"k8s.io/apimachinery/pkg/labels"
37-
"k8s.io/apimachinery/pkg/types"
32+
utilerrors "k8s.io/apimachinery/pkg/util/errors"
3833
"k8s.io/apimachinery/pkg/util/runtime"
3934
"k8s.io/apimachinery/pkg/util/wait"
35+
corev1client "k8s.io/client-go/kubernetes/typed/core/v1"
4036
"k8s.io/client-go/tools/cache"
4137
"k8s.io/client-go/util/workqueue"
4238
"k8s.io/klog/v2"
4339
"k8s.io/kube-openapi/pkg/util/sets"
4440

4541
"github.com/kcp-dev/kcp/pkg/logging"
4642
"github.com/kcp-dev/kcp/pkg/reconciler/apis/apiexport"
43+
"github.com/kcp-dev/kcp/pkg/reconciler/committer"
4744
schedulingv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/scheduling/v1alpha1"
4845
schedulingv1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/scheduling/v1alpha1"
49-
schedulingv1alpha1listers "github.com/kcp-dev/kcp/sdk/client/listers/scheduling/v1alpha1"
5046
)
5147

5248
const (
@@ -64,21 +60,20 @@ func NewController(
6460

6561
c := &controller{
6662
queue: queue,
67-
enqueueAfter: func(ns *corev1.Namespace, duration time.Duration) {
68-
key, err := kcpcache.MetaClusterNamespaceKeyFunc(ns)
69-
if err != nil {
70-
runtime.HandleError(err)
71-
return
72-
}
73-
queue.AddAfter(key, duration)
74-
},
7563

7664
kubeClusterClient: kubeClusterClient,
7765

78-
namespaceLister: namespaceInformer.Lister(),
79-
80-
placementLister: placementInformer.Lister(),
81-
placementIndexer: placementInformer.Informer().GetIndexer(),
66+
listNamespaces: func(clusterName logicalcluster.Name) ([]*corev1.Namespace, error) {
67+
return namespaceInformer.Cluster(clusterName).Lister().List(labels.Everything())
68+
},
69+
getNamespace: func(clusterName logicalcluster.Name, name string) (*corev1.Namespace, error) {
70+
return namespaceInformer.Cluster(clusterName).Lister().Get(name)
71+
},
72+
listPlacements: func(clusterName logicalcluster.Name) ([]*schedulingv1alpha1.Placement, error) {
73+
return placementInformer.Cluster(clusterName).Lister().List(labels.Everything())
74+
},
75+
commit: committer.NewCommitter[*Namespace, Patcher, *NamespaceSpec, *NamespaceStatus](kubeClusterClient.CoreV1().Namespaces()),
76+
now: time.Now,
8277
}
8378

8479
// namespaceBlocklist holds a set of namespaces that should never be synced from kcp to physical clusters.
@@ -112,17 +107,24 @@ func NewController(
112107

113108
// controller.
114109
type controller struct {
115-
queue workqueue.RateLimitingInterface
116-
enqueueAfter func(*corev1.Namespace, time.Duration)
110+
queue workqueue.RateLimitingInterface
117111

118112
kubeClusterClient kcpkubernetesclientset.ClusterInterface
119113

120-
namespaceLister corev1listers.NamespaceClusterLister
121-
122-
placementLister schedulingv1alpha1listers.PlacementClusterLister
123-
placementIndexer cache.Indexer
114+
listNamespaces func(clusterName logicalcluster.Name) ([]*corev1.Namespace, error)
115+
getNamespace func(clusterName logicalcluster.Name, name string) (*corev1.Namespace, error)
116+
listPlacements func(clusterName logicalcluster.Name) ([]*schedulingv1alpha1.Placement, error)
117+
commit CommitFunc
118+
now func() time.Time
124119
}
125120

121+
type Namespace = corev1.Namespace
122+
type NamespaceSpec = corev1.NamespaceSpec
123+
type NamespaceStatus = corev1.NamespaceStatus
124+
type Patcher = corev1client.NamespaceInterface
125+
type Resource = committer.Resource[*NamespaceSpec, *NamespaceStatus]
126+
type CommitFunc = func(ctx context.Context, original, updated *Resource) error
127+
126128
func (c *controller) enqueueNamespace(obj interface{}) {
127129
key, err := kcpcache.DeletionHandlingMetaClusterNamespaceKeyFunc(obj)
128130
if err != nil {
@@ -147,14 +149,14 @@ func (c *controller) enqueuePlacement(obj interface{}) {
147149
return
148150
}
149151

150-
nss, err := c.namespaceLister.Cluster(clusterName).List(labels.Everything())
152+
namespaces, err := c.listNamespaces(clusterName)
151153
if err != nil {
152154
runtime.HandleError(err)
153155
return
154156
}
155157

156158
logger := logging.WithObject(logging.WithReconciler(klog.Background(), ControllerName), obj.(*schedulingv1alpha1.Placement))
157-
for _, ns := range nss {
159+
for _, ns := range namespaces {
158160
logger = logging.WithObject(logger, ns)
159161

160162
nsKey, err := kcpcache.MetaClusterNamespaceKeyFunc(ns)
@@ -222,55 +224,29 @@ func (c *controller) process(ctx context.Context, key string) error {
222224
return nil
223225
}
224226

225-
obj, err := c.namespaceLister.Cluster(clusterName).Get(name)
227+
ns, err := c.getNamespace(clusterName, name)
226228
if err != nil {
227229
if errors.IsNotFound(err) {
228230
return nil // object deleted before we handled it
229231
}
230232
return err
231233
}
232-
old := obj
233-
obj = obj.DeepCopy()
234+
old := ns
235+
ns = ns.DeepCopy()
234236

235-
logger = logging.WithObject(logger, obj)
237+
logger = logging.WithObject(logger, ns)
236238
ctx = klog.NewContext(ctx, logger)
237239

238-
reconcileErr := c.reconcile(ctx, obj)
239-
240-
// If the object being reconciled changed as a result, update it.
241-
if !equality.Semantic.DeepEqual(old.Status, obj.Status) {
242-
oldData, err := json.Marshal(corev1.Namespace{
243-
Status: old.Status,
244-
})
245-
if err != nil {
246-
return fmt.Errorf("failed to Marshal old data for placement %s|%s: %w", clusterName, name, err)
247-
}
248-
249-
newData, err := json.Marshal(corev1.Namespace{
250-
ObjectMeta: metav1.ObjectMeta{
251-
UID: old.UID,
252-
ResourceVersion: old.ResourceVersion,
253-
}, // to ensure they appear in the patch as preconditions
254-
Status: obj.Status,
255-
})
256-
if err != nil {
257-
return fmt.Errorf("failed to Marshal new data for LocationDomain %s|%s: %w", clusterName, name, err)
258-
}
259-
260-
patchBytes, err := jsonpatch.CreateMergePatch(oldData, newData)
261-
if err != nil {
262-
return fmt.Errorf("failed to create patch for LocationDomain %s|%s: %w", clusterName, name, err)
263-
}
264-
logger.WithValues("patch", string(patchBytes)).V(2).Info("patching Namespace")
265-
_, uerr := c.kubeClusterClient.Cluster(clusterName.Path()).CoreV1().Namespaces().Patch(ctx, obj.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{}, "status")
266-
return uerr
240+
var errs []error
241+
if err := c.reconcile(ctx, key, ns); err != nil {
242+
errs = append(errs, err)
267243
}
268244

269-
return reconcileErr
270-
}
245+
oldResource := &Resource{ObjectMeta: old.ObjectMeta, Spec: &old.Spec, Status: &old.Status}
246+
newResource := &Resource{ObjectMeta: ns.ObjectMeta, Spec: &ns.Spec, Status: &ns.Status}
247+
if err := c.commit(ctx, oldResource, newResource); err != nil {
248+
errs = append(errs, err)
249+
}
271250

272-
func (c *controller) patchNamespace(ctx context.Context, clusterName logicalcluster.Path, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (*corev1.Namespace, error) {
273-
logger := klog.FromContext(ctx)
274-
logger.WithValues("patch", string(data)).V(2).Info("patching Namespace")
275-
return c.kubeClusterClient.Cluster(clusterName).CoreV1().Namespaces().Patch(ctx, name, pt, data, opts, subresources...)
251+
return utilerrors.NewAggregate(errs)
276252
}

pkg/reconciler/workload/namespace/namespace_reconcile.go

Lines changed: 19 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -20,60 +20,37 @@ import (
2020
"context"
2121
"time"
2222

23-
"github.com/kcp-dev/logicalcluster/v3"
24-
2523
corev1 "k8s.io/api/core/v1"
26-
"k8s.io/apimachinery/pkg/labels"
27-
utilserrors "k8s.io/apimachinery/pkg/util/errors"
28-
29-
schedulingv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/scheduling/v1alpha1"
30-
)
31-
32-
type reconcileStatus int
33-
34-
const (
35-
reconcileStatusStop reconcileStatus = iota
36-
reconcileStatusContinue
3724
)
3825

39-
type reconciler interface {
40-
reconcile(ctx context.Context, ns *corev1.Namespace) (reconcileStatus, *corev1.Namespace, error)
26+
type reconcileResult struct {
27+
stop bool
28+
requeueAfter time.Duration
4129
}
4230

43-
func (c *controller) reconcile(ctx context.Context, ns *corev1.Namespace) error {
44-
reconcilers := []reconciler{
45-
&bindNamespaceReconciler{
46-
listPlacement: c.listPlacement,
47-
patchNamespace: c.patchNamespace,
48-
},
49-
&placementSchedulingReconciler{
50-
listPlacement: c.listPlacement,
51-
enqueueAfter: c.enqueueAfter,
52-
patchNamespace: c.patchNamespace,
53-
now: time.Now,
54-
},
55-
&statusConditionReconciler{
56-
patchNamespace: c.patchNamespace,
57-
},
58-
}
31+
type reconcileFunc func(ctx context.Context, key string, ns *corev1.Namespace) (reconcileResult, error)
5932

60-
var errs []error
33+
func (c *controller) reconcile(ctx context.Context, key string, ns *corev1.Namespace) error {
34+
reconcilers := []reconcileFunc{
35+
c.reconcilePlacementBind,
36+
c.reconcileScheduling,
37+
c.reconcileStatus,
38+
}
6139

6240
for _, r := range reconcilers {
63-
var err error
64-
var status reconcileStatus
65-
status, ns, err = r.reconcile(ctx, ns)
41+
result, err := r(ctx, key, ns)
6642
if err != nil {
67-
errs = append(errs, err)
43+
return err
6844
}
69-
if status == reconcileStatusStop {
45+
46+
if result.stop {
7047
break
7148
}
72-
}
7349

74-
return utilserrors.NewAggregate(errs)
75-
}
50+
if result.requeueAfter > 0 {
51+
c.queue.AddAfter(key, result.requeueAfter)
52+
}
53+
}
7654

77-
func (c *controller) listPlacement(clusterName logicalcluster.Name) ([]*schedulingv1alpha1.Placement, error) {
78-
return c.placementLister.Cluster(clusterName).List(labels.Everything())
55+
return nil
7956
}

pkg/reconciler/workload/namespace/namespace_reconcile_placementbind.go

Lines changed: 25 additions & 44 deletions
Original file line numberDiff line numberDiff line change
@@ -18,73 +18,54 @@ package namespace
1818

1919
import (
2020
"context"
21-
"encoding/json"
2221

2322
"github.com/kcp-dev/logicalcluster/v3"
2423

2524
corev1 "k8s.io/api/core/v1"
2625
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
27-
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
2826
"k8s.io/apimachinery/pkg/labels"
29-
"k8s.io/apimachinery/pkg/types"
30-
"k8s.io/klog/v2"
3127

3228
schedulingv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/scheduling/v1alpha1"
3329
"github.com/kcp-dev/kcp/sdk/apis/third_party/conditions/util/conditions"
3430
)
3531

36-
// bindNamespaceReconciler updates the existing annotation and creates an empty one if
37-
// at least one placement matches and there is no annotation. It delete the annotation
32+
// reconcilePlacementBind updates the existing scheduling.kcp.io/placement annotation and creates an
33+
// empty one if at least one placement matches and there is no annotation. It deletes the annotation
3834
// if there is no matched placement.
35+
//
3936
// TODO this should be reconsidered when we want lazy binding.
40-
type bindNamespaceReconciler struct {
41-
listPlacement func(clusterName logicalcluster.Name) ([]*schedulingv1alpha1.Placement, error)
42-
43-
patchNamespace func(ctx context.Context, clusterName logicalcluster.Path, name string, pt types.PatchType, data []byte, opts metav1.PatchOptions, subresources ...string) (*corev1.Namespace, error)
44-
}
45-
46-
func (r *bindNamespaceReconciler) reconcile(ctx context.Context, ns *corev1.Namespace) (reconcileStatus, *corev1.Namespace, error) {
47-
logger := klog.FromContext(ctx)
37+
func (c *controller) reconcilePlacementBind(
38+
_ context.Context,
39+
_ string,
40+
ns *corev1.Namespace,
41+
) (reconcileResult, error) {
4842
clusterName := logicalcluster.From(ns)
4943

50-
_, foundPlacement := ns.Annotations[schedulingv1alpha1.PlacementAnnotationKey]
51-
52-
validPlacements, err := r.validPlacements(clusterName, ns)
44+
validPlacements, err := c.validPlacements(clusterName, ns)
5345
if err != nil {
54-
return reconcileStatusContinue, ns, err
46+
return reconcileResult{stop: true}, err
5547
}
5648

57-
expectedAnnotations := map[string]interface{}{} // nil means to remove the key
58-
if len(validPlacements) > 0 && !foundPlacement {
59-
expectedAnnotations[schedulingv1alpha1.PlacementAnnotationKey] = ""
60-
} else if len(validPlacements) == 0 && foundPlacement {
61-
expectedAnnotations[schedulingv1alpha1.PlacementAnnotationKey] = nil
62-
}
63-
64-
if len(expectedAnnotations) == 0 {
65-
return reconcileStatusContinue, ns, nil
66-
}
49+
_, hasPlacement := ns.Annotations[schedulingv1alpha1.PlacementAnnotationKey]
50+
shouldHavePlacement := len(validPlacements) > 0
6751

68-
patch := map[string]interface{}{}
69-
if err := unstructured.SetNestedField(patch, expectedAnnotations, "metadata", "annotations"); err != nil {
70-
return reconcileStatusStop, ns, err
71-
}
72-
73-
patchBytes, err := json.Marshal(patch)
74-
if err != nil {
75-
return reconcileStatusStop, ns, err
76-
}
77-
logger.WithValues("patch", string(patchBytes)).V(3).Info("patching Namespace to update placement annotation")
78-
updated, err := r.patchNamespace(ctx, clusterName.Path(), ns.Name, types.MergePatchType, patchBytes, metav1.PatchOptions{})
79-
if err != nil {
80-
return reconcileStatusStop, ns, err
52+
switch {
53+
case shouldHavePlacement && hasPlacement, !shouldHavePlacement && !hasPlacement:
54+
return reconcileResult{}, nil
55+
case shouldHavePlacement && !hasPlacement:
56+
if ns.Annotations == nil {
57+
ns.Annotations = make(map[string]string)
58+
}
59+
ns.Annotations[schedulingv1alpha1.PlacementAnnotationKey] = ""
60+
case !shouldHavePlacement && hasPlacement:
61+
delete(ns.Annotations, schedulingv1alpha1.PlacementAnnotationKey)
8162
}
8263

83-
return reconcileStatusContinue, updated, nil
64+
return reconcileResult{stop: true}, nil
8465
}
8566

86-
func (r *bindNamespaceReconciler) validPlacements(clusterName logicalcluster.Name, ns *corev1.Namespace) ([]*schedulingv1alpha1.Placement, error) {
87-
placements, err := r.listPlacement(clusterName)
67+
func (c *controller) validPlacements(clusterName logicalcluster.Name, ns *corev1.Namespace) ([]*schedulingv1alpha1.Placement, error) {
68+
placements, err := c.listPlacements(clusterName)
8869

8970
if err != nil {
9071
return nil, err

0 commit comments

Comments
 (0)