diff --git a/src/bucket/Bucket.cpp b/src/bucket/Bucket.cpp index 5dc6ffed64..c170ace285 100644 --- a/src/bucket/Bucket.cpp +++ b/src/bucket/Bucket.cpp @@ -141,15 +141,14 @@ Bucket::apply(Application& app) const { ZoneScoped; - UnorderedSet emptySet; + std::unordered_set emptySet; BucketApplicator applicator( app, app.getConfig().LEDGER_PROTOCOL_VERSION, 0 /*set to 0 so we always load from the parent to check state*/, 0 /*set to a level that's not the bottom so we don't treat live entries as init*/ , - shared_from_this(), [](LedgerEntryType) { return true; }, - /*newOffersOnly=*/app.getConfig().isUsingBucketListDB(), emptySet); + shared_from_this(), [](LedgerEntryType) { return true; }, emptySet); BucketApplicator::Counters counters(app.getClock().now()); while (applicator) { diff --git a/src/bucket/BucketApplicator.cpp b/src/bucket/BucketApplicator.cpp index bdfba04cb7..be2d36b29e 100644 --- a/src/bucket/BucketApplicator.cpp +++ b/src/bucket/BucketApplicator.cpp @@ -22,15 +22,13 @@ BucketApplicator::BucketApplicator(Application& app, uint32_t level, std::shared_ptr bucket, std::function filter, - bool newOffersOnly, - UnorderedSet& seenKeys) + std::unordered_set& seenKeys) : mApp(app) , mMaxProtocolVersion(maxProtocolVersion) , mMinProtocolVersionSeen(minProtocolVersionSeen) , mLevel(level) , mBucketIter(bucket) , mEntryTypeFilter(filter) - , mNewOffersOnly(newOffersOnly) , mSeenKeys(seenKeys) { auto protocolVersion = mBucketIter.getMetadata().ledgerVersion; @@ -42,9 +40,9 @@ BucketApplicator::BucketApplicator(Application& app, protocolVersion, mMaxProtocolVersion)); } - if (newOffersOnly && !bucket->isEmpty()) + // When BucketListDB is enabled, only offers are applied + if (mApp.getConfig().isUsingBucketListDB() && !bucket->isEmpty()) { - releaseAssertOrThrow(mApp.getConfig().isUsingBucketListDB()); auto offsetOp = bucket->getOfferRange(); if (offsetOp) { @@ -62,8 +60,12 @@ BucketApplicator::BucketApplicator(Application& app, BucketApplicator::operator bool() const { + // 
There is more work to do (i.e. (bool) *this == true) iff: + // 1. The underlying bucket iterator is not EOF and + // 2. Either BucketListDB is not enabled (so we must apply all entry types) + // or BucketListDB is enabled and we have offers still remaining. return static_cast(mBucketIter) && - (!mNewOffersOnly || mOffersRemaining); + (!mApp.getConfig().isUsingBucketListDB() || mOffersRemaining); } size_t @@ -125,7 +127,8 @@ BucketApplicator::advance(BucketApplicator::Counters& counters) // returns the file offset at the end of the currently loaded entry. // This means we must read until pos is strictly greater than the upper // bound so that we don't skip the last offer in the range. - if (mNewOffersOnly && mBucketIter.pos() > mUpperBoundOffset) + auto isUsingBucketListDB = mApp.getConfig().isUsingBucketListDB(); + if (isUsingBucketListDB && mBucketIter.pos() > mUpperBoundOffset) { mOffersRemaining = false; break; @@ -136,7 +139,7 @@ BucketApplicator::advance(BucketApplicator::Counters& counters) if (shouldApplyEntry(mEntryTypeFilter, e)) { - if (mNewOffersOnly) + if (isUsingBucketListDB) { if (e.type() == LIVEENTRY || e.type() == INITENTRY) { @@ -201,7 +204,7 @@ BucketApplicator::advance(BucketApplicator::Counters& counters) } else { - releaseAssertOrThrow(!mNewOffersOnly); + releaseAssertOrThrow(!isUsingBucketListDB); if (protocolVersionIsBefore( mMinProtocolVersionSeen, Bucket:: diff --git a/src/bucket/BucketApplicator.h b/src/bucket/BucketApplicator.h index 3a8fb459fe..79e74d1dce 100644 --- a/src/bucket/BucketApplicator.h +++ b/src/bucket/BucketApplicator.h @@ -6,6 +6,7 @@ #include "bucket/Bucket.h" #include "bucket/BucketInputIterator.h" +#include "ledger/LedgerHashUtils.h" #include "util/Timer.h" #include "util/XDRStream.h" #include @@ -28,8 +29,7 @@ class BucketApplicator BucketInputIterator mBucketIter; size_t mCount{0}; std::function mEntryTypeFilter; - bool const mNewOffersOnly; - UnorderedSet& mSeenKeys; + std::unordered_set& mSeenKeys; std::streamoff 
mUpperBoundOffset; bool mOffersRemaining{true}; @@ -76,7 +76,7 @@ class BucketApplicator uint32_t minProtocolVersionSeen, uint32_t level, std::shared_ptr bucket, std::function filter, - bool newOffersOnly, UnorderedSet& seenKeys); + std::unordered_set& seenKeys); operator bool() const; size_t advance(Counters& counters); diff --git a/src/catchup/ApplyBucketsWork.cpp b/src/catchup/ApplyBucketsWork.cpp index 24c8314ef4..0983194821 100644 --- a/src/catchup/ApplyBucketsWork.cpp +++ b/src/catchup/ApplyBucketsWork.cpp @@ -52,15 +52,16 @@ class TempLedgerVersionSetter : NonMovableOrCopyable }; uint32_t -ApplyBucketsWork::startingLevel(bool offersOnly) +ApplyBucketsWork::startingLevel() { - return offersOnly ? 0 : BucketList::kNumLevels - 1; + return mApp.getConfig().isUsingBucketListDB() ? 0 + : BucketList::kNumLevels - 1; } bool ApplyBucketsWork::appliedAllLevels() const { - if (mOffersOnly) + if (mApp.getConfig().isUsingBucketListDB()) { return mLevel == BucketList::kNumLevels - 1; } @@ -73,7 +74,7 @@ ApplyBucketsWork::appliedAllLevels() const uint32_t ApplyBucketsWork::nextLevel() const { - return mOffersOnly ? mLevel + 1 : mLevel - 1; + return mApp.getConfig().isUsingBucketListDB() ? mLevel + 1 : mLevel - 1; } ApplyBucketsWork::ApplyBucketsWork( @@ -87,8 +88,7 @@ ApplyBucketsWork::ApplyBucketsWork( , mEntryTypeFilter(onlyApply) , mApplying(false) , mTotalSize(0) - , mOffersOnly(app.getConfig().isUsingBucketListDB()) - , mLevel(startingLevel(mOffersOnly)) + , mLevel(startingLevel()) , mMaxProtocolVersion(maxProtocolVersion) , mCounters(app.getClock().now()) { @@ -139,7 +139,7 @@ ApplyBucketsWork::doReset() if (!isAborting()) { - if (mOffersOnly) + if (mApp.getConfig().isUsingBucketListDB()) { // The current size of this set is 1.6 million during BucketApply // (as of 12/20/23). 
There's not a great way to estimate this, so @@ -173,7 +173,7 @@ ApplyBucketsWork::doReset() mTotalSize += bucket->getSize(); } - if (mOffersOnly) + if (mApp.getConfig().isUsingBucketListDB()) { mBucketsToIndex.emplace_back(bucket); } @@ -195,10 +195,12 @@ ApplyBucketsWork::doReset() mApp.getLedgerTxnRoot().prepareNewObjects(totalLECount); } - mLevel = startingLevel(mOffersOnly); + mLevel = startingLevel(); mApplying = false; mDelayChecked = false; - mSpawnedAssumeStateWork = false; + + mIndexBucketsWork.reset(); + mAssumeStateWork.reset(); mFirstBucket.reset(); mSecondBucket.reset(); @@ -229,24 +231,25 @@ ApplyBucketsWork::startLevel() CLOG_DEBUG(History, "ApplyBuckets : starting level {}", mLevel); auto& level = getBucketLevel(mLevel); HistoryStateBucket const& i = mApplyState.currentBuckets.at(mLevel); + bool isUsingBucketListDB = mApp.getConfig().isUsingBucketListDB(); - bool applyFirst = mOffersOnly + bool applyFirst = isUsingBucketListDB ? (i.curr != binToHex(level.getCurr()->getHash())) : (i.snap != binToHex(level.getSnap()->getHash())); - bool applySecond = mOffersOnly + bool applySecond = isUsingBucketListDB ? (i.snap != binToHex(level.getSnap()->getHash())) : (i.curr != binToHex(level.getCurr()->getHash())); if (mApplying || applyFirst) { - mFirstBucket = getBucket(mOffersOnly ? i.curr : i.snap); + mFirstBucket = getBucket(isUsingBucketListDB ? i.curr : i.snap); mMinProtocolVersionSeen = std::min( mMinProtocolVersionSeen, Bucket::getBucketVersion(mFirstBucket)); mFirstBucketApplicator = std::make_unique( mApp, mMaxProtocolVersion, mMinProtocolVersionSeen, mLevel, - mFirstBucket, mEntryTypeFilter, mOffersOnly, mSeenKeys); + mFirstBucket, mEntryTypeFilter, mSeenKeys); - if (mOffersOnly) + if (isUsingBucketListDB) { CLOG_DEBUG(History, "ApplyBuckets : starting level[{}].curr = {}", mLevel, i.curr); @@ -261,14 +264,14 @@ ApplyBucketsWork::startLevel() } if (mApplying || applySecond) { - mSecondBucket = getBucket(mOffersOnly ? 
i.snap : i.curr); + mSecondBucket = getBucket(isUsingBucketListDB ? i.snap : i.curr); mMinProtocolVersionSeen = std::min( mMinProtocolVersionSeen, Bucket::getBucketVersion(mSecondBucket)); mSecondBucketApplicator = std::make_unique( mApp, mMaxProtocolVersion, mMinProtocolVersionSeen, mLevel, - mSecondBucket, mEntryTypeFilter, mOffersOnly, mSeenKeys); + mSecondBucket, mEntryTypeFilter, mSeenKeys); - if (mOffersOnly) + if (isUsingBucketListDB) { CLOG_DEBUG(History, "ApplyBuckets : starting level[{}].snap = {}", mLevel, i.snap); @@ -288,34 +291,26 @@ ApplyBucketsWork::doWork() ZoneScoped; // Step 1: index buckets. Step 2: apply buckets. Step 3: assume state - if (!mSpawnedIndexBucketsWork) + bool isUsingBucketListDB = mApp.getConfig().isUsingBucketListDB(); + if (isUsingBucketListDB) { - if (mOffersOnly) - { - addWork(mBucketsToIndex); - } - else + if (!mIndexBucketsWork) { - mFinishedIndexBucketsWork = true; + mIndexBucketsWork = addWork(mBucketsToIndex); } - mSpawnedIndexBucketsWork = true; - } - - if (!mFinishedIndexBucketsWork) - { + // If indexing has not finished or finished and failed, return result + // status. If indexing finished and succeeded, move on and spawn assume + // state work. auto status = checkChildrenStatus(); - if (status == BasicWork::State::WORK_SUCCESS) - { - mFinishedIndexBucketsWork = true; - } - else + if (!mIndexBucketsWork->isDone() || + status != BasicWork::State::WORK_SUCCESS) { return status; } } - if (!mSpawnedAssumeStateWork) + if (!mAssumeStateWork) { if (mApp.getLedgerManager().rebuildingInMemoryState() && !mDelayChecked) { @@ -347,30 +342,35 @@ ApplyBucketsWork::doWork() if (mFirstBucketApplicator) { TempLedgerVersionSetter tlvs(mApp, mMaxProtocolVersion); + + // When BucketListDB is enabled, we apply in order starting with + // curr. If BucketListDB is not enabled, we iterate in reverse + // starting with snap. + bool isCurr = isUsingBucketListDB; if (*mFirstBucketApplicator) { - advance(mOffersOnly ? 
"curr" : "snap", *mFirstBucketApplicator); + advance(isCurr ? "curr" : "snap", *mFirstBucketApplicator); return State::WORK_RUNNING; } mApp.getInvariantManager().checkOnBucketApply( - mFirstBucket, mApplyState.currentLedger, mLevel, - /*isCurr=*/mOffersOnly, mEntryTypeFilter); + mFirstBucket, mApplyState.currentLedger, mLevel, isCurr, + mEntryTypeFilter); mFirstBucketApplicator.reset(); mFirstBucket.reset(); mApp.getCatchupManager().bucketsApplied(); } if (mSecondBucketApplicator) { + bool isCurr = !isUsingBucketListDB; TempLedgerVersionSetter tlvs(mApp, mMaxProtocolVersion); if (*mSecondBucketApplicator) { - advance(mOffersOnly ? "snap" : "curr", - *mSecondBucketApplicator); + advance(isCurr ? "curr" : "snap", *mSecondBucketApplicator); return State::WORK_RUNNING; } mApp.getInvariantManager().checkOnBucketApply( - mSecondBucket, mApplyState.currentLedger, mLevel, - /*isCurr=*/!mOffersOnly, mEntryTypeFilter); + mSecondBucket, mApplyState.currentLedger, mLevel, isCurr, + mEntryTypeFilter); mSecondBucketApplicator.reset(); mSecondBucket.reset(); mApp.getCatchupManager().bucketsApplied(); @@ -387,9 +387,9 @@ ApplyBucketsWork::doWork() CLOG_INFO(History, "ApplyBuckets : done, assuming state"); // After all buckets applied, spawn assumeState work - addWork(mApplyState, mMaxProtocolVersion, - /* restartMerges */ true); - mSpawnedAssumeStateWork = true; + mAssumeStateWork = + addWork(mApplyState, mMaxProtocolVersion, + /* restartMerges */ true); } return checkChildrenStatus(); @@ -449,7 +449,10 @@ ApplyBucketsWork::getStatus() const { // This status string only applies to step 2 when we actually apply the // buckets. - if (mFinishedIndexBucketsWork && !mSpawnedAssumeStateWork) + bool doneIndexing = !mApp.getConfig().isUsingBucketListDB() || + (mIndexBucketsWork && mIndexBucketsWork->isDone()); + // Step 3 has started once mAssumeStateWork is non-null. + if (doneIndexing && !mAssumeStateWork) { auto size = mTotalSize == 0 ? 
0 : (100 * mAppliedSize / mTotalSize); return fmt::format( diff --git a/src/catchup/ApplyBucketsWork.h b/src/catchup/ApplyBucketsWork.h index 10c211436d..0851e3d7d4 100644 --- a/src/catchup/ApplyBucketsWork.h +++ b/src/catchup/ApplyBucketsWork.h @@ -5,14 +5,17 @@ #pragma once #include "bucket/BucketApplicator.h" +#include "ledger/LedgerHashUtils.h" #include "work/Work.h" namespace stellar { +class AssumeStateWork; class BucketLevel; class BucketList; class Bucket; +class IndexBucketsWork; struct HistoryArchiveState; struct LedgerHeaderHistoryEntry; @@ -24,8 +27,8 @@ class ApplyBucketsWork : public Work bool mApplying{false}; bool mSpawnedAssumeStateWork{false}; - bool mSpawnedIndexBucketsWork{false}; - bool mFinishedIndexBucketsWork{false}; + std::shared_ptr mAssumeStateWork{}; + std::shared_ptr mIndexBucketsWork{}; size_t mTotalBuckets{0}; size_t mAppliedBuckets{0}; size_t mAppliedEntries{0}; @@ -33,7 +36,6 @@ class ApplyBucketsWork : public Work size_t mAppliedSize{0}; size_t mLastAppliedSizeMb{0}; size_t mLastPos{0}; - bool const mOffersOnly; uint32_t mLevel{0}; uint32_t mMaxProtocolVersion{0}; uint32_t mMinProtocolVersionSeen{UINT32_MAX}; @@ -41,7 +43,7 @@ class ApplyBucketsWork : public Work std::shared_ptr mSecondBucket; std::unique_ptr mFirstBucketApplicator; std::unique_ptr mSecondBucketApplicator; - UnorderedSet mSeenKeys; + std::unordered_set mSeenKeys; std::vector> mBucketsToIndex; BucketApplicator::Counters mCounters; @@ -54,7 +56,7 @@ class ApplyBucketsWork : public Work bool mDelayChecked{false}; - static uint32_t startingLevel(bool offersOnly); + uint32_t startingLevel(); uint32_t nextLevel() const; bool appliedAllLevels() const;