Skip to content

Commit caf8823

Browse files
refactor v2
1 parent 881dc69 commit caf8823

1 file changed

Lines changed: 90 additions & 70 deletions

File tree

ucm/integration/vllm/device.py

Lines changed: 90 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -45,69 +45,6 @@ def get_cpu_affinity(self, local_rank: int) -> Optional[str]:
4545
"""
4646
pass
4747

48-
def _fallback_cpu_affinity(self, local_rank: int) -> Optional[str]:
    """
    Best-effort fallback shared by CUDA and NPU devices.

    Slices the process's currently-allowed CPU set into one contiguous
    shard per visible device and returns the shard belonging to
    *local_rank*, formatted as a cpuset string (e.g. ``"0-3,8"``).

    Returns None when the platform is neither CUDA-like nor NPU, when the
    allowed-CPU set is empty, or when the rank/device split is invalid.
    Any unexpected failure is logged and also yields None (best-effort).
    """
    try:
        allowed = sorted(os.sched_getaffinity(0))
        if not allowed:
            return None

        # Determine how many devices this rank can see on this platform,
        # preferring the visibility env var over the driver's device count.
        if current_platform.is_cuda_alike():
            dev_name = "cuda"
            visible = os.environ.get("CUDA_VISIBLE_DEVICES")
            if visible:
                total_devices = len(
                    [x.strip() for x in visible.split(",") if x.strip()]
                )
            else:
                total_devices = torch.cuda.device_count()
        elif current_platform.device_type == "npu":
            dev_name = "npu"
            visible = os.environ.get("ASCEND_RT_VISIBLE_DEVICES") or os.environ.get(
                "ASCEND_VISIBLE_DEVICES"
            )
            if visible:
                total_devices = len(
                    [x.strip() for x in visible.split(",") if x.strip()]
                )
            else:
                total_devices = torch.npu.device_count()
        else:
            # Unknown platform: no sensible fallback split.
            return None

        if total_devices <= 0 or not (0 <= local_rank < total_devices):
            logger.warning(
                f"[CPU Affinity] invalid fallback split: "
                f"{dev_name} local_rank={local_rank}, total_devices={total_devices}"
            )
            return None

        # Even split with the first `remainder` ranks taking one extra core.
        quotient, remainder = divmod(len(allowed), total_devices)
        begin = local_rank * quotient + min(local_rank, remainder)
        count = quotient + (1 if local_rank < remainder else 0)
        shard = allowed[begin : begin + count]

        if not shard:
            return None

        # Compress consecutive core ids into "lo-hi" ranges.
        pieces = []
        i, n = 0, len(shard)
        while i < n:
            j = i
            while j + 1 < n and shard[j + 1] == shard[j] + 1:
                j += 1
            lo, hi = shard[i], shard[j]
            pieces.append(str(lo) if lo == hi else f"{lo}-{hi}")
            i = j + 1
        return ",".join(pieces)

    except Exception as e:
        logger.error(f"fallback cpu affinity failed: {e}")
        return None
110-
11148
def split_cores(self, local_rank: int) -> Tuple[List[int], List[int]]:
11249
"""
11350
Shared split logic for both CUDA and NPU.
@@ -203,17 +140,57 @@ def get_cpu_affinity(self, local_rank: int) -> Optional[str]:
203140
if os.path.exists(cpu_list_path):
204141
with open(cpu_list_path) as f:
205142
return f.read().strip()
206-
207143
except Exception as e:
208144
logger.warning(f"get cuda cpu affinity from numa failed: {e}")
209145

210-
cpu_affinity = self._fallback_cpu_affinity(local_rank)
211-
if cpu_affinity:
146+
try:
147+
cores = sorted(os.sched_getaffinity(0))
148+
if not cores:
149+
return None
150+
151+
visible = os.environ.get("CUDA_VISIBLE_DEVICES")
152+
total_devices = (
153+
len([x.strip() for x in visible.split(",") if x.strip()])
154+
if visible
155+
else torch.cuda.device_count()
156+
)
157+
158+
if total_devices <= 0 or local_rank < 0 or local_rank >= total_devices:
159+
logger.warning(
160+
f"[CPU Affinity] invalid cuda fallback split: "
161+
f"local_rank={local_rank}, total_devices={total_devices}"
162+
)
163+
return None
164+
165+
base = len(cores) // total_devices
166+
extra = len(cores) % total_devices
167+
start = local_rank * base + min(local_rank, extra)
168+
length = base + (1 if local_rank < extra else 0)
169+
sliced = cores[start : start + length]
170+
171+
if not sliced:
172+
return None
173+
174+
parts = []
175+
s = e = sliced[0]
176+
for c in sliced[1:]:
177+
if c == e + 1:
178+
e = c
179+
else:
180+
parts.append(f"{s}-{e}" if s != e else str(s))
181+
s = e = c
182+
parts.append(f"{s}-{e}" if s != e else str(s))
183+
184+
cpu_affinity = ",".join(parts)
212185
logger.warning(
213186
f"[CPU Affinity] fallback to sliced allowed CPUs for cuda rank={local_rank}: "
214187
f"{cpu_affinity}"
215188
)
216-
return cpu_affinity
189+
return cpu_affinity
190+
191+
except Exception as e:
192+
logger.error(f"get cuda cpu affinity fallback failed: {e}")
193+
return None
217194

218195

219196
class NpuDevice(Device):
@@ -293,13 +270,56 @@ def get_cpu_affinity(self, local_rank: int) -> Optional[str]:
293270
except Exception as e:
294271
logger.warning(f"get npu cpu affinity from topo failed: {e}")
295272

296-
cpu_affinity = self._fallback_cpu_affinity(local_rank)
297-
if cpu_affinity:
273+
try:
274+
cores = sorted(os.sched_getaffinity(0))
275+
if not cores:
276+
return None
277+
278+
visible = os.environ.get("ASCEND_RT_VISIBLE_DEVICES") or os.environ.get(
279+
"ASCEND_VISIBLE_DEVICES"
280+
)
281+
total_devices = (
282+
len([x.strip() for x in visible.split(",") if x.strip()])
283+
if visible
284+
else torch.npu.device_count()
285+
)
286+
287+
if total_devices <= 0 or local_rank < 0 or local_rank >= total_devices:
288+
logger.warning(
289+
f"[CPU Affinity] invalid npu fallback split: "
290+
f"local_rank={local_rank}, total_devices={total_devices}"
291+
)
292+
return None
293+
294+
base = len(cores) // total_devices
295+
extra = len(cores) % total_devices
296+
start = local_rank * base + min(local_rank, extra)
297+
length = base + (1 if local_rank < extra else 0)
298+
sliced = cores[start : start + length]
299+
300+
if not sliced:
301+
return None
302+
303+
parts = []
304+
s = e = sliced[0]
305+
for c in sliced[1:]:
306+
if c == e + 1:
307+
e = c
308+
else:
309+
parts.append(f"{s}-{e}" if s != e else str(s))
310+
s = e = c
311+
parts.append(f"{s}-{e}" if s != e else str(s))
312+
313+
cpu_affinity = ",".join(parts)
298314
logger.warning(
299315
f"[CPU Affinity] fallback to sliced allowed CPUs for npu rank={local_rank}: "
300316
f"{cpu_affinity}"
301317
)
302-
return cpu_affinity
318+
return cpu_affinity
319+
320+
except Exception as e:
321+
logger.error(f"get npu cpu affinity fallback failed: {e}")
322+
return None
303323

304324

305325
def create_device() -> Optional[Device]:

0 commit comments

Comments
 (0)