diff --git a/examples/ucm_config_example.yaml b/examples/ucm_config_example.yaml index b5cdb4cf5..c5674ff52 100644 --- a/examples/ucm_config_example.yaml +++ b/examples/ucm_config_example.yaml @@ -17,6 +17,7 @@ ucm_connectors: storage_backends: "/mnt/test" io_direct: false # cache_buffer_capacity_gb: 256 + # posix_capacity_gb: 1024 # When you use UcmNfsStore, you should set enable_event_sync to false. enable_event_sync: true diff --git a/ucm/integration/vllm/ucm_connector.py b/ucm/integration/vllm/ucm_connector.py index a6048a073..f72980007 100644 --- a/ucm/integration/vllm/ucm_connector.py +++ b/ucm/integration/vllm/ucm_connector.py @@ -213,6 +213,11 @@ def __init__(self, vllm_config: "VllmConfig", role: KVConnectorRole): ) self.tp_size = self._vllm_config.parallel_config.tensor_parallel_size self.kv_cache_dtype: torch.dtype = None + self.num_head = vllm_config.model_config.get_num_kv_heads( + vllm_config.parallel_config + ) + self.head_size = vllm_config.model_config.get_head_size() + self.element_size = vllm_config.model_config.dtype.itemsize if current_platform.is_cuda_alike(): logger.info("CUDA device is available.") @@ -323,6 +328,19 @@ def _create_store( config["local_rank_size"] = self.tp_size if self.is_mla else 1 if cpu_affinity_cores: config["cpu_affinity_cores"] = list(cpu_affinity_cores) + else: + config_base = self.block_size * self.element_size * self.head_size + config["block_size"] = ( + config_base + * self.num_layers + * (1 if self.is_mla else self.num_head * 2) + * self.blocks_per_chunk + ) + # GC only enabled for Scheduler with data_parallel_rank == 0 + dp_rank = self._vllm_config.parallel_config.data_parallel_rank + if self._role == KVConnectorRole.WORKER or dp_rank != 0: + config["posix_gc_enable"] = False + logger.info(f"create {name} with config: {config}") return UcmConnectorFactoryV1.create_connector(name, config, module_path) diff --git a/ucm/store/posix/cc/global_config.h b/ucm/store/posix/cc/global_config.h index 5e609f23d..6692afee3 
100644 --- a/ucm/store/posix/cc/global_config.h +++ b/ucm/store/posix/cc/global_config.h @@ -44,6 +44,14 @@ struct Config { size_t commitConcurrency{4}; size_t timeoutMs{30000}; size_t dataDirShardBytes{3}; + bool posixGcEnable{true}; + double posixGcRecyclePercent{0.1}; + size_t posixGcConcurrency{16}; + size_t posixGcCheckIntervalSec{30}; + size_t posixCapacityGb{0}; + double posixGcTriggerThresholdRatio{0.7}; + size_t posixGcMaxRecycleCountPerShard{1000}; + double posixGcShardSampleRatio{0.1}; }; } // namespace UC::PosixStore diff --git a/ucm/store/posix/cc/hotness_tracker.cc b/ucm/store/posix/cc/hotness_tracker.cc new file mode 100644 index 000000000..1e7970077 --- /dev/null +++ b/ucm/store/posix/cc/hotness_tracker.cc @@ -0,0 +1,82 @@ +/** + * MIT License + * + * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. + * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ * */
+#include "hotness_tracker.h"
+#include  // NOTE(review): header name missing in this copy; utime() below needs <utime.h> - confirm
+#include "logger/logger.h"
+
+namespace UC::PosixStore {
+// Destructor: sets the stop flag and joins the utime worker thread if one was started.
+HotnessTracker::~HotnessTracker()
+{
+    stop_.store(true);  // makes the while (!stop_.load()) loop in UtimeWorkerLoop() exit
+    if (utimeWorker_.joinable()) { utimeWorker_.join(); }
+}
+// Stores `layout` (not owned) and starts the worker; returns OutOfMemory if thread creation throws.
+Status HotnessTracker::Setup(const SpaceLayout* layout)
+{
+    layout_ = layout;
+    try {
+        utimeWorker_ = std::thread(&HotnessTracker::UtimeWorkerLoop, this);
+    } catch (const std::exception& e) {
+        UC_ERROR("Failed({}) to create utime worker thread.", e.what());
+        return Status::OutOfMemory();
+    }
+    return Status::OK();
+}
+// Appends blockId to the shared queue under queueMtx_; the file is touched later by the worker.
+void HotnessTracker::Touch(const Detail::BlockId& blockId)
+{
+    std::lock_guard lock(queueMtx_);
+    produceQueue_.push_back(blockId);
+}
+// Worker body: drains the shared queue in batches and calls utime() on each block's data file.
+void HotnessTracker::UtimeWorkerLoop()
+{
+    std::deque consumeQueue;  // NOTE(review): element type missing in this copy of the patch
+    constexpr size_t kSpinLimit = 16;
+    size_t spinCount = 0;
+    while (!stop_.load()) {  // ids still in produceQueue_ when stop_ is set are never processed
+        {
+            std::lock_guard lock(queueMtx_);
+            consumeQueue.swap(produceQueue_);  // take the whole batch so utime() runs without the lock
+        }
+        if (consumeQueue.empty()) {
+            if (++spinCount < kSpinLimit) {
+                std::this_thread::yield();  // first kSpinLimit - 1 empty polls only yield
+            } else {
+                std::this_thread::sleep_for(std::chrono::microseconds(100));
+                spinCount = 0;
+            }
+            continue;
+        }
+        spinCount = 0;
+        while (!consumeQueue.empty()) {
+            auto filePath = layout_->DataFilePath(consumeQueue.front(), false);  // activated=false
+            utime(filePath.c_str(), nullptr);  // nullptr => set times to "now"; result is ignored
+            consumeQueue.pop_front();
+        }
+    }
+}
+
+} // namespace UC::PosixStore
diff --git a/ucm/store/posix/cc/hotness_tracker.h b/ucm/store/posix/cc/hotness_tracker.h
new file mode 100644
index 000000000..9f33287bf
--- /dev/null
+++ b/ucm/store/posix/cc/hotness_tracker.h
@@ -0,0 +1,56 @@
+/**
+ * MIT License
+ *
+ * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
+ * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ * */
+#ifndef UNIFIEDCACHE_POSIX_STORE_CC_HOTNESS_TRACKER_H
+#define UNIFIEDCACHE_POSIX_STORE_CC_HOTNESS_TRACKER_H
+// NOTE(review): the four bare #include lines below lost their header names in this copy.
+#include
+#include
+#include
+#include
+#include "space_layout.h"
+#include "type/types.h"
+
+namespace UC::PosixStore {
+// Queues touched block ids and refreshes their data files' timestamps on a background thread.
+class HotnessTracker {
+public:
+    HotnessTracker() = default;
+    HotnessTracker(const HotnessTracker&) = delete;  // non-copyable
+    HotnessTracker& operator=(const HotnessTracker&) = delete;
+    ~HotnessTracker();  // stops and joins the worker thread
+    Status Setup(const SpaceLayout* layout);  // stores `layout` and starts the worker thread
+    void Touch(const Detail::BlockId& blockId);  // enqueues blockId for a later utime() call
+
+private:
+    void UtimeWorkerLoop();  // body of utimeWorker_
+    const SpaceLayout* layout_{nullptr};  // not owned; assigned in Setup()
+    std::deque produceQueue_;  // filled by Touch(); NOTE(review): element type missing in this copy
+    std::mutex queueMtx_;  // guards produceQueue_
+    std::atomic stop_{false};  // set by the destructor to end UtimeWorkerLoop()
+    std::thread utimeWorker_;
+};
+
+} // namespace UC::PosixStore
+
+#endif
diff --git a/ucm/store/posix/cc/posix_store.cc b/ucm/store/posix/cc/posix_store.cc
index a8a76f7f3..f1c5c6d78 100644
--- a/ucm/store/posix/cc/posix_store.cc
+++ b/ucm/store/posix/cc/posix_store.cc
@@ -116,6 +116,15 @@ class PosixStore : public StoreV1 {
         inConfig.GetNumber("posix_commit_concurrency", config.commitConcurrency);
         inConfig.GetNumber("timeout_ms", config.timeoutMs);
         inConfig.GetNumber("data_dir_shard_bytes", config.dataDirShardBytes);
+        inConfig.Get("posix_gc_enable", config.posixGcEnable);  // GC keys: bool/double via Get()
+        inConfig.Get("posix_gc_recycle_percent", config.posixGcRecyclePercent);
+        inConfig.GetNumber("posix_gc_concurrency", config.posixGcConcurrency);  // size_t via GetNumber()
+        inConfig.GetNumber("posix_gc_check_interval_sec", config.posixGcCheckIntervalSec);
+        inConfig.GetNumber("posix_capacity_gb", config.posixCapacityGb);
+        inConfig.Get("posix_gc_trigger_threshold_ratio", config.posixGcTriggerThresholdRatio);
+        inConfig.GetNumber("posix_gc_max_recycle_count_per_shard",
+                           config.posixGcMaxRecycleCountPerShard);
+        inConfig.Get("posix_gc_shard_sample_ratio", config.posixGcShardSampleRatio);
         return config;
     }
     Status CheckConfig(const Config& config)
