@@ -20,7 +20,9 @@ import (
2020
2121 "github.com/conduitio/conduit-connector-postgres/source/logrepl/internal"
2222 "github.com/conduitio/conduit-connector-postgres/source/position"
23+ "github.com/conduitio/conduit-connector-postgres/source/schema"
2324 sdk "github.com/conduitio/conduit-connector-sdk"
25+ "github.com/hamba/avro/v2"
2426 "github.com/jackc/pglogrepl"
2527)
2628
@@ -31,17 +33,23 @@ type CDCHandler struct {
3133 relationSet * internal.RelationSet
3234 out chan <- sdk.Record
3335 lastTXLSN pglogrepl.LSN
36+
37+ relAvroSchema map [string ]avro.Schema
38+ withAvroSchema bool
3439}
3540
3641func NewCDCHandler (
3742 rs * internal.RelationSet ,
3843 tableKeys map [string ]string ,
44+ withAvroSchema bool ,
3945 out chan <- sdk.Record ,
4046) * CDCHandler {
4147 return & CDCHandler {
42- tableKeys : tableKeys ,
43- relationSet : rs ,
44- out : out ,
48+ tableKeys : tableKeys ,
49+ relationSet : rs ,
50+ out : out ,
51+ withAvroSchema : withAvroSchema ,
52+ relAvroSchema : make (map [string ]avro.Schema ),
4553 }
4654}
4755
@@ -100,6 +108,10 @@ func (h *CDCHandler) handleInsert(
100108 return fmt .Errorf ("failed to decode new values: %w" , err )
101109 }
102110
111+ if err := h .updateAvroSchema (rel , msg .Tuple ); err != nil {
112+ return fmt .Errorf ("failed to update avro schema: %w" , err )
113+ }
114+
103115 rec := sdk .Util .Source .NewRecordCreate (
104116 h .buildPosition (lsn ),
105117 h .buildRecordMetadata (rel ),
@@ -127,6 +139,10 @@ func (h *CDCHandler) handleUpdate(
127139 return fmt .Errorf ("failed to decode new values: %w" , err )
128140 }
129141
142+ if err := h .updateAvroSchema (rel , msg .NewTuple ); err != nil {
143+ return fmt .Errorf ("failed to update avro schema: %w" , err )
144+ }
145+
130146 oldValues , err := h .relationSet .Values (msg .RelationID , msg .OldTuple )
131147 if err != nil {
132148 // this is not a critical error, old values are optional, just log it
@@ -180,10 +196,16 @@ func (h *CDCHandler) send(ctx context.Context, rec sdk.Record) error {
180196 }
181197}
182198
183- func (h * CDCHandler ) buildRecordMetadata (relation * pglogrepl.RelationMessage ) map [string ]string {
184- return map [string ]string {
185- sdk .MetadataCollection : relation .RelationName ,
199+ func (h * CDCHandler ) buildRecordMetadata (rel * pglogrepl.RelationMessage ) map [string ]string {
200+ m := map [string ]string {
201+ sdk .MetadataCollection : rel .RelationName ,
186202 }
203+
204+ if h .withAvroSchema {
205+ m [schema .AvroMetadataKey ] = h .relAvroSchema [rel .RelationName ].String ()
206+ }
207+
208+ return m
187209}
188210
189211// buildRecordKey takes the values from the message and extracts the key that
@@ -209,9 +231,27 @@ func (h *CDCHandler) buildRecordPayload(values map[string]any) sdk.Data {
209231 return sdk .StructuredData (values )
210232}
211233
234+ // buildPosition stores the LSN in position and converts it to bytes.
212235func (* CDCHandler ) buildPosition (lsn pglogrepl.LSN ) sdk.Position {
213236 return position.Position {
214237 Type : position .TypeCDC ,
215238 LastLSN : lsn .String (),
216239 }.ToSDKPosition ()
217240}
241+
242+ // updateAvroSchema generates and stores avro schema based on the relation's row,
243+ // when usage of avro schema is requested.
244+ func (h * CDCHandler ) updateAvroSchema (rel * pglogrepl.RelationMessage , row * pglogrepl.TupleData ) error {
245+ if ! h .withAvroSchema {
246+ return nil
247+ }
248+
249+ sch , err := schema .Avro .ExtractLogrepl (rel , row )
250+ if err != nil {
251+ return err
252+ }
253+
254+ h .relAvroSchema [rel .RelationName ] = sch
255+
256+ return nil
257+ }
0 commit comments