Skip to content

Commit 64be838

Browse files
CPU core split for vllm worker and store
1 parent 13bfd47 commit 64be838

File tree

1 file changed

+134
-2
lines changed

1 file changed

+134
-2
lines changed

ucm/integration/vllm/ucm_connector.py

Lines changed: 134 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
import math
44
import os
55
import pickle
6+
import subprocess
67
import time
78
from collections import defaultdict
89
from dataclasses import dataclass, field
@@ -275,7 +276,9 @@ def generate_hash(
275276
return ret
276277

277278
def _create_store(
278-
self, kv_cache_layout: Optional[KVCacheLayout]
279+
self,
280+
kv_cache_layout: Optional[KVCacheLayout],
281+
cpu_affinity_cores: Optional[list[int]] = None,
279282
) -> UcmKVStoreBaseV1:
280283
if len(self.connector_configs) != 1:
281284
raise RuntimeError(
@@ -300,9 +303,121 @@ def _create_store(
300303
config["shard_size"] = kv_cache_layout.shard_size * self.blocks_per_chunk
301304
config["block_size"] = kv_cache_layout.block_size * self.blocks_per_chunk
302305
config["local_rank_size"] = self.tp_size if self.is_mla else 1
306+
if cpu_affinity_cores:
307+
config["cpu_affinity_cores"] = list(cpu_affinity_cores)
303308
logger.info(f"create {name} with config: {config}")
304309
return UcmConnectorFactoryV1.create_connector(name, config, module_path)
305310

311+
def split_worker_and_store_cores(self, local_rank, is_cuda: bool):
312+
"""
313+
Split CPU cores into worker/store groups based on NUMA locality.
314+
Strategy:
315+
CUDA:
316+
GPU -> PCI -> NUMA
317+
NPU:
318+
rank/device_id -> NUMA (fallback if PCI not available)
319+
Then:
320+
NUMA -> cpulist -> split per segment
321+
"""
322+
323+
worker_cores, store_cores = [], []
324+
pci_bus_id, numa_node = None, None
325+
326+
try:
327+
# =========================
328+
# CUDA path
329+
# =========================
330+
if is_cuda:
331+
prop = torch.cuda.get_device_properties(local_rank)
332+
pci_bus_id = (
333+
f"{prop.pci_domain_id:04x}:"
334+
f"{prop.pci_bus_id:02x}:"
335+
f"{prop.pci_device_id:02x}.0"
336+
)
337+
338+
numa_path = f"/sys/bus/pci/devices/{pci_bus_id}/numa_node"
339+
if os.path.exists(numa_path):
340+
with open(numa_path) as f:
341+
numa_node = int(f.read().strip())
342+
if numa_node < 0:
343+
numa_node = None
344+
345+
# =========================
346+
# NPU path (Ascend)
347+
# =========================
348+
else:
349+
visible = os.environ.get("ASCEND_RT_VISIBLE_DEVICES") or os.environ.get(
350+
"ASCEND_VISIBLE_DEVICES"
351+
)
352+
353+
device_id = local_rank
354+
if visible:
355+
dev_list = [int(x.strip()) for x in visible.split(",") if x.strip()]
356+
if local_rank < len(dev_list):
357+
device_id = dev_list[local_rank]
358+
359+
if hasattr(self, "npu_to_numa"):
360+
numa_node = self.npu_to_numa.get(device_id)
361+
362+
# fallback: rank -> numa
363+
if numa_node is None:
364+
node_base = "/sys/devices/system/node"
365+
if os.path.exists(node_base):
366+
nodes = sorted(
367+
int(n[4:])
368+
for n in os.listdir(node_base)
369+
if n.startswith("node") and n[4:].isdigit()
370+
)
371+
if nodes:
372+
numa_node = nodes[device_id % len(nodes)]
373+
374+
# =========================
375+
# NUMA -> CPU cores
376+
# =========================
377+
if numa_node is not None:
378+
cpu_list_path = f"/sys/devices/system/node/node{numa_node}/cpulist"
379+
if os.path.exists(cpu_list_path):
380+
with open(cpu_list_path) as f:
381+
cpu_list_str = f.read().strip()
382+
383+
# Split each cpulist segment (e.g. "0-43,88-131") evenly to preserve locality.
384+
for part in cpu_list_str.split(","):
385+
if "-" in part:
386+
a, b = map(int, part.split("-"))
387+
seg = list(range(a, b + 1))
388+
else:
389+
seg = [int(part)]
390+
391+
mid = len(seg) // 2
392+
worker_cores.extend(seg[:mid])
393+
store_cores.extend(seg[mid:])
394+
395+
except Exception as e:
396+
logger.warning(f"[CPU Affinity] NUMA detect failed: {e}")
397+
398+
# =========================
399+
# fallback
400+
# =========================
401+
if not worker_cores:
402+
try:
403+
cores = sorted(os.sched_getaffinity(0))
404+
except Exception:
405+
cores = list(range(os.cpu_count()))
406+
407+
mid = len(cores) // 2
408+
worker_cores = cores[:mid]
409+
store_cores = cores[mid:]
410+
411+
logger.warning(f"[CPU Affinity] fallback cores={cores}")
412+
413+
logger.info(
414+
f"[CPU Affinity] rank={local_rank}, numa={numa_node}, pci={pci_bus_id}\n"
415+
f"[worker_cores] ={worker_cores}\n"
416+
f"[store_cores] ={store_cores}"
417+
)
418+
419+
return worker_cores, store_cores
420+
306421
def register_kv_caches(self, kv_caches: dict[str, torch.Tensor]):
307422
if has_ucm_sparse() and os.getenv("VLLM_HASH_ATTENTION") == "1":
308423
for layer_name, value in kv_caches.items():
@@ -329,8 +444,25 @@ def register_kv_caches(self, kv_caches: dict[str, torch.Tensor]):
329444
}
330445
self.first_layer_id = next(iter(self.layer_name_to_id.values()))
331446

332-
self.store = self._create_store(self.kv_cache_layout)
447+
is_cuda = self._vllm_config.device_config.device_type == "cuda"
448+
449+
enable_affinity = os.getenv("VLLM_CPU_AFFINITY") == "1"
450+
worker_cores, store_cores = (
451+
self.split_worker_and_store_cores(self.local_rank, is_cuda)
452+
if enable_affinity
453+
else (None, None)
454+
)
455+
456+
self.store = self._create_store(self.kv_cache_layout, store_cores)
333457
self.device = create_device()
458+
459+
if worker_cores:
460+
try:
461+
os.sched_setaffinity(0, worker_cores)
462+
logger.info(f"[VLLM CPU Affinity] Worker bound to cores {worker_cores}")
463+
except Exception as e:
464+
logger.warning(f"Failed to bind worker: {e}")
465+
334466
if self.device is None:
335467
raise RuntimeError(f"Unsupported device platform for UCMDirectConnector.")
336468

0 commit comments

Comments
 (0)