diff --git a/src/Elders.Cronus.Persistence.Cassandra/CassandraEventStoreDiscovery.cs b/src/Elders.Cronus.Persistence.Cassandra/CassandraEventStoreDiscovery.cs index 7e15880..12a1a6d 100644 --- a/src/Elders.Cronus.Persistence.Cassandra/CassandraEventStoreDiscovery.cs +++ b/src/Elders.Cronus.Persistence.Cassandra/CassandraEventStoreDiscovery.cs @@ -7,6 +7,8 @@ using Elders.Cronus.Persistence.Cassandra.Migrations; using Elders.Cronus.Persistence.Cassandra.Preview; using Elders.Cronus.Persistence.Cassandra.ReplicationStrategies; +using Elders.Cronus.Persistence.Cassandra.Snapshots; +using Elders.Cronus.Snapshots.SnapshotStore; using Microsoft.Extensions.DependencyInjection; namespace Elders.Cronus.Persistence.Cassandra @@ -32,6 +34,9 @@ protected virtual IEnumerable DiscoverCassandraTableNameStrateg yield return new DiscoveredModel(typeof(ITableNamingStrategy), typeof(TablePerBoundedContext), ServiceLifetime.Singleton); yield return new DiscoveredModel(typeof(TablePerBoundedContext), typeof(TablePerBoundedContext), ServiceLifetime.Singleton); yield return new DiscoveredModel(typeof(NoTableNamingStrategy), typeof(NoTableNamingStrategy), ServiceLifetime.Singleton); + + yield return new DiscoveredModel(typeof(ISnapshotsTableNamingStrategy), typeof(SnapshotsTablePerBoundedContext), ServiceLifetime.Singleton); + yield return new DiscoveredModel(typeof(SnapshotsTablePerBoundedContext), typeof(SnapshotsTablePerBoundedContext), ServiceLifetime.Singleton); } IEnumerable GetModels(DiscoveryContext context) @@ -73,6 +78,11 @@ IEnumerable GetModels(DiscoveryContext context) yield return new DiscoveredModel(typeof(MessageCounter), typeof(MessageCounter), ServiceLifetime.Transient) { CanOverrideDefaults = true }; yield return new DiscoveredModel(typeof(IRebuildIndex_EventToAggregateRootId_JobFactory), typeof(RebuildIndex_EventToAggregateRootId_JobFactory), ServiceLifetime.Transient); + + yield return new DiscoveredModel(typeof(ISnapshotReader), typeof(CassandraSnapshotReader), ServiceLifetime.Transient) { CanOverrideDefaults = true }; + yield return new DiscoveredModel(typeof(CassandraSnapshotReader), typeof(CassandraSnapshotReader), ServiceLifetime.Transient) { CanOverrideDefaults = true }; + yield return new DiscoveredModel(typeof(ISnapshotWriter), typeof(CassandraSnapshotWriter), ServiceLifetime.Transient) { CanOverrideDefaults = true }; + yield return new DiscoveredModel(typeof(CassandraSnapshotWriter), typeof(CassandraSnapshotWriter), ServiceLifetime.Transient) { CanOverrideDefaults = true }; } } } diff --git a/src/Elders.Cronus.Persistence.Cassandra/CassandraProvider.cs b/src/Elders.Cronus.Persistence.Cassandra/CassandraProvider.cs index 27a5e32..a8a5d37 100644 --- a/src/Elders.Cronus.Persistence.Cassandra/CassandraProvider.cs +++ b/src/Elders.Cronus.Persistence.Cassandra/CassandraProvider.cs @@ -1,12 +1,11 @@ using System; +using System.Threading; +using System.Threading.Tasks; using Cassandra; using Elders.Cronus.Persistence.Cassandra.ReplicationStrategies; +using Microsoft.Extensions.Logging; using Microsoft.Extensions.Options; -using System.Threading; -using System.Threading.Tasks; using DataStax = Cassandra; -using Microsoft.Extensions.Logging; -using Microsoft.AspNetCore.Hosting.Server; namespace Elders.Cronus.Persistence.Cassandra { diff --git a/src/Elders.Cronus.Persistence.Cassandra/Elders.Cronus.Persistence.Cassandra.csproj b/src/Elders.Cronus.Persistence.Cassandra/Elders.Cronus.Persistence.Cassandra.csproj index de97f02..b092f21 100644 --- a/src/Elders.Cronus.Persistence.Cassandra/Elders.Cronus.Persistence.Cassandra.csproj +++ b/src/Elders.Cronus.Persistence.Cassandra/Elders.Cronus.Persistence.Cassandra.csproj @@ -32,7 +32,7 @@ - + diff --git a/src/Elders.Cronus.Persistence.Cassandra/Preview/CassandraEventStore.cs b/src/Elders.Cronus.Persistence.Cassandra/Preview/CassandraEventStore.cs index eec666f..70e53df 100644 --- a/src/Elders.Cronus.Persistence.Cassandra/Preview/CassandraEventStore.cs +++ b/src/Elders.Cronus.Persistence.Cassandra/Preview/CassandraEventStore.cs @@ -1,7 +1,3 @@ -using Cassandra; -using Elders.Cronus.EventStore; -using Elders.Cronus.EventStore.Index; -using Microsoft.Extensions.Logging; using System; using System.Collections.Generic; using System.IO; @@ -9,6 +5,10 @@ using System.Runtime.CompilerServices; using System.Threading; using System.Threading.Tasks; +using Cassandra; +using Elders.Cronus.EventStore; +using Elders.Cronus.EventStore.Index; +using Microsoft.Extensions.Logging; namespace Elders.Cronus.Persistence.Cassandra.Preview { @@ -24,6 +24,7 @@ public CassandraEventStore(TSettings settings, IndexByEventTypeStore indexByEven public class CassandraEventStore : IEventStore, IEventStorePlayer { private const string LoadAggregateEventsQueryTemplate = @"SELECT rev,pos,ts,data FROM {0} WHERE id = ?;"; + private const string LoadAggregateEventsAfterRevisionQueryTemplate = @"SELECT rev,pos,ts,data FROM {0} WHERE id = ? and rev > ?;"; private const string InsertEventsQueryTemplate = @"INSERT INTO {0} (id,rev,pos,ts,data) VALUES (?,?,?,?,?);"; private const string LoadEventsQueryTemplate = @"SELECT id,rev,pos,ts,data FROM {0};"; @@ -42,6 +43,7 @@ public class CassandraEventStore : IEventStore, IEventStorePlayer private PreparedStatement writeStatement; private PreparedStatement readStatement; + private PreparedStatement readAfterRevisionStatement; private PreparedStatement replayStatement; private PreparedStatement replayWithoutDataStatement; private PreparedStatement loadAggregateCommitsMetaStatement; @@ -117,6 +119,13 @@ public async Task LoadAsync(IBlobId aggregateId) return new EventStream(aggregateCommits); } + public async Task LoadAsync(IBlobId aggregateId, int afterRevision) + { + List aggregateCommits = await LoadAggregateCommitsAsync(aggregateId, afterRevision).ConfigureAwait(false); + + return new EventStream(aggregateCommits); + } + public async Task DeleteAsync(AggregateEventRaw eventRaw) { try @@ -139,11 +148,25 @@ public async Task DeleteAsync(AggregateEventRaw eventRaw) } } - private async Task> LoadAggregateCommitsAsync(IBlobId id) + private Task> LoadAggregateCommitsAsync(IBlobId id) + { + return LoadAggregateCommitsAsync(id, null); + } + + private async Task> LoadAggregateCommitsAsync(IBlobId id, int? afterRevision) { ISession session = await GetSessionAsync().ConfigureAwait(false); - PreparedStatement bs = await GetReadStatementAsync(session).ConfigureAwait(false); - BoundStatement boundStatement = bs.Bind(id.RawId); + BoundStatement boundStatement; + if (afterRevision.HasValue) + { + PreparedStatement ps = await GetReadAfterRevisionStatementAsync(session).ConfigureAwait(false); + boundStatement = ps.Bind(id.RawId, afterRevision.Value); + } + else + { + PreparedStatement ps = await GetReadStatementAsync(session).ConfigureAwait(false); + boundStatement = ps.Bind(id.RawId); + } var result = await session.ExecuteAsync(boundStatement).ConfigureAwait(false); @@ -430,7 +453,6 @@ private async Task GetReadStatementAsync(ISession session) { if (readStatement is null) { - string tableName = tableNameStrategy.GetName(); readStatement = await session.PrepareAsync(string.Format(LoadAggregateEventsQueryTemplate, tableName)).ConfigureAwait(false); readStatement.SetConsistencyLevel(ConsistencyLevel.LocalQuorum); @@ -439,6 +461,18 @@ private async Task GetReadStatementAsync(ISession session) return readStatement; } + private async Task GetReadAfterRevisionStatementAsync(ISession session) + { + if (readAfterRevisionStatement is null) + { + string tableName = tableNameStrategy.GetName(); + readAfterRevisionStatement = await session.PrepareAsync(string.Format(LoadAggregateEventsAfterRevisionQueryTemplate, tableName)).ConfigureAwait(false); + readAfterRevisionStatement.SetConsistencyLevel(ConsistencyLevel.LocalQuorum); + } + + return readAfterRevisionStatement; + } + private async Task GetDeleteStatement(ISession session) { if (deleteStatement is null) diff --git a/src/Elders.Cronus.Persistence.Cassandra/Preview/CassandraEventStoreSchema.cs b/src/Elders.Cronus.Persistence.Cassandra/Preview/CassandraEventStoreSchema.cs index 500d3a8..01fae86 100644 --- a/src/Elders.Cronus.Persistence.Cassandra/Preview/CassandraEventStoreSchema.cs +++ b/src/Elders.Cronus.Persistence.Cassandra/Preview/CassandraEventStoreSchema.cs @@ -2,8 +2,8 @@ using System.Linq; using System.Threading.Tasks; using Cassandra; -using Elders.Cronus.AtomicAction; using Elders.Cronus.Persistence.Cassandra.Counters; +using Elders.Cronus.Persistence.Cassandra.Snapshots; using Microsoft.Extensions.Logging; namespace Elders.Cronus.Persistence.Cassandra.Preview @@ -13,6 +13,7 @@ public class CassandraEventStoreSchema : ICassandraEventStoreSchema private static readonly ILogger logger = CronusLogger.CreateLogger(typeof(CassandraEventStoreSchema)); private const string CREATE_EVENTS_TABLE_TEMPLATE = @"CREATE TABLE IF NOT EXISTS ""{0}"" (id blob, ts bigint, rev int, pos int, data blob, PRIMARY KEY (id,rev,pos)) WITH CLUSTERING ORDER BY (rev ASC, pos ASC);"; + private const string CREATE_SNAPSHOTS_TABLE_TEMPLATE = @"CREATE TABLE IF NOT EXISTS ""{0}"" (id blob, rev int, data blob, PRIMARY KEY (id,rev)) WITH CLUSTERING ORDER BY (rev ASC);"; private const string INDEX_REV = @"CREATE INDEX IF NOT EXISTS ""{0}_idx_rev"" ON ""{0}"" (rev);"; private const string INDEX_POS = @"CREATE INDEX IF NOT EXISTS ""{0}_idx_pos"" ON ""{0}"" (pos);"; @@ -25,14 +26,16 @@ public class CassandraEventStoreSchema : ICassandraEventStoreSchema private readonly ICassandraProvider cassandraProvider; private readonly ITableNamingStrategy tableNameStrategy; + private readonly ISnapshotsTableNamingStrategy snapshotsTableNamingStrategy; private Task GetSessionAsync() => cassandraProvider.GetSessionAsync();// In order to keep only 1 session alive (https://docs.datastax.com/en/developer/csharp-driver/3.16/faq/) - public CassandraEventStoreSchema(ICassandraProvider cassandraProvider, ITableNamingStrategy tableNameStrategy, ILock @lock) + public CassandraEventStoreSchema(ICassandraProvider cassandraProvider, ITableNamingStrategy tableNameStrategy, ISnapshotsTableNamingStrategy snapshotsTableNamingStrategy) { if (cassandraProvider is null) throw new ArgumentNullException(nameof(cassandraProvider)); this.cassandraProvider = cassandraProvider; this.tableNameStrategy = tableNameStrategy ?? throw new ArgumentNullException(nameof(tableNameStrategy)); + this.snapshotsTableNamingStrategy = snapshotsTableNamingStrategy; } public Task CreateStorageAsync() @@ -55,9 +58,8 @@ public Task CreateEventsStorageAsync() public Task CreateSnapshotsStorageAsync() { - //var createSnapshotsTable = String.Format(CreateSnapshotsTableTemplate, tableNameStrategy.GetSnapshotsTableName()).ToLower(); - //session.Execute(createSnapshotsTable); - return Task.CompletedTask; + string tableName = snapshotsTableNamingStrategy.GetName(); + return CreateSnapshotPersistanseAsync(tableName); } public Task CreateIndeciesAsync() @@ -125,5 +127,33 @@ private async Task CreateEventStoragePersistanseAsync(string tableName) throw; } } + + private async Task CreateSnapshotPersistanseAsync(string tableName) + { + try + { + ISession session = await GetSessionAsync().ConfigureAwait(false); + + logger.Debug(() => $"[EventStore] Creating table `{tableName}` with `{session.Cluster.AllHosts().First().Address}` in keyspace `{session.Keyspace}`..."); + + string tableQuery = string.Format(CREATE_SNAPSHOTS_TABLE_TEMPLATE, tableName).ToLower(); + string rev = string.Format(INDEX_REV, tableName).ToLower(); + + PreparedStatement tableStatement = await session.PrepareAsync(string.Format(tableQuery, tableName).ToLower()).ConfigureAwait(false); + tableStatement.SetConsistencyLevel(ConsistencyLevel.LocalQuorum); + + PreparedStatement revStatement = await session.PrepareAsync(string.Format(rev, tableName).ToLower()).ConfigureAwait(false); + tableStatement.SetConsistencyLevel(ConsistencyLevel.LocalQuorum); + + await session.ExecuteAsync(tableStatement.Bind()).ConfigureAwait(false); + await session.ExecuteAsync(revStatement.Bind()).ConfigureAwait(false); + + logger.Debug(() => $"[EventStore] Created table `{tableName}` in keyspace `{session.Keyspace}`..."); + } + catch (Exception) + { + throw; + } + } } } diff --git a/src/Elders.Cronus.Persistence.Cassandra/Snapshots/CassandraSnapshotReader.cs b/src/Elders.Cronus.Persistence.Cassandra/Snapshots/CassandraSnapshotReader.cs new file mode 100644 index 0000000..37cb161 --- /dev/null +++ b/src/Elders.Cronus.Persistence.Cassandra/Snapshots/CassandraSnapshotReader.cs @@ -0,0 +1,133 @@ +using System; +using System.IO; +using System.Linq; +using System.Threading.Tasks; +using Cassandra; +using Elders.Cronus.Persistence.Cassandra.Preview; +using Elders.Cronus.Snapshots.SnapshotStore; +using Microsoft.Extensions.Logging; + +namespace Elders.Cronus.Persistence.Cassandra.Snapshots +{ + public sealed class CassandraSnapshotReader : ISnapshotReader + { + private const string ReadLastSnapshotQueryTemplate = @"SELECT rev,data FROM {0} WHERE id = ? ORDER BY rev DESC LIMIT 1;"; + private const string ReadSnapshotByRevisionQueryTemplate = @"SELECT data FROM {0} WHERE id = ? AND rev = ?;"; + + private readonly ICassandraProvider cassandraProvider; + private readonly ISnapshotsTableNamingStrategy snapshotsTableNamingStrategy; + private readonly ISerializer serializer; + private readonly ILogger logger; + + private PreparedStatement readLastStatement; + private PreparedStatement readByRevisionStatement; + + public CassandraSnapshotReader(ICassandraProvider cassandraProvider, ISnapshotsTableNamingStrategy snapshotsTableNamingStrategy, ISerializer serializer, ILogger logger) + { + this.cassandraProvider = cassandraProvider; + this.snapshotsTableNamingStrategy = snapshotsTableNamingStrategy; + this.serializer = serializer; + this.logger = logger; + } + + public async Task ReadAsync(IBlobId id) + { + logger.Debug(() => "Reading last snapshot for aggregate {id}.", Convert.ToHexString(id.RawId)); + + try + { + var session = await cassandraProvider.GetSessionAsync().ConfigureAwait(false); + var preparedStatement = await GetReadLastStatementAsync(session).ConfigureAwait(false); + var boundStatement = preparedStatement.Bind(id.RawId); + var result = await session.ExecuteAsync(boundStatement).ConfigureAwait(false); + + var row = result.GetRows().FirstOrDefault(); + if (row is not null) + { + int revision = row.GetValue(CassandraColumn.Revision); + byte[] data = row.GetValue(CassandraColumn.Data); + object state = DeserializeState(data); + + return new Snapshot(id, revision, state); + } + + return null; + } + catch (WriteTimeoutException ex) + { + logger.WarnException(ex, () => "Read timeout while reading a snapshot for aggregate {id}.", Convert.ToHexString(id.RawId)); + } + catch (Exception ex) + { + logger.ErrorException(ex, () => "Failed read snapshot for aggregate {id}.", Convert.ToHexString(id.RawId)); + } + + return null; + } + + public async Task ReadAsync(IBlobId id, int revision) + { + logger.Debug(() => "Reading snapshot for aggregate {id} and revision {revision}.", id, revision); + + try + { + var session = await cassandraProvider.GetSessionAsync().ConfigureAwait(false); + var preparedStatement = await GetReadByRevisionStatementAsync(session).ConfigureAwait(false); + var boundStatement = preparedStatement.Bind(id.RawId, revision); + var result = await session.ExecuteAsync(boundStatement).ConfigureAwait(false); + + var row = result.GetRows().FirstOrDefault(); + if (row is not null) + { + byte[] data = row.GetValue(CassandraColumn.Data); + object state = DeserializeState(data); + + return new Snapshot(id, revision, (IAggregateRootState)state); + } + + return null; + } + catch (WriteTimeoutException ex) + { + logger.WarnException(ex, () => "Read timeout while reading a snapshot for aggregate {id} and revision {revision}.", id, revision); + } + catch (Exception ex) + { + logger.ErrorException(ex, () => "Failed read snapshot for aggregate {id} and revision {revision}.", id, revision); + } + + return null; + } + + private object DeserializeState(byte[] data) + { + using var stream = new MemoryStream(data); + var state = serializer.Deserialize(stream); + return state; + } + + private async Task GetReadLastStatementAsync(ISession session) + { + if (readLastStatement is null) + { + string tableName = snapshotsTableNamingStrategy.GetName(); + readLastStatement = await session.PrepareAsync(string.Format(ReadLastSnapshotQueryTemplate, tableName)).ConfigureAwait(false); + readLastStatement.SetConsistencyLevel(ConsistencyLevel.LocalQuorum); + } + + return readLastStatement; + } + + private async Task GetReadByRevisionStatementAsync(ISession session) + { + if (readByRevisionStatement is null) + { + string tableName = snapshotsTableNamingStrategy.GetName(); + readByRevisionStatement = await session.PrepareAsync(string.Format(ReadSnapshotByRevisionQueryTemplate, tableName)).ConfigureAwait(false); + readByRevisionStatement.SetConsistencyLevel(ConsistencyLevel.LocalQuorum); + } + + return readByRevisionStatement; + } + } +} diff --git a/src/Elders.Cronus.Persistence.Cassandra/Snapshots/CassandraSnapshotWriter.cs b/src/Elders.Cronus.Persistence.Cassandra/Snapshots/CassandraSnapshotWriter.cs new file mode 100644 index 0000000..f635a35 --- /dev/null +++ b/src/Elders.Cronus.Persistence.Cassandra/Snapshots/CassandraSnapshotWriter.cs @@ -0,0 +1,70 @@ +using System; +using System.IO; +using System.Threading.Tasks; +using Cassandra; +using Elders.Cronus.Snapshots.SnapshotStore; +using Microsoft.Extensions.Logging; + +namespace Elders.Cronus.Persistence.Cassandra.Snapshots +{ + public sealed class CassandraSnapshotWriter : ISnapshotWriter + { + private const string WriteSnapshotQueryTemplate = @"INSERT INTO {0} (id,rev,data) VALUES (?,?,?);"; + + private readonly ICassandraProvider cassandraProvider; + private readonly ISnapshotsTableNamingStrategy snapshotsTableNamingStrategy; + private readonly ISerializer serializer; + private readonly ILogger logger; + + private PreparedStatement writeStatement; + + public CassandraSnapshotWriter(ICassandraProvider cassandraProvider, ISnapshotsTableNamingStrategy snapshotsTableNamingStrategy, ISerializer serializer, ILogger logger) + { + this.cassandraProvider = cassandraProvider; + this.snapshotsTableNamingStrategy = snapshotsTableNamingStrategy; + this.serializer = serializer; + this.logger = logger; + } + + public async Task WriteAsync(IBlobId id, int revision, object state) + { + logger.Debug(() => "Writing snapshot for aggregate {id} and revision {revision}.", Convert.ToHexString(id.RawId), revision); + + try + { + var session = await cassandraProvider.GetSessionAsync().ConfigureAwait(false); + var preparedStatement = await GetWriteStatementAsync(session).ConfigureAwait(false); + var data = SerializeState(state); + var boundStatement = preparedStatement.Bind(id.RawId, revision, data); + await session.ExecuteAsync(boundStatement).ConfigureAwait(false); + } + catch (WriteTimeoutException ex) + { + logger.WarnException(ex, () => "Write timeout while persisting a snapshot for aggregate {id}.", Convert.ToHexString(id.RawId)); + } + catch (Exception ex) + { + logger.ErrorException(ex, () => "Failed persisting snapshot for aggregate {id}.", Convert.ToHexString(id.RawId)); + } + } + + private byte[] SerializeState(object state) + { + using var stream = new MemoryStream(); + serializer.Serialize(stream, state); + return stream.ToArray(); + } + + private async Task GetWriteStatementAsync(ISession session) + { + if (writeStatement is null) + { + string tableName = snapshotsTableNamingStrategy.GetName(); + writeStatement = await session.PrepareAsync(string.Format(WriteSnapshotQueryTemplate, tableName)).ConfigureAwait(false); + writeStatement.SetConsistencyLevel(ConsistencyLevel.LocalQuorum); + } + + return writeStatement; + } + } +} diff --git a/src/Elders.Cronus.Persistence.Cassandra/Snapshots/ISnapshotsTableNamingStrategy.cs b/src/Elders.Cronus.Persistence.Cassandra/Snapshots/ISnapshotsTableNamingStrategy.cs new file mode 100644 index 0000000..fc7f9c4 --- /dev/null +++ b/src/Elders.Cronus.Persistence.Cassandra/Snapshots/ISnapshotsTableNamingStrategy.cs @@ -0,0 +1,7 @@ +namespace Elders.Cronus.Persistence.Cassandra.Snapshots +{ + public interface ISnapshotsTableNamingStrategy + { + string GetName(); + } +} diff --git a/src/Elders.Cronus.Persistence.Cassandra/Snapshots/SnapshotsTablePerBoundedContext.cs b/src/Elders.Cronus.Persistence.Cassandra/Snapshots/SnapshotsTablePerBoundedContext.cs new file mode 100644 index 0000000..0d41562 --- /dev/null +++ b/src/Elders.Cronus.Persistence.Cassandra/Snapshots/SnapshotsTablePerBoundedContext.cs @@ -0,0 +1,19 @@ +using Microsoft.Extensions.Options; + +namespace Elders.Cronus.Persistence.Cassandra.Snapshots +{ + public sealed class SnapshotsTablePerBoundedContext : ISnapshotsTableNamingStrategy + { + private readonly BoundedContext boundedContext; + + public SnapshotsTablePerBoundedContext(IOptionsMonitor boundedContext) + { + this.boundedContext = boundedContext.CurrentValue; + } + + public string GetName() + { + return $"{boundedContext.Name}Snapshots".ToLower(); + } + } +}