forked from benbjohnson/litestream
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathstore.go
More file actions
256 lines (214 loc) · 6.86 KB
/
store.go
File metadata and controls
256 lines (214 loc) · 6.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
package litestream
import (
"context"
"errors"
"fmt"
"log/slog"
"slices"
"sync"
"time"
"github.com/superfly/ltx"
)
var (
// ErrNoCompaction is returned when no new files are available from the previous level.
ErrNoCompaction = errors.New("no compaction")
// ErrCompactionTooEarly is returned when a compaction is attempted too soon
// since the last compaction time. This is used to prevent frequent
// re-compaction when restarting the process.
ErrCompactionTooEarly = errors.New("compaction too early")
// ErrTxNotAvailable is returned when a transaction does not exist.
ErrTxNotAvailable = errors.New("transaction not available")
)
// Store defaults
const (
DefaultSnapshotInterval = 24 * time.Hour
DefaultSnapshotRetention = 24 * time.Hour
DefaultRetention = 24 * time.Hour
DefaultRetentionCheckInterval = 1 * time.Hour
)
// Store represents the top-level container for databases.
//
// It manages async background tasks like compactions so that the system
// is not overloaded by too many concurrent tasks.
type Store struct {
mu sync.Mutex
dbs []*DB
levels CompactionLevels
wg sync.WaitGroup
ctx context.Context
cancel func()
// The frequency of snapshots.
SnapshotInterval time.Duration
// The duration of time that snapshots are kept before being deleted.
SnapshotRetention time.Duration
// If true, compaction is run in the background according to compaction levels.
CompactionMonitorEnabled bool
}
func NewStore(dbs []*DB, levels CompactionLevels) *Store {
s := &Store{
dbs: dbs,
levels: levels,
SnapshotInterval: DefaultSnapshotInterval,
SnapshotRetention: DefaultSnapshotRetention,
CompactionMonitorEnabled: true,
}
s.ctx, s.cancel = context.WithCancel(context.Background())
return s
}
func (s *Store) Open(ctx context.Context) error {
if err := s.levels.Validate(); err != nil {
return err
}
for _, db := range s.dbs {
if err := db.Open(); err != nil {
return err
}
}
// Start monitors for compactions & snapshots.
if s.CompactionMonitorEnabled {
// Start compaction monitors for all levels except L0.
for _, lvl := range s.levels {
lvl := lvl
if lvl.Level == 0 {
continue
}
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.monitorCompactionLevel(s.ctx, lvl)
}()
}
// Start snapshot monitor for snapshots.
s.wg.Add(1)
go func() {
defer s.wg.Done()
s.monitorCompactionLevel(s.ctx, s.SnapshotLevel())
}()
}
return nil
}
func (s *Store) Close(ctx context.Context) (err error) {
for _, db := range s.dbs {
if e := db.Close(ctx); e != nil && err == nil {
err = e
}
}
// Cancel and wait for background tasks to complete.
s.cancel()
s.wg.Wait()
return err
}
func (s *Store) DBs() []*DB {
s.mu.Lock()
defer s.mu.Unlock()
return slices.Clone(s.dbs)
}
// SnapshotLevel returns a pseudo compaction level based on snapshot settings.
func (s *Store) SnapshotLevel() *CompactionLevel {
return &CompactionLevel{
Level: SnapshotLevel,
Interval: s.SnapshotInterval,
}
}
func (s *Store) monitorCompactionLevel(ctx context.Context, lvl *CompactionLevel) {
slog.Info("starting compaction monitor", "level", lvl.Level, "interval", lvl.Interval)
// Start first compaction immediately to check for any missed compactions from shutdown
timer := time.NewTimer(time.Nanosecond)
LOOP:
for {
select {
case <-ctx.Done():
timer.Stop()
break LOOP
case <-timer.C:
// Reset timer before we start compactions so we don't delay it
// from long compactions.
timer = time.NewTimer(time.Until(lvl.NextCompactionAt(time.Now())))
for _, db := range s.DBs() {
// First attempt to compact the database.
if _, err := s.CompactDB(ctx, db, lvl); errors.Is(err, ErrNoCompaction) {
slog.Debug("no compaction", "level", lvl.Level, "path", db.Path())
continue
} else if errors.Is(err, ErrCompactionTooEarly) {
slog.Debug("recently compacted, skipping", "level", lvl.Level, "path", db.Path())
continue
} else if err != nil {
slog.Error("compaction failed", "level", lvl.Level, "error", err)
time.Sleep(1 * time.Second) // wait so we don't rack up S3 charges
}
// Each time we snapshot, clean up everything before the oldest snapshot.
if lvl.Level == SnapshotLevel {
if err := s.EnforceSnapshotRetention(ctx, db); err != nil {
slog.Error("retention enforcement failed", "error", err)
time.Sleep(1 * time.Second) // wait so we don't rack up S3 charges
}
}
}
}
}
}
// CompactDB performs a compaction or snapshot for a given database on a single destination level.
// This function will only proceed if a compaction has not occurred before the last compaction time.
func (s *Store) CompactDB(ctx context.Context, db *DB, lvl *CompactionLevel) (*ltx.FileInfo, error) {
dstLevel := lvl.Level
// Ensure we are not re-compacting before the most recent compaction time.
prevCompactionAt := lvl.PrevCompactionAt(time.Now())
dstInfo, err := db.MaxLTXFileInfo(ctx, dstLevel)
if err != nil {
return nil, fmt.Errorf("fetch dst level info: %w", err)
} else if dstInfo.CreatedAt.After(prevCompactionAt) {
return nil, ErrCompactionTooEarly
}
// Shortcut if this is a snapshot since we are not pulling from a previous level.
if dstLevel == SnapshotLevel {
info, err := db.Snapshot(ctx)
if err != nil {
return info, err
}
slog.InfoContext(ctx, "snapshot complete", "txid", info.MaxTXID.String(), "size", info.Size)
return info, nil
}
// Fetch latest LTX files for both the source & destination so we can see if we need to make progress.
srcLevel := s.levels.PrevLevel(dstLevel)
srcInfo, err := db.MaxLTXFileInfo(ctx, srcLevel)
if err != nil {
return nil, fmt.Errorf("fetch src level info: %w", err)
}
// Skip if there are no new files to compact.
if srcInfo.MaxTXID <= dstInfo.MinTXID {
return nil, ErrNoCompaction
}
info, err := db.Compact(ctx, dstLevel)
if err != nil {
return info, err
}
slog.InfoContext(ctx, "compaction complete",
"level", dstLevel,
slog.Group("txid",
"min", info.MinTXID.String(),
"max", info.MaxTXID.String(),
),
"size", info.Size,
)
return info, nil
}
// EnforceSnapshotRetention removes old snapshots by timestamp and then
// cleans up all lower levels based on minimum snapshot TXID.
func (s *Store) EnforceSnapshotRetention(ctx context.Context, db *DB) error {
// Enforce retention for the snapshot level.
minSnapshotTXID, err := db.EnforceSnapshotRetention(ctx, time.Now().Add(-s.SnapshotRetention))
if err != nil {
return fmt.Errorf("enforce snapshot retention: %w", err)
}
// We should also enforce retention for L0 on the same schedule as L1.
for _, lvl := range s.levels {
// Skip L0 since it is enforced on a more frequent basis.
if lvl.Level == 0 {
continue
}
if err := db.EnforceRetentionByTXID(ctx, lvl.Level, minSnapshotTXID); err != nil {
return fmt.Errorf("enforce L%d retention: %w", lvl.Level, err)
}
}
return nil
}