Skip to content

Commit

Permalink
add
Browse files Browse the repository at this point in the history
  • Loading branch information
cblmemo committed Oct 15, 2023
1 parent e6e682f commit 5f78223
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 6 deletions.
29 changes: 24 additions & 5 deletions sky/spot/controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import time
import traceback
import typing
from typing import Tuple
from typing import List, Tuple

import filelock

Expand Down Expand Up @@ -438,7 +438,7 @@ def _handle_signal(job_id):
f'User sent {user_signal.value} signal.')


def _cleanup(job_id: int, dag_yaml: str):
def _cleanup(job_id: int, dag_yaml: str) -> bool:
"""Clean up the spot cluster(s) and storages.
(1) Clean up the succeeded task(s)' ephemeral storage. The storage has
Expand All @@ -452,13 +452,24 @@ def _cleanup(job_id: int, dag_yaml: str):
# NOTE: The code to get cluster name is same as what we did in the spot
# controller, we should keep it in sync with SpotController.__init__()
dag, _ = _get_dag_and_name(dag_yaml)
terminate_processes: List[multiprocessing.Process] = []
for task in dag.tasks:
cluster_name = spot_utils.generate_spot_cluster_name(task.name, job_id)
recovery_strategy.terminate_cluster(cluster_name)
process = multiprocessing.Process(
target=recovery_strategy.terminate_cluster, args=(cluster_name,))
process.start()
terminate_processes.append(process)
# Clean up Storages with persistent=False.
# TODO(zhwu): this assumes the specific backend.
backend = cloud_vm_ray_backend.CloudVmRayBackend()
backend.teardown_ephemeral_storage(task)
failed = False
for process in terminate_processes:
process.join()
if process.exitcode != 0:
logger.error(f'Failed to terminate cluster {cluster_name}.')
failed = True
return failed


def start(job_id, dag_yaml, retry_until_up):
Expand Down Expand Up @@ -511,10 +522,18 @@ def start(job_id, dag_yaml, retry_until_up):
# https://unix.stackexchange.com/questions/356408/strange-problem-with-trap-and-sigint
# But anyway, a clean solution is killing the controller process
# directly, and then cleanup the cluster state.
_cleanup(job_id, dag_yaml=dag_yaml)
failed = _cleanup(job_id, dag_yaml=dag_yaml)
if failed:
spot_state.set_failed(
job_id=job_id,
task_id=None,
failure_type=spot_state.SpotStatus.FAILED_CLEANUP,
failure_reason=('Failed to clean up the spot '
'cluster. Please check the '
'logs for details.'))
logger.info(f'Spot cluster of job {job_id} has been cleaned up.')

if cancelling:
if not failed and cancelling:
spot_state.set_cancelled(
job_id=job_id,
callback_func=spot_utils.event_callback_func(
Expand Down
7 changes: 6 additions & 1 deletion sky/spot/spot_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,9 @@ class SpotStatus(enum.Enum):
# FAILED_CONTROLLER: The job is finished with failure because of unexpected
# error in the controller process.
FAILED_CONTROLLER = 'FAILED_CONTROLLER'
# FAILED_CLEANUP: The job is finished with failure because of unexpected
# error in the cleanup process.
FAILED_CLEANUP = 'FAILED_CLEANUP'

def is_terminal(self) -> bool:
return self in self.terminal_statuses()
Expand All @@ -220,6 +223,7 @@ def terminal_statuses(cls) -> List['SpotStatus']:
cls.FAILED_PRECHECKS,
cls.FAILED_NO_RESOURCE,
cls.FAILED_CONTROLLER,
cls.FAILED_CLEANUP,
cls.CANCELLING,
cls.CANCELLED,
]
Expand All @@ -228,7 +232,7 @@ def terminal_statuses(cls) -> List['SpotStatus']:
def failure_statuses(cls) -> List['SpotStatus']:
return [
cls.FAILED, cls.FAILED_SETUP, cls.FAILED_PRECHECKS,
cls.FAILED_NO_RESOURCE, cls.FAILED_CONTROLLER
cls.FAILED_NO_RESOURCE, cls.FAILED_CONTROLLER, cls.FAILED_CLEANUP
]


Expand All @@ -244,6 +248,7 @@ def failure_statuses(cls) -> List['SpotStatus']:
SpotStatus.FAILED_SETUP: colorama.Fore.RED,
SpotStatus.FAILED_NO_RESOURCE: colorama.Fore.RED,
SpotStatus.FAILED_CONTROLLER: colorama.Fore.RED,
SpotStatus.FAILED_CLEANUP: colorama.Fore.RED,
SpotStatus.CANCELLING: colorama.Fore.YELLOW,
SpotStatus.CANCELLED: colorama.Fore.YELLOW,
}
Expand Down

0 comments on commit 5f78223

Please sign in to comment.