Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

pdms: support primary/transfer api for scheduling and tso #8157

Merged
merged 37 commits into from
Aug 13, 2024
Merged
Changes from 1 commit
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
7fa19d3
check primary
HuSharp May 9, 2024
1f13fa2
make test happy
HuSharp May 9, 2024
af995cc
address comment and add test
HuSharp May 9, 2024
8d36be5
only trigger by updating
HuSharp May 9, 2024
2433f0c
change log
HuSharp May 10, 2024
dd72b9c
address comment
HuSharp May 10, 2024
a39300e
change to name
HuSharp May 13, 2024
51708b5
make test happy
HuSharp May 13, 2024
c6d2bc3
address comment and change some comments
HuSharp May 14, 2024
a4c5c29
add more test
HuSharp May 14, 2024
4d0598f
merge master
HuSharp May 21, 2024
6ac311f
Merge branch 'master' into support_transfer_primary2
HuSharp May 30, 2024
510b92a
Merge branch 'master' into support_transfer_primary2
HuSharp Jun 13, 2024
b235bc1
address comment and add more comment
HuSharp Jul 1, 2024
f659782
add more comment
HuSharp Jul 1, 2024
32b0b5f
Merge branch 'master' into support_transfer_primary2
HuSharp Jul 2, 2024
dbc5447
Merge branch 'master' into support_transfer_primary2
HuSharp Jul 2, 2024
204ffd5
address comment
HuSharp Jul 3, 2024
ec8e737
remove redundant wait
HuSharp Jul 4, 2024
9e3b798
Merge branch 'master' into support_transfer_primary2
ti-chi-bot[bot] Jul 8, 2024
e53844e
Merge branch 'master' into support_transfer_primary2
ti-chi-bot[bot] Jul 8, 2024
cc82e7b
Merge branch 'master' into support_transfer_primary2
ti-chi-bot[bot] Jul 9, 2024
19ce9d8
Merge branch 'master' into support_transfer_primary2
ti-chi-bot[bot] Jul 9, 2024
4c7f8ac
changed by name
HuSharp Jul 25, 2024
36b5a82
refine code
HuSharp Jul 25, 2024
ea8d9e3
address comment
HuSharp Jul 30, 2024
ffb7b1b
refine code
HuSharp Jul 31, 2024
379b1f6
merge master
HuSharp Aug 5, 2024
d9bffb8
Merge branch 'master' into support_transfer_primary2
HuSharp Aug 12, 2024
d037a6a
remove delete
HuSharp Aug 12, 2024
e711fd9
refine purpose and lease
HuSharp Aug 12, 2024
d999c7f
merge master
HuSharp Aug 12, 2024
7f0a426
refine code
HuSharp Aug 12, 2024
d810ed1
address comment
HuSharp Aug 12, 2024
43830ec
non-essential exported
HuSharp Aug 13, 2024
2d9a3b0
refine check name
HuSharp Aug 13, 2024
c1da5b5
Merge branch 'master' into support_transfer_primary2
ti-chi-bot[bot] Aug 13, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Prev Previous commit
Next Next commit
remove redundant wait
Signed-off-by: husharp <jinhao.hu@pingcap.com>
HuSharp committed Jul 4, 2024
commit ec8e737816cbc9d7727af88ccd632c052d701634
7 changes: 6 additions & 1 deletion pkg/election/leadership.go
Original file line number Diff line number Diff line change
@@ -173,10 +173,15 @@ func (ls *Leadership) Campaign(leaseTimeout int64, leaderData string, cmps ...cl
ls.setLease(newLease)

failpoint.Inject("skipGrantLeader", func(val failpoint.Value) {
name, ok := val.(string)
if name == "" {
// return directly when not set the name
failpoint.Return(errors.Errorf("failed to grant lease"))
}
var member pdpb.Member
_ = member.Unmarshal([]byte(leaderData))
name, ok := val.(string)
if ok && member.Name == name {
// only return when the name is set and the name is equal to the leader name
failpoint.Return(errors.Errorf("failed to grant lease"))
}
})
11 changes: 4 additions & 7 deletions tests/integrations/mcs/members/member_test.go
Original file line number Diff line number Diff line change
@@ -16,7 +16,6 @@ package members_test

import (
"context"
"fmt"
"testing"
"time"

@@ -292,17 +291,16 @@ func (suite *memberTestSuite) TestTransferPrimaryWhileLeaseExpired() {
}
}
// Mock the new primary can not grant leader which means the lease will expire
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/election/skipGrantLeader", fmt.Sprintf("return(\"%s\")", newPrimary)))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/election/skipGrantLeader", `return()`))
err = suite.pdClient.TransferMicroServicePrimary(suite.ctx, service, newPrimary)
re.NoError(err)

