Skip to content

Commit

Permalink
Add support AMQP (#12)
Browse files Browse the repository at this point in the history
* create AMQP event queue

* try run functional test of AMQP

* sudo required for travis

* not run tests on HHVM

* add rabbitmq-server to addons

* not run tests on HHVM

* change $channel_rpc_timeout

* try change $channel_rpc_timeout and $read_write_timeout

* verbose phpunit

* use dist trusty

* remove rabbitmq-server addons

* change $read_write_timeout and $heartbeat

* try publish before subscribe

* disable functional test

* disable verbose phpunit

* fix memory limit error

* try test on HHVM

* HHVM not longer supported
  • Loading branch information
peter-gribanov authored Aug 6, 2019
1 parent 764057f commit bbb1332
Show file tree
Hide file tree
Showing 11 changed files with 607 additions and 19 deletions.
5 changes: 5 additions & 0 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,20 +20,25 @@ matrix:
- php: 5.5
dist: trusty
- php: 5.5
dist: trusty
env: SYMFONY_VERSION=2.7.*
dist: trusty
- php: 5.5
dist: trusty
env: SYMFONY_VERSION=2.8.*
dist: trusty
- php: 5.5
dist: trusty
env: SYMFONY_VERSION=3.4.*
dist: trusty
- php: 7.1
env: SYMFONY_VERSION=4.3.* PHPUNIT_VERSION=5.7.*
- php: 5.5
dist: trusty
env: PREDIS_VERSION=1.0.*
dist: trusty
- php: 5.5
dist: trusty
env: PREDIS_VERSION=1.1.*
dist: trusty
- php: 5.6
Expand Down
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,7 @@ $bus->pullAndPublish($purchase_order);
* [Predis queue](docs/queue/pull/predis.md)
* [Subscribe](docs/queue/subscribe/subscribe.md)
* [Executing queue](docs/queue/subscribe/executing.md)
* [AMQP queue](docs/queue/subscribe/amqp.md)
* [Predis queue](docs/queue/subscribe/predis.md)
* Serialize command
* [Simple payload serializer](docs/queue/serialize/simple.md)
Expand Down
2 changes: 2 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
"ddd",
"domain-event",
"predis",
"amqp",
"symfony"
],
"autoload": {
Expand All @@ -33,6 +34,7 @@
"symfony/dependency-injection": "~2.3|~3.0|~4.0",
"symfony/serializer": "~2.3|~3.0|~4.0",
"predis/predis": "~1.0|~1.1",
"php-amqplib/php-amqplib": "^2.9",
"phpunit/phpunit": "4.8.*",
"scrutinizer/ocular": "~1.5",
"satooshi/php-coveralls": "^2.0"
Expand Down
70 changes: 70 additions & 0 deletions docs/queue/subscribe/amqp.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
AMQP queue
==========

Queues are designed to distribute the load and delay publishing of events or transfer their publishing to separate
processes.

The queue stores events in [RabbitMQ](https://www.rabbitmq.com/), using the [php-amqplib](https://github.com/php-amqplib/php-amqplib)
library to access it.

This queue uses a [serializer](https://symfony.com/doc/current/components/serializer.html) to convert event objects
to strings and back while waiting for the transport of objects across the AMQP. The serializer uses the `predis`
format as a default. You can change format if you need. You can make messages more optimal for a RabbitMQ than JSON.

If the message could not be deserialized, then a critical message is written to the log so that the administrator can
react quickly to the problem and the message is placed again at the end of the queue, so as not to lose it.

You can use any implementations of [callable type](http://php.net/manual/en/language.types.callable.php) as a queue
subscriber.

Configure queue:

```php
use GpsLab\Domain\Event\Queue\Subscribe\AMQPSubscribeEventQueue;
use GpsLab\Domain\Event\Queue\Serializer\SymfonySerializer;
use PhpAmqpLib\Connection\AMQPStreamConnection;
use Symfony\Component\Serializer\Serializer;

//$connection = new AMQPStreamConnection('localhost', 5672, 'guest', 'guest'); // AMQP connection
//$channel = $connection->channel();
//$symfony_serializer = new Serializer(); // Symfony serializer
//$logger = new Logger(); // PSR-3 logger
$queue_name = 'article_queue';
$format = 'json'; // default: predis
// you can create another implementation of serializer
$serializer = new SymfonySerializer($symfony_serializer, $format);
$queue = new AMQPSubscribeEventQueue($channel, $serializer, $logger, $queue_name);
```

Subscribe to the queue:

```php
use GpsLab\Domain\Event\Bus\ListenerLocatedEventBus;
use GpsLab\Domain\Event\Listener\Locator\DirectBindingEventListenerLocator;

$locator = new DirectBindingEventListenerLocator();
$bus = new ListenerLocatedEventBus($locator);

$handler = function(ArticleRenamedEvent $event) use ($bus) {
$bus->publish($event);
};

$queue->subscribe($handler);
```

You can unsubscribe of the queue:

```php
$queue->unsubscribe($handler);
```

Make event and publish it into queue:

```php
$event = new ArticleRenamedEvent();
$event->new_name = $new_name;

$queue->publish($event);
```

You can use [QueueEventBus](../bus.md) for publish events in queue.
4 changes: 2 additions & 2 deletions docs/queue/subscribe/executing.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,11 +13,11 @@ subscriber.
```php
use GpsLab\Domain\Event\Bus\ListenerLocatedEventBus;
use GpsLab\Domain\Event\Listener\Locator\DirectBindingEventListenerLocator;
use GpsLab\Domain\Event\Queue\PubSub\ExecutingEventQueue;
use GpsLab\Domain\Event\Queue\Subscribe\ExecutingSubscribeEventQueue;

$locator = new DirectBindingEventListenerLocator();
$bus = new ListenerLocatedEventBus($locator);
$queue = new ExecutingEventQueue();
$queue = new ExecutingSubscribeEventQueue();
```

Subscribe to the queue:
Expand Down
4 changes: 2 additions & 2 deletions docs/queue/subscribe/predis.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ subscriber.
Configure queue:

```php
use GpsLab\Domain\Event\Queue\PubSub\PredisEventQueue;
use GpsLab\Domain\Event\Queue\Subscribe\PredisSubscribeEventQueue;
use GpsLab\Domain\Event\Queue\Serializer\SymfonySerializer;
use Symfony\Component\Serializer\Serializer;
use Superbalist\PubSub\Redis\RedisPubSubAdapter;
Expand All @@ -39,7 +39,7 @@ $queue_name = 'article_queue';
$format = 'json'; // default: predis
// you can create another implementation of serializer
$serializer = new SymfonySerializer($symfony_serializer, $format);
$queue = new PredisEventQueue($predis, $serializer, $logger, $queue_name);
$queue = new PredisSubscribeEventQueue($predis, $serializer, $logger, $queue_name);
```

Subscribe to the queue:
Expand Down
1 change: 1 addition & 0 deletions docs/queue/subscribe/subscribe.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,4 +13,5 @@ subscriber.
You can use one of the existing queues:

* [Executing queue](executing.md)
* [AMQP queue](amqp.md)
* [Predis queue](predis.md)
172 changes: 172 additions & 0 deletions src/Queue/Subscribe/AMQPSubscribeEventQueue.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
<?php

/**
* GpsLab component.
*
* @author Peter Gribanov <[email protected]>
* @copyright Copyright (c) 2011, Peter Gribanov
* @license http://opensource.org/licenses/MIT
*/

namespace GpsLab\Domain\Event\Queue\Subscribe;

use GpsLab\Domain\Event\Event;
use GpsLab\Domain\Event\Queue\Serializer\Serializer;
use PhpAmqpLib\Channel\AMQPChannel;
use PhpAmqpLib\Message\AMQPMessage;
use Psr\Log\LoggerInterface;

class AMQPSubscribeEventQueue implements SubscribeEventQueue
{
/**
* @var AMQPChannel
*/
private $channel;

/**
* @var Serializer
*/
private $serializer;

/**
* @var LoggerInterface
*/
private $logger;

/**
* @var callable[]
*/
private $handlers = [];

/**
* @var string
*/
private $queue_name = '';

/**
* @var bool
*/
private $subscribed = false;

/**
* @var bool
*/
private $declared = false;

/**
* @param AMQPChannel $channel
* @param Serializer $serializer
* @param LoggerInterface $logger
* @param string $queue_name
*/
public function __construct(AMQPChannel $channel, Serializer $serializer, LoggerInterface $logger, $queue_name)
{
$this->channel = $channel;
$this->serializer = $serializer;
$this->logger = $logger;
$this->queue_name = $queue_name;
}

/**
* Publish event to queue.
*
* @param Event $event
*
* @return bool
*/
public function publish(Event $event)
{
$message = $this->serializer->serialize($event);
$this->declareQueue();
$this->channel->basic_publish(new AMQPMessage($message), '', $this->queue_name);

return true;
}

/**
* Subscribe on event queue.
*
* @throws \ErrorException
*
* @param callable $handler
*/
public function subscribe(callable $handler)
{
$this->handlers[] = $handler;

// laze subscribe
if (!$this->subscribed) {
$this->declareQueue();
$this->channel->basic_consume(
$this->queue_name,
'',
false,
true,
false,
false,
function (AMQPMessage $message) {
$this->handle($message->body);
}
);

$this->subscribed = true;
}

while ($this->channel->is_consuming()) {
$this->channel->wait();
}
}

/**
* Unsubscribe on event queue.
*
* @param callable $handler
*
* @return bool
*/
public function unsubscribe(callable $handler)
{
$index = array_search($handler, $this->handlers);

if ($index === false) {
return false;
}

unset($this->handlers[$index]);

return true;
}

private function declareQueue()
{
// laze declare queue
if (!$this->declared) {
$this->channel->queue_declare($this->queue_name, false, false, false, false);
$this->declared = true;
}
}

/**
* @param string $message
*/
private function handle($message)
{
try {
$event = $this->serializer->deserialize($message);
} catch (\Exception $e) { // catch only deserialize exception
// it's a critical error
// it is necessary to react quickly to it
$this->logger->critical('Failed denormalize a event in the AMQP queue', [$message, $e->getMessage()]);

// try denormalize in later
$this->declareQueue();
$this->channel->basic_publish(new AMQPMessage($message), '', $this->queue_name);

return; // no event for handle
}

foreach ($this->handlers as $handler) {
call_user_func($handler, $event);
}
}
}
9 changes: 3 additions & 6 deletions src/Queue/Subscribe/PredisSubscribeEventQueue.php
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ public function __construct(
*/
public function publish(Event $event)
{
$massage = $this->serializer->serialize($event);
$this->client->publish($this->queue_name, $massage);
$message = $this->serializer->serialize($event);
$this->client->publish($this->queue_name, $message);

return true;
}
Expand Down Expand Up @@ -128,10 +128,7 @@ private function handle($message)
} catch (\Exception $e) { // catch only deserialize exception
// it's a critical error
// it is necessary to react quickly to it
$this->logger->critical(
'Failed denormalize a event in the Redis queue',
[$message, $e->getMessage()]
);
$this->logger->critical('Failed denormalize a event in the Redis queue', [$message, $e->getMessage()]);

// try denormalize in later
$this->client->publish($this->queue_name, $message);
Expand Down
Loading

0 comments on commit bbb1332

Please sign in to comment.