Skip to content

Commit d3d6ced

Browse files
authored
Merge branch 'main' into memorycache
2 parents 300c84d + 5c1b750 commit d3d6ced

File tree

216 files changed

+20031
-13188
lines changed

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

216 files changed

+20031
-13188
lines changed

pkg/backup/backup_test.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,8 @@ func TestBackupData(t *testing.T) {
127127
locations = append(locations, backupTime.Format(time.DateTime))
128128
location, err := db.ForceCheckpointForBackup(ctx, currTs, 20*time.Second)
129129
assert.Nil(t, err)
130-
db.BGCheckpointRunner.DisableCheckpoint()
130+
err = db.BGCheckpointRunner.DisableCheckpoint(ctx)
131+
assert.NoError(t, err)
131132
locations = append(locations, location)
132133
checkpoints := db.BGCheckpointRunner.GetAllCheckpoints()
133134
files := make(map[string]string, 0)
@@ -232,7 +233,8 @@ func TestBackupData2(t *testing.T) {
232233
locations = append(locations, backupTime.Format(time.DateTime))
233234
location, err := db.ForceCheckpointForBackup(ctx, currTs, 20*time.Second)
234235
assert.Nil(t, err)
235-
db.BGCheckpointRunner.DisableCheckpoint()
236+
err = db.BGCheckpointRunner.DisableCheckpoint(ctx)
237+
assert.NoError(t, err)
236238
locations = append(locations, location)
237239
compacted := db.BGCheckpointRunner.GetCompacted()
238240
checkpoints := db.BGCheckpointRunner.GetAllCheckpointsForBackup(compacted)
@@ -302,7 +304,8 @@ func TestBackupData3(t *testing.T) {
302304
locations = append(locations, backupTime.Format(time.DateTime))
303305
location, err := db.ForceCheckpointForBackup(ctx, currTs, 20*time.Second)
304306
assert.Nil(t, err)
305-
db.BGCheckpointRunner.DisableCheckpoint()
307+
err = db.BGCheckpointRunner.DisableCheckpoint(ctx)
308+
assert.NoError(t, err)
306309
locations = append(locations, location)
307310
compacted := db.BGCheckpointRunner.GetCompacted()
308311
checkpoints := db.BGCheckpointRunner.GetAllCheckpointsForBackup(compacted)
@@ -382,7 +385,8 @@ func TestBackupData4(t *testing.T) {
382385
locations = append(locations, backupTime.Format(time.DateTime))
383386
location, err := db.ForceCheckpointForBackup(ctx, currTs, 20*time.Second)
384387
assert.Nil(t, err)
385-
db.BGCheckpointRunner.DisableCheckpoint()
388+
err = db.BGCheckpointRunner.DisableCheckpoint(ctx)
389+
assert.NoError(t, err)
386390
locations = append(locations, location)
387391
compacted := db.BGCheckpointRunner.GetCompacted()
388392
checkpoints := db.BGCheckpointRunner.GetAllCheckpointsForBackup(compacted)

pkg/backup/tae.go

+34-32
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,6 @@ import (
1919
"crypto/sha256"
2020
"encoding/json"
2121
"fmt"
22-
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/gc/v3"
2322
"io"
2423
"os"
2524
"path"
@@ -30,6 +29,9 @@ import (
3029
"sync"
3130
"time"
3231

32+
"github.com/matrixorigin/matrixone/pkg/objectio/ioutil"
33+
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/gc/v3"
34+
3335
"github.com/matrixorigin/matrixone/pkg/common/moerr"
3436
"github.com/matrixorigin/matrixone/pkg/common/runtime"
3537
"github.com/matrixorigin/matrixone/pkg/container/types"
@@ -39,7 +41,6 @@ import (
3941
"github.com/matrixorigin/matrixone/pkg/objectio"
4042
"github.com/matrixorigin/matrixone/pkg/sql/plan/function/ctl"
4143
"github.com/matrixorigin/matrixone/pkg/util/executor"
42-
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/blockio"
4344
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common"
4445
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/checkpoint"
4546
"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/logtail"
@@ -305,7 +306,7 @@ func execBackup(
305306
if err != nil {
306307
return err
307308
}
308-
key, err := blockio.EncodeLocationFromString(metaLoc)
309+
key, err := objectio.StringToLocation(metaLoc)
309310
if err != nil {
310311
return err
311312
}
@@ -372,7 +373,7 @@ func execBackup(
372373
taeFileList = append(taeFileList, sizeList...)
373374
now = time.Now()
374375
if trimInfo != "" {
375-
cnLocation, err := blockio.EncodeLocationFromString(cnLoc)
376+
cnLocation, err := objectio.StringToLocation(cnLoc)
376377
if err != nil {
377378
return err
378379
}
@@ -426,32 +427,34 @@ func execBackup(
426427

427428
// CopyCheckpointDir copy checkpoint dir from srcFs to dstFs
428429
// return taeFile list
429-
// copy: if copy is true,it means not to check the suffix name and copy all files.
430+
// doCopy: if doCopy is true,it means not to check the suffix name and doCopy all files.
430431
func copyFileAndGetMetaFiles(
431432
ctx context.Context,
432433
srcFs, dstFs fileservice.FileService,
433434
dir string,
434435
backup types.TS,
435-
decodeFunc func(string) (types.TS, types.TS, string),
436-
copy bool,
437-
) ([]*taeFile, []*checkpoint.MetaFile, []fileservice.DirEntry, error) {
436+
decoder func(string) ioutil.TSRangeFile,
437+
doCopy bool,
438+
) ([]*taeFile, []ioutil.TSRangeFile, []fileservice.DirEntry, error) {
438439
files, err := fileservice.SortedList(srcFs.List(ctx, dir))
439440
if err != nil {
440441
return nil, nil, nil, err
441442
}
442443
taeFileList := make([]*taeFile, 0, len(files))
443-
metaFiles := make([]*checkpoint.MetaFile, 0)
444+
metaFiles := make([]ioutil.TSRangeFile, 0)
444445
var checksum []byte
445446
for i, file := range files {
446447
if file.IsDir {
447448
panic("not support dir")
448449
}
449-
start, end, ext := decodeFunc(file.Name)
450-
if !backup.IsEmpty() && start.GE(&backup) {
450+
meta := decoder(file.Name)
451+
meta.SetIdx(i)
452+
453+
if !backup.IsEmpty() && meta.GetStart().GE(&backup) {
451454
logutil.Infof("[Backup] skip file %v", file.Name)
452455
continue
453456
}
454-
if copy || ext == blockio.AcctExt || ext == blockio.SnapshotExt {
457+
if doCopy || meta.IsAcctExt() || meta.IsSnapshotExt() {
455458
checksum, err = CopyFileWithRetry(ctx, srcFs, dstFs, file.Name, dir)
456459
if err != nil {
457460
return nil, nil, nil, err
@@ -465,19 +468,16 @@ func copyFileAndGetMetaFiles(
465468
})
466469
}
467470

468-
if copy || ext == blockio.CheckpointExt || ext == blockio.GCFullExt {
469-
metaFile := checkpoint.NewMetaFile(i, start, end, file.Name)
470-
metaFiles = append(metaFiles, metaFile)
471+
if doCopy || meta.IsCKPFile() || meta.IsFullGCExt() {
472+
metaFiles = append(metaFiles, meta)
471473
}
472474
}
473475

474476
if len(metaFiles) == 0 {
475477
return taeFileList, metaFiles, files, nil
476478
}
477479
sort.Slice(metaFiles, func(i, j int) bool {
478-
end1 := metaFiles[i].GetEnd()
479-
end2 := metaFiles[j].GetEnd()
480-
return end1.LT(&end2)
480+
return metaFiles[i].GetEnd().LT(metaFiles[j].GetEnd())
481481
})
482482

483483
return taeFileList, metaFiles, files, nil
@@ -492,17 +492,17 @@ func CopyGCDir(
492492
var checksum []byte
493493

494494
taeFileList, metaFiles, files, err := copyFileAndGetMetaFiles(
495-
ctx, srcFs, dstFs, dir, backup, blockio.DecodeGCMetadataFileName, false)
495+
ctx, srcFs, dstFs, dir, backup, ioutil.DecodeGCMetadataName, false,
496+
)
496497
if err != nil {
497498
return nil, err
498499
}
499500

500-
copyFiles := make([]*checkpoint.MetaFile, 0)
501+
copyFiles := make([]ioutil.TSRangeFile, 0)
501502

502503
for _, metaFile := range metaFiles {
503-
name := metaFile.GetName()
504504
window := gc.NewGCWindow(common.DebugAllocator, srcFs)
505-
err = window.ReadTable(ctx, gc.GCMetaDir+name, srcFs)
505+
err = window.ReadTable(ctx, metaFile.GetGCFullName(), srcFs)
506506
if err != nil {
507507
return nil, err
508508
}
@@ -519,7 +519,7 @@ func CopyGCDir(
519519
}
520520
filesList = append(filesList, &taeFile{
521521
path: object.ObjectName().String(),
522-
size: files[metaFile.GetIndex()].Size,
522+
size: files[metaFile.GetIdx()].Size,
523523
checksum: checksum,
524524
needCopy: true,
525525
ts: backup,
@@ -534,13 +534,12 @@ func CopyGCDir(
534534
for i, metaFile := range copyFiles {
535535
name := metaFile.GetName()
536536
if i == len(metaFiles)-1 {
537-
end := metaFile.GetEnd()
538-
if !min.IsEmpty() && end.LT(&min) {
537+
if !min.IsEmpty() && metaFile.GetEnd().LT(&min) {
539538
// It means that the gc consumption is too slow, and the gc water level needs to be raised.
540539
// Otherwise, the gc will not work after the cluster is restored because it cannot find the checkpoint.
541540
// The gc water level is determined by the name of the meta,
542541
// so the name of the last gc meta needs to be modified.
543-
name = blockio.UpdateGCMetadataFileName(name, end, min)
542+
name = ioutil.InheritGCMetadataName(name, metaFile.GetEnd(), &min)
544543
}
545544
}
546545
checksum, err = CopyFileWithRetry(ctx, srcFs, dstFs, metaFile.GetName(), dir, name)
@@ -549,7 +548,7 @@ func CopyGCDir(
549548
}
550549
taeFileList = append(taeFileList, &taeFile{
551550
path: dir + string(os.PathSeparator) + name,
552-
size: files[metaFile.GetIndex()].Size,
551+
size: files[metaFile.GetIdx()].Size,
553552
checksum: checksum,
554553
needCopy: true,
555554
ts: backup,
@@ -563,11 +562,14 @@ func CopyCheckpointDir(
563562
srcFs, dstFs fileservice.FileService,
564563
dir string, backup types.TS,
565564
) ([]*taeFile, types.TS, error) {
566-
decodeFunc := func(name string) (types.TS, types.TS, string) {
567-
start, end, _ := blockio.DecodeCheckpointMetadataFileName(name)
568-
return start, end, ""
565+
decoder := func(name string) ioutil.TSRangeFile {
566+
meta := ioutil.DecodeCKPMetaName(name)
567+
meta.SetExt("")
568+
return meta
569569
}
570-
taeFileList, metaFiles, _, err := copyFileAndGetMetaFiles(ctx, srcFs, dstFs, dir, backup, decodeFunc, true)
570+
taeFileList, metaFiles, _, err := copyFileAndGetMetaFiles(
571+
ctx, srcFs, dstFs, dir, backup, decoder, true,
572+
)
571573
if err != nil {
572574
return nil, types.TS{}, err
573575
}
@@ -577,7 +579,7 @@ func CopyCheckpointDir(
577579
for i := len(metaFiles) - 1; i >= 0; i-- {
578580
ckpStart := metaFiles[i].GetStart()
579581
if ckpStart.IsEmpty() {
580-
minTs = metaFiles[i].GetEnd()
582+
minTs = *metaFiles[i].GetEnd()
581583
break
582584
}
583585
}

pkg/bootstrap/custom_upgrade.go

+1
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
)
3030

3131
func (s *service) UpgradeTenant(ctx context.Context, tenantName string, retryCount uint32, isALLAccount bool) (bool, error) {
32+
s.logger.Info("start manual bootstrap upgrade")
3233
// Before manually upgrade, check if there are unready upgrade tasks in system upgrade environment
3334
err := s.UpgradePreCheck(ctx)
3435
if err != nil {

0 commit comments

Comments
 (0)