Skip to content

Commit

Permalink
Merge pull request #33 from Flowpack/bugfix/remove-message-cache-on-f…
Browse files Browse the repository at this point in the history
…ailure

BUGFIX: Remove message cache file on failure
  • Loading branch information
kdambekalns authored Jul 31, 2018
2 parents 2aa7c92 + cd1bcd3 commit 6d7bf4c
Showing 1 changed file with 6 additions and 1 deletion.
7 changes: 6 additions & 1 deletion Classes/Job/JobManager.php
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,8 @@ public function queue(string $queueName, JobInterface $job, array $options = [])
*/
public function waitAndExecute(string $queueName, $timeout = null): ?Message
{

$messageCacheIdentifier = null;
$queue = $this->queueManager->getQueue($queueName);
$message = $queue->waitAndReserve($timeout);
if ($message === null) {
Expand All @@ -102,7 +104,6 @@ public function waitAndExecute(string $queueName, $timeout = null): ?Message
$messageCacheIdentifier = sha1(serialize($message));
$this->messageCache->set($messageCacheIdentifier, $message);
Scripts::executeCommand('flowpack.jobqueue.common:job:execute', $this->flowSettings, false, [$queue->getName(), $messageCacheIdentifier]);
$this->messageCache->remove($messageCacheIdentifier);
} else {
$this->executeJobForMessage($queue, $message);
}
Expand All @@ -118,6 +119,10 @@ public function waitAndExecute(string $queueName, $timeout = null): ?Message
$this->emitMessageFailed($queue, $message, $exception);
throw new JobQueueException(sprintf('Job execution for job (message: "%s", queue: "%s") failed (%d/%d trials) - ABORTING', $message->getIdentifier(), $queue->getName(), $message->getNumberOfReleases() + 1, $maximumNumberOfReleases + 1), 1334056584, $exception);
}
} finally {
if ($messageCacheIdentifier !== null) {
$this->messageCache->remove($messageCacheIdentifier);
}
}

$queue->finish($message->getIdentifier());
Expand Down

0 comments on commit 6d7bf4c

Please sign in to comment.