From 1e6f931867477219898762f4aaf188ab29538c70 Mon Sep 17 00:00:00 2001 From: jiangxinmeng1 Date: Tue, 24 Dec 2024 14:39:47 +0800 Subject: [PATCH 1/5] update readWriteConfilictCheck --- pkg/vm/engine/tae/txn/txnimpl/antidepend.go | 16 +++++++++++++--- pkg/vm/engine/tae/txn/txnimpl/table.go | 3 ++- 2 files changed, 15 insertions(+), 4 deletions(-) diff --git a/pkg/vm/engine/tae/txn/txnimpl/antidepend.go b/pkg/vm/engine/tae/txn/txnimpl/antidepend.go index 65421c0565405..be462b1bb88b9 100644 --- a/pkg/vm/engine/tae/txn/txnimpl/antidepend.go +++ b/pkg/vm/engine/tae/txn/txnimpl/antidepend.go @@ -29,14 +29,24 @@ import ( var ErrRWConflict = moerr.NewTxnRWConflictNoCtx() -func readWriteConfilictCheck(entry *catalog.ObjectEntry, ts types.TS) (err error) { +func readWriteConfilictCheck(entry *catalog.ObjectEntry, ts types.TS, inqueue bool) (err error) { lastNode := entry.GetLatestNode() + if !lastNode.HasDropIntent() { + return nil + } needWait, txnToWait := lastNode.GetLastMVCCNode().NeedWaitCommitting(ts) // TODO: // I don't think we need to wait here any more. `block` and `Object` are // local metadata and never be involved in a 2PC txn. So a prepared `block` // will never be rollbacked if needWait { + if inqueue { + deleteTS := txnToWait.GetPrepareTS() + if deleteTS.LT(&ts) { + err = ErrRWConflict + } + return + } txnToWait.GetTxnState(true) lastNode = entry.GetLatestNode() } @@ -131,12 +141,12 @@ func (checker *warChecker) checkOne(id *common.ID, ts types.TS) (err error) { if entry == nil { return } - return readWriteConfilictCheck(entry, ts) + return readWriteConfilictCheck(entry, ts, false) } func (checker *warChecker) checkAll(ts types.TS) (err error) { for _, obj := range checker.readSet { - if err = readWriteConfilictCheck(obj, ts); err != nil { + if err = readWriteConfilictCheck(obj, ts, true); err != nil { logutil.Error( "Txn-Check-All", zap.Error(err), diff --git a/pkg/vm/engine/tae/txn/txnimpl/table.go b/pkg/vm/engine/tae/txn/txnimpl/table.go index f670603b8e54e..ef9e95bd21310 100644 --- a/pkg/vm/engine/tae/txn/txnimpl/table.go +++ b/pkg/vm/engine/tae/txn/txnimpl/table.go @@ -163,7 +163,7 @@ func (tbl *txnTable) TransferDeleteIntent( panic(err) } ts := types.BuildTS(time.Now().UTC().UnixNano(), 0) - if err = readWriteConfilictCheck(entry, ts); err == nil { + if err = readWriteConfilictCheck(entry, ts,false); err == nil { return } err = nil @@ -504,6 +504,7 @@ func (tbl *txnTable) recurTransferDelete( err = readWriteConfilictCheck( obj, ts, + false, ) if err == nil { pkVec := tbl.store.rt.VectorPool.Small.GetVector(pkType) From 51863da37d0dd9a76b8222dd7301054bf3aa8821 Mon Sep 17 00:00:00 2001 From: jiangxinmeng1 Date: Tue, 24 Dec 2024 15:11:09 +0800 Subject: [PATCH 2/5] add ut --- pkg/vm/engine/tae/db/test/db_test.go | 34 ++++++++++++++++++++++++++++ 1 file changed, 34 insertions(+) diff --git a/pkg/vm/engine/tae/db/test/db_test.go b/pkg/vm/engine/tae/db/test/db_test.go index 6f869696de8d4..f334884ad7b5d 100644 --- a/pkg/vm/engine/tae/db/test/db_test.go +++ b/pkg/vm/engine/tae/db/test/db_test.go @@ -10572,6 +10572,40 @@ func TestReplayDebugLog(t *testing.T) { tae.Restart(ctx) } +func TestRW2(t *testing.T) { + ctx := context.Background() + opts := config.WithLongScanAndCKPOpts(nil) + tae := testutil.NewTestEngine(ctx, ModuleName, t, opts) + + schema := catalog.MockSchemaAll(1, -1) + schema.Extra.BlockMaxRows = 2 + schema.Extra.ObjectMaxBlocks = 4 + tae.BindSchema(schema) + bat := catalog.MockBatch(schema, 1) + defer bat.Close() + /* + create obj + txn1 delete row/warchecker.add + txn2 drop obj(mock delete) + + txn2 commit + txn1 commit + */ + tae.CreateRelAndAppend(bat, true) + + txn, rel := tae.GetRelation() + obj := testutil.GetOneBlockMeta(rel) + rel.Append(ctx, bat) + { + tbl := obj.GetTable() + txn2, _ := tae.StartTxn(nil) + tbl.DropObjectEntry(obj.ID(), txn2, false) + txn2.ToPreparingLocked(tae.TxnMgr.Now()) + } + err := txn.Commit(ctx) + assert.True(t, moerr.IsMoErrCode(err, moerr.ErrTxnRWConflict)) +} + func Test_BasicTxnModeSwitch(t *testing.T) { ctx := context.Background() opts := config.WithLongScanAndCKPOpts(nil) From 2345e388f1d43ac1ee2e1b74e1adb1069e63b0be Mon Sep 17 00:00:00 2001 From: jiangxinmeng1 Date: Tue, 24 Dec 2024 15:17:32 +0800 Subject: [PATCH 3/5] fix sca problems --- pkg/vm/engine/tae/txn/txnimpl/table.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/vm/engine/tae/txn/txnimpl/table.go b/pkg/vm/engine/tae/txn/txnimpl/table.go index ef9e95bd21310..21c1aa39518d7 100644 --- a/pkg/vm/engine/tae/txn/txnimpl/table.go +++ b/pkg/vm/engine/tae/txn/txnimpl/table.go @@ -163,7 +163,7 @@ func (tbl *txnTable) TransferDeleteIntent( panic(err) } ts := types.BuildTS(time.Now().UTC().UnixNano(), 0) - if err = readWriteConfilictCheck(entry, ts,false); err == nil { + if err = readWriteConfilictCheck(entry, ts, false); err == nil { return } err = nil From 6095bfcbe0521159b44bc584fea5aa3a89b0aadc Mon Sep 17 00:00:00 2001 From: jiangxinmeng1 Date: Tue, 24 Dec 2024 15:29:31 +0800 Subject: [PATCH 4/5] update --- pkg/vm/engine/tae/txn/txnimpl/antidepend.go | 4 ++-- pkg/vm/engine/tae/txn/txnimpl/table.go | 9 ++++++--- 2 files changed, 8 insertions(+), 5 deletions(-) diff --git a/pkg/vm/engine/tae/txn/txnimpl/antidepend.go b/pkg/vm/engine/tae/txn/txnimpl/antidepend.go index be462b1bb88b9..8663a45f895fe 100644 --- a/pkg/vm/engine/tae/txn/txnimpl/antidepend.go +++ b/pkg/vm/engine/tae/txn/txnimpl/antidepend.go @@ -129,7 +129,7 @@ func (checker *warChecker) Insert(obj *catalog.ObjectEntry) { checker.readSet[*obj.ID()] = obj } -func (checker *warChecker) checkOne(id *common.ID, ts types.TS) (err error) { +func (checker *warChecker) checkOne(id *common.ID, ts types.TS, inqueue bool) (err error) { // defer func() { // logutil.Infof("checkOne blk=%s ts=%s err=%v", id.BlockString(), ts.ToString(), err) // }() @@ -141,7 +141,7 @@ func (checker *warChecker) checkOne(id *common.ID, ts types.TS) (err error) { if entry == nil { return } - return readWriteConfilictCheck(entry, ts, false) + return readWriteConfilictCheck(entry, ts, inqueue) } func (checker *warChecker) checkAll(ts types.TS) (err error) { diff --git a/pkg/vm/engine/tae/txn/txnimpl/table.go b/pkg/vm/engine/tae/txn/txnimpl/table.go index 21c1aa39518d7..44c74469e39e4 100644 --- a/pkg/vm/engine/tae/txn/txnimpl/table.go +++ b/pkg/vm/engine/tae/txn/txnimpl/table.go @@ -420,6 +420,7 @@ func (tbl *txnTable) TransferDeletes( if err = tbl.store.warChecker.checkOne( id, ts, + phase == txnif.PrePreparePhase, ); err == nil { continue } @@ -470,6 +471,7 @@ func (tbl *txnTable) recurTransferDelete( pkType *types.Type, depth int, ts types.TS, + phase string, ) error { var page2 *common.PinnedItem[*model.TransferHashPage] @@ -504,7 +506,7 @@ func (tbl *txnTable) recurTransferDelete( err = readWriteConfilictCheck( obj, ts, - false, + phase == txnif.PrePreparePhase, ) if err == nil { pkVec := tbl.store.rt.VectorPool.Small.GetVector(pkType) @@ -555,7 +557,8 @@ func (tbl *txnTable) recurTransferDelete( pk, pkType, depth+1, - ts) + ts, + phase) } func (tbl *txnTable) TransferDeleteRows( @@ -611,7 +614,7 @@ func (tbl *txnTable) TransferDeleteRows( page := pinned.Item() depth := 0 if err = tbl.recurTransferDelete( - memo, page, id, row, pk, pkType, depth, ts); err != nil { + memo, page, id, row, pk, pkType, depth, ts, phase); err != nil { return } From 916ee46e1f3cb6a4cec24106d406a3575e92ad39 Mon Sep 17 00:00:00 2001 From: jiangxinmeng1 Date: Tue, 24 Dec 2024 15:53:43 +0800 Subject: [PATCH 5/5] add ut --- pkg/vm/engine/tae/db/test/db_test.go | 69 ++++++++++++++++++++++++++++ 1 file changed, 69 insertions(+) diff --git a/pkg/vm/engine/tae/db/test/db_test.go b/pkg/vm/engine/tae/db/test/db_test.go index f334884ad7b5d..38d5c8c0e5871 100644 --- a/pkg/vm/engine/tae/db/test/db_test.go +++ b/pkg/vm/engine/tae/db/test/db_test.go @@ -10646,3 +10646,72 @@ func Test_OpenReplayDB1(t *testing.T) { assert.Error(t, db.AddCronJob(tae.DB, "unknown", false)) assert.Error(t, db.CheckCronJobs(tae.DB, db.DBTxnMode_Write)) } + +func TestRW3(t *testing.T) { + ctx := context.Background() + opts := config.WithLongScanAndCKPOpts(nil) + tae := testutil.NewTestEngine(ctx, ModuleName, t, opts) + + objCount := 100 + schema := catalog.MockSchemaAll(1, -1) + schema.Extra.BlockMaxRows = 1 + schema.Extra.ObjectMaxBlocks = 4 + tae.BindSchema(schema) + bat := catalog.MockBatch(schema, objCount) + defer bat.Close() + tae.CreateRelAndAppend(bat, true) + + objs := make([]*catalog.ObjectEntry, 0) + txn, rel := tae.GetRelation() + iter := rel.MakeObjectIt(false) + for iter.Next() { + obj := iter.GetObject().GetMeta().(*catalog.ObjectEntry) + objs = append(objs, obj) + } + assert.Equal(t, objCount, len(objs)) + err := txn.Commit(ctx) + assert.NoError(t, err) + + var wg sync.WaitGroup + + txn, rel = tae.GetRelation() + pkVec := containers.MakeVector(types.T_uint64.ToType(), common.DefaultAllocator) + defer pkVec.Close() + rowIDVec := containers.MakeVector(types.T_Rowid.ToType(), common.DefaultAllocator) + defer rowIDVec.Close() + for i, obj := range objs { + rowID := objectio.NewRowIDWithObjectIDBlkNumAndRowID(*obj.ID(), 0, 0) + rowIDVec.Append(rowID, false) + pkVec.Append(uint64(i), false) + } + err = rel.DeleteByPhyAddrKeys(rowIDVec, pkVec, handle.DT_Normal) + assert.NoError(t, err) + { + deleteObjectFn := func(offset int) func() { + return func() { + defer wg.Done() + txn, err := tae.StartTxn(nil) + assert.NoError(t, err) + obj := objs[offset] + task, err := jobs.NewFlushTableTailTask(nil, txn, []*catalog.ObjectEntry{obj}, nil, tae.Runtime) + assert.NoError(t, err) + err = task.OnExec(context.Background()) + assert.NoError(t, err) + err = txn.Commit(ctx) + assert.NoError(t, err) + } + } + workers, err := ants.NewPool(50) + assert.NoError(t, err) + for i := 0; i < objCount; i++ { + wg.Add(1) + workers.Submit(deleteObjectFn(i)) + } + } + err = txn.Commit(ctx) + assert.NoError(t, err) + wg.Wait() + + tae.CheckRowsByScan(0, true) + +}