Skip to content

Commit c52dc68

Browse files
authored
Merge pull request #13 from driftphp/feature/command-bus-distribution
Added alternative command_bus distribution
2 parents 1536a35 + 9141896 commit c52dc68

File tree

8 files changed

+174
-7
lines changed

8 files changed

+174
-7
lines changed

Bus/Bus.php

Lines changed: 62 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -17,32 +17,57 @@
1717

1818
use Drift\CommandBus\Exception\InvalidCommandException;
1919
use Drift\CommandBus\Middleware\DebugableMiddleware;
20-
use Drift\CommandBus\Middleware\Middleware;
20+
use React\EventLoop\LoopInterface;
21+
use React\Promise\Deferred;
2122

2223
/**
2324
* Interface Bus.
2425
*/
2526
abstract class Bus
2627
{
28+
/**
29+
* @var string
30+
*/
31+
const DISTRIBUTION_INLINE = 'inline';
32+
33+
/**
34+
* @var string
35+
*/
36+
const DISTRIBUTION_NEXT_TICK = 'next_tick';
37+
2738
/**
2839
* @var callable
2940
*/
3041
private $middlewareChain;
3142

43+
/**
44+
* @var LoopInterface
45+
*/
46+
private $loop;
47+
3248
/**
3349
* @var array
3450
*/
3551
private $middleware;
3652

3753
/**
38-
* @param array $middleware
54+
* @param LoopInterface $loop
55+
* @param array $middleware
56+
* @param string $distribution
3957
*/
40-
public function __construct(array $middleware)
41-
{
58+
public function __construct(
59+
LoopInterface $loop,
60+
array $middleware,
61+
string $distribution
62+
) {
63+
$this->loop = $loop;
4264
$this->middleware = array_map(function (DebugableMiddleware $middleware) {
4365
return $middleware->getMiddlewareInfo();
4466
}, $middleware);
45-
$this->middlewareChain = $this->createExecutionChain($middleware);
67+
68+
$this->middlewareChain = self::DISTRIBUTION_NEXT_TICK === $distribution
69+
? $this->createNextTickExecutionChain($middleware)
70+
: $this->createInlineExecutionChain($middleware);
4671
}
4772

4873
/**
@@ -64,13 +89,13 @@ protected function handle($command)
6489
}
6590

6691
/**
67-
* Create execution chain.
92+
* Create inline execution chain.
6893
*
6994
* @param array $middlewareList
7095
*
7196
* @return callable
7297
*/
73-
private function createExecutionChain($middlewareList)
98+
private function createInlineExecutionChain($middlewareList)
7499
{
75100
$lastCallable = function () {};
76101

@@ -83,6 +108,36 @@ private function createExecutionChain($middlewareList)
83108
return $lastCallable;
84109
}
85110

111+
/**
112+
* Create next tick execution chain.
113+
*
114+
* @param array $middlewareList
115+
*
116+
* @return callable
117+
*/
118+
private function createNextTickExecutionChain($middlewareList)
119+
{
120+
$lastCallable = function () {};
121+
122+
while ($middleware = array_pop($middlewareList)) {
123+
$lastCallable = function ($command) use ($middleware, $lastCallable) {
124+
$deferred = new Deferred();
125+
$this
126+
->loop
127+
->futureTick(function () use ($deferred, $middleware, $command, $lastCallable) {
128+
$deferred->resolve($middleware->execute(
129+
$command,
130+
$lastCallable
131+
));
132+
});
133+
134+
return $deferred->promise();
135+
};
136+
}
137+
138+
return $lastCallable;
139+
}
140+
86141
/**
87142
* Get middleware list.
88143
*

DependencyInjection/CommandBusConfiguration.php

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
namespace Drift\CommandBus\DependencyInjection;
1717

18+
use Drift\CommandBus\Bus\Bus;
1819
use Mmoreram\BaseBundle\DependencyInjection\BaseConfiguration;
1920
use Symfony\Component\Config\Definition\Builder\ArrayNodeDefinition;
2021

@@ -36,6 +37,10 @@ protected function setupTree(ArrayNodeDefinition $rootNode)
3637
->arrayNode('query_bus')
3738
->addDefaultsIfNotSet()
3839
->children()
40+
->enumNode('distribution')
41+
->values([Bus::DISTRIBUTION_INLINE, Bus::DISTRIBUTION_NEXT_TICK])
42+
->defaultValue(Bus::DISTRIBUTION_INLINE)
43+
->end()
3944
->arrayNode('middlewares')
4045
->scalarPrototype()
4146
->defaultValue([])
@@ -47,6 +52,10 @@ protected function setupTree(ArrayNodeDefinition $rootNode)
4752
->arrayNode('command_bus')
4853
->addDefaultsIfNotSet()
4954
->children()
55+
->enumNode('distribution')
56+
->values([Bus::DISTRIBUTION_INLINE, Bus::DISTRIBUTION_NEXT_TICK])
57+
->defaultValue(Bus::DISTRIBUTION_INLINE)
58+
->end()
5059
->arrayNode('middlewares')
5160
->scalarPrototype()
5261
->defaultValue([])

DependencyInjection/CommandBusExtension.php

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -65,7 +65,9 @@ protected function getParametrizationValues(array $config): array
6565
{
6666
return [
6767
'bus.query_bus.middlewares' => $config['query_bus']['middlewares'],
68+
'bus.query_bus.distribution' => $config['query_bus']['distribution'],
6869
'bus.command_bus.middlewares' => $config['command_bus']['middlewares'],
70+
'bus.command_bus.distribution' => $config['command_bus']['distribution'],
6971
'bus.command_bus.async_adapter' => $config['command_bus']['async_adapter'] ?? false,
7072
];
7173
}

DependencyInjection/CompilerPass/BusCompilerPass.php

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828
use Drift\CommandBus\Middleware\AsyncMiddleware;
2929
use Drift\CommandBus\Middleware\HandlerMiddleware;
3030
use Drift\CommandBus\Middleware\Middleware;
31+
use React\EventLoop\LoopInterface;
3132
use ReflectionClass;
3233
use ReflectionException;
3334
use Symfony\Component\DependencyInjection\Compiler\CompilerPassInterface;
@@ -172,11 +173,13 @@ private function createQueryBus(ContainerBuilder $container)
172173
{
173174
$container->setDefinition('drift.query_bus', (new Definition(
174175
QueryBus::class, [
176+
new Reference(LoopInterface::class),
175177
$this->createMiddlewaresArray(
176178
$container,
177179
'query',
178180
false
179181
),
182+
$container->getParameter('bus.query_bus.distribution'),
180183
]
181184
))
182185
->addTag('preload')
@@ -198,11 +201,13 @@ private function createCommandBus(
198201
) {
199202
$container->setDefinition('drift.command_bus', (new Definition(
200203
CommandBus::class, [
204+
new Reference(LoopInterface::class),
201205
$this->createMiddlewaresArray(
202206
$container,
203207
'command',
204208
$asyncBus
205209
),
210+
$container->getParameter('bus.command_bus.distribution'),
206211
]
207212
))
208213
->addTag('preload')
@@ -221,11 +226,13 @@ private function createInlineCommandBus(ContainerBuilder $container)
221226
{
222227
$container->setDefinition('drift.inline_command_bus', (new Definition(
223228
InlineCommandBus::class, [
229+
new Reference(LoopInterface::class),
224230
$this->createMiddlewaresArray(
225231
$container,
226232
'command',
227233
false
228234
),
235+
$container->getParameter('bus.command_bus.distribution'),
229236
]
230237
))
231238
->addTag('preload')

Tests/Bus/CommandHandlerTest.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
namespace Drift\CommandBus\Tests\Bus;
1717

18+
use Drift\CommandBus\Bus\Bus;
1819
use Drift\CommandBus\Middleware\HandlerMiddleware;
1920
use Drift\CommandBus\Tests\BusFunctionalTest;
2021
use Drift\CommandBus\Tests\Command\ChangeAThing;
@@ -44,9 +45,23 @@ protected static function decorateConfiguration(array $configuration): array
4445
],
4546
];
4647

48+
$configuration['command_bus']['command_bus']['distribution'] = self::distributedBus()
49+
? Bus::DISTRIBUTION_NEXT_TICK
50+
: Bus::DISTRIBUTION_INLINE;
51+
4752
return $configuration;
4853
}
4954

55+
/**
56+
* Create distributed bus.
57+
*
58+
* @return bool
59+
*/
60+
protected static function distributedBus(): bool
61+
{
62+
return false;
63+
}
64+
5065
/**
5166
* Test buses are being built.
5267
*/
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the DriftPHP Project
5+
*
6+
* For the full copyright and license information, please view the LICENSE
7+
* file that was distributed with this source code.
8+
*
9+
* Feel free to edit as you please, and have fun.
10+
*
11+
* @author Marc Morera <[email protected]>
12+
*/
13+
14+
declare(strict_types=1);
15+
16+
namespace Drift\CommandBus\Tests\Bus;
17+
18+
/**
19+
* Class DistributedCommandHandlerTest.
20+
*/
21+
class DistributedCommandHandlerTest extends CommandHandlerTest
22+
{
23+
/**
24+
* Create distributed bus.
25+
*
26+
* @return bool
27+
*/
28+
protected static function distributedBus(): bool
29+
{
30+
return true;
31+
}
32+
}
Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
<?php
2+
3+
/*
4+
* This file is part of the DriftPHP Project
5+
*
6+
* For the full copyright and license information, please view the LICENSE
7+
* file that was distributed with this source code.
8+
*
9+
* Feel free to edit as you please, and have fun.
10+
*
11+
* @author Marc Morera <[email protected]>
12+
*/
13+
14+
declare(strict_types=1);
15+
16+
namespace Drift\CommandBus\Tests\Bus;
17+
18+
/**
19+
* Class DistributedQueryHandlerTest.
20+
*/
21+
class DistributedQueryHandlerTest extends QueryHandlerTest
22+
{
23+
/**
24+
* Create distributed bus.
25+
*
26+
* @return bool
27+
*/
28+
protected static function distributedBus(): bool
29+
{
30+
return true;
31+
}
32+
}

Tests/Bus/QueryHandlerTest.php

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,6 +15,7 @@
1515

1616
namespace Drift\CommandBus\Tests\Bus;
1717

18+
use Drift\CommandBus\Bus\Bus;
1819
use Drift\CommandBus\Exception\InvalidCommandException;
1920
use Drift\CommandBus\Tests\BusFunctionalTest;
2021
use Drift\CommandBus\Tests\Context;
@@ -44,9 +45,23 @@ protected static function decorateConfiguration(array $configuration): array
4445
],
4546
];
4647

48+
$configuration['command_bus']['query_bus']['distribution'] = self::distributedBus()
49+
? Bus::DISTRIBUTION_NEXT_TICK
50+
: Bus::DISTRIBUTION_INLINE;
51+
4752
return $configuration;
4853
}
4954

55+
/**
56+
* Create distributed bus.
57+
*
58+
* @return bool
59+
*/
60+
protected static function distributedBus(): bool
61+
{
62+
return false;
63+
}
64+
5065
/**
5166
* Test buses are being built.
5267
*/

0 commit comments

Comments
 (0)