diff --git a/go.mod b/go.mod
index 7060b564..4a02344b 100644
--- a/go.mod
+++ b/go.mod
@@ -26,6 +26,7 @@ require (
 	cloud.google.com/go/compute/metadata v0.2.3 // indirect
 	cloud.google.com/go/iam v0.13.0 // indirect
 	cloud.google.com/go/longrunning v0.4.1 // indirect
+	github.com/benbjohnson/clock v1.1.0 // indirect
 	github.com/davecgh/go-spew v1.1.1 // indirect
 	github.com/go-logr/logr v1.2.4 // indirect
 	github.com/go-logr/stdr v1.2.2 // indirect
diff --git a/go.sum b/go.sum
index 80923cd1..ebcd7b68 100644
--- a/go.sum
+++ b/go.sum
@@ -18,6 +18,7 @@ cloud.google.com/go/pubsublite v1.8.0/go.mod h1:7UapoCpdX/0nt4phleaQlrJ5S2GH81hs
 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
 github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
 github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
+github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
 github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
 github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
 github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
diff --git a/kafka/consumer.go b/kafka/consumer.go
index fcf3c635..112b48f4 100644
--- a/kafka/consumer.go
+++ b/kafka/consumer.go
@@ -36,6 +36,12 @@ import (
 	"github.com/elastic/apm-queue/queuecontext"
 )
 
+var (
+	// ErrCommitFailed may be returned by `consumer.Run` when DeliveryType is
+	// apmqueue.AtMostOnceDeliveryType.
+	ErrCommitFailed = errors.New("kafka: failed to commit offsets")
+)
+
 // SASLMechanism type alias to sasl.Mechanism
 type SASLMechanism = sasl.Mechanism
 
@@ -59,7 +65,7 @@ type ConsumerConfig struct {
 	// Version is the software version to use in the Kafka client. This is
 	// useful since it shows up in Kafka metrics and logs.
 	Version string
-	// Decoder holds an encoding.Decoder for decoding events.
+	// Decoder holds an encoding.Decoder for decoding records.
 	Decoder Decoder
 	// MaxPollRecords defines an upper bound to the number of records that can
 	// be polled on a single fetch. If MaxPollRecords <= 0, defaults to 100.
@@ -210,7 +216,7 @@ func NewConsumer(cfg ConsumerConfig) (*Consumer, error) {
 	}, nil
 }
 
-// Close closes the consumer.
+// Close the consumer, blocking until all partition consumers are stopped.
 func (c *Consumer) Close() error {
 	c.mu.Lock()
 	defer c.mu.Unlock()
@@ -221,7 +227,13 @@ func (c *Consumer) Close() error {
 	return nil
 }
 
-// Run executes the consumer in a blocking manner.
+// Run the consumer until a non-recoverable error is returned:
+// - context.Canceled.
+// - context.DeadlineExceeded.
+// - kgo.ErrClientClosed.
+// - ErrCommitFailed.
+//
+// To shut down the consumer, cancel the context, or call consumer.Close().
 func (c *Consumer) Run(ctx context.Context) error {
 	for {
 		if err := c.fetch(ctx); err != nil {
@@ -233,9 +245,6 @@ func (c *Consumer) Run(ctx context.Context) error {
 // fetch polls the Kafka broker for new records up to cfg.MaxPollRecords.
 // Any errors returned by fetch should be considered fatal.
 func (c *Consumer) fetch(ctx context.Context) error {
-	c.mu.RLock()
-	defer c.mu.RUnlock()
-
 	fetches := c.client.PollRecords(ctx, c.cfg.MaxPollRecords)
 	defer c.client.AllowRebalance()
 
@@ -252,12 +261,13 @@ func (c *Consumer) fetch(ctx context.Context) error {
 	case apmqueue.AtMostOnceDeliveryType:
 		// Commit the fetched record offsets as soon as we've polled them.
 		if err := c.client.CommitUncommittedOffsets(ctx); err != nil {
-			// If the commit fails, then return immediately and any uncommitted
-			// records will be re-delivered in time. Otherwise, records may be
-			// processed twice.
-			return nil
+			// NOTE(marclop): If the commit fails with an unrecoverable error,
+			// return it and terminate the consumer. This avoids potentially
+			// processing records twice; it's up to the caller to restart the
+			// consumer.
+			return ErrCommitFailed
 		}
-		// Allow rebalancing now that we have committed offsets, preventing
+		// Allow re-balancing now that we have committed offsets, preventing
 		// another consumer from reprocessing the records.
 		c.client.AllowRebalance()
 	}
@@ -266,6 +276,9 @@ func (c *Consumer) fetch(ctx context.Context) error {
 			zap.Error(err), zap.String("topic", t), zap.Int32("partition", p),
 		)
 	})
+	// No need to grab the lock until we want to access `c.consumer`.
+	c.mu.RLock()
+	defer c.mu.RUnlock()
 	// Send partition records to be processed by its dedicated goroutine.
 	fetches.EachPartition(func(ftp kgo.FetchTopicPartition) {
 		if len(ftp.Records) == 0 {
@@ -275,9 +288,9 @@ func (c *Consumer) fetch(ctx context.Context) error {
 		select {
 		case c.consumer.consumers[tp].records <- ftp.Records:
 			// When using AtMostOnceDelivery, if the context is cancelled between
-			// PollRecords and this line, the events will be lost.
+			// PollRecords and this line, records will be lost.
 			// NOTE(marclop) Add a shutdown timer so that there's a grace period
-			// to allow the events to be processed before they're lost.
+			// to allow the records to be processed before they're lost.
 		case <-ctx.Done():
 			if c.cfg.Delivery == apmqueue.AtMostOnceDeliveryType {
 				c.cfg.Logger.Warn(
@@ -320,8 +333,9 @@ type topicPartition struct {
 }
 
 // assigned must be set as a kgo.OnPartitionsAssigned callback. Ensuring all
-// assigned partitions to this consumer process received records.
-func (c *consumer) assigned(_ context.Context, client *kgo.Client, assigned map[string][]int32) {
+// assigned partitions to this consumer process received records. The received
+// context is only canceled after the kgo.Client is closed.
+func (c *consumer) assigned(ctx context.Context, client *kgo.Client, assigned map[string][]int32) {
 	c.mu.Lock()
 	defer c.mu.Unlock()
 	for topic, partitions := range assigned {
@@ -337,7 +351,7 @@ func (c *consumer) assigned(_ context.Context, client *kgo.Client, assigned map[
 			}
 			go func(topic string, partition int32) {
 				defer c.wg.Done()
-				pc.consume(topic, partition)
+				pc.consume(ctx, topic, partition)
 			}(topic, partition)
 			tp := topicPartition{partition: partition, topic: topic}
 			c.consumers[tp] = pc
@@ -374,7 +388,8 @@ type partitionConsumer struct {
 }
 // consume processed the records from a topic and partition. Calling consume
 // more than once will cause a panic.
-func (pc partitionConsumer) consume(topic string, partition int32) {
+// The received context is only canceled when the kgo.Client is closed.
+func (pc partitionConsumer) consume(ctx context.Context, topic string, partition int32) {
 	logger := pc.logger.With(
 		zap.String("topic", topic),
 		zap.Int32("partition", partition),
@@ -383,7 +398,6 @@
 		// Store the last processed record. Default to -1 for cases where
 		// only the first record is received.
 		last := -1
-	recordLoop:
 		for i, msg := range records {
 			meta := make(map[string]string)
 			for _, h := range msg.Headers {
@@ -397,38 +411,27 @@
 					zap.Int64("offset", msg.Offset),
 					zap.Any("headers", meta),
 				)
-				// TODO(marclop) DLQ? The decoding has failed, re-delivery
-				// may cause the same error. Discard the event for now.
+				// NOTE(marclop) The decoding has failed, a DLQ may be helpful.
 				continue
 			}
-			ctx := queuecontext.WithMetadata(context.Background(), meta)
+			ctx := queuecontext.WithMetadata(ctx, meta)
 			batch := model.Batch{event}
+			// If a record can't be processed, no retries are attempted and the
+			// record is lost. https://github.com/elastic/apm-queue/issues/118.
 			if err := pc.processor.ProcessBatch(ctx, &batch); err != nil {
-				logger.Error("unable to process event",
+				logger.Error("data loss: unable to process event",
 					zap.Error(err),
 					zap.Int64("offset", msg.Offset),
 					zap.Any("headers", meta),
 				)
-				switch pc.delivery {
-				case apmqueue.AtLeastOnceDeliveryType:
-					// Exit the loop and commit the last processed offset
-					// (if any). This ensures events which haven't been
-					// processed are re-delivered, but those that have, are
-					// committed.
-					break recordLoop
-				case apmqueue.AtMostOnceDeliveryType:
-					// Events which can't be processed, are lost.
-					continue
-				}
 			}
 			last = i
 		}
-		// Only commit the records when at least a record has been processed
-		// with AtLeastOnceDeliveryType.
+		// Only commit the records when at least one record has been decoded
+		// and AtLeastOnceDeliveryType is set.
 		if pc.delivery == apmqueue.AtLeastOnceDeliveryType && last >= 0 {
 			lastRecord := records[last]
-			err := pc.client.CommitRecords(context.Background(), lastRecord)
-			if err != nil {
+			if err := pc.client.CommitRecords(ctx, lastRecord); err != nil {
 				logger.Error("unable to commit records",
 					zap.Error(err),
 					zap.Int64("offset", lastRecord.Offset),
diff --git a/kafka/consumer_test.go b/kafka/consumer_test.go
index 6924e645..684b2e62 100644
--- a/kafka/consumer_test.go
+++ b/kafka/consumer_test.go
@@ -20,6 +20,7 @@ package kafka
 import (
 	"context"
 	"crypto/tls"
+	"errors"
 	"strconv"
 	"sync/atomic"
 	"testing"
@@ -30,6 +31,8 @@ import (
 	"github.com/twmb/franz-go/pkg/kfake"
 	"github.com/twmb/franz-go/pkg/kgo"
 	"go.uber.org/zap"
+	"go.uber.org/zap/zapcore"
+	"go.uber.org/zap/zaptest"
 
 	"github.com/elastic/apm-data/model"
 	apmqueue "github.com/elastic/apm-queue"
@@ -145,7 +148,7 @@ func TestConsumerFetch(t *testing.T) {
 		GroupID: "groupid",
 		Decoder: codec,
 		Logger: zap.NewNop(),
-		Processor: model.ProcessBatchFunc(func(ctx context.Context, b *model.Batch) error {
+		Processor: model.ProcessBatchFunc(func(_ context.Context, b *model.Batch) error {
 			assert.Len(t, *b, 1)
 			assert.Equal(t, event, (*b)[0])
 			return nil
@@ -155,20 +158,266 @@ func TestConsumerFetch(t *testing.T) {
 	b, err := codec.Encode(event)
 	require.NoError(t, err)
 
-	record := &kgo.Record{
-		Topic: string(topics[0]),
-		Value: b,
+	produceRecord(context.Background(), t, client,
+		&kgo.Record{Topic: string(topics[0]), Value: b},
+	)
+	consumer := newConsumer(t, cfg)
+	assert.NoError(t, consumer.fetch(context.Background()))
+}
+
+func TestConsumerDelivery(t *testing.T) {
+	// ALOD = at least once delivery
+	// AMOD = at most once delivery
+	event := model.APMEvent{Transaction: &model.Transaction{ID: "1"}}
+	codec := json.JSON{}
+	topics := []apmqueue.Topic{"topic"}
+
+	// Produces `initialRecords` + `lastRecords` in total. Asserting the
+	// "lossy" behavior of the consumer implementation, depending on the
+	// chosen DeliveryType.
+	// `initialRecords` are produced, then the concurrent consumer is
+	// started, and the first receive always fails to process the first
+	// records in the received poll (`maxPollRecords`).
+	// Next, the context is cancelled and the consumer is stopped.
+	// A new consumer is created which takes over from the last committed
+	// offset.
+	// Depending on the DeliveryType, some or no records may be lost.
+	cases := map[string]struct {
+		// Test setup.
+		deliveryType apmqueue.DeliveryType
+		initialRecords int
+		maxPollRecords int
+		lastRecords int
+		// Number of successfully processed events. The assertion is GE due to
+		// variable guarantee factors.
+		processed int32
+		// Number of unsuccessfully processed events.
+		errored int32
+	}{
+		"12_produced_10_poll_AMOD": {
+			deliveryType: apmqueue.AtMostOnceDeliveryType,
+			initialRecords: 10,
+			maxPollRecords: 10,
+			lastRecords: 2,
+
+			processed: 2, // The last produced records are processed.
+			errored: 10, // The initially produced records are lost.
+		},
+		"30_produced_2_poll_AMOD": {
+			deliveryType: apmqueue.AtMostOnceDeliveryType,
+			initialRecords: 20,
+			maxPollRecords: 2,
+			lastRecords: 10,
+
+			// 30 total - 2 errored - 2 lost before they can be processed.
+			processed: 26,
+			errored: 2, // The two records in the first fetch fail.
+		},
+		"12_produced_1_poll_AMOD": {
+			deliveryType: apmqueue.AtMostOnceDeliveryType,
+			initialRecords: 1,
+			maxPollRecords: 1,
+			lastRecords: 11,
+
+			processed: 11, // The last produced records are processed.
+			errored: 1, // The initially produced record is lost.
+		},
+		"12_produced_10_poll_ALOD": {
+			deliveryType: apmqueue.AtLeastOnceDeliveryType,
+			initialRecords: 10,
+			maxPollRecords: 10,
+			lastRecords: 2,
+
+			processed: 12, // All records are re-processed.
+			errored: 10, // The initial batch errors.
+		},
+		"30_produced_2_poll_ALOD": {
+			deliveryType: apmqueue.AtLeastOnceDeliveryType,
+			initialRecords: 20,
+			maxPollRecords: 2,
+			lastRecords: 10,
+
+			processed: 30, // All records are processed.
+			errored: 2, // The initial batch errors.
+		},
+		"12_produced_1_poll_ALOD": {
+			deliveryType: apmqueue.AtLeastOnceDeliveryType,
+			initialRecords: 1,
+			maxPollRecords: 1,
+			lastRecords: 11,
+
+			processed: 11,
+			errored: 1,
+		},
+	}
+	for name, tc := range cases {
+		t.Run(name, func(t *testing.T) {
+			client, addrs := newClusterWithTopics(t, topics...)
+			failRecord := make(chan struct{})
+			processRecord := make(chan struct{})
+			defer close(failRecord)
+
+			baseLogger := zaptest.NewLogger(t, zaptest.Level(zapcore.DebugLevel))
+
+			var processed atomic.Int32
+			var errored atomic.Int32
+			cfg := ConsumerConfig{
+				Delivery: tc.deliveryType,
+				Brokers: addrs,
+				Decoder: codec,
+				Topics: topics,
+				GroupID: "groupid",
+				Logger: baseLogger,
+				MaxPollRecords: tc.maxPollRecords,
+				Processor: model.ProcessBatchFunc(func(ctx context.Context, b *model.Batch) error {
+					select {
+					// Records are marked as processed on receiving from processRecord.
+					case <-processRecord:
+						processed.Add(int32(len(*b)))
+					// Records are marked as failed when ctx is canceled, or
+					// on receiving from failRecord.
+					case <-failRecord:
+						errored.Add(int32(len(*b)))
+						return errors.New("failed processing record")
+					case <-ctx.Done():
+						errored.Add(int32(len(*b)))
+						return ctx.Err()
+					}
+					return nil
+				}),
+			}
-	results := client.ProduceSync(context.Background(), record)
-	assert.NoError(t, results.FirstErr())
+			b, err := codec.Encode(event)
+			require.NoError(t, err)
+			record := &kgo.Record{
+				Topic: string(topics[0]),
+				Value: b,
+			}
-	r, err := results.First()
-	assert.NoError(t, err)
-	assert.NotNil(t, r)
+			// Context used for the consumer
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
+			for i := 0; i < int(tc.initialRecords); i++ {
+				produceRecord(ctx, t, client, record)
+			}
-	consumer := newConsumer(t, cfg)
-	assert.NoError(t, consumer.fetch(context.Background()))
+			cfg.Logger = baseLogger.Named("1")
+			consumer := newConsumer(t, cfg)
+			go func() {
+				err := consumer.Run(ctx)
+				assert.Error(t, err)
+			}()
+
+			// Wait until the batch processor function is called.
+			// The first batch is handled (and fails), and after the context is
+			// canceled, the partition consumers are also stopped. Fetching and
+			// record processing are decoupled, so the consumer may have fetched
+			// more records while waiting for `failRecord`.
+			// For AMOD, the offsets are committed after being fetched, which
+			// means that records may be lost before they reach the Processor.
+			select {
+			case failRecord <- struct{}{}:
+				cancel()
+			case <-time.After(time.Second):
+				t.Fatal("timed out waiting for consumer to process event")
+			}
+			consumer.Close()
+
+			assert.Eventually(t, func() bool {
+				return int(errored.Load()) == tc.maxPollRecords
+			}, time.Second, time.Millisecond)
+
+			// Start a new consumer in the background and produce the remaining records.
+			ctx, cancel = context.WithCancel(context.Background())
+			defer cancel()
+			// Produce tc.lastRecords.
+			for i := 0; i < tc.lastRecords; i++ {
+				produceRecord(ctx, t, client, record)
+			}
+			cfg.MaxPollRecords = tc.lastRecords
+			cfg.Logger = baseLogger.Named("2")
+			consumer = newConsumer(t, cfg)
+			go func() {
+				assert.ErrorIs(t, consumer.Run(ctx), context.Canceled)
+			}()
+			// Wait for the first record to be consumed before running any assertion.
+			select {
+			case processRecord <- struct{}{}:
+				close(processRecord) // Allow records to be processed
+			case <-time.After(6 * time.Second):
+				t.Fatal("timed out waiting for consumer to process event")
+			}
+
+			assert.Eventually(t, func() bool {
+				// Some events may or may not be processed. Assert GE.
+				return processed.Load() >= tc.processed &&
+					errored.Load() == tc.errored
+			}, 2*time.Second, time.Millisecond)
+			t.Logf("got: %d events errored, %d processed, want: %d errored, %d processed",
+				errored.Load(), processed.Load(), tc.errored, tc.processed,
+			)
+		})
+	}
+}
+
+func TestGracefulShutdown(t *testing.T) {
+	test := func(t testing.TB, dt apmqueue.DeliveryType) {
+		client, brokers := newClusterWithTopics(t, "topic")
+		var codec json.JSON
+		var processed atomic.Int32
+		var errored atomic.Int32
+		process := make(chan struct{})
+		records := 2
+		consumer := newConsumer(t, ConsumerConfig{
+			Brokers: brokers,
+			GroupID: "group",
+			Topics: []apmqueue.Topic{"topic"},
+			Decoder: codec,
+			MaxPollRecords: records,
+			Delivery: dt,
+			Logger: zaptest.NewLogger(t, zaptest.Level(zapcore.DebugLevel)),
+			Processor: model.ProcessBatchFunc(func(ctx context.Context, b *model.Batch) error {
+				select {
+				case <-ctx.Done():
+					errored.Add(int32(len(*b)))
+					return ctx.Err()
+				case <-process:
+					processed.Add(int32(len(*b)))
+				}
+				return nil
+			}),
+		})
+
+		b, err := codec.Encode(model.APMEvent{Transaction: &model.Transaction{ID: "1"}})
+		require.NoError(t, err)
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+		for i := 0; i < records; i++ {
+			produceRecord(ctx, t, client, &kgo.Record{Topic: "topic", Value: b})
+		}
+
+		go func() { consumer.Run(ctx) }()
+		select {
+		case process <- struct{}{}:
+			close(process) // Allow records to be processed
+			cancel()       // Stop the consumer.
+		case <-time.After(time.Second):
+			t.Fatal("timed out waiting for consumer to process event")
+		}
+		assert.Eventually(t, func() bool {
+			return processed.Load() == int32(records) && errored.Load() == 0
+		}, 6*time.Second, time.Millisecond)
+		t.Logf("got: %d events processed, %d errored, want: %d processed",
+			processed.Load(), errored.Load(), records,
+		)
+	}
+
+	t.Run("AtLeastOnceDelivery", func(t *testing.T) {
+		test(t, apmqueue.AtLeastOnceDeliveryType)
+	})
+	t.Run("AtMostOnceDelivery", func(t *testing.T) {
+		test(t, apmqueue.AtMostOnceDeliveryType)
+	})
 }
 
 func TestMultipleConsumers(t *testing.T) {
@@ -184,49 +433,36 @@ func TestMultipleConsumers(t *testing.T) {
 		GroupID: "groupid",
 		Decoder: codec,
 		Logger: zap.NewNop(),
-		Processor: model.ProcessBatchFunc(func(ctx context.Context, b *model.Batch) error {
+		Processor: model.ProcessBatchFunc(func(_ context.Context, b *model.Batch) error {
 			count.Add(1)
 			assert.Len(t, *b, 1)
 			return nil
 		}),
 	}
 
-	b, err := codec.Encode(event)
-	require.NoError(t, err)
-
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
+	consumers := 2
+	for i := 0; i < consumers; i++ {
+		go func() {
+			consumer := newConsumer(t, cfg)
+			assert.ErrorIs(t, consumer.Run(ctx), context.Canceled)
+		}()
+	}
 
-	consumer1 := newConsumer(t, cfg)
-	go func() {
-		assert.ErrorIs(t, consumer1.Run(ctx), context.Canceled)
-	}()
-
-	waitCh := make(chan struct{})
-	go func() {
-		defer close(waitCh)
-		for i := 0; i < 1000; i++ {
-			results := client.ProduceSync(context.Background(), &kgo.Record{
-				Topic: string(topics[0]),
-				Value: b,
-			})
-			assert.NoError(t, results.FirstErr())
-			record, err := results.First()
-			assert.NoError(t, err)
-			assert.NotNil(t, record)
-		}
-	}()
-
-	consumer2 := newConsumer(t, cfg)
-	go func() {
-		assert.ErrorIs(t, consumer2.Run(ctx), context.Canceled)
-	}()
-
-	<-waitCh
-
+	b, err := codec.Encode(event)
+	require.NoError(t, err)
+	record := kgo.Record{
+		Topic: string(topics[0]),
+		Value: b,
+	}
+	produced := 1000
+	for i := 0; i < produced; i++ {
+		produceRecord(ctx, t, client, &record)
+	}
 	assert.Eventually(t, func() bool {
-		return count.Load() == 1000
-	}, 1*time.Second, 50*time.Millisecond)
+		return count.Load() == int32(produced)
+	}, time.Second, 50*time.Millisecond)
 }
 
 func TestMultipleConsumerGroups(t *testing.T) {
@@ -234,7 +470,6 @@ func TestMultipleConsumerGroups(t *testing.T) {
 	codec := json.JSON{}
 	topics := []apmqueue.Topic{"topic"}
 	client, addrs := newClusterWithTopics(t, topics...)
-
 	cfg := ConsumerConfig{
 		Brokers: addrs,
 		Topics: topics,
@@ -247,17 +482,15 @@
 	ctx, cancel := context.WithCancel(context.Background())
 	defer cancel()
-
 	m := make(map[string]*atomic.Int64)
-
 	groups := 10
 	for i := 0; i < groups; i++ {
 		gid := "groupid" + strconv.Itoa(i)
 		cfg.GroupID = gid
-		m[gid] = &atomic.Int64{}
-		cfg.Processor = model.ProcessBatchFunc(func(ctx context.Context, b *model.Batch) error {
-			count := m[gid]
-			count.Add(1)
+		var counter atomic.Int64
+		m[gid] = &counter
+		cfg.Processor = model.ProcessBatchFunc(func(_ context.Context, b *model.Batch) error {
+			counter.Add(1)
 			assert.Len(t, *b, 1)
 			assert.Equal(t, event, (*b)[0])
 			return nil
@@ -270,7 +503,7 @@
 	produceRecords := 100
 	for i := 0; i < produceRecords; i++ {
-		client.Produce(context.Background(), &kgo.Record{
+		client.Produce(ctx, &kgo.Record{
 			Topic: string(topics[0]),
 			Value: b,
 		}, func(r *kgo.Record, err error) {
@@ -282,7 +515,7 @@
 	for k, i := range m {
 		assert.Eventually(t, func() bool {
 			return i.Load() == int64(produceRecords)
-		}, 1*time.Second, 50*time.Millisecond, k)
+		}, time.Second, 50*time.Millisecond, k)
 	}
 }
@@ -305,7 +538,7 @@ func TestConsumerRunError(t *testing.T) {
 	require.Error(t, consumer.Run(context.Background()))
 }
 
-func newConsumer(t *testing.T, cfg ConsumerConfig) *Consumer {
+func newConsumer(t testing.TB, cfg ConsumerConfig) *Consumer {
 	consumer, err := NewConsumer(cfg)
 	require.NoError(t, err)
 	t.Cleanup(func() {
@@ -313,3 +546,12 @@
 	return consumer
 }
+
+func produceRecord(ctx context.Context, t testing.TB, c *kgo.Client, r *kgo.Record) {
+	t.Helper()
+	results := c.ProduceSync(ctx, r)
+	assert.NoError(t, results.FirstErr())
+	r, err := results.First()
+	assert.NoError(t, err)
+	assert.NotNil(t, r)
+}
diff --git a/kafka/producer_test.go b/kafka/producer_test.go
index e292c9a4..d9b72170 100644
--- a/kafka/producer_test.go
+++ b/kafka/producer_test.go
@@ -155,7 +155,7 @@ func TestNewProducerBasic(t *testing.T) {
 	test(t, false)
 }
 
-func newClusterWithTopics(t *testing.T, topics ...apmqueue.Topic) (*kgo.Client, []string) {
+func newClusterWithTopics(t testing.TB, topics ...apmqueue.Topic) (*kgo.Client, []string) {
 	t.Helper()
 	cluster, err := kfake.NewCluster()
 	require.NoError(t, err)
diff --git a/systemtest/go.mod b/systemtest/go.mod
index 29c78aae..62bf30dd 100644
--- a/systemtest/go.mod
+++ b/systemtest/go.mod
@@ -9,6 +9,8 @@ require (
 	github.com/hashicorp/hc-install v0.5.1
 	github.com/hashicorp/terraform-exec v0.18.1
 	github.com/stretchr/testify v1.8.2
+	github.com/twmb/franz-go v1.13.3
+	github.com/twmb/franz-go/pkg/kadm v1.8.1
 	go.uber.org/zap v1.24.0
 	golang.org/x/sync v0.2.0
 )
@@ -40,7 +42,6 @@ require (
 	github.com/pierrec/lz4/v4 v4.1.17 // indirect
 	github.com/pkg/errors v0.9.1 // indirect
 	github.com/pmezard/go-difflib v1.0.0 // indirect
-	github.com/twmb/franz-go v1.13.3 // indirect
 	github.com/twmb/franz-go/pkg/kmsg v1.5.0 // indirect
 	github.com/twmb/franz-go/plugin/kotel v1.1.0 // indirect
 	github.com/twmb/franz-go/plugin/kzap v1.1.2 // indirect
diff --git a/systemtest/go.sum b/systemtest/go.sum
index 402d1fb2..36ac4218 100644
--- a/systemtest/go.sum
+++ b/systemtest/go.sum
@@ -149,6 +149,7 @@ github.com/tidwall/sjson v1.2.5 h1:kLy8mja+1c9jlljvWTlSazM7cKDRfJuR/bOJhcY5NcY=
 github.com/twmb/franz-go v1.13.3 h1:AO0HcPu7hNMi+ue+jz3CnV+VpuAizaazQuqTo1SvLr4=
 github.com/twmb/franz-go v1.13.3/go.mod h1:jm/FtYxmhxDTN0gNSb26XaJY0irdSVcsckLiR5tQNMk=
 github.com/twmb/franz-go/pkg/kadm v1.8.1 h1:SrzL855I7gQTGdMtOYGTHhebs7TPgPN29FPtjusqwlE=
+github.com/twmb/franz-go/pkg/kadm v1.8.1/go.mod h1:qUSM7pxoMCU1UNu5H4USE64ODcVmeG9LS96mysv1nu8=
 github.com/twmb/franz-go/pkg/kfake v0.0.0-20230405224959-0d137e78cd4a h1:n4MRnUKC+zXcZhoKpWMgl2T7yyJXP4POTSs6kl7GFl8=
 github.com/twmb/franz-go/pkg/kmsg v1.5.0 h1:eqVJquFQLdBNLrRMWX03pPDPpngn6PTjGZLlZnagouk=
 github.com/twmb/franz-go/pkg/kmsg v1.5.0/go.mod h1:se9Mjdt0Nwzc9lnjJ0HyDtLyBnaBDAd7pCje47OhSyw=
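
With this change, a caller that opts into apmqueue.AtMostOnceDeliveryType has to decide what to do when consumer.Run returns ErrCommitFailed; per the NOTE in fetch, restarting is left to the caller. Below is a minimal sketch of what that caller-side restart loop could look like. It is only a sketch: the broker address, topic, and group ID are placeholders, the JSON codec import path is assumed (the tests above only show it aliased as `json`), and the restart-on-ErrCommitFailed policy is an illustration, not something prescribed by this change.

```go
package main

import (
	"context"
	"errors"
	"log"

	"github.com/elastic/apm-data/model"
	"go.uber.org/zap"

	apmqueue "github.com/elastic/apm-queue"
	// NOTE: codec import path assumed; the tests only show it aliased as `json`.
	"github.com/elastic/apm-queue/codec/json"
	"github.com/elastic/apm-queue/kafka"
)

func main() {
	logger, err := zap.NewProduction()
	if err != nil {
		log.Fatal(err)
	}
	processor := model.ProcessBatchFunc(func(ctx context.Context, b *model.Batch) error {
		// Handle the decoded events. With AtMostOnceDeliveryType, a returned
		// error means the records in this batch are not retried.
		return nil
	})
	ctx := context.Background()
	for {
		consumer, err := kafka.NewConsumer(kafka.ConsumerConfig{
			Brokers:   []string{"localhost:9092"}, // placeholder broker address
			Topics:    []apmqueue.Topic{"topic"},  // placeholder topic
			GroupID:   "groupid",                  // placeholder group ID
			Decoder:   json.JSON{},
			Logger:    logger,
			Delivery:  apmqueue.AtMostOnceDeliveryType,
			Processor: processor,
		})
		if err != nil {
			log.Fatal(err)
		}
		err = consumer.Run(ctx)
		_ = consumer.Close() // best-effort cleanup before deciding what to do next
		if errors.Is(err, kafka.ErrCommitFailed) {
			// Offsets could not be committed; re-create the consumer and resume
			// from the last committed offsets rather than risk double-processing.
			logger.Warn("commit failed, restarting consumer", zap.Error(err))
			continue
		}
		// context.Canceled, context.DeadlineExceeded or kgo.ErrClientClosed:
		// treat these as a clean shutdown.
		return
	}
}
```

With AtLeastOnceDeliveryType the same loop still works; ErrCommitFailed is simply not expected there, since offsets are committed per partition inside consume and commit failures are only logged.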