From 1b990bd1cb8439065c36d9354f6c3747091a455b Mon Sep 17 00:00:00 2001 From: "Mag1c.H" Date: Wed, 1 Apr 2026 16:59:27 +0800 Subject: [PATCH 1/2] [Perf] Pipeline-friendly shard task submission --- ucm/store/cache/cc/load_queue.cc | 60 +++++++++++++------------------- ucm/store/cache/cc/load_queue.h | 1 - 2 files changed, 24 insertions(+), 37 deletions(-) diff --git a/ucm/store/cache/cc/load_queue.cc b/ucm/store/cache/cc/load_queue.cc index c58389916..81836ceb5 100644 --- a/ucm/store/cache/cc/load_queue.cc +++ b/ucm/store/cache/cc/load_queue.cc @@ -82,42 +82,34 @@ void LoadQueue::DispatchOneTask(TaskPair&& pair) } auto tp = waiter->startTp; auto tpWait = NowTime::Now(); - Detail::TaskDesc backendTaskDesc; - backendTaskDesc.brief = "Backend2Cache"; const auto nShard = task->desc.size(); - UC_DEBUG("Try to load ({}) shards.", nShard); - std::vector backendTaskIndex; - backendTaskIndex.reserve(nShard); - std::vector shardTasks(nShard); for (size_t i = 0; i < nShard; i++) { auto& shard = task->desc[i]; - auto& shardTask = shardTasks[i]; + ShardTask shardTask; shardTask.bufferHandle = buffer_->Get(shard.owner, shard.index); shardTask.backendTaskHandle = 0; if (shardTask.bufferHandle.Owner() && !shardTask.bufferHandle.Ready()) { - backendTaskDesc.push_back( - Detail::Shard{shard.owner, shard.index, {shardTask.bufferHandle.Data()}}); - backendTaskIndex.emplace_back(i); + Detail::TaskDesc backendTask{ + Detail::Shard{shard.owner, shard.index, {shardTask.bufferHandle.Data()}} + }; + backendTask.brief = "Backend2Cache"; + auto res = backend_->Load(std::move(backendTask)); + if (!res) [[unlikely]] { + UC_ERROR("Failed({}) to submit load task({}) to backend.", res.Error(), task->id); + failureSet_->Insert(task->id); + waiter->Done(); + return; + } + shardTask.backendTaskHandle = res.Value(); } shardTask.taskHandle = task->id; shardTask.shard = std::move(shard); shardTask.waiter = (i + 1 < nShard) ? 
nullptr : waiter; + running_.Push(std::move(shardTask)); } - auto tpMakeBuffer = NowTime::Now(); - if (!backendTaskDesc.empty()) { - auto res = backend_->Load(std::move(backendTaskDesc)); - if (!res) [[unlikely]] { - UC_ERROR("Failed({}) to submit load task({}) to backend.", res.Error(), task->id); - failureSet_->Insert(task->id); - waiter->Done(); - return; - } - for (const auto& i : backendTaskIndex) { shardTasks[i].backendTaskHandle = res.Value(); } - } - for (size_t i = 0; i < nShard; i++) { running_.Push(std::move(shardTasks[i])); } - auto tpBackend = NowTime::Now(); - UC_DEBUG("Cache task({}) wait={:.3f}ms, mk_buf={:.3f}ms, back={:.3f}ms.", task->id, - (tpWait - tp) * 1e3, (tpMakeBuffer - tpWait) * 1e3, (tpBackend - tpMakeBuffer) * 1e3); + auto tpDispatch = NowTime::Now(); + UC_DEBUG("Cache task({}) dispatch shards({}), wait={:.3f}ms, cost={:.3f}ms.", task->id, nShard, + (tpWait - tp) * 1e3, (tpDispatch - tpWait) * 1e3); } void LoadQueue::TransferStage(std::promise& started) @@ -166,24 +158,20 @@ void LoadQueue::TransferOneTask(CopyStream& stream, ShardTask&& task) Status LoadQueue::WaitBackendTaskReady(ShardTask& task) { - if (task.bufferHandle.Ready()) { return Status::OK(); } - if (!task.bufferHandle.Owner()) { - for (;;) { - if (failureSet_->Contains(task.taskHandle)) { return Status::Error(); } - if (task.bufferHandle.Ready()) { return Status::OK(); } - std::this_thread::yield(); - } - } - if (task.backendTaskHandle > finishedBackendTaskHandle_) { + if (task.backendTaskHandle != 0) { auto s = backend_->Wait(task.backendTaskHandle); - finishedBackendTaskHandle_ = task.backendTaskHandle; if (s.Failure()) [[unlikely]] { UC_ERROR("Failed({}) to wait backend({}) for task({}).", s, task.backendTaskHandle, task.taskHandle); return s; } + task.bufferHandle.MarkReady(); + return Status::OK(); + } + while (!task.bufferHandle.Ready()) { + if (failureSet_->Contains(task.taskHandle)) { return Status::Error(); } + std::this_thread::yield(); } - 
task.bufferHandle.MarkReady(); return Status::OK(); } diff --git a/ucm/store/cache/cc/load_queue.h b/ucm/store/cache/cc/load_queue.h index 6e34ed6b2..1df88e3e5 100644 --- a/ucm/store/cache/cc/load_queue.h +++ b/ucm/store/cache/cc/load_queue.h @@ -51,7 +51,6 @@ class LoadQueue { private: alignas(64) std::atomic_bool stop_{false}; - Detail::TaskHandle finishedBackendTaskHandle_{0}; TaskIdSet* failureSet_{nullptr}; TransBuffer* buffer_{nullptr}; StoreV1* backend_{nullptr}; From 74256e2814dba286ead1eafaa1136993025f2ef6 Mon Sep 17 00:00:00 2001 From: "Mag1c.H" Date: Thu, 2 Apr 2026 10:02:02 +0800 Subject: [PATCH 2/2] [test] Cache on posix hit test --- ucm/store/test/e2e/cache_on_posix_hit_test.py | 112 ++++++++++++++++++ 1 file changed, 112 insertions(+) create mode 100644 ucm/store/test/e2e/cache_on_posix_hit_test.py diff --git a/ucm/store/test/e2e/cache_on_posix_hit_test.py b/ucm/store/test/e2e/cache_on_posix_hit_test.py new file mode 100644 index 000000000..550db0677 --- /dev/null +++ b/ucm/store/test/e2e/cache_on_posix_hit_test.py @@ -0,0 +1,112 @@ +# -*- coding: utf-8 -*- +# +# MIT License +# +# Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved. +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. 
IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. +# +import secrets +import time + +import torch + +from ucm.store.factory_v1 import UcmConnectorFactoryV1, UcmKVStoreBaseV1 + +device_id = 0 +shard_size = 64 * 1024 +shard_number = 64 +block_number = 1024 +storage_backends = ["./build/data"] + + +def create_worker(store_pipeline, unique_id) -> UcmKVStoreBaseV1: + module_path = "ucm.store.pipeline.connector" + class_name = "UcmPipelineStore" + config = {} + config["store_pipeline"] = store_pipeline + config["tensor_size"] = shard_size + config["shard_size"] = shard_size * shard_number + config["block_size"] = shard_size * shard_number + config["device_id"] = device_id + config["unique_id"] = unique_id + config["share_buffer_enable"] = True + config["cache_buffer_capacity_gb"] = 8 + config["storage_backends"] = storage_backends + return UcmConnectorFactoryV1.create_connector(class_name, config, module_path) + + +def make_tensors(device): + return [ + [ + torch.rand([shard_size // 2], dtype=torch.bfloat16, device=device) + for _ in range(shard_number) + ] + for _ in range(block_number) + ] + + +def cmp_and_print_diff(a, b, rtol=0.0, atol=0.0): + for r, (row_a, row_b) in enumerate(zip(a, b)): + for c, (ta, tb) in enumerate(zip(row_a, row_b)): + if not torch.allclose(ta, tb, rtol=rtol, atol=atol): + mask = ~torch.isclose(ta, tb, rtol=rtol, atol=atol) + diff_a = ta[mask].cpu() + diff_b = tb[mask].cpu() + print( + f"DIFF at d{tb.device}[{r}][{c}] total {mask.sum().item()} element(s)" + ) + print(" a val:", diff_a.flatten()) + print(" b val:", diff_b.flatten()) + assert False + + +def dump(worker, block_ids, block_tensors): + shard_indexes = [0 for _ in range(len(block_ids))] + tp = time.perf_counter() + task = worker.dump(block_ids, shard_indexes, 
block_tensors) +    worker.wait(task) +    cost = time.perf_counter() - tp +    print(f"Dump data({cost * 1e3:.3f}ms) successfully: {block_tensors[0][0]}") + + +def load(worker, block_ids): +    block_tensors = make_tensors("cuda:{}".format(device_id)) +    shard_indexes = [0 for _ in range(len(block_ids))] +    tp = time.perf_counter() +    task = worker.load(block_ids, shard_indexes, block_tensors) +    worker.wait(task) +    cost = time.perf_counter() - tp +    print(f"Load data({cost * 1e3:.3f}ms) successfully: {block_tensors[0][0]}") +    return block_tensors + + +if __name__ == "__main__": +    unique_id = secrets.token_hex(8) +    dumper = create_worker("Posix", unique_id) +    loader = create_worker("Cache|Posix", unique_id) +    block_ids = [secrets.token_bytes(16) for _ in range(block_number)] +    src_tensors = make_tensors("cpu:0") +    dump(dumper, block_ids, src_tensors) +    hbm_tensors = load(loader, block_ids) +    dst_tensors = [[t.to("cpu:0") for t in row] for row in hbm_tensors] +    cmp_and_print_diff(src_tensors, dst_tensors) +    hbm_tensors = load(loader, block_ids) +    dst_tensors = [[t.to("cpu:0") for t in row] for row in hbm_tensors] +    cmp_and_print_diff(src_tensors, dst_tensors)