diff --git a/src/query/functions/src/aggregates/adaptors/aggregate_combinator_distinct.rs b/src/query/functions/src/aggregates/adaptors/aggregate_combinator_distinct.rs index c1b32d83ac26f..eb9e6c5b9a017 100644 --- a/src/query/functions/src/aggregates/adaptors/aggregate_combinator_distinct.rs +++ b/src/query/functions/src/aggregates/adaptors/aggregate_combinator_distinct.rs @@ -35,10 +35,8 @@ use super::AggrState; use super::AggrStateLoc; use super::AggregateCountFunction; use super::AggregateFunction; -use super::AggregateFunctionCombinatorNull; use super::AggregateFunctionCreator; use super::AggregateFunctionDescription; -use super::AggregateFunctionFeatures; use super::AggregateFunctionSortDesc; use super::CombinatorDescription; use super::StateAddr; @@ -49,32 +47,19 @@ use super::aggregate_distinct_state::AggregateDistinctStringState; use super::aggregate_distinct_state::AggregateDistinctTimestampState; use super::aggregate_distinct_state::AggregateUniqStringState; use super::aggregate_distinct_state::DistinctStateFunc; -use super::aggregate_null_result::AggregateNullResultFunction; use super::assert_variadic_arguments; +use crate::aggregates::AggregateFunctionFeatures; +#[derive(Clone)] pub struct AggregateDistinctCombinator { name: String, nested_name: String, arguments: Vec, - skip_null: bool, nested: Arc, _s: PhantomData, } -impl Clone for AggregateDistinctCombinator { - fn clone(&self) -> Self { - Self { - name: self.name.clone(), - nested_name: self.nested_name.clone(), - arguments: self.arguments.clone(), - skip_null: self.skip_null, - nested: self.nested.clone(), - _s: PhantomData, - } - } -} - impl AggregateDistinctCombinator where State: Send + 'static { @@ -122,12 +107,12 @@ where State: DistinctStateFunc input_rows: usize, ) -> Result<()> { let state = Self::get_state(place); - state.batch_add(columns, validity, input_rows, self.skip_null) + state.batch_add(columns, validity, input_rows) } fn accumulate_row(&self, place: AggrState, columns: ProjectedBlock, row: usize) -> Result<()> { let state = Self::get_state(place); - state.add(columns, row, self.skip_null) + state.add(columns, row) } fn serialize_type(&self) -> Vec { @@ -234,41 +219,6 @@ pub fn aggregate_uniq_desc() -> AggregateFunctionDescription { ) } -pub fn aggregate_count_distinct_desc() -> AggregateFunctionDescription { - AggregateFunctionDescription::creator_with_features( - Box::new(|_, params, arguments, _| { - let count_creator = Box::new(AggregateCountFunction::try_create) as _; - match *arguments { - [DataType::Nullable(_)] => { - let new_arguments = - AggregateFunctionCombinatorNull::transform_arguments(&arguments)?; - let nested = try_create( - "count", - params.clone(), - new_arguments, - vec![], - &count_creator, - )?; - AggregateFunctionCombinatorNull::try_create(params, arguments, nested, true) - } - ref arguments - if !arguments.is_empty() && arguments.iter().all(DataType::is_null) => - { - AggregateNullResultFunction::try_create(DataType::Number( - NumberDataType::UInt64, - )) - } - _ => try_create("count", params, arguments, vec![], &count_creator), - } - }), - AggregateFunctionFeatures { - returns_default_when_only_null: true, - keep_nullable: true, - ..Default::default() - }, - ) -} - fn try_create( nested_name: &str, params: Vec, @@ -293,7 +243,6 @@ fn try_create( > { nested_name: nested_name.to_owned(), arguments, - skip_null: false, nested, name, _s: PhantomData, @@ -305,7 +254,6 @@ fn try_create( > { nested_name: nested_name.to_owned(), arguments, - skip_null: false, nested, name, _s: PhantomData, @@ -315,7 +263,6 @@ fn try_create( > { nested_name: nested_name.to_owned(), arguments, - skip_null: false, nested, name, _s: PhantomData, @@ -326,7 +273,6 @@ fn try_create( > { name, arguments, - skip_null: false, nested, nested_name: nested_name.to_owned(), _s: PhantomData, @@ -337,7 +283,6 @@ fn try_create( > { nested_name: nested_name.to_owned(), arguments, - skip_null: false, nested, name, _s: PhantomData, @@ -346,9 +291,6 @@ fn try_create( AggregateDistinctState, > { nested_name: nested_name.to_owned(), - skip_null: nested_name == "count" - && arguments.len() > 1 - && arguments.iter().any(DataType::is_nullable_or_null), arguments, nested, name, diff --git a/src/query/functions/src/aggregates/aggregate_distinct_state.rs b/src/query/functions/src/aggregates/aggregate_distinct_state.rs index e8967869f5315..f1a02aa890a12 100644 --- a/src/query/functions/src/aggregates/aggregate_distinct_state.rs +++ b/src/query/functions/src/aggregates/aggregate_distinct_state.rs @@ -54,13 +54,12 @@ pub(super) trait DistinctStateFunc: Sized + Send + StateSerde + 'static { fn new() -> Self; fn is_empty(&self) -> bool; fn len(&self) -> usize; - fn add(&mut self, columns: ProjectedBlock, row: usize, skip_null: bool) -> Result<()>; + fn add(&mut self, columns: ProjectedBlock, row: usize) -> Result<()>; fn batch_add( &mut self, columns: ProjectedBlock, validity: Option<&Bitmap>, input_rows: usize, - skip_null: bool, ) -> Result<()>; fn merge(&mut self, rhs: &Self) -> Result<()>; fn build_entries(&mut self, types: &[DataType]) -> Result>; @@ -154,7 +153,7 @@ where self.set.len() } - fn add(&mut self, columns: ProjectedBlock, row: usize, _skip_null: bool) -> Result<()> { + fn add(&mut self, columns: ProjectedBlock, row: usize) -> Result<()> { let view = A::downcast_column(&columns); let v = unsafe { view.index_unchecked(row) }; let key: ::Scalar = A::Access::to_owned_scalar(v); @@ -167,7 +166,6 @@ where columns: ProjectedBlock, validity: Option<&Bitmap>, input_rows: usize, - _skip_null: bool, ) -> Result<()> { let view = A::downcast_column(&columns); match validity { @@ -269,16 +267,12 @@ impl DistinctStateFunc for AggregateDistinctState { self.set.len() } - fn add(&mut self, columns: ProjectedBlock, row: usize, skip_null: bool) -> Result<()> { + fn add(&mut self, columns: ProjectedBlock, row: usize) -> Result<()> { let values = columns .iter() .map(|entry| unsafe { entry.index_unchecked(row) }.to_owned()) .collect::>(); - if skip_null && values.iter().any(Scalar::is_null) { - return Ok(()); - } - let mut buffer = Vec::with_capacity(values.len() * std::mem::size_of::()); values.serialize(&mut buffer)?; self.set.insert(buffer); @@ -290,7 +284,6 @@ impl DistinctStateFunc for AggregateDistinctState { columns: ProjectedBlock, validity: Option<&Bitmap>, input_rows: usize, - skip_null: bool, ) -> Result<()> { match validity { Some(validity) => { @@ -298,12 +291,12 @@ impl DistinctStateFunc for AggregateDistinctState { if !b { continue; } - self.add(columns, row, skip_null)?; + self.add(columns, row)?; } } None => { for row in 0..input_rows { - self.add(columns, row, skip_null)?; + self.add(columns, row)?; } } } @@ -392,7 +385,7 @@ impl DistinctStateFunc for AggregateDistinctStringState { self.set.len() } - fn add(&mut self, columns: ProjectedBlock, row: usize, _skip_null: bool) -> Result<()> { + fn add(&mut self, columns: ProjectedBlock, row: usize) -> Result<()> { let view = columns[0].downcast::().unwrap(); let data = unsafe { view.index_unchecked(row) }; let _ = self.set.set_insert(data.as_bytes()); @@ -404,7 +397,6 @@ impl DistinctStateFunc for AggregateDistinctStringState { columns: ProjectedBlock, validity: Option<&Bitmap>, input_rows: usize, - _skip_null: bool, ) -> Result<()> { let view = columns[0].downcast::().unwrap(); match validity { @@ -499,7 +491,7 @@ impl DistinctStateFunc for AggregateUniqStringState { self.set.len() } - fn add(&mut self, columns: ProjectedBlock, row: usize, _skip_null: bool) -> Result<()> { + fn add(&mut self, columns: ProjectedBlock, row: usize) -> Result<()> { let view = columns[0].downcast::().unwrap(); let data = unsafe { view.index_unchecked(row) }.as_bytes(); let mut hasher = SipHasher24::new(); @@ -514,7 +506,6 @@ impl DistinctStateFunc for AggregateUniqStringState { columns: ProjectedBlock, validity: Option<&Bitmap>, input_rows: usize, - _skip_null: bool, ) -> Result<()> { let view = columns[0].downcast::().unwrap(); match validity { diff --git a/src/query/functions/src/aggregates/aggregator.rs b/src/query/functions/src/aggregates/aggregator.rs index 3be08a27384db..6b8806583bf2b 100644 --- a/src/query/functions/src/aggregates/aggregator.rs +++ b/src/query/functions/src/aggregates/aggregator.rs @@ -74,7 +74,6 @@ impl Aggregators { factory.register("sum_zero", AggregateSumZeroFunction::desc()); factory.register("avg", aggregate_avg_function_desc()); factory.register("uniq", aggregate_uniq_desc()); - factory.register("count_distinct", aggregate_count_distinct_desc()); factory.register("min", aggregate_min_function_desc()); factory.register("max", aggregate_max_function_desc());