Skip to content

Commit 89756fb

Browse files
feat: decouple offset manager from shared Kafka configuration
This commit decouples the offset manager from the shared Kafka configuration. This is important as I will soon start work on a Kafka V2 client with a new configuration struct that is more generic and less coupled to the needs of partition ingesters.
1 parent 7fc404c commit 89756fb

File tree

4 files changed

+80
-96
lines changed

4 files changed

+80
-96
lines changed

pkg/kafka/partition/committer_test.go

Lines changed: 15 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,14 @@ import (
1818
)
1919

2020
func TestPartitionCommitter(t *testing.T) {
21+
const (
22+
topic = "test-topic"
23+
consumerGroup = "test-consumer-group"
24+
numPartitions = int32(3)
25+
)
2126
// Create a test Kafka cluster
22-
numPartitions := int32(3)
23-
topicName := "test-topic"
24-
_, kafkaCfg := testkafka.CreateCluster(t, numPartitions, topicName)
25-
kafkaCfg.ConsumerGroup = "test-group"
27+
_, kafkaCfg := testkafka.CreateCluster(t, numPartitions, topic)
28+
kafkaCfg.ConsumerGroup = consumerGroup
2629

2730
client, err := client.NewReaderClient("test-client", kafkaCfg, log.NewNopLogger(), prometheus.NewRegistry())
2831
require.NoError(t, err)
@@ -35,13 +38,13 @@ func TestPartitionCommitter(t *testing.T) {
3538
logger := log.NewNopLogger()
3639
reg := prometheus.NewRegistry()
3740
partitionID := int32(1)
38-
reader := newKafkaOffsetManager(
41+
offsetManager := NewKafkaOffsetManager(
3942
client,
40-
kafkaCfg,
41-
"fake-instance-id",
43+
topic,
44+
consumerGroup,
4245
logger,
4346
)
44-
committer := newCommitter(reader, partitionID, kafkaCfg.ConsumerGroupOffsetCommitInterval, logger, reg)
47+
committer := newCommitter(offsetManager, partitionID, kafkaCfg.ConsumerGroupOffsetCommitInterval, logger, reg)
4548

4649
// Test committing an offset
4750
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
@@ -57,9 +60,9 @@ func TestPartitionCommitter(t *testing.T) {
5760
assert.Equal(t, float64(testOffset), testutil.ToFloat64(committer.lastCommittedOffset))
5861

5962
// Verify committed offset
60-
offsets, err := admClient.FetchOffsets(context.Background(), reader.ConsumerGroup())
63+
offsets, err := admClient.FetchOffsets(context.Background(), consumerGroup)
6164
require.NoError(t, err)
62-
committedOffset, ok := offsets.Lookup(topicName, partitionID)
65+
committedOffset, ok := offsets.Lookup(topic, partitionID)
6366
require.True(t, ok)
6467
assert.Equal(t, testOffset, committedOffset.At)
6568

@@ -74,9 +77,9 @@ func TestPartitionCommitter(t *testing.T) {
7477
assert.Equal(t, float64(newTestOffset), testutil.ToFloat64(committer.lastCommittedOffset))
7578

7679
// Verify updated committed offset
77-
offsets, err = admClient.FetchOffsets(context.Background(), reader.ConsumerGroup())
80+
offsets, err = admClient.FetchOffsets(context.Background(), consumerGroup)
7881
require.NoError(t, err)
79-
committedOffset, ok = offsets.Lookup(topicName, partitionID)
82+
committedOffset, ok = offsets.Lookup(topic, partitionID)
8083
require.True(t, ok)
8184
assert.Equal(t, newTestOffset, committedOffset.At)
8285
}

pkg/kafka/partition/offset_manager.go

Lines changed: 44 additions & 74 deletions
Original file line numberDiff line numberDiff line change
@@ -8,91 +8,65 @@ import (
88

99
"github.com/go-kit/log"
1010
"github.com/go-kit/log/level"
11-
"github.com/prometheus/client_golang/prometheus"
1211
"github.com/twmb/franz-go/pkg/kadm"
1312
"github.com/twmb/franz-go/pkg/kerr"
1413
"github.com/twmb/franz-go/pkg/kgo"
1514
"github.com/twmb/franz-go/pkg/kmsg"
16-
17-
"github.com/grafana/loki/v3/pkg/kafka"
18-
"github.com/grafana/loki/v3/pkg/kafka/client"
1915
)
2016

17+
// An OffsetManager manages commit offsets for a Consumer Group.
2118
type OffsetManager interface {
22-
Topic() string
23-
ConsumerGroup() string
24-
19+
// LastCommittedOffset returns the last committed offset for the partition.
2520
LastCommittedOffset(ctx context.Context, partition int32) (int64, error)
21+
22+
// PartitionOffset returns the last produced offset for the partition.
2623
PartitionOffset(ctx context.Context, partition int32, position SpecialOffset) (int64, error)
24+
25+
// NextOffset returns the first offset after t. If there are no offsets
26+
// after t, it returns the latest offset instead.
2727
NextOffset(ctx context.Context, partition int32, t time.Time) (int64, error)
28+
29+
// Commits the offset for the partition.
2830
Commit(ctx context.Context, partition int32, offset int64) error
2931
}
3032

33+
// Compile time check that KafkaOffsetManager implements OffsetManager.
3134
var _ OffsetManager = &KafkaOffsetManager{}
3235

36+
// KafkaOffsetManager implements the [OffsetManager] interface.
3337
type KafkaOffsetManager struct {
34-
client *kgo.Client
35-
adminClient *kadm.Client
36-
cfg kafka.Config
37-
instanceID string
38-
logger log.Logger
38+
admin *kadm.Client
39+
client *kgo.Client
40+
topic string
41+
consumerGroup string
42+
logger log.Logger
3943
}
4044

45+
// NewKafkaOffsetManager creates a new KafkaOffsetManager.
4146
func NewKafkaOffsetManager(
42-
cfg kafka.Config,
43-
instanceID string,
44-
logger log.Logger,
45-
reg prometheus.Registerer,
46-
) (*KafkaOffsetManager, error) {
47-
// Create a new Kafka client for the partition manager.
48-
c, err := client.NewReaderClient("partition-manager", cfg, log.With(logger, "component", "kafka-client"), reg)
49-
if err != nil {
50-
return nil, fmt.Errorf("creating kafka client: %w", err)
51-
}
52-
53-
return newKafkaOffsetManager(
54-
c,
55-
cfg,
56-
instanceID,
57-
logger,
58-
), nil
59-
}
60-
61-
// newKafkaReader creates a new KafkaReader instance
62-
func newKafkaOffsetManager(
6347
client *kgo.Client,
64-
cfg kafka.Config,
65-
instanceID string,
48+
topic string,
49+
consumerGroup string,
6650
logger log.Logger,
6751
) *KafkaOffsetManager {
6852
return &KafkaOffsetManager{
69-
client: client,
70-
adminClient: kadm.NewClient(client),
71-
cfg: cfg,
72-
instanceID: instanceID,
73-
logger: logger,
53+
admin: kadm.NewClient(client),
54+
client: client,
55+
topic: topic,
56+
consumerGroup: consumerGroup,
57+
logger: log.With(logger, "topic", topic, "consumer_group", consumerGroup),
7458
}
7559
}
7660

77-
// Topic returns the topic being read
78-
func (r *KafkaOffsetManager) Topic() string {
79-
return r.cfg.Topic
80-
}
81-
82-
func (r *KafkaOffsetManager) ConsumerGroup() string {
83-
return r.cfg.GetConsumerGroup(r.instanceID)
84-
}
85-
86-
// NextOffset returns the first offset after the timestamp t. If the partition
87-
// does not have an offset after t, it returns the current end offset.
88-
func (r *KafkaOffsetManager) NextOffset(ctx context.Context, partition int32, t time.Time) (int64, error) {
89-
resp, err := r.adminClient.ListOffsetsAfterMilli(ctx, t.UnixMilli(), r.cfg.Topic)
61+
// NextOffset implements the [OffsetManager] interface.
62+
func (m *KafkaOffsetManager) NextOffset(ctx context.Context, partition int32, t time.Time) (int64, error) {
63+
resp, err := m.admin.ListOffsetsAfterMilli(ctx, t.UnixMilli(), m.topic)
9064
if err != nil {
9165
return 0, err
9266
}
9367
// If a topic does not exist, a special -1 partition for each non-existing
9468
// topic is added to the response.
95-
partitions := resp[r.cfg.Topic]
69+
partitions := resp[m.topic]
9670
if special, ok := partitions[-1]; ok {
9771
return 0, special.Err
9872
}
@@ -108,16 +82,16 @@ func (r *KafkaOffsetManager) NextOffset(ctx context.Context, partition int32, t
10882
return listed.Offset, nil
10983
}
11084

111-
// LastCommittedOffset retrieves the last committed offset for this partition
112-
func (r *KafkaOffsetManager) LastCommittedOffset(ctx context.Context, partitionID int32) (int64, error) {
85+
// LastCommittedOffset implements the [OffsetManager] interface.
86+
func (m *KafkaOffsetManager) LastCommittedOffset(ctx context.Context, partitionID int32) (int64, error) {
11387
req := kmsg.NewPtrOffsetFetchRequest()
11488
req.Topics = []kmsg.OffsetFetchRequestTopic{{
115-
Topic: r.cfg.Topic,
89+
Topic: m.topic,
11690
Partitions: []int32{partitionID},
11791
}}
118-
req.Group = r.ConsumerGroup()
92+
req.Group = m.consumerGroup
11993

120-
resps := r.client.RequestSharded(ctx, req)
94+
resps := m.client.RequestSharded(ctx, req)
12195

12296
// Since we issued a request for only 1 partition, we expect exactly 1 response.
12397
if expected, actual := 1, len(resps); actual != expected {
@@ -139,7 +113,7 @@ func (r *KafkaOffsetManager) LastCommittedOffset(ctx context.Context, partitionI
139113
if len(fetchRes.Groups) != 1 ||
140114
len(fetchRes.Groups[0].Topics) != 1 ||
141115
len(fetchRes.Groups[0].Topics[0].Partitions) != 1 {
142-
level.Debug(r.logger).Log(
116+
level.Debug(m.logger).Log(
143117
"msg", "malformed response, setting to start offset",
144118
)
145119
return int64(KafkaStartOffset), nil
@@ -153,14 +127,14 @@ func (r *KafkaOffsetManager) LastCommittedOffset(ctx context.Context, partitionI
153127
return partition.Offset, nil
154128
}
155129

156-
// FetchPartitionOffset retrieves the offset for a specific position
157-
func (r *KafkaOffsetManager) PartitionOffset(ctx context.Context, partitionID int32, position SpecialOffset) (int64, error) {
130+
// PartitionOffset implements the [OffsetManager] interface.
131+
func (m *KafkaOffsetManager) PartitionOffset(ctx context.Context, partitionID int32, position SpecialOffset) (int64, error) {
158132
partitionReq := kmsg.NewListOffsetsRequestTopicPartition()
159133
partitionReq.Partition = partitionID
160134
partitionReq.Timestamp = int64(position)
161135

162136
topicReq := kmsg.NewListOffsetsRequestTopic()
163-
topicReq.Topic = r.cfg.Topic
137+
topicReq.Topic = m.topic
164138
topicReq.Partitions = []kmsg.ListOffsetsRequestTopicPartition{partitionReq}
165139

166140
req := kmsg.NewPtrListOffsetsRequest()
@@ -169,7 +143,7 @@ func (r *KafkaOffsetManager) PartitionOffset(ctx context.Context, partitionID in
169143

170144
// Even if we share the same client, other in-flight requests are not canceled once this context is canceled
171145
// (or its deadline is exceeded). We've verified it with a unit test.
172-
resps := r.client.RequestSharded(ctx, req)
146+
resps := m.client.RequestSharded(ctx, req)
173147

174148
// Since we issued a request for only 1 partition, we expect exactly 1 response.
175149
if len(resps) != 1 {
@@ -199,22 +173,18 @@ func (r *KafkaOffsetManager) PartitionOffset(ctx context.Context, partitionID in
199173
return partition.Offset, nil
200174
}
201175

202-
// Commit commits an offset to the consumer group
203-
func (r *KafkaOffsetManager) Commit(ctx context.Context, partitionID int32, offset int64) error {
204-
admin := kadm.NewClient(r.client)
205-
176+
// Commit implements the [OffsetManager] interface.
177+
func (m *KafkaOffsetManager) Commit(ctx context.Context, partitionID int32, offset int64) error {
206178
// Commit the last consumed offset.
207179
toCommit := kadm.Offsets{}
208-
toCommit.AddOffset(r.cfg.Topic, partitionID, offset, -1)
209-
210-
committed, err := admin.CommitOffsets(ctx, r.ConsumerGroup(), toCommit)
180+
toCommit.AddOffset(m.topic, partitionID, offset, -1)
181+
committed, err := m.admin.CommitOffsets(ctx, m.consumerGroup, toCommit)
211182
if err != nil {
212183
return err
213184
} else if !committed.Ok() {
214185
return committed.Error()
215186
}
216-
217-
committedOffset, _ := committed.Lookup(r.cfg.Topic, partitionID)
218-
level.Debug(r.logger).Log("msg", "last commit offset successfully committed to Kafka", "offset", committedOffset.At)
187+
committedOffset, _ := committed.Lookup(m.topic, partitionID)
188+
level.Debug(m.logger).Log("msg", "last commit offset successfully committed to Kafka", "offset", committedOffset.At)
219189
return nil
220190
}

pkg/kafka/partition/reader_service.go

Lines changed: 14 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -84,14 +84,21 @@ func NewReaderService(
8484
return nil, fmt.Errorf("creating kafka reader: %w", err)
8585
}
8686

87-
offsetManager, err := NewKafkaOffsetManager(
88-
kafkaCfg,
89-
instanceID,
87+
// Create a new Kafka client for the partition manager.
88+
offsetManagerClient, err := client.NewReaderClient("partition-manager", kafkaCfg, log.With(logger, "component", "kafka-client"), reg)
89+
if err != nil {
90+
return nil, fmt.Errorf("failed to create kafka client for offset manager: %w", err)
91+
}
92+
93+
consumerGroup := kafkaCfg.GetConsumerGroup(instanceID)
94+
offsetManager := NewKafkaOffsetManager(
95+
offsetManagerClient,
96+
kafkaCfg.Topic,
97+
consumerGroup,
9098
logger,
91-
reg,
9299
)
93100
if err != nil {
94-
return nil, fmt.Errorf("creating kafka offset manager: %w", err)
101+
return nil, fmt.Errorf("failed to create kafka offset manager: %w", err)
95102
}
96103

97104
return newReaderService(
@@ -103,7 +110,7 @@ func NewReaderService(
103110
offsetManager,
104111
partitionID,
105112
consumerFactory,
106-
logger,
113+
log.With(logger, "consumer_group", consumerGroup),
107114
reg,
108115
), nil
109116
}
@@ -123,7 +130,7 @@ func newReaderService(
123130
offsetManager: offsetManager,
124131
partitionID: partitionID,
125132
consumerFactory: consumerFactory,
126-
logger: log.With(logger, "partition", partitionID, "consumer_group", offsetManager.ConsumerGroup()),
133+
logger: log.With(logger, "partition", partitionID),
127134
metrics: newServiceMetrics(reg),
128135
lastProcessedOffset: int64(KafkaEndOffset),
129136
}

pkg/limits/service.go

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,11 +99,15 @@ func New(cfg Config, limits Limits, logger log.Logger, reg prometheus.Registerer
9999
kCfg.Topic = cfg.Topic
100100
kCfg.AutoCreateTopicEnabled = true
101101
kCfg.AutoCreateTopicDefaultPartitions = cfg.NumPartitions
102-
offsetManager, err := partition.NewKafkaOffsetManager(
103-
kCfg,
102+
offsetManagerClient, err := client.NewReaderClient("partition-manager", kCfg, log.With(logger, "component", "kafka-client"), reg)
103+
if err != nil {
104+
return nil, fmt.Errorf("failed to create offset manager client: %w", err)
105+
}
106+
offsetManager := partition.NewKafkaOffsetManager(
107+
offsetManagerClient,
108+
cfg.Topic,
104109
cfg.ConsumerGroup,
105110
logger,
106-
prometheus.NewRegistry(),
107111
)
108112
if err != nil {
109113
return nil, fmt.Errorf("failed to create offset manager: %w", err)

0 commit comments

Comments
 (0)