Skip to content

Commit

Permalink
Better shutdown handling
Browse files Browse the repository at this point in the history
  • Loading branch information
ericvolp12 committed Oct 6, 2024
1 parent d420dfe commit 9d5cb2c
Show file tree
Hide file tree
Showing 2 changed files with 52 additions and 6 deletions.
42 changes: 37 additions & 5 deletions cmd/jetstream/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -332,17 +332,49 @@ func Jetstream(cctx *cli.Context) error {
}

log.Info("shutting down, waiting for workers to clean up...")

close(shutdownRepoStream)
close(shutdownLivenessChecker)
close(shutdownCursorManager)
close(shutdownEcho)
close(shutdownMetrics)

<-repoStreamShutdown
<-livenessCheckerShutdown
<-cursorManagerShutdown
<-echoShutdown
<-metricsShutdown
shutdownTimeout := time.After(10 * time.Second)

select {
case <-repoStreamShutdown:
log.Info("Repo stream shutdown completed")
case <-shutdownTimeout:
log.Warn("Shutdown timeout reached for repo stream")
}

select {
case <-livenessCheckerShutdown:
log.Info("Liveness checker shutdown completed")
case <-shutdownTimeout:
log.Warn("Shutdown timeout reached for liveness checker")
}

select {
case <-cursorManagerShutdown:
log.Info("Cursor manager shutdown completed")
case <-shutdownTimeout:
log.Warn("Shutdown timeout reached for cursor manager")
}

select {
case <-echoShutdown:
log.Info("Echo shutdown completed")
case <-shutdownTimeout:
log.Warn("Shutdown timeout reached for echo server")
}

select {
case <-metricsShutdown:
log.Info("Metrics shutdown completed")
case <-shutdownTimeout:
log.Warn("Shutdown timeout reached for metrics server")
}

c.Shutdown()

Expand Down
16 changes: 15 additions & 1 deletion pkg/consumer/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,13 @@ func NewConsumer(
func (c *Consumer) HandleStreamEvent(ctx context.Context, xe *events.XRPCStreamEvent) error {
ctx, span := tracer.Start(ctx, "HandleStreamEvent")
defer span.End()

select {
case <-ctx.Done():
return ctx.Err()
default:
}

switch {
case xe.RepoCommit != nil:
eventsProcessedCounter.WithLabelValues("commit", c.SocketURL).Inc()
Expand Down Expand Up @@ -373,7 +380,14 @@ func (c *Consumer) RunSequencer(ctx context.Context) error {
}

func (c *Consumer) Shutdown() {
shutdownTimeout := time.After(10 * time.Second)
shutdown := make(chan struct{})
c.sequencerShutdown <- shutdown
<-shutdown

select {
case <-shutdownTimeout:
c.logger.Warn("sequencer shutdown timed out")
case <-shutdown:
c.logger.Info("sequencer shutdown complete")
}
}

0 comments on commit 9d5cb2c

Please sign in to comment.