Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Disable a single resource multi dependency distribution #5743

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 9 additions & 7 deletions cmd/controller-manager/app/controllermanager.go
Original file line number Diff line number Diff line change
Expand Up @@ -762,20 +762,22 @@ func setupControllers(mgr controllerruntime.Manager, opts *options.Options, stop
ConcurrentClusterPropagationPolicySyncs: opts.ConcurrentClusterPropagationPolicySyncs,
ConcurrentResourceTemplateSyncs: opts.ConcurrentResourceTemplateSyncs,
RateLimiterOptions: opts.RateLimiterOpts,
DisableMultiDependencyDistribution: opts.DisableMultiDependencyDistribution,
}

if err := mgr.Add(resourceDetector); err != nil {
klog.Fatalf("Failed to setup resource detector: %v", err)
}
if features.FeatureGate.Enabled(features.PropagateDeps) {
dependenciesDistributor := &dependenciesdistributor.DependenciesDistributor{
Client: mgr.GetClient(),
DynamicClient: dynamicClientSet,
InformerManager: controlPlaneInformerManager,
ResourceInterpreter: resourceInterpreter,
RESTMapper: mgr.GetRESTMapper(),
EventRecorder: mgr.GetEventRecorderFor("dependencies-distributor"),
RateLimiterOptions: opts.RateLimiterOpts,
Client: mgr.GetClient(),
DynamicClient: dynamicClientSet,
InformerManager: controlPlaneInformerManager,
ResourceInterpreter: resourceInterpreter,
RESTMapper: mgr.GetRESTMapper(),
EventRecorder: mgr.GetEventRecorderFor("dependencies-distributor"),
RateLimiterOptions: opts.RateLimiterOpts,
DisableMultiDependencyDistribution: opts.DisableMultiDependencyDistribution,
}
if err := dependenciesDistributor.SetupWithManager(mgr); err != nil {
klog.Fatalf("Failed to setup dependencies distributor: %v", err)
Expand Down
14 changes: 14 additions & 0 deletions cmd/controller-manager/app/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,6 +143,17 @@ type Options struct {
// in scenario of dynamic replica assignment based on cluster free resources.
// Disable if it does not fit your cases for better performance.
EnableClusterResourceModeling bool

// DisableMultiDependencyDistribution indicates disable the ability to a resource from being depended on by multiple
// resources or being distributed by PropagationPolicy/ClusterPropagationPolicy while being depended on.
//
// Before v1.12, this capability is allowed by default. If you still wish to enable this capability, you can set
// this flag to false. However, you will need to bear some side effects that come with it.
// For example, you can refer to https://github.com/karmada-io/karmada/pull/5717. When the primary resource is deleted,
// it does not consider other resources that currently depend on the resource or any PropagationPolicy associated with it.
//
// It is recommended that you adapt your business accordingly to avoid continued use.
DisableMultiDependencyDistribution bool
Comment on lines +146 to +156
Copy link
Member

@RainbowMango RainbowMango Nov 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would it be better to have a feature gate than a command flag? Something like: DisableDependencySharing.
For example:

  • release-1.12 starts from alpha, which is disabled by default, for backward compatibility concerns.
  • release-1.13 move to beta, which is enabled by default, and allows people to disable it as an escape hatch
  • release-1.14 move to stable, enabled by default, and don't allow to disable it.

@XiShanYongYe-Chang @chaunceyjiang What do you think?

}

// NewOptions builds an empty options.
Expand Down Expand Up @@ -224,6 +235,9 @@ func (o *Options) AddFlags(flags *pflag.FlagSet, allControllers, disabledByDefau
flags.BoolVar(&o.EnableClusterResourceModeling, "enable-cluster-resource-modeling", true, "Enable means controller would build resource modeling for each cluster by syncing Nodes and Pods resources.\n"+
"The resource modeling might be used by the scheduler to make scheduling decisions in scenario of dynamic replica assignment based on cluster free resources.\n"+
"Disable if it does not fit your cases for better performance.")
flags.BoolVar(&o.DisableMultiDependencyDistribution, "disable-multi-dependency-distribution", true, "True value means disable the ability to a resource from being depended on by multiple resources or being distributed by PropagationPolicy/ClusterPropagationPolicy while being depended on.\n"+
"Before v1.12, this capability is allowed by default. If you still wish to enable this capability, you can set this flag to false. However, you will need to bear some side effects that come with it. For example, you can refer to https://github.com/karmada-io/karmada/pull/5717. When the primary resource is deleted, it does not consider other resources that currently depend on the resource or any PropagationPolicy associated with it.\n"+
"It is recommended that you adapt your business accordingly to avoid continued use.")

o.RateLimiterOpts.AddFlags(flags)
o.ProfileOpts.AddFlags(flags)
Expand Down
71 changes: 71 additions & 0 deletions pkg/dependenciesdistributor/dependencies_distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"encoding/json"
"fmt"
"strings"

corev1 "k8s.io/api/core/v1"
apierrors "k8s.io/apimachinery/pkg/api/errors"
Expand All @@ -29,6 +30,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
utilerrors "k8s.io/apimachinery/pkg/util/errors"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/tools/cache"
Expand All @@ -47,6 +49,7 @@ import (
"sigs.k8s.io/controller-runtime/pkg/source"

configv1alpha1 "github.com/karmada-io/karmada/pkg/apis/config/v1alpha1"
policyv1alpha1 "github.com/karmada-io/karmada/pkg/apis/policy/v1alpha1"
workv1alpha2 "github.com/karmada-io/karmada/pkg/apis/work/v1alpha2"
"github.com/karmada-io/karmada/pkg/events"
"github.com/karmada-io/karmada/pkg/resourceinterpreter"
Expand Down Expand Up @@ -100,6 +103,9 @@ type DependenciesDistributor struct {
RESTMapper meta.RESTMapper
ResourceInterpreter resourceinterpreter.ResourceInterpreter
RateLimiterOptions ratelimiterflag.Options
// DisableMultiDependencyDistribution indicates disable the ability to a resource from being depended on by multiple
// resources or being distributed by PropagationPolicy/ClusterPropagationPolicy while being depended on.
DisableMultiDependencyDistribution bool

eventHandler cache.ResourceEventHandler
resourceProcessor util.AsyncWorker
Expand Down Expand Up @@ -163,6 +169,12 @@ func (d *DependenciesDistributor) reconcileResourceTemplate(key util.QueueKey) e
return fmt.Errorf("invalid key")
}
klog.V(4).Infof("DependenciesDistributor start to reconcile object: %s", resourceTemplateKey)

if d.DisableMultiDependencyDistribution && resourceTemplateClaimedByPolicy(resourceTemplateKey.Labels) {
klog.V(4).Infof("Skip object(%s) as it has been claimed by PropagationPolicy/ClusterPropagationPolicy", resourceTemplateKey)
return nil
}

bindingList := &workv1alpha2.ResourceBindingList{}
err := d.Client.List(context.TODO(), bindingList, &client.ListOptions{
Namespace: resourceTemplateKey.Namespace,
Expand Down Expand Up @@ -190,6 +202,12 @@ func (d *DependenciesDistributor) reconcileResourceTemplate(key util.QueueKey) e
return nil
}

func resourceTemplateClaimedByPolicy(resourceLabels map[string]string) bool {
_, ppClaimed := resourceLabels[policyv1alpha1.PropagationPolicyPermanentIDLabel]
_, cppClaimed := resourceLabels[policyv1alpha1.ClusterPropagationPolicyPermanentIDLabel]
return ppClaimed || cppClaimed
}

// matchesWithBindingDependencies tells if the given object(resource template) is matched
// with the dependencies of independent resourceBinding.
func matchesWithBindingDependencies(resourceTemplateKey *LabelsKey, independentBinding *workv1alpha2.ResourceBinding) bool {
Expand Down Expand Up @@ -333,6 +351,7 @@ func (d *DependenciesDistributor) handleDependentResource(
Namespace: dependent.Namespace,
Name: dependent.Name,
}
independentBindingID := independentBinding.Labels[workv1alpha2.ResourceBindingPermanentIDLabel]

switch {
case len(dependent.Name) != 0:
Expand All @@ -344,6 +363,17 @@ func (d *DependenciesDistributor) handleDependentResource(
}
return err
}

if d.DisableMultiDependencyDistribution {
hasBeenPropagated, err := objHasBeenPropagatedByOther(d.Client, rawObject, independentBindingID)
if err != nil {
return err
}
if hasBeenPropagated {
return nil
}
}

attachedBinding := buildAttachedBinding(independentBinding, rawObject)
return d.createOrUpdateAttachedBinding(attachedBinding)
case dependent.LabelSelector != nil:
Expand All @@ -357,6 +387,16 @@ func (d *DependenciesDistributor) handleDependentResource(
return err
}
for _, rawObject := range rawObjects {
if d.DisableMultiDependencyDistribution {
hasBeenPropagated, err := objHasBeenPropagatedByOther(d.Client, rawObject, independentBindingID)
if err != nil {
return err
}
if hasBeenPropagated {
continue
}
}

attachedBinding := buildAttachedBinding(independentBinding, rawObject)
if err := d.createOrUpdateAttachedBinding(attachedBinding); err != nil {
return err
Expand All @@ -368,6 +408,37 @@ func (d *DependenciesDistributor) handleDependentResource(
return fmt.Errorf("the Name and LabelSelector in the DependentObjectReference cannot be empty at the same time")
}

func objHasBeenPropagatedByOther(c client.Client, object *unstructured.Unstructured, independentBindingID string) (bool, error) {
if resourceTemplateClaimedByPolicy(object.GetLabels()) {
klog.Warningf("Skip object(%s,kind=%s %s/%s) as it has been claimed by PropagationPolicy/ClusterPropagationPolicy",
object.GetAPIVersion(), object.GetKind(), object.GetNamespace(), object.GetName())
return true, nil
}

bindingName := names.GenerateBindingName(object.GetKind(), object.GetName())
binding := &workv1alpha2.ResourceBinding{}
err := c.Get(context.TODO(), types.NamespacedName{Namespace: object.GetNamespace(), Name: bindingName}, binding)
if err != nil {
if apierrors.IsNotFound(err) {
return false, nil
}
return false, err
}

for k, v := range binding.Labels {
if !strings.HasPrefix(k, dependedByLabelKeyPrefix) {
continue
}

if v != independentBindingID {
klog.Warningf("Skip object(%s,kind=%s %s/%s) as it has been propagated by other dependent resource",
object.GetAPIVersion(), object.GetKind(), object.GetNamespace(), object.GetName())
return true, nil
}
}
return false, nil
}

func (d *DependenciesDistributor) syncScheduleResultToAttachedBindings(ctx context.Context, independentBinding *workv1alpha2.ResourceBinding, dependencies []configv1alpha1.DependentObjectReference) (err error) {
defer func() {
if err != nil {
Expand Down
46 changes: 45 additions & 1 deletion pkg/detector/detector.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"context"
"fmt"
"regexp"
"strings"
"sync"
"time"

Expand All @@ -31,6 +32,7 @@ import (
"k8s.io/apimachinery/pkg/labels"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/errors"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/discovery"
Expand Down Expand Up @@ -62,6 +64,8 @@ import (
"github.com/karmada-io/karmada/pkg/util/restmapper"
)

var dependedByLabelKeyPrefix = "resourcebinding.karmada.io/depended-by-"

// ResourceDetector is a resource watcher which watches all resources and reconcile the events.
type ResourceDetector struct {
// DiscoveryClientSet is used to resource discovery.
Expand Down Expand Up @@ -106,6 +110,10 @@ type ResourceDetector struct {
// the controller.
RateLimiterOptions ratelimiterflag.Options

// DisableMultiDependencyDistribution indicates disable the ability to a resource from being depended on by multiple
// resources or being distributed by PropagationPolicy/ClusterPropagationPolicy while being depended on.
DisableMultiDependencyDistribution bool

stopCh <-chan struct{}
}

Expand Down Expand Up @@ -245,10 +253,22 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error {
// currently we do that by setting owner reference to derived objects.
return nil
}
klog.Errorf("Failed to get unstructured object(%s), error: %v", clusterWideKeyWithConfig, err)
klog.Errorf("Failed to get unstructured object(%s), error: %v", clusterWideKeyWithConfig.ClusterWideKey, err)
return err
}

if d.DisableMultiDependencyDistribution {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can you remind me what situation this logic is handling?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

A parameter is added to determine whether the Karmada system allows the same resource to be dependent on different resources. By default, the value is true, indicating that multiple dependencies will be prohibited in future evolution, including that a resource is processed by the PropagationPolicy and distributed along with the resource.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I understand the purpose of your DisableMultiDependencyDistribution parameter. But I haven't grasped what the code you wrote in the detector is meant to handle.

Looking at the e2e scenario, this code seems to be breaking compatibility.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But I haven't grasped what the code you wrote in the detector is meant to handle.

The logic of this code is incorrect. I will correct it later.

Looking at the e2e scenario, this code seems to be breaking compatibility.

When DisableMultiDependencyDistribution is false, it adds some restrictions, and it really can be understood as break change.
When DisableMultiDependencyDistribution is true, it keeps the original behavior.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please rebase and fix the failing tests.

skip, err := skipForPropagatedByDependent(d.Client, object)
if err != nil {
klog.Errorf("Failed to calc skipForPropagatedByDependent with object(%s), error: %v", clusterWideKeyWithConfig.ClusterWideKey, err)
return err
}
if skip {
klog.Warningf("Skip to propagate resource(%s) for it has been propagated by the dependency", clusterWideKeyWithConfig.ClusterWideKey)
return nil
}
}

resourceTemplateClaimedBy := util.GetLabelValue(object.GetLabels(), util.ResourceTemplateClaimedByLabel)
// If the resource lacks this label, it implies that the resource template can be propagated by Policy.
// For instance, once MultiClusterService takes over the Service, Policy cannot reclaim it.
Expand All @@ -260,6 +280,30 @@ func (d *ResourceDetector) Reconcile(key util.QueueKey) error {
return d.propagateResource(object, clusterWideKey, resourceChangeByKarmada)
}

func skipForPropagatedByDependent(c client.Client, object *unstructured.Unstructured) (bool, error) {
// cluster scope resource can not be propagated by dependent
if object.GetNamespace() == "" {
return false, nil
}

bindingName := names.GenerateBindingName(object.GetKind(), object.GetName())
binding := &workv1alpha2.ResourceBinding{}
err := c.Get(context.TODO(), types.NamespacedName{Namespace: object.GetNamespace(), Name: bindingName}, binding)
if err != nil {
if apierrors.IsNotFound(err) {
return false, nil
}
return false, err
}

for k := range binding.Labels {
if strings.HasPrefix(k, dependedByLabelKeyPrefix) {
return true, nil
}
}
return false, nil
}

// EventFilter tells if an object should be taken care of.
//
// All objects under Karmada reserved namespace should be ignored:
Expand Down