Skip to content

Commit

Permalink
Merge pull request #2645 from justinsb/opt_in_to_new_controller
Browse files Browse the repository at this point in the history
feat: allow users to opt-in to direct reconciliation
  • Loading branch information
google-oss-prow[bot] authored Sep 10, 2024
2 parents e78019c + 2db88e0 commit c55a528
Show file tree
Hide file tree
Showing 12 changed files with 1,242 additions and 50 deletions.
12 changes: 9 additions & 3 deletions pkg/controller/dcl/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import (
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/jitter"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/lifecyclehandler"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/metrics"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/predicate"
kccpredicate "github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/predicate"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/ratelimiter"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/resourceactuation"
"github.com/GoogleCloudPlatform/k8s-config-connector/pkg/controller/resourcewatcher"
Expand Down Expand Up @@ -64,6 +64,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/handler"
"sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
Expand Down Expand Up @@ -98,10 +99,15 @@ type Reconciler struct {
}

func Add(mgr manager.Manager, crd *apiextensions.CustomResourceDefinition, converter *conversion.Converter,
dclConfig *mmdcl.Config, serviceMappingLoader *servicemappingloader.ServiceMappingLoader, defaulters []k8s.Defaulter, jitterGenerator jitter.Generator) (k8s.SchemaReferenceUpdater, error) {
dclConfig *mmdcl.Config, serviceMappingLoader *servicemappingloader.ServiceMappingLoader, defaulters []k8s.Defaulter, jitterGenerator jitter.Generator,
additionalPredicate predicate.Predicate) (k8s.SchemaReferenceUpdater, error) {
if jitterGenerator == nil {
return nil, fmt.Errorf("jitter generator not initialized")
}
predicates := []predicate.Predicate{kccpredicate.UnderlyingResourceOutOfSyncPredicate{}}
if additionalPredicate != nil {
predicates = append(predicates, additionalPredicate)
}
kind := crd.Spec.Names.Kind
apiVersion := k8s.GetAPIVersionFromCRD(crd)
controllerName := fmt.Sprintf("%v-controller", strings.ToLower(kind))
Expand All @@ -122,7 +128,7 @@ func Add(mgr manager.Manager, crd *apiextensions.CustomResourceDefinition, conve
Named(controllerName).
WithOptions(controller.Options{MaxConcurrentReconciles: k8s.ControllerMaxConcurrentReconciles, RateLimiter: ratelimiter.NewRateLimiter()}).
WatchesRawSource(&source.Channel{Source: immediateReconcileRequests}, &handler.EnqueueRequestForObject{}).
For(obj, builder.OnlyMetadata, builder.WithPredicates(predicate.UnderlyingResourceOutOfSyncPredicate{})).
For(obj, builder.OnlyMetadata, builder.WithPredicates(predicates...)).
Build(r)
if err != nil {
return nil, fmt.Errorf("error creating new controller: %w", err)
Expand Down
2 changes: 1 addition & 1 deletion pkg/controller/direct/monitoring/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ func (w *visitorWalker) visitAny(path string, v reflect.Value) {
}

switch v.Kind() {
case reflect.Ptr:
case reflect.Ptr, reflect.Interface:
if v.IsNil() {
return
}
Expand Down
9 changes: 2 additions & 7 deletions pkg/controller/direct/registry/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,13 +92,8 @@ func Init(ctx context.Context, config *config.ControllerConfig) error {
}

func RegisterModel(gvk schema.GroupVersionKind, modelFn ModelFactoryFunc) {
if singleton.registrations == nil {
singleton.registrations = make(map[schema.GroupKind]*registration)
}
singleton.registrations[gvk.GroupKind()] = &registration{
gvk: gvk,
factory: modelFn,
}
rg := &predicate.OptInToDirectReconciliation{}
RegisterModelWithReconcileGate(gvk, modelFn, rg)
}

func RegisterModelWithReconcileGate(gvk schema.GroupVersionKind, modelFn ModelFactoryFunc, rg predicate.ReconcileGate) {
Expand Down
9 changes: 7 additions & 2 deletions pkg/controller/direct/sql/sqlinstance_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,11 +43,16 @@ func init() {
registry.RegisterModelWithReconcileGate(krm.SQLInstanceGVK, newSQLInstanceModel, rg)
}

type SQLInstanceReconcileGate struct{}
type SQLInstanceReconcileGate struct {
optIn kccpredicate.OptInToDirectReconciliation
}

var _ kccpredicate.ReconcileGate = &SQLInstanceReconcileGate{}

func (*SQLInstanceReconcileGate) ShouldReconcile(o *unstructured.Unstructured) bool {
func (r *SQLInstanceReconcileGate) ShouldReconcile(o *unstructured.Unstructured) bool {
if r.optIn.ShouldReconcile(o) {
return true
}
obj := &krm.SQLInstance{}
if err := runtime.DefaultUnstructuredConverter.FromUnstructured(o.Object, &obj); err != nil {
return false
Expand Down
33 changes: 33 additions & 0 deletions pkg/controller/predicate/optin.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package predicate

import "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"

// AnnotationKeyAlphaReconciler allows customers to opt-in to using the direct reconciler.
const AnnotationKeyAlphaReconciler = "alpha.cnrm.cloud.google.com/reconciler"

// OptInToDirectReconciliation allows users to opt in to direct reconciliation
// by specifying an AnnotationKeyAlphaReconciler annotation.
type OptInToDirectReconciliation struct {
}

var _ ReconcileGate = &OptInToDirectReconciliation{}

// ShouldReconcile returns true if the reconciler should be used to for the resource.
func (r *OptInToDirectReconciliation) ShouldReconcile(o *unstructured.Unstructured) bool {
v := o.GetAnnotations()[AnnotationKeyAlphaReconciler]
return v == "direct"
}
86 changes: 52 additions & 34 deletions pkg/controller/registration/registration_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/handler"
crlog "sigs.k8s.io/controller-runtime/pkg/log"
"sigs.k8s.io/controller-runtime/pkg/manager"
"sigs.k8s.io/controller-runtime/pkg/predicate"
"sigs.k8s.io/controller-runtime/pkg/reconcile"
"sigs.k8s.io/controller-runtime/pkg/source"
)
Expand Down Expand Up @@ -238,55 +239,72 @@ func registerDefaultController(r *ReconcileRegistration, config *config.Controll
}
}

// register controllers for dcl-based CRDs
if val, ok := crd.Labels[k8s.DCL2CRDLabel]; ok && val == "true" {
su, err := dclcontroller.Add(r.mgr, crd, r.dclConverter, r.dclConfig, r.smLoader, r.defaulters, r.jitterGenerator)
if err != nil {
return nil, fmt.Errorf("error adding dcl controller for %v to a manager: %w", crd.Spec.Names.Kind, err)
hasDirectController := registry.IsDirectByGK(gvk.GroupKind())
hasTerraformController := crd.Labels[crdgeneration.TF2CRDLabel] == "true"
hasDCLController := crd.Labels[k8s.DCL2CRDLabel] == "true"

var useDirectReconcilerPredicate predicate.Predicate
var useLegacyPredicate predicate.Predicate

// If we have a choice of controllers, construct predicates to choose between them
if hasDirectController && (hasTerraformController || hasDCLController) {
reconcileGate := registry.GetReconcileGate(gvk.GroupKind())
if reconcileGate != nil {
// If reconcile gate is enabled for this gvk, generate a controller-runtime predicate that will
// run the direct reconciler only when the reconcile gate returns true.
useDirectReconcilerPredicate = kccpredicate.NewReconcilePredicate(r.mgr.GetClient(), gvk, reconcileGate)
useLegacyPredicate = kccpredicate.NewInverseReconcilePredicate(r.mgr.GetClient(), gvk, reconcileGate)
}
return su, nil
}
// register controllers for tf-based CRDs
if val, ok := crd.Labels[crdgeneration.TF2CRDLabel]; ok && val == "true" {
su, err := tf.Add(r.mgr, crd, r.provider, r.smLoader, r.defaulters, r.jitterGenerator, nil)
if err != nil {
return nil, fmt.Errorf("error adding terraform controller for %v to a manager: %w", crd.Spec.Names.Kind, err)

if !hasTerraformController && !hasDCLController {
// We're always going to use the direct reconciler
useDirectReconcilerPredicate = nil
useLegacyPredicate = nil
}

if (hasTerraformController || hasDCLController) && useDirectReconcilerPredicate == nil {
logger.Error(fmt.Errorf("no predicate where we have multiple controllers"), "skipping direct controller registration", "group", gvk.Group, "version", gvk.Version, "kind", gvk.Kind)
hasDirectController = false
}
return su, nil
}

// register controllers for direct CRDs
if registry.IsDirectByGK(gvk.GroupKind()) {
if hasDirectController {
model, err := registry.GetModel(gvk.GroupKind())
if err != nil {
return nil, err
}
deps := directbase.Deps{
JitterGenerator: r.jitterGenerator,
}
rg := registry.GetReconcileGate(gvk.GroupKind())
if rg != nil {
// If reconcile gate is enabled for this gvk, generate a controller-runtime predicate that will
// run the direct reconciler only when the reconcile gate returns true.
rp := kccpredicate.NewReconcilePredicate(r.mgr.GetClient(), gvk, rg)
deps.ReconcilePredicate = rp
JitterGenerator: r.jitterGenerator,
ReconcilePredicate: useDirectReconcilerPredicate,
}
if err := directbase.AddController(r.mgr, gvk, model, deps); err != nil {
return nil, fmt.Errorf("error adding direct controller for %v to a manager: %w", crd.Spec.Names.Kind, err)
}
if rg != nil {
// If reconcile gate is enabled for this gvk, generate a controller-runtime predicate that will
// run the terraform-based reconciler when the reconcile gate returns false.
irp := kccpredicate.NewInverseReconcilePredicate(r.mgr.GetClient(), gvk, rg)
su, err := tf.Add(r.mgr, crd, r.provider, r.smLoader, r.defaulters, r.jitterGenerator, irp)
if err != nil {
return nil, fmt.Errorf("error adding terraform controller for %v to a manager: %w", crd.Spec.Names.Kind, err)
}
return su, nil
}

// register controllers for dcl-based CRDs
if hasDCLController {
su, err := dclcontroller.Add(r.mgr, crd, r.dclConverter, r.dclConfig, r.smLoader, r.defaulters, r.jitterGenerator, useLegacyPredicate)
if err != nil {
return nil, fmt.Errorf("error adding dcl controller for %v to a manager: %w", crd.Spec.Names.Kind, err)
}
return schemaUpdater, nil
return su, nil
}
logger.Error(fmt.Errorf("unrecognized CRD: %v", crd.Spec.Names.Kind), "skipping controller registration", "group", gvk.Group, "version", gvk.Version, "kind", gvk.Kind)
return nil, nil
// register controllers for tf-based CRDs
if hasTerraformController {
su, err := tf.Add(r.mgr, crd, r.provider, r.smLoader, r.defaulters, r.jitterGenerator, useLegacyPredicate)
if err != nil {
return nil, fmt.Errorf("error adding terraform controller for %v to a manager: %w", crd.Spec.Names.Kind, err)
}
return su, nil
}

if !hasDCLController && !hasTerraformController && !hasDirectController {
logger.Error(fmt.Errorf("unrecognized CRD: %v", crd.Spec.Names.Kind), "skipping controller registration", "group", gvk.Group, "version", gvk.Version, "kind", gvk.Kind)
return nil, nil
}

}
return schemaUpdater, nil
}
Expand Down
6 changes: 3 additions & 3 deletions pkg/controller/tf/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ type Reconciler struct {
resourceWatcherRoutines *semaphore.Weighted // Used to cap number of goroutines watching unready dependencies
}

func Add(mgr manager.Manager, crd *apiextensions.CustomResourceDefinition, provider *tfschema.Provider, smLoader *servicemappingloader.ServiceMappingLoader, defaulters []k8s.Defaulter, jitterGenerator jitter.Generator, irp predicate.Predicate) (k8s.SchemaReferenceUpdater, error) {
func Add(mgr manager.Manager, crd *apiextensions.CustomResourceDefinition, provider *tfschema.Provider, smLoader *servicemappingloader.ServiceMappingLoader, defaulters []k8s.Defaulter, jitterGenerator jitter.Generator, additionalPredicate predicate.Predicate) (k8s.SchemaReferenceUpdater, error) {
kind := crd.Spec.Names.Kind
apiVersion := k8s.GetAPIVersionFromCRD(crd)
controllerName := fmt.Sprintf("%v-controller", strings.ToLower(kind))
Expand All @@ -97,8 +97,8 @@ func Add(mgr manager.Manager, crd *apiextensions.CustomResourceDefinition, provi
},
}
predicateList := []predicate.Predicate{kccpredicate.UnderlyingResourceOutOfSyncPredicate{}}
if irp != nil {
predicateList = append(predicateList, irp)
if additionalPredicate != nil {
predicateList = append(predicateList, additionalPredicate)
}
_, err = builder.
ControllerManagedBy(mgr).
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
apiVersion: dataflow.cnrm.cloud.google.com/v1beta1
kind: DataflowFlexTemplateJob
metadata:
annotations:
alpha.cnrm.cloud.google.com/reconciler: direct
cnrm.cloud.google.com/management-conflict-prevention-policy: none
cnrm.cloud.google.com/on-delete: cancel
cnrm.cloud.google.com/project-id: ${projectId}
finalizers:
- cnrm.cloud.google.com/finalizer
- cnrm.cloud.google.com/deletion-defender
generation: 1
labels:
cnrm-test: "true"
name: dataflowflextemplatejob-${uniqueId}
namespace: ${uniqueId}
spec:
containerSpecGcsPath: gs://dataflow-templates/2022-10-03-00_RC00/flex/File_Format_Conversion
parameters:
inputFileFormat: csv
inputFileSpec: gs://config-connector-samples/dataflowflextemplate/numbertest.csv
outputBucket: gs://storagebucket-${uniqueId}
outputFileFormat: avro
schema: gs://config-connector-samples/dataflowflextemplate/numbers.avsc
region: us-central1
status:
conditions:
- lastTransitionTime: "1970-01-01T00:00:00Z"
message: The resource is up to date
reason: UpToDate
status: "True"
type: Ready
jobId: ${jobID}
observedGeneration: 1
Loading

0 comments on commit c55a528

Please sign in to comment.