-
Notifications
You must be signed in to change notification settings - Fork 728
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Signed-off-by: JmPotato <[email protected]>
- Loading branch information
Showing
5 changed files
with
269 additions
and
1 deletion.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,21 @@ | ||
// Copyright 2023 TiKV Project Authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package http | ||
|
||
// The following constants are the paths of PD HTTP APIs. | ||
const ( | ||
HotRead = "/pd/api/v1/hotspot/regions/read" | ||
HotWrite = "/pd/api/v1/hotspot/regions/write" | ||
) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,17 @@ | ||
// Copyright 2023 TiKV Project Authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package http | ||
|
||
// TODO: support the customized backoff strategy. |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,174 @@ | ||
// Copyright 2023 TiKV Project Authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package http | ||
|
||
import ( | ||
"context" | ||
"crypto/tls" | ||
"encoding/json" | ||
"fmt" | ||
"io" | ||
"net/http" | ||
"strings" | ||
"time" | ||
|
||
"github.com/pingcap/errors" | ||
"github.com/pingcap/log" | ||
"go.uber.org/zap" | ||
) | ||
|
||
const defaultTimeout = 30 * time.Second | ||
|
||
// HTTPClient is a PD (Placement Driver) HTTP client. | ||
type HTTPClient interface { | ||
GetHotReadRegions(context.Context) (*StoreHotPeersInfos, error) | ||
GetHotWriteRegions(context.Context) (*StoreHotPeersInfos, error) | ||
} | ||
|
||
var _ HTTPClient = (*httpClient)(nil) | ||
|
||
type httpClient struct { | ||
pdAddrs []string | ||
tlsConf *tls.Config | ||
cli *http.Client | ||
} | ||
|
||
// HTTPClientOption configures the HTTP client. | ||
type HTTPClientOption func(hc *httpClient) | ||
|
||
// WithHTTPClient configures the client with the given initialized HTTP client. | ||
func WithHTTPClient(cli *http.Client) HTTPClientOption { | ||
return func(hc *httpClient) { | ||
hc.cli = cli | ||
} | ||
} | ||
|
||
// WithTLSConfig configures the client with the given TLS config. | ||
func WithTLSConfig(tlsConf *tls.Config) HTTPClientOption { | ||
return func(hc *httpClient) { | ||
hc.tlsConf = tlsConf | ||
} | ||
} | ||
|
||
// NewHTTPClient creates a PD HTTP client with the given PD addresses and TLS config. | ||
func NewHTTPClient( | ||
pdAddrs []string, | ||
opts ...HTTPClientOption, | ||
) HTTPClient { | ||
hc := &httpClient{} | ||
// Apply the options first. | ||
for _, opt := range opts { | ||
opt(hc) | ||
} | ||
// Normalize the addresses with correct scheme prefix. | ||
for i, addr := range pdAddrs { | ||
if !strings.HasPrefix(addr, "http") { | ||
if hc.tlsConf != nil { | ||
addr = "https://" + addr | ||
} else { | ||
addr = "http://" + addr | ||
} | ||
pdAddrs[i] = addr | ||
} | ||
} | ||
hc.pdAddrs = pdAddrs | ||
// Init the HTTP client. | ||
if hc.cli != nil { | ||
cli := &http.Client{Timeout: defaultTimeout} | ||
if hc.tlsConf != nil { | ||
transport := http.DefaultTransport.(*http.Transport).Clone() | ||
transport.TLSClientConfig = hc.tlsConf | ||
cli.Transport = transport | ||
} | ||
} | ||
|
||
return hc | ||
} | ||
|
||
func (hc *httpClient) pdAddr() string { | ||
// TODO: support the customized PD address selection strategy. | ||
return hc.pdAddrs[0] | ||
} | ||
|
||
func (hc *httpClient) request( | ||
ctx context.Context, | ||
name, method, uri string, | ||
res interface{}, | ||
) error { | ||
reqURL := fmt.Sprintf("%s%s", hc.pdAddr(), uri) | ||
logFields := []zap.Field{ | ||
zap.String("name", name), | ||
zap.String("url", reqURL), | ||
zap.String("method", method), | ||
} | ||
log.Debug("[pd] request the http url", logFields...) | ||
req, err := http.NewRequestWithContext(ctx, method, reqURL, nil) | ||
if err != nil { | ||
log.Error("[pd] create http request failed", append(logFields, zap.Error(err))...) | ||
return errors.Trace(err) | ||
} | ||
// TODO: integrate the metrics. | ||
resp, err := hc.cli.Do(req) | ||
if err != nil { | ||
log.Error("[pd] do http request failed", append(logFields, zap.Error(err))...) | ||
return errors.Trace(err) | ||
} | ||
defer func() { | ||
err = resp.Body.Close() | ||
if err != nil { | ||
log.Warn("[pd] close http response body failed", append(logFields, zap.Error(err))...) | ||
} | ||
}() | ||
|
||
if resp.StatusCode != http.StatusOK { | ||
logFields = append(logFields, zap.String("status", resp.Status)) | ||
|
||
bs, readErr := io.ReadAll(resp.Body) | ||
if readErr != nil { | ||
logFields = append(logFields, zap.NamedError("read-body-error", err)) | ||
} else { | ||
logFields = append(logFields, zap.ByteString("body", bs)) | ||
} | ||
|
||
log.Error("[pd] request failed with a non-200 status", logFields...) | ||
return errors.Errorf("request pd http api failed with status: '%s'", resp.Status) | ||
} | ||
|
||
err = json.NewDecoder(resp.Body).Decode(res) | ||
if err != nil { | ||
return errors.Trace(err) | ||
} | ||
return nil | ||
} | ||
|
||
// GetHotReadRegions gets the hot read region statistics info. | ||
func (hc *httpClient) GetHotReadRegions(ctx context.Context) (*StoreHotPeersInfos, error) { | ||
var hotReadRegions StoreHotPeersInfos | ||
err := hc.request(ctx, "GetHotReadRegions", http.MethodGet, HotRead, &hotReadRegions) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return &hotReadRegions, nil | ||
} | ||
|
||
// GetHotWriteRegions gets the hot write region statistics info. | ||
func (hc *httpClient) GetHotWriteRegions(ctx context.Context) (*StoreHotPeersInfos, error) { | ||
var hotWriteRegions StoreHotPeersInfos | ||
err := hc.request(ctx, "GetHotWriteRegions", http.MethodGet, HotWrite, &hotWriteRegions) | ||
if err != nil { | ||
return nil, err | ||
} | ||
return &hotWriteRegions, nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,56 @@ | ||
// Copyright 2023 TiKV Project Authors. | ||
// | ||
// Licensed under the Apache License, Version 2.0 (the "License"); | ||
// you may not use this file except in compliance with the License. | ||
// You may obtain a copy of the License at | ||
// | ||
// http://www.apache.org/licenses/LICENSE-2.0 | ||
// | ||
// Unless required by applicable law or agreed to in writing, software | ||
// distributed under the License is distributed on an "AS IS" BASIS, | ||
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
// See the License for the specific language governing permissions and | ||
// limitations under the License. | ||
|
||
package http | ||
|
||
import "time" | ||
|
||
// NOTICE: the structures below are copied from the PD API definitions. | ||
// Please make sure the consistency if any change happens to the PD API. | ||
|
||
// StoreHotPeersInfos is used to get human-readable description for hot regions. | ||
type StoreHotPeersInfos struct { | ||
AsPeer StoreHotPeersStat `json:"as_peer"` | ||
AsLeader StoreHotPeersStat `json:"as_leader"` | ||
} | ||
|
||
// StoreHotPeersStat is used to record the hot region statistics group by store. | ||
type StoreHotPeersStat map[uint64]*HotPeersStat | ||
|
||
// HotPeersStat records all hot regions statistics | ||
type HotPeersStat struct { | ||
StoreByteRate float64 `json:"store_bytes"` | ||
StoreKeyRate float64 `json:"store_keys"` | ||
StoreQueryRate float64 `json:"store_query"` | ||
TotalBytesRate float64 `json:"total_flow_bytes"` | ||
TotalKeysRate float64 `json:"total_flow_keys"` | ||
TotalQueryRate float64 `json:"total_flow_query"` | ||
Count int `json:"regions_count"` | ||
Stats []HotPeerStatShow `json:"statistics"` | ||
} | ||
|
||
// HotPeerStatShow records the hot region statistics for output | ||
type HotPeerStatShow struct { | ||
StoreID uint64 `json:"store_id"` | ||
Stores []uint64 `json:"stores"` | ||
IsLeader bool `json:"is_leader"` | ||
IsLearner bool `json:"is_learner"` | ||
RegionID uint64 `json:"region_id"` | ||
HotDegree int `json:"hot_degree"` | ||
ByteRate float64 `json:"flow_bytes"` | ||
KeyRate float64 `json:"flow_keys"` | ||
QueryRate float64 `json:"flow_query"` | ||
AntiCount int `json:"anti_count"` | ||
LastUpdateTime time.Time `json:"last_update_time,omitempty"` | ||
} |