Skip to content

Commit

Permalink
Introduce caller ID into the HTTP client
Browse files Browse the repository at this point in the history
Signed-off-by: JmPotato <[email protected]>
  • Loading branch information
JmPotato committed Dec 4, 2023
1 parent 259435d commit 5b0fe23
Showing 1 changed file with 39 additions and 16 deletions.
55 changes: 39 additions & 16 deletions client/http/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,8 @@ type Client interface {
GetMinResolvedTSByStoresIDs(context.Context, []uint64) (uint64, map[uint64]uint64, error)

/* Client-related methods */
// WithCallerID sets and returns a new client with the given caller ID.
WithCallerID(string) Client
// WithRespHandler sets and returns a new client with the given HTTP response handler.
// This allows the caller to customize how the response is handled, including error handling logic.
// Additionally, it is important for the caller to handle the content of the response body properly
Expand All @@ -89,11 +91,20 @@ type Client interface {

var _ Client = (*client)(nil)

type client struct {
// clientInner is the inner implementation of the PD HTTP client, which will
// implement some internal logics, such as HTTP client, service discovery, etc.
type clientInner struct {
pdAddrs []string
tlsConf *tls.Config
cli *http.Client
}

type client struct {
// Wrap this struct is to make sure the inner implementation
// won't be exposed and cloud be consistent during the copy.
inner *clientInner

callerID string
respHandler func(resp *http.Response, res interface{}) error

requestCounter *prometheus.CounterVec
Expand All @@ -106,15 +117,15 @@ type ClientOption func(c *client)
// WithHTTPClient configures the client with the given initialized HTTP client.
func WithHTTPClient(cli *http.Client) ClientOption {
return func(c *client) {
c.cli = cli
c.inner.cli = cli
}
}

// WithTLSConfig configures the client with the given TLS config.
// This option won't work if the client is configured with WithHTTPClient.
func WithTLSConfig(tlsConf *tls.Config) ClientOption {
return func(c *client) {
c.tlsConf = tlsConf
c.inner.tlsConf = tlsConf
}
}

Expand All @@ -134,7 +145,7 @@ func NewClient(
pdAddrs []string,
opts ...ClientOption,
) Client {
c := &client{}
c := &client{inner: &clientInner{}}
// Apply the options first.
for _, opt := range opts {
opt(c)
Expand All @@ -143,22 +154,22 @@ func NewClient(
for i, addr := range pdAddrs {
if !strings.HasPrefix(addr, httpScheme) {
var scheme string
if c.tlsConf != nil {
if c.inner.tlsConf != nil {
scheme = httpsScheme
} else {
scheme = httpScheme
}
pdAddrs[i] = fmt.Sprintf("%s://%s", scheme, addr)
}
}
c.pdAddrs = pdAddrs
c.inner.pdAddrs = pdAddrs
// Init the HTTP client if it's not configured.
if c.cli == nil {
c.cli = &http.Client{Timeout: defaultTimeout}
if c.tlsConf != nil {
if c.inner.cli == nil {
c.inner.cli = &http.Client{Timeout: defaultTimeout}
if c.inner.tlsConf != nil {
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.TLSClientConfig = c.tlsConf
c.cli.Transport = transport
transport.TLSClientConfig = c.inner.tlsConf
c.inner.cli.Transport = transport
}
}

Expand All @@ -167,12 +178,22 @@ func NewClient(

// Close closes the HTTP client.
func (c *client) Close() {
if c.cli != nil {
c.cli.CloseIdleConnections()
if c.inner == nil {
return
}
if c.inner.cli != nil {
c.inner.cli.CloseIdleConnections()
}
log.Info("[pd] http client closed")
}

// WithCallerID sets and returns a new client with the given caller ID.
func (c *client) WithCallerID(callerID string) Client {
newClient := *c
newClient.callerID = callerID
return &newClient
}

// WithRespHandler sets and returns a new client with the given HTTP response handler.
func (c *client) WithRespHandler(
handler func(resp *http.Response, res interface{}) error,
Expand Down Expand Up @@ -218,8 +239,8 @@ func (c *client) requestWithRetry(
err error
addr string
)
for idx := 0; idx < len(c.pdAddrs); idx++ {
addr = c.pdAddrs[idx]
for idx := 0; idx < len(c.inner.pdAddrs); idx++ {
addr = c.inner.pdAddrs[idx]
err = c.request(ctx, name, fmt.Sprintf("%s%s", addr, uri), method, body, res, headerOpts...)
if err == nil {
break
Expand All @@ -239,6 +260,8 @@ func (c *client) request(
logFields := []zap.Field{
zap.String("name", name),
zap.String("url", url),
zap.String("method", method),
zap.String("caller-id", c.callerID),
}
log.Debug("[pd] request the http url", logFields...)
req, err := http.NewRequestWithContext(ctx, method, url, body)
Expand All @@ -250,7 +273,7 @@ func (c *client) request(
opt(req.Header)
}
start := time.Now()
resp, err := c.cli.Do(req)
resp, err := c.inner.cli.Do(req)
if err != nil {
c.reqCounter(name, networkErrorStatus)
log.Error("[pd] do http request failed", append(logFields, zap.Error(err))...)
Expand Down

0 comments on commit 5b0fe23

Please sign in to comment.