Skip to content

Commit 318ef91

Browse files
authored
chore(query): clean up the useless count_distinct (#19191)
Revert "fix(query): count_distinct needs to handle nullable correctly (#18973)"
1 parent 25e6011 commit 318ef91

File tree

3 files changed

+11
-79
lines changed

3 files changed

+11
-79
lines changed

src/query/functions/src/aggregates/adaptors/aggregate_combinator_distinct.rs

Lines changed: 4 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -35,10 +35,8 @@ use super::AggrState;
3535
use super::AggrStateLoc;
3636
use super::AggregateCountFunction;
3737
use super::AggregateFunction;
38-
use super::AggregateFunctionCombinatorNull;
3938
use super::AggregateFunctionCreator;
4039
use super::AggregateFunctionDescription;
41-
use super::AggregateFunctionFeatures;
4240
use super::AggregateFunctionSortDesc;
4341
use super::CombinatorDescription;
4442
use super::StateAddr;
@@ -49,32 +47,19 @@ use super::aggregate_distinct_state::AggregateDistinctStringState;
4947
use super::aggregate_distinct_state::AggregateDistinctTimestampState;
5048
use super::aggregate_distinct_state::AggregateUniqStringState;
5149
use super::aggregate_distinct_state::DistinctStateFunc;
52-
use super::aggregate_null_result::AggregateNullResultFunction;
5350
use super::assert_variadic_arguments;
51+
use crate::aggregates::AggregateFunctionFeatures;
5452

53+
#[derive(Clone)]
5554
pub struct AggregateDistinctCombinator<State> {
5655
name: String,
5756

5857
nested_name: String,
5958
arguments: Vec<DataType>,
60-
skip_null: bool,
6159
nested: Arc<dyn AggregateFunction>,
6260
_s: PhantomData<fn(State)>,
6361
}
6462

65-
impl<State> Clone for AggregateDistinctCombinator<State> {
66-
fn clone(&self) -> Self {
67-
Self {
68-
name: self.name.clone(),
69-
nested_name: self.nested_name.clone(),
70-
arguments: self.arguments.clone(),
71-
skip_null: self.skip_null,
72-
nested: self.nested.clone(),
73-
_s: PhantomData,
74-
}
75-
}
76-
}
77-
7863
impl<State> AggregateDistinctCombinator<State>
7964
where State: Send + 'static
8065
{
@@ -122,12 +107,12 @@ where State: DistinctStateFunc
122107
input_rows: usize,
123108
) -> Result<()> {
124109
let state = Self::get_state(place);
125-
state.batch_add(columns, validity, input_rows, self.skip_null)
110+
state.batch_add(columns, validity, input_rows)
126111
}
127112

128113
fn accumulate_row(&self, place: AggrState, columns: ProjectedBlock, row: usize) -> Result<()> {
129114
let state = Self::get_state(place);
130-
state.add(columns, row, self.skip_null)
115+
state.add(columns, row)
131116
}
132117

133118
fn serialize_type(&self) -> Vec<StateSerdeItem> {
@@ -234,41 +219,6 @@ pub fn aggregate_uniq_desc() -> AggregateFunctionDescription {
234219
)
235220
}
236221

237-
pub fn aggregate_count_distinct_desc() -> AggregateFunctionDescription {
238-
AggregateFunctionDescription::creator_with_features(
239-
Box::new(|_, params, arguments, _| {
240-
let count_creator = Box::new(AggregateCountFunction::try_create) as _;
241-
match *arguments {
242-
[DataType::Nullable(_)] => {
243-
let new_arguments =
244-
AggregateFunctionCombinatorNull::transform_arguments(&arguments)?;
245-
let nested = try_create(
246-
"count",
247-
params.clone(),
248-
new_arguments,
249-
vec![],
250-
&count_creator,
251-
)?;
252-
AggregateFunctionCombinatorNull::try_create(params, arguments, nested, true)
253-
}
254-
ref arguments
255-
if !arguments.is_empty() && arguments.iter().all(DataType::is_null) =>
256-
{
257-
AggregateNullResultFunction::try_create(DataType::Number(
258-
NumberDataType::UInt64,
259-
))
260-
}
261-
_ => try_create("count", params, arguments, vec![], &count_creator),
262-
}
263-
}),
264-
AggregateFunctionFeatures {
265-
returns_default_when_only_null: true,
266-
keep_nullable: true,
267-
..Default::default()
268-
},
269-
)
270-
}
271-
272222
fn try_create(
273223
nested_name: &str,
274224
params: Vec<Scalar>,
@@ -293,7 +243,6 @@ fn try_create(
293243
> {
294244
nested_name: nested_name.to_owned(),
295245
arguments,
296-
skip_null: false,
297246
nested,
298247
name,
299248
_s: PhantomData,
@@ -305,7 +254,6 @@ fn try_create(
305254
> {
306255
nested_name: nested_name.to_owned(),
307256
arguments,
308-
skip_null: false,
309257
nested,
310258
name,
311259
_s: PhantomData,
@@ -315,7 +263,6 @@ fn try_create(
315263
> {
316264
nested_name: nested_name.to_owned(),
317265
arguments,
318-
skip_null: false,
319266
nested,
320267
name,
321268
_s: PhantomData,
@@ -326,7 +273,6 @@ fn try_create(
326273
> {
327274
name,
328275
arguments,
329-
skip_null: false,
330276
nested,
331277
nested_name: nested_name.to_owned(),
332278
_s: PhantomData,
@@ -337,7 +283,6 @@ fn try_create(
337283
> {
338284
nested_name: nested_name.to_owned(),
339285
arguments,
340-
skip_null: false,
341286
nested,
342287
name,
343288
_s: PhantomData,
@@ -346,9 +291,6 @@ fn try_create(
346291
AggregateDistinctState,
347292
> {
348293
nested_name: nested_name.to_owned(),
349-
skip_null: nested_name == "count"
350-
&& arguments.len() > 1
351-
&& arguments.iter().any(DataType::is_nullable_or_null),
352294
arguments,
353295
nested,
354296
name,

src/query/functions/src/aggregates/aggregate_distinct_state.rs

Lines changed: 7 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -54,13 +54,12 @@ pub(super) trait DistinctStateFunc: Sized + Send + StateSerde + 'static {
5454
fn new() -> Self;
5555
fn is_empty(&self) -> bool;
5656
fn len(&self) -> usize;
57-
fn add(&mut self, columns: ProjectedBlock, row: usize, skip_null: bool) -> Result<()>;
57+
fn add(&mut self, columns: ProjectedBlock, row: usize) -> Result<()>;
5858
fn batch_add(
5959
&mut self,
6060
columns: ProjectedBlock,
6161
validity: Option<&Bitmap>,
6262
input_rows: usize,
63-
skip_null: bool,
6463
) -> Result<()>;
6564
fn merge(&mut self, rhs: &Self) -> Result<()>;
6665
fn build_entries(&mut self, types: &[DataType]) -> Result<Vec<BlockEntry>>;
@@ -154,7 +153,7 @@ where
154153
self.set.len()
155154
}
156155

157-
fn add(&mut self, columns: ProjectedBlock, row: usize, _skip_null: bool) -> Result<()> {
156+
fn add(&mut self, columns: ProjectedBlock, row: usize) -> Result<()> {
158157
let view = A::downcast_column(&columns);
159158
let v = unsafe { view.index_unchecked(row) };
160159
let key: <A::Access as AccessType>::Scalar = A::Access::to_owned_scalar(v);
@@ -167,7 +166,6 @@ where
167166
columns: ProjectedBlock,
168167
validity: Option<&Bitmap>,
169168
input_rows: usize,
170-
_skip_null: bool,
171169
) -> Result<()> {
172170
let view = A::downcast_column(&columns);
173171
match validity {
@@ -269,16 +267,12 @@ impl DistinctStateFunc for AggregateDistinctState {
269267
self.set.len()
270268
}
271269

272-
fn add(&mut self, columns: ProjectedBlock, row: usize, skip_null: bool) -> Result<()> {
270+
fn add(&mut self, columns: ProjectedBlock, row: usize) -> Result<()> {
273271
let values = columns
274272
.iter()
275273
.map(|entry| unsafe { entry.index_unchecked(row) }.to_owned())
276274
.collect::<Vec<_>>();
277275

278-
if skip_null && values.iter().any(Scalar::is_null) {
279-
return Ok(());
280-
}
281-
282276
let mut buffer = Vec::with_capacity(values.len() * std::mem::size_of::<Scalar>());
283277
values.serialize(&mut buffer)?;
284278
self.set.insert(buffer);
@@ -290,20 +284,19 @@ impl DistinctStateFunc for AggregateDistinctState {
290284
columns: ProjectedBlock,
291285
validity: Option<&Bitmap>,
292286
input_rows: usize,
293-
skip_null: bool,
294287
) -> Result<()> {
295288
match validity {
296289
Some(validity) => {
297290
for (row, b) in (0..input_rows).zip(validity) {
298291
if !b {
299292
continue;
300293
}
301-
self.add(columns, row, skip_null)?;
294+
self.add(columns, row)?;
302295
}
303296
}
304297
None => {
305298
for row in 0..input_rows {
306-
self.add(columns, row, skip_null)?;
299+
self.add(columns, row)?;
307300
}
308301
}
309302
}
@@ -392,7 +385,7 @@ impl DistinctStateFunc for AggregateDistinctStringState {
392385
self.set.len()
393386
}
394387

395-
fn add(&mut self, columns: ProjectedBlock, row: usize, _skip_null: bool) -> Result<()> {
388+
fn add(&mut self, columns: ProjectedBlock, row: usize) -> Result<()> {
396389
let view = columns[0].downcast::<StringType>().unwrap();
397390
let data = unsafe { view.index_unchecked(row) };
398391
let _ = self.set.set_insert(data.as_bytes());
@@ -404,7 +397,6 @@ impl DistinctStateFunc for AggregateDistinctStringState {
404397
columns: ProjectedBlock,
405398
validity: Option<&Bitmap>,
406399
input_rows: usize,
407-
_skip_null: bool,
408400
) -> Result<()> {
409401
let view = columns[0].downcast::<StringType>().unwrap();
410402
match validity {
@@ -499,7 +491,7 @@ impl DistinctStateFunc for AggregateUniqStringState {
499491
self.set.len()
500492
}
501493

502-
fn add(&mut self, columns: ProjectedBlock, row: usize, _skip_null: bool) -> Result<()> {
494+
fn add(&mut self, columns: ProjectedBlock, row: usize) -> Result<()> {
503495
let view = columns[0].downcast::<StringType>().unwrap();
504496
let data = unsafe { view.index_unchecked(row) }.as_bytes();
505497
let mut hasher = SipHasher24::new();
@@ -514,7 +506,6 @@ impl DistinctStateFunc for AggregateUniqStringState {
514506
columns: ProjectedBlock,
515507
validity: Option<&Bitmap>,
516508
input_rows: usize,
517-
_skip_null: bool,
518509
) -> Result<()> {
519510
let view = columns[0].downcast::<StringType>().unwrap();
520511
match validity {

src/query/functions/src/aggregates/aggregator.rs

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -74,7 +74,6 @@ impl Aggregators {
7474
factory.register("sum_zero", AggregateSumZeroFunction::desc());
7575
factory.register("avg", aggregate_avg_function_desc());
7676
factory.register("uniq", aggregate_uniq_desc());
77-
factory.register("count_distinct", aggregate_count_distinct_desc());
7877

7978
factory.register("min", aggregate_min_function_desc());
8079
factory.register("max", aggregate_max_function_desc());

0 commit comments

Comments
 (0)