Skip to content

Commit

Permalink
Merge pull request #581 from patchlevel/aggregate-id-type-configurable
Browse files Browse the repository at this point in the history
make aggregate id type configurable between uuid and string
  • Loading branch information
DavidBadura authored Apr 18, 2024
2 parents 80fa7c6 + e7c57ac commit d27f8bc
Show file tree
Hide file tree
Showing 18 changed files with 191 additions and 180 deletions.
12 changes: 12 additions & 0 deletions docs/pages/aggregate_id.md
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,12 @@ since only an aggregate-wide unique string is expected in the store.

This library provides you with a few options for generating the id.

!!! warning

Performance reasons, the default configuration of the store require an uuid string for `aggregate id`.
But technically, for the library, it can be any string.
If you want to use a custom id, you have to change the `aggregate_id_type` in the [store](store.md) configuration.

## Uuid

The easiest way is to use an `uuid` as an aggregate ID.
Expand Down Expand Up @@ -68,6 +74,12 @@ final class Profile extends BasicAggregateRoot
private CustomId $id;
}
```
!!! warning

If you want to use a custom id that is not an uuid,
you need to change the `aggregate_id_type` to `string` in the store configuration.
More information can be found [here](store.md).

!!! note

If you want to use snapshots, then you have to make sure that the aggregate id are normalized.
Expand Down
2 changes: 1 addition & 1 deletion docs/pages/message_decorator.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ final class OnSystemRecordedDecorator implements MessageDecorator
```
!!! note

The Message is immutable, for more information look up [here](event_bus.md#message).
The Message is immutable, for more information look up [here](message.md).

!!! tip

Expand Down
35 changes: 20 additions & 15 deletions docs/pages/store.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ Each message contains an event and the associated headers.

!!! note

More information about the message can be found [here](event_bus.md).
More information about the message can be found [here](message.md).

The store is optimized to efficiently store and load events for aggregates.
We currently only offer one [doctrine dbal](https://www.doctrine-project.org/projects/dbal.html) store.
Expand Down Expand Up @@ -39,28 +39,33 @@ $store = new DoctrineDbalStore(
$connection,
DefaultEventSerializer::createFromPaths(['src/Event']),
null,
'eventstore',
['table_name' => 'eventstore'],
);
```
## Schema

The table structure of the `DoctrineDbalStore` looks like this:

| Column | Type | Description |
|------------------|----------|--------------------------------------------------|
| id | bigint | The index of the whole stream (autoincrement) |
| aggregate | string | The name of the aggregate |
| aggregate_id | string | The id of the aggregate |
| playhead | int | The current playhead of the aggregate |
| event | string | The name of the event |
| payload | json | The payload of the event |
| recorded_on | datetime | The date when the event was recorded |
| new_stream_start | bool | If the event is the first event of the aggregate |
| archived | bool | If the event is archived |
| custom_headers | json | Custom headers for the event |
| Column | Type | Description |
|------------------|-------------|--------------------------------------------------|
| id | bigint | The index of the whole stream (autoincrement) |
| aggregate | string | The name of the aggregate |
| aggregate_id | uuid/string | The id of the aggregate |
| playhead | int | The current playhead of the aggregate |
| event | string | The name of the event |
| payload | json | The payload of the event |
| recorded_on | datetime | The date when the event was recorded |
| new_stream_start | bool | If the event is the first event of the aggregate |
| archived | bool | If the event is archived |
| custom_headers | json | Custom headers for the event |

With the help of the `SchemaDirector`, the database structure can be created, updated and deleted.

!!! note

The default type of the `aggregate_id` column is `uuid` if the database supports it and `string` if not.
You can change the type with the `aggregate_id_type` to `string` if you want use custom id.

!!! tip

You can also use doctrine [migration](migration.md) to create and keep your schema in sync.
Expand Down Expand Up @@ -280,7 +285,7 @@ foreach ($stream as $message) {
```
!!! note

You can find more information about the `Message` object [here](event_bus.md).
You can find more information about the `Message` object [here](message.md).

!!! warning

Expand Down
26 changes: 17 additions & 9 deletions docs/pages/subscription.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,10 +125,6 @@ final class WelcomeEmailProcessor
}
}
```
!!! note

More about the processor can be found [here](processor.md).

### Subscribe

A subscriber (projector/processor) can subscribe any number of events.
Expand All @@ -155,16 +151,28 @@ final class DoStuffSubscriber
}
}
```
!!! note

