Skip to content

fix: make leader election possible with multiple replicas #195

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 82 additions & 35 deletions cmd/manager/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package main
import (
"errors"
"fmt"
"net/url"
"os"
"runtime"
"strconv"
Expand All @@ -19,7 +20,6 @@ import (
dvo_prom "github.com/app-sre/deployment-validation-operator/pkg/prometheus"
"github.com/app-sre/deployment-validation-operator/pkg/validations"
"github.com/app-sre/deployment-validation-operator/version"
"github.com/prometheus/client_golang/prometheus"

"github.com/go-logr/logr"
osappsv1 "github.com/openshift/api/apps/v1"
Expand All @@ -43,13 +43,18 @@ func main() {
os.Setenv(operatorNameEnvVar, dvconfig.OperatorName)

opts := options.Options{
MetricsPort: 8383,
MetricsPath: "metrics",
ProbeAddr: ":8081",
ConfigFile: "config/deployment-validation-operator-config.yaml",
MetricsBindAddr: ":8383",
MetricsPath: "metrics",
MetricsServiceName: "deployment-validation-operator-metrics",
ProbeAddr: ":8081",
ConfigFile: "config/deployment-validation-operator-config.yaml",
}

opts.Process()
if err := opts.Process(); err != nil {
fmt.Fprintf(os.Stdout, "processing options: %v\n", err)

os.Exit(1)
}

// Use a zap logr.Logger implementation. If none of the zap
// flags are configured (or if the zap flag set is not being
Expand Down Expand Up @@ -106,12 +111,8 @@ func setupManager(log logr.Logger, opts options.Options) (manager.Manager, error
return nil, fmt.Errorf("initializing manager: %w", err)
}

if err := mgr.AddHealthzCheck("health", healthz.Ping); err != nil {
return nil, fmt.Errorf("adding healthz check: %w", err)
}

if err := mgr.AddReadyzCheck("check", healthz.Ping); err != nil {
return nil, fmt.Errorf("adding readyz check: %w", err)
if err := setupProbes(mgr, opts); err != nil {
return nil, fmt.Errorf("setting up probes: %w", err)
}

log.Info("Registering Components")
Expand All @@ -126,29 +127,12 @@ func setupManager(log logr.Logger, opts options.Options) (manager.Manager, error
return nil, fmt.Errorf("initializing generic reconciler: %w", err)
}

if err = gr.AddToManager(mgr); err != nil {
if err := gr.AddToManager(mgr); err != nil {
return nil, fmt.Errorf("adding generic reconciler to manager: %w", err)
}

log.Info("Initializing Prometheus Registry")

reg := prometheus.NewRegistry()

log.Info(fmt.Sprintf("Initializing Prometheus metrics endpoint on %q", opts.MetricsEndpoint()))

srv, err := dvo_prom.NewServer(reg, opts.MetricsPath, fmt.Sprintf(":%d", opts.MetricsPort))
if err != nil {
return nil, fmt.Errorf("initializing metrics server: %w", err)
}

if err := mgr.Add(srv); err != nil {
return nil, fmt.Errorf("adding metrics server to manager: %w", err)
}

log.Info("Initializing Validation Engine")

if err := validations.InitializeValidationEngine(opts.ConfigFile, reg); err != nil {
return nil, fmt.Errorf("initializing validation engine: %w", err)
if err := setupComponents(log, mgr, opts); err != nil {
return nil, fmt.Errorf("setting up components: %w", err)
}

return mgr, nil
Expand Down Expand Up @@ -193,9 +177,13 @@ func getManagerOptions(scheme *k8sruntime.Scheme, opts options.Options) (manager
}

mgrOpts := manager.Options{
Namespace: ns,
HealthProbeBindAddress: opts.ProbeAddr,
MetricsBindAddress: "0", // disable controller-runtime managed prometheus endpoint
LeaderElection: opts.EnableLeaderElection,
LeaderElectionID: "23h85e23.deployment-validation-operator-lock",
LeaderElectionNamespace: opts.LeaderElectionNamespace,
LeaderElectionResourceLock: "leases",
Namespace: ns,
HealthProbeBindAddress: opts.ProbeAddr,
MetricsBindAddress: "0", // disable controller-runtime managed prometheus endpoint
// disable caching of everything
NewClient: newClient,
Scheme: scheme,
Expand Down Expand Up @@ -237,3 +225,62 @@ func kubeClientQPS() (float32, error) {
qps = float32(val)
return qps, err
}

func setupProbes(mgr manager.Manager, opts options.Options) error {
if err := mgr.AddHealthzCheck("health", healthz.Ping); err != nil {
return fmt.Errorf("adding healthz check: %w", err)
}

if err := mgr.AddReadyzCheck("check", healthz.Ping); err != nil {
return fmt.Errorf("adding readyz check: %w", err)
}

return nil
}

func setupComponents(log logr.Logger, mgr manager.Manager, opts options.Options) error {
log.Info("Initializing Prometheus Registry")

reg, err := dvo_prom.NewRegistry()
if err != nil {
return fmt.Errorf("initializing prometheus registry: %w", err)
}

log.Info(fmt.Sprintf("Initializing Prometheus metrics endpoint on %q", opts.MetricsEndpoint()))

svcURL := &url.URL{
Scheme: "http",
Host: opts.MetricsServiceName,
}
if parts := strings.Split(opts.MetricsBindAddr, ":"); len(parts) > 0 {
if len(parts) > 1 {
svcURL.Host += parts[len(parts)-1]
}
}

srv, err := dvo_prom.NewServer(reg,
dvo_prom.WithMetricsAddr(opts.MetricsBindAddr),
dvo_prom.WithMetricsPath(opts.MetricsPath),
dvo_prom.WithServiceURL(svcURL.String()),
)
if err != nil {
return fmt.Errorf("initializing metrics server: %w", err)
}

go func() {
<-mgr.Elected()
srv.Ready()
}()

if err := mgr.Add(srv); err != nil {
return fmt.Errorf("adding metrics server to manager: %w", err)
}

log.Info("Initializing Validation Engine")

if err := validations.InitializeValidationEngine(opts.ConfigFile, reg); err != nil {
return fmt.Errorf("initializing validation engine: %w", err)
}

return nil
}
3 changes: 2 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ require (
github.com/prometheus/common v0.32.1
github.com/spf13/pflag v1.0.5
github.com/spf13/viper v1.9.0
github.com/stretchr/testify v1.7.0
github.com/stretchr/testify v1.7.1
go.uber.org/multierr v1.6.0
golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f // indirect
golang.stackrox.io/kube-linter v0.0.0-20210928184316-5e1ead387f43
k8s.io/api v0.22.2
k8s.io/apimachinery v0.22.2
Expand Down
7 changes: 5 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1104,8 +1104,9 @@ github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UV
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.5.1/go.mod h1:5W2xD1RspED5o8YsWQXVCued0rvSQ+mT+I5cxcmMvtA=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.0 h1:nwc3DEeHmmLAfoZucVR881uASk0Mfjw8xYJ99tb5CcY=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.7.1 h1:5TQK59W5E3v0r2duFAb7P95B6hEeOyEnHRa8MjYSMTY=
github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/subosito/gotenv v1.2.0 h1:Slr1R9HxAlEKefgq5jn9U+DnETlIUa6HfgEzj0g5d7s=
github.com/subosito/gotenv v1.2.0/go.mod h1:N0PQaV/YGNqwC0u51sEeR/aUtSLEXKX9iv69rRypqCw=
github.com/syndtr/gocapability v0.0.0-20170704070218-db04d3cc01c8/go.mod h1:hkRG7XYTFWNJGYcbNJQlaLq0fg1yr4J4t/NcTQtrfww=
Expand Down Expand Up @@ -1262,8 +1263,9 @@ golang.org/x/crypto v0.0.0-20210220033148-5ea612d1eb83/go.mod h1:jdWPYTVW3xRLrWP
golang.org/x/crypto v0.0.0-20210322153248-0c34fe9e7dc2/go.mod h1:T9bdIzuCu7OtxOm1hfPfRQxPLYneinmdGuTeoZ9dtd4=
golang.org/x/crypto v0.0.0-20210513164829-c07d793c2f9a/go.mod h1:P+XmwS30IXTQdn5tA2iutPOUgjI07+tq3H3K9MVA1s8=
golang.org/x/crypto v0.0.0-20210817164053-32db794688a5/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519 h1:7I4JAnoQBe7ZtJcBaYHi5UtiO8tQHbUSXxL+pnGRANg=
golang.org/x/crypto v0.0.0-20210921155107-089bfa567519/go.mod h1:GvvjBRRGRdwPK5ydBHafDWAxML/pGHZbMvKqRZ5+Abc=
golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f h1:OeJjE6G4dgCY4PIXvIRQbE8+RX+uXZyGhUy/ksMGJoc=
golang.org/x/crypto v0.0.0-20220427172511-eb4f295cb31f/go.mod h1:IxCIyHEi3zRg3s0A5j5BB6A9Jmi73HwBIUl50j+osU4=
golang.org/x/exp v0.0.0-20190121172915-509febef88a4/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190306152737-a1d7652674e8/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/exp v0.0.0-20190510132918-efd6b22b2522/go.mod h1:ZjyILWgesfNpC6sMxTJOJm9Kp84zZh5NQWvqDGG3Qr8=
Expand Down Expand Up @@ -1363,6 +1365,7 @@ golang.org/x/net v0.0.0-20210503060351-7fd8e65b6420/go.mod h1:9nx3DQGgdP8bBQD5qx
golang.org/x/net v0.0.0-20210520170846-37e1c6afe023/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20210525063256-abc453219eb5/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211015210444-4f30a5c0130f/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20211112202133-69e39bad7dc2/go.mod h1:9nx3DQGgdP8bBQD5qxJ1jj9UTztislL4KSBs9R2vV5Y=
golang.org/x/net v0.0.0-20220225172249-27dd8689420f/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220425223048-2871e0cb64e4/go.mod h1:CfG3xpIq0wQ8r1q4Su4UZFWDARRcnwPjda9FqA0JpMk=
golang.org/x/net v0.0.0-20220722155237-a158d28d115b h1:PxfKdU9lEEDYjdIzOtC4qFWgkU2rGHdKlKowJSMN9h0=
Expand Down
92 changes: 92 additions & 0 deletions internal/handler/handler.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
package handler

import (
"net/http"
"strings"
"sync"

"github.com/go-logr/logr"
)

func NewSwitchableHandler(handA http.Handler, handB http.Handler, opts ...SwitchableHandlerOption) *SwitchableHandler {
var cfg SwitchableHandlerConfig

cfg.Option(opts...)
cfg.Default()

return &SwitchableHandler{
cfg: cfg,
handlerA: handA,
handlerB: handB,
}
}

type SwitchableHandler struct {
cfg SwitchableHandlerConfig
handlerA http.Handler
handlerB http.Handler

lock sync.Mutex
switched bool
}

func (h *SwitchableHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
log := h.cfg.Log.WithValues("remote", r.RemoteAddr)

if h.switched {
log.WithValues("handler", "B").Info("serving request")

h.handlerB.ServeHTTP(w, r)
} else {
log.WithValues("handler", "A").Info("serving request")

h.handlerA.ServeHTTP(w, r)
}
}

func (h *SwitchableHandler) Switch() {
h.lock.Lock()
defer h.lock.Unlock()

if h.switched {
h.switched = false
} else {
h.switched = true
}
}

type SwitchableHandlerConfig struct {
Log logr.Logger
}

func (c *SwitchableHandlerConfig) Option(opts ...SwitchableHandlerOption) {
for _, opt := range opts {
opt.ConfigureSwitchableHandler(c)
}
}

func (c *SwitchableHandlerConfig) Default() {
if c.Log == nil {
c.Log = logr.Discard()
}
}

type SwitchableHandlerOption interface {
ConfigureSwitchableHandler(*SwitchableHandlerConfig)
}

func StopAfterNForwards(n uint, h http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
rawFwds, ok := r.Header["X-Forwarded-For"]
if !ok {
h.ServeHTTP(w, r)
}

splitFwds := strings.Split(strings.Join(rawFwds, ", "), ", ")
if uint(len(splitFwds)) >= n {
http.Error(w, "", http.StatusLoopDetected)
}

h.ServeHTTP(w, r)
})
}
Loading