diff --git a/src/common/base/src/mem_allocator/jemalloc.rs b/src/common/base/src/mem_allocator/jemalloc.rs
index cffdc5dcd1c02..7a6b15ba57768 100644
--- a/src/common/base/src/mem_allocator/jemalloc.rs
+++ b/src/common/base/src/mem_allocator/jemalloc.rs
@@ -249,7 +249,7 @@ pub mod not_linux {
 
         #[inline(always)]
         unsafe fn deallocate(&self, ptr: NonNull<u8>, layout: Layout) {
-            StdAllocator.deallocate(ptr, layout)
+            unsafe { StdAllocator.deallocate(ptr, layout) }
         }
 
         unsafe fn grow(
@@ -258,7 +258,7 @@ pub mod not_linux {
            old_layout: Layout,
            new_layout: Layout,
        ) -> Result<NonNull<[u8]>, AllocError> {
-            StdAllocator.grow(ptr, old_layout, new_layout)
+            unsafe { StdAllocator.grow(ptr, old_layout, new_layout) }
        }
 
        unsafe fn grow_zeroed(
@@ -267,7 +267,7 @@ pub mod not_linux {
            old_layout: Layout,
            new_layout: Layout,
        ) -> Result<NonNull<[u8]>, AllocError> {
-            StdAllocator.grow_zeroed(ptr, old_layout, new_layout)
+            unsafe { StdAllocator.grow_zeroed(ptr, old_layout, new_layout) }
        }
 
        unsafe fn shrink(
@@ -276,7 +276,7 @@ pub mod not_linux {
            old_layout: Layout,
            new_layout: Layout,
        ) -> Result<NonNull<[u8]>, AllocError> {
-            StdAllocator.shrink(ptr, old_layout, new_layout)
+            unsafe { StdAllocator.shrink(ptr, old_layout, new_layout) }
        }
    }
 }
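Background on the jemalloc.rs hunks above: under the `unsafe_op_in_unsafe_fn` lint (warn-by-default since the Rust 2024 edition), the body of an `unsafe fn` no longer implicitly permits unsafe operations, so each one needs its own `unsafe` block. A minimal, self-contained sketch of the rule; the function and pointer below are illustrative, not part of this patch:

#![deny(unsafe_op_in_unsafe_fn)]

/// # Safety
/// `ptr` must be non-null and valid for reads of one byte.
unsafe fn read_first(ptr: *const u8) -> u8 {
    // Each unsafe operation must be acknowledged explicitly, even though the
    // enclosing fn is already `unsafe`; this is what the hunks above add.
    unsafe { *ptr }
}

fn main() {
    let bytes = [42u8];
    // SAFETY: the pointer is derived from a live local array.
    assert_eq!(unsafe { read_first(bytes.as_ptr()) }, 42);
}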
diff --git a/src/common/exception/src/exception_backtrace.rs b/src/common/exception/src/exception_backtrace.rs
index a1f9569fb2774..7835913faf2db 100644
--- a/src/common/exception/src/exception_backtrace.rs
+++ b/src/common/exception/src/exception_backtrace.rs
@@ -77,6 +77,7 @@ pub struct ResolvedStackFrame {
     pub column: Option<u32>,
 }
 
+#[cfg(target_os = "linux")]
 #[derive(serde::Serialize, serde::Deserialize, Debug, Clone)]
 pub struct PhysicalAddr {
     pub physical_addr: usize,
diff --git a/src/query/storages/fuse/src/io/read/meta/meta_readers.rs b/src/query/storages/fuse/src/io/read/meta/meta_readers.rs
index 55759c9269118..2434c45c1db71 100644
--- a/src/query/storages/fuse/src/io/read/meta/meta_readers.rs
+++ b/src/query/storages/fuse/src/io/read/meta/meta_readers.rs
@@ -45,7 +45,7 @@ use opendal::Operator;
 use parquet::format::FileMetaData;
 use parquet::thrift::TSerializable;
 
-use self::thrift_file_meta_read::read_thrift_file_metadata;
+pub use self::thrift_file_meta_read::read_thrift_file_metadata;
 
 pub type TableSnapshotStatisticsReader =
     InMemoryCacheReader<TableSnapshotStatistics, LoaderWrapper<Operator>>;
diff --git a/src/query/storages/fuse/src/io/read/meta/mod.rs b/src/query/storages/fuse/src/io/read/meta/mod.rs
index ce776a78db6fc..450bb81ca67dc 100644
--- a/src/query/storages/fuse/src/io/read/meta/mod.rs
+++ b/src/query/storages/fuse/src/io/read/meta/mod.rs
@@ -19,3 +19,4 @@ pub use meta_readers::MetaReaders;
 pub use meta_readers::SegmentStatsReader;
 pub use meta_readers::TableSnapshotReader;
 pub use meta_readers::bytes_reader;
+pub use meta_readers::read_thrift_file_metadata;
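The fuse_encoding.rs changes below extend the table function from a single database argument to one to three positional string literals (database, optional table, optional column). A dependency-free sketch of that parsing shape, using hypothetical names rather than the Databend `TableArgs` API:

// One to three positional string arguments: database, optional table, optional column.
#[derive(Debug)]
struct EncodingArgs {
    database: String,
    table: Option<String>,
    column: Option<String>,
}

fn parse_args(args: &[&str]) -> Result<EncodingArgs, String> {
    if args.is_empty() || args.len() > 3 {
        return Err("fuse_encoding must accept between 1 and 3 string literals".to_string());
    }
    Ok(EncodingArgs {
        database: args[0].to_string(),
        // `get` keeps the trailing arguments optional without index panics.
        table: args.get(1).map(|s| s.to_string()),
        column: args.get(2).map(|s| s.to_string()),
    })
}

fn main() {
    assert!(parse_args(&[]).is_err());
    let args = parse_args(&["db_09_0027", "t_parquet", "c2"]).unwrap();
    assert_eq!(args.column.as_deref(), Some("c2"));
}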
diff --git a/src/query/storages/fuse/src/table_functions/fuse_encoding.rs b/src/query/storages/fuse/src/table_functions/fuse_encoding.rs
index eb4ecb66a6c4b..4dbe97f551567 100644
--- a/src/query/storages/fuse/src/table_functions/fuse_encoding.rs
+++ b/src/query/storages/fuse/src/table_functions/fuse_encoding.rs
@@ -12,6 +12,7 @@
 // See the License for the specific language governing permissions and
 // limitations under the License.
 
+use std::collections::HashMap;
 use std::sync::Arc;
 
 use databend_common_catalog::catalog_kind::CATALOG_DEFAULT;
@@ -19,9 +20,11 @@ use databend_common_catalog::plan::DataSourcePlan;
 use databend_common_catalog::plan::Filters;
 use databend_common_catalog::table::Table;
 use databend_common_catalog::table_args::TableArgs;
+use databend_common_catalog::table_args::string_value;
 use databend_common_exception::ErrorCode;
 use databend_common_exception::Result;
 use databend_common_expression::Column;
+use databend_common_expression::ColumnId;
 use databend_common_expression::DataBlock;
 use databend_common_expression::Evaluator;
 use databend_common_expression::Expr;
@@ -39,6 +42,7 @@ use databend_common_expression::types::BooleanType;
 use databend_common_expression::types::NumberDataType;
 use databend_common_expression::types::StringType;
 use databend_common_expression::types::UInt32Type;
+use databend_common_expression::types::UInt64Type;
 use databend_common_expression::types::nullable::NullableColumnBuilder;
 use databend_common_expression::types::string::StringColumnBuilder;
 use databend_common_functions::BUILTIN_FUNCTIONS;
@@ -49,19 +53,29 @@ use databend_common_native::stat::stat_simple;
 use databend_storages_common_io::MergeIOReader;
 use databend_storages_common_io::ReadSettings;
 use databend_storages_common_table_meta::meta::SegmentInfo;
+use databend_storages_common_table_meta::meta::TableSnapshot;
+use futures::stream;
+use futures::stream::StreamExt;
+use futures::stream::TryStreamExt;
+use opendal::Operator;
+use parquet::basic::Compression as ParquetCompression;
+use parquet::basic::Encoding as ParquetEncoding;
+use parquet::format::Type as ParquetPhysicalType;
 
 use crate::BlockReadResult;
 use crate::FuseStorageFormat;
 use crate::FuseTable;
 use crate::io::SegmentsIO;
+use crate::io::read::meta::read_thrift_file_metadata;
 use crate::sessions::TableContext;
 use crate::table_functions::SimpleArgFunc;
 use crate::table_functions::SimpleArgFuncTemplate;
-use crate::table_functions::parse_db_tb_col_args;
 use crate::table_functions::string_literal;
 
 pub struct FuseEncodingArgs {
     database_name: String,
+    table_name: Option<String>,
+    column_name: Option<String>,
 }
 
@@ -69,14 +83,42 @@ impl TryFrom<(&str, TableArgs)> for FuseEncodingArgs {
     fn try_from(
         (func_name, table_args): (&str, TableArgs),
     ) -> std::result::Result<Self, Self::Error> {
-        let database_name = parse_db_tb_col_args(&table_args, func_name)?;
-        Ok(Self { database_name })
+        let args = table_args.expect_all_positioned(func_name, None)?;
+        if args.is_empty() || args.len() > 3 {
+            return Err(ErrorCode::BadArguments(format!(
+                "{} must accept between 1 and 3 string literals",
+                func_name
+            )));
+        }
+        let database_name = string_value(&args[0])?;
+        let table_name = if args.len() > 1 {
+            Some(string_value(&args[1])?)
+        } else {
+            None
+        };
+        let column_name = if args.len() > 2 {
+            Some(string_value(&args[2])?)
+        } else {
+            None
+        };
+        Ok(Self {
+            database_name,
+            table_name,
+            column_name,
+        })
     }
 }
 
 impl From<&FuseEncodingArgs> for TableArgs {
     fn from(args: &FuseEncodingArgs) -> Self {
-        TableArgs::new_positioned(vec![string_literal(args.database_name.as_str())])
+        let mut positional = vec![string_literal(args.database_name.as_str())];
+        if let Some(table_name) = &args.table_name {
+            positional.push(string_literal(table_name.as_str()));
+        }
+        if let Some(column_name) = &args.column_name {
+            positional.push(string_literal(column_name.as_str()));
+        }
+        TableArgs::new_positioned(positional)
     }
 }
 
@@ -114,9 +156,15 @@ impl SimpleArgFunc for FuseEncoding {
             .collect::<Vec<_>>();
         let filters = plan.push_downs.as_ref().and_then(|x| x.filters.clone());
 
-        FuseEncodingImpl::new(ctx.clone(), fuse_tables, filters)
-            .get_blocks()
-            .await
+        FuseEncodingImpl::new(
+            ctx.clone(),
+            fuse_tables,
+            filters,
+            args.table_name.clone(),
+            args.column_name.clone(),
+        )
+        .get_blocks()
+        .await
     }
 }
 
@@ -124,6 +172,21 @@ pub struct FuseEncodingImpl<'a> {
     pub ctx: Arc<dyn TableContext>,
     pub tables: Vec<&'a FuseTable>,
     pub filters: Option<Filters>,
+    pub table_name_filter: Option<String>,
+    pub column_name_filter: Option<String>,
+}
+
+struct EncodingRow {
+    table_name: String,
+    storage_format: FuseStorageFormat,
+    block_location: String,
+    column_name: String,
+    column_type: String,
+    validity_size: Option<u32>,
+    compressed_size: u64,
+    uncompressed_size: u64,
+    level_one: String,
+    level_two: Option<String>,
 }
 
 impl<'a> FuseEncodingImpl<'a> {
@@ -131,81 +194,58 @@
         ctx: Arc<dyn TableContext>,
         tables: Vec<&'a FuseTable>,
         filters: Option<Filters>,
+        table_name_filter: Option<String>,
+        column_name_filter: Option<String>,
     ) -> Self {
         Self {
             ctx,
             tables,
             filters,
+            table_name_filter,
+            column_name_filter,
         }
     }
 
     #[async_backtrace::framed]
     pub async fn get_blocks(&self) -> Result<DataBlock> {
-        let mut info = Vec::new();
+        let mut rows = Vec::new();
         for table in self.tables.clone() {
-            if matches!(table.storage_format, FuseStorageFormat::Parquet) {
+            if !self.table_matches(table.name()) {
                 continue;
             }
-            let mut columns_info = vec![];
-            let snapshot = table.read_table_snapshot().await?;
-            if snapshot.is_none() {
+            let Some(snapshot) = table.read_table_snapshot().await? else {
                 continue;
-            }
-            let snapshot = snapshot.unwrap();
+            };
             let segments_io =
                 SegmentsIO::create(self.ctx.clone(), table.operator.clone(), table.schema());
             let chunk_size = self.ctx.get_settings().get_max_threads()? as usize * 4;
-            let schema = table.schema();
-            let fields = schema.fields();
-            for chunk in snapshot.segments.chunks(chunk_size) {
-                let segments = segments_io
-                    .read_segments::<SegmentInfo>(chunk, false)
+            match table.storage_format {
+                FuseStorageFormat::Native => {
+                    self.collect_native_rows(
+                        table,
+                        snapshot.as_ref(),
+                        &segments_io,
+                        chunk_size,
+                        &mut rows,
+                    )
+                    .await?;
+                }
+                FuseStorageFormat::Parquet => {
+                    self.collect_parquet_rows(
+                        table,
+                        snapshot.as_ref(),
+                        &segments_io,
+                        chunk_size,
+                        &mut rows,
+                    )
                     .await?;
-                for segment in segments {
-                    let segment = segment?;
-                    for block in segment.blocks.iter() {
-                        for field in fields {
-                            if field.is_nested() {
-                                continue;
-                            }
-                            let column_id = field.column_id;
-                            let column_meta = block.col_metas.get(&column_id).unwrap();
-                            let (offset, len) = column_meta.offset_length();
-                            let ranges = vec![(column_id, offset..(offset + len))];
-                            let read_settings = ReadSettings::from_ctx(&self.ctx)?;
-                            let merge_io_result = MergeIOReader::merge_io_read(
-                                &read_settings,
-                                table.operator.clone(),
-                                &block.location.0,
-                                &ranges,
-                            )
-                            .await?;
-
-                            let block_read_res =
-                                BlockReadResult::create(merge_io_result, vec![], vec![]);
-
-                            let column_chunks = block_read_res.columns_chunks()?;
-                            let pages = column_chunks
-                                .get(&column_id)
-                                .unwrap()
-                                .as_raw_data()
-                                .unwrap()
-                                .to_bytes();
-                            let pages = std::io::Cursor::new(pages);
-                            let page_metas = column_meta.as_native().unwrap().pages.clone();
-                            let reader = NativeReader::new(pages, page_metas, vec![]);
-                            let this_column_info = stat_simple(reader, field.clone())?;
-                            columns_info.push((field.data_type.sql_name(), this_column_info));
-                        }
-                    }
                 }
             }
-            info.push((table.name(), columns_info));
         }
-        let data_block = self.to_block(&info).await?;
+
+        let data_block = self.to_block(&rows).await?;
         let result = if let Some(filter) = self.filters.as_ref().map(|f| &f.filter) {
             let func_ctx = FunctionContext::default();
             let evaluator = Evaluator::new(&data_block, &func_ctx, &BUILTIN_FUNCTIONS);
@@ -226,71 +266,327 @@
     }
 
     #[async_backtrace::framed]
-    async fn to_block(&self, info: &Vec<(&str, Vec<(String, ColumnInfo)>)>) -> Result<DataBlock> {
-        let mut validity_size = Vec::new();
-        let mut compressed_size = Vec::new();
-        let mut uncompressed_size = Vec::new();
-        let mut l1 = StringColumnBuilder::with_capacity(0);
-        let mut l2 = NullableColumnBuilder::<StringType>::with_capacity(0, &[]);
-        let mut table_name = StringColumnBuilder::with_capacity(0);
-        let mut column_name = StringColumnBuilder::with_capacity(0);
-        let mut column_type = StringColumnBuilder::with_capacity(0);
-        let mut all_num_rows = 0;
-        for (table, columns_info) in info {
-            for (type_str, column_info) in columns_info {
-                let pages_info = &column_info.pages;
-                let num_row = pages_info.len();
-                all_num_rows += num_row;
-                validity_size.reserve(num_row);
-                compressed_size.reserve(num_row);
-                uncompressed_size.reserve(num_row);
-                let tmp_table_name = StringColumnBuilder::repeat(table, num_row);
-                let tmp_column_name = StringColumnBuilder::repeat(&column_info.field.name, num_row);
-                let tmp_column_type = StringColumnBuilder::repeat(type_str, num_row);
-                for p in pages_info {
-                    validity_size.push(p.validity_size);
-                    compressed_size.push(p.compressed_size);
-                    uncompressed_size.push(p.uncompressed_size);
-                    l1.put_and_commit(encoding_to_string(&p.body));
-                    let l2_encoding = match &p.body {
-                        PageBody::Dict(dict) => Some(encoding_to_string(&dict.indices.body)),
-                        PageBody::Freq(freq) => freq
-                            .exceptions
-                            .as_ref()
-                            .map(|e| encoding_to_string(&e.body)),
-                        _ => None,
-                    };
-                    if let Some(l2_encoding) = l2_encoding {
-                        l2.push(&l2_encoding);
-                    } else {
-                        l2.push_null();
-                    }
-                }
-
-                table_name.append_column(&tmp_table_name.build());
-                column_name.append_column(&tmp_column_name.build());
-                column_type.append_column(&tmp_column_type.build());
+    async fn to_block(&self, rows: &[EncodingRow]) -> Result<DataBlock> {
+        let num_rows = rows.len();
+        let mut validity_size: Vec<Option<u32>> = Vec::with_capacity(num_rows);
+        let mut compressed_size: Vec<u64> = Vec::with_capacity(num_rows);
+        let mut uncompressed_size: Vec<u64> = Vec::with_capacity(num_rows);
+        let mut l1 = StringColumnBuilder::with_capacity(num_rows);
+        let mut l2 = NullableColumnBuilder::<StringType>::with_capacity(num_rows, &[]);
+        let mut table_name = StringColumnBuilder::with_capacity(num_rows);
+        let mut storage_format = StringColumnBuilder::with_capacity(num_rows);
+        let mut block_location = StringColumnBuilder::with_capacity(num_rows);
+        let mut column_name = StringColumnBuilder::with_capacity(num_rows);
+        let mut column_type = StringColumnBuilder::with_capacity(num_rows);
+        for row in rows {
+            table_name.put_and_commit(&row.table_name);
+            storage_format.put_and_commit(row.storage_format.to_string());
+            block_location.put_and_commit(&row.block_location);
+            column_name.put_and_commit(&row.column_name);
+            column_type.put_and_commit(&row.column_type);
+            validity_size.push(row.validity_size);
+            compressed_size.push(row.compressed_size);
+            uncompressed_size.push(row.uncompressed_size);
+            l1.put_and_commit(&row.level_one);
+            if let Some(level_two) = &row.level_two {
+                l2.push(level_two);
+            } else {
+                l2.push_null();
             }
         }
 
         Ok(DataBlock::new(
             vec![
                 Column::String(table_name.build()).into(),
+                Column::String(storage_format.build()).into(),
+                Column::String(block_location.build()).into(),
                 Column::String(column_name.build()).into(),
                 Column::String(column_type.build()).into(),
                 UInt32Type::from_opt_data(validity_size).into(),
-                UInt32Type::from_data(compressed_size).into(),
-                UInt32Type::from_data(uncompressed_size).into(),
+                UInt64Type::from_data(compressed_size).into(),
+                UInt64Type::from_data(uncompressed_size).into(),
                 Column::String(l1.build()).into(),
                 Column::Nullable(Box::new(l2.build().upcast())).into(),
             ],
-            all_num_rows,
+            num_rows,
         ))
     }
 
+    #[async_backtrace::framed]
+    async fn collect_native_rows(
+        &self,
+        table: &'a FuseTable,
+        snapshot: &TableSnapshot,
+        segments_io: &SegmentsIO,
+        chunk_size: usize,
+        rows: &mut Vec<EncodingRow>,
+    ) -> Result<()> {
+        let schema = table.schema();
+        let fields = schema.fields();
+        for chunk in snapshot.segments.chunks(chunk_size) {
+            let segments = segments_io
+                .read_segments::<SegmentInfo>(chunk, false)
+                .await?;
+            for segment in segments {
+                let segment = segment?;
+                for block in segment.blocks.iter() {
+                    for field in fields {
+                        if field.is_nested() {
+                            continue;
+                        }
+                        if !self.column_matches(field.name()) {
+                            continue;
+                        }
+                        let column_id = field.column_id;
+                        let Some(column_meta) = block.col_metas.get(&column_id) else {
+                            continue;
+                        };
+                        let (offset, len) = column_meta.offset_length();
+                        let ranges = vec![(column_id, offset..(offset + len))];
+                        let read_settings = ReadSettings::from_ctx(&self.ctx)?;
+                        let merge_io_result = MergeIOReader::merge_io_read(
+                            &read_settings,
+                            table.operator.clone(),
+                            &block.location.0,
+                            &ranges,
+                        )
+                        .await?;
+
+                        let block_read_res =
+                            BlockReadResult::create(merge_io_result, vec![], vec![]);
+                        let column_chunks = block_read_res.columns_chunks()?;
+                        let pages = column_chunks
+                            .get(&column_id)
+                            .unwrap()
+                            .as_raw_data()
+                            .unwrap()
+                            .to_bytes();
+                        let pages = std::io::Cursor::new(pages);
+                        let page_metas = column_meta.as_native().unwrap().pages.clone();
+                        let reader = NativeReader::new(pages, page_metas, vec![]);
+                        let column_info = stat_simple(reader, field.clone())?;
+                        self.push_native_column_rows(
+                            table.name(),
+                            table.storage_format,
+                            &block.location.0,
+                            field,
+                            column_info,
+                            rows,
+                        );
+                    }
+                }
+            }
+        }
+        Ok(())
+    }
+
+    #[async_backtrace::framed]
+    async fn collect_parquet_block_rows_from_meta(
+        operator: Operator,
+        location: String,
+        file_size: u64,
+        storage_format: FuseStorageFormat,
+        table_name: Arc<String>,
+        fields: Arc<Vec<TableField>>,
+        column_id_to_index: Arc<HashMap<ColumnId, usize>>,
+        column_name_filter: Arc<Option<String>>,
+    ) -> Result<Vec<EncodingRow>> {
+        let file_meta = read_thrift_file_metadata(operator, &location, Some(file_size)).await?;
+        if file_meta.row_groups.len() != 1 {
+            return Err(ErrorCode::ParquetFileInvalid(format!(
+                "invalid parquet file {}, expects one row group but got {}",
+                location,
+                file_meta.row_groups.len()
+            )));
+        }
+        let row_group = &file_meta.row_groups[0];
+        let columns = &row_group.columns;
+        let mut block_rows = Vec::new();
+
+        for field in fields.iter() {
+            if field.is_nested() {
+                continue;
+            }
+            if !Self::filter_matches(column_name_filter.as_ref(), field.name()) {
+                continue;
+            }
+            let column_id = field.column_id;
+            let Some(column_idx) = column_id_to_index.get(&column_id) else {
+                continue;
+            };
+            let Some(column_chunk) = columns.get(*column_idx) else {
+                // Missing column caused by schema evolutions
+                continue;
+            };
+            let chunk_meta = column_chunk.meta_data.as_ref().ok_or_else(|| {
+                ErrorCode::ParquetFileInvalid(format!(
+                    "invalid parquet file {}, meta data of column {} is empty",
+                    location, column_id
+                ))
+            })?;
+
+            let compressed_size =
+                u64::try_from(chunk_meta.total_compressed_size).map_err(|_| {
+                    ErrorCode::ParquetFileInvalid(format!(
+                        "invalid parquet file {}, compressed size overflow for column {}",
+                        location,
+                        field.name()
+                    ))
+                })?;
+            let uncompressed_size =
+                u64::try_from(chunk_meta.total_uncompressed_size).map_err(|_| {
+                    ErrorCode::ParquetFileInvalid(format!(
+                        "invalid parquet file {}, uncompressed size overflow for column {}",
+                        location,
+                        field.name()
+                    ))
+                })?;
+
+            let physical_type = parquet_physical_type_to_string(chunk_meta.type_);
+            block_rows.push(EncodingRow {
+                table_name: table_name.as_str().to_string(),
+                storage_format,
+                block_location: location.clone(),
+                column_name: field.name().to_string(),
+                column_type: format!("{} ({})", field.data_type().sql_name(), physical_type),
+                validity_size: None,
+                compressed_size,
+                uncompressed_size,
+                level_one: parquet_encodings_to_string(&chunk_meta.encodings),
+                level_two: Some(parquet_codec_to_string(chunk_meta.codec)),
+            });
+        }
+
+        Ok(block_rows)
+    }
+
+    #[async_backtrace::framed]
+    async fn collect_parquet_rows(
+        &self,
+        table: &'a FuseTable,
+        snapshot: &TableSnapshot,
+        segments_io: &SegmentsIO,
+        chunk_size: usize,
+        rows: &mut Vec<EncodingRow>,
+    ) -> Result<()> {
+        let schema = table.schema();
+        let fields = Arc::new(schema.fields().clone());
+        let column_id_to_index: Arc<HashMap<ColumnId, usize>> = Arc::new(
+            schema
+                .to_leaf_column_ids()
+                .into_iter()
+                .enumerate()
+                .map(|(idx, id)| (id, idx))
+                .collect(),
+        );
+        let column_name_filter = Arc::new(self.column_name_filter.clone());
+        let table_name = Arc::new(table.name().to_string());
+        let storage_format = table.storage_format;
+        let mut max_io_requests = self.ctx.get_settings().get_max_storage_io_requests()? as usize;
+        if max_io_requests == 0 {
+            max_io_requests = 1;
+        }
+
+        for chunk in snapshot.segments.chunks(chunk_size) {
+            let segments = segments_io
+                .read_segments::<SegmentInfo>(chunk, false)
+                .await?;
+            let mut block_locations = Vec::new();
+            for segment in segments {
+                let segment = segment?;
+                for block in segment.blocks.iter() {
+                    block_locations.push((block.location.0.clone(), block.file_size));
+                }
+            }
+
+            if block_locations.is_empty() {
+                continue;
+            }
+
+            let table_operator = table.operator.clone();
+            let fields_arc = fields.clone();
+            let column_id_to_index_arc = column_id_to_index.clone();
+            let table_name_arc = table_name.clone();
+            let column_name_filter_arc = column_name_filter.clone();
+            let mut block_stream = stream::iter(block_locations.into_iter().map(
+                move |(location, file_size)| {
+                    let operator = table_operator.clone();
+                    let fields = fields_arc.clone();
+                    let column_id_to_index = column_id_to_index_arc.clone();
+                    let table_name = table_name_arc.clone();
+                    let column_name_filter = column_name_filter_arc.clone();
+                    async move {
+                        Self::collect_parquet_block_rows_from_meta(
+                            operator,
+                            location,
+                            file_size,
+                            storage_format,
+                            table_name,
+                            fields,
+                            column_id_to_index,
+                            column_name_filter,
+                        )
+                        .await
+                    }
+                },
+            ))
+            .buffer_unordered(max_io_requests);
+
+            while let Some(mut block_rows) = block_stream.try_next().await? {
+                rows.append(&mut block_rows);
+            }
+        }
+        Ok(())
+    }
+
+    fn push_native_column_rows(
+        &self,
+        table_name: &str,
+        storage_format: FuseStorageFormat,
+        block_location: &str,
+        field: &TableField,
+        column_info: ColumnInfo,
+        rows: &mut Vec<EncodingRow>,
+    ) {
+        let column_name = field.name().clone();
+        let column_type = field.data_type().sql_name();
+        let block_location = block_location.to_string();
+        for page in column_info.pages {
+            rows.push(EncodingRow {
+                table_name: table_name.to_string(),
+                storage_format,
+                block_location: block_location.clone(),
+                column_name: column_name.clone(),
+                column_type: column_type.clone(),
+                validity_size: page.validity_size,
+                compressed_size: page.compressed_size as u64,
+                uncompressed_size: page.uncompressed_size as u64,
+                level_one: encoding_to_string(&page.body),
+                level_two: native_level_two_encoding(&page.body),
+            });
+        }
+    }
+
+    fn table_matches(&self, table_name: &str) -> bool {
+        Self::filter_matches(&self.table_name_filter, table_name)
+    }
+
+    fn column_matches(&self, column_name: &str) -> bool {
+        Self::filter_matches(&self.column_name_filter, column_name)
+    }
+
+    fn filter_matches(filter: &Option<String>, value: &str) -> bool {
+        match filter {
+            Some(filter) => filter == value,
+            None => true,
+        }
+    }
+
     pub fn schema() -> Arc<TableSchema> {
         TableSchemaRefExt::create(vec![
             TableField::new("table_name", TableDataType::String),
+            TableField::new("storage_format", TableDataType::String),
+            TableField::new("block_location", TableDataType::String),
             TableField::new("column_name", TableDataType::String),
             TableField::new("column_type", TableDataType::String),
             TableField::new(
@@ -299,11 +595,11 @@
             ),
             TableField::new(
                 "compressed_size",
-                TableDataType::Number(NumberDataType::UInt32),
+                TableDataType::Number(NumberDataType::UInt64),
             ),
             TableField::new(
                 "uncompressed_size",
-                TableDataType::Number(NumberDataType::UInt32),
+                TableDataType::Number(NumberDataType::UInt64),
             ),
             TableField::new("level_one", TableDataType::String),
             TableField::new(
@@ -327,6 +623,73 @@ fn encoding_to_string(page_body: &PageBody) -> String {
     }
 }
 
+fn native_level_two_encoding(page_body: &PageBody) -> Option<String> {
+    match page_body {
+        PageBody::Dict(dict) => Some(encoding_to_string(&dict.indices.body)),
+        PageBody::Freq(freq) => freq
+            .exceptions
+            .as_ref()
+            .map(|e| encoding_to_string(&e.body)),
+        _ => None,
+    }
+}
+
+fn parquet_encodings_to_string(encodings: &[parquet::format::Encoding]) -> String {
+    if encodings.is_empty() {
+        "unknown".to_string()
+    } else {
+        encodings
+            .iter()
+            .map(|encoding| parquet_encoding_to_string(*encoding))
+            .collect::<Vec<_>>()
+            .join(",")
+    }
+}
+
+fn parquet_encoding_to_string(encoding: parquet::format::Encoding) -> &'static str {
+    match ParquetEncoding::try_from(encoding) {
+        Ok(ParquetEncoding::PLAIN) => "plain",
+        Ok(ParquetEncoding::PLAIN_DICTIONARY) => "plain_dictionary",
+        Ok(ParquetEncoding::RLE) => "rle",
+        #[allow(deprecated)]
+        Ok(ParquetEncoding::BIT_PACKED) => "bit_packed",
+        Ok(ParquetEncoding::DELTA_BINARY_PACKED) => "delta_binary_packed",
+        Ok(ParquetEncoding::DELTA_LENGTH_BYTE_ARRAY) => "delta_length_byte_array",
+        Ok(ParquetEncoding::DELTA_BYTE_ARRAY) => "delta_byte_array",
+        Ok(ParquetEncoding::RLE_DICTIONARY) => "rle_dictionary",
+        Ok(ParquetEncoding::BYTE_STREAM_SPLIT) => "byte_stream_split",
+        Err(_) => "unknown",
+    }
+}
+
+fn parquet_codec_to_string(codec: parquet::format::CompressionCodec) -> String {
+    match ParquetCompression::try_from(codec) {
+        Ok(ParquetCompression::UNCOMPRESSED) => "uncompressed".to_string(),
+        Ok(ParquetCompression::SNAPPY) => "snappy".to_string(),
+        Ok(ParquetCompression::GZIP(_)) => "gzip".to_string(),
+        Ok(ParquetCompression::LZO) => "lzo".to_string(),
+        Ok(ParquetCompression::BROTLI(_)) => "brotli".to_string(),
+        Ok(ParquetCompression::LZ4) => "lz4".to_string(),
+        Ok(ParquetCompression::ZSTD(_)) => "zstd".to_string(),
+        Ok(ParquetCompression::LZ4_RAW) => "lz4_raw".to_string(),
+        Err(_) => format!("compression_codec({})", codec.0),
+    }
+}
+
+fn parquet_physical_type_to_string(ty: ParquetPhysicalType) -> &'static str {
+    match ty {
+        ParquetPhysicalType::BOOLEAN => "BOOLEAN",
+        ParquetPhysicalType::INT32 => "INT32",
+        ParquetPhysicalType::INT64 => "INT64",
+        ParquetPhysicalType::INT96 => "INT96",
+        ParquetPhysicalType::FLOAT => "FLOAT",
+        ParquetPhysicalType::DOUBLE => "DOUBLE",
+        ParquetPhysicalType::BYTE_ARRAY => "BYTE_ARRAY",
+        ParquetPhysicalType::FIXED_LEN_BYTE_ARRAY => "FIXED_LEN_BYTE_ARRAY",
+        parquet::format::Type(_) => "UNKNOWN",
+    }
+}
+
 pub fn as_expr(
     remote_expr: &RemoteExpr,
     fn_registry: &FunctionRegistry,
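For comparison with `collect_parquet_block_rows_from_meta` above, which walks the thrift footer through Databend's `read_thrift_file_metadata`: a minimal sketch of pulling per-column encodings, codec, and sizes from a parquet footer with the stock `parquet` crate reader instead. The file path is hypothetical, and this uses the crate's high-level metadata types rather than the raw thrift structs the patch works with:

use std::fs::File;

use parquet::file::reader::{FileReader, SerializedFileReader};

fn main() -> Result<(), Box<dyn std::error::Error>> {
    // Open any local parquet file; Fuse blocks are written with a single row group.
    let file = File::open("block.parquet")?;
    let reader = SerializedFileReader::new(file)?;
    let meta = reader.metadata();
    for rg in meta.row_groups() {
        for col in rg.columns() {
            // Roughly the per-column row that fuse_encoding now reports for parquet.
            println!(
                "{}: encodings={:?} codec={:?} compressed={} uncompressed={}",
                col.column_path(),
                col.encodings(),
                col.compression(),
                col.compressed_size(),
                col.uncompressed_size(),
            );
        }
    }
    Ok(())
}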
diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0027_func_fuse_encoding.test b/tests/sqllogictests/suites/base/09_fuse_engine/09_0027_func_fuse_encoding.test
index 5070195b08137..e8d386eae7a94 100644
--- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0027_func_fuse_encoding.test
+++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0027_func_fuse_encoding.test
@@ -13,12 +13,12 @@ create table t(c int) storage_format = 'native' compression = 'lz4';
 statement ok
 insert into t select number from numbers(2048);
 
-query III
-select * from fuse_encoding('db_09_0027');
+query TTTTIIITT
+select * exclude(block_location) from fuse_encoding('db_09_0027');
 ----
-t c INT NULL 2048 2592 8192 DeltaBitpack NULL
+t Native c INT NULL 2048 2592 8192 DeltaBitpack NULL
 
-query III
+query TTI
 select level_one,level_two,count(*) from fuse_encoding('db_09_0027') group by level_one,level_two;
 ----
 DeltaBitpack NULL 1
@@ -26,13 +26,13 @@ DeltaBitpack NULL 1
 statement ok
 insert into t select 1 from t limit 2048;
 
-query III
+query TTI
 select level_one,level_two,count(*) from fuse_encoding('db_09_0027') group by level_one,level_two order by level_one;
 ----
 DeltaBitpack NULL 1
 OneValue NULL 1
 
-query III
+query TTI
 select level_one,level_two,count(*) from fuse_encoding('db_09_0027') where level_one='DeltaBitpack' group by level_one,level_two order by level_one;
 ----
 DeltaBitpack NULL 1
@@ -46,7 +46,7 @@ insert into t2 select 'a' as a from numbers(10) limit 10;
 statement ok
 insert into t2 select 'b' as a from numbers(10) limit 10;
 
-query III
+query TTI
 select level_one,level_two,count(*) from fuse_encoding('db_09_0027') where table_name='t2' group by level_one,level_two order by level_one;
 ----
 OneValue NULL 2
@@ -54,21 +54,39 @@ OneValue NULL 2
 statement ok
 optimize table t2 compact;
 
-query III
+query TTI
 select level_one,level_two,count(*) from fuse_encoding('db_09_0027') where table_name='t2' group by level_one,level_two order by level_one;
 ----
 Dict Rle 1
 
-query III
+query TTI
 select level_one,level_two,count(*) from fuse_encoding('db_09_0027') where column_name='d' group by level_one,level_two order by level_one;
 ----
 Dict Rle 1
 
-query III
+query TTI
 select level_one,level_two,count(*) from fuse_encoding('db_09_0027') where column_type like '%INT%' group by level_one,level_two order by level_one;
 ----
 DeltaBitpack NULL 1
 OneValue NULL 1
+
+statement ok
+create table t_parquet(c1 int, c2 int) storage_format='parquet' compression = 'zstd';
+
+statement ok
+insert into t_parquet(c1, c2) select 1, number from numbers(100);
+
+query TTTTIIITT
+select * exclude(block_location) from fuse_encoding('db_09_0027', 't_parquet') order by column_name;
+----
+t_parquet Parquet c1 INT NULL (INT32) NULL 57 48 plain,rle,rle_dictionary zstd
+t_parquet Parquet c2 INT NULL (INT32) NULL 202 429 plain,rle zstd
+
+query TTTTIIITT
+select * exclude(block_location) from fuse_encoding('db_09_0027', 't_parquet', 'c2');
+----
+t_parquet Parquet c2 INT NULL (INT32) NULL 202 429 plain,rle zstd
+
 statement ok
 DROP DATABASE db_09_0027
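The `collect_parquet_rows` method in the patch fans out footer reads with bounded concurrency via `buffer_unordered`, capped by `max_storage_io_requests`. A minimal sketch of the same pattern using only the `futures` crate; the fetch function stands in for a per-block metadata read:

use futures::executor::block_on;
use futures::stream::{self, StreamExt, TryStreamExt};

// Stand-in for a per-block metadata fetch that can fail.
async fn fetch(id: u32) -> Result<Vec<u32>, std::io::Error> {
    Ok(vec![id, id + 1])
}

fn main() -> Result<(), std::io::Error> {
    block_on(async {
        let mut rows = Vec::new();
        // At most 4 fetches are in flight at a time; results arrive in
        // completion order, which is fine because rows are order-insensitive.
        let mut results = stream::iter((0u32..8).map(fetch)).buffer_unordered(4);
        while let Some(mut batch) = results.try_next().await? {
            rows.append(&mut batch);
        }
        println!("collected {} rows", rows.len());
        Ok(())
    })
}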