Skip to content

Commit

Permalink
Merge pull request #566 from panjf2000/dev
Browse files Browse the repository at this point in the history
minor: v2.4.0
  • Loading branch information
panjf2000 authored Apr 4, 2024
2 parents 7301eaa + f5e5ef9 commit 4fee147
Show file tree
Hide file tree
Showing 18 changed files with 302 additions and 200 deletions.
23 changes: 23 additions & 0 deletions .github/workflows/gh-translator.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
name: 'gh-translator'

on:
issues:
types: [opened]
pull_request:
types: [opened]
issue_comment:
types: [created, edited]
discussion:
types: [created, edited, answered]
discussion_comment:
types: [created, edited]

jobs:
build:
runs-on: ubuntu-latest
steps:
- uses: usthe/[email protected]
with:
BOT_GITHUB_TOKEN: ${{ secrets.GH_TRANSLATOR_TOKEN }}
IS_MODIFY_TITLE: true
CUSTOM_BOT_NOTE: 🤖 Non-English text detected, translating...
5 changes: 3 additions & 2 deletions acceptor_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"golang.org/x/sys/unix"

"github.com/panjf2000/gnet/v2/internal/netpoll"
"github.com/panjf2000/gnet/v2/internal/queue"
"github.com/panjf2000/gnet/v2/internal/socket"
"github.com/panjf2000/gnet/v2/pkg/errors"
"github.com/panjf2000/gnet/v2/pkg/logging"
Expand Down Expand Up @@ -51,9 +52,9 @@ func (eng *engine) accept1(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error {

el := eng.eventLoops.next(remoteAddr)
c := newTCPConn(nfd, el, sa, el.ln.addr, remoteAddr)
err = el.poller.UrgentTrigger(el.register, c)
err = el.poller.Trigger(queue.HighPriority, el.register, c)
if err != nil {
eng.opts.Logger.Errorf("UrgentTrigger() failed due to error: %v", err)
eng.opts.Logger.Errorf("failed to enqueue accepted socket of high-priority: %v", err)
_ = unix.Close(nfd)
c.release()
}
Expand Down
2 changes: 1 addition & 1 deletion acceptor_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func (eng *engine) listen() (err error) {
}
el := eng.eventLoops.next(tc.RemoteAddr())
c := newTCPConn(tc, el)
el.ch <- c
el.ch <- &openConn{c: c}
go func(c *conn, tc net.Conn, el *eventloop) {
var buffer [0x10000]byte
for {
Expand Down
147 changes: 83 additions & 64 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package gnet

import (
"bytes"
"io"
"math/rand"
"net"
Expand All @@ -21,12 +22,17 @@ import (
goPool "github.com/panjf2000/gnet/v2/pkg/pool/goroutine"
)

type connHandler struct {
network string
rspCh chan []byte
data []byte
}

type clientEvents struct {
*BuiltinEventEngine
tester *testing.T
svr *testClientServer
packetLen int
rspChMap sync.Map
}

func (ev *clientEvents) OnBoot(e Engine) Action {
Expand All @@ -37,11 +43,11 @@ func (ev *clientEvents) OnBoot(e Engine) Action {
return None
}

func (ev *clientEvents) OnOpen(c Conn) ([]byte, Action) {
c.SetContext([]byte{})
rspCh := make(chan []byte, 1)
ev.rspChMap.Store(c.LocalAddr().String(), rspCh)
return nil, None
var pingMsg = []byte("PING\r\n")

func (ev *clientEvents) OnOpen(Conn) (out []byte, action Action) {
out = pingMsg
return
}

func (ev *clientEvents) OnClose(Conn, error) Action {
Expand All @@ -54,24 +60,18 @@ func (ev *clientEvents) OnClose(Conn, error) Action {
}

func (ev *clientEvents) OnTraffic(c Conn) (action Action) {
ctx := c.Context()
var p []byte
if ctx != nil {
p = ctx.([]byte)
} else { // UDP
ev.packetLen = 1024
handler := c.Context().(*connHandler)
if handler.network == "udp" {
ev.packetLen = datagramLen
}
buf, err := c.Next(-1)
assert.NoError(ev.tester, err)
p = append(p, buf...)
if len(p) < ev.packetLen {
c.SetContext(p)
handler.data = append(handler.data, buf...)
if len(handler.data) < ev.packetLen {
return
}
v, _ := ev.rspChMap.Load(c.LocalAddr().String())
rspCh := v.(chan []byte)
rspCh <- p
c.SetContext([]byte{})
handler.rspCh <- handler.data
handler.data = nil
return
}

Expand Down Expand Up @@ -199,20 +199,20 @@ func TestServeWithGnetClient(t *testing.T) {

type testClientServer struct {
*BuiltinEventEngine
client *Client
clientEV *clientEvents
tester *testing.T
eng Engine
network string
addr string
multicore bool
async bool
nclients int
started int32
connected int32
clientActive int32
disconnected int32
workerPool *goPool.Pool
client *Client
tester *testing.T
eng Engine
network string
addr string
multicore bool
async bool
nclients int
started int32
connected int32
clientActive int32
disconnected int32
workerPool *goPool.Pool
udpReadHeader int32
}

func (s *testClientServer) OnBoot(eng Engine) (action Action) {
Expand All @@ -221,7 +221,7 @@ func (s *testClientServer) OnBoot(eng Engine) (action Action) {
}

func (s *testClientServer) OnOpen(c Conn) (out []byte, action Action) {
c.SetContext(c)
c.SetContext(&sync.Once{})
atomic.AddInt32(&s.connected, 1)
require.NotNil(s.tester, c.LocalAddr(), "nil local addr")
require.NotNil(s.tester, c.RemoteAddr(), "nil remote addr")
Expand All @@ -233,7 +233,7 @@ func (s *testClientServer) OnClose(c Conn, err error) (action Action) {
logging.Debugf("error occurred on closed, %v\n", err)
}
if s.network != "udp" {
require.Equal(s.tester, c.Context(), c, "invalid context")
require.IsType(s.tester, c.Context(), new(sync.Once), "invalid context")
}

atomic.AddInt32(&s.disconnected, 1)
Expand All @@ -246,7 +246,25 @@ func (s *testClientServer) OnClose(c Conn, err error) (action Action) {
return
}

func (s *testClientServer) OnShutdown(Engine) {
if s.network == "udp" {
require.EqualValues(s.tester, int32(s.nclients), atomic.LoadInt32(&s.udpReadHeader))
}
}

func (s *testClientServer) OnTraffic(c Conn) (action Action) {
readHeader := func() {
ping := make([]byte, len(pingMsg))
n, err := io.ReadFull(c, ping)
require.NoError(s.tester, err)
require.EqualValues(s.tester, len(pingMsg), n)
require.Equal(s.tester, string(pingMsg), string(ping), "bad header")
}
v := c.Context()
if v != nil {
v.(*sync.Once).Do(readHeader)
}

if s.async {
buf := bbPool.Get()
_, _ = c.WriteTo(buf)
Expand All @@ -257,14 +275,30 @@ func (s *testClientServer) OnTraffic(c Conn) (action Action) {
_ = c.OutboundBuffered()
_, _ = c.Discard(1)
}
if v == nil && bytes.Equal(buf.Bytes(), pingMsg) {
atomic.AddInt32(&s.udpReadHeader, 1)
buf.Reset()
}
_ = s.workerPool.Submit(
func() {
_ = c.AsyncWrite(buf.Bytes(), nil)
if buf.Len() > 0 {
err := c.AsyncWrite(buf.Bytes(), nil)
require.NoError(s.tester, err)
}
})
return
}

buf, _ := c.Next(-1)
_, _ = c.Write(buf)
if v == nil && bytes.Equal(buf, pingMsg) {
atomic.AddInt32(&s.udpReadHeader, 1)
buf = nil
}
if len(buf) > 0 {
n, err := c.Write(buf)
require.NoError(s.tester, err)
require.EqualValues(s.tester, len(buf), n)
}
return
}

Expand All @@ -277,7 +311,7 @@ func (s *testClientServer) OnTick() (delay time.Duration, action Action) {
if i%2 == 0 {
netConn = true
}
go startGnetClient(s.tester, s.client, s.clientEV, s.network, s.addr, s.multicore, s.async, netConn)
go startGnetClient(s.tester, s.client, s.network, s.addr, s.multicore, s.async, netConn)
}
}
if s.network == "udp" && atomic.LoadInt32(&s.clientActive) == 0 {
Expand All @@ -298,9 +332,9 @@ func testServeWithGnetClient(t *testing.T, network, addr string, reuseport, reus
workerPool: goPool.Default(),
}
var err error
ts.clientEV = &clientEvents{tester: t, packetLen: streamLen, svr: ts}
clientEV := &clientEvents{tester: t, packetLen: streamLen, svr: ts}
ts.client, err = NewClient(
ts.clientEV,
clientEV,
WithLogLevel(logging.DebugLevel),
WithLockOSThread(true),
WithTicker(true),
Expand All @@ -324,51 +358,36 @@ func testServeWithGnetClient(t *testing.T, network, addr string, reuseport, reus
assert.NoError(t, err)
}

func startGnetClient(t *testing.T, cli *Client, ev *clientEvents, network, addr string, multicore, async, netDial bool) {
func startGnetClient(t *testing.T, cli *Client, network, addr string, multicore, async, netDial bool) {
rand.Seed(time.Now().UnixNano())
var (
c Conn
err error
)
handler := &connHandler{
network: network,
rspCh: make(chan []byte, 1),
}
if netDial {
var netConn net.Conn
netConn, err = NetDial(network, addr)
require.NoError(t, err)
c, err = cli.Enroll(netConn)
c, err = cli.EnrollContext(netConn, handler)
} else {
c, err = cli.Dial(network, addr)
c, err = cli.DialContext(network, addr, handler)
}
require.NoError(t, err)
defer c.Close()
err = c.Wake(nil)
require.NoError(t, err)
var rspCh chan []byte
if network == "udp" {
rspCh = make(chan []byte, 1)
ev.rspChMap.Store(c.LocalAddr().String(), rspCh)
} else {
var (
v interface{}
ok bool
)
start := time.Now()
for time.Since(start) < time.Second {
v, ok = ev.rspChMap.Load(c.LocalAddr().String())
if ok {
break
}
time.Sleep(10 * time.Millisecond)
}
require.True(t, ok)
rspCh = v.(chan []byte)
}
rspCh := handler.rspCh
duration := time.Duration((rand.Float64()*2+1)*float64(time.Second)) / 2
t.Logf("test duration: %dms", duration/time.Millisecond)
start := time.Now()
for time.Since(start) < duration {
reqData := make([]byte, streamLen)
if network == "udp" {
reqData = reqData[:1024]
reqData = reqData[:datagramLen]
}
_, err = rand.Read(reqData)
require.NoError(t, err)
Expand Down
27 changes: 23 additions & 4 deletions client_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (

"github.com/panjf2000/gnet/v2/internal/math"
"github.com/panjf2000/gnet/v2/internal/netpoll"
"github.com/panjf2000/gnet/v2/internal/queue"
"github.com/panjf2000/gnet/v2/internal/socket"
"github.com/panjf2000/gnet/v2/pkg/buffer/ring"
errorx "github.com/panjf2000/gnet/v2/pkg/errors"
Expand Down Expand Up @@ -126,7 +127,7 @@ func (cli *Client) Start() error {

// Stop stops the client event-loop.
func (cli *Client) Stop() (err error) {
logging.Error(cli.el.poller.UrgentTrigger(func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil))
logging.Error(cli.el.poller.Trigger(queue.HighPriority, func(_ interface{}) error { return errorx.ErrEngineShutdown }, nil))
// Stop the ticker.
if cli.opts.Ticker {
cli.el.engine.ticker.cancel()
Expand All @@ -140,15 +141,25 @@ func (cli *Client) Stop() (err error) {

// Dial is like net.Dial().
func (cli *Client) Dial(network, address string) (Conn, error) {
return cli.DialContext(network, address, nil)
}

// DialContext is like Dial but also accepts an empty interface ctx that can be obtained later via Conn.Context.
func (cli *Client) DialContext(network, address string, ctx interface{}) (Conn, error) {
c, err := net.Dial(network, address)
if err != nil {
return nil, err
}
return cli.Enroll(c)
return cli.EnrollContext(c, ctx)
}

// Enroll converts a net.Conn to gnet.Conn and then adds it into Client.
func (cli *Client) Enroll(c net.Conn) (Conn, error) {
return cli.EnrollContext(c, nil)
}

// EnrollContext is like Enroll but also accepts an empty interface ctx that can be obtained later via Conn.Context.
func (cli *Client) EnrollContext(c net.Conn, ctx interface{}) (Conn, error) {
defer c.Close()

sc, ok := c.(syscall.Conn)
Expand Down Expand Up @@ -184,7 +195,7 @@ func (cli *Client) Enroll(c net.Conn) (Conn, error) {

var (
sockAddr unix.Sockaddr
gc Conn
gc *conn
)
switch c.(type) {
case *net.UnixConn:
Expand Down Expand Up @@ -217,10 +228,18 @@ func (cli *Client) Enroll(c net.Conn) (Conn, error) {
default:
return nil, errorx.ErrUnsupportedProtocol
}
err = cli.el.poller.UrgentTrigger(cli.el.register, gc)
gc.ctx = ctx

connOpened := make(chan struct{})
ccb := &connWithCallback{c: gc, cb: func() {
close(connOpened)
}}
err = cli.el.poller.Trigger(queue.HighPriority, cli.el.register, ccb)
if err != nil {
gc.Close()
return nil, err
}

<-connOpened
return gc, nil
}
Loading

0 comments on commit 4fee147

Please sign in to comment.