feat(storage): support non_pk_prefix_watermark state cleaning #19889

Open
wants to merge 21 commits into
base: main

1 change: 1 addition & 0 deletions Cargo.lock

3 changes: 3 additions & 0 deletions proto/hummock.proto
@@ -149,6 +149,9 @@ message TableWatermarks {

// The direction of the table watermark.
bool is_ascending = 2;

// The table watermark is non-pk prefix table watermark.
bool is_non_pk_prefix = 3;
}

message EpochNewChangeLog {

12 changes: 12 additions & 0 deletions src/common/src/util/row_serde.rs
@@ -52,6 +52,18 @@ impl OrderedRowSerde {
}
}

#[must_use]
pub fn index(&self, idx: usize) -> Cow<'_, Self> {
if 1 == self.order_types.len() {
Cow::Borrowed(self)
} else {
Cow::Owned(Self {
schema: vec![self.schema[idx].clone()],
order_types: vec![self.order_types[idx]],
})
}
}

/// Note: prefer [`Row::memcmp_serialize`] if possible.
pub fn serialize(&self, row: impl Row, append_to: impl BufMut) {
self.serialize_datums(row.iter(), append_to)
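
The `index` helper added in this hunk borrows `self` when the serde has a single column and otherwise builds a one-column copy. A minimal sketch of that `Cow` pattern follows, using a hypothetical `ColumnSerde` type with `String` and `bool` fields standing in for the real `DataType` and `OrderType`; it is an illustration under those assumptions, not the PR's implementation.

```rust
use std::borrow::Cow;

/// Hypothetical stand-in for `OrderedRowSerde` (assumption: the real struct
/// holds `Vec<DataType>` and `Vec<OrderType>`).
#[derive(Clone, Debug, PartialEq)]
pub struct ColumnSerde {
    pub schema: Vec<String>,
    pub descending: Vec<bool>,
}

impl ColumnSerde {
    /// Returns a serde for the column at `idx`.
    /// With exactly one column, `self` is borrowed and `idx` is not consulted,
    /// which matches the branch structure shown in the diff.
    pub fn index(&self, idx: usize) -> Cow<'_, Self> {
        if self.descending.len() == 1 {
            Cow::Borrowed(self)
        } else {
            Cow::Owned(Self {
                schema: vec![self.schema[idx].clone()],
                descending: vec![self.descending[idx]],
            })
        }
    }
}
```

Because the single-column branch ignores `idx`, a caller passing a non-zero index for a one-column serde would not be detected in this sketch; whether the PR intends that is not stated in the diff.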

2 changes: 2 additions & 0 deletions src/meta/src/hummock/compaction/mod.rs
@@ -174,6 +174,8 @@ impl CompactStatus {
pub fn is_trivial_reclaim(task: &CompactTask) -> bool {
// Currently all VnodeWatermark tasks are trivial reclaim.
if task.task_type == TaskType::VnodeWatermark {
assert!(task.input_ssts.len() == 2);
assert!(task.input_ssts[1].table_infos.is_empty());
return true;
}
let exist_table_ids = HashSet::<u32>::from_iter(task.existing_table_ids.clone());

@@ -101,7 +101,7 @@ fn should_delete_key_by_watermark(
let Some(w) = watermark.vnode_watermarks.get(&vnode) else {
return false;
};
- watermark.direction.filter_by_watermark(key, w)
+ watermark.direction.filter_by_watermark_key(key, w)
}

#[cfg(test)]

43 changes: 42 additions & 1 deletion src/meta/src/hummock/manager/compaction/mod.rs
@@ -39,6 +39,7 @@ use itertools::Itertools;
use parking_lot::Mutex;
use rand::seq::SliceRandom;
use rand::thread_rng;
use risingwave_common::catalog::TableId;
use risingwave_common::util::epoch::Epoch;
use risingwave_hummock_sdk::compact_task::{CompactTask, ReportTask};
use risingwave_hummock_sdk::compaction_group::StateTableId;
@@ -728,6 +729,46 @@ impl HummockManager {
}
}

// Filter out the table that has a primary key prefix watermark.
let table_id_with_pk_prefix_watermark: HashSet<_> = self
.metadata_manager
.catalog_controller
.get_table_by_ids(
version
.latest_version()
.table_watermarks
.keys()
.map(|id| id.table_id() as _)
.collect(),
)
.await
.map_err(|e| Error::Internal(e.into()))?
.into_iter()
.filter_map(|table| {
// pk prefix watermark.
if table.clean_watermark_index_in_pk.is_none()
|| table.clean_watermark_index_in_pk.unwrap() == 0

[Review comment, Collaborator] Does this assume the pk-prefix watermark can only be a single column? Do we have a sanity check somewhere to make sure this assumption holds?

{
Some(TableId::from(table.get_id()))
} else {
None
}
})
.collect();

let table_watermarks = version
.latest_version()
.table_watermarks
.iter()
.filter_map(|(table_id, table_watermarks)| {
if table_id_with_pk_prefix_watermark.contains(table_id) {

[Review comment, Collaborator] We already have a WaterMarkType defined in the version; why don't we just use that to filter out tables with a non-pk-prefix watermark?

[Review comment, Collaborator] Also, if we filter out the non-pk-prefix watermarks here, how can the compactor retrieve them? Based on the logic here, it seems that we rely on the non-pk-prefix watermark being present in the compact task.

Some((*table_id, table_watermarks.clone()))
} else {
None
}
})
.collect();

while let Some(compact_task) = compact_status.get_compact_task(
version
.latest_version()
@@ -742,7 +783,7 @@ impl HummockManager {
selector,
&table_id_to_option,
developer_config.clone(),
- &version.latest_version().table_watermarks,
+ &table_watermarks,
&version.latest_version().state_table_info,
) {
let target_level_id = compact_task.input.target_level as u32;
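
The review comments in this hunk raise two points: the catalog check treats a missing index or index 0 as a pk-prefix watermark, and the version metadata already carries a watermark type that could drive the filter. The sketch below illustrates both ideas under loud assumptions: `WatermarkSerdeType::NonPkPrefix` is an assumed second variant (only `PkPrefix` appears in this diff), `TableWatermarks` is trimmed to a single field, and `clean_watermark_index_in_pk` is assumed to be an `Option<i32>`. The function names `is_pk_prefix` and `pk_prefix_watermarks` are hypothetical.

```rust
use std::collections::BTreeMap;

/// Hypothetical stand-in for the SDK's `WatermarkSerdeType`.
/// Assumption: `NonPkPrefix` is the counterpart of the `PkPrefix` variant
/// that appears in the diff.
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum WatermarkSerdeType {
    PkPrefix,
    NonPkPrefix,
}

/// Trimmed-down stand-in for `TableWatermarks`; the real struct also holds
/// the per-epoch watermarks and the direction.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct TableWatermarks {
    pub watermark_type: WatermarkSerdeType,
}

/// Mirrors the catalog-based check in the diff: no index, or index 0,
/// counts as a pk-prefix watermark.
pub fn is_pk_prefix(clean_watermark_index_in_pk: Option<i32>) -> bool {
    matches!(clean_watermark_index_in_pk, None | Some(0))
}

/// Keeps only pk-prefix watermarks by reading the type stored alongside the
/// watermarks, so no catalog lookup is needed.
pub fn pk_prefix_watermarks(
    all: &BTreeMap<u32, TableWatermarks>,
) -> BTreeMap<u32, TableWatermarks> {
    all.iter()
        .filter(|(_, w)| w.watermark_type == WatermarkSerdeType::PkPrefix)
        .map(|(table_id, w)| (*table_id, w.clone()))
        .collect()
}
```

This sketch does not address the second reviewer question: if non-pk-prefix watermarks are dropped before task selection, the compactor still needs another path to obtain them.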

5 changes: 5 additions & 0 deletions src/storage/benches/bench_compactor.rs
@@ -136,11 +136,13 @@ async fn build_table(
},
);
let table_id_to_vnode = HashMap::from_iter(vec![(0, VirtualNode::COUNT_FOR_TEST)]);
let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
let mut builder = SstableBuilder::<_, Xor16FilterBuilder>::for_test(
sstable_object_id,
writer,
opt,
table_id_to_vnode,
table_id_to_watermark_serde,
);
let value = b"1234567890123456789";
let mut full_key = test_key_of(0, epoch, TableId::new(0));
@@ -186,11 +188,14 @@ async fn build_table_2(
);

let table_id_to_vnode = HashMap::from_iter(vec![(table_id, VirtualNode::COUNT_FOR_TEST)]);
let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);

let mut builder = SstableBuilder::<_, Xor16FilterBuilder>::for_test(
sstable_object_id,
writer,
opt,
table_id_to_vnode,
table_id_to_watermark_serde,
);
let mut full_key = test_key_of(0, epoch, TableId::new(table_id));
let table_key_len = full_key.user_key.table_key.len();

3 changes: 3 additions & 0 deletions src/storage/benches/bench_merge_iter.rs
@@ -14,11 +14,13 @@

use std::cell::RefCell;
use std::collections::BTreeMap;
use std::sync::Arc;

use bytes::Bytes;
use criterion::{criterion_group, criterion_main, BenchmarkId, Criterion};
use futures::executor::block_on;
use risingwave_hummock_sdk::key::TableKey;
use risingwave_storage::compaction_catalog_manager::CompactionCatalogAgent;
use risingwave_storage::hummock::iterator::{
Forward, HummockIterator, HummockIteratorUnion, MergeIterator, SkipWatermarkIterator,
};
@@ -111,6 +113,7 @@ fn criterion_benchmark(c: &mut Criterion) {
let merge_iter = RefCell::new(SkipWatermarkIterator::new(
MergeIterator::new(gen_interleave_shared_buffer_batch_iter(10000, 100)),
BTreeMap::new(),
Arc::new(CompactionCatalogAgent::dummy()),
));
c.bench_with_input(
BenchmarkId::new("bench-merge-iter-skip-empty-watermark", "unordered"),

11 changes: 10 additions & 1 deletion src/storage/benches/bench_multi_builder.rs
@@ -90,7 +90,16 @@ impl<F: SstableWriterFactory> TableBuilderFactory for LocalTableBuilderFactory<F
TableId::default().into(),
VirtualNode::COUNT_FOR_TEST,
)]);
- let builder = SstableBuilder::for_test(id, writer, self.options.clone(), table_id_to_vnode);
+
+ let table_id_to_watermark_serde = HashMap::from_iter(vec![(0, None)]);
+
+ let builder = SstableBuilder::for_test(
+ id,
+ writer,
+ self.options.clone(),
+ table_id_to_vnode,
+ table_id_to_watermark_serde,
+ );

Ok(builder)
}

3 changes: 2 additions & 1 deletion src/storage/benches/bench_table_watermarks.rs
@@ -25,7 +25,7 @@ use risingwave_common::hash::VirtualNode;
use risingwave_common::util::epoch::test_epoch;
use risingwave_hummock_sdk::compaction_group::StaticCompactionGroupId;
use risingwave_hummock_sdk::table_watermark::{
- TableWatermarks, TableWatermarksIndex, VnodeWatermark, WatermarkDirection,
+ TableWatermarks, TableWatermarksIndex, VnodeWatermark, WatermarkDirection, WatermarkSerdeType,
};
use risingwave_hummock_sdk::version::{HummockVersion, HummockVersionStateTableInfo};
use risingwave_hummock_sdk::HummockEpoch;
@@ -101,6 +101,7 @@ fn gen_committed_table_watermarks(
})
.collect(),
direction: WatermarkDirection::Ascending,
watermark_type: WatermarkSerdeType::PkPrefix,
}
}


1 change: 1 addition & 0 deletions src/storage/hummock_sdk/Cargo.toml
@@ -8,6 +8,7 @@ license = { workspace = true }
repository = { workspace = true }

[dependencies]
bincode = { version = "=2.0.0-rc.3", features = ["serde"] }

[Review comment, Collaborator] ditto. See comments above.

bytes = "1"
hex = "0.4"
itertools = { workspace = true }

45 changes: 44 additions & 1 deletion src/storage/hummock_sdk/src/compact_task.rs
@@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

- use std::collections::{BTreeMap, HashMap};
+ use std::collections::{BTreeMap, HashMap, HashSet};
use std::mem::size_of;

use itertools::Itertools;
@@ -22,6 +22,7 @@ use risingwave_pb::hummock::{
PbCompactTask, PbKeyRange, PbTableOption, PbTableSchema, PbTableStats, PbValidationTask,
};

use crate::compaction_group::StateTableId;
use crate::key_range::KeyRange;
use crate::level::InputLevel;
use crate::sstable_info::SstableInfo;
@@ -114,6 +115,48 @@ impl CompactTask {
}
}

impl CompactTask {
// The compact task may need to reclaim key with TTL
pub fn is_contains_ttl(&self) -> bool {

[Review comment, Collaborator] rename: contains_ttl

self.table_options
.iter()
.any(|(_, table_option)| table_option.retention_seconds.is_some_and(|ttl| ttl > 0))
}

// The compact task may need to reclaim key with range tombstone
pub fn is_contains_range_tombstone(&self) -> bool {

[Review comment, Collaborator] rename: contains_range_tombstone

self.input_ssts
.iter()
.flat_map(|level| level.table_infos.iter())
.any(|sst| sst.range_tombstone_count > 0)
}

// The compact task may need to reclaim key with split sst
pub fn is_contains_split_sst(&self) -> bool {

[Review comment, Collaborator] rename: contains_split_sst

self.input_ssts
.iter()
.flat_map(|level| level.table_infos.iter())
.any(|sst| sst.sst_id != sst.object_id)
}

pub fn get_table_ids_from_input_ssts(&self) -> impl Iterator<Item = StateTableId> {
self.input_ssts
.iter()
.flat_map(|level| level.table_infos.iter())
.flat_map(|sst| sst.table_ids.clone())
.sorted()
.unique()
}

// filter the table-id that in existing_table_ids with the table-id in compact-task
pub fn build_compact_table_ids(&self) -> Vec<StateTableId> {
let existing_table_ids: HashSet<u32> = HashSet::from_iter(self.existing_table_ids.clone());
self.get_table_ids_from_input_ssts()
.filter(|table_id| existing_table_ids.contains(table_id))
.collect()
}
}

impl From<PbCompactTask> for CompactTask {
#[expect(deprecated)]
fn from(pb_compact_task: PbCompactTask) -> Self {
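
The `CompactTask` helpers added above can be sketched with the names the reviewer suggests (`contains_ttl`, `contains_split_sst`). The types `SstInfo` and `Task` below are hypothetical, trimmed-down stand-ins for `SstableInfo` and `CompactTask`, and `retention_seconds` is flattened into a plain list for illustration. The sketch also uses a `BTreeSet` in place of itertools' `sorted().unique()`, which should yield the same sorted, de-duplicated ids.

```rust
use std::collections::{BTreeSet, HashSet};

/// Hypothetical stand-in for `SstableInfo`, keeping only the fields used here.
pub struct SstInfo {
    pub sst_id: u64,
    pub object_id: u64,
    pub table_ids: Vec<u32>,
}

/// Hypothetical stand-in for `CompactTask`.
pub struct Task {
    /// One inner `Vec` per input level.
    pub input_ssts: Vec<Vec<SstInfo>>,
    pub existing_table_ids: Vec<u32>,
    /// Assumption: one optional TTL per table option.
    pub retention_seconds: Vec<Option<u32>>,
}

impl Task {
    /// True if any table option carries a positive TTL.
    pub fn contains_ttl(&self) -> bool {
        self.retention_seconds
            .iter()
            .any(|ttl| ttl.is_some_and(|secs| secs > 0))
    }

    /// True if any input SST's id differs from its object id.
    pub fn contains_split_sst(&self) -> bool {
        self.input_ssts
            .iter()
            .flatten()
            .any(|sst| sst.sst_id != sst.object_id)
    }

    /// Sorted, de-duplicated table ids from the input SSTs that still exist.
    pub fn build_compact_table_ids(&self) -> Vec<u32> {
        let existing: HashSet<u32> = self.existing_table_ids.iter().copied().collect();
        let from_inputs: BTreeSet<u32> = self
            .input_ssts
            .iter()
            .flatten()
            .flat_map(|sst| sst.table_ids.iter().copied())
            .collect();
        from_inputs
            .into_iter()
            .filter(|table_id| existing.contains(table_id))
            .collect()
    }
}
```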

@@ -135,6 +135,7 @@ pub fn safe_epoch_table_watermarks_impl(
Some(TableWatermarks {
watermarks: vec![(*first_epoch, first_epoch_watermark.clone())],
direction: table_watermarks.direction,
watermark_type: table_watermarks.watermark_type,
})
} else {
None