You can subscribe to multiple events on the same method or you can use "*" to subscribe to all events.
More about this can be found [here](./event_bus.md#listener).

!!! tip

If you are using psalm then you can install the event sourcing [plugin](https://github.com/patchlevel/event-sourcing-psalm-plugin)
to make the event method return the correct type.

### Subscribe all events

If you want to subscribe on all events, you can pass `*` or `Subscribe::ALL` instead of the event class.

```php
use Patchlevel\EventSourcing\Attribute\Subscribe;
use Patchlevel\EventSourcing\Message\Message;

final class WelcomeSubscriber
{
#[Subscribe('*')]
public function onProfileCreated(Message $message): void
{
echo 'Welcome!';
}
}
```
#### Argument Resolver

The library analyses the method signature and tries to resolve the arguments.
Expand Down
37 changes: 25 additions & 12 deletions src/Store/DoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@

use function array_fill;
use function array_filter;
use function array_merge;
use function array_values;
use function class_exists;
use function count;
Expand All @@ -49,13 +50,22 @@ final class DoctrineDbalStore implements Store, ArchivableStore, SubscriptionSto

private readonly HeadersSerializer $headersSerializer;

/** @var array{table_name: string, aggregate_id_type: 'string'|'uuid'} */
private readonly array $config;

/** @param array{table_name?: string, aggregate_id_type?: 'string'|'uuid'} $config */
public function __construct(
private readonly Connection $connection,
private readonly EventSerializer $eventSerializer,
HeadersSerializer|null $headersSerializer = null,
private readonly string $storeTableName = 'eventstore',
array $config = [],
) {
$this->headersSerializer = $headersSerializer ?? DefaultHeadersSerializer::createDefault();

$this->config = array_merge([
'table_name' => 'eventstore',
'aggregate_id_type' => 'uuid',
], $config);
}

public function load(
Expand All @@ -66,7 +76,7 @@ public function load(
): DoctrineDbalStoreStream {
$builder = $this->connection->createQueryBuilder()
->select('*')
->from($this->storeTableName)
->from($this->config['table_name'])
->orderBy('id', $backwards ? 'DESC' : 'ASC');

$this->applyCriteria($builder, $criteria ?? new Criteria());
Expand All @@ -90,7 +100,7 @@ public function count(Criteria|null $criteria = null): int
{
$builder = $this->connection->createQueryBuilder()
->select('COUNT(*)')
->from($this->storeTableName);
->from($this->config['table_name']);

$this->applyCriteria($builder, $criteria ?? new Criteria());

Expand Down Expand Up @@ -244,7 +254,7 @@ public function archiveMessages(string $aggregateName, string $aggregateId, int
AND aggregate_id = :aggregate_id
AND playhead < :playhead
AND archived = false',
$this->storeTableName,
$this->config['table_name'],
));

$statement->bindValue('aggregate', $aggregateName);
Expand All @@ -260,14 +270,17 @@ public function configureSchema(Schema $schema, Connection $connection): void
return;
}

$table = $schema->createTable($this->storeTableName);
$table = $schema->createTable($this->config['table_name']);

$table->addColumn('id', Types::BIGINT)
->setAutoincrement(true);
$table->addColumn('aggregate', Types::STRING)
->setLength(255)
->setNotnull(true);
$table->addColumn('aggregate_id', Types::STRING)
$table->addColumn(
'aggregate_id',
$this->config['aggregate_id_type'] === 'uuid' ? Types::GUID : Types::STRING,
)
->setLength(36)
->setNotnull(true);
$table->addColumn('playhead', Types::INTEGER)
Expand Down Expand Up @@ -321,7 +334,7 @@ public function wait(int $timeoutMilliseconds): void
return;
}

$this->connection->executeStatement(sprintf('LISTEN "%s"', $this->storeTableName));
$this->connection->executeStatement(sprintf('LISTEN "%s"', $this->config['table_name']));

/** @var PDO $nativeConnection */
$nativeConnection = $this->connection->getNativeConnection();
Expand All @@ -347,23 +360,23 @@ public function setupSubscription(): void
$$ LANGUAGE plpgsql;
SQL,
$functionName,
$this->storeTableName,
$this->config['table_name'],
));

$this->connection->executeStatement(sprintf(
'DROP TRIGGER IF EXISTS notify_trigger ON %s;',
$this->storeTableName,
$this->config['table_name'],
));
$this->connection->executeStatement(sprintf(
'CREATE TRIGGER notify_trigger AFTER INSERT OR UPDATE ON %1$s FOR EACH ROW EXECUTE PROCEDURE %2$s();',
$this->storeTableName,
$this->config['table_name'],
$functionName,
));
}

