@@ -25,13 +25,11 @@ import (
 	"sync"
 	"time"
 
-	"github.com/pingcap/errors"
 	"github.com/pingcap/failpoint"
 	"github.com/pingcap/kvproto/pkg/pdpb"
 	"github.com/pingcap/log"
 	"github.com/tikv/pd/pkg/election"
 	"github.com/tikv/pd/pkg/errs"
-	"github.com/tikv/pd/pkg/mcs/utils/constant"
 	"github.com/tikv/pd/pkg/member"
 	"github.com/tikv/pd/pkg/slice"
 	"github.com/tikv/pd/pkg/storage/endpoint"
@@ -249,30 +247,6 @@ func (am *AllocatorManager) SetUpGlobalAllocator(ctx context.Context, leadership
 	}
 }
 
-// setUpLocalAllocator is used to set up an allocator, which will initialize the allocator and put it into allocator daemon.
-// One TSO Allocator should only be set once, and may be initialized and reset multiple times depending on the election.
-func (am *AllocatorManager) setUpLocalAllocator(parentCtx context.Context, dcLocation string, leadership *election.Leadership) {
-	am.mu.Lock()
-	defer am.mu.Unlock()
-
-	if _, exist := am.mu.allocatorGroups[dcLocation]; exist {
-		return
-	}
-	allocator := NewLocalTSOAllocator(am, leadership, dcLocation)
-	// Create a new allocatorGroup
-	ctx, cancel := context.WithCancel(parentCtx)
-	am.mu.allocatorGroups[dcLocation] = &allocatorGroup{
-		dcLocation: dcLocation,
-		ctx:        ctx,
-		cancel:     cancel,
-		leadership: leadership,
-		allocator:  allocator,
-	}
-	// Start election of the Local TSO Allocator here
-	localTSOAllocator, _ := allocator.(*LocalTSOAllocator)
-	go am.allocatorLeaderLoop(parentCtx, localTSOAllocator)
-}
-
 // getGroupID returns the keyspace group ID of the allocator manager.
 func (am *AllocatorManager) getGroupID() uint32 {
 	if am == nil {
@@ -478,15 +452,6 @@ func (am *AllocatorManager) GetClusterDCLocationsNumber() int {
 	return len(am.mu.clusterDCLocations)
 }
 
-// compareAndSetMaxSuffix sets the max suffix sign if suffix is greater than am.mu.maxSuffix.
-func (am *AllocatorManager) compareAndSetMaxSuffix(suffix int32) {
-	am.mu.Lock()
-	defer am.mu.Unlock()
-	if suffix > am.mu.maxSuffix {
-		am.mu.maxSuffix = suffix
-	}
-}
-
 // GetSuffixBits calculates the bits of suffix sign
 // by the max number of suffix so far,
 // which will be used in the TSO logical part.
@@ -516,202 +481,11 @@ func (am *AllocatorManager) getLocalTSOAllocatorPath() string {
 	return path.Join(am.rootPath, localTSOAllocatorEtcdPrefix)
 }
 
-// similar logic with leaderLoop in server/server.go
-func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *LocalTSOAllocator) {
-	defer logutil.LogPanic()
-	defer log.Info("server is closed, return local tso allocator leader loop",
-		logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
-		zap.String("dc-location", allocator.GetDCLocation()),
-		zap.String("local-tso-allocator-name", am.member.Name()))
-	for {
-		select {
-		case <-ctx.Done():
-			return
-		default:
-		}
-
-		// Check whether the Local TSO Allocator has the leader already
-		allocatorLeader, rev, checkAgain := allocator.CheckAllocatorLeader()
-		if checkAgain {
-			continue
-		}
-		if allocatorLeader != nil {
-			log.Info("start to watch allocator leader",
-				logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
-				zap.Stringer(fmt.Sprintf("%s-allocator-leader", allocator.GetDCLocation()), allocatorLeader),
-				zap.String("local-tso-allocator-name", am.member.Name()))
-			// WatchAllocatorLeader will keep looping and never return unless the Local TSO Allocator leader has changed.
-			allocator.WatchAllocatorLeader(ctx, allocatorLeader, rev)
-			log.Info("local tso allocator leader has changed, try to re-campaign a local tso allocator leader",
-				logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
-				zap.String("dc-location", allocator.GetDCLocation()))
-		}
-
-		// Check the next-leader key
-		nextLeader, err := am.getNextLeaderID(allocator.GetDCLocation())
-		if err != nil {
-			log.Error("get next leader from etcd failed",
-				logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
-				zap.String("dc-location", allocator.GetDCLocation()),
-				errs.ZapError(err))
-			time.Sleep(200 * time.Millisecond)
-			continue
-		}
-		isNextLeader := false
-		if nextLeader != 0 {
-			if nextLeader != am.member.ID() {
-				log.Info("skip campaigning of the local tso allocator leader and check later",
-					logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
-					zap.String("server-name", am.member.Name()),
-					zap.Uint64("server-id", am.member.ID()),
-					zap.Uint64("next-leader-id", nextLeader))
-				time.Sleep(200 * time.Millisecond)
-				continue
-			}
-			isNextLeader = true
-		}
-
-		// Make sure the leader is aware of this new dc-location in order to make the
-		// Global TSO synchronization can cover up this dc-location.
-		ok, dcLocationInfo, err := am.getDCLocationInfoFromLeader(ctx, allocator.GetDCLocation())
-		if err != nil {
-			log.Error("get dc-location info from pd leader failed",
-				logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
-				zap.String("dc-location", allocator.GetDCLocation()),
-				errs.ZapError(err))
-			// PD leader hasn't been elected out, wait for the campaign
-			if !longSleep(ctx, time.Second) {
-				return
-			}
-			continue
-		}
-		if !ok || dcLocationInfo.Suffix <= 0 || dcLocationInfo.MaxTs == nil {
-			log.Warn("pd leader is not aware of dc-location during allocatorLeaderLoop, wait next round",
-				logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
-				zap.String("dc-location", allocator.GetDCLocation()),
-				zap.Any("dc-location-info", dcLocationInfo),
-				zap.String("wait-duration", checkStep.String()))
-			// Because the checkStep is long, we use select here to check whether the ctx is done
-			// to prevent the leak of goroutine.
-			if !longSleep(ctx, checkStep) {
-				return
-			}
-			continue
-		}
-
-		am.campaignAllocatorLeader(ctx, allocator, dcLocationInfo, isNextLeader)
-	}
-}
-
-// longSleep is used to sleep the long wait duration while also watching the
-// ctx.Done() to prevent the goroutine from leaking. This function returns
-// true if the sleep is over, false if the ctx is done.
-func longSleep(ctx context.Context, waitStep time.Duration) bool {
-	waitTicker := time.NewTicker(waitStep)
-	defer waitTicker.Stop()
-	select {
-	case <-ctx.Done():
-		return false
-	case <-waitTicker.C:
-		return true
-	}
-}
-
-func (am *AllocatorManager) campaignAllocatorLeader(
-	loopCtx context.Context,
-	allocator *LocalTSOAllocator,
-	dcLocationInfo *pdpb.GetDCLocationInfoResponse,
-	isNextLeader bool,
-) {
-	logger := log.With(
-		logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0),
-		zap.String("dc-location", allocator.GetDCLocation()),
-		zap.Any("dc-location-info", dcLocationInfo),
-		zap.String("name", am.member.Name()),
-	)
-	logger.Info("start to campaign local tso allocator leader")
-	cmps := make([]clientv3.Cmp, 0)
-	nextLeaderKey := am.nextLeaderKey(allocator.GetDCLocation())
-	if !isNextLeader {
-		cmps = append(cmps, clientv3.Compare(clientv3.CreateRevision(nextLeaderKey), "=", 0))
-	} else {
-		nextLeaderValue := fmt.Sprintf("%v", am.member.ID())
-		cmps = append(cmps, clientv3.Compare(clientv3.Value(nextLeaderKey), "=", nextLeaderValue))
-	}
-	failpoint.Inject("injectNextLeaderKey", func(val failpoint.Value) {
-		if val.(bool) {
-			// In order not to campaign leader too often in tests
-			time.Sleep(5 * time.Second)
-			cmps = []clientv3.Cmp{
-				clientv3.Compare(clientv3.Value(nextLeaderKey), "=", "mockValue"),
-			}
-		}
-	})
-	if err := allocator.CampaignAllocatorLeader(am.leaderLease, cmps...); err != nil {
-		if err.Error() == errs.ErrEtcdTxnConflict.Error() {
-			logger.Info("failed to campaign local tso allocator leader due to txn conflict, another allocator may campaign successfully")
-		} else {
-			logger.Error("failed to campaign local tso allocator leader due to etcd error", errs.ZapError(err))
-		}
-		return
-	}
-
-	// Start keepalive the Local TSO Allocator leadership and enable Local TSO service.
-	ctx, cancel := context.WithCancel(loopCtx)
-	defer cancel()
-	defer am.ResetAllocatorGroup(allocator.GetDCLocation(), false)
-	// Maintain the Local TSO Allocator leader
-	go allocator.KeepAllocatorLeader(ctx)
-
-	logger.Info("Complete campaign local tso allocator leader, begin to initialize the local TSO allocator")
-	if err := allocator.Initialize(int(dcLocationInfo.Suffix)); err != nil {
-		log.Error("failed to initialize the local TSO allocator", errs.ZapError(err))
-		return
-	}
-	if dcLocationInfo.GetMaxTs().GetPhysical() != 0 {
-		if err := allocator.WriteTSO(dcLocationInfo.GetMaxTs()); err != nil {
-			log.Error("failed to write the max local TSO after member changed", errs.ZapError(err))
-			return
-		}
-	}
-	am.compareAndSetMaxSuffix(dcLocationInfo.Suffix)
-	allocator.EnableAllocatorLeader()
-	// The next leader is me, delete it to finish campaigning
-	if err := am.deleteNextLeaderID(allocator.GetDCLocation()); err != nil {
-		logger.Warn("failed to delete next leader key after campaign local tso allocator leader", errs.ZapError(err))
-	}
-	logger.Info("local tso allocator leader is ready to serve")
-
-	leaderTicker := time.NewTicker(constant.LeaderTickInterval)
-	defer leaderTicker.Stop()
-
-	for {
-		select {
-		case <-leaderTicker.C:
-			if !allocator.IsAllocatorLeader() {
-				logger.Info("no longer a local tso allocator leader because lease has expired, local tso allocator leader will step down")
-				return
-			}
-		case <-ctx.Done():
-			// Server is closed and it should return nil.
-			logger.Info("server is closed, reset the local tso allocator")
-			return
-		}
-	}
-}
-
 // AllocatorDaemon is used to update every allocator's TSO and check whether we have
 // any new local allocator that needs to be set up.
 func (am *AllocatorManager) AllocatorDaemon(ctx context.Context) {
 	log.Info("entering into allocator daemon", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0))
 
