From f826af80a2be67ed059a6d7fa658d362c2aec3f5 Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Tue, 5 May 2020 15:06:04 +0300 Subject: [PATCH 1/2] Rename cluster level things --- .../pkg/reconciler/controller/kafkachannel.go | 24 +++++++++---------- .../controller/kafkachannel_test.go | 6 ++--- 2 files changed, 15 insertions(+), 15 deletions(-) diff --git a/kafka/channel/pkg/reconciler/controller/kafkachannel.go b/kafka/channel/pkg/reconciler/controller/kafkachannel.go index 2f7b815997..d192ebfaff 100644 --- a/kafka/channel/pkg/reconciler/controller/kafkachannel.go +++ b/kafka/channel/pkg/reconciler/controller/kafkachannel.go @@ -109,9 +109,9 @@ type Reconciler struct { systemNamespace string dispatcherImage string - kafkaConfig *utils.KafkaConfig - kafkaConfigError error - kafkaClientSet kafkaclientset.Interface + clusterKafkaConfig *utils.KafkaConfig + clusterKafkaConfigError error + kafkaClientSet kafkaclientset.Interface // Using a shared kafkaClusterAdmin does not work currently because of an issue with // Shopify/sarama, see https://github.com/Shopify/sarama/issues/1162. @@ -150,12 +150,12 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1alpha1.KafkaChanne return err } - if r.kafkaConfig == nil { - if r.kafkaConfigError == nil { - r.kafkaConfigError = errors.New("The config map 'config-kafka' does not exist") + if r.clusterKafkaConfig == nil { + if r.clusterKafkaConfigError == nil { + r.clusterKafkaConfigError = errors.New("The config map 'config-kafka' does not exist") } - kc.Status.MarkConfigFailed("MissingConfiguration", "%v", r.kafkaConfigError) - return r.kafkaConfigError + kc.Status.MarkConfigFailed("MissingConfiguration", "%v", r.clusterKafkaConfigError) + return r.clusterKafkaConfigError } kafkaClusterAdmin, err := r.createClient(ctx, kc) @@ -433,7 +433,7 @@ func (r *Reconciler) createClient(ctx context.Context, kc *v1alpha1.KafkaChannel kafkaClusterAdmin := r.kafkaClusterAdmin if kafkaClusterAdmin == nil { var err error - kafkaClusterAdmin, err = resources.MakeClient(controllerAgentName, r.kafkaConfig.Brokers) + kafkaClusterAdmin, err = resources.MakeClient(controllerAgentName, r.clusterKafkaConfig.Brokers) if err != nil { return nil, err } @@ -484,14 +484,14 @@ func (r *Reconciler) updateKafkaConfig(ctx context.Context, configMap *corev1.Co } // For now just override the previous config. // Eventually the previous config should be snapshotted to delete Kafka topics - r.kafkaConfig = kafkaConfig - r.kafkaConfigError = err + r.clusterKafkaConfig = kafkaConfig + r.clusterKafkaConfigError = err } func (r *Reconciler) FinalizeKind(ctx context.Context, kc *v1alpha1.KafkaChannel) pkgreconciler.Event { // Do not attempt retrying creating the client because it might be a permanent error // in which case the finalizer will never get removed. - if kafkaClusterAdmin, err := r.createClient(ctx, kc); err == nil && r.kafkaConfig != nil { + if kafkaClusterAdmin, err := r.createClient(ctx, kc); err == nil && r.clusterKafkaConfig != nil { if err := r.deleteTopic(ctx, kc, kafkaClusterAdmin); err != nil { return err } diff --git a/kafka/channel/pkg/reconciler/controller/kafkachannel_test.go b/kafka/channel/pkg/reconciler/controller/kafkachannel_test.go index cd50396956..2cfece82a0 100644 --- a/kafka/channel/pkg/reconciler/controller/kafkachannel_test.go +++ b/kafka/channel/pkg/reconciler/controller/kafkachannel_test.go @@ -326,7 +326,7 @@ func TestAllCases(t *testing.T) { r := &Reconciler{ systemNamespace: testNS, dispatcherImage: testDispatcherImage, - kafkaConfig: &KafkaConfig{ + clusterKafkaConfig: &KafkaConfig{ Brokers: []string{brokerName}, }, kafkachannelLister: listers.GetKafkaChannelLister(), @@ -384,7 +384,7 @@ func TestTopicExists(t *testing.T) { r := &Reconciler{ systemNamespace: testNS, dispatcherImage: testDispatcherImage, - kafkaConfig: &KafkaConfig{ + clusterKafkaConfig: &KafkaConfig{ Brokers: []string{brokerName}, }, kafkachannelLister: listers.GetKafkaChannelLister(), @@ -454,7 +454,7 @@ func TestDeploymentUpdatedOnImageChange(t *testing.T) { r := &Reconciler{ systemNamespace: testNS, dispatcherImage: testDispatcherImage, - kafkaConfig: &KafkaConfig{ + clusterKafkaConfig: &KafkaConfig{ Brokers: []string{brokerName}, }, kafkachannelLister: listers.GetKafkaChannelLister(), From b40202e5b37ddf85e040a01327eb775f3d744237 Mon Sep 17 00:00:00 2001 From: Ali Ok Date: Tue, 5 May 2020 15:07:22 +0300 Subject: [PATCH 2/2] Get rid of unused clientset in kafka channel reconciler --- kafka/channel/pkg/reconciler/controller/kafkachannel.go | 2 -- .../pkg/reconciler/controller/kafkachannel_test.go | 9 +++------ 2 files changed, 3 insertions(+), 8 deletions(-) diff --git a/kafka/channel/pkg/reconciler/controller/kafkachannel.go b/kafka/channel/pkg/reconciler/controller/kafkachannel.go index d192ebfaff..d6fb283d41 100644 --- a/kafka/channel/pkg/reconciler/controller/kafkachannel.go +++ b/kafka/channel/pkg/reconciler/controller/kafkachannel.go @@ -49,7 +49,6 @@ import ( pkgreconciler "knative.dev/pkg/reconciler" "knative.dev/eventing-contrib/kafka/channel/pkg/apis/messaging/v1alpha1" - kafkaclientset "knative.dev/eventing-contrib/kafka/channel/pkg/client/clientset/versioned" kafkaScheme "knative.dev/eventing-contrib/kafka/channel/pkg/client/clientset/versioned/scheme" kafkaChannelReconciler "knative.dev/eventing-contrib/kafka/channel/pkg/client/injection/reconciler/messaging/v1alpha1/kafkachannel" listers "knative.dev/eventing-contrib/kafka/channel/pkg/client/listers/messaging/v1alpha1" @@ -111,7 +110,6 @@ type Reconciler struct { clusterKafkaConfig *utils.KafkaConfig clusterKafkaConfigError error - kafkaClientSet kafkaclientset.Interface // Using a shared kafkaClusterAdmin does not work currently because of an issue with // Shopify/sarama, see https://github.com/Shopify/sarama/issues/1162. diff --git a/kafka/channel/pkg/reconciler/controller/kafkachannel_test.go b/kafka/channel/pkg/reconciler/controller/kafkachannel_test.go index 2cfece82a0..c165016e38 100644 --- a/kafka/channel/pkg/reconciler/controller/kafkachannel_test.go +++ b/kafka/channel/pkg/reconciler/controller/kafkachannel_test.go @@ -336,11 +336,10 @@ func TestAllCases(t *testing.T) { serviceLister: listers.GetServiceLister(), endpointsLister: listers.GetEndpointsLister(), kafkaClusterAdmin: &mockClusterAdmin{}, - kafkaClientSet: fakekafkaclient.Get(ctx), KubeClientSet: kubeclient.Get(ctx), EventingClientSet: eventingClient.Get(ctx), } - return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r) + return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), fakekafkaclient.Get(ctx), listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r) }, zap.L())) } @@ -402,11 +401,10 @@ func TestTopicExists(t *testing.T) { } }, }, - kafkaClientSet: fakekafkaclient.Get(ctx), KubeClientSet: kubeclient.Get(ctx), EventingClientSet: eventingClient.Get(ctx), } - return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r) + return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), fakekafkaclient.Get(ctx), listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r) }, zap.L())) } @@ -472,11 +470,10 @@ func TestDeploymentUpdatedOnImageChange(t *testing.T) { } }, }, - kafkaClientSet: fakekafkaclient.Get(ctx), KubeClientSet: kubeclient.Get(ctx), EventingClientSet: eventingClient.Get(ctx), } - return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r) + return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), fakekafkaclient.Get(ctx), listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r) }, zap.L())) }