Skip to content

Commit af9c8f2

Browse files
committed
GC
1 parent e68943f commit af9c8f2

File tree

13 files changed

+760
-7
lines changed

13 files changed

+760
-7
lines changed

examples/ucm_config_example.yaml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ ucm_connectors:
1717
storage_backends: "/mnt/test"
1818
io_direct: false
1919
# cache_buffer_capacity_gb: 256
20+
# posix_capacity_gb: 1024
2021

2122
# When you use UcmNfsStore, you should set enable_event_sync to false.
2223
enable_event_sync: true

ucm/integration/vllm/ucm_connector.py

Lines changed: 19 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -193,6 +193,11 @@ def __init__(self, vllm_config: "VllmConfig", role: KVConnectorRole):
193193
)
194194
self.tp_size = self._vllm_config.parallel_config.tensor_parallel_size
195195
self.kv_cache_dtype: torch.dtype = None
196+
self.num_head = vllm_config.model_config.get_num_kv_heads(
197+
vllm_config.parallel_config
198+
)
199+
self.head_size = vllm_config.model_config.get_head_size()
200+
self.element_size = vllm_config.model_config.dtype.itemsize
196201

197202
if current_platform.is_cuda_alike():
198203
logger.info("CUDA device is available.")
@@ -300,6 +305,20 @@ def _create_store(
300305
config["shard_size"] = kv_cache_layout.shard_size * self.blocks_per_chunk
301306
config["block_size"] = kv_cache_layout.block_size * self.blocks_per_chunk
302307
config["local_rank_size"] = self.tp_size if self.is_mla else 1
308+
else:
309+
config_base = self.block_size * self.element_size * self.head_size
310+
config["block_size"] = (
311+
config_base
312+
* self.num_layers
313+
* (1 if self.is_mla else self.num_head * 2)
314+
* self.blocks_per_chunk
315+
)
316+
# GC only enabled for Scheduler with data_parallel_rank == 0
317+
if config.get("posix_gc_enable", False):
318+
dp_rank = self._vllm_config.parallel_config.data_parallel_rank
319+
if self._role == KVConnectorRole.WORKER or dp_rank != 0:
320+
config["posix_gc_enable"] = False
321+
303322
logger.info(f"create {name} with config: {config}")
304323
return UcmConnectorFactoryV1.create_connector(name, config, module_path)
305324

ucm/store/posix/cc/global_config.h

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,6 +43,14 @@ struct Config {
4343
size_t commitConcurrency{4};
4444
size_t timeoutMs{30000};
4545
size_t dataDirShardBytes{3};
46+
bool posixGcEnable{true};
47+
double posixGcRecyclePercent{0.1};
48+
size_t posixGcConcurrency{16};
49+
size_t posixGcCheckIntervalSec{30};
50+
size_t posixCapacityGb{0};
51+
double posixGcTriggerThresholdRatio{0.7};
52+
size_t posixGcMaxRecycleCountPerShard{1000000};
53+
double posixGcShardSampleRatio{0.1};
4654
};
4755

4856
} // namespace UC::PosixStore
Lines changed: 78 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,78 @@
1+
/**
2+
* MIT License
3+
*
4+
* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
* */
24+
#include "hotness_tracker.h"
25+
#include <utime.h>
26+
#include "logger/logger.h"
27+
28+
namespace UC::PosixStore {
29+
30+
HotnessTracker::~HotnessTracker()
31+
{
32+
stop_.store(true);
33+
if (utimeWorker_.joinable()) { utimeWorker_.join(); }
34+
}
35+
36+
Status HotnessTracker::Setup(const SpaceLayout* layout)
37+
{
38+
layout_ = layout;
39+
40+
utimeWorker_ = std::thread(&HotnessTracker::UtimeWorkerLoop, this);
41+
return Status::OK();
42+
}
43+
44+
void HotnessTracker::Touch(const Detail::BlockId& blockId)
45+
{
46+
std::lock_guard<std::mutex> lock(queueMtx_);
47+
produceQueue_.push_back(blockId);
48+
}
49+
50+
void HotnessTracker::UtimeWorkerLoop()
51+
{
52+
std::deque<Detail::BlockId> consumeQueue;
53+
constexpr size_t kSpinLimit = 16;
54+
size_t spinCount = 0;
55+
while (!stop_.load()) {
56+
{
57+
std::lock_guard<std::mutex> lock(queueMtx_);
58+
consumeQueue.swap(produceQueue_);
59+
}
60+
if (consumeQueue.empty()) {
61+
if (++spinCount < kSpinLimit) {
62+
std::this_thread::yield();
63+
} else {
64+
std::this_thread::sleep_for(std::chrono::microseconds(100));
65+
spinCount = 0;
66+
}
67+
continue;
68+
}
69+
spinCount = 0;
70+
while (!consumeQueue.empty()) {
71+
auto filePath = layout_->DataFilePath(consumeQueue.front(), false);
72+
utime(filePath.c_str(), nullptr);
73+
consumeQueue.pop_front();
74+
}
75+
}
76+
}
77+
78+
} // namespace UC::PosixStore
Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,60 @@
1+
/**
2+
* MIT License
3+
*
4+
* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
* */
24+
#ifndef UNIFIEDCACHE_POSIX_STORE_CC_HOTNESS_TRACKER_H
25+
#define UNIFIEDCACHE_POSIX_STORE_CC_HOTNESS_TRACKER_H
26+
27+
#include <atomic>
28+
#include <deque>
29+
#include <mutex>
30+
#include <thread>
31+
#include "space_layout.h"
32+
#include "type/types.h"
33+
34+
namespace UC::PosixStore {
35+
36+
class HotnessTracker {
37+
public:
38+
HotnessTracker() = default;
39+
HotnessTracker(const HotnessTracker&) = delete;
40+
HotnessTracker& operator=(const HotnessTracker&) = delete;
41+
~HotnessTracker();
42+
43+
Status Setup(const SpaceLayout* layout);
44+
void Touch(const Detail::BlockId& blockId);
45+
46+
private:
47+
void UtimeWorkerLoop();
48+
49+
const SpaceLayout* layout_{nullptr};
50+
51+
std::deque<Detail::BlockId> produceQueue_;
52+
std::mutex queueMtx_;
53+
54+
std::atomic<bool> stop_{false};
55+
std::thread utimeWorker_;
56+
};
57+
58+
} // namespace UC::PosixStore
59+
60+
#endif

