Skip to content

Commit

Permalink
feat: ignored duplicate Kafka messages (#373)
Browse files Browse the repository at this point in the history
  • Loading branch information
alexec authored Sep 23, 2021
1 parent 1e37143 commit 1b91c71
Show file tree
Hide file tree
Showing 8 changed files with 267 additions and 62 deletions.
4 changes: 3 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,13 @@ pprof:

pre-commit: codegen proto test install runner testapi lint start

codegen: generate manifests examples tests
codegen: generate manifests examples tests $(GOBIN)/mockery
go generate ./...

$(GOBIN)/goreman:
go install github.com/mattn/[email protected]
$(GOBIN)/mockery:
go install github.com/vektra/mockery/[email protected]

# Run against the configured Kubernetes cluster in ~/.kube/config
start: deploy build runner $(GOBIN)/goreman wait
Expand Down
118 changes: 118 additions & 0 deletions runner/sidecar/monitor/impl.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,118 @@
package monitor

import (
"context"
"fmt"
"strconv"
"sync"
"time"

sharedutil "github.com/argoproj-labs/argo-dataflow/shared/util"
"github.com/go-redis/redis/v8"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"k8s.io/apimachinery/pkg/util/wait"
)

// Package-level state shared by every monitor created in this process.
var (
// logger is used to report failures when persisting offsets.
logger = sharedutil.NewLogger()
// duplicateCounter counts messages whose offset was lower than the next
// expected offset, labelled by source name.
duplicateCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Subsystem: "sources",
Name: "duplicate",
Help: "Total number of duplicate messages, see https://github.com/argoproj-labs/argo-dataflow/blob/main/docs/METRICS.md#sources_duplicate",
},
[]string{"sourceName"},
)
// missingCounter counts messages whose offset was higher than the next
// expected offset (i.e. a gap was observed), labelled by source name.
missingCounter = promauto.NewCounterVec(
prometheus.CounterOpts{
Subsystem: "sources",
Name: "missing",
Help: "Total number of missing messages, see https://github.com/argoproj-labs/argo-dataflow/blob/main/docs/METRICS.md#sources_missing",
},
[]string{"sourceName"},
)
)

//go:generate mockery --exported --name=storage

// storage is the minimal key/value store the monitor needs to persist the
// last seen offset per partition. It exists as an interface so that the
// Redis-backed implementation can be replaced by a mock in tests.
type storage interface {
// Get returns the text stored under key, or an error.
Get(ctx context.Context, key string) (string, error)
// Set stores value under key with the given expiration.
Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error
}

// redisStorage implements storage by delegating to a go-redis command
// interface.
type redisStorage struct {
// rdb is the Redis client (or any redis.Cmdable) that commands are sent to.
rdb redis.Cmdable
}

// Get issues a Redis GET for key and returns the stored text together with
// whatever error the command produced.
func (r *redisStorage) Get(ctx context.Context, key string) (string, error) {
	cmd := r.rdb.Get(ctx, key)
	return cmd.Result()
}

// Set issues a Redis SET of value under key with the given expiration and
// returns the command's error, if any.
func (r *redisStorage) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error {
	status := r.rdb.Set(ctx, key, value, expiration)
	return status.Err()
}

// impl tracks, per source partition, the last offset that was accepted so
// that duplicate and missing messages can be detected.
type impl struct {
// mu guards db; both Accept and commitOffsets take it before touching db.
mu sync.Mutex
// db maps "<pipeline>/<step>/<sourceURN>/<partition>/offset" to the last
// offset recorded for that partition.
db map[string]int64
// pipelineName and stepName prefix every key so that offsets are scoped
// to one step of one pipeline.
pipelineName string
stepName string
// storage is where offsets are loaded from on first sight of a key and
// where commitOffsets writes them back.
storage storage
}

