Skip to content

Commit

Permalink
Mostly cosmetic tidies split out from istio#53906
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Leggett <[email protected]>
  • Loading branch information
bleggett committed Dec 20, 2024
1 parent aeb416a commit c4e7ebc
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 149 deletions.
222 changes: 116 additions & 106 deletions cni/pkg/nodeagent/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"errors"
"fmt"
"net/netip"
"strconv"
"strings"

corev1 "k8s.io/api/core/v1"
Expand All @@ -28,6 +27,7 @@ import (
"istio.io/api/annotation"
"istio.io/istio/cni/pkg/iptables"
"istio.io/istio/pkg/slices"
"istio.io/istio/cni/pkg/util"
dep "istio.io/istio/tools/istio-iptables/pkg/dependencies"
)

Expand All @@ -43,73 +43,43 @@ type NetServer struct {

var _ MeshDataplane = &NetServer{}

func newNetServer(ztunnelServer ZtunnelServer, podNsMap *podNetnsCache, podIptables *iptables.IptablesConfigurator, podNs PodNetnsFinder) *NetServer {
return &NetServer{
ztunnelServer: ztunnelServer,
currentPodSnapshot: podNsMap,
podNs: podNs,
podIptables: podIptables,
netnsRunner: NetnsDo,
// ConstructInitialSnapshot is always called first, before Start.
// It takes a "snapshot" of the ambient pods that were already running when the server started,
// and initializes an internal cache of pod info and netns handles with these existing pods.
//
// This cache will also be updated when the K8S informer gets a new pod.
// It represents the "state of the world" of all enrolled pods on the node this agent
// knows about, and will be sent to any connecting ztunnel as a startup message.
//
// A failure to build the snapshot is logged as a warning and also returned to the caller.
func (s *NetServer) ConstructInitialSnapshot(ambientPods []*corev1.Pod) error {
	var snapshotErrs []error

	// Index the pods by UID, which is the shape buildZtunnelSnapshot takes.
	uniquePods := slices.GroupUnique(ambientPods, (*corev1.Pod).GetUID)

	err := s.buildZtunnelSnapshot(uniquePods)
	if err != nil {
		log.Warnf("failed to construct initial ztunnel snapshot: %v", err)
		snapshotErrs = append(snapshotErrs, err)
	}

	return errors.Join(snapshotErrs...)
}

// Start starts the ztunnel connection listen server.
// ConstructInitialSnapshot should always be invoked before this function.
//
// The server's Run method is launched on its own goroutine and handed ctx,
// so Start itself returns without waiting for Run to finish.
func (s *NetServer) Start(ctx context.Context) {
log.Debug("starting ztunnel server")
// Run in the background; this call does not wait on it.
go s.ztunnelServer.Run(ctx)
}

// Stop stops the ztunnel connection listen server by calling Close on it.
// The boolean argument is ignored by this implementation.
func (s *NetServer) Stop(_ bool) {
log.Debug("stopping ztunnel server")
s.ztunnelServer.Close()
}

// rescanPod asks scanProcForPodsAndCache to look for this one pod only.
//
// This is needed when the pod was dynamically added to the mesh after it was created;
// in that case, we try finding the netns using procfs.
func (s *NetServer) rescanPod(pod *corev1.Pod) error {
	// Restrict the scan to just this pod, keyed by its UID.
	singlePod := make(map[types.UID]*corev1.Pod, 1)
	singlePod[pod.UID] = pod
	return s.scanProcForPodsAndCache(singlePod)
}

// getOrOpenNetns returns the netns for the pod.
//
// When netNs is non-empty it is passed to openNetns; when it is empty the
// netns is looked up via getNetns instead.
func (s *NetServer) getOrOpenNetns(pod *corev1.Pod, netNs string) (Netns, error) {
	if netNs != "" {
		return s.openNetns(pod, netNs)
	}
	return s.getNetns(pod)
}

// openNetns hands the pod and the supplied netNs string to the pod snapshot
// cache's UpsertPodCache and returns whatever Netns and error that call yields.
// NOTE(review): presumably netNs is a path to the pod's network namespace — confirm against callers.
func (s *NetServer) openNetns(pod *corev1.Pod, netNs string) (Netns, error) {
return s.currentPodSnapshot.UpsertPodCache(pod, netNs)
}

// getNetns returns the netns held in the pod snapshot cache for this pod's UID.
//
// On a cache miss it rescans for the pod via rescanPod and then checks the cache
// once more. If the rescan itself fails, that error is logged and returned. If the
// pod is still missing after the rescan, the returned error wraps ErrPodNotFound.
func (s *NetServer) getNetns(pod *corev1.Pod) (Netns, error) {
	podUID := string(pod.UID)

	if cached := s.currentPodSnapshot.Get(podUID); cached != nil {
		return cached, nil
	}

	log.Debug("pod netns was not found, trying to find it using procfs")
	// A miss can happen if the pod was dynamically added to the mesh after it was created.
	// In that case, try finding the netns using procfs.
	rescanErr := s.rescanPod(pod)
	if rescanErr != nil {
		log.Errorf("error scanning proc: error was %s", rescanErr)
		return nil, rescanErr
	}

	// Look again. We can still miss if the pod is in the process of being created;
	// in this case the CNI will be invoked soon and provide us with the netns.
	found := s.currentPodSnapshot.Get(podUID)
	if found == nil {
		return nil, fmt.Errorf("can't find netns for pod, this is ok if this is a newly created pod (%w)", ErrPodNotFound)
	}
	return found, nil
}

// AddPodToMesh adds a pod to mesh by
// 1. Getting the netns
// 1. Getting the netns (and making sure the netns is cached in the ztunnel state of the world snapshot)
// 2. Adding the pod's IPs to the hostnetns ipsets for node probe checks
// 3. Creating iptables rules inside the pod's netns
// 4. Notifying ztunnel via GRPC to create a proxy for the pod
// 4. Notifying the connected ztunnel via GRPC to create a proxy for the pod
//
// You may ask why we pass the pod IPs separately from the pod manifest itself (which contains the pod IPs as a field)
// - this is because during add specifically, if CNI plugins have not finished executing,
Expand Down Expand Up @@ -151,24 +121,100 @@ func (s *NetServer) AddPodToMesh(ctx context.Context, pod *corev1.Pod, podIPs []
return nil
}

func (s *NetServer) sendPodToZtunnelAndWaitForAck(ctx context.Context, pod *corev1.Pod, netns Netns) error {
return s.ztunnelServer.PodAdded(ctx, pod, netns)
// RemovePodFromMesh is called when a pod needs to be removed from the mesh.
//
// In order, it:
//   - Takes the pod's netns out of the cache/state of the world snapshot.
//   - If isDelete is false and a netns was found, steps into the pod netns to remove
//     the inpod iptables redirection rules.
//   - Informs the connected ztunnel that the pod no longer needs to be proxied.
//
// A failure to remove the inpod rules is returned immediately, before ztunnel is
// informed. A failure to inform ztunnel is logged and returned.
// NOTE(review): the early return means ztunnel is not told about the pod when rule
// deletion fails — confirm this is intended rather than continuing with cleanup.
func (s *NetServer) RemovePodFromMesh(ctx context.Context, pod *corev1.Pod, isDelete bool) error {
	podLog := log.WithLabels("ns", pod.Namespace, "name", pod.Name)
	podLog.WithLabels("delete", isDelete).Debugf("removing pod from the mesh")

	podUID := string(pod.UID)

	// Whether pod is already deleted or not, we need to let go of our netns ref.
	podNetns := s.currentPodSnapshot.Take(podUID)
	if podNetns == nil {
		podLog.Debug("failed to find pod netns during removal")
	}

	switch {
	case isDelete:
		// The pod is already deleted or terminated, so there is no pod network
		// to clean up -- only the host side.
	case podNetns == nil:
		podLog.Warn("pod netns already gone, not deleting inpod rules")
	default:
		// The pod is removed from the mesh, but is still running: remove iptables rules.
		podLog.Debugf("calling DeleteInpodRules")
		deleteRules := func() error { return s.podIptables.DeleteInpodRules(podLog) }
		if err := s.netnsRunner(podNetns, deleteRules); err != nil {
			return fmt.Errorf("failed to delete inpod rules: %w", err)
		}
	}

	podLog.Debug("removing pod from ztunnel")
	if err := s.ztunnelServer.PodDeleted(ctx, podUID); err != nil {
		podLog.Errorf("failed to delete pod from ztunnel: %v", err)
		return errors.Join(err)
	}
	return nil
}

// ConstructInitialSnapshot takes a "snapshot" of current ambient pods and
//
// 1. Constructs a ztunnel state message to initialize ztunnel
// 2. Syncs the host ipset
func (s *NetServer) ConstructInitialSnapshot(ambientPods []*corev1.Pod) error {
var consErr []error
// newNetServer builds a NetServer from the supplied ztunnel server, pod netns cache,
// in-pod iptables configurator and pod netns finder. The netns runner is always NetnsDo.
func newNetServer(ztunnelServer ZtunnelServer, podNsMap *podNetnsCache, podIptables *iptables.IptablesConfigurator, podNs PodNetnsFinder) *NetServer {
	server := new(NetServer)
	server.ztunnelServer = ztunnelServer
	server.currentPodSnapshot = podNsMap
	server.podNs = podNs
	server.podIptables = podIptables
	server.netnsRunner = NetnsDo
	return server
}

podsByUID := slices.GroupUnique(ambientPods, (*corev1.Pod).GetUID)
if err := s.buildZtunnelSnapshot(podsByUID); err != nil {
log.Warnf("failed to construct initial ztunnel snapshot: %v", err)
consErr = append(consErr, err)
// rescanPod asks scanProcForPodsAndCache to look for this one pod only.
//
// This is needed when the pod was dynamically added to the mesh after it was created;
// in that case, we try finding the netns using procfs.
func (s *NetServer) rescanPod(pod *corev1.Pod) error {
	// Restrict the scan to just this pod, keyed by its UID.
	singlePod := make(map[types.UID]*corev1.Pod, 1)
	singlePod[pod.UID] = pod
	return s.scanProcForPodsAndCache(singlePod)
}

return errors.Join(consErr...)
// getOrOpenNetns returns the netns for the pod.
//
// When netNs is non-empty it is passed to openNetns; when it is empty the
// netns is looked up via getNetns instead.
func (s *NetServer) getOrOpenNetns(pod *corev1.Pod, netNs string) (Netns, error) {
	if netNs != "" {
		return s.openNetns(pod, netNs)
	}
	return s.getNetns(pod)
}

// openNetns hands the pod and the supplied netNs string to the pod snapshot
// cache's UpsertPodCache and returns whatever Netns and error that call yields.
// NOTE(review): presumably netNs is a path to the pod's network namespace — confirm against callers.
func (s *NetServer) openNetns(pod *corev1.Pod, netNs string) (Netns, error) {
return s.currentPodSnapshot.UpsertPodCache(pod, netNs)
}

// getNetns returns the netns held in the pod snapshot cache for this pod's UID.
//
// On a cache miss it rescans for the pod via rescanPod and then checks the cache
// once more. If the rescan itself fails, that error is logged and returned. If the
// pod is still missing after the rescan, the returned error wraps ErrPodNotFound.
func (s *NetServer) getNetns(pod *corev1.Pod) (Netns, error) {
	podUID := string(pod.UID)

	if cached := s.currentPodSnapshot.Get(podUID); cached != nil {
		return cached, nil
	}

	log.Debug("pod netns was not found, trying to find it using procfs")
	// A miss can happen if the pod was dynamically added to the mesh after it was created.
	// In that case, try finding the netns using procfs.
	rescanErr := s.rescanPod(pod)
	if rescanErr != nil {
		log.Errorf("error scanning proc: error was %s", rescanErr)
		return nil, rescanErr
	}

	// Look again. We can still miss if the pod is in the process of being created;
	// in this case the CNI will be invoked soon and provide us with the netns.
	found := s.currentPodSnapshot.Get(podUID)
	if found == nil {
		return nil, fmt.Errorf("can't find netns for pod, this is ok if this is a newly created pod (%w)", ErrPodNotFound)
	}
	return found, nil
}

// sendPodToZtunnelAndWaitForAck forwards the pod and its netns to the ztunnel
// server's PodAdded and returns that call's error unchanged.
// NOTE(review): the name implies PodAdded blocks until ztunnel acknowledges the pod —
// confirm against the ZtunnelServer implementation.
func (s *NetServer) sendPodToZtunnelAndWaitForAck(ctx context.Context, pod *corev1.Pod, netns Netns) error {
return s.ztunnelServer.PodAdded(ctx, pod, netns)
}

func (s *NetServer) buildZtunnelSnapshot(ambientPodUIDs map[types.UID]*corev1.Pod) error {
Expand Down Expand Up @@ -199,12 +245,11 @@ func getPodLevelTrafficOverrides(pod *corev1.Pod) iptables.PodLevelOverrides {
// non-mesh traffic on inbound, and send to the mesh on outbound.
// Basically, this just disables inbound redirection.
podCfg := iptables.PodLevelOverrides{IngressMode: false}
if a, f := pod.Annotations[annotation.AmbientBypassInboundCapture.Name]; f {
var err error
podCfg.IngressMode, err = strconv.ParseBool(a)
if err != nil {
log.Warnf("annotation %v=%q found, but only '*' is supported", annotation.AmbientBypassInboundCapture.Name, a)
}

if ingressMode, err := util.CheckBooleanAnnotation(pod, annotation.AmbientBypassInboundCapture.Name); err == nil {
podCfg.IngressMode = ingressMode
} else {
log.Warn(err)
}

if virt, hasVirt := pod.Annotations[annotation.IoIstioRerouteVirtualInterfaces.Name]; hasVirt {
Expand Down Expand Up @@ -233,38 +278,3 @@ func realDependenciesInpod(useScopedLocks bool) *dep.RealDependencies {
NetworkNamespace: "",
}
}

// RemovePodFromMesh is called when a pod needs to be removed from the mesh.
//
// In order, it:
//   - Takes the pod's netns out of the cache/state of the world snapshot.
//   - If isDelete is false and a netns was found, steps into the pod netns to remove
//     the inpod iptables redirection rules.
//   - Informs the connected ztunnel that the pod no longer needs to be proxied.
//
// A failure to remove the inpod rules is returned immediately, before ztunnel is
// informed. A failure to inform ztunnel is logged and returned.
// NOTE(review): the early return means ztunnel is not told about the pod when rule
// deletion fails — confirm this is intended rather than continuing with cleanup.
func (s *NetServer) RemovePodFromMesh(ctx context.Context, pod *corev1.Pod, isDelete bool) error {
	podLog := log.WithLabels("ns", pod.Namespace, "name", pod.Name)
	podLog.WithLabels("delete", isDelete).Debugf("removing pod from the mesh")

	podUID := string(pod.UID)

	// Whether pod is already deleted or not, we need to let go of our netns ref.
	podNetns := s.currentPodSnapshot.Take(podUID)
	if podNetns == nil {
		podLog.Debug("failed to find pod netns during removal")
	}

	switch {
	case isDelete:
		// The pod is already deleted or terminated, so there is no pod network
		// to clean up -- only the host side.
	case podNetns == nil:
		podLog.Warn("pod netns already gone, not deleting inpod rules")
	default:
		// The pod is removed from the mesh, but is still running: remove iptables rules.
		podLog.Debugf("calling DeleteInpodRules")
		deleteRules := func() error { return s.podIptables.DeleteInpodRules(podLog) }
		if err := s.netnsRunner(podNetns, deleteRules); err != nil {
			return fmt.Errorf("failed to delete inpod rules: %w", err)
		}
	}

	podLog.Debug("removing pod from ztunnel")
	if err := s.ztunnelServer.PodDeleted(ctx, podUID); err != nil {
		podLog.Errorf("failed to delete pod from ztunnel: %v", err)
		return errors.Join(err)
	}
	return nil
}
Loading

0 comments on commit c4e7ebc

Please sign in to comment.