From bf7121dfaf6ab1ef3dd122edcc1f459d0b5c00a6 Mon Sep 17 00:00:00 2001 From: Andy Pan Date: Sun, 21 Apr 2024 14:17:23 +0800 Subject: [PATCH] feat: support multiple network addresses binding (#578) Fixes #428 --- README.md | 2 +- README_ZH.md | 2 +- acceptor_unix.go | 10 +- acceptor_windows.go | 100 ++++----- client_test.go | 8 +- client_unix.go | 13 +- client_windows.go | 12 +- connection_unix.go | 6 +- connection_windows.go | 20 +- engine_unix.go | 156 ++++++++------ engine_windows.go | 37 +++- eventloop_unix.go | 25 +-- gnet.go | 140 +++++++++---- gnet_test.go | 407 ++++++++++++++++++++++++++----------- internal/socket/fd_unix.go | 14 +- listener_unix.go | 2 +- listener_windows.go | 14 +- os_unix_test.go | 4 +- pkg/errors/errors.go | 28 +-- reactor_epoll_default.go | 2 +- reactor_kqueue_default.go | 2 +- 21 files changed, 646 insertions(+), 358 deletions(-) diff --git a/README.md b/README.md index d9e9d29c9..0b61d897d 100644 --- a/README.md +++ b/README.md @@ -43,7 +43,7 @@ English | [中文](README_ZH.md) - [x] Implementation of `gnet` Client - [x] **Windows** platform support (For compatibility in development only, do not use it in production) - [x] **Edge-triggered** I/O support -- [ ] Multiple network addresses binding +- [x] Multiple network addresses binding - [ ] **TLS** support - [ ] [io_uring](https://kernel.dk/io_uring.pdf) support diff --git a/README_ZH.md b/README_ZH.md index e126e2656..47011a4f5 100644 --- a/README_ZH.md +++ b/README_ZH.md @@ -43,7 +43,7 @@ - [x] 实现 `gnet` 客户端 - [x] 支持 **Windows** 平台 (仅用于开发环境的兼容性,不要在生产环境中使用) - [x] **Edge-triggered** I/O 支持 -- [ ] 多网络地址绑定 +- [x] 多网络地址绑定 - [ ] 支持 **TLS** - [ ] 支持 [io_uring](https://kernel.dk/io_uring.pdf) diff --git a/acceptor_unix.go b/acceptor_unix.go index 10467b4e6..4c1d3999c 100644 --- a/acceptor_unix.go +++ b/acceptor_unix.go @@ -45,13 +45,13 @@ func (eng *engine) accept1(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error { } remoteAddr := socket.SockaddrToTCPOrUnixAddr(sa) - if eng.opts.TCPKeepAlive > 0 && eng.ln.network == "tcp" { + if eng.opts.TCPKeepAlive > 0 && eng.listeners[fd].network == "tcp" { err = socket.SetKeepAlivePeriod(nfd, int(eng.opts.TCPKeepAlive.Seconds())) logging.Error(err) } el := eng.eventLoops.next(remoteAddr) - c := newTCPConn(nfd, el, sa, el.ln.addr, remoteAddr) + c := newTCPConn(nfd, el, sa, el.listeners[fd].addr, remoteAddr) err = el.poller.Trigger(queue.HighPriority, el.register, c) if err != nil { eng.opts.Logger.Errorf("failed to enqueue accepted socket of high-priority: %v", err) @@ -62,7 +62,7 @@ func (eng *engine) accept1(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error { } func (el *eventloop) accept1(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) error { - if el.ln.network == "udp" { + if el.listeners[fd].network == "udp" { return el.readUDP1(fd, ev, flags) } @@ -81,12 +81,12 @@ func (el *eventloop) accept1(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) } remoteAddr := socket.SockaddrToTCPOrUnixAddr(sa) - if el.engine.opts.TCPKeepAlive > 0 && el.ln.network == "tcp" { + if el.engine.opts.TCPKeepAlive > 0 && el.listeners[fd].network == "tcp" { err = socket.SetKeepAlivePeriod(nfd, int(el.engine.opts.TCPKeepAlive/time.Second)) logging.Error(err) } - c := newTCPConn(nfd, el, sa, el.ln.addr, remoteAddr) + c := newTCPConn(nfd, el, sa, el.listeners[fd].addr, remoteAddr) addEvents := el.poller.AddRead if el.engine.opts.EdgeTriggeredIO { addEvents = el.poller.AddReadWrite diff --git a/acceptor_windows.go b/acceptor_windows.go index 25717e59f..ce6005c0e 100644 --- a/acceptor_windows.go +++ b/acceptor_windows.go @@ -23,7 +23,7 @@ import ( errorx "github.com/panjf2000/gnet/v2/pkg/errors" ) -func (eng *engine) listen() (err error) { +func (eng *engine) listenStream(ln net.Listener) (err error) { if eng.opts.LockOSThread { runtime.LockOSThread() defer runtime.UnlockOSThread() @@ -31,56 +31,64 @@ func (eng *engine) listen() (err error) { defer func() { eng.shutdown(err) }() - var buffer [0x10000]byte for { - if eng.ln.pc != nil { - // Read data from UDP socket. - n, addr, e := eng.ln.pc.ReadFrom(buffer[:]) - if e != nil { - err = e - if atomic.LoadInt32(&eng.beingShutdown) == 0 { - eng.opts.Logger.Errorf("failed to receive data from UDP fd due to error:%v", err) - } else if errors.Is(err, net.ErrClosed) { - err = errorx.ErrEngineShutdown - // TODO: errors.Join() is not supported until Go 1.20, - // we will uncomment this line after we bump up the - // minimal supported go version to 1.20. - // err = errors.Join(err, errorx.ErrEngineShutdown) + // Accept TCP socket. + tc, e := ln.Accept() + if e != nil { + err = e + if atomic.LoadInt32(&eng.beingShutdown) == 0 { + eng.opts.Logger.Errorf("Accept() fails due to error: %v", err) + } else if errors.Is(err, net.ErrClosed) { + err = errorx.ErrEngineShutdown + // TODO: errors.Join() is not supported until Go 1.20, + // we will uncomment this line after we bump up the + // minimal supported go version to 1.20. + // err = errors.Join(err, errorx.ErrEngineShutdown) + } + return + } + el := eng.eventLoops.next(tc.RemoteAddr()) + c := newTCPConn(tc, el) + el.ch <- &openConn{c: c} + go func(c *conn, tc net.Conn, el *eventloop) { + var buffer [0x10000]byte + for { + n, err := tc.Read(buffer[:]) + if err != nil { + el.ch <- &netErr{c, err} + return } - return + el.ch <- packTCPConn(c, buffer[:n]) } + }(c, tc, el) + } +} - el := eng.eventLoops.next(addr) - c := newUDPConn(el, eng.ln.addr, addr) - el.ch <- packUDPConn(c, buffer[:n]) - } else { - // Accept TCP socket. - tc, e := eng.ln.ln.Accept() - if e != nil { - err = e - if atomic.LoadInt32(&eng.beingShutdown) == 0 { - eng.opts.Logger.Errorf("Accept() fails due to error: %v", err) - } else if errors.Is(err, net.ErrClosed) { - err = errorx.ErrEngineShutdown - // TODO: ditto. - // err = errors.Join(err, errorx.ErrEngineShutdown) - } - return +func (eng *engine) ListenUDP(pc net.PacketConn) (err error) { + if eng.opts.LockOSThread { + runtime.LockOSThread() + defer runtime.UnlockOSThread() + } + + defer func() { eng.shutdown(err) }() + + var buffer [0x10000]byte + for { + // Read data from UDP socket. + n, addr, e := pc.ReadFrom(buffer[:]) + if e != nil { + err = e + if atomic.LoadInt32(&eng.beingShutdown) == 0 { + eng.opts.Logger.Errorf("failed to receive data from UDP fd due to error:%v", err) + } else if errors.Is(err, net.ErrClosed) { + err = errorx.ErrEngineShutdown + // TODO: ditto. + // err = errors.Join(err, errorx.ErrEngineShutdown) } - el := eng.eventLoops.next(tc.RemoteAddr()) - c := newTCPConn(tc, el) - el.ch <- &openConn{c: c} - go func(c *conn, tc net.Conn, el *eventloop) { - var buffer [0x10000]byte - for { - n, err := tc.Read(buffer[:]) - if err != nil { - el.ch <- &netErr{c, err} - return - } - el.ch <- packTCPConn(c, buffer[:n]) - } - }(c, tc, el) + return } + el := eng.eventLoops.next(addr) + c := newUDPConn(el, pc, pc.LocalAddr(), addr) + el.ch <- packUDPConn(c, buffer[:n]) } } diff --git a/client_test.go b/client_test.go index 3aa3d1fed..dccbf10dc 100644 --- a/client_test.go +++ b/client_test.go @@ -196,7 +196,7 @@ func TestClient(t *testing.T) { }) }) - t.Run("poll-LT-reuseport", func(t *testing.T) { + t.Run("poll-reuseport-LT", func(t *testing.T) { t.Run("tcp", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { runClient(t, "tcp", ":9991", false, true, false, false, 10, RoundRobin) @@ -247,7 +247,7 @@ func TestClient(t *testing.T) { }) }) - t.Run("poll-ET-reuseport", func(t *testing.T) { + t.Run("poll-reuseport-ET", func(t *testing.T) { t.Run("tcp", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { runClient(t, "tcp", ":9991", true, true, false, false, 10, RoundRobin) @@ -405,7 +405,7 @@ func (s *testClient) OnTraffic(c Conn) (action Action) { } func (s *testClient) OnTick() (delay time.Duration, action Action) { - delay = time.Second / 5 + delay = 100 * time.Millisecond if atomic.CompareAndSwapInt32(&s.started, 0, 1) { for i := 0; i < s.nclients; i++ { atomic.AddInt32(&s.clientActive, 1) @@ -484,7 +484,7 @@ func startGnetClient(t *testing.T, cli *Client, network, addr string, multicore, require.NoError(t, err) rspCh := handler.rspCh duration := time.Duration((rand.Float64()*2+1)*float64(time.Second)) / 2 - t.Logf("test duration: %dms", duration/time.Millisecond) + logging.Debugf("test duration: %v", duration) start := time.Now() for time.Since(start) < duration { reqData := make([]byte, streamLen) diff --git a/client_unix.go b/client_unix.go index 7c50d0608..10bf99c07 100644 --- a/client_unix.go +++ b/client_unix.go @@ -68,7 +68,7 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) { shutdownCtx, shutdown := context.WithCancel(context.Background()) eng := engine{ - ln: &listener{}, + listeners: make(map[int]*listener), opts: options, eventHandler: eh, workerPool: struct { @@ -82,9 +82,9 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) { eng.ticker.ctx, eng.ticker.cancel = context.WithCancel(context.Background()) } el := eventloop{ - ln: eng.ln, - engine: &eng, - poller: p, + listeners: eng.listeners, + engine: &eng, + poller: p, } rbc := options.ReadBufferCap @@ -115,7 +115,8 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) { // Start starts the client event-loop, handing IO events. func (cli *Client) Start() error { - cli.el.eventHandler.OnBoot(Engine{}) + logging.Infof("Starting gnet client with 1 event-loop") + cli.el.eventHandler.OnBoot(Engine{cli.el.engine}) cli.el.engine.workerPool.Go(cli.el.run) // Start the ticker. if cli.opts.Ticker { @@ -134,7 +135,7 @@ func (cli *Client) Stop() (err error) { } _ = cli.el.engine.workerPool.Wait() logging.Error(cli.el.poller.Close()) - cli.el.eventHandler.OnShutdown(Engine{}) + cli.el.eventHandler.OnShutdown(Engine{cli.el.engine}) logging.Cleanup() return } diff --git a/client_windows.go b/client_windows.go index 07d2294b2..33e0566ff 100644 --- a/client_windows.go +++ b/client_windows.go @@ -50,8 +50,8 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) { shutdownCtx, shutdown := context.WithCancel(context.Background()) eng := &engine{ - ln: &listener{}, - opts: options, + listeners: []*listener{}, + opts: options, workerPool: struct { *errgroup.Group shutdownCtx context.Context @@ -70,7 +70,7 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) { } func (cli *Client) Start() error { - cli.el.eventHandler.OnBoot(Engine{}) + cli.el.eventHandler.OnBoot(Engine{cli.el.eng}) cli.el.eng.workerPool.Go(cli.el.run) if cli.opts.Ticker { cli.el.eng.ticker.ctx, cli.el.eng.ticker.cancel = context.WithCancel(context.Background()) @@ -89,7 +89,7 @@ func (cli *Client) Stop() (err error) { cli.el.eng.ticker.cancel() } _ = cli.el.eng.workerPool.Wait() - cli.el.eventHandler.OnShutdown(Engine{}) + cli.el.eventHandler.OnShutdown(Engine{cli.el.eng}) logging.Cleanup() return } @@ -202,7 +202,7 @@ func (cli *Client) EnrollContext(nc net.Conn, ctx interface{}) (gc Conn, err err }(c, nc, cli.el) gc = c case *net.UDPConn: - c := newUDPConn(cli.el, nc.LocalAddr(), nc.RemoteAddr()) + c := newUDPConn(cli.el, nil, nc.LocalAddr(), nc.RemoteAddr()) c.SetContext(ctx) c.rawConn = nc cli.el.ch <- &openConn{c: c, isDatagram: true, cb: func() { close(connOpened) }} @@ -213,7 +213,7 @@ func (cli *Client) EnrollContext(nc net.Conn, ctx interface{}) (gc Conn, err err if err != nil { return } - c := newUDPConn(cli.el, uc.LocalAddr(), uc.RemoteAddr()) + c := newUDPConn(cli.el, nil, uc.LocalAddr(), uc.RemoteAddr()) c.SetContext(ctx) c.rawConn = uc el.ch <- packUDPConn(c, buffer[:n]) diff --git a/connection_unix.go b/connection_unix.go index 18fe3e26e..eb0ab3b59 100644 --- a/connection_unix.go +++ b/connection_unix.go @@ -90,13 +90,13 @@ func (c *conn) release() { c.isEOF = false c.ctx = nil c.buffer = nil - if addr, ok := c.localAddr.(*net.TCPAddr); ok && c.localAddr != c.loop.ln.addr && len(addr.Zone) > 0 { + if addr, ok := c.localAddr.(*net.TCPAddr); ok && len(c.loop.listeners) == 0 && len(addr.Zone) > 0 { bsPool.Put(bs.StringToBytes(addr.Zone)) } if addr, ok := c.remoteAddr.(*net.TCPAddr); ok && len(addr.Zone) > 0 { bsPool.Put(bs.StringToBytes(addr.Zone)) } - if addr, ok := c.localAddr.(*net.UDPAddr); ok && c.localAddr != c.loop.ln.addr && len(addr.Zone) > 0 { + if addr, ok := c.localAddr.(*net.UDPAddr); ok && len(c.loop.listeners) == 0 && len(addr.Zone) > 0 { bsPool.Put(bs.StringToBytes(addr.Zone)) } if addr, ok := c.remoteAddr.(*net.UDPAddr); ok && len(addr.Zone) > 0 { @@ -451,7 +451,7 @@ func (c *conn) RemoteAddr() net.Addr { return c.remoteAddr } // func (c *conn) Gfd() gfd.GFD { return c.gfd } func (c *conn) Fd() int { return c.fd } -func (c *conn) Dup() (fd int, err error) { fd, _, err = socket.Dup(c.fd); return } +func (c *conn) Dup() (fd int, err error) { return socket.Dup(c.fd) } func (c *conn) SetReadBuffer(bytes int) error { return socket.SetRecvBuffer(c.fd, bytes) } func (c *conn) SetWriteBuffer(bytes int) error { return socket.SetSendBuffer(c.fd, bytes) } func (c *conn) SetLinger(sec int) error { return socket.SetLinger(c.fd, sec) } diff --git a/connection_windows.go b/connection_windows.go index 21ebb64d4..e46498f9d 100644 --- a/connection_windows.go +++ b/connection_windows.go @@ -49,6 +49,7 @@ type openConn struct { } type conn struct { + pc net.PacketConn ctx interface{} // user-defined context loop *eventloop // owner event-loop buffer *bbPool.ByteBuffer // reuse memory of inbound data as a temporary buffer @@ -102,8 +103,9 @@ func (c *conn) release() { c.buffer = nil } -func newUDPConn(el *eventloop, localAddr, remoteAddr net.Addr) *conn { +func newUDPConn(el *eventloop, pc net.PacketConn, localAddr, remoteAddr net.Addr) *conn { return &conn{ + pc: pc, loop: el, buffer: bbPool.Get(), localAddr: localAddr, @@ -239,13 +241,13 @@ func (c *conn) Discard(n int) (int, error) { } func (c *conn) Write(p []byte) (int, error) { - if c.rawConn == nil && c.loop.eng.ln.pc == nil { + if c.rawConn == nil && c.pc == nil { return 0, net.ErrClosed } if c.rawConn != nil { return c.rawConn.Write(p) } - return c.loop.eng.ln.pc.WriteTo(p, c.remoteAddr) + return c.pc.WriteTo(p, c.remoteAddr) } func (c *conn) Writev(bs [][]byte) (int, error) { @@ -319,7 +321,7 @@ func (c *conn) Fd() (fd int) { } func (c *conn) Dup() (fd int, err error) { - if c.rawConn == nil && c.loop.eng.ln.pc == nil { + if c.rawConn == nil && c.pc == nil { return -1, net.ErrClosed } @@ -330,7 +332,7 @@ func (c *conn) Dup() (fd int, err error) { if c.rawConn != nil { sc, ok = c.rawConn.(syscall.Conn) } else { - sc, ok = c.loop.eng.ln.pc.(syscall.Conn) + sc, ok = c.pc.(syscall.Conn) } if !ok { @@ -365,24 +367,24 @@ func (c *conn) Dup() (fd int, err error) { } func (c *conn) SetReadBuffer(bytes int) error { - if c.rawConn == nil && c.loop.eng.ln.pc == nil { + if c.rawConn == nil && c.pc == nil { return net.ErrClosed } if c.rawConn != nil { return c.rawConn.(interface{ SetReadBuffer(int) error }).SetReadBuffer(bytes) } - return c.loop.eng.ln.pc.(interface{ SetReadBuffer(int) error }).SetReadBuffer(bytes) + return c.pc.(interface{ SetReadBuffer(int) error }).SetReadBuffer(bytes) } func (c *conn) SetWriteBuffer(bytes int) error { - if c.rawConn == nil && c.loop.eng.ln.pc == nil { + if c.rawConn == nil && c.pc == nil { return net.ErrClosed } if c.rawConn != nil { return c.rawConn.(interface{ SetWriteBuffer(int) error }).SetWriteBuffer(bytes) } - return c.loop.eng.ln.pc.(interface{ SetWriteBuffer(int) error }).SetWriteBuffer(bytes) + return c.pc.(interface{ SetWriteBuffer(int) error }).SetWriteBuffer(bytes) } func (c *conn) SetLinger(sec int) error { diff --git a/engine_unix.go b/engine_unix.go index 261b67100..ffba29c1b 100644 --- a/engine_unix.go +++ b/engine_unix.go @@ -20,6 +20,7 @@ package gnet import ( "context" "runtime" + "strings" "sync" "sync/atomic" @@ -29,14 +30,15 @@ import ( "github.com/panjf2000/gnet/v2/internal/netpoll" "github.com/panjf2000/gnet/v2/internal/queue" "github.com/panjf2000/gnet/v2/pkg/errors" + "github.com/panjf2000/gnet/v2/pkg/logging" ) type engine struct { - ln *listener // the listener for accepting new connections - opts *Options // options with engine - acceptor *eventloop // main event-loop for accepting connections - eventLoops loadBalancer // event-loops for handling events - inShutdown int32 // whether the engine is in shutdown + listeners map[int]*listener // listeners for accepting new connections + opts *Options // options with engine + acceptor *eventloop // main event-loop for accepting connections + eventLoops loadBalancer // event-loops for handling events + inShutdown int32 // whether the engine is in shutdown ticker struct { ctx context.Context // context for ticker cancel context.CancelFunc // function to stop the ticker @@ -68,12 +70,16 @@ func (eng *engine) shutdown(err error) { func (eng *engine) closeEventLoops() { eng.eventLoops.iterate(func(_ int, el *eventloop) bool { - el.ln.close() + for _, ln := range el.listeners { + ln.close() + } _ = el.poller.Close() return true }) if eng.acceptor != nil { - eng.ln.close() + for _, ln := range eng.listeners { + ln.close() + } err := eng.acceptor.poller.Close() if err != nil { eng.opts.Logger.Errorf("failed to close poller when stopping engine: %v", err) @@ -81,37 +87,42 @@ func (eng *engine) closeEventLoops() { } } -func (eng *engine) runEventLoops(numEventLoop int) (err error) { - network, address := eng.ln.network, eng.ln.address - ln := eng.ln - var striker *eventloop +func (eng *engine) runEventLoops(numEventLoop int) error { + var el0 *eventloop + lns := eng.listeners // Create loops locally and bind the listeners. for i := 0; i < numEventLoop; i++ { if i > 0 { - if ln, err = initListener(network, address, eng.opts); err != nil { - return + lns = make(map[int]*listener, len(eng.listeners)) + for _, l := range eng.listeners { + ln, err := initListener(l.network, l.address, eng.opts) + if err != nil { + return err + } + lns[ln.fd] = ln } } - var p *netpoll.Poller - if p, err = netpoll.OpenPoller(); err == nil { - el := new(eventloop) - el.ln = ln - el.engine = eng - el.poller = p - el.buffer = make([]byte, eng.opts.ReadBufferCap) - el.connections.init() - el.eventHandler = eng.eventHandler - if err = el.poller.AddRead(el.ln.packPollAttachment(el.accept), false); err != nil { - return + p, err := netpoll.OpenPoller() + if err != nil { + return err + } + el := new(eventloop) + el.listeners = lns + el.engine = eng + el.poller = p + el.buffer = make([]byte, eng.opts.ReadBufferCap) + el.connections.init() + el.eventHandler = eng.eventHandler + for _, ln := range lns { + if err = el.poller.AddRead(ln.packPollAttachment(el.accept), false); err != nil { + return err } - eng.eventLoops.register(el) + } + eng.eventLoops.register(el) - // Start the ticker. - if el.idx == 0 && eng.opts.Ticker { - striker = el - } - } else { - return + // Start the ticker. + if eng.opts.Ticker && el.idx == 0 { + el0 = el } } @@ -121,28 +132,30 @@ func (eng *engine) runEventLoops(numEventLoop int) (err error) { return true }) - eng.workerPool.Go(func() error { - striker.ticker(eng.ticker.ctx) - return nil - }) + if el0 != nil { + eng.workerPool.Go(func() error { + el0.ticker(eng.ticker.ctx) + return nil + }) + } - return + return nil } func (eng *engine) activateReactors(numEventLoop int) error { for i := 0; i < numEventLoop; i++ { - if p, err := netpoll.OpenPoller(); err == nil { - el := new(eventloop) - el.ln = eng.ln - el.engine = eng - el.poller = p - el.buffer = make([]byte, eng.opts.ReadBufferCap) - el.connections.init() - el.eventHandler = eng.eventHandler - eng.eventLoops.register(el) - } else { + p, err := netpoll.OpenPoller() + if err != nil { return err } + el := new(eventloop) + el.listeners = eng.listeners + el.engine = eng + el.poller = p + el.buffer = make([]byte, eng.opts.ReadBufferCap) + el.connections.init() + el.eventHandler = eng.eventHandler + eng.eventLoops.register(el) } // Start sub reactors in background. @@ -151,23 +164,25 @@ func (eng *engine) activateReactors(numEventLoop int) error { return true }) - if p, err := netpoll.OpenPoller(); err == nil { - el := new(eventloop) - el.ln = eng.ln - el.idx = -1 - el.engine = eng - el.poller = p - el.eventHandler = eng.eventHandler - if err = el.poller.AddRead(eng.ln.packPollAttachment(eng.accept), false); err != nil { + p, err := netpoll.OpenPoller() + if err != nil { + return err + } + el := new(eventloop) + el.listeners = eng.listeners + el.idx = -1 + el.engine = eng + el.poller = p + el.eventHandler = eng.eventHandler + for _, ln := range eng.listeners { + if err = el.poller.AddRead(ln.packPollAttachment(eng.accept), false); err != nil { return err } - eng.acceptor = el - - // Start main reactor in background. - eng.workerPool.Go(el.rotate) - } else { - return err } + eng.acceptor = el + + // Start main reactor in background. + eng.workerPool.Go(el.rotate) // Start the ticker. if eng.opts.Ticker { @@ -181,7 +196,7 @@ func (eng *engine) activateReactors(numEventLoop int) error { } func (eng *engine) start(numEventLoop int) error { - if eng.opts.ReusePort || eng.ln.network == "udp" { + if eng.opts.ReusePort { return eng.runEventLoops(numEventLoop) } @@ -225,8 +240,8 @@ func (eng *engine) stop(s Engine) { atomic.StoreInt32(&eng.inShutdown, 1) } -func run(eventHandler EventHandler, listener *listener, options *Options, protoAddr string) error { - // Figure out the proper number of event-loops/goroutines to run. +func run(eventHandler EventHandler, listeners []*listener, options *Options, addrs []string) error { + // Figure out the proper number of event-loop to run. numEventLoop := 1 if options.Multicore { numEventLoop = runtime.NumCPU() @@ -238,10 +253,17 @@ func run(eventHandler EventHandler, listener *listener, options *Options, protoA numEventLoop = gfd.EventLoopIndexMax } + logging.Infof("Launching gnet with %d event-loops, listening on: %s", + numEventLoop, strings.Join(addrs, " | ")) + + lns := make(map[int]*listener, len(listeners)) + for _, ln := range listeners { + lns[ln.fd] = ln + } shutdownCtx, shutdown := context.WithCancel(context.Background()) eng := engine{ - ln: listener, - opts: options, + listeners: lns, + opts: options, workerPool: struct { *errgroup.Group shutdownCtx context.Context @@ -277,7 +299,9 @@ func run(eventHandler EventHandler, listener *listener, options *Options, protoA } defer eng.stop(e) - allEngines.Store(protoAddr, &eng) + for _, addr := range addrs { + allEngines.Store(addr, &eng) + } return nil } diff --git a/engine_windows.go b/engine_windows.go index d0d3cb304..304b14421 100644 --- a/engine_windows.go +++ b/engine_windows.go @@ -18,16 +18,18 @@ import ( "context" "errors" "runtime" + "strings" "sync" "sync/atomic" "golang.org/x/sync/errgroup" errorx "github.com/panjf2000/gnet/v2/pkg/errors" + "github.com/panjf2000/gnet/v2/pkg/logging" ) type engine struct { - ln *listener + listeners []*listener opts *Options // options with engine eventLoops loadBalancer // event-loops for handling events ticker struct { @@ -64,7 +66,9 @@ func (eng *engine) closeEventLoops() { el.ch <- errorx.ErrEngineShutdown return true }) - eng.ln.close() + for _, ln := range eng.listeners { + ln.close() + } } func (eng *engine) start(numEventLoop int) error { @@ -86,7 +90,18 @@ func (eng *engine) start(numEventLoop int) error { } } - eng.workerPool.Go(eng.listen) + for _, ln := range eng.listeners { + l := ln + if l.pc != nil { + eng.workerPool.Go(func() error { + return eng.ListenUDP(l.pc) + }) + } else { + eng.workerPool.Go(func() error { + return eng.listenStream(l.ln) + }) + } + } return nil } @@ -111,7 +126,7 @@ func (eng *engine) stop(engine Engine) error { return nil } -func run(eventHandler EventHandler, listener *listener, options *Options, protoAddr string) error { +func run(eventHandler EventHandler, listeners []*listener, options *Options, addrs []string) error { // Figure out the proper number of event-loops/goroutines to run. numEventLoop := 1 if options.Multicore { @@ -121,11 +136,14 @@ func run(eventHandler EventHandler, listener *listener, options *Options, protoA numEventLoop = options.NumEventLoop } + logging.Infof("Launching gnet with %d event-loops, listening on: %s", + numEventLoop, strings.Join(addrs, " | ")) + shutdownCtx, shutdown := context.WithCancel(context.Background()) eng := engine{ opts: options, eventHandler: eventHandler, - ln: listener, + listeners: listeners, workerPool: struct { *errgroup.Group shutdownCtx context.Context @@ -137,6 +155,11 @@ func run(eventHandler EventHandler, listener *listener, options *Options, protoA switch options.LB { case RoundRobin: eng.eventLoops = new(roundRobinLoadBalancer) + // If there are more than one listener, we can't use roundRobinLoadBalancer because + // it's not concurrency-safe, replace it with leastConnectionsLoadBalancer. + if len(listeners) > 1 { + eng.eventLoops = new(leastConnectionsLoadBalancer) + } case LeastConnections: eng.eventLoops = new(leastConnectionsLoadBalancer) case SourceAddrHash: @@ -160,7 +183,9 @@ func run(eventHandler EventHandler, listener *listener, options *Options, protoA } defer eng.stop(engine) //nolint:errcheck - allEngines.Store(protoAddr, &eng) + for _, addr := range addrs { + allEngines.Store(addr, &eng) + } return nil } diff --git a/eventloop_unix.go b/eventloop_unix.go index 2d6eae077..375b10675 100644 --- a/eventloop_unix.go +++ b/eventloop_unix.go @@ -37,14 +37,14 @@ import ( ) type eventloop struct { - ln *listener // listener - idx int // loop index in the engine loops list - cache bytes.Buffer // temporary buffer for scattered bytes - engine *engine // engine in loop - poller *netpoll.Poller // epoll or kqueue - buffer []byte // read packet buffer whose capacity is set by user, default value is 64KB - connections connMatrix // loop connections storage - eventHandler EventHandler // user eventHandler + listeners map[int]*listener // listeners + idx int // loop index in the engine loops list + cache bytes.Buffer // temporary buffer for scattered bytes + engine *engine // engine in loop + poller *netpoll.Poller // epoll or kqueue + buffer []byte // read packet buffer whose capacity is set by user, default value is 64KB + connections connMatrix // loop connections storage + eventHandler EventHandler // user eventHandler } func (el *eventloop) getLogger() logging.Logger { @@ -198,7 +198,7 @@ loop: func (el *eventloop) close(c *conn, err error) (rerr error) { if addr := c.localAddr; addr != nil && strings.HasPrefix(c.localAddr.Network(), "udp") { rerr = el.poller.Delete(c.fd) - if c.fd != el.ln.fd { + if _, ok := el.listeners[c.fd]; !ok { rerr = unix.Close(c.fd) el.connections.delConn(c) } @@ -260,9 +260,6 @@ func (el *eventloop) wake(c *conn) error { } func (el *eventloop) ticker(ctx context.Context) { - if el == nil { - return - } var ( action Action delay time.Duration @@ -320,8 +317,8 @@ func (el *eventloop) readUDP1(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) erro fd, el.idx, os.NewSyscallError("recvfrom", err)) } var c *conn - if fd == el.ln.fd { - c = newUDPConn(fd, el, el.ln.addr, sa, false) + if ln, ok := el.listeners[fd]; ok { + c = newUDPConn(fd, el, ln.addr, sa, false) } else { c = el.connections.getConn(fd) } diff --git a/gnet.go b/gnet.go index d511ab5be..643cd7264 100644 --- a/gnet.go +++ b/gnet.go @@ -50,7 +50,7 @@ type Engine struct { // Validate checks whether the engine is available. func (e Engine) Validate() error { - if e.eng == nil { + if e.eng == nil || len(e.eng.listeners) == 0 { return errors.ErrEmptyEngine } if e.eng.isInShutdown() { @@ -76,14 +76,14 @@ func (e Engine) CountConnections() (count int) { // It is the caller's responsibility to close dupFD when finished. // Closing listener does not affect dupFD, and closing dupFD does not affect listener. func (e Engine) Dup() (fd int, err error) { - if err = e.Validate(); err != nil { + if err := e.Validate(); err != nil { return -1, err } - - var sc string - fd, sc, err = e.eng.ln.dup() - if err != nil { - logging.Warnf("%s failed when duplicating new fd\n", sc) + if len(e.eng.listeners) > 1 { + return -1, errors.ErrUnsupportedOp + } + for _, ln := range e.eng.listeners { + fd, err = ln.dup() } return } @@ -314,7 +314,7 @@ type Conn interface { // you must invoke it within any method in EventHandler. LocalAddr() (addr net.Addr) - // RemoteAddr is the connection's remote remote address, it's not concurrency-safe, + // RemoteAddr is the connection's remote address, it's not concurrency-safe, // you must invoke it within any method in EventHandler. RemoteAddr() (addr net.Addr) @@ -419,22 +419,7 @@ func (*BuiltinEventEngine) OnTick() (delay time.Duration, action Action) { // MaxStreamBufferCap is the default buffer size for each stream-oriented connection(TCP/Unix). var MaxStreamBufferCap = 64 * 1024 // 64KB -// Run starts handling events on the specified address. -// -// Address should use a scheme prefix and be formatted -// like `tcp://192.168.0.10:9851` or `unix://socket`. -// Valid network schemes: -// -// tcp - bind to both IPv4 and IPv6 -// tcp4 - IPv4 -// tcp6 - IPv6 -// udp - bind to both IPv4 and IPv6 -// udp4 - IPv4 -// udp6 - IPv6 -// unix - Unix Domain Socket -// -// The "tcp" network scheme is assumed when one is not specified. -func Run(eventHandler EventHandler, protoAddr string, opts ...Option) (err error) { +func createListeners(addrs []string, opts ...Option) ([]*listener, *Options, error) { options := loadOptions(opts...) logger, logFlusher := logging.GetDefaultLogger(), logging.GetDefaultFlusher() @@ -449,8 +434,6 @@ func Run(eventHandler EventHandler, protoAddr string, opts ...Option) (err error } logging.SetDefaultLoggerAndFlusher(logger, logFlusher) - defer logging.Cleanup() - logging.Debugf("default logging level is %s", logging.LogLevel()) // The maximum number of operating system threads that the Go program can use is initially set to 10000, @@ -458,7 +441,7 @@ func Run(eventHandler EventHandler, protoAddr string, opts ...Option) (err error if options.LockOSThread && options.NumEventLoop > 10000 { logging.Errorf("too many event-loops under LockOSThread mode, should be less than 10,000 "+ "while you are trying to set up %d\n", options.NumEventLoop) - return errors.ErrTooManyEventLoopThreads + return nil, nil, errors.ErrTooManyEventLoopThreads } rbc := options.ReadBufferCap @@ -480,19 +463,76 @@ func Run(eventHandler EventHandler, protoAddr string, opts ...Option) (err error options.WriteBufferCap = math.CeilToPowerOfTwo(wbc) } - network, addr := parseProtoAddr(protoAddr) + // If there is UDP listener in the list, enable SO_REUSEPORT and disable edge-triggered I/O by default. + for i := 0; (!options.ReusePort || options.EdgeTriggeredIO) && i < len(addrs); i++ { + proto, _, err := parseProtoAddr(addrs[i]) + if err != nil { + return nil, nil, err + } + if strings.HasPrefix(proto, "udp") { + options.ReusePort = true + options.EdgeTriggeredIO = false + } + } - var ln *listener - if ln, err = initListener(network, addr, options); err != nil { - return + listeners := make([]*listener, len(addrs)) + for i, a := range addrs { + proto, addr, err := parseProtoAddr(a) + if err != nil { + return nil, nil, err + } + ln, err := initListener(proto, addr, options) + if err != nil { + return nil, nil, err + } + listeners[i] = ln } - defer ln.close() - if ln.network == "udp" { - options.EdgeTriggeredIO = false + return listeners, options, nil +} + +// Run starts handling events on the specified address. +// +// Address should use a scheme prefix and be formatted +// like `tcp://192.168.0.10:9851` or `unix://socket`. +// Valid network schemes: +// +// tcp - bind to both IPv4 and IPv6 +// tcp4 - IPv4 +// tcp6 - IPv6 +// udp - bind to both IPv4 and IPv6 +// udp4 - IPv4 +// udp6 - IPv6 +// unix - Unix Domain Socket +// +// The "tcp" network scheme is assumed when one is not specified. +func Run(eventHandler EventHandler, protoAddr string, opts ...Option) error { + listeners, options, err := createListeners([]string{protoAddr}, opts...) + if err != nil { + return err } + defer func() { + for _, ln := range listeners { + ln.close() + } + logging.Cleanup() + }() + return run(eventHandler, listeners, options, []string{protoAddr}) +} - return run(eventHandler, ln, options, protoAddr) +// Rotate is like Run but accepts multiple network addresses. +func Rotate(eventHandler EventHandler, addrs []string, opts ...Option) error { + listeners, options, err := createListeners(addrs, opts...) + if err != nil { + return err + } + defer func() { + for _, ln := range listeners { + ln.close() + } + logging.Cleanup() + }() + return run(eventHandler, listeners, options, addrs) } var ( @@ -504,7 +544,12 @@ var ( // Stop gracefully shuts down the engine without interrupting any active event-loops, // it waits indefinitely for connections and event-loops to be closed and then shuts down. -// Deprecated: The global Stop only shuts down the last registered Engine with the same protocol and IP:Port as the previous Engine's, which can lead to leaks of Engine if you invoke gnet.Run multiple times using the same protocol and IP:Port under the condition that WithReuseAddr(true) and WithReusePort(true) are enabled. Use Engine.Stop instead. +// +// Deprecated: The global Stop only shuts down the last registered Engine with the same +// protocol and IP:Port as the previous Engine's, which can lead to leaks of Engine if +// you invoke gnet.Run multiple times using the same protocol and IP:Port under the +// condition that WithReuseAddr(true) and WithReusePort(true) are enabled. +// Use Engine.Stop instead. func Stop(ctx context.Context, protoAddr string) error { var eng *engine if s, ok := allEngines.Load(protoAddr); ok { @@ -533,13 +578,20 @@ func Stop(ctx context.Context, protoAddr string) error { } } -func parseProtoAddr(addr string) (network, address string) { - network = "tcp" - address = strings.ToLower(addr) - if strings.Contains(address, "://") { - pair := strings.Split(address, "://") - network = pair[0] - address = pair[1] +func parseProtoAddr(protoAddr string) (string, string, error) { + protoAddr = strings.ToLower(protoAddr) + if strings.Count(protoAddr, "://") != 1 { + return "", "", errors.ErrInvalidNetworkAddress } - return + pair := strings.SplitN(protoAddr, "://", 2) + proto, addr := pair[0], pair[1] + switch proto { + case "tcp", "tcp4", "tcp6", "udp", "udp4", "udp6", "unix": + default: + return "", "", errors.ErrUnsupportedProtocol + } + if addr == "" { + return "", "", errors.ErrInvalidNetworkAddress + } + return proto, addr, nil } diff --git a/gnet_test.go b/gnet_test.go index 8d0415079..fec73ad16 100644 --- a/gnet_test.go +++ b/gnet_test.go @@ -10,6 +10,7 @@ import ( "math/rand" "net" "runtime" + "strings" "sync/atomic" "testing" "time" @@ -40,66 +41,66 @@ func TestServer(t *testing.T) { t.Run("poll-LT", func(t *testing.T) { t.Run("tcp", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "tcp", ":9991", false, false, false, false, false, 10, RoundRobin) + runServer(t, []string{"tcp://:9991"}, false, false, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "tcp", ":9992", false, false, true, false, false, 10, LeastConnections) + runServer(t, []string{"tcp://:9992"}, false, false, true, false, false, 10, LeastConnections) }) }) t.Run("tcp-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "tcp", ":9991", false, false, false, true, false, 10, RoundRobin) + runServer(t, []string{"tcp://:9991"}, false, false, false, true, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "tcp", ":9992", false, false, true, true, false, 10, LeastConnections) + runServer(t, []string{"tcp://:9992"}, false, false, true, true, false, 10, LeastConnections) }) }) t.Run("tcp-async-writev", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "tcp", ":9991", false, false, false, true, true, 10, RoundRobin) + runServer(t, []string{"tcp://:9991"}, false, false, false, true, true, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "tcp", ":9992", false, false, true, true, true, 10, LeastConnections) + runServer(t, []string{"tcp://:9992"}, false, false, true, true, true, 10, LeastConnections) }) }) t.Run("udp", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "udp", ":9991", false, false, false, false, false, 10, RoundRobin) + runServer(t, []string{"udp://:9991"}, false, false, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "udp", ":9992", false, false, true, false, false, 10, LeastConnections) + runServer(t, []string{"udp://:9992"}, false, false, true, false, false, 10, LeastConnections) }) }) t.Run("udp-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "udp", ":9991", false, false, false, true, false, 10, RoundRobin) + runServer(t, []string{"udp://:9991"}, false, false, false, true, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "udp", ":9992", false, false, true, true, false, 10, LeastConnections) + runServer(t, []string{"udp://:9992"}, false, false, true, true, false, 10, LeastConnections) }) }) t.Run("unix", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "unix", "gnet1.sock", false, false, false, false, false, 10, RoundRobin) + runServer(t, []string{"unix://gnet1.sock"}, false, false, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "unix", "gnet2.sock", false, false, true, false, false, 10, SourceAddrHash) + runServer(t, []string{"unix://gnet2.sock"}, false, false, true, false, false, 10, SourceAddrHash) }) }) t.Run("unix-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "unix", "gnet1.sock", false, false, false, true, false, 10, RoundRobin) + runServer(t, []string{"unix://gnet1.sock"}, false, false, false, true, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "unix", "gnet2.sock", false, false, true, true, false, 10, SourceAddrHash) + runServer(t, []string{"unix://gnet2.sock"}, false, false, true, true, false, 10, SourceAddrHash) }) }) t.Run("unix-async-writev", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "unix", "gnet1.sock", false, false, false, true, true, 10, RoundRobin) + runServer(t, []string{"unix://gnet1.sock"}, false, false, false, true, true, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "unix", "gnet2.sock", false, false, true, true, true, 10, SourceAddrHash) + runServer(t, []string{"unix://gnet2.sock"}, false, false, true, true, true, 10, SourceAddrHash) }) }) }) @@ -107,200 +108,340 @@ func TestServer(t *testing.T) { t.Run("poll-ET", func(t *testing.T) { t.Run("tcp", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "tcp", ":9991", true, false, false, false, false, 10, RoundRobin) + runServer(t, []string{"tcp://:9991"}, true, false, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "tcp", ":9992", true, false, true, false, false, 10, LeastConnections) + runServer(t, []string{"tcp://:9992"}, true, false, true, false, false, 10, LeastConnections) }) }) t.Run("tcp-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "tcp", ":9991", true, false, false, true, false, 10, RoundRobin) + runServer(t, []string{"tcp://:9991"}, true, false, false, true, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "tcp", ":9992", true, false, true, true, false, 10, LeastConnections) + runServer(t, []string{"tcp://:9992"}, true, false, true, true, false, 10, LeastConnections) }) }) t.Run("tcp-async-writev", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "tcp", ":9991", true, false, false, true, true, 10, RoundRobin) + runServer(t, []string{"tcp://:9991"}, true, false, false, true, true, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "tcp", ":9992", true, false, true, true, true, 10, LeastConnections) + runServer(t, []string{"tcp://:9992"}, true, false, true, true, true, 10, LeastConnections) }) }) t.Run("udp", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "udp", ":9991", true, false, false, false, false, 10, RoundRobin) + runServer(t, []string{"udp://:9991"}, true, false, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "udp", ":9992", true, false, true, false, false, 10, LeastConnections) + runServer(t, []string{"udp://:9992"}, true, false, true, false, false, 10, LeastConnections) }) }) t.Run("udp-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "udp", ":9991", true, false, false, true, false, 10, RoundRobin) + runServer(t, []string{"udp://:9991"}, true, false, false, true, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "udp", ":9992", true, false, true, true, false, 10, LeastConnections) + runServer(t, []string{"udp://:9992"}, true, false, true, true, false, 10, LeastConnections) }) }) t.Run("unix", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "unix", "gnet1.sock", true, false, false, false, false, 10, RoundRobin) + runServer(t, []string{"unix://gnet1.sock"}, true, false, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "unix", "gnet2.sock", true, false, true, false, false, 10, SourceAddrHash) + runServer(t, []string{"unix://gnet2.sock"}, true, false, true, false, false, 10, SourceAddrHash) }) }) t.Run("unix-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "unix", "gnet1.sock", true, false, false, true, false, 10, RoundRobin) + runServer(t, []string{"unix://gnet1.sock"}, true, false, false, true, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "unix", "gnet2.sock", true, false, true, true, false, 10, SourceAddrHash) + runServer(t, []string{"unix://gnet2.sock"}, true, false, true, true, false, 10, SourceAddrHash) }) }) t.Run("unix-async-writev", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "unix", "gnet1.sock", true, false, false, true, true, 10, RoundRobin) + runServer(t, []string{"unix://gnet1.sock"}, true, false, false, true, true, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "unix", "gnet2.sock", true, false, true, true, true, 10, SourceAddrHash) + runServer(t, []string{"unix://gnet2.sock"}, true, false, true, true, true, 10, SourceAddrHash) }) }) }) - t.Run("poll-LT-reuseport", func(t *testing.T) { + t.Run("poll-reuseport-LT", func(t *testing.T) { t.Run("tcp", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "tcp", ":9991", false, true, false, false, false, 10, RoundRobin) + runServer(t, []string{"tcp://:9991"}, false, true, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "tcp", ":9992", false, true, true, false, false, 10, LeastConnections) + runServer(t, []string{"tcp://:9992"}, false, true, true, false, false, 10, LeastConnections) }) }) t.Run("tcp-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "tcp", ":9991", false, true, false, true, false, 10, RoundRobin) + runServer(t, []string{"tcp://:9991"}, false, true, false, true, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "tcp", ":9992", false, true, true, true, false, 10, LeastConnections) + runServer(t, []string{"tcp://:9992"}, false, true, true, true, false, 10, LeastConnections) }) }) t.Run("tcp-async-writev", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "tcp", ":9991", false, true, false, true, true, 10, RoundRobin) + runServer(t, []string{"tcp://:9991"}, false, true, false, true, true, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "tcp", ":9992", false, true, true, true, true, 10, LeastConnections) + runServer(t, []string{"tcp://:9992"}, false, true, true, true, true, 10, LeastConnections) }) }) t.Run("udp", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "udp", ":9991", false, true, false, false, false, 10, RoundRobin) + runServer(t, []string{"udp://:9991"}, false, true, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "udp", ":9992", false, true, true, false, false, 10, LeastConnections) + runServer(t, []string{"udp://:9992"}, false, true, true, false, false, 10, LeastConnections) }) }) t.Run("udp-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "udp", ":9991", false, true, false, true, false, 10, RoundRobin) + runServer(t, []string{"udp://:9991"}, false, true, false, true, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "udp", ":9992", false, true, true, true, false, 10, LeastConnections) + runServer(t, []string{"udp://:9992"}, false, true, true, true, false, 10, LeastConnections) }) }) t.Run("unix", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "unix", "gnet1.sock", false, true, false, false, false, 10, RoundRobin) + runServer(t, []string{"unix://gnet1.sock"}, false, true, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "unix", "gnet2.sock", false, true, true, false, false, 10, LeastConnections) + runServer(t, []string{"unix://gnet2.sock"}, false, true, true, false, false, 10, LeastConnections) }) }) t.Run("unix-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "unix", "gnet1.sock", false, true, false, true, false, 10, RoundRobin) + runServer(t, []string{"unix://gnet1.sock"}, false, true, false, true, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "unix", "gnet2.sock", false, true, true, true, false, 10, LeastConnections) + runServer(t, []string{"unix://gnet2.sock"}, false, true, true, true, false, 10, LeastConnections) }) }) t.Run("unix-async-writev", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "unix", "gnet1.sock", false, true, false, true, true, 10, RoundRobin) + runServer(t, []string{"unix://gnet1.sock"}, false, true, false, true, true, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "unix", "gnet2.sock", false, true, true, true, true, 10, LeastConnections) + runServer(t, []string{"unix://gnet2.sock"}, false, true, true, true, true, 10, LeastConnections) }) }) }) - t.Run("poll-ET-reuseport", func(t *testing.T) { + t.Run("poll-reuseport-ET", func(t *testing.T) { t.Run("tcp", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "tcp", ":9991", true, true, false, false, false, 10, RoundRobin) + runServer(t, []string{"tcp://:9991"}, true, true, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "tcp", ":9992", true, true, true, false, false, 10, LeastConnections) + runServer(t, []string{"tcp://:9992"}, true, true, true, false, false, 10, LeastConnections) }) }) t.Run("tcp-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "tcp", ":9991", true, true, false, true, false, 10, RoundRobin) + runServer(t, []string{"tcp://:9991"}, true, true, false, true, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "tcp", ":9992", true, true, true, true, false, 10, LeastConnections) + runServer(t, []string{"tcp://:9992"}, true, true, true, true, false, 10, LeastConnections) }) }) t.Run("tcp-async-writev", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "tcp", ":9991", true, true, false, true, true, 10, RoundRobin) + runServer(t, []string{"tcp://:9991"}, true, true, false, true, true, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "tcp", ":9992", true, true, true, true, true, 10, LeastConnections) + runServer(t, []string{"tcp://:9992"}, true, true, true, true, true, 10, LeastConnections) }) }) t.Run("udp", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "udp", ":9991", true, true, false, false, false, 10, RoundRobin) + runServer(t, []string{"udp://:9991"}, true, true, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "udp", ":9992", true, true, true, false, false, 10, LeastConnections) + runServer(t, []string{"udp://:9992"}, true, true, true, false, false, 10, LeastConnections) }) }) t.Run("udp-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "udp", ":9991", true, true, false, true, false, 10, RoundRobin) + runServer(t, []string{"udp://:9991"}, true, true, false, true, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "udp", ":9992", true, true, true, true, false, 10, LeastConnections) + runServer(t, []string{"udp://:9992"}, true, true, true, true, false, 10, LeastConnections) }) }) t.Run("unix", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "unix", "gnet1.sock", true, true, false, false, false, 10, RoundRobin) + runServer(t, []string{"unix://gnet1.sock"}, true, true, false, false, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "unix", "gnet2.sock", true, true, true, false, false, 10, LeastConnections) + runServer(t, []string{"unix://gnet2.sock"}, true, true, true, false, false, 10, LeastConnections) }) }) t.Run("unix-async", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "unix", "gnet1.sock", true, true, false, true, false, 10, RoundRobin) + runServer(t, []string{"unix://gnet1.sock"}, true, true, false, true, false, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "unix", "gnet2.sock", true, true, true, true, false, 10, LeastConnections) + runServer(t, []string{"unix://gnet2.sock"}, true, true, true, true, false, 10, LeastConnections) }) }) t.Run("unix-async-writev", func(t *testing.T) { t.Run("1-loop", func(t *testing.T) { - runServer(t, "unix", "gnet1.sock", true, true, false, true, true, 10, RoundRobin) + runServer(t, []string{"unix://gnet1.sock"}, true, true, false, true, true, 10, RoundRobin) }) t.Run("N-loop", func(t *testing.T) { - runServer(t, "unix", "gnet2.sock", true, true, true, true, true, 10, LeastConnections) + runServer(t, []string{"unix://gnet2.sock"}, true, true, true, true, true, 10, LeastConnections) + }) + }) + }) + + t.Run("poll-multi-addrs-LT", func(t *testing.T) { + t.Run("sync", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9991", "tcp://:9992", "udp://:9993", "udp://:9994", "unix://gnet1.sock", "unix://gnet2.sock"}, false, false, false, false, false, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9995", "tcp://:9996", "udp://:9997", "udp://:9998", "unix://gnet1.sock", "unix://gnet2.sock"}, false, false, true, false, false, 10, LeastConnections) + }) + }) + t.Run("sync-writev", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9991", "tcp://:9992", "unix://gnet1.sock", "unix://gnet2.sock"}, false, false, false, false, true, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9995", "tcp://:9996", "unix://gnet1.sock", "unix://gnet2.sock"}, false, false, true, false, true, 10, LeastConnections) + }) + }) + t.Run("async", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9991", "tcp://:9992", "udp://:9993", "udp://:9994", "unix://gnet1.sock", "unix://gnet2.sock"}, false, false, false, true, false, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9995", "tcp://:9996", "udp://:9997", "udp://:9998", "unix://gnet1.sock", "unix://gnet2.sock"}, false, false, true, true, false, 10, LeastConnections) + }) + }) + t.Run("async-writev", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9991", "tcp://:9992", "unix://gnet1.sock", "unix://gnet2.sock"}, false, false, false, true, true, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9995", "tcp://:9996", "unix://gnet1.sock", "unix://gnet2.sock"}, false, false, true, true, true, 10, LeastConnections) + }) + }) + }) + + t.Run("poll-multi-addrs-reuseport-LT", func(t *testing.T) { + t.Run("sync", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9991", "tcp://:9992", "udp://:9993", "udp://:9994", "unix://gnet1.sock", "unix://gnet2.sock"}, false, true, false, false, false, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9995", "tcp://:9996", "udp://:9997", "udp://:9998", "unix://gnet1.sock", "unix://gnet2.sock"}, false, true, true, false, false, 10, LeastConnections) + }) + }) + t.Run("sync-writev", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9991", "tcp://:9992", "unix://gnet1.sock", "unix://gnet2.sock"}, false, true, false, false, true, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9995", "tcp://:9996", "unix://gnet1.sock", "unix://gnet2.sock"}, false, true, true, false, true, 10, LeastConnections) + }) + }) + t.Run("async", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9991", "tcp://:9992", "udp://:9993", "udp://:9994", "unix://gnet1.sock", "unix://gnet2.sock"}, false, true, false, true, false, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9995", "tcp://:9996", "udp://:9997", "udp://:9998", "unix://gnet1.sock", "unix://gnet2.sock"}, false, true, true, true, false, 10, LeastConnections) + }) + }) + t.Run("async-writev", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9991", "tcp://:9992", "unix://gnet1.sock", "unix://gnet2.sock"}, false, true, false, true, true, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9995", "tcp://:9996", "unix://gnet1.sock", "unix://gnet2.sock"}, false, true, true, true, true, 10, LeastConnections) + }) + }) + }) + + t.Run("poll-multi-addrs-ET", func(t *testing.T) { + t.Run("sync", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9991", "tcp://:9992", "udp://:9993", "udp://:9994", "unix://gnet1.sock", "unix://gnet2.sock"}, true, false, false, false, false, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9995", "tcp://:9996", "udp://:9997", "udp://:9998", "unix://gnet1.sock", "unix://gnet2.sock"}, true, false, true, false, false, 10, LeastConnections) + }) + }) + t.Run("sync-writev", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9991", "tcp://:9992", "unix://gnet1.sock", "unix://gnet2.sock"}, true, false, false, false, true, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9995", "tcp://:9996", "unix://gnet1.sock", "unix://gnet2.sock"}, true, false, true, false, true, 10, LeastConnections) + }) + }) + t.Run("async", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9991", "tcp://:9992", "udp://:9993", "udp://:9994", "unix://gnet1.sock", "unix://gnet2.sock"}, true, false, false, true, false, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9995", "tcp://:9996", "udp://:9997", "udp://:9998", "unix://gnet1.sock", "unix://gnet2.sock"}, true, false, true, true, false, 10, LeastConnections) + }) + }) + t.Run("async-writev", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9991", "tcp://:9992", "unix://gnet1.sock", "unix://gnet2.sock"}, true, false, false, true, true, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9995", "tcp://:9996", "unix://gnet1.sock", "unix://gnet2.sock"}, true, false, true, true, true, 10, LeastConnections) + }) + }) + }) + + t.Run("poll-multi-addrs-reuseport-ET", func(t *testing.T) { + t.Run("sync", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9991", "tcp://:9992", "udp://:9993", "udp://:9994", "unix://gnet1.sock", "unix://gnet2.sock"}, true, true, false, false, false, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9995", "tcp://:9996", "udp://:9997", "udp://:9998", "unix://gnet1.sock", "unix://gnet2.sock"}, true, true, true, false, false, 10, LeastConnections) + }) + }) + t.Run("sync-writev", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9991", "tcp://:9992", "unix://gnet1.sock", "unix://gnet2.sock"}, true, true, false, false, true, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9995", "tcp://:9996", "unix://gnet1.sock", "unix://gnet2.sock"}, true, true, true, false, true, 10, LeastConnections) + }) + }) + t.Run("async", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9991", "tcp://:9992", "udp://:9993", "udp://:9994", "unix://gnet1.sock", "unix://gnet2.sock"}, true, true, false, true, false, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9995", "tcp://:9996", "udp://:9997", "udp://:9998", "unix://gnet1.sock", "unix://gnet2.sock"}, true, true, true, true, false, 10, LeastConnections) + }) + }) + t.Run("async-writev", func(t *testing.T) { + t.Run("1-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9991", "tcp://:9992", "unix://gnet1.sock", "unix://gnet2.sock"}, true, true, false, true, true, 10, RoundRobin) + }) + t.Run("N-loop", func(t *testing.T) { + runServer(t, []string{"tcp://:9995", "tcp://:9996", "unix://gnet1.sock", "unix://gnet2.sock"}, true, true, true, true, true, 10, LeastConnections) }) }) }) @@ -310,25 +451,28 @@ type testServer struct { *BuiltinEventEngine tester *testing.T eng Engine - network string - addr string + addrs []string multicore bool async bool writev bool nclients int started int32 connected int32 - clientActive int32 disconnected int32 + clientActive int32 workerPool *goPool.Pool } func (s *testServer) OnBoot(eng Engine) (action Action) { s.eng = eng fd, err := s.eng.Dup() - require.NoErrorf(s.tester, err, "dup error") - assert.Greaterf(s.tester, fd, 2, "expected fd: > 2, but got: %d", fd) - assert.NoErrorf(s.tester, SysClose(fd), "close fd error") + if len(s.addrs) > 1 { + assert.ErrorIsf(s.tester, err, errorx.ErrUnsupportedOp, "dup error") + } else { + require.NoErrorf(s.tester, err, "dup error") + assert.Greaterf(s.tester, fd, 2, "expected fd: > 2, but got: %d", fd) + assert.NoErrorf(s.tester, SysClose(fd), "close fd error") + } return } @@ -343,25 +487,24 @@ func (s *testServer) OnOpen(c Conn) (out []byte, action Action) { func (s *testServer) OnShutdown(_ Engine) { fd, err := s.eng.Dup() - require.NoErrorf(s.tester, err, "dup error") - assert.Greaterf(s.tester, fd, 2, "expected fd: > 2, but got: %d", fd) - assert.NoErrorf(s.tester, SysClose(fd), "close fd error") + if len(s.addrs) > 1 { + assert.ErrorIsf(s.tester, err, errorx.ErrUnsupportedOp, "dup error") + } else { + require.NoErrorf(s.tester, err, "dup error") + assert.Greaterf(s.tester, fd, 2, "expected fd: > 2, but got: %d", fd) + assert.NoErrorf(s.tester, SysClose(fd), "close fd error") + } } func (s *testServer) OnClose(c Conn, err error) (action Action) { if err != nil { logging.Debugf("error occurred on closed, %v\n", err) } - if s.network != "udp" { + if c.LocalAddr().Network() != "udp" { require.Equal(s.tester, c.Context(), c, "invalid context") } - if disconnected := atomic.AddInt32(&s.disconnected, 1); disconnected == atomic.LoadInt32(&s.connected) && disconnected == int32(s.nclients) { //nolint:gocritic - require.EqualValues(s.tester, 0, s.eng.CountConnections()) - action = Shutdown - s.workerPool.Release() - } - + atomic.AddInt32(&s.disconnected, 1) return } @@ -369,8 +512,7 @@ func (s *testServer) OnTraffic(c Conn) (action Action) { if s.async { buf := bbPool.Get() _, _ = c.WriteTo(buf) - - if s.network == "tcp" || s.network == "unix" { + if c.LocalAddr().Network() == "tcp" || c.LocalAddr().Network() == "unix" { // just for test _ = c.InboundBuffered() _ = c.OutboundBuffered() @@ -401,7 +543,7 @@ func (s *testServer) OnTraffic(c Conn) (action Action) { } }) return - } else if s.network == "udp" { + } else if c.LocalAddr().Network() == "udp" { _ = s.workerPool.Submit( func() { _ = c.AsyncWrite(buf.Bytes(), nil) @@ -412,7 +554,12 @@ func (s *testServer) OnTraffic(c Conn) (action Action) { } buf, _ := c.Next(-1) - _, _ = c.Write(buf) + if s.writev { + mid := len(buf) / 2 + _, _ = c.Writev([][]byte{buf[:mid], buf[mid:]}) + } else { + _, _ = c.Write(buf) + } // Only for code coverage of testing. if !s.multicore { @@ -425,14 +572,14 @@ func (s *testServer) OnTraffic(c Conn) (action Action) { // TODO(panjf2000): somehow these two system calls will fail with Unix Domain Socket, // returning "invalid argument" error on macOS in Github actions intermittently, // try to figure it out. - if s.network == "unix" && runtime.GOOS == "darwin" { + if c.LocalAddr().Network() == "unix" && runtime.GOOS == "darwin" { _ = c.SetReadBuffer(streamLen) _ = c.SetWriteBuffer(streamLen) } else { assert.NoErrorf(s.tester, c.SetReadBuffer(streamLen), "set read buffer error") assert.NoErrorf(s.tester, c.SetWriteBuffer(streamLen), "set write buffer error") } - if s.network == "tcp" { + if c.LocalAddr().Network() == "tcp" { assert.NoErrorf(s.tester, c.SetLinger(1), "set linger error") assert.NoErrorf(s.tester, c.SetNoDelay(false), "set no delay error") assert.NoErrorf(s.tester, c.SetKeepAlivePeriod(time.Minute), "set keep alive period error") @@ -443,44 +590,72 @@ func (s *testServer) OnTraffic(c Conn) (action Action) { } func (s *testServer) OnTick() (delay time.Duration, action Action) { - delay = time.Second / 5 + delay = 100 * time.Millisecond if atomic.CompareAndSwapInt32(&s.started, 0, 1) { - for i := 0; i < s.nclients; i++ { - atomic.AddInt32(&s.clientActive, 1) - go func() { - startClient(s.tester, s.network, s.addr, s.multicore, s.async) - atomic.AddInt32(&s.clientActive, -1) - }() + for _, protoAddr := range s.addrs { + proto, addr, err := parseProtoAddr(protoAddr) + assert.NoError(s.tester, err) + for i := 0; i < s.nclients; i++ { + atomic.AddInt32(&s.clientActive, 1) + go func() { + startClient(s.tester, proto, addr, s.multicore, s.async) + atomic.AddInt32(&s.clientActive, -1) + }() + } } } - if s.network == "udp" && atomic.LoadInt32(&s.clientActive) == 0 { - action = Shutdown - return + if atomic.LoadInt32(&s.clientActive) == 0 { + var streamAddrs int + for _, addr := range s.addrs { + if !strings.HasPrefix(addr, "udp") { + streamAddrs++ + } + } + streamConns := s.nclients * streamAddrs + disconnected := atomic.LoadInt32(&s.disconnected) + if int(disconnected) == streamConns && disconnected == atomic.LoadInt32(&s.connected) { + action = Shutdown + s.workerPool.Release() + require.EqualValues(s.tester, 0, s.eng.CountConnections()) + } } return } -func runServer(t *testing.T, network, addr string, et, reuseport, multicore, async, writev bool, nclients int, lb LoadBalancing) { +func runServer(t *testing.T, addrs []string, et, reuseport, multicore, async, writev bool, nclients int, lb LoadBalancing) { ts := &testServer{ tester: t, - network: network, - addr: addr, + addrs: addrs, multicore: multicore, async: async, writev: writev, nclients: nclients, workerPool: goPool.Default(), } - err := Run(ts, - network+"://"+addr, - WithEdgeTriggeredIO(et), - WithLockOSThread(async), - WithMulticore(multicore), - WithReusePort(reuseport), - WithTicker(true), - WithTCPKeepAlive(time.Minute*1), - WithTCPNoDelay(TCPDelay), - WithLoadBalancing(lb)) + var err error + if len(addrs) > 1 { + err = Rotate(ts, + addrs, + WithEdgeTriggeredIO(et), + WithLockOSThread(async), + WithMulticore(multicore), + WithReusePort(reuseport), + WithTicker(true), + WithTCPKeepAlive(time.Minute), + WithTCPNoDelay(TCPDelay), + WithLoadBalancing(lb)) + } else { + err = Run(ts, + addrs[0], + WithEdgeTriggeredIO(et), + WithLockOSThread(async), + WithMulticore(multicore), + WithReusePort(reuseport), + WithTicker(true), + WithTCPKeepAlive(time.Minute), + WithTCPNoDelay(TCPDelay), + WithLoadBalancing(lb)) + } assert.NoError(t, err) } @@ -496,7 +671,7 @@ func startClient(t *testing.T, network, addr string, multicore, async bool) { require.Equal(t, string(msg), "sweetness\r\n", "bad header") } duration := time.Duration((rand.Float64()*2+1)*float64(time.Second)) / 2 - t.Logf("test duration: %dms", duration/time.Millisecond) + logging.Debugf("test duration: %v", duration) start := time.Now() for time.Since(start) < duration { reqData := make([]byte, streamLen) @@ -546,11 +721,11 @@ func (t *testBadAddrServer) OnBoot(_ Engine) (action Action) { func TestBadAddresses(t *testing.T) { events := new(testBadAddrServer) err := Run(events, "tulip://howdy") - assert.Error(t, err) + assert.ErrorIs(t, err, errorx.ErrUnsupportedProtocol) err = Run(events, "howdy") - assert.Error(t, err) + assert.ErrorIs(t, err, errorx.ErrInvalidNetworkAddress) err = Run(events, "tcp://") - assert.NoError(t, err) + assert.ErrorIs(t, err, errorx.ErrInvalidNetworkAddress) } func TestTick(t *testing.T) { @@ -1487,7 +1662,7 @@ func runSimClient(t *testing.T, network, addr string, packetSize, batch int) { default: duration = 5 * time.Second } - t.Logf("test duration: %ds", duration/time.Second) + logging.Debugf("test duration: %v", duration) start := time.Now() for time.Since(start) < duration { batchSendAndRecv(t, c, rd, packetSize, batch) diff --git a/internal/socket/fd_unix.go b/internal/socket/fd_unix.go index 32e1d5d10..82de8ce27 100644 --- a/internal/socket/fd_unix.go +++ b/internal/socket/fd_unix.go @@ -25,7 +25,7 @@ import ( ) // Dup is the wrapper for dupCloseOnExec. -func Dup(fd int) (int, string, error) { +func Dup(fd int) (int, error) { return dupCloseOnExec(fd) } @@ -34,11 +34,11 @@ func Dup(fd int) (int, string, error) { var tryDupCloexec = int32(1) // dupCloseOnExec dups fd and marks it close-on-exec. -func dupCloseOnExec(fd int) (int, string, error) { +func dupCloseOnExec(fd int) (int, error) { if atomic.LoadInt32(&tryDupCloexec) == 1 { r, err := unix.FcntlInt(uintptr(fd), unix.F_DUPFD_CLOEXEC, 0) if err == nil { - return r, "", nil + return r, nil } switch err.(syscall.Errno) { case unix.EINVAL, unix.ENOSYS: @@ -47,7 +47,7 @@ func dupCloseOnExec(fd int) (int, string, error) { // now on. atomic.StoreInt32(&tryDupCloexec, 0) default: - return -1, "fcntl", err + return -1, err } } return dupCloseOnExecOld(fd) @@ -55,13 +55,13 @@ func dupCloseOnExec(fd int) (int, string, error) { // dupCloseOnExecOld is the traditional way to dup an fd and // set its O_CLOEXEC bit, using two system calls. -func dupCloseOnExecOld(fd int) (int, string, error) { +func dupCloseOnExecOld(fd int) (int, error) { syscall.ForkLock.RLock() defer syscall.ForkLock.RUnlock() newFD, err := syscall.Dup(fd) if err != nil { - return -1, "dup", err + return -1, err } syscall.CloseOnExec(newFD) - return newFD, "", nil + return newFD, nil } diff --git a/listener_unix.go b/listener_unix.go index 95bf0171d..e2d497b61 100644 --- a/listener_unix.go +++ b/listener_unix.go @@ -45,7 +45,7 @@ func (ln *listener) packPollAttachment(handler netpoll.PollEventHandler) *netpol return ln.pollAttachment } -func (ln *listener) dup() (int, string, error) { +func (ln *listener) dup() (int, error) { return socket.Dup(ln.fd) } diff --git a/listener_windows.go b/listener_windows.go index 1e92cdf46..b1a898131 100644 --- a/listener_windows.go +++ b/listener_windows.go @@ -37,9 +37,9 @@ type listener struct { addr net.Addr } -func (l *listener) dup() (int, string, error) { +func (l *listener) dup() (int, error) { if l.ln == nil && l.pc == nil { - return -1, "dup", errorx.ErrUnsupportedOp + return -1, errorx.ErrUnsupportedOp } var ( @@ -53,11 +53,11 @@ func (l *listener) dup() (int, string, error) { } if !ok { - return -1, "dup", errors.New("failed to convert net.Conn to syscall.Conn") + return -1, errors.New("failed to convert net.Conn to syscall.Conn") } rc, err := sc.SyscallConn() if err != nil { - return -1, "dup", errors.New("failed to get syscall.RawConn from net.Conn") + return -1, errors.New("failed to get syscall.RawConn from net.Conn") } var dupHandle windows.Handle @@ -74,13 +74,13 @@ func (l *listener) dup() (int, string, error) { ) }) if err != nil { - return -1, "dup", err + return -1, err } if e != nil { - return -1, "dup", e + return -1, e } - return int(dupHandle), "dup", nil + return int(dupHandle), nil } func (l *listener) close() { diff --git a/os_unix_test.go b/os_unix_test.go index c8acc8bc3..f3d0f0724 100644 --- a/os_unix_test.go +++ b/os_unix_test.go @@ -17,6 +17,8 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sys/unix" + + "github.com/panjf2000/gnet/v2/pkg/logging" ) var ( @@ -111,7 +113,7 @@ func (s *testMcastServer) startMcastClient() { ch := make(chan []byte, 10000) s.mcast.Store(c.LocalAddr().String(), ch) duration := time.Duration((rand.Float64()*2+1)*float64(time.Second)) / 2 - s.t.Logf("test duration: %dms", duration/time.Millisecond) + logging.Debugf("test duration: %v", duration) start := time.Now() for time.Since(start) < duration { reqData := make([]byte, 1024) diff --git a/pkg/errors/errors.go b/pkg/errors/errors.go index cc0a24f7d..835ddedf0 100644 --- a/pkg/errors/errors.go +++ b/pkg/errors/errors.go @@ -18,29 +18,31 @@ import "errors" var ( // ErrEmptyEngine occurs when trying to do something with an empty engine. - ErrEmptyEngine = errors.New("the internal engine is empty") + ErrEmptyEngine = errors.New("gnet: the internal engine is empty") // ErrEngineShutdown occurs when server is closing. - ErrEngineShutdown = errors.New("server is going to be shutdown") + ErrEngineShutdown = errors.New("gnet: server is going to be shutdown") // ErrEngineInShutdown occurs when attempting to shut the server down more than once. - ErrEngineInShutdown = errors.New("server is already in shutdown") + ErrEngineInShutdown = errors.New("gnet: server is already in shutdown") // ErrAcceptSocket occurs when acceptor does not accept the new connection properly. - ErrAcceptSocket = errors.New("accept a new connection error") + ErrAcceptSocket = errors.New("gnet: accept a new connection error") // ErrTooManyEventLoopThreads occurs when attempting to set up more than 10,000 event-loop goroutines under LockOSThread mode. - ErrTooManyEventLoopThreads = errors.New("too many event-loops under LockOSThread mode") + ErrTooManyEventLoopThreads = errors.New("gnet: too many event-loops under LockOSThread mode") // ErrUnsupportedProtocol occurs when trying to use protocol that is not supported. - ErrUnsupportedProtocol = errors.New("only unix, tcp/tcp4/tcp6, udp/udp4/udp6 are supported") + ErrUnsupportedProtocol = errors.New("gnet: only unix, tcp/tcp4/tcp6, udp/udp4/udp6 are supported") // ErrUnsupportedTCPProtocol occurs when trying to use an unsupported TCP protocol. - ErrUnsupportedTCPProtocol = errors.New("only tcp/tcp4/tcp6 are supported") + ErrUnsupportedTCPProtocol = errors.New("gnet: only tcp/tcp4/tcp6 are supported") // ErrUnsupportedUDPProtocol occurs when trying to use an unsupported UDP protocol. - ErrUnsupportedUDPProtocol = errors.New("only udp/udp4/udp6 are supported") + ErrUnsupportedUDPProtocol = errors.New("gnet: only udp/udp4/udp6 are supported") // ErrUnsupportedUDSProtocol occurs when trying to use an unsupported Unix protocol. - ErrUnsupportedUDSProtocol = errors.New("only unix is supported") + ErrUnsupportedUDSProtocol = errors.New("gnet: only unix is supported") // ErrUnsupportedPlatform occurs when running gnet on an unsupported platform. - ErrUnsupportedPlatform = errors.New("unsupported platform in gnet") + ErrUnsupportedPlatform = errors.New("gnet: unsupported platform in gnet") // ErrUnsupportedOp occurs when calling some methods that has not been implemented yet. - ErrUnsupportedOp = errors.New("unsupported operation") + ErrUnsupportedOp = errors.New("gnet: unsupported operation") // ErrNegativeSize occurs when trying to pass a negative size to a buffer. - ErrNegativeSize = errors.New("negative size is invalid") + ErrNegativeSize = errors.New("gnet: negative size is not allowed") // ErrNoIPv4AddressOnInterface occurs when an IPv4 multicast address is set on an interface but IPv4 is not configured. - ErrNoIPv4AddressOnInterface = errors.New("no IPv4 address on interface") + ErrNoIPv4AddressOnInterface = errors.New("gnet: no IPv4 address on interface") + // ErrInvalidNetworkAddress occurs when the network address is invalid. + ErrInvalidNetworkAddress = errors.New("gnet: invalid network address") ) diff --git a/reactor_epoll_default.go b/reactor_epoll_default.go index b2c514dd5..bd0e7d88d 100644 --- a/reactor_epoll_default.go +++ b/reactor_epoll_default.go @@ -124,7 +124,7 @@ func (el *eventloop) run() error { err := el.poller.Polling(func(fd int, ev uint32) error { c := el.connections.getConn(fd) if c == nil { - if fd == el.ln.fd { + if _, ok := el.listeners[fd]; ok { return el.accept(fd, ev) } // Somehow epoll notify with an event for a stale fd that is not in our connection set. diff --git a/reactor_kqueue_default.go b/reactor_kqueue_default.go index 4cd533618..bdc103f19 100644 --- a/reactor_kqueue_default.go +++ b/reactor_kqueue_default.go @@ -113,7 +113,7 @@ func (el *eventloop) run() error { err := el.poller.Polling(func(fd int, filter int16, flags uint16) (err error) { c := el.connections.getConn(fd) if c == nil { - if fd == el.ln.fd { + if _, ok := el.listeners[fd]; ok { return el.accept(fd, filter, flags) } // This might happen when the connection has already been closed,