Skip to content

Commit

Permalink
feat: support configurable I/O chunk to drain at a time in edge-trigg…
Browse files Browse the repository at this point in the history
…ered mode

Fixes #643
  • Loading branch information
panjf2000 committed Nov 5, 2024
1 parent 4a0fed8 commit e74cc08
Show file tree
Hide file tree
Showing 7 changed files with 336 additions and 174 deletions.
166 changes: 109 additions & 57 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,203 +100,254 @@ func TestClient(t *testing.T) {
t.Run("poll-LT", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", false, false, false, false, 10, RoundRobin)
runClient(t, "tcp", ":9991", &testConf{false, 0, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", false, false, true, false, 10, LeastConnections)
runClient(t, "tcp", ":9992", &testConf{false, 0, false, true, false, false, 10, LeastConnections})
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", false, false, false, true, 10, RoundRobin)
runClient(t, "tcp", ":9991", &testConf{false, 0, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", false, false, true, true, 10, LeastConnections)
runClient(t, "tcp", ":9992", &testConf{false, 0, false, true, true, false, 10, LeastConnections})
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", false, false, false, false, 10, RoundRobin)
runClient(t, "udp", ":9991", &testConf{false, 0, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", false, false, true, false, 10, LeastConnections)
runClient(t, "udp", ":9992", &testConf{false, 0, false, true, false, false, 10, LeastConnections})
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", false, false, false, true, 10, RoundRobin)
runClient(t, "udp", ":9991", &testConf{false, 0, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", false, false, true, true, 10, LeastConnections)
runClient(t, "udp", ":9992", &testConf{false, 0, false, true, true, false, 10, LeastConnections})
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", false, false, false, false, 10, RoundRobin)
runClient(t, "unix", "gnet1.sock", &testConf{false, 0, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", false, false, true, false, 10, SourceAddrHash)
runClient(t, "unix", "gnet2.sock", &testConf{false, 0, false, true, false, false, 10, SourceAddrHash})
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", false, false, false, true, 10, RoundRobin)
runClient(t, "unix", "gnet1.sock", &testConf{false, 0, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", false, false, true, true, 10, SourceAddrHash)
runClient(t, "unix", "gnet2.sock", &testConf{false, 0, false, true, true, false, 10, SourceAddrHash})
})
})
})

t.Run("poll-ET", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", true, false, false, false, 10, RoundRobin)
runClient(t, "tcp", ":9991", &testConf{true, 0, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", true, false, true, false, 10, LeastConnections)
runClient(t, "tcp", ":9992", &testConf{true, 0, false, true, false, false, 10, LeastConnections})
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", true, false, false, true, 10, RoundRobin)
runClient(t, "tcp", ":9991", &testConf{true, 0, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", true, false, true, true, 10, LeastConnections)
runClient(t, "tcp", ":9992", &testConf{true, 0, false, true, true, false, 10, LeastConnections})
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", true, false, false, false, 10, RoundRobin)
runClient(t, "udp", ":9991", &testConf{true, 0, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", true, false, true, false, 10, LeastConnections)
runClient(t, "udp", ":9992", &testConf{true, 0, false, true, false, false, 10, LeastConnections})
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", true, false, false, true, 10, RoundRobin)
runClient(t, "udp", ":9991", &testConf{true, 0, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", true, false, true, true, 10, LeastConnections)
runClient(t, "udp", ":9992", &testConf{true, 0, false, true, true, false, 10, LeastConnections})
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", true, false, false, false, 10, RoundRobin)
runClient(t, "unix", "gnet1.sock", &testConf{true, 0, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", true, false, true, false, 10, SourceAddrHash)
runClient(t, "unix", "gnet2.sock", &testConf{true, 0, false, true, false, false, 10, SourceAddrHash})
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", true, false, false, true, 10, RoundRobin)
runClient(t, "unix", "gnet1.sock", &testConf{true, 0, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", true, false, true, true, 10, SourceAddrHash)
runClient(t, "unix", "gnet2.sock", &testConf{true, 0, false, true, true, false, 10, SourceAddrHash})
})
})
})

t.Run("poll-ET-chunk", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", &testConf{true, 1 << 18, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", &testConf{true, 1 << 19, false, true, false, false, 10, LeastConnections})
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", &testConf{true, 1 << 18, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", &testConf{true, 1 << 19, false, true, true, false, 10, LeastConnections})
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", &testConf{true, 1 << 18, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", &testConf{true, 1 << 19, false, true, false, false, 10, LeastConnections})
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", &testConf{true, 1 << 18, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", &testConf{true, 1 << 19, false, true, true, false, 10, LeastConnections})
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", &testConf{true, 1 << 18, false, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", &testConf{true, 1 << 19, false, true, false, false, 10, SourceAddrHash})
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", &testConf{true, 1 << 18, false, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", &testConf{true, 1 << 19, false, true, true, false, 10, SourceAddrHash})
})
})
})

t.Run("poll-reuseport-LT", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", false, true, false, false, 10, RoundRobin)
runClient(t, "tcp", ":9991", &testConf{false, 0, true, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", false, true, true, false, 10, LeastConnections)
runClient(t, "tcp", ":9992", &testConf{false, 0, true, true, false, false, 10, LeastConnections})
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", false, true, false, true, 10, RoundRobin)
runClient(t, "tcp", ":9991", &testConf{false, 0, true, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", false, true, true, false, 10, LeastConnections)
runClient(t, "tcp", ":9992", &testConf{false, 0, true, true, false, false, 10, LeastConnections})
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", false, true, false, false, 10, RoundRobin)
runClient(t, "udp", ":9991", &testConf{false, 0, true, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", false, true, true, false, 10, LeastConnections)
runClient(t, "udp", ":9992", &testConf{false, 0, true, true, false, false, 10, LeastConnections})
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", false, true, false, false, 10, RoundRobin)
runClient(t, "udp", ":9991", &testConf{false, 0, true, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", false, true, true, true, 10, LeastConnections)
runClient(t, "udp", ":9992", &testConf{false, 0, true, true, true, false, 10, LeastConnections})
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", false, true, false, false, 10, RoundRobin)
runClient(t, "unix", "gnet1.sock", &testConf{false, 0, true, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", false, true, true, false, 10, LeastConnections)
runClient(t, "unix", "gnet2.sock", &testConf{false, 0, true, true, false, false, 10, LeastConnections})
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", false, true, false, true, 10, RoundRobin)
runClient(t, "unix", "gnet1.sock", &testConf{false, 0, true, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", false, true, true, true, 10, LeastConnections)
runClient(t, "unix", "gnet2.sock", &testConf{false, 0, true, true, true, false, 10, LeastConnections})
})
})
})

t.Run("poll-reuseport-ET", func(t *testing.T) {
t.Run("tcp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", true, true, false, false, 10, RoundRobin)
runClient(t, "tcp", ":9991", &testConf{true, 0, true, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", true, true, true, false, 10, LeastConnections)
runClient(t, "tcp", ":9992", &testConf{true, 0, true, true, false, false, 10, LeastConnections})
})
})
t.Run("tcp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "tcp", ":9991", true, true, false, true, 10, RoundRobin)
runClient(t, "tcp", ":9991", &testConf{true, 0, true, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "tcp", ":9992", true, true, true, false, 10, LeastConnections)
runClient(t, "tcp", ":9992", &testConf{true, 0, true, true, false, false, 10, LeastConnections})
})
})
t.Run("udp", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", true, true, false, false, 10, RoundRobin)
runClient(t, "udp", ":9991", &testConf{true, 0, true, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", true, true, true, false, 10, LeastConnections)
runClient(t, "udp", ":9992", &testConf{true, 0, true, true, false, false, 10, LeastConnections})
})
})
t.Run("udp-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "udp", ":9991", true, true, false, false, 10, RoundRobin)
runClient(t, "udp", ":9991", &testConf{true, 0, true, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "udp", ":9992", true, true, true, true, 10, LeastConnections)
runClient(t, "udp", ":9992", &testConf{true, 0, true, true, true, false, 10, LeastConnections})
})
})
t.Run("unix", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", true, true, false, false, 10, RoundRobin)
runClient(t, "unix", "gnet1.sock", &testConf{true, 0, true, false, false, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", true, true, true, false, 10, LeastConnections)
runClient(t, "unix", "gnet2.sock", &testConf{true, 0, true, true, false, false, 10, LeastConnections})
})
})
t.Run("unix-async", func(t *testing.T) {
t.Run("1-loop", func(t *testing.T) {
runClient(t, "unix", "gnet1.sock", true, true, false, true, 10, RoundRobin)
runClient(t, "unix", "gnet1.sock", &testConf{true, 0, true, false, true, false, 10, RoundRobin})
})
t.Run("N-loop", func(t *testing.T) {
runClient(t, "unix", "gnet2.sock", true, true, true, true, 10, LeastConnections)
runClient(t, "unix", "gnet2.sock", &testConf{true, 0, true, true, true, false, 10, LeastConnections})
})
})
})
Expand Down Expand Up @@ -426,14 +477,14 @@ func (s *testClient) OnTick() (delay time.Duration, action Action) {
return
}

func runClient(t *testing.T, network, addr string, et, reuseport, multicore, async bool, nclients int, lb LoadBalancing) {
func runClient(t *testing.T, network, addr string, conf *testConf) {
ts := &testClient{
tester: t,
network: network,
addr: addr,
multicore: multicore,
async: async,
nclients: nclients,
multicore: conf.multicore,
async: conf.async,
nclients: conf.clients,
workerPool: goPool.Default(),
}
var err error
Expand All @@ -452,13 +503,14 @@ func runClient(t *testing.T, network, addr string, et, reuseport, multicore, asy

err = Run(ts,
network+"://"+addr,
WithEdgeTriggeredIO(et),
WithLockOSThread(async),
WithMulticore(multicore),
WithReusePort(reuseport),
WithEdgeTriggeredIO(conf.et),
WithEdgeTriggeredIOChunk(conf.etChunk),
WithLockOSThread(conf.async),
WithMulticore(conf.multicore),
WithReusePort(conf.reuseport),
WithTicker(true),
WithTCPKeepAlive(time.Minute*1),
WithLoadBalancing(lb))
WithLoadBalancing(conf.lb))
assert.NoError(t, err)
}

Expand Down
7 changes: 7 additions & 0 deletions client_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,13 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
poller: p,
}

if options.EdgeTriggeredIOChunk > 0 {
options.EdgeTriggeredIO = true
options.EdgeTriggeredIOChunk = math.CeilToPowerOfTwo(options.EdgeTriggeredIOChunk)

Check warning on line 92 in client_unix.go

View check run for this annotation

Codecov / codecov/patch

client_unix.go#L91-L92

Added lines #L91 - L92 were not covered by tests
} else if options.EdgeTriggeredIO {
options.EdgeTriggeredIOChunk = 1 << 20 // 1MB
}

Check warning on line 95 in client_unix.go

View check run for this annotation

Codecov / codecov/patch

client_unix.go#L94-L95

Added lines #L94 - L95 were not covered by tests

rbc := options.ReadBufferCap
switch {
case rbc <= 0:
Expand Down
4 changes: 2 additions & 2 deletions connection_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,8 @@ type udpConn struct {
}

type openConn struct {
c *conn
cb func()
c *conn
cb func()
}

type conn struct {
Expand Down
Loading

0 comments on commit e74cc08

Please sign in to comment.