Skip to content

Commit

Permalink
feat(trafficManager): fast weighted backend selection (#523)
Browse files Browse the repository at this point in the history
  • Loading branch information
ESWZY authored Sep 18, 2023
1 parent 0f443aa commit c8ddc0e
Show file tree
Hide file tree
Showing 8 changed files with 78 additions and 35 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ type SiegeResponse struct {
func siegeService(siegePodName string, service *v1.Service) (*SiegeResponse, error) {
log.Println("Start Sieging")
// kubectl exec siege -- siege -c 5 -r 20000 http://sisyphe-sfs.default.svc.cluster.local
out, err := exec.Command("kubectl", "exec", siegePodName, "--", "siege", "-c", "20", "-r", "20000", "http://"+service.Spec.ClusterIPs[0]).Output()
out, err := exec.Command("kubectl", "exec", siegePodName, "--", "siege", "-c", "20", "-r", "30000", "http://"+service.Spec.ClusterIPs[0]).Output()
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -93,9 +93,11 @@ func TestServicePerformance(t *testing.T) {

// fmt.Println(service.Spec.ClusterIP, strconv.Itoa(int(service.Spec.Ports[0].Port)))
programs.InsertServiceItem(service.Spec.ClusterIP, strconv.Itoa(int(service.Spec.Ports[0].Port)), len(pods.Items), bpf.RandomAction)
totalPercentage := 0.0
for i := 0; i < len(pods.Items); i++ {
// fmt.Println(strconv.Itoa(int(pods.Items[i].Spec.Containers[0].Ports[0].ContainerPort)))
programs.AutoInsertBackend(service.Spec.ClusterIP, strconv.Itoa(int(service.Spec.Ports[0].Port)), pods.Items[i].Status.PodIP, strconv.Itoa(int(pods.Items[i].Spec.Containers[0].Ports[0].ContainerPort)), i+1, float64(1/float64(len(pods.Items))))
totalPercentage += 1 / float64(len(pods.Items))
programs.AutoInsertBackend(service.Spec.ClusterIP, strconv.Itoa(int(service.Spec.Ports[0].Port)), pods.Items[i].Status.PodIP, strconv.Itoa(int(pods.Items[i].Spec.Containers[0].Ports[0].ContainerPort)), i+1, 1/float64(len(pods.Items)), totalPercentage)
}

err = programs.Attach()
Expand Down
7 changes: 6 additions & 1 deletion eBPF_Supermarket/TrafficManager/acceptance/weight_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,13 @@ func TestWeight(t *testing.T) {
}

progs.InsertServiceItem(targetIP, targetPort, len(serverPortList), bpf.WeightedAction)
totalPercentage := 0.0
for i := 0; i < len(serverPortList); i++ {
progs.AutoInsertBackend(targetIP, targetPort, "127.0.0.1", serverPortList[i], i+1, tc.weights[i])
totalPercentage += tc.weights[i]
progs.AutoInsertBackend(targetIP, targetPort, "127.0.0.1", serverPortList[i], i+1, tc.weights[i], totalPercentage)
}
if math.Abs(totalPercentage-1) > 0.005 {
fmt.Printf("[WARNING] Total weight for service %s:%s is not 1, but %f.\n", targetIP, targetPort, totalPercentage)
}

err = progs.Attach()
Expand Down
10 changes: 5 additions & 5 deletions eBPF_Supermarket/TrafficManager/bpf/bpf_connect_bpf.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

Binary file modified eBPF_Supermarket/TrafficManager/bpf/bpf_connect_bpf.o
Binary file not shown.
8 changes: 4 additions & 4 deletions eBPF_Supermarket/TrafficManager/bpf/connect.c
Original file line number Diff line number Diff line change
Expand Up @@ -54,19 +54,19 @@ static int sock4_forward_entry(struct bpf_sock_addr *ctx)
if(svc->action == SVC_ACTION_NORMAL) { // default action
key.backend_slot = sock_select_random_slot(svc->count);
} else if(svc->action == SVC_ACTION_WEIGHT) {
int slot_index = sock_select_weighted_slot(svc->count, key);
int slot_index = sock_fast_select_weighted_slot(svc->count, key);
if(slot_index < 0)
return slot_index;
key.backend_slot = slot_index;
} else if(svc->action & SVC_ACTION_REDIRECT_SVC) {
int slot_index = sock_select_weighted_slot(svc->count + 1, key);
int slot_index = sock_fast_select_weighted_slot(svc->count + 1, key);
if(slot_index > svc->count) {
key.backend_slot = slot_index;
// 3. lookup backend slot from constructed backend key
backend_slot = lookup_lb4_backend_slot(&key);
if (!backend_slot)
return -ENOENT;
bpf_printk("3.5. find backend slot: %d", backend_slot->backend_id);
// bpf_printk("3.5. find backend slot: %d", backend_slot->backend_id);
backend_id = backend_slot->backend_id;

// 4. find the info of real backend
Expand All @@ -76,7 +76,7 @@ static int sock4_forward_entry(struct bpf_sock_addr *ctx)
key.backend_slot = 0;
key.address = backend->address;
key.dport = backend->port;
slot_index = sock_select_weighted_slot(svc->count, key);
slot_index = sock_fast_select_weighted_slot(svc->count, key);
if(slot_index < 0)
return slot_index;
key.backend_slot = slot_index;
Expand Down
31 changes: 20 additions & 11 deletions eBPF_Supermarket/TrafficManager/bpf/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ package bpf
import (
"bufio"
"fmt"
"math"
"net"
"os"
"strconv"
Expand Down Expand Up @@ -146,7 +147,7 @@ func (p *Programs) InsertServiceItem(serviceIP string, servicePort string, backe
}
serviceKey := NewService4Key(net.ParseIP(serviceIP), uint16(servicePortInt), u8proto.ANY, 0, 0)
// use index 0 to indicate service item, of course the possibility is zero, which is never be used
serviceValue := NewService4Value(Backend4Key{0}, uint16(backendNumber), Possibility{0}, action)
serviceValue := NewService4Value(Backend4Key{0}, uint16(backendNumber), Possibility{0, 0}, action)
err = p.connectObj.bpf_connectMaps.LB4SERVICES_MAP_V2.Update(serviceKey.ToNetwork(), serviceValue.ToNetwork(), ebpf.UpdateAny)
if err != nil {
fmt.Println("[ERROR] InsertServiceItem: connectObj.bpf_connectMaps.LB4SERVICES_MAP_V2.Update failed: ", err)
Expand All @@ -170,12 +171,12 @@ func (p *Programs) DeleteServiceItem(serviceIP string, servicePort int, slotInde
}

// InsertBackendItem inserts backend item into service map and backend map
func (p *Programs) InsertBackendItem(serviceIP string, servicePort int, backendIP string, backendPort int, backendID int, slotIndex int, possibility float64) bool {
func (p *Programs) InsertBackendItem(serviceIP string, servicePort int, backendIP string, backendPort int, backendID int, slotIndex int, possibility float64, possibilityUpperBound float64) bool {
// Use allocated backendID to point to and store backend information
backendKey := Backend4Key{uint32(backendID)}
backendServiceKey := NewService4Key(net.ParseIP(serviceIP), uint16(servicePort), u8proto.ANY, 0, uint16(slotIndex))
// We don't specify action for backendItem, but it seems that it can be expanded in the future.
backendServiceValue := NewService4Value(backendKey, 0, Possibility{possibility}, DefaultAction)
backendServiceValue := NewService4Value(backendKey, 0, Possibility{possibility, possibilityUpperBound}, DefaultAction)
err := p.connectObj.bpf_connectMaps.LB4SERVICES_MAP_V2.Update(backendServiceKey.ToNetwork(), backendServiceValue.ToNetwork(), ebpf.UpdateAny)
if err != nil {
fmt.Println("[ERROR] InsertBackendItem: connectObj.bpf_connectMaps.LB4SERVICES_MAP_V2.Update failed:", err)
Expand Down Expand Up @@ -210,12 +211,20 @@ func (p *Programs) AutoInsertService(service Service, backendList []Backend, act
if action != RedirectAction {
affiliatedServiceList = nil
}

totalPercentage := 0.0
p.InsertServiceItem(service.IP, service.Port, len(backendList), action) // +len(affiliatedServiceList)
for i, backend := range backendList {
p.AutoInsertBackend(service.IP, service.Port, backend.IP, backend.Port, i+1, backend.Possibility)
totalPercentage += backend.Possibility
p.AutoInsertBackend(service.IP, service.Port, backend.IP, backend.Port, i+1, backend.Possibility, totalPercentage)

}
for i, affiliatedService := range affiliatedServiceList {
p.AutoInsertBackend(service.IP, service.Port, affiliatedService.IP, affiliatedService.Port, len(backendList)+i+1, affiliatedService.Possibility)
totalPercentage += affiliatedService.Possibility
p.AutoInsertBackend(service.IP, service.Port, affiliatedService.IP, affiliatedService.Port, len(backendList)+i+1, affiliatedService.Possibility, totalPercentage)
}
if math.Abs(totalPercentage-1) > 0.005 {
fmt.Printf("[WARNING] Total weight for service %s:%s is not 1, but %f.\n", service.IP, service.Port, totalPercentage)
}
}

Expand All @@ -228,7 +237,7 @@ func (p *Programs) AutoDeleteService(service Service, affiliatedServiceList []Se
return false
}
serviceKey := NewService4Key(net.ParseIP(serviceIP), uint16(servicePort), u8proto.ANY, 0, 0)
serviceValue := NewService4Value(Backend4Key{0}, uint16(0), Possibility{0}, DefaultAction)
serviceValue := NewService4Value(Backend4Key{0}, uint16(0), Possibility{0, 0}, DefaultAction)
err = p.connectObj.bpf_connectMaps.LB4SERVICES_MAP_V2.Lookup(serviceKey.ToNetwork(), serviceValue)
if err != nil {
fmt.Println("[ERROR] AutoDeleteService: connectObj.bpf_connectMaps.LB4SERVICES_MAP_V2.Lookup failed:", err)
Expand All @@ -238,13 +247,13 @@ func (p *Programs) AutoDeleteService(service Service, affiliatedServiceList []Se
p.DeleteServiceItem(serviceIP, servicePort, 0)
for i := 1; i <= int(serviceValue.Count); i++ {
backendServiceKey := NewService4Key(net.ParseIP(serviceIP), uint16(servicePort), u8proto.ANY, 0, uint16(i))
backendServiceValue := NewService4Value(Backend4Key{uint32(0)}, 0, Possibility{0}, DefaultAction)
backendServiceValue := NewService4Value(Backend4Key{uint32(0)}, 0, Possibility{0, 0}, DefaultAction)
err := p.connectObj.bpf_connectMaps.LB4SERVICES_MAP_V2.Lookup(backendServiceKey.ToNetwork(), backendServiceValue)
if err != nil {
fmt.Println("[WARNING] AutoDeleteService: connectObj.bpf_connectMaps.LB4SERVICES_MAP_V2.Lookup failed:", err)
break
}
fmt.Printf("[DEBUG] To delete backend backend service: backendServiceKey: %s, backendServiceValue: %s\n", backendServiceKey.String(), backendServiceValue.String())
fmt.Printf("[DEBUG] To delete backend service: backendServiceKey: %s, backendServiceValue: %s\n", backendServiceKey.String(), backendServiceValue.String())
p.AutoDeleteBackend(int(backendServiceValue.BackendID.ID))
p.DeleteServiceItem(serviceIP, servicePort, i)
}
Expand All @@ -254,7 +263,7 @@ func (p *Programs) AutoDeleteService(service Service, affiliatedServiceList []Se
return false
}
backendServiceKey := NewService4Key(net.ParseIP(serviceIP), uint16(servicePort), u8proto.ANY, 0, uint16(int(serviceValue.Count)+i+1))
backendServiceValue := NewService4Value(Backend4Key{uint32(0)}, 0, Possibility{0}, DefaultAction)
backendServiceValue := NewService4Value(Backend4Key{uint32(0)}, 0, Possibility{0, 0}, DefaultAction)
err = p.connectObj.bpf_connectMaps.LB4SERVICES_MAP_V2.Lookup(backendServiceKey.ToNetwork(), backendServiceValue)
if err != nil {
fmt.Println("[WARNING] AutoDeleteService: connectObj.bpf_connectMaps.LB4SERVICES_MAP_V2.Lookup failed:", err)
Expand All @@ -277,11 +286,11 @@ func (p *Programs) DeclareBackendID() int {
}

// AutoInsertBackend inserts an organized backend item into map
func (p *Programs) AutoInsertBackend(serviceIP string, servicePortStr string, backendIP string, backendPortStr string, slotIndex int, possibility float64) (bool, int) {
func (p *Programs) AutoInsertBackend(serviceIP string, servicePortStr string, backendIP string, backendPortStr string, slotIndex int, possibility float64, possibilityUpperBound float64) (bool, int) {
backendID := p.DeclareBackendID()
servicePort, _ := strconv.Atoi(servicePortStr)
backendPort, _ := strconv.Atoi(backendPortStr)
ok := p.InsertBackendItem(serviceIP, servicePort, backendIP, backendPort, backendID, slotIndex, possibility)
ok := p.InsertBackendItem(serviceIP, servicePort, backendIP, backendPort, backendID, slotIndex, possibility, possibilityUpperBound)
if ok {
p.backEndSet[backendID] = true
fmt.Printf("[INFO] AutoInsertBackend succeeded: serviceIP: %s, servicePort: %d, backendIP: %s, backendPort: %d, backendID: %d, slotIndex: %d, possibility: %.2f\n", serviceIP, servicePort, backendIP, backendPort, backendID, slotIndex, possibility)
Expand Down
27 changes: 26 additions & 1 deletion eBPF_Supermarket/TrafficManager/bpf/connect.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ struct lb4_service {
__u16 count;
__u16 possibility;
__u16 action;
__u8 pad[2];
__u16 weight_range_upper;
};

struct lb4_backend {
Expand Down Expand Up @@ -186,4 +186,29 @@ static __always_inline int sock_select_weighted_slot(int sbc, struct lb4_key key
return -ENOENT;
}
return key.backend_slot;
}

static __always_inline int sock_fast_select_weighted_slot(int sbc, struct lb4_key key)
{
int l = 1, r = sbc;
struct lb4_service *backend_slot;
int random_point = bpf_get_prandom_u32() % MAX_BACKEND_SELECTION;
for(int i = 0; i < 10; i++) { // 10 = log_2(MAX_BACKEND_SELECTION)
if(l == r) return l;
int mid = (l + r) >> 1;
bpf_printk("%d", mid);
key.backend_slot = mid;
backend_slot = lookup_lb4_backend_slot(&key);
if (!backend_slot)
return -ENOENT;
if(backend_slot->weight_range_upper == random_point) {
if(backend_slot->possibility > 0) return mid;
// We can not reach here except setting the item's weight to 0,
// then we have a probability of triggering this
else return -ENOENT;
}
else if(backend_slot->weight_range_upper < random_point) l = mid + 1;
else r = mid;
}
return -ENOENT; // infinite loop
}
24 changes: 13 additions & 11 deletions eBPF_Supermarket/TrafficManager/bpf/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ func (k *Service4Key) String() string {
addr += "/i"
}
if k.BackendSlot != 0 {
addr += " slot:" + strconv.Itoa(int(k.BackendSlot))
addr += " slot: " + strconv.Itoa(int(k.BackendSlot))
}
return addr
}
Expand All @@ -91,19 +91,20 @@ type ActionAttribute struct {
}

type Service4Value struct {
BackendID Backend4Key `align:"backend_id"`
Count uint16 `align:"count"`
Possibility uint16 `align:"possibility"`
Action Action `align:"action"`
Pad pad2uint8 `align:"pad"`
BackendID Backend4Key `align:"backend_id"`
Count uint16 `align:"count"`
Possibility uint16 `align:"possibility"`
Action Action `align:"action"`
WeightRangeUpper uint16 `align:"weight_range_upper"`
}

func NewService4Value(backendId Backend4Key, count uint16, possibility Possibility, action Action) *Service4Value {
value := Service4Value{
BackendID: backendId,
Count: count,
Possibility: uint16(possibility.percentage * maxPossibilityUnit),
Action: action,
BackendID: backendId,
Count: count,
Possibility: uint16(possibility.percentage * maxPossibilityUnit),
Action: action,
WeightRangeUpper: uint16(possibility.currentPercentageRangeUpper * maxPossibilityUnit),
}

return &value
Expand Down Expand Up @@ -160,7 +161,8 @@ func (v *Backend4Value) ToNetwork() *Backend4Value {
}

type Possibility struct {
percentage float64
percentage float64
currentPercentageRangeUpper float64
}

// func tes() {
Expand Down

0 comments on commit c8ddc0e

Please sign in to comment.