Skip to content

Commit

Permalink
Bugfix: compatible schemas not properly handled during deserialization (
Browse files Browse the repository at this point in the history
#25)

Forward/Backwards compatible schemas used to produce/consume messages
were not being properly deserialized.
`Test_SchemaRegistryReal_Avro_AutoRegisterSchemas_OldProducerCanBeConsumedByNewConsumer`
and
`Test_SchemaRegistryReal_Avro_AutoRegisterSchemas_NewProducerCanBeConsumedByOldConsumer`
demonstrated bug. Schema registry aware avro formatter was updated to
reconcile the bug.

---------

Co-authored-by: stewartboyd119 <[email protected]>
  • Loading branch information
stewartboyd119 and stewartbzillow authored Feb 13, 2025
1 parent 753a8aa commit 62f383c
Show file tree
Hide file tree
Showing 15 changed files with 294 additions and 48 deletions.
5 changes: 3 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
# Directories containing independent Go modules.
MODULE_DIRS = .
GOLANGCI_VERSION=1.61.0
AVRO_CMD_PATH=github.com/hamba/avro/v2/cmd/[email protected]
GOLANGCI_VERSION=1.64.5
AVRO_CMD_PATH=github.com/hamba/avro/v2/cmd/[email protected]
SCHEMA_REGISTRY_DOMAIN=schema-registry.shared.zg-int.net:443


# Sets up kafka broker using docker compose
Expand Down
6 changes: 6 additions & 0 deletions changelog.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,12 @@ All notable changes to this project will be documented in this file.

This project adheres to Semantic Versioning.

## 2.1.1 (Feb 13, 2025)

1. Forward/Backwards compatible schemas used to produce/consume messages were not being properly deserialized.
`Test_SchemaRegistryReal_Avro_AutoRegisterSchemas_OldProducerCanBeConsumedByNewConsumer` and `Test_SchemaRegistryReal_Avro_AutoRegisterSchemas_NewProducerCanBeConsumedByOldConsumer`
demonstrated bug. Schema registry aware avro formatter was updated to reconcile the bug.

## 2.1.0 (Jan 23, 2025)

1. Include `DisableTracePropagation` as a WriterOption
Expand Down
3 changes: 1 addition & 2 deletions example/producer_avro/event_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

3 changes: 1 addition & 2 deletions example/worker_avro/event_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

7 changes: 4 additions & 3 deletions formatter.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,13 +163,14 @@ func (f avroSchemaRegistryFormatter) unmarshal(req unmarshReq) error {
return fmt.Errorf("failed to parse schema : %w", err)
}
sc := avro.NewSchemaCompatibility()
resolvedSchema, err := sc.Resolve(dataSchema, targetSchema)

resolvedSchema, err := sc.Resolve(targetSchema, dataSchema)
if err != nil {
return fmt.Errorf("failed to get schema from payload: %w", err)
return fmt.Errorf("failed to reconcile producer/consumer schemas: %w", err)
}

err = avro.Unmarshal(resolvedSchema, req.data[5:], req.target)
if err != nil {

return fmt.Errorf("failed to deserialize to confluent schema registry avro type: %w", err)
}
return nil
Expand Down
16 changes: 9 additions & 7 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,21 +1,23 @@
module github.com/zillow/zkafka/v2

go 1.23
go 1.23.0

toolchain go1.23.1

require (
github.com/confluentinc/confluent-kafka-go/v2 v2.8.0
github.com/google/go-cmp v0.6.0
github.com/google/uuid v1.6.0
github.com/hamba/avro/v2 v2.27.0
github.com/hamba/avro/v2 v2.28.0
github.com/heetch/avro v0.4.6
github.com/sony/gobreaker v1.0.0
github.com/stretchr/testify v1.10.0
github.com/zillow/zfmt v1.0.1
go.opentelemetry.io/otel v1.33.0
go.opentelemetry.io/otel/trace v1.33.0
go.opentelemetry.io/otel v1.34.0
go.opentelemetry.io/otel/trace v1.34.0
go.uber.org/mock v0.5.0
golang.org/x/sync v0.10.0
google.golang.org/protobuf v1.36.2
golang.org/x/sync v0.11.0
google.golang.org/protobuf v1.36.5
)

require (
Expand All @@ -38,7 +40,7 @@ require (
github.com/santhosh-tekuri/jsonschema/v5 v5.3.1 // indirect
github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect
go.opentelemetry.io/auto/sdk v1.1.0 // indirect
go.opentelemetry.io/otel/metric v1.33.0 // indirect
go.opentelemetry.io/otel/metric v1.34.0 // indirect
google.golang.org/genproto v0.0.0-20250106144421-5f5ef82da422 // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)
28 changes: 14 additions & 14 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -194,8 +194,8 @@ github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWm
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0 h1:YBftPWNWd4WwGqtY2yeZL2ef8rHAxPBD8KFhJpmcqms=
github.com/grpc-ecosystem/grpc-gateway/v2 v2.16.0/go.mod h1:YN5jB8ie0yfIUg6VvR9Kz84aCaG7AsGZnLjhHbUqwPg=
github.com/hamba/avro/v2 v2.27.0 h1:IAM4lQ0VzUIKBuo4qlAiLKfqALSrFC+zi1iseTtbBKU=
github.com/hamba/avro/v2 v2.27.0/go.mod h1:jN209lopfllfrz7IGoZErlDz+AyUJ3vrBePQFZwYf5I=
github.com/hamba/avro/v2 v2.28.0 h1:E8J5D27biyAulWKNiEBhV85QPc9xRMCUCGJewS0KYCE=
github.com/hamba/avro/v2 v2.28.0/go.mod h1:9TVrlt1cG1kkTUtm9u2eO5Qb7rZXlYzoKqPt8TSH+TA=
github.com/hashicorp/errwrap v1.1.0 h1:OxrOeh75EUXMY8TBjag2fzXGZ40LB6IKw45YeGUDY2I=
github.com/hashicorp/errwrap v1.1.0/go.mod h1:YH+1FKiLXxHSkmPseP+kNlulaMuP3n2brvKWEqk/Jc4=
github.com/hashicorp/go-cleanhttp v0.5.2 h1:035FKYIWjmULyFRBKPs8TBQoi0x6d9G4xc9neXJWAZQ=
Expand Down Expand Up @@ -238,8 +238,8 @@ github.com/json-iterator/go v1.1.12 h1:PV8peI4a0ysnczrg+LtxykD8LfKY9ML6u2jnxaEnr
github.com/json-iterator/go v1.1.12/go.mod h1:e30LSqwooZae/UwlEbR2852Gd8hjQvJoHmT4TnhNGBo=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51 h1:Z9n2FFNUXsshfwJMBgNA0RU6/i7WVaAegv3PtuIHPMs=
github.com/kballard/go-shellquote v0.0.0-20180428030007-95032a82bc51/go.mod h1:CzGEWj7cYgsdH8dAjBGEr58BoE7ScuLd+fwFZ44+/x8=
github.com/klauspost/compress v1.17.10 h1:oXAz+Vh0PMUvJczoi+flxpnBEPxoER1IaAnU/NMPtT0=
github.com/klauspost/compress v1.17.10/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/klauspost/compress v1.17.11 h1:In6xLpyWOi1+C7tXUUWv2ot1QvBjxevKAaI6IXrJmUc=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand Down Expand Up @@ -411,8 +411,8 @@ go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.
go.opentelemetry.io/contrib/instrumentation/net/http/httptrace/otelhttptrace v0.46.1/go.mod h1:GnOaBaFQ2we3b9AGWJpsBa7v1S5RlQzlC3O7dRMxZhM=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0 h1:jq9TW8u3so/bN+JPT166wjOI6/vQPF6Xe7nMNIltagk=
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.49.0/go.mod h1:p8pYQP+m5XfbZm9fxtSKAbM6oIllS7s2AfxrChvc7iw=
go.opentelemetry.io/otel v1.33.0 h1:/FerN9bax5LoK51X/sI0SVYrjSE0/yUL7DpxW4K3FWw=
go.opentelemetry.io/otel v1.33.0/go.mod h1:SUUkR6csvUQl+yjReHu5uM3EtVV7MBm5FHKRlNx4I8I=
go.opentelemetry.io/otel v1.34.0 h1:zRLXxLCgL1WyKsPVrgbSdMN4c0FMkDAskSTQP+0hdUY=
go.opentelemetry.io/otel v1.34.0/go.mod h1:OWFPOQ+h4G8xpyjgqo4SxJYdDQ/qmRH+wivy7zzx9oI=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.42.0 h1:ZtfnDL+tUrs1F0Pzfwbg2d59Gru9NCH3bgSHBM6LDwU=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric v0.42.0/go.mod h1:hG4Fj/y8TR/tlEDREo8tWstl9fO9gcFkn4xrx0Io8xU=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v0.42.0 h1:NmnYCiR0qNufkldjVvyQfZTHSdzeHoZ41zggMsdMcLM=
Expand All @@ -425,14 +425,14 @@ go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0 h1:tIqhe
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.21.0/go.mod h1:nUeKExfxAQVbiVFn32YXpXZZHZ61Cc3s3Rn1pDBGAb0=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.21.0 h1:digkEZCJWobwBqMwC0cwCq8/wkkRy/OowZg5OArWZrM=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp v1.21.0/go.mod h1:/OpE/y70qVkndM0TrxT4KBoN3RsFZP0QaofcfYrj76I=
go.opentelemetry.io/otel/metric v1.33.0 h1:r+JOocAyeRVXD8lZpjdQjzMadVZp2M4WmQ+5WtEnklQ=
go.opentelemetry.io/otel/metric v1.33.0/go.mod h1:L9+Fyctbp6HFTddIxClbQkjtubW6O9QS3Ann/M82u6M=
go.opentelemetry.io/otel/metric v1.34.0 h1:+eTR3U0MyfWjRDhmFMxe2SsW64QrZ84AOhvqS7Y+PoQ=
go.opentelemetry.io/otel/metric v1.34.0/go.mod h1:CEDrp0fy2D0MvkXE+dPV7cMi8tWZwX3dmaIhwPOaqHE=
go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw=
go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg=
go.opentelemetry.io/otel/sdk/metric v1.21.0 h1:smhI5oD714d6jHE6Tie36fPx4WDFIg+Y6RfAY4ICcR0=
go.opentelemetry.io/otel/sdk/metric v1.21.0/go.mod h1:FJ8RAsoPGv/wYMgBdUJXOm+6pzFY3YdljnXtv1SBE8Q=
go.opentelemetry.io/otel/trace v1.33.0 h1:cCJuF7LRjUFso9LPnEAHJDB2pqzp+hbO8eu1qqW2d/s=
go.opentelemetry.io/otel/trace v1.33.0/go.mod h1:uIcdVUZMpTAmz0tI1z04GoVSezK37CbGV4fr1f2nBck=
go.opentelemetry.io/otel/trace v1.34.0 h1:+ouXS2V8Rd4hp4580a8q23bg0azF2nI8cqLYnC8mh/k=
go.opentelemetry.io/otel/trace v1.34.0/go.mod h1:Svm7lSjQD7kG7KJ/MUHPVXSDGz2OX4h0M2jHBhmSfRE=
go.opentelemetry.io/proto/otlp v1.0.0 h1:T0TX0tmXU8a3CbNXzEKGeU5mIVOdf0oykP+u2lIVU/I=
go.opentelemetry.io/proto/otlp v1.0.0/go.mod h1:Sy6pihPLfYHkr3NkUbEhGHFhINUSI/v80hjKIs5JXpM=
go.uber.org/mock v0.5.0 h1:KAMbZvZPyBPWgD14IrIQ38QCyjwpvVVV6K/bHl1IwQU=
Expand All @@ -445,8 +445,8 @@ golang.org/x/net v0.33.0 h1:74SYHlV8BIgHIFC/LrYkOGIwL19eTYXQ5wc6TBuO36I=
golang.org/x/net v0.33.0/go.mod h1:HXLR5J+9DxmrqMwG9qjGCxZ+zKXxBru04zlTvWlWuN4=
golang.org/x/oauth2 v0.18.0 h1:09qnuIAgzdx1XplqJvW6CQqMCtGZykZWcXzPMPUusvI=
golang.org/x/oauth2 v0.18.0/go.mod h1:Wf7knwG0MPoWIMMBgFlEaSUDaKskp0dCfrlJRJXbBi8=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sync v0.11.0 h1:GGz8+XQP4FvTTrjZPzNKTMFtSXH80RAzG+5ghFPgK9w=
golang.org/x/sync v0.11.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.28.0 h1:Fksou7UEQUWlKvIdsqzJmUmCX3cZuD2+P3XyyzwMhlA=
golang.org/x/sys v0.28.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/term v0.24.0 h1:Mh5cbb+Zk2hqqXNO7S1iTjEphVL+jb8ZWaqh/g+JWkM=
Expand All @@ -467,8 +467,8 @@ google.golang.org/genproto/googleapis/rpc v0.0.0-20250102185135-69823020774d h1:
google.golang.org/genproto/googleapis/rpc v0.0.0-20250102185135-69823020774d/go.mod h1:3ENsm/5D1mzDyhpzeRi1NR784I0BcofWBoSc5QqqMK4=
google.golang.org/grpc v1.67.3 h1:OgPcDAFKHnH8X3O4WcO4XUc8GRDeKsKReqbQtiCj7N8=
google.golang.org/grpc v1.67.3/go.mod h1:YGaHCc6Oap+FzBJTZLBzkGSYt/cvGPFTPxkn7QfSU8s=
google.golang.org/protobuf v1.36.2 h1:R8FeyR1/eLmkutZOM5CWghmo5itiG9z0ktFlTVLuTmU=
google.golang.org/protobuf v1.36.2/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
google.golang.org/protobuf v1.36.5 h1:tPhr+woSbjfYvY6/GPufUoYizxw1cF/yFoxJ2fmpwlM=
google.golang.org/protobuf v1.36.5/go.mod h1:9fA7Ob0pmnwhb644+1+CVWFRbNajQ6iRojtC/QF5bRE=
gopkg.in/cenkalti/backoff.v1 v1.1.0 h1:Arh75ttbsvlpVA7WtVpH4u9h6Zl46xuptxqLxPiSo4Y=
gopkg.in/cenkalti/backoff.v1 v1.1.0/go.mod h1:J6Vskwqd+OMVJl8C33mmtxTBs2gyzfv7UDAkHu8BrjI=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
Expand Down
2 changes: 2 additions & 0 deletions heap.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ func (h *offsetHeap) Pop() kafka.TopicPartition {
if len(h.data) == 0 {
panic("popped empty heap")
}
//nolint:errcheck // access control guarantees type is TopicPartition
return heap.Pop(&h.data).(kafka.TopicPartition)
}

Expand Down Expand Up @@ -56,6 +57,7 @@ func (h _offsetHeap) Less(i, j int) bool { return h[i].Offset < h[j].Offset }
func (h _offsetHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }

func (h *_offsetHeap) Push(x any) {
//nolint:errcheck // access control guarantees type is TopicPartition
*h = append(*h, x.(kafka.TopicPartition))
}

Expand Down
15 changes: 10 additions & 5 deletions test/evolution/avro1/schema_1_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

26 changes: 20 additions & 6 deletions test/evolution/avro1x/schema_1_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

6 changes: 3 additions & 3 deletions test/evolution/avro2/schema_2_gen.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions test/evolution/schema_1.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,26 @@
],
"default": "created"
}
},
{
"name": "interactiveContent",
"type": [
"null",
{
"type": "array",
"items": {
"type": "record",
"name": "InteractiveContentRecord",
"fields": [
{
"name": "url",
"type": "string"
}
]
}
}
],
"default": null
}
]
}
9 changes: 9 additions & 0 deletions test/evolution/schema_2.avsc
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,21 @@
{
"name": "url",
"type": "string"
},
{
"name": "isImx",
"type": [
"null",
"boolean"
],
"default": null
}
]
}
}
],
"default": null
}

]
}
Loading

0 comments on commit 62f383c

Please sign in to comment.