Skip to content

Commit

Permalink
txn-file: Fix missing txn chunks in prewrite (#1350)
Browse files Browse the repository at this point in the history
Signed-off-by: Ping Yu <[email protected]>
  • Loading branch information
pingyu authored May 27, 2024
1 parent 40e651b commit e36f29c
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 380 deletions.
45 changes: 24 additions & 21 deletions txnkv/transaction/txn_file.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ import (
"github.com/tikv/client-go/v2/metrics"
"github.com/tikv/client-go/v2/tikvrpc"
"github.com/tikv/client-go/v2/txnkv/txnlock"
"github.com/tikv/client-go/v2/util"
atomicutil "go.uber.org/atomic"
"go.uber.org/zap"
)
Expand Down Expand Up @@ -142,20 +141,26 @@ func (cs *txnChunkSlice) len() int {
// []chunkBatch is sorted by region.StartKey.
// Note: regions may be overlapping.
func (cs *txnChunkSlice) groupToBatches(c *locate.RegionCache, bo *retry.Backoffer) ([]chunkBatch, error) {
batchMap := make(map[locate.RegionVerID]*chunkBatch)
// Do not use `locate.RegionVerID` as map key to avoid grouping chunks to different batches when `confVer` changes.
type batchMapKey struct {
regionID uint64
regionVer uint64
}
batchMap := make(map[batchMapKey]*chunkBatch)
for i, chunkRange := range cs.chunkRanges {
regions, err := chunkRange.getOverlapRegions(c, bo)
if err != nil {
return nil, errors.WithStack(err)
}

for _, r := range regions {
if batchMap[r.Region] == nil {
batchMap[r.Region] = &chunkBatch{
key := batchMapKey{regionID: r.Region.GetID(), regionVer: r.Region.GetVer()}
if batchMap[key] == nil {
batchMap[key] = &chunkBatch{
region: r,
}
}
batchMap[r.Region].append(cs.chunkIDs[i], chunkRange)
batchMap[key].append(cs.chunkIDs[i], chunkRange)
}
}

Expand All @@ -164,10 +169,17 @@ func (cs *txnChunkSlice) groupToBatches(c *locate.RegionCache, bo *retry.Backoff
batches = append(batches, *batch)
}
sort.Slice(batches, func(i, j int) bool {
return bytes.Compare(batches[i].region.StartKey, batches[j].region.StartKey) < 0
// Sort by both chunks and region, to make sure that primary key is in the first batch:
// 1. Different batches may contain the same chunks.
// 2. Different batches may have regions with same start key (if region merge happens during grouping).
cmp := bytes.Compare(batches[i].region.StartKey, batches[j].region.StartKey)
if cmp == 0 {
return bytes.Compare(batches[i].Smallest(), batches[j].Smallest()) < 0
}
return cmp < 0
})

logutil.Logger(bo.GetCtx()).Debug("txn file group to batches", zap.Stringers("batches", batches))
logutil.Logger(bo.GetCtx()).Info("txn file group to batches", zap.Stringers("batches", batches))
return batches, nil
}

Expand Down Expand Up @@ -553,7 +565,7 @@ func (c *twoPhaseCommitter) executeTxnFile(ctx context.Context) (err error) {
return
}

func (c *twoPhaseCommitter) executeTxnFileSlice(bo *retry.Backoffer, chunkSlice txnChunkSlice, batches []chunkBatch, action txnFileAction, successRanges *util.MergeRanges) (txnChunkSlice, error) {
func (c *twoPhaseCommitter) executeTxnFileSlice(bo *retry.Backoffer, chunkSlice txnChunkSlice, batches []chunkBatch, action txnFileAction) (txnChunkSlice, error) {
var err error
var regionErrChunks txnChunkSlice

Expand All @@ -565,10 +577,6 @@ func (c *twoPhaseCommitter) executeTxnFileSlice(bo *retry.Backoffer, chunkSlice
}

for _, batch := range batches {
if successRanges.Covered(batch.region.StartKey, batch.region.EndKey) {
continue
}

resp, err1 := action.executeBatch(c, bo, batch)
logutil.Logger(bo.GetCtx()).Debug("txn file: execute batch finished",
zap.Uint64("startTS", c.startTS),
Expand Down Expand Up @@ -605,18 +613,16 @@ func (c *twoPhaseCommitter) executeTxnFileSlice(bo *retry.Backoffer, chunkSlice
regionErrChunks.appendSlice(&batch.txnChunkSlice)
continue
}

successRanges.Insert(batch.region.StartKey, batch.region.EndKey)
}
return regionErrChunks, nil
}

func (c *twoPhaseCommitter) executeTxnFileSliceWithRetry(bo *retry.Backoffer, chunkSlice txnChunkSlice, batches []chunkBatch, action txnFileAction, successRanges *util.MergeRanges) error {
func (c *twoPhaseCommitter) executeTxnFileSliceWithRetry(bo *retry.Backoffer, chunkSlice txnChunkSlice, batches []chunkBatch, action txnFileAction) error {
currentChunks := chunkSlice
currentBatches := batches
for {
var regionErrChunks txnChunkSlice
regionErrChunks, err := c.executeTxnFileSlice(bo, currentChunks, currentBatches, action, successRanges)
regionErrChunks, err := c.executeTxnFileSlice(bo, currentChunks, currentBatches, action)
if err != nil {
return errors.WithStack(err)
}
Expand Down Expand Up @@ -687,18 +693,15 @@ func (c *twoPhaseCommitter) executeTxnFileAction(bo *retry.Backoffer, chunkSlice
if len(secondaries) == 0 {
return nil
}
primaryRegion := batches[0].region
successRanges := util.NewMergeRanges()
successRanges.Insert(primaryRegion.StartKey, primaryRegion.EndKey)
var emptySlice txnChunkSlice
if !action.asyncExecuteSecondaries() {
return c.executeTxnFileSliceWithRetry(bo, emptySlice, secondaries, action, successRanges)
return c.executeTxnFileSliceWithRetry(bo, emptySlice, secondaries, action)
}

c.store.WaitGroup().Add(1)
errGo := c.store.Go(func() {
defer c.store.WaitGroup().Done()
err := c.executeTxnFileSliceWithRetry(bo, emptySlice, secondaries, action, successRanges)
err := c.executeTxnFileSliceWithRetry(bo, emptySlice, secondaries, action)
logutil.Logger(bo.GetCtx()).Debug("txn file: async execute secondaries finished",
zap.Uint64("startTS", c.startTS),
zap.Stringer("action", action),
Expand Down
134 changes: 0 additions & 134 deletions util/merge_ranges.go

This file was deleted.

Loading

0 comments on commit e36f29c

Please sign in to comment.