Fix issue 3744: Regions are not respected for buckets created with sky launch #3789

Open
wants to merge 26 commits into master (base) from fix/issue-3744-sky-launch-bucket-regions
Changes from 23 commits (26 commits total)
c4ad6b3
Fix regions are not respected for buckets created with sky launch (#3…
Shrinandan-N Jul 26, 2024
aaa5c7d
Format storage.py and task.py
Shrinandan-N Jul 26, 2024
e1e2590
add default value for region if not provided
Shrinandan-N Jul 26, 2024
3835992
fix yaml parse error
Shrinandan-N Jul 27, 2024
04d148f
reset task and execution, delay add_store in storage
Shrinandan-N Jul 29, 2024
387a8a1
lint
Shrinandan-N Jul 30, 2024
39b3dd8
lint
Shrinandan-N Jul 30, 2024
126ea2c
format
Shrinandan-N Jul 30, 2024
00ccb1d
format
Shrinandan-N Jul 30, 2024
00c6d00
Merge branch 'skypilot-org:master' into fix/issue-3744-sky-launch-buc…
Shrinandan-N Aug 3, 2024
1801e6c
Merge branch 'fix/issue-3744-sky-launch-bucket-regions' of github.com…
Shrinandan-N Aug 3, 2024
f949832
fix bucket not created error
Shrinandan-N Aug 3, 2024
8e8de6b
Delete config.yaml
Shrinandan-N Aug 3, 2024
7715412
Merge branch 'skypilot-org:master' into fix/issue-3744-sky-launch-buc…
Shrinandan-N Aug 15, 2024
ccb25d9
resolve edge cases
Shrinandan-N Aug 17, 2024
7f3a506
fix null error
Shrinandan-N Aug 17, 2024
d117b73
add init store field
Shrinandan-N Aug 31, 2024
63e84fc
add sync_bucket function
Shrinandan-N Sep 21, 2024
6a62986
fix format
Shrinandan-N Sep 21, 2024
481f15d
fix formatting
Shrinandan-N Sep 21, 2024
4262bd2
remove sync bucket from azure
Shrinandan-N Sep 21, 2024
50eb810
fix null issue with store
Shrinandan-N Sep 21, 2024
19df941
fix bucket file sync and adjusted smoke tests
Shrinandan-N Oct 27, 2024
be12fcf
fix bucket creation edge case
Shrinandan-N Nov 8, 2024
837c9d8
Merge remote-tracking branch 'upstream/master' into fix/issue-3744-sk…
Shrinandan-N Nov 8, 2024
14abcbf
add azure support
Shrinandan-N Dec 2, 2024
140 changes: 87 additions & 53 deletions sky/data/storage.py
@@ -294,6 +294,12 @@ def initialize(self):
"""
pass

def sync_bucket(self, region: Optional[str] = None):
"""Fetches bucket if exists, or creates it if
it does not.
"""
pass

def _validate(self) -> None:
"""Runs validation checks on class args"""
pass
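The hook above moves bucket lookup/creation out of initialize(), so a region decided later (at launch time) can still determine where the bucket is created. A minimal sketch of the contract that the concrete stores below follow; this is illustrative only, and make_client stands in for the per-cloud client factory:

class ExampleStore(AbstractStore):
    """Illustrative store showing the deferred bucket-creation contract."""

    def initialize(self):
        # Set up the cloud client only; do not touch the bucket yet.
        self.client = make_client(self.region)  # hypothetical client factory
        self.bucket = None

    def sync_bucket(self, region=None):
        # A region chosen later (e.g. by task placement) overrides the default.
        if region:
            self.region = region
        if self.bucket is None:
            self.bucket, is_new_bucket = self._get_bucket()
            if self.is_sky_managed is None:
                # A newly created bucket is treated as sky-managed.
                self.is_sky_managed = is_new_bucket
        return self.bucket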
@@ -808,14 +814,15 @@ def _add_store_from_metadata(
'to be reconstructed while the corresponding '
'storage account '
f'{s_metadata.storage_account_name!r} does '
'not exist.')
'not exist')
else:
logger.debug(f'Storage object {self.name!r} was attempted '
'to be reconstructed while the corresponding '
'bucket was externally deleted.')
continue

self._add_store(store, is_reconstructed=True)
# self._sync_store(store)

@classmethod
def from_metadata(cls, metadata: StorageMetadata,
@@ -917,14 +924,13 @@ def add_store(self,
f'store with name {self.name!r}.')
raise

# Add store to storage
self._add_store(store)
return store

# Upload source to store
def initialize_and_sync_store(self, store: AbstractStore) -> None:
"""Public method to initialize and sync the given store."""
self._add_store(store)
self._sync_store(store)

return store

def _add_store(self, store: AbstractStore, is_reconstructed: bool = False):
# Adds a store object to the storage
store_type = StoreType.from_store(store)
@@ -1020,7 +1026,7 @@ def from_yaml_config(cls, config: Dict[str, Any]) -> 'Storage':

name = config.pop('name', None)
source = config.pop('source', None)
store = config.pop('store', None)
store = config.pop('store', None) # where do we store this?!!?!?!?
mode_str = config.pop('mode', None)
force_delete = config.pop('_force_delete', None)
if force_delete is None:
@@ -1043,8 +1049,8 @@ def from_yaml_config(cls, config: Dict[str, Any]) -> 'Storage':
source=source,
persistent=persistent,
mode=mode)
if store is not None:
storage_obj.add_store(StoreType(store.upper()))
if store:
storage_obj.add_store(store_type=store.upper())

# Add force deletion flag
storage_obj.force_delete = force_delete
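For reference, the dict consumed here corresponds to the storage fields of a task YAML. A hedged sketch of a config that exercises the store branch; all values are illustrative, and the mode/persistent keys are assumed to be handled the same way as in a task YAML:

from sky.data import storage as storage_lib

config = {
    'name': 'my-sky-bucket',   # illustrative bucket name
    'source': '~/dataset',     # illustrative local source path
    'store': 's3',             # upper-cased before being handed to add_store
    'mode': 'MOUNT',
    'persistent': True,
}
storage_obj = storage_lib.Storage.from_yaml_config(config)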
@@ -1138,11 +1144,11 @@ def _validate(self):
if not _is_storage_cloud_enabled(str(clouds.AWS())):
with ux_utils.print_exception_no_traceback():
raise exceptions.ResourcesUnavailableError(
'Storage \'store: s3\' specified, but ' \
'AWS access is disabled. To fix, enable '\
'AWS by running `sky check`. More info: '\
'https://skypilot.readthedocs.io/en/latest/getting-started/installation.html.' # pylint: disable=line-too-long
)
'Storage \'store: s3\' specified, but '
'AWS access is disabled. To fix, enable '
'AWS by running `sky check`. More info: '
'https://skypilot.readthedocs.io/en/latest/getting-started/installation.html.' # pylint: disable=line-too-long
)

@classmethod
def validate_name(cls, name: str) -> str:
@@ -1215,13 +1221,21 @@ def initialize(self):
StorageInitError: If general initialization fails.
"""
self.client = data_utils.create_s3_client(self.region)
self.bucket, is_new_bucket = self._get_bucket()
if self.is_sky_managed is None:
# If is_sky_managed is not specified, then this is a new storage
# object (i.e., did not exist in global_user_state) and we should
# set the is_sky_managed property.
# If is_sky_managed is specified, then we take no action.
self.is_sky_managed = is_new_bucket
self.bucket = None

def sync_bucket(self, region: Optional[str] = None):
if region:
self.region = region
if self.bucket is None:
self.bucket, is_new_bucket = self._get_bucket()
if self.is_sky_managed is None:
# If is_sky_managed is not specified, then this is a new storage
# object (i.e., did not exist in global_user_state)
# and we should set the is_sky_managed property.
# If is_sky_managed is specified, then we take no action.
self.is_sky_managed = is_new_bucket

return self.bucket

def upload(self):
"""Uploads source to store bucket.
@@ -1256,7 +1270,7 @@ def delete(self) -> None:
msg_str = f'Deleted S3 bucket {self.name}.'
else:
msg_str = f'S3 bucket {self.name} may have been deleted ' \
f'externally. Removing from local state.'
f'externally. Removing from local state.'
logger.info(f'{colorama.Fore.GREEN}{msg_str}'
f'{colorama.Style.RESET_ALL}')

@@ -1629,13 +1643,20 @@ def initialize(self):
StorageInitError: If general initialization fails.
"""
self.client = gcp.storage_client()
self.bucket, is_new_bucket = self._get_bucket()
if self.is_sky_managed is None:
# If is_sky_managed is not specified, then this is a new storage
# object (i.e., did not exist in global_user_state) and we should
# set the is_sky_managed property.
# If is_sky_managed is specified, then we take no action.
self.is_sky_managed = is_new_bucket
self.bucket = None

def sync_bucket(self, region: Optional[str] = None):
if region:
self.region = region
if self.bucket is None:
self.bucket, is_new_bucket = self._get_bucket()
if self.is_sky_managed is None:
# If is_sky_managed is not specified, then this is a new storage
# object (i.e., did not exist in global_user_state)
# and we should set the is_sky_managed property.
# If is_sky_managed is specified, then we take no action.
self.is_sky_managed = is_new_bucket
return self.bucket

def upload(self):
"""Uploads source to store bucket.
@@ -1672,7 +1693,7 @@ def delete(self) -> None:
msg_str = f'Deleted GCS bucket {self.name}.'
else:
msg_str = f'GCS bucket {self.name} may have been deleted ' \
f'externally. Removing from local state.'
f'externally. Removing from local state.'
logger.info(f'{colorama.Fore.GREEN}{msg_str}'
f'{colorama.Style.RESET_ALL}')

@@ -2724,11 +2745,11 @@ def _validate(self):
if not _is_storage_cloud_enabled(cloudflare.NAME):
with ux_utils.print_exception_no_traceback():
raise exceptions.ResourcesUnavailableError(
'Storage \'store: r2\' specified, but ' \
'Cloudflare R2 access is disabled. To fix, '\
'enable Cloudflare R2 by running `sky check`. '\
'Storage \'store: r2\' specified, but '
'Cloudflare R2 access is disabled. To fix, '
'enable Cloudflare R2 by running `sky check`. '
'More info: https://skypilot.readthedocs.io/en/latest/getting-started/installation.html.' # pylint: disable=line-too-long
)
)

def initialize(self):
"""Initializes the R2 store object on the cloud.
@@ -2742,13 +2763,19 @@ def initialize(self):
StorageInitError: If general initialization fails.
"""
self.client = data_utils.create_r2_client(self.region)
self.bucket, is_new_bucket = self._get_bucket()
if self.is_sky_managed is None:
# If is_sky_managed is not specified, then this is a new storage
# object (i.e., did not exist in global_user_state) and we should
# set the is_sky_managed property.
# If is_sky_managed is specified, then we take no action.
self.is_sky_managed = is_new_bucket
self.bucket = None

def sync_bucket(self, region: Optional[str] = None):
if region:
self.region = region
if self.bucket is None:
self.bucket, is_new_bucket = self._get_bucket()
if self.is_sky_managed is None:
# If is_sky_managed is not specified, then this is a new storage
# object (i.e., did not exist in global_user_state)
# and we should set the is_sky_managed property.
# If is_sky_managed is specified, then we take no action.
self.is_sky_managed = is_new_bucket

def upload(self):
"""Uploads source to store bucket.
@@ -2783,7 +2810,7 @@ def delete(self) -> None:
msg_str = f'Deleted R2 bucket {self.name}.'
else:
msg_str = f'R2 bucket {self.name} may have been deleted ' \
f'externally. Removing from local state.'
f'externally. Removing from local state.'
logger.info(f'{colorama.Fore.GREEN}{msg_str}'
f'{colorama.Style.RESET_ALL}')

@@ -3064,8 +3091,8 @@ def __init__(self,
super().__init__(name, source, region, is_sky_managed,
sync_on_reconstruction)
self.bucket_rclone_profile = \
Rclone.generate_rclone_bucket_profile_name(
self.name, Rclone.RcloneClouds.IBM)
Rclone.generate_rclone_bucket_profile_name(
self.name, Rclone.RcloneClouds.IBM)
Comment on lines +3220 to +3221 (Collaborator):

Is it possible to revert these cosmetic changes to make the PR easier to read?

We rely on our linter (./format.py) to enforce a consistent style and make changes if necessary.


def _validate(self):
if self.source is not None and isinstance(self.source, str):
@@ -3166,13 +3193,19 @@ def initialize(self):
"""
self.client = ibm.get_cos_client(self.region)
self.s3_resource = ibm.get_cos_resource(self.region)
self.bucket, is_new_bucket = self._get_bucket()
if self.is_sky_managed is None:
# If is_sky_managed is not specified, then this is a new storage
# object (i.e., did not exist in global_user_state) and we should
# set the is_sky_managed property.
# If is_sky_managed is specified, then we take no action.
self.is_sky_managed = is_new_bucket
self.bucket = None

def sync_bucket(self, region: Optional[str] = None):
if region:
self.region = region
if self.bucket is None:
self.bucket, is_new_bucket = self._get_bucket()
if self.is_sky_managed is None:
# If is_sky_managed is not specified, then this is a new storage
# object (i.e., did not exist in global_user_state)
# and we should set the is_sky_managed property.
# If is_sky_managed is specified, then we take no action.
self.is_sky_managed = is_new_bucket

def upload(self):
"""Uploads files from local machine to bucket.
@@ -3340,7 +3373,7 @@ def _get_bucket(self) -> Tuple[StorageHandle, bool]:

# bucket's region doesn't match specified region in URI
if bucket_region and uri_region and uri_region != bucket_region\
and self.sync_on_reconstruction:
and self.sync_on_reconstruction:
with ux_utils.print_exception_no_traceback():
raise exceptions.StorageBucketGetError(
f'Bucket {self.name} exists in '
@@ -3432,7 +3465,8 @@ def _create_cos_bucket(self,
f'with storage class smart tier')
self.bucket = self.s3_resource.Bucket(bucket_name)

except ibm.ibm_botocore.exceptions.ClientError as e: # type: ignore[union-attr] # pylint: disable=line-too-long
# type: ignore[union-attr] # pylint: disable=line-too-long
except ibm.ibm_botocore.exceptions.ClientError as e:
with ux_utils.print_exception_no_traceback():
raise exceptions.StorageBucketCreateError(
f'Failed to create bucket: '
15 changes: 14 additions & 1 deletion sky/task.py
@@ -436,6 +436,7 @@ def from_yaml_config(
assert mount_path, 'Storage mount path cannot be empty.'
try:
storage_obj = storage_lib.Storage.from_yaml_config(storage[1])

except exceptions.StorageSourceError as e:
# Patch the error message to include the mount path, if included
e.args = (e.args[0].replace('<destination_path>',
@@ -959,9 +960,21 @@ def sync_storage_mounts(self) -> None:
if len(storage.stores) == 0:
store_type, store_region = self._get_preferred_store()
self.storage_plans[storage] = store_type
storage.add_store(store_type, store_region)
new_store = storage.add_store(store_type, store_region)
new_store.sync_bucket(store_region)
storage.initialize_and_sync_store(new_store)
else:
# We will download the first store that is added to remote.
for store_type, store in storage.stores.items():
if store_type == storage_lib.StoreType.AZURE:
continue

s_type, s_region = self._get_preferred_store()
if store_type == s_type:
store.sync_bucket(s_region)
else:
store.sync_bucket()
storage.initialize_and_sync_store(store)
self.storage_plans[storage] = list(storage.stores.keys())[0]

storage_mounts = self.storage_mounts
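Taken together, the new-store branch of sync_storage_mounts now splits placement, bucket creation, and upload into three explicit steps. A condensed restatement of the diff above, with comments on what each call is responsible for:

store_type, store_region = self._get_preferred_store()   # pick cloud and region from task placement
new_store = storage.add_store(store_type, store_region)  # construct the store object; no bucket yet
new_store.sync_bucket(store_region)                      # fetch or create the bucket in that region
storage.initialize_and_sync_store(new_store)              # register the store and upload the source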