-
Notifications
You must be signed in to change notification settings - Fork 848
feat(query): add group-by type shrinking rule #19177
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Closed
Closed
Changes from 2 commits
Commits
Show all changes
9 commits
Select commit
Hold shift + click to select a range
eaa9556
feat(query): add group-by type shrinking rule
zhang2014 11cc2b3
z
zhang2014 cd12194
z
zhang2014 fc0b3b1
z
zhang2014 0840c69
z
zhang2014 62ad140
z
zhang2014 05fc3c4
Merge branch 'main' into feat-query-groupby-shrink
zhang2014 9e1301e
z
zhang2014 fa0fb36
Merge branch 'feat-query-groupby-shrink' of github.com:zhang2014/data…
zhang2014 File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
371 changes: 371 additions & 0 deletions
371
src/query/sql/src/planner/optimizer/optimizers/rule/agg_rules/rule_shrink_group_by_type.rs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,371 @@ | ||
| // Copyright 2021 Datafuse Labs | ||
| // | ||
| // Licensed under the Apache License, Version 2.0 (the "License"); | ||
| // you may not use this file except in compliance with the License. | ||
| // You may obtain a copy of the License at | ||
| // | ||
| // http://www.apache.org/licenses/LICENSE-2.0 | ||
| // | ||
| // Unless required by applicable law or agreed to in writing, software | ||
| // distributed under the License is distributed on an "AS IS" BASIS, | ||
| // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| // See the License for the specific language governing permissions and | ||
| // limitations under the License. | ||
|
|
||
| use std::sync::Arc; | ||
|
|
||
| use databend_common_exception::Result; | ||
| use databend_common_expression::types::DataType; | ||
| use databend_common_expression::types::NumberDataType; | ||
| use databend_common_storage::Datum; | ||
|
|
||
| use crate::IndexType; | ||
| use crate::MetadataRef; | ||
| use crate::Visibility; | ||
| use crate::binder::ColumnBinding; | ||
| use crate::binder::ColumnBindingBuilder; | ||
| use crate::optimizer::ir::ColumnStat; | ||
| use crate::optimizer::ir::Matcher; | ||
| use crate::optimizer::ir::RelExpr; | ||
| use crate::optimizer::ir::SExpr; | ||
| use crate::optimizer::optimizers::rule::Rule; | ||
| use crate::optimizer::optimizers::rule::RuleID; | ||
| use crate::optimizer::optimizers::rule::TransformResult; | ||
| use crate::plans::Aggregate; | ||
| use crate::plans::AggregateMode; | ||
| use crate::plans::BoundColumnRef; | ||
| use crate::plans::CastExpr; | ||
| use crate::plans::EvalScalar; | ||
| use crate::plans::RelOp; | ||
| use crate::plans::ScalarExpr; | ||
| use crate::plans::ScalarItem; | ||
|
|
||
| pub struct RuleShrinkGroupByType { | ||
| id: RuleID, | ||
| matchers: Vec<Matcher>, | ||
| metadata: MetadataRef, | ||
| } | ||
|
|
||
| impl RuleShrinkGroupByType { | ||
| pub fn new(metadata: MetadataRef) -> Self { | ||
| Self { | ||
| id: RuleID::ShrinkGroupByType, | ||
| matchers: vec![Matcher::MatchOp { | ||
| op_type: RelOp::Aggregate, | ||
| children: vec![Matcher::Leaf], | ||
| }], | ||
| metadata, | ||
| } | ||
| } | ||
| } | ||
|
|
||
| impl Rule for RuleShrinkGroupByType { | ||
| fn id(&self) -> RuleID { | ||
| self.id | ||
| } | ||
|
|
||
| fn apply(&self, s_expr: &SExpr, state: &mut TransformResult) -> Result<()> { | ||
| let agg: Aggregate = s_expr.plan().clone().try_into()?; | ||
| if agg.group_items.is_empty() | ||
| || agg.mode != AggregateMode::Initial | ||
| || agg.grouping_sets.is_some() | ||
| { | ||
| return Ok(()); | ||
| } | ||
|
|
||
| let rel_expr = RelExpr::with_s_expr(s_expr); | ||
| let child_stat = rel_expr.derive_cardinality_child(0)?; | ||
| let column_stats = &child_stat.statistics.column_stats; | ||
|
|
||
| let mut rewrites = Vec::with_capacity(agg.group_items.len()); | ||
| for (idx, item) in agg.group_items.iter().enumerate() { | ||
| let ScalarExpr::BoundColumnRef(col_ref) = &item.scalar else { | ||
| continue; | ||
| }; | ||
| let Some(stat) = column_stats.get(&item.index) else { | ||
| continue; | ||
| }; | ||
| let Some(target_type) = | ||
| shrink_group_by_data_type(col_ref.column.data_type.as_ref(), stat) | ||
| else { | ||
| continue; | ||
| }; | ||
|
|
||
| let mut writer = self.metadata.write(); | ||
| let shrink_index = writer.add_derived_column( | ||
| format!("{}_shrink", col_ref.column.column_name), | ||
| target_type.clone(), | ||
| ); | ||
| drop(writer); | ||
|
|
||
| let shrink_binding = ColumnBindingBuilder::new( | ||
| format!("{}_shrink", col_ref.column.column_name), | ||
| shrink_index, | ||
| Box::new(target_type.clone()), | ||
| Visibility::Visible, | ||
| ) | ||
| .build(); | ||
|
|
||
| rewrites.push(GroupByRewrite { | ||
| position: idx, | ||
| original_binding: col_ref.column.clone(), | ||
| original_index: item.index, | ||
| shrink_binding, | ||
| shrink_type: target_type, | ||
| shrink_index, | ||
| }); | ||
| } | ||
|
|
||
| if rewrites.is_empty() { | ||
| return Ok(()); | ||
| } | ||
|
|
||
| let mut new_group_items = agg.group_items.clone(); | ||
| let mut lower_items = Vec::with_capacity(rewrites.len()); | ||
| let mut upper_items = Vec::with_capacity(rewrites.len()); | ||
|
|
||
| for rewrite in rewrites.iter() { | ||
| new_group_items[rewrite.position].index = rewrite.shrink_index; | ||
| new_group_items[rewrite.position].scalar = ScalarExpr::BoundColumnRef(BoundColumnRef { | ||
| span: None, | ||
| column: rewrite.shrink_binding.clone(), | ||
| }); | ||
|
|
||
| lower_items.push(ScalarItem { | ||
| index: rewrite.shrink_index, | ||
| scalar: ScalarExpr::CastExpr(CastExpr { | ||
| span: None, | ||
| is_try: false, | ||
| argument: Box::new(ScalarExpr::BoundColumnRef(BoundColumnRef { | ||
| span: None, | ||
| column: rewrite.original_binding.clone(), | ||
| })), | ||
| target_type: Box::new(rewrite.shrink_type.clone()), | ||
| }), | ||
| }); | ||
|
|
||
| upper_items.push(ScalarItem { | ||
| index: rewrite.original_index, | ||
| scalar: ScalarExpr::CastExpr(CastExpr { | ||
| span: None, | ||
| is_try: false, | ||
| argument: Box::new(ScalarExpr::BoundColumnRef(BoundColumnRef { | ||
| span: None, | ||
| column: rewrite.shrink_binding.clone(), | ||
| })), | ||
| target_type: Box::new((*rewrite.original_binding.data_type).clone()), | ||
| }), | ||
| }); | ||
| } | ||
|
|
||
| let mut new_child = s_expr.child(0)?.clone(); | ||
| if !lower_items.is_empty() { | ||
| new_child = SExpr::create_unary( | ||
| Arc::new(EvalScalar { items: lower_items }.into()), | ||
| Arc::new(new_child), | ||
| ); | ||
| } | ||
|
|
||
| let mut new_agg = agg; | ||
| new_agg.group_items = new_group_items; | ||
| let mut new_expr = SExpr::create_unary(Arc::new(new_agg.into()), Arc::new(new_child)); | ||
|
|
||
| if !upper_items.is_empty() { | ||
| new_expr = SExpr::create_unary( | ||
| Arc::new(EvalScalar { items: upper_items }.into()), | ||
| Arc::new(new_expr), | ||
| ); | ||
| } | ||
|
|
||
| state.add_result(new_expr); | ||
| Ok(()) | ||
| } | ||
|
|
||
| fn matchers(&self) -> &[Matcher] { | ||
| &self.matchers | ||
| } | ||
| } | ||
|
|
||
| struct GroupByRewrite { | ||
| position: usize, | ||
| original_binding: ColumnBinding, | ||
| original_index: IndexType, | ||
| shrink_binding: ColumnBinding, | ||
| shrink_type: DataType, | ||
| shrink_index: IndexType, | ||
| } | ||
|
|
||
| fn shrink_group_by_data_type(data_type: &DataType, stat: &ColumnStat) -> Option<DataType> { | ||
| match data_type { | ||
| DataType::Nullable(inner) => { | ||
| shrink_group_by_data_type(inner, stat).map(|dt| DataType::Nullable(Box::new(dt))) | ||
| } | ||
| DataType::Number(number_type) => { | ||
| shrink_number_type(*number_type, &stat.origin_min, &stat.origin_max) | ||
| .map(DataType::Number) | ||
| } | ||
| _ => None, | ||
| } | ||
| } | ||
|
|
||
| fn shrink_number_type( | ||
| number_type: NumberDataType, | ||
| min: &Datum, | ||
| max: &Datum, | ||
| ) -> Option<NumberDataType> { | ||
| use NumberDataType::*; | ||
| match number_type { | ||
| UInt8 | Int8 => None, | ||
| UInt16 => shrink_unsigned(min, max, &[UInt8]), | ||
| UInt32 => shrink_unsigned(min, max, &[UInt8, UInt16]), | ||
| UInt64 => shrink_unsigned(min, max, &[UInt8, UInt16, UInt32]), | ||
| Int16 => shrink_signed(min, max, &[Int8]), | ||
| Int32 => shrink_signed(min, max, &[Int8, Int16]), | ||
| Int64 => shrink_signed(min, max, &[Int8, Int16, Int32]), | ||
| _ => None, | ||
| } | ||
| } | ||
|
|
||
| fn shrink_unsigned( | ||
| min: &Datum, | ||
| max: &Datum, | ||
| candidates: &[NumberDataType], | ||
| ) -> Option<NumberDataType> { | ||
| let (Some(min_v), Some(max_v)) = (datum_to_u128(min), datum_to_u128(max)) else { | ||
| return None; | ||
| }; | ||
| if max_v < min_v { | ||
| return None; | ||
| } | ||
| for candidate in candidates { | ||
| match candidate { | ||
| NumberDataType::UInt8 if max_v <= u8::MAX as u128 => return Some(*candidate), | ||
| NumberDataType::UInt16 if max_v <= u16::MAX as u128 => return Some(*candidate), | ||
| NumberDataType::UInt32 if max_v <= u32::MAX as u128 => return Some(*candidate), | ||
| _ => continue, | ||
| } | ||
| } | ||
| None | ||
| } | ||
|
|
||
| fn shrink_signed( | ||
| min: &Datum, | ||
| max: &Datum, | ||
| candidates: &[NumberDataType], | ||
| ) -> Option<NumberDataType> { | ||
| let (Some(min_v), Some(max_v)) = (datum_to_i128(min), datum_to_i128(max)) else { | ||
| return None; | ||
| }; | ||
| if max_v < min_v { | ||
| return None; | ||
| } | ||
| for candidate in candidates { | ||
| match candidate { | ||
| NumberDataType::Int8 if min_v >= i8::MIN as i128 && max_v <= i8::MAX as i128 => { | ||
| return Some(*candidate); | ||
| } | ||
| NumberDataType::Int16 if min_v >= i16::MIN as i128 && max_v <= i16::MAX as i128 => { | ||
| return Some(*candidate); | ||
| } | ||
| NumberDataType::Int32 if min_v >= i32::MIN as i128 && max_v <= i32::MAX as i128 => { | ||
| return Some(*candidate); | ||
| } | ||
| _ => continue, | ||
| } | ||
| } | ||
| None | ||
| } | ||
|
|
||
| fn datum_to_u128(value: &Datum) -> Option<u128> { | ||
| match value { | ||
| Datum::UInt(v) => Some(*v as u128), | ||
| Datum::Int(v) if *v >= 0 => Some(*v as u128), | ||
| _ => None, | ||
| } | ||
| } | ||
|
|
||
| fn datum_to_i128(value: &Datum) -> Option<i128> { | ||
| match value { | ||
| Datum::Int(v) => Some(*v as i128), | ||
| Datum::UInt(v) => Some(*v as i128), | ||
| _ => None, | ||
| } | ||
| } | ||
|
|
||
| #[cfg(test)] | ||
| mod tests { | ||
| use databend_common_storage::Datum; | ||
|
|
||
| use super::*; | ||
| use crate::optimizer::ir::ColumnStat; | ||
| use crate::optimizer::ir::Ndv; | ||
|
|
||
| #[test] | ||
| fn test_shrink_unsigned() { | ||
| let stat = ColumnStat { | ||
| min: Datum::UInt(0), | ||
| max: Datum::UInt(100), | ||
| ndv: Ndv::Stat(10.0), | ||
| null_count: 0, | ||
| origin_min: Datum::UInt(0), | ||
| origin_max: Datum::UInt(100), | ||
| histogram: None, | ||
| }; | ||
| let t = shrink_group_by_data_type(&DataType::Number(NumberDataType::UInt64), &stat); | ||
| assert_eq!(t, Some(DataType::Number(NumberDataType::UInt8))); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_shrink_signed() { | ||
| let stat = ColumnStat { | ||
| min: Datum::Int(-100), | ||
| max: Datum::Int(100), | ||
| ndv: Ndv::Stat(10.0), | ||
| null_count: 0, | ||
| origin_min: Datum::Int(-100), | ||
| origin_max: Datum::Int(100), | ||
| histogram: None, | ||
| }; | ||
| let t = shrink_group_by_data_type(&DataType::Number(NumberDataType::Int64), &stat); | ||
| assert_eq!(t, Some(DataType::Number(NumberDataType::Int8))); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_shrink_nullable() { | ||
| let stat = ColumnStat { | ||
| min: Datum::UInt(0), | ||
| max: Datum::UInt(50000), | ||
| ndv: Ndv::Stat(10.0), | ||
| null_count: 0, | ||
| origin_min: Datum::UInt(0), | ||
| origin_max: Datum::UInt(50000), | ||
| histogram: None, | ||
| }; | ||
| let t = shrink_group_by_data_type( | ||
| &DataType::Nullable(Box::new(DataType::Number(NumberDataType::UInt64))), | ||
| &stat, | ||
| ); | ||
| assert_eq!( | ||
| t, | ||
| Some(DataType::Nullable(Box::new(DataType::Number( | ||
| NumberDataType::UInt16 | ||
| )))) | ||
| ); | ||
| } | ||
|
|
||
| #[test] | ||
| fn test_no_shrink() { | ||
| let stat = ColumnStat { | ||
| min: Datum::UInt(0), | ||
| max: Datum::UInt(u32::MAX as u64 + 1), | ||
| ndv: Ndv::Stat(10.0), | ||
| null_count: 0, | ||
| origin_min: Datum::UInt(0), | ||
| origin_max: Datum::UInt(u32::MAX as u64 + 1), | ||
| histogram: None, | ||
| }; | ||
| assert!( | ||
| shrink_group_by_data_type(&DataType::Number(NumberDataType::UInt64), &stat).is_none() | ||
| ); | ||
| } | ||
| } | ||
Oops, something went wrong.
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
When the rule rewrites a group key to use the new
shrink_index(group_items[position].index = rewrite.shrink_index), no column statistics are created for that derived column (EvalScalar’sderive_statsreturns the child stats unchanged).Aggregate::derive_agg_statslater indexescolumn_stats[&group.index]and unwraps, so any aggregate that triggers this shrink will panic during planning because the stats map lacks an entry for the new index. Add stats for the shrink column or avoid replacing the group index in stats-based code paths.Useful? React with 👍 / 👎.