From ccfc60a1db3fbb7c084f2242df07048c156c1def Mon Sep 17 00:00:00 2001 From: Aleksandr Razumov Date: Thu, 28 Nov 2024 13:14:39 +0300 Subject: [PATCH 1/5] feat(telegram): faster reconnects --- telegram/client.go | 23 ++++++++++++----------- telegram/connect.go | 12 +++++++++++- telegram/connect_test.go | 2 +- telegram/migrate_to_dc_test.go | 2 +- telegram/options.go | 2 ++ 5 files changed, 27 insertions(+), 14 deletions(-) diff --git a/telegram/client.go b/telegram/client.go index 3f9c4c85eb..72b1c82370 100644 --- a/telegram/client.go +++ b/telegram/client.go @@ -59,12 +59,12 @@ type Client struct { // Ref: https://pkg.go.dev/sync/atomic#pkg-note-BUG // Connection factory fields. - connsCounter atomic.Int64 - create connConstructor // immutable - resolver dcs.Resolver // immutable - onDead func() // immutable - connBackoff func() backoff.BackOff // immutable - defaultMode manager.ConnMode // immutable + connsCounter atomic.Int64 + create connConstructor // immutable + resolver dcs.Resolver // immutable + onDead func() // immutable + newConnBackoff func() backoff.BackOff // immutable + defaultMode manager.ConnMode // immutable // Migration state. migrationTimeout time.Duration // immutable @@ -90,10 +90,11 @@ type Client struct { testDC bool // immutable // Connection state. Guarded by connMux. - session *pool.SyncSession - cfg *manager.AtomicConfig - conn clientConn - connMux sync.Mutex + session *pool.SyncSession + cfg *manager.AtomicConfig + conn clientConn + connBackoff backoff.BackOff + connMux sync.Mutex // Restart signal channel. restart chan struct{} // immutable @@ -160,7 +161,7 @@ func NewClient(appID int, appHash string, opt Options) *Client { create: defaultConstructor(), resolver: opt.Resolver, defaultMode: mode, - connBackoff: opt.ReconnectionBackoff, + newConnBackoff: opt.ReconnectionBackoff, onDead: opt.OnDead, clock: opt.Clock, device: opt.Device, diff --git a/telegram/connect.go b/telegram/connect.go index f93ebbaf22..73b7065f45 100644 --- a/telegram/connect.go +++ b/telegram/connect.go @@ -60,7 +60,10 @@ func (c *Client) isPermanentError(err error) bool { func (c *Client) reconnectUntilClosed(ctx context.Context) error { // Note that we currently have no timeout on connection, so this is // potentially eternal. - b := tdsync.SyncBackoff(backoff.WithContext(c.connBackoff(), ctx)) + b := tdsync.SyncBackoff(backoff.WithContext(c.newConnBackoff(), ctx)) + c.connMux.Lock() + c.connBackoff = b + c.connMux.Unlock() return backoff.RetryNotify(func() error { if err := c.runUntilRestart(ctx); err != nil { @@ -83,6 +86,13 @@ func (c *Client) reconnectUntilClosed(ctx context.Context) error { func (c *Client) onReady() { c.log.Debug("Ready") c.ready.Signal() + + c.connMux.Lock() + if b := c.connBackoff; b != nil { + // Reconnect faster next time. + b.Reset() + } + c.connMux.Unlock() } func (c *Client) resetReady() { diff --git a/telegram/connect_test.go b/telegram/connect_test.go index 4166cf6284..8d6aaf82c6 100644 --- a/telegram/connect_test.go +++ b/telegram/connect_test.go @@ -33,7 +33,7 @@ func (m fingerprintNotFoundConn) Ready() <-chan struct{} { func TestClient_reconnectUntilClosed(t *testing.T) { client := Client{ - connBackoff: func() backoff.BackOff { + newConnBackoff: func() backoff.BackOff { return backoff.NewConstantBackOff(time.Nanosecond) }, log: zap.NewNop(), diff --git a/telegram/migrate_to_dc_test.go b/telegram/migrate_to_dc_test.go index 66ff791b8e..a4b9fd39c4 100644 --- a/telegram/migrate_to_dc_test.go +++ b/telegram/migrate_to_dc_test.go @@ -110,7 +110,7 @@ func newMigrationClient(t *testing.T, h migrationTestHandler) *Client { session: pool.NewSyncSession(pool.Session{ DC: 2, }), - connBackoff: defaultBackoff(clock.System), + newConnBackoff: defaultBackoff(clock.System), ctx: context.Background(), cancel: func() {}, migrationTimeout: 10 * time.Second, diff --git a/telegram/options.go b/telegram/options.go index be64f23514..9c271fd149 100644 --- a/telegram/options.go +++ b/telegram/options.go @@ -156,6 +156,8 @@ func defaultBackoff(c clock.Clock) func() backoff.BackOff { b := backoff.NewExponentialBackOff() b.Clock = c b.MaxElapsedTime = 0 + b.MaxInterval = time.Second * 5 + b.InitialInterval = time.Millisecond * 100 return b } } From db21b9f08f556069070572040a987b6b742898bd Mon Sep 17 00:00:00 2001 From: Aleksandr Razumov Date: Thu, 28 Nov 2024 13:14:43 +0300 Subject: [PATCH 2/5] chore(deps): update --- examples/go.mod | 2 +- examples/go.sum | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/examples/go.mod b/examples/go.mod index cdaf8c4005..3a37ddd9f3 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -33,7 +33,7 @@ require ( github.com/golang/snappy v0.0.4 // indirect github.com/gotd/ige v0.2.2 // indirect github.com/gotd/neo v0.1.5 // indirect - github.com/klauspost/compress v1.17.10 // indirect + github.com/klauspost/compress v1.17.11 // indirect github.com/kr/pretty v0.3.1 // indirect github.com/kr/text v0.2.0 // indirect github.com/pkg/errors v0.9.1 // indirect diff --git a/examples/go.sum b/examples/go.sum index e1e6709cdc..60323b375f 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -52,6 +52,7 @@ github.com/klauspost/compress v1.17.8 h1:YcnTYrq7MikUT7k0Yb5eceMmALQPYBW/Xltxn0N github.com/klauspost/compress v1.17.8/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/klauspost/compress v1.17.9/go.mod h1:Di0epgTjJY877eYKx5yC51cX2A2Vl2ibi7bDH9ttBbw= github.com/klauspost/compress v1.17.10/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= +github.com/klauspost/compress v1.17.11/go.mod h1:pMDklpSncoRMuLFrf1W9Ss9KT+0rH90U12bZKk7uwG0= github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= github.com/kr/pretty v0.3.1/go.mod h1:hoEshYVHaxMs3cyo3Yncou5ZscifuDolrwPKZanG3xk= github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= From ac1f960feb71b079a19a388615f9a2dc8229a59d Mon Sep 17 00:00:00 2001 From: Aleksandr Razumov Date: Thu, 28 Nov 2024 13:22:27 +0300 Subject: [PATCH 3/5] fix(telegram): handle auth errors on reconnect Ref #1458 --- telegram/connect.go | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/telegram/connect.go b/telegram/connect.go index 73b7065f45..5da1948392 100644 --- a/telegram/connect.go +++ b/telegram/connect.go @@ -12,6 +12,7 @@ import ( "github.com/gotd/td/exchange" "github.com/gotd/td/tdsync" "github.com/gotd/td/telegram/auth" + "github.com/gotd/td/tgerr" ) func (c *Client) runUntilRestart(ctx context.Context) error { @@ -54,7 +55,17 @@ func (c *Client) runUntilRestart(ctx context.Context) error { } func (c *Client) isPermanentError(err error) bool { - return errors.Is(err, exchange.ErrKeyFingerprintNotFound) + // See https://github.com/gotd/td/issues/1458. + if errors.Is(err, exchange.ErrKeyFingerprintNotFound) { + return true + } + if tgerr.Is(err, "AUTH_KEY_UNREGISTERED", "SESSION_EXPIRED") { + return true + } + if auth.IsUnauthorized(err) { + return true + } + return false } func (c *Client) reconnectUntilClosed(ctx context.Context) error { From b5ad1c68671d219574b3efc212db248c8665d2ed Mon Sep 17 00:00:00 2001 From: Aleksandr Razumov Date: Thu, 28 Nov 2024 13:31:30 +0300 Subject: [PATCH 4/5] fix(telegram): use atomic pointer for connBackoff --- telegram/client.go | 2 +- telegram/connect.go | 10 +++------- 2 files changed, 4 insertions(+), 8 deletions(-) diff --git a/telegram/client.go b/telegram/client.go index 72b1c82370..32ffa4eaf1 100644 --- a/telegram/client.go +++ b/telegram/client.go @@ -93,7 +93,7 @@ type Client struct { session *pool.SyncSession cfg *manager.AtomicConfig conn clientConn - connBackoff backoff.BackOff + connBackoff atomic.Pointer[backoff.BackOff] connMux sync.Mutex // Restart signal channel. diff --git a/telegram/connect.go b/telegram/connect.go index 5da1948392..3b359dbc3d 100644 --- a/telegram/connect.go +++ b/telegram/connect.go @@ -72,9 +72,7 @@ func (c *Client) reconnectUntilClosed(ctx context.Context) error { // Note that we currently have no timeout on connection, so this is // potentially eternal. b := tdsync.SyncBackoff(backoff.WithContext(c.newConnBackoff(), ctx)) - c.connMux.Lock() - c.connBackoff = b - c.connMux.Unlock() + c.connBackoff.Store(&b) return backoff.RetryNotify(func() error { if err := c.runUntilRestart(ctx); err != nil { @@ -98,12 +96,10 @@ func (c *Client) onReady() { c.log.Debug("Ready") c.ready.Signal() - c.connMux.Lock() - if b := c.connBackoff; b != nil { + if b := c.connBackoff.Load(); b != nil { // Reconnect faster next time. - b.Reset() + (*b).Reset() } - c.connMux.Unlock() } func (c *Client) resetReady() { From b3e36980a27002d0d58b98bb346e243000a2f448 Mon Sep 17 00:00:00 2001 From: Aleksandr Razumov Date: Thu, 28 Nov 2024 13:33:11 +0300 Subject: [PATCH 5/5] test(entity): fix --- telegram/message/entity/format_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/telegram/message/entity/format_test.go b/telegram/message/entity/format_test.go index 2a42d833a8..c20649ef33 100644 --- a/telegram/message/entity/format_test.go +++ b/telegram/message/entity/format_test.go @@ -54,7 +54,7 @@ func TestBuilder_GrowText(t *testing.T) { ) b.GrowText(100) - a.Equal(100, b.message.Cap()) + a.LessOrEqual(100, b.message.Cap()) } func TestBuilder_GrowEntities(t *testing.T) {