[Feat][Core] Re-implement dashboard subprocess modules with aiohttp server/client and convert HealthzHead #51172

Draft: wants to merge 54 commits into master

54 commits
2b6b729
initial
rynewang Jan 7, 2025
a945a42
split
rynewang Jan 13, 2025
d161548
redo health check with regular messages, now it takes a request ID.
rynewang Jan 13, 2025
5f17eba
fix tests and default routes
rynewang Jan 13, 2025
d994687
streaming support (without exc handling)
rynewang Jan 13, 2025
08bb903
Merge remote-tracking branch 'origin/master' into dash-procs
rynewang Jan 13, 2025
6a435cf
Update python/ray/dashboard/subprocesses/module.py
rynewang Jan 14, 2025
9c8272d
thread safe dict
rynewang Jan 14, 2025
4b43962
import bad
rynewang Jan 14, 2025
0075c8d
rename methods
rynewang Jan 14, 2025
ac7e0b9
add logging
rynewang Jan 14, 2025
b155df5
rename and new event loop
rynewang Jan 14, 2025
f0dc835
fix
rynewang Jan 14, 2025
2defd09
exception handling in stream
rynewang Jan 15, 2025
5b85a36
fix test
rynewang Jan 15, 2025
6cad47b
Merge remote-tracking branch 'origin/master' into dash-procs
rynewang Jan 15, 2025
06381d8
lint
rynewang Jan 16, 2025
5102da0
fix test
rynewang Jan 16, 2025
eede87b
Merge remote-tracking branch 'origin/master' into dash-procs
rynewang Jan 16, 2025
f1d37ce
health check.
rynewang Jan 16, 2025
0e81b81
docs
rynewang Jan 16, 2025
2aff055
declare ourselves non minimal.
rynewang Jan 17, 2025
03228a9
add gandalf the grey, and some renames
rynewang Jan 17, 2025
31ea86e
make test log more robust by removing lineno
rynewang Jan 18, 2025
9e28dc9
type annotations, docs
rynewang Jan 19, 2025
322ef0f
add to head.py
rynewang Jan 15, 2025
9df07aa
support minimal
rynewang Jan 17, 2025
276be83
make HealthzHead non minimal.
rynewang Jan 17, 2025
18b1288
Merge remote-tracking branch 'origin/master' into dashboard-procs-int…
rynewang Jan 22, 2025
dd6b2ad
fix
rynewang Jan 22, 2025
d6cccdf
remove optional comments
rynewang Jan 22, 2025
dadb79f
incarnation aware logs
rynewang Jan 22, 2025
89d14a0
fix abstract class, and a multi_get optimization
rynewang Jan 22, 2025
d4dbeb9
fix tests
rynewang Jan 23, 2025
e36a2c9
add thread name
rynewang Jan 23, 2025
be12cc6
rename health_check
rynewang Jan 23, 2025
80c8db1
reorg to keep module init in a single async func
rynewang Jan 23, 2025
be8e80b
kill subproc on destroy, setproctitle
rynewang Jan 24, 2025
a71d17e
signum, frame
rynewang Jan 24, 2025
34886f7
add parent process liveness check
rynewang Jan 24, 2025
e47c4a9
fix tests
rynewang Jan 25, 2025
5b9fe6a
spawn for linux
rynewang Jan 25, 2025
e1862bb
Merge branch 'master' into dashboard-procs-integration
rynewang Jan 25, 2025
323e0e7
lint
rynewang Jan 25, 2025
eaced59
[Feat][Core] Re-implement dashboard subprocess modules with aiohttp s…
MortalHappiness Mar 7, 2025
defa8a4
WIP: test
MortalHappiness Mar 7, 2025
b6f7719
Remove ThreadSafeDict
MortalHappiness Mar 7, 2025
f2500af
Add repr for BindInfo
MortalHappiness Mar 7, 2025
4c63abe
Fix route binding
MortalHappiness Mar 7, 2025
5f4a4cf
Fix http tests
MortalHappiness Mar 7, 2025
354af07
Fix remaining http tests except for websocket tests
MortalHappiness Mar 7, 2025
1674de8
Support stream response
MortalHappiness Mar 8, 2025
c7f1602
Support websocket response
MortalHappiness Mar 10, 2025
6d04c69
Add tests for websocket response
MortalHappiness Mar 10, 2025
20 changes: 20 additions & 0 deletions python/ray/_private/node.py
@@ -5,6 +5,7 @@
import json
import logging
import os
import pathlib
import random
import signal
import socket
@@ -485,6 +486,11 @@ def _init_temp(self):
# Create a directory to be used for socket files.
self._sockets_dir = os.path.join(self._session_dir, "sockets")
try_to_create_directory(self._sockets_dir)
# Create a directory to be used for dashboard socket files
dashboard_socket_dir = os.path.join(
self._sockets_dir, ray_constants.RAY_DASHBOARD_SOCKET_DIR
)
try_to_create_directory(dashboard_socket_dir)
# Create a directory to be used for process log files.
self._logs_dir = os.path.join(self._session_dir, "logs")
try_to_create_directory(self._logs_dir)
@@ -988,6 +994,20 @@ def _prepare_socket_file(self, socket_path: str, default_prefix: str):
Args:
socket_path: the socket file to prepare.
"""
ray_dashboard_sockets_dir = (
pathlib.PurePath(self._sockets_dir) / ray_constants.RAY_DASHBOARD_SOCKET_DIR
)
if socket_path is not None:
if pathlib.PurePath(socket_path).is_relative_to(ray_dashboard_sockets_dir):
raise ValueError(
f"The socket path {socket_path} cannot be {ray_dashboard_sockets_dir} or use it as parent folder."
)
else:
if default_prefix == ray_constants.RAY_DASHBOARD_SOCKET_DIR:
raise ValueError(
f"The socket path cannot be {ray_dashboard_sockets_dir} or use it as parent folder."
)

result = socket_path
is_mac = sys.platform.startswith("darwin")
if sys.platform == "win32":
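For context, here is a minimal self-contained sketch of the guard this hunk adds, assuming illustrative paths rather than the exact Ray call sites (PurePath.is_relative_to requires Python 3.9+):

    import pathlib

    RAY_DASHBOARD_SOCKET_DIR = "dashboard"  # reserved sub-folder, defined below

    def validate_socket_path(sockets_dir: str, socket_path: str) -> None:
        # Dashboard-module sockets live under <sockets_dir>/dashboard/, so any
        # user-supplied socket path pointing into that folder is rejected.
        reserved = pathlib.PurePath(sockets_dir) / RAY_DASHBOARD_SOCKET_DIR
        if pathlib.PurePath(socket_path).is_relative_to(reserved):
            raise ValueError(
                f"The socket path {socket_path} cannot be {reserved} "
                "or use it as a parent folder."
            )

    # validate_socket_path("/tmp/ray/session_xxx/sockets",
    #                      "/tmp/ray/session_xxx/sockets/dashboard/a.sock")  # raises
    # validate_socket_path("/tmp/ray/session_xxx/sockets",
    #                      "/tmp/ray/session_xxx/sockets/plasma_store")      # passes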
4 changes: 4 additions & 0 deletions python/ray/_private/ray_constants.py
@@ -548,3 +548,7 @@ def gcs_actor_scheduling_enabled():
)

RAY_EXPORT_EVENT_MAX_BACKUP_COUNT = env_bool("RAY_EXPORT_EVENT_MAX_BACKUP_COUNT", 20)

# The directory where the sockets for dashboard modules are stored.
# It's a sub-folder under the /tmp/ray/session_latest/sockets/ folder.
RAY_DASHBOARD_SOCKET_DIR = "dashboard"
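Assuming the usual session layout (actual paths vary per cluster), the constant resolves to a directory like this:

    from pathlib import Path

    session_dir = Path("/tmp/ray/session_latest")  # illustrative path
    dashboard_socket_dir = session_dir / "sockets" / "dashboard"
    print(dashboard_socket_dir)  # /tmp/ray/session_latest/sockets/dashboard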
20 changes: 10 additions & 10 deletions python/ray/_private/usage/usage_lib.py
@@ -561,8 +561,8 @@ def get_total_num_nodes_to_report(gcs_client, timeout=None) -> Optional[int]:
if node_info.state == gcs_pb2.GcsNodeInfo.GcsNodeState.ALIVE:
total_num_nodes += 1
return total_num_nodes
except Exception as e:
logger.info(f"Faile to query number of nodes in the cluster: {e}")
except Exception:
logger.exception("Faile to query number of nodes in the cluster")
return None


@@ -592,25 +592,25 @@ def get_extra_usage_tags_to_report(gcs_client) -> Dict[str, str]:
for kv in kvs:
k, v = kv.split("=")
extra_usage_tags[k] = v
except Exception as e:
logger.info(f"Failed to parse extra usage tags env var. Error: {e}")
except Exception:
logger.exception("Failed to parse extra usage tags env var")

valid_tag_keys = [tag_key.lower() for tag_key in TagKey.keys()]
try:
keys = gcs_client.internal_kv_keys(
usage_constant.EXTRA_USAGE_TAG_PREFIX.encode(),
namespace=usage_constant.USAGE_STATS_NAMESPACE.encode(),
)
for key in keys:
value = gcs_client.internal_kv_get(
key, namespace=usage_constant.USAGE_STATS_NAMESPACE.encode()
)
kv = gcs_client.internal_kv_multi_get(
keys, namespace=usage_constant.USAGE_STATS_NAMESPACE.encode()
)
for key, value in kv.items():
key = key.decode("utf-8")
key = key[len(usage_constant.EXTRA_USAGE_TAG_PREFIX) :]
assert key in valid_tag_keys
extra_usage_tags[key] = value.decode("utf-8")
except Exception as e:
logger.info(f"Failed to get extra usage tags from kv store {e}")
except Exception:
logger.exception("Failed to get extra usage tags from kv store")
return extra_usage_tags


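Two patterns recur in these hunks: logger.exception replaces logger.info(f"... {e}") so the traceback is preserved, and the per-key internal_kv_get loop collapses into one batched internal_kv_multi_get call. A condensed sketch of the resulting shape (the fetch_tags wrapper and its arguments are illustrative, not Ray's API):

    import logging
    from typing import Dict, List

    logger = logging.getLogger(__name__)

    def fetch_tags(gcs_client, keys: List[bytes], namespace: bytes) -> Dict[str, str]:
        tags: Dict[str, str] = {}
        try:
            # One RPC for all keys instead of one internal_kv_get per key.
            kv = gcs_client.internal_kv_multi_get(keys, namespace=namespace)
            for key, value in kv.items():
                tags[key.decode("utf-8")] = value.decode("utf-8")
        except Exception:
            # logger.exception records the message plus the full traceback at
            # ERROR level; logger.info(f"... {e}") kept only the message text.
            logger.exception("Failed to get extra usage tags from kv store")
        return tags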
17 changes: 16 additions & 1 deletion python/ray/dashboard/dashboard.py
@@ -51,7 +51,12 @@ def __init__(
cluster_id_hex: str,
grpc_port: int,
node_ip_address: str,
log_dir: str = None,
log_dir: str,
logging_level: int,
logging_format: str,
logging_filename: str,
logging_rotate_bytes: int,
logging_rotate_backup_count: int,
temp_dir: str = None,
session_dir: str = None,
minimal: bool = False,
@@ -67,6 +72,11 @@
node_ip_address=node_ip_address,
grpc_port=grpc_port,
log_dir=log_dir,
logging_level=logging_level,
logging_format=logging_format,
logging_filename=logging_filename,
logging_rotate_bytes=logging_rotate_bytes,
logging_rotate_backup_count=logging_rotate_backup_count,
temp_dir=temp_dir,
session_dir=session_dir,
minimal=minimal,
@@ -232,6 +242,11 @@ async def run(self):
grpc_port=args.grpc_port,
node_ip_address=args.node_ip_address,
log_dir=args.log_dir,
logging_level=args.logging_level,
logging_format=args.logging_format,
logging_filename=args.logging_filename,
logging_rotate_bytes=args.logging_rotate_bytes,
logging_rotate_backup_count=args.logging_rotate_backup_count,
temp_dir=args.temp_dir,
session_dir=args.session_dir,
minimal=args.minimal,
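These five logging parameters travel from the CLI arguments through DashboardHead down to each subprocess module. As a rough sketch of how such parameters typically drive a rotating file handler (this is not Ray's actual logging helper):

    import logging
    from logging.handlers import RotatingFileHandler
    from pathlib import Path

    def setup_logging(log_dir: str, logging_filename: str, logging_level: int,
                      logging_format: str, logging_rotate_bytes: int,
                      logging_rotate_backup_count: int) -> None:
        # Rotate once the file reaches logging_rotate_bytes, keeping
        # logging_rotate_backup_count old files.
        handler = RotatingFileHandler(
            Path(log_dir) / logging_filename,
            maxBytes=logging_rotate_bytes,
            backupCount=logging_rotate_backup_count,
        )
        handler.setFormatter(logging.Formatter(logging_format))
        root = logging.getLogger()
        root.setLevel(logging_level)
        root.addHandler(handler)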
160 changes: 136 additions & 24 deletions python/ray/dashboard/head.py
@@ -2,8 +2,9 @@
import logging
from concurrent.futures import ThreadPoolExecutor
from pathlib import Path
from typing import Optional, Set
from typing import Optional, Set, List, Tuple

import ray
import ray.dashboard.consts as dashboard_consts
import ray.dashboard.utils as dashboard_utils
import ray.experimental.internal_kv as internal_kv
@@ -27,6 +28,11 @@
prometheus_client = None


# Type alias for SubprocessModuleHandle
# We can't import SubprocessModuleHandle because it's non-minimal only, and importing it
# in minimal Ray causes ImportError.
TypeForSubprocessModuleHandle = object

logger = logging.getLogger(__name__)

GRPC_CHANNEL_OPTIONS = (
@@ -70,6 +76,11 @@ def __init__(
node_ip_address: str,
grpc_port: int,
log_dir: str,
logging_level: int,
logging_format: str,
logging_filename: str,
logging_rotate_bytes: int,
logging_rotate_backup_count: int,
temp_dir: str,
session_dir: str,
minimal: bool,
@@ -83,6 +94,11 @@
http_port_retries: The maximum retry to bind ports for the Http server.
gcs_address: The GCS address in the {address}:{port} format.
log_dir: The log directory. E.g., /tmp/session_latest/logs.
logging_level: The logging level (e.g. logging.INFO, logging.DEBUG).
logging_format: The format string for log messages.
logging_filename: The name of the log file.
logging_rotate_bytes: Max size in bytes before rotating the log file.
logging_rotate_backup_count: Number of backup files to keep when rotating.
temp_dir: The temp directory. E.g., /tmp.
session_dir: The session directory. E.g., tmp/session_latest.
minimal: Whether or not it will load the minimal modules.
@@ -117,6 +133,11 @@ def __init__(
self.gcs_address = gcs_address
self.cluster_id_hex = cluster_id_hex
self.log_dir = log_dir
self.logging_level = logging_level
self.logging_format = logging_format
self.logging_filename = logging_filename
self.logging_rotate_bytes = logging_rotate_bytes
self.logging_rotate_backup_count = logging_rotate_backup_count
self.temp_dir = temp_dir
self.session_dir = session_dir
self.session_name = Path(session_dir).name
@@ -137,7 +158,11 @@ def __init__(
# be configured to expose APIs.
self.http_server = None

async def _configure_http_server(self, modules):
async def _configure_http_server(
self,
dashboard_head_modules: List[DashboardHeadModule],
subprocess_module_handles: List[TypeForSubprocessModuleHandle],
):
from ray.dashboard.http_server_head import HttpServerDashboardHead

self.http_server = HttpServerDashboardHead(
@@ -149,7 +174,7 @@ async def _configure_http_server(self, modules):
self.session_name,
self.metrics,
)
await self.http_server.run(modules)
await self.http_server.run(dashboard_head_modules, subprocess_module_handles)

@property
def http_session(self):
@@ -173,8 +198,41 @@ async def _gcs_check_alive(self):
except Exception:
logger.warning("Failed to check gcs aliveness, will retry", exc_info=True)

def _load_modules(self, modules_to_load: Optional[Set[str]] = None):
"""Load dashboard head modules.
def _load_modules(
self, modules_to_load: Optional[Set[str]] = None
) -> Tuple[List[DashboardHeadModule], List[TypeForSubprocessModuleHandle]]:
"""
If minimal, only load DashboardHeadModule.
If non-minimal, load both kinds of modules: DashboardHeadModule, SubprocessModule.

If modules_to_load is not None, only load the modules in the set.
"""
dashboard_head_modules = self._load_dashboard_head_modules(modules_to_load)
subprocess_module_handles = self._load_subprocess_module_handles(
modules_to_load
)

all_names = {type(m).__name__ for m in dashboard_head_modules} | {
h.module_cls.__name__ for h in subprocess_module_handles
}
assert len(all_names) == len(dashboard_head_modules) + len(
subprocess_module_handles
), "Duplicate module names. A module name can't be a DashboardHeadModule and a SubprocessModule at the same time."

# Verify modules are loaded as expected.
if modules_to_load is not None and all_names != modules_to_load:
assert False, (
f"Actual loaded modules {all_names}, doesn't match the requested modules "
f"to load, {modules_to_load}."
)

self._modules_loaded = True
return dashboard_head_modules, subprocess_module_handles

def _load_dashboard_head_modules(
self, modules_to_load: Optional[Set[str]] = None
) -> List[DashboardHeadModule]:
"""Load `DashboardHeadModule`s.

Args:
modules_to_load: A list of module names to load. By default (None),
@@ -198,27 +256,77 @@ def _load_modules(self, modules_to_load: Optional[Set[str]] = None):
)

# Select modules to load.
modules_to_load = modules_to_load or {m.__name__ for m in head_cls_list}
logger.info("Modules to load: %s", modules_to_load)
if modules_to_load is not None:
head_cls_list = [
cls for cls in head_cls_list if cls.__name__ in modules_to_load
]

for cls in head_cls_list:
logger.info("Loading %s: %s", DashboardHeadModule.__name__, cls)
if cls.__name__ in modules_to_load:
c = cls(config)
modules.append(c)
logger.info(f"DashboardHeadModules to load: {modules_to_load}.")

# Verify modules are loaded as expected.
loaded_modules = {type(m).__name__ for m in modules}
if loaded_modules != modules_to_load:
assert False, (
"Actual loaded modules, {}, doesn't match the requested modules "
"to load, {}".format(loaded_modules, modules_to_load)
)
for cls in head_cls_list:
logger.info(f"Loading {DashboardHeadModule.__name__}: {cls}.")
c = cls(config)
modules.append(c)

self._modules_loaded = True
logger.info("Loaded %d modules. %s", len(modules), modules)
logger.info(f"Loaded {len(modules)} dashboard head modules: {modules}.")
return modules

def _load_subprocess_module_handles(
self, modules_to_load: Optional[Set[str]] = None
) -> List[TypeForSubprocessModuleHandle]:
"""
If minimal, return an empty list.
If non-minimal, load `SubprocessModule`s by creating Handles to them.

Args:
modules_to_load: A list of module names to load. By default (None),
it loads all modules.
"""
if self.minimal:
logger.info("Subprocess modules not loaded in minimal mode.")
return []

from ray.dashboard.subprocesses.module import (
SubprocessModule,
SubprocessModuleConfig,
)
from ray.dashboard.subprocesses.handle import SubprocessModuleHandle

handles = []
subprocess_cls_list = dashboard_utils.get_all_modules(SubprocessModule)

loop = ray._private.utils.get_or_create_event_loop()
config = SubprocessModuleConfig(
cluster_id_hex=self.cluster_id_hex,
gcs_address=self.gcs_address,
logging_level=self.logging_level,
logging_format=self.logging_format,
log_dir=self.log_dir,
logging_filename=self.logging_filename,
logging_rotate_bytes=self.logging_rotate_bytes,
logging_rotate_backup_count=self.logging_rotate_backup_count,
socket_dir=str(
Path(self.session_dir)
/ "sockets"
/ ray_constants.RAY_DASHBOARD_SOCKET_DIR
),
)

# Select modules to load.
if modules_to_load is not None:
subprocess_cls_list = [
cls for cls in subprocess_cls_list if cls.__name__ in modules_to_load
]

for cls in subprocess_cls_list:
logger.info(f"Loading {SubprocessModule.__name__}: {cls}.")
handle = SubprocessModuleHandle(loop, cls, config)
handle.start_module()
handles.append(handle)

logger.info(f"Loaded {len(handles)} subprocess modules: {handles}.")
return handles

async def _setup_metrics(self, gcs_aio_client):
metrics = DashboardPrometheusMetrics()

@@ -293,12 +401,16 @@ async def _async_notify():
except Exception:
logger.exception(f"Error notifying coroutine {co}")

modules = self._load_modules(self._modules_to_load)
dashboard_head_modules, subprocess_module_handles = self._load_modules(
self._modules_to_load
)

http_host, http_port = self.http_host, self.http_port
if self.serve_frontend:
logger.info("Initialize the http server.")
await self._configure_http_server(modules)
await self._configure_http_server(
dashboard_head_modules, subprocess_module_handles
)
http_host, http_port = self.http_server.get_address()
logger.info(f"http server initialized at {http_host}:{http_port}")
else:
Expand Down Expand Up @@ -338,7 +450,7 @@ async def _async_notify():
DataOrganizer.purge(),
DataOrganizer.organize(self._executor),
]
for m in modules:
for m in dashboard_head_modules:
concurrent_tasks.append(m.run(self.server))
if self.server:
concurrent_tasks.append(self.server.wait_for_termination())
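The TypeForSubprocessModuleHandle = object alias exists because SubprocessModuleHandle lives in non-minimal-only code. A condensed sketch of the deferred-import pattern used by _load_subprocess_module_handles (the free-standing function and its parameters are illustrative; SubprocessModuleHandle(loop, cls, config) and start_module() match the diff):

    from typing import List

    # Annotation alias: the real class can't be imported in minimal Ray.
    TypeForSubprocessModuleHandle = object

    def load_subprocess_handles(minimal: bool, loop, module_classes, config
                                ) -> List[TypeForSubprocessModuleHandle]:
        if minimal:
            return []  # subprocess modules exist only in non-minimal installs
        # Deferred import: safe because the minimal case returned above.
        from ray.dashboard.subprocesses.handle import SubprocessModuleHandle
        handles = []
        for cls in module_classes:
            handle = SubprocessModuleHandle(loop, cls, config)
            handle.start_module()  # starts the module's own subprocess server
            handles.append(handle)
        return handles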