// Accept reports whether the message at the given partition/offset should be
// processed.
//
// The last offset for the partition is kept in memory; the first time a
// partition is seen it is loaded from storage. With expected = last + 1:
//
//   - offset < expected: the message is a duplicate; the duplicate counter for
//     sourceName is incremented and false is returned.
//   - offset > expected: a gap was observed; the missing counter for
//     sourceName is incremented and true is returned.
//   - offset == expected: the offset is recorded and true is returned.
//
// A non-nil error is returned only when the value held in storage cannot be
// parsed as a base-10 int64; in that case nothing is recorded for the key.
func (i *impl) Accept(ctx context.Context, sourceName, sourceURN string, partition int32, offset int64) (bool, error) {
	i.mu.Lock()
	defer i.mu.Unlock()

	key := fmt.Sprintf("%s/%s/%s/%d/offset", i.pipelineName, i.stepName, sourceURN, partition)

	lastOffset, known := i.db[key]
	if !known {
		// The error is deliberately discarded: any failure to read (including
		// the key simply not existing) leaves text empty, which is treated as
		// "first time we see this partition".
		// NOTE(review): a transient storage failure is therefore
		// indistinguishable from a genuinely new partition — confirm this is
		// the intended best-effort behaviour.
		text, _ := i.storage.Get(ctx, key)
		if text == "" {
			// Assume that this is the first time, and that we are continuous:
			// pretend the previous offset was seen so this one is accepted.
			lastOffset = offset - 1
		} else {
			parsed, err := strconv.ParseInt(text, 10, 64)
			if err != nil {
				return false, fmt.Errorf("invalid stored offset for key %q: %w", key, err)
			}
			lastOffset = parsed
		}
		i.db[key] = lastOffset
	}

	expectedOffset := lastOffset + 1
	switch {
	case offset < expectedOffset:
		duplicateCounter.WithLabelValues(sourceName).Inc()
		return false, nil
	case offset > expectedOffset:
		// NOTE(review): the recorded offset is not advanced on a gap, so later
		// offsets for this partition keep being counted as missing — confirm
		// this is intended.
		missingCounter.WithLabelValues(sourceName).Inc()
	default:
		i.db[key] = offset
	}
	return true, nil
}

// commitOffsets writes every in-memory offset to storage with a 30 day
// expiration. Failures are logged per key and do not stop the remaining keys
// from being written.
//
// The offsets are copied while holding the lock and written after releasing
// it, so that Accept is never blocked behind storage round-trips.
func (i *impl) commitOffsets(ctx context.Context) {
	const offsetTTL = time.Hour * 24 * 30

	i.mu.Lock()
	snapshot := make(map[string]int64, len(i.db))
	for key, offset := range i.db {
		snapshot[key] = offset
	}
	i.mu.Unlock()

	for key, offset := range snapshot {
		if err := i.storage.Set(ctx, key, offset, offsetTTL); err != nil {
			logger.Error(err, "failed to set offset", "key", key, "offset", offset)
		}
	}
}

// New returns a monitor for the given pipeline step, backed by a Redis client
// pointed at "redis:6379". It also starts a goroutine that calls
// commitOffsets repeatedly (3 second base period, jitter factor 1.2) until
// ctx is done.
func New(ctx context.Context, pipelineName, stepName string) Interface {
	rdb := redis.NewClient(&redis.Options{
		Addr: "redis:6379",
	})
	m := &impl{
		db:           map[string]int64{},
		pipelineName: pipelineName,
		stepName:     stepName,
		storage:      &redisStorage{rdb: rdb},
	}

	go wait.JitterUntilWithContext(ctx, m.commitOffsets, 3*time.Second, 1.2, true)

	return m
}
77 changes: 77 additions & 0 deletions runner/sidecar/monitor/impl_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package monitor

import (
"context"
"errors"
"sync"
"testing"
"time"

"github.com/argoproj-labs/argo-dataflow/runner/sidecar/monitor/mocks"
"github.com/prometheus/client_golang/prometheus"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
)

