Skip to content
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 37 additions & 0 deletions pkg/lockservice/lock_table_remote.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,15 @@ import (
"go.uber.org/zap"
)

var (
	// Backoff parameters for remote lock table retry loops (unlock, getLock).
	// These prevent CPU spinning when the remote lock service is temporarily
	// unavailable.
	remoteRetryInitialBackoff = 100 * time.Millisecond // first sleep after a failed attempt; doubled on each retry
	remoteRetryMaxBackoff     = 5 * time.Second        // upper bound on a single sleep between attempts
	remoteRetryMaxDuration    = 30 * time.Second       // total retry budget; the loop gives up once this much time has elapsed
)

// remoteLockTable the lock corresponding to the Table is managed by a remote LockTable.
// And the remoteLockTable acts as a proxy for this LockTable locally.
type remoteLockTable struct {
Expand Down Expand Up @@ -150,6 +159,8 @@ func (l *remoteLockTable) unlock(
txn,
l.bind,
)
backoff := remoteRetryInitialBackoff
deadline := time.Now().Add(remoteRetryMaxDuration)
for {
err := l.doUnlock(txn, commitTS, mutations...)
if err == nil {
Expand All @@ -171,13 +182,27 @@ func (l *remoteLockTable) unlock(
if err := l.handleError(err, false); err == nil {
return
}
if time.Now().After(deadline) {
l.logger.Error("unlock retry budget exhausted, giving up",
zap.Uint64("table-id", l.bind.Table),
zap.String("txn", hex.EncodeToString(txn.txnID)),
zap.Duration("budget", remoteRetryMaxDuration))
return
}
time.Sleep(backoff)
backoff *= 2
if backoff > remoteRetryMaxBackoff {
backoff = remoteRetryMaxBackoff
}
}
}

func (l *remoteLockTable) getLock(
key []byte,
txn pb.WaitTxn,
fn func(Lock)) {
backoff := remoteRetryInitialBackoff
deadline := time.Now().Add(remoteRetryMaxDuration)
for {
lock, ok, err := l.doGetLock(key, txn)
if err == nil {
Expand All @@ -192,6 +217,18 @@ func (l *remoteLockTable) getLock(
if err = l.handleError(err, false); err == nil {
return
}
if time.Now().After(deadline) {
l.logger.Error("getLock retry budget exhausted, giving up",
zap.Uint64("table-id", l.bind.Table),
zap.String("txn", hex.EncodeToString(txn.TxnID)),
zap.Duration("budget", remoteRetryMaxDuration))
return
}
time.Sleep(backoff)
backoff *= 2
if backoff > remoteRetryMaxBackoff {
backoff = remoteRetryMaxBackoff
}
}
}

Expand Down
6 changes: 5 additions & 1 deletion pkg/sql/colexec/lockop/lock_op.go
Original file line number Diff line number Diff line change
Expand Up @@ -869,8 +869,12 @@ func waitToRetryLock(ctx context.Context, wait time.Duration) bool {
}

func getRetryWaitDuration(err error, retryState *lockRetryState) (time.Duration, bool) {
// When backend budget is disabled, only non-backend errors may retry.
if defaultMaxWaitTimeOnRetryBackendLock <= 0 {
return 0, false
if isBoundedRetryLockError(err) {
return 0, false
}
return defaultWaitTimeOnRetryLock, true
}

now := time.Now()
Expand Down
52 changes: 52 additions & 0 deletions pkg/sql/colexec/lockop/lock_op_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,6 +404,58 @@ func TestLockWithRetryFailsFastWhenBackendRetryBudgetDisabled(t *testing.T) {
require.Less(t, time.Since(start), defaultWaitTimeOnRetryLock)
}

// TestLockWithRetryStillRetriesBindChangeWhenBackendBudgetDisabled verifies that
// disabling the backend retry budget (setting it to 0) only prevents retries for
// backend availability errors, not for normal retryable errors like
// ErrLockTableBindChanged.
func TestLockWithRetryStillRetriesBindChangeWhenBackendBudgetDisabled(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()

lockSvc := mock_lock.NewMockLockService(ctrl)
txnOp := mock_frontend.NewMockTxnOperator(ctrl)

oldWait := defaultWaitTimeOnRetryLock
oldBudget := defaultMaxWaitTimeOnRetryBackendLock
defaultWaitTimeOnRetryLock = 50 * time.Millisecond
defaultMaxWaitTimeOnRetryBackendLock = 0
defer func() {
defaultWaitTimeOnRetryLock = oldWait
defaultMaxWaitTimeOnRetryBackendLock = oldBudget
}()

ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
defer cancel()

gomock.InOrder(
// First call: ErrLockTableBindChanged should still be retried
lockSvc.EXPECT().
Lock(ctx, uint64(1), gomock.Nil(), []byte("txn1"), lock.LockOptions{}).
Return(lock.Result{}, moerr.NewLockTableBindChangedNoCtx()),
txnOp.EXPECT().HasLockTable(uint64(1)).Return(false),
// Second call: succeed
lockSvc.EXPECT().
Lock(ctx, uint64(1), gomock.Nil(), []byte("txn1"), lock.LockOptions{}).
Return(lock.Result{Timestamp: timestamp.Timestamp{PhysicalTime: 1}}, nil),
)

result, err := lockWithRetry(
ctx,
lockSvc,
1,
nil,
[]byte("txn1"),
lock.LockOptions{},
txnOp,
nil,
nil,
LockOptions{},
types.Type{},
)
require.NoError(t, err)
require.Equal(t, int64(1), result.Timestamp.PhysicalTime)
}

func TestLockWithRetryRetriesInsideLoopAndReturnsSecondResult(t *testing.T) {
ctrl := gomock.NewController(t)
defer ctrl.Finish()
Expand Down
19 changes: 19 additions & 0 deletions pkg/txn/service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,6 +238,11 @@ func (s *service) parallelSendWithRetry(
ctx context.Context,
requests []txn.TxnRequest,
ignoreTxnErrorCodes map[uint16]struct{}) *rpc.SendResult {
const (
initialBackoff = 100 * time.Millisecond
maxBackoff = time.Second
)
backoff := initialBackoff
for {
select {
case <-ctx.Done():
Expand All @@ -248,8 +253,22 @@ func (s *service) parallelSendWithRetry(
if err != nil {
err = moerr.AttachCause(ctx, err)
util.LogTxnSendRequestsFailed(s.logger, requests, err)
timer := time.NewTimer(backoff)
select {
case <-ctx.Done():
timer.Stop()
return nil
case <-timer.C:
}
if backoff < maxBackoff {
backoff *= 2
if backoff > maxBackoff {
backoff = maxBackoff
}
}
continue
}
backoff = initialBackoff
util.LogTxnReceivedResponses(s.logger, result.Responses)
hasError := false
for _, resp := range result.Responses {
Expand Down
Loading