From 0f35b6f8133a4502ca6a93dfac358bab9a9d39a8 Mon Sep 17 00:00:00 2001 From: JmPotato Date: Wed, 15 Nov 2023 12:47:16 +0800 Subject: [PATCH] *: integrate PD HTTP client to the store helper (#48276) ref pingcap/tidb#35319 --- DEPS.bzl | 24 +- br/pkg/restore/BUILD.bazel | 2 +- br/pkg/restore/client.go | 4 +- dumpling/export/BUILD.bazel | 1 + dumpling/export/sql.go | 8 +- go.mod | 4 +- go.sum | 8 +- pkg/ddl/BUILD.bazel | 1 + pkg/ddl/ddl_tiflash_api.go | 10 +- pkg/domain/infosync/BUILD.bazel | 1 + pkg/domain/infosync/info.go | 22 +- pkg/domain/infosync/tiflash_manager.go | 47 ++- pkg/executor/BUILD.bazel | 2 + pkg/executor/infoschema_cluster_table_test.go | 26 +- pkg/executor/infoschema_reader.go | 66 ++-- pkg/executor/memtable_reader.go | 17 +- pkg/executor/show_placement.go | 4 +- pkg/executor/show_placement_labels_test.go | 6 +- pkg/executor/split.go | 2 +- pkg/executor/tikv_regions_peers_table_test.go | 26 +- .../test/clustertablestest/BUILD.bazel | 2 +- .../clustertablestest/cluster_tables_test.go | 10 +- pkg/server/handler/tests/BUILD.bazel | 1 - pkg/server/handler/tests/http_handler_test.go | 15 +- .../handler/tikvhandler/tikv_handler.go | 5 +- .../testserverclient/server_client.go | 7 +- pkg/store/driver/BUILD.bazel | 2 + pkg/store/driver/tikv_driver.go | 5 +- pkg/store/helper/BUILD.bazel | 4 +- pkg/store/helper/helper.go | 334 +++--------------- pkg/store/helper/helper_test.go | 105 +++--- pkg/store/mockstore/mockstore.go | 8 + pkg/store/mockstore/tikv.go | 5 +- pkg/store/mockstore/unistore.go | 5 +- pkg/util/pdapi/const.go | 3 - 35 files changed, 295 insertions(+), 497 deletions(-) diff --git a/DEPS.bzl b/DEPS.bzl index 792af269c2331..f47e6ddb17c2d 100644 --- a/DEPS.bzl +++ b/DEPS.bzl @@ -7119,26 +7119,26 @@ def go_deps(): name = "com_github_tikv_client_go_v2", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/client-go/v2", - sha256 = "b689432454a504f8ba1ad166ebf901584155edc64eed4119a30c07ab52e3af8f", - strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20231030120815-1362f1e87566", + sha256 = "285edca3320cc8847aceffb5d5471fe7483c49f66795622f71ed819c72635d00", + strip_prefix = "github.com/tikv/client-go/v2@v2.0.8-0.20231114060955-8fc8a528217e", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231030120815-1362f1e87566.zip", - "http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231030120815-1362f1e87566.zip", - "https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231030120815-1362f1e87566.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231030120815-1362f1e87566.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231114060955-8fc8a528217e.zip", + "http://ats.apps.svc/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231114060955-8fc8a528217e.zip", + "https://cache.hawkingrei.com/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231114060955-8fc8a528217e.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/client-go/v2/com_github_tikv_client_go_v2-v2.0.8-0.20231114060955-8fc8a528217e.zip", ], ) go_repository( name = "com_github_tikv_pd_client", build_file_proto_mode = "disable_global", importpath = "github.com/tikv/pd/client", - sha256 = 
"cadd6f9cf411690e66fd2e5bad6ef837dd8522d98ddaf79a6a2f9361cbc558c1", - strip_prefix = "github.com/tikv/pd/client@v0.0.0-20231113092444-be31c08186fa", + sha256 = "cb510944ce56555f005fff2d891af3fefa667f37955779b89c35fd40f51deace", + strip_prefix = "github.com/tikv/pd/client@v0.0.0-20231114041114-86831ce71865", urls = [ - "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231113092444-be31c08186fa.zip", - "http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231113092444-be31c08186fa.zip", - "https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231113092444-be31c08186fa.zip", - "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231113092444-be31c08186fa.zip", + "http://bazel-cache.pingcap.net:8080/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231114041114-86831ce71865.zip", + "http://ats.apps.svc/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231114041114-86831ce71865.zip", + "https://cache.hawkingrei.com/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231114041114-86831ce71865.zip", + "https://storage.googleapis.com/pingcapmirror/gomod/github.com/tikv/pd/client/com_github_tikv_pd_client-v0.0.0-20231114041114-86831ce71865.zip", ], ) go_repository( diff --git a/br/pkg/restore/BUILD.bazel b/br/pkg/restore/BUILD.bazel index f5522e40fe73a..02b2fb047063d 100644 --- a/br/pkg/restore/BUILD.bazel +++ b/br/pkg/restore/BUILD.bazel @@ -59,7 +59,6 @@ go_library( "//pkg/parser/mysql", "//pkg/sessionctx/variable", "//pkg/statistics/handle", - "//pkg/store/helper", "//pkg/store/pdtypes", "//pkg/tablecodec", "//pkg/util", @@ -92,6 +91,7 @@ go_library( "@com_github_tikv_client_go_v2//txnkv/rangetask", "@com_github_tikv_client_go_v2//util", "@com_github_tikv_pd_client//:client", + "@com_github_tikv_pd_client//http", "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//backoff", "@org_golang_google_grpc//codes", diff --git a/br/pkg/restore/client.go b/br/pkg/restore/client.go index bff70be345932..7f47605c3b7e5 100644 --- a/br/pkg/restore/client.go +++ b/br/pkg/restore/client.go @@ -54,7 +54,6 @@ import ( "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/statistics/handle" - "github.com/pingcap/tidb/pkg/store/helper" "github.com/pingcap/tidb/pkg/store/pdtypes" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/util/codec" @@ -64,6 +63,7 @@ import ( "github.com/tikv/client-go/v2/oracle" kvutil "github.com/tikv/client-go/v2/util" pd "github.com/tikv/pd/client" + pdhttp "github.com/tikv/pd/client/http" "go.uber.org/zap" "go.uber.org/zap/zapcore" "golang.org/x/sync/errgroup" @@ -1811,7 +1811,7 @@ func (rc *Client) GoWaitTiFlashReady(ctx context.Context, inCh <-chan *CreatedTa if err != nil { errCh <- err } - tiFlashStores := make(map[int64]helper.StoreStat) + tiFlashStores := make(map[int64]pdhttp.StoreInfo) for _, store := range tikvStats.Stores { for _, l := range store.Store.Labels { if l.Key == "engine" && l.Value == "tiflash" { diff --git a/dumpling/export/BUILD.bazel b/dumpling/export/BUILD.bazel index ca5e92fbdcdf1..e817a54df5ddc 100644 --- a/dumpling/export/BUILD.bazel +++ b/dumpling/export/BUILD.bazel @@ -59,6 +59,7 @@ go_library( "@com_github_soheilhy_cmux//:cmux", "@com_github_spf13_pflag//:pflag", "@com_github_tikv_pd_client//:client", + 
"@com_github_tikv_pd_client//http", "@io_etcd_go_etcd_client_v3//:client", "@org_golang_x_sync//errgroup", "@org_uber_go_atomic//:atomic", diff --git a/dumpling/export/sql.go b/dumpling/export/sql.go index 84355b797bccc..7292e86c28467 100644 --- a/dumpling/export/sql.go +++ b/dumpling/export/sql.go @@ -22,7 +22,7 @@ import ( dbconfig "github.com/pingcap/tidb/pkg/config" "github.com/pingcap/tidb/pkg/errno" "github.com/pingcap/tidb/pkg/parser/model" - "github.com/pingcap/tidb/pkg/store/helper" + pd "github.com/tikv/pd/client/http" "go.uber.org/multierr" "go.uber.org/zap" ) @@ -1465,19 +1465,19 @@ func GetDBInfo(db *sql.Conn, tables map[string]map[string]struct{}) ([]*model.DB // GetRegionInfos get region info including regionID, start key, end key from database sql interface. // start key, end key includes information to help split table -func GetRegionInfos(db *sql.Conn) (*helper.RegionsInfo, error) { +func GetRegionInfos(db *sql.Conn) (*pd.RegionsInfo, error) { const tableRegionSQL = "SELECT REGION_ID,START_KEY,END_KEY FROM INFORMATION_SCHEMA.TIKV_REGION_STATUS ORDER BY START_KEY;" var ( regionID int64 startKey, endKey string ) - regionsInfo := &helper.RegionsInfo{Regions: make([]helper.RegionInfo, 0)} + regionsInfo := &pd.RegionsInfo{Regions: make([]pd.RegionInfo, 0)} err := simpleQuery(db, tableRegionSQL, func(rows *sql.Rows) error { err := rows.Scan(®ionID, &startKey, &endKey) if err != nil { return errors.Trace(err) } - regionsInfo.Regions = append(regionsInfo.Regions, helper.RegionInfo{ + regionsInfo.Regions = append(regionsInfo.Regions, pd.RegionInfo{ ID: regionID, StartKey: startKey, EndKey: endKey, diff --git a/go.mod b/go.mod index 92d9e353fb609..1803e2034ed0c 100644 --- a/go.mod +++ b/go.mod @@ -102,8 +102,8 @@ require ( github.com/stretchr/testify v1.8.4 github.com/tdakkota/asciicheck v0.2.0 github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 - github.com/tikv/client-go/v2 v2.0.8-0.20231030120815-1362f1e87566 - github.com/tikv/pd/client v0.0.0-20231113092444-be31c08186fa + github.com/tikv/client-go/v2 v2.0.8-0.20231114060955-8fc8a528217e + github.com/tikv/pd/client v0.0.0-20231114041114-86831ce71865 github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 github.com/twmb/murmur3 v1.1.6 github.com/uber/jaeger-client-go v2.22.1+incompatible diff --git a/go.sum b/go.sum index 97d954210967d..d3c10b43c725d 100644 --- a/go.sum +++ b/go.sum @@ -991,10 +991,10 @@ github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2 h1:mbAskLJ0oJf github.com/tiancaiamao/appdash v0.0.0-20181126055449-889f96f722a2/go.mod h1:2PfKggNGDuadAa0LElHrByyrz4JPZ9fFx6Gs7nx7ZZU= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a h1:J/YdBZ46WKpXsxsW93SG+q0F8KI+yFrcIDT4c/RNoc4= github.com/tiancaiamao/gp v0.0.0-20221230034425-4025bc8a4d4a/go.mod h1:h4xBhSNtOeEosLJ4P7JyKXX7Cabg7AVkWCK5gV2vOrM= -github.com/tikv/client-go/v2 v2.0.8-0.20231030120815-1362f1e87566 h1:ULv8/h2S2daBtNDoovptSBC5fJEBKrx0K7E1K8iVOSw= -github.com/tikv/client-go/v2 v2.0.8-0.20231030120815-1362f1e87566/go.mod h1:XiEHwWZfJqgafxW/VEgi1ltGWB9yjwCJBs2kW1xHMY4= -github.com/tikv/pd/client v0.0.0-20231113092444-be31c08186fa h1:qgbTvsjSzU2A9ItK+NUSUHgtvDTeaWk1mGH2Kjbaf7s= -github.com/tikv/pd/client v0.0.0-20231113092444-be31c08186fa/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ= +github.com/tikv/client-go/v2 v2.0.8-0.20231114060955-8fc8a528217e h1:kl8+gDOfPfRqkc1VDhhjhezMvsbfRENYsm/FqSIDnwg= +github.com/tikv/client-go/v2 v2.0.8-0.20231114060955-8fc8a528217e/go.mod 
h1:fEAE7GS/lta+OasPOacdgy6RlJIRaq9/Cyr2WbSYcBE= +github.com/tikv/pd/client v0.0.0-20231114041114-86831ce71865 h1:Gkvo77EevOpBGIdV1c8gwRqPhVbgLPRy82tXNEFpGTc= +github.com/tikv/pd/client v0.0.0-20231114041114-86831ce71865/go.mod h1:cd6zBqRM9aogxf26K8NnFRPVtq9BnRE59tKEpX8IaWQ= github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966 h1:quvGphlmUVU+nhpFa4gg4yJyTRJ13reZMDHrKwYw53M= github.com/timakin/bodyclose v0.0.0-20230421092635-574207250966/go.mod h1:27bSVNWSBOHm+qRp1T9qzaIpsWEP6TbUnei/43HK+PQ= github.com/tklauser/go-sysconf v0.3.9/go.mod h1:11DU/5sG7UexIrp/O6g35hrWzu0JxlwQ3LSFUzyeuhs= diff --git a/pkg/ddl/BUILD.bazel b/pkg/ddl/BUILD.bazel index 5e907cc13c382..c244607ec0351 100644 --- a/pkg/ddl/BUILD.bazel +++ b/pkg/ddl/BUILD.bazel @@ -170,6 +170,7 @@ go_library( "@com_github_tikv_client_go_v2//tikvrpc", "@com_github_tikv_client_go_v2//txnkv/rangetask", "@com_github_tikv_client_go_v2//util", + "@com_github_tikv_pd_client//http", "@io_etcd_go_etcd_client_v3//:client", "@org_golang_x_sync//errgroup", "@org_uber_go_atomic//:atomic", diff --git a/pkg/ddl/ddl_tiflash_api.go b/pkg/ddl/ddl_tiflash_api.go index 93b1ddb38ecda..a16502ee04ae6 100644 --- a/pkg/ddl/ddl_tiflash_api.go +++ b/pkg/ddl/ddl_tiflash_api.go @@ -35,11 +35,11 @@ import ( "github.com/pingcap/tidb/pkg/infoschema" "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/sessionctx" - "github.com/pingcap/tidb/pkg/store/helper" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/intest" "github.com/pingcap/tidb/pkg/util/logutil" + pd "github.com/tikv/pd/client/http" atomicutil "go.uber.org/atomic" "go.uber.org/zap" ) @@ -111,7 +111,7 @@ func NewPollTiFlashBackoffContext(minThreshold, maxThreshold TiFlashTick, capaci // TiFlashManagementContext is the context for TiFlash Replica Management type TiFlashManagementContext struct { - TiFlashStores map[int64]helper.StoreStat + TiFlashStores map[int64]pd.StoreInfo PollCounter uint64 Backoff *PollTiFlashBackoffContext // tables waiting for updating progress after become available. @@ -206,7 +206,7 @@ func NewTiFlashManagementContext() (*TiFlashManagementContext, error) { } return &TiFlashManagementContext{ PollCounter: 0, - TiFlashStores: make(map[int64]helper.StoreStat), + TiFlashStores: make(map[int64]pd.StoreInfo), Backoff: c, UpdatingProgressTables: list.New(), }, nil @@ -293,7 +293,7 @@ func LoadTiFlashReplicaInfo(tblInfo *model.TableInfo, tableList *[]TiFlashReplic } // UpdateTiFlashHTTPAddress report TiFlash's StatusAddress's port to Pd's etcd. 
-func (d *ddl) UpdateTiFlashHTTPAddress(store *helper.StoreStat) error { +func (d *ddl) UpdateTiFlashHTTPAddress(store *pd.StoreInfo) error { host, _, err := net.SplitHostPort(store.Store.StatusAddress) if err != nil { return errors.Trace(err) @@ -338,7 +338,7 @@ func updateTiFlashStores(pollTiFlashContext *TiFlashManagementContext) error { if err != nil { return err } - pollTiFlashContext.TiFlashStores = make(map[int64]helper.StoreStat) + pollTiFlashContext.TiFlashStores = make(map[int64]pd.StoreInfo) for _, store := range tikvStats.Stores { for _, l := range store.Store.Labels { if l.Key == "engine" && l.Value == "tiflash" { diff --git a/pkg/domain/infosync/BUILD.bazel b/pkg/domain/infosync/BUILD.bazel index b128dfc06ee37..a6d571336bfc5 100644 --- a/pkg/domain/infosync/BUILD.bazel +++ b/pkg/domain/infosync/BUILD.bazel @@ -50,6 +50,7 @@ go_library( "@com_github_tikv_client_go_v2//oracle", "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_pd_client//:client", + "@com_github_tikv_pd_client//http", "@io_etcd_go_etcd_client_v3//:client", "@io_etcd_go_etcd_client_v3//concurrency", "@org_uber_go_zap//:zap", diff --git a/pkg/domain/infosync/info.go b/pkg/domain/infosync/info.go index 3fa8ccec5d2c9..79631640843ba 100644 --- a/pkg/domain/infosync/info.go +++ b/pkg/domain/infosync/info.go @@ -23,7 +23,6 @@ import ( "net/http" "os" "path" - "regexp" "strconv" "strings" "sync" @@ -47,7 +46,6 @@ import ( "github.com/pingcap/tidb/pkg/parser/terror" "github.com/pingcap/tidb/pkg/sessionctx/binloginfo" "github.com/pingcap/tidb/pkg/sessionctx/variable" - "github.com/pingcap/tidb/pkg/store/helper" util2 "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/dbterror" "github.com/pingcap/tidb/pkg/util/hack" @@ -57,6 +55,7 @@ import ( "github.com/tikv/client-go/v2/oracle" "github.com/tikv/client-go/v2/tikv" pd "github.com/tikv/pd/client" + pdhttp "github.com/tikv/pd/client/http" clientv3 "go.etcd.io/etcd/client/v3" "go.etcd.io/etcd/client/v3/concurrency" "go.uber.org/zap" @@ -411,7 +410,7 @@ func DeleteTiFlashTableSyncProgress(tableInfo *model.TableInfo) error { } // MustGetTiFlashProgress gets tiflash replica progress from tiflashProgressCache, if cache not exist, it calculates progress from PD and TiFlash and inserts progress into cache. -func MustGetTiFlashProgress(tableID int64, replicaCount uint64, tiFlashStores *map[int64]helper.StoreStat) (float64, error) { +func MustGetTiFlashProgress(tableID int64, replicaCount uint64, tiFlashStores *map[int64]pdhttp.StoreInfo) (float64, error) { is, err := getGlobalInfoSyncer() if err != nil { return 0, err @@ -428,7 +427,7 @@ func MustGetTiFlashProgress(tableID int64, replicaCount uint64, tiFlashStores *m if err != nil { return 0, err } - stores := make(map[int64]helper.StoreStat) + stores := make(map[int64]pdhttp.StoreInfo) for _, store := range tikvStats.Stores { for _, l := range store.Store.Labels { if l.Key == "engine" && l.Value == "tiflash" { @@ -448,6 +447,7 @@ func MustGetTiFlashProgress(tableID int64, replicaCount uint64, tiFlashStores *m return progress, nil } +// TODO: replace with the unified PD HTTP client. 
func doRequest(ctx context.Context, apiName string, addrs []string, route, method string, body io.Reader) ([]byte, error) { var err error var req *http.Request @@ -501,16 +501,6 @@ func doRequest(ctx context.Context, apiName string, addrs []string, route, metho return nil, err } -func removeVAndHash(v string) string { - if v == "" { - return v - } - versionHash := regexp.MustCompile("-[0-9]+-g[0-9a-f]{7,}(-dev)?") - v = versionHash.ReplaceAllLiteralString(v, "") - v = strings.TrimSuffix(v, "-dirty") - return strings.TrimPrefix(v, "v") -} - func doRequestWithFailpoint(req *http.Request) (resp *http.Response, err error) { fpEnabled := false failpoint.Inject("FailPlacement", func(val failpoint.Value) { @@ -1107,7 +1097,7 @@ func GetLabelRules(ctx context.Context, ruleIDs []string) (map[string]*label.Rul } // CalculateTiFlashProgress calculates TiFlash replica progress -func CalculateTiFlashProgress(tableID int64, replicaCount uint64, TiFlashStores map[int64]helper.StoreStat) (float64, error) { +func CalculateTiFlashProgress(tableID int64, replicaCount uint64, TiFlashStores map[int64]pdhttp.StoreInfo) (float64, error) { is, err := getGlobalInfoSyncer() if err != nil { return 0, errors.Trace(err) @@ -1195,7 +1185,7 @@ func GetTiFlashRegionCountFromPD(ctx context.Context, tableID int64, regionCount } // GetTiFlashStoresStat gets the TiKV store information by accessing PD's api. -func GetTiFlashStoresStat(ctx context.Context) (*helper.StoresStat, error) { +func GetTiFlashStoresStat(ctx context.Context) (*pdhttp.StoresInfo, error) { is, err := getGlobalInfoSyncer() if err != nil { return nil, errors.Trace(err) diff --git a/pkg/domain/infosync/tiflash_manager.go b/pkg/domain/infosync/tiflash_manager.go index 73500f9eea65c..3b892c5378904 100644 --- a/pkg/domain/infosync/tiflash_manager.go +++ b/pkg/domain/infosync/tiflash_manager.go @@ -41,6 +41,7 @@ import ( "github.com/pingcap/tidb/pkg/util/pdapi" "github.com/pingcap/tidb/pkg/util/syncutil" "github.com/tikv/client-go/v2/tikv" + pd "github.com/tikv/pd/client/http" clientv3 "go.etcd.io/etcd/client/v3" "go.uber.org/zap" ) @@ -67,9 +68,9 @@ type TiFlashReplicaManager interface { // GetRegionCountFromPD is a helper function calling `/stats/region`. GetRegionCountFromPD(ctx context.Context, tableID int64, regionCount *int) error // GetStoresStat gets the TiKV store information by accessing PD's api. - GetStoresStat(ctx context.Context) (*helper.StoresStat, error) + GetStoresStat(ctx context.Context) (*pd.StoresInfo, error) // CalculateTiFlashProgress calculates TiFlash replica progress - CalculateTiFlashProgress(tableID int64, replicaCount uint64, TiFlashStores map[int64]helper.StoreStat) (float64, error) + CalculateTiFlashProgress(tableID int64, replicaCount uint64, TiFlashStores map[int64]pd.StoreInfo) (float64, error) // UpdateTiFlashProgressCache updates tiflashProgressCache UpdateTiFlashProgressCache(tableID int64, progress float64) // GetTiFlashProgressFromCache gets tiflash replica progress from tiflashProgressCache @@ -91,11 +92,9 @@ type TiFlashReplicaManagerCtx struct { } // Close is called to close TiFlashReplicaManagerCtx. 
-func (m *TiFlashReplicaManagerCtx) Close(ctx context.Context) { +func (m *TiFlashReplicaManagerCtx) Close(context.Context) {} -} - -func getTiFlashPeerWithoutLagCount(tiFlashStores map[int64]helper.StoreStat, keyspaceID tikv.KeyspaceID, tableID int64) (int, error) { +func getTiFlashPeerWithoutLagCount(tiFlashStores map[int64]pd.StoreInfo, keyspaceID tikv.KeyspaceID, tableID int64) (int, error) { // storeIDs -> regionID, PD will not create two peer on the same store var flashPeerCount int for _, store := range tiFlashStores { @@ -121,7 +120,7 @@ func getTiFlashPeerWithoutLagCount(tiFlashStores map[int64]helper.StoreStat, key } // calculateTiFlashProgress calculates progress based on the region status from PD and TiFlash. -func calculateTiFlashProgress(keyspaceID tikv.KeyspaceID, tableID int64, replicaCount uint64, tiFlashStores map[int64]helper.StoreStat) (float64, error) { +func calculateTiFlashProgress(keyspaceID tikv.KeyspaceID, tableID int64, replicaCount uint64, tiFlashStores map[int64]pd.StoreInfo) (float64, error) { var regionCount int if err := GetTiFlashRegionCountFromPD(context.Background(), tableID, ®ionCount); err != nil { logutil.BgLogger().Error("Fail to get regionCount from PD.", @@ -172,7 +171,7 @@ func encodeRuleID(c tikv.Codec, ruleID string) string { } // CalculateTiFlashProgress calculates TiFlash replica progress. -func (m *TiFlashReplicaManagerCtx) CalculateTiFlashProgress(tableID int64, replicaCount uint64, tiFlashStores map[int64]helper.StoreStat) (float64, error) { +func (m *TiFlashReplicaManagerCtx) CalculateTiFlashProgress(tableID int64, replicaCount uint64, tiFlashStores map[int64]pd.StoreInfo) (float64, error) { return calculateTiFlashProgress(m.codec.GetKeyspaceID(), tableID, replicaCount, tiFlashStores) } @@ -520,8 +519,8 @@ func (m *TiFlashReplicaManagerCtx) GetRegionCountFromPD(ctx context.Context, tab } // GetStoresStat gets the TiKV store information by accessing PD's api. -func (m *TiFlashReplicaManagerCtx) GetStoresStat(ctx context.Context) (*helper.StoresStat, error) { - var storesStat helper.StoresStat +func (m *TiFlashReplicaManagerCtx) GetStoresStat(ctx context.Context) (*pd.StoresInfo, error) { + var storesStat pd.StoresInfo res, err := doRequest(ctx, "GetStoresStat", m.etcdCli.Endpoints(), pdapi.Stores, "GET", nil) if err != nil { return nil, errors.Trace(err) @@ -610,7 +609,7 @@ type MockTiFlash struct { StatusAddr string StatusServer *httptest.Server SyncStatus map[int]mockTiFlashTableInfo - StoreInfo map[uint64]helper.StoreBaseStat + StoreInfo map[uint64]pd.MetaStore GlobalTiFlashPlacementRules map[string]placement.TiFlashRule PdEnabled bool TiflashDelay time.Duration @@ -672,7 +671,7 @@ func NewMockTiFlash() *MockTiFlash { StatusAddr: "", StatusServer: nil, SyncStatus: make(map[int]mockTiFlashTableInfo), - StoreInfo: make(map[uint64]helper.StoreBaseStat), + StoreInfo: make(map[uint64]pd.MetaStore), GlobalTiFlashPlacementRules: make(map[string]placement.TiFlashRule), PdEnabled: true, TiflashDelay: 0, @@ -804,7 +803,7 @@ func (tiflash *MockTiFlash) HandleGetPDRegionRecordStats(_ int64) helper.PDRegio // AddStore is mock function for adding store info into MockTiFlash. 
func (tiflash *MockTiFlash) AddStore(storeID uint64, address string) { - tiflash.StoreInfo[storeID] = helper.StoreBaseStat{ + tiflash.StoreInfo[storeID] = pd.MetaStore{ ID: int64(storeID), Address: address, State: 0, @@ -813,7 +812,7 @@ func (tiflash *MockTiFlash) AddStore(storeID uint64, address string) { StatusAddress: tiflash.StatusAddr, GitHash: "mock-tikv-githash", StartTimestamp: tiflash.StartTime.Unix(), - Labels: []helper.StoreLabel{{ + Labels: []pd.StoreLabel{{ Key: "engine", Value: "tiflash", }}, @@ -822,16 +821,16 @@ func (tiflash *MockTiFlash) AddStore(storeID uint64, address string) { // HandleGetStoresStat is mock function for GetStoresStat. // It returns address of our mocked TiFlash server. -func (tiflash *MockTiFlash) HandleGetStoresStat() *helper.StoresStat { +func (tiflash *MockTiFlash) HandleGetStoresStat() *pd.StoresInfo { tiflash.Lock() defer tiflash.Unlock() if len(tiflash.StoreInfo) == 0 { // default Store - return &helper.StoresStat{ + return &pd.StoresInfo{ Count: 1, - Stores: []helper.StoreStat{ + Stores: []pd.StoreInfo{ { - Store: helper.StoreBaseStat{ + Store: pd.MetaStore{ ID: 1, Address: "127.0.0.1:3930", State: 0, @@ -840,7 +839,7 @@ func (tiflash *MockTiFlash) HandleGetStoresStat() *helper.StoresStat { StatusAddress: tiflash.StatusAddr, GitHash: "mock-tikv-githash", StartTimestamp: tiflash.StartTime.Unix(), - Labels: []helper.StoreLabel{{ + Labels: []pd.StoreLabel{{ Key: "engine", Value: "tiflash", }}, @@ -849,11 +848,11 @@ func (tiflash *MockTiFlash) HandleGetStoresStat() *helper.StoresStat { }, } } - stores := make([]helper.StoreStat, 0, len(tiflash.StoreInfo)) + stores := make([]pd.StoreInfo, 0, len(tiflash.StoreInfo)) for _, storeInfo := range tiflash.StoreInfo { - stores = append(stores, helper.StoreStat{Store: storeInfo, Status: helper.StoreDetailStat{}}) + stores = append(stores, pd.StoreInfo{Store: storeInfo, Status: pd.StoreStatus{}}) } - return &helper.StoresStat{ + return &pd.StoresInfo{ Count: len(tiflash.StoreInfo), Stores: stores, } @@ -970,7 +969,7 @@ func (tiflash *MockTiFlash) SetNetworkError(e bool) { } // CalculateTiFlashProgress return truncated string to avoid float64 comparison. -func (m *mockTiFlashReplicaManagerCtx) CalculateTiFlashProgress(tableID int64, replicaCount uint64, tiFlashStores map[int64]helper.StoreStat) (float64, error) { +func (m *mockTiFlashReplicaManagerCtx) CalculateTiFlashProgress(tableID int64, replicaCount uint64, tiFlashStores map[int64]pd.StoreInfo) (float64, error) { return calculateTiFlashProgress(tikv.NullspaceID, tableID, replicaCount, tiFlashStores) } @@ -1093,7 +1092,7 @@ func (m *mockTiFlashReplicaManagerCtx) GetRegionCountFromPD(ctx context.Context, } // GetStoresStat gets the TiKV store information by accessing PD's api. 
-func (m *mockTiFlashReplicaManagerCtx) GetStoresStat(ctx context.Context) (*helper.StoresStat, error) { +func (m *mockTiFlashReplicaManagerCtx) GetStoresStat(ctx context.Context) (*pd.StoresInfo, error) { m.Lock() defer m.Unlock() if m.tiflash == nil { diff --git a/pkg/executor/BUILD.bazel b/pkg/executor/BUILD.bazel index 247972dd8dbef..4f3ef84706174 100644 --- a/pkg/executor/BUILD.bazel +++ b/pkg/executor/BUILD.bazel @@ -269,6 +269,7 @@ go_library( "@com_github_tikv_client_go_v2//txnkv/txnsnapshot", "@com_github_tikv_client_go_v2//util", "@com_github_tikv_pd_client//:client", + "@com_github_tikv_pd_client//http", "@com_github_twmb_murmur3//:murmur3", "@com_sourcegraph_sourcegraph_appdash//:appdash", "@com_sourcegraph_sourcegraph_appdash//opentracing", @@ -467,6 +468,7 @@ go_test( "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_client_go_v2//tikvrpc", "@com_github_tikv_client_go_v2//util", + "@com_github_tikv_pd_client//http", "@org_golang_google_grpc//:grpc", "@org_uber_go_atomic//:atomic", "@org_uber_go_goleak//:goleak", diff --git a/pkg/executor/infoschema_cluster_table_test.go b/pkg/executor/infoschema_cluster_table_test.go index f4050860e4125..b8187b8b38cfb 100644 --- a/pkg/executor/infoschema_cluster_table_test.go +++ b/pkg/executor/infoschema_cluster_table_test.go @@ -34,10 +34,13 @@ import ( "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/server" "github.com/pingcap/tidb/pkg/store/helper" + "github.com/pingcap/tidb/pkg/store/mockstore" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/util" "github.com/pingcap/tidb/pkg/util/pdapi" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/tikv" + pd "github.com/tikv/pd/client/http" "google.golang.org/grpc" ) @@ -53,9 +56,12 @@ type infosSchemaClusterTableSuite struct { func createInfosSchemaClusterTableSuite(t *testing.T) *infosSchemaClusterTableSuite { s := new(infosSchemaClusterTableSuite) - s.store, s.dom = testkit.CreateMockStoreAndDomain(t) - s.rpcServer, s.listenAddr = setUpRPCService(t, s.dom, "127.0.0.1:0") s.httpServer, s.mockAddr = s.setUpMockPDHTTPServer() + s.store, s.dom = testkit.CreateMockStoreAndDomain( + t, + mockstore.WithTiKVOptions(tikv.WithPDHTTPClient([]string{s.mockAddr})), + ) + s.rpcServer, s.listenAddr = setUpRPCService(t, s.dom, "127.0.0.1:0") s.startTime = time.Now() t.Cleanup(func() { if s.rpcServer != nil { @@ -101,12 +107,12 @@ func (s *infosSchemaClusterTableSuite) setUpMockPDHTTPServer() (*httptest.Server srv := httptest.NewServer(router) // mock store stats stat mockAddr := strings.TrimPrefix(srv.URL, "http://") - router.Handle(pdapi.Stores, fn.Wrap(func() (*helper.StoresStat, error) { - return &helper.StoresStat{ + router.Handle(pdapi.Stores, fn.Wrap(func() (*pd.StoresInfo, error) { + return &pd.StoresInfo{ Count: 1, - Stores: []helper.StoreStat{ + Stores: []pd.StoreInfo{ { - Store: helper.StoreBaseStat{ + Store: pd.MetaStore{ ID: 1, Address: "127.0.0.1:20160", State: 0, @@ -121,15 +127,15 @@ func (s *infosSchemaClusterTableSuite) setUpMockPDHTTPServer() (*httptest.Server }, nil })) // mock regions - router.Handle(pdapi.Regions, fn.Wrap(func() (*helper.RegionsInfo, error) { - return &helper.RegionsInfo{ + router.Handle(pdapi.Regions, fn.Wrap(func() (*pd.RegionsInfo, error) { + return &pd.RegionsInfo{ Count: 1, - Regions: []helper.RegionInfo{ + Regions: []pd.RegionInfo{ { ID: 1, StartKey: "", EndKey: "", - Epoch: helper.RegionEpoch{ + Epoch: pd.RegionEpoch{ ConfVer: 1, Version: 2, }, diff --git a/pkg/executor/infoschema_reader.go 
b/pkg/executor/infoschema_reader.go index d16f1b6620492..3614b2c69e7fe 100644 --- a/pkg/executor/infoschema_reader.go +++ b/pkg/executor/infoschema_reader.go @@ -70,7 +70,6 @@ import ( "github.com/pingcap/tidb/pkg/util/keydecoder" "github.com/pingcap/tidb/pkg/util/logutil" "github.com/pingcap/tidb/pkg/util/memory" - "github.com/pingcap/tidb/pkg/util/pdapi" "github.com/pingcap/tidb/pkg/util/resourcegrouptag" "github.com/pingcap/tidb/pkg/util/sem" "github.com/pingcap/tidb/pkg/util/servermemorylimit" @@ -81,6 +80,7 @@ import ( "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/txnkv/txnlock" + pd "github.com/tikv/pd/client/http" "go.uber.org/zap" ) @@ -152,9 +152,9 @@ func (e *memtableRetriever) retrieve(ctx context.Context, sctx sessionctx.Contex case infoschema.TableUserPrivileges: e.setDataFromUserPrivileges(sctx) case infoschema.TableTiKVRegionStatus: - err = e.setDataForTiKVRegionStatus(sctx) + err = e.setDataForTiKVRegionStatus(ctx, sctx) case infoschema.TableTiDBHotRegions: - err = e.setDataForTiDBHotRegions(sctx) + err = e.setDataForTiDBHotRegions(ctx, sctx) case infoschema.TableConstraints: e.setDataFromTableConstraints(sctx, dbs) case infoschema.TableSessionVar: @@ -164,7 +164,7 @@ func (e *memtableRetriever) retrieve(ctx context.Context, sctx sessionctx.Contex case infoschema.TableTiFlashReplica: e.dataForTableTiFlashReplica(sctx, dbs) case infoschema.TableTiKVStoreStatus: - err = e.dataForTiKVStoreStatus(sctx) + err = e.dataForTiKVStoreStatus(ctx, sctx) case infoschema.TableClientErrorsSummaryGlobal, infoschema.TableClientErrorsSummaryByUser, infoschema.TableClientErrorsSummaryByHost: @@ -1172,8 +1172,8 @@ func (e *memtableRetriever) setDataFromViews(ctx sessionctx.Context, schemas []* e.rows = rows } -func (e *memtableRetriever) dataForTiKVStoreStatus(ctx sessionctx.Context) (err error) { - tikvStore, ok := ctx.GetStore().(helper.Storage) +func (e *memtableRetriever) dataForTiKVStoreStatus(ctx context.Context, sctx sessionctx.Context) (err error) { + tikvStore, ok := sctx.GetStore().(helper.Storage) if !ok { return errors.New("Information about TiKV store status can be gotten only when the storage is TiKV") } @@ -1181,7 +1181,7 @@ func (e *memtableRetriever) dataForTiKVStoreStatus(ctx sessionctx.Context) (err Store: tikvStore, RegionCache: tikvStore.GetRegionCache(), } - storesStat, err := tikvHelper.GetStoresStat() + storesStat, err := tikvHelper.PDHTTPClient().GetStores(ctx) if err != nil { return err } @@ -1211,15 +1211,15 @@ func (e *memtableRetriever) dataForTiKVStoreStatus(ctx sessionctx.Context) (err row[13].SetFloat64(storeStat.Status.RegionWeight) row[14].SetFloat64(storeStat.Status.RegionScore) row[15].SetInt64(storeStat.Status.RegionSize) - startTs := types.NewTime(types.FromGoTime(storeStat.Status.StartTs), mysql.TypeDatetime, types.DefaultFsp) + startTs := types.NewTime(types.FromGoTime(storeStat.Status.StartTS), mysql.TypeDatetime, types.DefaultFsp) row[16].SetMysqlTime(startTs) - lastHeartbeatTs := types.NewTime(types.FromGoTime(storeStat.Status.LastHeartbeatTs), mysql.TypeDatetime, types.DefaultFsp) + lastHeartbeatTs := types.NewTime(types.FromGoTime(storeStat.Status.LastHeartbeatTS), mysql.TypeDatetime, types.DefaultFsp) row[17].SetMysqlTime(lastHeartbeatTs) row[18].SetString(storeStat.Status.Uptime, mysql.DefaultCollationName) if sem.IsEnabled() { // Patch out IP addresses etc if the user does not have the RESTRICTED_TABLES_ADMIN privilege - checker := privilege.GetPrivilegeManager(ctx) - if checker == nil || 
!checker.RequestDynamicVerification(ctx.GetSessionVars().ActiveRoles, "RESTRICTED_TABLES_ADMIN", false) { + checker := privilege.GetPrivilegeManager(sctx) + if checker == nil || !checker.RequestDynamicVerification(sctx.GetSessionVars().ActiveRoles, "RESTRICTED_TABLES_ADMIN", false) { row[1].SetString(strconv.FormatInt(storeStat.Store.ID, 10), mysql.DefaultCollationName) row[1].SetNull() row[6].SetNull() @@ -1580,7 +1580,7 @@ func keyColumnUsageInTable(schema *model.DBInfo, table *model.TableInfo) [][]typ return rows } -func (e *memtableRetriever) setDataForTiKVRegionStatus(sctx sessionctx.Context) (err error) { +func (e *memtableRetriever) setDataForTiKVRegionStatus(ctx context.Context, sctx sessionctx.Context) (err error) { checker := privilege.GetPrivilegeManager(sctx) var extractorTableIDs []int64 tikvStore, ok := sctx.GetStore().(helper.Storage) @@ -1592,14 +1592,14 @@ func (e *memtableRetriever) setDataForTiKVRegionStatus(sctx sessionctx.Context) RegionCache: tikvStore.GetRegionCache(), } requestByTableRange := false - allRegionsInfo := helper.NewRegionsInfo() + var allRegionsInfo *pd.RegionsInfo is := sctx.GetDomainInfoSchema().(infoschema.InfoSchema) if e.extractor != nil { extractor, ok := e.extractor.(*plannercore.TiKVRegionStatusExtractor) if ok && len(extractor.GetTablesID()) > 0 { extractorTableIDs = extractor.GetTablesID() for _, tableID := range extractorTableIDs { - regionsInfo, err := e.getRegionsInfoForTable(tikvHelper, is, tableID) + regionsInfo, err := e.getRegionsInfoForTable(ctx, tikvHelper, is, tableID) if err != nil { if errors.ErrorEqual(err, infoschema.ErrTableExists) { continue @@ -1612,7 +1612,7 @@ func (e *memtableRetriever) setDataForTiKVRegionStatus(sctx sessionctx.Context) } } if !requestByTableRange { - allRegionsInfo, err = tikvHelper.GetRegionsInfo() + allRegionsInfo, err = tikvHelper.PDHTTPClient().GetRegions(ctx) if err != nil { return err } @@ -1638,7 +1638,7 @@ func (e *memtableRetriever) setDataForTiKVRegionStatus(sctx sessionctx.Context) return nil } -func (e *memtableRetriever) getRegionsInfoForTable(h *helper.Helper, is infoschema.InfoSchema, tableID int64) (*helper.RegionsInfo, error) { +func (e *memtableRetriever) getRegionsInfoForTable(ctx context.Context, h *helper.Helper, is infoschema.InfoSchema, tableID int64) (*pd.RegionsInfo, error) { tbl, _ := is.TableByID(tableID) if tbl == nil { return nil, infoschema.ErrTableExists.GenWithStackByArgs(tableID) @@ -1646,16 +1646,16 @@ func (e *memtableRetriever) getRegionsInfoForTable(h *helper.Helper, is infosche pt := tbl.Meta().GetPartitionInfo() if pt == nil { - regionsInfo, err := e.getRegionsInfoForSingleTable(h, tableID) + regionsInfo, err := e.getRegionsInfoForSingleTable(ctx, h, tableID) if err != nil { return nil, err } return regionsInfo, nil } - allRegionsInfo := helper.NewRegionsInfo() + var allRegionsInfo *pd.RegionsInfo for _, def := range pt.Definitions { - regionsInfo, err := e.getRegionsInfoForSingleTable(h, def.ID) + regionsInfo, err := e.getRegionsInfoForSingleTable(ctx, h, def.ID) if err != nil { return nil, err } @@ -1664,13 +1664,13 @@ func (e *memtableRetriever) getRegionsInfoForTable(h *helper.Helper, is infosche return allRegionsInfo, nil } -func (*memtableRetriever) getRegionsInfoForSingleTable(helper *helper.Helper, tableID int64) (*helper.RegionsInfo, error) { +func (*memtableRetriever) getRegionsInfoForSingleTable(ctx context.Context, helper *helper.Helper, tableID int64) (*pd.RegionsInfo, error) { sk, ek := tablecodec.GetTableHandleKeyRange(tableID) - sRegion, err := 
helper.GetRegionByKey(codec.EncodeBytes(nil, sk)) + sRegion, err := helper.PDHTTPClient().GetRegionByKey(ctx, codec.EncodeBytes(nil, sk)) if err != nil { return nil, err } - eRegion, err := helper.GetRegionByKey(codec.EncodeBytes(nil, ek)) + eRegion, err := helper.PDHTTPClient().GetRegionByKey(ctx, codec.EncodeBytes(nil, ek)) if err != nil { return nil, err } @@ -1682,10 +1682,10 @@ func (*memtableRetriever) getRegionsInfoForSingleTable(helper *helper.Helper, ta if err != nil { return nil, err } - return helper.GetRegionsInfoByRange(sk, ek) + return helper.PDHTTPClient().GetRegionsByKey(ctx, sk, ek, -1) } -func (e *memtableRetriever) setNewTiKVRegionStatusCol(region *helper.RegionInfo, table *helper.TableInfo) { +func (e *memtableRetriever) setNewTiKVRegionStatusCol(region *pd.RegionInfo, table *helper.TableInfo) { row := make([]types.Datum, len(infoschema.TableTiKVRegionStatusCols)) row[0].SetInt64(region.ID) row[1].SetString(region.StartKey, mysql.DefaultCollationName) @@ -1731,22 +1731,22 @@ const ( downPeer = "DOWN" ) -func (e *memtableRetriever) setDataForTiDBHotRegions(ctx sessionctx.Context) error { - tikvStore, ok := ctx.GetStore().(helper.Storage) +func (e *memtableRetriever) setDataForTiDBHotRegions(ctx context.Context, sctx sessionctx.Context) error { + tikvStore, ok := sctx.GetStore().(helper.Storage) if !ok { return errors.New("Information about hot region can be gotten only when the storage is TiKV") } - allSchemas := ctx.GetInfoSchema().(infoschema.InfoSchema).AllSchemas() + allSchemas := sctx.GetInfoSchema().(infoschema.InfoSchema).AllSchemas() tikvHelper := &helper.Helper{ Store: tikvStore, RegionCache: tikvStore.GetRegionCache(), } - metrics, err := tikvHelper.ScrapeHotInfo(pdapi.HotRead, allSchemas) + metrics, err := tikvHelper.ScrapeHotInfo(ctx, helper.HotRead, allSchemas) if err != nil { return err } e.setDataForHotRegionByMetrics(metrics, "read") - metrics, err = tikvHelper.ScrapeHotInfo(pdapi.HotWrite, allSchemas) + metrics, err = tikvHelper.ScrapeHotInfo(ctx, helper.HotWrite, allSchemas) if err != nil { return err } @@ -2256,9 +2256,11 @@ func (e *memtableRetriever) setDataFromSequences(ctx sessionctx.Context, schemas // dataForTableTiFlashReplica constructs data for table tiflash replica info. 
func (e *memtableRetriever) dataForTableTiFlashReplica(ctx sessionctx.Context, schemas []*model.DBInfo) { - checker := privilege.GetPrivilegeManager(ctx) - var rows [][]types.Datum - var tiFlashStores map[int64]helper.StoreStat + var ( + checker = privilege.GetPrivilegeManager(ctx) + rows [][]types.Datum + tiFlashStores map[int64]pd.StoreInfo + ) for _, schema := range schemas { for _, tbl := range schema.Tables { if tbl.TiFlashReplica == nil { diff --git a/pkg/executor/memtable_reader.go b/pkg/executor/memtable_reader.go index b0cdbe40d2a12..00f340b921318 100644 --- a/pkg/executor/memtable_reader.go +++ b/pkg/executor/memtable_reader.go @@ -49,6 +49,7 @@ import ( "github.com/pingcap/tidb/pkg/util/execdetails" "github.com/pingcap/tidb/pkg/util/pdapi" "github.com/pingcap/tidb/pkg/util/set" + pd "github.com/tikv/pd/client/http" "google.golang.org/grpc" "google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials/insecure" @@ -812,7 +813,7 @@ func (*hotRegionsHistoryRetriver) getHotRegionRowWithSchemaInfo( tables []helper.TableInfoWithKeyRange, tz *time.Location, ) ([][]types.Datum, error) { - regionsInfo := []*helper.RegionInfo{ + regionsInfo := []*pd.RegionInfo{ { ID: int64(hisHotRegion.RegionID), StartKey: hisHotRegion.StartKey, @@ -872,7 +873,7 @@ type tikvRegionPeersRetriever struct { retrieved bool } -func (e *tikvRegionPeersRetriever) retrieve(_ context.Context, sctx sessionctx.Context) ([][]types.Datum, error) { +func (e *tikvRegionPeersRetriever) retrieve(ctx context.Context, sctx sessionctx.Context) ([][]types.Datum, error) { if e.extractor.SkipRequest || e.retrieved { return nil, nil } @@ -886,12 +887,12 @@ func (e *tikvRegionPeersRetriever) retrieve(_ context.Context, sctx sessionctx.C RegionCache: tikvStore.GetRegionCache(), } - var regionsInfo, regionsInfoByStoreID []helper.RegionInfo - regionMap := make(map[int64]*helper.RegionInfo) + var regionsInfo, regionsInfoByStoreID []pd.RegionInfo + regionMap := make(map[int64]*pd.RegionInfo) storeMap := make(map[int64]struct{}) if len(e.extractor.StoreIDs) == 0 && len(e.extractor.RegionIDs) == 0 { - regionsInfo, err := tikvHelper.GetRegionsInfo() + regionsInfo, err := tikvHelper.PDHTTPClient().GetRegions(ctx) if err != nil { return nil, err } @@ -902,7 +903,7 @@ func (e *tikvRegionPeersRetriever) retrieve(_ context.Context, sctx sessionctx.C // if a region_id located in 1, 4, 7 store we will get all of them when request any store_id, // storeMap is used to filter peers on unexpected stores. storeMap[int64(storeID)] = struct{}{} - storeRegionsInfo, err := tikvHelper.GetStoreRegionsInfo(storeID) + storeRegionsInfo, err := tikvHelper.PDHTTPClient().GetRegionsByStoreID(ctx, storeID) if err != nil { return nil, err } @@ -925,7 +926,7 @@ func (e *tikvRegionPeersRetriever) retrieve(_ context.Context, sctx sessionctx.C // if there is storeIDs, target region_id is fetched by storeIDs, // otherwise we need to fetch it from PD. 
if len(e.extractor.StoreIDs) == 0 { - regionInfo, err := tikvHelper.GetRegionInfoByID(regionID) + regionInfo, err := tikvHelper.PDHTTPClient().GetRegionByID(ctx, regionID) if err != nil { return nil, err } @@ -950,7 +951,7 @@ func (e *tikvRegionPeersRetriever) isUnexpectedStoreID(storeID int64, storeMap m } func (e *tikvRegionPeersRetriever) packTiKVRegionPeersRows( - regionsInfo []helper.RegionInfo, storeMap map[int64]struct{}) ([][]types.Datum, error) { + regionsInfo []pd.RegionInfo, storeMap map[int64]struct{}) ([][]types.Datum, error) { //nolint: prealloc var rows [][]types.Datum for _, region := range regionsInfo { diff --git a/pkg/executor/show_placement.go b/pkg/executor/show_placement.go index 6af87d39bf3ce..29c8e221a2f5d 100644 --- a/pkg/executor/show_placement.go +++ b/pkg/executor/show_placement.go @@ -31,12 +31,12 @@ import ( "github.com/pingcap/tidb/pkg/parser/model" "github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/privilege" - "github.com/pingcap/tidb/pkg/store/helper" "github.com/pingcap/tidb/pkg/table" "github.com/pingcap/tidb/pkg/tablecodec" "github.com/pingcap/tidb/pkg/types" "github.com/pingcap/tidb/pkg/util/codec" "github.com/pingcap/tidb/pkg/util/sqlexec" + pd "github.com/tikv/pd/client/http" ) type showPlacementLabelsResultBuilder struct { @@ -61,7 +61,7 @@ func (b *showPlacementLabelsResultBuilder) AppendStoreLabels(bj types.BinaryJSON return errors.New("only array or null type is allowed") } - labels := make([]*helper.StoreLabel, 0, bj.GetElemCount()) + labels := make([]*pd.StoreLabel, 0, bj.GetElemCount()) err = gjson.Unmarshal(data, &labels) if err != nil { return errors.Trace(err) diff --git a/pkg/executor/show_placement_labels_test.go b/pkg/executor/show_placement_labels_test.go index 33bf42b611d55..5ffab3b3eb3f5 100644 --- a/pkg/executor/show_placement_labels_test.go +++ b/pkg/executor/show_placement_labels_test.go @@ -18,14 +18,14 @@ import ( gjson "encoding/json" "testing" - "github.com/pingcap/tidb/pkg/store/helper" "github.com/pingcap/tidb/pkg/types" "github.com/stretchr/testify/require" + pd "github.com/tikv/pd/client/http" ) func TestShowPlacementLabelsBuilder(t *testing.T) { cases := []struct { - stores [][]*helper.StoreLabel + stores [][]*pd.StoreLabel expects [][]interface{} }{ { @@ -33,7 +33,7 @@ func TestShowPlacementLabelsBuilder(t *testing.T) { expects: nil, }, { - stores: [][]*helper.StoreLabel{ + stores: [][]*pd.StoreLabel{ {{Key: "zone", Value: "z1"}, {Key: "rack", Value: "r3"}, {Key: "host", Value: "h1"}}, {{Key: "zone", Value: "z1"}, {Key: "rack", Value: "r1"}, {Key: "host", Value: "h2"}}, {{Key: "zone", Value: "z1"}, {Key: "rack", Value: "r2"}, {Key: "host", Value: "h2"}}, diff --git a/pkg/executor/split.go b/pkg/executor/split.go index e2830894589a7..d56981ef8a363 100644 --- a/pkg/executor/split.go +++ b/pkg/executor/split.go @@ -824,7 +824,7 @@ func getRegionInfo(store helper.Storage, regions []regionMeta) ([]regionMeta, er RegionCache: store.GetRegionCache(), } for i := range regions { - regionInfo, err := tikvHelper.GetRegionInfoByID(regions[i].region.Id) + regionInfo, err := tikvHelper.PDHTTPClient().GetRegionByID(context.TODO(), regions[i].region.Id) if err != nil { return nil, err } diff --git a/pkg/executor/tikv_regions_peers_table_test.go b/pkg/executor/tikv_regions_peers_table_test.go index 4994f9b73fe69..b716cee80fb69 100644 --- a/pkg/executor/tikv_regions_peers_table_test.go +++ b/pkg/executor/tikv_regions_peers_table_test.go @@ -26,39 +26,42 @@ import ( "github.com/gorilla/mux" "github.com/pingcap/fn" 
"github.com/pingcap/tidb/pkg/store/helper" + "github.com/pingcap/tidb/pkg/store/mockstore" "github.com/pingcap/tidb/pkg/testkit" "github.com/pingcap/tidb/pkg/util/pdapi" "github.com/stretchr/testify/require" + "github.com/tikv/client-go/v2/tikv" + pd "github.com/tikv/pd/client/http" ) -var regionsInfo = map[uint64]helper.RegionInfo{ +var regionsInfo = map[uint64]pd.RegionInfo{ 1: { ID: 1, - Peers: []helper.RegionPeer{{ID: 11, StoreID: 1, IsLearner: false}, {ID: 12, StoreID: 2, IsLearner: false}, {ID: 13, StoreID: 3, IsLearner: false}}, - Leader: helper.RegionPeer{ID: 11, StoreID: 1, IsLearner: false}, + Peers: []pd.RegionPeer{{ID: 11, StoreID: 1, IsLearner: false}, {ID: 12, StoreID: 2, IsLearner: false}, {ID: 13, StoreID: 3, IsLearner: false}}, + Leader: pd.RegionPeer{ID: 11, StoreID: 1, IsLearner: false}, }, 2: { ID: 2, - Peers: []helper.RegionPeer{{ID: 21, StoreID: 1, IsLearner: false}, {ID: 22, StoreID: 2, IsLearner: false}, {ID: 23, StoreID: 3, IsLearner: false}}, - Leader: helper.RegionPeer{ID: 22, StoreID: 2, IsLearner: false}, + Peers: []pd.RegionPeer{{ID: 21, StoreID: 1, IsLearner: false}, {ID: 22, StoreID: 2, IsLearner: false}, {ID: 23, StoreID: 3, IsLearner: false}}, + Leader: pd.RegionPeer{ID: 22, StoreID: 2, IsLearner: false}, }, 3: { ID: 3, - Peers: []helper.RegionPeer{{ID: 31, StoreID: 1, IsLearner: false}, {ID: 32, StoreID: 2, IsLearner: false}, {ID: 33, StoreID: 3, IsLearner: false}}, - Leader: helper.RegionPeer{ID: 33, StoreID: 3, IsLearner: false}, + Peers: []pd.RegionPeer{{ID: 31, StoreID: 1, IsLearner: false}, {ID: 32, StoreID: 2, IsLearner: false}, {ID: 33, StoreID: 3, IsLearner: false}}, + Leader: pd.RegionPeer{ID: 33, StoreID: 3, IsLearner: false}, }, } -var storeRegionsInfo = &helper.RegionsInfo{ +var storeRegionsInfo = &pd.RegionsInfo{ Count: 3, - Regions: []helper.RegionInfo{ + Regions: []pd.RegionInfo{ regionsInfo[1], regionsInfo[2], regionsInfo[3], }, } -var storesRegionsInfo = map[uint64]*helper.RegionsInfo{ +var storesRegionsInfo = map[uint64]*pd.RegionsInfo{ 1: storeRegionsInfo, 2: storeRegionsInfo, 3: storeRegionsInfo, @@ -109,7 +112,8 @@ func TestTikvRegionPeers(t *testing.T) { router.HandleFunc(pdapi.RegionByID+"/"+"{id}", regionsInfoHandler) defer server.Close() - store := testkit.CreateMockStore(t) + store := testkit.CreateMockStore(t, + mockstore.WithTiKVOptions(tikv.WithPDHTTPClient([]string{mockAddr}))) store = &mockStore{ store.(helper.Storage), diff --git a/pkg/infoschema/test/clustertablestest/BUILD.bazel b/pkg/infoschema/test/clustertablestest/BUILD.bazel index 3c6e587dbccaf..61977e7ff7ce3 100644 --- a/pkg/infoschema/test/clustertablestest/BUILD.bazel +++ b/pkg/infoschema/test/clustertablestest/BUILD.bazel @@ -29,7 +29,6 @@ go_test( "//pkg/session", "//pkg/session/txninfo", "//pkg/sessionctx/variable", - "//pkg/store/helper", "//pkg/store/mockstore", "//pkg/store/mockstore/mockstorage", "//pkg/store/mockstore/unistore", @@ -53,6 +52,7 @@ go_test( "@com_github_pingcap_tipb//go-tipb", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//testutils", + "@com_github_tikv_pd_client//http", "@org_golang_google_grpc//:grpc", "@org_uber_go_goleak//:goleak", ], diff --git a/pkg/infoschema/test/clustertablestest/cluster_tables_test.go b/pkg/infoschema/test/clustertablestest/cluster_tables_test.go index c741b42f7eb29..301c677e793e8 100644 --- a/pkg/infoschema/test/clustertablestest/cluster_tables_test.go +++ b/pkg/infoschema/test/clustertablestest/cluster_tables_test.go @@ -42,7 +42,6 @@ import ( "github.com/pingcap/tidb/pkg/parser/auth" 
"github.com/pingcap/tidb/pkg/parser/mysql" "github.com/pingcap/tidb/pkg/server" - "github.com/pingcap/tidb/pkg/store/helper" "github.com/pingcap/tidb/pkg/store/mockstore" "github.com/pingcap/tidb/pkg/store/mockstore/mockstorage" "github.com/pingcap/tidb/pkg/store/mockstore/unistore" @@ -57,6 +56,7 @@ import ( "github.com/pingcap/tipb/go-tipb" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/testutils" + pd "github.com/tikv/pd/client/http" "google.golang.org/grpc" ) @@ -768,12 +768,12 @@ func (s *clusterTablesSuite) setUpMockPDHTTPServer() (*httptest.Server, string) srv := httptest.NewServer(router) // mock store stats stat mockAddr := strings.TrimPrefix(srv.URL, "http://") - router.Handle(pdapi.Stores, fn.Wrap(func() (*helper.StoresStat, error) { - return &helper.StoresStat{ + router.Handle(pdapi.Stores, fn.Wrap(func() (*pd.StoresInfo, error) { + return &pd.StoresInfo{ Count: 1, - Stores: []helper.StoreStat{ + Stores: []pd.StoreInfo{ { - Store: helper.StoreBaseStat{ + Store: pd.MetaStore{ ID: 1, Address: "127.0.0.1:20160", State: 0, diff --git a/pkg/server/handler/tests/BUILD.bazel b/pkg/server/handler/tests/BUILD.bazel index f992b3170f853..7acaf6cb34af5 100644 --- a/pkg/server/handler/tests/BUILD.bazel +++ b/pkg/server/handler/tests/BUILD.bazel @@ -28,7 +28,6 @@ go_test( "//pkg/server/handler/optimizor", "//pkg/server/handler/tikvhandler", "//pkg/server/internal/testserverclient", - "//pkg/server/internal/testutil", "//pkg/server/internal/util", "//pkg/session", "//pkg/sessionctx", diff --git a/pkg/server/handler/tests/http_handler_test.go b/pkg/server/handler/tests/http_handler_test.go index 002022b60a881..628bb78287e8a 100644 --- a/pkg/server/handler/tests/http_handler_test.go +++ b/pkg/server/handler/tests/http_handler_test.go @@ -50,7 +50,6 @@ import ( "github.com/pingcap/tidb/pkg/server/handler/optimizor" "github.com/pingcap/tidb/pkg/server/handler/tikvhandler" "github.com/pingcap/tidb/pkg/server/internal/testserverclient" - "github.com/pingcap/tidb/pkg/server/internal/testutil" "github.com/pingcap/tidb/pkg/server/internal/util" "github.com/pingcap/tidb/pkg/session" "github.com/pingcap/tidb/pkg/sessionctx" @@ -450,7 +449,13 @@ func TestBinlogRecover(t *testing.T) { func (ts *basicHTTPHandlerTestSuite) startServer(t *testing.T) { var err error - ts.store, err = mockstore.NewMockStore() + ts.Port = uint(rand.Int31n(50000)) + 10000 + ts.StatusPort = ts.Port + 1 + ts.store, err = mockstore.NewMockStore( + mockstore.WithTiKVOptions( + tikv.WithPDHTTPClient([]string{ts.Addr()}), + ), + ) require.NoError(t, err) ts.domain, err = session.BootstrapSession(ts.store) require.NoError(t, err) @@ -458,14 +463,12 @@ func (ts *basicHTTPHandlerTestSuite) startServer(t *testing.T) { cfg := util.NewTestConfig() cfg.Store = "tikv" - cfg.Port = 0 - cfg.Status.StatusPort = 0 + cfg.Port = ts.Port + cfg.Status.StatusPort = ts.StatusPort cfg.Status.ReportStatus = true server, err := server2.NewServer(cfg, ts.tidbdrv) require.NoError(t, err) - ts.Port = testutil.GetPortFromTCPAddr(server.ListenAddr()) - ts.StatusPort = testutil.GetPortFromTCPAddr(server.StatusListenerAddr()) ts.server = server ts.server.SetDomain(ts.domain) go func() { diff --git a/pkg/server/handler/tikvhandler/tikv_handler.go b/pkg/server/handler/tikvhandler/tikv_handler.go index a215221da1f82..5b1ef2a3fee16 100644 --- a/pkg/server/handler/tikvhandler/tikv_handler.go +++ b/pkg/server/handler/tikvhandler/tikv_handler.go @@ -1443,12 +1443,13 @@ func (h RegionHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { 
writeError(w, err) return } - hotRead, err := h.ScrapeHotInfo(pdapi.HotRead, schema.AllSchemas()) + ctx := context.Background() + hotRead, err := h.ScrapeHotInfo(ctx, helper.HotRead, schema.AllSchemas()) if err != nil { writeError(w, err) return } - hotWrite, err := h.ScrapeHotInfo(pdapi.HotWrite, schema.AllSchemas()) + hotWrite, err := h.ScrapeHotInfo(ctx, helper.HotWrite, schema.AllSchemas()) if err != nil { writeError(w, err) return diff --git a/pkg/server/internal/testserverclient/server_client.go b/pkg/server/internal/testserverclient/server_client.go index f7d614328ef1a..a05b24b2b8ccb 100644 --- a/pkg/server/internal/testserverclient/server_client.go +++ b/pkg/server/internal/testserverclient/server_client.go @@ -72,7 +72,12 @@ func NewTestServerClient() *TestServerClient { } } -// statusURL return the full URL of a status path +// Addr returns the address of the server. +func (cli *TestServerClient) Addr() string { + return fmt.Sprintf("%s://localhost:%d", cli.StatusScheme, cli.Port) +} + +// StatusURL returns the full URL of a status path func (cli *TestServerClient) StatusURL(path string) string { return fmt.Sprintf("%s://localhost:%d%s", cli.StatusScheme, cli.StatusPort, path) } diff --git a/pkg/store/driver/BUILD.bazel b/pkg/store/driver/BUILD.bazel index cf59533038170..52d540fd3274b 100644 --- a/pkg/store/driver/BUILD.bazel +++ b/pkg/store/driver/BUILD.bazel @@ -8,6 +8,7 @@ go_library( deps = [ "//pkg/executor/importer", "//pkg/kv", + "//pkg/metrics", "//pkg/sessionctx/variable", "//pkg/store/copr", "//pkg/store/driver/error", @@ -23,6 +24,7 @@ go_library( "@com_github_tikv_client_go_v2//tikvrpc", "@com_github_tikv_client_go_v2//util", "@com_github_tikv_pd_client//:client", + "@com_github_tikv_pd_client//http", "@org_golang_google_grpc//:grpc", "@org_golang_google_grpc//keepalive", "@org_uber_go_zap//:zap", diff --git a/pkg/store/driver/tikv_driver.go b/pkg/store/driver/tikv_driver.go index f73d910195137..facdf3e8cf1dc 100644 --- a/pkg/store/driver/tikv_driver.go +++ b/pkg/store/driver/tikv_driver.go @@ -29,6 +29,7 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/pkg/executor/importer" "github.com/pingcap/tidb/pkg/kv" + "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/sessionctx/variable" "github.com/pingcap/tidb/pkg/store/copr" derr "github.com/pingcap/tidb/pkg/store/driver/error" @@ -41,6 +42,7 @@ import ( "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/util" pd "github.com/tikv/pd/client" + pdhttp "github.com/tikv/pd/client/http" "go.uber.org/zap" "google.golang.org/grpc" "google.golang.org/grpc/keepalive" @@ -214,7 +216,8 @@ func (d TiKVDriver) OpenWithOptions(path string, options ...Option) (resStore kv tikv.WithCodec(codec), ) - s, err = tikv.NewKVStore(uuid, pdClient, spkv, &injectTraceClient{Client: rpcClient}, tikv.WithPDHTTPClient(tlsConfig, etcdAddrs)) + s, err = tikv.NewKVStore(uuid, pdClient, spkv, &injectTraceClient{Client: rpcClient}, + tikv.WithPDHTTPClient(etcdAddrs, pdhttp.WithTLSConfig(tlsConfig), pdhttp.WithMetrics(metrics.PDAPIRequestCounter, metrics.PDAPIExecutionHistogram))) if err != nil { return nil, errors.Trace(err) } diff --git a/pkg/store/helper/BUILD.bazel b/pkg/store/helper/BUILD.bazel index 2758031eda44d..4e46ec6b32ff0 100644 --- a/pkg/store/helper/BUILD.bazel +++ b/pkg/store/helper/BUILD.bazel @@ -8,7 +8,6 @@ go_library( deps = [ "//pkg/ddl/placement", "//pkg/kv", - "//pkg/metrics", "//pkg/parser/model", "//pkg/store/driver/error", "//pkg/tablecodec", @@ -23,6 +22,7 @@ 
go_library( "@com_github_tikv_client_go_v2//tikv", "@com_github_tikv_client_go_v2//tikvrpc", "@com_github_tikv_client_go_v2//txnkv/txnlock", + "@com_github_tikv_pd_client//http", "@org_uber_go_zap//:zap", ], ) @@ -47,6 +47,8 @@ go_test( "@com_github_pingcap_log//:log", "@com_github_stretchr_testify//require", "@com_github_tikv_client_go_v2//testutils", + "@com_github_tikv_client_go_v2//tikv", + "@com_github_tikv_pd_client//http", "@io_opencensus_go//stats/view", "@org_uber_go_goleak//:goleak", "@org_uber_go_zap//:zap", diff --git a/pkg/store/helper/helper.go b/pkg/store/helper/helper.go index 71f860257b129..78d0004acd6f0 100644 --- a/pkg/store/helper/helper.go +++ b/pkg/store/helper/helper.go @@ -36,7 +36,6 @@ import ( "github.com/pingcap/kvproto/pkg/kvrpcpb" "github.com/pingcap/tidb/pkg/ddl/placement" "github.com/pingcap/tidb/pkg/kv" - "github.com/pingcap/tidb/pkg/metrics" "github.com/pingcap/tidb/pkg/parser/model" derr "github.com/pingcap/tidb/pkg/store/driver/error" "github.com/pingcap/tidb/pkg/tablecodec" @@ -48,6 +47,7 @@ import ( "github.com/tikv/client-go/v2/tikv" "github.com/tikv/client-go/v2/tikvrpc" "github.com/tikv/client-go/v2/txnkv/txnlock" + pd "github.com/tikv/pd/client/http" "go.uber.org/zap" ) @@ -80,6 +80,7 @@ type Storage interface { GetMinSafeTS(txnScope string) uint64 GetLockWaits() ([]*deadlockpb.WaitForEntry, error) GetCodec() tikv.Codec + GetPDHTTPClient() pd.Client } // Helper is a middleware to get some information from tikv/pd. It can be used for TiDB's http api or mem table. @@ -96,6 +97,11 @@ func NewHelper(store Storage) *Helper { } } +// PDHTTPClient returns the PD HTTP client. +func (h *Helper) PDHTTPClient() pd.Client { + return h.Store.GetPDHTTPClient() +} + // MaxBackoffTimeoutForMvccGet is a derived value from previous implementation possible experiencing value 5000ms. const MaxBackoffTimeoutForMvccGet = 5000 @@ -276,27 +282,6 @@ func (h *Helper) GetMvccByStartTs(startTS uint64, startKey, endKey kv.Key) (*Mvc } } -// StoreHotRegionInfos records all hog region stores. -// it's the response of PD. -type StoreHotRegionInfos struct { - AsPeer map[uint64]*HotRegionsStat `json:"as_peer"` - AsLeader map[uint64]*HotRegionsStat `json:"as_leader"` -} - -// HotRegionsStat records echo store's hot region. -// it's the response of PD. -type HotRegionsStat struct { - RegionsStat []RegionStat `json:"statistics"` -} - -// RegionStat records each hot region's statistics -// it's the response of PD. -type RegionStat struct { - RegionID uint64 `json:"region_id"` - FlowBytes float64 `json:"flow_bytes"` - HotDegree int `json:"hot_degree"` -} - // RegionMetric presents the final metric output entry. type RegionMetric struct { FlowBytes uint64 `json:"flow_bytes"` @@ -304,9 +289,15 @@ type RegionMetric struct { Count int `json:"region_count"` } +// Constants that used to distinguish the hot region info request. +const ( + HotRead = "read" + HotWrite = "write" +) + // ScrapeHotInfo gets the needed hot region information by the url given. -func (h *Helper) ScrapeHotInfo(rw string, allSchemas []*model.DBInfo) ([]HotTableIndex, error) { - regionMetrics, err := h.FetchHotRegion(rw) +func (h *Helper) ScrapeHotInfo(ctx context.Context, rw string, allSchemas []*model.DBInfo) ([]HotTableIndex, error) { + regionMetrics, err := h.FetchHotRegion(ctx, rw) if err != nil { return nil, err } @@ -314,19 +305,28 @@ func (h *Helper) ScrapeHotInfo(rw string, allSchemas []*model.DBInfo) ([]HotTabl } // FetchHotRegion fetches the hot region information from PD's http api. 
-func (h *Helper) FetchHotRegion(rw string) (map[uint64]RegionMetric, error) { - var regionResp StoreHotRegionInfos - if err := h.requestPD("FetchHotRegion", "GET", rw, nil, ®ionResp); err != nil { +func (h *Helper) FetchHotRegion(ctx context.Context, rw string) (map[uint64]RegionMetric, error) { + var ( + regionResp *pd.StoreHotPeersInfos + err error + ) + switch rw { + case HotRead: + regionResp, err = h.PDHTTPClient().GetHotReadRegions(ctx) + case HotWrite: + regionResp, err = h.PDHTTPClient().GetHotWriteRegions(ctx) + } + if err != nil { return nil, err } metricCnt := 0 for _, hotRegions := range regionResp.AsLeader { - metricCnt += len(hotRegions.RegionsStat) + metricCnt += len(hotRegions.Stats) } metric := make(map[uint64]RegionMetric, metricCnt) for _, hotRegions := range regionResp.AsLeader { - for _, region := range hotRegions.RegionsStat { - metric[region.RegionID] = RegionMetric{FlowBytes: uint64(region.FlowBytes), MaxHotDegree: region.HotDegree} + for _, region := range hotRegions.Stats { + metric[region.RegionID] = RegionMetric{FlowBytes: uint64(region.ByteRate), MaxHotDegree: region.HotDegree} } } return metric, nil @@ -593,81 +593,6 @@ func (r *RegionFrameRange) GetIndexFrame(tableID, indexID int64, dbName, tableNa return nil } -// RegionPeer stores information of one peer. -type RegionPeer struct { - ID int64 `json:"id"` - StoreID int64 `json:"store_id"` - IsLearner bool `json:"is_learner"` -} - -// RegionEpoch stores the information about its epoch. -type RegionEpoch struct { - ConfVer int64 `json:"conf_ver"` - Version int64 `json:"version"` -} - -// RegionPeerStat stores one field `DownSec` which indicates how long it's down than `RegionPeer`. -type RegionPeerStat struct { - Peer RegionPeer `json:"peer"` - DownSec int64 `json:"down_seconds"` -} - -// RegionInfo stores the information of one region. -type RegionInfo struct { - ID int64 `json:"id"` - StartKey string `json:"start_key"` - EndKey string `json:"end_key"` - Epoch RegionEpoch `json:"epoch"` - Peers []RegionPeer `json:"peers"` - Leader RegionPeer `json:"leader"` - DownPeers []RegionPeerStat `json:"down_peers"` - PendingPeers []RegionPeer `json:"pending_peers"` - WrittenBytes uint64 `json:"written_bytes"` - ReadBytes uint64 `json:"read_bytes"` - ApproximateSize int64 `json:"approximate_size"` - ApproximateKeys int64 `json:"approximate_keys"` - - ReplicationStatus *ReplicationStatus `json:"replication_status,omitempty"` -} - -// RegionsInfo stores the information of regions. -type RegionsInfo struct { - Count int64 `json:"count"` - Regions []RegionInfo `json:"regions"` -} - -// NewRegionsInfo returns RegionsInfo -func NewRegionsInfo() *RegionsInfo { - return &RegionsInfo{ - Regions: make([]RegionInfo, 0), - } -} - -// Merge merged 2 regionsInfo into one -func (r *RegionsInfo) Merge(other *RegionsInfo) *RegionsInfo { - newRegionsInfo := &RegionsInfo{ - Regions: make([]RegionInfo, 0, r.Count+other.Count), - } - m := make(map[int64]RegionInfo, r.Count+other.Count) - for _, region := range r.Regions { - m[region.ID] = region - } - for _, region := range other.Regions { - m[region.ID] = region - } - for _, region := range m { - newRegionsInfo.Regions = append(newRegionsInfo.Regions, region) - } - newRegionsInfo.Count = int64(len(newRegionsInfo.Regions)) - return newRegionsInfo -} - -// ReplicationStatus represents the replication mode status of the region. 
-type ReplicationStatus struct { - State string `json:"state"` - StateID int64 `json:"state_id"` -} - // TableInfo stores the information of a table or an index type TableInfo struct { DB *model.DBInfo @@ -679,13 +604,13 @@ type TableInfo struct { } type withKeyRange interface { - getStartKey() string - getEndKey() string + GetStartKey() string + GetEndKey() string } // isIntersecting returns true if x and y intersect. func isIntersecting(x, y withKeyRange) bool { - return isIntersectingKeyRange(x, y.getStartKey(), y.getEndKey()) + return isIntersectingKeyRange(x, y.GetStartKey(), y.GetEndKey()) } // isIntersectingKeyRange returns true if [startKey, endKey) intersect with x. @@ -695,22 +620,19 @@ func isIntersectingKeyRange(x withKeyRange, startKey, endKey string) bool { // isBehind returns true is x is behind y func isBehind(x, y withKeyRange) bool { - return isBehindKeyRange(x, y.getStartKey(), y.getEndKey()) + return isBehindKeyRange(x, y.GetStartKey(), y.GetEndKey()) } // IsBefore returns true is x is before [startKey, endKey) func isBeforeKeyRange(x withKeyRange, startKey, _ string) bool { - return x.getEndKey() != "" && x.getEndKey() <= startKey + return x.GetEndKey() != "" && x.GetEndKey() <= startKey } // IsBehind returns true is x is behind [startKey, endKey) func isBehindKeyRange(x withKeyRange, _, endKey string) bool { - return endKey != "" && x.getStartKey() >= endKey + return endKey != "" && x.GetStartKey() >= endKey } -func (r *RegionInfo) getStartKey() string { return r.StartKey } -func (r *RegionInfo) getEndKey() string { return r.EndKey } - // TableInfoWithKeyRange stores table or index informations with its key range. type TableInfoWithKeyRange struct { *TableInfo @@ -718,8 +640,11 @@ type TableInfoWithKeyRange struct { EndKey string } -func (t TableInfoWithKeyRange) getStartKey() string { return t.StartKey } -func (t TableInfoWithKeyRange) getEndKey() string { return t.EndKey } +// GetStartKey implements `withKeyRange` interface. +func (t TableInfoWithKeyRange) GetStartKey() string { return t.StartKey } + +// GetEndKey implements `withKeyRange` interface. +func (t TableInfoWithKeyRange) GetEndKey() string { return t.EndKey } // NewTableWithKeyRange constructs TableInfoWithKeyRange for given table, it is exported only for test. func NewTableWithKeyRange(db *model.DBInfo, table *model.TableInfo) TableInfoWithKeyRange { @@ -745,10 +670,10 @@ func (*Helper) FilterMemDBs(oldSchemas []*model.DBInfo) (schemas []*model.DBInfo // GetRegionsTableInfo returns a map maps region id to its tables or indices. // Assuming tables or indices key ranges never intersect. // Regions key ranges can intersect. -func (h *Helper) GetRegionsTableInfo(regionsInfo *RegionsInfo, schemas []*model.DBInfo) map[int64][]TableInfo { +func (h *Helper) GetRegionsTableInfo(regionsInfo *pd.RegionsInfo, schemas []*model.DBInfo) map[int64][]TableInfo { tables := h.GetTablesInfoWithKeyRange(schemas) - regions := make([]*RegionInfo, 0, len(regionsInfo.Regions)) + regions := make([]*pd.RegionInfo, 0, len(regionsInfo.Regions)) for i := 0; i < len(regionsInfo.Regions); i++ { regions = append(regions, ®ionsInfo.Regions[i]) } @@ -808,21 +733,21 @@ func (*Helper) GetTablesInfoWithKeyRange(schemas []*model.DBInfo) []TableInfoWit } } slices.SortFunc(tables, func(i, j TableInfoWithKeyRange) int { - return cmp.Compare(i.getStartKey(), j.getStartKey()) + return cmp.Compare(i.StartKey, j.StartKey) }) return tables } // ParseRegionsTableInfos parses the tables or indices in regions according to key range. 
-func (*Helper) ParseRegionsTableInfos(regionsInfo []*RegionInfo, tables []TableInfoWithKeyRange) map[int64][]TableInfo { +func (*Helper) ParseRegionsTableInfos(regionsInfo []*pd.RegionInfo, tables []TableInfoWithKeyRange) map[int64][]TableInfo { tableInfos := make(map[int64][]TableInfo, len(regionsInfo)) if len(tables) == 0 || len(regionsInfo) == 0 { return tableInfos } // tables is sorted in GetTablesInfoWithKeyRange func - slices.SortFunc(regionsInfo, func(i, j *RegionInfo) int { - return cmp.Compare(i.getStartKey(), j.getStartKey()) + slices.SortFunc(regionsInfo, func(i, j *pd.RegionInfo) int { + return cmp.Compare(i.StartKey, j.StartKey) }) idx := 0 @@ -848,171 +773,6 @@ func bytesKeyToHex(key []byte) string { return strings.ToUpper(hex.EncodeToString(key)) } -// GetRegionsInfo gets the region information of current store by using PD's api. -func (h *Helper) GetRegionsInfo() (*RegionsInfo, error) { - var regionsInfo RegionsInfo - err := h.requestPD("GetRegions", "GET", pdapi.Regions, nil, ®ionsInfo) - return ®ionsInfo, err -} - -// GetStoreRegionsInfo gets the region in given store. -func (h *Helper) GetStoreRegionsInfo(storeID uint64) (*RegionsInfo, error) { - var regionsInfo RegionsInfo - err := h.requestPD("GetStoreRegions", "GET", pdapi.StoreRegions+"/"+strconv.FormatUint(storeID, 10), nil, ®ionsInfo) - return ®ionsInfo, err -} - -// GetRegionInfoByID gets the region information of the region ID by using PD's api. -func (h *Helper) GetRegionInfoByID(regionID uint64) (*RegionInfo, error) { - var regionInfo RegionInfo - err := h.requestPD("GetRegionByID", "GET", pdapi.RegionByID+"/"+strconv.FormatUint(regionID, 10), nil, ®ionInfo) - return ®ionInfo, err -} - -// GetRegionsInfoByRange scans region by key range -func (h *Helper) GetRegionsInfoByRange(sk, ek []byte) (*RegionsInfo, error) { - var regionsInfo RegionsInfo - err := h.requestPD("GetRegionByRange", "GET", fmt.Sprintf("%v?key=%s&end_key=%s&limit=-1", pdapi.ScanRegions, - url.QueryEscape(string(sk)), url.QueryEscape(string(ek))), nil, ®ionsInfo) - return ®ionsInfo, err -} - -// GetRegionByKey gets regioninfo by key -func (h *Helper) GetRegionByKey(k []byte) (*RegionInfo, error) { - var regionInfo RegionInfo - err := h.requestPD("GetRegionByKey", "GET", fmt.Sprintf("%v/%v", pdapi.RegionByKey, url.QueryEscape(string(k))), nil, ®ionInfo) - return ®ionInfo, err -} - -// request PD API, decode the response body into res -func (h *Helper) requestPD(apiName, method, uri string, body io.Reader, res interface{}) error { - etcd, ok := h.Store.(kv.EtcdBackend) - if !ok { - return errors.WithStack(errors.New("not implemented")) - } - pdHosts, err := etcd.EtcdAddrs() - if err != nil { - return err - } - if len(pdHosts) == 0 { - return errors.New("pd unavailable") - } - for _, host := range pdHosts { - err = requestPDForOneHost(host, apiName, method, uri, body, res) - if err == nil { - break - } - // Try to request from another PD node when some nodes may down. 
- } - return err -} - -func requestPDForOneHost(host, apiName, method, uri string, body io.Reader, res interface{}) error { - urlVar := fmt.Sprintf("%s://%s%s", util.InternalHTTPSchema(), host, uri) - logutil.BgLogger().Debug("RequestPD URL", zap.String("url", urlVar)) - req, err := http.NewRequest(method, urlVar, body) - if err != nil { - logutil.BgLogger().Warn("requestPDForOneHost new request failed", - zap.String("url", urlVar), zap.Error(err)) - return errors.Trace(err) - } - start := time.Now() - resp, err := util.InternalHTTPClient().Do(req) - if err != nil { - metrics.PDAPIRequestCounter.WithLabelValues(apiName, "network error").Inc() - logutil.BgLogger().Warn("requestPDForOneHost do request failed", - zap.String("url", urlVar), zap.Error(err)) - return errors.Trace(err) - } - metrics.PDAPIExecutionHistogram.WithLabelValues(apiName).Observe(time.Since(start).Seconds()) - metrics.PDAPIRequestCounter.WithLabelValues(apiName, resp.Status).Inc() - defer func() { - err = resp.Body.Close() - if err != nil { - logutil.BgLogger().Warn("requestPDForOneHost close body failed", - zap.String("url", urlVar), zap.Error(err)) - } - }() - - if resp.StatusCode != http.StatusOK { - logFields := []zap.Field{ - zap.String("url", urlVar), - zap.String("status", resp.Status), - } - - bs, readErr := io.ReadAll(resp.Body) - if readErr != nil { - logFields = append(logFields, zap.NamedError("readBodyError", err)) - } else { - logFields = append(logFields, zap.ByteString("body", bs)) - } - - logutil.BgLogger().Warn("requestPDForOneHost failed with non 200 status", logFields...) - return errors.Errorf("PD request failed with status: '%s'", resp.Status) - } - - err = json.NewDecoder(resp.Body).Decode(res) - if err != nil { - return errors.Trace(err) - } - return nil -} - -// StoresStat stores all information get from PD's api. -type StoresStat struct { - Count int `json:"count"` - Stores []StoreStat `json:"stores"` -} - -// StoreStat stores information of one store. -type StoreStat struct { - Store StoreBaseStat `json:"store"` - Status StoreDetailStat `json:"status"` -} - -// StoreBaseStat stores the basic information of one store. -type StoreBaseStat struct { - ID int64 `json:"id"` - Address string `json:"address"` - State int64 `json:"state"` - StateName string `json:"state_name"` - Version string `json:"version"` - Labels []StoreLabel `json:"labels"` - StatusAddress string `json:"status_address"` - GitHash string `json:"git_hash"` - StartTimestamp int64 `json:"start_timestamp"` -} - -// StoreLabel stores the information of one store label. -type StoreLabel struct { - Key string `json:"key"` - Value string `json:"value"` -} - -// StoreDetailStat stores the detail information of one store. -type StoreDetailStat struct { - Capacity string `json:"capacity"` - Available string `json:"available"` - LeaderCount int64 `json:"leader_count"` - LeaderWeight float64 `json:"leader_weight"` - LeaderScore float64 `json:"leader_score"` - LeaderSize int64 `json:"leader_size"` - RegionCount int64 `json:"region_count"` - RegionWeight float64 `json:"region_weight"` - RegionScore float64 `json:"region_score"` - RegionSize int64 `json:"region_size"` - StartTs time.Time `json:"start_ts"` - LastHeartbeatTs time.Time `json:"last_heartbeat_ts"` - Uptime string `json:"uptime"` -} - -// GetStoresStat gets the TiKV store information by accessing PD's api. 
-func (h *Helper) GetStoresStat() (*StoresStat, error) { - var storesStat StoresStat - err := h.requestPD("GetStoresStat", "GET", pdapi.Stores, nil, &storesStat) - return &storesStat, err -} - // GetPDAddr return the PD Address. func (h *Helper) GetPDAddr() ([]string, error) { etcd, ok := h.Store.(kv.EtcdBackend) diff --git a/pkg/store/helper/helper_test.go b/pkg/store/helper/helper_test.go index 2be42a85198e0..d4ee767f6ecd7 100644 --- a/pkg/store/helper/helper_test.go +++ b/pkg/store/helper/helper_test.go @@ -16,6 +16,7 @@ package helper_test import ( "bufio" + "context" "crypto/tls" "encoding/json" "fmt" @@ -34,6 +35,8 @@ import ( "github.com/pingcap/tidb/pkg/util/pdapi" "github.com/stretchr/testify/require" "github.com/tikv/client-go/v2/testutils" + "github.com/tikv/client-go/v2/tikv" + pd "github.com/tikv/pd/client/http" "go.opencensus.io/stats/view" "go.uber.org/zap" ) @@ -45,7 +48,7 @@ func TestHotRegion(t *testing.T) { Store: store, RegionCache: store.GetRegionCache(), } - regionMetric, err := h.FetchHotRegion(pdapi.HotRead) + regionMetric, err := h.FetchHotRegion(context.Background(), "read") require.NoError(t, err) expected := map[uint64]helper.RegionMetric{ @@ -89,7 +92,7 @@ func TestTiKVRegionsInfo(t *testing.T) { Store: store, RegionCache: store.GetRegionCache(), } - regionsInfo, err := h.GetRegionsInfo() + regionsInfo, err := h.PDHTTPClient().GetRegions(context.Background()) require.NoError(t, err) require.Equal(t, getMockTiKVRegionsInfo(), regionsInfo) } @@ -102,7 +105,7 @@ func TestTiKVStoresStat(t *testing.T) { RegionCache: store.GetRegionCache(), } - stat, err := h.GetStoresStat() + stat, err := h.PDHTTPClient().GetStores(context.Background()) require.NoError(t, err) data, err := json.Marshal(stat) @@ -138,18 +141,20 @@ func (s *mockStore) Describe() string { } func createMockStore(t *testing.T) (store helper.Storage) { + server := mockPDHTTPServer() + + pdAddrs := []string{"invalid_pd_address", server.URL[len("http://"):]} s, err := mockstore.NewMockStore( mockstore.WithClusterInspector(func(c testutils.Cluster) { mockstore.BootstrapWithMultiRegions(c, []byte("x")) }), + mockstore.WithTiKVOptions(tikv.WithPDHTTPClient(pdAddrs)), ) require.NoError(t, err) - server := mockPDHTTPServer() - store = &mockStore{ s.(helper.Storage), - []string{"invalid_pd_address", server.URL[len("http://"):]}, + pdAddrs, } t.Cleanup(func() { @@ -174,22 +179,22 @@ func mockPDHTTPServer() *httptest.Server { func mockHotRegionResponse(w http.ResponseWriter, _ *http.Request) { w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) - regionsStat := helper.HotRegionsStat{ - RegionsStat: []helper.RegionStat{ + regionsStat := pd.HotPeersStat{ + Stats: []pd.HotPeerStatShow{ { - FlowBytes: 100, + ByteRate: 100, RegionID: 2, HotDegree: 1, }, { - FlowBytes: 200, + ByteRate: 200, RegionID: 4, HotDegree: 2, }, }, } - resp := helper.StoreHotRegionInfos{ - AsLeader: make(map[uint64]*helper.HotRegionsStat), + resp := pd.StoreHotPeersInfos{ + AsLeader: make(pd.StoreHotPeersStat), } resp.AsLeader[0] = ®ionsStat data, err := json.MarshalIndent(resp, "", " ") @@ -258,33 +263,33 @@ func getRegionsTableInfoAns(dbs []*model.DBInfo) map[int64][]helper.TableInfo { return ans } -func getMockTiKVRegionsInfo() *helper.RegionsInfo { - regions := []helper.RegionInfo{ +func getMockTiKVRegionsInfo() *pd.RegionsInfo { + regions := []pd.RegionInfo{ { ID: 1, StartKey: "", EndKey: "12341234", - Epoch: helper.RegionEpoch{ + Epoch: pd.RegionEpoch{ ConfVer: 1, Version: 1, }, - Peers: []helper.RegionPeer{ + 
Peers: []pd.RegionPeer{ {ID: 2, StoreID: 1}, {ID: 15, StoreID: 51}, {ID: 66, StoreID: 99, IsLearner: true}, {ID: 123, StoreID: 111, IsLearner: true}, }, - Leader: helper.RegionPeer{ + Leader: pd.RegionPeer{ ID: 2, StoreID: 1, }, - DownPeers: []helper.RegionPeerStat{ + DownPeers: []pd.RegionPeerStat{ { - helper.RegionPeer{ID: 66, StoreID: 99, IsLearner: true}, - 120, + Peer: pd.RegionPeer{ID: 66, StoreID: 99, IsLearner: true}, + DownSec: 120, }, }, - PendingPeers: []helper.RegionPeer{ + PendingPeers: []pd.RegionPeer{ {ID: 15, StoreID: 51}, }, WrittenBytes: 100, @@ -297,66 +302,66 @@ func getMockTiKVRegionsInfo() *helper.RegionsInfo { ID: 2, StartKey: "7480000000000000FF295F698000000000FF0000010000000000FA", EndKey: "7480000000000000FF2B5F698000000000FF0000010000000000FA", - Epoch: helper.RegionEpoch{ConfVer: 1, Version: 1}, - Peers: []helper.RegionPeer{{ID: 3, StoreID: 1}}, - Leader: helper.RegionPeer{ID: 3, StoreID: 1}, + Epoch: pd.RegionEpoch{ConfVer: 1, Version: 1}, + Peers: []pd.RegionPeer{{ID: 3, StoreID: 1}}, + Leader: pd.RegionPeer{ID: 3, StoreID: 1}, }, // table: 63, record + index: 1, 2 { ID: 3, StartKey: "7480000000000000FF3F5F698000000000FF0000010000000000FA", EndKey: "7480000000000000FF425F698000000000FF0000010000000000FA", - Epoch: helper.RegionEpoch{ConfVer: 1, Version: 1}, - Peers: []helper.RegionPeer{{ID: 4, StoreID: 1}}, - Leader: helper.RegionPeer{ID: 4, StoreID: 1}, + Epoch: pd.RegionEpoch{ConfVer: 1, Version: 1}, + Peers: []pd.RegionPeer{{ID: 4, StoreID: 1}}, + Leader: pd.RegionPeer{ID: 4, StoreID: 1}, }, // table: 66, record { ID: 4, StartKey: "7480000000000000FF425F72C000000000FF0000000000000000FA", EndKey: "", - Epoch: helper.RegionEpoch{ConfVer: 1, Version: 1}, - Peers: []helper.RegionPeer{{ID: 5, StoreID: 1}}, - Leader: helper.RegionPeer{ID: 5, StoreID: 1}, + Epoch: pd.RegionEpoch{ConfVer: 1, Version: 1}, + Peers: []pd.RegionPeer{{ID: 5, StoreID: 1}}, + Leader: pd.RegionPeer{ID: 5, StoreID: 1}, }, // table: 66, record + index: 3 { ID: 5, StartKey: "7480000000000000FF425F698000000000FF0000030000000000FA", EndKey: "7480000000000000FF425F72C000000000FF0000000000000000FA", - Epoch: helper.RegionEpoch{ConfVer: 1, Version: 1}, - Peers: []helper.RegionPeer{{ID: 6, StoreID: 1}}, - Leader: helper.RegionPeer{ID: 6, StoreID: 1}, + Epoch: pd.RegionEpoch{ConfVer: 1, Version: 1}, + Peers: []pd.RegionPeer{{ID: 6, StoreID: 1}}, + Leader: pd.RegionPeer{ID: 6, StoreID: 1}, }, // table: 66, index: 1 { ID: 6, StartKey: "7480000000000000FF425F698000000000FF0000010000000000FA", EndKey: "7480000000000000FF425F698000000000FF0000020000000000FA", - Epoch: helper.RegionEpoch{ConfVer: 1, Version: 1}, - Peers: []helper.RegionPeer{{ID: 7, StoreID: 1}}, - Leader: helper.RegionPeer{ID: 7, StoreID: 1}, + Epoch: pd.RegionEpoch{ConfVer: 1, Version: 1}, + Peers: []pd.RegionPeer{{ID: 7, StoreID: 1}}, + Leader: pd.RegionPeer{ID: 7, StoreID: 1}, }, // table: 66, index: 2 { ID: 7, StartKey: "7480000000000000FF425F698000000000FF0000020000000000FA", EndKey: "7480000000000000FF425F698000000000FF0000030000000000FA", - Epoch: helper.RegionEpoch{ConfVer: 1, Version: 1}, - Peers: []helper.RegionPeer{{ID: 8, StoreID: 1}}, - Leader: helper.RegionPeer{ID: 8, StoreID: 1}, + Epoch: pd.RegionEpoch{ConfVer: 1, Version: 1}, + Peers: []pd.RegionPeer{{ID: 8, StoreID: 1}}, + Leader: pd.RegionPeer{ID: 8, StoreID: 1}, }, // merge region 7, 5 { ID: 8, StartKey: "7480000000000000FF425F698000000000FF0000020000000000FA", EndKey: "7480000000000000FF425F72C000000000FF0000000000000000FA", - Epoch: helper.RegionEpoch{ConfVer: 1, 
Version: 1}, - Peers: []helper.RegionPeer{{ID: 9, StoreID: 1}}, - Leader: helper.RegionPeer{ID: 9, StoreID: 1}, + Epoch: pd.RegionEpoch{ConfVer: 1, Version: 1}, + Peers: []pd.RegionPeer{{ID: 9, StoreID: 1}}, + Leader: pd.RegionPeer{ID: 9, StoreID: 1}, }, } - return &helper.RegionsInfo{ + return &pd.RegionsInfo{ Count: int64(len(regions)), Regions: regions, } @@ -387,24 +392,24 @@ func mockStoreStatResponse(w http.ResponseWriter, _ *http.Request) { if err != nil { log.Panic("mock tikv store api response failed", zap.Error(err)) } - storesStat := helper.StoresStat{ + storesStat := pd.StoresInfo{ Count: 1, - Stores: []helper.StoreStat{ + Stores: []pd.StoreInfo{ { - Store: helper.StoreBaseStat{ + Store: pd.MetaStore{ ID: 1, Address: "127.0.0.1:20160", State: 0, StateName: "Up", Version: "3.0.0-beta", - Labels: []helper.StoreLabel{ + Labels: []pd.StoreLabel{ { Key: "test", Value: "test", }, }, }, - Status: helper.StoreDetailStat{ + Status: pd.StoreStatus{ Capacity: "60 GiB", Available: "100 GiB", LeaderCount: 10, @@ -415,8 +420,8 @@ func mockStoreStatResponse(w http.ResponseWriter, _ *http.Request) { RegionWeight: 999999.999999, RegionScore: 999999.999999, RegionSize: 1000, - StartTs: startTs, - LastHeartbeatTs: lastHeartbeatTs, + StartTS: startTs, + LastHeartbeatTS: lastHeartbeatTs, Uptime: "1h30m", }, }, diff --git a/pkg/store/mockstore/mockstore.go b/pkg/store/mockstore/mockstore.go index 4d2bed7ce0bc7..d8945e5401056 100644 --- a/pkg/store/mockstore/mockstore.go +++ b/pkg/store/mockstore/mockstore.go @@ -92,6 +92,7 @@ type mockOptions struct { txnLocalLatches uint storeType StoreType ddlCheckerHijack bool + tikvOptions []tikv.Option } // MockTiKVStoreOption is used to control some behavior of mock tikv. @@ -106,6 +107,13 @@ func WithMultipleOptions(opts ...MockTiKVStoreOption) MockTiKVStoreOption { } } +// WithTiKVOptions sets KV options. +func WithTiKVOptions(opts ...tikv.Option) MockTiKVStoreOption { + return func(args *mockOptions) { + args.tikvOptions = opts + } +} + // WithClientHijacker hijacks KV client's behavior, makes it easy to simulate the network // problem between TiDB and TiKV. func WithClientHijacker(hijacker func(tikv.Client) tikv.Client) MockTiKVStoreOption { diff --git a/pkg/store/mockstore/tikv.go b/pkg/store/mockstore/tikv.go index d2cfedfe609ba..eb56eae38f0a4 100644 --- a/pkg/store/mockstore/tikv.go +++ b/pkg/store/mockstore/tikv.go @@ -32,7 +32,10 @@ func newMockTikvStore(opt *mockOptions) (kv.Storage, error) { } opt.clusterInspector(cluster) - kvstore, err := tikv.NewTestTiKVStore(newClientRedirector(client), pdClient, opt.clientHijacker, opt.pdClientHijacker, opt.txnLocalLatches) + kvstore, err := tikv.NewTestTiKVStore( + newClientRedirector(client), pdClient, + opt.clientHijacker, opt.pdClientHijacker, + opt.txnLocalLatches, opt.tikvOptions...) if err != nil { return nil, err } diff --git a/pkg/store/mockstore/unistore.go b/pkg/store/mockstore/unistore.go index d234f7cbe8893..2526eb70677dd 100644 --- a/pkg/store/mockstore/unistore.go +++ b/pkg/store/mockstore/unistore.go @@ -33,7 +33,10 @@ func newUnistore(opts *mockOptions) (kv.Storage, error) { Client: pdClient, } - kvstore, err := tikv.NewTestTiKVStore(newClientRedirector(client), pdClient, opts.clientHijacker, opts.pdClientHijacker, opts.txnLocalLatches) + kvstore, err := tikv.NewTestTiKVStore( + newClientRedirector(client), pdClient, + opts.clientHijacker, opts.pdClientHijacker, + opts.txnLocalLatches, opts.tikvOptions...) 
if err != nil { return nil, err } diff --git a/pkg/util/pdapi/const.go b/pkg/util/pdapi/const.go index 243c297942a35..be4d201caada3 100644 --- a/pkg/util/pdapi/const.go +++ b/pkg/util/pdapi/const.go @@ -22,15 +22,12 @@ import ( // The following constants are the APIs of PD server. const ( HotRead = "/pd/api/v1/hotspot/regions/read" - HotWrite = "/pd/api/v1/hotspot/regions/write" HotHistory = "/pd/api/v1/hotspot/regions/history" Regions = "/pd/api/v1/regions" StoreRegions = "/pd/api/v1/regions/store" - ScanRegions = "/pd/api/v1/regions/key" EmptyRegions = "/pd/api/v1/regions/check/empty-region" AccelerateSchedule = "/pd/api/v1/regions/accelerate-schedule" RegionByID = "/pd/api/v1/region/id" - RegionByKey = "/pd/api/v1/region/key" store = "/pd/api/v1/store" Stores = "/pd/api/v1/stores" Status = "/pd/api/v1/status"
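Note (not part of the patch): the net effect of the helper.go changes above is that callers no longer build PD URLs from pkg/util/pdapi constants and decode raw JSON; they go through the typed PD HTTP client exposed by the Storage interface. The sketch below is illustrative only — the package name, function name, and error handling are made up for the example — but the calls it uses (helper.NewHelper, Helper.PDHTTPClient, GetHotReadRegions, and the StoreHotPeersInfos/HotPeerStatShow fields) are the ones introduced or relied on by this diff.

package example

import (
	"context"
	"fmt"

	"github.com/pingcap/tidb/pkg/store/helper"
)

// printHotReadRegions shows the new call path after this patch:
// Helper -> GetPDHTTPClient() -> typed PD HTTP API, replacing the removed
// requestPD("FetchHotRegion", "GET", ...) round trip and hand-rolled structs.
func printHotReadRegions(ctx context.Context, store helper.Storage) error {
	h := helper.NewHelper(store)
	// Typed response: *pdhttp.StoreHotPeersInfos instead of a locally defined
	// StoreHotRegionInfos decoded from raw JSON.
	infos, err := h.PDHTTPClient().GetHotReadRegions(ctx)
	if err != nil {
		return err
	}
	for storeID, peers := range infos.AsLeader {
		for _, stat := range peers.Stats {
			// FlowBytes/RegionsStat from the old structs map to ByteRate/Stats here.
			fmt.Printf("store %d region %d byte-rate %.2f degree %d\n",
				storeID, stat.RegionID, stat.ByteRate, stat.HotDegree)
		}
	}
	return nil
}

Metrics are still recorded per PD API call, but they are now injected once where the store is opened (see the tikv.WithPDHTTPClient(..., pdhttp.WithMetrics(...)) change in pkg/store/driver/tikv_driver.go) rather than incremented inside each hand-written request helper.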