From 62f383ce6d97b03a080c6ee95a8d574856cbef3d Mon Sep 17 00:00:00 2001 From: Stewart Boyd Date: Thu, 13 Feb 2025 15:16:48 -0800 Subject: [PATCH] Bugfix: compatible schemas not properly handled during deserialization (#25) Forward/backward-compatible schemas used to produce/consume messages were not being properly deserialized. `Test_SchemaRegistryReal_Avro_AutoRegisterSchemas_OldProducerCanBeConsumedByNewConsumer` and `Test_SchemaRegistryReal_Avro_AutoRegisterSchemas_NewProducerCanBeConsumedByOldConsumer` demonstrated the bug. The schema-registry-aware Avro formatter was updated to fix it. --------- Co-authored-by: stewartboyd119 --- Makefile | 5 +- changelog.md | 6 + example/producer_avro/event_gen.go | 3 +- example/worker_avro/event_gen.go | 3 +- formatter.go | 7 +- go.mod | 16 ++- go.sum | 28 ++-- heap.go | 2 + test/evolution/avro1/schema_1_gen.go | 15 +- test/evolution/avro1x/schema_1_gen.go | 26 +++- test/evolution/avro2/schema_2_gen.go | 6 +- test/evolution/schema_1.avsc | 20 +++ test/evolution/schema_2.avsc | 9 ++ test/schema_registry_evo_test.go | 195 +++++++++++++++++++++++++- test/schema_registry_test.go | 1 - 15 files changed, 294 insertions(+), 48 deletions(-) diff --git a/Makefile b/Makefile index bb2a6f0..6967852 100644 --- a/Makefile +++ b/Makefile @@ -1,7 +1,8 @@ # Directories containing independent Go modules. MODULE_DIRS = . -GOLANGCI_VERSION=1.61.0 -AVRO_CMD_PATH=github.com/hamba/avro/v2/cmd/avrogen@v2.26.0 +GOLANGCI_VERSION=1.64.5 +AVRO_CMD_PATH=github.com/hamba/avro/v2/cmd/avrogen@v2.28.0 +SCHEMA_REGISTRY_DOMAIN=schema-registry.shared.zg-int.net:443 # Sets up kafka broker using docker compose diff --git a/changelog.md b/changelog.md index de7e84a..424f303 100644 --- a/changelog.md +++ b/changelog.md @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file. This project adheres to Semantic Versioning. +## 2.1.1 (Feb 13, 2025) + +1. Forward/backward-compatible schemas used to produce/consume messages were not being properly deserialized. + `Test_SchemaRegistryReal_Avro_AutoRegisterSchemas_OldProducerCanBeConsumedByNewConsumer` and `Test_SchemaRegistryReal_Avro_AutoRegisterSchemas_NewProducerCanBeConsumedByOldConsumer` + demonstrated the bug. The schema-registry-aware Avro formatter was updated to fix it. + ## 2.1.0 (Jan 23, 2025) 1. Include `DisableTracePropagation` as a WriterOption diff --git a/example/producer_avro/event_gen.go b/example/producer_avro/event_gen.go index 50559e3..16be31f 100644 --- a/example/producer_avro/event_gen.go +++ b/example/producer_avro/event_gen.go @@ -1,6 +1,5 @@ -package main - // Code generated by avro/gen. DO NOT EDIT. +package main // DummyEvent is a generated struct. type DummyEvent struct { diff --git a/example/worker_avro/event_gen.go b/example/worker_avro/event_gen.go index a6fed57..50e3356 100644 --- a/example/worker_avro/event_gen.go +++ b/example/worker_avro/event_gen.go @@ -1,6 +1,5 @@ -package main - // Code generated by avro/gen. DO NOT EDIT. +package main // DummyEvent is a generated struct.
type DummyEvent struct { diff --git a/formatter.go b/formatter.go index b8ba118..5c94494 100644 --- a/formatter.go +++ b/formatter.go @@ -163,13 +163,14 @@ func (f avroSchemaRegistryFormatter) unmarshal(req unmarshReq) error { return fmt.Errorf("failed to parse schema : %w", err) } sc := avro.NewSchemaCompatibility() - resolvedSchema, err := sc.Resolve(dataSchema, targetSchema) + + resolvedSchema, err := sc.Resolve(targetSchema, dataSchema) if err != nil { - return fmt.Errorf("failed to get schema from payload: %w", err) + return fmt.Errorf("failed to reconcile producer/consumer schemas: %w", err) } - err = avro.Unmarshal(resolvedSchema, req.data[5:], req.target) if err != nil { + return fmt.Errorf("failed to deserialize to confluent schema registry avro type: %w", err) } return nil diff --git a/go.mod b/go.mod index abae141..83293f2 100644 --- a/go.mod +++ b/go.mod @@ -1,21 +1,23 @@ module github.com/zillow/zkafka/v2 -go 1.23 +go 1.23.0 + +toolchain go1.23.1 require ( github.com/confluentinc/confluent-kafka-go/v2 v2.8.0 github.com/google/go-cmp v0.6.0 github.com/google/uuid v1.6.0 - github.com/hamba/avro/v2 v2.27.0 + github.com/hamba/avro/v2 v2.28.0 github.com/heetch/avro v0.4.6 github.com/sony/gobreaker v1.0.0 github.com/stretchr/testify v1.10.0 github.com/zillow/zfmt v1.0.1 - go.opentelemetry.io/otel v1.33.0 - go.opentelemetry.io/otel/trace v1.33.0 + go.opentelemetry.io/otel v1.34.0 + go.opentelemetry.io/otel/trace v1.34.0 go.uber.org/mock v0.5.0 - golang.org/x/sync v0.10.0 - google.golang.org/protobuf v1.36.2 + golang.org/x/sync v0.11.0 + google.golang.org/protobuf v1.36.5 ) require ( @@ -38,7 +40,7 @@ require ( github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 // indirect github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect go.opentelemetry.io/auto/sdk v1.1.0 // indirect - go.opentelemetry.io/otel/metric v1.33.0 // indirect + go.opentelemetry.io/otel/metric v1.34.0 // indirect google.golang.org/genproto v0.0.0-20250106144421-5f5ef82da422 // indirect gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 6711a8a..f0ba878 100644 --- a/go.sum +++ b/go.sum @@ -194,8 +194,8 @@ github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWm github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms= github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg= -github.com/hamba/avro/v2 v2.27.0 h1:IAM4lQ0VzUIKBuo4qlAiLKfqALSrFC+zi1iseTtbBKU= -github.com/hamba/avro/v2 v2.27.0/go.mod h1:jN209lopfllfrz7IGoZErlDz+AyUJ3vrBePQFZwYf5I= +github.com/hamba/avro/v2 v2.28.0 h1:E8J5D27biyAulWKNiEBhV85QPc9xRMCUCGJewS0KYCE= +github.com/hamba/avro/v2 v2.28.0/go.mod h1:9TVrlt1cG1kkTUtm9u2eO5Qb7rZXlYzoKqPt8TSH+TA= github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I= github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4= github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ= @@ -238,8 +238,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs= github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8= 
-github.com/klauspost/compress v1.17.10 h1:oXAz+Vh0PMUvJczoi+flxpnBEPxoER1IaAnU/NMPtT0= -github.com/klauspost/compress v1.17.10/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= @@ -411,8 +411,8 @@ go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0. go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.46.1/go.mod h1:GnOaBaFQ2we3b9AGWJpsBa7v1S5RlQzlC3O7dRMxZhM= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk= go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw= -go.opentelemetry.io/otel v1.33.0 h1:/FerN9bax5LoK51X/sI0SVYrjSE0/yUL7DpxW4K3FWw= -go.opentelemetry.io/otel v1.33.0/go.mod h1:SUUkR6csvUQl+yjReHu5uM3EtVV7MBm5FHKRlNx4I8I= +go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY= +go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI= go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.42.0 h1:ZtfnDL+tUrs1F0Pzfwbg2d59Gru9NCH3bgSHBM6LDwU= go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.42.0/go.mod h1:hG4Fj/y8TR/tlEDREo8tWstl9fO9gcFkn4xrx0Io8xU= go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.42.0 h1:NmnYCiR0qNufkldjVvyQfZTHSdzeHoZ41zggMsdMcLM= @@ -425,14 +425,14 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 h1:tIqhe go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0/go.mod h1:nUeKExfxAQVbiVFn32YXpXZZHZ61Cc3s3Rn1pDBGAb0= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.21.0 h1:digkEZCJWobwBqMwC0cwCq8/wkkRy/OowZg5OArWZrM= go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.21.0/go.mod h1:/OpE/y70qVkndM0TrxT4KBoN3RsFZP0QaofcfYrj76I= -go.opentelemetry.io/otel/metric v1.33.0 h1:r+JOocAyeRVXD8lZpjdQjzMadVZp2M4WmQ+5WtEnklQ= -go.opentelemetry.io/otel/metric v1.33.0/go.mod h1:L9+Fyctbp6HFTddIxClbQkjtubW6O9QS3Ann/M82u6M= +go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ= +go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE= go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw= go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg= go.opentelemetry.io/otel/sdk/metric v1.21.0 h1:smhI5oD714d6jHE6Tie36fPx4WDFIg+Y6RfAY4ICcR0= go.opentelemetry.io/otel/sdk/metric v1.21.0/go.mod h1:FJ8RAsoPGv/wYMgBdUJXOm+6pzFY3YdljnXtv1SBE8Q= -go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qqW2d/s= -go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37CbGV4fr1f2nBck= +go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k= +go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE= go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I= go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM= go.uber.org/mock v0.5.0 
h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU= @@ -445,8 +445,8 @@ golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I= golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4= golang.org/x/oauth2 v0.18.0 h1:09qnuIAgzdx1XplqJvW6CQqMCtGZykZWcXzPMPUusvI= golang.org/x/oauth2 v0.18.0/go.mod h1:Wf7knwG0MPoWIMMBgFlEaSUDaKskp0dCfrlJRJXbBi8= -golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= -golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= +golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w= +golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA= golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/term v0.24.0 h1:Mh5cbb+Zk2hqqXNO7S1iTjEphVL+jb8ZWaqh/g+JWkM= @@ -467,8 +467,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20250102185135-69823020774d h1: google.golang.org/genproto/googleapis/rpc v0.0.0-20250102185135-69823020774d/go.mod h1:3ENsm/5D1mzDyhpzeRi1NR784I0BcofWBoSc5QqqMK4= google.golang.org/grpc v1.67.3 h1:OgPcDAFKHnH8X3O4WcO4XUc8GRDeKsKReqbQtiCj7N8= google.golang.org/grpc v1.67.3/go.mod h1:YGaHCc6Oap+FzBJTZLBzkGSYt/cvGPFTPxkn7QfSU8s= -google.golang.org/protobuf v1.36.2 h1:R8FeyR1/eLmkutZOM5CWghmo5itiG9z0ktFlTVLuTmU= -google.golang.org/protobuf v1.36.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= +google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM= +google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE= gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y= gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/heap.go b/heap.go index 7743348..52eaac4 100644 --- a/heap.go +++ b/heap.go @@ -21,6 +21,7 @@ func (h *offsetHeap) Pop() kafka.TopicPartition { if len(h.data) == 0 { panic("popped empty heap") } + //nolint:errcheck // access control guarantees type is TopicPartition return heap.Pop(&h.data).(kafka.TopicPartition) } @@ -56,6 +57,7 @@ func (h _offsetHeap) Less(i, j int) bool { return h[i].Offset < h[j].Offset } func (h _offsetHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } func (h *_offsetHeap) Push(x any) { + //nolint:errcheck // access control guarantees type is TopicPartition *h = append(*h, x.(kafka.TopicPartition)) } diff --git a/test/evolution/avro1/schema_1_gen.go b/test/evolution/avro1/schema_1_gen.go index f26338b..ae434db 100644 --- a/test/evolution/avro1/schema_1_gen.go +++ b/test/evolution/avro1/schema_1_gen.go @@ -1,14 +1,19 @@ -package avro1 - // Code generated by avro/gen. DO NOT EDIT. +package avro1 import ( "time" ) +// InteractiveContentRecord is a generated struct. +type InteractiveContentRecord struct { + URL string `avro:"url"` +} + // Event is a generated struct. 
type Event struct { - ID string `avro:"id"` - DeliveredAtDateTimeUtc time.Time `avro:"deliveredAtDateTimeUtc"` - EventType string `avro:"eventType"` + ID string `avro:"id"` + DeliveredAtDateTimeUtc time.Time `avro:"deliveredAtDateTimeUtc"` + EventType string `avro:"eventType"` + InteractiveContent *[]InteractiveContentRecord `avro:"interactiveContent"` } diff --git a/test/evolution/avro1x/schema_1_gen.go b/test/evolution/avro1x/schema_1_gen.go index e97ec5f..708c6d6 100644 --- a/test/evolution/avro1x/schema_1_gen.go +++ b/test/evolution/avro1x/schema_1_gen.go @@ -4,21 +4,21 @@ package avro1 import ( "fmt" - "strconv" - "github.com/heetch/avro/avrotypegen" + "strconv" ) type Event struct { - Id string `json:"id"` - Deliveredatdatetimeutc int64 `json:"deliveredAtDateTimeUtc"` - Eventtype EventType `json:"eventType"` + Id string `json:"id"` + Deliveredatdatetimeutc int64 `json:"deliveredAtDateTimeUtc"` + Eventtype EventType `json:"eventType"` + Interactivecontent *[]InteractiveContentRecord `json:"interactiveContent"` } // AvroRecord implements the avro.AvroRecord interface. func (Event) AvroRecord() avrotypegen.RecordInfo { return avrotypegen.RecordInfo{ - Schema: `{"fields":[{"name":"id","type":"string"},{"name":"deliveredAtDateTimeUtc","type":{"logicalType":"timestamp-millis","type":"long"}},{"name":"eventType","type":{"default":"created","name":"EventType","symbols":["created","associated"],"type":"enum"}}],"name":"com.zillowgroup.Event","type":"record"}`, + Schema: `{"fields":[{"name":"id","type":"string"},{"name":"deliveredAtDateTimeUtc","type":{"logicalType":"timestamp-millis","type":"long"}},{"name":"eventType","type":{"default":"created","name":"EventType","symbols":["created","associated"],"type":"enum"}},{"default":null,"name":"interactiveContent","type":["null",{"items":{"fields":[{"name":"url","type":"string"}],"name":"InteractiveContentRecord","type":"record"},"type":"array"}]}],"name":"com.zillowgroup.Event","type":"record"}`, Required: []bool{ 0: true, 1: true, @@ -68,3 +68,17 @@ func (e *EventType) UnmarshalText(data []byte) error { } return fmt.Errorf("unknown value %q for EventType", data) } + +type InteractiveContentRecord struct { + Url string `json:"url"` +} + +// AvroRecord implements the avro.AvroRecord interface. +func (InteractiveContentRecord) AvroRecord() avrotypegen.RecordInfo { + return avrotypegen.RecordInfo{ + Schema: `{"fields":[{"name":"url","type":"string"}],"name":"com.zillowgroup.InteractiveContentRecord","type":"record"}`, + Required: []bool{ + 0: true, + }, + } +} diff --git a/test/evolution/avro2/schema_2_gen.go b/test/evolution/avro2/schema_2_gen.go index 3a3e6d8..7770c08 100644 --- a/test/evolution/avro2/schema_2_gen.go +++ b/test/evolution/avro2/schema_2_gen.go @@ -1,6 +1,5 @@ -package avro2 - // Code generated by avro/gen. DO NOT EDIT. +package avro2 import ( "time" @@ -8,7 +7,8 @@ import ( // InteractiveContentRecord is a generated struct. type InteractiveContentRecord struct { - URL string `avro:"url"` + URL string `avro:"url"` + IsImx *bool `avro:"isImx"` } // Event is a generated struct. 
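For readers tracing the formatter.go change above: hamba/avro v2's `SchemaCompatibility.Resolve` expects the reader (consumer/target) schema as its first argument and the writer (producer/payload) schema as its second; the original call passed them in the opposite order, so resolution ran in the wrong direction. The sketch below is a simplified stand-in for the formatter, not the library's actual code; the helper name `decodeWithResolution` and its parameters are illustrative. It shows the corrected call order and why unmarshalling starts at byte 5 of the payload: the Confluent wire format prefixes the Avro body with a one-byte magic marker and a four-byte schema ID.

package avrodemo

import (
	"fmt"

	"github.com/hamba/avro/v2"
)

// decodeWithResolution is an illustrative sketch (not zkafka's actual formatter) of the
// decode path fixed above: reconcile the producer (writer) schema against the consumer
// (reader) schema, then unmarshal the Confluent-framed Avro payload.
func decodeWithResolution(writerSchemaJSON, readerSchemaJSON string, payload []byte, target any) error {
	writerSchema, err := avro.Parse(writerSchemaJSON) // schema the message was produced with
	if err != nil {
		return fmt.Errorf("failed to parse writer schema: %w", err)
	}
	readerSchema, err := avro.Parse(readerSchemaJSON) // schema the consumer was compiled against
	if err != nil {
		return fmt.Errorf("failed to parse reader schema: %w", err)
	}
	// Reader schema first, writer schema second; reversing the arguments was the bug.
	resolved, err := avro.NewSchemaCompatibility().Resolve(readerSchema, writerSchema)
	if err != nil {
		return fmt.Errorf("failed to reconcile producer/consumer schemas: %w", err)
	}
	// Confluent wire format: byte 0 is a magic byte, bytes 1-4 hold the schema ID,
	// so the Avro-encoded body starts at offset 5.
	if len(payload) < 5 {
		return fmt.Errorf("payload shorter than schema registry header")
	}
	return avro.Unmarshal(resolved, payload[5:], target)
}

With the arguments reversed, the payload was effectively decoded against the wrong schema, which is why messages produced under a compatible but different schema version failed to deserialize correctly.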
diff --git a/test/evolution/schema_1.avsc b/test/evolution/schema_1.avsc index 9788647..1e765d3 100644 --- a/test/evolution/schema_1.avsc +++ b/test/evolution/schema_1.avsc @@ -25,6 +25,26 @@ ], "default": "created" } + }, + { + "name": "interactiveContent", + "type": [ + "null", + { + "type": "array", + "items": { + "type": "record", + "name": "InteractiveContentRecord", + "fields": [ + { + "name": "url", + "type": "string" + } + ] + } + } + ], + "default": null } ] } \ No newline at end of file diff --git a/test/evolution/schema_2.avsc b/test/evolution/schema_2.avsc index 1e765d3..10431cf 100644 --- a/test/evolution/schema_2.avsc +++ b/test/evolution/schema_2.avsc @@ -39,6 +39,14 @@ { "name": "url", "type": "string" + }, + { + "name": "isImx", + "type": [ + "null", + "boolean" + ], + "default": null } ] } @@ -46,5 +54,6 @@ ], "default": null } + ] } \ No newline at end of file diff --git a/test/schema_registry_evo_test.go b/test/schema_registry_evo_test.go index 28d2d9d..717be8e 100644 --- a/test/schema_registry_evo_test.go +++ b/test/schema_registry_evo_test.go @@ -86,10 +86,16 @@ func Test_SchemaRegistryReal_Avro_AutoRegisterSchemas_BackwardCompatibleSchemasC require.NoError(t, err) id := uuid.NewString() + u := "http://localhost:8081" evt1 := avro1.Event{ ID: id, DeliveredAtDateTimeUtc: time.Now().UTC().Truncate(time.Millisecond), EventType: "created", + InteractiveContent: ptr([]avro1.InteractiveContentRecord{ + { + URL: u, + }, + }), } _, err = writer1.Write(ctx, &evt1) require.NoError(t, err) @@ -100,6 +106,12 @@ func Test_SchemaRegistryReal_Avro_AutoRegisterSchemas_BackwardCompatibleSchemasC ID: listingID2, DeliveredAtDateTimeUtc: time.Now().UTC().Truncate(time.Millisecond), EventType: "created", + InteractiveContent: ptr([]avro2.InteractiveContentRecord{ + { + URL: u, + IsImx: ptr(true), + }, + }), } _, err = writer2.Write(ctx, &evt2) require.NoError(t, err) @@ -136,16 +148,193 @@ func Test_SchemaRegistryReal_Avro_AutoRegisterSchemas_BackwardCompatibleSchemasC receivedEvt2Schema1 := avro1.Event{} require.NoError(t, msg2.Decode(&receivedEvt2Schema1)) + expectedEvt2 := avro1.Event{ ID: evt2.ID, DeliveredAtDateTimeUtc: evt2.DeliveredAtDateTimeUtc, EventType: evt2.EventType, + InteractiveContent: ptr([]avro1.InteractiveContentRecord{ + { + URL: u, + }, + }), } assertEqual(t, expectedEvt2, receivedEvt2Schema1) +} - receivedEvt2Schema2 := avro2.Event{} - require.NoError(t, msg2.Decode(&receivedEvt2Schema2)) - assertEqual(t, evt2, receivedEvt2Schema2) +func Test_SchemaRegistryReal_Avro_AutoRegisterSchemas_OldProducerCanBeConsumedByNewConsumer(t *testing.T) { + checkShouldSkipTest(t, enableKafkaBrokerTest, enableSchemaRegistryTest) + + ctx := context.Background() + topic := "integration-test-topic-2" + uuid.NewString() + bootstrapServer := getBootstrap() + + createTopic(t, bootstrapServer, topic, 1) + t.Logf("Created topic: %s", topic) + + groupID := uuid.NewString() + + client := zkafka.NewClient(zkafka.Config{BootstrapServers: []string{bootstrapServer}}, zkafka.LoggerOption(stdLogger{})) + defer func() { require.NoError(t, client.Close()) }() + + t.Log("Created writer with auto registered schemas") + writer1, err := client.Writer(ctx, zkafka.ProducerTopicConfig{ + ClientID: fmt.Sprintf("writer-%s-%s", t.Name(), uuid.NewString()), + Topic: topic, + Formatter: zkafka.AvroSchemaRegistry, + SchemaRegistry: zkafka.SchemaRegistryConfig{ + URL: "http://localhost:8081", + Serialization: zkafka.SerializationConfig{ + AutoRegisterSchemas: true, + Schema: dummyEventSchema1, + }, + }, + }) + 
require.NoError(t, err) + + id := uuid.NewString() + + u := "http://localhost:8081" + evt1 := avro1.Event{ + ID: id, + DeliveredAtDateTimeUtc: time.Now().UTC().Truncate(time.Millisecond), + EventType: "created", + InteractiveContent: ptr([]avro1.InteractiveContentRecord{ + { + URL: u, + }, + }), + } + _, err = writer1.Write(ctx, &evt1) + require.NoError(t, err) + + consumerTopicConfig := zkafka.ConsumerTopicConfig{ + ClientID: fmt.Sprintf("reader-%s-%s", t.Name(), uuid.NewString()), + Topic: topic, + Formatter: zkafka.AvroSchemaRegistry, + SchemaRegistry: zkafka.SchemaRegistryConfig{ + URL: "http://localhost:8081", + Deserialization: zkafka.DeserializationConfig{Schema: dummyEventSchema2}, + }, + GroupID: groupID, + AdditionalProps: map[string]any{ + "auto.offset.reset": "earliest", + }, + } + reader, err := client.Reader(ctx, consumerTopicConfig) + require.NoError(t, err) + + t.Log("Begin reading messages") + results, err := readMessages(reader, 1) + require.NoError(t, err) + + msg2 := <-results + t.Log("Close reader") + + require.NoError(t, reader.Close()) + + receivedEvt2Schema1 := avro2.Event{} + require.NoError(t, msg2.Decode(&receivedEvt2Schema1)) + + expectedEvt2 := avro2.Event{ + ID: evt1.ID, + DeliveredAtDateTimeUtc: evt1.DeliveredAtDateTimeUtc, + EventType: evt1.EventType, + InteractiveContent: ptr([]avro2.InteractiveContentRecord{ + { + URL: u, + }, + }), + } + assertEqual(t, expectedEvt2, receivedEvt2Schema1) +} + +func Test_SchemaRegistryReal_Avro_AutoRegisterSchemas_NewProducerCanBeConsumedByOldConsumer(t *testing.T) { + checkShouldSkipTest(t, enableKafkaBrokerTest, enableSchemaRegistryTest) + + ctx := context.Background() + topic := "integration-test-topic-2" + uuid.NewString() + bootstrapServer := getBootstrap() + + createTopic(t, bootstrapServer, topic, 1) + t.Logf("Created topic: %s", topic) + + groupID := uuid.NewString() + + client := zkafka.NewClient(zkafka.Config{BootstrapServers: []string{bootstrapServer}}, zkafka.LoggerOption(stdLogger{})) + defer func() { require.NoError(t, client.Close()) }() + + t.Log("Created writer with auto registered schemas") + writer1, err := client.Writer(ctx, zkafka.ProducerTopicConfig{ + ClientID: fmt.Sprintf("writer-%s-%s", t.Name(), uuid.NewString()), + Topic: topic, + Formatter: zkafka.AvroSchemaRegistry, + SchemaRegistry: zkafka.SchemaRegistryConfig{ + URL: "http://localhost:8081", + Serialization: zkafka.SerializationConfig{ + AutoRegisterSchemas: true, + Schema: dummyEventSchema2, + }, + }, + }) + require.NoError(t, err) + + id := uuid.NewString() + + u := "http://localhost:8081" + evt1 := avro2.Event{ + ID: id, + DeliveredAtDateTimeUtc: time.Now().UTC().Truncate(time.Millisecond), + EventType: "created", + InteractiveContent: ptr([]avro2.InteractiveContentRecord{ + { + URL: u, + IsImx: ptr(true), + }, + }), + } + _, err = writer1.Write(ctx, &evt1) + require.NoError(t, err) + + consumerTopicConfig := zkafka.ConsumerTopicConfig{ + ClientID: fmt.Sprintf("reader-%s-%s", t.Name(), uuid.NewString()), + Topic: topic, + Formatter: zkafka.AvroSchemaRegistry, + SchemaRegistry: zkafka.SchemaRegistryConfig{ + URL: "http://localhost:8081", + Deserialization: zkafka.DeserializationConfig{Schema: dummyEventSchema1}, + }, + GroupID: groupID, + AdditionalProps: map[string]any{ + "auto.offset.reset": "earliest", + }, + } + reader, err := client.Reader(ctx, consumerTopicConfig) + require.NoError(t, err) + + t.Log("Begin reading messages") + results, err := readMessages(reader, 1) + require.NoError(t, err) + + msg2 := <-results + t.Log("Close 
reader") + + require.NoError(t, reader.Close()) + + receivedEvt2Schema1 := avro1.Event{} + require.NoError(t, msg2.Decode(&receivedEvt2Schema1)) + + expectedEvt2 := avro1.Event{ + ID: evt1.ID, + DeliveredAtDateTimeUtc: evt1.DeliveredAtDateTimeUtc, + EventType: evt1.EventType, + InteractiveContent: ptr([]avro1.InteractiveContentRecord{ + { + URL: u, + }, + }), + } + assertEqual(t, expectedEvt2, receivedEvt2Schema1) } func Test_SchemaRegistryReal_Proto_AutoRegisterSchemas_BackwardCompatibleSchemasCanBeRegisteredAndReadFrom(t *testing.T) { diff --git a/test/schema_registry_test.go b/test/schema_registry_test.go index 5033995..9c0c708 100644 --- a/test/schema_registry_test.go +++ b/test/schema_registry_test.go @@ -108,7 +108,6 @@ func Test_SchemaRegistry_Avro_AutoRegisterSchemas_RequiresSchemaSpecification(t DeliveredAtDateTimeUtc: time.Now().UTC().Truncate(time.Millisecond), EventType: "listingCreated", } - _, err = writer1.Write(ctx, evt1) require.ErrorContains(t, err, "avro schema is required for schema registry formatter") }