pdms: support primary/transfer api for scheduling and tso #8157

Merged · 37 commits · Aug 13, 2024

Commits
7fa19d3  check primary (HuSharp, May 9, 2024)
1f13fa2  make test happy (HuSharp, May 9, 2024)
af995cc  address comment and add test (HuSharp, May 9, 2024)
8d36be5  only trigger by updating (HuSharp, May 9, 2024)
2433f0c  change log (HuSharp, May 10, 2024)
dd72b9c  address comment (HuSharp, May 10, 2024)
a39300e  change to name (HuSharp, May 13, 2024)
51708b5  make test happy (HuSharp, May 13, 2024)
c6d2bc3  address comment and change some comments (HuSharp, May 14, 2024)
a4c5c29  add more test (HuSharp, May 14, 2024)
4d0598f  merge master (HuSharp, May 21, 2024)
6ac311f  Merge branch 'master' into support_transfer_primary2 (HuSharp, May 30, 2024)
510b92a  Merge branch 'master' into support_transfer_primary2 (HuSharp, Jun 13, 2024)
b235bc1  address comment and add more comment (HuSharp, Jul 1, 2024)
f659782  add more comment (HuSharp, Jul 1, 2024)
32b0b5f  Merge branch 'master' into support_transfer_primary2 (HuSharp, Jul 2, 2024)
dbc5447  Merge branch 'master' into support_transfer_primary2 (HuSharp, Jul 2, 2024)
204ffd5  address comment (HuSharp, Jul 3, 2024)
ec8e737  remove redundant wait (HuSharp, Jul 4, 2024)
9e3b798  Merge branch 'master' into support_transfer_primary2 (ti-chi-bot[bot], Jul 8, 2024)
e53844e  Merge branch 'master' into support_transfer_primary2 (ti-chi-bot[bot], Jul 8, 2024)
cc82e7b  Merge branch 'master' into support_transfer_primary2 (ti-chi-bot[bot], Jul 9, 2024)
19ce9d8  Merge branch 'master' into support_transfer_primary2 (ti-chi-bot[bot], Jul 9, 2024)
4c7f8ac  changed by name (HuSharp, Jul 25, 2024)
36b5a82  refine code (HuSharp, Jul 25, 2024)
ea8d9e3  address comment (HuSharp, Jul 30, 2024)
ffb7b1b  refine code (HuSharp, Jul 31, 2024)
379b1f6  merge master (HuSharp, Aug 5, 2024)
d9bffb8  Merge branch 'master' into support_transfer_primary2 (HuSharp, Aug 12, 2024)
d037a6a  remove delete (HuSharp, Aug 12, 2024)
e711fd9  refine purpose and lease (HuSharp, Aug 12, 2024)
d999c7f  merge master (HuSharp, Aug 12, 2024)
7f0a426  refine code (HuSharp, Aug 12, 2024)
d810ed1  address comment (HuSharp, Aug 12, 2024)
43830ec  non-essential exported (HuSharp, Aug 13, 2024)
2d9a3b0  refine check name (HuSharp, Aug 13, 2024)
c1da5b5  Merge branch 'master' into support_transfer_primary2 (ti-chi-bot[bot], Aug 13, 2024)
30 changes: 30 additions & 0 deletions pkg/election/leadership.go
@@ -64,6 +64,11 @@ type Leadership struct {
leaderKey string
leaderValue string

leaderWatch struct {
syncutil.RWMutex
val bool
}

keepAliveCtx context.Context
keepAliveCancelFunc context.CancelFunc
keepAliveCancelFuncLock syncutil.Mutex
@@ -72,6 +77,10 @@ type Leadership struct {
campaignTimes []time.Time
}

// GetLeaderValue returns the leader value.
func (ls *Leadership) GetLeaderValue() string {
return ls.leaderValue
}

// NewLeadership creates a new Leadership.
func NewLeadership(client *clientv3.Client, leaderKey, purpose string) *Leadership {
leadership := &Leadership{
@@ -113,6 +122,20 @@ func (ls *Leadership) GetLeaderKey() string {
return ls.leaderKey
}

// SetLeaderWatch sets the leader watch flag.
func (ls *Leadership) SetLeaderWatch(val bool) {
ls.leaderWatch.Lock()
ls.leaderWatch.val = val
ls.leaderWatch.Unlock()
}

// GetLeaderWatch gets the leader watch flag.
func (ls *Leadership) GetLeaderWatch() bool {
ls.leaderWatch.RLock()
defer ls.leaderWatch.RUnlock()
return ls.leaderWatch.val
}

// GetCampaignTimesNum is used to get the campaign times of the leader within `campaignTimesRecordTimeout`.
func (ls *Leadership) GetCampaignTimesNum() int {
if ls == nil {
@@ -375,6 +398,12 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) {
zap.Int64("revision", wresp.Header.Revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose))
return
}
// A PUT event here is triggered only when the API updates the leader key to transfer the leadership.
if ev.Type == mvccpb.PUT && ls.GetLeaderWatch() {
log.Info("[LeaderWatch] current leadership is updated",
zap.Int64("revision", wresp.Header.Revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose))
return
}
}
revision = wresp.Header.Revision + 1
}
@@ -393,4 +422,5 @@ func (ls *Leadership) Reset() {
}
ls.keepAliveCancelFuncLock.Unlock()
ls.getLease().Close()
ls.SetLeaderWatch(false)
}
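With the new flag enabled, Watch also returns on a PUT event (the transfer API updates the leader key in place), not only when the key is deleted. A minimal sketch of the intended call pattern, mirroring the primary-watch loop added to the scheduling server later in this diff; the wrapper function itself is illustrative, not code from this PR (it assumes the context package and a *election.Leadership from pkg/election):

func watchUntilLeaderChanged(ctx context.Context, ls *election.Leadership, revision int64) {
	ls.SetLeaderWatch(true)   // also return from Watch on PUT, not only on DELETE
	ls.Watch(ctx, revision+1) // blocks until the leader key is deleted or updated
	ls.SetLeaderWatch(false)
	// The caller then checks whether the leader key still exists to tell a
	// plain step-down (DELETE) apart from an API-driven transfer (PUT).
}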
62 changes: 58 additions & 4 deletions pkg/mcs/discovery/discover.go
@@ -15,12 +15,15 @@
package discovery

import (
"math/rand"
"strconv"
"time"

"github.com/pingcap/errors"
"github.com/pingcap/log"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/mcs/utils"
"github.com/tikv/pd/pkg/storage/endpoint"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/etcdutil"
"go.etcd.io/etcd/clientv3"
@@ -45,14 +48,14 @@ func Discover(cli *clientv3.Client, clusterID, serviceName string) ([]string, er
}

// GetMSMembers returns all the members of the specified service name.
func GetMSMembers(name string, client *clientv3.Client) ([]ServiceRegistryEntry, error) {
switch name {
func GetMSMembers(serviceName string, client *clientv3.Client) ([]ServiceRegistryEntry, error) {
switch serviceName {
case utils.TSOServiceName, utils.SchedulingServiceName, utils.ResourceManagerServiceName:
clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
if err != nil {
return nil, err
}
servicePath := ServicePath(strconv.FormatUint(clusterID, 10), name)
servicePath := ServicePath(strconv.FormatUint(clusterID, 10), serviceName)
resps, err := kv.NewSlowLogTxn(client).Then(clientv3.OpGet(servicePath, clientv3.WithPrefix())).Commit()
if err != nil {
return nil, errs.ErrEtcdKVGet.Wrap(err).GenWithStackByCause()
@@ -75,5 +78,56 @@ func GetMSMembers(name string, client *clientv3.Client) ([]ServiceRegistryEntry,
return entries, nil
}

return nil, errors.Errorf("unknown service name %s", name)
return nil, errors.Errorf("unknown service name %s", serviceName)
}

// TransferPrimary transfers the primary of the specified service to a new member.
func TransferPrimary(client *clientv3.Client, serviceName, oldPrimary, newPrimary string) error {
log.Info("transfer primary", zap.String("service", serviceName), zap.String("from", oldPrimary), zap.String("to", newPrimary))
entries, err := GetMSMembers(serviceName, client)
if err != nil {
return err
}

// Return an error directly if there is no other member to transfer the primary to.
if len(entries) == 1 && newPrimary == "" {
return errors.New("no valid follower to transfer primary")
}

var primaryIDs []string
var memberValues []string
for _, member := range entries {
if (newPrimary == "" && member.ServiceAddr != oldPrimary) || (newPrimary != "" && member.ServiceAddr == newPrimary) {
primaryIDs = append(primaryIDs, member.ServiceAddr)
memberValues = append(memberValues, string(member.MemberValue))
}
}
if len(primaryIDs) == 0 {
return errors.New("no valid follower to transfer primary")
}

r := rand.New(rand.NewSource(time.Now().UnixNano()))
nextPrimaryID := r.Intn(len(primaryIDs))

clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
if err != nil {
return errors.Errorf("failed to get cluster ID: %v", err)
}

var primaryKey string
switch serviceName {
case utils.SchedulingServiceName:
primaryKey = endpoint.SchedulingPrimaryPath(clusterID)
case utils.TSOServiceName:
tsoRootPath := endpoint.TSOSvcRootPath(clusterID)
primaryKey = endpoint.KeyspaceGroupPrimaryPath(tsoRootPath, utils.DefaultKeyspaceGroupID)
}

Member: If the transaction takes a long time here, will it be possible to make the lease expire unexpectedly?

HuSharp (Member, Author): We give the lease 3 seconds.

  • If this transaction takes a long time, the only effect is that the primary transfer starts very slowly, which will also show up as the API taking a long time to execute.
  • If you mean that the lease grant was not renewed due to a large transaction failure, then the primary will be re-elected after a timeout. You can check this in TestTransferPrimaryWhileLeaseExpired:

https://github.com/tikv/pd/pull/8157/files#diff-5a1f6bff4bb6b99fcd2f7bf4edc37aa937e8c51eafa41acc0aabd7aa1a1d1da6R267

// Update the primary key to notify the old primary server.
putResp, err := kv.NewSlowLogTxn(client).
Then(clientv3.OpPut(primaryKey, memberValues[nextPrimaryID])).
Commit()
if err != nil || !putResp.Succeeded {
return errors.Errorf("failed to write primary flag for %s", serviceName)
}
return nil
}
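A hedged usage sketch of the new helper: the endpoint addresses and the wrapper function are illustrative, while TransferPrimary, utils.SchedulingServiceName, and the empty-string convention ("pick a random follower") come from the code above:

package example

import (
	"log"

	"github.com/tikv/pd/pkg/mcs/discovery"
	"github.com/tikv/pd/pkg/mcs/utils"
	"go.etcd.io/etcd/clientv3"
)

func transferSchedulingPrimary(client *clientv3.Client) {
	// Let the helper pick a random follower as the successor.
	err := discovery.TransferPrimary(client, utils.SchedulingServiceName,
		"http://127.0.0.1:3379" /* current primary */, "" /* any follower */)
	if err != nil {
		log.Fatal(err)
	}
	// Or name the successor explicitly.
	err = discovery.TransferPrimary(client, utils.SchedulingServiceName,
		"http://127.0.0.1:3379", "http://127.0.0.1:3380")
	if err != nil {
		log.Fatal(err)
	}
}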
2 changes: 2 additions & 0 deletions pkg/mcs/discovery/registry_entry.go
@@ -23,11 +23,13 @@ import (

// ServiceRegistryEntry is the registry entry of a service
type ServiceRegistryEntry struct {
Name string `json:"name"`
ServiceAddr string `json:"service-addr"`
Version string `json:"version"`
GitHash string `json:"git-hash"`
DeployPath string `json:"deploy-path"`
StartTimestamp int64 `json:"start-timestamp"`
MemberValue []byte `json:"member-value"`
}

// Serialize this service registry entry
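Since an entry is what gets stored in etcd under the service path, here is a small sketch of what it serializes to with the two new fields; all values are illustrative:

package main

import (
	"encoding/json"
	"fmt"

	"github.com/tikv/pd/pkg/mcs/discovery"
)

func main() {
	entry := &discovery.ServiceRegistryEntry{
		Name:        "scheduling-1",
		ServiceAddr: "http://127.0.0.1:3379",
		Version:     "v8.3.0",
		MemberValue: []byte(`{"name":"scheduling-1"}`),
	}
	data, _ := json.Marshal(entry)
	fmt.Println(string(data))
	// The []byte MemberValue marshals as base64 in JSON; TransferPrimary
	// above writes the raw bytes straight back to the primary key.
}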
62 changes: 62 additions & 0 deletions pkg/mcs/scheduling/server/server.go
@@ -17,6 +17,7 @@ package server
import (
"context"
"fmt"
"github.com/tikv/pd/pkg/utils/etcdutil"
"net/http"
"os"
"os/signal"
@@ -128,6 +129,10 @@ func (s *Server) GetBackendEndpoints() string {
return s.cfg.BackendEndpoints
}

// GetParticipant returns the participant of the scheduling service.
func (s *Server) GetParticipant() *member.Participant {
return s.participant
}

// SetLogLevel sets log level.
func (s *Server) SetLogLevel(level string) error {
if !logutil.IsLevelLegal(level) {
@@ -243,6 +248,17 @@ func (s *Server) primaryElectionLoop() {
log.Info("the scheduling primary has changed, try to re-campaign a primary")
}

// If an expected primary exists, skip campaigning unless this member is the designated one.
expectedPrimary := utils.GetExpectedPrimary(s.participant.GetLeaderPath(), s.GetClient())
if expectedPrimary != "" && expectedPrimary != s.participant.GetLeadership().GetLeaderValue() {
log.Info("skip campaigning of scheduling primary and check later",
zap.String("server-name", s.Name()),
zap.String("target-primary-id", expectedPrimary),
zap.Uint64("member-id", s.participant.ID()))
time.Sleep(200 * time.Millisecond)
continue
}

s.campaignLeader()
}
}
@@ -290,6 +306,9 @@ func (s *Server) campaignLeader() {
member.ServiceMemberGauge.WithLabelValues(serviceName).Set(1)
log.Info("scheduling primary is ready to serve", zap.String("scheduling-primary-name", s.participant.Name()))

exitPrimary := make(chan struct{})
go s.primaryWatch(ctx, exitPrimary)

leaderTicker := time.NewTicker(utils.LeaderTickInterval)
defer leaderTicker.Stop()

@@ -304,6 +323,47 @@
// Server is closed and it should return nil.
log.Info("server is closed")
return
case <-exitPrimary:
log.Info("no longer a primary/leader because primary have been updated, the scheduling primary/leader will step down")
HuSharp marked this conversation as resolved.
Show resolved Hide resolved
return
}
}
}

// primaryWatch watches the primary key and signals exitPrimary once the key
// is updated by the transfer API, so that the current primary steps down.
func (s *Server) primaryWatch(ctx context.Context, exitPrimary chan struct{}) {
_, revision, err := s.participant.GetPersistentLeader()
if err != nil {
log.Error("[primary] getting the leader meets error", errs.ZapError(err))
return
}
log.Info("[primary] start to watch the primary", zap.Stringer("scheduling-primary", s.participant.GetLeader()))
// Watch will keep looping and never return unless the primary has changed.
s.participant.GetLeadership().SetLeaderWatch(true)
s.participant.GetLeadership().Watch(s.serverLoopCtx, revision+1)
s.participant.GetLeadership().SetLeaderWatch(false)

// Only an API-driven primary transfer sets the expected primary.
// First, check whether the leader key has been deleted (a normal step-down).
leaderRaw, err := etcdutil.GetValue(s.participant.Client(), s.participant.GetLeaderPath())
if err != nil {
log.Error("[primary] get primary key error", zap.Error(err))
return
}
if leaderRaw == nil {
log.Info("[primary] leader key is deleted, the primary will step down")
return
}

utils.SetExpectedPrimary(s.participant.Client(), s.participant.GetLeaderPath())

s.participant.UnsetLeader()
for {
select {
case <-ctx.Done():
log.Info("[primary] exit the primary watch loop")
return
case exitPrimary <- struct{}{}:
return
}
}
}
@@ -425,6 +485,7 @@ func (s *Server) startServer() (err error) {
GitHash: versioninfo.PDGitHash,
DeployPath: deployPath,
StartTimestamp: s.StartTimestamp(),
Name: s.Name(),
}
uniqueName := s.cfg.GetAdvertiseListenAddr()
uniqueID := memberutil.GenerateUniqueID(uniqueName)
@@ -436,6 +497,7 @@
ListenUrls: []string{s.cfg.GetAdvertiseListenAddr()},
}
s.participant.InitInfo(p, endpoint.SchedulingSvcRootPath(s.clusterID), utils.PrimaryKey, "primary election")
s.serviceID.MemberValue = []byte(s.participant.MemberValue())

s.service = &Service{Server: s}
s.AddServiceReadyCallback(s.startCluster)
4 changes: 4 additions & 0 deletions pkg/mcs/tso/server/server.go
@@ -380,13 +380,17 @@ func (s *Server) startServer() (err error) {
GitHash: versioninfo.PDGitHash,
DeployPath: deployPath,
StartTimestamp: s.StartTimestamp(),
Name: s.Name(),
}
s.keyspaceGroupManager = tso.NewKeyspaceGroupManager(
s.serverLoopCtx, s.serviceID, s.GetClient(), s.GetHTTPClient(), s.cfg.AdvertiseListenAddr,
s.clusterID, legacySvcRootPath, tsoSvcRootPath, s.cfg)
if err := s.keyspaceGroupManager.Initialize(); err != nil {
return err
}
// Initialize the service ID with the member value of the primary of the default keyspace group.
memberValue, err := s.GetMember(utils.DefaultKeyspaceID, utils.DefaultKeyspaceGroupID)
if err != nil {
return err
}
s.serviceID.MemberValue = []byte(memberValue.MemberValue())

s.tsoProtoFactory = &tsoutil.TSOProtoFactory{}
s.service = &Service{Server: s}
47 changes: 47 additions & 0 deletions pkg/mcs/utils/util.go
@@ -32,6 +32,7 @@ import (
"github.com/prometheus/client_golang/prometheus/promhttp"
"github.com/soheilhy/cmux"
"github.com/tikv/pd/pkg/errs"
"github.com/tikv/pd/pkg/storage/kv"
"github.com/tikv/pd/pkg/utils/apiutil"
"github.com/tikv/pd/pkg/utils/apiutil/multiservicesapi"
"github.com/tikv/pd/pkg/utils/etcdutil"
@@ -51,6 +52,9 @@ const (
ClusterIDPath = "/pd/cluster_id"
// retryInterval is the interval to retry.
retryInterval = time.Second
// ExpectedPrimary is the key suffix for storing the expected primary.
// ONLY the transfer API sets this value.
ExpectedPrimary = "expected_primary"
)

// InitClusterID initializes the cluster ID.
@@ -70,6 +74,49 @@ func InitClusterID(ctx context.Context, client *clientv3.Client) (id uint64, err
return 0, errors.Errorf("failed to init cluster ID after retrying %d times", maxRetryTimes)
}

// GetExpectedPrimary returns the expected primary, which is set ONLY by the transfer API; an empty result means no transfer is in progress.
func GetExpectedPrimary(keyPath string, client *clientv3.Client) string {
leader, err := etcdutil.GetValue(client, strings.Join([]string{keyPath, ExpectedPrimary}, "/"))
if err != nil {
log.Error("get expected primary key error", errs.ZapError(err))
return ""
}

return string(leader)
}

// RemoveExpectedPrimary removes the expected primary key.
func RemoveExpectedPrimary(client *clientv3.Client, leaderPath string) {
// Remove the expected primary key.
resp, err := kv.NewSlowLogTxn(client).
Then(clientv3.OpDelete(strings.Join([]string{leaderPath, ExpectedPrimary}, "/"))).
Commit()
if err != nil || !resp.Succeeded {
log.Error("change primary error", errs.ZapError(err))
return
}
}

// SetExpectedPrimary sets the expected primary key when the current primary has exited.
func SetExpectedPrimary(client *clientv3.Client, leaderPath string) {
Member: The expected primary may be confusing. How about something like xxxflag or something else?

HuSharp (Member, Author): Changed, PTAL, thanks!

leaderRaw, err := etcdutil.GetValue(client, leaderPath)
if err != nil {
log.Error("[primary] get primary key error", zap.Error(err))
return
}
// write a flag to indicate the current primary has exited
resp, err := kv.NewSlowLogTxn(client).
Then(
clientv3.OpPut(strings.Join([]string{leaderPath, ExpectedPrimary}, "/"), string(leaderRaw)),
// indicate the current primary has exited
clientv3.OpDelete(leaderPath)).
Commit()
if err != nil || !resp.Succeeded {
log.Error("change primary error", errs.ZapError(err))
return
}
}
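Taken together, the helpers above implement a small handshake over two etcd keys: <leaderPath> holds the current primary's member value, and <leaderPath>/expected_primary holds the member value designated by the transfer API. A sketch of the campaign guard a follower can build on top of this, written as if it lived in this package (it mirrors the check added to the scheduling server's election loop; clientv3 is go.etcd.io/etcd/clientv3):

func shouldCampaign(client *clientv3.Client, leaderPath, myMemberValue string) bool {
	expected := GetExpectedPrimary(leaderPath, client)
	if expected == "" {
		return true // no transfer in flight, so anyone may campaign
	}
	// A transfer is in progress: only the designated member may campaign.
	return expected == myMemberValue
}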

// PromHandler is a handler to get prometheus metrics.
func PromHandler() gin.HandlerFunc {
return func(c *gin.Context) {