Skip to content

Update --labels and add --labels-file options for Label Selector API #51706

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 20 commits into from
Apr 4, 2025
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 79 additions & 15 deletions python/ray/_private/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import threading
import time
import warnings
import yaml
from inspect import signature
from pathlib import Path
from subprocess import list2cmdline
Expand Down Expand Up @@ -82,6 +83,14 @@
# Match the standard alphabet used for UUIDs.
RANDOM_STRING_ALPHABET = string.ascii_lowercase + string.digits

# Regex patterns used to validate that labels conform to Kubernetes label syntax rules.
# The patterns only describe the allowed characters; they do not encode any
# length limit.
# Regex for optional prefix (DNS subdomain): one or more dot-separated segments
# made of lowercase alphanumerics and dashes, each segment starting and ending
# with an alphanumeric character.
LABEL_PREFIX_REGEX = (
    r"^([a-z0-9]([-a-z0-9]*[a-z0-9])?(\.[a-z0-9]([-a-z0-9]*[a-z0-9])?)*)$"
)
# Regex for mandatory name (label key without prefix) or value: alphanumerics,
# dashes, underscores and dots, starting and ending with an alphanumeric
# character. At least one character is required.
LABEL_REGEX = r"^[a-zA-Z0-9]([a-zA-Z0-9_.-]*[a-zA-Z0-9])?$"


