2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -589,4 +589,4 @@ tantivy-common = { git = "https://github.com/datafuse-extras/tantivy", rev = "b6
 tantivy-jieba = { git = "https://github.com/datafuse-extras/tantivy-jieba", rev = "68ab55c" }
 tantivy-query-grammar = { git = "https://github.com/datafuse-extras/tantivy", rev = "b600b0e", package = "tantivy-query-grammar" }
 watcher = { git = "https://github.com/databendlabs/watcher", tag = "v0.4.2" }
-xorfilter-rs = { git = "https://github.com/datafuse-extras/xorfilter", tag = "databend-alpha.4" }
+xorfilter-rs = { git = "https://github.com/datafuse-extras/xorfilter", rev = "2e434bc" }
@@ -128,7 +128,7 @@ pub async fn do_refresh_virtual_column(
     let segment_reader =
         MetaReaders::segment_info_reader(fuse_table.get_operator(), table_schema.clone());

-    let write_settings = fuse_table.get_write_settings();
+    let write_settings = fuse_table.get_write_settings_with_ctx(&ctx)?;
     let storage_format = write_settings.storage_format;

     let operator = fuse_table.get_operator_ref();
13 changes: 5 additions & 8 deletions src/query/formats/src/output_format/parquet.rs
@@ -16,6 +16,7 @@ use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
 use databend_common_expression::TableSchemaRef;
 use databend_common_io::constants::DEFAULT_BLOCK_BUFFER_SIZE;
+use databend_storages_common_blocks::ParquetWriteOptions;
 use databend_storages_common_blocks::blocks_to_parquet;
 use databend_storages_common_table_meta::table::TableCompression;

@@ -54,14 +55,10 @@ impl OutputFormat for ParquetOutputFormat {
         }
         let mut buf = Vec::with_capacity(DEFAULT_BLOCK_BUFFER_SIZE);
         // While unloading data as parquet, enable dictionary unconditionally; this usually leads to smaller output.
-        let _ = blocks_to_parquet(
-            &self.schema,
-            blocks,
-            &mut buf,
-            TableCompression::Zstd,
-            true,
-            None,
-        )?;
+        let options = ParquetWriteOptions::builder(TableCompression::Zstd)
+            .enable_dictionary(true)
+            .build();
+        let _ = blocks_to_parquet(&self.schema, blocks, &mut buf, &options)?;
         Ok(buf)
     }
 }
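
Note: the three positional arguments of the old `blocks_to_parquet` (compression, dictionary flag, extra option) are now bundled into `ParquetWriteOptions`. Below is a standalone sketch of the builder shape these call sites imply — the struct body and the `enable_dictionary: false` default are assumptions for illustration, not the actual definition in `databend_storages_common_blocks`:

// Illustrative re-implementation of the builder pattern used above; the real
// ParquetWriteOptions may differ in fields and defaults.
#[derive(Clone, Copy, Debug, PartialEq)]
#[allow(dead_code)]
enum TableCompression { None, Zstd }

#[derive(Debug)]
struct ParquetWriteOptions {
    compression: TableCompression,
    enable_dictionary: bool,
}

struct ParquetWriteOptionsBuilder {
    compression: TableCompression,
    enable_dictionary: bool, // assumed default: false, matching the old `false` positional arg
}

impl ParquetWriteOptions {
    fn builder(compression: TableCompression) -> ParquetWriteOptionsBuilder {
        ParquetWriteOptionsBuilder { compression, enable_dictionary: false }
    }
}

impl ParquetWriteOptionsBuilder {
    fn enable_dictionary(mut self, yes: bool) -> Self {
        self.enable_dictionary = yes;
        self
    }
    fn build(self) -> ParquetWriteOptions {
        ParquetWriteOptions {
            compression: self.compression,
            enable_dictionary: self.enable_dictionary,
        }
    }
}

fn main() {
    // Mirrors the unload path above: Zstd + dictionary on.
    let opts = ParquetWriteOptions::builder(TableCompression::Zstd)
        .enable_dictionary(true)
        .build();
    assert!(opts.enable_dictionary);
    assert_eq!(opts.compression, TableCompression::Zstd);
}

Compared with `true`/`false`/`None` passed positionally, the builder names the dictionary toggle at each call site and presumably leaves room for new knobs (such as the heuristic rules added below) without touching every caller.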
@@ -324,12 +324,14 @@ impl Interpreter for RefreshIndexInterpreter {
         let sink_schema = Arc::new(sink_schema);

         build_res.main_pipeline.try_resize(1)?;
+        let table_ctx: Arc<dyn TableContext> = self.ctx.clone();
+        let write_settings = fuse_table.get_write_settings_with_ctx(&table_ctx)?;
         build_res.main_pipeline.add_sink(|input| {
             AggIndexSink::try_create(
                 input,
                 fuse_table.get_operator(),
                 self.plan.index_id,
-                fuse_table.get_write_settings(),
+                write_settings.clone(),
                 sink_schema.clone(),
                 block_name_offset,
                 self.plan.user_defined_block_name,
12 changes: 4 additions & 8 deletions src/query/service/src/test_kits/block_writer.rs
@@ -26,6 +26,7 @@ use databend_common_storages_fuse::io::TableMetaLocationGenerator;
 use databend_common_storages_fuse::io::WriteSettings;
 use databend_common_storages_fuse::io::build_column_hlls;
 use databend_common_storages_fuse::io::serialize_block;
+use databend_storages_common_blocks::ParquetWriteOptions;
 use databend_storages_common_blocks::blocks_to_parquet;
 use databend_storages_common_index::BloomIndex;
 use databend_storages_common_index::BloomIndexBuilder;
@@ -152,14 +153,9 @@
         let filter_schema = bloom_index.filter_schema;
         let mut data = Vec::with_capacity(DEFAULT_BLOCK_INDEX_BUFFER_SIZE);
         let index_block_schema = &filter_schema;
-        let meta = blocks_to_parquet(
-            index_block_schema,
-            vec![index_block],
-            &mut data,
-            TableCompression::None,
-            false,
-            None,
-        )?;
+        let options = ParquetWriteOptions::builder(TableCompression::None).build();
+        let meta =
+            blocks_to_parquet(index_block_schema, vec![index_block], &mut data, &options)?;
         let size = data.len() as u64;
         data_accessor.write(&location.0, data).await?;
         Ok((size, Some(location), Some(meta)))
7 changes: 7 additions & 0 deletions src/query/settings/src/settings_default.rs
@@ -977,6 +977,13 @@
                 scope: SettingScope::Both,
                 range: Some(SettingRange::Numeric(0..=1)),
             }),
+            ("enable_parquet_delta_binary_packed_heuristic_rule", DefaultSettingValue {
+                value: UserSettingValue::UInt64(1),
+                desc: "Enables the DELTA_BINARY_PACKED heuristic rule for Parquet integer columns.",
+                mode: SettingMode::Both,
+                scope: SettingScope::Both,
+                range: Some(SettingRange::Numeric(0..=1)),
+            }),
             ("external_server_connect_timeout_secs", DefaultSettingValue {
                 value: UserSettingValue::UInt64(10),
                 desc: "Connection timeout to external server",
4 changes: 4 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
@@ -763,6 +763,10 @@
         Ok(self.try_get_u64("enable_parquet_prewhere")? != 0)
     }

+    pub fn get_enable_parquet_delta_binary_packed_heuristic_rule(&self) -> Result<bool> {
+        Ok(self.try_get_u64("enable_parquet_delta_binary_packed_heuristic_rule")? != 0)
+    }
+
     pub fn get_numeric_cast_option(&self) -> Result<String> {
         self.try_get_string("numeric_cast_option")
     }
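
Note: none of the hunks in this diff show where the getter is consumed; presumably the Parquet writer-properties assembly checks it before registering the DBP rule. A toy, self-contained sketch of that assumed gating — every name here is a stand-in, not the real API:

// Hypothetical sketch: a 0/1 UInt64 setting read as bool, gating an optional rule.
struct Settings {
    enable_dbp_rule: u64, // stand-in for the real session settings store
}

impl Settings {
    fn get_enable_parquet_delta_binary_packed_heuristic_rule(&self) -> bool {
        self.enable_dbp_rule != 0
    }
}

fn main() {
    let settings = Settings { enable_dbp_rule: 1 }; // default: enabled
    let mut rules: Vec<&str> = vec!["dictionary_high_cardinality"];
    // Assumed call-site shape: only register the DBP rule when the session allows it.
    if settings.get_enable_parquet_delta_binary_packed_heuristic_rule() {
        rules.push("delta_binary_packed");
    }
    assert_eq!(rules, ["dictionary_high_cardinality", "delta_binary_packed"]);
}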
11 changes: 3 additions & 8 deletions src/query/storages/basic/src/result_cache/write/writer.rs
@@ -15,6 +15,7 @@
 use databend_common_exception::Result;
 use databend_common_expression::DataBlock;
 use databend_common_expression::TableSchemaRef;
+use databend_storages_common_blocks::ParquetWriteOptions;
 use databend_storages_common_blocks::blocks_to_parquet;
 use databend_storages_common_table_meta::table::TableCompression;
 use opendal::Operator;
@@ -72,14 +73,8 @@
     #[async_backtrace::framed]
     pub async fn write_to_storage(&self) -> Result<String> {
         let mut buf = Vec::with_capacity(self.current_bytes);
-        let _ = blocks_to_parquet(
-            &self.schema,
-            self.blocks.clone(),
-            &mut buf,
-            TableCompression::None,
-            false,
-            None,
-        )?;
+        let options = ParquetWriteOptions::builder(TableCompression::None).build();
+        let _ = blocks_to_parquet(&self.schema, self.blocks.clone(), &mut buf, &options)?;

         let file_location = format!("{}/{}.parquet", self.location, Uuid::new_v4().as_simple());

110 changes: 110 additions & 0 deletions src/query/storages/common/blocks/src/encoding_rules/delta_binary_packed.rs
@@ -0,0 +1,110 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_expression::Scalar;
use databend_common_expression::TableDataType;
use databend_common_expression::TableSchema;
use databend_common_expression::types::NumberDataType;
use databend_common_expression::types::number::NumberScalar;
use databend_storages_common_table_meta::meta::ColumnStatistics;
use parquet::basic::Encoding;
use parquet::file::properties::WriterPropertiesBuilder;

use crate::encoding_rules::ColumnPathsCache;
use crate::encoding_rules::EncodingStatsProvider;

// NDV must be close to row count (~90%+). Empirical value based on experiments and operational experience.
const DELTA_HIGH_CARDINALITY_RATIO: f64 = 0.9;
// Span (max - min + 1) should be close to NDV. Empirical value based on experiments and operational experience.
const DELTA_RANGE_TOLERANCE: f64 = 1.05;

pub fn apply_delta_binary_packed_heuristic(
    mut builder: WriterPropertiesBuilder,
    metrics: &dyn EncodingStatsProvider,
    table_schema: &TableSchema,
    num_rows: usize,
    column_paths_cache: &mut ColumnPathsCache,
) -> WriterPropertiesBuilder {
    for field in table_schema.leaf_fields() {
        // Restrict the DBP heuristic to native INT32/UINT32 columns for now.
        // INT64 columns with high zero bits already compress well with PLAIN+Zstd, and other
        // widths need more validation before enabling DBP.
        if !matches!(
            field.data_type().remove_nullable(),
            TableDataType::Number(NumberDataType::Int32)
                | TableDataType::Number(NumberDataType::UInt32)
        ) {
            continue;
        }
        let column_id = field.column_id();
        let Some(stats) = metrics.column_stats(&column_id) else {
            continue;
        };
        let Some(ndv) = metrics.column_ndv(&column_id) else {
            continue;
        };
        if should_apply_delta_binary_packed(stats, ndv, num_rows) {
            let column_paths = column_paths_cache.get_or_build(table_schema);
            if let Some(path) = column_paths.get(&column_id) {
                builder = builder
                    .set_column_dictionary_enabled(path.clone(), false)
                    .set_column_encoding(path.clone(), Encoding::DELTA_BINARY_PACKED);
            }
        }
    }
    builder
}

/// Evaluate whether Delta Binary Packed (DBP) is worth enabling for a 32-bit integer column.
///
/// The DBP heuristic rule is intentionally conservative:
/// - DBP is only considered when the block looks like a contiguous INT32/UINT32 range (no NULLs).
/// - NDV must be very close to the row count (`DELTA_HIGH_CARDINALITY_RATIO`).
/// - The `[min, max]` span should be close to NDV (`DELTA_RANGE_TOLERANCE`).
///
/// Experiments show that such blocks shrink dramatically after DBP + compression while decode CPU
/// remains affordable, yielding the best IO + CPU trade-off.
fn should_apply_delta_binary_packed(stats: &ColumnStatistics, ndv: u64, num_rows: usize) -> bool {
    // Nulls weaken the contiguous-range signal, so we avoid the heuristic when they exist.
    if num_rows == 0 || ndv == 0 || stats.null_count > 0 {
        return false;
    }
    let Some(min) = scalar_to_i64(&stats.min) else {
        return false;
    };
    let Some(max) = scalar_to_i64(&stats.max) else {
        return false;
    };
    // Degenerate spans (single value) already compress well without DBP.
    if max <= min {
        return false;
    }
    // Use ratio-based heuristics instead of an absolute NDV threshold to decouple from block size.
    let ndv_ratio = ndv as f64 / num_rows as f64;
    if ndv_ratio < DELTA_HIGH_CARDINALITY_RATIO {
        return false;
    }
    let span = (max - min + 1) as f64;
    let contiguous_ratio = span / ndv as f64;
    contiguous_ratio <= DELTA_RANGE_TOLERANCE
}

fn scalar_to_i64(val: &Scalar) -> Option<i64> {
    // Only 32-bit integers reach the delta heuristic (see the `matches!` check above),
    // so we deliberately reject other widths to avoid misinterpreting large values.
    match val {
        Scalar::Number(NumberScalar::Int32(v)) => Some(*v as i64),
        Scalar::Number(NumberScalar::UInt32(v)) => Some(*v as i64),
        _ => None,
    }
}
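
Note: to make the two cut-offs concrete, here is a standalone rework of the gates in `should_apply_delta_binary_packed` with numbers plugged in. It re-derives the logic on plain integers for illustration; the production path additionally requires the INT32/UINT32 type check above.

// Standalone illustration of the two ratio gates.
const DELTA_HIGH_CARDINALITY_RATIO: f64 = 0.9;
const DELTA_RANGE_TOLERANCE: f64 = 1.05;

fn dbp_candidate(min: i64, max: i64, ndv: u64, num_rows: usize, null_count: u64) -> bool {
    if num_rows == 0 || ndv == 0 || null_count > 0 || max <= min {
        return false;
    }
    let ndv_ratio = ndv as f64 / num_rows as f64;
    let contiguous_ratio = (max - min + 1) as f64 / ndv as f64;
    ndv_ratio >= DELTA_HIGH_CARDINALITY_RATIO && contiguous_ratio <= DELTA_RANGE_TOLERANCE
}

fn main() {
    // Near-contiguous surrogate key: ndv_ratio = 0.98, span/ndv = 10_000/9_800 ≈ 1.02 -> DBP applies.
    assert!(dbp_candidate(1, 10_000, 9_800, 10_000, 0));
    // Same NDV over a sparser range: span/ndv = 10_400/9_800 ≈ 1.06 > 1.05 -> rejected.
    assert!(!dbp_candidate(1, 10_400, 9_800, 10_000, 0));
    // Low cardinality: ndv_ratio = 0.5 < 0.9 -> rejected.
    assert!(!dbp_candidate(1, 10_000, 5_000, 10_000, 0));
}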
43 changes: 43 additions & 0 deletions src/query/storages/common/blocks/src/encoding_rules/dictionary.rs
@@ -0,0 +1,43 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use databend_common_expression::TableSchema;
use parquet::file::properties::WriterPropertiesBuilder;

use crate::encoding_rules::ColumnPathsCache;
use crate::encoding_rules::EncodingStatsProvider;

/// Disable dictionary encoding once the NDV-to-row ratio is greater than this threshold.
const HIGH_CARDINALITY_RATIO_THRESHOLD: f64 = 0.1;

pub fn apply_dictionary_high_cardinality_heuristic(
    mut builder: WriterPropertiesBuilder,
    metrics: &dyn EncodingStatsProvider,
    table_schema: &TableSchema,
    num_rows: usize,
    column_paths_cache: &mut ColumnPathsCache,
) -> WriterPropertiesBuilder {
    if num_rows == 0 {
        return builder;
    }
    let column_paths = column_paths_cache.get_or_build(table_schema);
    for (column_id, column_path) in column_paths.iter() {
        if let Some(ndv) = metrics.column_ndv(column_id) {
            if (ndv as f64 / num_rows as f64) > HIGH_CARDINALITY_RATIO_THRESHOLD {
                builder = builder.set_column_dictionary_enabled(column_path.clone(), false);
            }
        }
    }
    builder
}
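
Note: for a sense of scale (standalone arithmetic, not the production code): in a 1,000,000-row block, a column with 50,000 distinct values (ratio 0.05) keeps dictionary encoding, while one with 200,000 distinct values (ratio 0.2) crosses the 0.1 threshold and has it disabled.

const HIGH_CARDINALITY_RATIO_THRESHOLD: f64 = 0.1;

fn dictionary_disabled(ndv: u64, num_rows: usize) -> bool {
    num_rows > 0 && (ndv as f64 / num_rows as f64) > HIGH_CARDINALITY_RATIO_THRESHOLD
}

fn main() {
    assert!(!dictionary_disabled(50_000, 1_000_000)); // ratio 0.05: keep dictionary
    assert!(dictionary_disabled(200_000, 1_000_000)); // ratio 0.20: disable dictionary
}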
80 changes: 80 additions & 0 deletions src/query/storages/common/blocks/src/encoding_rules/mod.rs
@@ -0,0 +1,80 @@
// Copyright 2021 Datafuse Labs
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::collections::HashMap;

use databend_common_expression::ColumnId;
use databend_common_expression::TableSchema;
use databend_common_expression::converts::arrow::table_schema_arrow_leaf_paths;
use databend_storages_common_table_meta::meta::ColumnStatistics;
use databend_storages_common_table_meta::meta::StatisticsOfColumns;
use parquet::schema::types::ColumnPath;

pub mod delta_binary_packed;
pub mod page_limit;

pub mod dictionary;

pub struct ColumnPathsCache {
    cache: Option<HashMap<ColumnId, ColumnPath>>,
}

impl ColumnPathsCache {
    pub fn new() -> Self {
        Self { cache: None }
    }

    pub fn get_or_build(&mut self, table_schema: &TableSchema) -> &HashMap<ColumnId, ColumnPath> {
        if self.cache.is_none() {
            self.cache = Some(
                table_schema_arrow_leaf_paths(table_schema)
                    .into_iter()
                    .map(|(id, path)| (id, ColumnPath::from(path)))
                    .collect(),
            );
        }
        self.cache.as_ref().unwrap()
    }
}

/// Provides per column NDV statistics.
pub trait NdvProvider {
    fn column_ndv(&self, column_id: &ColumnId) -> Option<u64>;
}

impl NdvProvider for &StatisticsOfColumns {
    fn column_ndv(&self, column_id: &ColumnId) -> Option<u64> {
        self.get(column_id).and_then(|item| item.distinct_of_values)
    }
}

pub trait EncodingStatsProvider: NdvProvider {
    fn column_stats(&self, column_id: &ColumnId) -> Option<&ColumnStatistics>;
}

pub struct ColumnStatsView<'a>(pub &'a StatisticsOfColumns);

impl<'a> NdvProvider for ColumnStatsView<'a> {
    fn column_ndv(&self, column_id: &ColumnId) -> Option<u64> {
        self.0
            .get(column_id)
            .and_then(|item| item.distinct_of_values)
    }
}

impl<'a> EncodingStatsProvider for ColumnStatsView<'a> {
    fn column_stats(&self, column_id: &ColumnId) -> Option<&ColumnStatistics> {
        self.0.get(column_id)
    }
}
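
Note: how these pieces compose is not shown in this diff. A self-contained sketch of the assumed flow, with stand-in types in place of the real `StatisticsOfColumns`/`ColumnStatistics`: wrap the per-column stats in `ColumnStatsView` once, then hand the same `&dyn EncodingStatsProvider` to both heuristics.

use std::collections::HashMap;

// Stand-ins for the real databend types (assumed shapes, for illustration only).
type ColumnId = u32;
struct ColumnStatistics {
    distinct_of_values: Option<u64>,
}
type StatisticsOfColumns = HashMap<ColumnId, ColumnStatistics>;

trait NdvProvider {
    fn column_ndv(&self, column_id: &ColumnId) -> Option<u64>;
}
trait EncodingStatsProvider: NdvProvider {
    fn column_stats(&self, column_id: &ColumnId) -> Option<&ColumnStatistics>;
}

struct ColumnStatsView<'a>(&'a StatisticsOfColumns);

impl NdvProvider for ColumnStatsView<'_> {
    fn column_ndv(&self, column_id: &ColumnId) -> Option<u64> {
        self.0.get(column_id).and_then(|s| s.distinct_of_values)
    }
}
impl EncodingStatsProvider for ColumnStatsView<'_> {
    fn column_stats(&self, column_id: &ColumnId) -> Option<&ColumnStatistics> {
        self.0.get(column_id)
    }
}

fn main() {
    let mut stats = StatisticsOfColumns::new();
    stats.insert(0, ColumnStatistics { distinct_of_values: Some(9_800) });
    let view = ColumnStatsView(&stats);
    // Both heuristics can now consume the same trait object.
    let metrics: &dyn EncodingStatsProvider = &view;
    assert_eq!(metrics.column_ndv(&0), Some(9_800));
    assert!(metrics.column_stats(&1).is_none());
}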