rewriting scraper goroutine scheduling
yangjunmyfm192085 committed Feb 25, 2022
1 parent 8c08596 commit f7ec7d3
Showing 9 changed files with 325 additions and 293 deletions.
2 changes: 2 additions & 0 deletions go.mod
@@ -11,6 +11,7 @@ require (
	github.com/prometheus/prometheus v2.5.0+incompatible
	github.com/spf13/cobra v1.2.1
	github.com/spf13/pflag v1.0.5
	go.uber.org/goleak v1.1.10
	golang.org/x/perf v0.0.0-20210220033136-40a54f11e909
	k8s.io/api v0.23.2
	k8s.io/apimachinery v0.23.2
@@ -92,6 +93,7 @@ require (
	go.uber.org/zap v1.19.0 // indirect
	golang.org/x/crypto v0.0.0-20210817164053-32db794688a5 // indirect
	golang.org/x/exp v0.0.0-20210220032938-85be41e4509f // indirect
	golang.org/x/lint v0.0.0-20210508222113-6edffad5e616 // indirect
	golang.org/x/mod v0.4.2 // indirect
	golang.org/x/net v0.0.0-20211209124913-491a49abca63 // indirect
	golang.org/x/oauth2 v0.0.0-20210819190943-2bc19b11175f // indirect
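
go.uber.org/goleak enters the module graph here, presumably so tests can guard the new per-node goroutines against leaks. A minimal sketch of the usual wiring, assumed rather than taken from this commit:

package scraper_test

import (
	"testing"

	"go.uber.org/goleak"
)

// TestMain lets goleak fail the package's tests if any goroutine, such as
// a per-node scrape loop, is still running when the tests finish.
func TestMain(m *testing.M) {
	goleak.VerifyTestMain(m)
}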
195 changes: 112 additions & 83 deletions pkg/scraper/scraper.go
@@ -16,26 +16,20 @@ package scraper

import (
	"context"
	"fmt"
	"sync"
	"time"

	"sigs.k8s.io/metrics-server/pkg/scraper/client"

	corev1 "k8s.io/api/core/v1"
	apitypes "k8s.io/apimachinery/pkg/types"
	"k8s.io/component-base/metrics"
	"k8s.io/klog/v2"

	"sigs.k8s.io/metrics-server/pkg/storage"
)

var (
	requestDuration = metrics.NewHistogramVec(
		&metrics.HistogramOpts{
@@ -83,110 +77,145 @@ func RegisterScraperMetrics(registrationFunc func(metrics.Registerable) error) error {
	return nil
}

func NewManageNodeScraper(client client.KubeletMetricsGetter, scrapeTimeout time.Duration, metricResolution time.Duration, store storage.Storage) *manageNodeScraper {
	return &manageNodeScraper{
		kubeletClient:    client,
		scrapeTimeout:    scrapeTimeout,
		metricResolution: metricResolution,
		stop:             map[apitypes.UID]chan struct{}{},
		isWorking:        map[apitypes.UID]bool{},
		storage:          store,
	}
}

type manageNodeScraper struct {
	nodeLock         sync.Mutex
	kubeletClient    client.KubeletMetricsGetter
	scrapeTimeout    time.Duration
	metricResolution time.Duration
	// stop holds one channel per running per-node goroutine; a signal on a
	// node's channel tells that node's scrape loop to exit.
	stop map[apitypes.UID]chan struct{}
	// isWorking tracks whether a per-node goroutine is currently running.
	// A request to scrape a node whose goroutine is already running is ignored.
	isWorking map[apitypes.UID]bool

	storage storage.Storage
}

// NodeInfo contains the information needed to identify and connect to a particular node
// (node name and preferred address).
type NodeInfo struct {
	Name           string
	ConnectAddress string
}

func (m *manageNodeScraper) AddNodeScraper(node *corev1.Node) error {
	m.nodeLock.Lock()
	defer m.nodeLock.Unlock()
	if working, exists := m.isWorking[node.UID]; exists && working {
		klog.V(1).ErrorS(fmt.Errorf("scrape is already running"), "Skipping node", "node", klog.KObj(node))
		return fmt.Errorf("scrape is already running on node %s", node.Name)
	}
	klog.V(1).InfoS("Starting to scrape metrics from node", "node", klog.KObj(node))
	stopCh, exists := m.stop[node.UID]
	if !exists {
		stopCh = make(chan struct{})
		m.stop[node.UID] = stopCh
	}

	go func() {
		ticker := time.NewTicker(m.metricResolution)
		defer ticker.Stop()

		// Scrape once immediately so a new node does not wait a full
		// resolution period for its first sample, then scrape on every tick.
		res, _ := m.ScrapeData(node)
		if res != nil {
			m.StoreData(node, res)
		} else {
			klog.V(1).InfoS("Scraped no value from node", "node", klog.KObj(node))
		}
		for {
			select {
			case <-ticker.C:
				res, _ := m.ScrapeData(node)
				if res != nil {
					m.StoreData(node, res)
				} else {
					klog.V(1).InfoS("Scraped no value from node", "node", klog.KObj(node))
				}
			case <-stopCh:
				klog.V(1).InfoS("Scrape loop for node exiting", "node", klog.KObj(node))
				return
			}
		}
	}()
	m.isWorking[node.UID] = true
	return nil
}
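
The methods above are driven from the commit's other files (not shown here). A hedged sketch of how a node informer could call them; registerNodeHandlers is a hypothetical helper, not code from this commit:

// registerNodeHandlers shows one plausible way to start and stop per-node
// scrape loops from node informer events (assumes imports
// k8s.io/client-go/informers and k8s.io/client-go/tools/cache).
func registerNodeHandlers(factory informers.SharedInformerFactory, m *manageNodeScraper) {
	factory.Core().V1().Nodes().Informer().AddEventHandler(cache.ResourceEventHandlerFuncs{
		AddFunc: func(obj interface{}) {
			if node, ok := obj.(*corev1.Node); ok {
				_ = m.AddNodeScraper(node)
			}
		},
		UpdateFunc: func(_, newObj interface{}) {
			if node, ok := newObj.(*corev1.Node); ok {
				_ = m.UpdateNodeScraper(node)
			}
		},
		DeleteFunc: func(obj interface{}) {
			if node, ok := obj.(*corev1.Node); ok {
				m.DeleteNodeScraper(node)
			}
		},
	})
}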

func (m *manageNodeScraper) ScrapeData(node *corev1.Node) (*storage.MetricsBatch, error) {
	startTime := myClock.Now()
	baseCtx, cancel := context.WithCancel(context.Background())
	defer cancel()
	klog.V(6).InfoS("Scraping metrics from node", "node", klog.KObj(node))
	ctx, cancelTimeout := context.WithTimeout(baseCtx, m.scrapeTimeout)
	defer cancelTimeout()
	defer func() {
		requestDuration.WithLabelValues(node.Name).Observe(float64(myClock.Since(startTime)) / float64(time.Second))
		lastRequestTime.WithLabelValues(node.Name).Set(float64(myClock.Now().Unix()))
	}()

	ms, err := m.kubeletClient.GetMetrics(ctx, node)
	if err != nil {
		klog.ErrorS(err, "Failed to scrape node", "node", klog.KObj(node))
		requestTotal.WithLabelValues("false").Inc()
		return nil, err
	}
	requestTotal.WithLabelValues("true").Inc()

	res := &storage.MetricsBatch{
		Nodes: map[string]storage.MetricsPoint{},
		Pods:  map[apitypes.NamespacedName]storage.PodMetricsPoint{},
	}
	for nodeName, nodeMetricsPoint := range ms.Nodes {
		if _, found := res.Nodes[nodeName]; found {
			klog.ErrorS(nil, "Got duplicate node point", "node", klog.KRef("", nodeName))
			continue
		}
		res.Nodes[nodeName] = nodeMetricsPoint
	}
	for podRef, podMetricsPoint := range ms.Pods {
		if _, found := res.Pods[podRef]; found {
			klog.ErrorS(nil, "Got duplicate pod point", "pod", klog.KRef(podRef.Namespace, podRef.Name))
			continue
		}
		res.Pods[podRef] = podMetricsPoint
	}
	return res, nil
}

func (m *manageNodeScraper) StoreData(node *corev1.Node, res *storage.MetricsBatch) {
	klog.V(6).InfoS("Storing metrics from node", "node", klog.KObj(node))
	m.storage.Store(res)
}

func (m *manageNodeScraper) UpdateNodeScraper(node *corev1.Node) error {
	m.nodeLock.Lock()
	working, exists := m.isWorking[node.UID]
	m.nodeLock.Unlock()
	if exists && working {
		klog.V(1).ErrorS(fmt.Errorf("scrape is already running"), "Skipping node update", "node", klog.KObj(node))
		return fmt.Errorf("scrape is already running on node %s", node.Name)
	}
	return m.AddNodeScraper(node)
}

func (m *manageNodeScraper) DeleteNodeScraper(node *corev1.Node) {
	m.nodeLock.Lock()
	defer m.nodeLock.Unlock()
	if working, exists := m.isWorking[node.UID]; exists && working {
		klog.V(1).InfoS("Stopping scrape of metrics from node", "node", klog.KObj(node))
		delete(m.isWorking, node.UID)
		if stopCh, exists := m.stop[node.UID]; exists {
			stopCh <- struct{}{}
			delete(m.stop, node.UID)
		}
	}
}
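
DeleteNodeScraper stops a single node's loop, but nothing shown in this hunk stops all of them at shutdown. A sketch of such a sweep follows; StopAllNodeScrapers is hypothetical and not part of this commit:

// StopAllNodeScrapers is a hypothetical helper (not in this commit) that
// signals every per-node goroutine to exit, e.g. at manager shutdown.
func (m *manageNodeScraper) StopAllNodeScrapers() {
	m.nodeLock.Lock()
	defer m.nodeLock.Unlock()
	for uid, stopCh := range m.stop {
		// Each running loop blocks in select on its channel, so this send
		// is received as soon as the current scrape (if any) finishes.
		stopCh <- struct{}{}
		delete(m.stop, uid)
		delete(m.isWorking, uid)
	}
}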

type clock interface {