From 273e66d3645f0d7b8ed70e8b492e32ac54274517 Mon Sep 17 00:00:00 2001
From: woodsaj
Date: Fri, 15 Dec 2017 15:42:24 +0800
Subject: [PATCH] refactor to allow metric collection to be split between
 multiple tasks

- For large k8s clusters, it is not possible to collect all metrics in
  one go. This commit allows metrics to be collected on a per-namespace
  and/or per-node basis.
- Additionally, by allowing per-node metrics, each node in the cluster
  can collect its own pod and container metrics.
---
 kubestate/client.go    |  32 +++++--
 kubestate/kubestate.go | 184 ++++++++++++++++++++++++++++-------------
 2 files changed, 151 insertions(+), 65 deletions(-)
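Note: with this change, collection can be fanned out across task manifests,
e.g. one task per namespace or one task per node. Below is a minimal sketch
(editorial illustration, not part of the commit and ignored when the patch
is applied) of how a per-node task might request only its local pod metrics
under the new namespace layout grafanalabs.kubestate.pod.<namespace>.<node>;
the NODE_NAME variable and the standalone main() are assumptions.

    // Illustration only: a node-local task asks for just its own pods.
    package main

    import (
        "fmt"
        "os"

        "github.com/intelsdi-x/snap-plugin-lib-go/v1/plugin"
    )

    func main() {
        // NODE_NAME is assumed to be injected via the downward API.
        node := os.Getenv("NODE_NAME")
        // "*" keeps every Kubernetes namespace; the final element pins
        // collection to this node only.
        mt := plugin.Metric{
            Namespace: plugin.NewNamespace("grafanalabs", "kubestate", "pod", "*", node),
        }
        fmt.Println(mt.Namespace.Strings())
    }

The node element is translated into a spec.nodeName field selector in
GetPods, so the filtering happens in the API server rather than in the
plugin.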
diff --git a/kubestate/client.go b/kubestate/client.go
index 9a13454..57209ce 100644
--- a/kubestate/client.go
+++ b/kubestate/client.go
@@ -4,6 +4,7 @@ import (
     "flag"
 
     . "github.com/intelsdi-x/snap-plugin-utilities/logger"
+    "k8s.io/apimachinery/pkg/fields"
     "k8s.io/client-go/kubernetes"
     "k8s.io/client-go/pkg/api/v1"
     v1batch "k8s.io/client-go/pkg/apis/batch/v1"
@@ -50,18 +51,35 @@ var newClient = func(incluster bool, kubeconfigpath string) (*Client, error) {
     return c, nil
 }
 
-func (c *Client) GetPods() (*v1.PodList, error) {
-    return c.clientset.Core().Pods("").List(v1.ListOptions{})
+func (c *Client) GetPods(namespace, node string) (*v1.PodList, error) {
+    opts := v1.ListOptions{}
+    if node != "*" {
+        opts.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", node).String()
+    }
+    if namespace == "*" {
+        namespace = ""
+    }
+    return c.clientset.Core().Pods(namespace).List(opts)
 }
 
-func (c *Client) GetNodes() (*v1.NodeList, error) {
-    return c.clientset.Core().Nodes().List(v1.ListOptions{})
+func (c *Client) GetNodes(node string) (*v1.NodeList, error) {
+    opts := v1.ListOptions{}
+    if node != "*" {
+        opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", node).String()
+    }
+    return c.clientset.Core().Nodes().List(opts)
 }
 
-func (c *Client) GetDeployments() (*v1beta1.DeploymentList, error) {
-    return c.clientset.Extensions().Deployments("").List(v1.ListOptions{})
+func (c *Client) GetDeployments(namespace string) (*v1beta1.DeploymentList, error) {
+    if namespace == "*" {
+        namespace = ""
+    }
+    return c.clientset.Extensions().Deployments(namespace).List(v1.ListOptions{})
 }
 
-func (c *Client) GetJobs() (*v1batch.JobList, error) {
-    return c.clientset.BatchClient.Jobs("").List(v1.ListOptions{})
+func (c *Client) GetJobs(namespace string) (*v1batch.JobList, error) {
+    if namespace == "*" {
+        namespace = ""
+    }
+    return c.clientset.BatchClient.Jobs(namespace).List(v1.ListOptions{})
 }
diff --git a/kubestate/kubestate.go b/kubestate/kubestate.go
index 067d640..8c147aa 100644
--- a/kubestate/kubestate.go
+++ b/kubestate/kubestate.go
@@ -45,77 +45,145 @@ func (n *Kubestate) CollectMetrics(mts []plugin.Metric) ([]plugin.Metric, error)
     return metrics, nil
 }
 
+type MetricScope struct {
+    Resource  string
+    Node      string
+    Namespace string
+}
+
 var collect = func(client *Client, mts []plugin.Metric) ([]plugin.Metric, error) {
     metrics := make([]plugin.Metric, 0)
 
-    if shouldCollectMetricsFor("pod", mts) || shouldCollectMetricsFor("container", mts) {
-        pods, err := client.GetPods()
-        if err != nil {
-            return nil, err
-        }
-
-        podCollector := new(podCollector)
-        for _, p := range pods.Items {
-            podMetrics, _ := podCollector.Collect(mts, p)
-            metrics = append(metrics, podMetrics...)
-        }
-    }
-
-    if shouldCollectMetricsFor("node", mts) {
-        nodes, err := client.GetNodes()
-        if err != nil {
-            return nil, err
-        }
-
-        nodeCollector := new(nodeCollector)
-        for _, n := range nodes.Items {
-            nodeMetrics, _ := nodeCollector.Collect(mts, n)
-            metrics = append(metrics, nodeMetrics...)
-        }
-    }
-
-    if shouldCollectMetricsFor("deployment", mts) {
-        deployments, err := client.GetDeployments()
-        if err != nil {
-            return nil, err
-        }
-
-        deploymentCollector := new(deploymentCollector)
-        for _, d := range deployments.Items {
-            deploymentMetrics, _ := deploymentCollector.Collect(mts, d)
-            metrics = append(metrics, deploymentMetrics...)
-        }
-    }
-
-    if shouldCollectMetricsFor("job", mts) {
-        jobs, err := client.GetJobs()
-        if err != nil {
-            return nil, err
-        }
-
-        jobCollector := new(jobCollector)
-        for _, d := range jobs.Items {
-            jobMetrics, _ := jobCollector.Collect(mts, d)
-            metrics = append(metrics, jobMetrics...)
-        }
-    }
+    // group the requested metrics by resource, namespace and node
+    //
+    // grafanalabs.kubestate.pod.<namespace>.<node>
+    // grafanalabs.kubestate.container.<namespace>.<node>
+    // grafanalabs.kubestate.deployment.<namespace>
+    // grafanalabs.kubestate.job.<namespace>
+    // grafanalabs.kubestate.node.<node>
+    //
+    groupedMts := make(map[MetricScope][]plugin.Metric)
+    for _, mt := range mts {
+        ns := mt.Namespace.Strings()
+        if len(ns) < 3 {
+            continue
+        }
+        switch ns[2] {
+        case "pod":
+            namespace := "*"
+            node := "*"
+            if len(ns) >= 4 {
+                namespace = ns[3]
+            }
+            if len(ns) >= 5 {
+                node = ns[4]
+            }
+            scope := MetricScope{
+                Resource:  "pod/container",
+                Namespace: namespace,
+                Node:      node,
+            }
+            groupedMts[scope] = append(groupedMts[scope], mt)
+        case "container":
+            namespace := "*"
+            node := "*"
+            if len(ns) >= 4 {
+                namespace = ns[3]
+            }
+            if len(ns) >= 5 {
+                node = ns[4]
+            }
+            scope := MetricScope{
+                Resource:  "pod/container",
+                Namespace: namespace,
+                Node:      node,
+            }
+            groupedMts[scope] = append(groupedMts[scope], mt)
+        case "deployment":
+            namespace := "*"
+            if len(ns) >= 4 {
+                namespace = ns[3]
+            }
+            scope := MetricScope{
+                Resource:  "deployment",
+                Namespace: namespace,
+            }
+            groupedMts[scope] = append(groupedMts[scope], mt)
+        case "job":
+            namespace := "*"
+            if len(ns) >= 4 {
+                namespace = ns[3]
+            }
+            scope := MetricScope{
+                Resource:  "job",
+                Namespace: namespace,
+            }
+            groupedMts[scope] = append(groupedMts[scope], mt)
+        case "node":
+            node := "*"
+            if len(ns) >= 4 {
+                node = ns[3]
+            }
+            scope := MetricScope{
+                Resource: "node",
+                Node:     node,
+            }
+            groupedMts[scope] = append(groupedMts[scope], mt)
+        }
+    }
+
+    for scope, scopedMts := range groupedMts {
+        switch scope.Resource {
+        case "pod/container":
+            pods, err := client.GetPods(scope.Namespace, scope.Node)
+            if err != nil {
+                return nil, err
+            }
+
+            podCollector := new(podCollector)
+            for _, p := range pods.Items {
+                podMetrics, _ := podCollector.Collect(scopedMts, p)
+                metrics = append(metrics, podMetrics...)
+            }
+        case "deployment":
+            deployments, err := client.GetDeployments(scope.Namespace)
+            if err != nil {
+                return nil, err
+            }
+
+            deploymentCollector := new(deploymentCollector)
+            for _, d := range deployments.Items {
+                deploymentMetrics, _ := deploymentCollector.Collect(scopedMts, d)
+                metrics = append(metrics, deploymentMetrics...)
+            }
+        case "job":
+            jobs, err := client.GetJobs(scope.Namespace)
+            if err != nil {
+                return nil, err
+            }
+
+            jobCollector := new(jobCollector)
+            for _, d := range jobs.Items {
+                jobMetrics, _ := jobCollector.Collect(scopedMts, d)
+                metrics = append(metrics, jobMetrics...)
+            }
+        case "node":
+            nodes, err := client.GetNodes(scope.Node)
+            if err != nil {
+                return nil, err
+            }
+
+            nodeCollector := new(nodeCollector)
+            for _, n := range nodes.Items {
+                nodeMetrics, _ := nodeCollector.Collect(scopedMts, n)
+                metrics = append(metrics, nodeMetrics...)
+            }
+        }
+    }
 
     return metrics, nil
 }
 
-func shouldCollectMetricsFor(metricType string, mts []plugin.Metric) bool {
-    for _, mt := range mts {
-        ns := mt.Namespace.Strings()
-        if len(ns) < 3 {
-            continue
-        }
-        if ns[2] == metricType {
-            return true
-        }
-    }
-    return false
-}
-
 func boolInt(b bool) int {
     if b {
         return 1
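Editorial appendix, not part of the commit: a small runnable sketch of the
selector string that the new GetPods/GetNodes field selectors put on the
wire, so the list calls are narrowed server-side instead of filtering in
the plugin; "worker-1" is a placeholder node name.

    package main

    import (
        "fmt"

        "k8s.io/apimachinery/pkg/fields"
    )

    func main() {
        // Same helper the patched client methods use to narrow list calls.
        sel := fields.OneTermEqualSelector("spec.nodeName", "worker-1")
        fmt.Println(sel.String()) // prints: spec.nodeName=worker-1
    }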