Skip to content

Commit

Permalink
Merge pull request #1477 from gotd/feat/improve-reconnect-backoff
Browse files Browse the repository at this point in the history
feat(telegram): better reconnects
  • Loading branch information
ernado authored Nov 28, 2024
2 parents 6937c38 + b3e3698 commit 0ad2b33
Show file tree
Hide file tree
Showing 8 changed files with 38 additions and 17 deletions.
2 changes: 1 addition & 1 deletion examples/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ require (
github.com/golang/snappy v0.0.4 // indirect
github.com/gotd/ige v0.2.2 // indirect
github.com/gotd/neo v0.1.5 // indirect
github.com/klauspost/compress v1.17.10 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/kr/text v0.2.0 // indirect
github.com/pkg/errors v0.9.1 // indirect
Expand Down
1 change: 1 addition & 0 deletions examples/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,7 @@ github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0N
github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw=
github.com/klauspost/compress v1.17.10/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
Expand Down
23 changes: 12 additions & 11 deletions telegram/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,12 +59,12 @@ type Client struct {
// Ref: https://pkg.go.dev/sync/atomic#pkg-note-BUG

// Connection factory fields.
connsCounter atomic.Int64
create connConstructor // immutable
resolver dcs.Resolver // immutable
onDead func() // immutable
connBackoff func() backoff.BackOff // immutable
defaultMode manager.ConnMode // immutable
connsCounter atomic.Int64
create connConstructor // immutable
resolver dcs.Resolver // immutable
onDead func() // immutable
newConnBackoff func() backoff.BackOff // immutable
defaultMode manager.ConnMode // immutable

// Migration state.
migrationTimeout time.Duration // immutable
Expand All @@ -90,10 +90,11 @@ type Client struct {
testDC bool // immutable

// Connection state. Guarded by connMux.
session *pool.SyncSession
cfg *manager.AtomicConfig
conn clientConn
connMux sync.Mutex
session *pool.SyncSession
cfg *manager.AtomicConfig
conn clientConn
connBackoff atomic.Pointer[backoff.BackOff]
connMux sync.Mutex

// Restart signal channel.
restart chan struct{} // immutable
Expand Down Expand Up @@ -160,7 +161,7 @@ func NewClient(appID int, appHash string, opt Options) *Client {
create: defaultConstructor(),
resolver: opt.Resolver,
defaultMode: mode,
connBackoff: opt.ReconnectionBackoff,
newConnBackoff: opt.ReconnectionBackoff,
onDead: opt.OnDead,
clock: opt.Clock,
device: opt.Device,
Expand Down
21 changes: 19 additions & 2 deletions telegram/connect.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
"github.com/gotd/td/exchange"
"github.com/gotd/td/tdsync"
"github.com/gotd/td/telegram/auth"
"github.com/gotd/td/tgerr"
)

func (c *Client) runUntilRestart(ctx context.Context) error {
Expand Down Expand Up @@ -54,13 +55,24 @@ func (c *Client) runUntilRestart(ctx context.Context) error {
}

func (c *Client) isPermanentError(err error) bool {
return errors.Is(err, exchange.ErrKeyFingerprintNotFound)
// See https://github.com/gotd/td/issues/1458.
if errors.Is(err, exchange.ErrKeyFingerprintNotFound) {
return true
}
if tgerr.Is(err, "AUTH_KEY_UNREGISTERED", "SESSION_EXPIRED") {
return true
}
if auth.IsUnauthorized(err) {
return true
}
return false
}

func (c *Client) reconnectUntilClosed(ctx context.Context) error {
// Note that we currently have no timeout on connection, so this is
// potentially eternal.
b := tdsync.SyncBackoff(backoff.WithContext(c.connBackoff(), ctx))
b := tdsync.SyncBackoff(backoff.WithContext(c.newConnBackoff(), ctx))
c.connBackoff.Store(&b)

return backoff.RetryNotify(func() error {
if err := c.runUntilRestart(ctx); err != nil {
Expand All @@ -83,6 +95,11 @@ func (c *Client) reconnectUntilClosed(ctx context.Context) error {
func (c *Client) onReady() {
c.log.Debug("Ready")
c.ready.Signal()

if b := c.connBackoff.Load(); b != nil {
// Reconnect faster next time.
(*b).Reset()
}
}

func (c *Client) resetReady() {
Expand Down
2 changes: 1 addition & 1 deletion telegram/connect_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ func (m fingerprintNotFoundConn) Ready() <-chan struct{} {

func TestClient_reconnectUntilClosed(t *testing.T) {
client := Client{
connBackoff: func() backoff.BackOff {
newConnBackoff: func() backoff.BackOff {
return backoff.NewConstantBackOff(time.Nanosecond)
},
log: zap.NewNop(),
Expand Down
2 changes: 1 addition & 1 deletion telegram/message/entity/format_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ func TestBuilder_GrowText(t *testing.T) {
)

b.GrowText(100)
a.Equal(100, b.message.Cap())
a.LessOrEqual(100, b.message.Cap())
}

func TestBuilder_GrowEntities(t *testing.T) {
Expand Down
2 changes: 1 addition & 1 deletion telegram/migrate_to_dc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func newMigrationClient(t *testing.T, h migrationTestHandler) *Client {
session: pool.NewSyncSession(pool.Session{
DC: 2,
}),
connBackoff: defaultBackoff(clock.System),
newConnBackoff: defaultBackoff(clock.System),
ctx: context.Background(),
cancel: func() {},
migrationTimeout: 10 * time.Second,
Expand Down
2 changes: 2 additions & 0 deletions telegram/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,8 @@ func defaultBackoff(c clock.Clock) func() backoff.BackOff {
b := backoff.NewExponentialBackOff()
b.Clock = c
b.MaxElapsedTime = 0
b.MaxInterval = time.Second * 5
b.InitialInterval = time.Millisecond * 100
return b
}
}

0 comments on commit 0ad2b33

Please sign in to comment.