Skip to content

Commit

Permalink
Merge pull request #657 from patchlevel/event-id
Browse files Browse the repository at this point in the history
[Experimental] introduce event id & fix split stream logic in stream store
  • Loading branch information
DavidBadura authored Dec 17, 2024
2 parents 9a480c8 + bed9ffd commit 366de9f
Show file tree
Hide file tree
Showing 9 changed files with 279 additions and 108 deletions.
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,10 @@ phpunit-integration: vendor
phpunit-integration-postgres: vendor ## run phpunit integration tests on postgres
DB_URL="pdo-pgsql://postgres:postgres@localhost:5432/eventstore?charset=utf8" vendor/bin/phpunit --testsuite=integration

.PHONY: phpunit-integration-mysql
phpunit-integration-mysql: vendor ## run phpunit integration tests on mysql
DB_URL="pdo-mysql://root@localhost:3306/eventstore?charset=utf8" vendor/bin/phpunit --testsuite=integration

.PHONY: phpunit-unit
phpunit-unit: vendor ## run phpunit unit tests
XDEBUG_MODE=coverage vendor/bin/phpunit --testsuite=unit
Expand Down
10 changes: 9 additions & 1 deletion docker-compose.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,12 @@ services:
- POSTGRES_PASSWORD=postgres
- POSTGRES_DB=eventstore
ports:
- 5432:5432
- 5432:5432

mysql:
image: mysql:8
environment:
- MYSQL_ALLOW_EMPTY_PASSWORD="yes"
- MYSQL_DATABASE="eventstore"
ports:
- 3306:3306
2 changes: 2 additions & 0 deletions src/Metadata/Message/MessageHeaderRegistry.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@

