From b90f442fc6e1ff1d4e1c77e7eb59a4c2958453e2 Mon Sep 17 00:00:00 2001 From: xhylgogo <1329172872@qq.com> Date: Thu, 13 Jun 2024 17:00:40 +0800 Subject: [PATCH 1/2] feat(listener): added windows pipe connection support --- examples/windows_pipe/main.go | 57 ++++++++++++++++++ go.mod | 5 +- go.sum | 2 + listeners/pipe_windows.go | 102 +++++++++++++++++++++++++++++++++ listeners/pipe_windows_test.go | 98 +++++++++++++++++++++++++++++++ 5 files changed, 263 insertions(+), 1 deletion(-) create mode 100644 examples/windows_pipe/main.go create mode 100644 listeners/pipe_windows.go create mode 100644 listeners/pipe_windows_test.go diff --git a/examples/windows_pipe/main.go b/examples/windows_pipe/main.go new file mode 100644 index 00000000..01ae8a66 --- /dev/null +++ b/examples/windows_pipe/main.go @@ -0,0 +1,57 @@ +package main + +import ( + "log" + "os" + "os/signal" + "syscall" + + mqtt "github.com/mochi-mqtt/server/v2" + "github.com/mochi-mqtt/server/v2/hooks/auth" + "github.com/mochi-mqtt/server/v2/listeners" +) + +func main() { + sigs := make(chan os.Signal, 1) + done := make(chan bool, 1) + signal.Notify(sigs, syscall.SIGINT, syscall.SIGTERM) + go func() { + <-sigs + done <- true + }() + + // An example of configuring various server options... + options := &mqtt.Options{ + // InflightTTL: 60 * 15, // Set an example custom 15-min TTL for inflight messages + } + + server := mqtt.New(options) + + // For security reasons, the default implementation disallows all connections. + // If you want to allow all connections, you must specifically allow it. + err := server.AddHook(new(auth.AllowHook), nil) + if err != nil { + log.Fatal(err) + } + + tcp := listeners.NewWindowsPipe(listeners.Config{ + ID: "t1", + Address: `\\.\pipe\mypipename`, + }) + err = server.AddListener(tcp) + if err != nil { + log.Fatal(err) + } + + go func() { + err := server.Serve() + if err != nil { + log.Fatal(err) + } + }() + + <-done + server.Log.Warn("caught signal, stopping...") + _ = server.Close() + server.Log.Info("main.go finished") +} diff --git a/go.mod b/go.mod index 2d90f776..ecaca4ca 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,8 @@ module github.com/mochi-mqtt/server/v2 -go 1.21 +go 1.21.0 + +toolchain go1.22.2 require ( github.com/alicebob/miniredis/v2 v2.23.0 @@ -11,6 +13,7 @@ require ( github.com/jinzhu/copier v0.3.5 github.com/rs/xid v1.4.0 github.com/stretchr/testify v1.8.1 + github.com/xlango/npipe v1.0.0 go.etcd.io/bbolt v1.3.5 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index 6a835a58..878fddd5 100644 --- a/go.sum +++ b/go.sum @@ -267,6 +267,8 @@ github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/stretchr/testify v1.8.1 h1:w7B6lhMri9wdJUVmEZPGGhZzrYTPvgJArz7wNPgYKsk= github.com/stretchr/testify v1.8.1/go.mod h1:w2LPCIKwWwSfY2zedu0+kehJoqGctiVI29o6fzry7u4= +github.com/xlango/npipe v1.0.0 h1:/7jTCcaVNV0QWxBy7547dWZyKZijkK4Es1YgTM/OEvg= +github.com/xlango/npipe v1.0.0/go.mod h1:eDo6fWgNGHRpketf5LgDeFH9xvwSvl33nrcC1MhbF/Q= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.32/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= diff --git a/listeners/pipe_windows.go b/listeners/pipe_windows.go new file mode 100644 index 00000000..02c15421 --- /dev/null +++ b/listeners/pipe_windows.go @@ -0,0 +1,102 @@ +// SPDX-License-Identifier: MIT +// SPDX-FileCopyrightText: 2024 xlango, natefinch +// SPDX-FileContributor: xiang_yuanlang@topsec.com.cn + +package listeners + +import ( + "github.com/xlango/npipe" + "log/slog" + "net" + "sync" + "sync/atomic" +) + +type WindowsPipe struct { + sync.RWMutex + id string // the internal id of the listener + address string // the network address to bind to + listen net.Listener // a net.Listener which will listen for new clients + config Config // configuration values for the listener + log *slog.Logger // server logger + end uint32 // ensure the close methods are only called once +} + +// NewWindowsPipe initializes and returns a new WindowsPipe listener, listening on an address. +func NewWindowsPipe(config Config) *WindowsPipe { + return &WindowsPipe{ + id: config.ID, + address: config.Address, + config: config, + } +} + +// ID returns the id of the listener. +func (l *WindowsPipe) ID() string { + return l.id +} + +// Address returns the address of the listener. +func (l *WindowsPipe) Address() string { + if l.listen != nil { + return l.listen.Addr().String() + } + return l.address +} + +// Protocol returns the address of the listener. +func (l *WindowsPipe) Protocol() string { + return "WindowsPipe" +} + +// Init initializes the listener. +func (l *WindowsPipe) Init(log *slog.Logger) error { + l.log = log + + var err error + + l.listen, err = npipe.Listen(l.address) + + return err +} + +// Serve starts waiting for new WindowsPipe connections, and calls the establish +// connection callback for any received. +func (l *WindowsPipe) Serve(establish EstablishFn) { + for { + if atomic.LoadUint32(&l.end) == 1 { + return + } + + conn, err := l.listen.Accept() + if err != nil { + return + } + + if atomic.LoadUint32(&l.end) == 0 { + go func() { + err = establish(l.id, conn) + if err != nil { + l.log.Warn("", "error", err) + } + }() + } + } +} + +// Close closes the listener and any client connections. +func (l *WindowsPipe) Close(closeClients CloseFn) { + l.Lock() + defer l.Unlock() + + if atomic.CompareAndSwapUint32(&l.end, 0, 1) { + closeClients(l.id) + } + + if l.listen != nil { + err := l.listen.Close() + if err != nil { + return + } + } +} diff --git a/listeners/pipe_windows_test.go b/listeners/pipe_windows_test.go new file mode 100644 index 00000000..85b88e1e --- /dev/null +++ b/listeners/pipe_windows_test.go @@ -0,0 +1,98 @@ +package listeners + +import ( + "errors" + "github.com/stretchr/testify/require" + "github.com/xlango/npipe" + "net" + "testing" + "time" +) + +const testWindowsPipe = `\\.\pipe\mypipename` + +var ( + windowsPipeConfig = Config{ID: "t1", Address: testWindowsPipe} +) + +func TestNewWindowsPipe(t *testing.T) { + l := NewWindowsPipe(windowsPipeConfig) + require.Equal(t, "t1", l.id) + require.Equal(t, testWindowsPipe, l.address) +} + +func TestWindowsPipeID(t *testing.T) { + l := NewWindowsPipe(windowsPipeConfig) + require.Equal(t, "t1", l.ID()) +} + +func TestWindowsPipeAddress(t *testing.T) { + l := NewWindowsPipe(windowsPipeConfig) + require.Equal(t, testWindowsPipe, l.Address()) +} + +func TestWindowsPipeProtocol(t *testing.T) { + l := NewWindowsPipe(windowsPipeConfig) + require.Equal(t, "WindowsPipe", l.Protocol()) +} + +func TestWindowsPipeInit(t *testing.T) { + l := NewWindowsPipe(windowsPipeConfig) + err := l.Init(logger) + l.Close(MockCloser) + require.NoError(t, err) + + t2Config := windowsPipeConfig + t2Config.ID = "t2" + l2 := NewWindowsPipe(t2Config) + err = l2.Init(logger) + l2.Close(MockCloser) + require.NoError(t, err) +} + +func TestWindowsPipeServeAndClose(t *testing.T) { + l := NewWindowsPipe(windowsPipeConfig) + err := l.Init(logger) + require.NoError(t, err) + + o := make(chan bool) + go func(o chan bool) { + l.Serve(MockEstablisher) + o <- true + }(o) + + time.Sleep(time.Millisecond) + + var closed bool + l.Close(func(id string) { + closed = true + }) + + require.True(t, closed) + <-o + + l.Close(MockCloser) // coverage: close closed + l.Serve(MockEstablisher) // coverage: serve closed +} + +func TestWindowsPipeEstablishThenEnd(t *testing.T) { + l := NewWindowsPipe(windowsPipeConfig) + err := l.Init(logger) + require.NoError(t, err) + + o := make(chan bool) + established := make(chan bool) + go func() { + l.Serve(func(id string, c net.Conn) error { + established <- true + return errors.New("ending") // return an error to exit immediately + }) + o <- true + }() + + time.Sleep(time.Millisecond) + _, _ = npipe.Dial(l.listen.Addr().String()) + require.Equal(t, true, <-established) + l.Close(MockCloser) + <-o +} From f7cf1f315e507a47b878630f592974ff48f541ff Mon Sep 17 00:00:00 2001 From: xhylgogo <1329172872@qq.com> Date: Fri, 14 Jun 2024 15:29:19 +0800 Subject: [PATCH 2/2] fix(examples): add example windows tag --- examples/windows_pipe/main.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/examples/windows_pipe/main.go b/examples/windows_pipe/main.go index 01ae8a66..55f5e289 100644 --- a/examples/windows_pipe/main.go +++ b/examples/windows_pipe/main.go @@ -1,3 +1,6 @@ +//go:build windows +// +build windows + package main import (