FEATURE: Allow jobManager to be interrupted by system SIGINT #48

Open
wants to merge 1 commit into
base: main

Conversation

fcool
Contributor

@fcool fcool commented Sep 27, 2020

This is a first draft to check whether this is something you would accept at all. (It is fully functional, though. It still needs to be documented, which I'd love to do if you accept.)

This change allows the JobWorker to be interrupted on the command line by pressing CTRL+C or sending a SIGINT signal, but only at a safe state: it will only be interrupted once the last job has completed successfully or a timeout has occurred.

Additionally, the JobManager provides a new method that queues can use to signal that they are now safe to interrupt (ideally before they go to sleep in their own "waiting for timeout" handling).

@fcool
Contributor Author

fcool commented Sep 27, 2020

As I had no idea how this kind of behavior can be tested, I wrote a very simple application to verify my assumptions. If you would like to see it, or have an idea how this could be tested (the JobCommand is currently not tested at all), I am thankful for any suggestions.

@fcool
Contributor Author

fcool commented Oct 7, 2020

Any feedback is highly welcome. Otherwise I would need to maintain my own fork, which I would love to avoid.

Contributor

@bwaidelich bwaidelich left a comment

Hey @fcool,

thanks a lot for this contribution and sorry for the late reaction.
I think this totally makes sense but I need to get more into PCNTL to fully understand it. Left some questions and comments for now

@@ -84,6 +90,12 @@ public function workCommand($queue, $exitAfter = null, $limit = null, $verbose =
}
try {
$message = $this->jobManager->waitAndExecute($queue, $timeout);
$this->jobManager->interruptMe();
Contributor

Why is this method called here? This will be invoked for every successfully processed message in the queue. Does that make sense?

Contributor Author

Yeah. Calling this method allows the jobManager to be interrupted at this (and only at this) moment. And this is definitely a safe moment, because the last message has been "completed" (maybe by timeout, or by successful execution). Usually it will probably simply do nothing.

Contributor

OK, I think my confusion came from the name since this doesn't always actually interrupt obviously.

Classes/Job/JobManager.php
*
* @throws InterruptException
*/
public function interruptMe(): void
Contributor

nitpick: I find the name interruptMe a little weird. Why not just JobManager::interrupt()?

Contributor Author

Naming things is always the hardest task in programming. The idea was to find a name that makes clear to the calling classes that this is more or less a signal to the job manager that this would be a safe point for an interruption. Your suggestion is totally fine for me, if it makes more sense to you.

Contributor

I was confused by the imperative verb. But since this doesn't actually interrupt anything, I would suggest calling it something like processPcntlSignals() instead. But this leaves me wondering: Why do we need this method in the JobManager at all? Can't we just move this line to the CommandController?

Contributor Author

It is there to make it easy for queues to signal their "safe state" to the job manager. They should not need to be aware of the CommandController. And if no signal handler was registered, it just does nothing, so it does no harm to call.

If you think the signal handler could be managed by the JobManager, I'm with you. I had reasons not to put it there, but I currently do not remember them.
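
The safe-point idea discussed in this thread can be sketched roughly as follows. This is not the PR's actual code: the body of the safe-point method is not shown above, so the sketch assumes it does nothing more than call `pcntl_signal_dispatch()`. The names `runWorker` and `interruptAtSafePoint` and the local `InterruptException` class are hypothetical stand-ins for illustration.

```php
<?php
declare(strict_types=1);

// Hypothetical stand-in for the InterruptException introduced by the PR.
final class InterruptException extends \RuntimeException
{
}

/**
 * Assumed body of the safe-point method: run the handlers of any signals
 * that arrived since the last call. Without a registered handler, or
 * without PCNTL, this is a no-op.
 */
function interruptAtSafePoint(): void
{
    if (function_exists('pcntl_signal_dispatch')) {
        pcntl_signal_dispatch();
    }
}

/**
 * Runs up to $limit jobs and returns how many were completed before
 * the worker stopped.
 */
function runWorker(callable $job, int $limit): int
{
    $hasPcntl = function_exists('pcntl_signal');
    if ($hasPcntl) {
        pcntl_signal(SIGINT, static function (): void {
            throw new InterruptException('Interrupted by SIGINT', 1602072222);
        });
    }
    $completed = 0;
    try {
        for ($i = 0; $i < $limit; $i++) {
            $job($i);               // a SIGINT arriving here is only queued
            $completed++;
            interruptAtSafePoint(); // the handler runs (and throws) here
        }
    } catch (InterruptException $exception) {
        // Stop cleanly: the job that was running has already finished.
    } finally {
        if ($hasPcntl) {
            pcntl_signal(SIGINT, SIG_DFL); // restore the default handler
        }
    }
    return $completed;
}
```

In this sketch a SIGINT sent while `$job` is running lets that job finish; the exception only surfaces at the next `interruptAtSafePoint()` call, which is why the call site after each processed message counts as a safe moment.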

@@ -74,6 +75,11 @@ public function workCommand($queue, $exitAfter = null, $limit = null, $verbose =
}
$this->outputLine('...');
}
if (function_exists('pcntl_signal')) {
pcntl_signal(SIGINT, static function () {
throw new InterruptException('Interrupted by SIGINT');
Contributor

Suggested change
- throw new InterruptException('Interrupted by SIGINT');
+ throw new InterruptException('Interrupted by SIGINT', 1602072222);

I forgot in my first round: this is missing a unique exception number. We usually add the Unix timestamp to the throw calls in order to be able to trace them back to the calling side more easily.

Contributor

@bwaidelich bwaidelich left a comment

Thanks for clarifying.

A ticket with a minimal example would be awesome in order to grasp & communicate this feature better. Would you be up for creating one?

@fcool
Contributor Author

fcool commented Oct 7, 2020

What do you mean by "minimal example"? My struggle here is: on the command line it is quite easily testable. Just press CTRL+C after starting a worker command. In the current state it will interrupt everything immediately. But that's not so nice if the job has already been running for several hours and is near completion. So my example project has a simple sleep job.

As said, I have no idea how to test this with a single-threaded PHPUnit test, as this would run unnecessarily long, call a command worker in an unknown environment, lose the test scope, etc. But maybe at this point it's just me being inexperienced.

@fcool
Contributor Author

fcool commented Oct 7, 2020

But of course I can open an issue, if this is the missing part. ;)

@bwaidelich
Contributor

> What do you mean with "minimal example"?

> On the command line it is quite easily testable

Right. But for that scenario we wouldn't need to adjust the JobManager, would we?

> So my example project has a simple sleep job.

That's exactly the kind of example I was thinking of :)
Is that project public somewhere and/or can we provide an example of such a job/queue that makes use of the signals? I couldn't wrap my head around that part yet

@fcool
Contributor Author

fcool commented Oct 7, 2020

No problem. I built a separate test project specifically for this part, so it's no hassle to share it. To get the full picture, a patch in Flowpack/jobqueue-doctrine makes sense (only the one line calling interruptMe).

Do you want it with or without?

I would add my test scenarios to the readme.md so you can repeat what I played with.

@bwaidelich
Contributor

I just would like to understand this feature better. So a little code snippet would be enough:

  • What line is needed in jobqueue-doctrine and would we have to add it in other implementations, too?
  • How to use this from a custom job/queue?

@fcool
Contributor Author

fcool commented Oct 7, 2020

Understood, and I completely feel the urge. That asynchronous stuff is hard to wrap one's head around, especially with single-threaded PHP processes. I had to learn about pcntl and the way it handles interruptions, too. The usual advice is to set declare(ticks=1), which I absolutely disliked. I used the symfony/messenger component as a source of inspiration. ;)

I will publish the test project and, with it, some lines documenting how the proposed changes change the game. I cannot promise to do it today, but tomorrow at the latest.
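
The alternative to `declare(ticks=1)` mentioned above relies on PCNTL queuing a signal until the script explicitly asks for pending handlers to run. A minimal sketch of that behavior, assuming the `pcntl` and `posix` extensions are loaded and asynchronous signal handling (`pcntl_async_signals`) is left at its default of off; the function name `observeDeferredSignal` is hypothetical:

```php
<?php
declare(strict_types=1);

/**
 * Sends SIGINT to the current process and reports whether the handler had
 * run before and after pcntl_signal_dispatch(). Returns null when the
 * pcntl or posix extension is unavailable.
 *
 * @return array{bool, bool}|null
 */
function observeDeferredSignal(): ?array
{
    if (!function_exists('pcntl_signal') || !function_exists('posix_kill')) {
        return null;
    }
    $handled = false;
    pcntl_signal(SIGINT, static function () use (&$handled): void {
        $handled = true;
    });

    posix_kill(posix_getpid(), SIGINT); // queued by PCNTL, handler not yet run
    $beforeDispatch = $handled;

    pcntl_signal_dispatch();            // pending handlers run now
    $afterDispatch = $handled;

    pcntl_signal(SIGINT, SIG_DFL);      // restore the default handler
    return [$beforeDispatch, $afterDispatch];
}
```

Because the handler only runs inside `pcntl_signal_dispatch()`, the code decides where an interruption may take effect, instead of having it checked after every tick.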

@fcool
Contributor Author

fcool commented Oct 19, 2020

Wow... sorry. Got unexpectedly busy. Will deliver in the next few days, if not today (putting some pressure on myself here ;) )

@fcool
Contributor Author

fcool commented Oct 25, 2020

Here is the example project:
https://github.com/fcool/JobQueueSigIntDemonstration

@bwaidelich
Contributor

@fcool thanks a lot for putting so much effort into an example project. But do I get it right that it actually doesn't do anything special other than providing a patch for the doctrine implementation?
So, to respond to my question from above:
Queues that use sleep to wait for new jobs can now invoke

JobManager::interruptMe();

in order to interrupt processing if requested by the user.
Correct?

@fcool
Contributor Author

fcool commented Oct 25, 2020

Now you lost me completely.
No, the example project is only there to demonstrate the behavior. The doctrine patch is only "sugar" and not needed to observe the game-changing behavior.
And that is that you are able to send SIGINT (by pressing CTRL+C) to your worker WITHOUT interrupting the running task. So if you go back to master of jobqueue and do the same (start the dummy task and press CTRL+C), you will have a "reserved" job in the database, staying there forever.
In the "changed" version (the one from this PR), used in the example project, the current task will be worked on until it is completed and THEN the worker stops.

By patching doctrine you allow the doctrine queue to be interrupted during its "waiting" phases. As the default wait is 60 seconds, it would otherwise only be interruptible once every 60 seconds, as long as there are no short tasks.
But that is merely "beneficial"; the game changer is the jobqueue itself.

Of course the PCNTL extension has to be installed.
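
How a queue could offer such a safe point during its "waiting" phase can be sketched as below. This is an illustration only, not the code of Flowpack/jobqueue-doctrine: `waitForMessage`, its parameters, and the polling strategy are assumptions, and `$safePoint` stands in for the JobManager method proposed in this PR.

```php
<?php
declare(strict_types=1);

/**
 * Hypothetical helper: polls for a message until the timeout expires and
 * offers a safe interruption point before every sleep.
 *
 * @param callable $poll      returns a message string, or null if none is ready
 * @param callable $safePoint called whenever the queue is idle
 */
function waitForMessage(
    callable $poll,
    callable $safePoint,
    float $timeoutSeconds,
    int $pollIntervalMicroseconds = 1000000
): ?string {
    $deadline = microtime(true) + $timeoutSeconds;
    do {
        $message = $poll();
        if ($message !== null) {
            return $message;
        }
        // Nothing is reserved right now, so stopping here loses no work.
        $safePoint();
        usleep($pollIntervalMicroseconds);
    } while (microtime(true) < $deadline);

    return null;
}
```

With a one-second poll interval, a pending SIGINT would take effect within about a second while the queue is idle, instead of only after the full timeout.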

@kdambekalns
Member

Just saw this, and even though it is old by now… it still seems like a great addition. @fcool, would you bring this up to date as needed?

3 participants