Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions examples/ucm_config_example.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ ucm_connectors:
storage_backends: "/mnt/test"
io_direct: false
# cache_buffer_capacity_gb: 256
# posix_capacity_gb: 1024

# When you use UcmNfsStore, you should set enable_event_sync to false.
enable_event_sync: true
Expand Down
18 changes: 18 additions & 0 deletions ucm/integration/vllm/ucm_connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -213,6 +213,11 @@ def __init__(self, vllm_config: "VllmConfig", role: KVConnectorRole):
)
self.tp_size = self._vllm_config.parallel_config.tensor_parallel_size
self.kv_cache_dtype: torch.dtype = None
self.num_head = vllm_config.model_config.get_num_kv_heads(
vllm_config.parallel_config
)
self.head_size = vllm_config.model_config.get_head_size()
self.element_size = vllm_config.model_config.dtype.itemsize

if current_platform.is_cuda_alike():
logger.info("CUDA device is available.")
Expand Down Expand Up @@ -323,6 +328,19 @@ def _create_store(
config["local_rank_size"] = self.tp_size if self.is_mla else 1
if cpu_affinity_cores:
config["cpu_affinity_cores"] = list(cpu_affinity_cores)
else:
config_base = self.block_size * self.element_size * self.head_size
config["block_size"] = (
config_base
* self.num_layers
* (1 if self.is_mla else self.num_head * 2)
* self.blocks_per_chunk
)
# GC only enabled for Scheduler with data_parallel_rank == 0
dp_rank = self._vllm_config.parallel_config.data_parallel_rank
if self._role == KVConnectorRole.WORKER or dp_rank != 0:
config["posix_gc_enable"] = False

logger.info(f"create {name} with config: {config}")
return UcmConnectorFactoryV1.create_connector(name, config, module_path)

Expand Down
8 changes: 8 additions & 0 deletions ucm/store/posix/cc/global_config.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,14 @@ struct Config {
size_t commitConcurrency{4};
size_t timeoutMs{30000};
size_t dataDirShardBytes{3};
bool posixGcEnable{true};
double posixGcRecyclePercent{0.1};
size_t posixGcConcurrency{16};
size_t posixGcCheckIntervalSec{30};
size_t posixCapacityGb{0};
double posixGcTriggerThresholdRatio{0.7};
size_t posixGcMaxRecycleCountPerShard{1000};
double posixGcShardSampleRatio{0.1};
};

} // namespace UC::PosixStore
Expand Down
82 changes: 82 additions & 0 deletions ucm/store/posix/cc/hotness_tracker.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,82 @@
/**
* MIT License
*
* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* */
#include "hotness_tracker.h"
#include <utime.h>
#include "logger/logger.h"

