Skip to content

Commit

Permalink
Gracefully shutdown app
Browse files Browse the repository at this point in the history
  • Loading branch information
artjoma committed Dec 9, 2023
1 parent 38519f6 commit c2446e4
Show file tree
Hide file tree
Showing 3 changed files with 40 additions and 13 deletions.
23 changes: 16 additions & 7 deletions engine/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package engine

import (
"bytes"
"context"
"errors"
"github.com/cockroachdb/pebble"
"log/slog"
Expand All @@ -20,9 +21,10 @@ type Engine struct {
commandChan chan ProcessorCommand
fileUploader *FileUploader
fileDownloader *FileDownloader
appCtx context.Context
}

func NewEngine(tempFolder string, db *mosaicdb.Database, dataStorage *storage.Storage,
func NewEngine(appCtx context.Context, tempFolder string, db *mosaicdb.Database, dataStorage *storage.Storage,
uploadWorkersCount int) (*Engine, error) {
// create folder if not exists
err := os.MkdirAll(tempFolder, 0774)
Expand All @@ -33,6 +35,7 @@ func NewEngine(tempFolder string, db *mosaicdb.Database, dataStorage *storage.St
db: db,
storage: dataStorage,
fileDownloader: NewFileDownloader(dataStorage),
appCtx: appCtx,
}
engine.fileUploader = NewFileUploader(engine, uploadWorkersCount)
// prepare peers clients
Expand All @@ -58,13 +61,19 @@ func (engine *Engine) ExecuteCmdAsync(cmd ProcessorCommand) {
// Try to prepare command/finalize command before/after putting command to queue
// Thinking twice before add new logic
func (engine *Engine) startCommandListener() {
for cmd := range engine.commandChan {
now := time.Now()
if err := cmd.Execute(); err != nil {
slog.Error("Failed to execute", "cmd", cmd.getCmdId(), "err", err.Error())
}
for {
select {
case <-engine.appCtx.Done():
slog.Info("Finalize command listener")
return
case cmd := <-engine.commandChan:
now := time.Now()
if err := cmd.Execute(); err != nil {
slog.Error("Failed to execute", "cmd", cmd.getCmdId(), "err", err.Error())
}

slog.Info("Processor", "cmd", cmd.getCmdId(), "took(ms)", time.Since(now).Milliseconds())
slog.Info("Processor", "cmd", cmd.getCmdId(), "took(ms)", time.Since(now).Milliseconds())
}
}
}

Expand Down
2 changes: 2 additions & 0 deletions examples/test_0.sh
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
set -x

# prepare folders
mkdir -p download
# download files after fresh setup
Expand Down
28 changes: 22 additions & 6 deletions main.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package main

import (
"context"
"github.com/caarlos0/env/v10"
"log/slog"
"mosaic/api"
Expand All @@ -9,6 +10,9 @@ import (
"mosaic/storage"
"mosaic/types"
"mosaic/utils"
"os"
"os/signal"
"syscall"
)

func main() {
Expand All @@ -17,18 +21,30 @@ func main() {
appCfg := &types.AppConfig{}
utils.PanicIfErr(env.Parse(appCfg))
slog.Info("Config ok.")
setup(appCfg)
sigs := make(chan os.Signal, 1) // we need to reserve to buffer size 1, so the notifier are not blocked
signal.Notify(sigs, os.Interrupt, syscall.SIGTERM)
cancelCtx := setup(appCfg)

<-sigs
cancelCtx()
}

func setup(appCfg *types.AppConfig) {
func setup(appCfg *types.AppConfig) context.CancelFunc {
database := mosaicdb.NewDatabase(appCfg.DbPath)
storage := storage.NewStorage()
engine, err := engine.NewEngine(appCfg.FilesTempFolder, database, storage, int(appCfg.FileUploadWorkersCount))
appCtx, cancel := context.WithCancel(context.Background())
engine, err := engine.NewEngine(appCtx, appCfg.FilesTempFolder, database, storage, int(appCfg.FileUploadWorkersCount))
if err != nil {
panic(err)
}

// setup http API
httpApi := api.NewHttpApi(appCfg.ApiHttpAddress, appCfg.ApiHttpPort, engine)
// block main thread
httpApi.SetupHttpServer()
// TODO gracefully shutdown using OS signals
go func() {
// block main thread
httpApi.SetupHttpServer()
}()

return cancel

}

0 comments on commit c2446e4

Please sign in to comment.