Skip to content
This repository has been archived by the owner on Feb 21, 2020. It is now read-only.

refactor to allow metric collection to be split between multiple tasks #10

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 25 additions & 7 deletions kubestate/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"flag"

. "github.com/intelsdi-x/snap-plugin-utilities/logger"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/pkg/api/v1"
v1batch "k8s.io/client-go/pkg/apis/batch/v1"
Expand Down Expand Up @@ -50,18 +51,35 @@ var newClient = func(incluster bool, kubeconfigpath string) (*Client, error) {
return c, nil
}

func (c *Client) GetPods() (*v1.PodList, error) {
return c.clientset.Core().Pods("").List(v1.ListOptions{})
func (c *Client) GetPods(namespace, node string) (*v1.PodList, error) {
opts := v1.ListOptions{}
if node != "*" {
opts.FieldSelector = fields.OneTermEqualSelector("spec.nodeName", node).String()
}
if namespace == "*" {
namespace = ""
}
return c.clientset.Core().Pods(namespace).List(opts)
}

func (c *Client) GetNodes() (*v1.NodeList, error) {
func (c *Client) GetNodes(node string) (*v1.NodeList, error) {
opts := v1.ListOptions{}
if node != "*" {
opts.FieldSelector = fields.OneTermEqualSelector("metadata.name", node).String()
}
return c.clientset.Core().Nodes().List(v1.ListOptions{})
}

func (c *Client) GetDeployments() (*v1beta1.DeploymentList, error) {
return c.clientset.Extensions().Deployments("").List(v1.ListOptions{})
func (c *Client) GetDeployments(namespace string) (*v1beta1.DeploymentList, error) {
if namespace == "*" {
namespace = ""
}
return c.clientset.Extensions().Deployments(namespace).List(v1.ListOptions{})
}

func (c *Client) GetJobs() (*v1batch.JobList, error) {
return c.clientset.BatchClient.Jobs("").List(v1.ListOptions{})
func (c *Client) GetJobs(namespace string) (*v1batch.JobList, error) {
if namespace == "*" {
namespace = ""
}
return c.clientset.BatchClient.Jobs(namespace).List(v1.ListOptions{})
}
184 changes: 126 additions & 58 deletions kubestate/kubestate.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,77 +45,145 @@ func (n *Kubestate) CollectMetrics(mts []plugin.Metric) ([]plugin.Metric, error)
return metrics, nil
}

type MetricScope struct {
Resource string
Node string
Namespace string
}

var collect = func(client *Client, mts []plugin.Metric) ([]plugin.Metric, error) {
metrics := make([]plugin.Metric, 0)

if shouldCollectMetricsFor("pod", mts) || shouldCollectMetricsFor("container", mts) {
pods, err := client.GetPods()
if err != nil {
return nil, err
}

podCollector := new(podCollector)
for _, p := range pods.Items {
podMetrics, _ := podCollector.Collect(mts, p)
metrics = append(metrics, podMetrics...)
}
}

if shouldCollectMetricsFor("node", mts) {
nodes, err := client.GetNodes()
if err != nil {
return nil, err
}

nodeCollector := new(nodeCollector)
for _, n := range nodes.Items {
nodeMetrics, _ := nodeCollector.Collect(mts, n)
metrics = append(metrics, nodeMetrics...)
}
}

if shouldCollectMetricsFor("deployment", mts) {
deployments, err := client.GetDeployments()
if err != nil {
return nil, err
// group the request metrics by resource, namespace and node
//
// grafanalabs.kubestate.pod.<namespace>.<node>
// grafanalabs.kubestate.container.<namespace>.<node>
// grafanalabs.kubestate.deployment.<namespace>
// grafanalabs.kubestate.job.<namespace>
// grafanalabs.kubestate.node.<node>
//
groupedMts := make(map[MetricScope][]plugin.Metric)
for _, mt := range mts {
ns := mt.Namespace.Strings()
if len(ns) < 3 {
continue
}

deploymentCollector := new(deploymentCollector)
for _, d := range deployments.Items {
deploymentMetrics, _ := deploymentCollector.Collect(mts, d)
metrics = append(metrics, deploymentMetrics...)
switch ns[2] {
case "pod":
namespace := "*"
node := "*"
if len(ns) >= 4 {
namespace = ns[4]
}
if len(ns) >= 5 {
node = ns[5]
}
scope := MetricScope{
Resource: "pod/container",
Namespace: namespace,
Node: node,
}
groupedMts[scope] = append(groupedMts[scope], mt)
case "container":
namespace := "*"
node := "*"
if len(ns) >= 4 {
namespace = ns[4]
}
if len(ns) >= 5 {
node = ns[5]
}
scope := MetricScope{
Resource: "pod/container",
Namespace: namespace,
Node: node,
}
groupedMts[scope] = append(groupedMts[scope], mt)
case "deployment":
namespace := "*"
if len(ns) >= 4 {
namespace = ns[4]
}
scope := MetricScope{
Resource: "deployment",
Namespace: namespace,
}
groupedMts[scope] = append(groupedMts[scope], mt)
case "job":
namespace := "*"
if len(ns) >= 4 {
namespace = ns[4]
}
scope := MetricScope{
Resource: "job",
Namespace: namespace,
}
groupedMts[scope] = append(groupedMts[scope], mt)
case "node":
node := "*"
if len(ns) >= 4 {
node = ns[4]
}
scope := MetricScope{
Resource: "node",
Node: node,
}
groupedMts[scope] = append(groupedMts[scope], mt)
}
}

if shouldCollectMetricsFor("job", mts) {
jobs, err := client.GetJobs()
if err != nil {
return nil, err
}

jobCollector := new(jobCollector)
for _, d := range jobs.Items {
jobMetrics, _ := jobCollector.Collect(mts, d)
metrics = append(metrics, jobMetrics...)
for scope, scopedMts := range groupedMts {
switch scope.Resource {
case "pod/container":
pods, err := client.GetPods(scope.Namespace, scope.Node)
if err != nil {
return nil, err
}

podCollector := new(podCollector)
for _, p := range pods.Items {
podMetrics, _ := podCollector.Collect(scopedMts, p)
metrics = append(metrics, podMetrics...)
}
case "deployment":
deployments, err := client.GetDeployments(scope.Namespace)
if err != nil {
return nil, err
}

deploymentCollector := new(deploymentCollector)
for _, d := range deployments.Items {
deploymentMetrics, _ := deploymentCollector.Collect(scopedMts, d)
metrics = append(metrics, deploymentMetrics...)
}
case "job":
jobs, err := client.GetJobs(scope.Namespace)
if err != nil {
return nil, err
}

jobCollector := new(jobCollector)
for _, d := range jobs.Items {
jobMetrics, _ := jobCollector.Collect(scopedMts, d)
metrics = append(metrics, jobMetrics...)
}
case "node":
nodes, err := client.GetNodes(scope.Node)
if err != nil {
return nil, err
}

nodeCollector := new(nodeCollector)
for _, n := range nodes.Items {
nodeMetrics, _ := nodeCollector.Collect(scopedMts, n)
metrics = append(metrics, nodeMetrics...)
}
}
}

return metrics, nil
}

func shouldCollectMetricsFor(metricType string, mts []plugin.Metric) bool {
for _, mt := range mts {
ns := mt.Namespace.Strings()
if len(ns) < 3 {
continue
}
if ns[2] == metricType {
return true
}
}
return false
}

func boolInt(b bool) int {
if b {
return 1
Expand Down