Skip to content

Commit

Permalink
Merge pull request linuxkerneltravel#524 from ESWZY/traffic-mgr/dynam…
Browse files Browse the repository at this point in the history
…ic-monitoring

Dynamic cluster monitoring and selection weight modification for traffic manager
  • Loading branch information
LinkinPF authored Sep 19, 2023
2 parents 895a960 + 3dfaf5f commit e73f8a3
Show file tree
Hide file tree
Showing 8 changed files with 258 additions and 242 deletions.
2 changes: 1 addition & 1 deletion eBPF_Supermarket/TrafficManager/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,4 @@ test:
kubectl get pods -owide -A
kubectl get services
sudo make build
sudo go test -v ./...
sudo go test -cover -coverprofile=coverage.txt -v ./...
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
package automatic

import (
"fmt"
"strconv"
"testing"
"time"

"lmp/eTrafficManager/bpf"
"lmp/eTrafficManager/pkg/k8s"
"lmp/eTrafficManager/pkg/metrics"
)

func TestAutomatic(t *testing.T) {
fmt.Println("Starting...")
namespace := "default"
serviceName := "sisyphe-sfs"
interval := time.Second * 3
promHost, err := metrics.GetPromHost()
if err != nil || promHost == "" {
fmt.Println("[WARNING]", err)
fmt.Println("[WARNING] Skipping test...")
return
}
cs := metrics.NodeExporterClusterMetrics{Address: promHost}
err = cs.Update()
if err != nil {
fmt.Println("[WARNING]", err)
fmt.Println("[WARNING] Skipping test...")
return
}

programs, err := bpf.LoadProgram()
defer programs.Close()
if err != nil {
fmt.Println("[ERROR] Loading program failed:", err)
return
}
err = programs.Attach()
if err != nil {
fmt.Println("[ERROR] Attaching failed:", err)
}
service, pods, err := k8s.GetPodByService(serviceName, namespace, "")
if err != nil {
panic(err.Error())
}

for {
programs.InsertServiceItem(service.Spec.ClusterIP, strconv.Itoa(int(service.Spec.Ports[0].Port)), len(pods.Items), bpf.RandomAction)
fmt.Printf("Service: %s, Service IP: %s, Ports: %v\n", service.Name, service.Spec.ClusterIPs, service.Spec.Ports)
fmt.Printf("Pods:\n")

var totalAvailableRate float64

for _, pod := range pods.Items {
p := pod.Spec.Containers[0].Ports[0]
fmt.Printf("- %s, IP: %s, Ports: %v\n", pod.Name, pod.Status.PodIP, strconv.Itoa(int(p.ContainerPort))+"|"+strconv.Itoa(int(p.HostPort))+"|"+string(p.Protocol))
hostIP := pod.Status.HostIP
nodeMetric := cs.Query(hostIP).(metrics.NodeExporterNodeMetric)
fmt.Println(nodeMetric.AvailableRate())
totalAvailableRate += nodeMetric.AvailableRate()
}

totalPercentage := 0.0
for i := 0; i < len(pods.Items); i++ {
hostIP := pods.Items[i].Status.HostIP
nodeMetric := cs.Query(hostIP).(metrics.NodeExporterNodeMetric)
possibility := nodeMetric.AvailableRate() / totalAvailableRate
totalPercentage += possibility
programs.AutoInsertBackend(service.Spec.ClusterIP, strconv.Itoa(int(service.Spec.Ports[0].Port)), pods.Items[i].Status.PodIP, strconv.Itoa(int(pods.Items[i].Spec.Containers[0].Ports[0].ContainerPort)), i+1, possibility, totalPercentage)
}

time.Sleep(interval)
service, pods, err = k8s.GetPodByService(serviceName, namespace, "")
if err != nil {
panic(err.Error())
}
cs.Update()
s := bpf.Service{
IP: service.Spec.ClusterIPs[0],
Port: strconv.Itoa(int(service.Spec.Ports[0].Port)),
}
programs.AutoDeleteService(s, nil)
}

}
6 changes: 3 additions & 3 deletions eBPF_Supermarket/TrafficManager/bpf/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,8 +277,8 @@ func (p *Programs) AutoDeleteService(service Service, affiliatedServiceList []Se
return true
}

