diff --git a/data_juicer/core/data/load_strategy.py b/data_juicer/core/data/load_strategy.py index cdd6417ed2..47d06f4e84 100644 --- a/data_juicer/core/data/load_strategy.py +++ b/data_juicer/core/data/load_strategy.py @@ -376,6 +376,105 @@ def load_data(self, **kwargs): raise NotImplementedError("ModelScope data load strategy is not implemented") +@DataLoadStrategyRegistry.register("default", "remote", "hdfs") +class DefaultHDFSDataLoadStrategy(DefaultDataLoadStrategy): + """ + data load strategy for HDFS datasets for LocalExecutor + Uses fsspec-compatible storage_options passed through huggingface datasets + """ + + CONFIG_VALIDATION_RULES = { + "required_fields": ["path"], + "optional_fields": ["host", "port", "user", "kerb_ticket", "extra_conf"], + "field_types": {"path": str}, + "custom_validators": { + "path": lambda x: x.startswith("hdfs://"), + }, + } + + def _create_hdfs_fs(self): + import pyarrow.fs as fs + + host = self.ds_config.get("host", None) + port = self.ds_config.get("port", None) + if port is not None: + port = int(port) + user = self.ds_config.get("user", None) + kerb_ticket = self.ds_config.get("kerb_ticket", None) + extra_conf = self.ds_config.get("extra_conf", None) + return fs.HadoopFileSystem(host=host, port=port, user=user, kerb_ticket=kerb_ticket, extra_conf=extra_conf) + + def load_data(self, **kwargs): + from urllib.parse import urlparse + + from data_juicer.core.data import NestedDataset + + path = self.ds_config["path"] + load_data_np = kwargs.get("num_proc", 1) + text_keys = getattr(self.cfg, "text_keys", ["text"]) + + file_path = urlparse(path).path + file_extension = os.path.splitext(file_path)[1].lower() + file_extension_map = { + ".json": "json", + ".jsonl": "json", + ".txt": "text", + ".csv": "csv", + ".tsv": "csv", + ".parquet": "parquet", + } + data_format = file_extension_map.get(file_extension, "json") + + hdfs = self._create_hdfs_fs() + + try: + with hdfs.open_input_stream(file_path) as stream: + + # Use ray.data functions directly with PyArrow filesystem support + # Ray's read functions support filesystem parameter via PyArrow + if data_format in {"json", "jsonl"}: + # For JSON, we need to use read_json_stream with filesystem + import pyarrow.json + + arrow_table = pyarrow.json.read_json(stream) + elif data_format == "parquet": + from pyarrow.parquet import read_table + + arrow_table = read_table(stream) + elif data_format in {"csv", "tsv"}: + import pyarrow.csv + + delimiter = "\t" if file_extension == ".tsv" else "," + parse_opts = pyarrow.csv.ParseOptions(delimiter=delimiter) + arrow_table = pyarrow.csv.read_csv(stream, parse_options=parse_opts) + elif data_format == "text": + import pyarrow.csv + + read_opts = pyarrow.csv.ReadOptions(column_names=["text"]) + parse_opts = pyarrow.csv.ParseOptions(delimiter="\0", quote_char=False) + arrow_table = pyarrow.csv.read_csv(stream, read_options=read_opts, parse_options=parse_opts) + else: + raise ValueError(f"Unsupported data format for hdfs: {file_extension}") + + dataset = datasets.Dataset(arrow_table) + dataset = NestedDataset(dataset) + dataset = unify_format( + dataset, + text_keys=text_keys, + num_proc=load_data_np, + global_cfg=self.cfg, + ) + + return dataset + + except Exception as e: + raise RuntimeError( + f"Failed to load {data_format} data from HDFS path {path}. " + f"Ensure Hadoop native libs and configs are available. 
" + f"Error: {str(e)}" + ) + + @DataLoadStrategyRegistry.register("default", "remote", "arxiv") class DefaultArxivDataLoadStrategy(DefaultDataLoadStrategy): """ @@ -543,6 +642,105 @@ def load_data(self, **kwargs): ) +@DataLoadStrategyRegistry.register("ray", "remote", "hdfs") +class RayHDFSDataLoadStrategy(RayDataLoadStrategy): + """ + data load strategy for HDFS datasets for RayExecutor + Uses PyArrow HadoopFileSystem to read from HDFS + """ + + CONFIG_VALIDATION_RULES = { + "required_fields": ["path"], + "optional_fields": ["host", "port", "user", "kerb_ticket", "extra_conf"], + "field_types": {"path": str}, + "custom_validators": { + "path": lambda x: x.startswith("hdfs://"), + }, + } + + def _create_hdfs_fs(self): + import pyarrow.fs as fs + + host = self.ds_config.get("host", None) + port = self.ds_config.get("port", None) + if port is not None: + port = int(port) + user = self.ds_config.get("user", None) + kerb_ticket = self.ds_config.get("kerb_ticket", None) + extra_conf = self.ds_config.get("extra_conf", None) + return fs.HadoopFileSystem(host=host, port=port, user=user, kerb_ticket=kerb_ticket, extra_conf=extra_conf) + + def load_data(self, **kwargs): + from data_juicer.core.data.ray_dataset import RayDataset + + path = self.ds_config["path"] + hdfs_fs = self._create_hdfs_fs() + + logger.info(f"Loading dataset from HDFS: {path}") + + file_extension_map = { + ".json": "json", + ".jsonl": "json", + ".txt": "text", + ".csv": "csv", + ".tsv": "csv", + ".parquet": "parquet", + ".npy": "numpy", + ".tfrecords": "tfrecords", + ".lance": "lance", + } + + auto_detect = False + data_source = self.ds_config.get("source", None) + if data_source is None: + auto_detect = True + else: + suffix = os.path.splitext(data_source)[1] + if suffix in file_extension_map: + data_format = file_extension_map[suffix] + elif "." + data_source in file_extension_map: + data_format = file_extension_map["." + data_source] + else: + auto_detect = True + + if auto_detect: + file_extension = os.path.splitext(path)[1] + data_format = file_extension_map.get(file_extension, "parquet") + logger.info(f"Auto-detected data format: {data_format}") + else: + logger.info(f"Using specified data format: {data_format}") + + try: + import ray.data + + if data_format in {"json", "jsonl"}: + from data_juicer.core.data.ray_dataset import read_json_stream + + dataset = read_json_stream(path, filesystem=hdfs_fs) + elif data_format == "parquet": + dataset = ray.data.read_parquet(path, filesystem=hdfs_fs) + elif data_format == "csv": + dataset = ray.data.read_csv(path, filesystem=hdfs_fs) + elif data_format == "text": + dataset = ray.data.read_text(path, filesystem=hdfs_fs) + elif data_format == "numpy": + dataset = ray.data.read_numpy(path, filesystem=hdfs_fs) + elif data_format == "tfrecords": + dataset = ray.data.read_tfrecords(path, filesystem=hdfs_fs) + elif data_format == "lance": + dataset = ray.data.read_lance(path, filesystem=hdfs_fs) + else: + raise ValueError(f"Unsupported data format for HDFS: {data_format}") + + return RayDataset(dataset, dataset_path=path, cfg=self.cfg) + except Exception as e: + raise RuntimeError( + f"Failed to load {data_format} data from HDFS path {path}. " + f"Ensure Hadoop native libs and configs are available. " + f"Error: {str(e)}" + ) + + @DataLoadStrategyRegistry.register("ray", "remote", "s3") class RayS3DataLoadStrategy(RayDataLoadStrategy): """ @@ -658,3 +856,191 @@ def load_data(self, **kwargs): f"Ensure your AWS credentials are configured. 
" f"Error: {str(e)}" ) + + +@DataLoadStrategyRegistry.register("default", "remote", "iceberg") +class DefaultIcebergDataLoadStrategy(DefaultDataLoadStrategy): + """ + data load strategy for Iceberg tables for LocalExecutor + Relies on pyiceberg to read the table and converts to HF Dataset + """ + + CONFIG_VALIDATION_RULES = { + "required_fields": ["table_identifier", "catalog_kwargs"], + "optional_fields": [], + "field_types": {"table_identifier": str, "catalog_kwargs": dict}, + "custom_validators": {}, + } + + def load_data(self, **kwargs): + from data_juicer.core.data import NestedDataset + + text_keys = getattr(self.cfg, "text_keys", ["text"]) + table_identifier = self.ds_config["table_identifier"] + catalog_kwargs = self.ds_config.get("catalog_kwargs", {}) or {} + try: + from pyiceberg.catalog import load_catalog + + # Load catalog with optional properties (e.g., uri, credentials) + # if props are empty, it relies on pyiceberg.yaml or env vars + catalog = load_catalog(**catalog_kwargs) + + # Load the table + table = catalog.load_table(table_identifier) + # Scan table to PyArrow Table + # Note: For very large tables on LocalExecutor, this might consume memory + # equivalent to the table size. + arrow_table = table.scan().to_arrow() + + # Convert to HF Dataset + ds = datasets.Dataset(arrow_table) + + ds = NestedDataset(ds) + ds = unify_format(ds, text_keys=text_keys, num_proc=kwargs.get("num_proc", 1), global_cfg=self.cfg) + return ds + except ImportError: + raise RuntimeError( + "pyiceberg is not installed. Please install it via `pip install pyiceberg` " + "to use Iceberg data load strategy." + ) + except Exception as e: + raise RuntimeError( + f"Failed to load Iceberg table {table_identifier}. " + f"Ensure catalog configs are correct. " + f"Error: {str(e)}" + ) + + +@DataLoadStrategyRegistry.register("ray", "remote", "iceberg") +class RayIcebergDataLoadStrategy(RayDataLoadStrategy): + """ + data load strategy for Iceberg tables for RayExecutor + Uses ray.data.read_iceberg + """ + + CONFIG_VALIDATION_RULES = { + "required_fields": ["table_identifier", "catalog_kwargs"], + "optional_fields": [], + "field_types": {"table_identifier": str, "catalog_kwargs": dict}, + "custom_validators": {}, + } + + def load_data(self, **kwargs): + from data_juicer.core.data.ray_dataset import RayDataset + + table_identifier = self.ds_config["table_identifier"] + + logger.info("Loading Iceberg table.....") + try: + import ray.data + + # from data_juicer.utils.s3_utils import get_aws_credentials + from data_juicer.utils.model_utils import filter_arguments + + # s3_config = {} + # if "s3.access_key_id" in catalog_kwargs: + # s3_config["aws_access_key_id"] = catalog_kwargs.pop("s3.access_key_id") + # if "s3.secret_access_key" in catalog_kwargs: + # s3_config["aws_secret_access_key"] = catalog_kwargs.pop("s3.secret_access_key") + # if "s3.session_token" in catalog_kwargs: + # s3_config["aws_session_token"] = catalog_kwargs.pop("s3.session_token") + # if "s3.region" in catalog_kwargs: + # s3_config["aws_region"] = catalog_kwargs.pop("s3.region") + # if "s3.endpoint" in catalog_kwargs: + # s3_config["endpoint_url"] = catalog_kwargs.pop("s3.endpoint") + # aws_access_key_id, aws_secret_access_key, aws_session_token, aws_region = get_aws_credentials(s3_config) + + read_config = filter_arguments(ray.data.read_iceberg, self.ds_config) + + # Ray reads the table distributedly based on the snapshots + dataset = ray.data.read_iceberg(**read_config) + + return RayDataset(dataset, dataset_path=table_identifier, 
cfg=self.cfg) + + except ImportError: + raise RuntimeError( + "pyiceberg is not installed. Please install it via `pip install pyiceberg` " + "to use Iceberg data load strategy in Ray." + ) + except Exception as e: + raise RuntimeError(f"Failed to load Iceberg table {table_identifier} in Ray. " f"Error: {str(e)}") + + +@DataLoadStrategyRegistry.register("ray", "remote", "delta") +class RayDeltaDataLoadStrategy(RayDataLoadStrategy): + """ + data load strategy for Delta Lake tables for RayExecutor + Uses ray.data.read_delta + """ + + CONFIG_VALIDATION_RULES = { + "required_fields": ["path"], + "optional_fields": [ + "aws_access_key_id", + "aws_secret_access_key", + "aws_session_token", + "aws_region", + "endpoint_url", + ], + "field_types": {"path": str}, + "custom_validators": {}, + } + + def load_data(self, **kwargs): + from data_juicer.core.data.ray_dataset import RayDataset + + table_path = self.ds_config["path"] + + logger.info(f"Loading Delta Lake table from path: {table_path}") + try: + import ray.data + + from data_juicer.utils.model_utils import filter_arguments + + read_config = filter_arguments(ray.data.read_delta, self.ds_config) + + dataset = ray.data.read_delta( + **read_config, + ) + + return RayDataset(dataset, dataset_path=table_path, cfg=self.cfg) + + except Exception as e: + raise RuntimeError(f"Failed to load Delta Lake table from path {table_path} in Ray. " f"Error: {str(e)}") + + +@DataLoadStrategyRegistry.register("ray", "remote", "hudi") +class RayHudiDataLoadStrategy(RayDataLoadStrategy): + """ + data load strategy for Hudi tables for RayExecutor + Uses ray.data.read_hudi + """ + + CONFIG_VALIDATION_RULES = { + "required_fields": ["table_uri"], + "optional_fields": [], + "field_types": {"path": str}, + "custom_validators": {}, + } + + def load_data(self, **kwargs): + from data_juicer.core.data.ray_dataset import RayDataset + + table_uri = self.ds_config["table_uri"] + + logger.info(f"Loading Hudi table from path: {table_uri}") + try: + import ray.data + + from data_juicer.utils.model_utils import filter_arguments + + read_config = filter_arguments(ray.data.read_hudi, self.ds_config) + + dataset = ray.data.read_hudi( + **read_config, + ) + + return RayDataset(dataset, dataset_path=table_uri, cfg=self.cfg) + + except Exception as e: + raise RuntimeError(f"Failed to load Hudi table from path {table_uri} in Ray. 
" f"Error: {str(e)}") diff --git a/data_juicer/core/ray_exporter.py b/data_juicer/core/ray_exporter.py index f0a231b0e8..bc87375fbf 100644 --- a/data_juicer/core/ray_exporter.py +++ b/data_juicer/core/ray_exporter.py @@ -1,3 +1,4 @@ +import copy import os from functools import partial @@ -6,6 +7,7 @@ from data_juicer.utils.constant import Fields, HashKeys from data_juicer.utils.file_utils import Sizes, byte_size_to_size_str from data_juicer.utils.model_utils import filter_arguments +from data_juicer.utils.s3_utils import create_filesystem_from_args from data_juicer.utils.webdataset_utils import reconstruct_custom_webdataset_format @@ -22,6 +24,7 @@ class RayExporter: "tfrecords", "webdataset", "lance", + "iceberg", # 'images', # 'numpy', } @@ -51,7 +54,13 @@ def __init__( self.export_shard_size = export_shard_size self.keep_stats_in_res_ds = keep_stats_in_res_ds self.keep_hashes_in_res_ds = keep_hashes_in_res_ds - self.export_format = self._get_export_format(export_path) if export_type is None else export_type + + if export_type: + self.export_format = export_type + elif export_path: + self.export_format = self._get_export_format(export_path) + else: + raise ValueError("Either export_path or export_type should be provided.") if self.export_format not in self._SUPPORTED_FORMATS: raise NotImplementedError( f'export data format "{self.export_format}" is not supported ' @@ -59,29 +68,13 @@ def __init__( ) self.export_extra_args = kwargs if kwargs is not None else {} - # Check if export_path is S3 and create filesystem if needed - self.s3_filesystem = None - if export_path.startswith("s3://"): - # Extract AWS credentials from export_extra_args (if provided) - s3_config = {} - if "aws_access_key_id" in self.export_extra_args: - s3_config["aws_access_key_id"] = self.export_extra_args.pop("aws_access_key_id") - if "aws_secret_access_key" in self.export_extra_args: - s3_config["aws_secret_access_key"] = self.export_extra_args.pop("aws_secret_access_key") - if "aws_session_token" in self.export_extra_args: - s3_config["aws_session_token"] = self.export_extra_args.pop("aws_session_token") - if "aws_region" in self.export_extra_args: - s3_config["aws_region"] = self.export_extra_args.pop("aws_region") - if "endpoint_url" in self.export_extra_args: - s3_config["endpoint_url"] = self.export_extra_args.pop("endpoint_url") - - # Create PyArrow S3FileSystem with credentials - # This matches the pattern used in RayS3DataLoadStrategy - from data_juicer.utils.s3_utils import create_pyarrow_s3_filesystem - - self.s3_filesystem = create_pyarrow_s3_filesystem(s3_config) - logger.info(f"Detected S3 export path: {export_path}. 
S3 filesystem configured.") + fs_args = copy.deepcopy(self.export_extra_args) + self.fs = create_filesystem_from_args(export_path, fs_args) + self._check_shard_size() + def _check_shard_size(self): + if self.export_shard_size == 0: + return self.max_shard_size_str = "" # get the string format of shard size @@ -149,22 +142,30 @@ def _export_impl(self, dataset, export_path, columns=None): if len(removed_fields): dataset = dataset.drop_columns(removed_fields) - export_method = RayExporter._router()[self.export_format] + router = self._router() + if self.export_format in router: + export_method = router[self.export_format] + else: + export_method = RayExporter.write_others + export_kwargs = { "export_extra_args": self.export_extra_args, "export_format": self.export_format, } - # Add S3 filesystem if available - if self.s3_filesystem is not None: - export_kwargs["export_extra_args"]["filesystem"] = self.s3_filesystem + # Add filesystem if available + if self.fs is not None: + export_kwargs["export_extra_args"]["filesystem"] = self.fs + if self.export_shard_size > 0: - # compute the min_rows_per_file for export methods dataset_nbytes = dataset.size_bytes() dataset_num_rows = dataset.count() - num_shards = int(dataset_nbytes / self.export_shard_size) + 1 - num_shards = min(num_shards, dataset_num_rows) - rows_per_file = int(dataset_num_rows / num_shards) - export_kwargs["export_extra_args"]["min_rows_per_file"] = rows_per_file + + if dataset_num_rows > 0: + num_shards = int(dataset_nbytes / self.export_shard_size) + 1 + num_shards = min(num_shards, dataset_num_rows) + rows_per_file = max(1, int(dataset_num_rows / num_shards)) + export_kwargs["export_extra_args"]["min_rows_per_file"] = rows_per_file + return export_method(dataset, export_path, **export_kwargs) def export(self, dataset, columns=None): @@ -236,7 +237,61 @@ def write_others(dataset, export_path, **kwargs): # Add S3 filesystem if available if "filesystem" in export_extra_args: filtered_kwargs["filesystem"] = export_extra_args["filesystem"] - return write_method(export_path, **filtered_kwargs) + if export_path: + return write_method(export_path, **filtered_kwargs) + else: + return write_method(**filtered_kwargs) + + @staticmethod + def write_iceberg(dataset, export_path, **kwargs): + """ + Export method for iceberg target tables. + Checks for table existence/connectivity. If check fails, safe fall-back to JSON. + """ + from pyiceberg.catalog import load_catalog + from pyiceberg.exceptions import NoSuchTableError + + export_extra_args = kwargs.get("export_extra_args", {}) + catalog_kwargs = export_extra_args.get("catalog_kwargs", {}) + table_identifier = export_extra_args.get("table_identifier", export_path) + + use_iceberg = False + + try: + catalog = load_catalog(**catalog_kwargs) + catalog.load_table(table_identifier) + logger.info(f"Iceberg table {table_identifier} exists. Writing to Iceberg.") + use_iceberg = True + + except NoSuchTableError as e: + logger.warning( + f"Iceberg target unavailable ({e.__class__.__name__}). Fallback to exporting to {export_path}..." + ) + except Exception as e: + logger.error(f"Unexpected error checking Iceberg: {e}. Fallback to exporting to {export_path}...") + + if use_iceberg: + try: + filtered_kwargs = filter_arguments(dataset.write_iceberg, export_extra_args) + return dataset.write_iceberg(table_identifier, **filtered_kwargs) + except Exception as e: + logger.error(f"Write to Iceberg failed during execution: {e}. 
Fallback to json...") + + suffix = os.path.splitext(export_path)[-1].strip(".").lower() + if not suffix: + suffix = "jsonl" + logger.warning(f"No suffix found in {export_path}, using default fallback: {suffix}") + + logger.info(f"Falling back to file export. Format: [{suffix}], Path: [{export_path}]") + + fallback_kwargs = {} + if "filesystem" in export_extra_args: + fallback_kwargs["filesystem"] = export_extra_args["filesystem"] + if suffix in ["json", "jsonl"]: + return RayExporter.write_json(dataset, export_path, **fallback_kwargs) + else: + fallback_kwargs["export_format"] = suffix + return RayExporter.write_others(dataset, export_path, **fallback_kwargs) # suffix to export method @staticmethod @@ -250,8 +305,5 @@ def _router(): "jsonl": RayExporter.write_json, "json": RayExporter.write_json, "webdataset": RayExporter.write_webdataset, - "parquet": RayExporter.write_others, - "csv": RayExporter.write_others, - "tfrecords": RayExporter.write_others, - "lance": RayExporter.write_others, + "iceberg": RayExporter.write_iceberg, } diff --git a/data_juicer/ops/mapper/__init__.py b/data_juicer/ops/mapper/__init__.py index 08cbe84a35..82d7d29c41 100644 --- a/data_juicer/ops/mapper/__init__.py +++ b/data_juicer/ops/mapper/__init__.py @@ -74,6 +74,8 @@ RemoveWordsWithIncorrectSubstringsMapper, ) from .replace_content_mapper import ReplaceContentMapper +from .s3_download_file_mapper import S3DownloadFileMapper +from .s3_upload_file_mapper import S3UploadFileMapper from .sdxl_prompt2prompt_mapper import SDXLPrompt2PromptMapper from .sentence_augmentation_mapper import SentenceAugmentationMapper from .sentence_split_mapper import SentenceSplitMapper @@ -173,6 +175,8 @@ "RemoveTableTextMapper", "RemoveWordsWithIncorrectSubstringsMapper", "ReplaceContentMapper", + "S3DownloadFileMapper", + "S3UploadFileMapper", "SDXLPrompt2PromptMapper", "SentenceAugmentationMapper", "SentenceSplitMapper", diff --git a/data_juicer/ops/mapper/s3_download_file_mapper.py b/data_juicer/ops/mapper/s3_download_file_mapper.py new file mode 100644 index 0000000000..0cc5403d60 --- /dev/null +++ b/data_juicer/ops/mapper/s3_download_file_mapper.py @@ -0,0 +1,426 @@ +import asyncio +import copy +import os +import os.path as osp +from typing import List, Union + +from loguru import logger + +from data_juicer.ops.base_op import OPERATORS, Mapper +from data_juicer.utils.lazy_loader import LazyLoader +from data_juicer.utils.s3_utils import get_aws_credentials + +boto3 = LazyLoader("boto3", globals(), "boto3") +botocore = LazyLoader("botocore", globals(), "botocore") +OP_NAME = "s3_download_file_mapper" + + +@OPERATORS.register_module(OP_NAME) +class S3DownloadFileMapper(Mapper): + """Mapper to download files from S3 to local files or load them into memory. + + This operator downloads files from S3 URLs (s3://...) or handles local files. 
It supports: + - Downloading multiple files concurrently + - Saving files to a specified directory or loading content into memory + - Resume download functionality + - S3 authentication with access keys + - Custom S3 endpoints (for S3-compatible services like MinIO) + + The operator processes nested lists of URLs/paths, maintaining the original structure + in the output.""" + + _batched_op = True + + def __init__( + self, + download_field: str = None, + save_dir: str = None, + save_field: str = None, + resume_download: bool = False, + timeout: int = 30, + max_concurrent: int = 10, + # S3 credentials + aws_access_key_id: str = None, + aws_secret_access_key: str = None, + aws_session_token: str = None, + aws_region: str = None, + endpoint_url: str = None, + *args, + **kwargs, + ): + """ + Initialization method. + + :param download_field: The field name to get the URL/path to download. + :param save_dir: The directory to save downloaded files. + :param save_field: The field name to save the downloaded file content. + :param resume_download: Whether to resume download. If True, skip the sample if it exists. + :param timeout: (Deprecated) Kept for backward compatibility, not used for S3 downloads. + :param max_concurrent: Maximum concurrent downloads. + :param aws_access_key_id: AWS access key ID for S3. + :param aws_secret_access_key: AWS secret access key for S3. + :param aws_session_token: AWS session token for S3 (optional). + :param aws_region: AWS region for S3. + :param endpoint_url: Custom S3 endpoint URL (for S3-compatible services). + :param args: extra args + :param kwargs: extra args + """ + super().__init__(*args, **kwargs) + self._init_parameters = self.remove_extra_parameters(locals()) + + self.download_field = download_field + self.save_dir = save_dir + self.save_field = save_field + self.resume_download = resume_download + + if not (self.save_dir or self.save_field): + logger.warning( + "Both `save_dir` and `save_field` are not specified. Use the default `image_bytes` key to " + "save the downloaded contents." 
+ ) + self.save_field = self.image_bytes_key + + if self.save_dir: + os.makedirs(self.save_dir, exist_ok=True) + + self.timeout = timeout + self.max_concurrent = max_concurrent + + # Prepare config dict for get_aws_credentials + ds_config = {} + if aws_access_key_id: + ds_config["aws_access_key_id"] = aws_access_key_id + if aws_secret_access_key: + ds_config["aws_secret_access_key"] = aws_secret_access_key + if aws_session_token: + ds_config["aws_session_token"] = aws_session_token + if aws_region: + ds_config["aws_region"] = aws_region + if endpoint_url: + ds_config["endpoint_url"] = endpoint_url + + # Get credentials with priority: environment variables > operator parameters + ( + resolved_access_key_id, + resolved_secret_access_key, + resolved_session_token, + resolved_region, + ) = get_aws_credentials(ds_config) + + # Store S3 configuration (don't create client here to avoid serialization issues) + self.s3_config = None + self._s3_client = None + if resolved_access_key_id and resolved_secret_access_key: + self.s3_config = { + "aws_access_key_id": resolved_access_key_id, + "aws_secret_access_key": resolved_secret_access_key, + } + if resolved_session_token: + self.s3_config["aws_session_token"] = resolved_session_token + if resolved_region: + self.s3_config["region_name"] = resolved_region + if endpoint_url: + self.s3_config["endpoint_url"] = endpoint_url + logger.info(f"S3 configuration stored with endpoint: {endpoint_url or 'default'}") + else: + logger.info("No S3 credentials provided. S3 URLs will not be supported.") + + @property + def s3_client(self): + """Lazy initialization of S3 client to avoid serialization issues with Ray.""" + if self._s3_client is None and self.s3_config is not None: + from botocore.config import Config + + config = Config( + connect_timeout=self.timeout, + read_timeout=self.timeout, + retries={"max_attempts": 3}, # 可选:建议增加重试机制 + ) + self.s3_config["config"] = config + self._s3_client = boto3.client("s3", **self.s3_config) + logger.debug(f"S3 client initialized with timeout={self.timeout}") + return self._s3_client + + def _is_s3_url(self, url: str) -> bool: + """Check if the URL is an S3 URL.""" + return url.startswith("s3://") + + def _parse_s3_url(self, s3_url: str): + """Parse S3 URL into bucket and key. + + Example: s3://bucket-name/path/to/file.mp4 -> ('bucket-name', 'path/to/file.mp4') + """ + if not s3_url.startswith("s3://"): + raise ValueError(f"Invalid S3 URL: {s3_url}") + + parts = s3_url[5:].split("/", 1) + bucket = parts[0] + key = parts[1] if len(parts) > 1 else "" + + return bucket, key + + def _download_from_s3(self, s3_url: str, save_path: str = None, return_content: bool = False): + """Download a file from S3. + + :param s3_url: S3 URL (s3://bucket/key) + :param save_path: Local path to save the file + :param return_content: Whether to return file content as bytes + :return: (status, response, content, save_path) + """ + if not self.s3_client: + raise ValueError("S3 client not initialized. 
Please provide AWS credentials.") + + try: + bucket, key = self._parse_s3_url(s3_url) + + if save_path: + # Ensure parent directory exists + save_dir = os.path.dirname(save_path) + if save_dir: + os.makedirs(save_dir, exist_ok=True) + + # Download to file + try: + tmp_path = save_path + ".tmp" + self.s3_client.download_file(bucket, key, tmp_path) + os.rename(tmp_path, save_path) + logger.debug(f"Downloaded S3 file: {s3_url} -> {save_path}") + except Exception as e: + if os.path.exists(tmp_path): + os.remove(tmp_path) + raise e + + # Read content if needed + content = None + if return_content: + with open(save_path, "rb") as f: + content = f.read() + + return "success", None, content, save_path + + elif return_content: + # Download to memory + response = self.s3_client.get_object(Bucket=bucket, Key=key) + content = response["Body"].read() + logger.debug(f"Downloaded S3 file to memory: {s3_url}") + + return "success", None, content, None + + else: + return "success", None, None, None + + except botocore.exceptions.ClientError as e: + error_msg = f"S3 download failed: {e}" + logger.error(error_msg) + return "failed", error_msg, None, None + except Exception as e: + error_msg = f"S3 download error: {e}" + logger.error(error_msg) + return "failed", error_msg, None, None + + async def download_files_async(self, urls, return_contents, save_dir=None, **kwargs): + """Download files asynchronously from S3.""" + + async def _download_file( + semaphore: asyncio.Semaphore, + idx: int, + url: str, + save_dir=None, + return_content=False, + **kwargs, + ) -> dict: + async with semaphore: + try: + status, response, content, save_path = "success", None, None, None + + # Handle S3 URLs (synchronous operation in async context) + if self._is_s3_url(url): + if save_dir: + filename = os.path.basename(self._parse_s3_url(url)[1]) + save_path = osp.join(save_dir, filename) + + # Check if file exists and resume is enabled + if os.path.exists(save_path) and self.resume_download: + if return_content: + with open(save_path, "rb") as f: + content = f.read() + return idx, save_path, status, response, content + + # Download from S3 (run in executor to avoid blocking) + loop = asyncio.get_event_loop() + status, response, content, save_path = await loop.run_in_executor( + None, self._download_from_s3, url, save_path, return_content + ) + return idx, save_path, status, response, content + + # Check for HTTP/HTTPS URLs - not supported + if url.startswith("http://") or url.startswith("https://"): + raise ValueError( + f"HTTP/HTTPS URLs are not supported. This mapper only supports S3 URLs (s3://...) and local files. 
Got: {url}" + ) + + # Handle local files + if return_content: + with open(url, "rb") as f: + content = f.read() + if save_dir: + save_path = url + + return idx, save_path, status, response, content + + except Exception as e: + status = "failed" + response = str(e) + save_path = None + content = None + + return idx, save_path, status, response, content + + semaphore = asyncio.Semaphore(self.max_concurrent) + tasks = [ + _download_file(semaphore, idx, url, save_dir, return_contents[idx], **kwargs) + for idx, url in enumerate(urls) + ] + results = await asyncio.gather(*tasks) + results.sort(key=lambda x: x[0]) + + return results + + def _flat_urls(self, nested_urls): + """Flatten nested URLs while preserving structure information.""" + flat_urls = [] + structure_info = [] # save as original index, sub index + + for idx, urls in enumerate(nested_urls): + if isinstance(urls, list): + for sub_idx, url in enumerate(urls): + flat_urls.append(url) + structure_info.append((idx, sub_idx)) + else: + flat_urls.append(urls) + structure_info.append((idx, -1)) # -1 means single str element + + return flat_urls, structure_info + + def _create_path_struct(self, nested_urls, keep_failed_url=True) -> List[Union[str, List[str]]]: + """Create path structure for output.""" + if keep_failed_url: + reconstructed = copy.deepcopy(nested_urls) + else: + reconstructed = [] + for item in nested_urls: + if isinstance(item, list): + reconstructed.append([None] * len(item)) + else: + reconstructed.append(None) + + return reconstructed + + def _create_save_field_struct(self, nested_urls, save_field_contents=None) -> List[Union[bytes, List[bytes]]]: + """Create save field structure for output.""" + if save_field_contents is None: + save_field_contents = [] + for item in nested_urls: + if isinstance(item, list): + save_field_contents.append([None] * len(item)) + else: + save_field_contents.append(None) + else: + # check whether the save_field_contents format is correct and correct it automatically + for i, item in enumerate(nested_urls): + if isinstance(item, list): + if not save_field_contents[i] or len(save_field_contents[i]) != len(item): + save_field_contents[i] = [None] * len(item) + + return save_field_contents + + async def download_nested_urls( + self, nested_urls: List[Union[str, List[str]]], save_dir=None, save_field_contents=None + ): + """Download nested URLs with structure preservation.""" + flat_urls, structure_info = self._flat_urls(nested_urls) + + if save_field_contents is None: + # not save contents, set return_contents to False + return_contents = [False] * len(flat_urls) + else: + # if original content None, set bool value to True to get content else False to skip reload it + return_contents = [] + for item in save_field_contents: + if isinstance(item, list): + return_contents.extend([not c for c in item]) + else: + return_contents.append(not item) + + download_results = await self.download_files_async( + flat_urls, + return_contents, + save_dir, + ) + + if self.save_dir: + reconstructed_path = self._create_path_struct(nested_urls) + else: + reconstructed_path = None + + failed_info = "" + for i, (idx, save_path, status, response, content) in enumerate(download_results): + orig_idx, sub_idx = structure_info[i] + if status != "success": + save_path = flat_urls[i] + failed_info += "\n" + str(response) + + if save_field_contents is not None: + if return_contents[i]: + if sub_idx == -1: + save_field_contents[orig_idx] = content + else: + save_field_contents[orig_idx][sub_idx] = content + + if self.save_dir: + 
if sub_idx == -1: + reconstructed_path[orig_idx] = save_path + else: + reconstructed_path[orig_idx][sub_idx] = save_path + + return save_field_contents, reconstructed_path, failed_info + + def process_batched(self, samples): + """Process a batch of samples.""" + if self.download_field not in samples or not samples[self.download_field]: + return samples + + batch_nested_urls = samples[self.download_field] + + if self.save_field: + if not self.resume_download: + if self.save_field in samples: + raise ValueError( + f"{self.save_field} is already in samples. " + f"If you want to resume download, please set `resume_download=True`" + ) + save_field_contents = self._create_save_field_struct(batch_nested_urls) + else: + if self.save_field not in samples: + save_field_contents = self._create_save_field_struct(batch_nested_urls) + else: + save_field_contents = self._create_save_field_struct(batch_nested_urls, samples[self.save_field]) + else: + save_field_contents = None + + save_field_contents, reconstructed_path, failed_info = asyncio.run( + self.download_nested_urls( + batch_nested_urls, save_dir=self.save_dir, save_field_contents=save_field_contents + ) + ) + + if self.save_dir: + samples[self.download_field] = reconstructed_path + + if self.save_field: + samples[self.save_field] = save_field_contents + + if len(failed_info): + logger.error(f"Failed files:\n{failed_info}") + + return samples diff --git a/data_juicer/ops/mapper/s3_upload_file_mapper.py b/data_juicer/ops/mapper/s3_upload_file_mapper.py new file mode 100644 index 0000000000..5582b7b265 --- /dev/null +++ b/data_juicer/ops/mapper/s3_upload_file_mapper.py @@ -0,0 +1,321 @@ +import asyncio +import os +from typing import List, Union + +from loguru import logger + +from data_juicer.ops.base_op import OPERATORS, Mapper +from data_juicer.utils.lazy_loader import LazyLoader +from data_juicer.utils.s3_utils import get_aws_credentials + +boto3 = LazyLoader("boto3", globals(), "boto3") +botocore = LazyLoader("botocore", globals(), "botocore") +OP_NAME = "s3_upload_file_mapper" + + +@OPERATORS.register_module(OP_NAME) +class S3UploadFileMapper(Mapper): + """Mapper to upload local files to S3 and update paths to S3 URLs. + + This operator uploads files from local paths to S3 storage. It supports: + - Uploading multiple files concurrently + - Updating file paths in the dataset to S3 URLs + - Optional deletion of local files after successful upload + - Custom S3 endpoints (for S3-compatible services like MinIO) + - Skipping already uploaded files (based on S3 key) + + The operator processes nested lists of paths, maintaining the original structure + in the output.""" + + _batched_op = True + + def __init__( + self, + upload_field: str = None, + s3_bucket: str = None, + s3_prefix: str = "", + # S3 credentials + aws_access_key_id: str = None, + aws_secret_access_key: str = None, + aws_session_token: str = None, + aws_region: str = None, + endpoint_url: str = None, + # Upload options + remove_local: bool = False, + skip_existing: bool = True, + max_concurrent: int = 10, + *args, + **kwargs, + ): + """ + Initialization method. + + :param upload_field: The field name containing file paths to upload. + :param s3_bucket: S3 bucket name to upload files to. + :param s3_prefix: Prefix (folder path) in S3 bucket. E.g., 'videos/' or 'data/videos/'. + :param aws_access_key_id: AWS access key ID for S3. + :param aws_secret_access_key: AWS secret access key for S3. + :param aws_session_token: AWS session token for S3 (optional). 
+ :param aws_region: AWS region for S3. + :param endpoint_url: Custom S3 endpoint URL (for S3-compatible services). + :param remove_local: Whether to delete local files after successful upload. + :param skip_existing: Whether to skip uploading if file already exists in S3. + :param max_concurrent: Maximum concurrent uploads. + :param args: extra args + :param kwargs: extra args + """ + super().__init__(*args, **kwargs) + self._init_parameters = self.remove_extra_parameters(locals()) + + self.upload_field = upload_field + self.s3_bucket = s3_bucket + self.s3_prefix = s3_prefix.rstrip("/") + "/" if s3_prefix and not s3_prefix.endswith("/") else s3_prefix or "" + self.remove_local = remove_local + self.skip_existing = skip_existing + self.max_concurrent = max_concurrent + + if not self.s3_bucket: + raise ValueError("s3_bucket must be specified") + + # Prepare config dict for get_aws_credentials + ds_config = {} + if aws_access_key_id: + ds_config["aws_access_key_id"] = aws_access_key_id + if aws_secret_access_key: + ds_config["aws_secret_access_key"] = aws_secret_access_key + if aws_session_token: + ds_config["aws_session_token"] = aws_session_token + if aws_region: + ds_config["aws_region"] = aws_region + if endpoint_url: + ds_config["endpoint_url"] = endpoint_url + + # Get credentials with priority: environment variables > operator parameters + ( + resolved_access_key_id, + resolved_secret_access_key, + resolved_session_token, + resolved_region, + ) = get_aws_credentials(ds_config) + + if not (resolved_access_key_id and resolved_secret_access_key): + raise ValueError( + "AWS credentials (aws_access_key_id and aws_secret_access_key) must be provided " + "either through operator parameters or environment variables (AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY)" + ) + + # Store S3 configuration (don't create client here to avoid serialization issues) + self.s3_config = { + "aws_access_key_id": resolved_access_key_id, + "aws_secret_access_key": resolved_secret_access_key, + } + if resolved_session_token: + self.s3_config["aws_session_token"] = resolved_session_token + if resolved_region: + self.s3_config["region_name"] = resolved_region + if endpoint_url: + self.s3_config["endpoint_url"] = endpoint_url + + self._s3_client = None + logger.info( + f"S3 upload mapper initialized: bucket={s3_bucket}, prefix={self.s3_prefix}, endpoint={endpoint_url or 'default'}" + ) + + @property + def s3_client(self): + """Lazy initialization of S3 client to avoid serialization issues with Ray.""" + if self._s3_client is None: + self._s3_client = boto3.client("s3", **self.s3_config) + logger.debug("S3 client initialized (lazy)") + return self._s3_client + + def _is_s3_url(self, path: str) -> bool: + """Check if the path is already an S3 URL.""" + return isinstance(path, str) and path.startswith("s3://") + + def _check_s3_exists(self, s3_key: str) -> bool: + """Check if a file exists in S3.""" + try: + self.s3_client.head_object(Bucket=self.s3_bucket, Key=s3_key) + return True + except botocore.exceptions.ClientError: + return False + + def _upload_to_s3(self, local_path: str) -> tuple: + """Upload a single file to S3. 
+ + :param local_path: Local file path to upload + :return: (status, s3_url, error_message) + """ + # Already an S3 URL, skip + if self._is_s3_url(local_path): + logger.debug(f"Path is already S3 URL: {local_path}") + return "skipped", local_path, None + + # Check if file exists locally + if not os.path.exists(local_path): + error_msg = f"Local file not found: {local_path}" + logger.warning(error_msg) + return "failed", local_path, error_msg + + try: + # Construct S3 key + filename = os.path.basename(local_path) + s3_key = self.s3_prefix + filename + s3_url = f"s3://{self.s3_bucket}/{s3_key}" + + # Check if file already exists in S3 + if self.skip_existing and self._check_s3_exists(s3_key): + logger.debug(f"File already exists in S3, skipping: {s3_url}") + + # Delete local file if configured + if self.remove_local: + try: + os.remove(local_path) + logger.debug(f"Removed local file: {local_path}") + except Exception as e: + logger.warning(f"Failed to remove local file {local_path}: {e}") + + return "exists", s3_url, None + + # Upload to S3 + self.s3_client.upload_file(local_path, self.s3_bucket, s3_key) + logger.info(f"Uploaded: {local_path} -> {s3_url}") + + # Delete local file if configured + if self.remove_local: + try: + os.remove(local_path) + logger.debug(f"Removed local file: {local_path}") + except Exception as e: + logger.warning(f"Failed to remove local file {local_path}: {e}") + + return "success", s3_url, None + + except botocore.exceptions.ClientError as e: + error_msg = f"S3 upload failed: {e}" + logger.error(error_msg) + return "failed", local_path, error_msg + except Exception as e: + error_msg = f"Upload error: {e}" + logger.error(error_msg) + return "failed", local_path, error_msg + + async def upload_files_async(self, paths: List[str]) -> List[tuple]: + """Upload multiple files asynchronously. 
+ + :param paths: List of local file paths + :return: List of (idx, status, s3_url, error_message) tuples + """ + + async def _upload_file(semaphore: asyncio.Semaphore, idx: int, path: str) -> tuple: + async with semaphore: + try: + # Upload to S3 (run in executor to avoid blocking) + loop = asyncio.get_event_loop() + status, s3_url, error = await loop.run_in_executor(None, self._upload_to_s3, path) + return idx, status, s3_url, error + except Exception as e: + error_msg = f"Upload error: {e}" + logger.error(error_msg) + return idx, "failed", path, error_msg + + semaphore = asyncio.Semaphore(self.max_concurrent) + tasks = [_upload_file(semaphore, idx, path) for idx, path in enumerate(paths)] + results = await asyncio.gather(*tasks) + results = list(results) if not isinstance(results, list) else results + results.sort(key=lambda x: x[0]) + + return results + + def _flat_paths(self, nested_paths): + """Flatten nested paths while preserving structure information.""" + flat_paths = [] + structure_info = [] # (original_index, sub_index) + + for idx, paths in enumerate(nested_paths): + if isinstance(paths, list): + for sub_idx, path in enumerate(paths): + flat_paths.append(path) + structure_info.append((idx, sub_idx)) + else: + flat_paths.append(paths) + structure_info.append((idx, -1)) # -1 means single element + + return flat_paths, structure_info + + def _create_path_struct(self, nested_paths) -> List: + """Create path structure for output.""" + reconstructed = [] + for item in nested_paths: + if isinstance(item, list): + reconstructed.append([None] * len(item)) + else: + reconstructed.append(None) + return reconstructed + + async def upload_nested_paths(self, nested_paths: List[Union[str, List[str]]]): + """Upload nested paths with structure preservation. 
+ + :param nested_paths: Nested list of file paths + :return: (reconstructed_paths, failed_info) + """ + flat_paths, structure_info = self._flat_paths(nested_paths) + + # Upload all files asynchronously + upload_results = await self.upload_files_async(flat_paths) + + # Reconstruct nested structure + reconstructed_paths = self._create_path_struct(nested_paths) + + failed_info = "" + success_count = 0 + failed_count = 0 + skipped_count = 0 + exists_count = 0 + + for i, (idx, status, s3_url, error) in enumerate(upload_results): + orig_idx, sub_idx = structure_info[i] + + if status == "success": + success_count += 1 + elif status == "failed": + failed_count += 1 + if error: + failed_info += f"\n{flat_paths[i]}: {error}" + elif status == "skipped": + skipped_count += 1 + elif status == "exists": + exists_count += 1 + + # Update path in reconstructed structure + if sub_idx == -1: + reconstructed_paths[orig_idx] = s3_url + else: + reconstructed_paths[orig_idx][sub_idx] = s3_url + + # Log summary + logger.info( + f"Upload summary: {success_count} uploaded, {exists_count} already exists, " + f"{skipped_count} skipped, {failed_count} failed" + ) + + return reconstructed_paths, failed_info + + def process_batched(self, samples): + """Process a batch of samples.""" + if self.upload_field not in samples or not samples[self.upload_field]: + return samples + + batch_nested_paths = samples[self.upload_field] + + # Upload files and get S3 URLs + reconstructed_paths, failed_info = asyncio.run(self.upload_nested_paths(batch_nested_paths)) + + # Update the field with S3 URLs + samples[self.upload_field] = reconstructed_paths + + if len(failed_info): + logger.error(f"Failed uploads:\n{failed_info}") + + return samples diff --git a/data_juicer/utils/s3_utils.py b/data_juicer/utils/s3_utils.py index 650f10f9c6..2a5c76a6a1 100644 --- a/data_juicer/utils/s3_utils.py +++ b/data_juicer/utils/s3_utils.py @@ -6,7 +6,7 @@ """ import os -from typing import Dict, Tuple +from typing import Any, Dict, Tuple import pyarrow.fs from loguru import logger @@ -117,3 +117,30 @@ def validate_s3_path(path: str) -> None: """ if not path.startswith("s3://"): raise ValueError(f"S3 path must start with 's3://', got: {path}") + + +def create_filesystem_from_args(path: str, args: Dict[str, Any]): + """ + Create a PyArrow FileSystem based on the path prefix and parameters. + Automatically extract relevant credentials from args and remove them from args (using pop) to avoid polluting subsequent parameters. + """ + fs = None + if path.startswith("s3://"): + validate_s3_path(path) + + s3_keys = ["aws_access_key_id", "aws_secret_access_key", "aws_session_token", "aws_region", "endpoint_url"] + s3_conf = {k: args.pop(k) for k in s3_keys if k in args} + fs = create_pyarrow_s3_filesystem(s3_conf) + logger.info(f"Detected S3 export path: {path}. S3 filesystem configured.") + + elif path.startswith("hdfs://"): + import pyarrow.fs as pa_fs + + hdfs_keys = ["host", "port", "user", "kerb_ticket", "extra_conf"] + hdfs_conf = {k: args.pop(k) for k in hdfs_keys if k in args} + if "port" in hdfs_conf: + hdfs_conf["port"] = int(hdfs_conf["port"]) + fs = pa_fs.HadoopFileSystem(**hdfs_conf) + logger.info(f"Detected HDFS export path: {path}. 
HDFS filesystem configured.") + + return fs diff --git a/demos/process_dist_sources/config/process_from_hdfs.yaml b/demos/process_dist_sources/config/process_from_hdfs.yaml new file mode 100644 index 0000000000..d3787b16ca --- /dev/null +++ b/demos/process_dist_sources/config/process_from_hdfs.yaml @@ -0,0 +1,76 @@ +# Process config example for dataset + +# global parameters +project_name: 'demo' + + +np: 1 # number of subprocess to process your dataset + +export_path: './outputs/demo/demo-processed-ray' + +dataset: + configs: + - type: remote + source: hdfs + path: hdfs://your_hdfs_path/demo-dataset.jsonl + host: your_hdfs_host + port: 8020 + user: your_username + +# process schedule +# a list of several process operators with their arguments +process: + # Filter ops + - alphanumeric_filter: # filter text with alphabet/numeric ratio out of specific range. + tokenization: false # Whether to count the ratio of alphanumeric to the total number of tokens. + min_ratio: 0.0 # the min ratio of filter range + max_ratio: 0.9 # the max ratio of filter range + - average_line_length_filter: # filter text with the average length of lines out of specific range. + min_len: 10 # the min length of filter range + max_len: 10000 # the max length of filter range + - character_repetition_filter: # filter text with the character repetition ratio out of specific range + rep_len: 10 # repetition length for char-level n-gram + min_ratio: 0.0 # the min ratio of filter range + max_ratio: 0.5 # the max ratio of filter range + - flagged_words_filter: # filter text with the flagged-word ratio larger than a specific max value + lang: en # consider flagged words in what language + tokenization: false # whether to use model to tokenize documents + max_ratio: 0.0045 # the max ratio to filter text + flagged_words_dir: ./assets # directory to store flagged words dictionaries + use_words_aug: false # whether to augment words, especially for Chinese and Vietnamese + words_aug_group_sizes: [2] # the group size of words to augment + words_aug_join_char: "" # the join char between words to augment + - language_id_score_filter: # filter text in specific language with language scores larger than a specific max value + lang: en # keep text in what language + min_score: 0.8 # the min language scores to filter text + - maximum_line_length_filter: # filter text with the maximum length of lines out of specific range + min_len: 10 # the min length of filter range + max_len: 10000 # the max length of filter range + - perplexity_filter: # filter text with perplexity score out of specific range + lang: en # compute perplexity in what language + max_ppl: 1500 # the max perplexity score to filter text + - special_characters_filter: # filter text with special-char ratio out of specific range + min_ratio: 0.0 # the min ratio of filter range + max_ratio: 0.25 # the max ratio of filter range + - stopwords_filter: # filter text with stopword ratio smaller than a specific min value + lang: en # consider stopwords in what language + tokenization: false # whether to use model to tokenize documents + min_ratio: 0.3 # the min ratio to filter text + stopwords_dir: ./assets # directory to store stopwords dictionaries + use_words_aug: false # whether to augment words, especially for Chinese and Vietnamese + words_aug_group_sizes: [2] # the group size of words to augment + words_aug_join_char: "" # the join char between words to augment + - text_length_filter: # filter text with length out of specific range + min_len: 10 # the min length of filter range + 
max_len: 10000 # the max length of filter range + - words_num_filter: # filter text with number of words out of specific range + lang: en # sample in which language + tokenization: false # whether to use model to tokenize documents + min_num: 10 # the min number of filter range + max_num: 10000 # the max number of filter range + - word_repetition_filter: # filter text with the word repetition ratio out of specific range + lang: en # sample in which language + tokenization: false # whether to use model to tokenize documents + rep_len: 10 # repetition length for word-level n-gram + min_ratio: 0.0 # the min ratio of filter range + max_ratio: 0.5 # the max ratio of filter range diff --git a/demos/process_dist_sources/config/process_from_iceberg.yaml b/demos/process_dist_sources/config/process_from_iceberg.yaml new file mode 100644 index 0000000000..f67f693489 --- /dev/null +++ b/demos/process_dist_sources/config/process_from_iceberg.yaml @@ -0,0 +1,83 @@ +# Process config example for dataset + +# global parameters +project_name: 'demo' +executor_type: ray +ray_address: local + +np: 1 # number of subprocess to process your dataset + +export_path: './outputs/demo/' + +dataset: + configs: + - type: remote + source: iceberg + table_identifier: default.test + catalog_kwargs: + type: sql + name: local + uri: sqlite:///your.db + warehouse: s3://yourbucket/your_warehouse_path + s3.endpoint: http://yourhost:yourport + s3.access-key-id: your_access_key + s3.secret-access-key: your_secret_key + s3.region: us-east-1 + +# process schedule +# a list of several process operators with their arguments +process: + # Filter ops + - alphanumeric_filter: # filter text with alphabet/numeric ratio out of specific range. + tokenization: false # Whether to count the ratio of alphanumeric to the total number of tokens. + min_ratio: 0.0 # the min ratio of filter range + max_ratio: 0.9 # the max ratio of filter range + - average_line_length_filter: # filter text with the average length of lines out of specific range. 
+ min_len: 10 # the min length of filter range + max_len: 10000 # the max length of filter range + - character_repetition_filter: # filter text with the character repetition ratio out of specific range + rep_len: 10 # repetition length for char-level n-gram + min_ratio: 0.0 # the min ratio of filter range + max_ratio: 0.5 # the max ratio of filter range + - flagged_words_filter: # filter text with the flagged-word ratio larger than a specific max value + lang: en # consider flagged words in what language + tokenization: false # whether to use model to tokenize documents + max_ratio: 0.0045 # the max ratio to filter text + flagged_words_dir: ./assets # directory to store flagged words dictionaries + use_words_aug: false # whether to augment words, especially for Chinese and Vietnamese + words_aug_group_sizes: [2] # the group size of words to augment + words_aug_join_char: "" # the join char between words to augment + - language_id_score_filter: # filter text in specific language with language scores larger than a specific max value + lang: en # keep text in what language + min_score: 0.8 # the min language scores to filter text + - maximum_line_length_filter: # filter text with the maximum length of lines out of specific range + min_len: 10 # the min length of filter range + max_len: 10000 # the max length of filter range + - perplexity_filter: # filter text with perplexity score out of specific range + lang: en # compute perplexity in what language + max_ppl: 1500 # the max perplexity score to filter text + - special_characters_filter: # filter text with special-char ratio out of specific range + min_ratio: 0.0 # the min ratio of filter range + max_ratio: 0.25 # the max ratio of filter range + - stopwords_filter: # filter text with stopword ratio smaller than a specific min value + lang: en # consider stopwords in what language + tokenization: false # whether to use model to tokenize documents + min_ratio: 0.3 # the min ratio to filter text + stopwords_dir: ./assets # directory to store stopwords dictionaries + use_words_aug: false # whether to augment words, especially for Chinese and Vietnamese + words_aug_group_sizes: [2] # the group size of words to augment + words_aug_join_char: "" # the join char between words to augment + - text_length_filter: # filter text with length out of specific range + min_len: 10 # the min length of filter range + max_len: 10000 # the max length of filter range + - words_num_filter: # filter text with number of words out of specific range + lang: en # sample in which language + tokenization: false # whether to use model to tokenize documents + min_num: 10 # the min number of filter range + max_num: 10000 # the max number of filter range + - word_repetition_filter: # filter text with the word repetition ratio out of specific range + lang: en # sample in which language + tokenization: false # whether to use model to tokenize documents + rep_len: 10 # repetition length for word-level n-gram + min_ratio: 0.0 # the min ratio of filter range + max_ratio: 0.5 # the max ratio of filter range diff --git a/demos/process_dist_sources/config/process_from_s3.yaml b/demos/process_dist_sources/config/process_from_s3.yaml new file mode 100644 index 0000000000..3f653cb5c5 --- /dev/null +++ b/demos/process_dist_sources/config/process_from_s3.yaml @@ -0,0 +1,78 @@ +project_name: 'demo' +executor_type: ray +ray_address: local + +np: 1 # number of subprocess to process your dataset + +export_path: './outputs/demo/' + + +dataset: + configs: + - type: remote + source: s3 + path: 
s3://datajuicer/demo-dataset.jsonl + aws_access_key_id: your_access_key # Recommended: set via environment variables (e.g., AWS_ACCESS_KEY_ID) + aws_secret_access_key: your_secret_key + aws_region: us-east-1 + endpoint_url: http://yourhost:yourport # Optional, for S3-compatible object storage + + + +# process schedule +# a list of several process operators with their arguments +process: + # Filter ops + - alphanumeric_filter: # filter text with alphabet/numeric ratio out of specific range. + tokenization: false # Whether to count the ratio of alphanumeric to the total number of tokens. + min_ratio: 0.0 # the min ratio of filter range + max_ratio: 0.9 # the max ratio of filter range + - average_line_length_filter: # filter text with the average length of lines out of specific range. + min_len: 10 # the min length of filter range + max_len: 10000 # the max length of filter range + - character_repetition_filter: # filter text with the character repetition ratio out of specific range + rep_len: 10 # repetition length for char-level n-gram + min_ratio: 0.0 # the min ratio of filter range + max_ratio: 0.5 # the max ratio of filter range + - flagged_words_filter: # filter text with the flagged-word ratio larger than a specific max value + lang: en # consider flagged words in what language + tokenization: false # whether to use model to tokenize documents + max_ratio: 0.0045 # the max ratio to filter text + flagged_words_dir: ./assets # directory to store flagged words dictionaries + use_words_aug: false # whether to augment words, especially for Chinese and Vietnamese + words_aug_group_sizes: [2] # the group size of words to augment + words_aug_join_char: "" # the join char between words to augment + - language_id_score_filter: # filter text in specific language with language scores larger than a specific max value + lang: en # keep text in what language + min_score: 0.8 # the min language scores to filter text + - maximum_line_length_filter: # filter text with the maximum length of lines out of specific range + min_len: 10 # the min length of filter range + max_len: 10000 # the max length of filter range + - perplexity_filter: # filter text with perplexity score out of specific range + lang: en # compute perplexity in what language + max_ppl: 1500 # the max perplexity score to filter text + - special_characters_filter: # filter text with special-char ratio out of specific range + min_ratio: 0.0 # the min ratio of filter range + max_ratio: 0.25 # the max ratio of filter range + - stopwords_filter: # filter text with stopword ratio smaller than a specific min value + lang: en # consider stopwords in what language + tokenization: false # whether to use model to tokenize documents + min_ratio: 0.3 # the min ratio to filter text + stopwords_dir: ./assets # directory to store stopwords dictionaries + use_words_aug: false # whether to augment words, especially for Chinese and Vietnamese + words_aug_group_sizes: [2] # the group size of words to augment + words_aug_join_char: "" # the join char between words to augment + - text_length_filter: # filter text with length out of specific range + min_len: 10 # the min length of filter range + max_len: 10000 # the max length of filter range + - words_num_filter: # filter text with number of words out of specific range + lang: en # sample in which language + tokenization: false # whether to use model to tokenize documents + min_num: 10 # the min number of filter range + max_num: 10000 # the max number of filter range + - word_repetition_filter: # filter text 
with the word repetition ratio out of specific range + lang: en # sample in which language + tokenization: false # whether to use model to tokenize documents + rep_len: 10 # repetition length for word-level n-gram + min_ratio: 0.0 # the min ratio of filter range + max_ratio: 0.5 # the max ratio of filter range diff --git a/demos/process_dist_sources/data/demo-dataset.json b/demos/process_dist_sources/data/demo-dataset.json new file mode 100644 index 0000000000..9df0c2513e --- /dev/null +++ b/demos/process_dist_sources/data/demo-dataset.json @@ -0,0 +1,147 @@ +[ + { + "text": "Give three tips for staying healthy.", + "input": "", + "output": "1.Eat a balanced diet and make sure to include plenty of fruits and vegetables. \n2. Exercise regularly to keep your body active and strong. \n3. Get enough sleep and maintain a consistent sleep schedule." + }, + { + "text": "What are the three primary colors?", + "input": "", + "output": "The three primary colors are red, blue, and yellow." + }, + { + "text": "Describe the structure of an atom.", + "input": "", + "output": "An atom is made up of a nucleus, which contains protons and neutrons, surrounded by electrons that travel in orbits around the nucleus. The protons and neutrons have a positive charge, while the electrons have a negative charge, resulting in an overall neutral atom. The number of each particle determines the atomic number and the type of atom." + }, + { + "text": "How can we reduce air pollution?", + "input": "", + "output": "There are a number of ways to reduce air pollution, such as shifting to renewable energy sources, encouraging the use of public transportation, prohibiting the burning of fossil fuels, implementing policies to reduce emissions from industrial sources, and implementing vehicle emissions standards. Additionally, individuals can do their part to reduce air pollution by reducing car use, avoiding burning materials such as wood, and changing to energy efficient appliances." + }, + { + "text": "Describe a time when you had to make a difficult decision.", + "input": "", + "output": "I had to make a difficult decision when I was working as a project manager at a construction company. I was in charge of a project that needed to be completed by a certain date in order to meet the client\u2019s expectations. However, due to unexpected delays, we were not able to meet the deadline and so I had to make a difficult decision. I decided to extend the deadline, but I had to stretch the team\u2019s resources even further and increase the budget. Although it was a risky decision, I ultimately decided to go ahead with it to ensure that the project was completed on time and that the client\u2019s expectations were met. The project was eventually successfully completed and this was seen as a testament to my leadership and decision-making abilities." + }, + { + "text": "Identify the odd one out.", + "input": "Twitter, Instagram, Telegram", + "output": "Telegram" + }, + { + "text": "Explain why the following fraction is equivalent to 1/4", + "input": "4/16", + "output": "The fraction 4/16 is equivalent to 1/4 because both numerators and denominators are divisible by 4. Dividing both the top and bottom numbers by 4 yields the fraction 1/4." + }, + { + "text": "Write a short story in third person narration about a protagonist who has to make an important career decision.", + "input": "", + "output": "John was at a crossroads in his life. He had just graduated college and was now facing the big decision of what career to pursue. 
After much deliberation, he decided that he wanted to be an accountant and help the financially disadvantaged. He had always been good with numbers and enjoyed seeing the tangible results of his work. \n\nJohn enrolled in accounting courses and initially found it quite challenging. He had to learn multiple systems and regulations quickly, but he worked hard and eventually excelled in his studies. After a few years, John started working at an accounting firm in his city. He was eager to put his knowledge of taxes and accounting to use in a real-world setting.\n\nJohn loved his job, as it let him express his creativity in finding strategies to save his clients money. After a few years at the firm, he became a senior accountant and was asked to manage bigger and more challenging cases. He was now a respected figure in the financial industry, but he still remembers when he was just a recent college graduate, unsure of the direction in which his life would take him." + }, + { + "text": "Render a 3D model of a house", + "input": "", + "output": " This type of text cannot be fulfilled by a GPT model." + }, + { + "text": "Evaluate this sentence for spelling and grammar mistakes", + "input": "He finnished his meal and left the resturant", + "output": "He finished his meal and left the restaurant." + }, + { + "text": "How did Julius Caesar die?", + "input": "", + "output": "Julius Caesar was assassinated by a group of up to 60 conspirators, led by Gaius Cassius Longinus and Marcus Junius Brutus, in the Senate House on the Ides of March (15 March) of 44 BC." + }, + { + "text": "What is the capital of France?", + "input": "", + "output": "The capital of France is Paris." + }, + { + "text": "Generate a list of ten items a person might need for a camping trip", + "input": "", + "output": "1. Tent\n2. Sleeping bags\n3. Flashlight\n4. Matches/lighter\n5. Insect repellent\n6. Extra clothes\n7. First aid kit\n8. Camp stove\n9. Cooking utensils\n10. Biodegradable soaps/shampoos" + }, + { + "text": "Discuss the causes of the Great Depression", + "input": "", + "output": "The Great Depression was a severe global economic downturn that lasted from 1929 to 1939. It was caused by several factors, including the stock market crash of 1929, structural weaknesses in the economy, low consumer spending, high unemployment, and a lack of confidence in the banking system. Other contributing factors included overproduction of commodities, reliance on borrowing to finance investments, agricultural overproduction, and a decrease in international trade." + }, + { + "text": "Classify the following into animals, plants, and minerals", + "input": "Oak tree, copper ore, elephant", + "output": "Oak tree: Plant\n Copper ore: Mineral\n Elephant: Animal" + }, + { + "text": "Explain the use of word embeddings in Natural Language Processing", + "input": "", + "output": "Word embeddings are one of the most powerful tools available for Natural Language Processing (NLP). They are mathematical representations of words or phrases in a vector space, allowing similarities between words and the context in which they are used to be measured. Word embeddings are useful for tasks such as sentiment analysis, text classification, predicting the next word in a sequence, and understanding synonyms and analogies. They allow for words to be processed as numerical values, giving machines an easier way to perform NLP tasks." 
+ }, + { + "text": "Describe the function of a computer motherboard", + "input": "", + "output": "A computer motherboard is the main circuit board in a computer. It is the backbone of the system, connecting all other components such as the processor, RAM, storage, graphics card and peripheral ports. It provides communication pathways between all of these components, allowing them to interact and work together. The motherboard is responsible for regulating power, and also houses slots for expansion cards, as well as ports for USB, audio, Ethernet, and other connection types." + }, + { + "text": "Reverse engineer this code to create a new version", + "input": "def factorialize(num):\n factorial = 1\n for i in range(1, num):\n factorial *= i\n \n return factorial", + "output": "def factorialize(num):\n factorial = 1\n for i in range(num, 0, -1):\n factorial *= i\n \n return factorial" + }, + { + "text": "Propose an ethical solution to the problem of data privacy", + "input": "", + "output": "One ethical solution to the problem of data privacy is to implement an opt-in data collection policy. This means that companies cannot collect and use personal data without the explicit permission of the data owners. This approach gives users the power to determine which data companies can use and for what purpose. Companies should also be transparent about how they use the collected data, so that users can make informed decisions and have control over their own data." + }, + { + "text": "Generate three verbs that mean the same as \"to apologize\"", + "input": "", + "output": "Acknowledge, express regret, make amends." + }, + { + "text": "What does DNA stand for?", + "input": "", + "output": "DNA stands for deoxyribonucleic acid." + }, + { + "text": "Compare and contrast the Cuban Missile Crisis and the Vietnam War.", + "input": "", + "output": "The Cuban Missile Crisis and the Vietnam War were two of the largest and deadliest conflicts of the 20th century. The Cuban Missile Crisis involved a standoff between the United States and the Soviet Union over Soviet nuclear missile bases in Cuba, while the Vietnam War was a prolonged military conflict in Indochina between the North, which was backed by the Soviet Union and the South, which was backed by the United States. The Cuban Missile Crisis unfolded over a period of 13 days in October 1962, with the resolution coming in the form of a diplomatic agreement. The Vietnam War lasted for 19 years, beginning in 1954 and ending in 1973, with the signing of the Paris Peace Accords. While both of these conflicts were Cold War related, they were also very different in terms of causes, objectives, strategic objectives and outcomes. The Cuban Missile Crisis did not result in any direct military conflict and was resolved primarily through diplomacy and the decision to withdraw nuclear weapons from Cuba. On the other hand, the Vietnam War was a prolonged series of battles resulting in significant losses for both sides. The Vietnam War also had a significant impact on the United States, as it highlighted the realities of a prolonged conflict and the limits of U.S. military intervention. Additionally, the Vietnam War also drastically changed the way public opinion viewed the military and the use of force." + }, + { + "text": "Generate a list of random words.", + "input": "", + "output": "sneeze, conflict, ancestor, thunder, companion, amulet, forge, serpent, magnify, jester, cascade, universe, surge, nausea, oasis, anomaly, pursue, rapture, pagoda, mosaic." 
+ }, + { + "text": "Transcribe the recording into text.", + "input": "Recording about the history of Waterloo", + "output": "The Battle of Waterloo, fought on June 18, 1815, was the last major battle of the Napoleonic Wars. It marked the end of Napoleon Bonaparte's rule. The battle was fought between Napoleon Bonaparte's French forces and an allied force of British, Dutch, Belgian and Prussian troops, led by the Duke of Wellington. It was fought near the town of Waterloo in the former Austrian Netherlands, now present-day Belgium. The battle was a defeat for Napoleon's forces and ended Napoleon's rule over France. Napoleon himself was exiled to St. Helena, where he would die in 1821. The battle marked the end of French domination of Europe and the beginning of a period of peace in the region. It was also the last major battle of the Napoleonic Wars and is remembered as one of the most important battles in history. The victory of the allied forces at Waterloo helped to bring an end to French imperial ambitions in Europe. It also established the British as one of the most powerful nations in Europe. The battle was fought with a combination of infantry, cavalry, and artillery tactics, and showed the beginning of the changing nature of warfare. Although the French forces greatly outnumbered the allies, their strategic and tactical mistake led to the loss of the battle. This defeat signaled the end of French imperial power in Europe. The legacy of Waterloo still stands today, and it cemented the British Empire's position for the next one hundred years." + }, + { + "text": "Who is the world's most famous painter?", + "input": "", + "output": "The world's most famous painter is generally considered to be the Italian renaissance artist Leonardo da Vinci. His work includes the Mona Lisa and The Last Supper, both of which are considered masterpieces. Da Vinci was also a master of science, inventing the helicopter and the tank, among other inventions. He was also a prolific musician, sculptor, and architect. His work has inspired generations of artists and continues to be revered today." + }, + { + "text": "Explain the concept of cogging torque.", + "input": "", + "output": "Cogging torque is the torque that is applied to a rotating device when the device experiences resistance from the teeth of the gears that are connected to the device. Cogging torque occurs due to the reaction force generated when the teeth of the gear drive mesh. This force pushes against each other, resulting in a drag force that is transmitted from the gears to the shaft. The magnitude of the cogging torque can vary depending on the size and number of teeth, as well as the gear's material and design. Cogging torque is undesirable in most motorized applications, as it can lead to loss of efficiency. To reduce cogging torque, gear designs are often symmetrical or have certain tooth profiles which reduce the force of contact between the teeth when they come into contact with each other." + }, + { + "text": "Look up the boiling point of water.", + "input": "", + "output": "The boiling point of water is 100\u00b0C (212\u00b0F), at standard atmospheric pressure. When heated at higher pressures, water's boiling point increases. For example, water will boil at a temperature of 115\u00b0C (239\u00b0F) at an atmospheric pressure of 1 bar. At higher pressures, such as those experienced at the bottom of the ocean, the boiling point of water can be as high as 374\u00b0C (705\u00b0F)." 
+ }, + { + "text": "Describe the life and reign of King Charles II.", + "input": "", + "output": "King Charles II ascended to the throne of England following the English Civil War and the death of his father, Charles I. He was crowned in 1661 and his reign lasted until his death in 1685. Charles II was also a patron of the arts, working with some of the greatest painters and sculptors of his day. He also encouraged theater and sports and had an interest in sciences such as astronomy and natural history. Charles II was a moderate ruler who respected the civil and religious liberties of his citizens. He was tolerant of other religions and gave Jews and Catholics more rights in the kingdom. Charles II was also a strong supporter of the Royal Navy, which saw England increase its power in the world. He also promoted the development of industry and commerce, which helped to modernize the country. Charles II's reign was an important step in restoring the monarchy of England after the turmoil of the English Civil War." + }, + { + "text": "Find the area of a circle given its radius.", + "input": "Radius = 4", + "output": "The area of a circle with a radius of 4 is equal to 12.5664 square units. This is calculated by using the formula A = \u03c0r2, where A is the area, \u03c0 is roughly equal to 3.1416 and r is the radius of the circle." + } +] diff --git a/demos/process_dist_sources/data/demo-dataset.jsonl b/demos/process_dist_sources/data/demo-dataset.jsonl new file mode 100644 index 0000000000..a52d3af6d7 --- /dev/null +++ b/demos/process_dist_sources/data/demo-dataset.jsonl @@ -0,0 +1,146 @@ + + { + "text": "Give three tips for staying healthy.", + "input": "", + "output": "1.Eat a balanced diet and make sure to include plenty of fruits and vegetables. \n2. Exercise regularly to keep your body active and strong. \n3. Get enough sleep and maintain a consistent sleep schedule." + } + { + "text": "What are the three primary colors?", + "input": "", + "output": "The three primary colors are red, blue, and yellow." + } + { + "text": "Describe the structure of an atom.", + "input": "", + "output": "An atom is made up of a nucleus, which contains protons and neutrons, surrounded by electrons that travel in orbits around the nucleus. The protons and neutrons have a positive charge, while the electrons have a negative charge, resulting in an overall neutral atom. The number of each particle determines the atomic number and the type of atom." + } + { + "text": "How can we reduce air pollution?", + "input": "", + "output": "There are a number of ways to reduce air pollution, such as shifting to renewable energy sources, encouraging the use of public transportation, prohibiting the burning of fossil fuels, implementing policies to reduce emissions from industrial sources, and implementing vehicle emissions standards. Additionally, individuals can do their part to reduce air pollution by reducing car use, avoiding burning materials such as wood, and changing to energy efficient appliances." + } + { + "text": "Describe a time when you had to make a difficult decision.", + "input": "", + "output": "I had to make a difficult decision when I was working as a project manager at a construction company. I was in charge of a project that needed to be completed by a certain date in order to meet the client\u2019s expectations. However, due to unexpected delays, we were not able to meet the deadline and so I had to make a difficult decision. 
I decided to extend the deadline, but I had to stretch the team\u2019s resources even further and increase the budget. Although it was a risky decision, I ultimately decided to go ahead with it to ensure that the project was completed on time and that the client\u2019s expectations were met. The project was eventually successfully completed and this was seen as a testament to my leadership and decision-making abilities." + } + { + "text": "Identify the odd one out.", + "input": "Twitter, Instagram, Telegram", + "output": "Telegram" + } + { + "text": "Explain why the following fraction is equivalent to 1/4", + "input": "4/16", + "output": "The fraction 4/16 is equivalent to 1/4 because both numerators and denominators are divisible by 4. Dividing both the top and bottom numbers by 4 yields the fraction 1/4." + } + { + "text": "Write a short story in third person narration about a protagonist who has to make an important career decision.", + "input": "", + "output": "John was at a crossroads in his life. He had just graduated college and was now facing the big decision of what career to pursue. After much deliberation, he decided that he wanted to be an accountant and help the financially disadvantaged. He had always been good with numbers and enjoyed seeing the tangible results of his work. \n\nJohn enrolled in accounting courses and initially found it quite challenging. He had to learn multiple systems and regulations quickly, but he worked hard and eventually excelled in his studies. After a few years, John started working at an accounting firm in his city. He was eager to put his knowledge of taxes and accounting to use in a real-world setting.\n\nJohn loved his job, as it let him express his creativity in finding strategies to save his clients money. After a few years at the firm, he became a senior accountant and was asked to manage bigger and more challenging cases. He was now a respected figure in the financial industry, but he still remembers when he was just a recent college graduate, unsure of the direction in which his life would take him." + } + { + "text": "Render a 3D model of a house", + "input": "", + "output": " This type of text cannot be fulfilled by a GPT model." + } + { + "text": "Evaluate this sentence for spelling and grammar mistakes", + "input": "He finnished his meal and left the resturant", + "output": "He finished his meal and left the restaurant." + } + { + "text": "How did Julius Caesar die?", + "input": "", + "output": "Julius Caesar was assassinated by a group of up to 60 conspirators, led by Gaius Cassius Longinus and Marcus Junius Brutus, in the Senate House on the Ides of March (15 March) of 44 BC." + } + { + "text": "What is the capital of France?", + "input": "", + "output": "The capital of France is Paris." + } + { + "text": "Generate a list of ten items a person might need for a camping trip", + "input": "", + "output": "1. Tent\n2. Sleeping bags\n3. Flashlight\n4. Matches/lighter\n5. Insect repellent\n6. Extra clothes\n7. First aid kit\n8. Camp stove\n9. Cooking utensils\n10. Biodegradable soaps/shampoos" + } + { + "text": "Discuss the causes of the Great Depression", + "input": "", + "output": "The Great Depression was a severe global economic downturn that lasted from 1929 to 1939. It was caused by several factors, including the stock market crash of 1929, structural weaknesses in the economy, low consumer spending, high unemployment, and a lack of confidence in the banking system. 
Other contributing factors included overproduction of commodities, reliance on borrowing to finance investments, agricultural overproduction, and a decrease in international trade." + } + { + "text": "Classify the following into animals, plants, and minerals", + "input": "Oak tree, copper ore, elephant", + "output": "Oak tree: Plant\n Copper ore: Mineral\n Elephant: Animal" + } + { + "text": "Explain the use of word embeddings in Natural Language Processing", + "input": "", + "output": "Word embeddings are one of the most powerful tools available for Natural Language Processing (NLP). They are mathematical representations of words or phrases in a vector space, allowing similarities between words and the context in which they are used to be measured. Word embeddings are useful for tasks such as sentiment analysis, text classification, predicting the next word in a sequence, and understanding synonyms and analogies. They allow for words to be processed as numerical values, giving machines an easier way to perform NLP tasks." + } + { + "text": "Describe the function of a computer motherboard", + "input": "", + "output": "A computer motherboard is the main circuit board in a computer. It is the backbone of the system, connecting all other components such as the processor, RAM, storage, graphics card and peripheral ports. It provides communication pathways between all of these components, allowing them to interact and work together. The motherboard is responsible for regulating power, and also houses slots for expansion cards, as well as ports for USB, audio, Ethernet, and other connection types." + } + { + "text": "Reverse engineer this code to create a new version", + "input": "def factorialize(num):\n factorial = 1\n for i in range(1, num):\n factorial *= i\n \n return factorial", + "output": "def factorialize(num):\n factorial = 1\n for i in range(num, 0, -1):\n factorial *= i\n \n return factorial" + } + { + "text": "Propose an ethical solution to the problem of data privacy", + "input": "", + "output": "One ethical solution to the problem of data privacy is to implement an opt-in data collection policy. This means that companies cannot collect and use personal data without the explicit permission of the data owners. This approach gives users the power to determine which data companies can use and for what purpose. Companies should also be transparent about how they use the collected data, so that users can make informed decisions and have control over their own data." + } + { + "text": "Generate three verbs that mean the same as \"to apologize\"", + "input": "", + "output": "Acknowledge, express regret, make amends." + } + { + "text": "What does DNA stand for?", + "input": "", + "output": "DNA stands for deoxyribonucleic acid." + } + { + "text": "Compare and contrast the Cuban Missile Crisis and the Vietnam War.", + "input": "", + "output": "The Cuban Missile Crisis and the Vietnam War were two of the largest and deadliest conflicts of the 20th century. The Cuban Missile Crisis involved a standoff between the United States and the Soviet Union over Soviet nuclear missile bases in Cuba, while the Vietnam War was a prolonged military conflict in Indochina between the North, which was backed by the Soviet Union and the South, which was backed by the United States. The Cuban Missile Crisis unfolded over a period of 13 days in October 1962, with the resolution coming in the form of a diplomatic agreement. 
The Vietnam War lasted for 19 years, beginning in 1954 and ending in 1973, with the signing of the Paris Peace Accords. While both of these conflicts were Cold War related, they were also very different in terms of causes, objectives, strategic objectives and outcomes. The Cuban Missile Crisis did not result in any direct military conflict and was resolved primarily through diplomacy and the decision to withdraw nuclear weapons from Cuba. On the other hand, the Vietnam War was a prolonged series of battles resulting in significant losses for both sides. The Vietnam War also had a significant impact on the United States, as it highlighted the realities of a prolonged conflict and the limits of U.S. military intervention. Additionally, the Vietnam War also drastically changed the way public opinion viewed the military and the use of force." + } + { + "text": "Generate a list of random words.", + "input": "", + "output": "sneeze, conflict, ancestor, thunder, companion, amulet, forge, serpent, magnify, jester, cascade, universe, surge, nausea, oasis, anomaly, pursue, rapture, pagoda, mosaic." + } + { + "text": "Transcribe the recording into text.", + "input": "Recording about the history of Waterloo", + "output": "The Battle of Waterloo, fought on June 18, 1815, was the last major battle of the Napoleonic Wars. It marked the end of Napoleon Bonaparte's rule. The battle was fought between Napoleon Bonaparte's French forces and an allied force of British, Dutch, Belgian and Prussian troops, led by the Duke of Wellington. It was fought near the town of Waterloo in the former Austrian Netherlands, now present-day Belgium. The battle was a defeat for Napoleon's forces and ended Napoleon's rule over France. Napoleon himself was exiled to St. Helena, where he would die in 1821. The battle marked the end of French domination of Europe and the beginning of a period of peace in the region. It was also the last major battle of the Napoleonic Wars and is remembered as one of the most important battles in history. The victory of the allied forces at Waterloo helped to bring an end to French imperial ambitions in Europe. It also established the British as one of the most powerful nations in Europe. The battle was fought with a combination of infantry, cavalry, and artillery tactics, and showed the beginning of the changing nature of warfare. Although the French forces greatly outnumbered the allies, their strategic and tactical mistake led to the loss of the battle. This defeat signaled the end of French imperial power in Europe. The legacy of Waterloo still stands today, and it cemented the British Empire's position for the next one hundred years." + } + { + "text": "Who is the world's most famous painter?", + "input": "", + "output": "The world's most famous painter is generally considered to be the Italian renaissance artist Leonardo da Vinci. His work includes the Mona Lisa and The Last Supper, both of which are considered masterpieces. Da Vinci was also a master of science, inventing the helicopter and the tank, among other inventions. He was also a prolific musician, sculptor, and architect. His work has inspired generations of artists and continues to be revered today." + } + { + "text": "Explain the concept of cogging torque.", + "input": "", + "output": "Cogging torque is the torque that is applied to a rotating device when the device experiences resistance from the teeth of the gears that are connected to the device. 
Cogging torque occurs due to the reaction force generated when the teeth of the gear drive mesh. This force pushes against each other, resulting in a drag force that is transmitted from the gears to the shaft. The magnitude of the cogging torque can vary depending on the size and number of teeth, as well as the gear's material and design. Cogging torque is undesirable in most motorized applications, as it can lead to loss of efficiency. To reduce cogging torque, gear designs are often symmetrical or have certain tooth profiles which reduce the force of contact between the teeth when they come into contact with each other." + } + { + "text": "Look up the boiling point of water.", + "input": "", + "output": "The boiling point of water is 100\u00b0C (212\u00b0F), at standard atmospheric pressure. When heated at higher pressures, water's boiling point increases. For example, water will boil at a temperature of 115\u00b0C (239\u00b0F) at an atmospheric pressure of 1 bar. At higher pressures, such as those experienced at the bottom of the ocean, the boiling point of water can be as high as 374\u00b0C (705\u00b0F)." + } + { + "text": "Describe the life and reign of King Charles II.", + "input": "", + "output": "King Charles II ascended to the throne of England following the English Civil War and the death of his father, Charles I. He was crowned in 1661 and his reign lasted until his death in 1685. Charles II was also a patron of the arts, working with some of the greatest painters and sculptors of his day. He also encouraged theater and sports and had an interest in sciences such as astronomy and natural history. Charles II was a moderate ruler who respected the civil and religious liberties of his citizens. He was tolerant of other religions and gave Jews and Catholics more rights in the kingdom. Charles II was also a strong supporter of the Royal Navy, which saw England increase its power in the world. He also promoted the development of industry and commerce, which helped to modernize the country. Charles II's reign was an important step in restoring the monarchy of England after the turmoil of the English Civil War." + } + { + "text": "Find the area of a circle given its radius.", + "input": "Radius = 4", + "output": "The area of a circle with a radius of 4 is equal to 12.5664 square units. This is calculated by using the formula A = \u03c0r2, where A is the area, \u03c0 is roughly equal to 3.1416 and r is the radius of the circle." 
+ } diff --git a/demos/process_video_on_ray/configs/s3_video_processing_config.yaml b/demos/process_video_on_ray/configs/s3_video_processing_config.yaml new file mode 100644 index 0000000000..9459eba5e5 --- /dev/null +++ b/demos/process_video_on_ray/configs/s3_video_processing_config.yaml @@ -0,0 +1,110 @@ +# S3 Video Processing Configuration Example +# Complete workflow: Download from S3 -> Process -> Upload back to S3 + +# Global parameters +project_name: "s3-video-processing-demo" +executor_type: "ray" +ray_address: "auto" + +# Working directory +work_dir: "./outputs/s3_demo" + +# Dataset configuration +dataset: + configs: + - type: remote + source: s3 + path: s3://your-bucket/dj/dataset/process_video_on_ray/demo-dataset-s3.jsonl + # aws_access_key_id: YOUR_KEY # Recommended: set via environment variables (e.g., AWS_ACCESS_KEY_ID) + # aws_secret_access_key: YOUR_SECRET + aws_region: us-east-1 + endpoint_url: null # Optional, for S3-compatible object storage + +# Export configuration - metadata export to S3 +export_path: "s3://your-bucket/dj/dataset/demo-processed" +export_type: "jsonl" + +# S3 credentials (for metadata export) +export_aws_credentials: + aws_access_key_id: "YOUR_AWS_ACCESS_KEY_ID" + aws_secret_access_key: "YOUR_AWS_SECRET_ACCESS_KEY" + aws_region: "us-east-1" + endpoint_url: "http://your-s3-endpoint:port" + +# Processing pipeline +process: + # ============================================ + # Step 1: Download videos from S3 to local + # ============================================ + - s3_download_file_mapper: + download_field: "videos" # Field containing S3 URLs + save_dir: "/tmp/dj_downloaded_videos" # Local download directory + resume_download: true # Skip already downloaded files + timeout: 300 # HTTP/HTTPS download timeout (seconds) + max_concurrent: 5 # Maximum concurrent downloads + # S3 credentials + aws_access_key_id: "YOUR_AWS_ACCESS_KEY_ID" + aws_secret_access_key: "YOUR_AWS_SECRET_ACCESS_KEY" + aws_region: "us-east-1" + endpoint_url: "http://your-s3-endpoint:port" + + # ============================================ + # Step 2: Video filtering - duration + # ============================================ + - video_duration_filter: + min_duration: 20 # Minimum duration (seconds) + max_duration: 100 # Maximum duration (seconds) + + # ============================================ + # Step 3: Video filtering - resolution + # ============================================ + - video_resolution_filter: + min_width: 200 # Minimum width (pixels) + max_width: 4096 # Maximum width (pixels) + min_height: 200 # Minimum height (pixels) + max_height: 4096 # Maximum height (pixels) + any_or_all: any # 'any' means satisfying any condition is sufficient + + # ============================================ + # Step 4: Split videos by duration + # ============================================ + - video_split_by_duration_mapper: + split_duration: 10 # Duration of each segment (seconds) + min_last_split_duration: 0 # Minimum duration of the last segment + keep_original_sample: false # Do not keep original sample + # Save location for processed videos + save_dir: "/tmp/dj_processed_videos" + + # ============================================ + # Step 5: Adjust video aspect ratio + # ============================================ + - video_resize_aspect_ratio_mapper: + min_ratio: 1.0 # Minimum aspect ratio + max_ratio: 1.1 # Maximum aspect ratio + strategy: increase # Adjustment strategy: increase size + + # ============================================ + # Step 6: Upload processed videos to S3 + # 
============================================ + - s3_upload_file_mapper: + upload_field: "videos" # Field containing local file paths + s3_bucket: "your-bucket" # S3 bucket name + s3_prefix: "dj/processed_videos/" # S3 object prefix (folder) + remove_local: true # Delete local files after upload (save space) + skip_existing: true # Skip files already existing in S3 + max_concurrent: 10 # Maximum concurrent uploads + # S3 credentials + aws_access_key_id: "YOUR_AWS_ACCESS_KEY_ID" + aws_secret_access_key: "YOUR_AWS_SECRET_ACCESS_KEY" + aws_region: "us-east-1" + endpoint_url: "http://your-s3-endpoint:port" + +# ============================================ +# Optional: Other configurations +# ============================================ +# Keep statistics information +keep_stats_in_res_ds: true +keep_hashes_in_res_ds: false + +# Operator fusion (optimize performance) +op_fusion: false diff --git a/demos/process_video_on_ray/data/demo-dataset-s3.jsonl b/demos/process_video_on_ray/data/demo-dataset-s3.jsonl new file mode 100644 index 0000000000..e53bb36c56 --- /dev/null +++ b/demos/process_video_on_ray/data/demo-dataset-s3.jsonl @@ -0,0 +1,3 @@ +{"videos": ["s3://your-bucket/videos/video1.mp4"], "text": "<__dj__video> A sample video"} +{"videos": ["s3://your-bucket/videos/video2.mp4"], "text": "<__dj__video> Another sample video"} +{"videos": ["s3://your-bucket/videos/video3.mp4"], "text": "<__dj__video> Third sample video"} diff --git a/docs/Operators.md b/docs/Operators.md index 5772854cc4..164143acd0 100644 --- a/docs/Operators.md +++ b/docs/Operators.md @@ -40,15 +40,15 @@ The operators in Data-Juicer are categorized into 8 types. Data-Juicer 中的算子分为以下 8 种类型。 | Type 类型 | Number 数量 | Description 描述 | -|------|:---------:|-------------| -| [aggregator](#aggregator) | 4 | Aggregate for batched samples, such as summary or conclusion. 对批量样本进行汇总,如得出总结或结论。 | -| [deduplicator](#deduplicator) | 10 | Detects and removes duplicate samples. 识别、删除重复样本。 | -| [filter](#filter) | 54 | Filters out low-quality samples. 过滤低质量样本。 | -| [formatter](#formatter) | 8 | Discovers, loads, and canonicalizes source data. 发现、加载、规范化原始数据。 | -| [grouper](#grouper) | 3 | Group samples to batched samples. 将样本分组,每一组组成一个批量样本。 | -| [mapper](#mapper) | 96 | Edits and transforms samples. 对数据样本进行编辑和转换。 | -| [pipeline](#pipeline) | 3 | Combines multiple operators into a data processing pipeline. 将多个算子组合成数据处理流水线。 | -| [selector](#selector) | 5 | Selects top samples based on ranking. 基于排序选取高质量样本。 | +|------|:------:|-------------| +| [aggregator](#aggregator) | 4 | Aggregate for batched samples, such as summary or conclusion. 对批量样本进行汇总,如得出总结或结论。 | +| [deduplicator](#deduplicator) | 10 | Detects and removes duplicate samples. 识别、删除重复样本。 | +| [filter](#filter) | 54 | Filters out low-quality samples. 过滤低质量样本。 | +| [formatter](#formatter) | 8 | Discovers, loads, and canonicalizes source data. 发现、加载、规范化原始数据。 | +| [grouper](#grouper) | 3 | Group samples to batched samples. 将样本分组,每一组组成一个批量样本。 | +| [mapper](#mapper) | 98 | Edits and transforms samples. 对数据样本进行编辑和转换。 | +| [pipeline](#pipeline) | 3 | Applies dataset-level processing; both input and output are datasets. 执行数据集级别的操作,输入和输出均为完整数据集。 | +| [selector](#selector) | 5 | Selects top samples based on ranking. 基于排序选取高质量样本。 | All the specific operators are listed below, each featured with several capability tags. 
下面列出所有具体算子,每种算子都通过多个标签来注明其主要功能。 @@ -247,6 +247,8 @@ All the specific operators are listed below, each featured with several capabili | remove_table_text_mapper | 🔤Text 💻CPU 🟢Stable | Mapper to remove table texts from text samples. 映射器从文本样本中删除表文本。 | [info](operators/mapper/remove_table_text_mapper.md) | - | | remove_words_with_incorrect_substrings_mapper | 🔤Text 💻CPU 🟢Stable | Mapper to remove words containing specified incorrect substrings. 映射程序删除包含指定的不正确子字符串的单词。 | [info](operators/mapper/remove_words_with_incorrect_substrings_mapper.md) | - | | replace_content_mapper | 🔤Text 💻CPU 🟢Stable | Replaces content in the text that matches a specific regular expression pattern with a designated replacement string. 用指定的替换字符串替换与特定正则表达式模式匹配的文本中的内容。 | [info](operators/mapper/replace_content_mapper.md) | - | +| s3_download_file_mapper | 💻CPU 🟡Beta | Mapper to download files from S3 to local files or load them into memory. Mapper将文件从S3下载到本地文件或将其加载到内存中。 | - | - | +| s3_upload_file_mapper | 💻CPU 🟡Beta | Mapper to upload local files to S3 and update paths to S3 URLs. Mapper将本地文件上传到S3并更新S3 url的路径。 | - | - | | sdxl_prompt2prompt_mapper | 🔤Text 🚀GPU 🟢Stable | Generates pairs of similar images using the SDXL model. 使用SDXL模型生成成对的相似图像。 | [info](operators/mapper/sdxl_prompt2prompt_mapper.md) | - | | sentence_augmentation_mapper | 🔤Text 🚀GPU 🧩HF 🟢Stable | Augments sentences by generating enhanced versions using a Hugging Face model. 通过使用拥抱面部模型生成增强版本来增强句子。 | [info](operators/mapper/sentence_augmentation_mapper.md) | - | | sentence_split_mapper | 🔤Text 💻CPU 🟢Stable | Splits text samples into individual sentences based on the specified language. 根据指定的语言将文本样本拆分为单个句子。 | [info](operators/mapper/sentence_split_mapper.md) | - | diff --git a/pyproject.toml b/pyproject.toml index f32dec2c35..d113213930 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -120,6 +120,7 @@ distributed = [ "uvloop==0.21.0", # avoid async error before it's fixed in uvloop "pyspark==3.5.5", # distributed data processing "s3fs", # S3 filesystem support for cloud storage + "pyiceberg[pyarrow]", "bitarray", # efficient arrays of booleans ] diff --git a/tests/ops/mapper/test_s3_download_file_mapper.py b/tests/ops/mapper/test_s3_download_file_mapper.py new file mode 100644 index 0000000000..5103c7ba5f --- /dev/null +++ b/tests/ops/mapper/test_s3_download_file_mapper.py @@ -0,0 +1,210 @@ +import unittest +import os +import os.path as osp +import shutil +import tempfile +import numpy as np +from unittest.mock import MagicMock, patch + +from data_juicer.core.data import NestedDataset as Dataset +from data_juicer.utils.mm_utils import load_image, load_image_byte +from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase + +# Assuming the operator is located at this path +from data_juicer.ops.mapper.s3_download_file_mapper import S3DownloadFileMapper + + +class S3DownloadFileMapperTest(DataJuicerTestCaseBase): + + def setUp(self): + super().setUp() + + # 1. Setup patching for boto3.client + self.patcher = patch("boto3.client") + self.mock_boto_client = self.patcher.start() + + # 2. Create the mock S3 client instance and link it + self.mock_s3 = MagicMock() + self.mock_boto_client.return_value = self.mock_s3 + + # 3. Setup temporary workspace and data paths + self.temp_dir = tempfile.mkdtemp() + self.data_path = osp.abspath(osp.join(osp.dirname(osp.realpath(__file__)), "..", "data")) + + # 4. 
Prepare test resources (local images and their S3 URLs) + self.img1_path = osp.join(self.data_path, "img1.png") + self.img1_s3_url = "s3://test-bucket/img1.png" + self.img1_content = load_image_byte(self.img1_path) + + self.img2_path = osp.join(self.data_path, "img2.jpg") + self.img2_s3_url = "s3://test-bucket/img2.jpg" + self.img2_content = load_image_byte(self.img2_path) + + os.environ["AWS_ACCESS_KEY_ID"] = "fake_id" + os.environ["AWS_SECRET_ACCESS_KEY"] = "fake_key" + os.environ["AWS_REGION"] = "us-east-1" + os.environ["AWS_SESSION_TOKEN"] = "fake_token" + + # 5. Define default mock behaviors for the S3 client + self._set_default_mock_behavior() + + def tearDown(self): + # IMPORTANT: Stop the patcher to clean up the global namespace + self.patcher.stop() + + if osp.exists(self.temp_dir): + shutil.rmtree(self.temp_dir) + del os.environ["AWS_ACCESS_KEY_ID"] + del os.environ["AWS_SECRET_ACCESS_KEY"] + del os.environ["AWS_REGION"] + del os.environ["AWS_SESSION_TOKEN"] + super().tearDown() + + def _set_default_mock_behavior(self): + """Helper to set up standard successful S3 responses.""" + + # Default behavior for get_object (memory download) + def mock_get_object(Bucket, Key): + content = self.img1_content if "img1" in Key else self.img2_content + return {"Body": MagicMock(read=lambda: content)} + + self.mock_s3.get_object.side_effect = mock_get_object + + # Default behavior for download_file (disk download) + def mock_download_file(Bucket, Key, Filename): + source_path = self.img1_path if "img1" in Key else self.img2_path + shutil.copy(source_path, Filename) + + self.mock_s3.download_file.side_effect = mock_download_file + + def _run_op_and_verify(self, ds_list, op_params): + """Unified runner to execute the operator and return results.""" + op = S3DownloadFileMapper(**op_params) + dataset = Dataset.from_list(ds_list) + dataset = dataset.map(op.process, batch_size=2) + + res_list = dataset.to_list() + return sorted(res_list, key=lambda x: x["id"]) + + def test_s3_download_to_dir(self): + """Test standard S3 download to a local directory.""" + ds_list = [{"images": [self.img1_s3_url], "id": 1}, {"images": [self.img2_s3_url], "id": 2}] + + params = { + "save_dir": self.temp_dir, + "download_field": "images", + "aws_access_key_id": "fake_id", + "aws_secret_access_key": "fake_key", + } + + res_list = self._run_op_and_verify(ds_list, params) + + for res in res_list: + r_path = res["images"][0] + self.assertEqual(osp.dirname(r_path), self.temp_dir) + self.assertTrue(osp.exists(r_path)) + # Data integrity check + actual_img = np.array(load_image(r_path)) + self.assertIsNotNone(actual_img) + + def test_s3_download_to_memory(self): + """Test downloading content directly into a field (bytes).""" + ds_list = [ + {"images": self.img1_s3_url, "id": 1}, + ] + + params = { + "download_field": "images", + "save_field": "image_bytes", + "aws_access_key_id": "fake_id", + "aws_secret_access_key": "fake_key", + } + + res_list = self._run_op_and_verify(ds_list, params) + + self.assertEqual(len(res_list), 1) + self.assertIn("image_bytes", res_list[0]) + self.assertEqual(res_list[0]["image_bytes"], self.img1_content) + + def test_s3_resume_logic_adaptive(self): + """Test Resume Download: Verify adaptive skip logic ensures + existing files do not trigger S3 downloads.""" + os.makedirs(self.temp_dir, exist_ok=True) + + local_file_1 = osp.join(self.temp_dir, "img1.png") + dummy_content = b"existing_local_data" + + with open(local_file_1, "wb") as f: + f.write(dummy_content) + + img1_url = "s3://bucket/img1.png" + 
img2_url = "s3://bucket/img2.png" + + self.mock_s3.download_file.reset_mock() + + ds_list = [{"images": [img1_url, img2_url], "id": 1}] + params = { + "save_dir": self.temp_dir, + "download_field": "images", + "resume_download": True, + } + + self._run_op_and_verify(ds_list, params) + + # Verify point A: img1 should be skipped (exists) + self.assertEqual(self.mock_s3.download_file.call_count, 1) + + # Verify point B: Ensure img2 was downloaded + self.assertTrue(osp.exists(osp.join(self.temp_dir, "img2.png"))) + + # Verify point C: Ensure img1's content has not been altered (verify resume is effective) + with open(local_file_1, "rb") as f: + self.assertEqual(f.read(), dummy_content) + + def test_s3_resume_logic(self): + """Test that existing local files are skipped when resume_download=True.""" + # 1. Pre-create a dummy file to simulate an existing download + os.makedirs(self.temp_dir, exist_ok=True) + local_file = osp.join(self.temp_dir, "img1.png") + dummy_content = b"old_data" + with open(local_file, "wb") as f: + f.write(dummy_content) + + # 2. Reset mock to track new calls + self.mock_s3.download_file.reset_mock() + + ds_list = [{"images": [self.img1_s3_url], "id": 1}] + params = { + "save_dir": self.temp_dir, + "download_field": "images", + "resume_download": True, + "aws_access_key_id": "fake_id", + "aws_secret_access_key": "fake_key", + } + + self._run_op_and_verify(ds_list, params) + + # 3. Verification: download_file should NOT have been called + self.assertFalse(self.mock_s3.download_file.called) + with open(local_file, "rb") as f: + self.assertEqual(f.read(), dummy_content) + + def test_s3_download_failure(self): + """Test handling of S3 client errors (e.g., file not found).""" + # Override default behavior to simulate failure + self.mock_s3.download_file.side_effect = Exception("S3 Object Not Found") + + ds_list = [{"images": [self.img1_s3_url], "id": 1}] + params = { + "save_dir": self.temp_dir, + "download_field": "images", + } + + res_list = self._run_op_and_verify(ds_list, params) + + # According to operator logic, it should return the original URL on failure + self.assertEqual(res_list[0]["images"][0], self.img1_s3_url) + + +if __name__ == "__main__": + unittest.main() diff --git a/tests/ops/mapper/test_s3_upload_file_mapper.py b/tests/ops/mapper/test_s3_upload_file_mapper.py new file mode 100644 index 0000000000..4670ef27f7 --- /dev/null +++ b/tests/ops/mapper/test_s3_upload_file_mapper.py @@ -0,0 +1,211 @@ +import unittest +import os +import shutil +import tempfile +from unittest.mock import MagicMock, patch + +import botocore.exceptions +from data_juicer.core.data import NestedDataset as Dataset +from data_juicer.utils.unittest_utils import DataJuicerTestCaseBase + +# Ensure the path points to your operator file +from data_juicer.ops.mapper.s3_upload_file_mapper import S3UploadFileMapper + + +class S3UploadFileMapperTest(DataJuicerTestCaseBase): + + def setUp(self): + super().setUp() + + # 1. Patch boto3 client + self.patcher = patch("boto3.client") + self.mock_boto = self.patcher.start() + + # 2. Setup Mock S3 object + self.mock_s3 = MagicMock() + self.mock_boto.return_value = self.mock_s3 + + # [Core Fix]: Default head_object to raise 404 (Not Found). + # This ensures that by default, files are considered "new" and upload_file is called. + error_response = {"Error": {"Code": "404", "Message": "Not Found"}} + self.mock_s3.head_object.side_effect = botocore.exceptions.ClientError(error_response, "HeadObject") + + # 3. 
Create temporary workspace + self.temp_dir = tempfile.mkdtemp() + + # 4. Create dummy local files + self.file1_name = "file1.txt" + self.file1_path = os.path.join(self.temp_dir, self.file1_name) + with open(self.file1_path, "w") as f: + f.write("content 1") + + self.file2_name = "file2.jpg" + self.file2_path = os.path.join(self.temp_dir, self.file2_name) + with open(self.file2_path, "w") as f: + f.write("content 2") + + # 5. Set AWS credentials in environment variables + os.environ["AWS_ACCESS_KEY_ID"] = "test_key_id" + os.environ["AWS_SECRET_ACCESS_KEY"] = "test_secret_key" + os.environ["AWS_REGION"] = "us-east-1" + os.environ["AWS_SESSION_TOKEN"] = "test_token" + + # 6. Common parameters + self.bucket = "test-bucket" + self.prefix = "dataset/" + self.base_params = { + "upload_field": "files", + "s3_bucket": self.bucket, + "s3_prefix": self.prefix, + } + + def tearDown(self): + self.patcher.stop() + if os.path.exists(self.temp_dir): + shutil.rmtree(self.temp_dir) + + # Clean up environment variables + del os.environ["AWS_ACCESS_KEY_ID"] + del os.environ["AWS_SECRET_ACCESS_KEY"] + del os.environ["AWS_REGION"] + del os.environ["AWS_SESSION_TOKEN"] + super().tearDown() + + def _run_op(self, ds_list, **kwargs): + """Helper function to run the operator.""" + params = self.base_params.copy() + params.update(kwargs) + op = S3UploadFileMapper(**params) + + dataset = Dataset.from_list(ds_list) + dataset = dataset.map(op.process, batch_size=2) + res_list = dataset.to_list() + return sorted(res_list, key=lambda x: x["id"]) + + def test_upload_basic(self): + """Test basic upload: local path should be updated to S3 URL.""" + ds_list = [{"files": [self.file1_path], "id": 1}] + + res_list = self._run_op(ds_list) + + # Expected S3 URL + expected_url = f"s3://{self.bucket}/{self.prefix}{self.file1_name}" + + # 1. Verify path update + self.assertEqual(res_list[0]["files"][0], expected_url) + + # 2. Verify boto3 upload_file was called + self.mock_s3.upload_file.assert_called_with(self.file1_path, self.bucket, self.prefix + self.file1_name) + + def test_upload_nested_structure(self): + """Test nested list structure preservation (List of multiple files).""" + # Fix: Input should be a list of strings [path1, path2], not list of lists [[p1], [p2]] + # The operator expects the elements inside the list to be file paths (strings). + ds_list = [{"files": [self.file1_path, self.file2_path], "id": 1}] + + res_list = self._run_op(ds_list) + + res_files = res_list[0]["files"] + expected_url1 = f"s3://{self.bucket}/{self.prefix}{self.file1_name}" + expected_url2 = f"s3://{self.bucket}/{self.prefix}{self.file2_name}" + + # 1. Verify it remains a list + self.assertIsInstance(res_files, list) + + # 2. Verify elements are now strings (S3 URLs) + self.assertIsInstance(res_files[0], str) + self.assertIsInstance(res_files[1], str) + + # 3. 
Verify content + self.assertEqual(res_files[0], expected_url1) + self.assertEqual(res_files[1], expected_url2) + + def test_skip_existing_true(self): + """Test skip_existing=True: do not upload if file exists on S3.""" + # [Core Modification]: Simulate file exists (head_object returns success) + self.mock_s3.head_object.side_effect = None + + ds_list = [{"files": [self.file1_path], "id": 1}] + + res_list = self._run_op(ds_list, skip_existing=True) + + # Verify path is still updated to S3 URL + expected_url = f"s3://{self.bucket}/{self.prefix}{self.file1_name}" + self.assertEqual(res_list[0]["files"][0], expected_url) + + # Key Verification: upload_file should NOT be called + self.mock_s3.upload_file.assert_not_called() + # head_object should be called + self.mock_s3.head_object.assert_called() + + def test_skip_existing_false(self): + """Test skip_existing=False: force upload even if file exists on S3.""" + # Simulate file exists + self.mock_s3.head_object.side_effect = None + + ds_list = [{"files": [self.file1_path], "id": 1}] + + self._run_op(ds_list, skip_existing=False) + + # Key Verification: even if file exists, skip_existing=False forces upload + self.mock_s3.upload_file.assert_called() + + def test_remove_local_after_upload(self): + """Test remove_local=True: delete local file after successful upload.""" + ds_list = [{"files": [self.file1_path], "id": 1}] + + # Confirm file exists initially + self.assertTrue(os.path.exists(self.file1_path)) + + self._run_op(ds_list, remove_local=True) + + # Verify boto3 was called + self.mock_s3.upload_file.assert_called() + + # Verify local file is deleted + self.assertFalse(os.path.exists(self.file1_path)) + + def test_already_s3_url(self): + """Test input is already S3 URL: keep as is and do not upload.""" + existing_url = "s3://other-bucket/file.txt" + ds_list = [{"files": [existing_url], "id": 1}] + + self.mock_s3.upload_file.reset_mock() + + res_list = self._run_op(ds_list) + + # Verify URL unchanged + self.assertEqual(res_list[0]["files"][0], existing_url) + + # Verify no upload occurred + self.mock_s3.upload_file.assert_not_called() + + def test_local_file_not_found(self): + """Test local file not found.""" + missing_path = os.path.join(self.temp_dir, "ghost.txt") + ds_list = [{"files": [missing_path], "id": 1}] + + res_list = self._run_op(ds_list) + + # Should preserve original path on failure + self.assertEqual(res_list[0]["files"][0], missing_path) + + # Verify no upload called + self.mock_s3.upload_file.assert_not_called() + + def test_upload_failure(self): + """Test S3 upload exception handling.""" + # Simulate upload failure + self.mock_s3.upload_file.side_effect = Exception("Network Error") + + ds_list = [{"files": [self.file1_path], "id": 1}] + + # Run op + res_list = self._run_op(ds_list) + + # Should preserve local path on failure + self.assertEqual(res_list[0]["files"][0], self.file1_path) + + +if __name__ == "__main__": + unittest.main()
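
For reference, here is a minimal usage sketch of the two new mappers outside of a YAML pipeline, assembled from the call pattern exercised in the tests above (Dataset.from_list plus dataset.map(op.process)). It is not part of the diff: the bucket name, prefix, field name, and local paths are placeholders, and credentials are assumed to come from the standard AWS environment variables (AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY), as the demo configs recommend.

    from data_juicer.core.data import NestedDataset as Dataset
    from data_juicer.ops.mapper.s3_download_file_mapper import S3DownloadFileMapper
    from data_juicer.ops.mapper.s3_upload_file_mapper import S3UploadFileMapper

    # Placeholder samples; the "videos" field holds S3 URLs to fetch.
    samples = [{"videos": ["s3://your-bucket/videos/video1.mp4"], "id": 1}]
    dataset = Dataset.from_list(samples)

    # Download the referenced objects to a local directory; with
    # resume_download=True, files that already exist locally are skipped.
    download_op = S3DownloadFileMapper(
        download_field="videos",
        save_dir="/tmp/dj_downloaded_videos",
        resume_download=True,
    )
    dataset = dataset.map(download_op.process, batch_size=2)

    # ... intermediate processing operators would run here ...

    # Upload the local files back to S3 and rewrite the "videos" field
    # entries to s3:// URLs; remove_local frees local disk space afterwards.
    upload_op = S3UploadFileMapper(
        upload_field="videos",
        s3_bucket="your-bucket",
        s3_prefix="dj/processed_videos/",
        skip_existing=True,
        remove_local=True,
    )
    dataset = dataset.map(upload_op.process, batch_size=2)

    print(dataset.to_list())

The same parameters appear in the s3_video_processing_config.yaml pipeline above; the sketch simply shows the equivalent programmatic invocation that the unit tests rely on.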