Skip to content

Commit

Permalink
Merge pull request #10 from bwaidelich/master
Browse files Browse the repository at this point in the history
FEATURE: Major overhaul
  • Loading branch information
Bastian Waidelich authored Jul 20, 2016
2 parents ed5531d + 17adaf7 commit 1b8f4c1
Show file tree
Hide file tree
Showing 22 changed files with 1,815 additions and 326 deletions.
7 changes: 7 additions & 0 deletions Classes/Annotations/Defer.php
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,12 @@ final class Defer
*/
public $queueName;

/**
* Optional key/value array of options passed to the queue (for example array('delay' => 123) - Supported options depend on the concrete queue implementation)
* @var array
*/
public $options;

/**
* @param array $values
* @throws \InvalidArgumentException
Expand All @@ -35,5 +41,6 @@ public function __construct(array $values)
throw new \InvalidArgumentException('A Defer annotation must specify a queueName.', 1334128835);
}
$this->queueName = isset($values['queueName']) ? $values['queueName'] : $values['value'];
$this->options = isset($values['options']) ? $values['options'] : [];
}
}
104 changes: 87 additions & 17 deletions Classes/Command/JobCommandController.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,12 @@
* source code.
*/

use TYPO3\Flow\Annotations as Flow;
use TYPO3\Flow\Cli\CommandController;
use Flowpack\JobQueue\Common\Exception as JobQueueException;
use Flowpack\JobQueue\Common\Job\JobManager;
use Flowpack\JobQueue\Common\Queue\Message;
use Flowpack\JobQueue\Common\Queue\QueueManager;
use TYPO3\Flow\Annotations as Flow;
use TYPO3\Flow\Cli\CommandController;

