diff --git a/runner/sidecar/sink/kafka/kafka.go b/runner/sidecar/sink/kafka/kafka.go index e0af749f..ecb0cd06 100644 --- a/runner/sidecar/sink/kafka/kafka.go +++ b/runner/sidecar/sink/kafka/kafka.go @@ -3,6 +3,7 @@ package kafka import ( "context" "fmt" + "math" "time" dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1" @@ -12,7 +13,6 @@ import ( kafka "github.com/confluentinc/confluent-kafka-go/kafka" "github.com/opentracing/opentracing-go" "github.com/prometheus/client_golang/prometheus" - "github.com/prometheus/client_golang/prometheus/promauto" "k8s.io/apimachinery/pkg/util/wait" corev1 "k8s.io/client-go/kubernetes/typed/core/v1" ) @@ -26,23 +26,7 @@ type kafkaSink struct { async bool } -var kafkaMessagesProducedSuccess, kafkaMessagesProducedErr *prometheus.CounterVec - -func init() { - // track async success and errors - kafkaMessagesProducedSuccess = promauto.NewCounterVec(prometheus.CounterOpts{ - Subsystem: "sinks", - Name: "kafka_produced_successes", - Help: "Number of messages successfully produced to Kafka", - }, []string{"sinkName"}) - kafkaMessagesProducedErr = promauto.NewCounterVec(prometheus.CounterOpts{ - Subsystem: "sinks", - Name: "kafka_produce_errors", - Help: "Number of errors while producing messages to Kafka", - }, []string{"sinkName"}) -} - -func New(ctx context.Context, sinkName string, secretInterface corev1.SecretInterface, x dfv1.KafkaSink) (sink.Interface, error) { +func New(ctx context.Context, sinkName string, secretInterface corev1.SecretInterface, x dfv1.KafkaSink, errorsCounter prometheus.Counter) (sink.Interface, error) { logger := logger.WithValues("sink", sinkName) config, err := sharedkafka.GetConfig(ctx, secretInterface, x.KafkaConfig) if err != nil { @@ -58,6 +42,9 @@ func New(ctx context.Context, sinkName string, secretInterface corev1.SecretInte config["compression.type"] = x.CompressionType config["acks"] = x.GetAcks() config["enable.idempotence"] = x.EnableIdempotence + if x.Async { // this is meant to be set by `enable.idempotence` automatically, but I'm not sure it is + config["retries"] = math.MaxInt32 + } logger.Info("kafka config", "config", sharedutil.MustJSON(sharedkafka.RedactConfigMap(config))) @@ -80,9 +67,7 @@ func New(ctx context.Context, sinkName string, secretInterface corev1.SecretInte case *kafka.Message: if err := ev.TopicPartition.Error; err != nil { logger.Error(err, "Async to Kafka failed", "topic", x.Topic) - kafkaMessagesProducedErr.WithLabelValues(sinkName).Inc() - } else { - kafkaMessagesProducedSuccess.WithLabelValues(sinkName).Inc() + errorsCounter.Inc() } } } diff --git a/runner/sidecar/sinks.go b/runner/sidecar/sinks.go index c2174c5e..6531371b 100644 --- a/runner/sidecar/sinks.go +++ b/runner/sidecar/sinks.go @@ -46,7 +46,7 @@ func connectSinks(ctx context.Context) (func(context.Context, []byte) error, fun return nil, nil, err } } else if x := s.Kafka; x != nil { - if sink, err = kafka.New(ctx, sinkName, secretInterface, *x); err != nil { + if sink, err = kafka.New(ctx, sinkName, secretInterface, *x, errorsCounter.WithLabelValues(sinkName, fmt.Sprint(replica), fmt.Sprint(s.DeadLetterQueue))); err != nil { return nil, nil, err } } else if x := s.Log; x != nil { diff --git a/test/kafka-e2e/kafka_test.go b/test/kafka-e2e/kafka_test.go index adb8c69c..dca5beab 100644 --- a/test/kafka-e2e/kafka_test.go +++ b/test/kafka-e2e/kafka_test.go @@ -77,8 +77,6 @@ func TestKafkaAsync(t *testing.T) { WaitForTotalSourceMessages(17) WaitForTotalSunkMessages(17) - ExpectMetric("sinks_kafka_produced_successes", Eq(17)) - DeletePipelines() WaitForPodsToBeDeleted() } @@ -116,8 +114,6 @@ func TestKafkaMultipleSink(t *testing.T) { WaitForTotalSourceMessages(17) WaitForTotalSunkMessages(17) - ExpectMetric("sinks_kafka_produced_successes", Eq(17)) - DeletePipelines() WaitForPodsToBeDeleted() }