diff --git a/pkg/backup/backup_test.go b/pkg/backup/backup_test.go index eba0d42b7eb64..64fb74f1b8160 100644 --- a/pkg/backup/backup_test.go +++ b/pkg/backup/backup_test.go @@ -17,19 +17,21 @@ package backup import ( "context" "fmt" - "github.com/panjf2000/ants/v2" - "github.com/prashantv/gostub" "path" "sync" "testing" "time" + "github.com/panjf2000/ants/v2" + "github.com/prashantv/gostub" + "github.com/matrixorigin/matrixone/pkg/common/runtime" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/defines" "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/logservice" "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree" + "github.com/matrixorigin/matrixone/pkg/util/fault" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/testutil" @@ -264,6 +266,161 @@ func TestBackupData2(t *testing.T) { } +func TestBackupData3(t *testing.T) { + defer testutils.AfterTest(t)() + testutils.EnsureNoLeak(t) + ctx := context.Background() + + opts := config.WithLongScanAndCKPOpts(nil) + db := testutil.NewTestEngine(ctx, ModuleName, t, opts) + defer db.Close() + defer opts.Fs.Close(ctx) + + schema := catalog.MockSchemaAll(13, 3) + schema.Extra.BlockMaxRows = 10 + schema.Extra.ObjectMaxBlocks = 10 + db.BindSchema(schema) + + totalRows := 20 + bat := catalog.MockBatch(schema, int(totalRows)) + defer bat.Close() + db.CreateRelAndAppend2(bat, true) + t.Log(db.Catalog.SimplePPString(common.PPL1)) + + dir := path.Join(db.Dir, "/local") + c := fileservice.Config{ + Name: defines.LocalFileServiceName, + Backend: "DISK", + DataDir: dir, + } + service, err := fileservice.NewFileService(ctx, c, nil) + assert.Nil(t, err) + defer service.Close(ctx) + backupTime := time.Now().UTC() + currTs := types.BuildTS(backupTime.UnixNano(), 0) + locations := make([]string, 0) + locations = append(locations, backupTime.Format(time.DateTime)) + location, err := db.ForceCheckpointForBackup(ctx, currTs, 20*time.Second) + assert.Nil(t, err) + db.BGCheckpointRunner.DisableCheckpoint() + locations = append(locations, location) + compacted := db.BGCheckpointRunner.GetCompacted() + checkpoints := db.BGCheckpointRunner.GetAllCheckpointsForBackup(compacted) + files := make(map[string]string, 0) + for _, candidate := range checkpoints { + if files[candidate.GetLocation().Name().String()] == "" { + var loc string + loc = candidate.GetLocation().String() + loc += ":" + loc += fmt.Sprintf("%d", candidate.GetVersion()) + files[candidate.GetLocation().Name().String()] = loc + } + } + for _, location := range files { + locations = append(locations, location) + } + fileList := make([]*taeFile, 0) + err = execBackup(ctx, "", db.Opts.Fs, service, locations, 1, types.TS{}, "full", &fileList) + assert.Nil(t, err) + fileMap := make(map[string]struct{}) + for _, file := range fileList { + _, ok := fileMap[file.path] + assert.True(t, !ok) + fileMap[file.path] = struct{}{} + } + db.Opts.Fs = service + db.Restart(ctx) + t.Log(db.Catalog.SimplePPString(3)) + txn, rel := testutil.GetDefaultRelation(t, db.DB, schema.Name) + testutil.CheckAllColRowsByScan(t, rel, int(totalRows), true) + assert.NoError(t, txn.Commit(context.Background())) + db.MergeBlocks(true) + db.ForceGlobalCheckpoint(ctx, db.TxnMgr.Now(), time.Second, time.Second) + t.Log(db.Catalog.SimplePPString(3)) + db.Restart(ctx) + +} + +func TestBackupData4(t *testing.T) { + defer testutils.AfterTest(t)() + testutils.EnsureNoLeak(t) + ctx := context.Background() + + fault.Enable() + defer fault.Disable() + fault.AddFaultPoint(ctx, "back up UT", ":::", "echo", 0, "debug") + defer fault.RemoveFaultPoint(ctx, "back up UT") + + opts := config.WithLongScanAndCKPOpts(nil) + db := testutil.NewTestEngine(ctx, ModuleName, t, opts) + defer db.Close() + defer opts.Fs.Close(ctx) + + schema := catalog.MockSchemaAll(13, 3) + schema.Extra.BlockMaxRows = 10 + schema.Extra.ObjectMaxBlocks = 10 + db.BindSchema(schema) + + totalRows := 20 + bat := catalog.MockBatch(schema, int(totalRows)) + defer bat.Close() + db.CreateRelAndAppend2(bat, true) + t.Log(db.Catalog.SimplePPString(common.PPL1)) + + dir := path.Join(db.Dir, "/local") + c := fileservice.Config{ + Name: defines.LocalFileServiceName, + Backend: "DISK", + DataDir: dir, + } + service, err := fileservice.NewFileService(ctx, c, nil) + assert.Nil(t, err) + defer service.Close(ctx) + backupTime := time.Now().UTC() + currTs := types.BuildTS(backupTime.UnixNano(), 0) + locations := make([]string, 0) + locations = append(locations, backupTime.Format(time.DateTime)) + location, err := db.ForceCheckpointForBackup(ctx, currTs, 20*time.Second) + assert.Nil(t, err) + db.BGCheckpointRunner.DisableCheckpoint() + locations = append(locations, location) + compacted := db.BGCheckpointRunner.GetCompacted() + checkpoints := db.BGCheckpointRunner.GetAllCheckpointsForBackup(compacted) + files := make(map[string]string, 0) + for _, candidate := range checkpoints { + if files[candidate.GetLocation().Name().String()] == "" { + var loc string + loc = candidate.GetLocation().String() + loc += ":" + loc += fmt.Sprintf("%d", candidate.GetVersion()) + files[candidate.GetLocation().Name().String()] = loc + } + } + for _, location := range files { + locations = append(locations, location) + } + fileList := make([]*taeFile, 0) + err = execBackup(ctx, "", db.Opts.Fs, service, locations, 1, types.TS{}, "full", &fileList) + assert.Nil(t, err) + fileMap := make(map[string]struct{}) + for _, file := range fileList { + _, ok := fileMap[file.path] + assert.True(t, !ok) + fileMap[file.path] = struct{}{} + } + db.Opts.Fs = service + db.Restart(ctx) + t.Log(db.Catalog.SimplePPString(3)) + txn, rel := testutil.GetDefaultRelation(t, db.DB, schema.Name) + testutil.CheckAllColRowsByScan(t, rel, int(totalRows), true) + assert.NoError(t, txn.Commit(context.Background())) + db.MergeBlocks(true) + db.ForceGlobalCheckpoint(ctx, db.TxnMgr.Now(), time.Second, time.Second) + t.Log(db.Catalog.SimplePPString(3)) + db.Restart(ctx) + +} + func Test_saveTaeFilesList(t *testing.T) { type args struct { ctx context.Context diff --git a/pkg/vm/engine/tae/catalog/catalogreplay.go b/pkg/vm/engine/tae/catalog/catalogreplay.go index 7bec226ae909e..95bf7aed87b45 100644 --- a/pkg/vm/engine/tae/catalog/catalogreplay.go +++ b/pkg/vm/engine/tae/catalog/catalogreplay.go @@ -21,6 +21,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/logutil" + "github.com/matrixorigin/matrixone/pkg/util/fault" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif" @@ -500,14 +501,25 @@ func (catalog *Catalog) onReplayCheckpointObject( if objid.Offset() == Backup_Object_Offset && entryNode.DeletedAt.IsEmpty() { obj = newObject() rel.AddEntryLocked(obj) - logutil.Warnf("obj %v, tbl %v-%d delete %v, create %v, end %v", + _, sarg, _ := fault.TriggerFault("back up UT") + if sarg == "" { + obj.CreateNode = *txnbase.NewTxnMVCCNodeWithTS(obj.CreatedAt) + } + logutil.Warnf("obj %v, tbl %v-%d create %v, delete %v, end %v", objid.String(), rel.fullName, rel.ID, entryNode.CreatedAt.ToString(), entryNode.DeletedAt.ToString(), txnNode.End.ToString()) } else { if !entryNode.DeletedAt.IsEmpty() { - panic(fmt.Sprintf("logic error: obj %v, tbl %v-%d create %v, delete %v, end %v", + logutil.Warnf("obj %v, tbl %v-%d create %v, delete %v, end %v", objid.String(), rel.fullName, rel.ID, entryNode.CreatedAt.ToString(), - entryNode.DeletedAt.ToString(), txnNode.End.ToString())) + entryNode.DeletedAt.ToString(), txnNode.End.ToString()) + obj, _ = rel.GetObjectByID(objid, isTombstone) + if obj == nil { + obj = newObject() + rel.AddEntryLocked(obj) + } + obj.CreateNode = *txnbase.NewTxnMVCCNodeWithTS(entryNode.CreatedAt) + obj.DeleteNode = *txnbase.NewTxnMVCCNodeWithTS(entryNode.DeletedAt) } } } diff --git a/pkg/vm/engine/tae/logtail/backup.go b/pkg/vm/engine/tae/logtail/backup.go index 6884c8321583b..0405569342904 100644 --- a/pkg/vm/engine/tae/logtail/backup.go +++ b/pkg/vm/engine/tae/logtail/backup.go @@ -22,6 +22,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/pb/plan" + "github.com/matrixorigin/matrixone/pkg/util/fault" "github.com/matrixorigin/matrixone/pkg/vm/engine" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index" "go.uber.org/zap" @@ -729,6 +730,13 @@ func ReWriteCheckpointAndBlockFromKey( objectio.WithSorted()(insertObjData[i].stats) newMeta.GetVectorByName(ObjectAttr_ObjectStats).Update(row, insertObjData[i].stats[:], false) newMeta.GetVectorByName(EntryNode_DeleteAt).Update(row, types.TS{}, false) + _, sarg, _ := fault.TriggerFault("back up UT") + if sarg == "" { + createTS := newMeta.GetVectorByName(EntryNode_CreateAt).Get(i).(types.TS) + newMeta.GetVectorByName(txnbase.SnapshotAttr_CommitTS).Update(row, createTS, false) + newMeta.GetVectorByName(txnbase.SnapshotAttr_PrepareTS).Update(row, createTS, false) + newMeta.GetVectorByName(txnbase.SnapshotAttr_StartTS).Update(row, createTS.Prev(), false) + } } } }