ucm/store/posix/cc/posix_store.cc

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,14 +43,17 @@ class PosixStore : public StoreV1 {
4343
UC_ERROR("Failed to check config params: {}.", s);
4444
return s;
4545
}
46+
transEnable_ = config.deviceId >= 0;
47+
48+
ShowConfig(config);
49+
4650
s = spaceMgr_.Setup(config);
4751
if (s.Failure()) [[unlikely]] { return s; }
48-
transEnable_ = config.deviceId >= 0;
52+
4953
if (transEnable_) {
5054
s = transMgr_.Setup(config, spaceMgr_.GetLayout());
5155
if (s.Failure()) [[unlikely]] { return s; }
5256
}
53-
ShowConfig(config);
5457
return Status::OK();
5558
}
5659
std::string Readme() const override { return "PosixStore"; }
@@ -115,6 +118,15 @@ class PosixStore : public StoreV1 {
115118
inConfig.GetNumber("posix_commit_concurrency", config.commitConcurrency);
116119
inConfig.GetNumber("timeout_ms", config.timeoutMs);
117120
inConfig.GetNumber("data_dir_shard_bytes", config.dataDirShardBytes);
121+
inConfig.Get("posix_gc_enable", config.posixGcEnable);
122+
inConfig.Get("posix_gc_recycle_percent", config.posixGcRecyclePercent);
123+
inConfig.GetNumber("posix_gc_concurrency", config.posixGcConcurrency);
124+
inConfig.GetNumber("posix_gc_check_interval_sec", config.posixGcCheckIntervalSec);
125+
inConfig.GetNumber("posix_capacity_gb", config.posixCapacityGb);
126+
inConfig.Get("posix_gc_trigger_threshold_ratio", config.posixGcTriggerThresholdRatio);
127+
inConfig.GetNumber("posix_gc_max_recycle_count_per_shard",
128+
config.posixGcMaxRecycleCountPerShard);
129+
inConfig.Get("posix_gc_shard_sample_ratio", config.posixGcShardSampleRatio);
118130
return config;
119131
}
120132
Status CheckConfig(const Config& config)

ucm/store/posix/cc/shard_gc.cc

