-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathEventStoreClient.fs
95 lines (67 loc) · 2.9 KB
/
EventStoreClient.fs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
module EventStoreClient
open EventStore.ClientAPI
open System
open System.Text
/// Shape of the callback that handles an event from a persistent subscription:
/// it receives the subscription and the resolved event and returns unit.
type ProcessEventType = EventStorePersistentSubscriptionBase -> ResolvedEvent -> unit
/// Shape of the callback passed as the "dropped" handler of a persistent subscription:
/// it receives the subscription, the drop reason and an exception and returns unit.
type SubscriptionDroppedType = EventStorePersistentSubscriptionBase -> SubscriptionDropReason -> exn -> unit
// TODO (author's note: "Change all this"): connection settings are held in
// module-level mutable state, assigned by eventStoreSetup and read by createConnection.
/// User name inserted into the connection URI by createConnection.
let mutable username:string = ""
/// Password inserted into the connection URI by createConnection.
let mutable password:string = ""
/// Address inserted after the '@' of the connection URI by createConnection.
let mutable address:string = ""
// TODO (author's note: "And this"): configuration is applied by mutating module-level state.
/// Records the settings that createConnection later reads.
///   name - stored in `username`
///   pw   - stored in `password`
///   ad   - stored in `address`
let eventStoreSetup name pw ad =
    address <- ad
    password <- pw
    username <- name
/// Creates an Event Store connection from the module-level `username`,
/// `password` and `address`, blocks until ConnectAsync has completed, and
/// returns the connection. The caller is responsible for disposing it.
// NOTE(review): the credentials are interpolated into the URI without escaping;
// values containing URI-reserved characters may not parse as intended - confirm.
let createConnection () =
    let connectionString = sprintf "tcp://%s:%s@%s" username password address
    let connection = EventStoreConnection.Create(new Uri(connectionString))
    connection.ConnectAsync().Wait()
    connection
// Persistent Subscriptions
/// Stream passed to the most recent connectToPersistentSubscription call;
/// reused by reconnectToPersistentSubscription.
let mutable psStream:string = ""
/// Group passed to the most recent connectToPersistentSubscription call.
let mutable psGroup:string = ""
/// Event handler passed to the most recent connectToPersistentSubscription call
/// (a no-op until that function has been called).
let mutable psProcess:ProcessEventType = (fun _ _ -> ())
/// Dropped handler passed to the most recent connectToPersistentSubscription call
/// (a no-op until that function has been called).
let mutable psDropped:SubscriptionDroppedType = (fun _ _ _ -> ())
/// Opens a new connection and subscribes to a persistent subscription on it.
///   stream - stream to subscribe to
///   group  - subscription group
///   pro    - handler passed as the event callback
///   sub    - handler passed as the dropped callback
/// The four arguments are stored in psStream/psGroup/psProcess/psDropped before
/// connecting, so reconnectToPersistentSubscription can reuse them. The value
/// returned by ConnectToPersistentSubscription is discarded and the connection
/// is not disposed here.
let connectToPersistentSubscription (stream:string) (group:string) (pro:ProcessEventType) (sub:SubscriptionDroppedType) =
    // Remember the arguments first: they are stored even if connecting fails.
    psStream <- stream
    psGroup <- group
    psProcess <- pro
    psDropped <- sub
    let connection = createConnection()
    ignore (connection.ConnectToPersistentSubscription(stream, group, pro, sub))
/// Opens a fresh connection and subscribes again using the stream, group and
/// handlers remembered in psStream/psGroup/psProcess/psDropped. If
/// connectToPersistentSubscription was never called, those are the empty
/// strings and no-op handlers they are initialised with.
let reconnectToPersistentSubscription() =
    createConnection().ConnectToPersistentSubscription(psStream, psGroup, psProcess, psDropped)
    |> ignore
