Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,10 @@ impl AppendGenerator {
}
}

pub fn is_overwrite(&self) -> bool {
self.overwrite
}

fn check_fill_default(&self, summary: &Statistics) -> Result<bool> {
let mut fill_default_values = false;
// check if need to fill default value in statistics
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ use crate::FuseTable;
use crate::io::TableMetaLocationGenerator;
use crate::operations::AppendGenerator;
use crate::operations::CommitMeta;
use crate::operations::ConflictResolveContext;
use crate::operations::MutationGenerator;
use crate::operations::SnapshotGenerator;
use crate::operations::TransformMergeCommitMeta;
Expand Down Expand Up @@ -121,6 +122,10 @@ pub struct CommitSink<F: SnapshotGenerator> {
deduplicated_label: Option<String>,
table_meta_timestamps: TableMetaTimestamps,
vacuum_handler: Option<Arc<VacuumHandlerWrapper>>,
// Tracks whether the ongoing mutation produced no physical changes.
// We still need to read the previous snapshot before deciding to skip the commit,
// because new tables must record their first snapshot even for empty writes.
pending_noop_commit: bool,
}

#[derive(Debug)]
Expand Down Expand Up @@ -183,6 +188,7 @@ where F: SnapshotGenerator + Send + Sync + 'static
deduplicated_label,
table_meta_timestamps,
vacuum_handler,
pending_noop_commit: false,
})))
}

Expand Down Expand Up @@ -287,21 +293,45 @@ where F: SnapshotGenerator + Send + Sync + 'static
let meta = CommitMeta::downcast_from(input_meta)
.ok_or_else(|| ErrorCode::Internal("No commit meta. It's a bug"))?;

self.new_segment_locs = meta.new_segment_locs;
let CommitMeta {
conflict_resolve_context,
new_segment_locs,
table_id: _,
virtual_schema,
hll,
} = meta;

self.new_virtual_schema = meta.virtual_schema;
let has_new_segments = !new_segment_locs.is_empty();
let has_virtual_schema = virtual_schema.is_some();
let has_hll = !hll.is_empty();

if !meta.hll.is_empty() {
self.new_segment_locs = new_segment_locs;

self.new_virtual_schema = virtual_schema;

if has_hll {
let binding = self.ctx.get_mutation_status();
let status = binding.read();
self.insert_rows = status.insert_rows + status.update_rows;
self.insert_hll = meta.hll;
self.insert_hll = hll;
}

self.backoff = set_backoff(None, None, self.max_retry_elapsed);

// Decide whether this mutation ended up as a no-op. We postpone the actual
// "skip commit" decision until `State::FillDefault`, after we know whether
// the table already has a snapshot.
self.pending_noop_commit = Self::should_skip_commit(
&conflict_resolve_context,
has_new_segments,
has_virtual_schema,
has_hll,
self.allow_append_only_skip(),
);

self.snapshot_gen
.set_conflict_resolve_context(meta.conflict_resolve_context);
.set_conflict_resolve_context(conflict_resolve_context);

self.state = State::FillDefault;

Ok(Event::Async)
Expand All @@ -318,6 +348,30 @@ where F: SnapshotGenerator + Send + Sync + 'static
.is_some_and(|generator| matches!(generator.mode(), TruncateMode::DropAll))
}

fn should_skip_commit(
ctx: &ConflictResolveContext,
has_new_segments: bool,
has_virtual_schema: bool,
has_new_hll: bool,
allow_append_only_skip: bool,
) -> bool {
if has_new_segments || has_virtual_schema || has_new_hll {
return false;
}

match ctx {
ConflictResolveContext::ModifiedSegmentExistsInLatest(changes) => {
changes.appended_segments.is_empty()
&& changes.replaced_segments.is_empty()
&& changes.removed_segment_indexes.is_empty()
}
ConflictResolveContext::AppendOnly((merged, _)) => {
allow_append_only_skip && merged.merged_segments.is_empty()
}
_ => false,
}
}

