Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Checkpointer step #1114

Merged
merged 11 commits into from
Jan 28, 2025
Merged

Add Checkpointer step #1114

merged 11 commits into from
Jan 28, 2025

Conversation

plaguss
Copy link
Contributor

@plaguss plaguss commented Jan 28, 2025

Description

Adds a first version of Checkpointer step to write data to the hub while the pipeline executes, useful for longer pipelines.

from datasets import Dataset

from distilabel.pipeline import Pipeline
from distilabel.steps import Checkpointer

dataset = Dataset.from_dict({"a": [1, 2, 3, 4] * 50, "b": [5, 6, 7, 8] * 50})

from distilabel.steps.base import Step, StepInput
from typing import TYPE_CHECKING

if TYPE_CHECKING:
    from distilabel.typing import StepOutput

class DoNothing(Step):
    def process(self, *inputs: StepInput) -> "StepOutput":
        for input in inputs:
            yield input

with Pipeline(name="pipeline-with-checkpoints") as pipeline:
    text_generation = DoNothing(
        input_batch_size=60
    )
    checkpoint = Checkpointer(
        repo_id="plaguss/streaming_test_1",
        private=True,
        input_batch_size=50
    )
    text_generation >> checkpoint


if __name__ == "__main__":
    distiset = pipeline.run(
        dataset=dataset,
        use_cache=False
    )
    distiset.push_to_hub(repo_id="plaguss/streaming_test")

logs for the sample pipeline:

[01/28/25 13:37:43] INFO     ['distilabel.pipeline'] 📝 Pipeline data will be written to                                                                                                           base.py:1033
                             '/Users/agus/.cache/distilabel/pipelines/simple-text-generation-pipeline/a2ac64d816f67e4080cfe18d32edb52dc7310f04/executions/dc6b0ffa74e408501f561b8d5c5df2f4d983a5b1             
                             /data/steps_outputs'                                                                                                                                                              
                    INFO     ['distilabel.pipeline'] ⌛ The steps of the pipeline will be loaded in stages:                                                                                        base.py:1065
                              * Legend: 🚰 GeneratorStep 🌐 GlobalStep 🔄 Step                                                                                                                                 
                              * Stage 0:                                                                                                                                                                       
                                - 🚰 'load_data_from_hub_0'                                                                                                                                                    
                                - 🔄 'do_nothing_0'                                                                                                                                                            
                                - 🔄 'checkpointer_0'                                                                                                                                                          
[01/28/25 13:37:44] INFO     ['distilabel.pipeline'] ⏳ Waiting for all the steps of stage 0 to load...                                                                                            base.py:1401
[01/28/25 13:37:45] INFO     ['distilabel.step.checkpointer_0'] Creating repo plaguss/streaming_test_1                                                                                       checkpointer.py:38
[01/28/25 13:37:46] INFO     ['distilabel.pipeline'] ⏳ Steps from stage 0 loaded: 3/3                                                                                                             base.py:1437
                              * 'do_nothing_0' replicas: 1/1                                                                                                                                                   
                              * 'checkpointer_0' replicas: 1/1                                                                                                                                                 
                              * 'load_data_from_hub_0' replicas: 1/1                                                                                                                                           
                    INFO     ['distilabel.pipeline'] ✅ All the steps from stage 0 have been loaded!                                                                                               base.py:1441
                    INFO     ['distilabel.step.load_data_from_hub_0'] 🚰 Starting yielding batches from generator step 'load_data_from_hub_0'. Offset: 0                                    step_wrapper.py:179
                    INFO     ['distilabel.step.load_data_from_hub_0'] 📨 Step 'load_data_from_hub_0' sending batch 0 to output queue                                                        step_wrapper.py:289
                    INFO     ['distilabel.step.load_data_from_hub_0'] 📨 Step 'load_data_from_hub_0' sending batch 1 to output queue                                                        step_wrapper.py:289
                    INFO     ['distilabel.step.do_nothing_0'] 📦 Processing batch 0 in 'do_nothing_0' (replica ID: 0)                                                                       step_wrapper.py:229
                    INFO     ['distilabel.step.do_nothing_0'] 📨 Step 'do_nothing_0' sending batch 0 to output queue                                                                        step_wrapper.py:289
                    INFO     ['distilabel.step.checkpointer_0'] 📦 Processing batch 0 in 'checkpointer_0' (replica ID: 0)                                                                   step_wrapper.py:229
                    INFO     ['distilabel.step.load_data_from_hub_0'] 📨 Step 'load_data_from_hub_0' sending batch 2 to output queue                                                        step_wrapper.py:289
                    INFO     ['distilabel.step.do_nothing_0'] 📦 Processing batch 1 in 'do_nothing_0' (replica ID: 0)                                                                       step_wrapper.py:229
                    INFO     ['distilabel.step.do_nothing_0'] 📨 Step 'do_nothing_0' sending batch 1 to output queue                                                                        step_wrapper.py:289
                    INFO     ['distilabel.step.load_data_from_hub_0'] 📨 Step 'load_data_from_hub_0' sending batch 3 to output queue                                                        step_wrapper.py:289
                    INFO     ['distilabel.step.load_data_from_hub_0'] 🏁 Finished running step 'load_data_from_hub_0' (replica ID: 0)                                                       step_wrapper.py:129
                    INFO     ['distilabel.step.do_nothing_0'] 📦 Processing batch 2 in 'do_nothing_0' (replica ID: 0)                                                                       step_wrapper.py:229
                    INFO     ['distilabel.step.do_nothing_0'] 📨 Step 'do_nothing_0' sending batch 2 to output queue                                                                        step_wrapper.py:289
                    INFO     ['distilabel.step.do_nothing_0'] 📦 Processing batch 3 in 'do_nothing_0' (replica ID: 0)                                                                       step_wrapper.py:229
                    INFO     ['distilabel.step.do_nothing_0'] 📨 Step 'do_nothing_0' sending batch 3 to output queue                                                                        step_wrapper.py:289
                    INFO     ['distilabel.step.do_nothing_0'] 🏁 Finished running step 'do_nothing_0' (replica ID: 0)                                                                       step_wrapper.py:129
