Skip to content

Commit

Permalink
feat: implement gnet on Windows
Browse files Browse the repository at this point in the history
Fixes #339
  • Loading branch information
panjf2000 committed May 11, 2023
1 parent edb9318 commit 96f691f
Show file tree
Hide file tree
Showing 29 changed files with 1,754 additions and 377 deletions.
12 changes: 8 additions & 4 deletions .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ jobs:
os:
- ubuntu-latest
- macos-latest
#- windows-latest
name: Run golangci-lint
runs-on: ${{ matrix.os }}
steps:
Expand All @@ -42,14 +43,17 @@ jobs:
uses: golangci/golangci-lint-action@v3
with:
version: v1.51.2
args: -v -E gofumpt -E gocritic -E misspell -E revive -E godot
args: -v -E gofumpt -E gocritic -E misspell -E revive -E godot --timeout 5m
test:
needs: lint
strategy:
fail-fast: false
matrix:
go: [1.17, 1.18, 1.19]
os: [ubuntu-latest, macos-latest]
go: ['1.17', '1.18', '1.19', '1.20']
os:
- ubuntu-latest
- macos-latest
- windows-latest
name: Go ${{ matrix.go }} @ ${{ matrix.os }}
runs-on: ${{ matrix.os }}
steps:
Expand All @@ -61,7 +65,7 @@ jobs:
- name: Setup Go
uses: actions/setup-go@v3
with:
go-version: '^1.17'
go-version: ${{ matrix.go }}

