kafka(consumer): Improve AtLeastOnce handling #125

Merged 8 commits on May 5, 2023
1 change: 1 addition & 0 deletions go.mod
@@ -26,6 +26,7 @@ require (
cloud.google.com/go/compute/metadata v0.2.3 // indirect
cloud.google.com/go/iam v0.13.0 // indirect
cloud.google.com/go/longrunning v0.4.1 // indirect
github.com/benbjohnson/clock v1.1.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/go-logr/logr v1.2.4 // indirect
github.com/go-logr/stdr v1.2.2 // indirect
1 change: 1 addition & 0 deletions go.sum
@@ -18,6 +18,7 @@ cloud.google.com/go/pubsublite v1.8.0/go.mod h1:7UapoCpdX/0nt4phleaQlrJ5S2GH81hs
github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU=
github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY=
github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8=
github.com/benbjohnson/clock v1.1.0/go.mod h1:J11/hYXuz8f4ySSvYwY0FKfm+ezbsZBKZxNJlLklBHA=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/cespare/xxhash/v2 v2.1.1/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw=
77 changes: 40 additions & 37 deletions kafka/consumer.go
@@ -36,6 +36,12 @@ import (
"github.com/elastic/apm-queue/queuecontext"
)

var (
// ErrCommitFailed may be returned by `consumer.Run` when the delivery type
// is apmqueue.AtMostOnceDeliveryType.
ErrCommitFailed = errors.New("kafka: failed to commit offsets")
)
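For callers, the visible change is that Run can now surface commit failures instead of swallowing them. A minimal caller-side sketch; the wrapper, restart policy, and one-second pause are assumptions, not part of this PR:

package main

import (
	"context"
	"errors"
	"log"
	"time"

	"github.com/elastic/apm-queue/kafka"
)

// runWithRestart is hypothetical: it restarts the consumer on
// ErrCommitFailed and returns on any other terminal error.
func runWithRestart(ctx context.Context, newConsumer func() (*kafka.Consumer, error)) error {
	for {
		c, err := newConsumer()
		if err != nil {
			return err
		}
		if err := c.Run(ctx); !errors.Is(err, kafka.ErrCommitFailed) {
			// Context cancellation, kgo.ErrClientClosed, or another fatal error.
			return err
		}
		// Offsets failed to commit under at-most-once delivery; restart
		// with a fresh consumer rather than risk processing records twice.
		log.Println("commit failed, restarting consumer")
		time.Sleep(time.Second)
	}
}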

// SASLMechanism type alias to sasl.Mechanism
type SASLMechanism = sasl.Mechanism

@@ -59,7 +65,7 @@ type ConsumerConfig struct {
// Version is the software version to use in the Kafka client. This is
// useful since it shows up in Kafka metrics and logs.
Version string
// Decoder holds an encoding.Decoder for decoding events.
// Decoder holds an encoding.Decoder for decoding records.
Decoder Decoder
// MaxPollRecords defines an upper bound to the number of records that can
// be polled on a single fetch. If MaxPollRecords <= 0, defaults to 100.
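Read together with the delivery handling further down, these are the knobs this PR touches. An illustrative literal, restricted to fields that appear in this diff; the decoder value and the omitted fields are assumptions:

// Sketch only: brokers, topics, group ID, logger, and processor are elided.
cfg := kafka.ConsumerConfig{
	Version:        "1.2.3",  // surfaces in Kafka client metrics and logs.
	Decoder:        decoder,  // assumption: any encoding.Decoder implementation.
	MaxPollRecords: 500,      // upper bound per fetch; <= 0 falls back to 100.
	Delivery:       apmqueue.AtLeastOnceDeliveryType,
}
consumer, err := kafka.NewConsumer(cfg)
if err != nil {
	log.Fatal(err) // configuration was rejected.
}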
@@ -210,7 +216,7 @@ func NewConsumer(cfg ConsumerConfig) (*Consumer, error) {
}, nil
}

// Close closes the consumer.
// Close the consumer, blocking until all partition consumers are stopped.
func (c *Consumer) Close() error {
c.mu.Lock()
defer c.mu.Unlock()
@@ -221,7 +227,13 @@ func (c *Consumer) Close() error {
return nil
}

// Run executes the consumer in a blocking manner.
// Run the consumer until a non-recoverable error is found:
// - context.Canceled.
// - context.DeadlineExceeded.
// - kgo.ErrClientClosed.
// - ErrCommitFailed.
//
// To shut down the consumer, cancel the context, or call consumer.Close().
func (c *Consumer) Run(ctx context.Context) error {
for {
if err := c.fetch(ctx); err != nil {
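As the doc comment spells out, shutdown has two levers: cancel the context passed to Run, or call Close. A hedged wiring sketch; the signal handling and logging are assumptions, not from this PR:

// Run until SIGINT; cancelling the context makes Run return
// context.Canceled, while consumer.Close() would instead make it return
// kgo.ErrClientClosed and block until all partition consumers stop.
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt)
defer stop()
if err := consumer.Run(ctx); err != nil && !errors.Is(err, context.Canceled) {
	log.Fatal(err)
}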
@@ -233,9 +245,6 @@ func (c *Consumer) Run(ctx context.Context) error {
// fetch polls the Kafka broker for new records up to cfg.MaxPollRecords.
// Any errors returned by fetch should be considered fatal.
func (c *Consumer) fetch(ctx context.Context) error {
c.mu.RLock()
defer c.mu.RUnlock()

fetches := c.client.PollRecords(ctx, c.cfg.MaxPollRecords)
defer c.client.AllowRebalance()

@@ -252,12 +261,13 @@ func (c *Consumer) fetch(ctx context.Context) error {
case apmqueue.AtMostOnceDeliveryType:
// Commit the fetched record offsets as soon as we've polled them.
if err := c.client.CommitUncommittedOffsets(ctx); err != nil {
// If the commit fails, then return immediately and any uncommitted
// records will be re-delivered in time. Otherwise, records may be
// processed twice.
return nil
// NOTE(marclop): If the commit fails with an unrecoverable error,
// return it and terminate the consumer. This avoids potentially
// processing records twice; it is up to the caller to restart the
// consumer.
return ErrCommitFailed
}
// Allow rebalancing now that we have committed offsets, preventing
// Allow re-balancing now that we have committed offsets, preventing
// another consumer from reprocessing the records.
c.client.AllowRebalance()
}
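The ordering above is the whole at-most-once argument: commit first, only then allow a rebalance, only then process. Reduced to a skeleton (a sketch against the franz-go API, assuming the client was built with kgo.BlockRebalanceOnPoll):

fetches := client.PollRecords(ctx, maxPollRecords)
if err := client.CommitUncommittedOffsets(ctx); err != nil {
	return ErrCommitFailed // never process records whose offsets did not commit.
}
// A rebalance is safe now: any new owner of these partitions resumes
// after the committed offsets and cannot re-read what we process below.
client.AllowRebalance()
fetches.EachRecord(func(r *kgo.Record) {
	process(r) // hypothetical handler; a failure here loses the record, by design.
})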
@@ -266,6 +276,9 @@ func (c *Consumer) fetch(ctx context.Context) error {
zap.Error(err), zap.String("topic", t), zap.Int32("partition", p),
)
})
// No need to grab the lock until we want to access `c.consumer`.
c.mu.RLock()
defer c.mu.RUnlock()
// Send partition records to be processed by its dedicated goroutine.
fetches.EachPartition(func(ftp kgo.FetchTopicPartition) {
if len(ftp.Records) == 0 {
@@ -275,9 +288,9 @@ func (c *Consumer) fetch(ctx context.Context) error {
select {
case c.consumer.consumers[tp].records <- ftp.Records:
// When using AtMostOnceDelivery, if the context is cancelled between
// PollRecords and this line, the events will be lost.
// PollRecords and this line, records will be lost.
// NOTE(marclop) Add a shutdown timer so that there's a grace period
// to allow the events to be processed before they're lost.
// to allow the records to be processed before they're lost.
case <-ctx.Done():
if c.cfg.Delivery == apmqueue.AtMostOnceDeliveryType {
c.cfg.Logger.Warn(
@@ -320,8 +333,9 @@ type topicPartition struct {
}

// assigned must be set as a kgo.OnPartitionsAssigned callback. Ensuring all
// assigned partitions to this consumer process received records.
func (c *consumer) assigned(_ context.Context, client *kgo.Client, assigned map[string][]int32) {
// partitions assigned to this consumer process received records. The received
// context is only cancelled after the kgo.Client is closed.
func (c *consumer) assigned(ctx context.Context, client *kgo.Client, assigned map[string][]int32) {
c.mu.Lock()
defer c.mu.Unlock()
for topic, partitions := range assigned {
@@ -337,7 +351,7 @@ func (c *consumer) assigned(_ context.Context, client *kgo.Client, assigned map[
}
go func(topic string, partition int32) {
defer c.wg.Done()
pc.consume(topic, partition)
pc.consume(ctx, topic, partition)
}(topic, partition)
tp := topicPartition{partition: partition, topic: topic}
c.consumers[tp] = pc
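For orientation, assigned is wired in when the kgo.Client is built, roughly as below; the broker, group, and topic values are placeholders, and the revoked counterpart is assumed:

client, err := kgo.NewClient(
	kgo.SeedBrokers("localhost:9092"),
	kgo.ConsumerGroup("apm-group"),
	kgo.ConsumeTopics("apm-events"),
	kgo.OnPartitionsAssigned(c.assigned), // one records channel and goroutine per partition.
	kgo.OnPartitionsRevoked(c.revoked),   // assumed hook that stops those goroutines.
	kgo.BlockRebalanceOnPoll(),           // paired with AllowRebalance in fetch.
)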
@@ -374,7 +388,8 @@ type partitionConsumer struct {

// consume processes the records from a topic and partition. Calling consume
// more than once will cause a panic.
func (pc partitionConsumer) consume(topic string, partition int32) {
// The received context is only canceled when the kgo.Client is closed.
func (pc partitionConsumer) consume(ctx context.Context, topic string, partition int32) {
logger := pc.logger.With(
zap.String("topic", topic),
zap.Int32("partition", partition),
@@ -383,7 +398,6 @@ func (pc partitionConsumer) consume(topic string, partition int32) {
// Store the index of the last processed record. Defaults to -1 so that
// nothing is committed when no record has been processed.
last := -1
recordLoop:
for i, msg := range records {
meta := make(map[string]string)
for _, h := range msg.Headers {
@@ -397,38 +411,27 @@ func (pc partitionConsumer) consume(topic string, partition int32) {
zap.Int64("offset", msg.Offset),
zap.Any("headers", meta),
)
// TODO(marclop) DLQ? The decoding has failed, re-delivery
// may cause the same error. Discard the event for now.
// NOTE(marclop) Decoding has failed; a DLQ may be helpful.
continue
}
ctx := queuecontext.WithMetadata(context.Background(), meta)
ctx := queuecontext.WithMetadata(ctx, meta)
batch := model.Batch{event}
// If a record can't be processed, no retries are attempted and the
// record is lost. https://github.com/elastic/apm-queue/issues/118.
if err := pc.processor.ProcessBatch(ctx, &batch); err != nil {
logger.Error("unable to process event",
logger.Error("data loss: unable to process event",
zap.Error(err),
zap.Int64("offset", msg.Offset),
zap.Any("headers", meta),
)
switch pc.delivery {
case apmqueue.AtLeastOnceDeliveryType:
// Exit the loop and commit the last processed offset
// (if any). This ensures events which haven't been
// processed are re-delivered, but those that have, are
// committed.
break recordLoop
case apmqueue.AtMostOnceDeliveryType:
// Events which can't be processed, are lost.
continue
}
}
last = i
}
// Only commit the records when at least a record has been processed
// with AtLeastOnceDeliveryType.
// Only commit the records when at least one record has been decoded and
// AtLeastOnceDeliveryType is set.
if pc.delivery == apmqueue.AtLeastOnceDeliveryType && last >= 0 {
lastRecord := records[last]
err := pc.client.CommitRecords(context.Background(), lastRecord)
if err != nil {
if err := pc.client.CommitRecords(ctx, lastRecord); err != nil {
logger.Error("unable to commit records",
zap.Error(err),
zap.Int64("offset", lastRecord.Offset),
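Taken together, the at-least-once flow in this last hunk is: decode and process in order, remember the index of the last decoded record, and commit it once the batch is drained, so anything fetched but uncommitted before a crash is re-delivered. A standalone sketch, with decode and processBatch as hypothetical stand-ins:

last := -1
for i, r := range records {
	event, err := decode(r)
	if err != nil {
		continue // decode failures never advance the commit point.
	}
	if err := processBatch(ctx, event); err != nil {
		// Logged as data loss upstream; no retries are attempted
		// (elastic/apm-queue#118), but the record still counts below.
	}
	last = i
}
if last >= 0 {
	// Committing a record commits offset+1, so a restarted consumer
	// resumes at the first record that was never committed.
	if err := client.CommitRecords(ctx, records[last]); err != nil {
		logger.Error("unable to commit records", zap.Error(err))
	}
}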