Skip to content

Commit

Permalink
add publishing events
Browse files Browse the repository at this point in the history
  • Loading branch information
jleonardolemos committed Dec 16, 2023
1 parent 8185661 commit 7618ecf
Show file tree
Hide file tree
Showing 16 changed files with 384 additions and 67 deletions.
5 changes: 5 additions & 0 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,11 @@
"Convenia\\Pigeon\\Tests\\": "tests/"
}
},
"scripts": {
"coverage": [
"XDEBUG_MODE=coverage ./vendor/bin/phpunit"
]
},
"extra": {
"laravel": {
"providers": [
Expand Down
17 changes: 17 additions & 0 deletions src/Drivers/Driver.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@

use Convenia\Pigeon\Consumer\Consumer;
use Convenia\Pigeon\Consumer\ConsumerContract;
use Convenia\Pigeon\Events\DispatchingEvent;
use Convenia\Pigeon\Events\EventDispatched;
use Convenia\Pigeon\Exceptions\Events\EmptyEventException;
use Convenia\Pigeon\Publisher\Publisher;
use Convenia\Pigeon\Publisher\PublisherContract;
Expand Down Expand Up @@ -69,7 +71,22 @@ public function dispatch(string $eventName, array $event, array $meta = []): voi
$publisher->header($key, $value);
}

DispatchingEvent::dispatch(
$publisher,
$eventName,
$event,
$meta
);

$publisher->disableEvents = true;
$publisher->publish($event, [], 3);

EventDispatched::dispatch(
$publisher,
$eventName,
$event,
$meta
);
}

public function events(string $event = '#'): ConsumerContract
Expand Down
25 changes: 25 additions & 0 deletions src/Events/BaseEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
<?php

namespace Convenia\Pigeon\Events;

use Illuminate\Foundation\Events\Dispatchable;
use Convenia\Pigeon\Publisher\PublisherContract;

abstract class BaseEvent
{
use Dispatchable;

public PublisherContract $publisher;
public array $userData = [];
public array $userMetaData = [];

public function __construct(
PublisherContract $publisher,
array $userData = [],
array $userMetaData = []
) {
$this->publisher = $publisher;
$this->userData = $userData;
$this->userMetaData = $userMetaData;
}
}
20 changes: 20 additions & 0 deletions src/Events/DispatchingEvent.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

namespace Convenia\Pigeon\Events;

use Convenia\Pigeon\Publisher\PublisherContract;

class DispatchingEvent extends BaseEvent
{
public string $eventName;

public function __construct(
PublisherContract $publisher,
string $eventName,
array $userData = [],
array $userMetaData = []
) {
parent::__construct($publisher, $userData, $userMetaData);
$this->eventName = $eventName;
}
}
20 changes: 20 additions & 0 deletions src/Events/EventDispatched.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
<?php

namespace Convenia\Pigeon\Events;

use Convenia\Pigeon\Publisher\PublisherContract;

class EventDispatched extends BaseEvent
{
public string $eventName;

public function __construct(
PublisherContract $publisher,
string $eventName,
array $userData = [],
array $userMetaData = []
) {
parent::__construct($publisher, $userData, $userMetaData);
$this->eventName = $eventName;
}
}
7 changes: 7 additions & 0 deletions src/Events/MessagePublished.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?php

namespace Convenia\Pigeon\Events;

class MessagePublished extends BaseEvent
{
}
7 changes: 7 additions & 0 deletions src/Events/PublishingMessage.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
<?php

namespace Convenia\Pigeon\Events;

class PublishingMessage extends BaseEvent
{
}
85 changes: 18 additions & 67 deletions src/Publisher/Publisher.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,34 +2,14 @@

namespace Convenia\Pigeon\Publisher;

use Convenia\Pigeon\Drivers\DriverContract;
use Illuminate\Foundation\Application;
use Illuminate\Support\Arr;
use Illuminate\Support\Str;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;
use Convenia\Pigeon\Events\MessagePublished;
use Convenia\Pigeon\Events\PublishingMessage;

class Publisher implements PublisherContract
{
protected $app;
protected $driver;
protected $exchange;
protected $routing;
protected $headers = [];
use PublisherConcern;

public function __construct(Application $app, DriverContract $driver, string $exchange)
{
$this->app = $app;
$this->driver = $driver;
$this->exchange = $exchange;
}

public function routing(string $key): PublisherContract
{
$this->routing = $key;

return $this;
}
public bool $disableEvents = false;

public function bind(string $queue): PublisherContract
{
Expand All @@ -40,56 +20,27 @@ public function bind(string $queue): PublisherContract

public function publish(array $message, array $properties = [], int $channelId = null)
{
if (!$this->disableEvents) {
PublishingMessage::dispatch(
$this,
$message,
$properties
);
}

$msg = $this->makeMessage($message, $properties);
$this->driver->getChannel($channelId)->basic_publish(
$msg,
$this->exchange,
$this->routing
);
}

private function makeMessage(array $data, array $properties = [])
{
return new AMQPMessage(
json_encode($data),
$this->getMessageProps($properties)
);
}

private function getMessageProps(array $userProps): array
{
return array_merge([
'content_type' => 'application/json',
'content_encoding' => 'utf8',
'correlation_id' => Str::uuid(),
'expiration' => 60000000,
'app_id' => $this->app['config']['app_name'],
'application_headers' => new AMQPTable($this->getHeaders()),
], $userProps);
}

public function header(string $key, $value): PublisherContract
{
$this->headers = Arr::add($this->headers, $key, $value);

return $this;
}

public function getHeaders(): array
{
$configHeaders = Arr::dot($this->app['config']->get('pigeon.headers'));
$mapped = $this->mapToValues($configHeaders);

return array_merge($mapped, $this->headers);
}

protected function mapToValues(array $headers)
{
$result = [];
foreach ($headers as $key => $value) {
$result[$key] = is_callable($value) ? call_user_func($value) : $value;
if (!$this->disableEvents) {
MessagePublished::dispatch(
$this,
$message,
$properties
);
}

return $result;
}
}
83 changes: 83 additions & 0 deletions src/Publisher/PublisherConcern.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
<?php

namespace Convenia\Pigeon\Publisher;

use Convenia\Pigeon\Drivers\DriverContract;
use Illuminate\Foundation\Application;
use Illuminate\Support\Arr;
use Illuminate\Support\Str;
use PhpAmqpLib\Message\AMQPMessage;
use PhpAmqpLib\Wire\AMQPTable;

trait PublisherConcern
{
protected $app;
protected $driver;
protected $exchange;
protected $routing;
protected $headers = [];

public function __construct(Application $app, DriverContract $driver, string $exchange)
{
$this->app = $app;
$this->driver = $driver;
$this->exchange = $exchange;
}

public function routing(string $key): PublisherContract
{
$this->routing = $key;

return $this;
}

public function getRoute(): ?string
{
return $this->routing;
}

private function makeMessage(array $data, array $properties = [])
{
return new AMQPMessage(
json_encode($data),
$this->getMessageProps($properties)
);
}

private function getMessageProps(array $userProps): array
{
return array_merge([
'content_type' => 'application/json',
'content_encoding' => 'utf8',
'correlation_id' => Str::uuid(),
'expiration' => 60000000,
'app_id' => $this->app['config']['app_name'],
'application_headers' => new AMQPTable($this->getHeaders()),
], $userProps);
}

public function header(string $key, $value): PublisherContract
{
$this->headers = Arr::add($this->headers, $key, $value);

return $this;
}

public function getHeaders(): array
{
$configHeaders = Arr::dot($this->app['config']->get('pigeon.headers'));
$mapped = $this->mapToValues($configHeaders);

return array_merge($mapped, $this->headers);
}

protected function mapToValues(array $headers)
{
$result = [];
foreach ($headers as $key => $value) {
$result[$key] = is_callable($value) ? call_user_func($value) : $value;
}

return $result;
}
}
4 changes: 4 additions & 0 deletions src/Publisher/PublisherContract.php
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,13 @@ public function __construct(Application $app, DriverContract $driver, string $ex

public function routing(string $key): self;

public function getRoute(): ?string;

public function bind(string $queue): self;

public function publish(array $message, array $properties = []);

public function header(string $key, $value): self;

public function getHeaders(): array;
}
21 changes: 21 additions & 0 deletions src/Support/Testing/FakePublisher.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
<?php

namespace Convenia\Pigeon\Support\Testing;

use Convenia\Pigeon\Publisher\PublisherConcern;
use Convenia\Pigeon\Publisher\PublisherContract;

class FakePublisher implements PublisherContract
{
use PublisherConcern;

public function bind(string $queue): PublisherContract
{
throw new \Exception('Pigeon Fake does not support binding');
}

public function publish(array $message, array $properties = [], int $channelId = null)
{
throw new \Exception('Pigeon Fake does not support publishing');
}
}
Loading

0 comments on commit 7618ecf

Please sign in to comment.