// Test_impl_Accept drives a single impl through a sequence of offsets and
// then commits them. The subtests share the same impl, mock storage and
// package-level counters, so they depend on running in the order written and
// the expected counter values are cumulative.
func Test_impl_Accept(t *testing.T) {
ctx := context.Background()
rdb := &mocks.Storage{}
// Partition 1 has nothing usable in storage (empty text plus an error);
// partition 2 has a previously stored offset of 1.
rdb.On("Get", ctx, "my-pl/my-step/my-urn/1/offset").Return("", errors.New(""))
rdb.On("Get", ctx, "my-pl/my-step/my-urn/2/offset").Return("1", nil)
i := &impl{
mu: sync.Mutex{},
db: map[string]int64{},
pipelineName: "my-pl",
stepName: "my-step",
storage: rdb,
}
// First message on a partition with no stored offset is accepted as-is.
t.Run("EmptyStorage", func(t *testing.T) {
accept, err := i.Accept(ctx, "my-source", "my-urn", 1, 1)
assert.NoError(t, err)
assert.True(t, accept)
assert.Equal(t, 0, duplicate(t))
assert.Equal(t, 0, missing(t))
})
// Stored offset is 1, so offset 2 is exactly the expected next offset.
t.Run("ExistingStorage", func(t *testing.T) {
accept, err := i.Accept(ctx, "my-source", "my-urn", 2, 2)
assert.NoError(t, err)
assert.True(t, accept)
assert.Equal(t, 0, duplicate(t))
assert.Equal(t, 0, missing(t))
})
// Offset 2 was just accepted, so seeing it again is a duplicate.
t.Run("RepeatedOffset", func(t *testing.T) {
accept, err := i.Accept(ctx, "my-source", "my-urn", 2, 2)
assert.NoError(t, err)
assert.False(t, accept)
assert.Equal(t, 1, duplicate(t))
assert.Equal(t, 0, missing(t))
})
// Offset 3 was never seen, so offset 4 is accepted but counted as missing.
t.Run("SkippedOffset", func(t *testing.T) {
accept, err := i.Accept(ctx, "my-source", "my-urn", 2, 4)
assert.NoError(t, err)
assert.True(t, accept)
assert.Equal(t, 1, duplicate(t))
assert.Equal(t, 1, missing(t))
})
thirtyDays := time.Hour * 24 * 30
// The expected value for partition 2 is still 2: the skipped-offset message
// above did not advance the recorded offset.
rdb.On("Set", ctx, "my-pl/my-step/my-urn/1/offset", int64(1), thirtyDays).Return(nil)
rdb.On("Set", ctx, "my-pl/my-step/my-urn/2/offset", int64(2), thirtyDays).Return(nil)
// NOTE(review): no AssertExpectations call is made, so this presumably only
// fails if commitOffsets makes a Set call that was not registered above.
t.Run("CommitOffsets", func(t *testing.T) {
i.commitOffsets(ctx)
})
}

// duplicate returns the current value of the duplicate counter for the
// "my-source" label.
func duplicate(t *testing.T) int {
return counter(t, duplicateCounter)
}

// missing returns the current value of the missing counter for the
// "my-source" label.
func missing(t *testing.T) int {
return counter(t, missingCounter)
}

// counter reads the current value of the given counter vector for the
// "my-source" label and returns it truncated to an int. A failure to write
// the metric is reported on t.
func counter(t *testing.T, counter *prometheus.CounterVec) int {
	var metric io_prometheus_client.Metric
	sample := counter.WithLabelValues("my-source")
	assert.NoError(t, sample.Write(&metric))
	return int(*metric.Counter.Value)
}
51 changes: 51 additions & 0 deletions runner/sidecar/monitor/mocks/storage.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

53 changes: 1 addition & 52 deletions runner/sidecar/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,59 +2,8 @@ package monitor

import (
"context"
"fmt"

"github.com/go-redis/redis/v8"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)

type Interface interface {
Accept(ctx context.Context, sourceName, sourceURN string, partition int32, offset int64) error
}

type impl struct {
rdb *redis.Client
missingCounter *prometheus.CounterVec
duplicateCounter *prometheus.CounterVec
}

