27
27
using System . Threading ;
28
28
using System . Threading . Tasks ;
29
29
using EventFlow . Aggregates ;
30
+ using EventFlow . Core ;
30
31
using EventFlow . Exceptions ;
31
32
using EventFlow . Logs ;
32
33
using EventStore . ClientAPI ;
33
34
using EventStore . ClientAPI . Exceptions ;
34
35
35
36
namespace EventFlow . EventStores . EventStore
36
37
{
37
- public class EventStoreEventStore : EventStoreBase
38
+ public class EventStoreEventPersistence : IEventPersistence
38
39
{
40
+ private readonly ILog _log ;
39
41
private readonly IEventStoreConnection _connection ;
40
42
41
43
private class EventStoreEvent : ICommittedDomainEvent
42
44
{
43
45
public string AggregateId { get ; set ; }
44
- public string AggregateName { get ; set ; }
45
46
public string Data { get ; set ; }
46
47
public string Metadata { get ; set ; }
47
48
public int AggregateSequenceNumber { get ; set ; }
48
49
}
49
50
50
- public EventStoreEventStore (
51
+ public EventStoreEventPersistence (
51
52
ILog log ,
52
- IAggregateFactory aggregateFactory ,
53
- IEventJsonSerializer eventJsonSerializer ,
54
- IEventUpgradeManager eventUpgradeManager ,
55
- IEventStoreConnection connection ,
56
- IEnumerable < IMetadataProvider > metadataProviders )
57
- : base ( log , aggregateFactory , eventJsonSerializer , eventUpgradeManager , metadataProviders )
53
+ IEventStoreConnection connection )
58
54
{
55
+ _log = log ;
59
56
_connection = connection ;
60
57
}
61
58
62
- protected override async Task < AllCommittedEventsPage > LoadAllCommittedDomainEvents (
59
+ public async Task < AllCommittedEventsPage > LoadAllCommittedEvents (
63
60
GlobalPosition globalPosition ,
64
61
int pageSize ,
65
62
CancellationToken cancellationToken )
@@ -104,19 +101,17 @@ private static Position ParsePosition(GlobalPosition globalPosition)
104
101
return new Position ( commitPosition , preparePosition ) ;
105
102
}
106
103
107
- protected override async Task < IReadOnlyCollection < ICommittedDomainEvent > > CommitEventsAsync < TAggregate , TIdentity > (
108
- TIdentity id ,
104
+ public async Task < IReadOnlyCollection < ICommittedDomainEvent > > CommitEventsAsync (
105
+ IIdentity id ,
109
106
IReadOnlyCollection < SerializedEvent > serializedEvents ,
110
107
CancellationToken cancellationToken )
111
108
{
112
- var aggregateName = typeof ( TAggregate ) . Name ;
113
109
var committedDomainEvents = serializedEvents
114
110
. Select ( e => new EventStoreEvent
115
111
{
116
112
AggregateSequenceNumber = e . AggregateSequenceNumber ,
117
113
Metadata = e . SerializedMetadata ,
118
114
AggregateId = id . Value ,
119
- AggregateName = aggregateName ,
120
115
Data = e . SerializedData
121
116
} )
122
117
. ToList ( ) ;
@@ -126,7 +121,7 @@ protected override async Task<IReadOnlyCollection<ICommittedDomainEvent>> Commit
126
121
. Select ( e =>
127
122
{
128
123
var guid = Guid . Parse ( e . Metadata [ "guid" ] ) ;
129
- var eventType = string . Format ( "{0}.{1}.{2}" , aggregateName , e . Metadata . EventName , e . Metadata . EventVersion ) ;
124
+ var eventType = string . Format ( "{0}.{1}.{2}" , e . Metadata [ MetadataKeys . AggregateName ] , e . Metadata . EventName , e . Metadata . EventVersion ) ;
130
125
var data = Encoding . UTF8 . GetBytes ( e . SerializedData ) ;
131
126
var meta = Encoding . UTF8 . GetBytes ( e . SerializedMetadata ) ;
132
127
return new EventData ( guid , eventType , true , data , meta ) ;
@@ -142,9 +137,9 @@ protected override async Task<IReadOnlyCollection<ICommittedDomainEvent>> Commit
142
137
{
143
138
await transaction . WriteAsync ( eventDatas ) . ConfigureAwait ( false ) ;
144
139
var writeResult = await transaction . CommitAsync ( ) . ConfigureAwait ( false ) ;
145
- Log . Verbose (
146
- "Wrote aggregate {0} with version {1} ({2},{3})" ,
147
- aggregateName ,
140
+ _log . Verbose (
141
+ "Wrote entity {0} with version {1} ({2},{3})" ,
142
+ id ,
148
143
writeResult . NextExpectedVersion - 1 ,
149
144
writeResult . LogPosition . CommitPosition ,
150
145
writeResult . LogPosition . PreparePosition ) ;
@@ -158,8 +153,8 @@ protected override async Task<IReadOnlyCollection<ICommittedDomainEvent>> Commit
158
153
return committedDomainEvents ;
159
154
}
160
155
161
- protected override async Task < IReadOnlyCollection < ICommittedDomainEvent > > LoadCommittedEventsAsync < TAggregate , TIdentity > (
162
- TIdentity id ,
156
+ public async Task < IReadOnlyCollection < ICommittedDomainEvent > > LoadCommittedEventsAsync (
157
+ IIdentity id ,
163
158
CancellationToken cancellationToken )
164
159
{
165
160
var streamEvents = new List < ResolvedEvent > ( ) ;
@@ -182,9 +177,7 @@ protected override async Task<IReadOnlyCollection<ICommittedDomainEvent>> LoadCo
182
177
return Map ( streamEvents ) ;
183
178
}
184
179
185
- public override Task DeleteAggregateAsync < TAggregate , TIdentity > (
186
- TIdentity id ,
187
- CancellationToken cancellationToken )
180
+ public Task DeleteEventsAsync ( IIdentity id , CancellationToken cancellationToken )
188
181
{
189
182
return _connection . DeleteStreamAsync ( id . Value , ExpectedVersion . Any ) ;
190
183
}
@@ -197,10 +190,9 @@ private static IReadOnlyCollection<EventStoreEvent> Map(IEnumerable<ResolvedEven
197
190
AggregateSequenceNumber = e . Event . EventNumber + 1 ,
198
191
Metadata = Encoding . UTF8 . GetString ( e . Event . Metadata ) ,
199
192
AggregateId = e . OriginalStreamId ,
200
- AggregateName = e . Event . EventType . Split ( '.' ) [ 0 ] ,
201
193
Data = Encoding . UTF8 . GetString ( e . Event . Data ) ,
202
194
} )
203
195
. ToList ( ) ;
204
196
}
205
197
}
206
- }
198
+ }
0 commit comments