Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue #919: Enable IEventStore to load events up to a given sequence number #1008

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions Source/EventFlow.MongoDB/EventStore/MongoDbEventPersistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,17 @@ public async Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCommittedEvent
.ConfigureAwait(continueOnCapturedContext: false);
}

public async Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCommittedEventsAsync(IIdentity id, int fromEventSequenceNumber, int toEventSequenceNumber,
CancellationToken cancellationToken)
{
return await MongoDbEventStoreCollection
.Find(model => model.AggregateId == id.Value &&
model.AggregateSequenceNumber >= fromEventSequenceNumber &&
model.AggregateSequenceNumber <= toEventSequenceNumber)
.ToListAsync(cancellationToken)
.ConfigureAwait(continueOnCapturedContext: false);
}

public async Task DeleteEventsAsync(IIdentity id, CancellationToken cancellationToken)
{
DeleteResult affectedRows = await MongoDbEventStoreCollection
Expand Down
31 changes: 31 additions & 0 deletions Source/EventFlow.MsSql/EventStores/MsSqlEventPersistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -195,6 +195,37 @@ ORDER BY
return eventDataModels;
}

public async Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCommittedEventsAsync(
IIdentity id,
int fromEventSequenceNumber,
int toEventSequenceNumber,
CancellationToken cancellationToken)
{
const string sql = @"
SELECT
GlobalSequenceNumber, BatchId, AggregateId, AggregateName, Data, Metadata, AggregateSequenceNumber
FROM EventFlow
WHERE
AggregateId = @AggregateId AND
AggregateSequenceNumber >= @FromEventSequenceNumber AND
AggregateSequenceNumber <= @ToEventSequenceNumber
ORDER BY
AggregateSequenceNumber ASC";
var eventDataModels = await _connection.QueryAsync<EventDataModel>(
Label.Named("mssql-fetch-events"),
null,
cancellationToken,
sql,
new
{
AggregateId = id.Value,
FromEventSequenceNumber = fromEventSequenceNumber,
ToEventSequenceNumber = toEventSequenceNumber
})
.ConfigureAwait(false);
return eventDataModels;
}

public async Task DeleteEventsAsync(IIdentity id, CancellationToken cancellationToken)
{
const string sql = @"DELETE FROM EventFlow WHERE AggregateId = @AggregateId";
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,38 @@ INSERT INTO
return eventDataModels;
}

public async Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCommittedEventsAsync(
IIdentity id,
int fromEventSequenceNumber,
int toEventSequenceNumber,
CancellationToken cancellationToken)
{
const string sql = @"
SELECT
GlobalSequenceNumber, BatchId, AggregateId, AggregateName, Data, Metadata, AggregateSequenceNumber
FROM EventFlow
WHERE
AggregateId = @AggregateId AND
AggregateSequenceNumber >= @FromEventSequenceNumber AND
AggregateSequenceNumber <= @ToSequenceNumber
ORDER BY
AggregateSequenceNumber ASC;";

var eventDataModels = await _connection.QueryAsync<EventDataModel>(
Label.Named("postgresql-fetch-events"),
null,
cancellationToken,
sql,
new
{
AggregateId = id.Value,
FromEventSequenceNumber = fromEventSequenceNumber,
ToSequenceNumber = toEventSequenceNumber
})
.ConfigureAwait(false);
return eventDataModels;
}

public async Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCommittedEventsAsync(
IIdentity id,
int fromEventSequenceNumber,
Expand All @@ -184,6 +216,7 @@ FROM EventFlow
AggregateSequenceNumber >= @FromEventSequenceNumber
ORDER BY
AggregateSequenceNumber ASC;";

var eventDataModels = await _connection.QueryAsync<EventDataModel>(
Label.Named("postgresql-fetch-events"),
null,
Expand Down
16 changes: 16 additions & 0 deletions Source/EventFlow.TestHelpers/Suites/TestSuiteForEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,22 @@ public async Task LoadingOfEventsCanStartLater()
domainEvents.ElementAt(1).AggregateSequenceNumber.Should().Be(4);
domainEvents.ElementAt(2).AggregateSequenceNumber.Should().Be(5);
}

[Test]
public async Task LoadingOfEventsCanStartLaterAndStopEarlier()
{
// Arrange
var id = ThingyId.New;
await PublishPingCommandsAsync(id, 5);

// Act
var domainEvents = await EventStore.LoadEventsAsync<ThingyAggregate, ThingyId>(id, 3, 4, CancellationToken.None);

// Assert
domainEvents.Should().HaveCount(2);
domainEvents.ElementAt(0).AggregateSequenceNumber.Should().Be(3);
domainEvents.ElementAt(1).AggregateSequenceNumber.Should().Be(4);
}

[Test]
public async Task AggregateCanHaveMultipleCommits()
Expand Down
10 changes: 10 additions & 0 deletions Source/EventFlow.Tests/IntegrationTests/CancellationTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -379,6 +379,16 @@ public async Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCommittedEvent
return result;
}

