Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client/http: implement more rule and batch related interfaces #7430

Merged
merged 5 commits into from
Nov 27, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 22 additions & 13 deletions client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,19 +23,20 @@ import (
// The following constants are the paths of PD HTTP APIs.
const (
// Metadata
HotRead = "/pd/api/v1/hotspot/regions/read"
HotWrite = "/pd/api/v1/hotspot/regions/write"
HotHistory = "/pd/api/v1/hotspot/regions/history"
RegionByIDPrefix = "/pd/api/v1/region/id"
regionByKey = "/pd/api/v1/region/key"
Regions = "/pd/api/v1/regions"
regionsByKey = "/pd/api/v1/regions/key"
RegionsByStoreIDPrefix = "/pd/api/v1/regions/store"
EmptyRegions = "/pd/api/v1/regions/check/empty-region"
AccelerateSchedule = "/pd/api/v1/regions/accelerate-schedule"
store = "/pd/api/v1/store"
Stores = "/pd/api/v1/stores"
StatsRegion = "/pd/api/v1/stats/region"
HotRead = "/pd/api/v1/hotspot/regions/read"
HotWrite = "/pd/api/v1/hotspot/regions/write"
HotHistory = "/pd/api/v1/hotspot/regions/history"
RegionByIDPrefix = "/pd/api/v1/region/id"
regionByKey = "/pd/api/v1/region/key"
Regions = "/pd/api/v1/regions"
regionsByKey = "/pd/api/v1/regions/key"
RegionsByStoreIDPrefix = "/pd/api/v1/regions/store"
EmptyRegions = "/pd/api/v1/regions/check/empty-region"
AccelerateSchedule = "/pd/api/v1/regions/accelerate-schedule"
AccelerateScheduleInBatch = "/pd/api/v1/regions/accelerate-schedule/batch"
store = "/pd/api/v1/store"
Stores = "/pd/api/v1/stores"
StatsRegion = "/pd/api/v1/stats/region"
// Config
Config = "/pd/api/v1/config"
ClusterVersion = "/pd/api/v1/config/cluster-version"
Expand All @@ -44,8 +45,11 @@ const (
// Rule
PlacementRule = "/pd/api/v1/config/rule"
PlacementRules = "/pd/api/v1/config/rules"
PlacementRulesInBatch = "/pd/api/v1/config/rules/batch"
placementRulesByGroup = "/pd/api/v1/config/rules/group"
PlacementRuleBundle = "/pd/api/v1/config/placement-rule"
placementRuleGroup = "/pd/api/v1/config/rule_group"
placementRuleGroups = "/pd/api/v1/config/rule_groups"
RegionLabelRule = "/pd/api/v1/config/region-label/rule"
RegionLabelRules = "/pd/api/v1/config/region-label/rules"
RegionLabelRulesByIDs = "/pd/api/v1/config/region-label/rules/ids"
Expand Down Expand Up @@ -136,6 +140,11 @@ func PlacementRuleBundleWithPartialParameter(partial bool) string {
return fmt.Sprintf("%s?partial=%t", PlacementRuleBundle, partial)
}

// PlacementRuleGroupByID returns the path of PD HTTP API to get placement rule group by ID.
func PlacementRuleGroupByID(id string) string {
return fmt.Sprintf("%s/%s", placementRuleGroup, id)
}

// SchedulerByName returns the scheduler API with the given scheduler name.
func SchedulerByName(name string) string {
return fmt.Sprintf("%s/%s", Schedulers, name)
Expand Down
88 changes: 86 additions & 2 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"bytes"
"context"
"crypto/tls"
"encoding/hex"
"encoding/json"
"fmt"
"io"
Expand Down Expand Up @@ -57,14 +58,23 @@ type Client interface {
GetPlacementRuleBundleByGroup(context.Context, string) (*GroupBundle, error)
GetPlacementRulesByGroup(context.Context, string) ([]*Rule, error)
SetPlacementRule(context.Context, *Rule) error
SetPlacementRuleInBatch(context.Context, []*RuleOp) error
SetPlacementRuleBundles(context.Context, []*GroupBundle, bool) error
DeletePlacementRule(context.Context, string, string) error
GetAllPlacementRuleGroups(context.Context) ([]*RuleGroup, error)
GetPlacementRuleGroupByID(context.Context, string) (*RuleGroup, error)
SetPlacementRuleGroup(context.Context, *RuleGroup) error
DeletePlacementRuleGroupByID(context.Context, string) error
GetAllRegionLabelRules(context.Context) ([]*LabelRule, error)
GetRegionLabelRulesByIDs(context.Context, []string) ([]*LabelRule, error)
SetRegionLabelRule(context.Context, *LabelRule) error
PatchRegionLabelRules(context.Context, *LabelRulePatch) error
/* Scheduling-related interfaces */
AccelerateSchedule(context.Context, []byte, []byte) error
AccelerateScheduleInBatch(context.Context, []struct {
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
StartKey []byte `json:"start_key"`
EndKey []byte `json:"end_key"`
}) error
/* Other interfaces */
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)

Expand Down Expand Up @@ -427,6 +437,17 @@ func (c *client) SetPlacementRule(ctx context.Context, rule *Rule) error {
http.MethodPost, bytes.NewBuffer(ruleJSON), nil)
}

// SetPlacementRuleInBatch sets the placement rules in batch.
func (c *client) SetPlacementRuleInBatch(ctx context.Context, ruleOps []*RuleOp) error {
ruleOpsJSON, err := json.Marshal(ruleOps)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"SetPlacementRuleInBatch", PlacementRulesInBatch,
http.MethodPost, bytes.NewBuffer(ruleOpsJSON), nil)
}

// SetPlacementRuleBundles sets the placement rule bundles.
// If `partial` is false, all old configurations will be over-written and dropped.
func (c *client) SetPlacementRuleBundles(ctx context.Context, bundles []*GroupBundle, partial bool) error {
Expand All @@ -446,6 +467,48 @@ func (c *client) DeletePlacementRule(ctx context.Context, group, id string) erro
http.MethodDelete, http.NoBody, nil)
}

// GetAllPlacementRuleGroups gets all placement rule groups.
func (c *client) GetAllPlacementRuleGroups(ctx context.Context) ([]*RuleGroup, error) {
var ruleGroups []*RuleGroup
err := c.requestWithRetry(ctx,
"GetAllPlacementRuleGroups", placementRuleGroups,
http.MethodGet, http.NoBody, &ruleGroups)
if err != nil {
return nil, err
}
return ruleGroups, nil
}

// GetPlacementRuleGroupByID gets the placement rule group by ID.
func (c *client) GetPlacementRuleGroupByID(ctx context.Context, id string) (*RuleGroup, error) {
var ruleGroup RuleGroup
err := c.requestWithRetry(ctx,
"GetPlacementRuleGroupByID", PlacementRuleGroupByID(id),
http.MethodGet, http.NoBody, &ruleGroup)
if err != nil {
return nil, err
}
return &ruleGroup, nil
}

// SetPlacementRuleGroup sets the placement rule group.
func (c *client) SetPlacementRuleGroup(ctx context.Context, ruleGroup *RuleGroup) error {
ruleGroupJSON, err := json.Marshal(ruleGroup)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"SetPlacementRuleGroup", placementRuleGroup,
http.MethodPost, bytes.NewBuffer(ruleGroupJSON), nil)
}

