Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions data_juicer/utils/process_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,21 @@ def calculate_ray_np(operators):
total_gpu = ray_gpu_count()
available_mem = sum(ray_available_memories()) * _OPS_MEMORY_LIMIT_FRACTION / 1024 # Convert MB to GB
available_gpu_mem = sum(ray_available_gpu_memories()) * _OPS_MEMORY_LIMIT_FRACTION / 1024 # Convert MB to GB

# Validate cluster resources to prevent divide-by-zero
if total_cpu == 0:
raise RuntimeError(
"Ray cluster has no CPU resources available (ray_cpu_count() returned 0). "
"This typically indicates the Ray cluster is not properly initialized. "
"Please ensure the Ray cluster has active worker nodes."
)

if available_mem == 0:
raise RuntimeError(
"Ray cluster has no memory resources available. "
"Please verify the Ray cluster status with ray.cluster_resources()."
)
Comment on lines +247 to +258
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

For a better user experience, it's good practice to collect all validation errors and report them together. This allows the user to see all resource issues at once, rather than fixing them one by one. You can accumulate error messages in a list and raise a single RuntimeError if any issues are found.

Suggested change
if total_cpu == 0:
raise RuntimeError(
"Ray cluster has no CPU resources available (ray_cpu_count() returned 0). "
"This typically indicates the Ray cluster is not properly initialized. "
"Please ensure the Ray cluster has active worker nodes."
)
if available_mem == 0:
raise RuntimeError(
"Ray cluster has no memory resources available. "
"Please verify the Ray cluster status with ray.cluster_resources()."
)
errors = []
if total_cpu == 0:
errors.append(
"Ray cluster has no CPU resources available (ray_cpu_count() returned 0). "
"This typically indicates the Ray cluster is not properly initialized. "
"Please ensure the Ray cluster has active worker nodes."
)
if available_mem == 0:
errors.append(
"Ray cluster has no memory resources available. "
"Please verify the Ray cluster status with ray.cluster_resources()."
)
if errors:
raise RuntimeError('\n'.join(errors))


resource_configs = {}

for op_idx, op in enumerate(operators):
Expand All @@ -268,6 +283,18 @@ def calculate_ray_np(operators):
cpu_required_frac, gpu_required_frac = 0, 0
# GPU operator calculations
if op.use_cuda():
if total_gpu == 0:
raise RuntimeError(
f"Op[{op._name}] requires GPU but no GPUs are available in Ray cluster "
"(ray_gpu_count() returned 0). "
"Please ensure GPU nodes are configured in the Ray cluster."
)
if available_gpu_mem == 0:
raise RuntimeError(
f"Op[{op._name}] requires GPU but no GPU memory is available. "
"Please verify GPU nodes are properly configured."
)
Comment on lines +286 to +296
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Similar to the previous comment, it would be more user-friendly to check for all GPU-related resource issues at once and report them together. This provides a more comprehensive error message if both GPUs and GPU memory are unavailable.

Suggested change
if total_gpu == 0:
raise RuntimeError(
f"Op[{op._name}] requires GPU but no GPUs are available in Ray cluster "
"(ray_gpu_count() returned 0). "
"Please ensure GPU nodes are configured in the Ray cluster."
)
if available_gpu_mem == 0:
raise RuntimeError(
f"Op[{op._name}] requires GPU but no GPU memory is available. "
"Please verify GPU nodes are properly configured."
)
gpu_errors = []
if total_gpu == 0:
gpu_errors.append("no GPUs are available in Ray cluster (ray_gpu_count() returned 0)")
if available_gpu_mem == 0:
gpu_errors.append("no GPU memory is available")
if gpu_errors:
error_details = " and ".join(gpu_errors)
raise RuntimeError(
f"Op[{op._name}] requires GPU but {error_details}. "
"Please ensure GPU nodes are properly configured in the Ray cluster."
)


gpu_req = op.num_gpus
gpu_mem_req = op.memory
if not gpu_req and not gpu_mem_req:
Expand Down