Merge branch 'devel'
bilalcaliskan committed Jan 15, 2022
2 parents e284e0e + de66e36 commit e5c153d
Showing 7 changed files with 192 additions and 85 deletions.
9 changes: 5 additions & 4 deletions internal/k8s/informers.go
@@ -33,17 +33,18 @@ func RunPodInformer(clientSet kubernetes.Interface) {
 	varnishLabelKey := strings.Split(opts.VarnishLabel, "=")[0]
 	varnishLabelValue := strings.Split(opts.VarnishLabel, "=")[1]
 
-	informerFactory := informers.NewSharedInformerFactory(clientSet, time.Second*30)
-	podInformer := informerFactory.Core().V1().Pods()
-	podInformer.Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
+	informerFactory := informers.NewSharedInformerFactory(clientSet, 30*time.Second)
+	podInformer := informerFactory.Core().V1().Pods().Informer()
+	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
 		AddFunc: func(obj interface{}) {
 			pod := obj.(*v1.Pod)
 			labels := pod.GetLabels()
 			for key, value := range labels {
 				if key == varnishLabelKey && value == varnishLabelValue && pod.Namespace == opts.VarnishNamespace {
 					if pod.Status.PodIP != "" {
 						podUrl := fmt.Sprintf(PodUrl, pod.Status.PodIP, pod.Spec.Containers[0].Ports[0].ContainerPort)
-						logger.Info("adding pod url to the varnishPods slice", zap.String("podUrl", podUrl))
+						logger.Info("adding pod url to the varnishPods slice", zap.String("pod", pod.Name),
+							zap.String("namespace", pod.Namespace), zap.String("podUrl", podUrl))
 						addVarnishPod(&options.VarnishInstances, &podUrl)
 					} else {
 						logger.Warn("varnish pod does not have an ip address yet, skipping add operation",
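
The hunk above drops the extra PodInformer wrapper, grabs the SharedIndexInformer once, and attaches the event handlers to it directly. A minimal client-go sketch of that pattern, assuming a caller-supplied stop channel; the package name and handler bodies are illustrative, not taken from this repository:

package k8ssketch

import (
	"fmt"
	"time"

	v1 "k8s.io/api/core/v1"
	"k8s.io/client-go/informers"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/tools/cache"
)

// startPodInformer builds a shared informer factory, takes the pod informer once,
// registers add/delete handlers, then starts the factory and waits for the cache to sync.
func startPodInformer(clientSet kubernetes.Interface, stopCh <-chan struct{}) {
	informerFactory := informers.NewSharedInformerFactory(clientSet, 30*time.Second)
	podInformer := informerFactory.Core().V1().Pods().Informer()

	podInformer.AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			pod := obj.(*v1.Pod)
			fmt.Printf("pod added: %s/%s ip=%s\n", pod.Namespace, pod.Name, pod.Status.PodIP)
		},
		DeleteFunc: func(obj interface{}) {
			pod := obj.(*v1.Pod)
			fmt.Printf("pod deleted: %s/%s\n", pod.Namespace, pod.Name)
		},
	})

	informerFactory.Start(stopCh)                         // run the informer goroutines
	cache.WaitForCacheSync(stopCh, podInformer.HasSynced) // block until the initial list is cached
}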
125 changes: 71 additions & 54 deletions internal/k8s/informers_test.go
@@ -7,6 +7,7 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/kubernetes/fake"
"sync"
"testing"
"time"
)
@@ -16,10 +17,33 @@ type FakeAPI struct {
Namespace string
}

/*func (fAPI *FakeAPI) deletePod(name string) error {
// gracePeriodSeconds := int64(0)
return fAPI.ClientSet.CoreV1().Pods(fAPI.Namespace).Delete(context.TODO(), name, metav1.DeleteOptions{})
}*/
func getFakeAPI() *FakeAPI {
client := fake.NewSimpleClientset()
api := &FakeAPI{ClientSet: client, Namespace: "default"}
return api
}

func (fAPI *FakeAPI) deletePod(name string) error {
gracePeriodSeconds := int64(0)
return fAPI.ClientSet.CoreV1().Pods(fAPI.Namespace).Delete(context.TODO(), name, metav1.DeleteOptions{GracePeriodSeconds: &gracePeriodSeconds})
}

func (fAPI *FakeAPI) getPod(name string) (*v1.Pod, error) {
return fAPI.ClientSet.CoreV1().Pods(fAPI.Namespace).Get(context.Background(), name, metav1.GetOptions{})
}

func (fAPI *FakeAPI) updatePod(name, podIP string) (*v1.Pod, error) {
pod, _ := fAPI.getPod(name)
pod.Status.PodIP = podIP
pod.ResourceVersion = "123456"

pod, err := fAPI.ClientSet.CoreV1().Pods(fAPI.Namespace).Update(context.TODO(), pod, metav1.UpdateOptions{})
if err != nil {
return nil, err
}

return pod, nil
}

func (fAPI *FakeAPI) createPod(name, ip string) (*v1.Pod, error) {
pod := &v1.Pod{
@@ -59,15 +83,8 @@ func (fAPI *FakeAPI) createPod(name, ip string) (*v1.Pod, error) {
return pod, nil
}

func fakeAPI() *FakeAPI {
client := fake.NewSimpleClientset()
api := &FakeAPI{ClientSet: client, Namespace: "default"}
return api
}

func TestRunPodInformerCreatePod(t *testing.T) {
t.Parallel()
api := fakeAPI()
func TestRunPodInformer(t *testing.T) {
api := getFakeAPI()
assert.NotNil(t, api)

cases := []struct {
@@ -91,51 +108,51 @@ func TestRunPodInformerCreatePod(t *testing.T) {

for _, tc := range cases {
t.Run(tc.caseName, func(t *testing.T) {
pod, err := api.createPod(tc.podName, tc.ip)
assert.Nil(t, err)
assert.NotNil(t, pod)

/*err = api.deletePod(tc.podName)
assert.Nil(t, err)*/
var wg sync.WaitGroup
wg.Add(1)
go func() {
defer wg.Done()
pod, err := api.createPod(tc.podName, tc.ip)
assert.Nil(t, err)
assert.NotNil(t, pod)
}()
wg.Wait()

time.Sleep(2 * time.Second)

wg.Add(1)
go func() {
defer wg.Done()
createdPod, err := api.getPod(tc.podName)
assert.NotNil(t, createdPod)
assert.Nil(t, err)
}()
wg.Wait()

time.Sleep(2 * time.Second)

wg.Add(1)
go func() {
defer wg.Done()
updatedPod, err := api.updatePod(tc.podName, "10.0.0.15")
assert.NotNil(t, updatedPod)
assert.Nil(t, err)
}()
wg.Wait()

time.Sleep(2 * time.Second)

wg.Add(1)
go func() {
defer wg.Done()
err := api.deletePod(tc.podName)
assert.Nil(t, err)
}()
wg.Wait()
})
}

<-time.After(10 * time.Second)
}

/*func TestRunPodInformerDeletePod(t *testing.T) {
t.Parallel()
api := fakeAPI()
assert.NotNil(t, api)
cases := []struct {
caseName, podName, ip string
}{
{
caseName: "case1",
ip: "10.0.0.15",
podName: "varnish-pod-1",
},
}
go func() {
RunPodInformer(api.ClientSet)
}()
for _, tc := range cases {
t.Run(tc.caseName, func(t *testing.T) {
pod, err := api.createPod(tc.podName, tc.ip)
assert.Nil(t, err)
assert.NotNil(t, pod)
err = api.deletePod(tc.podName)
assert.Nil(t, err)
})
}
<- time.After(10 * time.Second)
}*/

func TestGetClientSet(t *testing.T) {
opts.IsLocal = true
opts.KubeConfigPath = "../../mock/kubeconfig"
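
The rewritten test drives the full pod lifecycle against a fake clientset while RunPodInformer watches it. A self-contained sketch of that fake-clientset flow, assuming client-go's fake package; the test name and field values are illustrative, not taken from this commit:

package k8s

import (
	"context"
	"testing"

	v1 "k8s.io/api/core/v1"
	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes/fake"
)

// TestFakeClientsetPodLifecycle creates, updates and deletes a pod against the
// in-memory clientset, mirroring the createPod/updatePod/deletePod helpers above.
func TestFakeClientsetPodLifecycle(t *testing.T) {
	clientSet := fake.NewSimpleClientset()
	pods := clientSet.CoreV1().Pods("default")

	pod := &v1.Pod{ObjectMeta: metav1.ObjectMeta{Name: "varnish-pod-1", Namespace: "default"}}

	created, err := pods.Create(context.TODO(), pod, metav1.CreateOptions{})
	if err != nil {
		t.Fatalf("create failed: %v", err)
	}

	created.Status.PodIP = "10.0.0.15"
	if _, err := pods.Update(context.TODO(), created, metav1.UpdateOptions{}); err != nil {
		t.Fatalf("update failed: %v", err)
	}

	if err := pods.Delete(context.TODO(), "varnish-pod-1", metav1.DeleteOptions{}); err != nil {
		t.Fatalf("delete failed: %v", err)
	}
}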
7 changes: 0 additions & 7 deletions internal/metrics/metrics.go
@@ -24,13 +24,6 @@ func init() {
 
 // RunMetricsServer provides an endpoint, exports prometheus metrics using prometheus client golang
 func RunMetricsServer() error {
-	defer func() {
-		err := logger.Sync()
-		if err != nil {
-			panic(err)
-		}
-	}()
-
 	router := mux.NewRouter()
 	metricServer := &http.Server{
 		Handler: router,
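
Both RunMetricsServer and RunWebServer lose their deferred logger.Sync() call in this commit; flushing is usually done once, at process exit. A small zap sketch of that convention, assuming the logger is built in main (not this repository's actual entrypoint):

package main

import "go.uber.org/zap"

func main() {
	logger, err := zap.NewProduction()
	if err != nil {
		panic(err)
	}
	// Sync flushes buffered entries; it can fail harmlessly when stderr is a
	// terminal, so the error is deliberately ignored here.
	defer func() { _ = logger.Sync() }()

	logger.Info("service starting")
}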
38 changes: 35 additions & 3 deletions internal/metrics/metrics_test.go
@@ -1,24 +1,56 @@
package metrics

import (
"fmt"
"github.com/stretchr/testify/assert"
"io/ioutil"
"net"
"net/http"
"testing"
"time"
)

func TestRunMetricsServer(t *testing.T) {
errChan := make(chan error, 1)
connChan := make(chan bool, 1)

go func() {
errChan <- RunMetricsServer()
}()

go func() {
for i := 0; i <= 5; i++ {
if i == 5 {
t.Errorf("connection to port %d could not succeeded, not retrying!\n", opts.MetricsPort)
return
}

_, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", opts.MetricsPort))
if err != nil {
t.Logf("connection to port %d could not succeeded, retrying...\n", opts.MetricsPort)
time.Sleep(1 * time.Second)
continue
}

connChan <- true
return
}
}()

for {
select {
case c := <-errChan:
t.Error(c)
case <-time.After(5 * time.Second):
t.Log("success")
t.Fatal(c)
case <-connChan:
resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/metrics", opts.MetricsPort))
assert.Nil(t, err)

body, err := ioutil.ReadAll(resp.Body)
assert.Nil(t, err)
assert.NotEmpty(t, string(body))
return
case <-time.After(20 * time.Second):
t.Fatal("could not completed in 20 seconds, failing")
}
}
}
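
The new metrics test starts the server in a goroutine, dials the TCP port until it answers, and only then scrapes /metrics. A compact sketch of that wait-then-scrape pattern, assuming a hypothetical metricsPort constant in place of opts.MetricsPort:

package metrics

import (
	"fmt"
	"io"
	"net"
	"net/http"
	"testing"
	"time"
)

// waitForPort dials the address a few times before giving up, so the caller
// does not race the server goroutine it just started.
func waitForPort(addr string, attempts int) error {
	var lastErr error
	for i := 0; i < attempts; i++ {
		conn, err := net.Dial("tcp", addr)
		if err == nil {
			return conn.Close()
		}
		lastErr = err
		time.Sleep(1 * time.Second)
	}
	return fmt.Errorf("%s never became reachable: %w", addr, lastErr)
}

func TestScrapeMetricsSketch(t *testing.T) {
	const metricsPort = 9290 // hypothetical; the real test uses opts.MetricsPort
	if err := waitForPort(fmt.Sprintf("127.0.0.1:%d", metricsPort), 5); err != nil {
		t.Skipf("metrics server not reachable: %v", err)
	}

	resp, err := http.Get(fmt.Sprintf("http://127.0.0.1:%d/metrics", metricsPort))
	if err != nil {
		t.Fatal(err)
	}
	defer resp.Body.Close()

	body, err := io.ReadAll(resp.Body)
	if err != nil || len(body) == 0 {
		t.Fatalf("expected a non-empty metrics payload, err=%v", err)
	}
}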
11 changes: 5 additions & 6 deletions internal/web/handlers.go
@@ -17,8 +17,7 @@ func purgeHandler(w http.ResponseWriter, r *http.Request) {
 	logger = logger.With(zap.String("requestMethod", "PURGE"))
 	purgePath := r.Header.Get("purge-path")
 	if purgePath == "" {
-		logger.Error("unable to make a PURGE request to Varnish targets, header purge-path must be set!",
-			zap.String("requestMethod", "PURGE"))
+		logger.Error("unable to make a PURGE request to Varnish targets, header purge-path must be set!")
 		http.Error(w, "Header purge-path must be set!", http.StatusBadRequest)
 		return
 	}
@@ -31,11 +30,11 @@ func purgeHandler(w http.ResponseWriter, r *http.Request) {
 	}
 
 	for _, v := range options.VarnishInstances {
-		// fullUrl := fmt.Sprintf("%s%s", *v, purgePath)
-		logger.Debug(*v)
-		fullUrl := fmt.Sprintf("http://192.168.49.2:30654%s", purgePath)
+		fullUrl := fmt.Sprintf("%s%s", *v, purgePath)
+		// fullUrl := fmt.Sprintf("http://192.168.49.2:30654%s", purgePath)
 		req, _ := http.NewRequest("PURGE", fullUrl, nil)
-		req.Host = "nginx.default.svc"
+		// req.Host = "nginx.default.svc"
+		req.Host = purgeDomain
 
 		logger.Info("making PURGE request", zap.String("url", fullUrl))
 		res, err := client.Do(req)
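
With the hardcoded NodePort URL gone, purgeHandler now builds the target from each discovered Varnish instance and sets the Host header from the purge-domain header. A hedged sketch of that request construction (purgeOne is an illustrative helper, not the repository's code):

package web

import (
	"fmt"
	"net/http"
	"time"
)

// purgeOne sends a single PURGE request to one Varnish instance. Varnish matches
// purges on Host plus URL, so Host carries the domain being invalidated rather
// than the pod address.
func purgeOne(varnishURL, purgePath, purgeDomain string) (int, error) {
	client := &http.Client{Timeout: 10 * time.Second}

	fullUrl := fmt.Sprintf("%s%s", varnishURL, purgePath)
	req, err := http.NewRequest("PURGE", fullUrl, nil)
	if err != nil {
		return 0, err
	}
	req.Host = purgeDomain

	res, err := client.Do(req)
	if err != nil {
		return 0, err
	}
	defer res.Body.Close()

	return res.StatusCode, nil
}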
7 changes: 0 additions & 7 deletions internal/web/web.go
@@ -25,13 +25,6 @@ func init() {
 
 // RunWebServer runs the web server which multiplexes client requests
 func RunWebServer() error {
-	defer func() {
-		err := logger.Sync()
-		if err != nil {
-			panic(err)
-		}
-	}()
-
 	router := mux.NewRouter()
 	webServer := initServer(router, fmt.Sprintf(":%d", opts.ServerPort),
 		time.Duration(int32(opts.WriteTimeoutSeconds))*time.Second,
80 changes: 76 additions & 4 deletions internal/web/web_test.go
@@ -1,23 +1,95 @@
package web

import (
"fmt"
"github.com/stretchr/testify/assert"
"net"
"net/http"
"net/http/httptest"
"sync"
"testing"
"time"
"varnish-cache-invalidator/internal/options"
)

func TestRunWebServer(t *testing.T) {
func TestPurgeHandler(t *testing.T) {
errChan := make(chan error, 1)
defer close(errChan)
connChan := make(chan bool, 1)
defer close(connChan)
mockChan := make(chan bool, 1)
defer close(mockChan)

var wg sync.WaitGroup
wg.Add(1)
go func() {
mockServer := httptest.NewServer(http.HandlerFunc(func(writer http.ResponseWriter, request *http.Request) {
writer.WriteHeader(http.StatusOK)
if _, err := fmt.Fprint(writer, "asdasfas"); err != nil {
t.Errorf("a fatal error occured while writing response body: %s", err.Error())
return
}
}))
options.VarnishInstances = append(options.VarnishInstances, &mockServer.URL)
wg.Done()
<-mockChan
defer mockServer.Close()
}()
wg.Wait()

go func() {
errChan <- RunWebServer()
}()

go func() {
for i := 0; i <= 5; i++ {
if i == 5 {
t.Errorf("connection to port %d could not succeeded, not retrying!\n", opts.ServerPort)
return
}

_, err := net.Dial("tcp", fmt.Sprintf("127.0.0.1:%d", opts.ServerPort))
if err != nil {
t.Logf("connection to port %d could not succeeded, retrying...\n", opts.ServerPort)
time.Sleep(1 * time.Second)
continue
}

connChan <- true
return
}
}()

for {
select {
case c := <-errChan:
t.Error(c)
case <-time.After(5 * time.Second):
t.Log("success")
t.Fatal(c)
case <-connChan:
var cases = []struct {
name, purgePath, purgeDomain string
expectedCode int
}{
{"case1", "/", "example.com", 200},
{"case2", "", "example.com", 400},
{"case3", "/", "", 400},
}

for _, tc := range cases {
t.Run(tc.name, func(t *testing.T) {
client := &http.Client{Timeout: 10 * time.Second}
req, err := http.NewRequest("PURGE", fmt.Sprintf("http://127.0.0.1:%d/purge", opts.ServerPort), nil)
assert.Nil(t, err)
assert.NotNil(t, req)

req.Header.Set("purge-path", tc.purgePath)
req.Header.Set("purge-domain", tc.purgeDomain)

resp, err := client.Do(req)
assert.NotNil(t, resp)
assert.Nil(t, err)
assert.Equal(t, tc.expectedCode, resp.StatusCode)
})
}
return
}
}
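
The new handler test registers an httptest server as a stand-in Varnish instance before starting the web server. A minimal sketch of that mock-backend pattern on its own, assuming a plain 200 response is enough for the code under test:

package web

import (
	"net/http"
	"net/http/httptest"
	"testing"
)

// TestMockVarnishBackend spins up an in-process HTTP server, points a PURGE
// request at server.URL, and tears the server down when the test ends.
func TestMockVarnishBackend(t *testing.T) {
	mockServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// A real Varnish would act on PURGE; the mock only acknowledges it.
		w.WriteHeader(http.StatusOK)
	}))
	defer mockServer.Close()

	req, err := http.NewRequest("PURGE", mockServer.URL+"/", nil)
	if err != nil {
		t.Fatal(err)
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		t.Fatal(err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		t.Fatalf("expected 200 from the mock backend, got %d", resp.StatusCode)
	}
}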