fn need_truncate(&self) -> bool {
self.snapshot_gen
.as_any()
Expand All @@ -332,6 +386,17 @@ where F: SnapshotGenerator + Send + Sync + 'static
.is_some()
}

/// Append-only inserts (e.g. `INSERT INTO t SELECT ...`) may skip committing if
/// nothing was written. Overwrite/CTAS (`CREATE OR REPLACE TABLE t AS SELECT ...`
/// or `INSERT OVERWRITE ...`) still need a snapshot even when nothing was written,
/// so we disable skipping when `AppendGenerator` is in overwrite mode.
fn allow_append_only_skip(&self) -> bool {
self.snapshot_gen
.as_any()
.downcast_ref::<AppendGenerator>()
.is_some_and(|g| !g.is_overwrite())
}

async fn clean_history(&self, purge_mode: &PurgeMode) -> Result<()> {
{
let table_info = self.table.get_table_info();
Expand Down Expand Up @@ -496,6 +561,24 @@ where F: SnapshotGenerator + Send + Sync + 'static
// if table_id not match, update table meta will fail
let mut table_info = fuse_table.table_info.clone();

let require_initial_snapshot = self.table.is_temp();
// Only skip when both conditions hold:
// 1) the mutation touched nothing (`pending_noop_commit` is true).
// 2) the table already has a snapshot, or it's safe to skip the initial snapshot.
// CTAS-created temporary tables must still commit even when the SELECT returns zero rows,
// because `system.temporary_tables` currently depends on the committed table meta to show
// correct statistics.
let skip_commit =
self.pending_noop_commit && (previous.is_some() || !require_initial_snapshot);
// Reset the flag so subsequent mutations (or retries) re-evaluate their own no-op status.
self.pending_noop_commit = false;
if skip_commit {
self.ctx
.set_status_info("No table changes detected, skip commit");
self.state = State::Finish;
return Ok(());
}

// merge virtual schema
let old_virtual_schema = std::mem::take(&mut table_info.meta.virtual_schema);
let new_virtual_schema = std::mem::take(&mut self.new_virtual_schema);
Expand Down
105 changes: 105 additions & 0 deletions tests/sqllogictests/suites/base/issues/issue_19173.test
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
# https://github.com/datafuselabs/databend/issues/19173

statement ok
create or replace database issue_19173;

statement ok
use issue_19173;

statement ok
create or replace table target (c int);

statement ok
create or replace table empty like target;

# mutations in autocommit mode

statement ok
merge into target using (select c from empty) s on s.c = target.c when matched then update * when not matched then insert *;

query I
select count() from fuse_snapshot('issue_19173', 'target');
----
0

statement ok
delete from target where c = 1;

query I
select count() from fuse_snapshot('issue_19173', 'target');
----
0

statement ok
update target set c =2 where c = 1;

query I
select count() from fuse_snapshot('issue_19173', 'target');
----
0

statement ok
insert into target select * from empty;

query I
select count() from fuse_snapshot('issue_19173', 'target');
----
0

# mutations inside explicit transactions

statement ok
begin transaction;

statement ok
merge into target using (select c from empty) s on s.c = target.c when matched then update * when not matched then insert *;

statement ok
commit;

query I
select count() from fuse_snapshot('issue_19173', 'target');
----
0

statement ok
begin transaction;

statement ok
delete from target where c = 1;

statement ok
commit;

query I
select count() from fuse_snapshot('issue_19173', 'target');
----
0

statement ok
begin transaction;

statement ok
update target set c =2 where c = 1;

statement ok
commit;

query I
select count() from fuse_snapshot('issue_19173', 'target');
----
0

statement ok
begin transaction;

statement ok
insert into target select * from empty;

statement ok
commit;

query I
select count() from fuse_snapshot('issue_19173', 'target');
----
0
Loading