Skip to content

Commit

Permalink
services/horizon: Add cache toggle and use libary for on-disk caching (
Browse files Browse the repository at this point in the history
…#5197)

* Add a `--history-archive-caching` flag (default=true) to toggle behavior
* Refactor to use a library: fscache
* Hook new metric into prometheus
* Add parallel read test to stress cache
* Add tests for deadlocking and other misc. scenarios
  • Loading branch information
Shaptic authored Feb 8, 2024
1 parent 4ef82e9 commit 02cd784
Show file tree
Hide file tree
Showing 17 changed files with 339 additions and 298 deletions.
5 changes: 4 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ require (
github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2
github.com/aws/aws-sdk-go v1.45.26
github.com/creachadair/jrpc2 v1.1.0
github.com/djherbis/fscache v0.10.1
github.com/elazarl/go-bindata-assetfs v1.0.1
github.com/getsentry/raven-go v0.2.0
github.com/go-chi/chi v4.1.2+incompatible
Expand Down Expand Up @@ -84,6 +85,8 @@ require (
golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20230913181813-007df8e322eb // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20230920204549-e6e6cdab5c13 // indirect
gopkg.in/djherbis/atime.v1 v1.0.0 // indirect
gopkg.in/djherbis/stream.v1 v1.3.1 // indirect
gopkg.in/ini.v1 v1.67.0 // indirect
)

Expand All @@ -102,7 +105,7 @@ require (
github.com/google/go-cmp v0.5.9 // indirect
github.com/google/go-querystring v0.0.0-20160401233042-9235644dd9e5 // indirect
github.com/googleapis/gax-go/v2 v2.12.0 // indirect
github.com/hashicorp/golang-lru v1.0.2
github.com/hashicorp/golang-lru v1.0.2 // indirect
github.com/imkira/go-interpol v1.1.0 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/jmespath/go-jmespath v0.4.0 // indirect
Expand Down
6 changes: 6 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/djherbis/fscache v0.10.1 h1:hDv+RGyvD+UDKyRYuLoVNbuRTnf2SrA2K3VyR1br9lk=
github.com/djherbis/fscache v0.10.1/go.mod h1:yyPYtkNnnPXsW+81lAcQS6yab3G2CRfnPLotBvtbf0c=
github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
Expand Down Expand Up @@ -801,6 +803,10 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/djherbis/atime.v1 v1.0.0 h1:eMRqB/JrLKocla2PBPKgQYg/p5UG4L6AUAs92aP7F60=
gopkg.in/djherbis/atime.v1 v1.0.0/go.mod h1:hQIUStKmJfvf7xdh/wtK84qe+DsTV5LnA9lzxxtPpJ8=
gopkg.in/djherbis/stream.v1 v1.3.1 h1:uGfmsOY1qqMjQQphhRBSGLyA9qumJ56exkRu9ASTjCw=
gopkg.in/djherbis/stream.v1 v1.3.1/go.mod h1:aEV8CBVRmSpLamVJfM903Npic1IKmb2qS30VAZ+sssg=
gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/gavv/httpexpect.v1 v1.0.0-20170111145843-40724cf1e4a0 h1:r5ptJ1tBxVAeqw4CrYWhXIMr0SybY3CDHuIbCg5CFVw=
Expand Down
128 changes: 107 additions & 21 deletions historyarchive/archive.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,12 +11,15 @@ import (
"fmt"
"io"
"net/url"
"os"
"path"
"regexp"
"strconv"
"strings"
"sync"
"time"

fscache "github.com/djherbis/fscache"
log "github.com/sirupsen/logrus"

"github.com/stellar/go/support/errors"
Expand Down Expand Up @@ -50,8 +53,8 @@ type ConnectOptions struct {
CheckpointFrequency uint32
// UserAgent is the value of `User-Agent` header. Applicable only for HTTP client.
UserAgent string
// CacheConfig controls how/if bucket files are cached on the disk.
CacheConfig CacheOptions
// CachePath controls where/if bucket files are cached on the disk.
CachePath string
}

type Ledger struct {
Expand Down Expand Up @@ -117,8 +120,16 @@ type Archive struct {
checkpointManager CheckpointManager

backend ArchiveBackend
cache *ArchiveBucketCache
stats archiveStats

cache *archiveBucketCache
}

type archiveBucketCache struct {
fscache.Cache

path string
sizes sync.Map
}

func (arch *Archive) GetStats() []ArchiveStats {
Expand Down Expand Up @@ -395,23 +406,79 @@ func (a *Archive) GetXdrStream(pth string) (*XdrStream, error) {
}

func (a *Archive) cachedGet(pth string) (io.ReadCloser, error) {
if a.cache != nil {
rdr, foundInCache, err := a.cache.GetFile(pth, a.backend)
if !foundInCache {
a.stats.incrementDownloads()
} else {
a.stats.incrementCacheHits()
}
if err == nil {
return rdr, nil
if a.cache == nil {
a.stats.incrementDownloads()
return a.backend.GetFile(pth)
}

L := log.WithField("path", pth).WithField("cache", a.cache.path)

rdr, wrtr, err := a.cache.Get(pth)
if err != nil {
L.WithError(err).
WithField("remove", a.cache.Remove(pth)).
Warn("On-disk cache retrieval failed")
a.stats.incrementDownloads()
return a.backend.GetFile(pth)
}

// If a NEW key is being retrieved, it returns a writer to which
// you're expected to write your upstream as well as a reader that
// will read directly from it.
if wrtr != nil {
log.WithField("path", pth).Info("Caching file...")
a.stats.incrementDownloads()
upstreamReader, err := a.backend.GetFile(pth)
if err != nil {
writeErr := wrtr.Close()
readErr := rdr.Close()
removeErr := a.cache.Remove(pth)
// Execution order isn't guaranteed w/in a function call expression
// so we close them with explicit order first.
L.WithError(err).WithFields(log.Fields{
"write-close": writeErr,
"read-close": readErr,
"cache-rm": removeErr,
}).Warn("Download failed, purging from cache")
return nil, err
}

// If there's an error, retry with the uncached backend.
a.cache.Evict(pth)
// Start a goroutine to slurp up the upstream and feed
// it directly to the cache.
go func() {
written, err := io.Copy(wrtr, upstreamReader)
writeErr := wrtr.Close()
readErr := upstreamReader.Close()
fields := log.Fields{
"wr-close": writeErr,
"rd-close": readErr,
}

if err != nil {
L.WithFields(fields).WithError(err).
Warn("Failed to download and cache file")

// Removal must happen *after* handles close.
if removalErr := a.cache.Remove(pth); removalErr != nil {
L.WithError(removalErr).Warn("Removing cached file failed")
}
} else {
L.WithFields(fields).Infof("Cached %dKiB file", written/1024)

// Track how much bandwidth we've saved from caching by saving
// the size of the file we just downloaded.
a.cache.sizes.Store(pth, written)
}
}()
} else {
// Best-effort check to track bandwidth metrics
if written, found := a.cache.sizes.Load(pth); found {
a.stats.incrementCacheBandwidth(written.(int64))
}
a.stats.incrementCacheHits()
}

a.stats.incrementDownloads()
return a.backend.GetFile(pth)
return rdr, nil
}

func (a *Archive) cachedExists(pth string) (bool, error) {
Expand Down Expand Up @@ -468,6 +535,8 @@ func Connect(u string, opts ConnectOptions) (*Archive, error) {
arch.backend = makeHttpBackend(parsed, opts)
} else if parsed.Scheme == "mock" {
arch.backend = makeMockBackend(opts)
} else if parsed.Scheme == "fmock" {
arch.backend = makeFailingMockBackend(opts)
} else {
err = errors.New("unknown URL scheme: '" + parsed.Scheme + "'")
}
Expand All @@ -476,13 +545,30 @@ func Connect(u string, opts ConnectOptions) (*Archive, error) {
return &arch, err
}

if opts.CacheConfig.Cache {
cache, innerErr := MakeArchiveBucketCache(opts.CacheConfig)
if innerErr != nil {
return &arch, innerErr
if opts.CachePath != "" {
// Set up a <= ~10GiB LRU cache for history archives files
haunter := fscache.NewLRUHaunterStrategy(
fscache.NewLRUHaunter(0, 10<<30, time.Minute /* frequency check */),
)

// Wipe any existing cache on startup
os.RemoveAll(opts.CachePath)
fs, err := fscache.NewFs(opts.CachePath, 0755 /* drwxr-xr-x */)

if err != nil {
return &arch, errors.Wrapf(err,
"creating cache at '%s' with mode 0755 failed",
opts.CachePath)
}

cache, err := fscache.NewCacheWithHaunter(fs, haunter)
if err != nil {
return &arch, errors.Wrapf(err,
"creating cache at '%s' failed",
opts.CachePath)
}

arch.cache = cache
arch.cache = &archiveBucketCache{cache, opts.CachePath, sync.Map{}}
}

arch.stats = archiveStats{backendName: parsed.String()}
Expand Down
Loading

0 comments on commit 02cd784

Please sign in to comment.