use a control process to handle signal and terminate service
cblmemo committed Oct 15, 2023
1 parent 673c747 commit 003c655
Showing 14 changed files with 394 additions and 494 deletions.
70 changes: 3 additions & 67 deletions sky/core.py
@@ -1145,7 +1145,6 @@ def serve_down(service_name: str, purge: bool = False) -> None:
         with ux_utils.print_exception_no_traceback():
             raise ValueError(f'Service {service_name!r} not found.')
 
-    service_handle: serve.ServiceHandle = service_record['handle']
     controller_name = service_record['controller_name']
     handle = global_user_state.get_handle_from_cluster_name(controller_name)
 
@@ -1165,38 +1164,18 @@ def serve_down(service_name: str, purge: bool = False) -> None:
     code = serve.ServeCodeGen.terminate_service(service_name)
 
     try:
-        (returncode, terminate_service_payload,
-         stderr) = backend.run_on_head(handle,
-                                       code,
-                                       require_outputs=True,
-                                       stream_logs=False,
-                                       separate_stderr=True)
+        returncode, stdout, _ = backend.run_on_head(
+            handle, code, require_outputs=True, stream_logs=False)
     except exceptions.FetchIPError as e:
         raise RuntimeError(controller_fetch_ip_error_message) from e
 
     subprocess_utils.handle_returncode(
         returncode,
         code, ('Failed when submitting termination request to controller '
                f'of service {service_name!r}'),
-        stderr,
+        stdout,
         stream_logs=False)
 
-    resp = serve.load_terminate_service_result(
-        terminate_service_payload)
-    if resp.status_code != 200:
-        with ux_utils.print_exception_no_traceback():
-            raise RuntimeError('Failed to terminate replica of service '
-                               f'{service_name!r} due to request '
-                               f'failure: {resp.text}')
-    msg = resp.json().get('message')
-    if msg:
-        with ux_utils.print_exception_no_traceback():
-            raise RuntimeError(
-                'Unexpected message when tearing down replica of '
-                f'service {service_name!r}: {msg}. Please login to '
-                'the controller by `ssh <controller-name>` and '
-                'make sure the service is properly cleaned up.')
-
     # We want to make sure no matter what error happens, we can still
     # clean up the record if purge is True.
     # pylint: disable=broad-except
@@ -1209,52 +1188,9 @@ def serve_down(service_name: str, purge: bool = False) -> None:
             with ux_utils.print_exception_no_traceback():
                 raise RuntimeError(e) from e
 
-    try:
-        if handle is not None:
-            assert isinstance(handle, backends.CloudVmRayResourceHandle)
-            backend = backends.CloudVmRayBackend()
-            backend.register_info(minimize_logging=True)
-
-            # Cleanup all files on controller related to this service.
-            # We have a 10-min grace period for the controller to autostop,
-            # so it should be fine if this is the last service on the
-            # controller and its job is the only one running.
-            # Also, Cleanup the service record in controller VM
-            code = serve.ServeCodeGen.cleanup_service(service_name)
-            returncode, _, stderr = backend.run_on_head(handle,
-                                                        code,
-                                                        require_outputs=True,
-                                                        stream_logs=False,
-                                                        separate_stderr=True)
-            subprocess_utils.handle_returncode(
-                returncode,
-                code, (f'Failed cleaning up service {service_name!r}'),
-                stderr,
-                stream_logs=False)
-
-        # same as above.
-        # pylint: disable=broad-except
-    except Exception as e:
-        if purge:
-            logger.warning(
-                f'Ignoring error when clean up service {service_name!r}: '
-                f'{common_utils.format_exception(e)}')
-        else:
-            with ux_utils.print_exception_no_traceback():
-                raise RuntimeError(e) from e
-
     # TODO(tian): Maybe add a post_cleanup function?
     controller_yaml_path = serve.generate_controller_yaml_file_name(
         service_name)
     if os.path.exists(controller_yaml_path):
         os.remove(controller_yaml_path)
-    try:
-        service_handle.cleanup_ephemeral_storage()
-    # same as above.
-    except Exception as e:  # pylint: disable=broad-except
-        if purge:
-            logger.warning('Ignoring error when cleaning up ephemeral storage '
-                           f'of service {service_name}: {e}')
-        else:
-            raise RuntimeError(e) from e
     global_user_state.remove_service(service_name)
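The terminate path above no longer parses an HTTP payload on the client; the generated code is expected to signal the controller-side control process instead. A hedged sketch of what that generated code might do, assuming it uses the signal file and timeout added in sky/serve/constants.py (the function body is illustrative, not the actual codegen output):

```python
import pathlib
import time

from sky.serve import constants


def terminate_service(service_name: str) -> None:
    """Ask the controller-side control process to tear down the service."""
    signal_file = pathlib.Path(
        constants.SIGNAL_FILE_PATH.format(service_name))
    # Touching the file is the termination signal the control process polls.
    signal_file.touch()
    # Bounded wait for the control process to clean up and remove the file.
    deadline = time.time() + constants.SERVICE_TERMINATION_TIMEOUT
    while signal_file.exists():
        if time.time() > deadline:
            raise RuntimeError(
                f'Timed out terminating service {service_name!r}.')
        time.sleep(1)
```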
34 changes: 12 additions & 22 deletions sky/data/storage.py
@@ -277,7 +277,7 @@ def upload(self) -> None:
         """
         raise NotImplementedError
 
-    def delete(self, silent: bool = False) -> None:
+    def delete(self) -> None:
         """Removes the Storage object from the cloud."""
         raise NotImplementedError
 
@@ -495,7 +495,7 @@ def __init__(self,
                 (isinstance(self.source, list) or
                  not data_utils.is_cloud_store_url(self.source))):
                 msg = ' and uploading from source'
-                logger.debug(f'Verifying bucket{msg} for storage {self.name}')
+                logger.info(f'Verifying bucket{msg} for storage {self.name}')
                 self.sync_all_stores()
 
             else:
@@ -726,7 +726,7 @@ def add_store(self, store_type: Union[str, StoreType]) -> AbstractStore:
         store_type = StoreType(store_type)
 
         if store_type in self.stores:
-            logger.debug(f'Storage type {store_type} already exists.')
+            logger.info(f'Storage type {store_type} already exists.')
             return self.stores[store_type]
 
         store_cls: Type[AbstractStore]
@@ -786,9 +786,7 @@ def _add_store(self, store: AbstractStore, is_reconstructed: bool = False):
         global_user_state.add_or_update_storage(self.name, self.handle,
                                                 StorageStatus.INIT)
 
-    def delete(self,
-               store_type: Optional[StoreType] = None,
-               silent: bool = False) -> None:
+    def delete(self, store_type: Optional[StoreType] = None) -> None:
         """Deletes data for all sky-managed storage objects.
 
         If a storage is not managed by sky, it is not deleted from the cloud.
@@ -808,7 +806,7 @@ def delete(self,
             # remove handle and return
             if is_sky_managed:
                 self.handle.remove_store(store)
-                store.delete(silent=silent)
+                store.delete()
                 # Check remaining stores - if none is sky managed, remove
                 # the storage from global_user_state.
                 delete = all(
@@ -818,16 +816,16 @@ def delete(self,
                 else:
                     global_user_state.set_storage_handle(self.name, self.handle)
             elif self.force_delete:
-                store.delete(silent=silent)
+                store.delete()
             # Remove store from bookkeeping
             del self.stores[store_type]
         else:
             for _, store in self.stores.items():
                 if store.is_sky_managed:
                     self.handle.remove_store(store)
-                    store.delete(silent=silent)
+                    store.delete()
                 elif self.force_delete:
-                    store.delete(silent=silent)
+                    store.delete()
             self.stores = {}
         # Remove storage from global_user_state if present
         global_user_state.remove_storage(self.name)
@@ -1092,10 +1090,8 @@ def upload(self):
             raise exceptions.StorageUploadError(
                 f'Upload failed for store {self.name}') from e
 
-    def delete(self, silent: bool = False) -> None:
+    def delete(self) -> None:
         deleted_by_skypilot = self._delete_s3_bucket(self.name)
-        if silent:
-            return
         if deleted_by_skypilot:
             msg_str = f'Deleted S3 bucket {self.name}.'
         else:
@@ -1492,10 +1488,8 @@ def upload(self):
             raise exceptions.StorageUploadError(
                 f'Upload failed for store {self.name}') from e
 
-    def delete(self, silent: bool = False) -> None:
+    def delete(self) -> None:
         deleted_by_skypilot = self._delete_gcs_bucket(self.name)
-        if silent:
-            return
         if deleted_by_skypilot:
             msg_str = f'Deleted GCS bucket {self.name}.'
         else:
@@ -1866,10 +1860,8 @@ def upload(self):
             raise exceptions.StorageUploadError(
                 f'Upload failed for store {self.name}') from e
 
-    def delete(self, silent: bool = False) -> None:
+    def delete(self) -> None:
         deleted_by_skypilot = self._delete_r2_bucket(self.name)
-        if silent:
-            return
         if deleted_by_skypilot:
             msg_str = f'Deleted R2 bucket {self.name}.'
         else:
@@ -2273,10 +2265,8 @@ def upload(self):
             raise exceptions.StorageUploadError(
                 f'Upload failed for store {self.name}') from e
 
-    def delete(self, silent: bool = False) -> None:
+    def delete(self) -> None:
         self._delete_cos_bucket()
-        if silent:
-            return
         logger.info(f'{colorama.Fore.GREEN}Deleted COS bucket {self.name}.'
                     f'{colorama.Style.RESET_ALL}')

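With the `silent` parameter removed across all store types, callers of `Storage.delete()` simply invoke it and rely on its own logging. An illustrative usage sketch; the storage name is hypothetical, and a real `Storage` would have stores attached before deletion:

```python
from sky.data.storage import Storage, StoreType

storage = Storage(name='my-sky-bucket')    # hypothetical storage object
storage.delete(store_type=StoreType.S3)    # delete a single store type
storage.delete()                           # or delete every remaining store
```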
5 changes: 5 additions & 0 deletions sky/exceptions.py
@@ -250,3 +250,8 @@ def __init__(self, region: str,
         self.reason = reason
 
         super().__init__(reason.message)
+
+
+class ServeUserTerminatedError(Exception):
+    """Raised when a user tears down the service."""
+    pass
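A minimal sketch of how the new exception might be raised, assuming the control process polls the signal file from sky/serve/constants.py; the helper name `check_user_signal` is an assumption, not part of this commit:

```python
import os

from sky import exceptions
from sky.serve import constants


def check_user_signal(service_name: str) -> None:
    # If the signal file exists, the user has requested teardown.
    signal_file = constants.SIGNAL_FILE_PATH.format(service_name)
    if os.path.exists(signal_file):
        raise exceptions.ServeUserTerminatedError(
            f'User requested teardown of service {service_name!r}.')
```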
8 changes: 0 additions & 8 deletions sky/execution.py
@@ -1056,13 +1056,6 @@ def serve_up(
                 '`sky serve up` process hanging abnormally.') from e
 
     _maybe_translate_local_file_mounts_and_sync_up(task, prefix='serve')
-    ephemeral_storage = []
-    if task.storage_mounts is not None:
-        for storage in task.storage_mounts.values():
-            if not storage.persistent:
-                ephemeral_storage.append(storage.to_yaml_config())
-    service_handle.ephemeral_storage = ephemeral_storage
-    global_user_state.set_service_handle(service_name, service_handle)
 
     with tempfile.NamedTemporaryFile(prefix=f'serve-task-{service_name}-',
                                      mode='w') as f:
@@ -1086,7 +1079,6 @@ def serve_up(
             'service_name': service_name,
             'controller_port': controller_port,
             'load_balancer_port': load_balancer_port,
-            'replica_port': task.service.replica_port,
             'controller_log_file': controller_log_file,
             'load_balancer_log_file': load_balancer_log_file,
             'envs': _shared_controller_env_vars(),
1 change: 0 additions & 1 deletion sky/serve/__init__.py
@@ -22,7 +22,6 @@
 from sky.serve.serve_utils import generate_replica_cluster_name
 from sky.serve.serve_utils import get_available_controller_name
 from sky.serve.serve_utils import load_latest_info
-from sky.serve.serve_utils import load_terminate_service_result
 from sky.serve.serve_utils import ServeCodeGen
 from sky.serve.serve_utils import ServiceComponent
 from sky.serve.serve_utils import ServiceHandle
6 changes: 6 additions & 0 deletions sky/serve/constants.py
@@ -19,6 +19,12 @@
 CONTROLLER_FILE_LOCK_PATH = f'{SERVE_PREFIX}/controller.lock'
 CONTROLLER_FILE_LOCK_TIMEOUT = 20
 
+# Signal file path for controller to handle signals.
+SIGNAL_FILE_PATH = '/tmp/sky_serve_controller_signal_{}'
+
+# Timeout for `sky serve down`.
+SERVICE_TERMINATION_TIMEOUT = 180
+
 # The time interval for load balancer to sync with controller. Every time the
 # load balancer syncs with controller, it will update all available replica ips
 # for each service, also send the number of requests in last query interval.
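Putting the pieces together, one plausible shape for the control process named in the commit title (a sketch under assumptions; the actual supervisor logic lives in controller code not shown in this view): it supervises the controller and load balancer, polls for the user signal, and terminates both children on request.

```python
import multiprocessing
import time


def control_process(service_name, run_controller, run_load_balancer,
                    check_user_signal):
    """Supervise serve components; tear them down on a user signal."""
    procs = [
        multiprocessing.Process(target=run_controller),
        multiprocessing.Process(target=run_load_balancer),
    ]
    for proc in procs:
        proc.start()
    try:
        while all(proc.is_alive() for proc in procs):
            check_user_signal(service_name)  # raises ServeUserTerminatedError
            time.sleep(1)
    finally:
        # Terminate children no matter how the loop exits.
        for proc in procs:
            proc.terminate()
            proc.join()
```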