From bc04bcff7d8563a7bd19416db9ea059b62e09491 Mon Sep 17 00:00:00 2001 From: Joa Ebert Date: Wed, 28 Nov 2018 22:59:29 +0100 Subject: [PATCH] repo: Initial commit --- Dockerfile | 18 +++++++ README.md | 120 +++++++++++++++++++++++++++++++++++++++++++++ broadcast.go | 63 ++++++++++++++++++++++++ go.mod | 26 ++++++++++ go.sum | 69 ++++++++++++++++++++++++++ lb.go | 60 +++++++++++++++++++++++ main.go | 136 +++++++++++++++++++++++++++++++++++++++++++++++++++ watch.go | 117 ++++++++++++++++++++++++++++++++++++++++++++ 8 files changed, 609 insertions(+) create mode 100644 Dockerfile create mode 100644 README.md create mode 100644 broadcast.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 lb.go create mode 100644 main.go create mode 100644 watch.go diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..d320778 --- /dev/null +++ b/Dockerfile @@ -0,0 +1,18 @@ +FROM golang:alpine as builder + +ENV GO111MODULE on +RUN apk update && apk add git && apk add ca-certificates +RUN adduser -D -g '' unprivileged +COPY . $GOPATH/src/github.com/joa/jawlb/ +WORKDIR $GOPATH/src/github.com/joa/jawlb/ +RUN go get -d -v +RUN go generate main.go +RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build -a -installsuffix cgo -ldflags="-w -s" -o /go/bin/app + +FROM scratch +COPY --from=builder /etc/ssl/certs/ca-certificates.crt /etc/ssl/certs/ +COPY --from=builder /etc/passwd /etc/passwd +COPY --from=builder /go/bin/app /go/bin/app +USER unprivileged +EXPOSE 8000 +ENTRYPOINT ["/go/bin/app"] diff --git a/README.md b/README.md new file mode 100644 index 0000000..a0ada47 --- /dev/null +++ b/README.md @@ -0,0 +1,120 @@ +### Jawlb +Jawlb (pronounced jolp) is an unsophisticated grpclb implementation for things running in Kubernetes talking to gRPC +services within that same Kubernetes cluster. + +This load balancer performs service discovery via the Kubernetes API and announces any changes it sees +via the grpclb protocol to its clients. + +### Building +You'll want to push the result of `docker build -f Dockerfile .` into your registry. + +### grpclb +The grpclb Go implementation (others maybe too) requires you to define a SRV record for the loadbalancer. +The easy solution is to define a Kubernetes service with a port named `grpclb` and you're all set. + +### Example +The example shows a load balancer setup that'll forward requests to `myservice:grpc`. +We assume that RBAC isn't required. + +#### Load Balancer Deployment +```yaml +apiVersion: apps/v1 +kind: Deployment +metadata: + name: myservice-lb +spec: + selector: + matchLabels: + app: jawlb + service: myservice + replicas: 1 + template: + metadata: + labels: + app: jawlb + service: myservice + spec: + restartPolicy: Always + containers: + - image: "your.regist.ry/jawlb:yolo" + name: jawlb + ports: + - containerPort: 8000 + name: grpclb + env: + # The name of the upstream service we want + # to balance + - name: JAWLB_SERVICE + value: "myservice" + # The name of the port exposed by this service + # which we want to forward to + - name: JAWLB_TARGETPORT + value: "grpc" + # The namespace in which jawlb performs the lookup + # and we use the current one simply + - name: JAWLB_NAMESPACE + valueFrom: + fieldRef: + fieldPath: metadata.namespace +``` + +#### Load Balancer Service +```yaml +apiVersion: v1 +kind: Service +metadata: + name: myservice-lb + labels: + app: jawlb + service: myservice +spec: + # Make this a headless service because we'll have to use + # the dns:// resolver in the Go client + clusterIP: None + ports: + # The port MUST be named grpclb in order to create + # the proper DNS SRV entry + - name: grpclb + port: 8000 + targetPort: grpclb + selector: + app: jawlb + service: myservice +``` + +#### gRPC Client +```go +import _ "google.golang.org/grpc/balancer/grpclb" + +// When dialing, gRPC's DNS resolver will issue a SRV lookup and +// because we're so nice to provide the grpclb entry, everything +// works as expected + +conn, err := grpc.Dial( + "dns:///myservice-lb:8000", // must use the dns resolver and port exposed at the balancer + grpc.WithBalancerName("grpclb"), // select the grpclb balancer strat + grpc.WithInsecure()) + +// ... magic 🧙‍♀️ +``` + +### Configuration +Everything is passed via environment variables. + +- `JAWLB_NAMESPACE` in which namespace service lookup is performed, default `"default"` +- `JAWLB_SERVICE` the name of the Kubernetes service to balance, required +- `JAWLB_TARGETPORT` the name(!) of the target port on that service, default `"grpc"` +- `JAWLB_LABELSELECTOR` an additional label selector, default `""` +- `JAWLB_HOST` the hostname to listen on, default`""` +- `JAWLB_PORT` port to listen on, default `8000` +- `JAWLB_SHUTDOWNGRACEPERIOD` Grace period for open connections during shutdown, default `"25s"` + +### What's missing +- Potentially some actual load balancing +- Implementation of [LoadReporter](https://github.com/grpc/grpc/blob/master/src/proto/grpc/lb/v1/load_reporter.proto) service +- Readiness Probe +- Health Check + +### Resources +- [grpclb spec](https://github.com/grpc/grpc/tree/master/src/proto/grpc/lb/v1) +- [Load Balancing in gRPC](https://github.com/grpc/grpc/blob/master/doc/load-balancing.md) diff --git a/broadcast.go b/broadcast.go new file mode 100644 index 0000000..6a85eca --- /dev/null +++ b/broadcast.go @@ -0,0 +1,63 @@ +package main + +import ( + "context" +) + +type Listener chan<- ServerList + +type broadcast struct { + ctx context.Context + src <-chan ServerList + state ServerList + tgts map[Listener]bool + add chan Listener + rem chan Listener +} + +func newBroadcast(ctx context.Context, src <-chan ServerList) *broadcast { + b := &broadcast{ + ctx: ctx, + src: src, + tgts: make(map[Listener]bool), + add: make(chan Listener), + rem: make(chan Listener), + } + + go b.run() + + return b +} + +func (b *broadcast) addListener(listener Listener) { + b.add <- listener + + // send initial state if present once on registration + if len(b.state) > 0 { + go func() { listener <- b.state }() + } +} + +func (b *broadcast) remListener(listener Listener) { + b.rem <- listener +} + +func (b *broadcast) run() { + for { + select { + case <-b.ctx.Done(): + close(b.add) + close(b.rem) + return + case l := <-b.add: + b.tgts[l] = true + case l := <-b.rem: + delete(b.tgts, l) + case state := <-b.src: + b.state = state + for tgt := range b.tgts { + go func() { tgt <- state }() + } + } + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..8847d52 --- /dev/null +++ b/go.mod @@ -0,0 +1,26 @@ +module github.com/joa/jawlb + +require ( + github.com/gogo/protobuf v1.1.1 // indirect + github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c // indirect + github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf // indirect + github.com/googleapis/gnostic v0.2.0 // indirect + github.com/gregjones/httpcache v0.0.0-20181110185634-c63ab54fda8f // indirect + github.com/imdario/mergo v0.3.6 // indirect + github.com/json-iterator/go v1.1.5 // indirect + github.com/kelseyhightower/envconfig v1.3.0 + github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect + github.com/modern-go/reflect2 v1.0.1 // indirect + github.com/peterbourgon/diskv v2.0.1+incompatible // indirect + github.com/spf13/pflag v1.0.3 // indirect + golang.org/x/crypto v0.0.0-20181127143415-eb0de9b17e85 // indirect + golang.org/x/time v0.0.0-20181108054448-85acf8d2951c // indirect + google.golang.org/grpc v1.16.0 + gopkg.in/inf.v0 v0.9.1 // indirect + gopkg.in/yaml.v2 v2.2.1 // indirect + k8s.io/api v0.0.0-20181121071145-b7bd5f2d334c + k8s.io/apimachinery v0.0.0-20181126191516-4a9a8137c0a1 + k8s.io/client-go v9.0.0+incompatible + k8s.io/klog v0.1.0 // indirect + sigs.k8s.io/yaml v1.1.0 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..223c401 --- /dev/null +++ b/go.sum @@ -0,0 +1,69 @@ +cloud.google.com/go v0.26.0/go.mod h1:aQUYkXzVsufM+DwF1aE+0xfcU+56JwCaLick0ClmMTw= +github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= +github.com/gogo/protobuf v1.1.1 h1:72R+M5VuhED/KujmZVcIquuo8mBgX4oVda//DQb3PXo= +github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b h1:VKtxabqXZkF25pY9ekfRL6a582T4P37/31XEstQ5p58= +github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q= +github.com/golang/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:tluoj9z5200jBnyusfRPU2LqT6J+DAorxEvtC7LHB+E= +github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A= +github.com/golang/protobuf v1.2.0 h1:P3YflyNX/ehuJFLhxviNdFxQPkGK5cDcApsge1SqnvM= +github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U= +github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c h1:964Od4U6p2jUkFxvCydnIczKteheJEzHRToSGK3Bnlw= +github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= +github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf h1:+RRA9JqSOZFfKrOeqr2z77+8R2RKyh8PG66dcu1V0ck= +github.com/google/gofuzz v0.0.0-20170612174753-24818f796faf/go.mod h1:HP5RmnzzSNb993RKQDq4+1A4ia9nllfqcQFTQJedwGI= +github.com/googleapis/gnostic v0.2.0 h1:l6N3VoaVzTncYYW+9yOz2LJJammFZGBO13sqgEhpy9g= +github.com/googleapis/gnostic v0.2.0/go.mod h1:sJBsCZ4ayReDTBIg8b9dl28c5xFWyhBTVRp3pOg5EKY= +github.com/gregjones/httpcache v0.0.0-20181110185634-c63ab54fda8f h1:ShTPMJQes6tubcjzGMODIVG5hlrCeImaBnZzKF2N8SM= +github.com/gregjones/httpcache v0.0.0-20181110185634-c63ab54fda8f/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA= +github.com/imdario/mergo v0.3.6 h1:xTNEAn+kxVO7dTZGu0CegyqKZmoWFI0rF8UxjlB2d28= +github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= +github.com/json-iterator/go v1.1.5 h1:gL2yXlmiIo4+t+y32d4WGwOjKGYcGOuyrg46vadswDE= +github.com/json-iterator/go v1.1.5/go.mod h1:+SdeFBvtyEkXs7REEP0seUULqWtbJapLOCVDaaPEHmU= +github.com/kelseyhightower/envconfig v1.3.0 h1:IvRS4f2VcIQy6j4ORGIf9145T/AsUB+oY8LyvN8BXNM= +github.com/kelseyhightower/envconfig v1.3.0/go.mod h1:cccZRl6mQpaq41TPp5QxidR+Sa3axMbJDNb//FQX6Gg= +github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd h1:TRLaZ9cD/w8PVh93nsPXa1VrQ6jlwL5oN8l14QlcNfg= +github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= +github.com/modern-go/reflect2 v1.0.1 h1:9f412s+6RmYXLWZSEzVVgPGK7C2PphHj5RJrvfx9AWI= +github.com/modern-go/reflect2 v1.0.1/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= +github.com/peterbourgon/diskv v2.0.1+incompatible h1:UBdAOUP5p4RWqPBg048CAvpKN+vxiaj6gdUUzhl4XmI= +github.com/peterbourgon/diskv v2.0.1+incompatible/go.mod h1:uqqh8zWWbv1HBMNONnaR/tNboyR3/BZd58JJSHlUSCU= +github.com/spf13/pflag v1.0.3 h1:zPAT6CGy6wXeQ7NtTnaTerfKOsV6V6F8agHXFiazDkg= +github.com/spf13/pflag v1.0.3/go.mod h1:DYY7MBk1bdzusC3SYhjObp+wFpr4gzcvqqNjLnInEg4= +golang.org/x/crypto v0.0.0-20181127143415-eb0de9b17e85 h1:et7+NAX3lLIk5qUCTA9QelBjGE/NkhzYw/mhnr0s7nI= +golang.org/x/crypto v0.0.0-20181127143415-eb0de9b17e85/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= +golang.org/x/lint v0.0.0-20180702182130-06c8688daad7/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d h1:g9qWBGx4puODJTMVyoPrpoxPFgVGd+z1DZwjfRu4d0I= +golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be h1:vEDujvNQGv4jgYKudGeI/+DAX4Jffq6hpD55MmoEvKs= +golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= +golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522 h1:Ve1ORMCxvRmSXBwJK+t3Oy+V2vRW2OetUQBq4rJIkZE= +golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= +golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= +golang.org/x/time v0.0.0-20181108054448-85acf8d2951c h1:fqgJT0MGcGpPgpWU7VRdRjuArfcOvC4AoJmILihzhDg= +golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= +golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= +google.golang.org/appengine v1.1.0/go.mod h1:EbEs0AVv82hx2wNQdGPgUI5lhzA/G0D9YwlJXL52JkM= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0NQvRW8DG4Yk3Q6T9cu9RcFQDu1tc= +google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= +google.golang.org/grpc v1.16.0 h1:dz5IJGuC2BB7qXR5AyHNwAUBhZscK2xVez7mznh72sY= +google.golang.org/grpc v1.16.0/go.mod h1:0JHn/cJsOMiMfNA9+DeHDlAU7KAAB5GDlYFpa9MZMio= +gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= +gopkg.in/inf.v0 v0.9.1 h1:73M5CoZyi3ZLMOyDlQh031Cx6N9NDJ2Vvfl76EDAgDc= +gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw= +gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE= +gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= +honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= +k8s.io/api v0.0.0-20181121071145-b7bd5f2d334c h1:aSW17ws1n3Y/gxcAggEFSs+UJlzpE3+stTPLQSiVEno= +k8s.io/api v0.0.0-20181121071145-b7bd5f2d334c/go.mod h1:iuAfoD4hCxJ8Onx9kaTIt30j7jUFS00AXQi6QMi99vA= +k8s.io/apimachinery v0.0.0-20181126191516-4a9a8137c0a1 h1:u/v3rSGNjiTxclqUNHYgSrCIotyczPebwV1FPXtdKRQ= +k8s.io/apimachinery v0.0.0-20181126191516-4a9a8137c0a1/go.mod h1:ccL7Eh7zubPUSh9A3USN90/OzHNSVN6zxzde07TDCL0= +k8s.io/client-go v9.0.0+incompatible h1:2kqW3X2xQ9SbFvWZjGEHBLlWc1LG9JIJNXWkuqwdZ3A= +k8s.io/client-go v9.0.0+incompatible/go.mod h1:7vJpHMYJwNQCWgzmNV+VYUl1zCObLyodBc8nIyt8L5s= +k8s.io/klog v0.1.0 h1:I5HMfc/DtuVaGR1KPwUrTc476K8NCqNBldC7H4dYEzk= +k8s.io/klog v0.1.0/go.mod h1:Gq+BEi5rUBO/HRz0bTSXDUcqjScdoY3a9IHpCEIOOfk= +sigs.k8s.io/yaml v1.1.0 h1:4A07+ZFc2wgJwo8YNlQpr1rVlgUDlxXHhPJciaPY5gs= +sigs.k8s.io/yaml v1.1.0/go.mod h1:UJmg0vDUVViEyp3mgSv9WPwZCDxu4rQW1olrI1uml+o= diff --git a/lb.go b/lb.go new file mode 100644 index 0000000..2fb877b --- /dev/null +++ b/lb.go @@ -0,0 +1,60 @@ +package main + +import ( + grpclb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/status" +) + +type lb struct { + b *broadcast +} + +func (l *lb) BalanceLoad(req grpclb.LoadBalancer_BalanceLoadServer) error { + if in, err := req.Recv(); err != nil { + return err + } else if init := in.GetInitialRequest(); init != nil { + err := req.Send(&grpclb.LoadBalanceResponse{ + LoadBalanceResponseType: &grpclb.LoadBalanceResponse_InitialResponse{ + InitialResponse: &grpclb.InitialLoadBalanceResponse{}, + }, + }) + + if err != nil { + return err + } + } else { + return status.Error(codes.InvalidArgument, "expected initial request") + } + + ch := make(chan ServerList) + defer close(ch) + + l.b.addListener(ch) + defer l.b.remListener(ch) + + for { + select { + case <-req.Context().Done(): + return nil + case msg := <-ch: + var servers []*grpclb.Server + + for _, server := range msg { + servers = append(servers, &grpclb.Server{IpAddress: server.IP, Port: server.Port}) + } + + err := req.Send(&grpclb.LoadBalanceResponse{ + LoadBalanceResponseType: &grpclb.LoadBalanceResponse_ServerList{ + ServerList: &grpclb.ServerList{ + Servers: servers, + }, + }, + }) + + if err != nil { + return err + } + } + } +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..d06fc63 --- /dev/null +++ b/main.go @@ -0,0 +1,136 @@ +package main + +import ( + "context" + "log" + "net" + "os" + "os/signal" + "strconv" + "syscall" + "time" + + "github.com/kelseyhightower/envconfig" + "google.golang.org/grpc" + grpclb "google.golang.org/grpc/balancer/grpclb/grpc_lb_v1" +) + +var cfg = struct { + Host string `default:"" desc:"Hostname to listen on"` + Port int `default:"8000" desc:"Port of the grpclb server"` + ShutdownGracePeriod time.Duration `default:"25s" desc:"Duration of graceful shutdown period"` // during this time, we try to answer open reqs but won't accept new ones + + Namespace string `default:"default" desc:"Kubernetes namespace in which to operate; empty for all namespaces"` + Service string `desc:"Name of the service in Kubernetes" required:"true"` + LabelSelector string `desc:"Label selector for the service (foo=bar,baz=bang)"` + TargetPort string `default:"grpc" desc:"Target port name to forward to"` +}{} + +func main() { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + configure() + + ch, err := watchService(ctx) + + if err != nil { + log.Panic(err) + } + + bc := newBroadcast(ctx, ch) + + srv := startServer(bc) + + logChanges(ctx, bc) + + log.Println("waiting for TERM") + awaitTerm() + + awaitShutdown(srv) + log.Println("bye") +} + +func logChanges(ctx context.Context, bc *broadcast) { + ch := make(chan ServerList) + bc.addListener(ch) + + go func() { + for { + select { + case <-ctx.Done(): + bc.remListener(ch) + close(ch) + return + case msg := <-ch: + log.Print("endpoints:") + for _, server := range msg { + log.Printf("\t%s:%d", server.IP, server.Port) + } + } + } + }() +} + +func configure() { + envconfig.MustProcess("JAWLB", &cfg) + if len(os.Args) > 1 && os.Args[1] == "help" { + if err := envconfig.Usage("JAWLB", &cfg); err != nil { + log.Panic(err) + } + } +} + +func startServer(bc *broadcast) *grpc.Server { + // setup listening socket + conn, err := listen() + + if err != nil { + log.Panic(err) + } + + srv := grpc.NewServer() + grpclb.RegisterLoadBalancerServer(srv, &lb{bc}) + + go func() { + if err := srv.Serve(conn); err != nil { + log.Println("grpc connection closed", err) + } + }() + + return srv +} + +func listen() (conn net.Listener, err error) { + addr := net.JoinHostPort(cfg.Host, strconv.Itoa(cfg.Port)) + conn, err = net.Listen("tcp", addr) + return +} + +func awaitTerm() { + sig := make(chan os.Signal, 1) + signal.Notify(sig, + syscall.SIGTERM, + syscall.SIGHUP, + syscall.SIGINT, + syscall.SIGQUIT) + <-sig +} + +func awaitShutdown(server *grpc.Server) { + log.Println("performing graceful shutdown") + + done := make(chan bool) + + go func() { + server.GracefulStop() + done <- true + }() + + select { + case <-time.After(cfg.ShutdownGracePeriod): + log.Println("graceful shutdown failed -- hard stop") + server.Stop() + case <-done: + } +} diff --git a/watch.go b/watch.go new file mode 100644 index 0000000..d5dfe1b --- /dev/null +++ b/watch.go @@ -0,0 +1,117 @@ +package main + +import ( + "context" + "net" + + "k8s.io/api/core/v1" + meta_v1 "k8s.io/apimachinery/pkg/apis/meta/v1" + "k8s.io/client-go/kubernetes" + "k8s.io/client-go/rest" + "k8s.io/client-go/tools/clientcmd" +) + +type Server struct { + IP net.IP + Port int32 +} + +type ServerList []Server + +func watchService(ctx context.Context) (_ <-chan ServerList, err error) { + icc, err := getConfig() + + if err != nil { + return + } + + client, err := kubernetes.NewForConfig(icc) + + if err != nil { + return + } + + ep, err := client.CoreV1().Endpoints(cfg.Namespace).Watch(meta_v1.ListOptions{ + LabelSelector: cfg.LabelSelector, + Watch: true, + }) + + if err != nil { + return + } + + ch := make(chan ServerList) + + go func() { + for { + select { + case <-ctx.Done(): + close(ch) + return + case res := <-ep.ResultChan(): + { + endpoint, ok := res.Object.(*v1.Endpoints) + + if !ok { + continue + } + + if cfg.Namespace != "" && endpoint.Namespace != cfg.Namespace { + continue + } + + if endpoint.Name != cfg.Service { + continue + } + + var ips []string + var ports []int32 + + for _, subset := range endpoint.Subsets { + for _, addr := range subset.Addresses { + ips = append(ips, addr.IP) + } + + for _, port := range subset.Ports { + if cfg.TargetPort == "" || port.Name == cfg.TargetPort { + ports = append(ports, port.Port) + } + } + } + + var servers []Server + + for _, addr := range ips { + for _, port := range ports { + servers = append(servers, Server{IP: net.ParseIP(addr), Port: port}) + } + } + + ch <- servers + } + + } + } + }() + + return ch, nil +} + +func getConfig() (cfg *rest.Config, err error) { + cfg, err = rest.InClusterConfig() + + if err != nil { + loader := clientcmd.NewNonInteractiveDeferredLoadingClientConfig( + clientcmd.NewDefaultClientConfigLoadingRules(), + &clientcmd.ConfigOverrides{}, + ) + + cfg, err = loader.ClientConfig() + + if err != nil { + return + } + } + + return +}