Skip to content

Commit 0b2053a

Browse files
authored
feat: support running client with multiple event loops (#709)
Fixes #697
1 parent 902da4e commit 0b2053a

File tree

7 files changed

+202
-145
lines changed

7 files changed

+202
-145
lines changed

client_test.go

Lines changed: 7 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -31,9 +31,8 @@ type connHandler struct {
3131

3232
type clientEvents struct {
3333
*BuiltinEventEngine
34-
tester *testing.T
35-
svr *testClient
36-
packetLen int
34+
tester *testing.T
35+
svr *testClient
3736
}
3837

3938
func (ev *clientEvents) OnBoot(e Engine) Action {
@@ -67,13 +66,14 @@ func (ev *clientEvents) OnClose(Conn, error) Action {
6766

6867
func (ev *clientEvents) OnTraffic(c Conn) (action Action) {
6968
handler := c.Context().(*connHandler)
69+
packetLen := streamLen
7070
if handler.network == "udp" {
71-
ev.packetLen = datagramLen
71+
packetLen = datagramLen
7272
}
7373
buf, err := c.Next(-1)
7474
assert.NoError(ev.tester, err)
7575
handler.data = append(handler.data, buf...)
76-
if len(handler.data) < ev.packetLen {
76+
if len(handler.data) < packetLen {
7777
return
7878
}
7979
handler.rspCh <- handler.data
@@ -501,9 +501,10 @@ func runClient(t *testing.T, network, addr string, conf *testConf) {
501501
nclients: conf.clients,
502502
}
503503
var err error
504-
clientEV := &clientEvents{tester: t, packetLen: streamLen, svr: ts}
504+
clientEV := &clientEvents{tester: t, svr: ts}
505505
ts.client, err = NewClient(
506506
clientEV,
507+
WithMulticore(conf.multicore),
507508
WithEdgeTriggeredIO(conf.et),
508509
WithEdgeTriggeredIOChunk(conf.etChunk),
509510
WithTCPNoDelay(TCPNoDelay),

client_unix.go

Lines changed: 70 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,7 @@ import (
3838
// Client of gnet.
3939
type Client struct {
4040
opts *Options
41-
el *eventloop
41+
eng *engine
4242
}
4343

4444
// NewClient creates an instance of Client.
@@ -59,28 +59,19 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
5959
}
6060
logging.SetDefaultLoggerAndFlusher(logger, logFlusher)
6161

62-
var p *netpoll.Poller
63-
if p, err = netpoll.OpenPoller(); err != nil {
64-
return
65-
}
66-
6762
rootCtx, shutdown := context.WithCancel(context.Background())
6863
eg, ctx := errgroup.WithContext(rootCtx)
6964
eng := engine{
7065
listeners: make(map[int]*listener),
7166
opts: options,
7267
turnOff: shutdown,
7368
eventHandler: eh,
69+
eventLoops: new(leastConnectionsLoadBalancer),
7470
concurrency: struct {
7571
*errgroup.Group
7672
ctx context.Context
7773
}{eg, ctx},
7874
}
79-
el := eventloop{
80-
listeners: eng.listeners,
81-
engine: &eng,
82-
poller: p,
83-
}
8475

8576
if options.EdgeTriggeredIOChunk > 0 {
8677
options.EdgeTriggeredIO = true
@@ -107,39 +98,82 @@ func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
10798
default:
10899
options.WriteBufferCap = math.CeilToPowerOfTwo(wbc)
109100
}
110-
111-
el.buffer = make([]byte, options.ReadBufferCap)
112-
el.connections.init()
113-
el.eventHandler = eh
114-
cli.el = &el
101+
cli.eng = &eng
115102
return
116103
}
117104

118105
// Start starts the client event-loop, handing IO events.
119106
func (cli *Client) Start() error {
120-
logging.Infof("Starting gnet client with 1 event-loop")
121-
cli.el.eventHandler.OnBoot(Engine{cli.el.engine})
122-
cli.el.engine.concurrency.Go(cli.el.run)
107+
numEventLoop := determineEventLoops(cli.opts)
108+
logging.Infof("Starting gnet client with %d event loops", numEventLoop)
109+
110+
cli.eng.eventHandler.OnBoot(Engine{cli.eng})
111+
112+
var el0 *eventloop
113+
for i := 0; i < numEventLoop; i++ {
114+
p, err := netpoll.OpenPoller()
115+
if err != nil {
116+
cli.eng.closeEventLoops()
117+
return err
118+
}
119+
el := eventloop{
120+
listeners: cli.eng.listeners,
121+
engine: cli.eng,
122+
poller: p,
123+
buffer: make([]byte, cli.opts.ReadBufferCap),
124+
eventHandler: cli.eng.eventHandler,
125+
}
126+
el.connections.init()
127+
cli.eng.eventLoops.register(&el)
128+
if cli.opts.Ticker && el.idx == 0 {
129+
el0 = &el
130+
}
131+
}
132+
133+
cli.eng.eventLoops.iterate(func(_ int, el *eventloop) bool {
134+
cli.eng.concurrency.Go(el.run)
135+
return true
136+
})
137+
123138
// Start the ticker.
124-
if cli.opts.Ticker {
125-
ctx := cli.el.engine.concurrency.ctx
126-
cli.el.engine.concurrency.Go(func() error {
127-
cli.el.ticker(ctx)
139+
if el0 != nil {
140+
ctx := cli.eng.concurrency.ctx
141+
cli.eng.concurrency.Go(func() error {
142+
el0.ticker(ctx)
128143
return nil
129144
})
130145
}
146+
131147
logging.Debugf("default logging level is %s", logging.LogLevel())
148+
132149
return nil
133150
}
134151

135152
// Stop stops the client event-loop.
136-
func (cli *Client) Stop() (err error) {
137-
logging.Error(cli.el.poller.Trigger(queue.HighPriority, func(_ any) error { return errorx.ErrEngineShutdown }, nil))
138-
err = cli.el.engine.concurrency.Wait()
139-
logging.Error(cli.el.poller.Close())
140-
cli.el.eventHandler.OnShutdown(Engine{cli.el.engine})
153+
func (cli *Client) Stop() error {
154+
cli.eng.shutdown(nil)
155+
156+
cli.eng.eventHandler.OnShutdown(Engine{cli.eng})
157+
158+
// Notify all event-loops to exit.
159+
cli.eng.eventLoops.iterate(func(_ int, el *eventloop) bool {
160+
logging.Error(el.poller.Trigger(queue.HighPriority,
161+
func(_ any) error { return errorx.ErrEngineShutdown }, nil))
162+
return true
163+
})
164+
165+
// Wait for all event-loops to exit.
166+
err := cli.eng.concurrency.Wait()
167+
168+
cli.eng.closeEventLoops()
169+
170+
// Put the engine into the shutdown state.
171+
cli.eng.inShutdown.Store(true)
172+
173+
// Flush the logger.
141174
logging.Cleanup()
142-
return
175+
176+
return err
143177
}
144178

145179
// Dial is like net.Dial().
@@ -156,7 +190,7 @@ func (cli *Client) DialContext(network, address string, ctx any) (Conn, error) {
156190
return cli.EnrollContext(c, ctx)
157191
}
158192

159-
// Enroll converts a net.Conn to gnet.Conn and then adds it into Client.
193+
// Enroll converts a net.Conn to gnet.Conn and then adds it into the Client.
160194
func (cli *Client) Enroll(c net.Conn) (Conn, error) {
161195
return cli.EnrollContext(c, nil)
162196
}
@@ -196,6 +230,7 @@ func (cli *Client) EnrollContext(c net.Conn, ctx any) (Conn, error) {
196230
}
197231
}
198232

233+
el := cli.eng.eventLoops.next(nil)
199234
var (
200235
sockAddr unix.Sockaddr
201236
gc *conn
@@ -208,7 +243,7 @@ func (cli *Client) EnrollContext(c net.Conn, ctx any) (Conn, error) {
208243
}
209244
ua := c.LocalAddr().(*net.UnixAddr)
210245
ua.Name = c.RemoteAddr().String() + "." + strconv.Itoa(dupFD)
211-
gc = newStreamConn("unix", dupFD, cli.el, sockAddr, c.LocalAddr(), c.RemoteAddr())
246+
gc = newStreamConn("unix", dupFD, el, sockAddr, c.LocalAddr(), c.RemoteAddr())
212247
case *net.TCPConn:
213248
if cli.opts.TCPNoDelay == TCPNoDelay {
214249
if err = socket.SetNoDelay(dupFD, 1); err != nil {
@@ -229,13 +264,13 @@ func (cli *Client) EnrollContext(c net.Conn, ctx any) (Conn, error) {
229264
if err != nil {
230265
return nil, err
231266
}
232-
gc = newStreamConn("tcp", dupFD, cli.el, sockAddr, c.LocalAddr(), c.RemoteAddr())
267+
gc = newStreamConn("tcp", dupFD, el, sockAddr, c.LocalAddr(), c.RemoteAddr())
233268
case *net.UDPConn:
234269
sockAddr, _, _, _, err = socket.GetUDPSockAddr(c.RemoteAddr().Network(), c.RemoteAddr().String())
235270
if err != nil {
236271
return nil, err
237272
}
238-
gc = newUDPConn(dupFD, cli.el, c.LocalAddr(), sockAddr, true)
273+
gc = newUDPConn(dupFD, el, c.LocalAddr(), sockAddr, true)
239274
default:
240275
return nil, errorx.ErrUnsupportedProtocol
241276
}
@@ -245,12 +280,12 @@ func (cli *Client) EnrollContext(c net.Conn, ctx any) (Conn, error) {
245280
ccb := &connWithCallback{c: gc, cb: func() {
246281
close(connOpened)
247282
}}
248-
err = cli.el.poller.Trigger(queue.HighPriority, cli.el.register, ccb)
283+
err = el.poller.Trigger(queue.HighPriority, el.register, ccb)
249284
if err != nil {
250285
gc.Close() //nolint:errcheck
251286
return nil, err
252287
}
253-
254288
<-connOpened
289+
255290
return gc, nil
256291
}

0 commit comments

Comments
 (0)