Skip to content

Commit

Permalink
Add posibility to define options directly in DSN (#20)
Browse files Browse the repository at this point in the history
* Add posibility to define options directly in DSN

---------

Co-authored-by: Victor CATARAGA <[email protected]>
Co-authored-by: Marin Bînzari <[email protected]>
  • Loading branch information
3 people authored Jan 11, 2024
1 parent b78126e commit 88381a9
Show file tree
Hide file tree
Showing 3 changed files with 93 additions and 1 deletion.
6 changes: 6 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,12 @@ framework:
results_limit: <how_many_messages_to_read>
```
Options can be configured via the DSN or via the options key under the transport in `messenger.yaml`. Example:

```
MESSENGER_TRANSPORT_DSN=azurequeue://<account_name>:<account_key>@default?queue_name=<your_queue_name>&visibility_timeout=<visibility_timeout_in_seconds>&time_to_live=<time_to_live_in_seconds>&results_limit=<how_many_messages_to_read>
```
Don't forget to create the queue with the supplied name in Azure Queue Storage.
## Further reading
Expand Down
64 changes: 63 additions & 1 deletion src/Transport/QueueTransportFactory.php
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

namespace Abau\MessengerAzureQueueTransport\Transport;

use Symfony\Component\Messenger\Exception\InvalidArgumentException;
use Symfony\Component\Messenger\Transport\TransportFactoryInterface;
use Symfony\Component\Messenger\Transport\Serialization\SerializerInterface;
use Symfony\Component\Messenger\Transport\TransportInterface;
Expand All @@ -11,12 +12,23 @@
*/
class QueueTransportFactory implements TransportFactoryInterface
{
private const OPTIONS_TYPE_MAPPING = [
'queue_name' => 'string',
'visibility_timeout' => 'integer',
'time_to_live' => 'integer',
'results_limit' => 'integer',
];

/**
* @inheritDoc
*/
public function createTransport(string $dsn, array $options, SerializerInterface $serializer): TransportInterface
{
return new QueueTransport($dsn, $options, $serializer);
return new QueueTransport(
$dsn,
$this->computeOptions($dsn, $options),
$serializer
);
}

/**
Expand All @@ -26,4 +38,54 @@ public function supports(string $dsn, array $options): bool
{
return 0 === strpos($dsn, 'azurequeue://');
}

/**
* @param string $dsn
* @param array $options
*
* @return array
*/
private function computeOptions(string $dsn, array $options): array
{
$resultOptions = $options;

if (false === $parsedUrl = parse_url($dsn)) {
throw new InvalidArgumentException('The given DSN is invalid.');
}

if (isset($parsedUrl['query'])) {
parse_str($parsedUrl['query'], $dsnOptions);
$resultOptions = array_merge($options, $dsnOptions);
}

return $this->castOptionsToTheRightType($resultOptions);
}

/**
* @param array $options
*
* @return array
*/
private function castOptionsToTheRightType(array $options): array
{
foreach ($options as $key => $value) {
if (!isset(self::OPTIONS_TYPE_MAPPING[$key])) {
continue;
}

switch (self::OPTIONS_TYPE_MAPPING[$key]) {
case 'integer':
$options[$key] = filter_var($value, \FILTER_VALIDATE_INT);
break;
case 'string':
$options[$key] = (string)$value;
break;
default:
$options[$key] = $value;
break;
}
}

return $options;
}
}
24 changes: 24 additions & 0 deletions tests/QueueTransportFactoryTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -40,4 +40,28 @@ public function testCanCreateTransport()
{
$this->assertInstanceOf(QueueTransport::class, self::$factory->createTransport('', [], $this->serializer));
}

public function testCanDefineOptionsInDsn(): void
{
$options = [
'queue_name' => 'queue_from_options',
'visibility_timeout' => 0,
'time_to_live' => 0,
'results_limit' => 0,
];

$dsn = 'azurequeue://username:password@default?queue_name=queue_from_dsn&visibility_timeout=1&time_to_live=1&results_limit=1';

$transport = self::$factory->createTransport($dsn, $options, $this->serializer);

$reflectionClass = new ReflectionClass($transport);
$reflectionProperty = $reflectionClass->getProperty('options');
$reflectionProperty->setAccessible(true);
$transportOptions = $reflectionProperty->getValue($transport);

$this->assertSame('queue_from_dsn', $transportOptions['queue_name']);
$this->assertSame(1, $transportOptions['visibility_timeout']);
$this->assertSame(1, $transportOptions['time_to_live']);
$this->assertSame(1, $transportOptions['results_limit']);
}
}

0 comments on commit 88381a9

Please sign in to comment.