idempotency, double ask and only once. #673

Open
gedw99 opened this issue Aug 29, 2024 · 9 comments

@gedw99

gedw99 commented Aug 29, 2024

I really like Dagu and the other projects.

I think I can't use Dagu the way it is now, because it won't protect against race conditions and double work.

Could someone please elaborate on the idempotency of the current architecture?

For any workflow you essentially have a message-queue pattern: you want the system to run a workflow, and if it fails, rerun it and execute only what did not run before.

For example, a message queue gives you "only once" or "at least once" delivery.

If a Dagu operation happens, we don't really have a way to control its side effects, like a filesystem change or an email being sent, so we can't control that.

If, however, there were the notion of a bus or message queue, then we could.

NATS JetStream can run in-process now, by the way.
https://github.com/nats-io/nats.go/blob/b61c7c554f188ff40e15fb3c4668bad959ccde86/nats.go#L870

And the way they test it is to use NATS embedded with InProcess: https://github.com/nats-io/nats.go/blob/b61c7c554f188ff40e15fb3c4668bad959ccde86/test/nats_test.go#L1125

Many examples online:

https://github.com/search?q=nats.InProcessServer+language%3AGo&type=code&ref=advsearch

Often used for job runners!

https://github.com/synadia-io/rethink_connectivity/blob/8965aa656bbf913b7ac48f89cbc5c29f92a496ef/20-embedding-nats-server/main.go#L63
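
Putting those examples together, here is a minimal Go sketch (not Dagu code) of embedding a NATS server and connecting to it in-process. It assumes the github.com/nats-io/nats-server/v2 and github.com/nats-io/nats.go modules:

package main

import (
	"log"
	"time"

	"github.com/nats-io/nats-server/v2/server"
	"github.com/nats-io/nats.go"
)

func main() {
	// Run a NATS server inside the current process; DontListen avoids opening a TCP port.
	ns, err := server.NewServer(&server.Options{DontListen: true})
	if err != nil {
		log.Fatal(err)
	}
	go ns.Start()
	if !ns.ReadyForConnections(5 * time.Second) {
		log.Fatal("embedded NATS server did not become ready")
	}

	// Connect over the in-process transport instead of a TCP socket.
	nc, err := nats.Connect("", nats.InProcessServer(ns))
	if err != nil {
		log.Fatal(err)
	}
	defer nc.Close()

	log.Println("connected in-process:", nc.IsConnected())
}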

It already does out-of-process, cluster, and super-cluster.
I bring this up because, if the DAG pushed every "command" through NATS, then NATS could take care of ensuring "only once" for us: if something was already done, it won't be done again.

So you could run DAGs and know that if one fails, it will restart and only run what actually did not run last time.

NATS has the double-ACK approach for this. Here are some devs explaining it better than me :)

https://www.reddit.com/r/NATS_io/comments/1b9tjmm/how_does_nats_jetstream_ensure_exactly_once/
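
To make the "only once" idea concrete, here is a rough sketch of JetStream's publish-side deduplication (Nats-Msg-Id) plus the double ACK on the consumer side. It reuses nc, log, and time from the embedded-server sketch above and assumes the server was created with JetStream enabled; the stream, subject, and durable names are made up for illustration.

// JetStream context over the existing connection.
js, err := nc.JetStream()
if err != nil {
	log.Fatal(err)
}

// A stream with a deduplication window: publishes carrying the same
// Nats-Msg-Id within the window are dropped by the server.
_, err = js.AddStream(&nats.StreamConfig{
	Name:       "DAGU_STEPS",           // hypothetical stream name
	Subjects:   []string{"dagu.steps"}, // hypothetical subject
	Duplicates: 2 * time.Minute,
})
if err != nil {
	log.Fatal(err)
}

// Publish a step "command" with a deterministic ID (e.g. run ID + step name),
// so re-publishing after a crash does not enqueue the work twice.
_, err = js.Publish("dagu.steps", []byte("run step 3"),
	nats.MsgId("run-42-step-3"))
if err != nil {
	log.Fatal(err)
}

// Consumer side: fetch, do the work, then AckSync waits for the server to
// confirm the ack (the "double ACK"), so the step is not redelivered.
sub, err := js.PullSubscribe("dagu.steps", "dagu-workers")
if err != nil {
	log.Fatal(err)
}
msgs, err := sub.Fetch(1, nats.MaxWait(2*time.Second))
if err != nil {
	log.Fatal(err)
}
for _, m := range msgs {
	// ... execute the step here ...
	if err := m.AckSync(); err != nil {
		log.Fatal(err)
	}
}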

@gedw99
Author

gedw99 commented Aug 29, 2024

It's very easy to do by just recording the args into NATS KV on each run.

Then, when it runs again, check in NATS KV whether it already ran, and skip it if it did.

There are probably some aspects I am missing.

The UUID representing a workflow can be the file name and path. Simple but effective.
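
For what it's worth, a rough Go sketch of that idea, assuming a JetStream context (js) like the one above plus the errors package; the bucket name and key scheme are made up:

// runOnce executes fn only if key has not been recorded yet, then records it.
// A sketch of the idea, not Dagu's actual API.
func runOnce(js nats.JetStreamContext, key string, fn func() error) error {
	// Bucket holding one entry per completed workflow step; name is hypothetical.
	kv, err := js.CreateKeyValue(&nats.KeyValueConfig{Bucket: "dagu-runs"})
	if err != nil {
		return err
	}
	if _, err := kv.Get(key); err == nil {
		return nil // already ran last time, skip it
	} else if !errors.Is(err, nats.ErrKeyNotFound) {
		return err
	}
	if err := fn(); err != nil {
		return err
	}
	// Record the completed step, e.g. key = "<workflow UUID>.<step name>".
	_, err = kv.Put(key, []byte("done"))
	return err
}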

@yohamta
Collaborator

yohamta commented Aug 29, 2024

Hi, thank you so much for the detailed information. As for the message queue, Dagu currently doesn't include this functionality; it simply executes the DAG immediately if it's not already running. NATS is definitely an interesting option, but for now, I prefer to keep things as simple as possible. My plan is to have Dagu focus on being the execution layer, while queuing and task management would be handled by the controller layer, which would be managed by other software.

@gedw99
Author

gedw99 commented Sep 2, 2024

Hey @yohamta

Your plan is exactly how I would also do it.

That's pretty much how I was planning to do it too.

I would be happy to help.

@ghansham

ghansham commented Sep 17, 2024

We are building a queueing system within Dagu which allows the user to manage jobs in Dagu. We have a configurable parameter called waiting queue length, which controls how many DAGs run concurrently. Retry jobs don't come under it; only jobs that are started via the POST API or the UI are considered. We will try to create a pull request for it soon.

Actually, we were following the approach mentioned above, but that does not stop the user from running as many jobs as they want from the UI.
One more issue with the above approach is getting the list of running DAGs from the server side. The api/dags/v1 endpoint is very slow as far as getting the number of running DAGs is concerned, so we have created a couple of data structures that keep the lists of running and waiting DAG IDs in JSON format (see the sketch below).
Our developer @kriyanshii is working on it. We are using 1.14.3 currently. We are thankful to her.
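
For illustration only, that state might have a shape like the following (a hypothetical sketch, not the actual implementation; it assumes the encoding/json and os packages):

// Hypothetical queue-state file: lists of running and waiting DAG IDs kept in
// JSON so the scheduler can count running DAGs without hitting the slow
// DAG-listing endpoint.
type QueueState struct {
	Running []string `json:"running"` // DAG IDs currently executing
	Waiting []string `json:"waiting"` // DAG IDs queued until a slot frees up
}

func saveQueueState(path string, s QueueState) error {
	data, err := json.MarshalIndent(s, "", "  ")
	if err != nil {
		return err
	}
	return os.WriteFile(path, data, 0o644)
}

func loadQueueState(path string) (QueueState, error) {
	var s QueueState
	data, err := os.ReadFile(path)
	if err != nil {
		return s, err
	}
	return s, json.Unmarshal(data, &s)
}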

@kriyanshii
Contributor

Hey, you can check the stable version with queueing here: https://github.com/kriyanshii/dagu/tree/queue.

@gedw99
Author

gedw99 commented Oct 7, 2024

Hey @kriyanshii

I just pulled and ran the code, and it worked well for me.

Launches both the web UI server and scheduler process
# http://127.0.0.1:8080
rio-dagu start-all
time=2024-10-07T23:50:17.624+11:00 level=INFO msg="Server initialization" host=127.0.0.1 port=8080
time=2024-10-07T23:50:17.624+11:00 level=INFO msg="Scheduler initialization" dags=/Users/apple/.config/dagu/dags
time=2024-10-07T23:50:17.624+11:00 level=INFO msg="Scheduler initialized" specs=""
2024/10/07 23:50:17 queue being watched
time=2024-10-07T23:50:17.635+11:00 level=INFO msg="Serving dagu at http://127.0.0.1:8080"
time=2024-10-07T23:50:23.519+11:00 level=INFO msg="Request: GET /api/v1/dags"
time=2024-10-07T23:50:25.991+11:00 level=INFO msg="Request: GET /api/v1/search"
time=2024-10-07T23:50:26.445+11:00 level=INFO msg="Request: GET /api/v1/dags"
time=2024-10-07T23:50:27.786+11:00 level=INFO msg="Request: GET /api/v1/dags"
time=2024-10-07T23:50:30.487+11:00 level=INFO msg="Request: GET /api/v1/search"
time=2024-10-07T23:50:32.928+11:00 level=INFO msg="Request: GET /api/v1/search"
time=2024-10-07T23:50:35.598+11:00 level=INFO msg="Request: GET /api/v1/search"
time=2024-10-07T23:50:36.212+11:00 level=INFO msg="Request: GET /api/v1/search"
time=2024-10-07T23:52:03.851+11:00 level=INFO msg="Request: GET /api/v1/search"
time=2024-10-07T23:52:08.124+11:00 level=INFO msg="Request: GET /api/v1/dags"
time=2024-10-07T23:52:09.672+11:00 level=INFO msg="Request: GET /api/v1/search"
time=2024-10-07T23:52:10.743+11:00 level=INFO msg="Request: GET /api/v1/dags"
^Ctime=2024-10-07T23:53:32.981+11:00 level=INFO msg="Scheduler stopped"
time=2024-10-07T23:53:32.981+11:00 level=INFO msg="Shutting down... "
time=2024-10-07T23:53:32.981+11:00 level=INFO msg="Stopped serving dagu at http://127.0.0.1:8080"
time=2024-10-07T23:53:32.981+11:00 level=INFO msg="Server stopped"

I did not really dig into the queue aspects yet.

Is there any API surface for it, or is it inherent?

@kriyanshii
Contributor

It's inherent; will introduce an API for purging, though.
So the idea is: when you start a DAG, it will check the number of DAGs currently running, and if that is equal to queueLength, it will queue it (see the sketch below).
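
A tiny sketch of that check, reusing the hypothetical QueueState from earlier in the thread (illustrative names only, not the code in the linked branch):

// submit starts the DAG immediately if a slot is free, otherwise queues it.
func submit(state *QueueState, dagID string, queueLength int, start func(string) error) error {
	if len(state.Running) < queueLength {
		state.Running = append(state.Running, dagID)
		return start(dagID)
	}
	state.Waiting = append(state.Waiting, dagID)
	return nil
}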

@gedw99
Author

gedw99 commented Oct 9, 2024

Ah, good to know.
I was wondering about its features/logic.

Thanks 🙏

@ghansham

This feature may be more useful for non-cron DAGs where the execution time is relatively long and you have data received from sensors at regular intervals for processing. Basically, batch processing jobs.
