Skip to content

Commit

Permalink
Major rewrite
Browse files Browse the repository at this point in the history
  • Loading branch information
bwaidelich committed May 16, 2024
1 parent 317d8a1 commit f3737f0
Show file tree
Hide file tree
Showing 19 changed files with 423 additions and 201 deletions.
13 changes: 6 additions & 7 deletions src/EventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
use Wwwision\DCBEventStore\Exceptions\ConditionalAppendFailed;
use Wwwision\DCBEventStore\Types\AppendCondition;
use Wwwision\DCBEventStore\Types\Events;
use Wwwision\DCBEventStore\Types\SequenceNumber;
use Wwwision\DCBEventStore\Types\ReadOptions;
use Wwwision\DCBEventStore\Types\StreamQuery\StreamQuery;

/**
Expand All @@ -19,17 +19,16 @@ interface EventStore
* Returns an event stream that contains events matching the specified {@see StreamQuery} in the order they occurred
*
* @param StreamQuery $query The StreamQuery filter every event has to match
* @param SequenceNumber|null $from If specified, only events with the given {@see SequenceNumber} or a higher one will be returned
* @param ReadOptions|null $options optional configuration for this interaction ({@see ReadOptions})
*/
public function read(StreamQuery $query, ?SequenceNumber $from = null): EventStream;
public function read(StreamQuery $query, ?ReadOptions $options = null): EventStream;

/**
* Returns an event stream that contains events matching the specified {@see StreamQuery} in descending order
* Returns an event stream that contains all events
*
* @param StreamQuery $query The StreamQuery filter every event has to match
* @param SequenceNumber|null $from If specified, only events with the given {@see SequenceNumber} or a lower one will be returned
* @param ReadOptions|null $options optional configuration for this interaction ({@see ReadOptions})
*/
public function readBackwards(StreamQuery $query, ?SequenceNumber $from = null): EventStream;
public function readAll(?ReadOptions $options = null): EventStream;

/**
* Commits the specified $events if the specified {@see AppendCondition} is satisfied
Expand Down
10 changes: 2 additions & 8 deletions src/Exceptions/ConditionalAppendFailed.php
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,7 @@

use RuntimeException;
use Wwwision\DCBEventStore\EventStore;
use Wwwision\DCBEventStore\Types\EventId;
use Wwwision\DCBEventStore\Types\ExpectedHighestSequenceNumber;
use Wwwision\DCBEventStore\Types\SequenceNumber;

/**
* An exception that is thrown when a {@see EventStore::conditionalAppend()} call has failed
Expand All @@ -19,18 +17,14 @@ private function __construct(string $message)
{
parent::__construct($message);
}
public static function becauseNoEventMatchedTheQuery(ExpectedHighestSequenceNumber $expectedHighestSequenceNumber): self
{
return new self("Expected highest sequence number \"$expectedHighestSequenceNumber\" but no events in the event store match the specified query");
}

public static function becauseNoEventWhereExpected(): self
{
return new self('The event store contained events matching the specified query but none were expected');
}

public static function becauseHighestExpectedSequenceNumberDoesNotMatch(ExpectedHighestSequenceNumber $expectedHighestSequenceNumber, SequenceNumber $actualSequenceNumber): self
public static function becauseHighestExpectedSequenceNumberDoesNotMatch(ExpectedHighestSequenceNumber $expectedHighestSequenceNumber): self
{
return new self("Expected highest sequence number \"$expectedHighestSequenceNumber\" does not match the actual id of \"$actualSequenceNumber->value\"");
return new self("Expected highest sequence number \"$expectedHighestSequenceNumber\" does not match the actual sequence number");
}
}
84 changes: 57 additions & 27 deletions src/Helpers/InMemoryEventStore.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,22 @@
namespace Wwwision\DCBEventStore\Helpers;

use DateTimeImmutable;
use RuntimeException;
use Wwwision\DCBEventStore\EventStore;
use Wwwision\DCBEventStore\EventStream;
use Wwwision\DCBEventStore\Exceptions\ConditionalAppendFailed;
use Wwwision\DCBEventStore\Types\AppendCondition;
use Wwwision\DCBEventStore\Types\Event;
use Wwwision\DCBEventStore\Types\EventEnvelope;
use Wwwision\DCBEventStore\Types\Events;
use Wwwision\DCBEventStore\Types\ReadOptions;
use Wwwision\DCBEventStore\Types\SequenceNumber;
use Wwwision\DCBEventStore\Types\StreamQuery\Criteria\EventTypesAndTagsCriterion;
use Wwwision\DCBEventStore\Types\StreamQuery\Criteria\EventTypesCriterion;
use Wwwision\DCBEventStore\Types\StreamQuery\Criteria\TagsCriterion;
use Wwwision\DCBEventStore\Types\StreamQuery\Criterion;
use Wwwision\DCBEventStore\Types\StreamQuery\CriterionHashes;
use Wwwision\DCBEventStore\Types\StreamQuery\StreamQuery;

use function count;

/**
Expand All @@ -29,9 +37,9 @@
final class InMemoryEventStore implements EventStore
{
/**
* @var EventEnvelope[]
* @var array<array{sequenceNumber:int,recordedAt:DateTimeImmutable,event:Event}>
*/
private array $eventEnvelopes = [];
private array $events = [];

