Skip to content

Commit

Permalink
Improve deploy backend (#2958)
Browse files Browse the repository at this point in the history
* new_cluster support backend

* Fix

* Fix

* Fix lint

* Remove unused code

Co-authored-by: 刘宝 <[email protected]>
  • Loading branch information
fyrestone and 刘宝 authored Apr 26, 2022
1 parent 712d7f1 commit b93c02b
Show file tree
Hide file tree
Showing 10 changed files with 82 additions and 68 deletions.
2 changes: 2 additions & 0 deletions mars/deploy/oscar/base_config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ task:
fuse_enabled: yes
initial_same_color_num: null
as_broadcaster_successor_num: null
task_executor_config:
backend: mars
scheduling:
autoscale:
enabled: false
Expand Down
25 changes: 19 additions & 6 deletions mars/deploy/oscar/local.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,9 @@

import asyncio
import atexit
import logging
import os
import sys
import logging
from concurrent.futures import Future as SyncFuture
from typing import Dict, List, Union

Expand All @@ -27,14 +28,13 @@
from ...resource import cpu_count, cuda_count, mem_total, Resource
from ...services import NodeRole
from ...typing import ClusterType, ClientType
from ..utils import get_third_party_modules_from_config
from ..utils import get_third_party_modules_from_config, load_config
from .pool import create_supervisor_actor_pool, create_worker_actor_pool
from .service import (
start_supervisor,
start_worker,
stop_supervisor,
stop_worker,
load_config,
)
from .session import AbstractSession, _new_session, ensure_isolation_created

Expand All @@ -46,6 +46,15 @@
)
atexit.register(stop_isolation)

# Path of the default config file: "config.yml" located in the same
# directory as this module.
DEFAULT_CONFIG_FILE = os.path.join(
    os.path.dirname(os.path.abspath(__file__)), "config.yml"
)


def _load_config(config: Union[str, Dict] = None):
    """
    Load ``config`` through ``load_config``, passing this module's
    ``DEFAULT_CONFIG_FILE`` as the default config file.
    """
    default_file = DEFAULT_CONFIG_FILE
    return load_config(config, default_config_file=default_file)


