Skip to content

Commit 3860208

Browse files
authored
feat: support edge-triggered I/O (#576)
Fixes #573
1 parent 601bfdf commit 3860208

32 files changed

+1159
-675
lines changed

acceptor_unix.go

+8-4
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ func (eng *engine) accept1(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error {
3434
if err != nil {
3535
switch err {
3636
case unix.EINTR, unix.EAGAIN, unix.ECONNABORTED:
37-
// ECONNABORTED means that a socket on the listen
37+
// ECONNABORTED indicates that a socket on the listen
3838
// queue was closed before we Accept()ed it;
3939
// it's a silly error, so try again.
4040
return nil
@@ -66,11 +66,11 @@ func (el *eventloop) accept1(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags)
6666
return el.readUDP1(fd, ev, flags)
6767
}
6868

69-
nfd, sa, err := socket.Accept(el.ln.fd)
69+
nfd, sa, err := socket.Accept(fd)
7070
if err != nil {
7171
switch err {
7272
case unix.EINTR, unix.EAGAIN, unix.ECONNABORTED:
73-
// ECONNABORTED means that a socket on the listen
73+
// ECONNABORTED indicates that a socket on the listen
7474
// queue was closed before we Accept()ed it;
7575
// it's a silly error, so try again.
7676
return nil
@@ -87,7 +87,11 @@ func (el *eventloop) accept1(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags)
8787
}
8888

8989
c := newTCPConn(nfd, el, sa, el.ln.addr, remoteAddr)
90-
if err = el.poller.AddRead(&c.pollAttachment); err != nil {
90+
addEvents := el.poller.AddRead
91+
if el.engine.opts.EdgeTriggeredIO {
92+
addEvents = el.poller.AddReadWrite
93+
}
94+
if err = addEvents(&c.pollAttachment, el.engine.opts.EdgeTriggeredIO); err != nil {
9195
return err
9296
}
9397
el.connections.addConn(c, el.idx)

client_test.go

+140-38
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ type connHandler struct {
3131
type clientEvents struct {
3232
*BuiltinEventEngine
3333
tester *testing.T
34-
svr *testClientServer
34+
svr *testClient
3535
packetLen int
3636
}
3737

@@ -87,117 +87,219 @@ func (ev *clientEvents) OnShutdown(e Engine) {
8787
assert.EqualValuesf(ev.tester, fd, -1, "expected -1, but got: %d", fd)
8888
}
8989

90-
func TestServeWithGnetClient(t *testing.T) {
90+
func TestClient(t *testing.T) {
9191
// start an engine
9292
// connect 10 clients
9393
// each client will pipe random data for 1-3 seconds.
9494
// the writes to the engine will be random sizes. 0KB - 1MB.
9595
// the engine will echo back the data.
9696
// waits for graceful connection closing.
97-
t.Run("poll", func(t *testing.T) {
97+
t.Run("poll-LT", func(t *testing.T) {
9898
t.Run("tcp", func(t *testing.T) {
9999
t.Run("1-loop", func(t *testing.T) {
100-
testServeWithGnetClient(t, "tcp", ":9991", false, false, false, false, 10, RoundRobin)
100+
runClient(t, "tcp", ":9991", false, false, false, false, 10, RoundRobin)
101101
})
102102
t.Run("N-loop", func(t *testing.T) {
103-
testServeWithGnetClient(t, "tcp", ":9992", false, false, true, false, 10, LeastConnections)
103+
runClient(t, "tcp", ":9992", false, false, true, false, 10, LeastConnections)
104104
})
105105
})
106106
t.Run("tcp-async", func(t *testing.T) {
107107
t.Run("1-loop", func(t *testing.T) {
108-
testServeWithGnetClient(t, "tcp", ":9991", false, false, false, true, 10, RoundRobin)
108+
runClient(t, "tcp", ":9991", false, false, false, true, 10, RoundRobin)
109109
})
110110
t.Run("N-loop", func(t *testing.T) {
111-
testServeWithGnetClient(t, "tcp", ":9992", false, false, true, true, 10, LeastConnections)
111+
runClient(t, "tcp", ":9992", false, false, true, true, 10, LeastConnections)
112112
})
113113
})
114114
t.Run("udp", func(t *testing.T) {
115115
t.Run("1-loop", func(t *testing.T) {
116-
testServeWithGnetClient(t, "udp", ":9991", false, false, false, false, 10, RoundRobin)
116+
runClient(t, "udp", ":9991", false, false, false, false, 10, RoundRobin)
117117
})
118118
t.Run("N-loop", func(t *testing.T) {
119-
testServeWithGnetClient(t, "udp", ":9992", false, false, true, false, 10, LeastConnections)
119+
runClient(t, "udp", ":9992", false, false, true, false, 10, LeastConnections)
120120
})
121121
})
122122
t.Run("udp-async", func(t *testing.T) {
123123
t.Run("1-loop", func(t *testing.T) {
124-
testServeWithGnetClient(t, "udp", ":9991", false, false, false, true, 10, RoundRobin)
124+
runClient(t, "udp", ":9991", false, false, false, true, 10, RoundRobin)
125125
})
126126
t.Run("N-loop", func(t *testing.T) {
127-
testServeWithGnetClient(t, "udp", ":9992", false, false, true, true, 10, LeastConnections)
127+
runClient(t, "udp", ":9992", false, false, true, true, 10, LeastConnections)
128128
})
129129
})
130130
t.Run("unix", func(t *testing.T) {
131131
t.Run("1-loop", func(t *testing.T) {
132-
testServeWithGnetClient(t, "unix", "gnet1.sock", false, false, false, false, 10, RoundRobin)
132+
runClient(t, "unix", "gnet1.sock", false, false, false, false, 10, RoundRobin)
133133
})
134134
t.Run("N-loop", func(t *testing.T) {
135-
testServeWithGnetClient(t, "unix", "gnet2.sock", false, false, true, false, 10, SourceAddrHash)
135+
runClient(t, "unix", "gnet2.sock", false, false, true, false, 10, SourceAddrHash)
136136
})
137137
})
138138
t.Run("unix-async", func(t *testing.T) {
139139
t.Run("1-loop", func(t *testing.T) {
140-
testServeWithGnetClient(t, "unix", "gnet1.sock", false, false, false, true, 10, RoundRobin)
140+
runClient(t, "unix", "gnet1.sock", false, false, false, true, 10, RoundRobin)
141141
})
142142
t.Run("N-loop", func(t *testing.T) {
143-
testServeWithGnetClient(t, "unix", "gnet2.sock", false, false, true, true, 10, SourceAddrHash)
143+
runClient(t, "unix", "gnet2.sock", false, false, true, true, 10, SourceAddrHash)
144144
})
145145
})
146146
})
147147

148-
t.Run("poll-reuseport", func(t *testing.T) {
148+
t.Run("poll-ET", func(t *testing.T) {
149149
t.Run("tcp", func(t *testing.T) {
150150
t.Run("1-loop", func(t *testing.T) {
151-
testServeWithGnetClient(t, "tcp", ":9991", true, true, false, false, 10, RoundRobin)
151+
runClient(t, "tcp", ":9991", true, false, false, false, 10, RoundRobin)
152152
})
153153
t.Run("N-loop", func(t *testing.T) {
154-
testServeWithGnetClient(t, "tcp", ":9992", true, true, true, false, 10, LeastConnections)
154+
runClient(t, "tcp", ":9992", true, false, true, false, 10, LeastConnections)
155155
})
156156
})
157157
t.Run("tcp-async", func(t *testing.T) {
158158
t.Run("1-loop", func(t *testing.T) {
159-
testServeWithGnetClient(t, "tcp", ":9991", true, true, false, true, 10, RoundRobin)
159+
runClient(t, "tcp", ":9991", true, false, false, true, 10, RoundRobin)
160160
})
161161
t.Run("N-loop", func(t *testing.T) {
162-
testServeWithGnetClient(t, "tcp", ":9992", true, true, true, false, 10, LeastConnections)
162+
runClient(t, "tcp", ":9992", true, false, true, true, 10, LeastConnections)
163163
})
164164
})
165165
t.Run("udp", func(t *testing.T) {
166166
t.Run("1-loop", func(t *testing.T) {
167-
testServeWithGnetClient(t, "udp", ":9991", true, true, false, false, 10, RoundRobin)
167+
runClient(t, "udp", ":9991", true, false, false, false, 10, RoundRobin)
168168
})
169169
t.Run("N-loop", func(t *testing.T) {
170-
testServeWithGnetClient(t, "udp", ":9992", true, true, true, false, 10, LeastConnections)
170+
runClient(t, "udp", ":9992", true, false, true, false, 10, LeastConnections)
171171
})
172172
})
173173
t.Run("udp-async", func(t *testing.T) {
174174
t.Run("1-loop", func(t *testing.T) {
175-
testServeWithGnetClient(t, "udp", ":9991", true, true, false, false, 10, RoundRobin)
175+
runClient(t, "udp", ":9991", true, false, false, true, 10, RoundRobin)
176176
})
177177
t.Run("N-loop", func(t *testing.T) {
178-
testServeWithGnetClient(t, "udp", ":9992", true, true, true, true, 10, LeastConnections)
178+
runClient(t, "udp", ":9992", true, false, true, true, 10, LeastConnections)
179179
})
180180
})
181181
t.Run("unix", func(t *testing.T) {
182182
t.Run("1-loop", func(t *testing.T) {
183-
testServeWithGnetClient(t, "unix", "gnet1.sock", true, true, false, false, 10, RoundRobin)
183+
runClient(t, "unix", "gnet1.sock", true, false, false, false, 10, RoundRobin)
184184
})
185185
t.Run("N-loop", func(t *testing.T) {
186-
testServeWithGnetClient(t, "unix", "gnet2.sock", true, true, true, false, 10, LeastConnections)
186+
runClient(t, "unix", "gnet2.sock", true, false, true, false, 10, SourceAddrHash)
187187
})
188188
})
189189
t.Run("unix-async", func(t *testing.T) {
190190
t.Run("1-loop", func(t *testing.T) {
191-
testServeWithGnetClient(t, "unix", "gnet1.sock", true, true, false, true, 10, RoundRobin)
191+
runClient(t, "unix", "gnet1.sock", true, false, false, true, 10, RoundRobin)
192192
})
193193
t.Run("N-loop", func(t *testing.T) {
194-
testServeWithGnetClient(t, "unix", "gnet2.sock", true, true, true, true, 10, LeastConnections)
194+
runClient(t, "unix", "gnet2.sock", true, false, true, true, 10, SourceAddrHash)
195+
})
196+
})
197+
})
198+
199+
t.Run("poll-LT-reuseport", func(t *testing.T) {
200+
t.Run("tcp", func(t *testing.T) {
201+
t.Run("1-loop", func(t *testing.T) {
202+
runClient(t, "tcp", ":9991", false, true, false, false, 10, RoundRobin)
203+
})
204+
t.Run("N-loop", func(t *testing.T) {
205+
runClient(t, "tcp", ":9992", false, true, true, false, 10, LeastConnections)
206+
})
207+
})
208+
t.Run("tcp-async", func(t *testing.T) {
209+
t.Run("1-loop", func(t *testing.T) {
210+
runClient(t, "tcp", ":9991", false, true, false, true, 10, RoundRobin)
211+
})
212+
t.Run("N-loop", func(t *testing.T) {
213+
runClient(t, "tcp", ":9992", false, true, true, false, 10, LeastConnections)
214+
})
215+
})
216+
t.Run("udp", func(t *testing.T) {
217+
t.Run("1-loop", func(t *testing.T) {
218+
runClient(t, "udp", ":9991", false, true, false, false, 10, RoundRobin)
219+
})
220+
t.Run("N-loop", func(t *testing.T) {
221+
runClient(t, "udp", ":9992", false, true, true, false, 10, LeastConnections)
222+
})
223+
})
224+
t.Run("udp-async", func(t *testing.T) {
225+
t.Run("1-loop", func(t *testing.T) {
226+
runClient(t, "udp", ":9991", false, true, false, false, 10, RoundRobin)
227+
})
228+
t.Run("N-loop", func(t *testing.T) {
229+
runClient(t, "udp", ":9992", false, true, true, true, 10, LeastConnections)
230+
})
231+
})
232+
t.Run("unix", func(t *testing.T) {
233+
t.Run("1-loop", func(t *testing.T) {
234+
runClient(t, "unix", "gnet1.sock", false, true, false, false, 10, RoundRobin)
235+
})
236+
t.Run("N-loop", func(t *testing.T) {
237+
runClient(t, "unix", "gnet2.sock", false, true, true, false, 10, LeastConnections)
238+
})
239+
})
240+
t.Run("unix-async", func(t *testing.T) {
241+
t.Run("1-loop", func(t *testing.T) {
242+
runClient(t, "unix", "gnet1.sock", false, true, false, true, 10, RoundRobin)
243+
})
244+
t.Run("N-loop", func(t *testing.T) {
245+
runClient(t, "unix", "gnet2.sock", false, true, true, true, 10, LeastConnections)
246+
})
247+
})
248+
})
249+
250+
t.Run("poll-ET-reuseport", func(t *testing.T) {
251+
t.Run("tcp", func(t *testing.T) {
252+
t.Run("1-loop", func(t *testing.T) {
253+
runClient(t, "tcp", ":9991", true, true, false, false, 10, RoundRobin)
254+
})
255+
t.Run("N-loop", func(t *testing.T) {
256+
runClient(t, "tcp", ":9992", true, true, true, false, 10, LeastConnections)
257+
})
258+
})
259+
t.Run("tcp-async", func(t *testing.T) {
260+
t.Run("1-loop", func(t *testing.T) {
261+
runClient(t, "tcp", ":9991", true, true, false, true, 10, RoundRobin)
262+
})
263+
t.Run("N-loop", func(t *testing.T) {
264+
runClient(t, "tcp", ":9992", true, true, true, false, 10, LeastConnections)
265+
})
266+
})
267+
t.Run("udp", func(t *testing.T) {
268+
t.Run("1-loop", func(t *testing.T) {
269+
runClient(t, "udp", ":9991", true, true, false, false, 10, RoundRobin)
270+
})
271+
t.Run("N-loop", func(t *testing.T) {
272+
runClient(t, "udp", ":9992", true, true, true, false, 10, LeastConnections)
273+
})
274+
})
275+
t.Run("udp-async", func(t *testing.T) {
276+
t.Run("1-loop", func(t *testing.T) {
277+
runClient(t, "udp", ":9991", true, true, false, false, 10, RoundRobin)
278+
})
279+
t.Run("N-loop", func(t *testing.T) {
280+
runClient(t, "udp", ":9992", true, true, true, true, 10, LeastConnections)
281+
})
282+
})
283+
t.Run("unix", func(t *testing.T) {
284+
t.Run("1-loop", func(t *testing.T) {
285+
runClient(t, "unix", "gnet1.sock", true, true, false, false, 10, RoundRobin)
286+
})
287+
t.Run("N-loop", func(t *testing.T) {
288+
runClient(t, "unix", "gnet2.sock", true, true, true, false, 10, LeastConnections)
289+
})
290+
})
291+
t.Run("unix-async", func(t *testing.T) {
292+
t.Run("1-loop", func(t *testing.T) {
293+
runClient(t, "unix", "gnet1.sock", true, true, false, true, 10, RoundRobin)
294+
})
295+
t.Run("N-loop", func(t *testing.T) {
296+
runClient(t, "unix", "gnet2.sock", true, true, true, true, 10, LeastConnections)
195297
})
196298
})
197299
})
198300
}
199301

200-
type testClientServer struct {
302+
type testClient struct {
201303
*BuiltinEventEngine
202304
client *Client
203305
tester *testing.T
@@ -215,20 +317,20 @@ type testClientServer struct {
215317
udpReadHeader int32
216318
}
217319

218-
func (s *testClientServer) OnBoot(eng Engine) (action Action) {
320+
func (s *testClient) OnBoot(eng Engine) (action Action) {
219321
s.eng = eng
220322
return
221323
}
222324

223-
func (s *testClientServer) OnOpen(c Conn) (out []byte, action Action) {
325+
func (s *testClient) OnOpen(c Conn) (out []byte, action Action) {
224326
c.SetContext(&sync.Once{})
225327
atomic.AddInt32(&s.connected, 1)
226328
require.NotNil(s.tester, c.LocalAddr(), "nil local addr")
227329
require.NotNil(s.tester, c.RemoteAddr(), "nil remote addr")
228330
return
229331
}
230332

231-
func (s *testClientServer) OnClose(c Conn, err error) (action Action) {
333+
func (s *testClient) OnClose(c Conn, err error) (action Action) {
232334
if err != nil {
233335
logging.Debugf("error occurred on closed, %v\n", err)
234336
}
@@ -246,13 +348,13 @@ func (s *testClientServer) OnClose(c Conn, err error) (action Action) {
246348
return
247349
}
248350

249-
func (s *testClientServer) OnShutdown(Engine) {
351+
func (s *testClient) OnShutdown(Engine) {
250352
if s.network == "udp" {
251353
require.EqualValues(s.tester, int32(s.nclients), atomic.LoadInt32(&s.udpReadHeader))
252354
}
253355
}
254356

255-
func (s *testClientServer) OnTraffic(c Conn) (action Action) {
357+
func (s *testClient) OnTraffic(c Conn) (action Action) {
256358
readHeader := func() {
257359
ping := make([]byte, len(pingMsg))
258360
n, err := io.ReadFull(c, ping)
@@ -302,7 +404,7 @@ func (s *testClientServer) OnTraffic(c Conn) (action Action) {
302404
return
303405
}
304406

305-
func (s *testClientServer) OnTick() (delay time.Duration, action Action) {
407+
func (s *testClient) OnTick() (delay time.Duration, action Action) {
306408
delay = time.Second / 5
307409
if atomic.CompareAndSwapInt32(&s.started, 0, 1) {
308410
for i := 0; i < s.nclients; i++ {
@@ -321,8 +423,8 @@ func (s *testClientServer) OnTick() (delay time.Duration, action Action) {
321423
return
322424
}
323425

324-
func testServeWithGnetClient(t *testing.T, network, addr string, reuseport, reuseaddr, multicore, async bool, nclients int, lb LoadBalancing) {
325-
ts := &testClientServer{
426+
func runClient(t *testing.T, network, addr string, et, reuseport, multicore, async bool, nclients int, lb LoadBalancing) {
427+
ts := &testClient{
326428
tester: t,
327429
network: network,
328430
addr: addr,
@@ -347,10 +449,10 @@ func testServeWithGnetClient(t *testing.T, network, addr string, reuseport, reus
347449

348450
err = Run(ts,
349451
network+"://"+addr,
452+
WithEdgeTriggeredIO(et),
350453
WithLockOSThread(async),
351454
WithMulticore(multicore),
352455
WithReusePort(reuseport),
353-
WithReuseAddr(reuseaddr),
354456
WithTicker(true),
355457
WithTCPKeepAlive(time.Minute*1),
356458
WithTCPNoDelay(TCPDelay),

0 commit comments

Comments
 (0)