diff --git a/connection_bsd.go b/connection_bsd.go index 716a00180..5b2e80cf4 100644 --- a/connection_bsd.go +++ b/connection_bsd.go @@ -50,7 +50,7 @@ func (c *conn) processIO(_ int, filter netpoll.IOEvent, flags netpoll.IOFlags) ( // 1) EVFILT_WRITE|EV_ADD|EV_CLEAR|EV_EOF, 2) EVFILT_READ|EV_ADD|EV_CLEAR|EV_EOF. err = el.write(c) default: - c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error + c.outboundBuffer.Release() // don't bother to write to a connection that is already broken err = el.close(c, io.EOF) } } diff --git a/connection_linux.go b/connection_linux.go index d14e1f32e..12a57ebca 100644 --- a/connection_linux.go +++ b/connection_linux.go @@ -29,8 +29,8 @@ func (c *conn) processIO(_ int, ev netpoll.IOEvent, _ netpoll.IOFlags) error { el := c.loop // First check for any unexpected non-IO events. // For these events we just close the connection directly. - if ev&netpoll.ErrEvents != 0 && ev&unix.EPOLLIN == 0 && ev&unix.EPOLLOUT == 0 { - c.outboundBuffer.Release() // don't bother to write to a connection with some unknown error + if ev&(netpoll.ErrEvents|unix.EPOLLRDHUP) != 0 && ev&netpoll.ReadWriteEvents == 0 { + c.outboundBuffer.Release() // don't bother to write to a connection that is already broken return el.close(c, io.EOF) } // Secondly, check for EPOLLOUT before EPOLLIN, the former has a higher priority @@ -43,14 +43,14 @@ func (c *conn) processIO(_ int, ev netpoll.IOEvent, _ netpoll.IOFlags) error { // to the remote first and then close the connection. // // We perform eventloop.write for EPOLLOUT because it can take good care of either case. - if ev&(unix.EPOLLOUT|unix.EPOLLERR) != 0 { + if ev&(netpoll.WriteEvents|netpoll.ErrEvents) != 0 { if err := el.write(c); err != nil { return err } } // Check for EPOLLIN before EPOLLRDHUP in case that there are pending data in // the socket buffer. - if ev&(unix.EPOLLIN|unix.EPOLLERR) != 0 { + if ev&(netpoll.ReadEvents|netpoll.ErrEvents) != 0 { if err := el.read(c); err != nil { return err } diff --git a/eventloop_unix.go b/eventloop_unix.go index d2f1c5c22..50684a45b 100644 --- a/eventloop_unix.go +++ b/eventloop_unix.go @@ -114,11 +114,18 @@ func (el *eventloop) open(c *conn) error { return el.handleAction(c, action) } +func (el *eventloop) read0(itf interface{}) error { + return el.read(itf.(*conn)) +} + +const maxBytesTransferET = 1 << 22 + func (el *eventloop) read(c *conn) error { if !c.opened { return nil } + var recv int isET := el.engine.opts.EdgeTriggeredIO loop: n, err := unix.Read(c.fd, el.buffer) @@ -131,6 +138,7 @@ loop: } return el.close(c, os.NewSyscallError("read", err)) } + recv += n c.buffer = el.buffer[:n] action := el.eventHandler.OnTraffic(c) @@ -144,13 +152,25 @@ loop: _, _ = c.inboundBuffer.Write(c.buffer) c.buffer = c.buffer[:0] - if isET || c.isEOF { + if c.isEOF || (isET && recv < maxBytesTransferET) { goto loop } + // To prevent infinite reading in ET mode and starving other events, + // we need to set up threshold for the maximum read bytes per connection + // on each event-loop. If the threshold is reached and there are still + // unread data in the socket buffer, we must issue another read event manually. + if isET && n == len(el.buffer) { + return el.poller.Trigger(queue.LowPriority, el.read0, c) + } + return nil } +func (el *eventloop) write0(itf interface{}) error { + return el.write(itf.(*conn)) +} + // The default value of UIO_MAXIOV/IOV_MAX is 1024 on Linux and most BSD-like OSs. const iovMax = 1024 @@ -161,8 +181,9 @@ func (el *eventloop) write(c *conn) error { isET := el.engine.opts.EdgeTriggeredIO var ( - n int - err error + n int + sent int + err error ) loop: iov, _ := c.outboundBuffer.Peek(-1) @@ -182,14 +203,24 @@ loop: default: return el.close(c, os.NewSyscallError("write", err)) } - if isET && !c.outboundBuffer.IsEmpty() { + sent += n + + if isET && !c.outboundBuffer.IsEmpty() && sent < maxBytesTransferET { goto loop } // All data have been sent, it's no need to monitor the writable events for LT mode, // remove the writable event from poller to help the future event-loops if necessary. if !isET && c.outboundBuffer.IsEmpty() { - _ = el.poller.ModRead(&c.pollAttachment, false) + return el.poller.ModRead(&c.pollAttachment, false) + } + + // To prevent infinite writing in ET mode and starving other events, + // we need to set up threshold for the maximum write bytes per connection + // on each event-loop. If the threshold is reached and there are still + // pending data to write, we must issue another write event manually. + if isET && !c.outboundBuffer.IsEmpty() { + return el.poller.Trigger(queue.HighPriority, el.write0, c) } return nil diff --git a/eventloop_unix_test.go b/eventloop_unix_test.go index b9cc2121e..e21bb83f1 100644 --- a/eventloop_unix_test.go +++ b/eventloop_unix_test.go @@ -48,7 +48,7 @@ var ( func BenchmarkGC4El100k(b *testing.B) { oldGc := debug.SetGCPercent(-1) - ts1 := benchServeGC(b, "tcp", ":9001", true, 4, 100000) + ts1 := benchServeGC(b, "tcp", ":0", true, 4, 100000) b.Run("Run-4-eventloop-100000", func(b *testing.B) { for i := 0; i < b.N; i++ { runtime.GC() @@ -62,7 +62,7 @@ func BenchmarkGC4El100k(b *testing.B) { func BenchmarkGC4El200k(b *testing.B) { oldGc := debug.SetGCPercent(-1) - ts1 := benchServeGC(b, "tcp", ":9001", true, 4, 200000) + ts1 := benchServeGC(b, "tcp", ":0", true, 4, 200000) b.Run("Run-4-eventloop-200000", func(b *testing.B) { for i := 0; i < b.N; i++ { runtime.GC() @@ -76,7 +76,7 @@ func BenchmarkGC4El200k(b *testing.B) { func BenchmarkGC4El500k(b *testing.B) { oldGc := debug.SetGCPercent(-1) - ts1 := benchServeGC(b, "tcp", ":9001", true, 4, 500000) + ts1 := benchServeGC(b, "tcp", ":0", true, 4, 500000) b.Run("Run-4-eventloop-500000", func(b *testing.B) { for i := 0; i < b.N; i++ { runtime.GC() @@ -146,73 +146,73 @@ func TestServeGC(t *testing.T) { if testBigGC { t.Skipf("Skip when testBigGC=%t", testBigGC) } - testServeGC(t, "tcp", ":9000", true, true, 1, 10000) + testServeGC(t, "tcp", ":0", true, true, 1, 10000) }) t.Run("1-loop-100000", func(t *testing.T) { if !testBigGC { t.Skipf("Skip when testBigGC=%t", testBigGC) } - testServeGC(t, "tcp", ":9000", true, true, 1, 100000) + testServeGC(t, "tcp", ":0", true, true, 1, 100000) }) t.Run("1-loop-1000000", func(t *testing.T) { if !testBigGC { t.Skipf("Skip when testBigGC=%t", testBigGC) } - testServeGC(t, "tcp", ":9000", true, true, 1, 1000000) + testServeGC(t, "tcp", ":0", true, true, 1, 1000000) }) t.Run("2-loop-10000", func(t *testing.T) { if testBigGC { t.Skipf("Skip when testBigGC=%t", testBigGC) } - testServeGC(t, "tcp", ":9000", true, true, 2, 10000) + testServeGC(t, "tcp", ":0", true, true, 2, 10000) }) t.Run("2-loop-100000", func(t *testing.T) { if !testBigGC { t.Skipf("Skip when testBigGC=%t", testBigGC) } - testServeGC(t, "tcp", ":9000", true, true, 2, 100000) + testServeGC(t, "tcp", ":0", true, true, 2, 100000) }) t.Run("2-loop-1000000", func(t *testing.T) { if !testBigGC { t.Skipf("Skip when testBigGC=%t", testBigGC) } - testServeGC(t, "tcp", ":9000", true, true, 2, 1000000) + testServeGC(t, "tcp", ":0", true, true, 2, 1000000) }) t.Run("4-loop-10000", func(t *testing.T) { if testBigGC { t.Skipf("Skip when testBigGC=%t", testBigGC) } - testServeGC(t, "tcp", ":9000", true, true, 4, 10000) + testServeGC(t, "tcp", ":0", true, true, 4, 10000) }) t.Run("4-loop-100000", func(t *testing.T) { if !testBigGC { t.Skipf("Skip when testBigGC=%t", testBigGC) } - testServeGC(t, "tcp", ":9000", true, true, 4, 100000) + testServeGC(t, "tcp", ":0", true, true, 4, 100000) }) t.Run("4-loop-1000000", func(t *testing.T) { if !testBigGC { t.Skipf("Skip when testBigGC=%t", testBigGC) } - testServeGC(t, "tcp", ":9000", true, true, 4, 1000000) + testServeGC(t, "tcp", ":0", true, true, 4, 1000000) }) t.Run("16-loop-10000", func(t *testing.T) { if testBigGC { t.Skipf("Skip when testBigGC=%t", testBigGC) } - testServeGC(t, "tcp", ":9000", true, true, 16, 10000) + testServeGC(t, "tcp", ":0", true, true, 16, 10000) }) t.Run("16-loop-100000", func(t *testing.T) { if !testBigGC { t.Skipf("Skip when testBigGC=%t", testBigGC) } - testServeGC(t, "tcp", ":9000", true, true, 16, 100000) + testServeGC(t, "tcp", ":0", true, true, 16, 100000) }) t.Run("16-loop-1000000", func(t *testing.T) { if !testBigGC { t.Skipf("Skip when testBigGC=%t", testBigGC) } - testServeGC(t, "tcp", ":9000", true, true, 16, 1000000) + testServeGC(t, "tcp", ":0", true, true, 16, 1000000) }) }) } diff --git a/internal/netpoll/defs_poller_epoll.go b/internal/netpoll/defs_poller_epoll.go index 423439cbe..f6b37feaa 100644 --- a/internal/netpoll/defs_poller_epoll.go +++ b/internal/netpoll/defs_poller_epoll.go @@ -34,9 +34,14 @@ const ( MinPollEventsCap = 32 // MaxAsyncTasksAtOneTime is the maximum amount of asynchronous tasks that the event-loop will process at one time. MaxAsyncTasksAtOneTime = 256 - // ErrEvents represents exceptional events that are not read/write, like socket being closed, - // reading/writing from/to a closed socket, etc. - ErrEvents = unix.EPOLLERR | unix.EPOLLHUP | unix.EPOLLRDHUP + // ReadEvents represents readable events that are polled by epoll. + ReadEvents = unix.EPOLLIN | unix.EPOLLPRI + // WriteEvents represents writeable events that are polled by epoll. + WriteEvents = unix.EPOLLOUT + // ReadWriteEvents represents both readable and writeable events. + ReadWriteEvents = ReadEvents | WriteEvents + // ErrEvents represents exceptional events that occurred on the local side. + ErrEvents = unix.EPOLLERR | unix.EPOLLHUP ) type eventList struct { diff --git a/internal/netpoll/poller_epoll_default.go b/internal/netpoll/poller_epoll_default.go index 12a094664..d5e4d4e48 100644 --- a/internal/netpoll/poller_epoll_default.go +++ b/internal/netpoll/poller_epoll_default.go @@ -193,17 +193,11 @@ func (p *Poller) Polling(callback PollEventHandler) error { } } -const ( - readEvents = unix.EPOLLIN | unix.EPOLLPRI | unix.EPOLLRDHUP - writeEvents = unix.EPOLLOUT | unix.EPOLLRDHUP - readWriteEvents = readEvents | writeEvents -) - // AddReadWrite registers the given file-descriptor with readable and writable events to the poller. func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error { - var ev uint32 = readWriteEvents + var ev uint32 = ReadWriteEvents if edgeTriggered { - ev |= unix.EPOLLET + ev |= unix.EPOLLET | unix.EPOLLRDHUP } return os.NewSyscallError("epoll_ctl add", unix.EpollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev})) @@ -211,9 +205,9 @@ func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error { // AddRead registers the given file-descriptor with readable event to the poller. func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error { - var ev uint32 = readEvents + var ev uint32 = ReadEvents if edgeTriggered { - ev |= unix.EPOLLET + ev |= unix.EPOLLET | unix.EPOLLRDHUP } return os.NewSyscallError("epoll_ctl add", unix.EpollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev})) @@ -221,9 +215,9 @@ func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error { // AddWrite registers the given file-descriptor with writable event to the poller. func (p *Poller) AddWrite(pa *PollAttachment, edgeTriggered bool) error { - var ev uint32 = writeEvents + var ev uint32 = WriteEvents if edgeTriggered { - ev |= unix.EPOLLET + ev |= unix.EPOLLET | unix.EPOLLRDHUP } return os.NewSyscallError("epoll_ctl add", unix.EpollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev})) @@ -231,9 +225,9 @@ func (p *Poller) AddWrite(pa *PollAttachment, edgeTriggered bool) error { // ModRead renews the given file-descriptor with readable event in the poller. func (p *Poller) ModRead(pa *PollAttachment, edgeTriggered bool) error { - var ev uint32 = readEvents + var ev uint32 = ReadEvents if edgeTriggered { - ev |= unix.EPOLLET + ev |= unix.EPOLLET | unix.EPOLLRDHUP } return os.NewSyscallError("epoll_ctl mod", unix.EpollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev})) @@ -241,9 +235,9 @@ func (p *Poller) ModRead(pa *PollAttachment, edgeTriggered bool) error { // ModReadWrite renews the given file-descriptor with readable and writable events in the poller. func (p *Poller) ModReadWrite(pa *PollAttachment, edgeTriggered bool) error { - var ev uint32 = readWriteEvents + var ev uint32 = ReadWriteEvents if edgeTriggered { - ev |= unix.EPOLLET + ev |= unix.EPOLLET | unix.EPOLLRDHUP } return os.NewSyscallError("epoll_ctl mod", unix.EpollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &unix.EpollEvent{Fd: int32(pa.FD), Events: ev})) diff --git a/internal/netpoll/poller_epoll_ultimate.go b/internal/netpoll/poller_epoll_ultimate.go index 84d415f87..fec70097e 100644 --- a/internal/netpoll/poller_epoll_ultimate.go +++ b/internal/netpoll/poller_epoll_ultimate.go @@ -195,18 +195,12 @@ func (p *Poller) Polling() error { } } -const ( - readEvents = unix.EPOLLIN | unix.EPOLLPRI | unix.EPOLLRDHUP - writeEvents = unix.EPOLLOUT | unix.EPOLLRDHUP - readWriteEvents = readEvents | writeEvents -) - // AddReadWrite registers the given file-descriptor with readable and writable events to the poller. func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error { var ev epollevent - ev.events = readWriteEvents + ev.events = ReadWriteEvents if edgeTriggered { - ev.events |= unix.EPOLLET + ev.events |= unix.EPOLLET | unix.EPOLLRDHUP } convertPollAttachment(unsafe.Pointer(&ev.data), pa) return os.NewSyscallError("epoll_ctl add", epollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &ev)) @@ -215,9 +209,9 @@ func (p *Poller) AddReadWrite(pa *PollAttachment, edgeTriggered bool) error { // AddRead registers the given file-descriptor with readable event to the poller. func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error { var ev epollevent - ev.events = readEvents + ev.events = ReadEvents if edgeTriggered { - ev.events |= unix.EPOLLET + ev.events |= unix.EPOLLET | unix.EPOLLRDHUP } convertPollAttachment(unsafe.Pointer(&ev.data), pa) return os.NewSyscallError("epoll_ctl add", epollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &ev)) @@ -226,9 +220,9 @@ func (p *Poller) AddRead(pa *PollAttachment, edgeTriggered bool) error { // AddWrite registers the given file-descriptor with writable event to the poller. func (p *Poller) AddWrite(pa *PollAttachment, edgeTriggered bool) error { var ev epollevent - ev.events = writeEvents + ev.events = WriteEvents if edgeTriggered { - ev.events |= unix.EPOLLET + ev.events |= unix.EPOLLET | unix.EPOLLRDHUP } convertPollAttachment(unsafe.Pointer(&ev.data), pa) return os.NewSyscallError("epoll_ctl add", epollCtl(p.fd, unix.EPOLL_CTL_ADD, pa.FD, &ev)) @@ -237,9 +231,9 @@ func (p *Poller) AddWrite(pa *PollAttachment, edgeTriggered bool) error { // ModRead renews the given file-descriptor with readable event in the poller. func (p *Poller) ModRead(pa *PollAttachment, edgeTriggered bool) error { var ev epollevent - ev.events = readEvents + ev.events = ReadEvents if edgeTriggered { - ev.events |= unix.EPOLLET + ev.events |= unix.EPOLLET | unix.EPOLLRDHUP } convertPollAttachment(unsafe.Pointer(&ev.data), pa) return os.NewSyscallError("epoll_ctl mod", epollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &ev)) @@ -248,9 +242,9 @@ func (p *Poller) ModRead(pa *PollAttachment, edgeTriggered bool) error { // ModReadWrite renews the given file-descriptor with readable and writable events in the poller. func (p *Poller) ModReadWrite(pa *PollAttachment, edgeTriggered bool) error { var ev epollevent - ev.events = readWriteEvents + ev.events = ReadWriteEvents if edgeTriggered { - ev.events |= unix.EPOLLET + ev.events |= unix.EPOLLET | unix.EPOLLRDHUP } convertPollAttachment(unsafe.Pointer(&ev.data), pa) return os.NewSyscallError("epoll_ctl mod", epollCtl(p.fd, unix.EPOLL_CTL_MOD, pa.FD, &ev))