18 changes: 18 additions & 0 deletions docs/curate-text/process-data/quality-assessment/classifier.md
@@ -95,6 +95,24 @@ The classifier-based filtering process involves:

## Usage

Run classifiers by adding them as pipeline stages between a reader and a writer:

```python
from ray_curator.pipeline import Pipeline
from ray_curator.backends.xenna.executor import XennaExecutor
from ray_curator.stages.text.io.reader import JsonlReader
from ray_curator.stages.text.classifiers.domain import DomainClassifier
from ray_curator.stages.text.io.writer import ParquetWriter

pipeline = Pipeline(name="domain_classification")
pipeline.add_stage(JsonlReader(file_paths="input_data/*.jsonl", files_per_partition=8))
pipeline.add_stage(DomainClassifier(filter_by=["Games", "Sports"], pred_column="domain_pred"))
pipeline.add_stage(ParquetWriter(output_dir="/path/to/output/classified"))

executor = XennaExecutor()
pipeline.run(executor)
```

:::{note}
Training fastText classifiers requires using CLI commands. The trained models can then be used with the Python API for filtering datasets.
:::
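
For example, a model trained with the CLI can then drive a filtering pipeline. The sketch below is illustrative only: the `FastTextQualityFilter` import path and its `model_path` argument are assumptions carried over from earlier NeMo Curator releases and may differ in `ray_curator`.

```python
from ray_curator.pipeline import Pipeline
from ray_curator.backends.xenna.executor import XennaExecutor
from ray_curator.stages.text.io.reader import JsonlReader
from ray_curator.stages.text.io.writer import ParquetWriter
from ray_curator.stages.text.modules.score_filter import ScoreFilter

# Assumption: module path and class name for the fastText quality filter;
# adjust to your installed version.
from ray_curator.stages.text.filters.fasttext_filter import FastTextQualityFilter

pipeline = Pipeline(name="fasttext_quality_filtering")
pipeline.add_stage(JsonlReader(file_paths="input_data/*.jsonl", files_per_partition=8))
pipeline.add_stage(
    ScoreFilter(
        FastTextQualityFilter(model_path="quality_classifier.bin"),  # model trained via the CLI
        text_field="text",
        score_field="fasttext_quality_score",
    )
)
pipeline.add_stage(ParquetWriter(output_dir="filtered_output/"))
pipeline.run(XennaExecutor())
```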
@@ -15,20 +15,18 @@ NVIDIA NeMo Curator provides a module for performing distributed classification

## How It Works

The distributed data classification in NeMo Curator works by:
Classification pipelines are built from `ray_curator` stages and executed with a distributed executor:

1. **Parallel Processing**: Chunking datasets across multiple computing nodes and GPUs to accelerate classification
2. **Pre-trained Models**: Using specialized models for different classification tasks
3. **Batched Inference**: Optimizing throughput with intelligent batching via CrossFit integration
4. **Consistent API**: Providing a unified interface through the `DistributedDataClassifier` base class

The `DistributedDataClassifier` is designed to run on GPU clusters with minimal code changes regardless of which specific classifier you're using. All classifiers support filtering based on classification results and storing prediction scores as metadata.
- Parallelize across nodes and GPUs with `XennaExecutor` or another backend
- Use pre-trained Hugging Face models via `DistributedDataClassifier` subclasses
- Tokenize once with `TokenizerStage`, then run batched model inference
- Optionally filter results within the same composite stage

---

## Usage

NVIDIA NeMo Curator provides a base class `DistributedDataClassifier` that can be extended to fit your specific model. The only requirement is that the model can fit on a single GPU. This module operates on the GPU, so the Dask cluster must be started as a GPU cluster, and `DocumentDataset` requires `backend="cudf"`.
Use the built-in classifier stages; each model must fit on a single GPU. Run the pipeline with a distributed executor such as `XennaExecutor`.

### Classifier Comparison

@@ -50,60 +48,65 @@
The Domain Classifier categorizes English text documents into specific domains or subject areas.

```python
from nemo_curator.classifiers import DomainClassifier
from nemo_curator.datasets import DocumentDataset

# Load your dataset with cuDF backend
input_dataset = DocumentDataset.read_json("books_dataset/*.jsonl", backend="cudf")

# Apply the classifier, filtering for specific domains
domain_classifier = DomainClassifier(filter_by=["Games", "Sports"])
result_dataset = domain_classifier(dataset=input_dataset)

# Save the results
result_dataset.to_json("games_and_sports/")
from ray_curator.pipeline.pipeline import Pipeline
from ray_curator.backends.xenna.executor import XennaExecutor
from ray_curator.stages.text.io.reader.jsonl import JsonlReader
from ray_curator.stages.text.classifiers.domain import DomainClassifier
from ray_curator.stages.text.io.writer.jsonl import JsonlWriter

pipeline = Pipeline(name="domain_classification")
pipeline.add_stage(JsonlReader(file_paths="books_dataset/*.jsonl"))
pipeline.add_stage(DomainClassifier(filter_by=["Games", "Sports"]))
pipeline.add_stage(JsonlWriter(output_dir="games_and_sports/"))

pipeline.run(XennaExecutor())
```

### Multilingual Domain Classifier

Functionally similar to the Domain Classifier, but supports 52 languages.

```python
from nemo_curator.classifiers import MultilingualDomainClassifier
from ray_curator.stages.text.classifiers.domain import MultilingualDomainClassifier

input_dataset = DocumentDataset.read_json("multilingual_dataset/*.jsonl", backend="cudf")
classifier = MultilingualDomainClassifier(filter_by=["Games", "Sports"])
result_dataset = classifier(dataset=input_dataset)
pipeline = Pipeline(name="multilingual_domain")
pipeline.add_stage(JsonlReader(file_paths="multilingual_dataset/*.jsonl"))
pipeline.add_stage(MultilingualDomainClassifier(filter_by=["Games", "Sports"]))
pipeline.add_stage(JsonlWriter(output_dir="multilingual_domains/"))
pipeline.run(XennaExecutor())
```

### Quality Classifier

The Quality Classifier assesses document quality on a scale from Low to High.

```python
from nemo_curator.classifiers import QualityClassifier
from ray_curator.stages.text.classifiers.quality import QualityClassifier

input_dataset = DocumentDataset.read_json("web_documents/*.jsonl", backend="cudf")
quality_classifier = QualityClassifier(filter_by=["High", "Medium"])
result_dataset = quality_classifier(dataset=input_dataset)
pipeline = Pipeline(name="quality_classifier")
pipeline.add_stage(JsonlReader(file_paths="web_documents/*.jsonl"))
pipeline.add_stage(QualityClassifier(filter_by=["High", "Medium"]))
pipeline.add_stage(JsonlWriter(output_dir="quality_medium_plus/"))
pipeline.run(XennaExecutor())
```

### AEGIS Safety Model

The AEGIS classifier detects unsafe content across 13 critical risk categories. It requires a HuggingFace token for access to Llama Guard.

```python
from nemo_curator.classifiers import AegisClassifier

input_dataset = DocumentDataset.read_json("content/*.jsonl", backend="cudf")
from ray_curator.stages.text.classifiers.aegis import AegisClassifier

token = "hf_1234" # Your HuggingFace user access token
safety_classifier = AegisClassifier(
pipeline = Pipeline(name="aegis")
pipeline.add_stage(JsonlReader(file_paths="content/*.jsonl"))
pipeline.add_stage(AegisClassifier(
aegis_variant="nvidia/Aegis-AI-Content-Safety-LlamaGuard-Defensive-1.0",
token=token,
filter_by=["safe", "O13"] # Keep only safe content and "needs caution" category
)
result_dataset = safety_classifier(dataset=input_dataset)
filter_by=["safe", "O13"],
))
pipeline.add_stage(JsonlWriter(output_dir="safe_and_O13/"))
pipeline.run(XennaExecutor())
```

The classifier adds a column with labels: "safe," "O1" through "O13" (each representing specific safety risks), or "unknown." For raw LLM output, use:
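
A minimal sketch, assuming the stage still exposes the `keep_raw_pred` and `raw_pred_column` parameters from the earlier `nemo_curator` API:

```python
# Assumption: keep_raw_pred / raw_pred_column match the earlier nemo_curator AegisClassifier API.
pipeline.add_stage(AegisClassifier(
    aegis_variant="nvidia/Aegis-AI-Content-Safety-LlamaGuard-Defensive-1.0",
    token=token,
    keep_raw_pred=True,               # store the unparsed LLM response
    raw_pred_column="aegis_raw_pred", # column that receives the raw output
))
```

This is useful when the model returns text that does not parse cleanly into one of the predefined labels.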
@@ -122,14 +125,14 @@ safety_classifier = AegisClassifier(
### Instruction Data Guard

Detects LLM poisoning attacks in instruction-response datasets. Requires a Hugging Face user access token.

```python
from nemo_curator.classifiers import InstructionDataGuardClassifier

# For instruction-response data: "Instruction: {instruction}. Input: {input_}. Response: {response}."
input_dataset = DocumentDataset.read_json("instruction_data/*.jsonl", backend="cudf")
from ray_curator.stages.text.classifiers.aegis import InstructionDataGuardClassifier

token = "hf_1234" # Your HuggingFace user access token
classifier = InstructionDataGuardClassifier(token=token)
result_dataset = classifier(dataset=input_dataset)
pipeline = Pipeline(name="instruction_data_guard")
pipeline.add_stage(JsonlReader(file_paths="instruction_data/*.jsonl"))
pipeline.add_stage(InstructionDataGuardClassifier(token=token))
pipeline.add_stage(JsonlWriter(output_dir="guard_scores/"))
pipeline.run(XennaExecutor())
```

The output includes two columns: a float score `instruction_data_guard_poisoning_score` and a Boolean `is_poisoned`.
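
Downstream, you can drop flagged rows from the written output. A small post-processing sketch using pandas; the shard file name is hypothetical:

```python
import pandas as pd

# Read one shard of the classifier output and keep only rows not flagged as poisoned.
df = pd.read_json("guard_scores/part_0.jsonl", lines=True)  # hypothetical shard name
clean = df[~df["is_poisoned"]]
print(f"Kept {len(clean)} of {len(df)} examples")
```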
@@ -139,18 +142,17 @@
### FineWeb Edu Classifier

Scores documents on educational value from 0–5. This helps prioritize content for knowledge-intensive tasks.

```python
from nemo_curator.classifiers import FineWebEduClassifier

input_dataset = DocumentDataset.read_json("web_documents/*.jsonl", backend="cudf")
edu_classifier = FineWebEduClassifier(
batch_size=256,
pred_column="fineweb-edu-score", # Raw float scores
int_column="fineweb-edu-score-int" # Rounded integer scores
)
result_dataset = edu_classifier(dataset=input_dataset)

# Extract highly educational content (scores 4-5)
high_edu_dataset = result_dataset[result_dataset["fineweb-edu-score-int"] >= 4]
from ray_curator.stages.text.classifiers.fineweb_edu import FineWebEduClassifier

pipeline = Pipeline(name="fineweb_edu")
pipeline.add_stage(JsonlReader(file_paths="web_documents/*.jsonl"))
pipeline.add_stage(FineWebEduClassifier(
model_inference_batch_size=256,
pred_column="fineweb-edu-score-float",
int_score_column="fineweb-edu-score-int",
))
pipeline.add_stage(JsonlWriter(output_dir="fineweb_scores/"))
pipeline.run(XennaExecutor())
```
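
To keep only highly educational content (integer score 4 or 5), one option is to post-process the written output. A sketch using pandas; the shard file name is hypothetical:

```python
import pandas as pd

# Keep documents the classifier scored 4 or 5 on the 0–5 educational-value scale.
df = pd.read_json("fineweb_scores/part_0.jsonl", lines=True)  # hypothetical shard name
high_edu = df[df["fineweb-edu-score-int"] >= 4]
high_edu.to_json("fineweb_high_edu.jsonl", orient="records", lines=True)
```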

### FineWeb Mixtral and Nemotron Edu Classifiers
@@ -163,55 +165,48 @@ Similar to the FineWeb Edu Classifier but trained with different annotation sources.
Both provide a quality label column marking scores above 2.5 as "high_quality":

```python
from nemo_curator.classifiers import FineWebMixtralEduClassifier # or FineWebNemotronEduClassifier

classifier = FineWebMixtralEduClassifier(
pred_column="score", # Raw float scores
int_column="score-int", # Rounded integer scores
quality_label_column="quality-label" # "high_quality" or "low_quality"
from ray_curator.stages.text.classifiers.fineweb_edu import (
FineWebMixtralEduClassifier,
FineWebNemotronEduClassifier,
)
result_dataset = classifier(dataset=input_dataset)

pipeline = Pipeline(name="fineweb_mixtral")
pipeline.add_stage(JsonlReader(file_paths="web_documents/*.jsonl"))
pipeline.add_stage(FineWebMixtralEduClassifier())
pipeline.add_stage(JsonlWriter(output_dir="fineweb_mixtral/"))
pipeline.run(XennaExecutor())
```

### Content Type Classifier

Categorizes documents into 11 distinct speech types.

```python
from nemo_curator.classifiers import ContentTypeClassifier
from ray_curator.stages.text.classifiers.content_type import ContentTypeClassifier

input_dataset = DocumentDataset.read_json("content/*.jsonl", backend="cudf")
classifier = ContentTypeClassifier(filter_by=["Blogs", "News"])
result_dataset = classifier(dataset=input_dataset)
pipeline = Pipeline(name="content_type")
pipeline.add_stage(JsonlReader(file_paths="content/*.jsonl"))
pipeline.add_stage(ContentTypeClassifier(filter_by=["Blogs", "News"]))
pipeline.add_stage(JsonlWriter(output_dir="blogs_or_news/"))
pipeline.run(XennaExecutor())
```

### Prompt Task and Complexity Classifier

Classifies prompts by task type and complexity dimensions.

```python
from nemo_curator.classifiers import PromptTaskComplexityClassifier
from ray_curator.stages.text.classifiers.prompt_task_complexity import (
PromptTaskComplexityClassifier,
)

input_dataset = DocumentDataset.read_json("prompts/*.jsonl", backend="cudf")
classifier = PromptTaskComplexityClassifier()
result_dataset = classifier(dataset=input_dataset)
pipeline = Pipeline(name="prompt_task_and_complexity")
pipeline.add_stage(JsonlReader(file_paths="prompts/*.jsonl"))
pipeline.add_stage(PromptTaskComplexityClassifier())
pipeline.add_stage(JsonlWriter(output_dir="prompt_task_complexity/"))
pipeline.run(XennaExecutor())
```

## CrossFit Integration

CrossFit is an open-source library by RAPIDS AI for fast offline inference scaled to multi-node multi-GPU environments. It accelerates NVIDIA NeMo Curator's classifiers with:

- PyTorch integration for model inference
- Efficient I/O and tokenization with cuDF
- Smart batching/chunking for optimized processing
- 1.4x-4x performance improvement over Dask + PyTorch baselines

### Sorted Sequence Data Loader

The key feature of CrossFit used in NVIDIA NeMo Curator is the sorted sequence data loader, which optimizes throughput by:

- Sorting input sequences by length
- Grouping similar-length sequences into batches
- Efficiently allocating batches to GPU memory based on estimated memory footprints
## Execution Backends

See the [rapidsai/crossfit](https://github.com/rapidsai/crossfit) repository for more information.
Pipelines can run on different executors. The `XennaExecutor` schedules stages with declared `Resources` over available nodes/GPUs. See `ray_curator.backends.xenna` for details.
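
As a sketch, the backend is selected when the pipeline runs, not when it is built; only APIs already shown above are used here:

```python
from ray_curator.backends.xenna.executor import XennaExecutor

# XennaExecutor places each stage according to the Resources it declares
# and spreads the work across the nodes and GPUs available to the cluster.
executor = XennaExecutor()
pipeline.run(executor)
```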
65 changes: 23 additions & 42 deletions docs/curate-text/process-data/quality-assessment/heuristic.md
@@ -54,55 +54,36 @@ The filtering process typically involves:

## Usage

::::{tab-set}
Build a Curator pipeline with a reader, one or more `ScoreFilter` stages, and a writer:

:::{tab-item} Python
```python
import nemo_curator as nc
from nemo_curator.datasets import DocumentDataset
from nemo_curator.filters import (
from ray_curator.pipeline.pipeline import Pipeline
from ray_curator.backends.xenna.executor import XennaExecutor
from ray_curator.stages.text.io.reader.jsonl import JsonlReader
from ray_curator.stages.text.filters.heuristic_filter import (
WordCountFilter,
RepeatingTopNGramsFilter,
PunctuationFilter
PunctuationFilter,
)
from ray_curator.stages.text.modules.score_filter import ScoreFilter
from ray_curator.stages.text.io.writer.jsonl import JsonlWriter

pipeline = Pipeline(name="heuristic_filtering")
pipeline.add_stage(
JsonlReader(file_paths="input_data/*.jsonl", files_per_partition=8)
).add_stage(
ScoreFilter(WordCountFilter(min_words=80), text_field="text", score_field="word_count")
).add_stage(
ScoreFilter(PunctuationFilter(max_num_sentences_without_endmark_ratio=0.85), text_field="text")
).add_stage(
ScoreFilter(RepeatingTopNGramsFilter(n=3, max_repeating_ngram_ratio=0.18), text_field="text")
).add_stage(
JsonlWriter(output_dir="high_quality_output/")
)

# Load your dataset
dataset = DocumentDataset.read_json("input_data/*.jsonl")

# Create a filter chain using Sequential
filter_step = nc.Sequential([
nc.ScoreFilter(
WordCountFilter(min_words=80),
text_field="text",
score_field="word_count",
),
nc.ScoreFilter(PunctuationFilter(max_num_sentences_without_endmark_ratio=0.85)),
nc.ScoreFilter(RepeatingTopNGramsFilter(n=2, max_repeating_ngram_ratio=0.2)),
nc.ScoreFilter(RepeatingTopNGramsFilter(n=3, max_repeating_ngram_ratio=0.18)),
nc.ScoreFilter(RepeatingTopNGramsFilter(n=4, max_repeating_ngram_ratio=0.16)),
])

# Apply the filters to get the high-quality subset
high_quality_data = filter_step(dataset)

# Save the results
high_quality_data.to_json("high_quality_output/", write_to_filename=True)
```
:::

:::{tab-item} Command Line
```bash
filter_documents \
--input-data-dir=/path/to/input/data \
--filter-config-file=./config/heuristic_filter_en.yaml \
--output-retained-document-dir=/path/to/output/high_quality \
--output-removed-document-dir=/path/to/output/low_quality \
--output-document-score-dir=/path/to/output/scores \
--log-dir=/path/to/logs/heuristic_filter
executor = XennaExecutor()
pipeline.run(executor)
```
:::

::::

## Available Filters
