diff --git a/src/query/ee/src/storages/fuse/operations/virtual_columns.rs b/src/query/ee/src/storages/fuse/operations/virtual_columns.rs index 25c8e5c24ed4d..e3e53bacdb271 100644 --- a/src/query/ee/src/storages/fuse/operations/virtual_columns.rs +++ b/src/query/ee/src/storages/fuse/operations/virtual_columns.rs @@ -128,7 +128,7 @@ pub async fn do_refresh_virtual_column( let segment_reader = MetaReaders::segment_info_reader(fuse_table.get_operator(), table_schema.clone()); - let write_settings = fuse_table.get_write_settings(); + let write_settings = fuse_table.get_write_settings_with_ctx(&ctx)?; let storage_format = write_settings.storage_format; let operator = fuse_table.get_operator_ref(); diff --git a/src/query/formats/src/output_format/parquet.rs b/src/query/formats/src/output_format/parquet.rs index 56233190af426..4f6e81f3edba0 100644 --- a/src/query/formats/src/output_format/parquet.rs +++ b/src/query/formats/src/output_format/parquet.rs @@ -60,6 +60,7 @@ impl OutputFormat for ParquetOutputFormat { &mut buf, TableCompression::Zstd, true, + false, None, )?; Ok(buf) diff --git a/src/query/service/src/interpreters/interpreter_index_refresh.rs b/src/query/service/src/interpreters/interpreter_index_refresh.rs index dfc9854517fdb..9e26a7fa7a66a 100644 --- a/src/query/service/src/interpreters/interpreter_index_refresh.rs +++ b/src/query/service/src/interpreters/interpreter_index_refresh.rs @@ -324,12 +324,14 @@ impl Interpreter for RefreshIndexInterpreter { let sink_schema = Arc::new(sink_schema); build_res.main_pipeline.try_resize(1)?; + let table_ctx: Arc = self.ctx.clone(); + let write_settings = fuse_table.get_write_settings_with_ctx(&table_ctx)?; build_res.main_pipeline.add_sink(|input| { AggIndexSink::try_create( input, fuse_table.get_operator(), self.plan.index_id, - fuse_table.get_write_settings(), + write_settings.clone(), sink_schema.clone(), block_name_offset, self.plan.user_defined_block_name, diff --git a/src/query/service/src/test_kits/block_writer.rs b/src/query/service/src/test_kits/block_writer.rs index a11e8e9d55b36..b57fe13e6b14b 100644 --- a/src/query/service/src/test_kits/block_writer.rs +++ b/src/query/service/src/test_kits/block_writer.rs @@ -158,6 +158,7 @@ impl<'a> BlockWriter<'a> { &mut data, TableCompression::None, false, + false, None, )?; let size = data.len() as u64; diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 7efd9f212e9e4..9fb83d503359b 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -977,6 +977,13 @@ impl DefaultSettings { scope: SettingScope::Both, range: Some(SettingRange::Numeric(0..=1)), }), + ("enable_parquet_int32_delta_encoding", DefaultSettingValue { + value: UserSettingValue::UInt64(1), + desc: "Enables automatic INT32 DELTA_BINARY_PACKED encoding when heuristics match.", + mode: SettingMode::Both, + scope: SettingScope::Both, + range: Some(SettingRange::Numeric(0..=1)), + }), ("external_server_connect_timeout_secs", DefaultSettingValue { value: UserSettingValue::UInt64(10), desc: "Connection timeout to external server", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index 0ad3c78c5cdc3..1f72abf91774c 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -763,6 +763,10 @@ impl Settings { Ok(self.try_get_u64("enable_parquet_prewhere")? 
!= 0) } + pub fn get_enable_parquet_int32_delta_encoding(&self) -> Result { + Ok(self.try_get_u64("enable_parquet_int32_delta_encoding")? != 0) + } + pub fn get_numeric_cast_option(&self) -> Result { self.try_get_string("numeric_cast_option") } diff --git a/src/query/storages/basic/src/result_cache/write/writer.rs b/src/query/storages/basic/src/result_cache/write/writer.rs index 9a654a0f2b7b3..5fe20f38a1e10 100644 --- a/src/query/storages/basic/src/result_cache/write/writer.rs +++ b/src/query/storages/basic/src/result_cache/write/writer.rs @@ -78,6 +78,7 @@ impl ResultCacheWriter { &mut buf, TableCompression::None, false, + false, None, )?; diff --git a/src/query/storages/common/blocks/src/parquet_rs.rs b/src/query/storages/common/blocks/src/parquet_rs.rs index 0e03b24342c2a..0f4cc3574dbce 100644 --- a/src/query/storages/common/blocks/src/parquet_rs.rs +++ b/src/query/storages/common/blocks/src/parquet_rs.rs @@ -12,13 +12,19 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashMap; use std::sync::Arc; use databend_common_exception::Result; use databend_common_expression::ColumnId; use databend_common_expression::DataBlock; +use databend_common_expression::Scalar; +use databend_common_expression::TableDataType; use databend_common_expression::TableSchema; use databend_common_expression::converts::arrow::table_schema_arrow_leaf_paths; +use databend_common_expression::types::NumberDataType; +use databend_common_expression::types::number::NumberScalar; +use databend_storages_common_table_meta::meta::ColumnStatistics; use databend_storages_common_table_meta::meta::StatisticsOfColumns; use databend_storages_common_table_meta::table::TableCompression; use parquet::arrow::ArrowWriter; @@ -32,6 +38,8 @@ use parquet::schema::types::ColumnPath; /// Disable dictionary encoding once the NDV-to-row ratio is greater than this threshold. const HIGH_CARDINALITY_RATIO_THRESHOLD: f64 = 0.1; +const DELTA_HIGH_CARDINALITY_RATIO: f64 = 0.9; +const DELTA_RANGE_TOLERANCE: f64 = 1.05; /// Serialize data blocks to parquet format. pub fn blocks_to_parquet( @@ -40,6 +48,7 @@ pub fn blocks_to_parquet( write_buffer: &mut Vec, compression: TableCompression, enable_dictionary: bool, + enable_parquet_int32_delta_encoding: bool, metadata: Option>, ) -> Result { blocks_to_parquet_with_stats( @@ -48,6 +57,7 @@ pub fn blocks_to_parquet( write_buffer, compression, enable_dictionary, + enable_parquet_int32_delta_encoding, metadata, None, ) @@ -69,6 +79,7 @@ pub fn blocks_to_parquet_with_stats( write_buffer: &mut Vec, compression: TableCompression, enable_dictionary: bool, + enable_parquet_int32_delta_encoding: bool, metadata: Option>, column_stats: Option<&StatisticsOfColumns>, ) -> Result { @@ -78,11 +89,15 @@ pub fn blocks_to_parquet_with_stats( // the streaming writer and only rely on the first block's NDV (and row count) snapshot. 
let num_rows = blocks[0].num_rows(); let arrow_schema = Arc::new(table_schema.into()); + let column_metrics = column_stats.map(ColumnStatsView); let props = build_parquet_writer_properties( compression, enable_dictionary, - column_stats, + enable_parquet_int32_delta_encoding, + column_metrics + .as_ref() + .map(|view| view as &dyn EncodingStatsProvider), metadata, num_rows, table_schema, @@ -105,7 +120,8 @@ pub fn blocks_to_parquet_with_stats( pub fn build_parquet_writer_properties( compression: TableCompression, enable_dictionary: bool, - cols_stats: Option, + enable_parquet_int32_delta_encoding: bool, + column_metrics: Option<&dyn EncodingStatsProvider>, metadata: Option>, num_rows: usize, table_schema: &TableSchema, @@ -120,27 +136,62 @@ pub fn build_parquet_writer_properties( .set_key_value_metadata(metadata); if enable_dictionary { - // Enable dictionary for all columns - builder = builder - .set_writer_version(WriterVersion::PARQUET_2_0) - .set_dictionary_enabled(true); - if let Some(cols_stats) = cols_stats { - // Disable dictionary of columns that have high cardinality - for (column_id, components) in table_schema_arrow_leaf_paths(table_schema) { - if let Some(ndv) = cols_stats.column_ndv(&column_id) { + builder = builder.set_dictionary_enabled(true); + } else { + builder = builder.set_dictionary_enabled(false); + } + + if enable_dictionary || enable_parquet_int32_delta_encoding { + builder = builder.set_writer_version(WriterVersion::PARQUET_2_0); + } + + let column_paths: HashMap = table_schema_arrow_leaf_paths(table_schema) + .into_iter() + .map(|(id, path)| (id, ColumnPath::from(path))) + .collect(); + + if enable_dictionary { + if let Some(metrics) = column_metrics { + for (column_id, column_path) in &column_paths { + if let Some(ndv) = metrics.column_ndv(column_id) { if num_rows > 0 && (ndv as f64 / num_rows as f64) > HIGH_CARDINALITY_RATIO_THRESHOLD { + builder = builder.set_column_dictionary_enabled(column_path.clone(), false); + } + } + } + } + } + + if enable_parquet_int32_delta_encoding { + if let Some(metrics) = column_metrics { + for field in table_schema.leaf_fields() { + if !matches!( + field.data_type().remove_nullable(), + TableDataType::Number(NumberDataType::Int32) + ) { + continue; + } + let column_id = field.column_id(); + let Some(stats) = metrics.column_stats(&column_id) else { + continue; + }; + let Some(ndv) = metrics.column_ndv(&column_id) else { + continue; + }; + if should_apply_int32_delta(stats, ndv, num_rows) { + if let Some(path) = column_paths.get(&column_id) { builder = builder - .set_column_dictionary_enabled(ColumnPath::from(components), false); + .set_column_dictionary_enabled(path.clone(), false) + .set_column_encoding(path.clone(), Encoding::DELTA_BINARY_PACKED); } } } } - builder.build() - } else { - builder.set_dictionary_enabled(false).build() } + + builder.build() } /// Provides per column NDV statistics @@ -154,6 +205,67 @@ impl NdvProvider for &StatisticsOfColumns { } } +pub trait EncodingStatsProvider: NdvProvider { + fn column_stats(&self, column_id: &ColumnId) -> Option<&ColumnStatistics>; +} + +struct ColumnStatsView<'a>(&'a StatisticsOfColumns); + +impl<'a> NdvProvider for ColumnStatsView<'a> { + fn column_ndv(&self, column_id: &ColumnId) -> Option { + self.0 + .get(column_id) + .and_then(|item| item.distinct_of_values) + } +} + +impl<'a> EncodingStatsProvider for ColumnStatsView<'a> { + fn column_stats(&self, column_id: &ColumnId) -> Option<&ColumnStatistics> { + self.0.get(column_id) + } +} + +fn should_apply_int32_delta(stats: 
&ColumnStatistics, ndv: u64, num_rows: usize) -> bool { + if num_rows == 0 || ndv == 0 || stats.null_count > 0 { + return false; + } + let Some(min) = scalar_to_i64(&stats.min) else { + return false; + }; + let Some(max) = scalar_to_i64(&stats.max) else { + return false; + }; + if max <= min { + return false; + } + // Use ratio-based heuristics instead of absolute NDV threshold. + // This ensures consistent encoding decisions regardless of first batch size. + let ndv_ratio = ndv as f64 / num_rows as f64; + if ndv_ratio < DELTA_HIGH_CARDINALITY_RATIO { + return false; + } + let span = (max - min + 1) as f64; + let contiguous_ratio = span / ndv as f64; + contiguous_ratio <= DELTA_RANGE_TOLERANCE +} + +fn scalar_to_i64(val: &Scalar) -> Option { + match val { + Scalar::Number(num) => match num { + NumberScalar::Int8(v) => Some(*v as i64), + NumberScalar::Int16(v) => Some(*v as i64), + NumberScalar::Int32(v) => Some(*v as i64), + NumberScalar::Int64(v) => Some(*v), + NumberScalar::UInt8(v) => Some(*v as i64), + NumberScalar::UInt16(v) => Some(*v as i64), + NumberScalar::UInt32(v) => Some(*v as i64), + NumberScalar::UInt64(v) => i64::try_from(*v).ok(), + _ => None, + }, + _ => None, + } +} + #[cfg(test)] mod tests { use std::collections::HashMap; @@ -174,6 +286,12 @@ mod tests { } } + impl EncodingStatsProvider for TestNdvProvider { + fn column_stats(&self, _column_id: &ColumnId) -> Option<&ColumnStatistics> { + None + } + } + fn sample_schema() -> TableSchema { TableSchema::new(vec![ TableField::new("simple", TableDataType::Number(NumberDataType::Int32)), @@ -211,10 +329,12 @@ mod tests { .map(|(id, path)| (id, ColumnPath::from(path))) .collect(); + let provider = TestNdvProvider { ndv }; let props = build_parquet_writer_properties( TableCompression::Zstd, true, - Some(TestNdvProvider { ndv }), + false, + Some(&provider), None, 1000, &schema, @@ -250,7 +370,8 @@ mod tests { let props = build_parquet_writer_properties( TableCompression::Zstd, false, - None::, + false, + None, None, 1000, &schema, diff --git a/src/query/storages/fuse/benches/bench.rs b/src/query/storages/fuse/benches/bench.rs index e52099d49062e..96b978936e05f 100644 --- a/src/query/storages/fuse/benches/bench.rs +++ b/src/query/storages/fuse/benches/bench.rs @@ -148,6 +148,7 @@ mod dummy { max_page_size, block_per_seg, enable_parquet_dictionary, + enable_parquet_int32_delta_encoding: false, }; let schema = Arc::new(schema); let mut buffer = Vec::new(); diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index fe3bcbdce196b..3fda7ae1d7934 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -335,9 +335,23 @@ impl FuseTable { max_page_size, block_per_seg, enable_parquet_dictionary: enable_parquet_dictionary_encoding, + enable_parquet_int32_delta_encoding: true, } } + pub fn get_write_settings_with_ctx( + &self, + ctx: &Arc, + ) -> Result { + let mut settings = self.get_write_settings(); + if matches!(settings.storage_format, FuseStorageFormat::Parquet) { + settings.enable_parquet_int32_delta_encoding = ctx + .get_settings() + .get_enable_parquet_int32_delta_encoding()?; + } + Ok(settings) + } + /// Get max page size. /// For native storage format. 
pub fn get_max_page_size(&self) -> Option { diff --git a/src/query/storages/fuse/src/io/write/block_writer.rs b/src/query/storages/fuse/src/io/write/block_writer.rs index dfb7a73e8d897..5ddfb697748cb 100644 --- a/src/query/storages/fuse/src/io/write/block_writer.rs +++ b/src/query/storages/fuse/src/io/write/block_writer.rs @@ -97,6 +97,7 @@ pub fn serialize_block_with_column_stats( buf, write_settings.table_compression, write_settings.enable_parquet_dictionary, + write_settings.enable_parquet_int32_delta_encoding, None, column_stats, )?; diff --git a/src/query/storages/fuse/src/io/write/bloom_index_writer.rs b/src/query/storages/fuse/src/io/write/bloom_index_writer.rs index a7cc4642d7ce6..da52fc0ad7a97 100644 --- a/src/query/storages/fuse/src/io/write/bloom_index_writer.rs +++ b/src/query/storages/fuse/src/io/write/bloom_index_writer.rs @@ -78,6 +78,7 @@ impl BloomIndexState { &mut data, TableCompression::None, false, + false, None, )?; let data_size = data.len() as u64; diff --git a/src/query/storages/fuse/src/io/write/inverted_index_writer.rs b/src/query/storages/fuse/src/io/write/inverted_index_writer.rs index 5f5ff17c9fc2a..07de83485686f 100644 --- a/src/query/storages/fuse/src/io/write/inverted_index_writer.rs +++ b/src/query/storages/fuse/src/io/write/inverted_index_writer.rs @@ -338,6 +338,7 @@ impl InvertedIndexWriter { TableCompression::Zstd, // No dictionary page for inverted index false, + false, None, )?; diff --git a/src/query/storages/fuse/src/io/write/stream/block_builder.rs b/src/query/storages/fuse/src/io/write/stream/block_builder.rs index 7f8d20d9475b1..12a0fb638da19 100644 --- a/src/query/storages/fuse/src/io/write/stream/block_builder.rs +++ b/src/query/storages/fuse/src/io/write/stream/block_builder.rs @@ -40,6 +40,7 @@ use databend_common_meta_app::schema::TableIndex; use databend_common_native::write::NativeWriter; use databend_common_native::write::WriteOptions; use databend_common_sql::executor::physical_plans::MutationKind; +use databend_storages_common_blocks::EncodingStatsProvider; use databend_storages_common_blocks::NdvProvider; use databend_storages_common_blocks::build_parquet_writer_properties; use databend_storages_common_index::BloomIndex; @@ -50,6 +51,8 @@ use databend_storages_common_index::RangeIndex; use databend_storages_common_table_meta::meta::BlockHLLState; use databend_storages_common_table_meta::meta::BlockMeta; use databend_storages_common_table_meta::meta::ColumnMeta; +use databend_storages_common_table_meta::meta::ColumnStatistics; +use databend_storages_common_table_meta::meta::StatisticsOfColumns; use databend_storages_common_table_meta::meta::TableMetaTimestamps; use databend_storages_common_table_meta::table::TableCompression; use parquet::arrow::ArrowWriter; @@ -87,7 +90,8 @@ impl UninitializedArrowWriter { let writer_properties = build_parquet_writer_properties( write_settings.table_compression, write_settings.enable_parquet_dictionary, - Some(cols_ndv_info), + write_settings.enable_parquet_int32_delta_encoding, + Some(&cols_ndv_info), None, num_rows, self.table_schema.as_ref(), @@ -148,12 +152,25 @@ impl ArrowParquetWriter { pub struct ColumnsNdvInfo { cols_ndv: HashMap, + cols_stats: StatisticsOfColumns, num_rows: usize, } impl ColumnsNdvInfo { - fn new(num_rows: usize, cols_ndv: HashMap) -> Self { - Self { cols_ndv, num_rows } + fn new( + num_rows: usize, + cols_ndv: HashMap, + cols_stats: StatisticsOfColumns, + ) -> Self { + Self { + cols_ndv, + cols_stats, + num_rows, + } + } + + fn column_stats(&self, column_id: &ColumnId) -> 
Option<&ColumnStatistics> { + self.cols_stats.get(column_id) } } impl NdvProvider for ColumnsNdvInfo { @@ -162,6 +179,12 @@ impl NdvProvider for ColumnsNdvInfo { } } +impl EncodingStatsProvider for ColumnsNdvInfo { + fn column_stats(&self, column_id: &ColumnId) -> Option<&ColumnStatistics> { + self.column_stats(column_id) + } +} + pub enum BlockWriterImpl { Parquet(ArrowParquetWriter), // Native format doesnot support stream write. @@ -383,8 +406,9 @@ impl StreamBlockBuilder { // block's NDV stats to heuristically configure the parquet writer. let mut cols_ndv = self.column_stats_state.peek_cols_ndv(); cols_ndv.extend(self.block_stats_builder.peek_cols_ndv()); + let cols_stats = self.column_stats_state.peek_column_stats()?; self.block_writer - .start(ColumnsNdvInfo::new(block.num_rows(), cols_ndv))?; + .start(ColumnsNdvInfo::new(block.num_rows(), cols_ndv, cols_stats))?; } self.block_writer @@ -555,7 +579,7 @@ impl StreamBlockProperties { ..table.schema().as_ref().clone() }); - let write_settings = table.get_write_settings(); + let write_settings = table.get_write_settings_with_ctx(&ctx)?; let bloom_columns_map = table .bloom_index_cols diff --git a/src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs b/src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs index 4bee006f62afa..ecf27cc411b92 100644 --- a/src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs +++ b/src/query/storages/fuse/src/io/write/stream/column_statistics_builder.rs @@ -80,6 +80,10 @@ pub trait ColumnStatsOps { fn finalize(self) -> Result; } +pub trait ColumnStatsPeek { + fn peek(&self) -> Result>; +} + impl ColumnStatsOps for GenericColumnStatisticsBuilder where T: ValueType + Send + Sync, @@ -333,7 +337,7 @@ where fn finalize(self) -> Result { let min = if let Some(v) = self.min { let v = A::value_to_scalar(v); - // safe upwrap. + // safe unwrap. T::upcast_scalar_with_type(v, &self.data_type) .trim_min() .unwrap() @@ -361,4 +365,62 @@ where None, )) } + + fn snapshot(&self) -> Result> { + if self.min.is_none() && self.max.is_none() && self.null_count == 0 { + return Ok(None); + } + let min = if let Some(v) = &self.min { + let v = A::value_to_scalar(v.clone()); + // safe unwrap. 
+ T::upcast_scalar_with_type(v, &self.data_type) + .trim_min() + .unwrap() + } else { + Scalar::Null + }; + let max = if let Some(v) = &self.max { + let v = A::value_to_scalar(v.clone()); + if let Some(v) = T::upcast_scalar_with_type(v, &self.data_type).trim_max() { + v + } else { + return Err(ErrorCode::Internal( + "Unable to trim string: first 16 chars are all replacement_point".to_string(), + )); + } + } else { + Scalar::Null + }; + Ok(Some(ColumnStatistics::new( + min, + max, + self.null_count as u64, + self.in_memory_size as u64, + None, + ))) + } +} + +impl ColumnStatsPeek for ColumnStatisticsBuilder { + fn peek(&self) -> Result> { + match self { + ColumnStatisticsBuilder::Int8(inner) => inner.snapshot(), + ColumnStatisticsBuilder::Int16(inner) => inner.snapshot(), + ColumnStatisticsBuilder::Int32(inner) => inner.snapshot(), + ColumnStatisticsBuilder::Int64(inner) => inner.snapshot(), + ColumnStatisticsBuilder::UInt8(inner) => inner.snapshot(), + ColumnStatisticsBuilder::UInt16(inner) => inner.snapshot(), + ColumnStatisticsBuilder::UInt32(inner) => inner.snapshot(), + ColumnStatisticsBuilder::UInt64(inner) => inner.snapshot(), + ColumnStatisticsBuilder::Float32(inner) => inner.snapshot(), + ColumnStatisticsBuilder::Float64(inner) => inner.snapshot(), + ColumnStatisticsBuilder::String(inner) => inner.snapshot(), + ColumnStatisticsBuilder::Date(inner) => inner.snapshot(), + ColumnStatisticsBuilder::Timestamp(inner) => inner.snapshot(), + ColumnStatisticsBuilder::TimestampTz(inner) => inner.snapshot(), + ColumnStatisticsBuilder::Decimal64(inner) => inner.snapshot(), + ColumnStatisticsBuilder::Decimal128(inner) => inner.snapshot(), + ColumnStatisticsBuilder::Decimal256(inner) => inner.snapshot(), + } + } } diff --git a/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs b/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs index f462de3dc83e7..cb19cdf2b9830 100644 --- a/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs +++ b/src/query/storages/fuse/src/io/write/stream/column_statistics_state.rs @@ -26,6 +26,7 @@ use crate::io::write::stream::ColumnNDVEstimator; use crate::io::write::stream::ColumnNDVEstimatorOps; use crate::io::write::stream::ColumnStatisticsBuilder; use crate::io::write::stream::ColumnStatsOps; +use crate::io::write::stream::column_statistics_builder::ColumnStatsPeek; use crate::io::write::stream::create_column_ndv_estimator; use crate::io::write::stream::create_column_stats_builder; use crate::statistics::traverse_values_dfs; @@ -93,6 +94,16 @@ impl ColumnStatisticsState { .collect() } + pub fn peek_column_stats(&self) -> Result { + let mut statistics = StatisticsOfColumns::with_capacity(self.col_stats.len()); + for (column_id, builder) in &self.col_stats { + if let Some(stats) = builder.peek()? 
{ + statistics.insert(*column_id, stats); + } + } + Ok(statistics) + } + pub fn finalize( self, mut column_distinct_count: HashMap, diff --git a/src/query/storages/fuse/src/io/write/vector_index_writer.rs b/src/query/storages/fuse/src/io/write/vector_index_writer.rs index ca372d66c2c2d..6036cddfa2f32 100644 --- a/src/query/storages/fuse/src/io/write/vector_index_writer.rs +++ b/src/query/storages/fuse/src/io/write/vector_index_writer.rs @@ -216,6 +216,7 @@ impl VectorIndexBuilder { TableCompression::Zstd, // No dictionary page for vector index false, + false, Some(metadata), )?; @@ -322,6 +323,7 @@ impl VectorIndexBuilder { TableCompression::Zstd, // No dictionary page for vector index false, + false, Some(metadata), )?; diff --git a/src/query/storages/fuse/src/io/write/virtual_column_builder.rs b/src/query/storages/fuse/src/io/write/virtual_column_builder.rs index 3854aea2ccefb..a03129eaa8a94 100644 --- a/src/query/storages/fuse/src/io/write/virtual_column_builder.rs +++ b/src/query/storages/fuse/src/io/write/virtual_column_builder.rs @@ -514,6 +514,7 @@ impl VirtualColumnBuilder { &mut data, write_settings.table_compression, write_settings.enable_parquet_dictionary, + write_settings.enable_parquet_int32_delta_encoding, None, Some(&columns_statistics), )?; diff --git a/src/query/storages/fuse/src/io/write/write_settings.rs b/src/query/storages/fuse/src/io/write/write_settings.rs index 87baa8c3ecd18..d35e29d63f6ea 100644 --- a/src/query/storages/fuse/src/io/write/write_settings.rs +++ b/src/query/storages/fuse/src/io/write/write_settings.rs @@ -29,6 +29,7 @@ pub struct WriteSettings { pub block_per_seg: usize, pub enable_parquet_dictionary: bool, + pub enable_parquet_int32_delta_encoding: bool, } impl Default for WriteSettings { @@ -39,6 +40,7 @@ impl Default for WriteSettings { max_page_size: DEFAULT_ROW_PER_PAGE, block_per_seg: DEFAULT_BLOCK_PER_SEGMENT, enable_parquet_dictionary: false, + enable_parquet_int32_delta_encoding: true, } } } diff --git a/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs b/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs index 926954639263a..e8ecda45d0f59 100644 --- a/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs +++ b/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs @@ -187,11 +187,12 @@ impl TransformSerializeBlock { true }; + let write_settings = table.get_write_settings_with_ctx(&ctx)?; let block_builder = BlockBuilder { ctx, meta_locations: table.meta_location_generator().clone(), source_schema, - write_settings: table.get_write_settings(), + write_settings, cluster_stats_gen, bloom_columns_map, ndv_columns_map, diff --git a/src/query/storages/fuse/src/operations/inverted_index.rs b/src/query/storages/fuse/src/operations/inverted_index.rs index 30bcacd5edae6..33510cdf79959 100644 --- a/src/query/storages/fuse/src/operations/inverted_index.rs +++ b/src/query/storages/fuse/src/operations/inverted_index.rs @@ -149,7 +149,7 @@ impl FuseTable { let data_schema = Arc::new(DataSchema::from(index_schema.as_ref())); let settings = ReadSettings::from_ctx(&ctx)?; - let write_settings = self.get_write_settings(); + let write_settings = self.get_write_settings_with_ctx(&ctx)?; let storage_format = write_settings.storage_format; pipeline.add_source( diff --git a/src/query/storages/fuse/src/operations/merge.rs b/src/query/storages/fuse/src/operations/merge.rs index 6c2905680f514..b5ffadf006d4e 100644 --- 
a/src/query/storages/fuse/src/operations/merge.rs +++ b/src/query/storages/fuse/src/operations/merge.rs @@ -110,7 +110,7 @@ impl FuseTable { ctx: ctx.clone(), meta_locations: self.meta_location_generator().clone(), source_schema: new_schema, - write_settings: self.get_write_settings(), + write_settings: self.get_write_settings_with_ctx(&ctx)?, cluster_stats_gen, bloom_columns_map, ndv_columns_map, diff --git a/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs b/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs index f38f9616de528..b06fb37cd618b 100644 --- a/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs +++ b/src/query/storages/fuse/src/operations/merge_into/mutator/matched_mutator.rs @@ -98,7 +98,7 @@ impl MatchedAggregator { let target_table_schema = Arc::new(table.schema_with_stream().remove_virtual_computed_fields()); let data_accessor = table.get_operator(); - let write_settings = table.get_write_settings(); + let write_settings = table.get_write_settings_with_ctx(&ctx)?; let update_stream_columns = table.change_tracking_enabled(); let read_settings = ReadSettings::from_ctx(&ctx)?; diff --git a/src/query/storages/fuse/src/operations/replace_into/mutator/replace_into_operation_agg.rs b/src/query/storages/fuse/src/operations/replace_into/mutator/replace_into_operation_agg.rs index 7829194f4a737..95999384b4d2d 100644 --- a/src/query/storages/fuse/src/operations/replace_into/mutator/replace_into_operation_agg.rs +++ b/src/query/storages/fuse/src/operations/replace_into/mutator/replace_into_operation_agg.rs @@ -124,7 +124,7 @@ impl ReplaceIntoOperationAggregator { ) -> Result { let data_accessor = table.get_operator(); let table_schema = table.schema_with_stream(); - let write_settings = table.get_write_settings(); + let write_settings = table.get_write_settings_with_ctx(&ctx)?; let update_stream_columns = table.change_tracking_enabled(); let deletion_accumulator = DeletionAccumulator::default(); diff --git a/src/query/storages/fuse/src/operations/table_index.rs b/src/query/storages/fuse/src/operations/table_index.rs index 32d0f5ae81e45..5565a7b82aa87 100644 --- a/src/query/storages/fuse/src/operations/table_index.rs +++ b/src/query/storages/fuse/src/operations/table_index.rs @@ -177,7 +177,7 @@ pub async fn do_refresh_table_index( } let settings = ReadSettings::from_ctx(&ctx)?; - let write_settings = fuse_table.get_write_settings(); + let write_settings = fuse_table.get_write_settings_with_ctx(&ctx)?; let storage_format = write_settings.storage_format; pipeline.add_source( diff --git a/src/query/storages/fuse/src/table_functions/fuse_encoding.rs b/src/query/storages/fuse/src/table_functions/fuse_encoding.rs index 4dbe97f551567..2f14e9b8c4b1e 100644 --- a/src/query/storages/fuse/src/table_functions/fuse_encoding.rs +++ b/src/query/storages/fuse/src/table_functions/fuse_encoding.rs @@ -415,7 +415,6 @@ impl<'a> FuseEncodingImpl<'a> { continue; }; let Some(column_chunk) = columns.get(*column_idx) else { - // Missing column caused by schema evolutions continue; }; let chunk_meta = column_chunk.meta_data.as_ref().ok_or_else(|| { @@ -424,7 +423,6 @@ impl<'a> FuseEncodingImpl<'a> { location, column_id )) })?; - let compressed_size = u64::try_from(chunk_meta.total_compressed_size).map_err(|_| { ErrorCode::ParquetFileInvalid(format!( diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0027_func_fuse_encoding.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0027_func_fuse_encoding.test index 
e8d386eae7a94..7c20acdfeda24 100644
--- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0027_func_fuse_encoding.test
+++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0027_func_fuse_encoding.test
@@ -88,5 +88,33 @@ select * exclude(block_location) from fuse_encoding('db_09_0027', 't_parquet', '
 ----
 t_parquet Parquet c2 INT NULL (INT32) NULL 202 429 plain,rle zstd
 
+statement ok
+set enable_parquet_int32_delta_encoding = 1;
+
+statement ok
+create or replace table t_delta(c int) storage_format='parquet' compression = 'zstd';
+
+statement ok
+insert into t_delta select number from numbers(4096);
+
+query I
+select count(*) from fuse_encoding('db_09_0027', 't_delta', 'c') where level_one like '%delta_binary_packed%';
+----
+1
+
+statement ok
+set enable_parquet_int32_delta_encoding = 0;
+
+statement ok
+create or replace table t_delta_plain(c int) storage_format='parquet' compression = 'zstd';
+
+statement ok
+insert into t_delta_plain select number from numbers(4096);
+
+query I
+select count(*) from fuse_encoding('db_09_0027', 't_delta_plain', 'c') where level_one like '%delta_binary_packed%';
+----
+0
+
 statement ok
 DROP DATABASE db_09_0027
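
For reviewers, a minimal standalone sketch of the ratio-based heuristic this patch introduces in `should_apply_int32_delta` (parquet_rs.rs): DELTA_BINARY_PACKED is chosen only for null-free INT32 columns whose NDV-to-row ratio is at least DELTA_HIGH_CARDINALITY_RATIO (0.9) and whose min..=max span stays within DELTA_RANGE_TOLERANCE (1.05) of the NDV, i.e. nearly unique and nearly contiguous values such as auto-incrementing ids. The function name and plain-integer inputs below are illustrative only, not the patch's actual API.

```rust
/// Illustrative sketch; constants mirror DELTA_HIGH_CARDINALITY_RATIO and
/// DELTA_RANGE_TOLERANCE from the patch, everything else is hypothetical.
const DELTA_HIGH_CARDINALITY_RATIO: f64 = 0.9;
const DELTA_RANGE_TOLERANCE: f64 = 1.05;

fn delta_heuristic(min: i64, max: i64, ndv: u64, num_rows: usize, null_count: u64) -> bool {
    // Only null-free columns with a real value range are considered.
    if num_rows == 0 || ndv == 0 || null_count > 0 || max <= min {
        return false;
    }
    // Nearly every row must carry a distinct value ...
    if (ndv as f64 / num_rows as f64) < DELTA_HIGH_CARDINALITY_RATIO {
        return false;
    }
    // ... and the value span must be close to the NDV, i.e. values are nearly contiguous.
    let span = (max - min + 1) as f64;
    span / ndv as f64 <= DELTA_RANGE_TOLERANCE
}

fn main() {
    // Auto-increment-like column: 4096 contiguous values -> DELTA_BINARY_PACKED.
    assert!(delta_heuristic(0, 4095, 4096, 4096, 0));
    // Low-cardinality column: dictionary/plain remains the better choice.
    assert!(!delta_heuristic(0, 9, 10, 4096, 0));
    // Sparse ids with large gaps: span far exceeds NDV, so delta is skipped.
    assert!(!delta_heuristic(0, 1_000_000, 4000, 4096, 0));
    println!("heuristic examples passed");
}
```

This matches the behavior exercised by the sqllogictest above: `insert into t_delta select number from numbers(4096)` produces a contiguous, duplicate-free INT32 column, so the first table reports `delta_binary_packed` while the second (written with the setting disabled) does not.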