-	// allocatorPatroller should only work when enableLocalTSO is true to
-	// set up the new Local TSO Allocator in time.
-	var patrolTicker = &time.Ticker{}
-	if am.enableLocalTSO {
-		patrolTicker = time.NewTicker(patrolStep)
-		defer patrolTicker.Stop()
-	}
 	tsTicker := time.NewTicker(am.updatePhysicalInterval)
 	failpoint.Inject("fastUpdatePhysicalInterval", func() {
 		tsTicker.Reset(time.Millisecond)
@@ -722,9 +496,6 @@ func (am *AllocatorManager) AllocatorDaemon(ctx context.Context) {
 
 	for {
 		select {
-		case <-patrolTicker.C:
-			// Inspect the cluster dc-location info and set up the new Local TSO Allocator in time.
-			am.allocatorPatroller(ctx)
 		case <-tsTicker.C:
 			// Update the initialized TSO Allocator to advance TSO.
 			am.allocatorUpdater()
@@ -788,33 +559,6 @@ func (am *AllocatorManager) updateAllocator(ag *allocatorGroup) {
 	}
 }
 
-// Check if we have any new dc-location configured, if yes,
-// then set up the corresponding local allocator.
-func (am *AllocatorManager) allocatorPatroller(serverCtx context.Context) {
-	// Collect all dc-locations
-	dcLocations := am.GetClusterDCLocations()
-	// Get all Local TSO Allocators
-	allocatorGroups := am.getAllocatorGroups(FilterDCLocation(GlobalDCLocation))
-	// Set up the new one
-	for dcLocation := range dcLocations {
-		if slice.NoneOf(allocatorGroups, func(i int) bool {
-			return allocatorGroups[i].dcLocation == dcLocation
-		}) {
-			am.setUpLocalAllocator(serverCtx, dcLocation, election.NewLeadership(
-				am.member.Client(),
-				am.getAllocatorPath(dcLocation),
-				fmt.Sprintf("%s local allocator leader election", dcLocation),
-			))
-		}
-	}
-	// Clean up the unused one
-	for _, ag := range allocatorGroups {
-		if _, exist := dcLocations[ag.dcLocation]; !exist {
-			am.deleteAllocatorGroup(ag.dcLocation)
-		}
-	}
-}
-
 // ClusterDCLocationChecker collects all dc-locations of a cluster, computes some related info
 // and stores them into the DCLocationInfo, then finally writes them into am.mu.clusterDCLocations.
 func (am *AllocatorManager) ClusterDCLocationChecker() {
@@ -1086,33 +830,6 @@ func (am *AllocatorManager) getNextLeaderID(dcLocation string) (uint64, error) {
 	return strconv.ParseUint(string(nextLeaderValue), 10, 64)
 }
 
-func (am *AllocatorManager) deleteNextLeaderID(dcLocation string) error {
-	nextLeaderKey := am.nextLeaderKey(dcLocation)
-	resp, err := kv.NewSlowLogTxn(am.member.Client()).
-		Then(clientv3.OpDelete(nextLeaderKey)).
-		Commit()
-	if err != nil {
-		return errs.ErrEtcdKVDelete.Wrap(err).GenWithStackByCause()
-	}
-	if !resp.Succeeded {
-		return errs.ErrEtcdTxnConflict.FastGenByArgs()
-	}
-	return nil
-}
-
-// deleteAllocatorGroup should only be used to remove the unused Local TSO Allocator from an unused dc-location.
-// If you want to clear or reset a TSO allocator, use (*AllocatorManager).ResetAllocatorGroup.
-func (am *AllocatorManager) deleteAllocatorGroup(dcLocation string) {
-	am.mu.Lock()
-	defer am.mu.Unlock()
-	if allocatorGroup, exist := am.mu.allocatorGroups[dcLocation]; exist {
-		allocatorGroup.allocator.Reset()
-		allocatorGroup.leadership.Reset()
-		allocatorGroup.cancel()
-		delete(am.mu.allocatorGroups, dcLocation)
-	}
-}
-
 // HandleRequest forwards TSO allocation requests to correct TSO Allocators.
 func (am *AllocatorManager) HandleRequest(ctx context.Context, dcLocation string, count uint32) (pdpb.Timestamp, error) {
 	defer trace.StartRegion(ctx, "AllocatorManager.HandleRequest").End()
@@ -1238,45 +955,6 @@ func (am *AllocatorManager) getOrCreateGRPCConn(ctx context.Context, addr string
 	return conn, nil
 }
 
-func (am *AllocatorManager) getDCLocationInfoFromLeader(ctx context.Context, dcLocation string) (bool, *pdpb.GetDCLocationInfoResponse, error) {
-	if am.IsLeader() {
-		info, ok := am.GetDCLocationInfo(dcLocation)
-		if !ok {
-			return false, &pdpb.GetDCLocationInfoResponse{}, nil
-		}
-		dcLocationInfo := &pdpb.GetDCLocationInfoResponse{Suffix: info.Suffix}
-		var err error
-		if dcLocationInfo.MaxTs, err = am.GetMaxLocalTSO(ctx); err != nil {
-			return false, &pdpb.GetDCLocationInfoResponse{}, err
-		}
-		return ok, dcLocationInfo, nil
-	}
-
-	leaderAddr := am.GetLeaderAddr()
-	if len(leaderAddr) < 1 {
-		return false, &pdpb.GetDCLocationInfoResponse{}, fmt.Errorf("failed to get leader client url")
-	}
-	conn, err := am.getOrCreateGRPCConn(ctx, leaderAddr)
-	if err != nil {
-		return false, &pdpb.GetDCLocationInfoResponse{}, err
-	}
-	getCtx, cancel := context.WithTimeout(ctx, rpcTimeout)
-	defer cancel()
-	resp, err := pdpb.NewPDClient(conn).GetDCLocationInfo(getCtx, &pdpb.GetDCLocationInfoRequest{
-		Header: &pdpb.RequestHeader{
-			SenderId: am.member.ID(),
-		},
-		DcLocation: dcLocation,
-	})
-	if err != nil {
-		return false, &pdpb.GetDCLocationInfoResponse{}, err
-	}
-	if resp.GetHeader().GetError() != nil {
-		return false, &pdpb.GetDCLocationInfoResponse{}, errors.Errorf("get the dc-location info from leader failed: %s", resp.GetHeader().GetError().String())
-	}
-	return resp.GetSuffix() != 0, resp, nil
-}
-
 // GetMaxLocalTSO will sync with the current Local TSO Allocators among the cluster to get the
 // max Local TSO.
 func (am *AllocatorManager) GetMaxLocalTSO(ctx context.Context) (*pdpb.Timestamp, error) {