Skip to content

Commit

Permalink
Merge pull request #588 from panjf2000/dev
Browse files Browse the repository at this point in the history
minor: v2.5.0
  • Loading branch information
panjf2000 authored Apr 22, 2024
2 parents 35c7b3b + 5cca785 commit 5c043e6
Show file tree
Hide file tree
Showing 45 changed files with 1,699 additions and 1,053 deletions.
5 changes: 3 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<br />
<a title="Build Status" target="_blank" href="https://github.com/panjf2000/gnet/actions?query=workflow%3ATests"><img src="https://img.shields.io/github/actions/workflow/status/panjf2000/gnet/test.yml?branch=dev&style=flat-square&logo=github-actions" /></a>
<a title="Codecov" target="_blank" href="https://codecov.io/gh/panjf2000/gnet"><img src="https://img.shields.io/codecov/c/github/panjf2000/gnet?style=flat-square&logo=codecov" /></a>
<a title="Supported Platforms" target="_blank" href="https://github.com/panjf2000/gnet"><img src="https://img.shields.io/badge/platform-Linux%20%7C%20FreeBSD%20%7C%20DragonFly%20%7C%20NetBSD%20%7C%20OpenBSD%20%7C%20Darwin%20%7C%20Windows-549688?style=flat-square&logo=launchpad" /></a>
<a title="Supported Platforms" target="_blank" href="https://github.com/panjf2000/gnet"><img src="https://img.shields.io/badge/platform-Linux%20%7C%20macOS%20%7C%20*BSD%20%7C%20Windows-549688?style=flat-square&logo=launchpad" /></a>
<a title="Require Go Version" target="_blank" href="https://github.com/panjf2000/gnet"><img src="https://img.shields.io/badge/go-%3E%3D1.17-30dff3?style=flat-square&logo=go" /></a>
<br />
<a title="Go Report Card" target="_blank" href="https://goreportcard.com/report/github.com/panjf2000/gnet"><img src="https://goreportcard.com/badge/github.com/panjf2000/gnet?style=flat-square" /></a>
Expand Down Expand Up @@ -42,7 +42,8 @@ English | [中文](README_ZH.md)
- [x] Flexible ticker event
- [x] Implementation of `gnet` Client
- [x] **Windows** platform support (For compatibility in development only, do not use it in production)
- [ ] Multiple network addresses binding
- [x] **Edge-triggered** I/O support
- [x] Multiple network addresses binding
- [ ] **TLS** support
- [ ] [io_uring](https://kernel.dk/io_uring.pdf) support

Expand Down
9 changes: 5 additions & 4 deletions README_ZH.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
<br />
<a title="Build Status" target="_blank" href="https://github.com/panjf2000/gnet/actions?query=workflow%3ATests"><img src="https://img.shields.io/github/actions/workflow/status/panjf2000/gnet/test.yml?branch=dev&style=flat-square&logo=github-actions" /></a>
<a title="Codecov" target="_blank" href="https://codecov.io/gh/panjf2000/gnet"><img src="https://img.shields.io/codecov/c/github/panjf2000/gnet?style=flat-square&logo=codecov" /></a>
<a title="Supported Platforms" target="_blank" href="https://github.com/panjf2000/gnet"><img src="https://img.shields.io/badge/platform-Linux%20%7C%20FreeBSD%20%7C%20DragonFly%20%7C%20NetBSD%20%7C%20OpenBSD%20%7C%20Darwin%20%7C%20Windows-549688?style=flat-square&logo=launchpad" /></a>
<a title="Supported Platforms" target="_blank" href="https://github.com/panjf2000/gnet"><img src="https://img.shields.io/badge/platform-Linux%20%7C%20macOS%20%7C%20*BSD%20%7C%20Windows-549688?style=flat-square&logo=launchpad" /></a>
<a title="Require Go Version" target="_blank" href="https://github.com/panjf2000/gnet"><img src="https://img.shields.io/badge/go-%3E%3D1.17-30dff3?style=flat-square&logo=go" /></a>
<br />
<a title="Go Report Card" target="_blank" href="https://goreportcard.com/report/github.com/panjf2000/gnet"><img src="https://goreportcard.com/badge/github.com/panjf2000/gnet?style=flat-square" /></a>
Expand Down Expand Up @@ -42,9 +42,10 @@
- [x] 灵活的事件定时器
- [x] 实现 `gnet` 客户端
- [x] 支持 **Windows** 平台 (仅用于开发环境的兼容性,不要在生产环境中使用)
- [ ] 多网络地址绑定
- [ ] 支持 **TLS**
- [ ] 支持 [io_uring](https://kernel.dk/io_uring.pdf)
- [x] **Edge-triggered** I/O 支持
- [x] 多网络地址绑定
- [ ] **TLS** 支持
- [ ] [io_uring](https://kernel.dk/io_uring.pdf) 支持

# 🎬 开始

Expand Down
28 changes: 0 additions & 28 deletions acceptor_bsd.go

This file was deleted.

88 changes: 45 additions & 43 deletions acceptor_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,53 +26,57 @@ import (
"github.com/panjf2000/gnet/v2/internal/queue"
"github.com/panjf2000/gnet/v2/internal/socket"
"github.com/panjf2000/gnet/v2/pkg/errors"
"github.com/panjf2000/gnet/v2/pkg/logging"
)

func (eng *engine) accept1(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error {
nfd, sa, err := socket.Accept(fd)
if err != nil {
switch err {
case unix.EINTR, unix.EAGAIN, unix.ECONNABORTED:
// ECONNABORTED means that a socket on the listen
// queue was closed before we Accept()ed it;
// it's a silly error, so try again.
return nil
default:
eng.opts.Logger.Errorf("Accept() failed due to error: %v", err)
return errors.ErrAcceptSocket
func (el *eventloop) accept0(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error {
for {
nfd, sa, err := socket.Accept(fd)
if err != nil {
switch err {
case unix.EAGAIN: // the Accept queue has been drained out, we can return now
return nil
case unix.EINTR, unix.ECONNRESET, unix.ECONNABORTED:
// ECONNRESET or ECONNABORTED could indicate that a socket
// in the Accept queue was closed before we Accept()ed it.
// It's a silly error, let's retry it.
continue
default:
el.getLogger().Errorf("Accept() failed due to error: %v", err)
return errors.ErrAcceptSocket
}
}
}

remoteAddr := socket.SockaddrToTCPOrUnixAddr(sa)
if eng.opts.TCPKeepAlive > 0 && eng.ln.network == "tcp" {
err = socket.SetKeepAlivePeriod(nfd, int(eng.opts.TCPKeepAlive.Seconds()))
logging.Error(err)
}
remoteAddr := socket.SockaddrToTCPOrUnixAddr(sa)
if el.engine.opts.TCPKeepAlive > 0 && el.listeners[fd].network == "tcp" {
err = socket.SetKeepAlivePeriod(nfd, int(el.engine.opts.TCPKeepAlive.Seconds()))
if err != nil {
el.getLogger().Errorf("failed to set TCP keepalive on fd=%d: %v", fd, err)
}
}

el := eng.eventLoops.next(remoteAddr)
c := newTCPConn(nfd, el, sa, el.ln.addr, remoteAddr)
err = el.poller.Trigger(queue.HighPriority, el.register, c)
if err != nil {
eng.opts.Logger.Errorf("failed to enqueue accepted socket of high-priority: %v", err)
_ = unix.Close(nfd)
c.release()
el := el.engine.eventLoops.next(remoteAddr)
c := newTCPConn(nfd, el, sa, el.listeners[fd].addr, remoteAddr)
err = el.poller.Trigger(queue.HighPriority, el.register, c)
if err != nil {
el.getLogger().Errorf("failed to enqueue the accepted socket fd=%d to poller: %v", c.fd, err)
_ = unix.Close(nfd)
c.release()
}
}
return nil
}

func (el *eventloop) accept1(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) error {
if el.ln.network == "udp" {
return el.readUDP1(fd, ev, flags)
func (el *eventloop) accept(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) error {
if el.listeners[fd].network == "udp" {
return el.readUDP(fd, ev, flags)
}

nfd, sa, err := socket.Accept(el.ln.fd)
nfd, sa, err := socket.Accept(fd)
if err != nil {
switch err {
case unix.EINTR, unix.EAGAIN, unix.ECONNABORTED:
// ECONNABORTED means that a socket on the listen
// queue was closed before we Accept()ed it;
// it's a silly error, so try again.
case unix.EINTR, unix.EAGAIN, unix.ECONNRESET, unix.ECONNABORTED:
// ECONNRESET or ECONNABORTED could indicate that a socket
// in the Accept queue was closed before we Accept()ed it.
// It's a silly error, let's retry it.
return nil
default:
el.getLogger().Errorf("Accept() failed due to error: %v", err)
Expand All @@ -81,15 +85,13 @@ func (el *eventloop) accept1(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags)
}

remoteAddr := socket.SockaddrToTCPOrUnixAddr(sa)
if el.engine.opts.TCPKeepAlive > 0 && el.ln.network == "tcp" {
if el.engine.opts.TCPKeepAlive > 0 && el.listeners[fd].network == "tcp" {
err = socket.SetKeepAlivePeriod(nfd, int(el.engine.opts.TCPKeepAlive/time.Second))
logging.Error(err)
if err != nil {
el.getLogger().Errorf("failed to set TCP keepalive on fd=%d: %v", fd, err)
}
}

c := newTCPConn(nfd, el, sa, el.ln.addr, remoteAddr)
if err = el.poller.AddRead(&c.pollAttachment); err != nil {
return err
}
el.connections.addConn(c, el.idx)
return el.open(c)
c := newTCPConn(nfd, el, sa, el.listeners[fd].addr, remoteAddr)
return el.register0(c)
}
100 changes: 54 additions & 46 deletions acceptor_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,64 +23,72 @@ import (
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
)

func (eng *engine) listen() (err error) {
func (eng *engine) listenStream(ln net.Listener) (err error) {
if eng.opts.LockOSThread {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
}

defer func() { eng.shutdown(err) }()

var buffer [0x10000]byte
for {
if eng.ln.pc != nil {
// Read data from UDP socket.
n, addr, e := eng.ln.pc.ReadFrom(buffer[:])
if e != nil {
err = e
if atomic.LoadInt32(&eng.beingShutdown) == 0 {
eng.opts.Logger.Errorf("failed to receive data from UDP fd due to error:%v", err)
} else if errors.Is(err, net.ErrClosed) {
err = errorx.ErrEngineShutdown
// TODO: errors.Join() is not supported until Go 1.20,
// we will uncomment this line after we bump up the
// minimal supported go version to 1.20.
// err = errors.Join(err, errorx.ErrEngineShutdown)
// Accept TCP socket.
tc, e := ln.Accept()
if e != nil {
err = e
if atomic.LoadInt32(&eng.beingShutdown) == 0 {
eng.opts.Logger.Errorf("Accept() fails due to error: %v", err)
} else if errors.Is(err, net.ErrClosed) {
err = errorx.ErrEngineShutdown
// TODO: errors.Join() is not supported until Go 1.20,
// we will uncomment this line after we bump up the
// minimal supported go version to 1.20.
// err = errors.Join(err, errorx.ErrEngineShutdown)
}
return
}
el := eng.eventLoops.next(tc.RemoteAddr())
c := newTCPConn(tc, el)
el.ch <- &openConn{c: c}
go func(c *conn, tc net.Conn, el *eventloop) {
var buffer [0x10000]byte
for {
n, err := tc.Read(buffer[:])
if err != nil {
el.ch <- &netErr{c, err}
return
}
return
el.ch <- packTCPConn(c, buffer[:n])
}
}(c, tc, el)
}
}

el := eng.eventLoops.next(addr)
c := newUDPConn(el, eng.ln.addr, addr)
el.ch <- packUDPConn(c, buffer[:n])
} else {
// Accept TCP socket.
tc, e := eng.ln.ln.Accept()
if e != nil {
err = e
if atomic.LoadInt32(&eng.beingShutdown) == 0 {
eng.opts.Logger.Errorf("Accept() fails due to error: %v", err)
} else if errors.Is(err, net.ErrClosed) {
err = errorx.ErrEngineShutdown
// TODO: ditto.
// err = errors.Join(err, errorx.ErrEngineShutdown)
}
return
func (eng *engine) ListenUDP(pc net.PacketConn) (err error) {
if eng.opts.LockOSThread {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
}

defer func() { eng.shutdown(err) }()

var buffer [0x10000]byte
for {
// Read data from UDP socket.
n, addr, e := pc.ReadFrom(buffer[:])
if e != nil {
err = e
if atomic.LoadInt32(&eng.beingShutdown) == 0 {
eng.opts.Logger.Errorf("failed to receive data from UDP fd due to error:%v", err)
} else if errors.Is(err, net.ErrClosed) {
err = errorx.ErrEngineShutdown
// TODO: ditto.
// err = errors.Join(err, errorx.ErrEngineShutdown)
}
el := eng.eventLoops.next(tc.RemoteAddr())
c := newTCPConn(tc, el)
el.ch <- &openConn{c: c}
go func(c *conn, tc net.Conn, el *eventloop) {
var buffer [0x10000]byte
for {
n, err := tc.Read(buffer[:])
if err != nil {
el.ch <- &netErr{c, err}
return
}
el.ch <- packTCPConn(c, buffer[:n])
}
}(c, tc, el)
return
}
el := eng.eventLoops.next(addr)
c := newUDPConn(el, pc, pc.LocalAddr(), addr)
el.ch <- packUDPConn(c, buffer[:n])
}
}
Loading

0 comments on commit 5c043e6

Please sign in to comment.