private function createTriggerFunctionName(): string
{
$tableConfig = explode('.', $this->storeTableName);
$tableConfig = explode('.', $this->config['table_name']);

if (count($tableConfig) === 1) {
return sprintf('notify_%1$s', $tableConfig[0]);
Expand All @@ -387,7 +400,7 @@ private function executeSave(
): void {
$query = sprintf(
"INSERT INTO %s (%s) VALUES\n(%s)",
$this->storeTableName,
$this->config['table_name'],
implode(', ', $columns),
implode("),\n(", $placeholders),
);
Expand Down
6 changes: 0 additions & 6 deletions tests/Benchmark/PersonalDataBench.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@

use Patchlevel\EventSourcing\Aggregate\AggregateRootId;
use Patchlevel\EventSourcing\Cryptography\DoctrineCipherKeyStore;
use Patchlevel\EventSourcing\Message\Serializer\DefaultHeadersSerializer;
use Patchlevel\EventSourcing\Repository\DefaultRepository;
use Patchlevel\EventSourcing\Repository\Repository;
use Patchlevel\EventSourcing\Schema\ChainDoctrineSchemaConfigurator;
Expand Down Expand Up @@ -45,11 +44,6 @@ public function setUp(): void
[__DIR__ . '/BasicImplementation/Events'],
cryptographer: $cryptographer,
),
DefaultHeadersSerializer::createFromPaths([
__DIR__ . '/../../src',
__DIR__ . '/BasicImplementation/Events',
]),
'eventstore',
);

$this->repository = new DefaultRepository($this->store, Profile::metadata());
Expand Down
6 changes: 0 additions & 6 deletions tests/Benchmark/blackfire.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@

use Doctrine\DBAL\Driver\PDO\SQLite\Driver;
use Doctrine\DBAL\DriverManager;
use Patchlevel\EventSourcing\Message\Serializer\DefaultHeadersSerializer;
use Patchlevel\EventSourcing\Repository\DefaultRepository;
use Patchlevel\EventSourcing\Schema\DoctrineSchemaDirector;
use Patchlevel\EventSourcing\Serializer\DefaultEventSerializer;
Expand All @@ -28,11 +27,6 @@
$store = new DoctrineDbalStore(
$connection,
DefaultEventSerializer::createFromPaths([__DIR__ . '/BasicImplementation/Events']),
DefaultHeadersSerializer::createFromPaths([
__DIR__ . '/../../src',
__DIR__ . '/BasicImplementation/Events',
]),
'eventstore',
);

$repository = new DefaultRepository($store, Profile::metadata());
Expand Down
16 changes: 2 additions & 14 deletions tests/Integration/BankAccountSplitStream/AccountId.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,21 +5,9 @@
namespace Patchlevel\EventSourcing\Tests\Integration\BankAccountSplitStream;

use Patchlevel\EventSourcing\Aggregate\AggregateRootId;
use Patchlevel\EventSourcing\Aggregate\RamseyUuidBehaviour;

final class AccountId implements AggregateRootId
{
private function __construct(
private string $id,
) {
}

public static function fromString(string $id): self
{
return new self($id);
}

public function toString(): string
{
return $this->id;
}
use RamseyUuidBehaviour;
}
16 changes: 11 additions & 5 deletions tests/Integration/BankAccountSplitStream/IntegrationTest.php
Original file line number Diff line number Diff line change
Expand Up @@ -75,19 +75,22 @@ public function testSuccessful(): void
$engine->setup();
$engine->boot();

$bankAccountId = AccountId::fromString('1');
$bankAccountId = AccountId::v7();
$bankAccount = BankAccount::create($bankAccountId, 'John');
$bankAccount->addBalance(100);
$bankAccount->addBalance(500);
$repository->save($bankAccount);

$engine->run();

$result = $this->connection->fetchAssociative('SELECT * FROM projection_bank_account WHERE id = ?', ['1']);
$result = $this->connection->fetchAssociative(
'SELECT * FROM projection_bank_account WHERE id = ?',
[$bankAccountId->toString()],
);

self::assertIsArray($result);
self::assertArrayHasKey('id', $result);
self::assertSame('1', $result['id']);
self::assertSame($bankAccountId->toString(), $result['id']);
self::assertSame('John', $result['name']);
self::assertSame(600, $result['balance_in_cents']);

Expand Down Expand Up @@ -119,11 +122,14 @@ public function testSuccessful(): void

$engine->run();

$result = $this->connection->fetchAssociative('SELECT * FROM projection_bank_account WHERE id = ?', ['1']);
$result = $this->connection->fetchAssociative(
'SELECT * FROM projection_bank_account WHERE id = ?',
[$bankAccountId->toString()],
);

self::assertIsArray($result);
self::assertArrayHasKey('id', $result);
self::assertSame('1', $result['id']);
self::assertSame($bankAccountId->toString(), $result['id']);
self::assertSame('John', $result['name']);
self::assertSame(800, $result['balance_in_cents']);

Expand Down
Loading

0 comments on commit d27f8bc

Please sign in to comment.