Skip to content

Commit 7fc404c

Browse files
chore: deleted unused code from OffsetManager (#19256)
1 parent bd5a065 commit 7fc404c

File tree

3 files changed

+11
-94
lines changed

3 files changed

+11
-94
lines changed

pkg/kafka/partition/offset_manager.go

Lines changed: 5 additions & 88 deletions
Original file line numberDiff line numberDiff line change
@@ -18,61 +18,12 @@ import (
1818
"github.com/grafana/loki/v3/pkg/kafka/client"
1919
)
2020

21-
// Partition level metadata in a more easily digestible form than what Kafka provides
22-
type Lag struct {
23-
// First Available Offset in retention
24-
startOffset int64
25-
// Exclusive; the next available offset (as of yet unwritten)
26-
endOffset int64
27-
// Last committed offset
28-
committedOffset int64
29-
// rawLag measures how far behind the most recently committed offset is from the current offset.
30-
// In special cases, this can be positive even when there are no more records to process,
31-
// which happens when there is a gap between the last committed offset and the current offset, but
32-
// it is out of retention (unrecoverable).
33-
rawLag int64
34-
}
35-
36-
func NewLag(startOffset, endOffset, committedOffset, rawLag int64) Lag {
37-
return Lag{
38-
startOffset: startOffset,
39-
endOffset: endOffset,
40-
committedOffset: committedOffset,
41-
rawLag: rawLag,
42-
}
43-
}
44-
45-
// FirstUncommittedOffset returns the first offset that has not yet been committed
46-
func (l Lag) FirstUncommittedOffset() int64 {
47-
// startOffset is the previously-committed offset, so we need to start processing the first
48-
// _uncommitted_ offset
49-
return max(l.committedOffset+1, l.startOffset)
50-
}
51-
52-
func (l Lag) LastCommittedOffset() int64 {
53-
return l.committedOffset
54-
}
55-
56-
// NextAvailableOffset returns the next unwritten offset in a partition,
57-
// i.e. the end offset (exclusive)
58-
func (l Lag) NextAvailableOffset() int64 {
59-
return l.endOffset
60-
}
61-
62-
// Lag returns the difference between the last produced offset
63-
// and the first Uncommitted (but available) offset
64-
func (l Lag) Lag() int64 {
65-
return l.endOffset - l.FirstUncommittedOffset()
66-
}
67-
6821
type OffsetManager interface {
6922
Topic() string
7023
ConsumerGroup() string
7124

72-
// GroupLag returns the lag for the consumer group. Uses fallbackOffsetMillis to calculate the lag if the consumer group has no commits.
73-
GroupLag(ctx context.Context, fallbackOffsetMillis int64) (map[int32]Lag, error)
74-
FetchLastCommittedOffset(ctx context.Context, partition int32) (int64, error)
75-
FetchPartitionOffset(ctx context.Context, partition int32, position SpecialOffset) (int64, error)
25+
LastCommittedOffset(ctx context.Context, partition int32) (int64, error)
26+
PartitionOffset(ctx context.Context, partition int32, position SpecialOffset) (int64, error)
7627
NextOffset(ctx context.Context, partition int32, t time.Time) (int64, error)
7728
Commit(ctx context.Context, partition int32, offset int64) error
7829
}
@@ -157,8 +108,8 @@ func (r *KafkaOffsetManager) NextOffset(ctx context.Context, partition int32, t
157108
return listed.Offset, nil
158109
}
159110

160-
// FetchLastCommittedOffset retrieves the last committed offset for this partition
161-
func (r *KafkaOffsetManager) FetchLastCommittedOffset(ctx context.Context, partitionID int32) (int64, error) {
111+
// LastCommittedOffset retrieves the last committed offset for this partition
112+
func (r *KafkaOffsetManager) LastCommittedOffset(ctx context.Context, partitionID int32) (int64, error) {
162113
req := kmsg.NewPtrOffsetFetchRequest()
163114
req.Topics = []kmsg.OffsetFetchRequestTopic{{
164115
Topic: r.cfg.Topic,
@@ -203,7 +154,7 @@ func (r *KafkaOffsetManager) FetchLastCommittedOffset(ctx context.Context, parti
203154
}
204155

205156
// FetchPartitionOffset retrieves the offset for a specific position
206-
func (r *KafkaOffsetManager) FetchPartitionOffset(ctx context.Context, partitionID int32, position SpecialOffset) (int64, error) {
157+
func (r *KafkaOffsetManager) PartitionOffset(ctx context.Context, partitionID int32, position SpecialOffset) (int64, error) {
207158
partitionReq := kmsg.NewListOffsetsRequestTopicPartition()
208159
partitionReq.Partition = partitionID
209160
partitionReq.Timestamp = int64(position)
@@ -248,40 +199,6 @@ func (r *KafkaOffsetManager) FetchPartitionOffset(ctx context.Context, partition
248199
return partition.Offset, nil
249200
}
250201

251-
// GroupLag returns the lag for the consumer group. Uses fallbackOffsetMillis to calculate the lag if the consumer group has no commits.
252-
func (r *KafkaOffsetManager) GroupLag(ctx context.Context, fallbackOffsetMillis int64) (map[int32]Lag, error) {
253-
lag, err := GetGroupLag(ctx, r.adminClient, r.cfg.Topic, r.ConsumerGroup(), fallbackOffsetMillis)
254-
if err != nil {
255-
return nil, err
256-
}
257-
258-
offsets, ok := lag[r.cfg.Topic]
259-
if !ok {
260-
return nil, errors.New("no lag found for the topic")
261-
}
262-
263-
res := make(map[int32]Lag, len(offsets))
264-
265-
for partition, partitionOffset := range offsets {
266-
res[partition] = Lag{
267-
// 1. kadm.GroupMemberLag contains valid Commit.At even when consumer group never committed any offset.
268-
// no additional validation is needed here
269-
// 2. committed offset could be behind start offset if we are falling behind retention period.
270-
271-
// startOffset is the previously-committed offset, so we need to start processing the first
272-
// _uncommitted_ offset
273-
startOffset: max(partitionOffset.Commit.At+1, partitionOffset.Start.Offset),
274-
// endOffset is initially the next available offset: this is why we treat jobs as end-exclusive:
275-
// so we won't try polling forever to a partition that won't have any more records
276-
endOffset: partitionOffset.End.Offset,
277-
committedOffset: partitionOffset.Commit.At,
278-
rawLag: partitionOffset.Lag,
279-
}
280-
}
281-
282-
return res, nil
283-
}
284-
285202
// Commit commits an offset to the consumer group
286203
func (r *KafkaOffsetManager) Commit(ctx context.Context, partitionID int32, offset int64) error {
287204
admin := kadm.NewClient(r.client)

pkg/kafka/partition/reader_service.go

Lines changed: 4 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -143,7 +143,7 @@ func (s *ReaderService) starting(ctx context.Context) error {
143143
logger := log.With(s.logger, "phase", phaseStarting)
144144
s.reader.SetPhase(phaseStarting)
145145
// Fetch the last committed offset to determine where to start reading
146-
lastCommittedOffset, err := s.offsetManager.FetchLastCommittedOffset(ctx, s.partitionID)
146+
lastCommittedOffset, err := s.offsetManager.LastCommittedOffset(ctx, s.partitionID)
147147
if err != nil {
148148
return fmt.Errorf("fetching last committed offset: %w", err)
149149
}
@@ -235,14 +235,14 @@ func (s *ReaderService) fetchUntilLagSatisfied(
235235

236236
for b.Ongoing() {
237237
// Send a direct request to the Kafka backend to fetch the partition start offset.
238-
partitionStartOffset, err := s.offsetManager.FetchPartitionOffset(ctx, s.partitionID, KafkaStartOffset)
238+
partitionStartOffset, err := s.offsetManager.PartitionOffset(ctx, s.partitionID, KafkaStartOffset)
239239
if err != nil {
240240
level.Warn(logger).Log("msg", "partition reader failed to fetch partition start offset", "err", err)
241241
b.Wait()
242242
continue
243243
}
244244

245-
consumerGroupLastCommittedOffset, err := s.offsetManager.FetchLastCommittedOffset(ctx, s.partitionID)
245+
consumerGroupLastCommittedOffset, err := s.offsetManager.LastCommittedOffset(ctx, s.partitionID)
246246
if err != nil {
247247
level.Warn(logger).Log("msg", "partition reader failed to fetch last committed offset", "err", err)
248248
b.Wait()
@@ -253,7 +253,7 @@ func (s *ReaderService) fetchUntilLagSatisfied(
253253
// We intentionally don't use WaitNextFetchLastProducedOffset() to not introduce further
254254
// latency.
255255
lastProducedOffsetRequestedAt := time.Now()
256-
lastProducedOffset, err := s.offsetManager.FetchPartitionOffset(ctx, s.partitionID, KafkaEndOffset)
256+
lastProducedOffset, err := s.offsetManager.PartitionOffset(ctx, s.partitionID, KafkaEndOffset)
257257
if err != nil {
258258
level.Warn(logger).Log("msg", "partition reader failed to fetch last produced offset", "err", err)
259259
b.Wait()

pkg/limits/partition_lifecycler.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,7 +78,7 @@ func (l *partitionLifecycler) determineStateFromOffsets(ctx context.Context, par
7878
logger := log.With(l.logger, "partition", partition)
7979
// Get the start offset for the partition. This can be greater than zero
8080
// if a retention period has deleted old records.
81-
startOffset, err := l.offsetManager.FetchPartitionOffset(
81+
startOffset, err := l.offsetManager.PartitionOffset(
8282
ctx, partition, kafka_partition.KafkaStartOffset)
8383
if err != nil {
8484
return fmt.Errorf("failed to get last produced offset: %w", err)
@@ -87,7 +87,7 @@ func (l *partitionLifecycler) determineStateFromOffsets(ctx context.Context, par
8787
// record. For example, if a partition contains 1 record, then the last
8888
// produced offset is 1. However, the offset of the last produced record
8989
// is 0, as offsets start from 0.
90-
lastProducedOffset, err := l.offsetManager.FetchPartitionOffset(
90+
lastProducedOffset, err := l.offsetManager.PartitionOffset(
9191
ctx, partition, kafka_partition.KafkaEndOffset)
9292
if err != nil {
9393
return fmt.Errorf("failed to get last produced offset: %w", err)

0 commit comments

Comments
 (0)