@@ -128,7 +128,7 @@ pub async fn do_refresh_virtual_column(
     let segment_reader =
         MetaReaders::segment_info_reader(fuse_table.get_operator(), table_schema.clone());
 
-    let write_settings = fuse_table.get_write_settings();
+    let write_settings = fuse_table.get_write_settings_with_ctx(&ctx)?;
     let storage_format = write_settings.storage_format;
 
     let operator = fuse_table.get_operator_ref();
1 change: 1 addition & 0 deletions src/query/formats/src/output_format/parquet.rs
@@ -60,6 +60,7 @@ impl OutputFormat for ParquetOutputFormat {
             &mut buf,
             TableCompression::Zstd,
             true,
+            false,
             None,
         )?;
         Ok(buf)
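The bare `false` added at this call site (and at the matching call sites in later files) is easy to misread on its own. Below is a hedged sketch of the updated call shape, with argument roles taken from the `blocks_to_parquet` signature in the parquet_rs.rs hunk further down; the leading schema and block arguments sit outside the visible hunks, so their names here are placeholders:

blocks_to_parquet(
    table_schema,            // placeholder: schema of the blocks being written
    blocks,                  // placeholder: data blocks to serialize
    &mut buf,                // write_buffer
    TableCompression::Zstd,  // compression
    true,                    // enable_dictionary
    false,                   // enable_parquet_int32_delta_encoding (the new flag)
    None,                    // metadata
)?;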
@@ -324,12 +324,14 @@ impl Interpreter for RefreshIndexInterpreter {
         let sink_schema = Arc::new(sink_schema);
 
         build_res.main_pipeline.try_resize(1)?;
+        let table_ctx: Arc<dyn TableContext> = self.ctx.clone();
+        let write_settings = fuse_table.get_write_settings_with_ctx(&table_ctx)?;
         build_res.main_pipeline.add_sink(|input| {
             AggIndexSink::try_create(
                 input,
                 fuse_table.get_operator(),
                 self.plan.index_id,
-                fuse_table.get_write_settings(),
+                write_settings.clone(),
                 sink_schema.clone(),
                 block_name_offset,
                 self.plan.user_defined_block_name,
1 change: 1 addition & 0 deletions src/query/service/src/test_kits/block_writer.rs
@@ -158,6 +158,7 @@ impl<'a> BlockWriter<'a> {
             &mut data,
             TableCompression::None,
             false,
+            false,
             None,
         )?;
         let size = data.len() as u64;
7 changes: 7 additions & 0 deletions src/query/settings/src/settings_default.rs
@@ -977,6 +977,13 @@ impl DefaultSettings {
                 scope: SettingScope::Both,
                 range: Some(SettingRange::Numeric(0..=1)),
             }),
+            ("enable_parquet_int32_delta_encoding", DefaultSettingValue {
+                value: UserSettingValue::UInt64(1),
+                desc: "Enables automatic INT32 DELTA_BINARY_PACKED encoding when heuristics match.",
+                mode: SettingMode::Both,
+                scope: SettingScope::Both,
+                range: Some(SettingRange::Numeric(0..=1)),
+            }),
             ("external_server_connect_timeout_secs", DefaultSettingValue {
                 value: UserSettingValue::UInt64(10),
                 desc: "Connection timeout to external server",
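The new setting is numeric with range 0..=1 and defaults to on (UInt64(1)); the getter added in settings_getter_setter.rs just below maps it to a bool. A hypothetical session-level read, following the pattern used by get_write_settings_with_ctx later in this diff:

let enabled = ctx.get_settings().get_enable_parquet_int32_delta_encoding()?; // true unless the session sets it to 0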
4 changes: 4 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
@@ -763,6 +763,10 @@ impl Settings {
         Ok(self.try_get_u64("enable_parquet_prewhere")? != 0)
     }
 
+    pub fn get_enable_parquet_int32_delta_encoding(&self) -> Result<bool> {
+        Ok(self.try_get_u64("enable_parquet_int32_delta_encoding")? != 0)
+    }
+
     pub fn get_numeric_cast_option(&self) -> Result<String> {
         self.try_get_string("numeric_cast_option")
     }
1 change: 1 addition & 0 deletions src/query/storages/basic/src/result_cache/write/writer.rs
@@ -78,6 +78,7 @@ impl ResultCacheWriter {
             &mut buf,
             TableCompression::None,
            false,
+            false,
             None,
         )?;
 
152 changes: 136 additions & 16 deletions src/query/storages/common/blocks/src/parquet_rs.rs
@@ -12,13 +12,19 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use databend_common_exception::Result;
 use databend_common_expression::ColumnId;
 use databend_common_expression::DataBlock;
+use databend_common_expression::Scalar;
+use databend_common_expression::TableDataType;
 use databend_common_expression::TableSchema;
 use databend_common_expression::converts::arrow::table_schema_arrow_leaf_paths;
+use databend_common_expression::types::NumberDataType;
+use databend_common_expression::types::number::NumberScalar;
+use databend_storages_common_table_meta::meta::ColumnStatistics;
 use databend_storages_common_table_meta::meta::StatisticsOfColumns;
 use databend_storages_common_table_meta::table::TableCompression;
 use parquet::arrow::ArrowWriter;
@@ -32,6 +38,9 @@ use parquet::schema::types::ColumnPath;
 
 /// Disable dictionary encoding once the NDV-to-row ratio is greater than this threshold.
 const HIGH_CARDINALITY_RATIO_THRESHOLD: f64 = 0.1;
+const DELTA_HIGH_CARDINALITY_RATIO: f64 = 0.9;
+const DELTA_RANGE_TOLERANCE: f64 = 1.05;
+const DELTA_MIN_NDV: u64 = 1024;
 
 /// Serialize data blocks to parquet format.
 pub fn blocks_to_parquet(
@@ -40,6 +49,7 @@ pub fn blocks_to_parquet(
     write_buffer: &mut Vec<u8>,
     compression: TableCompression,
     enable_dictionary: bool,
+    enable_parquet_int32_delta_encoding: bool,
     metadata: Option<Vec<KeyValue>>,
 ) -> Result<FileMetaData> {
     blocks_to_parquet_with_stats(
@@ -48,6 +58,7 @@
         write_buffer,
         compression,
         enable_dictionary,
+        enable_parquet_int32_delta_encoding,
         metadata,
         None,
     )
@@ -69,6 +80,7 @@ pub fn blocks_to_parquet_with_stats(
     write_buffer: &mut Vec<u8>,
     compression: TableCompression,
     enable_dictionary: bool,
+    enable_parquet_int32_delta_encoding: bool,
     metadata: Option<Vec<KeyValue>>,
     column_stats: Option<&StatisticsOfColumns>,
 ) -> Result<FileMetaData> {
@@ -78,11 +90,15 @@
     // the streaming writer and only rely on the first block's NDV (and row count) snapshot.
     let num_rows = blocks[0].num_rows();
     let arrow_schema = Arc::new(table_schema.into());
+    let column_metrics = column_stats.map(ColumnStatsView);
 
     let props = build_parquet_writer_properties(
         compression,
         enable_dictionary,
-        column_stats,
+        enable_parquet_int32_delta_encoding,
+        column_metrics
+            .as_ref()
+            .map(|view| view as &dyn EncodingStatsProvider),
         metadata,
         num_rows,
         table_schema,
@@ -105,7 +121,8 @@ pub fn blocks_to_parquet_with_stats(
 pub fn build_parquet_writer_properties(
     compression: TableCompression,
     enable_dictionary: bool,
-    cols_stats: Option<impl NdvProvider>,
+    enable_parquet_int32_delta_encoding: bool,
+    column_metrics: Option<&dyn EncodingStatsProvider>,
     metadata: Option<Vec<KeyValue>>,
     num_rows: usize,
     table_schema: &TableSchema,
@@ -120,27 +137,62 @@
         .set_key_value_metadata(metadata);
 
     if enable_dictionary {
-        // Enable dictionary for all columns
-        builder = builder
-            .set_writer_version(WriterVersion::PARQUET_2_0)
-            .set_dictionary_enabled(true);
-        if let Some(cols_stats) = cols_stats {
-            // Disable dictionary of columns that have high cardinality
-            for (column_id, components) in table_schema_arrow_leaf_paths(table_schema) {
-                if let Some(ndv) = cols_stats.column_ndv(&column_id) {
-                    if num_rows > 0
-                        && (ndv as f64 / num_rows as f64) > HIGH_CARDINALITY_RATIO_THRESHOLD
-                    {
-                        builder = builder
-                            .set_column_dictionary_enabled(ColumnPath::from(components), false);
-                    }
-                }
-            }
-        }
-        builder.build()
+        builder = builder.set_dictionary_enabled(true);
     } else {
-        builder.set_dictionary_enabled(false).build()
+        builder = builder.set_dictionary_enabled(false);
     }
+
+    if enable_dictionary || enable_parquet_int32_delta_encoding {
+        builder = builder.set_writer_version(WriterVersion::PARQUET_2_0);
+    }
+
+    let column_paths: HashMap<ColumnId, ColumnPath> = table_schema_arrow_leaf_paths(table_schema)
+        .into_iter()
+        .map(|(id, path)| (id, ColumnPath::from(path)))
+        .collect();
+
+    if enable_dictionary {
+        if let Some(metrics) = column_metrics {
+            for (column_id, column_path) in &column_paths {
+                if let Some(ndv) = metrics.column_ndv(column_id) {
+                    if num_rows > 0
+                        && (ndv as f64 / num_rows as f64) > HIGH_CARDINALITY_RATIO_THRESHOLD
+                    {
+                        builder = builder.set_column_dictionary_enabled(column_path.clone(), false);
+                    }
+                }
+            }
+        }
+    }
+
+    if enable_parquet_int32_delta_encoding {
+        if let Some(metrics) = column_metrics {
+            for field in table_schema.leaf_fields() {
+                if !matches!(
+                    field.data_type().remove_nullable(),
+                    TableDataType::Number(NumberDataType::Int32)
+                ) {
+                    continue;
+                }
+                let column_id = field.column_id();
+                let Some(stats) = metrics.column_stats(&column_id) else {
+                    continue;
+                };
+                let Some(ndv) = metrics.column_ndv(&column_id) else {
+                    continue;
+                };
+                if should_apply_int32_delta(stats, ndv, num_rows) {
+                    if let Some(path) = column_paths.get(&column_id) {
+                        builder = builder
+                            .set_column_dictionary_enabled(path.clone(), false)
+                            .set_column_encoding(path.clone(), Encoding::DELTA_BINARY_PACKED);
+                    }
+                }
+            }
+        }
+    }
+
+    builder.build()
 }
 
 /// Provides per column NDV statistics
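A note on the DELTA_BINARY_PACKED branch above: in the parquet crate, a column's dictionary setting takes precedence over a pinned encoding, which is why the code disables the dictionary and pins the encoding together for the same column. A minimal standalone sketch of the same builder calls (the column name "id" is hypothetical):

use parquet::basic::Encoding;
use parquet::file::properties::{WriterProperties, WriterVersion};
use parquet::schema::types::ColumnPath;

let path = ColumnPath::from("id");
let props = WriterProperties::builder()
    // DELTA_BINARY_PACKED is a format v2 encoding, hence PARQUET_2_0.
    .set_writer_version(WriterVersion::PARQUET_2_0)
    // Dictionary would otherwise be preferred for this column.
    .set_column_dictionary_enabled(path.clone(), false)
    .set_column_encoding(path, Encoding::DELTA_BINARY_PACKED)
    .build();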
@@ -154,6 +206,65 @@ impl NdvProvider for &StatisticsOfColumns {
     }
 }
 
+pub trait EncodingStatsProvider: NdvProvider {
+    fn column_stats(&self, column_id: &ColumnId) -> Option<&ColumnStatistics>;
+}
+
+struct ColumnStatsView<'a>(&'a StatisticsOfColumns);
+
+impl<'a> NdvProvider for ColumnStatsView<'a> {
+    fn column_ndv(&self, column_id: &ColumnId) -> Option<u64> {
+        self.0
+            .get(column_id)
+            .and_then(|item| item.distinct_of_values)
+    }
+}
+
+impl<'a> EncodingStatsProvider for ColumnStatsView<'a> {
+    fn column_stats(&self, column_id: &ColumnId) -> Option<&ColumnStatistics> {
+        self.0.get(column_id)
+    }
+}
+
+fn should_apply_int32_delta(stats: &ColumnStatistics, ndv: u64, num_rows: usize) -> bool {
+    if ndv < DELTA_MIN_NDV || num_rows == 0 || stats.null_count > 0 {
+        return false;
+    }
+    let Some(min) = scalar_to_i64(&stats.min) else {
+        return false;
+    };
+    let Some(max) = scalar_to_i64(&stats.max) else {
+        return false;
+    };
+    if max <= min {
+        return false;
+    }
+    let ndv_ratio = ndv as f64 / num_rows as f64;
+    if ndv_ratio < DELTA_HIGH_CARDINALITY_RATIO {
+        return false;
+    }
+    let span = (max - min + 1) as f64;
+    let contiguous_ratio = span / ndv as f64;
+    contiguous_ratio <= DELTA_RANGE_TOLERANCE
+}
+
+fn scalar_to_i64(val: &Scalar) -> Option<i64> {
+    match val {
+        Scalar::Number(num) => match num {
+            NumberScalar::Int8(v) => Some(*v as i64),
+            NumberScalar::Int16(v) => Some(*v as i64),
+            NumberScalar::Int32(v) => Some(*v as i64),
+            NumberScalar::Int64(v) => Some(*v),
+            NumberScalar::UInt8(v) => Some(*v as i64),
+            NumberScalar::UInt16(v) => Some(*v as i64),
+            NumberScalar::UInt32(v) => Some(*v as i64),
+            NumberScalar::UInt64(v) => i64::try_from(*v).ok(),
+            _ => None,
+        },
+        _ => None,
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::HashMap;
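Taken together, the three DELTA_* constants make should_apply_int32_delta target dense, nearly-unique INT32 columns (auto-increment keys, row numbers and the like), where a dictionary cannot compress anything but the deltas stay small. A worked check against a hypothetical column profile:

// Hypothetical profile: 1_000_000 rows, no nulls, ndv = 980_000, min = 1, max = 1_000_000.
let (num_rows, ndv, min, max) = (1_000_000usize, 980_000u64, 1i64, 1_000_000i64);
let ndv_ratio = ndv as f64 / num_rows as f64; // 0.98  >= DELTA_HIGH_CARDINALITY_RATIO (0.9)
let span = (max - min + 1) as f64;            // 1_000_000
let contiguous_ratio = span / ndv as f64;     // ~1.02 <= DELTA_RANGE_TOLERANCE (1.05)
assert!(ndv >= DELTA_MIN_NDV);                // 980_000 >= 1024
// All checks pass, so should_apply_int32_delta would return true for this column.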
@@ -174,6 +285,12 @@ mod tests {
         }
     }
 
+    impl EncodingStatsProvider for TestNdvProvider {
+        fn column_stats(&self, _column_id: &ColumnId) -> Option<&ColumnStatistics> {
+            None
+        }
+    }
+
     fn sample_schema() -> TableSchema {
         TableSchema::new(vec![
             TableField::new("simple", TableDataType::Number(NumberDataType::Int32)),
@@ -211,10 +328,12 @@ mod tests {
             .map(|(id, path)| (id, ColumnPath::from(path)))
             .collect();
 
+        let provider = TestNdvProvider { ndv };
         let props = build_parquet_writer_properties(
             TableCompression::Zstd,
             true,
-            Some(TestNdvProvider { ndv }),
+            false,
+            Some(&provider),
             None,
             1000,
             &schema,
@@ -250,7 +369,8 @@ mod tests {
         let props = build_parquet_writer_properties(
             TableCompression::Zstd,
             false,
-            None::<TestNdvProvider>,
+            false,
+            None,
             None,
             1000,
             &schema,
1 change: 1 addition & 0 deletions src/query/storages/fuse/benches/bench.rs
@@ -148,6 +148,7 @@ mod dummy {
             max_page_size,
             block_per_seg,
             enable_parquet_dictionary,
+            enable_parquet_int32_delta_encoding: false,
         };
         let schema = Arc::new(schema);
         let mut buffer = Vec::new();
14 changes: 14 additions & 0 deletions src/query/storages/fuse/src/fuse_table.rs
@@ -335,9 +335,23 @@ impl FuseTable {
             max_page_size,
             block_per_seg,
             enable_parquet_dictionary: enable_parquet_dictionary_encoding,
+            enable_parquet_int32_delta_encoding: true,
         }
     }
 
+    pub fn get_write_settings_with_ctx(
+        &self,
+        ctx: &Arc<dyn TableContext>,
+    ) -> Result<WriteSettings> {
+        let mut settings = self.get_write_settings();
+        if matches!(settings.storage_format, FuseStorageFormat::Parquet) {
+            settings.enable_parquet_int32_delta_encoding = ctx
+                .get_settings()
+                .get_enable_parquet_int32_delta_encoding()?;
+        }
+        Ok(settings)
+    }
+
     /// Get max page size.
     /// For native storage format.
     pub fn get_max_page_size(&self) -> Option<usize> {
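The two accessors now differ on purpose: get_write_settings() bakes in the static default (delta encoding on), while get_write_settings_with_ctx() overrides it from the session setting, and only for the parquet storage format, where the flag has an effect. A hedged sketch of the intended call-site pattern (ctx_opt is a hypothetical stand-in for whether a TableContext is available):

// Callers holding a TableContext should prefer the ctx-aware variant so the
// session's enable_parquet_int32_delta_encoding value is honored; callers
// without one keep the static default.
let write_settings = match ctx_opt {
    Some(ctx) => fuse_table.get_write_settings_with_ctx(&ctx)?,
    None => fuse_table.get_write_settings(),
};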
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/io/write/block_writer.rs
@@ -97,6 +97,7 @@ pub fn serialize_block_with_column_stats(
         buf,
         write_settings.table_compression,
         write_settings.enable_parquet_dictionary,
+        write_settings.enable_parquet_int32_delta_encoding,
         None,
         column_stats,
     )?;
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/io/write/bloom_index_writer.rs
@@ -78,6 +78,7 @@ impl BloomIndexState {
             &mut data,
             TableCompression::None,
             false,
+            false,
             None,
         )?;
         let data_size = data.len() as u64;
@@ -338,6 +338,7 @@ impl InvertedIndexWriter {
             TableCompression::Zstd,
             // No dictionary page for inverted index
             false,
+            false,
             None,
         )?;
 