Skip to content

Commit

Permalink
Implement a clean close of rrd database (#1923)
Browse files Browse the repository at this point in the history
Also fix corrupt db files by deleting the corrupt db.
The detection is done indirectly by running an integrity check in a
separate process, because the corrupt db causes a SIGBUS.
  • Loading branch information
muhamadazmy committed Mar 10, 2023
1 parent 2e87481 commit 3f9ef19
Show file tree
Hide file tree
Showing 3 changed files with 81 additions and 4 deletions.
64 changes: 60 additions & 4 deletions cmds/modules/provisiond/main.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package provisiond

import (
"bytes"
"context"
"crypto/ed25519"
"fmt"
"os"
"os/exec"
"path/filepath"
"time"

Expand Down Expand Up @@ -66,27 +69,78 @@ var Module cli.Command = cli.Command{
Usage: "connection string to the message `BROKER`",
Value: "unix:///var/run/redis.sock",
},
&cli.StringFlag{
Name: "http",
Usage: "http listen address",
Value: ":2021",
&cli.BoolFlag{
Name: "integrity",
Usage: "run some integrity checks on some files",
},
},
Action: action,
}

// integrityChecks are started in a separate process because
// we found out that some weird db corruption causing the process
// to receive a SIGBUS error
// while we can catch the sigbus and handle it ourselves i thought
// it's better to do it in a separate process to always have a clean
// state
func integrityChecks(ctx context.Context, rootDir string) error {
err := ReportChecks(filepath.Join(rootDir, metricsStorageDB))

if err != nil {
fmt.Fprintln(os.Stderr, err)
}

return err
}

// runChecks re-executes the current binary (os.Args[0]) with the special flag
// `--integrity`, which runs some checks in a child process, and waits at most
// 30 seconds for it to finish.
//
// It returns nil when the child exits successfully. If ctx is canceled while
// the child is running, context.Canceled is returned and the db is left
// untouched. Any other failure (non-zero exit, death by signal, timeout) is
// treated as a failed check: the child's stderr is logged and the metrics db
// under rootDir is removed, in which case the returned error is the result of
// that removal.
func runChecks(ctx context.Context, rootDir string) error {
	log.Info().Msg("run integrity checks")
	checkCtx, cancel := context.WithTimeout(ctx, 30*time.Second)
	defer cancel()

	cmd := exec.CommandContext(checkCtx, os.Args[0], "--root", rootDir, "--integrity")
	var buf bytes.Buffer
	cmd.Stderr = &buf

	// Run is the only call needed here: CombinedOutput refuses to start a
	// command whose Stderr is already set, so calling it first did nothing.
	err := cmd.Run()
	if err == nil {
		return nil
	}

	// When the context ends, the child is killed and Run reports that as an
	// exit error, not as the context error. Check the caller's context as
	// well, so a shutdown request is not mistaken for a corrupt db.
	if err == context.Canceled || ctx.Err() == context.Canceled {
		return context.Canceled
	}

	log.Error().Str("stderr", buf.String()).Err(err).Msg("integrity check failed, resetting rrd db")

	// other error, we can try to clean up and continue
	return os.RemoveAll(filepath.Join(rootDir, metricsStorageDB))
}

func action(cli *cli.Context) error {
var (
msgBrokerCon string = cli.String("broker")
rootDir string = cli.String("root")
integrity bool = cli.Bool("integrity")
)

ctx, _ := utils.WithSignal(context.Background())

if integrity {
return integrityChecks(ctx, rootDir)
}

utils.OnDone(ctx, func(_ error) {
log.Info().Msg("shutting down")
})

// run integrityChecks
if err := runChecks(ctx, rootDir); err != nil {
return errors.Wrap(err, "error running integrity checks")
}

// keep checking if limited-cache flag is set
if app.CheckFlag(app.LimitedCache) {
log.Error().Msg("failed cache reservation! Retrying every 30 seconds...")
Expand Down Expand Up @@ -365,6 +419,8 @@ func action(cli *cli.Context) error {

// also spawn the capacity reporter
go func() {
defer reporter.Close()

for {
err := reporter.Run(ctx)
if err == context.Canceled {
Expand Down
14 changes: 14 additions & 0 deletions cmds/modules/provisiond/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,15 @@ func reportBuilder() interface{} {
return &Report{}
}

// ReportChecks opens the metrics database at metricsPath and closes it again.
// It returns a wrapped error if the database cannot be created/opened,
// otherwise the result of closing it.
func ReportChecks(metricsPath string) error {
	const (
		window    = 5 * time.Minute
		retention = 24 * time.Hour
	)

	db, err := rrd.NewRRDBolt(metricsPath, window, retention)
	if err != nil {
		return errors.Wrap(err, "failed to create metrics database")
	}

	return db.Close()
}

// NewReporter creates a new capacity reporter
func NewReporter(metricsPath string, cl zbus.Client, root string) (*Reporter, error) {
idMgr := stubs.NewIdentityManagerStub(cl)
Expand Down Expand Up @@ -284,6 +293,11 @@ func (r *Reporter) getLastReportTime() (int64, bool, error) {
return int64(stored), ok, nil
}

// Close closes the reporter's rrd store and then its queue.
// Closing is best-effort: the errors returned by both Close calls are
// intentionally discarded.
func (r *Reporter) Close() {
	// Operands are evaluated left to right, so rrd is closed before queue.
	_, _ = r.rrd.Close(), r.queue.Close()
}

// Run runs the reporter
func (r *Reporter) Run(ctx context.Context) error {
// go over all user reservations
Expand Down
7 changes: 7 additions & 0 deletions pkg/rrd/rrd.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ type RRD interface {
// Last returns the last reported value for a metric given the metric
// name
Last(key string) (value float64, ok bool, err error)

// Close the db
Close() error
}

type Printer interface {
Expand Down Expand Up @@ -86,6 +89,10 @@ func newRRDBolt(path string, window time.Duration, retention time.Duration) (*rr
}, nil
}

// Close closes the underlying database handle (r.db) and returns whatever
// error that close reports.
func (r *rrdBolt) Close() error {
	return r.db.Close()
}

func (r *rrdBolt) printBucket(bucket *bolt.Bucket, out io.Writer) error {
cur := bucket.Cursor()
for k, v := cur.First(); k != nil; k, v = cur.Next() {
Expand Down

0 comments on commit 3f9ef19

Please sign in to comment.