/**
* Job command controller
Expand All @@ -37,43 +38,112 @@ class JobCommandController extends CommandController
/**
* Work on a queue and execute jobs
*
* @param string $queueName The name of the queue
* This command is used to execute jobs that are submitted to a queue.
* It is meant to run in a "server loop" and should be backed by some Process Control System (e.g. supervisord) that
* will restart the script if it died (due to exceptions or memory limits for example).
*
* Alternatively the <i>exit-after</i> flag can be used in conjunction with cron-jobs in order to manually (re)start
* the worker after a given amount of time.
*
* With the <i>limit</i> flag the number of executed jobs can be limited before the script exits.
* This can be combined with <i>exit-after</i> to exit when either the time or job limit is reached
*
* The <i>verbose</i> flag can be used to gain some insight about which jobs are executed etc.
*
* @param string $queue Name of the queue to fetch messages from. Can also be a comma-separated list of queues.
* @param int $exitAfter If set, this command will exit after the given amount of seconds
* @param int $limit If set, only the given amount of jobs are processed (successful or not) before the script exits
* @param bool $verbose Output debugging information
* @return void
*/
public function workCommand($queueName)
public function workCommand($queue, $exitAfter = null, $limit = null, $verbose = false)
{
if ($verbose) {
$this->output('Watching queue <b>"%s"</b>', [$queue]);
if ($exitAfter !== null) {
$this->output(' for <b>%d</b> seconds', [$exitAfter]);
}
$this->outputLine('...');
}
$startTime = time();
$timeout = null;
$numberOfJobExecutions = 0;
do {
$message = null;
if ($exitAfter !== null) {
$timeout = max(1, $exitAfter - (time() - $startTime));
}
try {
$this->jobManager->waitAndExecute($queueName);
$message = $this->jobManager->waitAndExecute($queue, $timeout);
} catch (JobQueueException $exception) {
$this->outputLine($exception->getMessage());
if ($exception->getPrevious() instanceof \Exception) {
$this->outputLine($exception->getPrevious()->getMessage());
$numberOfJobExecutions ++;
$this->outputLine('<error>%s</error>', [$exception->getMessage()]);
if ($verbose && $exception->getPrevious() instanceof \Exception) {
$this->outputLine(' Reason: %s', [$exception->getPrevious()->getMessage()]);
}
} catch (\Exception $exception) {
$this->outputLine('Unexpected exception during job execution: %s', array($exception->getMessage()));
$this->outputLine('<error>Unexpected exception during job execution: %s, aborting...</error>', [$exception->getMessage()]);
$this->quit(1);
}
if ($message !== null) {
$numberOfJobExecutions ++;
if ($verbose) {
$messagePayload = strlen($message->getPayload()) <= 50 ? $message->getPayload() : substr($message->getPayload(), 0, 50) . '...';
$this->outputLine('<success>Successfully executed job "%s" (%s)</success>', [$message->getIdentifier(), $messagePayload]);
}
}
if ($exitAfter !== null && (time() - $startTime) >= $exitAfter) {
if ($verbose) {
$this->outputLine('Quitting after %d seconds due to <i>--exit-after</i> flag', [time() - $startTime]);
}
$this->quit();
}
if ($limit !== null && $numberOfJobExecutions >= $limit) {
if ($verbose) {
$this->outputLine('Quitting after %d executed job%s due to <i>--limit</i> flag', [$numberOfJobExecutions, $numberOfJobExecutions > 1 ? 's' : '']);
}
$this->quit();
}

} while (true);
}

/**
* List queued jobs
*
* @param string $queueName The name of the queue
* @param integer $limit Number of jobs to list
* Shows the label of the next <i>$limit</i> Jobs in a given queue.
*
* @param string $queue The name of the queue
* @param integer $limit Number of jobs to list (some queues only support a limit of 1)
* @return void
*/
public function listCommand($queueName, $limit = 1)
public function listCommand($queue, $limit = 1)
{
$jobs = $this->jobManager->peek($queueName, $limit);
$totalCount = $this->queueManager->getQueue($queueName)->count();
$jobs = $this->jobManager->peek($queue, $limit);
$totalCount = $this->queueManager->getQueue($queue)->count();
foreach ($jobs as $job) {
$this->outputLine('<u>%s</u>', array($job->getLabel()));
$this->outputLine('<b>%s</b>', [$job->getLabel()]);
}

if ($totalCount > count($jobs)) {
$this->outputLine('(%d omitted) ...', array($totalCount - count($jobs)));
$this->outputLine('(%d omitted) ...', [$totalCount - count($jobs)]);
}
$this->outputLine('(<b>%d total</b>)', array($totalCount));
$this->outputLine('(<b>%d total</b>)', [$totalCount]);
}

/**
* Execute one job
*
* @param string $queue
* @param string $serializedMessage An instance of Message serialized and base64-encoded
* @return void
* @internal This command is mainly used by the JobManager and FakeQueue in order to execute commands in sub requests
*/
public function executeCommand($queue, $serializedMessage)
{
/** @var Message $message */
$message = unserialize(base64_decode($serializedMessage));
$queue = $this->queueManager->getQueue($queue);
$this->jobManager->executeJobForMessage($queue, $message);
}
}
148 changes: 148 additions & 0 deletions Classes/Command/QueueCommandController.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,148 @@
<?php
namespace Flowpack\JobQueue\Common\Command;

/*
* This file is part of the Flowpack.JobQueue.Common package.
*
* (c) Contributors to the package
*
* This package is Open Source Software. For the full copyright and license
* information, please view the LICENSE file which was distributed with this
* source code.
*/

use Flowpack\JobQueue\Common\Queue\QueueManager;
use TYPO3\Flow\Annotations as Flow;
use TYPO3\Flow\Cli\CommandController;
use TYPO3\Flow\Utility\TypeHandling;

