From 28adeee849eddbc77ae619bfd39de652ebc169f9 Mon Sep 17 00:00:00 2001 From: Kevin Su Date: Tue, 14 Nov 2023 13:00:45 -0800 Subject: [PATCH] Fix remote file system (get/put) (#1955) * test Signed-off-by: Kevin Su * test Signed-off-by: Kevin Su * test Signed-off-by: Kevin Su * lint Signed-off-by: Kevin Su * update-get Signed-off-by: Kevin Su * add unit test Signed-off-by: Kevin Su --------- Signed-off-by: Kevin Su --- flytekit/core/data_persistence.py | 11 ++++++-- tests/flytekit/unit/core/test_data.py | 37 +++++++++++++++++++++++++++ 2 files changed, 46 insertions(+), 2 deletions(-) diff --git a/flytekit/core/data_persistence.py b/flytekit/core/data_persistence.py index a3ee8b4ac2..fdde60ba49 100644 --- a/flytekit/core/data_persistence.py +++ b/flytekit/core/data_persistence.py @@ -248,7 +248,10 @@ def get(self, from_path: str, to_path: str, recursive: bool = False, **kwargs): self.strip_file_header(from_path), self.strip_file_header(to_path), dirs_exist_ok=True ) print(f"Getting {from_path} to {to_path}") - return file_system.get(from_path, to_path, recursive=recursive, **kwargs) + dst = file_system.get(from_path, to_path, recursive=recursive, **kwargs) + if isinstance(dst, (str, pathlib.Path)): + return dst + return to_path except OSError as oe: logger.debug(f"Error in getting {from_path} to {to_path} rec {recursive} {oe}") file_system = self.get_filesystem(get_protocol(from_path), anonymous=True) @@ -271,7 +274,11 @@ def put(self, from_path: str, to_path: str, recursive: bool = False, **kwargs): self.strip_file_header(from_path), self.strip_file_header(to_path), dirs_exist_ok=True ) from_path, to_path = self.recursive_paths(from_path, to_path) - return file_system.put(from_path, to_path, recursive=recursive, **kwargs) + dst = file_system.put(from_path, to_path, recursive=recursive, **kwargs) + if isinstance(dst, (str, pathlib.Path)): + return dst + else: + return to_path def put_raw_data( self, diff --git a/tests/flytekit/unit/core/test_data.py b/tests/flytekit/unit/core/test_data.py index d65d58f550..792a466978 100644 --- a/tests/flytekit/unit/core/test_data.py +++ b/tests/flytekit/unit/core/test_data.py @@ -7,6 +7,7 @@ import fsspec import mock import pytest +from s3fs import S3FileSystem from flytekit.configuration import Config, DataConfig, S3Config from flytekit.core.context_manager import FlyteContextManager @@ -126,6 +127,42 @@ def test_local_provider(source_folder): assert len(files) == 2 +def test_async_file_system(): + remote_path = "test:///tmp/test.py" + local_path = "test.py" + + class MockAsyncFileSystem(S3FileSystem): + def __init__(self, *args, **kwargs): + super().__init__(args, kwargs) + + async def _put_file(self, *args, **kwargs): + # s3fs._put_file returns None as well + return None + + async def _get_file(self, *args, **kwargs): + # s3fs._get_file returns None as well + return None + + async def _lsdir( + self, + path, + refresh=False, + max_items=None, + delimiter="/", + prefix="", + versions=False, + ): + return False + + fsspec.register_implementation("test", MockAsyncFileSystem) + + ctx = FlyteContextManager.current_context() + dst = ctx.file_access.put(local_path, remote_path) + assert dst == remote_path + dst = ctx.file_access.get(remote_path, local_path) + assert dst == local_path + + @pytest.mark.sandbox_test def test_s3_provider(source_folder): # Running mkdir on s3 filesystem doesn't do anything so leaving out for now