Skip to content

Commit

Permalink
Merge pull request #1 from williamespindola/feature/silex-updrage-com…
Browse files Browse the repository at this point in the history
…patibility

Silex updrage compatibility
  • Loading branch information
williamespindola authored Nov 22, 2016
2 parents b15230e + 9eb92fa commit 9518896
Show file tree
Hide file tree
Showing 3 changed files with 122 additions and 110 deletions.
4 changes: 2 additions & 2 deletions composer.json
Original file line number Diff line number Diff line change
Expand Up @@ -14,9 +14,9 @@
],
"require": {
"php": ">=5.5.0",
"silex/silex": "~1.3",
"silex/silex": "~2.0",
"php-amqplib/rabbitmq-bundle": "~1.9",
"ivoba/console-service-provider": "~2.0"
"ivoba/console-service-provider": "~3.0"
},
"require-dev": {
"phpunit/phpunit": "~4.8"
Expand Down
158 changes: 77 additions & 81 deletions src/fiunchinho/Silex/Provider/RabbitServiceProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,109 +10,105 @@
use OldSound\RabbitMqBundle\RabbitMq\RpcServer;
use PhpAmqpLib\Connection\AMQPConnection;
use PhpAmqpLib\Connection\AMQPLazyConnection;
use Silex\Application;
use Silex\ServiceProviderInterface;
use Pimple\Container;
use Pimple\ServiceProviderInterface;

class RabbitServiceProvider implements ServiceProviderInterface
{
const DEFAULT_CONNECTION = 'default';

public function register(Application $app)
{
$this->loadConnections($app);
$this->loadProducers($app);
$this->loadConsumers($app);
$this->loadAnonymousConsumers($app);
$this->loadMultipleConsumers($app);
$this->loadRpcClients($app);
$this->loadRpcServers($app);
}

public function boot(Application $app)
public function register(Container $container)
{
$this->loadConnections($container);
$this->loadProducers($container);
$this->loadConsumers($container);
$this->loadAnonymousConsumers($container);
$this->loadMultipleConsumers($container);
$this->loadRpcClients($container);
$this->loadRpcServers($container);
}

/**
* @param Application $app
* @param Container $container
* @throws \InvalidArgumentException
*/
private function loadConnections(Application $app)
private function loadConnections(Container $container)
{
$app['rabbit.connection'] = $app->share(function ($app) {
if (!isset($app['rabbit.connections'])) {
$container['rabbit.connection'] = function ($container) {
if (!isset($container['rabbit.connections'])) {
throw new \InvalidArgumentException('You need to specify at least a connection in your configuration.');
}

$connections = [];

foreach ($app['rabbit.connections'] as $name => $options) {
foreach ($container['rabbit.connections'] as $name => $options) {
$lazyConnection = false;

if (isset($app['rabbit.connections'][$name]['lazy'])) {
if ($app['rabbit.connections'][$name]['lazy'] === true) {
if (isset($container['rabbit.connections'][$name]['lazy'])) {
if ($container['rabbit.connections'][$name]['lazy'] === true) {
$lazyConnection = true;
}
}

switch ($lazyConnection) {
case (true):
$connection = new AMQPLazyConnection(
$app['rabbit.connections'][$name]['host'],
$app['rabbit.connections'][$name]['port'],
$app['rabbit.connections'][$name]['user'],
$app['rabbit.connections'][$name]['password'],
$app['rabbit.connections'][$name]['vhost']
$container['rabbit.connections'][$name]['host'],
$container['rabbit.connections'][$name]['port'],
$container['rabbit.connections'][$name]['user'],
$container['rabbit.connections'][$name]['password'],
$container['rabbit.connections'][$name]['vhost']
);
break;
default:
$connection = new AMQPConnection(
$app['rabbit.connections'][$name]['host'],
$app['rabbit.connections'][$name]['port'],
$app['rabbit.connections'][$name]['user'],
$app['rabbit.connections'][$name]['password'],
$app['rabbit.connections'][$name]['vhost']
$container['rabbit.connections'][$name]['host'],
$container['rabbit.connections'][$name]['port'],
$container['rabbit.connections'][$name]['user'],
$container['rabbit.connections'][$name]['password'],
$container['rabbit.connections'][$name]['vhost']
);
}

$connections[$name] = $connection;
}

return $connections;
});
};
}

/**
* @param Application $app
* @param Container $container
* @param array $options
* @param array $connections
* @return AMQPLazyConnection|AMQPConnection
* @throws \InvalidArgumentException
*/
private function getConnection(Application $app, $options, $connections)
private function getConnection(Container $container, $options, $connections)
{
$connection_name = $options['connection']?: self::DEFAULT_CONNECTION;

if (!isset($connections[$connection_name])) {
throw new \InvalidArgumentException('Configuration for connection [' . $connection_name . '] not found');
}

return $app['rabbit.connection'][$connection_name];
return $container['rabbit.connection'][$connection_name];
}

/**
* @param Application $app
* @param Container $container
*/
private function loadProducers(Application $app)
private function loadProducers(Container $container)
{
$app['rabbit.producer'] = $app->share(function ($app) {
if (!isset($app['rabbit.producers'])) {
$container['rabbit.producer'] = function ($container) {
if (!isset($container['rabbit.producers'])) {
return null;
}

$producers = [];

foreach ($app['rabbit.producers'] as $name => $options) {
$connection = $this->getConnection($app, $options, $app['rabbit.connections']);
foreach ($container['rabbit.producers'] as $name => $options) {
$connection = $this->getConnection($container, $options, $container['rabbit.connections']);

$producer = new Producer($connection);
$producer->setExchangeOptions($options['exchange_options']);
Expand All @@ -131,27 +127,27 @@ private function loadProducers(Application $app)
}

return $producers;
});
};
}

/**
* @param Application $app
* @param Container $container
*/
private function loadConsumers(Application $app)
private function loadConsumers(Container $container)
{
$app['rabbit.consumer'] = $app->share(function ($app) {
if (!isset($app['rabbit.consumers'])) {
$container['rabbit.consumer'] = function ($container) {
if (!isset($container['rabbit.consumers'])) {
return null;
}

$consumers = [];

foreach ($app['rabbit.consumers'] as $name => $options) {
$connection = $this->getConnection($app, $options, $app['rabbit.connections']);
foreach ($container['rabbit.consumers'] as $name => $options) {
$connection = $this->getConnection($container, $options, $container['rabbit.connections']);
$consumer = new Consumer($connection);
$consumer->setExchangeOptions($options['exchange_options']);
$consumer->setQueueOptions($options['queue_options']);
$consumer->setCallback(array($app[$options['callback']], 'execute'));
$consumer->setCallback(array($container[$options['callback']], 'execute'));

if (array_key_exists('qos_options', $options)) {
$consumer->setQosOptions(
Expand All @@ -173,54 +169,54 @@ private function loadConsumers(Application $app)
}

return $consumers;
});
};
}

/**
* @param Application $app
* @param Container $container
*/
private function loadAnonymousConsumers(Application $app)
private function loadAnonymousConsumers(Container $container)
{
$app['rabbit.anonymous_consumer'] = $app->share(function ($app) {
if (!isset($app['rabbit.anon_consumers'])) {
$container['rabbit.anonymous_consumer'] = function ($container) {
if (!isset($container['rabbit.anon_consumers'])) {
return null;
}

$consumers = [];

foreach ($app['rabbit.anon_consumers'] as $name => $options) {
$connection = $this->getConnection($app, $options, $app['rabbit.connections']);
foreach ($container['rabbit.anon_consumers'] as $name => $options) {
$connection = $this->getConnection($container, $options, $container['rabbit.connections']);
$consumer = new AnonConsumer($connection);
$consumer->setExchangeOptions($options['exchange_options']);
$consumer->setCallback(array($app[$options['callback']], 'execute'));
$consumer->setCallback(array($container[$options['callback']], 'execute'));

$consumers[$name] = $consumer;
}

return $consumers;
});
};
}

/**
* @param Application $app
* @param Container $container
*/
private function loadMultipleConsumers(Application $app)
private function loadMultipleConsumers(Container $container)
{
$app['rabbit.multiple_consumer'] = $app->share(function ($app) {
if (!isset($app['rabbit.multiple_consumers'])) {
$container['rabbit.multiple_consumer'] = function ($container) {
if (!isset($container['rabbit.multiple_consumers'])) {
return null;
}

$consumers = [];

foreach ($app['rabbit.multiple_consumers'] as $name => $options) {
$connection = $this->getConnection($app, $options, $app['rabbit.connections']);
foreach ($container['rabbit.multiple_consumers'] as $name => $options) {
$connection = $this->getConnection($container, $options, $container['rabbit.connections']);
$consumer = new MultipleConsumer($connection);
$consumer->setExchangeOptions($options['exchange_options']);

foreach ($options['queues'] as &$queue) {
if (isset($queue['callback'])) {
$queue['callback'] = array($app[$queue['callback']], 'execute');
$queue['callback'] = array($container[$queue['callback']], 'execute');
}
}

Expand All @@ -246,24 +242,24 @@ private function loadMultipleConsumers(Application $app)
}

return $consumers;
});
};

}

/**
* @param Application $app
* @param Container $container
*/
private function loadRpcClients(Application $app)
private function loadRpcClients(Container $container)
{
$app['rabbit.rpc_client'] = $app->share(function ($app) {
if (!isset($app['rabbit.rpc_clients'])) {
$container['rabbit.rpc_client'] = function ($container) {
if (!isset($container['rabbit.rpc_clients'])) {
return null;
}

$clients = [];

foreach ($app['rabbit.rpc_clients'] as $name => $options) {
$connection = $this->getConnection($app, $options, $app['rabbit.connections']);
foreach ($container['rabbit.rpc_clients'] as $name => $options) {
$connection = $this->getConnection($container, $options, $container['rabbit.connections']);
$client = new RpcClient($connection);

if (array_key_exists('expect_serialized_response', $options)) {
Expand All @@ -274,26 +270,26 @@ private function loadRpcClients(Application $app)
}

return $clients;
});
};
}

/**
* @param Application $app
* @param Container $container
*/
private function loadRpcServers(Application $app)
private function loadRpcServers(Container $container)
{
$app['rabbit.rpc_server'] = $app->share(function ($app) {
if (!isset($app['rabbit.rpc_servers'])) {
$container['rabbit.rpc_server'] = function ($container) {
if (!isset($container['rabbit.rpc_servers'])) {
return null;
}

$servers = [];

foreach ($app['rabbit.rpc_servers'] as $name => $options) {
$connection = $this->getConnection($app, $options, $app['rabbit.connections']);
foreach ($container['rabbit.rpc_servers'] as $name => $options) {
$connection = $this->getConnection($container, $options, $container['rabbit.connections']);
$server = new RpcServer($connection);
$server->initServer($name);
$server->setCallback(array($app[$options['callback']], 'execute'));
$server->setCallback(array($container[$options['callback']], 'execute'));

if (array_key_exists('qos_options', $options)) {
$server->setQosOptions(
Expand All @@ -307,7 +303,7 @@ private function loadRpcServers(Application $app)
}

return $servers;
});
};

}
}
Loading

0 comments on commit 9518896

Please sign in to comment.