// Wait for the old primary exit and new primary campaign
// cannot check newPrimary isServing when skipGrantLeader is enabled
testutil.Eventually(re, func() bool {
return !nodes[primary].IsServing()
}, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond))

// Wait for the new primary lease to expire which is `DefaultLeaderLease`
time.Sleep(4 * time.Second)
// TODO: Add campaign times check in mcs to avoid frequent campaign
re.NoError(failpoint.Disable("github.com/tikv/pd/pkg/election/skipGrantLeader"))
// Can still work after lease expired
@@ -339,17 +337,16 @@ func (suite *memberTestSuite) TestTransferPrimaryWhileLeaseExpiredAndServerDown(
}
}
// Mock the new primary can not grant leader which means the lease will expire
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/election/skipGrantLeader", fmt.Sprintf("return(\"%s\")", newPrimary)))
re.NoError(failpoint.Enable("github.com/tikv/pd/pkg/election/skipGrantLeader", `return()`))
err = suite.pdClient.TransferMicroServicePrimary(suite.ctx, service, newPrimary)
re.NoError(err)

// Wait for the old primary exit and new primary campaign
// cannot check newPrimary isServing when skipGrantLeader is enabled
testutil.Eventually(re, func() bool {
return !nodes[primary].IsServing()
}, testutil.WithWaitFor(5*time.Second), testutil.WithTickInterval(50*time.Millisecond))

// Wait for the new primary lease to expire which is `DefaultLeaderLease`
time.Sleep(4 * time.Second)
// TODO: Add campaign times check in mcs to avoid frequent campaign
// for now, close the current primary to mock the server down
nodes[newPrimary].Close()

Unchanged files with check annotations Beta

NewPrimary: newPrimary,
})
if err != nil {
return errors.Trace(err)

Check warning on line 970 in client/http/interface.go

Codecov / codecov/patch

client/http/interface.go#L970

Added line #L970 was not covered by tests
}
return c.request(ctx, newRequestInfo().
WithName(transferMicroServicePrimaryName).
return entries, nil
}
return nil, errors.Errorf("unknown service name %s", serviceName)

Check warning on line 82 in pkg/mcs/discovery/discover.go

Codecov / codecov/patch

pkg/mcs/discovery/discover.go#L82

