|  | 
|  | 1 | +/* | 
|  | 2 | +Copyright 2023 The KCP Authors. | 
|  | 3 | +
 | 
|  | 4 | +Licensed under the Apache License, Version 2.0 (the "License"); | 
|  | 5 | +you may not use this file except in compliance with the License. | 
|  | 6 | +You may obtain a copy of the License at | 
|  | 7 | +
 | 
|  | 8 | +    http://www.apache.org/licenses/LICENSE-2.0 | 
|  | 9 | +
 | 
|  | 10 | +Unless required by applicable law or agreed to in writing, software | 
|  | 11 | +distributed under the License is distributed on an "AS IS" BASIS, | 
|  | 12 | +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
|  | 13 | +See the License for the specific language governing permissions and | 
|  | 14 | +limitations under the License. | 
|  | 15 | +*/ | 
|  | 16 | + | 
|  | 17 | +package replicatelogicalcluster | 
|  | 18 | + | 
|  | 19 | +import ( | 
|  | 20 | +	"fmt" | 
|  | 21 | + | 
|  | 22 | +	kcpcache "github.com/kcp-dev/apimachinery/v2/pkg/cache" | 
|  | 23 | +	"github.com/kcp-dev/logicalcluster/v3" | 
|  | 24 | + | 
|  | 25 | +	apierrors "k8s.io/apimachinery/pkg/api/errors" | 
|  | 26 | +	"k8s.io/apimachinery/pkg/util/runtime" | 
|  | 27 | +	"k8s.io/client-go/tools/cache" | 
|  | 28 | + | 
|  | 29 | +	"github.com/kcp-dev/kcp/pkg/reconciler/cache/labellogicalcluster" | 
|  | 30 | +	"github.com/kcp-dev/kcp/pkg/reconciler/cache/replication" | 
|  | 31 | +	corev1alpha1 "github.com/kcp-dev/kcp/sdk/apis/core/v1alpha1" | 
|  | 32 | +	"github.com/kcp-dev/kcp/sdk/apis/workload" | 
|  | 33 | +	workloadv1alpha1 "github.com/kcp-dev/kcp/sdk/apis/workload/v1alpha1" | 
|  | 34 | +	kcpclientset "github.com/kcp-dev/kcp/sdk/client/clientset/versioned/cluster" | 
|  | 35 | +	corev1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/core/v1alpha1" | 
|  | 36 | +	workloadv1alpha1informers "github.com/kcp-dev/kcp/sdk/client/informers/externalversions/workload/v1alpha1" | 
|  | 37 | +) | 
|  | 38 | + | 
|  | 39 | +const ( | 
|  | 40 | +	ControllerName = "kcp-workload-replicate-logicalcluster" | 
|  | 41 | +) | 
|  | 42 | + | 
|  | 43 | +// NewController returns a new controller for labelling LogicalClusters that should be replicated. | 
|  | 44 | + | 
|  | 45 | +func NewController( | 
|  | 46 | +	kcpClusterClient kcpclientset.ClusterInterface, | 
|  | 47 | +	logicalClusterInformer corev1alpha1informers.LogicalClusterClusterInformer, | 
|  | 48 | +	syncTargetInformer workloadv1alpha1informers.SyncTargetClusterInformer, | 
|  | 49 | +) labellogicalcluster.Controller { | 
|  | 50 | +	logicalClusterLister := logicalClusterInformer.Lister() | 
|  | 51 | +	syncTargetIndexer := syncTargetInformer.Informer().GetIndexer() | 
|  | 52 | + | 
|  | 53 | +	c := labellogicalcluster.NewController( | 
|  | 54 | +		ControllerName, | 
|  | 55 | +		workload.GroupName, | 
|  | 56 | +		func(cluster *corev1alpha1.LogicalCluster) bool { | 
|  | 57 | +			// If there are any SyncTargets for this logical cluster, then the LogicalCluster object should be replicated. | 
|  | 58 | +			keys, err := syncTargetIndexer.IndexKeys(kcpcache.ClusterIndexName, kcpcache.ClusterIndexKey(logicalcluster.From(cluster))) | 
|  | 59 | +			if err != nil { | 
|  | 60 | +				runtime.HandleError(fmt.Errorf("failed to list SyncTargets: %v", err)) | 
|  | 61 | +				return false | 
|  | 62 | +			} | 
|  | 63 | +			return len(keys) > 0 | 
|  | 64 | +		}, | 
|  | 65 | +		kcpClusterClient, | 
|  | 66 | +		logicalClusterInformer, | 
|  | 67 | +	) | 
|  | 68 | + | 
|  | 69 | +	// enqueue the logical cluster every time the APIExport changes | 
|  | 70 | +	enqueueSyncTarget := func(obj interface{}) { | 
|  | 71 | +		if tombstone, ok := obj.(cache.DeletedFinalStateUnknown); ok { | 
|  | 72 | +			obj = tombstone.Obj | 
|  | 73 | +		} | 
|  | 74 | + | 
|  | 75 | +		syncTarget, ok := obj.(*workloadv1alpha1.SyncTarget) | 
|  | 76 | +		if !ok { | 
|  | 77 | +			runtime.HandleError(fmt.Errorf("unexpected object type: %T", obj)) | 
|  | 78 | +			return | 
|  | 79 | +		} | 
|  | 80 | + | 
|  | 81 | +		cluster, err := logicalClusterLister.Cluster(logicalcluster.From(syncTarget)).Get(corev1alpha1.LogicalClusterName) | 
|  | 82 | +		if err != nil && !apierrors.IsNotFound(err) { | 
|  | 83 | +			runtime.HandleError(fmt.Errorf("failed to get logical cluster: %v", err)) | 
|  | 84 | +			return | 
|  | 85 | +		} else if apierrors.IsNotFound(err) { | 
|  | 86 | +			return | 
|  | 87 | +		} | 
|  | 88 | + | 
|  | 89 | +		c.EnqueueLogicalCluster(cluster, "reason", "SyncTarget changed", "synctarget", syncTarget.Name) | 
|  | 90 | +	} | 
|  | 91 | + | 
|  | 92 | +	syncTargetInformer.Informer().AddEventHandler(cache.FilteringResourceEventHandler{ | 
|  | 93 | +		FilterFunc: replication.IsNoSystemClusterName, | 
|  | 94 | +		Handler: cache.ResourceEventHandlerFuncs{ | 
|  | 95 | +			AddFunc: func(obj interface{}) { | 
|  | 96 | +				enqueueSyncTarget(obj) | 
|  | 97 | +			}, | 
|  | 98 | +			DeleteFunc: func(obj interface{}) { | 
|  | 99 | +				enqueueSyncTarget(obj) | 
|  | 100 | +			}, | 
|  | 101 | +		}, | 
|  | 102 | +	}) | 
|  | 103 | + | 
|  | 104 | +	return c | 
|  | 105 | +} | 
0 commit comments