From be4a366da715ff3eeb628d98248068364be8ef91 Mon Sep 17 00:00:00 2001 From: okJiang <819421878@qq.com> Date: Fri, 22 Nov 2024 13:58:38 +0800 Subject: [PATCH] tso/local: remove `allocatorPatroller`/`PriorityChecker` func (#8845) close tikv/pd#8802 Signed-off-by: okJiang <819421878@qq.com> Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com> --- pkg/tso/allocator_manager.go | 418 +---------------------------------- pkg/tso/filter.go | 5 - pkg/tso/local_allocator.go | 69 ------ server/api/tso.go | 38 +--- server/api/tso_test.go | 65 ------ 5 files changed, 3 insertions(+), 592 deletions(-) delete mode 100644 server/api/tso_test.go diff --git a/pkg/tso/allocator_manager.go b/pkg/tso/allocator_manager.go index a02d4884e17..821e1fc5d62 100644 --- a/pkg/tso/allocator_manager.go +++ b/pkg/tso/allocator_manager.go @@ -25,13 +25,11 @@ import ( "sync" "time" - "github.com/pingcap/errors" "github.com/pingcap/failpoint" "github.com/pingcap/kvproto/pkg/pdpb" "github.com/pingcap/log" "github.com/tikv/pd/pkg/election" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/mcs/utils/constant" "github.com/tikv/pd/pkg/member" "github.com/tikv/pd/pkg/slice" "github.com/tikv/pd/pkg/storage/endpoint" @@ -249,30 +247,6 @@ func (am *AllocatorManager) SetUpGlobalAllocator(ctx context.Context, leadership } } -// setUpLocalAllocator is used to set up an allocator, which will initialize the allocator and put it into allocator daemon. -// One TSO Allocator should only be set once, and may be initialized and reset multiple times depending on the election. -func (am *AllocatorManager) setUpLocalAllocator(parentCtx context.Context, dcLocation string, leadership *election.Leadership) { - am.mu.Lock() - defer am.mu.Unlock() - - if _, exist := am.mu.allocatorGroups[dcLocation]; exist { - return - } - allocator := NewLocalTSOAllocator(am, leadership, dcLocation) - // Create a new allocatorGroup - ctx, cancel := context.WithCancel(parentCtx) - am.mu.allocatorGroups[dcLocation] = &allocatorGroup{ - dcLocation: dcLocation, - ctx: ctx, - cancel: cancel, - leadership: leadership, - allocator: allocator, - } - // Start election of the Local TSO Allocator here - localTSOAllocator, _ := allocator.(*LocalTSOAllocator) - go am.allocatorLeaderLoop(parentCtx, localTSOAllocator) -} - // getGroupID returns the keyspace group ID of the allocator manager. func (am *AllocatorManager) getGroupID() uint32 { if am == nil { @@ -478,15 +452,6 @@ func (am *AllocatorManager) GetClusterDCLocationsNumber() int { return len(am.mu.clusterDCLocations) } -// compareAndSetMaxSuffix sets the max suffix sign if suffix is greater than am.mu.maxSuffix. -func (am *AllocatorManager) compareAndSetMaxSuffix(suffix int32) { - am.mu.Lock() - defer am.mu.Unlock() - if suffix > am.mu.maxSuffix { - am.mu.maxSuffix = suffix - } -} - // GetSuffixBits calculates the bits of suffix sign // by the max number of suffix so far, // which will be used in the TSO logical part. @@ -516,202 +481,11 @@ func (am *AllocatorManager) getLocalTSOAllocatorPath() string { return path.Join(am.rootPath, localTSOAllocatorEtcdPrefix) } -// similar logic with leaderLoop in server/server.go -func (am *AllocatorManager) allocatorLeaderLoop(ctx context.Context, allocator *LocalTSOAllocator) { - defer logutil.LogPanic() - defer log.Info("server is closed, return local tso allocator leader loop", - logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), - zap.String("dc-location", allocator.GetDCLocation()), - zap.String("local-tso-allocator-name", am.member.Name())) - for { - select { - case <-ctx.Done(): - return - default: - } - - // Check whether the Local TSO Allocator has the leader already - allocatorLeader, rev, checkAgain := allocator.CheckAllocatorLeader() - if checkAgain { - continue - } - if allocatorLeader != nil { - log.Info("start to watch allocator leader", - logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), - zap.Stringer(fmt.Sprintf("%s-allocator-leader", allocator.GetDCLocation()), allocatorLeader), - zap.String("local-tso-allocator-name", am.member.Name())) - // WatchAllocatorLeader will keep looping and never return unless the Local TSO Allocator leader has changed. - allocator.WatchAllocatorLeader(ctx, allocatorLeader, rev) - log.Info("local tso allocator leader has changed, try to re-campaign a local tso allocator leader", - logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), - zap.String("dc-location", allocator.GetDCLocation())) - } - - // Check the next-leader key - nextLeader, err := am.getNextLeaderID(allocator.GetDCLocation()) - if err != nil { - log.Error("get next leader from etcd failed", - logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), - zap.String("dc-location", allocator.GetDCLocation()), - errs.ZapError(err)) - time.Sleep(200 * time.Millisecond) - continue - } - isNextLeader := false - if nextLeader != 0 { - if nextLeader != am.member.ID() { - log.Info("skip campaigning of the local tso allocator leader and check later", - logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), - zap.String("server-name", am.member.Name()), - zap.Uint64("server-id", am.member.ID()), - zap.Uint64("next-leader-id", nextLeader)) - time.Sleep(200 * time.Millisecond) - continue - } - isNextLeader = true - } - - // Make sure the leader is aware of this new dc-location in order to make the - // Global TSO synchronization can cover up this dc-location. - ok, dcLocationInfo, err := am.getDCLocationInfoFromLeader(ctx, allocator.GetDCLocation()) - if err != nil { - log.Error("get dc-location info from pd leader failed", - logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), - zap.String("dc-location", allocator.GetDCLocation()), - errs.ZapError(err)) - // PD leader hasn't been elected out, wait for the campaign - if !longSleep(ctx, time.Second) { - return - } - continue - } - if !ok || dcLocationInfo.Suffix <= 0 || dcLocationInfo.MaxTs == nil { - log.Warn("pd leader is not aware of dc-location during allocatorLeaderLoop, wait next round", - logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), - zap.String("dc-location", allocator.GetDCLocation()), - zap.Any("dc-location-info", dcLocationInfo), - zap.String("wait-duration", checkStep.String())) - // Because the checkStep is long, we use select here to check whether the ctx is done - // to prevent the leak of goroutine. - if !longSleep(ctx, checkStep) { - return - } - continue - } - - am.campaignAllocatorLeader(ctx, allocator, dcLocationInfo, isNextLeader) - } -} - -// longSleep is used to sleep the long wait duration while also watching the -// ctx.Done() to prevent the goroutine from leaking. This function returns -// true if the sleep is over, false if the ctx is done. -func longSleep(ctx context.Context, waitStep time.Duration) bool { - waitTicker := time.NewTicker(waitStep) - defer waitTicker.Stop() - select { - case <-ctx.Done(): - return false - case <-waitTicker.C: - return true - } -} - -func (am *AllocatorManager) campaignAllocatorLeader( - loopCtx context.Context, - allocator *LocalTSOAllocator, - dcLocationInfo *pdpb.GetDCLocationInfoResponse, - isNextLeader bool, -) { - logger := log.With( - logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), - zap.String("dc-location", allocator.GetDCLocation()), - zap.Any("dc-location-info", dcLocationInfo), - zap.String("name", am.member.Name()), - ) - logger.Info("start to campaign local tso allocator leader") - cmps := make([]clientv3.Cmp, 0) - nextLeaderKey := am.nextLeaderKey(allocator.GetDCLocation()) - if !isNextLeader { - cmps = append(cmps, clientv3.Compare(clientv3.CreateRevision(nextLeaderKey), "=", 0)) - } else { - nextLeaderValue := fmt.Sprintf("%v", am.member.ID()) - cmps = append(cmps, clientv3.Compare(clientv3.Value(nextLeaderKey), "=", nextLeaderValue)) - } - failpoint.Inject("injectNextLeaderKey", func(val failpoint.Value) { - if val.(bool) { - // In order not to campaign leader too often in tests - time.Sleep(5 * time.Second) - cmps = []clientv3.Cmp{ - clientv3.Compare(clientv3.Value(nextLeaderKey), "=", "mockValue"), - } - } - }) - if err := allocator.CampaignAllocatorLeader(am.leaderLease, cmps...); err != nil { - if err.Error() == errs.ErrEtcdTxnConflict.Error() { - logger.Info("failed to campaign local tso allocator leader due to txn conflict, another allocator may campaign successfully") - } else { - logger.Error("failed to campaign local tso allocator leader due to etcd error", errs.ZapError(err)) - } - return - } - - // Start keepalive the Local TSO Allocator leadership and enable Local TSO service. - ctx, cancel := context.WithCancel(loopCtx) - defer cancel() - defer am.ResetAllocatorGroup(allocator.GetDCLocation(), false) - // Maintain the Local TSO Allocator leader - go allocator.KeepAllocatorLeader(ctx) - - logger.Info("Complete campaign local tso allocator leader, begin to initialize the local TSO allocator") - if err := allocator.Initialize(int(dcLocationInfo.Suffix)); err != nil { - log.Error("failed to initialize the local TSO allocator", errs.ZapError(err)) - return - } - if dcLocationInfo.GetMaxTs().GetPhysical() != 0 { - if err := allocator.WriteTSO(dcLocationInfo.GetMaxTs()); err != nil { - log.Error("failed to write the max local TSO after member changed", errs.ZapError(err)) - return - } - } - am.compareAndSetMaxSuffix(dcLocationInfo.Suffix) - allocator.EnableAllocatorLeader() - // The next leader is me, delete it to finish campaigning - if err := am.deleteNextLeaderID(allocator.GetDCLocation()); err != nil { - logger.Warn("failed to delete next leader key after campaign local tso allocator leader", errs.ZapError(err)) - } - logger.Info("local tso allocator leader is ready to serve") - - leaderTicker := time.NewTicker(constant.LeaderTickInterval) - defer leaderTicker.Stop() - - for { - select { - case <-leaderTicker.C: - if !allocator.IsAllocatorLeader() { - logger.Info("no longer a local tso allocator leader because lease has expired, local tso allocator leader will step down") - return - } - case <-ctx.Done(): - // Server is closed and it should return nil. - logger.Info("server is closed, reset the local tso allocator") - return - } - } -} - // AllocatorDaemon is used to update every allocator's TSO and check whether we have // any new local allocator that needs to be set up. func (am *AllocatorManager) AllocatorDaemon(ctx context.Context) { log.Info("entering into allocator daemon", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0)) - // allocatorPatroller should only work when enableLocalTSO is true to - // set up the new Local TSO Allocator in time. - var patrolTicker = &time.Ticker{} - if am.enableLocalTSO { - patrolTicker = time.NewTicker(patrolStep) - defer patrolTicker.Stop() - } tsTicker := time.NewTicker(am.updatePhysicalInterval) failpoint.Inject("fastUpdatePhysicalInterval", func() { tsTicker.Reset(time.Millisecond) @@ -722,21 +496,13 @@ func (am *AllocatorManager) AllocatorDaemon(ctx context.Context) { for { select { - case <-patrolTicker.C: - // Inspect the cluster dc-location info and set up the new Local TSO Allocator in time. - am.allocatorPatroller(ctx) case <-tsTicker.C: // Update the initialized TSO Allocator to advance TSO. am.allocatorUpdater() case <-checkerTicker.C: // Check and maintain the cluster's meta info about dc-location distribution. go am.ClusterDCLocationChecker() - // We won't have any Local TSO Allocator set up in PD without enabling Local TSO. - if am.enableLocalTSO { - // Check the election priority of every Local TSO Allocator this PD is holding. - go am.PriorityChecker() - } - // PS: ClusterDCLocationChecker and PriorityChecker are time consuming and low frequent to run, + // PS: ClusterDCLocationChecker are time consuming and low frequent to run, // we should run them concurrently to speed up the progress. case <-ctx.Done(): log.Info("exit allocator daemon", logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0)) @@ -788,33 +554,6 @@ func (am *AllocatorManager) updateAllocator(ag *allocatorGroup) { } } -// Check if we have any new dc-location configured, if yes, -// then set up the corresponding local allocator. -func (am *AllocatorManager) allocatorPatroller(serverCtx context.Context) { - // Collect all dc-locations - dcLocations := am.GetClusterDCLocations() - // Get all Local TSO Allocators - allocatorGroups := am.getAllocatorGroups(FilterDCLocation(GlobalDCLocation)) - // Set up the new one - for dcLocation := range dcLocations { - if slice.NoneOf(allocatorGroups, func(i int) bool { - return allocatorGroups[i].dcLocation == dcLocation - }) { - am.setUpLocalAllocator(serverCtx, dcLocation, election.NewLeadership( - am.member.Client(), - am.getAllocatorPath(dcLocation), - fmt.Sprintf("%s local allocator leader election", dcLocation), - )) - } - } - // Clean up the unused one - for _, ag := range allocatorGroups { - if _, exist := dcLocations[ag.dcLocation]; !exist { - am.deleteAllocatorGroup(ag.dcLocation) - } - } -} - // ClusterDCLocationChecker collects all dc-locations of a cluster, computes some related info // and stores them into the DCLocationInfo, then finally writes them into am.mu.clusterDCLocations. func (am *AllocatorManager) ClusterDCLocationChecker() { @@ -975,72 +714,6 @@ func (am *AllocatorManager) GetLocalTSOSuffixPath(dcLocation string) string { return path.Join(am.GetLocalTSOSuffixPathPrefix(), dcLocation) } -// PriorityChecker is used to check the election priority of a Local TSO Allocator. -// In the normal case, if we want to elect a Local TSO Allocator for a certain DC, -// such as dc-1, we need to make sure the follow priority rules: -// 1. The PD server with dc-location="dc-1" needs to be elected as the allocator -// leader with the highest priority. -// 2. If all PD servers with dc-location="dc-1" are down, then the other PD servers -// of DC could be elected. -func (am *AllocatorManager) PriorityChecker() { - defer logutil.LogPanic() - - serverID := am.member.ID() - myServerDCLocation := am.getServerDCLocation(serverID) - // Check all Local TSO Allocator followers to see if their priorities is higher than the leaders - // Filter out allocators with leadership and initialized - allocatorGroups := am.getAllocatorGroups(FilterDCLocation(GlobalDCLocation), FilterAvailableLeadership()) - for _, allocatorGroup := range allocatorGroups { - localTSOAllocator, _ := allocatorGroup.allocator.(*LocalTSOAllocator) - leaderServerID := localTSOAllocator.GetAllocatorLeader().GetMemberId() - // No leader, maybe the leader is not been watched yet - if leaderServerID == 0 { - continue - } - leaderServerDCLocation := am.getServerDCLocation(leaderServerID) - // For example, an allocator leader for dc-1 is elected by a server of dc-2, then the server of dc-1 will - // find this allocator's dc-location isn't the same with server of dc-2 but is same with itself. - if allocatorGroup.dcLocation != leaderServerDCLocation && allocatorGroup.dcLocation == myServerDCLocation { - log.Info("try to move the local tso allocator", - logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), - zap.Uint64("old-leader-id", leaderServerID), - zap.String("old-dc-location", leaderServerDCLocation), - zap.Uint64("next-leader-id", serverID), - zap.String("next-dc-location", myServerDCLocation)) - if err := am.transferLocalAllocator(allocatorGroup.dcLocation, am.member.ID()); err != nil { - log.Error("move the local tso allocator failed", - logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), - zap.Uint64("old-leader-id", leaderServerID), - zap.String("old-dc-location", leaderServerDCLocation), - zap.Uint64("next-leader-id", serverID), - zap.String("next-dc-location", myServerDCLocation), - errs.ZapError(err)) - continue - } - } - } - // Check next leader and resign - // Filter out allocators with leadership - allocatorGroups = am.getAllocatorGroups(FilterDCLocation(GlobalDCLocation), FilterUnavailableLeadership()) - for _, allocatorGroup := range allocatorGroups { - nextLeader, err := am.getNextLeaderID(allocatorGroup.dcLocation) - if err != nil { - log.Error("get next leader from etcd failed", - logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), - zap.String("dc-location", allocatorGroup.dcLocation), - errs.ZapError(err)) - continue - } - // nextLeader is not empty and isn't same with the server ID, resign the leader - if nextLeader != 0 && nextLeader != serverID { - log.Info("next leader key found, resign current leader", - logutil.CondUint32("keyspace-group-id", am.kgID, am.kgID > 0), - zap.Uint64("nextLeaderID", nextLeader)) - am.ResetAllocatorGroup(allocatorGroup.dcLocation, false) - } - } -} - // TransferAllocatorForDCLocation transfer local tso allocator to the target member for the given dcLocation func (am *AllocatorManager) TransferAllocatorForDCLocation(dcLocation string, memberID uint64) error { if dcLocation == GlobalDCLocation { @@ -1063,56 +736,6 @@ func (am *AllocatorManager) TransferAllocatorForDCLocation(dcLocation string, me return am.transferLocalAllocator(dcLocation, memberID) } -func (am *AllocatorManager) getServerDCLocation(serverID uint64) string { - am.mu.RLock() - defer am.mu.RUnlock() - for dcLocation, info := range am.mu.clusterDCLocations { - if slice.AnyOf(info.ServerIDs, func(i int) bool { return info.ServerIDs[i] == serverID }) { - return dcLocation - } - } - return "" -} - -func (am *AllocatorManager) getNextLeaderID(dcLocation string) (uint64, error) { - nextLeaderKey := am.nextLeaderKey(dcLocation) - nextLeaderValue, err := etcdutil.GetValue(am.member.Client(), nextLeaderKey) - if err != nil { - return 0, err - } - if len(nextLeaderValue) == 0 { - return 0, nil - } - return strconv.ParseUint(string(nextLeaderValue), 10, 64) -} - -func (am *AllocatorManager) deleteNextLeaderID(dcLocation string) error { - nextLeaderKey := am.nextLeaderKey(dcLocation) - resp, err := kv.NewSlowLogTxn(am.member.Client()). - Then(clientv3.OpDelete(nextLeaderKey)). - Commit() - if err != nil { - return errs.ErrEtcdKVDelete.Wrap(err).GenWithStackByCause() - } - if !resp.Succeeded { - return errs.ErrEtcdTxnConflict.FastGenByArgs() - } - return nil -} - -// deleteAllocatorGroup should only be used to remove the unused Local TSO Allocator from an unused dc-location. -// If you want to clear or reset a TSO allocator, use (*AllocatorManager).ResetAllocatorGroup. -func (am *AllocatorManager) deleteAllocatorGroup(dcLocation string) { - am.mu.Lock() - defer am.mu.Unlock() - if allocatorGroup, exist := am.mu.allocatorGroups[dcLocation]; exist { - allocatorGroup.allocator.Reset() - allocatorGroup.leadership.Reset() - allocatorGroup.cancel() - delete(am.mu.allocatorGroups, dcLocation) - } -} - // HandleRequest forwards TSO allocation requests to correct TSO Allocators. func (am *AllocatorManager) HandleRequest(ctx context.Context, dcLocation string, count uint32) (pdpb.Timestamp, error) { defer trace.StartRegion(ctx, "AllocatorManager.HandleRequest").End() @@ -1238,45 +861,6 @@ func (am *AllocatorManager) getOrCreateGRPCConn(ctx context.Context, addr string return conn, nil } -func (am *AllocatorManager) getDCLocationInfoFromLeader(ctx context.Context, dcLocation string) (bool, *pdpb.GetDCLocationInfoResponse, error) { - if am.IsLeader() { - info, ok := am.GetDCLocationInfo(dcLocation) - if !ok { - return false, &pdpb.GetDCLocationInfoResponse{}, nil - } - dcLocationInfo := &pdpb.GetDCLocationInfoResponse{Suffix: info.Suffix} - var err error - if dcLocationInfo.MaxTs, err = am.GetMaxLocalTSO(ctx); err != nil { - return false, &pdpb.GetDCLocationInfoResponse{}, err - } - return ok, dcLocationInfo, nil - } - - leaderAddr := am.GetLeaderAddr() - if len(leaderAddr) < 1 { - return false, &pdpb.GetDCLocationInfoResponse{}, fmt.Errorf("failed to get leader client url") - } - conn, err := am.getOrCreateGRPCConn(ctx, leaderAddr) - if err != nil { - return false, &pdpb.GetDCLocationInfoResponse{}, err - } - getCtx, cancel := context.WithTimeout(ctx, rpcTimeout) - defer cancel() - resp, err := pdpb.NewPDClient(conn).GetDCLocationInfo(getCtx, &pdpb.GetDCLocationInfoRequest{ - Header: &pdpb.RequestHeader{ - SenderId: am.member.ID(), - }, - DcLocation: dcLocation, - }) - if err != nil { - return false, &pdpb.GetDCLocationInfoResponse{}, err - } - if resp.GetHeader().GetError() != nil { - return false, &pdpb.GetDCLocationInfoResponse{}, errors.Errorf("get the dc-location info from leader failed: %s", resp.GetHeader().GetError().String()) - } - return resp.GetSuffix() != 0, resp, nil -} - // GetMaxLocalTSO will sync with the current Local TSO Allocators among the cluster to get the // max Local TSO. func (am *AllocatorManager) GetMaxLocalTSO(ctx context.Context) (*pdpb.Timestamp, error) { diff --git a/pkg/tso/filter.go b/pkg/tso/filter.go index 4e28e520889..c2e32a62106 100644 --- a/pkg/tso/filter.go +++ b/pkg/tso/filter.go @@ -24,11 +24,6 @@ func FilterUninitialized() func(ag *allocatorGroup) bool { return func(ag *allocatorGroup) bool { return !ag.allocator.IsInitialize() } } -// FilterAvailableLeadership will filter out the allocatorGroup whose leadership is available. -func FilterAvailableLeadership() func(ag *allocatorGroup) bool { - return func(ag *allocatorGroup) bool { return ag.leadership.Check() } -} - // FilterUnavailableLeadership will filter out the allocatorGroup whose leadership is unavailable. func FilterUnavailableLeadership() func(ag *allocatorGroup) bool { return func(ag *allocatorGroup) bool { return !ag.leadership.Check() } diff --git a/pkg/tso/local_allocator.go b/pkg/tso/local_allocator.go index 040b5891d12..e3456261a3a 100644 --- a/pkg/tso/local_allocator.go +++ b/pkg/tso/local_allocator.go @@ -26,11 +26,8 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/tikv/pd/pkg/election" "github.com/tikv/pd/pkg/errs" - "github.com/tikv/pd/pkg/utils/keypath" - "github.com/tikv/pd/pkg/utils/logutil" "github.com/tikv/pd/pkg/utils/tsoutil" "github.com/tikv/pd/pkg/utils/typeutil" - clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -53,37 +50,6 @@ type LocalTSOAllocator struct { tsoAllocatorRoleGauge prometheus.Gauge } -// NewLocalTSOAllocator creates a new local TSO allocator. -func NewLocalTSOAllocator( - am *AllocatorManager, - leadership *election.Leadership, - dcLocation string, -) Allocator { - return &LocalTSOAllocator{ - allocatorManager: am, - leadership: leadership, - timestampOracle: newLocalTimestampOracle(am, leadership, dcLocation), - rootPath: leadership.GetLeaderKey(), - tsoAllocatorRoleGauge: tsoAllocatorRole.WithLabelValues(am.getGroupIDStr(), dcLocation), - } -} - -func newLocalTimestampOracle(am *AllocatorManager, leadership *election.Leadership, dcLocation string) *timestampOracle { - oracle := ×tampOracle{ - client: leadership.GetClient(), - keyspaceGroupID: am.kgID, - tsPath: keypath.KeyspaceGroupLocalTSPath(localTSOAllocatorEtcdPrefix, am.kgID, dcLocation), - storage: am.storage, - saveInterval: am.saveInterval, - updatePhysicalInterval: am.updatePhysicalInterval, - maxResetTSGap: am.maxResetTSGap, - dcLocation: dcLocation, - tsoMux: &tsoObject{}, - metrics: newTSOMetrics(am.getGroupIDStr(), dcLocation), - } - return oracle -} - // GetTimestampPath returns the timestamp path in etcd. func (lta *LocalTSOAllocator) GetTimestampPath() string { if lta == nil || lta.timestampOracle == nil { @@ -138,16 +104,6 @@ func (lta *LocalTSOAllocator) Reset() { lta.timestampOracle.ResetTimestamp() } -// setAllocatorLeader sets the current Local TSO Allocator leader. -func (lta *LocalTSOAllocator) setAllocatorLeader(member any) { - lta.allocatorLeader.Store(member) -} - -// unsetAllocatorLeader unsets the current Local TSO Allocator leader. -func (lta *LocalTSOAllocator) unsetAllocatorLeader() { - lta.allocatorLeader.Store(&pdpb.Member{}) -} - // GetAllocatorLeader returns the Local TSO Allocator leader. func (lta *LocalTSOAllocator) GetAllocatorLeader() *pdpb.Member { allocatorLeader := lta.allocatorLeader.Load() @@ -184,22 +140,6 @@ func (lta *LocalTSOAllocator) WriteTSO(maxTS *pdpb.Timestamp) error { return lta.timestampOracle.resetUserTimestamp(context.Background(), lta.leadership, tsoutil.GenerateTS(maxTS), true) } -// EnableAllocatorLeader sets the Local TSO Allocator itself to a leader. -func (lta *LocalTSOAllocator) EnableAllocatorLeader() { - lta.setAllocatorLeader(lta.allocatorManager.member.GetMember()) -} - -// CampaignAllocatorLeader is used to campaign a Local TSO Allocator's leadership. -func (lta *LocalTSOAllocator) CampaignAllocatorLeader(leaseTimeout int64, cmps ...clientv3.Cmp) error { - return lta.leadership.Campaign(leaseTimeout, lta.allocatorManager.member.MemberValue(), cmps...) -} - -// KeepAllocatorLeader is used to keep the PD leader's leadership. -func (lta *LocalTSOAllocator) KeepAllocatorLeader(ctx context.Context) { - defer logutil.LogPanic() - lta.leadership.Keep(ctx) -} - // IsAllocatorLeader returns whether the allocator is still a // Local TSO Allocator leader by checking its leadership's lease and leader info. func (lta *LocalTSOAllocator) IsAllocatorLeader() bool { @@ -251,15 +191,6 @@ func (lta *LocalTSOAllocator) CheckAllocatorLeader() (*pdpb.Member, int64, bool) return allocatorLeader, rev, false } -// WatchAllocatorLeader is used to watch the changes of the Local TSO Allocator leader. -func (lta *LocalTSOAllocator) WatchAllocatorLeader(serverCtx context.Context, allocatorLeader *pdpb.Member, revision int64) { - lta.setAllocatorLeader(allocatorLeader) - // Check the cluster dc-locations to update the max suffix bits - go lta.allocatorManager.ClusterDCLocationChecker() - lta.leadership.Watch(serverCtx, revision) - lta.unsetAllocatorLeader() -} - func (lta *LocalTSOAllocator) getMetrics() *tsoMetrics { return lta.timestampOracle.metrics } diff --git a/server/api/tso.go b/server/api/tso.go index ddac5b65bc4..95096257066 100644 --- a/server/api/tso.go +++ b/server/api/tso.go @@ -15,10 +15,8 @@ package api import ( - "fmt" "net/http" - "github.com/gorilla/mux" "github.com/tikv/pd/server" "github.com/unrolled/render" ) @@ -41,39 +39,7 @@ func newTSOHandler(svr *server.Server, rd *render.Render) *tsoHandler { // @Param name path string true "PD server name" // @Param body body object true "json params" // @Produce json -// @Success 200 {string} string "The transfer command is submitted." -// @Failure 400 {string} string "The input is invalid." -// @Failure 404 {string} string "The member does not exist." -// @Failure 500 {string} string "PD server failed to proceed the request." // @Router /tso/allocator/transfer/{name} [post] -func (h *tsoHandler) TransferLocalTSOAllocator(w http.ResponseWriter, r *http.Request) { - members, membersErr := getMembers(h.svr) - if membersErr != nil { - h.rd.JSON(w, http.StatusInternalServerError, membersErr.Error()) - return - } - name := mux.Vars(r)["name"] - dcLocation := r.URL.Query().Get("dcLocation") - if len(dcLocation) < 1 { - h.rd.JSON(w, http.StatusBadRequest, "dcLocation is undefined") - return - } - var memberID uint64 - for _, m := range members.GetMembers() { - if m.GetName() == name { - memberID = m.GetMemberId() - break - } - } - if memberID == 0 { - h.rd.JSON(w, http.StatusNotFound, fmt.Sprintf("not found, pd: %s", name)) - return - } - // TODO: support local tso forward in api service mode in the future. - err := h.svr.GetTSOAllocatorManager().TransferAllocatorForDCLocation(dcLocation, memberID) - if err != nil { - h.rd.JSON(w, http.StatusInternalServerError, err.Error()) - return - } - h.rd.JSON(w, http.StatusOK, "The transfer command is submitted.") +func (h *tsoHandler) TransferLocalTSOAllocator(w http.ResponseWriter, _ *http.Request) { + h.rd.JSON(w, http.StatusOK, "The transfer command is deprecated.") } diff --git a/server/api/tso_test.go b/server/api/tso_test.go deleted file mode 100644 index c0fbe27321d..00000000000 --- a/server/api/tso_test.go +++ /dev/null @@ -1,65 +0,0 @@ -// Copyright 2021 TiKV Project Authors. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package api - -import ( - "fmt" - "testing" - "time" - - "github.com/stretchr/testify/suite" - tu "github.com/tikv/pd/pkg/utils/testutil" - "github.com/tikv/pd/server" - "github.com/tikv/pd/server/config" -) - -type tsoTestSuite struct { - suite.Suite - svr *server.Server - cleanup tu.CleanupFunc - urlPrefix string -} - -func TestTSOTestSuite(t *testing.T) { - suite.Run(t, new(tsoTestSuite)) -} - -func (suite *tsoTestSuite) SetupSuite() { - re := suite.Require() - suite.svr, suite.cleanup = mustNewServer(re, func(cfg *config.Config) { - cfg.EnableLocalTSO = true - cfg.Labels[config.ZoneLabel] = "dc-1" - }) - server.MustWaitLeader(re, []*server.Server{suite.svr}) - - addr := suite.svr.GetAddr() - suite.urlPrefix = fmt.Sprintf("%s%s/api/v1", addr, apiPrefix) -} - -func (suite *tsoTestSuite) TearDownSuite() { - suite.cleanup() -} - -func (suite *tsoTestSuite) TestTransferAllocator() { - re := suite.Require() - tu.Eventually(re, func() bool { - suite.svr.GetTSOAllocatorManager().ClusterDCLocationChecker() - _, err := suite.svr.GetTSOAllocatorManager().GetAllocator("dc-1") - return err == nil - }, tu.WithWaitFor(15*time.Second), tu.WithTickInterval(3*time.Second)) - addr := suite.urlPrefix + "/tso/allocator/transfer/pd1?dcLocation=dc-1" - err := tu.CheckPostJSON(testDialClient, addr, nil, tu.StatusOK(re)) - re.NoError(err) -}