Skip to content

Commit 27baf9a

Browse files
authored
Merge pull request #710 from panjf2000/dev
minor: v2.9.0
2 parents 629c05e + 0b2053a commit 27baf9a

25 files changed

+490
-227
lines changed

.github/workflows/test.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ jobs:
4949
- name: Setup and run golangci-lint
5050
uses: golangci/golangci-lint-action@v7
5151
with:
52-
version: v2.1.5
52+
version: v2.1.6
5353
args: -v -E gocritic -E misspell -E revive -E godot --timeout 5m
5454
test:
5555
needs: lint

.github/workflows/test_gc_opt.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ jobs:
4949
- name: Setup and run golangci-lint
5050
uses: golangci/golangci-lint-action@v7
5151
with:
52-
version: v2.1.5
52+
version: v2.1.6
5353
args: -v -E gocritic -E misspell -E revive -E godot --timeout 5m
5454
test:
5555
needs: lint

.github/workflows/test_poll_opt.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ jobs:
4848
- name: Setup and run golangci-lint
4949
uses: golangci/golangci-lint-action@v7
5050
with:
51-
version: v2.1.5
51+
version: v2.1.6
5252
args: -v -E gocritic -E misspell -E revive -E godot
5353
test:
5454
needs: lint

.github/workflows/test_poll_opt_gc_opt.yml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ jobs:
4848
- name: Setup and run golangci-lint
4949
uses: golangci/golangci-lint-action@v7
5050
with:
51-
version: v2.1.5
51+
version: v2.1.6
5252
args: -v -E gocritic -E misspell -E revive -E godot
5353
test:
5454
needs: lint

acceptor_unix.go

Lines changed: 32 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,7 @@
1717
package gnet
1818

