fix: fix Kafka failure mode (#395)
Signed-off-by: Alex Collins <[email protected]>
alexec authored Oct 4, 2021
1 parent 300af6c commit 7376f50
Showing 22 changed files with 241 additions and 167 deletions.
4 changes: 4 additions & 0 deletions .github/workflows/build.yml
@@ -118,8 +118,12 @@ jobs:
if: ${{matrix.suite == 'e2e' || matrix.suite == 'examples'}}
- run: docker pull golang:1.16
- run: make wait
- run: make logs > /tmp/logs &
- run: make test-${{matrix.suite}}
- run: git diff
- name: cat logs
if: ${{ failure() }}
run: cat /tmp/logs
- name: controller logs
if: ${{ failure() }}
run: kubectl -n argo-dataflow-system logs deploy/controller-manager -c manager
5 changes: 4 additions & 1 deletion Makefile
@@ -66,7 +66,7 @@ test-stan-stress:
test-%:
go generate $(shell find ./test/$* -name '*.go')
kubectl -n argo-dataflow-system wait pod -l statefulset.kubernetes.io/pod-name --for condition=ready --timeout=2m
- go test -count 1 -v --tags test ./test/$*
+ go test -failfast -count 1 -v --tags test ./test/$*

pprof:
go tool pprof -web http://127.0.0.1:3569/debug/pprof/allocs
@@ -93,6 +93,9 @@ wait:
kubectl -n argo-dataflow-system wait deploy --all --for=condition=available --timeout=2m
# kubectl wait does not work for statefulsets, as statefulsets do not have conditions
kubectl -n argo-dataflow-system wait pod -l statefulset.kubernetes.io/pod-name --for condition=ready
$(GOBIN)/stern:
curl -Lo $(GOBIN)/stern https://github.com/wercker/stern/releases/download/1.11.0/stern_`uname -s|tr '[:upper:]' '[:lower:]'`_amd64
chmod +x $(GOBIN)/stern
logs: $(GOBIN)/stern
stern -n argo-dataflow-system --tail=3 -l dataflow.argoproj.io/step-name .

9 changes: 9 additions & 0 deletions runner/sidecar/sink/kafka/kafka.go
@@ -35,6 +35,15 @@ func New(ctx context.Context, sinkName string, secretInterface corev1.SecretInte
if x.MaxMessageBytes > 0 {
config["message.max.bytes"] = x.Kafka.MaxMessageBytes
}
// https://docs.confluent.io/cloud/current/client-apps/optimizing/throughput.html
config["batch.size"] = 100000
if x.Async {
config["linger.ms"] = 50 // Alias for queue.buffering.max.ms
} else {
config["linger.ms"] = 0
}
config["compression.type"] = "lz4"
config["request.required.acks"] = "all"
// https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/producer_example/producer_example.go
producer, err := kafka.NewProducer(&config)
if err != nil {
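
A note on these producer settings: batch.size and linger.ms trade a little latency for bigger batches, lz4 keeps compression cheap, and acks=all waits for every in-sync replica before acknowledging a write. Below is a minimal standalone sketch of the same tuning with confluent-kafka-go; the broker address and the async flag are placeholder assumptions, not values from this repo:

```go
package main

import (
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	async := true // placeholder standing in for the sink's x.Async field
	config := kafka.ConfigMap{
		"bootstrap.servers":     "localhost:9092", // placeholder broker
		"batch.size":            100000,           // bigger batches, fewer requests
		"compression.type":      "lz4",            // cheap compression for throughput
		"request.required.acks": "all",            // ack only after all in-sync replicas have the write
	}
	if async {
		config["linger.ms"] = 50 // wait up to 50ms to fill a batch (alias for queue.buffering.max.ms)
	} else {
		config["linger.ms"] = 0 // synchronous sends should not sit in the queue
	}
	producer, err := kafka.NewProducer(&config)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()
	log.Println("producer configured")
}
```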
50 changes: 30 additions & 20 deletions runner/sidecar/source/kafka/kafka.go
@@ -3,7 +3,9 @@ package kafka
import (
"context"
"encoding/json"
"errors"
"fmt"
"math"
"sync"
"time"

@@ -31,6 +33,11 @@ type kafkaSource struct {
totalLag int64
}

const (
seconds = 1000
pendingUnavailable = math.MinInt32
)

func New(ctx context.Context, secretInterface corev1.SecretInterface, mntr monitor.Interface, consumerGroupID, sourceName, sourceURN string, replica int, x dfv1.KafkaSource, process source.Process) (source.Interface, error) {
logger := sharedutil.NewLogger().WithValues("source", sourceName)
config, err := sharedkafka.GetConfig(ctx, secretInterface, x.KafkaConfig)
@@ -39,14 +46,19 @@ func New(ctx context.Context, secretInterface corev1.SecretInterface, mntr monit
}
config["group.id"] = consumerGroupID
config["group.instance.id"] = fmt.Sprintf("%s/%d", consumerGroupID, replica)
config["heartbeat.interval.ms"] = 3 * seconds
config["socket.keepalive.enable"] = true
config["enable.auto.commit"] = false
config["enable.auto.offset.store"] = false
if x.StartOffset == "First" {
config["auto.offset.reset"] = "earliest"
} else {
config["auto.offset.reset"] = "latest"
}
config["statistics.interval.ms"] = 5 * 1000
config["statistics.interval.ms"] = 5 * seconds
// https://docs.confluent.io/cloud/current/client-apps/optimizing/throughput.html
// config["fetch.min.bytes"] = 100000
// config["fetch.max.wait.ms"] = seconds / 2
logger.Info("Kafka config", "config", sharedutil.MustJSON(sharedkafka.RedactConfigMap(config)))
// https://github.com/confluentinc/confluent-kafka-go/blob/master/examples/consumer_example/consumer_example.go
consumer, err := kafka.NewConsumer(&config)
@@ -63,6 +75,7 @@ func New(ctx context.Context, secretInterface corev1.SecretInterface, mntr monit
channels: map[int32]chan *kafka.Message{}, // partition -> messages
wg: &sync.WaitGroup{},
process: process,
totalLag: pendingUnavailable,
}

if err = consumer.Subscribe(x.Topic, func(consumer *kafka.Consumer, event kafka.Event) error {
@@ -111,22 +124,6 @@ func (s *kafkaSource) revokedPartition(partition int32) {
}
}

- type Stats struct {
- Topics map[string]struct {
- Partitions map[string]struct {
- ConsumerLag int64 `json:"consumer_lag"`
- } `json:"partitions"`
- } `json:"topics"`
- }
-
- func (s Stats) totalLag(topic string) int64 {
- var totalLag int64
- for _, p := range s.Topics[topic].Partitions {
- totalLag += p.ConsumerLag
- }
- return totalLag
- }

func (s *kafkaSource) startPollLoop(ctx context.Context) {
s.logger.Info("starting poll loop")
for {
@@ -182,7 +179,9 @@ func (s *kafkaSource) Close() error {
}

func (s *kafkaSource) GetPending(context.Context) (uint64, error) {
- if s.totalLag >= 0 {
+ if s.totalLag == pendingUnavailable {
+ return 0, source.ErrPendingUnavailable
+ } else if s.totalLag >= 0 {
return uint64(s.totalLag), nil
} else {
return 0, nil
@@ -208,18 +207,29 @@ func (s *kafkaSource) consumePartition(ctx context.Context, partition int32) {
logger := s.logger.WithValues("partition", partition)
logger.Info("consuming partition")
s.wg.Add(1)
var firstCommittedOffset, lastCommittedOffset int64 = -1, -1
defer func() {
logger.Info("done consuming partition")
logger.Info("done consuming partition", "firstCommittedOffset", firstCommittedOffset, "lastCommittedOffset", lastCommittedOffset)
s.wg.Done()
}()
for msg := range s.channels[partition] {
offset := int64(msg.TopicPartition.Offset)
logger := logger.WithValues("offset", offset)
if err := s.processMessage(ctx, msg); err != nil {
logger.Error(err, "failed to process message")
if errors.Is(err, context.Canceled) {
logger.Info("failed to process message", "err", err.Error())
} else {
logger.Error(err, "failed to process message")
}
} else {
if _, err := s.consumer.CommitMessage(msg); err != nil {
logger.Error(err, "failed to commit message")
} else {
if firstCommittedOffset == -1 {
firstCommittedOffset = offset
logger.Info("offset", "firstCommittedOffset", firstCommittedOffset)
}
lastCommittedOffset = offset
}
}
}
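
With enable.auto.commit and enable.auto.offset.store both false, an offset only advances after a message has been processed successfully, which is what gives the source its at-least-once behaviour. Below is a minimal standalone sketch of that pattern with confluent-kafka-go; broker, group, and topic names are placeholders:

```go
package main

import (
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

func main() {
	consumer, err := kafka.NewConsumer(&kafka.ConfigMap{
		"bootstrap.servers":        "localhost:9092", // placeholder
		"group.id":                 "example-group",  // placeholder
		"enable.auto.commit":       false,            // we commit explicitly below
		"enable.auto.offset.store": false,
		"auto.offset.reset":        "earliest",
	})
	if err != nil {
		log.Fatal(err)
	}
	defer consumer.Close()
	if err := consumer.Subscribe("example-topic", nil); err != nil {
		log.Fatal(err)
	}
	for i := 0; i < 10; i++ {
		msg, err := consumer.ReadMessage(-1) // block until a message arrives
		if err != nil {
			continue
		}
		// Processing would happen here; commit only on success, so a
		// crash mid-process re-delivers the message (at-least-once).
		if _, err := consumer.CommitMessage(msg); err != nil {
			log.Println("commit failed:", err)
		}
	}
}
```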
17 changes: 17 additions & 0 deletions runner/sidecar/source/kafka/stats.go
@@ -0,0 +1,17 @@
package kafka

type Stats struct {
Topics map[string]struct {
Partitions map[string]struct {
ConsumerLag int64 `json:"consumer_lag"`
} `json:"partitions"`
} `json:"topics"`
}

func (s Stats) totalLag(topic string) int64 {
var totalLag int64
for _, p := range s.Topics[topic].Partitions {
totalLag += p.ConsumerLag
}
return totalLag
}
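
Stats mirrors only the fields this source needs from librdkafka's statistics JSON, which the consumer emits every statistics.interval.ms. The sketch below shows how such an event could be decoded into it; the handler wiring is illustrative rather than the sidecar's actual poll loop, and the topic name is a placeholder:

```go
package kafka

import (
	"encoding/json"
	"log"

	"github.com/confluentinc/confluent-kafka-go/kafka"
)

// handleStats is illustrative: decode a *kafka.Stats event into the
// Stats type above and report the summed lag for one topic.
func handleStats(ev kafka.Event) {
	if e, ok := ev.(*kafka.Stats); ok {
		stats := Stats{}
		if err := json.Unmarshal([]byte(e.String()), &stats); err != nil {
			log.Println("bad stats payload:", err)
			return
		}
		log.Println("consumer lag:", stats.totalLag("example-topic"))
	}
}
```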
5 changes: 5 additions & 0 deletions runner/sidecar/source/source.go
@@ -2,6 +2,7 @@ package source

import (
"context"
"errors"
"io"
)

@@ -11,7 +12,11 @@ type Interface interface {

type Process func(ctx context.Context, msg []byte) error

var ErrPendingUnavailable = errors.New("pending not available")

type HasPending interface {
Interface
// GetPending returns the number of pending messages.
// It may return ErrPendingUnavailable if this is not available yet.
GetPending(ctx context.Context) (uint64, error)
}
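
The sentinel error lets a source distinguish "stats have not arrived yet" from "lag is zero". Below is a minimal sketch of the contract from the implementer's side; the struct and its lag field are hypothetical, not types from this repo:

```go
package source

import "context"

// pendingExample is hypothetical: lag starts at a sentinel value and is
// overwritten once the first statistics event arrives.
type pendingExample struct {
	lag int64
}

const noStatsYet = -1 << 31 // mirrors the Kafka source's pendingUnavailable

func (p *pendingExample) GetPending(ctx context.Context) (uint64, error) {
	switch {
	case p.lag == noStatsYet:
		return 0, ErrPendingUnavailable // nothing reported yet
	case p.lag < 0:
		return 0, nil // transiently negative lag, clamp to zero
	default:
		return uint64(p.lag), nil
	}
}
```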
8 changes: 6 additions & 2 deletions runner/sidecar/sources.go
@@ -2,6 +2,7 @@ package sidecar

import (
"context"
"errors"
"fmt"
"time"

@@ -91,7 +92,6 @@ func connectSources(ctx context.Context, process func(context.Context, []byte) e
totalBytesCounter.WithLabelValues(sourceName, fmt.Sprint(replica)).Add(float64(len(msg)))

meta, err := dfv1.MetaFromContext(ctx)

if err != nil {
return fmt.Errorf("could not send message: %w", err)
}
@@ -190,7 +190,11 @@ func connectSources(ctx context.Context, process func(context.Context, []byte) e
logger.Info("starting pending loop", "source", sourceName, "updateInterval", updateInterval.String())
go wait.JitterUntilWithContext(ctx, func(ctx context.Context) {
if pending, err := x.GetPending(ctx); err != nil {
logger.Error(err, "failed to get pending", "source", sourceName)
if errors.Is(err, source.ErrPendingUnavailable) {
logger.Info("failed to get pending", "source", sourceName, "err", err.Error())
} else {
logger.Error(err, "failed to get pending", "source", sourceName)
}
} else {
logger.Info("got pending", "source", sourceName, "pending", pending)
pendingGauge.WithLabelValues(sourceName).Set(float64(pending))
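
The pending loop rides on the jittered ticker from k8s.io/apimachinery, so replicas do not all poll the brokers in lockstep. A minimal sketch of the pattern follows; the timeout, 15s period, and 1.2 jitter factor are assumptions, not the sidecar's actual values:

```go
package main

import (
	"context"
	"log"
	"time"

	"k8s.io/apimachinery/pkg/util/wait"
)

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), time.Minute)
	defer cancel()
	// Run the callback roughly every 15s (jittered) until ctx is done;
	// sliding=true times the next interval from when the callback returns.
	wait.JitterUntilWithContext(ctx, func(ctx context.Context) {
		log.Println("would call x.GetPending(ctx) and update the pending gauge here")
	}, 15*time.Second, 1.2, true)
}
```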
1 change: 0 additions & 1 deletion test/db-e2e/db_source_test.go
@@ -108,7 +108,6 @@ func TestDBSource(t *testing.T) {

WaitForSunkMessages()
WaitForTotalSunkMessages(3)
- WaitForProcessLatencySeconds(0)

DeletePipelines()
WaitForPodsToBeDeleted()
12 changes: 2 additions & 10 deletions test/fixtures.go
@@ -4,7 +4,6 @@ package test

import (
"log"
"runtime"
"testing"

"k8s.io/client-go/dynamic"
@@ -46,21 +45,14 @@ func Setup(t *testing.T) (teardown func()) {
log.Printf("\n")
stopTestAPIPortForward()
log.Printf("\n")
- r := recover() // tests should panic on error, we recover so we can run other tests
- tailLogs := runtime.GOOS != "darwin" // we're probably running on CI
+ r := recover() // tests should panic on error, we recover so we can run other tests
if r != nil {
log.Printf("❌ FAIL: %s %v\n", t.Name(), r)
log.Printf("\n")
if tailLogs {
TailLogs()
}
t.Fail()
} else if t.Failed() {
log.Printf("❌ FAIL: %s\n", t.Name())
if tailLogs {
log.Printf("\n")
}
TailLogs()
log.Printf("\n")
} else {
log.Printf("✅ PASS: %s\n", t.Name())
log.Printf("\n")
8 changes: 4 additions & 4 deletions test/kafka-e2e/kafka_test.go
@@ -21,7 +21,7 @@ func TestKafka(t *testing.T) {
topic := CreateKafkaTopic()
sinkTopic := CreateKafkaTopic()

- name := CreatePipeline(Pipeline{
+ CreatePipeline(Pipeline{
ObjectMeta: metav1.ObjectMeta{GenerateName: "kafka-"},
Spec: PipelineSpec{
Steps: []StepSpec{{
@@ -37,7 +37,7 @@

PumpKafkaTopic(topic, 17)

- defer StartPortForward(name + "-main-0")()
+ defer StartPortForward()()
WaitForSunkMessages()

WaitForTotalSourceMessages(17)
@@ -53,7 +53,7 @@ func TestKafkaAsync(t *testing.T) {
topic := CreateKafkaTopic()
sinkTopic := CreateKafkaTopic()

- name := CreatePipeline(Pipeline{
+ CreatePipeline(Pipeline{
ObjectMeta: metav1.ObjectMeta{GenerateName: "kafka-"},
Spec: PipelineSpec{
Steps: []StepSpec{{
@@ -67,7 +67,7 @@
WaitForPipeline()
WaitForPod()

- defer StartPortForward(name + "-main-0")()
+ defer StartPortForward()()

PumpKafkaTopic(topic, 17)
