Skip to content

Commit 7f2f8f0

Browse files
committed
bug: read the remaining data after the peer wrote and closed on BSD
Fixes #529, along with refactoring a few code
1 parent 1bf7af4 commit 7f2f8f0

16 files changed

+190
-64
lines changed

acceptor_bsd.go

+10
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,16 @@
1717

1818
package gnet
1919

20+
import "github.com/panjf2000/gnet/v2/internal/netpoll"
21+
22+
func (eng *engine) accept(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) error {
23+
return eng.accept1(fd, ev, flags)
24+
}
25+
26+
func (el *eventloop) accept(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) error {
27+
return el.accept1(fd, ev, flags)
28+
}
29+
2030
// The canonical BSD sockets implementation will inherit file status flags
2131
// from the listening socket, so we don't need to set the non-blocking flag
2232
// for the accepted sockets explicitly.

acceptor_linux.go

+13-1
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,19 @@
1717

1818
package gnet
1919

20-
import "golang.org/x/sys/unix"
20+
import (
21+
"golang.org/x/sys/unix"
22+
23+
"github.com/panjf2000/gnet/v2/internal/netpoll"
24+
)
25+
26+
func (eng *engine) accept(fd int, ev netpoll.IOEvent) error {
27+
return eng.accept1(fd, ev, 0)
28+
}
29+
30+
func (el *eventloop) accept(fd int, ev netpoll.IOEvent) error {
31+
return el.accept1(fd, ev, 0)
32+
}
2133

