
Commit 8bb604b

Change tests to use FakeAcceleratorManager, add _resolve_resources, and move record_hardware_usage to node.py

Signed-off-by: Ryan O'Leary <[email protected]>

Parent: 2c0051c

File tree: 3 files changed (+163, -157 lines)

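The test-file changes named in the commit message are not reproduced on this page. As a rough, hypothetical sketch only (the class shape and get_current_node_num_accelerators are assumptions; the other method names mirror the accelerator-manager calls visible in the diffs below), a FakeAcceleratorManager lets tests exercise resource resolution without real hardware:

    # Hypothetical test double (not from this commit): stands in for real
    # accelerator detection so resolution logic can be tested deterministically.
    class FakeAcceleratorManager:
        def __init__(self, accelerator_type: str = "A100", num_accelerators: int = 4):
            self._accelerator_type = accelerator_type
            self._num_accelerators = num_accelerators

        def get_current_node_accelerator_type(self) -> str:
            # Report a fixed accelerator type instead of probing the node.
            return self._accelerator_type

        def get_current_node_num_accelerators(self) -> int:
            # Assumed method name; returns the configured accelerator count.
            return self._num_accelerators

        def get_current_node_additional_resources(self) -> dict:
            # This fake exposes no accelerator-specific extra resources.
            return {}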

python/ray/_private/node.py

Lines changed: 10 additions & 0 deletions
@@ -1403,6 +1403,16 @@ def start_ray_processes(self):
         # Make sure we don't call `determine_plasma_store_config` multiple
         # times to avoid printing multiple warnings.
         resource_and_label_spec = self.get_resource_and_label_spec()
+        if resource_and_label_spec.labels.get(
+            ray._raylet.RAY_NODE_ACCELERATOR_TYPE_KEY
+        ):
+            from ray._private.usage import usage_lib
+
+            usage_lib.record_hardware_usage(
+                resource_and_label_spec.labels.get(
+                    ray._raylet.RAY_NODE_ACCELERATOR_TYPE_KEY
+                )
+            )
 
         (
             plasma_directory,
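With this change, hardware usage is recorded once in start_ray_processes from the resolved node labels, rather than as a side effect of resource resolution. The added block looks up the label twice; an equivalent single-lookup sketch (same names as in the diff above) would be:

    # Sketch only: single-lookup form of the block added above.
    accelerator_type = resource_and_label_spec.labels.get(
        ray._raylet.RAY_NODE_ACCELERATOR_TYPE_KEY
    )
    if accelerator_type:
        from ray._private.usage import usage_lib

        usage_lib.record_hardware_usage(accelerator_type)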

python/ray/_private/resource_and_label_spec.py

Lines changed: 51 additions & 45 deletions
@@ -136,47 +136,9 @@ def resolve(
             ResourceAndLabelSpec: This instance with all fields resolved.
         """
 
-        # Load environment override resources and merge with resources passed
-        # in from Ray Params. Separates special case params if found in env.
-        env_resources = ResourceAndLabelSpec._load_env_resources()
-        (
-            num_cpus,
-            num_gpus,
-            memory,
-            object_store_memory,
-            merged_resources,
-        ) = ResourceAndLabelSpec._merge_resources(env_resources, self.resources or {})
-        self.num_cpus = self.num_cpus if num_cpus is None else num_cpus
-        self.num_gpus = self.num_gpus if num_gpus is None else num_gpus
-        self.memory = self.memory if memory is None else memory
-        self.object_store_memory = (
-            self.object_store_memory
-            if object_store_memory is None
-            else object_store_memory
-        )
-        self.resources = merged_resources
-
-        if node_ip_address is None:
-            node_ip_address = ray.util.get_node_ip_address()
+        self._resolve_resources(is_head=is_head, node_ip_address=node_ip_address)
 
-        # Automatically create a node id resource on each node. This is
-        # queryable with ray._private.state.node_ids() and
-        # ray._private.state.current_node_id().
-        self.resources[NODE_ID_PREFIX + node_ip_address] = 1.0
-
-        # Automatically create a head node resource.
-        if HEAD_NODE_RESOURCE_NAME in self.resources:
-            raise ValueError(
-                f"{HEAD_NODE_RESOURCE_NAME}"
-                " is a reserved resource name, use another name instead."
-            )
-        if is_head:
-            self.resources[HEAD_NODE_RESOURCE_NAME] = 1.0
-
-        if self.num_cpus is None:
-            self.num_cpus = ray._private.utils.get_num_cpus()
-
-        # Resolve accelerator resources
+        # Resolve accelerator-specific resources
         (
             accelerator_manager,
             num_accelerators,
@@ -196,7 +158,7 @@ def resolve(
         self._resolve_memory_resources()
 
         self._is_resolved = True
-        assert self.all_fields_set()
+        assert self._all_fields_set()
         return self
 
     @staticmethod
@@ -235,6 +197,54 @@ def _merge_resources(env_dict: Dict[str, float], params_dict: Dict[str, float]):
 
         return num_cpus, num_gpus, memory, object_store_memory, result
 
+    def _resolve_resources(
+        self, is_head: bool, node_ip_address: Optional[str] = None
+    ) -> None:
+        """Resolve CPU, GPU, and custom resources. Merges resources from environment,
+        Ray params, and defaults in that order of precedence."""
+
+        # Load environment override resources and merge with resources passed
+        # in from Ray Params. Separates special case params if found in env.
+        env_resources = ResourceAndLabelSpec._load_env_resources()
+        (
+            num_cpus,
+            num_gpus,
+            memory,
+            object_store_memory,
+            merged_resources,
+        ) = ResourceAndLabelSpec._merge_resources(env_resources, self.resources or {})
+
+        self.num_cpus = self.num_cpus if num_cpus is None else num_cpus
+        self.num_gpus = self.num_gpus if num_gpus is None else num_gpus
+        self.memory = self.memory if memory is None else memory
+        self.object_store_memory = (
+            self.object_store_memory
+            if object_store_memory is None
+            else object_store_memory
+        )
+        self.resources = merged_resources
+
+        if node_ip_address is None:
+            node_ip_address = ray.util.get_node_ip_address()
+
+        # Automatically create a node id resource on each node. This is
+        # queryable with ray._private.state.node_ids() and
+        # ray._private.state.current_node_id().
+        self.resources[NODE_ID_PREFIX + node_ip_address] = 1.0
+
+        # Automatically create a head node resource.
+        if HEAD_NODE_RESOURCE_NAME in self.resources:
+            raise ValueError(
+                f"{HEAD_NODE_RESOURCE_NAME}"
+                " is a reserved resource name, use another name instead."
+            )
+        if is_head:
+            self.resources[HEAD_NODE_RESOURCE_NAME] = 1.0
+
+        # Auto-detect CPU count if not explicitly set
+        if self.num_cpus is None:
+            self.num_cpus = ray._private.utils.get_num_cpus()
+
     @staticmethod
     def _load_env_labels() -> Dict[str, str]:
         env_override_labels = {}
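The extracted helper preserves the original precedence: environment overrides win over values passed via Ray params, which win over auto-detected defaults. A minimal standalone illustration of that pattern (the helper name here is hypothetical, not Ray code):

    from typing import Optional

    def pick_with_precedence(
        env_value: Optional[float],
        param_value: Optional[float],
        default_value: float,
    ) -> float:
        # Mirrors the `self.x = self.x if env_x is None else env_x` pattern
        # above: environment override first, then the Ray param, then a default.
        if env_value is not None:
            return env_value
        if param_value is not None:
            return param_value
        return default_value

    assert pick_with_precedence(None, 8.0, 4.0) == 8.0   # param beats default
    assert pick_with_precedence(2.0, 8.0, 4.0) == 2.0    # env override beats param
    assert pick_with_precedence(None, None, 4.0) == 4.0  # fall back to default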
@@ -347,10 +357,6 @@ def _resolve_accelerator_resources(self, accelerator_manager, num_accelerators):
         accelerator_type = accelerator_manager.get_current_node_accelerator_type()
         if accelerator_type:
             self.resources[f"{RESOURCE_CONSTRAINT_PREFIX}{accelerator_type}"] = 1
-
-            from ray._private.usage import usage_lib
-
-            usage_lib.record_hardware_usage(accelerator_type)
         additional_resources = (
             accelerator_manager.get_current_node_additional_resources()
         )
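With this deletion, _resolve_accelerator_resources no longer imports usage_lib or reports telemetry; the record_hardware_usage call now lives in node.py (first file above), keeping resource resolution free of reporting side effects.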
