Skip to content
Closed
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
36 changes: 36 additions & 0 deletions common/component/kafka/kafka_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,6 +194,42 @@ func TestDeserializeValue(t *testing.T) {
_, err := kInv.DeserializeValue(&msg, handlerConfig)
require.Error(t, err, "schema registry details not set")
})

t.Run("reproducing intermittent issue with union types due to codec state mutation", func(t *testing.T) {
// Arrange
testSchemaUnion := `["null", "long"]`

// In happy path, codec is initialized and NativeFromBinary is called first, which sets the states of the codec
codecCard1, err := goavro.NewCodecForStandardJSONFull(testSchemaUnion)
require.NoError(t, err)

datum1, _, err := codecCard1.NativeFromBinary([]byte{0x02, 0x06})
require.NoError(t, err)

// As expected, the datum is a long with value 3
require.Equal(t, int64(3), datum1.(map[string]any)["long"])

// Reproducing the error when NativeFromTextual is called before NativeFromBinary, which changes the states of the codec
codecCard2, err := goavro.NewCodecForStandardJSONFull(testSchemaUnion)
require.NoError(t, err)

// Trigger textual path that mutates states
_, _, err = codecCard2.NativeFromTextual([]byte("1"))

// Binary for union index 1 (long) with value 3: 0x02 0x06
datum, _, err := codecCard2.NativeFromBinary([]byte{0x02, 0x06})
require.NoError(t, err)

// Prior to bug fix, the datum would be returned as a {"null", 3} but should return '{"long":3}'!
require.Nil(t, datum.(map[string]any)["null"])
require.Equal(t, int64(3), datum.(map[string]any)["long"])

// As a result, next call to TextualFromNative would fail with "Cannot encode textual union: cannot encode textual null: expected: Go nil; received: int64"
act, err := codecCard2.TextualFromNative(nil, datum)
require.NoError(t, err)
require.Equal(t, []byte("{\"long\":3}"), act)

})
}

func formatByteRecord(schemaID int, valueBytes []byte) []byte {
Expand Down