From c4e7ebc524700a6258ac646a6a661c4a8ca15a6c Mon Sep 17 00:00:00 2001
From: Benjamin Leggett
Date: Fri, 20 Dec 2024 15:59:52 -0500
Subject: [PATCH] Mostly cosmetic tidies split out from #53906

Signed-off-by: Benjamin Leggett
---
 cni/pkg/nodeagent/net.go    | 222 +++++++++++++++++++-----------------
 cni/pkg/nodeagent/server.go |  93 ++++++++-------
 cni/pkg/util/podutil.go     |  20 ++++
 3 files changed, 186 insertions(+), 149 deletions(-)

diff --git a/cni/pkg/nodeagent/net.go b/cni/pkg/nodeagent/net.go
index bca859def0f4..6ee59680dfa6 100644
--- a/cni/pkg/nodeagent/net.go
+++ b/cni/pkg/nodeagent/net.go
@@ -19,7 +19,6 @@ import (
 	"errors"
 	"fmt"
 	"net/netip"
-	"strconv"
 	"strings"
 
 	corev1 "k8s.io/api/core/v1"
@@ -28,6 +27,7 @@ import (
 	"istio.io/api/annotation"
 	"istio.io/istio/cni/pkg/iptables"
+	"istio.io/istio/cni/pkg/util"
 	"istio.io/istio/pkg/slices"
 	dep "istio.io/istio/tools/istio-iptables/pkg/dependencies"
 )
@@ -43,73 +43,43 @@ type NetServer struct {
 
 var _ MeshDataplane = &NetServer{}
 
-func newNetServer(ztunnelServer ZtunnelServer, podNsMap *podNetnsCache, podIptables *iptables.IptablesConfigurator, podNs PodNetnsFinder) *NetServer {
-	return &NetServer{
-		ztunnelServer:      ztunnelServer,
-		currentPodSnapshot: podNsMap,
-		podNs:              podNs,
-		podIptables:        podIptables,
-		netnsRunner:        NetnsDo,
+// ConstructInitialSnapshot is always called first, before Start.
+// It takes a "snapshot" of ambient pods that were already running when the server started, and:
+//
+// - initializes an internal cache of pod info and netns handles with these existing pods.
+// This cache will also be updated when the K8S informer gets a new pod.
+// This cache represents the "state of the world" of all enrolled pods on the node this agent
+// knows about, and will be sent to any connecting ztunnel as a startup message.
+func (s *NetServer) ConstructInitialSnapshot(ambientPods []*corev1.Pod) error { + var consErr []error + + podsByUID := slices.GroupUnique(ambientPods, (*corev1.Pod).GetUID) + if err := s.buildZtunnelSnapshot(podsByUID); err != nil { + log.Warnf("failed to construct initial ztunnel snapshot: %v", err) + consErr = append(consErr, err) } + + return errors.Join(consErr...) } +// Start starts the ztunnel connection listen server. +// ConstructInitialSnapshot should always be invoked before this function. func (s *NetServer) Start(ctx context.Context) { log.Debug("starting ztunnel server") go s.ztunnelServer.Run(ctx) } +// Stop stops the ztunnel connection listen server. func (s *NetServer) Stop(_ bool) { log.Debug("stopping ztunnel server") s.ztunnelServer.Close() } -func (s *NetServer) rescanPod(pod *corev1.Pod) error { - // this can happen if the pod was dynamically added to the mesh after it was created. - // in that case, try finding the netns using procfs. - filter := map[types.UID]*corev1.Pod{ - pod.UID: pod, - } - return s.scanProcForPodsAndCache(filter) -} - -func (s *NetServer) getOrOpenNetns(pod *corev1.Pod, netNs string) (Netns, error) { - if netNs == "" { - return s.getNetns(pod) - } - return s.openNetns(pod, netNs) -} - -func (s *NetServer) openNetns(pod *corev1.Pod, netNs string) (Netns, error) { - return s.currentPodSnapshot.UpsertPodCache(pod, netNs) -} - -func (s *NetServer) getNetns(pod *corev1.Pod) (Netns, error) { - openNetns := s.currentPodSnapshot.Get(string(pod.UID)) - if openNetns != nil { - return openNetns, nil - } - log.Debug("pod netns was not found, trying to find it using procfs") - // this can happen if the pod was dynamically added to the mesh after it was created. - // in that case, try finding the netns using procfs. - if err := s.rescanPod(pod); err != nil { - log.Errorf("error scanning proc: error was %s", err) - return nil, err - } - // try again. we can still get here if the pod is in the process of being created. 
- // in this case the CNI will be invoked soon and provide us with the netns. - openNetns = s.currentPodSnapshot.Get(string(pod.UID)) - if openNetns == nil { - return nil, fmt.Errorf("can't find netns for pod, this is ok if this is a newly created pod (%w)", ErrPodNotFound) - } - - return openNetns, nil -} - // AddPodToMesh adds a pod to mesh by -// 1. Getting the netns +// 1. Getting the netns (and making sure the netns is cached in the ztunnel state of the world snapshot) // 2. Adding the pod's IPs to the hostnetns ipsets for node probe checks // 3. Creating iptables rules inside the pod's netns -// 4. Notifying ztunnel via GRPC to create a proxy for the pod +// 4. Notifying the connected ztunnel via GRPC to create a proxy for the pod // // You may ask why we pass the pod IPs separately from the pod manifest itself (which contains the pod IPs as a field) // - this is because during add specifically, if CNI plugins have not finished executing, @@ -151,24 +121,100 @@ func (s *NetServer) AddPodToMesh(ctx context.Context, pod *corev1.Pod, podIPs [] return nil } -func (s *NetServer) sendPodToZtunnelAndWaitForAck(ctx context.Context, pod *corev1.Pod, netns Netns) error { - return s.ztunnelServer.PodAdded(ctx, pod, netns) +// RemovePodFromMesh is called when a pod needs to be removed from the mesh. +// +// It: +// - Informs the connected ztunnel that the pod no longer needs to be proxied. +// - Removes the pod's netns file handle from the cache/state of the world snapshot. +// - Steps into the pod netns to remove the inpod iptables redirection rules. +func (s *NetServer) RemovePodFromMesh(ctx context.Context, pod *corev1.Pod, isDelete bool) error { + log := log.WithLabels("ns", pod.Namespace, "name", pod.Name) + log.WithLabels("delete", isDelete).Debugf("removing pod from the mesh") + + // Aggregate errors together, so that if part of the cleanup fails we still proceed with other steps. 
+ var errs []error + + // Whether pod is already deleted or not, we need to let go of our netns ref. + openNetns := s.currentPodSnapshot.Take(string(pod.UID)) + if openNetns == nil { + log.Debug("failed to find pod netns during removal") + } + + // If the pod is already deleted or terminated, we do not need to clean up the pod network -- only the host side. + if !isDelete { + if openNetns != nil { + // pod is removed from the mesh, but is still running. remove iptables rules + log.Debugf("calling DeleteInpodRules") + if err := s.netnsRunner(openNetns, func() error { return s.podIptables.DeleteInpodRules(log) }); err != nil { + return fmt.Errorf("failed to delete inpod rules: %w", err) + } + } else { + log.Warn("pod netns already gone, not deleting inpod rules") + } + } + + log.Debug("removing pod from ztunnel") + if err := s.ztunnelServer.PodDeleted(ctx, string(pod.UID)); err != nil { + log.Errorf("failed to delete pod from ztunnel: %v", err) + errs = append(errs, err) + } + return errors.Join(errs...) } -// ConstructInitialSnapshot takes a "snapshot" of current ambient pods and -// -// 1. Constructs a ztunnel state message to initialize ztunnel -// 2. Syncs the host ipset -func (s *NetServer) ConstructInitialSnapshot(ambientPods []*corev1.Pod) error { - var consErr []error +func newNetServer(ztunnelServer ZtunnelServer, podNsMap *podNetnsCache, podIptables *iptables.IptablesConfigurator, podNs PodNetnsFinder) *NetServer { + return &NetServer{ + ztunnelServer: ztunnelServer, + currentPodSnapshot: podNsMap, + podNs: podNs, + podIptables: podIptables, + netnsRunner: NetnsDo, + } +} - podsByUID := slices.GroupUnique(ambientPods, (*corev1.Pod).GetUID) - if err := s.buildZtunnelSnapshot(podsByUID); err != nil { - log.Warnf("failed to construct initial ztunnel snapshot: %v", err) - consErr = append(consErr, err) +func (s *NetServer) rescanPod(pod *corev1.Pod) error { + // this can happen if the pod was dynamically added to the mesh after it was created. 
+ // in that case, try finding the netns using procfs. + filter := map[types.UID]*corev1.Pod{ + pod.UID: pod, } + return s.scanProcForPodsAndCache(filter) +} - return errors.Join(consErr...) +func (s *NetServer) getOrOpenNetns(pod *corev1.Pod, netNs string) (Netns, error) { + if netNs == "" { + return s.getNetns(pod) + } + return s.openNetns(pod, netNs) +} + +func (s *NetServer) openNetns(pod *corev1.Pod, netNs string) (Netns, error) { + return s.currentPodSnapshot.UpsertPodCache(pod, netNs) +} + +func (s *NetServer) getNetns(pod *corev1.Pod) (Netns, error) { + openNetns := s.currentPodSnapshot.Get(string(pod.UID)) + if openNetns != nil { + return openNetns, nil + } + log.Debug("pod netns was not found, trying to find it using procfs") + // this can happen if the pod was dynamically added to the mesh after it was created. + // in that case, try finding the netns using procfs. + if err := s.rescanPod(pod); err != nil { + log.Errorf("error scanning proc: error was %s", err) + return nil, err + } + // try again. we can still get here if the pod is in the process of being created. + // in this case the CNI will be invoked soon and provide us with the netns. + openNetns = s.currentPodSnapshot.Get(string(pod.UID)) + if openNetns == nil { + return nil, fmt.Errorf("can't find netns for pod, this is ok if this is a newly created pod (%w)", ErrPodNotFound) + } + + return openNetns, nil +} + +func (s *NetServer) sendPodToZtunnelAndWaitForAck(ctx context.Context, pod *corev1.Pod, netns Netns) error { + return s.ztunnelServer.PodAdded(ctx, pod, netns) } func (s *NetServer) buildZtunnelSnapshot(ambientPodUIDs map[types.UID]*corev1.Pod) error { @@ -199,12 +245,11 @@ func getPodLevelTrafficOverrides(pod *corev1.Pod) iptables.PodLevelOverrides { // non-mesh traffic on inbound, and send to the mesh on outbound. // Basically, this just disables inbound redirection. 
podCfg := iptables.PodLevelOverrides{IngressMode: false} - if a, f := pod.Annotations[annotation.AmbientBypassInboundCapture.Name]; f { - var err error - podCfg.IngressMode, err = strconv.ParseBool(a) - if err != nil { - log.Warnf("annotation %v=%q found, but only '*' is supported", annotation.AmbientBypassInboundCapture.Name, a) - } + + if ingressMode, err := util.CheckBooleanAnnotation(pod, annotation.AmbientBypassInboundCapture.Name); err == nil { + podCfg.IngressMode = ingressMode + } else { + log.Warn(err) } if virt, hasVirt := pod.Annotations[annotation.IoIstioRerouteVirtualInterfaces.Name]; hasVirt { @@ -233,38 +278,3 @@ func realDependenciesInpod(useScopedLocks bool) *dep.RealDependencies { NetworkNamespace: "", } } - -// RemovePodFromMesh is called when a pod needs to be removed from the mesh -func (s *NetServer) RemovePodFromMesh(ctx context.Context, pod *corev1.Pod, isDelete bool) error { - log := log.WithLabels("ns", pod.Namespace, "name", pod.Name) - log.WithLabels("delete", isDelete).Debugf("removing pod from the mesh") - - // Aggregate errors together, so that if part of the cleanup fails we still proceed with other steps. - var errs []error - - // Whether pod is already deleted or not, we need to let go of our netns ref. - openNetns := s.currentPodSnapshot.Take(string(pod.UID)) - if openNetns == nil { - log.Debug("failed to find pod netns during removal") - } - - // If the pod is already deleted or terminated, we do not need to clean up the pod network -- only the host side. - if !isDelete { - if openNetns != nil { - // pod is removed from the mesh, but is still running. 
remove iptables rules - log.Debugf("calling DeleteInpodRules") - if err := s.netnsRunner(openNetns, func() error { return s.podIptables.DeleteInpodRules(log) }); err != nil { - return fmt.Errorf("failed to delete inpod rules: %w", err) - } - } else { - log.Warn("pod netns already gone, not deleting inpod rules") - } - } - - log.Debug("removing pod from ztunnel") - if err := s.ztunnelServer.PodDeleted(ctx, string(pod.UID)); err != nil { - log.Errorf("failed to delete pod from ztunnel: %v", err) - errs = append(errs, err) - } - return errors.Join(errs...) -} diff --git a/cni/pkg/nodeagent/server.go b/cni/pkg/nodeagent/server.go index 9a13aa7f9c8e..402c7cd89dcf 100644 --- a/cni/pkg/nodeagent/server.go +++ b/cni/pkg/nodeagent/server.go @@ -41,7 +41,7 @@ import ( var log = scopes.CNIAgent type MeshDataplane interface { - // called first, (even before Start()). + // MUST be called first, (even before Start()). ConstructInitialSnapshot(ambientPods []*corev1.Pod) error Start(ctx context.Context) @@ -150,39 +150,6 @@ func (s *Server) NotReady() { s.isReady.Store(false) } -// buildKubeClient creates the kube client -func buildKubeClient(kubeConfig string) (kube.Client, error) { - // Used by validation - kubeRestConfig, err := kube.DefaultRestConfig(kubeConfig, "", func(config *rest.Config) { - config.QPS = 80 - config.Burst = 160 - }) - if err != nil { - return nil, fmt.Errorf("failed creating kube config: %v", err) - } - - client, err := kube.NewClient(kube.NewClientConfigForRestConfig(kubeRestConfig), "") - if err != nil { - return nil, fmt.Errorf("failed creating kube client: %v", err) - } - - return client, nil -} - -// createHostsideProbeIpset creates an ipset. This is designed to be called from the host netns. -// Note that if the ipset already exist by name, Create will not return an error. -// -// We will unconditionally flush our set before use here, so it shouldn't matter. 
-func createHostsideProbeIpset(isV6 bool) (ipset.IPSet, error) {
-	linDeps := ipset.RealNlDeps()
-	probeSet, err := ipset.NewIPSet(iptables.ProbeIPSet, isV6, linDeps)
-	if err != nil {
-		return probeSet, err
-	}
-	probeSet.Flush()
-	return probeSet, nil
-}
-
 func (s *Server) Start() {
 	log.Info("CNI ambient server starting")
 	s.kubeClient.RunAndWait(s.ctx.Done())
@@ -225,10 +192,26 @@ type meshDataplane struct {
 	hostsideProbeIPSet ipset.IPSet
 }
 
+// ConstructInitialSnapshot is always called first, before Start.
+// It takes a "snapshot" of ambient pods that were already running when the server started,
+// and constructs various required "state" (adding the pods to the host-level node ipset,
+// building the state of the world snapshot sent to connecting ztunnels)
+func (s *meshDataplane) ConstructInitialSnapshot(ambientPods []*corev1.Pod) error {
+	if err := s.syncHostIPSets(ambientPods); err != nil {
+		log.Errorf("failed to sync host IPset: %v", err)
+		return err
+	}
+
+	return s.netServer.ConstructInitialSnapshot(ambientPods)
+}
+
+// Start starts the netserver.
+// ConstructInitialSnapshot should always be invoked before this function.
 func (s *meshDataplane) Start(ctx context.Context) {
 	s.netServer.Start(ctx)
 }
 
+// Stop terminates the netserver, flushes host ipsets, and removes host iptables healthprobe rules.
 func (s *meshDataplane) Stop(skipCleanup bool) {
 	// Remove host rules (or not) that allow pod healthchecks to work.
 	// These are not critical but if they are not in place pods that have
@@ -249,15 +232,6 @@ func (s *meshDataplane) Stop(skipCleanup bool) {
 	s.netServer.Stop(skipCleanup)
 }
 
-func (s *meshDataplane) ConstructInitialSnapshot(ambientPods []*corev1.Pod) error {
-	if err := s.syncHostIPSets(ambientPods); err != nil {
-		log.Errorf("failed to sync host IPset: %v", err)
-		return err
-	}
-
-	return s.netServer.ConstructInitialSnapshot(ambientPods)
-}
-
 func (s *meshDataplane) AddPodToMesh(ctx context.Context, pod *corev1.Pod, podIPs []netip.Addr, netNs string) error {
 	// Ordering is important in this func:
 	//
@@ -422,6 +396,20 @@ func (s *meshDataplane) addPodToHostNSIpset(pod *corev1.Pod, podIPs []netip.Addr
 	return addedIps, errors.Join(ipsetAddrErrs...)
 }
 
+// createHostsideProbeIpset creates an ipset. This is designed to be called from the host netns.
+// Note that if the ipset already exists by name, Create will not return an error.
+//
+// We will unconditionally flush our set before use here, so it shouldn't matter.
+func createHostsideProbeIpset(isV6 bool) (ipset.IPSet, error) {
+	linDeps := ipset.RealNlDeps()
+	probeSet, err := ipset.NewIPSet(iptables.ProbeIPSet, isV6, linDeps)
+	if err != nil {
+		return probeSet, err
+	}
+	probeSet.Flush()
+	return probeSet, nil
+}
+
 // removePodFromHostNSIpset will remove (v4, v6) pod IPs from the host IP set(s).
 // Note that unlike when we add the IP to the set, on removal we will simply
 // skip removing the IP if the IP matches, but the UID comment does not match our pod.
@@ -459,3 +447,22 @@ func pruneHostIPset(expected sets.Set[netip.Addr], hostsideProbeSet *ipset.IPSet
 	}
 	return nil
 }
+
+// buildKubeClient creates the kube client
+func buildKubeClient(kubeConfig string) (kube.Client, error) {
+	// Used by validation
+	kubeRestConfig, err := kube.DefaultRestConfig(kubeConfig, "", func(config *rest.Config) {
+		config.QPS = 80
+		config.Burst = 160
+	})
+	if err != nil {
+		return nil, fmt.Errorf("failed creating kube config: %v", err)
+	}
+
+	client, err := kube.NewClient(kube.NewClientConfigForRestConfig(kubeRestConfig), "")
+	if err != nil {
+		return nil, fmt.Errorf("failed creating kube client: %v", err)
+	}
+
+	return client, nil
+}
diff --git a/cni/pkg/util/podutil.go b/cni/pkg/util/podutil.go
index 8c23f7fd63cf..eb0cf8b099d1 100644
--- a/cni/pkg/util/podutil.go
+++ b/cni/pkg/util/podutil.go
@@ -18,6 +18,7 @@ import (
 	"context"
 	"fmt"
 	"net/netip"
+	"strconv"
 
 	corev1 "k8s.io/api/core/v1"
 	"k8s.io/apimachinery/pkg/api/errors"
@@ -141,3 +142,22 @@ func GetPodIPsIfPresent(pod *corev1.Pod) []netip.Addr {
 	}
 	return podIPs
 }
+
+// CheckBooleanAnnotation checks for the named boolean-style (as per strconv.ParseBool)
+// annotation on the pod. Returns (false, nil) if the annotation is not present, and
+// (false, non-nil error) if the value is unparsable. Otherwise, returns the parsed value.
+func CheckBooleanAnnotation(pod *corev1.Pod, annotationName string) (bool, error) {
+	val, isPresent := pod.Annotations[annotationName]
+
+	if !isPresent {
+		return false, nil
+	}
+
+	// Parse the annotation value as a boolean.
+	parsedVal, err := strconv.ParseBool(val)
+	if err != nil {
+		return false, fmt.Errorf("annotation %v=%q found, but only boolean values are supported", annotationName, val)
+	}
+
+	return parsedVal, nil
+}