Skip to content

Commit

Permalink
address comments
Browse files Browse the repository at this point in the history
Signed-off-by: AndreMouche <[email protected]>
  • Loading branch information
AndreMouche committed Jan 3, 2025
1 parent bc5287d commit 0283049
Showing 1 changed file with 15 additions and 9 deletions.
24 changes: 15 additions & 9 deletions txnkv/transaction/prewrite.go
Original file line number Diff line number Diff line change
Expand Up @@ -301,7 +301,10 @@ type prewrite1BatchReqHandler struct {
sender *locate.RegionRequestSender
resolvingRecordToken *int
attempts int
begin time.Time
// begin is the time when the first request is sent,
// it will be reset once the total duration exceeds the slowRequestThreshold
// It's used to log slow prewrite requests.
begin time.Time
}

func (action actionPrewrite) newSingleBatchPrewriteReqHandler(c *twoPhaseCommitter, batch batchMutations, bo *retry.Backoffer) *prewrite1BatchReqHandler {
Expand Down Expand Up @@ -342,27 +345,30 @@ func (handler *prewrite1BatchReqHandler) drop(err error) {
}
}

func (handler *prewrite1BatchReqHandler) beforeSend() {
func (handler *prewrite1BatchReqHandler) beforeSend(reqBegin time.Time) {
handler.attempts++
if handler.action.hasRpcRetries {
handler.req.IsRetryRequest = true
}
reqBegin := time.Now()
if handler.attempts == 1 {
return
}
if reqBegin.Sub(handler.begin) > slowRequestThreshold {
logutil.BgLogger().Warn(
"slow prewrite request",
zap.Uint64("startTS", handler.committer.startTS),
zap.Stringer("region", &handler.batch.region),
zap.Int("attempts", handler.attempts),
)
handler.begin = time.Now()
handler.begin = reqBegin
}
}

// sendAndCheckReq sends the prewrite request to the TiKV server and check the response.
// If the TiKV server returns a retryable error, the function returns true. Otherwise, it returns false.
func (handler *prewrite1BatchReqHandler) sendReqAndCheck() (retryable bool, err error) {
handler.beforeSend()
reqBegin := time.Now()
handler.beforeSend(reqBegin)
resp, retryTimes, err := handler.sender.SendReq(handler.bo, handler.req, handler.batch.region, client.ReadTimeoutShort)
// Unexpected error occurs, return it directly.
if err != nil {
Expand All @@ -385,7 +391,7 @@ func (handler *prewrite1BatchReqHandler) sendReqAndCheck() (retryable bool, err
prewriteResp := resp.Resp.(*kvrpcpb.PrewriteResponse)
keyErrs := prewriteResp.GetErrors()
if len(keyErrs) == 0 {
return false, handler.handleSingleBatchSucceed(prewriteResp)
return false, handler.handleSingleBatchSucceed(reqBegin, prewriteResp)
}

locks, e := handler.extractKeyErrs(keyErrs)
Expand Down Expand Up @@ -520,12 +526,12 @@ func (handler *prewrite1BatchReqHandler) resolveLocks(locks []*txnlock.Lock) err
}

// handleSingleBatchSucceed handles the response when the prewrite request is successful.
func (handler *prewrite1BatchReqHandler) handleSingleBatchSucceed(prewriteResp *kvrpcpb.PrewriteResponse) error {
func (handler *prewrite1BatchReqHandler) handleSingleBatchSucceed(reqBegin time.Time, prewriteResp *kvrpcpb.PrewriteResponse) error {
// Clear the RPC Error since the request is evaluated successfully.
handler.sender.SetRPCError(nil)

// Update CommitDetails
reqDuration := time.Since(handler.begin)
reqDuration := time.Since(reqBegin)
handler.committer.getDetail().MergePrewriteReqDetails(
reqDuration,
handler.batch.region.GetID(),
Expand All @@ -539,7 +545,7 @@ func (handler *prewrite1BatchReqHandler) handleSingleBatchSucceed(prewriteResp *
// In this case 1PC is not expected to be used, but still check it for safety.
if int64(handler.committer.txnSize) > config.GetGlobalConfig().TiKVClient.TTLRefreshedTxnSize &&
prewriteResp.OnePcCommitTs == 0 {
handler.committer.run(handler.committer, nil, false)
handler.committer.ttlManager.run(handler.committer, nil, false)
}
}
if handler.committer.isOnePC() {
Expand Down

0 comments on commit 0283049

Please sign in to comment.