- name: Print Go environment
id: go-env
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/test_poll_opt.yml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ jobs:
strategy:
fail-fast: false
matrix:
go: [1.17, 1.18, 1.19]
go: ['1.17', '1.18', '1.19', '1.20']
os: [ubuntu-latest, macos-latest]
name: Go ${{ matrix.go }} @ ${{ matrix.os }}
runs-on: ${{ matrix.os }}
Expand Down
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ English | [中文](README_ZH.md)
- [x] Supporting two event-driven mechanisms: `epoll` on **Linux** and `kqueue` on **FreeBSD/DragonFly/Darwin**
- [x] Flexible ticker event
- [x] Implementation of `gnet` Client
- [ ] **Windows** platform support ([gnet v1](https://github.com/panjf2000/gnet/tree/1.x) is available on Windows, v2 not yet)
- [x] **Windows** platform support (Not for production use, only for debugging and testing)
- [ ] **TLS** support
- [ ] [io_uring](https://kernel.dk/io_uring.pdf) support

Expand Down
2 changes: 1 addition & 1 deletion README_ZH.md
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
- [x] 支持两种事件驱动机制:**Linux** 里的 `epoll` 以及 **FreeBSD/DragonFly/Darwin** 里的 `kqueue`
- [x] 灵活的事件定时器
- [x] 实现 `gnet` 客户端
- [ ] 支持 **Windows** 平台 ([gnet v1](https://github.com/panjf2000/gnet/tree/1.x) 支持 Windows,v2 暂时不支持)
- [x] 支持 **Windows** 平台 (非生产环境使用,只用来调试和测试)
- [ ] 支持 **TLS**
- [ ] 支持 [io_uring](https://kernel.dk/io_uring.pdf)

Expand Down
File renamed without changes.
71 changes: 71 additions & 0 deletions acceptor_windows.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
/*
* Copyright (c) 2023 Andy Pan.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/

package gnet

import (
"net"
"runtime"
)

func (eng *engine) listen() (err error) {
if eng.opts.LockOSThread {
runtime.LockOSThread()
defer runtime.UnlockOSThread()
}

defer func() { eng.shutdown(err) }()

var buffer [0x10000]byte
for {
if eng.ln.pc != nil {
// Read data from UDP socket.
n, addr, e := eng.ln.pc.ReadFrom(buffer[:])
if e != nil {
err = e
eng.opts.Logger.Errorf("failed to receive data from UDP fd due to error:%v", err)
return
}

el := eng.lb.next(addr)
c := newUDPConn(el, eng.ln.addr, addr)
el.ch <- packUDPConn(c, buffer[:n])
} else {
// Accept TCP socket.
tc, e := eng.ln.ln.Accept()
if e != nil {
err = e
eng.opts.Logger.Errorf("Accept() fails due to error: %v", err)
return
}
el := eng.lb.next(tc.RemoteAddr())
c := newTCPConn(tc, el)
el.ch <- c
go func(c *conn, tc net.Conn, el *eventloop) {
var buffer [0x10000]byte
for {
n, err := tc.Read(buffer[:])
if err != nil {
el.ch <- &netErr{c, err}
return
}
el.ch <- packTCPConn(c, buffer[:n])
}
}(c, tc, el)
}
}
}
8 changes: 4 additions & 4 deletions client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@
// See the License for the specific language governing permissions and
// limitations under the License.

//go:build linux || freebsd || dragonfly || darwin
// +build linux freebsd dragonfly darwin
//go:build linux || freebsd || dragonfly || darwin || windows
// +build linux freebsd dragonfly darwin windows

package gnet

Expand Down Expand Up @@ -264,6 +264,7 @@ func (s *testClientServer) OnTraffic(c Conn) (action Action) {
}

func (s *testClientServer) OnTick() (delay time.Duration, action Action) {
delay = time.Second / 5
if atomic.CompareAndSwapInt32(&s.started, 0, 1) {
for i := 0; i < s.nclients; i++ {
atomic.AddInt32(&s.clientActive, 1)
Expand All @@ -278,7 +279,6 @@ func (s *testClientServer) OnTick() (delay time.Duration, action Action) {
action = Shutdown
return
}
delay = time.Second / 5
return
}

Expand Down Expand Up @@ -327,7 +327,7 @@ func startGnetClient(t *testing.T, cli *Client, ev *clientEvents, network, addr
)
if netDial {
var netConn net.Conn
netConn, err = net.Dial(network, addr)
netConn, err = NetDial(network, addr)
require.NoError(t, err)
c, err = cli.Enroll(netConn)
} else {
Expand Down
52 changes: 28 additions & 24 deletions client.go → client_unix.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@ import (
"errors"
"net"
"strconv"
"sync"
"syscall"

"golang.org/x/sync/errgroup"
"golang.org/x/sys/unix"

"github.com/panjf2000/gnet/v2/internal/math"
Expand All @@ -43,7 +43,7 @@ type Client struct {
}

// NewClient creates an instance of Client.
func NewClient(eventHandler EventHandler, opts ...Option) (cli *Client, err error) {
func NewClient(eh EventHandler, opts ...Option) (cli *Client, err error) {
options := loadOptions(opts...)
cli = new(Client)
cli.opts = options
Expand All @@ -62,18 +62,26 @@ func NewClient(eventHandler EventHandler, opts ...Option) (cli *Client, err erro
if p, err = netpoll.OpenPoller(); err != nil {
return
}
eng := new(engine)
eng.opts = options
eng.eventHandler = eventHandler
eng.ln = &listener{network: "udp"}
eng.cond = sync.NewCond(&sync.Mutex{})

shutdownCtx, shutdown := context.WithCancel(context.Background())
eng := engine{
ln: &listener{network: "udp"},
opts: options,
eventHandler: eh,
workerPool: struct {
*errgroup.Group
shutdownCtx context.Context
shutdown context.CancelFunc
}{&errgroup.Group{}, shutdownCtx, shutdown},
}
if options.Ticker {
eng.tickerCtx, eng.cancelTicker = context.WithCancel(context.Background())
eng.ticker.ctx, eng.ticker.cancel = context.WithCancel(context.Background())
}
el := eventloop{
ln: eng.ln,
engine: &eng,
poller: p,
}
el := new(eventloop)
el.ln = eng.ln
el.engine = eng
el.poller = p

rbc := options.ReadBufferCap
switch {
Expand All @@ -97,36 +105,32 @@ func NewClient(eventHandler EventHandler, opts ...Option) (cli *Client, err erro
el.buffer = make([]byte, options.ReadBufferCap)
el.udpSockets = make(map[int]*conn)
el.connections = make(map[int]*conn)
el.eventHandler = eventHandler
cli.el = el
el.eventHandler = eh
cli.el = &el
return
}

// Start starts the client event-loop, handing IO events.
func (cli *Client) Start() error {
cli.el.eventHandler.OnBoot(Engine{})
cli.el.engine.wg.Add(1)
go func() {
cli.el.run(cli.opts.LockOSThread)
cli.el.engine.wg.Done()
}()
cli.el.engine.workerPool.Go(cli.el.run)
// Start the ticker.
if cli.opts.Ticker {
go cli.el.ticker(cli.el.engine.tickerCtx)
go cli.el.ticker(cli.el.engine.ticker.ctx)
}
return nil
}

// Stop stops the client event-loop.
func (cli *Client) Stop() (err error) {
logging.Error(cli.el.poller.UrgentTrigger(func(_ interface{}) error { return gerrors.ErrEngineShutdown }, nil))
cli.el.engine.wg.Wait()
logging.Error(cli.el.poller.Close())
cli.el.eventHandler.OnShutdown(Engine{})
// Stop the ticker.
if cli.opts.Ticker {
cli.el.engine.cancelTicker()
cli.el.engine.ticker.cancel()
}
_ = cli.el.engine.workerPool.Wait()
logging.Error(cli.el.poller.Close())
cli.el.eventHandler.OnShutdown(Engine{})
if cli.logFlush != nil {
err = cli.logFlush()
}
Expand Down
Loading

0 comments on commit 96f691f

Please sign in to comment.