Skip to content

Commit 9b40063

Browse files
authored
feat: clustering_statistics support specify snapshot (#19148)
* clustering_statistics support specify snapshot * fix
1 parent ebcad91 commit 9b40063

File tree

1 file changed

+33
-126
lines changed

1 file changed

+33
-126
lines changed

src/query/storages/fuse/src/table_functions/clustering_statistics.rs

Lines changed: 33 additions & 126 deletions
Original file line numberDiff line numberDiff line change
@@ -14,9 +14,7 @@
1414

1515
use std::sync::Arc;
1616

17-
use databend_common_catalog::plan::DataSourcePlan;
1817
use databend_common_catalog::table::Table;
19-
use databend_common_catalog::table_args::TableArgs;
2018
use databend_common_exception::ErrorCode;
2119
use databend_common_exception::Result;
2220
use databend_common_expression::Column;
@@ -26,7 +24,6 @@ use databend_common_expression::Scalar;
2624
use databend_common_expression::TableDataType;
2725
use databend_common_expression::TableField;
2826
use databend_common_expression::TableSchema;
29-
use databend_common_expression::TableSchemaRef;
3027
use databend_common_expression::TableSchemaRefExt;
3128
use databend_common_expression::types::Int32Type;
3229
use databend_common_expression::types::NumberDataType;
@@ -38,121 +35,50 @@ use databend_storages_common_table_meta::meta::TableSnapshot;
3835
use crate::FuseTable;
3936
use crate::io::SegmentsIO;
4037
use crate::sessions::TableContext;
41-
use crate::table_functions::SimpleArgFunc;
42-
use crate::table_functions::SimpleArgFuncTemplate;
43-
use crate::table_functions::parse_db_tb_args;
44-
use crate::table_functions::string_literal;
45-
46-
pub struct ClusteringStatsArgs {
47-
database_name: String,
48-
table_name: String,
49-
}
50-
51-
impl From<&ClusteringStatsArgs> for TableArgs {
52-
fn from(args: &ClusteringStatsArgs) -> Self {
53-
let tbl_args = vec![
54-
string_literal(args.database_name.as_str()),
55-
string_literal(args.table_name.as_str()),
56-
];
57-
TableArgs::new_positioned(tbl_args)
58-
}
59-
}
60-
61-
impl TryFrom<(&str, TableArgs)> for ClusteringStatsArgs {
62-
type Error = ErrorCode;
63-
fn try_from(
64-
(func_name, table_args): (&str, TableArgs),
65-
) -> std::result::Result<Self, Self::Error> {
66-
let (database_name, table_name) = parse_db_tb_args(&table_args, func_name)?;
67-
Ok(Self {
68-
database_name,
69-
table_name,
70-
})
71-
}
72-
}
73-
74-
pub type ClusteringStatisticsFunc = SimpleArgFuncTemplate<ClusteringStatistics>;
38+
use crate::table_functions::TableMetaFunc;
39+
use crate::table_functions::TableMetaFuncTemplate;
7540

7641
pub struct ClusteringStatistics;
7742

78-
#[async_trait::async_trait]
79-
impl SimpleArgFunc for ClusteringStatistics {
80-
type Args = ClusteringStatsArgs;
43+
pub type ClusteringStatisticsFunc = TableMetaFuncTemplate<ClusteringStatistics>;
8144

82-
fn schema() -> TableSchemaRef {
83-
ClusteringStatisticsImpl::schema()
45+
#[async_trait::async_trait]
46+
impl TableMetaFunc for ClusteringStatistics {
47+
fn schema() -> Arc<TableSchema> {
48+
TableSchemaRefExt::create(vec![
49+
TableField::new("segment_name", TableDataType::String),
50+
TableField::new("block_name", TableDataType::String),
51+
TableField::new("min", TableDataType::String.wrap_nullable()),
52+
TableField::new("max", TableDataType::String.wrap_nullable()),
53+
TableField::new(
54+
"level",
55+
TableDataType::Number(NumberDataType::Int32).wrap_nullable(),
56+
),
57+
TableField::new("pages", TableDataType::String.wrap_nullable()),
58+
])
8459
}
8560

8661
async fn apply(
8762
ctx: &Arc<dyn TableContext>,
88-
args: &Self::Args,
89-
plan: &DataSourcePlan,
63+
tbl: &FuseTable,
64+
snapshot: Arc<TableSnapshot>,
65+
limit: Option<usize>,
9066
) -> Result<DataBlock> {
91-
let tenant_id = ctx.get_tenant();
92-
93-
let tbl = ctx
94-
.get_catalog(databend_common_catalog::catalog_kind::CATALOG_DEFAULT)
95-
.await?
96-
.get_table(
97-
&tenant_id,
98-
args.database_name.as_str(),
99-
args.table_name.as_str(),
100-
)
101-
.await?;
102-
let tbl = FuseTable::try_from_table(tbl.as_ref())?;
103-
67+
// NOTE (design choice):
68+
// Clustering statistics are only meaningful for the current cluster key definition.
69+
// Historical cluster information stored in snapshots is intentionally ignored.
70+
//
71+
// Once the cluster key changes, historical cluster_stats cannot be interpreted
72+
// or compared correctly, so snapshots are evaluated against the live table's
73+
// cluster key only.
10474
let Some(cluster_key_id) = tbl.cluster_key_id() else {
10575
return Err(ErrorCode::UnclusteredTable(format!(
106-
"Unclustered table '{}.{}'",
107-
args.database_name, args.table_name,
76+
"Unclustered table {}",
77+
tbl.get_table_info().desc,
10878
)));
10979
};
11080

111-
let limit = plan.push_downs.as_ref().and_then(|x| x.limit);
112-
ClusteringStatisticsImpl::new(ctx.clone(), tbl, limit, cluster_key_id)
113-
.get_blocks()
114-
.await
115-
}
116-
}
117-
118-
pub struct ClusteringStatisticsImpl<'a> {
119-
pub ctx: Arc<dyn TableContext>,
120-
pub table: &'a FuseTable,
121-
pub limit: Option<usize>,
122-
pub cluster_key_id: u32,
123-
}
124-
125-
impl<'a> ClusteringStatisticsImpl<'a> {
126-
pub fn new(
127-
ctx: Arc<dyn TableContext>,
128-
table: &'a FuseTable,
129-
limit: Option<usize>,
130-
cluster_key_id: u32,
131-
) -> Self {
132-
Self {
133-
ctx,
134-
table,
135-
limit,
136-
cluster_key_id,
137-
}
138-
}
139-
140-
#[async_backtrace::framed]
141-
pub async fn get_blocks(&self) -> Result<DataBlock> {
142-
let tbl = self.table;
143-
let maybe_snapshot = tbl.read_table_snapshot().await?;
144-
if let Some(snapshot) = maybe_snapshot {
145-
return self.to_block(snapshot).await;
146-
}
147-
148-
Ok(DataBlock::empty_with_schema(Arc::new(
149-
Self::schema().into(),
150-
)))
151-
}
152-
153-
#[async_backtrace::framed]
154-
async fn to_block(&self, snapshot: Arc<TableSnapshot>) -> Result<DataBlock> {
155-
let limit = self.limit.unwrap_or(usize::MAX);
81+
let limit = limit.unwrap_or(usize::MAX);
15682
let len = std::cmp::min(snapshot.summary.block_count as usize, limit);
15783

15884
let mut segment_name = Vec::with_capacity(len);
@@ -162,16 +88,11 @@ impl<'a> ClusteringStatisticsImpl<'a> {
16288
let mut level = Vec::with_capacity(len);
16389
let mut pages = Vec::with_capacity(len);
16490

165-
let segments_io = SegmentsIO::create(
166-
self.ctx.clone(),
167-
self.table.operator.clone(),
168-
self.table.schema(),
169-
);
91+
let segments_io = SegmentsIO::create(ctx.clone(), tbl.operator.clone(), tbl.schema());
17092

17193
let mut row_num = 0;
17294
let chunk_size =
173-
std::cmp::min(self.ctx.get_settings().get_max_threads()? as usize * 4, len).max(1);
174-
95+
std::cmp::min(ctx.get_settings().get_max_threads()? as usize * 4, len).max(1);
17596
let format_vec = |v: &[Scalar]| -> String {
17697
format!(
17798
"[{}]",
@@ -200,7 +121,7 @@ impl<'a> ClusteringStatisticsImpl<'a> {
200121
let clustered = block
201122
.cluster_stats
202123
.as_ref()
203-
.is_some_and(|v| v.cluster_key_id == self.cluster_key_id);
124+
.is_some_and(|v| v.cluster_key_id == cluster_key_id);
204125

205126
if clustered {
206127
// Safe to unwrap
@@ -236,18 +157,4 @@ impl<'a> ClusteringStatisticsImpl<'a> {
236157
row_num,
237158
))
238159
}
239-
240-
pub fn schema() -> Arc<TableSchema> {
241-
TableSchemaRefExt::create(vec![
242-
TableField::new("segment_name", TableDataType::String),
243-
TableField::new("block_name", TableDataType::String),
244-
TableField::new("min", TableDataType::String.wrap_nullable()),
245-
TableField::new("max", TableDataType::String.wrap_nullable()),
246-
TableField::new(
247-
"level",
248-
TableDataType::Number(NumberDataType::Int32).wrap_nullable(),
249-
),
250-
TableField::new("pages", TableDataType::String.wrap_nullable()),
251-
])
252-
}
253160
}

0 commit comments

Comments
 (0)