Skip to content

Commit 81810d8

Browse files
authored
fix big txn dump (matrixorigin#20900)
修复了workspace 超过10M的大事务不刷盘的问题 Approved by: @triump2020, @XuPeng-SH, @sukki37
1 parent 6881f92 commit 81810d8

File tree

2 files changed

+35
-31
lines changed

2 files changed

+35
-31
lines changed

pkg/vm/engine/disttae/txn.go

+34-31
Original file line numberDiff line numberDiff line change
@@ -493,6 +493,7 @@ func (txn *Transaction) dumpBatchLocked(ctx context.Context, offset int) error {
493493
"WORKSPACE-QUOTA-ACQUIRE",
494494
zap.Uint64("quota", quota),
495495
zap.Uint64("remaining", remaining),
496+
zap.String("txn", txn.op.Txn().DebugString()),
496497
)
497498
txn.writeWorkspaceThreshold += quota
498499
txn.extraWriteWorkspaceThreshold += quota
@@ -513,8 +514,10 @@ func (txn *Transaction) dumpBatchLocked(ctx context.Context, offset int) error {
513514
"WORKSPACE-QUOTA-RELEASE",
514515
zap.Uint64("quota", txn.extraWriteWorkspaceThreshold),
515516
zap.Uint64("remaining", remaining),
517+
zap.String("txn", txn.op.Txn().DebugString()),
516518
)
517519
txn.extraWriteWorkspaceThreshold = 0
520+
txn.writeWorkspaceThreshold = txn.engine.config.writeWorkspaceThreshold
518521
}
519522

