diff --git a/src/bucket/Bucket.cpp b/src/bucket/Bucket.cpp index 2957a0e956..c170ace285 100644 --- a/src/bucket/Bucket.cpp +++ b/src/bucket/Bucket.cpp @@ -53,6 +53,12 @@ Bucket::isIndexed() const return static_cast(mIndex); } +std::optional> +Bucket::getOfferRange() const +{ + return getIndex().getOfferRange(); +} + void Bucket::setIndex(std::unique_ptr&& index) { @@ -135,13 +141,14 @@ Bucket::apply(Application& app) const { ZoneScoped; + std::unordered_set emptySet; BucketApplicator applicator( app, app.getConfig().LEDGER_PROTOCOL_VERSION, 0 /*set to 0 so we always load from the parent to check state*/, 0 /*set to a level that's not the bottom so we don't treat live entries as init*/ , - shared_from_this(), [](LedgerEntryType) { return true; }); + shared_from_this(), [](LedgerEntryType) { return true; }, emptySet); BucketApplicator::Counters counters(app.getClock().now()); while (applicator) { diff --git a/src/bucket/Bucket.h b/src/bucket/Bucket.h index 17b5951921..835f26b284 100644 --- a/src/bucket/Bucket.h +++ b/src/bucket/Bucket.h @@ -82,6 +82,11 @@ class Bucket : public std::enable_shared_from_this, // Returns true if bucket is indexed, false otherwise bool isIndexed() const; + // Returns [lowerBound, upperBound) of file offsets for all offers in the + // bucket, or std::nullopt if no offers exist + std::optional> + getOfferRange() const; + // Sets index, throws if index is already set void setIndex(std::unique_ptr&& index); diff --git a/src/bucket/BucketApplicator.cpp b/src/bucket/BucketApplicator.cpp index 701528b08c..be2d36b29e 100644 --- a/src/bucket/BucketApplicator.cpp +++ b/src/bucket/BucketApplicator.cpp @@ -21,13 +21,15 @@ BucketApplicator::BucketApplicator(Application& app, uint32_t minProtocolVersionSeen, uint32_t level, std::shared_ptr bucket, - std::function filter) + std::function filter, + std::unordered_set& seenKeys) : mApp(app) , mMaxProtocolVersion(maxProtocolVersion) , mMinProtocolVersionSeen(minProtocolVersionSeen) , mLevel(level) , mBucketIter(bucket) , mEntryTypeFilter(filter) + , mSeenKeys(seenKeys) { auto protocolVersion = mBucketIter.getMetadata().ledgerVersion; if (protocolVersion > mMaxProtocolVersion) @@ -37,11 +39,33 @@ BucketApplicator::BucketApplicator(Application& app, "bucket protocol version {:d} exceeds maxProtocolVersion {:d}"), protocolVersion, mMaxProtocolVersion)); } + + // Only apply offers if BucketListDB is enabled + if (mApp.getConfig().isUsingBucketListDB() && !bucket->isEmpty()) + { + auto offsetOp = bucket->getOfferRange(); + if (offsetOp) + { + auto [lowOffset, highOffset] = *offsetOp; + mBucketIter.seek(lowOffset); + mUpperBoundOffset = highOffset; + } + else + { + // No offers in Bucket + mOffersRemaining = false; + } + } } BucketApplicator::operator bool() const { - return (bool)mBucketIter; + // There is more work to do (i.e. (bool) *this == true) iff: + // 1. The underlying bucket iterator is not EOF and + // 2. Either BucketListDB is not enabled (so we must apply all entry types) + // or BucketListDB is enabled and we have offers still remaining. + return static_cast(mBucketIter) && + (!mApp.getConfig().isUsingBucketListDB() || mOffersRemaining); } size_t @@ -99,11 +123,43 @@ BucketApplicator::advance(BucketApplicator::Counters& counters) for (; mBucketIter; ++mBucketIter) { + // Note: mUpperBoundOffset is not inclusive. However, mBucketIter.pos() + // returns the file offset at the end of the currently loaded entry. + // This means we must read until pos is strictly greater than the upper + // bound so that we don't skip the last offer in the range. + auto isUsingBucketListDB = mApp.getConfig().isUsingBucketListDB(); + if (isUsingBucketListDB && mBucketIter.pos() > mUpperBoundOffset) + { + mOffersRemaining = false; + break; + } + BucketEntry const& e = *mBucketIter; Bucket::checkProtocolLegality(e, mMaxProtocolVersion); if (shouldApplyEntry(mEntryTypeFilter, e)) { + if (isUsingBucketListDB) + { + if (e.type() == LIVEENTRY || e.type() == INITENTRY) + { + auto [_, wasInserted] = + mSeenKeys.emplace(LedgerEntryKey(e.liveEntry())); + + // Skip seen keys + if (!wasInserted) + { + continue; + } + } + else + { + // Only apply INIT and LIVE entries + mSeenKeys.emplace(e.deadEntry()); + continue; + } + } + counters.mark(e); if (e.type() == LIVEENTRY || e.type() == INITENTRY) @@ -148,6 +204,7 @@ BucketApplicator::advance(BucketApplicator::Counters& counters) } else { + releaseAssertOrThrow(!isUsingBucketListDB); if (protocolVersionIsBefore( mMinProtocolVersionSeen, Bucket:: diff --git a/src/bucket/BucketApplicator.h b/src/bucket/BucketApplicator.h index 5e1b257c5f..79e74d1dce 100644 --- a/src/bucket/BucketApplicator.h +++ b/src/bucket/BucketApplicator.h @@ -6,6 +6,7 @@ #include "bucket/Bucket.h" #include "bucket/BucketInputIterator.h" +#include "ledger/LedgerHashUtils.h" #include "util/Timer.h" #include "util/XDRStream.h" #include @@ -28,6 +29,9 @@ class BucketApplicator BucketInputIterator mBucketIter; size_t mCount{0}; std::function mEntryTypeFilter; + std::unordered_set& mSeenKeys; + std::streamoff mUpperBoundOffset; + bool mOffersRemaining{true}; public: class Counters @@ -63,10 +67,16 @@ class BucketApplicator VirtualClock::time_point now); }; + // If newOffersOnly is true, only offers are applied. Additionally, the + // offer is only applied iff: + // 1. They are of type INITENTRY or LIVEENTRY + // 2. The LedgerKey is not in seenKeys + // When this flag is set, each offer key read is added to seenKeys BucketApplicator(Application& app, uint32_t maxProtocolVersion, uint32_t minProtocolVersionSeen, uint32_t level, std::shared_ptr bucket, - std::function filter); + std::function filter, + std::unordered_set& seenKeys); operator bool() const; size_t advance(Counters& counters); diff --git a/src/bucket/BucketIndex.h b/src/bucket/BucketIndex.h index 58daf99903..4bbae7e09c 100644 --- a/src/bucket/BucketIndex.h +++ b/src/bucket/BucketIndex.h @@ -118,6 +118,11 @@ class BucketIndex : public NonMovableOrCopyable virtual std::vector const& getPoolIDsByAsset(Asset const& asset) const = 0; + // Returns lower bound and upper bound for offer entry positions in the + // given bucket, or std::nullopt if no offers exist + virtual std::optional> + getOfferRange() const = 0; + // Returns page size for index. InidividualIndex returns 0 for page size virtual std::streamoff getPageSize() const = 0; diff --git a/src/bucket/BucketIndexImpl.cpp b/src/bucket/BucketIndexImpl.cpp index a24f6ae515..7d0d10479c 100644 --- a/src/bucket/BucketIndexImpl.cpp +++ b/src/bucket/BucketIndexImpl.cpp @@ -450,16 +450,9 @@ BucketIndexImpl::scan(Iterator start, LedgerKey const& k) const template std::optional> -BucketIndexImpl::getPoolshareTrustlineRange( - AccountID const& accountID) const +BucketIndexImpl::getOffsetBounds(LedgerKey const& lowerBound, + LedgerKey const& upperBound) const { - // Get the smallest and largest possible trustline keys for the given - // accountID - auto upperBound = getDummyPoolShareTrustlineKey( - accountID, std::numeric_limits::max()); - auto lowerBound = getDummyPoolShareTrustlineKey( - accountID, std::numeric_limits::min()); - // Get the index iterators for the bounds auto startIter = std::lower_bound( mData.keysToOffset.begin(), mData.keysToOffset.end(), lowerBound, @@ -469,9 +462,9 @@ BucketIndexImpl::getPoolshareTrustlineRange( return std::nullopt; } - auto endIter = - std::upper_bound(startIter, mData.keysToOffset.end(), upperBound, - upper_bound_pred); + auto endIter = std::upper_bound( + std::next(startIter), mData.keysToOffset.end(), upperBound, + upper_bound_pred); // Get file offsets based on lower and upper bound iterators std::streamoff startOff = startIter->second; @@ -501,6 +494,39 @@ BucketIndexImpl::getPoolIDsByAsset(Asset const& asset) const return iter->second; } +template +std::optional> +BucketIndexImpl::getPoolshareTrustlineRange( + AccountID const& accountID) const +{ + // Get the smallest and largest possible trustline keys for the given + // accountID + auto upperBound = getDummyPoolShareTrustlineKey( + accountID, std::numeric_limits::max()); + auto lowerBound = getDummyPoolShareTrustlineKey( + accountID, std::numeric_limits::min()); + + return getOffsetBounds(lowerBound, upperBound); +} + +template +std::optional> +BucketIndexImpl::getOfferRange() const +{ + // Get the smallest and largest possible offer keys + LedgerKey upperBound(OFFER); + upperBound.offer().sellerID.ed25519().fill( + std::numeric_limits::max()); + upperBound.offer().offerID = std::numeric_limits::max(); + + LedgerKey lowerBound(OFFER); + lowerBound.offer().sellerID.ed25519().fill( + std::numeric_limits::min()); + lowerBound.offer().offerID = std::numeric_limits::min(); + + return getOffsetBounds(lowerBound, upperBound); +} + #ifdef BUILD_TESTS template bool diff --git a/src/bucket/BucketIndexImpl.h b/src/bucket/BucketIndexImpl.h index 369fdf5690..b265582fc7 100644 --- a/src/bucket/BucketIndexImpl.h +++ b/src/bucket/BucketIndexImpl.h @@ -66,6 +66,12 @@ template class BucketIndexImpl : public BucketIndex // Saves index to disk, overwriting any preexisting file for this index void saveToDisk(BucketManager& bm, Hash const& hash) const; + // Returns [lowFileOffset, highFileOffset) that contain the key ranges + // [lowerBound, upperBound]. If no file offsets exist, returns [0, 0] + std::optional> + getOffsetBounds(LedgerKey const& lowerBound, + LedgerKey const& upperBound) const; + friend BucketIndex; public: @@ -81,6 +87,9 @@ template class BucketIndexImpl : public BucketIndex virtual std::vector const& getPoolIDsByAsset(Asset const& asset) const override; + virtual std::optional> + getOfferRange() const override; + virtual std::streamoff getPageSize() const override { diff --git a/src/bucket/BucketInputIterator.cpp b/src/bucket/BucketInputIterator.cpp index b58494abf2..7a3673b7f4 100644 --- a/src/bucket/BucketInputIterator.cpp +++ b/src/bucket/BucketInputIterator.cpp @@ -52,7 +52,7 @@ BucketInputIterator::loadEntry() } } -size_t +std::streamoff BucketInputIterator::pos() { return mIn.pos(); @@ -124,4 +124,11 @@ BucketInputIterator::operator++() } return *this; } + +void +BucketInputIterator::seek(std::streamoff offset) +{ + mIn.seek(offset); + loadEntry(); +} } diff --git a/src/bucket/BucketInputIterator.h b/src/bucket/BucketInputIterator.h index c9aa891c7d..b9280ecad8 100644 --- a/src/bucket/BucketInputIterator.h +++ b/src/bucket/BucketInputIterator.h @@ -52,7 +52,8 @@ class BucketInputIterator BucketInputIterator& operator++(); - size_t pos(); + std::streamoff pos(); size_t size() const; + void seek(std::streamoff offset); }; } diff --git a/src/catchup/ApplyBucketsWork.cpp b/src/catchup/ApplyBucketsWork.cpp index 44d2471000..2cd2ac4571 100644 --- a/src/catchup/ApplyBucketsWork.cpp +++ b/src/catchup/ApplyBucketsWork.cpp @@ -9,6 +9,7 @@ #include "bucket/BucketManager.h" #include "catchup/AssumeStateWork.h" #include "catchup/CatchupManager.h" +#include "catchup/IndexBucketsWork.h" #include "crypto/Hex.h" #include "crypto/SecretKey.h" #include "historywork/Progress.h" @@ -50,6 +51,32 @@ class TempLedgerVersionSetter : NonMovableOrCopyable } }; +uint32_t +ApplyBucketsWork::startingLevel() +{ + return mApp.getConfig().isUsingBucketListDB() ? 0 + : BucketList::kNumLevels - 1; +} + +bool +ApplyBucketsWork::appliedAllLevels() const +{ + if (mApp.getConfig().isUsingBucketListDB()) + { + return mLevel == BucketList::kNumLevels - 1; + } + else + { + return mLevel == 0; + } +} + +uint32_t +ApplyBucketsWork::nextLevel() const +{ + return mApp.getConfig().isUsingBucketListDB() ? mLevel + 1 : mLevel - 1; +} + ApplyBucketsWork::ApplyBucketsWork( Application& app, std::map> const& buckets, @@ -61,7 +88,7 @@ ApplyBucketsWork::ApplyBucketsWork( , mEntryTypeFilter(onlyApply) , mApplying(false) , mTotalSize(0) - , mLevel(BucketList::kNumLevels - 1) + , mLevel(startingLevel()) , mMaxProtocolVersion(maxProtocolVersion) , mCounters(app.getClock().now()) { @@ -82,7 +109,7 @@ ApplyBucketsWork::getBucketLevel(uint32_t level) return mApp.getBucketManager().getBucketList().getLevel(level); } -std::shared_ptr +std::shared_ptr ApplyBucketsWork::getBucket(std::string const& hash) { auto i = mBuckets.find(hash); @@ -107,9 +134,19 @@ ApplyBucketsWork::doReset() mLastAppliedSizeMb = 0; mLastPos = 0; mMinProtocolVersionSeen = UINT32_MAX; + mSeenKeys.clear(); + mBucketsToIndex.clear(); if (!isAborting()) { + if (mApp.getConfig().isUsingBucketListDB()) + { + // The current size of this set is 1.6 million during BucketApply + // (as of 12/20/23). There's not a great way to estimate this, so + // reserving with some extra wiggle room + mSeenKeys.reserve(2'000'000); + } + // When applying buckets with accounts, we have to make sure that the // root account has been removed. This comes into play, for example, // when applying buckets from genesis the root account already exists. @@ -129,12 +166,17 @@ ApplyBucketsWork::doReset() } } - auto addBucket = [this](std::shared_ptr const& bucket) { + auto addBucket = [this](std::shared_ptr const& bucket) { if (bucket->getSize() > 0) { mTotalBuckets++; mTotalSize += bucket->getSize(); } + + if (mApp.getConfig().isUsingBucketListDB()) + { + mBucketsToIndex.emplace_back(bucket); + } }; for (auto const& hsb : mApplyState.currentBuckets) @@ -153,17 +195,33 @@ ApplyBucketsWork::doReset() mApp.getLedgerTxnRoot().prepareNewObjects(totalLECount); } - mLevel = BucketList::kNumLevels - 1; + mLevel = startingLevel(); mApplying = false; mDelayChecked = false; - mSpawnedAssumeStateWork = false; - mSnapBucket.reset(); - mCurrBucket.reset(); - mSnapApplicator.reset(); - mCurrApplicator.reset(); + mIndexBucketsWork.reset(); + mAssumeStateWork.reset(); + + mFirstBucket.reset(); + mSecondBucket.reset(); + mFirstBucketApplicator.reset(); + mSecondBucketApplicator.reset(); } +// We iterate through the BucketList either in-order (level 0 curr, level 0 +// snap, level 1 curr, etc) when only applying offers, or in reverse order +// (level 9 curr, level 8 snap, level 8 curr, etc) when applying all entry +// types. When only applying offers, we keep track of the keys we have already +// seen, and only apply an entry to the DB if it has not been seen before. This +// allows us to perform a single write to the DB and ensure that only the newest +// version is written. +// +// When applying all entry types, this seen keys set would be too large. Since +// there can be no seen keys set, if we were to apply every entry in order, we +// would overwrite the newest version of an entry with an older version as we +// iterate through the BucketList. Due to this, we iterate in reverse order such +// that the newest version of a key is written last, overwriting the older +// versions. This is much slower due to DB churn. void ApplyBucketsWork::startLevel() { @@ -173,32 +231,56 @@ ApplyBucketsWork::startLevel() CLOG_DEBUG(History, "ApplyBuckets : starting level {}", mLevel); auto& level = getBucketLevel(mLevel); HistoryStateBucket const& i = mApplyState.currentBuckets.at(mLevel); + bool isUsingBucketListDB = mApp.getConfig().isUsingBucketListDB(); - bool applySnap = (i.snap != binToHex(level.getSnap()->getHash())); - bool applyCurr = (i.curr != binToHex(level.getCurr()->getHash())); + bool applyFirst = isUsingBucketListDB + ? (i.curr != binToHex(level.getCurr()->getHash())) + : (i.snap != binToHex(level.getSnap()->getHash())); + bool applySecond = isUsingBucketListDB + ? (i.snap != binToHex(level.getSnap()->getHash())) + : (i.curr != binToHex(level.getCurr()->getHash())); - if (mApplying || applySnap) + if (mApplying || applyFirst) { - mSnapBucket = getBucket(i.snap); + mFirstBucket = getBucket(isUsingBucketListDB ? i.curr : i.snap); mMinProtocolVersionSeen = std::min( - mMinProtocolVersionSeen, Bucket::getBucketVersion(mSnapBucket)); - mSnapApplicator = std::make_unique( + mMinProtocolVersionSeen, Bucket::getBucketVersion(mFirstBucket)); + mFirstBucketApplicator = std::make_unique( mApp, mMaxProtocolVersion, mMinProtocolVersionSeen, mLevel, - mSnapBucket, mEntryTypeFilter); - CLOG_DEBUG(History, "ApplyBuckets : starting level[{}].snap = {}", - mLevel, i.snap); + mFirstBucket, mEntryTypeFilter, mSeenKeys); + + if (isUsingBucketListDB) + { + CLOG_DEBUG(History, "ApplyBuckets : starting level[{}].curr = {}", + mLevel, i.curr); + } + else + { + CLOG_DEBUG(History, "ApplyBuckets : starting level[{}].snap = {}", + mLevel, i.snap); + } + mApplying = true; } - if (mApplying || applyCurr) + if (mApplying || applySecond) { - mCurrBucket = getBucket(i.curr); + mSecondBucket = getBucket(isUsingBucketListDB ? i.snap : i.curr); mMinProtocolVersionSeen = std::min( - mMinProtocolVersionSeen, Bucket::getBucketVersion(mCurrBucket)); - mCurrApplicator = std::make_unique( + mMinProtocolVersionSeen, Bucket::getBucketVersion(mSecondBucket)); + mSecondBucketApplicator = std::make_unique( mApp, mMaxProtocolVersion, mMinProtocolVersionSeen, mLevel, - mCurrBucket, mEntryTypeFilter); - CLOG_DEBUG(History, "ApplyBuckets : starting level[{}].curr = {}", - mLevel, i.curr); + mSecondBucket, mEntryTypeFilter, mSeenKeys); + + if (isUsingBucketListDB) + { + CLOG_DEBUG(History, "ApplyBuckets : starting level[{}].snap = {}", + mLevel, i.snap); + } + else + { + CLOG_DEBUG(History, "ApplyBuckets : starting level[{}].curr = {}", + mLevel, i.curr); + } mApplying = true; } } @@ -208,8 +290,27 @@ ApplyBucketsWork::doWork() { ZoneScoped; - // Step 1: apply buckets. Step 2: assume state - if (!mSpawnedAssumeStateWork) + // Step 1: index buckets. Step 2: apply buckets. Step 3: assume state + bool isUsingBucketListDB = mApp.getConfig().isUsingBucketListDB(); + if (isUsingBucketListDB) + { + if (!mIndexBucketsWork) + { + // Spawn indexing work for the first time + mIndexBucketsWork = addWork(mBucketsToIndex); + return State::WORK_RUNNING; + } + else if (mIndexBucketsWork->getState() != + BasicWork::State::WORK_SUCCESS) + { + // Exit early if indexing work is still running, or failed + return mIndexBucketsWork->getState(); + } + + // Otherwise, continue with next steps + } + + if (!mAssumeStateWork) { if (mApp.getLedgerManager().rebuildingInMemoryState() && !mDelayChecked) { @@ -232,45 +333,52 @@ ApplyBucketsWork::doWork() } // The structure of these if statements is motivated by the following: - // 1. mCurrApplicator should never be advanced if mSnapApplicator is - // not false. Otherwise it is possible for curr to modify the - // database when the invariants for snap are checked. - // 2. There is no reason to advance mSnapApplicator or mCurrApplicator - // if there is nothing to be applied. - if (mSnapApplicator) + // 1. mSecondBucketApplicator should never be advanced if + // mFirstBucketApplicator is not false. Otherwise it is possible for + // second bucket to modify the database when the invariants for first + // bucket are checked. + // 2. There is no reason to advance mFirstBucketApplicator or + // mSecondBucketApplicator if there is nothing to be applied. + if (mFirstBucketApplicator) { TempLedgerVersionSetter tlvs(mApp, mMaxProtocolVersion); - if (*mSnapApplicator) + + // When BucketListDB is enabled, we apply in order starting with + // curr. If BucketListDB is not enabled, we iterate in reverse + // starting with snap. + bool isCurr = isUsingBucketListDB; + if (*mFirstBucketApplicator) { - advance("snap", *mSnapApplicator); + advance(isCurr ? "curr" : "snap", *mFirstBucketApplicator); return State::WORK_RUNNING; } mApp.getInvariantManager().checkOnBucketApply( - mSnapBucket, mApplyState.currentLedger, mLevel, false, + mFirstBucket, mApplyState.currentLedger, mLevel, isCurr, mEntryTypeFilter); - mSnapApplicator.reset(); - mSnapBucket.reset(); + mFirstBucketApplicator.reset(); + mFirstBucket.reset(); mApp.getCatchupManager().bucketsApplied(); } - if (mCurrApplicator) + if (mSecondBucketApplicator) { + bool isCurr = !isUsingBucketListDB; TempLedgerVersionSetter tlvs(mApp, mMaxProtocolVersion); - if (*mCurrApplicator) + if (*mSecondBucketApplicator) { - advance("curr", *mCurrApplicator); + advance(isCurr ? "curr" : "snap", *mSecondBucketApplicator); return State::WORK_RUNNING; } mApp.getInvariantManager().checkOnBucketApply( - mCurrBucket, mApplyState.currentLedger, mLevel, true, + mSecondBucket, mApplyState.currentLedger, mLevel, isCurr, mEntryTypeFilter); - mCurrApplicator.reset(); - mCurrBucket.reset(); + mSecondBucketApplicator.reset(); + mSecondBucket.reset(); mApp.getCatchupManager().bucketsApplied(); } - if (mLevel != 0) + if (!appliedAllLevels()) { - --mLevel; + mLevel = nextLevel(); CLOG_DEBUG(History, "ApplyBuckets : starting next level: {}", mLevel); return State::WORK_RUNNING; @@ -279,9 +387,9 @@ ApplyBucketsWork::doWork() CLOG_INFO(History, "ApplyBuckets : done, assuming state"); // After all buckets applied, spawn assumeState work - addWork(mApplyState, mMaxProtocolVersion, - /* restartMerges */ true); - mSpawnedAssumeStateWork = true; + mAssumeStateWork = + addWork(mApplyState, mMaxProtocolVersion, + /* restartMerges */ true); } return checkChildrenStatus(); @@ -333,13 +441,17 @@ ApplyBucketsWork::advance(std::string const& bucketName, bool ApplyBucketsWork::isLevelComplete() { - return !(mApplying) || !(mSnapApplicator || mCurrApplicator); + return !(mApplying) || !(mFirstBucketApplicator || mSecondBucketApplicator); } std::string ApplyBucketsWork::getStatus() const { - if (!mSpawnedAssumeStateWork) + // This status string only applies to step 2 when we actually apply the + // buckets. + bool doneIndexing = !mApp.getConfig().isUsingBucketListDB() || + (mIndexBucketsWork && mIndexBucketsWork->isDone()); + if (doneIndexing && !mSpawnedAssumeStateWork) { auto size = mTotalSize == 0 ? 0 : (100 * mAppliedSize / mTotalSize); return fmt::format( diff --git a/src/catchup/ApplyBucketsWork.h b/src/catchup/ApplyBucketsWork.h index 09fd8869ce..0851e3d7d4 100644 --- a/src/catchup/ApplyBucketsWork.h +++ b/src/catchup/ApplyBucketsWork.h @@ -5,14 +5,17 @@ #pragma once #include "bucket/BucketApplicator.h" +#include "ledger/LedgerHashUtils.h" #include "work/Work.h" namespace stellar { +class AssumeStateWork; class BucketLevel; class BucketList; class Bucket; +class IndexBucketsWork; struct HistoryArchiveState; struct LedgerHeaderHistoryEntry; @@ -24,6 +27,8 @@ class ApplyBucketsWork : public Work bool mApplying{false}; bool mSpawnedAssumeStateWork{false}; + std::shared_ptr mAssumeStateWork{}; + std::shared_ptr mIndexBucketsWork{}; size_t mTotalBuckets{0}; size_t mAppliedBuckets{0}; size_t mAppliedEntries{0}; @@ -34,21 +39,27 @@ class ApplyBucketsWork : public Work uint32_t mLevel{0}; uint32_t mMaxProtocolVersion{0}; uint32_t mMinProtocolVersionSeen{UINT32_MAX}; - std::shared_ptr mSnapBucket; - std::shared_ptr mCurrBucket; - std::unique_ptr mSnapApplicator; - std::unique_ptr mCurrApplicator; + std::shared_ptr mFirstBucket; + std::shared_ptr mSecondBucket; + std::unique_ptr mFirstBucketApplicator; + std::unique_ptr mSecondBucketApplicator; + std::unordered_set mSeenKeys; + std::vector> mBucketsToIndex; BucketApplicator::Counters mCounters; void advance(std::string const& name, BucketApplicator& applicator); - std::shared_ptr getBucket(std::string const& bucketHash); + std::shared_ptr getBucket(std::string const& bucketHash); BucketLevel& getBucketLevel(uint32_t level); void startLevel(); bool isLevelComplete(); bool mDelayChecked{false}; + uint32_t startingLevel(); + uint32_t nextLevel() const; + bool appliedAllLevels() const; + public: ApplyBucketsWork( Application& app,