From 5becab9c46126516942e62c8d677027df4e09431 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=85=95=E7=99=BD?= Date: Fri, 10 Dec 2021 10:30:38 +0800 Subject: [PATCH 1/3] support specify initial_round_scale_out_workers --- mars/deploy/oscar/base_config.yml | 1 + .../scheduling/supervisor/autoscale.py | 35 +++++++++++----- mars/services/task/supervisor/processor.py | 17 ++++++-- mars/services/task/supervisor/stage.py | 42 ++++++++++++++++--- mars/storage/ray.py | 23 ++++++---- mars/utils.py | 11 +++++ 6 files changed, 103 insertions(+), 26 deletions(-) diff --git a/mars/deploy/oscar/base_config.yml b/mars/deploy/oscar/base_config.yml index 0eb5bdc179..8152b2532e 100644 --- a/mars/deploy/oscar/base_config.yml +++ b/mars/deploy/oscar/base_config.yml @@ -34,5 +34,6 @@ scheduling: enabled: false min_workers: 1 # Must >=1, mars need at least 1 worker to fetch data max_workers: 100 + initial_round_scale_out_workers: 2 scheduler_backlog_timeout: 60 worker_idle_timeout: 120 diff --git a/mars/services/scheduling/supervisor/autoscale.py b/mars/services/scheduling/supervisor/autoscale.py index b5157b4022..5039b9be73 100644 --- a/mars/services/scheduling/supervisor/autoscale.py +++ b/mars/services/scheduling/supervisor/autoscale.py @@ -22,7 +22,9 @@ from typing import List, Set, Dict, Optional, Any from .... import oscar as mo +from ....metric import Metrics from ....typing import BandType +from ....utils import report_event from ....lib.aio import alru_cache from ...cluster.api import ClusterAPI from ...cluster.core import NodeRole, NodeStatus @@ -54,8 +56,8 @@ async def __post_create__(self): self._cluster_api = await ClusterAPI.create(self.address) self._strategy = await strategy_cls.create(self._autoscale_conf, self) if self._enabled: - logger.info(f"Auto scale strategy %s started", self._strategy) await self._strategy.start() + logger.info(f"Autoscaler started with conf %s and strategy %s", self._autoscale_conf, self._strategy) async def __pre_destroy__(self): if self._enabled: @@ -80,12 +82,14 @@ async def request_worker( ) if worker_address: self._dynamic_workers.add(worker_address) - logger.info( - "Requested new worker %s in %.4f seconds, current dynamic worker nums is %s", + duration = time.time() - start_time + msg = "Requested new worker {} in {} seconds, current dynamic worker nums is {}".format( worker_address, - time.time() - start_time, + duration, self.get_dynamic_worker_nums(), - ) + ) + logger.info(msg) + report_event("INFO", "AUTO_SCALING_OUT", msg) return worker_address else: logger.warning( @@ -119,14 +123,19 @@ async def release_workers(self, addresses: List[str]): async def release_worker(address): logger.info("Start to release worker %s.", address) + start = time.time() worker_bands = workers_bands[address] await asyncio.gather( *[self.global_slot_ref.wait_band_idle(band) for band in worker_bands] ) await self._migrate_data_of_bands(worker_bands, excluded_bands) await self._cluster_api.release_worker(address) + duration = time.time() - start + self._offline_worker_time_secs.record(duration) self._dynamic_workers.remove(address) - logger.info("Released worker %s.", address) + msg = f"Released worker {address}." 
+ logger.info(msg) + report_event("INFO", "AUTO_SCALING_IN", msg) await asyncio.gather(*[release_worker(address) for address in addresses]) @@ -256,11 +265,15 @@ def __init__(self, autoscale_conf: Dict[str, Any], autoscaler): self._sustained_scheduler_backlog_timeout = autoscale_conf.get( "sustained_scheduler_backlog_timeout", self._scheduler_backlog_timeout ) + self._scheduler_scale_out_enabled = autoscale_conf.get( + "scheduler_scale_out_enabled", True + ) # Make worker_idle_timeout greater than scheduler_backlog_timeout to # avoid cluster fluctuate back and forth。 self._worker_idle_timeout = autoscale_conf.get( "worker_idle_timeout", 2 * self._scheduler_backlog_timeout ) + self._initial_round_scale_out_workers = autoscale_conf.get("initial_round_scale_out_workers", 2) self._min_workers = autoscale_conf.get("min_workers", 1) assert self._min_workers >= 1, "Mars need at least 1 worker." self._max_workers = autoscale_conf.get("max_workers", 100) @@ -301,7 +314,9 @@ async def _run(self): async def _run_round(self): queueing_refs = list(self._autoscaler.queueing_refs.values()) - if any([await queueing_ref.all_bands_busy() for queueing_ref in queueing_refs]): + if self._scheduler_scale_out_enabled and any( + [await queueing_ref.all_bands_busy() for queueing_ref in queueing_refs] + ): await self._scale_out(queueing_refs) else: await self._scale_in() @@ -323,7 +338,7 @@ async def _scale_out(self, queueing_refs): while any( [await queueing_ref.all_bands_busy() for queueing_ref in queueing_refs] ): - worker_num = 2**rnd + worker_num = self._initial_round_scale_out_workers ** rnd if ( self._autoscaler.get_dynamic_worker_nums() + worker_num > self._max_workers @@ -368,7 +383,7 @@ async def _scale_in(self): } worker_addresses = set(band[0] for band in idle_bands) if worker_addresses: - logger.debug( + logger.info( "Bands %s of workers % has been idle for as least %s seconds.", idle_bands, worker_addresses, @@ -380,7 +395,7 @@ async def _scale_in(self): < self._min_workers ): worker_address = worker_addresses.pop() - logger.debug( + logger.info( "Skip offline idle worker %s to keep at least %s dynamic workers. 
" "Current total dynamic workers is %s.", worker_address, diff --git a/mars/services/task/supervisor/processor.py b/mars/services/task/supervisor/processor.py index 7df620cd4d..7be1d94e4b 100644 --- a/mars/services/task/supervisor/processor.py +++ b/mars/services/task/supervisor/processor.py @@ -37,6 +37,7 @@ ProfilingData, MARS_ENABLE_PROFILING, ) +from ....oscar.debug import debug_async_timeout from ....typing import TileableType, BandType from ....utils import build_fetch, Timer from ...cluster.api import ClusterAPI @@ -793,8 +794,13 @@ async def _decref_input_subtasks( if subtask.subtask_id not in self._subtask_decref_events: self._subtask_decref_events[subtask.subtask_id] = asyncio.Event() else: # pragma: no cover - await self._subtask_decref_events[subtask.subtask_id].wait() - return + with debug_async_timeout( + "async_call_timeout", + "Wait event of decref input of subtask %s.", + subtask.subtask_id, + ): + await self._subtask_decref_events[subtask.subtask_id].wait() + return decref_chunk_keys = [] for in_subtask in subtask_graph.iter_predecessors(subtask): @@ -811,7 +817,12 @@ async def _decref_input_subtasks( # decref main key as well decref_chunk_keys.extend([key[0] for key in data_keys]) decref_chunk_keys.append(result_chunk.key) - await self._lifecycle_api.decref_chunks(decref_chunk_keys) + with debug_async_timeout( + "async_call_timeout", + "Call lifecycle_api.decref_chunks with keys %s.", + decref_chunk_keys, + ): + await self._lifecycle_api.decref_chunks(decref_chunk_keys) # `set_subtask_result` will be called when subtask finished # but report progress will call set_subtask_result too, diff --git a/mars/services/task/supervisor/stage.py b/mars/services/task/supervisor/stage.py index c20bea4826..61a1b1b223 100644 --- a/mars/services/task/supervisor/stage.py +++ b/mars/services/task/supervisor/stage.py @@ -20,6 +20,7 @@ from .... 
import oscar as mo from ....core import ChunkGraph from ....core.operand import Fuse +from ....metric import Metrics from ....optimization.logical import OptimizationRecords from ....typing import BandType, TileableType from ....utils import get_params_fields @@ -84,12 +85,24 @@ def __init__( self._done = asyncio.Event() self._cancelled = asyncio.Event() + # metrics + self._stage_execution_time = Metrics.gauge( + 'mars.stage_execution_time_secs', + 'Time consuming in seconds to execute a stage', + ('session_id', 'task_id', 'stage_id')) + def is_cancelled(self): return self._cancelled.is_set() async def _schedule_subtasks(self, subtasks: List[Subtask]): + """Submitted subtasks won't be resubmitted again.""" if not subtasks: return + subtasks = [ + subtask + for subtask in subtasks + if subtask.subtask_id not in self._submitted_subtask_ids + ] self._submitted_subtask_ids.update(subtask.subtask_id for subtask in subtasks) return await self._scheduling_api.add_subtasks( subtasks, [subtask.priority for subtask in subtasks] @@ -115,11 +128,10 @@ async def _update_chunks_meta(self, chunk_graph: ChunkGraph): def _schedule_done(self): self._done.set() - async def set_subtask_result(self, result: SubtaskResult): + async def set_subtask_result(self, result: SubtaskResult, band: BandType = None): + assert result.status.is_done subtask = self.subtask_id_to_subtask[result.subtask_id] - self.subtask_results[subtask] = result.merge_bands( - self.subtask_results.get(subtask) - ) + # update subtask_results in `TaskProcessorActor.set_subtask_result` self._submitted_subtask_ids.difference_update([result.subtask_id]) all_done = len(self.subtask_results) == len(self.subtask_graph) @@ -136,7 +148,7 @@ async def set_subtask_result(self, result: SubtaskResult): # tell scheduling to finish subtasks await self._scheduling_api.finish_subtasks( - [result.subtask_id], schedule_next=not error_or_cancelled + [result.subtask_id], bands=[band], schedule_next=not error_or_cancelled ) if self.result.status != TaskStatus.terminated: self.result = TaskResult( @@ -160,11 +172,30 @@ async def set_subtask_result(self, result: SubtaskResult): result.traceback, ), ) + if result.status == SubtaskStatus.cancelled: + logger.warning( + "Subtask %s from band %s canceled.", + subtask.subtask_id, + band, + ) + logger.info( + "Start to cancel stage %s of task %s.", self.stage_id, self.task + ) # if error or cancel, cancel all submitted subtasks await self._scheduling_api.cancel_subtasks( list(self._submitted_subtask_ids) ) self._schedule_done() + cost_time_secs = self.result.end_time - self.result.start_time + logger.info( + 'Time consuming to execute a stage is %ss with ' + 'session id %s, task id %s, stage id %s', + cost_time_secs, self.result.session_id, self.result.task_id, + self.result.stage_id) + self._stage_execution_time.record(cost_time_secs, { + 'session_id': self.result.session_id, + 'task_id': self.result.task_id, + 'stage_id': self.result.stage_id}) else: # not terminated, push success subtasks to queue if they are ready to_schedule_subtasks = [] @@ -196,6 +227,7 @@ async def run(self): await self._done.wait() async def cancel(self): + logger.debug("Start to cancel stage %s of task %s.", self.stage_id, self.task) if self._done.is_set(): # pragma: no cover # already finished, ignore cancel return diff --git a/mars/storage/ray.py b/mars/storage/ray.py index 829205a8ce..e4a05ca970 100644 --- a/mars/storage/ray.py +++ b/mars/storage/ray.py @@ -11,7 +11,7 @@ # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or 
implied. # See the License for the specific language governing permissions and # limitations under the License. - +import logging import inspect from typing import Any, Dict, List, Tuple @@ -22,6 +22,7 @@ from .core import BufferWrappedFileObject, StorageFileObject ray = lazy_import("ray") +logger = logging.getLogger(__name__) # TODO(fyrestone): make the SparseMatrix pickleable. @@ -145,18 +146,24 @@ async def get(self, object_id, **kwargs) -> object: if kwargs: # pragma: no cover raise NotImplementedError(f'Got unsupported args: {",".join(kwargs)}') with debug_async_timeout( - "ray_object_retrieval_timeout", "Storage get object timeout" + "ray_object_retrieval_timeout", + "Storage get object timeout, ObjectRef: %s", + object_id ): return await object_id @implements(StorageBackend.put) async def put(self, obj, importance=0) -> ObjectInfo: - if support_specify_owner() and self._owner_address: - if not self._owner: - self._owner = ray.get_actor(self._owner_address) - object_id = ray.put(obj, _owner=self._owner) - else: - object_id = ray.put(obj) + try: + if support_specify_owner() and self._owner_address: + if not self._owner: + self._owner = ray.get_actor(self._owner_address) + object_id = ray.put(obj, _owner=self._owner) + else: + object_id = ray.put(obj) + except Exception as e: + logger.exception("ray.put error %s", e) + raise # We can't get the serialized bytes length from ray.put return ObjectInfo(object_id=object_id) diff --git a/mars/utils.py b/mars/utils.py index bfb62f7e19..dec30f9cee 100644 --- a/mars/utils.py +++ b/mars/utils.py @@ -60,6 +60,11 @@ from .lib.version import parse as parse_version from .typing import ChunkType, TileableType, EntityType, OperandType +try: + import ray +except ImportError: + ray = None + logger = logging.getLogger(__name__) random.seed(int(time.time()) * os.getpid()) pd_release_version: Tuple[int] = parse_version(pd.__version__).release @@ -1575,3 +1580,9 @@ def __str__(self): "__basename__": name, }, )().with_traceback(traceback) + + +def report_event(severity, label, message): + if ray and ray.is_initialized(): + severity = getattr(ray.EventSeverity, severity) if isinstance(severity, str) else severity + ray.report_event(severity, label, message) From 1ad594a93ac64e303bbeed6f6ec1df71a2ac97a2 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E6=85=95=E7=99=BD?= Date: Thu, 16 Dec 2021 08:33:33 +0800 Subject: [PATCH 2/3] refine subtask balance --- .../scheduling/supervisor/assigner.py | 121 +++++++++++----- .../scheduling/supervisor/autoscale.py | 77 ++++++---- .../scheduling/supervisor/queueing.py | 134 ++++++++++++------ .../supervisor/tests/test_queue_balance.py | 33 +++-- 4 files changed, 242 insertions(+), 123 deletions(-) diff --git a/mars/services/scheduling/supervisor/assigner.py b/mars/services/scheduling/supervisor/assigner.py index 43e6ae8158..b793ccbfca 100644 --- a/mars/services/scheduling/supervisor/assigner.py +++ b/mars/services/scheduling/supervisor/assigner.py @@ -16,14 +16,14 @@ import itertools import random from collections import defaultdict -from typing import Dict, List +from typing import Dict, List, Set from .... 
import oscar as mo from ....core.operand import Fetch, FetchShuffle from ....typing import BandType from ...core import NodeRole from ...subtask import Subtask -from ..errors import NoMatchingSlots +from ..errors import NoMatchingSlots, NoAvailableBand class AssignerActor(mo.Actor): @@ -85,11 +85,33 @@ def _get_device_bands(self, is_gpu: bool): raise NoMatchingSlots("gpu" if is_gpu else "cpu") return filtered_bands - def _get_random_band(self, is_gpu: bool): - avail_bands = self._get_device_bands(is_gpu) - return random.choice(avail_bands) + def _get_random_band( + self, + is_gpu: bool, + exclude_bands: Set[BandType] = None, + exclude_bands_force: bool = False, + ): + if exclude_bands: + avail_bands = [ + band + for band in self._get_device_bands(is_gpu) + if band not in exclude_bands + ] + if avail_bands: + return random.choice(avail_bands) + elif exclude_bands_force: + raise NoAvailableBand( + f"No bands available after excluding bands {exclude_bands}" + ) + return random.choice(self._get_device_bands(is_gpu)) - async def assign_subtasks(self, subtasks: List[Subtask]): + async def assign_subtasks( + self, + subtasks: List[Subtask], + exclude_bands: Set[BandType] = None, + exclude_bands_force: bool = False, + ): + exclude_bands = exclude_bands or set() inp_keys = set() selected_bands = dict() @@ -101,28 +123,30 @@ async def assign_subtasks(self, subtasks: List[Subtask]): for subtask in subtasks: is_gpu = any(c.op.gpu for c in subtask.chunk_graph) if subtask.expect_bands: - if all( - expect_band in self._bands for expect_band in subtask.expect_bands - ): - # pass if all expected bands are available - selected_bands[subtask.subtask_id] = subtask.expect_bands - else: - # exclude expected but unready bands + # exclude expected but unready bands + expect_available_bands = [ + expect_band + for expect_band in subtask.expect_bands + if expect_band in self._bands and expect_band not in exclude_bands + ] + # fill in if all expected bands are unready + if not expect_available_bands: expect_available_bands = [ - expect_band - for expect_band in subtask.expect_bands - if expect_band in self._bands + self._get_random_band( + is_gpu, exclude_bands, exclude_bands_force + ) ] - # fill in if all expected bands are unready - if not expect_available_bands: - expect_available_bands = [self._get_random_band(is_gpu)] - selected_bands[subtask.subtask_id] = expect_available_bands + selected_bands[subtask.subtask_id] = expect_available_bands continue for indep_chunk in subtask.chunk_graph.iter_indep(): if isinstance(indep_chunk.op, Fetch): inp_keys.add(indep_chunk.key) elif isinstance(indep_chunk.op, FetchShuffle): - selected_bands[subtask.subtask_id] = [self._get_random_band(is_gpu)] + selected_bands[subtask.subtask_id] = [ + self._get_random_band( + is_gpu, exclude_bands, exclude_bands_force + ) + ] break fields = ["store_size", "bands"] @@ -152,11 +176,14 @@ async def assign_subtasks(self, subtasks: List[Subtask]): b for b in self._address_to_bands[band[0]] if b[1].startswith(band_prefix) + and b not in exclude_bands ] if sel_bands: - band = (band[0], random.choice(sel_bands)) - if band not in filtered_bands: - band = self._get_random_band(is_gpu) + band = random.choice(sel_bands) + if band not in filtered_bands or band in exclude_bands: + band = self._get_random_band( + is_gpu, exclude_bands, exclude_bands_force + ) band_sizes[band] += meta["store_size"] bands = [] max_size = -1 @@ -166,12 +193,24 @@ async def assign_subtasks(self, subtasks: List[Subtask]): max_size = size elif size == max_size: 
bands.append(band) - assigns.append(random.choice(bands)) + band = random.choice(bands) + if band in exclude_bands and exclude_bands_force: + raise NoAvailableBand( + f"No bands available for subtask {subtask.subtask_id} after " + f"excluded {exclude_bands}" + ) + if subtask.bands_specified and band not in subtask.expect_bands: + raise Exception( + f"No bands available for subtask {subtask.subtask_id} on bands {subtask.expect_bands} " + f"after excluded {exclude_bands}" + ) + assigns.append([band]) return assigns async def reassign_subtasks( - self, band_to_queued_num: Dict[BandType, int] + self, band_to_queued_num: Dict[BandType, int], used_slots: Dict[BandType, int] = None ) -> Dict[BandType, int]: + used_slots = used_slots or {} move_queued_subtasks = {} for is_gpu in (False, True): band_name_prefix = "numa" if not is_gpu else "gpu" @@ -187,8 +226,8 @@ async def reassign_subtasks( if not filtered_bands: continue - - num_used_bands = len(filtered_band_to_queued_num.keys()) + used_bands = filtered_band_to_queued_num.keys() + num_used_bands = len(used_bands) if num_used_bands == 1: [(band, length)] = filtered_band_to_queued_num.items() if length == 0: @@ -213,7 +252,7 @@ async def reassign_subtasks( k: filtered_band_to_queued_num[k] for k in unready_bands } # approximate total of subtasks moving to each ready band - num_all_subtasks = sum(filtered_band_to_queued_num.values()) + num_all_subtasks = sum(filtered_band_to_queued_num.values()) + sum(used_slots.values()) mean = int(num_all_subtasks / len(filtered_bands)) # all_bands (namely) includes: # a. ready bands recorded in band_num_queued_subtasks @@ -229,19 +268,23 @@ async def reassign_subtasks( # assuming bands not recorded in band_num_queued_subtasks hold 0 subtasks band_move_nums = {} for band in all_bands: + existing_subtask_nums = filtered_band_to_queued_num.get(band, 0) if band in filtered_bands: - band_move_nums[band] = mean - filtered_band_to_queued_num.get( - band, 0 - ) + band_move_num = mean - existing_subtask_nums - used_slots.get(band, 0) + # If slot of band already has some subtasks running, band_move_num may be greater than + # existing_subtask_nums. 
+ if band_move_num + existing_subtask_nums < 0: + band_move_num = -existing_subtask_nums else: - band_move_nums[band] = -filtered_band_to_queued_num.get(band, 0) + band_move_num = -existing_subtask_nums + band_move_nums[band] = band_move_num # ensure the balance of moving in and out total_move = sum(band_move_nums.values()) - # int() is going to be closer to zero, so `mean` is no more than actual mean value - # total_move = mean * len(self._bands) - num_all_subtasks - # <= actual_mean * len(self._bands) - num_all_subtasks = 0 - assert total_move <= 0 - if total_move != 0: - band_move_nums[self._get_random_band(False)] -= total_move + exclude_bands = set(used_bands) + unit = (total_move > 0) - (total_move < 0) + while total_move != 0: + band_move_nums[self._get_random_band(False, exclude_bands=exclude_bands)] -= unit + total_move -= unit + assert sum(band_move_nums.values()) == 0, f"band_move_nums {band_move_nums}" move_queued_subtasks.update(band_move_nums) return dict(sorted(move_queued_subtasks.items(), key=lambda item: item[1])) diff --git a/mars/services/scheduling/supervisor/autoscale.py b/mars/services/scheduling/supervisor/autoscale.py index 5039b9be73..dd4c6acc34 100644 --- a/mars/services/scheduling/supervisor/autoscale.py +++ b/mars/services/scheduling/supervisor/autoscale.py @@ -40,6 +40,16 @@ def __init__(self, autoscale_conf: Dict[str, Any]): self.queueing_refs = dict() self.global_slot_ref = None self._dynamic_workers: Set[str] = set() + self._max_workers = autoscale_conf.get("max_workers", 100) + self._initial_request_worker_timeout = autoscale_conf.get("initial_request_worker_timeout", 30) + self._request_worker_time_secs = Metrics.gauge( + "mars.scheduling.request_worker_time_sec", + "Time consuming in seconds to request a worker", + ) + self._offline_worker_time_secs = Metrics.gauge( + "mars.scheduling.offline_worker_time_sec", + "Time consuming in seconds to offline a worker", + ) async def __post_create__(self): strategy = self._autoscale_conf.get("strategy") @@ -77,6 +87,7 @@ async def request_worker( self, worker_cpu: int = None, worker_mem: int = None, timeout: int = None ) -> str: start_time = time.time() + timeout = timeout or self._initial_request_worker_timeout worker_address = await self._cluster_api.request_worker( worker_cpu, worker_mem, timeout ) @@ -89,14 +100,15 @@ async def request_worker( self.get_dynamic_worker_nums(), ) logger.info(msg) - report_event("INFO", "AUTO_SCALING_OUT", msg) + report_event("WARNING", "AUTO_SCALING_OUT", msg) return worker_address else: logger.warning( - "Request worker with resource %s failed in %.4f seconds.", - dict(worker_cpu=worker_cpu, worker_mem=worker_mem), + "Request worker with resource %s and timeout %s failed in %.4f seconds.", + dict(worker_cpu=worker_cpu, worker_mem=worker_mem), timeout, time.time() - start_time, ) + self._initial_request_worker_timeout *= 2 async def release_workers(self, addresses: List[str]): """ @@ -135,7 +147,7 @@ async def release_worker(address): self._dynamic_workers.remove(address) msg = f"Released worker {address}." 
logger.info(msg) - report_event("INFO", "AUTO_SCALING_IN", msg) + report_event("WARNING", "AUTO_SCALING_IN", msg) await asyncio.gather(*[release_worker(address) for address in addresses]) @@ -274,6 +286,10 @@ def __init__(self, autoscale_conf: Dict[str, Any], autoscaler): "worker_idle_timeout", 2 * self._scheduler_backlog_timeout ) self._initial_round_scale_out_workers = autoscale_conf.get("initial_round_scale_out_workers", 2) + if self._initial_round_scale_out_workers < 2: + logger.warning("Initial round scale_out workers %s is less than 2, set it to 2.", + self._initial_round_scale_out_workers) + self._initial_round_scale_out_workers = 2 self._min_workers = autoscale_conf.get("min_workers", 1) assert self._min_workers >= 1, "Mars need at least 1 worker." self._max_workers = autoscale_conf.get("max_workers", 100) @@ -310,34 +326,29 @@ async def _run(self): logger.info("Canceled pending task backlog strategy.") except Exception as e: # pragma: no cover logger.exception("Exception occurred when try to auto scale") - raise e + pass async def _run_round(self): queueing_refs = list(self._autoscaler.queueing_refs.values()) - if self._scheduler_scale_out_enabled and any( - [await queueing_ref.all_bands_busy() for queueing_ref in queueing_refs] - ): + if self._scheduler_scale_out_enabled and await self.all_bands_busy(queueing_refs): await self._scale_out(queueing_refs) else: await self._scale_in() + @staticmethod + async def all_bands_busy(queueing_refs): + return any( + [await queueing_ref.all_bands_busy() for queueing_ref in queueing_refs] + ) + async def _scale_out(self, queueing_refs): logger.info( "Try to scale out, current dynamic workers %s", self._autoscaler.get_dynamic_worker_nums(), ) start_time = time.time() - while not await self._autoscaler.request_worker(): - logger.warning( - "Request worker failed, wait %s seconds and retry.", - self._scheduler_check_interval, - ) - await asyncio.sleep(self._scheduler_check_interval) - await asyncio.sleep(self._scheduler_backlog_timeout) rnd = 1 - while any( - [await queueing_ref.all_bands_busy() for queueing_ref in queueing_refs] - ): + while await self.all_bands_busy(queueing_refs): worker_num = self._initial_round_scale_out_workers ** rnd if ( self._autoscaler.get_dynamic_worker_nums() + worker_num @@ -346,18 +357,23 @@ async def _scale_out(self, queueing_refs): worker_num = ( self._max_workers - self._autoscaler.get_dynamic_worker_nums() ) - while set( - await asyncio.gather( - *[self._autoscaler.request_worker() for _ in range(worker_num)] - ) - ) == {None}: + if worker_num == 0: + break + requested_workers = set(await asyncio.gather( + *[self._autoscaler.request_worker() for _ in range(worker_num)] + )) + while len(requested_workers) == 0 and await self.all_bands_busy(queueing_refs): logger.warning( "Request %s workers all failed, wait %s seconds and retry.", worker_num, self._scheduler_check_interval, ) await asyncio.sleep(self._scheduler_check_interval) + requested_workers.update(await asyncio.gather( + *[self._autoscaler.request_worker() for _ in range(worker_num)] + )) rnd += 1 + logger.info("Requested %s workers.", len(requested_workers)) await asyncio.sleep(self._sustained_scheduler_backlog_timeout) logger.info( "Scale out finished in %s round, took %s seconds, current dynamic workers %s", @@ -383,19 +399,13 @@ async def _scale_in(self): } worker_addresses = set(band[0] for band in idle_bands) if worker_addresses: - logger.info( - "Bands %s of workers % has been idle for as least %s seconds.", - idle_bands, - worker_addresses, - 
self._worker_idle_timeout,
-            )
             while (
                 worker_addresses
                 and self._autoscaler.get_dynamic_worker_nums() - len(worker_addresses)
                 < self._min_workers
             ):
                 worker_address = worker_addresses.pop()