// DeclareBackendID gets an increasing backendID
func (p *Programs) DeclareBackendID() int {
// declareBackendID gets an increasing backendID
func (p *Programs) declareBackendID() int {
// TODO: concurrency protection
backendID := p.currentIndex
p.currentIndex++
Expand All @@ -287,7 +287,7 @@ func (p *Programs) DeclareBackendID() int {

// AutoInsertBackend inserts an organized backend item into map
func (p *Programs) AutoInsertBackend(serviceIP string, servicePortStr string, backendIP string, backendPortStr string, slotIndex int, possibility float64, possibilityUpperBound float64) (bool, int) {
backendID := p.DeclareBackendID()
backendID := p.declareBackendID()
servicePort, _ := strconv.Atoi(servicePortStr)
backendPort, _ := strconv.Atoi(backendPortStr)
ok := p.InsertBackendItem(serviceIP, servicePort, backendIP, backendPort, backendID, slotIndex, possibility, possibilityUpperBound)
Expand Down
2 changes: 2 additions & 0 deletions eBPF_Supermarket/TrafficManager/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ go 1.20
require (
github.com/cilium/cilium v1.14.0-snapshot.6
github.com/cilium/ebpf v0.11.0
github.com/prometheus/client_golang v1.16.0
github.com/prometheus/common v0.42.0
k8s.io/api v0.27.2
k8s.io/apimachinery v0.27.2
k8s.io/client-go v0.27.2
Expand Down
242 changes: 4 additions & 238 deletions eBPF_Supermarket/TrafficManager/go.sum

Large diffs are not rendered by default.

15 changes: 15 additions & 0 deletions eBPF_Supermarket/TrafficManager/pkg/metrics/metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package metrics

type Metric interface {
}

type NodeMetric interface {
Update(cm ClusterMetric) error
AvailableRate() float64
}

type ClusterMetric interface {
Update() error
AvailableRate() float64
Query(name string) Metric
}
121 changes: 121 additions & 0 deletions eBPF_Supermarket/TrafficManager/pkg/metrics/node_exporter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
package metrics

import (
"context"
"fmt"
"log"
"os/exec"
"strings"
"sync"
"time"

"github.com/prometheus/client_golang/api"
"github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
)

type NodeExporterMetric struct {
Load1 float64
}

type NodeExporterNodeMetric struct {
name string
load1 float64
mu sync.Mutex
}

func (nm *NodeExporterNodeMetric) Update(metric ClusterMetric) error {
nm.mu.Lock()
defer nm.mu.Unlock()
nm.load1 = metric.Query(nm.name).(NodeExporterNodeMetric).load1
return nil
}

func (nm *NodeExporterNodeMetric) AvailableRate() float64 {
if nm.load1 > 10 {
return 0
} else {
return (10 - nm.load1) / 10
}
}

type NodeExporterClusterMetrics struct {
Address string
load1 float64
data map[string]float64
mu sync.Mutex
}

func (cm *NodeExporterClusterMetrics) Update() error {
log.Printf("[INFO] Fetching Node Exporter data...")
load1Data, err := cm.GetLoad1Data(cm.Address)
if err != nil {
return fmt.Errorf("error fetching load1 data: %v", err)
}
cm.mu.Lock()
cm.data = load1Data
cm.mu.Unlock()
return nil
}

func (cm *NodeExporterClusterMetrics) AvailableRate() float64 {
return 1
}

func (cm *NodeExporterClusterMetrics) Query(name string) Metric {
cm.mu.Lock()
defer cm.mu.Unlock()
return NodeExporterNodeMetric{name: name, load1: cm.data[name], mu: sync.Mutex{}}
}

func (cm *NodeExporterClusterMetrics) GetLoad1Data(prometheusAddress string) (map[string]float64, error) {
client, err := api.NewClient(api.Config{
Address: prometheusAddress,
})
if err != nil {
return nil, err
}

query := `avg_over_time(node_load1{job="node-exporter"}[1m])`

promAPI := v1.NewAPI(client)
result, _, err := promAPI.QueryRange(
context.TODO(),
query,
v1.Range{Start: time.Now().Add(-1 * time.Minute), End: time.Now(), Step: time.Minute},
)
if err != nil {
return nil, err
}

load1Data := make(map[string]float64)
matrix, ok := result.(model.Matrix)
if !ok {
return nil, fmt.Errorf("unexpected result type")
}
totalLoad1 := 0.0
for _, sample := range matrix {
node := strings.Replace(string(sample.Metric["instance"]), ":9100", "", 1)
load1 := float64(sample.Values[0].Value)
totalLoad1 += load1
load1Data[node] = load1
}

cm.load1 = totalLoad1
return load1Data, nil
}

func GetPromHost() (string, error) {
// command := `kubectl get nodes -o=jsonpath='{.items[?(@.metadata.labels.node-role\.kubernetes\.io/control-plane=="")].status.addresses[?(@.type=="InternalIP")].address}'`
cmd := exec.Command("kubectl", "get", "nodes", "-o=jsonpath={.items[?(@.metadata.labels.node-role\\.kubernetes\\.io/control-plane==\"\")].status.addresses[?(@.type==\"InternalIP\")].address}")

output, err := cmd.Output()
if err != nil {
return "", fmt.Errorf("error executing kubectl command: %s", err)
}

address := strings.Trim(string(output), " \n")
promHost := fmt.Sprintf("http://%s:9090", address)
fmt.Println("Prometheus URL:", promHost)
return promHost, nil
}
26 changes: 26 additions & 0 deletions eBPF_Supermarket/TrafficManager/pkg/metrics/node_exporter_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package metrics

import (
"fmt"
"os"
"testing"
)

func TestNodeExporterMetric(t *testing.T) {
if os.ExpandEnv("GITHUB_ACTIONS") != "" {
return
}
promHost, err := GetPromHost()
if err != nil {
t.Errorf("%s", err)
}
cm := NodeExporterClusterMetrics{}
a, err := cm.GetLoad1Data(promHost)
if err != nil {
t.Errorf("%s", err)
return
}
for s, f := range a {
fmt.Println(s, f)
}
}

0 comments on commit e73f8a3

Please sign in to comment.