diff --git a/common/component/kafka/kafka_test.go b/common/component/kafka/kafka_test.go index e4f258195f..decb7c0ea5 100644 --- a/common/component/kafka/kafka_test.go +++ b/common/component/kafka/kafka_test.go @@ -194,6 +194,42 @@ func TestDeserializeValue(t *testing.T) { _, err := kInv.DeserializeValue(&msg, handlerConfig) require.Error(t, err, "schema registry details not set") }) + + t.Run("reproducing intermittent issue with union types due to codec state mutation", func(t *testing.T) { + // Arrange + testSchemaUnion := `["null", "long"]` + + // In happy path, codec is initialized and NativeFromBinary is called first, which sets the states of the codec + codecCard1, err := goavro.NewCodecForStandardJSONFull(testSchemaUnion) + require.NoError(t, err) + + datum1, _, err := codecCard1.NativeFromBinary([]byte{0x02, 0x06}) + require.NoError(t, err) + + // As expected, the datum is a long with value 3 + require.Equal(t, int64(3), datum1.(map[string]any)["long"]) + + // Reproducing the error when NativeFromTextual is called before NativeFromBinary, which changes the states of the codec + codecCard2, err := goavro.NewCodecForStandardJSONFull(testSchemaUnion) + require.NoError(t, err) + + // Trigger textual path that mutates states + _, _, err = codecCard2.NativeFromTextual([]byte("1")) + + // Binary for union index 1 (long) with value 3: 0x02 0x06 + datum, _, err := codecCard2.NativeFromBinary([]byte{0x02, 0x06}) + require.NoError(t, err) + + // Prior to bug fix, the datum would be returned as a {"null", 3} but should return '{"long":3}'! + require.Nil(t, datum.(map[string]any)["null"]) + require.Equal(t, int64(3), datum.(map[string]any)["long"]) + + // As a result, next call to TextualFromNative would fail with "Cannot encode textual union: cannot encode textual null: expected: Go nil; received: int64" + act, err := codecCard2.TextualFromNative(nil, datum) + require.NoError(t, err) + require.Equal(t, []byte("{\"long\":3}"), act) + + }) } func formatByteRecord(schemaID int, valueBytes []byte) []byte {