public async Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCommittedEventsAsync(IIdentity id,
int fromEventSequenceNumber,
int toEventSequenceNumber,
CancellationToken cancellationToken)
{
var result = await _inner.LoadCommittedEventsAsync(id, fromEventSequenceNumber, toEventSequenceNumber, cancellationToken);
await LoadCompletionSource.Task;
return result;
}

public Task DeleteEventsAsync(IIdentity id, CancellationToken cancellationToken)
{
return _inner.DeleteEventsAsync(id, cancellationToken);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,27 @@ public async Task Test_Arrange_EventStore(int eventInStore, int fromEventSequenc
// Assert
domainEvents.Should().HaveCount(expectedNumberOfEvents);
}

[Description("Mock test")]
[TestCase(5, 3, 5, 3)]
[TestCase(5, 0, 5, 5)]
[TestCase(5, 1, 2, 2)]
[TestCase(0, 1, 2, 0)]
public async Task Test_Arrange_EventStore_SequenceRange(int eventInStore, int fromEventSequenceNumber, int toEventSequenceNumber, int expectedNumberOfEvents)
{
// Arrange
Arrange_EventStore(ManyDomainEvents<ThingyPingEvent>(eventInStore));

// Act
var domainEvents = await _eventStoreMock.Object.LoadEventsAsync<ThingyAggregate, ThingyId>(
A<ThingyId>(),
fromEventSequenceNumber,
toEventSequenceNumber,
CancellationToken.None);

// Assert
domainEvents.Should().HaveCount(expectedNumberOfEvents);
}

private void Arrange_EventStore(IEnumerable<IDomainEvent<ThingyAggregate, ThingyId>> domainEvents)
{
Expand All @@ -123,6 +144,10 @@ private void Arrange_EventStore(IEnumerable<IDomainEvent<ThingyAggregate, Thingy
_eventStoreMock
.Setup(e => e.LoadEventsAsync<ThingyAggregate, ThingyId>(It.IsAny<ThingyId>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
.Returns<ThingyId, int, CancellationToken>((id, seq, c) => Task.FromResult<IReadOnlyCollection<IDomainEvent<ThingyAggregate, ThingyId>>>(domainEventList.Skip(Math.Max(seq - 1, 0)).ToList()));

_eventStoreMock
.Setup(e => e.LoadEventsAsync<ThingyAggregate, ThingyId>(It.IsAny<ThingyId>(), It.IsAny<int>(), It.IsAny<int>(), It.IsAny<CancellationToken>()))
.Returns<ThingyId, int, int, CancellationToken>((id, from, to, c) => Task.FromResult<IReadOnlyCollection<IDomainEvent<ThingyAggregate, ThingyId>>>(domainEventList.Take(to).Skip(Math.Max(from - 1, 0)).ToList()));
}

private void Arrange_Snapshot(ThingySnapshot thingySnapshot)
Expand Down
32 changes: 29 additions & 3 deletions Source/EventFlow/EventStores/EventStoreBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -154,6 +154,25 @@ public Task<IReadOnlyCollection<IDomainEvent<TAggregate, TIdentity>>> LoadEvents
cancellationToken);
}

public async Task<IReadOnlyCollection<IDomainEvent<TAggregate, TIdentity>>> LoadEventsAsync<TAggregate, TIdentity>(
TIdentity id,
int fromEventSequenceNumber,
int toEventSequenceNumber,
CancellationToken cancellationToken) where TAggregate : IAggregateRoot<TIdentity> where TIdentity : IIdentity
{
if (fromEventSequenceNumber < 1) throw new ArgumentOutOfRangeException(nameof(fromEventSequenceNumber), "Event sequence numbers start at 1");
if (toEventSequenceNumber <= fromEventSequenceNumber) throw new ArgumentOutOfRangeException(nameof(toEventSequenceNumber), "Event sequence numbers end at start");

var committedDomainEvents = await _eventPersistence.LoadCommittedEventsAsync(
id,
fromEventSequenceNumber,
toEventSequenceNumber,
cancellationToken)
.ConfigureAwait(false);

return await MapToDomainEvents<TAggregate, TIdentity>(id, cancellationToken, committedDomainEvents);
}

public virtual async Task<IReadOnlyCollection<IDomainEvent<TAggregate, TIdentity>>> LoadEventsAsync<TAggregate, TIdentity>(
TIdentity id,
int fromEventSequenceNumber,
Expand All @@ -164,10 +183,17 @@ public virtual async Task<IReadOnlyCollection<IDomainEvent<TAggregate, TIdentity
if (fromEventSequenceNumber < 1) throw new ArgumentOutOfRangeException(nameof(fromEventSequenceNumber), "Event sequence numbers start at 1");

var committedDomainEvents = await _eventPersistence.LoadCommittedEventsAsync(
id,
fromEventSequenceNumber,
cancellationToken)
id,
fromEventSequenceNumber,
cancellationToken)
.ConfigureAwait(false);

return await MapToDomainEvents<TAggregate, TIdentity>(id, cancellationToken, committedDomainEvents);
}

private async Task<IReadOnlyCollection<IDomainEvent<TAggregate, TIdentity>>> MapToDomainEvents<TAggregate, TIdentity>(TIdentity id, CancellationToken cancellationToken,
IReadOnlyCollection<ICommittedDomainEvent> committedDomainEvents) where TAggregate : IAggregateRoot<TIdentity> where TIdentity : IIdentity
{
var domainEvents = (IReadOnlyCollection<IDomainEvent<TAggregate, TIdentity>>)committedDomainEvents
.Select(e => _eventJsonSerializer.Deserialize<TAggregate, TIdentity>(id, e))
.ToList();
Expand Down
25 changes: 25 additions & 0 deletions Source/EventFlow/EventStores/Files/FilesEventPersistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -236,6 +236,31 @@ public async Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCommittedEvent
}
}

