
Commit a217137

Merge branch 'release-6.5' into cherry-pick-8703-to-release-6.5
2 parents 5b6e4bc + dcb426b

8 files changed: +117 -22 lines changed

OWNERS (+1)

@@ -22,5 +22,6 @@ reviewers:
 - BusyJay
 - howardlau1999
 - Luffbee
+- okJiang
 - shafreeck
 - xhebox

client/client.go (+13 -5)

@@ -731,14 +731,22 @@ func (c *client) handleDispatcher(
     if dc == globalDCLocation {
         go func() {
             var updateTicker = &time.Ticker{}
-            setNewUpdateTicker := func(ticker *time.Ticker) {
+            setNewUpdateTicker := func(interval time.Duration) {
                 if updateTicker.C != nil {
                     updateTicker.Stop()
                 }
-                updateTicker = ticker
+                if interval == 0 {
+                    updateTicker = &time.Ticker{}
+                } else {
+                    updateTicker = time.NewTicker(interval)
+                }
+            }
+            // If the TSO Follower Proxy is enabled, set the update interval to the member update interval.
+            if c.option.getEnableTSOFollowerProxy() {
+                setNewUpdateTicker(memberUpdateInterval)
             }
             // Set to nil before returning to ensure that the existing ticker can be GC.
-            defer setNewUpdateTicker(nil)
+            defer setNewUpdateTicker(0)

             for {
                 select {
@@ -749,11 +757,11 @@ func (c *client) handleDispatcher(
                 if enableTSOFollowerProxy && updateTicker.C == nil {
                     // Because the TSO Follower Proxy is enabled,
                     // the periodic check needs to be performed.
-                    setNewUpdateTicker(time.NewTicker(memberUpdateInterval))
+                    setNewUpdateTicker(memberUpdateInterval)
                 } else if !enableTSOFollowerProxy && updateTicker.C != nil {
                     // Because the TSO Follower Proxy is disabled,
                     // the periodic check needs to be turned off.
-                    setNewUpdateTicker(&time.Ticker{})
+                    setNewUpdateTicker(0)
                 } else {
                     // The status of TSO Follower Proxy does not change, and updateConnectionCtxs is not triggered
                     continue
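
Note: the change above replaces the ticker-passing helper with an interval-based one. A zero-value time.Ticker has a nil C channel, and receiving from a nil channel blocks forever, which quietly disables the periodic branch of a select; the C != nil guard avoids calling Stop on a ticker that was never started. A minimal standalone sketch of the same pattern (illustrative names, not PD's API):

package main

import (
    "fmt"
    "time"
)

func main() {
    // A zero-value Ticker has a nil C channel; a receive on nil blocks
    // forever, so the periodic case of the select below never fires.
    ticker := &time.Ticker{}

    // setTicker stops the running ticker (if any) and installs a new
    // one; an interval of 0 means "disabled".
    setTicker := func(interval time.Duration) {
        if ticker.C != nil {
            ticker.Stop()
        }
        if interval == 0 {
            ticker = &time.Ticker{}
        } else {
            ticker = time.NewTicker(interval)
        }
    }

    setTicker(50 * time.Millisecond)
    defer setTicker(0) // stop the live ticker so it can be GCed

    deadline := time.After(175 * time.Millisecond)
    for {
        select {
        case <-ticker.C:
            fmt.Println("periodic check")
        case <-deadline:
            return
        }
    }
}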

pkg/schedule/schedulers/OWNERS (-7)

This file was deleted.

plugin/scheduler_example/evict_leader.go (+14 -2)

@@ -274,9 +274,21 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R
         args = append(args, handler.config.getRanges(id)...)
     }

-    handler.config.BuildWithArgs(args)
-    err := handler.config.Persist()
+    err := handler.config.BuildWithArgs(args)
     if err != nil {
+        handler.config.mu.Lock()
+        handler.config.cluster.ResumeLeaderTransfer(id)
+        handler.config.mu.Unlock()
+        handler.rd.JSON(w, http.StatusBadRequest, err.Error())
+        return
+    }
+
+    err = handler.config.Persist()
+    if err != nil {
+        handler.config.mu.Lock()
+        delete(handler.config.StoreIDWitRanges, id)
+        handler.config.cluster.ResumeLeaderTransfer(id)
+        handler.config.mu.Unlock()
         handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
     }
     handler.rd.JSON(w, http.StatusOK, nil)
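
Note: the shape of this fix (repeated in the two server/schedulers handlers further down) is rollback-on-error: a failed BuildWithArgs only needs to resume leader transfers for the store, while a failed Persist must also drop the freshly inserted in-memory entry, all under the config mutex. A self-contained sketch of that shape (all types and names here are illustrative stand-ins, not PD's):

package main

import (
    "errors"
    "fmt"
    "sync"
)

// Illustrative stand-ins for the scheduler config and cluster.
type cluster struct{}

func (c *cluster) resumeLeaderTransfer(id uint64) {
    fmt.Println("resumed leader transfer for store", id)
}

type config struct {
    mu          sync.Mutex
    cluster     *cluster
    storeRanges map[uint64][]string
}

func (c *config) buildWithArgs(args []string) error {
    if len(args) < 1 {
        return errors.New("missing store id")
    }
    return nil // would record storeRanges[id] here
}

func (c *config) persist() error {
    return errors.New("persist failed (simulated)")
}

// updateConfig mirrors the rollback shape of the diff: a build failure
// resumes leader transfer; a persist failure additionally removes the
// in-memory entry, all under the config mutex.
func (c *config) updateConfig(id uint64, args []string) error {
    if err := c.buildWithArgs(args); err != nil {
        c.mu.Lock()
        c.cluster.resumeLeaderTransfer(id)
        c.mu.Unlock()
        return err // handler would reply 400 Bad Request
    }
    if err := c.persist(); err != nil {
        c.mu.Lock()
        delete(c.storeRanges, id) // undo the in-memory update
        c.cluster.resumeLeaderTransfer(id)
        c.mu.Unlock()
        return err // handler would reply 500 Internal Server Error
    }
    return nil
}

func main() {
    c := &config{cluster: &cluster{}, storeRanges: map[uint64][]string{2: {"a", "b"}}}
    fmt.Println(c.updateConfig(2, []string{"2"}))
}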

server/grpc_service.go (+8 -3)

@@ -227,13 +227,18 @@ type tsoRequest struct {
 }

 func (s *GrpcServer) dispatchTSORequest(ctx context.Context, request *tsoRequest, forwardedHost string, doneCh <-chan struct{}, errCh chan<- error) {
-    tsoRequestChInterface, loaded := s.tsoDispatcher.LoadOrStore(forwardedHost, make(chan *tsoRequest, maxMergeTSORequests))
+    val, loaded := s.tsoDispatcher.Load(forwardedHost)
+    if !loaded {
+        val = make(chan *tsoRequest, maxMergeTSORequests)
+        val, loaded = s.tsoDispatcher.LoadOrStore(forwardedHost, val)
+    }
+    reqCh := val.(chan *tsoRequest)
     if !loaded {
         tsDeadlineCh := make(chan deadline, 1)
-        go s.handleDispatcher(ctx, forwardedHost, tsoRequestChInterface.(chan *tsoRequest), tsDeadlineCh, doneCh, errCh)
+        go s.handleDispatcher(ctx, forwardedHost, reqCh, tsDeadlineCh, doneCh, errCh)
         go watchTSDeadline(ctx, tsDeadlineCh)
     }
-    tsoRequestChInterface.(chan *tsoRequest) <- request
+    reqCh <- request
 }

 func (s *GrpcServer) handleDispatcher(ctx context.Context, forwardedHost string, tsoRequestCh <-chan *tsoRequest, tsDeadlineCh chan<- deadline, doneCh <-chan struct{}, errCh chan<- error) {
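
Note: the old code called sync.Map.LoadOrStore unconditionally, which allocates a fresh buffered channel on every TSO request even when the dispatcher for that host already exists. Doing a cheap Load first and falling back to LoadOrStore only on a miss keeps the hot path allocation-free while still starting the consumer goroutine exactly once per key. A minimal sketch of the pattern (illustrative names):

package main

import (
    "fmt"
    "sync"
)

var dispatchers sync.Map // forwardedHost -> chan int

// getCh returns the request channel for key; started reports whether
// this call created the entry, so the caller starts the consumer
// goroutine exactly once per key.
func getCh(key string) (ch chan int, started bool) {
    // Fast path: no allocation when the dispatcher already exists.
    if val, ok := dispatchers.Load(key); ok {
        return val.(chan int), false
    }
    // Slow path: allocate and race through LoadOrStore; if another
    // goroutine won the race, we get its channel back (loaded == true).
    val, loaded := dispatchers.LoadOrStore(key, make(chan int, 16))
    return val.(chan int), !loaded
}

func main() {
    ch, started := getCh("host-a")
    done := make(chan struct{})
    if started {
        go func() { // the per-key consumer, started exactly once
            fmt.Println("dispatched:", <-ch)
            close(done)
        }()
    }
    ch <- 42
    <-done
}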

server/schedulers/evict_leader.go (+12 -2)

@@ -98,6 +98,9 @@ func (conf *evictLeaderSchedulerConfig) getStores() []uint64 {
 }

 func (conf *evictLeaderSchedulerConfig) BuildWithArgs(args []string) error {
+    failpoint.Inject("buildWithArgsErr", func() {
+        failpoint.Return(errors.New("fail to build with args"))
+    })
     if len(args) < 1 {
         return errs.ErrSchedulerConfig.FastGenByArgs("id")
     }
@@ -390,8 +393,15 @@ func (handler *evictLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R
         args = append(args, handler.config.getRanges(id)...)
     }

-    handler.config.BuildWithArgs(args)
-    err := handler.config.Persist()
+    err := handler.config.BuildWithArgs(args)
+    if err != nil {
+        handler.config.mu.Lock()
+        handler.config.cluster.ResumeLeaderTransfer(id)
+        handler.config.mu.Unlock()
+        handler.rd.JSON(w, http.StatusBadRequest, err.Error())
+        return
+    }
+    err = handler.config.Persist()
     if err != nil {
         handler.config.removeStore(id)
         handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
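
Note: the failpoint.Inject block is a no-op until the failpoint rewriter has transformed the source (PD wires this up through a make target) and the failpoint has been activated at run time. After rewriting, the Inject call behaves roughly like the guard sketched below, which uses only the runtime half of github.com/pingcap/failpoint and so runs without the rewriter (the short failpoint name here is for illustration):

package main

import (
    "errors"
    "fmt"

    "github.com/pingcap/failpoint"
)

// buildWithArgs sketches how the injected failpoint behaves: when the
// failpoint is enabled, the function short-circuits with the test error.
func buildWithArgs(args []string) error {
    if _, err := failpoint.Eval("example/buildWithArgsErr"); err == nil {
        // Failpoint active: fail exactly as the injected code does.
        return errors.New("fail to build with args")
    }
    if len(args) < 1 {
        return errors.New("missing id")
    }
    return nil
}

func main() {
    fmt.Println(buildWithArgs([]string{"1"})) // <nil>: failpoint inactive

    // Tests enable it by full package path, e.g.
    // github.com/tikv/pd/server/schedulers/buildWithArgsErr.
    _ = failpoint.Enable("example/buildWithArgsErr", "return(true)")
    fmt.Println(buildWithArgs([]string{"1"})) // fail to build with args
    _ = failpoint.Disable("example/buildWithArgsErr")
}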

server/schedulers/grant_leader.go (+10 -3)

@@ -294,10 +294,17 @@ func (handler *grantLeaderHandler) UpdateConfig(w http.ResponseWriter, r *http.R
         args = append(args, handler.config.getRanges(id)...)
     }

-    handler.config.BuildWithArgs(args)
-    err := handler.config.Persist()
+    err := handler.config.BuildWithArgs(args)
     if err != nil {
-        handler.config.removeStore(id)
+        handler.config.mu.Lock()
+        handler.config.cluster.ResumeLeaderTransfer(id)
+        handler.config.mu.Unlock()
+        handler.rd.JSON(w, http.StatusBadRequest, err.Error())
+        return
+    }
+    err = handler.config.Persist()
+    if err != nil {
+        _, _ = handler.config.removeStore(id)
         handler.rd.JSON(w, http.StatusInternalServerError, err.Error())
         return
     }

tests/pdctl/scheduler/scheduler_test.go (+59)

@@ -20,6 +20,7 @@ import (
     "testing"
     "time"

+    "github.com/pingcap/failpoint"
     "github.com/pingcap/kvproto/pkg/metapb"
     "github.com/stretchr/testify/require"
     "github.com/tikv/pd/pkg/testutil"
@@ -480,3 +481,61 @@ func TestScheduler(t *testing.T) {
     echo = mustExec([]string{"-u", pdAddr, "scheduler", "remove", "split-bucket-scheduler"}, nil)
     re.Contains(echo, "Success!")
 }
+
+func TestEvictLeaderScheduler(t *testing.T) {
+    re := require.New(t)
+    ctx, cancel := context.WithCancel(context.Background())
+    defer cancel()
+    cluster, err := tests.NewTestCluster(ctx, 1)
+    re.NoError(err)
+    defer cluster.Destroy()
+    err = cluster.RunInitialServers()
+    re.NoError(err)
+    cluster.WaitLeader()
+    pdAddr := cluster.GetConfig().GetClientURL()
+    cmd := pdctlCmd.GetRootCmd()
+
+    stores := []*metapb.Store{
+        {
+            Id:            1,
+            State:         metapb.StoreState_Up,
+            LastHeartbeat: time.Now().UnixNano(),
+        },
+        {
+            Id:            2,
+            State:         metapb.StoreState_Up,
+            LastHeartbeat: time.Now().UnixNano(),
+        },
+        {
+            Id:            3,
+            State:         metapb.StoreState_Up,
+            LastHeartbeat: time.Now().UnixNano(),
+        },
+        {
+            Id:            4,
+            State:         metapb.StoreState_Up,
+            LastHeartbeat: time.Now().UnixNano(),
+        },
+    }
+    leaderServer := cluster.GetServer(cluster.GetLeader())
+    re.NoError(leaderServer.BootstrapCluster())
+    for _, store := range stores {
+        pdctl.MustPutStore(re, leaderServer.GetServer(), store)
+    }
+
+    pdctl.MustPutRegion(re, cluster, 1, 1, []byte("a"), []byte("b"))
+    output, err := pdctl.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "2"}...)
+    re.NoError(err)
+    re.Contains(string(output), "Success!")
+    failpoint.Enable("github.com/tikv/pd/server/schedulers/buildWithArgsErr", "return(true)")
+    output, err = pdctl.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}...)
+    re.NoError(err)
+    re.Contains(string(output), "fail to build with args")
+    failpoint.Disable("github.com/tikv/pd/server/schedulers/buildWithArgsErr")
+    output, err = pdctl.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "remove", "evict-leader-scheduler"}...)
+    re.NoError(err)
+    re.Contains(string(output), "Success!")
+    output, err = pdctl.ExecuteCommand(cmd, []string{"-u", pdAddr, "scheduler", "add", "evict-leader-scheduler", "1"}...)
+    re.NoError(err)
+    re.Contains(string(output), "Success!")
+}
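
Note: the test addresses the failpoint by its full Go package path plus the name passed to failpoint.Inject (github.com/tikv/pd/server/schedulers/buildWithArgsErr); since the test cluster runs in the same process, Enable and Disable toggle the server-side failpoint directly. Re-adding the scheduler for store 1 after Disable is the actual check: it would fail if the earlier failed attempt had left leader transfers for that store paused.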
