Skip to content

Commit 5ad2ea6

Browse files
committed
[test] Cache on posix hit test
1 parent 1b990bd commit 5ad2ea6

File tree

1 file changed

+112
-0
lines changed

1 file changed

+112
-0
lines changed
Lines changed: 112 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,112 @@
1+
# -*- coding: utf-8 -*-
2+
#
3+
# MIT License
4+
#
5+
# Copyright (c) 2025 Huawei Technologies Co., Ltd. All rights reserved.
6+
#
7+
# Permission is hereby granted, free of charge, to any person obtaining a copy
8+
# of this software and associated documentation files (the "Software"), to deal
9+
# in the Software without restriction, including without limitation the rights
10+
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11+
# copies of the Software, and to permit persons to whom the Software is
12+
# furnished to do so, subject to the following conditions:
13+
#
14+
# The above copyright notice and this permission notice shall be included in all
15+
# copies or substantial portions of the Software.
16+
#
17+
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18+
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19+
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20+
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21+
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22+
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23+
# SOFTWARE.
24+
#
25+
import secrets
26+
import time
27+
28+
import torch
29+
30+
from ucm.store.factory_v1 import UcmConnectorFactoryV1, UcmKVStoreBaseV1
31+
32+
# Index of the accelerator used for GPU-side tensors (formatted into "cuda:{id}").
device_id = 0
# Bytes per shard; each shard tensor holds shard_size // 2 bfloat16 elements.
shard_size = 64 * 1024
# Shards per block.
shard_number = 27
# Blocks dumped/loaded per run.
block_number = 64
# Filesystem path(s) backing the posix store backend.
storage_backends = ["./build/data"]
37+
38+
39+
def create_worker(store_pipeline, unique_id) -> UcmKVStoreBaseV1:
    """Create a UcmPipelineStore connector configured for this test.

    Args:
        store_pipeline: pipeline spec string, e.g. "Posix" or "Cache|Posix".
        unique_id: shared id so dumper and loader address the same store.

    Returns:
        A connector instance built by UcmConnectorFactoryV1.
    """
    config = {
        "store_pipeline": store_pipeline,
        "tensor_size": shard_size,
        "shard_size": shard_size * shard_number,
        "block_size": shard_size * shard_number,
        "device_id": device_id,
        "unique_id": unique_id,
        "share_buffer_enable": True,
        "cache_buffer_capacity_gb": 8,
        "storage_backends": storage_backends,
    }
    return UcmConnectorFactoryV1.create_connector(
        "UcmPipelineStore", config, "ucm.store.pipeline.connector"
    )
53+
54+
55+
def make_tensors(device):
    """Allocate block_number x shard_number random bfloat16 shard tensors.

    Each shard tensor has shard_size // 2 elements (i.e. shard_size bytes of
    bfloat16 data) and lives on *device*.
    """
    blocks = []
    for _ in range(block_number):
        shards = []
        for _ in range(shard_number):
            shards.append(
                torch.rand([shard_size // 2], dtype=torch.bfloat16, device=device)
            )
        blocks.append(shards)
    return blocks
63+
64+
65+
def cmp_and_print_diff(a, b, rtol=0.0, atol=0.0):
    """Element-wise compare two nested (row x col) lists of tensors.

    On the first mismatching tensor, prints the differing elements from both
    sides and raises AssertionError. Default rtol/atol of 0.0 demands exact
    equality.

    Args:
        a, b: nested lists of tensors with matching layout.
        rtol, atol: tolerances forwarded to torch.allclose/isclose.

    Raises:
        AssertionError: if any pair of tensors differs beyond the tolerances.
    """
    for r, (row_a, row_b) in enumerate(zip(a, b)):
        for c, (ta, tb) in enumerate(zip(row_a, row_b)):
            if torch.allclose(ta, tb, rtol=rtol, atol=atol):
                continue
            mask = ~torch.isclose(ta, tb, rtol=rtol, atol=atol)
            diff_a = ta[mask].cpu()
            diff_b = tb[mask].cpu()
            print(
                f"DIFF at d{tb.device}[{r}][{c}] total {mask.sum().item()} element(s)"
            )
            print(" a val:", diff_a.flatten())
            print(" b val:", diff_b.flatten())
            # Raise explicitly instead of `assert False`: asserts are stripped
            # under `python -O`, which would silently disable this check.
            raise AssertionError(f"tensor mismatch at [{r}][{c}]")
78+
79+
80+
def dump(worker, block_ids, block_tensors):
    """Dump all blocks (shard index 0) to the store and wait for completion.

    Args:
        worker: connector exposing dump(ids, shard_indexes, tensors) -> task
            and wait(task).
        block_ids: list of block identifiers.
        block_tensors: nested list of tensors, one row per block id.
    """
    shard_indexes = [0] * len(block_ids)
    tp = time.perf_counter()
    task = worker.dump(block_ids, shard_indexes, block_tensors)
    worker.wait(task)
    cost = time.perf_counter() - tp
    # Fixed typo in the log message ("successfullyl" -> "successfully").
    print(f"Dump data({cost * 1e3:.3f}ms) successfully: {block_tensors[0][0]}")
87+
88+
89+
def load(worker, block_ids):
    """Load all blocks (shard index 0) into fresh GPU tensors and return them.

    Args:
        worker: connector exposing load(ids, shard_indexes, tensors) -> task
            and wait(task).
        block_ids: list of block identifiers to fetch.

    Returns:
        The nested list of freshly allocated tensors on cuda:{device_id},
        filled by the load.
    """
    block_tensors = make_tensors("cuda:{}".format(device_id))
    shard_indexes = [0] * len(block_ids)
    tp = time.perf_counter()
    task = worker.load(block_ids, shard_indexes, block_tensors)
    worker.wait(task)
    cost = time.perf_counter() - tp
    # Fixed typo in the log message ("successfullyl" -> "successfully").
    print(f"Load data({cost * 1e3:.3f}ms) successfully: {block_tensors[0][0]}")
    return block_tensors
98+
99+
100+
if __name__ == "__main__":
    # Dump via the plain posix pipeline, then read back twice through the
    # cached pipeline under the same unique id — the second load exercises
    # the cache-hit path.
    unique_id = secrets.token_hex(8)
    dumper = create_worker("Posix", unique_id)
    loader = create_worker("Cache|Posix", unique_id)
    block_ids = [secrets.token_bytes(16) for _ in range(block_number)]
    src_tensors = make_tensors("cpu:0")
    dump(dumper, block_ids, src_tensors)

    def _load_and_verify():
        # Round-trip: load to GPU, copy back to host, compare with the source.
        fetched = load(loader, block_ids)
        on_host = [[t.to("cpu:0") for t in row] for row in fetched]
        cmp_and_print_diff(src_tensors, on_host)

    _load_and_verify()  # first load: cache miss, served by posix backend
    _load_and_verify()  # second load: expected cache hit

0 commit comments

Comments
 (0)