diff --git a/.github/workflows/release.yaml b/.github/workflows/release.yaml index 30bdec7..3006d69 100644 --- a/.github/workflows/release.yaml +++ b/.github/workflows/release.yaml @@ -12,9 +12,9 @@ jobs: name: goreleaser steps: - name: Set up Go - uses: actions/setup-go@v1 + uses: actions/setup-go@v3 with: - go-version: 1.13 + go-version: 1.19 id: go - name: Check out code into the Go module directory @@ -40,7 +40,7 @@ jobs: echo "${DOCKER_PASSWORD}" | docker login --username "${DOCKER_USERNAME}" --password-stdin - name: goreleaser - uses: goreleaser/goreleaser-action@master + uses: goreleaser/goreleaser-action@v3 with: args: release workdir: ${{github.workspace}}/mqtt-mirror @@ -52,7 +52,7 @@ jobs: - name: Create helm package uses: stefanprodan/kube-tools@v1 with: - helm: 2.16.1 + helm: 2.17.0 command: | export SEMVER=${GITHUB_REF#refs/tags/v} diff --git a/.github/workflows/test.yaml b/.github/workflows/test.yaml index 40fb0fe..fba218d 100644 --- a/.github/workflows/test.yaml +++ b/.github/workflows/test.yaml @@ -4,7 +4,7 @@ jobs: test: strategy: matrix: - go-version: [1.14.x] + go-version: [1.18.x, 1.19.x] platform: [ubuntu-latest] runs-on: ${{ matrix.platform }} steps: diff --git a/.goreleaser.yml b/.goreleaser.yml index 7f141d0..1dcb7b1 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -40,8 +40,7 @@ dockers: # GOARM of the built binary that should be used. goarm: '' - # Name templates of the built binaries that should be used. - binaries: + ids: - mqtt-mirror # Templates of the Docker image names. @@ -70,7 +69,7 @@ dockers: brews: - # Github repository to push the tap to. - github: + tap: owner: 4nte name: homebrew-tap diff --git a/bin/mqtt-mirror b/bin/mqtt-mirror new file mode 100755 index 0000000..84786a9 Binary files /dev/null and b/bin/mqtt-mirror differ diff --git a/chart/mqtt-mirror/templates/deployment.yaml b/chart/mqtt-mirror/templates/deployment.yaml index c99fca7..1adf6d4 100644 --- a/chart/mqtt-mirror/templates/deployment.yaml +++ b/chart/mqtt-mirror/templates/deployment.yaml @@ -39,6 +39,10 @@ spec: - name: VERBOSE value: "{{ .Values.verbose }}" {{end}} + {{ if .Values.name }} + - name: NAME + value: "{{ .Values.name }}" + {{end}} resources: {{- toYaml .Values.resources | nindent 12 }} {{- with .Values.nodeSelector }} diff --git a/chart/mqtt-mirror/values.yaml b/chart/mqtt-mirror/values.yaml index e0b05a9..2bf8ce4 100644 --- a/chart/mqtt-mirror/values.yaml +++ b/chart/mqtt-mirror/values.yaml @@ -10,6 +10,7 @@ image: pullPolicy: IfNotPresent verbose: false +name: "" # name of the mqtt-mirror instance (used as clientID) mqtt: source: "" diff --git a/cmd/root.go b/cmd/root.go index 509aa2f..3a42417 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -2,14 +2,16 @@ package cmd import ( "fmt" - "github.com/4nte/mqtt-mirror/internal" - "github.com/pkg/errors" - "github.com/spf13/cobra" - "github.com/spf13/viper" "net/url" "os" "os/signal" "syscall" + + "github.com/4nte/mqtt-mirror/internal" + "github.com/dchest/uniuri" + "github.com/pkg/errors" + "github.com/spf13/cobra" + "github.com/spf13/viper" ) var ( // Flags @@ -20,6 +22,8 @@ var ( // Flags targetURI string configFile string + instanceName string + Topics []string ) @@ -93,6 +97,11 @@ var rootCmd = &cobra.Command{ if target == "" { target = viper.GetString("target") } + var instanceName = viper.GetString("name") + if len(instanceName) == 0 { + instanceName = uniuri.NewLen(8) + + } topicFilter := viper.GetStringSlice("topic_filter") isVerbose := viper.GetBool("verbose") @@ -106,7 +115,7 @@ var rootCmd = &cobra.Command{ panic(err) } - terminate, err := internal.Mirror(*sourceURL, *targetURL, topicFilter, isVerbose, 0) + terminate, err := internal.Mirror(*sourceURL, *targetURL, topicFilter, isVerbose, 0, instanceName) if err != nil { panic(err) } @@ -124,6 +133,8 @@ func init() { rootCmd.PersistentFlags().StringVar(&sourceURI, "source", "", "mqtt source URI") rootCmd.PersistentFlags().StringVar(&targetURI, "target", "", "mqtt target URI") + rootCmd.PersistentFlags().StringVarP(&instanceName, "name", "", "", "mqtt-mirror instance name. If not specified, will be randomly generated") + rootCmd.PersistentFlags().StringVar(&targetURI, "config", "", "config file") err := viper.BindPFlag("source", rootCmd.PersistentFlags().Lookup("source")) @@ -134,6 +145,7 @@ func init() { viper.BindPFlag("target", rootCmd.PersistentFlags().Lookup("target")) viper.BindPFlag("topic_filter", rootCmd.PersistentFlags().Lookup("topic_filter")) viper.BindPFlag("verbose", rootCmd.PersistentFlags().Lookup("verbose")) + viper.BindPFlag("name", rootCmd.PersistentFlags().Lookup("name")) } func initConfig() { diff --git a/go-mirror b/go-mirror new file mode 100755 index 0000000..1ffc33c Binary files /dev/null and b/go-mirror differ diff --git a/go.mod b/go.mod index 75ee517..2033ccb 100644 --- a/go.mod +++ b/go.mod @@ -1,15 +1,62 @@ module github.com/4nte/mqtt-mirror -go 1.15 +go 1.19 require ( + github.com/dchest/uniuri v1.2.0 github.com/docker/go-connections v0.4.0 - github.com/eclipse/paho.mqtt.golang v1.2.0 + github.com/eclipse/paho.mqtt.golang v1.4.3 github.com/pkg/errors v0.9.1 github.com/spf13/cobra v0.0.5 github.com/spf13/viper v1.3.2 - github.com/stretchr/testify v1.6.1 + github.com/stretchr/testify v1.8.0 github.com/surgemq/message v0.0.0-20151017233315-2b7ca1ac6121 github.com/testcontainers/testcontainers-go v0.9.0 - golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa // indirect +) + +require ( + github.com/gorilla/websocket v1.5.0 // indirect + go.uber.org/atomic v1.7.0 // indirect + go.uber.org/multierr v1.6.0 // indirect +) + +require ( + github.com/Microsoft/go-winio v0.4.11 // indirect + github.com/Microsoft/hcsshim v0.8.6 // indirect + github.com/cenkalti/backoff v2.2.1+incompatible // indirect + github.com/containerd/containerd v1.4.1 // indirect + github.com/containerd/continuity v0.0.0-20190426062206-aaeac12a7ffc // indirect + github.com/davecgh/go-spew v1.1.1 // indirect + github.com/docker/distribution v2.7.1-0.20190205005809-0d3efadf0154+incompatible // indirect + github.com/docker/docker v17.12.0-ce-rc1.0.20200916142827-bd33bbf0497b+incompatible // indirect + github.com/docker/go-units v0.3.3 // indirect + github.com/fsnotify/fsnotify v1.4.7 // indirect + github.com/gogo/protobuf v1.2.0 // indirect + github.com/golang/protobuf v1.3.3 // indirect + github.com/google/uuid v1.1.2 // indirect + github.com/hashicorp/hcl v1.0.0 // indirect + github.com/inconshreveable/mousetrap v1.0.0 // indirect + github.com/konsorten/go-windows-terminal-sequences v1.0.1 // indirect + github.com/magiconair/properties v1.8.0 // indirect + github.com/mitchellh/mapstructure v1.1.2 // indirect + github.com/opencontainers/go-digest v1.0.0-rc1 // indirect + github.com/opencontainers/image-spec v1.0.1 // indirect + github.com/opencontainers/runc v0.1.1 // indirect + github.com/pelletier/go-toml v1.2.0 // indirect + github.com/pmezard/go-difflib v1.0.0 // indirect + github.com/sirupsen/logrus v1.2.0 // indirect + github.com/spf13/afero v1.1.2 // indirect + github.com/spf13/cast v1.3.0 // indirect + github.com/spf13/jwalterweatherman v1.0.0 // indirect + github.com/spf13/pflag v1.0.3 // indirect + go.uber.org/zap v1.23.0 + golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 // indirect + golang.org/x/net v0.8.0 // indirect + golang.org/x/sync v0.1.0 // indirect + golang.org/x/sys v0.6.0 // indirect + golang.org/x/text v0.8.0 // indirect + google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 // indirect + google.golang.org/grpc v1.17.0 // indirect + gopkg.in/yaml.v2 v2.3.0 // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect ) diff --git a/go.sum b/go.sum index 9b41dae..32a8a33 100644 --- a/go.sum +++ b/go.sum @@ -8,6 +8,7 @@ github.com/Microsoft/go-winio v0.4.11/go.mod h1:VhR8bwka0BXejwEJY73c50VrPtXAaKcy github.com/Microsoft/hcsshim v0.8.6 h1:ZfF0+zZeYdzMIVMZHKtDKJvLHj76XCuVae/jNkjj0IA= github.com/Microsoft/hcsshim v0.8.6/go.mod h1:Op3hHsoHPAvb6lceZHDtd9OkTew38wNoXnJs8iY7rUg= github.com/armon/consul-api v0.0.0-20180202201655-eb2c6b5be1b6/go.mod h1:grANhF5doyWs3UAsr3K4I6qtAmlQcZDesFNEHPZAzj8= +github.com/benbjohnson/clock v1.1.0 h1:Q92kusRqC1XV2MjkWETPvjJVqKetz1OzxZB7mHJLju8= github.com/cenkalti/backoff v2.2.1+incompatible h1:tNowT99t7UNflLxfYYSlKYsBpXdEet03Pg2g16Swow4= github.com/cenkalti/backoff v2.2.1+incompatible/go.mod h1:90ReRw6GdpyfrHakVjL/QHaoyV4aDUVVkXQJJJ3NXXM= github.com/client9/misspell v0.3.4/go.mod h1:qj6jICC3Q7zFZvVWo7KLAzC3yx5G7kyvSDkc90ppPyw= @@ -22,6 +23,8 @@ github.com/cpuguy83/go-md2man v1.0.10/go.mod h1:SmD6nW6nTyfqj6ABTjUi3V3JVMnlJmwc github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/dchest/uniuri v1.2.0 h1:koIcOUdrTIivZgSLhHQvKgqdWZq5d7KdMEWF1Ud6+5g= +github.com/dchest/uniuri v1.2.0/go.mod h1:fSzm4SLHzNZvWLvWJew423PhAzkpNQYq+uNLq4kxhkY= github.com/docker/distribution v2.7.1-0.20190205005809-0d3efadf0154+incompatible h1:dvc1KSkIYTVjZgHf/CTC2diTYC8PzhaA5sFISRfNVrE= github.com/docker/distribution v2.7.1-0.20190205005809-0d3efadf0154+incompatible/go.mod h1:J2gT2udsDAN96Uj4KfcMRqY0/ypR+oyYUYmja8H+y+w= github.com/docker/docker v17.12.0-ce-rc1.0.20200916142827-bd33bbf0497b+incompatible h1:SiUATuP//KecDjpOK2tvZJgeScYAklvyjfK8JZlU6fo= @@ -30,8 +33,8 @@ github.com/docker/go-connections v0.4.0 h1:El9xVISelRB7BuFusrZozjnkIM5YnzCViNKoh github.com/docker/go-connections v0.4.0/go.mod h1:Gbd7IOopHjR8Iph03tsViu4nIes5XhDvyHbTtUxmeec= github.com/docker/go-units v0.3.3 h1:Xk8S3Xj5sLGlG5g67hJmYMmUgXv5N4PhkjJHHqrwnTk= github.com/docker/go-units v0.3.3/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= -github.com/eclipse/paho.mqtt.golang v1.2.0 h1:1F8mhG9+aO5/xpdtFkW4SxOJB67ukuDC3t2y2qayIX0= -github.com/eclipse/paho.mqtt.golang v1.2.0/go.mod h1:H9keYFcgq3Qr5OUJm/JZI/i6U7joQ8SYLhZwfeOo6Ts= +github.com/eclipse/paho.mqtt.golang v1.4.3 h1:2kwcUGn8seMUfWndX0hGbvH8r7crgcJguQNCyp70xik= +github.com/eclipse/paho.mqtt.golang v1.4.3/go.mod h1:CSYvoAlsMkhYOXh/oKyxa8EcBci6dVkLCbo5tTC1RIE= github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/gin-contrib/sse v0.1.0/go.mod h1:RHrZQHXnP2xjPF+u1gW/2HnVO7nvIa9PG3Gm+fLHvGI= @@ -60,9 +63,10 @@ github.com/gorilla/context v1.1.1 h1:AWwleXJkX/nhcU9bZSnZoi3h/qGYqQAGhq6zZe/aQW8 github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51q0aT7Yg= github.com/gorilla/mux v1.6.2 h1:Pgr17XVTNXAk3q/r4CpKzC5xBM/qW1uVLV+IhRZpIIk= github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/hashicorp/hcl v1.0.0 h1:0Anlzjpi4vEasTeNFn2mLJgTSwt0+6sfsiTG8qcWGx4= github.com/hashicorp/hcl v1.0.0/go.mod h1:E5yfLk+7swimpb2L/Alb/PJmXilQ/rhwaUYs4T20WEQ= -github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI= github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU= github.com/inconshreveable/mousetrap v1.0.0 h1:Z8tu5sraLXCXIcARxBp/8cbvlwVa7Z1NHg9XEKhtSvM= github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8= @@ -79,7 +83,6 @@ github.com/leodido/go-urn v1.2.0/go.mod h1:+8+nEpDfqqsY+g338gtMEUOtuK+4dEMhiQEgx github.com/magiconair/properties v1.8.0 h1:LLgXmsheXeRoUOBOjtwPQCWIYqM/LU1ayDtDePerRcY= github.com/magiconair/properties v1.8.0/go.mod h1:PppfXfuXeibc/6YijjN8zIbojt8czPbwD3XqdrwzmxQ= github.com/mattn/go-isatty v0.0.12/go.mod h1:cbi8OIDigv2wuxKPP5vlRcQ1OAZbq2CE4Kysco4FUpU= -github.com/mitchellh/go-homedir v1.1.0 h1:lukF9ziXFxDFPkA1vsr5zpc1XuPDn/wFntq5mG+4E0Y= github.com/mitchellh/go-homedir v1.1.0/go.mod h1:SfyaCUpYCn1Vlf4IUYiD9fPX4A5wJrkLzIz1N1q0pr0= github.com/mitchellh/mapstructure v1.1.2 h1:fmNYVwqnSfB9mZU6OS2O6GsXM+wcskZDuKQzvN1EDeE= github.com/mitchellh/mapstructure v1.1.2/go.mod h1:FVVH3fgwuzCH5S8UJGiWEs2h04kUh9fWfEaFds41c1Y= @@ -88,9 +91,7 @@ github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lN github.com/morikuni/aec v0.0.0-20170113033406-39771216ff4c h1:nXxl5PrvVm2L/wCy8dQu6DMTwH4oIuGN8GJDAlqDdVE= github.com/morikuni/aec v0.0.0-20170113033406-39771216ff4c/go.mod h1:BbKIizmSmc5MMPqRYbxO4ZU0S0+P200+tUnFx7PXmsc= github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/ginkgo v1.8.0 h1:VkHVNpR4iVnU8XQR6DBm8BqYjN7CRzw+xKUbVVbbW9w= github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE= -github.com/onsi/gomega v1.5.0 h1:izbySO9zDPmjJ8rDjLvkA2zJHIo+HkYXHnf7eN7SSyo= github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY= github.com/opencontainers/go-digest v1.0.0-rc1 h1:WzifXhOVOEOuFYOJAW6aQqW0TooG2iki3E3Ii+WN7gQ= github.com/opencontainers/go-digest v1.0.0-rc1/go.mod h1:cMLVZDEM3+U2I4VmLI6N8jQYUd2OVphdqWwCJHrFt2s= @@ -122,12 +123,14 @@ github.com/spf13/viper v1.3.2 h1:VUFqw5KcqRf7i70GOzW7N+Q7+gxVBkSSqiXB12+JQ4M= github.com/spf13/viper v1.3.2/go.mod h1:ZiWeW+zYFKm7srdB9IoDzzZXaJaI5eL9QjNiN/DMA2s= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= github.com/stretchr/objx v0.1.1/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w= +github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= -github.com/stretchr/testify v1.6.1 h1:hDPOHmpOpP40lSULcqw7IrRb/u7w6RpDC9399XyoNd0= github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.7.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= +github.com/stretchr/testify v1.8.0 h1:pSgiaMZlXftHpm5L7V1+rVB+AZJydKsMxsQBIJw4PKk= +github.com/stretchr/testify v1.8.0/go.mod h1:yNjHg4UonilssWZ8iaSj1OCr/vHnekPRkoO+kdMU+MU= github.com/surgemq/message v0.0.0-20151017233315-2b7ca1ac6121 h1:XPyDni7REGO4jY31bhtnRU6WSnj4r2eCYQYMeUBShx4= github.com/surgemq/message v0.0.0-20151017233315-2b7ca1ac6121/go.mod h1:aB2Klx/7IKkvtychYbb2GsLDXJMiRgGHbM688Y0eIds= github.com/testcontainers/testcontainers-go v0.9.0 h1:ZyftCfROjGrKlxk3MOUn2DAzWrUtzY/mj17iAkdUIvI= @@ -136,6 +139,13 @@ github.com/ugorji/go v1.1.7/go.mod h1:kZn38zHttfInRq0xu/PH0az30d+z6vm202qpg1oXVM github.com/ugorji/go/codec v0.0.0-20181204163529-d75b2dcb6bc8/go.mod h1:VFNgLljTbGfSG7qAOspJ7OScBnGdDN/yBr0sguwnwf0= github.com/ugorji/go/codec v1.1.7/go.mod h1:Ax+UKWsSmolVDwsd+7N3ZtXu+yMGCf907BLYF3GoBXY= github.com/xordataexchange/crypt v0.0.3-0.20170626215501-b2862e3d0a77/go.mod h1:aYKd//L2LvnjZzWKhF00oedf4jCCReLcmhLdhm1A27Q= +go.uber.org/atomic v1.7.0 h1:ADUqmZGgLDDfbSL9ZmPxKTybcoEYHgpYfELNoN+7hsw= +go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= +go.uber.org/goleak v1.1.11 h1:wy28qYRKZgnJTxGxvye5/wgWr1EKjmUDGYox5mGlRlI= +go.uber.org/multierr v1.6.0 h1:y6IPFStTAIT5Ytl7/XYmHvzXQ7S3g/IeZW9hyZ5thw4= +go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= +go.uber.org/zap v1.23.0 h1:OjGQ5KQDEUawVHxNwQgPpiypGHOxo2mNZsOqTak4fFY= +go.uber.org/zap v1.23.0/go.mod h1:D+nX8jyLsMHMYrln8A0rJjFt/T/9/bGgIhAqxv5URuY= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181203042331-505ab145d0a9/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2 h1:VklqNMn3ovrHsnt90PveolxSbWFaJdECFbxSq0Mqo2M= @@ -143,24 +153,24 @@ golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACk golang.org/x/lint v0.0.0-20181026193005-c67002cb31c3/go.mod h1:UVdnD1Gm6xHRNCYTkRU2/jEulfH38KcIWyp/GAMgvoE= golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= golang.org/x/net v0.0.0-20180906233101-161cd47e91fd/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4= -golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa h1:F+8P+gmewFQYRk6JoLQLwjBCTu3mcIURZfNkVweuRKA= -golang.org/x/net v0.0.0-20200114155413-6afb5195e5aa/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s= +golang.org/x/net v0.8.0 h1:Zrh2ngAOFYneWTAIAPethzeaQLuHwhuBkuV6ZiRnUaQ= +golang.org/x/net v0.8.0/go.mod h1:QVkue5JL9kW//ek3r6jTKnTFis1tRmNAW2P1shuFdJc= golang.org/x/oauth2 v0.0.0-20180821212333-d2e6202438be/go.mod h1:N/0e6XlmueqKjAGxoOufVs8QHGRruUQn6yWY3a++T0U= -golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f h1:wMNYb4v58l5UBM7MYRLPG6ZhfOqbKu7X5eyFl8ZhKvA= golang.org/x/sync v0.0.0-20180314180146-1d60e4601c6f/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.1.0 h1:wsuoTGHzEhffawBOhz5CYhcrV4IdKZbEyZjBMuTp12o= +golang.org/x/sync v0.1.0/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180909124046-d0be0721c37e/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a h1:1n5lsVfiQW3yfsRGu98756EH1YthsFqr/5mxHduZW2A= golang.org/x/sys v0.0.0-20181205085412-a5c9d58dba9a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a h1:1BGLXjeY4akVXGgbC9HugT3Jv3hCI0z56oJR5vAMgBU= golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= -golang.org/x/sys v0.0.0-20200116001909-b77594299b42 h1:vEOn+mP2zCOVzKckCZy6YsCtDblrpj/w7B9nxGNELpg= golang.org/x/sys v0.0.0-20200116001909-b77594299b42/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/text v0.3.0 h1:g61tztE5qeGQ89tm6NTjjM9VPIm088od1l6aSorWRWg= +golang.org/x/sys v0.6.0 h1:MVltZSvRTcU2ljQOhs94SXPftV6DCNnZViHeQps87pQ= +golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= -golang.org/x/text v0.3.2 h1:tW2bmiBqwgJj/UpqtC8EpXEZVYOwU0yG4iWbprSVAcs= golang.org/x/text v0.3.2/go.mod h1:bEr9sfX3Q8Zfm5fL9x+3itogRgK3+ptLWKqgva+5dAk= +golang.org/x/text v0.8.0 h1:57P1ETyNKtuIjB4SRd15iJxuhj8Gc416Y78H3qgMh68= +golang.org/x/text v0.8.0/go.mod h1:e1OnstbJyHTd6l/uOt8jFFHp6TRDWZR/bV3emEE/zU8= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c h1:fqgJT0MGcGpPgpWU7VRdRjuArfcOvC4AoJmILihzhDg= golang.org/x/time v0.0.0-20181108054448-85acf8d2951c/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/tools v0.0.0-20180810170437-e96c4e24768d/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= @@ -171,22 +181,19 @@ google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8 h1:Nw54tB0rB7hY/N0 google.golang.org/genproto v0.0.0-20180817151627-c66870c02cf8/go.mod h1:JiN7NxoALGmiZfu7CAH4rXhgtRTLTxftemlI0sWmxmc= google.golang.org/grpc v1.17.0 h1:TRJYBgMclJvGYn2rIMjj+h9KtMt5r1Ij7ODVRIZkwhk= google.golang.org/grpc v1.17.0/go.mod h1:6QZJwpn2B+Zp71q/5VxRsJ6NXXVCE5NRUHRo+f3cWCs= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+SDo693bJlVdllGtEeKM= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= -gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ= gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw= gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw= gopkg.in/yaml.v2 v2.2.2/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.2.8/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= gopkg.in/yaml.v2 v2.3.0 h1:clyUAQHOM3G0M3f5vQj7LuJrETvjVot3Z5el9nffUtU= gopkg.in/yaml.v2 v2.3.0/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c h1:dUUwHk2QECo/6vqA44rthZ8ie2QXMNeKRTHCNY2nXvo= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= +gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gotest.tools v0.0.0-20181223230014-1083505acf35 h1:zpdCK+REwbk+rqjJmHhiCN6iBIigrZ39glqSF0P3KF0= gotest.tools v0.0.0-20181223230014-1083505acf35/go.mod h1:R//lfYlUuTOTfblYI3lGoAAAebUdzjvbmQsuB7Ykd90= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/internal/mirror.go b/internal/mirror.go index 5240a23..e5e99a5 100644 --- a/internal/mirror.go +++ b/internal/mirror.go @@ -1,13 +1,12 @@ package internal import ( - "fmt" - "github.com/4nte/mqtt-mirror/pkg/mqtt" - mqtt2 "github.com/eclipse/paho.mqtt.golang" "net/url" - "strconv" - "strings" "time" + + "github.com/4nte/mqtt-mirror/pkg/mqtt" + mqtt2 "github.com/eclipse/paho.mqtt.golang" + "go.uber.org/zap" ) func createSourceMessageHandler(targetClient mqtt2.Client, verbose bool) mqtt2.MessageHandler { @@ -17,7 +16,7 @@ func createSourceMessageHandler(targetClient mqtt2.Client, verbose bool) mqtt2.M payload := message.Payload() qos := message.Qos() retained := message.Retained() - fmt.Printf("message replicated (%d bytes): topic=%s, QoS=%b, retained=%s\n", len(payload), topic, qos, strconv.FormatBool(retained)) + zap.L().Info("message replicated", zap.Int("bytes_len", len(payload)), zap.String("topic", topic), zap.Int("QoS", int(qos)), zap.Bool("retained", retained)) targetClient.Publish(message.Topic(), message.Qos(), message.Retained(), message.Payload()) } } @@ -48,7 +47,11 @@ func getBrokerHostString(broker url.URL) string { return host } -func Mirror(source url.URL, target url.URL, topics []string, verbose bool, timeout time.Duration) (func(), error) { +func Mirror(source url.URL, target url.URL, topics []string, verbose bool, timeout time.Duration, instanceName string) (func(), error) { + logger, _ := zap.NewDevelopment() + zap.ReplaceGlobals(logger) + defer logger.Sync() // flushes buf + done := make(chan struct{}) if timeout > 0 { go func() { @@ -56,20 +59,21 @@ func Mirror(source url.URL, target url.URL, topics []string, verbose bool, timeo done <- struct{}{} }() } + zap.L().Sugar().Infof("using clientName: %s", instanceName) - fmt.Printf("mirroring traffic (%s) --> (%s)\n", source.Host, target.Host) + zap.L().Info("mirroring traffic", zap.String("source_host", source.Host), zap.String("target_host", target.Host)) sourceHost := getBrokerHostString(source) sourcePassword, _ := source.User.Password() - sourceClient, err := mqtt.NewClient(sourceHost, source.User.Username(), sourcePassword, true) + sourceClient, err := mqtt.NewClient(sourceHost, source.User.Username(), sourcePassword, true, instanceName) if err != nil { return func() {}, err } targetHost := getBrokerHostString(target) targetPassword, _ := target.User.Password() - targetClient, err := mqtt.NewClient(targetHost, target.User.Username(), targetPassword, false) + targetClient, err := mqtt.NewClient(targetHost, target.User.Username(), targetPassword, false, instanceName) if err != nil { return func() {}, err } @@ -78,7 +82,7 @@ func Mirror(source url.URL, target url.URL, topics []string, verbose bool, timeo if len(topics) == 0 { // Subscribe to all sourceClient.Subscribe("#", qos, messageHandler) - fmt.Println("mirroring *all* topics") + zap.L().Info("mirroring *all* topics") } else { topicFilterMap := make(map[string]byte) for _, topicFilter := range topics { @@ -87,12 +91,12 @@ func Mirror(source url.URL, target url.URL, topics []string, verbose bool, timeo // Subscribe to specified filters sourceClient.SubscribeMultiple(topicFilterMap, messageHandler) - fmt.Printf("mirroring topics: %s\n", strings.Join(topics, ", ")) + zap.L().Info("mirroring messages", zap.Strings("topics", topics)) } terminate := func() { - //sourceClient.Disconnect(0) - //targetClient.Disconnect(0) + sourceClient.Disconnect(0) + targetClient.Disconnect(0) } go func() { <-done @@ -100,5 +104,4 @@ func Mirror(source url.URL, target url.URL, topics []string, verbose bool, timeo }() return terminate, nil - } diff --git a/internal/mirror_test.go b/internal/mirror_test.go index 83bbcb2..eef8f6a 100644 --- a/internal/mirror_test.go +++ b/internal/mirror_test.go @@ -3,16 +3,18 @@ package internal import ( "context" "fmt" + "net/url" + "sync" + "github.com/docker/go-connections/nat" paho "github.com/eclipse/paho.mqtt.golang" "github.com/stretchr/testify/require" "github.com/testcontainers/testcontainers-go/wait" - "net/url" - "sync" + + "path" "github.com/surgemq/message" "github.com/testcontainers/testcontainers-go" - "path" "path/filepath" "runtime" @@ -165,7 +167,7 @@ func TestMirror_withAuth(t *testing.T) { panic(err) } - terminateMirror, err := Mirror(*sourceURL, *destinationURL, []string{}, true, 0) + terminateMirror, err := Mirror(*sourceURL, *destinationURL, []string{}, true, 0, "") require.NoError(t, err) mutex := sync.Mutex{} diff --git a/mqtt-mirror b/mqtt-mirror new file mode 100755 index 0000000..119c44b Binary files /dev/null and b/mqtt-mirror differ diff --git a/pkg/mqtt/client.go b/pkg/mqtt/client.go index 6293bb8..d231ff0 100644 --- a/pkg/mqtt/client.go +++ b/pkg/mqtt/client.go @@ -2,28 +2,32 @@ package mqtt import ( "fmt" - paho "github.com/eclipse/paho.mqtt.golang" "time" + + paho "github.com/eclipse/paho.mqtt.golang" + "go.uber.org/zap" ) -func NewClient(broker string, username string, password string, isSource bool) (paho.Client, error) { - var alias string +func NewClient(broker string, username string, password string, isSource bool, clientName string) (paho.Client, error) { + var role string if isSource { - alias = "source" + role = "source" } else { - alias = "target" + role = "target" + } + + if len(clientName) > 10 { + return nil, fmt.Errorf("client name can have maximum of 10 characters") } - id := fmt.Sprintf("mqtt-mirror-%s", alias) + id := fmt.Sprintf("mqtt-mirror-%s", clientName) - clientOpts := paho.NewClientOptions().AddBroker(broker).SetAutoReconnect(true).SetMaxReconnectInterval(3 * time.Minute).SetUsername(username).SetPassword(password).SetClientID(id) + clientOpts := paho.NewClientOptions().AddBroker(broker).SetAutoReconnect(true).SetMaxReconnectInterval(15 * time.Second).SetUsername(username).SetPassword(password).SetClientID(id) clientOpts.SetOnConnectHandler(func(client paho.Client) { - fmt.Printf("connection established to %s (%s)\n", broker, alias) - // TODO: channel + zap.L().Info("connection established", zap.String("broker_uri", broker), zap.String("role", role)) }) clientOpts.SetConnectionLostHandler(func(i paho.Client, error error) { - fmt.Print(fmt.Errorf("connection lost with %s (%s)\n", broker, alias)) - // TODO: channel + zap.L().Fatal("connection lost", zap.String("broker_uri", broker), zap.String("role", role)) }) client := paho.NewClient(clientOpts) @@ -32,7 +36,7 @@ func NewClient(broker string, username string, password string, isSource bool) ( connTimeout := 15 * time.Second ok := token.WaitTimeout(connTimeout) if !ok { - err := fmt.Errorf("connection timeout exceeded (%s): %s (%s)", connTimeout.String(), broker, alias) + err := fmt.Errorf("connection timeout exceeded (%s): %s (%s)", connTimeout.String(), broker, role) return nil, err }