diff --git a/go.mod b/go.mod index a7fd2ef71c..17627b7674 100644 --- a/go.mod +++ b/go.mod @@ -12,6 +12,7 @@ require ( github.com/asaskevich/govalidator v0.0.0-20230301143203-a9d515a09cc2 github.com/aws/aws-sdk-go v1.45.26 github.com/creachadair/jrpc2 v1.1.0 + github.com/djherbis/fscache v0.10.1 github.com/elazarl/go-bindata-assetfs v1.0.1 github.com/getsentry/raven-go v0.2.0 github.com/go-chi/chi v4.1.2+incompatible @@ -84,6 +85,8 @@ require ( golang.org/x/xerrors v0.0.0-20220907171357-04be3eba64a2 // indirect google.golang.org/genproto/googleapis/api v0.0.0-20230913181813-007df8e322eb // indirect google.golang.org/genproto/googleapis/rpc v0.0.0-20230920204549-e6e6cdab5c13 // indirect + gopkg.in/djherbis/atime.v1 v1.0.0 // indirect + gopkg.in/djherbis/stream.v1 v1.3.1 // indirect gopkg.in/ini.v1 v1.67.0 // indirect ) @@ -102,7 +105,7 @@ require ( github.com/google/go-cmp v0.5.9 // indirect github.com/google/go-querystring v0.0.0-20160401233042-9235644dd9e5 // indirect github.com/googleapis/gax-go/v2 v2.12.0 // indirect - github.com/hashicorp/golang-lru v1.0.2 + github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/imkira/go-interpol v1.1.0 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/jmespath/go-jmespath v0.4.0 // indirect diff --git a/go.sum b/go.sum index bb1175e120..d2358499b2 100644 --- a/go.sum +++ b/go.sum @@ -104,6 +104,8 @@ github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSs github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= +github.com/djherbis/fscache v0.10.1 h1:hDv+RGyvD+UDKyRYuLoVNbuRTnf2SrA2K3VyR1br9lk= +github.com/djherbis/fscache v0.10.1/go.mod h1:yyPYtkNnnPXsW+81lAcQS6yab3G2CRfnPLotBvtbf0c= github.com/eapache/go-resiliency v1.2.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= @@ -801,6 +803,10 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8 gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20190902080502-41f04d3bba15/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/djherbis/atime.v1 v1.0.0 h1:eMRqB/JrLKocla2PBPKgQYg/p5UG4L6AUAs92aP7F60= +gopkg.in/djherbis/atime.v1 v1.0.0/go.mod h1:hQIUStKmJfvf7xdh/wtK84qe+DsTV5LnA9lzxxtPpJ8= +gopkg.in/djherbis/stream.v1 v1.3.1 h1:uGfmsOY1qqMjQQphhRBSGLyA9qumJ56exkRu9ASTjCw= +gopkg.in/djherbis/stream.v1 v1.3.1/go.mod h1:aEV8CBVRmSpLamVJfM903Npic1IKmb2qS30VAZ+sssg= gopkg.in/errgo.v2 v2.1.0/go.mod h1:hNsd1EY+bozCKY1Ytp96fpM3vjJbqLJn88ws8XvfDNI= gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys= gopkg.in/gavv/httpexpect.v1 v1.0.0-20170111145843-40724cf1e4a0 h1:r5ptJ1tBxVAeqw4CrYWhXIMr0SybY3CDHuIbCg5CFVw= diff --git a/historyarchive/archive.go b/historyarchive/archive.go index d53ab37071..bbd030ffce 100644 --- a/historyarchive/archive.go +++ b/historyarchive/archive.go @@ -11,12 +11,15 @@ import ( "fmt" "io" "net/url" + "os" "path" 
"regexp" "strconv" "strings" "sync" + "time" + fscache "github.com/djherbis/fscache" log "github.com/sirupsen/logrus" "github.com/stellar/go/support/errors" @@ -50,8 +53,8 @@ type ConnectOptions struct { CheckpointFrequency uint32 // UserAgent is the value of `User-Agent` header. Applicable only for HTTP client. UserAgent string - // CacheConfig controls how/if bucket files are cached on the disk. - CacheConfig CacheOptions + // CachePath controls where/if bucket files are cached on the disk. + CachePath string } type Ledger struct { @@ -117,8 +120,16 @@ type Archive struct { checkpointManager CheckpointManager backend ArchiveBackend - cache *ArchiveBucketCache stats archiveStats + + cache *archiveBucketCache +} + +type archiveBucketCache struct { + fscache.Cache + + path string + sizes sync.Map } func (arch *Archive) GetStats() []ArchiveStats { @@ -395,23 +406,79 @@ func (a *Archive) GetXdrStream(pth string) (*XdrStream, error) { } func (a *Archive) cachedGet(pth string) (io.ReadCloser, error) { - if a.cache != nil { - rdr, foundInCache, err := a.cache.GetFile(pth, a.backend) - if !foundInCache { - a.stats.incrementDownloads() - } else { - a.stats.incrementCacheHits() - } - if err == nil { - return rdr, nil + if a.cache == nil { + a.stats.incrementDownloads() + return a.backend.GetFile(pth) + } + + L := log.WithField("path", pth).WithField("cache", a.cache.path) + + rdr, wrtr, err := a.cache.Get(pth) + if err != nil { + L.WithError(err). + WithField("remove", a.cache.Remove(pth)). + Warn("On-disk cache retrieval failed") + a.stats.incrementDownloads() + return a.backend.GetFile(pth) + } + + // If a NEW key is being retrieved, it returns a writer to which + // you're expected to write your upstream as well as a reader that + // will read directly from it. + if wrtr != nil { + log.WithField("path", pth).Info("Caching file...") + a.stats.incrementDownloads() + upstreamReader, err := a.backend.GetFile(pth) + if err != nil { + writeErr := wrtr.Close() + readErr := rdr.Close() + removeErr := a.cache.Remove(pth) + // Execution order isn't guaranteed w/in a function call expression + // so we close them with explicit order first. + L.WithError(err).WithFields(log.Fields{ + "write-close": writeErr, + "read-close": readErr, + "cache-rm": removeErr, + }).Warn("Download failed, purging from cache") + return nil, err } - // If there's an error, retry with the uncached backend. - a.cache.Evict(pth) + // Start a goroutine to slurp up the upstream and feed + // it directly to the cache. + go func() { + written, err := io.Copy(wrtr, upstreamReader) + writeErr := wrtr.Close() + readErr := upstreamReader.Close() + fields := log.Fields{ + "wr-close": writeErr, + "rd-close": readErr, + } + + if err != nil { + L.WithFields(fields).WithError(err). + Warn("Failed to download and cache file") + + // Removal must happen *after* handles close. + if removalErr := a.cache.Remove(pth); removalErr != nil { + L.WithError(removalErr).Warn("Removing cached file failed") + } + } else { + L.WithFields(fields).Infof("Cached %dKiB file", written/1024) + + // Track how much bandwidth we've saved from caching by saving + // the size of the file we just downloaded. 
+ a.cache.sizes.Store(pth, written) + } + }() + } else { + // Best-effort check to track bandwidth metrics + if written, found := a.cache.sizes.Load(pth); found { + a.stats.incrementCacheBandwidth(written.(int64)) + } + a.stats.incrementCacheHits() } - a.stats.incrementDownloads() - return a.backend.GetFile(pth) + return rdr, nil } func (a *Archive) cachedExists(pth string) (bool, error) { @@ -468,6 +535,8 @@ func Connect(u string, opts ConnectOptions) (*Archive, error) { arch.backend = makeHttpBackend(parsed, opts) } else if parsed.Scheme == "mock" { arch.backend = makeMockBackend(opts) + } else if parsed.Scheme == "fmock" { + arch.backend = makeFailingMockBackend(opts) } else { err = errors.New("unknown URL scheme: '" + parsed.Scheme + "'") } @@ -476,13 +545,30 @@ func Connect(u string, opts ConnectOptions) (*Archive, error) { return &arch, err } - if opts.CacheConfig.Cache { - cache, innerErr := MakeArchiveBucketCache(opts.CacheConfig) - if innerErr != nil { - return &arch, innerErr + if opts.CachePath != "" { + // Set up a <= ~10GiB LRU cache for history archives files + haunter := fscache.NewLRUHaunterStrategy( + fscache.NewLRUHaunter(0, 10<<30, time.Minute /* frequency check */), + ) + + // Wipe any existing cache on startup + os.RemoveAll(opts.CachePath) + fs, err := fscache.NewFs(opts.CachePath, 0755 /* drwxr-xr-x */) + + if err != nil { + return &arch, errors.Wrapf(err, + "creating cache at '%s' with mode 0755 failed", + opts.CachePath) + } + + cache, err := fscache.NewCacheWithHaunter(fs, haunter) + if err != nil { + return &arch, errors.Wrapf(err, + "creating cache at '%s' failed", + opts.CachePath) } - arch.cache = cache + arch.cache = &archiveBucketCache{cache, opts.CachePath, sync.Map{}} } arch.stats = archiveStats{backendName: parsed.String()} diff --git a/historyarchive/archive_cache.go b/historyarchive/archive_cache.go deleted file mode 100644 index fa279fffd2..0000000000 --- a/historyarchive/archive_cache.go +++ /dev/null @@ -1,225 +0,0 @@ -package historyarchive - -import ( - "io" - "os" - "path" - - lru "github.com/hashicorp/golang-lru" - log "github.com/stellar/go/support/log" -) - -type CacheOptions struct { - Cache bool - - Path string - MaxFiles uint - Log *log.Entry -} - -type ArchiveBucketCache struct { - path string - lru *lru.Cache - log *log.Entry -} - -// MakeArchiveBucketCache creates a cache on the disk at the given path that -// acts as an LRU cache, mimicking a particular upstream. -func MakeArchiveBucketCache(opts CacheOptions) (*ArchiveBucketCache, error) { - log_ := opts.Log - if opts.Log == nil { - log_ = log.WithField("subservice", "fs-cache") - } - log_ = log_. - WithField("path", opts.Path). - WithField("cap", opts.MaxFiles) - - if _, err := os.Stat(opts.Path); err == nil || os.IsExist(err) { - log_.Warnf("Cache directory already exists, removing") - os.RemoveAll(opts.Path) - } - - backend := &ArchiveBucketCache{ - path: opts.Path, - log: log_, - } - - cache, err := lru.NewWithEvict(int(opts.MaxFiles), backend.onEviction) - if err != nil { - return &ArchiveBucketCache{}, err - } - backend.lru = cache - - log_.Info("Bucket cache initialized") - return backend, nil -} - -// GetFile retrieves the file contents from the local cache if present. -// Otherwise, it returns the same result as the upstream, adding that result -// into the local cache if possible. It returns a 3-tuple of a reader (which may -// be nil on an error), an indication of whether or not it was *found* in the -// cache, and any error. 
-func (abc *ArchiveBucketCache) GetFile( - filepath string, - upstream ArchiveBackend, -) (io.ReadCloser, bool, error) { - L := abc.log.WithField("key", filepath) - localPath := path.Join(abc.path, filepath) - - // If the lockfile exists, we should defer to the remote source but *not* - // update the cache, as it means there's an in-progress sync of the same - // file. - _, statErr := os.Stat(NameLockfile(localPath)) - if statErr == nil || os.IsExist(statErr) { - L.Info("Incomplete file in on-disk cache: deferring") - reader, err := upstream.GetFile(filepath) - return reader, false, err - } else if _, ok := abc.lru.Get(localPath); !ok { - L.Info("File does not exist in the cache: downloading") - - // Since it's not on-disk, pull it from the remote backend, shove it - // into the cache, and write it to disk. - remote, err := upstream.GetFile(filepath) - if err != nil { - return remote, false, err - } - - local, err := abc.createLocal(filepath) - if err != nil { - // If there's some local FS error, we can still continue with the - // remote version, so just log it and continue. - L.WithError(err).Warn("Creating cache file failed") - return remote, false, nil - } - - return teeReadCloser(remote, local, func() error { - L.Debug("Download complete: removing lockfile") - return os.Remove(NameLockfile(localPath)) - }), false, nil - } - - L.Info("Found file in cache") - // The cache claims it exists, so just give it a read and send it. - local, err := os.Open(localPath) - if err != nil { - // Uh-oh, the cache and the disk are not in sync somehow? Let's evict - // this value and try again (recurse) w/ the remote version. - L.WithError(err).Warn("Opening cached file failed") - abc.lru.Remove(localPath) - return abc.GetFile(filepath, upstream) - } - - return local, true, nil -} - -func (abc *ArchiveBucketCache) Exists(filepath string) bool { - localPath := path.Join(abc.path, filepath) - - // First, check if the file exists in the cache. - if abc.lru.Contains(localPath) { - return true - } - - // If it doesn't, it may still exist on the disk which is still a cheaper - // check than going upstream. - // - // Note that this means the cache and disk are out of sync (perhaps due to - // other archives using the same cache location) so we can update it. This - // situation is well-handled by `GetFile`. - _, statErr := os.Stat(localPath) - if statErr == nil || os.IsExist(statErr) { - abc.lru.Add(localPath, struct{}{}) - return true - } - - return false -} - -// Close purges the cache and cleans up the filesystem. -func (abc *ArchiveBucketCache) Close() error { - abc.lru.Purge() - return os.RemoveAll(abc.path) -} - -// Evict removes a file from the cache and the filesystem. -func (abc *ArchiveBucketCache) Evict(filepath string) { - log.WithField("key", filepath).Info("Evicting file from the disk") - abc.lru.Remove(path.Join(abc.path, filepath)) -} - -func (abc *ArchiveBucketCache) onEviction(key, value interface{}) { - path := key.(string) - os.Remove(NameLockfile(path)) // just in case - if err := os.Remove(path); err != nil { // best effort removal - abc.log.WithError(err). - WithField("key", path). 
- Warn("Removal failed after cache eviction") - } -} - -func (abc *ArchiveBucketCache) createLocal(filepath string) (*os.File, error) { - localPath := path.Join(abc.path, filepath) - if err := os.MkdirAll(path.Dir(localPath), 0755 /* drwxr-xr-x */); err != nil { - return nil, err - } - - local, err := os.Create(localPath) /* mode -rw-rw-rw- */ - if err != nil { - return nil, err - } - _, err = os.Create(NameLockfile(localPath)) - if err != nil { - return nil, err - } - - abc.lru.Add(localPath, struct{}{}) // just use the cache as an array - return local, nil -} - -func NameLockfile(file string) string { - return file + ".lock" -} - -// The below is a helper interface so that we can use io.TeeReader to write -// data locally immediately as we read it remotely. - -type trc struct { - io.Reader - close func() error - closed bool // prevents a double-close -} - -func (t trc) Close() error { - if t.closed { - return nil - } - - return t.close() -} - -func teeReadCloser(r io.ReadCloser, w io.WriteCloser, onClose func() error) io.ReadCloser { - closer := trc{ - Reader: io.TeeReader(r, w), - closed: false, - } - closer.close = func() error { - if closer.closed { - return nil - } - - // Always run all closers, but return the first error - err1 := r.Close() - err2 := w.Close() - err3 := onClose() - - closer.closed = true - if err1 != nil { - return err1 - } else if err2 != nil { - return err2 - } - return err3 - } - - return closer -} diff --git a/historyarchive/archive_test.go b/historyarchive/archive_test.go index 4518315f3e..5f4fc00f4c 100644 --- a/historyarchive/archive_test.go +++ b/historyarchive/archive_test.go @@ -18,12 +18,17 @@ import ( "os" "path/filepath" "strings" + "sync" "testing" + "time" "github.com/stellar/go/xdr" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" ) +var cachePath = filepath.Join(os.TempDir(), "history-archive-test-cache") + func GetTestS3Archive() *Archive { mx := big.NewInt(0xffffffff) r, e := rand.Int(rand.Reader, mx) @@ -44,11 +49,8 @@ func GetTestS3Archive() *Archive { func GetTestMockArchive() *Archive { return MustConnect("mock://test", ConnectOptions{ CheckpointFrequency: 64, - CacheConfig: CacheOptions{ - Cache: true, - Path: filepath.Join(os.TempDir(), "history-archive-test-cache"), - MaxFiles: 5, - }}) + CachePath: cachePath, + }) } var tmpdirs []string @@ -554,7 +556,95 @@ func TestGetLedgers(t *testing.T) { assert.Equal(t, uint32(1), archive.GetStats()[0].GetRequests()) assert.Equal(t, uint32(0), archive.GetStats()[0].GetDownloads()) assert.EqualError(t, err, "checkpoint 1023 is not published") + ledgerHeaders, transactions, results := makeFakeArchive(t, archive) + + stats := archive.GetStats()[0] + ledgers, err := archive.GetLedgers(1000, 1002) + + assert.NoError(t, err) + assert.Len(t, ledgers, 3) + // it started at 1, incurred 6 requests total: 3 queries + 3 downloads + assert.EqualValues(t, 7, stats.GetRequests()) + // started 0, incurred 3 file downloads + assert.EqualValues(t, 3, stats.GetDownloads()) + assert.EqualValues(t, 0, stats.GetCacheHits()) + for i, seq := range []uint32{1000, 1001, 1002} { + ledger := ledgers[seq] + assertXdrEquals(t, ledgerHeaders[i], ledger.Header) + assertXdrEquals(t, transactions[i], ledger.Transaction) + assertXdrEquals(t, results[i], ledger.TransactionResult) + } + + // Repeat the same check but ensure the cache was used + ledgers, err = archive.GetLedgers(1000, 1002) // all cached + assert.NoError(t, err) + assert.Len(t, ledgers, 3) + // downloads should not change because of the cache + 
assert.EqualValues(t, 3, stats.GetDownloads()) + // but requests increase because of 3 fetches to categories + assert.EqualValues(t, 10, stats.GetRequests()) + assert.EqualValues(t, 3, stats.GetCacheHits()) + for i, seq := range []uint32{1000, 1001, 1002} { + ledger := ledgers[seq] + assertXdrEquals(t, ledgerHeaders[i], ledger.Header) + assertXdrEquals(t, transactions[i], ledger.Transaction) + assertXdrEquals(t, results[i], ledger.TransactionResult) + } + + // remove the cached files without informing it and ensure it fills up again + require.NoError(t, os.RemoveAll(cachePath)) + ledgers, err = archive.GetLedgers(1000, 1002) // uncached, refetch + assert.NoError(t, err) + assert.Len(t, ledgers, 3) + + // downloads should increase again + assert.EqualValues(t, 6, stats.GetDownloads()) + assert.EqualValues(t, 3, stats.GetCacheHits()) +} + +func TestStressfulGetLedgers(t *testing.T) { + archive := GetTestMockArchive() + ledgerHeaders, transactions, results := makeFakeArchive(t, archive) + + var wg sync.WaitGroup + for i := 0; i < 10; i++ { + wg.Add(1) + + go func() { + time.Sleep(time.Millisecond) // encourage interleaved execution + ledgers, err := archive.GetLedgers(1000, 1002) + assert.NoError(t, err) + assert.Len(t, ledgers, 3) + for i, seq := range []uint32{1000, 1001, 1002} { + ledger := ledgers[seq] + assertXdrEquals(t, ledgerHeaders[i], ledger.Header) + assertXdrEquals(t, transactions[i], ledger.Transaction) + assertXdrEquals(t, results[i], ledger.TransactionResult) + } + + wg.Done() + }() + } + + require.Eventually(t, func() bool { wg.Wait(); return true }, time.Minute, time.Second) +} + +func TestCacheDeadlocks(t *testing.T) { + archive := MustConnect("fmock://test", ConnectOptions{ + CheckpointFrequency: 64, + CachePath: cachePath, + }) + makeFakeArchive(t, archive) + _, err := archive.GetLedgers(1000, 1002) + require.Error(t, err) +} + +func makeFakeArchive(t *testing.T, archive *Archive) ( + []xdr.LedgerHeaderHistoryEntry, + []xdr.TransactionHistoryEntry, + []xdr.TransactionHistoryResultEntry, +) { ledgerHeaders := []xdr.LedgerHeaderHistoryEntry{ { Hash: xdr.Hash{1}, @@ -637,36 +727,5 @@ func TestGetLedgers(t *testing.T) { []xdrEntry{results[0], results[1], results[2]}, ) - stats := archive.GetStats()[0] - ledgers, err := archive.GetLedgers(1000, 1002) - - assert.NoError(t, err) - assert.Len(t, ledgers, 3) - // it started at 1, incurred 6 requests total, 3 queries, 3 downloads - assert.EqualValues(t, 7, stats.GetRequests()) - // started 0, incurred 3 file downloads - assert.EqualValues(t, 3, stats.GetDownloads()) - for i, seq := range []uint32{1000, 1001, 1002} { - ledger := ledgers[seq] - assertXdrEquals(t, ledgerHeaders[i], ledger.Header) - assertXdrEquals(t, transactions[i], ledger.Transaction) - assertXdrEquals(t, results[i], ledger.TransactionResult) - } - - // Repeat the same check but ensure the cache was used - ledgers, err = archive.GetLedgers(1000, 1002) // all cached - assert.NoError(t, err) - assert.Len(t, ledgers, 3) - - // downloads should not change because of the cache - assert.EqualValues(t, 3, stats.GetDownloads()) - // but requests increase because of 3 fetches to categories - assert.EqualValues(t, 10, stats.GetRequests()) - assert.EqualValues(t, 3, stats.GetCacheHits()) - for i, seq := range []uint32{1000, 1001, 1002} { - ledger := ledgers[seq] - assertXdrEquals(t, ledgerHeaders[i], ledger.Header) - assertXdrEquals(t, transactions[i], ledger.Transaction) - assertXdrEquals(t, results[i], ledger.TransactionResult) - } + return ledgerHeaders, transactions, 
results } diff --git a/historyarchive/failing_mock_archive.go b/historyarchive/failing_mock_archive.go new file mode 100644 index 0000000000..5966cb30e7 --- /dev/null +++ b/historyarchive/failing_mock_archive.go @@ -0,0 +1,76 @@ +package historyarchive + +import ( + "io" + + "github.com/stellar/go/support/errors" +) + +// FailingMockArchiveBackend is a mocking backend that will fail only when you +// try to read but otherwise behave like MockArchiveBackend. +type FailingMockArchiveBackend struct { + files map[string][]byte +} + +func (b *FailingMockArchiveBackend) Exists(pth string) (bool, error) { + _, ok := b.files[pth] + return ok, nil +} + +func (b *FailingMockArchiveBackend) Size(pth string) (int64, error) { + f, ok := b.files[pth] + sz := int64(0) + if ok { + sz = int64(len(f)) + } + return sz, nil +} + +func (b *FailingMockArchiveBackend) GetFile(pth string) (io.ReadCloser, error) { + data, ok := b.files[pth] + if !ok { + return nil, errors.New("file does not exist") + } + + fr := FakeReader{} + fr.data = make([]byte, len(data)) + copy(fr.data[:], data[:]) + return &fr, nil +} + +func (b *FailingMockArchiveBackend) PutFile(pth string, in io.ReadCloser) error { + buf, e := io.ReadAll(in) + if e != nil { + return e + } + b.files[pth] = buf + return nil +} + +func (b *FailingMockArchiveBackend) ListFiles(pth string) (chan string, chan error) { + return nil, nil +} + +func (b *FailingMockArchiveBackend) CanListFiles() bool { + return false +} + +func makeFailingMockBackend(opts ConnectOptions) ArchiveBackend { + b := new(FailingMockArchiveBackend) + b.files = make(map[string][]byte) + return b +} + +type FakeReader struct { + data []byte +} + +func (fr *FakeReader) Read(b []byte) (int, error) { + return 0, io.ErrClosedPipe +} + +func (fr *FakeReader) Close() error { + return nil +} + +var _ io.ReadCloser = &FakeReader{} diff --git a/historyarchive/mocks.go b/historyarchive/mocks.go index fe497ec36e..fa5716e5de 100644 --- a/historyarchive/mocks.go +++ b/historyarchive/mocks.go @@ -137,3 +137,8 @@ func (m *MockArchiveStats) GetCacheHits() uint32 { a := m.Called() return a.Get(0).(uint32) } + +func (m *MockArchiveStats) GetCacheBandwidth() uint64 { + a := m.Called() + return a.Get(0).(uint64) +} diff --git a/historyarchive/stats.go b/historyarchive/stats.go index c182853d1b..6dbf8ceed2 100644 --- a/historyarchive/stats.go +++ b/historyarchive/stats.go @@ -8,6 +8,7 @@ type archiveStats struct { fileDownloads atomic.Uint32 fileUploads atomic.Uint32 cacheHits atomic.Uint32 + cacheBw atomic.Uint64 backendName string } @@ -16,6 +17,7 @@ type ArchiveStats interface { GetDownloads() uint32 GetUploads() uint32 GetCacheHits() uint32 + GetCacheBandwidth() uint64 GetBackendName() string } @@ -37,6 +39,10 @@ func (as *archiveStats) incrementCacheHits() { as.cacheHits.Add(1) } +func (as *archiveStats) incrementCacheBandwidth(bytes int64) { + as.cacheBw.Add(uint64(bytes)) +} + func (as *archiveStats) GetRequests() uint32 { return as.requests.Load() } @@ -55,3 +61,6 @@ func (as *archiveStats) GetBackendName() string { func (as *archiveStats) GetCacheHits() uint32 { return as.cacheHits.Load() } +func (as *archiveStats) GetCacheBandwidth() uint64 { + return as.cacheBw.Load() +} diff --git a/historyarchive/xdrstream.go b/historyarchive/xdrstream.go index de8efc3bb6..313c600f8b 100644 --- a/historyarchive/xdrstream.go +++ b/historyarchive/xdrstream.go @@ -107,7 +107,7 @@ func (x *XdrStream) ExpectedHash() ([sha256.Size]byte, bool) { func (x *XdrStream) Close() error { if x.validateHash { // Read all 
remaining data from rdr - _, err := io.Copy(ioutil.Discard, x.rdr) + _, err := io.Copy(io.Discard, x.rdr) if err != nil { // close the internal readers to avoid memory leaks x.closeReaders() diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index 07bbf975fa..7d14ca314e 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -407,6 +407,7 @@ func runDBReingestRange(ledgerRanges []history.LedgerRange, reingestForce bool, ingestConfig := ingest.Config{ NetworkPassphrase: config.NetworkPassphrase, HistoryArchiveURLs: config.HistoryArchiveURLs, + HistoryArchiveCaching: config.HistoryArchiveCaching, CheckpointFrequency: config.CheckpointFrequency, ReingestEnabled: true, MaxReingestRetries: int(retries), diff --git a/services/horizon/cmd/ingest.go b/services/horizon/cmd/ingest.go index 3833dba7fd..18452dc74a 100644 --- a/services/horizon/cmd/ingest.go +++ b/services/horizon/cmd/ingest.go @@ -128,6 +128,7 @@ var ingestVerifyRangeCmd = &cobra.Command{ NetworkPassphrase: globalConfig.NetworkPassphrase, HistorySession: horizonSession, HistoryArchiveURLs: globalConfig.HistoryArchiveURLs, + HistoryArchiveCaching: globalConfig.HistoryArchiveCaching, CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath, CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB, CheckpointFrequency: globalConfig.CheckpointFrequency, @@ -210,6 +211,7 @@ var ingestStressTestCmd = &cobra.Command{ NetworkPassphrase: globalConfig.NetworkPassphrase, HistorySession: horizonSession, HistoryArchiveURLs: globalConfig.HistoryArchiveURLs, + HistoryArchiveCaching: globalConfig.HistoryArchiveCaching, RoundingSlippageFilter: globalConfig.RoundingSlippageFilter, CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath, CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB, @@ -349,6 +351,7 @@ var ingestBuildStateCmd = &cobra.Command{ NetworkPassphrase: globalConfig.NetworkPassphrase, HistorySession: horizonSession, HistoryArchiveURLs: globalConfig.HistoryArchiveURLs, + HistoryArchiveCaching: globalConfig.HistoryArchiveCaching, CaptiveCoreBinaryPath: globalConfig.CaptiveCoreBinaryPath, CaptiveCoreConfigUseDB: globalConfig.CaptiveCoreConfigUseDB, CheckpointFrequency: globalConfig.CheckpointFrequency, diff --git a/services/horizon/internal/config.go b/services/horizon/internal/config.go index 8fb31075b8..54f843b810 100644 --- a/services/horizon/internal/config.go +++ b/services/horizon/internal/config.go @@ -27,6 +27,7 @@ type Config struct { CaptiveCoreStoragePath string CaptiveCoreReuseStoragePath bool CaptiveCoreConfigUseDB bool + HistoryArchiveCaching bool StellarCoreURL string diff --git a/services/horizon/internal/flags.go b/services/horizon/internal/flags.go index eb229c65b2..87deb28c48 100644 --- a/services/horizon/internal/flags.go +++ b/services/horizon/internal/flags.go @@ -51,6 +51,9 @@ const ( NetworkPassphraseFlagName = "network-passphrase" // HistoryArchiveURLsFlagName is the command line flag for specifying the history archive URLs HistoryArchiveURLsFlagName = "history-archive-urls" + // HistoryArchiveCaching is the flag for controlling whether or not there's + // an on-disk cache for history archive downloads + HistoryArchiveCachingFlagName = "history-archive-caching" // NetworkFlagName is the command line flag for specifying the "network" NetworkFlagName = "network" // EnableIngestionFilteringFlagName is the command line flag for enabling the experimental ingestion filtering feature (now enabled by default) @@ -236,11 +239,7 @@ func Flags() (*Config, 
support.ConfigOptions) { OptType: types.Bool, FlagDefault: true, Required: false, - Usage: `when enabled, Horizon ingestion will instruct the captive - core invocation to use an external db url for ledger states rather than in memory(RAM).\n - Will result in several GB of space shifting out of RAM and to the external db persistence.\n - The external db url is determined by the presence of DATABASE parameter in the captive-core-config-path or\n - or if absent, the db will default to sqlite and the db file will be stored at location derived from captive-core-storage-path parameter.`, + Usage: `when enabled, Horizon ingestion will instruct the captive core invocation to use an external db url for ledger states rather than in memory(RAM). Will result in several GB of space shifting out of RAM and to the external db persistence. The external db url is determined by the presence of DATABASE parameter in the captive-core-config-path or if absent, the db will default to sqlite and the db file will be stored at location derived from captive-core-storage-path parameter.`, CustomSetValue: func(opt *support.ConfigOption) error { if val := viper.GetBool(opt.Name); val { config.CaptiveCoreConfigUseDB = val @@ -372,6 +371,14 @@ func Flags() (*Config, support.ConfigOptions) { Usage: "comma-separated list of stellar history archives to connect with", UsedInCommands: IngestionCommands, }, + &support.ConfigOption{ + Name: HistoryArchiveCachingFlagName, + ConfigKey: &config.HistoryArchiveCaching, + OptType: types.Bool, + FlagDefault: true, + Usage: "adds caching for history archive downloads (requires an add'l 10GB of disk space on mainnet)", + UsedInCommands: IngestionCommands, + }, &support.ConfigOption{ Name: "port", ConfigKey: &config.Port, diff --git a/services/horizon/internal/ingest/fsm.go b/services/horizon/internal/ingest/fsm.go index e0c667b033..892868e5b9 100644 --- a/services/horizon/internal/ingest/fsm.go +++ b/services/horizon/internal/ingest/fsm.go @@ -595,6 +595,11 @@ func addHistoryArchiveStatsMetrics(s *system, stats []historyarchive.ArchiveStat "source": historyServerStat.GetBackendName(), "type": "cache_hits"}). Add(float64(historyServerStat.GetCacheHits())) + s.Metrics().HistoryArchiveStatsCounter. + With(prometheus.Labels{ + "source": historyServerStat.GetBackendName(), + "type": "cache_bandwidth"}). 
+ Add(float64(historyServerStat.GetCacheBandwidth())) } } diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index 7dfaea366e..91bbf0b10f 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -87,8 +87,9 @@ type Config struct { CaptiveCoreConfigUseDB bool NetworkPassphrase string - HistorySession db.SessionInterface - HistoryArchiveURLs []string + HistorySession db.SessionInterface + HistoryArchiveURLs []string + HistoryArchiveCaching bool DisableStateVerification bool EnableReapLookupTables bool @@ -222,6 +223,11 @@ type system struct { func NewSystem(config Config) (System, error) { ctx, cancel := context.WithCancel(context.Background()) + cachingPath := "" + if config.HistoryArchiveCaching { + cachingPath = path.Join(config.CaptiveCoreStoragePath, "bucket-cache") + } + archive, err := historyarchive.NewArchivePool( config.HistoryArchiveURLs, historyarchive.ConnectOptions{ @@ -229,12 +235,7 @@ func NewSystem(config Config) (System, error) { NetworkPassphrase: config.NetworkPassphrase, CheckpointFrequency: config.CheckpointFrequency, UserAgent: fmt.Sprintf("horizon/%s golang/%s", apkg.Version(), runtime.Version()), - CacheConfig: historyarchive.CacheOptions{ - Cache: true, - Path: path.Join(config.CaptiveCoreStoragePath, "bucket-cache"), - Log: log.WithField("subservice", "ha-cache"), - MaxFiles: 150, - }, + CachePath: cachingPath, }, ) if err != nil { diff --git a/services/horizon/internal/ingest/resume_state_test.go b/services/horizon/internal/ingest/resume_state_test.go index f1f8b2ce2a..985391883f 100644 --- a/services/horizon/internal/ingest/resume_state_test.go +++ b/services/horizon/internal/ingest/resume_state_test.go @@ -267,6 +267,7 @@ func (s *ResumeTestTestSuite) mockSuccessfulIngestion() { mockStats.On("GetRequests").Return(uint32(0)) mockStats.On("GetUploads").Return(uint32(0)) mockStats.On("GetCacheHits").Return(uint32(0)) + mockStats.On("GetCacheBandwidth").Return(uint64(0)) s.historyAdapter.On("GetStats").Return([]historyarchive.ArchiveStats{mockStats}).Once() s.runner.On("RunAllProcessorsOnLedger", mock.AnythingOfType("xdr.LedgerCloseMeta")). 
@@ -384,6 +385,7 @@ func (s *ResumeTestTestSuite) TestReapingObjectsDisabled() { mockStats.On("GetRequests").Return(uint32(0)) mockStats.On("GetUploads").Return(uint32(0)) mockStats.On("GetCacheHits").Return(uint32(0)) + mockStats.On("GetCacheBandwidth").Return(uint64(0)) s.historyAdapter.On("GetStats").Return([]historyarchive.ArchiveStats{mockStats}).Once() // Reap lookup tables not executed @@ -434,6 +436,7 @@ func (s *ResumeTestTestSuite) TestErrorReapingObjectsIgnored() { mockStats.On("GetRequests").Return(uint32(0)) mockStats.On("GetUploads").Return(uint32(0)) mockStats.On("GetCacheHits").Return(uint32(0)) + mockStats.On("GetCacheBandwidth").Return(uint64(0)) s.historyAdapter.On("GetStats").Return([]historyarchive.ArchiveStats{mockStats}).Once() next, err := resumeState{latestSuccessfullyProcessedLedger: 100}.run(s.system) diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index d4b34f9f4d..60ba7b6c2a 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -97,6 +97,7 @@ func initIngester(app *App) { ), NetworkPassphrase: app.config.NetworkPassphrase, HistoryArchiveURLs: app.config.HistoryArchiveURLs, + HistoryArchiveCaching: app.config.HistoryArchiveCaching, CheckpointFrequency: app.config.CheckpointFrequency, StellarCoreURL: app.config.StellarCoreURL, CaptiveCoreBinaryPath: app.config.CaptiveCoreBinaryPath,
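A note on the new cache wiring, for readers unfamiliar with djherbis/fscache: the Connect() change above boils down to the construction below. This is a minimal, self-contained sketch using the same settings as the diff (no item-count limit, a ~10GiB size cap, and a one-minute haunt interval); the cachePath value here is a placeholder for the sketch, not the directory Horizon actually uses.

package main

import (
	"log"
	"os"
	"path/filepath"
	"time"

	fscache "github.com/djherbis/fscache"
)

func main() {
	cachePath := filepath.Join(os.TempDir(), "bucket-cache-demo") // placeholder path for the sketch

	// Evict least-recently-used entries once the cache grows past ~10GiB,
	// re-checking every minute; the leading 0 disables any item-count limit.
	haunter := fscache.NewLRUHaunterStrategy(
		fscache.NewLRUHaunter(0, 10<<30, time.Minute),
	)

	// Start from a clean slate on every startup, exactly as Connect() does.
	os.RemoveAll(cachePath)
	fs, err := fscache.NewFs(cachePath, 0755 /* drwxr-xr-x */)
	if err != nil {
		log.Fatal(err)
	}

	cache, err := fscache.NewCacheWithHaunter(fs, haunter)
	if err != nil {
		log.Fatal(err)
	}
	_ = cache // handed to the Archive as part of archiveBucketCache in the real code
}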
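The heart of the new cachedGet() is fscache's read-through contract: Get(key) returns a reader plus a writer, and the writer is non-nil only the first time a key is seen, in which case the caller is expected to stream the upstream file into the writer while the reader serves those same bytes. A stripped-down sketch of that flow follows; fetchUpstream is a stand-in for ArchiveBackend.GetFile, and the logging and metrics from the real implementation are omitted.

package example

import (
	"io"

	fscache "github.com/djherbis/fscache"
)

// readThrough returns a reader for key, filling the cache from upstream on a
// first read. Error handling mirrors cachedGet: on any cache failure we fall
// back to (or abort with) the upstream, and partially written entries are
// removed only after their handles are closed.
func readThrough(
	cache fscache.Cache,
	key string,
	fetchUpstream func(string) (io.ReadCloser, error),
) (io.ReadCloser, error) {
	rdr, wrtr, err := cache.Get(key)
	if err != nil {
		cache.Remove(key) // best effort, as in the real code
		return fetchUpstream(key)
	}

	if wrtr == nil {
		// Key already cached (or currently being cached): read from disk.
		return rdr, nil
	}

	// New key: download it and tee it into the cache asynchronously.
	upstream, err := fetchUpstream(key)
	if err != nil {
		wrtr.Close()
		rdr.Close()
		cache.Remove(key) // don't leave an empty entry behind
		return nil, err
	}

	go func() {
		_, copyErr := io.Copy(wrtr, upstream)
		wrtr.Close()
		upstream.Close()
		if copyErr != nil {
			// A partial entry is worse than none; remove after closing handles.
			cache.Remove(key)
		}
	}()

	// The reader follows the writer, so the caller can consume the file while
	// it is still being cached.
	return rdr, nil
}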
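Finally, the "cache bandwidth" statistic added to archiveStats is an approximation: the caching goroutine records each file's size in a sync.Map once the copy completes, and later cache hits add that recorded size to an atomic counter. A compact sketch of that bookkeeping, using hypothetical method names rather than the incrementCacheBandwidth/sizes plumbing in the diff:

package example

import (
	"sync"
	"sync/atomic"
)

// cacheStats approximates how much download bandwidth the cache has saved.
type cacheStats struct {
	sizes   sync.Map      // path -> int64, recorded when a file is first cached
	savedBw atomic.Uint64 // total bytes served from disk instead of upstream
}

// recordDownload is called once the cached copy of path is fully written.
func (s *cacheStats) recordDownload(path string, written int64) {
	s.sizes.Store(path, written)
}

// recordCacheHit is called whenever path is served from the on-disk cache.
func (s *cacheStats) recordCacheHit(path string) {
	if written, ok := s.sizes.Load(path); ok {
		s.savedBw.Add(uint64(written.(int64)))
	}
}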