@@ -157,6 +166,33 @@ class PosixStore : public StoreV1 {
         } else {
             return Status::InvalidParam("invalid io engine({})", config.ioEngine);
         }
+        if (config.posixGcEnable && config.posixCapacityGb > 0) {  // GC knobs checked only when GC is active
+            if (config.posixGcRecyclePercent <= 0 || config.posixGcRecyclePercent > 1.0) {  // (0, 1]
+                return Status::InvalidParam("invalid gc recycle percent({})",
+                                            config.posixGcRecyclePercent);
+            }
+            if (config.posixGcConcurrency == 0) {
+                return Status::InvalidParam("invalid gc concurrency({})",
+                                            config.posixGcConcurrency);
+            }
+            if (config.posixGcCheckIntervalSec == 0) {
+                return Status::InvalidParam("invalid gc check interval({})",
+                                            config.posixGcCheckIntervalSec);
+            }
+            if (config.posixGcTriggerThresholdRatio <= 0 ||
+                config.posixGcTriggerThresholdRatio > 1.0) {  // (0, 1]
+                return Status::InvalidParam("invalid gc trigger threshold ratio({})",
+                                            config.posixGcTriggerThresholdRatio);
+            }
+            if (config.posixGcMaxRecycleCountPerShard == 0) {
+                return Status::InvalidParam("invalid gc max recycle count per shard({})",
+                                            config.posixGcMaxRecycleCountPerShard);
+            }
+            if (config.posixGcShardSampleRatio <= 0 || config.posixGcShardSampleRatio > 1.0) {  // (0, 1]
+                return Status::InvalidParam("invalid gc shard sample ratio({})",
+                                            config.posixGcShardSampleRatio);
+            }
+        }
         return Status::OK();
     }
     void ShowConfig(const Config& config)
