From 57d424652e925afb8ebbe795ab9faa4800e32335 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Wed, 3 May 2023 17:10:37 +0800 Subject: [PATCH 1/6] kafka(consumer): Improve `AtLeastOnce` handling Improves how `AtLeastOnceDelivery` is handled by the kafka consumer. Previously, the consumer could discard up to `cfg.MaxPollRecords`, since it incorrectly assumed that unacknowledged records would be re-delivered by Kafka if not committed within a certain amount of time. Now, it discards any records which can't be processed, but continues processing the rest of the records in the fetch. It can still result in some data loss, and is at the mercy of the failures returned by the processor, but it's much better than previously. Additionally, adds a few tests which cover both `AtLeastOnceDelivery` and `AtMostOnceDelivery`, and a `TestGracefulShutdown` test. Partially addresses #118 Part of #123 Signed-off-by: Marc Lopez Rubio --- go.mod | 1 + go.sum | 1 + kafka/consumer.go | 76 ++++----- kafka/consumer_test.go | 345 ++++++++++++++++++++++++++++++++++------- kafka/producer_test.go | 2 +- systemtest/go.mod | 3 +- systemtest/go.sum | 1 + 7 files changed, 336 insertions(+), 93 deletions(-) diff --git a/go.mod b/go.mod index 60e20b74..bebefb90 100644 --- a/go.mod +++ b/go.mod @@ -26,6 +26,7 @@ require ( cloud.google.com/go/compute/metadata v0.2.3 // indirect cloud.google.com/go/iam v0.13.0 // indirect cloud.google.com/go/longrunning v0.4.1 // indirect + github.com/benbjohnson/clock v1.1.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/go-logr/logr v1.2.4 // indirect github.com/go-logr/stdr v1.2.2 // indirect diff --git a/go.sum b/go.sum index 3b9ebd1c..f5746878 100644 --- a/go.sum +++ b/go.sum @@ -18,6 +18,7 @@ cloud.google.com/go/pubsublite v1.8.0/go.mod h1:7UapoCpdX/0nt4phleaQlrJ5S2GH81hs github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= +github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= diff --git a/kafka/consumer.go b/kafka/consumer.go index fcf3c635..f6da581f 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -36,6 +36,12 @@ import ( "github.com/elastic/apm-queue/queuecontext" ) +var ( + // ErrCommitFailed may be returned by `consumer.Run` when DeliveryType is + // apmqueue.AtMostOnceDelivery. + ErrCommitFailed = errors.New("kafka: failed to commit offsets") +) + // SASLMechanism type alias to sasl.Mechanism type SASLMechanism = sasl.Mechanism @@ -59,7 +65,7 @@ type ConsumerConfig struct { // Version is the software version to use in the Kafka client. This is // useful since it shows up in Kafka metrics and logs. Version string - // Decoder holds an encoding.Decoder for decoding events. + // Decoder holds an encoding.Decoder for decoding records. Decoder Decoder // MaxPollRecords defines an upper bound to the number of records that can // be polled on a single fetch. If MaxPollRecords <= 0, defaults to 100. @@ -210,7 +216,7 @@ func NewConsumer(cfg ConsumerConfig) (*Consumer, error) { }, nil } -// Close closes the consumer. 
+// Close the consumer, blocking until all partition consumers are stopped. func (c *Consumer) Close() error { c.mu.Lock() defer c.mu.Unlock() @@ -221,7 +227,13 @@ func (c *Consumer) Close() error { return nil } -// Run executes the consumer in a blocking manner. +// Run the consumer until a non recoverable error is found: +// - context.Cancelled. +// - context.DeadlineExceeded. +// - kgo.ErrClientClosed. +// - ErrCommitFailed. +// +// To shut down the consumer, cancel the context, or call consumer.Close(). func (c *Consumer) Run(ctx context.Context) error { for { if err := c.fetch(ctx); err != nil { @@ -233,9 +245,6 @@ func (c *Consumer) Run(ctx context.Context) error { // fetch polls the Kafka broker for new records up to cfg.MaxPollRecords. // Any errors returned by fetch should be considered fatal. func (c *Consumer) fetch(ctx context.Context) error { - c.mu.RLock() - defer c.mu.RUnlock() - fetches := c.client.PollRecords(ctx, c.cfg.MaxPollRecords) defer c.client.AllowRebalance() @@ -252,12 +261,12 @@ func (c *Consumer) fetch(ctx context.Context) error { case apmqueue.AtMostOnceDeliveryType: // Commit the fetched record offsets as soon as we've polled them. if err := c.client.CommitUncommittedOffsets(ctx); err != nil { - // If the commit fails, then return immediately and any uncommitted - // records will be re-delivered in time. Otherwise, records may be - // processed twice. - return nil + // NOTE(marclop): If the commit fails with an unrecoverable error, + // return it and terminate the consumer. This will avoid potentially + // processing records twice, and it's up to the consumer + return ErrCommitFailed } - // Allow rebalancing now that we have committed offsets, preventing + // Allow re-balancing now that we have committed offsets, preventing // another consumer from reprocessing the records. c.client.AllowRebalance() } @@ -266,6 +275,9 @@ func (c *Consumer) fetch(ctx context.Context) error { zap.Error(err), zap.String("topic", t), zap.Int32("partition", p), ) }) + // No need to grab the lock until we want to access `c.consumer`. + c.mu.RLock() + defer c.mu.RUnlock() // Send partition records to be processed by its dedicated goroutine. fetches.EachPartition(func(ftp kgo.FetchTopicPartition) { if len(ftp.Records) == 0 { @@ -275,9 +287,9 @@ func (c *Consumer) fetch(ctx context.Context) error { select { case c.consumer.consumers[tp].records <- ftp.Records: // When using AtMostOnceDelivery, if the context is cancelled between - // PollRecords and this line, the events will be lost. + // PollRecords and this line, records will be lost. // NOTE(marclop) Add a shutdown timer so that there's a grace period - // to allow the events to be processed before they're lost. + // to allow the records to be processed before they're lost. case <-ctx.Done(): if c.cfg.Delivery == apmqueue.AtMostOnceDeliveryType { c.cfg.Logger.Warn( @@ -320,8 +332,9 @@ type topicPartition struct { } // assigned must be set as a kgo.OnPartitionsAssigned callback. Ensuring all -// assigned partitions to this consumer process received records. -func (c *consumer) assigned(_ context.Context, client *kgo.Client, assigned map[string][]int32) { +// assigned partitions to this consumer process received records. The received +// context is only cancelled after the kgo.client is closed. 
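// Illustrative sketch of how a caller might drive Run: since Run only returns
// the terminal errors listed in its documentation, the owner of the consumer
// decides whether to restart it after ErrCommitFailed, as the NOTE in fetch
// suggests. The names runLoop and cfg, and the kafka, kgo, errors and context
// import aliases, are assumptions made for this example only; this is not code
// from this package.
//
//	func runLoop(ctx context.Context, cfg kafka.ConsumerConfig) error {
//		for {
//			consumer, err := kafka.NewConsumer(cfg)
//			if err != nil {
//				return err
//			}
//			err = consumer.Run(ctx)
//			consumer.Close()
//			switch {
//			case errors.Is(err, context.Canceled),
//				errors.Is(err, context.DeadlineExceeded),
//				errors.Is(err, kgo.ErrClientClosed):
//				return err // Shutdown was requested; do not restart.
//			case errors.Is(err, kafka.ErrCommitFailed):
//				continue // Offsets were not committed; start a fresh consumer.
//			default:
//				return err
//			}
//		}
//	}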
+func (c *consumer) assigned(ctx context.Context, client *kgo.Client, assigned map[string][]int32) { c.mu.Lock() defer c.mu.Unlock() for topic, partitions := range assigned { @@ -337,7 +350,7 @@ func (c *consumer) assigned(_ context.Context, client *kgo.Client, assigned map[ } go func(topic string, partition int32) { defer c.wg.Done() - pc.consume(topic, partition) + pc.consume(ctx, topic, partition) }(topic, partition) tp := topicPartition{partition: partition, topic: topic} c.consumers[tp] = pc @@ -374,7 +387,8 @@ type partitionConsumer struct { // consume processed the records from a topic and partition. Calling consume // more than once will cause a panic. -func (pc partitionConsumer) consume(topic string, partition int32) { +// The received context which is only canceled when the kgo.Client is closed. +func (pc partitionConsumer) consume(ctx context.Context, topic string, partition int32) { logger := pc.logger.With( zap.String("topic", topic), zap.Int32("partition", partition), @@ -383,7 +397,6 @@ func (pc partitionConsumer) consume(topic string, partition int32) { // Store the last processed record. Default to -1 for cases where // only the first record is received. last := -1 - recordLoop: for i, msg := range records { meta := make(map[string]string) for _, h := range msg.Headers { @@ -397,38 +410,27 @@ func (pc partitionConsumer) consume(topic string, partition int32) { zap.Int64("offset", msg.Offset), zap.Any("headers", meta), ) - // TODO(marclop) DLQ? The decoding has failed, re-delivery - // may cause the same error. Discard the event for now. + // NOTE(marclop) The decoding has failed, a DLQ may be helpful. continue } - ctx := queuecontext.WithMetadata(context.Background(), meta) + ctx := queuecontext.WithMetadata(ctx, meta) batch := model.Batch{event} + // If a record can't be processed, no retries are attempted and the + // record is lost. https://github.com/elastic/apm-queue/issues/118. if err := pc.processor.ProcessBatch(ctx, &batch); err != nil { - logger.Error("unable to process event", + logger.Error("data loss: unable to process event", zap.Error(err), zap.Int64("offset", msg.Offset), zap.Any("headers", meta), ) - switch pc.delivery { - case apmqueue.AtLeastOnceDeliveryType: - // Exit the loop and commit the last processed offset - // (if any). This ensures events which haven't been - // processed are re-delivered, but those that have, are - // committed. - break recordLoop - case apmqueue.AtMostOnceDeliveryType: - // Events which can't be processed, are lost. - continue - } } last = i } - // Only commit the records when at least a record has been processed - // with AtLeastOnceDeliveryType. + // Only commit the records when at least a record has been decoded when + // AtLeastOnceDeliveryType is set. 
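	// Committing records[last] implicitly commits every earlier offset of this
	// poll for the partition, including records whose ProcessBatch call failed
	// above; those records are not redelivered, which is the remaining
	// data-loss window described in the commit message.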
if pc.delivery == apmqueue.AtLeastOnceDeliveryType && last >= 0 { lastRecord := records[last] - err := pc.client.CommitRecords(context.Background(), lastRecord) - if err != nil { + if err := pc.client.CommitRecords(ctx, lastRecord); err != nil { logger.Error("unable to commit records", zap.Error(err), zap.Int64("offset", lastRecord.Offset), diff --git a/kafka/consumer_test.go b/kafka/consumer_test.go index 6924e645..fbbf4631 100644 --- a/kafka/consumer_test.go +++ b/kafka/consumer_test.go @@ -20,6 +20,7 @@ package kafka import ( "context" "crypto/tls" + "errors" "strconv" "sync/atomic" "testing" @@ -30,6 +31,8 @@ import ( "github.com/twmb/franz-go/pkg/kfake" "github.com/twmb/franz-go/pkg/kgo" "go.uber.org/zap" + "go.uber.org/zap/zapcore" + "go.uber.org/zap/zaptest" "github.com/elastic/apm-data/model" apmqueue "github.com/elastic/apm-queue" @@ -145,7 +148,7 @@ func TestConsumerFetch(t *testing.T) { GroupID: "groupid", Decoder: codec, Logger: zap.NewNop(), - Processor: model.ProcessBatchFunc(func(ctx context.Context, b *model.Batch) error { + Processor: model.ProcessBatchFunc(func(_ context.Context, b *model.Batch) error { assert.Len(t, *b, 1) assert.Equal(t, event, (*b)[0]) return nil @@ -155,20 +158,261 @@ func TestConsumerFetch(t *testing.T) { b, err := codec.Encode(event) require.NoError(t, err) - record := &kgo.Record{ - Topic: string(topics[0]), - Value: b, + produceRecord(context.Background(), t, client, + &kgo.Record{Topic: string(topics[0]), Value: b}, + ) + consumer := newConsumer(t, cfg) + assert.NoError(t, consumer.fetch(context.Background())) +} + +func TestConsumerDelivery(t *testing.T) { + // ALOD = at least once delivery + // AMOD = at most once delivery + event := model.APMEvent{Transaction: &model.Transaction{ID: "1"}} + codec := json.JSON{} + topics := []apmqueue.Topic{"topic"} + + // Produces `initialRecords` + `lastRecords` in total. Asserting the + // "lossy" behavior of the consumer implementation, depending on the + // chosen DeliveryType. + // `initialRecords` are produced, then, the concurrent consumer is + // started, and the first receive will always fail to process the first + // records in the received poll (`maxPollRecords`). + // Next, the context is cancelled and the consumer is stopped. + // A new consumer is created which takes over from the last committed + // offset. + // Depending on the DeliveryType some or none records should be lost. + cases := map[string]struct { + // Test setup. + deliveryType apmqueue.DeliveryType + initialRecords int + maxPollRecords int + lastRecords int + // Number of successfully processed events. + processed int32 + // Number of unsuccessfully processed events. + errored int32 + }{ + "12_produced_10_poll_AMOD": { + deliveryType: apmqueue.AtMostOnceDeliveryType, + initialRecords: 10, + maxPollRecords: 10, + lastRecords: 2, + + processed: 2, // The last produced records are processed. + errored: 10, // The initial produced records are lost. + }, + "30_produced_2_poll_AMOD": { + deliveryType: apmqueue.AtMostOnceDeliveryType, + initialRecords: 20, + maxPollRecords: 2, + lastRecords: 10, + + processed: 26, // 30 total - 2 errored - 2 lost before batch processor. + errored: 2, // The first two fetch fails. + }, + "12_produced_1_poll_AMOD": { + deliveryType: apmqueue.AtMostOnceDeliveryType, + initialRecords: 1, + maxPollRecords: 1, + lastRecords: 11, + + processed: 11, // The last produced records are processed. + errored: 1, // The initial produced records are lost. 
+ }, + "12_produced_10_poll_ALOD": { + deliveryType: apmqueue.AtLeastOnceDeliveryType, + initialRecords: 10, + maxPollRecords: 10, + lastRecords: 2, + + processed: 12, // All records are re-processed. + errored: 10, // The initial batch errors. + }, + "30_produced_2_poll_ALOD": { + deliveryType: apmqueue.AtLeastOnceDeliveryType, + initialRecords: 20, + maxPollRecords: 2, + lastRecords: 10, + + processed: 30, // All records are processed. + errored: 2, // The initial batch errors. + }, + "12_produced_1_poll_ALOD": { + deliveryType: apmqueue.AtLeastOnceDeliveryType, + initialRecords: 1, + maxPollRecords: 1, + lastRecords: 11, + + processed: 11, + errored: 1, + }, } + for name, tc := range cases { + t.Run(name, func(t *testing.T) { + client, addrs := newClusterWithTopics(t, topics...) + failRecord := make(chan struct{}) + processRecord := make(chan struct{}) + defer close(failRecord) + + baseLogger := zaptest.NewLogger(t, zaptest.Level(zapcore.DebugLevel)) + + var processed atomic.Int32 + var errored atomic.Int32 + cfg := ConsumerConfig{ + Delivery: tc.deliveryType, + Brokers: addrs, + Decoder: codec, + Topics: topics, + GroupID: "groupid", + Logger: baseLogger, + MaxPollRecords: tc.maxPollRecords, + Processor: model.ProcessBatchFunc(func(ctx context.Context, b *model.Batch) error { + select { + // Records are marked as processed on receive processRecord. + case <-processRecord: + processed.Add(int32(len(*b))) + // Records are marked as failed when ctx is canceled, or + // on receive failRecord. + case <-failRecord: + errored.Add(int32(len(*b))) + return errors.New("failed processing record") + case <-ctx.Done(): + errored.Add(int32(len(*b))) + return ctx.Err() + } + return nil + }), + } - results := client.ProduceSync(context.Background(), record) - assert.NoError(t, results.FirstErr()) + b, err := codec.Encode(event) + require.NoError(t, err) + record := &kgo.Record{ + Topic: string(topics[0]), + Value: b, + } - r, err := results.First() - assert.NoError(t, err) - assert.NotNil(t, r) + // Context used for the consumer + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + for i := 0; i < int(tc.initialRecords); i++ { + produceRecord(ctx, t, client, record) + } - consumer := newConsumer(t, cfg) - assert.NoError(t, consumer.fetch(context.Background())) + cfg.Logger = baseLogger.Named("1") + consumer := newConsumer(t, cfg) + go func() { + err := consumer.Run(ctx) + assert.Error(t, err) + }() + + // Wait until the batch processor function is called. + // The first event is processed and after the context is canceled, + // the partition consumers are also stopped. Fetching and record + // processing is decoupled. The consumer may have fetched more + // records while waiting for `failRecord`. + // For AMOD, the offsets are committed after being fetched, which + // means that records may be lost before they reach the Processor. + select { + case failRecord <- struct{}{}: + cancel() + case <-time.After(time.Second): + t.Fatal("timed out waiting for consumer to process event") + } + consumer.Close() + + assert.Eventually(t, func() bool { + return int(errored.Load()) == tc.maxPollRecords + }, time.Second, time.Millisecond) + + // Start a new consumer in the background and then produce + ctx, cancel = context.WithCancel(context.Background()) + defer cancel() + // Produce tc.lastRecords. 
+ for i := 0; i < tc.lastRecords; i++ { + produceRecord(ctx, t, client, record) + } + cfg.MaxPollRecords = tc.lastRecords + cfg.Logger = baseLogger.Named("2") + consumer = newConsumer(t, cfg) + go func() { + assert.ErrorIs(t, consumer.Run(ctx), context.Canceled) + }() + // Wait for the first record to be consumed before running any assertion. + select { + case processRecord <- struct{}{}: + close(processRecord) // Allow records to be processed + case <-time.After(6 * time.Second): + t.Fatal("timed out waiting for consumer to process event") + } + + assert.Eventually(t, func() bool { + return processed.Load() == tc.processed && + errored.Load() == tc.errored + }, 2*time.Second, time.Millisecond) + t.Logf("got: %d events errored, %d processed, want: %d errored, %d processed", + errored.Load(), processed.Load(), tc.errored, tc.processed, + ) + }) + } +} + +func TestGracefulSutdown(t *testing.T) { + test := func(t testing.TB, dt apmqueue.DeliveryType) { + client, brokers := newClusterWithTopics(t, "topic") + var codec json.JSON + var processed atomic.Int32 + var errored atomic.Int32 + process := make(chan struct{}) + records := 10 + consumer := newConsumer(t, ConsumerConfig{ + Brokers: brokers, + GroupID: "group", + Topics: []apmqueue.Topic{"topic"}, + Decoder: codec, + MaxPollRecords: records, + Delivery: dt, + Logger: zaptest.NewLogger(t, zaptest.Level(zapcore.DebugLevel)), + Processor: model.ProcessBatchFunc(func(ctx context.Context, b *model.Batch) error { + select { + case <-ctx.Done(): + errored.Add(int32(len(*b))) + return ctx.Err() + case <-process: + processed.Add(int32(len(*b))) + } + return nil + }), + }) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go func() { consumer.Run(ctx) }() + + b, err := codec.Encode(model.APMEvent{Transaction: &model.Transaction{ID: "1"}}) + require.NoError(t, err) + for i := 0; i < records; i++ { + produceRecord(ctx, t, client, &kgo.Record{Topic: "topic", Value: b}) + } + + select { + case process <- struct{}{}: + close(process) // Allow records to be processed + cancel() // Stop the consumer. 
+ case <-time.After(time.Second): + t.Fatal("timed out waiting for consumer to process event") + } + assert.Eventually(t, func() bool { + return processed.Load() == int32(records) && errored.Load() == 0 + }, 5*time.Second, time.Millisecond) + t.Logf("got: %d events processed, %d errored", processed.Load(), errored.Load()) + } + + t.Run("AtLeastOnceDelivery", func(t *testing.T) { + test(t, apmqueue.AtLeastOnceDeliveryType) + }) + t.Run("AtMostOnceDelivery", func(t *testing.T) { + test(t, apmqueue.AtMostOnceDeliveryType) + }) } func TestMultipleConsumers(t *testing.T) { @@ -184,49 +428,36 @@ func TestMultipleConsumers(t *testing.T) { GroupID: "groupid", Decoder: codec, Logger: zap.NewNop(), - Processor: model.ProcessBatchFunc(func(ctx context.Context, b *model.Batch) error { + Processor: model.ProcessBatchFunc(func(_ context.Context, b *model.Batch) error { count.Add(1) assert.Len(t, *b, 1) return nil }), } - b, err := codec.Encode(event) - require.NoError(t, err) - ctx, cancel := context.WithCancel(context.Background()) defer cancel() + consumers := 2 + for i := 0; i < consumers; i++ { + go func() { + consumer := newConsumer(t, cfg) + assert.ErrorIs(t, consumer.Run(ctx), context.Canceled) + }() + } - consumer1 := newConsumer(t, cfg) - go func() { - assert.ErrorIs(t, consumer1.Run(ctx), context.Canceled) - }() - - waitCh := make(chan struct{}) - go func() { - defer close(waitCh) - for i := 0; i < 1000; i++ { - results := client.ProduceSync(context.Background(), &kgo.Record{ - Topic: string(topics[0]), - Value: b, - }) - assert.NoError(t, results.FirstErr()) - record, err := results.First() - assert.NoError(t, err) - assert.NotNil(t, record) - } - }() - - consumer2 := newConsumer(t, cfg) - go func() { - assert.ErrorIs(t, consumer2.Run(ctx), context.Canceled) - }() - - <-waitCh - + b, err := codec.Encode(event) + require.NoError(t, err) + record := kgo.Record{ + Topic: string(topics[0]), + Value: b, + } + produced := 1000 + for i := 0; i < produced; i++ { + produceRecord(ctx, t, client, &record) + } assert.Eventually(t, func() bool { - return count.Load() == 1000 - }, 1*time.Second, 50*time.Millisecond) + return count.Load() == int32(produced) + }, time.Second, 50*time.Millisecond) } func TestMultipleConsumerGroups(t *testing.T) { @@ -234,7 +465,6 @@ func TestMultipleConsumerGroups(t *testing.T) { codec := json.JSON{} topics := []apmqueue.Topic{"topic"} client, addrs := newClusterWithTopics(t, topics...) 
- cfg := ConsumerConfig{ Brokers: addrs, Topics: topics, @@ -247,17 +477,15 @@ func TestMultipleConsumerGroups(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() - m := make(map[string]*atomic.Int64) - groups := 10 for i := 0; i < groups; i++ { gid := "groupid" + strconv.Itoa(i) cfg.GroupID = gid - m[gid] = &atomic.Int64{} - cfg.Processor = model.ProcessBatchFunc(func(ctx context.Context, b *model.Batch) error { - count := m[gid] - count.Add(1) + var counter atomic.Int64 + m[gid] = &counter + cfg.Processor = model.ProcessBatchFunc(func(_ context.Context, b *model.Batch) error { + counter.Add(1) assert.Len(t, *b, 1) assert.Equal(t, event, (*b)[0]) return nil @@ -270,7 +498,7 @@ func TestMultipleConsumerGroups(t *testing.T) { produceRecords := 100 for i := 0; i < produceRecords; i++ { - client.Produce(context.Background(), &kgo.Record{ + client.Produce(ctx, &kgo.Record{ Topic: string(topics[0]), Value: b, }, func(r *kgo.Record, err error) { @@ -282,7 +510,7 @@ func TestMultipleConsumerGroups(t *testing.T) { for k, i := range m { assert.Eventually(t, func() bool { return i.Load() == int64(produceRecords) - }, 1*time.Second, 50*time.Millisecond, k) + }, time.Second, 50*time.Millisecond, k) } } @@ -305,7 +533,7 @@ func TestConsumerRunError(t *testing.T) { require.Error(t, consumer.Run(context.Background())) } -func newConsumer(t *testing.T, cfg ConsumerConfig) *Consumer { +func newConsumer(t testing.TB, cfg ConsumerConfig) *Consumer { consumer, err := NewConsumer(cfg) require.NoError(t, err) t.Cleanup(func() { @@ -313,3 +541,12 @@ func newConsumer(t *testing.T, cfg ConsumerConfig) *Consumer { }) return consumer } + +func produceRecord(ctx context.Context, t testing.TB, c *kgo.Client, r *kgo.Record) { + t.Helper() + results := c.ProduceSync(ctx, r) + assert.NoError(t, results.FirstErr()) + r, err := results.First() + assert.NoError(t, err) + assert.NotNil(t, r) +} diff --git a/kafka/producer_test.go b/kafka/producer_test.go index e292c9a4..d9b72170 100644 --- a/kafka/producer_test.go +++ b/kafka/producer_test.go @@ -155,7 +155,7 @@ func TestNewProducerBasic(t *testing.T) { test(t, false) } -func newClusterWithTopics(t *testing.T, topics ...apmqueue.Topic) (*kgo.Client, []string) { +func newClusterWithTopics(t testing.TB, topics ...apmqueue.Topic) (*kgo.Client, []string) { t.Helper() cluster, err := kfake.NewCluster() require.NoError(t, err) diff --git a/systemtest/go.mod b/systemtest/go.mod index b8a029be..32194d80 100644 --- a/systemtest/go.mod +++ b/systemtest/go.mod @@ -9,6 +9,8 @@ require ( github.com/hashicorp/hc-install v0.5.1 github.com/hashicorp/terraform-exec v0.18.1 github.com/stretchr/testify v1.8.2 + github.com/twmb/franz-go v1.13.3 + github.com/twmb/franz-go/pkg/kadm v1.8.0 go.uber.org/zap v1.24.0 golang.org/x/sync v0.1.0 ) @@ -40,7 +42,6 @@ require ( github.com/pierrec/lz4/v4 v4.1.17 // indirect github.com/pkg/errors v0.9.1 // indirect github.com/pmezard/go-difflib v1.0.0 // indirect - github.com/twmb/franz-go v1.13.3 // indirect github.com/twmb/franz-go/pkg/kmsg v1.5.0 // indirect github.com/twmb/franz-go/plugin/kotel v1.1.0 // indirect github.com/twmb/franz-go/plugin/kzap v1.1.2 // indirect diff --git a/systemtest/go.sum b/systemtest/go.sum index ed4219c7..31751c83 100644 --- a/systemtest/go.sum +++ b/systemtest/go.sum @@ -149,6 +149,7 @@ github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY= github.com/twmb/franz-go v1.13.3 h1:AO0HcPu7hNMi+ue+jz3CnV+VpuAizaazQuqTo1SvLr4= github.com/twmb/franz-go v1.13.3/go.mod 
h1:jm/FtYxmhxDTN0gNSb26XaJY0irdSVcsckLiR5tQNMk= github.com/twmb/franz-go/pkg/kadm v1.8.0 h1:vvKwZpxYn+VmM32v4mKkecHLKavZW+HcYLRKKxly5ZY= +github.com/twmb/franz-go/pkg/kadm v1.8.0/go.mod h1:qUSM7pxoMCU1UNu5H4USE64ODcVmeG9LS96mysv1nu8= github.com/twmb/franz-go/pkg/kfake v0.0.0-20230405224959-0d137e78cd4a h1:n4MRnUKC+zXcZhoKpWMgl2T7yyJXP4POTSs6kl7GFl8= github.com/twmb/franz-go/pkg/kmsg v1.5.0 h1:eqVJquFQLdBNLrRMWX03pPDPpngn6PTjGZLlZnagouk= github.com/twmb/franz-go/pkg/kmsg v1.5.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw= From 8bd8edcccd9722c99ee68887f6a392684f0d0d12 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Thu, 4 May 2023 08:21:23 +0800 Subject: [PATCH 2/6] Increase shutdown test max wait Signed-off-by: Marc Lopez Rubio --- kafka/consumer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/consumer_test.go b/kafka/consumer_test.go index fbbf4631..ca8c072d 100644 --- a/kafka/consumer_test.go +++ b/kafka/consumer_test.go @@ -403,7 +403,7 @@ func TestGracefulSutdown(t *testing.T) { } assert.Eventually(t, func() bool { return processed.Load() == int32(records) && errored.Load() == 0 - }, 5*time.Second, time.Millisecond) + }, 6*time.Second, time.Millisecond) t.Logf("got: %d events processed, %d errored", processed.Load(), errored.Load()) } From 19d213a16de634234998ee4c9d5eff6ac7ce7ef9 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Thu, 4 May 2023 21:02:24 -0700 Subject: [PATCH 3/6] Fix flaky tests Signed-off-by: Marc Lopez Rubio --- kafka/consumer.go | 3 ++- kafka/consumer_test.go | 15 +++++++++------ 2 files changed, 11 insertions(+), 7 deletions(-) diff --git a/kafka/consumer.go b/kafka/consumer.go index f6da581f..112b48f4 100644 --- a/kafka/consumer.go +++ b/kafka/consumer.go @@ -263,7 +263,8 @@ func (c *Consumer) fetch(ctx context.Context) error { if err := c.client.CommitUncommittedOffsets(ctx); err != nil { // NOTE(marclop): If the commit fails with an unrecoverable error, // return it and terminate the consumer. This will avoid potentially - // processing records twice, and it's up to the consumer + // processing records twice, and it's up to the consumer to re-start + // the consumer. return ErrCommitFailed } // Allow re-balancing now that we have committed offsets, preventing diff --git a/kafka/consumer_test.go b/kafka/consumer_test.go index ca8c072d..56e54273 100644 --- a/kafka/consumer_test.go +++ b/kafka/consumer_test.go @@ -188,7 +188,8 @@ func TestConsumerDelivery(t *testing.T) { initialRecords int maxPollRecords int lastRecords int - // Number of successfully processed events. + // Number of successfully processed events. The assertion is GE due to + // variable guarantee factors. processed int32 // Number of unsuccessfully processed events. errored int32 @@ -208,8 +209,9 @@ func TestConsumerDelivery(t *testing.T) { maxPollRecords: 2, lastRecords: 10, - processed: 26, // 30 total - 2 errored - 2 lost before batch processor. - errored: 2, // The first two fetch fails. + // 30 total - 2 errored - 2 lost before they can be processed. + processed: 26, + errored: 2, // The first two fetch fails. }, "12_produced_1_poll_AMOD": { deliveryType: apmqueue.AtMostOnceDeliveryType, @@ -347,7 +349,8 @@ func TestConsumerDelivery(t *testing.T) { } assert.Eventually(t, func() bool { - return processed.Load() == tc.processed && + // Some events may or may not be processed. Assert GE. 
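				// With AtMostOnceDelivery the offsets are committed right after
				// the poll, so the number of in-flight records lost before they
				// reach the processor depends on timing, which is why the
				// assertion treats tc.processed as a lower bound rather than an
				// exact count.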
+ return processed.Load() >= tc.processed && errored.Load() == tc.errored }, 2*time.Second, time.Millisecond) t.Logf("got: %d events errored, %d processed, want: %d errored, %d processed", @@ -364,7 +367,7 @@ func TestGracefulSutdown(t *testing.T) { var processed atomic.Int32 var errored atomic.Int32 process := make(chan struct{}) - records := 10 + records := 5 consumer := newConsumer(t, ConsumerConfig{ Brokers: brokers, GroupID: "group", @@ -403,7 +406,7 @@ func TestGracefulSutdown(t *testing.T) { } assert.Eventually(t, func() bool { return processed.Load() == int32(records) && errored.Load() == 0 - }, 6*time.Second, time.Millisecond) + }, time.Second, time.Millisecond) t.Logf("got: %d events processed, %d errored", processed.Load(), errored.Load()) } From 239b65024f8b5dd4fa8e36254c927481ae4ecb7f Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Thu, 4 May 2023 21:10:07 -0700 Subject: [PATCH 4/6] allow a bit of time for cond Signed-off-by: Marc Lopez Rubio --- kafka/consumer_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kafka/consumer_test.go b/kafka/consumer_test.go index 56e54273..b3e5fb61 100644 --- a/kafka/consumer_test.go +++ b/kafka/consumer_test.go @@ -406,7 +406,7 @@ func TestGracefulSutdown(t *testing.T) { } assert.Eventually(t, func() bool { return processed.Load() == int32(records) && errored.Load() == 0 - }, time.Second, time.Millisecond) + }, 5*time.Second, time.Millisecond) t.Logf("got: %d events processed, %d errored", processed.Load(), errored.Load()) } From ce7ce822af87e45b2c98b62a3a8d106ebf26afd8 Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Thu, 4 May 2023 21:22:49 -0700 Subject: [PATCH 5/6] Reduce the number of records Signed-off-by: Marc Lopez Rubio --- kafka/consumer_test.go | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/kafka/consumer_test.go b/kafka/consumer_test.go index b3e5fb61..92aaeca5 100644 --- a/kafka/consumer_test.go +++ b/kafka/consumer_test.go @@ -367,7 +367,7 @@ func TestGracefulSutdown(t *testing.T) { var processed atomic.Int32 var errored atomic.Int32 process := make(chan struct{}) - records := 5 + records := 2 consumer := newConsumer(t, ConsumerConfig{ Brokers: brokers, GroupID: "group", @@ -406,8 +406,10 @@ func TestGracefulSutdown(t *testing.T) { } assert.Eventually(t, func() bool { return processed.Load() == int32(records) && errored.Load() == 0 - }, 5*time.Second, time.Millisecond) - t.Logf("got: %d events processed, %d errored", processed.Load(), errored.Load()) + }, 6*time.Second, time.Millisecond) + t.Logf("got: %d events processed, %d errored, want: %d processed", + processed.Load(), errored.Load(), records, + ) } t.Run("AtLeastOnceDelivery", func(t *testing.T) { From 37083b06673c54e6eb5d2b59c08be067e1b5354e Mon Sep 17 00:00:00 2001 From: Marc Lopez Rubio Date: Thu, 4 May 2023 21:37:37 -0700 Subject: [PATCH 6/6] Start consumer after producing Signed-off-by: Marc Lopez Rubio --- kafka/consumer_test.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/kafka/consumer_test.go b/kafka/consumer_test.go index 92aaeca5..684b2e62 100644 --- a/kafka/consumer_test.go +++ b/kafka/consumer_test.go @@ -387,16 +387,16 @@ func TestGracefulSutdown(t *testing.T) { return nil }), }) - ctx, cancel := context.WithCancel(context.Background()) - defer cancel() - go func() { consumer.Run(ctx) }() b, err := codec.Encode(model.APMEvent{Transaction: &model.Transaction{ID: "1"}}) require.NoError(t, err) + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() for i 
:= 0; i < records; i++ { produceRecord(ctx, t, client, &kgo.Record{Topic: "topic", Value: b}) } + go func() { consumer.Run(ctx) }() select { case process <- struct{}{}: close(process) // Allow records to be processed