func (i *impl) Accept(ctx context.Context, sourceName, sourceURN string, partition int32, offset int64) error {
key := fmt.Sprintf("%s/%d/offset", sourceURN, partition)
last, err := i.rdb.GetBit(ctx, key, offset-1).Result()
if err == redis.Nil {
// noop
} else if err != nil {
return err
} else if last == 0 {
i.missingCounter.WithLabelValues(sourceName).Inc()
}
current, err := i.rdb.GetBit(ctx, key, offset).Result()
if err == redis.Nil {
// noop
} else if err != nil {
return err
} else if current == 1 {
i.duplicateCounter.WithLabelValues(sourceName).Inc()
}
i.rdb.SetBit(ctx, key, offset, 1)
return nil
}

func New() Interface {
return &impl{
redis.NewClient(&redis.Options{
Addr: "redis:6379",
}),
promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "sources",
Name: "missing",
Help: "Total number of missing messages, see https://github.com/argoproj-labs/argo-dataflow/blob/main/docs/METRICS.md#sources_missing",
}, []string{"sourceName"}),
promauto.NewCounterVec(prometheus.CounterOpts{
Subsystem: "sources",
Name: "duplicate",
Help: "Total number of duplicate messages, see https://github.com/argoproj-labs/argo-dataflow/blob/main/docs/METRICS.md#sources_duplicate",
}, []string{"sourceName"}),
}
Accept(ctx context.Context, sourceName, sourceURN string, partition int32, offset int64) (bool, error)
}
15 changes: 9 additions & 6 deletions runner/sidecar/source/kafka/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,17 +13,17 @@ import (
"k8s.io/apimachinery/pkg/util/runtime"
)

var mntr = monitor.New()

type handler struct {
mntr monitor.Interface
sourceName string
sourceURN string
process source.Process
i int
}

func newHandler(sourceName, sourceURN string, process source.Process) sarama.ConsumerGroupHandler {
func newHandler(mntr monitor.Interface, sourceName, sourceURN string, process source.Process) sarama.ConsumerGroupHandler {
return &handler{
mntr: mntr,
sourceName: sourceName,
sourceURN: sourceURN,
process: process,
Expand All @@ -45,12 +45,15 @@ func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.Co
logger.Info("starting consuming claim", "partition", claim.Partition())
defer sess.Commit()
for msg := range claim.Messages() {
if ok, err := h.mntr.Accept(ctx, h.sourceName, h.sourceURN, msg.Partition, msg.Offset); err != nil {
logger.Error(err, "failed to determine if we should accept the message")
} else if !ok {
continue
}
if err := h.processMessage(ctx, msg); err != nil {
logger.Error(err, "failed to process message")
} else {
if err := mntr.Accept(ctx, h.sourceName, h.sourceURN, msg.Partition, msg.Offset); err != nil {
logger.Error(err, "failed to accept message")
}

sess.MarkMessage(msg, "")
h.i++
if h.i%dfv1.CommitN == 0 {
Expand Down
6 changes: 4 additions & 2 deletions runner/sidecar/source/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"fmt"
"time"

"github.com/argoproj-labs/argo-dataflow/runner/sidecar/monitor"

"k8s.io/apimachinery/pkg/util/runtime"

"github.com/Shopify/sarama"
Expand All @@ -26,7 +28,7 @@ type kafkaSource struct {
topic string
}

func New(ctx context.Context, secretInterface corev1.SecretInterface, consumerGroupID, sourceName, sourceURN string, x dfv1.KafkaSource, process source.Process) (source.Interface, error) {
func New(ctx context.Context, secretInterface corev1.SecretInterface, mntr monitor.Interface, consumerGroupID, sourceName, sourceURN string, x dfv1.KafkaSource, process source.Process) (source.Interface, error) {
config, err := kafka.GetConfig(ctx, secretInterface, x.Kafka.KafkaConfig)
if err != nil {
return nil, err
Expand All @@ -51,7 +53,7 @@ func New(ctx context.Context, secretInterface corev1.SecretInterface, consumerGr
if err != nil {
return nil, err
}
h := newHandler(sourceName, sourceURN, process)
h := newHandler(mntr, sourceName, sourceURN, process)
go wait.JitterUntil(func() {
defer runtime.HandleCrash()
ctx := context.Background()
Expand Down
Loading

0 comments on commit 1b91c71

Please sign in to comment.