From 47baa33589e84ec02ee43d7d242d1b3bc8b6692b Mon Sep 17 00:00:00 2001 From: jiangxinmeng1 Date: Wed, 18 Dec 2024 11:46:13 +0800 Subject: [PATCH 1/6] fix back up --- pkg/backup/backup_test.go | 75 ++++++++++++++++++++++ pkg/vm/engine/tae/catalog/catalogreplay.go | 2 +- pkg/vm/engine/tae/logtail/backup.go | 4 ++ 3 files changed, 80 insertions(+), 1 deletion(-) diff --git a/pkg/backup/backup_test.go b/pkg/backup/backup_test.go index eba0d42b7eb64..8eed272a31c0d 100644 --- a/pkg/backup/backup_test.go +++ b/pkg/backup/backup_test.go @@ -264,6 +264,81 @@ func TestBackupData2(t *testing.T) { } +func TestBackupData3(t *testing.T) { + defer testutils.AfterTest(t)() + testutils.EnsureNoLeak(t) + ctx := context.Background() + + opts := config.WithLongScanAndCKPOpts(nil) + db := testutil.NewTestEngine(ctx, ModuleName, t, opts) + defer db.Close() + defer opts.Fs.Close(ctx) + + schema := catalog.MockSchemaAll(13, 3) + schema.Extra.BlockMaxRows = 10 + schema.Extra.ObjectMaxBlocks = 10 + db.BindSchema(schema) + + totalRows := 20 + bat := catalog.MockBatch(schema, int(totalRows)) + defer bat.Close() + db.CreateRelAndAppend2(bat, true) + t.Log(db.Catalog.SimplePPString(common.PPL1)) + + dir := path.Join(db.Dir, "/local") + c := fileservice.Config{ + Name: defines.LocalFileServiceName, + Backend: "DISK", + DataDir: dir, + } + service, err := fileservice.NewFileService(ctx, c, nil) + assert.Nil(t, err) + defer service.Close(ctx) + backupTime := time.Now().UTC() + currTs := types.BuildTS(backupTime.UnixNano(), 0) + locations := make([]string, 0) + locations = append(locations, backupTime.Format(time.DateTime)) + location, err := db.ForceCheckpointForBackup(ctx, currTs, 20*time.Second) + assert.Nil(t, err) + db.BGCheckpointRunner.DisableCheckpoint() + locations = append(locations, location) + compacted := db.BGCheckpointRunner.GetCompacted() + checkpoints := db.BGCheckpointRunner.GetAllCheckpointsForBackup(compacted) + files := make(map[string]string, 0) + for _, candidate := range checkpoints { + if files[candidate.GetLocation().Name().String()] == "" { + var loc string + loc = candidate.GetLocation().String() + loc += ":" + loc += fmt.Sprintf("%d", candidate.GetVersion()) + files[candidate.GetLocation().Name().String()] = loc + } + } + for _, location := range files { + locations = append(locations, location) + } + fileList := make([]*taeFile, 0) + err = execBackup(ctx, "", db.Opts.Fs, service, locations, 1, types.TS{}, "full", &fileList) + assert.Nil(t, err) + fileMap := make(map[string]struct{}) + for _, file := range fileList { + _, ok := fileMap[file.path] + assert.True(t, !ok) + fileMap[file.path] = struct{}{} + } + db.Opts.Fs = service + db.Restart(ctx) + t.Log(db.Catalog.SimplePPString(3)) + txn, rel := testutil.GetDefaultRelation(t, db.DB, schema.Name) + testutil.CheckAllColRowsByScan(t, rel, int(totalRows), true) + assert.NoError(t, txn.Commit(context.Background())) + db.MergeBlocks(true) + db.ForceGlobalCheckpoint(ctx, db.TxnMgr.Now(), time.Second, time.Second) + t.Log(db.Catalog.SimplePPString(3)) + db.Restart(ctx) + +} + func Test_saveTaeFilesList(t *testing.T) { type args struct { ctx context.Context diff --git a/pkg/vm/engine/tae/catalog/catalogreplay.go b/pkg/vm/engine/tae/catalog/catalogreplay.go index 7bec226ae909e..9dcff066832d0 100644 --- a/pkg/vm/engine/tae/catalog/catalogreplay.go +++ b/pkg/vm/engine/tae/catalog/catalogreplay.go @@ -500,7 +500,7 @@ func (catalog *Catalog) onReplayCheckpointObject( if objid.Offset() == Backup_Object_Offset && entryNode.DeletedAt.IsEmpty() { obj = 
newObject() rel.AddEntryLocked(obj) - logutil.Warnf("obj %v, tbl %v-%d delete %v, create %v, end %v", + logutil.Warnf("obj %v, tbl %v-%d create %v, delete %v, end %v", objid.String(), rel.fullName, rel.ID, entryNode.CreatedAt.ToString(), entryNode.DeletedAt.ToString(), txnNode.End.ToString()) } else { diff --git a/pkg/vm/engine/tae/logtail/backup.go b/pkg/vm/engine/tae/logtail/backup.go index 6884c8321583b..67f108fea5470 100644 --- a/pkg/vm/engine/tae/logtail/backup.go +++ b/pkg/vm/engine/tae/logtail/backup.go @@ -729,6 +729,10 @@ func ReWriteCheckpointAndBlockFromKey( objectio.WithSorted()(insertObjData[i].stats) newMeta.GetVectorByName(ObjectAttr_ObjectStats).Update(row, insertObjData[i].stats[:], false) newMeta.GetVectorByName(EntryNode_DeleteAt).Update(row, types.TS{}, false) + createTS := newMeta.GetVectorByName(EntryNode_CreateAt).Get(i).(types.TS) + newMeta.GetVectorByName(txnbase.SnapshotAttr_CommitTS).Update(row, createTS, false) + newMeta.GetVectorByName(txnbase.SnapshotAttr_PrepareTS).Update(row, createTS, false) + newMeta.GetVectorByName(txnbase.SnapshotAttr_StartTS).Update(row, createTS.Prev(), false) } } } From 348961736e22d2dbf435a20fd21ad5b85ec0f947 Mon Sep 17 00:00:00 2001 From: jiangxinmeng1 Date: Wed, 18 Dec 2024 14:18:28 +0800 Subject: [PATCH 2/6] update create node in replay --- pkg/vm/engine/tae/catalog/catalogreplay.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/vm/engine/tae/catalog/catalogreplay.go b/pkg/vm/engine/tae/catalog/catalogreplay.go index 9dcff066832d0..78028cac68bbc 100644 --- a/pkg/vm/engine/tae/catalog/catalogreplay.go +++ b/pkg/vm/engine/tae/catalog/catalogreplay.go @@ -500,6 +500,7 @@ func (catalog *Catalog) onReplayCheckpointObject( if objid.Offset() == Backup_Object_Offset && entryNode.DeletedAt.IsEmpty() { obj = newObject() rel.AddEntryLocked(obj) + obj.CreateNode = *txnbase.NewTxnMVCCNodeWithTS(obj.CreatedAt) logutil.Warnf("obj %v, tbl %v-%d create %v, delete %v, end %v", objid.String(), rel.fullName, rel.ID, entryNode.CreatedAt.ToString(), entryNode.DeletedAt.ToString(), txnNode.End.ToString()) From 1893d9ccdf99de25f93e2df9691c661a641820be Mon Sep 17 00:00:00 2001 From: jiangxinmeng1 Date: Thu, 19 Dec 2024 11:00:12 +0800 Subject: [PATCH 3/6] reset create node and delete node in replay --- pkg/vm/engine/tae/catalog/catalogreplay.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/vm/engine/tae/catalog/catalogreplay.go b/pkg/vm/engine/tae/catalog/catalogreplay.go index 78028cac68bbc..80b6ec2dc6b9b 100644 --- a/pkg/vm/engine/tae/catalog/catalogreplay.go +++ b/pkg/vm/engine/tae/catalog/catalogreplay.go @@ -506,9 +506,11 @@ func (catalog *Catalog) onReplayCheckpointObject( entryNode.DeletedAt.ToString(), txnNode.End.ToString()) } else { if !entryNode.DeletedAt.IsEmpty() { - panic(fmt.Sprintf("logic error: obj %v, tbl %v-%d create %v, delete %v, end %v", + logutil.Warnf("obj %v, tbl %v-%d create %v, delete %v, end %v", objid.String(), rel.fullName, rel.ID, entryNode.CreatedAt.ToString(), - entryNode.DeletedAt.ToString(), txnNode.End.ToString())) + entryNode.DeletedAt.ToString(), txnNode.End.ToString()) + obj.CreateNode = *txnbase.NewTxnMVCCNodeWithTS(obj.CreatedAt) + obj.DeleteNode = *txnbase.NewTxnMVCCNodeWithTS(obj.DeleteAt) } } } From eabc4f115882c4e6bb7192c22148a279b7598c65 Mon Sep 17 00:00:00 2001 From: jiangxinmeng1 Date: Thu, 19 Dec 2024 11:13:55 +0800 Subject: [PATCH 4/6] update --- pkg/vm/engine/tae/catalog/catalogreplay.go | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git 
a/pkg/vm/engine/tae/catalog/catalogreplay.go b/pkg/vm/engine/tae/catalog/catalogreplay.go index 80b6ec2dc6b9b..a5500c292e29e 100644 --- a/pkg/vm/engine/tae/catalog/catalogreplay.go +++ b/pkg/vm/engine/tae/catalog/catalogreplay.go @@ -509,8 +509,13 @@ func (catalog *Catalog) onReplayCheckpointObject( logutil.Warnf("obj %v, tbl %v-%d create %v, delete %v, end %v", objid.String(), rel.fullName, rel.ID, entryNode.CreatedAt.ToString(), entryNode.DeletedAt.ToString(), txnNode.End.ToString()) - obj.CreateNode = *txnbase.NewTxnMVCCNodeWithTS(obj.CreatedAt) - obj.DeleteNode = *txnbase.NewTxnMVCCNodeWithTS(obj.DeleteAt) + obj, _ = rel.GetObjectByID(objid, isTombstone) + if obj == nil { + obj = newObject() + rel.AddEntryLocked(obj) + } + obj.CreateNode = *txnbase.NewTxnMVCCNodeWithTS(entryNode.CreatedAt) + obj.DeleteNode = *txnbase.NewTxnMVCCNodeWithTS(entryNode.DeletedAt) } } } From b79fdd9763c6ba7cf269414ed80d328dee4c5207 Mon Sep 17 00:00:00 2001 From: jiangxinmeng1 Date: Thu, 19 Dec 2024 14:10:28 +0800 Subject: [PATCH 5/6] add ut --- pkg/backup/backup_test.go | 86 +++++++++++++++++++++- pkg/vm/engine/tae/catalog/catalogreplay.go | 6 +- pkg/vm/engine/tae/logtail/backup.go | 12 ++- 3 files changed, 97 insertions(+), 7 deletions(-) diff --git a/pkg/backup/backup_test.go b/pkg/backup/backup_test.go index 8eed272a31c0d..694d2f4d55ac8 100644 --- a/pkg/backup/backup_test.go +++ b/pkg/backup/backup_test.go @@ -17,19 +17,21 @@ package backup import ( "context" "fmt" - "github.com/panjf2000/ants/v2" - "github.com/prashantv/gostub" "path" "sync" "testing" "time" + "github.com/panjf2000/ants/v2" + "github.com/prashantv/gostub" + "github.com/matrixorigin/matrixone/pkg/common/runtime" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/defines" "github.com/matrixorigin/matrixone/pkg/fileservice" "github.com/matrixorigin/matrixone/pkg/logservice" "github.com/matrixorigin/matrixone/pkg/sql/parsers/tree" + "github.com/matrixorigin/matrixone/pkg/util/fault" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/catalog" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/db/testutil" @@ -339,6 +341,86 @@ func TestBackupData3(t *testing.T) { } +func TestBackupData4(t *testing.T) { + defer testutils.AfterTest(t)() + testutils.EnsureNoLeak(t) + ctx := context.Background() + + fault.Enable() + defer fault.Disable() + fault.AddFaultPoint(ctx, "back up UT", ":::", "echo", 0, "debug", false) + defer fault.RemoveFaultPoint(ctx, "back up UT") + + opts := config.WithLongScanAndCKPOpts(nil) + db := testutil.NewTestEngine(ctx, ModuleName, t, opts) + defer db.Close() + defer opts.Fs.Close(ctx) + + schema := catalog.MockSchemaAll(13, 3) + schema.Extra.BlockMaxRows = 10 + schema.Extra.ObjectMaxBlocks = 10 + db.BindSchema(schema) + + totalRows := 20 + bat := catalog.MockBatch(schema, int(totalRows)) + defer bat.Close() + db.CreateRelAndAppend2(bat, true) + t.Log(db.Catalog.SimplePPString(common.PPL1)) + + dir := path.Join(db.Dir, "/local") + c := fileservice.Config{ + Name: defines.LocalFileServiceName, + Backend: "DISK", + DataDir: dir, + } + service, err := fileservice.NewFileService(ctx, c, nil) + assert.Nil(t, err) + defer service.Close(ctx) + backupTime := time.Now().UTC() + currTs := types.BuildTS(backupTime.UnixNano(), 0) + locations := make([]string, 0) + locations = append(locations, backupTime.Format(time.DateTime)) + location, err := db.ForceCheckpointForBackup(ctx, currTs, 20*time.Second) + assert.Nil(t, err) + 
db.BGCheckpointRunner.DisableCheckpoint() + locations = append(locations, location) + compacted := db.BGCheckpointRunner.GetCompacted() + checkpoints := db.BGCheckpointRunner.GetAllCheckpointsForBackup(compacted) + files := make(map[string]string, 0) + for _, candidate := range checkpoints { + if files[candidate.GetLocation().Name().String()] == "" { + var loc string + loc = candidate.GetLocation().String() + loc += ":" + loc += fmt.Sprintf("%d", candidate.GetVersion()) + files[candidate.GetLocation().Name().String()] = loc + } + } + for _, location := range files { + locations = append(locations, location) + } + fileList := make([]*taeFile, 0) + err = execBackup(ctx, "", db.Opts.Fs, service, locations, 1, types.TS{}, "full", &fileList) + assert.Nil(t, err) + fileMap := make(map[string]struct{}) + for _, file := range fileList { + _, ok := fileMap[file.path] + assert.True(t, !ok) + fileMap[file.path] = struct{}{} + } + db.Opts.Fs = service + db.Restart(ctx) + t.Log(db.Catalog.SimplePPString(3)) + txn, rel := testutil.GetDefaultRelation(t, db.DB, schema.Name) + testutil.CheckAllColRowsByScan(t, rel, int(totalRows), true) + assert.NoError(t, txn.Commit(context.Background())) + db.MergeBlocks(true) + db.ForceGlobalCheckpoint(ctx, db.TxnMgr.Now(), time.Second, time.Second) + t.Log(db.Catalog.SimplePPString(3)) + db.Restart(ctx) + +} + func Test_saveTaeFilesList(t *testing.T) { type args struct { ctx context.Context diff --git a/pkg/vm/engine/tae/catalog/catalogreplay.go b/pkg/vm/engine/tae/catalog/catalogreplay.go index a5500c292e29e..95bf7aed87b45 100644 --- a/pkg/vm/engine/tae/catalog/catalogreplay.go +++ b/pkg/vm/engine/tae/catalog/catalogreplay.go @@ -21,6 +21,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/common/moerr" "github.com/matrixorigin/matrixone/pkg/container/types" "github.com/matrixorigin/matrixone/pkg/logutil" + "github.com/matrixorigin/matrixone/pkg/util/fault" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/common" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/containers" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/iface/txnif" @@ -500,7 +501,10 @@ func (catalog *Catalog) onReplayCheckpointObject( if objid.Offset() == Backup_Object_Offset && entryNode.DeletedAt.IsEmpty() { obj = newObject() rel.AddEntryLocked(obj) - obj.CreateNode = *txnbase.NewTxnMVCCNodeWithTS(obj.CreatedAt) + _, sarg, _ := fault.TriggerFault("back up UT") + if sarg == "" { + obj.CreateNode = *txnbase.NewTxnMVCCNodeWithTS(obj.CreatedAt) + } logutil.Warnf("obj %v, tbl %v-%d create %v, delete %v, end %v", objid.String(), rel.fullName, rel.ID, entryNode.CreatedAt.ToString(), entryNode.DeletedAt.ToString(), txnNode.End.ToString()) diff --git a/pkg/vm/engine/tae/logtail/backup.go b/pkg/vm/engine/tae/logtail/backup.go index 67f108fea5470..0405569342904 100644 --- a/pkg/vm/engine/tae/logtail/backup.go +++ b/pkg/vm/engine/tae/logtail/backup.go @@ -22,6 +22,7 @@ import ( "github.com/matrixorigin/matrixone/pkg/common/mpool" "github.com/matrixorigin/matrixone/pkg/container/vector" "github.com/matrixorigin/matrixone/pkg/pb/plan" + "github.com/matrixorigin/matrixone/pkg/util/fault" "github.com/matrixorigin/matrixone/pkg/vm/engine" "github.com/matrixorigin/matrixone/pkg/vm/engine/tae/index" "go.uber.org/zap" @@ -729,10 +730,13 @@ func ReWriteCheckpointAndBlockFromKey( objectio.WithSorted()(insertObjData[i].stats) newMeta.GetVectorByName(ObjectAttr_ObjectStats).Update(row, insertObjData[i].stats[:], false) newMeta.GetVectorByName(EntryNode_DeleteAt).Update(row, types.TS{}, false) - createTS := 
newMeta.GetVectorByName(EntryNode_CreateAt).Get(i).(types.TS) - newMeta.GetVectorByName(txnbase.SnapshotAttr_CommitTS).Update(row, createTS, false) - newMeta.GetVectorByName(txnbase.SnapshotAttr_PrepareTS).Update(row, createTS, false) - newMeta.GetVectorByName(txnbase.SnapshotAttr_StartTS).Update(row, createTS.Prev(), false) + _, sarg, _ := fault.TriggerFault("back up UT") + if sarg == "" { + createTS := newMeta.GetVectorByName(EntryNode_CreateAt).Get(i).(types.TS) + newMeta.GetVectorByName(txnbase.SnapshotAttr_CommitTS).Update(row, createTS, false) + newMeta.GetVectorByName(txnbase.SnapshotAttr_PrepareTS).Update(row, createTS, false) + newMeta.GetVectorByName(txnbase.SnapshotAttr_StartTS).Update(row, createTS.Prev(), false) + } } } } From 688bd2199d078aadb59a7d4295c38a34ed6bcb91 Mon Sep 17 00:00:00 2001 From: jiangxinmeng1 Date: Mon, 23 Dec 2024 15:43:14 +0800 Subject: [PATCH 6/6] fix --- pkg/backup/backup_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/backup/backup_test.go b/pkg/backup/backup_test.go index 694d2f4d55ac8..64fb74f1b8160 100644 --- a/pkg/backup/backup_test.go +++ b/pkg/backup/backup_test.go @@ -345,10 +345,10 @@ func TestBackupData4(t *testing.T) { defer testutils.AfterTest(t)() testutils.EnsureNoLeak(t) ctx := context.Background() - + fault.Enable() defer fault.Disable() - fault.AddFaultPoint(ctx, "back up UT", ":::", "echo", 0, "debug", false) + fault.AddFaultPoint(ctx, "back up UT", ":::", "echo", 0, "debug") defer fault.RemoveFaultPoint(ctx, "back up UT") opts := config.WithLongScanAndCKPOpts(nil)
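
Taken together, the six patches above make three related changes: (1) ReWriteCheckpointAndBlockFromKey in logtail/backup.go now stamps each backup-rewritten object's metadata with commit and prepare timestamps equal to the entry's create timestamp and a start timestamp just before it, (2) catalog replay in catalogreplay.go rebuilds the object's CreateNode/DeleteNode from the entry timestamps instead of panicking or leaving them at their defaults, and (3) the "back up UT" fault point switches the new stamping and CreateNode assignment off so TestBackupData4 can restore checkpoint data produced the old way and confirm the catalog still replays, merges, checkpoints, and restarts cleanly. The snippet below is a minimal standalone sketch of the timestamp relationship only, not engine code: the main harness and printed output are illustrative additions, while types.BuildTS, TS.Prev, TS.ToString, and txnbase.NewTxnMVCCNodeWithTS are used with the same shapes they have in the diffs above.

package main

import (
	"fmt"
	"time"

	"github.com/matrixorigin/matrixone/pkg/container/types"
	"github.com/matrixorigin/matrixone/pkg/vm/engine/tae/txnbase"
)

func main() {
	// A backup-rewritten object carries only its create timestamp.
	createTS := types.BuildTS(time.Now().UTC().UnixNano(), 0)

	// Patch 1 (logtail/backup.go): the rewritten checkpoint metadata gets a
	// consistent MVCC triple derived from that create timestamp.
	commitTS := createTS       // SnapshotAttr_CommitTS
	prepareTS := createTS      // SnapshotAttr_PrepareTS
	startTS := createTS.Prev() // SnapshotAttr_StartTS, strictly before the commit

	// Patches 2-4 (catalog/catalogreplay.go): replay rebuilds the MVCC create
	// node from the entry's create timestamp rather than leaving it unset.
	createNode := txnbase.NewTxnMVCCNodeWithTS(createTS)

	fmt.Println("start:  ", startTS.ToString())
	fmt.Println("prepare:", prepareTS.ToString())
	fmt.Println("commit: ", commitTS.ToString())
	fmt.Printf("create node: %+v\n", createNode)
}

The same pairing explains the test flow: TestBackupData3 covers the fixed write-and-replay path end to end, while TestBackupData4 enables the fault point first, so the backup is produced without the new timestamps and the replay-side reconstruction is what the restart and global-checkpoint cycle at the end of the test actually exercises.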