Skip to content

Commit 34421d0

Browse files
committed
Add Redpanda Migrator offset metadata
- New `redpanda_migrator_offsets` input. - Field `kafka_offset_metadata` added to the `redpanda_migrator_offsets` output. Signed-off-by: Mihai Todor <[email protected]>
1 parent 4461a69 commit 34421d0

8 files changed

+221
-41
lines changed

CHANGELOG.md

+3-1
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,11 @@ All notable changes to this project will be documented in this file.
77

88
### Added
99

10-
- New `pg_stream` input supporting change data capture (CDC) from PostgreSQL (@le-vlad)
10+
- New `pg_stream` input supporting change data capture (CDC) from PostgreSQL. (@le-vlad)
1111
- Field `metadata_max_age` added to the `redpanda_migrator_offsets` output. (@mihaitodor)
1212
- Field `kafka_timestamp_ms` added to the `kafka`, `kafka_franz`, `redpanda`, `redpanda_common` and `redpanda_migrator` outputs. (@mihaitodor)
13+
- New `redpanda_migrator_offsets` input. (@mihaitodor)
14+
- Field `kafka_offset_metadata` added to the `redpanda_migrator_offsets` output. (@mihaitodor)
1315

1416
### Changed
1517

internal/impl/kafka/enterprise/redpanda_common_input.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func init() {
114114
time.Sleep(time.Millisecond * 100)
115115
}
116116
return
117-
})
117+
}, nil)
118118
if err != nil {
119119
return nil, err
120120
}

internal/impl/kafka/enterprise/redpanda_migrator_bundle_input.tmpl.yaml

