
Commit c2f0c67

refactor: heuristic rule for INT32 DELTA_BINARY_PACKED encoding
1 parent 3237d1b commit c2f0c67

27 files changed (+315 -32 lines)

src/query/ee/src/storages/fuse/operations/virtual_columns.rs

Lines changed: 1 addition & 1 deletion

@@ -128,7 +128,7 @@ pub async fn do_refresh_virtual_column(
     let segment_reader =
         MetaReaders::segment_info_reader(fuse_table.get_operator(), table_schema.clone());

-    let write_settings = fuse_table.get_write_settings();
+    let write_settings = fuse_table.get_write_settings_with_ctx(&ctx)?;
     let storage_format = write_settings.storage_format;

     let operator = fuse_table.get_operator_ref();

src/query/formats/src/output_format/parquet.rs

Lines changed: 1 addition & 0 deletions

@@ -60,6 +60,7 @@ impl OutputFormat for ParquetOutputFormat {
             &mut buf,
             TableCompression::Zstd,
             true,
+            false,
             None,
         )?;
         Ok(buf)

src/query/service/src/interpreters/interpreter_index_refresh.rs

Lines changed: 3 additions & 1 deletion

@@ -324,12 +324,14 @@ impl Interpreter for RefreshIndexInterpreter {
         let sink_schema = Arc::new(sink_schema);

         build_res.main_pipeline.try_resize(1)?;
+        let table_ctx: Arc<dyn TableContext> = self.ctx.clone();
+        let write_settings = fuse_table.get_write_settings_with_ctx(&table_ctx)?;
         build_res.main_pipeline.add_sink(|input| {
             AggIndexSink::try_create(
                 input,
                 fuse_table.get_operator(),
                 self.plan.index_id,
-                fuse_table.get_write_settings(),
+                write_settings.clone(),
                 sink_schema.clone(),
                 block_name_offset,
                 self.plan.user_defined_block_name,

src/query/service/src/test_kits/block_writer.rs

Lines changed: 1 addition & 0 deletions

@@ -158,6 +158,7 @@ impl<'a> BlockWriter<'a> {
             &mut data,
             TableCompression::None,
             false,
+            false,
             None,
         )?;
         let size = data.len() as u64;

src/query/settings/src/settings_default.rs

Lines changed: 7 additions & 0 deletions

@@ -956,6 +956,13 @@ impl DefaultSettings {
             scope: SettingScope::Both,
             range: Some(SettingRange::Numeric(0..=1)),
         }),
+        ("enable_parquet_int32_delta_encoding", DefaultSettingValue {
+            value: UserSettingValue::UInt64(1),
+            desc: "Enables automatic INT32 DELTA_BINARY_PACKED encoding when heuristics match.",
+            mode: SettingMode::Both,
+            scope: SettingScope::Both,
+            range: Some(SettingRange::Numeric(0..=1)),
+        }),
         ("external_server_connect_timeout_secs", DefaultSettingValue {
             value: UserSettingValue::UInt64(10),
             desc: "Connection timeout to external server",

src/query/settings/src/settings_getter_setter.rs

Lines changed: 4 additions & 0 deletions

@@ -721,6 +721,10 @@ impl Settings {
         Ok(self.try_get_u64("enable_parquet_prewhere")? != 0)
     }

+    pub fn get_enable_parquet_int32_delta_encoding(&self) -> Result<bool> {
+        Ok(self.try_get_u64("enable_parquet_int32_delta_encoding")? != 0)
+    }
+
     pub fn get_numeric_cast_option(&self) -> Result<String> {
         self.try_get_string("numeric_cast_option")
     }
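Taken together with the settings_default.rs entry above, the getter follows the existing convention for boolean settings: the value is stored as a UInt64 restricted to the 0..=1 range and read back as `!= 0`. A minimal, self-contained sketch of that convention (the Settings type below is a simplified stand-in for illustration, not databend's actual type):

use std::collections::HashMap;

// Simplified stand-in: boolean knobs live in the store as u64 values
// validated against the 0..=1 range and are surfaced as `raw != 0`.
struct Settings(HashMap<&'static str, u64>);

impl Settings {
    fn try_get_u64(&self, key: &str) -> Result<u64, String> {
        self.0
            .get(key)
            .copied()
            .ok_or_else(|| format!("unknown setting: {key}"))
    }

    fn get_enable_parquet_int32_delta_encoding(&self) -> Result<bool, String> {
        Ok(self.try_get_u64("enable_parquet_int32_delta_encoding")? != 0)
    }
}

fn main() {
    let settings = Settings(HashMap::from([(
        "enable_parquet_int32_delta_encoding",
        1u64,
    )]));
    assert!(settings.get_enable_parquet_int32_delta_encoding().unwrap());
}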

src/query/storages/basic/src/result_cache/write/writer.rs

Lines changed: 1 addition & 0 deletions

@@ -78,6 +78,7 @@ impl ResultCacheWriter {
             &mut buf,
             TableCompression::None,
             false,
+            false,
             None,
         )?;

src/query/storages/common/blocks/src/parquet_rs.rs

Lines changed: 136 additions & 16 deletions

@@ -12,13 +12,19 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.

+use std::collections::HashMap;
 use std::sync::Arc;

 use databend_common_exception::Result;
 use databend_common_expression::ColumnId;
 use databend_common_expression::DataBlock;
+use databend_common_expression::Scalar;
+use databend_common_expression::TableDataType;
 use databend_common_expression::TableSchema;
 use databend_common_expression::converts::arrow::table_schema_arrow_leaf_paths;
+use databend_common_expression::types::NumberDataType;
+use databend_common_expression::types::number::NumberScalar;
+use databend_storages_common_table_meta::meta::ColumnStatistics;
 use databend_storages_common_table_meta::meta::StatisticsOfColumns;
 use databend_storages_common_table_meta::table::TableCompression;
 use parquet::arrow::ArrowWriter;
@@ -32,6 +38,9 @@ use parquet::schema::types::ColumnPath;

 /// Disable dictionary encoding once the NDV-to-row ratio is greater than this threshold.
 const HIGH_CARDINALITY_RATIO_THRESHOLD: f64 = 0.1;
+const DELTA_HIGH_CARDINALITY_RATIO: f64 = 0.9;
+const DELTA_RANGE_TOLERANCE: f64 = 1.02;
+const DELTA_MIN_NDV: u64 = 1024;

 /// Serialize data blocks to parquet format.
 pub fn blocks_to_parquet(
@@ -40,6 +49,7 @@ pub fn blocks_to_parquet(
     write_buffer: &mut Vec<u8>,
     compression: TableCompression,
     enable_dictionary: bool,
+    enable_parquet_int32_delta_encoding: bool,
     metadata: Option<Vec<KeyValue>>,
 ) -> Result<FileMetaData> {
     blocks_to_parquet_with_stats(
@@ -48,6 +58,7 @@ pub fn blocks_to_parquet(
         write_buffer,
         compression,
         enable_dictionary,
+        enable_parquet_int32_delta_encoding,
         metadata,
         None,
     )
@@ -69,6 +80,7 @@ pub fn blocks_to_parquet_with_stats(
     write_buffer: &mut Vec<u8>,
     compression: TableCompression,
     enable_dictionary: bool,
+    enable_parquet_int32_delta_encoding: bool,
     metadata: Option<Vec<KeyValue>>,
     column_stats: Option<&StatisticsOfColumns>,
 ) -> Result<FileMetaData> {
@@ -78,11 +90,15 @@ pub fn blocks_to_parquet_with_stats(
     // the streaming writer and only rely on the first block's NDV (and row count) snapshot.
     let num_rows = blocks[0].num_rows();
     let arrow_schema = Arc::new(table_schema.into());
+    let column_metrics = column_stats.map(ColumnStatsView);

     let props = build_parquet_writer_properties(
         compression,
         enable_dictionary,
-        column_stats,
+        enable_parquet_int32_delta_encoding,
+        column_metrics
+            .as_ref()
+            .map(|view| view as &dyn EncodingStatsProvider),
         metadata,
         num_rows,
         table_schema,
@@ -105,7 +121,8 @@ pub fn blocks_to_parquet_with_stats(
 pub fn build_parquet_writer_properties(
     compression: TableCompression,
     enable_dictionary: bool,
-    cols_stats: Option<impl NdvProvider>,
+    enable_parquet_int32_delta_encoding: bool,
+    column_metrics: Option<&dyn EncodingStatsProvider>,
     metadata: Option<Vec<KeyValue>>,
     num_rows: usize,
     table_schema: &TableSchema,
@@ -120,27 +137,62 @@ pub fn build_parquet_writer_properties(
         .set_key_value_metadata(metadata);

     if enable_dictionary {
-        // Enable dictionary for all columns
-        builder = builder
-            .set_writer_version(WriterVersion::PARQUET_2_0)
-            .set_dictionary_enabled(true);
-        if let Some(cols_stats) = cols_stats {
-            // Disable dictionary of columns that have high cardinality
-            for (column_id, components) in table_schema_arrow_leaf_paths(table_schema) {
-                if let Some(ndv) = cols_stats.column_ndv(&column_id) {
+        builder = builder.set_dictionary_enabled(true);
+    } else {
+        builder = builder.set_dictionary_enabled(false);
+    }
+
+    if enable_dictionary || enable_parquet_int32_delta_encoding {
+        builder = builder.set_writer_version(WriterVersion::PARQUET_2_0);
+    }
+
+    let column_paths: HashMap<ColumnId, ColumnPath> = table_schema_arrow_leaf_paths(table_schema)
+        .into_iter()
+        .map(|(id, path)| (id, ColumnPath::from(path)))
+        .collect();
+
+    if enable_dictionary {
+        if let Some(metrics) = column_metrics {
+            for (column_id, column_path) in &column_paths {
+                if let Some(ndv) = metrics.column_ndv(column_id) {
                     if num_rows > 0
                         && (ndv as f64 / num_rows as f64) > HIGH_CARDINALITY_RATIO_THRESHOLD
                     {
+                        builder = builder.set_column_dictionary_enabled(column_path.clone(), false);
+                    }
+                }
+            }
+        }
+    }
+
+    if enable_parquet_int32_delta_encoding {
+        if let Some(metrics) = column_metrics {
+            for field in table_schema.leaf_fields() {
+                if !matches!(
+                    field.data_type().remove_nullable(),
+                    TableDataType::Number(NumberDataType::Int32)
+                ) {
+                    continue;
+                }
+                let column_id = field.column_id();
+                let Some(stats) = metrics.column_stats(&column_id) else {
+                    continue;
+                };
+                let Some(ndv) = metrics.column_ndv(&column_id) else {
+                    continue;
+                };
+                if should_apply_int32_delta(stats, ndv, num_rows) {
+                    if let Some(path) = column_paths.get(&column_id) {
                         builder = builder
-                            .set_column_dictionary_enabled(ColumnPath::from(components), false);
+                            .set_column_dictionary_enabled(path.clone(), false)
+                            .set_column_encoding(path.clone(), Encoding::DELTA_BINARY_PACKED);
                     }
                 }
             }
         }
-        builder.build()
-    } else {
-        builder.set_dictionary_enabled(false).build()
     }
+
+    builder.build()
 }

 /// Provides per column NDV statistics
@@ -154,6 +206,65 @@ impl NdvProvider for &StatisticsOfColumns {
     }
 }

+pub trait EncodingStatsProvider: NdvProvider {
+    fn column_stats(&self, column_id: &ColumnId) -> Option<&ColumnStatistics>;
+}
+
+struct ColumnStatsView<'a>(&'a StatisticsOfColumns);
+
+impl<'a> NdvProvider for ColumnStatsView<'a> {
+    fn column_ndv(&self, column_id: &ColumnId) -> Option<u64> {
+        self.0
+            .get(column_id)
+            .and_then(|item| item.distinct_of_values)
+    }
+}
+
+impl<'a> EncodingStatsProvider for ColumnStatsView<'a> {
+    fn column_stats(&self, column_id: &ColumnId) -> Option<&ColumnStatistics> {
+        self.0.get(column_id)
+    }
+}
+
+fn should_apply_int32_delta(stats: &ColumnStatistics, ndv: u64, num_rows: usize) -> bool {
+    if ndv < DELTA_MIN_NDV || num_rows == 0 || stats.null_count > 0 {
+        return false;
+    }
+    let Some(min) = scalar_to_i64(&stats.min) else {
+        return false;
+    };
+    let Some(max) = scalar_to_i64(&stats.max) else {
+        return false;
+    };
+    if max <= min {
+        return false;
+    }
+    let ndv_ratio = ndv as f64 / num_rows as f64;
+    if ndv_ratio < DELTA_HIGH_CARDINALITY_RATIO {
+        return false;
+    }
+    let span = (max - min + 1) as f64;
+    let contiguous_ratio = span / ndv as f64;
+    contiguous_ratio <= DELTA_RANGE_TOLERANCE
+}
+
+fn scalar_to_i64(val: &Scalar) -> Option<i64> {
+    match val {
+        Scalar::Number(num) => match num {
+            NumberScalar::Int8(v) => Some(*v as i64),
+            NumberScalar::Int16(v) => Some(*v as i64),
+            NumberScalar::Int32(v) => Some(*v as i64),
+            NumberScalar::Int64(v) => Some(*v),
+            NumberScalar::UInt8(v) => Some(*v as i64),
+            NumberScalar::UInt16(v) => Some(*v as i64),
+            NumberScalar::UInt32(v) => Some(*v as i64),
+            NumberScalar::UInt64(v) => i64::try_from(*v).ok(),
+            _ => None,
+        },
+        _ => None,
+    }
+}
+
 #[cfg(test)]
 mod tests {
     use std::collections::HashMap;
@@ -174,6 +285,12 @@ mod tests {
         }
     }

+    impl EncodingStatsProvider for TestNdvProvider {
+        fn column_stats(&self, _column_id: &ColumnId) -> Option<&ColumnStatistics> {
+            None
+        }
+    }
+
     fn sample_schema() -> TableSchema {
         TableSchema::new(vec![
             TableField::new("simple", TableDataType::Number(NumberDataType::Int32)),
@@ -211,10 +328,12 @@ mod tests {
             .map(|(id, path)| (id, ColumnPath::from(path)))
             .collect();

+        let provider = TestNdvProvider { ndv };
         let props = build_parquet_writer_properties(
             TableCompression::Zstd,
             true,
-            Some(TestNdvProvider { ndv }),
+            false,
+            Some(&provider),
             None,
             1000,
             &schema,
@@ -250,7 +369,8 @@ mod tests {
         let props = build_parquet_writer_properties(
             TableCompression::Zstd,
             false,
-            None::<TestNdvProvider>,
+            false,
+            None::<&TestNdvProvider>,
             None,
             1000,
             &schema,
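To make the thresholds above concrete, here is a standalone restatement of the heuristic with plain numbers in place of ColumnStatistics (an illustrative sketch, not the production code path). Delta encoding is chosen only for null-free INT32 columns whose values are nearly all distinct (NDV/rows at least DELTA_HIGH_CARDINALITY_RATIO) and nearly contiguous (the [min, max] span exceeds the NDV by at most 2%), the classic auto-increment key shape:

const DELTA_HIGH_CARDINALITY_RATIO: f64 = 0.9;
const DELTA_RANGE_TOLERANCE: f64 = 1.02;
const DELTA_MIN_NDV: u64 = 1024;

// Mirrors should_apply_int32_delta above, with min/max passed directly as i64.
fn should_apply_int32_delta(
    min: i64,
    max: i64,
    ndv: u64,
    num_rows: usize,
    null_count: u64,
) -> bool {
    if ndv < DELTA_MIN_NDV || num_rows == 0 || null_count > 0 || max <= min {
        return false;
    }
    // Nearly every row carries a distinct value...
    if (ndv as f64 / num_rows as f64) < DELTA_HIGH_CARDINALITY_RATIO {
        return false;
    }
    // ...and the values nearly fill [min, max], i.e. deltas are small and regular.
    (max - min + 1) as f64 / ndv as f64 <= DELTA_RANGE_TOLERANCE
}

fn main() {
    // Auto-increment ids 1..=100_000: NDV ratio 1.0, span/NDV 1.0 -> delta fires.
    assert!(should_apply_int32_delta(1, 100_000, 100_000, 100_000, 0));
    // Same range with 2_000 ids deleted: span/NDV = 100_000 / 98_000 ~= 1.0204,
    // just over the 1.02 tolerance -> stays on the default encoding.
    assert!(!should_apply_int32_delta(1, 100_000, 98_000, 98_000, 0));
    // Low-cardinality column (11 distinct values): fails the DELTA_MIN_NDV floor,
    // so dictionary encoding keeps handling it.
    assert!(!should_apply_int32_delta(0, 10, 11, 100_000, 0));
}

When the real heuristic matches, build_parquet_writer_properties above disables dictionary for that column and pins DELTA_BINARY_PACKED explicitly, since a mostly-distinct column would defeat dictionary encoding anyway.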

src/query/storages/fuse/benches/bench.rs

Lines changed: 1 addition & 0 deletions

@@ -148,6 +148,7 @@ mod dummy {
             max_page_size,
             block_per_seg,
             enable_parquet_dictionary,
+            enable_parquet_int32_delta_encoding: false,
         };
         let schema = Arc::new(schema);
         let mut buffer = Vec::new();

src/query/storages/fuse/src/fuse_table.rs

Lines changed: 14 additions & 0 deletions

@@ -331,9 +331,23 @@ impl FuseTable {
             max_page_size,
             block_per_seg,
             enable_parquet_dictionary: enable_parquet_dictionary_encoding,
+            enable_parquet_int32_delta_encoding: true,
         }
     }

+    pub fn get_write_settings_with_ctx(
+        &self,
+        ctx: &Arc<dyn TableContext>,
+    ) -> Result<WriteSettings> {
+        let mut settings = self.get_write_settings();
+        if matches!(settings.storage_format, FuseStorageFormat::Parquet) {
+            settings.enable_parquet_int32_delta_encoding = ctx
+                .get_settings()
+                .get_enable_parquet_int32_delta_encoding()?;
+        }
+        Ok(settings)
+    }
+
     /// Get max page size.
     /// For native storage format.
     pub fn get_max_page_size(&self) -> Option<usize> {
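The table-level default enables the heuristic, and get_write_settings_with_ctx lets the session setting override it for parquet tables only; call sites without a context (the benchmark above, for example) keep the static defaults. A self-contained sketch of that override pattern (all types below are simplified stand-ins for the ones in this diff):

#[derive(Clone, Copy, Debug, PartialEq)]
#[allow(dead_code)]
enum StorageFormat {
    Parquet,
    Native,
}

#[derive(Clone, Debug)]
struct WriteSettings {
    storage_format: StorageFormat,
    enable_parquet_int32_delta_encoding: bool,
}

// Stand-in for the session-scoped settings a TableContext exposes.
struct SessionSettings {
    enable_parquet_int32_delta_encoding: bool,
}

fn write_settings_with_session(
    defaults: WriteSettings,
    session: &SessionSettings,
) -> WriteSettings {
    let mut settings = defaults;
    // The knob only affects parquet output, so other formats keep the default.
    if settings.storage_format == StorageFormat::Parquet {
        settings.enable_parquet_int32_delta_encoding =
            session.enable_parquet_int32_delta_encoding;
    }
    settings
}

fn main() {
    let defaults = WriteSettings {
        storage_format: StorageFormat::Parquet,
        enable_parquet_int32_delta_encoding: true, // table default, as in the diff
    };
    let session = SessionSettings {
        enable_parquet_int32_delta_encoding: false, // user disabled it for this session
    };
    let effective = write_settings_with_session(defaults, &session);
    assert!(!effective.enable_parquet_int32_delta_encoding);
}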