2234
func setNonBlock(fd int, nonBlocking bool) error {
2335
return unix.SetNonblock(fd, nonBlocking)

acceptor_unix.go

+3-3
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ import (
2929
"github.com/panjf2000/gnet/v2/pkg/logging"
3030
)
3131

32-
func (eng *engine) accept(fd int, _ netpoll.IOEvent) error {
32+
func (eng *engine) accept1(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error {
3333
nfd, sa, err := unix.Accept(fd)
3434
if err != nil {
3535
switch err {
@@ -64,9 +64,9 @@ func (eng *engine) accept(fd int, _ netpoll.IOEvent) error {
6464
return nil
6565
}
6666

67-
func (el *eventloop) accept(fd int, ev netpoll.IOEvent) error {
67+
func (el *eventloop) accept1(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) error {
6868
if el.ln.network == "udp" {
69-
return el.readUDP(fd, ev)
69+
return el.readUDP1(fd, ev, flags)
7070
}
7171

7272
nfd, sa, err := unix.Accept(el.ln.fd)

client_test.go

+82
Original file line numberDiff line numberDiff line change
@@ -491,3 +491,85 @@ func TestWakeConnImmediately(t *testing.T) {
491491
err = Run(serverEV, serverEV.network+"://"+serverEV.addr, WithTicker(true))
492492
assert.NoError(t, err)
493493
}
494+
495+
func TestClientReadOnEOF(t *testing.T) {
496+
ln, err := net.Listen("tcp", "127.0.0.1:9999")
497+
assert.NoError(t, err)
498+
defer ln.Close()
499+
500+
go func() {
501+
for {
502+
conn, err := ln.Accept()
503+
if err != nil {
504+
break
505+
}
506+
go process(conn)
507+
}
508+
}()
509+
510+
ev := &clientReadOnEOF{
511+
result: make(chan struct {
512+
data []byte
513+
err error
514+
}, 1),
515+
data: []byte("test"),
516+
}
517+
cli, err := NewClient(ev)
518+
assert.NoError(t, err)
519+
defer cli.Stop() //nolint:errcheck
520+
521+
err = cli.Start()
522+
assert.NoError(t, err)
523+
524+
_, err = cli.Dial("tcp", "127.0.0.1:9999")
525+
assert.NoError(t, err)
526+
527+
select {
528+
case res := <-ev.result:
529+
assert.NoError(t, res.err)
530+
assert.EqualValuesf(t, ev.data, res.data, "expected: %v, but got: %v", ev.data, res.data)
531+
case <-time.After(5 * time.Second):
532+
t.Errorf("timeout waiting for the result")
533+
}
534+
}
535+
536+
func process(conn net.Conn) {
537+
defer conn.Close() //noliint:errcheck
538+
buf := make([]byte, 8)
539+
n, err := conn.Read(buf)
540+
if err != nil {
541+
return
542+
}
543+
_, _ = conn.Write(buf[:n])
544+
_ = conn.Close()
545+
}
546+
547+
type clientReadOnEOF struct {
548+
BuiltinEventEngine
549+
data []byte
550+
result chan struct {
551+
data []byte
552+
err error
553+
}
554+
}
555+
556+
func (clientReadOnEOF) OnBoot(Engine) (action Action) {
557+
return None
558+
}
559+
560+
func (cli clientReadOnEOF) OnOpen(Conn) (out []byte, action Action) {
561+
return cli.data, None
562+
}
563+
564+
func (clientReadOnEOF) OnClose(Conn, error) (action Action) {
565+
return Close
566+
}
567+
568+
func (cli clientReadOnEOF) OnTraffic(c Conn) (action Action) {
569+
data, err := c.Next(-1)
570+
cli.result <- struct {
571+
data []byte
572+
err error
573+
}{data: data, err: err}
574+
return None
575+
}

connection_bsd.go

+19-8
Original file line numberDiff line numberDiff line change
@@ -18,21 +18,32 @@
1818
package gnet
1919

2020
import (
21-
"golang.org/x/sys/unix"
21+
"io"
2222

2323
"github.com/panjf2000/gnet/v2/internal/netpoll"
2424
)
2525

26-
func (c *conn) handleEvents(_ int, filter int16) (err error) {
27-
switch filter {
28-
case netpoll.EVFilterSock:
29-
err = c.loop.close(c, unix.ECONNRESET)
30-
case netpoll.EVFilterWrite:
31-
if !c.outboundBuffer.IsEmpty() {
26+
func (c *conn) handleEvents(_ int, filter int16, flags uint16) (err error) {
27+
switch {
28+
case flags&netpoll.EVFlagsDelete != 0:
29+
case flags&netpoll.EVFlagsEOF != 0:
30+
if filter == netpoll.EVFilterRead { // read the remaining data after the peer wrote and closed immediately
31+
err = c.loop.read(c)
32+
} else if filter == netpoll.EVFilterWrite && !c.outboundBuffer.IsEmpty() {
3233
err = c.loop.write(c)
34+
} else {
35+
err = c.loop.close(c, io.EOF)
3336
}
34-
case netpoll.EVFilterRead:
37+
case filter == netpoll.EVFilterRead:
3538
err = c.loop.read(c)
39+
case filter == netpoll.EVFilterWrite:
40+
if !c.outboundBuffer.IsEmpty() {
41+
err = c.loop.write(c)
42+
}
3643
}
3744
return
3845
}
46+
47+
func (el *eventloop) readUDP(fd int, filter netpoll.IOEvent, flags netpoll.IOFlags) error {
48+
return el.readUDP1(fd, filter, flags)
49+
}

connection_linux.go

+4
Original file line numberDiff line numberDiff line change
@@ -42,3 +42,7 @@ func (c *conn) handleEvents(_ int, ev uint32) error {
4242

4343
return nil
4444
}
45+
46+
func (el *eventloop) readUDP(fd int, ev netpoll.IOEvent) error {
47+
return el.readUDP1(fd, ev, 0)
48+
}

eventloop_unix.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -272,7 +272,7 @@ func (el *eventloop) handleAction(c *conn, action Action) error {
272272
}
273273
}
274274

275-
func (el *eventloop) readUDP(fd int, _ netpoll.IOEvent) error {
275+
func (el *eventloop) readUDP1(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error {
276276
n, sa, err := unix.Recvfrom(fd, el.buffer, 0)
277277
if err != nil {
278278
if err == unix.EAGAIN || err == unix.EWOULDBLOCK {

internal/netpoll/epoll_default_poller.go

+5-5
Original file line numberDiff line numberDiff line change
@@ -115,7 +115,7 @@ func (p *Poller) Trigger(fn queue.TaskFunc, arg interface{}) (err error) {
115115
}
116116

117117
// Polling blocks the current goroutine, waiting for network-events.
118-
func (p *Poller) Polling(callback func(fd int, ev uint32) error) error {
118+
func (p *Poller) Polling(callback PollEventHandler) error {
119119
el := newEventList(InitPollEventsCap)
120120
var doChores bool
121121

@@ -134,17 +134,17 @@ func (p *Poller) Polling(callback func(fd int, ev uint32) error) error {
134134

135135
for i := 0; i < n; i++ {
136136
ev := &el.events[i]
137-
if fd := int(ev.Fd); fd != p.efd {
137+
if fd := int(ev.Fd); fd == p.efd { // poller is awakened to run tasks in queues.
138+
doChores = true
139+
_, _ = unix.Read(p.efd, p.efdBuf)
140+
} else {
138141
switch err = callback(fd, ev.Events); err {
139142
case nil:
140143
case errors.ErrAcceptSocket, errors.ErrEngineShutdown:
141144
return err
142145
default:
143146
logging.Warnf("error occurs in event-loop: %v", err)
144147
}
145-
} else { // poller is awakened to run tasks in queues.
146-
doChores = true
147-
_, _ = unix.Read(p.efd, p.efdBuf)
148148
}
149149
}
150150

internal/netpoll/epoll_optimized_poller.go

+4-4
Original file line numberDiff line numberDiff line change
@@ -136,17 +136,17 @@ func (p *Poller) Polling() error {
136136
for i := 0; i < n; i++ {
137137
ev := &el.events[i]
138138
pollAttachment := *(**PollAttachment)(unsafe.Pointer(&ev.data))
139-
if pollAttachment.FD != p.epa.FD {
139+
if pollAttachment.FD == p.epa.FD { // poller is awakened to run tasks in queues.
140+
doChores = true
141+
_, _ = unix.Read(p.epa.FD, p.efdBuf)
142+
} else {
140143
switch err = pollAttachment.Callback(pollAttachment.FD, ev.events); err {
141144
case nil:
142145
case errors.ErrAcceptSocket, errors.ErrEngineShutdown:
143146
return err
144147
default:
145148
logging.Warnf("error occurs in event-loop: %v", err)
146149
}
147-
} else { // poller is awakened to run tasks in queues.
148-
doChores = true
149-
_, _ = unix.Read(p.epa.FD, p.efdBuf)
150150
}
151151
}
152152

internal/netpoll/kqueue_default_poller.go

+5-10
Original file line numberDiff line numberDiff line change
@@ -106,7 +106,7 @@ func (p *Poller) Trigger(fn queue.TaskFunc, arg interface{}) (err error) {
106106
}
107107

108108
// Polling blocks the current goroutine, waiting for network-events.
109-
func (p *Poller) Polling(callback func(fd int, filter int16) error) error {
109+
func (p *Poller) Polling(callback PollEventHandler) error {
110110
el := newEventList(InitPollEventsCap)
111111

112112
var (
@@ -126,23 +126,18 @@ func (p *Poller) Polling(callback func(fd int, filter int16) error) error {
126126
}
127127
tsp = &ts
128128

129-
var evFilter int16
130129
for i := 0; i < n; i++ {
131130
ev := &el.events[i]
132-
if fd := int(ev.Ident); fd != 0 {
133-
evFilter = el.events[i].Filter
134-
if (ev.Flags&unix.EV_EOF != 0) || (ev.Flags&unix.EV_ERROR != 0) {
135-
evFilter = EVFilterSock
136-
}
137-
switch err = callback(fd, evFilter); err {
131+
if fd := int(ev.Ident); fd == 0 { // poller is awakened to run tasks in queues
132+
doChores = true
133+
} else {
134+
switch err = callback(fd, ev.Filter, ev.Flags); err {
138135
case nil:
139136
case errors.ErrAcceptSocket, errors.ErrEngineShutdown:
140137
return err
141138
default:
142139
logging.Warnf("error occurs in event-loop: %v", err)
143140
}
144-
} else { // poller is awakened to run tasks in queues.
145-
doChores = true
146141
}
147142
}
148143

internal/netpoll/kqueue_events.go

+4-3
Original file line numberDiff line numberDiff line change
@@ -35,9 +35,10 @@ const (
3535
EVFilterWrite = unix.EVFILT_WRITE
3636
// EVFilterRead represents readable events from sockets.
3737
EVFilterRead = unix.EVFILT_READ
38-
// EVFilterSock represents exceptional events that are not read/write, like socket being closed,
39-
// reading/writing from/to a closed socket, etc.
40-
EVFilterSock = -0xd
38+
// EVFlagsDelete indicates an event has been removed from the kqueue.
39+
EVFlagsDelete = unix.EV_DELETE
40+
// EVFlagsEOF indicates filter-specific EOF condition.
41+
EVFlagsEOF = unix.EV_EOF
4142
)
4243

4344
type eventList struct {

internal/netpoll/kqueue_optimized_poller.go

+4-9
Original file line numberDiff line numberDiff line change
@@ -127,24 +127,19 @@ func (p *Poller) Polling() error {
127127
}
128128
tsp = &ts
129129

130-
var evFilter int16
131130
for i := 0; i < n; i++ {
132131
ev := &el.events[i]
133-
if ev.Ident != 0 {
134-
evFilter = ev.Filter
135-
if (ev.Flags&unix.EV_EOF != 0) || (ev.Flags&unix.EV_ERROR != 0) {
136-
evFilter = EVFilterSock
137-
}
132+
if ev.Ident == 0 { // poller is awakened to run tasks in queues
133+
doChores = true
134+
} else {
138135
pollAttachment := (*PollAttachment)(unsafe.Pointer(ev.Udata))
139-
switch err = pollAttachment.Callback(int(ev.Ident), evFilter); err {
136+
switch err = pollAttachment.Callback(int(ev.Ident), ev.Filter, ev.Flags); err {
140137
case nil:
141138
case errors.ErrAcceptSocket, errors.ErrEngineShutdown:
142139
return err
143140
default:
144141
logging.Warnf("error occurs in event-loop: %v", err)
145142
}
146-
} else { // poller is awakened to run tasks in queues.
147-
doChores = true
148143
}
149144
}
150145

internal/netpoll/poll_data_bsd.go

+1-1
Original file line numberDiff line numberDiff line change
@@ -18,4 +18,4 @@
1818
package netpoll
1919

2020
// PollEventHandler is the callback for I/O events notified by the poller.
21-
type PollEventHandler func(int, int16) error
21+
type PollEventHandler func(int, int16, uint16) error

internal/netpoll/poll_data_unix.go

+3
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,9 @@
1717

1818
package netpoll
1919

20+
// IOFlags represents the flags of IO events.
21+
type IOFlags = uint16
22+
2023
// PollAttachment is the user data which is about to be stored in "void *ptr" of epoll_data or "void *udata" of kevent.
2124
type PollAttachment struct {
2225
FD int

0 commit comments

Comments
 (0)