diff --git a/pkg/vm/engine/tae/db/checkpoint/entry.go b/pkg/vm/engine/tae/db/checkpoint/entry.go index 43608dc82198d..fc4051f53521c 100644 --- a/pkg/vm/engine/tae/db/checkpoint/entry.go +++ b/pkg/vm/engine/tae/db/checkpoint/entry.go @@ -60,6 +60,11 @@ func NewCheckpointEntry(sid string, start, end types.TS, typ EntryType) *Checkpo } } +// e.start >= o.end +func (e *CheckpointEntry) AllGE(o *CheckpointEntry) bool { + return e.start.GE(&o.end) +} + func (e *CheckpointEntry) SetVersion(version uint32) { e.Lock() defer e.Unlock() @@ -111,8 +116,8 @@ func (e *CheckpointEntry) HasOverlap(from, to types.TS) bool { } return true } -func (e *CheckpointEntry) LessEq(ts types.TS) bool { - return e.end.LE(&ts) +func (e *CheckpointEntry) LessEq(ts *types.TS) bool { + return e.end.LE(ts) } func (e *CheckpointEntry) SetLocation(cn, tn objectio.Location) { e.Lock() diff --git a/pkg/vm/engine/tae/db/checkpoint/info.go b/pkg/vm/engine/tae/db/checkpoint/info.go index 33ad60bcc792c..646c1ecf38892 100644 --- a/pkg/vm/engine/tae/db/checkpoint/info.go +++ b/pkg/vm/engine/tae/db/checkpoint/info.go @@ -16,33 +16,43 @@ package checkpoint import ( "context" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/wal" "time" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" + "github.com/matrixorigin/matrixone/pkg/container/types" - "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" ) +type RunnerWriter interface { + RemoveCheckpointMetaFile(string) + AddCheckpointMetaFile(string) + UpdateCompacted(entry *CheckpointEntry) +} + type RunnerReader interface { + String() string GetAllIncrementalCheckpoints() []*CheckpointEntry GetAllGlobalCheckpoints() []*CheckpointEntry GetPenddingIncrementalCount() int GetGlobalCheckpointCount() int CollectCheckpointsInRange(ctx context.Context, start, end types.TS) (ckpLoc string, lastEnd types.TS, err error) ICKPSeekLT(ts types.TS, cnt int) []*CheckpointEntry - MaxGlobalCheckpoint() *CheckpointEntry GetLowWaterMark() types.TS MaxLSN() uint64 GetCatalog() *catalog.Catalog GetCheckpointMetaFiles() map[string]struct{} - RemoveCheckpointMetaFile(string) - AddCheckpointMetaFile(string) ICKPRange(start, end *types.TS, cnt int) []*CheckpointEntry GetCompacted() *CheckpointEntry - UpdateCompacted(entry *CheckpointEntry) - GetDriver() wal.Driver + + // for test, delete in next phase + GetAllCheckpoints() []*CheckpointEntry + GetAllCheckpointsForBackup(compact *CheckpointEntry) []*CheckpointEntry + + MaxGlobalCheckpoint() *CheckpointEntry + MaxIncrementalCheckpoint() *CheckpointEntry + GetDirtyCollector() logtail.Collector } func (r *runner) collectCheckpointMetadata(start, end types.TS, ckpLSN, truncateLSN uint64) *containers.Batch { @@ -80,38 +90,28 @@ func (r *runner) collectCheckpointMetadata(start, end types.TS, ckpLSN, truncate return bat } func (r *runner) GetAllIncrementalCheckpoints() []*CheckpointEntry { - r.storage.Lock() - snapshot := r.storage.incrementals.Copy() - r.storage.Unlock() - return snapshot.Items() + return r.store.GetAllIncrementalCheckpoints() } func (r *runner) GetAllGlobalCheckpoints() []*CheckpointEntry { - r.storage.Lock() - snapshot := r.storage.globals.Copy() - r.storage.Unlock() - return snapshot.Items() + return r.store.GetAllGlobalCheckpoints() } + func (r *runner) MaxLSN() uint64 { endTs := types.BuildTS(time.Now().UTC().UnixNano(), 0) return r.source.GetMaxLSN(types.TS{}, endTs) } + func (r *runner) MaxLSNInRange(end types.TS) uint64 { return r.source.GetMaxLSN(types.TS{}, end) } func (r *runner) MaxGlobalCheckpoint() *CheckpointEntry { - r.storage.RLock() - defer r.storage.RUnlock() - global, _ := r.storage.globals.Max() - return global + return r.store.MaxGlobalCheckpoint() } func (r *runner) MaxIncrementalCheckpoint() *CheckpointEntry { - r.storage.RLock() - defer r.storage.RUnlock() - entry, _ := r.storage.incrementals.Max() - return entry + return r.store.MaxIncrementalCheckpoint() } func (r *runner) GetCatalog() *catalog.Catalog { @@ -119,262 +119,52 @@ func (r *runner) GetCatalog() *catalog.Catalog { } func (r *runner) ICKPSeekLT(ts types.TS, cnt int) []*CheckpointEntry { - r.storage.Lock() - tree := r.storage.incrementals.Copy() - r.storage.Unlock() - it := tree.Iter() - ok := it.Seek(NewCheckpointEntry(r.rt.SID(), ts, ts, ET_Incremental)) - incrementals := make([]*CheckpointEntry, 0) - if ok { - for len(incrementals) < cnt { - e := it.Item() - if !e.IsFinished() { - break - } - if e.start.LT(&ts) { - if !it.Next() { - break - } - continue - } - incrementals = append(incrementals, e) - if !it.Next() { - break - } - } - } - return incrementals + return r.store.ICKPSeekLT(ts, cnt) } func (r *runner) ICKPRange(start, end *types.TS, cnt int) []*CheckpointEntry { - r.storage.Lock() - tree := r.storage.incrementals.Copy() - r.storage.Unlock() - it := tree.Iter() - ok := it.Seek(NewCheckpointEntry(r.rt.SID(), *start, *start, ET_Incremental)) - incrementals := make([]*CheckpointEntry, 0) - if ok { - for len(incrementals) < cnt { - e := it.Item() - if !e.IsFinished() { - break - } - if e.start.GE(start) && e.start.LT(end) { - incrementals = append(incrementals, e) - } - if !it.Next() { - break - } - } - } - return incrementals + return r.store.ICKPRange(start, end, cnt) } func (r *runner) GetCompacted() *CheckpointEntry { - r.storage.Lock() - defer r.storage.Unlock() - return r.storage.compacted.Load() + return r.store.GetCompacted() } func (r *runner) UpdateCompacted(entry *CheckpointEntry) { - r.storage.Lock() - defer r.storage.Unlock() - r.storage.compacted.Store(entry) + r.store.UpdateCompacted(entry) } // this API returns the min ts of all checkpoints // the start of global checkpoint is always 0 func (r *runner) GetLowWaterMark() types.TS { - r.storage.RLock() - defer r.storage.RUnlock() - global, okG := r.storage.globals.Min() - incremental, okI := r.storage.incrementals.Min() - if !okG && !okI { - return types.TS{} - } - if !okG { - return incremental.start - } - if !okI { - return global.start - } - if global.end.LT(&incremental.start) { - return global.end - } - return incremental.start + return r.store.GetLowWaterMark() } func (r *runner) GetPenddingIncrementalCount() int { - entries := r.GetAllIncrementalCheckpoints() - global := r.MaxGlobalCheckpoint() - - count := 0 - for i := len(entries) - 1; i >= 0; i-- { - if global != nil && entries[i].end.LE(&global.end) { - break - } - if !entries[i].IsFinished() { - continue - } - count++ - } - return count + return r.store.GetPenddingIncrementalCount() } func (r *runner) GetGlobalCheckpointCount() int { - r.storage.RLock() - defer r.storage.RUnlock() - return r.storage.globals.Len() -} - -func (r *runner) getLastFinishedGlobalCheckpointLocked() *CheckpointEntry { - g, ok := r.storage.globals.Max() - if !ok { - return nil - } - if g.IsFinished() { - return g - } - it := r.storage.globals.Iter() - it.Seek(g) - defer it.Release() - if !it.Prev() { - return nil - } - return it.Item() + return r.store.GetGlobalCheckpointCount() } func (r *runner) GetAllCheckpoints() []*CheckpointEntry { - ckps := make([]*CheckpointEntry, 0) - var ts types.TS - r.storage.Lock() - g := r.getLastFinishedGlobalCheckpointLocked() - tree := r.storage.incrementals.Copy() - r.storage.Unlock() - if g != nil { - ts = g.GetEnd() - ckps = append(ckps, g) - } - pivot := NewCheckpointEntry(r.rt.SID(), ts.Next(), ts.Next(), ET_Incremental) - iter := tree.Iter() - defer iter.Release() - if ok := iter.Seek(pivot); ok { - for { - e := iter.Item() - if !e.IsFinished() { - break - } - ckps = append(ckps, e) - if !iter.Next() { - break - } - } - } - return ckps + return r.store.GetAllCheckpoints() } func (r *runner) GetAllCheckpointsForBackup(compact *CheckpointEntry) []*CheckpointEntry { - ckps := make([]*CheckpointEntry, 0) - var ts types.TS - if compact != nil { - ts = compact.GetEnd() - ckps = append(ckps, compact) - } - r.storage.Lock() - g := r.getLastFinishedGlobalCheckpointLocked() - tree := r.storage.incrementals.Copy() - r.storage.Unlock() - if g != nil { - if ts.IsEmpty() { - ts = g.GetEnd() - } - ckps = append(ckps, g) - } - pivot := NewCheckpointEntry(r.rt.SID(), ts.Next(), ts.Next(), ET_Incremental) - iter := tree.Iter() - defer iter.Release() - if ok := iter.Seek(pivot); ok { - for { - e := iter.Item() - if !e.IsFinished() { - break - } - ckps = append(ckps, e) - if !iter.Next() { - break - } - } - } - return ckps - + return r.store.GetAllCheckpointsForBackup(compact) } func (r *runner) GCByTS(ctx context.Context, ts types.TS) error { - prev := r.gcTS.Load() - if prev == nil { - r.gcTS.Store(ts) - } else { - prevTS := prev.(types.TS) - if prevTS.LT(&ts) { - r.gcTS.Store(ts) - } - } - logutil.Debugf("GC %v", ts.ToString()) - r.gcCheckpointQueue.Enqueue(struct{}{}) - return nil -} - -func (r *runner) getGCTS() types.TS { - prev := r.gcTS.Load() - if prev == nil { - return types.TS{} - } - return prev.(types.TS) -} - -func (r *runner) getGCedTS() types.TS { - r.storage.RLock() - minGlobal, _ := r.storage.globals.Min() - minIncremental, _ := r.storage.incrementals.Min() - r.storage.RUnlock() - if minGlobal == nil { - return types.TS{} - } - if minIncremental == nil { - return minGlobal.end - } - if minIncremental.start.GE(&minGlobal.end) { - return minGlobal.end - } - return minIncremental.start -} - -func (r *runner) getTSToGC() types.TS { - maxGlobal := r.MaxGlobalCheckpoint() - if maxGlobal == nil { - return types.TS{} - } - if maxGlobal.IsFinished() { - return maxGlobal.end.Prev() - } - globals := r.GetAllGlobalCheckpoints() - if len(globals) == 1 { - return types.TS{} + if _, updated := r.store.UpdateGCIntent(&ts); !updated { + // TODO + return nil } - maxGlobal = globals[len(globals)-1] - return maxGlobal.end.Prev() + _, err := r.gcCheckpointQueue.Enqueue(struct{}{}) + return err } -func (r *runner) ExistPendingEntryToGC() bool { - _, needGC := r.getTSTOGC() - return needGC -} - -func (r *runner) IsTSStale(ts types.TS) bool { - gcts := r.getGCTS() - if gcts.IsEmpty() { - return false - } - minValidTS := gcts.Physical() - r.options.globalVersionInterval.Nanoseconds() - return ts.Physical() < minValidTS +func (r *runner) GCNeeded() bool { + return r.store.GCNeeded() } diff --git a/pkg/vm/engine/tae/db/checkpoint/replay.go b/pkg/vm/engine/tae/db/checkpoint/replay.go index 2a31257ff88a8..2b25566f20cba 100644 --- a/pkg/vm/engine/tae/db/checkpoint/replay.go +++ b/pkg/vm/engine/tae/db/checkpoint/replay.go @@ -163,7 +163,7 @@ func (c *CkpReplayer) ReadCkpFiles() (err error) { } panic("invalid compacted checkpoint file") } - r.tryAddNewCompactedCheckpointEntry(entry[0]) + r.store.TryAddNewCompactedCheckpointEntry(entry[0]) closeCB() } } @@ -251,11 +251,11 @@ func (c *CkpReplayer) ReadCkpFiles() (err error) { } if checkpointEntry.GetType() == ET_Global { c.globalCkpIdx = i - r.tryAddNewGlobalCheckpointEntry(checkpointEntry) + r.store.TryAddNewGlobalCheckpointEntry(checkpointEntry) } else if checkpointEntry.GetType() == ET_Incremental { - r.tryAddNewIncrementalCheckpointEntry(checkpointEntry) + r.store.TryAddNewIncrementalCheckpointEntry(checkpointEntry) } else if checkpointEntry.GetType() == ET_Backup { - r.tryAddNewBackupCheckpointEntry(checkpointEntry) + r.store.TryAddNewBackupCheckpointEntry(checkpointEntry) } } return nil diff --git a/pkg/vm/engine/tae/db/checkpoint/runner.go b/pkg/vm/engine/tae/db/checkpoint/runner.go index e7754cd6f3772..28fdb72db28d2 100644 --- a/pkg/vm/engine/tae/db/checkpoint/runner.go +++ b/pkg/vm/engine/tae/db/checkpoint/runner.go @@ -18,8 +18,6 @@ import ( "bytes" "context" "fmt" - "strconv" - "strings" "sync" "sync/atomic" "time" @@ -34,7 +32,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/dbutils" - "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/objectio" @@ -43,7 +40,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/sm" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/wal" - "github.com/tidwall/btree" ) type timeBasedPolicy struct { @@ -194,14 +190,7 @@ type runner struct { disabled atomic.Bool // memory storage of the checkpoint entries - storage struct { - sync.RWMutex - incrementals *btree.BTreeG[*CheckpointEntry] - globals *btree.BTreeG[*CheckpointEntry] - compacted atomic.Pointer[CheckpointEntry] - } - - gcTS atomic.Value + store *runnerStore // checkpoint policy incrementalPolicy *timeBasedPolicy @@ -227,7 +216,8 @@ func NewRunner( catalog *catalog.Catalog, source logtail.Collector, wal wal.Driver, - opts ...Option) *runner { + opts ...Option, +) *runner { r := &runner{ ctx: ctx, rt: rt, @@ -236,21 +226,13 @@ func NewRunner( observers: new(observers), wal: wal, } - r.storage.incrementals = btree.NewBTreeGOptions(func(a, b *CheckpointEntry) bool { - return a.end.LT(&b.end) - }, btree.Options{ - NoLocks: true, - }) - r.storage.globals = btree.NewBTreeGOptions(func(a, b *CheckpointEntry) bool { - return a.end.LT(&b.end) - }, btree.Options{ - NoLocks: true, - }) for _, opt := range opts { opt(r) } r.fillDefaults() + r.store = newRunnerStore(r.rt.SID(), r.options.globalVersionInterval) + r.incrementalPolicy = &timeBasedPolicy{interval: r.options.minIncrementalInterval} r.globalPolicy = &countBasedPolicy{minCount: r.options.globalMinCount} r.incrementalCheckpointQueue = sm.NewSafeQueue(r.options.checkpointQueueSize, 100, r.onIncrementalCheckpointEntries) @@ -284,10 +266,6 @@ func (r *runner) AddCheckpointMetaFile(name string) { r.checkpointMetaFiles.files[name] = struct{}{} } -func (r *runner) GetDriver() wal.Driver { - return r.wal -} - func (r *runner) RemoveCheckpointMetaFile(name string) { r.checkpointMetaFiles.Lock() defer r.checkpointMetaFiles.Unlock() @@ -338,47 +316,7 @@ func (r *runner) onGlobalCheckpointEntries(items ...any) { } func (r *runner) onGCCheckpointEntries(items ...any) { - gcTS, needGC := r.getTSTOGC() - if !needGC { - return - } - r.gcCheckpointEntries(gcTS) -} - -func (r *runner) getTSTOGC() (ts types.TS, needGC bool) { - ts = r.getGCTS() - if ts.IsEmpty() { - return - } - tsTOGC := r.getTSToGC() - if tsTOGC.LT(&ts) { - ts = tsTOGC - } - gcedTS := r.getGCedTS() - if gcedTS.GE(&ts) { - return - } - needGC = true - return -} - -// PXU TODO: delete in the loop -func (r *runner) gcCheckpointEntries(ts types.TS) { - if ts.IsEmpty() { - return - } - incrementals := r.GetAllIncrementalCheckpoints() - for _, incremental := range incrementals { - if incremental.LessEq(ts) { - r.DeleteIncrementalEntry(incremental) - } - } - globals := r.GetAllGlobalCheckpoints() - for _, global := range globals { - if global.LessEq(ts) { - r.DeleteGlobalEntry(global) - } - } + r.store.TryGC() } func (r *runner) onIncrementalCheckpointEntries(items ...any) { @@ -476,23 +414,6 @@ func (r *runner) onIncrementalCheckpointEntries(items ...any) { }) } -func (r *runner) DeleteIncrementalEntry(entry *CheckpointEntry) { - r.storage.Lock() - defer r.storage.Unlock() - r.storage.incrementals.Delete(entry) - perfcounter.Update(r.ctx, func(counter *perfcounter.CounterSet) { - counter.TAE.CheckPoint.DeleteIncrementalEntry.Add(1) - }) -} -func (r *runner) DeleteGlobalEntry(entry *CheckpointEntry) { - r.storage.Lock() - defer r.storage.Unlock() - r.storage.globals.Delete(entry) - perfcounter.Update(r.ctx, func(counter *perfcounter.CounterSet) { - counter.TAE.CheckPoint.DeleteGlobalEntry.Add(1) - }) -} - func (r *runner) saveCheckpoint(start, end types.TS, ckpLSN, truncateLSN uint64) (name string, err error) { bat := r.collectCheckpointMetadata(start, end, ckpLSN, truncateLSN) defer bat.Close() @@ -613,7 +534,7 @@ func (r *runner) doGlobalCheckpoint( entry.SetLocation(cnLocation, tnLocation) files = append(files, cnLocation.Name().String()) - r.tryAddNewGlobalCheckpointEntry(entry) + r.store.TryAddNewGlobalCheckpointEntry(entry) entry.SetState(ST_Finished) var name string if name, err = r.saveCheckpoint(entry.start, entry.end, 0, 0); err != nil { @@ -650,82 +571,6 @@ func (r *runner) onPostCheckpointEntries(entries ...any) { } } -func (r *runner) tryAddNewGlobalCheckpointEntry(entry *CheckpointEntry) (success bool) { - r.storage.Lock() - defer r.storage.Unlock() - r.storage.globals.Set(entry) - return true -} - -func (r *runner) tryAddNewCompactedCheckpointEntry(entry *CheckpointEntry) (success bool) { - if entry.entryType != ET_Compacted { - panic("tryAddNewCompactedCheckpointEntry entry type is error") - } - r.storage.Lock() - defer r.storage.Unlock() - old := r.storage.compacted.Load() - if old != nil { - end := old.end - if entry.end.LT(&end) { - return true - } - } - r.storage.compacted.Store(entry) - return true -} - -func (r *runner) tryAddNewIncrementalCheckpointEntry(entry *CheckpointEntry) (success bool) { - r.storage.Lock() - defer r.storage.Unlock() - maxEntry, _ := r.storage.incrementals.Max() - - // if it's the first entry, add it - if maxEntry == nil { - r.storage.incrementals.Set(entry) - success = true - return - } - - // if it is not the right candidate, skip this request - // [startTs, endTs] --> [endTs+1, ?] - endTS := maxEntry.GetEnd() - startTS := entry.GetStart() - nextTS := endTS.Next() - if !nextTS.Equal(&startTS) { - success = false - return - } - - // if the max entry is not finished, skip this request - if !maxEntry.IsFinished() { - success = false - return - } - - r.storage.incrementals.Set(entry) - - success = true - return -} - -// Since there is no wal after recovery, the checkpoint lsn before backup must be set to 0. -func (r *runner) tryAddNewBackupCheckpointEntry(entry *CheckpointEntry) (success bool) { - entry.entryType = ET_Incremental - success = r.tryAddNewIncrementalCheckpointEntry(entry) - if !success { - return - } - r.storage.Lock() - defer r.storage.Unlock() - it := r.storage.incrementals.Iter() - for it.Next() { - e := it.Item() - e.ckpLSN = 0 - e.truncateLSN = 0 - } - return -} - func (r *runner) tryScheduleIncrementalCheckpoint(start, end types.TS) { // ts := types.BuildTS(time.Now().UTC().UnixNano(), 0) _, count := r.source.ScanInRange(start, end) @@ -733,7 +578,7 @@ func (r *runner) tryScheduleIncrementalCheckpoint(start, end types.TS) { return } entry := NewCheckpointEntry(r.rt.SID(), start, end, ET_Incremental) - r.tryAddNewIncrementalCheckpointEntry(entry) + r.store.TryAddNewIncrementalCheckpointEntry(entry) } func (r *runner) TryScheduleCheckpoint(endts types.TS) { @@ -842,129 +687,5 @@ func (r *runner) GetDirtyCollector() logtail.Collector { func (r *runner) CollectCheckpointsInRange( ctx context.Context, start, end types.TS, ) (locations string, checkpointed types.TS, err error) { - if r.IsTSStale(end) { - return "", types.TS{}, moerr.NewInternalErrorf(ctx, "ts %v is staled", end.ToString()) - } - r.storage.Lock() - tree := r.storage.incrementals.Copy() - global, _ := r.storage.globals.Max() - r.storage.Unlock() - locs := make([]string, 0) - ckpStart := types.MaxTs() - newStart := start - if global != nil && global.HasOverlap(start, end) { - locs = append(locs, global.GetLocation().String()) - locs = append(locs, strconv.Itoa(int(global.version))) - newStart = global.end.Next() - ckpStart = global.GetEnd() - checkpointed = global.GetEnd() - } - pivot := NewCheckpointEntry(r.rt.SID(), newStart, newStart, ET_Incremental) - - // For debug - // checkpoints := make([]*CheckpointEntry, 0) - // defer func() { - // items := tree.Items() - // logutil.Infof("CollectCheckpointsInRange: Pivot: %s", pivot.String()) - // for i, item := range items { - // logutil.Infof("CollectCheckpointsInRange: Source[%d]: %s", i, item.String()) - // } - // for i, ckp := range checkpoints { - // logutil.Infof("CollectCheckpointsInRange: Found[%d]:%s", i, ckp.String()) - // } - // logutil.Infof("CollectCheckpointsInRange: Checkpointed=%s", checkpointed.ToString()) - // }() - - iter := tree.Iter() - defer iter.Release() - - if ok := iter.Seek(pivot); ok { - if ok = iter.Prev(); ok { - e := iter.Item() - if !e.IsCommitted() { - if len(locs) == 0 { - return - } - duration := fmt.Sprintf("[%s_%s]", - ckpStart.ToString(), - ckpStart.ToString()) - locs = append(locs, duration) - locations = strings.Join(locs, ";") - return - } - if e.HasOverlap(newStart, end) { - locs = append(locs, e.GetLocation().String()) - locs = append(locs, strconv.Itoa(int(e.version))) - start := e.GetStart() - if start.LT(&ckpStart) { - ckpStart = start - } - checkpointed = e.GetEnd() - // checkpoints = append(checkpoints, e) - } - iter.Next() - } - for { - e := iter.Item() - if !e.IsCommitted() || !e.HasOverlap(newStart, end) { - break - } - locs = append(locs, e.GetLocation().String()) - locs = append(locs, strconv.Itoa(int(e.version))) - start := e.GetStart() - if start.LT(&ckpStart) { - ckpStart = start - } - checkpointed = e.GetEnd() - // checkpoints = append(checkpoints, e) - if ok = iter.Next(); !ok { - break - } - } - } else { - // if it is empty, quick quit - if ok = iter.Last(); !ok { - if len(locs) == 0 { - return - } - duration := fmt.Sprintf("[%s_%s]", - ckpStart.ToString(), - ckpStart.ToString()) - locs = append(locs, duration) - locations = strings.Join(locs, ";") - return - } - // get last entry - e := iter.Item() - // if it is committed and visible, quick quit - if !e.IsCommitted() || !e.HasOverlap(newStart, end) { - if len(locs) == 0 { - return - } - duration := fmt.Sprintf("[%s_%s]", - ckpStart.ToString(), - ckpStart.ToString()) - locs = append(locs, duration) - locations = strings.Join(locs, ";") - return - } - locs = append(locs, e.GetLocation().String()) - locs = append(locs, strconv.Itoa(int(e.version))) - start := e.GetStart() - if start.LT(&ckpStart) { - ckpStart = start - } - checkpointed = e.GetEnd() - // checkpoints = append(checkpoints, e) - } - - if len(locs) == 0 { - return - } - duration := fmt.Sprintf("[%s_%s]", - ckpStart.ToString(), - checkpointed.ToString()) - locs = append(locs, duration) - locations = strings.Join(locs, ";") - return + return r.store.CollectCheckpointsInRange(ctx, start, end) } diff --git a/pkg/vm/engine/tae/db/checkpoint/runner_test.go b/pkg/vm/engine/tae/db/checkpoint/runner_test.go index ffafa4deb89db..a58097163dcaf 100644 --- a/pkg/vm/engine/tae/db/checkpoint/runner_test.go +++ b/pkg/vm/engine/tae/db/checkpoint/runner_test.go @@ -33,7 +33,7 @@ func TestCkpCheck(t *testing.T) { r := NewRunner(context.Background(), nil, nil, nil, nil) for i := 0; i < 100; i += 10 { - r.storage.incrementals.Set(&CheckpointEntry{ + r.store.incrementals.Set(&CheckpointEntry{ start: types.BuildTS(int64(i), 0), end: types.BuildTS(int64(i+9), 0), state: ST_Finished, @@ -42,7 +42,7 @@ func TestCkpCheck(t *testing.T) { }) } - r.storage.incrementals.Set(&CheckpointEntry{ + r.store.incrementals.Set(&CheckpointEntry{ start: types.BuildTS(int64(100), 0), end: types.BuildTS(int64(109), 0), state: ST_Running, @@ -90,7 +90,7 @@ func TestGetCheckpoints1(t *testing.T) { if i == 4 { entry.state = ST_Pending } - r.storage.incrementals.Set(entry) + r.store.incrementals.Set(entry) } ctx := context.Background() @@ -170,7 +170,7 @@ func TestGetCheckpoints2(t *testing.T) { cnLocation: objectio.Location(fmt.Sprintf("global%d", i)), version: 100, } - r.storage.globals.Set(entry) + r.store.globals.Set(entry) } start := timestamps[i].Next() if addGlobal { @@ -186,7 +186,7 @@ func TestGetCheckpoints2(t *testing.T) { if i == 4 { entry.state = ST_Pending } - r.storage.incrementals.Set(entry) + r.store.incrementals.Set(entry) } ctx := context.Background() @@ -271,7 +271,7 @@ func TestICKPSeekLT(t *testing.T) { if i == 4 { entry.state = ST_Pending } - r.storage.incrementals.Set(entry) + r.store.incrementals.Set(entry) } // 0, 1 diff --git a/pkg/vm/engine/tae/db/checkpoint/store.go b/pkg/vm/engine/tae/db/checkpoint/store.go new file mode 100644 index 0000000000000..d02b2bf9f3780 --- /dev/null +++ b/pkg/vm/engine/tae/db/checkpoint/store.go @@ -0,0 +1,679 @@ +// Copyright 2021 Matrix Origin +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package checkpoint + +import ( + "context" + "fmt" + "strconv" + "strings" + "sync" + "sync/atomic" + "time" + + "github.com/matrixorigin/matrixone/pkg/common/moerr" + "github.com/matrixorigin/matrixone/pkg/container/types" + "github.com/matrixorigin/matrixone/pkg/logutil" + "github.com/tidwall/btree" + "go.uber.org/zap" +) + +func newRunnerStore( + sid string, + globalHistoryDuration time.Duration, +) *runnerStore { + s := new(runnerStore) + s.sid = sid + s.globalHistoryDuration = globalHistoryDuration + s.incrementals = btree.NewBTreeGOptions( + func(a, b *CheckpointEntry) bool { + return a.end.LT(&b.end) + }, btree.Options{ + NoLocks: true, + }, + ) + s.globals = btree.NewBTreeGOptions( + func(a, b *CheckpointEntry) bool { + return a.end.LT(&b.end) + }, btree.Options{ + NoLocks: true, + }, + ) + return s +} + +type runnerStore struct { + sync.RWMutex + sid string + + globalHistoryDuration time.Duration + + incrementals *btree.BTreeG[*CheckpointEntry] + globals *btree.BTreeG[*CheckpointEntry] + compacted atomic.Pointer[CheckpointEntry] + // metaFiles map[string]struct{} + + gcIntent types.TS + gcCount int + gcTime time.Time + gcWatermark atomic.Value +} + +func (s *runnerStore) ExportStatsLocked() []zap.Field { + fields := make([]zap.Field, 0, 8) + fields = append(fields, zap.Int("gc-count", s.gcCount)) + fields = append(fields, zap.Time("gc-time", s.gcTime)) + wm := s.gcWatermark.Load() + if wm != nil { + fields = append(fields, zap.String("gc-watermark", wm.(types.TS).ToString())) + } + fields = append(fields, zap.Int("global-count", s.globals.Len())) + fields = append(fields, zap.Int("incremental-count", s.incrementals.Len())) + return fields +} + +func (s *runnerStore) AddNewIncrementalEntry(entry *CheckpointEntry) { + s.Lock() + defer s.Unlock() + s.incrementals.Set(entry) +} + +func (s *runnerStore) CleanPenddingCheckpoint() { + prev := s.MaxIncrementalCheckpoint() + if prev == nil { + return + } + if !prev.IsFinished() { + s.Lock() + s.incrementals.Delete(prev) + s.Unlock() + } + if prev.IsRunning() { + logutil.Warnf("Delete a running checkpoint entry") + } + prev = s.MaxGlobalCheckpoint() + if prev == nil { + return + } + if !prev.IsFinished() { + s.Lock() + s.incrementals.Delete(prev) + s.Unlock() + } + if prev.IsRunning() { + logutil.Warnf("Delete a running checkpoint entry") + } +} + +func (s *runnerStore) TryAddNewCompactedCheckpointEntry(entry *CheckpointEntry) (success bool) { + if entry.entryType != ET_Compacted { + panic("TryAddNewCompactedCheckpointEntry entry type is error") + } + s.Lock() + defer s.Unlock() + old := s.compacted.Load() + if old != nil { + end := old.end + if entry.end.LT(&end) { + return true + } + } + s.compacted.Store(entry) + return true +} + +func (s *runnerStore) TryAddNewIncrementalCheckpointEntry(entry *CheckpointEntry) (success bool) { + s.Lock() + defer s.Unlock() + maxEntry, _ := s.incrementals.Max() + + // if it's the first entry, add it + if maxEntry == nil { + s.incrementals.Set(entry) + success = true + return + } + + // if it is not the right candidate, skip this request + // [startTs, endTs] --> [endTs+1, ?] + endTS := maxEntry.GetEnd() + startTS := entry.GetStart() + nextTS := endTS.Next() + if !nextTS.Equal(&startTS) { + success = false + return + } + + // if the max entry is not finished, skip this request + if !maxEntry.IsFinished() { + success = false + return + } + + s.incrementals.Set(entry) + + success = true + return +} + +// Since there is no wal after recovery, the checkpoint lsn before backup must be set to 0. +func (s *runnerStore) TryAddNewBackupCheckpointEntry(entry *CheckpointEntry) (success bool) { + entry.entryType = ET_Incremental + success = s.TryAddNewIncrementalCheckpointEntry(entry) + if !success { + return + } + s.Lock() + defer s.Unlock() + it := s.incrementals.Iter() + for it.Next() { + e := it.Item() + e.ckpLSN = 0 + e.truncateLSN = 0 + } + return +} + +func (s *runnerStore) TryAddNewGlobalCheckpointEntry( + entry *CheckpointEntry, +) (success bool) { + s.Lock() + defer s.Unlock() + s.globals.Set(entry) + return true +} + +func (s *runnerStore) GetAllGlobalCheckpoints() []*CheckpointEntry { + s.Lock() + snapshot := s.globals.Copy() + s.Unlock() + return snapshot.Items() +} + +func (s *runnerStore) GetAllCheckpointsForBackup(compact *CheckpointEntry) []*CheckpointEntry { + ckps := make([]*CheckpointEntry, 0) + var ts types.TS + if compact != nil { + ts = compact.GetEnd() + ckps = append(ckps, compact) + } + s.Lock() + g := s.MaxFinishedGlobalCheckpointLocked() + tree := s.incrementals.Copy() + s.Unlock() + if g != nil { + if ts.IsEmpty() { + ts = g.GetEnd() + } + ckps = append(ckps, g) + } + pivot := NewCheckpointEntry(s.sid, ts.Next(), ts.Next(), ET_Incremental) + iter := tree.Iter() + defer iter.Release() + if ok := iter.Seek(pivot); ok { + for { + e := iter.Item() + if !e.IsFinished() { + break + } + ckps = append(ckps, e) + if !iter.Next() { + break + } + } + } + return ckps +} + +func (s *runnerStore) GetAllCheckpoints() []*CheckpointEntry { + ckps := make([]*CheckpointEntry, 0) + var ts types.TS + s.Lock() + g := s.MaxFinishedGlobalCheckpointLocked() + tree := s.incrementals.Copy() + s.Unlock() + if g != nil { + ts = g.GetEnd() + ckps = append(ckps, g) + } + pivot := NewCheckpointEntry(s.sid, ts.Next(), ts.Next(), ET_Incremental) + iter := tree.Iter() + defer iter.Release() + if ok := iter.Seek(pivot); ok { + for { + e := iter.Item() + if !e.IsFinished() { + break + } + ckps = append(ckps, e) + if !iter.Next() { + break + } + } + } + return ckps +} + +func (s *runnerStore) MaxFinishedGlobalCheckpointLocked() *CheckpointEntry { + g, ok := s.globals.Max() + if !ok { + return nil + } + if g.IsFinished() { + return g + } + it := s.globals.Iter() + it.Seek(g) + defer it.Release() + if !it.Prev() { + return nil + } + return it.Item() +} + +func (s *runnerStore) GetPenddingIncrementalCount() int { + entries := s.GetAllIncrementalCheckpoints() + global := s.MaxGlobalCheckpoint() + + count := 0 + for i := len(entries) - 1; i >= 0; i-- { + if global != nil && entries[i].end.LE(&global.end) { + break + } + if !entries[i].IsFinished() { + continue + } + count++ + } + return count +} + +func (s *runnerStore) GetAllIncrementalCheckpoints() []*CheckpointEntry { + s.Lock() + snapshot := s.incrementals.Copy() + s.Unlock() + return snapshot.Items() +} + +func (s *runnerStore) GetGlobalCheckpointCount() int { + s.RLock() + defer s.RUnlock() + return s.globals.Len() +} + +func (s *runnerStore) GetLowWaterMark() types.TS { + s.RLock() + defer s.RUnlock() + global, okG := s.globals.Min() + incremental, okI := s.incrementals.Min() + if !okG && !okI { + return types.TS{} + } + if !okG { + return incremental.start + } + if !okI { + return global.start + } + if global.end.LT(&incremental.start) { + return global.end + } + return incremental.start +} + +func (s *runnerStore) GetCompacted() *CheckpointEntry { + return s.compacted.Load() +} + +func (s *runnerStore) UpdateCompacted(entry *CheckpointEntry) { + s.compacted.Store(entry) +} + +func (s *runnerStore) ICKPRange( + start, end *types.TS, cnt int, +) []*CheckpointEntry { + s.Lock() + tree := s.incrementals.Copy() + s.Unlock() + it := tree.Iter() + ok := it.Seek(NewCheckpointEntry(s.sid, *start, *start, ET_Incremental)) + incrementals := make([]*CheckpointEntry, 0) + if ok { + for len(incrementals) < cnt { + e := it.Item() + if !e.IsFinished() { + break + } + if e.start.GE(start) && e.start.LT(end) { + incrementals = append(incrementals, e) + } + if !it.Next() { + break + } + } + } + return incrementals +} + +func (s *runnerStore) ICKPSeekLT(ts types.TS, cnt int) []*CheckpointEntry { + s.Lock() + tree := s.incrementals.Copy() + s.Unlock() + it := tree.Iter() + ok := it.Seek(NewCheckpointEntry(s.sid, ts, ts, ET_Incremental)) + incrementals := make([]*CheckpointEntry, 0) + if ok { + for len(incrementals) < cnt { + e := it.Item() + if !e.IsFinished() { + break + } + if e.start.LT(&ts) { + if !it.Next() { + break + } + continue + } + incrementals = append(incrementals, e) + if !it.Next() { + break + } + } + } + return incrementals +} + +func (s *runnerStore) MaxGlobalCheckpoint() *CheckpointEntry { + s.RLock() + defer s.RUnlock() + global, _ := s.globals.Max() + return global +} + +func (s *runnerStore) MaxIncrementalCheckpoint() *CheckpointEntry { + s.RLock() + defer s.RUnlock() + entry, _ := s.incrementals.Max() + return entry +} + +func (s *runnerStore) IsStale(ts *types.TS) bool { + waterMark := s.gcWatermark.Load() + if waterMark == nil { + return false + } + wm := waterMark.(types.TS) + minPhysical := wm.Physical() - s.globalHistoryDuration.Nanoseconds() + return ts.Physical() < minPhysical +} + +func (s *runnerStore) TryGC() (gdeleted, ideleted int) { + s.Lock() + defer s.Unlock() + // if there's no intent, no need to GC + if s.gcIntent.IsEmpty() { + return + } + intent := s.gcIntent + + minTS := s.minTSLocked() + // no need to GC if the minTS is larger than the gcIntent + // gcIntent minTS + // ----------+-----------------+---------------------------> + if minTS.GE(&intent) { + return + } + + safeTS := s.getSafeGCTSLocked() + if intent.GT(&safeTS) { + intent = safeTS + } + return s.doGC(&intent) +} + +func (s *runnerStore) UpdateGCIntent(newIntent *types.TS) (oldIntent types.TS, updated bool) { + s.Lock() + defer s.Unlock() + oldIntent = s.gcIntent + if s.gcIntent.LT(newIntent) { + s.gcIntent = *newIntent + updated = true + } + return +} + +func (s *runnerStore) GCNeeded() bool { + s.RLock() + defer s.RUnlock() + // no gc intent, no need to GC + if s.gcIntent.IsEmpty() { + return false + } + intent := s.gcIntent + safeTS := s.getSafeGCTSLocked() + // if the safeTS is less than the intent, use the safeTS as the intent + if safeTS.LT(&intent) { + intent = safeTS + } + minTS := s.minTSLocked() + return minTS.LT(&intent) +} + +func (s *runnerStore) CollectCheckpointsInRange( + ctx context.Context, start, end types.TS, +) (locations string, checkpointed types.TS, err error) { + if s.IsStale(&end) { + return "", types.TS{}, moerr.NewInternalErrorf(ctx, "ts %v is staled", end.ToString()) + } + s.Lock() + tree := s.incrementals.Copy() + global, _ := s.globals.Max() + s.Unlock() + locs := make([]string, 0) + ckpStart := types.MaxTs() + newStart := start + if global != nil && global.HasOverlap(start, end) { + locs = append(locs, global.GetLocation().String()) + locs = append(locs, strconv.Itoa(int(global.version))) + newStart = global.end.Next() + ckpStart = global.GetEnd() + checkpointed = global.GetEnd() + } + pivot := NewCheckpointEntry(s.sid, newStart, newStart, ET_Incremental) + + // For debug + // checkpoints := make([]*CheckpointEntry, 0) + // defer func() { + // items := tree.Items() + // logutil.Infof("CollectCheckpointsInRange: Pivot: %s", pivot.String()) + // for i, item := range items { + // logutil.Infof("CollectCheckpointsInRange: Source[%d]: %s", i, item.String()) + // } + // for i, ckp := range checkpoints { + // logutil.Infof("CollectCheckpointsInRange: Found[%d]:%s", i, ckp.String()) + // } + // logutil.Infof("CollectCheckpointsInRange: Checkpointed=%s", checkpointed.ToString()) + // }() + + iter := tree.Iter() + defer iter.Release() + + if ok := iter.Seek(pivot); ok { + if ok = iter.Prev(); ok { + e := iter.Item() + if !e.IsCommitted() { + if len(locs) == 0 { + return + } + duration := fmt.Sprintf("[%s_%s]", + ckpStart.ToString(), + ckpStart.ToString()) + locs = append(locs, duration) + locations = strings.Join(locs, ";") + return + } + if e.HasOverlap(newStart, end) { + locs = append(locs, e.GetLocation().String()) + locs = append(locs, strconv.Itoa(int(e.version))) + start := e.GetStart() + if start.LT(&ckpStart) { + ckpStart = start + } + checkpointed = e.GetEnd() + // checkpoints = append(checkpoints, e) + } + iter.Next() + } + for { + e := iter.Item() + if !e.IsCommitted() || !e.HasOverlap(newStart, end) { + break + } + locs = append(locs, e.GetLocation().String()) + locs = append(locs, strconv.Itoa(int(e.version))) + start := e.GetStart() + if start.LT(&ckpStart) { + ckpStart = start + } + checkpointed = e.GetEnd() + // checkpoints = append(checkpoints, e) + if ok = iter.Next(); !ok { + break + } + } + } else { + // if it is empty, quick quit + if ok = iter.Last(); !ok { + if len(locs) == 0 { + return + } + duration := fmt.Sprintf("[%s_%s]", + ckpStart.ToString(), + ckpStart.ToString()) + locs = append(locs, duration) + locations = strings.Join(locs, ";") + return + } + // get last entry + e := iter.Item() + // if it is committed and visible, quick quit + if !e.IsCommitted() || !e.HasOverlap(newStart, end) { + if len(locs) == 0 { + return + } + duration := fmt.Sprintf("[%s_%s]", + ckpStart.ToString(), + ckpStart.ToString()) + locs = append(locs, duration) + locations = strings.Join(locs, ";") + return + } + locs = append(locs, e.GetLocation().String()) + locs = append(locs, strconv.Itoa(int(e.version))) + start := e.GetStart() + if start.LT(&ckpStart) { + ckpStart = start + } + checkpointed = e.GetEnd() + // checkpoints = append(checkpoints, e) + } + + if len(locs) == 0 { + return + } + duration := fmt.Sprintf("[%s_%s]", + ckpStart.ToString(), + checkpointed.ToString()) + locs = append(locs, duration) + locations = strings.Join(locs, ";") + return +} + +// ----------------------------------------------------------------------- +// the following are internal apis +// ----------------------------------------------------------------------- + +// minTSLocked returns the minimum timestamp that is not garbage collected +func (s *runnerStore) minTSLocked() types.TS { + minGlobal, _ := s.globals.Min() + minIncremental, _ := s.incrementals.Min() + + // no global checkpoint yet. no gc executed. + if minGlobal == nil { + return types.TS{} + } + if minIncremental == nil || minIncremental.AllGE(minGlobal) { + return minGlobal.GetEnd() + } + return minIncremental.GetStart() +} + +// here we only consider the global checkpoints as the safe GC timestamp +func (s *runnerStore) getSafeGCTSLocked() (ts types.TS) { + if s.globals.Len() <= 1 { + return + } + maxGlobal, _ := s.globals.Max() + // if there's no global checkpoint, no need to GC + if maxGlobal == nil { + return + } + // if the max global checkpoint is finished, we can GC checkpoints before it + if maxGlobal.IsFinished() { + ts = maxGlobal.GetEnd() + ts = ts.Prev() + return + } + // only one non-finished global checkpoint, no need to GC + if s.globals.Len() == 1 { + return + } + items := s.globals.Items() + maxGlobal = items[len(items)-1] + ts = maxGlobal.GetEnd() + ts = ts.Prev() + return +} + +func (s *runnerStore) doGC(ts *types.TS) (gdeleted, ideleted int) { + if ts.IsEmpty() { + return + } + gloabls := s.globals.Items() + for _, e := range gloabls { + if e.LessEq(ts) { + s.globals.Delete(e) + gdeleted++ + } + } + incrementals := s.incrementals.Items() + for _, e := range incrementals { + if e.LessEq(ts) { + s.incrementals.Delete(e) + ideleted++ + } + } + s.gcCount++ + s.gcTime = time.Now() + s.gcWatermark.Store(*ts) + fields := s.ExportStatsLocked() + fields = append(fields, zap.Int("this-g-deleted", gdeleted)) + fields = append(fields, zap.Int("this-i-deleted", ideleted)) + logutil.Info( + "GC-Inmemory-Checkpoints", + fields..., + ) + return +} diff --git a/pkg/vm/engine/tae/db/checkpoint/testutils.go b/pkg/vm/engine/tae/db/checkpoint/testutils.go index b6bd7cfaacfaf..4d9bd3f426961 100644 --- a/pkg/vm/engine/tae/db/checkpoint/testutils.go +++ b/pkg/vm/engine/tae/db/checkpoint/testutils.go @@ -23,7 +23,6 @@ import ( "github.com/matrixorigin/matrixone/pkg/logutil" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/dbutils" - "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/wal" "go.uber.org/zap" ) @@ -39,10 +38,7 @@ type TestRunner interface { ForceIncrementalCheckpoint(end types.TS, truncate bool) error MaxLSNInRange(end types.TS) uint64 - ExistPendingEntryToGC() bool - MaxGlobalCheckpoint() *CheckpointEntry - MaxIncrementalCheckpoint() *CheckpointEntry - GetDirtyCollector() logtail.Collector + GCNeeded() bool } // DisableCheckpoint stops generating checkpoint @@ -55,30 +51,7 @@ func (r *runner) EnableCheckpoint() { } func (r *runner) CleanPenddingCheckpoint() { - prev := r.MaxIncrementalCheckpoint() - if prev == nil { - return - } - if !prev.IsFinished() { - r.storage.Lock() - r.storage.incrementals.Delete(prev) - r.storage.Unlock() - } - if prev.IsRunning() { - logutil.Warnf("Delete a running checkpoint entry") - } - prev = r.MaxGlobalCheckpoint() - if prev == nil { - return - } - if !prev.IsFinished() { - r.storage.Lock() - r.storage.incrementals.Delete(prev) - r.storage.Unlock() - } - if prev.IsRunning() { - logutil.Warnf("Delete a running checkpoint entry") - } + r.store.CleanPenddingCheckpoint() } func (r *runner) ForceGlobalCheckpoint(end types.TS, interval time.Duration) error { @@ -132,9 +105,7 @@ func (r *runner) ForceGlobalCheckpoint(end types.TS, interval time.Duration) err func (r *runner) ForceGlobalCheckpointSynchronously(ctx context.Context, end types.TS, versionInterval time.Duration) error { prevGlobalEnd := types.TS{} - r.storage.RLock() - global, _ := r.storage.globals.Max() - r.storage.RUnlock() + global := r.store.MaxGlobalCheckpoint() if global != nil { prevGlobalEnd = global.end } @@ -142,9 +113,7 @@ func (r *runner) ForceGlobalCheckpointSynchronously(ctx context.Context, end typ r.ForceGlobalCheckpoint(end, versionInterval) op := func() (ok bool, err error) { - r.storage.RLock() - global, _ := r.storage.globals.Max() - r.storage.RUnlock() + global := r.store.MaxGlobalCheckpoint() if global == nil { return false, nil } @@ -218,9 +187,8 @@ func (r *runner) ForceIncrementalCheckpoint(end types.TS, truncate bool) error { } }() - r.storage.Lock() - r.storage.incrementals.Set(entry) - r.storage.Unlock() + // TODO: change me + r.store.AddNewIncrementalEntry(entry) var files []string if fields, files, err = r.doIncrementalCheckpoint(entry); err != nil { @@ -277,10 +245,9 @@ func (r *runner) ForceCheckpointForBackup(end types.TS) (location string, err er start = prev.end.Next() } entry := NewCheckpointEntry(r.rt.SID(), start, end, ET_Incremental) - r.storage.Lock() - r.storage.incrementals.Set(entry) + // TODO: change me + r.store.AddNewIncrementalEntry(entry) now := time.Now() - r.storage.Unlock() var files []string if _, files, err = r.doIncrementalCheckpoint(entry); err != nil { return diff --git a/pkg/vm/engine/tae/db/checkpoint/types.go b/pkg/vm/engine/tae/db/checkpoint/types.go index 992695ab42cb8..8b42fb90b9da5 100644 --- a/pkg/vm/engine/tae/db/checkpoint/types.go +++ b/pkg/vm/engine/tae/db/checkpoint/types.go @@ -47,17 +47,14 @@ type CheckpointScheduler interface { type Runner interface { CheckpointScheduler TestRunner + RunnerWriter RunnerReader + Start() Stop() - String() string - Replay(catalog.DataFactory) *CkpReplayer + Replay(catalog.DataFactory) *CkpReplayer GCByTS(ctx context.Context, ts types.TS) error - - // for test, delete in next phase - GetAllCheckpoints() []*CheckpointEntry - GetAllCheckpointsForBackup(compact *CheckpointEntry) []*CheckpointEntry } type Observer interface { diff --git a/pkg/vm/engine/tae/db/gc/v3/checkpoint.go b/pkg/vm/engine/tae/db/gc/v3/checkpoint.go index 9d9b132032a6a..95287b5a8ed5a 100644 --- a/pkg/vm/engine/tae/db/gc/v3/checkpoint.go +++ b/pkg/vm/engine/tae/db/gc/v3/checkpoint.go @@ -36,6 +36,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logstore/store" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/testutils" + "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/wal" "go.uber.org/zap" ) @@ -48,7 +49,8 @@ type checkpointCleaner struct { mp *mpool.MPool fs *objectio.ObjectFS - checkpointCli checkpoint.RunnerReader + logDriver wal.Driver + checkpointCli checkpoint.Runner deleter *Deleter watermarks struct { @@ -167,13 +169,15 @@ func NewCheckpointCleaner( ctx context.Context, sid string, fs *objectio.ObjectFS, - checkpointCli checkpoint.RunnerReader, + logDriver wal.Driver, + checkpointCli checkpoint.Runner, opts ...CheckpointCleanerOption, ) Cleaner { cleaner := &checkpointCleaner{ ctx: ctx, sid: sid, fs: fs, + logDriver: logDriver, checkpointCli: checkpointCli, } for _, opt := range opts { @@ -1580,15 +1584,14 @@ func (c *checkpointCleaner) RemoveChecker(key string) error { // appendFilesToWAL append the GC meta files to WAL. func (c *checkpointCleaner) appendFilesToWAL(files ...string) error { - driver := c.checkpointCli.GetDriver() - if driver == nil { + if c.logDriver == nil { return nil } entry, err := store.BuildFilesEntry(files) if err != nil { return err } - _, err = driver.AppendEntry(store.GroupFiles, entry) + _, err = c.logDriver.AppendEntry(store.GroupFiles, entry) if err != nil { return err } diff --git a/pkg/vm/engine/tae/db/gc/v3/merge.go b/pkg/vm/engine/tae/db/gc/v3/merge.go index 5d0aa85dd20f2..4a314cee69007 100644 --- a/pkg/vm/engine/tae/db/gc/v3/merge.go +++ b/pkg/vm/engine/tae/db/gc/v3/merge.go @@ -49,7 +49,7 @@ func MergeCheckpoint( ckpEntries []*checkpoint.CheckpointEntry, bf *bloomfilter.BloomFilter, end *types.TS, - client checkpoint.RunnerReader, + client checkpoint.Runner, pool *mpool.MPool, ) (deleteFiles, newFiles []string, checkpointEntry *checkpoint.CheckpointEntry, ckpData *logtail.CheckpointData, err error) { ckpData = logtail.NewCheckpointData(sid, pool) diff --git a/pkg/vm/engine/tae/db/open.go b/pkg/vm/engine/tae/db/open.go index bf1e6802486f0..95bee1419e0ed 100644 --- a/pkg/vm/engine/tae/db/open.go +++ b/pkg/vm/engine/tae/db/open.go @@ -287,7 +287,10 @@ func Open( // sjw TODO: cleaner need to support replay and write mode cleaner := gc2.NewCheckpointCleaner( opts.Ctx, - opts.SID, fs, db.BGCheckpointRunner, + opts.SID, + fs, + db.Wal, + db.BGCheckpointRunner, gc2.WithCanGCCacheSize(opts.GCCfg.CacheSize), gc2.WithMaxMergeCheckpointCount(opts.GCCfg.GCMergeCount), gc2.WithEstimateRows(opts.GCCfg.GCestimateRows), diff --git a/pkg/vm/engine/tae/db/test/db_test.go b/pkg/vm/engine/tae/db/test/db_test.go index 65d08df6987c2..0f4d7833826ac 100644 --- a/pkg/vm/engine/tae/db/test/db_test.go +++ b/pkg/vm/engine/tae/db/test/db_test.go @@ -4617,11 +4617,11 @@ func TestReadCheckpoint(t *testing.T) { now = time.Now() assert.Equal(t, uint64(0), tae.Wal.GetPenddingCnt()) testutils.WaitExpect(10000, func() bool { - tae.BGCheckpointRunner.ExistPendingEntryToGC() - return !tae.BGCheckpointRunner.ExistPendingEntryToGC() + tae.BGCheckpointRunner.GCNeeded() + return !tae.BGCheckpointRunner.GCNeeded() }) t.Log(time.Since(now)) - assert.False(t, tae.BGCheckpointRunner.ExistPendingEntryToGC()) + assert.False(t, tae.BGCheckpointRunner.GCNeeded()) entries := tae.BGCheckpointRunner.GetAllGlobalCheckpoints() for _, entry := range entries { t.Log(entry.String()) @@ -5658,7 +5658,9 @@ func TestGCWithCheckpoint(t *testing.T) { opts := config.WithQuickScanAndCKPAndGCOpts(nil) tae := testutil.NewTestEngine(ctx, ModuleName, t, opts) defer tae.Close() - cleaner := gc.NewCheckpointCleaner(context.Background(), "", tae.Runtime.Fs, tae.BGCheckpointRunner) + cleaner := gc.NewCheckpointCleaner( + context.Background(), "", tae.Runtime.Fs, tae.Wal, tae.BGCheckpointRunner, + ) manager := gc.NewDiskCleaner(cleaner, true) manager.Start() defer manager.Stop() @@ -5695,7 +5697,9 @@ func TestGCWithCheckpoint(t *testing.T) { end := entries[num-1].GetEnd() maxEnd := manager.GetCleaner().GetScanWaterMark().GetEnd() assert.True(t, end.Equal(&maxEnd)) - cleaner2 := gc.NewCheckpointCleaner(context.Background(), "", tae.Runtime.Fs, tae.BGCheckpointRunner) + cleaner2 := gc.NewCheckpointCleaner( + context.Background(), "", tae.Runtime.Fs, tae.Wal, tae.BGCheckpointRunner, + ) manager2 := gc.NewDiskCleaner(cleaner2, true) manager2.Start() defer manager2.Stop() @@ -5728,7 +5732,7 @@ func TestGCDropDB(t *testing.T) { opts := config.WithQuickScanAndCKPAndGCOpts(nil) tae := testutil.NewTestEngine(ctx, ModuleName, t, opts) defer tae.Close() - cleaner := gc.NewCheckpointCleaner(context.Background(), "", tae.Runtime.Fs, tae.BGCheckpointRunner) + cleaner := gc.NewCheckpointCleaner(context.Background(), "", tae.Runtime.Fs, tae.Wal, tae.BGCheckpointRunner) manager := gc.NewDiskCleaner(cleaner, true) manager.Start() defer manager.Stop() @@ -5768,7 +5772,9 @@ func TestGCDropDB(t *testing.T) { end := entries[num-1].GetEnd() maxEnd := manager.GetCleaner().GetScanWaterMark().GetEnd() assert.True(t, end.Equal(&maxEnd)) - cleaner2 := gc.NewCheckpointCleaner(context.Background(), "", tae.Runtime.Fs, tae.BGCheckpointRunner) + cleaner2 := gc.NewCheckpointCleaner( + context.Background(), "", tae.Runtime.Fs, tae.Wal, tae.BGCheckpointRunner, + ) manager2 := gc.NewDiskCleaner(cleaner2, true) manager2.Start() defer manager2.Stop() @@ -5802,7 +5808,9 @@ func TestGCDropTable(t *testing.T) { opts := config.WithQuickScanAndCKPAndGCOpts(nil) tae := testutil.NewTestEngine(ctx, ModuleName, t, opts) defer tae.Close() - cleaner := gc.NewCheckpointCleaner(context.Background(), "", tae.Runtime.Fs, tae.BGCheckpointRunner) + cleaner := gc.NewCheckpointCleaner( + context.Background(), "", tae.Runtime.Fs, tae.Wal, tae.BGCheckpointRunner, + ) manager := gc.NewDiskCleaner(cleaner, true) manager.Start() defer manager.Stop() @@ -5857,7 +5865,9 @@ func TestGCDropTable(t *testing.T) { end := entries[num-1].GetEnd() maxEnd := manager.GetCleaner().GetScanWaterMark().GetEnd() assert.True(t, end.Equal(&maxEnd)) - cleaner2 := gc.NewCheckpointCleaner(context.Background(), "", tae.Runtime.Fs, tae.BGCheckpointRunner) + cleaner2 := gc.NewCheckpointCleaner( + context.Background(), "", tae.Runtime.Fs, tae.Wal, tae.BGCheckpointRunner, + ) manager2 := gc.NewDiskCleaner(cleaner2, true) manager2.Start() defer manager2.Stop() @@ -7710,10 +7720,10 @@ func TestGCCheckpoint1(t *testing.T) { maxGlobal := tae.BGCheckpointRunner.MaxGlobalCheckpoint() testutils.WaitExpect(4000, func() bool { - tae.BGCheckpointRunner.ExistPendingEntryToGC() - return !tae.BGCheckpointRunner.ExistPendingEntryToGC() + tae.BGCheckpointRunner.GCNeeded() + return !tae.BGCheckpointRunner.GCNeeded() }) - assert.False(t, tae.BGCheckpointRunner.ExistPendingEntryToGC()) + assert.False(t, tae.BGCheckpointRunner.GCNeeded()) globals := tae.BGCheckpointRunner.GetAllGlobalCheckpoints() assert.Equal(t, 1, len(globals))