def get_random_alphanumeric_string(length: int):
"""Generates random string of length consisting exclusively of
Expand Down Expand Up @@ -1803,27 +1812,58 @@ def update_envs(env_vars: Dict[str, str]):
os.environ[key] = result


def parse_node_labels_string(
    labels_str: str, cli_logger, cf, command_arg="--labels"
) -> Dict[str, str]:
    """Parse a comma-separated string of ``key=value`` pairs into a label dict.

    Whitespace around each key and value is stripped. Only the shape of the
    string is checked here (every comma-separated item must contain exactly
    one ``=``); the syntax of the keys and values themselves is not validated.

    Args:
        labels_str: The raw argument, e.g. ``"key1=val1,key2=val2"``. An empty
            (or missing) value yields an empty dict.
        cli_logger: Logger whose ``abort`` method is called with a formatted
            error message when the string is malformed.
        cf: Formatting helper whose ``bold`` method is used to highlight
            parts of the error message.
        command_arg: Name of the command-line option, used in the error message.

    Returns:
        A dict mapping label keys to label values.
    """
    labels: Dict[str, str] = {}
    # Nothing was passed for the option, so there is nothing to parse.
    if not labels_str:
        return labels
    try:
        # Labels argument should consist of a string of key=value pairs
        # separated by commas. Labels follow Kubernetes label syntax.
        for pair in labels_str.split(","):
            key_value = pair.split("=")
            # Rejects both `key` (no `=`) and `key=a=b` (more than one `=`).
            if len(key_value) != 2:
                raise ValueError("Label value is not a key-value pair.")
            labels[key_value[0].strip()] = key_value[1].strip()
    except ValueError as e:
        cli_logger.abort(
            "`{}` is not a valid string of key-value pairs, detail error: {} "
            "Valid values look like this: `{}`",
            cf.bold(f"{command_arg}={labels_str}"),
            str(e),
            cf.bold(f'{command_arg}="key1=val1,key2=val2"'),
        )
    return labels


def parse_node_labels_from_yaml_file(
    path: str, cli_logger, cf, command_arg="--labels-file"
) -> Dict[str, str]:
    """Load node labels from a YAML file containing a flat string-to-string map.

    Args:
        path: Path to the YAML file. An empty (or missing) path means no file
            was given and yields an empty dict.
        cli_logger: Logger whose ``abort`` method is called with a formatted
            error message when the file cannot be read or has the wrong shape.
        cf: Formatting helper whose ``bold`` method is used to highlight
            parts of the error message.
        command_arg: Name of the command-line option, used in the error message.

    Returns:
        A dict mapping label keys to label values.
    """
    # The option defaults to an empty string; without this guard `open("")`
    # would fail and abort even though no labels file was requested.
    if not path:
        return {}
    # Pre-bound so the final `return` is well defined on every path.
    labels: Dict[str, str] = {}
    try:
        with open(path, "r") as file:
            # Expects valid YAML content; `safe_load` never builds arbitrary
            # Python objects.
            labels = yaml.safe_load(file)
        # An empty file deserializes to None, which is rejected here too.
        if not isinstance(labels, dict):
            raise ValueError(
                "The format after deserialization is not a key-value pair map"
            )
        for key, value in labels.items():
            if not isinstance(key, str):
                raise ValueError("The key is not string type.")
            if not isinstance(value, str):
                raise ValueError(f'The value of the "{key}" is not string type')
    except (OSError, ValueError, yaml.YAMLError) as e:
        cli_logger.abort(
            "The file at `{}` is not a valid YAML file, detail error: {} "
            "Valid values look like this: `{}`",
            cf.bold(f"{command_arg}={path}"),
            str(e),
            cf.bold(f"{command_arg}='gpu_type: A100\nregion: us'"),
        )
    return labels

Expand All @@ -1838,6 +1878,30 @@ def validate_node_labels(labels: Dict[str, str]):
f"`{ray_constants.RAY_DEFAULT_LABEL_KEYS_PREFIX}`. "
f"This is reserved for Ray defined labels."
)
if "/" in key:
prefix, name = key.rsplit("/")
if len(prefix) > 253 or not re.match(LABEL_PREFIX_REGEX, prefix):
raise ValueError(
f"Invalid label key prefix `{prefix}`. Prefix must be a series of DNS labels "
f"separated by dots (.),not longer than 253 characters in total."
)
else:
name = key
if len(name) > 63 or not re.match(LABEL_REGEX, name):
raise ValueError(
f"Invalid label key name `{name}`. Name must be 63 chars or less beginning and ending "
f"with an alphanumeric character ([a-z0-9A-Z]) with dashes (-), underscores (_),"
f"dots (.), and alphanumerics between."
)
value = labels.get(key)
if value is None or value == "":
return
if len(value) > 63 or not re.match(LABEL_REGEX, value):
raise ValueError(
f"Invalid label key value `{value}`. Value must be 63 chars or less beginning and ending "
f"with an alphanumeric character ([a-z0-9A-Z]) with dashes (-), underscores (_),"
f"dots (.), and alphanumerics between."
)


def parse_pg_formatted_resources_to_original(
Expand Down
29 changes: 25 additions & 4 deletions python/ray/scripts/scripts.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,8 @@
check_ray_client_dependencies_installed,
load_class,
parse_resources_json,
parse_node_labels_json,
parse_node_labels_from_yaml_file,
parse_node_labels_string,
)
from ray._private.internal_api import memory_summary
from ray._private.usage import usage_lib
Expand Down Expand Up @@ -629,9 +630,17 @@ def debug(address: str, verbose: bool):
"--labels",
required=False,
hidden=True,
default="{}",
default="",
type=str,
help="a JSON serialized dictionary mapping label name to label value.",
help="a string list of key-value pairs mapping label name to label value.",
)
@click.option(
"--labels-file",
required=False,
hidden=True,
default="",
type=str,
help="a path to a YAML file containing a serialized dictionary mapping of labels.",
)
@click.option(
"--include-log-monitor",
Expand Down Expand Up @@ -688,6 +697,7 @@ def start(
ray_debugger_external,
disable_usage_stats,
labels,
labels_file,
include_log_monitor,
):
"""Start Ray processes manually on the local machine."""
Expand All @@ -708,7 +718,18 @@ def start(
node_ip_address = services.resolve_ip_for_localhost(node_ip_address)

resources = parse_resources_json(resources, cli_logger, cf)
labels_dict = parse_node_labels_json(labels, cli_logger, cf)

# Compose labels passed in with `--labels` and `--labels-file`.
# The label value from `--labels` will overwrite the value of any duplicate keys.
labels_from_file_dict = parse_node_labels_from_yaml_file(
labels_file, cli_logger, cf
)
labels_from_string = parse_node_labels_string(labels, cli_logger, cf)
labels_dict = (
{**labels_from_file_dict, **labels_from_string}
if labels_from_file_dict
else labels_from_string
)

if plasma_store_socket_name is not None:
warnings.warn(
Expand Down
107 changes: 106 additions & 1 deletion python/ray/tests/test_utils.py
Original file line number Diff line number Diff line change
@@ -1,19 +1,25 @@
# coding: utf-8
"""
Unit/Integration Testing for python/_private/utils.py
Unit/Integration Testing for python/ray/_private/utils.py

