Skip to content

Commit 6da801f

Browse files
authored
feat(subscription): re-enable Volatile subscriptions (#55)
* feat(eventstore): add SequenceNumberGetter interface * feat(subscription): re-enable Volatile subscription
1 parent 47f3be0 commit 6da801f

File tree

6 files changed

+240
-173
lines changed

6 files changed

+240
-173
lines changed

eventstore/inmemory/store.go

Lines changed: 24 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ import (
1010
"github.com/get-eventually/go-eventually/eventstore/stream"
1111
)
1212

13-
var _ eventstore.Store = &EventStore{}
13+
var (
14+
_ eventstore.Store = &EventStore{}
15+
_ eventstore.SequenceNumberGetter = &EventStore{}
16+
)
1417

1518
// EventStore is an in-memory eventstore.Store implementation.
1619
type EventStore struct {
@@ -29,6 +32,14 @@ func NewEventStore() *EventStore {
2932
}
3033
}
3134

35+
func contextErr(ctx context.Context) error {
36+
if err := ctx.Err(); err != nil {
37+
return fmt.Errorf("inmemory.EventStore: context done: %w", err)
38+
}
39+
40+
return nil
41+
}
42+
3243
func (s *EventStore) streamAll(ctx context.Context, es eventstore.EventStream, selectt eventstore.Select) error {
3344
for _, event := range s.events {
3445
if event.SequenceNumber < selectt.From {
@@ -162,6 +173,12 @@ func (s *EventStore) Stream(
162173
}
163174
}
164175

176+
func (s *EventStore) ensureMapsAreCreated(typ string) {
177+
if v, ok := s.byTypeAndInstance[typ]; !ok || v == nil {
178+
s.byTypeAndInstance[typ] = make(map[string][]int)
179+
}
180+
}
181+
165182
// Append inserts the specified Domain Events into the Event Stream specified
166183
// by the current instance, returning the new version of the Event Stream.
167184
//
@@ -223,16 +240,11 @@ func (s *EventStore) Append(
223240
return lastCommittedEvent.Version, nil
224241
}
225242

226-
func (s *EventStore) ensureMapsAreCreated(typ string) {
227-
if v, ok := s.byTypeAndInstance[typ]; !ok || v == nil {
228-
s.byTypeAndInstance[typ] = make(map[string][]int)
229-
}
230-
}
231-
232-
func contextErr(ctx context.Context) error {
233-
if err := ctx.Err(); err != nil {
234-
return fmt.Errorf("inmemory.EventStore: context done: %w", err)
235-
}
243+
// LatestSequenceNumber returns the size of the internal Event Log.
244+
// This method never fails.
245+
func (s *EventStore) LatestSequenceNumber(ctx context.Context) (int64, error) {
246+
s.mx.RLock()
247+
defer s.mx.RUnlock()
236248

237-
return nil
249+
return int64(len(s.events)), nil
238250
}

eventstore/store.go

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -107,3 +107,11 @@ type Streamer interface {
107107
// Stream opens one or more Event Streams as specified by the provided Event Stream target.
108108
Stream(ctx context.Context, es EventStream, target stream.Target, selectt Select) error
109109
}
110+
111+
// SequenceNumberGetter is an Event Store trait that is used to interact with
112+
// sequence numbers (e.g. queries).
113+
type SequenceNumberGetter interface {
114+
// LatestSequenceNumber should return the latest, global Sequence Number
115+
// registered by the Event Store.
116+
LatestSequenceNumber(ctx context.Context) (int64, error)
117+
}

subscription/catchup.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,11 @@ func (s *CatchUp) Start(ctx context.Context, eventStream eventstore.EventStream)
8282
for {
8383
select {
8484
case <-ctx.Done():
85-
return ctx.Err()
85+
if err := ctx.Err(); err != nil {
86+
return fmt.Errorf("subscription.CatchUp: context error: %w", err)
87+
}
88+
89+
return nil
8690

8791
case <-time.After(b.NextBackOff()):
8892
sequenceNumber, err := s.catchUp(ctx, eventStream, lastSequenceNumber)

subscription/catchup_test.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"time"
99

1010
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
1112
"github.com/stretchr/testify/suite"
1213
"go.uber.org/zap"
1314

@@ -25,14 +26,14 @@ func TestCatchUp(t *testing.T) {
2526
s := new(CatchUpSuite)
2627

2728
logger, err := zap.NewDevelopment()
28-
assert.NoError(t, err)
29+
require.NoError(t, err)
2930

3031
s.makeSubscription = func(store eventstore.Store) subscription.Subscription {
3132
return &subscription.CatchUp{
3233
SubscriptionName: t.Name(),
3334
Checkpointer: checkpoint.NopCheckpointer,
3435
Target: stream.All{},
35-
Logger: zaplogger.Wrap(logger),
36+
Logger: zaplogger.Wrap(logger.With(zap.String("test", t.Name()))),
3637
EventStore: store,
3738
PullEvery: 10 * time.Millisecond,
3839
MaxInterval: 50 * time.Millisecond,

subscription/volatile.go

Lines changed: 84 additions & 54 deletions
Original file line numberDiff line numberDiff line change
@@ -1,56 +1,86 @@
11
package subscription
22

3-
// NOTE(ar3s3ru): Volatile subscriptions are currently disabled, since the
4-
// support for Subscriptions is currently being deprecated.
5-
6-
// import (
7-
// "context"
8-
// "fmt"
9-
10-
// "github.com/get-eventually/go-eventually/eventstore"
11-
// )
12-
13-
// var _ Subscription = Volatile{}
14-
15-
// // Volatile is a Subscription type that does not keep state of
16-
// // the last Event processed or received, nor survives the Subscription
17-
// // checkpoint between restarts.
18-
// //
19-
// // Use this Subscription type for volatile processes, such as projecting
20-
// // realtime metrics, or when you're only interested in newer events
21-
// // committed to the Event Store.
22-
// type Volatile struct {
23-
// SubscriptionName string
24-
// Target TargetStream
25-
// EventStore eventstore.Subscriber
26-
// }
27-
28-
// // Name is the name of the subscription.
29-
// func (v Volatile) Name() string { return v.SubscriptionName }
30-
31-
// // Start starts the Subscription by opening a subscribing Event Stream
32-
// // using the subscription's Subscriber instance.
33-
// func (v Volatile) Start(ctx context.Context, stream eventstore.EventStream) error {
34-
// var err error
35-
36-
// switch t := v.Target.(type) {
37-
// case TargetStreamAll:
38-
// err = v.EventStore.SubscribeToAll(ctx, stream)
39-
// case TargetStreamType:
40-
// err = v.EventStore.SubscribeToType(ctx, stream, t.Type)
41-
// default:
42-
// return fmt.Errorf("subscription.Volatile: unexpected target type")
43-
// }
44-
45-
// if err != nil {
46-
// return fmt.Errorf("subscription.Volatile: event subscriber exited with error: %w", err)
47-
// }
48-
49-
// return nil
50-
// }
51-
52-
// // Checkpoint is a no-op operation, since the transient nature of the
53-
// // Subscription does not require to persist its current state.
54-
// func (Volatile) Checkpoint(ctx context.Context, event eventstore.Event) error {
55-
// return nil
56-
// }
3+
import (
4+
"context"
5+
"fmt"
6+
"time"
7+
8+
"github.com/get-eventually/go-eventually/eventstore"
9+
"github.com/get-eventually/go-eventually/eventstore/stream"
10+
"github.com/get-eventually/go-eventually/logger"
11+
"github.com/get-eventually/go-eventually/subscription/checkpoint"
12+
)
13+
14+
var _ Subscription = &Volatile{}
15+
16+
// Volatile is a Subscription type that does not keep state of
17+
// the last Event processed or received, nor survives the Subscription
18+
// checkpoint between restarts.
19+
//
20+
// Use this Subscription type for volatile processes, such as projecting
21+
// realtime metrics, or when you're only interested in newer events
22+
// committed to the Event Store.
23+
type Volatile struct {
24+
SubscriptionName string
25+
Target stream.Target
26+
Logger logger.Logger
27+
EventStore interface {
28+
eventstore.Streamer
29+
eventstore.SequenceNumberGetter
30+
}
31+
32+
// PullEvery is the minimum interval between each streaming call to the Event Store.
33+
//
34+
// Defaults to DefaultPullInterval if unspecified or negative value
35+
// has been provided.
36+
PullEvery time.Duration
37+
38+
// MaxInterval is the maximum interval between each streaming call to the Event Store.
39+
// Use this value to ensure a specific eventual consistency window.
40+
//
41+
// Defaults to DefaultMaxPullInterval if unspecified or negative value
42+
// has been provided.
43+
MaxInterval time.Duration
44+
45+
// BufferSize is the size of buffered channels used as EventStreams
46+
// by the Subscription when receiving Events from the Event Store.
47+
//
48+
// Defaults to DefaultPullCatchUpBufferSize if unspecified or a negative
49+
// value has been provided.
50+
BufferSize int
51+
}
52+
53+
// Name is the name of the subscription.
54+
func (v *Volatile) Name() string { return v.SubscriptionName }
55+
56+
// Start starts the Subscription by opening a subscribing Event Stream
57+
// using the subscription's Subscriber instance.
58+
func (v *Volatile) Start(ctx context.Context, es eventstore.EventStream) error {
59+
latestSequenceNumber, err := v.EventStore.LatestSequenceNumber(ctx)
60+
if err != nil {
61+
return fmt.Errorf("subscription.Volatile: failed to get latest sequence number from event store: %w", err)
62+
}
63+
64+
catchUpSubscription := &CatchUp{
65+
SubscriptionName: v.SubscriptionName,
66+
Target: v.Target,
67+
EventStore: v.EventStore,
68+
Checkpointer: checkpoint.FixedCheckpointer{StartingFrom: latestSequenceNumber},
69+
Logger: v.Logger,
70+
PullEvery: v.PullEvery,
71+
MaxInterval: v.MaxInterval,
72+
BufferSize: v.BufferSize,
73+
}
74+
75+
if err := catchUpSubscription.Start(ctx, es); err != nil {
76+
return fmt.Errorf("subscription.Volatile: internal catch-up subscription exited with error: %w", err)
77+
}
78+
79+
return nil
80+
}
81+
82+
// Checkpoint is a no-op operation, since the transient nature of the
83+
// Subscription does not require to persist its current state.
84+
func (*Volatile) Checkpoint(ctx context.Context, event eventstore.Event) error {
85+
return nil
86+
}

0 commit comments

Comments
 (0)