Skip to content

Commit

Permalink
feat: Add dead callback for conn
Browse files Browse the repository at this point in the history
  • Loading branch information
KoNekoD committed May 20, 2024
1 parent 0cfc93e commit 1a7623f
Show file tree
Hide file tree
Showing 7 changed files with 18 additions and 3 deletions.
2 changes: 2 additions & 0 deletions telegram/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,6 +89,7 @@ type Client struct {
// Connection factory fields.
create connConstructor // immutable
resolver dcs.Resolver // immutable
onDead func() // immutable
connBackoff func() backoff.BackOff // immutable
defaultMode manager.ConnMode // immutable
connsCounter atomic.Int64
Expand Down Expand Up @@ -159,6 +160,7 @@ func NewClient(appID int, appHash string, opt Options) *Client {
resolver: opt.Resolver,
defaultMode: mode,
connBackoff: opt.ReconnectionBackoff,
onDead: opt.OnDead,
clock: opt.Clock,
device: opt.Device,
migrationTimeout: opt.MigrationTimeout,
Expand Down
4 changes: 3 additions & 1 deletion telegram/conn_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,13 +68,14 @@ func (c *Client) primaryDC(dc int) mtproto.Dialer {
}

func (c *Client) createPrimaryConn(setup manager.SetupCallback) pool.Conn {
return c.createConn(0, c.defaultMode, setup)
return c.createConn(0, c.defaultMode, setup, c.onDead)
}

func (c *Client) createConn(
id int64,
mode manager.ConnMode,
setup manager.SetupCallback,
onDead func(),
) pool.Conn {
opts, s := c.session.Options(c.opts)
opts.Logger = c.log.Named("conn").With(
Expand All @@ -90,6 +91,7 @@ func (c *Client) createConn(
Device: c.device,
Handler: c.asHandler(),
Setup: setup,
OnDead: onDead,
},
)
}
6 changes: 6 additions & 0 deletions telegram/internal/manager/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ type Conn struct {
// This is necessary to transfer auth from previous connection to another DC.
setup SetupCallback // nilable

// onDead is called on connection death.
onDead func()

// Wrappers for external world, like logs or PRNG.
// Should be immutable.
clock clock.Clock // immutable
Expand Down Expand Up @@ -123,6 +126,9 @@ func (c *Conn) Run(ctx context.Context) (err error) {
defer func() {
if err != nil && ctx.Err() == nil {
c.log.Debug("Connection dead", zap.Error(err))
if c.onDead != nil {
c.onDead()
}
}
}()
return c.proto.Run(ctx, func(ctx context.Context) error {
Expand Down
2 changes: 2 additions & 0 deletions telegram/internal/manager/create.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ type ConnOptions struct {
Device DeviceConfig
Handler Handler
Setup SetupCallback
OnDead func()
Backoff func(ctx context.Context) backoff.BackOff
}

Expand Down Expand Up @@ -69,6 +70,7 @@ func CreateConn(
gotConfig: tdsync.NewReady(),
dead: tdsync.NewReady(),
setup: connOpts.Setup,
onDead: connOpts.OnDead,
connBackoff: connOpts.Backoff,
}

Expand Down
2 changes: 1 addition & 1 deletion telegram/migrate_to_dc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,7 @@ func newMigrationClient(t *testing.T, h migrationTestHandler) *Client {
migrationTimeout: 10 * time.Second,
}
client.init()
client.conn = client.createConn(0, manager.ConnModeUpdates, nil)
client.conn = client.createConn(0, manager.ConnModeUpdates, nil, nil)
client.cfg.Store(cfg)
return client
}
Expand Down
2 changes: 2 additions & 0 deletions telegram/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ type Options struct {

// ReconnectionBackoff configures and returns reconnection backoff object.
ReconnectionBackoff func() backoff.BackOff
// OnDead will be called on connection dead.
OnDead func()
// MigrationTimeout configures migration timeout.
MigrationTimeout time.Duration

Expand Down
3 changes: 2 additions & 1 deletion telegram/pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func (c *Client) Pool(max int64) (CloseInvoker, error) {
s := c.session.Load()
return c.createPool(s.DC, max, func() pool.Conn {
id := c.connsCounter.Inc()
return c.createConn(id, manager.ConnModeData, nil)
return c.createConn(id, manager.ConnModeData, nil, c.onDead)
})
}

Expand Down Expand Up @@ -87,6 +87,7 @@ func (c *Client) dc(ctx context.Context, dcID int, max int64, dialer mtproto.Dia
DC: dcID,
Device: c.device,
Handler: c.asHandler(),
OnDead: c.onDead,
},
)
})
Expand Down

0 comments on commit 1a7623f

Please sign in to comment.