Skip to content

Commit 4b8cbe2

Browse files
authored
Add Decimal32 and Decimal64 support to arrow-avro Reader (#8255)
# Which issue does this PR close? - Part of #4886 # Rationale for this change Apache Avro’s `decimal` logical type annotates either `bytes` or `fixed` and carries `precision` and `scale`. Implementations should reject invalid combinations such as `scale > precision`, and the underlying bytes are the two’s‑complement big‑endian representation of the unscaled integer. On the Arrow side, Rust now exposes first‑class `Decimal32`, `Decimal64`, `Decimal128`, and `Decimal256` data types with documented maximum precisions (9, 18, 38, 76 respectively). Until now, `arrow-avro` decoded all Avro decimals to 128/256‑bit Arrow decimals, even when a narrower type would suffice. # What changes are included in this PR? **`arrow-avro/src/codec.rs`** * Map `Codec::Decimal(precision, scale, _size)` to Arrow’s `Decimal32`/`64`/`128`/`256` **by precision**, preferring the narrowest type (≤9→32, ≤18→64, ≤38→128, otherwise 256). * Strengthen decimal attribute parsing: * Error if `scale > precision`. * Error if `precision` exceeds Arrow’s maximum (Decimal256). * If Avro uses `fixed`, check that declared `precision` fits the byte width (≤4→max 9, ≤8→18, ≤16→38, ≤32→76). * Update docstring of `Codec::Decimal` to mention `Decimal32`/`64`. **`arrow-avro/src/reader/record.rs`** * Add `Decoder::Decimal32` and `Decoder::Decimal64` variants with corresponding builders (`Decimal32Builder`, `Decimal64Builder`). * Builder selection: * If Avro uses **fixed**: choose by size (≤4→Decimal32, ≤8→Decimal64, ≤16→Decimal128, ≤32→Decimal256). * If Avro uses **bytes**: choose by declared precision (≤9/≤18/≤38/≤76). * Implement decode paths that sign‑extend Avro’s two’s‑complement payload to 4/8 bytes and append values to the new builders; update `append_null`/`flush` for 32/64‑bit decimals. 
**`arrow-avro/src/reader/mod.rs` (tests)** * Expand `test_decimal` to assert that: * bytes‑backed decimals with precision 4 map to `Decimal32`, and those with precision 10 map to `Decimal64`; * legacy fixed[8] decimals map to `Decimal64`; * fixed[16] decimals map to `Decimal128`. * Add a nulls path test for bytes‑backed `Decimal32`. # Are these changes tested? Yes. Unit tests under `arrow-avro/src/reader/mod.rs` construct expected `Decimal32Array`/`Decimal64Array`/`Decimal128Array` with `with_precision_and_scale`, and compare against batches decoded from Avro files (including legacy fixed and bytes‑backed cases). The tests also exercise small batch sizes to cover buffering paths; new Avro data files are added for higher‑width decimals. New Avro test file details: - test/data/int256_decimal.avro # bytes logicalType: decimal(precision=76, scale=10) - test/data/fixed256_decimal.avro # fixed[32] logicalType: decimal(precision=76, scale=10) - test/data/fixed_length_decimal_legacy_32.avro # fixed[4] logicalType: decimal(precision=9, scale=2) - test/data/int128_decimal.avro # bytes logicalType: decimal(precision=38, scale=2) These new Avro test files were created using this script: https://gist.github.com/jecsand838/3890349bdb33082a3e8fdcae3257eef7 There is also an arrow-testing PR for these new files: apache/arrow-testing#112 # Are there any user-facing changes? N/A due to `arrow-avro` not being public.
1 parent 911940f commit 4b8cbe2

File tree

9 files changed

+611
-142
lines changed

9 files changed

+611
-142
lines changed

arrow-avro/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@ snappy = ["snap", "crc"]
4242
canonical_extension_types = ["arrow-schema/canonical_extension_types"]
4343
md5 = ["dep:md5"]
4444
sha256 = ["dep:sha2"]
45+
small_decimals = []
4546

4647
[dependencies]
4748
arrow-schema = { workspace = true }

arrow-avro/src/codec.rs

Lines changed: 75 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -21,10 +21,10 @@ use crate::schema::{
2121
};
2222
use arrow_schema::{
2323
ArrowError, DataType, Field, Fields, IntervalUnit, TimeUnit, DECIMAL128_MAX_PRECISION,
24-
DECIMAL128_MAX_SCALE,
24+
DECIMAL256_MAX_PRECISION,
2525
};
26-
use serde_json::Value;
27-
use std::borrow::Cow;
26+
#[cfg(feature = "small_decimals")]
27+
use arrow_schema::{DECIMAL32_MAX_PRECISION, DECIMAL64_MAX_PRECISION};
2828
use std::collections::HashMap;
2929
use std::sync::Arc;
3030

@@ -388,7 +388,7 @@ pub enum Codec {
388388
/// Represents Avro fixed type, maps to Arrow's FixedSizeBinary data type
389389
/// The i32 parameter indicates the fixed binary size
390390
Fixed(i32),
391-
/// Represents Avro decimal type, maps to Arrow's Decimal128 or Decimal256 data types
391+
/// Represents Avro decimal type, maps to Arrow's Decimal32, Decimal64, Decimal128, or Decimal256 data types
392392
///
393393
/// The fields are `(precision, scale, fixed_size)`.
394394
/// - `precision` (`usize`): Total number of digits.
@@ -434,20 +434,28 @@ impl Codec {
434434
}
435435
Self::Interval => DataType::Interval(IntervalUnit::MonthDayNano),
436436
Self::Fixed(size) => DataType::FixedSizeBinary(*size),
437-
Self::Decimal(precision, scale, size) => {
437+
Self::Decimal(precision, scale, _size) => {
438438
let p = *precision as u8;
439439
let s = scale.unwrap_or(0) as i8;
440-
let too_large_for_128 = match *size {
441-
Some(sz) => sz > 16,
442-
None => {
443-
(p as usize) > DECIMAL128_MAX_PRECISION as usize
444-
|| (s as usize) > DECIMAL128_MAX_SCALE as usize
440+
#[cfg(feature = "small_decimals")]
441+
{
442+
if *precision <= DECIMAL32_MAX_PRECISION as usize {
443+
DataType::Decimal32(p, s)
444+
} else if *precision <= DECIMAL64_MAX_PRECISION as usize {
445+
DataType::Decimal64(p, s)
446+
} else if *precision <= DECIMAL128_MAX_PRECISION as usize {
447+
DataType::Decimal128(p, s)
448+
} else {
449+
DataType::Decimal256(p, s)
450+
}
451+
}
452+
#[cfg(not(feature = "small_decimals"))]
453+
{
454+
if *precision <= DECIMAL128_MAX_PRECISION as usize {
455+
DataType::Decimal128(p, s)
456+
} else {
457+
DataType::Decimal256(p, s)
445458
}
446-
};
447-
if too_large_for_128 {
448-
DataType::Decimal256(p, s)
449-
} else {
450-
DataType::Decimal128(p, s)
451459
}
452460
}
453461
Self::Uuid => DataType::FixedSizeBinary(16),
@@ -493,6 +501,29 @@ impl From<PrimitiveType> for Codec {
493501
}
494502
}
495503

504+
/// Compute the exact maximum base‑10 precision that fits in `n` bytes for Avro
505+
/// `fixed` decimals stored as two's‑complement unscaled integers (big‑endian).
506+
///
507+
/// Per Avro spec (Decimal logical type), for a fixed length `n`:
508+
/// max precision = ⌊log₁₀(2^(8n − 1) − 1)⌋.
509+
///
510+
/// This function returns `None` if `n` is 0 or greater than 32 (Arrow supports
511+
/// Decimal256, which is 32 bytes and has max precision 76).
512+
const fn max_precision_for_fixed_bytes(n: usize) -> Option<usize> {
513+
// Precomputed exact table for n = 1..=32
514+
// 1:2, 2:4, 3:6, 4:9, 5:11, 6:14, 7:16, 8:18, 9:21, 10:23, 11:26, 12:28,
515+
// 13:31, 14:33, 15:35, 16:38, 17:40, 18:43, 19:45, 20:47, 21:50, 22:52,
516+
// 23:55, 24:57, 25:59, 26:62, 27:64, 28:67, 29:69, 30:71, 31:74, 32:76
517+
const MAX_P: [usize; 32] = [
518+
2, 4, 6, 9, 11, 14, 16, 18, 21, 23, 26, 28, 31, 33, 35, 38, 40, 43, 45, 47, 50, 52, 55, 57,
519+
59, 62, 64, 67, 69, 71, 74, 76,
520+
];
521+
match n {
522+
1..=32 => Some(MAX_P[n - 1]),
523+
_ => None,
524+
}
525+
}
526+
496527
fn parse_decimal_attributes(
497528
attributes: &Attributes,
498529
fallback_size: Option<usize>,
@@ -516,6 +547,34 @@ fn parse_decimal_attributes(
516547
.and_then(|v| v.as_u64())
517548
.map(|s| s as usize)
518549
.or(fallback_size);
550+
if precision == 0 {
551+
return Err(ArrowError::ParseError(
552+
"Decimal requires precision > 0".to_string(),
553+
));
554+
}
555+
if scale > precision {
556+
return Err(ArrowError::ParseError(format!(
557+
"Decimal has invalid scale > precision: scale={scale}, precision={precision}"
558+
)));
559+
}
560+
if precision > DECIMAL256_MAX_PRECISION as usize {
561+
return Err(ArrowError::ParseError(format!(
562+
"Decimal precision {precision} exceeds maximum supported by Arrow ({})",
563+
DECIMAL256_MAX_PRECISION
564+
)));
565+
}
566+
if let Some(sz) = size {
567+
let max_p = max_precision_for_fixed_bytes(sz).ok_or_else(|| {
568+
ArrowError::ParseError(format!(
569+
"Invalid fixed size for decimal: {sz}, must be between 1 and 32 bytes"
570+
))
571+
})?;
572+
if precision > max_p {
573+
return Err(ArrowError::ParseError(format!(
574+
"Decimal precision {precision} exceeds capacity of fixed size {sz} bytes (max {max_p})"
575+
)));
576+
}
577+
}
519578
Ok((precision, scale, size))
520579
}
521580

@@ -734,7 +793,7 @@ impl<'a> Maker<'a> {
734793
Ok(field)
735794
}
736795
ComplexType::Array(a) => {
737-
let mut field = self.parse_type(a.items.as_ref(), namespace)?;
796+
let field = self.parse_type(a.items.as_ref(), namespace)?;
738797
Ok(AvroDataType {
739798
nullability: None,
740799
metadata: a.attributes.field_metadata(),

arrow-avro/src/reader/mod.rs

Lines changed: 121 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -697,7 +697,7 @@ mod test {
697697
};
698698
use arrow_array::types::{Int32Type, IntervalMonthDayNanoType};
699699
use arrow_array::*;
700-
use arrow_buffer::{Buffer, NullBuffer, OffsetBuffer, ScalarBuffer};
700+
use arrow_buffer::{i256, Buffer, NullBuffer, OffsetBuffer, ScalarBuffer};
701701
use arrow_schema::{ArrowError, DataType, Field, Fields, IntervalUnit, Schema};
702702
use bytes::{Buf, BufMut, Bytes};
703703
use futures::executor::block_on;
@@ -2176,37 +2176,137 @@ mod test {
21762176

21772177
#[test]
21782178
fn test_decimal() {
2179-
let files = [
2180-
("avro/fixed_length_decimal.avro", 25, 2),
2181-
("avro/fixed_length_decimal_legacy.avro", 13, 2),
2182-
("avro/int32_decimal.avro", 4, 2),
2183-
("avro/int64_decimal.avro", 10, 2),
2179+
// Choose expected Arrow types depending on the `small_decimals` feature flag.
2180+
// With `small_decimals` enabled, Decimal32/Decimal64 are used where their
2181+
// precision allows; otherwise, those cases resolve to Decimal128.
2182+
#[cfg(feature = "small_decimals")]
2183+
let files: [(&str, DataType); 8] = [
2184+
(
2185+
"avro/fixed_length_decimal.avro",
2186+
DataType::Decimal128(25, 2),
2187+
),
2188+
(
2189+
"avro/fixed_length_decimal_legacy.avro",
2190+
DataType::Decimal64(13, 2),
2191+
),
2192+
("avro/int32_decimal.avro", DataType::Decimal32(4, 2)),
2193+
("avro/int64_decimal.avro", DataType::Decimal64(10, 2)),
2194+
(
2195+
"test/data/int256_decimal.avro",
2196+
DataType::Decimal256(76, 10),
2197+
),
2198+
(
2199+
"test/data/fixed256_decimal.avro",
2200+
DataType::Decimal256(76, 10),
2201+
),
2202+
(
2203+
"test/data/fixed_length_decimal_legacy_32.avro",
2204+
DataType::Decimal32(9, 2),
2205+
),
2206+
("test/data/int128_decimal.avro", DataType::Decimal128(38, 2)),
2207+
];
2208+
#[cfg(not(feature = "small_decimals"))]
2209+
let files: [(&str, DataType); 8] = [
2210+
(
2211+
"avro/fixed_length_decimal.avro",
2212+
DataType::Decimal128(25, 2),
2213+
),
2214+
(
2215+
"avro/fixed_length_decimal_legacy.avro",
2216+
DataType::Decimal128(13, 2),
2217+
),
2218+
("avro/int32_decimal.avro", DataType::Decimal128(4, 2)),
2219+
("avro/int64_decimal.avro", DataType::Decimal128(10, 2)),
2220+
(
2221+
"test/data/int256_decimal.avro",
2222+
DataType::Decimal256(76, 10),
2223+
),
2224+
(
2225+
"test/data/fixed256_decimal.avro",
2226+
DataType::Decimal256(76, 10),
2227+
),
2228+
(
2229+
"test/data/fixed_length_decimal_legacy_32.avro",
2230+
DataType::Decimal128(9, 2),
2231+
),
2232+
("test/data/int128_decimal.avro", DataType::Decimal128(38, 2)),
21842233
];
2185-
let decimal_values: Vec<i128> = (1..=24).map(|n| n as i128 * 100).collect();
2186-
for (file, precision, scale) in files {
2187-
let file_path = arrow_test_data(file);
2234+
for (file, expected_dt) in files {
2235+
let (precision, scale) = match expected_dt {
2236+
DataType::Decimal32(p, s)
2237+
| DataType::Decimal64(p, s)
2238+
| DataType::Decimal128(p, s)
2239+
| DataType::Decimal256(p, s) => (p, s),
2240+
_ => unreachable!("Unexpected decimal type in test inputs"),
2241+
};
2242+
assert!(scale >= 0, "test data uses non-negative scales only");
2243+
let scale_u32 = scale as u32;
2244+
let file_path: String = if file.starts_with("avro/") {
2245+
arrow_test_data(file)
2246+
} else {
2247+
std::path::PathBuf::from(env!("CARGO_MANIFEST_DIR"))
2248+
.join(file)
2249+
.to_string_lossy()
2250+
.into_owned()
2251+
};
2252+
let pow10: i128 = 10i128.pow(scale_u32);
2253+
let values_i128: Vec<i128> = (1..=24).map(|n| (n as i128) * pow10).collect();
2254+
let build_expected = |dt: &DataType, values: &[i128]| -> ArrayRef {
2255+
match *dt {
2256+
DataType::Decimal32(p, s) => {
2257+
let it = values.iter().map(|&v| v as i32);
2258+
Arc::new(
2259+
Decimal32Array::from_iter_values(it)
2260+
.with_precision_and_scale(p, s)
2261+
.unwrap(),
2262+
)
2263+
}
2264+
DataType::Decimal64(p, s) => {
2265+
let it = values.iter().map(|&v| v as i64);
2266+
Arc::new(
2267+
Decimal64Array::from_iter_values(it)
2268+
.with_precision_and_scale(p, s)
2269+
.unwrap(),
2270+
)
2271+
}
2272+
DataType::Decimal128(p, s) => {
2273+
let it = values.iter().copied();
2274+
Arc::new(
2275+
Decimal128Array::from_iter_values(it)
2276+
.with_precision_and_scale(p, s)
2277+
.unwrap(),
2278+
)
2279+
}
2280+
DataType::Decimal256(p, s) => {
2281+
let it = values.iter().map(|&v| i256::from_i128(v));
2282+
Arc::new(
2283+
Decimal256Array::from_iter_values(it)
2284+
.with_precision_and_scale(p, s)
2285+
.unwrap(),
2286+
)
2287+
}
2288+
_ => unreachable!("Unexpected decimal type in test"),
2289+
}
2290+
};
21882291
let actual_batch = read_file(&file_path, 8, false);
2189-
let expected_array = Decimal128Array::from_iter_values(decimal_values.clone())
2190-
.with_precision_and_scale(precision, scale)
2191-
.unwrap();
2292+
let actual_nullable = actual_batch.schema().field(0).is_nullable();
2293+
let expected_array = build_expected(&expected_dt, &values_i128);
21922294
let mut meta = HashMap::new();
21932295
meta.insert("precision".to_string(), precision.to_string());
21942296
meta.insert("scale".to_string(), scale.to_string());
2195-
let field_with_meta = Field::new("value", DataType::Decimal128(precision, scale), true)
2196-
.with_metadata(meta);
2197-
let expected_schema = Arc::new(Schema::new(vec![field_with_meta]));
2297+
let field =
2298+
Field::new("value", expected_dt.clone(), actual_nullable).with_metadata(meta);
2299+
let expected_schema = Arc::new(Schema::new(vec![field]));
21982300
let expected_batch =
2199-
RecordBatch::try_new(expected_schema.clone(), vec![Arc::new(expected_array)])
2200-
.expect("Failed to build expected RecordBatch");
2301+
RecordBatch::try_new(expected_schema.clone(), vec![expected_array]).unwrap();
22012302
assert_eq!(
22022303
actual_batch, expected_batch,
2203-
"Decoded RecordBatch does not match the expected Decimal128 data for file {file}"
2304+
"Decoded RecordBatch does not match for {file}"
22042305
);
22052306
let actual_batch_small = read_file(&file_path, 3, false);
22062307
assert_eq!(
2207-
actual_batch_small,
2208-
expected_batch,
2209-
"Decoded RecordBatch does not match the expected Decimal128 data for file {file} with batch size 3"
2308+
actual_batch_small, expected_batch,
2309+
"Decoded RecordBatch does not match for {file} with batch size 3"
22102310
);
22112311
}
22122312
}

0 commit comments

Comments
 (0)