Added line #L82 was not covered by tests
}
// TransferPrimary transfers the primary of the specified service.
log.Info("transfer primary", zap.String("service", serviceName), zap.String("from", oldPrimary), zap.String("to", newPrimary))
entries, err := GetMSMembers(serviceName, client)
if err != nil {
return err

Check warning on line 90 in pkg/mcs/discovery/discover.go

Codecov / codecov/patch

pkg/mcs/discovery/discover.go#L90

Added line #L90 was not covered by tests
}
// Do nothing when I am the only member of cluster.
if len(entries) == 1 {
return errors.New(fmt.Sprintf("no valid secondary to transfer primary, the only member is %s", entries[0].Name))

Check warning on line 95 in pkg/mcs/discovery/discover.go

Codecov / codecov/patch

pkg/mcs/discovery/discover.go#L95

Added line #L95 was not covered by tests
}
var primaryIDs []string
if (newPrimary == "" && member.ServiceAddr != oldPrimary) || (newPrimary != "" && member.Name == newPrimary) {
primaryIDs = append(primaryIDs, member.ServiceAddr)
if string(member.MemberValue) == "" {
return errors.New(fmt.Sprintf("member %s value is empty", member.Name))

Check warning on line 104 in pkg/mcs/discovery/discover.go

Codecov / codecov/patch

pkg/mcs/discovery/discover.go#L104

Added line #L104 was not covered by tests
}
memberValues = append(memberValues, string(member.MemberValue))
}
clusterID, err := etcdutil.GetClusterID(client, utils.ClusterIDPath)
if err != nil {
return errors.Errorf("failed to get cluster ID: %v", err)

Check warning on line 118 in pkg/mcs/discovery/discover.go

Codecov / codecov/patch

pkg/mcs/discovery/discover.go#L118

Added line #L118 was not covered by tests
}
var primaryKey string
// grant the primary lease to the new primary.
grantResp, err := client.Grant(client.Ctx(), utils.DefaultLeaderLease)
if err != nil {
return errors.Errorf("failed to grant lease for %s, err: %v", serviceName, err)

Check warning on line 136 in pkg/mcs/discovery/discover.go

Codecov / codecov/patch

pkg/mcs/discovery/discover.go#L136

Added line #L136 was not covered by tests
}
// update primary key to notify old primary server.
putResp, err := kv.NewSlowLogTxn(client).
Then(clientv3.OpPut(primaryKey, memberValues[nextPrimaryID], clientv3.WithLease(grantResp.ID))).
Commit()
if err != nil || !putResp.Succeeded {
return errors.Errorf("failed to write primary flag for %s, err: %v", serviceName, err)

Check warning on line 143 in pkg/mcs/discovery/discover.go

Codecov / codecov/patch

pkg/mcs/discovery/discover.go#L143

Added line #L143 was not covered by tests
}
return nil
}
return s.cfg.BackendEndpoints
}
func (s *Server) GetParticipant() *member.Participant {
return s.participant

Check warning on line 133 in pkg/mcs/scheduling/server/server.go

Codecov / codecov/patch

pkg/mcs/scheduling/server/server.go#L132-L133

Added lines #L132 - L133 were not covered by tests
}
// SetLogLevel sets log level.
s.participant.UnsetLeader()
defer log.Info("scheduling primary exit the primary watch loop")
select {
case <-ctx.Done():
return

Check warning on line 367 in pkg/mcs/scheduling/server/server.go

Codecov / codecov/patch

pkg/mcs/scheduling/server/server.go#L366-L367

Added lines #L366 - L367 were not covered by tests
// 3. exit the primary watch loop, 4.`exitPrimary` will help delete the leader key.
case exitPrimary <- struct{}{}:
return
func AttachExpectedPrimaryFlag(client *clientv3.Client, leaderPath string) string {
primary, err := etcdutil.GetValue(client, strings.Join([]string{leaderPath, ExpectedPrimaryFlag}, "/"))
if err != nil {
log.Error("get expected primary flag error", errs.ZapError(err))
return ""

Check warning on line 86 in pkg/mcs/utils/util.go

Codecov / codecov/patch

pkg/mcs/utils/util.go#L85-L86

Added lines #L85 - L86 were not covered by tests
}
return string(primary)
Then(clientv3.OpDelete(strings.Join([]string{leaderPath, ExpectedPrimaryFlag}, "/"))).
Commit()
if err != nil || !resp.Succeeded {
log.Error("change expected primary error", errs.ZapError(err))
return

Check warning on line 103 in pkg/mcs/utils/util.go

Codecov / codecov/patch

pkg/mcs/utils/util.go#L102-L103

Added lines #L102 - L103 were not covered by tests
}
}
log.Info("set expected primary flag", zap.String("leader-path", leaderPath))
leaderRaw, err := etcdutil.GetValue(client, leaderPath)
if err != nil {
log.Error("get primary key error", zap.Error(err))
return

Check warning on line 113 in pkg/mcs/utils/util.go

Codecov / codecov/patch

pkg/mcs/utils/util.go#L112-L113

Added lines #L112 - L113 were not covered by tests
}
grantResp, err := client.Grant(client.Ctx(), DefaultLeaderLease)
if err != nil {
log.Error("grant lease for expected primary error", errs.ZapError(err))
return

Check warning on line 118 in pkg/mcs/utils/util.go

Codecov / codecov/patch

pkg/mcs/utils/util.go#L117-L118

Added lines #L117 - L118 were not covered by tests
}
// write a flag to indicate the current primary has exited
resp, err := kv.NewSlowLogTxn(client).
clientv3.OpDelete(leaderPath)).
Commit()
if err != nil || !resp.Succeeded {
log.Error("change expected primary error", errs.ZapError(err))
return

Check warning on line 129 in pkg/mcs/utils/util.go

Codecov / codecov/patch

pkg/mcs/utils/util.go#L128-L129

Added lines #L128 - L129 were not covered by tests
}
}
// only `/ms/primary/transfer` API update primary will set `leaderPath` to the expected primary.
curPrimary, err := etcdutil.GetValue(gta.member.Client(), gta.member.GetLeaderPath())
if err != nil {
log.Error("tso primary getting the leader meets error", errs.ZapError(err))
return

Check warning on line 702 in pkg/tso/global_allocator.go

Codecov / codecov/patch

pkg/tso/global_allocator.go#L701-L702

Added lines #L701 - L702 were not covered by tests
}
if curPrimary != nil && resp.Kvs[0].Value != nil && string(curPrimary) != string(resp.Kvs[0].Value) {
// 1. modify the expected primary flag to the new primary.
func TransferPrimary(c *gin.Context) {
svr := c.MustGet(middlewares.ServerContextKey).(*server.Server)
if !svr.IsAPIServiceMode() {
c.AbortWithStatusJSON(http.StatusNotFound, "not support micro service")
return

Check warning on line 96 in server/apiv2/handlers/micro_service.go

Codecov / codecov/patch

server/apiv2/handlers/micro_service.go#L95-L96

Added lines #L95 - L96 were not covered by tests
}
if service := c.Param("service"); len(service) > 0 {
var input map[string]string
if err := c.BindJSON(&input); err != nil {
c.String(http.StatusBadRequest, err.Error())
return

Check warning on line 103 in server/apiv2/handlers/micro_service.go

Codecov / codecov/patch

server/apiv2/handlers/micro_service.go#L102-L103

Added lines #L102 - L103 were not covered by tests
}
newPrimary, keyspaceGroupID := "", utils.DefaultKeyspaceGroupID
}
if v, ok := input["keyspace_group_id"]; ok {
keyspaceGroupIDRaw, err := strconv.ParseUint(v, 10, 32)
if err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return

Check warning on line 115 in server/apiv2/handlers/micro_service.go

Codecov / codecov/patch

server/apiv2/handlers/micro_service.go#L112-L115

Added lines #L112 - L115 were not covered by tests
}
keyspaceGroupID = uint32(keyspaceGroupIDRaw)

Check warning on line 117 in server/apiv2/handlers/micro_service.go

Codecov / codecov/patch

server/apiv2/handlers/micro_service.go#L117

Added line #L117 was not covered by tests
}
oldPrimary, _ := svr.GetServicePrimaryAddr(c.Request.Context(), service)
if oldPrimary == newPrimary {
c.AbortWithStatusJSON(http.StatusInternalServerError, "new primary is the same as the old one")
return

Check warning on line 122 in server/apiv2/handlers/micro_service.go

Codecov / codecov/patch

server/apiv2/handlers/micro_service.go#L121-L122

Added lines #L121 - L122 were not covered by tests
}
if err := discovery.TransferPrimary(svr.GetClient(), service, oldPrimary, newPrimary, keyspaceGroupID); err != nil {
c.AbortWithStatusJSON(http.StatusInternalServerError, err.Error())
return
}
c.AbortWithStatusJSON(http.StatusInternalServerError, "please specify service")

Check warning on line 132 in server/apiv2/handlers/micro_service.go

Codecov / codecov/patch

server/apiv2/handlers/micro_service.go#L132

Added line #L132 was not covered by tests
}