Skip to content

Commit

Permalink
Merge pull request #280 from cybozu-go/fix-to-check-valid-egress-client
Browse files Browse the repository at this point in the history
Fix to check that egress_watcher picks a valid client
  • Loading branch information
terassyi authored Mar 29, 2024
2 parents 8776f98 + 1955ed2 commit b8e3a99
Show file tree
Hide file tree
Showing 3 changed files with 121 additions and 41 deletions.
27 changes: 23 additions & 4 deletions v2/controllers/egress_watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,8 +55,27 @@ func (r *EgressWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
return ctrl.Result{}, err
}

for _, pod := range pods.Items {
for k, v := range pod.Annotations {
targetPods := make(map[string]*corev1.Pod)
for i := range pods.Items {
pod := &pods.Items[i]
if pod.Spec.HostNetwork {
// Pods in host network cannot use egress NAT.
// So skip it.
continue
}
podIp := pod.Status.PodIP
// The reconciliation should be triggered only for running pods.
if pod.Status.Phase == corev1.PodRunning {
if _, found := targetPods[podIp]; found {
// multiple running pods have the same address.
return ctrl.Result{}, fmt.Errorf("multiple pods have the same address: %s", podIp)
}
targetPods[podIp] = pod
}
}

for _, targetPod := range targetPods {
for k, v := range targetPod.Annotations {
if !strings.HasPrefix(k, constants.AnnEgressPrefix) {
continue
}
Expand All @@ -68,7 +87,7 @@ func (r *EgressWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R
// shortcut for the most typical case
if v == eg.Name {
// Do reconcile
if err := r.reconcileEgressClient(ctx, eg, &pod, &logger); err != nil {
if err := r.reconcileEgressClient(ctx, eg, targetPod, &logger); err != nil {
logger.Error(err, "failed to reconcile Egress client pod")
return ctrl.Result{}, err
}
Expand All @@ -77,7 +96,7 @@ func (r *EgressWatcher) Reconcile(ctx context.Context, req ctrl.Request) (ctrl.R

for _, n := range strings.Split(v, ",") {
if n == eg.Name {
if err := r.reconcileEgressClient(ctx, eg, &pod, &logger); err != nil {
if err := r.reconcileEgressClient(ctx, eg, targetPod, &logger); err != nil {
logger.Error(err, "failed to reconcile Egress client pod")
return ctrl.Result{}, err
}
Expand Down
112 changes: 86 additions & 26 deletions v2/controllers/egress_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,8 @@ package controllers

import (
"context"
"fmt"
"net"
"reflect"
"sync"
"time"

Expand All @@ -21,7 +21,7 @@ import (

var _ = Describe("Egress watcher", func() {
ctx := context.Background()
podNetwork := &mockPodNetwork{ips: make(map[string]bool)}
podNetwork := &mockPodNetwork{ips: make(map[string]int)}
var cancel context.CancelFunc

BeforeEach(func() {
Expand All @@ -43,13 +43,16 @@ var _ = Describe("Egress watcher", func() {

makePod("pod1", []string{"10.1.1.2", "fd01::2"}, map[string]string{
"default": "egress1",
})
}, corev1.PodRunning)
makePod("pod2", []string{"10.1.1.3", "fd01::3"}, map[string]string{
"default": "egress2",
})
}, corev1.PodRunning)
makePod("pod3", []string{"10.1.1.4", "fd01::4"}, map[string]string{
"internet": "egress1",
})
}, corev1.PodRunning)
makePod("pod4", []string{"10.1.1.5", "fd01::5"}, map[string]string{
"default": "egress1",
}, corev1.PodSucceeded)

ctx, cancel = context.WithCancel(context.TODO())
mgr, err := ctrl.NewManager(cfg, ctrl.Options{
Expand Down Expand Up @@ -97,26 +100,72 @@ var _ = Describe("Egress watcher", func() {
Expect(err).ToNot(HaveOccurred())

// pod1 is a client of default/egress1, but pod2 and pod3 are the clients of other egresses
Eventually(func() bool {
return reflect.DeepEqual(podNetwork.updatedPodIPs(), map[string]bool{
"10.1.1.2": true,
"fd01::2": true,
})
}).Should(BeTrue())

Consistently(func() bool {
return reflect.DeepEqual(podNetwork.updatedPodIPs(), map[string]bool{
"10.1.1.2": true,
"fd01::2": true,
})
}, 5*time.Second, 1*time.Second).Should(BeTrue())
})
// pod4 is not the target of the reconciliation because pod4 is not running.
Eventually(func() error {
updated := podNetwork.getPodIPCount()
updatedIPv4, ok := updated["10.1.1.2"]
if !ok || updatedIPv4 != 1 {
return fmt.Errorf("pod1's IPv4 address is not updated")
}
updatedIPv6, ok := updated["fd01::2"]
if !ok || updatedIPv6 != 1 {
return fmt.Errorf("pod1's IPv4 address is not updated")
}
return nil
}).Should(Succeed())

Consistently(func() error {
updated := podNetwork.getPodIPCount()
updatedIPv4, ok := updated["10.1.1.2"]
if !ok || updatedIPv4 != 1 {
return fmt.Errorf("pod1's IPv4 address is not updated")
}
updatedIPv6, ok := updated["fd01::2"]
if !ok || updatedIPv6 != 1 {
return fmt.Errorf("pod1's IPv4 address is not updated")
}
return nil
}, 5*time.Second, 1*time.Second).Should(Succeed())

makePod("pod5", []string{"10.1.1.3", "fd01::3"}, map[string]string{
"default": "egress2",
}, corev1.PodRunning)

eg2 := makeEgress("egress2")
err = k8sClient.Create(ctx, eg2)
Expect(err).ShouldNot(HaveOccurred())

svc2 := &corev1.Service{}
svc2.Namespace = "default"
svc2.Name = "egress2"
svc2.Spec.Type = corev1.ServiceTypeClusterIP
svc2.Spec.Ports = []corev1.ServicePort{{
Port: 5555,
TargetPort: intstr.FromInt(5555),
Protocol: corev1.ProtocolUDP,
}}
err = k8sClient.Create(ctx, svc2)
Expect(err).ShouldNot(HaveOccurred())

// Reconciliation should fail because of duplicate address
Consistently(func() error {
updated := podNetwork.getPodIPCount()
_, ok := updated["10.1.1.3"]
if ok {
return fmt.Errorf("pod2 and pod5 should not be updated")
}
_, ok = updated["fd01::3"]
if ok {
return fmt.Errorf("pod2 and pod5 should not be updated")
}
return nil
}, 5*time.Second, 1*time.Second).Should(Succeed())
})
})

// mockPodNetwork is a test double for the pod-network interface.
// It records how many times Update was called in total (nUpdate) and how
// many times each individual pod IP address was updated (ips), so tests can
// assert that reconciliation touched exactly the expected addresses.
type mockPodNetwork struct {
	nUpdate int            // total number of Update calls
	ips     map[string]int // per-address update count

	mu sync.Mutex // guards nUpdate and ips
}
Expand All @@ -137,20 +186,31 @@ func (p *mockPodNetwork) Update(podIPv4, podIPv6 net.IP, hook nodenet.SetupHook)
defer p.mu.Unlock()

p.nUpdate++
p.ips[podIPv4.String()] = true
p.ips[podIPv6.String()] = true
c4, ok := p.ips[podIPv4.String()]
if !ok {
p.ips[podIPv4.String()] = 1
} else {
c4 += 1
}
c6, ok := p.ips[podIPv6.String()]
if !ok {
p.ips[podIPv6.String()] = 1
} else {
c6 += 1
}
return nil
}

func (p *mockPodNetwork) updatedPodIPs() map[string]bool {
m := make(map[string]bool)
func (p *mockPodNetwork) getPodIPCount() map[string]int {
m := make(map[string]int)

p.mu.Lock()
defer p.mu.Unlock()

for k := range p.ips {
m[k] = true
for k, v := range p.ips {
m[k] = v
}

return m
}

Expand Down
23 changes: 12 additions & 11 deletions v2/controllers/pod_watcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ func makePodWithHostNetwork(name string, ips []string, egresses map[string]strin
ExpectWithOffset(1, err).ShouldNot(HaveOccurred())
}

func makePod(name string, ips []string, egresses map[string]string) {
func makePod(name string, ips []string, egresses map[string]string, phase corev1.PodPhase) {
pod := &corev1.Pod{}
pod.Name = name
pod.Namespace = "default"
Expand All @@ -62,6 +62,7 @@ func makePod(name string, ips []string, egresses map[string]string) {
podIPs[i] = corev1.PodIP{IP: ip}
}
pod.Status.PodIPs = podIPs
pod.Status.Phase = phase
err = k8sClient.Status().Update(context.Background(), pod)
ExpectWithOffset(1, err).ShouldNot(HaveOccurred())
}
Expand All @@ -82,17 +83,17 @@ var _ = Describe("Pod watcher", func() {
var eg *mockEgress

BeforeEach(func() {
makePod("pod1", []string{"10.1.1.1", "fd01::1"}, nil)
makePod("pod1", []string{"10.1.1.1", "fd01::1"}, nil, corev1.PodRunning)
makePod("pod2", []string{"10.1.1.2", "fd01::2"}, map[string]string{
"internet": "egress2",
"external": "egress1,egress2",
})
}, corev1.PodRunning)
makePod("pod3", []string{"fd01::3"}, map[string]string{
"internet": "egress1,egress2",
})
}, corev1.PodRunning)
makePod("pod4", []string{"fd01::4"}, map[string]string{
"external": "egress1",
})
}, corev1.PodRunning)

ctx, cancel = context.WithCancel(context.TODO())
ft = &mockFoUTunnel{peers: make(map[string]bool)}
Expand Down Expand Up @@ -144,10 +145,10 @@ var _ = Describe("Pod watcher", func() {
})

It("should handle new Pods", func() {
makePod("pod5", []string{"10.1.1.5"}, nil)
makePod("pod5", []string{"10.1.1.5"}, nil, corev1.PodRunning)
makePod("pod6", []string{"10.1.1.6"}, map[string]string{
"internet": "egress2",
})
}, corev1.PodRunning)
Eventually(func() bool {
return reflect.DeepEqual(ft.GetPeers(), map[string]bool{
"10.1.1.2": true,
Expand Down Expand Up @@ -295,7 +296,7 @@ var _ = Describe("Pod watcher", func() {
It("should not delete a peer that another pod is reusing", func() {
makePod("job", []string{"10.1.1.5"}, map[string]string{
"internet": "egress2",
})
}, corev1.PodRunning)

Eventually(func() bool {
return reflect.DeepEqual(ft.GetPeers(), map[string]bool{
Expand Down Expand Up @@ -325,7 +326,7 @@ var _ = Describe("Pod watcher", func() {
// another pod reuses the same ip
makePod("another", []string{"10.1.1.5"}, map[string]string{
"internet": "egress2",
})
}, corev1.PodRunning)

jobPod = &corev1.Pod{}
err = k8sClient.Get(ctx, client.ObjectKey{Namespace: "default", Name: "job"}, jobPod)
Expand All @@ -347,7 +348,7 @@ var _ = Describe("Pod watcher", func() {
It("should not delete a peer when another pod accidentally hits the same one", func() {
makePod("job", []string{"10.1.1.5"}, map[string]string{
"internet": "egress2",
})
}, corev1.PodRunning)

Eventually(func() bool {
return reflect.DeepEqual(ft.GetPeers(), map[string]bool{
Expand All @@ -361,7 +362,7 @@ var _ = Describe("Pod watcher", func() {
// another pod hits the same ip and trigger the adding pod event
makePod("another", []string{"10.1.1.5"}, map[string]string{
"internet": "egress2",
})
}, corev1.PodRunning)

jobPod := &corev1.Pod{}
err := k8sClient.Get(ctx, client.ObjectKey{Namespace: "default", Name: "job"}, jobPod)
Expand Down

0 comments on commit b8e3a99

Please sign in to comment.