From 14624dbd63c811914c4602136b9e6ad719b4a030 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 5 Sep 2025 22:10:58 +0000 Subject: [PATCH 1/5] Initial plan From 568019579fc56ef2449d02c9781411f79eca282e Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 5 Sep 2025 22:23:02 +0000 Subject: [PATCH 2/5] Add comprehensive cloud compatibility tests for Text Components Co-authored-by: VibhuJawa <4837571+VibhuJawa@users.noreply.github.com> --- tests/stages/text/test_cloud_compatibility.py | 385 ++++++++++++++++++ .../stages/text/test_specific_cloud_issues.py | 351 ++++++++++++++++ 2 files changed, 736 insertions(+) create mode 100644 tests/stages/text/test_cloud_compatibility.py create mode 100644 tests/stages/text/test_specific_cloud_issues.py diff --git a/tests/stages/text/test_cloud_compatibility.py b/tests/stages/text/test_cloud_compatibility.py new file mode 100644 index 000000000..2f7b01151 --- /dev/null +++ b/tests/stages/text/test_cloud_compatibility.py @@ -0,0 +1,385 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Cloud compatibility tests for Text Components. + +This module tests that Text Components use fsspec for cloud storage URIs +instead of os/pathlib/glob/shutil which don't work with s3://, gs://, abfs://, etc. 
+""" + +import os +import posixpath +import tempfile +from unittest.mock import Mock, patch + +import pytest + +# We can't import the actual modules due to dependencies, so we'll test the patterns + + +class TestTextUtilsCloudCompatibilityPatterns: + """Test that text_utils functions handle cloud URIs correctly.""" + + def test_filename_extraction_from_cloud_uris(self): + """Test filename extraction patterns used in text_utils.py.""" + # Simulate the pattern from text_utils.py line 170: + # module = os.path.splitext(os.path.basename(filename))[0] + + test_cases = [ + ("s3://bucket/path/to/file.py", "file"), + ("gs://my-bucket/deep/nested/path/script.py", "script"), + ("abfs://container@account.dfs.core.windows.net/data/code.py", "code"), + ("https://example.com/api/v1/source.py", "source"), + ("/local/path/local_file.py", "local_file"), # Local files should still work + ] + + for filename, expected_module_name in test_cases: + # Current problematic pattern (but works on POSIX systems) + os_module = os.path.splitext(os.path.basename(filename))[0] + + # Recommended cloud-compatible pattern + posix_module = posixpath.splitext(posixpath.basename(filename))[0] + + # Both should work on POSIX systems, but posixpath is more explicit for cloud URIs + assert os_module == expected_module_name, f"os.path pattern failed for {filename}" + assert posix_module == expected_module_name, f"posixpath pattern failed for {filename}" + + # Verify the posixpath approach is cloud-compatible + assert posix_module == expected_module_name + + def test_cloud_uri_filename_extraction_robustness(self): + """Test robustness of filename extraction with various cloud URI formats.""" + edge_cases = [ + ("s3://bucket/file", "file"), # No extension + ("gs://bucket/path.with.dots/file.tar.gz", "file.tar"), # Multiple dots + ("abfs://container@account.dfs.core.windows.net/", ""), # Directory ending + ("https://example.com/path/", ""), # Directory ending with slash + ("s3://bucket/dir/file.json", "file"), # JSON file + ] + + for uri, expected in edge_cases: + result = posixpath.splitext(posixpath.basename(uri))[0] + assert result == expected, f"Failed for {uri}: got {result}, expected {expected}" + + +class TestSemanticDedupeCloudCompatibilityPatterns: + """Test semantic deduplication path construction patterns.""" + + def test_path_joining_for_cloud_uris(self): + """Test path joining patterns used in semantic.py.""" + # Simulate the patterns from semantic.py lines 182-189: + # self.embeddings_path = os.path.join(self.cache_path, "embeddings") + + cloud_base_paths = [ + "s3://my-bucket/dedup-cache", + "gs://bucket/cache", + "abfs://container@account.dfs.core.windows.net/cache", + "https://storage.example.com/cache", + "/local/cache/path", # Local paths should still work + ] + + subdirs = ["embeddings", "semantic_dedup", "duplicates"] + + for base_path in cloud_base_paths: + for subdir in subdirs: + # Current pattern (works on POSIX but may have issues on Windows with protocols) + os_path = os.path.join(base_path, subdir) + + # Recommended cloud-compatible pattern + posix_path = posixpath.join(base_path, subdir) + + # Both should produce valid paths on POSIX systems + assert os_path.startswith(base_path), f"os.path result should start with base: {os_path}" + assert posix_path.startswith(base_path), f"posixpath result should start with base: {posix_path}" + + # Verify cloud URIs maintain their structure + if "://" in base_path: + assert "://" in posix_path, f"Cloud protocol should be preserved: {posix_path}" + assert "/" in 
posix_path, f"Should use forward slashes: {posix_path}" + assert "\\" not in posix_path, f"Should not contain backslashes: {posix_path}" + + def test_multiple_path_components_joining(self): + """Test joining multiple path components for complex directory structures.""" + base_paths = [ + "s3://bucket/base", + "gs://my-bucket/data", + "abfs://container@account.dfs.core.windows.net/root" + ] + + # Test patterns like: os.path.join(base, "output", "duplicates", "final") + components = ["output", "duplicates", "final"] + + for base in base_paths: + # Use posixpath for cloud-compatible joining + result = base + for component in components: + result = posixpath.join(result, component) + + expected = f"{base}/output/duplicates/final" + assert result == expected, f"Multi-component join failed: {result} != {expected}" + + # Verify cloud URI structure is maintained + assert result.startswith(base), "Should start with base path" + assert "://" in result, "Should maintain protocol" + assert result.count("://") == 1, "Should have exactly one protocol marker" + + +class TestFsspecCloudOperations: + """Test that fsspec operations work correctly with cloud URIs.""" + + def test_fsspec_vs_os_file_existence_checks(self): + """Test file existence patterns: os.path.exists vs fs.exists.""" + # Mock fsspec filesystem + with tempfile.NamedTemporaryFile() as tmp_file: + local_path = tmp_file.name + + # Test with local file - both should work + assert os.path.exists(local_path) is True + + # Mock fsspec for cloud URIs + mock_fs = Mock() + mock_fs.exists.return_value = True + + cloud_uris = [ + "s3://bucket/file.txt", + "gs://bucket/file.txt", + "abfs://container@account.dfs.core.windows.net/file.txt" + ] + + for uri in cloud_uris: + # os.path.exists would return False for cloud URIs (WRONG) + assert os.path.exists(uri) is False, f"os.path.exists should fail for {uri}" + + # fs.exists should work for cloud URIs (CORRECT) + assert mock_fs.exists(uri) is True, f"fs.exists should work for {uri}" + + def test_fsspec_vs_builtin_file_opening(self): + """Test file opening patterns: open() vs fs.open().""" + cloud_uris = [ + "s3://bucket/file.txt", + "gs://bucket/file.txt", + "abfs://container@account.dfs.core.windows.net/file.txt" + ] + + for uri in cloud_uris: + # open(uri) would fail for cloud URIs + with pytest.raises((OSError, FileNotFoundError)): + open(uri, "r") # This should fail + + # Mock fs.open() to simulate cloud success + mock_fs = Mock() + mock_file = Mock() + mock_file.read.return_value = "test content" + mock_fs.open.return_value.__enter__ = Mock(return_value=mock_file) + mock_fs.open.return_value.__exit__ = Mock(return_value=None) + + # fs.open should work for cloud URIs (CORRECT) + with mock_fs.open(uri, "r") as f: + content = f.read() + assert content == "test content" + + def test_fsspec_vs_os_directory_operations(self): + """Test directory listing: os.listdir vs fs.ls.""" + cloud_dirs = [ + "s3://bucket/folder/", + "gs://bucket/data/", + "abfs://container@account.dfs.core.windows.net/path/" + ] + + for uri in cloud_dirs: + # os.listdir would fail for cloud URIs + with pytest.raises((OSError, FileNotFoundError)): + os.listdir(uri) # This should fail + + # Mock fs.ls() to simulate cloud success + mock_fs = Mock() + mock_files = ["file1.txt", "file2.json", "subdir/"] + mock_fs.ls.return_value = mock_files + + # fs.ls should work for cloud URIs (CORRECT) + files = mock_fs.ls(uri) + assert files == mock_files + + def test_fsspec_vs_glob_pattern_matching(self): + """Test glob operations: glob.glob vs 
fs.glob."""
+        import glob
+
+        cloud_patterns = [
+            "s3://bucket/**/*.json",
+            "gs://bucket/data/*.txt",
+            "abfs://container@account.dfs.core.windows.net/logs/**/*.log"
+        ]
+
+        for pattern in cloud_patterns:
+            # glob.glob would return empty list for cloud URIs (WRONG)
+            assert glob.glob(pattern) == [], f"glob.glob should return empty for {pattern}"
+
+            # Mock fs.glob() to simulate cloud success
+            mock_fs = Mock()
+            mock_matches = ["file1.json", "subdir/file2.json"]
+            mock_fs.glob.return_value = mock_matches
+
+            # fs.glob should work for cloud URIs (CORRECT)
+            matches = mock_fs.glob(pattern)
+            assert matches == mock_matches
+
+    def test_fsspec_vs_os_directory_creation_removal(self):
+        """Test directory creation/removal: os.makedirs vs fs.makedirs."""
+        cloud_dirs = [
+            "s3://bucket/new-folder/",
+            "gs://bucket/output/",
+            "abfs://container@account.dfs.core.windows.net/temp/"
+        ]
+
+        for uri in cloud_dirs:
+            # os.makedirs might not fail on all systems but would create wrong local directories
+            # The point is that it shouldn't be used for cloud URIs
+            try:
+                os.makedirs(uri, exist_ok=True)
+                # If it doesn't fail, it probably created a local directory with the wrong name
+                # This is the problematic behavior we want to avoid
+                print(f"Warning: os.makedirs({uri}) didn't fail - this creates wrong local paths")
+            except (OSError, FileNotFoundError):
+                # This is the expected behavior on most systems
+                pass
+
+            # Mock fs operations to simulate cloud success
+            mock_fs = Mock()
+            mock_fs.makedirs = Mock()
+            mock_fs.rm = Mock()
+
+            # fs.makedirs should work for cloud URIs (CORRECT)
+            mock_fs.makedirs(uri, exist_ok=True)
+            mock_fs.makedirs.assert_called_once_with(uri, exist_ok=True)
+
+            # fs.rm should work for cloud URIs (CORRECT)
+            mock_fs.rm(uri, recursive=True)
+            mock_fs.rm.assert_called_once_with(uri, recursive=True)
+
+
+class TestCloudUriNormalizationPatterns:
+    """Test cloud URI path normalization and manipulation."""
+
+    def test_posixpath_vs_os_path_normalization(self):
+        """Test path normalization for cloud URIs."""
+        test_cases = [
+            # Note: normpath can collapse // to / which breaks protocols
+            # This test shows why we need careful handling of cloud URIs
+            ("s3://bucket/folder/../other/file.json", "s3://bucket/other/file.json"),
+            ("abfs://container@account.dfs.core.windows.net/a/./b/c",
+             "abfs://container@account.dfs.core.windows.net/a/b/c"),
+        ]
+
+        for input_path, _ in test_cases:
+            # os.path.normpath might mangle protocols or give wrong results
+            os_result = os.path.normpath(input_path)
+
+            # posixpath.normpath is more appropriate for cloud URIs
+            posix_result = posixpath.normpath(input_path)
+
+            # Both might have issues with double slashes in protocols
+            # The key point is that we should use fsspec's url_to_fs instead
+            print(f"Input: {input_path}")
+            print(f"  os.path.normpath: {os_result}")
+            print(f"  posixpath.normpath: {posix_result}")
+
+            # The lesson: don't use normpath directly on cloud URIs
+            # Use fsspec.core.url_to_fs() to separate protocol from path
+            # Then normalize the path component only
+
+    def test_cloud_uri_component_extraction(self):
+        """Test extracting components from cloud URIs safely."""
+        test_uris = [
+            "s3://bucket/path/to/file.ext",
+            "gs://my-bucket/deep/nested/path/data.json",
+            "abfs://container@account.dfs.core.windows.net/dir/subdir/file.parquet",
+            "https://storage.example.com/api/v1/data.xml"
+        ]
+
+        for uri in test_uris:
+            # Safe way to extract directory
+            directory = posixpath.dirname(uri)
+            assert directory.startswith(uri.split("/")[0] + "//"),
f"Directory should maintain protocol: {directory}" + + # Safe way to extract filename + filename = posixpath.basename(uri) + assert "://" not in filename, f"Filename should not contain protocol: {filename}" + + # Safe way to extract extension + name, ext = posixpath.splitext(filename) + assert ext.startswith(".") or ext == "", f"Extension should start with dot or be empty: {ext}" + + def test_relative_path_resolution_for_cloud_uris(self): + """Test that relative paths work correctly with cloud URIs.""" + # This test shows the challenges with relative paths in cloud URIs + # The recommended approach is to avoid relative paths with cloud URIs + # or use fsspec.core.url_to_fs() to handle the protocol separately + + base_uris = [ + "s3://bucket/project", + "gs://my-bucket/workspace", + "abfs://container@account.dfs.core.windows.net/base" + ] + + # Safe relative paths that don't go above the base + safe_relative_paths = ["./temp", "subfolder/data", "output/results"] + + for base in base_uris: + for rel_path in safe_relative_paths: + # Use posixpath for cloud-safe relative path resolution + result = posixpath.join(base, rel_path) + + # Verify protocol is maintained + protocol = base.split("://")[0] + assert result.startswith(f"{protocol}://"), f"Protocol should be maintained: {result}" + + # Verify path structure makes sense + assert base in result, f"Base should be contained in result: {result}" + + def test_fsspec_url_to_fs_recommended_pattern(self): + """Test the recommended fsspec pattern for handling cloud URIs safely.""" + # This is the RECOMMENDED way to handle cloud URIs + + test_uris = [ + "s3://bucket/path/file.txt", + "gs://bucket/data.json", + "abfs://container@account.dfs.core.windows.net/file.parquet" + ] + + for uri in test_uris: + # Mock fsspec.core.url_to_fs behavior + def mock_url_to_fs(url): + if url.startswith("s3://"): + return Mock(), url[5:] + elif url.startswith("gs://"): + return Mock(), url[5:] + elif url.startswith("abfs://"): + return Mock(), url[7:] + else: + return Mock(), url + + # Simulate the recommended pattern + fs, path = mock_url_to_fs(uri) + + # Now we can safely use posixpath operations on the path + dirname = posixpath.dirname(path) + basename = posixpath.basename(path) + + # And join additional path components safely + new_path = posixpath.join(path, "subfolder", "newfile.txt") + + # Verify no protocol mangling + assert "://" not in path, f"Path should not contain protocol: {path}" + assert "://" not in new_path, f"New path should not contain protocol: {new_path}" \ No newline at end of file diff --git a/tests/stages/text/test_specific_cloud_issues.py b/tests/stages/text/test_specific_cloud_issues.py new file mode 100644 index 000000000..b418669f4 --- /dev/null +++ b/tests/stages/text/test_specific_cloud_issues.py @@ -0,0 +1,351 @@ +# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +""" +Tests for specific cloud compatibility issues found in text components. 
+ +This module tests the specific problematic patterns identified in the codebase +and ensures they can be fixed to work with cloud URIs. +""" + +import os +import posixpath + + +class TestTextUtilsSpecificIssues: + """Test specific issues found in nemo_curator/stages/text/utils/text_utils.py.""" + + def test_line_170_os_path_basename_splitext_issue(self): + """ + Test the specific issue on line 170 of text_utils.py: + module = os.path.splitext(os.path.basename(filename))[0] + + This pattern should be cloud-compatible. + """ + # The problematic code pattern from line 170 + def extract_module_name_old_way(filename): + """Original problematic pattern.""" + return os.path.splitext(os.path.basename(filename))[0] + + # The cloud-compatible version + def extract_module_name_cloud_way(filename): + """Cloud-compatible pattern.""" + return posixpath.splitext(posixpath.basename(filename))[0] + + test_filenames = [ + ("s3://bucket/path/to/file.py", "file"), + ("gs://my-bucket/deep/nested/script.py", "script"), + ("abfs://container@account.dfs.core.windows.net/code.py", "code"), + ("https://example.com/api/source.py", "source"), + ("/local/path/local_file.py", "local_file"), # Still works for local + ] + + for filename, expected_module in test_filenames: + # Both should work on POSIX systems, but cloud version is more explicit + old_result = extract_module_name_old_way(filename) + new_result = extract_module_name_cloud_way(filename) + + assert old_result == expected_module, f"Old way failed for {filename}" + assert new_result == expected_module, f"New way failed for {filename}" + assert old_result == new_result, f"Results differ for {filename}" + + def test_get_docstrings_mock_file_handling(self): + """ + Test how the get_docstrings function handles file-like objects with cloud URIs. + + This simulates the pattern where a file-like object has a .name attribute + that contains a cloud URI. + """ + from unittest.mock import Mock + + # Simulate the get_docstrings logic for filename extraction + def simulate_get_docstrings_filename_logic(source): + """Simulate the logic from get_docstrings function.""" + if hasattr(source, "read"): + filename = getattr(source, "name", "") + # This is the problematic line 170 + module = os.path.splitext(os.path.basename(filename))[0] + return module + return "" + + cloud_uris = [ + "s3://bucket/path/file.py", + "gs://bucket/script.py", + "abfs://container@account.dfs.core.windows.net/code.py", + "https://example.com/source.py" + ] + + for cloud_uri in cloud_uris: + mock_file = Mock() + mock_file.name = cloud_uri + mock_file.read.return_value = "# Python source code" + + # This should extract the correct module name + module_name = simulate_get_docstrings_filename_logic(mock_file) + expected_name = posixpath.splitext(posixpath.basename(cloud_uri))[0] + + assert module_name == expected_name, f"Module extraction failed for {cloud_uri}" + + +class TestSemanticDedupeSpecificIssues: + """Test specific issues found in nemo_curator/stages/text/deduplication/semantic.py.""" + + def test_lines_182_189_os_path_join_issues(self): + """ + Test the specific issues on lines 182-189 of semantic.py: + - self.embeddings_path = os.path.join(self.cache_path, "embeddings") + - self.semantic_dedup_path = os.path.join(self.cache_path, "semantic_dedup") + - etc. + + These patterns should be cloud-compatible. 
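+
+        For illustration, the failure only appears on Windows, where os.path
+        is ntpath (a sketch, not code from this repo):
+
+            import ntpath
+            ntpath.join("s3://bucket/cache", "embeddings")
+            # -> 's3://bucket/cache\\embeddings' (an invalid cloud URI)
+
+        posixpath.join always uses '/', so it is cloud-safe.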
+ """ + # Simulate the problematic pattern from semantic.py + def create_paths_old_way(cache_path, output_path): + """Original problematic patterns from lines 182-189.""" + embeddings_path = os.path.join(cache_path, "embeddings") + semantic_dedup_path = os.path.join(cache_path, "semantic_dedup") + duplicates_path = os.path.join(output_path, "duplicates") + deduplicated_path = os.path.join(output_path, "deduplicated") + state_file = os.path.join(output_path, "semantic_id_generator.json") + + return { + "embeddings": embeddings_path, + "semantic_dedup": semantic_dedup_path, + "duplicates": duplicates_path, + "deduplicated": deduplicated_path, + "state_file": state_file, + } + + # Cloud-compatible version + def create_paths_cloud_way(cache_path, output_path): + """Cloud-compatible patterns.""" + embeddings_path = posixpath.join(cache_path, "embeddings") + semantic_dedup_path = posixpath.join(cache_path, "semantic_dedup") + duplicates_path = posixpath.join(output_path, "duplicates") + deduplicated_path = posixpath.join(output_path, "deduplicated") + state_file = posixpath.join(output_path, "semantic_id_generator.json") + + return { + "embeddings": embeddings_path, + "semantic_dedup": semantic_dedup_path, + "duplicates": duplicates_path, + "deduplicated": deduplicated_path, + "state_file": state_file, + } + + test_cases = [ + ("s3://bucket/cache", "s3://bucket/output"), + ("gs://my-bucket/cache", "gs://my-bucket/output"), + ("abfs://container@account.dfs.core.windows.net/cache", + "abfs://container@account.dfs.core.windows.net/output"), + ("/local/cache", "/local/output"), # Local paths should still work + ] + + for cache_path, output_path in test_cases: + old_paths = create_paths_old_way(cache_path, output_path) + new_paths = create_paths_cloud_way(cache_path, output_path) + + # Both should produce the same results on POSIX systems + for key in old_paths: + assert old_paths[key] == new_paths[key], f"Path mismatch for {key}: {old_paths[key]} != {new_paths[key]}" + + # Verify cloud URIs maintain their structure + if "://" in cache_path or "://" in output_path: + assert "://" in new_paths[key], f"Cloud protocol lost in {key}: {new_paths[key]}" + assert "/" in new_paths[key], f"Should use forward slashes in {key}: {new_paths[key]}" + + def test_semantic_dedupe_path_construction_robustness(self): + """Test robust path construction for complex cloud URI scenarios.""" + complex_cases = [ + # S3 with nested buckets and regions + ("s3://my-bucket-us-west-2/projects/nlp/cache", + "s3://my-bucket-us-west-2/projects/nlp/output"), + + # Google Cloud Storage with complex paths + ("gs://my-project-bucket/datasets/v2/cache", + "gs://my-project-bucket/datasets/v2/output"), + + # Azure Blob Storage with container and account + ("abfs://data@myaccount.dfs.core.windows.net/projects/dedup/cache", + "abfs://data@myaccount.dfs.core.windows.net/projects/dedup/output"), + + # HTTPS endpoints + ("https://storage.example.com/api/v1/cache", + "https://storage.example.com/api/v1/output"), + ] + + for cache_path, output_path in complex_cases: + # Test the path joining patterns from semantic.py + embeddings_path = posixpath.join(cache_path, "embeddings") + duplicates_path = posixpath.join(output_path, "duplicates") + state_file = posixpath.join(output_path, "semantic_id_generator.json") + + # Verify paths maintain their cloud structure + assert embeddings_path.startswith(cache_path), f"Embeddings path should start with cache: {embeddings_path}" + assert duplicates_path.startswith(output_path), f"Duplicates path should start 
with output: {duplicates_path}" + assert state_file.startswith(output_path), f"State file should start with output: {state_file}" + + # Verify no path corruption + protocol_count = cache_path.count("://") + assert embeddings_path.count("://") == protocol_count, f"Protocol corruption in embeddings: {embeddings_path}" + assert duplicates_path.count("://") == protocol_count, f"Protocol corruption in duplicates: {duplicates_path}" + + +class TestDownloadModuleSpecificIssues: + """Test specific issues found in download modules.""" + + def test_arxiv_download_path_operations(self): + """ + Test patterns from nemo_curator/stages/text/download/arxiv/ files. + + Found patterns like: + - download_dir = os.path.split(file_path)[0] + - bname = os.path.split(file_path)[-1] + - os.path.splitext(os.path.split(item)[-1])[0] + """ + # Problematic patterns from arxiv download files + def extract_download_info_old_way(file_path): + """Original patterns from arxiv download.""" + download_dir = os.path.split(file_path)[0] + bname = os.path.split(file_path)[-1] + name_without_ext = os.path.splitext(os.path.split(file_path)[-1])[0] + return download_dir, bname, name_without_ext + + # Cloud-compatible version + def extract_download_info_cloud_way(file_path): + """Cloud-compatible patterns.""" + download_dir = posixpath.dirname(file_path) + bname = posixpath.basename(file_path) + name_without_ext = posixpath.splitext(posixpath.basename(file_path))[0] + return download_dir, bname, name_without_ext + + test_file_paths = [ + "s3://arxiv-bucket/src/papers/2023/paper.tar.gz", + "gs://arxiv-mirror/papers/math/0601001.pdf", + "abfs://papers@storage.dfs.core.windows.net/cs/0601001.tar", + "/local/path/papers/paper.pdf", # Local paths should still work + ] + + for file_path in test_file_paths: + old_dir, old_name, old_name_no_ext = extract_download_info_old_way(file_path) + new_dir, new_name, new_name_no_ext = extract_download_info_cloud_way(file_path) + + # Results should be the same on POSIX systems + assert old_dir == new_dir, f"Directory extraction differs: {old_dir} != {new_dir}" + assert old_name == new_name, f"Filename extraction differs: {old_name} != {new_name}" + assert old_name_no_ext == new_name_no_ext, f"Name without ext differs: {old_name_no_ext} != {new_name_no_ext}" + + # Verify cloud URI structure is maintained in directory + if "://" in file_path: + assert "://" in new_dir, f"Protocol should be preserved in directory: {new_dir}" + assert "://" not in new_name, f"Protocol should not be in filename: {new_name}" + + def test_common_crawl_warc_iterator_patterns(self): + """ + Test patterns that might be found in WARC iterator modules. + + These often use pathlib.Path which can cause issues with cloud URIs. 
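+
+        For illustration (POSIX behaviour; a sketch, not code from this repo):
+
+            from pathlib import PurePosixPath
+            str(PurePosixPath("s3://commoncrawl/segments/file.warc.gz"))
+            # -> 's3:/commoncrawl/segments/file.warc.gz'
+
+        pathlib collapses the '//' after the scheme, corrupting the URI,
+        which is why posixpath string operations are used instead.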
+ """ + # Test that we handle path-like operations correctly for cloud URIs + cloud_warc_paths = [ + "s3://commoncrawl/crawl-data/CC-MAIN-2023-06/segments/warc.gz", + "gs://commoncrawl-mirror/2023/warc-files/segment.warc.gz", + "abfs://crawldata@storage.dfs.core.windows.net/warc/file.warc.gz", + ] + + for warc_path in cloud_warc_paths: + # Simulate operations that might be done on WARC paths + + # Extract directory (for organizing downloaded files) + directory = posixpath.dirname(warc_path) + assert directory.startswith(warc_path.split("://")[0] + "://"), f"Directory should maintain protocol: {directory}" + + # Extract filename (for local storage naming) + filename = posixpath.basename(warc_path) + assert "://" not in filename, f"Filename should not contain protocol: {filename}" + assert filename.endswith(".warc.gz") or filename.endswith(".gz"), f"Should preserve file extension: {filename}" + + # Extract name without extension (for processing logic) + name_part = posixpath.splitext(filename)[0] + if name_part.endswith(".warc"): + name_part = posixpath.splitext(name_part)[0] + + assert len(name_part) > 0, f"Should extract meaningful name: {name_part}" + assert "." not in name_part or name_part.count(".") < filename.count("."), f"Should remove extensions: {name_part}" + + +class TestGeneralCloudCompatibilityPatterns: + """Test general patterns that should be avoided in favor of fsspec.""" + + def test_problematic_os_operations_on_cloud_uris(self): + """Test operations that definitely don't work with cloud URIs.""" + cloud_uris = [ + "s3://bucket/file.txt", + "gs://bucket/data.json", + "abfs://container@account.dfs.core.windows.net/file.parquet" + ] + + for uri in cloud_uris: + # These operations should NOT work with cloud URIs + # (Testing that they fail as expected) + + # File existence - os.path.exists returns False for cloud URIs + assert os.path.exists(uri) is False, f"os.path.exists should be False for {uri}" + + # File size - os.path.getsize should fail + try: + os.path.getsize(uri) + assert False, f"os.path.getsize should fail for {uri}" + except (OSError, FileNotFoundError): + pass # Expected behavior + + # Directory check - os.path.isdir should be False + assert os.path.isdir(uri) is False, f"os.path.isdir should be False for {uri}" + + # File check - os.path.isfile should be False + assert os.path.isfile(uri) is False, f"os.path.isfile should be False for {uri}" + + def test_recommended_fsspec_patterns(self): + """Test the recommended fsspec patterns for cloud compatibility.""" + from unittest.mock import Mock + + cloud_uris = [ + "s3://bucket/file.txt", + "gs://bucket/data.json", + "abfs://container@account.dfs.core.windows.net/file.parquet" + ] + + for uri in cloud_uris: + # Mock fsspec filesystem + mock_fs = Mock() + mock_fs.exists.return_value = True + mock_fs.isdir.return_value = False + mock_fs.isfile.return_value = True + mock_fs.size.return_value = 12345 + + # These operations SHOULD work with fsspec + assert mock_fs.exists(uri) is True, f"fs.exists should work for {uri}" + assert mock_fs.isfile(uri) is True, f"fs.isfile should work for {uri}" + assert mock_fs.size(uri) == 12345, f"fs.size should work for {uri}" + + # Mock file operations + mock_file = Mock() + mock_file.read.return_value = b"test content" + mock_fs.open.return_value.__enter__ = Mock(return_value=mock_file) + mock_fs.open.return_value.__exit__ = Mock(return_value=None) + + # File reading should work + with mock_fs.open(uri, "rb") as f: + content = f.read() + assert content == b"test content", f"fs.open 
should work for {uri}" \ No newline at end of file From dc2acfab433b0b1a2518d183fae3641ee9983909 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Fri, 5 Sep 2025 22:43:38 +0000 Subject: [PATCH 3/5] Fix cloud compatibility issues in Text Components by using fsspec and posixpath Co-authored-by: VibhuJawa <4837571+VibhuJawa@users.noreply.github.com> --- .../stages/text/deduplication/semantic.py | 11 +- .../stages/text/download/base/download.py | 35 +- .../stages/text/download/base/iterator.py | 5 +- .../stages/text/filters/fasttext_filter.py | 7 +- .../stages/text/filters/heuristic_filter.py | 4 +- nemo_curator/stages/text/utils/text_utils.py | 3 +- tests/stages/text/test_cloud_compatibility.py | 385 ------------------ .../text/test_cloud_compatibility_fixes.py | 261 ++++++++++++ .../stages/text/test_specific_cloud_issues.py | 351 ---------------- 9 files changed, 307 insertions(+), 755 deletions(-) delete mode 100644 tests/stages/text/test_cloud_compatibility.py create mode 100644 tests/stages/text/test_cloud_compatibility_fixes.py delete mode 100644 tests/stages/text/test_specific_cloud_issues.py diff --git a/nemo_curator/stages/text/deduplication/semantic.py b/nemo_curator/stages/text/deduplication/semantic.py index d4e096dfa..416d4f06b 100644 --- a/nemo_curator/stages/text/deduplication/semantic.py +++ b/nemo_curator/stages/text/deduplication/semantic.py @@ -22,6 +22,7 @@ """ import os +import posixpath import time from dataclasses import dataclass, field from typing import Any, Literal @@ -179,14 +180,14 @@ def __post_init__(self): self.cache_path = self.cache_path or self.output_path # Intermediate paths - self.embeddings_path = os.path.join(self.cache_path, "embeddings") - self.semantic_dedup_path = os.path.join(self.cache_path, "semantic_dedup") + self.embeddings_path = posixpath.join(self.cache_path, "embeddings") + self.semantic_dedup_path = posixpath.join(self.cache_path, "semantic_dedup") # Output paths - self.duplicates_path = None if self.eps is None else os.path.join(self.output_path, "duplicates") + self.duplicates_path = None if self.eps is None else posixpath.join(self.output_path, "duplicates") self.deduplicated_output_path = ( - None if not self.perform_removal else os.path.join(self.output_path, "deduplicated") + None if not self.perform_removal else posixpath.join(self.output_path, "deduplicated") ) - self.id_generator_state_file = os.path.join(self.output_path, "semantic_id_generator.json") + self.id_generator_state_file = posixpath.join(self.output_path, "semantic_id_generator.json") self._validate_config() diff --git a/nemo_curator/stages/text/download/base/download.py b/nemo_curator/stages/text/download/base/download.py index 86687963b..603977339 100644 --- a/nemo_curator/stages/text/download/base/download.py +++ b/nemo_curator/stages/text/download/base/download.py @@ -13,11 +13,13 @@ # limitations under the License. 
import os +import posixpath import subprocess from abc import ABC, abstractmethod from dataclasses import dataclass from typing import Any +import fsspec from loguru import logger from nemo_curator.stages.base import ProcessingStage @@ -37,7 +39,9 @@ def __init__(self, download_dir: str, verbose: bool = False): """ self._download_dir = download_dir self._verbose = verbose - os.makedirs(download_dir, exist_ok=True) + # Use fsspec for cloud-compatible directory creation + fs, _ = fsspec.core.url_to_fs(download_dir) + fs.makedirs(download_dir, exist_ok=True) def _check_s5cmd_installed(self) -> bool: """Check if s5cmd is installed.""" @@ -87,14 +91,24 @@ def download(self, url: str) -> str | None: """ # Generate output filename output_name = self._get_output_filename(url) - output_file = os.path.join(self._download_dir, output_name) + output_file = posixpath.join(self._download_dir, output_name) temp_file = output_file + ".tmp" + # Use fsspec for cloud-compatible file operations + fs, _ = fsspec.core.url_to_fs(output_file) + # If final file exists and is non-empty, assume it's complete - if os.path.exists(output_file) and os.path.getsize(output_file) > 0: - if self._verbose: - logger.info(f"File: {output_file} exists. Not downloading") - return output_file + if fs.exists(output_file): + try: + file_info = fs.info(output_file) + file_size = file_info.get("size", 0) + if file_size > 0: + if self._verbose: + logger.info(f"File: {output_file} exists. Not downloading") + return output_file + except Exception: + # If we can't get file info, proceed with download + pass # Download to temporary file success, error_message = self._download_to_path(url, temp_file) @@ -103,8 +117,13 @@ def download(self, url: str) -> str | None: # Download successful, atomically move temp file to final location os.rename(temp_file, output_file) if self._verbose: - file_size = os.path.getsize(output_file) - logger.info(f"Successfully downloaded to {output_file} ({file_size} bytes)") + try: + fs, _ = fsspec.core.url_to_fs(output_file) + file_info = fs.info(output_file) + file_size = file_info.get("size", 0) + logger.info(f"Successfully downloaded to {output_file} ({file_size} bytes)") + except Exception: + logger.info(f"Successfully downloaded to {output_file}") return output_file else: # Download failed diff --git a/nemo_curator/stages/text/download/base/iterator.py b/nemo_curator/stages/text/download/base/iterator.py index 9b5e93890..d6a8e3e17 100644 --- a/nemo_curator/stages/text/download/base/iterator.py +++ b/nemo_curator/stages/text/download/base/iterator.py @@ -13,6 +13,7 @@ # limitations under the License. 
import os +import posixpath from abc import ABC, abstractmethod from collections.abc import Iterator from dataclasses import dataclass @@ -89,8 +90,8 @@ def process(self, task: FileGroupTask) -> DocumentBatch: if self.record_limit and record_count >= self.record_limit: break if self.add_filename_column: - # TODO: Support cloud storage https://github.com/NVIDIA-NeMo/Curator/issues/779 - record_dict[self.filename_col] = os.path.basename(file_path) # type: ignore[reportReturnType] + # Use posixpath for cloud storage compatibility + record_dict[self.filename_col] = posixpath.basename(file_path) # type: ignore[reportReturnType] records.append(record_dict) record_count += 1 diff --git a/nemo_curator/stages/text/filters/fasttext_filter.py b/nemo_curator/stages/text/filters/fasttext_filter.py index 75e6a070c..53d24a8cf 100644 --- a/nemo_curator/stages/text/filters/fasttext_filter.py +++ b/nemo_curator/stages/text/filters/fasttext_filter.py @@ -15,6 +15,7 @@ import os import fasttext +import fsspec import numpy as np from nemo_curator.stages.text.filters.doc_filter import DocumentFilter @@ -32,7 +33,8 @@ def __init__(self, model_path: str | None = None, label: str = "__label__hq", al self._name = "fasttext_quality_filter" def model_check_or_download(self) -> None: - if not os.path.exists(self._model_path): + fs, _ = fsspec.core.url_to_fs(self._model_path) + if not fs.exists(self._model_path): msg = f"Model file {self._model_path} not found" raise FileNotFoundError(msg) @@ -66,7 +68,8 @@ def __init__(self, model_path: str | None = None, min_langid_score: float = 0.3) self._name = "lang_id" def model_check_or_download(self) -> None: - if not os.path.exists(self._model_path): + fs, _ = fsspec.core.url_to_fs(self._model_path) + if not fs.exists(self._model_path): msg = f"Model file {self._model_path} not found" raise FileNotFoundError(msg) diff --git a/nemo_curator/stages/text/filters/heuristic_filter.py b/nemo_curator/stages/text/filters/heuristic_filter.py index cfccce2c0..0124a6b64 100644 --- a/nemo_curator/stages/text/filters/heuristic_filter.py +++ b/nemo_curator/stages/text/filters/heuristic_filter.py @@ -16,6 +16,7 @@ import tarfile from typing import Literal +import fsspec import huggingface_hub import requests from platformdirs import user_cache_dir @@ -789,7 +790,8 @@ def _download_histograms(self) -> None: raise requests.exceptions.RequestException(msg) # Open a file to write the content - os.makedirs(self._cache_dir, exist_ok=True) + fs, _ = fsspec.core.url_to_fs(self._cache_dir) + fs.makedirs(self._cache_dir, exist_ok=True) download_dest_path = os.path.join(self._cache_dir, "histograms.tar.gz") with open(download_dest_path, "wb") as file: file.write(response.content) diff --git a/nemo_curator/stages/text/utils/text_utils.py b/nemo_curator/stages/text/utils/text_utils.py index b706fad72..1f920e22d 100644 --- a/nemo_curator/stages/text/utils/text_utils.py +++ b/nemo_curator/stages/text/utils/text_utils.py @@ -14,6 +14,7 @@ import ast import os +import posixpath import string import tokenize import warnings @@ -167,7 +168,7 @@ def get_docstrings(source: str, module: str = "") -> list[str]: """Parse Python source code from file or string and print docstrings.""" if hasattr(source, "read"): filename = getattr(source, "name", module) - module = os.path.splitext(os.path.basename(filename))[0] + module = posixpath.splitext(posixpath.basename(filename))[0] source = source.read() docstrings = sorted(parse_docstrings(source), key=lambda x: (NODE_TYPES.get(type(x[0])), x[1])) diff --git 
a/tests/stages/text/test_cloud_compatibility.py b/tests/stages/text/test_cloud_compatibility.py deleted file mode 100644 index 2f7b01151..000000000 --- a/tests/stages/text/test_cloud_compatibility.py +++ /dev/null @@ -1,385 +0,0 @@ -# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Cloud compatibility tests for Text Components. - -This module tests that Text Components use fsspec for cloud storage URIs -instead of os/pathlib/glob/shutil which don't work with s3://, gs://, abfs://, etc. -""" - -import os -import posixpath -import tempfile -from unittest.mock import Mock, patch - -import pytest - -# We can't import the actual modules due to dependencies, so we'll test the patterns - - -class TestTextUtilsCloudCompatibilityPatterns: - """Test that text_utils functions handle cloud URIs correctly.""" - - def test_filename_extraction_from_cloud_uris(self): - """Test filename extraction patterns used in text_utils.py.""" - # Simulate the pattern from text_utils.py line 170: - # module = os.path.splitext(os.path.basename(filename))[0] - - test_cases = [ - ("s3://bucket/path/to/file.py", "file"), - ("gs://my-bucket/deep/nested/path/script.py", "script"), - ("abfs://container@account.dfs.core.windows.net/data/code.py", "code"), - ("https://example.com/api/v1/source.py", "source"), - ("/local/path/local_file.py", "local_file"), # Local files should still work - ] - - for filename, expected_module_name in test_cases: - # Current problematic pattern (but works on POSIX systems) - os_module = os.path.splitext(os.path.basename(filename))[0] - - # Recommended cloud-compatible pattern - posix_module = posixpath.splitext(posixpath.basename(filename))[0] - - # Both should work on POSIX systems, but posixpath is more explicit for cloud URIs - assert os_module == expected_module_name, f"os.path pattern failed for {filename}" - assert posix_module == expected_module_name, f"posixpath pattern failed for {filename}" - - # Verify the posixpath approach is cloud-compatible - assert posix_module == expected_module_name - - def test_cloud_uri_filename_extraction_robustness(self): - """Test robustness of filename extraction with various cloud URI formats.""" - edge_cases = [ - ("s3://bucket/file", "file"), # No extension - ("gs://bucket/path.with.dots/file.tar.gz", "file.tar"), # Multiple dots - ("abfs://container@account.dfs.core.windows.net/", ""), # Directory ending - ("https://example.com/path/", ""), # Directory ending with slash - ("s3://bucket/dir/file.json", "file"), # JSON file - ] - - for uri, expected in edge_cases: - result = posixpath.splitext(posixpath.basename(uri))[0] - assert result == expected, f"Failed for {uri}: got {result}, expected {expected}" - - -class TestSemanticDedupeCloudCompatibilityPatterns: - """Test semantic deduplication path construction patterns.""" - - def test_path_joining_for_cloud_uris(self): - """Test path joining patterns used in semantic.py.""" - # Simulate the patterns from semantic.py lines 182-189: - # 
self.embeddings_path = os.path.join(self.cache_path, "embeddings") - - cloud_base_paths = [ - "s3://my-bucket/dedup-cache", - "gs://bucket/cache", - "abfs://container@account.dfs.core.windows.net/cache", - "https://storage.example.com/cache", - "/local/cache/path", # Local paths should still work - ] - - subdirs = ["embeddings", "semantic_dedup", "duplicates"] - - for base_path in cloud_base_paths: - for subdir in subdirs: - # Current pattern (works on POSIX but may have issues on Windows with protocols) - os_path = os.path.join(base_path, subdir) - - # Recommended cloud-compatible pattern - posix_path = posixpath.join(base_path, subdir) - - # Both should produce valid paths on POSIX systems - assert os_path.startswith(base_path), f"os.path result should start with base: {os_path}" - assert posix_path.startswith(base_path), f"posixpath result should start with base: {posix_path}" - - # Verify cloud URIs maintain their structure - if "://" in base_path: - assert "://" in posix_path, f"Cloud protocol should be preserved: {posix_path}" - assert "/" in posix_path, f"Should use forward slashes: {posix_path}" - assert "\\" not in posix_path, f"Should not contain backslashes: {posix_path}" - - def test_multiple_path_components_joining(self): - """Test joining multiple path components for complex directory structures.""" - base_paths = [ - "s3://bucket/base", - "gs://my-bucket/data", - "abfs://container@account.dfs.core.windows.net/root" - ] - - # Test patterns like: os.path.join(base, "output", "duplicates", "final") - components = ["output", "duplicates", "final"] - - for base in base_paths: - # Use posixpath for cloud-compatible joining - result = base - for component in components: - result = posixpath.join(result, component) - - expected = f"{base}/output/duplicates/final" - assert result == expected, f"Multi-component join failed: {result} != {expected}" - - # Verify cloud URI structure is maintained - assert result.startswith(base), "Should start with base path" - assert "://" in result, "Should maintain protocol" - assert result.count("://") == 1, "Should have exactly one protocol marker" - - -class TestFsspecCloudOperations: - """Test that fsspec operations work correctly with cloud URIs.""" - - def test_fsspec_vs_os_file_existence_checks(self): - """Test file existence patterns: os.path.exists vs fs.exists.""" - # Mock fsspec filesystem - with tempfile.NamedTemporaryFile() as tmp_file: - local_path = tmp_file.name - - # Test with local file - both should work - assert os.path.exists(local_path) is True - - # Mock fsspec for cloud URIs - mock_fs = Mock() - mock_fs.exists.return_value = True - - cloud_uris = [ - "s3://bucket/file.txt", - "gs://bucket/file.txt", - "abfs://container@account.dfs.core.windows.net/file.txt" - ] - - for uri in cloud_uris: - # os.path.exists would return False for cloud URIs (WRONG) - assert os.path.exists(uri) is False, f"os.path.exists should fail for {uri}" - - # fs.exists should work for cloud URIs (CORRECT) - assert mock_fs.exists(uri) is True, f"fs.exists should work for {uri}" - - def test_fsspec_vs_builtin_file_opening(self): - """Test file opening patterns: open() vs fs.open().""" - cloud_uris = [ - "s3://bucket/file.txt", - "gs://bucket/file.txt", - "abfs://container@account.dfs.core.windows.net/file.txt" - ] - - for uri in cloud_uris: - # open(uri) would fail for cloud URIs - with pytest.raises((OSError, FileNotFoundError)): - open(uri, "r") # This should fail - - # Mock fs.open() to simulate cloud success - mock_fs = Mock() - mock_file = Mock() - 
mock_file.read.return_value = "test content"
-            mock_fs.open.return_value.__enter__ = Mock(return_value=mock_file)
-            mock_fs.open.return_value.__exit__ = Mock(return_value=None)
-
-            # fs.open should work for cloud URIs (CORRECT)
-            with mock_fs.open(uri, "r") as f:
-                content = f.read()
-                assert content == "test content"
-
-    def test_fsspec_vs_os_directory_operations(self):
-        """Test directory listing: os.listdir vs fs.ls."""
-        cloud_dirs = [
-            "s3://bucket/folder/",
-            "gs://bucket/data/",
-            "abfs://container@account.dfs.core.windows.net/path/"
-        ]
-
-        for uri in cloud_dirs:
-            # os.listdir would fail for cloud URIs
-            with pytest.raises((OSError, FileNotFoundError)):
-                os.listdir(uri)  # This should fail
-
-            # Mock fs.ls() to simulate cloud success
-            mock_fs = Mock()
-            mock_files = ["file1.txt", "file2.json", "subdir/"]
-            mock_fs.ls.return_value = mock_files
-
-            # fs.ls should work for cloud URIs (CORRECT)
-            files = mock_fs.ls(uri)
-            assert files == mock_files
-
-    def test_fsspec_vs_glob_pattern_matching(self):
-        """Test glob operations: glob.glob vs fs.glob."""
-        import glob
-
-        cloud_patterns = [
-            "s3://bucket/**/*.json",
-            "gs://bucket/data/*.txt",
-            "abfs://container@account.dfs.core.windows.net/logs/**/*.log"
-        ]
-
-        for pattern in cloud_patterns:
-            # glob.glob would return empty list for cloud URIs (WRONG)
-            assert glob.glob(pattern) == [], f"glob.glob should return empty for {pattern}"
-
-            # Mock fs.glob() to simulate cloud success
-            mock_fs = Mock()
-            mock_matches = ["file1.json", "subdir/file2.json"]
-            mock_fs.glob.return_value = mock_matches
-
-            # fs.glob should work for cloud URIs (CORRECT)
-            matches = mock_fs.glob(pattern)
-            assert matches == mock_matches
-
-    def test_fsspec_vs_os_directory_creation_removal(self):
-        """Test directory creation/removal: os.makedirs vs fs.makedirs."""
-        cloud_dirs = [
-            "s3://bucket/new-folder/",
-            "gs://bucket/output/",
-            "abfs://container@account.dfs.core.windows.net/temp/"
-        ]
-
-        for uri in cloud_dirs:
-            # os.makedirs might not fail on all systems but would create wrong local directories
-            # The point is that it shouldn't be used for cloud URIs
-            try:
-                os.makedirs(uri, exist_ok=True)
-                # If it doesn't fail, it probably created a local directory with the wrong name
-                # This is the problematic behavior we want to avoid
-                print(f"Warning: os.makedirs({uri}) didn't fail - this creates wrong local paths")
-            except (OSError, FileNotFoundError):
-                # This is the expected behavior on most systems
-                pass
-
-            # Mock fs operations to simulate cloud success
-            mock_fs = Mock()
-            mock_fs.makedirs = Mock()
-            mock_fs.rm = Mock()
-
-            # fs.makedirs should work for cloud URIs (CORRECT)
-            mock_fs.makedirs(uri, exist_ok=True)
-            mock_fs.makedirs.assert_called_once_with(uri, exist_ok=True)
-
-            # fs.rm should work for cloud URIs (CORRECT)
-            mock_fs.rm(uri, recursive=True)
-            mock_fs.rm.assert_called_once_with(uri, recursive=True)
-
-
-class TestCloudUriNormalizationPatterns:
-    """Test cloud URI path normalization and manipulation."""
-
-    def test_posixpath_vs_os_path_normalization(self):
-        """Test path normalization for cloud URIs."""
-        test_cases = [
-            # Note: normpath can collapse // to / which breaks protocols
-            # This test shows why we need careful handling of cloud URIs
-            ("s3://bucket/folder/../other/file.json", "s3://bucket/other/file.json"),
-            ("abfs://container@account.dfs.core.windows.net/a/./b/c",
-             "abfs://container@account.dfs.core.windows.net/a/b/c"),
-        ]
-
-        for input_path, _ in test_cases:
-            # os.path.normpath might mangle protocols or give wrong results
-            
os_result = os.path.normpath(input_path) - - # posixpath.normpath is more appropriate for cloud URIs - posix_result = posixpath.normpath(input_path) - - # Both might have issues with double slashes in protocols - # The key point is that we should use fsspec's url_to_fs instead - print(f"Input: {input_path}") - print(f" os.path.normpath: {os_result}") - print(f" posixpath.normpath: {posix_result}") - - # The lesson: don't use normpath directly on cloud URIs - # Use fsspec.core.url_to_fs() to separate protocol from path - # Then normalize the path component only - - def test_cloud_uri_component_extraction(self): - """Test extracting components from cloud URIs safely.""" - test_uris = [ - "s3://bucket/path/to/file.ext", - "gs://my-bucket/deep/nested/path/data.json", - "abfs://container@account.dfs.core.windows.net/dir/subdir/file.parquet", - "https://storage.example.com/api/v1/data.xml" - ] - - for uri in test_uris: - # Safe way to extract directory - directory = posixpath.dirname(uri) - assert directory.startswith(uri.split("/")[0] + "//"), f"Directory should maintain protocol: {directory}" - - # Safe way to extract filename - filename = posixpath.basename(uri) - assert "://" not in filename, f"Filename should not contain protocol: {filename}" - - # Safe way to extract extension - name, ext = posixpath.splitext(filename) - assert ext.startswith(".") or ext == "", f"Extension should start with dot or be empty: {ext}" - - def test_relative_path_resolution_for_cloud_uris(self): - """Test that relative paths work correctly with cloud URIs.""" - # This test shows the challenges with relative paths in cloud URIs - # The recommended approach is to avoid relative paths with cloud URIs - # or use fsspec.core.url_to_fs() to handle the protocol separately - - base_uris = [ - "s3://bucket/project", - "gs://my-bucket/workspace", - "abfs://container@account.dfs.core.windows.net/base" - ] - - # Safe relative paths that don't go above the base - safe_relative_paths = ["./temp", "subfolder/data", "output/results"] - - for base in base_uris: - for rel_path in safe_relative_paths: - # Use posixpath for cloud-safe relative path resolution - result = posixpath.join(base, rel_path) - - # Verify protocol is maintained - protocol = base.split("://")[0] - assert result.startswith(f"{protocol}://"), f"Protocol should be maintained: {result}" - - # Verify path structure makes sense - assert base in result, f"Base should be contained in result: {result}" - - def test_fsspec_url_to_fs_recommended_pattern(self): - """Test the recommended fsspec pattern for handling cloud URIs safely.""" - # This is the RECOMMENDED way to handle cloud URIs - - test_uris = [ - "s3://bucket/path/file.txt", - "gs://bucket/data.json", - "abfs://container@account.dfs.core.windows.net/file.parquet" - ] - - for uri in test_uris: - # Mock fsspec.core.url_to_fs behavior - def mock_url_to_fs(url): - if url.startswith("s3://"): - return Mock(), url[5:] - elif url.startswith("gs://"): - return Mock(), url[5:] - elif url.startswith("abfs://"): - return Mock(), url[7:] - else: - return Mock(), url - - # Simulate the recommended pattern - fs, path = mock_url_to_fs(uri) - - # Now we can safely use posixpath operations on the path - dirname = posixpath.dirname(path) - basename = posixpath.basename(path) - - # And join additional path components safely - new_path = posixpath.join(path, "subfolder", "newfile.txt") - - # Verify no protocol mangling - assert "://" not in path, f"Path should not contain protocol: {path}" - assert "://" not in new_path, f"New 
path should not contain protocol: {new_path}"
\ No newline at end of file
diff --git a/tests/stages/text/test_cloud_compatibility_fixes.py b/tests/stages/text/test_cloud_compatibility_fixes.py
new file mode 100644
index 000000000..b9bc484c5
--- /dev/null
+++ b/tests/stages/text/test_cloud_compatibility_fixes.py
@@ -0,0 +1,261 @@
+# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+"""
+Tests for cloud compatibility fixes in Text Components.
+
+This module tests that Text Components properly use fsspec and posixpath
+for cloud storage URIs instead of os/pathlib operations.
+"""
+
+import posixpath
+from unittest.mock import Mock, patch
+
+import pytest
+
+from nemo_curator.stages.text.utils.text_utils import get_docstrings
+
+
+class TestTextUtilsCloudFixes:
+    """Test cloud compatibility fixes in text_utils.py."""
+
+    def test_get_docstrings_with_cloud_uri(self):
+        """Test that get_docstrings handles cloud URIs correctly."""
+        # Create a mock file-like object with a cloud URI name
+        source_code = '''
+def example_function():
+    """This is a docstring."""
+    pass
+'''
+        # Use a Mock file-like object: io.StringIO does not allow setting a .name attribute
+        mock_file = Mock()
+        mock_file.read.return_value = source_code
+        mock_file.name = "s3://bucket/path/to/script.py"  # simulate a cloud URI filename
+
+        # This should work without errors using posixpath operations
+        result = get_docstrings(mock_file)
+
+        # Verify the result contains the expected docstring
+        assert len(result) > 0
+        assert "This is a docstring."
in str(result) + + def test_get_docstrings_filename_extraction_patterns(self): + """Test filename extraction works with various cloud URI patterns.""" + test_cases = [ + ("s3://bucket/path/to/file.py", "file"), + ("gs://my-bucket/deep/nested/path/script.py", "script"), + ("abfs://container@account.dfs.core.windows.net/data/code.py", "code"), + ("https://example.com/api/v1/source.py", "source"), + ("/local/path/local_file.py", "local_file"), # Local files should still work + ] + + for uri, expected_module_name in test_cases: + # Test the pattern that's now used in the fixed code + module_name = posixpath.splitext(posixpath.basename(uri))[0] + assert module_name == expected_module_name, f"Failed for URI: {uri}" + + +class TestSemanticDeduplicationCloudFixes: + """Test cloud compatibility fixes in semantic deduplication.""" + + def test_path_construction_patterns(self): + """Test that path construction uses posixpath for cloud compatibility.""" + # Test the patterns now used in the fixed semantic.py + base_path = "s3://bucket/cache" + output_path = "gs://bucket/output" + + # These are the patterns now used in the fixed code + embeddings_path = posixpath.join(base_path, "embeddings") + semantic_dedup_path = posixpath.join(base_path, "semantic_dedup") + duplicates_path = posixpath.join(output_path, "duplicates") + deduplicated_path = posixpath.join(output_path, "deduplicated") + state_file = posixpath.join(output_path, "semantic_id_generator.json") + + # Verify the paths are constructed correctly + assert embeddings_path == "s3://bucket/cache/embeddings" + assert semantic_dedup_path == "s3://bucket/cache/semantic_dedup" + assert duplicates_path == "gs://bucket/output/duplicates" + assert deduplicated_path == "gs://bucket/output/deduplicated" + assert state_file == "gs://bucket/output/semantic_id_generator.json" + + def test_complex_cloud_uri_handling(self): + """Test complex cloud URI scenarios.""" + test_uris = [ + "s3://my-bucket/path/with/multiple/levels/", + "gs://another-bucket/data/2024/01/15/", + "abfs://container@storage.dfs.core.windows.net/datasets/processed/", + ] + + for base_uri in test_uris: + # Test subdirectory creation patterns + subdir = posixpath.join(base_uri, "embeddings") + assert subdir.startswith(base_uri) + assert subdir.endswith("embeddings") + + +class TestDownloadCloudFixes: + """Test cloud compatibility fixes in download modules.""" + + @patch('fsspec.core.url_to_fs') + def test_download_file_operations(self, mock_url_to_fs): + """Test that download operations use fsspec for cloud URIs.""" + # Mock fsspec filesystem + mock_fs = Mock() + mock_fs.makedirs.return_value = None + mock_fs.exists.return_value = True + mock_fs.info.return_value = {"size": 1024} + mock_url_to_fs.return_value = (mock_fs, "bucket/path") + + # Import after patching to ensure mock is used + from nemo_curator.stages.text.download.base.download import DocumentDownloader + + # Create a concrete subclass for testing + class TestDownloader(DocumentDownloader): + def _get_output_filename(self, url: str) -> str: + return "test_file.txt" + + def _download_to_path(self, url: str, path: str) -> tuple[bool, str]: + return True, "" + + # Test cloud URI download directory + cloud_download_dir = "s3://test-bucket/downloads/" + downloader = TestDownloader(cloud_download_dir) + + # Verify fsspec was called for directory creation + mock_url_to_fs.assert_called() + mock_fs.makedirs.assert_called_with(cloud_download_dir, exist_ok=True) + + def test_filename_extraction_from_cloud_paths(self): + """Test filename 
+
+
+class TestDownloadCloudFixes:
+    """Test cloud compatibility fixes in download modules."""
+
+    @patch('fsspec.core.url_to_fs')
+    def test_download_file_operations(self, mock_url_to_fs):
+        """Test that download operations use fsspec for cloud URIs."""
+        # Mock fsspec filesystem
+        mock_fs = Mock()
+        mock_fs.makedirs.return_value = None
+        mock_fs.exists.return_value = True
+        mock_fs.info.return_value = {"size": 1024}
+        mock_url_to_fs.return_value = (mock_fs, "bucket/path")
+
+        # Import after patching to ensure mock is used
+        from nemo_curator.stages.text.download.base.download import DocumentDownloader
+
+        # Create a concrete subclass for testing
+        class TestDownloader(DocumentDownloader):
+            def _get_output_filename(self, url: str) -> str:
+                return "test_file.txt"
+
+            def _download_to_path(self, url: str, path: str) -> tuple[bool, str]:
+                return True, ""
+
+        # Test cloud URI download directory
+        cloud_download_dir = "s3://test-bucket/downloads/"
+        downloader = TestDownloader(cloud_download_dir)
+
+        # Verify fsspec was called for directory creation
+        mock_url_to_fs.assert_called()
+        mock_fs.makedirs.assert_called_with(cloud_download_dir, exist_ok=True)
+
+    def test_filename_extraction_from_cloud_paths(self):
+        """Test filename extraction from cloud paths."""
+        test_cases = [
+            ("s3://bucket/path/to/file.txt", "file.txt"),
+            ("gs://my-bucket/data/document.pdf", "document.pdf"),
+            ("abfs://container@account.dfs.core.windows.net/files/archive.zip", "archive.zip"),
+            ("https://example.com/downloads/data.json", "data.json"),
+        ]
+
+        for cloud_path, expected_filename in test_cases:
+            # Test the pattern now used in the fixed iterator.py
+            filename = posixpath.basename(cloud_path)
+            assert filename == expected_filename, f"Failed for path: {cloud_path}"
+
+
+class TestFilterCloudFixes:
+    """Test cloud compatibility fixes in filter modules."""
+
+    @patch('fsspec.core.url_to_fs')
+    def test_fasttext_filter_model_check(self, mock_url_to_fs):
+        """Test that FastText filter uses fsspec for model file checks."""
+        # Mock fsspec filesystem
+        mock_fs = Mock()
+        mock_fs.exists.return_value = True
+        mock_url_to_fs.return_value = (mock_fs, "bucket/path")
+
+        # Import after patching
+        from nemo_curator.stages.text.filters.fasttext_filter import FastTextQualityFilter
+
+        # Test cloud URI model path
+        cloud_model_path = "s3://models/fasttext_quality.bin"
+        filter_instance = FastTextQualityFilter(model_path=cloud_model_path)
+
+        # This should not raise an exception with fsspec
+        filter_instance.model_check_or_download()
+
+        # Verify fsspec was used
+        mock_url_to_fs.assert_called_with(cloud_model_path)
+        mock_fs.exists.assert_called_with(cloud_model_path)
+
+    @patch('fsspec.core.url_to_fs')
+    def test_heuristic_filter_cache_directory(self, mock_url_to_fs):
+        """Test that heuristic filter uses fsspec for cache directory creation."""
+        # Mock fsspec filesystem
+        mock_fs = Mock()
+        mock_fs.makedirs.return_value = None
+        mock_url_to_fs.return_value = (mock_fs, "bucket/path")
+
+        # We can't easily test the full heuristic filter due to dependencies,
+        # but we can test the pattern directly
+        cache_dir = "s3://bucket/cache/"
+
+        # This is the pattern now used in the fixed code
+        fs, _ = mock_url_to_fs(cache_dir)
+        fs.makedirs(cache_dir, exist_ok=True)
+
+        # Verify fsspec was used
+        mock_url_to_fs.assert_called_with(cache_dir)
+        mock_fs.makedirs.assert_called_with(cache_dir, exist_ok=True)
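+
+# Editor's sketch of the unmocked pattern these tests stand in for (assumes an
+# fsspec backend such as s3fs is installed for the protocol in question):
+#
+#   import fsspec
+#   fs, stripped = fsspec.core.url_to_fs("s3://bucket/cache/")
+#   fs.makedirs(stripped, exist_ok=True)  # often a no-op on object stores
+#   fs.exists(stripped)                   # True once objects exist under the prefix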
"subdir") + filename = posixpath.basename(subpath) + + # These operations should succeed without errors + assert isinstance(subpath, str) + assert isinstance(filename, str) + + def test_error_handling_for_invalid_uris(self): + """Test that invalid URIs are handled gracefully.""" + invalid_uris = [ + "", + "invalid://bad-protocol/path", + "s3://", # Missing bucket + "gs:///no-bucket", + ] + + for uri in invalid_uris: + # The posixpath operations should not crash on invalid URIs + try: + result = posixpath.basename(uri) + assert isinstance(result, str) + except Exception: + # If an exception occurs, it should be a reasonable one + pass \ No newline at end of file diff --git a/tests/stages/text/test_specific_cloud_issues.py b/tests/stages/text/test_specific_cloud_issues.py deleted file mode 100644 index b418669f4..000000000 --- a/tests/stages/text/test_specific_cloud_issues.py +++ /dev/null @@ -1,351 +0,0 @@ -# Copyright (c) 2025, NVIDIA CORPORATION. All rights reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - -""" -Tests for specific cloud compatibility issues found in text components. - -This module tests the specific problematic patterns identified in the codebase -and ensures they can be fixed to work with cloud URIs. -""" - -import os -import posixpath - - -class TestTextUtilsSpecificIssues: - """Test specific issues found in nemo_curator/stages/text/utils/text_utils.py.""" - - def test_line_170_os_path_basename_splitext_issue(self): - """ - Test the specific issue on line 170 of text_utils.py: - module = os.path.splitext(os.path.basename(filename))[0] - - This pattern should be cloud-compatible. - """ - # The problematic code pattern from line 170 - def extract_module_name_old_way(filename): - """Original problematic pattern.""" - return os.path.splitext(os.path.basename(filename))[0] - - # The cloud-compatible version - def extract_module_name_cloud_way(filename): - """Cloud-compatible pattern.""" - return posixpath.splitext(posixpath.basename(filename))[0] - - test_filenames = [ - ("s3://bucket/path/to/file.py", "file"), - ("gs://my-bucket/deep/nested/script.py", "script"), - ("abfs://container@account.dfs.core.windows.net/code.py", "code"), - ("https://example.com/api/source.py", "source"), - ("/local/path/local_file.py", "local_file"), # Still works for local - ] - - for filename, expected_module in test_filenames: - # Both should work on POSIX systems, but cloud version is more explicit - old_result = extract_module_name_old_way(filename) - new_result = extract_module_name_cloud_way(filename) - - assert old_result == expected_module, f"Old way failed for {filename}" - assert new_result == expected_module, f"New way failed for {filename}" - assert old_result == new_result, f"Results differ for {filename}" - - def test_get_docstrings_mock_file_handling(self): - """ - Test how the get_docstrings function handles file-like objects with cloud URIs. - - This simulates the pattern where a file-like object has a .name attribute - that contains a cloud URI. 
- """ - from unittest.mock import Mock - - # Simulate the get_docstrings logic for filename extraction - def simulate_get_docstrings_filename_logic(source): - """Simulate the logic from get_docstrings function.""" - if hasattr(source, "read"): - filename = getattr(source, "name", "") - # This is the problematic line 170 - module = os.path.splitext(os.path.basename(filename))[0] - return module - return "" - - cloud_uris = [ - "s3://bucket/path/file.py", - "gs://bucket/script.py", - "abfs://container@account.dfs.core.windows.net/code.py", - "https://example.com/source.py" - ] - - for cloud_uri in cloud_uris: - mock_file = Mock() - mock_file.name = cloud_uri - mock_file.read.return_value = "# Python source code" - - # This should extract the correct module name - module_name = simulate_get_docstrings_filename_logic(mock_file) - expected_name = posixpath.splitext(posixpath.basename(cloud_uri))[0] - - assert module_name == expected_name, f"Module extraction failed for {cloud_uri}" - - -class TestSemanticDedupeSpecificIssues: - """Test specific issues found in nemo_curator/stages/text/deduplication/semantic.py.""" - - def test_lines_182_189_os_path_join_issues(self): - """ - Test the specific issues on lines 182-189 of semantic.py: - - self.embeddings_path = os.path.join(self.cache_path, "embeddings") - - self.semantic_dedup_path = os.path.join(self.cache_path, "semantic_dedup") - - etc. - - These patterns should be cloud-compatible. - """ - # Simulate the problematic pattern from semantic.py - def create_paths_old_way(cache_path, output_path): - """Original problematic patterns from lines 182-189.""" - embeddings_path = os.path.join(cache_path, "embeddings") - semantic_dedup_path = os.path.join(cache_path, "semantic_dedup") - duplicates_path = os.path.join(output_path, "duplicates") - deduplicated_path = os.path.join(output_path, "deduplicated") - state_file = os.path.join(output_path, "semantic_id_generator.json") - - return { - "embeddings": embeddings_path, - "semantic_dedup": semantic_dedup_path, - "duplicates": duplicates_path, - "deduplicated": deduplicated_path, - "state_file": state_file, - } - - # Cloud-compatible version - def create_paths_cloud_way(cache_path, output_path): - """Cloud-compatible patterns.""" - embeddings_path = posixpath.join(cache_path, "embeddings") - semantic_dedup_path = posixpath.join(cache_path, "semantic_dedup") - duplicates_path = posixpath.join(output_path, "duplicates") - deduplicated_path = posixpath.join(output_path, "deduplicated") - state_file = posixpath.join(output_path, "semantic_id_generator.json") - - return { - "embeddings": embeddings_path, - "semantic_dedup": semantic_dedup_path, - "duplicates": duplicates_path, - "deduplicated": deduplicated_path, - "state_file": state_file, - } - - test_cases = [ - ("s3://bucket/cache", "s3://bucket/output"), - ("gs://my-bucket/cache", "gs://my-bucket/output"), - ("abfs://container@account.dfs.core.windows.net/cache", - "abfs://container@account.dfs.core.windows.net/output"), - ("/local/cache", "/local/output"), # Local paths should still work - ] - - for cache_path, output_path in test_cases: - old_paths = create_paths_old_way(cache_path, output_path) - new_paths = create_paths_cloud_way(cache_path, output_path) - - # Both should produce the same results on POSIX systems - for key in old_paths: - assert old_paths[key] == new_paths[key], f"Path mismatch for {key}: {old_paths[key]} != {new_paths[key]}" - - # Verify cloud URIs maintain their structure - if "://" in cache_path or "://" in output_path: - assert 
"://" in new_paths[key], f"Cloud protocol lost in {key}: {new_paths[key]}" - assert "/" in new_paths[key], f"Should use forward slashes in {key}: {new_paths[key]}" - - def test_semantic_dedupe_path_construction_robustness(self): - """Test robust path construction for complex cloud URI scenarios.""" - complex_cases = [ - # S3 with nested buckets and regions - ("s3://my-bucket-us-west-2/projects/nlp/cache", - "s3://my-bucket-us-west-2/projects/nlp/output"), - - # Google Cloud Storage with complex paths - ("gs://my-project-bucket/datasets/v2/cache", - "gs://my-project-bucket/datasets/v2/output"), - - # Azure Blob Storage with container and account - ("abfs://data@myaccount.dfs.core.windows.net/projects/dedup/cache", - "abfs://data@myaccount.dfs.core.windows.net/projects/dedup/output"), - - # HTTPS endpoints - ("https://storage.example.com/api/v1/cache", - "https://storage.example.com/api/v1/output"), - ] - - for cache_path, output_path in complex_cases: - # Test the path joining patterns from semantic.py - embeddings_path = posixpath.join(cache_path, "embeddings") - duplicates_path = posixpath.join(output_path, "duplicates") - state_file = posixpath.join(output_path, "semantic_id_generator.json") - - # Verify paths maintain their cloud structure - assert embeddings_path.startswith(cache_path), f"Embeddings path should start with cache: {embeddings_path}" - assert duplicates_path.startswith(output_path), f"Duplicates path should start with output: {duplicates_path}" - assert state_file.startswith(output_path), f"State file should start with output: {state_file}" - - # Verify no path corruption - protocol_count = cache_path.count("://") - assert embeddings_path.count("://") == protocol_count, f"Protocol corruption in embeddings: {embeddings_path}" - assert duplicates_path.count("://") == protocol_count, f"Protocol corruption in duplicates: {duplicates_path}" - - -class TestDownloadModuleSpecificIssues: - """Test specific issues found in download modules.""" - - def test_arxiv_download_path_operations(self): - """ - Test patterns from nemo_curator/stages/text/download/arxiv/ files. 
-
-
-class TestDownloadModuleSpecificIssues:
-    """Test specific issues found in download modules."""
-
-    def test_arxiv_download_path_operations(self):
-        """
-        Test patterns from nemo_curator/stages/text/download/arxiv/ files.
-
-        Found patterns like:
-        - download_dir = os.path.split(file_path)[0]
-        - bname = os.path.split(file_path)[-1]
-        - os.path.splitext(os.path.split(item)[-1])[0]
-        """
-        # Problematic patterns from arxiv download files
-        def extract_download_info_old_way(file_path):
-            """Original patterns from arxiv download."""
-            download_dir = os.path.split(file_path)[0]
-            bname = os.path.split(file_path)[-1]
-            name_without_ext = os.path.splitext(os.path.split(file_path)[-1])[0]
-            return download_dir, bname, name_without_ext
-
-        # Cloud-compatible version
-        def extract_download_info_cloud_way(file_path):
-            """Cloud-compatible patterns."""
-            download_dir = posixpath.dirname(file_path)
-            bname = posixpath.basename(file_path)
-            name_without_ext = posixpath.splitext(posixpath.basename(file_path))[0]
-            return download_dir, bname, name_without_ext
-
-        test_file_paths = [
-            "s3://arxiv-bucket/src/papers/2023/paper.tar.gz",
-            "gs://arxiv-mirror/papers/math/0601001.pdf",
-            "abfs://papers@storage.dfs.core.windows.net/cs/0601001.tar",
-            "/local/path/papers/paper.pdf",  # Local paths should still work
-        ]
-
-        for file_path in test_file_paths:
-            old_dir, old_name, old_name_no_ext = extract_download_info_old_way(file_path)
-            new_dir, new_name, new_name_no_ext = extract_download_info_cloud_way(file_path)
-
-            # Results should be the same on POSIX systems
-            assert old_dir == new_dir, f"Directory extraction differs: {old_dir} != {new_dir}"
-            assert old_name == new_name, f"Filename extraction differs: {old_name} != {new_name}"
-            assert old_name_no_ext == new_name_no_ext, f"Name without ext differs: {old_name_no_ext} != {new_name_no_ext}"
-
-            # Verify cloud URI structure is maintained in directory
-            if "://" in file_path:
-                assert "://" in new_dir, f"Protocol should be preserved in directory: {new_dir}"
-                assert "://" not in new_name, f"Protocol should not be in filename: {new_name}"
-
-    def test_common_crawl_warc_iterator_patterns(self):
-        """
-        Test patterns that might be found in WARC iterator modules.
-
-        These often use pathlib.Path which can cause issues with cloud URIs.
-        """
-        # Test that we handle path-like operations correctly for cloud URIs
-        cloud_warc_paths = [
-            "s3://commoncrawl/crawl-data/CC-MAIN-2023-06/segments/warc.gz",
-            "gs://commoncrawl-mirror/2023/warc-files/segment.warc.gz",
-            "abfs://crawldata@storage.dfs.core.windows.net/warc/file.warc.gz",
-        ]
-
-        for warc_path in cloud_warc_paths:
-            # Simulate operations that might be done on WARC paths
-
-            # Extract directory (for organizing downloaded files)
-            directory = posixpath.dirname(warc_path)
-            assert directory.startswith(warc_path.split("://")[0] + "://"), f"Directory should maintain protocol: {directory}"
-
-            # Extract filename (for local storage naming)
-            filename = posixpath.basename(warc_path)
-            assert "://" not in filename, f"Filename should not contain protocol: {filename}"
-            assert filename.endswith(".warc.gz") or filename.endswith(".gz"), f"Should preserve file extension: {filename}"
-
-            # Extract name without extension (for processing logic)
-            name_part = posixpath.splitext(filename)[0]
-            if name_part.endswith(".warc"):
-                name_part = posixpath.splitext(name_part)[0]
-
-            assert len(name_part) > 0, f"Should extract meaningful name: {name_part}"
-            assert "." not in name_part or name_part.count(".") < filename.count("."), f"Should remove extensions: {name_part}"
-
-
-class TestGeneralCloudCompatibilityPatterns:
-    """Test general patterns that should be avoided in favor of fsspec."""
-
-    def test_problematic_os_operations_on_cloud_uris(self):
-        """Test operations that definitely don't work with cloud URIs."""
-        cloud_uris = [
-            "s3://bucket/file.txt",
-            "gs://bucket/data.json",
-            "abfs://container@account.dfs.core.windows.net/file.parquet"
-        ]
-
-        for uri in cloud_uris:
-            # These operations should NOT work with cloud URIs
-            # (Testing that they fail as expected)
-
-            # File existence - os.path.exists returns False for cloud URIs
-            assert os.path.exists(uri) is False, f"os.path.exists should be False for {uri}"
-
-            # File size - os.path.getsize should fail
-            try:
-                os.path.getsize(uri)
-                assert False, f"os.path.getsize should fail for {uri}"
-            except (OSError, FileNotFoundError):
-                pass  # Expected behavior
-
-            # Directory check - os.path.isdir should be False
-            assert os.path.isdir(uri) is False, f"os.path.isdir should be False for {uri}"
-
-            # File check - os.path.isfile should be False
-            assert os.path.isfile(uri) is False, f"os.path.isfile should be False for {uri}"
-
-    def test_recommended_fsspec_patterns(self):
-        """Test the recommended fsspec patterns for cloud compatibility."""
-        from unittest.mock import Mock
-
-        cloud_uris = [
-            "s3://bucket/file.txt",
-            "gs://bucket/data.json",
-            "abfs://container@account.dfs.core.windows.net/file.parquet"
-        ]
-
-        for uri in cloud_uris:
-            # Mock fsspec filesystem
-            mock_fs = Mock()
-            mock_fs.exists.return_value = True
-            mock_fs.isdir.return_value = False
-            mock_fs.isfile.return_value = True
-            mock_fs.size.return_value = 12345
-
-            # These operations SHOULD work with fsspec
-            assert mock_fs.exists(uri) is True, f"fs.exists should work for {uri}"
-            assert mock_fs.isfile(uri) is True, f"fs.isfile should work for {uri}"
-            assert mock_fs.size(uri) == 12345, f"fs.size should work for {uri}"
-
-            # Mock file operations
-            mock_file = Mock()
-            mock_file.read.return_value = b"test content"
-            mock_fs.open.return_value.__enter__ = Mock(return_value=mock_file)
-            mock_fs.open.return_value.__exit__ = Mock(return_value=None)
-
-            # File reading should work
-            with mock_fs.open(uri, "rb") as f:
-                content = f.read()
-                assert content == b"test content", f"fs.open should work for {uri}"
\ No newline at end of file

From 7dc61111d1561097e10d1a473ae07d03a0397b61 Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Fri, 5 Sep 2025 22:59:11 +0000
Subject: [PATCH 4/5] Fix ruff linting issues in cloud compatibility fixes

Co-authored-by: VibhuJawa <4837571+VibhuJawa@users.noreply.github.com>
---
 .../stages/text/deduplication/semantic.py     |  1 -
 .../stages/text/download/base/download.py     |  8 +-
 .../stages/text/download/base/iterator.py     |  1 -
 .../stages/text/filters/fasttext_filter.py    |  2 -
 nemo_curator/stages/text/utils/text_utils.py  |  1 -
 .../text/test_cloud_compatibility_fixes.py    | 80 +++++++++----------
 6 files changed, 42 insertions(+), 51 deletions(-)

diff --git a/nemo_curator/stages/text/deduplication/semantic.py b/nemo_curator/stages/text/deduplication/semantic.py
index 416d4f06b..7361fc4ad 100644
--- a/nemo_curator/stages/text/deduplication/semantic.py
+++ b/nemo_curator/stages/text/deduplication/semantic.py
@@ -21,7 +21,6 @@
 3. Optional duplicate removal based on identified duplicates
 """
 
-import os
 import posixpath
 import time
 from dataclasses import dataclass, field
diff --git a/nemo_curator/stages/text/download/base/download.py b/nemo_curator/stages/text/download/base/download.py
index 603977339..4f1e1fa65 100644
--- a/nemo_curator/stages/text/download/base/download.py
+++ b/nemo_curator/stages/text/download/base/download.py
@@ -46,7 +46,7 @@ def __init__(self, download_dir: str, verbose: bool = False):
     def _check_s5cmd_installed(self) -> bool:
         """Check if s5cmd is installed."""
         try:
-            subprocess.run(["s5cmd", "version"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False)  # noqa: S603, S607
+            subprocess.run(["s5cmd", "version"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False)  # noqa: S607
         except FileNotFoundError:
             return False
         else:
@@ -96,7 +96,7 @@ def download(self, url: str) -> str | None:
 
         # Use fsspec for cloud-compatible file operations
         fs, _ = fsspec.core.url_to_fs(output_file)
-        
+
         # If final file exists and is non-empty, assume it's complete
         if fs.exists(output_file):
             try:
@@ -106,7 +106,7 @@ def download(self, url: str) -> str | None:
                     if self._verbose:
                         logger.info(f"File: {output_file} exists. Not downloading")
                     return output_file
-            except Exception:
+            except (OSError, KeyError, ValueError):
                 # If we can't get file info, proceed with download
                 pass
 
@@ -122,7 +122,7 @@ def download(self, url: str) -> str | None:
                 file_info = fs.info(output_file)
                 file_size = file_info.get("size", 0)
                 logger.info(f"Successfully downloaded to {output_file} ({file_size} bytes)")
-            except Exception:
+            except (OSError, KeyError, ValueError):
                 logger.info(f"Successfully downloaded to {output_file}")
             return output_file
         else:
diff --git a/nemo_curator/stages/text/download/base/iterator.py b/nemo_curator/stages/text/download/base/iterator.py
index d6a8e3e17..e4b7aecc5 100644
--- a/nemo_curator/stages/text/download/base/iterator.py
+++ b/nemo_curator/stages/text/download/base/iterator.py
@@ -12,7 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import os
 import posixpath
 from abc import ABC, abstractmethod
 from collections.abc import Iterator
diff --git a/nemo_curator/stages/text/filters/fasttext_filter.py b/nemo_curator/stages/text/filters/fasttext_filter.py
index 53d24a8cf..79df4b5fe 100644
--- a/nemo_curator/stages/text/filters/fasttext_filter.py
+++ b/nemo_curator/stages/text/filters/fasttext_filter.py
@@ -12,8 +12,6 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-import os
-
 import fasttext
 import fsspec
 import numpy as np
diff --git a/nemo_curator/stages/text/utils/text_utils.py b/nemo_curator/stages/text/utils/text_utils.py
index 1f920e22d..ec530b65d 100644
--- a/nemo_curator/stages/text/utils/text_utils.py
+++ b/nemo_curator/stages/text/utils/text_utils.py
@@ -13,7 +13,6 @@
 # limitations under the License.
 
 import ast
-import os
 import posixpath
 import string
 import tokenize
diff --git a/tests/stages/text/test_cloud_compatibility_fixes.py b/tests/stages/text/test_cloud_compatibility_fixes.py
index b9bc484c5..bbf56d42d 100644
--- a/tests/stages/text/test_cloud_compatibility_fixes.py
+++ b/tests/stages/text/test_cloud_compatibility_fixes.py
@@ -23,8 +23,6 @@
 from io import StringIO
 from unittest.mock import Mock, patch
 
-import pytest
-
 from nemo_curator.stages.text.utils.text_utils import get_docstrings
 
 
@@ -42,10 +40,10 @@ def example_function():
         mock_file = StringIO(source_code)
         # Simulate a cloud URI filename
         mock_file.name = "s3://bucket/path/to/script.py"
-        
+
         # This should work without errors using posixpath operations
         result = get_docstrings(mock_file)
-        
+
         # Verify the result contains the expected docstring
         assert len(result) > 0
         assert "This is a docstring." in str(result)
@@ -59,7 +57,7 @@ def test_get_docstrings_filename_extraction_patterns(self):
             ("https://example.com/api/v1/source.py", "source"),
             ("/local/path/local_file.py", "local_file"),  # Local files should still work
         ]
-        
+
         for uri, expected_module_name in test_cases:
             # Test the pattern that's now used in the fixed code
             module_name = posixpath.splitext(posixpath.basename(uri))[0]
@@ -74,14 +72,14 @@ def test_path_construction_patterns(self):
         # Test the patterns now used in the fixed semantic.py
         base_path = "s3://bucket/cache"
         output_path = "gs://bucket/output"
-        
+
         # These are the patterns now used in the fixed code
         embeddings_path = posixpath.join(base_path, "embeddings")
         semantic_dedup_path = posixpath.join(base_path, "semantic_dedup")
         duplicates_path = posixpath.join(output_path, "duplicates")
         deduplicated_path = posixpath.join(output_path, "deduplicated")
         state_file = posixpath.join(output_path, "semantic_id_generator.json")
-        
+
         # Verify the paths are constructed correctly
         assert embeddings_path == "s3://bucket/cache/embeddings"
         assert semantic_dedup_path == "s3://bucket/cache/semantic_dedup"
@@ -96,7 +94,7 @@ def test_complex_cloud_uri_handling(self):
             "gs://another-bucket/data/2024/01/15/",
             "abfs://container@storage.dfs.core.windows.net/datasets/processed/",
         ]
-        
+
         for base_uri in test_uris:
             # Test subdirectory creation patterns
             subdir = posixpath.join(base_uri, "embeddings")
@@ -107,8 +105,8 @@ def test_complex_cloud_uri_handling(self):
 class TestDownloadCloudFixes:
     """Test cloud compatibility fixes in download modules."""
 
-    @patch('fsspec.core.url_to_fs')
-    def test_download_file_operations(self, mock_url_to_fs):
+    @patch("fsspec.core.url_to_fs")
+    def test_download_file_operations(self, mock_url_to_fs: Mock) -> None:
         """Test that download operations use fsspec for cloud URIs."""
         # Mock fsspec filesystem
         mock_fs = Mock()
@@ -116,23 +114,24 @@ def test_download_file_operations(self, mock_url_to_fs):
         mock_fs.exists.return_value = True
         mock_fs.info.return_value = {"size": 1024}
         mock_url_to_fs.return_value = (mock_fs, "bucket/path")
-        
+
         # Import after patching to ensure mock is used
         from nemo_curator.stages.text.download.base.download import DocumentDownloader
-        
+
         # Create a concrete subclass for testing
         class TestDownloader(DocumentDownloader):
-            def _get_output_filename(self, url: str) -> str:
+            def _get_output_filename(self, _url: str) -> str:
                 return "test_file.txt"
-            
-            def _download_to_path(self, url: str, path: str) -> tuple[bool, str]:
+
+            def _download_to_path(self, _url: str, _path: str) -> tuple[bool, str]:
                 return True, ""
-        
+
         # Test cloud URI download directory
         cloud_download_dir = "s3://test-bucket/downloads/"
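         # (Editorial note, an assumption about the code under test: with a cloud
         # URI, plain os.makedirs would create a literal local "s3:" directory
         # tree; the fsspec-based constructor routes makedirs to the object store.)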
         downloader = TestDownloader(cloud_download_dir)
-        
-        # Verify fsspec was called for directory creation
+
+        # Verify the downloader was created and fsspec was called for directory creation
+        assert downloader is not None
         mock_url_to_fs.assert_called()
         mock_fs.makedirs.assert_called_with(cloud_download_dir, exist_ok=True)
 
@@ -144,7 +143,7 @@ def test_filename_extraction_from_cloud_paths(self):
             ("abfs://container@account.dfs.core.windows.net/files/archive.zip", "archive.zip"),
             ("https://example.com/downloads/data.json", "data.json"),
         ]
-        
+
         for cloud_path, expected_filename in test_cases:
             # Test the pattern now used in the fixed iterator.py
             filename = posixpath.basename(cloud_path)
@@ -154,44 +153,44 @@ def test_filename_extraction_from_cloud_paths(self):
 class TestFilterCloudFixes:
     """Test cloud compatibility fixes in filter modules."""
 
-    @patch('fsspec.core.url_to_fs')
-    def test_fasttext_filter_model_check(self, mock_url_to_fs):
+    @patch("fsspec.core.url_to_fs")
+    def test_fasttext_filter_model_check(self, mock_url_to_fs: Mock) -> None:
         """Test that FastText filter uses fsspec for model file checks."""
         # Mock fsspec filesystem
         mock_fs = Mock()
         mock_fs.exists.return_value = True
         mock_url_to_fs.return_value = (mock_fs, "bucket/path")
-        
+
         # Import after patching
         from nemo_curator.stages.text.filters.fasttext_filter import FastTextQualityFilter
-        
+
         # Test cloud URI model path
         cloud_model_path = "s3://models/fasttext_quality.bin"
         filter_instance = FastTextQualityFilter(model_path=cloud_model_path)
-        
+
         # This should not raise an exception with fsspec
         filter_instance.model_check_or_download()
-        
+
         # Verify fsspec was used
         mock_url_to_fs.assert_called_with(cloud_model_path)
         mock_fs.exists.assert_called_with(cloud_model_path)
 
-    @patch('fsspec.core.url_to_fs')
-    def test_heuristic_filter_cache_directory(self, mock_url_to_fs):
+    @patch("fsspec.core.url_to_fs")
+    def test_heuristic_filter_cache_directory(self, mock_url_to_fs: Mock) -> None:
         """Test that heuristic filter uses fsspec for cache directory creation."""
         # Mock fsspec filesystem
         mock_fs = Mock()
         mock_fs.makedirs.return_value = None
         mock_url_to_fs.return_value = (mock_fs, "bucket/path")
-        
+
         # We can't easily test the full heuristic filter due to dependencies,
         # but we can test the pattern directly
         cache_dir = "s3://bucket/cache/"
-        
+
         # This is the pattern now used in the fixed code
         fs, _ = mock_url_to_fs(cache_dir)
         fs.makedirs(cache_dir, exist_ok=True)
-        
+
         # Verify fsspec was used
         mock_url_to_fs.assert_called_with(cache_dir)
         mock_fs.makedirs.assert_called_with(cache_dir, exist_ok=True)
@@ -208,17 +207,17 @@ def test_end_to_end_cloud_uri_patterns(self):
             "abfs://data@storage.dfs.core.windows.net/processed/",
             "https://api.example.com/v1/data/",
         ]
-        
+
         for uri in cloud_uris:
             # Test path construction (semantic deduplication pattern)
             embeddings_path = posixpath.join(uri, "embeddings")
             assert embeddings_path.startswith(uri)
-            
+
             # Test filename extraction (download pattern)
             test_file_path = posixpath.join(uri, "test_file.json")
             filename = posixpath.basename(test_file_path)
             assert filename == "test_file.json"
-            
+
             # Test module name extraction (text_utils pattern)
             script_path = posixpath.join(uri, "script.py")
             module_name = posixpath.splitext(posixpath.basename(script_path))[0]
@@ -232,12 +231,12 @@ def test_backward_compatibility_with_local_paths(self):
             "../relative/path/",
             "simple_filename.txt",
         ]
-        
+
         for path in local_paths:
             # All the fixed patterns should work with local paths too
             subpath = posixpath.join(path, "subdir")
             filename = posixpath.basename(subpath)
-        
+
             # These operations should succeed without errors
             assert isinstance(subpath, str)
             assert isinstance(filename, str)
@@ -250,12 +249,9 @@ def test_error_handling_for_invalid_uris(self):
             "s3://",  # Missing bucket
             "gs:///no-bucket",
         ]
-        
+
         for uri in invalid_uris:
             # The posixpath operations should not crash on invalid URIs
-            try:
-                result = posixpath.basename(uri)
-                assert isinstance(result, str)
-            except Exception:
-                # If an exception occurs, it should be a reasonable one
-                pass
\ No newline at end of file
+            # and should handle invalid URIs gracefully
+            result = posixpath.basename(uri)
+            assert isinstance(result, str)

From 903688e6f40283c603307d5d84a3770c6b6f93df Mon Sep 17 00:00:00 2001
From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com>
Date: Fri, 5 Sep 2025 23:07:19 +0000
Subject: [PATCH 5/5] Simplify exception handling in download.py based on review feedback

Co-authored-by: VibhuJawa <4837571+VibhuJawa@users.noreply.github.com>
---
 .../stages/text/download/base/download.py | 19 +++++++++----------
 1 file changed, 9 insertions(+), 10 deletions(-)

diff --git a/nemo_curator/stages/text/download/base/download.py b/nemo_curator/stages/text/download/base/download.py
index 4f1e1fa65..6834efc52 100644
--- a/nemo_curator/stages/text/download/base/download.py
+++ b/nemo_curator/stages/text/download/base/download.py
@@ -99,16 +99,12 @@ def download(self, url: str) -> str | None:
 
         # If final file exists and is non-empty, assume it's complete
         if fs.exists(output_file):
-            try:
-                file_info = fs.info(output_file)
-                file_size = file_info.get("size", 0)
-                if file_size > 0:
-                    if self._verbose:
-                        logger.info(f"File: {output_file} exists. Not downloading")
-                    return output_file
-            except (OSError, KeyError, ValueError):
-                # If we can't get file info, proceed with download
-                pass
+            file_info = fs.info(output_file)
+            file_size = file_info.get("size", 0)
+            if file_size > 0:
+                if self._verbose:
+                    logger.info(f"File: {output_file} exists. Not downloading")
+                return output_file
 
         # Download to temporary file
         success, error_message = self._download_to_path(url, temp_file)
@@ -117,13 +113,16 @@ def download(self, url: str) -> str | None:
             # Download successful, atomically move temp file to final location
             os.rename(temp_file, output_file)
             if self._verbose:
+                # Try to get file size for logging, but don't fail if we can't
                 try:
                     fs, _ = fsspec.core.url_to_fs(output_file)
                     file_info = fs.info(output_file)
                     file_size = file_info.get("size", 0)
                     logger.info(f"Successfully downloaded to {output_file} ({file_size} bytes)")
                 except (OSError, KeyError, ValueError):
+                    # If we can't get file size, just log without size
                    logger.info(f"Successfully downloaded to {output_file}")
+                    logger.debug(f"Could not retrieve file size for {output_file}")
             return output_file
         else:
             # Download failed