Skip to content

Commit

Permalink
fix!: correctly report async Kafka sink errors, retry always. Fixes #494
Browse files Browse the repository at this point in the history
 (#495)
  • Loading branch information
alexec authored Nov 3, 2021
1 parent 894c83d commit 85b9d28
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 26 deletions.
27 changes: 6 additions & 21 deletions runner/sidecar/sink/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package kafka
import (
"context"
"fmt"
"math"
"time"

dfv1 "github.com/argoproj-labs/argo-dataflow/api/v1alpha1"
Expand All @@ -12,7 +13,6 @@ import (
kafka "github.com/confluentinc/confluent-kafka-go/kafka"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"k8s.io/apimachinery/pkg/util/wait"
corev1 "k8s.io/client-go/kubernetes/typed/core/v1"
)
Expand All @@ -26,23 +26,7 @@ type kafkaSink struct {
async bool
}

var kafkaMessagesProducedSuccess, kafkaMessagesProducedErr *prometheus.CounterVec

func init() {
// track async success and errors
kafkaMessagesProducedSuccess = promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "sinks",
Name: "kafka_produced_successes",
Help: "Number of messages successfully produced to Kafka",
}, []string{"sinkName"})
kafkaMessagesProducedErr = promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "sinks",
Name: "kafka_produce_errors",
Help: "Number of errors while producing messages to Kafka",
}, []string{"sinkName"})
}

func New(ctx context.Context, sinkName string, secretInterface corev1.SecretInterface, x dfv1.KafkaSink) (sink.Interface, error) {
func New(ctx context.Context, sinkName string, secretInterface corev1.SecretInterface, x dfv1.KafkaSink, errorsCounter prometheus.Counter) (sink.Interface, error) {
logger := logger.WithValues("sink", sinkName)
config, err := sharedkafka.GetConfig(ctx, secretInterface, x.KafkaConfig)
if err != nil {
Expand All @@ -58,6 +42,9 @@ func New(ctx context.Context, sinkName string, secretInterface corev1.SecretInte
config["compression.type"] = x.CompressionType
config["acks"] = x.GetAcks()
config["enable.idempotence"] = x.EnableIdempotence
if x.Async { // this is meant to be set by `enable.idempotence` automatically, but I'm not sure it is
config["retries"] = math.MaxInt32
}

logger.Info("kafka config", "config", sharedutil.MustJSON(sharedkafka.RedactConfigMap(config)))

Expand All @@ -80,9 +67,7 @@ func New(ctx context.Context, sinkName string, secretInterface corev1.SecretInte
case *kafka.Message:
if err := ev.TopicPartition.Error; err != nil {
logger.Error(err, "Async to Kafka failed", "topic", x.Topic)
kafkaMessagesProducedErr.WithLabelValues(sinkName).Inc()
} else {
kafkaMessagesProducedSuccess.WithLabelValues(sinkName).Inc()
errorsCounter.Inc()
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion runner/sidecar/sinks.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func connectSinks(ctx context.Context) (func(context.Context, []byte) error, fun
return nil, nil, err
}
} else if x := s.Kafka; x != nil {
if sink, err = kafka.New(ctx, sinkName, secretInterface, *x); err != nil {
if sink, err = kafka.New(ctx, sinkName, secretInterface, *x, errorsCounter.WithLabelValues(sinkName, fmt.Sprint(replica), fmt.Sprint(s.DeadLetterQueue))); err != nil {
return nil, nil, err
}
} else if x := s.Log; x != nil {
Expand Down
4 changes: 0 additions & 4 deletions test/kafka-e2e/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ func TestKafkaAsync(t *testing.T) {
WaitForTotalSourceMessages(17)
WaitForTotalSunkMessages(17)

ExpectMetric("sinks_kafka_produced_successes", Eq(17))

DeletePipelines()
WaitForPodsToBeDeleted()
}
Expand Down Expand Up @@ -116,8 +114,6 @@ func TestKafkaMultipleSink(t *testing.T) {
WaitForTotalSourceMessages(17)
WaitForTotalSunkMessages(17)

ExpectMetric("sinks_kafka_produced_successes", Eq(17))

DeletePipelines()
WaitForPodsToBeDeleted()
}

0 comments on commit 85b9d28

Please sign in to comment.