-                logger.info(
+                logger.debug(
                     "Skip offline idle worker %s to keep at least %s dynamic workers. "
                     "Current total dynamic workers is %s.",
                     worker_address,
@@ -405,6 +415,13 @@ async def _scale_in(self):
                 idle_bands.difference_update(
                     set(await self._autoscaler.get_worker_bands(worker_address))
                 )
+        if worker_addresses:
+            logger.info(
+                "Bands %s of workers %s have been idle for at least %s seconds.",
+                idle_bands,
+                worker_addresses,
+                self._worker_idle_timeout,
+            )
         if worker_addresses:
             start_time = time.time()
             logger.info(
diff --git a/mars/services/scheduling/supervisor/queueing.py b/mars/services/scheduling/supervisor/queueing.py
index 2b401bce71..1f6170bfee 100644
--- a/mars/services/scheduling/supervisor/queueing.py
+++ b/mars/services/scheduling/supervisor/queueing.py
@@ -22,6 +22,8 @@
 from .... import oscar as mo
 from ....lib.aio import alru_cache
+from ....metric import Metrics
+from ....oscar.debug import create_task_with_ex_logged
 from ....utils import dataslots
 from ...subtask import Subtask
 from ...task import TaskAPI
@@ -75,39 +77,8 @@ async def __post_create__(self):
         self._cluster_api = await ClusterAPI.create(self.address)
         self._band_slot_nums = {}
-
-        async def watch_bands():
-            async for bands in self._cluster_api.watch_all_bands():
-                # confirm ready bands indeed changed
-                if bands != self._band_slot_nums:
-                    old_band_slot_nums = self._band_slot_nums
-                    self._band_slot_nums = copy.deepcopy(bands)
-                    if self._band_queues:
-                        await self.balance_queued_subtasks()
-                    # Refresh global slot manager to get latest bands,
-                    # so that subtasks reassigned to the new bands can be
-                    # ensured to get submitted as least one subtask every band
-                    # successfully.
-                    await self._slots_ref.refresh_bands()
-                    all_bands = {*bands.keys(), *old_band_slot_nums.keys()}
-                    bands_delta = {}
-                    for b in all_bands:
-                        delta = bands.get(b, 0) - old_band_slot_nums.get(b, 0)
-                        if delta != 0:
-                            bands_delta[b] = delta
-                    # Submit tasks on new bands manually, otherwise some subtasks
-                    # will never got submitted. Note that we must ensure every new
-                    # band will get at least one subtask submitted successfully.
-                    # Later subtasks submit on the band will be triggered by the
-                    # success of previous subtasks on the same band.
-                    logger.info(
-                        "Bands changed with delta %s, submit all bands.",
-                        bands_delta,
-                    )
-                    await self.ref().submit_subtasks()
-
-        self._band_watch_task = asyncio.create_task(watch_bands())
-
+        cluster_info = await self._cluster_api.get_cluster_info()
+        self._config = cluster_info.get('config', {})
         from .globalslot import GlobalSlotManagerActor
         [self._slots_ref] = await self._cluster_api.get_supervisor_refs(
@@ -119,6 +90,39 @@ async def watch_bands():
             AssignerActor.gen_uid(self._session_id), address=self.address
         )

+        async def watch_bands():
+            async for bands in self._cluster_api.watch_all_bands():
+                # confirm ready bands indeed changed
+                if bands != self._band_slot_nums:
+                    old_band_slot_nums = self._band_slot_nums
+                    self._band_slot_nums = copy.deepcopy(bands)
+                    await self._slots_ref.refresh_bands()
+                    all_bands = {*bands.keys(), *old_band_slot_nums.keys()}
+                    bands_delta = {}
+                    for b in all_bands:
+                        delta = bands.get(b, 0) - old_band_slot_nums.get(b, 0)
+                        if delta != 0:
+                            bands_delta[b] = delta
+                    # Submit tasks on new bands manually, otherwise some subtasks
+                    # will never get submitted.
Note that we must ensure every new + # band will get at least one subtask submitted successfully. + # Later subtasks submit on the band will be triggered by the + # success of previous subtasks on the same band. + logger.info( + "Bands changed with delta %s, submit all bands.", + bands_delta, + ) + # submit_subtasks may empty _band_queues, so use `ref()` to wait previous `submit_subtasks` call + # finish. + await self.ref().balance_queued_subtasks() + # Refresh global slot manager to get latest bands, + # so that subtasks reassigned to the new bands can be + # ensured to get submitted as least one subtask every band + # successfully. + await self.ref().submit_subtasks() + + self._band_watch_task = create_task_with_ex_logged(watch_bands()) + if self._submit_period > 0: self._periodical_submit_task = self.ref().periodical_submit.tell_delay( delay=self._submit_period @@ -160,7 +164,7 @@ async def add_subtasks(self, subtasks: List[Subtask], priorities: List[Tuple]): logger.debug("%d subtasks enqueued", len(subtasks)) async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None): - logger.debug("Submitting subtasks with limit %s", limit) + logger.debug("Submitting subtasks with limit %s to band %s", limit, band) if not limit and band not in self._band_slot_nums: self._band_slot_nums = await self._cluster_api.get_all_bands() @@ -177,8 +181,10 @@ async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None) band_limit = limit or self._band_slot_nums[band] task_queue = self._band_queues[band] submit_items = dict() - while task_queue and len(submit_items) < band_limit: + while ( self._ensure_top_item_valid(task_queue) + and len(submit_items) < band_limit + ): item = heapq.heappop(task_queue) submit_items[item.subtask.subtask_id] = item @@ -225,6 +231,7 @@ async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None) if stid not in submitted_ids: continue item = submit_items[stid] + item.subtask.submitted = True logger.debug( "Submit subtask %s to band %r", item.subtask.subtask_id, @@ -243,18 +250,30 @@ async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None) logger.debug("No slots available") for stid in non_submitted_ids: - heapq.heappush(task_queue, submit_items[stid]) + item = submit_items[stid] + self._max_enqueue_id += 1 + # lower priority to ensure other subtasks can be scheduled. 
+            item.priority = item.priority[:-1] + (self._max_enqueue_id,)
+            heapq.heappush(task_queue, item)
+        if non_submitted_ids:
+            log_func = logger.info if self._periodical_submit_task is None else logger.debug
+            log_func("No slots available, band queues status: %s", self.band_queue_subtask_nums())

         if submit_aio_tasks:
             yield asyncio.gather(*submit_aio_tasks)

     def _ensure_top_item_valid(self, task_queue):
-        """Clean invalid subtask item from queue to ensure that"""
+        """Clean invalid subtask items from the queue to ensure that when the queue is not empty,
+        there are always subtasks waiting to be scheduled."""
         while (
             task_queue and task_queue[0].subtask.subtask_id not in self._stid_to_items
         ):
             # skip removed items (as they may be re-pushed into the queue)
             heapq.heappop(task_queue)
+        if task_queue:
+            return True
+        else:
+            return False

     @mo.extensible
     def update_subtask_priority(self, subtask_id: str, priority: Tuple):
@@ -280,28 +299,59 @@ async def all_bands_busy(self) -> bool:
             return all(len(self._band_queues[band]) > 0 for band in bands)
         return False

+    def band_queue_subtask_nums(self):
+        return {q: len(subtasks) for q, subtasks in self._band_queues.items()}
+
     async def balance_queued_subtasks(self):
+        used_slots = await self._slots_ref.get_used_slots()
+        remaining_slots = await self._slots_ref.get_remaining_slots()
         # record length of band queues
         band_num_queued_subtasks = {
-            band: len(queue) for band, queue in self._band_queues.items()
+            band: len([item for item in queue if item.subtask.subtask_id in self._stid_to_items])
+            for band, queue in self._band_queues.items()
         }
+        logger.info("Start to balance subtasks:\n"
+                    "used_slots %s\n remaining_slots %s\n queue size %s\n band_num_queued_subtasks %s",
+                    used_slots, remaining_slots,
+                    {band: len(queue) for band, queue in self._band_queues.items()},
+                    band_num_queued_subtasks)
+        if sum(band_num_queued_subtasks.values()) == 0:
+            logger.info("No subtasks in queue, skip balance.")
+            return
         move_queued_subtasks = await self._assigner_ref.reassign_subtasks(
-            band_num_queued_subtasks
+            band_num_queued_subtasks, used_slots=used_slots
         )
         items = []
         # rewrite band queues according to feedbacks from assigner
         for band, move in move_queued_subtasks.items():
             task_queue = self._band_queues[band]
-            assert move + len(task_queue) >= 0
+            assert move + len(task_queue) >= 0, f"move {move} queue length {len(task_queue)}"
             for _ in range(abs(move)):
                 if move < 0:
                     # TODO: pop item of low priority
+                    self._ensure_top_item_valid(task_queue)
                     item = heapq.heappop(task_queue)
                     self._stid_to_bands[item.subtask.subtask_id].remove(band)
                     items.append(item)
                 elif move > 0:
                     item = items.pop()
-                    self._stid_to_bands[item.subtask.subtask_id].append(band)
-                    heapq.heappush(task_queue, item)
+                    subtask = item.subtask
+                    if subtask.bands_specified and band not in subtask.expect_bands:
+                        logger.warning("Skip rescheduling subtask %s to band %s because its band is specified to %s.",
+                                       subtask.subtask_id, band, subtask.expect_bands)
+                        specified_band = subtask.expect_band
+                        specified_band_queue = self._band_queues[specified_band]
+                        heapq.heappush(specified_band_queue, item)
+                        self._stid_to_bands[item.subtask.subtask_id].append(specified_band)
+                    else:
+                        subtask.expect_bands = [band]
+                        self._stid_to_bands[item.subtask.subtask_id].append(band)
+                        heapq.heappush(task_queue, item)
             if len(task_queue) == 0:
                 self._band_queues.pop(band)
+        balanced_num_queued_subtasks = {
+            band: len([item for item in queue if item.subtask.subtask_id in self._stid_to_items])
+            for band, queue in self._band_queues.items()
+        }
+ logger.info("Balance subtasks succeed:\n move_queued_subtasks %s\n " + "balanced_num_queued_subtasks %s", move_queued_subtasks, balanced_num_queued_subtasks) diff --git a/mars/services/scheduling/supervisor/tests/test_queue_balance.py b/mars/services/scheduling/supervisor/tests/test_queue_balance.py index 821b95cf99..5f7fe8811e 100644 --- a/mars/services/scheduling/supervisor/tests/test_queue_balance.py +++ b/mars/services/scheduling/supervisor/tests/test_queue_balance.py @@ -14,8 +14,8 @@ import asyncio import pytest +from collections import defaultdict from typing import Tuple, List - from ..... import oscar as mo from ....cluster import ClusterAPI from ....cluster.core import NodeRole, NodeStatus @@ -109,12 +109,21 @@ def apply_subtask_slots( ): return subtask_ids + def refresh_bands(self): + pass + + def get_used_slots(self): + return {} + + def get_remaining_slots(self): + return {} + class MockAssignerActor(mo.Actor): def assign_subtasks(self, subtasks: List[Subtask]): return [subtask.expect_bands[0] for subtask in subtasks] - def reassign_subtasks(self, band_num_queued_subtasks): + def reassign_subtasks(self, band_num_queued_subtasks, used_slots=None): if len(band_num_queued_subtasks.keys()) == 1: [(band, _)] = band_num_queued_subtasks.items() return {band: 0} @@ -127,15 +136,15 @@ def reassign_subtasks(self, band_num_queued_subtasks): class MockSubtaskManagerActor(mo.Actor): def __init__(self): - self._subtask_ids, self._bands = [], [] + self._submitted_subtask_ids = defaultdict(list) @mo.extensible def submit_subtask_to_band(self, subtask_id: str, band: Tuple): - self._subtask_ids.append(subtask_id) - self._bands.append(band) + print(f"submit subtask {subtask_id} to band {band}") + self._submitted_subtask_ids[band].append(subtask_id) def dump_data(self): - return self._subtask_ids, self._bands + return self._submitted_subtask_ids @pytest.fixture @@ -207,15 +216,15 @@ async def test_subtask_queueing(actor_pool): # 9 subtasks on ('address0', 'numa-0') await queueing_ref.submit_subtasks(band=("address0", "numa-0"), limit=10) - commited_subtask_ids, _commited_bands = await manager_ref.dump_data() - assert len(commited_subtask_ids) == 9 + commited_subtask_ids = (await manager_ref.dump_data())[("address0", "numa-0")] + assert len(commited_subtask_ids) == 9, f"commited_subtask_ids {commited_subtask_ids}" # 0 subtasks on ('address1', 'numa-0') await queueing_ref.submit_subtasks(band=("address1", "numa-0"), limit=10) - commited_subtask_ids, _commited_bands = await manager_ref.dump_data() - assert len(commited_subtask_ids) == 9 + commited_subtask_ids = (await manager_ref.dump_data())[("address0", "numa-0")] + assert len(commited_subtask_ids) == 9, f"commited_subtask_ids {commited_subtask_ids}" # 9 subtasks on ('address2', 'numa-0') await queueing_ref.submit_subtasks(band=("address2", "numa-0"), limit=10) - commited_subtask_ids, _commited_bands = await manager_ref.dump_data() - assert len(commited_subtask_ids) == 18 + submitted_subtask_ids = await manager_ref.dump_data() + assert sum(len(v) for v in submitted_subtask_ids.values()) == 18 From 159759efe2afad49d82e9a09ac492e9a34657085 Mon Sep 17 00:00:00 2001 From: mubai Date: Fri, 4 Mar 2022 11:51:44 +0800 Subject: [PATCH 3/3] use heap for subtask reassign --- .../scheduling/supervisor/assigner.py | 75 +++++++++--------- .../scheduling/supervisor/queueing.py | 79 +++++++++++-------- .../supervisor/tests/test_assigner.py | 51 ++++-------- 3 files changed, 99 insertions(+), 106 deletions(-) diff --git 
a/mars/services/scheduling/supervisor/assigner.py b/mars/services/scheduling/supervisor/assigner.py index b793ccbfca..fa647134b2 100644 --- a/mars/services/scheduling/supervisor/assigner.py +++ b/mars/services/scheduling/supervisor/assigner.py @@ -13,6 +13,7 @@ # limitations under the License. import asyncio +import heapq import itertools import random from collections import defaultdict @@ -114,12 +115,10 @@ async def assign_subtasks( exclude_bands = exclude_bands or set() inp_keys = set() selected_bands = dict() - if not self._bands: self._update_bands( list(await self._cluster_api.get_all_bands(NodeRole.WORKER)) ) - for subtask in subtasks: is_gpu = any(c.op.gpu for c in subtask.chunk_graph) if subtask.expect_bands: @@ -204,7 +203,7 @@ async def assign_subtasks( f"No bands available for subtask {subtask.subtask_id} on bands {subtask.expect_bands} " f"after excluded {exclude_bands}" ) - assigns.append([band]) + assigns.append(band) return assigns async def reassign_subtasks( @@ -214,53 +213,39 @@ async def reassign_subtasks( move_queued_subtasks = {} for is_gpu in (False, True): band_name_prefix = "numa" if not is_gpu else "gpu" - - filtered_bands = [ + device_bands = [ band for band in self._bands if band[1].startswith(band_name_prefix) ] - filtered_band_to_queued_num = { + if not device_bands: + continue + device_bands_set = set(device_bands) + device_band_to_queued_num = { k: v for k, v in band_to_queued_num.items() if k[1].startswith(band_name_prefix) } - - if not filtered_bands: - continue - used_bands = filtered_band_to_queued_num.keys() + used_bands = device_band_to_queued_num.keys() num_used_bands = len(used_bands) if num_used_bands == 1: - [(band, length)] = filtered_band_to_queued_num.items() + [(band, length)] = device_band_to_queued_num.items() if length == 0: move_queued_subtasks.update({band: 0}) continue # no need to balance when there's only one band initially - if len(filtered_bands) == 1 and band == filtered_bands[0]: + if len(device_bands) == 1 and band == device_bands[0]: move_queued_subtasks.update({band: 0}) continue - # unready bands recorded in band_num_queued_subtasks, some of them may hold 0 subtasks - unready_bands = list( - set(filtered_band_to_queued_num.keys()) - set(filtered_bands) - ) - # ready bands not recorded in band_num_queued_subtasks, all of them hold 0 subtasks - new_ready_bands = list( - set(filtered_bands) - set(filtered_band_to_queued_num.keys()) - ) - # when there are new ready bands, make all bands hold same amount of subtasks - # when there are no new ready bands now, move out subtasks left on them - if not new_ready_bands and unready_bands: - filtered_band_to_queued_num = { - k: filtered_band_to_queued_num[k] for k in unready_bands - } # approximate total of subtasks moving to each ready band - num_all_subtasks = sum(filtered_band_to_queued_num.values()) + sum(used_slots.values()) - mean = int(num_all_subtasks / len(filtered_bands)) + num_all_subtasks = sum(device_band_to_queued_num.values()) + sum(used_slots.values()) + # If the remainder is not 0, move in and out may be unequal. + mean = int(num_all_subtasks / len(device_bands)) # all_bands (namely) includes: # a. ready bands recorded in band_num_queued_subtasks # b. ready bands not recorded in band_num_queued_subtasks # c. unready bands recorded in band_num_queued_subtasks # a. + b. = self._bands, a. + c. 
= bands in band_num_queued_subtasks all_bands = list( - set(filtered_bands) | set(filtered_band_to_queued_num.keys()) + device_bands_set | set(device_band_to_queued_num.keys()) ) # calculate the differential steps of moving subtasks # move < 0 means subtasks should move out and vice versa @@ -268,8 +253,8 @@ async def reassign_subtasks( # assuming bands not recorded in band_num_queued_subtasks hold 0 subtasks band_move_nums = {} for band in all_bands: - existing_subtask_nums = filtered_band_to_queued_num.get(band, 0) - if band in filtered_bands: + existing_subtask_nums = device_band_to_queued_num.get(band, 0) + if band in device_bands: band_move_num = mean - existing_subtask_nums - used_slots.get(band, 0) # If slot of band already has some subtasks running, band_move_num may be greater than # existing_subtask_nums. @@ -278,13 +263,27 @@ async def reassign_subtasks( else: band_move_num = -existing_subtask_nums band_move_nums[band] = band_move_num - # ensure the balance of moving in and out - total_move = sum(band_move_nums.values()) - exclude_bands = set(used_bands) - unit = (total_move > 0) - (total_move < 0) - while total_move != 0: - band_move_nums[self._get_random_band(False, exclude_bands=exclude_bands)] -= unit - total_move -= unit + self._balance_move_out_subtasks(band_move_nums) assert sum(band_move_nums.values()) == 0, f"band_move_nums {band_move_nums}" move_queued_subtasks.update(band_move_nums) return dict(sorted(move_queued_subtasks.items(), key=lambda item: item[1])) + + def _balance_move_out_subtasks(self, band_move_nums: Dict[BandType, int]): + # ensure the balance of moving in and out + total_move = sum(band_move_nums.values()) + if total_move != 0: + unit = (total_move > 0) - (total_move < 0) + + class BandHeapItem: + def __init__(self, band_): + self.band = band_ + + def __lt__(self, other: "BandHeapItem"): + return (band_move_nums.get(self.band, 0) > band_move_nums.get(other.band, 0)) * unit + bands_queue = [BandHeapItem(band) for band, move_nums in band_move_nums.items() if move_nums * unit > 0] + heapq.heapify(bands_queue) + while total_move != 0: + item = heapq.heappop(bands_queue) + band_move_nums[item.band] -= unit + total_move -= unit + heapq.heappush(bands_queue, item) diff --git a/mars/services/scheduling/supervisor/queueing.py b/mars/services/scheduling/supervisor/queueing.py index 1f6170bfee..fe43376cee 100644 --- a/mars/services/scheduling/supervisor/queueing.py +++ b/mars/services/scheduling/supervisor/queueing.py @@ -172,7 +172,6 @@ async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None) bands = [band] if band is not None else list(self._band_slot_nums.keys()) submit_aio_tasks = [] manager_ref = await self._get_manager_ref() - apply_delays = [] submit_items_list = [] submitted_bands = [] @@ -191,41 +190,51 @@ async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None) subtask_ids = list(submit_items) if not subtask_ids: continue - submitted_bands.append(band) submit_items_list.append(submit_items) - - # todo it is possible to provide slot data with more accuracy - subtask_slots = [1] * len(subtask_ids) - + subtask_resources = [ + Resource( + num_cpus=item.subtask.num_cpus, + num_mem_bytes=item.subtask.num_mem_bytes if self._hbo_enabled else 0, + ) + for item in submit_items.values() + ] apply_delays.append( - self._slots_ref.apply_subtask_slots.delay( - band, self._session_id, subtask_ids, subtask_slots + self._slots_ref.apply_subtask_resources.delay( + band, self._session_id, subtask_ids, subtask_resources ) ) 
async with redirect_subtask_errors( - self, - [ - item.subtask - for submit_items in submit_items_list - for item in submit_items.values() - ], + self, + [ + item.subtask + for submit_items in submit_items_list + for item in submit_items.values() + ], ): - submitted_ids_list = await self._slots_ref.apply_subtask_slots.batch( + submitted_ids_list = await self._slots_ref.apply_subtask_resources.batch( *apply_delays ) - for band, submit_items, submitted_ids in zip( - submitted_bands, submit_items_list, submitted_ids_list + submitted_bands, submit_items_list, submitted_ids_list ): subtask_ids = list(submit_items) task_queue = self._band_queues[band] - async with redirect_subtask_errors( self, [item.subtask for item in submit_items.values()] ): non_submitted_ids = [k for k in submit_items if k not in submitted_ids] + metrics_options = { + "session_id": self._session_id, + "band": band[0] if band else "", + } + self._submitted_subtask_count.record( + len(submitted_ids), metrics_options + ) + self._unsubmitted_subtask_count.record( + len(non_submitted_ids), metrics_options + ) if submitted_ids: for stid in subtask_ids: if stid not in submitted_ids: @@ -261,6 +270,8 @@ async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None) if submit_aio_tasks: yield asyncio.gather(*submit_aio_tasks) + else: + logger.debug("No subtasks to submit, perhaps because of the lack of resources.") def _ensure_top_item_valid(self, task_queue): """Clean invalid subtask item from the queue to ensure that when the queue is not empty, @@ -270,10 +281,7 @@ def _ensure_top_item_valid(self, task_queue): ): # skip removed items (as they may be re-pushed into the queue) heapq.heappop(task_queue) - if task_queue: - return True - else: - return False + return bool(task_queue) @mo.extensible def update_subtask_priority(self, subtask_id: str, priority: Tuple): @@ -303,17 +311,18 @@ def band_queue_subtask_nums(self): return {q: len(subtasks) for q, subtasks in self._band_queues.items()} async def balance_queued_subtasks(self): - used_slots = await self._slots_ref.get_used_slots() + used_slots = {band: slots for band, slots in (await self._slots_ref.get_used_slots()).items() if slots > 0} remaining_slots = await self._slots_ref.get_remaining_slots() # record length of band queues - band_num_queued_subtasks = { - band: len([item for item in queue if item.subtask.subtask_id in self._stid_to_items]) - for band, queue in self._band_queues.items() - } + band_num_queued_subtasks = {} + for band, queue in self._band_queues.items(): + queue_size = len([item for item in queue if item.subtask.subtask_id in self._stid_to_items]) + if queue_size > 0: + band_num_queued_subtasks[band] = queue_size logger.info("Start to balance subtasks:\n" "used_slots %s\n remaining_slots %s\n queue size %s\n band_num_queued_subtasks %s", used_slots, remaining_slots, - {band: len(queue) for band, queue in self._band_queues.items()}, + {band: len(queue) for band, queue in self._band_queues.items() if len(queue) > 0}, band_num_queued_subtasks) if sum(band_num_queued_subtasks.values()) == 0: logger.info("No subtasks in queue, skip balance.") @@ -325,7 +334,10 @@ async def balance_queued_subtasks(self): # rewrite band queues according to feedbacks from assigner for band, move in move_queued_subtasks.items(): task_queue = self._band_queues[band] - assert move + len(task_queue) >= 0, f"move {move} queue length {len(task_queue)}" + queue_size = len([item for item in task_queue if item.subtask.subtask_id in self._stid_to_items]) + assert queue_size + 
move >= 0, f"move {move} from {band, task_queue} " \ + f"move_queued_subtasks {move_queued_subtasks} " \ + f"band_num_queued_subtasks {band_num_queued_subtasks}" for _ in range(abs(move)): if move < 0: # TODO: pop item of low priority @@ -349,9 +361,10 @@ async def balance_queued_subtasks(self): heapq.heappush(task_queue, item) if len(task_queue) == 0: self._band_queues.pop(band) - balanced_num_queued_subtasks = { - band: len([item for item in queue if item.subtask.subtask_id in self._stid_to_items]) - for band, queue in self._band_queues.items() - } + balanced_num_queued_subtasks = {} + for band, queue in self._band_queues.items(): + band_length = len([item for item in queue if item.subtask.subtask_id in self._stid_to_items]) + if band_length > 0: + balanced_num_queued_subtasks[band] = band_length logger.info("Balance subtasks succeed:\n move_queued_subtasks %s\n " "balanced_num_queued_subtasks %s", move_queued_subtasks, balanced_num_queued_subtasks) diff --git a/mars/services/scheduling/supervisor/tests/test_assigner.py b/mars/services/scheduling/supervisor/tests/test_assigner.py index 56a66395cd..82462647c4 100644 --- a/mars/services/scheduling/supervisor/tests/test_assigner.py +++ b/mars/services/scheduling/supervisor/tests/test_assigner.py @@ -46,7 +46,7 @@ def __init__(self, timeout=None, check_interval=None, with_gpu=False): self.all_bands = self.ready_bands.copy() async def update_node_info( - self, address, role, env=None, resource=None, detail=None, status=None + self, address, role, env=None, resource=None, detail=None, status=None ): if "address" in address and status == NodeStatus.STOPPING: del self.ready_bands[(address, "numa-0")] @@ -163,19 +163,19 @@ async def test_assign_cpu_tasks(actor_pool): ) subtask = Subtask("test_task", session_id, chunk_graph=chunk_graph) - [result] = await assigner_ref.assign_subtasks([subtask]) + [[result]] = await assigner_ref.assign_subtasks([subtask]) assert result in (("address0", "numa-0"), ("address2", "numa-0")) subtask.expect_bands = [("address0", "numa-0")] - [result] = await assigner_ref.assign_subtasks([subtask]) + [[result]] = await assigner_ref.assign_subtasks([subtask]) assert result == ("address0", "numa-0") subtask.expect_bands = [("address0", "numa-0"), ("address1", "numa-0")] - [result] = await assigner_ref.assign_subtasks([subtask]) + [[result]] = await assigner_ref.assign_subtasks([subtask]) assert result == ("address0", "numa-0") subtask.expect_bands = [("address1", "numa-0")] - [result] = await assigner_ref.assign_subtasks([subtask]) + [[result]] = await assigner_ref.assign_subtasks([subtask]) assert result in (("address0", "numa-0"), ("address2", "numa-0")) result_chunk.op.gpu = True @@ -211,7 +211,7 @@ async def test_assign_gpu_tasks(actor_pool): ) subtask = Subtask("test_task", session_id, chunk_graph=chunk_graph) - [result] = await assigner_ref.assign_subtasks([subtask]) + [[result]] = await assigner_ref.assign_subtasks([subtask]) assert result[1].startswith("gpu") @@ -229,23 +229,11 @@ async def test_reassign_subtasks(actor_pool): move_queued_subtasks = await assigner_ref.reassign_subtasks( band_num_queued_subtasks ) - assert move_queued_subtasks in ( - { - ("address1", "numa-0"): -1, - ("address0", "numa-0"): -1, - ("address2", "numa-0"): 2, - }, - { - ("address1", "numa-0"): -2, - ("address0", "numa-0"): 0, - ("address2", "numa-0"): 2, - }, - { - ("address1", "numa-0"): -2, - ("address0", "numa-0"): -1, - ("address2", "numa-0"): 3, - }, - ) + assert move_queued_subtasks == { + ("address1", "numa-0"): -2, + ("address0", 
"numa-0"): 0, + ("address2", "numa-0"): 2, + } # ('address0', 'numa-0'), ('address2', 'numa-0') are ready await cluster_api.set_node_status( @@ -260,18 +248,11 @@ async def test_reassign_subtasks(actor_pool): move_queued_subtasks = await assigner_ref.reassign_subtasks( band_num_queued_subtasks ) - assert move_queued_subtasks in ( - { - ("address1", "numa-0"): -7, - ("address0", "numa-0"): 3, - ("address2", "numa-0"): 4, - }, - { - ("address1", "numa-0"): -7, - ("address0", "numa-0"): 4, - ("address2", "numa-0"): 3, - }, - ) + assert move_queued_subtasks == { + ('address1', 'numa-0'): -7, + ('address0', 'numa-0'): -1, + ('address2', 'numa-0'): 8 + } band_num_queued_subtasks = {("address0", "numa-0"): 9, ("address1", "numa-0"): 7} move_queued_subtasks = await assigner_ref.reassign_subtasks(