Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 6 additions & 6 deletions nemo_curator/stages/text/deduplication/semantic.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
3. Optional duplicate removal based on identified duplicates
"""

import os
import posixpath
import time
from dataclasses import dataclass, field
from typing import Any, Literal
Expand Down Expand Up @@ -179,14 +179,14 @@ def __post_init__(self):
self.cache_path = self.cache_path or self.output_path

# Intermediate paths
self.embeddings_path = os.path.join(self.cache_path, "embeddings")
self.semantic_dedup_path = os.path.join(self.cache_path, "semantic_dedup")
self.embeddings_path = posixpath.join(self.cache_path, "embeddings")
self.semantic_dedup_path = posixpath.join(self.cache_path, "semantic_dedup")
# Output paths
self.duplicates_path = None if self.eps is None else os.path.join(self.output_path, "duplicates")
self.duplicates_path = None if self.eps is None else posixpath.join(self.output_path, "duplicates")
self.deduplicated_output_path = (
None if not self.perform_removal else os.path.join(self.output_path, "deduplicated")
None if not self.perform_removal else posixpath.join(self.output_path, "deduplicated")
)
self.id_generator_state_file = os.path.join(self.output_path, "semantic_id_generator.json")
self.id_generator_state_file = posixpath.join(self.output_path, "semantic_id_generator.json")

self._validate_config()

Expand Down
36 changes: 27 additions & 9 deletions nemo_curator/stages/text/download/base/download.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,13 @@
# limitations under the License.

import os
import posixpath
import subprocess
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Any

import fsspec
from loguru import logger

from nemo_curator.stages.base import ProcessingStage
Expand All @@ -37,12 +39,14 @@
"""
self._download_dir = download_dir
self._verbose = verbose
os.makedirs(download_dir, exist_ok=True)
# Use fsspec for cloud-compatible directory creation
fs, _ = fsspec.core.url_to_fs(download_dir)
fs.makedirs(download_dir, exist_ok=True)

