Skip to content

Commit d44be63

Browse files
committed
Refactor periodic trigger
Signed-off-by: Jin Hai <[email protected]>
1 parent bea6b38 commit d44be63

File tree

8 files changed

+47
-32
lines changed

8 files changed

+47
-32
lines changed

src/common/blocking_queue.cppm

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public:
4040

4141
void NotAllowEnqueue() { allow_enqueue_ = false; }
4242

43-
bool Enqueue(T &task) {
43+
bool Enqueue(const T &task) {
4444
{
4545
if (!allow_enqueue_) {
4646
return false;

src/storage/bg_task/bg_task.cppm

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ export struct BGTask {
5050
bool async_{false};
5151

5252
bool complete_{false};
53-
std::mutex mutex_{};
53+
mutable std::mutex mutex_{};
5454
std::condition_variable cv_{};
5555

5656
void Wait() {
@@ -70,6 +70,11 @@ export struct BGTask {
7070
cv_.notify_one();
7171
}
7272

73+
bool IsComplete() const {
74+
std::unique_lock<std::mutex> locker(mutex_);
75+
return complete_;
76+
}
77+
7378
Status result_status_{};
7479

7580
virtual std::string ToString() const = 0;
@@ -132,14 +137,11 @@ public:
132137

133138
export class NotifyOptimizeTask final : public BGTask {
134139
public:
135-
NotifyOptimizeTask(bool new_optimize = false) : BGTask(BGTaskType::kNotifyOptimize, true), new_optimize_(new_optimize) {}
140+
NotifyOptimizeTask(bool new_optimize = false) : BGTask(BGTaskType::kNotifyOptimize, true) {}
136141

137142
~NotifyOptimizeTask() override = default;
138143

139144
std::string ToString() const override { return "NotifyOptimizeTask"; }
140-
141-
public:
142-
bool new_optimize_ = false;
143145
};
144146

145147
export class DumpMemIndexTask final : public BGTask {

src/storage/bg_task/periodic_trigger.cppm

Lines changed: 17 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -27,10 +27,12 @@ import global_resource_usage;
2727
namespace infinity {
2828

2929
class NewCleanupTask;
30+
class NotifyOptimizeTask;
31+
class NotifyCompactTask;
3032

3133
export class PeriodicTrigger {
3234
public:
33-
explicit PeriodicTrigger(i64 interval) : interval_(interval), last_check_(std::chrono::system_clock::now()) {
35+
explicit PeriodicTrigger(const i64 interval) : interval_(interval), last_check_(std::chrono::system_clock::now()) {
3436
#ifdef INFINITY_DEBUG
3537
GlobalResourceUsage::IncrObjectCount("PeriodicTrigger");
3638
#endif
@@ -44,7 +46,7 @@ public:
4446

4547
bool Check();
4648

47-
void UpdateInternal(i64 new_interval) { interval_ = new_interval; }
49+
void UpdateInternal(const i64 new_interval) { interval_ = new_interval; }
4850

4951
virtual void Trigger() = 0;
5052

@@ -58,47 +60,49 @@ protected:
5860

5961
export class NewCleanupPeriodicTrigger final : public PeriodicTrigger {
6062
public:
61-
NewCleanupPeriodicTrigger(i64 interval) : PeriodicTrigger(interval) {}
63+
explicit NewCleanupPeriodicTrigger(const i64 interval) : PeriodicTrigger(interval) {}
6264

6365
std::shared_ptr<NewCleanupTask> CreateNewCleanupTask();
6466

65-
virtual void Trigger() override;
67+
void Trigger() override;
6668

6769
private:
6870
//
6971
};
7072

7173
export class CheckpointPeriodicTrigger final : public PeriodicTrigger {
7274
public:
73-
explicit CheckpointPeriodicTrigger(i64 interval) : PeriodicTrigger(interval) {}
75+
explicit CheckpointPeriodicTrigger(const i64 interval) : PeriodicTrigger(interval) {}
7476

75-
virtual void Trigger() override;
77+
void Trigger() override;
7678
};
7779

78-
export class CompactSegmentPeriodicTrigger final : public PeriodicTrigger {
80+
export class CompactPeriodicTrigger final : public PeriodicTrigger {
7981
public:
80-
explicit CompactSegmentPeriodicTrigger(i64 interval, CompactionProcessor *compact_processor)
82+
explicit CompactPeriodicTrigger(const i64 interval, CompactionProcessor *compact_processor)
8183
: PeriodicTrigger(interval), compact_processor_(compact_processor) {}
8284

83-
explicit CompactSegmentPeriodicTrigger(i64 interval) : PeriodicTrigger(interval) {}
85+
explicit CompactPeriodicTrigger(const i64 interval) : PeriodicTrigger(interval) {}
8486

85-
virtual void Trigger() override;
87+
void Trigger() override;
8688

8789
private:
8890
CompactionProcessor *const compact_processor_{};
91+
std::shared_ptr<NotifyCompactTask> compact_task_{};
8992
};
9093

9194
export class OptimizeIndexPeriodicTrigger final : public PeriodicTrigger {
9295
public:
93-
explicit OptimizeIndexPeriodicTrigger(i64 interval, CompactionProcessor *compact_processor)
96+
explicit OptimizeIndexPeriodicTrigger(const i64 interval, CompactionProcessor *compact_processor)
9497
: PeriodicTrigger(interval), compact_processor_(compact_processor) {}
9598

96-
explicit OptimizeIndexPeriodicTrigger(i64 interval) : PeriodicTrigger(interval) {}
99+
explicit OptimizeIndexPeriodicTrigger(const i64 interval) : PeriodicTrigger(interval) {}
97100

98-
virtual void Trigger() override;
101+
void Trigger() override;
99102

100103
private:
101104
CompactionProcessor *const compact_processor_ = nullptr;
105+
std::shared_ptr<NotifyOptimizeTask> optimize_task_{};
102106
};
103107

104108
} // namespace infinity

src/storage/bg_task/periodic_trigger_impl.cpp

Lines changed: 15 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -90,16 +90,25 @@ void CheckpointPeriodicTrigger::Trigger() {
9090
bg_processor->Submit(std::move(checkpoint_task));
9191
}
9292

93-
void CompactSegmentPeriodicTrigger::Trigger() {
94-
LOG_DEBUG(fmt::format("Trigger compact segment task, after {} seconds", duration_.load()));
95-
auto compact_task = std::make_shared<NotifyCompactTask>();
96-
compact_processor_->Submit(std::move(compact_task));
93+
void CompactPeriodicTrigger::Trigger() {
94+
if (compact_task_ != nullptr and !compact_task_->IsComplete()) {
95+
LOG_DEBUG(fmt::format("Skipping compact task, after {} seconds", duration_.load()));
96+
}
97+
98+
LOG_DEBUG(fmt::format("Trigger compact task, after {} seconds", duration_.load()));
99+
compact_task_ = std::make_shared<NotifyCompactTask>();
100+
compact_processor_->Submit(compact_task_);
97101
}
98102

99103
void OptimizeIndexPeriodicTrigger::Trigger() {
104+
105+
if (optimize_task_ != nullptr and !optimize_task_->IsComplete()) {
106+
LOG_DEBUG(fmt::format("Skipping optimize index task, after {} seconds", duration_.load()));
107+
}
108+
100109
LOG_DEBUG(fmt::format("Trigger optimize index task, after {} seconds", duration_.load()));
101-
auto optimize_task = std::make_shared<NotifyOptimizeTask>(true);
102-
compact_processor_->Submit(std::move(optimize_task));
110+
optimize_task_ = std::make_shared<NotifyOptimizeTask>();
111+
compact_processor_->Submit(optimize_task_);
103112
}
104113

105114
} // namespace infinity

src/storage/compaction_process.cppm

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ public:
5656

5757
void Stop();
5858

59-
void Submit(std::shared_ptr<BGTask> bg_task);
59+
void Submit(const std::shared_ptr<BGTask> &bg_task);
6060

6161
void NewDoCompact();
6262

src/storage/compaction_process_impl.cpp

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -78,8 +78,8 @@ void CompactionProcessor::Stop() {
7878
LOG_INFO("Compaction processor is stopped.");
7979
}
8080

81-
void CompactionProcessor::Submit(std::shared_ptr<BGTask> bg_task) {
82-
task_queue_.Enqueue(std::move(bg_task));
81+
void CompactionProcessor::Submit(const std::shared_ptr<BGTask>& bg_task) {
82+
task_queue_.Enqueue(bg_task);
8383
++task_count_;
8484
}
8585

src/storage/periodic_trigger_thread.cppm

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,7 @@ public:
3333
public:
3434
std::shared_ptr<NewCleanupPeriodicTrigger> new_cleanup_trigger_;
3535
std::shared_ptr<CheckpointPeriodicTrigger> checkpoint_trigger_;
36-
std::shared_ptr<CompactSegmentPeriodicTrigger> compact_segment_trigger_;
36+
std::shared_ptr<CompactPeriodicTrigger> compact_segment_trigger_;
3737
std::shared_ptr<OptimizeIndexPeriodicTrigger> optimize_index_trigger_;
3838

3939
private:

src/storage/storage_impl.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -370,12 +370,12 @@ Status Storage::AdminToWriter() {
370370
periodic_trigger_thread_->new_cleanup_trigger_ = std::make_shared<NewCleanupPeriodicTrigger>(cleanup_interval);
371371

372372
i64 optimize_interval = config_ptr_->OptimizeIndexInterval() > 0 ? config_ptr_->OptimizeIndexInterval() : 0;
373-
periodic_trigger_thread_->optimize_index_trigger_ = std::make_shared<OptimizeIndexPeriodicTrigger>(optimize_interval);
373+
periodic_trigger_thread_->optimize_index_trigger_ = std::make_shared<OptimizeIndexPeriodicTrigger>(optimize_interval, compact_processor_.get());
374374

375375
i64 checkpoint_interval_sec = config_ptr_->CheckpointInterval() > 0 ? config_ptr_->CheckpointInterval() : 0;
376376
periodic_trigger_thread_->checkpoint_trigger_ = std::make_shared<CheckpointPeriodicTrigger>(checkpoint_interval_sec);
377377

378-
periodic_trigger_thread_->compact_segment_trigger_ = std::make_shared<CompactSegmentPeriodicTrigger>(compact_interval);
378+
periodic_trigger_thread_->compact_segment_trigger_ = std::make_shared<CompactPeriodicTrigger>(compact_interval, compact_processor_.get());
379379

380380
periodic_trigger_thread_->Start();
381381

@@ -506,7 +506,7 @@ Status Storage::ReaderToWriter() {
506506
// i64 cleanup_interval = config_ptr_->CleanupInterval() > 0 ? config_ptr_->CleanupInterval() : 0;
507507
i64 checkpoint_interval_sec = config_ptr_->CheckpointInterval() > 0 ? config_ptr_->CheckpointInterval() : 0;
508508
periodic_trigger_thread_->checkpoint_trigger_ = std::make_shared<CheckpointPeriodicTrigger>(checkpoint_interval_sec);
509-
periodic_trigger_thread_->compact_segment_trigger_ = std::make_shared<CompactSegmentPeriodicTrigger>(compact_interval, compact_processor_.get());
509+
periodic_trigger_thread_->compact_segment_trigger_ = std::make_shared<CompactPeriodicTrigger>(compact_interval, compact_processor_.get());
510510
periodic_trigger_thread_->optimize_index_trigger_ = std::make_shared<OptimizeIndexPeriodicTrigger>(optimize_interval, compact_processor_.get());
511511
periodic_trigger_thread_->Start();
512512

0 commit comments

Comments
 (0)