Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 1 addition & 4 deletions .travis.yml
Original file line number Diff line number Diff line change
@@ -1,13 +1,10 @@
language: php

php:
- 5.6
- 7.0
- 7.1

matrix:
fast_finish: true
allow_failures:
- php: 7.0

before_script:
- composer install --no-interaction --dev --prefer-dist
Expand Down
2 changes: 1 addition & 1 deletion composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
}
],
"require": {
"php": ">=5.6",
"php": ">=7.1",
"illuminate/queue": "^5.3.24",
"laravel-doctrine/orm": "^1.0"
},
Expand Down
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
<?php


namespace MaxBrokman\SafeQueue;
namespace MaxBrokman\SafeQueue\Exceptions;

use Exception;

Expand Down
14 changes: 14 additions & 0 deletions src/Exceptions/QueueFailureException.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
<?php declare(strict_types=1);

namespace MaxBrokman\SafeQueue\Exceptions;

use MaxBrokman\SafeQueue\QueueFailure;
use Throwable;

class QueueFailureException extends \Exception implements QueueFailure
{
public function __construct(Throwable $previous = null)
{
parent::__construct("Unable to fetch job from queue", 500, $previous);
}
}
10 changes: 10 additions & 0 deletions src/QueueFailure.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
<?php declare(strict_types=1);

namespace MaxBrokman\SafeQueue;

/**
* Interface QueueMustStop used in testing to force a stop.
*/
interface QueueFailure
{
}
50 changes: 30 additions & 20 deletions src/Worker.php
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
<?php

<?php declare(strict_types=1);

namespace MaxBrokman\SafeQueue;

Expand All @@ -12,6 +11,8 @@
use Illuminate\Queue\QueueManager;
use Illuminate\Queue\Worker as IlluminateWorker;
use Illuminate\Queue\WorkerOptions;
use MaxBrokman\SafeQueue\Exceptions\EntityManagerClosedException;
use MaxBrokman\SafeQueue\Exceptions\QueueFailureException;
use Symfony\Component\Debug\Exception\FatalThrowableError;
use Throwable;

Expand Down Expand Up @@ -54,10 +55,11 @@ public function __construct(
*
* This is a slight re-working of the parent implementation to aid testing.
*
* @param string $connectionName
* @param null $queue
* @param string $connectionName
* @param null $queue
* @param \Illuminate\Queue\WorkerOptions $options
* @return void
* @throws Exception
*/
public function daemon($connectionName, $queue, WorkerOptions $options)
{
Expand Down Expand Up @@ -100,10 +102,11 @@ public function stop()
* We clear the entity manager, assert that it's open and also assert that
* the database has an open connection before running each job.
*
* @param string $connectionName
* @param string $queue
* @param string $connectionName
* @param string $queue
* @param \Illuminate\Queue\WorkerOptions $options
* @return bool
* @throws Exception
*/
public function runNextJob($connectionName, $queue, WorkerOptions $options)
{
Expand All @@ -113,7 +116,7 @@ public function runNextJob($connectionName, $queue, WorkerOptions $options)
$this->assertEntityManagerOpen();
} catch (EntityManagerClosedException $e) {
if ($this->exceptions) {
$this->exceptions->report(new EntityManagerClosedException);
$this->exceptions->report($e);
}

return false;
Expand All @@ -122,9 +125,7 @@ public function runNextJob($connectionName, $queue, WorkerOptions $options)
$this->assertGoodDatabaseConnection();

try {
$job = $this->getNextJob(
$this->manager->connection($connectionName), $queue
);
$job = $this->fetchNewJob($connectionName, $queue);

// If we're able to pull a job off of the stack, we will process it and then return
// from this method. If there is no job on the queue, we will "sleep" the worker
Expand All @@ -137,20 +138,12 @@ public function runNextJob($connectionName, $queue, WorkerOptions $options)
return true;
}

} catch (Exception $e) {
if ($this->exceptions) {
$this->exceptions->report($e);
}

if ($e instanceof QueueMustStop) {
return false;
}
} catch (Throwable $e) {
} catch (Exception | Throwable $e) {
if ($this->exceptions) {
$this->exceptions->report(new FatalThrowableError($e));
}

if ($e instanceof QueueMustStop) {
if ($e instanceof QueueMustStop || $e instanceof QueueFailure) {
return false;
}
}
Expand Down Expand Up @@ -186,4 +179,21 @@ private function assertGoodDatabaseConnection()
$connection->connect();
}
}

/**
* @param string $connectionName
* @param null $queue
* @return \Illuminate\Contracts\Queue\Job|null
* @throws QueueFailureException
*/
private function fetchNewJob(string $connectionName, $queue): ?\Illuminate\Contracts\Queue\Job
{
try {
return $this->getNextJob(
$this->manager->connection($connectionName), $queue
);
} catch (Exception | Throwable $e) {
throw new QueueFailureException($e);
}
}
}
42 changes: 36 additions & 6 deletions tests/WorkerTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,13 @@
use Illuminate\Queue\QueueManager;
use Illuminate\Queue\Worker as IlluminateWorker;
use Illuminate\Queue\WorkerOptions;
use MaxBrokman\SafeQueue\EntityManagerClosedException;
use MaxBrokman\SafeQueue\Exceptions\EntityManagerClosedException;
use MaxBrokman\SafeQueue\Exceptions\QueueFailureException;
use MaxBrokman\SafeQueue\QueueMustStop;
use MaxBrokman\SafeQueue\Stopper;
use MaxBrokman\SafeQueue\Worker;
use Mockery as m;
use Symfony\Component\Debug\Exception\FatalThrowableError;