@@ -179,6 +215,18 @@ class PosixStore : public StoreV1 {
         UC_INFO("Set {}::CommitConcurrency to {}.", ns, config.commitConcurrency);
         UC_INFO("Set {}::TimeoutMs to {}.", ns, config.timeoutMs);
         UC_INFO("Set {}::DataDirShardBytes to {}.", ns, config.dataDirShardBytes);
+        if (config.posixGcEnable && config.posixCapacityGb > 0) {  // same condition as in CheckConfig
+            UC_INFO("Set {}::PosixGcEnable to {}.", ns, config.posixGcEnable);
+            UC_INFO("Set {}::PosixCapacityGb to {}.", ns, config.posixCapacityGb);
+            UC_INFO("Set {}::PosixGcRecyclePercent to {}.", ns, config.posixGcRecyclePercent);
+            UC_INFO("Set {}::PosixGcConcurrency to {}.", ns, config.posixGcConcurrency);
+            UC_INFO("Set {}::PosixGcCheckIntervalSec to {}.", ns, config.posixGcCheckIntervalSec);
+            UC_INFO("Set {}::PosixGcTriggerThresholdRatio to {}.", ns,
+                    config.posixGcTriggerThresholdRatio);
+            UC_INFO("Set {}::PosixGcMaxRecycleCountPerShard to {}.", ns,
+                    config.posixGcMaxRecycleCountPerShard);
+            UC_INFO("Set {}::PosixGcShardSampleRatio to {}.", ns, config.posixGcShardSampleRatio);
+        }
     }
 };
 
