Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 3 additions & 4 deletions pkg/kafka/partition/committer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,12 @@ func TestPartitionCommitter(t *testing.T) {
defer admClient.Close()

// Set up a committer for partition 1 and another for partition 2.
offsetManager, err := NewKafkaOffsetManager(
kafkaCfg,
offsetManager := NewKafkaOffsetManager(
client,
topic,
consumerGroup,
log.NewNopLogger(),
prometheus.NewRegistry(),
)
require.NoError(t, err)
partition1 := int32(1)
committer1 := newCommitter(
offsetManager,
Expand Down
118 changes: 44 additions & 74 deletions pkg/kafka/partition/offset_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,91 +8,65 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"

"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/client"
)

// An OffsetManager manages commit offsets for a consumer group.
type OffsetManager interface {
Topic() string
ConsumerGroup() string

// LastCommittedOffset returns the last committed offset for the partition.
LastCommittedOffset(ctx context.Context, partition int32) (int64, error)

// PartitionOffset returns the last produced offset for the partition.
PartitionOffset(ctx context.Context, partition int32, position SpecialOffset) (int64, error)

// NextOffset returns the first offset after t. If there are no offsets
// after t, it returns the latest offset instead.
NextOffset(ctx context.Context, partition int32, t time.Time) (int64, error)

// Commits the offset for the partition.
Commit(ctx context.Context, partition int32, offset int64) error
}

// Compile time check that KafkaOffsetManager implements OffsetManager.
var _ OffsetManager = &KafkaOffsetManager{}

// KafkaOffsetManager implements the [OffsetManager] interface.
type KafkaOffsetManager struct {
client *kgo.Client
adminClient *kadm.Client
cfg kafka.Config
instanceID string
logger log.Logger
admin *kadm.Client
client *kgo.Client
topic string
consumerGroup string
logger log.Logger
}

// NewKafkaOffsetManager creates a new KafkaOffsetManager.
func NewKafkaOffsetManager(
cfg kafka.Config,
instanceID string,
logger log.Logger,
reg prometheus.Registerer,
) (*KafkaOffsetManager, error) {
// Create a new Kafka client for the partition manager.
c, err := client.NewReaderClient("partition-manager", cfg, log.With(logger, "component", "kafka-client"), reg)
if err != nil {
return nil, fmt.Errorf("creating kafka client: %w", err)
}

return newKafkaOffsetManager(
c,
cfg,
instanceID,
logger,
), nil
}

// newKafkaReader creates a new KafkaReader instance
func newKafkaOffsetManager(
client *kgo.Client,
cfg kafka.Config,
instanceID string,
topic string,
consumerGroup string,
logger log.Logger,
) *KafkaOffsetManager {
return &KafkaOffsetManager{
client: client,
adminClient: kadm.NewClient(client),
cfg: cfg,
instanceID: instanceID,
logger: logger,
admin: kadm.NewClient(client),
client: client,
topic: topic,
consumerGroup: consumerGroup,
logger: log.With(logger, "topic", topic, "consumer_group", consumerGroup),
}
}

// Topic returns the topic being read
func (r *KafkaOffsetManager) Topic() string {
return r.cfg.Topic
}

func (r *KafkaOffsetManager) ConsumerGroup() string {
return r.cfg.GetConsumerGroup(r.instanceID)
}

// NextOffset returns the first offset after the timestamp t. If the partition
// does not have an offset after t, it returns the current end offset.
func (r *KafkaOffsetManager) NextOffset(ctx context.Context, partition int32, t time.Time) (int64, error) {
resp, err := r.adminClient.ListOffsetsAfterMilli(ctx, t.UnixMilli(), r.cfg.Topic)
// NextOffset implements the [OffsetManager] interface.
func (m *KafkaOffsetManager) NextOffset(ctx context.Context, partition int32, t time.Time) (int64, error) {
resp, err := m.admin.ListOffsetsAfterMilli(ctx, t.UnixMilli(), m.topic)
if err != nil {
return 0, err
}
// If a topic does not exist, a special -1 partition for each non-existing
// topic is added to the response.
partitions := resp[r.cfg.Topic]
partitions := resp[m.topic]
if special, ok := partitions[-1]; ok {
return 0, special.Err
}
Expand All @@ -108,16 +82,16 @@ func (r *KafkaOffsetManager) NextOffset(ctx context.Context, partition int32, t
return listed.Offset, nil
}

// LastCommittedOffset retrieves the last committed offset for this partition
func (r *KafkaOffsetManager) LastCommittedOffset(ctx context.Context, partitionID int32) (int64, error) {
// LastCommittedOffset implements the [OffsetManager] interface.
func (m *KafkaOffsetManager) LastCommittedOffset(ctx context.Context, partitionID int32) (int64, error) {
req := kmsg.NewPtrOffsetFetchRequest()
req.Topics = []kmsg.OffsetFetchRequestTopic{{
Topic: r.cfg.Topic,
Topic: m.topic,
Partitions: []int32{partitionID},
}}
req.Group = r.ConsumerGroup()
req.Group = m.consumerGroup

resps := r.client.RequestSharded(ctx, req)
resps := m.client.RequestSharded(ctx, req)

// Since we issued a request for only 1 partition, we expect exactly 1 response.
if expected, actual := 1, len(resps); actual != expected {
Expand All @@ -139,7 +113,7 @@ func (r *KafkaOffsetManager) LastCommittedOffset(ctx context.Context, partitionI
if len(fetchRes.Groups) != 1 ||
len(fetchRes.Groups[0].Topics) != 1 ||
len(fetchRes.Groups[0].Topics[0].Partitions) != 1 {
level.Debug(r.logger).Log(
level.Debug(m.logger).Log(
"msg", "malformed response, setting to start offset",
)
return int64(KafkaStartOffset), nil
Expand All @@ -153,14 +127,14 @@ func (r *KafkaOffsetManager) LastCommittedOffset(ctx context.Context, partitionI
return partition.Offset, nil
}

// FetchPartitionOffset retrieves the offset for a specific position
func (r *KafkaOffsetManager) PartitionOffset(ctx context.Context, partitionID int32, position SpecialOffset) (int64, error) {
// PatitionOffset implements the [OffsetManager] interface.
func (m *KafkaOffsetManager) PartitionOffset(ctx context.Context, partitionID int32, position SpecialOffset) (int64, error) {
partitionReq := kmsg.NewListOffsetsRequestTopicPartition()
partitionReq.Partition = partitionID
partitionReq.Timestamp = int64(position)

topicReq := kmsg.NewListOffsetsRequestTopic()
topicReq.Topic = r.cfg.Topic
topicReq.Topic = m.topic
topicReq.Partitions = []kmsg.ListOffsetsRequestTopicPartition{partitionReq}

req := kmsg.NewPtrListOffsetsRequest()
Expand All @@ -169,7 +143,7 @@ func (r *KafkaOffsetManager) PartitionOffset(ctx context.Context, partitionID in

// Even if we share the same client, other in-flight requests are not canceled once this context is canceled
// (or its deadline is exceeded). We've verified it with a unit test.
resps := r.client.RequestSharded(ctx, req)
resps := m.client.RequestSharded(ctx, req)

// Since we issued a request for only 1 partition, we expect exactly 1 response.
if len(resps) != 1 {
Expand Down Expand Up @@ -199,22 +173,18 @@ func (r *KafkaOffsetManager) PartitionOffset(ctx context.Context, partitionID in
return partition.Offset, nil
}

// Commit commits an offset to the consumer group
func (r *KafkaOffsetManager) Commit(ctx context.Context, partitionID int32, offset int64) error {
admin := kadm.NewClient(r.client)

// Commit implements the [OffsetManager] interface.
func (m *KafkaOffsetManager) Commit(ctx context.Context, partitionID int32, offset int64) error {
// Commit the last consumed offset.
toCommit := kadm.Offsets{}
toCommit.AddOffset(r.cfg.Topic, partitionID, offset, -1)

committed, err := admin.CommitOffsets(ctx, r.ConsumerGroup(), toCommit)
toCommit.AddOffset(m.topic, partitionID, offset, -1)
committed, err := m.admin.CommitOffsets(ctx, m.consumerGroup, toCommit)
if err != nil {
return err
} else if !committed.Ok() {
return committed.Error()
}

committedOffset, _ := committed.Lookup(r.cfg.Topic, partitionID)
level.Debug(r.logger).Log("msg", "last commit offset successfully committed to Kafka", "offset", committedOffset.At)
committedOffset, _ := committed.Lookup(m.topic, partitionID)
level.Debug(m.logger).Log("msg", "last commit offset successfully committed to Kafka", "offset", committedOffset.At)
return nil
}
21 changes: 14 additions & 7 deletions pkg/kafka/partition/reader_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,21 @@ func NewReaderService(
return nil, fmt.Errorf("creating kafka reader: %w", err)
}

offsetManager, err := NewKafkaOffsetManager(
kafkaCfg,
instanceID,
// Create a new Kafka client for the partition manager.
offsetManagerClient, err := client.NewReaderClient("partition-manager", kafkaCfg, log.With(logger, "component", "kafka-client"), reg)
if err != nil {
return nil, fmt.Errorf("failed to create kafka client for offset manager: %w", err)
}

consumerGroup := kafkaCfg.GetConsumerGroup(instanceID)
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This was moved out of offset_manager.go. It is relevant for partition ingesters but not for limits service.

offsetManager := NewKafkaOffsetManager(
offsetManagerClient,
kafkaCfg.Topic,
consumerGroup,
logger,
reg,
)
if err != nil {
return nil, fmt.Errorf("creating kafka offset manager: %w", err)
return nil, fmt.Errorf("failed to create kafka offset manager: %w", err)
}

return newReaderService(
Expand All @@ -103,7 +110,7 @@ func NewReaderService(
offsetManager,
partitionID,
consumerFactory,
logger,
log.With(logger, "consumer_group", consumerGroup),
reg,
), nil
}
Expand All @@ -123,7 +130,7 @@ func newReaderService(
offsetManager: offsetManager,
partitionID: partitionID,
consumerFactory: consumerFactory,
logger: log.With(logger, "partition", partitionID, "consumer_group", offsetManager.ConsumerGroup()),
logger: log.With(logger, "partition", partitionID),
metrics: newServiceMetrics(reg),
lastProcessedOffset: int64(KafkaEndOffset),
}
Expand Down
10 changes: 7 additions & 3 deletions pkg/limits/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,11 +99,15 @@ func New(cfg Config, limits Limits, logger log.Logger, reg prometheus.Registerer
kCfg.Topic = cfg.Topic
kCfg.AutoCreateTopicEnabled = true
kCfg.AutoCreateTopicDefaultPartitions = cfg.NumPartitions
offsetManager, err := partition.NewKafkaOffsetManager(
kCfg,
offsetManagerClient, err := client.NewReaderClient("partition-manager", kCfg, log.With(logger, "component", "kafka-client"), reg)
if err != nil {
return nil, fmt.Errorf("failed to create offset manager client: %w", err)
}
offsetManager := partition.NewKafkaOffsetManager(
offsetManagerClient,
cfg.Topic,
cfg.ConsumerGroup,
logger,
prometheus.NewRegistry(),
)
if err != nil {
return nil, fmt.Errorf("failed to create offset manager: %w", err)
Expand Down