diff --git a/kafka/channel/pkg/reconciler/controller/kafkachannel.go b/kafka/channel/pkg/reconciler/controller/kafkachannel.go index 2f7b815997..d6fb283d41 100644 --- a/kafka/channel/pkg/reconciler/controller/kafkachannel.go +++ b/kafka/channel/pkg/reconciler/controller/kafkachannel.go @@ -49,7 +49,6 @@ import ( pkgreconciler "knative.dev/pkg/reconciler" "knative.dev/eventing-contrib/kafka/channel/pkg/apis/messaging/v1alpha1" - kafkaclientset "knative.dev/eventing-contrib/kafka/channel/pkg/client/clientset/versioned" kafkaScheme "knative.dev/eventing-contrib/kafka/channel/pkg/client/clientset/versioned/scheme" kafkaChannelReconciler "knative.dev/eventing-contrib/kafka/channel/pkg/client/injection/reconciler/messaging/v1alpha1/kafkachannel" listers "knative.dev/eventing-contrib/kafka/channel/pkg/client/listers/messaging/v1alpha1" @@ -109,9 +108,8 @@ type Reconciler struct { systemNamespace string dispatcherImage string - kafkaConfig *utils.KafkaConfig - kafkaConfigError error - kafkaClientSet kafkaclientset.Interface + clusterKafkaConfig *utils.KafkaConfig + clusterKafkaConfigError error // Using a shared kafkaClusterAdmin does not work currently because of an issue with // Shopify/sarama, see https://github.com/Shopify/sarama/issues/1162. @@ -150,12 +148,12 @@ func (r *Reconciler) ReconcileKind(ctx context.Context, kc *v1alpha1.KafkaChanne return err } - if r.kafkaConfig == nil { - if r.kafkaConfigError == nil { - r.kafkaConfigError = errors.New("The config map 'config-kafka' does not exist") + if r.clusterKafkaConfig == nil { + if r.clusterKafkaConfigError == nil { + r.clusterKafkaConfigError = errors.New("The config map 'config-kafka' does not exist") } - kc.Status.MarkConfigFailed("MissingConfiguration", "%v", r.kafkaConfigError) - return r.kafkaConfigError + kc.Status.MarkConfigFailed("MissingConfiguration", "%v", r.clusterKafkaConfigError) + return r.clusterKafkaConfigError } kafkaClusterAdmin, err := r.createClient(ctx, kc) @@ -433,7 +431,7 @@ func (r *Reconciler) createClient(ctx context.Context, kc *v1alpha1.KafkaChannel kafkaClusterAdmin := r.kafkaClusterAdmin if kafkaClusterAdmin == nil { var err error - kafkaClusterAdmin, err = resources.MakeClient(controllerAgentName, r.kafkaConfig.Brokers) + kafkaClusterAdmin, err = resources.MakeClient(controllerAgentName, r.clusterKafkaConfig.Brokers) if err != nil { return nil, err } @@ -484,14 +482,14 @@ func (r *Reconciler) updateKafkaConfig(ctx context.Context, configMap *corev1.Co } // For now just override the previous config. // Eventually the previous config should be snapshotted to delete Kafka topics - r.kafkaConfig = kafkaConfig - r.kafkaConfigError = err + r.clusterKafkaConfig = kafkaConfig + r.clusterKafkaConfigError = err } func (r *Reconciler) FinalizeKind(ctx context.Context, kc *v1alpha1.KafkaChannel) pkgreconciler.Event { // Do not attempt retrying creating the client because it might be a permanent error // in which case the finalizer will never get removed. - if kafkaClusterAdmin, err := r.createClient(ctx, kc); err == nil && r.kafkaConfig != nil { + if kafkaClusterAdmin, err := r.createClient(ctx, kc); err == nil && r.clusterKafkaConfig != nil { if err := r.deleteTopic(ctx, kc, kafkaClusterAdmin); err != nil { return err } diff --git a/kafka/channel/pkg/reconciler/controller/kafkachannel_test.go b/kafka/channel/pkg/reconciler/controller/kafkachannel_test.go index cd50396956..c165016e38 100644 --- a/kafka/channel/pkg/reconciler/controller/kafkachannel_test.go +++ b/kafka/channel/pkg/reconciler/controller/kafkachannel_test.go @@ -326,7 +326,7 @@ func TestAllCases(t *testing.T) { r := &Reconciler{ systemNamespace: testNS, dispatcherImage: testDispatcherImage, - kafkaConfig: &KafkaConfig{ + clusterKafkaConfig: &KafkaConfig{ Brokers: []string{brokerName}, }, kafkachannelLister: listers.GetKafkaChannelLister(), @@ -336,11 +336,10 @@ func TestAllCases(t *testing.T) { serviceLister: listers.GetServiceLister(), endpointsLister: listers.GetEndpointsLister(), kafkaClusterAdmin: &mockClusterAdmin{}, - kafkaClientSet: fakekafkaclient.Get(ctx), KubeClientSet: kubeclient.Get(ctx), EventingClientSet: eventingClient.Get(ctx), } - return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r) + return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), fakekafkaclient.Get(ctx), listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r) }, zap.L())) } @@ -384,7 +383,7 @@ func TestTopicExists(t *testing.T) { r := &Reconciler{ systemNamespace: testNS, dispatcherImage: testDispatcherImage, - kafkaConfig: &KafkaConfig{ + clusterKafkaConfig: &KafkaConfig{ Brokers: []string{brokerName}, }, kafkachannelLister: listers.GetKafkaChannelLister(), @@ -402,11 +401,10 @@ func TestTopicExists(t *testing.T) { } }, }, - kafkaClientSet: fakekafkaclient.Get(ctx), KubeClientSet: kubeclient.Get(ctx), EventingClientSet: eventingClient.Get(ctx), } - return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r) + return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), fakekafkaclient.Get(ctx), listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r) }, zap.L())) } @@ -454,7 +452,7 @@ func TestDeploymentUpdatedOnImageChange(t *testing.T) { r := &Reconciler{ systemNamespace: testNS, dispatcherImage: testDispatcherImage, - kafkaConfig: &KafkaConfig{ + clusterKafkaConfig: &KafkaConfig{ Brokers: []string{brokerName}, }, kafkachannelLister: listers.GetKafkaChannelLister(), @@ -472,11 +470,10 @@ func TestDeploymentUpdatedOnImageChange(t *testing.T) { } }, }, - kafkaClientSet: fakekafkaclient.Get(ctx), KubeClientSet: kubeclient.Get(ctx), EventingClientSet: eventingClient.Get(ctx), } - return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), r.kafkaClientSet, listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r) + return kafkachannel.NewReconciler(ctx, logging.FromContext(ctx), fakekafkaclient.Get(ctx), listers.GetKafkaChannelLister(), controller.GetEventRecorder(ctx), r) }, zap.L())) }