diff --git a/src/query/expression/src/lib.rs b/src/query/expression/src/lib.rs
index 8ecea5372657e..ffde486064c24 100755
--- a/src/query/expression/src/lib.rs
+++ b/src/query/expression/src/lib.rs
@@ -55,6 +55,7 @@ mod kernels;
 mod property;
 mod register;
 pub mod row;
+pub mod sampler;
 pub mod schema;
 pub mod type_check;
 pub mod types;
diff --git a/src/query/expression/src/sampler/fixed_rate_sampler.rs b/src/query/expression/src/sampler/fixed_rate_sampler.rs
new file mode 100644
index 0000000000000..5046096540ff3
--- /dev/null
+++ b/src/query/expression/src/sampler/fixed_rate_sampler.rs
@@ -0,0 +1,322 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::VecDeque;
+
+use rand::Rng;
+use rate_sampling::Sampling;
+
+use crate::BlockRowIndex;
+use crate::DataBlock;
+
+pub struct FixedRateSampler<R: Rng> {
+    columns: Vec<usize>,
+    block_size: usize,
+
+    indices: VecDeque<Vec<BlockRowIndex>>,
+    sparse_blocks: Vec<DataBlock>,
+    pub dense_blocks: Vec<DataBlock>,
+
+    core: Sampling<R>,
+    s: usize,
+}
+
+impl<R: Rng> FixedRateSampler<R> {
+    pub fn new(
+        columns: Vec<usize>,
+        block_size: usize,
+        expectation: usize,
+        deviation: usize,
+        rng: R,
+    ) -> Option<Self> {
+        let mut core = Sampling::new_expectation(expectation, deviation, rng)?;
+        let s = core.search();
+        Some(Self {
+            columns,
+            block_size,
+            indices: VecDeque::new(),
+            sparse_blocks: Vec::new(),
+            dense_blocks: Vec::new(),
+            core,
+            s,
+        })
+    }
+
+    pub fn add_block(&mut self, data: DataBlock) -> bool {
+        let rows = data.num_rows();
+        assert!(rows > 0);
+        let block_idx = self.sparse_blocks.len() as u32;
+        let change = self.add_indices(rows, block_idx);
+        if change {
+            let columns = self
+                .columns
+                .iter()
+                .map(|&offset| data.get_by_offset(offset).to_owned())
+                .collect::<Vec<_>>();
+            self.sparse_blocks.push(DataBlock::new(columns, rows));
+        }
+        change
+    }
+
+    fn add_indices(&mut self, rows: usize, block_idx: u32) -> bool {
+        let mut change = false;
+        let mut cur: usize = 0;
+
+        while rows - cur > self.s {
+            change = true;
+            cur += self.s;
+            match self.indices.back_mut() {
+                Some(back) if back.len() < self.block_size => back.push((block_idx, cur as u32, 1)),
+                _ => {
+                    let mut v = Vec::with_capacity(self.block_size);
+                    v.push((block_idx, cur as u32, 1));
+                    self.indices.push_back(v)
+                }
+            }
+            self.s = self.core.search();
+        }
+
+        self.s -= rows - cur;
+        change
+    }
+
+    pub fn compact_blocks(&mut self, is_final: bool) {
+        if self.sparse_blocks.is_empty() {
+            return;
+        }
+
+        while self
+            .indices
+            .front()
+            .is_some_and(|indices| indices.len() == self.block_size)
+        {
+            let indices = self.indices.pop_front().unwrap();
+            let block = DataBlock::take_blocks(&self.sparse_blocks, &indices, indices.len());
+            self.dense_blocks.push(block)
+        }
+
+        let Some(mut indices) = self.indices.pop_front() else {
+            self.sparse_blocks.clear();
+            return;
+        };
+        debug_assert!(self.indices.is_empty());
+
+        if is_final {
+            let block = DataBlock::take_blocks(&self.sparse_blocks, &indices, indices.len());
+            self.sparse_blocks.clear();
+            self.dense_blocks.push(block);
+            return;
+        }
+
+        if self.sparse_blocks.len() == 1 {
+            self.indices.push_back(indices);
+            return;
+        }
+        let block = DataBlock::take_blocks(&self.sparse_blocks, &indices, indices.len());
+        self.sparse_blocks.clear();
+        for (i, index) in indices.iter_mut().enumerate() {
+            index.0 = 0;
+            index.1 = i as u32;
+        }
+        self.indices.push_back(indices);
+        self.sparse_blocks.push(block);
+    }
+
+    pub fn memory_size(self) -> usize {
+        self.sparse_blocks.iter().map(|b| b.memory_size()).sum()
+    }
+
+    pub fn num_rows(&self) -> usize {
+        self.indices.len()
+    }
+}
+
+mod rate_sampling {
+    use std::ops::RangeInclusive;
+
+    use rand::Rng;
+
+    pub struct Sampling<R: Rng> {
+        range: RangeInclusive<usize>,
+        r: R,
+    }
+
+    impl<R: Rng> Sampling<R> {
+        #[allow(dead_code)]
+        pub fn new(range: RangeInclusive<usize>, r: R) -> Self {
+            Self { range, r }
+        }
+
+        pub fn new_expectation(expectation: usize, deviation: usize, r: R) -> Option<Self> {
+            // Reject parameters whose range would underflow or overflow.
+            if expectation < deviation || usize::MAX - expectation < deviation {
+                None
+            } else {
+                Some(Self {
+                    range: expectation - deviation..=expectation + deviation,
+                    r,
+                })
+            }
+        }
+
+        pub fn search(&mut self) -> usize {
+            self.r.gen_range(self.range.clone())
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use rand::rngs::StdRng;
+    use rand::SeedableRng;
+
+    use super::*;
+    use crate::types::Int32Type;
+    use crate::utils::FromData;
+
+    #[test]
+    fn test_add_indices() {
+        let rng = StdRng::seed_from_u64(0);
+        let mut core = Sampling::new(3..=6, rng);
+        let s = core.search();
+        let mut sampler = FixedRateSampler {
+            columns: vec![0],
+            block_size: 65536,
+            indices: VecDeque::new(),
+            sparse_blocks: Vec::new(),
+            dense_blocks: Vec::new(),
+            core,
+            s,
+        };
+
+        sampler.add_indices(15, 0);
+
+        let want: Vec<BlockRowIndex> = vec![(0, 6, 1), (0, 9, 1), (0, 14, 1)];
+        assert_eq!(Some(&want), sampler.indices.front());
+        assert_eq!(3, sampler.s);
+
+        sampler.add_indices(20, 1);
+
+        let want: Vec<BlockRowIndex> = vec![
+            (0, 6, 1),
+            (0, 9, 1),
+            (0, 14, 1),
+            (1, 3, 1),
+            (1, 9, 1),
+            (1, 15, 1),
+            (1, 18, 1),
+        ];
+        assert_eq!(Some(&want), sampler.indices.front());
+        assert_eq!(1, sampler.s);
+    }
+
+    #[test]
+    fn test_compact_blocks() {
+        let rng = StdRng::seed_from_u64(0);
+
+        let sparse_blocks = vec![
+            DataBlock::new_from_columns(vec![Int32Type::from_data(vec![1, 2, 3, 4, 5])]),
+            DataBlock::new_from_columns(vec![Int32Type::from_data(vec![6, 7, 8, 9, 10])]),
+        ];
+
+        let indices = VecDeque::from(vec![vec![(0, 1, 1), (0, 2, 1), (1, 0, 1)], vec![
+            (1, 1, 1),
+            (1, 2, 1),
+        ]]);
+
+        {
+            let core = Sampling::new(3..=6, rng.clone());
+            let mut sampler = FixedRateSampler {
+                columns: vec![0],
+                block_size: 3,
+                indices: indices.clone(),
+                sparse_blocks: sparse_blocks.clone(),
+                dense_blocks: Vec::new(),
+                core,
+                s: 0,
+            };
+
+            sampler.compact_blocks(false);
+
+            assert_eq!(Some(&vec![(0, 0, 1), (0, 1, 1)]), sampler.indices.front());
+            assert_eq!(
+                &Int32Type::from_data(vec![7, 8]),
+                sampler.sparse_blocks[0].get_last_column()
+            );
+            assert_eq!(
+                &Int32Type::from_data(vec![2, 3, 6]),
+                sampler.dense_blocks[0].get_last_column()
+            );
+
+            sampler.compact_blocks(true);
+            assert!(sampler.indices.is_empty());
+            assert!(sampler.sparse_blocks.is_empty());
+        }
+
+        {
+            let core = Sampling::new(3..=6, rng.clone());
+            let mut sampler = FixedRateSampler {
+                columns: vec![0],
+                block_size: 3,
+                indices: indices.clone(),
+                sparse_blocks: sparse_blocks.clone(),
+                dense_blocks: Vec::new(),
+                core,
+                s: 0,
+            };
+
+            sampler.compact_blocks(true);
+
+            assert!(sampler.indices.is_empty());
+            assert_eq!(
+                &Int32Type::from_data(vec![2, 3, 6]),
+                sampler.dense_blocks[0].get_last_column()
+            );
+            assert_eq!(
+                &Int32Type::from_data(vec![7, 8]),
+                sampler.dense_blocks[1].get_last_column()
+            );
+        }
+
+        {
+            let indices = VecDeque::from(vec![vec![(0, 1, 1), (0, 2, 1), (1, 0, 1)], vec![
+                (1, 1, 1),
+                (1, 2, 1),
+                (1, 3, 1),
+            ]]);
+
+            let core = Sampling::new(3..=6, rng.clone());
+            let mut sampler = FixedRateSampler {
+                columns: vec![0],
+                block_size: 3,
+                indices: indices.clone(),
+                sparse_blocks: sparse_blocks.clone(),
+                dense_blocks: Vec::new(),
+                core,
+                s: 0,
+            };
+
+            sampler.compact_blocks(false);
+
+            assert!(sampler.indices.is_empty());
+            assert_eq!(
+                &Int32Type::from_data(vec![2, 3, 6]),
+                sampler.dense_blocks[0].get_last_column()
+            );
+            assert_eq!(
+                &Int32Type::from_data(vec![7, 8, 9]),
+                sampler.dense_blocks[1].get_last_column()
+            );
+        }
+    }
+}
diff --git a/src/query/expression/src/sampler/fixed_size_sampler.rs b/src/query/expression/src/sampler/fixed_size_sampler.rs
new file mode 100644
index 0000000000000..dd7500d40759b
--- /dev/null
+++ b/src/query/expression/src/sampler/fixed_size_sampler.rs
@@ -0,0 +1,280 @@
+// Copyright 2021 Datafuse Labs
+//
+// Licensed under the Apache License, Version 2.0 (the "License");
+// you may not use this file except in compliance with the License.
+// You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing, software
+// distributed under the License is distributed on an "AS IS" BASIS,
+// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+// See the License for the specific language governing permissions and
+// limitations under the License.
+
+use std::collections::HashSet;
+
+use rand::Rng;
+use reservoir_sampling::AlgoL;
+
+use crate::BlockRowIndex;
+use crate::DataBlock;
+
+pub struct FixedSizeSampler<R: Rng> {
+    columns: Vec<usize>,
+    k: usize,
+    block_size: usize,
+
+    blocks: Vec<DataBlock>,
+    indices: Vec<BlockRowIndex>,
+    core: AlgoL<R>,
+
+    s: usize,
+}
+
+impl<R: Rng> FixedSizeSampler<R> {
+    pub fn new(columns: Vec<usize>, block_size: usize, k: usize, rng: R) -> Self {
+        let core = AlgoL::new(k.try_into().unwrap(), rng);
+        Self {
+            columns,
+            blocks: Vec::new(),
+            indices: Vec::with_capacity(k),
+            k,
+            block_size,
+            core,
+            s: usize::MAX,
+        }
+    }
+
+    pub fn add_block(&mut self, data: DataBlock) -> bool {
+        let rows = data.num_rows();
+        assert!(rows > 0);
+        let block_idx = self.blocks.len() as u32;
+        let change = self.add_indices(rows, block_idx);
+        if change {
+            let columns = self
+                .columns
+                .iter()
+                .map(|&offset| data.get_by_offset(offset).to_owned())
+                .collect::<Vec<_>>();
+
+            self.blocks.push(DataBlock::new(columns, rows));
+            if self.blocks.len() > self.k {
+                self.compact_blocks()
+            }
+        }
+        change
+    }
+
+    fn add_indices(&mut self, rows: usize, block_idx: u32) -> bool {
+        let mut change = false;
+        let mut cur: usize = 0;
+        if self.indices.len() < self.k {
+            if rows + self.indices.len() <= self.k {
+                for i in 0..rows {
+                    self.indices.push((block_idx, i as u32, 1));
+                }
+                if self.indices.len() == self.k {
+                    self.s = self.core.search()
+                }
+                return true;
+            }
+            while self.indices.len() < self.k {
+                self.indices.push((block_idx, cur as u32, 1));
+                cur += 1;
+            }
+            self.s = self.core.search();
+            change = true;
+        }
+
+        while rows - cur > self.s {
+            change = true;
+            cur += self.s;
+            self.indices[self.core.pos()] = (block_idx, cur as u32, 1);
+            self.core.update_w();
+            self.s = self.core.search();
+        }
+
+        self.s -= rows - cur;
+        change
+    }
+
+    pub fn compact_indices(&mut self) {
+        compact_indices(&mut self.indices, &mut self.blocks)
+    }
+
+    pub fn compact_blocks(&mut self) {
+        self.blocks = self
+            .indices
+            .chunks_mut(self.block_size)
+            .enumerate()
+            .map(|(i, indices)| {
+                let rows = indices.len();
+                let block = DataBlock::take_blocks(&self.blocks, indices, rows);
+
+                for (j, (b, r, _)) in indices.iter_mut().enumerate() {
+                    *b = i as u32;
+                    *r = j as u32;
+                }
+
+                block
+            })
+            .collect::<Vec<_>>();
+    }
+
+    pub fn memory_size(self) -> usize {
+        self.blocks.iter().map(|b| b.memory_size()).sum()
+    }
+
+    pub fn take_blocks(&mut self) -> Vec<DataBlock> {
+        std::mem::take(&mut self.blocks)
+    }
+
+    pub fn k(&self) -> usize {
+        self.k
+    }
+}
+
+fn compact_indices(indices: &mut Vec<BlockRowIndex>, blocks: &mut Vec<DataBlock>) {
+    let used_set: HashSet<_> = indices.iter().map(|&(b, _, _)| b).collect();
+    if used_set.len() == blocks.len() {
+        return;
+    }
+
+    let mut used: Vec<_> = used_set.iter().cloned().collect();
+    used.sort();
+
+    *indices = indices
+        .drain(..)
+        .map(|(b, r, c)| (used.binary_search(&b).unwrap() as u32, r, c))
+        .collect();
+
+    *blocks = blocks
+        .drain(..)
+        .enumerate()
+        .filter_map(|(i, block)| {
+            if used_set.contains(&(i as u32)) {
+                Some(block)
+            } else {
+                None
+            }
+        })
+        .collect();
+}
+
+mod reservoir_sampling {
+    use std::num::NonZeroUsize;
+
+    use rand::Rng;
+
+    /// An implementation of Algorithm `L` (https://en.wikipedia.org/wiki/Reservoir_sampling#An_optimal_algorithm)
+    pub struct AlgoL<R: Rng> {
+        k: usize,
+        w: f64,
+
+        r: R,
+    }
+
+    impl<R: Rng> AlgoL<R> {
+        pub fn new(k: NonZeroUsize, r: R) -> Self {
+            let mut al = Self {
+                k: k.into(),
+                w: 1.0,
+                r,
+            };
+            al.update_w();
+            al
+        }
+
+        pub fn search(&mut self) -> usize {
+            let s = (self.rng().log2() / (1.0 - self.w).log2()).floor() + 1.0;
+            if s.is_normal() {
+                s as usize
+            } else {
+                usize::MAX
+            }
+        }
+
+        pub fn pos(&mut self) -> usize {
+            self.r.sample(rand::distributions::Uniform::new(0, self.k))
+        }
+
+        pub fn update_w(&mut self) {
+            self.w *= (self.rng().log2() / self.k as f64).exp2(); // rng ^ (1/k)
+        }
+
+        fn rng(&mut self) -> f64 {
+            self.r.sample(rand::distributions::Open01)
+        }
+    }
+
+    #[cfg(test)]
+    mod tests {
+        use rand::rngs::StdRng;
+        use rand::SeedableRng;
+
+        use super::*;
+
+        #[test]
+        fn test_algo_l() {
+            let rng = StdRng::seed_from_u64(0);
+            let mut sample = vec![0_u64; 10];
+
+            let mut al = AlgoL::new(10.try_into().unwrap(), rng);
+            for (i, v) in sample.iter_mut().enumerate() {
+                *v = i as u64
+            }
+
+            let mut i = 9;
+            loop {
+                i += al.search();
+                if i < 100 {
+                    sample[al.pos()] = i as u64;
+                    al.update_w()
+                } else {
+                    break;
+                }
+            }
+
+            let want: Vec<u64> = vec![69, 49, 53, 83, 4, 72, 88, 38, 45, 27];
+            assert_eq!(want, sample)
+        }
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use rand::rngs::StdRng;
+    use rand::SeedableRng;
+
+    use super::*;
+
+    #[test]
+    fn test_add_indices() {
+        let rng = StdRng::seed_from_u64(0);
+        let k = 5;
+        let core = AlgoL::new(k.try_into().unwrap(), rng);
+        let mut sampler = FixedSizeSampler {
+            columns: vec![0],
+            k,
+            block_size: 65536,
+            blocks: Vec::new(),
+            indices: Vec::new(),
+            core,
+            s: usize::MAX,
+        };
+
+        sampler.add_indices(15, 0);
+
+        let want: Vec<BlockRowIndex> =
+            vec![(0, 10, 1), (0, 1, 1), (0, 2, 1), (0, 8, 1), (0, 12, 1)];
+        assert_eq!(&want, &sampler.indices);
+        assert_eq!(0, sampler.s);
+
+        sampler.add_indices(20, 1);
+
+        let want: Vec<BlockRowIndex> = vec![(1, 0, 1), (0, 1, 1), (1, 6, 1), (0, 8, 1), (1, 9, 1)];
+        assert_eq!(&want, &sampler.indices);
+        assert_eq!(1, sampler.s);
+    }
+}
diff --git a/src/query/expression/src/sampler/mod.rs b/src/query/expression/src/sampler/mod.rs
new file mode 100644
index 0000000000000..c34b36905bd0f
--- /dev/null
+++ b/src/query/expression/src/sampler/mod.rs
@@ -0,0 +1,19 @@
+// Copyright 2021
Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +mod fixed_rate_sampler; +mod fixed_size_sampler; + +pub use fixed_rate_sampler::FixedRateSampler; +pub use fixed_size_sampler::FixedSizeSampler; diff --git a/src/query/expression/src/types.rs b/src/query/expression/src/types.rs index 4ac3f6079a897..c244358327574 100755 --- a/src/query/expression/src/types.rs +++ b/src/query/expression/src/types.rs @@ -332,7 +332,7 @@ impl DataType { pub trait ValueType: Debug + Clone + PartialEq + Sized + 'static { type Scalar: Debug + Clone + PartialEq; type ScalarRef<'a>: Debug + Clone + PartialEq; - type Column: Debug + Clone + PartialEq; + type Column: Debug + Clone + PartialEq + Send; type Domain: Debug + Clone + PartialEq; type ColumnIterator<'a>: Iterator> + TrustedLen; type ColumnBuilder: Debug + Clone; diff --git a/src/query/expression/src/types/number.rs b/src/query/expression/src/types/number.rs index d0734ed73270f..0462dff42aa3b 100644 --- a/src/query/expression/src/types/number.rs +++ b/src/query/expression/src/types/number.rs @@ -177,7 +177,7 @@ impl ValueType for NumberType { #[inline(always)] unsafe fn index_column_unchecked(col: &Self::Column, index: usize) -> Self::ScalarRef<'_> { - debug_assert!(index < col.len()); + debug_assert!(index < col.len(), "index: {} len: {}", index, col.len()); *col.get_unchecked(index) } diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/algorithm.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/algorithm.rs index 44e7aa513f799..1ccf4ed519240 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/sort/algorithm.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/algorithm.rs @@ -30,7 +30,7 @@ pub type Cursor = RawCursor; pub trait SortAlgorithm: Send { const SHOULD_PEEK_TOP2: bool; - type Rows: Rows + Send; + type Rows: Rows; type PeekMut<'b>: Deref>> + DerefMut where Self: 'b; fn with_capacity(capacity: usize) -> Self; @@ -60,7 +60,7 @@ pub trait SortAlgorithm: Send { pub type HeapSort = BinaryHeap>>; -impl SortAlgorithm for BinaryHeap>> { +impl SortAlgorithm for BinaryHeap>> { const SHOULD_PEEK_TOP2: bool = true; type Rows = R; type PeekMut<'a> @@ -129,7 +129,7 @@ impl fmt::Debug for LoserTreeSort { } } -impl SortAlgorithm for LoserTreeSort { +impl SortAlgorithm for LoserTreeSort { const SHOULD_PEEK_TOP2: bool = false; type Rows = R; type PeekMut<'a> diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/k_way_merge_sort_partition.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/k_way_merge_sort_partition.rs index 4a1cc84a1b52f..367ce50914c95 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/sort/k_way_merge_sort_partition.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/k_way_merge_sort_partition.rs @@ -14,13 +14,11 @@ use std::cmp::Ordering; use std::collections::VecDeque; -use std::sync::Arc; use databend_common_exception::Result; use databend_common_expression::BlockMetaInfo; use 
databend_common_expression::DataBlock; use databend_common_expression::DataSchemaRef; -use databend_common_expression::SortColumnDescription; use super::list_domain::Candidate; use super::list_domain::EndDomain; @@ -35,7 +33,6 @@ where S: SortedStream, { schema: DataSchemaRef, - sort_desc: Arc>, unsorted_streams: Vec, pending_streams: VecDeque, @@ -61,7 +58,6 @@ where pub fn new( schema: DataSchemaRef, streams: Vec, - sort_desc: Arc>, batch_rows: usize, limit: Option, ) -> Self { @@ -91,7 +87,6 @@ where Self { schema, - sort_desc, unsorted_streams: streams, pending_streams, buffer, @@ -126,7 +121,7 @@ where continue; } if let Some((block, col)) = input { - self.rows[i] = Some(R::from_column(&col, &self.sort_desc)?); + self.rows[i] = Some(R::from_column(&col)?); self.buffer[i] = block; } } @@ -153,7 +148,8 @@ where let mut candidate = Candidate::new(&self.rows, EndDomain::new(self.min_task, self.max_task)); - candidate.init(); + let ok = candidate.init(); + assert!(ok, "empty candidate"); // if candidate.is_small_task() { // todo: Consider loading multiple blocks at the same time so that we can avoid cutting out too small a task diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/merger.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/merger.rs index fab4f3c2daeb0..b3ad7f144988e 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/sort/merger.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/merger.rs @@ -14,13 +14,11 @@ use std::cmp::Reverse; use std::collections::VecDeque; -use std::sync::Arc; use databend_common_exception::Result; use databend_common_expression::Column; use databend_common_expression::DataBlock; use databend_common_expression::DataSchemaRef; -use databend_common_expression::SortColumnDescription; use super::algorithm::*; use super::Rows; @@ -48,7 +46,6 @@ where S: SortedStream, { schema: DataSchemaRef, - sort_desc: Arc>, unsorted_streams: Vec, sorted_cursors: A, buffer: Vec, @@ -69,7 +66,6 @@ where pub fn create( schema: DataSchemaRef, streams: Vec, - sort_desc: Arc>, batch_rows: usize, limit: Option, ) -> Self { @@ -87,7 +83,6 @@ where buffer, batch_rows, limit, - sort_desc, pending_streams, temp_sorted_num_rows: 0, temp_output_indices: vec![], @@ -119,7 +114,7 @@ where continue; } if let Some((block, col)) = input { - let rows = A::Rows::from_column(&col, &self.sort_desc)?; + let rows = A::Rows::from_column(&col)?; let cursor = Cursor::new(i, rows); self.sorted_cursors.push(i, Reverse(cursor)); self.buffer[i] = block; @@ -141,7 +136,7 @@ where continue; } if let Some((block, col)) = input { - let rows = A::Rows::from_column(&col, &self.sort_desc)?; + let rows = A::Rows::from_column(&col)?; let cursor = Cursor::new(i, rows); self.sorted_cursors.push(i, Reverse(cursor)); self.buffer[i] = block; @@ -339,6 +334,10 @@ where Ok(Some(self.build_output()?)) } + + pub fn streams(self) -> Vec { + self.unsorted_streams + } } pub type HeapMerger = Merger, S>; diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/common.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/common.rs index 894cd2c2b89f1..37507f5b0df56 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/common.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/common.rs @@ -37,6 +37,7 @@ use super::Rows; pub type CommonRows = BinaryColumn; impl Rows for BinaryColumn { + const IS_ASC_COLUMN: bool = true; type Item<'a> = &'a [u8]; type Type = BinaryType; 
@@ -52,7 +53,7 @@ impl Rows for BinaryColumn { Column::Binary(self.clone()) } - fn try_from_column(col: &Column, _: &[SortColumnDescription]) -> Option { + fn try_from_column(col: &Column) -> Option { col.as_binary().cloned() } diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/mod.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/mod.rs index 692450b828ca3..2261bbf1e979e 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/mod.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/mod.rs @@ -14,6 +14,7 @@ mod common; mod simple; +mod utils; use std::fmt::Debug; @@ -27,6 +28,7 @@ use databend_common_expression::Column; use databend_common_expression::DataSchemaRef; use databend_common_expression::SortColumnDescription; pub use simple::*; +pub use utils::*; /// Convert columns to rows. pub trait RowConverter @@ -41,8 +43,9 @@ where Self: Sized /// Rows can be compared. pub trait Rows -where Self: Sized + Clone + Debug +where Self: Sized + Clone + Debug + Send { + const IS_ASC_COLUMN: bool; type Item<'a>: Ord + Debug where Self: 'a; type Type: ArgType; @@ -51,8 +54,8 @@ where Self: Sized + Clone + Debug fn row(&self, index: usize) -> Self::Item<'_>; fn to_column(&self) -> Column; - fn from_column(col: &Column, desc: &[SortColumnDescription]) -> Result { - Self::try_from_column(col, desc).ok_or_else(|| { + fn from_column(col: &Column) -> Result { + Self::try_from_column(col).ok_or_else(|| { ErrorCode::BadDataValueType(format!( "Order column type mismatched. Expected {} but got {}", Self::data_type(), @@ -61,7 +64,7 @@ where Self: Sized + Clone + Debug }) } - fn try_from_column(col: &Column, desc: &[SortColumnDescription]) -> Option; + fn try_from_column(col: &Column) -> Option; fn data_type() -> DataType { Self::Type::data_type() diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/simple.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/simple.rs index a6d2104c8f00e..5ae008061fee2 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/simple.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/simple.rs @@ -39,8 +39,9 @@ pub struct SimpleRowsAsc { impl Rows for SimpleRowsAsc where T: ArgType, - for<'a> T::ScalarRef<'a>: Ord, + for<'a> T::ScalarRef<'a>: Ord + Send, { + const IS_ASC_COLUMN: bool = true; type Item<'a> = T::ScalarRef<'a> where Self: 'a; @@ -59,14 +60,9 @@ where T::upcast_column(self.inner.clone()) } - fn try_from_column(col: &Column, desc: &[SortColumnDescription]) -> Option { + fn try_from_column(col: &Column) -> Option { let inner = T::try_downcast_column(col)?; - - if desc[0].asc { - Some(Self { inner }) - } else { - None - } + Some(Self { inner }) } fn slice(&self, range: Range) -> Self { @@ -85,8 +81,9 @@ pub struct SimpleRowsDesc { impl Rows for SimpleRowsDesc where T: ArgType, - for<'a> T::ScalarRef<'a>: Ord, + for<'a> T::ScalarRef<'a>: Ord + Send, { + const IS_ASC_COLUMN: bool = false; type Item<'a> = Reverse> where Self: 'a; @@ -106,14 +103,9 @@ where T::upcast_column(self.inner.clone()) } - fn try_from_column(col: &Column, desc: &[SortColumnDescription]) -> Option { + fn try_from_column(col: &Column) -> Option { let inner = T::try_downcast_column(col)?; - - if !desc[0].asc { - Some(Self { inner }) - } else { - None - } + Some(Self { inner }) } fn slice(&self, range: Range) -> Self { @@ -132,7 +124,7 @@ pub struct SimpleRowConverter { impl RowConverter> for SimpleRowConverter where T: 
ArgType, - for<'a> T::ScalarRef<'a>: Ord, + for<'a> T::ScalarRef<'a>: Ord + Send, { fn create( sort_columns_descriptions: &[SortColumnDescription], @@ -151,7 +143,7 @@ where impl RowConverter> for SimpleRowConverter where T: ArgType, - for<'a> T::ScalarRef<'a>: Ord, + for<'a> T::ScalarRef<'a>: Ord + Send, { fn create( sort_columns_descriptions: &[SortColumnDescription], @@ -174,6 +166,7 @@ impl SimpleRowConverter { num_rows: usize, asc: bool, ) -> Result { + assert!(asc == R::IS_ASC_COLUMN); assert!(columns.len() == 1); let col = &columns[0]; if col.data_type != T::data_type() { @@ -192,12 +185,6 @@ impl SimpleRowConverter { Value::Column(c) => c.clone(), }; - let desc = [SortColumnDescription { - offset: 0, - asc, - nulls_first: false, - }]; - - R::from_column(&col, &desc) + R::from_column(&col) } } diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/rows/utils.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/utils.rs new file mode 100644 index 0000000000000..f2e5defc600a9 --- /dev/null +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/rows/utils.rs @@ -0,0 +1,134 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use databend_common_exception::Result; +use databend_common_expression::row::RowConverter as CommonConverter; +use databend_common_expression::types::DataType; +use databend_common_expression::types::DateType; +use databend_common_expression::types::NumberDataType; +use databend_common_expression::types::NumberType; +use databend_common_expression::types::StringType; +use databend_common_expression::types::TimestampType; +use databend_common_expression::with_number_mapped_type; +use databend_common_expression::BlockEntry; +use databend_common_expression::Column; +use databend_common_expression::DataBlock; +use databend_common_expression::DataSchemaRef; +use databend_common_expression::SortColumnDescription; +use match_template::match_template; + +use super::RowConverter; +use super::Rows; +use super::SimpleRowConverter; +use super::SimpleRowsAsc; +use super::SimpleRowsDesc; +use crate::sort::CommonRows; + +pub fn convert_rows( + schema: DataSchemaRef, + sort_desc: &[SortColumnDescription], + data: DataBlock, +) -> Result { + let num_rows = data.num_rows(); + + if sort_desc.len() == 1 { + let sort_type = schema.field(sort_desc[0].offset).data_type(); + let asc = sort_desc[0].asc; + + let offset = sort_desc[0].offset; + let columns = &data.columns()[offset..offset + 1]; + + match_template! 
{ + T = [ Date => DateType, Timestamp => TimestampType, String => StringType ], + match sort_type { + DataType::T => { + if asc { + convert_columns::,SimpleRowConverter<_>>(schema, sort_desc, columns, num_rows) + } else { + convert_columns::,SimpleRowConverter<_>>(schema, sort_desc, columns, num_rows) + } + }, + DataType::Number(num_ty) => with_number_mapped_type!(|NUM_TYPE| match num_ty { + NumberDataType::NUM_TYPE => { + if asc { + convert_columns::>,SimpleRowConverter<_>>(schema, sort_desc, columns, num_rows) + } else { + convert_columns::>,SimpleRowConverter<_>>(schema, sort_desc, columns, num_rows) + } + } + }), + _ => convert_columns::(schema, sort_desc, columns, num_rows), + } + } + } else { + let columns = sort_desc + .iter() + .map(|desc| data.get_by_offset(desc.offset).to_owned()) + .collect::>(); + convert_columns::(schema, sort_desc, &columns, num_rows) + } +} + +fn convert_columns>( + schema: DataSchemaRef, + sort_desc: &[SortColumnDescription], + columns: &[BlockEntry], + num_rows: usize, +) -> Result { + let mut converter = C::create(sort_desc, schema)?; + let rows = C::convert(&mut converter, columns, num_rows)?; + Ok(rows.to_column()) +} + +pub fn select_row_type(visitor: &mut impl RowsTypeVisitor) { + let sort_desc = visitor.sort_desc(); + if sort_desc.len() == 1 { + let schema = visitor.schema(); + let sort_type = schema.field(sort_desc[0].offset).data_type(); + let asc = sort_desc[0].asc; + + match_template! { + T = [ Date => DateType, Timestamp => TimestampType, String => StringType ], + match sort_type { + DataType::T => { + if asc { + visitor.visit_type::>() + } else { + visitor.visit_type::>() + } + }, + DataType::Number(num_ty) => with_number_mapped_type!(|NUM_TYPE| match num_ty { + NumberDataType::NUM_TYPE => { + if asc { + visitor.visit_type::>>() + } else { + visitor.visit_type::>>() + } + } + }), + _ => visitor.visit_type::() + } + } + } else { + visitor.visit_type::() + } +} + +pub trait RowsTypeVisitor { + fn schema(&self) -> DataSchemaRef; + + fn sort_desc(&self) -> &[SortColumnDescription]; + + fn visit_type(&mut self); +} diff --git a/src/query/pipeline/transforms/src/processors/transforms/sort/utils.rs b/src/query/pipeline/transforms/src/processors/transforms/sort/utils.rs index 04ffff807ad2a..835a2ddbb4a96 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/sort/utils.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/sort/utils.rs @@ -54,14 +54,19 @@ fn order_field_type(schema: &DataSchema, desc: &[SortColumnDescription]) -> Data DataType::Binary } +pub fn has_order_field(schema: &DataSchema) -> bool { + schema + .fields + .last() + .is_some_and(|f| f.name() == ORDER_COL_NAME) +} + #[inline(always)] pub fn add_order_field(schema: DataSchemaRef, desc: &[SortColumnDescription]) -> DataSchemaRef { - if let Some(f) = schema.fields.last() - && f.name() == ORDER_COL_NAME - { + if has_order_field(&schema) { schema } else { - let mut fields = schema.fields().clone(); + let mut fields = schema.fields.clone(); fields.push(DataField::new( ORDER_COL_NAME, order_field_type(&schema, desc), diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_k_way_merge_sort.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_k_way_merge_sort.rs index 581b854d7b536..929b431e02bd6 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_k_way_merge_sort.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_k_way_merge_sort.rs @@ -74,7 +74,6 @@ pub fn 
add_k_way_merge_sort( schema, stream_size, worker, - sort_desc, block_size, limit, remove_order_col, @@ -129,7 +128,6 @@ where A: SortAlgorithm schema: DataSchemaRef, stream_size: usize, worker: usize, - sort_desc: Arc>, block_size: usize, limit: Option, remove_order_col: bool, @@ -146,7 +144,6 @@ where (0..input).map(|_| InputPort::create()).collect(), self.worker, self.schema.clone(), - self.sort_desc.clone(), self.block_size, self.limit, ) @@ -165,7 +162,6 @@ where output, self.schema.clone(), batch_rows, - self.sort_desc.clone(), self.remove_order_col, ) } @@ -206,23 +202,21 @@ where } } -pub fn create_partitioner_pipe( +fn create_partitioner_pipe( inputs_port: Vec>, worker: usize, schema: DataSchemaRef, - sort_desc: Arc>, batch_rows: usize, limit: Option, ) -> Pipe where - R: Rows + Send + 'static, + R: Rows + 'static, { let outputs_port: Vec<_> = (0..worker).map(|_| OutputPort::create()).collect(); let processor = ProcessorPtr::create(Box::new(KWayMergePartitionerProcessor::::new( inputs_port.clone(), outputs_port.clone(), schema, - sort_desc, batch_rows, limit, ))); @@ -249,7 +243,6 @@ impl KWayMergePartitionerProcessor { inputs: Vec>, outputs: Vec>, schema: DataSchemaRef, - sort_desc: Arc>, batch_rows: usize, limit: Option, ) -> Self { @@ -259,7 +252,7 @@ impl KWayMergePartitionerProcessor { .collect::>(); Self { - partitioner: KWaySortPartitioner::new(schema, streams, sort_desc, batch_rows, limit), + partitioner: KWaySortPartitioner::new(schema, streams, batch_rows, limit), inputs, outputs, task: VecDeque::new(), @@ -285,7 +278,7 @@ impl KWayMergePartitionerProcessor { } impl Processor for KWayMergePartitionerProcessor -where R: Rows + Send + 'static +where R: Rows + 'static { fn name(&self) -> String { "KWayMergePartitioner".to_string() @@ -356,7 +349,6 @@ where A: SortAlgorithm stream_size: usize, schema: DataSchemaRef, batch_rows: usize, - sort_desc: Arc>, remove_order_col: bool, buffer: Vec, @@ -374,7 +366,6 @@ where A: SortAlgorithm output: Arc, schema: DataSchemaRef, batch_rows: usize, - sort_desc: Arc>, remove_order_col: bool, ) -> Self { Self { @@ -384,7 +375,6 @@ where A: SortAlgorithm stream_size, schema, batch_rows, - sort_desc, remove_order_col, buffer: Vec::new(), task: None, @@ -511,10 +501,9 @@ where A: SortAlgorithm + 'static debug_assert!(self.ready()); let task = self.task.take().unwrap(); - let mut merger = Merger::::create( + let mut merger = Merger::::create( self.schema.clone(), self.streams(), - self.sort_desc.clone(), if task.total > self.batch_rows { task.total / (task.total / self.batch_rows) } else { diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs index fbf6bc052a36a..148780c8902ff 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_multi_sort_merge.rs @@ -220,15 +220,14 @@ where A: SortAlgorithm schema: DataSchemaRef, block_size: usize, limit: Option, - sort_desc: Arc>, + _sort_desc: Arc>, remove_order_col: bool, ) -> Result { let streams = inputs .iter() .map(|i| InputBlockStream::new(i.clone(), remove_order_col)) .collect::>(); - let merger = - Merger::::create(schema, streams, sort_desc, block_size, limit); + let merger = Merger::::create(schema, streams, block_size, limit); Ok(Self { merger, inputs, diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs 
b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs index a909b21821ed5..c37c8c27eaaee 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge.rs @@ -42,7 +42,6 @@ use crate::processors::sort::Merger; /// For merge sort with limit, see [`super::transform_sort_merge_limit`] pub struct TransformSortMerge { schema: DataSchemaRef, - sort_desc: Arc>, enable_loser_tree: bool, block_size: usize, @@ -60,13 +59,12 @@ pub struct TransformSortMerge { impl TransformSortMerge { pub fn create( schema: DataSchemaRef, - sort_desc: Arc>, + _sort_desc: Arc>, block_size: usize, enable_loser_tree: bool, ) -> Self { TransformSortMerge { schema, - sort_desc, enable_loser_tree, block_size, buffer: vec![], @@ -78,7 +76,7 @@ impl TransformSortMerge { } } -impl MergeSort for TransformSortMerge { +impl MergeSort for TransformSortMerge { const NAME: &'static str = "TransformSortMerge"; fn add_block(&mut self, block: DataBlock, init_rows: R, _input_index: usize) -> Result<()> { @@ -133,7 +131,7 @@ impl MergeSort for TransformSortMerge { } } -impl TransformSortMerge { +impl TransformSortMerge { fn merge_sort(&mut self, batch_size: usize) -> Result> { if self.buffer.is_empty() { return Ok(vec![]); @@ -170,16 +168,10 @@ impl TransformSortMerge { batch_size: usize, size_hint: usize, ) -> Result> { - let streams = self.buffer.drain(..).collect::>(); + let streams = self.buffer.drain(..).collect::>(); let mut result = Vec::with_capacity(size_hint); - let mut merger = Merger::::create( - self.schema.clone(), - streams, - self.sort_desc.clone(), - batch_size, - None, - ); + let mut merger = Merger::::create(self.schema.clone(), streams, batch_size, None); while let Some(block) = merger.next_block()? { if unlikely(self.aborting.load(Ordering::Relaxed)) { diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_base.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_base.rs index 94948d40b5c9c..16066059ad705 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_base.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_sort_merge_base.rs @@ -197,14 +197,14 @@ where impl AccumulatingTransform for TransformSortMergeBase where M: MergeSort + Send + Sync, - R: Rows + Send + Sync, + R: Rows + Sync, Converter: RowConverter + Send + Sync, { const NAME: &'static str = M::NAME; fn transform(&mut self, mut block: DataBlock) -> Result> { let rows = if self.order_col_generated { - let rows = R::from_column(block.get_last_column(), &self.sort_desc)?; + let rows = R::from_column(block.get_last_column())?; if !self.output_order_col { // The next processor could be a sort spill processor which need order column. // And the order column will be removed in that processor. 
@@ -370,7 +370,7 @@ impl TransformSortMergeBuilder { where T: ArgType + Send + Sync, T::Column: Send + Sync, - for<'a> T::ScalarRef<'a>: Ord, + for<'a> T::ScalarRef<'a>: Ord + Send, { if asc { self.build_sort_rows::, SimpleRowConverter>() @@ -381,7 +381,7 @@ impl TransformSortMergeBuilder { fn build_sort_rows(self) -> Result> where - R: Rows + Send + Sync + 'static, + R: Rows + Sync + 'static, C: RowConverter + Send + Sync + 'static, { Ok(AccumulatingTransformer::create( @@ -431,7 +431,7 @@ impl TransformSortMergeBuilder { where T: ArgType + Send + Sync, T::Column: Send + Sync, - for<'a> T::ScalarRef<'a>: Ord, + for<'a> T::ScalarRef<'a>: Ord + Send, { if asc { self.build_sort_limit_rows::, SimpleRowConverter>() @@ -442,7 +442,7 @@ impl TransformSortMergeBuilder { fn build_sort_limit_rows(self) -> Result> where - R: Rows + Send + Sync + 'static, + R: Rows + Sync + 'static, C: RowConverter + Send + Sync + 'static, { Ok(AccumulatingTransformer::create( diff --git a/src/query/pipeline/transforms/tests/it/merger.rs b/src/query/pipeline/transforms/tests/it/merger.rs index a228c606add7c..433fa3034cbde 100644 --- a/src/query/pipeline/transforms/tests/it/merger.rs +++ b/src/query/pipeline/transforms/tests/it/merger.rs @@ -13,7 +13,6 @@ // limitations under the License. use std::collections::VecDeque; -use std::sync::Arc; use databend_common_base::base::tokio; use databend_common_exception::Result; @@ -26,7 +25,6 @@ use databend_common_expression::DataBlock; use databend_common_expression::DataField; use databend_common_expression::DataSchemaRefExt; use databend_common_expression::FromData; -use databend_common_expression::SortColumnDescription; use databend_common_pipeline_transforms::processors::sort::algorithm::HeapSort; use databend_common_pipeline_transforms::processors::sort::algorithm::LoserTreeSort; use databend_common_pipeline_transforms::processors::sort::algorithm::SortAlgorithm; @@ -146,17 +144,12 @@ fn create_test_merger( "a", DataType::Number(NumberDataType::Int32), )]); - let sort_desc = Arc::new(vec![SortColumnDescription { - offset: 0, - asc: true, - nulls_first: true, - }]); let streams = input .into_iter() .map(|v| TestStream::new(v.into_iter().collect::>())) .collect::>(); - TestMerger::::create(schema, streams, sort_desc, 4, limit) + TestMerger::::create(schema, streams, 4, limit) } fn check_result(result: Vec, expected: DataBlock) { diff --git a/src/query/service/Cargo.toml b/src/query/service/Cargo.toml index 2985dcdedc203..1ef9bdba27f68 100644 --- a/src/query/service/Cargo.toml +++ b/src/query/service/Cargo.toml @@ -51,7 +51,6 @@ chrono = { workspace = true } chrono-tz = { workspace = true } ctor = { workspace = true } dashmap = { workspace = true } - databend-common-ast = { workspace = true } databend-common-base = { workspace = true } databend-common-cache = { workspace = true } diff --git a/src/query/service/src/pipelines/builders/builder_sort.rs b/src/query/service/src/pipelines/builders/builder_sort.rs index e05faf0c53b42..ced60f0c5c9eb 100644 --- a/src/query/service/src/pipelines/builders/builder_sort.rs +++ b/src/query/service/src/pipelines/builders/builder_sort.rs @@ -34,6 +34,7 @@ use databend_common_storage::DataOperator; use databend_common_storages_fuse::TableContext; use crate::pipelines::processors::transforms::create_transform_sort_spill; +use crate::pipelines::processors::transforms::create_transform_stream_sort_spill; use crate::pipelines::PipelineBuilder; use crate::sessions::QueryContext; use crate::spillers::Spiller; @@ -285,19 +286,35 @@ impl 
SortPipelineBuilder { disk_spill: None, use_parquet: settings.get_spilling_file_format()?.is_parquet(), }; + let settings = self.ctx.get_settings(); + let enable_experimental_stream_sort_spilling = + settings.get_enable_experimental_stream_sort_spilling()?; pipeline.add_transform(|input, output| { let op = DataOperator::instance().operator(); let spiller = Spiller::create(self.ctx.clone(), op, config.clone())?; - Ok(ProcessorPtr::create(create_transform_sort_spill( - input, - output, - schema.clone(), - self.sort_desc.clone(), - self.limit, - spiller, - output_order_col, - enable_loser_tree, - ))) + if enable_experimental_stream_sort_spilling { + Ok(ProcessorPtr::create(create_transform_stream_sort_spill( + input, + output, + schema.clone(), + self.sort_desc.clone(), + self.limit, + spiller, + output_order_col, + enable_loser_tree, + ))) + } else { + Ok(ProcessorPtr::create(create_transform_sort_spill( + input, + output, + schema.clone(), + self.sort_desc.clone(), + self.limit, + spiller, + output_order_col, + enable_loser_tree, + ))) + } })?; } diff --git a/src/query/service/src/pipelines/processors/transforms/mod.rs b/src/query/service/src/pipelines/processors/transforms/mod.rs index e4d503c0f37f6..519fab4ad6d9f 100644 --- a/src/query/service/src/pipelines/processors/transforms/mod.rs +++ b/src/query/service/src/pipelines/processors/transforms/mod.rs @@ -35,6 +35,7 @@ mod transform_resort_addon; mod transform_resort_addon_without_source_schema; mod transform_sort_spill; mod transform_srf; +mod transform_stream_sort_spill; mod transform_udf_script; mod transform_udf_server; mod window; @@ -61,6 +62,7 @@ pub use transform_resort_addon::TransformResortAddOn; pub use transform_resort_addon_without_source_schema::TransformResortAddOnWithoutSourceSchema; pub use transform_sort_spill::create_transform_sort_spill; pub use transform_srf::TransformSRF; +pub use transform_stream_sort_spill::*; pub use transform_udf_script::TransformUdfScript; pub use transform_udf_server::TransformUdfServer; pub use window::*; diff --git a/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs b/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs index 50740346e0e9f..c65d1e964b0b7 100644 --- a/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs +++ b/src/query/service/src/pipelines/processors/transforms/transform_sort_spill.rs @@ -85,8 +85,6 @@ pub struct TransformSortSpill { /// If `ummerged_blocks.len()` < `num_merge`, /// we can use a final merger to merge the last few sorted streams to reduce IO. final_merger: Option>, - - sort_desc: Arc>, } #[inline(always)] @@ -244,7 +242,7 @@ where input: Arc, output: Arc, schema: DataSchemaRef, - sort_desc: Arc>, + _sort_desc: Arc>, limit: Option, spiller: Spiller, output_order_col: bool, @@ -263,7 +261,6 @@ where unmerged_blocks: VecDeque::new(), final_merger: None, batch_rows: 0, - sort_desc, } } @@ -306,13 +303,7 @@ where streams.push(stream); } - Merger::::create( - self.schema.clone(), - streams, - self.sort_desc.clone(), - self.batch_rows, - self.limit, - ) + Merger::::create(self.schema.clone(), streams, self.batch_rows, self.limit) } /// Do an external merge sort until there is only one sorted stream. 
diff --git a/src/query/service/src/pipelines/processors/transforms/transform_stream_sort_spill.rs b/src/query/service/src/pipelines/processors/transforms/transform_stream_sort_spill.rs new file mode 100644 index 0000000000000..7078ca4ee2a77 --- /dev/null +++ b/src/query/service/src/pipelines/processors/transforms/transform_stream_sort_spill.rs @@ -0,0 +1,1150 @@ +// Copyright 2021 Datafuse Labs +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::any::Any; +use std::collections::VecDeque; +use std::fmt; +use std::fmt::Debug; +use std::fmt::Formatter; +use std::mem; +use std::sync::Arc; + +use databend_common_column::bitmap::MutableBitmap; +use databend_common_exception::Result; +use databend_common_expression::sampler::FixedRateSampler; +use databend_common_expression::BlockMetaInfoDowncast; +use databend_common_expression::Column; +use databend_common_expression::DataBlock; +use databend_common_expression::DataSchemaRef; +use databend_common_expression::SortColumnDescription; +use databend_common_pipeline_core::processors::Event; +use databend_common_pipeline_core::processors::InputPort; +use databend_common_pipeline_core::processors::OutputPort; +use databend_common_pipeline_core::processors::Processor; +use databend_common_pipeline_transforms::processors::sort::algorithm::HeapSort; +use databend_common_pipeline_transforms::processors::sort::algorithm::LoserTreeSort; +use databend_common_pipeline_transforms::processors::sort::algorithm::SortAlgorithm; +use databend_common_pipeline_transforms::processors::sort::select_row_type; +use databend_common_pipeline_transforms::processors::sort::utils::has_order_field; +use databend_common_pipeline_transforms::processors::sort::Merger; +use databend_common_pipeline_transforms::processors::sort::Rows; +use databend_common_pipeline_transforms::processors::sort::RowsTypeVisitor; +use databend_common_pipeline_transforms::processors::sort::SortSpillMeta; +use databend_common_pipeline_transforms::processors::sort::SortSpillMetaWithParams; +use databend_common_pipeline_transforms::processors::sort::SortedStream; +use databend_common_pipeline_transforms::processors::SortSpillParams; +use rand::rngs::StdRng; +use rand::SeedableRng; + +use crate::spillers::Layout; +use crate::spillers::Location; +use crate::spillers::Spiller; + +enum State { + /// The initial state of the processor. + Init, + /// This state means the processor will never spill incoming blocks. + Pass, + /// This state means the processor will spill incoming blocks except the last block. + Spill, + /// This state means the processor is restoring spilled blocks. + Restore, + /// Finish the process. + Finish, +} + +pub struct TransformStreamSortSpill { + input: Arc, + output: Arc, + schema: DataSchemaRef, + sort_row_offset: usize, + output_order_col: bool, + limit: Option, + spiller: Arc, + + input_data: Vec, + output_data: Option, + state: State, + + sampler: Option>, + /// Partition boundaries for restoring and sorting blocks, stored in reverse order of Column. 
+ /// Each boundary represents a cutoff point where data less than or equal to it belongs to one partition. + bounds: Vec, + cur_bound: Option, + + batch_rows: usize, + /// Blocks to merge one time. + num_merge: usize, + + subsequent: Vec>, + current: Vec>, + + output_merger: Option>>, +} + +#[inline(always)] +fn take_spill_meta(block: &mut DataBlock) -> Option> { + block.take_meta().map(|meta| { + if SortSpillMeta::downcast_ref_from(&meta).is_some() { + return None; + } + Some( + SortSpillMetaWithParams::downcast_from(meta) + .expect("unknown meta type") + .0, + ) + }) +} + +#[async_trait::async_trait] +impl Processor for TransformStreamSortSpill +where + A: SortAlgorithm + 'static, + A::Rows: 'static, +{ + fn name(&self) -> String { + String::from("TransformSortSpill") + } + + fn as_any(&mut self) -> &mut dyn Any { + self + } + + fn event(&mut self) -> Result { + if self.output.is_finished() { + self.input.finish(); + return Ok(Event::Finished); + } + + if !self.output.can_push() { + match self.state { + State::Init => { + self.input.set_need_data(); + return Ok(Event::NeedData); + } + State::Pass | State::Finish => { + self.input.set_not_need_data(); + return Ok(Event::NeedConsume); + } + State::Spill | State::Restore => { + // may should pull upstream? + self.input.set_not_need_data(); + return Ok(Event::NeedConsume); + } + } + } + + if self.output_data.is_some() { + match self.state { + State::Pass | State::Restore | State::Finish => { + let block = self.output_data.take().unwrap(); + self.output_block(block); + return Ok(Event::NeedConsume); + } + _ => unreachable!(), + } + } + + if matches!(self.state, State::Finish) { + assert!(self.input.is_finished()); + self.output.finish(); + return Ok(Event::Finished); + } + + if self.input.has_data() { + let mut block = self.input.pull_data().unwrap()?; + let meta = take_spill_meta(&mut block); + return match self.state { + State::Init => match meta { + Some(Some(params)) => { + // Need to spill this block. + self.batch_rows = params.batch_rows; + self.num_merge = params.num_merge; + + log::info!( + "batch_rows {} num_merge {}", + params.batch_rows, + params.num_merge + ); + + self.input_data.push(block); + self.state = State::Spill; + + self.sampler = Some( + FixedRateSampler::new( + vec![self.sort_row_offset], + self.batch_rows, + self.batch_rows * self.num_merge, + self.batch_rows, + StdRng::seed_from_u64(rand::random()), + ) + .unwrap(), + ); + + self.input.set_need_data(); + Ok(Event::NeedData) + } + Some(None) => unreachable!(), + None => { + // If we get a memory block at initial state, it means we will never spill data. 
+ self.output_block(block); + self.state = State::Pass; + Ok(Event::NeedConsume) + } + }, + State::Pass => { + debug_assert!(meta.is_none()); + self.output_block(block); + Ok(Event::NeedConsume) + } + State::Spill => { + self.input_data.push(block); + if self.input_rows() + self.subsequent_memory_rows() > self.max_rows() { + Ok(Event::Async) + } else { + self.input.set_need_data(); + Ok(Event::NeedData) + } + } + _ => unreachable!(), + }; + } + + if self.input.is_finished() { + return match &self.state { + State::Init | State::Pass | State::Finish => { + self.output.finish(); + Ok(Event::Finished) + } + State::Spill => { + if self.input_data.is_empty() { + self.state = State::Restore; + } + Ok(Event::Async) + } + State::Restore => Ok(Event::Async), + }; + } + + self.input.set_need_data(); + Ok(Event::NeedData) + } + + #[async_backtrace::framed] + async fn async_process(&mut self) -> Result<()> { + match &self.state { + State::Spill => { + let input = self.input_rows(); + let subsequent = self.subsequent_memory_rows(); + let max = self.max_rows(); + + if subsequent > 0 && subsequent + input > max { + self.subsequent_spill_last(subsequent + input - max).await?; + } + let finished = self.input.is_finished(); + if input > max || finished && input > 0 { + self.sort_input_data()?; + } + if finished { + self.state = State::Restore; + } + Ok(()) + } + State::Restore => { + debug_assert!(self.input_data.is_empty()); + self.on_restore().await + } + _ => unreachable!(), + } + } +} + +impl TransformStreamSortSpill +where + A: SortAlgorithm + 'static, + A::Rows: 'static, +{ + pub fn new( + input: Arc, + output: Arc, + schema: DataSchemaRef, + limit: Option, + spiller: Spiller, + sort_row_offset: usize, + output_order_col: bool, + ) -> Self { + Self { + input, + output, + schema, + sort_row_offset, + output_order_col, + limit, + input_data: Vec::new(), + output_data: None, + state: State::Init, + spiller: Arc::new(spiller), + sampler: None, + bounds: Vec::new(), + cur_bound: None, + batch_rows: 0, + num_merge: 0, + subsequent: Vec::new(), + current: Vec::new(), + output_merger: None, + } + } + + fn output_block(&self, mut block: DataBlock) { + if !self.output_order_col { + block.pop_columns(1); + } + self.output.push_data(Ok(block)); + } + + fn sort_input_data(&mut self) -> Result<()> { + let sampler = self.sampler.as_mut().unwrap(); + for data in &self.input_data { + sampler.add_block(data.clone()); + } + sampler.compact_blocks(false); + + let sorted = if self.input_data.len() == 1 { + let data = self.input_data.pop().unwrap(); + vec![SpillableBlock::new(data, self.sort_row_offset)].into() + } else { + let streams = self + .input_data + .drain(..) + .map(|data| DataBlockStream::new(data, self.sort_row_offset)) + .collect(); + let mut merger = + Merger::::create(self.schema.clone(), streams, self.batch_rows, self.limit); + + let mut sorted = VecDeque::new(); + while let Some(data) = merger.next_block()? 
{ + sorted.push_back(SpillableBlock::new(data, self.sort_row_offset)); + } + debug_assert!(merger.is_finished()); + sorted + }; + + let stream = self.new_stream(sorted, None); + self.subsequent.push(stream); + Ok(()) + } + + async fn subsequent_spill_last(&mut self, target_rows: usize) -> Result<()> { + let Some(s) = self.subsequent.last_mut() else { + return Ok(()); + }; + + let mut released = 0; + for b in s.blocks.iter_mut().rev() { + if b.data.is_some() { + b.spill(&self.spiller).await?; + released += b.rows; + } + if released >= target_rows { + break; + } + } + + Ok(()) + } + + async fn on_restore(&mut self) -> Result<()> { + if self.sampler.is_some() { + self.determine_bounds()?; + } + + if self.output_merger.is_some() { + if self.restore_and_output().await? { + self.state = State::Finish; + } + return Ok(()); + } + + while self.current.is_empty() { + self.choice_streams_by_bound(); + } + + if self.current.len() > self.num_merge { + self.merge_current().await + } else { + if self.restore_and_output().await? { + self.state = State::Finish; + } + Ok(()) + } + } + + async fn merge_current(&mut self) -> Result<()> { + self.subsequent_spill_all().await?; + for (i, s) in self.current.iter_mut().rev().enumerate() { + if i < self.num_merge { + s.spill(1).await?; + } else { + s.spill(0).await?; + } + } + + let streams = self + .current + .drain(self.current.len() - self.num_merge..) + .collect(); + + let mut merger = + Merger::::create(self.schema.clone(), streams, self.batch_rows, None); + + let mut sorted = VecDeque::new(); + while let Some(data) = merger.async_next_block().await? { + let mut block = SpillableBlock::new(data, self.sort_row_offset); + block.spill(&self.spiller).await?; + sorted.push_back(block); + } + debug_assert!(merger.is_finished()); + + let stream = self.new_stream(sorted, self.cur_bound.clone()); + self.current.insert(0, stream); + self.subsequent + .extend(merger.streams().into_iter().filter(|s| !s.is_empty())); + Ok(()) + } + + async fn restore_and_output(&mut self) -> Result { + let merger = match self.output_merger.as_mut() { + Some(merger) => merger, + None => { + debug_assert!(!self.current.is_empty()); + if self.current.len() == 1 { + let mut s = self.current.pop().unwrap(); + s.restore_first().await?; + self.output_data = Some(s.take_next_bounded_block()); + + if !s.is_empty() { + if s.should_include_first() { + self.current.push(s); + } else { + self.subsequent.push(s); + } + return Ok(false); + } + + return Ok(self.subsequent.is_empty()); + } + + self.sort_spill().await?; + + let streams = mem::take(&mut self.current); + let merger = + Merger::::create(self.schema.clone(), streams, self.batch_rows, None); + self.output_merger.insert(merger) + } + }; + + let Some(data) = merger.async_next_block().await? else { + debug_assert!(merger.is_finished()); + let streams = self.output_merger.take().unwrap().streams(); + self.subsequent + .extend(streams.into_iter().filter(|s| !s.is_empty())); + return Ok(self.subsequent.is_empty()); + }; + + let mut sorted = BoundBlockStream { + blocks: VecDeque::new(), + bound: self.cur_bound.clone(), + sort_row_offset: self.sort_row_offset, + spiller: self.spiller.clone(), + }; + sorted + .blocks + .push_back(SpillableBlock::new(data, self.sort_row_offset)); + + if sorted.should_include_first() { + self.output_data = Some(sorted.take_next_bounded_block()); + if sorted.is_empty() { + return Ok(false); + } + } + + while let Some(data) = merger.async_next_block().await? 
{ + let mut block = SpillableBlock::new(data, self.sort_row_offset); + block.spill(&self.spiller).await?; + sorted.blocks.push_back(block); + } + debug_assert!(merger.is_finished()); + + if !sorted.is_empty() { + self.subsequent.push(sorted); + } + let streams = self.output_merger.take().unwrap().streams(); + self.subsequent + .extend(streams.into_iter().filter(|s| !s.is_empty())); + Ok(self.subsequent.is_empty()) + } + + async fn sort_spill(&mut self) -> Result<()> { + let need = self + .current + .iter() + .map(|s| if s.blocks[0].data.is_none() { 1 } else { 0 }) + .sum::() + * self.batch_rows; + + if need + self.subsequent_memory_rows() + self.current_memory_rows() < self.max_rows() { + return Ok(()); + } + + let mut unspilled = self + .current + .iter_mut() + .chain(self.subsequent.iter_mut()) + .flat_map(|s| s.blocks.iter_mut()) + .filter(|s| s.data.is_some()) + .collect::>(); + + unspilled.sort_by(|s1, s2| { + let r1 = s1.domain::(); + let r2 = s2.domain::(); + let cmp = r1.first().cmp(&r2.first()); + cmp + }); + + let mut released = 0; + while let Some(block) = unspilled.pop() { + if released >= need { + break; + } + + block.spill(&self.spiller).await?; + released += block.rows; + } + + Ok(()) + } + + async fn subsequent_spill_all(&mut self) -> Result<()> { + for s in &mut self.subsequent { + s.spill(0).await?; + } + Ok(()) + } + + fn choice_streams_by_bound(&mut self) { + debug_assert!(self.current.is_empty()); + debug_assert!(!self.subsequent.is_empty()); + + self.cur_bound = self.next_bound(); + if self.cur_bound.is_none() { + mem::swap(&mut self.current, &mut self.subsequent); + for s in &mut self.current { + s.bound = None + } + return; + } + + (self.current, self.subsequent) = self + .subsequent + .drain(..) + .map(|mut s| { + s.bound = self.cur_bound.clone(); + s + }) + .partition(|s| s.should_include_first()); + + self.current.sort_by_key(|s| s.blocks[0].data.is_some()); + } + + fn input_rows(&self) -> usize { + self.input_data.iter().map(|b| b.num_rows()).sum::() + } + + fn subsequent_memory_rows(&self) -> usize { + self.subsequent + .iter() + .map(|s| s.in_memory_rows()) + .sum::() + } + + fn current_memory_rows(&self) -> usize { + self.current + .iter() + .map(|s| s.in_memory_rows()) + .sum::() + } + + fn max_rows(&self) -> usize { + debug_assert!(self.num_merge > 0); + self.num_merge * self.batch_rows + } + + fn new_stream( + &mut self, + blocks: VecDeque, + bound: Option, + ) -> BoundBlockStream { + BoundBlockStream:: { + blocks, + bound, + sort_row_offset: self.sort_row_offset, + spiller: self.spiller.clone(), + } + } + + fn next_bound(&mut self) -> Option { + let bounds = self.bounds.last_mut()?; + let bound = match bounds.len() { + 0 => unreachable!(), + 1 => self.bounds.pop().unwrap(), + _ => { + let bound = bounds.slice(0..1).maybe_gc(); + *bounds = bounds.slice(1..bounds.len()); + bound + } + }; + Some(A::Rows::from_column(&bound).unwrap()) + } + + fn determine_bounds(&mut self) -> Result<()> { + let mut sampler = self.sampler.take().unwrap(); + sampler.compact_blocks(true); + let sampled_rows = sampler.dense_blocks; + + match sampled_rows.len() { + 0 => (), + 1 => self.bounds.push( + DataBlock::sort( + &sampled_rows[0], + &[SortColumnDescription { + offset: 0, + asc: A::Rows::IS_ASC_COLUMN, + nulls_first: false, + }], + None, + )? 
+ .get_last_column() + .clone(), + ), + _ => { + let streams = sampled_rows + .into_iter() + .map(|data| { + let data = DataBlock::sort( + &data, + &[SortColumnDescription { + offset: 0, + asc: A::Rows::IS_ASC_COLUMN, + nulls_first: false, + }], + None, + ) + .unwrap(); + DataBlockStream::new(data, 0) + }) + .collect::>(); + + let schema = Arc::new(self.schema.project(&[self.schema.num_fields() - 1])); + let mut merger = Merger::::create(schema, streams, self.batch_rows, None); + + let mut blocks = Vec::new(); + while let Some(block) = merger.next_block()? { + blocks.push(block) + } + debug_assert!(merger.is_finished()); + + self.bounds = blocks + .iter() + .rev() + .map(|b| b.get_last_column().clone()) + .collect::>(); + } + }; + + Ok(()) + } + + #[allow(unused)] + pub fn format_memory_usage(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("TransformStreamSortSpill") + .field("num_merge", &self.num_merge) + .field("batch_rows", &self.batch_rows) + .field("input_rows", &self.input_rows()) + .field("current_memory_rows", &self.current_memory_rows()) + .field("current", &self.current) + .field("subsequent_memory_rows", &self.subsequent_memory_rows()) + .field("subsequent", &self.subsequent) + .field("has_output_merger", &self.output_merger.is_some()) + .field("cur_bound", &self.cur_bound) + .finish() + } +} + +struct SpillableBlock { + data: Option, + rows: usize, + location: Option<(Location, Layout)>, + domain: Column, + processed: usize, +} + +impl SpillableBlock { + fn new(data: DataBlock, sort_row_offset: usize) -> Self { + Self { + location: None, + processed: 0, + rows: data.num_rows(), + domain: get_domain(sort_column(&data, sort_row_offset)), + data: Some(data), + } + } + + fn slice(&mut self, pos: usize, sort_row_offset: usize) -> DataBlock { + let data = self.data.as_ref().unwrap(); + + let left = data.slice(0..pos); + let right = data.slice(pos..data.num_rows()); + + self.domain = get_domain(sort_column(&right, sort_row_offset)); + self.rows = right.num_rows(); + self.data = Some(right); + if self.location.is_some() { + self.processed += pos; + } + left + } + + fn domain(&self) -> R { + R::from_column(&self.domain).unwrap() + } + + async fn spill(&mut self, spiller: &Spiller) -> Result<()> { + let data = self.data.take().unwrap(); + if self.location.is_none() { + let location = spiller.spill_unmanage(vec![data]).await?; + self.location = Some(location); + } + Ok(()) + } +} + +impl Debug for SpillableBlock { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("SpillableBlock") + .field("has_data", &self.data.is_some()) + .field("rows", &self.rows) + .field("location", &self.location) + .field("domain", &self.domain) + .field("processed", &self.processed) + .finish() + } +} + +fn sort_column(data: &DataBlock, sort_row_offset: usize) -> &Column { + data.get_by_offset(sort_row_offset) + .value + .as_column() + .unwrap() +} + +/// BoundBlockStream is a stream of blocks that are cutoff less or equal than bound. 
+struct BoundBlockStream { + blocks: VecDeque, + bound: Option, + sort_row_offset: usize, + spiller: Arc, +} + +impl Debug for BoundBlockStream { + fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result { + f.debug_struct("BoundBlockStream") + .field("blocks", &self.blocks) + .field("bound", &self.bound) + .field("sort_row_offset", &self.sort_row_offset) + .finish() + } +} + +#[async_trait::async_trait] +impl SortedStream for BoundBlockStream { + async fn async_next(&mut self) -> Result<(Option<(DataBlock, Column)>, bool)> { + if self.should_include_first() { + self.restore_first().await?; + let data = self.take_next_block(); + let col = sort_column(&data, self.sort_row_offset).clone(); + Ok((Some((data, col)), false)) + } else { + Ok((None, false)) + } + } +} + +impl BoundBlockStream { + fn should_include_first(&self) -> bool { + let Some(block) = self.blocks.front() else { + return false; + }; + + match &self.bound { + Some(bound) => block.domain::().first() <= bound.row(0), + None => true, + } + } + + fn take_next_bounded_block(&mut self) -> DataBlock { + let Some(bound) = &self.bound else { + return self.take_next_block(); + }; + + let block = self.blocks.front_mut().unwrap(); + if let Some(pos) = + block_split_off_position(block.data.as_ref().unwrap(), bound, self.sort_row_offset) + { + block.slice(pos, self.sort_row_offset) + } else { + self.take_next_block() + } + } + + fn take_next_block(&mut self) -> DataBlock { + let mut block = self.blocks.pop_front().unwrap(); + block.data.take().unwrap() + } + + async fn restore_first(&mut self) -> Result<()> { + let block = self.blocks.front_mut().unwrap(); + if block.data.is_some() { + return Ok(()); + } + + let location = block.location.as_ref().unwrap(); + let data = self + .spiller + .read_unmanage_spilled_file(&location.0, &location.1) + .await?; + block.data = Some(if block.processed != 0 { + debug_assert_eq!(block.rows + block.processed, data.num_rows()); + data.slice(block.processed..data.num_rows()) + } else { + data + }); + debug_assert_eq!( + block.domain, + get_domain(sort_column( + block.data.as_ref().unwrap(), + self.sort_row_offset + )) + ); + Ok(()) + } + + fn len(&self) -> usize { + self.blocks.len() + } + + fn is_empty(&self) -> bool { + self.len() == 0 + } + + fn in_memory_rows(&self) -> usize { + self.blocks + .iter() + .map(|b| if b.data.is_some() { b.rows } else { 0 }) + .sum() + } + + async fn spill(&mut self, skip: usize) -> Result<()> { + for b in &mut self + .blocks + .iter_mut() + .skip(skip) + .filter(|b| b.data.is_some()) + { + b.spill(&self.spiller).await?; + } + Ok(()) + } +} + +fn block_split_off_position( + data: &DataBlock, + bound: &R, + sort_row_offset: usize, +) -> Option { + let rows = R::from_column(sort_column(data, sort_row_offset)).unwrap(); + debug_assert!(rows.len() > 0); + debug_assert!(bound.len() == 1); + let bound = bound.row(0); + partition_point(&rows, &bound) +} + +/// partition_point find the first element that is greater than bound +fn partition_point<'a, R: Rows>(list: &'a R, bound: &R::Item<'a>) -> Option { + if *bound >= list.last() { + return None; + } + + let mut size = list.len(); + let mut left = 0; + let mut right = size; + while left < right { + let mid = left + size / 2; + if list.row(mid) <= *bound { + left = mid + 1; + } else { + right = mid; + } + size = right - left; + } + Some(left) +} + +struct DataBlockStream(Option<(DataBlock, Column)>); + +impl SortedStream for DataBlockStream { + fn next(&mut self) -> Result<(Option<(DataBlock, Column)>, bool)> { + Ok((self.0.take(), 
false)) + } +} + +impl DataBlockStream { + fn new(data: DataBlock, sort_row_offset: usize) -> Self { + let col = sort_column(&data, sort_row_offset).clone(); + Self(Some((data, col))) + } +} + +fn get_domain(col: &Column) -> Column { + match col.len() { + 0 => unreachable!(), + 1 | 2 => col.clone(), + n => { + let mut bitmap = MutableBitmap::with_capacity(n); + bitmap.push(true); + bitmap.extend_constant(n - 2, false); + bitmap.push(true); + + col.filter(&bitmap.freeze()) + } + } +} + +pub fn create_transform_stream_sort_spill( + input: Arc, + output: Arc, + schema: DataSchemaRef, + sort_desc: Arc>, + limit: Option, + spiller: Spiller, + output_order_col: bool, + enable_loser_tree: bool, +) -> Box { + debug_assert!(has_order_field(&schema)); + let mut builder = Builder { + schema, + sort_desc, + input, + output, + output_order_col, + limit, + spiller: Some(spiller), + enable_loser_tree, + processor: None, + }; + select_row_type(&mut builder); + builder.processor.take().unwrap() +} + +struct Builder { + schema: DataSchemaRef, + sort_desc: Arc>, + + input: Arc, + output: Arc, + output_order_col: bool, + limit: Option, + spiller: Option, + enable_loser_tree: bool, + processor: Option>, +} + +impl RowsTypeVisitor for Builder { + fn schema(&self) -> DataSchemaRef { + self.schema.clone() + } + + fn sort_desc(&self) -> &[SortColumnDescription] { + &self.sort_desc + } + + fn visit_type(&mut self) { + let sort_row_offset = self.schema.fields().len() - 1; + let processor: Box = if self.enable_loser_tree { + Box::new(TransformStreamSortSpill::>::new( + self.input.clone(), + self.output.clone(), + self.schema.clone(), + self.limit, + self.spiller.take().unwrap(), + sort_row_offset, + self.output_order_col, + )) + } else { + Box::new(TransformStreamSortSpill::>::new( + self.input.clone(), + self.output.clone(), + self.schema.clone(), + self.limit, + self.spiller.take().unwrap(), + sort_row_offset, + self.output_order_col, + )) + }; + self.processor = Some(processor) + } +} + +#[cfg(test)] +mod tests { + use databend_common_expression::types::DataType; + use databend_common_expression::types::Int32Type; + use databend_common_expression::types::NumberDataType; + use databend_common_expression::types::StringType; + use databend_common_expression::BlockEntry; + use databend_common_expression::DataField; + use databend_common_expression::DataSchemaRefExt; + use databend_common_expression::FromData; + use databend_common_expression::Value; + use databend_common_pipeline_transforms::processors::sort::convert_rows; + use databend_common_pipeline_transforms::processors::sort::SimpleRowsAsc; + use databend_common_pipeline_transforms::sort::SimpleRowsDesc; + use databend_common_storage::DataOperator; + + use super::*; + use crate::spillers::SpillerConfig; + use crate::spillers::SpillerType; + use crate::test_kits::*; + + fn test_data() -> (DataSchemaRef, DataBlock) { + let col1 = Int32Type::from_data(vec![7, 7, 8, 11, 3, 5, 10, 11]); + let col2 = StringType::from_data(vec!["e", "w", "d", "g", "h", "d", "e", "f"]); + + let schema = DataSchemaRefExt::create(vec![ + DataField::new("a", DataType::Number(NumberDataType::Int32)), + DataField::new("b", DataType::String), + ]); + + let block = DataBlock::new_from_columns(vec![col1, col2]); + + (schema, block) + } + + async fn run_bound_block_stream( + spiller: Arc, + sort_desc: Arc>, + bound: Column, + block_part: usize, + want: Column, + ) -> Result<()> { + let (schema, block) = test_data(); + let block = DataBlock::sort(&block, &sort_desc, None)?; + let bound = 
Some(R::from_column(&bound)?); + let sort_row_offset = schema.fields().len(); + + let blocks = vec![ + block.slice(0..block_part), + block.slice(block_part..block.num_rows()), + ] + .into_iter() + .map(|mut data| { + let col = convert_rows(schema.clone(), &sort_desc, data.clone()).unwrap(); + data.add_column(BlockEntry::new(col.data_type(), Value::Column(col))); + SpillableBlock::new(data, sort_row_offset) + }) + .collect::>(); + + let mut stream = BoundBlockStream:: { + blocks, + bound, + sort_row_offset, + spiller: spiller.clone(), + }; + + let data = stream.take_next_bounded_block(); + let got = sort_column(&data, stream.sort_row_offset).clone(); + assert_eq!(want, got); + + Ok(()) + } + + #[tokio::test] + async fn test_bound_block_stream() -> Result<()> { + let fixture = TestFixture::setup().await?; + let ctx = fixture.new_query_ctx().await?; + + let op = DataOperator::instance().operator(); + let spill_config = SpillerConfig { + spiller_type: SpillerType::OrderBy, + location_prefix: "_spill_test".to_string(), + disk_spill: None, + use_parquet: true, + }; + let spiller = Arc::new(Spiller::create(ctx.clone(), op, spill_config)?); + + { + let sort_desc = Arc::new(vec![SortColumnDescription { + offset: 0, + asc: true, + nulls_first: false, + }]); + + run_bound_block_stream::>( + spiller.clone(), + sort_desc.clone(), + Int32Type::from_data(vec![5]), + 4, + Int32Type::from_data(vec![3, 5]), + ) + .await?; + + run_bound_block_stream::>( + spiller.clone(), + sort_desc.clone(), + Int32Type::from_data(vec![8]), + 4, + Int32Type::from_data(vec![3, 5, 7, 7]), + ) + .await?; + } + + { + let sort_desc = Arc::new(vec![SortColumnDescription { + offset: 1, + asc: false, + nulls_first: false, + }]); + + run_bound_block_stream::>( + spiller.clone(), + sort_desc.clone(), + StringType::from_data(vec!["f"]), + 4, + StringType::from_data(vec!["w", "h", "g", "f"]), + ) + .await?; + } + + Ok(()) + } +} diff --git a/src/query/service/src/spillers/mod.rs b/src/query/service/src/spillers/mod.rs index b51bb3a765fa8..35551aafbb8e6 100644 --- a/src/query/service/src/spillers/mod.rs +++ b/src/query/service/src/spillers/mod.rs @@ -18,4 +18,5 @@ mod spiller; pub use partition_buffer::PartitionBuffer; pub use partition_buffer::PartitionBufferFetchOption; +pub use serialize::Layout; pub use spiller::*; diff --git a/src/query/service/src/spillers/serialize.rs b/src/query/service/src/spillers/serialize.rs index 6eec57747d3be..30c851718042a 100644 --- a/src/query/service/src/spillers/serialize.rs +++ b/src/query/service/src/spillers/serialize.rs @@ -39,7 +39,7 @@ use parquet::file::reader::ChunkReader; use parquet::file::reader::Length; use parquet::format::FileMetaData; -#[derive(Clone)] +#[derive(Debug, Clone)] pub enum Layout { ArrowIpc(Box<[usize]>), Parquet, diff --git a/src/query/service/src/spillers/spiller.rs b/src/query/service/src/spillers/spiller.rs index e545d6c89ce08..95878cc38d224 100644 --- a/src/query/service/src/spillers/spiller.rs +++ b/src/query/service/src/spillers/spiller.rs @@ -143,8 +143,17 @@ impl Spiller { self.partition_location.keys().copied().collect() } - /// Spill a [`DataBlock`] to storage. + /// Spill some [`DataBlock`] to storage. These blocks will be concat into one. pub async fn spill(&mut self, data_block: Vec) -> Result { + let (location, layout) = self.spill_unmanage(data_block).await?; + + // Record columns layout for spilled data. 
+ self.columns_layout.insert(location.clone(), layout); + + Ok(location) + } + + pub async fn spill_unmanage(&self, data_block: Vec) -> Result<(Location, Layout)> { debug_assert!(!data_block.is_empty()); let instant = Instant::now(); @@ -163,11 +172,9 @@ impl Spiller { // Record statistics. record_write_profile(&location, &instant, data_size); - // Record columns layout for spilled data. - self.columns_layout - .insert(location.clone(), columns_layout.pop().unwrap()); + let layout = columns_layout.pop().unwrap(); - Ok(location) + Ok((location, layout)) } #[async_backtrace::framed] @@ -254,8 +261,15 @@ impl Spiller { /// Read a certain file to a [`DataBlock`]. /// We should guarantee that the file is managed by this spiller. pub async fn read_spilled_file(&self, location: &Location) -> Result { - let columns_layout = self.columns_layout.get(location).unwrap(); + let layout = self.columns_layout.get(location).unwrap(); + self.read_unmanage_spilled_file(location, layout).await + } + pub async fn read_unmanage_spilled_file( + &self, + location: &Location, + columns_layout: &Layout, + ) -> Result { // Read spilled data from storage. let instant = Instant::now(); let data = match location { @@ -381,7 +395,7 @@ impl Spiller { Ok(deserialize_block(layout, data)) } - async fn write_encodes(&mut self, size: usize, buf: DmaWriteBuf) -> Result { + async fn write_encodes(&self, size: usize, buf: DmaWriteBuf) -> Result { let location = match &self.temp_dir { None => None, Some(disk) => disk.new_file_with_size(size)?.map(Location::Local), diff --git a/src/query/settings/src/settings_default.rs b/src/query/settings/src/settings_default.rs index 542b463be0e29..fb81c1227817b 100644 --- a/src/query/settings/src/settings_default.rs +++ b/src/query/settings/src/settings_default.rs @@ -528,6 +528,12 @@ impl DefaultSettings { mode: SettingMode::Both, range: Some(SettingRange::Numeric(4 * 1024..=u64::MAX)), }), + ("enable_experimental_stream_sort_spilling", DefaultSettingValue { + value: UserSettingValue::UInt64(1), + desc: "Enable experimental stream sort spilling", + mode: SettingMode::Both, + range: Some(SettingRange::Numeric(0..=1)), + }), ("group_by_shuffle_mode", DefaultSettingValue { value: UserSettingValue::String(String::from("before_merge")), desc: "Group by shuffle mode, 'before_partial' is more balanced, but more data needs to exchange.", diff --git a/src/query/settings/src/settings_getter_setter.rs b/src/query/settings/src/settings_getter_setter.rs index e0699ec866df4..d5c6e0f5d20ff 100644 --- a/src/query/settings/src/settings_getter_setter.rs +++ b/src/query/settings/src/settings_getter_setter.rs @@ -480,6 +480,10 @@ impl Settings { Ok(self.try_get_u64("sort_spilling_memory_ratio")? as usize) } + pub fn get_enable_experimental_stream_sort_spilling(&self) -> Result { + Ok(self.try_get_u64("enable_experimental_stream_sort_spilling")? 
!= 0)
+    }
+
     pub fn get_group_by_shuffle_mode(&self) -> Result<String> {
         self.try_get_string("group_by_shuffle_mode")
     }
diff --git a/tests/suites/0_stateless/20+_others/20_0014_sort_spill.sql b/tests/suites/0_stateless/20+_others/20_0014_sort_spill.sql
index 2fb47bcee8fea..fb07f5d5f7ad3 100644
--- a/tests/suites/0_stateless/20+_others/20_0014_sort_spill.sql
+++ b/tests/suites/0_stateless/20+_others/20_0014_sort_spill.sql
@@ -1,6 +1,7 @@
 SELECT '==TEST GLOBAL SORT==';
-SET max_vacuum_temp_files_after_query=0;
+SET max_vacuum_temp_files_after_query = 0;
 set sort_spilling_bytes_threshold_per_proc = 8;
+set enable_experimental_stream_sort_spilling = 0;
 DROP TABLE if EXISTS t;
 DROP TABLE IF EXISTS temp_files_count;
@@ -95,5 +96,6 @@ SELECT '==Test f==';
 INSERT INTO temp_files_count SELECT COUNT() as count, 8 as number FROM system.temp_files;
 unset max_vacuum_temp_files_after_query;
+unset enable_experimental_stream_sort_spilling;
 set sort_spilling_bytes_threshold_per_proc = 0;
 SELECT any_if(count, number = 8) - any_if(count, number = 7) FROM temp_files_count;
\ No newline at end of file
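For intuition about the bound cut used above: `BoundBlockStream::take_next_bounded_block` and the `partition_point` helper split a sorted run at the first row strictly greater than the current bound, emit the prefix, and keep the remainder queued for a later bound. The standalone sketch below shows the same split on a plain sorted slice using std's `slice::partition_point`; it is illustrative only, not part of the patch, and the name `split_off_le` is invented for this example.

fn split_off_le(sorted: &[i32], bound: i32) -> (&[i32], &[i32]) {
    // Index of the first element strictly greater than `bound`; the patch's
    // `partition_point` returns None instead of `len()` when every row
    // qualifies, but the split it describes is the same.
    let pos = sorted.partition_point(|&x| x <= bound);
    sorted.split_at(pos)
}

#[test]
fn split_off_le_example() {
    // The sorted keys from `test_data()` above; with a bound of 5,
    // `test_bound_block_stream` expects [3, 5] to be taken first.
    let rows = [3, 5, 7, 7, 8, 10, 11, 11];
    let (taken, rest) = split_off_le(&rows, 5);
    assert_eq!(taken, &[3, 5]);
    assert_eq!(rest, &[7, 7, 8, 10, 11, 11]);
}

Because the cut is "first element greater than the bound", rows equal to the bound always travel with the bound that first covers them, which matches the `<=` comparison in `should_include_first`.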