From 52e0d5cbf272e95c9999bb0d10d9f8cf6acb5167 Mon Sep 17 00:00:00 2001 From: Rama Chavali Date: Fri, 20 Dec 2024 00:06:14 +0530 Subject: [PATCH] do not trigger push for none visibility services (#54120) * do not trigger push for visibility none services Signed-off-by: Rama Chavali * do not push service with non visibility Signed-off-by: Rama Chavali * fix condition Signed-off-by: Rama Chavali * fix ut Signed-off-by: Rama Chavali * add test Signed-off-by: Rama Chavali --------- Signed-off-by: Rama Chavali --- .../kube/controller/controller.go | 20 ++++----- .../kube/controller/controller_test.go | 42 +++++++++++++++++++ .../kube/controller/endpointslice.go | 33 +++++++++++---- 3 files changed, 76 insertions(+), 19 deletions(-) diff --git a/pilot/pkg/serviceregistry/kube/controller/controller.go b/pilot/pkg/serviceregistry/kube/controller/controller.go index cc726005be8a..6aae309305c6 100644 --- a/pilot/pkg/serviceregistry/kube/controller/controller.go +++ b/pilot/pkg/serviceregistry/kube/controller/controller.go @@ -261,7 +261,7 @@ func NewController(kubeClient kubelib.Client, options Options) *Controller { c.services = kclient.NewFiltered[*v1.Service](kubeClient, kclient.Filter{ObjectFilter: kubeClient.ObjectFilter()}) - registerHandlers[*v1.Service](c, c.services, "Services", c.onServiceEvent, nil) + registerHandlers(c, c.services, "Services", c.onServiceEvent, nil) c.endpoints = newEndpointSliceController(c) @@ -426,8 +426,9 @@ func (c *Controller) deleteService(svc *model.Service) { shard := model.ShardKeyFromRegistry(c) event := model.EventDelete c.opts.XDSUpdater.SvcUpdate(shard, string(svc.Hostname), svc.Attributes.Namespace, event) - - c.handlers.NotifyServiceHandlers(nil, svc, event) + if !svc.Attributes.ExportTo.Contains(visibility.None) { + c.handlers.NotifyServiceHandlers(nil, svc, event) + } } // recomputeServiceForPod is called when a pod changes and service endpoints need to be recomputed. @@ -506,13 +507,11 @@ func (c *Controller) addOrUpdateService(pre, curr *v1.Service, currConv *model.S } } - // filter out same service event - if event == model.EventUpdate && !serviceUpdateNeedsPush(pre, curr, prevConv, currConv) { - return - } - c.opts.XDSUpdater.SvcUpdate(shard, string(currConv.Hostname), ns, event) - c.handlers.NotifyServiceHandlers(prevConv, currConv, event) + if serviceUpdateNeedsPush(pre, curr, prevConv, currConv) { + log.Debugf("Service %s in namespace %s updated and needs push", currConv.Hostname, ns) + c.handlers.NotifyServiceHandlers(prevConv, currConv, event) + } } func (c *Controller) buildEndpointsForService(svc *model.Service, updateCache bool) []*model.IstioEndpoint { @@ -1201,10 +1200,11 @@ func (c *Controller) servicesForNamespacedName(name types.NamespacedName) []*mod } func serviceUpdateNeedsPush(prev, curr *v1.Service, preConv, currConv *model.Service) bool { + // New Service - If it is not exported, no need to push. if preConv == nil { return !currConv.Attributes.ExportTo.Contains(visibility.None) } - // if service are not exported, no need to push + // if service Visibility is None and has not changed in the update/delete, no need to push. if preConv.Attributes.ExportTo.Contains(visibility.None) && currConv.Attributes.ExportTo.Contains(visibility.None) { return false diff --git a/pilot/pkg/serviceregistry/kube/controller/controller_test.go b/pilot/pkg/serviceregistry/kube/controller/controller_test.go index a4ac9191e1e8..83cc27014136 100644 --- a/pilot/pkg/serviceregistry/kube/controller/controller_test.go +++ b/pilot/pkg/serviceregistry/kube/controller/controller_test.go @@ -48,6 +48,7 @@ import ( "istio.io/istio/pkg/config/labels" "istio.io/istio/pkg/config/mesh" "istio.io/istio/pkg/config/protocol" + "istio.io/istio/pkg/config/schema/kind" "istio.io/istio/pkg/config/visibility" kubelib "istio.io/istio/pkg/kube" "istio.io/istio/pkg/kube/kclient/clienttest" @@ -2429,6 +2430,47 @@ func TestUpdateEdsCacheOnServiceUpdate(t *testing.T) { fx.WaitOrFail(t, "eds cache") } +func TestVisibilityNoneService(t *testing.T) { + controller, fx := NewFakeControllerWithOptions(t, FakeControllerOptions{}) + serviceHandler := func(_, curr *model.Service, _ model.Event) { + pushReq := &model.PushRequest{ + Full: true, + ConfigsUpdated: sets.New(model.ConfigKey{Kind: kind.ServiceEntry, Name: string(curr.Hostname), Namespace: curr.Attributes.Namespace}), + Reason: model.NewReasonStats(model.ServiceUpdate), + } + fx.ConfigUpdate(pushReq) + } + controller.Controller.AppendServiceHandler(serviceHandler) + + // Create an initial pod with a service with None visibility, and endpoint. + pod1 := generatePod([]string{"172.0.1.1"}, "pod1", "nsA", "", "node1", map[string]string{"app": "prod-app"}, map[string]string{}) + pod2 := generatePod([]string{"172.0.1.2"}, "pod2", "nsA", "", "node1", map[string]string{"app": "prod-app"}, map[string]string{}) + pods := []*corev1.Pod{pod1, pod2} + nodes := []*corev1.Node{ + generateNode("node1", map[string]string{NodeZoneLabel: "zone1", NodeRegionLabel: "region1", label.TopologySubzone.Name: "subzone1"}), + } + addNodes(t, controller, nodes...) + addPods(t, controller, fx, pods...) + createServiceWait(controller, "svc1", "nsA", []string{"10.0.0.1"}, nil, map[string]string{annotation.NetworkingExportTo.Name: "~"}, + []int32{8080}, map[string]string{"app": "prod-app"}, t) + + pod1Ips := []string{"172.0.1.1"} + portNames := []string{"tcp-port"} + createEndpoints(t, controller, "svc1", "nsA", portNames, pod1Ips, nil, nil) + // We should not get any events - service should be ignored. + fx.AssertEmpty(t, 0) + + // update service and remove exportTo annotation. + svc := getService(controller, "svc1", "nsA", t) + svc.Annotations = map[string]string{} + updateService(controller, svc, t) + fx.WaitOrFail(t, "eds cache") + fx.WaitOrFail(t, "service") + host := string(kube.ServiceHostname("svc1", "nsA", controller.opts.DomainSuffix)) + // We should see a full push. + fx.MatchOrFail(t, xdsfake.Event{Type: "xds full", ID: host}) +} + func TestDiscoverySelector(t *testing.T) { networksWatcher := mesh.NewFixedNetworksWatcher(&meshconfig.MeshNetworks{ Networks: map[string]*meshconfig.Network{ diff --git a/pilot/pkg/serviceregistry/kube/controller/endpointslice.go b/pilot/pkg/serviceregistry/kube/controller/endpointslice.go index 7d8823be2e9d..3fa24f7510b4 100644 --- a/pilot/pkg/serviceregistry/kube/controller/endpointslice.go +++ b/pilot/pkg/serviceregistry/kube/controller/endpointslice.go @@ -15,6 +15,7 @@ package controller import ( + "strings" "sync" "github.com/hashicorp/go-multierror" @@ -26,6 +27,7 @@ import ( "k8s.io/apimachinery/pkg/types" mcs "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1" + "istio.io/api/annotation" "istio.io/istio/pilot/pkg/features" "istio.io/istio/pilot/pkg/model" "istio.io/istio/pkg/config" @@ -103,16 +105,21 @@ func (esc *endpointSliceController) onEventInternal(_, ep *v1.EndpointSlice, eve esc.updateEndpointSlice(ep) } - hostnames := esc.c.hostNamesForNamespacedName(namespacedName) - // Trigger EDS push for all hostnames. - esc.pushEDS(hostnames, namespacedName.Namespace) - // Now check if we need to do a full push for the service. // If the service is headless, we need to do a full push if service exposes TCP ports // to create IP based listeners. For pure HTTP headless services, we only need to push NDS. name := serviceNameForEndpointSlice(esLabels) namespace := ep.GetNamespace() svc := esc.c.services.Get(name, namespace) + if svc != nil && !serviceNeedsPush(svc) { + return + } + log.Infof("triggering EDS push for %s %s in namespace %s", name, event, namespace) + + hostnames := esc.c.hostNamesForNamespacedName(namespacedName) + // Trigger EDS push for all hostnames. + esc.pushEDS(hostnames, namespacedName.Namespace) + if svc == nil || svc.Spec.ClusterIP != corev1.ClusterIPNone || svc.Spec.Type == corev1.ServiceTypeExternalName { return } @@ -120,11 +127,6 @@ func (esc *endpointSliceController) onEventInternal(_, ep *v1.EndpointSlice, eve configsUpdated := sets.New[model.ConfigKey]() supportsOnlyHTTP := true for _, modelSvc := range esc.c.servicesForNamespacedName(config.NamespacedName(svc)) { - // skip push if it is not exported - if modelSvc.Attributes.ExportTo.Contains(visibility.None) { - continue - } - for _, p := range modelSvc.Ports { if !p.Protocol.IsHTTP() { supportsOnlyHTTP = false @@ -152,6 +154,19 @@ func (esc *endpointSliceController) onEventInternal(_, ep *v1.EndpointSlice, eve } } +func serviceNeedsPush(svc *corev1.Service) bool { + if svc.Annotations[annotation.NetworkingExportTo.Name] != "" { + namespaces := strings.Split(svc.Annotations[annotation.NetworkingExportTo.Name], ",") + for _, ns := range namespaces { + ns = strings.TrimSpace(ns) + if ns == string(visibility.None) { + return false + } + } + } + return true +} + // GetProxyServiceTargets returns service instances co-located with a given proxy // This is only used to find the targets associated with a headless service. // For the service with selector, it will use GetProxyServiceTargetsByPod to get the service targets.