From 608c6a4052b9be5f1263d3756d8f92d7d5faee67 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Sun, 21 Apr 2024 22:00:20 +0800 Subject: [PATCH 1/3] opt: refine the code of I/O handlers --- acceptor_bsd.go | 28 ----------------------- acceptor_linux.go | 28 ----------------------- acceptor_unix.go | 21 +++++++++-------- connection_bsd.go | 6 +---- connection_linux.go | 6 +---- connection_unix.go | 2 +- engine_unix.go | 2 +- eventloop_unix.go | 2 +- internal/netpoll/defs_poller.go | 3 +++ internal/netpoll/defs_poller_epoll.go | 3 --- internal/netpoll/defs_poller_kqueue.go | 3 --- internal/netpoll/poller_epoll_default.go | 2 +- internal/netpoll/poller_epoll_ultimate.go | 2 +- reactor_epoll_default.go | 12 +++++----- reactor_kqueue_default.go | 2 +- 15 files changed, 29 insertions(+), 93 deletions(-) delete mode 100644 acceptor_bsd.go delete mode 100644 acceptor_linux.go diff --git a/acceptor_bsd.go b/acceptor_bsd.go deleted file mode 100644 index 3b717e3b0..000000000 --- a/acceptor_bsd.go +++ /dev/null @@ -1,28 +0,0 @@ -// Copyright (c) 2023 The Gnet Authors. All rights reserved. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build freebsd || dragonfly || netbsd || openbsd || darwin -// +build freebsd dragonfly netbsd openbsd darwin - -package gnet - -import "github.com/panjf2000/gnet/v2/internal/netpoll" - -func (eng *engine) accept(fd int, filter netpoll.IOEvent, flags netpoll.IOFlags) error { - return eng.accept1(fd, filter, flags) -} - -func (el *eventloop) accept(fd int, filter netpoll.IOEvent, flags netpoll.IOFlags) error { - return el.accept1(fd, filter, flags) -} diff --git a/acceptor_linux.go b/acceptor_linux.go deleted file mode 100644 index 04931b64c..000000000 --- a/acceptor_linux.go +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright (c) 2023 The Gnet Authors. All rights reserved. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ - -package gnet - -import "github.com/panjf2000/gnet/v2/internal/netpoll" - -func (eng *engine) accept(fd int, ev netpoll.IOEvent) error { - return eng.accept1(fd, ev, 0) -} - -func (el *eventloop) accept(fd int, ev netpoll.IOEvent) error { - return el.accept1(fd, ev, 0) -} diff --git a/acceptor_unix.go b/acceptor_unix.go index ea37b5254..1afad82b4 100644 --- a/acceptor_unix.go +++ b/acceptor_unix.go @@ -28,7 +28,7 @@ import ( "github.com/panjf2000/gnet/v2/pkg/errors" ) -func (eng *engine) accept1(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error { +func (el *eventloop) accept0(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error { for { nfd, sa, err := socket.Accept(fd) if err != nil { @@ -41,33 +41,33 @@ func (eng *engine) accept1(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error { // It's a silly error, let's retry it. continue default: - eng.opts.Logger.Errorf("Accept() failed due to error: %v", err) + el.getLogger().Errorf("Accept() failed due to error: %v", err) return errors.ErrAcceptSocket } } remoteAddr := socket.SockaddrToTCPOrUnixAddr(sa) - if eng.opts.TCPKeepAlive > 0 && eng.listeners[fd].network == "tcp" { - err = socket.SetKeepAlivePeriod(nfd, int(eng.opts.TCPKeepAlive.Seconds())) + if el.engine.opts.TCPKeepAlive > 0 && el.listeners[fd].network == "tcp" { + err = socket.SetKeepAlivePeriod(nfd, int(el.engine.opts.TCPKeepAlive.Seconds())) if err != nil { - eng.opts.Logger.Errorf("failed to set TCP keepalive on fd=%d: %v", fd, err) + el.getLogger().Errorf("failed to set TCP keepalive on fd=%d: %v", fd, err) } } - el := eng.eventLoops.next(remoteAddr) + el := el.engine.eventLoops.next(remoteAddr) c := newTCPConn(nfd, el, sa, el.listeners[fd].addr, remoteAddr) err = el.poller.Trigger(queue.HighPriority, el.register, c) if err != nil { - eng.opts.Logger.Errorf("failed to enqueue accepted socket of high-priority: %v", err) + el.getLogger().Errorf("failed to enqueue the accepted socket fd=%d to poller: %v", c.fd, err) _ = unix.Close(nfd) c.release() } } } -func (el *eventloop) accept1(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) error { +func (el *eventloop) accept(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) error { if el.listeners[fd].network == "udp" { - return el.readUDP1(fd, ev, flags) + return el.readUDP(fd, ev, flags) } nfd, sa, err := socket.Accept(fd) @@ -98,6 +98,9 @@ func (el *eventloop) accept1(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) addEvents = el.poller.AddReadWrite } if err = addEvents(&c.pollAttachment, el.engine.opts.EdgeTriggeredIO); err != nil { + el.getLogger().Errorf("failed to register the accepted socket fd=%d to poller: %v", c.fd, err) + _ = unix.Close(c.fd) + c.release() return err } el.connections.addConn(c, el.idx) diff --git a/connection_bsd.go b/connection_bsd.go index 6a085c166..b24becffb 100644 --- a/connection_bsd.go +++ b/connection_bsd.go @@ -25,7 +25,7 @@ import ( "github.com/panjf2000/gnet/v2/internal/netpoll" ) -func (c *conn) handleEvents(_ int, filter int16, flags uint16) (err error) { +func (c *conn) processIO(_ int, filter netpoll.IOEvent, flags netpoll.IOFlags) (err error) { el := c.loop switch filter { case unix.EVFILT_READ: @@ -56,7 +56,3 @@ func (c *conn) handleEvents(_ int, filter int16, flags uint16) (err error) { } return } - -func (el *eventloop) readUDP(fd int, filter netpoll.IOEvent, flags netpoll.IOFlags) error { - return el.readUDP1(fd, filter, flags) -} diff --git a/connection_linux.go b/connection_linux.go index 30af24d59..ac8376e4c 100644 --- a/connection_linux.go +++ b/connection_linux.go @@ -25,7 +25,7 @@ import ( "github.com/panjf2000/gnet/v2/internal/netpoll" ) -func (c *conn) handleEvents(_ int, ev uint32) error { +func (c *conn) processIO(_ int, ev netpoll.IOEvent, _ netpoll.IOFlags) error { el := c.loop // First check for any unexpected non-IO events. // For these events we just close the corresponding connection directly. @@ -68,7 +68,3 @@ func (c *conn) handleEvents(_ int, ev uint32) error { } return nil } - -func (el *eventloop) readUDP(fd int, ev netpoll.IOEvent) error { - return el.readUDP1(fd, ev, 0) -} diff --git a/connection_unix.go b/connection_unix.go index eb0ab3b59..101a51e96 100644 --- a/connection_unix.go +++ b/connection_unix.go @@ -63,7 +63,7 @@ func newTCPConn(fd int, el *eventloop, sa unix.Sockaddr, localAddr, remoteAddr n remoteAddr: remoteAddr, pollAttachment: netpoll.PollAttachment{FD: fd}, } - c.pollAttachment.Callback = c.handleEvents + c.pollAttachment.Callback = c.processIO c.outboundBuffer.Reset(el.engine.opts.WriteBufferCap) return } diff --git a/engine_unix.go b/engine_unix.go index 5d380e70f..30bca2215 100644 --- a/engine_unix.go +++ b/engine_unix.go @@ -175,7 +175,7 @@ func (eng *engine) activateReactors(numEventLoop int) error { el.poller = p el.eventHandler = eng.eventHandler for _, ln := range eng.listeners { - if err = el.poller.AddRead(ln.packPollAttachment(eng.accept), true); err != nil { + if err = el.poller.AddRead(ln.packPollAttachment(el.accept0), true); err != nil { return err } } diff --git a/eventloop_unix.go b/eventloop_unix.go index 41341be2d..c0b326804 100644 --- a/eventloop_unix.go +++ b/eventloop_unix.go @@ -306,7 +306,7 @@ func (el *eventloop) handleAction(c *conn, action Action) error { } } -func (el *eventloop) readUDP1(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error { +func (el *eventloop) readUDP(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error { n, sa, err := unix.Recvfrom(fd, el.buffer, 0) if err != nil { if err == unix.EAGAIN { diff --git a/internal/netpoll/defs_poller.go b/internal/netpoll/defs_poller.go index 86d5a51d2..79f27d2ea 100644 --- a/internal/netpoll/defs_poller.go +++ b/internal/netpoll/defs_poller.go @@ -20,6 +20,9 @@ package netpoll // IOFlags represents the flags of IO events. type IOFlags = uint16 +// PollEventHandler is the callback for I/O events notified by the poller. +type PollEventHandler func(int, IOEvent, IOFlags) error + // PollAttachment is the user data which is about to be stored in "void *ptr" of epoll_data or "void *udata" of kevent. type PollAttachment struct { FD int diff --git a/internal/netpoll/defs_poller_epoll.go b/internal/netpoll/defs_poller_epoll.go index d67f36d6f..81501524b 100644 --- a/internal/netpoll/defs_poller_epoll.go +++ b/internal/netpoll/defs_poller_epoll.go @@ -36,9 +36,6 @@ const ( ErrEvents = unix.EPOLLERR | unix.EPOLLHUP | unix.EPOLLRDHUP ) -// PollEventHandler is the callback for I/O events notified by the poller. -type PollEventHandler func(int, uint32) error - type eventList struct { size int events []epollevent diff --git a/internal/netpoll/defs_poller_kqueue.go b/internal/netpoll/defs_poller_kqueue.go index 0b2e883b8..89ea29ae8 100644 --- a/internal/netpoll/defs_poller_kqueue.go +++ b/internal/netpoll/defs_poller_kqueue.go @@ -33,9 +33,6 @@ const ( MaxAsyncTasksAtOneTime = 128 ) -// PollEventHandler is the callback for I/O events notified by the poller. -type PollEventHandler func(int, int16, uint16) error - type eventList struct { size int events []unix.Kevent_t diff --git a/internal/netpoll/poller_epoll_default.go b/internal/netpoll/poller_epoll_default.go index c21765772..20b9cb213 100644 --- a/internal/netpoll/poller_epoll_default.go +++ b/internal/netpoll/poller_epoll_default.go @@ -135,7 +135,7 @@ func (p *Poller) Polling(callback PollEventHandler) error { if fd := int(ev.Fd); fd == p.efd { // poller is awakened to run tasks in queues. doChores = true } else { - switch err = callback(fd, ev.Events); err { + switch err = callback(fd, ev.Events, 0); err { case nil: case errors.ErrAcceptSocket, errors.ErrEngineShutdown: return err diff --git a/internal/netpoll/poller_epoll_ultimate.go b/internal/netpoll/poller_epoll_ultimate.go index 479415366..844914d8a 100644 --- a/internal/netpoll/poller_epoll_ultimate.go +++ b/internal/netpoll/poller_epoll_ultimate.go @@ -137,7 +137,7 @@ func (p *Poller) Polling() error { if pollAttachment.FD == p.epa.FD { // poller is awakened to run tasks in queues. doChores = true } else { - switch err = pollAttachment.Callback(pollAttachment.FD, ev.events); err { + switch err = pollAttachment.Callback(pollAttachment.FD, ev.events, 0); err { case nil: case errors.ErrAcceptSocket, errors.ErrEngineShutdown: return err diff --git a/reactor_epoll_default.go b/reactor_epoll_default.go index bd0e7d88d..83be6d525 100644 --- a/reactor_epoll_default.go +++ b/reactor_epoll_default.go @@ -33,7 +33,7 @@ func (el *eventloop) rotate() error { defer runtime.UnlockOSThread() } - err := el.poller.Polling(el.engine.accept) + err := el.poller.Polling(el.accept0) if err == errors.ErrEngineShutdown { el.getLogger().Debugf("main reactor is exiting in terms of the demand from user, %v", err) err = nil @@ -52,10 +52,10 @@ func (el *eventloop) orbit() error { defer runtime.UnlockOSThread() } - err := el.poller.Polling(func(fd int, ev uint32) error { + err := el.poller.Polling(func(fd int, ev netpoll.IOEvent, _ netpoll.IOFlags) error { c := el.connections.getConn(fd) if c == nil { - // Somehow epoll notify with an event for a stale fd that is not in our connection set. + // Somehow epoll notified with an event for a stale fd that is not in our connection set. // We need to delete it from the epoll set. return el.poller.Delete(fd) } @@ -121,13 +121,13 @@ func (el *eventloop) run() error { defer runtime.UnlockOSThread() } - err := el.poller.Polling(func(fd int, ev uint32) error { + err := el.poller.Polling(func(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) error { c := el.connections.getConn(fd) if c == nil { if _, ok := el.listeners[fd]; ok { - return el.accept(fd, ev) + return el.accept(fd, ev, flags) } - // Somehow epoll notify with an event for a stale fd that is not in our connection set. + // Somehow epoll notified with an event for a stale fd that is not in our connection set. // We need to delete it from the epoll set. return el.poller.Delete(fd) diff --git a/reactor_kqueue_default.go b/reactor_kqueue_default.go index bdc103f19..7b7a85128 100644 --- a/reactor_kqueue_default.go +++ b/reactor_kqueue_default.go @@ -33,7 +33,7 @@ func (el *eventloop) rotate() error { defer runtime.UnlockOSThread() } - err := el.poller.Polling(el.engine.accept) + err := el.poller.Polling(el.accept0) if err == errors.ErrEngineShutdown { el.getLogger().Debugf("main reactor is exiting in terms of the demand from user, %v", err) err = nil From a9c132d8ac305cc301e4f13e36d5df5a441748f0 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Sun, 21 Apr 2024 22:16:09 +0800 Subject: [PATCH 2/3] update code --- reactor_kqueue_default.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/reactor_kqueue_default.go b/reactor_kqueue_default.go index 7b7a85128..19ca75602 100644 --- a/reactor_kqueue_default.go +++ b/reactor_kqueue_default.go @@ -24,6 +24,7 @@ import ( "golang.org/x/sys/unix" + "github.com/panjf2000/gnet/v2/internal/netpoll" "github.com/panjf2000/gnet/v2/pkg/errors" ) @@ -52,7 +53,7 @@ func (el *eventloop) orbit() error { defer runtime.UnlockOSThread() } - err := el.poller.Polling(func(fd int, filter int16, flags uint16) (err error) { + err := el.poller.Polling(func(fd int, filter netpoll.IOEvent, flags netpoll.IOFlags) (err error) { c := el.connections.getConn(fd) if c == nil { // This might happen when the connection has already been closed, @@ -110,7 +111,7 @@ func (el *eventloop) run() error { defer runtime.UnlockOSThread() } - err := el.poller.Polling(func(fd int, filter int16, flags uint16) (err error) { + err := el.poller.Polling(func(fd int, filter netpoll.IOEvent, flags netpoll.IOFlags) (err error) { c := el.connections.getConn(fd) if c == nil { if _, ok := el.listeners[fd]; ok { From aee48171e34ba03c756029eabcd0930b30c28550 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Sun, 21 Apr 2024 22:27:33 +0800 Subject: [PATCH 3/3] update code --- engine_unix.go | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/engine_unix.go b/engine_unix.go index 30bca2215..18f46c89d 100644 --- a/engine_unix.go +++ b/engine_unix.go @@ -34,9 +34,9 @@ import ( ) type engine struct { - listeners map[int]*listener // listeners for accepting new connections + listeners map[int]*listener // listeners for accepting incoming connections opts *Options // options with engine - acceptor *eventloop // main event-loop for accepting connections + ingress *eventloop // main event-loop that monitors all listeners eventLoops loadBalancer // event-loops for handling events inShutdown int32 // whether the engine is in shutdown ticker struct { @@ -76,11 +76,11 @@ func (eng *engine) closeEventLoops() { _ = el.poller.Close() return true }) - if eng.acceptor != nil { + if eng.ingress != nil { for _, ln := range eng.listeners { ln.close() } - err := eng.acceptor.poller.Close() + err := eng.ingress.poller.Close() if err != nil { eng.opts.Logger.Errorf("failed to close poller when stopping engine: %v", err) } @@ -179,7 +179,7 @@ func (eng *engine) activateReactors(numEventLoop int) error { return err } } - eng.acceptor = el + eng.ingress = el // Start main reactor in background. eng.workerPool.Go(el.rotate) @@ -187,7 +187,7 @@ func (eng *engine) activateReactors(numEventLoop int) error { // Start the ticker. if eng.opts.Ticker { eng.workerPool.Go(func() error { - eng.acceptor.ticker(eng.ticker.ctx) + eng.ingress.ticker(eng.ticker.ctx) return nil }) } @@ -217,8 +217,8 @@ func (eng *engine) stop(s Engine) { } return true }) - if eng.acceptor != nil { - err := eng.acceptor.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errors.ErrEngineShutdown }, nil) + if eng.ingress != nil { + err := eng.ingress.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errors.ErrEngineShutdown }, nil) if err != nil { eng.opts.Logger.Errorf("failed to enqueue shutdown signal of high-priority for main event-loop: %v", err) }