Commits (35)
3cf4c86
Add S3 download and upload mappers for distributed processing
kyo-tom Dec 4, 2025
8d6f3d9
feat: test s3 download mapper
Dec 24, 2025
8f42978
feat: resume file download
Dec 24, 2025
f16e036
feat: s3 download test
Dludora Dec 25, 2025
cd9cbb0
Merge branch 'datajuicer:main' into pr-839
Dludora Dec 25, 2025
4f6983b
fix: pre-commit
Dludora Dec 25, 2025
4041ed8
feat: test upload
Dludora Dec 25, 2025
2a3b25c
Merge branch 'pr-839' of github.com:Dludora/data-juicer into pr-839
Dludora Dec 25, 2025
82a513d
feat: unittest 4 s3 upload
Dludora Dec 25, 2025
2fc184e
fix: s3 demo dataset config
Dludora Dec 25, 2025
7a7b26a
feat: timeout config 4 s3
Dludora Dec 25, 2025
6ba0c8b
style: pre-commit
Dludora Dec 25, 2025
b54eb3a
feat: support hdfs and iceberg
Dludora Dec 28, 2025
06f034a
feat: add load data from hdfs source
Dec 29, 2025
c4c7f0b
feat: demo for hdfs load
Dec 29, 2025
37db9bf
feat: read iceberg file
Dludora Dec 29, 2025
1368eaf
feat: iceberg read
Dludora Dec 30, 2025
f4856b6
feat: process iceberg and hdfs
Dludora Dec 30, 2025
9f8a551
refractor: secret
Dludora Dec 30, 2025
2b3b620
feat: export iceberg and others
Dec 30, 2025
4a24682
feat: write iceberg
Dludora Dec 31, 2025
ebd23d2
refractor: move fs
Dludora Dec 31, 2025
12a4524
refractor: move fs
Dludora Dec 31, 2025
2744914
feat: delta and hudi
Dludora Dec 31, 2025
79b3e04
refractor: restore file_utils
Dludora Jan 5, 2026
cc7b07b
refractor: add Any type
Dludora Jan 5, 2026
b4c698c
feat: fallback ray_expoeter
Dludora Jan 5, 2026
9d78a7a
Merge remote-tracking branch 'upstream/main'
Dludora Jan 5, 2026
087941c
fix: pyproject
Dludora Jan 5, 2026
9d18059
style: code style check
Dludora Jan 5, 2026
2b00595
restore: ray_executor s3
Dludora Jan 6, 2026
75457da
Merge remote-tracking branch 'origin/main' into pr-839
Dludora Jan 6, 2026
185be51
fix: tests/config/test_config.py SpecifiedFieldFilter OP python 3.11+…
Dludora Jan 6, 2026
aea124c
Merge remote-tracking branch 'origin/main' into pr-839
Dludora Jan 6, 2026
c261c29
Merge remote-tracking branch 'upstream/main' into pr-839
Dludora Jan 6, 2026
386 changes: 386 additions & 0 deletions data_juicer/core/data/load_strategy.py

Large diffs are not rendered by default.
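
The load_strategy.py diff is not rendered here. As a rough sketch of the kind of S3 loading such a strategy likely wraps (the helper name, its signature, and the config keys below are illustrative assumptions, not the PR's actual code):

# Illustrative sketch only: read JSONL from S3 into a Ray Dataset through a
# PyArrow filesystem built from explicit credentials. Everything except the
# pyarrow/ray APIs themselves is an assumption for this example.
import pyarrow.fs as pafs
import ray


def load_jsonl_from_s3(path: str, s3_config: dict):
    # All credential keys are optional; when omitted, PyArrow falls back to
    # the standard AWS credential chain (env vars, config files, IAM role).
    fs = pafs.S3FileSystem(
        access_key=s3_config.get("aws_access_key_id"),
        secret_key=s3_config.get("aws_secret_access_key"),
        session_token=s3_config.get("aws_session_token"),
        region=s3_config.get("aws_region"),
        endpoint_override=s3_config.get("endpoint_url"),
    )
    # Ray resolves the s3:// path against the explicitly provided filesystem.
    return ray.data.read_json(path, filesystem=fs)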

126 changes: 89 additions & 37 deletions data_juicer/core/ray_exporter.py
@@ -1,3 +1,4 @@
import copy
import os
from functools import partial

@@ -6,6 +7,7 @@
from data_juicer.utils.constant import Fields, HashKeys
from data_juicer.utils.file_utils import Sizes, byte_size_to_size_str
from data_juicer.utils.model_utils import filter_arguments
from data_juicer.utils.s3_utils import create_filesystem_from_args
from data_juicer.utils.webdataset_utils import reconstruct_custom_webdataset_format


@@ -22,6 +24,7 @@ class RayExporter:
"tfrecords",
"webdataset",
"lance",
"iceberg",
# 'images',
# 'numpy',
}
@@ -51,37 +54,27 @@ def __init__(
self.export_shard_size = export_shard_size
self.keep_stats_in_res_ds = keep_stats_in_res_ds
self.keep_hashes_in_res_ds = keep_hashes_in_res_ds
self.export_format = self._get_export_format(export_path) if export_type is None else export_type

if export_type:
self.export_format = export_type
elif export_path:
self.export_format = self._get_export_format(export_path)
else:
raise ValueError("Either export_path or export_type should be provided.")
if self.export_format not in self._SUPPORTED_FORMATS:
raise NotImplementedError(
f'export data format "{self.export_format}" is not supported '
f"for now. Only support {self._SUPPORTED_FORMATS}. Please check export_type or export_path."
)
self.export_extra_args = kwargs if kwargs is not None else {}

# Check if export_path is S3 and create filesystem if needed
self.s3_filesystem = None
if export_path.startswith("s3://"):
# Extract AWS credentials from export_extra_args (if provided)
s3_config = {}
if "aws_access_key_id" in self.export_extra_args:
s3_config["aws_access_key_id"] = self.export_extra_args.pop("aws_access_key_id")
if "aws_secret_access_key" in self.export_extra_args:
s3_config["aws_secret_access_key"] = self.export_extra_args.pop("aws_secret_access_key")
if "aws_session_token" in self.export_extra_args:
s3_config["aws_session_token"] = self.export_extra_args.pop("aws_session_token")
if "aws_region" in self.export_extra_args:
s3_config["aws_region"] = self.export_extra_args.pop("aws_region")
if "endpoint_url" in self.export_extra_args:
s3_config["endpoint_url"] = self.export_extra_args.pop("endpoint_url")

# Create PyArrow S3FileSystem with credentials
# This matches the pattern used in RayS3DataLoadStrategy
from data_juicer.utils.s3_utils import create_pyarrow_s3_filesystem

self.s3_filesystem = create_pyarrow_s3_filesystem(s3_config)
logger.info(f"Detected S3 export path: {export_path}. S3 filesystem configured.")
fs_args = copy.deepcopy(self.export_extra_args)
self.fs = create_filesystem_from_args(export_path, fs_args)
self._check_shard_size()

def _check_shard_size(self):
if self.export_shard_size == 0:
return
self.max_shard_size_str = ""

# get the string format of shard size
@@ -149,22 +142,30 @@ def _export_impl(self, dataset, export_path, columns=None):
if len(removed_fields):
dataset = dataset.drop_columns(removed_fields)

export_method = RayExporter._router()[self.export_format]
router = self._router()
if self.export_format in router:
export_method = router[self.export_format]
else:
export_method = RayExporter.write_others

export_kwargs = {
"export_extra_args": self.export_extra_args,
"export_format": self.export_format,
}
# Add S3 filesystem if available
if self.s3_filesystem is not None:
export_kwargs["export_extra_args"]["filesystem"] = self.s3_filesystem
# Add filesystem if available
if self.fs is not None:
export_kwargs["export_extra_args"]["filesystem"] = self.fs

if self.export_shard_size > 0:
# compute the min_rows_per_file for export methods
dataset_nbytes = dataset.size_bytes()
dataset_num_rows = dataset.count()
num_shards = int(dataset_nbytes / self.export_shard_size) + 1
num_shards = min(num_shards, dataset_num_rows)
rows_per_file = int(dataset_num_rows / num_shards)
export_kwargs["export_extra_args"]["min_rows_per_file"] = rows_per_file

if dataset_num_rows > 0:
num_shards = int(dataset_nbytes / self.export_shard_size) + 1
num_shards = min(num_shards, dataset_num_rows)
rows_per_file = max(1, int(dataset_num_rows / num_shards))
export_kwargs["export_extra_args"]["min_rows_per_file"] = rows_per_file

return export_method(dataset, export_path, **export_kwargs)

def export(self, dataset, columns=None):
@@ -236,7 +237,61 @@ def write_others(dataset, export_path, **kwargs):
# Add S3 filesystem if available
if "filesystem" in export_extra_args:
filtered_kwargs["filesystem"] = export_extra_args["filesystem"]
return write_method(export_path, **filtered_kwargs)
if export_path:
return write_method(export_path, **filtered_kwargs)
else:
return write_method(**filtered_kwargs)

@staticmethod
def write_iceberg(dataset, export_path, **kwargs):
"""
Export method for iceberg target tables.
Checks for table existence/connectivity. If check fails, safe fall-back to JSON.
"""
from pyiceberg.catalog import load_catalog
from pyiceberg.exceptions import NoSuchTableError

export_extra_args = kwargs.get("export_extra_args", {})
catalog_kwargs = export_extra_args.get("catalog_kwargs", {})
table_identifier = export_extra_args.get("table_identifier", export_path)

use_iceberg = False

try:
catalog = load_catalog(**catalog_kwargs)
catalog.load_table(table_identifier)
logger.info(f"Iceberg table {table_identifier} exists. Writing to Iceberg.")
use_iceberg = True

except NoSuchTableError as e:
logger.warning(
f"Iceberg target unavailable ({e.__class__.__name__}). Fallback to exporting to {export_path}..."
)
except Exception as e:
logger.error(f"Unexpected error checking Iceberg: {e}. Fallback to exporting to {export_path}...")

if use_iceberg:
try:
filtered_kwargs = filter_arguments(dataset.write_iceberg, export_extra_args)
return dataset.write_iceberg(table_identifier, **filtered_kwargs)
except Exception as e:
logger.error(f"Write to Iceberg failed during execution: {e}. Fallback to json...")

suffix = os.path.splitext(export_path)[-1].strip(".").lower()
if not suffix:
suffix = "jsonl"
logger.warning(f"No suffix found in {export_path}, using default fallback: {suffix}")

logger.info(f"Falling back to file export. Format: [{suffix}], Path: [{export_path}]")

fallback_kwargs = {}
if "filesystem" in export_extra_args:
fallback_kwargs["filesystem"] = export_extra_args["filesystem"]
if suffix in ["json", "jsonl"]:
return RayExporter.write_json(dataset, export_path, **fallback_kwargs)
else:
fallback_kwargs["export_format"] = suffix
return RayExporter.write_others(dataset, export_path, **fallback_kwargs)

# suffix to export method
@staticmethod
@@ -250,8 +305,5 @@ def _router():
"jsonl": RayExporter.write_json,
"json": RayExporter.write_json,
"webdataset": RayExporter.write_webdataset,
"parquet": RayExporter.write_others,
"csv": RayExporter.write_others,
"tfrecords": RayExporter.write_others,
"lance": RayExporter.write_others,
"iceberg": RayExporter.write_iceberg,
}
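
For context, a minimal usage sketch of the updated exporter with an Iceberg target. The constructor's full signature is not shown in this diff, so the keyword arguments below (other than export_path, export_type, and export(), which do appear above) are assumptions; per the implementation above, extra keyword args land in export_extra_args, from which write_iceberg reads table_identifier and catalog_kwargs and falls back to a plain file export if the catalog check fails.

# Sketch under the assumptions stated above; catalog URI, table name, and
# bucket are placeholders, not values from the PR.
from data_juicer.core.ray_exporter import RayExporter

exporter = RayExporter(
    export_path="s3://my-bucket/output/result.jsonl",  # fallback target if the Iceberg check fails
    export_type="iceberg",
    table_identifier="db.processed_texts",  # assumed table identifier
    catalog_kwargs={"name": "default", "uri": "http://localhost:8181"},  # assumed REST catalog
)

# `processed_dataset` would be the ray.data.Dataset produced by the pipeline;
# export() writes to Iceberg when the table is reachable, otherwise to JSONL.
exporter.export(processed_dataset)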
4 changes: 4 additions & 0 deletions data_juicer/ops/mapper/__init__.py
@@ -74,6 +74,8 @@
RemoveWordsWithIncorrectSubstringsMapper,
)
from .replace_content_mapper import ReplaceContentMapper
from .s3_download_file_mapper import S3DownloadFileMapper
from .s3_upload_file_mapper import S3UploadFileMapper
from .sdxl_prompt2prompt_mapper import SDXLPrompt2PromptMapper
from .sentence_augmentation_mapper import SentenceAugmentationMapper
from .sentence_split_mapper import SentenceSplitMapper
@@ -173,6 +175,8 @@
"RemoveTableTextMapper",
"RemoveWordsWithIncorrectSubstringsMapper",
"ReplaceContentMapper",
"S3DownloadFileMapper",
"S3UploadFileMapper",
"SDXLPrompt2PromptMapper",
"SentenceAugmentationMapper",
"SentenceSplitMapper",
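
Both new mappers are re-exported from the package root, so they can be imported directly; their constructor parameters are not visible in this diff, so none are guessed here.

# Import path confirmed by the __init__.py diff above; constructor arguments
# (S3 credentials, bucket/prefix, local save paths, etc.) are not shown in
# this PR excerpt, so they are omitted rather than invented.
from data_juicer.ops.mapper import S3DownloadFileMapper, S3UploadFileMapper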