Skip to content

Commit

Permalink
update:remove old conf method,add default ping to avoid to be closed
Browse files Browse the repository at this point in the history
  • Loading branch information
nizonglong committed Aug 11, 2022
1 parent 614ffeb commit 067027e
Show file tree
Hide file tree
Showing 9 changed files with 91 additions and 41 deletions.
3 changes: 0 additions & 3 deletions go/_examples/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,6 @@ import (
func main() {
// create WsService with ConnConf, this is recommended, key and secret will be needed by some channels
// ctx and logger could be nil, they'll be initialized by default
// ws, err := gate.NewWsService(nil, nil, gate.NewConnConf("",
// "YOUR_API_KEY", "YOUR_API_SECRET", 10))
// RECOMMEND this way to get a ConnConf
ws, err := gate.NewWsService(nil, nil, gate.NewConnConfFromOption(&gate.ConfOptions{
Key: "YOUR_API_KEY", Secret: "YOUR_API_SECRET", MaxRetryConn: 10, SkipTlsVerify: false,
}))
Expand Down
8 changes: 8 additions & 0 deletions go/changelog.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,13 @@
# Changelog

## v0.2.7

2022-08-11

- remove client method `NewConnConf`. Recommend to use `NewConnConfFromOption` instead
- add new config field `PingInterval` to send ping message
- add default ping message to avoid to be closed by server

## v0.2.6

2022-05-24
Expand Down
8 changes: 7 additions & 1 deletion go/channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (
"fmt"
"io"
"net"
"strings"
"time"

"github.com/gorilla/websocket"
Expand Down Expand Up @@ -107,6 +108,10 @@ func (ws *WsService) baseSubscribe(event string, channel string, payload []strin
return err
}

if strings.HasSuffix(channel, "ping") {
return nil
}

if v, ok := ws.conf.subscribeMsg.Load(channel); ok {
if op != nil && op.IsReConnect {
return nil
Expand Down Expand Up @@ -198,7 +203,8 @@ func (ws *WsService) SetCallBack(channel string, call callBack) {
}

func (ws *WsService) receiveCallMsg(channel string, msgCh chan *UpdateMsg) {
defer close(msgCh)
// avoid send closed channel error
// defer close(msgCh)
for {
select {
case <-ws.Ctx.Done():
Expand Down
4 changes: 3 additions & 1 deletion go/channel_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,9 @@ func TestNilCallBack(t *testing.T) {
}

func TestSubscribeFutures(t *testing.T) {
ws, err := NewWsService(nil, nil, NewConnConf(FuturesUsdtUrl, "", "", 10))
ws, err := NewWsService(nil, nil, NewConnConfFromOption(&ConfOptions{
URL: FuturesUsdtUrl, Key: "", Secret: "", MaxRetryConn: 10,
}))
if err != nil {
log.Fatal(err)
}
Expand Down
81 changes: 60 additions & 21 deletions go/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package gatews
import (
"context"
"crypto/tls"
"fmt"
"log"
"os"
"strings"
Expand Down Expand Up @@ -33,6 +34,7 @@ type ConnConf struct {
MaxRetryConn int
SkipTlsVerify bool
ShowReconnectMsg bool
PingInterval string
}

type ConfOptions struct {
Expand All @@ -42,6 +44,7 @@ type ConfOptions struct {
MaxRetryConn int
SkipTlsVerify bool
ShowReconnectMsg bool
PingInterval string
}

func NewWsService(ctx context.Context, logger *log.Logger, conf *ConnConf) (*WsService, error) {
Expand All @@ -52,25 +55,25 @@ func NewWsService(ctx context.Context, logger *log.Logger, conf *ConnConf) (*WsS
ctx = context.Background()
}

var cfg *ConnConf
defaultConf := getInitConnConf()
if conf != nil {
cfg = conf
conf = applyOptionConf(defaultConf, conf)
} else {
cfg = getInitConnConf()
conf = defaultConf
}

stop := false
retry := 0
var conn *websocket.Conn
for !stop {
dialer := websocket.DefaultDialer
if cfg.SkipTlsVerify {
if conf.SkipTlsVerify {
dialer.TLSClientConfig = &tls.Config{InsecureSkipVerify: true}
}
c, _, err := dialer.Dial(cfg.URL, nil)
c, _, err := dialer.Dial(conf.URL, nil)
if err != nil {
if retry >= cfg.MaxRetryConn {
log.Printf("max reconnect time %d reached, give it up", cfg.MaxRetryConn)
if retry >= conf.MaxRetryConn {
log.Printf("max reconnect time %d reached, give it up", conf.MaxRetryConn)
return nil, err
}
retry++
Expand All @@ -89,7 +92,7 @@ func NewWsService(ctx context.Context, logger *log.Logger, conf *ConnConf) (*WsS

ws := &WsService{
mu: new(sync.Mutex),
conf: cfg,
conf: conf,
Logger: logger,
Ctx: ctx,
Client: conn,
Expand All @@ -98,6 +101,8 @@ func NewWsService(ctx context.Context, logger *log.Logger, conf *ConnConf) (*WsS
once: new(sync.Once),
}

go ws.activePing()

return ws, nil
}

Expand All @@ -110,25 +115,24 @@ func getInitConnConf() *ConnConf {
URL: BaseUrl,
SkipTlsVerify: false,
ShowReconnectMsg: true,
PingInterval: DefaultPingInterval,
}
}

func NewConnConf(url, key, secret string, maxRetry int) *ConnConf {
if url == "" {
url = BaseUrl
func applyOptionConf(defaultConf, userConf *ConnConf) *ConnConf {
if userConf.URL == "" {
userConf.URL = defaultConf.URL
}
if maxRetry == 0 {
maxRetry = MaxRetryConn

if userConf.MaxRetryConn == 0 {
userConf.MaxRetryConn = defaultConf.MaxRetryConn
}
return &ConnConf{
subscribeMsg: new(sync.Map),
MaxRetryConn: maxRetry,
Key: key,
Secret: secret,
URL: url,
SkipTlsVerify: false,
ShowReconnectMsg: true,

if userConf.PingInterval == "" {
userConf.PingInterval = defaultConf.PingInterval
}

return userConf
}

// NewConnConfFromOption conf from options, recommend using this
Expand All @@ -147,6 +151,7 @@ func NewConnConfFromOption(op *ConfOptions) *ConnConf {
URL: op.URL,
SkipTlsVerify: op.SkipTlsVerify,
ShowReconnectMsg: op.ShowReconnectMsg,
PingInterval: op.PingInterval,
}
}

Expand Down Expand Up @@ -265,3 +270,37 @@ func (ws *WsService) GetChannels() []string {
func (ws *WsService) GetConnection() *websocket.Conn {
return ws.Client
}

func (ws *WsService) activePing() {
du, err := time.ParseDuration(ws.conf.PingInterval)
if err != nil {
ws.Logger.Printf("failed to parse ping interval: %s, use default ping interval 10s instead", ws.conf.PingInterval)
du, err = time.ParseDuration(DefaultPingInterval)
if err != nil {
du = time.Second * 10
}
}

ticker := time.NewTicker(du)
defer ticker.Stop()

for {
select {
case <-ws.Ctx.Done():
return
case <-ticker.C:
subscribeMap := map[string]int{}
ws.conf.subscribeMsg.Range(func(key, value any) bool {
splits := strings.Split(key.(string), ".")
if len(splits) == 2 {
subscribeMap[splits[0]] = 1
}
return true
})

for app := range subscribeMap {
ws.Subscribe(fmt.Sprintf("%s.ping", app), nil)
}
}
}
}
5 changes: 3 additions & 2 deletions go/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,9 @@ func TestGetChannels(t *testing.T) {
}

func TestGetConf(t *testing.T) {
ws, err := NewWsService(nil, nil, NewConnConf(
"", "KEY", "SECRET", 10))
ws, err := NewWsService(nil, nil, NewConnConfFromOption(&ConfOptions{
URL: "", Key: "KEY", Secret: "SECRET", MaxRetryConn: 10,
}))
if err != nil {
log.Fatal(err)
}
Expand Down
2 changes: 2 additions & 0 deletions go/constant.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,6 @@ const (

ServiceTypeSpot = 1
ServiceTypeFutures = 2

DefaultPingInterval = "10s"
)
15 changes: 8 additions & 7 deletions go/go.mod
Original file line number Diff line number Diff line change
@@ -1,14 +1,15 @@
module github.com/gateio/gatews/go

go 1.15
go 1.18

require (
github.com/deckarep/golang-set v1.7.1
github.com/gansidui/skiplist v0.0.0-20141121051332-c6a909ce563b
github.com/gorilla/websocket v1.4.2
github.com/shopspring/decimal v1.2.0
github.com/yireyun/go-queue v0.0.0-20210520035143-72b190eafcba
//github.com/shopspring/decimal v1.2.0 // for test
//github.com/gansidui/skiplist v0.0.0-20141121051332-c6a909ce563b // for test
//github.com/yireyun/go-queue v0.0.0-20210520035143-72b190eafcba // for test
)

// just for test and examples
//require (
// github.com/gansidui/skiplist v0.0.0-20141121051332-c6a909ce563b // indirect
// github.com/shopspring/decimal v1.3.1 // indirect
// github.com/yireyun/go-queue v0.0.0-20220725040158-a4dd64810e1e // indirect
//)
6 changes: 0 additions & 6 deletions go/go.sum
Original file line number Diff line number Diff line change
@@ -1,10 +1,4 @@
github.com/deckarep/golang-set v1.7.1 h1:SCQV0S6gTtp6itiFrTqI+pfmJ4LN85S1YzhDf9rTHJQ=
github.com/deckarep/golang-set v1.7.1/go.mod h1:93vsz/8Wt4joVM7c2AVqh+YRMiUSc14yDtF28KmMOgQ=
github.com/gansidui/skiplist v0.0.0-20141121051332-c6a909ce563b h1:MAoeneEI/UCOxABHa7aU2+8dqM89Uaj6tSMFxr1wbe0=
github.com/gansidui/skiplist v0.0.0-20141121051332-c6a909ce563b/go.mod h1:8VKNiVGGPIhJE0qomrfsTKscI5iypji4r9wt4gEUDWE=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/shopspring/decimal v1.2.0 h1:abSATXmQEYyShuxI4/vyW3tV1MrKAJzCZ/0zLUXYbsQ=
github.com/shopspring/decimal v1.2.0/go.mod h1:DKyhrW/HYNuLGql+MJL6WCR6knT2jwCFRcu2hWCYk4o=
github.com/yireyun/go-queue v0.0.0-20210520035143-72b190eafcba h1:z2jLif5Ec1ZMr/Aq2qav4L53ZFCCfsO6I2RSjWo9ltI=
github.com/yireyun/go-queue v0.0.0-20210520035143-72b190eafcba/go.mod h1:NS8O3p7NiPwC1Yw9xTd9DnDBguxFJG/BL1VPTSfJ5Gw=

0 comments on commit 067027e

Please sign in to comment.