This currently expects to work for minimal installs.
"""

import click
import pytest
import logging
from ray._private.utils import (
parse_pg_formatted_resources_to_original,
try_import_each_module,
get_current_node_cpu_model_name,
parse_node_labels_string,
parse_node_labels_from_yaml_file,
validate_node_labels,
)
from ray.autoscaler._private.cli_logger import cf, cli_logger
from unittest.mock import patch, mock_open
import sys
import tempfile

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -71,6 +77,105 @@ def test_parse_pg_formatted_resources():
assert out == {"CPU": 1, "memory": 100}


def test_parse_node_labels_from_string():
    """Check `parse_node_labels_string` on empty, valid and malformed input."""
    # Empty label argument passed
    labels_string = ""
    labels_dict = parse_node_labels_string(labels_string, cli_logger, cf)
    assert labels_dict == {}

    # Valid label key with empty value
    labels_string = "region="
    labels_dict = parse_node_labels_string(labels_string, cli_logger, cf)
    assert labels_dict == {"region": ""}

    # Multiple valid label keys and values
    labels_string = "ray.io/accelerator-type=A100,region=us-west4"
    labels_dict = parse_node_labels_string(labels_string, cli_logger, cf)
    assert labels_dict == {"ray.io/accelerator-type": "A100", "region": "us-west4"}

    # Invalid label: the pair contains more than one `=`
    labels_string = "ray.io/accelerator-type=type=A100"
    with pytest.raises(click.exceptions.ClickException) as e:
        parse_node_labels_string(labels_string, cli_logger, cf)
    # NOTE(review): `e` is pytest's exception-info wrapper, not the exception;
    # this check presumably relies on its string form containing the abort
    # message -- confirm.
    assert "is not a valid string of key-value pairs" in str(e)


def test_parse_node_labels_from_yaml_file():
    """Check `parse_node_labels_from_yaml_file` on valid and invalid YAML files.

    Each case writes its content to a temporary file and flushes it so the
    parser, which re-opens the file by name, sees the data.
    """
    # Empty/invalid yaml: an empty document does not deserialize to a mapping.
    with tempfile.NamedTemporaryFile(mode="w+", delete=True) as test_file:
        test_file.write("")
        test_file.flush()  # Ensure data is written
        with pytest.raises(click.exceptions.ClickException) as e:
            parse_node_labels_from_yaml_file(test_file.name, cli_logger, cf)
        assert "is not a valid YAML file" in str(e.value)

    # Valid label key with empty value
    with tempfile.NamedTemporaryFile(mode="w+", delete=True) as test_file:
        test_file.write('"ray.io/accelerator-type": ""')
        test_file.flush()  # Ensure data is written
        labels_dict = parse_node_labels_from_yaml_file(test_file.name, cli_logger, cf)
    assert labels_dict == {"ray.io/accelerator-type": ""}

    # Multiple valid label keys and values
    with tempfile.NamedTemporaryFile(mode="w+", delete=True) as test_file:
        test_file.write(
            '"ray.io/accelerator-type": "A100"\n"region": "us"\n"market-type": "spot"'
        )
        test_file.flush()  # Ensure data is written
        labels_dict = parse_node_labels_from_yaml_file(test_file.name, cli_logger, cf)
    assert labels_dict == {
        "ray.io/accelerator-type": "A100",
        "region": "us",
        "market-type": "spot",
    }

    # Non-string label key
    with tempfile.NamedTemporaryFile(mode="w+", delete=True) as test_file:
        test_file.write('{100: "A100"}')
        test_file.flush()  # Ensure data is written
        with pytest.raises(click.exceptions.ClickException) as e:
            parse_node_labels_from_yaml_file(test_file.name, cli_logger, cf)
        assert "is not a valid YAML file" in str(e.value)

    # Non-string label value (the key is a string, the value is an integer)
    with tempfile.NamedTemporaryFile(mode="w+", delete=True) as test_file:
        test_file.write('{"gpu-count": 100}')
        test_file.flush()  # Ensure data is written
        with pytest.raises(click.exceptions.ClickException) as e:
            parse_node_labels_from_yaml_file(test_file.name, cli_logger, cf)
        assert "is not a valid YAML file" in str(e.value)


def test_validate_node_labels():
    """Check that `validate_node_labels` rejects malformed labels.

    Covers a key using the reserved `ray.io` prefix, a malformed key prefix,
    a malformed key name and a malformed value, then confirms that a
    well-formed label passes without raising.
    """
    # Custom label starts with ray.io prefix
    labels_dict = {"ray.io/accelerator-type": "A100"}
    with pytest.raises(ValueError) as e:
        validate_node_labels(labels_dict)
    assert "This is reserved for Ray defined labels." in str(e)

    # Invalid key prefix syntax (the prefix pattern only allows lowercase)
    labels_dict = {"invalidPrefix/accelerator-type": "A100"}
    with pytest.raises(ValueError) as e:
        validate_node_labels(labels_dict)
    assert "Invalid label key prefix" in str(e)

    # Invalid key name syntax
    labels_dict = {"!!accelerator-type?": "A100"}
    with pytest.raises(ValueError) as e:
        validate_node_labels(labels_dict)
    assert "Invalid label key name" in str(e)

    # Invalid key value syntax
    labels_dict = {"accelerator-type": "??"}
    with pytest.raises(ValueError) as e:
        validate_node_labels(labels_dict)
    assert "Invalid label key value" in str(e)

    # Valid node label: must not raise
    labels_dict = {"accelerator-type": "A100"}
    validate_node_labels(labels_dict)


@pytest.mark.skipif(
not sys.platform.startswith("linux"), reason="Doesn't support non-linux"
)
Expand Down