namespace UC::PosixStore {

// Signals the utime worker to leave its loop and waits for it to finish.
// When Setup() never started a thread there is nothing to join.
HotnessTracker::~HotnessTracker()
{
    stop_ = true;
    if (!utimeWorker_.joinable()) { return; }
    utimeWorker_.join();
}

// Binds the tracker to `layout` and starts the background utime worker.
//
// layout: used by the worker to resolve the data file path of every touched
//         block; it is stored as a raw pointer, so it must stay valid for as
//         long as the worker runs.
// Returns Status::OK() once the worker thread is running,
//         Status::InvalidParam(...) for a null layout or a repeated Setup(),
//         Status::OutOfMemory() when the thread cannot be created.
Status HotnessTracker::Setup(const SpaceLayout* layout)
{
    // The worker dereferences layout_ for each queued block; reject null here
    // instead of crashing later on the background thread.
    if (layout == nullptr) { return Status::InvalidParam("invalid layout({})", "nullptr"); }
    // Move-assigning onto a joinable std::thread calls std::terminate, and
    // swapping layout_ under a running worker would be a data race.
    if (utimeWorker_.joinable()) {
        return Status::InvalidParam("duplicate setup of {}", "HotnessTracker");
    }
    layout_ = layout;
    try {
        utimeWorker_ = std::thread(&HotnessTracker::UtimeWorkerLoop, this);
    } catch (const std::exception& e) {
        UC_ERROR("Failed({}) to create utime worker thread.", e.what());
        return Status::OutOfMemory();
    }
    return Status::OK();
}

// Records an access to `blockId`: the id is appended to the producer queue
// under the queue mutex and picked up later by the utime worker.
void HotnessTracker::Touch(const Detail::BlockId& blockId)
{
    std::scoped_lock guard{queueMtx_};
    produceQueue_.emplace_back(blockId);
}

void HotnessTracker::UtimeWorkerLoop()
{
std::deque<Detail::BlockId> consumeQueue;
constexpr size_t kSpinLimit = 16;
size_t spinCount = 0;
while (!stop_.load()) {
{
std::lock_guard<std::mutex> lock(queueMtx_);
consumeQueue.swap(produceQueue_);
}
if (consumeQueue.empty()) {
if (++spinCount < kSpinLimit) {
std::this_thread::yield();
} else {
std::this_thread::sleep_for(std::chrono::microseconds(100));
spinCount = 0;
}
continue;
}
spinCount = 0;
while (!consumeQueue.empty()) {
auto filePath = layout_->DataFilePath(consumeQueue.front(), false);
utime(filePath.c_str(), nullptr);
consumeQueue.pop_front();
}
}
}

} // namespace UC::PosixStore
56 changes: 56 additions & 0 deletions ucm/store/posix/cc/hotness_tracker.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/**
* MIT License
*
* Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
* */
#ifndef UNIFIEDCACHE_POSIX_STORE_CC_HOTNESS_TRACKER_H
#define UNIFIEDCACHE_POSIX_STORE_CC_HOTNESS_TRACKER_H

#include <atomic>
#include <deque>
#include <mutex>
#include <thread>
#include "space_layout.h"
#include "type/types.h"

namespace UC::PosixStore {

// Records which blocks were recently accessed by refreshing the timestamps of
// their data files. Touch() only enqueues a block id; a background thread
// drains the queue and calls utime() on each block's data file.
// NOTE(review): presumably the refreshed timestamps drive the posix GC's
// choice of cold files — confirm against the GC implementation.
class HotnessTracker {
public:
HotnessTracker() = default;
// Not copyable: the object holds a running thread and a mutex.
HotnessTracker(const HotnessTracker&) = delete;
HotnessTracker& operator=(const HotnessTracker&) = delete;
// Sets the stop flag and joins the worker thread if one was started.
~HotnessTracker();
// Stores `layout` and starts the worker thread. Returns Status::OK() on
// success and Status::OutOfMemory() when creating the thread throws.
Status Setup(const SpaceLayout* layout);
// Queues `blockId` for a timestamp refresh; performs no file I/O itself.
void Touch(const Detail::BlockId& blockId);

private:
// Worker thread body: swaps out the queue and calls utime() per block.
void UtimeWorkerLoop();
// Used to resolve data file paths; raw pointer, never released by this class.
const SpaceLayout* layout_{nullptr};
// Block ids appended by Touch() and taken in bulk by the worker.
std::deque<Detail::BlockId> produceQueue_;
// Guards produceQueue_.
std::mutex queueMtx_;
// Set by the destructor to make the worker loop exit.
std::atomic<bool> stop_{false};
// Background thread started by Setup().
std::thread utimeWorker_;
};

} // namespace UC::PosixStore

