Skip to content
This repository has been archived by the owner on Jul 29, 2020. It is now read-only.

Commit

Permalink
Merge pull request #1 from Applifier/feature/max-servers
Browse files Browse the repository at this point in the history
add MaxServers setting
  • Loading branch information
eafzali authored Jul 18, 2019
2 parents 9344880 + 3fc66a5 commit ca69eb8
Show file tree
Hide file tree
Showing 3 changed files with 21 additions and 13 deletions.
3 changes: 3 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ spec:
initialDelaySeconds: 15
periodSeconds: 60
env:
# Maximum number of servers to return in response, # default is 0 and it means unlimited
- name: JAWLB_MAXSERVERS
value: "5"
# The name of the upstream service we want
# to balance
- name: JAWLB_SERVICE
Expand Down
28 changes: 16 additions & 12 deletions lb.go
Original file line number Diff line number Diff line change
@@ -1,15 +1,17 @@
package main

import (
"github.com/joa/jawlb/internal/atomic"
"math/rand"
"time"

grpclb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)

type lb struct {
b *broadcast
rr int64
b *broadcast
max int
}

func (l *lb) BalanceLoad(req grpclb.LoadBalancer_BalanceLoadServer) error {
Expand All @@ -35,14 +37,12 @@ func (l *lb) BalanceLoad(req grpclb.LoadBalancer_BalanceLoadServer) error {
l.b.addListener(ch)
defer l.b.remListener(ch)

offset := int(atomic.IncWrapInt64(&l.rr))

for {
select {
case <-req.Context().Done():
return nil
case msg := <-ch:
servers := convertServerList(msg, offset)
servers := convertServerList(msg, l.max)

err := req.Send(&grpclb.LoadBalanceResponse{
LoadBalanceResponseType: &grpclb.LoadBalanceResponse_ServerList{
Expand All @@ -59,14 +59,18 @@ func (l *lb) BalanceLoad(req grpclb.LoadBalancer_BalanceLoadServer) error {
}
}

func convertServerList(l ServerList, offset int) []*grpclb.Server {
var servers []*grpclb.Server

func convertServerList(l ServerList, max int) []*grpclb.Server {
n := len(l)
if max > n || max == 0 {
max = n
}
servers := make([]*grpclb.Server, max)

for i := 0; i < n; i++ {
server := l[(i+offset)%n]
servers = append(servers, convertServer(server))
rand.Seed(time.Now().UnixNano())
for i := 0; i < max; i++ {
j := rand.Intn(n-i) + i
l[i], l[j] = l[j], l[i]
servers[i] = convertServer(l[i])
}

return servers
Expand Down
3 changes: 2 additions & 1 deletion main.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ var cfg = struct {
Service string `desc:"Name of the service in Kubernetes" required:"true"`
LabelSelector string `desc:"Label selector for the service (foo=bar,baz=bang)"`
TargetPort string `default:"grpc" desc:"Target port name to forward to"`
MaxServers int `default:"0" desc:"Maximum number of servers to return in response, 0 means unlimited"`

WatchMaxRetries int `default:"60" desc:"Number of times to retry establishing the Kubernetess watch"`
WatchRetryDelay time.Duration `default:"1s" desc:"Delay between retries"`
Expand Down Expand Up @@ -94,7 +95,7 @@ func startServer(bc *broadcast) *grpc.Server {
}

srv := grpc.NewServer()
grpclb.RegisterLoadBalancerServer(srv, &lb{bc, 0})
grpclb.RegisterLoadBalancerServer(srv, &lb{bc, cfg.MaxServers})

go func() {
if err := srv.Serve(conn); err != nil {
Expand Down

0 comments on commit ca69eb8

Please sign in to comment.