Lines changed: 155 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,155 @@
1+
/**
2+
* MIT License
3+
*
4+
* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
5+
*
6+
* Permission is hereby granted, free of charge, to any person obtaining a copy
7+
* of this software and associated documentation files (the "Software"), to deal
8+
* in the Software without restriction, including without limitation the rights
9+
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
10+
* copies of the Software, and to permit persons to whom the Software is
11+
* furnished to do so, subject to the following conditions:
12+
*
13+
* The above copyright notice and this permission notice shall be included in all
14+
* copies or substantial portions of the Software.
15+
*
16+
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
17+
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
18+
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
19+
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
20+
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
21+
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
22+
* SOFTWARE.
23+
* */
24+
#include "shard_gc.h"
25+
#include "logger/logger.h"
26+
#include "posix_file.h"
27+
28+
namespace UC::PosixStore {
29+
30+
ShardGarbageCollector::~ShardGarbageCollector() { StopBackgroundCheck(); }
31+
32+
Status ShardGarbageCollector::Setup(const SpaceLayout* layout, const ShardGCConfig& config)
33+
{
34+
layout_ = layout;
35+
config_ = config;
36+
shards_ = layout_->RelativeRoots();
37+
38+
if (shards_.empty()) { return Status::InvalidParam("no shards available"); }
39+
if (config_.recyclePercent <= 0 || config_.recyclePercent > 1.0f) {
40+
return Status::InvalidParam("invalid recycle percent");
41+
}
42+
if (config_.gcConcurrency == 0) { return Status::InvalidParam("invalid gc concurrency"); }
43+
44+
auto success = gcPool_.SetWorkerFn([this](ShardTaskContext& ctx, auto&) { ProcessTask(ctx); })
45+
.SetNWorker(config_.gcConcurrency)
46+
.Run();
47+
48+
if (!success) { return Status::Error("failed to start gc thread pool"); }
49+
50+
UC_INFO(
51+
"ShardGC setup: recyclePercent={}, concurrency={}, maxFileCount={}, "
52+
"thresholdRatio={}, checkIntervalSec={}",
53+
config_.recyclePercent, config_.gcConcurrency, config_.maxFileCount, config_.thresholdRatio,
54+
config_.gcCheckIntervalSec);
55+
56+
if (config_.maxFileCount > 0) {
57+
gcCheckWorker_ = std::thread(&ShardGarbageCollector::GCCheckLoop, this);
58+
while (!stop_.load() && ShouldTrigger()) { Execute(); }
59+
}
60+
61+
return Status::OK();
62+
}
63+
64+
void ShardGarbageCollector::StopBackgroundCheck()
65+
{
66+
{
67+
std::lock_guard<std::mutex> lock(gcCheckMtx_);
68+
stop_ = true;
69+
}
70+
gcCheckCv_.notify_all();
71+
if (gcCheckWorker_.joinable()) { gcCheckWorker_.join(); }
72+
}
73+
74+
void ShardGarbageCollector::GCCheckLoop()
75+
{
76+
while (!stop_.load()) {
77+
{
78+
std::unique_lock<std::mutex> lock(gcCheckMtx_);
79+
gcCheckCv_.wait_for(lock, std::chrono::seconds(config_.gcCheckIntervalSec),
80+
[this] { return stop_.load(); });
81+
}
82+
if (stop_.load()) { break; }
83+
while (!stop_.load() && ShouldTrigger()) { Execute(); }
84+
}
85+
}
86+
87+
void ShardGarbageCollector::Execute()
88+
{
89+
std::shared_ptr<Latch> waiter;
90+
91+
try {
92+
waiter = std::make_shared<Latch>();
93+
} catch (const std::exception& e) {
94+
UC_ERROR("Failed to allocate latch: ", e.what());
95+
return;
96+
}
97+
98+
waiter->Set(shards_.size());
99+
100+
for (const auto& shard : shards_) {
101+
gcPool_.Push({ShardTaskContext::Type::GC, shard, waiter, nullptr});
102+
}
103+
104+
waiter->Wait();
105+
}
106+
107+
bool ShardGarbageCollector::ShouldTrigger()
108+
{
109+
auto sampleShards = layout_->SampleShards(config_.shardSampleRatio);
110+
if (sampleShards.empty()) { return false; }
111+
112+
std::shared_ptr<Latch> waiter;
113+
114+
try {
115+
waiter = std::make_shared<Latch>();
116+
} catch (const std::exception& e) {
117+
UC_ERROR("Failed to allocate latch: {}", e.what());
118+
return false;
119+
}
120+
121+
std::atomic<size_t> sampledFiles{0};
122+
waiter->Set(sampleShards.size());
123+
124+
for (const auto& shard : sampleShards) {
125+
gcPool_.Push({ShardTaskContext::Type::SAMPLE, shard, waiter, &sampledFiles});
126+
}
127+
128+
waiter->Wait();
129+
130+
size_t avgFilesPerShard = sampledFiles.load() / sampleShards.size();
131+
size_t thresholdFilesPerShard = config_.maxFileCount / shards_.size();
132+
133+
UC_INFO("GC sampling: avgFiles/shard={}, threshold={}", avgFilesPerShard,
134+
static_cast<size_t>(thresholdFilesPerShard * config_.thresholdRatio));
135+
136+
return avgFilesPerShard >= static_cast<size_t>(thresholdFilesPerShard * config_.thresholdRatio);
137+
}
138+
139+
void ShardGarbageCollector::ProcessTask(ShardTaskContext& ctx)
140+
{
141+
if (ctx.type == ShardTaskContext::Type::SAMPLE) {
142+
size_t count = layout_->CountFilesInShard(ctx.shard);
143+
ctx.sampledFiles->fetch_add(count, std::memory_order_relaxed);
144+
} else {
145+
auto filesToDelete = layout_->GetOldestFiles(ctx.shard, config_.recyclePercent,
146+
config_.maxRecycleCountPerShard);
147+
for (const auto& blockId : filesToDelete) {
148+
PosixFile{layout_->DataFilePath(blockId, false)}.Remove();
149+
}
150+
}
151+
152+
ctx.waiter->Done();
153+
}
154+
155+
} // namespace UC::PosixStore

0 commit comments

Comments
 (0)