Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

opt: refine the code of I/O handlers #586

Merged
merged 3 commits into from
Apr 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 0 additions & 28 deletions acceptor_bsd.go

This file was deleted.

28 changes: 0 additions & 28 deletions acceptor_linux.go

This file was deleted.

21 changes: 12 additions & 9 deletions acceptor_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
"github.com/panjf2000/gnet/v2/pkg/errors"
)

func (eng *engine) accept1(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error {
func (el *eventloop) accept0(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error {
for {
nfd, sa, err := socket.Accept(fd)
if err != nil {
Expand All @@ -41,33 +41,33 @@
// It's a silly error, let's retry it.
continue
default:
eng.opts.Logger.Errorf("Accept() failed due to error: %v", err)
el.getLogger().Errorf("Accept() failed due to error: %v", err)

Check warning on line 44 in acceptor_unix.go

View check run for this annotation

Codecov / codecov/patch

acceptor_unix.go#L44

Added line #L44 was not covered by tests
return errors.ErrAcceptSocket
}
}

remoteAddr := socket.SockaddrToTCPOrUnixAddr(sa)
if eng.opts.TCPKeepAlive > 0 && eng.listeners[fd].network == "tcp" {
err = socket.SetKeepAlivePeriod(nfd, int(eng.opts.TCPKeepAlive.Seconds()))
if el.engine.opts.TCPKeepAlive > 0 && el.listeners[fd].network == "tcp" {
err = socket.SetKeepAlivePeriod(nfd, int(el.engine.opts.TCPKeepAlive.Seconds()))
if err != nil {
eng.opts.Logger.Errorf("failed to set TCP keepalive on fd=%d: %v", fd, err)
el.getLogger().Errorf("failed to set TCP keepalive on fd=%d: %v", fd, err)

Check warning on line 53 in acceptor_unix.go

View check run for this annotation

Codecov / codecov/patch

acceptor_unix.go#L53

Added line #L53 was not covered by tests
}
}

el := eng.eventLoops.next(remoteAddr)
el := el.engine.eventLoops.next(remoteAddr)
c := newTCPConn(nfd, el, sa, el.listeners[fd].addr, remoteAddr)
err = el.poller.Trigger(queue.HighPriority, el.register, c)
if err != nil {
eng.opts.Logger.Errorf("failed to enqueue accepted socket of high-priority: %v", err)
el.getLogger().Errorf("failed to enqueue the accepted socket fd=%d to poller: %v", c.fd, err)

Check warning on line 61 in acceptor_unix.go

View check run for this annotation

Codecov / codecov/patch

acceptor_unix.go#L61

Added line #L61 was not covered by tests
_ = unix.Close(nfd)
c.release()
}
}
}

func (el *eventloop) accept1(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) error {
func (el *eventloop) accept(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) error {
if el.listeners[fd].network == "udp" {
return el.readUDP1(fd, ev, flags)
return el.readUDP(fd, ev, flags)
}

nfd, sa, err := socket.Accept(fd)
Expand Down Expand Up @@ -98,6 +98,9 @@
addEvents = el.poller.AddReadWrite
}
if err = addEvents(&c.pollAttachment, el.engine.opts.EdgeTriggeredIO); err != nil {
el.getLogger().Errorf("failed to register the accepted socket fd=%d to poller: %v", c.fd, err)
_ = unix.Close(c.fd)
c.release()

Check warning on line 103 in acceptor_unix.go

View check run for this annotation

Codecov / codecov/patch

acceptor_unix.go#L101-L103

Added lines #L101 - L103 were not covered by tests
return err
}
el.connections.addConn(c, el.idx)
Expand Down
6 changes: 1 addition & 5 deletions connection_bsd.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/panjf2000/gnet/v2/internal/netpoll"
)

func (c *conn) handleEvents(_ int, filter int16, flags uint16) (err error) {
func (c *conn) processIO(_ int, filter netpoll.IOEvent, flags netpoll.IOFlags) (err error) {
el := c.loop
switch filter {
case unix.EVFILT_READ:
Expand Down Expand Up @@ -56,7 +56,3 @@ func (c *conn) handleEvents(_ int, filter int16, flags uint16) (err error) {
}
return
}

func (el *eventloop) readUDP(fd int, filter netpoll.IOEvent, flags netpoll.IOFlags) error {
return el.readUDP1(fd, filter, flags)
}
6 changes: 1 addition & 5 deletions connection_linux.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/panjf2000/gnet/v2/internal/netpoll"
)

func (c *conn) handleEvents(_ int, ev uint32) error {
func (c *conn) processIO(_ int, ev netpoll.IOEvent, _ netpoll.IOFlags) error {
el := c.loop
// First check for any unexpected non-IO events.
// For these events we just close the corresponding connection directly.
Expand Down Expand Up @@ -68,7 +68,3 @@ func (c *conn) handleEvents(_ int, ev uint32) error {
}
return nil
}

func (el *eventloop) readUDP(fd int, ev netpoll.IOEvent) error {
return el.readUDP1(fd, ev, 0)
}
2 changes: 1 addition & 1 deletion connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ func newTCPConn(fd int, el *eventloop, sa unix.Sockaddr, localAddr, remoteAddr n
remoteAddr: remoteAddr,
pollAttachment: netpoll.PollAttachment{FD: fd},
}
c.pollAttachment.Callback = c.handleEvents
c.pollAttachment.Callback = c.processIO
c.outboundBuffer.Reset(el.engine.opts.WriteBufferCap)
return
}
Expand Down
18 changes: 9 additions & 9 deletions engine_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,9 +34,9 @@ import (
)

type engine struct {
listeners map[int]*listener // listeners for accepting new connections
listeners map[int]*listener // listeners for accepting incoming connections
opts *Options // options with engine
acceptor *eventloop // main event-loop for accepting connections
ingress *eventloop // main event-loop that monitors all listeners
eventLoops loadBalancer // event-loops for handling events
inShutdown int32 // whether the engine is in shutdown
ticker struct {
Expand Down Expand Up @@ -76,11 +76,11 @@ func (eng *engine) closeEventLoops() {
_ = el.poller.Close()
return true
})
if eng.acceptor != nil {
if eng.ingress != nil {
for _, ln := range eng.listeners {
ln.close()
}
err := eng.acceptor.poller.Close()
err := eng.ingress.poller.Close()
if err != nil {
eng.opts.Logger.Errorf("failed to close poller when stopping engine: %v", err)
}
Expand Down Expand Up @@ -175,19 +175,19 @@ func (eng *engine) activateReactors(numEventLoop int) error {
el.poller = p
el.eventHandler = eng.eventHandler
for _, ln := range eng.listeners {
if err = el.poller.AddRead(ln.packPollAttachment(eng.accept), true); err != nil {
if err = el.poller.AddRead(ln.packPollAttachment(el.accept0), true); err != nil {
return err
}
}
eng.acceptor = el
eng.ingress = el

// Start main reactor in background.
eng.workerPool.Go(el.rotate)

// Start the ticker.
if eng.opts.Ticker {
eng.workerPool.Go(func() error {
eng.acceptor.ticker(eng.ticker.ctx)
eng.ingress.ticker(eng.ticker.ctx)
return nil
})
}
Expand Down Expand Up @@ -217,8 +217,8 @@ func (eng *engine) stop(s Engine) {
}
return true
})
if eng.acceptor != nil {
err := eng.acceptor.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errors.ErrEngineShutdown }, nil)
if eng.ingress != nil {
err := eng.ingress.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errors.ErrEngineShutdown }, nil)
if err != nil {
eng.opts.Logger.Errorf("failed to enqueue shutdown signal of high-priority for main event-loop: %v", err)
}
Expand Down
2 changes: 1 addition & 1 deletion eventloop_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -306,7 +306,7 @@ func (el *eventloop) handleAction(c *conn, action Action) error {
}
}

func (el *eventloop) readUDP1(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error {
func (el *eventloop) readUDP(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error {
n, sa, err := unix.Recvfrom(fd, el.buffer, 0)
if err != nil {
if err == unix.EAGAIN {
Expand Down
3 changes: 3 additions & 0 deletions internal/netpoll/defs_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ package netpoll
// IOFlags represents the flags of IO events.
type IOFlags = uint16

// PollEventHandler is the callback for I/O events notified by the poller.
type PollEventHandler func(int, IOEvent, IOFlags) error

// PollAttachment is the user data which is about to be stored in "void *ptr" of epoll_data or "void *udata" of kevent.
type PollAttachment struct {
FD int
Expand Down
3 changes: 0 additions & 3 deletions internal/netpoll/defs_poller_epoll.go
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,6 @@ const (
ErrEvents = unix.EPOLLERR | unix.EPOLLHUP | unix.EPOLLRDHUP
)

// PollEventHandler is the callback for I/O events notified by the poller.
type PollEventHandler func(int, uint32) error

type eventList struct {
size int
events []epollevent
Expand Down
3 changes: 0 additions & 3 deletions internal/netpoll/defs_poller_kqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,6 @@ const (
MaxAsyncTasksAtOneTime = 128
)

// PollEventHandler is the callback for I/O events notified by the poller.
type PollEventHandler func(int, int16, uint16) error

type eventList struct {
size int
events []unix.Kevent_t
Expand Down
2 changes: 1 addition & 1 deletion internal/netpoll/poller_epoll_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -135,7 +135,7 @@ func (p *Poller) Polling(callback PollEventHandler) error {
if fd := int(ev.Fd); fd == p.efd { // poller is awakened to run tasks in queues.
doChores = true
} else {
switch err = callback(fd, ev.Events); err {
switch err = callback(fd, ev.Events, 0); err {
case nil:
case errors.ErrAcceptSocket, errors.ErrEngineShutdown:
return err
Expand Down
2 changes: 1 addition & 1 deletion internal/netpoll/poller_epoll_ultimate.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,7 @@ func (p *Poller) Polling() error {
if pollAttachment.FD == p.epa.FD { // poller is awakened to run tasks in queues.
doChores = true
} else {
switch err = pollAttachment.Callback(pollAttachment.FD, ev.events); err {
switch err = pollAttachment.Callback(pollAttachment.FD, ev.events, 0); err {
case nil:
case errors.ErrAcceptSocket, errors.ErrEngineShutdown:
return err
Expand Down
12 changes: 6 additions & 6 deletions reactor_epoll_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (el *eventloop) rotate() error {
defer runtime.UnlockOSThread()
}

err := el.poller.Polling(el.engine.accept)
err := el.poller.Polling(el.accept0)
if err == errors.ErrEngineShutdown {
el.getLogger().Debugf("main reactor is exiting in terms of the demand from user, %v", err)
err = nil
Expand All @@ -52,10 +52,10 @@ func (el *eventloop) orbit() error {
defer runtime.UnlockOSThread()
}

err := el.poller.Polling(func(fd int, ev uint32) error {
err := el.poller.Polling(func(fd int, ev netpoll.IOEvent, _ netpoll.IOFlags) error {
c := el.connections.getConn(fd)
if c == nil {
// Somehow epoll notify with an event for a stale fd that is not in our connection set.
// Somehow epoll notified with an event for a stale fd that is not in our connection set.
// We need to delete it from the epoll set.
return el.poller.Delete(fd)
}
Expand Down Expand Up @@ -121,13 +121,13 @@ func (el *eventloop) run() error {
defer runtime.UnlockOSThread()
}

err := el.poller.Polling(func(fd int, ev uint32) error {
err := el.poller.Polling(func(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) error {
c := el.connections.getConn(fd)
if c == nil {
if _, ok := el.listeners[fd]; ok {
return el.accept(fd, ev)
return el.accept(fd, ev, flags)
}
// Somehow epoll notify with an event for a stale fd that is not in our connection set.
// Somehow epoll notified with an event for a stale fd that is not in our connection set.
// We need to delete it from the epoll set.
return el.poller.Delete(fd)

Expand Down
7 changes: 4 additions & 3 deletions reactor_kqueue_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (

"golang.org/x/sys/unix"

"github.com/panjf2000/gnet/v2/internal/netpoll"
"github.com/panjf2000/gnet/v2/pkg/errors"
)

Expand All @@ -33,7 +34,7 @@ func (el *eventloop) rotate() error {
defer runtime.UnlockOSThread()
}

err := el.poller.Polling(el.engine.accept)
err := el.poller.Polling(el.accept0)
if err == errors.ErrEngineShutdown {
el.getLogger().Debugf("main reactor is exiting in terms of the demand from user, %v", err)
err = nil
Expand All @@ -52,7 +53,7 @@ func (el *eventloop) orbit() error {
defer runtime.UnlockOSThread()
}

err := el.poller.Polling(func(fd int, filter int16, flags uint16) (err error) {
err := el.poller.Polling(func(fd int, filter netpoll.IOEvent, flags netpoll.IOFlags) (err error) {
c := el.connections.getConn(fd)
if c == nil {
// This might happen when the connection has already been closed,
Expand Down Expand Up @@ -110,7 +111,7 @@ func (el *eventloop) run() error {
defer runtime.UnlockOSThread()
}

err := el.poller.Polling(func(fd int, filter int16, flags uint16) (err error) {
err := el.poller.Polling(func(fd int, filter netpoll.IOEvent, flags netpoll.IOFlags) (err error) {
c := el.connections.getConn(fd)
if c == nil {
if _, ok := el.listeners[fd]; ok {
Expand Down
Loading