diff --git a/ucm/store/posix/cc/shard_gc.cc b/ucm/store/posix/cc/shard_gc.cc
new file mode 100644
index 000000000..96e0677a2
--- /dev/null
+++ b/ucm/store/posix/cc/shard_gc.cc
@@ -0,0 +1,156 @@
+/**
+ * MIT License
+ *
+ * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
+ *
+ * Permission is hereby granted, free of charge, to any person obtaining a copy
+ * of this software and associated documentation files (the "Software"), to deal
+ * in the Software without restriction, including without limitation the rights
+ * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+ * copies of the Software, and to permit persons to whom the Software is
+ * furnished to do so, subject to the following conditions:
+ *
+ * The above copyright notice and this permission notice shall be included in all
+ * copies or substantial portions of the Software.
+ *
+ * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+ * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+ * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+ * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+ * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+ * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+ * SOFTWARE.
+ * */
+#include "shard_gc.h"
+#include "logger/logger.h"
+
+namespace UC::PosixStore {
+// Destructor: stops and joins the background check thread.
+ShardGarbageCollector::~ShardGarbageCollector() { StopBackgroundCheck(); }
+// Derives maxFileCount_ from the configured capacity; fails if GC could never recycle a file.
+Status ShardGarbageCollector::ValidateAndInitCapacity()
+{
+    size_t storageCapacityBytes = config_.posixCapacityGb * 1024ULL * 1024ULL * 1024ULL;
+    maxFileCount_ = storageCapacityBytes / config_.blockSize;  // NOTE(review): confirm blockSize != 0 here
+    size_t thresholdFilesPerShard = static_cast(
+        maxFileCount_ / layout_->SampleShards(1.0).size() * config_.posixGcTriggerThresholdRatio);
+    size_t recycleNum = static_cast(thresholdFilesPerShard * config_.posixGcRecyclePercent);
+    if (recycleNum == 0) {  // a shard at the trigger threshold would yield zero files to recycle
+        size_t minFilesPerShard = static_cast(1.0 / (config_.posixGcTriggerThresholdRatio *
+                                                     config_.posixGcRecyclePercent)) +
+                                  1;
+        size_t minCapacityBytes =
+            minFilesPerShard * layout_->SampleShards(1.0).size() * config_.blockSize;
+        size_t minCapacityGb =  // bytes -> GiB, rounded up
+            (minCapacityBytes + 1024ULL * 1024ULL * 1024ULL - 1) / (1024ULL * 1024ULL * 1024ULL);
+        return Status::InvalidParam(
+            "posix_capacity_gb({}) is too small, GC cannot recycle any files. "
+            "Minimum recommended: {}GB",
+            config_.posixCapacityGb, minCapacityGb);
+    }
+
+    return Status::OK();
+}
+// Copies layout/config, validates capacity, then starts the worker pool and the check thread.
+Status ShardGarbageCollector::Setup(const SpaceLayout* layout, const Config& config)
+{
+    layout_ = layout;
+    config_ = config;
+    auto s = ValidateAndInitCapacity();
+    if (s.Failure()) { return s; }
+    auto success = gcPool_.SetWorkerFn([this](ShardTaskContext& ctx, auto&) { ProcessTask(ctx); })
+                       .SetNWorker(config_.posixGcConcurrency)
+                       .Run();
+    if (!success) { return Status::Error("failed to start gc thread pool"); }
+    try {
+        gcCheckWorker_ = std::thread(&ShardGarbageCollector::GCCheckLoop, this);
+    } catch (const std::exception& e) {
+        UC_ERROR("Failed({}) to create gc check worker thread.", e.what());
+        return Status::OutOfMemory();
+    }
+    return Status::OK();
+}
+// Sets stop_ under gcCheckMtx_, wakes the check thread and joins it.
+void ShardGarbageCollector::StopBackgroundCheck()
+{
+    {
+        std::lock_guard lock(gcCheckMtx_);  // same mutex the wait_for() in GCCheckLoop() uses
+        stop_ = true;
+    }
+    gcCheckCv_.notify_all();
+    if (gcCheckWorker_.joinable()) { gcCheckWorker_.join(); }
+}
+// Check-thread body: sample, run GC rounds while triggered, then wait posixGcCheckIntervalSec.
+void ShardGarbageCollector::GCCheckLoop()
+{
+    while (!stop_.load()) {
+        auto [trigger, avgFilesPerShard, threshold] = ShouldTrigger();
+        UC_INFO("GC sampling: avgFiles/shard={}, threshold={}, trigger={}", avgFilesPerShard,
+                threshold, trigger);
+        int rounds = 0;
+        while (!stop_.load() && trigger) {
+            bool gcLimited = Execute();
+            rounds++;
+            if (gcLimited) { continue; }  // a shard hit the per-shard cap: go again without re-sampling
+            std::tie(trigger, avgFilesPerShard, threshold) = ShouldTrigger();
+        }
+        if (rounds > 0) {
+            UC_INFO("GC completed: rounds={}, avgFiles/shard={}, threshold={}", rounds,
+                    avgFilesPerShard, threshold);
+        }
+        {
+            std::unique_lock lock(gcCheckMtx_);
+            gcCheckCv_.wait_for(lock, std::chrono::seconds(config_.posixGcCheckIntervalSec),
+                                [this] { return stop_.load(); });  // returns early once stop_ is set
+        }
+        if (stop_.load()) { break; }
+    }
+}
+// Queues one GC task per shard and waits; true if any shard reached the per-shard recycle cap.
+bool ShardGarbageCollector::Execute()
+{
+    auto waiter = std::make_shared();
+    auto shards = layout_->SampleShards(1.0);  // ratio 1.0 => SampleShards returns every shard
+    waiter->Set(shards.size());
+    std::atomic gcLimited{false};  // tasks hold a pointer to this local; it is read after Wait() returns
+    for (const auto& shard : shards) {
+        gcPool_.Push({ShardTaskContext::Type::GC, shard, waiter, nullptr, &gcLimited});
+    }
+    waiter->Wait();
+    return gcLimited.load();
+}
+// Counts files in a sample of shards; returns {trigger, average files per shard, threshold}.
+std::tuple ShardGarbageCollector::ShouldTrigger()
+{
+    auto sampleShards = layout_->SampleShards(config_.posixGcShardSampleRatio);
+    auto waiter = std::make_shared();
+    std::atomic sampledFiles{0};  // summed by the SAMPLE tasks in ProcessTask()
+    waiter->Set(sampleShards.size());
+    for (const auto& shard : sampleShards) {
+        gcPool_.Push({ShardTaskContext::Type::SAMPLE, shard, waiter, &sampledFiles});
+    }
+    waiter->Wait();
+    size_t avgFilesPerShard = sampledFiles.load() / sampleShards.size();
+    size_t thresholdFilesPerShard = maxFileCount_ / layout_->SampleShards(1.0).size();
+    size_t threshold =
+        static_cast(thresholdFilesPerShard * config_.posixGcTriggerThresholdRatio);
+    return {avgFilesPerShard >= threshold, avgFilesPerShard, threshold};
+}
+// Pool callback: SAMPLE adds the shard's file count; otherwise the shard's oldest files are removed.
+void ShardGarbageCollector::ProcessTask(ShardTaskContext& ctx)
+{
+    if (ctx.type == ShardTaskContext::Type::SAMPLE) {
+        size_t count = layout_->CountFilesInShard(ctx.shard);
+        ctx.sampledFiles->fetch_add(count, std::memory_order_relaxed);
+    } else {
+        auto filesToDelete = layout_->GetOldestFiles(ctx.shard, config_.posixGcRecyclePercent,
+                                                     config_.posixGcMaxRecycleCountPerShard);
+        for (const auto& blockId : filesToDelete) { layout_->RemoveFile(blockId); }
+        if (filesToDelete.size() >= config_.posixGcMaxRecycleCountPerShard) {  // cap reached
+            ctx.gcLimited->store(true, std::memory_order_relaxed);
+        }
+    }
+    ctx.waiter->Done();  // reached on both branches so the issuing Wait() can return
+}
+
+} // namespace UC::PosixStore
diff --git a/ucm/store/posix/cc/shard_gc.h b/ucm/store/posix/cc/shard_gc.h
new file mode 100644
index 000000000..74ea7e638
--- /dev/null
+++ b/ucm/store/posix/cc/shard_gc.h
@@ -0,0 +1,79 @@
+/**
+ * MIT License
+ *
+ * Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
+ * + * Permission is hereby granted, free of charge, to any person obtaining a copy + * of this software and associated documentation files (the "Software"), to deal + * in the Software without restriction, including without limitation the rights + * to use, copy, modify, merge, publish, distribute, sublicense, and/or sell + * copies of the Software, and to permit persons to whom the Software is + * furnished to do so, subject to the following conditions: + * + * The above copyright notice and this permission notice shall be included in all + * copies or substantial portions of the Software. + * + * THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR + * IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, + * FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE + * AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER + * LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, + * OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE + * SOFTWARE. 
+ * */ +#ifndef UNIFIEDCACHE_POSIX_STORE_CC_SHARD_GC_H +#define UNIFIEDCACHE_POSIX_STORE_CC_SHARD_GC_H + +#include +#include +#include +#include +#include +#include +#include +#include +#include "global_config.h" +#include "space_layout.h" +#include "status/status.h" +#include "thread/latch.h" +#include "thread/thread_pool.h" + +namespace UC::PosixStore { + +struct ShardTaskContext { + enum class Type { GC, SAMPLE }; + Type type; + std::string shard; + std::shared_ptr waiter; + std::atomic* sampledFiles{nullptr}; + std::atomic* gcLimited{nullptr}; +}; + +class ShardGarbageCollector { +public: + ShardGarbageCollector() = default; + ShardGarbageCollector(const ShardGarbageCollector&) = delete; + ShardGarbageCollector& operator=(const ShardGarbageCollector&) = delete; + ~ShardGarbageCollector(); + Status Setup(const SpaceLayout* layout, const Config& config); + +private: + Status ValidateAndInitCapacity(); + bool Execute(); + std::tuple ShouldTrigger(); + void ProcessTask(ShardTaskContext& ctx); + void GCCheckLoop(); + void StopBackgroundCheck(); + const SpaceLayout* layout_{nullptr}; + Config config_; + size_t maxFileCount_{0}; + ThreadPool gcPool_; + std::thread gcCheckWorker_; + std::mutex gcCheckMtx_; + std::condition_variable gcCheckCv_; + std::atomic stop_{false}; +}; + +} // namespace UC::PosixStore + +#endif diff --git a/ucm/store/posix/cc/space_layout.cc b/ucm/store/posix/cc/space_layout.cc index f10a9d4c1..4e5eaf4f0 100644 --- a/ucm/store/posix/cc/space_layout.cc +++ b/ucm/store/posix/cc/space_layout.cc @@ -23,15 +23,33 @@ * */ #include "space_layout.h" #include +#include +#include #include +#include +#include +#include #include "logger/logger.h" #include "posix_file.h" +#include "template/topn_heap.h" namespace UC::PosixStore { static const std::string DATA_ROOT = "data"; static const std::string ACTIVATED_FILE_EXTENSION = ".tmp"; +struct FileInfo { + Detail::BlockId blockId; + time_t mtime; +}; + +struct MtimeComparator { + bool operator()(const FileInfo& 
lhs, const FileInfo& rhs) const + { + return lhs.mtime > rhs.mtime; + } +}; + inline std::string DataFileName(const Detail::BlockId& blockId) { return fmt::format("{:02x}", fmt::join(blockId, "")); @@ -64,6 +82,7 @@ Status SpaceLayout::Setup(const Config& config) for (auto& path : config.storageBackends) { if ((status = AddStorageBackend(path)).Failure()) { return status; } } + shards_ = RelativeRoots(); return status; } @@ -88,6 +107,12 @@ Status SpaceLayout::CommitFile(const Detail::BlockId& blockId, bool success) con return s; } +Status SpaceLayout::RemoveFile(const Detail::BlockId& blockId) const +{ + PosixFile{DataFilePath(blockId, false)}.Remove(); + return Status::OK(); +} + std::vector SpaceLayout::RelativeRoots() const { if (dataDirShard_) { return GenerateHexStrings(dataDirShardBytes_); } @@ -144,4 +169,91 @@ std::string SpaceLayout::StorageBackend(const Detail::BlockId& blockId) const return storageBackends_[hasher(blockId) % number]; } +static Detail::BlockId HexToBlockId(const char* hexStr) +{ + Detail::BlockId blockId; + for (size_t i = 0; i < 16; ++i) { + uint8_t high = static_cast(hexStr[i * 2]); + uint8_t low = static_cast(hexStr[i * 2 + 1]); + high = (high <= '9') ? (high - '0') : (high - 'a' + 10); + low = (low <= '9') ? 
(low - '0') : (low - 'a' + 10); + blockId[i] = static_cast((high << 4) | low); + } + return blockId; +} + +std::vector SpaceLayout::SampleShards(double sampleRatio) const +{ + if (sampleRatio == 1.0) { return shards_; } + auto shards = shards_; + size_t sampleCount = + std::max(static_cast(1), static_cast(shards.size() * sampleRatio)); + std::random_device rd; + std::mt19937 gen(rd()); + std::shuffle(shards.begin(), shards.end(), gen); + shards.resize(sampleCount); + return shards; +} + +size_t SpaceLayout::CountFilesInShard(const std::string& shard) const +{ + std::string shardPath = storageBackends_.front(); + shardPath += shard; + DIR* dir = opendir(shardPath.c_str()); + if (!dir) { return 0; } + size_t count = 0; + struct dirent* entry; + while ((entry = readdir(dir)) != nullptr) { + if (entry->d_name[0] == '.') { continue; } + if (strstr(entry->d_name, ACTIVATED_FILE_EXTENSION.c_str()) != nullptr) { continue; } + ++count; + } + closedir(dir); + return count; +} + +static size_t ScanFilesInShard(const std::string& shardPath, + TopNHeap& heap) +{ + DIR* dir = opendir(shardPath.c_str()); + if (!dir) { return 0; } + size_t totalFiles = 0; + struct dirent* entry; + while ((entry = readdir(dir)) != nullptr) { + if (entry->d_name[0] == '.') { continue; } + if (strstr(entry->d_name, ACTIVATED_FILE_EXTENSION.c_str()) != nullptr) { continue; } + std::string filePath = shardPath + "/" + entry->d_name; + struct stat st; + if (stat(filePath.c_str(), &st) != 0) { continue; } + if (!S_ISREG(st.st_mode)) { continue; } + heap.Push({HexToBlockId(entry->d_name), st.st_mtime}); + ++totalFiles; + } + closedir(dir); + return totalFiles; +} + +std::vector SpaceLayout::GetOldestFiles(const std::string& shard, + double recyclePercent, + size_t maxRecycleCount) const +{ + std::string shardPath = storageBackends_.front(); + shardPath += shard; + auto heap = std::make_unique>(maxRecycleCount); + size_t totalFiles = ScanFilesInShard(shardPath, *heap); + if (totalFiles == 0) { return {}; } 
+ size_t recycleNum = static_cast(totalFiles * recyclePercent); + if (recycleNum == 0) { return {}; } + recycleNum = std::min(recycleNum, maxRecycleCount); + size_t skipCount = heap->Size() - recycleNum; + for (size_t i = 0; i < skipCount; ++i) { heap->Pop(); } + std::vector result; + result.reserve(recycleNum); + while (!heap->Empty()) { + result.push_back(heap->Top().blockId); + heap->Pop(); + } + return result; +} + } // namespace UC::PosixStore diff --git a/ucm/store/posix/cc/space_layout.h b/ucm/store/posix/cc/space_layout.h index f26921f34..212ace783 100644 --- a/ucm/store/posix/cc/space_layout.h +++ b/ucm/store/posix/cc/space_layout.h @@ -31,7 +31,9 @@ namespace UC::PosixStore { class SpaceLayout { +private: std::vector storageBackends_; + std::vector shards_; bool dataDirShard_; size_t dataDirShardBytes_; @@ -39,6 +41,11 @@ class SpaceLayout { Status Setup(const Config& config); std::string DataFilePath(const Detail::BlockId& blockId, bool activated) const; Status CommitFile(const Detail::BlockId& blockId, bool success) const; + Status RemoveFile(const Detail::BlockId& blockId) const; + std::vector SampleShards(double sampleRatio) const; + size_t CountFilesInShard(const std::string& shard) const; + std::vector GetOldestFiles(const std::string& shard, double recyclePercent, + size_t maxRecycleCount) const; private: std::vector RelativeRoots() const; diff --git a/ucm/store/posix/cc/space_manager.cc b/ucm/store/posix/cc/space_manager.cc index 609b4da13..4d2a89883 100644 --- a/ucm/store/posix/cc/space_manager.cc +++ b/ucm/store/posix/cc/space_manager.cc @@ -29,8 +29,15 @@ namespace UC::PosixStore { Status SpaceManager::Setup(const Config& config) { + gcEnable_ = config.posixGcEnable && config.posixCapacityGb > 0; auto s = layout_.Setup(config); if (s.Failure()) [[unlikely]] { return s; } + if (gcEnable_) { + s = hotnessTracker_.Setup(&layout_); + if (s.Failure()) [[unlikely]] { return s; } + s = gcMgr_.Setup(&layout_, config); + if (s.Failure()) [[unlikely]] { 
return s; } + } auto prefixSuccess = prefixLookupSrv_ .SetWorkerFn([this](PrefixLookupContext& ctx, auto&) { OnLookupPrefix(ctx); }) @@ -122,6 +129,7 @@ void SpaceManager::OnLookupPrefix(PrefixLookupContext& ctx) } break; } + if (gcEnable_) { hotnessTracker_.Touch(*(ctx.blocks + i)); } } ctx.waiter->Done(); } diff --git a/ucm/store/posix/cc/space_manager.h b/ucm/store/posix/cc/space_manager.h index 9836ffcc6..a549e4e98 100644 --- a/ucm/store/posix/cc/space_manager.h +++ b/ucm/store/posix/cc/space_manager.h @@ -25,6 +25,8 @@ #define UNIFIEDCACHE_POSIX_STORE_CC_SPACE_MANAGER_H #include "global_config.h" +#include "hotness_tracker.h" +#include "shard_gc.h" #include "space_layout.h" #include "thread/latch.h" #include "thread/thread_pool.h" @@ -45,6 +47,9 @@ class SpaceManager { private: SpaceLayout layout_; ThreadPool prefixLookupSrv_; + HotnessTracker hotnessTracker_; + ShardGarbageCollector gcMgr_; + bool gcEnable_{false}; public: Status Setup(const Config& config); diff --git a/ucm/store/test/e2e/posixstore_gc_test.py b/ucm/store/test/e2e/posixstore_gc_test.py new file mode 100644 index 000000000..1268c4b7d --- /dev/null +++ b/ucm/store/test/e2e/posixstore_gc_test.py @@ -0,0 +1,146 @@ +# MIT License +# +# Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. 
+#
+# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+# SOFTWARE.
+#
+import mmap
+import os
+import secrets
+import time
+
+import numpy as np
+
+from ucm.store.factory_v1 import UcmConnectorFactoryV1, UcmKVStoreBaseV1
+
+# Build a "Posix" pipeline store; worker=True -> device_id 0 and GC off, worker=False -> device_id -1 and GC on.
+def setup(
+    backends: list[str],
+    block_size: int,
+    data_trans_concur: int,
+    lookup_concur: int,
+    io_direct: bool,
+    worker: bool,
+) -> UcmKVStoreBaseV1:
+    module_path = "ucm.store.pipeline.connector"
+    class_name = "UcmPipelineStore"
+    config = {}
+    config["store_pipeline"] = "Posix"
+    config["storage_backends"] = backends
+    config["tensor_size"] = block_size  # tensor, shard and block sizes are all set to block_size
+    config["shard_size"] = block_size
+    config["block_size"] = block_size
+    config["posix_io_engine"] = "psync"
+    config["posix_data_trans_concurrency"] = data_trans_concur
+    config["posix_lookup_concurrency"] = lookup_concur
+    config["io_direct"] = io_direct
+    config["device_id"] = 0 if worker else -1
+
+    # GC configuration: only the non-worker (scheduler) store enables GC.
+    config["posix_gc_enable"] = False if worker else True
+    config["posix_capacity_gb"] = 60
+    config["posix_gc_trigger_threshold_ratio"] = 0.7
+    config["posix_gc_recycle_percent"] = 0.1
+    config["posix_gc_concurrency"] = 16
+    config["posix_gc_check_interval_sec"] = 10
+
+    return UcmConnectorFactoryV1.create_connector(class_name, config, module_path)
+
+# Return a `size`-element array whose data address is a multiple of `alignment` (a power of two).
+def make_array(size, alignment=4096, dtype=np.uint8) -> np.ndarray:
+    itemsize = np.dtype(dtype).itemsize
+    total_bytes = size * itemsize
+    mm = mmap.mmap(-1, total_bytes + alignment)  # anonymous mapping with room to shift the start
+    raw_array = np.frombuffer(mm, dtype=np.uint8, count=total_bytes + alignment)
+    raw_ptr = raw_array.__array_interface__["data"][0]  # address of the first mapped byte
+    aligned_addr = (raw_ptr + alignment - 1) & ~(alignment - 1)  # round up to the next multiple
+    offset = aligned_addr - raw_ptr
+    array = raw_array[offset : offset + total_bytes].view(dtype=dtype)
+    return array
+
+# Drive dump/lookup/load rounds through a worker store and a GC-enabled scheduler store.
+def main():
+    backends = ["./build/data"]
+    block_size = 1048576  # 1 MiB per block
+    data_trans_concur = 8
+    lookup_concur = 8
+    io_direct = True
+    worker = setup(
+        backends, block_size, data_trans_concur, lookup_concur, io_direct, True
+    )
+    scheduler = setup(
+        backends, block_size, data_trans_concur, lookup_concur, io_direct, False
+    )
+    batch_number = 64  # 64 batches x 1024 blocks x 1 MiB = 64 GiB dumped in total (capacity is 60)
+    batch_size = 1024
+    data_size = block_size * batch_size  # bytes moved per batch
+    raw_data1 = [make_array(block_size) for _ in range(batch_size)]
+    raw_data2 = [make_array(block_size) for _ in range(batch_size)]
+    data1 = [[d.ctypes.data] for d in raw_data1]  # raw addresses passed to dump_data
+    data2 = [[d.ctypes.data] for d in raw_data2]  # raw addresses passed to load_data
+    for idx in range(batch_number):
+        block_ids = [secrets.token_bytes(16) for _ in range(batch_size)]  # fresh random 16-byte ids
+        shard_idxes = [0 for _ in range(batch_size)]
+        # Before the dump, neither lookup may report any of the new ids.
+        tp = time.perf_counter()
+        founds = scheduler.lookup(block_ids)
+        cost_fully_lookup1 = time.perf_counter() - tp
+        assert not any(founds)
+
+        tp = time.perf_counter()
+        found_idx = scheduler.lookup_on_prefix(block_ids)
+        cost_prefix_lookup1 = time.perf_counter() - tp
+        assert found_idx == -1
+        # Dump the batch through the worker store.
+        tp = time.perf_counter()
+        handle = worker.dump_data(block_ids, shard_idxes, data1)
+        worker.wait(handle)
+        cost_dump = time.perf_counter() - tp
+        # After the dump, both lookups must see every block of the batch.
+        tp = time.perf_counter()
+        founds = scheduler.lookup(block_ids)
+        cost_fully_lookup2 = time.perf_counter() - tp
+        assert all(founds)
+
+        tp = time.perf_counter()
+        found_idx = scheduler.lookup_on_prefix(block_ids)
+        cost_prefix_lookup2 = time.perf_counter() - tp
+        assert found_idx == batch_size - 1
+        # Load the batch back into the second buffer set (loaded bytes are not compared).
+        tp = time.perf_counter()
+        handle = worker.load_data(block_ids, shard_idxes, data2)
+        worker.wait(handle)
+        cost_load = time.perf_counter() - tp
+
+        bw_dump = data_size / cost_dump  # bytes per second
+        bw_load = data_size / cost_load
+        print(
+            f"[{idx:03}/{batch_number:03}] [{block_size}] [{batch_size}] "
+            f"fully_lookup1={cost_fully_lookup1 * 1e3:.3f}ms, "
+            f"prefix_lookup1={cost_prefix_lookup1 * 1e3:.3f}ms, "
+            f"fully_lookup2={cost_fully_lookup2 * 1e3:.3f}ms, "
+            f"prefix_lookup2={cost_prefix_lookup2 * 1e3:.3f}ms, "
+            f"dump={cost_dump * 1e3:.3f}ms, load={cost_load * 1e3:.3f}ms, "
+            f"bw_dump={bw_dump / 1e9:.3f}GB/s, bw_load={bw_load / 1e9:.3f}GB/s."
+        )
+
+# Script entry point: set the logger level to "info" before running.
+if __name__ == "__main__":
+    os.environ["UC_LOGGER_LEVEL"] = "info"
+    main()