async def new_cluster_in_isolation(
address: str = "0.0.0.0",
Expand All @@ -67,6 +76,7 @@ async def new_cluster_in_isolation(
mem_bytes,
cuda_devices,
subprocess_start_method,
backend,
config,
web,
n_supervisor_process,
Expand All @@ -82,6 +92,7 @@ async def new_cluster(
mem_bytes: Union[int, str] = "auto",
cuda_devices: Union[List[int], str] = "auto",
subprocess_start_method: str = None,
backend: str = None,
config: Union[str, Dict] = None,
web: bool = True,
loop: asyncio.AbstractEventLoop = None,
Expand All @@ -95,6 +106,7 @@ async def new_cluster(
mem_bytes=mem_bytes,
cuda_devices=cuda_devices,
subprocess_start_method=subprocess_start_method,
backend=backend,
config=config,
web=web,
n_supervisor_process=n_supervisor_process,
Expand All @@ -121,6 +133,7 @@ def __init__(
mem_bytes: Union[int, str] = "auto",
cuda_devices: Union[List[int], List[List[int]], str] = "auto",
subprocess_start_method: str = None,
backend: str = None,
config: Union[str, Dict] = None,
web: Union[bool, str] = "auto",
n_supervisor_process: int = 0,
Expand All @@ -133,11 +146,11 @@ def __init__(
"spawn" if sys.platform == "win32" else "forkserver"
)
# load config file to dict.
if not config or isinstance(config, str):
config = load_config(config)
self._address = address
self._subprocess_start_method = subprocess_start_method
self._config = config
self._config = load_config(config, default_config_file=DEFAULT_CONFIG_FILE)
if backend is not None:
self._config["task"]["task_executor_config"]["backend"] = backend
self._n_cpu = cpu_count() if n_cpu == "auto" else n_cpu
self._mem_bytes = mem_total() if mem_bytes == "auto" else mem_bytes
self._n_supervisor_process = n_supervisor_process
Expand Down
31 changes: 10 additions & 21 deletions mars/deploy/oscar/ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,11 @@
AbstractClusterBackend,
)
from ...services import NodeRole
from ...utils import merge_dict, flatten_dict_to_nested_dict
from ...utils import lazy_import
from ..utils import load_service_config_file, get_third_party_modules_from_config
from ..utils import (
load_config,
get_third_party_modules_from_config,
)
from .service import start_supervisor, start_worker, stop_supervisor, stop_worker
from .session import (
_new_session,
Expand All @@ -51,31 +53,18 @@
ray = lazy_import("ray")
logger = logging.getLogger(__name__)

# Path of the default config file: "rayconfig.yml" located in the same
# directory as this module.
DEFAULT_CONFIG_FILE = os.path.join(
    os.path.dirname(os.path.abspath(__file__)), "rayconfig.yml"
)
# Default for whether the supervisor runs standalone, i.e. does not share a
# node with a worker.
DEFAULT_SUPERVISOR_STANDALONE = False
# Default number of supervisor sub pools.
DEFAULT_SUPERVISOR_SUB_POOL_NUM = 0


def _load_config(config: Union[str, Dict] = None):
# NOTE(review): this span is a flattened diff view. The statements down to
# `return full_config` appear to be the removed hand-rolled implementation,
# and the final `return load_config(...)` appears to be its one-line
# replacement -- confirm against the commit before relying on either half.
# use default config
if isinstance(config, str):
filename = config
else:
d = os.path.dirname(os.path.abspath(__file__))
filename = os.path.join(d, "rayconfig.yml")
full_config = load_service_config_file(filename)
if config and not isinstance(config, str):
if not isinstance(config, Dict): # pragma: no cover
raise ValueError(f"{config} is not a dict")
flatten_keys = set(k for k in config.keys() if isinstance(k, str) and "." in k)
nested_flatten_config = flatten_dict_to_nested_dict(
{k: config[k] for k in flatten_keys}
)
nested_config = {k: config[k] for k in config.keys() if k not in flatten_keys}
config = merge_dict(nested_config, nested_flatten_config, overwrite=False)
merge_dict(full_config, config)
return full_config
return load_config(config, default_config_file=DEFAULT_CONFIG_FILE)


@register_cluster_backend
Expand Down Expand Up @@ -421,7 +410,7 @@ def __init__(
self._worker_cpu = worker_cpu
self._worker_mem = worker_mem
# load config file to dict.
self._config = _load_config(config)
self._config = load_config(config, default_config_file=DEFAULT_CONFIG_FILE)
self.supervisor_address = None
# Hold actor handles to avoid being freed
self._supervisor_pool = None
Expand Down
18 changes: 0 additions & 18 deletions mars/deploy/oscar/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,14 @@
# limitations under the License.

import logging
import os
from typing import List, Dict, Union

from ...resource import Resource
from ...services import start_services, stop_services, NodeRole
from ..utils import load_service_config_file

logger = logging.getLogger(__name__)


def load_config(filename=None):
# use default config
if not filename: # pragma: no cover
d = os.path.dirname(os.path.abspath(__file__))
filename = os.path.join(d, "config.yml")
return load_service_config_file(filename)


async def start_supervisor(
address: str,
lookup_address: str = None,
Expand All @@ -39,8 +29,6 @@ async def start_supervisor(
web: Union[str, bool] = "auto",
):
logger.debug("Starting Mars supervisor at %s", address)
if not config or isinstance(config, str):
config = load_config(config)
lookup_address = lookup_address or address
backend = config["cluster"].get("backend", "fixed")
if backend == "fixed" and config["cluster"].get("lookup_address") is None:
Expand Down Expand Up @@ -68,8 +56,6 @@ async def start_supervisor(


async def stop_supervisor(address: str, config: Dict = None):
if not config or isinstance(config, str):
config = load_config(config)
await stop_services(NodeRole.SUPERVISOR, address=address, config=config)


Expand All @@ -82,8 +68,6 @@ async def start_worker(
mark_ready: bool = True,
):
logger.debug("Starting Mars worker at %s", address)
if not config or isinstance(config, str):
config = load_config(config)
backend = config["cluster"].get("backend", "fixed")
if backend == "fixed" and config["cluster"].get("lookup_address") is None:
config["cluster"]["lookup_address"] = lookup_address
Expand All @@ -103,6 +87,4 @@ async def start_worker(


async def stop_worker(address: str, config: Dict = None):
if not config or isinstance(config, str):
config = load_config(config)
await stop_services(NodeRole.WORKER, address=address, config=config)
10 changes: 0 additions & 10 deletions mars/deploy/oscar/tests/local_test_with_ray_dag_config.yml

This file was deleted.

6 changes: 3 additions & 3 deletions mars/deploy/oscar/tests/test_checked_session.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
from ....core import TileableType, OperandType
from ....services.task.supervisor.tests import CheckedTaskPreprocessor
from ....services.subtask.worker.tests import CheckedSubtaskProcessor
from ..service import load_config
from ..local import _load_config
from ..tests.session import new_test_session, CONFIG_FILE


Expand Down Expand Up @@ -58,7 +58,7 @@ def test_checked_session(setup):


def test_check_task_preprocessor(setup):
config = load_config(CONFIG_FILE)
config = _load_config(CONFIG_FILE)
config["task"][
"task_preprocessor_cls"
] = "mars.deploy.oscar.tests.test_checked_session.FakeCheckedTaskPreprocessor"
Expand All @@ -78,7 +78,7 @@ def test_check_task_preprocessor(setup):


def test_check_subtask_processor(setup):
config = load_config(CONFIG_FILE)
config = _load_config(CONFIG_FILE)
config["subtask"][
"subtask_processor_cls"
] = "mars.deploy.oscar.tests.test_checked_session.FakeCheckedSubtaskProcessor"
Expand Down
7 changes: 3 additions & 4 deletions mars/deploy/oscar/tests/test_local.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,7 @@
from ....services.storage import StorageAPI
from ....tensor.arithmetic.add import TensorAdd
from ....tests.core import mock, check_dict_structure_same, DICT_NOT_EMPTY
from ..local import new_cluster
from ..service import load_config
from ..local import new_cluster, _load_config
from ..session import (
get_default_async_session,
get_default_session,
Expand Down Expand Up @@ -614,7 +613,7 @@ def cancel():


def test_load_third_party_modules(cleanup_third_party_modules_output): # noqa: F811
config = load_config()
config = _load_config()

config["third_party_modules"] = set()
with pytest.raises(TypeError, match="set"):
Expand Down Expand Up @@ -698,7 +697,7 @@ async def _exec():

@pytest.fixture
async def speculative_cluster():
config = load_config()
config = _load_config()
# coloring based fusion will make subtask too heterogeneous such that the speculative scheduler can't
# get enough homogeneous subtasks to calculate statistics
config["task"]["default_config"]["fuse_enabled"] = False
Expand Down
2 changes: 1 addition & 1 deletion mars/deploy/oscar/tests/test_ray.py
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@
from ....tests.core import require_ray, mock, DICT_NOT_EMPTY
from ....utils import lazy_import
from ..ray import (
new_cluster,
_load_config,
ClusterStateActor,
new_cluster,
new_cluster_in_ray,
new_ray_session,
)
Expand Down
6 changes: 1 addition & 5 deletions mars/deploy/oscar/tests/test_ray_dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -28,10 +28,6 @@

ray = lazy_import("ray")

CONFIG_TEST_FILE = os.path.join(
os.path.dirname(__file__), "local_test_with_ray_dag_config.yml"
)

EXPECT_PROFILING_STRUCTURE = {
"supervisor": {
"general": {
Expand Down Expand Up @@ -61,7 +57,7 @@ async def create_cluster(request):
start_method = os.environ.get("POOL_START_METHOD", None)
client = await new_cluster(
subprocess_start_method=start_method,
config=CONFIG_TEST_FILE,
backend="ray",
n_worker=2,
n_cpu=2,
use_uvloop=False,
Expand Down
43 changes: 43 additions & 0 deletions mars/deploy/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import yaml

from ..services import NodeRole
from ..utils import merge_dict, flatten_dict_to_nested_dict

DEFAULT_CONFIG_FILE = os.path.join(
os.path.dirname(os.path.abspath(__file__)), "oscar/config.yml"
Expand Down Expand Up @@ -113,6 +114,48 @@ def _clear_meta_cfg(src: Dict):
return cfg


def _merge_config(full_config: Dict, config: Dict) -> Dict:
"""
Merge the config to full_config, the config support flatten key, e.g.
config={
'scheduling.autoscale.enabled': True,
'scheduling.autoscale.scheduler_check_interval': 1,
'scheduling.autoscale.scheduler_backlog_timeout': 1,
'scheduling.autoscale.worker_idle_timeout': 10,
'scheduling.autoscale.min_workers': 1,
'scheduling.autoscale.max_workers': 4
}
"""
if not config:
return full_config
if not isinstance(config, Dict): # pragma: no cover
raise ValueError(
f"The config should be a dict, but the type is {type(config)}."
)
flatten_keys = set(k for k in config.keys() if isinstance(k, str) and "." in k)
nested_flatten_config = flatten_dict_to_nested_dict(
{k: config[k] for k in flatten_keys}
)
nested_config = {k: config[k] for k in config.keys() if k not in flatten_keys}
config = merge_dict(nested_config, nested_flatten_config, overwrite=False)
merge_dict(full_config, config)
return full_config


def load_config(config: Union[str, Dict], default_config_file: str):
    """
    Build the service config from an explicit file or from the defaults.

    When ``config`` is a string it is treated as a config file name and
    loaded with ``load_service_config_file``; ``default_config_file`` is not
    read in that case. Otherwise ``default_config_file`` is loaded and
    ``config`` is merged into it with ``_merge_config``.
    """
    if not isinstance(config, str):
        # No explicit file given: start from the defaults, then apply the
        # overrides (if any).
        defaults = load_service_config_file(default_config_file)
        return _merge_config(defaults, config)
    # An explicit config file was given: load it as is.
    return load_service_config_file(config)


async def wait_all_supervisors_ready(endpoint):
"""
Wait till all containers are ready
Expand Down

0 comments on commit b93c02b

Please sign in to comment.