// DeletePlacementRuleGroupByID deletes the placement rule group by ID.
func (c *client) DeletePlacementRuleGroupByID(ctx context.Context, id string) error {
return c.requestWithRetry(ctx,
"DeletePlacementRuleGroupByID", PlacementRuleGroupByID(id),
http.MethodDelete, http.NoBody, nil)
}

// GetAllRegionLabelRules gets all region label rules.
func (c *client) GetAllRegionLabelRules(ctx context.Context) ([]*LabelRule, error) {
var labelRules []*LabelRule
Expand Down Expand Up @@ -499,8 +562,8 @@ func (c *client) PatchRegionLabelRules(ctx context.Context, labelRulePatch *Labe
// AccelerateSchedule accelerates the scheduling of the regions within the given key range.
func (c *client) AccelerateSchedule(ctx context.Context, startKey, endKey []byte) error {
input := map[string]string{
"start_key": url.QueryEscape(string(startKey)),
"end_key": url.QueryEscape(string(endKey)),
"start_key": url.QueryEscape(hex.EncodeToString(startKey)),
"end_key": url.QueryEscape(hex.EncodeToString(endKey)),
}
inputJSON, err := json.Marshal(input)
if err != nil {
Expand All @@ -511,6 +574,27 @@ func (c *client) AccelerateSchedule(ctx context.Context, startKey, endKey []byte
http.MethodPost, bytes.NewBuffer(inputJSON), nil)
}

// AccelerateScheduleInBatch accelerates the scheduling of the regions within the given key ranges in batch.
func (c *client) AccelerateScheduleInBatch(ctx context.Context, ranges []struct {
StartKey []byte `json:"start_key"`
EndKey []byte `json:"end_key"`
}) error {
input := make([]map[string]string, 0, len(ranges))
for _, r := range ranges {
input = append(input, map[string]string{
"start_key": url.QueryEscape(hex.EncodeToString(r.StartKey)),
"end_key": url.QueryEscape(hex.EncodeToString(r.EndKey)),
})
}
inputJSON, err := json.Marshal(input)
if err != nil {
return errors.Trace(err)
}
return c.requestWithRetry(ctx,
"AccelerateScheduleInBatch", AccelerateScheduleInBatch,
http.MethodPost, bytes.NewBuffer(inputJSON), nil)
}

// GetMinResolvedTSByStoresIDs get min-resolved-ts by stores IDs.
func (c *client) GetMinResolvedTSByStoresIDs(ctx context.Context, storeIDs []uint64) (uint64, map[uint64]uint64, error) {
uri := MinResolvedTSPrefix
Expand Down
55 changes: 54 additions & 1 deletion client/http/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,10 @@

package http

import "time"
import (
"encoding/json"
"time"
)

// NOTICE: the structures below are copied from the PD API definitions.
// Please make sure the consistency if any change happens to the PD API.
Expand Down Expand Up @@ -247,6 +250,56 @@ type Rule struct {
CreateTimestamp uint64 `json:"create_timestamp,omitempty"` // only set at runtime, recorded rule create timestamp
}

// String returns the string representation of this rule.
func (r *Rule) String() string {
b, _ := json.Marshal(r)
return string(b)
}

// Clone returns a copy of Rule.
func (r *Rule) Clone() *Rule {
var clone Rule
json.Unmarshal([]byte(r.String()), &clone)
clone.StartKey = append(r.StartKey[:0:0], r.StartKey...)
clone.EndKey = append(r.EndKey[:0:0], r.EndKey...)
return &clone
}

// RuleOpType indicates the operation type
type RuleOpType string

const (
// RuleOpAdd a placement rule, only need to specify the field *Rule
RuleOpAdd RuleOpType = "add"
// RuleOpDel a placement rule, only need to specify the field `GroupID`, `ID`, `MatchID`
RuleOpDel RuleOpType = "del"
)

// RuleOp is for batching placement rule actions.
// The action type is distinguished by the field `Action`.
type RuleOp struct {
*Rule // information of the placement rule to add/delete the operation type
Action RuleOpType `json:"action"`
DeleteByIDPrefix bool `json:"delete_by_id_prefix"` // if action == delete, delete by the prefix of id
}

func (r RuleOp) String() string {
b, _ := json.Marshal(r)
return string(b)
}

// RuleGroup defines properties of a rule group.
type RuleGroup struct {
ID string `json:"id,omitempty"`
Index int `json:"index,omitempty"`
Override bool `json:"override,omitempty"`
}

func (g *RuleGroup) String() string {
b, _ := json.Marshal(g)
return string(b)
}

// GroupBundle represents a rule group and all rules belong to the group.
type GroupBundle struct {
ID string `json:"group_id"`
Expand Down
5 changes: 5 additions & 0 deletions pkg/schedule/checker/checker_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,11 @@ func (c *Controller) ClearSuspectKeyRanges() {
c.suspectKeyRanges.Clear()
}

// ClearSuspectRegions clears the suspect regions, only for unit test
func (c *Controller) ClearSuspectRegions() {
c.suspectRegions.Clear()
}

// IsPendingRegion returns true if the given region is in the pending list.
func (c *Controller) IsPendingRegion(regionID uint64) bool {
_, exist := c.ruleChecker.pendingList.Get(regionID)
Expand Down
5 changes: 5 additions & 0 deletions pkg/utils/tsoutil/tsoutil.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,11 @@ const (
logicalBits = (1 << physicalShiftBits) - 1
)

// TimeToTS converts a `time.Time` to an `uint64` TS.
func TimeToTS(t time.Time) uint64 {
return ComposeTS(t.UnixNano()/int64(time.Millisecond), 0)
}

// ParseTS parses the ts to (physical,logical).
func ParseTS(ts uint64) (time.Time, uint64) {
physical, logical := ParseTSUint64(ts)
Expand Down
6 changes: 4 additions & 2 deletions server/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -2241,7 +2241,9 @@ func (c *RaftCluster) SetMinResolvedTS(storeID, minResolvedTS uint64) error {
return nil
}

func (c *RaftCluster) checkAndUpdateMinResolvedTS() (uint64, bool) {
// CheckAndUpdateMinResolvedTS checks and updates the min resolved ts of the cluster.
// This is exported for testing purpose.
func (c *RaftCluster) CheckAndUpdateMinResolvedTS() (uint64, bool) {
c.Lock()
defer c.Unlock()

Expand Down Expand Up @@ -2284,7 +2286,7 @@ func (c *RaftCluster) runMinResolvedTSJob() {
case <-ticker.C:
interval = c.opt.GetMinResolvedTSPersistenceInterval()
if interval != 0 {
if current, needPersist := c.checkAndUpdateMinResolvedTS(); needPersist {
if current, needPersist := c.CheckAndUpdateMinResolvedTS(); needPersist {
c.storage.SaveMinResolvedTS(current)
}
} else {
Expand Down
7 changes: 7 additions & 0 deletions server/cluster/scheduling_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -438,6 +438,13 @@ func (sc *schedulingController) ClearSuspectKeyRanges() {
sc.coordinator.GetCheckerController().ClearSuspectKeyRanges()
}

// ClearSuspectRegions clears the suspect regions, only for unit test
func (sc *schedulingController) ClearSuspectRegions() {
sc.mu.RLock()
defer sc.mu.RUnlock()
sc.coordinator.GetCheckerController().ClearSuspectRegions()
}

// AddSuspectKeyRange adds the key range with the its ruleID as the key
// The instance of each keyRange is like following format:
// [2][]byte: start key/end key
Expand Down
Loading
Loading