class WorkerTest extends \PHPUnit_Framework_TestCase
{
Expand Down Expand Up @@ -123,6 +125,15 @@ protected function prepareToRunJob($job)
call_user_func_array([$popExpectation, 'andReturn'], $jobs);
}

protected function prepareToRunJobFails()
{
$this->queueManager->shouldReceive('isDownForMaintenance')->andReturn(false);
$this->queueManager->shouldReceive('connection')->andReturn($this->queue);
$this->queueManager->shouldReceive('getName')->andReturn('test');

$this->queue->shouldReceive('pop')->andThrow(new QueueFailureException(new BadThingHappened()));
}

public function testExtendsLaravelWorker()
{
$this->assertInstanceOf(IlluminateWorker::class, $this->worker);
Expand Down Expand Up @@ -160,7 +171,7 @@ public function testReconnected()
// We must stop
$this->stopper->shouldReceive('stop')->once();
// We must log this fact
$this->exceptions->shouldReceive('report')->with(m::type(BadThingHappened::class))->once();
$this->exceptions->shouldReceive('report')->with(m::type(FatalThrowableError::class))->once();

// Make a job
$job = m::mock(Job::class);
Expand All @@ -176,8 +187,8 @@ public function testReconnected()
public function testLoops()
{
// Entity manager will report open and good connection
$this->entityManager->shouldReceive('isOpen')->andReturn(true)->times(2);
$this->dbConnection->shouldReceive('ping')->andReturn(true)->times(2);
$this->entityManager->shouldReceive('isOpen')->andReturn(true)->times(3);
$this->dbConnection->shouldReceive('ping')->andReturn(true)->times(3);

// We must stop
$this->stopper->shouldReceive('stop')->once();
Expand All @@ -192,9 +203,9 @@ public function testLoops()
$jobTwo->shouldReceive('fire')->once()->andThrow(new BadThingHappened());
$jobTwo->shouldIgnoreMissing();

$this->exceptions->shouldReceive('report')->with(m::type(BadThingHappened::class))->once();
$this->exceptions->shouldReceive('report')->with(m::type(FatalThrowableError::class))->once();

$this->prepareToRunJob([$jobOne, $jobTwo]);
$this->prepareToRunJob([null, $jobOne, $jobTwo]);

$this->worker->daemon('test', null, $this->options);
}
Expand All @@ -214,6 +225,25 @@ public function testRestartsNicely()

$this->worker->daemon('test', null, $this->options);
}

public function testQueueFailure()
{
// Entity manager will report open and good connection
$this->entityManager->shouldReceive('isOpen')->andReturn(true)->times(1);
$this->dbConnection->shouldReceive('ping')->andReturn(true)->times(1);

// We must stop
$this->stopper->shouldReceive('stop')->once();

// Make a job
$job = m::mock(Job::class);

$this->exceptions->shouldReceive('report')->with(m::type(FatalThrowableError::class))->once();

$this->prepareToRunJobFails();

$this->worker->daemon('test', null, $this->options);
}
}

class BadThingHappened extends \Exception implements QueueMustStop
Expand Down