def _check_s5cmd_installed(self) -> bool:
"""Check if s5cmd is installed."""
try:
subprocess.run(["s5cmd", "version"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False) # noqa: S603, S607
subprocess.run(["s5cmd", "version"], stdout=subprocess.DEVNULL, stderr=subprocess.DEVNULL, check=False) # noqa: S607

Check failure on line 49 in nemo_curator/stages/text/download/base/download.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (S603)

nemo_curator/stages/text/download/base/download.py:49:13: S603 `subprocess` call: check for execution of untrusted input

Check failure on line 49 in nemo_curator/stages/text/download/base/download.py

View workflow job for this annotation

GitHub Actions / ruff

Ruff (S603)

nemo_curator/stages/text/download/base/download.py:49:13: S603 `subprocess` call: check for execution of untrusted input
except FileNotFoundError:
return False
else:
Expand Down Expand Up @@ -87,14 +91,20 @@
"""
# Generate output filename
output_name = self._get_output_filename(url)
output_file = os.path.join(self._download_dir, output_name)
output_file = posixpath.join(self._download_dir, output_name)
temp_file = output_file + ".tmp"

# Use fsspec for cloud-compatible file operations
fs, _ = fsspec.core.url_to_fs(output_file)

# If final file exists and is non-empty, assume it's complete
if os.path.exists(output_file) and os.path.getsize(output_file) > 0:
if self._verbose:
logger.info(f"File: {output_file} exists. Not downloading")
return output_file
if fs.exists(output_file):
file_info = fs.info(output_file)
file_size = file_info.get("size", 0)
if file_size > 0:
if self._verbose:
logger.info(f"File: {output_file} exists. Not downloading")
return output_file

# Download to temporary file
success, error_message = self._download_to_path(url, temp_file)
Expand All @@ -103,8 +113,16 @@
# Download successful, atomically move temp file to final location
os.rename(temp_file, output_file)
if self._verbose:
file_size = os.path.getsize(output_file)
logger.info(f"Successfully downloaded to {output_file} ({file_size} bytes)")
# Try to get file size for logging, but don't fail if we can't
try:
fs, _ = fsspec.core.url_to_fs(output_file)
file_info = fs.info(output_file)
file_size = file_info.get("size", 0)
logger.info(f"Successfully downloaded to {output_file} ({file_size} bytes)")
except (OSError, KeyError, ValueError):
# If we can't get file size, just log without size
logger.info(f"Successfully downloaded to {output_file}")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

But it wasn't successful in this case?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@copilot , please fix

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch! Fixed the misleading message - now it clearly logs "Successfully downloaded" even when we can't get the file size, and adds a debug message explaining why the size couldn't be retrieved. Fixed in commit 903688e.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Fixed the misleading message in commit 903688e - now it clearly logs "Successfully downloaded" even when we can't get the file size, and adds a debug message explaining why the size couldn't be retrieved.

logger.debug(f"Could not retrieve file size for {output_file}")
return output_file
else:
# Download failed
Expand Down
6 changes: 3 additions & 3 deletions nemo_curator/stages/text/download/base/iterator.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import posixpath
from abc import ABC, abstractmethod
from collections.abc import Iterator
from dataclasses import dataclass
Expand Down Expand Up @@ -89,8 +89,8 @@ def process(self, task: FileGroupTask) -> DocumentBatch:
if self.record_limit and record_count >= self.record_limit:
break
if self.add_filename_column:
# TODO: Support cloud storage https://github.com/NVIDIA-NeMo/Curator/issues/779
record_dict[self.filename_col] = os.path.basename(file_path) # type: ignore[reportReturnType]
# Use posixpath for cloud storage compatibility
record_dict[self.filename_col] = posixpath.basename(file_path) # type: ignore[reportReturnType]
records.append(record_dict)
record_count += 1

Expand Down
9 changes: 5 additions & 4 deletions nemo_curator/stages/text/filters/fasttext_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,8 @@
# See the License for the specific language governing permissions and
# limitations under the License.

import os

import fasttext
import fsspec
import numpy as np

from nemo_curator.stages.text.filters.doc_filter import DocumentFilter
Expand All @@ -32,7 +31,8 @@ def __init__(self, model_path: str | None = None, label: str = "__label__hq", al
self._name = "fasttext_quality_filter"

def model_check_or_download(self) -> None:
if not os.path.exists(self._model_path):
fs, _ = fsspec.core.url_to_fs(self._model_path)
if not fs.exists(self._model_path):
msg = f"Model file {self._model_path} not found"
raise FileNotFoundError(msg)

Expand Down Expand Up @@ -66,7 +66,8 @@ def __init__(self, model_path: str | None = None, min_langid_score: float = 0.3)
self._name = "lang_id"

def model_check_or_download(self) -> None:
if not os.path.exists(self._model_path):
fs, _ = fsspec.core.url_to_fs(self._model_path)
if not fs.exists(self._model_path):
msg = f"Model file {self._model_path} not found"
raise FileNotFoundError(msg)

Expand Down
4 changes: 3 additions & 1 deletion nemo_curator/stages/text/filters/heuristic_filter.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import tarfile
from typing import Literal

import fsspec
import huggingface_hub
import requests
from platformdirs import user_cache_dir
Expand Down Expand Up @@ -789,7 +790,8 @@ def _download_histograms(self) -> None:
raise requests.exceptions.RequestException(msg)

# Open a file to write the content
os.makedirs(self._cache_dir, exist_ok=True)
fs, _ = fsspec.core.url_to_fs(self._cache_dir)
fs.makedirs(self._cache_dir, exist_ok=True)
download_dest_path = os.path.join(self._cache_dir, "histograms.tar.gz")
with open(download_dest_path, "wb") as file:
file.write(response.content)
Expand Down
4 changes: 2 additions & 2 deletions nemo_curator/stages/text/utils/text_utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
# limitations under the License.

import ast
import os
import posixpath
import string
import tokenize
import warnings
Expand Down Expand Up @@ -167,7 +167,7 @@ def get_docstrings(source: str, module: str = "<string>") -> list[str]:
"""Parse Python source code from file or string and print docstrings."""
if hasattr(source, "read"):
filename = getattr(source, "name", module)
module = os.path.splitext(os.path.basename(filename))[0]
module = posixpath.splitext(posixpath.basename(filename))[0]
source = source.read()

docstrings = sorted(parse_docstrings(source), key=lambda x: (NODE_TYPES.get(type(x[0])), x[1]))
Expand Down
Loading
Loading