/// First step of the fluent subscription syntax: fixes the stream and returns
/// connectToPersistentSubscription partially applied to it, e.g.
///   connectToPersistentSubscriptionStream s |> andGroupName g |> whenEventAppears p |> ifSubscriptionDrops d
let connectToPersistentSubscriptionStream stream = stream |> connectToPersistentSubscription
/// Fluent-syntax step: applies the function `f` to `group` and returns the result.
let andGroupName group f = group |> f
/// Fluent-syntax step: applies `f` to the event handler `processEvent`.
let whenEventAppears (processEvent:ProcessEventType) f = processEvent |> f
/// Fluent-syntax step: applies `f` to the dropped handler `s`.
let ifSubscriptionDrops (s:SubscriptionDroppedType) f = s |> f
// Read events forwards
/// Reads a slice of events forwards from a stream on a freshly opened connection
/// and blocks until the read has finished.
///   count      - maximum number of events to read
///   streamName - stream to read from
///   start      - position of the first event to read (converted to int64)
/// Returns the value produced by ReadStreamEventsForwardAsync. `true` is passed
/// as the fourth argument (resolveLinkTos in the client API).
let readEventsForwards count streamName start =
    // `use` disposes the connection when the function exits, including when
    // the read throws; previously a failed read leaked the connection.
    use connection = createConnection()
    connection.ReadStreamEventsForwardAsync(streamName, (int64 start), count, true)
    |> Async.AwaitTask
    |> Async.RunSynchronously
/// First step of the fluent read syntax: fixes the event count and returns
/// readEventsForwards partially applied to it, e.g.
///   readForwardAndGet 10 |> eventsFromStream s |> startingAtEvent 0
let readForwardAndGet count = count |> readEventsForwards
/// Fluent-syntax step: applies the function `f` to the stream `s`.
let eventsFromStream s f = s |> f
/// Fluent-syntax step: applies the function `f` to the start position `c`.
let startingAtEvent c f = c |> f
// Append events to stream
/// Appends one JSON event to a stream and returns the new event's id.
/// Despite the name, this blocks until the append has completed.
///   connection - connection to write on; it is DISPOSED before this function
///                returns (on success or failure), so it cannot be reused
///   eventType  - event type name stored with the event
///   streamName - stream to append to
///   eventData  - event body, encoded as UTF-8 and flagged as JSON
///   metaData   - event metadata, encoded as UTF-8
let appendToStreamAsync (connection:IEventStoreConnection) (eventType:string) (streamName:string) (eventData:string) (metaData:string) =
    try
        // UTF-8 rather than ASCII: the ASCII encoder replaces every non-ASCII
        // character with '?', silently corrupting the payload. For pure-ASCII
        // input the bytes are identical.
        let eventBytes = Encoding.UTF8.GetBytes(eventData)
        let eventMetaBytes = Encoding.UTF8.GetBytes(metaData)
        // `true` marks the payload as JSON.
        let eventRecord = EventData(Guid.NewGuid(), eventType, true, eventBytes, eventMetaBytes)
        // -2 is the client's ExpectedVersion.Any value (no expected-version check).
        connection.AppendToStreamAsync(streamName, -2L, eventRecord)
        |> Async.AwaitTask
        |> Async.RunSynchronously
        |> ignore
        eventRecord.EventId
    finally
        // Always release the connection, even if encoding or the append fails.
        connection.Dispose()
/// First step of the fluent write syntax: fixes the event type and returns a
/// function taking the stream name, event data and metadata, e.g.
///   writeEventOfType t |> toStream s |> withEventData d |> andMetaData m
/// The result of the completed call is the id of the appended event.
let writeEventOfType (eventType:string) =
    // The connection is created only once every argument has been supplied.
    // Creating it eagerly leaked it when the call was never completed, and made
    // the returned function fail on a second use, because appendToStreamAsync
    // disposes the connection it is given.
    fun (streamName:string) (eventData:string) (metaData:string) ->
        let connection = createConnection()
        appendToStreamAsync connection eventType streamName eventData metaData
/// Fluent-syntax step: applies the function given as the second argument to `stream`.
let toStream stream appendToStreamAsync = stream |> appendToStreamAsync
/// Fluent-syntax step: applies the function given as the second argument to `data`.
let withEventData data appendToStreamAsync = data |> appendToStreamAsync
/// Fluent-syntax step: applies the function given as the second argument to `data`.
let andMetaData data appendToStreamAsync = data |> appendToStreamAsync