diff --git a/docs/curate-text/process-data/quality-assessment/classifier.md b/docs/curate-text/process-data/quality-assessment/classifier.md
index d0f94ebc4..f97f46405 100644
--- a/docs/curate-text/process-data/quality-assessment/classifier.md
+++ b/docs/curate-text/process-data/quality-assessment/classifier.md
@@ -95,6 +95,24 @@ The classifier-based filtering process involves:
 
 ## Usage
 
+Run classifiers with a Curator pipeline.
+
+```python
+from ray_curator.pipeline import Pipeline
+from ray_curator.backends.xenna.executor import XennaExecutor
+from ray_curator.stages.text.io.reader import JsonlReader
+from ray_curator.stages.text.classifiers.domain import DomainClassifier
+from ray_curator.stages.text.io.writer import ParquetWriter
+
+pipeline = Pipeline(name="domain_classification")
+pipeline.add_stage(JsonlReader(file_paths="input_data/*.jsonl", files_per_partition=8))
+pipeline.add_stage(DomainClassifier(filter_by=["Games", "Sports"], pred_column="domain_pred"))
+pipeline.add_stage(ParquetWriter(output_dir="/path/to/output/classified"))
+
+executor = XennaExecutor()
+pipeline.run(executor)
+```
+
 :::{note}
 Training fastText classifiers requires using CLI commands. The trained models can then be used with the Python API for filtering datasets.
 :::
diff --git a/docs/curate-text/process-data/quality-assessment/distributed-classifier.md b/docs/curate-text/process-data/quality-assessment/distributed-classifier.md
index 7fbf90f94..2074c7f46 100644
--- a/docs/curate-text/process-data/quality-assessment/distributed-classifier.md
+++ b/docs/curate-text/process-data/quality-assessment/distributed-classifier.md
@@ -15,20 +15,18 @@ NVIDIA NeMo Curator provides a module for performing distributed classification
 
 ## How It Works
 
-The distributed data classification in NeMo Curator works by:
+Classification pipelines are built from `ray_curator` stages and executed with a distributed executor:
 
-1. **Parallel Processing**: Chunking datasets across multiple computing nodes and GPUs to accelerate classification
-2. **Pre-trained Models**: Using specialized models for different classification tasks
-3. **Batched Inference**: Optimizing throughput with intelligent batching via CrossFit integration
-4. **Consistent API**: Providing a unified interface through the `DistributedDataClassifier` base class
-
-The `DistributedDataClassifier` is designed to run on GPU clusters with minimal code changes regardless of which specific classifier you're using. All classifiers support filtering based on classification results and storing prediction scores as metadata.
+- Parallelize with `XennaExecutor` or other backends
+- Use pre-trained Hugging Face models via `DistributedDataClassifier` subclasses
+- Tokenize once with `TokenizerStage` and run batched model inference
+- Optionally filter results in the same composite stage
 
 ---
 
 ## Usage
 
-NVIDIA NeMo Curator provides a base class `DistributedDataClassifier` that can be extended to fit your specific model. The only requirement is that the model can fit on a single GPU. This module operates on the GPU, so the Dask cluster must be started as a GPU cluster, and `DocumentDataset` requires `backend="cudf"`.
+Use the built-in classifier stages; the only requirement is that each model fits on a single GPU. Run pipelines with a distributed executor such as `XennaExecutor`.
 
 ### Classifier Comparison
 
@@ -50,18 +48,18 @@ NVIDIA NeMo Curator provides a base class `DistributedDataClassifier` that can b
 The Domain Classifier categorizes English text documents into specific domains or subject areas.
 
 ```python
-from nemo_curator.classifiers import DomainClassifier
-from nemo_curator.datasets import DocumentDataset
-
-# Load your dataset with cuDF backend
-input_dataset = DocumentDataset.read_json("books_dataset/*.jsonl", backend="cudf")
-
-# Apply the classifier, filtering for specific domains
-domain_classifier = DomainClassifier(filter_by=["Games", "Sports"])
-result_dataset = domain_classifier(dataset=input_dataset)
-
-# Save the results
-result_dataset.to_json("games_and_sports/")
+from ray_curator.pipeline.pipeline import Pipeline
+from ray_curator.backends.xenna.executor import XennaExecutor
+from ray_curator.stages.text.io.reader.jsonl import JsonlReader
+from ray_curator.stages.text.classifiers.domain import DomainClassifier
+from ray_curator.stages.text.io.writer.jsonl import JsonlWriter
+
+pipeline = Pipeline(name="domain_classification")
+pipeline.add_stage(JsonlReader(file_paths="books_dataset/*.jsonl"))
+pipeline.add_stage(DomainClassifier(filter_by=["Games", "Sports"]))
+pipeline.add_stage(JsonlWriter(output_dir="games_and_sports/"))
+
+pipeline.run(XennaExecutor())
 ```
 
 ### Multilingual Domain Classifier
@@ -69,11 +67,13 @@ result_dataset = classifier(dataset=input_dataset)
 Functionally similar to the Domain Classifier, but supports 52 languages.
 
 ```python
-from nemo_curator.classifiers import MultilingualDomainClassifier
+from ray_curator.stages.text.classifiers.domain import MultilingualDomainClassifier
 
-input_dataset = DocumentDataset.read_json("multilingual_dataset/*.jsonl", backend="cudf")
-classifier = MultilingualDomainClassifier(filter_by=["Games", "Sports"])
-result_dataset = classifier(dataset=input_dataset)
+pipeline = Pipeline(name="multilingual_domain")
+pipeline.add_stage(JsonlReader(file_paths="multilingual_dataset/*.jsonl"))
+pipeline.add_stage(MultilingualDomainClassifier(filter_by=["Games", "Sports"]))
+pipeline.add_stage(JsonlWriter(output_dir="multilingual_domains/"))
+pipeline.run(XennaExecutor())
 ```
 
 ### Quality Classifier
@@ -81,11 +81,13 @@ result_dataset = classifier(dataset=input_dataset)
 The Quality Classifier assesses document quality on a scale from Low to High.
 
 ```python
-from nemo_curator.classifiers import QualityClassifier
+from ray_curator.stages.text.classifiers.quality import QualityClassifier
 
-input_dataset = DocumentDataset.read_json("web_documents/*.jsonl", backend="cudf")
-quality_classifier = QualityClassifier(filter_by=["High", "Medium"])
-result_dataset = quality_classifier(dataset=input_dataset)
+pipeline = Pipeline(name="quality_classifier")
+pipeline.add_stage(JsonlReader(file_paths="web_documents/*.jsonl"))
+pipeline.add_stage(QualityClassifier(filter_by=["High", "Medium"]))
+pipeline.add_stage(JsonlWriter(output_dir="quality_medium_plus/"))
+pipeline.run(XennaExecutor())
 ```
 
 ### AEGIS Safety Model
@@ -93,17 +95,18 @@ result_dataset = quality_classifier(dataset=input_dataset)
 The AEGIS classifier detects unsafe content across 13 critical risk categories. It requires a HuggingFace token for access to Llama Guard.
 
 ```python
-from nemo_curator.classifiers import AegisClassifier
-
-input_dataset = DocumentDataset.read_json("content/*.jsonl", backend="cudf")
+from ray_curator.stages.text.classifiers.aegis import AegisClassifier
 
 token = "hf_1234"  # Your HuggingFace user access token
-safety_classifier = AegisClassifier(
+pipeline = Pipeline(name="aegis")
+pipeline.add_stage(JsonlReader(file_paths="content/*.jsonl"))
+pipeline.add_stage(AegisClassifier(
     aegis_variant="nvidia/Aegis-AI-Content-Safety-LlamaGuard-Defensive-1.0",
     token=token,
-    filter_by=["safe", "O13"]  # Keep only safe content and "needs caution" category
-)
-result_dataset = safety_classifier(dataset=input_dataset)
+    filter_by=["safe", "O13"],
+))
+pipeline.add_stage(JsonlWriter(output_dir="safe_and_O13/"))
+pipeline.run(XennaExecutor())
 ```
 
 The classifier adds a column with labels: "safe," "O1" through "O13" (each representing specific safety risks), or "unknown." For raw LLM output, use:
 
@@ -122,14 +125,14 @@ safety_classifier = AegisClassifier(
 Detects LLM poisoning attacks in instruction-response datasets. Requires HuggingFace token access.
 
 ```python
-from nemo_curator.classifiers import InstructionDataGuardClassifier
-
-# For instruction-response data: "Instruction: {instruction}. Input: {input_}. Response: {response}."
-input_dataset = DocumentDataset.read_json("instruction_data/*.jsonl", backend="cudf")
+from ray_curator.stages.text.classifiers.aegis import InstructionDataGuardClassifier
 
 token = "hf_1234"  # Your HuggingFace user access token
-classifier = InstructionDataGuardClassifier(token=token)
-result_dataset = classifier(dataset=input_dataset)
+pipeline = Pipeline(name="instruction_data_guard")
+pipeline.add_stage(JsonlReader(file_paths="instruction_data/*.jsonl"))
+pipeline.add_stage(InstructionDataGuardClassifier(token=token))
+pipeline.add_stage(JsonlWriter(output_dir="guard_scores/"))
+pipeline.run(XennaExecutor())
 ```
 
 The output includes two columns: a float score `instruction_data_guard_poisoning_score` and a Boolean `is_poisoned`.
 
@@ -139,18 +142,17 @@ The output includes two columns: a float score `instruction_data_guard_poisoning
 Scores documents on educational value from 0–5. This helps prioritize content for knowledge-intensive tasks.
 
 ```python
-from nemo_curator.classifiers import FineWebEduClassifier
-
-input_dataset = DocumentDataset.read_json("web_documents/*.jsonl", backend="cudf")
-edu_classifier = FineWebEduClassifier(
-    batch_size=256,
-    pred_column="fineweb-edu-score",  # Raw float scores
-    int_column="fineweb-edu-score-int"  # Rounded integer scores
-)
-result_dataset = edu_classifier(dataset=input_dataset)
-
-# Extract highly educational content (scores 4-5)
-high_edu_dataset = result_dataset[result_dataset["fineweb-edu-score-int"] >= 4]
+from ray_curator.stages.text.classifiers.fineweb_edu import FineWebEduClassifier
+
+pipeline = Pipeline(name="fineweb_edu")
+pipeline.add_stage(JsonlReader(file_paths="web_documents/*.jsonl"))
+pipeline.add_stage(FineWebEduClassifier(
+    model_inference_batch_size=256,
+    pred_column="fineweb-edu-score-float",
+    int_score_column="fineweb-edu-score-int",
+))
+pipeline.add_stage(JsonlWriter(output_dir="fineweb_scores/"))
+pipeline.run(XennaExecutor())
 ```
 
 ### FineWeb Mixtral and Nemotron Edu Classifiers
@@ -163,14 +165,16 @@ Similar to the FineWeb Edu Classifier but trained with different annotation sour
 Both provide a quality label column marking scores above 2.5 as "high_quality":
 
 ```python
-from nemo_curator.classifiers import FineWebMixtralEduClassifier  # or FineWebNemotronEduClassifier
-
-classifier = FineWebMixtralEduClassifier(
-    pred_column="score",  # Raw float scores
-    int_column="score-int",  # Rounded integer scores
-    quality_label_column="quality-label"  # "high_quality" or "low_quality"
+from ray_curator.stages.text.classifiers.fineweb_edu import (
+    FineWebMixtralEduClassifier,
+    FineWebNemotronEduClassifier,
 )
-result_dataset = classifier(dataset=input_dataset)
+
+pipeline = Pipeline(name="fineweb_mixtral")
+pipeline.add_stage(JsonlReader(file_paths="web_documents/*.jsonl"))
+pipeline.add_stage(FineWebMixtralEduClassifier())
+pipeline.add_stage(JsonlWriter(output_dir="fineweb_mixtral/"))
+pipeline.run(XennaExecutor())
 ```
 
 ### Content Type Classifier
@@ -178,11 +182,13 @@ result_dataset = classifier(dataset=input_dataset)
 Categorizes documents into 11 distinct speech types.
 
 ```python
-from nemo_curator.classifiers import ContentTypeClassifier
+from ray_curator.stages.text.classifiers.content_type import ContentTypeClassifier
 
-input_dataset = DocumentDataset.read_json("content/*.jsonl", backend="cudf")
-classifier = ContentTypeClassifier(filter_by=["Blogs", "News"])
-result_dataset = classifier(dataset=input_dataset)
+pipeline = Pipeline(name="content_type")
+pipeline.add_stage(JsonlReader(file_paths="content/*.jsonl"))
+pipeline.add_stage(ContentTypeClassifier(filter_by=["Blogs", "News"]))
+pipeline.add_stage(JsonlWriter(output_dir="blogs_or_news/"))
+pipeline.run(XennaExecutor())
 ```
 
 ### Prompt Task and Complexity Classifier
@@ -190,28 +196,17 @@ result_dataset = classifier(dataset=input_dataset)
 Classifies prompts by task type and complexity dimensions.
 
 ```python
-from nemo_curator.classifiers import PromptTaskComplexityClassifier
+from ray_curator.stages.text.classifiers.prompt_task_complexity import (
+    PromptTaskComplexityClassifier,
+)
 
-input_dataset = DocumentDataset.read_json("prompts/*.jsonl", backend="cudf")
-classifier = PromptTaskComplexityClassifier()
-result_dataset = classifier(dataset=input_dataset)
+pipeline = Pipeline(name="prompt_task_and_complexity")
+pipeline.add_stage(JsonlReader(file_paths="prompts/*.jsonl"))
+pipeline.add_stage(PromptTaskComplexityClassifier())
+pipeline.add_stage(JsonlWriter(output_dir="prompt_task_complexity/"))
+pipeline.run(XennaExecutor())
 ```
 
-## CrossFit Integration
-
-CrossFit is an open-source library by RAPIDS AI for fast offline inference scaled to multi-node multi-GPU environments. It accelerates NVIDIA NeMo Curator's classifiers with:
-
-- PyTorch integration for model inference
-- Efficient I/O and tokenization with cuDF
-- Smart batching/chunking for optimized processing
-- 1.4x-4x performance improvement over Dask + PyTorch baselines
-
-### Sorted Sequence Data Loader
-
-The key feature of CrossFit used in NVIDIA NeMo Curator is the sorted sequence data loader, which optimizes throughput by:
-
-- Sorting input sequences by length
-- Grouping similar-length sequences into batches
-- Efficiently allocating batches to GPU memory based on estimated memory footprints
+## Execution Backends
 
-See the [rapidsai/crossfit](https://github.com/rapidsai/crossfit) repository for more information.
\ No newline at end of file
+Pipelines can run on different executors. The `XennaExecutor` schedules stages with declared `Resources` over available nodes/GPUs. See `ray_curator.backends.xenna` for details.
\ No newline at end of file
diff --git a/docs/curate-text/process-data/quality-assessment/heuristic.md b/docs/curate-text/process-data/quality-assessment/heuristic.md
index ba5988da6..d3a232374 100644
--- a/docs/curate-text/process-data/quality-assessment/heuristic.md
+++ b/docs/curate-text/process-data/quality-assessment/heuristic.md
@@ -54,55 +54,36 @@ The filtering process typically involves:
 
 ## Usage
 
-::::{tab-set}
+Build a Curator pipeline with a reader, one or more `ScoreFilter` stages, and a writer:
 
-:::{tab-item} Python
 ```python
-import nemo_curator as nc
-from nemo_curator.datasets import DocumentDataset
-from nemo_curator.filters import (
+from ray_curator.pipeline.pipeline import Pipeline
+from ray_curator.backends.xenna.executor import XennaExecutor
+from ray_curator.stages.text.io.reader.jsonl import JsonlReader
+from ray_curator.stages.text.filters.heuristic_filter import (
     WordCountFilter,
     RepeatingTopNGramsFilter,
-    PunctuationFilter
+    PunctuationFilter,
+)
+from ray_curator.stages.text.modules.score_filter import ScoreFilter
+from ray_curator.stages.text.io.writer.jsonl import JsonlWriter
+
+pipeline = Pipeline(name="heuristic_filtering")
+pipeline.add_stage(
+    JsonlReader(file_paths="input_data/*.jsonl", files_per_partition=8)
+).add_stage(
+    ScoreFilter(WordCountFilter(min_words=80), text_field="text", score_field="word_count")
+).add_stage(
+    ScoreFilter(PunctuationFilter(max_num_sentences_without_endmark_ratio=0.85), text_field="text")
+).add_stage(
+    ScoreFilter(RepeatingTopNGramsFilter(n=3, max_repeating_ngram_ratio=0.18), text_field="text")
+).add_stage(
+    JsonlWriter(output_dir="high_quality_output/")
 )
 
-# Load your dataset
-dataset = DocumentDataset.read_json("input_data/*.jsonl")
-
-# Create a filter chain using Sequential
-filter_step = nc.Sequential([
-    nc.ScoreFilter(
-        WordCountFilter(min_words=80),
-        text_field="text",
-        score_field="word_count",
-    ),
-    nc.ScoreFilter(PunctuationFilter(max_num_sentences_without_endmark_ratio=0.85)),
-    nc.ScoreFilter(RepeatingTopNGramsFilter(n=2, max_repeating_ngram_ratio=0.2)),
-    nc.ScoreFilter(RepeatingTopNGramsFilter(n=3, max_repeating_ngram_ratio=0.18)),
-    nc.ScoreFilter(RepeatingTopNGramsFilter(n=4, max_repeating_ngram_ratio=0.16)),
-])
-
-# Apply the filters to get the high-quality subset
-high_quality_data = filter_step(dataset)
-
-# Save the results
-high_quality_data.to_json("high_quality_output/", write_to_filename=True)
-```
-:::
-
-:::{tab-item} Command Line
-```bash
-filter_documents \
-  --input-data-dir=/path/to/input/data \
-  --filter-config-file=./config/heuristic_filter_en.yaml \
-  --output-retained-document-dir=/path/to/output/high_quality \
-  --output-removed-document-dir=/path/to/output/low_quality \
-  --output-document-score-dir=/path/to/output/scores \
-  --log-dir=/path/to/logs/heuristic_filter
+executor = XennaExecutor()
+pipeline.run(executor)
 ```
-:::
-
-::::
 
 ## Available Filters
diff --git a/docs/curate-text/process-data/quality-assessment/index.md b/docs/curate-text/process-data/quality-assessment/index.md
index 194c245cf..2f4031a40 100644
--- a/docs/curate-text/process-data/quality-assessment/index.md
+++ b/docs/curate-text/process-data/quality-assessment/index.md
@@ -9,15 +9,16 @@ modality: "text-only"
 ---
 
 (text-process-data-filter)=
+
 # Quality Assessment & Filtering
 
-Score and remove low-quality content using heuristics and ML classifiers to prepare your data for model training using NeMo Curator's tools and utilities.
+Score and remove low-quality content with heuristics and ML classifiers to prepare your data for model training using NVIDIA NeMo Curator tools and utilities.
 
-Large datasets often contain many documents considered to be "low quality." In this context, "low quality" data simply means data we don't want a downstream model to learn from, and "high quality" data is data that we do want a downstream model to learn from. The metrics that define quality can vary widely.
+Large datasets often contain documents considered to be "low quality." In this context, "low quality" data means data we do not want a downstream model to learn from, and "high quality" data is data that we do want a downstream model to learn from. The metrics that define quality can vary by use case.
 
 ## How It Works
 
-NeMo Curator's filtering framework is built around several key components:
+NeMo Curator filtering uses several key components:
 
 ::::{tab-set}
 
@@ -26,33 +27,25 @@ NeMo Curator's filtering framework is built around several key components:
 The `ScoreFilter` is at the center of filtering in NeMo Curator. It applies a filter to a document and optionally saves the score as metadata:
 
 ```python
-import nemo_curator as nc
-from nemo_curator.datasets import DocumentDataset
-from nemo_curator.utils.file_utils import get_all_files_paths_under
-from nemo_curator.filters import WordCountFilter
-
-# Load dataset
-files = get_all_files_paths_under("books_dataset/", keep_extensions="jsonl")
-books = DocumentDataset.read_json(files, add_filename=True)
-
-# Create and apply filter
-filter_step = nc.ScoreFilter(
-    WordCountFilter(min_words=80),
-    text_field="text",
-    score_field="word_count",
-)
-
-# Get filtered dataset
-long_books = filter_step(books)
-
-# Save filtered dataset
-long_books.to_json("long_books/", write_to_filename=True)
+from ray_curator.pipeline.pipeline import Pipeline
+from ray_curator.backends.xenna.executor import XennaExecutor
+from ray_curator.stages.text.io.reader.jsonl import JsonlReader
+from ray_curator.stages.text.filters.heuristic_filter import WordCountFilter
+from ray_curator.stages.text.modules.score_filter import ScoreFilter
+from ray_curator.stages.text.io.writer.jsonl import JsonlWriter
+
+pipeline = Pipeline(name="scorefilter_example")
+pipeline.add_stage(JsonlReader(file_paths="books_dataset/*.jsonl", files_per_partition=4))
+pipeline.add_stage(ScoreFilter(WordCountFilter(min_words=80), text_field="text", score_field="word_count"))
+pipeline.add_stage(JsonlWriter(output_dir="long_books/"))
+
+pipeline.run(XennaExecutor())
 ```
 
 The filter object implements two key methods:
 
 - `score_document`: Computes a quality score for a document
-- `keep_document`: Determines if a document should be kept based on its score
+- `keep_document`: Determines whether to keep a document based on its score
 
 :::
 
 :::{tab-item} Score and Filter
 
 For more specific use cases, NeMo Curator provides two specialized modules:
 
-- `Score`: A module that only adds metadata scores to records without filtering
+- `Score`: A module that adds metadata scores to records without filtering
   - Takes a scoring function that evaluates text and returns a score
   - Adds the score to a specified metadata field
   - Useful for analysis or multi-stage filtering pipelines
 
 ```python
-# Example: Score documents without filtering
-scoring_step = nc.Score(
-    WordCountFilter().score_document,  # Use just the scoring part
-    text_field="text",
-    score_field="word_count"
-)
-scored_dataset = scoring_step(dataset)
+from ray_curator.stages.text.modules.score_filter import Score
+
+# Add a score without filtering (as a pipeline stage)
+pipeline.add_stage(Score(WordCountFilter().score_document, text_field="text", score_field="word_count"))
 ```
 
 - `Filter`: A module that filters based on pre-computed metadata
   - Takes a filter function that evaluates metadata and returns True/False
-  - Only uses existing metadata fields (doesn't compute new scores)
+  - Uses existing metadata fields (doesn't compute new scores)
   - Efficient for filtering on pre-computed metrics
 
 ```python
-# Example: Filter using pre-computed scores
-filter_step = nc.Filter(
-    lambda score: score >= 100,  # Keep documents with score >= 100
-    filter_field="word_count"
-)
-filtered_dataset = filter_step(scored_dataset)
+from ray_curator.stages.text.modules.score_filter import Filter
+
+# Filter using a pre-computed column
+pipeline.add_stage(Filter(lambda score: score >= 100, filter_field="word_count"))
 ```
 
 You can combine these modules in pipelines:
 
 ```python
-pipeline = nc.Sequential([
-    nc.Score(word_counter, score_field="word_count"),
-    nc.Score(symbol_counter, score_field="symbol_ratio"),
-    nc.Filter(lambda x: x >= 100, filter_field="word_count"),
-    nc.Filter(lambda x: x <= 0.3, filter_field="symbol_ratio")
-])
+# Compose as pipeline stages
+pipeline.add_stage(Score(word_counter, score_field="word_count"))
+pipeline.add_stage(Score(symbol_counter, score_field="symbol_ratio"))
+pipeline.add_stage(Filter(lambda x: x >= 100, filter_field="word_count"))
+pipeline.add_stage(Filter(lambda x: x <= 0.3, filter_field="symbol_ratio"))
 ```
 
 :::
 
-:::{tab-item} Batched Filtering
+:::{tab-item} DocumentBatch
+
+`DocumentBatch` is the core task type used for text processing stages in Curator. It wraps a Pandas DataFrame or PyArrow Table and flows between stages in your pipeline. Filtering stages (for example, `Score`, `Filter`, `ScoreFilter`, and classifier stages) read and write columns on the underlying data frame within a `DocumentBatch`.
 
-For improved performance, NeMo Curator supports batch processing using the `@batched` decorator:
+For higher throughput, you can batch several `DocumentBatch` tasks by setting a stage `batch_size` and implementing `process_batch(self, tasks: list[DocumentBatch])`. This lets your stage operate on several batches at once while keeping each batch’s data frame semantics intact.
 
 ```python
-from nemo_curator.utils.decorators import batched
-import pandas as pd
-
-class BatchedFilter(DocumentFilter):
-    @batched
-    def keep_document(self, scores: pd.Series):
-        # Process multiple documents in one operation
-        return scores > 10
+from dataclasses import dataclass
+from ray_curator.stages.base import ProcessingStage
+from ray_curator.tasks.document import DocumentBatch
+
+@dataclass
+class ThresholdFilter(ProcessingStage[DocumentBatch, DocumentBatch]):
+    score_field: str
+    threshold: float
+    _name: str = "threshold_filter"
+    _batch_size: int = 8  # executor groups tasks in batches of 8
+
+    def inputs(self):
+        return ["data"], [self.score_field]
+
+    def outputs(self):
+        return ["data"], []
+
+    def process_batch(self, tasks: list[DocumentBatch]) -> list[DocumentBatch]:
+        outputs: list[DocumentBatch] = []
+        for batch in tasks:
+            df = batch.to_pandas()
+            df = df[df[self.score_field] > self.threshold]
+            outputs.append(
+                DocumentBatch(
+                    task_id=batch.task_id,
+                    dataset_name=batch.dataset_name,
+                    data=df,
+                    _metadata=batch._metadata,
+                    _stage_perf=batch._stage_perf,
+                )
+            )
+        return outputs
+
+# Configure batch size via with_() if preferred
+stage = ThresholdFilter(score_field="word_count", threshold=100).with_(batch_size=8)
 ```
 
-The batched processing can significantly improve performance on large datasets by:
-
-- Reducing function call overhead
-- Enabling vectorized operations
-- Optimizing memory usage
+- Existing modules like `Score`, `Filter`, and classifier stages operate within a `DocumentBatch`; for peak throughput, prefer vectorized operations inside your stage or add `process_batch` as shown.
+- Model-based stages already batch internally using settings such as `model_inference_batch_size`.
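+
+Continuing the `ThresholdFilter` sketch, a minimal (hypothetical) pipeline wiring: the input is assumed to already carry a `word_count` column, and the pipeline name, paths, and output directory are placeholders; `Pipeline`, `JsonlReader`, `JsonlWriter`, and `XennaExecutor` are imported as in the ScoreFilter tab above.
+
+```python
+# Hypothetical wiring of the custom stage defined above; adjust paths and fields to your data.
+pipeline = Pipeline(name="threshold_filter_example")
+pipeline.add_stage(JsonlReader(file_paths="scored_docs/*.jsonl"))  # documents with a word_count column
+pipeline.add_stage(ThresholdFilter(score_field="word_count", threshold=100))  # keep rows above the threshold
+pipeline.add_stage(JsonlWriter(output_dir="above_threshold/"))
+pipeline.run(XennaExecutor())
+```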
 :::
 
@@ -166,7 +180,7 @@ GPU-accelerated classification with pre-trained models
 :::{grid-item-card} {octicon}`terminal;1.5em;sd-mr-1` Custom Filters
 :link: custom
 :link-type: doc
 
-Implement and combine your own custom filters
+Create and combine your own custom filters
 
 +++
 {bdg-secondary}`custom` {bdg-secondary}`flexible`
 :::
 
 ::::
@@ -177,46 +191,31 @@ Implement and combine your own custom filters
 
 ## Usage
 
-NeMo Curator provides a CLI tool for document filtering that becomes available after installing the package:
+Use the Curator pipeline to read data, apply filters/classifiers, and write results. Example:
 
-```bash
-filter_documents \
-  --input-data-dir=/path/to/input/data \
-  --filter-config-file=./config/heuristic_filter_en.yaml \
-  --output-retained-document-dir=/path/to/output/high_quality \
-  --output-removed-document-dir=/path/to/output/low_quality \
-  --output-document-score-dir=/path/to/output/scores \
-  --num-workers=4
-```
-
-For distributed processing with multiple workers:
+```python
+from ray_curator.pipeline import Pipeline
+from ray_curator.backends.xenna.executor import XennaExecutor
+from ray_curator.stages.text.io.reader.jsonl import JsonlReader
+from ray_curator.stages.text.filters.heuristic_filter import WordCountFilter, RepeatingTopNGramsFilter
+from ray_curator.stages.text.modules import ScoreFilter
+from ray_curator.stages.text.io.writer import JsonlWriter
+
+pipeline = Pipeline(name="quality_filtering")
+pipeline.add_stage(
+    JsonlReader(file_paths="/path/to/input/*.jsonl", files_per_partition=4)
+).add_stage(
+    ScoreFilter(WordCountFilter(min_words=80), text_field="text", score_field="word_count")
+).add_stage(
+    ScoreFilter(RepeatingTopNGramsFilter(n=3, max_repeating_ngram_ratio=0.18), text_field="text")
+).add_stage(
+    JsonlWriter(output_dir="/path/to/output/high_quality")
+)
 
-```bash
-filter_documents \
-  --input-data-dir=/path/to/input/data \
-  --filter-config-file=./config/heuristic_filter_en.yaml \
-  --output-retained-document-dir=/path/to/output/high_quality \
-  --num-workers=8 \
-  --device=gpu \
-  --log-dir=./logs
+executor = XennaExecutor()
+pipeline.run(executor)
 ```
 
-### CLI Parameters
-
-| Parameter | Description | Required |
-|-----------|-------------|----------|
-| `--input-data-dir` | Directory containing input JSONL files | Yes |
-| `--filter-config-file` | YAML configuration for the filter pipeline | Yes |
-| `--output-retained-document-dir` | Directory for documents passing filters | Yes |
-| `--output-removed-document-dir` | Directory for rejected documents | No |
-| `--output-document-score-dir` | Directory for storing score metadata | No |
-| `--log-dir` | Directory for storing logs | No |
-| `--num-workers` | Number of Dask workers for distributed processing | No |
-| `--scheduler-address` | Address of Dask scheduler for distributed processing | No |
-| `--device` | Processing device: `cpu` or `gpu` (default: `cpu`) | No |
-| `--input-file-type` | Input file format: `jsonl`, `parquet`, etc. (default: `jsonl`) | No |
-| `--output-file-type` | Output file format: `jsonl`, `parquet`, etc. (default: `jsonl`) | No |
-
 ```{toctree}
 :maxdepth: 4
 :titlesonly:
 
 Heuristic Filters
 Classifier Filters
 Distributed Classification
 Custom Filters
 ```
-
-## Best Practices
-
-When filtering large datasets, consider these performance tips:
-
-1. **Order matters**: Place computationally inexpensive filters early in your pipeline
-2. **Batch size tuning**: Adjust batch sizes based on your hardware capabilities
-3. **Use vectorization**: Implement batched methods for compute-intensive filters
-4. **Disk I/O**: Consider compression and chunking strategies for large datasets
-5. **Distributed processing**: For TB-scale datasets, use distributed filtering with Dask workers (`--num-workers`) or connect to an existing Dask cluster (`--scheduler-address`)
\ No newline at end of file