public async Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCommittedEventsAsync(
IIdentity id,
int fromEventSequenceNumber,
int toEventSequenceNumber,
CancellationToken cancellationToken)
{
using (await _asyncLock.WaitAsync(cancellationToken).ConfigureAwait(false))
{
var committedDomainEvents = new List<ICommittedDomainEvent>();
for (var i = fromEventSequenceNumber; i <= toEventSequenceNumber ; i++)
{
var eventPath = _filesEventLocator.GetEventPath(id, i);
if (!File.Exists(eventPath))
{
return committedDomainEvents;
}

var committedDomainEvent = await LoadFileEventDataFile(eventPath).ConfigureAwait(false);
committedDomainEvents.Add(committedDomainEvent);
}

return committedDomainEvents;
}
}

public async Task DeleteEventsAsync(IIdentity id, CancellationToken cancellationToken)
{
_logger.LogTrace("Deleting entity with ID {EventFilePath}", id);
Expand Down
6 changes: 6 additions & 0 deletions Source/EventFlow/EventStores/IEventPersistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCommittedEventsAsync(
IIdentity id,
int fromEventSequenceNumber,
CancellationToken cancellationToken);

Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCommittedEventsAsync(
IIdentity id,
int fromEventSequenceNumber,
int toEventSequenceNumber,
CancellationToken cancellationToken);

Task DeleteEventsAsync(IIdentity id, CancellationToken cancellationToken);
}
Expand Down
8 changes: 8 additions & 0 deletions Source/EventFlow/EventStores/IEventStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ Task<IReadOnlyCollection<IDomainEvent<TAggregate, TIdentity>>> LoadEventsAsync<T
CancellationToken cancellationToken)
where TAggregate : IAggregateRoot<TIdentity>
where TIdentity : IIdentity;

Task<IReadOnlyCollection<IDomainEvent<TAggregate, TIdentity>>> LoadEventsAsync<TAggregate, TIdentity>(
TIdentity id,
int fromSequenceNumber,
int toSequenceNumber,
CancellationToken cancellationToken)
where TAggregate : IAggregateRoot<TIdentity>
where TIdentity : IIdentity;

Task<IReadOnlyCollection<IDomainEvent<TAggregate, TIdentity>>> LoadEventsAsync<TAggregate, TIdentity>(
TIdentity id,
Expand Down
24 changes: 22 additions & 2 deletions Source/EventFlow/EventStores/InMemory/InMemoryEventPersistence.cs
Original file line number Diff line number Diff line change
Expand Up @@ -193,13 +193,33 @@ public Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCommittedEventsAsync
IIdentity id,
int fromEventSequenceNumber,
CancellationToken cancellationToken)
{
return LoadCommittedEventsAsync(
id,
fromEventSequenceNumber,
e => e.AggregateSequenceNumber >= fromEventSequenceNumber);
}

public Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCommittedEventsAsync(
IIdentity id,
int fromEventSequenceNumber,
int toEventSequenceNumber,
CancellationToken cancellationToken)
{
return LoadCommittedEventsAsync(
id,
fromEventSequenceNumber,
e => e.AggregateSequenceNumber >= fromEventSequenceNumber && e.AggregateSequenceNumber <= toEventSequenceNumber);
}

private Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCommittedEventsAsync(IIdentity id,int fromEventSequenceNumber, Func<InMemoryCommittedDomainEvent, bool> filter)
{
IReadOnlyCollection<ICommittedDomainEvent> result;

if (_eventStore.TryGetValue(id.Value, out var committedDomainEvent))
result = fromEventSequenceNumber <= 1
? (IReadOnlyCollection<ICommittedDomainEvent>) committedDomainEvent
: committedDomainEvent.Where(e => e.AggregateSequenceNumber >= fromEventSequenceNumber).ToList();
? (IReadOnlyCollection<ICommittedDomainEvent>)committedDomainEvent
: committedDomainEvent.Where(filter).ToList();
else
result = new List<InMemoryCommittedDomainEvent>();

Expand Down
Loading