Skip to content

Commit d38c91c

Browse files
authored
fix(storge): write progress in compact hook (#16901)
* fix write progress in compact hook * fix test
1 parent 1cb0ebe commit d38c91c

File tree

2 files changed

+44
-36
lines changed

2 files changed

+44
-36
lines changed

src/query/service/src/interpreters/hook/compact_hook.rs

Lines changed: 32 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -78,43 +78,50 @@ async fn do_hook_compact(
7878
}
7979

8080
pipeline.set_on_finished(move |info: &ExecutionInfo| {
81-
let compaction_limits = match compact_target.mutation_kind {
82-
MutationKind::Insert => {
83-
let compaction_num_block_hint = ctx.get_compaction_num_block_hint(&compact_target.table);
84-
info!("table {} hint number of blocks need to be compacted {}", compact_target.table, compaction_num_block_hint);
85-
if compaction_num_block_hint == 0 {
86-
return Ok(());
87-
}
88-
CompactionLimits {
89-
segment_limit: None,
90-
block_limit: Some(compaction_num_block_hint as usize),
81+
if info.res.is_ok() {
82+
let op_name = &trace_ctx.operation_name;
83+
metrics_inc_compact_hook_main_operation_time_ms(op_name, trace_ctx.start.elapsed().as_millis() as u64);
84+
info!("execute {op_name} finished successfully. running table optimization job.");
85+
86+
let compact_start_at = Instant::now();
87+
let compaction_limits = match compact_target.mutation_kind {
88+
MutationKind::Insert => {
89+
let compaction_num_block_hint = ctx.get_compaction_num_block_hint(&compact_target.table);
90+
info!("table {} hint number of blocks need to be compacted {}", compact_target.table, compaction_num_block_hint);
91+
if compaction_num_block_hint == 0 {
92+
return Ok(());
93+
}
94+
CompactionLimits {
95+
segment_limit: None,
96+
block_limit: Some(compaction_num_block_hint as usize),
97+
}
9198
}
92-
}
93-
_ => {
94-
let auto_compaction_segments_limit = ctx.get_settings().get_auto_compaction_segments_limit()?;
95-
CompactionLimits {
96-
segment_limit: Some(auto_compaction_segments_limit as usize),
97-
block_limit: None,
99+
_ => {
100+
let auto_compaction_segments_limit = ctx.get_settings().get_auto_compaction_segments_limit()?;
101+
CompactionLimits {
102+
segment_limit: Some(auto_compaction_segments_limit as usize),
103+
block_limit: None,
104+
}
98105
}
99-
}
100-
};
106+
};
101107

102-
let op_name = &trace_ctx.operation_name;
103-
metrics_inc_compact_hook_main_operation_time_ms(op_name, trace_ctx.start.elapsed().as_millis() as u64);
108+
// keep the original progress value
109+
let progress = ctx.get_write_progress();
110+
let progress_value = progress.as_ref().get_values();
104111

105-
let compact_start_at = Instant::now();
106-
if info.res.is_ok() {
107-
info!("execute {op_name} finished successfully. running table optimization job.");
108112
match GlobalIORuntime::instance().block_on({
109113
compact_table(ctx, compact_target, compaction_limits, lock_opt)
110114
}) {
111115
Ok(_) => {
112116
info!("execute {op_name} finished successfully. table optimization job finished.");
113117
}
114-
Err(e) => { info!("execute {op_name} finished successfully. table optimization job failed. {:?}", e) }
118+
Err(e) => { info!("execute {op_name} finished successfully. table optimization job failed. {:?}", e); }
115119
}
120+
121+
// reset the progress value
122+
progress.set(&progress_value);
123+
metrics_inc_compact_hook_compact_time_ms(&trace_ctx.operation_name, compact_start_at.elapsed().as_millis() as u64);
116124
}
117-
metrics_inc_compact_hook_compact_time_ms(&trace_ctx.operation_name, compact_start_at.elapsed().as_millis() as u64);
118125

119126
Ok(())
120127
});
@@ -139,8 +146,6 @@ async fn compact_table(
139146
)
140147
.await?;
141148
let settings = ctx.get_settings();
142-
// keep the original progress value
143-
let progress_value = ctx.get_write_progress_value();
144149

145150
let do_recluster = !table.cluster_keys(ctx.clone()).is_empty();
146151
let do_compact = compaction_limits.block_limit.is_some() || !do_recluster;
@@ -203,7 +208,5 @@ async fn compact_table(
203208
assert!(build_res.main_pipeline.is_empty());
204209
}
205210

206-
// reset the progress value
207-
ctx.get_write_progress().set(&progress_value);
208211
Ok(())
209212
}

tests/sqllogictests/suites/base/09_fuse_engine/09_0041_auto_compaction.test

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -90,14 +90,19 @@ insert into t1 values(3),(5),(8);
9090
statement ok
9191
insert into t1 values(4),(6);
9292

93-
query III
94-
select segment_count, block_count, row_count from fuse_snapshot('i15760', 't1') limit 10;
93+
query II
94+
select segment_count, row_count from fuse_snapshot('i15760', 't1') limit 10;
9595
----
96-
1 1 8
97-
1 2 8
98-
3 3 8
99-
2 2 6
100-
1 1 3
96+
1 8
97+
1 8
98+
3 8
99+
2 6
100+
1 3
101+
102+
query F
103+
select average_depth from clustering_information('i15760', 't1')
104+
----
105+
1.0
101106

102107
statement ok
103108
drop table t1 all;

0 commit comments

Comments
 (0)