diff --git a/src/query/storages/fuse/src/operations/common/generators/append_generator.rs b/src/query/storages/fuse/src/operations/common/generators/append_generator.rs index aa136bf99a10e..67c19e39e063d 100644 --- a/src/query/storages/fuse/src/operations/common/generators/append_generator.rs +++ b/src/query/storages/fuse/src/operations/common/generators/append_generator.rs @@ -57,6 +57,10 @@ impl AppendGenerator { } } + pub fn is_overwrite(&self) -> bool { + self.overwrite + } + fn check_fill_default(&self, summary: &Statistics) -> Result { let mut fill_default_values = false; // check if need to fill default value in statistics diff --git a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs index 5b19b543bc328..95696837efd16 100644 --- a/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs +++ b/src/query/storages/fuse/src/operations/common/processors/sink_commit.rs @@ -62,6 +62,7 @@ use crate::FuseTable; use crate::io::TableMetaLocationGenerator; use crate::operations::AppendGenerator; use crate::operations::CommitMeta; +use crate::operations::ConflictResolveContext; use crate::operations::MutationGenerator; use crate::operations::SnapshotGenerator; use crate::operations::TransformMergeCommitMeta; @@ -121,6 +122,10 @@ pub struct CommitSink { deduplicated_label: Option, table_meta_timestamps: TableMetaTimestamps, vacuum_handler: Option>, + // Tracks whether the ongoing mutation produced no physical changes. + // We still need to read the previous snapshot before deciding to skip the commit, + // because new tables must record their first snapshot even for empty writes. + pending_noop_commit: bool, } #[derive(Debug)] @@ -183,6 +188,7 @@ where F: SnapshotGenerator + Send + Sync + 'static deduplicated_label, table_meta_timestamps, vacuum_handler, + pending_noop_commit: false, }))) } @@ -287,21 +293,45 @@ where F: SnapshotGenerator + Send + Sync + 'static let meta = CommitMeta::downcast_from(input_meta) .ok_or_else(|| ErrorCode::Internal("No commit meta. It's a bug"))?; - self.new_segment_locs = meta.new_segment_locs; + let CommitMeta { + conflict_resolve_context, + new_segment_locs, + table_id: _, + virtual_schema, + hll, + } = meta; - self.new_virtual_schema = meta.virtual_schema; + let has_new_segments = !new_segment_locs.is_empty(); + let has_virtual_schema = virtual_schema.is_some(); + let has_hll = !hll.is_empty(); - if !meta.hll.is_empty() { + self.new_segment_locs = new_segment_locs; + + self.new_virtual_schema = virtual_schema; + + if has_hll { let binding = self.ctx.get_mutation_status(); let status = binding.read(); self.insert_rows = status.insert_rows + status.update_rows; - self.insert_hll = meta.hll; + self.insert_hll = hll; } self.backoff = set_backoff(None, None, self.max_retry_elapsed); + // Decide whether this mutation ended up as a no-op. We postpone the actual + // "skip commit" decision until `State::FillDefault`, after we know whether + // the table already has a snapshot. + self.pending_noop_commit = Self::should_skip_commit( + &conflict_resolve_context, + has_new_segments, + has_virtual_schema, + has_hll, + self.allow_append_only_skip(), + ); + self.snapshot_gen - .set_conflict_resolve_context(meta.conflict_resolve_context); + .set_conflict_resolve_context(conflict_resolve_context); + self.state = State::FillDefault; Ok(Event::Async) @@ -318,6 +348,30 @@ where F: SnapshotGenerator + Send + Sync + 'static .is_some_and(|generator| matches!(generator.mode(), TruncateMode::DropAll)) } + fn should_skip_commit( + ctx: &ConflictResolveContext, + has_new_segments: bool, + has_virtual_schema: bool, + has_new_hll: bool, + allow_append_only_skip: bool, + ) -> bool { + if has_new_segments || has_virtual_schema || has_new_hll { + return false; + } + + match ctx { + ConflictResolveContext::ModifiedSegmentExistsInLatest(changes) => { + changes.appended_segments.is_empty() + && changes.replaced_segments.is_empty() + && changes.removed_segment_indexes.is_empty() + } + ConflictResolveContext::AppendOnly((merged, _)) => { + allow_append_only_skip && merged.merged_segments.is_empty() + } + _ => false, + } + } + fn need_truncate(&self) -> bool { self.snapshot_gen .as_any() @@ -332,6 +386,17 @@ where F: SnapshotGenerator + Send + Sync + 'static .is_some() } + /// Append-only inserts (e.g. `INSERT INTO t SELECT ...`) may skip committing if + /// nothing was written. Overwrite/CTAS (`CREATE OR REPLACE TABLE t AS SELECT ...` + /// or `INSERT OVERWRITE ...`) still need a snapshot even when nothing was written, + /// so we disable skipping when `AppendGenerator` is in overwrite mode. + fn allow_append_only_skip(&self) -> bool { + self.snapshot_gen + .as_any() + .downcast_ref::() + .is_some_and(|g| !g.is_overwrite()) + } + async fn clean_history(&self, purge_mode: &PurgeMode) -> Result<()> { { let table_info = self.table.get_table_info(); @@ -496,6 +561,24 @@ where F: SnapshotGenerator + Send + Sync + 'static // if table_id not match, update table meta will fail let mut table_info = fuse_table.table_info.clone(); + let require_initial_snapshot = self.table.is_temp(); + // Only skip when both conditions hold: + // 1) the mutation touched nothing (`pending_noop_commit` is true). + // 2) the table already has a snapshot, or it's safe to skip the initial snapshot. + // CTAS-created temporary tables must still commit even when the SELECT returns zero rows, + // because `system.temporary_tables` currently depends on the committed table meta to show + // correct statistics. + let skip_commit = + self.pending_noop_commit && (previous.is_some() || !require_initial_snapshot); + // Reset the flag so subsequent mutations (or retries) re-evaluate their own no-op status. + self.pending_noop_commit = false; + if skip_commit { + self.ctx + .set_status_info("No table changes detected, skip commit"); + self.state = State::Finish; + return Ok(()); + } + // merge virtual schema let old_virtual_schema = std::mem::take(&mut table_info.meta.virtual_schema); let new_virtual_schema = std::mem::take(&mut self.new_virtual_schema); diff --git a/tests/sqllogictests/suites/base/issues/issue_19173.test b/tests/sqllogictests/suites/base/issues/issue_19173.test new file mode 100644 index 0000000000000..bd7ac9855af0d --- /dev/null +++ b/tests/sqllogictests/suites/base/issues/issue_19173.test @@ -0,0 +1,105 @@ +# https://github.com/datafuselabs/databend/issues/19173 + +statement ok +create or replace database issue_19173; + +statement ok +use issue_19173; + +statement ok +create or replace table target (c int); + +statement ok +create or replace table empty like target; + +# mutations in autocommit mode + +statement ok +merge into target using (select c from empty) s on s.c = target.c when matched then update * when not matched then insert *; + +query I +select count() from fuse_snapshot('issue_19173', 'target'); +---- +0 + +statement ok +delete from target where c = 1; + +query I +select count() from fuse_snapshot('issue_19173', 'target'); +---- +0 + +statement ok +update target set c =2 where c = 1; + +query I +select count() from fuse_snapshot('issue_19173', 'target'); +---- +0 + +statement ok +insert into target select * from empty; + +query I +select count() from fuse_snapshot('issue_19173', 'target'); +---- +0 + +# mutations inside explicit transactions + +statement ok +begin transaction; + +statement ok +merge into target using (select c from empty) s on s.c = target.c when matched then update * when not matched then insert *; + +statement ok +commit; + +query I +select count() from fuse_snapshot('issue_19173', 'target'); +---- +0 + +statement ok +begin transaction; + +statement ok +delete from target where c = 1; + +statement ok +commit; + +query I +select count() from fuse_snapshot('issue_19173', 'target'); +---- +0 + +statement ok +begin transaction; + +statement ok +update target set c =2 where c = 1; + +statement ok +commit; + +query I +select count() from fuse_snapshot('issue_19173', 'target'); +---- +0 + +statement ok +begin transaction; + +statement ok +insert into target select * from empty; + +statement ok +commit; + +query I +select count() from fuse_snapshot('issue_19173', 'target'); +---- +0