Skip to content

Commit

Permalink
infosync: integrate PD HTTP client into the placement manager (#48858)
Browse files Browse the repository at this point in the history
ref #35319
  • Loading branch information
JmPotato authored Nov 27, 2023
1 parent d38039e commit c82e19c
Show file tree
Hide file tree
Showing 19 changed files with 521 additions and 590 deletions.
12 changes: 6 additions & 6 deletions DEPS.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -7019,13 +7019,13 @@ def go_deps():
name = "com_github_tikv_pd_client",
build_file_proto_mode = "disable_global",
importpath = "github.com/tikv/pd/client",
sha256 = "440821579da980d0405695b463da892608a59252a296cd7e52b4f97881c5fdb7",
strip_prefix = "github.com/tikv/pd/[email protected]20231121080541-8919bc11f770",
sha256 = "5232ba0bba677a6d4614ae2cc102554d59cd00d473d9138739508d6f25169f02",
strip_prefix = "github.com/tikv/pd/[email protected]20231127075044-9f4803d8bd05",
urls = [
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231121080541-8919bc11f770.zip",
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231121080541-8919bc11f770.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231121080541-8919bc11f770.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231121080541-8919bc11f770.zip",
"http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231127075044-9f4803d8bd05.zip",
"http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231127075044-9f4803d8bd05.zip",
"https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231127075044-9f4803d8bd05.zip",
"https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231127075044-9f4803d8bd05.zip",
],
)
go_repository(
Expand Down
2 changes: 1 addition & 1 deletion br/pkg/pdutil/pd.go
Original file line number Diff line number Diff line change
Expand Up @@ -389,7 +389,7 @@ func (p *PdController) getRegionCountWith(
}
var err error
for _, addr := range p.getAllPDAddrs() {
v, e := get(ctx, addr, pdhttp.RegionStatsByKeyRange(start, end), p.cli, http.MethodGet, nil)
v, e := get(ctx, addr, pdhttp.RegionStatsByKeyRange(pdhttp.NewKeyRange(start, end)), p.cli, http.MethodGet, nil)
if e != nil {
err = e
continue
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -103,7 +103,7 @@ require (
github.com/tdakkota/asciicheck v0.2.0
github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2
github.com/tikv/client-go/v2 v2.0.8-0.20231116051730-1c2351c28173
github.com/tikv/pd/client v0.0.0-20231121080541-8919bc11f770
github.com/tikv/pd/client v0.0.0-20231127075044-9f4803d8bd05
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966
github.com/twmb/murmur3 v1.1.6
github.com/uber/jaeger-client-go v2.22.1+incompatible
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -853,8 +853,8 @@ github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW
github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM=
github.com/tikv/client-go/v2 v2.0.8-0.20231116051730-1c2351c28173 h1:lmJzX0kqrV7kO21wrZPbtjkidzwbDCfXeQrhDWEi5dE=
github.com/tikv/client-go/v2 v2.0.8-0.20231116051730-1c2351c28173/go.mod h1:BOGTSZtbMHEnGC4HOpbONdnTQF+E9nb2Io7c3P9sb7g=
github.com/tikv/pd/client v0.0.0-20231121080541-8919bc11f770 h1:YSXDKT9+KngRSAShoSQVKD/CK1kR4X/9hutKkSK9gn0=
github.com/tikv/pd/client v0.0.0-20231121080541-8919bc11f770/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ=
github.com/tikv/pd/client v0.0.0-20231127075044-9f4803d8bd05 h1:87NPUfzaVrO5MTBwVCPQ/FlJGpFnHi6WFYHDYD3n3Zc=
github.com/tikv/pd/client v0.0.0-20231127075044-9f4803d8bd05/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ=
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 h1:quvGphlmUVU+nhpFa4gg4yJyTRJ13reZMDHrKwYw53M=
github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966/go.mod h1:27bSVNWSBOHm+qRp1T9qzaIpsWEP6TbUnei/43HK+PQ=
github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs=
Expand Down
2 changes: 2 additions & 0 deletions pkg/ddl/placement/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ go_library(
"//pkg/tablecodec",
"//pkg/util/codec",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_tikv_pd_client//http",
"@in_gopkg_yaml_v2//:yaml_v2",
],
)
Expand Down Expand Up @@ -45,5 +46,6 @@ go_test(
"//pkg/util/codec",
"@com_github_pingcap_failpoint//:failpoint",
"@com_github_stretchr_testify//require",
"@com_github_tikv_pd_client//http",
],
)
86 changes: 39 additions & 47 deletions pkg/ddl/placement/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,21 +29,13 @@ import (
"github.com/pingcap/tidb/pkg/parser/model"
"github.com/pingcap/tidb/pkg/tablecodec"
"github.com/pingcap/tidb/pkg/util/codec"
pd "github.com/tikv/pd/client/http"
"gopkg.in/yaml.v2"
)

// Refer to https://github.com/tikv/pd/issues/2701 .
// IMO, it is indeed not bad to have a copy of definition.
// After all, placement rules are communicated using an HTTP API. Loose
// coupling is a good feature.

// Bundle is a group of all rules and configurations. It is used to support rule cache.
type Bundle struct {
ID string `json:"group_id"`
Index int `json:"group_index"`
Override bool `json:"group_override"`
Rules []*Rule `json:"rules"`
}
// Alias `pd.GroupBundle` is to wrap more methods.
type Bundle pd.GroupBundle

// NewBundle will create a bundle with the provided ID.
// Note that you should never pass negative id.
Expand All @@ -70,15 +62,15 @@ func NewBundleFromConstraintsOptions(options *model.PlacementSettings) (*Bundle,
explicitFollowerCount := options.Followers
explicitLearnerCount := options.Learners

rules := []*Rule{}
rules := []*pd.Rule{}
commonConstraints, err := NewConstraintsFromYaml([]byte(constraints))
if err != nil {
// If it's not in array format, attempt to parse it as a dictionary for more detailed definitions.
// The dictionary format specifies details for each replica. Constraints are used to define normal
// replicas that should act as voters.
// For example: CONSTRAINTS='{ "+region=us-east-1":2, "+region=us-east-2": 2, "+region=us-west-1": 1}'
normalReplicasRules, err := NewRuleBuilder().
SetRole(Voter).
SetRole(pd.Voter).
SetConstraintStr(constraints).
BuildRulesWithDictConstraintsOnly()
if err != nil {
Expand All @@ -92,7 +84,7 @@ func NewBundleFromConstraintsOptions(options *model.PlacementSettings) (*Bundle,
return nil, fmt.Errorf("%w: 'LeaderConstraints' should be [constraint1, ...] or any yaml compatible array representation", err)
}
for _, cnst := range commonConstraints {
if err := leaderConstraints.Add(cnst); err != nil {
if err := AddConstraint(&leaderConstraints, cnst); err != nil {
return nil, fmt.Errorf("%w: LeaderConstraints conflicts with Constraints", err)
}
}
Expand All @@ -101,7 +93,7 @@ func NewBundleFromConstraintsOptions(options *model.PlacementSettings) (*Bundle,
followerReplicas = explicitFollowerCount
}
if !needCreateDefault {
if len(leaderConstraints) == 0 {
if len(leaderConst) == 0 {
leaderReplicas = 0
}
if len(followerConstraints) == 0 {
Expand All @@ -115,15 +107,15 @@ func NewBundleFromConstraintsOptions(options *model.PlacementSettings) (*Bundle,
// create leader rule.
// if no constraints, we need create default leader rule.
if leaderReplicas > 0 {
leaderRule := NewRule(Leader, leaderReplicas, leaderConstraints)
leaderRule := NewRule(pd.Leader, leaderReplicas, leaderConstraints)
rules = append(rules, leaderRule)
}

// create follower rules.
// if no constraints, we need create default follower rules.
if followerReplicas > 0 {
builder := NewRuleBuilder().
SetRole(Voter).
SetRole(pd.Voter).
SetReplicasNum(followerReplicas).
SetSkipCheckReplicasConsistent(needCreateDefault && (explicitFollowerCount == 0)).
SetConstraintStr(followerConstraints)
Expand All @@ -133,7 +125,7 @@ func NewBundleFromConstraintsOptions(options *model.PlacementSettings) (*Bundle,
}
for _, followerRule := range followerRules {
for _, cnst := range commonConstraints {
if err := followerRule.Constraints.Add(cnst); err != nil {
if err := AddConstraint(&followerRule.LabelConstraints, cnst); err != nil {
return nil, fmt.Errorf("%w: FollowerConstraints conflicts with Constraints", err)
}
}
Expand All @@ -143,7 +135,7 @@ func NewBundleFromConstraintsOptions(options *model.PlacementSettings) (*Bundle,

// create learner rules.
builder := NewRuleBuilder().
SetRole(Learner).
SetRole(pd.Learner).
SetReplicasNum(explicitLearnerCount).
SetConstraintStr(learnerConstraints)
learnerRules, err := builder.BuildRules()
Expand All @@ -152,7 +144,7 @@ func NewBundleFromConstraintsOptions(options *model.PlacementSettings) (*Bundle,
}
for _, rule := range learnerRules {
for _, cnst := range commonConstraints {
if err := rule.Constraints.Add(cnst); err != nil {
if err := AddConstraint(&rule.LabelConstraints, cnst); err != nil {
return nil, fmt.Errorf("%w: LearnerConstraints conflicts with Constraints", err)
}
}
Expand Down Expand Up @@ -194,7 +186,7 @@ func NewBundleFromSugarOptions(options *model.PlacementSettings) (*Bundle, error
}
schedule := options.Schedule

var rules []*Rule
var rules []*pd.Rule

locationLabels, err := newLocationLabelsFromSurvivalPreferences(options.SurvivalPreferences)
if err != nil {
Expand All @@ -203,7 +195,7 @@ func NewBundleFromSugarOptions(options *model.PlacementSettings) (*Bundle, error

// in case empty primaryRegion and regions, just return an empty bundle
if primaryRegion == "" && len(regions) == 0 {
rules = append(rules, NewRule(Voter, followers+1, NewConstraintsDirect()))
rules = append(rules, NewRule(pd.Voter, followers+1, NewConstraintsDirect()))
for _, rule := range rules {
rule.LocationLabels = locationLabels
}
Expand All @@ -230,17 +222,17 @@ func NewBundleFromSugarOptions(options *model.PlacementSettings) (*Bundle, error
return nil, fmt.Errorf("%w: unsupported schedule %s", ErrInvalidPlacementOptions, schedule)
}

rules = append(rules, NewRule(Leader, 1, NewConstraintsDirect(NewConstraintDirect("region", In, primaryRegion))))
rules = append(rules, NewRule(pd.Leader, 1, NewConstraintsDirect(NewConstraintDirect("region", pd.In, primaryRegion))))
if primaryCount > 1 {
rules = append(rules, NewRule(Voter, primaryCount-1, NewConstraintsDirect(NewConstraintDirect("region", In, primaryRegion))))
rules = append(rules, NewRule(pd.Voter, primaryCount-1, NewConstraintsDirect(NewConstraintDirect("region", pd.In, primaryRegion))))
}
if cnt := followers + 1 - primaryCount; cnt > 0 {
// delete primary from regions
regions = regions[:primaryIndex+copy(regions[primaryIndex:], regions[primaryIndex+1:])]
if len(regions) > 0 {
rules = append(rules, NewRule(Voter, cnt, NewConstraintsDirect(NewConstraintDirect("region", In, regions...))))
rules = append(rules, NewRule(pd.Voter, cnt, NewConstraintsDirect(NewConstraintDirect("region", pd.In, regions...))))
} else {
rules = append(rules, NewRule(Voter, cnt, NewConstraintsDirect()))
rules = append(rules, NewRule(pd.Voter, cnt, NewConstraintsDirect()))
}
}

Expand Down Expand Up @@ -332,8 +324,8 @@ func (b *Bundle) Tidy() error {
// refer to tidb#22065.
// add -engine=tiflash to every rule to avoid schedules to tiflash instances.
// placement rules in SQL is not compatible with `set tiflash replica` yet
err := rule.Constraints.Add(Constraint{
Op: NotIn,
err := AddConstraint(&rule.LabelConstraints, pd.LabelConstraint{
Op: pd.NotIn,
Key: EngineLabelKey,
Values: []string{EngineLabelTiFlash},
})
Expand All @@ -348,10 +340,10 @@ func (b *Bundle) Tidy() error {
groups := make(map[string]*constraintsGroup)
finalRules := tempRules[:0]
for _, rule := range tempRules {
key := rule.Constraints.FingerPrint()
key := ConstraintsFingerPrint(&rule.LabelConstraints)
existing, ok := groups[key]
if !ok {
groups[key] = &constraintsGroup{rules: []*Rule{rule}}
groups[key] = &constraintsGroup{rules: []*pd.Rule{rule}}
continue
}
existing.rules = append(existing.rules, rule)
Expand All @@ -375,7 +367,7 @@ func (b *Bundle) Tidy() error {

// constraintsGroup is a group of rules with the same constraints.
type constraintsGroup struct {
rules []*Rule
rules []*pd.Rule
// canBecameLeader means the group has leader/voter role,
// it's valid if it has leader.
canBecameLeader bool
Expand Down Expand Up @@ -411,16 +403,16 @@ func transformableLeaderConstraint(groups map[string]*constraintsGroup) error {
// MergeRulesByRole merges the rules with the same role.
func (c *constraintsGroup) MergeRulesByRole() {
// Create a map to store rules by role
rulesByRole := make(map[PeerRoleType][]*Rule)
rulesByRole := make(map[pd.PeerRoleType][]*pd.Rule)

// Iterate through each rule
for _, rule := range c.rules {
// Add the rule to the map based on its role
rulesByRole[rule.Role] = append(rulesByRole[rule.Role], rule)
if rule.Role == Leader || rule.Role == Voter {
if rule.Role == pd.Leader || rule.Role == pd.Voter {
c.canBecameLeader = true
}
if rule.Role == Leader {
if rule.Role == pd.Leader {
c.isLeaderGroup = true
}
}
Expand Down Expand Up @@ -449,11 +441,11 @@ func (c *constraintsGroup) MergeTransformableRoles() {
if len(c.rules) == 0 || len(c.rules) == 1 {
return
}
var mergedRule *Rule
newRules := make([]*Rule, 0, len(c.rules))
var mergedRule *pd.Rule
newRules := make([]*pd.Rule, 0, len(c.rules))
for _, rule := range c.rules {
// Learner is not transformable, it should be promote by PD.
if rule.Role == Learner {
if rule.Role == pd.Learner {
newRules = append(newRules, rule)
continue
}
Expand All @@ -467,7 +459,7 @@ func (c *constraintsGroup) MergeTransformableRoles() {
}
}
if mergedRule != nil {
mergedRule.Role = Voter
mergedRule.Role = pd.Voter
newRules = append(newRules, mergedRule)
}
c.rules = newRules
Expand All @@ -491,7 +483,7 @@ func (b *Bundle) RebuildForRange(rangeName string, policyName string) *Bundle {
}

b.Override = true
newRules := make([]*Rule, 0, len(rule))
newRules := make([]*pd.Rule, 0, len(rule))
for i, r := range b.Rules {
cp := r.Clone()
cp.ID = fmt.Sprintf("%s_rule_%d", strings.ToLower(policyName), i)
Expand All @@ -508,7 +500,7 @@ func (b *Bundle) RebuildForRange(rangeName string, policyName string) *Bundle {
// Reset resets the bundle ID and keyrange of all rules.
func (b *Bundle) Reset(ruleIndex int, newIDs []int64) *Bundle {
// eliminate the redundant rules.
var basicRules []*Rule
var basicRules []*pd.Rule
if len(b.Rules) != 0 {
// Make priority for rules with RuleIndexTable cause of duplication rules existence with RuleIndexPartition.
// If RuleIndexTable doesn't exist, bundle itself is a independent series of rules for a partition.
Expand All @@ -526,7 +518,7 @@ func (b *Bundle) Reset(ruleIndex int, newIDs []int64) *Bundle {
b.ID = GroupID(newIDs[0])
b.Index = ruleIndex
b.Override = true
newRules := make([]*Rule, 0, len(basicRules)*len(newIDs))
newRules := make([]*pd.Rule, 0, len(basicRules)*len(newIDs))
for i, newID := range newIDs {
// rule.id should be distinguished with each other, otherwise it will be de-duplicated in pd http api.
var ruleID string
Expand Down Expand Up @@ -566,7 +558,7 @@ func (b *Bundle) Clone() *Bundle {
newBundle := &Bundle{}
*newBundle = *b
if len(b.Rules) > 0 {
newBundle.Rules = make([]*Rule, 0, len(b.Rules))
newBundle.Rules = make([]*pd.Rule, 0, len(b.Rules))
for i := range b.Rules {
newBundle.Rules = append(newBundle.Rules, b.Rules[i].Clone())
}
Expand Down Expand Up @@ -595,10 +587,10 @@ func (b *Bundle) ObjectID() (int64, error) {
return id, nil
}

func isValidLeaderRule(rule *Rule, dcLabelKey string) bool {
if rule.Role == Leader && rule.Count == 1 {
for _, con := range rule.Constraints {
if con.Op == In && con.Key == dcLabelKey && len(con.Values) == 1 {
func isValidLeaderRule(rule *pd.Rule, dcLabelKey string) bool {
if rule.Role == pd.Leader && rule.Count == 1 {
for _, con := range rule.LabelConstraints {
if con.Op == pd.In && con.Key == dcLabelKey && len(con.Values) == 1 {
return true
}
}
Expand All @@ -610,7 +602,7 @@ func isValidLeaderRule(rule *Rule, dcLabelKey string) bool {
func (b *Bundle) GetLeaderDC(dcLabelKey string) (string, bool) {
for _, rule := range b.Rules {
if isValidLeaderRule(rule, dcLabelKey) {
return rule.Constraints[0].Values[0], true
return rule.LabelConstraints[0].Values[0], true
}
}
return "", false
Expand Down
Loading

0 comments on commit c82e19c

Please sign in to comment.