Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 12 additions & 3 deletions codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,14 @@ type CodecOption struct {
// When true, the string literal "null" in textual Avro data will be coerced to Go's nil.
// Primarily used to handle edge cases where some Avro implementations allow string representations of null.
EnableStringNull bool

// EnableDecimalBinarySpecCompliantEncoding controls whether decimal values use
// Avro 1.10.2 spec-compliant encoding. When true:
// - Binary encoding uses two's-complement representation of the unscaled integer
// - JSON textual encoding uses human-readable decimal strings like "40.20"
// When false (the default), the legacy encoding is used for backwards compatibility.
EnableDecimalBinarySpecCompliantEncoding bool
}

// Codec supports decoding binary and text Avro data to Go native data types,
Expand Down Expand Up @@ -82,7 +90,8 @@ type codecBuilder struct {
// DefaultCodecOption returns a newly allocated CodecOption populated with the
// recommended default settings: EnableStringNull is true and
// EnableDecimalBinarySpecCompliantEncoding is false. The latter is the zero
// value; it is spelled out so the default is visible at the call site.
func DefaultCodecOption() *CodecOption {
	return &CodecOption{
		EnableStringNull:                         true,
		EnableDecimalBinarySpecCompliantEncoding: false,
	}
}

Expand Down Expand Up @@ -739,9 +748,9 @@ func buildCodecForTypeDescribedByString(st map[string]*Codec, enclosingNamespace
case "record":
return makeRecordCodec(st, enclosingNamespace, schemaMap, cb)
case "bytes.decimal":
return makeDecimalBytesCodec(st, enclosingNamespace, schemaMap)
return makeDecimalBytesCodec(st, enclosingNamespace, schemaMap, cb)
case "fixed.decimal":
return makeDecimalFixedCodec(st, enclosingNamespace, schemaMap)
return makeDecimalFixedCodec(st, enclosingNamespace, schemaMap, cb)
case "string.validated-string":
return makeValidatedStringCodec(st, enclosingNamespace, schemaMap)
default:
Expand Down
84 changes: 70 additions & 14 deletions logical_type.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func precisionAndScaleFromSchemaMap(schemaMap map[string]interface{}) (int, int,

var one = big.NewInt(1)

func makeDecimalBytesCodec(st map[string]*Codec, enclosingNamespace string, schemaMap map[string]interface{}) (*Codec, error) {
func makeDecimalBytesCodec(st map[string]*Codec, enclosingNamespace string, schemaMap map[string]interface{}, cb *codecBuilder) (*Codec, error) {
precision, scale, err := precisionAndScaleFromSchemaMap(schemaMap)
if err != nil {
return nil, err
Expand All @@ -275,14 +275,27 @@ func makeDecimalBytesCodec(st map[string]*Codec, enclosingNamespace string, sche
decimalSearchType := fmt.Sprintf("bytes.decimal.%d.%d", precision, scale)
st[decimalSearchType] = c

c.binaryFromNative = decimalBytesFromNative(bytesBinaryFromNative, toSignedBytes, precision, scale)
c.textualFromNative = decimalBytesFromNative(bytesTextualFromNative, toSignedBytes, precision, scale)
c.nativeFromBinary = nativeFromDecimalBytes(bytesNativeFromBinary, precision, scale)
c.nativeFromTextual = nativeFromDecimalBytes(bytesNativeFromTextual, precision, scale)
// Check if spec-compliant encoding is enabled
specCompliant := cb != nil && cb.option != nil && cb.option.EnableDecimalBinarySpecCompliantEncoding

if specCompliant {
// Spec-compliant encoding: two's complement binary, human-readable textual
c.binaryFromNative = decimalBytesFromNative(bytesBinaryFromNative, toSignedBytes, precision, scale)
c.textualFromNative = decimalTextualFromNative(scale)
c.nativeFromBinary = nativeFromDecimalBytes(bytesNativeFromBinary, scale)
c.nativeFromTextual = nativeFromDecimalTextual()
} else {
// Legacy encoding (default): for backwards compatibility
c.binaryFromNative = decimalBytesFromNative(bytesBinaryFromNative, toSignedBytes, precision, scale)
c.textualFromNative = decimalBytesFromNative(bytesTextualFromNative, toSignedBytes, precision, scale)
c.nativeFromBinary = nativeFromDecimalBytes(bytesNativeFromBinary, scale)
c.nativeFromTextual = nativeFromDecimalBytes(bytesNativeFromTextual, scale)
}
return c, nil
}

func nativeFromDecimalBytes(fn toNativeFn, precision, scale int) toNativeFn {
// nativeFromDecimalBytes decodes bytes to *big.Rat using two's-complement representation.
func nativeFromDecimalBytes(fn toNativeFn, scale int) toNativeFn {
return func(bytes []byte) (interface{}, []byte, error) {
d, b, err := fn(bytes)
if err != nil {
Expand All @@ -292,15 +305,16 @@ func nativeFromDecimalBytes(fn toNativeFn, precision, scale int) toNativeFn {
if !ok {
return nil, bytes, fmt.Errorf("cannot transform to native decimal, expected []byte, received %T", d)
}

// Two's-complement decoding
num := big.NewInt(0)
fromSignedBytes(num, bs)
denom := new(big.Int).Exp(big.NewInt(10), big.NewInt(int64(scale)), nil)
r := new(big.Rat).SetFrac(num, denom)
return r, b, nil
return new(big.Rat).SetFrac(num, denom), b, nil
}
}

func decimalBytesFromNative(fromNativeFn fromNativeFn, toBytesFn toBytesFn, precision, scale int) fromNativeFn {
func decimalBytesFromNative(fromNativeFn fromNativeFn, toBytesFn toBytesFn, _, scale int) fromNativeFn {
return func(b []byte, d interface{}) ([]byte, error) {
r, ok := d.(*big.Rat)
if !ok {
Expand All @@ -320,7 +334,36 @@ func decimalBytesFromNative(fromNativeFn fromNativeFn, toBytesFn toBytesFn, prec
}
}

func makeDecimalFixedCodec(st map[string]*Codec, enclosingNamespace string, schemaMap map[string]interface{}) (*Codec, error) {
// decimalTextualFromNative returns an encoder that appends a *big.Rat to the
// buffer as a JSON string such as "40.20".
//
// The value is rendered with exactly scale digits after the radix point using
// big.Rat.FloatString, which rounds the last digit to nearest (halves away
// from zero). The resulting text is handed to stringTextualFromNative for
// quoting and appending.
//
// The returned function reports an error when the datum is not a *big.Rat or
// is a nil *big.Rat.
func decimalTextualFromNative(scale int) fromNativeFn {
	return func(b []byte, d interface{}) ([]byte, error) {
		r, ok := d.(*big.Rat)
		if !ok {
			return nil, fmt.Errorf("cannot transform to textual decimal, expected *big.Rat, received %T", d)
		}
		// A typed nil pointer satisfies the assertion above but would panic
		// inside FloatString, so reject it explicitly instead.
		if r == nil {
			return nil, fmt.Errorf("cannot transform to textual decimal, received nil *big.Rat")
		}
		return stringTextualFromNative(b, r.FloatString(scale))
	}
}

// nativeFromDecimalTextual returns a decoder that reads a JSON string such as
// "40.20" from the buffer and converts it to a *big.Rat.
//
// The string is first extracted with stringNativeFromTextual and then parsed
// with big.Rat.SetString. On success the decoder returns the parsed value and
// the bytes remaining after the string; on any failure it returns a nil value,
// a nil remainder and an error.
//
// NOTE(review): big.Rat.SetString also accepts fractions ("3/4") and
// exponent notation ("1e3"); confirm whether those inputs should be rejected.
func nativeFromDecimalTextual() toNativeFn {
	return func(buf []byte) (interface{}, []byte, error) {
		v, remaining, err := stringNativeFromTextual(buf)
		if err != nil {
			return nil, nil, fmt.Errorf("cannot decode textual decimal: %s", err)
		}
		// Use a checked assertion so an unexpected decoder result surfaces as
		// an error rather than a panic.
		s, ok := v.(string)
		if !ok {
			return nil, nil, fmt.Errorf("cannot decode textual decimal, expected string, received %T", v)
		}
		r, ok := new(big.Rat).SetString(s)
		if !ok {
			return nil, nil, fmt.Errorf("cannot parse decimal string: %q", s)
		}
		return r, remaining, nil
	}
}

func makeDecimalFixedCodec(st map[string]*Codec, enclosingNamespace string, schemaMap map[string]interface{}, cb *codecBuilder) (*Codec, error) {
precision, scale, err := precisionAndScaleFromSchemaMap(schemaMap)
if err != nil {
return nil, err
Expand All @@ -336,10 +379,23 @@ func makeDecimalFixedCodec(st map[string]*Codec, enclosingNamespace string, sche
if err != nil {
return nil, err
}
c.binaryFromNative = decimalBytesFromNative(c.binaryFromNative, toSignedFixedBytes(size), precision, scale)
c.textualFromNative = decimalBytesFromNative(c.textualFromNative, toSignedFixedBytes(size), precision, scale)
c.nativeFromBinary = nativeFromDecimalBytes(c.nativeFromBinary, precision, scale)
c.nativeFromTextual = nativeFromDecimalBytes(c.nativeFromTextual, precision, scale)

// Check if spec-compliant encoding is enabled
specCompliant := cb != nil && cb.option != nil && cb.option.EnableDecimalBinarySpecCompliantEncoding

if specCompliant {
// Spec-compliant encoding: two's complement binary, human-readable textual
c.binaryFromNative = decimalBytesFromNative(c.binaryFromNative, toSignedFixedBytes(size), precision, scale)
c.textualFromNative = decimalTextualFromNative(scale)
c.nativeFromBinary = nativeFromDecimalBytes(c.nativeFromBinary, scale)
c.nativeFromTextual = nativeFromDecimalTextual()
} else {
// Legacy encoding (default): for backwards compatibility
c.binaryFromNative = decimalBytesFromNative(c.binaryFromNative, toSignedFixedBytes(size), precision, scale)
c.textualFromNative = decimalBytesFromNative(c.textualFromNative, toSignedFixedBytes(size), precision, scale)
c.nativeFromBinary = nativeFromDecimalBytes(c.nativeFromBinary, scale)
c.nativeFromTextual = nativeFromDecimalBytes(c.nativeFromTextual, scale)
}
return c, nil
}

Expand Down
Loading
Loading