From d0481ae76047c31b04873bafc63d86ab9b1bc32b Mon Sep 17 00:00:00 2001
From: David Farr
Date: Thu, 9 Feb 2023 23:03:31 -0800
Subject: [PATCH] feat: Optional kubernetes-based leader election (#2438)

Signed-off-by: David Farr
---
 .gitignore                                   |   2 +
 common/common.go                             |   6 +
 common/leaderelection/leaderelection.go      | 149 ++++++++++++++-----
 common/leaderelection/leaderelection_test.go |  51 +++++++
 common/leaderelection/test/auth.yaml         |   3 +
 controllers/eventsource/resource.go          |   6 +-
 controllers/sensor/resource.go               |   6 +-
 docs/eventsources/ha.md                      |  23 +++
 docs/sensors/ha.md                           |  25 ++++
 eventsources/cmd/start.go                    |   1 +
 eventsources/eventing.go                     |  15 +-
 sensors/listener.go                          |   9 +-
 test/e2e/functional_test.go                  |  43 +++---
 test/e2e/testdata/es-calendar-ha-k8s.yaml    |  13 ++
 test/e2e/testdata/sensor-log-ha-k8s.yaml     |  18 +++
 15 files changed, 305 insertions(+), 65 deletions(-)
 create mode 100644 common/leaderelection/leaderelection_test.go
 create mode 100644 common/leaderelection/test/auth.yaml
 create mode 100644 test/e2e/testdata/es-calendar-ha-k8s.yaml
 create mode 100644 test/e2e/testdata/sensor-log-ha-k8s.yaml

diff --git a/.gitignore b/.gitignore
index 299e605ff5..fcd2837259 100644
--- a/.gitignore
+++ b/.gitignore
@@ -1,6 +1,7 @@
 .vscode/
 .idea/
 .DS_Store
+.env
 vendor/
 dist/
 # delve debug binaries
@@ -12,3 +13,4 @@ debug.test
 *.out
 site/
 /go-diagrams/
+argo-events
diff --git a/common/common.go b/common/common.go
index 310c7dffaa..3f6cf3e909 100644
--- a/common/common.go
+++ b/common/common.go
@@ -29,6 +29,10 @@ const (
 	EnvVarKubeConfig = "KUBECONFIG"
 	// EnvVarDebugLog is the env var to turn on the debug mode for logging
 	EnvVarDebugLog = "DEBUG_LOG"
+	// EnvVarPodName should be set to the name of the pod
+	EnvVarPodName = "POD_NAME"
+	// EnvVarLeaderElection sets the leader election mode
+	EnvVarLeaderElection = "LEADER_ELECTION"
 	// EnvImagePullPolicy is the env var to set container's ImagePullPolicy
 	EnvImagePullPolicy = "IMAGE_PULL_POLICY"
 )
@@ -119,6 +123,8 @@ const (
 	LabelOwnerName = "owner-name"
 	// AnnotationResourceSpecHash is the annotation of a K8s resource spec hash
 	AnnotationResourceSpecHash = "resource-spec-hash"
+	// AnnotationLeaderElection is the annotation for leader election
+	AnnotationLeaderElection = "events.argoproj.io/leader-election"
 )
 
 // various supported media types
diff --git a/common/leaderelection/leaderelection.go b/common/leaderelection/leaderelection.go
index 137fd48ac8..dc7a500bde 100644
--- a/common/leaderelection/leaderelection.go
+++ b/common/leaderelection/leaderelection.go
@@ -4,20 +4,31 @@ import (
 	"context"
 	"crypto/tls"
 	"fmt"
+	"os"
+	"strings"
+	"time"
 
 	"github.com/fsnotify/fsnotify"
 	"github.com/nats-io/graft"
 	nats "github.com/nats-io/nats.go"
 	"github.com/spf13/viper"
 	"go.uber.org/zap"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/leaderelection"
+	"k8s.io/client-go/tools/leaderelection/resourcelock"
 
 	"github.com/argoproj/argo-events/common"
 	"github.com/argoproj/argo-events/common/logging"
 	eventbuscommon "github.com/argoproj/argo-events/eventbus/common"
-	apicommon "github.com/argoproj/argo-events/pkg/apis/common"
 	eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
 )
 
+var (
+	eventBusAuthFileMountPath = common.EventBusAuthFileMountPath
+)
+
 type Elector interface {
 	RunOrDie(context.Context, LeaderCallbacks)
 }
@@ -27,25 +38,39 @@ type LeaderCallbacks struct {
 	OnStoppedLeading func()
 }
 
-func NewEventBusElector(ctx context.Context, eventBusConfig eventbusv1alpha1.BusConfig, clusterName string, clusterSize int) (Elector, error) {
-	logger := logging.FromContext(ctx)
-
-	var eventBusType apicommon.EventBusType
-	var eventBusAuth *eventbusv1alpha1.AuthStrategy
+func NewElector(ctx context.Context, eventBusConfig eventbusv1alpha1.BusConfig, clusterName string, clusterSize int, namespace string, leasename string, hostname string) (Elector, error) {
 	switch {
+	case strings.ToLower(os.Getenv(common.EnvVarLeaderElection)) == "k8s":
+		return newKubernetesElector(namespace, leasename, hostname)
 	case eventBusConfig.NATS != nil:
-		eventBusType = apicommon.EventBusNATS
-		eventBusAuth = eventBusConfig.NATS.Auth
+		return newEventBusElector(ctx, eventBusConfig.NATS.Auth, clusterName, clusterSize, eventBusConfig.NATS.URL)
 	case eventBusConfig.JetStream != nil:
-		eventBusType = apicommon.EventBusJetStream
-		eventBusAuth = &eventbusv1alpha1.AuthStrategyBasic
+		return newEventBusElector(ctx, &eventbusv1alpha1.AuthStrategyBasic, clusterName, clusterSize, eventBusConfig.JetStream.URL)
 	default:
 		return nil, fmt.Errorf("invalid event bus")
 	}
+}
+
+func newEventBusElector(ctx context.Context, authStrategy *eventbusv1alpha1.AuthStrategy, clusterName string, clusterSize int, url string) (Elector, error) {
+	auth, err := getEventBusAuth(ctx, authStrategy)
+	if err != nil {
+		return nil, err
+	}
+
+	return &natsEventBusElector{
+		clusterName: clusterName,
+		size:        clusterSize,
+		url:         url,
+		auth:        auth,
+	}, nil
+}
+
+func getEventBusAuth(ctx context.Context, authStrategy *eventbusv1alpha1.AuthStrategy) (*eventbuscommon.Auth, error) {
+	logger := logging.FromContext(ctx)
 
 	var auth *eventbuscommon.Auth
-	cred := &eventbuscommon.AuthCredential{}
-	if eventBusAuth == nil || *eventBusAuth == eventbusv1alpha1.AuthStrategyNone {
+
+	if authStrategy == nil || *authStrategy == eventbusv1alpha1.AuthStrategyNone {
 		auth = &eventbuscommon.Auth{
 			Strategy: eventbusv1alpha1.AuthStrategyNone,
 		}
@@ -53,47 +78,31 @@ func NewEventBusElector(ctx context.Context, eventBusConfig eventbusv1alpha1.Bus
 		v := viper.New()
 		v.SetConfigName("auth")
 		v.SetConfigType("yaml")
-		v.AddConfigPath(common.EventBusAuthFileMountPath)
-		err := v.ReadInConfig()
-		if err != nil {
+		v.AddConfigPath(eventBusAuthFileMountPath)
+
+		if err := v.ReadInConfig(); err != nil {
 			return nil, fmt.Errorf("failed to load auth.yaml. err: %w", err)
 		}
-		err = v.Unmarshal(cred)
-		if err != nil {
+
+		cred := &eventbuscommon.AuthCredential{}
+		if err := v.Unmarshal(cred); err != nil {
 			logger.Errorw("failed to unmarshal auth.yaml", zap.Error(err))
 			return nil, err
 		}
+
 		v.WatchConfig()
 		v.OnConfigChange(func(e fsnotify.Event) {
 			// Auth file changed, let it restart.
 			logger.Fatal("Eventbus auth config file changed, exiting..")
 		})
+
 		auth = &eventbuscommon.Auth{
-			Strategy:   *eventBusAuth,
+			Strategy:   *authStrategy,
 			Credential: cred,
 		}
 	}
 
-	var elector Elector
-	switch eventBusType {
-	case apicommon.EventBusNATS:
-		elector = &natsEventBusElector{
-			clusterName: clusterName,
-			size:        clusterSize,
-			url:         eventBusConfig.NATS.URL,
-			auth:        auth,
-		}
-	case apicommon.EventBusJetStream:
-		elector = &natsEventBusElector{
-			clusterName: clusterName,
-			size:        clusterSize,
-			url:         eventBusConfig.JetStream.URL,
-			auth:        auth,
-		}
-	default:
-		return nil, fmt.Errorf("invalid eventbus type")
-	}
-	return elector, nil
+	return auth, nil
 }
 
 type natsEventBusElector struct {
@@ -179,3 +188,67 @@ func (e *natsEventBusElector) RunOrDie(ctx context.Context, callbacks LeaderCall
 		}
 	}
 }
+
+type kubernetesElector struct {
+	namespace string
+	leasename string
+	hostname  string
+}
+
+func newKubernetesElector(namespace string, leasename string, hostname string) (Elector, error) {
+	return &kubernetesElector{
+		namespace: namespace,
+		leasename: leasename,
+		hostname:  hostname,
+	}, nil
+}
+
+func (e *kubernetesElector) RunOrDie(ctx context.Context, callbacks LeaderCallbacks) {
+	logger := logging.FromContext(ctx)
+
+	config, err := rest.InClusterConfig()
+	if err != nil {
+		logger.Fatalw("Failed to retrieve kubernetes config", zap.Error(err))
+	}
+
+	client, err := kubernetes.NewForConfig(config)
+	if err != nil {
+		logger.Fatalw("Failed to create kubernetes client", zap.Error(err))
+	}
+
+	lock := &resourcelock.LeaseLock{
+		LeaseMeta: metav1.ObjectMeta{
+			Name:      e.leasename,
+			Namespace: e.namespace,
+		},
+		Client: client.CoordinationV1(),
+		LockConfig: resourcelock.ResourceLockConfig{
+			Identity: e.hostname,
+		},
+	}
+
+	for {
+		select {
+		case <-ctx.Done():
+			return
+		default:
+			ctx, cancel := context.WithCancel(ctx)
+			leaderelection.RunOrDie(ctx, leaderelection.LeaderElectionConfig{
+				Lock:            lock,
+				ReleaseOnCancel: true,
+				LeaseDuration:   5 * time.Second,
+				RenewDeadline:   2 * time.Second,
+				RetryPeriod:     1 * time.Second,
+				Callbacks: leaderelection.LeaderCallbacks{
+					OnStartedLeading: callbacks.OnStartedLeading,
+					OnStoppedLeading: callbacks.OnStoppedLeading,
+				},
+			})
+
+			// When the leader is lost, leaderelection.RunOrDie will
+			// cease blocking and we will cancel the context. This
+			// will halt all eventsource/sensor go routines.
+			cancel()
+		}
+	}
+}
diff --git a/common/leaderelection/leaderelection_test.go b/common/leaderelection/leaderelection_test.go
new file mode 100644
index 0000000000..3a3778bdd5
--- /dev/null
+++ b/common/leaderelection/leaderelection_test.go
@@ -0,0 +1,51 @@
+package leaderelection
+
+import (
+	"context"
+	"os"
+	"testing"
+
+	"github.com/argoproj/argo-events/common"
+	eventbusv1alpha1 "github.com/argoproj/argo-events/pkg/apis/eventbus/v1alpha1"
+	"github.com/stretchr/testify/assert"
+)
+
+var (
+	configs = []eventbusv1alpha1.BusConfig{
+		{NATS: &eventbusv1alpha1.NATSConfig{}},
+		{JetStream: &eventbusv1alpha1.JetStreamConfig{}},
+	}
+)
+
+func TestLeaderElectionWithInvalidEventBus(t *testing.T) {
+	elector, err := NewElector(context.TODO(), eventbusv1alpha1.BusConfig{}, "", 0, "", "", "")
+
+	assert.Nil(t, elector)
+	assert.EqualError(t, err, "invalid event bus")
+}
+
+func TestLeaderElectionWithEventBusElector(t *testing.T) {
+	eventBusAuthFileMountPath = "test"
+
+	for _, config := range configs {
+		elector, err := NewElector(context.TODO(), config, "", 0, "", "", "")
+		assert.Nil(t, err)
+
+		_, ok := elector.(*natsEventBusElector)
+		assert.True(t, ok)
+	}
+}
+
+func TestLeaderElectionWithKubernetesElector(t *testing.T) {
+	eventBusAuthFileMountPath = "test"
+
+	os.Setenv(common.EnvVarLeaderElection, "k8s")
+
+	for _, config := range configs {
+		elector, err := NewElector(context.TODO(), config, "", 0, "", "", "")
+		assert.Nil(t, err)
+
+		_, ok := elector.(*kubernetesElector)
+		assert.True(t, ok)
+	}
+}
diff --git a/common/leaderelection/test/auth.yaml b/common/leaderelection/test/auth.yaml
new file mode 100644
index 0000000000..b3345129cd
--- /dev/null
+++ b/common/leaderelection/test/auth.yaml
@@ -0,0 +1,3 @@
+token: "token"
+username: "username"
+password: "password"
diff --git a/controllers/eventsource/resource.go b/controllers/eventsource/resource.go
index 53de809f87..14d5feddc5 100644
--- a/controllers/eventsource/resource.go
+++ b/controllers/eventsource/resource.go
@@ -188,9 +188,13 @@ func buildDeployment(args *AdaptorArgs, eventBus *eventbusv1alpha1.EventBus) (*a
 			Value: fmt.Sprintf("eventbus-%s", args.EventSource.Namespace),
 		},
 		{
-			Name:      "POD_NAME",
+			Name:      common.EnvVarPodName,
 			ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.name"}},
 		},
+		{
+			Name:  common.EnvVarLeaderElection,
+			Value: args.EventSource.Annotations[common.AnnotationLeaderElection],
+		},
 	}
 
 	busConfigBytes, err := json.Marshal(eventBus.Status.Config)
diff --git a/controllers/sensor/resource.go b/controllers/sensor/resource.go
index 4e546e8e9d..85e5111067 100644
--- a/controllers/sensor/resource.go
+++ b/controllers/sensor/resource.go
@@ -148,9 +148,13 @@ func buildDeployment(args *AdaptorArgs, eventBus *eventbusv1alpha1.EventBus) (*a
 			Value: fmt.Sprintf("eventbus-%s", args.Sensor.Namespace),
 		},
 		{
-			Name:      "POD_NAME",
+			Name:      common.EnvVarPodName,
 			ValueFrom: &corev1.EnvVarSource{FieldRef: &corev1.ObjectFieldSelector{FieldPath: "metadata.name"}},
 		},
+		{
+			Name:  common.EnvVarLeaderElection,
+			Value: args.Sensor.Annotations[common.AnnotationLeaderElection],
+		},
 	}
 
 	busConfigBytes, err := json.Marshal(eventBus.Status.Config)
diff --git a/docs/eventsources/ha.md b/docs/eventsources/ha.md
index 4722085f5c..368b0308bc 100644
--- a/docs/eventsources/ha.md
+++ b/docs/eventsources/ha.md
@@ -53,6 +53,29 @@ old one is gone.
 - Redis
 - Resource
 
+## Kubernetes Leader Election
+
+By default, Argo Events will use NATS for the HA leader election. Alternatively,
+you can opt in to a Kubernetes native leader election by specifying the following
+annotation.
+```yaml
+annotations:
+  events.argoproj.io/leader-election: k8s
+```
+
+To use Kubernetes leader election, the following RBAC rules need to be associated
+with the EventSource ServiceAccount.
+```yaml
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+  name: argo-events-leaderelection-role
+rules:
+- apiGroups: ["coordination.k8s.io"]
+  resources: ["leases"]
+  verbs: ["get", "create", "update"]
+```
+
 ## More
 
 Click [here](../dr_ha_recommendations.md) to learn more information about Argo
diff --git a/docs/sensors/ha.md b/docs/sensors/ha.md
index 9201a52f89..7e9519b3db 100644
--- a/docs/sensors/ha.md
+++ b/docs/sensors/ha.md
@@ -9,5 +9,30 @@ elected to be active if the old one is gone.
 **Please DO NOT manually scale up the replicas, that might cause unexpected
 behaviors!**
 
+## Kubernetes Leader Election
+
+By default, Argo Events will use NATS for the HA leader election. Alternatively,
+you can opt in to a Kubernetes native leader election by specifying the following
+annotation.
+```yaml
+annotations:
+  events.argoproj.io/leader-election: k8s
+```
+
+To use Kubernetes leader election, the following RBAC rules need to be associated
+with the Sensor ServiceAccount.
+```yaml
+apiVersion: rbac.authorization.k8s.io/v1
+kind: Role
+metadata:
+  name: argo-events-leaderelection-role
+rules:
+- apiGroups: ["coordination.k8s.io"]
+  resources: ["leases"]
+  verbs: ["get", "create", "update"]
+```
+
+## More
+
 Click [here](../dr_ha_recommendations.md) to learn more information about Argo
 Events DR/HA recommendations.
diff --git a/eventsources/cmd/start.go b/eventsources/cmd/start.go
index d73302c8ae..dd492a7bfb 100644
--- a/eventsources/cmd/start.go
+++ b/eventsources/cmd/start.go
@@ -62,6 +62,7 @@ func Start() {
 
 	logger.Infow("starting eventsource server", "version", argoevents.GetVersion())
 	adaptor := eventsources.NewEventSourceAdaptor(eventSource, busConfig, ebSubject, hostname, m)
+
 	if err := adaptor.Start(ctx); err != nil {
 		logger.Fatalw("failed to start eventsource server", zap.Error(err))
 	}
diff --git a/eventsources/eventing.go b/eventsources/eventing.go
index f5dffee9b1..b0d16c8cd4 100644
--- a/eventsources/eventing.go
+++ b/eventsources/eventing.go
@@ -377,26 +377,31 @@ func (e *EventSourceAdaptor) Start(ctx context.Context) error {
 	for _, esType := range apicommon.RecreateStrategyEventSources {
 		recreateTypes[esType] = true
 	}
-	isRecreatType := false
+	isRecreateType := false
 	servers, filters := GetEventingServers(e.eventSource, e.metrics)
 	for k := range servers {
 		if _, ok := recreateTypes[k]; ok {
-			isRecreatType = true
+			isRecreateType = true
 		}
 		// This is based on the presumption that all the events in one
 		// EventSource object use the same type of deployment strategy
 		break
 	}
-	if !isRecreatType {
+
+	if !isRecreateType {
 		return e.run(ctx, servers, filters)
 	}
 
-	custerName := fmt.Sprintf("%s-eventsource-%s", e.eventSource.Namespace, e.eventSource.Name)
-	elector, err := leaderelection.NewEventBusElector(ctx, *e.eventBusConfig, custerName, int(e.eventSource.Spec.GetReplicas()))
+	clusterName := fmt.Sprintf("%s-eventsource-%s", e.eventSource.Namespace, e.eventSource.Name)
+	replicas := int(e.eventSource.Spec.GetReplicas())
+	leasename := fmt.Sprintf("eventsource-%s", e.eventSource.Name)
+
+	elector, err := leaderelection.NewElector(ctx, *e.eventBusConfig, clusterName, replicas, e.eventSource.Namespace, leasename, e.hostname)
 	if err != nil {
 		log.Errorw("failed to get an elector", zap.Error(err))
 		return err
 	}
+
 	elector.RunOrDie(ctx, leaderelection.LeaderCallbacks{
 		OnStartedLeading: func(ctx context.Context) {
 			if err := e.run(ctx, servers, filters); err != nil {
diff --git a/sensors/listener.go b/sensors/listener.go
index 7d89a5106e..300ba3af2a 100644
--- a/sensors/listener.go
+++ b/sensors/listener.go
@@ -55,12 +55,16 @@ func subscribeOnce(subLock *uint32, subscribe func()) {
 
 func (sensorCtx *SensorContext) Start(ctx context.Context) error {
 	log := logging.FromContext(ctx)
-	custerName := fmt.Sprintf("%s-sensor-%s", sensorCtx.sensor.Namespace, sensorCtx.sensor.Name)
-	elector, err := leaderelection.NewEventBusElector(ctx, *sensorCtx.eventBusConfig, custerName, int(sensorCtx.sensor.Spec.GetReplicas()))
+
+	clusterName := fmt.Sprintf("%s-sensor-%s", sensorCtx.sensor.Namespace, sensorCtx.sensor.Name)
+	replicas := int(sensorCtx.sensor.Spec.GetReplicas())
+	leasename := fmt.Sprintf("sensor-%s", sensorCtx.sensor.Name)
+
+	elector, err := leaderelection.NewElector(ctx, *sensorCtx.eventBusConfig, clusterName, replicas, sensorCtx.sensor.Namespace, leasename, sensorCtx.hostname)
 	if err != nil {
 		log.Errorw("failed to get an elector", zap.Error(err))
 		return err
 	}
+
 	elector.RunOrDie(ctx, leaderelection.LeaderCallbacks{
 		OnStartedLeading: func(ctx context.Context) {
 			if err := sensorCtx.listenEvents(ctx); err != nil {
@@ -71,6 +75,7 @@ func (sensorCtx *SensorContext) Start(ctx context.Context) error {
 			log.Fatalf("leader lost: %s", sensorCtx.hostname)
 		},
 	})
+
 	return nil
 }
 
diff --git a/test/e2e/functional_test.go b/test/e2e/functional_test.go
index 117d1c0e72..9046fa8093 100644
--- a/test/e2e/functional_test.go
+++ b/test/e2e/functional_test.go
@@ -69,24 +69,31 @@ func (s *FunctionalSuite) TestCreateCalendarEventSource() {
 }
 
 func (s *FunctionalSuite) TestCreateCalendarEventSourceWithHA() {
-	t1 := s.Given().EventSource("@testdata/es-calendar-ha.yaml").
-		When().
-		CreateEventSource().
-		WaitForEventSourceReady().
-		Wait(3 * time.Second).
-		Then().
-		ExpectEventSourcePodLogContains(LogPublishEventSuccessful)
-
-	defer t1.When().DeleteEventSource()
-
-	t2 := s.Given().Sensor("@testdata/sensor-log-ha.yaml").
-		When().
-		CreateSensor().
-		WaitForSensorReady().
-		Wait(3 * time.Second).
-		Then().
-		ExpectSensorPodLogContains(LogTriggerActionSuccessful("log-trigger"))
-	defer t2.When().DeleteSensor()
+	for _, test := range []struct {
+		es, s string
+	}{
+		{"@testdata/es-calendar-ha.yaml", "@testdata/sensor-log-ha.yaml"},
+		{"@testdata/es-calendar-ha-k8s.yaml", "@testdata/sensor-log-ha-k8s.yaml"},
+	} {
+		t1 := s.Given().EventSource(test.es).
+			When().
+			CreateEventSource().
+			WaitForEventSourceReady().
+			Wait(3 * time.Second).
+			Then().
+			ExpectEventSourcePodLogContains(LogPublishEventSuccessful)
+
+		defer t1.When().DeleteEventSource()
+
+		t2 := s.Given().Sensor(test.s).
+			When().
+			CreateSensor().
+			WaitForSensorReady().
+			Wait(3 * time.Second).
+			Then().
+			ExpectSensorPodLogContains(LogTriggerActionSuccessful("log-trigger"))
+		defer t2.When().DeleteSensor()
+	}
 }
 
 func (s *FunctionalSuite) TestMetricsWithCalendar() {
diff --git a/test/e2e/testdata/es-calendar-ha-k8s.yaml b/test/e2e/testdata/es-calendar-ha-k8s.yaml
new file mode 100644
index 0000000000..6e0e61f640
--- /dev/null
+++ b/test/e2e/testdata/es-calendar-ha-k8s.yaml
@@ -0,0 +1,13 @@
+apiVersion: argoproj.io/v1alpha1
+kind: EventSource
+metadata:
+  annotations:
+    events.argoproj.io/leader-election: k8s
+  name: e2e-calendar-ha-k8s
+spec:
+  replicas: 2
+  template:
+    serviceAccountName: argo-events-sa
+  calendar:
+    example:
+      interval: 2s
diff --git a/test/e2e/testdata/sensor-log-ha-k8s.yaml b/test/e2e/testdata/sensor-log-ha-k8s.yaml
new file mode 100644
index 0000000000..4896c753a9
--- /dev/null
+++ b/test/e2e/testdata/sensor-log-ha-k8s.yaml
@@ -0,0 +1,18 @@
+apiVersion: argoproj.io/v1alpha1
+kind: Sensor
+metadata:
+  annotations:
+    events.argoproj.io/leader-election: k8s
+  name: e2e-log-ha-k8s
+spec:
+  replicas: 2
+  template:
+    serviceAccountName: argo-events-sa
+  dependencies:
+    - name: test-dep
+      eventSourceName: e2e-calendar-ha-k8s
+      eventName: example
+  triggers:
+    - template:
+        name: log-trigger
+        log: {}
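The Role added to docs/eventsources/ha.md and docs/sensors/ha.md above only defines the `leases` permissions; it still has to be bound to the ServiceAccount the EventSource or Sensor pods run under (`spec.template.serviceAccountName`). A minimal sketch of such a RoleBinding, assuming the `argo-events-sa` ServiceAccount from the e2e manifests above and an `argo-events` namespace (both names are assumptions for illustration, not part of this patch):

```yaml
# Hypothetical RoleBinding: attaches the argo-events-leaderelection-role Role
# from the docs above to the ServiceAccount used by the adaptor pods.
# The ServiceAccount name and namespace below are assumptions, not from the patch.
apiVersion: rbac.authorization.k8s.io/v1
kind: RoleBinding
metadata:
  name: argo-events-leaderelection-role-binding
  namespace: argo-events
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: Role
  name: argo-events-leaderelection-role
subjects:
  - kind: ServiceAccount
    name: argo-events-sa
    namespace: argo-events
```

With the `events.argoproj.io/leader-election: k8s` annotation, the kubernetesElector in this patch acquires a `coordination.k8s.io` Lease named `eventsource-<name>` or `sensor-<name>` in the adaptor's own namespace, so the Role and binding must exist in that same namespace.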