Skip to content

Commit b0b2660

Browse files
authored
Refactor code (#2987)
### What problem does this PR solve? 1. Refactor code 2. Make txn_context can be viewed in debug mode. ### Type of change - [x] Refactoring --------- Signed-off-by: Jin Hai <[email protected]>
1 parent 7316618 commit b0b2660

28 files changed

+248
-224
lines changed

src/common/blocking_queue.cppm

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ public:
4040

4141
void NotAllowEnqueue() { allow_enqueue_ = false; }
4242

43-
bool Enqueue(T &task) {
43+
bool Enqueue(const T &task) {
4444
{
4545
if (!allow_enqueue_) {
4646
return false;

src/executor/operator/physical_command_impl.cpp

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -299,7 +299,7 @@ bool PhysicalCommand::Execute(QueryContext *query_context, OperatorState *operat
299299
Status status = Status::InvalidCommand(fmt::format("Attempt to set compact segment interval: {}", interval));
300300
RecoverableError(status);
301301
}
302-
query_context->storage()->periodic_trigger_thread()->compact_segment_trigger_->UpdateInternal(interval);
302+
query_context->storage()->periodic_trigger_thread()->compact_trigger_->UpdateInternal(interval);
303303
config->SetCompactInterval(interval);
304304
break;
305305
}

src/storage/background_process.cppm

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@ import :blocking_queue;
1818

1919
namespace infinity {
2020

21-
class CleanupPeriodicTrigger;
2221
class BGTask;
2322

2423
export class BGTaskProcessor {

src/storage/background_process_impl.cpp

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -75,7 +75,7 @@ void BGTaskProcessor::Process() {
7575
running = false;
7676
break;
7777
}
78-
case BGTaskType::kNewCheckpoint: {
78+
case BGTaskType::kCheckpoint: {
7979
StorageMode storage_mode = InfinityContext::instance().storage()->GetStorageMode();
8080
if (storage_mode == StorageMode::kUnInitialized) {
8181
UnrecoverableError("Uninitialized storage mode");
@@ -102,7 +102,7 @@ void BGTaskProcessor::Process() {
102102
}
103103
break;
104104
}
105-
case BGTaskType::kNewCleanup: {
105+
case BGTaskType::kCleanup: {
106106
StorageMode storage_mode = InfinityContext::instance().storage()->GetStorageMode();
107107
if (storage_mode == StorageMode::kUnInitialized) {
108108
UnrecoverableError("Uninitialized storage mode");
@@ -122,8 +122,8 @@ void BGTaskProcessor::Process() {
122122
CleanupTxnStore *cleanup_txn_store = static_cast<CleanupTxnStore *>(new_txn_shared->GetTxnStore());
123123
if (cleanup_txn_store != nullptr) {
124124
TxnTimeStamp clean_ts = cleanup_txn_store->timestamp_;
125-
std::shared_ptr<BGTaskInfo> bg_task_info = std::make_shared<BGTaskInfo>(BGTaskType::kNewCleanup);
126-
std::string task_text = fmt::format("NewCleanup task, cleanup timestamp: {}", clean_ts);
125+
std::shared_ptr<BGTaskInfo> bg_task_info = std::make_shared<BGTaskInfo>(BGTaskType::kCleanup);
126+
std::string task_text = fmt::format("Cleanup task, cleanup timestamp: {}", clean_ts);
127127
bg_task_info->task_info_list_.emplace_back(task_text);
128128
if (status.ok()) {
129129
bg_task_info->status_list_.emplace_back("OK");
@@ -133,7 +133,7 @@ void BGTaskProcessor::Process() {
133133
}
134134
new_txn_mgr->AddTaskInfo(bg_task_info);
135135
}
136-
LOG_DEBUG("NewCleanup task in background done");
136+
LOG_DEBUG("Cleanup task in background done");
137137
}
138138
break;
139139
}

src/storage/bg_task/bg_task.cppm

Lines changed: 11 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -50,7 +50,7 @@ export struct BGTask {
5050
bool async_{false};
5151

5252
bool complete_{false};
53-
std::mutex mutex_{};
53+
mutable std::mutex mutex_{};
5454
std::condition_variable cv_{};
5555

5656
void Wait() {
@@ -70,6 +70,11 @@ export struct BGTask {
7070
cv_.notify_one();
7171
}
7272

73+
bool IsComplete() const {
74+
std::unique_lock<std::mutex> locker(mutex_);
75+
return complete_;
76+
}
77+
7378
Status result_status_{};
7479

7580
virtual std::string ToString() const = 0;
@@ -88,7 +93,7 @@ export struct CheckpointTaskBase : public BGTask {
8893
};
8994

9095
export struct NewCheckpointTask final : public CheckpointTaskBase {
91-
NewCheckpointTask(i64 wal_size) : CheckpointTaskBase(BGTaskType::kNewCheckpoint, false), wal_size_(wal_size) {}
96+
NewCheckpointTask(i64 wal_size) : CheckpointTaskBase(BGTaskType::kCheckpoint, false), wal_size_(wal_size) {}
9297

9398
std::string ToString() const final { return "New catalog"; }
9499

@@ -99,11 +104,11 @@ export struct NewCheckpointTask final : public CheckpointTaskBase {
99104
i64 wal_size_{};
100105
};
101106

102-
export class NewCleanupTask final : public BGTask {
107+
export class CleanupTask final : public BGTask {
103108
public:
104-
NewCleanupTask() : BGTask(BGTaskType::kNewCleanup, false) {}
109+
CleanupTask() : BGTask(BGTaskType::kCleanup, false) {}
105110

106-
std::string ToString() const override { return "NewCleanupTask"; }
111+
std::string ToString() const override { return "CleanupTask"; }
107112

108113
Status Execute(TxnTimeStamp last_cleanup_ts, TxnTimeStamp &cur_cleanup_ts);
109114

@@ -132,14 +137,11 @@ public:
132137

133138
export class NotifyOptimizeTask final : public BGTask {
134139
public:
135-
NotifyOptimizeTask(bool new_optimize = false) : BGTask(BGTaskType::kNotifyOptimize, true), new_optimize_(new_optimize) {}
140+
NotifyOptimizeTask(bool new_optimize = false) : BGTask(BGTaskType::kNotifyOptimize, true) {}
136141

137142
~NotifyOptimizeTask() override = default;
138143

139144
std::string ToString() const override { return "NotifyOptimizeTask"; }
140-
141-
public:
142-
bool new_optimize_ = false;
143145
};
144146

145147
export class DumpMemIndexTask final : public BGTask {

src/storage/bg_task/bg_task_impl.cpp

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,7 @@ Status NewCheckpointTask::ExecuteWithNewTxn() {
5757

5858
auto *ckp_idx_store = static_cast<CheckpointTxnStore *>(new_txn_shared->GetTxnStore());
5959
if (ckp_idx_store != nullptr) {
60-
std::shared_ptr<BGTaskInfo> bg_task_info = std::make_shared<BGTaskInfo>(BGTaskType::kNewCheckpoint);
60+
std::shared_ptr<BGTaskInfo> bg_task_info = std::make_shared<BGTaskInfo>(BGTaskType::kCheckpoint);
6161
for (const std::shared_ptr<FlushDataEntry> &flush_data_entry : ckp_idx_store->entries_) {
6262
std::string task_text = fmt::format("Txn: {}, commit: {}, checkpoint data: {}.{}.{}.{} {}",
6363
new_txn_shared->TxnID(),
@@ -76,7 +76,7 @@ Status NewCheckpointTask::ExecuteWithNewTxn() {
7676
return status;
7777
}
7878

79-
Status NewCleanupTask::Execute(TxnTimeStamp last_cleanup_ts, TxnTimeStamp &cur_cleanup_ts) {
79+
Status CleanupTask::Execute(TxnTimeStamp last_cleanup_ts, TxnTimeStamp &cur_cleanup_ts) {
8080
auto *new_txn_mgr = InfinityContext::instance().storage()->new_txn_manager();
8181
auto *txn = new_txn_mgr->BeginTxn(std::make_unique<std::string>("cleanup"), TransactionType::kCleanup);
8282
Status status = txn->Cleanup();
@@ -88,7 +88,7 @@ Status NewCleanupTask::Execute(TxnTimeStamp last_cleanup_ts, TxnTimeStamp &cur_c
8888
}
8989

9090
NewCompactTask::NewCompactTask(NewTxn *new_txn, std::string db_name, std::string table_name)
91-
: BGTask(BGTaskType::kNewCompact, false), new_txn_(new_txn), db_name_(db_name), table_name_(table_name) {}
91+
: BGTask(BGTaskType::kCompact, false), new_txn_(new_txn), db_name_(db_name), table_name_(table_name) {}
9292

9393
DumpMemIndexTask::DumpMemIndexTask(const std::string &db_name,
9494
const std::string &table_name,

src/storage/bg_task/bg_task_type.cppm

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -27,11 +27,11 @@ namespace infinity {
2727

2828
export enum class BGTaskType {
2929
kStopProcessor,
30-
kNewCheckpoint,
30+
kCheckpoint,
3131
kNotifyCompact,
32-
kNewCompact,
32+
kCompact,
3333
kNotifyOptimize,
34-
kNewCleanup,
34+
kCleanup,
3535
kUpdateSegmentBloomFilterData, // Not used
3636
kDumpMemIndex,
3737
kAppendMemIndex,

src/storage/bg_task/bg_task_type_impl.cpp

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -34,24 +34,24 @@ std::string ToString(BGTaskType type) {
3434
return "StopProcessor";
3535
break;
3636
}
37-
case BGTaskType::kNewCheckpoint: {
38-
return "NewCheckpoint";
37+
case BGTaskType::kCheckpoint: {
38+
return "Checkpoint";
3939
break;
4040
}
4141
case BGTaskType::kNotifyCompact: {
4242
return "NotifyCompact";
4343
break;
4444
}
45-
case BGTaskType::kNewCompact: {
46-
return "NewCompact";
45+
case BGTaskType::kCompact: {
46+
return "Compact";
4747
break;
4848
}
4949
case BGTaskType::kNotifyOptimize: {
5050
return "NotifyOptimize";
5151
break;
5252
}
53-
case BGTaskType::kNewCleanup: {
54-
return "NewCleanup";
53+
case BGTaskType::kCleanup: {
54+
return "Cleanup";
5555
break;
5656
}
5757
case BGTaskType::kUpdateSegmentBloomFilterData: {

src/storage/bg_task/periodic_trigger.cppm

Lines changed: 22 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -26,11 +26,13 @@ import global_resource_usage;
2626

2727
namespace infinity {
2828

29-
class NewCleanupTask;
29+
class CleanupTask;
30+
class NotifyOptimizeTask;
31+
class NotifyCompactTask;
3032

3133
export class PeriodicTrigger {
3234
public:
33-
explicit PeriodicTrigger(i64 interval) : interval_(interval), last_check_(std::chrono::system_clock::now()) {
35+
explicit PeriodicTrigger(const i64 interval) : interval_(interval), last_check_(std::chrono::system_clock::now()) {
3436
#ifdef INFINITY_DEBUG
3537
GlobalResourceUsage::IncrObjectCount("PeriodicTrigger");
3638
#endif
@@ -44,7 +46,7 @@ public:
4446

4547
bool Check();
4648

47-
void UpdateInternal(i64 new_interval) { interval_ = new_interval; }
49+
void UpdateInternal(const i64 new_interval) { interval_ = new_interval; }
4850

4951
virtual void Trigger() = 0;
5052

@@ -56,51 +58,48 @@ protected:
5658
std::atomic_int64_t duration_{0};
5759
};
5860

59-
export class NewCleanupPeriodicTrigger final : public PeriodicTrigger {
61+
export class CleanupPeriodicTrigger final : public PeriodicTrigger {
6062
public:
61-
NewCleanupPeriodicTrigger(i64 interval) : PeriodicTrigger(interval) {}
63+
explicit CleanupPeriodicTrigger(const i64 interval) : PeriodicTrigger(interval) {}
6264

63-
std::shared_ptr<NewCleanupTask> CreateNewCleanupTask();
65+
std::shared_ptr<CleanupTask> CreateCleanupTask();
6466

65-
virtual void Trigger() override;
66-
67-
private:
68-
//
67+
void Trigger() override;
6968
};
7069

7170
export class CheckpointPeriodicTrigger final : public PeriodicTrigger {
7271
public:
73-
explicit CheckpointPeriodicTrigger(i64 interval) : PeriodicTrigger(interval) {}
72+
explicit CheckpointPeriodicTrigger(const i64 interval) : PeriodicTrigger(interval) {}
7473

75-
virtual void Trigger() override;
74+
void Trigger() override;
7675
};
7776

78-
export class CompactSegmentPeriodicTrigger final : public PeriodicTrigger {
77+
export class CompactPeriodicTrigger final : public PeriodicTrigger {
7978
public:
80-
explicit CompactSegmentPeriodicTrigger(i64 interval, CompactionProcessor *compact_processor)
79+
explicit CompactPeriodicTrigger(const i64 interval, CompactionProcessor *compact_processor)
8180
: PeriodicTrigger(interval), compact_processor_(compact_processor) {}
8281

83-
explicit CompactSegmentPeriodicTrigger(i64 interval) : PeriodicTrigger(interval), new_compaction_(true) {}
82+
explicit CompactPeriodicTrigger(const i64 interval) : PeriodicTrigger(interval) {}
8483

85-
virtual void Trigger() override;
84+
void Trigger() override;
8685

8786
private:
88-
CompactionProcessor *const compact_processor_{};
89-
bool new_compaction_ = false;
87+
CompactionProcessor *compact_processor_{};
88+
std::shared_ptr<NotifyCompactTask> compact_task_{};
9089
};
9190

9291
export class OptimizeIndexPeriodicTrigger final : public PeriodicTrigger {
9392
public:
94-
explicit OptimizeIndexPeriodicTrigger(i64 interval, CompactionProcessor *compact_processor)
93+
explicit OptimizeIndexPeriodicTrigger(const i64 interval, CompactionProcessor *compact_processor)
9594
: PeriodicTrigger(interval), compact_processor_(compact_processor) {}
9695

97-
explicit OptimizeIndexPeriodicTrigger(i64 interval) : PeriodicTrigger(interval), new_optimize_(true) {}
96+
explicit OptimizeIndexPeriodicTrigger(const i64 interval) : PeriodicTrigger(interval) {}
9897

99-
virtual void Trigger() override;
98+
void Trigger() override;
10099

101100
private:
102-
CompactionProcessor *const compact_processor_ = nullptr;
103-
bool new_optimize_ = false;
101+
CompactionProcessor *compact_processor_{};
102+
std::shared_ptr<NotifyOptimizeTask> optimize_task_{};
104103
};
105104

106105
} // namespace infinity

src/storage/bg_task/periodic_trigger_impl.cpp

Lines changed: 19 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -45,7 +45,7 @@ bool PeriodicTrigger::Check() {
4545
return true;
4646
}
4747

48-
std::shared_ptr<NewCleanupTask> NewCleanupPeriodicTrigger::CreateNewCleanupTask() {
48+
std::shared_ptr<CleanupTask> CleanupPeriodicTrigger::CreateCleanupTask() {
4949
auto *bg_processor = InfinityContext::instance().storage()->bg_processor();
5050
auto *new_txn_mgr = InfinityContext::instance().storage()->new_txn_manager();
5151

@@ -55,11 +55,11 @@ std::shared_ptr<NewCleanupTask> NewCleanupPeriodicTrigger::CreateNewCleanupTask(
5555
return nullptr;
5656
}
5757

58-
return std::make_shared<NewCleanupTask>();
58+
return std::make_shared<CleanupTask>();
5959
}
6060

61-
void NewCleanupPeriodicTrigger::Trigger() {
62-
auto cleanup_task = CreateNewCleanupTask();
61+
void CleanupPeriodicTrigger::Trigger() {
62+
auto cleanup_task = CreateCleanupTask();
6363
if (!cleanup_task) {
6464
return;
6565
}
@@ -90,23 +90,25 @@ void CheckpointPeriodicTrigger::Trigger() {
9090
bg_processor->Submit(std::move(checkpoint_task));
9191
}
9292

93-
void CompactSegmentPeriodicTrigger::Trigger() {
94-
LOG_DEBUG(fmt::format("Trigger compact segment task, after {} seconds", duration_.load()));
95-
auto compact_task = std::make_shared<NotifyCompactTask>();
96-
auto *compact_processor = InfinityContext::instance().storage()->compaction_processor();
97-
compact_processor->Submit(std::move(compact_task));
93+
void CompactPeriodicTrigger::Trigger() {
94+
if (compact_task_ != nullptr and !compact_task_->IsComplete()) {
95+
LOG_DEBUG(fmt::format("Skipping compact task, after {} seconds", duration_.load()));
96+
}
97+
98+
LOG_DEBUG(fmt::format("Trigger compact task, after {} seconds", duration_.load()));
99+
compact_task_ = std::make_shared<NotifyCompactTask>();
100+
compact_processor_->Submit(compact_task_);
98101
}
99102

100103
void OptimizeIndexPeriodicTrigger::Trigger() {
101-
LOG_DEBUG(fmt::format("Trigger optimize index task, after {} seconds", duration_.load()));
102-
if (!new_optimize_) {
103-
auto optimize_task = std::make_shared<NotifyOptimizeTask>();
104-
compact_processor_->Submit(std::move(optimize_task));
105-
} else {
106-
auto optimize_task = std::make_shared<NotifyOptimizeTask>(true);
107-
auto *compact_processor = InfinityContext::instance().storage()->compaction_processor();
108-
compact_processor->Submit(std::move(optimize_task));
104+
105+
if (optimize_task_ != nullptr and !optimize_task_->IsComplete()) {
106+
LOG_DEBUG(fmt::format("Skipping optimize index task, after {} seconds", duration_.load()));
109107
}
108+
109+
LOG_DEBUG(fmt::format("Trigger optimize index task, after {} seconds", duration_.load()));
110+
optimize_task_ = std::make_shared<NotifyOptimizeTask>();
111+
compact_processor_->Submit(optimize_task_);
110112
}
111113

112114
} // namespace infinity

0 commit comments

Comments
 (0)