From ae92786a1be47f57455a8190b51a1a0ac9f334dc Mon Sep 17 00:00:00 2001 From: prolic Date: Mon, 20 Apr 2020 22:24:57 -0400 Subject: [PATCH 1/8] refactor to use amphp and event-store v8 --- .travis.yml | 10 +- composer.json | 21 +- .../Infrastructure/InMemoryEmailGuard.php | 7 +- .../UserAggregateDefinition.php | 36 -- examples/Infrastructure/UserSpecification.php | 70 +++ examples/Infrastructure/factories.php | 61 -- examples/Model/Command/ChangeUserName.php | 32 +- examples/Model/Command/InvalidCommand.php | 28 - examples/Model/Command/RegisterUser.php | 32 +- examples/Model/Command/UnknownCommand.php | 20 +- examples/Model/Event/UserNameChanged.php | 44 ++ examples/Model/Event/UserNameWasChanged.php | 32 -- examples/Model/Event/UserRegistered.php | 49 ++ .../UserRegisteredWithDuplicateEmail.php | 49 ++ examples/Model/Event/UserWasRegistered.php | 37 -- .../UserWasRegisteredWithDuplicateEmail.php | 37 -- examples/Model/UniqueEmailGuard.php | 5 +- examples/Model/User.php | 57 +- examples/register_and_change_username.php | 97 ++-- examples/user_snapshotter.php | 45 -- src/AbstractAggregateDefinition.php | 127 ----- src/AggregateDefinition.php | 52 -- src/CommandSpecification.php | 68 +++ src/Kernel.php | 285 ++-------- src/SnapshotReadModel.php | 114 ---- tests/AbstractAggregateDefinitionTest.php | 187 ------- tests/KernelTest.php | 528 ------------------ tests/SnapshotReadModelTest.php | 156 ------ ...eamPerAggregateTestAggregateDefinition.php | 83 --- .../SingleStreamTestAggregateDefinition.php | 83 --- .../SingleStreamTestAggregateDefinition2.php | 83 --- tests/TestAsset/TestDomainEvent.php | 47 -- tests/TestAsset/TestState.php | 21 - 33 files changed, 492 insertions(+), 2111 deletions(-) delete mode 100644 examples/Infrastructure/UserAggregateDefinition.php create mode 100644 examples/Infrastructure/UserSpecification.php delete mode 100644 examples/Infrastructure/factories.php delete mode 100644 examples/Model/Command/InvalidCommand.php create mode 100644 examples/Model/Event/UserNameChanged.php delete mode 100644 examples/Model/Event/UserNameWasChanged.php create mode 100644 examples/Model/Event/UserRegistered.php create mode 100644 examples/Model/Event/UserRegisteredWithDuplicateEmail.php delete mode 100644 examples/Model/Event/UserWasRegistered.php delete mode 100644 examples/Model/Event/UserWasRegisteredWithDuplicateEmail.php delete mode 100644 examples/user_snapshotter.php delete mode 100644 src/AbstractAggregateDefinition.php delete mode 100644 src/AggregateDefinition.php create mode 100644 src/CommandSpecification.php delete mode 100644 src/SnapshotReadModel.php delete mode 100644 tests/AbstractAggregateDefinitionTest.php delete mode 100644 tests/KernelTest.php delete mode 100644 tests/SnapshotReadModelTest.php delete mode 100644 tests/TestAsset/OneStreamPerAggregateTestAggregateDefinition.php delete mode 100644 tests/TestAsset/SingleStreamTestAggregateDefinition.php delete mode 100644 tests/TestAsset/SingleStreamTestAggregateDefinition2.php delete mode 100644 tests/TestAsset/TestDomainEvent.php delete mode 100644 tests/TestAsset/TestState.php diff --git a/.travis.yml b/.travis.yml index 4784e08..d2ba29f 100644 --- a/.travis.yml +++ b/.travis.yml @@ -3,18 +3,12 @@ language: php matrix: fast_finish: true include: - - php: 7.1 + - php: 7.4 env: - DEPENDENCIES="" - EXECUTE_CS_CHECK=true - TEST_COVERAGE=true - - php: 7.1 - env: - - DEPENDENCIES="--prefer-lowest --prefer-stable" - - php: 7.2 - env: - - DEPENDENCIES="" - - php: 7.2 + - php: 7.4 env: - DEPENDENCIES="--prefer-lowest --prefer-stable" diff --git a/composer.json b/composer.json index 38ab9bc..66162a7 100755 --- a/composer.json +++ b/composer.json @@ -22,23 +22,20 @@ "prooph" ], "require": { - "php": "^7.1", - "prooph/event-store": "^7.3.1", - "prooph/pdo-event-store": "^1.6", - "prooph/snapshot-store": "^1.2", - "phunkie/phunkie": "0.10.1" + "php": "^7.4", + "amphp/amp": "^2.4.3", + "prooph/event-store": "dev-master", + "phunkie/phunkie": "0.11.1" }, "require-dev": { - "phpunit/phpunit": "^6.0", - "phpspec/prophecy": "^1.7", - "prooph/php-cs-fixer-config": "^0.2.1", + "phpspec/prophecy": "^1.10.3", + "phpspec/prophecy-phpunit": "^2.0", + "phpunit/phpunit": "^9.1", + "prooph/event-store-client": "dev-master", + "prooph/php-cs-fixer-config": "^0.3.1", "php-coveralls/php-coveralls": "^2.1", "malukenho/docheader": "^0.1.4" }, - "suggest": { - "prooph/pdo-snapshot-store": "For PDO as snapshot store", - "prooph/mongodb-snapshot-store": "For MongoDB as snapshot store" - }, "autoload": { "psr-4": { "Prooph\\Micro\\": "src/" diff --git a/examples/Infrastructure/InMemoryEmailGuard.php b/examples/Infrastructure/InMemoryEmailGuard.php index 6614af1..a1f4d87 100644 --- a/examples/Infrastructure/InMemoryEmailGuard.php +++ b/examples/Infrastructure/InMemoryEmailGuard.php @@ -1,8 +1,9 @@ - * (c) 2017-2017 Sascha-Oliver Prolic + * (c) 2017-2020 prooph software GmbH + * (c) 2017-2020 Sascha-Oliver Prolic * * For the full copyright and license information, please view the LICENSE * file that was distributed with this source code. @@ -20,7 +21,7 @@ final class InMemoryEmailGuard implements UniqueEmailGuard public function isUnique(string $email): bool { - $isUnique = ! in_array($email, $this->knownEmails); + $isUnique = ! \in_array($email, $this->knownEmails); if ($isUnique) { $this->knownEmails[] = $email; diff --git a/examples/Infrastructure/UserAggregateDefinition.php b/examples/Infrastructure/UserAggregateDefinition.php deleted file mode 100644 index fc60915..0000000 --- a/examples/Infrastructure/UserAggregateDefinition.php +++ /dev/null @@ -1,36 +0,0 @@ - - * (c) 2017-2017 Sascha-Oliver Prolic - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -declare(strict_types=1); - -namespace Prooph\MicroExample\Infrastructure; - -use Prooph\Common\Messaging\Message; -use Prooph\EventStore\StreamName; -use Prooph\Micro\AbstractAggregateDefinition; -use Prooph\MicroExample\Model\User; - -final class UserAggregateDefinition extends AbstractAggregateDefinition -{ - public function aggregateType(): string - { - return 'user'; - } - - public function streamName(): StreamName - { - return new StreamName('user_stream'); - } - - public function apply($state, Message ...$events) - { - return User\apply($state, ...$events); - } -} diff --git a/examples/Infrastructure/UserSpecification.php b/examples/Infrastructure/UserSpecification.php new file mode 100644 index 0000000..520249d --- /dev/null +++ b/examples/Infrastructure/UserSpecification.php @@ -0,0 +1,70 @@ + + * (c) 2017-2020 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace Prooph\MicroExample\Infrastructure; + +use Phunkie\Types\ImmList; +use Prooph\EventStore\EventData; +use Prooph\EventStore\EventId; +use Prooph\EventStore\ResolvedEvent; +use Prooph\EventStore\Util\Json; +use Prooph\Micro\CommandSpecification; +use Prooph\MicroExample\Model\Event\UserNameChanged; +use Prooph\MicroExample\Model\Event\UserRegistered; +use Prooph\MicroExample\Model\Event\UserRegisteredWithDuplicateEmail; +use Prooph\MicroExample\Model\User; + +final class UserSpecification extends CommandSpecification +{ + public function mapToEventData(object $event): EventData + { + return new EventData( + EventId::generate(), + $event->messageName(), + true, + Json::encode($event->payload()), + Json::encode(['causation_name' => $this->command->messageName()]) + ); + } + + public function mapToEvent(ResolvedEvent $resolvedEvent): object + { + switch ($resolvedEvent->originalEvent()->eventType()) { + case 'username-changed': + return new UserNameChanged(Json::decode($resolvedEvent->originalEvent()->data())); + case 'user-registered': + return new UserRegistered(Json::decode($resolvedEvent->originalEvent()->data())); + case 'user-registered-with-duplicate-email': + return new UserRegisteredWithDuplicateEmail(Json::decode($resolvedEvent->originalEvent()->data())); + default: + throw new \UnexpectedValueException( + 'Unknown event type ' . $resolvedEvent->originalEvent()->eventType() . ' returned' + ); + } + } + + public function initialState() + { + return []; + } + + public function streamName(): string + { + return 'user-' . $this->command->payload()['id']; + } + + public function apply($state, ImmList $events) + { + return User\apply($state, $events); + } +} diff --git a/examples/Infrastructure/factories.php b/examples/Infrastructure/factories.php deleted file mode 100644 index bd3ae94..0000000 --- a/examples/Infrastructure/factories.php +++ /dev/null @@ -1,61 +0,0 @@ - - * (c) 2017-2017 Sascha-Oliver Prolic - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -declare(strict_types=1); - -use Prooph\EventStore\EventStore; -use Prooph\EventStore\InMemoryEventStore; -use Prooph\EventStore\Projection\InMemoryProjectionManager; -use Prooph\MicroExample\Infrastructure\InMemoryEmailGuard; -use Prooph\MicroExample\Model\UniqueEmailGuard; -use Prooph\SnapshotStore\InMemorySnapshotStore; -use Prooph\SnapshotStore\SnapshotStore; - -$factories = [ - 'eventStore' => function (): EventStore { - static $eventStore = null; - - if (null === $eventStore) { - $eventStore = new InMemoryEventStore(); - } - - return $eventStore; - }, - 'snapshotStore' => function (): SnapshotStore { - static $snapshotStore = null; - - if (null === $snapshotStore) { - $snapshotStore = new InMemorySnapshotStore(); - } - - return $snapshotStore; - }, - 'emailGuard' => function (): UniqueEmailGuard { - static $emailGuard = null; - - if (null === $emailGuard) { - $emailGuard = new InMemoryEmailGuard(); - } - - return $emailGuard; - }, -]; - -$factories['projectionManager'] = function () use (&$factories): InMemoryProjectionManager { - static $manager = null; - - if (null === $manager) { - $manager = new InMemoryProjectionManager($factories['eventStore']()); - } - - return $manager; -}; - -return $factories; diff --git a/examples/Model/Command/ChangeUserName.php b/examples/Model/Command/ChangeUserName.php index 5d5e20e..a88abfd 100644 --- a/examples/Model/Command/ChangeUserName.php +++ b/examples/Model/Command/ChangeUserName.php @@ -1,8 +1,9 @@ - * (c) 2017-2017 Sascha-Oliver Prolic + * (c) 2017-2020 prooph software GmbH + * (c) 2017-2020 Sascha-Oliver Prolic * * For the full copyright and license information, please view the LICENSE * file that was distributed with this source code. @@ -12,20 +13,31 @@ namespace Prooph\MicroExample\Model\Command; -use Prooph\Common\Messaging\Command; -use Prooph\Common\Messaging\PayloadConstructable; -use Prooph\Common\Messaging\PayloadTrait; - -final class ChangeUserName extends Command implements PayloadConstructable +final class ChangeUserName { - use PayloadTrait; + protected array $payload; + + public function __construct(array $payload = []) + { + $this->payload = $payload; + } + + public function messageName(): string + { + return 'change-username'; + } + + public function payload(): array + { + return $this->payload; + } - public function userId(): string + public function id(): string { return $this->payload()['id']; } - public function username(): string + public function name(): string { return $this->payload()['name']; } diff --git a/examples/Model/Command/InvalidCommand.php b/examples/Model/Command/InvalidCommand.php deleted file mode 100644 index fdfbf76..0000000 --- a/examples/Model/Command/InvalidCommand.php +++ /dev/null @@ -1,28 +0,0 @@ - - * (c) 2017-2017 Sascha-Oliver Prolic - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -declare(strict_types=1); - -namespace Prooph\MicroExample\Model\Command; - -use Prooph\Common\Messaging\Command; - -final class InvalidCommand extends Command -{ - protected function setPayload(array $payload): void - { - // do nothing - } - - public function payload(): array - { - return []; - } -} diff --git a/examples/Model/Command/RegisterUser.php b/examples/Model/Command/RegisterUser.php index 8be2154..01ac46d 100644 --- a/examples/Model/Command/RegisterUser.php +++ b/examples/Model/Command/RegisterUser.php @@ -1,8 +1,9 @@ - * (c) 2017-2017 Sascha-Oliver Prolic + * (c) 2017-2020 prooph software GmbH + * (c) 2017-2020 Sascha-Oliver Prolic * * For the full copyright and license information, please view the LICENSE * file that was distributed with this source code. @@ -12,20 +13,31 @@ namespace Prooph\MicroExample\Model\Command; -use Prooph\Common\Messaging\Command; -use Prooph\Common\Messaging\PayloadConstructable; -use Prooph\Common\Messaging\PayloadTrait; - -final class RegisterUser extends Command implements PayloadConstructable +final class RegisterUser { - use PayloadTrait; + protected array $payload; + + public function __construct(array $payload = []) + { + $this->payload = $payload; + } + + public function messageName(): string + { + return 'register-user'; + } + + public function payload(): array + { + return $this->payload; + } - public function userId(): string + public function id(): string { return $this->payload()['id']; } - public function userName(): string + public function name(): string { return $this->payload()['name']; } diff --git a/examples/Model/Command/UnknownCommand.php b/examples/Model/Command/UnknownCommand.php index d89d8ad..774101b 100644 --- a/examples/Model/Command/UnknownCommand.php +++ b/examples/Model/Command/UnknownCommand.php @@ -1,8 +1,9 @@ - * (c) 2017-2017 Sascha-Oliver Prolic + * (c) 2017-2020 prooph software GmbH + * (c) 2017-2020 Sascha-Oliver Prolic * * For the full copyright and license information, please view the LICENSE * file that was distributed with this source code. @@ -12,19 +13,6 @@ namespace Prooph\MicroExample\Model\Command; -use Prooph\Common\Messaging\Command; - -final class UnknownCommand extends Command +final class UnknownCommand { - protected $messageName = 'unknown command'; - - protected function setPayload(array $payload): void - { - // do nothing - } - - public function payload(): array - { - return []; - } } diff --git a/examples/Model/Event/UserNameChanged.php b/examples/Model/Event/UserNameChanged.php new file mode 100644 index 0000000..e7a53e9 --- /dev/null +++ b/examples/Model/Event/UserNameChanged.php @@ -0,0 +1,44 @@ + + * (c) 2017-2020 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace Prooph\MicroExample\Model\Event; + +final class UserNameChanged +{ + protected array $payload; + + public function __construct(array $payload = []) + { + $this->payload = $payload; + } + + public function messageName(): string + { + return 'user-name-changed'; + } + + public function payload(): array + { + return $this->payload; + } + + public function id(): string + { + return $this->payload()['id']; + } + + public function name(): string + { + return $this->payload()['name']; + } +} diff --git a/examples/Model/Event/UserNameWasChanged.php b/examples/Model/Event/UserNameWasChanged.php deleted file mode 100644 index b7bfd13..0000000 --- a/examples/Model/Event/UserNameWasChanged.php +++ /dev/null @@ -1,32 +0,0 @@ - - * (c) 2017-2017 Sascha-Oliver Prolic - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -declare(strict_types=1); - -namespace Prooph\MicroExample\Model\Event; - -use Prooph\Common\Messaging\DomainEvent; -use Prooph\Common\Messaging\PayloadConstructable; -use Prooph\Common\Messaging\PayloadTrait; - -final class UserNameWasChanged extends DomainEvent implements PayloadConstructable -{ - use PayloadTrait; - - public function userId(): string - { - return $this->payload()['id']; - } - - public function username(): string - { - return $this->payload()['name']; - } -} diff --git a/examples/Model/Event/UserRegistered.php b/examples/Model/Event/UserRegistered.php new file mode 100644 index 0000000..5d47540 --- /dev/null +++ b/examples/Model/Event/UserRegistered.php @@ -0,0 +1,49 @@ + + * (c) 2017-2020 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace Prooph\MicroExample\Model\Event; + +final class UserRegistered +{ + protected array $payload; + + public function __construct(array $payload = []) + { + $this->payload = $payload; + } + + public function messageName(): string + { + return 'user-registered'; + } + + public function payload(): array + { + return $this->payload; + } + + public function id(): string + { + return $this->payload()['id']; + } + + public function name(): string + { + return $this->payload()['name']; + } + + public function email(): string + { + return $this->payload()['email']; + } +} diff --git a/examples/Model/Event/UserRegisteredWithDuplicateEmail.php b/examples/Model/Event/UserRegisteredWithDuplicateEmail.php new file mode 100644 index 0000000..29921e7 --- /dev/null +++ b/examples/Model/Event/UserRegisteredWithDuplicateEmail.php @@ -0,0 +1,49 @@ + + * (c) 2017-2020 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace Prooph\MicroExample\Model\Event; + +final class UserRegisteredWithDuplicateEmail +{ + protected array $payload; + + public function __construct(array $payload = []) + { + $this->payload = $payload; + } + + public function messageName(): string + { + return 'user-registered-with-duplicate-email'; + } + + public function payload(): array + { + return $this->payload; + } + + public function id(): string + { + return $this->payload()['id']; + } + + public function name(): string + { + return $this->payload()['name']; + } + + public function email(): string + { + return $this->payload()['email']; + } +} diff --git a/examples/Model/Event/UserWasRegistered.php b/examples/Model/Event/UserWasRegistered.php deleted file mode 100644 index 6d3759d..0000000 --- a/examples/Model/Event/UserWasRegistered.php +++ /dev/null @@ -1,37 +0,0 @@ - - * (c) 2017-2017 Sascha-Oliver Prolic - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -declare(strict_types=1); - -namespace Prooph\MicroExample\Model\Event; - -use Prooph\Common\Messaging\DomainEvent; -use Prooph\Common\Messaging\PayloadConstructable; -use Prooph\Common\Messaging\PayloadTrait; - -final class UserWasRegistered extends DomainEvent implements PayloadConstructable -{ - use PayloadTrait; - - public function userId(): string - { - return $this->payload()['id']; - } - - public function userName(): string - { - return $this->payload()['name']; - } - - public function email(): string - { - return $this->payload()['email']; - } -} diff --git a/examples/Model/Event/UserWasRegisteredWithDuplicateEmail.php b/examples/Model/Event/UserWasRegisteredWithDuplicateEmail.php deleted file mode 100644 index f128ccb..0000000 --- a/examples/Model/Event/UserWasRegisteredWithDuplicateEmail.php +++ /dev/null @@ -1,37 +0,0 @@ - - * (c) 2017-2017 Sascha-Oliver Prolic - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -declare(strict_types=1); - -namespace Prooph\MicroExample\Model\Event; - -use Prooph\Common\Messaging\DomainEvent; -use Prooph\Common\Messaging\PayloadConstructable; -use Prooph\Common\Messaging\PayloadTrait; - -final class UserWasRegisteredWithDuplicateEmail extends DomainEvent implements PayloadConstructable -{ - use PayloadTrait; - - public function userId(): string - { - return $this->payload()['id']; - } - - public function userName(): string - { - return $this->payload()['name']; - } - - public function email(): string - { - return $this->payload()['email']; - } -} diff --git a/examples/Model/UniqueEmailGuard.php b/examples/Model/UniqueEmailGuard.php index 5343525..1be400f 100644 --- a/examples/Model/UniqueEmailGuard.php +++ b/examples/Model/UniqueEmailGuard.php @@ -1,8 +1,9 @@ - * (c) 2017-2017 Sascha-Oliver Prolic + * (c) 2017-2020 prooph software GmbH + * (c) 2017-2020 Sascha-Oliver Prolic * * For the full copyright and license information, please view the LICENSE * file that was distributed with this source code. diff --git a/examples/Model/User.php b/examples/Model/User.php index 4fcd642..5313ecb 100644 --- a/examples/Model/User.php +++ b/examples/Model/User.php @@ -1,8 +1,9 @@ - * (c) 2017-2017 Sascha-Oliver Prolic + * (c) 2017-2020 prooph software GmbH + * (c) 2017-2020 Sascha-Oliver Prolic * * For the full copyright and license information, please view the LICENSE * file that was distributed with this source code. @@ -12,58 +13,52 @@ namespace Prooph\MicroExample\Model\User; +use Amp\Promise; +use Amp\Success; use InvalidArgumentException; -use Prooph\Common\Messaging\Message; +use Phunkie\Types\ImmList; use Prooph\MicroExample\Model\Command\ChangeUserName; use Prooph\MicroExample\Model\Command\RegisterUser; -use Prooph\MicroExample\Model\Event\UserNameWasChanged; -use Prooph\MicroExample\Model\Event\UserWasRegistered; -use Prooph\MicroExample\Model\Event\UserWasRegisteredWithDuplicateEmail; +use Prooph\MicroExample\Model\Event\UserNameChanged; +use Prooph\MicroExample\Model\Event\UserRegistered; +use Prooph\MicroExample\Model\Event\UserRegisteredWithDuplicateEmail; use Prooph\MicroExample\Model\UniqueEmailGuard; const registerUser = '\Prooph\MicroExample\Model\User\registerUser'; -function registerUser(callable $stateResolver, RegisterUser $command, UniqueEmailGuard $guard): array +function registerUser(callable $stateResolver, RegisterUser $command, UniqueEmailGuard $guard): Promise { if ($guard->isUnique($command->email())) { - return [new UserWasRegistered($command->payload())]; + return new Success(ImmList(new UserRegistered($command->payload()))); } - return [new UserWasRegisteredWithDuplicateEmail($command->payload())]; + return new Success(ImmList(new UserRegisteredWithDuplicateEmail($command->payload()))); } const changeUserName = '\Prooph\MicroExample\Model\User\changeUserName'; -function changeUserName(callable $stateResolver, ChangeUserName $command): array +function changeUserName(callable $stateResolver, ChangeUserName $command): Promise { - if (! mb_strlen($command->username()) > 3) { + if (! \mb_strlen($command->name()) > 3) { throw new InvalidArgumentException('Username too short'); } - return [new UserNameWasChanged($command->payload())]; + return new Success(ImmList(new UserNameChanged($command->payload()))); } const apply = '\Prooph\MicroExample\Model\User\apply'; -function apply($state, Message ...$events): array +function apply($state, ImmList $events): array { - if (null === $state) { - $state = []; - } - - foreach ($events as $event) { - switch ($event->messageName()) { - case UserWasRegistered::class: - $state = array_merge($state, $event->payload(), ['activated' => true]); - break; - case UserWasRegisteredWithDuplicateEmail::class: - $state = array_merge($state, $event->payload(), ['activated' => false, 'blocked_reason' => 'duplicate email']); - break; - case UserNameWasChanged::class: - $state = array_merge($state, $event->payload()); - break; + return $events->fold($state, function ($state, $e) { + \var_dump($state, $e); + switch (\get_class($e)) { + case UserRegistered::class: + return \array_merge($state, $e->payload(), ['activated' => true]); + case UserRegisteredWithDuplicateEmail::class: + return \array_merge($state, $e->payload(), ['activated' => false, 'blocked_reason' => 'duplicate email']); + case UserNameChanged::class: + return \array_merge($state, $e->payload()); } - } - - return $state; + }); } diff --git a/examples/register_and_change_username.php b/examples/register_and_change_username.php index dc27f31..744d9a1 100644 --- a/examples/register_and_change_username.php +++ b/examples/register_and_change_username.php @@ -1,8 +1,9 @@ - * (c) 2017-2017 Sascha-Oliver Prolic + * (c) 2017-2020 prooph software GmbH + * (c) 2017-2020 Sascha-Oliver Prolic * * For the full copyright and license information, please view the LICENSE * file that was distributed with this source code. @@ -12,45 +13,32 @@ namespace Prooph\MicroExample\Script; +use Amp\Loop; +use Amp\Promise; use Phunkie\Validation\Validation; -use Prooph\Common\Messaging\Message; +use Prooph\EventStore\UserCredentials; +use Prooph\EventStore\Util\Guid; +use Prooph\EventStoreClient\ConnectionSettings; +use Prooph\EventStoreClient\EventStoreConnectionFactory; use Prooph\Micro\Kernel; -use Prooph\MicroExample\Infrastructure\UserAggregateDefinition; +use Prooph\MicroExample\Infrastructure\InMemoryEmailGuard; +use Prooph\MicroExample\Infrastructure\UserSpecification; use Prooph\MicroExample\Model\Command\ChangeUserName; -use Prooph\MicroExample\Model\Command\InvalidCommand; use Prooph\MicroExample\Model\Command\RegisterUser; use Prooph\MicroExample\Model\Command\UnknownCommand; use Prooph\MicroExample\Model\User; -$start = microtime(true); - $autoloader = require __DIR__ . '/../vendor/autoload.php'; $autoloader->addPsr4('Prooph\\MicroExample\\', __DIR__); require 'Model/User.php'; -//We could also use a container here, if dependencies grow -$factories = include 'Infrastructure/factories.php'; - -$commandMap = [ - RegisterUser::class => [ - 'handler' => function (callable $stateResolver, Message $message) use (&$factories): array { - return User\registerUser($stateResolver, $message, $factories['emailGuard']()); - }, - 'definition' => UserAggregateDefinition::class, - ], - ChangeUserName::class => [ - 'handler' => User\changeUserName, - 'definition' => UserAggregateDefinition::class, - ], -]; - function showResult(Validation $result): void { $on = match($result); switch (true) { case $on(Success(_)): echo $result->show() . PHP_EOL; - echo json_encode($result->getOrElse('')->head()->payload()) . PHP_EOL . PHP_EOL; + echo \json_encode($result->getOrElse('')->head()->payload()) . PHP_EOL . PHP_EOL; break; case $on(Failure(_)): echo $result->show() . PHP_EOL . PHP_EOL; @@ -58,23 +46,56 @@ function showResult(Validation $result): void } } -$dispatch = Kernel\buildCommandDispatcher($factories['eventStore'](), $commandMap, $factories['snapshotStore']()); +Loop::run(function (): \Generator { + $start = \microtime(true); + + $settings = ConnectionSettings::create() + ->setDefaultUserCredentials( + new UserCredentials('admin', 'changeit') + ); + + $connection = EventStoreConnectionFactory::createFromConnectionString( + 'ConnectTo=tcp://admin:changeit@localhost:1113', + $settings->build() + ); + + $connection->onConnected(function () { + echo 'Event Store connection established' . PHP_EOL; + }); + + $connection->onClosed(function () { + echo 'Event Store connection closed' . PHP_EOL; + }); + + yield $connection->connectAsync(); + + $uniqueEmailGuard = new InMemoryEmailGuard(); + + $commandMap = ImmMap([ + ChangeUserName::class => fn (ChangeUserName $m) => new UserSpecification($m, User\changeUserName), + RegisterUser::class => fn (RegisterUser $m) => new UserSpecification($m, function (callable $stateResolver, $message) use ($uniqueEmailGuard): Promise { + return User\registerUser($stateResolver, $message, $uniqueEmailGuard); + }), + ]); + + $dispatch = Kernel\buildCommandDispatcher($connection, $commandMap); + + $userId = Guid::generateString(); -/* @var Validation $result */ -$result = $dispatch(new RegisterUser(['id' => '1', 'name' => 'Alex', 'email' => 'member@getprooph.org'])); -showResult($result); + /* @var Validation $result */ + $result = yield $dispatch(new RegisterUser(['id' => $userId, 'name' => 'Alex', 'email' => 'member@getprooph.org'])); + showResult($result); -$result = $dispatch(new ChangeUserName(['id' => '1', 'name' => 'Sascha'])); -showResult($result); + $result = yield $dispatch(new ChangeUserName(['id' => $userId, 'name' => 'Sascha'])); + showResult($result); -// a TypeError -$result = $dispatch(new InvalidCommand()); -showResult($result); + // unknown command + $result = yield $dispatch(new UnknownCommand()); + showResult($result); -// unknown command -$result = $dispatch(new UnknownCommand()); -showResult($result); + $time = \microtime(true) - $start; -$time = microtime(true) - $start; + echo $time . "secs runtime\n\n"; -echo $time . "secs runtime\n\n"; + $connection->close(); +}); diff --git a/examples/user_snapshotter.php b/examples/user_snapshotter.php deleted file mode 100644 index 6aee9b6..0000000 --- a/examples/user_snapshotter.php +++ /dev/null @@ -1,45 +0,0 @@ - - * (c) 2017-2017 Sascha-Oliver Prolic - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -declare(strict_types=1); - -namespace Prooph\MicroExample\Script; - -use Prooph\Common\Messaging\Message; -use Prooph\EventStore\Projection\ProjectionManager; -use Prooph\Micro\SnapshotReadModel; -use Prooph\MicroExample\Infrastructure\UserAggregateDefinition; - -$autoloader = require __DIR__ . '/../vendor/autoload.php'; -$autoloader->addPsr4('Prooph\\MicroExample\\', __DIR__); -require 'Model/User.php'; - -//We could also use a container here, if dependencies grow -$factories = include 'Infrastructure/factories.php'; - -/* @var ProjectionManager $projectionManager */ -$projectionManager = $factories['projectionManager'](); - -$readModel = new SnapshotReadModel( - $factories['snapshotStore'](), - new UserAggregateDefinition() -); - -$projection = $projectionManager->createReadModelProjection( - 'user_snapshots', - $readModel -); - -$projection - ->fromStream('user_stream') - ->whenAny(function ($state, Message $event): void { - $this->readModel()->stack('replay', $event); - }) - ->run(); diff --git a/src/AbstractAggregateDefinition.php b/src/AbstractAggregateDefinition.php deleted file mode 100644 index 5f7c1fb..0000000 --- a/src/AbstractAggregateDefinition.php +++ /dev/null @@ -1,127 +0,0 @@ - - * (c) 2017-2017 Sascha-Oliver Prolic - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -declare(strict_types=1); - -namespace Prooph\Micro; - -use Iterator; -use Prooph\Common\Messaging\Message; -use Prooph\EventStore\Metadata\MetadataEnricher; -use Prooph\EventStore\Metadata\MetadataMatcher; -use Prooph\EventStore\Metadata\Operator; -use RuntimeException; - -abstract class AbstractAggregateDefinition implements AggregateDefinition -{ - public function identifierName(): string - { - return 'id'; - } - - public function versionName(): string - { - return 'version'; - } - - public function extractAggregateId(Message $message): string - { - $idKey = $this->identifierName(); - - $payload = $message->payload(); - - if (! array_key_exists($idKey, $payload)) { - throw new RuntimeException(sprintf( - 'Missing aggregate id key "%s" in payload of message %s. Payload was %s', - $idKey, - $message->messageName(), - json_encode($payload) - )); - } - - return $payload[$idKey]; - } - - public function extractAggregateVersion(Message $message): int - { - $versionKey = $this->versionName(); - - $metadata = $message->metadata(); - - if (! array_key_exists($versionKey, $metadata)) { - throw new RuntimeException(sprintf( - 'Missing aggregate version key "%s" in metadata of message %s. Metadata was %s', - $versionKey, - $message->messageName(), - json_encode($metadata) - )); - } - - return $metadata[$versionKey]; - } - - public function metadataMatcher(string $aggregateId, int $aggregateVersion): ?MetadataMatcher - { - // if one stream per aggregate is used, you can simply return null instead - - return (new MetadataMatcher()) - ->withMetadataMatch('_aggregate_id', Operator::EQUALS(), $aggregateId) - - // this is only required when using a single stream for all aggregates - ->withMetadataMatch('_aggregate_type', Operator::EQUALS(), $this->aggregateType()) - - // this is only required when using one stream per aggregate type - ->withMetadataMatch('_aggregate_version', Operator::GREATER_THAN_EQUALS(), $aggregateVersion); - } - - public function metadataEnricher(string $aggregateId, int $aggregateVersion, Message $causation = null): ?MetadataEnricher - { - return new class($aggregateId, $this->aggregateType(), $aggregateVersion) implements MetadataEnricher { - private $aggregateId; - private $aggregateType; - private $aggregateVersion; - - public function __construct(string $aggregateId, string $aggregateType, int $aggregateVersion) - { - $this->aggregateId = $aggregateId; - $this->aggregateType = $aggregateType; - $this->aggregateVersion = $aggregateVersion; - } - - public function enrich(Message $message): Message - { - $message = $message->withAddedMetadata('_aggregate_id', $this->aggregateId); - - // this is only required when using a single stream for all aggregates - $message = $message->withAddedMetadata('_aggregate_type', $this->aggregateType); - - // this is only required when using one stream per aggregate type - $message = $message->withAddedMetadata('_aggregate_version', $this->aggregateVersion); - - return $message; - } - }; - } - - public function reconstituteState($state, Iterator $events) - { - return $this->apply($state, ...$events); - } - - public function hasOneStreamPerAggregate(): bool - { - return false; - } - - public function stateType(): string - { - return 'array'; - } -} diff --git a/src/AggregateDefinition.php b/src/AggregateDefinition.php deleted file mode 100644 index 5e864ab..0000000 --- a/src/AggregateDefinition.php +++ /dev/null @@ -1,52 +0,0 @@ - - * (c) 2017-2017 Sascha-Oliver Prolic - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -declare(strict_types=1); - -namespace Prooph\Micro; - -use Iterator; -use Prooph\Common\Messaging\Message; -use Prooph\EventStore\Metadata\MetadataEnricher; -use Prooph\EventStore\Metadata\MetadataMatcher; -use Prooph\EventStore\StreamName; - -interface AggregateDefinition -{ - public function aggregateType(): string; - - /** - * Returns the key in message payload to identify the aggregate id - */ - public function identifierName(): string; - - /** - * Returns the key in message metadata to identify version number - */ - public function versionName(): string; - - public function extractAggregateId(Message $message): string; - - public function extractAggregateVersion(Message $message): int; - - public function streamName(): StreamName; - - public function metadataEnricher(string $aggregateId, int $aggregateVersion, Message $causation = null): ?MetadataEnricher; - - public function metadataMatcher(string $aggregateId, int $aggregateVersion): ?MetadataMatcher; - - public function hasOneStreamPerAggregate(): bool; - - public function reconstituteState($state, Iterator $events); - - public function apply($state, Message ...$events); - - public function stateType(): string; -} diff --git a/src/CommandSpecification.php b/src/CommandSpecification.php new file mode 100644 index 0000000..007f296 --- /dev/null +++ b/src/CommandSpecification.php @@ -0,0 +1,68 @@ + + * (c) 2017-2020 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace Prooph\Micro; + +use Amp\Promise; +use Closure; +use Phunkie\Types\ImmList; +use Prooph\EventStore\EventData; +use Prooph\EventStore\ExpectedVersion; +use Prooph\EventStore\ResolvedEvent; + +abstract class CommandSpecification +{ + protected object $command; + protected Closure $handler; + + public function __construct(object $command, callable $handler) + { + $this->command = $command; + $this->handler = Closure::fromCallable($handler); + } + + public function handle(Closure $stateResolver): Promise + { + return ($this->handler)($stateResolver, $this->command); + } + + /** + * @param ImmList $events + * @return mixed + */ + public function reconstituteFromHistory(ImmList $events) + { + return $this->apply($this->initialState(), $events->map(fn ($e) => $this->mapToEvent($e))); + } + + public function expectedVersion(): int + { + return ExpectedVersion::ANY; + } + + abstract public function mapToEventData(object $event): EventData; + + abstract public function mapToEvent(ResolvedEvent $resolvedEvent): object; + + /** @return mixed */ + abstract public function initialState(); + + abstract public function streamName(): string; + + /** + * @param mixed $initialState + * @param ImmList $events + * @return mixed + */ + abstract public function apply($initialState, ImmList $events); +} diff --git a/src/Kernel.php b/src/Kernel.php index b290876..93895ea 100644 --- a/src/Kernel.php +++ b/src/Kernel.php @@ -1,8 +1,9 @@ - * (c) 2017-2017 Sascha-Oliver Prolic + * (c) 2017-2020 prooph software GmbH + * (c) 2017-2020 Sascha-Oliver Prolic * * For the full copyright and license information, please view the LICENSE * file that was distributed with this source code. @@ -12,242 +13,78 @@ namespace Prooph\Micro\Kernel; -use EmptyIterator; -use Iterator; -use Phunkie\Types\ImmList; -use Phunkie\Types\Kind; -use Phunkie\Validation\Validation; -use Prooph\Common\Messaging\Message; -use Prooph\EventStore\EventStore; -use Prooph\EventStore\Stream; -use Prooph\EventStore\StreamName; -use Prooph\EventStore\TransactionalEventStore; -use Prooph\Micro\AggregateDefinition; -use Prooph\SnapshotStore\Snapshot; -use Prooph\SnapshotStore\SnapshotStore; -use RuntimeException; -use function Phunkie\Functions\function1\compose; +use function Amp\call; +use Amp\Promise; +use Closure; +use function Failure; +use Generator; +use Phunkie\Types\ImmMap; +use Prooph\EventStore\Async\EventStoreConnection; +use Prooph\EventStore\SliceReadStatus; +use Prooph\EventStore\StreamEventsSlice; +use Prooph\Micro\CommandSpecification; +use function Success; + +const buildCommandDispatcher = 'Prooph\Micro\Kernel\buildCommandDispatcher'; -const buildCommandDispatcher = '\Prooph\Micro\Kernel\buildCommandDispatcher'; -/** - * builds a dispatcher and returns a function that receives a messages and returns Success | Failure - * - * usage: - * $dispatch = buildDispatcher($eventStore, $commandMap, $snapshotStore, $publisher); - * $attempt = $dispatch($message); - * - * $commandMap is expected to be an array like this: - * [ - * RegisterUser::class => [ - * 'handler' => function (array $state, Message $message) use (&$factories): AggregateResult { - * return \Prooph\MicroExample\Model\User\registerUser($state, $message, $factories['emailGuard']()); - * }, - * 'definition' => UserAggregateDefinition::class, - * ], - * ChangeUserName::class => [ - * 'handler' => '\Prooph\MicroExample\Model\User\changeUserName', - * 'definition' => UserAggregateDefinition::class, - * ], - * ] - * $message is expected to be an instance of Prooph\Common\Messaging\Message - */ function buildCommandDispatcher( - EventStore $eventStore, - array $commandMap, - SnapshotStore $snapshotStore = null, - callable $publisher = null + EventStoreConnection $eventStore, + ImmMap $commandMap ): callable { - return function (Message $message) use ($eventStore, $snapshotStore, $commandMap, $publisher): Validation { - try { - $definition = getAggregateDefinition($message, $commandMap); - $aggregateId = $definition->extractAggregateId($message); - } catch (\Throwable $e) { - return Failure($e); - } - - $lastVersion = 0; - - $stateResolver = function () use ($message, $definition, $eventStore, $snapshotStore, $aggregateId, &$lastVersion) { - $snapshot = loadSnapshot($message, $definition, $snapshotStore); + return function (object $m) use ($eventStore, $commandMap): Promise { + return call(function () use ($m, $eventStore, $commandMap): Generator { + $messageClass = \get_class($m); + $config = $commandMap->get($messageClass); - $nextVersion = 1; - $state = null; - - if (null !== $snapshot) { - $nextVersion = $snapshot->lastVersion() + 1; - $state = $snapshot->aggregateRoot(); - } - - $events = loadEvents($eventStore, $definition, $aggregateId, $nextVersion); - $lastEvent = end($events); - - if (false !== $lastEvent) { - $lastVersion = $definition->extractAggregateVersion($lastEvent); + if ($config->isEmpty()) { + return Failure('No configuration found for ' . $messageClass); } - return $definition->reconstituteState($state, $events); - }; + $specification = $config->get()($m); + \assert($specification instanceof CommandSpecification); - $handleCommand = function (Message $message) use ($stateResolver, $commandMap): ImmList { - $handler = getHandler($message, $commandMap); + try { + $es = yield $specification->handle(stateResolver($eventStore, $specification)); - $events = $handler($stateResolver, $message); - - if (! is_array($events)) { - throw new \RuntimeException('The command handler did not return an array'); + yield $eventStore->appendToStreamAsync( + $specification->streamName(), + $specification->expectedVersion(), + $es->map(fn ($e) => $specification->mapToEventData($e))->toArray() + ); + } catch (\Throwable $e) { + return Failure($e); } - return ImmList(...$events); - }; - - $enrichEvents = function (ImmList $events) use ($message, $definition, $aggregateId, &$lastVersion): Kind { - $enricher = getEnricherFor($definition, $aggregateId, $message, $lastVersion); - - return $events->map($enricher); - }; - - $persistEvents = function (ImmList $events) use ($eventStore, $definition, $message, $aggregateId): Kind { - return persistEvents($events, $eventStore, $definition, $aggregateId); - }; - - $publishEvents = function (ImmList $events) use ($publisher): Kind { - if ($events->isEmpty() || null === $publisher) { - return $events; - } - - return $events->map($publisher); - }; - - $pipe = function () use ($message, $handleCommand, $enrichEvents, $persistEvents, $publishEvents) { - return compose( - $handleCommand, - $enrichEvents, - $persistEvents, - $publishEvents - )($message); - }; - - return Attempt($pipe); + return Success($es); + }); }; } -const loadSnapshot = '\Prooph\Micro\Kernel\loadSnapshot'; +const stateResolver = 'Prooph\Micro\Kernel\stateResolver'; -function loadSnapshot(Message $message, AggregateDefinition $definition, SnapshotStore $snapshotStore = null): ?Snapshot +function stateResolver(EventStoreConnection $eventStore, CommandSpecification $specification): Closure { - if (null === $snapshotStore) { - return null; - } - - return $snapshotStore->get($definition->aggregateType(), $definition->extractAggregateId($message)); -} - -const loadEvents = '\Prooph\Micro\Kernel\loadEvents'; - -function loadEvents( - EventStore $eventStore, - AggregateDefinition $definition, - string $aggregateId, - int $nextVersion -): Iterator { - $streamName = $definition->streamName(); - $metadataMatcher = $definition->metadataMatcher($aggregateId, $nextVersion); - - if (! $eventStore->hasStream($streamName)) { - return new EmptyIterator(); - } - - if ($definition->hasOneStreamPerAggregate()) { - // append aggregate id to stream name - $streamName = new StreamName($streamName->toString() . '-' . $aggregateId); - } - - return $eventStore->load($streamName, $nextVersion, null, $metadataMatcher); -} - -const getEnricherFor = '\Prooph\Micro\Kernel\getEnricherFor'; - -function getEnricherFor(AggregateDefinition $definition, string $aggregateId, Message $message, int &$lastVersion): callable -{ - return function (Message $event) use ($definition, $aggregateId, $message, &$lastVersion): Message { - $metadataEnricher = $definition->metadataEnricher($aggregateId, ++$lastVersion, $message); - - if (null !== $metadataEnricher) { - $event = $metadataEnricher->enrich($event); - } - - return $event; + return function () use ($eventStore, $specification): Promise { + return call(function () use ($eventStore, $specification): Generator { + $slice = yield $eventStore->readStreamEventsForwardAsync( + $specification->streamName(), + 0, + 4096, + ); + + \assert($slice instanceof StreamEventsSlice); + + switch ($slice->status()->value()) { + case SliceReadStatus::SUCCESS: + return $specification->reconstituteFromHistory(ImmList(...$slice->events())); + break; + case SliceReadStatus::STREAM_NOT_FOUND: + return null; + break; + case SliceReadStatus::STREAM_DELETED: + throw new \RuntimeException('Stream deleted'); + break; + } + }); }; } - -const persistEvents = '\Prooph\Micro\Kernel\persistEvents'; - -function persistEvents( - ImmList $events, - EventStore $eventStore, - AggregateDefinition $definition, - string $aggregateId -): Kind { - if ($events->isEmpty()) { - return $events; - } - - $streamName = $definition->streamName(); - - if ($definition->hasOneStreamPerAggregate()) { - $streamName = new StreamName($streamName->toString() . '-' . $aggregateId); // append aggregate id to stream name - } - - if ($eventStore instanceof TransactionalEventStore) { - $eventStore->beginTransaction(); - } - - try { - if ($eventStore->hasStream($streamName)) { - $eventStore->appendTo($streamName, $events->iterator()); - } else { - $eventStore->create(new Stream($streamName, $events->iterator())); - } - } catch (\Throwable $e) { - if ($eventStore instanceof TransactionalEventStore) { - $eventStore->rollback(); - } - - throw $e; - } - - if ($eventStore instanceof TransactionalEventStore) { - $eventStore->commit(); - } - - return $events; -} - -const getHandler = '\Prooph\Micro\Kernel\getHandler'; - -function getHandler(Message $m, array $c): callable -{ - $n = $m->messageName(); - - if (! array_key_exists($n, $c)) { - throw new RuntimeException(sprintf( - 'Unknown message "%s". Message name not mapped to an aggregate.', - $n - )); - } - - return $c[$n]['handler']; -} - -const getAggregateDefinition = '\Prooph\Micro\Kernel\getAggregateDefinition'; - -function getAggregateDefinition(Message $m, array $c): AggregateDefinition -{ - $n = $m->messageName(); - - if (! isset($c[$m->messageName()])) { - throw new RuntimeException(sprintf('Unknown message %s. Message name not mapped to an aggregate.', $n)); - } - - return new $c[$n]['definition'](); -} diff --git a/src/SnapshotReadModel.php b/src/SnapshotReadModel.php deleted file mode 100644 index a4a496e..0000000 --- a/src/SnapshotReadModel.php +++ /dev/null @@ -1,114 +0,0 @@ - - * (c) 2017-2017 Sascha-Oliver Prolic - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -declare(strict_types=1); - -namespace Prooph\Micro; - -use BadMethodCallException; -use DateTimeImmutable; -use Prooph\Common\Messaging\Message; -use Prooph\EventStore\Projection\ReadModel; -use Prooph\SnapshotStore\Snapshot; -use Prooph\SnapshotStore\SnapshotStore; -use RuntimeException; - -final class SnapshotReadModel implements ReadModel -{ - /** - * @var SnapshotStore - */ - private $snapshotStore; - - /** - * @var AggregateDefinition - */ - private $aggregateDefinition; - - /** - * @var array - */ - private $cache = []; - - public function __construct( - SnapshotStore $snapshotStore, - AggregateDefinition $aggregateDefiniton - ) { - $this->snapshotStore = $snapshotStore; - $this->aggregateDefinition = $aggregateDefiniton; - } - - public function stack(string $operation, ...$events): void - { - foreach ($events as $event) { - if (! $event instanceof Message) { - throw new RuntimeException(get_class($this) . ' can only handle events of type ' . Message::class); - } - - $aggregateId = $this->aggregateDefinition->extractAggregateId($event); - - if (! isset($this->cache[$aggregateId])) { - $snapshot = $this->snapshotStore->get( - $this->aggregateDefinition->aggregateType(), - $aggregateId - ); - - if (! $snapshot) { - $state = []; - } else { - $state = $snapshot->aggregateRoot(); - } - } else { - $state = $this->cache[$aggregateId]; - } - - $this->cache[$aggregateId] = $this->aggregateDefinition->apply($state, $event); - } - } - - public function persist(): void - { - $snapshots = []; - - foreach ($this->cache as $aggregateId => $state) { - $snapshots[] = new Snapshot( - $this->aggregateDefinition->aggregateType(), - $aggregateId, - $state, - $state[$this->aggregateDefinition->versionName()], - new DateTimeImmutable('now', new \DateTimeZone('UTC')) - ); - } - - $this->snapshotStore->save(...$snapshots); - - $this->cache = []; - } - - public function init(): void - { - throw new BadMethodCallException('Initializing a snapshot read model is not supported'); - } - - public function isInitialized(): bool - { - return true; - } - - public function reset(): void - { - throw new BadMethodCallException('Resetting a snapshot read model is not supported'); - } - - public function delete(): void - { - throw new BadMethodCallException('Deleting a snapshot read model is not supported'); - } -} diff --git a/tests/AbstractAggregateDefinitionTest.php b/tests/AbstractAggregateDefinitionTest.php deleted file mode 100644 index 376003d..0000000 --- a/tests/AbstractAggregateDefinitionTest.php +++ /dev/null @@ -1,187 +0,0 @@ - - * (c) 2017-2017 Sascha-Oliver Prolic - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -declare(strict_types=1); - -namespace ProophTest\Micro; - -use PHPUnit\Framework\TestCase; -use Prooph\Common\Messaging\Message; -use Prooph\EventStore\Metadata\FieldType; -use Prooph\EventStore\Metadata\MetadataEnricher; -use Prooph\EventStore\Metadata\MetadataMatcher; -use Prooph\EventStore\Metadata\Operator; -use Prooph\EventStore\StreamName; -use Prooph\Micro\AbstractAggregateDefinition; - -class AbstractAggregateDefinitionTest extends TestCase -{ - /** - * @test - */ - public function it_returns_identifier_and_version_name(): void - { - $this->assertEquals('id', $this->createDefinition()->identifierName()); - $this->assertEquals('version', $this->createDefinition()->versionName()); - } - - /** - * @test - */ - public function it_extracts_aggregate_id(): void - { - $message = $this->prophesize(Message::class); - $message->payload()->willReturn(['id' => 'some_id'])->shouldBeCalled(); - - $this->assertEquals('some_id', $this->createDefinition()->extractAggregateId($message->reveal())); - } - - /** - * @test - */ - public function it_throws_exception_when_no_id_property_found_during_extraction(): void - { - $this->expectException(\RuntimeException::class); - - $message = $this->prophesize(Message::class); - $message->payload()->willReturn([])->shouldBeCalled(); - - $this->createDefinition()->extractAggregateId($message->reveal()); - } - - /** - * @test - */ - public function it_extracts_aggregate_version(): void - { - $message = $this->prophesize(Message::class); - $message->metadata()->willReturn(['version' => 5])->shouldBeCalled(); - - $this->assertEquals(5, $this->createDefinition()->extractAggregateVersion($message->reveal())); - } - - /** - * @test - */ - public function it_throws_exception_when_no_version_property_found_during_extraction(): void - { - $this->expectException(\RuntimeException::class); - - $message = $this->prophesize(Message::class); - $message->metadata()->willReturn([])->shouldBeCalled(); - - $this->createDefinition()->extractAggregateVersion($message->reveal()); - } - - /** - * @test - */ - public function it_returns_metadata_matcher(): void - { - $metadataMatcher = $this->createDefinition()->metadataMatcher('some_id', 5); - - $this->assertInstanceOf(MetadataMatcher::class, $metadataMatcher); - - $this->assertEquals( - [ - [ - 'field' => '_aggregate_id', - 'operator' => Operator::EQUALS(), - 'value' => 'some_id', - 'fieldType' => FieldType::METADATA(), - ], - [ - 'field' => '_aggregate_type', - 'operator' => Operator::EQUALS(), - 'value' => 'foo', - 'fieldType' => FieldType::METADATA(), - ], - [ - 'field' => '_aggregate_version', - 'operator' => Operator::GREATER_THAN_EQUALS(), - 'value' => 5, - 'fieldType' => FieldType::METADATA(), - ], - ], - $metadataMatcher->data() - ); - } - - /** - * @test - */ - public function it_returns_metadata_enricher(): void - { - $enricher = $this->createDefinition()->metadataEnricher('some_id', 42); - - $this->assertInstanceOf(MetadataEnricher::class, $enricher); - - $enrichedMessage = $this->prophesize(Message::class); - $enrichedMessage->withAddedMetadata('_aggregate_type', 'foo')->willReturn($enrichedMessage)->shouldBeCalled(); - $enrichedMessage->withAddedMetadata('_aggregate_version', 42)->willReturn($enrichedMessage)->shouldBeCalled(); - $enrichedMessage = $enrichedMessage->reveal(); - - $message = $this->prophesize(Message::class); - $message->withAddedMetadata('_aggregate_id', 'some_id')->willReturn($enrichedMessage)->shouldBeCalled(); - - $result = $enricher->enrich($message->reveal()); - - $this->assertSame($enrichedMessage, $result); - } - - /** - * @test - */ - public function it_reconstitutes_state(): void - { - $message = $this->prophesize(Message::class); - - $state = $this->createDefinition()->reconstituteState([], new \ArrayIterator([$message->reveal()])); - - $this->assertArrayHasKey('count', $state); - $this->assertEquals(1, $state['count']); - } - - /** - * @test - */ - public function it_has_not_one_stream_per_aggregate_as_default(): void - { - $this->assertFalse($this->createDefinition()->hasOneStreamPerAggregate()); - } - - public function createDefinition(): AbstractAggregateDefinition - { - return new class() extends AbstractAggregateDefinition { - public function streamName(): StreamName - { - return new StreamName('foo'); - } - - public function aggregateType(): string - { - return 'foo'; - } - - public function apply($state, Message ...$events): array - { - if (! isset($state['count'])) { - $state['count'] = 0; - } - - foreach ($events as $event) { - ++$state['count']; - } - - return $state; - } - }; - } -} diff --git a/tests/KernelTest.php b/tests/KernelTest.php deleted file mode 100644 index b71930c..0000000 --- a/tests/KernelTest.php +++ /dev/null @@ -1,528 +0,0 @@ - - * (c) 2017-2017 Sascha-Oliver Prolic - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -declare(strict_types=1); - -namespace ProophTest\Micro; - -use PHPUnit\Framework\TestCase; -use Phunkie\Validation\Failure; -use Prooph\Common\Messaging\Message; -use Prooph\EventStore\EventStore; -use Prooph\EventStore\InMemoryEventStore; -use Prooph\EventStore\Metadata\MetadataEnricher; -use Prooph\EventStore\Stream; -use Prooph\EventStore\StreamName; -use Prooph\EventStore\TransactionalEventStore; -use Prooph\Micro\AggregateDefinition; -use Prooph\Micro\Kernel as k; -use Prooph\SnapshotStore\InMemorySnapshotStore; -use Prooph\SnapshotStore\Snapshot; -use Prooph\SnapshotStore\SnapshotStore; -use ProophTest\Micro\TestAsset\OneStreamPerAggregateTestAggregateDefinition; -use ProophTest\Micro\TestAsset\SingleStreamTestAggregateDefinition; -use ProophTest\Micro\TestAsset\SingleStreamTestAggregateDefinition2; -use ProophTest\Micro\TestAsset\TestDomainEvent; -use ProophTest\Micro\TestAsset\TestState; -use Prophecy\Argument; - -class KernelTest extends TestCase -{ - /** - * @test - */ - public function it_builds_command_dispatcher_and_dispatches(): void - { - $commandMap = [ - 'some_command' => [ - 'handler' => function (callable $stateResolver, Message $message): array { - $stateResolver(); - - return [new TestDomainEvent(['foo' => 'bar'])]; - }, - 'definition' => SingleStreamTestAggregateDefinition::class, - ], - ]; - - $eventStore = $this->prophesize(EventStore::class); - $eventStore->hasStream('foo')->willReturn(true)->shouldBeCalled(); - $eventStore->load(Argument::type(StreamName::class), 1, null, null)->willReturn(new \ArrayIterator())->shouldBeCalled(); - $eventStore->appendTo(Argument::type(StreamName::class), Argument::type(\Iterator::class))->shouldBeCalled(); - - $dispatch = \Prooph\Micro\Kernel\buildCommandDispatcher($eventStore->reveal(), $commandMap, new InMemorySnapshotStore()); - - $command = $this->prophesize(Message::class); - $command->messageName()->willReturn('some_command')->shouldBeCalled(); - - $validation = $dispatch($command->reveal()); - - if ($validation instanceof Failure) { - $this->fail($validation->toString()); - } - } - - /** - * @test - */ - public function it_builds_command_dispatcher_and_dispatches_with_state_coming_from_snapshot_store(): void - { - $commandMap = [ - 'some_command' => [ - 'handler' => function (callable $stateResolver, Message $message): array { - $stateResolver(); - - return [new TestDomainEvent(['foo' => 'bar', 'id' => 'some_id'])]; - }, - 'definition' => SingleStreamTestAggregateDefinition::class, - ], - ]; - - $eventStore = $this->prophesize(EventStore::class); - $eventStore->hasStream('foo')->willReturn(true)->shouldBeCalled(); - $eventStore->load(Argument::type(StreamName::class), 5, null, null)->willReturn(new \ArrayIterator())->shouldBeCalled(); - $eventStore->appendTo(Argument::type(StreamName::class), Argument::type(\Iterator::class))->shouldBeCalled(); - - $snapshotStore = new InMemorySnapshotStore(); - $snapshotStore->save(new Snapshot('test', 'some_id', ['version' => 4], 4, new \DateTimeImmutable())); - - $dispatch = \Prooph\Micro\Kernel\buildCommandDispatcher($eventStore->reveal(), $commandMap, $snapshotStore); - - $command = $this->prophesize(Message::class); - $command->messageName()->willReturn('some_command')->shouldBeCalled(); - - $validation = $dispatch($command->reveal()); - - if ($validation instanceof Failure) { - $this->fail($validation->toString()); - } - } - - /** - * @test - */ - public function it_builds_command_dispatcher_and_dispatches_with_object_state_coming_from_snapshot_store(): void - { - $commandMap = [ - 'some_command' => [ - 'handler' => function (callable $stateResolver, Message $message): array { - $stateResolver(); - - return [new TestDomainEvent(['foo' => 'bar', 'id' => 'some_id'])]; - }, - 'definition' => SingleStreamTestAggregateDefinition2::class, - ], - ]; - - $eventStore = $this->prophesize(EventStore::class); - $eventStore->hasStream('foo')->willReturn(true)->shouldBeCalled(); - $eventStore->load(Argument::type(StreamName::class), 5, null, null)->willReturn(new \ArrayIterator())->shouldBeCalled(); - $eventStore->appendTo(Argument::type(StreamName::class), Argument::type(\Iterator::class))->shouldBeCalled(); - - $snapshotStore = new InMemorySnapshotStore(); - $snapshotStore->save(new Snapshot('test', 'some_id', new TestState(), 4, new \DateTimeImmutable())); - - $dispatch = \Prooph\Micro\Kernel\buildCommandDispatcher($eventStore->reveal(), $commandMap, $snapshotStore); - - $command = $this->prophesize(Message::class); - $command->messageName()->willReturn('some_command')->shouldBeCalled(); - - $validation = $dispatch($command->reveal()); - - if ($validation instanceof Failure) { - $this->fail($validation->toString()); - } - } - - /** - * @test - */ - public function it_builds_command_dispatcher_and_dispatches_without_object_state_coming_from_snapshot_store(): void - { - $commandMap = [ - 'some_command' => [ - 'handler' => function (callable $stateResolver, Message $message): array { - $stateResolver(); - - return [new TestDomainEvent(['foo' => 'bar', 'id' => 'some_id'])]; - }, - 'definition' => SingleStreamTestAggregateDefinition2::class, - ], - ]; - - $eventStore = $this->prophesize(EventStore::class); - $eventStore->hasStream('foo')->willReturn(true)->shouldBeCalled(); - $eventStore->load(Argument::type(StreamName::class), 1, null, null)->willReturn(new \ArrayIterator())->shouldBeCalled(); - $eventStore->appendTo(Argument::type(StreamName::class), Argument::type(\Iterator::class))->shouldBeCalled(); - - $snapshotStore = new InMemorySnapshotStore(); - - $dispatch = \Prooph\Micro\Kernel\buildCommandDispatcher($eventStore->reveal(), $commandMap, $snapshotStore); - - $command = $this->prophesize(Message::class); - $command->messageName()->willReturn('some_command')->shouldBeCalled(); - - $validation = $dispatch($command->reveal()); - - if ($validation instanceof Failure) { - $this->fail($validation->toString()); - } - } - - /** - * @test - */ - public function it_does_not_load_events_when_state_is_not_resolved(): void - { - $commandMap = [ - 'some_command' => [ - 'handler' => function (callable $stateResolver, Message $message): array { - return [new TestDomainEvent(['foo' => 'bar'])]; - }, - 'definition' => SingleStreamTestAggregateDefinition::class, - ], - ]; - - $eventStore = $this->prophesize(EventStore::class); - $eventStore->hasStream('foo')->willReturn(true)->shouldBeCalled(); - $eventStore->load(Argument::type(StreamName::class), 1, null, null)->willReturn(new \ArrayIterator())->shouldNotBeCalled(); - $eventStore->appendTo(Argument::type(StreamName::class), Argument::type(\Iterator::class))->shouldBeCalled(); - - $dispatch = \Prooph\Micro\Kernel\buildCommandDispatcher($eventStore->reveal(), $commandMap, new InMemorySnapshotStore()); - - $command = $this->prophesize(Message::class); - $command->messageName()->willReturn('some_command')->shouldBeCalled(); - - $validation = $dispatch($command->reveal()); - - if ($validation instanceof Failure) { - $this->fail($validation->toString()); - } - } - - /** - * @test - */ - public function it_loads_snapshot_from_empty_snapshot_store(): void - { - $snapshotStore = $this->prophesize(SnapshotStore::class); - - $message = $this->prophesize(Message::class); - $message = $message->reveal(); - - $definition = $this->prophesize(AggregateDefinition::class); - $definition->aggregateType()->willReturn('test')->shouldBeCalled(); - $definition->extractAggregateId($message)->willReturn('42')->shouldBeCalled(); - - $result = k\loadSnapshot($message, $definition->reveal(), $snapshotStore->reveal()); - - $this->assertNull($result); - } - - /** - * @test - */ - public function it_loads_snapshot_from_snapshot_store(): void - { - $snapshotStore = new InMemorySnapshotStore(); - $snapshotStore->save(new Snapshot( - 'test', - '42', - ['foo' => 'bar'], - 1, - new \DateTimeImmutable() - )); - - $message = $this->prophesize(Message::class); - $message = $message->reveal(); - - $definition = $this->prophesize(AggregateDefinition::class); - $definition->aggregateType()->willReturn('test')->shouldBeCalled(); - $definition->extractAggregateId($message)->willReturn('42')->shouldBeCalled(); - - $result = k\loadSnapshot($message, $definition->reveal(), $snapshotStore); - $this->assertInstanceOf(Snapshot::class, $result); - $this->assertEquals(['foo' => 'bar'], $result->aggregateRoot()); - } - - /** - * @test - */ - public function it_returns_early_when_loading_snapshot_and_no_snapshot_store_given(): void - { - $message = $this->prophesize(Message::class); - $message = $message->reveal(); - - $definition = $this->prophesize(AggregateDefinition::class); - - $result = k\loadSnapshot($message, $definition->reveal(), null); - $this->assertNull($result); - } - - /** - * @test - */ - public function it_loads_events_when_stream_not_found(): void - { - $eventStore = $this->prophesize(EventStore::class); - $eventStore->hasStream(new StreamName('foo'))->willReturn(false)->shouldBeCalled(); - - $result = k\loadEvents($eventStore->reveal(), new SingleStreamTestAggregateDefinition(), 'foo', 1); - - $this->assertInstanceOf(\EmptyIterator::class, $result); - } - - /** - * @test - */ - public function it_loads_events_when_stream_found(): void - { - $streamName = new StreamName('foo'); - $eventStore = $this->prophesize(EventStore::class); - $eventStore->hasStream($streamName)->willReturn(true)->shouldBeCalled(); - $eventStore->load($streamName, 1, null, null)->willReturn(new \ArrayIterator())->shouldBeCalled(); - - $result = k\loadEvents($eventStore->reveal(), new SingleStreamTestAggregateDefinition(), 'foo', 1); - - $this->assertInstanceOf(\ArrayIterator::class, $result); - } - - /** - * @test - */ - public function it_loads_events_when_stream_found_using_one_stream_per_aggregate(): void - { - $eventStore = $this->prophesize(EventStore::class); - $eventStore->hasStream(Argument::type(StreamName::class))->willReturn(true)->shouldBeCalled(); - $eventStore->load(Argument::type(StreamName::class), 1, null, null)->willReturn(new \ArrayIterator())->shouldBeCalled(); - - $result = k\loadEvents($eventStore->reveal(), new OneStreamPerAggregateTestAggregateDefinition(), 'foo', 1); - - $this->assertInstanceOf(\ArrayIterator::class, $result); - } - - /** - * @test - */ - public function it_appends_to_stream_during_persist_when_stream_found(): void - { - $streamName = new StreamName('foo'); - $eventStore = $this->prophesize(EventStore::class); - $eventStore->hasStream($streamName)->willReturn(true)->shouldBeCalled(); - $eventStore->appendTo($streamName, Argument::type(\Iterator::class))->shouldBeCalled(); - - $message = $this->prophesize(Message::class); - $raisedEvents = [$message->reveal()]; - - $aggregateDefinition = $this->prophesize(AggregateDefinition::class); - $aggregateDefinition->streamName()->willReturn(new StreamName('foo'))->shouldBeCalled(); - $aggregateDefinition->hasOneStreamPerAggregate()->willReturn(false)->shouldBeCalled(); - - $validation = k\persistEvents(ImmList(...$raisedEvents), $eventStore->reveal(), $aggregateDefinition->reveal(), 'some_id'); - - if ($validation instanceof Failure) { - $this->fail($validation->toString()); - } - } - - /** - * @test - */ - public function it_returns_early_on_persist_when_list_is_empty(): void - { - $streamName = new StreamName('foo'); - - $eventStore = $this->prophesize(EventStore::class); - $eventStore->hasStream($streamName)->willReturn(true)->shouldNotBeCalled(); - $eventStore->appendTo($streamName, Argument::type(\Iterator::class))->shouldNotBeCalled(); - - $raisedEvents = []; - - $aggregateDefinition = $this->prophesize(AggregateDefinition::class); - $aggregateDefinition->streamName()->willReturn(new StreamName('foo'))->shouldNotBeCalled(); - $aggregateDefinition->hasOneStreamPerAggregate()->willReturn(false)->shouldNotBeCalled(); - - $validation = k\persistEvents(ImmList(...$raisedEvents), $eventStore->reveal(), $aggregateDefinition->reveal(), 'some_id'); - - if ($validation instanceof Failure) { - $this->fail($validation->toString()); - } - } - - /** - * @test - */ - public function it_persist_transactional_when_using_transactional_event_store(): void - { - $message = $this->prophesize(Message::class); - $raisedEvents = [$message->reveal()]; - - $aggregateDefinition = $this->prophesize(AggregateDefinition::class); - $aggregateDefinition->streamName()->willReturn(new StreamName('foo'))->shouldBeCalled(); - $aggregateDefinition->hasOneStreamPerAggregate()->willReturn(false)->shouldBeCalled(); - - $validation = k\persistEvents(ImmList(...$raisedEvents), new InMemoryEventStore(), $aggregateDefinition->reveal(), 'some_id'); - - if ($validation instanceof Failure) { - $this->fail($validation->toString()); - } - } - - /** - * @test - */ - public function it_rolls_back_transaction_during_persist_when_using_transactional_event_store(): void - { - $this->expectException(\Exception::class); - - $eventStore = $this->prophesize(TransactionalEventStore::class); - - $eventStore->beginTransaction()->shouldBeCalledTimes(2); - $eventStore->commit()->shouldBeCalledTimes(1); - $eventStore->rollback()->shouldBeCalledTimes(1); - $eventStore->hasStream(Argument::any())->willReturn(false, true)->shouldBeCalledTimes(2); - $eventStore->create(Argument::any())->shouldBeCalledTimes(1); - $eventStore->appendTo(Argument::any(), Argument::any())->willThrow(\Exception::class)->shouldBeCalledTimes(1); - $eventStore = $eventStore->reveal(); - - $message = $this->prophesize(Message::class); - $message->metadata()->willReturn([]); - $raisedEvents = [$message->reveal()]; - - $aggregateDefinition = $this->prophesize(AggregateDefinition::class); - $aggregateDefinition->streamName()->willReturn(new StreamName('foo'))->shouldBeCalled(); - $aggregateDefinition->hasOneStreamPerAggregate()->willReturn(true)->shouldBeCalled(); - - k\persistEvents(ImmList(...$raisedEvents), $eventStore, $aggregateDefinition->reveal(), 'one'); - k\persistEvents(ImmList(...$raisedEvents), $eventStore, $aggregateDefinition->reveal(), 'one'); - } - - /** - * @test - */ - public function it_appends_to_stream_during_persist_when_stream_found_using_one_stream_per_aggregate(): void - { - $eventStore = $this->prophesize(EventStore::class); - $eventStore->hasStream(Argument::type(StreamName::class))->willReturn(true)->shouldBeCalled(); - $eventStore->appendTo(Argument::type(StreamName::class), Argument::type(\Iterator::class))->shouldBeCalled(); - - $message = $this->prophesize(Message::class); - $raisedEvents = [$message->reveal()]; - - $aggregateDefinition = $this->prophesize(AggregateDefinition::class); - $aggregateDefinition->streamName()->willReturn(new StreamName('foo'))->shouldBeCalled(); - $aggregateDefinition->hasOneStreamPerAggregate()->willReturn(true)->shouldBeCalled(); - - $validation = k\persistEvents(ImmList(...$raisedEvents), $eventStore->reveal(), $aggregateDefinition->reveal(), 'some_id'); - - if ($validation instanceof Failure) { - $this->fail($validation->toString()); - } - } - - /** - * @test - */ - public function it_creates_stream_during_persist_when_no_stream_found_and_enriches_with_metadata(): void - { - $finalMessage = $this->prophesize(Message::class); - - $message = $this->prophesize(Message::class); - $message->withAddedMetadata('key', 'value')->willReturn($finalMessage->reveal())->shouldBeCalled(); - - $streamName = $this->prophesize(StreamName::class); - $streamName = $streamName->reveal(); - - $eventStore = $this->prophesize(EventStore::class); - $eventStore->hasStream($streamName)->willReturn(false)->shouldBeCalled(); - $eventStore->create(Argument::type(Stream::class))->shouldBeCalled(); - - $aggregateDefinition = $this->prophesize(AggregateDefinition::class); - $aggregateDefinition->streamName()->willReturn($streamName)->shouldBeCalled(); - $aggregateDefinition->hasOneStreamPerAggregate()->willReturn(false)->shouldBeCalled(); - $aggregateDefinition->metadataEnricher('some_id', 1, $message)->willReturn(new class() implements MetadataEnricher { - public function enrich(Message $message): Message - { - return $message->withAddedMetadata('key', 'value'); - } - } - )->shouldBeCalled(); - $aggregateDefinition = $aggregateDefinition->reveal(); - - $message = $message->reveal(); - - $events = ImmList($message); - - $lastVersion = 0; - - $enricher = k\getEnricherFor($aggregateDefinition, 'some_id', $message, $lastVersion); - - $events = $events->map($enricher); - - $validation = k\persistEvents($events, $eventStore->reveal(), $aggregateDefinition, 'some_id'); - - $this->assertEquals(1, $lastVersion); - - if ($validation instanceof Failure) { - $this->fail($validation->toString()); - } - } - - /** - * @test - */ - public function it_gets_handler(): void - { - $message = $this->prophesize(Message::class); - $message->messageName()->willReturn('foo')->shouldBeCalled(); - - $commandMap = [ - 'foo' => [ - 'handler' => function (): void { - }, - 'definition' => SingleStreamTestAggregateDefinition::class, - ], - ]; - - $result = k\getHandler($message->reveal(), $commandMap); - - $this->assertInstanceOf(\Closure::class, $result); - } - - /** - * @test - */ - public function it_throws_exception_when_no_handler_found(): void - { - $this->expectException(\RuntimeException::class); - - $message = $this->prophesize(Message::class); - $message->messageName()->willReturn('unknown')->shouldBeCalled(); - - $commandMap = ['foo' => ['handler' => function (): void { - }]]; - - k\getHandler($message->reveal(), $commandMap); - } - - /** - * @test - */ - public function it_throws_exception_when_no_definition_found(): void - { - $this->expectException(\RuntimeException::class); - - $message = $this->prophesize(Message::class); - $message->messageName()->willReturn('bar')->shouldBeCalled(); - - $commandMap = []; - - k\getAggregateDefinition($message->reveal(), $commandMap); - } -} diff --git a/tests/SnapshotReadModelTest.php b/tests/SnapshotReadModelTest.php deleted file mode 100644 index df0b1db..0000000 --- a/tests/SnapshotReadModelTest.php +++ /dev/null @@ -1,156 +0,0 @@ - - * (c) 2017-2017 Sascha-Oliver Prolic - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -declare(strict_types=1); - -namespace ProophTest\Micro; - -use PHPUnit\Framework\TestCase; -use Prooph\Common\Messaging\Message; -use Prooph\Micro\AggregateDefinition; -use Prooph\Micro\SnapshotReadModel; -use Prooph\SnapshotStore\InMemorySnapshotStore; -use Prooph\SnapshotStore\Snapshot; -use Prooph\SnapshotStore\SnapshotStore; - -class SnapshotReadModelTest extends TestCase -{ - /** - * @test - */ - public function it_stacks_and_persists_when_snapshot_exists(): void - { - $message1 = $this->prophesize(Message::class); - $message1 = $message1->reveal(); - - $message2 = $this->prophesize(Message::class); - $message2 = $message2->reveal(); - - $snapshotStore = new InMemorySnapshotStore(); - $snapshotStore->save(new Snapshot('message', 'some_id', ['foo' => 'bar', 'version' => 1], 1, new \DateTimeImmutable())); - - $definition = $this->prophesize(AggregateDefinition::class); - $definition->versionName()->willReturn('version')->shouldBeCalled(); - $definition->extractAggregateId($message1)->willReturn('some_id')->shouldBeCalled(); - $definition->extractAggregateId($message2)->willReturn('some_id')->shouldBeCalled(); - $definition->aggregateType()->willReturn('message')->shouldBeCalled(); - $definition->apply(['foo' => 'bar', 'version' => 1], $message1)->willReturn(['foo' => 'baz', 'version' => 2])->shouldBeCalled(); - $definition->apply(['foo' => 'baz', 'version' => 2], $message2)->willReturn(['foo' => 'bam', 'version' => 3])->shouldBeCalled(); - - $readModel = new SnapshotReadModel($snapshotStore, $definition->reveal()); - - $readModel->stack('apply', $message1); - $readModel->stack('apply', $message2); - - $readModel->persist(); - } - - /** - * @test - */ - public function it_stacks_and_persists_when_snapshot_does_not_exist(): void - { - $message1 = $this->prophesize(Message::class); - $message1 = $message1->reveal(); - - $message2 = $this->prophesize(Message::class); - $message2 = $message2->reveal(); - - $snapshotStore = new InMemorySnapshotStore(); - - $definition = $this->prophesize(AggregateDefinition::class); - $definition->versionName()->willReturn('version')->shouldBeCalled(); - $definition->extractAggregateId($message1)->willReturn('some_id')->shouldBeCalled(); - $definition->extractAggregateId($message2)->willReturn('some_id')->shouldBeCalled(); - $definition->aggregateType()->willReturn('message')->shouldBeCalled(); - $definition->apply([], $message1)->willReturn(['foo' => 'bar', 'version' => 1])->shouldBeCalled(); - $definition->apply(['foo' => 'bar', 'version' => 1], $message2)->willReturn(['foo' => 'baz', 'version' => 2])->shouldBeCalled(); - - $readModel = new SnapshotReadModel($snapshotStore, $definition->reveal()); - - $readModel->stack('apply', $message1); - $readModel->stack('apply', $message2); - - $readModel->persist(); - } - - /** - * @test - */ - public function it_throws_exception_during_stack_when_invalid_event_type_given(): void - { - $this->expectException(\RuntimeException::class); - - $snapshotStore = $this->prophesize(SnapshotStore::class); - $definition = $this->prophesize(AggregateDefinition::class); - - $readModel = new SnapshotReadModel($snapshotStore->reveal(), $definition->reveal()); - - $readModel->stack('foo', 'bar'); - } - - /** - * @test - */ - public function it_is_already_initialized(): void - { - $snapshotStore = $this->prophesize(SnapshotStore::class); - $definition = $this->prophesize(AggregateDefinition::class); - - $readModel = new SnapshotReadModel($snapshotStore->reveal(), $definition->reveal()); - - $this->assertTrue($readModel->isInitialized()); - } - - /** - * @test - */ - public function it_cannot_init(): void - { - $this->expectException(\BadMethodCallException::class); - - $snapshotStore = $this->prophesize(SnapshotStore::class); - $definition = $this->prophesize(AggregateDefinition::class); - - $readModel = new SnapshotReadModel($snapshotStore->reveal(), $definition->reveal()); - - $readModel->init(); - } - - /** - * @test - */ - public function it_cannot_reset(): void - { - $this->expectException(\BadMethodCallException::class); - - $snapshotStore = $this->prophesize(SnapshotStore::class); - $definition = $this->prophesize(AggregateDefinition::class); - - $readModel = new SnapshotReadModel($snapshotStore->reveal(), $definition->reveal()); - - $readModel->reset(); - } - - /** - * @test - */ - public function it_cannot_delete(): void - { - $this->expectException(\BadMethodCallException::class); - - $snapshotStore = $this->prophesize(SnapshotStore::class); - $definition = $this->prophesize(AggregateDefinition::class); - - $readModel = new SnapshotReadModel($snapshotStore->reveal(), $definition->reveal()); - - $readModel->delete(); - } -} diff --git a/tests/TestAsset/OneStreamPerAggregateTestAggregateDefinition.php b/tests/TestAsset/OneStreamPerAggregateTestAggregateDefinition.php deleted file mode 100644 index 3a43565..0000000 --- a/tests/TestAsset/OneStreamPerAggregateTestAggregateDefinition.php +++ /dev/null @@ -1,83 +0,0 @@ - - * (c) 2017-2017 Sascha-Oliver Prolic - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -declare(strict_types=1); - -namespace ProophTest\Micro\TestAsset; - -use Iterator; -use Prooph\Common\Messaging\Message; -use Prooph\EventStore\Metadata\MetadataEnricher; -use Prooph\EventStore\Metadata\MetadataMatcher; -use Prooph\EventStore\StreamName; -use Prooph\Micro\AggregateDefinition; - -final class OneStreamPerAggregateTestAggregateDefinition implements AggregateDefinition -{ - public function identifierName(): string - { - return 'id'; - } - - public function aggregateType(): string - { - return 'test'; - } - - public function versionName(): string - { - return 'version'; - } - - public function extractAggregateId(Message $message): string - { - return 'some_id'; - } - - public function extractAggregateVersion(Message $message): int - { - return 1; - } - - public function streamName(): StreamName - { - return new StreamName('foo'); - } - - public function metadataEnricher(string $aggregateId, int $aggregateVersion, Message $causation = null): ?MetadataEnricher - { - return null; - } - - public function metadataMatcher(string $aggregateId, int $aggregateVersion): ?MetadataMatcher - { - return null; - } - - public function reconstituteState($state, Iterator $events): array - { - return $state; - } - - public function apply($state, Message ...$events): array - { - return []; - } - - public function hasOneStreamPerAggregate(): bool - { - return true; - } - - public function stateType(): string - { - return 'array'; - } -} diff --git a/tests/TestAsset/SingleStreamTestAggregateDefinition.php b/tests/TestAsset/SingleStreamTestAggregateDefinition.php deleted file mode 100644 index af7d06a..0000000 --- a/tests/TestAsset/SingleStreamTestAggregateDefinition.php +++ /dev/null @@ -1,83 +0,0 @@ - - * (c) 2017-2017 Sascha-Oliver Prolic - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -declare(strict_types=1); - -namespace ProophTest\Micro\TestAsset; - -use Iterator; -use Prooph\Common\Messaging\Message; -use Prooph\EventStore\Metadata\MetadataEnricher; -use Prooph\EventStore\Metadata\MetadataMatcher; -use Prooph\EventStore\StreamName; -use Prooph\Micro\AggregateDefinition; - -final class SingleStreamTestAggregateDefinition implements AggregateDefinition -{ - public function identifierName(): string - { - return 'id'; - } - - public function aggregateType(): string - { - return 'test'; - } - - public function versionName(): string - { - return 'version'; - } - - public function extractAggregateId(Message $message): string - { - return 'some_id'; - } - - public function extractAggregateVersion(Message $message): int - { - return 1; - } - - public function streamName(): StreamName - { - return new StreamName('foo'); - } - - public function metadataEnricher(string $aggregateId, int $aggregateVersion, Message $causation = null): ?MetadataEnricher - { - return null; - } - - public function metadataMatcher(string $aggregateId, int $aggregateVersion): ?MetadataMatcher - { - return null; - } - - public function reconstituteState($state, Iterator $events) - { - return $state; - } - - public function apply($state, Message ...$events) - { - return []; - } - - public function hasOneStreamPerAggregate(): bool - { - return false; - } - - public function stateType(): string - { - return 'array'; - } -} diff --git a/tests/TestAsset/SingleStreamTestAggregateDefinition2.php b/tests/TestAsset/SingleStreamTestAggregateDefinition2.php deleted file mode 100644 index bd02cd1..0000000 --- a/tests/TestAsset/SingleStreamTestAggregateDefinition2.php +++ /dev/null @@ -1,83 +0,0 @@ - - * (c) 2017-2017 Sascha-Oliver Prolic - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -declare(strict_types=1); - -namespace ProophTest\Micro\TestAsset; - -use Iterator; -use Prooph\Common\Messaging\Message; -use Prooph\EventStore\Metadata\MetadataEnricher; -use Prooph\EventStore\Metadata\MetadataMatcher; -use Prooph\EventStore\StreamName; -use Prooph\Micro\AggregateDefinition; - -final class SingleStreamTestAggregateDefinition2 implements AggregateDefinition -{ - public function identifierName(): string - { - return 'id'; - } - - public function aggregateType(): string - { - return 'test'; - } - - public function versionName(): string - { - return 'version'; - } - - public function extractAggregateId(Message $message): string - { - return 'some_id'; - } - - public function extractAggregateVersion(Message $message): int - { - return 1; - } - - public function streamName(): StreamName - { - return new StreamName('foo'); - } - - public function metadataEnricher(string $aggregateId, int $aggregateVersion, Message $causation = null): ?MetadataEnricher - { - return null; - } - - public function metadataMatcher(string $aggregateId, int $aggregateVersion): ?MetadataMatcher - { - return null; - } - - public function reconstituteState($state, Iterator $events) - { - return $state; - } - - public function apply($state, Message ...$events) - { - return []; - } - - public function hasOneStreamPerAggregate(): bool - { - return false; - } - - public function stateType(): string - { - return TestState::class; - } -} diff --git a/tests/TestAsset/TestDomainEvent.php b/tests/TestAsset/TestDomainEvent.php deleted file mode 100644 index 09f8e7a..0000000 --- a/tests/TestAsset/TestDomainEvent.php +++ /dev/null @@ -1,47 +0,0 @@ - - * (c) 2017-2017 Sascha-Oliver Prolic - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -declare(strict_types=1); - -namespace ProophTest\Micro\TestAsset; - -use Prooph\Common\Messaging\DomainEvent; -use Prooph\Common\Messaging\PayloadConstructable; -use Prooph\Common\Messaging\PayloadTrait; - -class TestDomainEvent extends DomainEvent implements PayloadConstructable -{ - use PayloadTrait; - - public static function with(array $payload, int $version): TestDomainEvent - { - $event = new static($payload); - - return $event->withVersion($version); - } - - public static function withPayloadAndSpecifiedCreatedAt(array $payload, int $version, \DateTimeImmutable $createdAt): TestDomainEvent - { - $event = new static($payload); - $event->createdAt = $createdAt; - - return $event->withVersion($version); - } - - public function withVersion(int $version): TestDomainEvent - { - return $this->withAddedMetadata('_aggregate_version', $version); - } - - public function version(): int - { - return $this->metadata['_aggregate_version']; - } -} diff --git a/tests/TestAsset/TestState.php b/tests/TestAsset/TestState.php deleted file mode 100644 index 6c65f81..0000000 --- a/tests/TestAsset/TestState.php +++ /dev/null @@ -1,21 +0,0 @@ - - * (c) 2017-2017 Sascha-Oliver Prolic - * - * For the full copyright and license information, please view the LICENSE - * file that was distributed with this source code. - */ - -declare(strict_types=1); - -namespace ProophTest\Micro\TestAsset; - -class TestState -{ - public function version(): int - { - return 4; - } -} From f215ee3dc822152a4961476b08a6e4cd7822233f Mon Sep 17 00:00:00 2001 From: prolic Date: Tue, 21 Apr 2020 22:52:45 -0400 Subject: [PATCH 2/8] add tests --- composer.json | 4 +- examples/register_and_change_username.php | 7 +- spec/CommandSpecificationSpec.php | 82 ++++++++++++++++++++++ spec/KernelSpec.php | 83 +++++++++++++++++++++++ src/Kernel.php | 2 +- 5 files changed, 169 insertions(+), 9 deletions(-) create mode 100644 spec/CommandSpecificationSpec.php create mode 100644 spec/KernelSpec.php diff --git a/composer.json b/composer.json index 66162a7..34cec45 100755 --- a/composer.json +++ b/composer.json @@ -28,9 +28,7 @@ "phunkie/phunkie": "0.11.1" }, "require-dev": { - "phpspec/prophecy": "^1.10.3", - "phpspec/prophecy-phpunit": "^2.0", - "phpunit/phpunit": "^9.1", + "kahlan/kahlan": "^4.7.4", "prooph/event-store-client": "dev-master", "prooph/php-cs-fixer-config": "^0.3.1", "php-coveralls/php-coveralls": "^2.1", diff --git a/examples/register_and_change_username.php b/examples/register_and_change_username.php index 744d9a1..efe4ab5 100644 --- a/examples/register_and_change_username.php +++ b/examples/register_and_change_username.php @@ -14,7 +14,6 @@ namespace Prooph\MicroExample\Script; use Amp\Loop; -use Amp\Promise; use Phunkie\Validation\Validation; use Prooph\EventStore\UserCredentials; use Prooph\EventStore\Util\Guid; @@ -72,10 +71,8 @@ function showResult(Validation $result): void $uniqueEmailGuard = new InMemoryEmailGuard(); $commandMap = ImmMap([ - ChangeUserName::class => fn (ChangeUserName $m) => new UserSpecification($m, User\changeUserName), - RegisterUser::class => fn (RegisterUser $m) => new UserSpecification($m, function (callable $stateResolver, $message) use ($uniqueEmailGuard): Promise { - return User\registerUser($stateResolver, $message, $uniqueEmailGuard); - }), + ChangeUserName::class => fn ($m) => new UserSpecification($m, User\changeUserName), + RegisterUser::class => fn ($m) => new UserSpecification($m, fn (callable $s, $m) => User\registerUser($s, $m, $uniqueEmailGuard)), ]); $dispatch = Kernel\buildCommandDispatcher($connection, $commandMap); diff --git a/spec/CommandSpecificationSpec.php b/spec/CommandSpecificationSpec.php new file mode 100644 index 0000000..64621d7 --- /dev/null +++ b/spec/CommandSpecificationSpec.php @@ -0,0 +1,82 @@ + + * (c) 2017-2020 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace ProophTest\Micro; + +use Amp\Success; +use Kahlan\Plugin\Double; +use Prooph\EventStore\EventId; +use Prooph\EventStore\ExpectedVersion; +use Prooph\EventStore\RecordedEvent; +use Prooph\EventStore\ResolvedEvent; +use Prooph\Micro\CommandSpecification; + +describe('Prooph Micro', function () { + context('Command Specification', function () { + describe('Command Handling', function () { + it('can handle incoming commands', function () { + $command = new \stdClass(); + $handler = fn ($s, $m) => new Success('foo'); + + $spec = Double::instance([ + 'extends' => CommandSpecification::class, + 'args' => [$command, $handler], + ]); + + expect($spec->handle(fn () => []))->toEqual(new Success('foo')); + }); + }); + + describe('Aggregate State Reconstruction', function () { + it('can reconstitute aggregate state from event history', function () { + $command = new \stdClass(); + $handler = fn () => new Success(); + + $spec = Double::instance([ + 'extends' => CommandSpecification::class, + 'args' => [$command, $handler], + ]); + allow($spec)->toReceive('initialState')->andReturn(0); + allow($spec)->toReceive('mapToEvent')->andRun(function (ResolvedEvent $re): object { + $e = new \stdClass(); + $e->v = (int) $re->originalEvent()->data(); + + return $e; + }); + allow($spec)->toReceive('apply')->andReturn(6); + + $now = new \DateTimeImmutable(); + + $re1 = new ResolvedEvent(new RecordedEvent('stream', 0, EventId::generate(), 'test', false, '1', '', $now), null, null); + $re2 = new ResolvedEvent(new RecordedEvent('stream', 0, EventId::generate(), 'test', false, '2', '', $now), null, null); + $re3 = new ResolvedEvent(new RecordedEvent('stream', 0, EventId::generate(), 'test', false, '3', '', $now), null, null); + + expect($spec->reconstituteFromHistory(ImmList($re1, $re2, $re3)))->toBe(6); + }); + }); + + describe('Default expected version value', function () { + it('will return "any" as default value for expected version', function () { + $command = new \stdClass(); + $handler = fn () => new Success(); + + $spec = Double::instance([ + 'extends' => CommandSpecification::class, + 'args' => [$command, $handler], + ]); + + expect($spec->expectedVersion())->toBe(ExpectedVersion::ANY); + }); + }); + }); +}); diff --git a/spec/KernelSpec.php b/spec/KernelSpec.php new file mode 100644 index 0000000..64e79f5 --- /dev/null +++ b/spec/KernelSpec.php @@ -0,0 +1,83 @@ + + * (c) 2017-2020 Sascha-Oliver Prolic + * + * For the full copyright and license information, please view the LICENSE + * file that was distributed with this source code. + */ + +declare(strict_types=1); + +namespace ProophTest\Micro; + +use function Amp\Promise\wait; +use Amp\Success; +use Kahlan\Plugin\Double; +use Prooph\EventStore\Async\EventStoreConnection; +use Prooph\EventStore\SliceReadStatus; +use Prooph\EventStore\StreamEventsSlice; +use Prooph\Micro\CommandSpecification; +use function Prooph\Micro\Kernel\stateResolver; + +describe('Prooph Micro', function () { + context('Kernel', function () { + describe('State Resolver', function () { + it('will throw, when stream not found', function () { + $command = new \stdClass(); + $handler = fn ($s, $m) => $s(); + + $spec = Double::instance([ + 'extends' => CommandSpecification::class, + 'args' => [$command, $handler], + ]); + + $connection = Double::instance([ + 'implements' => EventStoreConnection::class, + ]); + + $slice = Double::instance([ + 'extends' => StreamEventsSlice::class, + 'magicMethods' => true, + ]); + + allow($spec)->toReceive('streamName')->andReturn('test-stream'); + allow($connection)->toReceive('readStreamEventsForwardAsync')->andReturn(new Success($slice)); + allow(StreamEventsSlice::class)->toReceive('status')->andReturn(SliceReadStatus::streamNotFound()); + + $closure = fn () => wait(stateResolver($connection, $spec)()); + + expect($closure)->toThrow(new \RuntimeException('Stream not found')); + }); + + it('will throw, when stream deleted', function () { + $command = new \stdClass(); + $handler = fn ($s, $m) => $s(); + + $spec = Double::instance([ + 'extends' => CommandSpecification::class, + 'args' => [$command, $handler], + ]); + + $connection = Double::instance([ + 'implements' => EventStoreConnection::class, + ]); + + $slice = Double::instance([ + 'extends' => StreamEventsSlice::class, + 'magicMethods' => true, + ]); + + allow($spec)->toReceive('streamName')->andReturn('test-stream'); + allow($connection)->toReceive('readStreamEventsForwardAsync')->andReturn(new Success($slice)); + allow(StreamEventsSlice::class)->toReceive('status')->andReturn(SliceReadStatus::streamDeleted()); + + $closure = fn () => wait(stateResolver($connection, $spec)()); + + expect($closure)->toThrow(new \RuntimeException('Stream deleted')); + }); + }); + }); +}); diff --git a/src/Kernel.php b/src/Kernel.php index 93895ea..ded8b04 100644 --- a/src/Kernel.php +++ b/src/Kernel.php @@ -79,7 +79,7 @@ function stateResolver(EventStoreConnection $eventStore, CommandSpecification $s return $specification->reconstituteFromHistory(ImmList(...$slice->events())); break; case SliceReadStatus::STREAM_NOT_FOUND: - return null; + throw new \RuntimeException('Stream not found'); break; case SliceReadStatus::STREAM_DELETED: throw new \RuntimeException('Stream deleted'); From 03d41488f28586a2c72c039de17b977768c313ce Mon Sep 17 00:00:00 2001 From: prolic Date: Tue, 21 Apr 2020 23:01:40 -0400 Subject: [PATCH 3/8] update travis build --- .travis.yml | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/.travis.yml b/.travis.yml index d2ba29f..7236066 100644 --- a/.travis.yml +++ b/.travis.yml @@ -23,9 +23,10 @@ before_script: - phpenv config-rm xdebug.ini - composer self-update - composer update --prefer-dist $DEPENDENCIES + - mkdir -p build/logs script: - - if [[ $TEST_COVERAGE == 'true' ]]; then php -dzend_extension=xdebug.so ./vendor/bin/phpunit --coverage-text --coverage-clover ./build/logs/clover.xml; else ./vendor/bin/phpunit; fi + - if [[ $TEST_COVERAGE == 'true' ]]; then php -dzend_extension=xdebug.so ./vendor/bin/kahlan --coverage=4 --clover=./build/logs/clover.xml; else ./vendor/bin/kahlan; fi - if [[ $EXECUTE_CS_CHECK == 'true' ]]; then ./vendor/bin/php-cs-fixer fix -v --diff --dry-run; fi - if [[ $EXECUTE_CS_CHECK == 'true' ]]; then ./vendor/bin/docheader check src/ tests/ examples/; fi From 3ffc1df017948c9c6d1a14e5a95733710c9142db Mon Sep 17 00:00:00 2001 From: prolic Date: Wed, 22 Apr 2020 14:33:18 -0400 Subject: [PATCH 4/8] add kernel specs --- spec/KernelSpec.php | 169 +++++++++++++++++++++++++++++++++++++++++++- src/Kernel.php | 3 - 2 files changed, 166 insertions(+), 6 deletions(-) diff --git a/spec/KernelSpec.php b/spec/KernelSpec.php index 64e79f5..c2412b4 100644 --- a/spec/KernelSpec.php +++ b/spec/KernelSpec.php @@ -16,15 +16,65 @@ use function Amp\Promise\wait; use Amp\Success; use Kahlan\Plugin\Double; +use Phunkie\Types\ImmList; +use Phunkie\Validation\Failure; +use Phunkie\Validation\Success as ValidationSuccess; use Prooph\EventStore\Async\EventStoreConnection; +use Prooph\EventStore\EventData; +use Prooph\EventStore\EventId; +use Prooph\EventStore\RecordedEvent; +use Prooph\EventStore\ResolvedEvent; use Prooph\EventStore\SliceReadStatus; use Prooph\EventStore\StreamEventsSlice; use Prooph\Micro\CommandSpecification; +use function Prooph\Micro\Kernel\buildCommandDispatcher; use function Prooph\Micro\Kernel\stateResolver; describe('Prooph Micro', function () { context('Kernel', function () { - describe('State Resolver', function () { + describe('when resolving state', function () { + it('will reconstitute state when event history found', function () { + $command = new \stdClass(); + $handler = fn ($s, $m) => $s(); + + $spec = Double::instance([ + 'extends' => CommandSpecification::class, + 'args' => [$command, $handler], + ]); + + $connection = Double::instance([ + 'implements' => EventStoreConnection::class, + ]); + + $slice = Double::instance([ + 'extends' => StreamEventsSlice::class, + 'magicMethods' => true, + ]); + + $now = new \DateTimeImmutable(); + + $re1 = new ResolvedEvent(new RecordedEvent('stream', 0, EventId::generate(), 'test', false, '1', '', $now), null, null); + $re2 = new ResolvedEvent(new RecordedEvent('stream', 0, EventId::generate(), 'test', false, '2', '', $now), null, null); + $re3 = new ResolvedEvent(new RecordedEvent('stream', 0, EventId::generate(), 'test', false, '3', '', $now), null, null); + + allow($spec)->toReceive('streamName')->andReturn('test-stream'); + allow($spec)->toReceive('initialState')->andReturn(0); + allow($spec)->toReceive('mapToEvent')->andRun(function (ResolvedEvent $re): object { + $e = new \stdClass(); + $e->v = (int) $re->originalEvent()->data(); + + return $e; + }); + allow($spec)->toReceive('apply')->andReturn(6); + + allow($connection)->toReceive('readStreamEventsForwardAsync')->andReturn(new Success($slice)); + + allow($slice)->toReceive('status')->andReturn(SliceReadStatus::success()); + allow($slice)->toReceive('events')->andReturn([$re1, $re2, $re3]); + + expect(wait(stateResolver($connection, $spec)()))->toBe(6); + }); + it('will throw, when stream not found', function () { $command = new \stdClass(); $handler = fn ($s, $m) => $s(); @@ -45,7 +95,7 @@ allow($spec)->toReceive('streamName')->andReturn('test-stream'); allow($connection)->toReceive('readStreamEventsForwardAsync')->andReturn(new Success($slice)); - allow(StreamEventsSlice::class)->toReceive('status')->andReturn(SliceReadStatus::streamNotFound()); + allow($slice)->toReceive('status')->andReturn(SliceReadStatus::streamNotFound()); $closure = fn () => wait(stateResolver($connection, $spec)()); @@ -72,12 +122,125 @@ allow($spec)->toReceive('streamName')->andReturn('test-stream'); allow($connection)->toReceive('readStreamEventsForwardAsync')->andReturn(new Success($slice)); - allow(StreamEventsSlice::class)->toReceive('status')->andReturn(SliceReadStatus::streamDeleted()); + allow($slice)->toReceive('status')->andReturn(SliceReadStatus::streamDeleted()); $closure = fn () => wait(stateResolver($connection, $spec)()); expect($closure)->toThrow(new \RuntimeException('Stream deleted')); }); }); + + describe('when executing the command dispatcher', function () { + it('will execute the command handler successfully', function () { + $command = new \stdClass(); + $handler = function ($s, $m) { + $event = new \stdClass(); + $event->v = 'foo'; + + return new Success(ImmList($event)); + }; + + $spec = Double::instance([ + 'extends' => CommandSpecification::class, + 'args' => [$command, $handler], + ]); + + $map = ImmMap([ + \stdClass::class => fn ($m) => $spec, + ]); + + $connection = Double::instance([ + 'implements' => EventStoreConnection::class, + ]); + + allow($spec)->toReceive('streamName')->andReturn('test-stream'); + allow($spec)->toReceive('mapToEventData')->andRun(function (object $e): object { + return new EventData( + null, + 'test-event', + false, + $e->v, + '' + ); + }); + + allow($connection)->toReceive('appendToStreamAsync')->andRun(fn () => new Success()); + + $dispatch = buildCommandDispatcher($connection, $map); + + $result = wait($dispatch($command)); + + $expectedEvent = new \stdClass(); + $expectedEvent->v = 'foo'; + + expect($result)->toBeAnInstanceOf(ValidationSuccess::class); + $list = $result->getOrElse(null); + expect($list)->toBeAnInstanceOf(ImmList::class); + expect($list->head())->toEqual($expectedEvent); + }); + + it('returns failure when no specification found for a given command', function () { + $command = new \stdClass(); + $map = ImmMap(); + + $connection = Double::instance([ + 'implements' => EventStoreConnection::class, + ]); + + $dispatch = buildCommandDispatcher($connection, $map); + + expect(wait($dispatch($command)))->toBeAnInstanceOf(Failure::class); + }); + + it('returns failure when command handler throws', function () { + $command = new \stdClass(); + $handler = function ($s, $m) { + throw new \RuntimeException('Boom!'); + }; + + $spec = Double::instance([ + 'extends' => CommandSpecification::class, + 'args' => [$command, $handler], + ]); + + $map = ImmMap([ + \stdClass::class => fn ($m) => $spec, + ]); + + $connection = Double::instance([ + 'implements' => EventStoreConnection::class, + ]); + + $slice = Double::instance([ + 'extends' => StreamEventsSlice::class, + 'magicMethods' => true, + ]); + + $now = new \DateTimeImmutable(); + + $re1 = new ResolvedEvent(new RecordedEvent('stream', 0, EventId::generate(), 'test', false, '1', '', $now), null, null); + $re2 = new ResolvedEvent(new RecordedEvent('stream', 0, EventId::generate(), 'test', false, '2', '', $now), null, null); + $re3 = new ResolvedEvent(new RecordedEvent('stream', 0, EventId::generate(), 'test', false, '3', '', $now), null, null); + + allow($spec)->toReceive('streamName')->andReturn('test-stream'); + allow($spec)->toReceive('initialState')->andReturn(0); + allow($spec)->toReceive('mapToEvent')->andRun(function (ResolvedEvent $re): object { + $e = new \stdClass(); + $e->v = (int) $re->originalEvent()->data(); + + return $e; + }); + allow($spec)->toReceive('apply')->andReturn(6); + + allow($connection)->toReceive('readStreamEventsForwardAsync')->andReturn(new Success($slice)); + + allow($slice)->toReceive('status')->andReturn(SliceReadStatus::success()); + allow($slice)->toReceive('events')->andReturn([$re1, $re2, $re3]); + + $dispatch = buildCommandDispatcher($connection, $map); + + expect(wait($dispatch($command)))->toBeAnInstanceOf(Failure::class); + }); + }); }); }); diff --git a/src/Kernel.php b/src/Kernel.php index ded8b04..7eb49b1 100644 --- a/src/Kernel.php +++ b/src/Kernel.php @@ -77,13 +77,10 @@ function stateResolver(EventStoreConnection $eventStore, CommandSpecification $s switch ($slice->status()->value()) { case SliceReadStatus::SUCCESS: return $specification->reconstituteFromHistory(ImmList(...$slice->events())); - break; case SliceReadStatus::STREAM_NOT_FOUND: throw new \RuntimeException('Stream not found'); - break; case SliceReadStatus::STREAM_DELETED: throw new \RuntimeException('Stream deleted'); - break; } }); }; From ddd07bdf0df13a9ed3c7058587be982fb224b3db Mon Sep 17 00:00:00 2001 From: prolic Date: Sun, 26 Apr 2020 16:19:55 -0400 Subject: [PATCH 5/8] make handlers return async generators --- .../Infrastructure/InMemoryEmailGuard.php | 6 +- examples/Model/UniqueEmailGuard.php | 7 +- examples/Model/User.php | 18 ++--- spec/CommandSpecificationSpec.php | 4 +- spec/KernelSpec.php | 9 ++- src/CommandSpecification.php | 4 +- src/Kernel.php | 74 ++++++++++++------- 7 files changed, 77 insertions(+), 45 deletions(-) diff --git a/examples/Infrastructure/InMemoryEmailGuard.php b/examples/Infrastructure/InMemoryEmailGuard.php index a1f4d87..01573ba 100644 --- a/examples/Infrastructure/InMemoryEmailGuard.php +++ b/examples/Infrastructure/InMemoryEmailGuard.php @@ -13,13 +13,15 @@ namespace Prooph\MicroExample\Infrastructure; +use Amp\Promise; +use Amp\Success; use Prooph\MicroExample\Model\UniqueEmailGuard; final class InMemoryEmailGuard implements UniqueEmailGuard { private $knownEmails = []; - public function isUnique(string $email): bool + public function isUnique(string $email): Promise { $isUnique = ! \in_array($email, $this->knownEmails); @@ -27,6 +29,6 @@ public function isUnique(string $email): bool $this->knownEmails[] = $email; } - return $isUnique; + return new Success($isUnique); } } diff --git a/examples/Model/UniqueEmailGuard.php b/examples/Model/UniqueEmailGuard.php index 1be400f..be6108d 100644 --- a/examples/Model/UniqueEmailGuard.php +++ b/examples/Model/UniqueEmailGuard.php @@ -13,7 +13,12 @@ namespace Prooph\MicroExample\Model; +use Amp\Promise; + interface UniqueEmailGuard { - public function isUnique(string $email): bool; + /** + * @return Promise + */ + public function isUnique(string $email): Promise; } diff --git a/examples/Model/User.php b/examples/Model/User.php index 5313ecb..cae5bb0 100644 --- a/examples/Model/User.php +++ b/examples/Model/User.php @@ -13,8 +13,7 @@ namespace Prooph\MicroExample\Model\User; -use Amp\Promise; -use Amp\Success; +use Generator; use InvalidArgumentException; use Phunkie\Types\ImmList; use Prooph\MicroExample\Model\Command\ChangeUserName; @@ -26,24 +25,26 @@ const registerUser = '\Prooph\MicroExample\Model\User\registerUser'; -function registerUser(callable $stateResolver, RegisterUser $command, UniqueEmailGuard $guard): Promise +function registerUser(callable $stateResolver, RegisterUser $command, UniqueEmailGuard $guard): Generator { - if ($guard->isUnique($command->email())) { - return new Success(ImmList(new UserRegistered($command->payload()))); + if (! yield $guard->isUnique($command->email())) { + yield new UserRegisteredWithDuplicateEmail($command->payload()); + + return; } - return new Success(ImmList(new UserRegisteredWithDuplicateEmail($command->payload()))); + yield new UserRegistered($command->payload()); } const changeUserName = '\Prooph\MicroExample\Model\User\changeUserName'; -function changeUserName(callable $stateResolver, ChangeUserName $command): Promise +function changeUserName(callable $stateResolver, ChangeUserName $command): Generator { if (! \mb_strlen($command->name()) > 3) { throw new InvalidArgumentException('Username too short'); } - return new Success(ImmList(new UserNameChanged($command->payload()))); + yield new UserNameChanged($command->payload()); } const apply = '\Prooph\MicroExample\Model\User\apply'; @@ -51,7 +52,6 @@ function changeUserName(callable $stateResolver, ChangeUserName $command): Promi function apply($state, ImmList $events): array { return $events->fold($state, function ($state, $e) { - \var_dump($state, $e); switch (\get_class($e)) { case UserRegistered::class: return \array_merge($state, $e->payload(), ['activated' => true]); diff --git a/spec/CommandSpecificationSpec.php b/spec/CommandSpecificationSpec.php index 64621d7..249b2b4 100644 --- a/spec/CommandSpecificationSpec.php +++ b/spec/CommandSpecificationSpec.php @@ -26,14 +26,14 @@ describe('Command Handling', function () { it('can handle incoming commands', function () { $command = new \stdClass(); - $handler = fn ($s, $m) => new Success('foo'); + $handler = fn ($s, $m) => yield 'foo'; $spec = Double::instance([ 'extends' => CommandSpecification::class, 'args' => [$command, $handler], ]); - expect($spec->handle(fn () => []))->toEqual(new Success('foo')); + expect($spec->handle(fn () => []))->toBeAnInstanceOf(\Generator::class); }); }); diff --git a/spec/KernelSpec.php b/spec/KernelSpec.php index c2412b4..94d59b0 100644 --- a/spec/KernelSpec.php +++ b/spec/KernelSpec.php @@ -69,10 +69,11 @@ allow($connection)->toReceive('readStreamEventsForwardAsync')->andReturn(new Success($slice)); + allow($slice)->toReceive('isEndOfStream')->andReturn(true); allow($slice)->toReceive('status')->andReturn(SliceReadStatus::success()); allow($slice)->toReceive('events')->andReturn([$re1, $re2, $re3]); - expect(wait(stateResolver($connection, $spec)()))->toBe(6); + expect(wait(stateResolver($connection, $spec, 5)()))->toBe(6); }); it('will throw, when stream not found', function () { @@ -97,7 +98,7 @@ allow($connection)->toReceive('readStreamEventsForwardAsync')->andReturn(new Success($slice)); allow($slice)->toReceive('status')->andReturn(SliceReadStatus::streamNotFound()); - $closure = fn () => wait(stateResolver($connection, $spec)()); + $closure = fn () => wait(stateResolver($connection, $spec, 5)()); expect($closure)->toThrow(new \RuntimeException('Stream not found')); }); @@ -124,7 +125,7 @@ allow($connection)->toReceive('readStreamEventsForwardAsync')->andReturn(new Success($slice)); allow($slice)->toReceive('status')->andReturn(SliceReadStatus::streamDeleted()); - $closure = fn () => wait(stateResolver($connection, $spec)()); + $closure = fn () => wait(stateResolver($connection, $spec, 5)()); expect($closure)->toThrow(new \RuntimeException('Stream deleted')); }); @@ -137,7 +138,7 @@ $event = new \stdClass(); $event->v = 'foo'; - return new Success(ImmList($event)); + yield $event; }; $spec = Double::instance([ diff --git a/src/CommandSpecification.php b/src/CommandSpecification.php index 007f296..a296841 100644 --- a/src/CommandSpecification.php +++ b/src/CommandSpecification.php @@ -13,8 +13,8 @@ namespace Prooph\Micro; -use Amp\Promise; use Closure; +use Generator; use Phunkie\Types\ImmList; use Prooph\EventStore\EventData; use Prooph\EventStore\ExpectedVersion; @@ -31,7 +31,7 @@ public function __construct(object $command, callable $handler) $this->handler = Closure::fromCallable($handler); } - public function handle(Closure $stateResolver): Promise + public function handle(Closure $stateResolver): Generator { return ($this->handler)($stateResolver, $this->command); } diff --git a/src/Kernel.php b/src/Kernel.php index 7eb49b1..7d687f9 100644 --- a/src/Kernel.php +++ b/src/Kernel.php @@ -14,6 +14,7 @@ namespace Prooph\Micro\Kernel; use function Amp\call; +use Amp\Producer; use Amp\Promise; use Closure; use function Failure; @@ -29,10 +30,11 @@ function buildCommandDispatcher( EventStoreConnection $eventStore, - ImmMap $commandMap + ImmMap $commandMap, + int $readBatchSize = 200 ): callable { - return function (object $m) use ($eventStore, $commandMap): Promise { - return call(function () use ($m, $eventStore, $commandMap): Generator { + return function (object $m) use ($eventStore, $commandMap, $readBatchSize): Promise { + return call(function () use ($m, $eventStore, $commandMap, $readBatchSize): Generator { $messageClass = \get_class($m); $config = $commandMap->get($messageClass); @@ -44,44 +46,66 @@ function buildCommandDispatcher( \assert($specification instanceof CommandSpecification); try { - $es = yield $specification->handle(stateResolver($eventStore, $specification)); + $iterator = new Producer(function (callable $emit) use ($eventStore, $specification, $readBatchSize): Generator { + foreach ($specification->handle(stateResolver($eventStore, $specification, $readBatchSize)) as $eventOrPromise) { + if ($eventOrPromise instanceof Promise) { + yield $eventOrPromise; + } else { + yield $emit($eventOrPromise); + } + } + }); + + $events = []; + $eventData = []; + + while (yield $iterator->advance()) { + $event = $iterator->getCurrent(); + $events[] = $event; + $eventData[] = $specification->mapToEventData($event); + } yield $eventStore->appendToStreamAsync( $specification->streamName(), $specification->expectedVersion(), - $es->map(fn ($e) => $specification->mapToEventData($e))->toArray() + $eventData ); } catch (\Throwable $e) { return Failure($e); } - return Success($es); + return Success(ImmList(...$events)); }); }; } const stateResolver = 'Prooph\Micro\Kernel\stateResolver'; -function stateResolver(EventStoreConnection $eventStore, CommandSpecification $specification): Closure +function stateResolver(EventStoreConnection $eventStore, CommandSpecification $specification, int $readBatchSize): Closure { - return function () use ($eventStore, $specification): Promise { - return call(function () use ($eventStore, $specification): Generator { - $slice = yield $eventStore->readStreamEventsForwardAsync( - $specification->streamName(), - 0, - 4096, - ); - - \assert($slice instanceof StreamEventsSlice); - - switch ($slice->status()->value()) { - case SliceReadStatus::SUCCESS: - return $specification->reconstituteFromHistory(ImmList(...$slice->events())); - case SliceReadStatus::STREAM_NOT_FOUND: - throw new \RuntimeException('Stream not found'); - case SliceReadStatus::STREAM_DELETED: - throw new \RuntimeException('Stream deleted'); - } + return function () use ($eventStore, $specification, $readBatchSize): Promise { + return call(function () use ($eventStore, $specification, $readBatchSize): Generator { + $events = []; + + do { + $slice = yield $eventStore->readStreamEventsForwardAsync( + $specification->streamName(), + 0, + $readBatchSize, + ); + \assert($slice instanceof StreamEventsSlice); + + switch ($slice->status()->value()) { + case SliceReadStatus::STREAM_NOT_FOUND: + throw new \RuntimeException('Stream not found'); + case SliceReadStatus::STREAM_DELETED: + throw new \RuntimeException('Stream deleted'); + } + + $events = \array_merge($events, $slice->events()); + } while (! $slice->isEndOfStream() && \count($slice->events()) === $readBatchSize); + + return $specification->reconstituteFromHistory(ImmList(...$events)); }); }; } From 7c6f8efde4944f94cbcad2b458f9d197373635b4 Mon Sep 17 00:00:00 2001 From: prolic Date: Tue, 9 Jun 2020 13:25:13 -0400 Subject: [PATCH 6/8] fix kernel event dispatching --- spec/KernelSpec.php | 18 +++++++------- src/Kernel.php | 57 ++++++++++++++++++++++++--------------------- 2 files changed, 40 insertions(+), 35 deletions(-) diff --git a/spec/KernelSpec.php b/spec/KernelSpec.php index 94d59b0..a8c01cd 100644 --- a/spec/KernelSpec.php +++ b/spec/KernelSpec.php @@ -15,10 +15,9 @@ use function Amp\Promise\wait; use Amp\Success; +use function expect; use Kahlan\Plugin\Double; use Phunkie\Types\ImmList; -use Phunkie\Validation\Failure; -use Phunkie\Validation\Success as ValidationSuccess; use Prooph\EventStore\Async\EventStoreConnection; use Prooph\EventStore\EventData; use Prooph\EventStore\EventId; @@ -29,6 +28,7 @@ use Prooph\Micro\CommandSpecification; use function Prooph\Micro\Kernel\buildCommandDispatcher; use function Prooph\Micro\Kernel\stateResolver; +use RuntimeException; describe('Prooph Micro', function () { context('Kernel', function () { @@ -174,10 +174,8 @@ $expectedEvent = new \stdClass(); $expectedEvent->v = 'foo'; - expect($result)->toBeAnInstanceOf(ValidationSuccess::class); - $list = $result->getOrElse(null); - expect($list)->toBeAnInstanceOf(ImmList::class); - expect($list->head())->toEqual($expectedEvent); + expect($result)->toBeAnInstanceOf(ImmList::class); + expect($result->head())->toEqual($expectedEvent); }); it('returns failure when no specification found for a given command', function () { @@ -189,8 +187,11 @@ ]); $dispatch = buildCommandDispatcher($connection, $map); + $syncedDispatch = fn () => wait($dispatch($command)); - expect(wait($dispatch($command)))->toBeAnInstanceOf(Failure::class); + expect($syncedDispatch)->toThrow( + new RuntimeException('No configuration found for stdClass') + ); }); it('returns failure when command handler throws', function () { @@ -239,8 +240,9 @@ allow($slice)->toReceive('events')->andReturn([$re1, $re2, $re3]); $dispatch = buildCommandDispatcher($connection, $map); + $syncedDispatch = fn () => wait($dispatch($command)); - expect(wait($dispatch($command)))->toBeAnInstanceOf(Failure::class); + expect($syncedDispatch)->toThrow(new RuntimeException('Boom!')); }); }); }); diff --git a/src/Kernel.php b/src/Kernel.php index 7d687f9..c3264d4 100644 --- a/src/Kernel.php +++ b/src/Kernel.php @@ -17,14 +17,13 @@ use Amp\Producer; use Amp\Promise; use Closure; -use function Failure; use Generator; use Phunkie\Types\ImmMap; use Prooph\EventStore\Async\EventStoreConnection; use Prooph\EventStore\SliceReadStatus; use Prooph\EventStore\StreamEventsSlice; use Prooph\Micro\CommandSpecification; -use function Success; +use RuntimeException; const buildCommandDispatcher = 'Prooph\Micro\Kernel\buildCommandDispatcher'; @@ -39,42 +38,46 @@ function buildCommandDispatcher( $config = $commandMap->get($messageClass); if ($config->isEmpty()) { - return Failure('No configuration found for ' . $messageClass); + throw new RuntimeException( + 'No configuration found for ' . $messageClass + ); } $specification = $config->get()($m); \assert($specification instanceof CommandSpecification); - try { - $iterator = new Producer(function (callable $emit) use ($eventStore, $specification, $readBatchSize): Generator { - foreach ($specification->handle(stateResolver($eventStore, $specification, $readBatchSize)) as $eventOrPromise) { - if ($eventOrPromise instanceof Promise) { - yield $eventOrPromise; - } else { - yield $emit($eventOrPromise); - } - } - }); + $iterator = new Producer(function (callable $emit) use ($eventStore, $specification, $readBatchSize): Generator { + $generator = $specification->handle(stateResolver($eventStore, $specification, $readBatchSize)); - $events = []; - $eventData = []; + while ($generator->valid()) { + $eventOrPromise = $generator->current(); - while (yield $iterator->advance()) { - $event = $iterator->getCurrent(); - $events[] = $event; - $eventData[] = $specification->mapToEventData($event); + if ($eventOrPromise instanceof Promise) { + $generator->send(yield $eventOrPromise); + } else { + yield $emit($eventOrPromise); + $generator->next(); + } } + }); - yield $eventStore->appendToStreamAsync( - $specification->streamName(), - $specification->expectedVersion(), - $eventData - ); - } catch (\Throwable $e) { - return Failure($e); + $events = []; + $eventData = []; + + while (yield $iterator->advance()) { + $event = $iterator->getCurrent(); + + $events[] = $event; + $eventData[] = $specification->mapToEventData($event); } - return Success(ImmList(...$events)); + yield $eventStore->appendToStreamAsync( + $specification->streamName(), + $specification->expectedVersion(), + $eventData + ); + + return ImmList(...$events); }); }; } From 77460d0acce640feb17524442167e3cd43f2006d Mon Sep 17 00:00:00 2001 From: prolic Date: Tue, 9 Jun 2020 13:54:45 -0400 Subject: [PATCH 7/8] update example --- examples/register_and_change_username.php | 43 ++++++++++++++--------- 1 file changed, 26 insertions(+), 17 deletions(-) diff --git a/examples/register_and_change_username.php b/examples/register_and_change_username.php index efe4ab5..8b32d42 100644 --- a/examples/register_and_change_username.php +++ b/examples/register_and_change_username.php @@ -14,7 +14,8 @@ namespace Prooph\MicroExample\Script; use Amp\Loop; -use Phunkie\Validation\Validation; +use const PHP_EOL; +use Phunkie\Types\ImmList; use Prooph\EventStore\UserCredentials; use Prooph\EventStore\Util\Guid; use Prooph\EventStoreClient\ConnectionSettings; @@ -26,22 +27,17 @@ use Prooph\MicroExample\Model\Command\RegisterUser; use Prooph\MicroExample\Model\Command\UnknownCommand; use Prooph\MicroExample\Model\User; +use Throwable; $autoloader = require __DIR__ . '/../vendor/autoload.php'; $autoloader->addPsr4('Prooph\\MicroExample\\', __DIR__); require 'Model/User.php'; -function showResult(Validation $result): void +function showResult($result): void { - $on = match($result); - switch (true) { - case $on(Success(_)): - echo $result->show() . PHP_EOL; - echo \json_encode($result->getOrElse('')->head()->payload()) . PHP_EOL . PHP_EOL; - break; - case $on(Failure(_)): - echo $result->show() . PHP_EOL . PHP_EOL; - break; + if ($result instanceof ImmList) { + echo $result->show() . PHP_EOL; + echo \json_encode($result->head()->payload()) . PHP_EOL . PHP_EOL; } } @@ -54,7 +50,7 @@ function showResult(Validation $result): void ); $connection = EventStoreConnectionFactory::createFromConnectionString( - 'ConnectTo=tcp://admin:changeit@localhost:1113', + 'ConnectTo=tcp://admin:changeit@10.121.1.4:1113', $settings->build() ); @@ -79,15 +75,28 @@ function showResult(Validation $result): void $userId = Guid::generateString(); - /* @var Validation $result */ - $result = yield $dispatch(new RegisterUser(['id' => $userId, 'name' => 'Alex', 'email' => 'member@getprooph.org'])); + try { + $result = yield $dispatch(new RegisterUser(['id' => $userId, 'name' => 'Alex', 'email' => 'member@getprooph.org'])); + } catch (Throwable $e) { + echo \get_class($e) . ': ' . $e->getMessage() . PHP_EOL . PHP_EOL; + } + showResult($result); - $result = yield $dispatch(new ChangeUserName(['id' => $userId, 'name' => 'Sascha'])); + try { + $result = yield $dispatch(new ChangeUserName(['id' => $userId, 'name' => 'Sascha'])); + } catch (Throwable $e) { + echo \get_class($e) . ': ' . $e->getMessage() . PHP_EOL . PHP_EOL; + } + showResult($result); - // unknown command - $result = yield $dispatch(new UnknownCommand()); + try { + $result = yield $dispatch(new UnknownCommand()); + } catch (Throwable $e) { + echo \get_class($e) . ': ' . $e->getMessage() . PHP_EOL . PHP_EOL; + } + showResult($result); $time = \microtime(true) - $start; From cbe45afd06143f05aab83c3bd93d33d5928849e1 Mon Sep 17 00:00:00 2001 From: prolic Date: Tue, 9 Jun 2020 13:57:00 -0400 Subject: [PATCH 8/8] remove phpunit xml --- phpunit.xml.dist | 22 ---------------------- 1 file changed, 22 deletions(-) delete mode 100644 phpunit.xml.dist diff --git a/phpunit.xml.dist b/phpunit.xml.dist deleted file mode 100644 index ce92fd9..0000000 --- a/phpunit.xml.dist +++ /dev/null @@ -1,22 +0,0 @@ - - - - ./tests/ - - - - - ./src/ - - -