Skip to content

Commit

Permalink
abort lock table when tn restart (#20915)
Browse files Browse the repository at this point in the history
abort lock table when tn restart

Approved by: @zhangxu19830126
  • Loading branch information
iamlinjunhong authored Dec 24, 2024
1 parent fa123fd commit 4803a23
Show file tree
Hide file tree
Showing 7 changed files with 218 additions and 132 deletions.
13 changes: 10 additions & 3 deletions pkg/lockservice/lock_table_allocator.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,12 +117,15 @@ func (l *lockTableAllocator) Get(
return l.registerBind(binds, group, tableID, originTableID, sharding)
}

func (l *lockTableAllocator) KeepLockTableBind(serviceID string) bool {
func (l *lockTableAllocator) KeepLockTableBind(
serviceID string,
version uint64,
) bool {
b := l.getServiceBinds(serviceID)
if b == nil {
return false
}
return b.active()
return b.active() && (version == 0 || version == l.version)
}

func (l *lockTableAllocator) AddCannotCommit(values []pb.OrphanTxn) [][]byte {
Expand Down Expand Up @@ -835,12 +838,16 @@ func (l *lockTableAllocator) handleKeepLockTableBind(
req *pb.Request,
resp *pb.Response,
cs morpc.ClientSession) {
resp.KeepLockTableBind.OK = l.KeepLockTableBind(req.KeepLockTableBind.ServiceID)
resp.KeepLockTableBind.OK = l.KeepLockTableBind(
req.KeepLockTableBind.ServiceID,
req.KeepLockTableBind.Version,
)
if !resp.KeepLockTableBind.OK {
// resp.KeepLockTableBind.Status = pb.Status_ServiceCanRestart
writeResponse(l.logger, cancel, resp, nil, cs)
return
}
resp.KeepLockTableBind.Version = l.version
b := l.getServiceBinds(req.KeepLockTableBind.ServiceID)
if b.isStatus(pb.Status_ServiceLockEnable) {
if req.KeepLockTableBind.Status != pb.Status_ServiceLockEnable {
Expand Down
4 changes: 3 additions & 1 deletion pkg/lockservice/lock_table_allocator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,7 +201,9 @@ func TestKeepaliveBind(t *testing.T) {
time.Sleep(time.Millisecond * 20)
}

assert.True(t, a.KeepLockTableBind("s1"))
assert.True(t, a.KeepLockTableBind("s1", 0))

assert.False(t, a.KeepLockTableBind("s1", 1))
})
}

Expand Down
2 changes: 2 additions & 0 deletions pkg/lockservice/lock_table_keeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ func (k *lockTableKeeper) doKeepLockTableBind(ctx context.Context) {
req.Method = pb.Method_KeepLockTableBind
req.KeepLockTableBind.ServiceID = k.serviceID
req.KeepLockTableBind.Status = k.service.getStatus()
req.KeepLockTableBind.Version = k.service.tnVersion
if !k.service.isStatus(pb.Status_ServiceLockEnable) {
req.KeepLockTableBind.LockTables = k.service.topGroupTables()
req.KeepLockTableBind.TxnIDs = k.service.activeTxnHolder.getAllTxnID()
Expand All @@ -200,6 +201,7 @@ func (k *lockTableKeeper) doKeepLockTableBind(ctx context.Context) {
defer releaseResponse(resp)

if resp.KeepLockTableBind.OK {
k.service.tnVersion = resp.KeepLockTableBind.Version
switch resp.KeepLockTableBind.Status {
case pb.Status_ServiceLockEnable:
if !k.service.isStatus(pb.Status_ServiceLockEnable) {
Expand Down
2 changes: 2 additions & 0 deletions pkg/lockservice/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ type service struct {
stopOnce sync.Once
fetchWhoWaitingListC chan who
logger *log.MOLogger
tnVersion uint64

remote struct {
client Client
Expand Down Expand Up @@ -96,6 +97,7 @@ func NewLockService(
stopper.WithLogger(getLogger(cfg.ServiceID).RawLogger())),
fetchWhoWaitingListC: make(chan who, 10240),
logger: getLogger(cfg.ServiceID),
tnVersion: 0,
}

for _, opt := range opts {
Expand Down
2 changes: 1 addition & 1 deletion pkg/lockservice/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,7 @@ type LockTableAllocator interface {
// KeepLockTableBind once a cn is bound to a Table, a heartbeat needs to be sent
// periodically to keep the binding in place. If no heartbeat is sent for a long
// period of time to maintain the binding, the binding will become invalid.
KeepLockTableBind(serviceID string) bool
KeepLockTableBind(serviceID string, version uint64) bool
// Valid check for changes in the binding relationship of a specific lock-table.
Valid(serviceID string, txnID []byte, binds []pb.LockTable) ([]uint64, error)
// AddCannotCommit add cannot commit txn.
Expand Down
Loading

0 comments on commit 4803a23

Please sign in to comment.