Skip to content

Commit 7a5ba63

Browse files
committed
* Removed "simple:" from command names
* Changed behaviour of dotenv option to simplify * Added option to stop working when queue is empty
1 parent 8aae9ce commit 7a5ba63

File tree

3 files changed

+33
-18
lines changed

3 files changed

+33
-18
lines changed

src/Command/Symfony/Restart.php

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ public function __construct(CacheInterface $cache)
3737
protected function configure()
3838
{
3939
$this
40-
->setName('simple:task-worker:restart')
40+
->setName('task-worker:restart')
4141
->setDescription('Inform all task worker to stop after current task is complete')
4242
->setHelp('This command allows you to safely stop all task workers');
4343
}

src/Command/Symfony/Work.php

Lines changed: 6 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,16 @@ public function __construct(CacheInterface $cache)
4040
protected function configure()
4141
{
4242
$this
43-
->setName('simple:task-worker:work')
43+
->setName('task-worker:work')
4444
->setDescription('Patiently wait for a task(s) to perform')
4545
->setHelp('This command allows you to start a task worker')
46-
->addOption('env', 'e', InputOption::VALUE_REQUIRED, 'Dot env file directory', realpath(__DIR__ . '/../../../examples/'))
46+
->addOption('dotenv', 'e', InputOption::VALUE_REQUIRED, 'Load configuration from environment file', '.env')
4747
->addOption(Worker::OPT_SLEEP, 's', InputOption::VALUE_REQUIRED, 'How long, in seconds, to sleep if not tasks are available', Worker::defaultOptions()[Worker::OPT_SLEEP])
4848
->addOption(Worker::OPT_ATTEMPTS, 'a', InputOption::VALUE_REQUIRED, 'How long many attempts a task is allowed before being failed (zero is unlimited)', Worker::defaultOptions()[Worker::OPT_ATTEMPTS])
4949
->addOption(Worker::OPT_ALIVE, 'l', InputOption::VALUE_REQUIRED, 'How long, in seconds, the worker will stay alive for (zero is unlimited)', Worker::defaultOptions()[Worker::OPT_ALIVE])
50-
->addOption(Worker::OPT_REST, 'r', InputOption::VALUE_REQUIRED, 'How long, in milliseconds, to rest between tasks', Worker::defaultOptions()[Worker::OPT_REST]);
50+
->addOption(Worker::OPT_REST, 'r', InputOption::VALUE_REQUIRED, 'How long, in milliseconds, to rest between tasks', Worker::defaultOptions()[Worker::OPT_REST])
51+
->addOption(Worker::OPT_MAX_TASKS, 'm', InputOption::VALUE_REQUIRED, 'How many tasks should be performed (zero is unlimited)', Worker::defaultOptions()[Worker::OPT_MAX_TASKS])
52+
->addOption(Worker::OPT_UNTIL_EMPTY, 'u', InputOption::VALUE_NONE, 'If the worker should stop when the queue is empty');
5153
}
5254

5355
/**
@@ -68,7 +70,7 @@ protected function configure()
6870
protected function execute(InputInterface $input, OutputInterface $output)
6971
{
7072

71-
$dotenv = new Dotenv($input->getOption('env'));
73+
$dotenv = new Dotenv(getcwd(), $input->getOption('dotenv'));
7274
$dotenv->load();
7375

7476
$logger = new ConsoleLogger($output);

src/Worker.php

Lines changed: 26 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -30,18 +30,21 @@ class Worker
3030
const OPT_REST = 'rest';
3131
/** Option to set the maximum number of tasks a worker should perform before stopping (zero is unlimited). */
3232
const OPT_MAX_TASKS = 'max-tasks';
33+
/** Option to set the worker to stop when the queue it empty (boolean 1 or 0). */
34+
const OPT_UNTIL_EMPTY = 'until-empty';
3335

3436
/**
3537
* @return array
3638
*/
3739
public static function defaultOptions() : array
3840
{
3941
return [
40-
self::OPT_SLEEP => 3,
41-
self::OPT_ATTEMPTS => 0,
42-
self::OPT_ALIVE => 0,
43-
self::OPT_REST => 50,
44-
self::OPT_MAX_TASKS => 0,
42+
self::OPT_SLEEP => 3,
43+
self::OPT_ATTEMPTS => 0,
44+
self::OPT_ALIVE => 0,
45+
self::OPT_REST => 50,
46+
self::OPT_MAX_TASKS => 0,
47+
self::OPT_UNTIL_EMPTY => 0,
4548
];
4649
}
4750

@@ -156,12 +159,19 @@ public function run(): int
156159
if ($this->opt(self::OPT_REST) > 0) {
157160
usleep(min(1,$this->opt(self::OPT_REST)));
158161
}
159-
} else {
160-
// Patiently wait for next task to arrive
162+
}
163+
164+
// Should we continue working?
165+
if (!$this->shouldContinueWorking($task)) {
166+
break;
167+
}
168+
169+
// Patiently wait for next task to arrive
170+
if ($task === null) {
161171
sleep(min(1, $this->opt(self::OPT_SLEEP)));
162172
}
163173
}
164-
while ($this->shouldContinueWorking());
174+
while (true);
165175
}
166176
catch (\Throwable $throwable) {
167177
$this->logger->critical($throwable->getMessage(), ['trace' => $throwable->getTrace()]);
@@ -193,17 +203,20 @@ protected function prepare(Task $task) : Task
193203
}
194204

195205
/**
206+
* @param Task $task
196207
* @return bool
197208
*/
198-
protected function shouldContinueWorking(): bool
209+
protected function shouldContinueWorking(Task $task = null): bool
199210
{
200-
$optMaxTasks = $this->opt(self::OPT_MAX_TASKS);
201-
$optAlive = $this->opt(self::OPT_ALIVE);
202-
$restartTime = $this->cache->get(sha1(self::CACHE_RESTART), 0);
211+
$optMaxTasks = $this->opt(self::OPT_MAX_TASKS);
212+
$optAlive = $this->opt(self::OPT_ALIVE);
213+
$restartTime = $this->cache->get(sha1(self::CACHE_RESTART), 0);
214+
$optUntilEmpty = $this->opt(self::OPT_UNTIL_EMPTY);
203215

204216
return
205217
($optMaxTasks <= 0 || $optMaxTasks > $this->taskCount) &&
206218
($optAlive <= 0 || $optAlive > round(time() - $this->startTime)) &&
207-
($restartTime === 0 || $restartTime <= $this->startTime);
219+
($restartTime === 0 || $restartTime <= $this->startTime) &&
220+
(!$optUntilEmpty || $task !== null);
208221
}
209222
}

0 commit comments

Comments
 (0)