From b54eb3ad9807c9fdd0098ed1a8f737a51bf90ccc Mon Sep 17 00:00:00 2001 From: Dludora Date: Sun, 28 Dec 2025 21:40:35 +0800 Subject: [PATCH 01/18] feat: support hdfs and iceberg --- data_juicer/core/data/load_strategy.py | 264 +++++++++++++++++++++++++ 1 file changed, 264 insertions(+) diff --git a/data_juicer/core/data/load_strategy.py b/data_juicer/core/data/load_strategy.py index cdd6417ed2..1da4a76240 100644 --- a/data_juicer/core/data/load_strategy.py +++ b/data_juicer/core/data/load_strategy.py @@ -376,6 +376,131 @@ def load_data(self, **kwargs): raise NotImplementedError("ModelScope data load strategy is not implemented") +@DataLoadStrategyRegistry.register("default", "remote", "hdfs") +class DefaultHDFSDataLoadStrategy(DefaultDataLoadStrategy): + """ + data load strategy for HDFS datasets for LocalExecutor + Uses fsspec-compatible storage_options passed through huggingface datasets + """ + + CONFIG_VALIDATION_RULES = { + "required_fields": ["path"], + "optional_fields": ["host", "port", "user", "kerb_ticket", "extra_conf"], + "field_types": {"path": str}, + "custom_validators": { + "path": lambda x: x.startswith("hdfs://"), + }, + } + + def _build_storage_options(self): + storage_options = {} + if "host" in self.ds_config: + storage_options["host"] = self.ds_config["host"] + if "port" in self.ds_config: + storage_options["port"] = self.ds_config["port"] + if "user" in self.ds_config: + storage_options["user"] = self.ds_config["user"] + if "kerb_ticket" in self.ds_config: + storage_options["kerb_ticket"] = self.ds_config["kerb_ticket"] + if "extra_conf" in self.ds_config and isinstance(self.ds_config["extra_conf"], dict): + storage_options.update(self.ds_config["extra_conf"]) + return storage_options + + def load_data(self, **kwargs): + import datasets + + from data_juicer.core.data import NestedDataset + + path = self.ds_config["path"] + load_data_np = kwargs.get("num_proc", 1) + + # Get config values with defaults + text_keys = getattr(self.cfg, "text_keys", ["text"]) + + # Determine file format from extension + file_extension = os.path.splitext(path)[1].lower() + file_extension_map = { + ".json": "json", + ".jsonl": "json", + ".txt": "text", + ".csv": "csv", + ".tsv": "csv", + ".parquet": "parquet", + } + data_format = file_extension_map.get(file_extension, "json") + + storage_options = self._build_storage_options() + + try: + ds = datasets.load_dataset( + data_format, + data_files=path, + storage_options=storage_options, + **kwargs, + ) + # Handle DatasetDict (multiple splits) vs Dataset (single) + if isinstance(ds, datasets.DatasetDict): + ds = NestedDataset(datasets.concatenate_datasets([d for d in ds.values()])) + else: + ds = NestedDataset(ds) + + # Unify format + ds = unify_format(ds, text_keys=text_keys, num_proc=load_data_np, global_cfg=self.cfg) + return ds + except Exception as e: + raise RuntimeError( + f"Failed to load dataset from HDFS path {path}. " + f"Ensure fsspec hdfs dependencies and HDFS configs are available. 
" + f"Error: {str(e)}" + ) + + +@DataLoadStrategyRegistry.register("default", "remote", "iceberg") +class DefaultIcebergDataLoadStrategy(DefaultDataLoadStrategy): + """ + data load strategy for Iceberg tables for LocalExecutor + Relies on pyiceberg to read the table and converts to HF Dataset + """ + + CONFIG_VALIDATION_RULES = { + "required_fields": ["table"], + "optional_fields": ["catalog", "catalog_props"], + "field_types": {"table": str}, + "custom_validators": {}, + } + + def load_data(self, **kwargs): + import datasets + + from data_juicer.core.data import NestedDataset + + text_keys = getattr(self.cfg, "text_keys", ["text"]) + table_id = self.ds_config["table"] + catalog_name = self.ds_config.get("catalog", None) + catalog_props = self.ds_config.get("catalog_props", {}) or {} + + try: + from pyiceberg.catalog import load_catalog + + catalog = load_catalog(catalog_name) if catalog_name else load_catalog() + table = ( + catalog.load_table(table_id, properties=catalog_props) + if catalog_props + else catalog.load_table(table_id) + ) + arrow_table = table.scan().to_arrow() + ds = datasets.Dataset.from_table(arrow_table) + ds = NestedDataset(ds) + ds = unify_format(ds, text_keys=text_keys, num_proc=kwargs.get("num_proc", 1), global_cfg=self.cfg) + return ds + except Exception as e: + raise RuntimeError( + f"Failed to load Iceberg table {table_id}. " + f"Ensure pyiceberg is installed and catalog configs are correct. " + f"Error: {str(e)}" + ) + + @DataLoadStrategyRegistry.register("default", "remote", "arxiv") class DefaultArxivDataLoadStrategy(DefaultDataLoadStrategy): """ @@ -543,6 +668,145 @@ def load_data(self, **kwargs): ) +@DataLoadStrategyRegistry.register("ray", "remote", "hdfs") +class RayHDFSDataLoadStrategy(RayDataLoadStrategy): + """ + data load strategy for HDFS datasets for RayExecutor + Uses PyArrow HadoopFileSystem to read from HDFS + """ + + CONFIG_VALIDATION_RULES = { + "required_fields": ["path"], + "optional_fields": ["host", "port", "user", "kerb_ticket", "extra_conf"], + "field_types": {"path": str}, + "custom_validators": { + "path": lambda x: x.startswith("hdfs://"), + }, + } + + def _create_hdfs_fs(self): + import pyarrow.fs as fs + + host = self.ds_config.get("host", None) + port = self.ds_config.get("port", None) + user = self.ds_config.get("user", None) + kerb_ticket = self.ds_config.get("kerb_ticket", None) + extra_conf = self.ds_config.get("extra_conf", None) + return fs.HadoopFileSystem(host=host, port=port, user=user, kerb_ticket=kerb_ticket, extra_conf=extra_conf) + + def load_data(self, **kwargs): + from data_juicer.core.data.ray_dataset import RayDataset + + path = self.ds_config["path"] + hdfs_fs = self._create_hdfs_fs() + + logger.info(f"Loading dataset from HDFS: {path}") + + file_extension_map = { + ".json": "json", + ".jsonl": "json", + ".txt": "text", + ".csv": "csv", + ".tsv": "csv", + ".parquet": "parquet", + ".npy": "numpy", + ".tfrecords": "tfrecords", + ".lance": "lance", + } + + auto_detect = False + data_source = self.ds_config.get("source", None) + if data_source is None: + auto_detect = True + else: + suffix = os.path.splitext(data_source)[1] + if suffix in file_extension_map: + data_format = file_extension_map[suffix] + elif "." + data_source in file_extension_map: + data_format = file_extension_map["." 
+ data_source] + else: + auto_detect = True + + if auto_detect: + file_extension = os.path.splitext(path)[1] + data_format = file_extension_map.get(file_extension, "parquet") + logger.info(f"Auto-detected data format: {data_format}") + else: + logger.info(f"Using specified data format: {data_format}") + + try: + import ray.data + + if data_format in {"json", "jsonl"}: + from data_juicer.core.data.ray_dataset import read_json_stream + + dataset = read_json_stream(path, filesystem=hdfs_fs) + elif data_format == "parquet": + dataset = ray.data.read_parquet(path, filesystem=hdfs_fs) + elif data_format == "csv": + dataset = ray.data.read_csv(path, filesystem=hdfs_fs) + elif data_format == "text": + dataset = ray.data.read_text(path, filesystem=hdfs_fs) + elif data_format == "numpy": + dataset = ray.data.read_numpy(path, filesystem=hdfs_fs) + elif data_format == "tfrecords": + dataset = ray.data.read_tfrecords(path, filesystem=hdfs_fs) + elif data_format == "lance": + dataset = ray.data.read_lance(path, filesystem=hdfs_fs) + else: + raise ValueError(f"Unsupported data format for HDFS: {data_format}") + + return RayDataset(dataset, dataset_path=path, cfg=self.cfg) + except Exception as e: + raise RuntimeError( + f"Failed to load {data_format} data from HDFS path {path}. " + f"Ensure Hadoop native libs and configs are available. " + f"Error: {str(e)}" + ) + + +@DataLoadStrategyRegistry.register("ray", "remote", "iceberg") +class RayIcebergDataLoadStrategy(RayDataLoadStrategy): + """ + data load strategy for Iceberg tables for RayExecutor + Uses ray.data.read_iceberg backed by pyiceberg + """ + + CONFIG_VALIDATION_RULES = { + "required_fields": ["table"], + "optional_fields": ["catalog", "catalog_props"], + "field_types": {"table": str}, + "custom_validators": {}, + } + + def load_data(self, **kwargs): + from data_juicer.core.data.ray_dataset import RayDataset + + table_id = self.ds_config["table"] + catalog_name = self.ds_config.get("catalog", None) + catalog_props = self.ds_config.get("catalog_props", {}) or {} + + read_kwargs = {} + if catalog_name: + read_kwargs["catalog"] = catalog_name + if catalog_props: + read_kwargs["catalog_properties"] = catalog_props + + logger.info(f"Loading Iceberg table via Ray: {table_id}") + + try: + import ray.data + + dataset = ray.data.read_iceberg(table_id, **read_kwargs) + return RayDataset(dataset, dataset_path=table_id, cfg=self.cfg) + except Exception as e: + raise RuntimeError( + f"Failed to load Iceberg table {table_id} with Ray. " + f"Ensure pyiceberg and catalog configs are available. 
" + f"Error: {str(e)}" + ) + + @DataLoadStrategyRegistry.register("ray", "remote", "s3") class RayS3DataLoadStrategy(RayDataLoadStrategy): """ From 06f034a9a8ed0c26a9172da4c252a295855f5290 Mon Sep 17 00:00:00 2001 From: Dludora Date: Mon, 29 Dec 2025 07:07:01 +0000 Subject: [PATCH 02/18] feat: add load data from hdfs source --- data_juicer/core/data/load_strategy.py | 179 ++++++++----------------- 1 file changed, 58 insertions(+), 121 deletions(-) diff --git a/data_juicer/core/data/load_strategy.py b/data_juicer/core/data/load_strategy.py index 1da4a76240..68bf73393d 100644 --- a/data_juicer/core/data/load_strategy.py +++ b/data_juicer/core/data/load_strategy.py @@ -392,33 +392,29 @@ class DefaultHDFSDataLoadStrategy(DefaultDataLoadStrategy): }, } - def _build_storage_options(self): - storage_options = {} - if "host" in self.ds_config: - storage_options["host"] = self.ds_config["host"] - if "port" in self.ds_config: - storage_options["port"] = self.ds_config["port"] - if "user" in self.ds_config: - storage_options["user"] = self.ds_config["user"] - if "kerb_ticket" in self.ds_config: - storage_options["kerb_ticket"] = self.ds_config["kerb_ticket"] - if "extra_conf" in self.ds_config and isinstance(self.ds_config["extra_conf"], dict): - storage_options.update(self.ds_config["extra_conf"]) - return storage_options + def _create_hdfs_fs(self): + import pyarrow.fs as fs + + host = self.ds_config.get("host", None) + port = self.ds_config.get("port", None) + if port is not None: + port = int(port) + user = self.ds_config.get("user", None) + kerb_ticket = self.ds_config.get("kerb_ticket", None) + extra_conf = self.ds_config.get("extra_conf", None) + return fs.HadoopFileSystem(host=host, port=port, user=user, kerb_ticket=kerb_ticket, extra_conf=extra_conf) def load_data(self, **kwargs): - import datasets + from urllib.parse import urlparse from data_juicer.core.data import NestedDataset path = self.ds_config["path"] load_data_np = kwargs.get("num_proc", 1) - - # Get config values with defaults text_keys = getattr(self.cfg, "text_keys", ["text"]) - # Determine file format from extension - file_extension = os.path.splitext(path)[1].lower() + file_path = urlparse(path).path + file_extension = os.path.splitext(file_path)[1].lower() file_extension_map = { ".json": "json", ".jsonl": "json", @@ -429,74 +425,52 @@ def load_data(self, **kwargs): } data_format = file_extension_map.get(file_extension, "json") - storage_options = self._build_storage_options() + hdfs = self._create_hdfs_fs() try: - ds = datasets.load_dataset( - data_format, - data_files=path, - storage_options=storage_options, - **kwargs, - ) - # Handle DatasetDict (multiple splits) vs Dataset (single) - if isinstance(ds, datasets.DatasetDict): - ds = NestedDataset(datasets.concatenate_datasets([d for d in ds.values()])) - else: - ds = NestedDataset(ds) - - # Unify format - ds = unify_format(ds, text_keys=text_keys, num_proc=load_data_np, global_cfg=self.cfg) - return ds - except Exception as e: - raise RuntimeError( - f"Failed to load dataset from HDFS path {path}. " - f"Ensure fsspec hdfs dependencies and HDFS configs are available. 
" - f"Error: {str(e)}" + with hdfs.open_input_stream(file_path) as stream: + + # Use ray.data functions directly with PyArrow filesystem support + # Ray's read functions support filesystem parameter via PyArrow + if data_format in {"json", "jsonl"}: + # For JSON, we need to use read_json_stream with filesystem + import pyarrow.json + + arrow_table = pyarrow.json.read_json(stream) + elif data_format == "parquet": + from pyarrow.parquet import read_table + + arrow_table = read_table(stream) + elif data_format in {"csv", "tsv"}: + import pyarrow.csv + + delimiter = "\t" if file_extension == ".tsv" else "," + parse_opts = pyarrow.csv.ParseOptions(delimiter=delimiter) + arrow_table = pyarrow.csv.read_csv(stream, parse_options=parse_opts) + elif data_format == "text": + import pyarrow.csv + + read_opts = pyarrow.csv.ReadOptions(column_names=["text"]) + parse_opts = pyarrow.csv.ParseOptions(delimiter="\0", quote_char=False) + arrow_table = pyarrow.csv.read_csv(stream, read_options=read_opts, parse_options=parse_opts) + else: + raise ValueError(f"Unsupported data format for hdfs: {file_extension}") + + dataset = datasets.Dataset(arrow_table) + dataset = NestedDataset(dataset) + dataset = unify_format( + dataset, + text_keys=text_keys, + num_proc=load_data_np, + global_cfg=self.cfg, ) + return dataset -@DataLoadStrategyRegistry.register("default", "remote", "iceberg") -class DefaultIcebergDataLoadStrategy(DefaultDataLoadStrategy): - """ - data load strategy for Iceberg tables for LocalExecutor - Relies on pyiceberg to read the table and converts to HF Dataset - """ - - CONFIG_VALIDATION_RULES = { - "required_fields": ["table"], - "optional_fields": ["catalog", "catalog_props"], - "field_types": {"table": str}, - "custom_validators": {}, - } - - def load_data(self, **kwargs): - import datasets - - from data_juicer.core.data import NestedDataset - - text_keys = getattr(self.cfg, "text_keys", ["text"]) - table_id = self.ds_config["table"] - catalog_name = self.ds_config.get("catalog", None) - catalog_props = self.ds_config.get("catalog_props", {}) or {} - - try: - from pyiceberg.catalog import load_catalog - - catalog = load_catalog(catalog_name) if catalog_name else load_catalog() - table = ( - catalog.load_table(table_id, properties=catalog_props) - if catalog_props - else catalog.load_table(table_id) - ) - arrow_table = table.scan().to_arrow() - ds = datasets.Dataset.from_table(arrow_table) - ds = NestedDataset(ds) - ds = unify_format(ds, text_keys=text_keys, num_proc=kwargs.get("num_proc", 1), global_cfg=self.cfg) - return ds except Exception as e: raise RuntimeError( - f"Failed to load Iceberg table {table_id}. " - f"Ensure pyiceberg is installed and catalog configs are correct. " + f"Failed to load {data_format} data from HDFS path {path}. " + f"Ensure Hadoop native libs and configs are available. 
" f"Error: {str(e)}" ) @@ -689,12 +663,16 @@ def _create_hdfs_fs(self): host = self.ds_config.get("host", None) port = self.ds_config.get("port", None) + if port is not None: + port = int(port) user = self.ds_config.get("user", None) kerb_ticket = self.ds_config.get("kerb_ticket", None) extra_conf = self.ds_config.get("extra_conf", None) return fs.HadoopFileSystem(host=host, port=port, user=user, kerb_ticket=kerb_ticket, extra_conf=extra_conf) def load_data(self, **kwargs): + from urllib.parse import urlparse + from data_juicer.core.data.ray_dataset import RayDataset path = self.ds_config["path"] @@ -702,6 +680,7 @@ def load_data(self, **kwargs): logger.info(f"Loading dataset from HDFS: {path}") + path = urlparse(path).path file_extension_map = { ".json": "json", ".jsonl": "json", @@ -765,48 +744,6 @@ def load_data(self, **kwargs): ) -@DataLoadStrategyRegistry.register("ray", "remote", "iceberg") -class RayIcebergDataLoadStrategy(RayDataLoadStrategy): - """ - data load strategy for Iceberg tables for RayExecutor - Uses ray.data.read_iceberg backed by pyiceberg - """ - - CONFIG_VALIDATION_RULES = { - "required_fields": ["table"], - "optional_fields": ["catalog", "catalog_props"], - "field_types": {"table": str}, - "custom_validators": {}, - } - - def load_data(self, **kwargs): - from data_juicer.core.data.ray_dataset import RayDataset - - table_id = self.ds_config["table"] - catalog_name = self.ds_config.get("catalog", None) - catalog_props = self.ds_config.get("catalog_props", {}) or {} - - read_kwargs = {} - if catalog_name: - read_kwargs["catalog"] = catalog_name - if catalog_props: - read_kwargs["catalog_properties"] = catalog_props - - logger.info(f"Loading Iceberg table via Ray: {table_id}") - - try: - import ray.data - - dataset = ray.data.read_iceberg(table_id, **read_kwargs) - return RayDataset(dataset, dataset_path=table_id, cfg=self.cfg) - except Exception as e: - raise RuntimeError( - f"Failed to load Iceberg table {table_id} with Ray. " - f"Ensure pyiceberg and catalog configs are available. 
" - f"Error: {str(e)}" - ) - - @DataLoadStrategyRegistry.register("ray", "remote", "s3") class RayS3DataLoadStrategy(RayDataLoadStrategy): """ From c4c7f0b62dbc0e2e372a266cc97f60e1b11e7b91 Mon Sep 17 00:00:00 2001 From: Dludora Date: Mon, 29 Dec 2025 07:17:03 +0000 Subject: [PATCH 03/18] feat: demo for hdfs load --- .../config/process_from_hdfs.yaml | 76 +++++++++ .../data/demo-dataset.json | 147 ++++++++++++++++++ .../data/demo-dataset.jsonl | 146 +++++++++++++++++ 3 files changed, 369 insertions(+) create mode 100644 demos/process_diff_sources/config/process_from_hdfs.yaml create mode 100644 demos/process_diff_sources/data/demo-dataset.json create mode 100644 demos/process_diff_sources/data/demo-dataset.jsonl diff --git a/demos/process_diff_sources/config/process_from_hdfs.yaml b/demos/process_diff_sources/config/process_from_hdfs.yaml new file mode 100644 index 0000000000..d3787b16ca --- /dev/null +++ b/demos/process_diff_sources/config/process_from_hdfs.yaml @@ -0,0 +1,76 @@ +# Process config example for dataset + +# global parameters +project_name: 'demo' + + +np: 1 # number of subprocess to process your dataset + +export_path: './outputs/demo/demo-processed-ray' + +dataset: + configs: + - type: remote + source: hdfs + path: hdfs://your_hdfs_path/demo-dataset.jsonl + host: your_hdfs_host + port: 8020 + user: your_username + +# process schedule +# a list of several process operators with their arguments +process: + # Filter ops + - alphanumeric_filter: # filter text with alphabet/numeric ratio out of specific range. + tokenization: false # Whether to count the ratio of alphanumeric to the total number of tokens. + min_ratio: 0.0 # the min ratio of filter range + max_ratio: 0.9 # the max ratio of filter range + - average_line_length_filter: # filter text with the average length of lines out of specific range. 
+ min_len: 10 # the min length of filter range + max_len: 10000 # the max length of filter range + - character_repetition_filter: # filter text with the character repetition ratio out of specific range + rep_len: 10 # repetition length for char-level n-gram + min_ratio: 0.0 # the min ratio of filter range + max_ratio: 0.5 # the max ratio of filter range + - flagged_words_filter: # filter text with the flagged-word ratio larger than a specific max value + lang: en # consider flagged words in what language + tokenization: false # whether to use model to tokenize documents + max_ratio: 0.0045 # the max ratio to filter text + flagged_words_dir: ./assets # directory to store flagged words dictionaries + use_words_aug: false # whether to augment words, especially for Chinese and Vietnamese + words_aug_group_sizes: [2] # the group size of words to augment + words_aug_join_char: "" # the join char between words to augment + - language_id_score_filter: # filter text in specific language with language scores larger than a specific max value + lang: en # keep text in what language + min_score: 0.8 # the min language scores to filter text + - maximum_line_length_filter: # filter text with the maximum length of lines out of specific range + min_len: 10 # the min length of filter range + max_len: 10000 # the max length of filter range + - perplexity_filter: # filter text with perplexity score out of specific range + lang: en # compute perplexity in what language + max_ppl: 1500 # the max perplexity score to filter text + - special_characters_filter: # filter text with special-char ratio out of specific range + min_ratio: 0.0 # the min ratio of filter range + max_ratio: 0.25 # the max ratio of filter range + - stopwords_filter: # filter text with stopword ratio smaller than a specific min value + lang: en # consider stopwords in what language + tokenization: false # whether to use model to tokenize documents + min_ratio: 0.3 # the min ratio to filter text + stopwords_dir: ./assets # directory to store stopwords dictionaries + use_words_aug: false # whether to augment words, especially for Chinese and Vietnamese + words_aug_group_sizes: [2] # the group size of words to augment + words_aug_join_char: "" # the join char between words to augment + - text_length_filter: # filter text with length out of specific range + min_len: 10 # the min length of filter range + max_len: 10000 # the max length of filter range + - words_num_filter: # filter text with number of words out of specific range + lang: en # sample in which language + tokenization: false # whether to use model to tokenize documents + min_num: 10 # the min number of filter range + max_num: 10000 # the max number of filter range + - word_repetition_filter: # filter text with the word repetition ratio out of specific range + lang: en # sample in which language + tokenization: false # whether to use model to tokenize documents + rep_len: 10 # repetition length for word-level n-gram + min_ratio: 0.0 # the min ratio of filter range + max_ratio: 0.5 # the max ratio of filter range diff --git a/demos/process_diff_sources/data/demo-dataset.json b/demos/process_diff_sources/data/demo-dataset.json new file mode 100644 index 0000000000..9df0c2513e --- /dev/null +++ b/demos/process_diff_sources/data/demo-dataset.json @@ -0,0 +1,147 @@ +[ + { + "text": "Give three tips for staying healthy.", + "input": "", + "output": "1.Eat a balanced diet and make sure to include plenty of fruits and vegetables. \n2. Exercise regularly to keep your body active and strong. \n3. 
Get enough sleep and maintain a consistent sleep schedule." + }, + { + "text": "What are the three primary colors?", + "input": "", + "output": "The three primary colors are red, blue, and yellow." + }, + { + "text": "Describe the structure of an atom.", + "input": "", + "output": "An atom is made up of a nucleus, which contains protons and neutrons, surrounded by electrons that travel in orbits around the nucleus. The protons and neutrons have a positive charge, while the electrons have a negative charge, resulting in an overall neutral atom. The number of each particle determines the atomic number and the type of atom." + }, + { + "text": "How can we reduce air pollution?", + "input": "", + "output": "There are a number of ways to reduce air pollution, such as shifting to renewable energy sources, encouraging the use of public transportation, prohibiting the burning of fossil fuels, implementing policies to reduce emissions from industrial sources, and implementing vehicle emissions standards. Additionally, individuals can do their part to reduce air pollution by reducing car use, avoiding burning materials such as wood, and changing to energy efficient appliances." + }, + { + "text": "Describe a time when you had to make a difficult decision.", + "input": "", + "output": "I had to make a difficult decision when I was working as a project manager at a construction company. I was in charge of a project that needed to be completed by a certain date in order to meet the client\u2019s expectations. However, due to unexpected delays, we were not able to meet the deadline and so I had to make a difficult decision. I decided to extend the deadline, but I had to stretch the team\u2019s resources even further and increase the budget. Although it was a risky decision, I ultimately decided to go ahead with it to ensure that the project was completed on time and that the client\u2019s expectations were met. The project was eventually successfully completed and this was seen as a testament to my leadership and decision-making abilities." + }, + { + "text": "Identify the odd one out.", + "input": "Twitter, Instagram, Telegram", + "output": "Telegram" + }, + { + "text": "Explain why the following fraction is equivalent to 1/4", + "input": "4/16", + "output": "The fraction 4/16 is equivalent to 1/4 because both numerators and denominators are divisible by 4. Dividing both the top and bottom numbers by 4 yields the fraction 1/4." + }, + { + "text": "Write a short story in third person narration about a protagonist who has to make an important career decision.", + "input": "", + "output": "John was at a crossroads in his life. He had just graduated college and was now facing the big decision of what career to pursue. After much deliberation, he decided that he wanted to be an accountant and help the financially disadvantaged. He had always been good with numbers and enjoyed seeing the tangible results of his work. \n\nJohn enrolled in accounting courses and initially found it quite challenging. He had to learn multiple systems and regulations quickly, but he worked hard and eventually excelled in his studies. After a few years, John started working at an accounting firm in his city. He was eager to put his knowledge of taxes and accounting to use in a real-world setting.\n\nJohn loved his job, as it let him express his creativity in finding strategies to save his clients money. After a few years at the firm, he became a senior accountant and was asked to manage bigger and more challenging cases. 
He was now a respected figure in the financial industry, but he still remembers when he was just a recent college graduate, unsure of the direction in which his life would take him." + }, + { + "text": "Render a 3D model of a house", + "input": "", + "output": " This type of text cannot be fulfilled by a GPT model." + }, + { + "text": "Evaluate this sentence for spelling and grammar mistakes", + "input": "He finnished his meal and left the resturant", + "output": "He finished his meal and left the restaurant." + }, + { + "text": "How did Julius Caesar die?", + "input": "", + "output": "Julius Caesar was assassinated by a group of up to 60 conspirators, led by Gaius Cassius Longinus and Marcus Junius Brutus, in the Senate House on the Ides of March (15 March) of 44 BC." + }, + { + "text": "What is the capital of France?", + "input": "", + "output": "The capital of France is Paris." + }, + { + "text": "Generate a list of ten items a person might need for a camping trip", + "input": "", + "output": "1. Tent\n2. Sleeping bags\n3. Flashlight\n4. Matches/lighter\n5. Insect repellent\n6. Extra clothes\n7. First aid kit\n8. Camp stove\n9. Cooking utensils\n10. Biodegradable soaps/shampoos" + }, + { + "text": "Discuss the causes of the Great Depression", + "input": "", + "output": "The Great Depression was a severe global economic downturn that lasted from 1929 to 1939. It was caused by several factors, including the stock market crash of 1929, structural weaknesses in the economy, low consumer spending, high unemployment, and a lack of confidence in the banking system. Other contributing factors included overproduction of commodities, reliance on borrowing to finance investments, agricultural overproduction, and a decrease in international trade." + }, + { + "text": "Classify the following into animals, plants, and minerals", + "input": "Oak tree, copper ore, elephant", + "output": "Oak tree: Plant\n Copper ore: Mineral\n Elephant: Animal" + }, + { + "text": "Explain the use of word embeddings in Natural Language Processing", + "input": "", + "output": "Word embeddings are one of the most powerful tools available for Natural Language Processing (NLP). They are mathematical representations of words or phrases in a vector space, allowing similarities between words and the context in which they are used to be measured. Word embeddings are useful for tasks such as sentiment analysis, text classification, predicting the next word in a sequence, and understanding synonyms and analogies. They allow for words to be processed as numerical values, giving machines an easier way to perform NLP tasks." + }, + { + "text": "Describe the function of a computer motherboard", + "input": "", + "output": "A computer motherboard is the main circuit board in a computer. It is the backbone of the system, connecting all other components such as the processor, RAM, storage, graphics card and peripheral ports. It provides communication pathways between all of these components, allowing them to interact and work together. The motherboard is responsible for regulating power, and also houses slots for expansion cards, as well as ports for USB, audio, Ethernet, and other connection types." 
+ }, + { + "text": "Reverse engineer this code to create a new version", + "input": "def factorialize(num):\n factorial = 1\n for i in range(1, num):\n factorial *= i\n \n return factorial", + "output": "def factorialize(num):\n factorial = 1\n for i in range(num, 0, -1):\n factorial *= i\n \n return factorial" + }, + { + "text": "Propose an ethical solution to the problem of data privacy", + "input": "", + "output": "One ethical solution to the problem of data privacy is to implement an opt-in data collection policy. This means that companies cannot collect and use personal data without the explicit permission of the data owners. This approach gives users the power to determine which data companies can use and for what purpose. Companies should also be transparent about how they use the collected data, so that users can make informed decisions and have control over their own data." + }, + { + "text": "Generate three verbs that mean the same as \"to apologize\"", + "input": "", + "output": "Acknowledge, express regret, make amends." + }, + { + "text": "What does DNA stand for?", + "input": "", + "output": "DNA stands for deoxyribonucleic acid." + }, + { + "text": "Compare and contrast the Cuban Missile Crisis and the Vietnam War.", + "input": "", + "output": "The Cuban Missile Crisis and the Vietnam War were two of the largest and deadliest conflicts of the 20th century. The Cuban Missile Crisis involved a standoff between the United States and the Soviet Union over Soviet nuclear missile bases in Cuba, while the Vietnam War was a prolonged military conflict in Indochina between the North, which was backed by the Soviet Union and the South, which was backed by the United States. The Cuban Missile Crisis unfolded over a period of 13 days in October 1962, with the resolution coming in the form of a diplomatic agreement. The Vietnam War lasted for 19 years, beginning in 1954 and ending in 1973, with the signing of the Paris Peace Accords. While both of these conflicts were Cold War related, they were also very different in terms of causes, objectives, strategic objectives and outcomes. The Cuban Missile Crisis did not result in any direct military conflict and was resolved primarily through diplomacy and the decision to withdraw nuclear weapons from Cuba. On the other hand, the Vietnam War was a prolonged series of battles resulting in significant losses for both sides. The Vietnam War also had a significant impact on the United States, as it highlighted the realities of a prolonged conflict and the limits of U.S. military intervention. Additionally, the Vietnam War also drastically changed the way public opinion viewed the military and the use of force." + }, + { + "text": "Generate a list of random words.", + "input": "", + "output": "sneeze, conflict, ancestor, thunder, companion, amulet, forge, serpent, magnify, jester, cascade, universe, surge, nausea, oasis, anomaly, pursue, rapture, pagoda, mosaic." + }, + { + "text": "Transcribe the recording into text.", + "input": "Recording about the history of Waterloo", + "output": "The Battle of Waterloo, fought on June 18, 1815, was the last major battle of the Napoleonic Wars. It marked the end of Napoleon Bonaparte's rule. The battle was fought between Napoleon Bonaparte's French forces and an allied force of British, Dutch, Belgian and Prussian troops, led by the Duke of Wellington. It was fought near the town of Waterloo in the former Austrian Netherlands, now present-day Belgium. 
The battle was a defeat for Napoleon's forces and ended Napoleon's rule over France. Napoleon himself was exiled to St. Helena, where he would die in 1821. The battle marked the end of French domination of Europe and the beginning of a period of peace in the region. It was also the last major battle of the Napoleonic Wars and is remembered as one of the most important battles in history. The victory of the allied forces at Waterloo helped to bring an end to French imperial ambitions in Europe. It also established the British as one of the most powerful nations in Europe. The battle was fought with a combination of infantry, cavalry, and artillery tactics, and showed the beginning of the changing nature of warfare. Although the French forces greatly outnumbered the allies, their strategic and tactical mistake led to the loss of the battle. This defeat signaled the end of French imperial power in Europe. The legacy of Waterloo still stands today, and it cemented the British Empire's position for the next one hundred years." + }, + { + "text": "Who is the world's most famous painter?", + "input": "", + "output": "The world's most famous painter is generally considered to be the Italian renaissance artist Leonardo da Vinci. His work includes the Mona Lisa and The Last Supper, both of which are considered masterpieces. Da Vinci was also a master of science, inventing the helicopter and the tank, among other inventions. He was also a prolific musician, sculptor, and architect. His work has inspired generations of artists and continues to be revered today." + }, + { + "text": "Explain the concept of cogging torque.", + "input": "", + "output": "Cogging torque is the torque that is applied to a rotating device when the device experiences resistance from the teeth of the gears that are connected to the device. Cogging torque occurs due to the reaction force generated when the teeth of the gear drive mesh. This force pushes against each other, resulting in a drag force that is transmitted from the gears to the shaft. The magnitude of the cogging torque can vary depending on the size and number of teeth, as well as the gear's material and design. Cogging torque is undesirable in most motorized applications, as it can lead to loss of efficiency. To reduce cogging torque, gear designs are often symmetrical or have certain tooth profiles which reduce the force of contact between the teeth when they come into contact with each other." + }, + { + "text": "Look up the boiling point of water.", + "input": "", + "output": "The boiling point of water is 100\u00b0C (212\u00b0F), at standard atmospheric pressure. When heated at higher pressures, water's boiling point increases. For example, water will boil at a temperature of 115\u00b0C (239\u00b0F) at an atmospheric pressure of 1 bar. At higher pressures, such as those experienced at the bottom of the ocean, the boiling point of water can be as high as 374\u00b0C (705\u00b0F)." + }, + { + "text": "Describe the life and reign of King Charles II.", + "input": "", + "output": "King Charles II ascended to the throne of England following the English Civil War and the death of his father, Charles I. He was crowned in 1661 and his reign lasted until his death in 1685. Charles II was also a patron of the arts, working with some of the greatest painters and sculptors of his day. He also encouraged theater and sports and had an interest in sciences such as astronomy and natural history. 
Charles II was a moderate ruler who respected the civil and religious liberties of his citizens. He was tolerant of other religions and gave Jews and Catholics more rights in the kingdom. Charles II was also a strong supporter of the Royal Navy, which saw England increase its power in the world. He also promoted the development of industry and commerce, which helped to modernize the country. Charles II's reign was an important step in restoring the monarchy of England after the turmoil of the English Civil War." + }, + { + "text": "Find the area of a circle given its radius.", + "input": "Radius = 4", + "output": "The area of a circle with a radius of 4 is equal to 12.5664 square units. This is calculated by using the formula A = \u03c0r2, where A is the area, \u03c0 is roughly equal to 3.1416 and r is the radius of the circle." + } +] diff --git a/demos/process_diff_sources/data/demo-dataset.jsonl b/demos/process_diff_sources/data/demo-dataset.jsonl new file mode 100644 index 0000000000..a52d3af6d7 --- /dev/null +++ b/demos/process_diff_sources/data/demo-dataset.jsonl @@ -0,0 +1,146 @@ + + { + "text": "Give three tips for staying healthy.", + "input": "", + "output": "1.Eat a balanced diet and make sure to include plenty of fruits and vegetables. \n2. Exercise regularly to keep your body active and strong. \n3. Get enough sleep and maintain a consistent sleep schedule." + } + { + "text": "What are the three primary colors?", + "input": "", + "output": "The three primary colors are red, blue, and yellow." + } + { + "text": "Describe the structure of an atom.", + "input": "", + "output": "An atom is made up of a nucleus, which contains protons and neutrons, surrounded by electrons that travel in orbits around the nucleus. The protons and neutrons have a positive charge, while the electrons have a negative charge, resulting in an overall neutral atom. The number of each particle determines the atomic number and the type of atom." + } + { + "text": "How can we reduce air pollution?", + "input": "", + "output": "There are a number of ways to reduce air pollution, such as shifting to renewable energy sources, encouraging the use of public transportation, prohibiting the burning of fossil fuels, implementing policies to reduce emissions from industrial sources, and implementing vehicle emissions standards. Additionally, individuals can do their part to reduce air pollution by reducing car use, avoiding burning materials such as wood, and changing to energy efficient appliances." + } + { + "text": "Describe a time when you had to make a difficult decision.", + "input": "", + "output": "I had to make a difficult decision when I was working as a project manager at a construction company. I was in charge of a project that needed to be completed by a certain date in order to meet the client\u2019s expectations. However, due to unexpected delays, we were not able to meet the deadline and so I had to make a difficult decision. I decided to extend the deadline, but I had to stretch the team\u2019s resources even further and increase the budget. Although it was a risky decision, I ultimately decided to go ahead with it to ensure that the project was completed on time and that the client\u2019s expectations were met. The project was eventually successfully completed and this was seen as a testament to my leadership and decision-making abilities." 
+ } + { + "text": "Identify the odd one out.", + "input": "Twitter, Instagram, Telegram", + "output": "Telegram" + } + { + "text": "Explain why the following fraction is equivalent to 1/4", + "input": "4/16", + "output": "The fraction 4/16 is equivalent to 1/4 because both numerators and denominators are divisible by 4. Dividing both the top and bottom numbers by 4 yields the fraction 1/4." + } + { + "text": "Write a short story in third person narration about a protagonist who has to make an important career decision.", + "input": "", + "output": "John was at a crossroads in his life. He had just graduated college and was now facing the big decision of what career to pursue. After much deliberation, he decided that he wanted to be an accountant and help the financially disadvantaged. He had always been good with numbers and enjoyed seeing the tangible results of his work. \n\nJohn enrolled in accounting courses and initially found it quite challenging. He had to learn multiple systems and regulations quickly, but he worked hard and eventually excelled in his studies. After a few years, John started working at an accounting firm in his city. He was eager to put his knowledge of taxes and accounting to use in a real-world setting.\n\nJohn loved his job, as it let him express his creativity in finding strategies to save his clients money. After a few years at the firm, he became a senior accountant and was asked to manage bigger and more challenging cases. He was now a respected figure in the financial industry, but he still remembers when he was just a recent college graduate, unsure of the direction in which his life would take him." + } + { + "text": "Render a 3D model of a house", + "input": "", + "output": " This type of text cannot be fulfilled by a GPT model." + } + { + "text": "Evaluate this sentence for spelling and grammar mistakes", + "input": "He finnished his meal and left the resturant", + "output": "He finished his meal and left the restaurant." + } + { + "text": "How did Julius Caesar die?", + "input": "", + "output": "Julius Caesar was assassinated by a group of up to 60 conspirators, led by Gaius Cassius Longinus and Marcus Junius Brutus, in the Senate House on the Ides of March (15 March) of 44 BC." + } + { + "text": "What is the capital of France?", + "input": "", + "output": "The capital of France is Paris." + } + { + "text": "Generate a list of ten items a person might need for a camping trip", + "input": "", + "output": "1. Tent\n2. Sleeping bags\n3. Flashlight\n4. Matches/lighter\n5. Insect repellent\n6. Extra clothes\n7. First aid kit\n8. Camp stove\n9. Cooking utensils\n10. Biodegradable soaps/shampoos" + } + { + "text": "Discuss the causes of the Great Depression", + "input": "", + "output": "The Great Depression was a severe global economic downturn that lasted from 1929 to 1939. It was caused by several factors, including the stock market crash of 1929, structural weaknesses in the economy, low consumer spending, high unemployment, and a lack of confidence in the banking system. Other contributing factors included overproduction of commodities, reliance on borrowing to finance investments, agricultural overproduction, and a decrease in international trade." 
+ } + { + "text": "Classify the following into animals, plants, and minerals", + "input": "Oak tree, copper ore, elephant", + "output": "Oak tree: Plant\n Copper ore: Mineral\n Elephant: Animal" + } + { + "text": "Explain the use of word embeddings in Natural Language Processing", + "input": "", + "output": "Word embeddings are one of the most powerful tools available for Natural Language Processing (NLP). They are mathematical representations of words or phrases in a vector space, allowing similarities between words and the context in which they are used to be measured. Word embeddings are useful for tasks such as sentiment analysis, text classification, predicting the next word in a sequence, and understanding synonyms and analogies. They allow for words to be processed as numerical values, giving machines an easier way to perform NLP tasks." + } + { + "text": "Describe the function of a computer motherboard", + "input": "", + "output": "A computer motherboard is the main circuit board in a computer. It is the backbone of the system, connecting all other components such as the processor, RAM, storage, graphics card and peripheral ports. It provides communication pathways between all of these components, allowing them to interact and work together. The motherboard is responsible for regulating power, and also houses slots for expansion cards, as well as ports for USB, audio, Ethernet, and other connection types." + } + { + "text": "Reverse engineer this code to create a new version", + "input": "def factorialize(num):\n factorial = 1\n for i in range(1, num):\n factorial *= i\n \n return factorial", + "output": "def factorialize(num):\n factorial = 1\n for i in range(num, 0, -1):\n factorial *= i\n \n return factorial" + } + { + "text": "Propose an ethical solution to the problem of data privacy", + "input": "", + "output": "One ethical solution to the problem of data privacy is to implement an opt-in data collection policy. This means that companies cannot collect and use personal data without the explicit permission of the data owners. This approach gives users the power to determine which data companies can use and for what purpose. Companies should also be transparent about how they use the collected data, so that users can make informed decisions and have control over their own data." + } + { + "text": "Generate three verbs that mean the same as \"to apologize\"", + "input": "", + "output": "Acknowledge, express regret, make amends." + } + { + "text": "What does DNA stand for?", + "input": "", + "output": "DNA stands for deoxyribonucleic acid." + } + { + "text": "Compare and contrast the Cuban Missile Crisis and the Vietnam War.", + "input": "", + "output": "The Cuban Missile Crisis and the Vietnam War were two of the largest and deadliest conflicts of the 20th century. The Cuban Missile Crisis involved a standoff between the United States and the Soviet Union over Soviet nuclear missile bases in Cuba, while the Vietnam War was a prolonged military conflict in Indochina between the North, which was backed by the Soviet Union and the South, which was backed by the United States. The Cuban Missile Crisis unfolded over a period of 13 days in October 1962, with the resolution coming in the form of a diplomatic agreement. The Vietnam War lasted for 19 years, beginning in 1954 and ending in 1973, with the signing of the Paris Peace Accords. While both of these conflicts were Cold War related, they were also very different in terms of causes, objectives, strategic objectives and outcomes. 
The Cuban Missile Crisis did not result in any direct military conflict and was resolved primarily through diplomacy and the decision to withdraw nuclear weapons from Cuba. On the other hand, the Vietnam War was a prolonged series of battles resulting in significant losses for both sides. The Vietnam War also had a significant impact on the United States, as it highlighted the realities of a prolonged conflict and the limits of U.S. military intervention. Additionally, the Vietnam War also drastically changed the way public opinion viewed the military and the use of force." + } + { + "text": "Generate a list of random words.", + "input": "", + "output": "sneeze, conflict, ancestor, thunder, companion, amulet, forge, serpent, magnify, jester, cascade, universe, surge, nausea, oasis, anomaly, pursue, rapture, pagoda, mosaic." + } + { + "text": "Transcribe the recording into text.", + "input": "Recording about the history of Waterloo", + "output": "The Battle of Waterloo, fought on June 18, 1815, was the last major battle of the Napoleonic Wars. It marked the end of Napoleon Bonaparte's rule. The battle was fought between Napoleon Bonaparte's French forces and an allied force of British, Dutch, Belgian and Prussian troops, led by the Duke of Wellington. It was fought near the town of Waterloo in the former Austrian Netherlands, now present-day Belgium. The battle was a defeat for Napoleon's forces and ended Napoleon's rule over France. Napoleon himself was exiled to St. Helena, where he would die in 1821. The battle marked the end of French domination of Europe and the beginning of a period of peace in the region. It was also the last major battle of the Napoleonic Wars and is remembered as one of the most important battles in history. The victory of the allied forces at Waterloo helped to bring an end to French imperial ambitions in Europe. It also established the British as one of the most powerful nations in Europe. The battle was fought with a combination of infantry, cavalry, and artillery tactics, and showed the beginning of the changing nature of warfare. Although the French forces greatly outnumbered the allies, their strategic and tactical mistake led to the loss of the battle. This defeat signaled the end of French imperial power in Europe. The legacy of Waterloo still stands today, and it cemented the British Empire's position for the next one hundred years." + } + { + "text": "Who is the world's most famous painter?", + "input": "", + "output": "The world's most famous painter is generally considered to be the Italian renaissance artist Leonardo da Vinci. His work includes the Mona Lisa and The Last Supper, both of which are considered masterpieces. Da Vinci was also a master of science, inventing the helicopter and the tank, among other inventions. He was also a prolific musician, sculptor, and architect. His work has inspired generations of artists and continues to be revered today." + } + { + "text": "Explain the concept of cogging torque.", + "input": "", + "output": "Cogging torque is the torque that is applied to a rotating device when the device experiences resistance from the teeth of the gears that are connected to the device. Cogging torque occurs due to the reaction force generated when the teeth of the gear drive mesh. This force pushes against each other, resulting in a drag force that is transmitted from the gears to the shaft. The magnitude of the cogging torque can vary depending on the size and number of teeth, as well as the gear's material and design. 
Cogging torque is undesirable in most motorized applications, as it can lead to loss of efficiency. To reduce cogging torque, gear designs are often symmetrical or have certain tooth profiles which reduce the force of contact between the teeth when they come into contact with each other." + } + { + "text": "Look up the boiling point of water.", + "input": "", + "output": "The boiling point of water is 100\u00b0C (212\u00b0F), at standard atmospheric pressure. When heated at higher pressures, water's boiling point increases. For example, water will boil at a temperature of 115\u00b0C (239\u00b0F) at an atmospheric pressure of 1 bar. At higher pressures, such as those experienced at the bottom of the ocean, the boiling point of water can be as high as 374\u00b0C (705\u00b0F)." + } + { + "text": "Describe the life and reign of King Charles II.", + "input": "", + "output": "King Charles II ascended to the throne of England following the English Civil War and the death of his father, Charles I. He was crowned in 1661 and his reign lasted until his death in 1685. Charles II was also a patron of the arts, working with some of the greatest painters and sculptors of his day. He also encouraged theater and sports and had an interest in sciences such as astronomy and natural history. Charles II was a moderate ruler who respected the civil and religious liberties of his citizens. He was tolerant of other religions and gave Jews and Catholics more rights in the kingdom. Charles II was also a strong supporter of the Royal Navy, which saw England increase its power in the world. He also promoted the development of industry and commerce, which helped to modernize the country. Charles II's reign was an important step in restoring the monarchy of England after the turmoil of the English Civil War." + } + { + "text": "Find the area of a circle given its radius.", + "input": "Radius = 4", + "output": "The area of a circle with a radius of 4 is equal to 12.5664 square units. This is calculated by using the formula A = \u03c0r2, where A is the area, \u03c0 is roughly equal to 3.1416 and r is the radius of the circle." + } From 37db9bfd70ad40acba4cb5cd047aee1cb5f6a099 Mon Sep 17 00:00:00 2001 From: Dludora Date: Mon, 29 Dec 2025 21:12:23 +0800 Subject: [PATCH 04/18] feat: read iceberg file --- data_juicer/core/data/load_strategy.py | 106 +++++++++++++++++- data_juicer/core/ray_exporter.py | 24 ++++ .../config/process_from_iceberg.yaml | 78 +++++++++++++ pyproject.toml | 1 + 4 files changed, 206 insertions(+), 3 deletions(-) create mode 100644 demos/process_diff_sources/config/process_from_iceberg.yaml diff --git a/data_juicer/core/data/load_strategy.py b/data_juicer/core/data/load_strategy.py index 68bf73393d..897a1aa80d 100644 --- a/data_juicer/core/data/load_strategy.py +++ b/data_juicer/core/data/load_strategy.py @@ -671,8 +671,6 @@ def _create_hdfs_fs(self): return fs.HadoopFileSystem(host=host, port=port, user=user, kerb_ticket=kerb_ticket, extra_conf=extra_conf) def load_data(self, **kwargs): - from urllib.parse import urlparse - from data_juicer.core.data.ray_dataset import RayDataset path = self.ds_config["path"] @@ -680,7 +678,6 @@ def load_data(self, **kwargs): logger.info(f"Loading dataset from HDFS: {path}") - path = urlparse(path).path file_extension_map = { ".json": "json", ".jsonl": "json", @@ -859,3 +856,106 @@ def load_data(self, **kwargs): f"Ensure your AWS credentials are configured. 
" f"Error: {str(e)}" ) + + +@DataLoadStrategyRegistry.register("default", "remote", "iceberg") +class DefaultIcebergDataLoadStrategy(DefaultDataLoadStrategy): + """ + data load strategy for Iceberg tables for LocalExecutor + Relies on pyiceberg to read the table and converts to HF Dataset + """ + + CONFIG_VALIDATION_RULES = { + "required_fields": ["table"], + "optional_fields": ["catalog", "catalog_props"], + "field_types": {"table": str}, + "custom_validators": {}, + } + + def load_data(self, **kwargs): + from data_juicer.core.data import NestedDataset + + text_keys = getattr(self.cfg, "text_keys", ["text"]) + table_id = self.ds_config["table"] + # Default catalog name is usually 'default' in pyiceberg if not specified + catalog_name = self.ds_config.get("catalog", "default") + catalog_props = self.ds_config.get("catalog_props", {}) or {} + + try: + from pyiceberg.catalog import load_catalog + + # Load catalog with optional properties (e.g., uri, credentials) + # if props are empty, it relies on pyiceberg.yaml or env vars + catalog = load_catalog(catalog_name, **catalog_props) + + # Load the table + table = catalog.load_table(table_id) + + # Scan table to PyArrow Table + # Note: For very large tables on LocalExecutor, this might consume memory + # equivalent to the table size. + arrow_table = table.scan().to_arrow() + + # Convert to HF Dataset + ds = datasets.Dataset(arrow_table) + + ds = NestedDataset(ds) + ds = unify_format(ds, text_keys=text_keys, num_proc=kwargs.get("num_proc", 1), global_cfg=self.cfg) + return ds + except ImportError: + raise RuntimeError( + "pyiceberg is not installed. Please install it via `pip install pyiceberg` " + "to use Iceberg data load strategy." + ) + except Exception as e: + raise RuntimeError( + f"Failed to load Iceberg table {table_id}. " f"Ensure catalog configs are correct. " f"Error: {str(e)}" + ) + + +@DataLoadStrategyRegistry.register("ray", "remote", "iceberg") +class RayIcebergDataLoadStrategy(RayDataLoadStrategy): + """ + data load strategy for Iceberg tables for RayExecutor + Uses ray.data.read_iceberg + """ + + CONFIG_VALIDATION_RULES = { + "required_fields": ["table"], + "optional_fields": ["catalog", "catalog_props"], + "field_types": {"table": str}, + "custom_validators": {}, + } + + def load_data(self, **kwargs): + from data_juicer.core.data.ray_dataset import RayDataset + + table_id = self.ds_config["table"] + catalog_name = self.ds_config.get("catalog", "default") + catalog_props = self.ds_config.get("catalog_props", {}) or {} + + logger.info(f"Loading Iceberg table: {table_id} from catalog: {catalog_name}") + + try: + import ray.data + from pyiceberg.catalog import load_catalog + + # Ray's read_iceberg often requires the table input to be a PyIceberg table object + # or arguments to construct one. To be most robust and support authentication, + # we load the table via pyiceberg first, then pass it to Ray. + + catalog = load_catalog(catalog_name, **catalog_props) + table = catalog.load_table(table_id) + + # Ray reads the table distributedly based on the snapshots + dataset = ray.data.read_iceberg(table) + + return RayDataset(dataset, dataset_path=table_id, cfg=self.cfg) + + except ImportError: + raise RuntimeError( + "pyiceberg is not installed. Please install it via `pip install pyiceberg` " + "to use Iceberg data load strategy in Ray." + ) + except Exception as e: + raise RuntimeError(f"Failed to load Iceberg table {table_id} in Ray. 
" f"Error: {str(e)}") diff --git a/data_juicer/core/ray_exporter.py b/data_juicer/core/ray_exporter.py index f0a231b0e8..df723d045a 100644 --- a/data_juicer/core/ray_exporter.py +++ b/data_juicer/core/ray_exporter.py @@ -82,6 +82,27 @@ def __init__( self.s3_filesystem = create_pyarrow_s3_filesystem(s3_config) logger.info(f"Detected S3 export path: {export_path}. S3 filesystem configured.") + # Check if export_path is HDFS and create filesystem if needed + self.hdfs_filesystem = None + if export_path.startswith("hdfs://"): + import pyarrow.fs as fs + + hdfs_config = {} + # Extract HDFS config from export_extra_args + if "host" in self.export_extra_args: + hdfs_config["host"] = self.export_extra_args.pop("host") + if "port" in self.export_extra_args: + hdfs_config["port"] = int(self.export_extra_args.pop("port")) + if "user" in self.export_extra_args: + hdfs_config["user"] = self.export_extra_args.pop("user") + if "kerb_ticket" in self.export_extra_args: + hdfs_config["kerb_ticket"] = self.export_extra_args.pop("kerb_ticket") + if "extra_conf" in self.export_extra_args: + hdfs_config["extra_conf"] = self.export_extra_args.pop("extra_conf") + + self.hdfs_filesystem = fs.HadoopFileSystem(**hdfs_config) + logger.info(f"Detected HDFS export path: {export_path}. HDFS filesystem configured.") + self.max_shard_size_str = "" # get the string format of shard size @@ -157,6 +178,9 @@ def _export_impl(self, dataset, export_path, columns=None): # Add S3 filesystem if available if self.s3_filesystem is not None: export_kwargs["export_extra_args"]["filesystem"] = self.s3_filesystem + elif self.hdfs_filesystem is not None: + export_kwargs["export_extra_args"]["filesystem"] = self.hdfs_filesystem + if self.export_shard_size > 0: # compute the min_rows_per_file for export methods dataset_nbytes = dataset.size_bytes() diff --git a/demos/process_diff_sources/config/process_from_iceberg.yaml b/demos/process_diff_sources/config/process_from_iceberg.yaml new file mode 100644 index 0000000000..cf497cc169 --- /dev/null +++ b/demos/process_diff_sources/config/process_from_iceberg.yaml @@ -0,0 +1,78 @@ +# Process config example for dataset + +# global parameters +project_name: 'demo' + + +np: 1 # number of subprocess to process your dataset + +export_path: './outputs/demo/demo-processed.jsonl' + +dataset: + configs: + - type: remote + source: iceberg + catalog: local + table: "default.test_table" + catalog_props: + type: sql + uri: sqlite:////root/test_catalog.db + warehouse: file:///root/tmp/iceberg_warehouse + +# process schedule +# a list of several process operators with their arguments +process: + # Filter ops + - alphanumeric_filter: # filter text with alphabet/numeric ratio out of specific range. + tokenization: false # Whether to count the ratio of alphanumeric to the total number of tokens. + min_ratio: 0.0 # the min ratio of filter range + max_ratio: 0.9 # the max ratio of filter range + - average_line_length_filter: # filter text with the average length of lines out of specific range. 
+ min_len: 10 # the min length of filter range + max_len: 10000 # the max length of filter range + - character_repetition_filter: # filter text with the character repetition ratio out of specific range + rep_len: 10 # repetition length for char-level n-gram + min_ratio: 0.0 # the min ratio of filter range + max_ratio: 0.5 # the max ratio of filter range + - flagged_words_filter: # filter text with the flagged-word ratio larger than a specific max value + lang: en # consider flagged words in what language + tokenization: false # whether to use model to tokenize documents + max_ratio: 0.0045 # the max ratio to filter text + flagged_words_dir: ./assets # directory to store flagged words dictionaries + use_words_aug: false # whether to augment words, especially for Chinese and Vietnamese + words_aug_group_sizes: [2] # the group size of words to augment + words_aug_join_char: "" # the join char between words to augment + - language_id_score_filter: # filter text in specific language with language scores larger than a specific max value + lang: en # keep text in what language + min_score: 0.8 # the min language scores to filter text + - maximum_line_length_filter: # filter text with the maximum length of lines out of specific range + min_len: 10 # the min length of filter range + max_len: 10000 # the max length of filter range + - perplexity_filter: # filter text with perplexity score out of specific range + lang: en # compute perplexity in what language + max_ppl: 1500 # the max perplexity score to filter text + - special_characters_filter: # filter text with special-char ratio out of specific range + min_ratio: 0.0 # the min ratio of filter range + max_ratio: 0.25 # the max ratio of filter range + - stopwords_filter: # filter text with stopword ratio smaller than a specific min value + lang: en # consider stopwords in what language + tokenization: false # whether to use model to tokenize documents + min_ratio: 0.3 # the min ratio to filter text + stopwords_dir: ./assets # directory to store stopwords dictionaries + use_words_aug: false # whether to augment words, especially for Chinese and Vietnamese + words_aug_group_sizes: [2] # the group size of words to augment + words_aug_join_char: "" # the join char between words to augment + - text_length_filter: # filter text with length out of specific range + min_len: 10 # the min length of filter range + max_len: 10000 # the max length of filter range + - words_num_filter: # filter text with number of words out of specific range + lang: en # sample in which language + tokenization: false # whether to use model to tokenize documents + min_num: 10 # the min number of filter range + max_num: 10000 # the max number of filter range + - word_repetition_filter: # filter text with the word repetition ratio out of specific range + lang: en # sample in which language + tokenization: false # whether to use model to tokenize documents + rep_len: 10 # repetition length for word-level n-gram + min_ratio: 0.0 # the min ratio of filter range + max_ratio: 0.5 # the max ratio of filter range diff --git a/pyproject.toml b/pyproject.toml index fe68e62c34..5e88b2038a 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -118,6 +118,7 @@ distributed = [ "ray[default]>=2.51.0", # distributed computing "pyspark==3.5.5", # distributed data processing "s3fs", # S3 filesystem support for cloud storage + "pyiceberg[pyarrow]" ] # Development & Tools From 1368eaf0a52a69feb6daaf2f439a3242a97fbead Mon Sep 17 00:00:00 2001 From: Dludora Date: Tue, 30 Dec 2025 16:26:28 +0800 Subject: 
[PATCH 05/18] feat: iceberg read --- data_juicer/core/data/load_strategy.py | 46 +++++++++++--------------- 1 file changed, 19 insertions(+), 27 deletions(-) diff --git a/data_juicer/core/data/load_strategy.py b/data_juicer/core/data/load_strategy.py index 897a1aa80d..2234889140 100644 --- a/data_juicer/core/data/load_strategy.py +++ b/data_juicer/core/data/load_strategy.py @@ -866,9 +866,9 @@ class DefaultIcebergDataLoadStrategy(DefaultDataLoadStrategy): """ CONFIG_VALIDATION_RULES = { - "required_fields": ["table"], - "optional_fields": ["catalog", "catalog_props"], - "field_types": {"table": str}, + "required_fields": ["table_identifier", "catalog_kwargs"], + "optional_fields": [], + "field_types": {"table_identifier": str, "catalog_kwargs": dict}, "custom_validators": {}, } @@ -876,21 +876,17 @@ def load_data(self, **kwargs): from data_juicer.core.data import NestedDataset text_keys = getattr(self.cfg, "text_keys", ["text"]) - table_id = self.ds_config["table"] - # Default catalog name is usually 'default' in pyiceberg if not specified - catalog_name = self.ds_config.get("catalog", "default") - catalog_props = self.ds_config.get("catalog_props", {}) or {} - + table_identifier = self.ds_config["table_identifier"] + catalog_kwargs = self.ds_config.get("catalog_kwargs", {}) or {} try: from pyiceberg.catalog import load_catalog # Load catalog with optional properties (e.g., uri, credentials) # if props are empty, it relies on pyiceberg.yaml or env vars - catalog = load_catalog(catalog_name, **catalog_props) + catalog = load_catalog(**catalog_kwargs) # Load the table - table = catalog.load_table(table_id) - + table = catalog.load_table(table_identifier) # Scan table to PyArrow Table # Note: For very large tables on LocalExecutor, this might consume memory # equivalent to the table size. @@ -909,7 +905,9 @@ def load_data(self, **kwargs): ) except Exception as e: raise RuntimeError( - f"Failed to load Iceberg table {table_id}. " f"Ensure catalog configs are correct. " f"Error: {str(e)}" + f"Failed to load Iceberg table {table_identifier}. " + f"Ensure catalog configs are correct. " + f"Error: {str(e)}" ) @@ -921,36 +919,30 @@ class RayIcebergDataLoadStrategy(RayDataLoadStrategy): """ CONFIG_VALIDATION_RULES = { - "required_fields": ["table"], - "optional_fields": ["catalog", "catalog_props"], - "field_types": {"table": str}, + "required_fields": ["table_identifier", "catalog_kwargs"], + "optional_fields": [], + "field_types": {"table_identifier": str, "catalog_kwargs": dict}, "custom_validators": {}, } def load_data(self, **kwargs): from data_juicer.core.data.ray_dataset import RayDataset - table_id = self.ds_config["table"] - catalog_name = self.ds_config.get("catalog", "default") - catalog_props = self.ds_config.get("catalog_props", {}) or {} - - logger.info(f"Loading Iceberg table: {table_id} from catalog: {catalog_name}") + table_identifier = self.ds_config["table_identifier"] + catalog_kwargs = self.ds_config.get("catalog_kwargs", {}) or {} + logger.info(f"Loading Iceberg table: {table_identifier} with catalog kwargs: {catalog_kwargs}") try: import ray.data - from pyiceberg.catalog import load_catalog # Ray's read_iceberg often requires the table input to be a PyIceberg table object # or arguments to construct one. To be most robust and support authentication, # we load the table via pyiceberg first, then pass it to Ray. 
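# --- Illustrative sketch, not part of the patch: the pyiceberg read path that the
# DefaultIcebergDataLoadStrategy above follows, and the shape of the `catalog_kwargs`
# it forwards to load_catalog. The catalog name, URIs and table identifier are
# placeholders, and a SQL catalog like this assumes pyiceberg's sql/sqlite extras
# are installed.
from pyiceberg.catalog import load_catalog

catalog_kwargs = {
    "name": "local",                       # assumed catalog name
    "type": "sql",                         # assumed SQLite-backed SQL catalog
    "uri": "sqlite:///catalog.db",         # assumed catalog database
    "warehouse": "file:///tmp/warehouse",  # assumed warehouse location
}
catalog = load_catalog(**catalog_kwargs)        # same call the strategy makes
table = catalog.load_table("default.test")      # table_identifier from ds_config
arrow_table = table.scan().to_arrow()           # full scan into a pyarrow.Table
print(arrow_table.num_rows)
# The local strategy then wraps this Arrow table into a HuggingFace/Nested dataset,
# while the Ray strategy hands the same identifier and catalog_kwargs to
# ray.data.read_iceberg, as the hunk below shows.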
- catalog = load_catalog(catalog_name, **catalog_props) - table = catalog.load_table(table_id) - # Ray reads the table distributedly based on the snapshots - dataset = ray.data.read_iceberg(table) + dataset = ray.data.read_iceberg(table_identifier=table_identifier, catalog_kwargs=catalog_kwargs) - return RayDataset(dataset, dataset_path=table_id, cfg=self.cfg) + return RayDataset(dataset, dataset_path=table_identifier, cfg=self.cfg) except ImportError: raise RuntimeError( @@ -958,4 +950,4 @@ def load_data(self, **kwargs): "to use Iceberg data load strategy in Ray." ) except Exception as e: - raise RuntimeError(f"Failed to load Iceberg table {table_id} in Ray. " f"Error: {str(e)}") + raise RuntimeError(f"Failed to load Iceberg table {table_identifier} in Ray. " f"Error: {str(e)}") From f4856b6eeb37b058e7310d4fb8b07ca123e8f358 Mon Sep 17 00:00:00 2001 From: Dludora Date: Tue, 30 Dec 2025 16:51:55 +0800 Subject: [PATCH 06/18] feat: process iceberg and hdfs --- .../config/process_from_hdfs.yaml | 0 .../config/process_from_iceberg.yaml | 19 +++-- .../config/process_from_s3.yaml | 78 +++++++++++++++++++ .../data/demo-dataset.json | 0 .../data/demo-dataset.jsonl | 0 5 files changed, 90 insertions(+), 7 deletions(-) rename demos/{process_diff_sources => process_dist_sources}/config/process_from_hdfs.yaml (100%) rename demos/{process_diff_sources => process_dist_sources}/config/process_from_iceberg.yaml (93%) create mode 100644 demos/process_dist_sources/config/process_from_s3.yaml rename demos/{process_diff_sources => process_dist_sources}/data/demo-dataset.json (100%) rename demos/{process_diff_sources => process_dist_sources}/data/demo-dataset.jsonl (100%) diff --git a/demos/process_diff_sources/config/process_from_hdfs.yaml b/demos/process_dist_sources/config/process_from_hdfs.yaml similarity index 100% rename from demos/process_diff_sources/config/process_from_hdfs.yaml rename to demos/process_dist_sources/config/process_from_hdfs.yaml diff --git a/demos/process_diff_sources/config/process_from_iceberg.yaml b/demos/process_dist_sources/config/process_from_iceberg.yaml similarity index 93% rename from demos/process_diff_sources/config/process_from_iceberg.yaml rename to demos/process_dist_sources/config/process_from_iceberg.yaml index cf497cc169..6470e053ab 100644 --- a/demos/process_diff_sources/config/process_from_iceberg.yaml +++ b/demos/process_dist_sources/config/process_from_iceberg.yaml @@ -2,22 +2,27 @@ # global parameters project_name: 'demo' - +executor_type: ray +ray_address: local np: 1 # number of subprocess to process your dataset -export_path: './outputs/demo/demo-processed.jsonl' +export_path: './outputs/demo/' dataset: configs: - type: remote source: iceberg - catalog: local - table: "default.test_table" - catalog_props: + table_identifier: default.test + catalog_kwargs: type: sql - uri: sqlite:////root/test_catalog.db - warehouse: file:///root/tmp/iceberg_warehouse + name: local + uri: sqlite:////root/Project/iceberg/test_catalog.db + warehouse: s3://iceberg-data/warehouse + s3.endpoint: http://10.29.240.39:9000 + s3.access-key-id: admin + s3.secret-access-key: password + s3.region: us-east-1 # process schedule # a list of several process operators with their arguments diff --git a/demos/process_dist_sources/config/process_from_s3.yaml b/demos/process_dist_sources/config/process_from_s3.yaml new file mode 100644 index 0000000000..76d64ff9e0 --- /dev/null +++ b/demos/process_dist_sources/config/process_from_s3.yaml @@ -0,0 +1,78 @@ +project_name: 'demo' +executor_type: ray 
+ray_address: local + +np: 1 # number of subprocess to process your dataset + +export_path: './outputs/demo/' + + +dataset: + configs: + - type: remote + source: s3 + path: s3://datajuicer/demo-dataset.jsonl + aws_access_key_id: admin # Recommended: set via environment variables (e.g., AWS_ACCESS_KEY_ID) + aws_secret_access_key: password + aws_region: us-east-1 + endpoint_url: http://10.29.240.39:9000 # Optional, for S3-compatible object storage + + + +# process schedule +# a list of several process operators with their arguments +process: + # Filter ops + - alphanumeric_filter: # filter text with alphabet/numeric ratio out of specific range. + tokenization: false # Whether to count the ratio of alphanumeric to the total number of tokens. + min_ratio: 0.0 # the min ratio of filter range + max_ratio: 0.9 # the max ratio of filter range + - average_line_length_filter: # filter text with the average length of lines out of specific range. + min_len: 10 # the min length of filter range + max_len: 10000 # the max length of filter range + - character_repetition_filter: # filter text with the character repetition ratio out of specific range + rep_len: 10 # repetition length for char-level n-gram + min_ratio: 0.0 # the min ratio of filter range + max_ratio: 0.5 # the max ratio of filter range + - flagged_words_filter: # filter text with the flagged-word ratio larger than a specific max value + lang: en # consider flagged words in what language + tokenization: false # whether to use model to tokenize documents + max_ratio: 0.0045 # the max ratio to filter text + flagged_words_dir: ./assets # directory to store flagged words dictionaries + use_words_aug: false # whether to augment words, especially for Chinese and Vietnamese + words_aug_group_sizes: [2] # the group size of words to augment + words_aug_join_char: "" # the join char between words to augment + - language_id_score_filter: # filter text in specific language with language scores larger than a specific max value + lang: en # keep text in what language + min_score: 0.8 # the min language scores to filter text + - maximum_line_length_filter: # filter text with the maximum length of lines out of specific range + min_len: 10 # the min length of filter range + max_len: 10000 # the max length of filter range + - perplexity_filter: # filter text with perplexity score out of specific range + lang: en # compute perplexity in what language + max_ppl: 1500 # the max perplexity score to filter text + - special_characters_filter: # filter text with special-char ratio out of specific range + min_ratio: 0.0 # the min ratio of filter range + max_ratio: 0.25 # the max ratio of filter range + - stopwords_filter: # filter text with stopword ratio smaller than a specific min value + lang: en # consider stopwords in what language + tokenization: false # whether to use model to tokenize documents + min_ratio: 0.3 # the min ratio to filter text + stopwords_dir: ./assets # directory to store stopwords dictionaries + use_words_aug: false # whether to augment words, especially for Chinese and Vietnamese + words_aug_group_sizes: [2] # the group size of words to augment + words_aug_join_char: "" # the join char between words to augment + - text_length_filter: # filter text with length out of specific range + min_len: 10 # the min length of filter range + max_len: 10000 # the max length of filter range + - words_num_filter: # filter text with number of words out of specific range + lang: en # sample in which language + tokenization: false # whether to use model to 
tokenize documents + min_num: 10 # the min number of filter range + max_num: 10000 # the max number of filter range + - word_repetition_filter: # filter text with the word repetition ratio out of specific range + lang: en # sample in which language + tokenization: false # whether to use model to tokenize documents + rep_len: 10 # repetition length for word-level n-gram + min_ratio: 0.0 # the min ratio of filter range + max_ratio: 0.5 # the max ratio of filter range diff --git a/demos/process_diff_sources/data/demo-dataset.json b/demos/process_dist_sources/data/demo-dataset.json similarity index 100% rename from demos/process_diff_sources/data/demo-dataset.json rename to demos/process_dist_sources/data/demo-dataset.json diff --git a/demos/process_diff_sources/data/demo-dataset.jsonl b/demos/process_dist_sources/data/demo-dataset.jsonl similarity index 100% rename from demos/process_diff_sources/data/demo-dataset.jsonl rename to demos/process_dist_sources/data/demo-dataset.jsonl From 9f8a55167f437e49bd2dca7bebf15588859e415e Mon Sep 17 00:00:00 2001 From: Dludora Date: Tue, 30 Dec 2025 17:18:02 +0800 Subject: [PATCH 07/18] refractor: secret --- .../config/process_from_iceberg.yaml | 10 +++++----- demos/process_dist_sources/config/process_from_s3.yaml | 6 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/demos/process_dist_sources/config/process_from_iceberg.yaml b/demos/process_dist_sources/config/process_from_iceberg.yaml index 6470e053ab..f67f693489 100644 --- a/demos/process_dist_sources/config/process_from_iceberg.yaml +++ b/demos/process_dist_sources/config/process_from_iceberg.yaml @@ -17,11 +17,11 @@ dataset: catalog_kwargs: type: sql name: local - uri: sqlite:////root/Project/iceberg/test_catalog.db - warehouse: s3://iceberg-data/warehouse - s3.endpoint: http://10.29.240.39:9000 - s3.access-key-id: admin - s3.secret-access-key: password + uri: sqlite:///your.db + warehouse: s3://yourbucket/your_warehouse_path + s3.endpoint: http://yourhost:yourport + s3.access-key-id: your_access_key + s3.secret-access-key: your_secret_key s3.region: us-east-1 # process schedule diff --git a/demos/process_dist_sources/config/process_from_s3.yaml b/demos/process_dist_sources/config/process_from_s3.yaml index 76d64ff9e0..3f653cb5c5 100644 --- a/demos/process_dist_sources/config/process_from_s3.yaml +++ b/demos/process_dist_sources/config/process_from_s3.yaml @@ -12,10 +12,10 @@ dataset: - type: remote source: s3 path: s3://datajuicer/demo-dataset.jsonl - aws_access_key_id: admin # Recommended: set via environment variables (e.g., AWS_ACCESS_KEY_ID) - aws_secret_access_key: password + aws_access_key_id: your_access_key # Recommended: set via environment variables (e.g., AWS_ACCESS_KEY_ID) + aws_secret_access_key: your_secret_key aws_region: us-east-1 - endpoint_url: http://10.29.240.39:9000 # Optional, for S3-compatible object storage + endpoint_url: http://yourhost:yourport # Optional, for S3-compatible object storage From 2b3b620acf6072a2fba9a88843e4ebd552dde277 Mon Sep 17 00:00:00 2001 From: Dludora Date: Tue, 30 Dec 2025 15:24:12 +0000 Subject: [PATCH 08/18] feat: export iceberg and others --- data_juicer/core/executor/ray_executor.py | 20 ----- data_juicer/core/ray_exporter.py | 93 +++++++++-------------- data_juicer/utils/file_utils.py | 30 +++++++- 3 files changed, 65 insertions(+), 78 deletions(-) diff --git a/data_juicer/core/executor/ray_executor.py b/data_juicer/core/executor/ray_executor.py index 2313b18aef..8fb0a4527d 100644 --- a/data_juicer/core/executor/ray_executor.py 
+++ b/data_juicer/core/executor/ray_executor.py @@ -73,26 +73,6 @@ def __init__(self, cfg: Optional[Namespace] = None): # Prepare export extra args, including S3 credentials if export_path is S3 export_extra_args = dict(self.cfg.export_extra_args) if hasattr(self.cfg, "export_extra_args") else {} - # If export_path is S3, extract AWS credentials with priority: - # 1. export_aws_credentials (export-specific) - # 2. dataset config (for backward compatibility) - # 3. environment variables (handled by exporter) - if self.cfg.export_path.startswith("s3://"): - # Pass export-specific credentials if provided. - # The RayExporter will handle falling back to environment variables or other credential mechanisms. - if hasattr(self.cfg, "export_aws_credentials") and self.cfg.export_aws_credentials: - export_aws_creds = self.cfg.export_aws_credentials - # Iterate through the required fields directly, and copy them to export_extra_args if they exist. - credential_fields = { - "aws_access_key_id", - "aws_secret_access_key", - "aws_session_token", - "aws_region", - "endpoint_url", - } - for field in credential_fields.intersection(export_aws_creds): - export_extra_args[field] = export_aws_creds[field] - self.exporter = RayExporter( self.cfg.export_path, self.cfg.export_type, diff --git a/data_juicer/core/ray_exporter.py b/data_juicer/core/ray_exporter.py index df723d045a..ebdb2d750b 100644 --- a/data_juicer/core/ray_exporter.py +++ b/data_juicer/core/ray_exporter.py @@ -1,10 +1,11 @@ import os +import copy from functools import partial from loguru import logger from data_juicer.utils.constant import Fields, HashKeys -from data_juicer.utils.file_utils import Sizes, byte_size_to_size_str +from data_juicer.utils.file_utils import Sizes, byte_size_to_size_str, create_filesystem_from_args from data_juicer.utils.model_utils import filter_arguments from data_juicer.utils.webdataset_utils import reconstruct_custom_webdataset_format @@ -22,6 +23,7 @@ class RayExporter: "tfrecords", "webdataset", "lance", + "iceberg", # 'images', # 'numpy', } @@ -51,7 +53,13 @@ def __init__( self.export_shard_size = export_shard_size self.keep_stats_in_res_ds = keep_stats_in_res_ds self.keep_hashes_in_res_ds = keep_hashes_in_res_ds - self.export_format = self._get_export_format(export_path) if export_type is None else export_type + + if export_type: + self.export_format = export_type + elif export_path: + self.export_format = self._get_export_format(export_path) + else: + raise ValueError("Either export_path or export_type should be provided.") if self.export_format not in self._SUPPORTED_FORMATS: raise NotImplementedError( f'export data format "{self.export_format}" is not supported ' @@ -59,50 +67,13 @@ def __init__( ) self.export_extra_args = kwargs if kwargs is not None else {} - # Check if export_path is S3 and create filesystem if needed - self.s3_filesystem = None - if export_path.startswith("s3://"): - # Extract AWS credentials from export_extra_args (if provided) - s3_config = {} - if "aws_access_key_id" in self.export_extra_args: - s3_config["aws_access_key_id"] = self.export_extra_args.pop("aws_access_key_id") - if "aws_secret_access_key" in self.export_extra_args: - s3_config["aws_secret_access_key"] = self.export_extra_args.pop("aws_secret_access_key") - if "aws_session_token" in self.export_extra_args: - s3_config["aws_session_token"] = self.export_extra_args.pop("aws_session_token") - if "aws_region" in self.export_extra_args: - s3_config["aws_region"] = self.export_extra_args.pop("aws_region") - if "endpoint_url" in 
self.export_extra_args: - s3_config["endpoint_url"] = self.export_extra_args.pop("endpoint_url") - - # Create PyArrow S3FileSystem with credentials - # This matches the pattern used in RayS3DataLoadStrategy - from data_juicer.utils.s3_utils import create_pyarrow_s3_filesystem - - self.s3_filesystem = create_pyarrow_s3_filesystem(s3_config) - logger.info(f"Detected S3 export path: {export_path}. S3 filesystem configured.") - - # Check if export_path is HDFS and create filesystem if needed - self.hdfs_filesystem = None - if export_path.startswith("hdfs://"): - import pyarrow.fs as fs - - hdfs_config = {} - # Extract HDFS config from export_extra_args - if "host" in self.export_extra_args: - hdfs_config["host"] = self.export_extra_args.pop("host") - if "port" in self.export_extra_args: - hdfs_config["port"] = int(self.export_extra_args.pop("port")) - if "user" in self.export_extra_args: - hdfs_config["user"] = self.export_extra_args.pop("user") - if "kerb_ticket" in self.export_extra_args: - hdfs_config["kerb_ticket"] = self.export_extra_args.pop("kerb_ticket") - if "extra_conf" in self.export_extra_args: - hdfs_config["extra_conf"] = self.export_extra_args.pop("extra_conf") - - self.hdfs_filesystem = fs.HadoopFileSystem(**hdfs_config) - logger.info(f"Detected HDFS export path: {export_path}. HDFS filesystem configured.") + fs_args = copy.deepcopy(self.export_extra_args) + self.fs = create_filesystem_from_args(export_path, fs_args) + self._check_shard_size() + def _check_shard_size(self): + if self.export_shard_size == 0: + return self.max_shard_size_str = "" # get the string format of shard size @@ -170,25 +141,30 @@ def _export_impl(self, dataset, export_path, columns=None): if len(removed_fields): dataset = dataset.drop_columns(removed_fields) - export_method = RayExporter._router()[self.export_format] + router = self._router() + if self.export_format in router: + export_method = router[self.export_format] + else: + export_method = RayExporter.write_others + export_kwargs = { "export_extra_args": self.export_extra_args, "export_format": self.export_format, } - # Add S3 filesystem if available - if self.s3_filesystem is not None: - export_kwargs["export_extra_args"]["filesystem"] = self.s3_filesystem - elif self.hdfs_filesystem is not None: - export_kwargs["export_extra_args"]["filesystem"] = self.hdfs_filesystem + # Add filesystem if available + if self.fs is not None: + export_kwargs["export_extra_args"]["filesystem"] = self.fs if self.export_shard_size > 0: - # compute the min_rows_per_file for export methods dataset_nbytes = dataset.size_bytes() dataset_num_rows = dataset.count() - num_shards = int(dataset_nbytes / self.export_shard_size) + 1 - num_shards = min(num_shards, dataset_num_rows) - rows_per_file = int(dataset_num_rows / num_shards) - export_kwargs["export_extra_args"]["min_rows_per_file"] = rows_per_file + + if dataset_num_rows > 0: + num_shards = int(dataset_nbytes / self.export_shard_size) + 1 + num_shards = min(num_shards, dataset_num_rows) + rows_per_file = max(1, int(dataset_num_rows / num_shards)) + export_kwargs["export_extra_args"]["min_rows_per_file"] = rows_per_file + return export_method(dataset, export_path, **export_kwargs) def export(self, dataset, columns=None): @@ -260,7 +236,10 @@ def write_others(dataset, export_path, **kwargs): # Add S3 filesystem if available if "filesystem" in export_extra_args: filtered_kwargs["filesystem"] = export_extra_args["filesystem"] - return write_method(export_path, **filtered_kwargs) + if export_path: + return 
write_method(export_path, **filtered_kwargs) + else: + return write_method(**filtered_kwargs) # suffix to export method @staticmethod diff --git a/data_juicer/utils/file_utils.py b/data_juicer/utils/file_utils.py index aeaa5ab084..7e0cbdfc45 100644 --- a/data_juicer/utils/file_utils.py +++ b/data_juicer/utils/file_utils.py @@ -7,11 +7,12 @@ import warnings from datetime import datetime, timezone from pathlib import Path -from typing import AsyncGenerator, Dict, List, Optional, Union +from typing import AsyncGenerator, Dict, List, Optional, Union, Any import aiohttp import pandas as pd from datasets.utils.extract import ZstdExtractor as Extractor +from loguru import logger from data_juicer.utils.common_utils import dict_to_hash from data_juicer.utils.constant import DEFAULT_PREFIX, Fields @@ -24,6 +25,33 @@ class Sizes: TiB = 2**40 # 1024*1024*1024*1024 +def create_filesystem_from_args(path: str, args: Dict[str, Any]): + """ + Create a PyArrow FileSystem based on the path prefix and parameters. + Automatically extract relevant credentials from args and remove them from args (using pop) to avoid polluting subsequent parameters. + """ + fs = None + if path.startswith("s3://"): + from data_juicer.utils.s3_utils import create_pyarrow_s3_filesystem + + s3_keys = ["aws_access_key_id", "aws_secret_access_key", "aws_session_token", "aws_region", "endpoint_url"] + s3_conf = {k: args.pop(k) for k in s3_keys if k in args} + fs = create_pyarrow_s3_filesystem(s3_conf) + logger.info(f"Detected S3 export path: {path}. S3 filesystem configured.") + + elif path.startswith("hdfs://"): + import pyarrow.fs as pa_fs + + hdfs_keys = ["host", "port", "user", "kerb_ticket", "extra_conf"] + hdfs_conf = {k: args.pop(k) for k in hdfs_keys if k in args} + if "port" in hdfs_conf: + hdfs_conf["port"] = int(hdfs_conf["port"]) + fs = pa_fs.HadoopFileSystem(**hdfs_conf) + logger.info(f"Detected HDFS export path: {path}. HDFS filesystem configured.") + + return fs + + def byte_size_to_size_str(byte_size: int): # get the string format of shard size if byte_size // Sizes.TiB: From 4a24682d1ff44ea0eea37da2e44f170e7e9a476a Mon Sep 17 00:00:00 2001 From: Dludora Date: Wed, 31 Dec 2025 16:25:32 +0800 Subject: [PATCH 09/18] feat: write iceberg --- data_juicer/core/ray_exporter.py | 42 +++++++++++++++++++++++++++++--- 1 file changed, 38 insertions(+), 4 deletions(-) diff --git a/data_juicer/core/ray_exporter.py b/data_juicer/core/ray_exporter.py index ebdb2d750b..69c017d2e8 100644 --- a/data_juicer/core/ray_exporter.py +++ b/data_juicer/core/ray_exporter.py @@ -241,6 +241,43 @@ def write_others(dataset, export_path, **kwargs): else: return write_method(**filtered_kwargs) + def write_iceberg(dataset, export_path, **kwargs): + """ + Export method for iceberg target tables. + It will create the table if it does not exist. 
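# --- Illustrative sketch, not part of the patch: the kind of PyArrow filesystem
# objects that create_filesystem_from_args (added above) builds from
# export_extra_args, and how a write call can consume them. Hosts, ports and
# credentials are placeholders; HadoopFileSystem additionally needs the Hadoop
# native client (libhdfs) and a valid CLASSPATH on every node.
import pyarrow.fs as pa_fs

hdfs_fs = pa_fs.HadoopFileSystem(host="namenode.example.com", port=8020, user="hadoop")
s3_fs = pa_fs.S3FileSystem(
    access_key="your_access_key",
    secret_key="your_secret_key",
    endpoint_override="http://yourhost:yourport",  # S3-compatible endpoint
    region="us-east-1",
)
# A Ray Dataset write can then target either store, e.g.:
# dataset.write_parquet("hdfs://namenode.example.com:8020/tmp/out", filesystem=hdfs_fs)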
+ """ + from pyiceberg.catalog import load_catalog + from pyiceberg.exceptions import NoSuchTableError, NoSuchNamespaceError + from pyiceberg.io.pyarrow import pyarrow_to_schema + + export_extra_args = kwargs.get("export_extra_args", {}) + catalog_kwargs = export_extra_args.get("catalog_kwargs", {}) + table_identifier = export_extra_args.get("table_identifier", export_path).split(os.sep)[-1] + + try: + catalog = load_catalog(**catalog_kwargs) + try: + catalog.load_table(table_identifier) + logger.info(f"Iceberg table {table_identifier} exists.") + except NoSuchTableError: + logger.info(f"Iceberg table {table_identifier} not found.") + except Exception as e: + logger.error(f"Failed to check Iceberg table: {e}, export to {export_path}...") + filtered_kwargs = filter_arguments(dataset.write_iceberg, export_extra_args) + return dataset.write_json(export_path, filtered_kwargs) + + filtered_kwargs = filter_arguments(dataset.write_iceberg, export_extra_args) + return dataset.write_iceberg(table_identifier, **filtered_kwargs) + + @staticmethod + def _router(): + return { + "jsonl": RayExporter.write_json, + "json": RayExporter.write_json, + "webdataset": RayExporter.write_webdataset, + "iceberg": RayExporter.write_iceberg, # 新增这一行 + } + # suffix to export method @staticmethod def _router(): @@ -253,8 +290,5 @@ def _router(): "jsonl": RayExporter.write_json, "json": RayExporter.write_json, "webdataset": RayExporter.write_webdataset, - "parquet": RayExporter.write_others, - "csv": RayExporter.write_others, - "tfrecords": RayExporter.write_others, - "lance": RayExporter.write_others, + "iceberg": RayExporter.write_iceberg, } From ebd23d279c77f642620ec535a892e41461ee2823 Mon Sep 17 00:00:00 2001 From: Dludora Date: Wed, 31 Dec 2025 17:55:15 +0800 Subject: [PATCH 10/18] refractor: move fs --- data_juicer/utils/file_utils.py | 27 --------------------------- data_juicer/utils/s3_utils.py | 27 +++++++++++++++++++++++++++ 2 files changed, 27 insertions(+), 27 deletions(-) diff --git a/data_juicer/utils/file_utils.py b/data_juicer/utils/file_utils.py index 7e0cbdfc45..352d5d5eba 100644 --- a/data_juicer/utils/file_utils.py +++ b/data_juicer/utils/file_utils.py @@ -25,33 +25,6 @@ class Sizes: TiB = 2**40 # 1024*1024*1024*1024 -def create_filesystem_from_args(path: str, args: Dict[str, Any]): - """ - Create a PyArrow FileSystem based on the path prefix and parameters. - Automatically extract relevant credentials from args and remove them from args (using pop) to avoid polluting subsequent parameters. - """ - fs = None - if path.startswith("s3://"): - from data_juicer.utils.s3_utils import create_pyarrow_s3_filesystem - - s3_keys = ["aws_access_key_id", "aws_secret_access_key", "aws_session_token", "aws_region", "endpoint_url"] - s3_conf = {k: args.pop(k) for k in s3_keys if k in args} - fs = create_pyarrow_s3_filesystem(s3_conf) - logger.info(f"Detected S3 export path: {path}. S3 filesystem configured.") - - elif path.startswith("hdfs://"): - import pyarrow.fs as pa_fs - - hdfs_keys = ["host", "port", "user", "kerb_ticket", "extra_conf"] - hdfs_conf = {k: args.pop(k) for k in hdfs_keys if k in args} - if "port" in hdfs_conf: - hdfs_conf["port"] = int(hdfs_conf["port"]) - fs = pa_fs.HadoopFileSystem(**hdfs_conf) - logger.info(f"Detected HDFS export path: {path}. 
HDFS filesystem configured.") - - return fs - - def byte_size_to_size_str(byte_size: int): # get the string format of shard size if byte_size // Sizes.TiB: diff --git a/data_juicer/utils/s3_utils.py b/data_juicer/utils/s3_utils.py index 650f10f9c6..69a9dae569 100644 --- a/data_juicer/utils/s3_utils.py +++ b/data_juicer/utils/s3_utils.py @@ -117,3 +117,30 @@ def validate_s3_path(path: str) -> None: """ if not path.startswith("s3://"): raise ValueError(f"S3 path must start with 's3://', got: {path}") + + +def create_filesystem_from_args(path: str, args: Dict[str, Any]): + """ + Create a PyArrow FileSystem based on the path prefix and parameters. + Automatically extract relevant credentials from args and remove them from args (using pop) to avoid polluting subsequent parameters. + """ + fs = None + if path.startswith("s3://"): + validate_s3_path(path) + + s3_keys = ["aws_access_key_id", "aws_secret_access_key", "aws_session_token", "aws_region", "endpoint_url"] + s3_conf = {k: args.pop(k) for k in s3_keys if k in args} + fs = create_pyarrow_s3_filesystem(s3_conf) + logger.info(f"Detected S3 export path: {path}. S3 filesystem configured.") + + elif path.startswith("hdfs://"): + import pyarrow.fs as pa_fs + + hdfs_keys = ["host", "port", "user", "kerb_ticket", "extra_conf"] + hdfs_conf = {k: args.pop(k) for k in hdfs_keys if k in args} + if "port" in hdfs_conf: + hdfs_conf["port"] = int(hdfs_conf["port"]) + fs = pa_fs.HadoopFileSystem(**hdfs_conf) + logger.info(f"Detected HDFS export path: {path}. HDFS filesystem configured.") + + return fs From 12a45245e52a239e645d743489cb7e6e9236f093 Mon Sep 17 00:00:00 2001 From: Dludora Date: Wed, 31 Dec 2025 17:55:56 +0800 Subject: [PATCH 11/18] refractor: move fs --- data_juicer/core/ray_exporter.py | 24 ++++++++++++------------ 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/data_juicer/core/ray_exporter.py b/data_juicer/core/ray_exporter.py index 69c017d2e8..12b4d1aaaa 100644 --- a/data_juicer/core/ray_exporter.py +++ b/data_juicer/core/ray_exporter.py @@ -5,7 +5,8 @@ from loguru import logger from data_juicer.utils.constant import Fields, HashKeys -from data_juicer.utils.file_utils import Sizes, byte_size_to_size_str, create_filesystem_from_args +from data_juicer.utils.file_utils import Sizes, byte_size_to_size_str +from data_juicer.utils.s3_utils import create_filesystem_from_args from data_juicer.utils.model_utils import filter_arguments from data_juicer.utils.webdataset_utils import reconstruct_custom_webdataset_format @@ -247,24 +248,23 @@ def write_iceberg(dataset, export_path, **kwargs): It will create the table if it does not exist. 
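# --- Illustrative sketch, not part of the patch: pre-creating the target table with
# pyiceberg so the existence check in write_iceberg below succeeds instead of
# triggering the fallback path. The namespace, table name and catalog settings are
# placeholders, and passing a pyarrow.Schema to create_table assumes a reasonably
# recent pyiceberg release.
import pyarrow as pa
from pyiceberg.catalog import load_catalog
from pyiceberg.exceptions import NamespaceAlreadyExistsError, TableAlreadyExistsError

catalog = load_catalog(
    name="local", type="sql",
    uri="sqlite:///catalog.db", warehouse="file:///tmp/warehouse",
)
try:
    catalog.create_namespace("default")
except NamespaceAlreadyExistsError:
    pass
try:
    catalog.create_table("default.test", schema=pa.schema([("text", pa.string())]))
except TableAlreadyExistsError:
    pass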
""" from pyiceberg.catalog import load_catalog - from pyiceberg.exceptions import NoSuchTableError, NoSuchNamespaceError - from pyiceberg.io.pyarrow import pyarrow_to_schema + from pyiceberg.exceptions import NoSuchTableError export_extra_args = kwargs.get("export_extra_args", {}) catalog_kwargs = export_extra_args.get("catalog_kwargs", {}) - table_identifier = export_extra_args.get("table_identifier", export_path).split(os.sep)[-1] + table_identifier = export_extra_args.get("table_identifier", export_path) + catalog = load_catalog(**catalog_kwargs) try: - catalog = load_catalog(**catalog_kwargs) - try: - catalog.load_table(table_identifier) - logger.info(f"Iceberg table {table_identifier} exists.") - except NoSuchTableError: - logger.info(f"Iceberg table {table_identifier} not found.") + catalog.load_table(table_identifier) + logger.info(f"Iceberg table {table_identifier} exists.") + except NoSuchTableError: + logger.info(f"Iceberg table {table_identifier} not found. exporting to json...") + + filtered_kwargs = filter_arguments(dataset.write_json, export_extra_args) + return dataset.write_json(export_path) except Exception as e: logger.error(f"Failed to check Iceberg table: {e}, export to {export_path}...") - filtered_kwargs = filter_arguments(dataset.write_iceberg, export_extra_args) - return dataset.write_json(export_path, filtered_kwargs) filtered_kwargs = filter_arguments(dataset.write_iceberg, export_extra_args) return dataset.write_iceberg(table_identifier, **filtered_kwargs) From 2744914c2b3a7021a1a52462a08053cc69afd67a Mon Sep 17 00:00:00 2001 From: Dludora Date: Wed, 31 Dec 2025 17:56:19 +0800 Subject: [PATCH 12/18] feat: delta and hudi --- data_juicer/core/data/load_strategy.py | 103 +++++++++++++++++++++++-- 1 file changed, 98 insertions(+), 5 deletions(-) diff --git a/data_juicer/core/data/load_strategy.py b/data_juicer/core/data/load_strategy.py index 2234889140..a93cbb87e6 100644 --- a/data_juicer/core/data/load_strategy.py +++ b/data_juicer/core/data/load_strategy.py @@ -931,16 +931,31 @@ def load_data(self, **kwargs): table_identifier = self.ds_config["table_identifier"] catalog_kwargs = self.ds_config.get("catalog_kwargs", {}) or {} - logger.info(f"Loading Iceberg table: {table_identifier} with catalog kwargs: {catalog_kwargs}") + logger.info(f"Loading Iceberg table.....") try: import ray.data - # Ray's read_iceberg often requires the table input to be a PyIceberg table object - # or arguments to construct one. To be most robust and support authentication, - # we load the table via pyiceberg first, then pass it to Ray. 
+ # from data_juicer.utils.s3_utils import get_aws_credentials + from data_juicer.utils.model_utils import filter_arguments + + # s3_config = {} + # if "s3.access_key_id" in catalog_kwargs: + # s3_config["aws_access_key_id"] = catalog_kwargs.pop("s3.access_key_id") + # if "s3.secret_access_key" in catalog_kwargs: + # s3_config["aws_secret_access_key"] = catalog_kwargs.pop("s3.secret_access_key") + # if "s3.session_token" in catalog_kwargs: + # s3_config["aws_session_token"] = catalog_kwargs.pop("s3.session_token") + # if "s3.region" in catalog_kwargs: + # s3_config["aws_region"] = catalog_kwargs.pop("s3.region") + # if "s3.endpoint" in catalog_kwargs: + # s3_config["endpoint_url"] = catalog_kwargs.pop("s3.endpoint") + + # aws_access_key_id, aws_secret_access_key, aws_session_token, aws_region = get_aws_credentials(s3_config) + + read_config = filter_arguments(ray.data.read_iceberg, self.ds_config) # Ray reads the table distributedly based on the snapshots - dataset = ray.data.read_iceberg(table_identifier=table_identifier, catalog_kwargs=catalog_kwargs) + dataset = ray.data.read_iceberg(**read_config) return RayDataset(dataset, dataset_path=table_identifier, cfg=self.cfg) @@ -951,3 +966,81 @@ def load_data(self, **kwargs): ) except Exception as e: raise RuntimeError(f"Failed to load Iceberg table {table_identifier} in Ray. " f"Error: {str(e)}") + + +@DataLoadStrategyRegistry.register("ray", "remote", "delta") +class RayDeltaDataLoadStrategy(RayDataLoadStrategy): + """ + data load strategy for Delta Lake tables for RayExecutor + Uses ray.data.read_delta + """ + + CONFIG_VALIDATION_RULES = { + "required_fields": ["path"], + "optional_fields": [ + "aws_access_key_id", + "aws_secret_access_key", + "aws_session_token", + "aws_region", + "endpoint_url", + ], + "field_types": {"path": str}, + "custom_validators": {}, + } + + def load_data(self, **kwargs): + from data_juicer.core.data.ray_dataset import RayDataset + + table_path = self.ds_config["path"] + + logger.info(f"Loading Delta Lake table from path: {table_path}") + try: + import ray.data + from data_juicer.utils.model_utils import filter_arguments + + read_config = filter_arguments(ray.data.read_delta, self.ds_config) + + dataset = ray.data.read_delta( + **read_config, + ) + + return RayDataset(dataset, dataset_path=table_path, cfg=self.cfg) + + except Exception as e: + raise RuntimeError(f"Failed to load Delta Lake table from path {table_path} in Ray. " f"Error: {str(e)}") + + +@DataLoadStrategyRegistry.register("ray", "remote", "hudi") +class RayHudiDataLoadStrategy(RayDataLoadStrategy): + """ + data load strategy for Hudi tables for RayExecutor + Uses ray.data.read_hudi + """ + + CONFIG_VALIDATION_RULES = { + "required_fields": ["table_uri"], + "optional_fields": [], + "field_types": {"path": str}, + "custom_validators": {}, + } + + def load_data(self, **kwargs): + from data_juicer.core.data.ray_dataset import RayDataset + + table_uri = self.ds_config["table_uri"] + + logger.info(f"Loading Hudi table from path: {table_uri}") + try: + import ray.data + from data_juicer.utils.model_utils import filter_arguments + + read_config = filter_arguments(ray.data.read_hudi, self.ds_config) + + dataset = ray.data.read_hudi( + **read_config, + ) + + return RayDataset(dataset, dataset_path=table_uri, cfg=self.cfg) + + except Exception as e: + raise RuntimeError(f"Failed to load Hudi table from path {table_uri} in Ray. 
" f"Error: {str(e)}") From 79b3e0420f154d6f6f288c21b284e27ca8da45d5 Mon Sep 17 00:00:00 2001 From: Dludora Date: Mon, 5 Jan 2026 14:36:28 +0800 Subject: [PATCH 13/18] refractor: restore file_utils --- data_juicer/utils/file_utils.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/data_juicer/utils/file_utils.py b/data_juicer/utils/file_utils.py index 352d5d5eba..aeaa5ab084 100644 --- a/data_juicer/utils/file_utils.py +++ b/data_juicer/utils/file_utils.py @@ -7,12 +7,11 @@ import warnings from datetime import datetime, timezone from pathlib import Path -from typing import AsyncGenerator, Dict, List, Optional, Union, Any +from typing import AsyncGenerator, Dict, List, Optional, Union import aiohttp import pandas as pd from datasets.utils.extract import ZstdExtractor as Extractor -from loguru import logger from data_juicer.utils.common_utils import dict_to_hash from data_juicer.utils.constant import DEFAULT_PREFIX, Fields From cc7b07b89da311c449aa2d4b016991d6aa592a5a Mon Sep 17 00:00:00 2001 From: Dludora Date: Mon, 5 Jan 2026 16:23:38 +0800 Subject: [PATCH 14/18] refractor: add Any type --- data_juicer/utils/s3_utils.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/data_juicer/utils/s3_utils.py b/data_juicer/utils/s3_utils.py index 69a9dae569..719b343d63 100644 --- a/data_juicer/utils/s3_utils.py +++ b/data_juicer/utils/s3_utils.py @@ -6,7 +6,7 @@ """ import os -from typing import Dict, Tuple +from typing import Dict, Tuple, Any import pyarrow.fs from loguru import logger From b4c698c51b09ba9d8468861b64ba418cfcbca5ed Mon Sep 17 00:00:00 2001 From: Dludora Date: Mon, 5 Jan 2026 17:55:12 +0800 Subject: [PATCH 15/18] feat: fallback ray_expoeter --- data_juicer/core/ray_exporter.py | 44 ++++++++++++++++++++++++-------- 1 file changed, 34 insertions(+), 10 deletions(-) diff --git a/data_juicer/core/ray_exporter.py b/data_juicer/core/ray_exporter.py index 12b4d1aaaa..3c0772e975 100644 --- a/data_juicer/core/ray_exporter.py +++ b/data_juicer/core/ray_exporter.py @@ -242,10 +242,11 @@ def write_others(dataset, export_path, **kwargs): else: return write_method(**filtered_kwargs) + @staticmethod def write_iceberg(dataset, export_path, **kwargs): """ Export method for iceberg target tables. - It will create the table if it does not exist. + Checks for table existence/connectivity. If check fails, safe fall-back to JSON. """ from pyiceberg.catalog import load_catalog from pyiceberg.exceptions import NoSuchTableError @@ -254,20 +255,43 @@ def write_iceberg(dataset, export_path, **kwargs): catalog_kwargs = export_extra_args.get("catalog_kwargs", {}) table_identifier = export_extra_args.get("table_identifier", export_path) - catalog = load_catalog(**catalog_kwargs) + use_iceberg = False + try: + catalog = load_catalog(**catalog_kwargs) catalog.load_table(table_identifier) - logger.info(f"Iceberg table {table_identifier} exists.") - except NoSuchTableError: - logger.info(f"Iceberg table {table_identifier} not found. exporting to json...") + logger.info(f"Iceberg table {table_identifier} exists. Writing to Iceberg.") + use_iceberg = True - filtered_kwargs = filter_arguments(dataset.write_json, export_extra_args) - return dataset.write_json(export_path) + except NoSuchTableError as e: + logger.warning( + f"Iceberg target unavailable ({e.__class__.__name__}). Fallback to exporting to {export_path}..." + ) except Exception as e: - logger.error(f"Failed to check Iceberg table: {e}, export to {export_path}...") + logger.error(f"Unexpected error checking Iceberg: {e}. 
Fallback to exporting to {export_path}...") + + if use_iceberg: + try: + filtered_kwargs = filter_arguments(dataset.write_iceberg, export_extra_args) + return dataset.write_iceberg(table_identifier, **filtered_kwargs) + except Exception as e: + logger.error(f"Write to Iceberg failed during execution: {e}. Fallback to json...") - filtered_kwargs = filter_arguments(dataset.write_iceberg, export_extra_args) - return dataset.write_iceberg(table_identifier, **filtered_kwargs) + suffix = os.path.splitext(export_path)[-1].strip(".").lower() + if not suffix: + suffix = "jsonl" + logger.warning(f"No suffix found in {export_path}, using default fallback: {suffix}") + + logger.info(f"Falling back to file export. Format: [{suffix}], Path: [{export_path}]") + + fallback_kwargs = {} + if "filesystem" in export_extra_args: + fallback_kwargs["filesystem"] = export_extra_args["filesystem"] + if suffix in ["json", "jsonl"]: + return RayExporter.write_json(dataset, export_path, **fallback_kwargs) + else: + fallback_kwargs["export_format"] = suffix + return RayExporter.write_others(dataset, export_path, **fallback_kwargs) @staticmethod def _router(): From 087941c5272845cc666e53a243c8c77d6e50a4f8 Mon Sep 17 00:00:00 2001 From: Dludora Date: Mon, 5 Jan 2026 19:42:05 +0800 Subject: [PATCH 16/18] fix: pyproject --- pyproject.toml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyproject.toml b/pyproject.toml index 5ba39d60e2..fd634c8365 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -118,7 +118,7 @@ distributed = [ "ray[default]>=2.51.0", # distributed computing "pyspark==3.5.5", # distributed data processing "s3fs", # S3 filesystem support for cloud storage - "pyiceberg[pyarrow]" + "pyiceberg[pyarrow]", "bitarray", # efficient arrays of booleans ] From 9d180598bb01759312c3489757e8d32cba987bb3 Mon Sep 17 00:00:00 2001 From: Dludora Date: Mon, 5 Jan 2026 19:46:15 +0800 Subject: [PATCH 17/18] style: code style check --- data_juicer/core/data/load_strategy.py | 6 +++--- data_juicer/core/ray_exporter.py | 13 ++----------- data_juicer/utils/s3_utils.py | 2 +- 3 files changed, 6 insertions(+), 15 deletions(-) diff --git a/data_juicer/core/data/load_strategy.py b/data_juicer/core/data/load_strategy.py index a93cbb87e6..47d06f4e84 100644 --- a/data_juicer/core/data/load_strategy.py +++ b/data_juicer/core/data/load_strategy.py @@ -929,9 +929,8 @@ def load_data(self, **kwargs): from data_juicer.core.data.ray_dataset import RayDataset table_identifier = self.ds_config["table_identifier"] - catalog_kwargs = self.ds_config.get("catalog_kwargs", {}) or {} - logger.info(f"Loading Iceberg table.....") + logger.info("Loading Iceberg table.....") try: import ray.data @@ -949,7 +948,6 @@ def load_data(self, **kwargs): # s3_config["aws_region"] = catalog_kwargs.pop("s3.region") # if "s3.endpoint" in catalog_kwargs: # s3_config["endpoint_url"] = catalog_kwargs.pop("s3.endpoint") - # aws_access_key_id, aws_secret_access_key, aws_session_token, aws_region = get_aws_credentials(s3_config) read_config = filter_arguments(ray.data.read_iceberg, self.ds_config) @@ -996,6 +994,7 @@ def load_data(self, **kwargs): logger.info(f"Loading Delta Lake table from path: {table_path}") try: import ray.data + from data_juicer.utils.model_utils import filter_arguments read_config = filter_arguments(ray.data.read_delta, self.ds_config) @@ -1032,6 +1031,7 @@ def load_data(self, **kwargs): logger.info(f"Loading Hudi table from path: {table_uri}") try: import ray.data + from data_juicer.utils.model_utils import filter_arguments 
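# --- Illustrative sketch, not part of the patch: how the Hudi strategy above boils
# down to a single Ray Data call. The table URI is a placeholder; read_hudi (like the
# read_delta call used by the Delta strategy) is an experimental Ray Data reader, so
# its availability and extra options depend on the installed Ray version and its
# backing Python package (e.g. `hudi` for read_hudi).
import ray.data

ds = ray.data.read_hudi(table_uri="s3://your-bucket/hudi/table_path")  # assumed table URI
print(ds.count())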
read_config = filter_arguments(ray.data.read_hudi, self.ds_config) diff --git a/data_juicer/core/ray_exporter.py b/data_juicer/core/ray_exporter.py index 3c0772e975..bc87375fbf 100644 --- a/data_juicer/core/ray_exporter.py +++ b/data_juicer/core/ray_exporter.py @@ -1,13 +1,13 @@ -import os import copy +import os from functools import partial from loguru import logger from data_juicer.utils.constant import Fields, HashKeys from data_juicer.utils.file_utils import Sizes, byte_size_to_size_str -from data_juicer.utils.s3_utils import create_filesystem_from_args from data_juicer.utils.model_utils import filter_arguments +from data_juicer.utils.s3_utils import create_filesystem_from_args from data_juicer.utils.webdataset_utils import reconstruct_custom_webdataset_format @@ -293,15 +293,6 @@ def write_iceberg(dataset, export_path, **kwargs): fallback_kwargs["export_format"] = suffix return RayExporter.write_others(dataset, export_path, **fallback_kwargs) - @staticmethod - def _router(): - return { - "jsonl": RayExporter.write_json, - "json": RayExporter.write_json, - "webdataset": RayExporter.write_webdataset, - "iceberg": RayExporter.write_iceberg, # 新增这一行 - } - # suffix to export method @staticmethod def _router(): diff --git a/data_juicer/utils/s3_utils.py b/data_juicer/utils/s3_utils.py index 719b343d63..2a5c76a6a1 100644 --- a/data_juicer/utils/s3_utils.py +++ b/data_juicer/utils/s3_utils.py @@ -6,7 +6,7 @@ """ import os -from typing import Dict, Tuple, Any +from typing import Any, Dict, Tuple import pyarrow.fs from loguru import logger From 2b00595774cd1f52da4f243d6b8481e2c9f256de Mon Sep 17 00:00:00 2001 From: Dludora Date: Tue, 6 Jan 2026 10:18:20 +0800 Subject: [PATCH 18/18] restore: ray_executor s3 --- data_juicer/core/executor/ray_executor.py | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/data_juicer/core/executor/ray_executor.py b/data_juicer/core/executor/ray_executor.py index 8fb0a4527d..2313b18aef 100644 --- a/data_juicer/core/executor/ray_executor.py +++ b/data_juicer/core/executor/ray_executor.py @@ -73,6 +73,26 @@ def __init__(self, cfg: Optional[Namespace] = None): # Prepare export extra args, including S3 credentials if export_path is S3 export_extra_args = dict(self.cfg.export_extra_args) if hasattr(self.cfg, "export_extra_args") else {} + # If export_path is S3, extract AWS credentials with priority: + # 1. export_aws_credentials (export-specific) + # 2. dataset config (for backward compatibility) + # 3. environment variables (handled by exporter) + if self.cfg.export_path.startswith("s3://"): + # Pass export-specific credentials if provided. + # The RayExporter will handle falling back to environment variables or other credential mechanisms. + if hasattr(self.cfg, "export_aws_credentials") and self.cfg.export_aws_credentials: + export_aws_creds = self.cfg.export_aws_credentials + # Iterate through the required fields directly, and copy them to export_extra_args if they exist. + credential_fields = { + "aws_access_key_id", + "aws_secret_access_key", + "aws_session_token", + "aws_region", + "endpoint_url", + } + for field in credential_fields.intersection(export_aws_creds): + export_extra_args[field] = export_aws_creds[field] + self.exporter = RayExporter( self.cfg.export_path, self.cfg.export_type,