
Conversation

@sarahyurick (Contributor) commented Sep 10, 2025

Re-adds https://github.com/NVIDIA-NeMo/Curator/tree/dask/tutorials/llama-nemotron-data-curation with a Ray backend instead of Dask.

Related issue: #706.

TODO:

- Verify the reasoning-on versus reasoning-off composition of the input data

  ```
  syurick@ipp1-3302:~$ jq -r '.reasoning' /raid/syurick/Llama-Nemotron-Post-Training-Dataset/SFT/chat/chat.jsonl | awk '{counts[$0]++} END {for (k in counts) print counts[k], k}' | sort -nr
  31218 off
  8574 on
  syurick@ipp1-3302:~$ jq -r '.reasoning' /raid/syurick/Llama-Nemotron-Post-Training-Dataset/SFT/math/math_v1.1.jsonl | awk '{counts[$0]++} END {for (k in counts) print counts[k], k}' | sort -nr
  2225427 on
  ```

- Verify correctness of the heuristic filters
- Verify correctness of the model filters
- Verify an end-to-end run
- If time allows: generate filtering statistics


copy-pr-bot bot commented Sep 10, 2025

Auto-sync is disabled for draft pull requests in this repository. Workflows must be run manually.


@sarahyurick (Contributor, Author) left a comment:

@Maghoumi left some open comments, but nothing major IMO. Please let me know what you think.

- Remove non-English samples
- Remove samples with total length (system prompt, input, and output responses) longer than 16k tokens (with chat template applied via the tokenizer)
- Remove samples with output responses longer than 8k tokens (with chat template applied via the tokenizer)
- Remove any columns specified by the `--remove-columns` parameter. We recommend removing the columns specified above (so that the remaining columns are the "input", "output", and "completion_token_count" columns)
@sarahyurick (Contributor, Author):

Is this reasonable? Does training.jsonl need to keep any additional columns?
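
For reference, a minimal sketch of how the two token-length checks above might look with a Hugging Face tokenizer. The model ID and column names are placeholders, the language filter and column removal are omitted, and this is not the tutorial's actual code:

```python
# Hedged sketch only: placeholder tokenizer and column names.
from transformers import AutoTokenizer

MAX_TOTAL_TOKENS = 16_384   # system prompt + input + output, with chat template applied
MAX_OUTPUT_TOKENS = 8_192   # output response alone

tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-3.1-8B-Instruct")  # placeholder model


def passes_length_filters(sample: dict) -> bool:
    """Return True if the sample survives both token-length checks."""
    messages = [
        {"role": "system", "content": sample.get("system_prompt", "")},
        {"role": "user", "content": sample["input"]},
        {"role": "assistant", "content": sample["output"]},
    ]
    total_tokens = tokenizer.apply_chat_template(messages, tokenize=True)
    # The tutorial text applies the chat template to the output as well; plain tokenization
    # is used here only to keep the sketch short.
    output_tokens = tokenizer(sample["output"])["input_ids"]
    return len(total_tokens) <= MAX_TOTAL_TOKENS and len(output_tokens) <= MAX_OUTPUT_TOKENS
```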

Comment on lines +252 to +256
```python
# For maximum accuracy it should be tokenize=True here,
# but for speedups we just use the length of the base string instead of tokenizing it
# (we consider this to be acceptable since 1 token is approx 4 characters for English
# and we are only using the CompletionTokenCountFilter as a proxy for text complexity)
tokenize=False,
```
@sarahyurick (Contributor, Author):

IIRC we aligned on this when implementing the Dask tutorial. Just wanted to double check that this is still okay.
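
For anyone revisiting this trade-off later, a tiny illustration of the two options (placeholder tokenizer and messages; the ~4-characters-per-token figure is the assumption stated in the comment above):

```python
# Illustrative only: compare exact token counting against the character-length proxy.
from transformers import AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("meta-llama/Llama-3.1-8B-Instruct")  # placeholder
messages = [
    {"role": "user", "content": "Solve this problem."},
    {"role": "assistant", "content": "Step-by-step reasoning... " * 500},
]

token_ids = tokenizer.apply_chat_template(messages, tokenize=True)   # exact, pays the tokenization cost
rendered = tokenizer.apply_chat_template(messages, tokenize=False)   # cheap string render
print(len(token_ids), len(rendered) / 4)  # proxy: ~4 English characters per token
```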

```python
def _setup(self, local_files_only: bool = True) -> None:
    self.tokenizer = AutoTokenizer.from_pretrained(
        self.tokenizer_identifier,
        use_fast=True,  # TODO: Test without this
```
@sarahyurick (Contributor, Author):

This is from the Dask implementation. I want to verify that it still helps.

@sarahyurick (Contributor, Author):

No time difference here, so I will remove `use_fast=True`.



```python
@ray.remote
def split_jsonl_by_size(input_file: str, target_size_mb: int, output_dir: str, output_prefix: str) -> None:
```
@sarahyurick (Contributor, Author):

We need this function to essentially enable partitioning of single JSONL files. It basically mimics `reshard_jsonl`, which was deprecated in the Dask-to-Ray migration: https://github.com/NVIDIA-NeMo/Curator/blob/dask/nemo_curator/scripts/make_data_shards.py

I think it is okay to keep this in just the tutorial for now, but we should add it back to the codebase eventually.
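
In case it helps the discussion, a rough sketch of what such a reshard helper could look like; the splitting behavior here (roughly `target_size_mb` per shard, never splitting a line) is my assumption, not the PR's actual implementation:

```python
import os

import ray


@ray.remote
def split_jsonl_by_size(input_file: str, target_size_mb: int, output_dir: str, output_prefix: str) -> None:
    """Split one JSONL file into shards of roughly target_size_mb each (assumed behavior)."""
    os.makedirs(output_dir, exist_ok=True)
    target_bytes = target_size_mb * 1024 * 1024
    shard_idx, written = 0, 0
    out = open(os.path.join(output_dir, f"{output_prefix}_{shard_idx:05d}.jsonl"), "w")
    with open(input_file) as f:
        for line in f:
            if written >= target_bytes:
                out.close()
                shard_idx, written = shard_idx + 1, 0
                out = open(os.path.join(output_dir, f"{output_prefix}_{shard_idx:05d}.jsonl"), "w")
            out.write(line)
            written += len(line.encode("utf-8"))
    out.close()


# Usage: one Ray task per input file.
# ray.get([split_jsonl_by_size.remote(path, 128, "shards/", "chat") for path in input_files])
```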

Comment on lines +60 to +71
```python
def interleave_datasets(dir1: str, dir2: str, out_path: str) -> None:
    gen1 = stream_jsonl_files(dir1)
    gen2 = stream_jsonl_files(dir2)

    with open(out_path, "w") as out:
        for line1, line2 in itertools.zip_longest(gen1, gen2):
            if line1 is not None:
                out.write(line1 + "\n")
            if line2 is not None:
                out.write(line2 + "\n")

    print(f"Interleaved datasets from directories {dir1} and {dir2} into file {out_path}")
```
@sarahyurick (Contributor, Author):

This function interleaves two datasets, row by row. In the Dask version, we had the option to interleave per row or per partition of data (with per-partition interleaving as the default). Should we maintain that logic in the Ray version too?
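
If we do want to keep that option, here is a sketch of the per-partition (file-level) variant for comparison; this is just an illustration of the idea, not ported Dask code:

```python
import itertools
from pathlib import Path


def interleave_datasets_by_partition(dir1: str, dir2: str, out_path: str) -> None:
    """Alternate whole JSONL files from the two directories instead of individual rows."""
    files1 = sorted(Path(dir1).glob("*.jsonl"))
    files2 = sorted(Path(dir2).glob("*.jsonl"))

    with open(out_path, "w") as out:
        for file1, file2 in itertools.zip_longest(files1, files2):
            for path in (file1, file2):
                if path is None:
                    continue
                with open(path) as f:
                    for line in f:
                        out.write(line if line.endswith("\n") else line + "\n")
```

Row-level interleaving mixes the two sources more evenly, while the per-partition version keeps each source's rows together in larger contiguous chunks.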

Signed-off-by: Sarah Yurick <[email protected]>