Skip to content

Commit f643704

Browse files
chore: add tests for offset manager
1 parent f9ee970 commit f643704

File tree

1 file changed

+1
-3
lines changed

1 file changed

+1
-3
lines changed

pkg/kafka/partition/offset_manager.go

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -201,13 +201,11 @@ func (r *KafkaOffsetManager) PartitionOffset(ctx context.Context, partitionID in
201201

202202
// Commit commits an offset to the consumer group
203203
func (r *KafkaOffsetManager) Commit(ctx context.Context, partitionID int32, offset int64) error {
204-
admin := kadm.NewClient(r.client)
205-
206204
// Commit the last consumed offset.
207205
toCommit := kadm.Offsets{}
208206
toCommit.AddOffset(r.cfg.Topic, partitionID, offset, -1)
209207

210-
committed, err := admin.CommitOffsets(ctx, r.ConsumerGroup(), toCommit)
208+
committed, err := r.adminClient.CommitOffsets(ctx, r.ConsumerGroup(), toCommit)
211209
if err != nil {
212210
return err
213211
} else if !committed.Ok() {

0 commit comments

Comments
 (0)