[DigitalOcean] droplet integration (#3832)
* init digital ocean droplet integration
* abbreviate cloud name
* switch to pydo
* adjust polling logic and mount block storage to instance
* filter by paginated
* lint
* sky launch, start, stop functional
* fix credential file mounts, autodown works now
* set gpu droplet image
* cleanup
* remove more tests
* atomically destroy instance and block storage simultaneously
* install docker
* disable spot test
* fix ip address bug for multinode
* lint
* patch ssh from job/serve controller
* switch to EA slugs
* do adaptor
* lint
* Update sky/clouds/do.py (Co-authored-by: Tian Xia <[email protected]>)
* Update sky/clouds/do.py (Co-authored-by: Tian Xia <[email protected]>)
* comment template
* comment patch
* add h100 test case
* comment on instance name length
* Update sky/clouds/do.py (Co-authored-by: Tian Xia <[email protected]>)
* Update sky/clouds/service_catalog/do_catalog.py (Co-authored-by: Tian Xia <[email protected]>)
* comment on max node char len
* comment on weird azure import
* comment acc price is included in instance price
* fix return type
* switch with do_utils
* remove broad except
* Update sky/provision/do/instance.py (Co-authored-by: Tian Xia <[email protected]>)
* Update sky/provision/do/instance.py (Co-authored-by: Tian Xia <[email protected]>)
* remove azure
* comment on non_terminated_only
* add open port debug message
* wrap start instance api
* use f-string
* wrap stop
* wrap instance down
* assert credentials and check against all contexts
* assert client is None
* remove pending instances during instance restart
* wrap rename
* rename ssh key var
* fix tags
* add tags for block device
* f strings for errors
* support image ids
* update do tests
* only store head instance id
* rename image slugs
* add digital ocean alias
* wait for docker to be available
* update requirements and tests
* increase docker timeout
* lint
* move tests
* lint
* patch test
* lint
* typo fix
* fix typo
* patch tests
* fix tests
* no_mark spot test
* handle 2cpu serve tests
* lint
* lint
* use logger.debug
* fix none cred path
* lint
* handle get_cred path
* pylint
* patch for DO test_optimizer_dryruns.py
* revert optimizer dryrun

---------

Co-authored-by: Tian Xia <[email protected]>
Co-authored-by: Ubuntu <[email protected]>
1 parent 17156f7 · commit 51bca22
Showing 24 changed files with 1,250 additions and 46 deletions.
sky/adaptors/do.py
@@ -0,0 +1,20 @@
"""Digital Ocean cloud adaptors"""

# pylint: disable=import-outside-toplevel

from sky.adaptors import common

_IMPORT_ERROR_MESSAGE = ('Failed to import dependencies for DO. '
                         'Try pip install "skypilot[do]"')
pydo = common.LazyImport('pydo', import_error_message=_IMPORT_ERROR_MESSAGE)
azure = common.LazyImport('azure', import_error_message=_IMPORT_ERROR_MESSAGE)
_LAZY_MODULES = (pydo, azure)


# `pydo` inherits Azure exceptions. See:
# https://github.com/digitalocean/pydo/blob/7b01498d99eb0d3a772366b642e5fab3d6fc6aa2/examples/poc_droplets_volumes_sshkeys.py#L6
@common.load_lazy_modules(modules=_LAZY_MODULES)
def exceptions():
    """Azure exceptions."""
    from azure.core import exceptions as azure_exceptions
    return azure_exceptions
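
The adaptor above lazily imports `pydo` and `azure`, so SkyPilot only needs them installed when DO is actually used. A minimal sketch of how the adaptor is consumed elsewhere in this commit; it assumes `do_utils.client()` from `sky/provision/do/utils.py` (added by this commit but not shown on this page) returns an authenticated pydo client, and `do_api_reachable` is a hypothetical helper, not part of the commit:

from sky.adaptors import do
from sky.provision.do import utils as do_utils


def do_api_reachable() -> bool:
    """Return True if the DO API accepts the configured token."""
    try:
        # The lazy imports resolve here; pydo surfaces HTTP failures as
        # Azure-style exceptions, hence do.exceptions().
        do_utils.client().droplets.list()
    except do.exceptions().HttpResponseError:
        return False
    return True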
sky/clouds/do.py
@@ -0,0 +1,303 @@
""" Digital Ocean Cloud. """

import json
import typing
from typing import Dict, Iterator, List, Optional, Tuple, Union

from sky import clouds
from sky.adaptors import do
from sky.clouds import service_catalog
from sky.provision.do import utils as do_utils
from sky.utils import resources_utils

if typing.TYPE_CHECKING:
    from sky import resources as resources_lib

_CREDENTIAL_FILE = 'config.yaml'


@clouds.CLOUD_REGISTRY.register(aliases=['digitalocean'])
class DO(clouds.Cloud):
    """Digital Ocean Cloud"""

    _REPR = 'DO'
    _CLOUD_UNSUPPORTED_FEATURES = {
        clouds.CloudImplementationFeatures.CLONE_DISK_FROM_CLUSTER:
            'Migrating '
            f'disk is not supported in {_REPR}.',
        clouds.CloudImplementationFeatures.SPOT_INSTANCE:
            'Spot instances are '
            f'not supported in {_REPR}.',
        clouds.CloudImplementationFeatures.CUSTOM_DISK_TIER:
            'Custom disk tiers are '
            f'not supported in {_REPR}.',
    }
    # DO's maximum node name length is defined as <= 255:
    # https://docs.digitalocean.com/reference/api/api-reference/#operation/droplets_create
    # 255 - 8 = 247 characters, since our provisioner adds an
    # additional `-worker`.
    _MAX_CLUSTER_NAME_LEN_LIMIT = 247
    _regions: List[clouds.Region] = []

    # Using the latest SkyPilot provisioner API to provision and check status.
    PROVISIONER_VERSION = clouds.ProvisionerVersion.SKYPILOT
    STATUS_VERSION = clouds.StatusVersion.SKYPILOT

    @classmethod
    def _unsupported_features_for_resources(
            cls, resources: 'resources_lib.Resources'
    ) -> Dict[clouds.CloudImplementationFeatures, str]:
        """The features not supported based on the resources provided.

        This method is used by check_features_are_supported() to check if the
        cloud implementation supports all the requested features.

        Returns:
            A dict of {feature: reason} for the features not supported by the
            cloud implementation.
        """
        del resources  # unused
        return cls._CLOUD_UNSUPPORTED_FEATURES

    @classmethod
    def _max_cluster_name_length(cls) -> Optional[int]:
        return cls._MAX_CLUSTER_NAME_LEN_LIMIT

    @classmethod
    def regions_with_offering(
        cls,
        instance_type: str,
        accelerators: Optional[Dict[str, int]],
        use_spot: bool,
        region: Optional[str],
        zone: Optional[str],
    ) -> List[clouds.Region]:
        assert zone is None, 'DO does not support zones.'
        del accelerators, zone  # unused
        if use_spot:
            return []
        regions = service_catalog.get_region_zones_for_instance_type(
            instance_type, use_spot, 'DO')
        if region is not None:
            regions = [r for r in regions if r.name == region]
        return regions

    @classmethod
    def get_vcpus_mem_from_instance_type(
        cls,
        instance_type: str,
    ) -> Tuple[Optional[float], Optional[float]]:
        return service_catalog.get_vcpus_mem_from_instance_type(instance_type,
                                                                 clouds='DO')

    @classmethod
    def zones_provision_loop(
        cls,
        *,
        region: str,
        num_nodes: int,
        instance_type: str,
        accelerators: Optional[Dict[str, int]] = None,
        use_spot: bool = False,
    ) -> Iterator[None]:
        del num_nodes  # unused
        regions = cls.regions_with_offering(instance_type,
                                            accelerators,
                                            use_spot,
                                            region=region,
                                            zone=None)
        for r in regions:
            assert r.zones is None, r
            yield r.zones

    def instance_type_to_hourly_cost(
        self,
        instance_type: str,
        use_spot: bool,
        region: Optional[str] = None,
        zone: Optional[str] = None,
    ) -> float:
        return service_catalog.get_hourly_cost(
            instance_type,
            use_spot=use_spot,
            region=region,
            zone=zone,
            clouds='DO',
        )

    def accelerators_to_hourly_cost(
        self,
        accelerators: Dict[str, int],
        use_spot: bool,
        region: Optional[str] = None,
        zone: Optional[str] = None,
    ) -> float:
        """Returns the hourly cost of the accelerators, in dollars/hour."""
        # The accelerator price is included in the instance price.
        del accelerators, use_spot, region, zone  # unused
        return 0.0

    def get_egress_cost(self, num_gigabytes: float) -> float:
        return 0.0

    def __repr__(self):
        return self._REPR

    @classmethod
    def get_default_instance_type(
        cls,
        cpus: Optional[str] = None,
        memory: Optional[str] = None,
        disk_tier: Optional[resources_utils.DiskTier] = None,
    ) -> Optional[str]:
        """Returns the default instance type for DO."""
        return service_catalog.get_default_instance_type(cpus=cpus,
                                                          memory=memory,
                                                          disk_tier=disk_tier,
                                                          clouds='DO')

    @classmethod
    def get_accelerators_from_instance_type(
            cls, instance_type: str) -> Optional[Dict[str, Union[int, float]]]:
        return service_catalog.get_accelerators_from_instance_type(
            instance_type, clouds='DO')

    @classmethod
    def get_zone_shell_cmd(cls) -> Optional[str]:
        return None

    def make_deploy_resources_variables(
            self,
            resources: 'resources_lib.Resources',
            cluster_name: resources_utils.ClusterName,
            region: 'clouds.Region',
            zones: Optional[List['clouds.Zone']],
            num_nodes: int,
            dryrun: bool = False) -> Dict[str, Optional[str]]:
        del zones, dryrun, cluster_name  # unused

        r = resources
        acc_dict = self.get_accelerators_from_instance_type(r.instance_type)
        if acc_dict is not None:
            custom_resources = json.dumps(acc_dict, separators=(',', ':'))
        else:
            custom_resources = None
        image_id = None
        if (resources.image_id is not None and
                resources.extract_docker_image() is None):
            if None in resources.image_id:
                image_id = resources.image_id[None]
            else:
                assert region.name in resources.image_id
                image_id = resources.image_id[region.name]
        return {
            'instance_type': resources.instance_type,
            'custom_resources': custom_resources,
            'region': region.name,
            **({
                'image_id': image_id
            } if image_id else {})
        }

    def _get_feasible_launchable_resources(
        self, resources: 'resources_lib.Resources'
    ) -> resources_utils.FeasibleResources:
        """Returns a list of feasible resources for the given resources."""
        if resources.use_spot:
            # TODO: Add hints to all return values in this method to help
            # users understand why the resources are not launchable.
            return resources_utils.FeasibleResources([], [], None)
        if resources.instance_type is not None:
            assert resources.is_launchable(), resources
            resources = resources.copy(accelerators=None)
            return resources_utils.FeasibleResources([resources], [], None)

        def _make(instance_list):
            resource_list = []
            for instance_type in instance_list:
                r = resources.copy(
                    cloud=DO(),
                    instance_type=instance_type,
                    accelerators=None,
                    cpus=None,
                )
                resource_list.append(r)
            return resource_list

        # Currently, handle a filter on accelerators only.
        accelerators = resources.accelerators
        if accelerators is None:
            # Return a default instance type
            default_instance_type = DO.get_default_instance_type(
                cpus=resources.cpus,
                memory=resources.memory,
                disk_tier=resources.disk_tier)
            return resources_utils.FeasibleResources(
                _make([default_instance_type]), [], None)

        assert len(accelerators) == 1, resources
        acc, acc_count = list(accelerators.items())[0]
        (instance_list, fuzzy_candidate_list) = (
            service_catalog.get_instance_type_for_accelerator(
                acc,
                acc_count,
                use_spot=resources.use_spot,
                cpus=resources.cpus,
                memory=resources.memory,
                region=resources.region,
                zone=resources.zone,
                clouds='DO',
            ))
        if instance_list is None:
            return resources_utils.FeasibleResources([], fuzzy_candidate_list,
                                                     None)
        return resources_utils.FeasibleResources(_make(instance_list),
                                                 fuzzy_candidate_list, None)

    @classmethod
    def check_credentials(cls) -> Tuple[bool, Optional[str]]:
        """Verify that the user has valid credentials for DO."""
        try:
            # Attempt an API request that lists the user's droplets.
            do_utils.client().droplets.list()
        except do.exceptions().HttpResponseError as err:
            return False, str(err)
        except do_utils.DigitalOceanError as err:
            return False, str(err)

        return True, None

    def get_credential_file_mounts(self) -> Dict[str, str]:
        try:
            do_utils.client()
            return {
                f'~/.config/doctl/{_CREDENTIAL_FILE}': do_utils.CREDENTIALS_PATH
            }
        except do_utils.DigitalOceanError:
            return {}

    @classmethod
    def get_current_user_identity(cls) -> Optional[List[str]]:
        # NOTE: used for very advanced SkyPilot functionality.
        # Can be implemented later if desired.
        return None

    @classmethod
    def get_image_size(cls, image_id: str, region: Optional[str]) -> float:
        del region
        try:
            response = do_utils.client().images.get(image_id=image_id)
            return response['image']['size_gigabytes']
        except do.exceptions().HttpResponseError as err:
            raise do_utils.DigitalOceanError(
                'HTTP error while retrieving size of '
                f'image_id {image_id}: {err.error.message}') from err
        except KeyError as err:
            raise do_utils.DigitalOceanError(
                f'No image_id `{image_id}` found') from err

    def instance_type_exists(self, instance_type: str) -> bool:
        return service_catalog.instance_type_exists(instance_type, 'DO')

    def validate_region_zone(self, region: Optional[str], zone: Optional[str]):
        return service_catalog.validate_region_zone(region, zone, clouds='DO')
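
Because the class registers itself in `clouds.CLOUD_REGISTRY` under the `digitalocean` alias, DO can be requested like any other SkyPilot cloud once `skypilot[do]` is installed and doctl credentials exist at `~/.config/doctl/config.yaml`. A hedged usage sketch (the accelerator spec and cluster name are illustrative, not taken from this commit):

import sky

from sky.clouds.do import DO  # the class defined above

# Request a DO GPU droplet; 'H100' mirrors the test case mentioned in
# the commit message.
task = sky.Task(run='nvidia-smi')
task.set_resources(sky.Resources(cloud=DO(), accelerators={'H100': 1}))
# sky.launch(task, cluster_name='do-h100')  # commented out: this would
#                                           # provision a real droplet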