Skip to content

Commit

Permalink
feat: fix bugs in monitor (#391)
Browse files Browse the repository at this point in the history
Signed-off-by: Alex Collins <[email protected]>
  • Loading branch information
alexec authored Oct 1, 2021
1 parent b82ef2f commit 2720622
Show file tree
Hide file tree
Showing 11 changed files with 135 additions and 70 deletions.
1 change: 1 addition & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ test-hpa:
kubectl -n argo-dataflow-system autoscale step 101-hello-main --min 2 --max 2
sleep 20s
if [ `kubectl -n argo-dataflow-system get step 101-hello-main -o=jsonpath='{.status.replicas}'` != 2 ]; then exit 1; fi
test-kafka: test-kafka-e2e test-kafka-fmea test-kafka-stress
test-kafka-e2e:
test-kafka-fmea:
test-kafka-stress:
Expand Down
98 changes: 51 additions & 47 deletions runner/sidecar/monitor/impl.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,54 +34,27 @@ var (
)
)

//go:generate mockery --exported --name=storage

type storage interface {
Get(ctx context.Context, key string) (string, error)
Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error
}

type redisStorage struct {
rdb redis.Cmdable
}

func (r *redisStorage) Get(ctx context.Context, key string) (string, error) {
return r.rdb.Get(ctx, key).Result()
}

func (r *redisStorage) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error {
return r.rdb.Set(ctx, key, value, expiration).Err()
}

type impl struct {
mu sync.Mutex
db map[string]int64
mu sync.Mutex // for cache
cache map[string]int64 // key -> offset
pipelineName string
stepName string
storage storage
}

