Skip to content

Commit

Permalink
Add BatchIO and multiple ports options to UDPMux
Browse files Browse the repository at this point in the history
Add BatchIO and multiple ports options to
NewMultiUDPMuxPort(s)
  • Loading branch information
cnderrauber committed Aug 29, 2023
1 parent 0ec2333 commit 5113c69
Show file tree
Hide file tree
Showing 6 changed files with 198 additions and 64 deletions.
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ require (
github.com/pion/mdns v0.0.7
github.com/pion/randutil v0.1.0
github.com/pion/stun v0.6.1
github.com/pion/transport/v2 v2.2.1
github.com/pion/transport/v2 v2.2.2-0.20230829043045-6a34769ff4b0
github.com/pion/turn/v2 v2.1.3
github.com/stretchr/testify v1.8.4
golang.org/x/net v0.13.0
Expand Down
3 changes: 2 additions & 1 deletion go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,9 @@ github.com/pion/randutil v0.1.0/go.mod h1:XcJrSMMbbMRhASFVOlj/5hQial/Y8oH/HVo7TB
github.com/pion/stun v0.6.1 h1:8lp6YejULeHBF8NmV8e2787BogQhduZugh5PdhDyyN4=
github.com/pion/stun v0.6.1/go.mod h1:/hO7APkX4hZKu/D0f2lHzNyvdkTGtIy3NDmLR7kSz/8=
github.com/pion/transport/v2 v2.0.0/go.mod h1:HS2MEBJTwD+1ZI2eSXSvHJx/HnzQqRy2/LXxt6eVMHc=
github.com/pion/transport/v2 v2.2.1 h1:7qYnCBlpgSJNYMbLCKuSY9KbQdBFoETvPNETv0y4N7c=
github.com/pion/transport/v2 v2.2.1/go.mod h1:cXXWavvCnFF6McHTft3DWS9iic2Mftcz1Aq29pGcU5g=
github.com/pion/transport/v2 v2.2.2-0.20230829043045-6a34769ff4b0 h1:7z51t0GDPVHvR8KTnVfUGgeE0KqZvc9o5J3UMVMrykY=
github.com/pion/transport/v2 v2.2.2-0.20230829043045-6a34769ff4b0/go.mod h1:OJg3ojoBJopjEeECq2yJdXH9YVrUJ1uQ++NjXLOUorc=
github.com/pion/turn/v2 v2.1.3 h1:pYxTVWG2gpC97opdRc5IGsQ1lJ9O/IlNhkzj7MMrGAA=
github.com/pion/turn/v2 v2.1.3/go.mod h1:huEpByKKHix2/b9kmTAM3YoX6MKP+/D//0ClgUYR2fY=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
Expand Down
7 changes: 7 additions & 0 deletions udp_mux.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ type UDPMux interface {
GetConn(ufrag string, addr net.Addr) (net.PacketConn, error)
RemoveConnByUfrag(ufrag string)
GetListenAddresses() []net.Addr
ConnCount() int
}

// UDPMuxDefault is an implementation of the interface
Expand Down Expand Up @@ -176,6 +177,12 @@ func (m *UDPMuxDefault) GetConn(ufrag string, addr net.Addr) (net.PacketConn, er
return c, nil
}

func (m *UDPMuxDefault) ConnCount() int {

Check warning on line 180 in udp_mux.go

View workflow job for this annotation

GitHub Actions / lint / Go

exported: exported method UDPMuxDefault.ConnCount should have comment or be unexported (revive)
m.mu.Lock()
defer m.mu.Unlock()
return len(m.connsIPv4) + len(m.connsIPv6)

Check warning on line 183 in udp_mux.go

View check run for this annotation

Codecov / codecov/patch

udp_mux.go#L180-L183

Added lines #L180 - L183 were not covered by tests
}

// RemoveConnByUfrag stops and removes the muxed packet connection
func (m *UDPMuxDefault) RemoveConnByUfrag(ufrag string) {
removedConns := make([]*udpMuxedConn, 0, 2)
Expand Down
130 changes: 105 additions & 25 deletions udp_mux_multi.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ package ice
import (
"fmt"
"net"
"time"

"github.com/pion/logging"
"github.com/pion/transport/v2"
"github.com/pion/transport/v2/stdnet"
tudp "github.com/pion/transport/v2/udp"
)

// MultiUDPMuxDefault implements both UDPMux and AllConnsGetter,
Expand All @@ -18,20 +20,63 @@ import (
type MultiUDPMuxDefault struct {
muxes []UDPMux
localAddrToMux map[string]UDPMux

// Manage port balance for mux that listen on multiple ports for same IP,
// for each IP, only return one addr (one port) for each GetListenAddresses call to
// avoid duplicate ip candidates be gathered for a single ice agent.
multiPortsAddresses []*multiPortsAddress
}

type addrMux struct {
addr net.Addr
mux UDPMux
}

// each multiPortsAddress represents muxes listen on different ports of a same IP
type multiPortsAddress struct {
addresseMuxes []*addrMux
}

func (mpa *multiPortsAddress) next() net.Addr {
leastAddr, leastConns := mpa.addresseMuxes[0].addr, mpa.addresseMuxes[0].mux.ConnCount()
for i := 1; i < len(mpa.addresseMuxes); i++ {
am := mpa.addresseMuxes[i]
if count := am.mux.ConnCount(); count < leastConns {
leastConns = count
leastAddr = am.addr
}

Check warning on line 47 in udp_mux_multi.go

View check run for this annotation

Codecov / codecov/patch

udp_mux_multi.go#L40-L47

Added lines #L40 - L47 were not covered by tests
}
return leastAddr

Check warning on line 49 in udp_mux_multi.go

View check run for this annotation

Codecov / codecov/patch

udp_mux_multi.go#L49

Added line #L49 was not covered by tests
}

// NewMultiUDPMuxDefault creates an instance of MultiUDPMuxDefault that
// uses the provided UDPMux instances.
func NewMultiUDPMuxDefault(muxes ...UDPMux) *MultiUDPMuxDefault {
addrToMux := make(map[string]UDPMux)
ipToAddrs := make(map[string]*multiPortsAddress)

Check warning on line 56 in udp_mux_multi.go

View check run for this annotation

Codecov / codecov/patch

udp_mux_multi.go#L56

Added line #L56 was not covered by tests
for _, mux := range muxes {
for _, addr := range mux.GetListenAddresses() {
addrToMux[addr.String()] = mux

ip := addr.(*net.UDPAddr).IP.String()

Check failure on line 61 in udp_mux_multi.go

View workflow job for this annotation

GitHub Actions / lint / Go

type assertion must be checked (forcetypeassert)
if mpa, ok := ipToAddrs[ip]; ok {
mpa.addresseMuxes = append(mpa.addresseMuxes, &addrMux{addr, mux})
} else {
ipToAddrs[ip] = &multiPortsAddress{
addresseMuxes: []*addrMux{{addr, mux}},
}
}

Check warning on line 68 in udp_mux_multi.go

View check run for this annotation

Codecov / codecov/patch

udp_mux_multi.go#L60-L68

Added lines #L60 - L68 were not covered by tests
}
}

multiPortsAddresses := make([]*multiPortsAddress, 0, len(ipToAddrs))
for _, mpa := range ipToAddrs {
multiPortsAddresses = append(multiPortsAddresses, mpa)
}

Check warning on line 75 in udp_mux_multi.go

View check run for this annotation

Codecov / codecov/patch

udp_mux_multi.go#L72-L75

Added lines #L72 - L75 were not covered by tests
return &MultiUDPMuxDefault{
muxes: muxes,
localAddrToMux: addrToMux,
muxes: muxes,
localAddrToMux: addrToMux,
multiPortsAddresses: multiPortsAddresses,

Check warning on line 79 in udp_mux_multi.go

View check run for this annotation

Codecov / codecov/patch

udp_mux_multi.go#L77-L79

Added lines #L77 - L79 were not covered by tests
}
}

Expand All @@ -45,6 +90,15 @@ func (m *MultiUDPMuxDefault) GetConn(ufrag string, addr net.Addr) (net.PacketCon
return mux.GetConn(ufrag, addr)
}

// ConnCount return count of working connections created by the mux.
func (m *MultiUDPMuxDefault) ConnCount() int {
var count int
for _, mux := range m.muxes {
count += mux.ConnCount()
}
return count

Check warning on line 99 in udp_mux_multi.go

View check run for this annotation

Codecov / codecov/patch

udp_mux_multi.go#L94-L99

Added lines #L94 - L99 were not covered by tests
}

// RemoveConnByUfrag stops and removes the muxed packet connection
// from all underlying UDPMux instances.
func (m *MultiUDPMuxDefault) RemoveConnByUfrag(ufrag string) {
Expand All @@ -64,18 +118,24 @@ func (m *MultiUDPMuxDefault) Close() error {
return err
}

// GetListenAddresses returns the list of addresses that this mux is listening on
// GetListenAddresses returns the list of addresses that this mux is listening on,
// if there are multiple mux listen on different ports of a same IP addr, will return
// the mux who has least connections of that IP addr.
func (m *MultiUDPMuxDefault) GetListenAddresses() []net.Addr {
addrs := make([]net.Addr, 0, len(m.localAddrToMux))
for _, mux := range m.muxes {
addrs = append(addrs, mux.GetListenAddresses()...)
addrs := make([]net.Addr, 0, len(m.multiPortsAddresses))
for _, mpa := range m.multiPortsAddresses {
addrs = append(addrs, mpa.next())

Check warning on line 127 in udp_mux_multi.go

View check run for this annotation

Codecov / codecov/patch

udp_mux_multi.go#L125-L127

Added lines #L125 - L127 were not covered by tests
}
return addrs
}

// NewMultiUDPMuxFromPort creates an instance of MultiUDPMuxDefault that
// listen all interfaces on the provided port.
func NewMultiUDPMuxFromPort(port int, opts ...UDPMuxFromPortOption) (*MultiUDPMuxDefault, error) {
return NewMultiUDPMuxFromPorts([]int{port}, opts...)

Check warning on line 135 in udp_mux_multi.go

View check run for this annotation

Codecov / codecov/patch

udp_mux_multi.go#L135

Added line #L135 was not covered by tests
}

func NewMultiUDPMuxFromPorts(ports []int, opts ...UDPMuxFromPortOption) (*MultiUDPMuxDefault, error) {

Check warning on line 138 in udp_mux_multi.go

View workflow job for this annotation

GitHub Actions / lint / Go

exported: exported function NewMultiUDPMuxFromPorts should have comment or be unexported (revive)

Check warning on line 138 in udp_mux_multi.go

View check run for this annotation

Codecov / codecov/patch

udp_mux_multi.go#L138

Added line #L138 was not covered by tests
params := multiUDPMuxFromPortParam{
networks: []NetworkType{NetworkTypeUDP4, NetworkTypeUDP6},
}
Expand All @@ -95,20 +155,29 @@ func NewMultiUDPMuxFromPort(port int, opts ...UDPMuxFromPortOption) (*MultiUDPMu
return nil, err
}

conns := make([]net.PacketConn, 0, len(ips))
conns := make([]net.PacketConn, 0, len(ports)*len(ips))

Check warning on line 158 in udp_mux_multi.go

View check run for this annotation

Codecov / codecov/patch

udp_mux_multi.go#L158

Added line #L158 was not covered by tests
for _, ip := range ips {
conn, listenErr := params.net.ListenUDP("udp", &net.UDPAddr{IP: ip, Port: port})
if listenErr != nil {
err = listenErr
break
}
if params.readBufferSize > 0 {
_ = conn.SetReadBuffer(params.readBufferSize)
for _, port := range ports {
conn, listenErr := params.net.ListenUDP("udp", &net.UDPAddr{IP: ip, Port: port})
if listenErr != nil {
err = listenErr
break

Check warning on line 164 in udp_mux_multi.go

View check run for this annotation

Codecov / codecov/patch

udp_mux_multi.go#L160-L164

Added lines #L160 - L164 were not covered by tests
}
if params.readBufferSize > 0 {
_ = conn.SetReadBuffer(params.readBufferSize)
}
if params.writeBufferSize > 0 {
_ = conn.SetWriteBuffer(params.writeBufferSize)
}
if params.batchWriteSize > 0 {
conns = append(conns, tudp.NewBatchConn(conn, params.batchWriteSize, params.batchWriteInterval))
} else {
conns = append(conns, conn)
}

Check warning on line 176 in udp_mux_multi.go

View check run for this annotation

Codecov / codecov/patch

udp_mux_multi.go#L166-L176

Added lines #L166 - L176 were not covered by tests
}
if params.writeBufferSize > 0 {
_ = conn.SetWriteBuffer(params.writeBufferSize)
if err != nil {
break

Check warning on line 179 in udp_mux_multi.go

View check run for this annotation

Codecov / codecov/patch

udp_mux_multi.go#L178-L179

Added lines #L178 - L179 were not covered by tests
}
conns = append(conns, conn)
}

if err != nil {
Expand Down Expand Up @@ -137,14 +206,16 @@ type UDPMuxFromPortOption interface {
}

type multiUDPMuxFromPortParam struct {
ifFilter func(string) bool
ipFilter func(ip net.IP) bool
networks []NetworkType
readBufferSize int
writeBufferSize int
logger logging.LeveledLogger
includeLoopback bool
net transport.Net
ifFilter func(string) bool
ipFilter func(ip net.IP) bool
networks []NetworkType
readBufferSize int
writeBufferSize int
logger logging.LeveledLogger
includeLoopback bool
net transport.Net
batchWriteSize int
batchWriteInterval time.Duration
}

type udpMuxFromPortOption struct {
Expand Down Expand Up @@ -226,3 +297,12 @@ func UDPMuxFromPortWithNet(n transport.Net) UDPMuxFromPortOption {
},
}
}

func UDPMuxFromPortWithBatchWrite(batchWriteSize int, batchWriteInterval time.Duration) UDPMuxFromPortOption {

Check warning on line 301 in udp_mux_multi.go

View workflow job for this annotation

GitHub Actions / lint / Go

exported: exported function UDPMuxFromPortWithBatchWrite should have comment or be unexported (revive)
return &udpMuxFromPortOption{
f: func(p *multiUDPMuxFromPortParam) {
p.batchWriteSize = batchWriteSize
p.batchWriteInterval = batchWriteInterval
},

Check warning on line 306 in udp_mux_multi.go

View check run for this annotation

Codecov / codecov/patch

udp_mux_multi.go#L301-L306

Added lines #L301 - L306 were not covered by tests
}
}
95 changes: 62 additions & 33 deletions udp_mux_multi_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
package ice

import (
"fmt"
"net"
"strings"
"sync"
Expand Down Expand Up @@ -117,39 +118,67 @@ func TestUnspecifiedUDPMux(t *testing.T) {
lim := test.TimeOut(time.Second * 30)
defer lim.Stop()

muxPort := 7778
udpMuxMulti, err := NewMultiUDPMuxFromPort(muxPort, UDPMuxFromPortWithInterfaceFilter(func(s string) bool {
defaultDockerBridgeNetwork := strings.Contains(s, "docker")
customDockerBridgeNetwork := strings.Contains(s, "br-")
return !defaultDockerBridgeNetwork && !customDockerBridgeNetwork
}))
require.NoError(t, err)

require.GreaterOrEqual(t, len(udpMuxMulti.muxes), 1, "at least have 1 muxes")
defer func() {
_ = udpMuxMulti.Close()
}()

wg := sync.WaitGroup{}

wg.Add(1)
go func() {
defer wg.Done()
testMultiUDPMuxConnections(t, udpMuxMulti, "ufrag1", udp)
}()
wg.Add(1)
go func() {
defer wg.Done()
testMultiUDPMuxConnections(t, udpMuxMulti, "ufrag2", udp4)
}()

// Skip IPv6 test on i386
const ptrSize = 32 << (^uintptr(0) >> 63)
if ptrSize != 32 {
testMultiUDPMuxConnections(t, udpMuxMulti, "ufrag3", udp6)
cases := map[string][]int{
"single port": {7778},
"multi ports": {7779, 7780, 7781},
}

wg.Wait()

require.NoError(t, udpMuxMulti.Close())
for name, ports := range cases {
cname, cports := name, ports
t.Run(cname, func(t *testing.T) {
udpMuxMulti, err := NewMultiUDPMuxFromPorts(cports, UDPMuxFromPortWithInterfaceFilter(func(s string) bool {
defaultDockerBridgeNetwork := strings.Contains(s, "docker")
customDockerBridgeNetwork := strings.Contains(s, "br-")
return !defaultDockerBridgeNetwork && !customDockerBridgeNetwork
}))
require.NoError(t, err)

require.GreaterOrEqual(t, len(udpMuxMulti.muxes), 1, "at least have 1 muxes")
defer func() {
_ = udpMuxMulti.Close()
}()

wg := sync.WaitGroup{}

wg.Add(1)
go func() {
defer wg.Done()
testMultiUDPMuxConnections(t, udpMuxMulti, "ufrag1", udp)
}()
wg.Add(1)
go func() {
defer wg.Done()
testMultiUDPMuxConnections(t, udpMuxMulti, "ufrag2", udp4)
}()

// Skip IPv6 test on i386
const ptrSize = 32 << (^uintptr(0) >> 63)
if ptrSize != 32 {
testMultiUDPMuxConnections(t, udpMuxMulti, "ufrag3", udp6)
}

wg.Wait()

// check port allocation is balanced
if len(cports) > 1 {
expectPorts := make(map[int]bool)
for i := range cports {
addr := udpMuxMulti.GetListenAddresses()[0]
ufrag := fmt.Sprintf("ufragetest%d", i)
conn, err := udpMuxMulti.GetConn(ufrag, addr)
require.NoError(t, err)
require.NotNil(t, conn)
require.False(t, expectPorts[conn.LocalAddr().(*net.UDPAddr).Port], fmt.Sprint("port ", conn.LocalAddr().(*net.UDPAddr).Port, " is already used", expectPorts))

Check failure on line 171 in udp_mux_multi_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

type assertion must be checked (forcetypeassert)
expectPorts[conn.LocalAddr().(*net.UDPAddr).Port] = true

Check failure on line 172 in udp_mux_multi_test.go

View workflow job for this annotation

GitHub Actions / lint / Go

type assertion must be checked (forcetypeassert)

conn2, err := udpMuxMulti.GetConn(ufrag, addr)
require.NoError(t, err)
require.Equal(t, conn, conn2)
}
require.Equal(t, len(cports), len(expectPorts))
}

require.NoError(t, udpMuxMulti.Close())
})
}
}
Loading

0 comments on commit 5113c69

Please sign in to comment.