forked from rbaliyan/ledger
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstream.go
More file actions
176 lines (158 loc) · 5.72 KB
/
stream.go
File metadata and controls
176 lines (158 loc) · 5.72 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
package ledger
import (
"context"
"fmt"
"time"
)
// Entry is a typed entry read back from a stream.
//
// I is the store's ID type and T is the decoded payload type. Metadata is
// fixed at append time; Tags and Annotations are the only mutable parts
// (see Stream.SetTags and Stream.SetAnnotations).
type Entry[I comparable, T any] struct {
	ID            I                 // Store-assigned unique ID.
	Stream        string            // Stream name this entry belongs to.
	Payload       T                 // Decoded payload (after any upcasting).
	OrderKey      string            // Ordering key.
	DedupKey      string            // Deduplication key.
	SchemaVersion int               // Schema version at write time (before upcasting).
	Metadata      map[string]string // Immutable key-value metadata (set at append).
	Tags          []string          // Mutable tags (updated via SetTags).
	Annotations   map[string]string // Mutable annotations (updated via SetAnnotations).
	CreatedAt     time.Time         // Timestamp when the entry was stored.
	UpdatedAt     *time.Time        // Timestamp of last tag/annotation update; nil if never updated.
}
// AppendInput describes an entry to append to a stream.
//
// The payload is encoded with the stream's codec and stamped with the
// stream's current schema version at append time.
type AppendInput[T any] struct {
	Payload  T                 // Payload to encode and store.
	OrderKey string            // Ordering key for filtering (e.g., aggregate ID).
	DedupKey string            // Deduplication key. Empty means no dedup.
	Metadata map[string]string // Immutable key-value metadata.
	Tags     []string          // Initial tags (can be updated later via SetTags).
}
// options configures a Stream. Populated by Option functions in NewStream;
// zero values are replaced by defaults (JSONCodec, schema version 1) there.
type options struct {
	codec         Codec      // Payload encoder/decoder.
	schemaVersion int        // Version stamped on newly appended entries.
	upcasters     []Upcaster // Version-migration chain applied on read.
}
// Option configures a Stream. Options are applied in order by NewStream.
type Option func(*options)
// WithCodec sets the codec used to encode/decode payloads. Defaults to JSONCodec.
func WithCodec(c Codec) Option {
	return func(o *options) {
		o.codec = c
	}
}
// WithSchemaVersion sets the schema version stamped on new entries.
// Defaults to 1. When reading entries with older versions, registered
// upcasters are applied to transform the payload before decoding.
func WithSchemaVersion(v int) Option {
	return func(o *options) {
		// Non-positive versions are ignored so the default stays intact.
		if v <= 0 {
			return
		}
		o.schemaVersion = v
	}
}
// WithUpcaster registers an upcaster for transforming entries from one
// schema version to the next. Register in sequence (v1→v2, v2→v3).
func WithUpcaster(u Upcaster) Option {
	return func(o *options) {
		// Appended in registration order; Read applies them as a chain.
		o.upcasters = append(o.upcasters, u)
	}
}
// Stream is a lightweight, typed handle to a named stream in a store.
// It is cheap to create — create one per operation and discard it.
// Stream is safe for concurrent use.
type Stream[I comparable, T any] struct {
	name          string     // Stream name passed to every store call.
	store         Store[I]   // Backing store; never nil (NewStream panics on nil).
	codec         Codec      // Encodes payloads on Append, decodes on Read.
	schemaVersion int        // Version stamped on appended entries.
	upcasters     []Upcaster // Applied on Read to entries with older versions.
}
// NewStream creates a lightweight stream handle. The stream does not need to
// exist in the store beforehand — it is created implicitly on first append.
//
// Defaults: JSONCodec and schema version 1, overridable via Options.
//
// Panics if store is nil.
func NewStream[I comparable, T any](store Store[I], name string, opts ...Option) Stream[I, T] {
	if store == nil {
		panic("ledger: NewStream called with nil store")
	}
	// Start from the defaults, then let each option mutate the config in order.
	cfg := options{codec: JSONCodec{}, schemaVersion: 1}
	for _, opt := range opts {
		opt(&cfg)
	}
	return Stream[I, T]{
		name:          name,
		store:         store,
		codec:         cfg.codec,
		schemaVersion: cfg.schemaVersion,
		upcasters:     cfg.upcasters,
	}
}
// Name returns the stream name this handle operates on.
func (s Stream[I, T]) Name() string {
	return s.name
}
// SchemaVersion returns the schema version stamped on newly appended entries.
func (s Stream[I, T]) SchemaVersion() int {
	return s.schemaVersion
}
// Append encodes and appends entries to the stream. Returns IDs of newly appended entries.
// Each entry is stamped with the stream's current schema version.
// Entries with duplicate dedup keys are silently skipped.
//
// On encode failure the returned error wraps ErrEncode and identifies the
// zero-based index of the failing input, mirroring how Read identifies the
// failing entry by ID.
func (s Stream[I, T]) Append(ctx context.Context, entries ...AppendInput[T]) ([]I, error) {
	raw := make([]RawEntry, len(entries))
	for i, e := range entries {
		data, err := s.codec.Encode(e.Payload)
		if err != nil {
			// Include the entry index so callers of a multi-entry append can
			// tell which input failed; errors.Is(err, ErrEncode) still matches.
			return nil, fmt.Errorf("entry %d: %w: %v", i, ErrEncode, err)
		}
		raw[i] = RawEntry{
			Payload:       data,
			OrderKey:      e.OrderKey,
			DedupKey:      e.DedupKey,
			SchemaVersion: s.schemaVersion,
			Metadata:      e.Metadata,
			Tags:          e.Tags,
		}
	}
	return s.store.Append(ctx, s.name, raw...)
}
// Read returns decoded entries from the stream. Entries written with an older
// schema version are automatically upcasted to the current version before
// decoding into T.
//
// Both upcast and decode failures identify the offending entry by its
// store-assigned ID; decode failures additionally wrap ErrDecode.
func (s Stream[I, T]) Read(ctx context.Context, opts ...ReadOption) ([]Entry[I, T], error) {
	stored, err := s.store.Read(ctx, s.name, opts...)
	if err != nil {
		return nil, err
	}
	entries := make([]Entry[I, T], len(stored))
	for i, se := range stored {
		payload := se.Payload
		// Version 0 is treated as "unversioned" and skips upcasting; only
		// entries stamped with an older positive version are migrated.
		if se.SchemaVersion > 0 && se.SchemaVersion < s.schemaVersion {
			payload, err = upcastChain(ctx, payload, se.SchemaVersion, s.schemaVersion, s.upcasters)
			if err != nil {
				return nil, fmt.Errorf("entry %v: %w", se.ID, err)
			}
		}
		var decoded T
		if err := s.codec.Decode(payload, &decoded); err != nil {
			// Identify the failing entry, consistent with the upcast error
			// above; errors.Is(err, ErrDecode) still matches.
			return nil, fmt.Errorf("entry %v: %w: %v", se.ID, ErrDecode, err)
		}
		entries[i] = Entry[I, T]{
			ID:            se.ID,
			Stream:        se.Stream,
			Payload:       decoded,
			OrderKey:      se.OrderKey,
			DedupKey:      se.DedupKey,
			SchemaVersion: se.SchemaVersion,
			Metadata:      se.Metadata,
			Tags:          se.Tags,
			Annotations:   se.Annotations,
			CreatedAt:     se.CreatedAt,
			UpdatedAt:     se.UpdatedAt,
		}
	}
	return entries, nil
}
// SetTags replaces all tags on an entry in this stream.
// The full tag set is overwritten, not merged; the stream handle only
// scopes the call to its name — all work is delegated to the store.
func (s Stream[I, T]) SetTags(ctx context.Context, id I, tags []string) error {
	return s.store.SetTags(ctx, s.name, id, tags)
}
// SetAnnotations merges annotations into an entry in this stream.
// Values are pointers, presumably so a nil value can delete a key while a
// non-nil value sets it — NOTE(review): confirm against the Store contract.
func (s Stream[I, T]) SetAnnotations(ctx context.Context, id I, annotations map[string]*string) error {
	return s.store.SetAnnotations(ctx, s.name, id, annotations)
}