520523
if dumpAll {
@@ -544,7 +547,7 @@ func (txn *Transaction) dumpBatchLocked(ctx context.Context, offset int) error {
544547
}
545548

546549
func (txn *Transaction) dumpInsertBatchLocked(ctx context.Context, offset int, size *uint64, pkCount *int) error {
547-
tbSize := make(map[uint64]uint64)
550+
tbSize := make(map[uint64]int)
548551
tbCount := make(map[uint64]int)
549552
skipTable := make(map[uint64]bool)
550553

@@ -556,8 +559,8 @@ func (txn *Transaction) dumpInsertBatchLocked(ctx context.Context, offset int, s
556559
continue
557560
}
558561
if txn.writes[i].typ == INSERT && txn.writes[i].fileName == "" {
562+
tbSize[txn.writes[i].tableId] += txn.writes[i].bat.Size()
559563
tbCount[txn.writes[i].tableId] += txn.writes[i].bat.RowCount()
560-
tbSize[txn.writes[i].tableId] += uint64(txn.writes[i].bat.Size())
561564
}
562565
}
563566

@@ -568,12 +571,12 @@ func (txn *Transaction) dumpInsertBatchLocked(ctx context.Context, offset int, s
568571
sort.Slice(keys, func(i, j int) bool {
569572
return tbSize[keys[i]] < tbSize[keys[j]]
570573
})
571-
sum := uint64(0)
574+
sum := 0
572575
for _, k := range keys {
573576
if tbCount[k] >= txn.engine.config.insertEntryMaxCount {
574577
continue
575578
}
576-
if sum+tbSize[k] >= txn.writeWorkspaceThreshold {
579+
if uint64(sum+tbSize[k]) >= txn.commitWorkspaceThreshold {
577580
break
578581
}
579582
sum += tbSize[k]
@@ -612,11 +615,12 @@ func (txn *Transaction) dumpInsertBatchLocked(ctx context.Context, offset int, s
612615
*size += uint64(bat.Size())
613616
*pkCount += bat.RowCount()
614617
// skip rowid
615-
newBat := batch.NewWithSize(len(bat.Vecs) - 1)
616-
newBat.SetAttributes(bat.Attrs[1:])
617-
newBat.Vecs = bat.Vecs[1:]
618-
newBat.SetRowCount(bat.Vecs[0].Length())
619-
mp[tbKey] = append(mp[tbKey], newBat)
618+
newBatch := batch.NewWithSize(len(bat.Vecs) - 1)
619+
newBatch.SetAttributes(bat.Attrs[1:])
620+
newBatch.Vecs = bat.Vecs[1:]
621+
newBatch.SetRowCount(bat.Vecs[0].Length())
622+
mp[tbKey] = append(mp[tbKey], newBatch)
623+
defer bat.Clean(txn.proc.GetMPool())
620624

621625
keepElement = false
622626
}
@@ -719,12 +723,12 @@ func (txn *Transaction) dumpDeleteBatchLocked(ctx context.Context, offset int, s
719723
deleteCnt += bat.RowCount()
720724
*size += uint64(bat.Size())
721725

722-
newBat := batch.NewWithSize(len(bat.Vecs))
723-
newBat.SetAttributes(bat.Attrs)
724-
newBat.Vecs = bat.Vecs
725-
newBat.SetRowCount(bat.Vecs[0].Length())
726+
newBatch := batch.NewWithSize(len(bat.Vecs))
727+
newBatch.SetAttributes(bat.Attrs)
728+
newBatch.Vecs = bat.Vecs
729+
newBatch.SetRowCount(bat.Vecs[0].Length())
726730

727-
mp[tbKey] = append(mp[tbKey], newBat)
731+
mp[tbKey] = append(mp[tbKey], newBatch)
728732
defer bat.Clean(txn.proc.GetMPool())
729733

730734
keepElement = false
@@ -1308,6 +1312,22 @@ func (txn *Transaction) Commit(ctx context.Context) ([]txn.TxnRequest, error) {
13081312
return nil, nil
13091313
}
13101314

1315+
if err := txn.IncrStatementID(ctx, true); err != nil {
1316+
return nil, err
1317+
}
1318+
1319+
if err := txn.transferTombstonesByCommit(ctx); err != nil {
1320+
return nil, err
1321+
}
1322+
1323+
if err := txn.mergeTxnWorkspaceLocked(ctx); err != nil {
1324+
return nil, err
1325+
}
1326+
if err := txn.dumpBatchLocked(ctx, -1); err != nil {
1327+
return nil, err
1328+
}
1329+
txn.traceWorkspaceLocked(true)
1330+
13111331
if txn.workspaceSize > 10*mpool.MB {
13121332
logutil.Info(
13131333
"BIG-TXN",
@@ -1332,23 +1352,6 @@ func (txn *Transaction) Commit(ctx context.Context) ([]txn.TxnRequest, error) {
13321352
)
13331353
}
13341354

1335-
if err := txn.IncrStatementID(ctx, true); err != nil {
1336-
return nil, err
1337-
}
1338-
1339-
if err := txn.transferTombstonesByCommit(ctx); err != nil {
1340-
return nil, err
1341-
}
1342-
1343-
if err := txn.mergeTxnWorkspaceLocked(ctx); err != nil {
1344-
return nil, err
1345-
}
1346-
if err := txn.dumpBatchLocked(ctx, -1); err != nil {
1347-
return nil, err
1348-
}
1349-
1350-
txn.traceWorkspaceLocked(true)
1351-
13521355
if !txn.hasS3Op.Load() &&
13531356
txn.op.TxnOptions().CheckDupEnabled() {
13541357
if err := txn.checkDup(); err != nil {

pkg/vm/engine/test/disttae_engine_test.go

+1
Original file line numberDiff line numberDiff line change
@@ -1304,6 +1304,7 @@ func TestWorkspaceQuota2(t *testing.T) {
13041304
ctx,
13051305
testutil.TestOptions{},
13061306
t,
1307+
testutil.WithDisttaeEngineCommitWorkspaceThreshold(1),
13071308
testutil.WithDisttaeEngineWriteWorkspaceThreshold(1),
13081309
testutil.WithDisttaeEngineQuota(800),
13091310
)

0 commit comments

Comments
 (0)