Skip to content

Commit

Permalink
Support reconciling existing ambient pod iptables rules on agent startup
Browse files Browse the repository at this point in the history
Signed-off-by: Benjamin Leggett <[email protected]>
  • Loading branch information
bleggett committed Nov 14, 2024
1 parent 4b6981d commit 23143de
Show file tree
Hide file tree
Showing 4 changed files with 249 additions and 168 deletions.
6 changes: 6 additions & 0 deletions cni/pkg/iptables/iptables.go
Original file line number Diff line number Diff line change
Expand Up @@ -700,3 +700,9 @@ func (cfg *IptablesConfigurator) AppendHostRules(hostSNATIP, hostSNATIPV6 netip.

return iptablesBuilder
}

// ReconcileModeEnabled reports whether this particular iptables configurator
// supports/has idempotent execution enabled - i.e. reconcile mode.
// The value is read directly from the configurator's Reconcile config field.
func (cfg *IptablesConfigurator) ReconcileModeEnabled() bool {
return cfg.cfg.Reconcile
}
268 changes: 159 additions & 109 deletions cni/pkg/nodeagent/net.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,13 @@ import (
"errors"
"fmt"
"net/netip"
"strconv"

corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/types"

"istio.io/api/annotation"
"istio.io/istio/cni/pkg/iptables"
"istio.io/istio/cni/pkg/util"
"istio.io/istio/pkg/slices"
dep "istio.io/istio/tools/istio-iptables/pkg/dependencies"
)
Expand All @@ -42,73 +42,59 @@ type NetServer struct {

var _ MeshDataplane = &NetServer{}

func newNetServer(ztunnelServer ZtunnelServer, podNsMap *podNetnsCache, podIptables *iptables.IptablesConfigurator, podNs PodNetnsFinder) *NetServer {
return &NetServer{
ztunnelServer: ztunnelServer,
currentPodSnapshot: podNsMap,
podNs: podNs,
podIptables: podIptables,
netnsRunner: NetnsDo,
// ConstructInitialSnapshot is always called first, before Start.
// It takes a "snapshot" of ambient pods that were already running when the server started, and:
//
//   - initializes an internal cache of pod info and netns handles with these existing pods.
//     This cache will also be updated when the K8S informer gets a new pod.
//     This cache represents the "state of the world" of all enrolled pods on the node this agent
//     knows about, and will be sent to any connecting ztunnel as a startup message.
//
//   - For each of these existing snapshotted pods, steps into their netNS, and reconciles their
//     existing iptables rules against the expected set of rules. This is used to handle reconciling
//     iptables rule drift/changes between versions. This step only runs when the pod iptables
//     configurator reports that reconcile mode is enabled.
//
// Failures do not abort the remaining work: a snapshot failure and every per-pod reconcile
// failure are logged and collected, and the result is the errors.Join of everything collected
// (nil when nothing failed).
func (s *NetServer) ConstructInitialSnapshot(existingAmbientPods []*corev1.Pod) error {
	var consErr []error

	podsByUID := slices.GroupUnique(existingAmbientPods, (*corev1.Pod).GetUID)
	if err := s.buildZtunnelSnapshot(podsByUID); err != nil {
		log.Warnf("failed to construct initial ztunnel snapshot: %v", err)
		consErr = append(consErr, err)
	}

	if !s.podIptables.ReconcileModeEnabled() {
		return errors.Join(consErr...)
	}

	for _, pod := range existingAmbientPods {
		log := log.WithLabels("ns", pod.Namespace, "name", pod.Name)
		log.Debug("upgrading and reconciling inpod rules for already-running pod if necessary")
		if err := s.reconcileExistingPod(pod); err != nil {
			//nolint: lll
			log.Errorf("failed to reconcile inpod rules for %s/%s, try restarting the pod, or removing and re-adding it to the mesh: %v", pod.Namespace, pod.Name, err)
			// Only real failures are recorded; successful pods contribute nothing to the joined error.
			consErr = append(consErr, err)
		}
	}
	return errors.Join(consErr...)
}

// Start starts the ztunnel connection listen server.
// ConstructInitialSnapshot should always be invoked before this function.
//
// The ztunnel server's Run is launched in its own goroutine with ctx, so Start
// returns without waiting for Run to finish.
func (s *NetServer) Start(ctx context.Context) {
log.Debug("starting ztunnel server")
go s.ztunnelServer.Run(ctx)
}

// Stop stops the ztunnel connection listen server by calling Close on it.
// Any value returned by Close is not inspected here.
func (s *NetServer) Stop() {
log.Debug("stopping ztunnel server")
s.ztunnelServer.Close()
}

// rescanPod asks the proc scanner to look for the given pod and cache what it finds.
//
// This is needed when the pod was dynamically added to the mesh after it was created;
// in that case, we try finding the netns using procfs.
func (s *NetServer) rescanPod(pod *corev1.Pod) error {
	// One-entry UID -> pod map handed to the scanner as its filter.
	singlePod := make(map[types.UID]*corev1.Pod, 1)
	singlePod[pod.UID] = pod
	return s.scanProcForPodsAndCache(singlePod)
}

// getOrOpenNetns resolves the netns handle for pod.
//
// When the caller supplies a non-empty netNs string, it is opened via openNetns;
// otherwise the handle is looked up via getNetns.
func (s *NetServer) getOrOpenNetns(pod *corev1.Pod, netNs string) (Netns, error) {
	if netNs != "" {
		return s.openNetns(pod, netNs)
	}
	return s.getNetns(pod)
}

// openNetns upserts the pod, together with the given netNs string, into the
// current pod snapshot cache and returns the Netns and error that the cache reports.
func (s *NetServer) openNetns(pod *corev1.Pod, netNs string) (Netns, error) {
return s.currentPodSnapshot.UpsertPodCache(pod, netNs)
}

// getNetns returns the netns handle for pod, looking first in the pod snapshot cache
// and, on a miss, rescanning for the pod and then checking the cache once more.
//
// It returns the rescan error if the rescan fails, and an error wrapping ErrPodNotFound
// if the cache still has no entry for the pod after a successful rescan.
func (s *NetServer) getNetns(pod *corev1.Pod) (Netns, error) {
	podUID := string(pod.UID)
	if cached := s.currentPodSnapshot.Get(podUID); cached != nil {
		return cached, nil
	}

	log.Debug("pod netns was not found, trying to find it using procfs")
	// A miss can happen if the pod was dynamically added to the mesh after it was created.
	// In that case, try finding the netns using procfs.
	err := s.rescanPod(pod)
	if err != nil {
		log.Errorf("error scanning proc: error was %s", err)
		return nil, err
	}

	// Check the cache again. We can still come up empty if the pod is in the process of being
	// created; in this case the CNI will be invoked soon and provide us with the netns.
	if rescanned := s.currentPodSnapshot.Get(podUID); rescanned != nil {
		return rescanned, nil
	}
	return nil, fmt.Errorf("can't find netns for pod, this is ok if this is a newly created pod (%w)", ErrPodNotFound)
}

// AddPodToMesh adds a pod to mesh by
// 1. Getting the netns
// 1. Getting the netns (and making sure the netns is cached in the ztunnel state of the world snapshot)
// 2. Adding the pod's IPs to the hostnetns ipsets for node probe checks
// 3. Creating iptables rules inside the pod's netns
// 4. Notifying ztunnel via GRPC to create a proxy for the pod
// 4. Notifying the connected ztunnel via GRPC to create a proxy for the pod
//
// You may ask why we pass the pod IPs separately from the pod manifest itself (which contains the pod IPs as a field)
// - this is because during add specifically, if CNI plugins have not finished executing,
Expand All @@ -119,7 +105,7 @@ func (s *NetServer) getNetns(pod *corev1.Pod) (Netns, error) {
// we actually may have them before K8S in the Pod object.
func (s *NetServer) AddPodToMesh(ctx context.Context, pod *corev1.Pod, podIPs []netip.Addr, netNs string) error {
log := log.WithLabels("ns", pod.Namespace, "name", pod.Name)
log.Infof("adding pod to the mesh")
log.Info("adding pod to the mesh")
// make sure the cache is aware of the pod, even if we don't have the netns yet.
s.currentPodSnapshot.Ensure(string(pod.UID))
openNetns, err := s.getOrOpenNetns(pod, netNs)
Expand All @@ -130,21 +116,16 @@ func (s *NetServer) AddPodToMesh(ctx context.Context, pod *corev1.Pod, podIPs []
// If true, the pod will run in 'ingress mode'. This is intended to be used for "ingress" type workloads which handle
// non-mesh traffic on inbound, and send to the mesh on outbound.
// Basically, this just disables inbound redirection.
// We use the SidecarTrafficExcludeInboundPorts annotation for compatibility (its somewhat widely used) but don't support all values.
ingressMode := false
if a, f := pod.Annotations[annotation.AmbientBypassInboundCapture.Name]; f {
var err error
ingressMode, err = strconv.ParseBool(a)
if err != nil {
log.Warnf("annotation %v=%q found, but only '*' is supported", annotation.AmbientBypassInboundCapture.Name, a)
}
ingressMode, err := util.CheckBooleanAnnotation(pod, annotation.AmbientBypassInboundCapture.Name)
if err != nil {
log.Warn(err)
}

log.Debug("calling CreateInpodRules")
if err := s.netnsRunner(openNetns, func() error {
return s.podIptables.CreateInpodRules(log, HostProbeSNATIP, HostProbeSNATIPV6, ingressMode)
}); err != nil {
log.Errorf("failed to update POD inpod: %s/%s %v", pod.Namespace, pod.Name, err)
log.Errorf("failed to inject pod %s/%s with inpod rules: %v", pod.Namespace, pod.Name, err)
return err
}

Expand All @@ -161,24 +142,128 @@ func (s *NetServer) AddPodToMesh(ctx context.Context, pod *corev1.Pod, podIPs []
return nil
}

func (s *NetServer) sendPodToZtunnelAndWaitForAck(ctx context.Context, pod *corev1.Pod, netns Netns) error {
return s.ztunnelServer.PodAdded(ctx, pod, netns)
// RemovePodFromMesh is called when a pod needs to be removed from the mesh.
//
// In order, it:
//   - Takes the pod's netns handle out of the cache/state of the world snapshot.
//   - Unless isDelete is set, steps into the pod netns (when a handle was found) to remove
//     the inpod iptables redirection rules.
//   - Informs the connected ztunnel that the pod no longer needs to be proxied.
//
// A failure to delete the inpod rules is returned immediately, before ztunnel is informed.
// A failure reported by ztunnel is logged and returned via errors.Join.
func (s *NetServer) RemovePodFromMesh(ctx context.Context, pod *corev1.Pod, isDelete bool) error {
	podLog := log.WithLabels("ns", pod.Namespace, "name", pod.Name)
	podLog.WithLabels("delete", isDelete).Debugf("removing pod from the mesh")

	podUID := string(pod.UID)

	// Whether pod is already deleted or not, we need to let go of our netns ref.
	openNetns := s.currentPodSnapshot.Take(podUID)
	if openNetns == nil {
		podLog.Debug("failed to find pod netns during removal")
	}

	switch {
	case isDelete:
		// The pod is already deleted or terminated, so we do not need to clean up
		// the pod network -- only the host side.
	case openNetns == nil:
		podLog.Warn("pod netns already gone, not deleting inpod rules")
	default:
		// pod is removed from the mesh, but is still running. remove iptables rules
		podLog.Debugf("calling DeleteInpodRules")
		deleteRules := func() error { return s.podIptables.DeleteInpodRules() }
		if err := s.netnsRunner(openNetns, deleteRules); err != nil {
			return fmt.Errorf("failed to delete inpod rules: %w", err)
		}
	}

	// Errors from the remaining cleanup are aggregated rather than returned directly.
	var errs []error

	podLog.Debug("removing pod from ztunnel")
	if err := s.ztunnelServer.PodDeleted(ctx, podUID); err != nil {
		podLog.Errorf("failed to delete pod from ztunnel: %v", err)
		errs = append(errs, err)
	}
	return errors.Join(errs...)
}

// ConstructInitialSnapshot takes a "snapshot" of current ambient pods and

// newNetServer builds a NetServer from the given ztunnel server, pod netns cache,
// in-pod iptables configurator and pod netns finder. The netns runner is always
// set to NetnsDo.
func newNetServer(ztunnelServer ZtunnelServer, podNsMap *podNetnsCache, podIptables *iptables.IptablesConfigurator, podNs PodNetnsFinder) *NetServer {
	server := new(NetServer)
	server.ztunnelServer = ztunnelServer
	server.currentPodSnapshot = podNsMap
	server.podNs = podNs
	server.podIptables = podIptables
	server.netnsRunner = NetnsDo
	return server
}

// reconcileExistingPod is intended to run on node agent startup, for each pod that was already enrolled prior to startup.
// Will reconcile any in-pod iptables rules the pod may already have against this node agent's expected/required in-pod iptables rules.
//
// 1. Constructs a ztunnel state message to initialize ztunnel
// 2. Syncs the host ipset
func (s *NetServer) ConstructInitialSnapshot(ambientPods []*corev1.Pod) error {
var consErr []error
// This is used to handle upgrades and such. Note that this call should be idempotent for any pod already in the mesh,
// but should never need to be invoked outside of node agent startup.
func (s *NetServer) reconcileExistingPod(pod *corev1.Pod) error {
openNetns, err := s.getNetns(pod)
if err != nil {
return err
}

podsByUID := slices.GroupUnique(ambientPods, (*corev1.Pod).GetUID)
if err := s.buildZtunnelSnapshot(podsByUID); err != nil {
log.Warnf("failed to construct initial ztunnel snapshot: %v", err)
consErr = append(consErr, err)
// If true, the pod will run in 'ingress mode'. This is intended to be used for "ingress" type workloads which handle
// non-mesh traffic on inbound, and send to the mesh on outbound.
// Basically, this just disables inbound redirection.
ingressMode, err := util.CheckBooleanAnnotation(pod, annotation.AmbientBypassInboundCapture.Name)
if err != nil {
log.Warn(err)
}

return errors.Join(consErr...)
if err := s.netnsRunner(openNetns, func() error {
return s.podIptables.CreateInpodRules(log, HostProbeSNATIP, HostProbeSNATIPV6, ingressMode)
}); err != nil {
return err
}

return nil
}

// rescanPod asks the proc scanner to look for the given pod and cache what it finds.
//
// This is needed when the pod was dynamically added to the mesh after it was created;
// in that case, we try finding the netns using procfs.
func (s *NetServer) rescanPod(pod *corev1.Pod) error {
	// One-entry UID -> pod map handed to the scanner as its filter.
	singlePod := make(map[types.UID]*corev1.Pod, 1)
	singlePod[pod.UID] = pod
	return s.scanProcForPodsAndCache(singlePod)
}

// getOrOpenNetns resolves the netns handle for pod.
//
// When the caller supplies a non-empty netNs string, it is opened via openNetns;
// otherwise the handle is looked up via getNetns.
func (s *NetServer) getOrOpenNetns(pod *corev1.Pod, netNs string) (Netns, error) {
	if netNs != "" {
		return s.openNetns(pod, netNs)
	}
	return s.getNetns(pod)
}

// openNetns upserts the pod, together with the given netNs string, into the
// current pod snapshot cache and returns the Netns and error that the cache reports.
func (s *NetServer) openNetns(pod *corev1.Pod, netNs string) (Netns, error) {
return s.currentPodSnapshot.UpsertPodCache(pod, netNs)
}

// getNetns returns the netns handle for pod, looking first in the pod snapshot cache
// and, on a miss, rescanning for the pod and then checking the cache once more.
//
// It returns the rescan error if the rescan fails, and an error wrapping ErrPodNotFound
// if the cache still has no entry for the pod after a successful rescan.
func (s *NetServer) getNetns(pod *corev1.Pod) (Netns, error) {
	podUID := string(pod.UID)
	if cached := s.currentPodSnapshot.Get(podUID); cached != nil {
		return cached, nil
	}

	log.Debug("pod netns was not found, trying to find it using procfs")
	// A miss can happen if the pod was dynamically added to the mesh after it was created.
	// In that case, try finding the netns using procfs.
	err := s.rescanPod(pod)
	if err != nil {
		log.Errorf("error scanning proc: error was %s", err)
		return nil, err
	}

	// Check the cache again. We can still come up empty if the pod is in the process of being
	// created; in this case the CNI will be invoked soon and provide us with the netns.
	if rescanned := s.currentPodSnapshot.Get(podUID); rescanned != nil {
		return rescanned, nil
	}
	return nil, fmt.Errorf("can't find netns for pod, this is ok if this is a newly created pod (%w)", ErrPodNotFound)
}

// sendPodToZtunnelAndWaitForAck forwards the pod and its netns to the ztunnel
// server's PodAdded and returns whatever error PodAdded reports.
// NOTE(review): whether this blocks until ztunnel acknowledges depends on the
// PodAdded implementation, which is not visible here - confirm.
func (s *NetServer) sendPodToZtunnelAndWaitForAck(ctx context.Context, pod *corev1.Pod, netns Netns) error {
return s.ztunnelServer.PodAdded(ctx, pod, netns)
}

func (s *NetServer) buildZtunnelSnapshot(ambientPodUIDs map[types.UID]*corev1.Pod) error {
Expand Down Expand Up @@ -219,38 +304,3 @@ func realDependenciesInpod() *dep.RealDependencies {
NetworkNamespace: "",
}
}

// RemovePodFromMesh is called when a pod needs to be removed from the mesh.
//
// In order, it:
//   - Takes the pod's netns handle out of the cache/state of the world snapshot.
//   - Unless isDelete is set, steps into the pod netns (when a handle was found) to remove
//     the inpod iptables redirection rules.
//   - Informs the connected ztunnel that the pod no longer needs to be proxied.
//
// A failure to delete the inpod rules is returned immediately, before ztunnel is informed.
// A failure reported by ztunnel is logged and returned via errors.Join.
func (s *NetServer) RemovePodFromMesh(ctx context.Context, pod *corev1.Pod, isDelete bool) error {
	podLog := log.WithLabels("ns", pod.Namespace, "name", pod.Name)
	podLog.WithLabels("delete", isDelete).Debugf("removing pod from the mesh")

	podUID := string(pod.UID)

	// Whether pod is already deleted or not, we need to let go of our netns ref.
	openNetns := s.currentPodSnapshot.Take(podUID)
	if openNetns == nil {
		podLog.Debug("failed to find pod netns during removal")
	}

	switch {
	case isDelete:
		// The pod is already deleted or terminated, so we do not need to clean up
		// the pod network -- only the host side.
	case openNetns == nil:
		podLog.Warn("pod netns already gone, not deleting inpod rules")
	default:
		// pod is removed from the mesh, but is still running. remove iptables rules
		podLog.Debugf("calling DeleteInpodRules")
		deleteRules := func() error { return s.podIptables.DeleteInpodRules() }
		if err := s.netnsRunner(openNetns, deleteRules); err != nil {
			return fmt.Errorf("failed to delete inpod rules: %w", err)
		}
	}

	// Errors from the remaining cleanup are aggregated rather than returned directly.
	var errs []error

	podLog.Debug("removing pod from ztunnel")
	if err := s.ztunnelServer.PodDeleted(ctx, podUID); err != nil {
		podLog.Errorf("failed to delete pod from ztunnel: %v", err)
		errs = append(errs, err)
	}
	return errors.Join(errs...)
}
Loading

0 comments on commit 23143de

Please sign in to comment.