[REQUEST] Extractor Checkpoints #301

Open
zprobst opened this issue Apr 29, 2024 · 0 comments · Fixed by #384 · 4 remaining pull requests
Assignees
Labels
enhancement New feature or request

Comments

zprobst (Member) commented Apr 29, 2024

Is your feature request related to a problem? Please describe.

A common pattern in pipelines is to wake up, extract data from a data lake or object store, and then turn it into a graph. However, this gets tricky when the pipeline throws an unhandled exception, is taken off its schedule, or otherwise terminates before finishing. Assuming the user restarts the pipeline in some way, it will start over from the beginning.

A quick Google search for "data pipeline checkpoints" reveals that this is a common pattern and solution. While nodestream doesn't endeavor to have every bell and whistle, this one seems like an obvious win for long-running pipelines.

Describe the solution you'd like

The proposed solution has several components:

  1. Allow users to specify the interval at which checkpoints occur.
  2. Provide an interface that allows extractors to produce a checkpoint: `async def checkpoint(self)`. The extractor can return anything that is picklable.
  3. Provide an interface that allows extractors to resume from a checkpoint: `async def resume_checkpoint(self, checkpoint)`. If this method throws an exception, the error will be logged and `extract_records` will be called as normal.
  4. Introduce a pluggable `ObjectStore` API that can be used for more than just checkpoints. [REQUEST] Record Schema Inference and Enforcement #37 would also benefit from this. Possible implementations are null, tempfile, and S3.
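To make the proposal concrete, here is a minimal sketch of how the pieces could fit together. The class names (`ObjectStore`, `InMemoryObjectStore`, `CheckpointingExtractor`) and the `save` hook are hypothetical illustrations, not nodestream's actual API; only the `checkpoint`/`resume_checkpoint` method signatures come from the proposal above.

```python
import pickle
from abc import ABC, abstractmethod


class ObjectStore(ABC):
    """Hypothetical pluggable store for persisting checkpoints (and other blobs)."""

    @abstractmethod
    def put(self, key: str, value: bytes) -> None: ...

    @abstractmethod
    def get(self, key: str):  # returns bytes or None
        ...


class InMemoryObjectStore(ObjectStore):
    """A null-style store for testing; real implementations might use tempfile or S3."""

    def __init__(self):
        self._data = {}

    def put(self, key, value):
        self._data[key] = value

    def get(self, key):
        return self._data.get(key)


class CheckpointingExtractor:
    """Sketch of an extractor that checkpoints its cursor position."""

    def __init__(self, records, store: ObjectStore, key: str = "checkpoint"):
        self.records = records
        self.store = store
        self.key = key
        self.position = 0

    async def checkpoint(self):
        # Return anything picklable; here, the index of the next record to emit.
        return self.position

    async def resume_checkpoint(self, checkpoint):
        # Restore the cursor; if this raised, the pipeline would log the error
        # and fall back to calling extract_records from the beginning.
        self.position = checkpoint

    async def save(self):
        # The pipeline runner would call this on the user-configured interval.
        self.store.put(self.key, pickle.dumps(await self.checkpoint()))

    async def extract_records(self):
        for i in range(self.position, len(self.records)):
            self.position = i + 1
            yield self.records[i]
```

A restart would then look like: load the pickled checkpoint from the store, call `resume_checkpoint` with it, and iterate `extract_records` as usual, picking up where the previous run left off.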

Describe alternatives you've considered
The only alternative I can think of is for each step to implement this behavior on its own. However, that duplicates the same persistence logic across every extractor and leaves it to each author to get right.
