-
Notifications
You must be signed in to change notification settings - Fork 727
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
pdms: support primary/transfer api for scheduling and tso #8157
Changes from 34 commits
7fa19d3
1f13fa2
af995cc
8d36be5
2433f0c
dd72b9c
a39300e
51708b5
c6d2bc3
a4c5c29
4d0598f
6ac311f
510b92a
b235bc1
f659782
32b0b5f
dbc5447
204ffd5
ec8e737
9e3b798
e53844e
cc82e7b
19ce9d8
4c7f8ac
36b5a82
ea8d9e3
ffb7b1b
379b1f6
d9bffb8
d037a6a
e711fd9
d999c7f
7f0a426
d810ed1
43830ec
2d9a3b0
c1da5b5
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -71,6 +71,9 @@ type Leadership struct { | |
// campaignTimes is used to record the campaign times of the leader within `campaignTimesRecordTimeout`. | ||
// It is ordered by time to prevent the leader from campaigning too frequently. | ||
campaignTimes []time.Time | ||
// primaryWatch is for the primary watch only, | ||
// which is used to reuse `Watch` interface in `Leadership`. | ||
primaryWatch atomic.Bool | ||
} | ||
|
||
// NewLeadership creates a new Leadership. | ||
|
@@ -84,17 +87,18 @@ func NewLeadership(client *clientv3.Client, leaderKey, purpose string) *Leadersh | |
return leadership | ||
} | ||
|
||
// getLease gets the lease of leadership, only if leadership is valid, | ||
// GetLease gets the lease of leadership, only if leadership is valid, | ||
// i.e. the owner is a true leader, the lease is not nil. | ||
func (ls *Leadership) getLease() *lease { | ||
func (ls *Leadership) GetLease() *Lease { | ||
l := ls.lease.Load() | ||
if l == nil { | ||
return nil | ||
} | ||
return l.(*lease) | ||
return l.(*Lease) | ||
} | ||
|
||
func (ls *Leadership) setLease(lease *lease) { | ||
// SetLease sets the lease of leadership. | ||
func (ls *Leadership) SetLease(lease *Lease) { | ||
ls.lease.Store(lease) | ||
} | ||
|
||
|
@@ -114,6 +118,16 @@ func (ls *Leadership) GetLeaderKey() string { | |
return ls.leaderKey | ||
} | ||
|
||
// SetPrimaryWatch sets the primary watch flag. | ||
func (ls *Leadership) SetPrimaryWatch(val bool) { | ||
ls.primaryWatch.Store(val) | ||
} | ||
|
||
// IsPrimary gets the primary watch flag. | ||
func (ls *Leadership) IsPrimary() bool { | ||
return ls.primaryWatch.Load() | ||
} | ||
|
||
// GetCampaignTimesNum is used to get the campaign times of the leader within `campaignTimesRecordTimeout`. | ||
// Need to make sure `AddCampaignTimes` is called before this function. | ||
func (ls *Leadership) GetCampaignTimesNum() int { | ||
|
@@ -152,18 +166,19 @@ func (ls *Leadership) AddCampaignTimes() { | |
func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...clientv3.Cmp) error { | ||
ls.leaderValue = leaderData | ||
// Create a new lease to campaign | ||
newLease := &lease{ | ||
Purpose: ls.purpose, | ||
client: ls.client, | ||
lease: clientv3.NewLease(ls.client), | ||
} | ||
ls.setLease(newLease) | ||
newLease := NewLease(ls.client, ls.purpose) | ||
ls.SetLease(newLease) | ||
|
||
failpoint.Inject("skipGrantLeader", func(val failpoint.Value) { | ||
name, ok := val.(string) | ||
if len(name) == 0 { | ||
// return directly when not set the name | ||
failpoint.Return(errors.Errorf("failed to grant lease")) | ||
} | ||
var member pdpb.Member | ||
_ = member.Unmarshal([]byte(leaderData)) | ||
name, ok := val.(string) | ||
if ok && member.Name == name { | ||
// only return when the name is set and the name is equal to the leader name | ||
failpoint.Return(errors.Errorf("failed to grant lease")) | ||
} | ||
}) | ||
|
@@ -200,12 +215,12 @@ func (ls *Leadership) Keep(ctx context.Context) { | |
ls.keepAliveCancelFuncLock.Lock() | ||
ls.keepAliveCtx, ls.keepAliveCancelFunc = context.WithCancel(ctx) | ||
ls.keepAliveCancelFuncLock.Unlock() | ||
go ls.getLease().KeepAlive(ls.keepAliveCtx) | ||
go ls.GetLease().KeepAlive(ls.keepAliveCtx) | ||
} | ||
|
||
// Check returns whether the leadership is still available. | ||
func (ls *Leadership) Check() bool { | ||
return ls != nil && ls.getLease() != nil && !ls.getLease().IsExpired() | ||
return ls != nil && ls.GetLease() != nil && !ls.GetLease().IsExpired() | ||
} | ||
|
||
// LeaderTxn returns txn() with a leader comparison to guarantee that | ||
|
@@ -376,6 +391,13 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) { | |
zap.Int64("revision", wresp.Header.Revision), zap.String("leader-key", ls.leaderKey), zap.String("purpose", ls.purpose)) | ||
return | ||
} | ||
// ONLY `{service}/primary/transfer` API update primary will meet this condition. | ||
if ev.Type == mvccpb.PUT && ls.IsPrimary() { | ||
log.Info("current leadership is updated", zap.Int64("revision", wresp.Header.Revision), | ||
zap.String("leader-key", ls.leaderKey), zap.ByteString("cur-value", ev.Kv.Value), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to check if the kv has the value? |
||
zap.String("purpose", ls.purpose)) | ||
return | ||
} | ||
} | ||
revision = wresp.Header.Revision + 1 | ||
} | ||
|
@@ -385,13 +407,14 @@ func (ls *Leadership) Watch(serverCtx context.Context, revision int64) { | |
|
||
// Reset does some defer jobs such as closing lease, resetting lease etc. | ||
func (ls *Leadership) Reset() { | ||
if ls == nil || ls.getLease() == nil { | ||
if ls == nil || ls.GetLease() == nil { | ||
return | ||
} | ||
ls.keepAliveCancelFuncLock.Lock() | ||
if ls.keepAliveCancelFunc != nil { | ||
ls.keepAliveCancelFunc() | ||
} | ||
ls.keepAliveCancelFuncLock.Unlock() | ||
ls.getLease().Close() | ||
ls.GetLease().Close() | ||
ls.SetPrimaryWatch(false) | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Do we need to skip it in non-ms mode? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Maybe it's no problem that this code makes false negatives for |
||
} |
Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||
---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
@@ -49,12 +49,14 @@ func NewTestServer(ctx context.Context, re *require.Assertions, cfg *config.Conf | |||||||||||||||||||||||||||
// GenerateConfig generates a new config with the given options. | ||||||||||||||||||||||||||||
func GenerateConfig(c *config.Config) (*config.Config, error) { | ||||||||||||||||||||||||||||
arguments := []string{ | ||||||||||||||||||||||||||||
"--name=" + c.Name, | ||||||||||||||||||||||||||||
"--listen-addr=" + c.ListenAddr, | ||||||||||||||||||||||||||||
"--advertise-listen-addr=" + c.AdvertiseListenAddr, | ||||||||||||||||||||||||||||
"--backend-endpoints=" + c.BackendEndpoints, | ||||||||||||||||||||||||||||
} | ||||||||||||||||||||||||||||
|
||||||||||||||||||||||||||||
flagSet := pflag.NewFlagSet("test", pflag.ContinueOnError) | ||||||||||||||||||||||||||||
flagSet.StringP("name", "", "", "human-readable name for this scheduling member") | ||||||||||||||||||||||||||||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Shall we need a default name? There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. default name set by this code, which is like pd/pkg/mcs/tso/server/config.go Lines 185 to 195 in a39300e
And your commented snippet is for testing to avoid using the same name for the same machine for local testing, I used addr here Lines 87 to 88 in 51708b5
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. got |
||||||||||||||||||||||||||||
flagSet.BoolP("version", "V", false, "print version information and exit") | ||||||||||||||||||||||||||||
flagSet.StringP("config", "", "", "config file") | ||||||||||||||||||||||||||||
flagSet.StringP("backend-endpoints", "", "", "url for etcd client") | ||||||||||||||||||||||||||||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think here we can optimize the case that if the updated primary is still itself, no need to return to campaign again.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
https://github.com/HuSharp/pd/blob/43830ec9ea80d92008be663ec5c26b3d137373a9/pkg/mcs/utils/expected_primary.go#L141-L146
I skipped the member on the outer layer(in
transfer primary
) which is not the same as oldPrimary, do you think I still need to add the restriction inside thewatch
?Maybe the
expected primary flag
should not be modified when the leader is itself, because this flag must keep the lease alive after campaigning new leader, which means that it requires the Primary to quit the currentwatch
.