private function __construct()
{
Expand All @@ -42,52 +50,74 @@ public static function create(): self
return new self();
}

public function read(StreamQuery $query, ?SequenceNumber $from = null): InMemoryEventStream
public function read(StreamQuery $query, ?ReadOptions $options = null): InMemoryEventStream
{
$matchingEventEnvelopes = $this->eventEnvelopes;
if ($from !== null) {
$matchingEventEnvelopes = array_filter($matchingEventEnvelopes, static fn (EventEnvelope $eventEnvelope) => $eventEnvelope->sequenceNumber->value >= $from->value);
$options ??= ReadOptions::create();
$matchingEventEnvelopes = [];
$events = $this->events;
if ($options->backwards) {
$events = array_reverse($events);
}
if (!$query->isWildcard()) {
$matchingEventEnvelopes = array_filter($matchingEventEnvelopes, static fn (EventEnvelope $eventEnvelope) => $query->matches($eventEnvelope->event));
foreach ($events as $event) {
if ($options->from !== null && (($options->backwards && $event['sequenceNumber'] > $options->from->value) || (!$options->backwards && $event['sequenceNumber'] < $options->from->value))) {
continue;
}
$matchedCriterionHashes = [];
if (!$query->isWildcard()) {
foreach ($query->criteria as $criterion) {
if (array_key_exists($criterion->hash()->value, $matchedCriterionHashes)) {
continue;
}
if (self::criterionMatchesEvent($criterion, $event['event'])) {
$matchedCriterionHashes[$criterion->hash()->value] = true;
}
}
if ($matchedCriterionHashes === []) {
continue;
}
}
$matchingEventEnvelopes[] = new EventEnvelope(SequenceNumber::fromInteger($event['sequenceNumber']), $event['recordedAt'], CriterionHashes::fromArray(array_keys($matchedCriterionHashes)), $event['event']);
}
return InMemoryEventStream::create(...$matchingEventEnvelopes);
}

public function readBackwards(StreamQuery $query, ?SequenceNumber $from = null): InMemoryEventStream
public function readAll(?ReadOptions $options = null): EventStream
{
$matchingEventEnvelopes = array_reverse($this->eventEnvelopes);
if ($from !== null) {
$matchingEventEnvelopes = array_filter($matchingEventEnvelopes, static fn (EventEnvelope $eventEnvelope) => $eventEnvelope->sequenceNumber->value <= $from->value);
}
if (!$query->isWildcard()) {
$matchingEventEnvelopes = array_filter($matchingEventEnvelopes, static fn (EventEnvelope $eventEnvelope) => $query->matches($eventEnvelope->event));
}
return InMemoryEventStream::create(...$matchingEventEnvelopes);
return $this->read(StreamQuery::wildcard(), $options);
}

private static function criterionMatchesEvent(Criterion $criterion, Event $event): bool
{
return match ($criterion::class) {
EventTypesAndTagsCriterion::class => $event->tags->containEvery($criterion->tags) && $criterion->eventTypes->contain($event->type),
EventTypesCriterion::class => $criterion->eventTypes->contain($event->type),
TagsCriterion::class => $event->tags->containEvery($criterion->tags),
default => throw new RuntimeException(sprintf('The criterion type "%s" is not supported by the %s', $criterion::class, self::class), 1700302540),
};
}

public function append(Events $events, AppendCondition $condition): void
{
if (!$condition->expectedHighestSequenceNumber->isAny()) {
$lastEventEnvelope = $this->readBackwards($condition->query)->first();
$lastEventEnvelope = $this->read($condition->query, ReadOptions::create(backwards: true))->first();
if ($lastEventEnvelope === null) {
if (!$condition->expectedHighestSequenceNumber->isNone()) {
throw ConditionalAppendFailed::becauseNoEventMatchedTheQuery($condition->expectedHighestSequenceNumber);
throw ConditionalAppendFailed::becauseHighestExpectedSequenceNumberDoesNotMatch($condition->expectedHighestSequenceNumber);
}
} elseif ($condition->expectedHighestSequenceNumber->isNone()) {
throw ConditionalAppendFailed::becauseNoEventWhereExpected();
} elseif (!$condition->expectedHighestSequenceNumber->matches($lastEventEnvelope->sequenceNumber)) {
throw ConditionalAppendFailed::becauseHighestExpectedSequenceNumberDoesNotMatch($condition->expectedHighestSequenceNumber, $lastEventEnvelope->sequenceNumber);
throw ConditionalAppendFailed::becauseHighestExpectedSequenceNumberDoesNotMatch($condition->expectedHighestSequenceNumber);
}
}
$sequenceNumber = count($this->eventEnvelopes);
$sequenceNumber = count($this->events);
foreach ($events as $event) {
$sequenceNumber++;
$this->eventEnvelopes[] = new EventEnvelope(
SequenceNumber::fromInteger($sequenceNumber),
new DateTimeImmutable(),
$event,
);
$this->events[] = [
'sequenceNumber' => $sequenceNumber,
'recordedAt' => new DateTimeImmutable(),
'event' => $event,
];
}
}
}
2 changes: 1 addition & 1 deletion src/Types/Event.php
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
final class Event
{
public function __construct(
public readonly EventId $id, // required for deduplication
public readonly EventId $id, // required for deduplication – TODO really? the sequenceNumber should work
public readonly EventType $type,
public readonly EventData $data, // opaque, no size limit?
public readonly Tags $tags,
Expand Down
4 changes: 3 additions & 1 deletion src/Types/EventEnvelope.php
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
namespace Wwwision\DCBEventStore\Types;

use DateTimeImmutable;
use Wwwision\DCBEventStore\Types\StreamQuery\CriterionHashes;

/**
* An {@see Event} with its global {@see SequenceNumber} in the Events Store
Expand All @@ -13,7 +14,8 @@ final class EventEnvelope
{
public function __construct(
public readonly SequenceNumber $sequenceNumber,
public DateTimeImmutable $recordedAt,
public readonly DateTimeImmutable $recordedAt,
public readonly CriterionHashes $criterionHashes,
public readonly Event $event,
) {
}
Expand Down
43 changes: 43 additions & 0 deletions src/Types/ReadOptions.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
<?php

declare(strict_types=1);

namespace Wwwision\DCBEventStore\Types;

use Wwwision\DCBEventStore\EventStore;

/**
* Condition for {@see EventStore::read()} and {@see EventStore::readAll()}
*/
final class ReadOptions
{
/**
* @param SequenceNumber|null $from If specified, only events with the given {@see SequenceNumber} or a higher (lower if backwards) one will be returned
* @param bool $backwards If true, events will be returned in descending order, otherwise in the order they were appended
*/
private function __construct(
public readonly ?SequenceNumber $from,
public bool $backwards,
) {
}

public static function create(
?SequenceNumber $from = null,
bool $backwards = null,
): self {
return new self(
$from,
$backwards ?? false,
);
}

public function with(
?SequenceNumber $from = null,
bool $backwards = null,
): self {
return new self(
$from ?? $this->from,
$backwards ?? $this->backwards,
);
}
}
11 changes: 11 additions & 0 deletions src/Types/StreamQuery/Criteria.php
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,12 @@ public function with(Criterion $criterion): self
return new self(...[...$this->criteria, $criterion]);
}

// TODO: Deduplicate
public function merge(self $other): self
{
return new self(...[...$this->criteria, ...$other->criteria]);
}

public function getIterator(): Traversable
{
return new ArrayIterator($this->criteria);
Expand All @@ -66,6 +72,11 @@ public function map(Closure $callback): array
return array_map($callback, $this->criteria);
}

public function hashes(): CriterionHashes
{
return CriterionHashes::fromArray(array_map(static fn (Criterion $criterion) => $criterion->hash(), $this->criteria));
}

/**
* @return Criterion[]
*/
Expand Down
13 changes: 10 additions & 3 deletions src/Types/StreamQuery/Criteria/EventTypesAndTagsCriterion.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,28 @@

namespace Wwwision\DCBEventStore\Types\StreamQuery\Criteria;

use Wwwision\DCBEventStore\Types\Event;
use Wwwision\DCBEventStore\Types\EventTypes;
use Wwwision\DCBEventStore\Types\StreamQuery\Criterion;
use Wwwision\DCBEventStore\Types\StreamQuery\CriterionHash;
use Wwwision\DCBEventStore\Types\Tags;

final class EventTypesAndTagsCriterion implements Criterion
{
private readonly CriterionHash $hash;

public function __construct(
public readonly EventTypes $eventTypes,
public readonly Tags $tags,
) {
$this->hash = CriterionHash::fromParts(
substr(substr(self::class, 0, -9), strrpos(self::class, '\\') + 1),
implode(',', $this->eventTypes->toStringArray()),
implode(',', $this->tags->toSimpleArray()),
);
}

public function matches(Event $event): bool
public function hash(): CriterionHash
{
return $this->eventTypes->contain($event->type) && $event->tags->containEvery($this->tags);
return $this->hash;
}
}
12 changes: 9 additions & 3 deletions src/Types/StreamQuery/Criteria/EventTypesCriterion.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,25 @@

namespace Wwwision\DCBEventStore\Types\StreamQuery\Criteria;

use Wwwision\DCBEventStore\Types\Event;
use Wwwision\DCBEventStore\Types\EventTypes;
use Wwwision\DCBEventStore\Types\StreamQuery\Criterion;
use Wwwision\DCBEventStore\Types\StreamQuery\CriterionHash;

final class EventTypesCriterion implements Criterion
{
private readonly CriterionHash $hash;

public function __construct(
public readonly EventTypes $eventTypes,
) {
$this->hash = CriterionHash::fromParts(
substr(substr(self::class, 0, -9), strrpos(self::class, '\\') + 1),
implode(',', $this->eventTypes->toStringArray()),
);
}

public function matches(Event $event): bool
public function hash(): CriterionHash
{
return $this->eventTypes->contain($event->type);
return $this->hash;
}
}
12 changes: 9 additions & 3 deletions src/Types/StreamQuery/Criteria/TagsCriterion.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,19 +4,25 @@

namespace Wwwision\DCBEventStore\Types\StreamQuery\Criteria;

use Wwwision\DCBEventStore\Types\Event;
use Wwwision\DCBEventStore\Types\StreamQuery\Criterion;
use Wwwision\DCBEventStore\Types\StreamQuery\CriterionHash;
use Wwwision\DCBEventStore\Types\Tags;

final class TagsCriterion implements Criterion
{
private readonly CriterionHash $hash;

public function __construct(
public readonly Tags $tags,
) {
$this->hash = CriterionHash::fromParts(
substr(substr(self::class, 0, -9), strrpos(self::class, '\\') + 1),
implode(',', $this->tags->toSimpleArray()),
);
}

public function matches(Event $event): bool
public function hash(): CriterionHash
{
return $event->tags->containEvery($this->tags);
return $this->hash;
}
}
4 changes: 1 addition & 3 deletions src/Types/StreamQuery/Criterion.php
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,12 @@

namespace Wwwision\DCBEventStore\Types\StreamQuery;

use Wwwision\DCBEventStore\Types\Event;

/**
* Common marker interface for {@see StreamQuery} criteria
*
* @internal This is not meant to be implemented by external packages!
*/
interface Criterion
{
public function matches(Event $event): bool;
public function hash(): CriterionHash;
}
Loading

0 comments on commit f3737f0

Please sign in to comment.