+7-10
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ fields:
3838
mapping: |
3939
#!blobl
4040
41-
let redpandaMigratorOffsets = this.redpanda_migrator.with("seed_brokers", "consumer_group", "client_id", "rack_id", "tls", "sasl").assign({"topics": ["__consumer_offsets"]})
41+
let redpandaMigratorOffsets = this.redpanda_migrator.with("seed_brokers", "consumer_group", "client_id", "rack_id", "fetch_max_bytes", "fetch_min_bytes", "fetch_max_partition_bytes", "tls", "sasl")
4242
4343
root = if this.redpanda_migrator.length() == 0 {
4444
throw("the redpanda_migrator input must be configured")
@@ -63,7 +63,7 @@ mapping: |
6363
- redpanda_migrator: %s
6464
processors:
6565
- mapping: meta input_label = "redpanda_migrator"
66-
- kafka_franz: %s
66+
- redpanda_migrator_offsets: %s
6767
processors:
6868
- mapping: meta input_label = "redpanda_migrator_offsets"
6969
""".format(this.schema_registry.string(), this.redpanda_migrator.string(), $redpandaMigratorOffsets.string()).parse_yaml()
@@ -86,7 +86,7 @@ mapping: |
8686
- redpanda_migrator: %s
8787
processors:
8888
- mapping: meta input_label = "redpanda_migrator"
89-
- kafka_franz: %s
89+
- redpanda_migrator_offsets: %s
9090
processors:
9191
- mapping: meta input_label = "redpanda_migrator_offsets"
9292
""".format(this.schema_registry.string(), this.redpanda_migrator.string(), $redpandaMigratorOffsets.string()).parse_yaml()
@@ -97,7 +97,7 @@ mapping: |
9797
- redpanda_migrator: %s
9898
processors:
9999
- mapping: meta input_label = "redpanda_migrator"
100-
- kafka_franz: %s
100+
- redpanda_migrator_offsets: %s
101101
processors:
102102
- mapping: meta input_label = "redpanda_migrator_offsets"
103103
""".format(this.redpanda_migrator.string(), $redpandaMigratorOffsets.string()).parse_yaml()
@@ -137,9 +137,8 @@ tests:
137137
consumer_group: "migrator"
138138
processors:
139139
- mapping: meta input_label = "redpanda_migrator"
140-
- kafka_franz:
140+
- redpanda_migrator_offsets:
141141
seed_brokers: [ "127.0.0.1:9092" ]
142-
topics: [ "__consumer_offsets" ]
143142
consumer_group: "migrator"
144143
processors:
145144
- mapping: meta input_label = "redpanda_migrator_offsets"
@@ -177,9 +176,8 @@ tests:
177176
consumer_group: "migrator"
178177
processors:
179178
- mapping: meta input_label = "redpanda_migrator"
180-
- kafka_franz:
179+
- redpanda_migrator_offsets:
181180
seed_brokers: [ "127.0.0.1:9092" ]
182-
topics: [ "__consumer_offsets" ]
183181
consumer_group: "migrator"
184182
processors:
185183
- mapping: meta input_label = "redpanda_migrator_offsets"
@@ -200,9 +198,8 @@ tests:
200198
consumer_group: "migrator"
201199
processors:
202200
- mapping: meta input_label = "redpanda_migrator"
203-
- kafka_franz:
201+
- redpanda_migrator_offsets:
204202
seed_brokers: [ "127.0.0.1:9092" ]
205-
topics: [ "__consumer_offsets" ]
206203
consumer_group: "migrator"
207204
processors:
208205
- mapping: meta input_label = "redpanda_migrator_offsets"
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,148 @@
1+
// Copyright 2024 Redpanda Data, Inc.
2+
//
3+
// Licensed as a Redpanda Enterprise file under the Redpanda Community
4+
// License (the "License"); you may not use this file except in compliance with
5+
// the License. You may obtain a copy of the License at
6+
//
7+
// https://github.com/redpanda-data/connect/blob/main/licenses/rcl.md
8+
9+
package enterprise
10+
11+
import (
12+
"slices"
13+
14+
"github.com/twmb/franz-go/pkg/kgo"
15+
"github.com/twmb/franz-go/pkg/kmsg"
16+
17+
"github.com/redpanda-data/benthos/v4/public/service"
18+
"github.com/redpanda-data/connect/v4/internal/impl/kafka"
19+
)
20+
21+
const (
22+
// Consumer fields
23+
rmoiFieldRackID = "rack_id"
24+
rmoiFieldFetchMaxBytes = "fetch_max_bytes"
25+
rmoiFieldFetchMinBytes = "fetch_min_bytes"
26+
rmoiFieldFetchMaxPartitionBytes = "fetch_max_partition_bytes"
27+
)
28+
29+
func redpandaInputConfig() *service.ConfigSpec {
30+
return service.NewConfigSpec().
31+
Beta().
32+
Categories("Services").
33+
Summary(`Redpanda Migrator consumer group offsets input using the https://github.com/twmb/franz-go[Franz Kafka client library^].`).
34+
Description(`
35+
TODO: Description
36+
37+
== Metadata
38+
39+
This input adds the following metadata fields to each message:
40+
41+
` + "```text" + `
42+
- kafka_key
43+
- kafka_topic
44+
- kafka_partition
45+
- kafka_offset
46+
- kafka_timestamp_unix
47+
- kafka_timestamp_ms
48+
- kafka_tombstone_message
49+
- kafka_offset_metadata
50+
` + "```" + `
51+
`).
52+
Fields(redpandaInputConfigFields()...)
53+
}
54+
55+
func redpandaInputConfigFields() []*service.ConfigField {
56+
return slices.Concat(
57+
kafka.FranzConnectionFields(),
58+
[]*service.ConfigField{
59+
service.NewStringField(rmoiFieldRackID).
60+
Description("A rack specifies where the client is physically located and changes fetch requests to consume from the closest replica as opposed to the leader replica.").
61+
Default("").
62+
Advanced(),
63+
service.NewStringField(rmoiFieldFetchMaxBytes).
64+
Description("Sets the maximum amount of bytes a broker will try to send during a fetch. Note that brokers may not obey this limit if it has records larger than this limit. This is the equivalent to the Java fetch.max.bytes setting.").
65+
Advanced().
66+
Default("50MiB"),
67+
service.NewStringField(rmoiFieldFetchMinBytes).
68+
Description("Sets the minimum amount of bytes a broker will try to send during a fetch. This is the equivalent to the Java fetch.min.bytes setting.").
69+
Advanced().
70+
Default("1B"),
71+
service.NewStringField(rmoiFieldFetchMaxPartitionBytes).
72+
Description("Sets the maximum amount of bytes that will be consumed for a single partition in a fetch request. Note that if a single batch is larger than this number, that batch will still be returned so the client can make progress. This is the equivalent to the Java fetch.max.partition.bytes setting.").
73+
Advanced().
74+
Default("1MiB"),
75+
},
76+
kafka.FranzReaderOrderedConfigFields(),
77+
[]*service.ConfigField{
78+
service.NewAutoRetryNacksToggleField(),
79+
},
80+
)
81+
}
82+
83+
func init() {
84+
err := service.RegisterBatchInput("redpanda_migrator_offsets", redpandaInputConfig(),
85+
func(conf *service.ParsedConfig, mgr *service.Resources) (service.BatchInput, error) {
86+
tmpOpts, err := kafka.FranzConnectionOptsFromConfig(conf, mgr.Logger())
87+
if err != nil {
88+
return nil, err
89+
}
90+
clientOpts := append([]kgo.Opt{}, tmpOpts...)
91+
92+
d := kafka.FranzConsumerDetails{}
93+
94+
if d.RackID, err = conf.FieldString(rmoiFieldRackID); err != nil {
95+
return nil, err
96+
}
97+
98+
d.Topics = []string{`__consumer_offsets`}
99+
100+
if d.FetchMaxBytes, err = kafka.BytesFromStrFieldAsInt32(rmoiFieldFetchMaxBytes, conf); err != nil {
101+
return nil, err
102+
}
103+
if d.FetchMinBytes, err = kafka.BytesFromStrFieldAsInt32(rmoiFieldFetchMinBytes, conf); err != nil {
104+
return nil, err
105+
}
106+
if d.FetchMaxPartitionBytes, err = kafka.BytesFromStrFieldAsInt32(rmoiFieldFetchMaxPartitionBytes, conf); err != nil {
107+
return nil, err
108+
}
109+
110+
clientOpts = append(clientOpts, d.FranzOpts()...)
111+
112+
rdr, err := kafka.NewFranzReaderOrderedFromConfig(conf, mgr, func() ([]kgo.Opt, error) {
113+
return clientOpts, nil
114+
}, func(record *kgo.Record) *service.Message {
115+
key := kmsg.NewOffsetCommitKey()
116+
// Check the version to ensure that we process only offset commit keys
117+
if err := key.ReadFrom(record.Key); err != nil || (key.Version != 0 && key.Version != 1) {
118+
return nil
119+
}
120+
121+
offsetCommitValue := kmsg.NewOffsetCommitValue()
122+
if err := offsetCommitValue.ReadFrom(record.Value); err != nil {
123+
// Omit records we can't decode
124+
return nil
125+
}
126+
127+
msg := service.NewMessage(record.Value)
128+
msg.MetaSetMut("kafka_key", record.Key)
129+
msg.MetaSetMut("kafka_topic", record.Topic)
130+
msg.MetaSetMut("kafka_partition", int(record.Partition))
131+
msg.MetaSetMut("kafka_offset", int(record.Offset))
132+
msg.MetaSetMut("kafka_timestamp_unix", record.Timestamp.Unix())
133+
msg.MetaSetMut("kafka_timestamp_ms", record.Timestamp.UnixMilli())
134+
msg.MetaSetMut("kafka_tombstone_message", record.Value == nil)
135+
msg.MetaSetMut("kafka_offset_metadata", offsetCommitValue.Metadata)
136+
137+
return msg
138+
})
139+
if err != nil {
140+
return nil, err
141+
}
142+
143+
return service.AutoRetryNacksBatchedToggled(conf, rdr)
144+
})
145+
if err != nil {
146+
panic(err)
147+
}
148+
}

internal/impl/kafka/enterprise/redpanda_migrator_offsets_output.go

+38-14
Original file line numberDiff line numberDiff line change
@@ -27,8 +27,9 @@ import (
2727
)
2828

2929
const (
30-
rmooFieldMaxInFlight = "max_in_flight"
31-
rmooFieldKafkaKey = "kafka_key"
30+
rmooFieldKafkaKey = "kafka_key"
31+
rmooFieldOffsetMetadata = "offset_metadata"
32+
rmooFieldMaxInFlight = "max_in_flight"
3233
)
3334

3435
func redpandaMigratorOffsetsOutputConfig() *service.ConfigSpec {
@@ -49,6 +50,9 @@ func RedpandaMigratorOffsetsOutputConfigFields() []*service.ConfigField {
4950
[]*service.ConfigField{
5051
service.NewInterpolatedStringField(rmooFieldKafkaKey).
5152
Description("Kafka key.").Default("${! @kafka_key }"),
53+
service.NewInterpolatedStringField(rmooFieldOffsetMetadata).
54+
Description("The offset metadata value.").
55+
Default(`${! @kafka_offset_metadata }`),
5256
service.NewIntField(rmooFieldMaxInFlight).
5357
Description("The maximum number of batches to be sending in parallel at any given time.").
5458
Default(1),
@@ -80,10 +84,11 @@ func init() {
8084

8185
// RedpandaMigratorOffsetsWriter implements a Redpanda Migrator offsets writer using the franz-go library.
8286
type RedpandaMigratorOffsetsWriter struct {
83-
clientDetails *kafka.FranzConnectionDetails
84-
clientOpts []kgo.Opt
85-
kafkaKey *service.InterpolatedString
86-
backoffCtor func() backoff.BackOff
87+
clientDetails *kafka.FranzConnectionDetails
88+
clientOpts []kgo.Opt
89+
kafkaKey *service.InterpolatedString
90+
offsetMetadata *service.InterpolatedString
91+
backoffCtor func() backoff.BackOff
8792

8893
connMut sync.Mutex
8994
client *kadm.Client
@@ -106,6 +111,10 @@ func NewRedpandaMigratorOffsetsWriterFromConfig(conf *service.ParsedConfig, mgr
106111
return nil, err
107112
}
108113

114+
if w.offsetMetadata, err = conf.FieldInterpolatedString(rmooFieldOffsetMetadata); err != nil {
115+
return nil, err
116+
}
117+
109118
if w.clientOpts, err = kafka.FranzProducerLimitsOptsFromConfig(conf); err != nil {
110119
return nil, err
111120
}
@@ -167,7 +176,6 @@ func (w *RedpandaMigratorOffsetsWriter) Write(ctx context.Context, msg *service.
167176

168177
var kafkaKey []byte
169178
var err error
170-
// TODO: The `kafka_key` metadata field is cast from `[]byte` to string in the `kafka_franz` input, which is wrong.
171179
if kafkaKey, err = w.kafkaKey.TryBytes(msg); err != nil {
172180
return fmt.Errorf("failed to extract kafka key: %w", err)
173181
}
@@ -183,13 +191,20 @@ func (w *RedpandaMigratorOffsetsWriter) Write(ctx context.Context, msg *service.
183191
return fmt.Errorf("failed to get message bytes: %s", err)
184192
}
185193

186-
val := kmsg.NewOffsetCommitValue()
187-
if err := val.ReadFrom(msgBytes); err != nil {
194+
offsetCommitValue := kmsg.NewOffsetCommitValue()
195+
if err := offsetCommitValue.ReadFrom(msgBytes); err != nil {
188196
return fmt.Errorf("failed to decode offset commit value: %s", err)
189197
}
190198

199+
var offsetMetadata string
200+
if w.offsetMetadata != nil {
201+
if offsetMetadata, err = w.offsetMetadata.TryString(msg); err != nil {
202+
return fmt.Errorf("failed to extract offset metadata: %w", err)
203+
}
204+
}
205+
191206
updateConsumerOffsets := func() error {
192-
listedOffsets, err := w.client.ListOffsetsAfterMilli(ctx, val.CommitTimestamp, key.Topic)
207+
listedOffsets, err := w.client.ListOffsetsAfterMilli(ctx, offsetCommitValue.CommitTimestamp, key.Topic)
193208
if err != nil {
194209
return fmt.Errorf("failed to translate consumer offsets: %s", err)
195210
}
@@ -198,11 +213,20 @@ func (w *RedpandaMigratorOffsetsWriter) Write(ctx context.Context, msg *service.
198213
return fmt.Errorf("listed offsets returned an error: %s", err)
199214
}
200215

201-
// TODO: Add metadata to offsets!
202216
offsets := listedOffsets.Offsets()
203-
offsets.KeepFunc(func(o kadm.Offset) bool {
204-
return o.Partition == key.Partition
205-
})
217+
// Logic extracted from offsets.KeepFunc() and adjusted to set the metadata.
218+
for topic, partitionOffsets := range offsets {
219+
for partition, offset := range partitionOffsets {
220+
if offset.Partition != key.Partition {
221+
delete(partitionOffsets, partition)
222+
}
223+
offset.Metadata = offsetMetadata
224+
partitionOffsets[partition] = offset
225+
}
226+
if len(partitionOffsets) == 0 {
227+
delete(offsets, topic)
228+
}
229+
}
206230

207231
offsetResponses, err := w.client.CommitOffsets(ctx, key.Group, offsets)
208232
if err != nil {

internal/impl/kafka/franz_reader.go

+5-4
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,8 @@ func bytesFromStrField(name string, pConf *service.ParsedConfig) (uint64, error)
3636
return fieldAsBytes, nil
3737
}
3838

39-
func bytesFromStrFieldAsInt32(name string, pConf *service.ParsedConfig) (int32, error) {
39+
// BytesFromStrFieldAsInt32 attempts to parse a string field containing a human-readable byte size.
40+
func BytesFromStrFieldAsInt32(name string, pConf *service.ParsedConfig) (int32, error) {
4041
ui64, err := bytesFromStrField(name, pConf)
4142
if err != nil {
4243
return 0, err
@@ -161,13 +162,13 @@ func FranzConsumerDetailsFromConfig(conf *service.ParsedConfig) (*FranzConsumerD
161162
return nil, err
162163
}
163164

164-
if d.FetchMaxBytes, err = bytesFromStrFieldAsInt32(kfrFieldFetchMaxBytes, conf); err != nil {
165+
if d.FetchMaxBytes, err = BytesFromStrFieldAsInt32(kfrFieldFetchMaxBytes, conf); err != nil {
165166
return nil, err
166167
}
167-
if d.FetchMinBytes, err = bytesFromStrFieldAsInt32(kfrFieldFetchMinBytes, conf); err != nil {
168+
if d.FetchMinBytes, err = BytesFromStrFieldAsInt32(kfrFieldFetchMinBytes, conf); err != nil {
168169
return nil, err
169170
}
170-
if d.FetchMaxPartitionBytes, err = bytesFromStrFieldAsInt32(kfrFieldFetchMaxPartitionBytes, conf); err != nil {
171+
if d.FetchMaxPartitionBytes, err = BytesFromStrFieldAsInt32(kfrFieldFetchMaxPartitionBytes, conf); err != nil {
171172
return nil, err
172173
}
173174

0 commit comments

Comments
 (0)