Skip to content

Commit

Permalink
opt: improve comments on Conn and test cases (#471)
Browse files Browse the repository at this point in the history
Fixes #470
  • Loading branch information
panjf2000 authored May 24, 2023
1 parent f80734a commit f6de680
Show file tree
Hide file tree
Showing 14 changed files with 169 additions and 95 deletions.
19 changes: 18 additions & 1 deletion client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,28 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

gerr "github.com/panjf2000/gnet/v2/pkg/errors"
"github.com/panjf2000/gnet/v2/pkg/logging"
bbPool "github.com/panjf2000/gnet/v2/pkg/pool/bytebuffer"
goPool "github.com/panjf2000/gnet/v2/pkg/pool/goroutine"
)

type clientEvents struct {
*BuiltinEventEngine
tester *testing.T
svr *testClientServer
packetLen int
rspChMap sync.Map
}

func (ev *clientEvents) OnBoot(e Engine) Action {
fd, err := e.Dup()
require.ErrorIsf(ev.tester, err, gerr.ErrEmptyEngine, "expected error: %v, but got: %v",
gerr.ErrUnsupportedOp, err)
assert.EqualValuesf(ev.tester, fd, -1, "expected -1, but got: %d", fd)
return None
}

func (ev *clientEvents) OnOpen(c Conn) ([]byte, Action) {
c.SetContext([]byte{})
rspCh := make(chan []byte, 1)
Expand Down Expand Up @@ -68,6 +78,13 @@ func (ev *clientEvents) OnTick() (delay time.Duration, action Action) {
return
}

func (ev *clientEvents) OnShutdown(e Engine) {
fd, err := e.Dup()
require.ErrorIsf(ev.tester, err, gerr.ErrEmptyEngine, "expected error: %v, but got: %v",
gerr.ErrUnsupportedOp, err)
assert.EqualValuesf(ev.tester, fd, -1, "expected -1, but got: %d", fd)
}

func TestServeWithGnetClient(t *testing.T) {
// start an engine
// connect 10 clients
Expand Down Expand Up @@ -279,7 +296,7 @@ func testServeWithGnetClient(t *testing.T, network, addr string, reuseport, reus
workerPool: goPool.Default(),
}
var err error
ts.clientEV = &clientEvents{packetLen: streamLen, svr: ts}
ts.clientEV = &clientEvents{tester: t, packetLen: streamLen, svr: ts}
ts.client, err = NewClient(
ts.clientEV,
WithLogLevel(logging.DebugLevel),
Expand Down
6 changes: 4 additions & 2 deletions client_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import (
"errors"
"net"
"strconv"
"sync"
"syscall"

"golang.org/x/sync/errgroup"
Expand Down Expand Up @@ -66,14 +67,15 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {

shutdownCtx, shutdown := context.WithCancel(context.Background())
eng := engine{
ln: &listener{network: "udp"},
ln: &listener{},
opts: options,
eventHandler: eh,
workerPool: struct {
*errgroup.Group
shutdownCtx context.Context
shutdown context.CancelFunc
}{&errgroup.Group{}, shutdownCtx, shutdown},
once sync.Once
}{&errgroup.Group{}, shutdownCtx, shutdown, sync.Once{}},
}
if options.Ticker {
eng.ticker.ctx, eng.ticker.cancel = context.WithCancel(context.Background())
Expand Down
3 changes: 2 additions & 1 deletion client_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
*errgroup.Group
shutdownCtx context.Context
shutdown context.CancelFunc
}{&errgroup.Group{}, shutdownCtx, shutdown},
once sync.Once
}{&errgroup.Group{}, shutdownCtx, shutdown, sync.Once{}},
eventHandler: eh,
}
cli.el = &eventloop{
Expand Down
28 changes: 12 additions & 16 deletions connection_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -246,8 +246,6 @@ func (c *conn) resetBuffer() {
c.inboundBuffer.Reset()
}

// ================================== Non-concurrency-safe API's ==================================

func (c *conn) Read(p []byte) (n int, err error) {
if c.inboundBuffer.IsEmpty() {
n = copy(p, c.buffer)
Expand Down Expand Up @@ -395,18 +393,6 @@ func (c *conn) OutboundBuffered() int {
return c.outboundBuffer.Buffered()
}

func (*conn) SetDeadline(_ time.Time) error {
return gerrors.ErrUnsupportedOp
}

func (*conn) SetReadDeadline(_ time.Time) error {
return gerrors.ErrUnsupportedOp
}

func (*conn) SetWriteDeadline(_ time.Time) error {
return gerrors.ErrUnsupportedOp
}

func (c *conn) Context() interface{} { return c.ctx }
func (c *conn) SetContext(ctx interface{}) { c.ctx = ctx }
func (c *conn) LocalAddr() net.Addr { return c.localAddr }
Expand Down Expand Up @@ -434,8 +420,6 @@ func (c *conn) SetKeepAlivePeriod(d time.Duration) error {
return socket.SetKeepAlivePeriod(c.fd, int(d.Seconds()))
}

// ==================================== Concurrency-safe API's ====================================

func (c *conn) AsyncWrite(buf []byte, callback AsyncCallback) error {
if c.isDatagram {
defer func() {
Expand Down Expand Up @@ -481,3 +465,15 @@ func (c *conn) Close() error {
return
}, nil)
}

func (*conn) SetDeadline(_ time.Time) error {
return gerrors.ErrUnsupportedOp
}

func (*conn) SetReadDeadline(_ time.Time) error {
return gerrors.ErrUnsupportedOp
}

func (*conn) SetWriteDeadline(_ time.Time) error {
return gerrors.ErrUnsupportedOp
}
27 changes: 12 additions & 15 deletions connection_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,8 +114,6 @@ func (c *conn) resetBuffer() {
c.inboundBuffer.Reset()
}

// ================================== Non-concurrency-safe API's ==================================

func (c *conn) Read(p []byte) (n int, err error) {
if c.inboundBuffer.IsEmpty() {
n = copy(p, c.buffer.B)
Expand Down Expand Up @@ -264,17 +262,6 @@ func (c *conn) OutboundBuffered() int {
return 0
}

func (*conn) SetDeadline(_ time.Time) error {
return errorx.ErrUnsupportedOp
}

func (*conn) SetReadDeadline(_ time.Time) error {
return errorx.ErrUnsupportedOp
}

func (*conn) SetWriteDeadline(_ time.Time) error {
return errorx.ErrUnsupportedOp
}
func (c *conn) Context() interface{} { return c.ctx }
func (c *conn) SetContext(ctx interface{}) { c.ctx = ctx }
func (c *conn) LocalAddr() net.Addr { return c.localAddr }
Expand Down Expand Up @@ -412,8 +399,6 @@ func (c *conn) SetKeepAlivePeriod(d time.Duration) error {
// this method is only implemented for compatibility, don't use it on Windows.
// func (c *conn) Gfd() gfd.GFD { return gfd.GFD{} }

// ==================================== Concurrency-safe API's ====================================

func (c *conn) AsyncWrite(buf []byte, cb AsyncCallback) error {
if cb == nil {
cb = func(c Conn, err error) error { return nil }
Expand Down Expand Up @@ -482,3 +467,15 @@ func (c *conn) CloseWithCallback(cb AsyncCallback) error {
}
return nil
}

func (*conn) SetDeadline(_ time.Time) error {
return errorx.ErrUnsupportedOp
}

func (*conn) SetReadDeadline(_ time.Time) error {
return errorx.ErrUnsupportedOp
}

func (*conn) SetWriteDeadline(_ time.Time) error {
return errorx.ErrUnsupportedOp
}
19 changes: 12 additions & 7 deletions engine_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ package gnet
import (
"context"
"runtime"
"sync"
"sync/atomic"

"golang.org/x/sync/errgroup"
Expand All @@ -44,6 +45,7 @@ type engine struct {

shutdownCtx context.Context
shutdown context.CancelFunc
once sync.Once
}
eventHandler EventHandler // user eventHandler
}
Expand All @@ -58,7 +60,9 @@ func (eng *engine) shutdown(err error) {
eng.opts.Logger.Errorf("engine is being shutdown with error: %v", err)
}

eng.workerPool.shutdown()
eng.workerPool.once.Do(func() {
eng.workerPool.shutdown()
})
}

func (eng *engine) startEventLoops() {
Expand All @@ -70,6 +74,7 @@ func (eng *engine) startEventLoops() {

func (eng *engine) closeEventLoops() {
eng.lb.iterate(func(i int, el *eventloop) bool {
el.ln.close()
_ = el.poller.Close()
return true
})
Expand All @@ -85,7 +90,6 @@ func (eng *engine) startSubReactors() {
func (eng *engine) activateEventLoops(numEventLoop int) (err error) {
network, address := eng.ln.network, eng.ln.address
ln := eng.ln
eng.ln = nil
var striker *eventloop
// Create loops locally and bind the listeners.
for i := 0; i < numEventLoop; i++ {
Expand Down Expand Up @@ -190,17 +194,15 @@ func (eng *engine) stop(s Engine) {

eng.eventHandler.OnShutdown(s)

// Notify all loops to close by closing all listeners
// Notify all event-loops to exit.
eng.lb.iterate(func(i int, el *eventloop) bool {
err := el.poller.UrgentTrigger(func(_ interface{}) error { return errors.ErrEngineShutdown }, nil)
if err != nil {
eng.opts.Logger.Errorf("failed to call UrgentTrigger on sub event-loop when stopping engine: %v", err)
}
return true
})

if eng.mainLoop != nil {
eng.ln.close()
err := eng.mainLoop.poller.UrgentTrigger(func(_ interface{}) error { return errors.ErrEngineShutdown }, nil)
if err != nil {
eng.opts.Logger.Errorf("failed to call UrgentTrigger on main event-loop when stopping engine: %v", err)
Expand All @@ -216,15 +218,17 @@ func (eng *engine) stop(s Engine) {
eng.opts.Logger.Errorf("engine shutdown error: %v", err)
}

// Close all listeners and pollers of event-loops.
eng.closeEventLoops()

if eng.mainLoop != nil {
eng.ln.close()
err := eng.mainLoop.poller.Close()
if err != nil {
eng.opts.Logger.Errorf("failed to close poller when stopping engine: %v", err)
}
}

// Put the engine into the shutdown state.
atomic.StoreInt32(&eng.inShutdown, 1)
}

Expand All @@ -249,7 +253,8 @@ func run(eventHandler EventHandler, listener *listener, options *Options, protoA
*errgroup.Group
shutdownCtx context.Context
shutdown context.CancelFunc
}{&errgroup.Group{}, shutdownCtx, shutdown},
once sync.Once
}{&errgroup.Group{}, shutdownCtx, shutdown, sync.Once{}},
eventHandler: eventHandler,
}
switch options.LB {
Expand Down
5 changes: 4 additions & 1 deletion engine_windows.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package gnet
import (
"context"
"runtime"
"sync"
"sync/atomic"

"golang.org/x/sync/errgroup"
Expand All @@ -38,6 +39,7 @@ type engine struct {

shutdownCtx context.Context
shutdown context.CancelFunc
once sync.Once
}
eventHandler EventHandler // user eventHandler
}
Expand Down Expand Up @@ -123,7 +125,8 @@ func run(eventHandler EventHandler, listener *listener, options *Options, protoA
*errgroup.Group
shutdownCtx context.Context
shutdown context.CancelFunc
}{&errgroup.Group{}, shutdownCtx, shutdown},
once sync.Once
}{&errgroup.Group{}, shutdownCtx, shutdown, sync.Once{}},
}

switch options.LB {
Expand Down
Loading

0 comments on commit f6de680

Please sign in to comment.