#endif
48 changes: 48 additions & 0 deletions ucm/store/posix/cc/posix_store.cc
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,15 @@ class PosixStore : public StoreV1 {
inConfig.GetNumber("posix_commit_concurrency", config.commitConcurrency);
inConfig.GetNumber("timeout_ms", config.timeoutMs);
inConfig.GetNumber("data_dir_shard_bytes", config.dataDirShardBytes);
inConfig.Get("posix_gc_enable", config.posixGcEnable);
inConfig.Get("posix_gc_recycle_percent", config.posixGcRecyclePercent);
inConfig.GetNumber("posix_gc_concurrency", config.posixGcConcurrency);
inConfig.GetNumber("posix_gc_check_interval_sec", config.posixGcCheckIntervalSec);
inConfig.GetNumber("posix_capacity_gb", config.posixCapacityGb);
inConfig.Get("posix_gc_trigger_threshold_ratio", config.posixGcTriggerThresholdRatio);
inConfig.GetNumber("posix_gc_max_recycle_count_per_shard",
config.posixGcMaxRecycleCountPerShard);
inConfig.Get("posix_gc_shard_sample_ratio", config.posixGcShardSampleRatio);
return config;
}
Status CheckConfig(const Config& config)
Expand Down Expand Up @@ -157,6 +166,33 @@ class PosixStore : public StoreV1 {
} else {
return Status::InvalidParam("invalid io engine({})", config.ioEngine);
}
if (config.posixGcEnable && config.posixCapacityGb > 0) {
if (config.posixGcRecyclePercent <= 0 || config.posixGcRecyclePercent > 1.0) {
return Status::InvalidParam("invalid gc recycle percent({})",
config.posixGcRecyclePercent);
}
if (config.posixGcConcurrency == 0) {
return Status::InvalidParam("invalid gc concurrency({})",
config.posixGcConcurrency);
}
if (config.posixGcCheckIntervalSec == 0) {
return Status::InvalidParam("invalid gc check interval({})",
config.posixGcCheckIntervalSec);
}
if (config.posixGcTriggerThresholdRatio <= 0 ||
config.posixGcTriggerThresholdRatio > 1.0) {
return Status::InvalidParam("invalid gc trigger threshold ratio({})",
config.posixGcTriggerThresholdRatio);
}
if (config.posixGcMaxRecycleCountPerShard == 0) {
return Status::InvalidParam("invalid gc max recycle count per shard({})",
config.posixGcMaxRecycleCountPerShard);
}
if (config.posixGcShardSampleRatio <= 0 || config.posixGcShardSampleRatio > 1.0) {
return Status::InvalidParam("invalid gc shard sample ratio({})",
config.posixGcShardSampleRatio);
}
}
return Status::OK();
}
void ShowConfig(const Config& config)
Expand All @@ -179,6 +215,18 @@ class PosixStore : public StoreV1 {
UC_INFO("Set {}::CommitConcurrency to {}.", ns, config.commitConcurrency);
UC_INFO("Set {}::TimeoutMs to {}.", ns, config.timeoutMs);
UC_INFO("Set {}::DataDirShardBytes to {}.", ns, config.dataDirShardBytes);
if (config.posixGcEnable && config.posixCapacityGb > 0) {
UC_INFO("Set {}::PosixGcEnable to {}.", ns, config.posixGcEnable);
UC_INFO("Set {}::PosixCapacityGb to {}.", ns, config.posixCapacityGb);
UC_INFO("Set {}::PosixGcRecyclePercent to {}.", ns, config.posixGcRecyclePercent);
UC_INFO("Set {}::PosixGcConcurrency to {}.", ns, config.posixGcConcurrency);
UC_INFO("Set {}::PosixGcCheckIntervalSec to {}.", ns, config.posixGcCheckIntervalSec);
UC_INFO("Set {}::PosixGcTriggerThresholdRatio to {}.", ns,
config.posixGcTriggerThresholdRatio);
UC_INFO("Set {}::PosixGcMaxRecycleCountPerShard to {}.", ns,
config.posixGcMaxRecycleCountPerShard);
UC_INFO("Set {}::PosixGcShardSampleRatio to {}.", ns, config.posixGcShardSampleRatio);
}
}
};

Expand Down
Loading
Loading