1515
1616namespace Drift \CommandBus \DependencyInjection \CompilerPass ;
1717
18+ use Drift \AMQP \DependencyInjection \CompilerPass \AMQPCompilerPass ;
1819use Drift \CommandBus \Async \AMQPAdapter ;
1920use Drift \CommandBus \Async \AsyncAdapter ;
2021use Drift \CommandBus \Async \InMemoryAdapter ;
3233use Drift \CommandBus \Middleware \AsyncMiddleware ;
3334use Drift \CommandBus \Middleware \HandlerMiddleware ;
3435use Drift \CommandBus \Middleware \Middleware ;
36+ use Drift \Postgresql \DependencyInjection \CompilerPass \PostgresqlCompilerPass ;
37+ use Drift \Redis \DependencyInjection \CompilerPass \RedisCompilerPass ;
38+ use Exception ;
3539use React \EventLoop \LoopInterface ;
3640use ReflectionClass ;
3741use ReflectionException ;
@@ -105,7 +109,7 @@ public function createAsyncMiddleware(ContainerBuilder $container): bool
105109 $ this ->createPostgreSQLAsyncAdapter ($ container , $ adapter );
106110 break ;
107111 default :
108- return false ;
112+ throw new Exception ( ' Wrong adapter ' ) ;
109113 }
110114
111115 $ container ->setDefinition (AsyncMiddleware::class,
@@ -474,11 +478,14 @@ private function createRedisAsyncAdapter(
474478 ContainerBuilder $ container ,
475479 array $ adapter
476480 ) {
481+ $ adapter ['preload ' ] = true ;
482+ RedisCompilerPass::createClient ($ container , 'command_bus ' , $ adapter );
483+
477484 $ container ->setDefinition (
478485 AsyncAdapter::class,
479486 (
480487 new Definition (RedisAdapter::class, [
481- new Reference ('redis. ' . $ adapter [ ' client ' ]. ' _client ' ),
488+ new Reference ('redis.command_bus_client ' ),
482489 new Reference ('reactphp.event_loop ' ),
483490 $ adapter ['key ' ] ?? 'commands ' ,
484491 ])
@@ -496,13 +503,18 @@ private function createPostgreSQLAsyncAdapter(
496503 ContainerBuilder $ container ,
497504 array $ adapter
498505 ) {
506+ $ channel = $ adapter ['channel ' ] ?? 'commands ' ;
507+ unset($ adapter ['channel ' ]);
508+
509+ PostgresqlCompilerPass::createclient ($ container , 'command_bus ' , $ adapter );
510+
499511 $ container ->setDefinition (
500512 AsyncAdapter::class,
501513 (
502514 new Definition (PostgreSQLAdapter::class, [
503- new Reference ('postgresql. ' . $ adapter [ ' client ' ]. ' _client ' ),
515+ new Reference ('postgresql.command_bus_client ' ),
504516 new Reference ('reactphp.event_loop ' ),
505- $ adapter [ ' channel ' ] ?? ' commands ' ,
517+ $ channel ,
506518 ])
507519 )->setLazy (true )
508520 );
@@ -518,11 +530,14 @@ private function createAMQPAsyncAdapter(
518530 ContainerBuilder $ container ,
519531 array $ adapter
520532 ) {
533+ $ adapter ['preload ' ] = true ;
534+ AMQPCompilerPass::registerClient ($ container , 'command_bus ' , $ adapter );
535+
521536 $ container ->setDefinition (
522537 AsyncAdapter::class,
523538 (
524539 new Definition (AMQPAdapter::class, [
525- new Reference ('amqp. ' . $ adapter [ ' client ' ]. ' _channel ' ),
540+ new Reference ('amqp.command_bus_channel ' ),
526541 new Reference ('reactphp.event_loop ' ),
527542 $ adapter ['queue ' ] ?? 'commands ' ,
528543 ])
0 commit comments