/**
* CLI controller to manage message queues
*/
class QueueCommandController extends CommandController
{

/**
* @Flow\Inject
* @var QueueManager
*/
protected $queueManager;

/**
* @Flow\InjectConfiguration(path="queues")
* @var array
*/
protected $queueConfigurations;

/**
* List configured queues
*
* Displays all configured queues, their type and the number of messages that are ready to be processed.
*
* @return void
*/
public function listCommand()
{
$rows = [];
foreach ($this->queueConfigurations as $queueName => $queueConfiguration) {
$queue = $this->queueManager->getQueue($queueName);
try {
$numberOfMessages = $queue->count();
} catch (\Exception $e) {
$numberOfMessages = '-';
}
$rows[] = [$queue->getName(), TypeHandling::getTypeForValue($queue), $numberOfMessages];
}
$this->output->outputTable($rows, ['Queue', 'Type', '# messages']);
}

/**
* Describe a single queue
*
* Displays the configuration for a queue, merged with the preset settings if any.
*
* @param string $queue Name of the queue to describe (e.g. "some-queue")
* @return void
*/
public function describeCommand($queue)
{
$queueSettings = $this->queueManager->getQueueSettings($queue);
$this->outputLine('Configuration options for Queue <b>%s</b>:', [$queue]);
$rows = [];
foreach ($queueSettings as $name => $value) {
$rows[] = [$name, is_array($value) ? json_encode($value, JSON_PRETTY_PRINT) : $value];
}
$this->output->outputTable($rows, ['Option', 'Value']);
}

/**
* Initialize a queue
*
* Checks connection to the queue backend and sets up prerequisites (e.g. required database tables)
* Most queue implementations don't need to be initialized explicitly, but it doesn't harm and might help to find misconfigurations
*
* @param string $queue Name of the queue to initialize (e.g. "some-queue")
* @return void
*/
public function setupCommand($queue)
{
$queue = $this->queueManager->getQueue($queue);
try {
$queue->setUp();
} catch (\Exception $exception) {
$this->outputLine('<error>An error occurred while trying to setup queue "%s":</error>', [$queue->getName()]);
$this->outputLine('%s (#%s)', [$exception->getMessage(), $exception->getCode()]);
$this->quit(1);
}
$this->outputLine('<success>Queue "%s" has been initialized successfully.</success>', [$queue->getName()]);
}

/**
* Remove all messages from a queue!
*
* This command will delete <u>all</u> messages from the given queue.
* Thus it should only be used in tests or with great care!
*
* @param string $queue Name of the queue to flush (e.g. "some-queue")
* @param bool $force This flag is required in order to avoid accidental flushes
* @return void
*/
public function flushCommand($queue, $force = false)
{
$queue = $this->queueManager->getQueue($queue);
if (!$force) {
$this->outputLine('Use the --force flag if you really want to flush queue "%s"', [$queue->getName()]);
$this->outputLine('<error>Warning: This will delete all messages from the queue!</error>');
$this->quit(1);
}
$queue->flush();
$this->outputLine('<success>Flushed queue "%s".</success>', [$queue->getName()]);
}

/**
* Submit a message to a given queue
*
* This command can be used to "manually" add messages to a given queue.
*
* <b>Example:</b>
* <i>flow queue:submit some-queue "some payload" --options '{"delay": 14}'</i>
*
* To make this work with the <i>JobManager</i> the payload has to be a serialized
* instance of an object implementing <i>JobInterface</i>.
*
* @param string $queue Name of the queue to submit a message to (e.g. "some-queue")
* @param string $payload Arbitrary payload, for example a serialized instance of a class implementing JobInterface
* @param string $options JSON encoded, for example '{"some-option": "some-value"}'
* @return void
*/
public function submitCommand($queue, $payload, $options = null)
{
$queue = $this->queueManager->getQueue($queue);
if ($options !== null) {
$options = json_decode($options, true);
}
$messageId = $queue->submit($payload, $options !== null ? $options : []);
$this->outputLine('<success>Submitted payload to queue "%s" with ID "%s".</success>', [$queue->getName(), $messageId]);
}

}
3 changes: 2 additions & 1 deletion Classes/Job/Aspect/DeferMethodCallAspect.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,11 @@ public function queueMethodCallAsJob(JoinPointInterface $joinPoint)
if ($this->processingJob) {
return $joinPoint->getAdviceChain()->proceed($joinPoint);
}
/** @var Defer $deferAnnotation */
$deferAnnotation = $this->reflectionService->getMethodAnnotation($joinPoint->getClassName(), $joinPoint->getMethodName(), Defer::class);
$queueName = $deferAnnotation->queueName;
$job = new StaticMethodCallJob($joinPoint->getClassName(), $joinPoint->getMethodName(), $joinPoint->getMethodArguments());
$this->jobManager->queue($queueName, $job);
$this->jobManager->queue($queueName, $job, $deferAnnotation->options);
return null;
}

Expand Down
7 changes: 0 additions & 7 deletions Classes/Job/JobInterface.php
Original file line number Diff line number Diff line change
Expand Up @@ -31,13 +31,6 @@ interface JobInterface
*/
public function execute(QueueInterface $queue, Message $message);

/**
* Get an optional identifier for the job
*
* @return string A job identifier
*/
public function getIdentifier();

/**
* Get a readable label for the job
*
Expand Down
Loading

0 comments on commit 1b8f4c1

Please sign in to comment.