diff --git a/src/bucket/BucketApplicator.cpp b/src/bucket/BucketApplicator.cpp index 3bdfb4929c..d93cf63a1a 100644 --- a/src/bucket/BucketApplicator.cpp +++ b/src/bucket/BucketApplicator.cpp @@ -23,7 +23,7 @@ BucketApplicator::BucketApplicator(Application& app, std::shared_ptr bucket, std::function filter, bool newOffersOnly, - UnorderedSet seenKeys) + UnorderedSet& seenKeys) : mApp(app) , mMaxProtocolVersion(maxProtocolVersion) , mMinProtocolVersionSeen(minProtocolVersionSeen) @@ -42,7 +42,7 @@ BucketApplicator::BucketApplicator(Application& app, protocolVersion, mMaxProtocolVersion)); } - if (newOffersOnly) + if (newOffersOnly && !bucket->isEmpty()) { releaseAssertOrThrow(mApp.getConfig().isUsingBucketListDB()); auto [lowOffset, highOffset] = bucket->getOfferRange(); diff --git a/src/bucket/BucketApplicator.h b/src/bucket/BucketApplicator.h index b2feb53291..3a8fb459fe 100644 --- a/src/bucket/BucketApplicator.h +++ b/src/bucket/BucketApplicator.h @@ -76,7 +76,7 @@ class BucketApplicator uint32_t minProtocolVersionSeen, uint32_t level, std::shared_ptr bucket, std::function filter, - bool newOffersOnly, UnorderedSet seenKeys); + bool newOffersOnly, UnorderedSet& seenKeys); operator bool() const; size_t advance(Counters& counters); diff --git a/src/catchup/ApplyBucketsWork.cpp b/src/catchup/ApplyBucketsWork.cpp index 174f59e866..4ba0926b6b 100644 --- a/src/catchup/ApplyBucketsWork.cpp +++ b/src/catchup/ApplyBucketsWork.cpp @@ -9,6 +9,7 @@ #include "bucket/BucketManager.h" #include "catchup/AssumeStateWork.h" #include "catchup/CatchupManager.h" +#include "catchup/IndexBucketsWork.h" #include "crypto/Hex.h" #include "crypto/SecretKey.h" #include "historywork/Progress.h" @@ -108,7 +109,7 @@ ApplyBucketsWork::getBucketLevel(uint32_t level) return mApp.getBucketManager().getBucketList().getLevel(level); } -std::shared_ptr +std::shared_ptr ApplyBucketsWork::getBucket(std::string const& hash) { auto i = mBuckets.find(hash); @@ -134,17 +135,18 @@ ApplyBucketsWork::doReset() mLastPos = 0; mMinProtocolVersionSeen = UINT32_MAX; mSeenKeys.clear(); - - if (mOffersOnly) - { - // The current size of this set is 1.6 million during BucketApply (as of - // 12/20/23). There's not a great way to estimate this, so reserving - // with some extra wiggle room - mSeenKeys.reserve(2'000'000); - } + mBucketsToIndex.clear(); if (!isAborting()) { + if (mOffersOnly) + { + // The current size of this set is 1.6 million during BucketApply + // (as of 12/20/23). There's not a great way to estimate this, so + // reserving with some extra wiggle room + mSeenKeys.reserve(2'000'000); + } + // When applying buckets with accounts, we have to make sure that the // root account has been removed. This comes into play, for example, // when applying buckets from genesis the root account already exists. @@ -164,12 +166,17 @@ ApplyBucketsWork::doReset() } } - auto addBucket = [this](std::shared_ptr const& bucket) { + auto addBucket = [this](std::shared_ptr const& bucket) { if (bucket->getSize() > 0) { mTotalBuckets++; mTotalSize += bucket->getSize(); } + + if (mOffersOnly) + { + mBucketsToIndex.emplace_back(bucket); + } }; for (auto const& hsb : mApplyState.currentBuckets) @@ -280,7 +287,34 @@ ApplyBucketsWork::doWork() { ZoneScoped; - // Step 1: apply buckets. Step 2: assume state + // Step 1: index buckets. Step 2: apply buckets. Step 3: assume state + if (!mSpawnedIndexBucketsWork) + { + if (mOffersOnly) + { + addWork(mBucketsToIndex); + } + else + { + mFinishedIndexBucketsWork = true; + } + + mSpawnedIndexBucketsWork = true; + } + + if (!mFinishedIndexBucketsWork) + { + auto status = checkChildrenStatus(); + if (status == BasicWork::State::WORK_SUCCESS) + { + mFinishedIndexBucketsWork = true; + } + else + { + return status; + } + } + if (!mSpawnedAssumeStateWork) { if (mApp.getLedgerManager().rebuildingInMemoryState() && !mDelayChecked) @@ -319,8 +353,8 @@ ApplyBucketsWork::doWork() return State::WORK_RUNNING; } mApp.getInvariantManager().checkOnBucketApply( - mFirstBucket, mApplyState.currentLedger, mLevel, false, - mEntryTypeFilter); + mFirstBucket, mApplyState.currentLedger, mLevel, + /*isCurr=*/mOffersOnly, mEntryTypeFilter); mFirstBucketApplicator.reset(); mFirstBucket.reset(); mApp.getCatchupManager().bucketsApplied(); @@ -335,8 +369,8 @@ ApplyBucketsWork::doWork() return State::WORK_RUNNING; } mApp.getInvariantManager().checkOnBucketApply( - mSecondBucket, mApplyState.currentLedger, mLevel, true, - mEntryTypeFilter); + mSecondBucket, mApplyState.currentLedger, mLevel, + /*isCurr=*/!mOffersOnly, mEntryTypeFilter); mSecondBucketApplicator.reset(); mSecondBucket.reset(); mApp.getCatchupManager().bucketsApplied(); @@ -412,7 +446,9 @@ ApplyBucketsWork::isLevelComplete() std::string ApplyBucketsWork::getStatus() const { - if (!mSpawnedAssumeStateWork) + // This status string only applies to step 2 when we actually apply the + // buckets. + if (mFinishedIndexBucketsWork && !mSpawnedAssumeStateWork) { auto size = mTotalSize == 0 ? 0 : (100 * mAppliedSize / mTotalSize); return fmt::format( diff --git a/src/catchup/ApplyBucketsWork.h b/src/catchup/ApplyBucketsWork.h index 6af520df4f..10c211436d 100644 --- a/src/catchup/ApplyBucketsWork.h +++ b/src/catchup/ApplyBucketsWork.h @@ -24,6 +24,8 @@ class ApplyBucketsWork : public Work bool mApplying{false}; bool mSpawnedAssumeStateWork{false}; + bool mSpawnedIndexBucketsWork{false}; + bool mFinishedIndexBucketsWork{false}; size_t mTotalBuckets{0}; size_t mAppliedBuckets{0}; size_t mAppliedEntries{0}; @@ -40,11 +42,12 @@ class ApplyBucketsWork : public Work std::unique_ptr mFirstBucketApplicator; std::unique_ptr mSecondBucketApplicator; UnorderedSet mSeenKeys; + std::vector> mBucketsToIndex; BucketApplicator::Counters mCounters; void advance(std::string const& name, BucketApplicator& applicator); - std::shared_ptr getBucket(std::string const& bucketHash); + std::shared_ptr getBucket(std::string const& bucketHash); BucketLevel& getBucketLevel(uint32_t level); void startLevel(); bool isLevelComplete();