func (i *impl) Commit(ctx context.Context, sourceName, sourceURN string, partition int32, offset int64) {
func (i *impl) Mark(sourceURN string, partition int32, offset int64) {
i.mu.Lock()
defer i.mu.Unlock()
key := i.key(sourceURN, partition)
i.db[key] = offset
i.cache[i.key(sourceURN, partition)] = offset
}

func (i *impl) Accept(ctx context.Context, sourceName, sourceURN string, partition int32, offset int64) bool {
func (i *impl) Accept(sourceName, sourceURN string, partition int32, offset int64) bool {
i.mu.Lock()
defer i.mu.Unlock()
key := i.key(sourceURN, partition)
if _, ok := i.db[key]; !ok {
text, _ := i.storage.Get(ctx, key)
lastOffset, err := strconv.ParseInt(text, 10, 64)
if err != nil {
i.db[key] = offset - 1
} else {
i.db[key] = lastOffset
}
lastOffset, ok := i.cache[i.key(sourceURN, partition)]
i.mu.Unlock()
if !ok {
lastOffset = offset - 1
}
lastOffset := i.db[key]
expectedOffset := lastOffset + 1
offsetDelta := offset - expectedOffset
if offsetDelta < 0 {
Expand All @@ -98,27 +71,58 @@ func (i *impl) key(sourceURN string, partition int32) string {
return fmt.Sprintf("%s/%s/%s/%d/offset", i.pipelineName, i.stepName, sourceURN, partition)
}

func (i *impl) commitOffsets(ctx context.Context) {
func (i *impl) AssignedPartition(ctx context.Context, sourceURN string, partition int32) {
key := i.key(sourceURN, partition)
i.mu.Lock()
defer i.mu.Unlock()
for key, offset := range i.db {
if err := i.storage.Set(ctx, key, offset, time.Hour*24*30); err != nil {
logger.Error(err, "failed to commit offset to Redis", "key", key, "offset", offset)
}
_, ok := i.cache[key]
i.mu.Unlock()
if ok {
return
}
text, _ := i.storage.Get(ctx, key)
lastOffset, err := strconv.ParseInt(text, 10, 64)
if err == nil {
i.mu.Lock()
defer i.mu.Unlock()
i.cache[key] = lastOffset
}
}

func (i *impl) RevokedPartition(ctx context.Context, sourceURN string, partition int32) {
key := i.key(sourceURN, partition)
i.mu.Lock()
offset := i.cache[key]
delete(i.cache, key)
i.mu.Unlock()
i.commitOffset(ctx, key, offset)
}

func (i *impl) commitOffset(ctx context.Context, key string, offset int64) {
if err := i.storage.Set(ctx, key, offset, time.Hour*24*30); err != nil {
logger.Error(err, "failed to commit offset to Redis", "key", key, "offset", offset)
}
}

func (i *impl) commitOffsets(ctx context.Context) {
for key, offset := range i.cache {
i.commitOffset(ctx, key, offset)
}
}

func (i *impl) Close(ctx context.Context) {
i.commitOffsets(ctx)
i.mu.Lock()
i.cache = map[string]int64{}
i.mu.Unlock()
}

func New(ctx context.Context, pipelineName, stepName string) Interface {
i := &impl{
sync.Mutex{},
map[string]int64{},
pipelineName,
stepName,
&redisStorage{redis.NewClient(&redis.Options{
mu: sync.Mutex{},
cache: map[string]int64{},
pipelineName: pipelineName,
stepName: stepName,
storage: &redisStorage{redis.NewClient(&redis.Options{
Addr: "redis:6379",
})},
}
Expand Down
28 changes: 17 additions & 11 deletions runner/sidecar/monitor/impl_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package monitor
import (
"context"
"errors"
"sync"
"testing"
"time"

Expand All @@ -19,37 +18,40 @@ func Test_impl_Accept(t *testing.T) {
rdb.On("Get", ctx, "my-pl/my-step/my-urn/1/offset").Return("", errors.New(""))
rdb.On("Get", ctx, "my-pl/my-step/my-urn/2/offset").Return("1", nil)
i := &impl{
mu: sync.Mutex{},
db: map[string]int64{},
cache: map[string]int64{},
pipelineName: "my-pl",
stepName: "my-step",
storage: rdb,
}
t.Run("AssignPartitions", func(t *testing.T) {
i.AssignedPartition(ctx, "my-urn", 1)
i.AssignedPartition(ctx, "my-urn", 2)
})
t.Run("EmptyStorage", func(t *testing.T) {
accept := i.Accept(ctx, "my-source", "my-urn", 1, 1)
accept := i.Accept("my-source", "my-urn", 1, 1)
assert.True(t, accept)
i.Commit(ctx, "my-source", "my-urn", 1, 1)
i.Mark("my-urn", 1, 1)
assert.Equal(t, 0, duplicate(t))
assert.Equal(t, 0, missing(t))
})
t.Run("ExistingStorage", func(t *testing.T) {
accept := i.Accept(ctx, "my-source", "my-urn", 2, 2)
i.Commit(ctx, "my-source", "my-urn", 2, 2)
accept := i.Accept("my-source", "my-urn", 2, 2)
i.Mark("my-urn", 2, 2)
assert.True(t, accept)
assert.Equal(t, 0, duplicate(t))
assert.Equal(t, 0, missing(t))
})
t.Run("RepeatedOffset", func(t *testing.T) {
accept := i.Accept(ctx, "my-source", "my-urn", 2, 2)
accept := i.Accept("my-source", "my-urn", 2, 2)
assert.False(t, accept)
i.Commit(ctx, "my-source", "my-urn", 2, 2)
i.Mark("my-urn", 2, 2)
assert.Equal(t, 1, duplicate(t))
assert.Equal(t, 0, missing(t))
})
t.Run("SkippedOffset", func(t *testing.T) {
accept := i.Accept(ctx, "my-source", "my-urn", 2, 5)
accept := i.Accept("my-source", "my-urn", 2, 5)
assert.True(t, accept)
i.Commit(ctx, "my-source", "my-urn", 2, 5)
i.Mark("my-urn", 2, 5)
assert.Equal(t, 1, duplicate(t))
assert.Equal(t, 2, missing(t))
})
Expand All @@ -59,6 +61,10 @@ func Test_impl_Accept(t *testing.T) {
t.Run("CommitOffsets", func(t *testing.T) {
i.commitOffsets(ctx)
})
t.Run("RevokePartition", func(t *testing.T) {
i.RevokedPartition(ctx, "my-urn", 1)
i.RevokedPartition(ctx, "my-urn", 2)
})
}

func duplicate(t *testing.T) int {
Expand Down
6 changes: 4 additions & 2 deletions runner/sidecar/monitor/monitor.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (

type Interface interface {
// Accept determine if the message should be processed. It is not a duplicate.
Accept(ctx context.Context, sourceName, sourceURN string, partition int32, offset int64) bool
Commit(ctx context.Context, sourceName, sourceURN string, partition int32, offset int64)
Accept(sourceName, sourceURN string, partition int32, offset int64) bool
Mark(sourceURN string, partition int32, offset int64)
AssignedPartition(ctx context.Context, sourceURN string, partition int32)
RevokedPartition(ctx context.Context, sourceURN string, partition int32)
Close(context.Context)
}
20 changes: 20 additions & 0 deletions runner/sidecar/monitor/redis_storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package monitor

import (
"context"
"time"

"github.com/go-redis/redis/v8"
)

type redisStorage struct {
rdb redis.Cmdable
}

func (r *redisStorage) Get(ctx context.Context, key string) (string, error) {
return r.rdb.Get(ctx, key).Result()
}

func (r *redisStorage) Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error {
return r.rdb.Set(ctx, key, value, expiration).Err()
}
13 changes: 13 additions & 0 deletions runner/sidecar/monitor/storage.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package monitor

import (
"context"
"time"
)

//go:generate mockery --exported --name=storage

type storage interface {
Get(ctx context.Context, key string) (string, error)
Set(ctx context.Context, key string, value interface{}, expiration time.Duration) error
}
16 changes: 12 additions & 4 deletions runner/sidecar/source/kafka/kafka.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type kafkaSource struct {
logger logr.Logger
sourceName string
sourceURN string
mntr monitor.Interface
consumer *kafka.Consumer
topic string
wg *sync.WaitGroup
Expand Down Expand Up @@ -53,6 +54,7 @@ func New(ctx context.Context, secretInterface corev1.SecretInterface, mntr monit

s := &kafkaSource{
logger: logger,
mntr: mntr,
sourceName: sourceName,
sourceURN: sourceURN,
consumer: consumer,
Expand Down Expand Up @@ -93,19 +95,21 @@ func (s *kafkaSource) assignedPartition(ctx context.Context, partition int32) {
logger := s.logger.WithValues("partition", partition)
if _, ok := s.channels[partition]; !ok {
logger.Info("assigned partition")
s.mntr.AssignedPartition(ctx, s.sourceURN, partition)
s.channels[partition] = make(chan *kafka.Message, 256)
go wait.JitterUntilWithContext(ctx, func(ctx context.Context) {
s.consumePartition(ctx, partition)
}, 3*time.Second, 1.2, true)
}
}

func (s *kafkaSource) revokedPartition(partition int32) {
func (s *kafkaSource) revokedPartition(ctx context.Context, partition int32) {
if _, ok := s.channels[partition]; ok {
s.logger.Info("revoked partition", "partition", partition)
close(s.channels[partition])
delete(s.channels, partition)
}
s.mntr.RevokedPartition(ctx, s.sourceURN, partition)
}

func (s *kafkaSource) startPollLoop(ctx context.Context) {
Expand Down Expand Up @@ -143,7 +147,8 @@ func (s *kafkaSource) startPollLoop(ctx context.Context) {

func (s *kafkaSource) Close() error {
s.logger.Info("closing partition channels")
for _, ch := range s.channels {
for key, ch := range s.channels {
delete(s.channels, key)
close(ch)
}
s.logger.Info("waiting for partition consumers to finish")
Expand Down Expand Up @@ -189,7 +194,7 @@ func (s *kafkaSource) rebalanced(ctx context.Context, event kafka.Event) error {
}
case kafka.RevokedPartitions:
for _, p := range e.Partitions {
s.revokedPartition(p.Partition)
s.revokedPartition(ctx, p.Partition)
}
}
return nil
Expand All @@ -206,9 +211,12 @@ func (s *kafkaSource) consumePartition(ctx context.Context, partition int32) {
for msg := range s.channels[partition] {
offset := int64(msg.TopicPartition.Offset)
logger := logger.WithValues("offset", offset)
if err := s.processMessage(ctx, msg); err != nil {
if !s.mntr.Accept(s.sourceName, s.sourceURN, partition, offset) {
logger.Info("not accepting message")
} else if err := s.processMessage(ctx, msg); err != nil {
logger.Error(err, "failed to process message")
} else {
s.mntr.Mark(s.sourceURN, partition, offset)
if _, err := s.consumer.CommitMessage(msg); err != nil {
logger.Error(err, "failed to commit message")
}
Expand Down
2 changes: 1 addition & 1 deletion test/kafka-stress/kafka_stress_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ func TestKafkaSourceStress(t *testing.T) {
}},
},
Replicas: Params.Replicas,
Sources: []Source{{Kafka: &KafkaSource{StartOffset: "First", Kafka: Kafka{Topic: topic, KafkaConfig: KafkaConfig{MaxMessageBytes: msgSize}}}}},
Sources: []Source{{Kafka: &KafkaSource{Kafka: Kafka{Topic: topic, KafkaConfig: KafkaConfig{MaxMessageBytes: msgSize}}}}},
Sinks: []Sink{DefaultLogSink},
}},
},
Expand Down
2 changes: 1 addition & 1 deletion test/kafka-stress/test-results.json
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
"TestKafkaSinkStress/async=true.tps": 400,
"TestKafkaSinkStress/messageSize=1000.tps": 300,
"TestKafkaSinkStress/replicas=2.tps": 400,
"TestKafkaSourceStress/.tps": 400,
"TestKafkaSourceStress/.tps": 150,
"TestKafkaSourceStress/N=10,messageSize=100.tps": 450,
"TestKafkaSourceStress/N=10,messageSize=1000.tps": 650,
"TestKafkaSourceStress/N=50000.tps": 3150,
Expand Down
17 changes: 14 additions & 3 deletions test/matchers.go
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
// +build test

package test

import "fmt"
Expand All @@ -11,16 +13,25 @@ func (m matcher) String() string { return m.string }

func Eq(v float64) matcher {
return matcher{
fmt.Sprintf("=%v", v),
fmt.Sprintf("eq %v", v),
func(w float64) bool {
return w == v
},
}
}

func Missing() matcher {
return matcher{
"missing",
func(w float64) bool {
return w == missing
},
}
}

func Gt(v float64) matcher {
return matcher{
fmt.Sprintf(">%v", v),
fmt.Sprintf("gt %v", v),
func(w float64) bool {
return w > v
},
Expand All @@ -29,7 +40,7 @@ func Gt(v float64) matcher {

func Between(min, max float64) matcher {
return matcher{
fmt.Sprintf("%v<= && <=%v", min, max),
fmt.Sprintf("between %v and <=%v", min, max),
func(w float64) bool {
return min <= w && w <= max
},
Expand Down
Loading

0 comments on commit 2720622

Please sign in to comment.