[01/28/25 13:37:47] INFO     ['distilabel.step.checkpointer_0'] Uploaded checkpoint 0-0                                                                                                      checkpointer.py:56
                    INFO     ['distilabel.step.checkpointer_0'] 📨 Step 'checkpointer_0' sending batch 0 to output queue                                                                    step_wrapper.py:289
                    INFO     ['distilabel.step.checkpointer_0'] 📦 Processing batch 1 in 'checkpointer_0' (replica ID: 0)                                                                   step_wrapper.py:229
                    INFO     ['distilabel.step.checkpointer_0'] Uploaded checkpoint 0-1                                                                                                      checkpointer.py:56
                    INFO     ['distilabel.step.checkpointer_0'] 📨 Step 'checkpointer_0' sending batch 1 to output queue                                                                    step_wrapper.py:289
                    INFO     ['distilabel.step.checkpointer_0'] 📦 Processing batch 2 in 'checkpointer_0' (replica ID: 0)                                                                   step_wrapper.py:229
[01/28/25 13:37:48] INFO     ['distilabel.step.checkpointer_0'] Uploaded checkpoint 0-2                                                                                                      checkpointer.py:56
                    INFO     ['distilabel.step.checkpointer_0'] 📨 Step 'checkpointer_0' sending batch 2 to output queue                                                                    step_wrapper.py:289
                    INFO     ['distilabel.step.checkpointer_0'] 📦 Processing batch 3 in 'checkpointer_0' (replica ID: 0)                                                                   step_wrapper.py:229
                    INFO     ['distilabel.step.checkpointer_0'] Uploaded checkpoint 0-3                                                                                                      checkpointer.py:56
                    INFO     ['distilabel.step.checkpointer_0'] 📨 Step 'checkpointer_0' sending batch 3 to output queue                                                                    step_wrapper.py:289
                    INFO     ['distilabel.step.checkpointer_0'] 🏁 Finished running step 'checkpointer_0' (replica ID: 0)                                                                   step_wrapper.py:129
Generating train split: 200 examples [00:00, 107353.57 examples/s]
Creating parquet from Arrow format: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00, 1449.31ba/s]
Uploading the dataset shards: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00,  1.31it/s]

@plaguss plaguss requested a review from gabrielmbmb January 28, 2025 11:03
Copy link

Documentation for this PR has been built. You can view it at: https://distilabel.argilla.io/pr-1114/

Copy link

codspeed-hq bot commented Jan 28, 2025

CodSpeed Performance Report

Merging #1114 will degrade performances by 35.85%

Comparing stream-push-to-hub (61f3387) with develop (84ea198)

Summary

❌ 1 regressions

⚠️ Please fix the performance issues or acknowledge them on CodSpeed.

Benchmarks breakdown

Benchmark BASE HEAD Change
test_cache_time 557.5 ms 869.2 ms -35.85%

Copy link
Member

@gabrielmbmb gabrielmbmb left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM! just some comments

from huggingface_hub import HfApi


class Checkpointer(Step):
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe it's better if we call it HuggingFaceHubCheckpointer?

Comment on lines +107 to +111
if not self._api.repo_exists(repo_id=self.repo_id, repo_type="dataset"):
self._logger.info(f"Creating repo {self.repo_id}")
self._api.create_repo(
repo_id=self.repo_id, repo_type="dataset", private=self.private
)
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's move this logic to a method that can be unit tested

for i, input in enumerate(inputs):
# Each section of *inputs corresponds to a different configuration of the pipeline
with tempfile.NamedTemporaryFile(
mode="w", suffix=".jsonl", delete=False
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why delete=False instead of letting the context manager handle that?

@plaguss plaguss marked this pull request as ready for review January 28, 2025 19:05
@plaguss plaguss merged commit f5ddbc6 into develop Jan 28, 2025
6 of 7 checks passed
@plaguss plaguss deleted the stream-push-to-hub branch January 28, 2025 19:05
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants