diff --git a/Cargo.lock b/Cargo.lock index 758356029b890..9d91736c0324c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -584,7 +584,7 @@ dependencies = [ "arrow-schema 56.2.0", "arrow-select 56.2.0", "flatbuffers", - "lz4_flex", + "lz4_flex 0.11.3", "zstd 0.13.3", ] @@ -601,7 +601,7 @@ dependencies = [ "arrow-schema 55.1.0", "chrono", "half", - "indexmap 2.9.0", + "indexmap 2.12.0", "lexical-core", "memchr", "num", @@ -623,7 +623,7 @@ dependencies = [ "arrow-schema 56.2.0", "chrono", "half", - "indexmap 2.9.0", + "indexmap 2.12.0", "lexical-core", "memchr", "num", @@ -4312,6 +4312,23 @@ dependencies = [ "zstd 0.12.4", ] +[[package]] +name = "databend-common-parquet-reader-experimental" +version = "0.1.0" +dependencies = [ + "bytes", + "databend-common-column", + "databend-common-exception", + "databend-common-expression", + "databend-storages-common-table-meta", + "lz4_flex 0.9.5", + "parquet 56.2.0", + "parquet-format-safe", + "parquet2", + "streaming-decompression", + "zstd 0.12.4", +] + [[package]] name = "databend-common-pipeline" version = "0.1.0" @@ -4488,7 +4505,7 @@ dependencies = [ "enum-as-inner", "fastrace", "globiter", - "indexmap 2.9.0", + "indexmap 2.12.0", "itertools 0.13.0", "jsonb", "log", @@ -4653,6 +4670,7 @@ dependencies = [ "databend-common-meta-types", "databend-common-metrics", "databend-common-native", + "databend-common-parquet-reader-experimental", "databend-common-pipeline", "databend-common-pipeline-transforms", "databend-common-sql", @@ -4674,7 +4692,7 @@ dependencies = [ "fastrace", "futures", "futures-util", - "indexmap 2.9.0", + "indexmap 2.12.0", "itertools 0.13.0", "jsonb", "log", @@ -4682,6 +4700,7 @@ dependencies = [ "opendal", "parking_lot 0.12.3", "parquet 56.2.0", + "parquet2", "paste", "rand 0.8.5", "roaring", @@ -5945,7 +5964,7 @@ dependencies = [ "chrono", "delta_kernel_derive", "futures", - "indexmap 2.9.0", + "indexmap 2.12.0", "itertools 0.14.0", "object_store", "parquet 55.1.0", @@ -6008,7 +6027,7 @@ dependencies = [ 
"either", "futures", "humantime", - "indexmap 2.9.0", + "indexmap 2.12.0", "itertools 0.14.0", "maplit", "num-bigint", @@ -7442,7 +7461,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" dependencies = [ "fallible-iterator", - "indexmap 2.9.0", + "indexmap 2.12.0", "stable_deref_trait", ] @@ -8445,7 +8464,7 @@ dependencies = [ "futures-sink", "futures-util", "http 0.2.12", - "indexmap 2.9.0", + "indexmap 2.12.0", "slab", "tokio", "tokio-util", @@ -8464,7 +8483,7 @@ dependencies = [ "futures-core", "futures-sink", "http 1.3.1", - "indexmap 2.9.0", + "indexmap 2.12.0", "slab", "tokio", "tokio-util", @@ -9400,13 +9419,14 @@ dependencies = [ [[package]] name = "indexmap" -version = "2.9.0" +version = "2.12.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "cea70ddb795996207ad57735b50c5982d8844f38ba9ee5f1aedcfb708a2aa11e" +checksum = "6717a8d2a5a929a1a2eb43a12812498ed141a0bcfb7e8f7844fbdbe4303bba9f" dependencies = [ "equivalent", - "hashbrown 0.15.3", + "hashbrown 0.16.0", "serde", + "serde_core", ] [[package]] @@ -9422,7 +9442,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "232929e1d75fe899576a3d5c7416ad0d88dbfbb3c3d6aa00873a7408a50ddb88" dependencies = [ "ahash 0.8.12", - "indexmap 2.9.0", + "indexmap 2.12.0", "is-terminal", "itoa", "log", @@ -9665,7 +9685,7 @@ dependencies = [ "ahash 0.8.12", "dyn-clone", "hifijson", - "indexmap 2.9.0", + "indexmap 2.12.0", "jaq-syn", "once_cell", "serde_json", @@ -9874,7 +9894,7 @@ version = "0.4.2" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "4ee7893dab2e44ae5f9d0173f26ff4aa327c10b01b06a72b52dd9405b628640d" dependencies = [ - "indexmap 2.9.0", + "indexmap 2.12.0", ] [[package]] @@ -10365,6 +10385,15 @@ dependencies = [ "libc", ] +[[package]] +name = "lz4_flex" +version = "0.9.5" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "1a8cbbb2831780bc3b9c15a41f5b49222ef756b6730a95f3decfdd15903eb5a3" +dependencies = [ + "twox-hash 1.6.3", +] + [[package]] name = "lz4_flex" version = "0.11.3" @@ -11222,7 +11251,7 @@ dependencies = [ "crc32fast", "flate2", "hashbrown 0.15.3", - "indexmap 2.9.0", + "indexmap 2.12.0", "memchr", "ruzstd", ] @@ -11622,7 +11651,7 @@ dependencies = [ "flate2", "futures", "futures-util", - "lz4_flex", + "lz4_flex 0.11.3", "lzokay-native", "num", "prost", @@ -11797,7 +11826,7 @@ dependencies = [ "futures", "half", "hashbrown 0.15.3", - "lz4_flex", + "lz4_flex 0.11.3", "num", "num-bigint", "object_store", @@ -11833,9 +11862,12 @@ dependencies = [ "futures", "half", "hashbrown 0.16.0", - "lz4_flex", + "lz4_flex 0.11.3", "num", "num-bigint", + "parquet-variant", + "parquet-variant-compute", + "parquet-variant-json", "paste", "seq-macro", "simdutf8", @@ -11846,6 +11878,75 @@ dependencies = [ "zstd 0.13.3", ] +[[package]] +name = "parquet-format-safe" +version = "0.2.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1131c54b167dd4e4799ce762e1ab01549ebb94d5bdd13e6ec1b467491c378e1f" +dependencies = [ + "async-trait", + "futures", +] + +[[package]] +name = "parquet-variant" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a56bf96fdaf5f9392b447cf1e60bfe4b72149ab3309aa3855c02813cc89ad93f" +dependencies = [ + "arrow-schema 56.2.0", + "chrono", + "half", + "indexmap 2.12.0", + "simdutf8", + "uuid", +] + +[[package]] +name = "parquet-variant-compute" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bc7ce683368c9f2672379c12e35b5cd664e81964e234d891b4073a8355015ce7" +dependencies = [ + "arrow 56.2.0", + "arrow-schema 56.2.0", + "chrono", + "half", + "parquet-variant", + "parquet-variant-json", +] + +[[package]] +name = "parquet-variant-json" +version = "0.1.0" +source = 
"registry+https://github.com/rust-lang/crates.io-index" +checksum = "af8bde078e883e197efe49d315bfa0ecf8f68879d32a2abf34eb09f40ea88f21" +dependencies = [ + "arrow-schema 56.2.0", + "base64 0.22.1", + "chrono", + "parquet-variant", + "serde_json", + "uuid", +] + +[[package]] +name = "parquet2" +version = "0.17.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "579fe5745f02cef3d5f236bfed216fd4693e49e4e920a13475c6132233283bce" +dependencies = [ + "async-stream", + "futures", + "lz4", + "parquet-format-safe", + "seq-macro", + "serde", + "snap", + "streaming-decompression", + "zstd 0.12.4", +] + [[package]] name = "parse-display" version = "0.9.1" @@ -11937,7 +12038,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4c5cc86750666a3ed20bdaf5ca2a0344f9c67674cae0515bec2da16fbaa47db" dependencies = [ "fixedbitset 0.4.2", - "indexmap 2.9.0", + "indexmap 2.12.0", "serde", "serde_derive", ] @@ -11949,7 +12050,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "3672b37090dbd86368a4145bc067582552b29c27377cad4e0a306c97f9bd7772" dependencies = [ "fixedbitset 0.5.7", - "indexmap 2.9.0", + "indexmap 2.12.0", ] [[package]] @@ -12655,7 +12756,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b4aeaa1f2460f1d348eeaeed86aea999ce98c1bded6f089ff8514c9d9dbdc973" dependencies = [ "anyhow", - "indexmap 2.9.0", + "indexmap 2.12.0", "log", "protobuf", "protobuf-support", @@ -13552,7 +13653,7 @@ checksum = "1e147371c75553e1e2fcdb483944a8540b8438c31426279553b9a8182a9b7b65" dependencies = [ "bytes", "hashbrown 0.15.3", - "indexmap 2.9.0", + "indexmap 2.12.0", "munge", "ptr_meta 0.3.0", "rancor", @@ -13681,7 +13782,7 @@ dependencies = [ "convert_case 0.6.0", "fnv", "ident_case", - "indexmap 2.9.0", + "indexmap 2.12.0", "proc-macro-crate 1.3.1", "proc-macro-error 1.0.4", "proc-macro2", @@ -14167,7 +14268,7 @@ version = "0.1.0" source = 
"git+https://github.com/datafuse-extras/serde-bridge?rev=4f0e99a#4f0e99abc82de8a82046415c90f01f9739bad630" dependencies = [ "anyhow", - "indexmap 2.9.0", + "indexmap 2.12.0", "serde", ] @@ -14242,7 +14343,7 @@ version = "1.0.140" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "20068b6e96dc6c9bd23e01df8827e6c7e1f2fddd43c21810382803c136b99373" dependencies = [ - "indexmap 2.9.0", + "indexmap 2.12.0", "itoa", "memchr", "ryu", @@ -14319,7 +14420,7 @@ dependencies = [ "chrono", "hex", "indexmap 1.9.3", - "indexmap 2.9.0", + "indexmap 2.12.0", "serde", "serde_derive", "serde_json", @@ -14345,7 +14446,7 @@ version = "0.9.34+deprecated" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6a8b1a1a2ebf674015cc02edccce75287f1a0130d394307b36743c2f5d504b47" dependencies = [ - "indexmap 2.9.0", + "indexmap 2.12.0", "itoa", "ryu", "serde", @@ -14358,7 +14459,7 @@ version = "0.1.0" source = "git+https://github.com/datafuse-extras/serfig?rev=610ac6d#610ac6dfa251206d3667d169b772de7b0f05d151" dependencies = [ "anyhow", - "indexmap 2.9.0", + "indexmap 2.12.0", "log", "serde", "serde-bridge", @@ -14836,7 +14937,7 @@ dependencies = [ "futures-util", "hashbrown 0.15.3", "hashlink 0.10.0", - "indexmap 2.9.0", + "indexmap 2.12.0", "log", "memchr", "once_cell", @@ -15057,6 +15158,15 @@ dependencies = [ "pin-project-lite", ] +[[package]] +name = "streaming-decompression" +version = "0.1.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bf6cc3b19bfb128a8ad11026086e31d3ce9ad23f8ea37354b31383a187c44cf3" +dependencies = [ + "fallible-streaming-iterator", +] + [[package]] name = "strength_reduce" version = "0.2.4" @@ -15474,7 +15584,7 @@ dependencies = [ "levenshtein_automata", "log", "lru", - "lz4_flex", + "lz4_flex 0.11.3", "measure_time", "memmap2", "once_cell", @@ -16019,7 +16129,7 @@ version = "0.9.5" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = 
"75129e1dc5000bfbaa9fee9d1b21f974f9fbad9daec557a521ee6e080825f6e8" dependencies = [ - "indexmap 2.9.0", + "indexmap 2.12.0", "serde", "serde_spanned 1.0.0", "toml_datetime 0.7.0", @@ -16052,7 +16162,7 @@ version = "0.19.15" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1b5bb770da30e5cbfde35a2d7b9b8a2c4b8ef89548a7a6aeab5c9a576e3e7421" dependencies = [ - "indexmap 2.9.0", + "indexmap 2.12.0", "serde", "serde_spanned 0.6.8", "toml_datetime 0.6.9", @@ -16065,7 +16175,7 @@ version = "0.22.26" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "310068873db2c5b3e7659d2cc35d21855dbafa50d1ce336397c666e3cb08137e" dependencies = [ - "indexmap 2.9.0", + "indexmap 2.12.0", "serde", "serde_spanned 0.6.8", "toml_datetime 0.6.9", @@ -16211,7 +16321,7 @@ checksum = "d039ad9159c98b70ecfd540b2573b97f7f52c3e8d9f8ad57a24b916a536975f9" dependencies = [ "futures-core", "futures-util", - "indexmap 2.9.0", + "indexmap 2.12.0", "pin-project-lite", "slab", "sync_wrapper", @@ -17044,7 +17154,7 @@ dependencies = [ "ahash 0.8.12", "bitflags 2.9.0", "hashbrown 0.14.5", - "indexmap 2.9.0", + "indexmap 2.12.0", "semver", "serde", ] @@ -17056,7 +17166,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "808198a69b5a0535583370a51d459baa14261dfab04800c4864ee9e1a14346ed" dependencies = [ "bitflags 2.9.0", - "indexmap 2.9.0", + "indexmap 2.12.0", "semver", ] @@ -17088,7 +17198,7 @@ dependencies = [ "fxprof-processed-profile", "gimli 0.31.1", "hashbrown 0.14.5", - "indexmap 2.9.0", + "indexmap 2.12.0", "ittapi", "libc", "libm", @@ -17214,7 +17324,7 @@ dependencies = [ "cranelift-bitset", "cranelift-entity", "gimli 0.31.1", - "indexmap 2.9.0", + "indexmap 2.12.0", "log", "object", "postcard", @@ -17310,7 +17420,7 @@ checksum = "bf3963c9c29df91564d8bd181eb00d0dbaeafa1b2a01e15952bb7391166b704e" dependencies = [ "anyhow", "heck 0.5.0", - "indexmap 2.9.0", + "indexmap 2.12.0", "wit-parser", ] @@ -18093,7 +18203,7 @@ checksum 
= "ca004bb251010fe956f4a5b9d4bf86b4e415064160dd6669569939e8cbf2504f" dependencies = [ "anyhow", "id-arena", - "indexmap 2.9.0", + "indexmap 2.12.0", "log", "semver", "serde", @@ -18320,7 +18430,7 @@ dependencies = [ "flate2", "getrandom 0.3.3", "hmac", - "indexmap 2.9.0", + "indexmap 2.12.0", "lzma-rs", "memchr", "pbkdf2", diff --git a/Cargo.toml b/Cargo.toml index 8143f848e0d7e..ccfe3e7567353 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -154,6 +154,7 @@ databend-common-meta-store = { path = "src/meta/store" } databend-common-meta-types = { path = "src/meta/types" } databend-common-metrics = { path = "src/common/metrics" } databend-common-native = { path = "src/common/native" } +databend-common-parquet-reader-experimental = { path = "src/common/experimental_parquet_reader" } databend-common-pipeline = { path = "src/query/pipeline" } databend-common-pipeline-transforms = { path = "src/query/pipeline/transforms" } databend-common-proto-conv = { path = "src/meta/proto-conv" } @@ -370,6 +371,7 @@ logforth = { git = "https://github.com/datafuse-extras/logforth", branch = "main 'fastrace', ] } lz4 = "1.24.0" +lz4_flex = { version = "^0.9" } map-api = { version = "0.4.2" } maplit = "1.0.2" match-template = "0.0.1" @@ -418,6 +420,8 @@ ordq = "0.2.0" p256 = "0.13" parking_lot = "0.12.1" parquet = { version = "56", features = ["async"] } +parquet-format-safe = "0.2.0" +parquet2 = { version = "0.17.0", default-features = false, features = ["serde_types", "async", "zstd", "snappy", "lz4"] } passwords = { version = "3.1.16" } paste = "1.0.15" percent-encoding = "2.3.1" @@ -497,6 +501,7 @@ stacker = "0.1" state = "0.6.0" state-machine-api = { version = "0.3.4" } stream-more = "0.1.3" +streaming-decompression = "0.1.2" strength_reduce = "0.2.4" stringslice = "0.2.0" strum = "0.24.1" @@ -600,7 +605,8 @@ map_entry = "allow" debug = 1 lto = "thin" overflow-checks = false -opt-level = "s" # defaults to be 3 +#opt-level = "s" # defaults to be 3 +opt-level = 3 incremental = false 
codegen-units = 1 ## better performance see below comment ## DONT'T DELETE THIS: If we want best performance, we should use this profile but it will take longer time to compile. diff --git a/src/common/column/src/binview/mod.rs b/src/common/column/src/binview/mod.rs index 609f696933cfe..78ee010013dbc 100644 --- a/src/common/column/src/binview/mod.rs +++ b/src/common/column/src/binview/mod.rs @@ -138,9 +138,9 @@ impl Clone for BinaryViewColumnGeneric { } } -unsafe impl Send for BinaryViewColumnGeneric {} +// impl Send for BinaryViewColumnGeneric {} -unsafe impl Sync for BinaryViewColumnGeneric {} +// unsafe impl Sync for BinaryViewColumnGeneric {} impl BinaryViewColumnGeneric { fn init_cache(value: Option) -> OnceLock { diff --git a/src/common/experimental_parquet_reader/Cargo.toml b/src/common/experimental_parquet_reader/Cargo.toml new file mode 100644 index 0000000000000..611340b5e25ac --- /dev/null +++ b/src/common/experimental_parquet_reader/Cargo.toml @@ -0,0 +1,32 @@ +[package] +name = "databend-common-parquet-reader-experimental" +version = { workspace = true } +authors = { workspace = true } +license = { workspace = true } +publish = { workspace = true } +edition = { workspace = true } + +[features] + +[dependencies] +databend-common-column = { workspace = true } +databend-common-exception = { workspace = true } +databend-common-expression = { workspace = true } +databend-storages-common-table-meta = { workspace = true } + +bytes = { workspace = true } +lz4_flex = { workspace = true } +parquet = { workspace = true, features = ["experimental"] } +parquet-format-safe = { workspace = true } +parquet2 = { workspace = true } +streaming-decompression = { workspace = true } +zstd = { workspace = true } + +[dev-dependencies] +# used to test async readers + +[package.metadata.cargo-machete] +ignored = ["match-template"] + +[lints] +workspace = true diff --git a/src/common/experimental_parquet_reader/src/column/common.rs 
b/src/common/experimental_parquet_reader/src/column/common.rs new file mode 100644 index 0000000000000..4b1e3f25367cb --- /dev/null +++ b/src/common/experimental_parquet_reader/src/column/common.rs @@ -0,0 +1,887 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Common utilities for Parquet column deserialization + +use databend_common_column::bitmap::Bitmap; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::types::NullableColumn; +use databend_common_expression::Column; +use decompressor::Decompressor; +use parquet::encodings::rle::RleDecoder; +use parquet2::schema::types::PhysicalType; +use streaming_decompression::FallibleStreamingIterator; + +use crate::reader::decompressor; + +// ============================================================================= +// Dictionary Support Trait +// ============================================================================= + +/// Trait for types that support dictionary encoding in Parquet +/// This trait enables efficient dictionary-based deserialization for numeric types +pub trait DictionarySupport: ParquetColumnType { + /// Create a value from a dictionary entry (raw bytes) + /// + /// # Arguments + /// * `entry` - Raw bytes from dictionary page + /// + /// # Returns + /// Decoded value of type Self + fn from_dictionary_entry(entry: &[u8]) -> Result; + + /// Batch lookup from dictionary into provided 
output slice + /// + /// # Arguments + /// * `dictionary` - The dictionary values to lookup from + /// * `indices` - Array of dictionary indices + /// * `output` - Output slice to write results into (must have same length as indices) + /// + /// # Performance + /// This is the performance-critical path for dictionary decoding. + /// Implementations should use batch bounds checking and unsafe operations for maximum speed. + fn batch_from_dictionary_into_slice( + dictionary: &[Self], + indices: &[i32], + output: &mut [Self], + ) -> Result<()>; +} + +/// Extract definition levels, repetition levels, and values from a data page +fn extract_page_data(data_page: &parquet2::page::DataPage) -> Result<(&[u8], &[u8], &[u8])> { + match parquet2::page::split_buffer(data_page) { + Ok((rep_levels, def_levels, values_buffer)) => Ok((def_levels, rep_levels, values_buffer)), + Err(e) => Err(ErrorCode::Internal(format!( + "Failed to split buffer: {}", + e + ))), + } +} + +/// Decode definition levels and create validity bitmap +pub fn decode_definition_levels( + def_levels: &[u8], + bit_width: u32, + num_values: usize, + data_page: &parquet2::page::DataPage, +) -> Result<(Option, usize)> { + let mut rle_decoder = RleDecoder::new(bit_width as u8); + rle_decoder.set_data(bytes::Bytes::copy_from_slice(def_levels)); + + let expected_levels = num_values; + let mut levels = vec![0i32; expected_levels]; + let decoded_count = rle_decoder + .get_batch(&mut levels) + .map_err(|e| ErrorCode::Internal(format!("Failed to decode definition levels: {}", e)))?; + + if decoded_count != expected_levels { + return Err(ErrorCode::Internal(format!( + "Definition level decoder returned wrong count: expected={}, got={}", + expected_levels, decoded_count + ))); + } + + let max_def_level = data_page.descriptor.max_def_level as i32; + let mut validity_bits = Vec::with_capacity(expected_levels); + let mut non_null_count = 0; + let mut has_nulls = false; + + for &level in &levels { + let is_valid = level == 
max_def_level; + validity_bits.push(is_valid); + if is_valid { + non_null_count += 1; + } else { + has_nulls = true; + } + } + + let bitmap = if has_nulls { + Some(Bitmap::from_iter(validity_bits)) + } else { + Some(Bitmap::new_constant(true, expected_levels)) + }; + Ok((bitmap, non_null_count)) +} + +/// Process plain encoded data +/// # Arguments +/// * `values_buffer` - The buffer containing the encoded values (maybe plain encoded) +/// * `page_rows` - The number of rows in the page +/// * `column_data` - The vector to which the decoded values will be appended, capacity should be reserved properly +/// * `validity_bitmap` - The validity bitmap for the column if any +fn process_plain_encoding( + values_buffer: &[u8], + page_rows: usize, + column_data: &mut Vec, + validity_bitmap: Option<&Bitmap>, +) -> Result<()> { + let type_size = std::mem::size_of::(); + let old_len = column_data.len(); + + // Calculate how many non-null values we expect to read + let non_null_count = if let Some(bitmap) = validity_bitmap { + bitmap.iter().filter(|&b| b).count() + } else { + page_rows + }; + + if let Some(bitmap) = validity_bitmap { + // Nullable column: process values based on validity bitmap + // Extend vector to final size, leaving NULL positions uninitialized + unsafe { + column_data.set_len(old_len + page_rows); + } + + let mut values_read = 0; + for (i, is_valid) in bitmap.iter().enumerate() { + if is_valid && values_read < non_null_count { + let src_offset = values_read * type_size; + let dst_offset = old_len + i; + + if src_offset + type_size <= values_buffer.len() { + // Handle endianness conversion for numeric types + #[cfg(target_endian = "big")] + { + // On big-endian systems, convert from Parquet's little-endian format + convert_endianness_and_copy::( + &values_buffer[src_offset..src_offset + type_size], + &mut column_data[dst_offset..dst_offset + 1], + ); + } + #[cfg(target_endian = "little")] + { + // On little-endian systems, direct copy is sufficient + unsafe 
{ + let src_ptr = values_buffer.as_ptr().add(src_offset); + let dst_ptr = + column_data[dst_offset..dst_offset + 1].as_mut_ptr() as *mut u8; + std::ptr::copy_nonoverlapping(src_ptr, dst_ptr, type_size); + } + } + values_read += 1; + } else { + return Err(ErrorCode::Internal("Values buffer underflow".to_string())); + } + } + // Note: NULL positions (is_valid == false) are left uninitialized + // This is safe because the validity bitmap controls access + } + } else { + let values_to_copy = non_null_count.min(page_rows); + let total_bytes = values_to_copy * type_size; + + if total_bytes <= values_buffer.len() { + #[cfg(target_endian = "big")] + { + // On big-endian systems, convert each value individually + unsafe { + column_data.set_len(old_len + values_to_copy); + } + for i in 0..values_to_copy { + let src_offset = i * type_size; + let dst_offset = old_len + i; + convert_endianness_and_copy::( + &values_buffer[src_offset..src_offset + type_size], + &mut column_data[dst_offset..dst_offset + 1], + ); + } + } + #[cfg(target_endian = "little")] + { + // On little-endian systems, batch copy for performance + unsafe { + let src_ptr = values_buffer.as_ptr(); + let dst_ptr = column_data.as_mut_ptr().add(old_len) as *mut u8; + std::ptr::copy_nonoverlapping(src_ptr, dst_ptr, total_bytes); + column_data.set_len(old_len + values_to_copy); + } + } + } else { + return Err(ErrorCode::Internal("Values buffer underflow".to_string())); + } + } + + Ok(()) +} + +/// Process a complete data page for any type T +fn process_data_page( + data_page: &parquet2::page::DataPage, + column_data: &mut Vec, + target_rows: usize, + is_nullable: bool, + expected_physical_type: &PhysicalType, + dictionary: Option<&[T]>, +) -> Result> { + // Validate physical type + validate_physical_type( + data_page.descriptor.primitive_type.physical_type, + *expected_physical_type, + )?; + + let (def_levels, _, values_buffer) = extract_page_data(data_page)?; + let remaining = target_rows - column_data.len(); + + // 
Defensive checks for nullable vs non-nullable columns + #[cfg(debug_assertions)] + validate_column_nullability(def_levels, is_nullable)?; + + // Number of values(not rows), including NULLs + let num_values = data_page.num_values(); + + // Calculate how many rows this page will actually contribute + let page_rows = if is_nullable { + // For nullable columns, page contributes num_values rows (including NULLs) + num_values.min(remaining) + } else { + // For non-nullable columns, we need to handle different encodings differently + match data_page.encoding() { + parquet2::encoding::Encoding::Plain => { + let type_size = std::mem::size_of::(); + let num_values_in_buffer = values_buffer.len() / type_size; + num_values_in_buffer.min(remaining) + } + parquet2::encoding::Encoding::RleDictionary => { + // For RLE dictionary, we use num_values from the page header + num_values.min(remaining) + } + _ => num_values.min(remaining), + } + }; + + // Process definition levels to create validity bitmap (only for nullable columns) + let validity_bitmap = if is_nullable { + let bit_width = get_bit_width(data_page.descriptor.max_def_level); + let (bitmap, _non_null_count) = + decode_definition_levels(def_levels, bit_width, num_values, data_page)?; + bitmap + } else { + // For non-nullable columns, no validity bitmap needed + None + }; + + // Process values based on encoding + match data_page.encoding() { + parquet2::encoding::Encoding::Plain => { + // Validate values_buffer alignment for plain encoding + #[cfg(debug_assertions)] + { + let type_size = std::mem::size_of::(); + if values_buffer.len() % type_size != 0 { + return Err(ErrorCode::Internal(format!( + "Values buffer length ({}) is not aligned to type size ({}). 
Buffer may be corrupted.", + values_buffer.len(), + type_size + ))); + } + } + + process_plain_encoding( + values_buffer, + page_rows, + column_data, + validity_bitmap.as_ref(), + )?; + } + parquet2::encoding::Encoding::RleDictionary => { + if let Some(dict) = dictionary { + process_rle_dictionary_encoding(values_buffer, page_rows, column_data, dict)?; + } else { + return Err(ErrorCode::Internal( + "RLE dictionary encoding requires dictionary page".to_string(), + )); + } + } + encoding => { + return Err(ErrorCode::Internal(format!( + "Unsupported encoding: {:?}", + encoding + ))); + } + } + + Ok(validity_bitmap) +} + +/// Process dictionary page for numeric types +fn process_dictionary_page( + dict_page: &parquet2::page::DictPage, + dictionary: &mut Vec, +) -> Result<()> { + let dict_buffer: &[u8] = dict_page.buffer.as_ref(); + let type_size = match T::PHYSICAL_TYPE { + PhysicalType::Int32 => 4, + PhysicalType::Int64 => 8, + PhysicalType::FixedLenByteArray(len) => len as usize, + _ => { + return Err(ErrorCode::Internal(format!( + "Unsupported physical type for dictionary: {:?}", + T::PHYSICAL_TYPE + ))) + } + }; + + // Parse dictionary entries based on physical type + for chunk in dict_buffer.chunks_exact(type_size) { + let value = T::from_dictionary_entry(chunk)?; + dictionary.push(value); + } + + Ok(()) +} + +/// Process RLE dictionary encoded data page +fn process_rle_dictionary_encoding( + values_buffer: &[u8], + page_rows: usize, + column_data: &mut Vec, + dictionary: &[T], +) -> Result<()> { + if values_buffer.is_empty() { + return Err(ErrorCode::Internal( + "Empty values buffer for RLE dictionary".to_string(), + )); + } + + // First byte is bit_width + let bit_width = values_buffer[0]; + + // Create RLE decoder + let mut rle_decoder = RleDecoder::new(bit_width); + rle_decoder.set_data(bytes::Bytes::copy_from_slice(&values_buffer[1..])); + + // Decode indices - avoid zero initialization for performance + let mut indices = Vec::with_capacity(page_rows); + 
unsafe { + indices.set_len(page_rows); + } + let decoded_count = rle_decoder + .get_batch(&mut indices) + .map_err(|e| ErrorCode::Internal(format!("Failed to decode RLE indices: {}", e)))?; + + if decoded_count != page_rows { + return Err(ErrorCode::Internal(format!( + "RLE decoder returned wrong count: expected={}, got={}", + page_rows, decoded_count + ))); + } + + // Batch dictionary lookup - performance critical path + let old_len = column_data.len(); + column_data.reserve(page_rows); + unsafe { + column_data.set_len(old_len + page_rows); + } + T::batch_from_dictionary_into_slice(dictionary, &indices, &mut column_data[old_len..])?; + + Ok(()) +} + +// TODO rename this +pub trait ParquetColumnType: Copy + Send + Sync + 'static { + /// Additional metadata needed to create columns (e.g., precision/scale for decimals) + type Metadata: Clone; + + /// The Parquet physical type for this column type + const PHYSICAL_TYPE: PhysicalType; + + /// Create a column from the deserialized data + fn create_column( + data: Vec, + metadata: &Self::Metadata, + ) -> databend_common_expression::Column; +} + +// TODO rename this +pub struct ParquetColumnIterator<'a, T: ParquetColumnType + DictionarySupport> { + pages: Decompressor<'a>, + chunk_size: Option, + num_rows: usize, + is_nullable: bool, + metadata: T::Metadata, + dictionary: Option>, // Cached dictionary values + _phantom: std::marker::PhantomData, +} + +impl<'a, T: ParquetColumnType + DictionarySupport> ParquetColumnIterator<'a, T> { + pub fn new( + pages: Decompressor<'a>, + num_rows: usize, + is_nullable: bool, + metadata: T::Metadata, + chunk_size: Option, + ) -> Self { + Self { + pages, + chunk_size, + num_rows, + is_nullable, + metadata, + dictionary: None, + _phantom: std::marker::PhantomData, + } + } +} + +// WIP: State of iterator should be adjusted, if we allow chunk_size be chosen freely +impl<'a, T: ParquetColumnType + DictionarySupport> Iterator for ParquetColumnIterator<'a, T> { + type Item = Result; + + fn 
next(&mut self) -> Option { + let target_rows = self.chunk_size.unwrap_or(self.num_rows); + let mut column_data: Vec = Vec::with_capacity(target_rows); + let mut validity_bitmaps = Vec::new(); + + while column_data.len() < target_rows { + // Get the next page + let page = match self.pages.next() { + Ok(Some(page)) => page, + Ok(None) => break, + Err(e) => { + return Some(Err(ErrorCode::Internal(format!( + "Failed to get next page: {}", + e + )))) + } + }; + + match page { + parquet2::page::Page::Data(data_page) => { + let data_len_before = column_data.len(); + match process_data_page( + data_page, + &mut column_data, + target_rows, + self.is_nullable, + &T::PHYSICAL_TYPE, + self.dictionary.as_ref().map(|dict| dict.as_slice()), + ) { + Ok(validity_bitmap) => { + if self.is_nullable { + // For nullable columns, we must have a validity bitmap for each page + if let Some(bitmap) = validity_bitmap { + let data_added = column_data.len() - data_len_before; + + // Verify bitmap length matches data added + if bitmap.len() != data_added { + return Some(Err(ErrorCode::Internal(format!( + "Bitmap length mismatch: bitmap={}, data_added={}", + bitmap.len(), + data_added + )))); + } + validity_bitmaps.push(bitmap); + } else { + // This should not happen for nullable columns + return Some(Err(ErrorCode::Internal( + "Nullable column page must produce validity bitmap" + .to_string(), + ))); + } + } + } + Err(e) => return Some(Err(e)), + } + } + parquet2::page::Page::Dict(dict_page) => { + if T::PHYSICAL_TYPE == PhysicalType::Int32 + || T::PHYSICAL_TYPE == PhysicalType::Int64 + || matches!(T::PHYSICAL_TYPE, PhysicalType::FixedLenByteArray(_)) + { + // Process dictionary page and cache the dictionary + if let Some(ref mut dictionary) = self.dictionary { + if let Err(e) = process_dictionary_page::(dict_page, dictionary) { + return Some(Err(e)); + } + } else { + let mut dictionary = Vec::new(); + if let Err(e) = process_dictionary_page::(dict_page, &mut dictionary) + { + return 
Some(Err(e)); + } + self.dictionary = Some(dictionary); + } + } else { + return Some(Err(ErrorCode::Internal( + "Dictionary page not supported for this type".to_string(), + ))); + } + } + } + } + + if column_data.is_empty() { + return None; + } + + // Return the appropriate Column variant based on nullability + if self.is_nullable { + // For nullable columns, create NullableColumn + let column_len = column_data.len(); + let base_column = T::create_column(column_data, &self.metadata); + + // Combine validity bitmaps from multiple pages + let combined_bitmap = match combine_validity_bitmaps(validity_bitmaps, column_len) { + Ok(bitmap) => bitmap, + Err(e) => return Some(Err(e)), + }; + + let nullable_column = NullableColumn::new(base_column, combined_bitmap); + Some(Ok(Column::Nullable(Box::new(nullable_column)))) + } else { + // For non-nullable columns, return the column directly + Some(Ok(T::create_column(column_data, &self.metadata))) + } + } +} + +fn get_bit_width(max_level: i16) -> u32 { + if max_level == 1 { + 1 + } else { + 16 - max_level.leading_zeros() + } +} + +/// Convert endianness and copy data for big-endian systems +/// +/// This function handles the conversion from Parquet's little-endian format +/// to the native big-endian format on big-endian systems. 
+#[cfg(target_endian = "big")] +fn convert_endianness_and_copy(src_bytes: &[u8], dst_slice: &mut [T]) { + let type_size = std::mem::size_of::(); + + match type_size { + 1 => { + // Single byte: no endianness conversion needed + unsafe { + let dst_ptr = dst_slice.as_mut_ptr() as *mut u8; + std::ptr::copy_nonoverlapping(src_bytes.as_ptr(), dst_ptr, 1); + } + } + 2 => { + // 2-byte integer (i16): convert from little-endian + let mut bytes = [0u8; 2]; + bytes.copy_from_slice(src_bytes); + let value = i16::from_le_bytes(bytes); + unsafe { + let dst_ptr = dst_slice.as_mut_ptr() as *mut i16; + *dst_ptr = value; + } + } + 4 => { + // 4-byte integer (i32): convert from little-endian + let mut bytes = [0u8; 4]; + bytes.copy_from_slice(src_bytes); + let value = i32::from_le_bytes(bytes); + unsafe { + let dst_ptr = dst_slice.as_mut_ptr() as *mut i32; + *dst_ptr = value; + } + } + 8 => { + // 8-byte integer (i64): convert from little-endian + let mut bytes = [0u8; 8]; + bytes.copy_from_slice(src_bytes); + let value = i64::from_le_bytes(bytes); + unsafe { + let dst_ptr = dst_slice.as_mut_ptr() as *mut i64; + *dst_ptr = value; + } + } + 16 => { + // 16-byte integer (i128): convert from little-endian + let mut bytes = [0u8; 16]; + bytes.copy_from_slice(src_bytes); + let value = i128::from_le_bytes(bytes); + unsafe { + let dst_ptr = dst_slice.as_mut_ptr() as *mut i128; + *dst_ptr = value; + } + } + 32 => { + // 32-byte integer (i256): convert from little-endian + // Note: i256 doesn't have from_le_bytes, so we reverse the bytes manually + let mut bytes = [0u8; 32]; + bytes.copy_from_slice(src_bytes); + bytes.reverse(); // Convert from little-endian to big-endian + unsafe { + let dst_ptr = dst_slice.as_mut_ptr() as *mut u8; + std::ptr::copy_nonoverlapping(bytes.as_ptr(), dst_ptr, 32); + } + } + } +} + +/// Perform defensive checks for nullable vs non-nullable columns +#[cfg(debug_assertions)] +pub fn validate_column_nullability(def_levels: &[u8], is_nullable: bool) -> Result<()> { + 
if is_nullable { + // Nullable columns must have definition levels + if def_levels.is_empty() { + return Err(ErrorCode::Internal( + "Nullable column must have definition levels".to_string(), + )); + } + } else { + // Non-nullable columns should not have definition levels + if !def_levels.is_empty() { + return Err(ErrorCode::Internal( + "Non-nullable column should not have definition levels".to_string(), + )); + } + } + Ok(()) +} + +/// Validate physical type matches expected type +pub fn validate_physical_type(actual: PhysicalType, expected: PhysicalType) -> Result<()> { + if actual != expected { + return Err(ErrorCode::Internal(format!( + "Physical type mismatch: expected {:?}, got {:?}", + expected, actual + ))); + } + Ok(()) +} + +/// Combine multiple validity bitmaps from different pages +pub fn combine_validity_bitmaps( + validity_bitmaps: Vec, + expected_total_len: usize, +) -> Result { + if validity_bitmaps.is_empty() { + Ok(Bitmap::new_constant(true, expected_total_len)) + } else if validity_bitmaps.len() == 1 { + Ok(validity_bitmaps.into_iter().next().unwrap()) + } else { + // Combine multiple validity bitmaps + let total_len: usize = validity_bitmaps.iter().map(|b| b.len()).sum(); + if total_len != expected_total_len { + return Err(ErrorCode::Internal(format!( + "Combined bitmap length ({}) does not match expected length ({})", + total_len, expected_total_len + ))); + } + let mut combined_bits = Vec::with_capacity(total_len); + for bitmap in validity_bitmaps { + combined_bits.extend(bitmap.iter()); + } + Ok(Bitmap::from_iter(combined_bits)) + } +} + +#[cfg(test)] +mod tests { + use std::sync::Arc; + + use parquet2::compression::Compression; + use parquet2::encoding::Encoding; + use parquet2::page::DataPage; + use parquet2::page::DictPage; + use parquet2::schema::types::PhysicalType; + + use super::*; + + // Mock implementation for testing + #[derive(Debug, Clone, Copy, PartialEq)] + struct TestType(i32); + + impl ParquetColumnType for TestType { + const 
PHYSICAL_TYPE: PhysicalType = PhysicalType::Int32; + type Metadata = (); + + fn create_column( + data: Vec, + metadata: &Self::Metadata, + ) -> databend_common_expression::Column { + let raw_data: Vec = unsafe { std::mem::transmute(data) }; + databend_common_expression::Column::Number( + databend_common_expression::NumberColumn::Int32(raw_data.into()), + ) + } + } + + impl DictionarySupport for TestType { + fn from_dictionary_entry(entry: &[u8]) -> Result { + if entry.len() != 4 { + return Err(databend_common_exception::ErrorCode::Internal( + "Expected 4 bytes for TestType".to_string(), + )); + } + let value = i32::from_le_bytes([entry[0], entry[1], entry[2], entry[3]]); + Ok(TestType(value)) + } + + fn batch_from_dictionary_into_slice( + dictionary: &[Self], + indices: &[i32], + output: &mut [Self], + ) -> Result<()> { + if indices.len() != output.len() { + return Err(databend_common_exception::ErrorCode::Internal( + "Output slice length mismatch".to_string(), + )); + } + + for (i, &index) in indices.iter().enumerate() { + if index < 0 || index as usize >= dictionary.len() { + return Err(databend_common_exception::ErrorCode::Internal(format!( + "Dictionary index out of bounds: {} >= {}", + index, + dictionary.len() + ))); + } + output[i] = dictionary[index as usize]; + } + Ok(()) + } + } + + #[test] + fn test_process_dictionary_page() -> Result<()> { + // Create test dictionary data (3 i32 values: 10, 20, 30) + let dict_data = vec![ + 10u8, 0, 0, 0, // 10 in little-endian + 20u8, 0, 0, 0, // 20 in little-endian + 30u8, 0, 0, 0, // 30 in little-endian + ]; + + let dict_page = DictPage { + buffer: dict_data, + num_values: 3, + is_sorted: false, + }; + + let mut dictionary = Vec::new(); + process_dictionary_page::(&dict_page, &mut dictionary)?; + + assert_eq!(dictionary.len(), 3); + assert_eq!(dictionary[0], TestType(10)); + assert_eq!(dictionary[1], TestType(20)); + assert_eq!(dictionary[2], TestType(30)); + + Ok(()) + } + + #[test] + fn 
test_process_dictionary_page_empty() -> Result<()> { + let dict_page = DictPage { + buffer: bytes::Bytes::from(vec![]), + num_values: 0, + is_sorted: false, + }; + + let mut dictionary = Vec::new(); + process_dictionary_page::(&dict_page, &mut dictionary)?; + + assert_eq!(dictionary.len(), 0); + Ok(()) + } + + #[test] + fn test_process_dictionary_page_invalid_size() -> Result<()> { + // Create invalid dictionary data (incomplete i32) + let dict_data = vec![10u8, 0, 0]; // Only 3 bytes instead of 4 + + let dict_page = DictPage { + buffer: bytes::Bytes::from(dict_data), + num_values: 1, + is_sorted: false, + }; + + let mut dictionary = Vec::new(); + let result = process_dictionary_page::(&dict_page, &mut dictionary); + + assert!(result.is_err()); + Ok(()) + } + + #[test] + fn test_rle_indices_allocation_optimization() { + // This test verifies that our optimization to avoid zero initialization + // doesn't break the basic functionality. We can't directly test the + // performance improvement, but we can ensure correctness. 
+ + let page_rows = 1000; + + // Create indices vector with our optimized allocation + let mut indices = Vec::with_capacity(page_rows); + unsafe { + indices.set_len(page_rows); + } + + // Verify the vector has the correct capacity and length + assert_eq!(indices.len(), page_rows); + assert!(indices.capacity() >= page_rows); + + // Verify we can write to all positions (this would crash if unsafe was wrong) + for i in 0..page_rows { + indices[i] = i as i32; + } + + // Verify the data was written correctly + for i in 0..page_rows { + assert_eq!(indices[i], i as i32); + } + } + + #[test] + fn test_dictionary_support_trait_consistency() { + // Test that all our types have consistent PHYSICAL_TYPE values + assert_eq!(i32::PHYSICAL_TYPE, PhysicalType::Int32); + assert_eq!(i64::PHYSICAL_TYPE, PhysicalType::Int64); + assert_eq!(Date::PHYSICAL_TYPE, PhysicalType::Int32); + + // Decimal types use FixedLenByteArray + assert_eq!(Decimal64::PHYSICAL_TYPE, PhysicalType::FixedLenByteArray(8)); + assert_eq!( + Decimal128::PHYSICAL_TYPE, + PhysicalType::FixedLenByteArray(16) + ); + assert_eq!( + Decimal256::PHYSICAL_TYPE, + PhysicalType::FixedLenByteArray(32) + ); + } + + #[test] + fn test_batch_dictionary_lookup_performance_pattern() -> Result<()> { + // Test the performance pattern we optimized: pre-allocation + direct assignment + let dictionary = vec![TestType(100), TestType(200), TestType(300)]; + let indices = vec![0i32, 1, 2, 0, 1, 2]; // 6 lookups + + // Pre-allocate output (our optimization) + let mut output = Vec::with_capacity(indices.len()); + unsafe { + output.set_len(indices.len()); + } + + // Perform batch lookup + TestType::batch_from_dictionary_into_slice(&dictionary, &indices, &mut output)?; + + // Verify results + assert_eq!(output.len(), 6); + assert_eq!(output[0], TestType(100)); + assert_eq!(output[1], TestType(200)); + assert_eq!(output[2], TestType(300)); + assert_eq!(output[3], TestType(100)); + assert_eq!(output[4], TestType(200)); + assert_eq!(output[5], 
TestType(300)); + + Ok(()) + } +} diff --git a/src/common/experimental_parquet_reader/src/column/date.rs b/src/common/experimental_parquet_reader/src/column/date.rs new file mode 100644 index 0000000000000..0697aa7f628ba --- /dev/null +++ b/src/common/experimental_parquet_reader/src/column/date.rs @@ -0,0 +1,253 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_expression::Column; +use parquet2::schema::types::PhysicalType; + +use crate::column::common::DictionarySupport; +use crate::column::common::ParquetColumnIterator; +use crate::column::common::ParquetColumnType; +use crate::column::number::IntegerMetadata; + +/// Date type alias for i32 (days since epoch) +#[derive(Clone, Copy, Debug, PartialEq)] +pub struct Date(i32); + +impl ParquetColumnType for Date { + type Metadata = IntegerMetadata; + const PHYSICAL_TYPE: PhysicalType = PhysicalType::Int32; + + fn create_column(data: Vec, _metadata: &Self::Metadata) -> Column { + let raw_data: Vec = unsafe { std::mem::transmute(data) }; + Column::Date(raw_data.into()) + } +} + +// ============================================================================= +// Dictionary Support Implementation +// ============================================================================= + +impl DictionarySupport for Date { + fn from_dictionary_entry(entry: &[u8]) -> databend_common_exception::Result { + if entry.len() != 4 { + return 
Err(databend_common_exception::ErrorCode::Internal(format!( + "Invalid Date dictionary entry length: expected 4, got {}", + entry.len() + ))); + } + + // Parquet stores dates as i32 in little-endian format + let bytes: [u8; 4] = entry.try_into().map_err(|_| { + databend_common_exception::ErrorCode::Internal( + "Failed to convert bytes to Date".to_string(), + ) + })?; + + Ok(Date(i32::from_le_bytes(bytes))) + } + + fn batch_from_dictionary_into_slice( + dictionary: &[Self], + indices: &[i32], + output: &mut [Self], + ) -> databend_common_exception::Result<()> { + // Validate output slice length + if output.len() != indices.len() { + return Err(databend_common_exception::ErrorCode::Internal(format!( + "Output slice length ({}) doesn't match indices length ({})", + output.len(), + indices.len() + ))); + } + + // Batch bounds checking - find max index once + if let Some(&max_idx) = indices.iter().max() { + if max_idx as usize >= dictionary.len() { + return Err(databend_common_exception::ErrorCode::Internal(format!( + "Dictionary index out of bounds: {} >= {}", + max_idx, + dictionary.len() + ))); + } + } + + // Fast unchecked copy - all bounds verified above + for (i, &index) in indices.iter().enumerate() { + unsafe { + *output.get_unchecked_mut(i) = *dictionary.get_unchecked(index as usize); + } + } + + Ok(()) + } +} + +pub type DateIter<'a> = ParquetColumnIterator<'a, Date>; + +#[cfg(test)] +mod tests { + use databend_common_exception::Result; + + use super::*; + + #[test] + fn test_date_dictionary_support() -> Result<()> { + // Test from_dictionary_entry + let entry = [42u8, 0, 0, 0]; // 42 in little-endian (days since epoch) + let value = Date::from_dictionary_entry(&entry)?; + assert_eq!(value, Date(42)); + + // Test zero date (epoch) + let entry = [0u8, 0, 0, 0]; // 0 in little-endian + let value = Date::from_dictionary_entry(&entry)?; + assert_eq!(value, Date(0)); + + // Test large date value + let entry = [255u8, 255, 255, 127]; // i32::MAX in little-endian + 
let value = Date::from_dictionary_entry(&entry)?; + assert_eq!(value, Date(i32::MAX)); + + // Test invalid entry size + let entry = [42u8, 0, 0]; // Only 3 bytes + let result = Date::from_dictionary_entry(&entry); + assert!(result.is_err()); + assert!(result.unwrap_err().to_string().contains("Expected 4 bytes")); + + Ok(()) + } + + #[test] + fn test_date_batch_from_dictionary_into_slice() -> Result<()> { + // Setup dictionary with various dates + let dictionary = vec![ + Date(0), // 1970-01-01 (epoch) + Date(365), // 1971-01-01 (1 year later) + Date(730), // 1972-01-01 (2 years later) + Date(1095), // 1973-01-01 (3 years later) + Date(18262), // 2020-01-01 (50 years later) + ]; + + // Test normal indices + let indices = [0i32, 2, 4, 1, 3]; + let mut output = vec![Date(0); 5]; + + Date::batch_from_dictionary_into_slice(&dictionary, &indices, &mut output)?; + assert_eq!(output, vec![ + Date(0), + Date(730), + Date(18262), + Date(365), + Date(1095) + ]); + + // Test repeated indices + let indices = [4i32, 4, 4, 4]; // All point to 2020-01-01 + let mut output = vec![Date(0); 4]; + + Date::batch_from_dictionary_into_slice(&dictionary, &indices, &mut output)?; + assert_eq!(output, vec![ + Date(18262), + Date(18262), + Date(18262), + Date(18262) + ]); + + Ok(()) + } + + #[test] + fn test_date_bounds_checking() -> Result<()> { + let dictionary = vec![Date(0), Date(365), Date(730)]; // 3 dates + + // Test out of bounds index + let indices = [0i32, 3, 1]; // Index 3 is out of bounds + let mut output = vec![Date(0); 3]; + + let result = Date::batch_from_dictionary_into_slice(&dictionary, &indices, &mut output); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Dictionary index out of bounds")); + + // Test negative index (should be caught as out of bounds when cast to usize) + let indices = [0i32, -1, 1]; + let mut output = vec![Date(0); 3]; + + let result = Date::batch_from_dictionary_into_slice(&dictionary, &indices, &mut output); + 
assert!(result.is_err()); + + Ok(()) + } + + #[test] + fn test_date_empty_cases() -> Result<()> { + // Test empty indices with non-empty dictionary + let dictionary = vec![Date(0), Date(365), Date(730)]; + let indices: [i32; 0] = []; + let mut output: Vec = vec![]; + + Date::batch_from_dictionary_into_slice(&dictionary, &indices, &mut output)?; + assert_eq!(output.len(), 0); + + // Test empty dictionary with empty indices + let dictionary: Vec = vec![]; + let indices: [i32; 0] = []; + let mut output: Vec = vec![]; + + Date::batch_from_dictionary_into_slice(&dictionary, &indices, &mut output)?; + assert_eq!(output.len(), 0); + + Ok(()) + } + + #[test] + fn test_date_mismatched_output_slice_length() -> Result<()> { + let dictionary = vec![Date(0), Date(365), Date(730)]; + let indices = [0i32, 1, 2]; + let mut output = vec![Date(0); 2]; // Output too small + + let result = Date::batch_from_dictionary_into_slice(&dictionary, &indices, &mut output); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Output slice length mismatch")); + + Ok(()) + } + + #[test] + fn test_date_physical_type() { + assert_eq!(Date::PHYSICAL_TYPE, PhysicalType::Int32); + } + + #[test] + fn test_date_edge_cases() -> Result<()> { + // Test with minimum i32 value (very old date) + let entry = [0u8, 0, 0, 128]; // i32::MIN in little-endian + let value = Date::from_dictionary_entry(&entry)?; + assert_eq!(value, Date(i32::MIN)); + + // Test dictionary with single element + let dictionary = vec![Date(12345)]; + let indices = [0i32]; + let mut output = vec![Date(0); 1]; + + Date::batch_from_dictionary_into_slice(&dictionary, &indices, &mut output)?; + assert_eq!(output, vec![Date(12345)]); + + Ok(()) + } +} diff --git a/src/common/experimental_parquet_reader/src/column/decimal.rs b/src/common/experimental_parquet_reader/src/column/decimal.rs new file mode 100644 index 0000000000000..33915b15cdc23 --- /dev/null +++ 
b/src/common/experimental_parquet_reader/src/column/decimal.rs @@ -0,0 +1,550 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Decimal column deserialization for Parquet data + +use databend_common_column::buffer::Buffer; +use databend_common_expression::types::i256; +use databend_common_expression::types::DecimalColumn; +use databend_common_expression::types::DecimalSize; +use databend_common_expression::Column; +use parquet2::schema::types::PhysicalType; + +use crate::column::common::DictionarySupport; +use crate::column::common::ParquetColumnIterator; +use crate::column::common::ParquetColumnType; +use crate::reader::decompressor::Decompressor; + +// ============================================================================= +// Wrapper Types for Decimal Usage +// ============================================================================= + +/// Wrapper for i64 as Decimal64 - enables zero-cost transmute via #[repr(transparent)] +#[derive(Clone, Copy, Debug, PartialEq)] +#[repr(transparent)] +pub struct Decimal64(pub i64); + +/// Wrapper for i128 as Decimal128 - enables zero-cost transmute via #[repr(transparent)] +#[derive(Clone, Copy, Debug, PartialEq)] +#[repr(transparent)] +pub struct Decimal128(pub i128); + +/// Wrapper for i256 as Decimal256 - enables zero-cost transmute via #[repr(transparent)] +#[derive(Clone, Copy, Debug, PartialEq)] +#[repr(transparent)] +pub struct Decimal256(pub i256); + 
+#[derive(Clone)] +pub struct DecimalMetadata { + pub precision: u8, + pub scale: u8, +} + +impl ParquetColumnType for Decimal64 { + type Metadata = DecimalMetadata; + const PHYSICAL_TYPE: PhysicalType = PhysicalType::Int64; + + fn create_column(data: Vec, metadata: &Self::Metadata) -> Column { + let decimal_size = DecimalSize::new_unchecked(metadata.precision, metadata.scale); + let raw_data: Vec = unsafe { std::mem::transmute(data) }; + Column::Decimal(DecimalColumn::Decimal64( + Buffer::from(raw_data), + decimal_size, + )) + } +} + +impl ParquetColumnType for Decimal128 { + type Metadata = DecimalMetadata; + const PHYSICAL_TYPE: PhysicalType = PhysicalType::FixedLenByteArray(16); + + fn create_column(data: Vec, metadata: &Self::Metadata) -> Column { + let decimal_size = DecimalSize::new_unchecked(metadata.precision, metadata.scale); + let raw_data: Vec = unsafe { std::mem::transmute(data) }; + Column::Decimal(DecimalColumn::Decimal128( + Buffer::from(raw_data), + decimal_size, + )) + } +} + +impl ParquetColumnType for Decimal256 { + type Metadata = DecimalMetadata; + const PHYSICAL_TYPE: PhysicalType = PhysicalType::FixedLenByteArray(32); + + fn create_column(data: Vec, metadata: &Self::Metadata) -> Column { + let decimal_size = DecimalSize::new_unchecked(metadata.precision, metadata.scale); + let raw_data: Vec = unsafe { std::mem::transmute(data) }; + Column::Decimal(DecimalColumn::Decimal256( + Buffer::from(raw_data), + decimal_size, + )) + } +} + +// ============================================================================= +// Dictionary Support Implementation +// ============================================================================= + +impl DictionarySupport for Decimal64 { + fn from_dictionary_entry(entry: &[u8]) -> databend_common_exception::Result { + if entry.len() != 8 { + return Err(databend_common_exception::ErrorCode::Internal(format!( + "Invalid Decimal64 dictionary entry length: expected 8, got {}", + entry.len() + ))); + } + + // 
Parquet stores integers in little-endian format + let bytes: [u8; 8] = entry.try_into().map_err(|_| { + databend_common_exception::ErrorCode::Internal( + "Failed to convert bytes to Decimal64".to_string(), + ) + })?; + + Ok(Decimal64(i64::from_le_bytes(bytes))) + } + + fn batch_from_dictionary_into_slice( + dictionary: &[Self], + indices: &[i32], + output: &mut [Self], + ) -> databend_common_exception::Result<()> { + // Validate output slice length + if output.len() != indices.len() { + return Err(databend_common_exception::ErrorCode::Internal(format!( + "Output slice length ({}) doesn't match indices length ({})", + output.len(), + indices.len() + ))); + } + + // Batch bounds checking - find max index once + if let Some(&max_idx) = indices.iter().max() { + if max_idx as usize >= dictionary.len() { + return Err(databend_common_exception::ErrorCode::Internal(format!( + "Dictionary index out of bounds: {} >= {}", + max_idx, + dictionary.len() + ))); + } + } + + // Fast unchecked copy - all bounds verified above + for (i, &index) in indices.iter().enumerate() { + unsafe { + *output.get_unchecked_mut(i) = *dictionary.get_unchecked(index as usize); + } + } + + Ok(()) + } +} + +impl DictionarySupport for Decimal128 { + fn from_dictionary_entry(entry: &[u8]) -> databend_common_exception::Result { + if entry.len() != 16 { + return Err(databend_common_exception::ErrorCode::Internal(format!( + "Invalid Decimal128 dictionary entry length: expected 16, got {}", + entry.len() + ))); + } + + // Parquet stores integers in little-endian format + let bytes: [u8; 16] = entry.try_into().map_err(|_| { + databend_common_exception::ErrorCode::Internal( + "Failed to convert bytes to Decimal128".to_string(), + ) + })?; + + Ok(Decimal128(i128::from_le_bytes(bytes))) + } + + fn batch_from_dictionary_into_slice( + dictionary: &[Self], + indices: &[i32], + output: &mut [Self], + ) -> databend_common_exception::Result<()> { + // Validate output slice length + if output.len() != indices.len() { 
+ return Err(databend_common_exception::ErrorCode::Internal(format!( + "Output slice length ({}) doesn't match indices length ({})", + output.len(), + indices.len() + ))); + } + + // Batch bounds checking - find max index once + if let Some(&max_idx) = indices.iter().max() { + if max_idx as usize >= dictionary.len() { + return Err(databend_common_exception::ErrorCode::Internal(format!( + "Dictionary index out of bounds: {} >= {}", + max_idx, + dictionary.len() + ))); + } + } + + // Fast unchecked copy - all bounds verified above + for (i, &index) in indices.iter().enumerate() { + unsafe { + *output.get_unchecked_mut(i) = *dictionary.get_unchecked(index as usize); + } + } + + Ok(()) + } +} + +impl DictionarySupport for Decimal256 { + fn from_dictionary_entry(entry: &[u8]) -> databend_common_exception::Result { + if entry.len() != 32 { + return Err(databend_common_exception::ErrorCode::Internal(format!( + "Invalid Decimal256 dictionary entry length: expected 32, got {}", + entry.len() + ))); + } + + // Parquet stores integers in little-endian format + let bytes: [u8; 32] = entry.try_into().map_err(|_| { + databend_common_exception::ErrorCode::Internal( + "Failed to convert bytes to Decimal256".to_string(), + ) + })?; + + // Create i256 from bytes + let value = i256::from_le_bytes(bytes); + Ok(Decimal256(value)) + } + + fn batch_from_dictionary_into_slice( + dictionary: &[Self], + indices: &[i32], + output: &mut [Self], + ) -> databend_common_exception::Result<()> { + // Validate output slice length + if output.len() != indices.len() { + return Err(databend_common_exception::ErrorCode::Internal(format!( + "Output slice length ({}) doesn't match indices length ({})", + output.len(), + indices.len() + ))); + } + + // Batch bounds checking - find max index once + if let Some(&max_idx) = indices.iter().max() { + if max_idx as usize >= dictionary.len() { + return Err(databend_common_exception::ErrorCode::Internal(format!( + "Dictionary index out of bounds: {} >= {}", + 
max_idx, + dictionary.len() + ))); + } + } + + // Fast unchecked copy - all bounds verified above + for (i, &index) in indices.iter().enumerate() { + unsafe { + *output.get_unchecked_mut(i) = *dictionary.get_unchecked(index as usize); + } + } + + Ok(()) + } +} + +// ============================================================================= +// Iterator Type Aliases +// ============================================================================= + +pub type DecimalIter<'a, T> = ParquetColumnIterator<'a, T>; + +// ============================================================================= +// Constructor Functions +// ============================================================================= + +/// Generic decimal iterator constructor +pub fn new_decimal_iter( + pages: Decompressor, + num_rows: usize, + precision: u8, + scale: u8, + is_nullable: bool, + chunk_size: Option, +) -> DecimalIter +where + T: ParquetColumnType + DictionarySupport, +{ + let metadata = DecimalMetadata { precision, scale }; + ParquetColumnIterator::new(pages, num_rows, is_nullable, metadata, chunk_size) +} + +pub fn new_decimal64_iter( + pages: Decompressor, + num_rows: usize, + precision: u8, + scale: u8, + is_nullable: bool, + chunk_size: Option, +) -> DecimalIter { + new_decimal_iter(pages, num_rows, precision, scale, is_nullable, chunk_size) +} + +pub fn new_decimal128_iter( + pages: Decompressor, + num_rows: usize, + precision: u8, + scale: u8, + is_nullable: bool, + chunk_size: Option, +) -> DecimalIter { + new_decimal_iter(pages, num_rows, precision, scale, is_nullable, chunk_size) +} + +pub fn new_decimal256_iter( + pages: Decompressor, + num_rows: usize, + precision: u8, + scale: u8, + is_nullable: bool, + chunk_size: Option, +) -> DecimalIter { + new_decimal_iter(pages, num_rows, precision, scale, is_nullable, chunk_size) +} + +#[cfg(test)] +mod tests { + use databend_common_exception::Result; + + use super::*; + + #[test] + fn test_decimal64_dictionary_support() -> 
Result<()> { + // Test from_dictionary_entry + let entry = [42u8, 0, 0, 0, 0, 0, 0, 0]; // 42 in little-endian + let value = Decimal64::from_dictionary_entry(&entry)?; + assert_eq!(value.0, 42); + + // Test negative number + let entry = [255u8, 255, 255, 255, 255, 255, 255, 255]; // -1 in little-endian + let value = Decimal64::from_dictionary_entry(&entry)?; + assert_eq!(value.0, -1); + + // Test invalid entry size + let entry = [42u8, 0, 0, 0, 0, 0, 0]; // Only 7 bytes + let result = Decimal64::from_dictionary_entry(&entry); + assert!(result.is_err()); + + Ok(()) + } + + #[test] + fn test_decimal128_dictionary_support() -> Result<()> { + // Test from_dictionary_entry + let entry = [42u8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]; // 42 in little-endian + let value = Decimal128::from_dictionary_entry(&entry)?; + assert_eq!(value.0, 42); + + // Test large number + let mut entry = [0u8; 16]; + entry[0] = 255; + entry[1] = 255; + entry[2] = 255; + entry[3] = 255; + entry[4] = 255; + entry[5] = 255; + entry[6] = 255; + entry[7] = 127; // i128::MAX lower 64 bits + let value = Decimal128::from_dictionary_entry(&entry)?; + assert_eq!(value.0, 9223372036854775807i128); + + // Test invalid entry size + let entry = [42u8, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]; // Only 15 bytes + let result = Decimal128::from_dictionary_entry(&entry); + assert!(result.is_err()); + + Ok(()) + } + + #[test] + fn test_decimal256_dictionary_support() -> Result<()> { + // Test from_dictionary_entry + let mut entry = [0u8; 32]; + entry[0] = 42; // 42 in little-endian + let value = Decimal256::from_dictionary_entry(&entry)?; + let expected = i256::from_le_bytes(entry); + assert_eq!(value.0, expected); + + // Test all bytes set + let entry = [255u8; 32]; + let value = Decimal256::from_dictionary_entry(&entry)?; + let expected = i256::from_le_bytes(entry); + assert_eq!(value.0, expected); + + // Test invalid entry size + let entry = [42u8; 31]; // Only 31 bytes + let result = 
Decimal256::from_dictionary_entry(&entry); + assert!(result.is_err()); + + Ok(()) + } + + #[test] + fn test_decimal64_batch_from_dictionary_into_slice() -> Result<()> { + // Setup dictionary + let dictionary = vec![ + Decimal64(100), + Decimal64(200), + Decimal64(300), + Decimal64(400), + Decimal64(500), + ]; + + // Test normal indices + let indices = [0i32, 2, 4, 1, 3]; + let mut output = vec![Decimal64(0); 5]; + + Decimal64::batch_from_dictionary_into_slice(&dictionary, &indices, &mut output)?; + assert_eq!(output, vec![ + Decimal64(100), + Decimal64(300), + Decimal64(500), + Decimal64(200), + Decimal64(400) + ]); + + // Test repeated indices + let indices = [1i32, 1, 1, 1]; + let mut output = vec![Decimal64(0); 4]; + + Decimal64::batch_from_dictionary_into_slice(&dictionary, &indices, &mut output)?; + assert_eq!(output, vec![ + Decimal64(200), + Decimal64(200), + Decimal64(200), + Decimal64(200) + ]); + + Ok(()) + } + + #[test] + fn test_decimal128_batch_from_dictionary_into_slice() -> Result<()> { + // Setup dictionary + let dictionary = vec![Decimal128(1000), Decimal128(2000), Decimal128(3000)]; + + // Test normal indices + let indices = [2i32, 0, 1, 2, 0]; + let mut output = vec![Decimal128(0); 5]; + + Decimal128::batch_from_dictionary_into_slice(&dictionary, &indices, &mut output)?; + assert_eq!(output, vec![ + Decimal128(3000), + Decimal128(1000), + Decimal128(2000), + Decimal128(3000), + Decimal128(1000) + ]); + + Ok(()) + } + + #[test] + fn test_decimal256_batch_from_dictionary_into_slice() -> Result<()> { + // Setup dictionary + let dictionary = vec![ + Decimal256(i256::from(10)), + Decimal256(i256::from(20)), + Decimal256(i256::from(30)), + ]; + + // Test normal indices + let indices = [1i32, 0, 2]; + let mut output = vec![Decimal256(i256::from(0)); 3]; + + Decimal256::batch_from_dictionary_into_slice(&dictionary, &indices, &mut output)?; + assert_eq!(output, vec![ + Decimal256(i256::from(20)), + Decimal256(i256::from(10)), + Decimal256(i256::from(30)) 
+ ]); + + Ok(()) + } + + #[test] + fn test_decimal_bounds_checking() -> Result<()> { + let dictionary = vec![Decimal64(10), Decimal64(20), Decimal64(30)]; + + // Test out of bounds index + let indices = [0i32, 3, 1]; // Index 3 is out of bounds + let mut output = vec![Decimal64(0); 3]; + + let result = + Decimal64::batch_from_dictionary_into_slice(&dictionary, &indices, &mut output); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Dictionary index out of bounds")); + + // Test negative index + let indices = [0i32, -1, 1]; + let mut output = vec![Decimal64(0); 3]; + + let result = + Decimal64::batch_from_dictionary_into_slice(&dictionary, &indices, &mut output); + assert!(result.is_err()); + + Ok(()) + } + + #[test] + fn test_decimal_empty_cases() -> Result<()> { + // Test empty indices with non-empty dictionary + let dictionary = vec![Decimal128(10), Decimal128(20), Decimal128(30)]; + let indices: [i32; 0] = []; + let mut output: Vec = vec![]; + + Decimal128::batch_from_dictionary_into_slice(&dictionary, &indices, &mut output)?; + assert_eq!(output.len(), 0); + + // Test empty dictionary with empty indices + let dictionary: Vec = vec![]; + let indices: [i32; 0] = []; + let mut output: Vec = vec![]; + + Decimal128::batch_from_dictionary_into_slice(&dictionary, &indices, &mut output)?; + assert_eq!(output.len(), 0); + + Ok(()) + } + + #[test] + fn test_decimal_mismatched_output_slice_length() -> Result<()> { + let dictionary = vec![Decimal256(i256::from(10)), Decimal256(i256::from(20))]; + let indices = [0i32, 1, 0]; + let mut output = vec![Decimal256(i256::from(0)); 2]; // Output too small + + let result = + Decimal256::batch_from_dictionary_into_slice(&dictionary, &indices, &mut output); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Output slice length mismatch")); + + Ok(()) + } +} diff --git a/src/common/experimental_parquet_reader/src/column/mod.rs 
b/src/common/experimental_parquet_reader/src/column/mod.rs
new file mode 100644
index 0000000000000..33f6233aedf4f
--- /dev/null
+++ b/src/common/experimental_parquet_reader/src/column/mod.rs
@@ -0,0 +1,25 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

mod common;
mod date;
mod decimal;
mod number;
mod string;

pub use date::*;
pub use decimal::*;
// NOTE: `number::*` already re-exports `IntegerMetadata`; the previous
// explicit `pub use number::IntegerMetadata;` was redundant and is dropped.
pub use number::*;
pub use string::*;
diff --git a/src/common/experimental_parquet_reader/src/column/number.rs b/src/common/experimental_parquet_reader/src/column/number.rs
new file mode 100644
index 0000000000000..5b5ddc0099f63
--- /dev/null
+++ b/src/common/experimental_parquet_reader/src/column/number.rs
@@ -0,0 +1,321 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
+ +use databend_common_column::buffer::Buffer; +use databend_common_expression::types::Number; +use databend_common_expression::Column; +use parquet2::schema::types::PhysicalType; + +use crate::column::common::DictionarySupport; +use crate::column::common::ParquetColumnIterator; +use crate::column::common::ParquetColumnType; +use crate::reader::decompressor::Decompressor; + +#[derive(Clone, Copy)] +pub struct IntegerMetadata; + +impl ParquetColumnType for i32 { + type Metadata = IntegerMetadata; + const PHYSICAL_TYPE: PhysicalType = PhysicalType::Int32; + + fn create_column(data: Vec, _metadata: &Self::Metadata) -> Column { + Column::Number(i32::upcast_column(Buffer::from(data))) + } +} + +impl ParquetColumnType for i64 { + type Metadata = IntegerMetadata; + const PHYSICAL_TYPE: PhysicalType = PhysicalType::Int64; + + fn create_column(data: Vec, _metadata: &Self::Metadata) -> Column { + Column::Number(i64::upcast_column(Buffer::from(data))) + } +} + +impl DictionarySupport for i64 { + fn from_dictionary_entry(entry: &[u8]) -> databend_common_exception::Result { + if entry.len() != 8 { + return Err(databend_common_exception::ErrorCode::Internal(format!( + "Invalid i64 dictionary entry length: expected 8, got {}", + entry.len() + ))); + } + + // Parquet stores integers in little-endian format + let bytes: [u8; 8] = entry.try_into().map_err(|_| { + databend_common_exception::ErrorCode::Internal( + "Failed to convert bytes to i64".to_string(), + ) + })?; + + Ok(i64::from_le_bytes(bytes)) + } + + fn batch_from_dictionary_into_slice( + dictionary: &[Self], + indices: &[i32], + output: &mut [Self], + ) -> databend_common_exception::Result<()> { + // Validate output slice length + if output.len() != indices.len() { + return Err(databend_common_exception::ErrorCode::Internal(format!( + "Output slice length ({}) doesn't match indices length ({})", + output.len(), + indices.len() + ))); + } + + // Batch bounds checking - find max index once + // if let Some(&max_idx) = 
indices.iter().max() { + // if max_idx as usize >= dictionary.len() { + // return Err(databend_common_exception::ErrorCode::Internal(format!( + // "Dictionary index out of bounds: {} >= {}", + // max_idx, dictionary.len() + // ))); + // } + //} + + // Fast unchecked copy - all bounds verified above + for (i, &index) in indices.iter().enumerate() { + unsafe { + *output.get_unchecked_mut(i) = *dictionary.get_unchecked(index as usize); + } + } + + Ok(()) + } +} + +impl DictionarySupport for i32 { + fn from_dictionary_entry(entry: &[u8]) -> databend_common_exception::Result { + if entry.len() != 4 { + return Err(databend_common_exception::ErrorCode::Internal(format!( + "Invalid i32 dictionary entry length: expected 4, got {}", + entry.len() + ))); + } + + // Parquet stores integers in little-endian format + let bytes: [u8; 4] = entry.try_into().map_err(|_| { + databend_common_exception::ErrorCode::Internal( + "Failed to convert bytes to i32".to_string(), + ) + })?; + + Ok(i32::from_le_bytes(bytes)) + } + + fn batch_from_dictionary_into_slice( + dictionary: &[Self], + indices: &[i32], + output: &mut [Self], + ) -> databend_common_exception::Result<()> { + // Validate output slice length + if output.len() != indices.len() { + return Err(databend_common_exception::ErrorCode::Internal(format!( + "Output slice length ({}) doesn't match indices length ({})", + output.len(), + indices.len() + ))); + } + + // Batch bounds checking - find max index once + if let Some(&max_idx) = indices.iter().max() { + if max_idx as usize >= dictionary.len() { + return Err(databend_common_exception::ErrorCode::Internal(format!( + "Dictionary index out of bounds: {} >= {}", + max_idx, + dictionary.len() + ))); + } + } + + // Fast unchecked copy - all bounds verified above + for (i, &index) in indices.iter().enumerate() { + unsafe { + *output.get_unchecked_mut(i) = *dictionary.get_unchecked(index as usize); + } + } + + Ok(()) + } +} + +pub type Int32Iter<'a> = ParquetColumnIterator<'a, i32>; + 
+pub type Int64Iter<'a> = ParquetColumnIterator<'a, i64>; + +pub fn new_int32_iter( + pages: Decompressor, + num_rows: usize, + is_nullable: bool, + chunk_size: Option, +) -> Int32Iter { + ParquetColumnIterator::new(pages, num_rows, is_nullable, IntegerMetadata, chunk_size) +} + +pub fn new_int64_iter( + pages: Decompressor, + num_rows: usize, + is_nullable: bool, + chunk_size: Option, +) -> Int64Iter { + ParquetColumnIterator::new(pages, num_rows, is_nullable, IntegerMetadata, chunk_size) +} + +#[cfg(test)] +mod tests { + use databend_common_exception::Result; + + use super::*; + + #[test] + fn test_i32_dictionary_support() -> Result<()> { + // Test from_dictionary_entry + let entry = [42u8, 0, 0, 0]; // 42 in little-endian + let value = i32::from_dictionary_entry(&entry)?; + assert_eq!(value, 42); + + // Test negative number + let entry = [255u8, 255, 255, 255]; // -1 in little-endian + let value = i32::from_dictionary_entry(&entry)?; + assert_eq!(value, -1); + + // Test invalid entry size + let entry = [42u8, 0, 0]; // Only 3 bytes + let result = i32::from_dictionary_entry(&entry); + assert!(result.is_err()); + + Ok(()) + } + + #[test] + fn test_i64_dictionary_support() -> Result<()> { + // Test from_dictionary_entry + let entry = [42u8, 0, 0, 0, 0, 0, 0, 0]; // 42 in little-endian + let value = i64::from_dictionary_entry(&entry)?; + assert_eq!(value, 42); + + // Test large number + let entry = [255u8, 255, 255, 255, 255, 255, 255, 127]; // i64::MAX + let value = i64::from_dictionary_entry(&entry)?; + assert_eq!(value, i64::MAX); + + // Test invalid entry size + let entry = [42u8, 0, 0, 0, 0, 0, 0]; // Only 7 bytes + let result = i64::from_dictionary_entry(&entry); + assert!(result.is_err()); + + Ok(()) + } + + #[test] + fn test_i32_batch_from_dictionary_into_slice() -> Result<()> { + // Setup dictionary + let dictionary = vec![10i32, 20, 30, 40, 50]; + + // Test normal indices + let indices = [0i32, 2, 4, 1, 3]; + let mut output = vec![0i32; 5]; + + 
i32::batch_from_dictionary_into_slice(&dictionary, &indices, &mut output)?; + assert_eq!(output, vec![10, 30, 50, 20, 40]); + + // Test repeated indices + let indices = [1i32, 1, 1, 1]; + let mut output = vec![0i32; 4]; + + i32::batch_from_dictionary_into_slice(&dictionary, &indices, &mut output)?; + assert_eq!(output, vec![20, 20, 20, 20]); + + Ok(()) + } + + #[test] + fn test_i64_batch_from_dictionary_into_slice() -> Result<()> { + // Setup dictionary + let dictionary = vec![100i64, 200, 300]; + + // Test normal indices + let indices = [2i32, 0, 1, 2, 0]; + let mut output = vec![0i64; 5]; + + i64::batch_from_dictionary_into_slice(&dictionary, &indices, &mut output)?; + assert_eq!(output, vec![300, 100, 200, 300, 100]); + + Ok(()) + } + + #[test] + fn test_batch_dictionary_bounds_checking() -> Result<()> { + let dictionary = vec![10i32, 20, 30]; + + // Test out of bounds index + let indices = [0i32, 3, 1]; // Index 3 is out of bounds + let mut output = vec![0i32; 3]; + + let result = i32::batch_from_dictionary_into_slice(&dictionary, &indices, &mut output); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Dictionary index out of bounds")); + + // Test negative index (should be caught as out of bounds when cast to usize) + let indices = [0i32, -1, 1]; + let mut output = vec![0i32; 3]; + + let result = i32::batch_from_dictionary_into_slice(&dictionary, &indices, &mut output); + assert!(result.is_err()); + + Ok(()) + } + + #[test] + fn test_empty_dictionary_and_indices() -> Result<()> { + // Test empty indices with non-empty dictionary + let dictionary = vec![10i32, 20, 30]; + let indices: [i32; 0] = []; + let mut output: Vec = vec![]; + + i32::batch_from_dictionary_into_slice(&dictionary, &indices, &mut output)?; + assert_eq!(output.len(), 0); + + // Test empty dictionary with empty indices + let dictionary: Vec = vec![]; + let indices: [i32; 0] = []; + let mut output: Vec = vec![]; + + 
i32::batch_from_dictionary_into_slice(&dictionary, &indices, &mut output)?; + assert_eq!(output.len(), 0); + + Ok(()) + } + + #[test] + fn test_mismatched_output_slice_length() -> Result<()> { + let dictionary = vec![10i32, 20, 30]; + let indices = [0i32, 1, 2]; + let mut output = vec![0i32; 2]; // Output too small + + let result = i32::batch_from_dictionary_into_slice(&dictionary, &indices, &mut output); + assert!(result.is_err()); + assert!(result + .unwrap_err() + .to_string() + .contains("Output slice length mismatch")); + + Ok(()) + } +} diff --git a/src/common/experimental_parquet_reader/src/column/string.rs b/src/common/experimental_parquet_reader/src/column/string.rs new file mode 100644 index 0000000000000..a97f6b22e2aa4 --- /dev/null +++ b/src/common/experimental_parquet_reader/src/column/string.rs @@ -0,0 +1,869 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use std::arch::is_aarch64_feature_detected; + +use databend_common_column::binview::Utf8ViewColumn; +use databend_common_column::binview::View; +use databend_common_column::buffer::Buffer; +use databend_common_exception::ErrorCode; +use databend_common_expression::Column; +use parquet::encodings::rle::RleDecoder; +use parquet2::encoding::Encoding; +use parquet2::page::Page; +use parquet2::schema::types::PhysicalType; + +use crate::reader::decompressor::Decompressor; + +pub struct StringIter<'a> { + /// Page decompressor for reading Parquet pages + pages: Decompressor<'a>, + /// Optional chunk size for batched processing + chunk_size: Option, + /// Total number of rows to process + num_rows: usize, + /// Dictionary entries + dictionary: Option>>, + // Cached dictionary views + cached_dict_views: Option>, + cached_dict_lengths: Option>, + // Scratch buffer for rle decoding + rle_index_buffer: Option>, +} + +impl<'a> StringIter<'a> { + pub fn new(pages: Decompressor<'a>, num_rows: usize, chunk_size: Option) -> Self { + Self { + pages, + chunk_size, + num_rows, + dictionary: None, + cached_dict_views: None, + cached_dict_lengths: None, + rle_index_buffer: None, + } + } + + /// Process a dictionary page and store the dictionary entries + fn process_dictionary_page( + &mut self, + dict_page: &parquet2::page::DictPage, + ) -> Result<(), ErrorCode> { + assert!(self.dictionary.is_none()); + let mut dict_values = Vec::new(); + let mut offset = 0; + let buffer = &dict_page.buffer; + + while offset < buffer.len() { + if offset + 4 > buffer.len() { + return Err(ErrorCode::Internal( + "Invalid dictionary page: incomplete length prefix".to_string(), + )); + } + + let length = u32::from_le_bytes([ + buffer[offset], + buffer[offset + 1], + buffer[offset + 2], + buffer[offset + 3], + ]) as usize; + offset += 4; + + if offset + length > buffer.len() { + return Err(ErrorCode::Internal( + "Invalid dictionary page: string length exceeds buffer".to_string(), + )); + } + + 
dict_values.push(buffer[offset..offset + length].to_vec()); + offset += length; + } + + self.dictionary = Some(dict_values); + // Clear cached views when dictionary changes + self.cached_dict_views = None; + Ok(()) + } + + /// Create a View from a string slice, handling both inline and buffer storage + fn create_view_from_string( + string_data: &[u8], + page_bytes: &mut Vec, + page_offset: &mut usize, + buffer_index: u32, + ) -> View { + let len = string_data.len() as u32; + if len <= 12 { + // Inline small strings directly in the View + unsafe { + let mut payload = [0u8; 16]; + payload + .as_mut_ptr() + .cast::() + .write_unaligned(len.to_le()); + std::ptr::copy_nonoverlapping( + string_data.as_ptr(), + payload.as_mut_ptr().add(4), + len as usize, + ); + std::mem::transmute::<[u8; 16], View>(payload) + } + } else { + // Store large strings in buffer and reference them + let current_offset = *page_offset; + // TODO use memcpy + page_bytes.extend_from_slice(string_data); + *page_offset += string_data.len(); + + unsafe { + let mut payload = [0u8; 16]; + // Length + payload + .as_mut_ptr() + .cast::() + .write_unaligned(len.to_le()); + // Prefix (first 4 bytes of string) + let prefix_len = std::cmp::min(4, string_data.len()); + std::ptr::copy_nonoverlapping( + string_data.as_ptr(), + payload.as_mut_ptr().add(4), + prefix_len, + ); + // Buffer index + payload + .as_mut_ptr() + .add(8) + .cast::() + .write_unaligned(buffer_index.to_le()); + // Offset in buffer + payload + .as_mut_ptr() + .add(12) + .cast::() + .write_unaligned((current_offset as u32).to_le()); + + std::mem::transmute::<[u8; 16], View>(payload) + } + } + } + + /// Process plain encoded data page + fn process_plain_encoding( + &self, + values_buffer: &[u8], + remaining: usize, + views: &mut Vec, + buffers: &mut Vec>, + total_bytes_len: &mut usize, + ) -> Result<(), ErrorCode> { + let mut offset = 0; + let estimated_capacity = values_buffer.len(); + let mut page_bytes = 
Vec::with_capacity(estimated_capacity); + let mut page_offset = 0; + let buffer_index = buffers.len() as u32; + + for _ in 0..remaining { + if offset + 4 > values_buffer.len() { + return Err(ErrorCode::Internal( + "Invalid plain encoding: incomplete length prefix".to_string(), + )); + } + + let length = u32::from_le_bytes([ + values_buffer[offset], + values_buffer[offset + 1], + values_buffer[offset + 2], + values_buffer[offset + 3], + ]) as usize; + offset += 4; + + if offset + length > values_buffer.len() { + return Err(ErrorCode::Internal( + "Invalid plain encoding: string length exceeds buffer".to_string(), + )); + } + + let string_data = &values_buffer[offset..offset + length]; + let view = Self::create_view_from_string( + string_data, + &mut page_bytes, + &mut page_offset, + buffer_index, + ); + views.push(view); + *total_bytes_len += length; + offset += length; + } + + if !page_bytes.is_empty() { + buffers.push(Buffer::from(page_bytes)); + } + + Ok(()) + } + + /// Process RLE dictionary encoded data page with optimized paths for different scenarios. 
+ fn process_rle_dictionary_encoding( + &mut self, + values_buffer: &[u8], + remaining: usize, + views: &mut Vec, + buffers: &mut Vec>, + total_bytes_len: &mut usize, + ) -> Result<(), ErrorCode> { + if values_buffer.is_empty() { + return Err(ErrorCode::Internal("Empty RLE dictionary data".to_string())); + } + + let bit_width = values_buffer[0]; + + // Clone dictionary to avoid borrowing issues + if let Some(dict) = self.dictionary.clone() { + // Check if we can use the optimized small string fast path + // TODO cache this + if self.can_use_small_string_fast_path(&dict) { + return self.process_small_string_fast_path( + &dict, + values_buffer, + bit_width, + remaining, + views, + total_bytes_len, + ); + } + } + + // General path for large dictionaries or mixed string sizes + self.process_general_rle_path( + values_buffer, + bit_width, + remaining, + views, + buffers, + total_bytes_len, + ) + } + + /// Check if dictionary qualifies for small string fast path optimization. + fn can_use_small_string_fast_path(&self, dict: &[Vec]) -> bool { + // TODO 16 is rather small? + dict.len() <= 16 && dict.iter().all(|s| s.len() <= 12) + } + + /// Process RLE dictionary encoding using the optimized small string fast path. + fn process_small_string_fast_path( + &mut self, + dict: &[Vec], + values_buffer: &[u8], + bit_width: u8, + remaining: usize, + views: &mut Vec, + total_bytes_len: &mut usize, + ) -> Result<(), ErrorCode> { + views.reserve_exact(remaining); + + if bit_width == 0 { + // Special case: all indices are 0, repeat dictionary[0] + return self.process_bit_width_zero(dict, remaining, views, total_bytes_len); + } + + // General small string case with RLE decoding + self.process_small_string_rle( + dict, + values_buffer, + bit_width, + remaining, + views, + total_bytes_len, + ) + } + + /// Handle the special case where bit_width=0 (all values are dictionary[0]). 
+ fn process_bit_width_zero( + &self, + dict: &[Vec], + remaining: usize, + views: &mut Vec, + total_bytes_len: &mut usize, + ) -> Result<(), ErrorCode> { + if dict.is_empty() { + return Err(ErrorCode::Internal( + "Empty dictionary for RLE dictionary encoding".to_string(), + )); + } + + let dict_entry = &dict[0]; + let inline_view = Self::create_inline_view(dict_entry); + + // TODO: Use slice::fill when available for better performance + for _ in 0..remaining { + views.push(inline_view); + *total_bytes_len += dict_entry.len(); + } + + Ok(()) + } + + /// Process small string RLE decoding with cached dictionary views. + fn process_small_string_rle( + &mut self, + dict: &[Vec], + values_buffer: &[u8], + bit_width: u8, + remaining: usize, + views: &mut Vec, + total_bytes_len: &mut usize, + ) -> Result<(), ErrorCode> { + // Create RLE decoder + let mut rle_decoder = RleDecoder::new(bit_width); + rle_decoder.set_data(bytes::Bytes::copy_from_slice(&values_buffer[1..])); + + // Ensure dictionary views are cached + // TODO any better way? 
+ self.ensure_dict_views_cached(dict); + let dict_views = self.cached_dict_views.as_ref().unwrap(); + + // Decode indices and populate views in single pass + let start_len = views.len(); + // TODO hotspot + // let mut indices = vec![0i32; remaining]; + + let indices: &mut Vec = if let Some(indices) = self.rle_index_buffer.as_mut() { + if indices.capacity() < remaining { + indices.reserve_exact(remaining - indices.capacity()); + } + indices + } else { + let indices: Vec = Vec::with_capacity(remaining); + self.rle_index_buffer = Some(indices); + self.rle_index_buffer.as_mut().unwrap() + }; + unsafe { + indices.set_len(remaining); + } + + let decoded_count = rle_decoder + .get_batch(indices) + .map_err(|e| ErrorCode::Internal(format!("Failed to decode RLE indices: {}", e)))?; + if decoded_count != remaining { + return Err(ErrorCode::Internal(format!( + "RleDecoder returned wrong count: expected={}, got={}", + remaining, decoded_count + ))); + } + + let mut local_bytes_len = 0usize; + let mut chunks_4 = indices.chunks_exact(4); + let remainder_after_4 = chunks_4.remainder(); + let dict_views_len = dict_views.len(); + let dict_views_ptr = dict_views.as_ptr(); + let dict_lengths_ptr = self.cached_dict_lengths.as_ref().unwrap().as_ptr(); + + let mut i = 0; + + let mut copy_chunks_scalar = |chunks: &mut std::slice::ChunksExact<'_, i32>, + out_index: &mut usize| unsafe { + for chunk in chunks { + let idx1 = chunk[0] as usize; + let idx2 = chunk[1] as usize; + let idx3 = chunk[2] as usize; + let idx4 = chunk[3] as usize; + + *views.as_mut_ptr().add(start_len + *out_index) = *dict_views_ptr.add(idx1); + *views.as_mut_ptr().add(start_len + *out_index + 1) = *dict_views_ptr.add(idx2); + *views.as_mut_ptr().add(start_len + *out_index + 2) = *dict_views_ptr.add(idx3); + *views.as_mut_ptr().add(start_len + *out_index + 3) = *dict_views_ptr.add(idx4); + + *out_index += 4; + } + }; + + #[cfg(target_arch = "x86_64")] + { + if is_x86_feature_detected!("avx2") { + use 
std::arch::x86_64::*; + + for chunk in &mut chunks_4 { + let idx1 = chunk[0] as usize; + let idx2 = chunk[1] as usize; + let idx3 = chunk[2] as usize; + let idx4 = chunk[3] as usize; + + unsafe { + let view1 = _mm_loadu_si128(dict_views_ptr.add(idx1) as *const __m128i); + let view2 = _mm_loadu_si128(dict_views_ptr.add(idx2) as *const __m128i); + let view3 = _mm_loadu_si128(dict_views_ptr.add(idx3) as *const __m128i); + let view4 = _mm_loadu_si128(dict_views_ptr.add(idx4) as *const __m128i); + + let views_12 = _mm256_set_m128i(view2, view1); + let views_34 = _mm256_set_m128i(view4, view3); + + let output_12_ptr = views.as_mut_ptr().add(start_len + i) as *mut __m256i; + let output_34_ptr = + views.as_mut_ptr().add(start_len + i + 2) as *mut __m256i; + + _mm256_storeu_si256(output_12_ptr, views_12); + _mm256_storeu_si256(output_34_ptr, views_34); + } + i += 4; + } + } else if is_x86_feature_detected!("sse2") { + use std::arch::x86_64::*; + + for chunk in &mut chunks_4 { + let idx1 = chunk[0] as usize; + let idx2 = chunk[1] as usize; + let idx3 = chunk[2] as usize; + let idx4 = chunk[3] as usize; + + unsafe { + let view1 = _mm_load_si128(dict_views_ptr.add(idx1) as *const __m128i); + let view2 = _mm_load_si128(dict_views_ptr.add(idx2) as *const __m128i); + let view3 = _mm_load_si128(dict_views_ptr.add(idx3) as *const __m128i); + let view4 = _mm_load_si128(dict_views_ptr.add(idx4) as *const __m128i); + + _mm_store_si128( + views.as_mut_ptr().add(start_len + i) as *mut __m128i, + view1, + ); + _mm_store_si128( + views.as_mut_ptr().add(start_len + i + 1) as *mut __m128i, + view2, + ); + _mm_store_si128( + views.as_mut_ptr().add(start_len + i + 2) as *mut __m128i, + view3, + ); + _mm_store_si128( + views.as_mut_ptr().add(start_len + i + 3) as *mut __m128i, + view4, + ); + } + + i += 4; + } + } else { + copy_chunks_scalar(&mut chunks_4, &mut i); + } + } + + #[cfg(target_arch = "aarch64")] + { + if is_aarch64_feature_detected!("neon") { + use std::arch::aarch64::*; + + for 
chunk in &mut chunks_4 { + let idx1 = chunk[0] as usize; + let idx2 = chunk[1] as usize; + let idx3 = chunk[2] as usize; + let idx4 = chunk[3] as usize; + + unsafe { + let view1 = vld1q_u8(dict_views_ptr.add(idx1) as *const u8); + let view2 = vld1q_u8(dict_views_ptr.add(idx2) as *const u8); + let view3 = vld1q_u8(dict_views_ptr.add(idx3) as *const u8); + let view4 = vld1q_u8(dict_views_ptr.add(idx4) as *const u8); + + vst1q_u8(views.as_mut_ptr().add(start_len + i) as *mut u8, view1); + vst1q_u8(views.as_mut_ptr().add(start_len + i + 1) as *mut u8, view2); + vst1q_u8(views.as_mut_ptr().add(start_len + i + 2) as *mut u8, view3); + vst1q_u8(views.as_mut_ptr().add(start_len + i + 3) as *mut u8, view4); + } + + i += 4; + } + } else { + copy_chunks_scalar(&mut chunks_4, &mut i); + } + } + + #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))] + { + copy_chunks_scalar(&mut chunks_4, &mut i); + } + + // Handle remaining elements with 2x unrolling + let pairs = remainder_after_4.chunks_exact(2); + let final_remainder = pairs.remainder(); + + for pair in pairs { + let dict_idx1 = pair[0] as usize; + let dict_idx2 = pair[1] as usize; + + debug_assert!(dict_idx1 < dict_views_len); + debug_assert!(dict_idx2 < dict_views_len); + + unsafe { + *views.as_mut_ptr().add(start_len + i) = *dict_views_ptr.add(dict_idx1); + *views.as_mut_ptr().add(start_len + i + 1) = *dict_views_ptr.add(dict_idx2); + } + + i += 2; + } + + // Handle final single element if any + if let [index] = final_remainder { + let dict_idx = *index as usize; + debug_assert!(dict_idx < dict_views_len); + + unsafe { + *views.as_mut_ptr().add(start_len + i) = *dict_views_ptr.add(dict_idx); + } + } + + // Use SIMD to accumulate lengths - collect indices first to avoid iterator move issues + let chunk_indices: Vec<_> = indices.chunks_exact(4).collect(); + + let mut sum_chunk_lengths_scalar = |chunks: &[&[i32]]| unsafe { + for chunk in chunks { + let idx1 = chunk[0] as usize; + let idx2 = chunk[1] as usize; + 
let idx3 = chunk[2] as usize; + let idx4 = chunk[3] as usize; + + local_bytes_len += *dict_lengths_ptr.add(idx1) as usize; + local_bytes_len += *dict_lengths_ptr.add(idx2) as usize; + local_bytes_len += *dict_lengths_ptr.add(idx3) as usize; + local_bytes_len += *dict_lengths_ptr.add(idx4) as usize; + } + }; + + #[cfg(target_arch = "x86_64")] + { + if is_x86_feature_detected!("avx2") && !chunk_indices.is_empty() { + use std::arch::x86_64::*; + + unsafe { + let mut sum = _mm256_setzero_si256(); + for chunk in &chunk_indices { + let idx1 = chunk[0] as usize; + let idx2 = chunk[1] as usize; + let idx3 = chunk[2] as usize; + let idx4 = chunk[3] as usize; + + let len1 = *dict_lengths_ptr.add(idx1) as i32; + let len2 = *dict_lengths_ptr.add(idx2) as i32; + let len3 = *dict_lengths_ptr.add(idx3) as i32; + let len4 = *dict_lengths_ptr.add(idx4) as i32; + + let lens = _mm_set_epi32(len4, len3, len2, len1); + sum = _mm256_add_epi32(sum, _mm256_castsi128_si256(lens)); + } + + let mut sum_arr = [0i32; 8]; + _mm256_store_si256(sum_arr.as_mut_ptr() as *mut __m256i, sum); + for &x in &sum_arr { + local_bytes_len += x as usize; + } + } + } else { + sum_chunk_lengths_scalar(&chunk_indices); + } + } + + #[cfg(target_arch = "aarch64")] + { + if is_aarch64_feature_detected!("neon") && !chunk_indices.is_empty() { + use std::arch::aarch64::*; + + unsafe { + let mut sum_vec = vdupq_n_u32(0); + for chunk in &chunk_indices { + let idx1 = chunk[0] as usize; + let idx2 = chunk[1] as usize; + let idx3 = chunk[2] as usize; + let idx4 = chunk[3] as usize; + + let lens = [ + *dict_lengths_ptr.add(idx1) as u32, + *dict_lengths_ptr.add(idx2) as u32, + *dict_lengths_ptr.add(idx3) as u32, + *dict_lengths_ptr.add(idx4) as u32, + ]; + let lens_vec = vld1q_u32(lens.as_ptr()); + sum_vec = vaddq_u32(sum_vec, lens_vec); + } + + let mut sum_arr = [0u32; 4]; + vst1q_u32(sum_arr.as_mut_ptr(), sum_vec); + for &x in &sum_arr { + local_bytes_len += x as usize; + } + } + } else { + 
sum_chunk_lengths_scalar(&chunk_indices); + } + } + + #[cfg(not(any(target_arch = "x86_64", target_arch = "aarch64")))] + { + sum_chunk_lengths_scalar(&chunk_indices); + } + + // Handle remaining pairs + let pair_indices: Vec<_> = remainder_after_4.chunks_exact(2).collect(); + for pair in &pair_indices { + let dict_idx1 = pair[0] as usize; + let dict_idx2 = pair[1] as usize; + + debug_assert!(dict_idx1 < dict_views_len); + debug_assert!(dict_idx2 < dict_views_len); + + unsafe { + local_bytes_len += *dict_lengths_ptr.add(dict_idx1) as usize; + local_bytes_len += *dict_lengths_ptr.add(dict_idx2) as usize; + } + } + + // Handle final single element + let final_remainder = remainder_after_4.chunks_exact(2).remainder(); + if let [index] = final_remainder { + let dict_idx = *index as usize; + debug_assert!(dict_idx < dict_views_len); + + unsafe { + local_bytes_len += *dict_lengths_ptr.add(dict_idx) as usize; + } + } + *total_bytes_len += local_bytes_len; + + unsafe { + views.set_len(start_len + remaining); + } + + Ok(()) + } + + /// Ensure dictionary views are cached for the current dictionary. + fn ensure_dict_views_cached(&mut self, dict: &[Vec]) { + if self.cached_dict_views.is_none() { + self.cached_dict_views = Some( + dict.iter() + .map(|s| Self::create_inline_view(s)) + .collect::>(), + ); + + // Working on small strings, u8 is enough for lengths + let lengths: Vec = dict.iter().map(|s| s.len() as u8).collect(); + self.cached_dict_lengths = Some(lengths); + } + } + + /// Process RLE dictionary encoding using the general path for large dictionaries. 
+ fn process_general_rle_path( + &mut self, + values_buffer: &[u8], + bit_width: u8, + remaining: usize, + views: &mut Vec, + buffers: &mut Vec>, + total_bytes_len: &mut usize, + ) -> Result<(), ErrorCode> { + // Create new RleDecoder for general path + let mut rle_decoder = RleDecoder::new(bit_width); + rle_decoder.set_data(bytes::Bytes::copy_from_slice(&values_buffer[1..])); + + if let Some(ref dict) = self.dictionary { + // Initialize buffer management variables for general case + let mut page_bytes = Vec::new(); + let mut page_offset = 0; + let buffer_index = buffers.len() as u32; + + // Decode indices and process each one + let mut indices = vec![0i32; remaining]; + let decoded_count = rle_decoder + .get_batch(&mut indices) + .map_err(|e| ErrorCode::Internal(format!("Failed to decode RLE indices: {}", e)))?; + + if decoded_count != remaining { + return Err(ErrorCode::Internal(format!( + "RleDecoder returned wrong count: expected={}, got={}", + remaining, decoded_count + ))); + } + + // Process each index and create views + for &index in &indices { + let dict_idx = index as usize; + if dict_idx >= dict.len() { + return Err(ErrorCode::Internal(format!( + "Dictionary index {} out of bounds (dictionary size: {})", + dict_idx, + dict.len() + ))); + } + + let string_data = &dict[dict_idx]; + let view = Self::create_view_from_string( + string_data, + &mut page_bytes, + &mut page_offset, + buffer_index, + ); + views.push(view); + *total_bytes_len += string_data.len(); + } + + // Add buffer if any data was written + if !page_bytes.is_empty() { + buffers.push(Buffer::from(page_bytes)); + } + } else { + return Err(ErrorCode::Internal( + "No dictionary found for RLE dictionary encoding".to_string(), + )); + } + + Ok(()) + } + + /// Create an inline View for small strings (≤12 bytes) with maximum performance. 
+ fn create_inline_view(string_data: &[u8]) -> View { + debug_assert!( + string_data.len() <= 12, + "create_inline_view called with string longer than 12 bytes" + ); + + unsafe { + let mut payload = [0u8; 16]; + let len = string_data.len() as u32; + + // Write length prefix (little-endian) + payload + .as_mut_ptr() + .cast::() + .write_unaligned(len.to_le()); + + // Copy string data directly + std::ptr::copy_nonoverlapping( + string_data.as_ptr(), + payload.as_mut_ptr().add(4), + len as usize, + ); + + // Convert to View with zero cost + std::mem::transmute::<[u8; 16], View>(payload) + } + } + + /// Process a data page based on its encoding type. + fn process_data_page( + &mut self, + data_page: &parquet2::page::DataPage, + views: &mut Vec, + buffers: &mut Vec>, + total_bytes_len: &mut usize, + ) -> Result<(), ErrorCode> { + let (_, _, values_buffer) = parquet2::page::split_buffer(data_page) + .map_err(|e| ErrorCode::StorageOther(format!("Failed to split buffer: {}", e)))?; + let remaining = data_page.num_values(); + + match data_page.encoding() { + Encoding::Plain => self.process_plain_encoding( + values_buffer, + remaining, + views, + buffers, + total_bytes_len, + ), + Encoding::RleDictionary | Encoding::PlainDictionary => self + .process_rle_dictionary_encoding( + values_buffer, + remaining, + views, + buffers, + total_bytes_len, + ), + _ => Err(ErrorCode::Internal(format!( + "Unsupported encoding for string column: {:?}", + data_page.encoding() + ))), + } + } +} + +impl<'a> Iterator for StringIter<'a> { + type Item = Result; + + fn next(&mut self) -> Option { + if self.num_rows == 0 { + return None; + } + + // let chunk_size = self.chunk_size.unwrap_or(self.num_rows); + // let limit = std::cmp::min(chunk_size, self.num_rows); + let limit = self.chunk_size.unwrap_or(self.num_rows); + + let mut views = Vec::with_capacity(limit); + let mut buffers = Vec::new(); + let mut total_bytes_len = 0; + let mut processed_rows = 0; + + while processed_rows < limit { + let 
page = match self.pages.next_owned() { + Ok(Some(page)) => page, + Ok(None) => break, + Err(e) => return Some(Err(ErrorCode::StorageOther(e.to_string()))), + }; + + match page { + Page::Data(data_page) => { + if data_page.descriptor.primitive_type.physical_type != PhysicalType::ByteArray + { + return Some(Err(ErrorCode::Internal( + "Expected ByteArray type for string column".to_string(), + ))); + } + + let remaining_in_chunk = limit - processed_rows; + let page_rows = std::cmp::min(data_page.num_values(), remaining_in_chunk); + + if let Err(e) = self.process_data_page( + &data_page, + &mut views, + &mut buffers, + &mut total_bytes_len, + ) { + return Some(Err(e)); + } + + processed_rows += page_rows; + } + Page::Dict(dict_page) => { + if let Err(e) = self.process_dictionary_page(&dict_page) { + return Some(Err(e)); + } + } + } + } + + if processed_rows == 0 { + return None; + } + + self.num_rows -= processed_rows; + + // Calculate total buffer length for new_unchecked + let total_buffer_len = buffers.iter().map(|b| b.len()).sum(); + + let column = Utf8ViewColumn::new_unchecked( + views.into(), + buffers.into(), + Some(total_bytes_len), + Some(total_buffer_len), + ); + + Some(Ok(Column::String(column))) + } +} diff --git a/src/common/experimental_parquet_reader/src/lib.rs b/src/common/experimental_parquet_reader/src/lib.rs new file mode 100644 index 0000000000000..6809c8fe78d4a --- /dev/null +++ b/src/common/experimental_parquet_reader/src/lib.rs @@ -0,0 +1,23 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+// See the License for the specific language governing permissions and +// limitations under the License. + +//! Direct deserialization from parquet to DataBlock + +mod column; +mod util; + +mod reader; + +pub use reader::column_reader::*; +pub use util::*; diff --git a/src/common/experimental_parquet_reader/src/reader/column_reader.rs b/src/common/experimental_parquet_reader/src/reader/column_reader.rs new file mode 100644 index 0000000000000..e8326114df302 --- /dev/null +++ b/src/common/experimental_parquet_reader/src/reader/column_reader.rs @@ -0,0 +1,164 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::types::DecimalDataType; +use databend_common_expression::types::NumberDataType; +use databend_common_expression::Column; +use databend_common_expression::TableDataType; +use databend_common_expression::TableField; +use databend_storages_common_table_meta::meta::ColumnMeta; +use databend_storages_common_table_meta::meta::Compression; +use parquet2::compression::Compression as ParquetCompression; +use parquet2::metadata::Descriptor; +use parquet2::read::PageMetaData; +use parquet2::schema::types::PhysicalType; +use parquet2::schema::types::PrimitiveType; + +use crate::column::new_decimal128_iter; +use crate::column::new_decimal256_iter; +use crate::column::new_decimal64_iter; +use crate::column::new_int32_iter; +use crate::column::new_int64_iter; +use crate::column::DateIter; +use crate::column::IntegerMetadata; +use crate::column::StringIter; +use crate::reader::decompressor::Decompressor; +use crate::reader::page_reader::PageReader; + +pub type ColumnIter<'a> = Box> + Send + Sync + 'a>; + +pub fn data_chunk_to_col_iter<'a>( + meta: &ColumnMeta, + chunk: &'a [u8], + rows: usize, + column_descriptor: &Descriptor, + field: TableField, + compression: &Compression, +) -> Result> { + let pages = { + let meta = meta.as_parquet().unwrap(); + let page_meta_data = PageMetaData { + column_start: meta.offset, + num_values: meta.num_values as i64, + compression: to_parquet_compression(compression)?, + descriptor: (*column_descriptor).clone(), + }; + let pages = PageReader::new_with_page_meta(chunk, page_meta_data, usize::MAX); + Decompressor::new(pages, vec![]) + }; + + let typ = &column_descriptor.primitive_type; + + pages_to_column_iter(pages, typ, field, rows, None) +} + +fn pages_to_column_iter<'a>( + column: Decompressor<'a>, + types: &PrimitiveType, + field: TableField, + num_rows: usize, + chunk_size: Option, +) -> Result> { + let pages = column; + let 
parquet_physical_type = &types.physical_type; + + let (inner_data_type, is_nullable) = match &field.data_type { + TableDataType::Nullable(inner) => (inner.as_ref(), true), + other => (other, false), + }; + + match (parquet_physical_type, inner_data_type) { + (PhysicalType::Int32, TableDataType::Number(NumberDataType::Int32)) => { + Ok(Box::new(new_int32_iter(pages, num_rows, is_nullable, chunk_size))) + } + (PhysicalType::Int64, TableDataType::Number(NumberDataType::Int64)) => { + Ok(Box::new(new_int64_iter(pages, num_rows, is_nullable, chunk_size))) + } + (PhysicalType::ByteArray, TableDataType::String) => { + Ok(Box::new(StringIter::new(pages, num_rows, chunk_size))) + } + (PhysicalType::Int32, TableDataType::Decimal(DecimalDataType::Decimal64(_))) => { + unimplemented!("coming soon") + } + (PhysicalType::Int64, TableDataType::Decimal(DecimalDataType::Decimal64(decimal_size))) => { + Ok(Box::new(new_decimal64_iter( + pages, + num_rows, + decimal_size.precision(), + decimal_size.scale(), + is_nullable, + chunk_size, + ))) + } + // TODO: arrow 55.1.0 does not support Decimal64 yet, so we use Decimal128, but the storage format is Int64 + (PhysicalType::Int64, TableDataType::Decimal(DecimalDataType::Decimal128(decimal_size))) => { + Ok(Box::new(new_decimal64_iter( + pages, + num_rows, + decimal_size.precision(), + decimal_size.scale(), + is_nullable, + chunk_size, + ))) + } + (PhysicalType::FixedLenByteArray(_), TableDataType::Decimal(DecimalDataType::Decimal128(decimal_size))) => { + Ok(Box::new(new_decimal128_iter( + pages, + num_rows, + decimal_size.precision(), + decimal_size.scale(), + is_nullable, + chunk_size, + ))) + } + (PhysicalType::FixedLenByteArray(_), TableDataType::Decimal(DecimalDataType::Decimal256(decimal_size))) => { + Ok(Box::new(new_decimal256_iter( + pages, + num_rows, + decimal_size.precision(), + decimal_size.scale(), + is_nullable, + chunk_size, + ))) + } + (PhysicalType::Int32, TableDataType::Date) => { + Ok(Box::new(DateIter::new( + pages, 
+ num_rows, + is_nullable, + IntegerMetadata, + chunk_size, + ))) + } + (physical_type, table_data_type) => Err(ErrorCode::StorageOther(format!( + "Unsupported combination: parquet_physical_type={:?}, field_data_type={:?}, nullable={}", + physical_type, table_data_type, is_nullable + ))), + } +} + +fn to_parquet_compression(meta_compression: &Compression) -> Result { + match meta_compression { + Compression::Lz4 => Err(ErrorCode::StorageOther( + "Legacy compression algorithm [Lz4] is no longer supported.", + )), + Compression::Lz4Raw => Ok(ParquetCompression::Lz4Raw), + Compression::Snappy => Ok(ParquetCompression::Snappy), + Compression::Zstd => Ok(ParquetCompression::Zstd), + Compression::Gzip => Ok(ParquetCompression::Gzip), + Compression::None => Ok(ParquetCompression::Uncompressed), + } +} diff --git a/src/common/experimental_parquet_reader/src/reader/decompressor.rs b/src/common/experimental_parquet_reader/src/reader/decompressor.rs new file mode 100644 index 0000000000000..d768378921a0f --- /dev/null +++ b/src/common/experimental_parquet_reader/src/reader/decompressor.rs @@ -0,0 +1,233 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! 
Decompressor that integrates with zero-copy PageReader + +use parquet2::compression::Compression; +use parquet2::error::Error; +use parquet2::page::DataPage; +use parquet2::page::DataPageHeader; +use parquet2::page::DataPageHeaderV2; +use parquet2::page::DictPage; +use parquet2::page::Page; +use parquet2::FallibleStreamingIterator; + +use crate::reader::page_reader::PageReader; +use crate::reader::pages::BorrowedCompressedDataPage; +use crate::reader::pages::BorrowedCompressedDictPage; +use crate::reader::pages::BorrowedCompressedPage; + +pub struct Decompressor<'a> { + page_reader: PageReader<'a>, + decompression_buffer: Vec, + current_page: Option, + was_decompressed: bool, +} + +impl<'a> Decompressor<'a> { + pub fn new(page_reader: PageReader<'a>, decompression_buffer: Vec) -> Self { + Self { + page_reader, + decompression_buffer, + current_page: None, + was_decompressed: false, + } + } + + fn decompress_borrowed_page( + compressed_page: BorrowedCompressedPage<'_>, + uncompressed_buffer: &mut Vec, + ) -> parquet2::error::Result { + match compressed_page { + BorrowedCompressedPage::Data(compressed_data_page) => { + let BorrowedCompressedDataPage { + header, + buffer, + compression, + uncompressed_page_size, + descriptor, + } = *compressed_data_page; + + uncompressed_buffer.clear(); + uncompressed_buffer.reserve(uncompressed_page_size); + unsafe { + uncompressed_buffer.set_len(uncompressed_page_size); + } + + match &header { + DataPageHeader::V2(v2) => decode_data_page_v2( + buffer, + compression, + v2, + uncompressed_buffer.as_mut_slice(), + )?, + _ => { + decode_full_slice(buffer, compression, uncompressed_buffer.as_mut_slice())? 
+ } + } + + Ok(Page::Data(DataPage::new( + header, + std::mem::take(uncompressed_buffer), + descriptor, + None, + ))) + } + BorrowedCompressedPage::Dict(dict_page) => { + let BorrowedCompressedDictPage { + buffer, + compression, + uncompressed_page_size, + num_values, + is_sorted, + } = dict_page; + + uncompressed_buffer.clear(); + uncompressed_buffer.reserve(uncompressed_page_size); + unsafe { + uncompressed_buffer.set_len(uncompressed_page_size); + } + decode_full_slice(buffer, compression, uncompressed_buffer.as_mut_slice())?; + + Ok(Page::Dict(DictPage::new( + std::mem::take(uncompressed_buffer), + num_values, + is_sorted, + ))) + } + } + } + + pub fn next_owned(&mut self) -> Result, Error> { + let page_tuple = self.page_reader.next_page()?; + + if let Some(page) = page_tuple { + self.was_decompressed = page.compression() != Compression::Uncompressed; + + let decompress_page = + Self::decompress_borrowed_page(page, &mut self.decompression_buffer)?; + + Ok(Some(decompress_page)) + } else { + Ok(None) + } + } +} + +fn decode_full_slice( + src: &[u8], + compression: Compression, + dst: &mut [u8], +) -> parquet2::error::Result<()> { + match compression { + Compression::Uncompressed => { + if src.len() != dst.len() { + return Err(Error::OutOfSpec(format!( + "Uncompressed data length mismatch: src={}, dst={}", + src.len(), + dst.len() + ))); + } + dst.copy_from_slice(src); + Ok(()) + } + Compression::Lz4Raw => lz4_flex::decompress_into(src, dst) + .map(|_| ()) + .map_err(|e| Error::OutOfSpec(format!("LZ4 decompression failed: {}", e))), + Compression::Zstd => zstd::bulk::decompress_to_buffer(src, dst) + .map(|_| ()) + .map_err(|e| Error::OutOfSpec(format!("Zstd decompression failed: {}", e))), + other => Err(Error::FeatureNotSupported(format!( + "Compression {:?} not supported", + other + ))), + } +} + +fn decode_data_page_v2( + buffer: &[u8], + compression: Compression, + header: &DataPageHeaderV2, + dst: &mut [u8], +) -> parquet2::error::Result<()> { + let rep_len 
= usize_from_i32(header.repetition_levels_byte_length, "repetition")?; + let def_len = usize_from_i32(header.definition_levels_byte_length, "definition")?; + let prefix_len = rep_len + .checked_add(def_len) + .ok_or_else(|| Error::OutOfSpec("Level bytes length overflow".to_string()))?; + + if buffer.len() < prefix_len || dst.len() < prefix_len { + return Err(Error::OutOfSpec( + "Data page v2 prefix longer than buffers".to_string(), + )); + } + + dst[..rep_len].copy_from_slice(&buffer[..rep_len]); + dst[rep_len..prefix_len].copy_from_slice(&buffer[rep_len..rep_len + def_len]); + + let values_src = &buffer[prefix_len..]; + let values_dst = &mut dst[prefix_len..]; + + if header.is_compressed.unwrap_or(true) { + decode_full_slice(values_src, compression, values_dst) + } else { + if values_dst.len() != values_src.len() { + return Err(Error::OutOfSpec( + "Uncompressed data page v2 values length mismatch".to_string(), + )); + } + values_dst.copy_from_slice(values_src); + Ok(()) + } +} + + +// TODO do we need this? 
value:i32 should not be negative according to spec +fn usize_from_i32(value: i32, field: &str) -> parquet2::error::Result { + usize::try_from(value).map_err(|_| { + Error::OutOfSpec(format!( + "Negative {} levels byte length in data page header v2", + field + )) + }) +} + +impl<'a> FallibleStreamingIterator for Decompressor<'a> { + type Item = Page; + type Error = Error; + + fn advance(&mut self) -> Result<(), Self::Error> { + self.current_page = None; + let page_tuple = self.page_reader.next_page()?; + + if let Some(page) = page_tuple { + self.was_decompressed = page.compression() != Compression::Uncompressed; + + let decompress_page = + Self::decompress_borrowed_page(page, &mut self.decompression_buffer)?; + + self.current_page = Some(decompress_page); + } + + Ok(()) + } + + fn get(&self) -> Option<&Self::Item> { + self.current_page.as_ref() + } + + fn size_hint(&self) -> (usize, Option) { + (0, None) + } +} diff --git a/src/common/experimental_parquet_reader/src/reader/mod.rs b/src/common/experimental_parquet_reader/src/reader/mod.rs new file mode 100644 index 0000000000000..03cb9979b3337 --- /dev/null +++ b/src/common/experimental_parquet_reader/src/reader/mod.rs @@ -0,0 +1,18 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +pub mod column_reader; +pub mod decompressor; +pub mod page_reader; +pub mod pages; diff --git a/src/common/experimental_parquet_reader/src/reader/page_reader.rs b/src/common/experimental_parquet_reader/src/reader/page_reader.rs new file mode 100644 index 0000000000000..8e5a51fb5b242 --- /dev/null +++ b/src/common/experimental_parquet_reader/src/reader/page_reader.rs @@ -0,0 +1,178 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use parquet2::compression::Compression; +use parquet2::encoding::Encoding; +use parquet2::error::Error; +use parquet2::metadata::Descriptor; +use parquet2::page::DataPageHeader; +use parquet2::page::PageType; +use parquet2::page::ParquetPageHeader; +use parquet2::read::PageMetaData; +use parquet_format_safe::thrift::protocol::TCompactInputProtocol; + +use crate::reader::pages::BorrowedCompressedDataPage; +use crate::reader::pages::BorrowedCompressedDictPage; +use crate::reader::pages::BorrowedCompressedPage; + +/// "Zero-copy" Parquet page reader +pub struct PageReader<'a> { + raw_data_slice: &'a [u8], + compression: Compression, + seen_num_values: i64, + total_num_values: i64, + descriptor: Descriptor, + max_page_size: usize, +} + +impl<'a> PageReader<'a> { + pub fn new_with_page_meta( + raw_data: &'a [u8], + reader_meta: PageMetaData, + max_page_size: usize, + ) -> Self { + Self { + raw_data_slice: raw_data, + total_num_values: reader_meta.num_values, + compression: 
reader_meta.compression, + seen_num_values: 0, + descriptor: reader_meta.descriptor, + max_page_size, + } + } + + pub fn next_page(&mut self) -> parquet2::error::Result> { + if self.seen_num_values >= self.total_num_values { + return Ok(None); + }; + + let page_header = + read_page_header_from_slice(&mut self.raw_data_slice, self.max_page_size)?; + + self.seen_num_values += get_page_header(&page_header)? + .map(|x| x.num_values() as i64) + .unwrap_or_default(); + + let read_size: usize = page_header.compressed_page_size.try_into()?; + + if read_size > self.max_page_size { + return Err(Error::WouldOverAllocate); + } + + if self.raw_data_slice.len() < read_size { + return Err(Error::OutOfSpec( + "Not enough data in slice for page".to_string(), + )); + } + + let data_slice = &self.raw_data_slice[..read_size]; + self.raw_data_slice = &self.raw_data_slice[read_size..]; + + match page_header.type_.try_into()? { + PageType::DataPage => { + let header = page_header.data_page_header.ok_or_else(|| { + Error::OutOfSpec( + "The page header type is a v1 data page but the v1 data header is empty" + .to_string(), + ) + })?; + Ok(Some(BorrowedCompressedPage::new_data_page( + BorrowedCompressedDataPage::new( + DataPageHeader::V1(header), + data_slice, + self.compression, + page_header.uncompressed_page_size.try_into()?, + self.descriptor.clone(), + ), + ))) + } + PageType::DataPageV2 => { + let header = page_header.data_page_header_v2.ok_or_else(|| { + Error::OutOfSpec( + "The page header type is a v2 data page but the v2 data header is empty" + .to_string(), + ) + })?; + Ok(Some(BorrowedCompressedPage::new_data_page( + BorrowedCompressedDataPage::new( + DataPageHeader::V2(header), + data_slice, + self.compression, + page_header.uncompressed_page_size.try_into()?, + self.descriptor.clone(), + ), + ))) + } + PageType::DictionaryPage => { + let dict_header = page_header.dictionary_page_header.as_ref().ok_or_else(|| { + Error::OutOfSpec( + "The page header type is a dictionary page 
but the dictionary header is empty".to_string(), + ) + })?; + let num_values = dict_header.num_values.try_into()?; + let is_sorted = dict_header.is_sorted.unwrap_or(false); + + Ok(Some(BorrowedCompressedPage::Dict( + BorrowedCompressedDictPage::new( + data_slice, + self.compression, + page_header.uncompressed_page_size.try_into()?, + num_values, + is_sorted, + ), + ))) + } + } + } +} + +fn read_page_header_from_slice( + reader: &mut &[u8], + max_size: usize, +) -> parquet2::error::Result { + let mut prot = TCompactInputProtocol::new(reader, max_size); + let page_header = ParquetPageHeader::read_from_in_protocol(&mut prot)?; + Ok(page_header) +} + +pub(crate) fn get_page_header( + header: &ParquetPageHeader, +) -> parquet2::error::Result> { + let type_ = header.type_.try_into()?; + Ok(match type_ { + PageType::DataPage => { + let header = header.data_page_header.clone().ok_or_else(|| { + Error::OutOfSpec( + "The page header type is a v1 data page but the v1 header is empty".to_string(), + ) + })?; + + let _: Encoding = header.encoding.try_into()?; + let _: Encoding = header.repetition_level_encoding.try_into()?; + let _: Encoding = header.definition_level_encoding.try_into()?; + + Some(DataPageHeader::V1(header)) + } + PageType::DataPageV2 => { + let header = header.data_page_header_v2.clone().ok_or_else(|| { + Error::OutOfSpec( + "The page header type is a v1 data page but the v1 header is empty".to_string(), + ) + })?; + let _: Encoding = header.encoding.try_into()?; + Some(DataPageHeader::V2(header)) + } + _ => None, + }) +} diff --git a/src/common/experimental_parquet_reader/src/reader/pages.rs b/src/common/experimental_parquet_reader/src/reader/pages.rs new file mode 100644 index 0000000000000..c78ea186acd75 --- /dev/null +++ b/src/common/experimental_parquet_reader/src/reader/pages.rs @@ -0,0 +1,122 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with 
the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//! Borrowed page types for zero-copy parquet reading +//! +//! This module provides page types that reference data without owning it, +//! enabling "zero-copy" reading when the source is a slice. + +use parquet2::compression::Compression; +use parquet2::metadata::Descriptor; +use parquet2::page::DataPageHeader; + +/// A compressed page that borrows its data from a slice +#[derive(Debug)] +pub enum BorrowedCompressedPage<'a> { + Data(Box>), + Dict(BorrowedCompressedDictPage<'a>), +} + +impl<'a> BorrowedCompressedPage<'a> { + pub fn new_data_page(data_page: BorrowedCompressedDataPage<'a>) -> Self { + Self::Data(Box::new(data_page)) + } +} + +/// A borrowed compressed data page +#[derive(Debug)] +pub struct BorrowedCompressedDataPage<'a> { + pub header: DataPageHeader, + pub buffer: &'a [u8], + pub compression: Compression, + pub uncompressed_page_size: usize, + pub descriptor: Descriptor, +} + +/// A borrowed compressed dictionary page +#[derive(Debug)] +pub struct BorrowedCompressedDictPage<'a> { + pub buffer: &'a [u8], + pub compression: Compression, + pub uncompressed_page_size: usize, + pub num_values: usize, + pub is_sorted: bool, +} + +impl<'a> BorrowedCompressedPage<'a> { + /// Get the compression type of this page + pub fn compression(&self) -> Compression { + match self { + BorrowedCompressedPage::Data(data_page) => data_page.compression, + BorrowedCompressedPage::Dict(dict_page) => dict_page.compression, + } + } + + /// Check if this page is compressed + pub fn is_compressed(&self) -> bool { + self.compression() != 
Compression::Uncompressed + } + + /// Get the uncompressed size of this page + pub fn uncompressed_size(&self) -> usize { + match self { + BorrowedCompressedPage::Data(data_page) => data_page.uncompressed_page_size, + BorrowedCompressedPage::Dict(dict_page) => dict_page.uncompressed_page_size, + } + } + + /// Get the compressed data as a slice + pub fn data(&self) -> &[u8] { + match self { + BorrowedCompressedPage::Data(data_page) => data_page.buffer, + BorrowedCompressedPage::Dict(dict_page) => dict_page.buffer, + } + } +} + +impl<'a> BorrowedCompressedDataPage<'a> { + pub fn new( + header: DataPageHeader, + buffer: &'a [u8], + compression: Compression, + uncompressed_page_size: usize, + descriptor: Descriptor, + ) -> Self { + Self { + header, + buffer, + compression, + uncompressed_page_size, + descriptor, + } + } +} + +impl<'a> BorrowedCompressedDictPage<'a> { + pub fn new( + buffer: &'a [u8], + compression: Compression, + uncompressed_page_size: usize, + num_values: usize, + is_sorted: bool, + ) -> Self { + Self { + buffer, + compression, + uncompressed_page_size, + num_values, + is_sorted, + } + } +} diff --git a/src/common/experimental_parquet_reader/src/util.rs b/src/common/experimental_parquet_reader/src/util.rs new file mode 100644 index 0000000000000..2405917039d2c --- /dev/null +++ b/src/common/experimental_parquet_reader/src/util.rs @@ -0,0 +1,104 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. 
+ +use databend_common_expression::types::NumberDataType; +use databend_common_expression::TableDataType; +use parquet2::schema::types::PhysicalType; +use parquet2::schema::types::PrimitiveType; +use parquet2::schema::Repetition; + +pub fn from_table_field_type(field_name: String, field_type: &TableDataType) -> PrimitiveType { + let (inner_type, is_nullable) = match field_type { + TableDataType::Nullable(inner) => (inner.as_ref(), true), + other => (other, false), + }; + + let mut parquet_primitive_type = match inner_type { + TableDataType::String => PrimitiveType::from_physical(field_name, PhysicalType::ByteArray), + TableDataType::Number(number_type) => match number_type { + NumberDataType::Int8 => PrimitiveType::from_physical(field_name, PhysicalType::Int32), + NumberDataType::Int16 => PrimitiveType::from_physical(field_name, PhysicalType::Int32), + NumberDataType::Int32 => PrimitiveType::from_physical(field_name, PhysicalType::Int32), + NumberDataType::Int64 => PrimitiveType::from_physical(field_name, PhysicalType::Int64), + NumberDataType::UInt8 => PrimitiveType::from_physical(field_name, PhysicalType::Int32), + NumberDataType::UInt16 => PrimitiveType::from_physical(field_name, PhysicalType::Int32), + NumberDataType::UInt32 => PrimitiveType::from_physical(field_name, PhysicalType::Int32), + NumberDataType::UInt64 => PrimitiveType::from_physical(field_name, PhysicalType::Int64), + NumberDataType::Float32 => { + PrimitiveType::from_physical(field_name, PhysicalType::Float) + } + NumberDataType::Float64 => { + PrimitiveType::from_physical(field_name, PhysicalType::Double) + } + }, + TableDataType::Decimal(decimal_type) => { + let precision = decimal_type.precision(); + let _scale = decimal_type.scale(); + if precision <= 9 { + PrimitiveType::from_physical(field_name, PhysicalType::Int32) + } else if precision <= 18 { + PrimitiveType::from_physical(field_name, PhysicalType::Int64) + } else { + let len = decimal_length_from_precision(precision as usize); + 
PrimitiveType::from_physical(field_name, PhysicalType::FixedLenByteArray(len)) + } + } + TableDataType::Date => PrimitiveType::from_physical(field_name, PhysicalType::Int32), + TableDataType::Nullable(_) => unreachable!("Nullable should have been unwrapped"), + t => unimplemented!("Unsupported type: {:?} ", t), + }; + + if !is_nullable { + parquet_primitive_type.field_info.repetition = Repetition::Required; + } + + parquet_primitive_type +} + +fn decimal_length_from_precision(precision: usize) -> usize { + (((10.0_f64.powi(precision as i32) + 1.0).log2() + 1.0) / 8.0).ceil() as usize +} + +pub fn calculate_parquet_max_levels(data_type: &TableDataType) -> (i16, i16) { + match data_type { + TableDataType::Boolean => (0, 0), + TableDataType::Binary => (0, 0), + TableDataType::String => (0, 0), + TableDataType::Number(_) => (0, 0), + TableDataType::Decimal(_) => (0, 0), + TableDataType::Timestamp => (0, 0), + TableDataType::Date => (0, 0), + TableDataType::Interval => (0, 0), + TableDataType::Bitmap => (0, 0), + TableDataType::Variant => (0, 0), + TableDataType::Geometry => (0, 0), + TableDataType::Geography => (0, 0), + TableDataType::Vector(_) => (0, 0), + + TableDataType::Nullable(inner) => { + let (inner_def, inner_rep) = calculate_parquet_max_levels(inner); + (inner_def + 1, inner_rep) + } + + TableDataType::Null + | TableDataType::EmptyArray + | TableDataType::EmptyMap + | TableDataType::Array(_) + | TableDataType::Map(_) + | TableDataType::Tuple { .. 
} + | TableDataType::TimestampTz + | TableDataType::Opaque(_) + | TableDataType::StageLocation => unimplemented!(), + } +} diff --git a/src/query/expression/src/converts/arrow/from.rs b/src/query/expression/src/converts/arrow/from.rs index 6902f7f1cf1f9..6ce5de662a5c9 100644 --- a/src/query/expression/src/converts/arrow/from.rs +++ b/src/query/expression/src/converts/arrow/from.rs @@ -45,6 +45,7 @@ use crate::types::AnyType; use crate::types::ArrayColumn; use crate::types::DataType; use crate::types::DecimalColumn; +use crate::types::DecimalDataKind; use crate::types::DecimalDataType; use crate::types::DecimalSize; use crate::types::GeographyColumn; @@ -141,10 +142,13 @@ impl TryFrom<&Field> for TableField { )?)) } ArrowDataType::Decimal128(precision, scale) if *scale >= 0 => { - TableDataType::Decimal(DecimalDataType::Decimal128(DecimalSize::new( - *precision, - *scale as _, - )?)) + let size = DecimalSize::new(*precision, *scale as _)?; + match size.data_kind() { + DecimalDataKind::Decimal64 | DecimalDataKind::Decimal128 => { + TableDataType::Decimal(DecimalDataType::Decimal128(size)) + } + _ => unreachable!(), + } } ArrowDataType::Decimal256(precision, scale) if *scale >= 0 => { TableDataType::Decimal(DecimalDataType::Decimal256(DecimalSize::new( diff --git a/src/query/expression/src/types.rs b/src/query/expression/src/types.rs index b7f4606364105..2913f169ca18c 100755 --- a/src/query/expression/src/types.rs +++ b/src/query/expression/src/types.rs @@ -793,7 +793,7 @@ pub trait ReturnType: ValueType { fn create_builder(capacity: usize, generics: &GenericMap) -> Self::ColumnBuilder; fn column_from_vec(vec: Vec, generics: &GenericMap) -> Self::Column { - Self::column_from_iter(vec.iter().cloned(), generics) + Self::column_from_iter(vec.into_iter(), generics) } fn column_from_iter( diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 6d5329258152c..a5dd979aee704 100644 --- 
a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -876,9 +876,16 @@ impl DefaultSettings { scope: SettingScope::Both, range: Some(SettingRange::Numeric(2..=u64::MAX)), }), - ("use_parquet2", DefaultSettingValue { + ("use_experimental_parquet_reader", DefaultSettingValue { value: UserSettingValue::UInt64(0), - desc: "This setting is deprecated", + desc: "Use experimental parquet reader to deserialize parquet data of fuse table.", + mode: SettingMode::Both, + scope: SettingScope::Both, + range: Some(SettingRange::Numeric(0..=1)), + }), + ("enable_fuse_parquet_dictionary_encoding", DefaultSettingValue { + value: UserSettingValue::UInt64(0), + desc: "Enables dictionary encoding for fuse parquet storage format.", mode: SettingMode::Both, scope: SettingScope::Both, range: Some(SettingRange::Numeric(0..=1)), diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 2cf2d7453ff32..c003cce3e1814 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -669,12 +669,12 @@ impl Settings { self.try_get_u64("auto_compaction_segments_limit") } - pub fn get_use_parquet2(&self) -> Result { - Ok(self.try_get_u64("use_parquet2")? != 0) + pub fn get_use_experimental_parquet_reader(&self) -> Result { + Ok(self.try_get_u64("use_experimental_parquet_reader")? 
!= 0) } - pub fn set_use_parquet2(&self, val: bool) -> Result<()> { - self.try_set_u64("use_parquet2", u64::from(val)) + pub fn set_use_experimental_parquet_reader(&self, val: bool) -> Result<()> { + self.try_set_u64("use_experimental_parquet_reader", u64::from(val)) } pub fn get_enable_replace_into_partitioning(&self) -> Result { @@ -1106,4 +1106,8 @@ impl Settings { pub fn get_max_aggregate_spill_level(&self) -> Result { self.try_get_u64("max_aggregate_spill_level") } + + pub fn get_enable_fuse_parquet_dictionary_encoding(&self) -> Result { + Ok(self.try_get_u64("enable_fuse_parquet_dictionary_encoding")? == 1) + } } diff --git a/src/query/storages/fuse/Cargo.toml b/src/query/storages/fuse/Cargo.toml index 198a114dd1b7d..4dd6925ca84fa 100644 --- a/src/query/storages/fuse/Cargo.toml +++ b/src/query/storages/fuse/Cargo.toml @@ -22,6 +22,7 @@ databend-common-meta-app = { workspace = true } databend-common-meta-types = { workspace = true } databend-common-metrics = { workspace = true } databend-common-native = { workspace = true } +databend-common-parquet-reader-experimental = { workspace = true } databend-common-pipeline = { workspace = true } databend-common-pipeline-transforms = { workspace = true } databend-common-sql = { workspace = true } @@ -62,6 +63,8 @@ match-template = { workspace = true } opendal = { workspace = true } parking_lot = { workspace = true } parquet = { workspace = true } + +parquet2 = { workspace = true } paste = { workspace = true } rand = { workspace = true } roaring = { workspace = true } diff --git a/src/query/storages/fuse/src/io/read/block/block_reader.rs b/src/query/storages/fuse/src/io/read/block/block_reader.rs index 7ddb31d8d44b4..619ef29c690d0 100644 --- a/src/query/storages/fuse/src/io/read/block/block_reader.rs +++ b/src/query/storages/fuse/src/io/read/block/block_reader.rs @@ -17,7 +17,6 @@ use std::sync::Arc; use arrow_schema::Field; use arrow_schema::Schema; -use arrow_schema::SchemaRef; use 
databend_common_catalog::plan::Projection; use databend_common_catalog::table_context::TableContext; use databend_common_exception::ErrorCode; @@ -37,14 +36,12 @@ use opendal::Operator; use crate::BlockReadResult; -// TODO: make BlockReader as a trait. #[derive(Clone)] pub struct BlockReader { pub(crate) ctx: Arc, pub(crate) operator: Operator, pub(crate) projection: Projection, pub(crate) projected_schema: TableSchemaRef, - pub(crate) arrow_schema: SchemaRef, pub(crate) project_indices: BTreeMap, pub(crate) project_column_nodes: Vec, pub(crate) default_vals: Vec, @@ -55,6 +52,7 @@ pub struct BlockReader { pub original_schema: TableSchemaRef, pub native_columns_reader: NativeColumnsReader, + pub use_experimental_parquet_reader: bool, } fn inner_project_field_default_values(default_vals: &[Scalar], paths: &[usize]) -> Result { @@ -141,12 +139,14 @@ impl BlockReader { let project_indices = Self::build_projection_indices(&project_column_nodes); + let use_experimental_parquet_reader = + ctx.get_settings().get_use_experimental_parquet_reader()?; + Ok(Arc::new(BlockReader { ctx, operator, projection, projected_schema, - arrow_schema: arrow_schema.into(), project_indices, project_column_nodes, default_vals, @@ -155,6 +155,7 @@ impl BlockReader { put_cache, original_schema: schema, native_columns_reader, + use_experimental_parquet_reader, })) } @@ -191,10 +192,6 @@ impl BlockReader { self.projected_schema.clone() } - pub fn arrow_schema(&self) -> SchemaRef { - self.arrow_schema.clone() - } - pub fn data_fields(&self) -> Vec { self.schema().fields().iter().map(DataField::from).collect() } diff --git a/src/query/storages/fuse/src/io/read/block/parquet/deserialize_v2.rs b/src/query/storages/fuse/src/io/read/block/parquet/deserialize_v2.rs new file mode 100644 index 0000000000000..493f9afcb7e35 --- /dev/null +++ b/src/query/storages/fuse/src/io/read/block/parquet/deserialize_v2.rs @@ -0,0 +1,263 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, 
Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::collections::HashMap; +use std::sync::Arc; + +use bytes::Buf; +use databend_common_exception::ErrorCode; +use databend_common_exception::Result; +use databend_common_expression::BlockEntry; +use databend_common_expression::Column; +use databend_common_expression::ColumnId; +use databend_common_expression::DataBlock; +use databend_common_expression::DataSchema; +use databend_common_expression::Scalar; +use databend_common_parquet_reader_experimental as v2_reader; +use databend_common_storage::ColumnNode; +use databend_storages_common_cache::CacheAccessor; +use databend_storages_common_cache::CacheManager; +use databend_storages_common_cache::SizedColumnArray; +use databend_storages_common_cache::TableDataCacheKey; +use databend_storages_common_table_meta::meta::ColumnMeta; +use databend_storages_common_table_meta::meta::Compression; +use parquet2::metadata::Descriptor; +use v2_reader::calculate_parquet_max_levels; +use v2_reader::data_chunk_to_col_iter; +use v2_reader::from_table_field_type; + +use super::BlockReader; +use crate::io::read::block::block_reader_merge_io::DataItem; + +pub struct FieldDeserializationContext<'a> { + pub(crate) column_metas: &'a HashMap, + pub(crate) column_chunks: &'a HashMap>, + pub(crate) num_rows: usize, + pub(crate) compression: &'a Compression, +} + +enum DeserializedColumn<'a> { + FromCache(&'a Arc), + Column((ColumnId, Column, usize)), +} + +impl BlockReader { + pub(crate) fn deserialize_v2( + &self, + 
block_path: &str, + num_rows: usize, + compression: &Compression, + column_metas: &HashMap, + column_chunks: HashMap, + ) -> Result { + if column_chunks.is_empty() { + return self.build_default_values_block(num_rows); + } + + let mut need_default_vals = Vec::with_capacity(self.project_column_nodes.len()); + let mut need_to_fill_default_val = false; + let mut deserialized_column_arrays = Vec::with_capacity(self.projection.len()); + let field_deserialization_ctx = FieldDeserializationContext { + column_metas, + column_chunks: &column_chunks, + num_rows, + compression, + }; + + for column_node in &self.project_column_nodes { + let deserialized_column = self + .deserialize_field_v2(&field_deserialization_ctx, column_node) + .map_err(|e| { + e.add_message(format!( + "failed to deserialize column: {:?}, location {} ", + column_node, block_path + )) + })?; + match deserialized_column { + None => { + need_to_fill_default_val = true; + need_default_vals.push(true); + } + Some(v) => { + deserialized_column_arrays.push((v, column_node.table_field.data_type())); + need_default_vals.push(false); + } + } + } + + let cache = if self.put_cache { + CacheManager::instance().get_table_data_array_cache() + } else { + None + }; + + let mut block_entries = Vec::with_capacity(deserialized_column_arrays.len()); + for (col, table_data_type) in deserialized_column_arrays { + // TODO we should cache deserialized data as Column (instead of arrow Array) + // Converting arrow array to column may be expensive + let entry = match col { + DeserializedColumn::FromCache(arrow_array) => { + BlockEntry::Column(Column::from_arrow_rs( + arrow_array.0.clone(), + &(&table_data_type.clone()).into(), + )?) 
+ } + DeserializedColumn::Column((column_id, col, size)) => { + if let Some(cache) = &cache { + let meta = column_metas.get(&column_id).unwrap(); + let (offset, len) = meta.offset_length(); + let key = TableDataCacheKey::new(block_path, column_id, offset, len); + let array = col.clone().into_arrow_rs(); + cache.insert(key.into(), (array, size)); + }; + BlockEntry::Column(col) + } + }; + block_entries.push(entry); + } + + // build data block + let data_block = if !need_to_fill_default_val { + assert_eq!(block_entries.len(), self.projected_schema.num_fields()); + DataBlock::new(block_entries, num_rows) + } else { + let mut default_vals = Vec::with_capacity(need_default_vals.len()); + for (i, need_default_val) in need_default_vals.iter().enumerate() { + if !need_default_val { + default_vals.push(None); + } else { + default_vals.push(Some(self.default_vals[i].clone())); + } + } + + create_with_opt_default_value( + block_entries, + &self.data_schema(), + &default_vals, + num_rows, + )? + }; + Ok(data_block) + } + + fn deserialize_field_v2<'a>( + &self, + deserialization_context: &'a FieldDeserializationContext, + column_node: &ColumnNode, + ) -> Result>> { + let is_nested = column_node.is_nested; + + if is_nested { + unimplemented!("Nested type is not supported now"); + } + + let column_chunks = deserialization_context.column_chunks; + let compression = deserialization_context.compression; + + let (max_def_level, max_rep_level) = + calculate_parquet_max_levels(&column_node.table_field.data_type); + + let parquet_primitive_type = from_table_field_type( + column_node.table_field.name.clone(), + &column_node.table_field.data_type, + ); + + let column_descriptor = Descriptor { + primitive_type: parquet_primitive_type, + max_def_level, + max_rep_level, + }; + + // Since we only support leaf column now + let leaf_column_id = 0; + let column_id = column_node.leaf_column_ids[leaf_column_id]; + + let Some(column_meta) = deserialization_context.column_metas.get(&column_id) else { 
+ return Ok(None); + }; + let Some(chunk) = column_chunks.get(&column_id) else { + return Ok(None); + }; + + match chunk { + DataItem::RawData(data) => { + let field_uncompressed_size = data.len(); + let num_rows = deserialization_context.num_rows; + let field_name = column_node.field.name().to_owned(); + + let bytes = data.to_bytes(); + let chunk = bytes.chunk(); + let mut column_iter = data_chunk_to_col_iter( + column_meta, + &chunk, + num_rows, + &column_descriptor, + column_node.table_field.clone(), + compression, + )?; + + let column = column_iter.next().transpose()?.ok_or_else(|| { + ErrorCode::StorageOther(format!("no array found for field {field_name}")) + })?; + + // Since we deserialize all the rows of this column, the iterator should be drained + assert!(column_iter.next().is_none()); + // Deserialized from raw bytes, and intended to be cached + Ok(Some(DeserializedColumn::Column(( + column_id, + column, + field_uncompressed_size, + )))) + } + DataItem::ColumnArray(column_array) => { + if is_nested { + return Err(ErrorCode::StorageOther( + "unexpected nested field: nested leaf field hits cached", + )); + } + // since it is not nested, this field contains only one column + Ok(Some(DeserializedColumn::FromCache(column_array))) + } + } + } +} + +fn create_with_opt_default_value( + block_entries: Vec, + schema: &DataSchema, + default_vals: &[Option], + num_rows: usize, +) -> Result { + let schema_fields = schema.fields(); + let mut block_entries_iter = block_entries.into_iter(); + + let mut entries = Vec::with_capacity(default_vals.len()); + for (i, default_val) in default_vals.iter().enumerate() { + let field = &schema_fields[i]; + let data_type = field.data_type(); + + let entry = match default_val { + Some(default_val) => { + BlockEntry::new_const_column(data_type.clone(), default_val.to_owned(), num_rows) + } + None => block_entries_iter + .next() + .expect("arrays should have enough elements"), + }; + + entries.push(entry); + } + + 
Ok(DataBlock::new(entries, num_rows)) +} diff --git a/src/query/storages/fuse/src/io/read/block/parquet/mod.rs b/src/query/storages/fuse/src/io/read/block/parquet/mod.rs index 781bb843c3f8f..ec74286470378 100644 --- a/src/query/storages/fuse/src/io/read/block/parquet/mod.rs +++ b/src/query/storages/fuse/src/io/read/block/parquet/mod.rs @@ -34,6 +34,8 @@ use databend_storages_common_table_meta::meta::Compression; mod adapter; mod deserialize; +mod deserialize_v2; + pub use adapter::RowGroupImplBuilder; pub use deserialize::column_chunks_to_record_batch; @@ -48,6 +50,33 @@ impl BlockReader { column_chunks: HashMap, compression: &Compression, block_path: &str, + ) -> databend_common_exception::Result { + if self.use_experimental_parquet_reader { + self.deserialize_v2( + block_path, + num_rows, + compression, + column_metas, + column_chunks, + ) + } else { + self.deserialize_using_arrow( + num_rows, + column_metas, + column_chunks, + compression, + block_path, + ) + } + } + + pub fn deserialize_using_arrow( + &self, + num_rows: usize, + column_metas: &HashMap, + column_chunks: HashMap, + compression: &Compression, + block_path: &str, ) -> databend_common_exception::Result { if column_chunks.is_empty() { return self.build_default_values_block(num_rows); diff --git a/tests/sqllogictests/suites/base/03_common/03_extra_new_reader.test b/tests/sqllogictests/suites/base/03_common/03_extra_new_reader.test new file mode 100644 index 0000000000000..f20e0cbeeaa96 --- /dev/null +++ b/tests/sqllogictests/suites/base/03_common/03_extra_new_reader.test @@ -0,0 +1,87 @@ +statement ok +create or replace database test_new_reader; + +statement ok +use test_new_reader; + +statement ok +SET use_experimental_parquet_reader = 1; + + +statement ok +create table t1 (a int not null); + +statement ok +insert into t1 values(1), (3), (5); + +query I +select * from t1 order by a; +---- +1 +3 +5 + +statement ok +create table t2 (a int); + +statement ok +insert into t2 values(1), (null), (5); + 
+query I +select * from t2 order by a; +---- +1 +5 +NULL + + +statement ok +create table t3 (a int); + +statement ok +create table r like t3 engine = random; + +statement ok +insert into t3 select * from r limit 100000; + +statement ok +select sum(a) from t3 ignore_result; + +statement ok +CREATE TABLE IF NOT EXISTS lineitem +( + l_orderkey BIGINT not null, + l_partkey BIGINT not null, + l_suppkey BIGINT not null, + l_linenumber BIGINT not null, + l_quantity DECIMAL(15, 2) not null, + l_extendedprice DECIMAL(15, 2) not null, + l_discount DECIMAL(15, 2) not null, + l_tax DECIMAL(15, 2) not null, + l_returnflag STRING not null, + l_linestatus STRING not null, + l_shipdate DATE not null, + l_commitdate DATE not null, + l_receiptdate DATE not null, + l_shipinstruct STRING not null, + l_shipmode STRING not null, + l_comment STRING not null +); + +statement ok +create table lr like lineitem engine = random; + +statement ok +set enable_fuse_parquet_dictionary_encoding = 1; + +statement ok +insert into lineitem select * from lr limit 1000; + +statement ok +insert into lineitem select * from lr limit 1000; + +statement ok +select * from lineitem ignore_result; + + +