Skip to content
This repository has been archived by the owner on Jul 29, 2020. It is now read-only.

Commit

Permalink
broadcast: Filter equal messages and serialize messages
Browse files Browse the repository at this point in the history
Using go functions within the broadcast itself is a rather bad
idea s the state of the broadcast might change while a go func
is being scheduled. As there is no guarantee on the order when
a go func is being scheduled a client may observe a new state
before an old state which would result in an inconsistent view
of the current ServerList.

The new channel filter ensures that no two same lists are
broadcasted. This happens quite often as Kubernetes may push
changes that do not modify the effective server list. In that
case all clients would reveive a state they already know. This
isn't necessary and pollutes only the logs.
  • Loading branch information
joa committed Mar 25, 2019
1 parent b7c67ea commit 21ac19e
Show file tree
Hide file tree
Showing 4 changed files with 141 additions and 4 deletions.
7 changes: 3 additions & 4 deletions broadcast.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ type broadcast struct {
func newBroadcast(ctx context.Context, src <-chan ServerList) *broadcast {
b := &broadcast{
ctx: ctx,
src: src,
src: newServerListFilter(ctx, src),
tgts: make(map[Listener]bool),
add: make(chan Listener),
rem: make(chan Listener),
Expand Down Expand Up @@ -49,15 +49,14 @@ func (b *broadcast) run() {

// send initial state if present once on registration
if len(b.state) > 0 {
go func() { l <- b.state }()
l <- b.state
}
case l := <-b.rem:
delete(b.tgts, l)
case state := <-b.src:
b.state = state
for tgt := range b.tgts {
tgt := tgt
go func() { tgt <- state }()
tgt <- state
}
}
}
Expand Down
90 changes: 90 additions & 0 deletions broadcast_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package main

import (
"context"
"net"
"testing"
"time"
)

func TestBroadcast(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

src := make(chan ServerList)
broadcast := newBroadcast(ctx, src)
listener := make(chan ServerList)

// adding a listener should not cause trouble and do not trigger the listener
// as long as the broadcast never received a server list
addDummyListeners(ctx, broadcast)
broadcast.addListener(listener)
addDummyListeners(ctx, broadcast)
assertNoDataReceived(listener, t)

// broadcasting should work
srv := Server{net.IP{127, 0, 0, 1}, 1234}
src <- ServerList{srv}
assertDataReceived(listener, t, srv)

// broadcasting the same data twice shouldn't happen
src <- ServerList{srv}
assertNoDataReceived(listener, t)

// broadcasting different data should work
srv = Server{net.IP{127, 0, 0, 1}, 5678}
src <- ServerList{srv}
assertDataReceived(listener, t, srv)

// removing a listener should mean we won't receive further updates
broadcast.remListener(listener)
src <- ServerList{srv}
assertNoDataReceived(listener, t)

// adding the listener should result in known state being sent
broadcast.addListener(listener)
assertDataReceived(listener, t, srv)
}

func addDummyListeners(ctx context.Context, b *broadcast) {
for i := 0; i < 10; i++ {
l := make(chan ServerList)
go func() {
for {
select {
case <-ctx.Done():
return
case <-l:
}
}
}()
b.addListener(l)
}
}

func assertDataReceived(listener chan ServerList, t *testing.T, expected Server) {
select {
case list := <-listener:
if len(list) != 1 {
t.Errorf("expected server list with len 1, got %d", len(list))
}

if list[0].IP.String() != expected.IP.String() {
t.Errorf("expected ip %s, got %s", expected.IP, list[0].IP)
}

if list[0].Port != expected.Port {
t.Errorf("expected port %d, got %d", expected.Port, list[0].Port)
}
case <-time.After(100 * time.Millisecond):
t.Error("expected listener to receive data")
}
}

func assertNoDataReceived(listener chan ServerList, t *testing.T) {
select {
case <-listener:
t.Error("listener received data but shouldn't")
case <-time.After(100 * time.Millisecond):
}
}
44 changes: 44 additions & 0 deletions filter.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
package main

import "context"

func newServerListFilter(ctx context.Context, in <-chan ServerList) <-chan ServerList {
out := make(chan ServerList)
go doFilter(ctx, in, out)
return out
}

func doFilter(ctx context.Context, in <-chan ServerList, out chan ServerList) {
var prev ServerList

looping:
for {
select {
case <-ctx.Done():
return
case next := <-in:
if len(prev) == len(next) {
equal := true

// make a sacrifice for the ssa range check elimination gods
n := len(prev)
_ = prev[n-1]
_ = next[n-1]

for i := range prev {
if !prev[i].Equal(next[i]) {
equal = false
break
}
}

if equal {
continue looping
}
}

prev = next
out <- next
}
}
}
4 changes: 4 additions & 0 deletions watch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@ type Server struct {
Port int32
}

func (s Server) Equal(x Server) bool {
return s.Port == x.Port && s.IP.Equal(x.IP)
}

type ServerList []Server

func watchService(ctx context.Context) (_ <-chan ServerList, err error) {
Expand Down

0 comments on commit 21ac19e

Please sign in to comment.