diff --git a/docs/source/getting-started/installation.rst b/docs/source/getting-started/installation.rst
index deb2307b67b..1d36b5ef6b8 100644
--- a/docs/source/getting-started/installation.rst
+++ b/docs/source/getting-started/installation.rst
@@ -59,6 +59,7 @@ Install SkyPilot using pip:
    pip install "skypilot-nightly[runpod]"
    pip install "skypilot-nightly[fluidstack]"
    pip install "skypilot-nightly[paperspace]"
+   pip install "skypilot-nightly[do]"
    pip install "skypilot-nightly[cudo]"
    pip install "skypilot-nightly[ibm]"
    pip install "skypilot-nightly[scp]"
diff --git a/sky/adaptors/do.py b/sky/adaptors/do.py
new file mode 100644
index 00000000000..d619efebc1c
--- /dev/null
+++ b/sky/adaptors/do.py
@@ -0,0 +1,20 @@
+"""Digital Ocean cloud adaptors"""
+
+# pylint: disable=import-outside-toplevel
+
+from sky.adaptors import common
+
+_IMPORT_ERROR_MESSAGE = ('Failed to import dependencies for DO. '
+                         'Try pip install "skypilot[do]"')
+pydo = common.LazyImport('pydo', import_error_message=_IMPORT_ERROR_MESSAGE)
+azure = common.LazyImport('azure', import_error_message=_IMPORT_ERROR_MESSAGE)
+_LAZY_MODULES = (pydo, azure)
+
+
+# `pydo` inherits Azure exceptions. See:
+# https://github.com/digitalocean/pydo/blob/7b01498d99eb0d3a772366b642e5fab3d6fc6aa2/examples/poc_droplets_volumes_sshkeys.py#L6
+@common.load_lazy_modules(modules=_LAZY_MODULES)
+def exceptions():
+    """Azure exceptions."""
+    from azure.core import exceptions as azure_exceptions
+    return azure_exceptions
diff --git a/sky/backends/backend_utils.py b/sky/backends/backend_utils.py
index 0333cf49602..1de799e7cf8 100644
--- a/sky/backends/backend_utils.py
+++ b/sky/backends/backend_utils.py
@@ -1000,6 +1000,7 @@ def _add_auth_to_cluster_config(cloud: clouds.Cloud, cluster_config_file: str):
             clouds.Cudo,
             clouds.Paperspace,
             clouds.Azure,
+            clouds.DO,
         )):
         config = auth.configure_ssh_info(config)
     elif isinstance(cloud, clouds.GCP):
diff --git a/sky/backends/cloud_vm_ray_backend.py b/sky/backends/cloud_vm_ray_backend.py
index 9d94f469df3..156f43181b2 100644
--- a/sky/backends/cloud_vm_ray_backend.py
+++ b/sky/backends/cloud_vm_ray_backend.py
@@ -178,6 +178,7 @@ def _get_cluster_config_template(cloud):
         clouds.SCP: 'scp-ray.yml.j2',
         clouds.OCI: 'oci-ray.yml.j2',
         clouds.Paperspace: 'paperspace-ray.yml.j2',
+        clouds.DO: 'do-ray.yml.j2',
         clouds.RunPod: 'runpod-ray.yml.j2',
         clouds.Kubernetes: 'kubernetes-ray.yml.j2',
         clouds.Vsphere: 'vsphere-ray.yml.j2',
diff --git a/sky/clouds/__init__.py b/sky/clouds/__init__.py
index c4d46e93adf..24b805fe8bc 100644
--- a/sky/clouds/__init__.py
+++ b/sky/clouds/__init__.py
@@ -15,6 +15,7 @@
 from sky.clouds.aws import AWS
 from sky.clouds.azure import Azure
 from sky.clouds.cudo import Cudo
+from sky.clouds.do import DO
 from sky.clouds.fluidstack import Fluidstack
 from sky.clouds.gcp import GCP
 from sky.clouds.ibm import IBM
@@ -34,6 +35,7 @@
     'Cudo',
     'GCP',
     'Lambda',
+    'DO',
     'Paperspace',
     'SCP',
     'RunPod',
diff --git a/sky/clouds/do.py b/sky/clouds/do.py
new file mode 100644
index 00000000000..3a64ead3ad0
--- /dev/null
+++ b/sky/clouds/do.py
@@ -0,0 +1,303 @@
+"""Digital Ocean Cloud.
""" + +import json +import typing +from typing import Dict, Iterator, List, Optional, Tuple, Union + +from sky import clouds +from sky.adaptors import do +from sky.clouds import service_catalog +from sky.provision.do import utils as do_utils +from sky.utils import resources_utils + +if typing.TYPE_CHECKING: + from sky import resources as resources_lib + +_CREDENTIAL_FILE = 'config.yaml' + + +@clouds.CLOUD_REGISTRY.register(aliases=['digitalocean']) +class DO(clouds.Cloud): + """Digital Ocean Cloud""" + + _REPR = 'DO' + _CLOUD_UNSUPPORTED_FEATURES = { + clouds.CloudImplementationFeatures.CLONE_DISK_FROM_CLUSTER: + 'Migrating ' + f'disk is not supported in {_REPR}.', + clouds.CloudImplementationFeatures.SPOT_INSTANCE: + 'Spot instances are ' + f'not supported in {_REPR}.', + clouds.CloudImplementationFeatures.CUSTOM_DISK_TIER: + 'Custom disk tiers' + f' is not supported in {_REPR}.', + } + # DO maximum node name length defined as <= 255 + # https://docs.digitalocean.com/reference/api/api-reference/#operation/droplets_create + # 255 - 8 = 247 characters since + # our provisioner adds additional `-worker`. + _MAX_CLUSTER_NAME_LEN_LIMIT = 247 + _regions: List[clouds.Region] = [] + + # Using the latest SkyPilot provisioner API to provision and check status. + PROVISIONER_VERSION = clouds.ProvisionerVersion.SKYPILOT + STATUS_VERSION = clouds.StatusVersion.SKYPILOT + + @classmethod + def _unsupported_features_for_resources( + cls, resources: 'resources_lib.Resources' + ) -> Dict[clouds.CloudImplementationFeatures, str]: + """The features not supported based on the resources provided. + + This method is used by check_features_are_supported() to check if the + cloud implementation supports all the requested features. + + Returns: + A dict of {feature: reason} for the features not supported by the + cloud implementation. + """ + del resources # unused + return cls._CLOUD_UNSUPPORTED_FEATURES + + @classmethod + def _max_cluster_name_length(cls) -> Optional[int]: + return cls._MAX_CLUSTER_NAME_LEN_LIMIT + + @classmethod + def regions_with_offering( + cls, + instance_type: str, + accelerators: Optional[Dict[str, int]], + use_spot: bool, + region: Optional[str], + zone: Optional[str], + ) -> List[clouds.Region]: + assert zone is None, 'DO does not support zones.' 
+        del accelerators, zone  # unused
+        if use_spot:
+            return []
+        regions = service_catalog.get_region_zones_for_instance_type(
+            instance_type, use_spot, 'DO')
+        if region is not None:
+            regions = [r for r in regions if r.name == region]
+        return regions
+
+    @classmethod
+    def get_vcpus_mem_from_instance_type(
+        cls,
+        instance_type: str,
+    ) -> Tuple[Optional[float], Optional[float]]:
+        return service_catalog.get_vcpus_mem_from_instance_type(instance_type,
+                                                                clouds='DO')
+
+    @classmethod
+    def zones_provision_loop(
+        cls,
+        *,
+        region: str,
+        num_nodes: int,
+        instance_type: str,
+        accelerators: Optional[Dict[str, int]] = None,
+        use_spot: bool = False,
+    ) -> Iterator[None]:
+        del num_nodes  # unused
+        regions = cls.regions_with_offering(instance_type,
+                                            accelerators,
+                                            use_spot,
+                                            region=region,
+                                            zone=None)
+        for r in regions:
+            assert r.zones is None, r
+            yield r.zones
+
+    def instance_type_to_hourly_cost(
+        self,
+        instance_type: str,
+        use_spot: bool,
+        region: Optional[str] = None,
+        zone: Optional[str] = None,
+    ) -> float:
+        return service_catalog.get_hourly_cost(
+            instance_type,
+            use_spot=use_spot,
+            region=region,
+            zone=zone,
+            clouds='DO',
+        )
+
+    def accelerators_to_hourly_cost(
+        self,
+        accelerators: Dict[str, int],
+        use_spot: bool,
+        region: Optional[str] = None,
+        zone: Optional[str] = None,
+    ) -> float:
+        """Returns the hourly cost of the accelerators, in dollars/hour."""
+        # The accelerator price is included in the instance price.
+        del accelerators, use_spot, region, zone  # unused
+        return 0.0
+
+    def get_egress_cost(self, num_gigabytes: float) -> float:
+        return 0.0
+
+    def __repr__(self):
+        return self._REPR
+
+    @classmethod
+    def get_default_instance_type(
+        cls,
+        cpus: Optional[str] = None,
+        memory: Optional[str] = None,
+        disk_tier: Optional[resources_utils.DiskTier] = None,
+    ) -> Optional[str]:
+        """Returns the default instance type for DO."""
+        return service_catalog.get_default_instance_type(cpus=cpus,
+                                                         memory=memory,
+                                                         disk_tier=disk_tier,
+                                                         clouds='DO')
+
+    @classmethod
+    def get_accelerators_from_instance_type(
+        cls, instance_type: str) -> Optional[Dict[str, Union[int, float]]]:
+        return service_catalog.get_accelerators_from_instance_type(
+            instance_type, clouds='DO')
+
+    @classmethod
+    def get_zone_shell_cmd(cls) -> Optional[str]:
+        return None
+
+    def make_deploy_resources_variables(
+            self,
+            resources: 'resources_lib.Resources',
+            cluster_name: resources_utils.ClusterName,
+            region: 'clouds.Region',
+            zones: Optional[List['clouds.Zone']],
+            num_nodes: int,
+            dryrun: bool = False) -> Dict[str, Optional[str]]:
+        del zones, dryrun, cluster_name
+
+        r = resources
+        acc_dict = self.get_accelerators_from_instance_type(r.instance_type)
+        if acc_dict is not None:
+            custom_resources = json.dumps(acc_dict, separators=(',', ':'))
+        else:
+            custom_resources = None
+        image_id = None
+        if (resources.image_id is not None and
+                resources.extract_docker_image() is None):
+            if None in resources.image_id:
+                image_id = resources.image_id[None]
+            else:
+                assert region.name in resources.image_id
+                image_id = resources.image_id[region.name]
+        return {
+            'instance_type': resources.instance_type,
+            'custom_resources': custom_resources,
+            'region': region.name,
+            **({
+                'image_id': image_id
+            } if image_id else {})
+        }
+
+    def _get_feasible_launchable_resources(
+        self, resources: 'resources_lib.Resources'
+    ) -> resources_utils.FeasibleResources:
+        """Returns a list of feasible resources for the given resources."""
+        if resources.use_spot:
+            # TODO: Add hints to all return values in this method to help
+            # users understand why the resources are not launchable.
+            return resources_utils.FeasibleResources([], [], None)
+        if resources.instance_type is not None:
+            assert resources.is_launchable(), resources
+            resources = resources.copy(accelerators=None)
+            return resources_utils.FeasibleResources([resources], [], None)
+
+        def _make(instance_list):
+            resource_list = []
+            for instance_type in instance_list:
+                r = resources.copy(
+                    cloud=DO(),
+                    instance_type=instance_type,
+                    accelerators=None,
+                    cpus=None,
+                )
+                resource_list.append(r)
+            return resource_list
+
+        # Currently, handle a filter on accelerators only.
+        accelerators = resources.accelerators
+        if accelerators is None:
+            # Return a default instance type
+            default_instance_type = DO.get_default_instance_type(
+                cpus=resources.cpus,
+                memory=resources.memory,
+                disk_tier=resources.disk_tier)
+            return resources_utils.FeasibleResources(
+                _make([default_instance_type]), [], None)
+
+        assert len(accelerators) == 1, resources
+        acc, acc_count = list(accelerators.items())[0]
+        (instance_list, fuzzy_candidate_list) = (
+            service_catalog.get_instance_type_for_accelerator(
+                acc,
+                acc_count,
+                use_spot=resources.use_spot,
+                cpus=resources.cpus,
+                memory=resources.memory,
+                region=resources.region,
+                zone=resources.zone,
+                clouds='DO',
+            ))
+        if instance_list is None:
+            return resources_utils.FeasibleResources([], fuzzy_candidate_list,
+                                                     None)
+        return resources_utils.FeasibleResources(_make(instance_list),
+                                                 fuzzy_candidate_list, None)
+
+    @classmethod
+    def check_credentials(cls) -> Tuple[bool, Optional[str]]:
+        """Verify that the user has valid credentials for DO."""
+        try:
+            # Attempt an API request that lists the user's droplets.
+            do_utils.client().droplets.list()
+        except do.exceptions().HttpResponseError as err:
+            return False, str(err)
+        except do_utils.DigitalOceanError as err:
+            return False, str(err)
+
+        return True, None
+
+    def get_credential_file_mounts(self) -> Dict[str, str]:
+        try:
+            do_utils.client()
+            return {
+                f'~/.config/doctl/{_CREDENTIAL_FILE}': do_utils.CREDENTIALS_PATH
+            }
+        except do_utils.DigitalOceanError:
+            return {}
+
+    @classmethod
+    def get_current_user_identity(cls) -> Optional[List[str]]:
+        # NOTE: Used for very advanced SkyPilot functionality.
+        # Can be implemented later if desired.
+        return None
+
+    @classmethod
+    def get_image_size(cls, image_id: str, region: Optional[str]) -> float:
+        del region
+        try:
+            response = do_utils.client().images.get(image_id=image_id)
+            return response['image']['size_gigabytes']
+        except do.exceptions().HttpResponseError as err:
+            # `response` may be unbound if the request itself failed,
+            # so report the requested `image_id` instead.
+            raise do_utils.DigitalOceanError(
+                'HTTP error while retrieving size of '
+                f'image_id {image_id}: {err.error.message}') from err
+        except KeyError as err:
+            raise do_utils.DigitalOceanError(
+                f'No image_id `{image_id}` found') from err
+
+    def instance_type_exists(self, instance_type: str) -> bool:
+        return service_catalog.instance_type_exists(instance_type, 'DO')
+
+    def validate_region_zone(self, region: Optional[str], zone: Optional[str]):
+        return service_catalog.validate_region_zone(region, zone, clouds='DO')
diff --git a/sky/clouds/service_catalog/constants.py b/sky/clouds/service_catalog/constants.py
index a125258ac35..945152582f6 100644
--- a/sky/clouds/service_catalog/constants.py
+++ b/sky/clouds/service_catalog/constants.py
@@ -4,4 +4,4 @@
 CATALOG_DIR = '~/.sky/catalogs'
 ALL_CLOUDS = ('aws', 'azure', 'gcp', 'ibm', 'lambda', 'scp', 'oci',
               'kubernetes', 'runpod', 'vsphere', 'cudo', 'fluidstack',
-              'paperspace')
+              'paperspace', 'do')
diff --git a/sky/clouds/service_catalog/do_catalog.py b/sky/clouds/service_catalog/do_catalog.py
new file mode 100644
index 00000000000..40a53aa6bc4
--- /dev/null
+++ b/sky/clouds/service_catalog/do_catalog.py
@@ -0,0 +1,111 @@
+"""Digital Ocean service catalog.
+
+This module loads the service catalog file and can be used to
+query instance types and pricing information for Digital Ocean.
+"""
+
+import typing
+from typing import Dict, List, Optional, Tuple, Union
+
+from sky.clouds.service_catalog import common
+from sky.utils import ux_utils
+
+if typing.TYPE_CHECKING:
+    from sky.clouds import cloud
+
+_df = common.read_catalog('do/vms.csv')
+
+
+def instance_type_exists(instance_type: str) -> bool:
+    return common.instance_type_exists_impl(_df, instance_type)
+
+
+def validate_region_zone(
+        region: Optional[str],
+        zone: Optional[str]) -> Tuple[Optional[str], Optional[str]]:
+    if zone is not None:
+        with ux_utils.print_exception_no_traceback():
+            raise ValueError('DO does not support zones.')
+    return common.validate_region_zone_impl('DO', _df, region, zone)
+
+
+def get_hourly_cost(
+    instance_type: str,
+    use_spot: bool = False,
+    region: Optional[str] = None,
+    zone: Optional[str] = None,
+) -> float:
+    """Returns the cost, or the cheapest cost among all zones for spot."""
+    if zone is not None:
+        with ux_utils.print_exception_no_traceback():
+            raise ValueError('DO does not support zones.')
+    return common.get_hourly_cost_impl(_df, instance_type, use_spot, region,
+                                       zone)
+
+
+def get_vcpus_mem_from_instance_type(
+        instance_type: str,) -> Tuple[Optional[float], Optional[float]]:
+    return common.get_vcpus_mem_from_instance_type_impl(_df, instance_type)
+
+
+def get_default_instance_type(
+    cpus: Optional[str] = None,
+    memory: Optional[str] = None,
+    disk_tier: Optional[str] = None,
+) -> Optional[str]:
+    # NOTE: After expanding the catalog to multiple entries, you may
+    # want to specify a default instance type or family.
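+    # A hypothetical sketch of such a preference, e.g. restricting the
+    # default choice to GPU droplet families:
+    #   df = _df[_df['InstanceType'].str.startswith('gpu-')]
+    #   return common.get_instance_type_for_cpus_mem_impl(df, cpus, memory)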
+    del disk_tier  # unused
+    return common.get_instance_type_for_cpus_mem_impl(_df, cpus, memory)
+
+
+def get_accelerators_from_instance_type(
+        instance_type: str) -> Optional[Dict[str, Union[int, float]]]:
+    return common.get_accelerators_from_instance_type_impl(_df, instance_type)
+
+
+def get_instance_type_for_accelerator(
+    acc_name: str,
+    acc_count: int,
+    cpus: Optional[str] = None,
+    memory: Optional[str] = None,
+    use_spot: bool = False,
+    region: Optional[str] = None,
+    zone: Optional[str] = None,
+) -> Tuple[Optional[List[str]], List[str]]:
+    """Returns a list of instance types that have the given accelerator."""
+    if zone is not None:
+        with ux_utils.print_exception_no_traceback():
+            raise ValueError('DO does not support zones.')
+    return common.get_instance_type_for_accelerator_impl(
+        df=_df,
+        acc_name=acc_name,
+        acc_count=acc_count,
+        cpus=cpus,
+        memory=memory,
+        use_spot=use_spot,
+        region=region,
+        zone=zone,
+    )
+
+
+def get_region_zones_for_instance_type(instance_type: str,
+                                       use_spot: bool) -> List['cloud.Region']:
+    df = _df[_df['InstanceType'] == instance_type]
+    return common.get_region_zones(df, use_spot)
+
+
+def list_accelerators(
+    gpus_only: bool,
+    name_filter: Optional[str],
+    region_filter: Optional[str],
+    quantity_filter: Optional[int],
+    case_sensitive: bool = True,
+    all_regions: bool = False,
+    require_price: bool = True,
+) -> Dict[str, List[common.InstanceTypeInfo]]:
+    """Returns all instance types in DO offering GPUs."""
+    del require_price  # unused
+    return common.list_accelerators_impl('DO', _df, gpus_only, name_filter,
+                                         region_filter, quantity_filter,
+                                         case_sensitive, all_regions)
diff --git a/sky/provision/do/__init__.py b/sky/provision/do/__init__.py
new file mode 100644
index 00000000000..75502d3cb05
--- /dev/null
+++ b/sky/provision/do/__init__.py
@@ -0,0 +1,11 @@
+"""DO provisioner for SkyPilot."""
+
+from sky.provision.do.config import bootstrap_instances
+from sky.provision.do.instance import cleanup_ports
+from sky.provision.do.instance import get_cluster_info
+from sky.provision.do.instance import open_ports
+from sky.provision.do.instance import query_instances
+from sky.provision.do.instance import run_instances
+from sky.provision.do.instance import stop_instances
+from sky.provision.do.instance import terminate_instances
+from sky.provision.do.instance import wait_instances
diff --git a/sky/provision/do/config.py b/sky/provision/do/config.py
new file mode 100644
index 00000000000..0b10f7f7698
--- /dev/null
+++ b/sky/provision/do/config.py
@@ -0,0 +1,14 @@
+"""DO configuration bootstrapping."""
+
+from sky import sky_logging
+from sky.provision import common
+
+logger = sky_logging.init_logger(__name__)
+
+
+def bootstrap_instances(
+        region: str, cluster_name: str,
+        config: common.ProvisionConfig) -> common.ProvisionConfig:
+    """Bootstraps instances for the given cluster."""
+    del region, cluster_name
+    return config
diff --git a/sky/provision/do/constants.py b/sky/provision/do/constants.py
new file mode 100644
index 00000000000..0010646f873
--- /dev/null
+++ b/sky/provision/do/constants.py
@@ -0,0 +1,10 @@
+"""DO cloud constants."""
+
+POLL_INTERVAL = 5
+WAIT_DELETE_VOLUMES = 5
+
+GPU_IMAGES = {
+    'gpu-h100x1-80gb': 'gpu-h100x1-base',
+    'gpu-h100x8-640gb': 'gpu-h100x8-base',
+}
diff --git a/sky/provision/do/instance.py b/sky/provision/do/instance.py
new file mode 100644
index 00000000000..098ef6e0595
--- /dev/null
+++ b/sky/provision/do/instance.py
@@ -0,0 +1,287 @@
+"""DigitalOcean instance provisioning."""
+
+import time
+from typing import Any, Dict, List, Optional
+import uuid
+
+from sky import sky_logging
+from sky import status_lib
+from sky.provision import common
+from sky.provision.do import constants
+from sky.provision.do import utils
+
+# The maximum number of times to poll for the status of an operation
+MAX_POLLS = 60 // constants.POLL_INTERVAL
+# Stopping instances can take several minutes, so we increase the timeout
+MAX_POLLS_FOR_UP_OR_STOP = MAX_POLLS * 8
+
+logger = sky_logging.init_logger(__name__)
+
+
+def _get_head_instance(
+        instances: Dict[str, Dict[str, Any]]) -> Optional[Dict[str, Any]]:
+    for instance_name, instance_meta in instances.items():
+        if instance_name.endswith('-head'):
+            return instance_meta
+    return None
+
+
+def run_instances(region: str, cluster_name_on_cloud: str,
+                  config: common.ProvisionConfig) -> common.ProvisionRecord:
+    """Runs instances for the given cluster."""
+
+    pending_status = ['new']
+    newly_started_instances = utils.filter_instances(cluster_name_on_cloud,
+                                                     pending_status + ['off'])
+    while True:
+        instances = utils.filter_instances(cluster_name_on_cloud,
+                                           pending_status)
+        if not instances:
+            break
+        instance_statuses = [
+            instance['status'] for instance in instances.values()
+        ]
+        logger.info(f'Waiting for {len(instances)} instances to be ready: '
+                    f'{instance_statuses}')
+        time.sleep(constants.POLL_INTERVAL)
+
+    exist_instances = utils.filter_instances(cluster_name_on_cloud,
+                                             status_filters=pending_status +
+                                             ['active', 'off'])
+    if len(exist_instances) > config.count:
+        raise RuntimeError(
+            f'Cluster {cluster_name_on_cloud} already has '
+            f'{len(exist_instances)} nodes, but {config.count} are required.')
+
+    stopped_instances = utils.filter_instances(cluster_name_on_cloud,
+                                               status_filters=['off'])
+    for instance in stopped_instances.values():
+        utils.start_instance(instance)
+    for _ in range(MAX_POLLS_FOR_UP_OR_STOP):
+        instances = utils.filter_instances(cluster_name_on_cloud, ['off'])
+        if len(instances) == 0:
+            break
+        num_stopped_instances = len(stopped_instances)
+        num_restarted_instances = num_stopped_instances - len(instances)
+        logger.info(
+            f'Waiting for {num_restarted_instances}/{num_stopped_instances} '
+            'stopped instances to be restarted.')
+        time.sleep(constants.POLL_INTERVAL)
+    else:
+        msg = ('run_instances: Failed to restart all '
+               'instances, possibly due to a capacity issue.')
+        logger.warning(msg)
+        raise RuntimeError(msg)
+
+    exist_instances = utils.filter_instances(cluster_name_on_cloud,
+                                             status_filters=['active'])
+    head_instance = _get_head_instance(exist_instances)
+    to_start_count = config.count - len(exist_instances)
+    if to_start_count < 0:
+        raise RuntimeError(
+            f'Cluster {cluster_name_on_cloud} already has '
+            f'{len(exist_instances)} nodes, but {config.count} are required.')
+    if to_start_count == 0:
+        if head_instance is None:
+            head_instance = list(exist_instances.values())[0]
+            utils.rename_instance(
+                head_instance,
+                f'{cluster_name_on_cloud}-{uuid.uuid4().hex[:4]}-head')
+        assert head_instance is not None, '`head_instance` should not be None'
+        logger.info(f'Cluster {cluster_name_on_cloud} already has '
+                    f'{len(exist_instances)} nodes, no need to start more.')
+        return common.ProvisionRecord(
+            provider_name='do',
+            cluster_name=cluster_name_on_cloud,
+            region=region,
+            zone=None,
+            head_instance_id=head_instance['name'],
+            resumed_instance_ids=list(newly_started_instances.keys()),
+            created_instance_ids=[],
+        )
+
+    created_instances: List[Dict[str, Any]] = []
+    for _ in range(to_start_count):
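+        # The first droplet created becomes the head node; every subsequent
+        # droplet is a worker (the `-head`/`-worker` name suffix is what
+        # `_get_head_instance` keys on).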
+        instance_type = 'head' if head_instance is None else 'worker'
+        instance = utils.create_instance(
+            region=region,
+            cluster_name_on_cloud=cluster_name_on_cloud,
+            instance_type=instance_type,
+            config=config)
+        logger.info(f'Launched instance {instance["name"]}.')
+        created_instances.append(instance)
+        if head_instance is None:
+            head_instance = instance
+
+    # Wait for instances to be ready.
+    for _ in range(MAX_POLLS_FOR_UP_OR_STOP):
+        instances = utils.filter_instances(cluster_name_on_cloud,
+                                           status_filters=['active'])
+        logger.info('Waiting for instances to be ready: '
+                    f'({len(instances)}/{config.count}).')
+        if len(instances) == config.count:
+            break
+
+        time.sleep(constants.POLL_INTERVAL)
+    else:
+        # Failed to launch config.count instances after max retries
+        msg = 'run_instances: Failed to create the instances'
+        logger.warning(msg)
+        raise RuntimeError(msg)
+    assert head_instance is not None, 'head_instance should not be None'
+    return common.ProvisionRecord(
+        provider_name='do',
+        cluster_name=cluster_name_on_cloud,
+        region=region,
+        zone=None,
+        head_instance_id=head_instance['name'],
+        resumed_instance_ids=list(stopped_instances.keys()),
+        created_instance_ids=[
+            instance['name'] for instance in created_instances
+        ],
+    )
+
+
+def wait_instances(region: str, cluster_name_on_cloud: str,
+                   state: Optional[status_lib.ClusterStatus]) -> None:
+    del region, cluster_name_on_cloud, state  # unused
+    # We already wait for the instances to be ready in `run_instances`,
+    # so there is no need to wait again here.
+
+
+def stop_instances(
+    cluster_name_on_cloud: str,
+    provider_config: Optional[Dict[str, Any]] = None,
+    worker_only: bool = False,
+) -> None:
+    del provider_config  # unused
+    all_instances = utils.filter_instances(cluster_name_on_cloud,
+                                           status_filters=None)
+    num_instances = len(all_instances)
+
+    # Request a stop on all instances
+    for instance_name, instance_meta in all_instances.items():
+        if worker_only and instance_name.endswith('-head'):
+            num_instances -= 1
+            continue
+        utils.stop_instance(instance_meta)
+
+    # Wait for instances to stop
+    for _ in range(MAX_POLLS_FOR_UP_OR_STOP):
+        all_instances = utils.filter_instances(cluster_name_on_cloud, ['off'])
+        if len(all_instances) >= num_instances:
+            break
+        time.sleep(constants.POLL_INTERVAL)
+    else:
+        raise RuntimeError(f'Maximum number of polls: '
+                           f'{MAX_POLLS_FOR_UP_OR_STOP} reached. '
+                           f'Instances {all_instances} are still not in '
+                           'STOPPED status.')
+
+
+def terminate_instances(
+    cluster_name_on_cloud: str,
+    provider_config: Optional[Dict[str, Any]] = None,
+    worker_only: bool = False,
+) -> None:
+    """See sky/provision/__init__.py"""
+    del provider_config  # unused
+    instances = utils.filter_instances(cluster_name_on_cloud,
+                                       status_filters=None)
+    for instance_name, instance_meta in instances.items():
+        logger.debug(f'Terminating instance {instance_name}')
+        if worker_only and instance_name.endswith('-head'):
+            continue
+        utils.down_instance(instance_meta)
+
+    for _ in range(MAX_POLLS_FOR_UP_OR_STOP):
+        instances = utils.filter_instances(cluster_name_on_cloud,
+                                           status_filters=None)
+        if len(instances) == 0 or (len(instances) <= 1 and worker_only):
+            break
+        time.sleep(constants.POLL_INTERVAL)
+    else:
+        msg = 'Failed to delete all instances'
+        logger.warning(msg)
+        raise RuntimeError(msg)
+
+
+def get_cluster_info(
+    region: str,
+    cluster_name_on_cloud: str,
+    provider_config: Optional[Dict[str, Any]] = None,
+) -> common.ClusterInfo:
+    del region  # unused
+    running_instances = utils.filter_instances(cluster_name_on_cloud,
+                                               ['active'])
+    instances: Dict[str, List[common.InstanceInfo]] = {}
+    head_instance: Optional[str] = None
+    for instance_name, instance_meta in running_instances.items():
+        if instance_name.endswith('-head'):
+            head_instance = instance_name
+        # Reset per droplet so that a droplet without a public IPv4
+        # address cannot silently reuse the previous droplet's IP.
+        instance_ip = None
+        for net in instance_meta['networks']['v4']:
+            if net['type'] == 'public':
+                instance_ip = net['ip_address']
+                break
+        assert instance_ip is not None, (
+            f'no public IPv4 address found for {instance_name}')
+        instances[instance_name] = [
+            common.InstanceInfo(
+                instance_id=instance_meta['name'],
+                internal_ip=instance_ip,
+                external_ip=instance_ip,
+                ssh_port=22,
+                tags={},
+            )
+        ]
+
+    assert head_instance is not None, 'no head instance found'
+    return common.ClusterInfo(
+        instances=instances,
+        head_instance_id=head_instance,
+        provider_name='do',
+        provider_config=provider_config,
+    )
+
+
+def query_instances(
+    cluster_name_on_cloud: str,
+    provider_config: Optional[Dict[str, Any]] = None,
+    non_terminated_only: bool = True,
+) -> Dict[str, Optional[status_lib.ClusterStatus]]:
+    """See sky/provision/__init__.py"""
+    # Terminated instances are not retrieved by the DO API,
+    # making the `non_terminated_only` argument moot.
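+    # Consequently, every droplet returned by the API maps onto one of the
+    # non-terminated statuses in `status_map` below.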
+    del non_terminated_only
+    assert provider_config is not None, (cluster_name_on_cloud,
+                                         provider_config)
+    instances = utils.filter_instances(cluster_name_on_cloud,
+                                       status_filters=None)
+
+    status_map = {
+        'new': status_lib.ClusterStatus.INIT,
+        'archive': status_lib.ClusterStatus.INIT,
+        'active': status_lib.ClusterStatus.UP,
+        'off': status_lib.ClusterStatus.STOPPED,
+    }
+    statuses: Dict[str, Optional[status_lib.ClusterStatus]] = {}
+    for instance_meta in instances.values():
+        status = status_map[instance_meta['status']]
+        statuses[instance_meta['name']] = status
+    return statuses
+
+
+def open_ports(
+    cluster_name_on_cloud: str,
+    ports: List[str],
+    provider_config: Optional[Dict[str, Any]] = None,
+) -> None:
+    """See sky/provision/__init__.py"""
+    logger.debug(
+        f'Skip opening ports {ports} for DigitalOcean instances, as all '
+        'ports are open by default.')
+    del cluster_name_on_cloud, provider_config, ports
+
+
+def cleanup_ports(
+    cluster_name_on_cloud: str,
+    ports: List[str],
+    provider_config: Optional[Dict[str, Any]] = None,
+) -> None:
+    del cluster_name_on_cloud, provider_config, ports
diff --git a/sky/provision/do/utils.py b/sky/provision/do/utils.py
new file mode 100644
index 00000000000..ebc1b4ac389
--- /dev/null
+++ b/sky/provision/do/utils.py
@@ -0,0 +1,306 @@
+"""DigitalOcean API client wrapper for SkyPilot.
+
+Example usage of the `pydo` client library was mostly taken from here:
+https://github.com/digitalocean/pydo/blob/main/examples/poc_droplets_volumes_sshkeys.py
+"""
+
+import copy
+import os
+import typing
+from typing import Any, Dict, List, Optional
+import urllib
+import uuid
+
+from sky import sky_logging
+from sky.adaptors import do
+from sky.provision import common
+from sky.provision import constants as provision_constants
+from sky.provision.do import constants
+from sky.utils import common_utils
+
+if typing.TYPE_CHECKING:
+    from sky import resources
+    from sky import status_lib
+
+logger = sky_logging.init_logger(__name__)
+
+POSSIBLE_CREDENTIALS_PATHS = [
+    os.path.expanduser(
+        '~/Library/Application Support/doctl/config.yaml'),  # macOS
+    os.path.expanduser(
+        os.path.join(os.getenv('XDG_CONFIG_HOME', '~/.config/'),
+                     'doctl/config.yaml')),  # Linux
+]
+INITIAL_BACKOFF_SECONDS = 10
+MAX_BACKOFF_FACTOR = 10
+MAX_ATTEMPTS = 6
+SSH_KEY_NAME_ON_DO = f'sky-key-{common_utils.get_user_hash()}'
+
+CREDENTIALS_PATH = '~/.config/doctl/config.yaml'
+_client = None
+_ssh_key_id = None
+
+
+class DigitalOceanError(Exception):
+    pass
+
+
+def _init_client():
+    global _client, CREDENTIALS_PATH
+    assert _client is None
+    CREDENTIALS_PATH = None
+    credentials_found = 0
+    for path in POSSIBLE_CREDENTIALS_PATHS:
+        if os.path.exists(path):
+            CREDENTIALS_PATH = path
+            credentials_found += 1
+            logger.debug(f'Digital Ocean credential path found at {path}')
+    if credentials_found > 1:
+        logger.debug('more than 1 credential file found')
+    if CREDENTIALS_PATH is None:
+        raise DigitalOceanError(
+            'no credentials file found from '
+            f'the following paths {POSSIBLE_CREDENTIALS_PATHS}')
+
+    # Attempt the default context first.
+    credentials = common_utils.read_yaml(CREDENTIALS_PATH)
+    default_token = credentials.get('access-token', None)
+    if default_token is not None:
+        try:
+            test_client = do.pydo.Client(token=default_token)
+            test_client.droplets.list()
+            logger.debug('trying `default` context')
+            _client = test_client
+            return _client
+        except do.exceptions().HttpResponseError:
+            pass
+
+    auth_contexts = credentials.get('auth-contexts', None)
+    if auth_contexts is not None:
+        for context, api_token in auth_contexts.items():
+            try:
+                test_client = do.pydo.Client(token=api_token)
+                test_client.droplets.list()
+                logger.debug(f'using {context} context')
+                _client = test_client
+                break
+            except do.exceptions().HttpResponseError:
+                continue
+        else:
+            raise DigitalOceanError(
+                'no valid API tokens found; try '
+                'setting a new API token with `doctl auth init`')
+    return _client
+
+
+def client():
+    global _client
+    if _client is None:
+        _client = _init_client()
+    return _client
+
+
+def ssh_key_id(public_key: str):
+    global _ssh_key_id
+    if _ssh_key_id is None:
+        page = 1
+        paginated = True
+        while paginated:
+            try:
+                resp = client().ssh_keys.list(per_page=50, page=page)
+                for ssh_key in resp['ssh_keys']:
+                    if ssh_key['public_key'] == public_key:
+                        _ssh_key_id = ssh_key
+                        return _ssh_key_id
+            except do.exceptions().HttpResponseError as err:
+                raise DigitalOceanError(
+                    f'Error: {err.status_code} {err.reason}: '
+                    f'{err.error.message}') from err
+
+            pages = resp['links']
+            if 'pages' in pages and 'next' in pages['pages']:
+                pages = pages['pages']
+                parsed_url = urllib.parse.urlparse(pages['next'])
+                page = int(urllib.parse.parse_qs(parsed_url.query)['page'][0])
+            else:
+                paginated = False
+
+        request = {
+            'public_key': public_key,
+            'name': SSH_KEY_NAME_ON_DO,
+        }
+        _ssh_key_id = client().ssh_keys.create(body=request)['ssh_key']
+    return _ssh_key_id
+
+
+def _create_volume(request: Dict[str, Any]) -> Dict[str, Any]:
+    try:
+        resp = client().volumes.create(body=request)
+        volume = resp['volume']
+    except do.exceptions().HttpResponseError as err:
+        raise DigitalOceanError(
+            f'Error: {err.status_code} {err.reason}: {err.error.message}'
+        ) from err
+    else:
+        return volume
+
+
+def _create_droplet(request: Dict[str, Any]) -> Dict[str, Any]:
+    try:
+        resp = client().droplets.create(body=request)
+        droplet_id = resp['droplet']['id']
+
+        get_resp = client().droplets.get(droplet_id)
+        droplet = get_resp['droplet']
+    except do.exceptions().HttpResponseError as err:
+        raise DigitalOceanError(
+            f'Error: {err.status_code} {err.reason}: {err.error.message}'
+        ) from err
+    return droplet
+
+
+def create_instance(region: str, cluster_name_on_cloud: str,
+                    instance_type: str,
+                    config: common.ProvisionConfig) -> Dict[str, Any]:
+    """Creates an instance and mounts the requested block storage.
+
+    Args:
+        region: instance region.
+        cluster_name_on_cloud: cluster name, used to tag and name the droplet.
+        instance_type: 'head' or 'worker'; determines the name suffix.
+        config: provisioner configuration.
+
+    Returns:
+        Dict[str, Any]: instance metadata
+    """
+    # Sort tags by key to support deterministic unit test stubbing.
+    tags = dict(sorted(copy.deepcopy(config.tags).items()))
+    tags = {
+        'Name': cluster_name_on_cloud,
+        provision_constants.TAG_RAY_CLUSTER_NAME: cluster_name_on_cloud,
+        provision_constants.TAG_SKYPILOT_CLUSTER_NAME: cluster_name_on_cloud,
+        **tags
+    }
+    tags = [f'{key}:{value}' for key, value in tags.items()]
+    default_image = constants.GPU_IMAGES.get(
+        config.node_config['InstanceType'],
+        'gpu-h100x1-base',
+    )
+    image_id = config.node_config['ImageId']
+    image_id = image_id if image_id is not None else default_image
+    instance_name = (f'{cluster_name_on_cloud}-'
+                     f'{uuid.uuid4().hex[:4]}-{instance_type}')
+    instance_request = {
+        'name': instance_name,
+        'region': region,
+        'size': config.node_config['InstanceType'],
+        'image': image_id,
+        'ssh_keys': [
+            ssh_key_id(
+                config.authentication_config['ssh_public_key'])['fingerprint']
+        ],
+        'tags': tags,
+    }
+    instance = _create_droplet(instance_request)
+
+    volume_request = {
'size_gigabytes': config.node_config['DiskSize'], + 'name': instance_name, + 'region': region, + 'filesystem_type': 'ext4', + 'tags': tags + } + volume = _create_volume(volume_request) + + attach_request = {'type': 'attach', 'droplet_id': instance['id']} + try: + client().volume_actions.post_by_id(volume['id'], attach_request) + except do.exceptions().HttpResponseError as err: + raise DigitalOceanError( + f'Error: {err.status_code} {err.reason}: {err.error.message}' + ) from err + logger.debug(f'{instance_name} created') + return instance + + +def start_instance(instance: Dict[str, Any]): + try: + client().droplet_actions.post(droplet_id=instance['id'], + body={'type': 'power_on'}) + except do.exceptions().HttpResponseError as err: + raise DigitalOceanError( + f'Error: {err.status_code} {err.reason}: {err.error.message}' + ) from err + + +def stop_instance(instance: Dict[str, Any]): + try: + client().droplet_actions.post( + droplet_id=instance['id'], + body={'type': 'shutdown'}, + ) + except do.exceptions().HttpResponseError as err: + raise DigitalOceanError( + f'Error: {err.status_code} {err.reason}: {err.error.message}' + ) from err + + +def down_instance(instance: Dict[str, Any]): + # We use dangerous destroy to atomically delete + # block storage and instance for autodown + try: + client().droplets.destroy_with_associated_resources_dangerous( + droplet_id=instance['id'], x_dangerous=True) + except do.exceptions().HttpResponseError as err: + if 'a destroy is already in progress' in err.error.message: + return + raise DigitalOceanError( + f'Error: {err.status_code} {err.reason}: {err.error.message}' + ) from err + + +def rename_instance(instance: Dict[str, Any], new_name: str): + try: + client().droplet_actions.rename(droplet=instance['id'], + body={ + 'type': 'rename', + 'name': new_name + }) + except do.exceptions().HttpResponseError as err: + raise DigitalOceanError( + f'Error: {err.status_code} {err.reason}: {err.error.message}' + ) from err + + +def filter_instances( + cluster_name_on_cloud: str, + status_filters: Optional[List[str]] = None) -> Dict[str, Any]: + """Returns Dict mapping instance name + to instance metadata filtered by status + """ + + filtered_instances: Dict[str, Any] = {} + page = 1 + paginated = True + while paginated: + try: + resp = client().droplets.list( + tag_name=f'{provision_constants.TAG_SKYPILOT_CLUSTER_NAME}:' + f'{cluster_name_on_cloud}', + per_page=50, + page=page) + for instance in resp['droplets']: + if status_filters is None or instance[ + 'status'] in status_filters: + filtered_instances[instance['name']] = instance + except do.exceptions().HttpResponseError as err: + raise DigitalOceanError( + f'Error: {err.status_code} {err.reason}: {err.error.message}' + ) from err + + pages = resp['links'] + if 'pages' in pages and 'next' in pages['pages']: + pages = pages['pages'] + parsed_url = urllib.parse.urlparse(pages['next']) + page = int(urllib.parse.parse_qs(parsed_url.query)['page'][0]) + else: + paginated = False + return filtered_instances diff --git a/sky/provision/docker_utils.py b/sky/provision/docker_utils.py index c55508ab41a..848c7a06983 100644 --- a/sky/provision/docker_utils.py +++ b/sky/provision/docker_utils.py @@ -338,14 +338,20 @@ def _check_docker_installed(self): no_exist = 'NoExist' # SkyPilot: Add the current user to the docker group first (if needed), # before checking if docker is installed to avoid permission issues. 
-        cleaned_output = self._run(
-            'id -nG $USER | grep -qw docker || '
-            'sudo usermod -aG docker $USER > /dev/null 2>&1;'
-            f'command -v {self.docker_cmd} || echo {no_exist!r}')
-        if no_exist in cleaned_output or 'docker' not in cleaned_output:
-            logger.error(
-                f'{self.docker_cmd.capitalize()} not installed. Please use an '
-                f'image with {self.docker_cmd.capitalize()} installed.')
+        docker_cmd = ('id -nG $USER | grep -qw docker || '
+                      'sudo usermod -aG docker $USER > /dev/null 2>&1;'
+                      f'command -v {self.docker_cmd} || echo {no_exist!r}')
+        cleaned_output = self._run(docker_cmd)
+        timeout = 60 * 10  # 10 minute timeout
+        start = time.time()
+        # Keep retrying in case Docker is still being installed on the image
+        # (e.g., by first-boot setup); give up after the timeout.
+        while no_exist in cleaned_output or 'docker' not in cleaned_output:
+            if time.time() - start > timeout:
+                logger.error(
+                    f'{self.docker_cmd.capitalize()} not installed. Please use '
+                    f'an image with {self.docker_cmd.capitalize()} installed.')
+                return
+            time.sleep(5)
+            cleaned_output = self._run(docker_cmd)
 
     def _check_container_status(self):
         if self.initialized:
diff --git a/sky/provision/provisioner.py b/sky/provision/provisioner.py
index cc2ca73e1dc..8f2142df273 100644
--- a/sky/provision/provisioner.py
+++ b/sky/provision/provisioner.py
@@ -415,7 +415,6 @@ def _post_provision_setup(
         f'{json.dumps(dataclasses.asdict(provision_record), indent=2)}\n'
         'Cluster info:\n'
         f'{json.dumps(dataclasses.asdict(cluster_info), indent=2)}')
-
     head_instance = cluster_info.get_head_instance()
     if head_instance is None:
         e = RuntimeError(f'Provision failed for cluster {cluster_name!r}. '
diff --git a/sky/setup_files/dependencies.py b/sky/setup_files/dependencies.py
index 18d2f5cdc08..16590a9fd0d 100644
--- a/sky/setup_files/dependencies.py
+++ b/sky/setup_files/dependencies.py
@@ -127,6 +127,7 @@
     'fluidstack': [],  # No dependencies needed for fluidstack
     'cudo': ['cudo-compute>=0.1.10'],
     'paperspace': [],  # No dependencies needed for paperspace
+    'do': ['pydo>=0.3.0', 'azure-core>=1.24.0', 'azure-common'],
     'vsphere': [
         'pyvmomi==8.0.1.0.2',
         # vsphere-automation-sdk is also required, but it does not have
diff --git a/sky/templates/do-ray.yml.j2 b/sky/templates/do-ray.yml.j2
new file mode 100644
index 00000000000..ea9db59398e
--- /dev/null
+++ b/sky/templates/do-ray.yml.j2
@@ -0,0 +1,98 @@
+cluster_name: {{cluster_name_on_cloud}}
+
+# The maximum number of worker nodes to launch in addition to the head node.
+max_workers: {{num_nodes - 1}}
+upscaling_speed: {{num_nodes - 1}}
+idle_timeout_minutes: 60
+
+{%- if docker_image is not none %}
+docker:
+  image: {{docker_image}}
+  container_name: {{docker_container_name}}
+  run_options:
+    - --ulimit nofile=1048576:1048576
+    {%- for run_option in docker_run_options %}
+    - {{run_option}}
+    {%- endfor %}
+  {%- if docker_login_config is not none %}
+  docker_login_config:
+    username: |-
+      {{docker_login_config.username}}
+    password: |-
+      {{docker_login_config.password}}
+    server: |-
+      {{docker_login_config.server}}
+  {%- endif %}
+{%- endif %}
+
+provider:
+  type: external
+  module: sky.provision.do
+  region: "{{region}}"
+
+auth:
+  ssh_user: root
+  ssh_private_key: {{ssh_private_key}}
+  ssh_public_key: |-
+    skypilot:ssh_public_key_content
+
+available_node_types:
+  ray_head_default:
+    resources: {}
+    node_config:
+      InstanceType: {{instance_type}}
+      DiskSize: {{disk_size}}
+      {%- if image_id is not none %}
+      ImageId: {{image_id}}
+      {%- endif %}
+
+head_node_type: ray_head_default
+
+# Format: `REMOTE_PATH : LOCAL_PATH`
+file_mounts: {
+  "{{sky_ray_yaml_remote_path}}": "{{sky_ray_yaml_local_path}}",
+  "{{sky_remote_path}}/{{sky_wheel_hash}}": "{{sky_local_path}}",
+{%- for remote_path, local_path in credentials.items() %}
+  "{{remote_path}}": "{{local_path}}",
+{%- endfor %}
+}
+
+rsync_exclude: []
+
+initialization_commands: []
+
+# List of shell commands to run to set up nodes.
+# NOTE: these are very performance-sensitive. Each new item opens/closes an SSH
+# connection, which is expensive. Try your best to co-locate commands into fewer
+# items!
+#
+# Increment the following for catching performance bugs easier:
+# current num items (num SSH connections): 1
+setup_commands:
+  # Disable `unattended-upgrades` to prevent apt-get from hanging. It should run at the very beginning, before other processes start, to avoid being blocked. (This is a temporary fix.)
+  # Create ~/.ssh/config file in case the file does not exist in the image.
+  # Line 'rm ..': there is another installation of pip.
+  # Line 'sudo bash ..': set the ulimit as suggested by ray docs for performance. https://docs.ray.io/en/latest/cluster/vms/user-guides/large-cluster-best-practices.html#system-configuration
+  # Line 'sudo grep ..': set the number of threads per process to unlimited to avoid `ray job submit` getting stuck when the number of running ray jobs increases.
+  # Line 'mkdir -p ..': disable host key check
+  # Line 'python3 -c ..': patch the buggy ray files and enable `-o allow_other` option for `goofys`
+  - {%- for initial_setup_command in initial_setup_commands %}
+    {{ initial_setup_command }}
+    {%- endfor %}
+    sudo systemctl stop unattended-upgrades || true;
+    sudo systemctl disable unattended-upgrades || true;
+    sudo sed -i 's/Unattended-Upgrade "1"/Unattended-Upgrade "0"/g' /etc/apt/apt.conf.d/20auto-upgrades || true;
+    sudo kill -9 `sudo lsof /var/lib/dpkg/lock-frontend | awk '{print $2}' | tail -n 1` || true;
+    sudo pkill -9 apt-get;
+    sudo pkill -9 dpkg;
+    sudo dpkg --configure -a;
+    mkdir -p ~/.ssh; touch ~/.ssh/config;
+    {{ conda_installation_commands }}
+    {{ ray_skypilot_installation_commands }}
+    sudo bash -c 'rm -rf /etc/security/limits.d; echo "* soft nofile 1048576" >> /etc/security/limits.conf; echo "* hard nofile 1048576" >> /etc/security/limits.conf';
+    sudo grep -e '^DefaultTasksMax' /etc/systemd/system.conf || (sudo bash -c 'echo "DefaultTasksMax=infinity" >> /etc/systemd/system.conf'); sudo systemctl set-property user-$(id -u $(whoami)).slice TasksMax=infinity; sudo systemctl daemon-reload;
+    mkdir -p ~/.ssh; (grep -Pzo -q "Host \*\n  StrictHostKeyChecking no" ~/.ssh/config) || printf "Host *\n  StrictHostKeyChecking no\n" >> ~/.ssh/config;
+    [ -f /etc/fuse.conf ] && sudo sed -i 's/#user_allow_other/user_allow_other/g' /etc/fuse.conf || (sudo sh -c 'echo "user_allow_other" > /etc/fuse.conf');
+
+# Commands to start ray clusters are now placed in `sky.provision.instance_setup`.
+# We do not need to list them here anymore.
diff --git a/tests/conftest.py b/tests/conftest.py
index ee5caf062b9..59825385a74 100644
--- a/tests/conftest.py
+++ b/tests/conftest.py
@@ -21,7 +21,7 @@
 # To only run tests for managed jobs (without generic tests), use
 # --managed-jobs.
all_clouds_in_smoke_tests = [ - 'aws', 'gcp', 'azure', 'lambda', 'cloudflare', 'ibm', 'scp', 'oci', + 'aws', 'gcp', 'azure', 'lambda', 'cloudflare', 'ibm', 'scp', 'oci', 'do', 'kubernetes', 'vsphere', 'cudo', 'fluidstack', 'paperspace', 'runpod' ] default_clouds_to_run = ['aws', 'azure'] @@ -43,6 +43,7 @@ 'fluidstack': 'fluidstack', 'cudo': 'cudo', 'paperspace': 'paperspace', + 'do': 'do', 'runpod': 'runpod' } diff --git a/tests/skyserve/update/bump_version_after.yaml b/tests/skyserve/update/bump_version_after.yaml index 6e845f54b9e..0f2c6925bc6 100644 --- a/tests/skyserve/update/bump_version_after.yaml +++ b/tests/skyserve/update/bump_version_after.yaml @@ -16,7 +16,7 @@ service: replicas: 3 resources: - ports: 8081 + ports: 8080 cpus: 2+ setup: | diff --git a/tests/skyserve/update/bump_version_before.yaml b/tests/skyserve/update/bump_version_before.yaml index c9fd957e41a..de922b66434 100644 --- a/tests/skyserve/update/bump_version_before.yaml +++ b/tests/skyserve/update/bump_version_before.yaml @@ -16,7 +16,7 @@ service: replicas: 2 resources: - ports: 8081 + ports: 8080 cpus: 2+ setup: | diff --git a/tests/smoke_tests/test_basic.py b/tests/smoke_tests/test_basic.py index e8dffe53846..aead10fa733 100644 --- a/tests/smoke_tests/test_basic.py +++ b/tests/smoke_tests/test_basic.py @@ -422,6 +422,7 @@ def test_load_dump_yaml_config_equivalent(self): # ---------- Testing Multiple Accelerators ---------- @pytest.mark.no_fluidstack # Fluidstack does not support K80 gpus for now @pytest.mark.no_paperspace # Paperspace does not support K80 gpus +@pytest.mark.no_do # DO does not support K80s def test_multiple_accelerators_ordered(): name = smoke_tests_utils.get_cluster_name() test = smoke_tests_utils.Test( @@ -438,6 +439,7 @@ def test_multiple_accelerators_ordered(): @pytest.mark.no_fluidstack # Fluidstack has low availability for T4 GPUs @pytest.mark.no_paperspace # Paperspace does not support T4 GPUs +@pytest.mark.no_do # DO does not have multiple accelerators def test_multiple_accelerators_ordered_with_default(): name = smoke_tests_utils.get_cluster_name() test = smoke_tests_utils.Test( @@ -454,6 +456,7 @@ def test_multiple_accelerators_ordered_with_default(): @pytest.mark.no_fluidstack # Fluidstack has low availability for T4 GPUs @pytest.mark.no_paperspace # Paperspace does not support T4 GPUs +@pytest.mark.no_do # DO does not have multiple accelerators def test_multiple_accelerators_unordered(): name = smoke_tests_utils.get_cluster_name() test = smoke_tests_utils.Test( @@ -469,6 +472,7 @@ def test_multiple_accelerators_unordered(): @pytest.mark.no_fluidstack # Fluidstack has low availability for T4 GPUs @pytest.mark.no_paperspace # Paperspace does not support T4 GPUs +@pytest.mark.no_do # DO does not support multiple accelerators def test_multiple_accelerators_unordered_with_default(): name = smoke_tests_utils.get_cluster_name() test = smoke_tests_utils.Test( @@ -502,6 +506,7 @@ def test_multiple_resources(): @pytest.mark.no_paperspace # Requires other clouds to be enabled @pytest.mark.no_kubernetes @pytest.mark.aws # SkyBenchmark requires S3 access +@pytest.mark.no_do # requires other clouds to be enabled def test_sky_bench(generic_cloud: str): name = smoke_tests_utils.get_cluster_name() test = smoke_tests_utils.Test( diff --git a/tests/smoke_tests/test_cluster_job.py b/tests/smoke_tests/test_cluster_job.py index 18b82c649e7..dabea9c8752 100644 --- a/tests/smoke_tests/test_cluster_job.py +++ b/tests/smoke_tests/test_cluster_job.py @@ -22,6 +22,7 @@ import pathlib import tempfile import 
textwrap +from typing import Dict import jinja2 import pytest @@ -43,15 +44,17 @@ @pytest.mark.no_scp # SCP does not have T4 gpus. Run test_scp_job_queue instead @pytest.mark.no_paperspace # Paperspace does not have T4 gpus. @pytest.mark.no_oci # OCI does not have T4 gpus -def test_job_queue(generic_cloud: str): +@pytest.mark.parametrize('accelerator', [{'do': 'H100'}]) +def test_job_queue(generic_cloud: str, accelerator: Dict[str, str]): + accelerator = accelerator.get(generic_cloud, 'T4') name = smoke_tests_utils.get_cluster_name() test = smoke_tests_utils.Test( 'job_queue', [ - f'sky launch -y -c {name} --cloud {generic_cloud} examples/job_queue/cluster.yaml', - f'sky exec {name} -n {name}-1 -d examples/job_queue/job.yaml', - f'sky exec {name} -n {name}-2 -d examples/job_queue/job.yaml', - f'sky exec {name} -n {name}-3 -d examples/job_queue/job.yaml', + f'sky launch -y -c {name} --cloud {generic_cloud} --gpus {accelerator} examples/job_queue/cluster.yaml', + f'sky exec {name} -n {name}-1 -d --gpus {accelerator}:0.5 examples/job_queue/job.yaml', + f'sky exec {name} -n {name}-2 -d --gpus {accelerator}:0.5 examples/job_queue/job.yaml', + f'sky exec {name} -n {name}-3 -d --gpus {accelerator}:0.5 examples/job_queue/job.yaml', f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep {name}-1 | grep RUNNING', f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep {name}-2 | grep RUNNING', f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep {name}-3 | grep PENDING', @@ -59,8 +62,8 @@ def test_job_queue(generic_cloud: str): 'sleep 5', f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep {name}-3 | grep RUNNING', f'sky cancel -y {name} 3', - f'sky exec {name} --gpus T4:0.2 "[[ \$SKYPILOT_NUM_GPUS_PER_NODE -eq 1 ]] || exit 1"', - f'sky exec {name} --gpus T4:1 "[[ \$SKYPILOT_NUM_GPUS_PER_NODE -eq 1 ]] || exit 1"', + f'sky exec {name} --gpus {accelerator}:0.2 "[[ \$SKYPILOT_NUM_GPUS_PER_NODE -eq 1 ]] || exit 1"', + f'sky exec {name} --gpus {accelerator}:1 "[[ \$SKYPILOT_NUM_GPUS_PER_NODE -eq 1 ]] || exit 1"', f'sky logs {name} 4 --status', f'sky logs {name} 5 --status', ], @@ -77,6 +80,7 @@ def test_job_queue(generic_cloud: str): @pytest.mark.no_scp # Doesn't support SCP for now @pytest.mark.no_oci # Doesn't support OCI for now @pytest.mark.no_kubernetes # Doesn't support Kubernetes for now +@pytest.mark.parametrize('accelerator', [{'do': 'H100'}]) @pytest.mark.parametrize( 'image_id', [ @@ -93,17 +97,19 @@ def test_job_queue(generic_cloud: str): # 2. python>=3.12 works with SkyPilot runtime. 
'docker:winglian/axolotl:main-latest' ]) -def test_job_queue_with_docker(generic_cloud: str, image_id: str): +def test_job_queue_with_docker(generic_cloud: str, image_id: str, + accelerator: Dict[str, str]): + accelerator = accelerator.get(generic_cloud, 'T4') name = smoke_tests_utils.get_cluster_name() + image_id[len('docker:'):][:4] total_timeout_minutes = 40 if generic_cloud == 'azure' else 15 time_to_sleep = 300 if generic_cloud == 'azure' else 180 test = smoke_tests_utils.Test( 'job_queue_with_docker', [ - f'sky launch -y -c {name} --cloud {generic_cloud} --image-id {image_id} examples/job_queue/cluster_docker.yaml', - f'sky exec {name} -n {name}-1 -d --image-id {image_id} --env TIME_TO_SLEEP={time_to_sleep} examples/job_queue/job_docker.yaml', - f'sky exec {name} -n {name}-2 -d --image-id {image_id} --env TIME_TO_SLEEP={time_to_sleep} examples/job_queue/job_docker.yaml', - f'sky exec {name} -n {name}-3 -d --image-id {image_id} --env TIME_TO_SLEEP={time_to_sleep} examples/job_queue/job_docker.yaml', + f'sky launch -y -c {name} --cloud {generic_cloud} --gpus {accelerator} --image-id {image_id} examples/job_queue/cluster_docker.yaml', + f'sky exec {name} -n {name}-1 -d --gpus {accelerator}:0.5 --image-id {image_id} --env TIME_TO_SLEEP={time_to_sleep} examples/job_queue/job_docker.yaml', + f'sky exec {name} -n {name}-2 -d --gpus {accelerator}:0.5 --image-id {image_id} --env TIME_TO_SLEEP={time_to_sleep} examples/job_queue/job_docker.yaml', + f'sky exec {name} -n {name}-3 -d --gpus {accelerator}:0.5 --image-id {image_id} --env TIME_TO_SLEEP={time_to_sleep} examples/job_queue/job_docker.yaml', f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep {name}-1 | grep RUNNING', f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep {name}-2 | grep RUNNING', f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep {name}-3 | grep PENDING', @@ -112,7 +118,7 @@ def test_job_queue_with_docker(generic_cloud: str, image_id: str): f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep {name}-3 | grep RUNNING', f'sky cancel -y {name} 3', # Make sure the GPU is still visible to the container. - f'sky exec {name} --image-id {image_id} nvidia-smi | grep "Tesla T4"', + f'sky exec {name} --image-id {image_id} nvidia-smi | grep -i "{accelerator}"', f'sky logs {name} 4 --status', f'sky stop -y {name}', # Make sure the job status preserve after stop and start the @@ -122,12 +128,12 @@ def test_job_queue_with_docker(generic_cloud: str, image_id: str): f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep {name}-1 | grep FAILED', f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep {name}-2 | grep CANCELLED', f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep {name}-3 | grep CANCELLED', - f'sky exec {name} --gpus T4:0.2 "[[ \$SKYPILOT_NUM_GPUS_PER_NODE -eq 1 ]] || exit 1"', - f'sky exec {name} --gpus T4:1 "[[ \$SKYPILOT_NUM_GPUS_PER_NODE -eq 1 ]] || exit 1"', + f'sky exec {name} --gpus {accelerator}:0.2 "[[ \$SKYPILOT_NUM_GPUS_PER_NODE -eq 1 ]] || exit 1"', + f'sky exec {name} --gpus {accelerator}:1 "[[ \$SKYPILOT_NUM_GPUS_PER_NODE -eq 1 ]] || exit 1"', f'sky logs {name} 5 --status', f'sky logs {name} 6 --status', # Make sure it is still visible after an stop & start cycle. 
- f'sky exec {name} --image-id {image_id} nvidia-smi | grep "Tesla T4"', + f'sky exec {name} --image-id {image_id} nvidia-smi | grep -i "{accelerator}"', f'sky logs {name} 7 --status' ], f'sky down -y {name}', @@ -214,16 +220,18 @@ def test_scp_job_queue(): @pytest.mark.no_scp # SCP does not support num_nodes > 1 yet @pytest.mark.no_oci # OCI Cloud does not have T4 gpus. @pytest.mark.no_kubernetes # Kubernetes not support num_nodes > 1 yet -def test_job_queue_multinode(generic_cloud: str): +@pytest.mark.parametrize('accelerator', [{'do': 'H100'}]) +def test_job_queue_multinode(generic_cloud: str, accelerator: Dict[str, str]): + accelerator = accelerator.get(generic_cloud, 'T4') name = smoke_tests_utils.get_cluster_name() total_timeout_minutes = 30 if generic_cloud == 'azure' else 15 test = smoke_tests_utils.Test( 'job_queue_multinode', [ - f'sky launch -y -c {name} --cloud {generic_cloud} examples/job_queue/cluster_multinode.yaml', - f'sky exec {name} -n {name}-1 -d examples/job_queue/job_multinode.yaml', - f'sky exec {name} -n {name}-2 -d examples/job_queue/job_multinode.yaml', - f'sky launch -c {name} -n {name}-3 --detach-setup -d examples/job_queue/job_multinode.yaml', + f'sky launch -y -c {name} --cloud {generic_cloud} --gpus {accelerator} examples/job_queue/cluster_multinode.yaml', + f'sky exec {name} -n {name}-1 -d --gpus {accelerator}:0.5 examples/job_queue/job_multinode.yaml', + f'sky exec {name} -n {name}-2 -d --gpus {accelerator}:0.5 examples/job_queue/job_multinode.yaml', + f'sky launch -c {name} -n {name}-3 --detach-setup -d --gpus {accelerator}:0.5 examples/job_queue/job_multinode.yaml', f's=$(sky queue {name}) && echo "$s" && (echo "$s" | grep {name}-1 | grep RUNNING)', f's=$(sky queue {name}) && echo "$s" && (echo "$s" | grep {name}-2 | grep RUNNING)', f's=$(sky queue {name}) && echo "$s" && (echo "$s" | grep {name}-3 | grep PENDING)', @@ -232,16 +240,16 @@ def test_job_queue_multinode(generic_cloud: str): 'sleep 5', f's=$(sky queue {name}); echo "$s"; echo; echo; echo "$s" | grep {name}-3 | grep SETTING_UP', f'sky cancel -y {name} 1 2 3', - f'sky launch -c {name} -n {name}-4 --detach-setup -d examples/job_queue/job_multinode.yaml', + f'sky launch -c {name} -n {name}-4 --detach-setup -d --gpus {accelerator} examples/job_queue/job_multinode.yaml', # Test the job status is correctly set to SETTING_UP, during the setup is running, # and the job can be cancelled during the setup. 
'sleep 5', f's=$(sky queue {name}) && echo "$s" && (echo "$s" | grep {name}-4 | grep SETTING_UP)', f'sky cancel -y {name} 4', f's=$(sky queue {name}) && echo "$s" && (echo "$s" | grep {name}-4 | grep CANCELLED)', - f'sky exec {name} --gpus T4:0.2 "[[ \$SKYPILOT_NUM_GPUS_PER_NODE -eq 1 ]] || exit 1"', - f'sky exec {name} --gpus T4:0.2 --num-nodes 2 "[[ \$SKYPILOT_NUM_GPUS_PER_NODE -eq 1 ]] || exit 1"', - f'sky exec {name} --gpus T4:1 --num-nodes 2 "[[ \$SKYPILOT_NUM_GPUS_PER_NODE -eq 1 ]] || exit 1"', + f'sky exec {name} --gpus {accelerator}:0.2 "[[ \$SKYPILOT_NUM_GPUS_PER_NODE -eq 1 ]] || exit 1"', + f'sky exec {name} --gpus {accelerator}:0.2 --num-nodes 2 "[[ \$SKYPILOT_NUM_GPUS_PER_NODE -eq 1 ]] || exit 1"', + f'sky exec {name} --gpus {accelerator}:1 --num-nodes 2 "[[ \$SKYPILOT_NUM_GPUS_PER_NODE -eq 1 ]] || exit 1"', f'sky logs {name} 5 --status', f'sky logs {name} 6 --status', f'sky logs {name} 7 --status', @@ -385,6 +393,7 @@ def test_docker_preinstalled_package(generic_cloud: str): @pytest.mark.no_ibm # IBM Cloud does not have T4 gpus @pytest.mark.no_scp # SCP does not support num_nodes > 1 yet @pytest.mark.no_oci # OCI Cloud does not have T4 gpus +@pytest.mark.no_do # DO does not have T4 gpus def test_multi_echo(generic_cloud: str): name = smoke_tests_utils.get_cluster_name() test = smoke_tests_utils.Test( @@ -427,14 +436,16 @@ def test_multi_echo(generic_cloud: str): @pytest.mark.no_lambda_cloud # Lambda Cloud does not have V100 gpus @pytest.mark.no_ibm # IBM cloud currently doesn't provide public image with CUDA @pytest.mark.no_scp # SCP does not have V100 (16GB) GPUs. Run test_scp_huggingface instead. -def test_huggingface(generic_cloud: str): +@pytest.mark.parametrize('accelerator', [{'do': 'H100'}]) +def test_huggingface(generic_cloud: str, accelerator: Dict[str, str]): + accelerator = accelerator.get(generic_cloud, 'T4') name = smoke_tests_utils.get_cluster_name() test = smoke_tests_utils.Test( 'huggingface_glue_imdb_app', [ - f'sky launch -y -c {name} --cloud {generic_cloud} examples/huggingface_glue_imdb_app.yaml', + f'sky launch -y -c {name} --cloud {generic_cloud} --gpus {accelerator} examples/huggingface_glue_imdb_app.yaml', f'sky logs {name} 1 --status', # Ensure the job succeeded. - f'sky exec {name} examples/huggingface_glue_imdb_app.yaml', + f'sky exec {name} --gpus {accelerator} examples/huggingface_glue_imdb_app.yaml', f'sky logs {name} 2 --status', # Ensure the job succeeded. 
@@ -953,6 +964,7 @@ def test_container_logs_two_simultaneous_jobs_kubernetes():
 @pytest.mark.no_lambda_cloud  # Lambda Cloud does not have V100 gpus
 @pytest.mark.no_ibm  # IBM cloud currently doesn't provide public image with CUDA
 @pytest.mark.no_scp  # SCP does not support num_nodes > 1 yet
+@pytest.mark.no_do  # DO does not have V100 gpus
 @pytest.mark.skip(
     reason=
     'The resnet_distributed_tf_app is flaky, due to it failing to detect GPUs.')
@@ -1228,12 +1240,14 @@ def test_cancel_azure():
 @pytest.mark.no_ibm  # IBM cloud currently doesn't provide public image with CUDA
 @pytest.mark.no_paperspace  # Paperspace has `gnome-shell` on nvidia-smi
 @pytest.mark.no_scp  # SCP does not support num_nodes > 1 yet
-def test_cancel_pytorch(generic_cloud: str):
+@pytest.mark.parametrize('accelerator', [{'do': 'H100'}])
+def test_cancel_pytorch(generic_cloud: str, accelerator: Dict[str, str]):
+    accelerator = accelerator.get(generic_cloud, 'T4')
     name = smoke_tests_utils.get_cluster_name()
     test = smoke_tests_utils.Test(
         'cancel-pytorch',
         [
-            f'sky launch -c {name} --cloud {generic_cloud} examples/resnet_distributed_torch.yaml -y -d',
+            f'sky launch -c {name} --cloud {generic_cloud} --gpus {accelerator} examples/resnet_distributed_torch.yaml -y -d',
             # Wait for the GPU process to start.
             'sleep 90',
             f'sky exec {name} --num-nodes 2 "(nvidia-smi | grep python) || '
@@ -1283,6 +1297,7 @@ def test_cancel_ibm():
 @pytest.mark.no_ibm  # IBM Cloud does not support spot instances
 @pytest.mark.no_scp  # SCP does not support spot instances
 @pytest.mark.no_kubernetes  # Kubernetes does not have a notion of spot instances
+@pytest.mark.no_do  # DO does not support spot instances
 def test_use_spot(generic_cloud: str):
     """Test use-spot and sky exec."""
     name = smoke_tests_utils.get_cluster_name()
diff --git a/tests/smoke_tests/test_managed_job.py b/tests/smoke_tests/test_managed_job.py
index f39dba6f47e..22381fc45e3 100644
--- a/tests/smoke_tests/test_managed_job.py
+++ b/tests/smoke_tests/test_managed_job.py
@@ -95,6 +95,7 @@ def test_managed_jobs(generic_cloud: str):
 @pytest.mark.no_scp  # SCP does not support spot instances
 @pytest.mark.no_paperspace  # Paperspace does not support spot instances
 @pytest.mark.no_kubernetes  # Kubernetes does not have a notion of spot instances
+@pytest.mark.no_do  # DO does not support spot instances
 @pytest.mark.managed_jobs
 def test_job_pipeline(generic_cloud: str):
     """Test a job pipeline."""
@@ -136,6 +137,7 @@
 @pytest.mark.no_scp  # SCP does not support spot instances
 @pytest.mark.no_paperspace  # Paperspace does not support spot instances
 @pytest.mark.no_kubernetes  # Kubernetes does not have a notion of spot instances
+@pytest.mark.no_do  # DO does not support spot instances
 @pytest.mark.managed_jobs
 def test_managed_jobs_failed_setup(generic_cloud: str):
     """Test managed job with failed setup."""
@@ -387,6 +389,7 @@ def test_managed_jobs_pipeline_recovery_gcp():
 @pytest.mark.no_scp  # SCP does not support spot instances
 @pytest.mark.no_paperspace  # Paperspace does not support spot instances
 @pytest.mark.no_kubernetes  # Kubernetes does not have a notion of spot instances
+@pytest.mark.no_do  # DO does not support spot instances
 @pytest.mark.managed_jobs
 def test_managed_jobs_recovery_default_resources(generic_cloud: str):
     """Test managed job recovery for default resources."""
@@ -658,6 +661,7 @@ def test_managed_jobs_cancellation_gcp():
 @pytest.mark.no_ibm  # IBM Cloud does not support spot instances
 @pytest.mark.no_paperspace  # Paperspace does not support spot instances
 @pytest.mark.no_scp  # SCP does not support spot instances
+@pytest.mark.no_do  # DO does not support spot instances
 @pytest.mark.managed_jobs
 def test_managed_jobs_storage(generic_cloud: str):
     """Test storage with managed job"""
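All of the `no_do` additions above follow the suite's existing `no_<cloud>` marker convention for excluding a cloud from a test. For context, a simplified, hypothetical sketch of how a conftest hook can turn such markers into skips for the cloud under test (the repository's actual conftest.py may differ in details):

    # Hypothetical conftest.py fragment: skip any test carrying a
    # no_<cloud> marker when the suite targets that cloud.
    import pytest


    def pytest_collection_modifyitems(config, items):
        cloud = config.getoption('--generic-cloud', default='aws')
        skip_mark = pytest.mark.skip(
            reason=f'marked no_{cloud}: unsupported on this cloud')
        for item in items:
            if item.get_closest_marker(f'no_{cloud}') is not None:
                item.add_marker(skip_mark)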
diff --git a/tests/smoke_tests/test_sky_serve.py b/tests/smoke_tests/test_sky_serve.py
index 5f34eba8728..2e820d30d52 100644
--- a/tests/smoke_tests/test_sky_serve.py
+++ b/tests/smoke_tests/test_sky_serve.py
@@ -25,7 +25,7 @@
 import inspect
 import json
 import shlex
-from typing import List, Tuple
+from typing import Dict, List, Tuple

 import pytest
 from smoke_tests import smoke_tests_utils
@@ -197,9 +197,11 @@ def test_skyserve_oci_http():
 @pytest.mark.no_fluidstack  # Fluidstack does not support T4 gpus for now
+@pytest.mark.parametrize('accelerator', [{'do': 'H100'}])
 @pytest.mark.serve
-def test_skyserve_llm(generic_cloud: str):
+def test_skyserve_llm(generic_cloud: str, accelerator: Dict[str, str]):
     """Test skyserve with real LLM usecase"""
+    accelerator = accelerator.get(generic_cloud, 'T4')
     name = _get_service_name()

     def generate_llm_test_command(prompt: str, expected_output: str) -> str:
@@ -217,7 +219,7 @@ def generate_llm_test_command(prompt: str, expected_output: str) -> str:
     test = smoke_tests_utils.Test(
         f'test-skyserve-llm',
         [
-            f'sky serve up -n {name} --cloud {generic_cloud} -y tests/skyserve/llm/service.yaml',
+            f'sky serve up -n {name} --cloud {generic_cloud} --gpus {accelerator} -y tests/skyserve/llm/service.yaml',
             _SERVE_WAIT_UNTIL_READY.format(name=name, replica_num=1),
             *[
                 generate_llm_test_command(prompt, output)
@@ -257,6 +259,7 @@ def test_skyserve_spot_recovery():
 @pytest.mark.no_fluidstack  # Fluidstack does not support spot instances
 @pytest.mark.serve
 @pytest.mark.no_kubernetes
+@pytest.mark.no_do  # DO does not support spot instances
 def test_skyserve_base_ondemand_fallback(generic_cloud: str):
     name = _get_service_name()
     test = smoke_tests_utils.Test(
@@ -321,6 +324,7 @@ def test_skyserve_dynamic_ondemand_fallback():
 # TODO: fluidstack does not support `--cpus 2`, but the check for services in this test is based on CPUs
 @pytest.mark.no_fluidstack
+@pytest.mark.no_do  # DO does not support `--cpus 2`
 @pytest.mark.serve
 def test_skyserve_user_bug_restart(generic_cloud: str):
     """Tests that we restart the service after user bug."""
@@ -507,6 +511,7 @@ def test_skyserve_large_readiness_timeout(generic_cloud: str):
 # TODO: fluidstack does not support `--cpus 2`, but the check for services in this test is based on CPUs
 @pytest.mark.no_fluidstack
+@pytest.mark.no_do  # DO does not support `--cpus 2`
 @pytest.mark.serve
 def test_skyserve_update(generic_cloud: str):
     """Test skyserve with update"""
@@ -537,6 +542,7 @@
 # TODO: fluidstack does not support `--cpus 2`, but the check for services in this test is based on CPUs
 @pytest.mark.no_fluidstack
+@pytest.mark.no_do  # DO does not support `--cpus 2`
 @pytest.mark.serve
 def test_skyserve_rolling_update(generic_cloud: str):
     """Test skyserve with rolling update"""
@@ -654,6 +660,7 @@ def test_skyserve_update_autoscale(generic_cloud: str):
 @pytest.mark.no_fluidstack  # Spot instances are not supported by Fluidstack
 @pytest.mark.serve
 @pytest.mark.no_kubernetes  # Spot instances are not supported in Kubernetes
+@pytest.mark.no_do  # DO does not support spot instances
 @pytest.mark.parametrize('mode', ['rolling', 'blue_green'])
 def test_skyserve_new_autoscaler_update(mode: str, generic_cloud: str):
     """Test skyserve with update that changes autoscaler"""
@@ -717,6 +724,7 @@ def test_skyserve_new_autoscaler_update(mode: str, generic_cloud: str):
 # TODO: fluidstack does not support `--cpus 2`, but the check for services in this test is based on CPUs
 @pytest.mark.no_fluidstack
+@pytest.mark.no_do  # DO does not support `--cpus 2`
 @pytest.mark.serve
 def test_skyserve_failures(generic_cloud: str):
     """Test replica failure statuses"""
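Taken together, these hunks route the GPU smoke tests to H100 on DO and gate DO out of the spot-instance and `--cpus 2` scenarios it cannot satisfy. A hedged example of driving one of the suites against DO programmatically, assuming the suite's `--generic-cloud` option (equivalent to the CLI form `pytest tests/smoke_tests/test_sky_serve.py --generic-cloud do`):

    # Illustrative invocation of the serve smoke tests against DO via
    # pytest's Python entry point; flags mirror the CLI form above.
    import sys

    import pytest

    sys.exit(
        pytest.main([
            'tests/smoke_tests/test_sky_serve.py',
            '--generic-cloud', 'do',
            '-m', 'serve',
        ]))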