Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cmd/diag/command/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,6 +160,7 @@ func newCollectCmd() *cobra.Command {
cmd.Flags().IntVar(&cOpt.MetricsLimit, "metricslimit", 10000, "metric size limit of single request, specified in series*hour per request")
cmd.Flags().StringVar(&metricsConf, "metricsconfig", "", "config file of metricsfilter")
cmd.Flags().StringSliceVar(&labels, "metricslabel", nil, "only collect metrics that match labels")
cmd.Flags().IntVar(&cOpt.MetricsMinInterval, "metrics-min-interval", 120, "the minimum interval of a single request in seconds")
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is 120s the new default value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, if you think it's best to keep it the same as before, I can change it to 5 minutes.

cmd.Flags().StringVar(&promEndpoint, "overwrite-prometheus-endpoint", "", "Prometheus endpoint")
cmd.Flags().StringSliceVarP(&cOpt.Header, "prometheus-header", "H", nil, "custom headers of http request when collect metrics")
cmd.Flags().StringVarP(&cOpt.Dir, "output", "o", "", "output directory of collected data")
Expand Down
46 changes: 24 additions & 22 deletions collector/collect.go
Original file line number Diff line number Diff line change
Expand Up @@ -108,28 +108,29 @@ type BaseOptions struct {

// CollectOptions contains the options defining which type of data to collect
type CollectOptions struct {
RawRequest interface{} // raw collect command or request
Mode string // the cluster is deployed with what type of tool
DiagMode string // run diag collect at command line mode or server mode
ProfileName string // the name of a pre-defined collecting profile
Collectors CollectTree // struct to show which collector is enabled
MetricsFilter []string // prefix of metrics to collect
MetricsExclude []string // prefix of metrics to exclude
MetricsLabel map[string]string // label to filter metrics
Dir string // target directory to store collected data
Limit int // rate limit of SCP
MetricsLimit int // query limit of one request
PerfDuration int // seconds: profile time(s), default is 30s.
CompressScp bool // compress of files during collecting
CompressMetrics bool // compress of files during collecting
RawMonitor bool // collect raw data for metrics
ExitOnError bool // break the process and exit when an error occur
ExtendedAttrs map[string]string // extended attributes used for manual collecting mode
ExplainSQLPath string // File path for explain sql
ExplainSqls []string // explain sqls
CurrDB string
Header []string
UsePortForward bool // use portforward when call api inside k8s cluster
RawRequest interface{} // raw collect command or request
Mode string // the cluster is deployed with what type of tool
DiagMode string // run diag collect at command line mode or server mode
ProfileName string // the name of a pre-defined collecting profile
Collectors CollectTree // struct to show which collector is enabled
MetricsFilter []string // prefix of metrics to collect
MetricsExclude []string // prefix of metrics to exclude
MetricsLabel map[string]string // label to filter metrics
Dir string // target directory to store collected data
Limit int // rate limit of SCP
MetricsLimit int // query limit of one request
MetricsMinInterval int // query minimum interval of one request, in seconds; flag default is 120s
PerfDuration int // seconds: profile time(s), default is 30s.
CompressScp bool // compress of files during collecting
CompressMetrics bool // compress of files during collecting
RawMonitor bool // collect raw data for metrics
ExitOnError bool // break the process and exit when an error occur
ExtendedAttrs map[string]string // extended attributes used for manual collecting mode
ExplainSQLPath string // File path for explain sql
ExplainSqls []string // explain sqls
CurrDB string
Header []string
UsePortForward bool // use portforward when call api inside k8s cluster
}

// CollectStat is estimated size stats of data to be collected
Expand Down Expand Up @@ -301,6 +302,7 @@ func (m *Manager) CollectClusterInfo(
filter: cOpt.MetricsFilter,
exclude: cOpt.MetricsExclude,
limit: cOpt.MetricsLimit,
minInterval: cOpt.MetricsMinInterval,
compress: cOpt.CompressMetrics,
customHeader: cOpt.Header,
portForward: cOpt.UsePortForward,
Expand Down
33 changes: 26 additions & 7 deletions collector/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ import (
"fmt"
"io"
"maps"
"net"
"net/http"
"net/url"
"os"
Expand Down Expand Up @@ -54,7 +55,7 @@ const (
subdirMetrics = "metrics"
subdirRaw = "raw"
maxQueryRange = 120 * 60 // 120min
minQueryRange = 5 * 60 // 5min
minQueryRange = 1 * 60 // 1min
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is not used anymore

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the minimum protection value. Because if the value is lower than this, the proportion of metadata will be high. If the segmentation is too small, it would be meaningless.

)

type collectMonitor struct {
Expand Down Expand Up @@ -177,6 +178,7 @@ type MetricCollectOptions struct {
filter []string
exclude []string
limit int // series*min per query
minInterval int // the minimum interval of a single request in seconds
compress bool
customHeader []string
endpoint string
Expand Down Expand Up @@ -287,8 +289,11 @@ func (c *MetricCollectOptions) Collect(m *Manager, topo *models.TiDBCluster) err
mu := sync.Mutex{}

key := c.endpoint
if c.minInterval < minQueryRange {
c.minInterval = minQueryRange
}
if _, ok := bars[key]; !ok {
bars[key] = mb.AddBar(fmt.Sprintf(" - Querying server %s", key))
bars[key] = mb.AddBar(fmt.Sprintf(" - Querying server %s, min interval %v", key, c.minInterval))
}

if m.diagMode == DiagModeCmd {
Expand All @@ -312,7 +317,19 @@ func (c *MetricCollectOptions) Collect(m *Manager, topo *models.TiDBCluster) err
return err
}

client := &http.Client{Timeout: time.Second * time.Duration(c.opt.APITimeout)}
client := &http.Client{
Transport: &http.Transport{
MaxIdleConns: qLimit * 2,
MaxIdleConnsPerHost: 10,
IdleConnTimeout: 30 * time.Second,
ExpectContinueTimeout: 1 * time.Second,
DialContext: (&net.Dialer{
Timeout: 5 * time.Second,
KeepAlive: 30 * time.Second,
}).DialContext,
},
Timeout: time.Second * time.Duration(c.opt.APITimeout),
}
for _, mtc := range c.metrics {
go func(tok *utils.Token, mtc string) {
bars[key].UpdateDisplay(&progress.DisplayProps{
Expand All @@ -322,7 +339,7 @@ func (c *MetricCollectOptions) Collect(m *Manager, topo *models.TiDBCluster) err

tsEnd, _ := utils.ParseTime(c.GetBaseOptions().ScrapeEnd)
tsStart, _ := utils.ParseTime(c.GetBaseOptions().ScrapeBegin)
collectMetric(m.logger, client, key, tsStart, tsEnd, mtc, c.label, c.resultDir, c.limit, c.compress, c.customHeader, "")
collectMetric(m.logger, client, key, tsStart, tsEnd, mtc, c.label, c.resultDir, c.limit, c.minInterval, c.compress, c.customHeader, "")

mu.Lock()
done++
Expand All @@ -337,6 +354,7 @@ func (c *MetricCollectOptions) Collect(m *Manager, topo *models.TiDBCluster) err
tl.Put(tok)
}(tl.Get(), mtc)
}
m.logger.Infof("Collected metrics ...")

tl.Wait()

Expand Down Expand Up @@ -407,6 +425,7 @@ func collectMetric(
label map[string]string,
resultDir string,
speedlimit int,
minInterval int,
compress bool,
customHeader []string,
instance string,
Expand Down Expand Up @@ -466,7 +485,7 @@ func collectMetric(
newLabel := make(map[string]string)
maps.Copy(newLabel, label)
newLabel["instance"] = instance
collectMetric(l, c, promAddr, beginTime, endTime, mtc, newLabel, resultDir, speedlimit, compress, customHeader, instance)
collectMetric(l, c, promAddr, beginTime, endTime, mtc, newLabel, resultDir, speedlimit, minInterval, compress, customHeader, instance)
}
}
return
Expand All @@ -485,8 +504,8 @@ func collectMetric(
if block > maxQueryRange {
block = maxQueryRange
}
if block < minQueryRange {
block = minQueryRange
if block < minInterval {
block = minInterval
}

l.Debugf("Dumping metric %s-%s-%s%s...", mtc, beginTime.Format(time.RFC3339), endTime.Format(time.RFC3339), nameSuffix)
Expand Down