Skip to content

Commit

Permalink
fix: structured concurrency
Browse files Browse the repository at this point in the history
  • Loading branch information
ernado committed Apr 15, 2023
1 parent f6d112d commit 665ad92
Show file tree
Hide file tree
Showing 8 changed files with 39 additions and 37 deletions.
2 changes: 1 addition & 1 deletion .golangci.yml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ linters:
enable:
- depguard
- dogsled
- dupl
# - dupl
- errcheck
- gochecknoinits
- goconst
Expand Down
3 changes: 3 additions & 0 deletions telegram/updates/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,9 @@ func (cfg *Config) setDefaults() {
if cfg.TracerProvider == nil {
cfg.TracerProvider = trace.NewNoopTracerProvider()
}
if cfg.Storage == nil {
cfg.Storage = newMemStorage()
}
if cfg.OnChannelTooLong == nil {
cfg.OnChannelTooLong = func(channelID int64) {
cfg.Logger.Error("Difference too long", zap.Int64("channel_id", channelID))
Expand Down
2 changes: 1 addition & 1 deletion telegram/updates/internal/e2e/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func testManager(t *testing.T, f func(s *server, storage updates.StateStorage) c
})

t.Log("Waiting for shutdown")
require.NoError(t, g.Wait())
require.ErrorIs(t, g.Wait(), context.Canceled)

t.Log("Checking")
require.Equal(t, s.messages, h.messages)
Expand Down
6 changes: 0 additions & 6 deletions telegram/updates/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -157,12 +157,6 @@ func (m *Manager) Run(ctx context.Context, api API, userID int64, opt AuthOption
wg.Go(func() error {
return m.state.Run(ctx)
})
wg.Go(func() error {
<-ctx.Done()
lg.Debug("Stopping")
m.state.Stop()
return nil
})
lg.Debug("Wait")
return wg.Wait()
}
Expand Down
6 changes: 6 additions & 0 deletions telegram/updates/sequence_box.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,9 @@ func newSequenceBox(cfg sequenceConfig) *sequenceBox {
}

func (s *sequenceBox) Handle(ctx context.Context, u update) error {
ctx, span := s.tracer.Start(ctx, "sequenceBox.Handle")
defer span.End()

log := s.log.With(zap.Int("upd_from", u.start()), zap.Int("upd_to", u.end()))
if checkGap(s.state, u.State, u.Count) == gapIgnore {
log.Debug("Outdated update, skipping", zap.Int("internalState", s.state))
Expand Down Expand Up @@ -110,6 +113,9 @@ func (s *sequenceBox) Handle(ctx context.Context, u update) error {
}

func (s *sequenceBox) applyPending(ctx context.Context) error {
ctx, span := s.tracer.Start(ctx, "sequenceBox.applyPending")
defer span.End()

sort.SliceStable(s.pending, func(i, j int) bool {
return s.pending[i].start() < s.pending[j].start()
})
Expand Down
24 changes: 7 additions & 17 deletions telegram/updates/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,7 @@ func newState(ctx context.Context, cfg stateConfig) *internalState {
state := s.newChannelState(id, info.AccessHash, info.Pts)
s.channels[id] = state
s.wg.Go(func() error {
state.Run(ctx)
return nil
return state.Run(ctx)
})
}

Expand All @@ -138,27 +137,19 @@ func (s *internalState) Push(ctx context.Context, u tg.UpdatesClass) error {
}
}

func (s *internalState) Stop() {
close(s.externalQueue)
for _, c := range s.channels {
c.Stop()
}
}

func (s *internalState) Run(ctx context.Context) error {
s.log.Debug("Starting updates handler")
defer s.log.Debug("Updates handler stopped")
s.getDifferenceLogger(ctx)

for {
select {
case u, ok := <-s.externalQueue:
if !ok {
if len(s.pts.pending) > 0 || len(s.qts.pending) > 0 || len(s.seq.pending) > 0 {
s.getDifferenceLogger(ctx)
}
return nil
case <-ctx.Done():
if len(s.pts.pending) > 0 || len(s.qts.pending) > 0 || len(s.seq.pending) > 0 {
s.getDifferenceLogger(ctx)
}
return ctx.Err()
case u := <-s.externalQueue:
ctx := trace.ContextWithSpanContext(ctx, u.span)
if err := s.handleUpdates(ctx, u.update); err != nil {
s.log.Error("Handle updates error", zap.Error(err))
Expand Down Expand Up @@ -328,8 +319,7 @@ func (s *internalState) handleChannel(ctx context.Context, channelID int64, date
state = s.newChannelState(channelID, accessHash, localPts)
s.channels[channelID] = state
s.wg.Go(func() error {
state.Run(ctx)
return nil
return state.Run(ctx)
})
}

Expand Down
12 changes: 12 additions & 0 deletions telegram/updates/state_apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package updates
import (
"context"

"go.opentelemetry.io/otel/trace"
"go.uber.org/zap"

"github.com/gotd/td/tg"
Expand Down Expand Up @@ -33,6 +34,9 @@ func (s *internalState) applySeq(ctx context.Context, state int, updates []updat
}

func (s *internalState) applyCombined(ctx context.Context, comb *tg.UpdatesCombined) (ptsChanged bool, err error) {
ctx, span := s.tracer.Start(ctx, "internalState.applyCombined")
defer span.End()

var (
ents = entities{
Users: comb.Users,
Expand All @@ -56,6 +60,7 @@ func (s *internalState) applyCombined(ctx context.Context, comb *tg.UpdatesCombi
if err := st.Push(ctx, channelUpdate{
update: u,
entities: ents,
span: trace.SpanContextFromContext(ctx),
}); err != nil {
s.log.Error("Push channel update error", zap.Error(err))
}
Expand All @@ -78,6 +83,7 @@ func (s *internalState) applyCombined(ctx context.Context, comb *tg.UpdatesCombi
if err := s.handleChannel(ctx, channelID, comb.Date, pts, ptsCount, channelUpdate{
update: u,
entities: ents,
span: trace.SpanContextFromContext(ctx),
}); err != nil {
s.log.Error("Handle channel update error", zap.Error(err))
}
Expand Down Expand Up @@ -131,6 +137,9 @@ func (s *internalState) applyCombined(ctx context.Context, comb *tg.UpdatesCombi
}

func (s *internalState) applyPts(ctx context.Context, state int, updates []update) error {
ctx, span := s.tracer.Start(ctx, "internalState.applyPts")
defer span.End()

var (
converted []tg.UpdateClass
ents entities
Expand All @@ -157,6 +166,9 @@ func (s *internalState) applyPts(ctx context.Context, state int, updates []updat
}

func (s *internalState) applyQts(ctx context.Context, state int, updates []update) error {
ctx, span := s.tracer.Start(ctx, "internalState.applyQts")
defer span.End()

var (
converted []tg.UpdateClass
ents entities
Expand Down
21 changes: 9 additions & 12 deletions telegram/updates/state_channel.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,31 +95,28 @@ func (s *channelState) Push(ctx context.Context, u channelUpdate) error {
}
}

func (s *channelState) Stop() {
close(s.updates)
}

func (s *channelState) Run(ctx context.Context) {
func (s *channelState) Run(ctx context.Context) error {
// Subscribe to channel updates.
if err := s.getDifference(ctx); err != nil {
s.log.Error("Failed to subscribe to channel updates", zap.Error(err))
}

for {
select {
case u, ok := <-s.updates:
if !ok {
if len(s.pts.pending) > 0 {
s.getDifferenceLogger(ctx)
}
return
}
case u := <-s.updates:
ctx := trace.ContextWithSpanContext(ctx, u.span)
if err := s.handleUpdate(ctx, u.update, u.entities); err != nil {
s.log.Error("Handle update error", zap.Error(err))
}
case <-s.pts.gapTimeout.C:
s.log.Debug("Gap timeout")
s.getDifferenceLogger(ctx)
case <-ctx.Done():
if len(s.pts.pending) > 0 {
// This will probably fail.
s.getDifferenceLogger(ctx)
}
return ctx.Err()
case <-s.idleTimeout.C:
s.log.Debug("Idle timeout")
s.resetIdleTimer()
Expand Down

0 comments on commit 665ad92

Please sign in to comment.