[WIP] refine auto scaling #2785

Open · wants to merge 4 commits into base: master
1 change: 1 addition & 0 deletions mars/deploy/oscar/base_config.yml
@@ -34,6 +34,7 @@ scheduling:
enabled: false
min_workers: 1 # Must be >= 1; Mars needs at least 1 worker to fetch data
max_workers: 100
initial_round_scale_out_workers: 2
scheduler_backlog_timeout: 60
worker_idle_timeout: 120
speculation:
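For reference, a minimal sketch (in Python rather than YAML, so it stays runnable) of how the `scheduling.autoscale` section above is consumed; the `conf` dict is a hypothetical stand-in for the parsed YAML, and the fallbacks mirror the `autoscale_conf.get(...)` calls in `autoscale.py` further down:

```python
# Hypothetical parsed form of the `scheduling.autoscale` section shown above.
conf = {
    "enabled": False,
    "min_workers": 1,                      # must be >= 1
    "max_workers": 100,
    "initial_round_scale_out_workers": 2,  # new key introduced by this PR
    "scheduler_backlog_timeout": 60,
    "worker_idle_timeout": 120,
}

# The fallbacks below mirror the `autoscale_conf.get(...)` calls in autoscale.py.
max_workers = conf.get("max_workers", 100)
initial_round_workers = conf.get("initial_round_scale_out_workers", 2)
scheduler_backlog_timeout = conf["scheduler_backlog_timeout"]
worker_idle_timeout = conf.get("worker_idle_timeout", 2 * scheduler_backlog_timeout)
```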
2 changes: 1 addition & 1 deletion mars/metrics/backends/tests/test_metric.py
@@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

from ..metric import (
from ..metrics import (
AbstractMetric,
AbstractCounter,
AbstractGauge,
90 changes: 47 additions & 43 deletions mars/services/scheduling/supervisor/assigner.py
@@ -13,6 +13,7 @@
# limitations under the License.

import asyncio
import heapq
import itertools
import numpy as np
from collections import defaultdict
@@ -111,12 +112,10 @@ async def assign_subtasks(
exclude_bands = exclude_bands or set()
inp_keys = set()
selected_bands = dict()

if not self._bands:
self._update_bands(
list(await self._cluster_api.get_all_bands(NodeRole.WORKER))
)

for subtask in subtasks:
is_gpu = any(c.op.gpu for c in subtask.chunk_graph)
if subtask.expect_bands:
@@ -207,78 +206,83 @@ async def assign_subtasks(
return assigns

async def reassign_subtasks(
self, band_to_queued_num: Dict[BandType, int]
self, band_to_queued_num: Dict[BandType, int], used_slots: Dict[BandType, int] = None
) -> Dict[BandType, int]:
used_slots = used_slots or {}
move_queued_subtasks = {}
for is_gpu in (False, True):
band_name_prefix = "numa" if not is_gpu else "gpu"

filtered_bands = [
device_bands = [
band for band in self._bands if band[1].startswith(band_name_prefix)
]
filtered_band_to_queued_num = {
if not device_bands:
continue
device_bands_set = set(device_bands)
device_band_to_queued_num = {
k: v
for k, v in band_to_queued_num.items()
if k[1].startswith(band_name_prefix)
}

if not filtered_bands:
continue

num_used_bands = len(filtered_band_to_queued_num.keys())
used_bands = device_band_to_queued_num.keys()
num_used_bands = len(used_bands)
if num_used_bands == 1:
[(band, length)] = filtered_band_to_queued_num.items()
[(band, length)] = device_band_to_queued_num.items()
if length == 0:
move_queued_subtasks.update({band: 0})
continue
# no need to balance when there's only one band initially
if len(filtered_bands) == 1 and band == filtered_bands[0]:
if len(device_bands) == 1 and band == device_bands[0]:
move_queued_subtasks.update({band: 0})
continue
# unready bands recorded in band_num_queued_subtasks, some of them may hold 0 subtasks
unready_bands = list(
set(filtered_band_to_queued_num.keys()) - set(filtered_bands)
)
# ready bands not recorded in band_num_queued_subtasks, all of them hold 0 subtasks
new_ready_bands = list(
set(filtered_bands) - set(filtered_band_to_queued_num.keys())
)
# when there are new ready bands, make all bands hold same amount of subtasks
# when there are no new ready bands now, move out subtasks left on them
if not new_ready_bands and unready_bands:
filtered_band_to_queued_num = {
k: filtered_band_to_queued_num[k] for k in unready_bands
}
# approximate total of subtasks moving to each ready band
num_all_subtasks = sum(filtered_band_to_queued_num.values())
mean = int(num_all_subtasks / len(filtered_bands))
num_all_subtasks = sum(device_band_to_queued_num.values()) + sum(used_slots.values())
# If the division leaves a remainder, the amounts moved in and out may be unequal.
mean = int(num_all_subtasks / len(device_bands))
# all_bands (namely) includes:
# a. ready bands recorded in band_num_queued_subtasks
# b. ready bands not recorded in band_num_queued_subtasks
# c. unready bands recorded in band_num_queued_subtasks
# a. + b. = self._bands, a. + c. = bands in band_num_queued_subtasks
all_bands = list(
set(filtered_bands) | set(filtered_band_to_queued_num.keys())
device_bands_set | set(device_band_to_queued_num.keys())
)
# calculate the differential steps of moving subtasks
# move < 0 means subtasks should move out and vice versa
# unready bands no longer hold subtasks
# assuming bands not recorded in band_num_queued_subtasks hold 0 subtasks
band_move_nums = {}
for band in all_bands:
if band in filtered_bands:
band_move_nums[band] = mean - filtered_band_to_queued_num.get(
band, 0
)
existing_subtask_nums = device_band_to_queued_num.get(band, 0)
if band in device_bands:
band_move_num = mean - existing_subtask_nums - used_slots.get(band, 0)
# If the band's slots already have subtasks running, the computed move-out could exceed
# the number of queued subtasks (existing_subtask_nums), so clamp it below.
if band_move_num + existing_subtask_nums < 0:
band_move_num = -existing_subtask_nums
else:
band_move_nums[band] = -filtered_band_to_queued_num.get(band, 0)
# ensure the balance of moving in and out
total_move = sum(band_move_nums.values())
# int() is going to be closer to zero, so `mean` is no more than actual mean value
# total_move = mean * len(self._bands) - num_all_subtasks
# <= actual_mean * len(self._bands) - num_all_subtasks = 0
assert total_move <= 0
if total_move != 0:
band_move_nums[self._get_random_band(False)] -= total_move
band_move_num = -existing_subtask_nums
band_move_nums[band] = band_move_num
self._balance_move_out_subtasks(band_move_nums)
assert sum(band_move_nums.values()) == 0, f"band_move_nums {band_move_nums}"
move_queued_subtasks.update(band_move_nums)
return dict(sorted(move_queued_subtasks.items(), key=lambda item: item[1]))

def _balance_move_out_subtasks(self, band_move_nums: Dict[BandType, int]):
# ensure the balance of moving in and out
total_move = sum(band_move_nums.values())
if total_move != 0:
unit = (total_move > 0) - (total_move < 0)

class BandHeapItem:
def __init__(self, band_):
self.band = band_

def __lt__(self, other: "BandHeapItem"):
return (band_move_nums.get(self.band, 0) > band_move_nums.get(other.band, 0)) * unit
bands_queue = [BandHeapItem(band) for band, move_nums in band_move_nums.items() if move_nums * unit > 0]
heapq.heapify(bands_queue)
while total_move != 0:
item = heapq.heappop(bands_queue)
band_move_nums[item.band] -= unit
total_move -= unit
heapq.heappush(bands_queue, item)
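The `_balance_move_out_subtasks` helper added above evens out any residual imbalance one unit at a time through a heap. Below is a simplified, self-contained sketch of the same idea (it uses plain `(key, band)` heap tuples instead of the PR's `BandHeapItem` class, and the example data is made up), showing how the sum of the move numbers is driven to zero:

```python
import heapq
from typing import Dict, Tuple

BandType = Tuple[str, str]  # (worker address, band name), as in Mars

def balance_move_out(band_move_nums: Dict[BandType, int]) -> None:
    """Spread the residual so that subtask moves in and out cancel out."""
    total_move = sum(band_move_nums.values())
    if total_move == 0:
        return
    unit = 1 if total_move > 0 else -1
    # Max-heap over the bands that currently move in the surplus direction.
    heap = [
        (-move * unit, band)
        for band, move in band_move_nums.items()
        if move * unit > 0
    ]
    heapq.heapify(heap)
    while total_move != 0:
        _, band = heapq.heappop(heap)
        band_move_nums[band] -= unit   # take one unit off the largest mover
        total_move -= unit
        heapq.heappush(heap, (-band_move_nums[band] * unit, band))

# Made-up example: two bands want to pull in 3 subtasks each, one pushes out 4.
moves = {("w1", "numa-0"): 3, ("w2", "numa-0"): 3, ("w3", "numa-0"): -4}
balance_move_out(moves)
assert sum(moves.values()) == 0
```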
96 changes: 64 additions & 32 deletions mars/services/scheduling/supervisor/autoscale.py
@@ -22,7 +22,9 @@
from typing import List, Set, Dict, Optional, Any

from .... import oscar as mo
from ....metrics import Metrics
from ....typing import BandType
from ....utils import report_event
from ....lib.aio import alru_cache
from ...cluster.api import ClusterAPI
from ...cluster.core import NodeRole, NodeStatus
@@ -38,6 +40,16 @@ def __init__(self, autoscale_conf: Dict[str, Any]):
self.queueing_refs = dict()
self.global_slot_ref = None
self._dynamic_workers: Set[str] = set()
self._max_workers = autoscale_conf.get("max_workers", 100)
self._initial_request_worker_timeout = autoscale_conf.get("initial_request_worker_timeout", 30)
self._request_worker_time_secs = Metrics.gauge(
"mars.scheduling.request_worker_time_sec",
"Time consuming in seconds to request a worker",
)
self._offline_worker_time_secs = Metrics.gauge(
"mars.scheduling.offline_worker_time_sec",
"Time consuming in seconds to offline a worker",
)

async def __post_create__(self):
strategy = self._autoscale_conf.get("strategy")
@@ -54,8 +66,8 @@ async def __post_create__(self):
self._cluster_api = await ClusterAPI.create(self.address)
self._strategy = await strategy_cls.create(self._autoscale_conf, self)
if self._enabled:
logger.info(f"Auto scale strategy %s started", self._strategy)
await self._strategy.start()
logger.info(f"Autoscaler started with conf %s and strategy %s", self._autoscale_conf, self._strategy)

async def __pre_destroy__(self):
if self._enabled:
@@ -75,24 +87,28 @@ async def request_worker(
self, worker_cpu: int = None, worker_mem: int = None, timeout: int = None
) -> str:
start_time = time.time()
timeout = timeout or self._initial_request_worker_timeout
worker_address = await self._cluster_api.request_worker(
worker_cpu, worker_mem, timeout
)
if worker_address:
self._dynamic_workers.add(worker_address)
logger.info(
"Requested new worker %s in %.4f seconds, current dynamic worker nums is %s",
duration = time.time() - start_time
msg = "Requested new worker {} in {} seconds, current dynamic worker nums is {}".format(
worker_address,
time.time() - start_time,
duration,
self.get_dynamic_worker_nums(),
)
)
logger.info(msg)
report_event("WARNING", "AUTO_SCALING_OUT", msg)
return worker_address
else:
logger.warning(
"Request worker with resource %s failed in %.4f seconds.",
dict(worker_cpu=worker_cpu, worker_mem=worker_mem),
"Request worker with resource %s and timeout %s failed in %.4f seconds.",
dict(worker_cpu=worker_cpu, worker_mem=worker_mem), timeout,
time.time() - start_time,
)
self._initial_request_worker_timeout *= 2

async def release_workers(self, addresses: List[str]):
"""
@@ -119,14 +135,19 @@ async def release_workers(self, addresses: List[str]):

async def release_worker(address):
logger.info("Start to release worker %s.", address)
start = time.time()
worker_bands = workers_bands[address]
await asyncio.gather(
*[self.global_slot_ref.wait_band_idle(band) for band in worker_bands]
)
await self._migrate_data_of_bands(worker_bands, excluded_bands)
await self._cluster_api.release_worker(address)
duration = time.time() - start
self._offline_worker_time_secs.record(duration)
self._dynamic_workers.remove(address)
logger.info("Released worker %s.", address)
msg = f"Released worker {address}."
logger.info(msg)
report_event("WARNING", "AUTO_SCALING_IN", msg)

await asyncio.gather(*[release_worker(address) for address in addresses])

@@ -256,11 +277,19 @@ def __init__(self, autoscale_conf: Dict[str, Any], autoscaler):
self._sustained_scheduler_backlog_timeout = autoscale_conf.get(
"sustained_scheduler_backlog_timeout", self._scheduler_backlog_timeout
)
self._scheduler_scale_out_enabled = autoscale_conf.get(
"scheduler_scale_out_enabled", True
)
# Make worker_idle_timeout greater than scheduler_backlog_timeout to
# avoid the cluster fluctuating back and forth.
self._worker_idle_timeout = autoscale_conf.get(
"worker_idle_timeout", 2 * self._scheduler_backlog_timeout
)
self._initial_round_scale_out_workers = autoscale_conf.get("initial_round_scale_out_workers", 2)
if self._initial_round_scale_out_workers < 2:
logger.warning("Initial round scale_out workers %s is less than 2, set it to 2.",
self._initial_round_scale_out_workers)
self._initial_round_scale_out_workers = 2
self._min_workers = autoscale_conf.get("min_workers", 1)
assert self._min_workers >= 1, "Mars needs at least 1 worker."
self._max_workers = autoscale_conf.get("max_workers", 100)
@@ -297,52 +326,54 @@ async def _run(self):
logger.info("Canceled pending task backlog strategy.")
except Exception as e: # pragma: no cover
logger.exception("Exception occurred when try to auto scale")
raise e
pass

async def _run_round(self):
queueing_refs = list(self._autoscaler.queueing_refs.values())
if any([await queueing_ref.all_bands_busy() for queueing_ref in queueing_refs]):
if self._scheduler_scale_out_enabled and await self.all_bands_busy(queueing_refs):
await self._scale_out(queueing_refs)
else:
await self._scale_in()

@staticmethod
async def all_bands_busy(queueing_refs):
return any(
[await queueing_ref.all_bands_busy() for queueing_ref in queueing_refs]
)

async def _scale_out(self, queueing_refs):
logger.info(
"Try to scale out, current dynamic workers %s",
self._autoscaler.get_dynamic_worker_nums(),
)
start_time = time.time()
while not await self._autoscaler.request_worker():
logger.warning(
"Request worker failed, wait %s seconds and retry.",
self._scheduler_check_interval,
)
await asyncio.sleep(self._scheduler_check_interval)
await asyncio.sleep(self._scheduler_backlog_timeout)
rnd = 1
while any(
[await queueing_ref.all_bands_busy() for queueing_ref in queueing_refs]
):
worker_num = 2**rnd
while await self.all_bands_busy(queueing_refs):
worker_num = self._initial_round_scale_out_workers ** rnd
if (
self._autoscaler.get_dynamic_worker_nums() + worker_num
> self._max_workers
):
worker_num = (
self._max_workers - self._autoscaler.get_dynamic_worker_nums()
)
while set(
await asyncio.gather(
*[self._autoscaler.request_worker() for _ in range(worker_num)]
)
) == {None}:
if worker_num == 0:
break
requested_workers = set(await asyncio.gather(
*[self._autoscaler.request_worker() for _ in range(worker_num)]
))
while len(requested_workers) == 0 and await self.all_bands_busy(queueing_refs):
logger.warning(
"Request %s workers all failed, wait %s seconds and retry.",
worker_num,
self._scheduler_check_interval,
)
await asyncio.sleep(self._scheduler_check_interval)
requested_workers.update(await asyncio.gather(
*[self._autoscaler.request_worker() for _ in range(worker_num)]
))
rnd += 1
logger.info("Requested %s workers.", len(requested_workers))
await asyncio.sleep(self._sustained_scheduler_backlog_timeout)
logger.info(
"Scale out finished in %s round, took %s seconds, current dynamic workers %s",
@@ -368,12 +399,6 @@ async def _scale_in(self):
}
worker_addresses = set(band[0] for band in idle_bands)
if worker_addresses:
logger.debug(
"Bands %s of workers % has been idle for as least %s seconds.",
idle_bands,
worker_addresses,
self._worker_idle_timeout,
)
while (
worker_addresses
and self._autoscaler.get_dynamic_worker_nums() - len(worker_addresses)
@@ -390,6 +415,13 @@
idle_bands.difference_update(
set(await self._autoscaler.get_worker_bands(worker_address))
)
if worker_addresses:
logger.info(
"Bands %s of workers % has been idle for as least %s seconds.",
idle_bands,
worker_addresses,
self._worker_idle_timeout,
)
if worker_addresses:
start_time = time.time()
logger.info(
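To make the new round sizing concrete, here is an illustrative sketch (the function name `scale_out_round_sizes` and its signature are not part of the PR) of how many workers `_scale_out` would request per busy round: `initial_round_scale_out_workers ** round`, clipped so the dynamic worker count never exceeds `max_workers`:

```python
from typing import List

def scale_out_round_sizes(initial_round_scale_out_workers: int = 2,
                          max_workers: int = 100,
                          current_dynamic_workers: int = 0,
                          rounds: int = 5) -> List[int]:
    """Illustrative only: how many workers each consecutive busy round requests."""
    sizes = []
    for rnd in range(1, rounds + 1):
        worker_num = initial_round_scale_out_workers ** rnd
        # Same cap as in _scale_out: never exceed max_workers in total.
        if current_dynamic_workers + worker_num > max_workers:
            worker_num = max_workers - current_dynamic_workers
        if worker_num == 0:
            break
        sizes.append(worker_num)
        current_dynamic_workers += worker_num  # assume all requests succeed
    return sizes

# With the defaults the first rounds request 2, 4, 8, 16, 32 workers;
# later rounds are clipped once max_workers would be exceeded.
print(scale_out_round_sizes())  # [2, 4, 8, 16, 32]
```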