use Patchlevel\EventSourcing\Aggregate\AggregateHeader;
use Patchlevel\EventSourcing\Store\ArchivedHeader;
use Patchlevel\EventSourcing\Store\Header\EventIdHeader;
use Patchlevel\EventSourcing\Store\Header\PlayheadHeader;
use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader;
use Patchlevel\EventSourcing\Store\Header\StreamNameHeader;
Expand Down Expand Up @@ -81,6 +82,7 @@ public static function createWithInternalHeaders(array $headerNameToClassMap = [
'aggregate' => AggregateHeader::class,
'archived' => ArchivedHeader::class,
'newStreamStart' => StreamStartHeader::class,
'eventId' => EventIdHeader::class,
];

return new self($headerNameToClassMap + $internalHeaders);
Expand Down
17 changes: 17 additions & 0 deletions src/Store/Header/EventIdHeader.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
<?php

declare(strict_types=1);

namespace Patchlevel\EventSourcing\Store\Header;

/**
* @psalm-immutable
* @experimental
*/
final class EventIdHeader
{
public function __construct(
public readonly string $eventId,
) {
}
}
67 changes: 49 additions & 18 deletions src/Store/StreamDoctrineDbalStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,13 @@
use Patchlevel\EventSourcing\Store\Criteria\FromIndexCriterion;
use Patchlevel\EventSourcing\Store\Criteria\FromPlayheadCriterion;
use Patchlevel\EventSourcing\Store\Criteria\StreamCriterion;
use Patchlevel\EventSourcing\Store\Header\EventIdHeader;
use Patchlevel\EventSourcing\Store\Header\PlayheadHeader;
use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader;
use Patchlevel\EventSourcing\Store\Header\StreamNameHeader;
use PDO;
use Psr\Clock\ClockInterface;
use Ramsey\Uuid\Uuid;

use function array_fill;
use function array_filter;
Expand Down Expand Up @@ -192,17 +194,18 @@ public function save(Message ...$messages): void

$this->transactional(
function () use ($messages): void {
/** @var array<string, int> $achievedUntilPlayhead */
$achievedUntilPlayhead = [];
/** @var array<string, int> $achievedUntilEventId */
$achievedUntilEventId = [];

$booleanType = Type::getType(Types::BOOLEAN);
$dateTimeType = Type::getType(Types::DATETIMETZ_IMMUTABLE);

$columns = [
'stream',
'playhead',
'event',
'payload',
'event_id',
'event_name',
'event_payload',
'recorded_on',
'new_stream_start',
'archived',
Expand Down Expand Up @@ -232,13 +235,19 @@ function () use ($messages): void {
throw new MissingDataForStorage($e->name, $e);
}

$playhead = null;

if ($message->hasHeader(PlayheadHeader::class)) {
$playhead = $message->header(PlayheadHeader::class)->playhead;
$parameters[] = $message->header(PlayheadHeader::class)->playhead;
} else {
$parameters[] = null;
}

if ($message->hasHeader(EventIdHeader::class)) {
$eventId = $message->header(EventIdHeader::class)->eventId;
} else {
$eventId = Uuid::uuid7()->toString();
}

$parameters[] = $playhead;
$parameters[] = $eventId;
$parameters[] = $data->name;
$parameters[] = $data->payload;

Expand All @@ -248,19 +257,19 @@ function () use ($messages): void {
$parameters[] = $this->clock->now();
}

$types[$offset + 4] = $dateTimeType;
$types[$offset + 5] = $dateTimeType;

$streamStart = $message->hasHeader(StreamStartHeader::class);

if ($streamStart && $playhead !== null) {
$achievedUntilPlayhead[$streamName] = $playhead;
if ($streamStart) {
$achievedUntilEventId[$streamName] = $eventId;
}

$parameters[] = $streamStart;
$types[$offset + 5] = $booleanType;
$types[$offset + 6] = $booleanType;

$parameters[] = $message->hasHeader(ArchivedHeader::class);
$types[$offset + 6] = $booleanType;
$types[$offset + 7] = $booleanType;

$parameters[] = $this->headersSerializer->serialize($this->getCustomHeaders($message));

Expand All @@ -283,21 +292,38 @@ function () use ($messages): void {
$this->executeSave($columns, $placeholders, $parameters, $types, $this->connection);
}

foreach ($achievedUntilPlayhead as $stream => $playhead) {
$subselect = null;

foreach ($achievedUntilEventId as $stream => $eventId) {
if ($subselect === null) {
if ($this->connection->getDatabasePlatform() instanceof MySQLPlatform) {
$subselect = sprintf(
'SELECT t.id FROM (SELECT * FROM %s) AS t WHERE t.event_id = :event_id',
$this->config['table_name'],
);
} else {
$subselect = sprintf(
'SELECT id FROM %s WHERE event_id = :event_id',
$this->config['table_name'],
);
}
}

$this->connection->executeStatement(
sprintf(
<<<'SQL'
UPDATE %s
SET archived = true
WHERE stream = :stream
AND playhead < :playhead
AND id < (%s)
AND archived = false
SQL,
$this->config['table_name'],
$subselect,
),
[
'stream' => $stream,
'playhead' => $playhead,
'event_id' => $eventId,
],
);
}
Expand Down Expand Up @@ -366,10 +392,13 @@ public function configureSchema(Schema $schema, Connection $connection): void
->setNotnull(true);
$table->addColumn('playhead', Types::INTEGER)
->setNotnull(false);
$table->addColumn('event', Types::STRING)
$table->addColumn('event_id', Types::STRING)
->setLength(255)
->setNotnull(true);
$table->addColumn('event_name', Types::STRING)
->setLength(255)
->setNotnull(true);
$table->addColumn('payload', Types::JSON)
$table->addColumn('event_payload', Types::JSON)
->setNotnull(true);
$table->addColumn('recorded_on', Types::DATETIMETZ_IMMUTABLE)
->setNotnull(true);
Expand All @@ -383,6 +412,7 @@ public function configureSchema(Schema $schema, Connection $connection): void
->setNotnull(true);

$table->setPrimaryKey(['id']);
$table->addUniqueIndex(['event_id']);
$table->addUniqueIndex(['stream', 'playhead']);
$table->addIndex(['stream', 'playhead', 'archived']);
}
Expand All @@ -392,6 +422,7 @@ private function getCustomHeaders(Message $message): array
{
$filteredHeaders = [
StreamNameHeader::class,
EventIdHeader::class,
PlayheadHeader::class,
RecordedOnHeader::class,
StreamStartHeader::class,
Expand Down
8 changes: 5 additions & 3 deletions src/Store/StreamDoctrineDbalStoreStream.php
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
use Patchlevel\EventSourcing\Message\Serializer\HeadersSerializer;
use Patchlevel\EventSourcing\Serializer\EventSerializer;
use Patchlevel\EventSourcing\Serializer\SerializedEvent;
use Patchlevel\EventSourcing\Store\Header\EventIdHeader;
use Patchlevel\EventSourcing\Store\Header\PlayheadHeader;
use Patchlevel\EventSourcing\Store\Header\RecordedOnHeader;
use Patchlevel\EventSourcing\Store\Header\StreamNameHeader;
Expand Down Expand Up @@ -120,7 +121,7 @@ private function buildGenerator(
/** @var DateTimeTzImmutableType $dateTimeType */
$dateTimeType = Type::getType(Types::DATETIMETZ_IMMUTABLE);

/** @var array{id: positive-int, stream: string, playhead: int|string|null, event: string, payload: string, recorded_on: string, archived: int|string, new_stream_start: int|string, custom_headers: string} $data */
/** @var array{id: positive-int, stream: string, playhead: int|string|null, event_id: string, event_name: string, event_payload: string, recorded_on: string, archived: int|string, new_stream_start: int|string, custom_headers: string} $data */
foreach ($result->iterateAssociative() as $data) {
if ($this->position === null) {
$this->position = 0;
Expand All @@ -129,11 +130,12 @@ private function buildGenerator(
}

$this->index = $data['id'];
$event = $eventSerializer->deserialize(new SerializedEvent($data['event'], $data['payload']));
$event = $eventSerializer->deserialize(new SerializedEvent($data['event_name'], $data['event_payload']));

$message = Message::create($event)
->withHeader(new StreamNameHeader($data['stream']))
->withHeader(new RecordedOnHeader($dateTimeType->convertToPHPValue($data['recorded_on'], $platform)));
->withHeader(new RecordedOnHeader($dateTimeType->convertToPHPValue($data['recorded_on'], $platform)))
->withHeader(new EventIdHeader($data['event_id']));

if ($data['playhead'] !== null) {
$message = $message->withHeader(new PlayheadHeader((int)$data['playhead']));
Expand Down
Loading

0 comments on commit 366de9f

Please sign in to comment.