Skip to content

Commit

Permalink
race condition fix
Browse files Browse the repository at this point in the history
  • Loading branch information
bilalcaliskan committed Aug 31, 2021
1 parent 43bed84 commit b127654
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 19 deletions.
7 changes: 6 additions & 1 deletion cmd/varnish-cache-invalidator/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"io/ioutil"
"log"
"os"
"strings"
"varnish-cache-invalidator/internal/k8s"
Expand Down Expand Up @@ -39,11 +40,15 @@ func main() {

// below check ensures that if our Varnish instances inside kubernetes or not
if opts.InCluster {
logger.Info("will use kubernetes pod instances, running pod informer to fetch pods")
k8s.InitK8sTypes()
go k8s.RunPodInformer()
} else {
log.Println(opts.TargetHosts)
splitted := strings.Split(opts.TargetHosts, ",")
logger.Info("will use standalone varnish instances", zap.Any("instances", splitted))
for _, v := range splitted {
k8s.VarnishInstances = append(k8s.VarnishInstances, &v)
options.VarnishInstances = append(options.VarnishInstances, &v)
}
}

Expand Down
13 changes: 7 additions & 6 deletions internal/k8s/informers.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,15 @@ var (
err error
logger *zap.Logger
opts *options.VarnishCacheInvalidatorOptions
// VarnishInstances keeps pointer of varnish instances' ip:port information
VarnishInstances []*string
)

func init() {
logger = logging.GetLogger()
opts = options.GetVarnishCacheInvalidatorOptions()
}

// InitK8sTypes initializes the required k8s types rest.Config and kubernetes.ClientSet
func InitK8sTypes() {
logger.Info("initializing kube client")

if restConfig, err = getConfig(); err != nil {
Expand Down Expand Up @@ -58,7 +59,7 @@ func RunPodInformer() {
if pod.Status.PodIP != "" {
podUrl := fmt.Sprintf("http://%s:%d", pod.Status.PodIP, pod.Spec.Containers[0].Ports[0].ContainerPort)
logger.Info("Adding pod url to the varnishPods slice", zap.String("podUrl", podUrl))
addVarnishPod(&VarnishInstances, &podUrl)
addVarnishPod(&options.VarnishInstances, &podUrl)
} else {
logger.Warn("Varnish pod does not have an ip address yet, skipping add operation",
zap.String("pod", pod.Name), zap.String("namespace", pod.Namespace))
Expand All @@ -81,7 +82,7 @@ func RunPodInformer() {
zap.String("namespace", newPod.Namespace), zap.String("ipAddress", newPod.Status.PodIP))
podUrl := fmt.Sprintf("http://%s:%d", newPod.Status.PodIP, newPod.Spec.Containers[0].Ports[0].ContainerPort)
logger.Info("Adding pod url to the varnishPods slice", zap.String("podUrl", podUrl))
addVarnishPod(&VarnishInstances, &podUrl)
addVarnishPod(&options.VarnishInstances, &podUrl)
}
}
}
Expand All @@ -94,9 +95,9 @@ func RunPodInformer() {
logger.Info("Varnish pod is deleted, removing from varnishPods slice", zap.String("pod", pod.Name),
zap.String("namespace", pod.Namespace))
podUrl := fmt.Sprintf("http://%s:%d", pod.Status.PodIP, pod.Spec.Containers[0].Ports[0].ContainerPort)
index, found := findVarnishPod(VarnishInstances, podUrl)
index, found := findVarnishPod(options.VarnishInstances, podUrl)
if found {
removeVarnishPod(&VarnishInstances, index)
removeVarnishPod(&options.VarnishInstances, index)
}
}
}
Expand Down
9 changes: 6 additions & 3 deletions internal/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (

var vcio = &VarnishCacheInvalidatorOptions{}

// VarnishInstances keeps pointer of varnish instances' ip:port information
var VarnishInstances []*string

func init() {
vcio.addFlags(pflag.CommandLine)
pflag.Parse()
Expand Down Expand Up @@ -47,9 +50,9 @@ func (vcio *VarnishCacheInvalidatorOptions) addFlags(fs *pflag.FlagSet) {
"VarnishLabel is the label to select proper Varnish pods, defaults to app=varnish")
fs.BoolVar(&vcio.InCluster, "inCluster", true,
"InCluster is the boolean flag if varnish-cache-invalidator is running inside cluster or not, defaults to true")
fs.StringVar(&vcio.TargetHosts, "targetHosts", "",
"TargetHosts used when our Varnish instances are not running in Kubernetes as a pod, required for standalone "+
"Varnish instances, defaults to 'http://127.0.0.1:6081'")
fs.StringVar(&vcio.TargetHosts, "targetHosts", "http://127.0.0.1:6081",
"TargetHosts is comma seperated list of target hosts, used when our Varnish instances are not running "+
"in Kubernetes as a pod, required for standalone Varnish instances, defaults to 'http://127.0.0.1:6081'")
fs.StringVar(&vcio.PurgeDomain, "purgeDomain", "foo.example.com", "PurgeDomain will set Host header "+
"on purge requests. It must be changed to work properly on different environments.")
fs.IntVar(&vcio.ServerPort, "serverPort", 3000, "ServerPort is the web server port of the "+
Expand Down
18 changes: 9 additions & 9 deletions internal/web/handlers.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ package web
import (
"fmt"
"net/http"
"varnish-cache-invalidator/internal/k8s"
"varnish-cache-invalidator/internal/options"

"go.uber.org/zap"
)
Expand All @@ -19,7 +19,7 @@ func banHandler(w http.ResponseWriter, r *http.Request) {
return
}

for _, v := range k8s.VarnishInstances {
for _, v := range options.VarnishInstances {
req, _ := http.NewRequest("BAN", *v, nil)
req.Header.Set("ban-url", banRegex)
logger.Info("Making BAN request", zap.String("requestMethod", "BAN"), zap.String("targetHost", *v))
Expand All @@ -35,15 +35,15 @@ func banHandler(w http.ResponseWriter, r *http.Request) {

}

if successCount == len(k8s.VarnishInstances) {
if successCount == len(options.VarnishInstances) {
logger.Info("All BAN requests succeeded on Varnish pods!", zap.String("requestMethod", "BAN"),
zap.Int("successCount", successCount))
w.WriteHeader(http.StatusOK)
} else {
logger.Warn("One or more Varnish BAN requests failed", zap.String("requestMethod", "BAN"),
zap.Int("successCount", successCount), zap.Int("failureCount", len(k8s.VarnishInstances)-successCount))
zap.Int("successCount", successCount), zap.Int("failureCount", len(options.VarnishInstances)-successCount))
response = fmt.Sprintf("One or more Varnish BAN requests failed, check the logs!\nSucceeded request = %d\n"+
"Failed request = %d", successCount, len(k8s.VarnishInstances)-successCount)
"Failed request = %d", successCount, len(options.VarnishInstances)-successCount)
w.WriteHeader(http.StatusBadRequest)
}

Expand All @@ -61,7 +61,7 @@ func purgeHandler(w http.ResponseWriter, r *http.Request) {
return
}

for _, v := range k8s.VarnishInstances {
for _, v := range options.VarnishInstances {
fullUrl := fmt.Sprintf("%s%s", *v, purgePath)
req, _ := http.NewRequest("PURGE", fullUrl, nil)
req.Host = opts.PurgeDomain
Expand All @@ -78,15 +78,15 @@ func purgeHandler(w http.ResponseWriter, r *http.Request) {
}
}

if successCount == len(k8s.VarnishInstances) {
if successCount == len(options.VarnishInstances) {
logger.Info("All PURGE requests succeeded on Varnish pods!", zap.String("requestMethod", "PURGE"),
zap.Int("successCount", successCount))
w.WriteHeader(http.StatusOK)
} else {
logger.Warn("One or more Varnish PURGE requests failed", zap.String("requestMethod", "PURGE"),
zap.Int("successCount", successCount), zap.Int("failureCount", len(k8s.VarnishInstances)-successCount))
zap.Int("successCount", successCount), zap.Int("failureCount", len(options.VarnishInstances)-successCount))
response = fmt.Sprintf("One or more Varnish PURGE requests failed, check the logs!\nSucceeded request = %d\n"+
"Failed request = %d", successCount, len(k8s.VarnishInstances)-successCount)
"Failed request = %d", successCount, len(options.VarnishInstances)-successCount)
w.WriteHeader(http.StatusBadRequest)
}

Expand Down

0 comments on commit b127654

Please sign in to comment.