1919
import (
20-
"time"
20+
"runtime"
2121

2222
"golang.org/x/sys/unix"
2323

@@ -45,15 +45,26 @@ func (el *eventloop) accept0(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error
4545
}
4646

4747
remoteAddr := socket.SockaddrToTCPOrUnixAddr(sa)
48-
if el.engine.opts.TCPKeepAlive > 0 && el.listeners[fd].network == "tcp" {
49-
err = socket.SetKeepAlivePeriod(nfd, int(el.engine.opts.TCPKeepAlive.Seconds()))
50-
if err != nil {
48+
network := el.listeners[fd].network
49+
if opts := el.engine.opts; opts.TCPKeepAlive > 0 && network == "tcp" &&
50+
(runtime.GOOS != "linux" && runtime.GOOS != "freebsd" && runtime.GOOS != "dragonfly") {
51+
// TCP keepalive options are not inherited from the listening socket
52+
// on platforms other than Linux, FreeBSD, or DragonFlyBSD.
53+
// We therefore need to set them on the accepted socket explicitly.
54+
//
55+
// Check out https://github.com/nginx/nginx/pull/337 for details.
56+
if err = setKeepAlive(
57+
nfd,
58+
true,
59+
opts.TCPKeepAlive,
60+
opts.TCPKeepInterval,
61+
opts.TCPKeepCount); err != nil {
5162
el.getLogger().Errorf("failed to set TCP keepalive on fd=%d: %v", fd, err)
5263
}
5364
}
5465

5566
el := el.engine.eventLoops.next(remoteAddr)
56-
c := newTCPConn(nfd, el, sa, el.listeners[fd].addr, remoteAddr)
67+
c := newStreamConn(network, nfd, el, sa, el.listeners[fd].addr, remoteAddr)
5768
err = el.poller.Trigger(queue.HighPriority, el.register, c)
5869
if err != nil {
5970
el.getLogger().Errorf("failed to enqueue the accepted socket fd=%d to poller: %v", c.fd, err)
@@ -64,7 +75,8 @@ func (el *eventloop) accept0(fd int, _ netpoll.IOEvent, _ netpoll.IOFlags) error
6475
}
6576

6677
func (el *eventloop) accept(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) error {
67-
if el.listeners[fd].network == "udp" {
78+
network := el.listeners[fd].network
79+
if network == "udp" {
6880
return el.readUDP(fd, ev, flags)
6981
}
7082

@@ -82,13 +94,23 @@ func (el *eventloop) accept(fd int, ev netpoll.IOEvent, flags netpoll.IOFlags) e
8294
}
8395

8496
remoteAddr := socket.SockaddrToTCPOrUnixAddr(sa)
85-
if el.engine.opts.TCPKeepAlive > 0 && el.listeners[fd].network == "tcp" {
86-
err = socket.SetKeepAlivePeriod(nfd, int(el.engine.opts.TCPKeepAlive/time.Second))
87-
if err != nil {
97+
if opts := el.engine.opts; opts.TCPKeepAlive > 0 && el.listeners[fd].network == "tcp" &&
98+
(runtime.GOOS != "linux" && runtime.GOOS != "freebsd" && runtime.GOOS != "dragonfly") {
99+
// TCP keepalive options are not inherited from the listening socket
100+
// on platforms other than Linux, FreeBSD, or DragonFlyBSD.
101+
// We therefore need to set them on the accepted socket explicitly.
102+
//
103+
// Check out https://github.com/nginx/nginx/pull/337 for details.
104+
if err = setKeepAlive(
105+
nfd,
106+
true,
107+
opts.TCPKeepAlive,
108+
opts.TCPKeepInterval,
109+
opts.TCPKeepCount); err != nil {
88110
el.getLogger().Errorf("failed to set TCP keepalive on fd=%d: %v", fd, err)
89111
}
90112
}
91113

92-
c := newTCPConn(nfd, el, sa, el.listeners[fd].addr, remoteAddr)
114+
c := newStreamConn(network, nfd, el, sa, el.listeners[fd].addr, remoteAddr)
93115
return el.register0(c)
94116
}

acceptor_windows.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -44,7 +44,7 @@ func (eng *engine) listenStream(ln net.Listener) (err error) {
4444
return
4545
}
4646
el := eng.eventLoops.next(tc.RemoteAddr())
47-
c := newTCPConn(el, tc, nil)
47+
c := newStreamConn(el, tc, nil)
4848
el.ch <- &openConn{c: c}
4949
goroutine.DefaultWorkerPool.Submit(func() {
5050
var buffer [0x10000]byte

client_test.go

Lines changed: 10 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,8 @@ type connHandler struct {
3131

3232
type clientEvents struct {
3333
*BuiltinEventEngine
34-
tester *testing.T
35-
svr *testClient
36-
packetLen int
34+
tester *testing.T
35+
svr *testClient
3736
}
3837

3938
func (ev *clientEvents) OnBoot(e Engine) Action {
@@ -67,13 +66,14 @@ func (ev *clientEvents) OnClose(Conn, error) Action {
6766

6867
func (ev *clientEvents) OnTraffic(c Conn) (action Action) {
6968
handler := c.Context().(*connHandler)
69+
packetLen := streamLen
7070
if handler.network == "udp" {
71-
ev.packetLen = datagramLen
71+
packetLen = datagramLen
7272
}
7373
buf, err := c.Next(-1)
7474
assert.NoError(ev.tester, err)
7575
handler.data = append(handler.data, buf...)
76-
if len(handler.data) < ev.packetLen {
76+
if len(handler.data) < packetLen {
7777
return
7878
}
7979
handler.rspCh <- handler.data
@@ -501,9 +501,10 @@ func runClient(t *testing.T, network, addr string, conf *testConf) {
501501
nclients: conf.clients,
502502
}
503503
var err error
504-
clientEV := &clientEvents{tester: t, packetLen: streamLen, svr: ts}
504+
clientEV := &clientEvents{tester: t, svr: ts}
505505
ts.client, err = NewClient(
506506
clientEV,
507+
WithMulticore(conf.multicore),
507508
WithEdgeTriggeredIO(conf.et),
508509
WithEdgeTriggeredIOChunk(conf.etChunk),
509510
WithTCPNoDelay(TCPNoDelay),
@@ -524,7 +525,9 @@ func runClient(t *testing.T, network, addr string, conf *testConf) {
524525
WithMulticore(conf.multicore),
525526
WithReusePort(conf.reuseport),
526527
WithTicker(true),
527-
WithTCPKeepAlive(time.Minute*1),
528+
WithTCPKeepAlive(time.Minute),
529+
WithTCPKeepInterval(time.Second*10),
530+
WithTCPKeepCount(10),
528531
WithLoadBalancing(conf.lb))
529532
assert.NoError(t, err)
530533
}

client_unix.go

Lines changed: 76 additions & 36 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ import (
3838
// Client of gnet.
3939
type Client struct {
4040
opts *Options
41-
el *eventloop
41+
eng *engine
4242
}
4343

4444
// NewClient creates an instance of Client.
@@ -59,28 +59,19 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
5959
}
6060
logging.SetDefaultLoggerAndFlusher(logger, logFlusher)
6161

62-
var p *netpoll.Poller
63-
if p, err = netpoll.OpenPoller(); err != nil {
64-
return
65-
}
66-
6762
rootCtx, shutdown := context.WithCancel(context.Background())
6863
eg, ctx := errgroup.WithContext(rootCtx)
6964
eng := engine{
7065
listeners: make(map[int]*listener),
7166
opts: options,
7267
turnOff: shutdown,
7368
eventHandler: eh,
69+
eventLoops: new(leastConnectionsLoadBalancer),
7470
concurrency: struct {
7571
*errgroup.Group
7672
ctx context.Context
7773
}{eg, ctx},
7874
}
79-
el := eventloop{
80-
listeners: eng.listeners,
81-
engine: &eng,
82-
poller: p,
83-
}
8475

8576
if options.EdgeTriggeredIOChunk > 0 {
8677
options.EdgeTriggeredIO = true
@@ -107,39 +98,82 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
10798
default:
10899
options.WriteBufferCap = math.CeilToPowerOfTwo(wbc)
109100
}
110-
111-
el.buffer = make([]byte, options.ReadBufferCap)
112-
el.connections.init()
113-
el.eventHandler = eh
114-
cli.el = &el
101+
cli.eng = &eng
115102
return
116103
}
117104

118105
// Start starts the client event-loop, handing IO events.
119106
func (cli *Client) Start() error {
120-
logging.Infof("Starting gnet client with 1 event-loop")
121-
cli.el.eventHandler.OnBoot(Engine{cli.el.engine})
122-
cli.el.engine.concurrency.Go(cli.el.run)
107+
numEventLoop := determineEventLoops(cli.opts)
108+
logging.Infof("Starting gnet client with %d event loops", numEventLoop)
109+
110+
cli.eng.eventHandler.OnBoot(Engine{cli.eng})
111+
112+
var el0 *eventloop
113+
for i := 0; i < numEventLoop; i++ {
114+
p, err := netpoll.OpenPoller()
115+
if err != nil {
116+
cli.eng.closeEventLoops()
117+
return err
118+
}
119+
el := eventloop{
120+
listeners: cli.eng.listeners,
121+
engine: cli.eng,
122+
poller: p,
123+
buffer: make([]byte, cli.opts.ReadBufferCap),
124+
eventHandler: cli.eng.eventHandler,
125+
}
126+
el.connections.init()
127+
cli.eng.eventLoops.register(&el)
128+
if cli.opts.Ticker && el.idx == 0 {
129+
el0 = &el
130+
}
131+
}
132+
133+
cli.eng.eventLoops.iterate(func(_ int, el *eventloop) bool {
134+
cli.eng.concurrency.Go(el.run)
135+
return true
136+
})
137+
123138
// Start the ticker.
124-
if cli.opts.Ticker {
125-
ctx := cli.el.engine.concurrency.ctx
126-
cli.el.engine.concurrency.Go(func() error {
127-
cli.el.ticker(ctx)
139+
if el0 != nil {
140+
ctx := cli.eng.concurrency.ctx
141+
cli.eng.concurrency.Go(func() error {
142+
el0.ticker(ctx)
128143
return nil
129144
})
130145
}
146+
131147
logging.Debugf("default logging level is %s", logging.LogLevel())
148+
132149
return nil
133150
}
134151

135152
// Stop stops the client event-loop.
136-
func (cli *Client) Stop() (err error) {
137-
logging.Error(cli.el.poller.Trigger(queue.HighPriority, func(_ any) error { return errorx.ErrEngineShutdown }, nil))
138-
err = cli.el.engine.concurrency.Wait()
139-
logging.Error(cli.el.poller.Close())
140-
cli.el.eventHandler.OnShutdown(Engine{cli.el.engine})
153+
func (cli *Client) Stop() error {
154+
cli.eng.shutdown(nil)
155+
156+
cli.eng.eventHandler.OnShutdown(Engine{cli.eng})
157+
158+
// Notify all event-loops to exit.
159+
cli.eng.eventLoops.iterate(func(_ int, el *eventloop) bool {
160+
logging.Error(el.poller.Trigger(queue.HighPriority,
161+
func(_ any) error { return errorx.ErrEngineShutdown }, nil))
162+
return true
163+
})
164+
165+
// Wait for all event-loops to exit.
166+
err := cli.eng.concurrency.Wait()
167+
168+
cli.eng.closeEventLoops()
169+
170+
// Put the engine into the shutdown state.
171+
cli.eng.inShutdown.Store(true)
172+
173+
// Flush the logger.
141174
logging.Cleanup()
142-
return
175+
176+
return err
143177
}
144178

145179
// Dial is like net.Dial().
@@ -156,7 +190,7 @@ func (cli *Client) DialContext(network, address string, ctx any) (Conn, error) {
156190
return cli.EnrollContext(c, ctx)
157191
}
158192

159-
// Enroll converts a net.Conn to gnet.Conn and then adds it into Client.
193+
// Enroll converts a net.Conn to gnet.Conn and then adds it into the Client.
160194
func (cli *Client) Enroll(c net.Conn) (Conn, error) {
161195
return cli.EnrollContext(c, nil)
162196
}
@@ -196,6 +230,7 @@ func (cli *Client) EnrollContext(c net.Conn, ctx any) (Conn, error) {
196230
}
197231
}
198232

233+
el := cli.eng.eventLoops.next(nil)
199234
var (
200235
sockAddr unix.Sockaddr
201236
gc *conn
@@ -208,29 +243,34 @@ func (cli *Client) EnrollContext(c net.Conn, ctx any) (Conn, error) {
208243
}
209244
ua := c.LocalAddr().(*net.UnixAddr)
210245
ua.Name = c.RemoteAddr().String() + "." + strconv.Itoa(dupFD)
211-
gc = newTCPConn(dupFD, cli.el, sockAddr, c.LocalAddr(), c.RemoteAddr())
246+
gc = newStreamConn("unix", dupFD, el, sockAddr, c.LocalAddr(), c.RemoteAddr())
212247
case *net.TCPConn:
213248
if cli.opts.TCPNoDelay == TCPNoDelay {
214249
if err = socket.SetNoDelay(dupFD, 1); err != nil {
215250
return nil, err
216251
}
217252
}
218253
if cli.opts.TCPKeepAlive > 0 {
219-
if err = socket.SetKeepAlivePeriod(dupFD, int(cli.opts.TCPKeepAlive.Seconds())); err != nil {
254+
if err = setKeepAlive(
255+
dupFD,
256+
true,
257+
cli.opts.TCPKeepAlive,
258+
cli.opts.TCPKeepInterval,
259+
cli.opts.TCPKeepCount); err != nil {
220260
return nil, err
221261
}
222262
}
223263
sockAddr, _, _, _, err = socket.GetTCPSockAddr(c.RemoteAddr().Network(), c.RemoteAddr().String())
224264
if err != nil {
225265
return nil, err
226266
}
227-
gc = newTCPConn(dupFD, cli.el, sockAddr, c.LocalAddr(), c.RemoteAddr())
267+
gc = newStreamConn("tcp", dupFD, el, sockAddr, c.LocalAddr(), c.RemoteAddr())
228268
case *net.UDPConn:
229269
sockAddr, _, _, _, err = socket.GetUDPSockAddr(c.RemoteAddr().Network(), c.RemoteAddr().String())
230270
if err != nil {
231271
return nil, err
232272
}
233-
gc = newUDPConn(dupFD, cli.el, c.LocalAddr(), sockAddr, true)
273+
gc = newUDPConn(dupFD, el, c.LocalAddr(), sockAddr, true)
234274
default:
235275
return nil, errorx.ErrUnsupportedProtocol
236276
}
@@ -240,12 +280,12 @@ func (cli *Client) EnrollContext(c net.Conn, ctx any) (Conn, error) {
240280
ccb := &connWithCallback{c: gc, cb: func() {
241281
close(connOpened)
242282
}}
243-
err = cli.el.poller.Trigger(queue.HighPriority, cli.el.register, ccb)
283+
err = el.poller.Trigger(queue.HighPriority, el.register, ccb)
244284
if err != nil {
245285
gc.Close() //nolint:errcheck
246286
return nil, err
247287
}
248-
249288
<-connOpened
289+
250290
return gc, nil
251291
}

0 commit comments

Comments
 (0)