Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client/http: implement the marshaler interfaces for Rule/RuleOp #7462

Merged
merged 6 commits into from
Nov 30, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 13 additions & 1 deletion client/http/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ const (
Regions = "/pd/api/v1/regions"
regionsByKey = "/pd/api/v1/regions/key"
RegionsByStoreIDPrefix = "/pd/api/v1/regions/store"
regionsReplicated = "/pd/api/v1/regions/replicated"
EmptyRegions = "/pd/api/v1/regions/check/empty-region"
AccelerateSchedule = "/pd/api/v1/regions/accelerate-schedule"
AccelerateScheduleInBatch = "/pd/api/v1/regions/accelerate-schedule/batch"
Expand Down Expand Up @@ -95,9 +96,20 @@ func RegionsByStoreID(storeID uint64) string {
return fmt.Sprintf("%s/%d", RegionsByStoreIDPrefix, storeID)
}

// RegionsReplicatedByKeyRange returns the path of PD HTTP API to get replicated regions with given start key and end key.
func RegionsReplicatedByKeyRange(keyRange *KeyRange) string {
startKeyStr, endKeyStr := keyRange.EscapeAsHexStr()
return fmt.Sprintf("%s?startKey=%s&endKey=%s",
regionsReplicated, startKeyStr, endKeyStr)
}

// RegionStatsByKeyRange returns the path of PD HTTP API to get region stats by start key and end key.
func RegionStatsByKeyRange(keyRange *KeyRange) string {
func RegionStatsByKeyRange(keyRange *KeyRange, onlyCount bool) string {
startKeyStr, endKeyStr := keyRange.EscapeAsUTF8Str()
if onlyCount {
return fmt.Sprintf("%s?start_key=%s&end_key=%s&count",
StatsRegion, startKeyStr, endKeyStr)
}
return fmt.Sprintf("%s?start_key=%s&end_key=%s",
StatsRegion, startKeyStr, endKeyStr)
}
Expand Down
21 changes: 18 additions & 3 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,10 +47,11 @@ type Client interface {
GetRegions(context.Context) (*RegionsInfo, error)
GetRegionsByKeyRange(context.Context, *KeyRange, int) (*RegionsInfo, error)
GetRegionsByStoreID(context.Context, uint64) (*RegionsInfo, error)
GetRegionsReplicatedStateByKeyRange(context.Context, *KeyRange) (string, error)
GetHotReadRegions(context.Context) (*StoreHotPeersInfos, error)
GetHotWriteRegions(context.Context) (*StoreHotPeersInfos, error)
GetHistoryHotRegions(context.Context, *HistoryHotRegionsRequest) (*HistoryHotRegions, error)
GetRegionStatusByKeyRange(context.Context, *KeyRange) (*RegionStats, error)
GetRegionStatusByKeyRange(context.Context, *KeyRange, bool) (*RegionStats, error)
GetStores(context.Context) (*StoresInfo, error)
/* Config-related interfaces */
GetScheduleConfig(context.Context) (map[string]interface{}, error)
Expand Down Expand Up @@ -356,6 +357,19 @@ func (c *client) GetRegionsByStoreID(ctx context.Context, storeID uint64) (*Regi
return &regions, nil
}

// GetRegionsReplicatedStateByKeyRange gets the regions replicated state info by key range.
// The keys in the key range should be encoded in the hex bytes format (without encoding to the UTF-8 bytes).
func (c *client) GetRegionsReplicatedStateByKeyRange(ctx context.Context, keyRange *KeyRange) (string, error) {
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
var state string
err := c.requestWithRetry(ctx,
"GetRegionsReplicatedStateByKeyRange", RegionsReplicatedByKeyRange(keyRange),
http.MethodGet, http.NoBody, &state)
if err != nil {
return "", err
}
return state, nil
}

// GetHotReadRegions gets the hot read region statistics info.
func (c *client) GetHotReadRegions(ctx context.Context) (*StoreHotPeersInfos, error) {
var hotReadRegions StoreHotPeersInfos
Expand Down Expand Up @@ -398,11 +412,12 @@ func (c *client) GetHistoryHotRegions(ctx context.Context, req *HistoryHotRegion
}

// GetRegionStatusByKeyRange gets the region status by key range.
// If the `onlyCount` flag is true, the result will only include the count of regions.
// The keys in the key range should be encoded in the UTF-8 bytes format.
func (c *client) GetRegionStatusByKeyRange(ctx context.Context, keyRange *KeyRange) (*RegionStats, error) {
func (c *client) GetRegionStatusByKeyRange(ctx context.Context, keyRange *KeyRange, onlyCount bool) (*RegionStats, error) {
JmPotato marked this conversation as resolved.
Show resolved Hide resolved
var regionStats RegionStats
err := c.requestWithRetry(ctx,
"GetRegionStatusByKeyRange", RegionStatsByKeyRange(keyRange),
"GetRegionStatusByKeyRange", RegionStatsByKeyRange(keyRange, onlyCount),
http.MethodGet, http.NoBody, &regionStats,
)
if err != nil {
Expand Down
121 changes: 121 additions & 0 deletions client/http/codec.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package http

import (
"encoding/hex"

"github.com/pingcap/errors"
)

const (
encGroupSize = 8
encMarker = byte(0xFF)
encPad = byte(0x0)
)

var pads = make([]byte, encGroupSize)

// encodeBytes guarantees the encoded value is in ascending order for comparison,
// encoding with the following rule:
//
// [group1][marker1]...[groupN][markerN]
// group is 8 bytes slice which is padding with 0.
// marker is `0xFF - padding 0 count`
//
// For example:
//
// [] -> [0, 0, 0, 0, 0, 0, 0, 0, 247]
// [1, 2, 3] -> [1, 2, 3, 0, 0, 0, 0, 0, 250]
// [1, 2, 3, 0] -> [1, 2, 3, 0, 0, 0, 0, 0, 251]
// [1, 2, 3, 4, 5, 6, 7, 8] -> [1, 2, 3, 4, 5, 6, 7, 8, 255, 0, 0, 0, 0, 0, 0, 0, 0, 247]
//
// Refer: https://github.com/facebook/mysql-5.6/wiki/MyRocks-record-format#memcomparable-format
func encodeBytes(data []byte) []byte {
// Allocate more space to avoid unnecessary slice growing.
// Assume that the byte slice size is about `(len(data) / encGroupSize + 1) * (encGroupSize + 1)` bytes,
// that is `(len(data) / 8 + 1) * 9` in our implement.
dLen := len(data)
result := make([]byte, 0, (dLen/encGroupSize+1)*(encGroupSize+1))
for idx := 0; idx <= dLen; idx += encGroupSize {
remain := dLen - idx
padCount := 0
if remain >= encGroupSize {
result = append(result, data[idx:idx+encGroupSize]...)
} else {
padCount = encGroupSize - remain
result = append(result, data[idx:]...)
result = append(result, pads[:padCount]...)
}

marker := encMarker - byte(padCount)
result = append(result, marker)
}
return result
}

func decodeBytes(b []byte) ([]byte, error) {
buf := make([]byte, 0, len(b))
for {
if len(b) < encGroupSize+1 {
return nil, errors.New("insufficient bytes to decode value")
}

groupBytes := b[:encGroupSize+1]

group := groupBytes[:encGroupSize]
marker := groupBytes[encGroupSize]

padCount := encMarker - marker
if padCount > encGroupSize {
return nil, errors.Errorf("invalid marker byte, group bytes %q", groupBytes)
}

realGroupSize := encGroupSize - padCount
buf = append(buf, group[:realGroupSize]...)
b = b[encGroupSize+1:]

if padCount != 0 {
// Check validity of padding bytes.
for _, v := range group[realGroupSize:] {
if v != encPad {
return nil, errors.Errorf("invalid padding byte, group bytes %q", groupBytes)
}
}
break
}
}
return buf, nil
}

// rawKeyToKeyHexStr converts a raw key to a hex string after encoding.
func rawKeyToKeyHexStr(key []byte) string {
if len(key) == 0 {
return ""
}
return hex.EncodeToString(encodeBytes(key))
}

// keyHexStrToRawKey converts a hex string to a raw key after decoding.
func keyHexStrToRawKey(hexKey string) ([]byte, error) {
if len(hexKey) == 0 {
return make([]byte, 0), nil
}
key, err := hex.DecodeString(hexKey)
if err != nil {
return nil, err
}
return decodeBytes(key)
}
64 changes: 64 additions & 0 deletions client/http/codec_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,64 @@
// Copyright 2023 TiKV Project Authors.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package http

import (
"testing"

"github.com/stretchr/testify/require"
)

func TestBytesCodec(t *testing.T) {
inputs := []struct {
enc []byte
dec []byte
}{
{[]byte{}, []byte{0, 0, 0, 0, 0, 0, 0, 0, 247}},
{[]byte{0}, []byte{0, 0, 0, 0, 0, 0, 0, 0, 248}},
{[]byte{1, 2, 3}, []byte{1, 2, 3, 0, 0, 0, 0, 0, 250}},
{[]byte{1, 2, 3, 0}, []byte{1, 2, 3, 0, 0, 0, 0, 0, 251}},
{[]byte{1, 2, 3, 4, 5, 6, 7}, []byte{1, 2, 3, 4, 5, 6, 7, 0, 254}},
{[]byte{0, 0, 0, 0, 0, 0, 0, 0}, []byte{0, 0, 0, 0, 0, 0, 0, 0, 255, 0, 0, 0, 0, 0, 0, 0, 0, 247}},
{[]byte{1, 2, 3, 4, 5, 6, 7, 8}, []byte{1, 2, 3, 4, 5, 6, 7, 8, 255, 0, 0, 0, 0, 0, 0, 0, 0, 247}},
{[]byte{1, 2, 3, 4, 5, 6, 7, 8, 9}, []byte{1, 2, 3, 4, 5, 6, 7, 8, 255, 9, 0, 0, 0, 0, 0, 0, 0, 248}},
}

for _, input := range inputs {
b := encodeBytes(input.enc)
require.Equal(t, input.dec, b)

d, err := decodeBytes(b)
require.NoError(t, err)
require.Equal(t, input.enc, d)
}

// Test error decode.
errInputs := [][]byte{
{1, 2, 3, 4},
{0, 0, 0, 0, 0, 0, 0, 247},
{0, 0, 0, 0, 0, 0, 0, 0, 246},
{0, 0, 0, 0, 0, 0, 0, 1, 247},
{1, 2, 3, 4, 5, 6, 7, 8, 0},
{1, 2, 3, 4, 5, 6, 7, 8, 255, 1},
{1, 2, 3, 4, 5, 6, 7, 8, 255, 1, 2, 3, 4, 5, 6, 7, 8},
{1, 2, 3, 4, 5, 6, 7, 8, 255, 1, 2, 3, 4, 5, 6, 7, 8, 255},
{1, 2, 3, 4, 5, 6, 7, 8, 255, 1, 2, 3, 4, 5, 6, 7, 8, 0},
}

for _, input := range errInputs {
_, err := decodeBytes(input)
require.Error(t, err)
}
}
Loading
Loading