diff --git a/mars/deploy/oscar/base_config.yml b/mars/deploy/oscar/base_config.yml
index 00115d727e..2901958a51 100644
--- a/mars/deploy/oscar/base_config.yml
+++ b/mars/deploy/oscar/base_config.yml
@@ -34,6 +34,7 @@ scheduling:
     enabled: false
     min_workers: 1  # Must >=1, mars need at least 1 worker to fetch data
     max_workers: 100
+    initial_round_scale_out_workers: 2
     scheduler_backlog_timeout: 60
     worker_idle_timeout: 120
   speculation:
diff --git a/mars/metrics/backends/tests/test_metric.py b/mars/metrics/backends/tests/test_metric.py
index b0d79e940d..eb50fa80b9 100644
--- a/mars/metrics/backends/tests/test_metric.py
+++ b/mars/metrics/backends/tests/test_metric.py
@@ -12,7 +12,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-from ..metric import (
+from ..metrics import (
     AbstractMetric,
     AbstractCounter,
     AbstractGauge,
diff --git a/mars/services/scheduling/supervisor/assigner.py b/mars/services/scheduling/supervisor/assigner.py
index 96c98b343c..1a4bec38c3 100644
--- a/mars/services/scheduling/supervisor/assigner.py
+++ b/mars/services/scheduling/supervisor/assigner.py
@@ -13,6 +13,7 @@
 # limitations under the License.
 
 import asyncio
+import heapq
 import itertools
 import numpy as np
 from collections import defaultdict
@@ -111,12 +112,10 @@ async def assign_subtasks(
         exclude_bands = exclude_bands or set()
         inp_keys = set()
         selected_bands = dict()
-
         if not self._bands:
             self._update_bands(
                 list(await self._cluster_api.get_all_bands(NodeRole.WORKER))
             )
-
         for subtask in subtasks:
             is_gpu = any(c.op.gpu for c in subtask.chunk_graph)
             if subtask.expect_bands:
@@ -207,58 +206,45 @@ async def assign_subtasks(
         return assigns
 
     async def reassign_subtasks(
-        self, band_to_queued_num: Dict[BandType, int]
+        self, band_to_queued_num: Dict[BandType, int], used_slots: Dict[BandType, int] = None
     ) -> Dict[BandType, int]:
+        used_slots = used_slots or {}
         move_queued_subtasks = {}
         for is_gpu in (False, True):
             band_name_prefix = "numa" if not is_gpu else "gpu"
-
-            filtered_bands = [
+            device_bands = [
                 band for band in self._bands if band[1].startswith(band_name_prefix)
             ]
-            filtered_band_to_queued_num = {
+            if not device_bands:
+                continue
+            device_bands_set = set(device_bands)
+            device_band_to_queued_num = {
                 k: v
                 for k, v in band_to_queued_num.items()
                 if k[1].startswith(band_name_prefix)
             }
-
-            if not filtered_bands:
-                continue
-
-            num_used_bands = len(filtered_band_to_queued_num.keys())
+            used_bands = device_band_to_queued_num.keys()
+            num_used_bands = len(used_bands)
             if num_used_bands == 1:
-                [(band, length)] = filtered_band_to_queued_num.items()
+                [(band, length)] = device_band_to_queued_num.items()
                 if length == 0:
                     move_queued_subtasks.update({band: 0})
                     continue
                 # no need to balance when there's only one band initially
-                if len(filtered_bands) == 1 and band == filtered_bands[0]:
+                if len(device_bands) == 1 and band == device_bands[0]:
                     move_queued_subtasks.update({band: 0})
                     continue
-            # unready bands recorded in band_num_queued_subtasks, some of them may hold 0 subtasks
-            unready_bands = list(
-                set(filtered_band_to_queued_num.keys()) - set(filtered_bands)
-            )
-            # ready bands not recorded in band_num_queued_subtasks, all of them hold 0 subtasks
-            new_ready_bands = list(
-                set(filtered_bands) - set(filtered_band_to_queued_num.keys())
-            )
-            # when there are new ready bands, make all bands hold same amount of subtasks
-            # when there are no new ready bands now, move out subtasks left on them
-            if not new_ready_bands and unready_bands:
-                filtered_band_to_queued_num = {
-                    k: filtered_band_to_queued_num[k] for k in unready_bands
-                }
             # approximate total of subtasks moving to each ready band
-            num_all_subtasks = sum(filtered_band_to_queued_num.values())
-            mean = int(num_all_subtasks / len(filtered_bands))
+            num_all_subtasks = sum(device_band_to_queued_num.values()) + sum(used_slots.values())
+            # If the remainder is not 0, the amounts moved in and out may be unequal.
+            mean = int(num_all_subtasks / len(device_bands))
             # all_bands (namely) includes:
             # a. ready bands recorded in band_num_queued_subtasks
             # b. ready bands not recorded in band_num_queued_subtasks
             # c. unready bands recorded in band_num_queued_subtasks
             # a. + b. = self._bands, a. + c. = bands in band_num_queued_subtasks
             all_bands = list(
-                set(filtered_bands) | set(filtered_band_to_queued_num.keys())
+                device_bands_set | set(device_band_to_queued_num.keys())
             )
             # calculate the differential steps of moving subtasks
             # move < 0 means subtasks should move out and vice versa
@@ -266,19 +252,37 @@ async def reassign_subtasks(
             # assuming bands not recorded in band_num_queued_subtasks hold 0 subtasks
             band_move_nums = {}
             for band in all_bands:
-                if band in filtered_bands:
-                    band_move_nums[band] = mean - filtered_band_to_queued_num.get(
-                        band, 0
-                    )
+                existing_subtask_nums = device_band_to_queued_num.get(band, 0)
+                if band in device_bands:
+                    band_move_num = mean - existing_subtask_nums - used_slots.get(band, 0)
+                    # If the band already has subtasks running in its slots, the computed
+                    # move-out may exceed the queued subtasks, so clamp it to existing_subtask_nums.
+                    if band_move_num + existing_subtask_nums < 0:
+                        band_move_num = -existing_subtask_nums
                 else:
-                    band_move_nums[band] = -filtered_band_to_queued_num.get(band, 0)
-            # ensure the balance of moving in and out
-            total_move = sum(band_move_nums.values())
-            # int() is going to be closer to zero, so `mean` is no more than actual mean value
-            # total_move = mean * len(self._bands) - num_all_subtasks
-            #            <= actual_mean * len(self._bands) - num_all_subtasks = 0
-            assert total_move <= 0
-            if total_move != 0:
-                band_move_nums[self._get_random_band(False)] -= total_move
+                    band_move_num = -existing_subtask_nums
+                band_move_nums[band] = band_move_num
+            self._balance_move_out_subtasks(band_move_nums)
+            assert sum(band_move_nums.values()) == 0, f"band_move_nums {band_move_nums}"
             move_queued_subtasks.update(band_move_nums)
         return dict(sorted(move_queued_subtasks.items(), key=lambda item: item[1]))
+
+    def _balance_move_out_subtasks(self, band_move_nums: Dict[BandType, int]):
+        # ensure the balance of moving in and out
+        total_move = sum(band_move_nums.values())
+        if total_move != 0:
+            unit = (total_move > 0) - (total_move < 0)
+
+            class BandHeapItem:
+                def __init__(self, band_):
+                    self.band = band_
+
+                def __lt__(self, other: "BandHeapItem"):
+                    return (band_move_nums.get(self.band, 0) > band_move_nums.get(other.band, 0)) * unit
+
+            bands_queue = [BandHeapItem(band) for band, move_nums in band_move_nums.items() if move_nums * unit > 0]
+            heapq.heapify(bands_queue)
+            while total_move != 0:
+                item = heapq.heappop(bands_queue)
+                band_move_nums[item.band] -= unit
+                total_move -= unit
+                heapq.heappush(bands_queue, item)
diff --git a/mars/services/scheduling/supervisor/autoscale.py b/mars/services/scheduling/supervisor/autoscale.py
index b5157b4022..945a2dd205 100644
--- a/mars/services/scheduling/supervisor/autoscale.py
+++ b/mars/services/scheduling/supervisor/autoscale.py
@@ -22,7 +22,9 @@ from typing import List, Set, Dict, Optional, Any
 
 from .... import oscar as mo
+from ....metrics import Metrics
 from ....typing import BandType
+from ....utils import report_event
 from ....lib.aio import alru_cache
 from ...cluster.api import ClusterAPI
 from ...cluster.core import NodeRole, NodeStatus
@@ -38,6 +40,16 @@ def __init__(self, autoscale_conf: Dict[str, Any]):
         self.queueing_refs = dict()
         self.global_slot_ref = None
         self._dynamic_workers: Set[str] = set()
+        self._max_workers = autoscale_conf.get("max_workers", 100)
+        self._initial_request_worker_timeout = autoscale_conf.get("initial_request_worker_timeout", 30)
+        self._request_worker_time_secs = Metrics.gauge(
+            "mars.scheduling.request_worker_time_sec",
+            "Time consumed in seconds to request a worker",
+        )
+        self._offline_worker_time_secs = Metrics.gauge(
+            "mars.scheduling.offline_worker_time_sec",
+            "Time consumed in seconds to offline a worker",
+        )
 
     async def __post_create__(self):
         strategy = self._autoscale_conf.get("strategy")
@@ -54,8 +66,8 @@ async def __post_create__(self):
         self._cluster_api = await ClusterAPI.create(self.address)
         self._strategy = await strategy_cls.create(self._autoscale_conf, self)
         if self._enabled:
-            logger.info(f"Auto scale strategy %s started", self._strategy)
             await self._strategy.start()
+            logger.info("Autoscaler started with conf %s and strategy %s", self._autoscale_conf, self._strategy)
 
     async def __pre_destroy__(self):
         if self._enabled:
@@ -75,24 +87,28 @@ async def request_worker(
         self, worker_cpu: int = None, worker_mem: int = None, timeout: int = None
     ) -> str:
         start_time = time.time()
+        timeout = timeout or self._initial_request_worker_timeout
         worker_address = await self._cluster_api.request_worker(
             worker_cpu, worker_mem, timeout
         )
         if worker_address:
             self._dynamic_workers.add(worker_address)
-            logger.info(
-                "Requested new worker %s in %.4f seconds, current dynamic worker nums is %s",
+            duration = time.time() - start_time
+            msg = "Requested new worker {} in {:.4f} seconds, current dynamic worker count is {}".format(
                 worker_address,
-                time.time() - start_time,
+                duration,
                 self.get_dynamic_worker_nums(),
-            )
+            )
+            logger.info(msg)
+            report_event("WARNING", "AUTO_SCALING_OUT", msg)
             return worker_address
         else:
             logger.warning(
-                "Request worker with resource %s failed in %.4f seconds.",
-                dict(worker_cpu=worker_cpu, worker_mem=worker_mem),
+                "Request worker with resource %s and timeout %s failed in %.4f seconds.",
+                dict(worker_cpu=worker_cpu, worker_mem=worker_mem),
+                timeout,
                 time.time() - start_time,
             )
+            self._initial_request_worker_timeout *= 2
 
     async def release_workers(self, addresses: List[str]):
         """
@@ -119,14 +135,19 @@ async def release_workers(self, addresses: List[str]):
 
         async def release_worker(address):
             logger.info("Start to release worker %s.", address)
+            start = time.time()
             worker_bands = workers_bands[address]
             await asyncio.gather(
                 *[self.global_slot_ref.wait_band_idle(band) for band in worker_bands]
             )
             await self._migrate_data_of_bands(worker_bands, excluded_bands)
             await self._cluster_api.release_worker(address)
+            duration = time.time() - start
+            self._offline_worker_time_secs.record(duration)
             self._dynamic_workers.remove(address)
-            logger.info("Released worker %s.", address)
+            msg = f"Released worker {address}."
+            logger.info(msg)
+            report_event("WARNING", "AUTO_SCALING_IN", msg)
 
         await asyncio.gather(*[release_worker(address) for address in addresses])
 
@@ -256,11 +277,19 @@ def __init__(self, autoscale_conf: Dict[str, Any], autoscaler):
         self._sustained_scheduler_backlog_timeout = autoscale_conf.get(
             "sustained_scheduler_backlog_timeout", self._scheduler_backlog_timeout
         )
+        self._scheduler_scale_out_enabled = autoscale_conf.get(
+            "scheduler_scale_out_enabled", True
+        )
         # Make worker_idle_timeout greater than scheduler_backlog_timeout to
         # avoid cluster fluctuate back and forth。
         self._worker_idle_timeout = autoscale_conf.get(
             "worker_idle_timeout", 2 * self._scheduler_backlog_timeout
         )
+        self._initial_round_scale_out_workers = autoscale_conf.get("initial_round_scale_out_workers", 2)
+        if self._initial_round_scale_out_workers < 2:
+            logger.warning("Initial round scale_out workers %s is less than 2, setting it to 2.",
+                           self._initial_round_scale_out_workers)
+            self._initial_round_scale_out_workers = 2
         self._min_workers = autoscale_conf.get("min_workers", 1)
         assert self._min_workers >= 1, "Mars need at least 1 worker."
         self._max_workers = autoscale_conf.get("max_workers", 100)
@@ -297,33 +326,30 @@ async def _run(self):
                 logger.info("Canceled pending task backlog strategy.")
             except Exception as e:  # pragma: no cover
                 logger.exception("Exception occurred when try to auto scale")
-                raise e
+                pass
 
     async def _run_round(self):
         queueing_refs = list(self._autoscaler.queueing_refs.values())
-        if any([await queueing_ref.all_bands_busy() for queueing_ref in queueing_refs]):
+        if self._scheduler_scale_out_enabled and await self.all_bands_busy(queueing_refs):
             await self._scale_out(queueing_refs)
         else:
             await self._scale_in()
 
+    @staticmethod
+    async def all_bands_busy(queueing_refs):
+        return any(
+            [await queueing_ref.all_bands_busy() for queueing_ref in queueing_refs]
+        )
+
     async def _scale_out(self, queueing_refs):
         logger.info(
             "Try to scale out, current dynamic workers %s",
             self._autoscaler.get_dynamic_worker_nums(),
         )
         start_time = time.time()
-        while not await self._autoscaler.request_worker():
-            logger.warning(
-                "Request worker failed, wait %s seconds and retry.",
-                self._scheduler_check_interval,
-            )
-            await asyncio.sleep(self._scheduler_check_interval)
-        await asyncio.sleep(self._scheduler_backlog_timeout)
         rnd = 1
-        while any(
-            [await queueing_ref.all_bands_busy() for queueing_ref in queueing_refs]
-        ):
-            worker_num = 2**rnd
+        while await self.all_bands_busy(queueing_refs):
+            worker_num = self._initial_round_scale_out_workers ** rnd
             if (
                 self._autoscaler.get_dynamic_worker_nums() + worker_num
                 > self._max_workers
@@ -331,18 +357,23 @@
                 worker_num = (
                     self._max_workers - self._autoscaler.get_dynamic_worker_nums()
                 )
-            while set(
-                await asyncio.gather(
-                    *[self._autoscaler.request_worker() for _ in range(worker_num)]
-                )
-            ) == {None}:
+            if worker_num == 0:
+                break
+            requested_workers = set(await asyncio.gather(
+                *[self._autoscaler.request_worker() for _ in range(worker_num)]
+            ))
+            while len(requested_workers) == 0 and await self.all_bands_busy(queueing_refs):
                 logger.warning(
                     "Request %s workers all failed, wait %s seconds and retry.",
                     worker_num,
                     self._scheduler_check_interval,
                 )
                 await asyncio.sleep(self._scheduler_check_interval)
+                requested_workers.update(await asyncio.gather(
+                    *[self._autoscaler.request_worker() for _ in range(worker_num)]
+                ))
             rnd += 1
+            logger.info("Requested %s workers.", len(requested_workers))
             await asyncio.sleep(self._sustained_scheduler_backlog_timeout)
         logger.info(
             "Scale out finished in %s round, took %s seconds, current dynamic workers %s",
@@ -368,12 +399,6 @@ async def _scale_in(self):
         }
         worker_addresses = set(band[0] for band in idle_bands)
         if worker_addresses:
-            logger.debug(
-                "Bands %s of workers % has been idle for as least %s seconds.",
-                idle_bands,
-                worker_addresses,
-                self._worker_idle_timeout,
-            )
             while (
                 worker_addresses
                 and self._autoscaler.get_dynamic_worker_nums() - len(worker_addresses)
@@ -390,6 +415,13 @@ async def _scale_in(self):
                 idle_bands.difference_update(
                     set(await self._autoscaler.get_worker_bands(worker_address))
                 )
+        if worker_addresses:
+            logger.info(
+                "Bands %s of workers %s have been idle for at least %s seconds.",
+                idle_bands,
+                worker_addresses,
+                self._worker_idle_timeout,
+            )
         if worker_addresses:
             start_time = time.time()
             logger.info(
diff --git a/mars/services/scheduling/supervisor/queueing.py b/mars/services/scheduling/supervisor/queueing.py
index 8051c24e36..281668890c 100644
--- a/mars/services/scheduling/supervisor/queueing.py
+++ b/mars/services/scheduling/supervisor/queueing.py
@@ -22,7 +22,7 @@
 from .... import oscar as mo
 from ....lib.aio import alru_cache
-from ....utils import dataslots
+from ....utils import dataslots, create_task_with_error_log
 from ...subtask import Subtask
 from ...task import TaskAPI
 from ..utils import redirect_subtask_errors
@@ -75,39 +75,6 @@ async def __post_create__(self):
         self._cluster_api = await ClusterAPI.create(self.address)
         self._band_slot_nums = {}
-
-        async def watch_bands():
-            async for bands in self._cluster_api.watch_all_bands():
-                # confirm ready bands indeed changed
-                if bands != self._band_slot_nums:
-                    old_band_slot_nums = self._band_slot_nums
-                    self._band_slot_nums = copy.deepcopy(bands)
-                    if self._band_queues:
-                        await self.balance_queued_subtasks()
-                        # Refresh global slot manager to get latest bands,
-                        # so that subtasks reassigned to the new bands can be
-                        # ensured to get submitted as least one subtask every band
-                        # successfully.
-                        await self._slots_ref.refresh_bands()
-                    all_bands = {*bands.keys(), *old_band_slot_nums.keys()}
-                    bands_delta = {}
-                    for b in all_bands:
-                        delta = bands.get(b, 0) - old_band_slot_nums.get(b, 0)
-                        if delta != 0:
-                            bands_delta[b] = delta
-                    # Submit tasks on new bands manually, otherwise some subtasks
-                    # will never got submitted. Note that we must ensure every new
-                    # band will get at least one subtask submitted successfully.
-                    # Later subtasks submit on the band will be triggered by the
-                    # success of previous subtasks on the same band.
-                    logger.info(
-                        "Bands changed with delta %s, submit all bands.",
-                        bands_delta,
-                    )
-                    await self.ref().submit_subtasks()
-
-        self._band_watch_task = asyncio.create_task(watch_bands())
-
         from .globalslot import GlobalSlotManagerActor
 
         [self._slots_ref] = await self._cluster_api.get_supervisor_refs(
@@ -119,6 +86,39 @@ async def watch_bands():
             AssignerActor.gen_uid(self._session_id), address=self.address
         )
 
+        async def watch_bands():
+            async for bands in self._cluster_api.watch_all_bands():
+                # confirm ready bands indeed changed
+                if bands != self._band_slot_nums:
+                    old_band_slot_nums = self._band_slot_nums
+                    self._band_slot_nums = copy.deepcopy(bands)
+                    await self._slots_ref.refresh_bands()
+                    all_bands = {*bands.keys(), *old_band_slot_nums.keys()}
+                    bands_delta = {}
+                    for b in all_bands:
+                        delta = bands.get(b, 0) - old_band_slot_nums.get(b, 0)
+                        if delta != 0:
+                            bands_delta[b] = delta
+                    # Submit tasks on new bands manually, otherwise some subtasks
+                    # will never get submitted. Note that we must ensure every new
+                    # band will get at least one subtask submitted successfully.
+                    # Later subtask submissions on the band will be triggered by the
+                    # success of previous subtasks on the same band.
+                    logger.info(
+                        "Bands changed with delta %s, submit all bands.",
+                        bands_delta,
+                    )
+                    # submit_subtasks may empty _band_queues, so use `ref()` to wait for the
+                    # previous `submit_subtasks` call to finish.
+                    await self.ref().balance_queued_subtasks()
+                    # Refresh global slot manager to get latest bands,
+                    # so that subtasks reassigned to the new bands can be
+                    # ensured to get submitted at least one subtask on every band
+                    # successfully.
+                    await self.ref().submit_subtasks()
+
+        self._band_watch_task = create_task_with_error_log(watch_bands())
+
         if self._submit_period > 0:
             self._periodical_submit_task = self.ref().periodical_submit.tell_delay(
                 delay=self._submit_period
@@ -174,7 +174,7 @@ async def add_subtasks(
         logger.debug("%d subtasks enqueued", len(subtasks))
 
     async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None):
-        logger.debug("Submitting subtasks with limit %s", limit)
+        logger.debug("Submitting subtasks with limit %s to band %s", limit, band)
         if not limit and band not in self._band_slot_nums:
             self._band_slot_nums = await self._cluster_api.get_all_bands()
 
@@ -182,7 +182,6 @@ async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None)
         bands = [band] if band is not None else list(self._band_slot_nums.keys())
         submit_aio_tasks = []
         manager_ref = await self._get_manager_ref()
-
         apply_delays = []
         submit_items_list = []
         submitted_bands = []
@@ -201,46 +200,57 @@ async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None)
             subtask_ids = list(submit_items)
             if not subtask_ids:
                 continue
-
             submitted_bands.append(band)
             submit_items_list.append(submit_items)
-
-            # todo it is possible to provide slot data with more accuracy
-            subtask_slots = [1] * len(subtask_ids)
-
+            subtask_resources = [
+                Resource(
+                    num_cpus=item.subtask.num_cpus,
+                    num_mem_bytes=item.subtask.num_mem_bytes if self._hbo_enabled else 0,
+                )
+                for item in submit_items.values()
+            ]
             apply_delays.append(
-                self._slots_ref.apply_subtask_slots.delay(
-                    band, self._session_id, subtask_ids, subtask_slots
+                self._slots_ref.apply_subtask_resources.delay(
+                    band, self._session_id, subtask_ids, subtask_resources
                 )
             )
         async with redirect_subtask_errors(
-            self,
-            [
-                item.subtask
-                for submit_items in submit_items_list
-                for item in submit_items.values()
-            ],
+                self,
+                [
+                    item.subtask
+                    for submit_items in submit_items_list
+                    for item in submit_items.values()
+                ],
         ):
-            submitted_ids_list = await self._slots_ref.apply_subtask_slots.batch(
+            submitted_ids_list = await self._slots_ref.apply_subtask_resources.batch(
                 *apply_delays
            )
-
         for band, submit_items, submitted_ids in zip(
-            submitted_bands, submit_items_list, submitted_ids_list
+                submitted_bands, submit_items_list, submitted_ids_list
         ):
             subtask_ids = list(submit_items)
             task_queue = self._band_queues[band]
-
             async with redirect_subtask_errors(
                 self, [item.subtask for item in submit_items.values()]
             ):
                 non_submitted_ids = [k for k in submit_items if k not in submitted_ids]
+                metrics_options = {
+                    "session_id": self._session_id,
+                    "band": band[0] if band else "",
+                }
+                self._submitted_subtask_count.record(
+                    len(submitted_ids), metrics_options
+                )
+                self._unsubmitted_subtask_count.record(
+                    len(non_submitted_ids), metrics_options
+                )
                 if submitted_ids:
                     for stid in subtask_ids:
                         if stid not in submitted_ids:
                             continue
                         item = submit_items[stid]
+                        item.subtask.submitted = True
                         logger.debug(
                             "Submit subtask %s to band %r",
                             item.subtask.subtask_id,
@@ -259,10 +269,19 @@ async def submit_subtasks(self, band: Tuple = None, limit: Optional[int] = None)
                     logger.debug("No slots available")
 
                 for stid in non_submitted_ids:
-                    heapq.heappush(task_queue, submit_items[stid])
+                    item = submit_items[stid]
+                    self._max_enqueue_id += 1
+                    # Lower the priority so that other subtasks can be scheduled.
+                    item.priority = item.priority[:-1] + (self._max_enqueue_id,)
+                    heapq.heappush(task_queue, item)
+                if non_submitted_ids:
+                    log_func = logger.info if self._periodical_submit_task is None else logger.debug
+                    log_func("No slots available, band queues status: %s", self.band_queue_subtask_nums())
 
         if submit_aio_tasks:
             yield asyncio.gather(*submit_aio_tasks)
+        else:
+            logger.debug("No subtasks to submit, perhaps due to a lack of resources.")
 
     def _ensure_top_item_valid(self, task_queue):
         """Clean invalid subtask item from the queue to ensure that when the queue is not empty,
@@ -298,28 +317,64 @@ async def all_bands_busy(self) -> bool:
             return all(len(self._band_queues[band]) > 0 for band in bands)
         return False
 
+    def band_queue_subtask_nums(self):
+        return {q: len(subtasks) for q, subtasks in self._band_queues.items()}
+
     async def balance_queued_subtasks(self):
+        used_slots = {band: slots for band, slots in (await self._slots_ref.get_used_slots()).items() if slots > 0}
+        remaining_slots = await self._slots_ref.get_remaining_slots()
         # record length of band queues
-        band_num_queued_subtasks = {
-            band: len(queue) for band, queue in self._band_queues.items()
-        }
+        band_num_queued_subtasks = {}
+        for band, queue in self._band_queues.items():
+            queue_size = len([item for item in queue if item.subtask.subtask_id in self._stid_to_items])
+            if queue_size > 0:
+                band_num_queued_subtasks[band] = queue_size
+        logger.info("Start to balance subtasks:\n"
+                    "used_slots %s\n remaining_slots %s\n queue size %s\n band_num_queued_subtasks %s",
+                    used_slots, remaining_slots,
+                    {band: len(queue) for band, queue in self._band_queues.items() if len(queue) > 0},
+                    band_num_queued_subtasks)
+        if sum(band_num_queued_subtasks.values()) == 0:
+            logger.info("No subtasks in queue, skip balance.")
+            return
         move_queued_subtasks = await self._assigner_ref.reassign_subtasks(
-            band_num_queued_subtasks
+            band_num_queued_subtasks, used_slots=used_slots
         )
         items = []
         # rewrite band queues according to feedbacks from assigner
         for band, move in move_queued_subtasks.items():
             task_queue = self._band_queues[band]
-            assert move + len(task_queue) >= 0
+            queue_size = len([item for item in task_queue if item.subtask.subtask_id in self._stid_to_items])
+            assert queue_size + move >= 0, f"move {move} from {band, task_queue} " \
+                                           f"move_queued_subtasks {move_queued_subtasks} " \
+                                           f"band_num_queued_subtasks {band_num_queued_subtasks}"
             for _ in range(abs(move)):
                 if move < 0:
                     # TODO: pop item of low priority
+                    self._ensure_top_item_valid(task_queue)
                     item = heapq.heappop(task_queue)
                     self._stid_to_bands[item.subtask.subtask_id].remove(band)
                     items.append(item)
                 elif move > 0:
                     item = items.pop()
-                    self._stid_to_bands[item.subtask.subtask_id].append(band)
-                    heapq.heappush(task_queue, item)
+                    subtask = item.subtask
+                    if subtask.bands_specified and band not in subtask.expect_bands:
+                        logger.warning("Skip rescheduling subtask %s to band %s because its band is specified as %s.",
+                                       subtask.subtask_id, band, subtask.expect_bands)
+                        specified_band = subtask.expect_band
+                        specified_band_queue = self._band_queues[specified_band]
+                        heapq.heappush(specified_band_queue, item)
+                        self._stid_to_bands[item.subtask.subtask_id].append(specified_band)
+                    else:
+                        subtask.expect_bands = [band]
+                        self._stid_to_bands[item.subtask.subtask_id].append(band)
+                        heapq.heappush(task_queue, item)
             if len(task_queue) == 0:
                 self._band_queues.pop(band)
+        balanced_num_queued_subtasks = {}
+        for band, queue in self._band_queues.items():
+            band_length = len([item for item in queue if item.subtask.subtask_id in self._stid_to_items])
+            if band_length > 0:
+                balanced_num_queued_subtasks[band] = band_length
+        logger.info("Balance subtasks succeeded:\n move_queued_subtasks %s\n "
+                    "balanced_num_queued_subtasks %s", move_queued_subtasks, balanced_num_queued_subtasks)
diff --git a/mars/services/scheduling/supervisor/tests/test_assigner.py b/mars/services/scheduling/supervisor/tests/test_assigner.py
index cb71d6424f..f6d02af274 100644
--- a/mars/services/scheduling/supervisor/tests/test_assigner.py
+++ b/mars/services/scheduling/supervisor/tests/test_assigner.py
@@ -46,7 +46,7 @@ def __init__(self, timeout=None, check_interval=None, with_gpu=False):
         self.all_bands = self.ready_bands.copy()
 
     async def update_node_info(
-        self, address, role, env=None, resource=None, detail=None, status=None
+            self, address, role, env=None, resource=None, detail=None, status=None
     ):
         if "address" in address and status == NodeStatus.STOPPING:
             del self.ready_bands[(address, "numa-0")]
@@ -249,23 +249,11 @@ async def test_reassign_subtasks(actor_pool):
     move_queued_subtasks = await assigner_ref.reassign_subtasks(
         band_num_queued_subtasks
     )
-    assert move_queued_subtasks in (
-        {
-            ("address1", "numa-0"): -1,
-            ("address0", "numa-0"): -1,
-            ("address2", "numa-0"): 2,
-        },
-        {
-            ("address1", "numa-0"): -2,
-            ("address0", "numa-0"): 0,
-            ("address2", "numa-0"): 2,
-        },
-        {
-            ("address1", "numa-0"): -2,
-            ("address0", "numa-0"): -1,
-            ("address2", "numa-0"): 3,
-        },
-    )
+    assert move_queued_subtasks == {
+        ("address1", "numa-0"): -2,
+        ("address0", "numa-0"): 0,
+        ("address2", "numa-0"): 2,
+    }
 
     # ('address0', 'numa-0'), ('address2', 'numa-0') are ready
     await cluster_api.set_node_status(
@@ -280,18 +268,11 @@ async def test_reassign_subtasks(actor_pool):
     move_queued_subtasks = await assigner_ref.reassign_subtasks(
         band_num_queued_subtasks
     )
-    assert move_queued_subtasks in (
-        {
-            ("address1", "numa-0"): -7,
-            ("address0", "numa-0"): 3,
-            ("address2", "numa-0"): 4,
-        },
-        {
-            ("address1", "numa-0"): -7,
-            ("address0", "numa-0"): 4,
-            ("address2", "numa-0"): 3,
-        },
-    )
+    assert move_queued_subtasks == {
+        ('address1', 'numa-0'): -7,
+        ('address0', 'numa-0'): -1,
+        ('address2', 'numa-0'): 8
+    }
 
     band_num_queued_subtasks = {("address0", "numa-0"): 9, ("address1", "numa-0"): 7}
     move_queued_subtasks = await assigner_ref.reassign_subtasks(
diff --git a/mars/services/scheduling/supervisor/tests/test_queue_balance.py b/mars/services/scheduling/supervisor/tests/test_queue_balance.py
index d5b823ff11..1b4a2eaf74 100644
--- a/mars/services/scheduling/supervisor/tests/test_queue_balance.py
+++ b/mars/services/scheduling/supervisor/tests/test_queue_balance.py
@@ -14,8 +14,8 @@
 import asyncio
 
 import pytest
+from collections import defaultdict
 from typing import Tuple, List
-
 from ..... import oscar as mo
 from ....cluster import ClusterAPI
 from ....cluster.core import NodeRole, NodeStatus
@@ -109,6 +109,15 @@ def apply_subtask_slots(
     ):
         return subtask_ids
 
+    def refresh_bands(self):
+        pass
+
+    def get_used_slots(self):
+        return {}
+
+    def get_remaining_slots(self):
+        return {}
+
 
 class MockAssignerActor(mo.Actor):
     def assign_subtasks(
@@ -116,7 +125,7 @@ def assign_subtasks(
     ):
         return [subtask.expect_bands[0] for subtask in subtasks]
 
-    def reassign_subtasks(self, band_num_queued_subtasks):
+    def reassign_subtasks(self, band_num_queued_subtasks, used_slots=None):
        if len(band_num_queued_subtasks.keys()) == 1:
             [(band, _)] = band_num_queued_subtasks.items()
             return {band: 0}
@@ -129,15 +138,15 @@ class MockSubtaskManagerActor(mo.Actor):
     def __init__(self):
-        self._subtask_ids, self._bands = [], []
+        self._submitted_subtask_ids = defaultdict(list)
 
     @mo.extensible
     def submit_subtask_to_band(self, subtask_id: str, band: Tuple):
-        self._subtask_ids.append(subtask_id)
-        self._bands.append(band)
+        print(f"submit subtask {subtask_id} to band {band}")
+        self._submitted_subtask_ids[band].append(subtask_id)
 
     def dump_data(self):
-        return self._subtask_ids, self._bands
+        return self._submitted_subtask_ids
 
 
 @pytest.fixture
@@ -209,15 +218,15 @@ async def test_subtask_queueing(actor_pool):
     # 9 subtasks on ('address0', 'numa-0')
     await queueing_ref.submit_subtasks(band=("address0", "numa-0"), limit=10)
-    commited_subtask_ids, _commited_bands = await manager_ref.dump_data()
-    assert len(commited_subtask_ids) == 9
+    commited_subtask_ids = (await manager_ref.dump_data())[("address0", "numa-0")]
+    assert len(commited_subtask_ids) == 9, f"commited_subtask_ids {commited_subtask_ids}"
     # 0 subtasks on ('address1', 'numa-0')
     await queueing_ref.submit_subtasks(band=("address1", "numa-0"), limit=10)
-    commited_subtask_ids, _commited_bands = await manager_ref.dump_data()
-    assert len(commited_subtask_ids) == 9
+    commited_subtask_ids = (await manager_ref.dump_data())[("address0", "numa-0")]
+    assert len(commited_subtask_ids) == 9, f"commited_subtask_ids {commited_subtask_ids}"
     # 9 subtasks on ('address2', 'numa-0')
     await queueing_ref.submit_subtasks(band=("address2", "numa-0"), limit=10)
-    commited_subtask_ids, _commited_bands = await manager_ref.dump_data()
-    assert len(commited_subtask_ids) == 18
+    submitted_subtask_ids = await manager_ref.dump_data()
+    assert sum(len(v) for v in submitted_subtask_ids.values()) == 18
 
diff --git a/mars/storage/ray.py b/mars/storage/ray.py
index 829205a8ce..e4a05ca970 100644
--- a/mars/storage/ray.py
+++ b/mars/storage/ray.py
@@ -11,7 +11,7 @@
 # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 # See the License for the specific language governing permissions and
 # limitations under the License.
-
+import logging
 import inspect
 from typing import Any, Dict, List, Tuple
 
@@ -22,6 +22,7 @@
 from .core import BufferWrappedFileObject, StorageFileObject
 
 ray = lazy_import("ray")
+logger = logging.getLogger(__name__)
 
 
 # TODO(fyrestone): make the SparseMatrix pickleable.
@@ -145,18 +146,24 @@ async def get(self, object_id, **kwargs) -> object:
         if kwargs:  # pragma: no cover
             raise NotImplementedError(f'Got unsupported args: {",".join(kwargs)}')
         with debug_async_timeout(
-            "ray_object_retrieval_timeout", "Storage get object timeout"
+            "ray_object_retrieval_timeout",
+            "Storage get object timeout, ObjectRef: %s",
+            object_id
         ):
             return await object_id
 
     @implements(StorageBackend.put)
     async def put(self, obj, importance=0) -> ObjectInfo:
-        if support_specify_owner() and self._owner_address:
-            if not self._owner:
-                self._owner = ray.get_actor(self._owner_address)
-            object_id = ray.put(obj, _owner=self._owner)
-        else:
-            object_id = ray.put(obj)
+        try:
+            if support_specify_owner() and self._owner_address:
+                if not self._owner:
+                    self._owner = ray.get_actor(self._owner_address)
+                object_id = ray.put(obj, _owner=self._owner)
+            else:
+                object_id = ray.put(obj)
+        except Exception as e:
+            logger.exception("ray.put error %s", e)
+            raise
         # We can't get the serialized bytes length from ray.put
         return ObjectInfo(object_id=object_id)
diff --git a/mars/utils.py b/mars/utils.py
index 0c545103a1..ec23a9994a 100644
--- a/mars/utils.py
+++ b/mars/utils.py
@@ -55,6 +55,10 @@
 import numpy as np
 import pandas as pd
+try:
+    import ray
+except ImportError:
+    ray = None
 from sklearn.base import BaseEstimator
 
 from ._utils import (  # noqa: F401  # pylint: disable=unused-import
@@ -1592,3 +1596,9 @@ def create_task_with_error_log(coro, *args, **kwargs):  # pragma: no cover
     else:
         call_site = None
     return _create_task(_run_task_with_error_log(coro, call_site), *args, **kwargs)
+
+
+def report_event(severity, label, message):
+    if ray and ray.is_initialized():
+        severity = getattr(ray.EventSeverity, severity) if isinstance(severity, str) else severity
+        ray.report_event(severity, label, message)